AWS Glue で発生する「Unable to infer schema」例外を解決するにはどうすればよいですか?

最終更新日: 2019 年 6 月 12 日

My AWS Glue ジョブが次のいずれかの例外で処理に失敗します。

  • 「AnalysisException: u'Unable to infer schema for Parquet.It must be specified manually.;'」
  • 「AnalysisException: u'Unable to infer schema for ORC.It must be specified manually.;'」

簡単な説明

通常このエラーは、AWS Glue が読み込もうとする Parquet や Orc ファイルの格納場所がkey=val 構造を持つ Apache Hive 形式でパーティションされたパスではない場合に発生します。AWS Glue では、Amazon Simple Storage Service (Amazon S3) のソースファイルは、キーと値のペアであることが前提です。例えば、AWS Glue ジョブが s3://s3-bucket/parquet-data/ にあるファイルを処理している場合、このファイルは次のパーティション構造である必要があります。

s3://s3-bucket/parquet-data/year=2018/month=10/day=10/file1.parquet

ご使用になる Parquet あるいは Orc ファイルが階層構造で保存されている場合、AWS Glue ジョブは「Unable to infer schema」例外を発生し失敗します。例:

s3://s3-bucket/parquet-data/year/month/day/file1.parquet

解決方法

このエラーを解決するには、次のいずれかの方法を使用します。

データの再構築

Hive 形式でパーティションされたパスを使い、ファイルを新しい S3 バケットにコピーします。ジョブを再度実行します。

パーティション列の名前をアスタリスクに置き換えます

データの再構築が失敗する場合は、Amazon S3 から DynamicFrame を直接作成します。Amazon S3 パスで、すべてのパーティション列名をアスタリスク (*) に置き換えます。この解決法を使うとき、AWS Glue にはデータのみが含まれ、DynamicFrame のパーティション列は含まれていません。

例えば、ファイルは次のようなパーティション構造を持つ、S3 バケットに保存されていると考えられます。

s3://s3-bucket/parquet-data/year/month/day/files.parquet

s3://s3-bucket/parquet-data/ 内のすべてのファイルを処理するには、次のような DynamicFrame を作成します。

dynamic_frame0 = glueContext.create_dynamic_frame_from_options('s3',connection_options={'paths':['s3://s3-bucket/parquet-data/*/*/*'],},format="parquet",transformation_ctx = "dynamic_frame0")

マッピング変換を使用してパーティション列を追加する

DynamicFrame にパーティション列を含めるには、まず DataFrame を作成してから、Amazon S3 ファイルパスのための列を追加します。その後、次に示す例のように、DynamicFrame を作成してマッピング変換を行い、パーティション列を追加します。サンプルコードは、Amazon S3 パスを置き換え、パーティション列の名前は、正しいインデックスの値に書き換えてから使用してください。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df= spark.read.parquet("s3://s3-bucket/parquet-data/*/*/*")
modified_df = df.withColumn('partitions_column',input_file_name())
dyf_0 = DynamicFrame.fromDF(modified_df, glueContext, "dyf_0")

def modify_col(x):
     if x['partitions_column']:
        new_columns = x['partitions_column'].split('/')
        x['year'],x['month'],x['day'] = new_columns[4],new_columns[5],new_columns[6]
        del x['partitions_column']
     return x

modified_dyf = Map.apply(dyf_0,f=modify_col)
datasink2 = glueContext.write_dynamic_frame.from_options(frame =modified_dyf , connection_type = "s3", connection_options = {"path": "s3://my-output-bucket/output/","partitionKeys": ["year","month","day"]}, format = "parquet", transformation_ctx = "datasink2")

マップ変換の方法については、「マップクラス」で詳細をご参照ください。


この記事はお役に立ちましたか?

改善できることはありますか?


さらにサポートが必要な場合