如何解决 AWS Glue 中的“Unable to infer schema”异常?

上次更新时间:2019 年 6 月 12 日

AWS Glue 作业失败,并显示以下异常:

  • “AnalysisException: u'Unable to infer schema for Parquet.It must be specified manually.;'”
  • “AnalysisException: u'Unable to infer schema for ORC.It must be specified manually.;'”

简短描述

如果 AWS Glue 尝试读取的 Parquet 或 Orc 文件未存储在使用 key=val 结构的 Apache Hive 样式的分区路径中,通常会发生此错误。AWS Glue 期望 Amazon Simple Storage Service (Amazon S3) 源文件采用键值对形式。例如,如果 AWS Glue 作业正在处理 s3://s3-bucket/parquet-data/ 中的文件,这些文件应具有以下分区结构:

s3://s3-bucket/parquet-data/year=2018/month=10/day=10/file1.parquet

如果您的 Parquet 或 Orc 文件存储在分层结构中,AWS Glue 作业将失败,并显示“Unable to infer schema”异常。例如:

s3://s3-bucket/parquet-data/year/month/day/file1.parquet

解决方法

使用以下方法之一来解决此错误。

调整数据结构

将文件复制到新的 S3 存储桶,并使用 Hive 样式的分区路径。重新运行作业。

将分区列名称替换为星号

如果调整数据结构不可行,请从 Amazon S3 直接创建 DynamicFrame。在 Amazon S3 路径中,将所有分区列名称替换为星号 (*)。当您使用此解决方案时,AWS Glue 不会在 DynamicFrame 中包含分区列,只包含数据。

例如,假定您的文件存储在一个 S3 存储桶中并采用以下分区结构:

s3://s3-bucket/parquet-data/year/month/day/files.parquet

要处理 s3://s3-bucket/parquet-data/ 路径中的所有文件,请创建 DynamicFrame,如下所示:

dynamic_frame0 = glueContext.create_dynamic_frame_from_options('s3',connection_options={'paths':['s3://s3-bucket/parquet-data/*/*/*'],},format="parquet",transformation_ctx = "dynamic_frame0")

使用映射转换添加分区列

要在 DynamicFrame 中包含分区列,请先创建一个 DataFrame,然后为 Amazon S3 文件路径添加一列。之后,创建 DynamicFrame 并应用映射转换以添加分区列,如下例所示。在使用示例代码之前,请替换 Amazon S3 路径,并使用正确的索引值输入分区列名称。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df= spark.read.parquet("s3://s3-bucket/parquet-data/*/*/*")
modified_df = df.withColumn('partitions_column',input_file_name())
dyf_0 = DynamicFrame.fromDF(modified_df, glueContext, "dyf_0")

def modify_col(x):
     if x['partitions_column']:
        new_columns = x['partitions_column'].split('/')
        x['year'],x['month'],x['day'] = new_columns[4],new_columns[5],new_columns[6]
        del x['partitions_column']
     return x

modified_dyf = Map.apply(dyf_0,f=modify_col)
datasink2 = glueContext.write_dynamic_frame.from_options(frame =modified_dyf , connection_type = "s3", connection_options = {"path": "s3://my-output-bucket/output/","partitionKeys": ["year","month","day"]}, format = "parquet", transformation_ctx = "datasink2")

有关应用映射转换的更多信息,请参阅映射类


这篇文章对您有帮助吗?

您觉得我们哪些地方需要改进?


需要更多帮助?