デフォルトの Data Pipeline テンプレートを使用して CSV または TSV ファイルを DynamoDB にインポートするときに発生する「MalFormedJSON」エラーはどのように解決できますか?

最終更新日: 2020 年 8 月 27 日

デフォルトの AWS Data Pipeline テンプレートを使用して CSV または TSV ファイルを Amazon DynamoDB にインポートしようとすると、「MalformedJson」エラーが表示されます。

解決方法

注意: この解決方法は、Amazon EMR 4.7.0 以降のリリースバージョンで有効です。

Import DynamoDB backup data from S3 (S3 から DynamoDB バックアップデータをインポートする) テンプレートは、最初に Export DynamoDB table to S3 (S3 にDynamoDB テーブルをエクスポートする) テンプレートを使用してデータをエクスポートした場合にのみ機能します。Export DynamoDB table to S3 (S3 にDynamoDB テーブルをエクスポートする) テンプレートを使用しなかった場合は、HiveActivity オブジェクトと共に DynamoDBDataFormat オブジェクトを使用して新しいパイプラインを作成します。HiveActivity オブジェクトには以下のスクリプトを使用します。このスクリプトは、Hive クラスパスから互換性のない jar ファイルを削除します。

delete jar /usr/lib/hive/lib/hive-contrib.jar ;\ndelete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/open-csv.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/pipeline-serde.jar ;\nINSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};

これは、Amazon Simple Storage Service (Amazon S3) から DynamoDB に CSV ファイルをエクスポートするパイプライン定義の例です。このパイプライン例は、パブリックサブネットよりもセキュアなプライベートサブネットで Amazon EMR リソースを起動します。詳細については、「プライベートサブネットで Amazon EMR クラスターを設定する」を参照してください。この例の CSV ファイルには、以下のデータが含まれています。

  • AnyCompany1,100
  • AnyCompany2,20
  • AnyCompany3,30

注意: DynamoDB テーブルはパイプラインの実行前から存在している必要があります。「values」セクションにリストされている変数の値を指定するようにしてください。詳細については、「パイプライン定義への myVariables の追加」を参照してください。

{
  "objects": [
    {
      "name": "DefaultEmrCluster1",
      "id": "EmrClusterId_kvKJa",
      "releaseLabel": "emr-5.23.0",
      "type": "EmrCluster",
      "subnetId": "#{mySubnetId}",
      "emrManagedSlaveSecurityGroupId": "#{myCoreAndTaskSecurityGroup}",
      "emrManagedMasterSecurityGroupId": "#{myMasterSecurityGroup}",
      "serviceAccessSecurityGroupId": "#{myServiceAccessSecurityGroup}",
      "terminateAfter": "24 Hours"
    },
    {
      "dataFormat": {
        "ref": "DynamoDBDataFormatId_YMozb"
      },
      "name": "DefaultDataNode2",
      "id": "DataNodeId_WFWdO",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "DataFormatId_ciZN3"
      },
      "name": "DefaultDataNode1",
      "id": "DataNodeId_OZ8Nz",
      "type": "S3DataNode"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDynamoDBDataFormat1",
      "id": "DynamoDBDataFormatId_YMozb",
      "type": "DynamoDBDataFormat"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDataFormat1",
      "id": "DataFormatId_ciZN3",
      "type": "CSV"
    },
    {
      "output": {
        "ref": "DataNodeId_WFWdO"
      },
      "input": {
        "ref": "DataNodeId_OZ8Nz"
      },
      "stage": "true",
      "maximumRetries": "0",
      "name": "DefaultHiveActivity1",
      "hiveScript": "delete jar /usr/lib/hive/lib/hive-contrib.jar ;\ndelete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/open-csv.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/pipeline-serde.jar ;\nINSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};",
      "id": "HiveActivityId_AwIZ9",
      "runsOn": {
        "ref": "EmrClusterId_kvKJa"
      },
      "type": "HiveActivity"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "pipelineLogUri": "s3://awsdoc-example-bucket/dplogs/",
      "role": "DataPipelineDefaultRole",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    }
  ],
  "parameters": [
    {
      "description": "Input S3 folder",
      "id": "myInputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Destination DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    }
  ],
  "values": {
    "myDDBTableName": "companyid",
    "myInputS3Loc": "s3://awsdoc-example-bucket1/csvddb/",
    "mySubnetId": "subnet_id",
    "myCoreAndTaskSecurityGroup": "core and task security group", 
    "myMasterSecurityGroup": "master security group",
    "myServiceAccessSecurityGroup":  "service access security group"
  }
}

この記事はお役に立ちましたか?


請求に関するサポートまたは技術的なサポートが必要ですか?