기본 Data Pipeline 템플릿을 사용하여 CSV 또는 TSV 파일을 DynamoDB로 가져올 때 발생하는 "MalformedJson" 오류를 해결하려면 어떻게 해야 합니까?

최종 업데이트 날짜: 2020년 8월 27일

기본 AWS Data Pipeline 템플릿을 사용하여 CSV 또는 TSV 파일을 Amazon DynamoDB로 가져오려고 하면 "MalformedJson" 오류가 발생합니다.

​해결 방법

참고: 이 해결 방법은 Amazon EMR 4.7.0 이상 릴리스 버전에 적용됩니다.

S3에서 DynamoDB 백업 데이터 가져오기 템플릿은 먼저 S3로 DynamoDB 테이블 내보내기 템플릿을 사용하여 데이터를 내보낸 경우에만 작동합니다. S3로 DynamoDB 테이블 내보내기 템플릿을 사용하지 않은 경우 HiveActivity 객체가 있는 DynamoDBDataFormat 객체를 사용하여 새 파이프라인을 생성합니다. HiveActivity 객체에 다음 스크립트를 사용합니다. 이 스크립트는 Hive 클래스 경로에서 호환되지 않는 jar를 제거합니다.

delete jar /usr/lib/hive/lib/hive-contrib.jar ;\ndelete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/open-csv.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/pipeline-serde.jar ;\nINSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};

다음은 Amazon Simple Storage Service(Amazon S3)에서 DynamoDB로 CSV 파일을 내보내는 파이프라인 정의의 예입니다. 이 파이프라인 예는 Amazon EMR 리소스를 퍼블릭 서브넷보다 더 안전한 프라이빗 서브넷에서 시작합니다. 자세한 내용은 프라이빗 서브넷에서 Amazon EMR 클러스터 구성을 참조하십시오. 이 예의 CSV 파일에는 다음 데이터가 포함되어 있습니다.

  • AnyCompany1,100
  • AnyCompany2,20
  • AnyCompany3,30

참고: 파이프라인을 실행하기 전에 DynamoDB 테이블이 있어야 합니다. "values" 섹션에 나열된 변수의 값을 지정해야 합니다. 자세한 내용은 파이프라인 정의에 myVariables 추가를 참고하십시오.

{
  "objects": [
    {
      "name": "DefaultEmrCluster1",
      "id": "EmrClusterId_kvKJa",
      "releaseLabel": "emr-5.23.0",
      "type": "EmrCluster",
      "subnetId": "#{mySubnetId}",
      "emrManagedSlaveSecurityGroupId": "#{myCoreAndTaskSecurityGroup}",
      "emrManagedMasterSecurityGroupId": "#{myMasterSecurityGroup}",
      "serviceAccessSecurityGroupId": "#{myServiceAccessSecurityGroup}",
      "terminateAfter": "24 Hours"
    },
    {
      "dataFormat": {
        "ref": "DynamoDBDataFormatId_YMozb"
      },
      "name": "DefaultDataNode2",
      "id": "DataNodeId_WFWdO",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "DataFormatId_ciZN3"
      },
      "name": "DefaultDataNode1",
      "id": "DataNodeId_OZ8Nz",
      "type": "S3DataNode"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDynamoDBDataFormat1",
      "id": "DynamoDBDataFormatId_YMozb",
      "type": "DynamoDBDataFormat"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDataFormat1",
      "id": "DataFormatId_ciZN3",
      "type": "CSV"
    },
    {
      "output": {
        "ref": "DataNodeId_WFWdO"
      },
      "input": {
        "ref": "DataNodeId_OZ8Nz"
      },
      "stage": "true",
      "maximumRetries": "0",
      "name": "DefaultHiveActivity1",
      "hiveScript": "delete jar /usr/lib/hive/lib/hive-contrib.jar ;\ndelete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/open-csv.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/pipeline-serde.jar ;\nINSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};",
      "id": "HiveActivityId_AwIZ9",
      "runsOn": {
        "ref": "EmrClusterId_kvKJa"
      },
      "type": "HiveActivity"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "pipelineLogUri": "s3://awsdoc-example-bucket/dplogs/",
      "role": "DataPipelineDefaultRole",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    }
  ],
  "parameters": [
    {
      "description": "Input S3 folder",
      "id": "myInputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Destination DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    }
  ],
  "values": {
    "myDDBTableName": "companyid",
    "myInputS3Loc": "s3://awsdoc-example-bucket1/csvddb/",
    "mySubnetId": "subnet_id",
    "myCoreAndTaskSecurityGroup": "core and task security group", 
    "myMasterSecurityGroup": "master security group",
    "myServiceAccessSecurityGroup":  "service access security group"
  }
}

이 문서가 도움이 되었습니까?


결제 또는 기술 지원이 필요합니까?