My AWS Glue job fails with lost nodes while migrating a large data set from Amazon RDS to Amazon S3

Last updated: 2019-06-12

I'm migrating a large dataset from Amazon Relational Database Service (Amazon RDS) or an on-premises JDBC database to Amazon Simple Storage Service (Amazon S3) using AWS Glue. My ETL job runs for a long time. Then it fails with lost nodes.

Short Description

By default, AWS Glue reads the entire dataset over a single JDBC connection. If you're migrating a large JDBC table, the ETL job might run for a long time without any sign of progress on the AWS Glue side. The job might eventually fail because of disk space issues (lost nodes). To resolve this issue, read the JDBC table in parallel. If the job still fails with lost nodes, use a SQL expression as a pushdown predicate.

Resolution

Read the JDBC table in parallel

If the table doesn't have any numeric columns (INT or BIGINT), use the hashfield option to partition the data. Set hashfield to the name of a column in the JDBC table. For best results, choose a column with an even distribution of values.
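Here's an example of how you might set hashfield while creating a DynamicFrame from the AWS Glue catalog. This is a minimal sketch: the database, table, and column names are placeholders, and it assumes the column you choose has evenly distributed values.

# Minimal sketch (placeholder names): partition a non-numeric column with hashfield.
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="sampledb",
    table_name="test_table",
    additional_options={"hashfield": "customer_id", "hashpartitions": "10"},
    transformation_ctx="datasource0"
)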

If the table has numeric columns, set the hashpartitions and hashexpression options in the table definition or while creating the DynamicFrame:

  • hashpartitions: defines the number of tasks that AWS Glue creates to read the data
  • hashexpression: divides rows evenly among tasks

Here's an example of how to set hashpartitions and hashexpression while creating a DynamicFrame using a JDBC connection:

connection_option= {"url": "jdbc:mysql://mysql–instance1.123456789012.us-east-1.rds.amazonaws.com:3306/database", "user": "test", "password": "password","dbtable": "test_table","hashexpression":"column_name","hashpartitions":"10"}

datasource0 = glueContext.create_dynamic_frame.from_options('mysql',connection_options=connection_option,transformation_ctx = "datasource0")

Here's an example of how to set hashpartitions and hashexpression while creating a DynamicFrame from the AWS Glue catalog:

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="sampledb",
    table_name="test_table",
    additional_options={"hashexpression": "column_name", "hashpartitions": "10"},
    transformation_ctx="datasource0"
)

Note: Setting larger values for hashpartitions can reduce your table's performance. That's because each task reads the entire table and returns a set of rows to the executor.

For more information, see Reading from JDBC Tables in Parallel.

Use a SQL expression as a pushdown predicate

If the table contains billions of records and tebibytes of data, the job might take a long time to complete or fail with lost nodes, even after you set hashpartitions and hashexpression. To resolve these issues, use a SQL expression similar to the following with the hashexpression option:

column_name > 1000 AND column_name < 2000 AND column_name

The SQL expression acts as a pushdown predicate: it forces the job to read one set of rows per job run, rather than reading all of the data at once. Here's what the full statement looks like:

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="sampledb",
    table_name="test_table",
    additional_options={"hashexpression": "column_name > 1000 AND column_name < 2000 AND column_name", "hashpartitions": "10"},
    transformation_ctx="datasource0"
)

Note: Be sure to disable job bookmarks for initial job runs with this configuration. When you run a job with a job bookmark, AWS Glue records the maximum value of the column. When you run the job again, AWS Glue processes only the rows that have values greater than the previous bookmark value. You can enable job bookmarks during the last job run, if needed.
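To read a different set of rows on each job run, you could pass the range boundaries as job arguments instead of hardcoding them. Here's a minimal sketch that assumes hypothetical range_start and range_end job parameters; the database, table, and column names are placeholders.

import sys
from awsglue.utils import getResolvedOptions

# Read the hypothetical range boundaries passed to this job run,
# for example: --range_start 1000 --range_end 2000
args = getResolvedOptions(sys.argv, ["range_start", "range_end"])

# Build the pushdown predicate for this run's slice of rows.
predicate = "column_name > {0} AND column_name < {1} AND column_name".format(
    args["range_start"], args["range_end"]
)

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="sampledb",
    table_name="test_table",
    additional_options={"hashexpression": predicate, "hashpartitions": "10"},
    transformation_ctx="datasource0"
)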

