腾讯云TI-ONE训练平台PySpark 组件说明_AI解决方案_同尘科技
PySpark 组件为使用 Python 的 Spark 用户提供服务,用户通过 Python 编写 Spark 应用程序,通过 PySpark 组件完成部署。更多详细介绍可参考 Spark 官方 Python API 文档 。
PySpark 包含标准 Spark 的功能,同时支持上传 Python 脚本、实时修改脚本和 SQL 功能,更加灵活,推荐您使用 PySpark 进行数据预处理。
操作步骤
1. 添加组件从左侧菜单栏中,选择组件算子 > 机器学习列表下的 PySpark 节点,并将其拖拽至画布中。2. 算法 IO 参数高级设置内,主要包含输入数据和输出数据的设置,您可以通过自定义路径设定或通过拖拽数据源、输出算子连接到组件算子上进行设置。输入数据:平台会默认把容器本地路径的数据上传到 COS 源路径下,您可以打开自定义路径开关修改本地路径;COS 场景下支持 COSN 直连方式,输入数据 0 对应的环境变量是 P_INPUT0 ,例如:P_INPUT0=cosn://${cos_bucket}/${cos_path}输出数据:COS 场景下请使用 COSN 直连方式,输出数据 0 对应的环境变量是 P_OUTPUT0 ,例如:P_OUTPUT0=cosn://${cos_bucket}/${project_path}/${flow_id}/${node_run_id}/output0
3. 算法参数*
代码包:从指定对象存储 COS 存储桶中选择文件夹。*
启动命令:算子执行时启动的 python 脚本。*
调优参数:填写的超参数 JSON 会保存为 /opt/ml/input/config/hyperparameters.json 文件,您的代码需自行解析。4. 资源参数*
框架版本:使用的 Spark 框架版本。*
训练模式:默认为 SPARK 。*
计费模式:有以下两种选择:按量计费:*
Driver 节点算力规格。*
Executor 节点算力规格。*
Executor 节点数量。包年包月:*
资源组:选择您拥有的资源组。*
资源申请:*
Driver-cores :Driver 节点 CPU 核数。*
Driver-memory :Driver 节点内存大小。*
Executor-cores :Excutor 节点 CPU 核数。*
Executor-memory :Excutor 节点内存大小。*
Executors:Executor 节点数量。5. 运行单击保存并运行工作流。6. 查看 PySpark 日志在 PySpark 节点上单击右键菜单,可查看详细日志。
Demo
下面演示如何通过 PySpark 组件算子运行 Spark 官方样例计算圆周率。1. 准备好代码及数据包,您可以点击 链接 下载。解压后将文件夹上传到您的 COS 存储桶中,使其可以在组件算子中被导入。2. 代码的核心内容如下,主要逻辑就是通过 COSN 直连的方式从 COS 存储桶加载数据需要聚类的数据,通过 Spark 提供的 KMeans 算法,求出聚类结果后再将结果通过 COSN 直连的方式写回 COS 存储桶:
if __name__ == "__main__": spark = SparkSession \ .builder \ .appName("KMeansExample") \ .getOrCreate()
# Loads data via cosn. dataset = spark.read.format("libsvm").load(os.environ["P_INPUT0"] + "/sample_kmeans_data.txt")
# Trains a k-means model. kmeans = KMeans().setK(2).setSeed(1) model = kmeans.fit(dataset)
# Make predictions predictions = model.transform(dataset)
# Write the label and prediction back via cosn. predictions.drop("features").write.option("header", "true").csv(os.environ["P_OUTPUT0"])
spark.stop()
可以看到,上述代码中在对数据的读取和写入,填写的路径都使用到了环境变量,以读取 COSN 路径,分别是 P_INPUT0 和 P_OUTPUT0 。而数据的构成则比较简单,为 6 个数据点,其中前 3 个点和后 3 个点为显著的两部分,因此在代码中 KMeans 的 K 值也是设置为了 2 :
0 1:0.1 2:0.1 3:0.11 1:0.2 2:0.2 3:0.22 1:0.3 2:0.3 3:0.33 1:9.0 2:9.0 3:9.04 1:9.1 2:9.1 3:9.15 1:9.2 2:9.2 3:9.2
3. 从控制台中拽拖出数据源 > 数据输入 > COS 算子以及组件算子 > 机器学习 > PySpark 算子,并进行连接如下:
4. 进行参数配置,首先配置 COS 算子的 cos 数据路径,由于输入数据已经存放在代码包中,直接选择您上传代码包的所在位置。即为:存储桶列表 / 存储桶名 / … / pyspark :
然后是 PySpark 算子的代码包路径,同上。而启动命令则填写 kmeans_example.py :
为了在 COS 存储桶中获得输出数据,还需要在 PySpark 算子的高级设置中,对输出数据 0 设置自定义路径,数据源类型为 COS 。由于此处只做展示,目标路径仍选择代码包路径,您在使用时可选取希望数据输出到的路径:
资源参数则如下:*
框架版本:spark2.4.5-py3.7-cpu*
训练模式:SPARK*
计费模式:按量付费*
Driver 节点算力规格:2C4G*
Executor 节点算力规格:2C4G*
Executor 节点数量:15. 运行工作流,等待运行结束后,查看 COS 存储桶中输出数据对应到的路径,可以看到已经多出一个文件夹(以下界面为 COSBrowser 的界面):
打开这个文件夹,并打开里面的 output 文件夹,可以看到有两个文件,分别为 _SUCCESS 以及一个以 part 开头的文件。打开后者,可以看到结果为:
label,prediction0.0,01.0,02.0,03.0,14.0,15.0,1
您还可以通过在平台中右键点击组件算子,选择查看数据中的查看中间结果 0 ,可以看到:
前三个点被预测为 0 类,后三个点被预测为 1 类。符合对结果的预测。
使用建议
使用 PySpark 的目的是更好地借助其分布式计算的优势,以解决单机完成不了的计算。如果您在 PySpark 中仍然是调用常规的 Python 库做单机计算,那就失去了使用 PySpark 的意义。下面举例说明如何编写 PySpark 分布式计算代码。
使用 Spark 的 DataFrame,而不要使用 Pandas 的 DataFrame
PySpark 本身就具有类似 pandas.DataFrame 的 DataFrame,所以直接使用 PySpark 的 DataFrame 即可,基于 PySpark的DataFrame 的操作都是分布式执行的,而 pandas.DataFrame 是单机执行的,例如:
...df = spark.read.json("examples/src/main/resources/people.json")df.show()# +----+-------+# | age| name|# +----+-------+# |null|Michael|# | 30| Andy|# | 19| Justin|# +----+-------+ pandas_df = df.toPandas() # 将 PySpark 的 DataFrame 转换成 pandas.DataFrame,并获取'age'列age = pandas_df['age']...
df.toPandas() 操作会将分布在各节点的数据全部收集到 Driver上,再转成单机的 pandas.DataFrame 数据结构,适用于数据量很小的场景,如果数据量较大时,则此方法不可取。
PySpark的DataFrame 本身支持很多操作,直接基于它实现后续的业务逻辑即可,例如上述代码可以改成age = df.select('age')
。
在 Task 里使用 Python 库,而不是在 Driver上 使用 Python 库
下面有段代码,将数据全部 collect 到 Driver 端,然后使用 sklearn 进行预处理。
from sklearn import preprocessingdata = np.array(rdd.collect(), dtype=np.float)normalized = preprocessing.normalize(data)
上述代码实际上已退化为单机程序,如果数据量较大的话,collect 操作会把 Driver 的内存填满,甚至 OOM(超出内存),通常基于 RDD 或 DataFrame 的 API 可以满足大多数需求,例如标准化操作:
from pyspark.ml.feature import Normalizer df = spark.read.format("libsvm").load(path) # Normalize each Vector using $L^1$ norm.normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)l1NormData = normalizer.transform(dataFrame)
如果 RDD 或 DataFrame 没有满足您要求的 API,您也可以自行写一个处理函数,针对每条记录进行处理:
# record -> other recorddef process_fn(record): # your process logic # for example # import numpy as np # x = np.array(record, type=np.int32) # ... # record -> True or Flasedef judge_fn(record): # return True or Flase processed = rdd.map(process_fn).map(lambda x: x[1:3])filtered = processed.filter(judge_fn)
process_fn 或 judge_fn 会分发到每个节点上分布式执行,您可以在 process_fn 或 judge_fn 中使用任何 Python 库(如 numpy、scikit-learn 等)。
对解决方案有疑惑?想了解解决方案收费? 联系解决方案专家
腾讯云限时活动1折起,即将结束: 马上收藏
同尘科技为腾讯云授权服务中心,购买腾讯云享受折上折,更有现金返利:同意关联,立享优惠
阿里云解决方案也看看?: 点击对比阿里云的解决方案
暂无评论,你要说点什么吗?