How can I resolve the "MalformedJson" error when importing CSV or TSV files into DynamoDB using the default Data Pipeline template?

Last updated: 2020-08-27

When I try to import CSV or TSV files into Amazon DynamoDB using the default AWS Data Pipeline template, I get a "MalformedJson" error.

Resolution

Note: This resolution is valid for Amazon EMR 4.7.0 and later release versions.

The Import DynamoDB backup data from S3 template works only if you first export the data using the Export DynamoDB table to S3 template. If you didn't use the Export DynamoDB table to S3 template, then create a new pipeline using a DynamoDBDataFormat object with a HiveActivity object. Use the following script for the HiveActivity object. This script removes incompatible jars from the Hive classpath.

delete jar /usr/lib/hive/lib/hive-contrib.jar ;\ndelete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/open-csv.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/pipeline-serde.jar ;\nINSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};

Here's an example of a pipeline definition that exports CSV files from Amazon Simple Storage Service (Amazon S3) to DynamoDB. The example pipeline launches the Amazon EMR resources into a private subnet, which is more secure than a public subnet. For more information, see Configure an Amazon EMR cluster in a private subnet. The CSV files in this example contain the following data:

  • AnyCompany1,100
  • AnyCompany2,20
  • AnyCompany3,30

Note: The DynamoDB table must exist before you run the pipeline. Be sure to specify your values for the variables listed in the "values" section. For more information, see Add myVariables to the pipeline definition.

{
  "objects": [
    {
      "name": "DefaultEmrCluster1",
      "id": "EmrClusterId_kvKJa",
      "releaseLabel": "emr-5.23.0",
      "type": "EmrCluster",
      "subnetId": "#{mySubnetId}",
      "emrManagedSlaveSecurityGroupId": "#{myCoreAndTaskSecurityGroup}",
      "emrManagedMasterSecurityGroupId": "#{myMasterSecurityGroup}",
      "serviceAccessSecurityGroupId": "#{myServiceAccessSecurityGroup}",
      "terminateAfter": "24 Hours"
    },
    {
      "dataFormat": {
        "ref": "DynamoDBDataFormatId_YMozb"
      },
      "name": "DefaultDataNode2",
      "id": "DataNodeId_WFWdO",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "DataFormatId_ciZN3"
      },
      "name": "DefaultDataNode1",
      "id": "DataNodeId_OZ8Nz",
      "type": "S3DataNode"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDynamoDBDataFormat1",
      "id": "DynamoDBDataFormatId_YMozb",
      "type": "DynamoDBDataFormat"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDataFormat1",
      "id": "DataFormatId_ciZN3",
      "type": "CSV"
    },
    {
      "output": {
        "ref": "DataNodeId_WFWdO"
      },
      "input": {
        "ref": "DataNodeId_OZ8Nz"
      },
      "stage": "true",
      "maximumRetries": "0",
      "name": "DefaultHiveActivity1",
      "hiveScript": "delete jar /usr/lib/hive/lib/hive-contrib.jar ;\ndelete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/open-csv.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/pipeline-serde.jar ;\nINSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};",
      "id": "HiveActivityId_AwIZ9",
      "runsOn": {
        "ref": "EmrClusterId_kvKJa"
      },
      "type": "HiveActivity"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "pipelineLogUri": "s3://awsdoc-example-bucket/dplogs/",
      "role": "DataPipelineDefaultRole",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    }
  ],
  "parameters": [
    {
      "description": "Input S3 folder",
      "id": "myInputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Destination DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    }
  ],
  "values": {
    "myDDBTableName": "companyid",
    "myInputS3Loc": "s3://awsdoc-example-bucket1/csvddb/",
    "mySubnetId": "subnet_id",
    "myCoreAndTaskSecurityGroup": "core and task security group", 
    "myMasterSecurityGroup": "master security group",
    "myServiceAccessSecurityGroup":  "service access security group"
  }
}

Did this article help?


Do you need billing or technical support?