Comment puis-je résoudre l'erreur « MalformedJson » lors de l'importation de fichiers CSV ou TSV dans DynamoDB à l'aide du modèle Data Pipeline par défaut ?

Date de la dernière mise à jour : 27/08/2020

Lorsque j'essaie d'importer des fichiers CSV ou TSV dans Amazon DynamoDB à l'aide du modèle AWS Data Pipeline par défaut, j'obtiens une erreur « MalformedJson ».

Résolution

Remarque : cette résolution est valide pour Amazon EMR 4.7.0 et les versions ultérieures.

Le modèle Importer les données de sauvegarde DynamoDB à partir du modèle S3 ne fonctionne que si vous exportez d'abord les données à l'aide du modèle Exporter DynamoDB vers S3. Si vous n'avez pas utilisé le modèle Exporter la table DynamoDB vers S3 créez un nouveau pipeline à l'aide d'un objet DynamoDBDataFormat avec un objet HiveActivity. Utilisez le script suivant pour l'objet HiveActivity. Ce script supprime les fichiers jar incompatibles du chemin de classe Hive.

delete jar /usr/lib/hive/lib/hive-contrib.jar ;\ndelete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/open-csv.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/pipeline-serde.jar ;\nINSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};

Voici un exemple de définition de pipeline qui exporte des fichiers CSV d'Amazon Simple Storage Service (Amazon S3) vers DynamoDB. L'exemple de pipeline lance les ressources Amazon EMR dans un sous-réseau privé, qui est plus sécurisé qu'un sous-réseau public. Pour plus d'informations, consultez Configuration d'un cluster Amazon EMR dans un sous-réseau privé. Les fichiers CSV de cet exemple contiennent les données suivantes :

  • AnyCompany1,100
  • AnyCompany2,20
  • AnyCompany3,30

Remarque : la table DynamoDB doit exister avant d'exécuter le pipeline. Veillez à spécifier vos valeurs pour les variables répertoriées dans la section « valeurs ». Pour plus d'informations, voir Ajout de myVariables à la définition de pipeline.

{
  "objects": [
    {
      "name": "DefaultEmrCluster1",
      "id": "EmrClusterId_kvKJa",
      "releaseLabel": "emr-5.23.0",
      "type": "EmrCluster",
      "subnetId": "#{mySubnetId}",
      "emrManagedSlaveSecurityGroupId": "#{myCoreAndTaskSecurityGroup}",
      "emrManagedMasterSecurityGroupId": "#{myMasterSecurityGroup}",
      "serviceAccessSecurityGroupId": "#{myServiceAccessSecurityGroup}",
      "terminateAfter": "24 Hours"
    },
    {
      "dataFormat": {
        "ref": "DynamoDBDataFormatId_YMozb"
      },
      "name": "DefaultDataNode2",
      "id": "DataNodeId_WFWdO",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "DataFormatId_ciZN3"
      },
      "name": "DefaultDataNode1",
      "id": "DataNodeId_OZ8Nz",
      "type": "S3DataNode"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDynamoDBDataFormat1",
      "id": "DynamoDBDataFormatId_YMozb",
      "type": "DynamoDBDataFormat"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDataFormat1",
      "id": "DataFormatId_ciZN3",
      "type": "CSV"
    },
    {
      "output": {
        "ref": "DataNodeId_WFWdO"
      },
      "input": {
        "ref": "DataNodeId_OZ8Nz"
      },
      "stage": "true",
      "maximumRetries": "0",
      "name": "DefaultHiveActivity1",
      "hiveScript": "delete jar /usr/lib/hive/lib/hive-contrib.jar ;\ndelete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/open-csv.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/pipeline-serde.jar ;\nINSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};",
      "id": "HiveActivityId_AwIZ9",
      "runsOn": {
        "ref": "EmrClusterId_kvKJa"
      },
      "type": "HiveActivity"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "pipelineLogUri": "s3://awsdoc-example-bucket/dplogs/",
      "role": "DataPipelineDefaultRole",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    }
  ],
  "parameters": [
    {
      "description": "Input S3 folder",
      "id": "myInputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Destination DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    }
  ],
  "values": {
    "myDDBTableName": "companyid",
    "myInputS3Loc": "s3://awsdoc-example-bucket1/csvddb/",
    "mySubnetId": "subnet_id",
    "myCoreAndTaskSecurityGroup": "core and task security group", 
    "myMasterSecurityGroup": "master security group",
    "myServiceAccessSecurityGroup":  "service access security group"
  }
}

Cet article vous a-t-il été utile ?


Besoin d'aide pour une question technique ou de facturation ?