AWS Big Data Blog

Load data incrementally and optimized Parquet writer with AWS Glue

AWS Glue provides a serverless environment to prepare (extract and transform) and load large datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs. The first post of the series, Best practices to scale Apache Spark jobs and partition data with AWS Glue, discusses best practices to help developers of Apache Spark applications and AWS Glue ETL jobs, big data architects, data engineers, and business analysts automatically scale their data processing jobs running on AWS Glue.

This post shows how to incrementally load data from data sources in an Amazon S3 data lake and databases using JDBC. It also shows how to scale AWS Glue ETL jobs by reading only newly added data using job bookmarks, and processing late-arriving data by resetting the job bookmark to the end of a prior job run. The post also reviews best practices using job bookmarks with complex AWS Glue ETL scripts and workloads.

Finally, the post shows how to use the custom AWS Glue Parquet writer optimized for performance by avoiding extra passes of the data and computing the schema at runtime. The AWS Glue Parquet writer also allows schema evolution in datasets with the addition or deletion of columns.

AWS Glue job bookmarks

AWS Glue’s Spark runtime has a mechanism to store state. This mechanism is used to track data processed by a particular run of an ETL job. The persisted state information is called a job bookmark.

The snapshot above shows a view of the Glue Console with multiple job runs at different time instances of the same ETL job. Job bookmarks are used by AWS Glue jobs to process incremental data since the last job run. A job bookmark is composed of the states of various job elements, such as sources, transformations, and targets. For example, your AWS Glue job might read new partitions in an S3-backed table. AWS Glue tracks the partitions that the job has processed successfully to prevent duplicate processing and writing the same data to the target data store multiple times.

Job bookmark APIs

When using the AWS Glue console or the AWS Glue API to start a job, a job bookmark option is passed as a parameter.

There are three possible options:

  • Enable – This option causes the job to update the bookmark state after each successful run to keep track of processed data. Subsequent job runs on the same data source process only the data added since the last checkpoint.
  • Disable – Job bookmarks are not used, so the job always processes the entire dataset. This is the default option.
  • Pause – Reads the state information and processes incremental data since the last checkpoint, but does not update it. You can use this option so that every subsequent run processes data from the same point in time.

In all cases, you are responsible for managing the output from previous job runs. For more information, see the first post in this series, Best practices to scale Apache Spark jobs and partition data with AWS Glue. For details about the parameters passed to a job, and specifically for a job bookmark, see Special Parameters Used by AWS Glue.

The following code example shows how to use job bookmarks in an AWS Glue ETL job that reads from an AWS Glue table backed by an Amazon S3 location. The job receives new files from a Kinesis Firehose event stream in JSON format, renames two columns, converts the data, and writes it out to Amazon Redshift. transformation_ctx is the identifier for the job bookmark associated with this data source. For proper operation, you need job.init and job.commit to initialize and persist the bookmark state.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "firehose_s3_db",
                table_name = "firehose_s3_raw_table",
                transformation_ctx = "datasource0")
applymapping = ApplyMapping.apply(frame = datasource0, 
                mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")],
                transformation_ctx = "applymapping1")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping,
                catalog_connection = "redshift",
                connection_options = {"dbtable": "name", "database": "kinesis_db"},
                redshift_tmp_dir = "s3://redshift_tmp_dir_path")

job.commit()

When using the APIs or CLI to start a job run, you need to add the following arguments to enable the job bookmark:

Job arguments:

--job-bookmark-option, job-bookmark-enable
--JOB_NAME, glue-job-incremental
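
The same arguments can be passed programmatically. The following is a minimal sketch, assuming glue_client is a boto3 AWS Glue client (for example, boto3.client("glue")) that the caller creates; the helper names bookmark_arguments and start_run are hypothetical and only illustrate the mapping from the three bookmark options to the --job-bookmark-option argument.

```python
# Values accepted by the --job-bookmark-option job argument.
BOOKMARK_OPTIONS = {
    "enable": "job-bookmark-enable",
    "disable": "job-bookmark-disable",
    "pause": "job-bookmark-pause",
}


def bookmark_arguments(mode):
    """Return the Arguments mapping for a job run with the given bookmark mode."""
    if mode not in BOOKMARK_OPTIONS:
        raise ValueError(f"unknown bookmark mode: {mode!r}")
    return {"--job-bookmark-option": BOOKMARK_OPTIONS[mode]}


def start_run(glue_client, job_name, mode="enable"):
    """Start a job run and return its run ID.

    glue_client is assumed to be a boto3 Glue client. It is passed in so
    that this sketch does not depend on boto3 or on AWS credentials.
    """
    response = glue_client.start_job_run(
        JobName=job_name,
        Arguments=bookmark_arguments(mode),
    )
    return response["JobRunId"]
```

For example, start_run(glue_client, "glue-job-incremental") starts a run with bookmarks enabled, and passing mode="pause" reads the bookmark state without updating it.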

For S3 input sources, AWS Glue job bookmarks check the last modified time of the objects to determine which objects to reprocess. If new files have arrived from Kinesis Firehose, or existing files have changed, since your last job run, those files are processed when the job runs again, whether it is started by a periodic AWS Glue job trigger or an S3 trigger notification.
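
To make the last modified time rule concrete, here is a simplified model of the selection logic. It is not the AWS Glue implementation; the function select_unprocessed, the object keys, and the timestamps are all made up for illustration.

```python
from datetime import datetime, timezone


def select_unprocessed(objects, bookmark_time):
    """Return the keys of objects modified after the bookmark timestamp.

    objects maps an S3 key to its last modified datetime. This is a
    simplified model for illustration, not the AWS Glue implementation.
    """
    return sorted(key for key, modified in objects.items() if modified > bookmark_time)


# Hypothetical bookmark state: the time captured by the last successful run.
last_run = datetime(2019, 5, 31, 10, 0, tzinfo=timezone.utc)

# Hypothetical objects and their last modified times.
objects = {
    "raw/part-0001.json": datetime(2019, 5, 31, 9, 30, tzinfo=timezone.utc),   # unchanged since the last run
    "raw/part-0002.json": datetime(2019, 5, 31, 10, 15, tzinfo=timezone.utc),  # rewritten after the last run
    "raw/part-0003.json": datetime(2019, 5, 31, 10, 20, tzinfo=timezone.utc),  # arrived after the last run
}

to_process = select_unprocessed(objects, last_run)
```

In this model, the rewritten object is selected again even though its key has not changed, which is why modifying already-processed input files leads to reprocessing.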

If you intend to reprocess all the data using the same job, reset the job bookmark. To reset the job bookmark state, use the AWS Glue console, the ResetJobBookmark Action (Python: reset_job_bookmark) API operation, or the AWS CLI. For example, enter the following command using the AWS CLI:

aws glue reset-job-bookmark --job-name my-job-name

You can also use the ResetJobBookmark API to rewind the bookmark to a specific point for scheduled job runs by passing in a job run ID. This resets the job bookmark to the state it had after that job run completed. This functionality is similar to time travel; for example, you can reprocess input data from a time in the past and use a different set of transformations in your ETL script or in downstream jobs orchestrated with AWS Glue workflows in the ETL pipeline. From the AWS Glue console, you can use the Rewind job bookmark option to reset the job bookmark state to the commit of a previous job run.
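
Both forms of the reset can be sketched in Python. This assumes glue_client is a boto3 AWS Glue client created by the caller; the wrapper name rewind_bookmark and the run ID in the usage note are hypothetical.

```python
def rewind_bookmark(glue_client, job_name, run_id=None):
    """Reset a job bookmark, optionally to the state after a given job run.

    glue_client is assumed to be a boto3 Glue client. Without run_id the
    bookmark is fully reset, so the next run reprocesses all of the data.
    """
    request = {"JobName": job_name}
    if run_id is not None:
        # Rewind to the bookmark state as of the end of this job run.
        request["RunId"] = run_id
    return glue_client.reset_job_bookmark(**request)
```

Calling rewind_bookmark(glue_client, "my-job-name") corresponds to the CLI command shown earlier, while rewind_bookmark(glue_client, "my-job-name", run_id="jr_example") rewinds to a prior run, where jr_example stands in for a real job run ID.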

AWS Glue keeps track of bookmarks for each job. If you delete a job, you also delete the job bookmark. Job bookmarks support popular S3-based storage formats, including JSON, CSV, Apache Avro, and XML, as well as JDBC sources. Starting with AWS Glue version 1.0, columnar storage formats such as Apache Parquet and ORC are also supported.

Best practices 1: Development with job bookmarks

In some cases, you might enable AWS Glue job bookmarks but find that your AWS Glue job reprocesses data that it already processed in an earlier run. This can happen for the following reasons:

  • Missing job commit – The job.commit() statement at the end of your AWS Glue ETL script updates the state of the job bookmark. If you don’t include it, the job reprocesses both the previously processed and new files. Make sure that the job commit statement is executed by all possible code paths in your user script leading to job completion.
  • Missing transformation context – Transformation context is an optional parameter in the GlueContext class. However, job bookmarks need it to function correctly. Confirm that you include the transformation context parameter when creating the DynamicFrame. See the following code example:
    sample_dynF=glueContext.create_dynamic_frame_from_catalog(database, 
                table_name,
                transformation_ctx="sample_dynF") 
  • JDBC sources – When you access relational databases using a JDBC connection, job bookmarks require source tables to have either one or more primary key columns or one or more columns with incrementing values, which must be specified in the source options. Job bookmarks can capture only newly added rows. This behavior does not apply to source tables stored on S3.
  • Last modified time – To identify which files stored on S3 to process, job bookmarks check the last modified time of the objects, not the file names. If your input objects changed since the last time the job ran, then they are reprocessed when the job runs again.
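
For the JDBC case in the list above, the bookmark columns are specified in the source options. The following sketch assumes the jobBookmarkKeys and jobBookmarkKeysSortOrder options passed through additional_options; the helper names and the database, table, and column names in the usage note are hypothetical.

```python
def jdbc_bookmark_options(key_columns, sort_order="asc"):
    """Build the source options that tell job bookmarks which columns to track."""
    if sort_order not in ("asc", "desc"):
        raise ValueError(f"sort_order must be 'asc' or 'desc', got {sort_order!r}")
    return {
        "jobBookmarkKeys": list(key_columns),
        "jobBookmarkKeysSortOrder": sort_order,
    }


def read_new_rows(glue_context, database, table_name, key_columns):
    """Read a JDBC-backed catalog table with bookmark keys and a transformation context.

    glue_context is assumed to be an awsglue GlueContext created inside a
    Glue job between job.init() and job.commit().
    """
    return glue_context.create_dynamic_frame.from_catalog(
        database=database,
        table_name=table_name,
        additional_options=jdbc_bookmark_options(key_columns),
        # The bookmark state for this source is stored under this identifier.
        transformation_ctx=f"{table_name}_source",
    )
```

Inside a job, read_new_rows(glueContext, "sales_db", "orders", ["order_id"]) would return only rows whose order_id is beyond the value recorded by the previous run, provided that order_id increases monotonically.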

Best practices 2: Monitoring job bookmarks

There are three ways to inspect the behavior of job bookmarks for any job run:

  • File list stored in tmp directory – All AWS Glue ETL jobs running Apache Spark and using DynamicFrames to read data output a manifest file containing a list of processed files per path. The manifest file is stored in the temporary location specified with the job. The path of the file is <temporary location>/partitionlisting/<job name>/<run id>/<source transformation_ctx>.input-files.json. This file captures the list of files read for the corresponding data source regardless of whether job bookmarks are enabled.
  • Job metrics – You can use the AWS Glue job metrics to inspect the S3 read and write operations and track the number of bytes read by the job using bookmarks. You can also track the data a job reads across its multiple runs in the AWS Glue console. For more information, see Monitoring the Progress of Multiple Jobs.
  • Glue job logs – An AWS Glue job also emits logs in the Spark driver log stream related to processing and skipping of partitions in S3. The logs are stored in Amazon CloudWatch.
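
When checking the manifest file for a particular run, it can help to build its location from the path template given in the list above. The helper below is a small sketch; the function name and the bucket, job name, and run ID in the usage note are hypothetical.

```python
def manifest_path(temp_location, job_name, run_id, transformation_ctx):
    """Build the manifest location from the path template for a job run.

    temp_location is the temporary location configured for the job.
    """
    base = temp_location.rstrip("/")  # avoid a double slash in the result
    return f"{base}/partitionlisting/{job_name}/{run_id}/{transformation_ctx}.input-files.json"
```

For example, manifest_path("s3://my-temp-bucket/tmp/", "glue-job-incremental", "jr_123", "datasource0") gives the location of the file list for the datasource0 source of that run.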

Skipping partitions

A job skips a partition when it is empty or when the creation timestamp of that particular partition in the AWS Glue Data Catalog is older than the timestamp of the last job run as captured by the job bookmark. The following example log message indicates the skipped partition:

19/05/21 14:49:22 WARN HadoopDataSource: Skipping Partition
{"year": "2019", "month": "03", "day": "26", "hour": "13"}
has no new files detected 
@ s3://input-s3-prefix/Year=2019/Month=03/Day=26/Hour=13/ 
or path does not exist

Processing partitions

When a job finds a new S3 partition created after the last job run, or an existing partition that has new files to process, it generates a log message. Log messages also indicate the percentage of the total number of files in the partition that the initial and final job bookmark filters select for processing in the current job run. The following examples illustrate the job bookmark filtering logic.

If the partition is new (created after the most recent job run based on the partition creation time), then the job processes all of the files in the partition. The partition creation time is 1559235148000, which is after the last job run. See the following example log message:

19/05/31 10:39:55 INFO PartitionFilesListerUsingBookmark:
Found new partition DynamicFramePartition(com.amazonaws.services.glue.DynamicRecord@35309577,
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000) 
with 47 files

An existing partition triggers the first bookmark filter. This filter selects files with modification timestamps since the last job run. In the following example log message, 15 out of 47 files in the partition are new and should be processed:

19/05/31 10:40:31 INFO PartitionFilesListerUsingBookmark:
After initial job bookmarks filter, 
processing 31.91% of 47 files 
in partition DynamicFramePartition(com.amazonaws.services.glue.DynamicRecord@aa39e364,
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000)

The final bookmark filter performs additional filtering to avoid race conditions related to S3’s eventual consistency. If a significantly large number of files arrive with the same modification time, this filter may exclude them from processing. In the following example log message, the filter processed all 15 files captured by the initial bookmark filter:

19/05/31 10:50:31 INFO PartitionFilesListerUsingBookmark:
After final job bookmarks filter, processing 100.00% of 15 files 
in partition DynamicFramePartition(com.amazonaws.services.glue.DynamicRecord@35309577,
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000)
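
The percentages in these messages can be converted back into file counts when scanning the driver logs. The following sketch assumes the message wording matches the examples in this post; the pattern and the function name files_selected are illustrative and not part of AWS Glue.

```python
import re

# Matches the filter messages shown above; \s* tolerates wrapped lines.
FILTER_PATTERN = re.compile(
    r"After (initial|final) job bookmarks filter,\s*"
    r"processing (\d+(?:\.\d+)?)% of (\d+) files"
)


def files_selected(log_message):
    """Return (stage, number of files kept by the filter), or None if no match."""
    match = FILTER_PATTERN.search(log_message)
    if match is None:
        return None
    stage = match.group(1)
    percent = float(match.group(2))
    total = int(match.group(3))
    # The log prints a rounded percentage, so round to the nearest file.
    return stage, round(percent / 100 * total)


initial = files_selected("After initial job bookmarks filter, \nprocessing 31.91% of 47 files ")
final = files_selected("After final job bookmarks filter, processing 100.00% of 15 files ")
```

Applied to the two example messages, the initial filter keeps 15 of the 47 files in the partition, and the final filter keeps all 15 of those.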

Optimized Apache Parquet writer

AWS Glue offers an optimized Apache Parquet writer when using DynamicFrames to improve performance. Apache Parquet format is generally faster for reads than writes because of its columnar storage layout and a pre-computed schema that is written with the data into the files. AWS Glue’s Parquet writer offers fast write performance and flexibility to handle evolving datasets. Unlike the default Apache Spark Parquet writer, it does not require a pre-computed schema or a schema that is inferred by performing an extra scan of the input dataset.

You can enable the AWS Glue Parquet writer by setting the format parameter of the write_dynamic_frame.from_options function to glueparquet. As data is streamed through an AWS Glue job for writing to S3, the optimized writer computes and merges the schema dynamically at runtime, which results in faster job runtimes. The AWS Glue Parquet writer also enables schema evolution by supporting the addition and deletion of columns.

You can tune the AWS Glue Parquet writer further by setting the format_options parameters. See the following code example:

# Row group size (128 MB) and page size (1 MB), both in bytes
block_size = 128*1024*1024
page_size = 1024*1024
glueContext.write_dynamic_frame.from_options(frame = dyFrame,
                connection_type = "s3",
                connection_options = {"path": output_dir},
                format = "glueparquet",
                format_options = {"compression": "snappy",
                                  "blockSize": block_size,
                                  "pageSize": page_size})

The default values for format_options are the following:

  • compression is “snappy”
  • blockSize is 128 MB
  • pageSize is 1 MB

The blockSize specifies the size of a row group in a Parquet file that is buffered in memory. The pageSize specifies the size of the smallest unit in a Parquet file that must be read fully to access a single record.
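
Because blockSize and pageSize are given in bytes, a small helper can keep the tuning values readable. This is only a convenience sketch; the function name and its megabyte parameters are hypothetical, and the defaults mirror the values listed above.

```python
MB = 1024 * 1024


def glueparquet_format_options(compression="snappy", block_size_mb=128, page_size_mb=1):
    """Build a format_options mapping with sizes converted from MB to bytes."""
    return {
        "compression": compression,
        "blockSize": block_size_mb * MB,  # row group size buffered in memory
        "pageSize": page_size_mb * MB,    # smallest unit read to access a record
    }


default_options = glueparquet_format_options()
```

The resulting mapping can be passed as the format_options argument of write_dynamic_frame.from_options, for example glueparquet_format_options(block_size_mb=256) to buffer larger row groups.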

Conclusion

This post discussed how AWS Glue job bookmarks help incrementally process data collected from S3 and relational databases. You also learned how using job bookmarks can make backfilling historical data simple. Interacting with job bookmarks is easy; you can enable, disable, pause, and rewind them to a prior point in time. You can better tune your jobs and build checks to make sure all of the data is processed correctly by monitoring the progress and state of job bookmarks.

You can also use the AWS Glue Parquet writer to optimize the performance of writing Apache Parquet files in your data lake. The optimized writer enables schema evolution for Parquet files so you can manage changes to your data automatically.

We hope you try out these features to load and write your data in your Apache Spark applications on AWS Glue.

The third post in this series discusses how AWS Glue’s automatic code generation capability enables you to process and transform complex datasets with ease. The post also shows how to execute SQL queries on your datasets directly from your AWS Glue ETL script, and how to schedule and orchestrate data pipelines with AWS Glue workflows.

About the Authors

Mohit Saxena is a technical lead manager at AWS Glue. His passion is building scalable distributed systems for efficiently managing data on the cloud. He also enjoys watching movies and reading about the latest technology.

Bijay Bisht is a senior software development engineer at AWS.