亚马逊AWS官方博客
基于开源工具构建 EMR 数据分析平台(三)使用 DolphinScheduler 进行 EMR 任务调度
![]() |
在我们构建的基于开源工具与 EMR 的数据分析平台中,Amazon EMR 是核心的计算引擎,提供了 Apache Spark 执行框架,底层使用 YARN 作为资源管理。通常数据驱动的公司,每天有大量的离线任务需要执行,这些任务需要在不同的时间、以不同的周期执行,任务之间有前后依赖关系,任务也由 sql、shell、java 等不同语言编写,任务分布在不同的部门,有不同的 owner,所以需要一个统一的管理、调度、编排工具,能够按部门、按项目管理任务,使用可视化的方式编写任务调度工作流,在本方案中我们引入了开源的任务调度系统 DolphinScheduler。
DolphinScheduler 介绍
Apache DolphinScheduler 是一个分布式易扩展的可视化 DAG 工作流任务调度开源系统。适用于企业级场景,提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。
Apache DolphinScheduler 旨在解决复杂的大数据任务依赖关系,并为应用程序提供数据和各种 OPS 编排中的关系。 解决数据研发 ETL 依赖错综复杂,无法监控任务健康状态的问题。 DolphinScheduler 以 DAG(Directed Acyclic Graph,DAG)流式方式组装任务,可以及时监控任务的执行状态,支持重试、指定节点恢复失败、暂停、恢复、终止任务等操作。
![]() |
特性
- 简单易用
- 可视化 DAG:用户友好的,通过拖拽定义工作流的,运行时控制工具
- 模块化操作:模块化有助于轻松定制和维护。
- 丰富的使用场景
- 支持多种任务类型:支持 Shell、MR、Spark、SQL 等 10 余种任务类型,支持跨语言,易于扩展
- 丰富的工作流操作:工作流程可以定时、暂停、恢复和停止,便于维护和控制全局和本地参数。
- High Reliability
- 高可靠性: 去中心化设计,确保稳定性。 原生 HA 任务队列支持,提供过载容错能力。 DolphinScheduler 能提供高度稳健的环境。
- High Scalability
- 高扩展性: 支持多租户和在线资源管理。支持每天 10 万个数据任务的稳定运行。
DolphinScheduler 安装配置
DolphinScheduler 提供了多种部署选项,包括单机部署、伪集群部署、集群部署和 K8S 方式部署。官方部署方式参考【 DolphinScheduler 部署指南】,您也可以在 AWS 部署无服务器版本,部署方式参考博客【AWS 部署无服务器 DolphinScheduler】。
DolphinScheduler 任务管理与调度
任务管理
在 DolphinScheduler 中,任务按照用户、项目、工作流来组织,首先可以配置多用户,每个用户创建、上下线、管理自己的任务;然后再按照项目组织工作流,工作流中编排多个任务。可以配置用户对项目、数据源等的管理权限。这样在一个公司内,可以很好的按照部门、项目组、用户管理任务。
![]() |
任务提交
DolphinScheduler 支持丰富的任务类型,支持基于 cron 表达式的定时调度和手动调度,命令类型支持:启动工作流、从当前节点开始执行,支持补历史数据。
其中针对 Spark 任务,有原生的 Spark 组件,但是使用该组件时需要 DolphinScheduler 节点上有 EMR 的环境,这样一旦 EMR 重建就需要重新配置环境,难以维护,而且无法在多个 EMR 集群上调度任务。为了减少 DolphinScheduler 和 EMR 的依赖和耦合,我们设计了不依赖与 EMR 环境的任务提交方式。
预先配置
在向 EMR 提交作业时,会使用 AWS SDK 进行作业提交,所以也需要对应的权限认证。通常步骤如下:
- 创建一个 IAM Policy,指定 EMR Step 相关权限
- 创建一个 IAM User,将步骤 1 里的 Policy 绑定到此 IAM User
- 获取此 IAM User 的 Access Key 与 Secret Key
一个参考的 IAM Policy 可以是:
如果你需要更严格的权限控制,可以将资源 ARN 限制为特定区域或特定集群 ID,例如:
在获取了 AWS Access Key 和 Secret Key 后,编辑 worker server 的 common.properties 配置文件(路径为:worker-server/conf/common.properties),填写以下配置后保存即可:
resource.aws.access.key.id=xxxxx
resource.aws.secret.access.key=xxxxx
resource.aws.region=xxxx
SparkJar 任务
首先在项目管理中选择一个项目,进入工作流定义界面,创建一个新工作流,拖拽“云”分组下的 EMR 组件到右侧工作流编排区域,在弹出的任务编辑页面,填写相关信息,程序类型选择 ADD_JOB_FLOW_STEPS,stepsDefineJson 按照下面 Json 格式填写,其中 JobFlowId 中填写 EMR Cluster ID,前置任务选择依赖的前置任务。
该组件的功能是通过 EMR SDK 向 EMR 集群增加一个 Step 任务,是 EMR 的原生任务提交方式,由于在 Json 中指定了集群名称,并且可以采用参数的方式提交,可以灵活定义和修改集群。
![]() |
完整的 stepsDefineJson 格式:
SparkSql 任务
参考【基于开源工具构建 EMR 数据分析平台(四)使用 Kyuubi 进行 Spark Sql 任务提交】
EMR Serverless 任务
在工作流创建页面,拖拽通用组件下的 SHELL 组件到右侧工作流编排区域,在弹出的任务编辑页面,填写相关信息,在脚本部分填写以下 shell 脚本提交任务到 EMR Serverless。
![]() |
脚本模板:
关于用 shell 提交 EMR Serverless 任务和 emr-serverless-utils-1.0.jar 脚本,请参考博客【如何在 Amazon EMR Serverless 上执行纯 SQL 文件】,以上是提交 Spark Sql 任务的脚本,如果是 Spark Jar 任务,直接将任务 jar 替换 emr-serverless-utils-1.0.jar,其他参数根据实际情况修改即可。
通过参数优化任务配置
在 DolphinScheduler 中,有全局参数、项目参数、本地参数等各种参数可以定义,定义的参数可以在任务配置、脚本、数据源定义中使用,通过参数定义的方式,可以在一个地方定义,多处使用,这样减少配置复杂度、增加灵活性。比如,在 Kyuubi 数据源配置中使用参数,可以统一数据源定义、还能根据任务灵活调整资源用量,在 SparkJar 任务配置中灵活定义目标集群。配置样例如下:
![]() |
DolphinScheduler 定制化开发
虽然通过以上介绍,我们可以看到通过 DolphinScheduler 可以灵活、方便、高效地管理、调度各种 Spark 任务并提交到 EMR,但是在实际使用中,DolphinScheduler3.2 版本还存在以下几个问题:
- 当通过 EMR 原生方式提交任务后,在任务实例中看到的状态是 EMR Step 的状态,不能真正体现 Yarn 上任务状态;
- 无法真正停止任务,在任务实例列表中停止任务,实际上停止的是 EMR Step,而不是 Yarn 上的任务;
- DolphinScheduler 的参数,在 Kyuubi 数据源定义中和 EMR Step Json 中无法生效;
- DolphinScheduler 在 Sql 任务结束后没有关闭连接,Yarn 上还一直保持运行状态,资源不释放;
对此,我们修改了部分 DolphinScheduler 源码,解决了上述问题,核心代码如下:
- 从 Yarn rest API 读取任务的真实状态,解决读 step 状态不准确的问题
- 当 kill 任务时,直接通过 Yarn rest API 进行操作,防止 kill step 之后任务在 Yarn 上仍在运行
- 在 Sql 任务重,解析参数,解决参数无法生效的问题
总结
本文介绍了如何在基于开源工具构建的 EMR 数据分析平台中,通过引入 Apache DolphinScheduler,实现对 EMR 任务的统一管理和调度。DolphinScheduler 不仅提供了可视化的任务编排界面,还支持多种任务提交方式,能够满足不同场景下的 EMR 任务调度需求。在实际应用中,我们采用了三种主要的任务提交方式:
- 通过 EMR SDK 提交 SparkJar 任务
- 使用 Kyuubi 提交 SparkSQL 任务
- 采用 Shell 脚本方式提交 EMR Serverless 任务
通过参数化配置,我们提高了任务管理的灵活性和复用性。虽然在使用过程中发现了一些问题,如任务状态显示、任务停止控制等,但通过源码级别的定制开发,这些问题都得到了解决。
DolphinScheduler 的引入,使我们能够更好地管理复杂的数据处理工作流,提高了任务调度的可靠性和效率,为构建完整的 EMR 数据分析平台提供了重要支撑。