Comment puis-je concaténer des fichiers Parquet dans Amazon EMR ?

Date de la dernière mise à jour : 09/07/2020

J'utilise S3DistCp (s3-dist-cp) pour concaténer des fichiers au format Apache Parquet avec les options --groupBy et --targetSize. La tâche s3-dist-cp s'exécute sans erreur, mais les fichiers Parquet générés ne fonctionnent pas. Lorsque j'essaie de lire les fichiers Parquet dans les applications, je reçois un message d'erreur similaire à ce qui suit :

« Expected n values in column chunk at /path/to/concatenated/parquet/file offset m but got x values instead over y pages ending at file offset z »

Comment puis-je concaténer des fichiers Parquet dans Amazon EMR ?

Brève description

S3DistCp ne prend pas en charge la concaténation pour les fichiers Parquet. Utilisez PySpark à la place.

Solution

Vous ne pouvez pas spécifier la taille du fichier cible dans PySpark, mais vous pouvez spécifier le nombre de partitions. Spark enregistre chaque partition dans un fichier de sortie distinct. Pour estimer le nombre de partitions dont vous avez besoin, divisez la taille de l'ensemble de données par la taille du fichier individuel cible.

1.    Créez un cluster Amazon EMR disposant d'Apache Spark.

2.    Spécifiez le nombre de programmes d'exécution dont vous avez besoin. Cela dépend de la capacité du cluster et de la taille de l'ensemble de données. Pour plus d'informations, consultez Bonnes pratiques de gestion de la mémoire pour les applications Apache Spark sur Amazon EMR.

$  pyspark --num-executors number_of_executors

3.    Chargez les fichiers Parquet source dans un DataFrame Spark. Il peut s'agir d'un chemin Amazon Simple Storage Service (Amazon S3) ou d'un chemin HDFS. Par exemple :

df=sqlContext.read.parquet("s3://awsdoc-example-bucket/parquet-data/")

HDFS :

df=sqlContext.read.parquet("hdfs:///tmp/parquet-data/")

4.    Repartitionnez le DataFrame. Dans l'exemple suivant, n correspond au nombre de partitions.

df_output=df.coalesce(n)

5.    Enregistrez le DataFrame dans la destination. Il peut s'agir d'un chemin Amazon S3 ou d'un chemin HDFS. Par exemple :

df_output.write.parquet("URI:s3://awsdoc-example-bucket1/destination/")

HDFS :

df=sqlContext.read.parquet("hdfs:///tmp/destination/")

6.    Vérifiez combien de fichiers se trouvent à présent dans le répertoire de destination :

hadoop fs -ls "URI:s3://awsdoc-example-bucket1/destination/ | wc -l"

Le nombre total de fichiers doit être la valeur n spécifiée à l'étape 4, plus un. Le validateur de sortie Parquet écrit le fichier supplémentaire, appelé _SUCCESS.


Cet article vous a-t-il été utile ?


Besoin d'aide pour une question technique ou de facturation ?