亚马逊AWS官方博客

AWS DMS在数据迁移任务可用性及对源数据归档支持的实践

背景

AWS Database Migration Service (AWS DMS) 是一项 Web 服务,您可以使用此服务将本地部署的数据库、位于 Amazon Relational Database Service (Amazon RDS) 数据库实例上的数据库或位于 Amazon Elastic Compute Cloud (Amazon EC2) 实例上的数据库中的数据迁移到 AWS 服务上的数据库。这些服务可以包括 Amazon RDS 上的数据库或 Amazon EC2 实例上的数据库。您还可以将数据库从 AWS 服务迁移到本地部署数据库。您可在使用相同数据库引擎的源和目标终端节点之间迁移,例如从 Oracle 数据库到 Oracle 数据库。也可在使用不同数据库引擎的源和目标终端节点之间迁移,例如从 Oracle 数据库到 PostgreSQL 数据库。

本篇文章将介绍在使用AWS DMS进行数据迁移过程中的实践。通过对失败的AWS DMS任务进行邮件告警并无限次尝试自动恢复,提高任务可用性、降低数据延迟风险及人工修复成本。通过对持续迁移的MS SQL Server源表临时删除发布(或禁用CDC),实现用户对数据归档或截断的需求,降低源表的维护成本。

AWS DMS 任务失败自动尝试恢复

AWS DMS 任务失败的原因主要有如下:

1.源数据库类故障(包括数据库服务中断、网络通信故障)

2.数据类错误,如转换错误、截断等

对于因源数据库故障导致的AWS DMS任务失败,可以使用Amazon EventBridge接收AWS DMS任务事件(需升级AWS DMS复制实例到3.4.6或更高版本),并与预先定义的Event Pattern进行匹配。如果匹配,使用Amazon Simple Notification Service(Amazon SNS)提供通知服务(比如邮件告警),同时发送事件到AWS Step Functions状态机,自动尝试对AWS DMS任务恢复。

下面架构图展示了方案架构设计:

  1. Amazon EventBridge从AWS DMS接收事件(任务或复制实例)
  2. 对于匹配预先定义的Event Pattern的事件,使用Amazon SNS通知服务以邮件进行告警
  3. 对于匹配预先定义的Event Pattern的事件,使用AWS Step Functions状态机尝试恢复任务运行

下面将详细介绍EventBridge规则配置、状态机流程和代码示例。

Amazon EventBridge Event Pattern定义如下:

{
  "source": ["aws.dms"],
  "resources": ["your-task-arn"],
  "detail": {
    "type": ["REPLICATION_TASK"],
    "category": ["Failure", "StateChange"],
    "eventType": ["REPLICATION_TASK_FAILED", "REPLICATION_TASK_STOPPED"]
  }
}

AWS Step Functions状态机图如下:

状态机定义如下:

{
  "Comment": "A description of my state machine",
  "StartAt": "检查DMS任务状态",
  "States": {
    "检查DMS任务状态": {
      "Type": "Task",
      "Next": "任务状态判断和处理",
      "Parameters": {
        "Filters": [
          {
            "Name": "replication-task-arn",
            "Values.$": "$.replication-task-arn"
          }
        ]
      },
      "Resource": "arn:aws-cn:states:::aws-sdk:databasemigration:describeReplicationTasks",
      "ResultSelector": {
        "ReplicationTasks.$": "$.ReplicationTasks"
      },
      "ResultPath": "$"
    },
    "任务状态判断和处理": {
      "Type": "Map",
      "ItemsPath": "$.ReplicationTasks",
      "Iterator": {
        "StartAt": "任务是否正在运行",
        "States": {
          "任务是否正在运行": {
            "Type": "Choice",
            "Choices": [
              {
                "Variable": "$.Status",
                "StringEquals": "stopped",
                "Next": "开始启动任务",
                "Comment": "任务处于停止状态"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "failed",
                "Next": "开始启动任务",
                "Comment": "任务处于失败状态"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "running",
                "Next": "Success",
                "Comment": "任务运行中"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "starting",
                "Next": "任务启动中,等待30s",
                "Comment": "任务正在启动"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "stopping",
                "Next": "任务正在停止,等待30s",
                "Comment": "任务正在停止"
              }
            ]
          },
          "任务正在停止,等待30s": {
            "Type": "Wait",
            "Seconds": 30,
            "Next": "完善任务arn"
          },
          "任务启动中,等待30s": {
            "Type": "Wait",
            "Seconds": 30,
            "Next": "完善任务arn"
          },
          "任务正在停止,再次等待30s": {
            "Type": "Wait",
            "Seconds": 30,
            "Next": "完善任务arn"
          },
          "开始启动任务": {
            "Type": "Task",
            "Parameters": {
              "ReplicationTaskArn.$": "$.ReplicationTaskArn",
              "StartReplicationTaskType": "resume-processing"
            },
            "Resource": "arn:aws-cn:states:::aws-sdk:databasemigration:startReplicationTask",
            "Next": "任务启动中,等待30s",
            "ResultSelector": {
              "ReplicationTaskArn.$": "$.ReplicationTask.ReplicationTaskArn",
              "Status.$": "$.ReplicationTask.Status"
            }
          },
          "完善任务arn": {
            "Type": "Task",
            "Resource": "arn:aws-cn:states:::lambda:invoke",
            "Parameters": {
              "Payload.$": "$",
              "FunctionName": "your-lambda-arn"
            },
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException"
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 6,
                "BackoffRate": 2
              }
            ],
            "Next": "检查任务状态",
            "ResultSelector": {
              "ReplicationTaskArn.$": "$.Payload.ReplicationTaskArn",
              "Status.$": "$.Payload.Status"
            }
          },
          "检查任务状态": {
            "Type": "Task",
            "Next": "任务状态",
            "Parameters": {
              "Filters": [
                {
                  "Name": "replication-task-arn",
                  "Values.$": "$.ReplicationTaskArn"
                }
              ]
            },
            "Resource": "arn:aws-cn:states:::aws-sdk:databasemigration:describeReplicationTasks",
            "ResultSelector": {
              "Status.$": "$.ReplicationTasks[0].Status",
              "ReplicationTaskArn.$": "$.ReplicationTasks[0].ReplicationTaskArn"
            }
          },
          "任务状态": {
            "Type": "Choice",
            "Choices": [
              {
                "Variable": "$.Status",
                "StringEquals": "running",
                "Next": "Success",
                "Comment": "正在运行"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "starting",
                "Next": "任务启动中,等待30s",
                "Comment": "任务启动中"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "stopped",
                "Next": "开始启动任务",
                "Comment": "任务处于停止状态"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "failed",
                "Next": "开始启动任务",
                "Comment": "任务处于失败状态"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "stopping",
                "Next": "任务正在停止,再次等待30s",
                "Comment": "任务正在停止"
              }
            ],
            "Default": "Success"
          },
          "Success": {
            "Type": "Succeed"
          }
        }
      },
      "End": true
    }
  }

完善任务arn使用AWS Lambda函数将AWS DMS任务的arn转化为数组,示例代码如下:

import json
def lambda_handler(event, context):
    ReplicationTaskArn = []
    ReplicationTaskArn.append(event["ReplicationTaskArn"])
    return {
        "Status": event["Status"],
        "ReplicationTaskArn": ReplicationTaskArn,
        'statusCode': 200,
        'body': json.dumps('Here is to convert the resource arn to array!')
    }

AWS DMS 任务对源数据归档的支持

用户在使用AWS DMS对源 MS SQL Server数据库表数据进行持续迁移到AWS服务上的数据库(包括 Amazon RDS 上的数据库或 Amazon EC2 实例上的数据库)的过程中,对于有大量数据的源表(比如机器设备生成的数据、日志数据等,每日增量数千万条,大小数十个GB左右),因源MS SQL Server数据库存储容量限制或内部策略,需要每日对前一日表数据进行归档或截断。

对于处于持续迁移中的MS SQL Server源表,在进行归档(重命名、交换分区)或截断操作时存在一些限制。

下面是两种解决方案介绍,操作期间无需对AWS DMS 任务进行任何修改:

  1. 使用insert into…select…配合delete方案实现对MS SQL Server源表数据归档。该方案中因delete操作是按行加锁并执行删除,性能较低,且delete操作产生的事务日志也会被应用到迁移目的地AWS服务上的数据库中。
  2. 在MS SQL Server源表上临时删除发布或禁用CDC(基于不同源表类型选择),对源表交换分区或截断,最后重新添加发布或启用CDC。该方案直接对源表元数据操作,提高了数据归档或截断性能,对迁移目的地AWS服务上的数据库数据无任何影响。通过将所有操作置于事务中,避免操作期间新增数据无法被迁移。

下面是对方案2的详细介绍:

对于有主键的源表,使用MS-Replication进行数据持续迁移,可按如下步骤进行数据分区切换或截断操作,将相关操作置于事务中。如下示例脚本:

1.显示开启事务

BEGIN TRAN

2.删除article

exec sp_droparticle @publication = N'pub-testrepl-repl', @article = N'TestTable', @force_invalidate_snapshot = 1

3.归档或截断数据表

ALTER TABLE dbo.TestTable SWITCH TO dbo.TestTable_BAK;

或者

如果历史数据不再需要,直接截断即可。

TRUNCATE TABLE dbo.TestTable

4.添加article

exec sp_addarticle @publication = N'pub-testrepl-repl', @article = N'TestTable', @source_owner = N'dbo', @source_object = N'TestTable', @type = N'logbased', @description = null, @creation_script = null, @pre_creation_cmd = N'drop', @schema_option = 0x000000000803509F, @identityrangemanagementoption = N'manual', @destination_table = N'TestTable', @destination_owner = N'dbo', @vertical_partition = N'false', @ins_cmd = N'CALL sp_MSins_dboTestTable', @del_cmd = N'CALL sp_MSdel_dboTestTable', @upd_cmd = N'SCALL sp_MSupd_dboTestTable'

5.提交事务

COMMIT TRAN

对于没有主键,有唯一索引的源表,可直接进行交换分区以将数据进行归档,但是不能直接进行截断。

对于没有主键,有唯一索引的源表,需要进行截断时,可先在表上禁用CDC,其次执行截断,最后在表上开启CDC,将相关操作置于事务中。如下示例脚本:

1.显示开启事务

BEGIN TRAN

2.禁用CDC

EXECUTE sys.sp_cdc_disable_table @source_schema = N'dbo',@source_name = N'TestTable',@capture_instance = N'dbo_TestTable';

3.截断数据表

TRUNCATE TABLE dbo.TestTable;

4.启用CDC

EXEC sys.sp_cdc_enable_table @source_schema = N'dbo',@source_name = N'TestTable',@index_name = N'AK_ID',@role_name = NULL,@supports_net_changes = 1;

5.提交事务

COMMIT TRAN

对于没有主键及唯一索引的源表,可直接进行交换分区以将数据进行归档,但是不能直接进行截断。

对于没有主键及唯一索引的源表,需要进行截断时,可先在表上禁用CDC,其次执行截断,最后在表上启用CDC,将相关操作置于事务中。如下示例脚本:

1.显示开启事务

BEGIN TRAN

2.禁用CDC

EXECUTE sys.sp_cdc_disable_table @source_schema = N'dbo',@source_name = N'TestTable',@capture_instance = N'dbo_TestTable';

3.截断数据表

TRUNCATE TABLE dbo.TestTable;

4.启用CDC

EXEC sys.sp_cdc_enable_table @source_schema = N'dbo',@source_name = N'TestTable',@role_name = NULL

5.提交事务

COMMIT TRAN

总结

本篇文章总结了在使用AWS DMS进行数据持续迁移过程中的实践。在提高任务可用性方面,对源数据库类故障导致的DMS任务失败,通过对AWS DMS失败任务邮件告警及无限次尝试自动恢复,降低数据迁移延迟风险和人工修复成本。对于数据类错误,除了配置错误处理任务设置外,仍需用户响应告警邮件并修复错误。同时,对于来自MS SQL Server的大表数据持续迁移,在数据归档或截断的需求方面,提出了解决方案。

本篇作者

Chris Fan

亚马逊云科技专业服务团队大数据顾问。他帮助客户设计和构建现代化数据平台解决方案,提供专家技术咨询和基于亚马逊云科技平台实施服务。在数据平台架构、处理、管道、建模等方面拥有丰富经验。