如何解决使用默认 Data Pipeline 模板将 CSV 或 TSV 文件导入 DynamoDB 时出现的“MalformedJson”错误?

上次更新时间:2020 年 8 月 27 日

当我尝试使用默认 AWS Data Pipeline 模板将 CSV 或 TSV 文件导入 Amazon DynamoDB 时,出现“MalformedJson”错误。

解决方法

注意:此解决方法适用于 Amazon EMR 4.7.0 及更高版本。

仅当首先使用将 DynamoDB 表导出到 S3 模板导出数据时,才能使用从 S3 导入 DynamoDB 备份数据模板。如果未使用将 DynamoDB 表导出到 S3 模板,则使用包含 HiveActivity 对象的 DynamoDBDataFormat 对象创建新管道。为 HiveActivity 对象使用以下脚本。此脚本将从 Hive 类路径中删除不兼容的 jar。

delete jar /usr/lib/hive/lib/hive-contrib.jar ;\ndelete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/open-csv.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/pipeline-serde.jar ;\nINSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};

以下是将 CSV 文件从 Amazon Simple Storage Service (Amazon S3) 导出到 DynamoDB 的管道定义示例。此示例管道将在私有子网中启动 Amazon EMR 资源,该子网比公有子网更安全。有关更多信息,请参阅在私有子网中配置 Amazon EMR 集群。此示例中的 CSV 文件包含以下数据:

  • AnyCompany1,100
  • AnyCompany2,20
  • AnyCompany3,30

注意:在运行管道之前,DynamoDB 表必须已存在。请务必为“值”部分中列出的变量指定值。有关更多信息,请参阅将 myVariable 添加到管道定义

{
  "objects": [
    {
      "name": "DefaultEmrCluster1",
      "id": "EmrClusterId_kvKJa",
      "releaseLabel": "emr-5.23.0",
      "type": "EmrCluster",
      "subnetId": "#{mySubnetId}",
      "emrManagedSlaveSecurityGroupId": "#{myCoreAndTaskSecurityGroup}",
      "emrManagedMasterSecurityGroupId": "#{myMasterSecurityGroup}",
      "serviceAccessSecurityGroupId": "#{myServiceAccessSecurityGroup}",
      "terminateAfter": "24 Hours"
    },
    {
      "dataFormat": {
        "ref": "DynamoDBDataFormatId_YMozb"
      },
      "name": "DefaultDataNode2",
      "id": "DataNodeId_WFWdO",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "DataFormatId_ciZN3"
      },
      "name": "DefaultDataNode1",
      "id": "DataNodeId_OZ8Nz",
      "type": "S3DataNode"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDynamoDBDataFormat1",
      "id": "DynamoDBDataFormatId_YMozb",
      "type": "DynamoDBDataFormat"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDataFormat1",
      "id": "DataFormatId_ciZN3",
      "type": "CSV"
    },
    {
      "output": {
        "ref": "DataNodeId_WFWdO"
      },
      "input": {
        "ref": "DataNodeId_OZ8Nz"
      },
      "stage": "true",
      "maximumRetries": "0",
      "name": "DefaultHiveActivity1",
      "hiveScript": "delete jar /usr/lib/hive/lib/hive-contrib.jar ;\ndelete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/open-csv.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/pipeline-serde.jar ;\nINSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};",
      "id": "HiveActivityId_AwIZ9",
      "runsOn": {
        "ref": "EmrClusterId_kvKJa"
      },
      "type": "HiveActivity"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "pipelineLogUri": "s3://awsdoc-example-bucket/dplogs/",
      "role": "DataPipelineDefaultRole",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    }
  ],
  "parameters": [
    {
      "description": "Input S3 folder",
      "id": "myInputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Destination DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    }
  ],
  "values": {
    "myDDBTableName": "companyid",
    "myInputS3Loc": "s3://awsdoc-example-bucket1/csvddb/",
    "mySubnetId": "subnet_id",
    "myCoreAndTaskSecurityGroup": "core and task security group", 
    "myMasterSecurityGroup": "master security group",
    "myServiceAccessSecurityGroup":  "service access security group"
  }
}

这篇文章对您有帮助吗?


您是否需要账单或技术支持?