亚马逊AWS官方博客

使用 Amazon SageMaker 托管 Spark 容器与 Amazon SageMaker SDK 按需运行无服务器Apache Spark数据处理作业

原文链接:

https://aws.amazon.com/cn/blogs/machine-learning/running-on-demand-serverless-apache-spark-data-processing-jobs-using-amazon-sagemaker-managed-spark-containers-and-the-amazon-sagemaker-sdk/

 

 

Apache Spark是一套用于大规模分布式数据处理的统一分析引擎。通常,在AWS上使用Spark类工作负载的企业会使用其基于Amazon Elastic Compute Cloud (Amazon EC2)或者Amazon EMR的栈以运行并扩展Apache Spark、Hive、Presto以及其他多种大数据框架。这种方式适用于持久性工作负载,可帮助您轻松保持Spark集群以24/7方式全天候运行;此外,在理想状况下,您还可以借此指定单一架构,以便按计划或按需扩展或收缩集群规模。

Amazon SageMaker Processing使您可以在完全托管的基础设施之上轻松运行预处理、后处理、模型评估或其他各类通用性质的转换工作负载。以往,Amazon SageMaker Processing当中还包含一套用于Scikit-learn式预处理功能的内置容器。要使用Spark等其他库,您也可以灵活引入自己的Docker容器。Amazon SageMaker Processing作业可以作为机器学习Step Functions工作流的组成部分,其中涉及预处理步骤与后处理步骤。关于更多详细信息,请参阅AWS Step Functions为Amazon SageMaker Processing添加支持

目前,有多种机器学习(ML)工作流中涉及使用Spark(或者其他库)对数据进行预处理,而后将训练数据传递至训练步骤。以下工作流展示了其中的提取、转换与加载(ETL)步骤,这些步骤将具体推进模型训练,并最终使用AWS Step Functions进行模型终端节点部署。

在此类工作流中引入Spark步骤,往往需要额外的步骤以实现集群设置与配置。或者,您也可以使用AWS Glue(一项完全托管的ETL服务)帮助客户轻松编写基于Python或Scala的Spark脚本,借此对机器学习训练中使用的数据进行预处理。

现在,我们很高兴向Amazon SageMaker Processing中添加了托管Spark容器以及相关SDK增强功能。如今,您可以直接提交PySpark或Java/Scala Spark应用程序,即可在Spark上执行大规模分布式处理。您可以在Amazon SageMaker Studio以及Amazon SageMaker notebook实例中使用此项功能。

为了演示,以下代码示例将使用PySparkProcessor在Amazon SageMaker Processing上运行PySpark脚本:

 

from sagemaker.spark.processing import PySparkProcessor

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="2.4",
    role=role,
    instance_count=2,
    instance_type="ml.c5.xlarge",
    max_runtime_in_seconds=1200,
) 

spark_processor.run(
    submit_app_py="./path/to/your/preprocess.py",
    arguments=['s3_input_bucket', bucket,
               's3_input_key_prefix', input_prefix,
               's3_output_bucket', bucket,
               's3_output_key_prefix', input_preprocessed_prefix],
    spark_event_logs_s3_uri='s3://' + bucket + '/' + prefix + '/spark_event_logs',
    logs=False
)

下面具体剖析示例代码。其中名为preprocess.py的PySpark脚本负责将一个大型CSV文件由Amazon Simple Storage Service (Amazon S3)中加载至Spark dataframe内,并将其拟合并转换为输出dataframe,而后再将结果转换为CSV文件、最终保存回Amazon S3:

import time
import sys
import os
import shutil
import csv

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import *


def csv_line(data):
    r = ','.join(str(d) for d in data[1])
    return str(data[0]) + "," + r


def main():
    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    
    # 将命令行args转换为args映射
    args_iter = iter(sys.argv[1:])
    args = dict(zip(args_iter, args_iter))

    spark.sparkContext._jsc.hadoopConfiguration().set("mapred.output.committer.class",
                                                      "org.apache.hadoop.mapred.FileOutputCommitter")
    
    # 定义与输入数据相对应的模式。输入数据中不包含标头。
    schema = StructType([StructField("sex", StringType(), True), 
                         StructField("length", DoubleType(), True),
                         StructField("diameter", DoubleType(), True),
                         StructField("height", DoubleType(), True),
                         StructField("whole_weight", DoubleType(), True),
                         StructField("shucked_weight", DoubleType(), True),
                         StructField("viscera_weight", DoubleType(), True), 
                         StructField("shell_weight", DoubleType(), True), 
                         StructField("rings", DoubleType(), True)])

    # 从S3处将数据下载至数据框内。
    total_df = spark.read.csv(('s3://' + os.path.join(args['s3_input_bucket'], args['s3_input_key_prefix'],'abalone.csv')), header=False, schema=schema)

    # 具有分类值的“sex”列上的 StringIndexer。
    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")
    
    # 对字符串索引的“sex”列 (indexed_sex)执行独热编码。
    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")

    # 向量编译器将所有特征整合至一维向量内,以便我们轻松将其保存为CSV格式。
    assembler = VectorAssembler(inputCols=["sex_vec", 
                                           "length", 
                                           "diameter", 
                                           "height", 
                                           "whole_weight", 
                                           "shucked_weight", 
                                           "viscera_weight", 
                                           "shell_weight"], 
                                outputCol="features")
    
    # 管道由以上添加的步骤构成
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])
    
    # 此步骤用于训练transformers特征。
    model = pipeline.fit(total_df)
    
    # 此步骤使用从先前拟合中获取的信息进行数据集转换。
    transformed_total_df = model.transform(total_df)
    
    # 以80:20的比例,将整体数据集拆分为训练数据集与验证数据集。
    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])
    
    # 将训练dataframe转换为RDD,保存为CSV格式并上传至S3。
    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
    train_lines = train_rdd.map(csv_line)
    train_lines.saveAsTextFile('s3://' + os.path.join(args['s3_output_bucket'], args['s3_output_key_prefix'], 'train'))
    
    # 将验证dataframe转换为RDD,以CSV格式保存并上传至S3。
    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
    validation_lines = validation_rdd.map(csv_line)
    validation_lines.saveAsTextFile('s3://' + os.path.join(args['s3_output_bucket'], args['s3_output_key_prefix'], 'validation'))


if __name__ == "__main__":
    main()

 

大家可以使用PySparkProcessor()类轻松启动基于Spark的处理作业,具体如下所示:

 

from sagemaker.spark.processing import PySparkProcessor

# 将原始输入数据集上传至S3。
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
prefix = 'sagemaker/spark-preprocess-demo/' + timestamp_prefix
input_prefix_abalone = prefix + '/input/raw/abalone'
input_preprocessed_prefix_abalone = prefix + '/input/preprocessed/abalone'
model_prefix = prefix + '/model'

sagemaker_session.upload_data(path='./data/abalone.csv', bucket=bucket, key_prefix=input_prefix_abalone)

# 运行处理作业。
spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="2.4",
    role=role,
    instance_count=2,
    instance_type="ml.c5.xlarge",
    max_runtime_in_seconds=1200,
)

spark_processor.run(
    submit_app_py="./code/preprocess.py",
    arguments=['s3_input_bucket', bucket,
               's3_input_key_prefix', input_prefix_abalone,
               's3_output_bucket', bucket,
               's3_output_key_prefix', input_preprocessed_prefix_abalone],
    spark_event_logs_s3_uri='s3://' + bucket + '/' + prefix + '/spark_event_logs',
    logs=False
)

 

在Amazon SageMaker Studio或Amazon SageMaker notebook实例当中运行时,输出结果将显示作业的具体进度:

Job Name:  sm-spark-<...>
Inputs:  [{'InputName': 'code', 'S3Input': {'S3Uri': 's3://<bucketname>/<prefix>/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'output-1', 'S3Output': {'S3Uri': 's3://<bucketname>/<prefix>', 'LocalPath': '/opt/ml/processing/spark-events/', 'S3UploadMode': 'Continuous'}}]

 

在Amazon SageMaker Studio中,您可以选中处理作业名称(右键单击),而后选择Open in trail details以描述处理作业并查看相关详情。

您还可以在Amazon SageMaker控制台上跟踪处理作业的设置、日志与指标,具体参见以下截屏。

在作业完成之后,如果run()函数中指定了spark_event_logs_s3_uri,则大家可以运行history server以查看Spark UI:

spark_processor.start_history_server()

如果在Amazon SageMaker notebook实例中运行,则输出结果将包含可访问history server的代理URL:

Starting history server...

History server is up on https://<your-notebook>.notebook.us-west-2.sagemaker.aws/proxy/15050

访问此URL,您将进入History Server Web界面,具体如以下截屏所示:

您还可以在Spark作业中指定其他python与jar文件依赖项。例如,如果要序列化MLeap模型,则可以修改指向PySparkProcessor中run()函数的调用以指定其他依赖项:

spark_processor.run(
    submit_app_py="./code/preprocess-mleap.py",
    submit_py_files=["./spark-mleap/mleap-0.15.0.zip"],
    submit_jars=["./spark-mleap/mleap-spark-assembly.jar"],
    arguments=['s3_input_bucket', bucket,
               's3_input_key_prefix', input_prefix_abalone,
               's3_output_bucket', bucket,
               's3_output_key_prefix', input_preprocessed_prefix_abalone],
    logs=False
)

最后,在对Spark应用程序进行调优或者配置Hive metastore时,我们还需要覆盖Spark配置。大家可以使用我们的Python SDK覆盖掉Spark、Hive以及Hadoop配置。

例如,以下代码将覆盖掉spark.executor.memory 与 spark.executor.cores

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="2.4",
    role=role,
    instance_count=2,
    instance_type="ml.c5.xlarge",
    max_runtime_in_seconds=1200,
)

configuration = [{
  "Classification": "spark-defaults",
  "Properties": {"spark.executor.memory": "2g", "spark.executor.cores": "1"},
}]

spark_processor.run(
    submit_app_py="./code/preprocess.py",
    arguments=['s3_input_bucket', bucket,
               's3_input_key_prefix', input_prefix_abalone,
               's3_output_bucket', bucket,
               's3_output_key_prefix', input_preprocessed_prefix_abalone],
    configuration=configuration,
    logs=False
)

 

要亲自尝试本示例,请导航至Amazon SageMaker notebook实例中的examples选项卡;或者克隆Amazon SageMaker examples目录,并导航至Amazon SageMaker Processing示例文件夹。

此外,您可以使用Amazon SageMaker以及其他AWS服务为用例设置一条端到端的Spark工作流:

总结

Amazon SageMaker广泛使用Docker容器,允许用户构建用于数据准备、训练及推理代码的运行时环境。Amazon SageMaker内置用于Amazon SageMaker Processing的Spark容器则提供一套托管Spark运行时,其中包含运行分布式数据处理工作负载所需要的各类库组件与依赖项。本文中的示例还展示了开发人员与数据科学家应如何使用Amazon SageMaker上的内置Spark容器摆脱Spark基础设施调优、扩展或者管理等繁琐工作,专注于数据准备及预处理等关键实施任务。

 

 

本篇作者

Shreyas Subramanian

AI/ML专家解决方案架构师。他致力于使用AWS平台上的机器学习技术帮助客户解决各类实际业务难题。

Andrew Packer

Amazon AI团队软件工程师。他热衷于为广大客户构建起可扩展的分布式机器学习基础设施。在业余时间,他喜欢演奏吉他和探索自然风光。

Vidhi Kastuar

Amazon SageMaker高级产品经理,致力于帮助用户及企业以更简单易用且更具可扩展性的方式使用机器学习及人工智能。在加入AWS之前,Vidhi曾在Veritas Technologies公司担任产品管理总监。在业余时间,她喜欢素描和绘画,为他人提供职业规划指导以及与亲朋好友共度时光。