AWS Big Data Blog

Building an AWS Glue ETL pipeline locally without an AWS account

If you’re new to AWS Glue and looking to understand its transformation capabilities without incurring an added expense, or if you’re simply wondering if AWS Glue ETL is the right tool for your use case and want a holistic view of AWS Glue ETL functions, then please continue reading. In this post, we walk you through several AWS Glue ETL functions with supporting examples, using a local PySpark shell in a containerized environment with no AWS artifact dependency. If you’re already familiar with AWS Glue and Apache Spark, you can use this solution as a quick cheat sheet for AWS Glue PySpark validations.

You don’t need an AWS account to follow along with this walkthrough. We use small example datasets and walk through several AWS Glue ETL PySpark functions: ApplyMapping, Filter, Map, SelectFields, Join, DropFields, Relationalize, SelectFromCollection, RenameField, ResolveChoice, Unbox, Unnest, DropNullFields, SplitFields, SplitRows, Spigot, and Write Dynamic Frame.

This post introduces the transformation capabilities of AWS Glue and offers insight into possible uses of the supported functions. The goal is to get you up and running with AWS Glue ETL functions in the shortest possible time, at no cost and without any dependency on an AWS environment.

Prerequisites

To follow along, you should have the following resources:

  • Basic programming experience
  • Basic Python and Spark knowledge (not required but good to have)
  • A desktop or workstation with Docker installed and running

If you prefer to set up the environment locally outside of a Docker container, you can follow the instructions provided in the GitHub repo, which hosts libraries used in AWS Glue. These libraries extend Apache Spark with additional data types and operations for ETL workflows.

Setting up resources

For this post, we use the amazon/aws-glue-libs:glue_libs_1.0.0_image_01 image from Docker Hub. This image has only been tested for the AWS Glue 1.0 Spark shell (PySpark). It also supports Jupyter and Zeppelin notebooks and a CLI interpreter; for this post, we use the CLI interpreter. For more information on the container, see Developing AWS Glue ETL jobs locally using a container.

To pull the relevant image from the Docker repository, enter the following command in a terminal prompt:

docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

To test on the command prompt, enter the following code:

docker run -itd --name glue_without_notebook amazon/aws-glue-libs:glue_libs_1.0.0_image_01
docker exec -it glue_without_notebook bash
/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/pyspark

To test on Jupyter notebooks, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter \
amazon/aws-glue-libs:glue_libs_1.0.0_image_01 \
/home/jupyter/jupyter_start.sh

Browse to ‘localhost:8888’ in a browser to open Jupyter notebooks.

Importing GlueContext

To get started, enter the following import statements in the PySpark shell. We import GlueContext, which wraps the Spark SQLContext, thereby providing mechanisms to interact with Apache Spark:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import *
from pyspark.sql import Row
glueContext = GlueContext(SparkContext.getOrCreate())

Dataset 1

We first generate a Spark DataFrame consisting of dummy data of an order list for a fictional company. We process the data using AWS Glue PySpark functions.

Enter the following code into the shell:

order_list = [
               ['1005', '623', 'YES', '1418901234', '75091'],\
               ['1006', '547', 'NO', '1418901256', '75034'],\
               ['1007', '823', 'YES', '1418901300', '75023'],\
               ['1008', '912', 'NO', '1418901400', '82091'],\
               ['1009', '321', 'YES', '1418902000', '90093']\
             ]

# Define schema for the order_list
order_schema = StructType([  
                      StructField("order_id", StringType()),
                      StructField("customer_id", StringType()),
                      StructField("essential_item", StringType()),
                      StructField("timestamp", StringType()),
                      StructField("zipcode", StringType())
                    ])

# Create a Spark Dataframe from the python list and the schema
df_orders = spark.createDataFrame(order_list, schema = order_schema)

The following .show() command allows us to view the DataFrame in the shell:

df_orders.show()

# Output
+--------+-----------+--------------+----------+-------+
|order_id|customer_id|essential_item| timestamp|zipcode|
+--------+-----------+--------------+----------+-------+
|    1005|        623|           YES|1418901234|  75091|
|    1006|        547|            NO|1418901256|  75034|
|    1007|        823|           YES|1418901300|  75023|
|    1008|        912|            NO|1418901400|  82091|
|    1009|        321|           YES|1418902000|  90093|
+--------+-----------+--------------+----------+-------+

DynamicFrame

A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially. Instead, AWS Glue computes a schema on-the-fly when required. We convert the df_orders DataFrame into a DynamicFrame.

Enter the following code in the shell:

dyf_orders = DynamicFrame.fromDF(df_orders, glueContext, "dyf") 
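
To make "self-describing" more concrete, here is a minimal plain-Python sketch. It does not use AWS Glue, and infer_schema is a hypothetical helper rather than a Glue API. It derives a schema by scanning the records themselves instead of declaring one up front. A field that appears with more than one type is reported with every type observed, which is roughly the kind of ambiguity that ResolveChoice deals with later in this post:

```python
# Illustration only: infer a schema from self-describing records (plain dicts).
def infer_schema(records):
    schema = {}
    for rec in records:
        for field, value in rec.items():
            # Remember every type name seen for this field
            schema.setdefault(field, set()).add(type(value).__name__)
    # Sort the type names so the result is deterministic
    return {field: sorted(types) for field, types in schema.items()}

records = [
    {"order_id": "1005", "zipcode": "75091"},
    {"order_id": "1006", "zipcode": 75034},  # same field, different type
]
schema = infer_schema(records)
print(schema)
```

In this sketch, zipcode ends up with two candidate types because no schema was fixed in advance.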

Now that we have our DynamicFrame, we can start working with the dataset using AWS Glue transform functions.

ApplyMapping

The columns in our data might be in different formats, and you may want to change their names. ApplyMapping lets you rename columns and cast their types collectively in a single call. For our dataset, we change some of the columns from String to Long to save storage space later. We also shorten the column name zipcode to zip. See the following code:

# Input 
dyf_applyMapping = ApplyMapping.apply( frame = dyf_orders, mappings = [ 
  ("order_id","String","order_id","Long"), 
  ("customer_id","String","customer_id","Long"),
  ("essential_item","String","essential_item","String"),
  ("timestamp","String","timestamp","Long"),
  ("zipcode","String","zip","Long")
])

dyf_applyMapping.printSchema()

# Output
root
|-- order_id: long
|-- customer_id: long
|-- essential_item: string
|-- timestamp: long
|-- zip: long
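
As a rough mental model of what each mapping tuple means, the following plain-Python sketch applies the same (source name, source type, target name, target type) tuples to a single record. It is an illustration under simplifying assumptions, not how AWS Glue implements ApplyMapping: apply_mapping and CASTS are hypothetical names, and only the two types used in this post are handled:

```python
# Illustration only: emulate ApplyMapping-style tuples on one record.
CASTS = {"long": int, "string": str}

def apply_mapping(record, mappings):
    out = {}
    for source, _source_type, target, target_type in mappings:
        # Read the source field, cast it, and store it under the target name
        out[target] = CASTS[target_type.lower()](record[source])
    return out

mappings = [
    ("order_id", "String", "order_id", "Long"),
    ("essential_item", "String", "essential_item", "String"),
    ("zipcode", "String", "zip", "Long"),
]
record = {"order_id": "1005", "essential_item": "YES", "zipcode": "75091"}
mapped = apply_mapping(record, mappings)
print(mapped)
```

Fields that are not listed in the mappings are absent from the sketch's output, because the result is built only from the tuples.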

Filter

We now want to prioritize our order delivery for essential items. We can achieve that using the Filter function:

# Input 
dyf_filter = Filter.apply(frame = dyf_applyMapping, f = lambda x: x["essential_item"] == 'YES')

dyf_filter.toDF().show()

# Output 
+--------------+-----------+-----+----------+--------+
|essential_item|customer_id|  zip| timestamp|order_id|
+--------------+-----------+-----+----------+--------+
|           YES|        623|75091|1418901234|    1005|
|           YES|        823|75023|1418901300|    1007|
|           YES|        321|90093|1418902000|    1009|
+--------------+-----------+-----+----------+--------+

Map

Map allows us to apply a transformation to each record of a DynamicFrame. For our case, we want to target a certain zip code for next day air shipping. We implement a simple “next_day_air” function and pass it to Map:

# Input 

# This function takes a DynamicFrame record and checks whether its zip code
# is 75034. If so, it adds another column, "next_day_air", with the value True.

def next_day_air(rec):
  if rec["zip"] == 75034:
    rec["next_day_air"] = True
  return rec

mapped_dyF =  Map.apply(frame = dyf_applyMapping, f = next_day_air)

mapped_dyF.toDF().show()

# Output
+--------------+-----------+-----+----------+--------+------------+
|essential_item|customer_id|  zip| timestamp|order_id|next_day_air|
+--------------+-----------+-----+----------+--------+------------+
|           YES|        623|75091|1418901234|    1005|        null|
|            NO|        547|75034|1418901256|    1006|        true|
|           YES|        823|75023|1418901300|    1007|        null|
|            NO|        912|82091|1418901400|    1008|        null|
|           YES|        321|90093|1418902000|    1009|        null|
+--------------+-----------+-----+----------+--------+------------+
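
The null values in the output appear because the function adds next_day_air to only one record. The following plain-Python sketch (no AWS Glue involved; the table-building step is a simplification for illustration) applies the same function to two sample records and then lays them out as a table over the union of all field names:

```python
# Illustration only: a per-record function can add a field to some records.
def next_day_air(rec):
    if rec["zip"] == 75034:
        rec["next_day_air"] = True
    return rec

records = [
    {"order_id": 1005, "zip": 75091},
    {"order_id": 1006, "zip": 75034},
]
# Copy each record so the sample input is left untouched
mapped = [next_day_air(dict(rec)) for rec in records]

# Tabular view: one column per field name seen, None where a record lacks it
columns = sorted({name for rec in mapped for name in rec})
table = [[rec.get(col) for col in columns] for rec in mapped]
print(columns)
print(table)
```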

Dataset 2

To ship essential orders to the appropriate addresses, we need customer data. We demonstrate this by generating a custom JSON dataset consisting of zip codes and customer addresses. In this use case, this data represents the customer data of the company that we want to join later on.

We generate JSON strings consisting of customer data and use the Spark json function to convert them to a JSON structure (enter each jsonStr variable one at a time in case the terminal errors out):

# Input 
jsonStr1 = u'{ "zip": 75091, "customers": [{ "id": 623, "address": "108 Park Street, TX"}, { "id": 231, "address": "763 Marsh Ln, TX" }]}'
jsonStr2 = u'{ "zip": 82091, "customers": [{ "id": 201, "address": "771 Peek Pkwy, GA" }]}'
jsonStr3 = u'{ "zip": 75023, "customers": [{ "id": 343, "address": "66 P Street, NY" }]}'
jsonStr4 = u'{ "zip": 90093, "customers": [{ "id": 932, "address": "708 Fed Ln, CA"}, { "id": 102, "address": "807 Deccan Dr, CA" }]}'
df_row = spark.createDataFrame([
  Row(json=jsonStr1),
  Row(json=jsonStr2),
  Row(json=jsonStr3),
  Row(json=jsonStr4)
])

df_json = spark.read.json(df_row.rdd.map(lambda r: r.json))
df_json.show()

# Output
+-----------------------------------------------------+-----+
|customers                                            |zip  |
+-----------------------------------------------------+-----+
|[[108 Park Street, TX, 623], [763 Marsh Ln, TX, 231]]|75091|
|[[771 Peek Pkwy, GA, 201]]                           |82091|
|[[66 P Street, NY, 343]]                             |75023|
|[[708 Fed Ln, CA, 932], [807 Deccan Dr, CA, 102]]    |90093|
+-----------------------------------------------------+-----+

# Input
df_json.printSchema()

# Output
root
 |-- customers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |-- zip: long (nullable = true)

To convert the DataFrame back to a DynamicFrame to continue with our operations, enter the following code:

# Input
dyf_json = DynamicFrame.fromDF(df_json, glueContext, "dyf_json")

SelectFields

To join the orders with the customer data, we don’t need every column from the order list, so we use the SelectFields function to keep only the columns we need. In our use case, we need just the zip column, but we could select more because the paths argument accepts a list:

# Input
dyf_selectFields = SelectFields.apply(frame = dyf_filter, paths=['zip'])

dyf_selectFields.toDF().show()

# Output
+-----+
|  zip|
+-----+
|75091|
|75023|
|90093|
+-----+

Join

The Join function is straightforward and handles duplicate column names for us. Both datasets have a column named zip, so AWS Glue prefixes one of the duplicate column names with a period (.) to avoid a name clash:

# Input
dyf_join = Join.apply(dyf_json, dyf_selectFields, 'zip', 'zip')
dyf_join.toDF().show()

# Output
+--------------------+-----+-----+
|           customers| .zip|  zip|
+--------------------+-----+-----+
|[[108 Park Street...|75091|75091|
|[[66 P Street, NY...|75023|75023|
|[[708 Fed Ln, CA,...|90093|90093|
+--------------------+-----+-----+
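
Only zip codes present in both inputs survive the join, which is why 82091 is missing from the output. The plain-Python sketch below shows that inner-join behavior on simplified data. It is not the AWS Glue implementation: inner_join is a hypothetical helper, the customers values are reduced to lists of IDs, and the choice of which side receives the period prefix is arbitrary here:

```python
# Illustration only: an inner join on a key, keeping both clashing columns.
def inner_join(left, right, left_key, right_key):
    # Index the right-hand rows by their join key
    index = {}
    for row in right:
        index.setdefault(row[right_key], []).append(row)
    joined = []
    for left_row in left:
        for right_row in index.get(left_row[left_key], []):
            merged = dict(left_row)
            for name, value in right_row.items():
                # Prefix a clashing name with "." so neither value is lost
                merged["." + name if name in left_row else name] = value
            joined.append(merged)
    return joined

customers = [
    {"zip": 75091, "customers": [623, 231]},
    {"zip": 82091, "customers": [201]},
    {"zip": 75023, "customers": [343]},
    {"zip": 90093, "customers": [932, 102]},
]
essential_zips = [{"zip": 75091}, {"zip": 75023}, {"zip": 90093}]

joined = inner_join(customers, essential_zips, "zip", "zip")
print([row["zip"] for row in joined])
```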

DropFields

Because we don’t need two columns with the same name, we can use DropFields to drop one or multiple columns all at once. The backticks (`) around .zip inside the function call are needed because the column name contains a period (.):

# Input
dyf_dropfields = DropFields.apply(
  frame = dyf_join,
  paths = "`.zip`"
)

dyf_dropfields.toDF().show()

# Output
+--------------------+-----+
|           customers|  zip|
+--------------------+-----+
|[[108 Park Street...|75091|
|[[66 P Street, NY...|75023|
|[[708 Fed Ln, CA,...|90093|
+--------------------+-----+

Relationalize

The Relationalize function can flatten nested structures and create multiple DynamicFrames. The customers column from the previous operation is a nested structure, and Relationalize can convert it into multiple flattened DynamicFrames:

# Input
dyf_relationize = dyf_dropfields.relationalize("root", "/home/glue/GlueLocalOutput")

To see the DynamicFrames, we can’t run a .show() yet because it’s a collection. We need to check what keys are present. See the following code:

# Input
dyf_relationize.keys()

# Output
dict_keys(['root', 'root_customers'])
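
Conceptually, the nested array is moved into its own table, and the parent row keeps a generated key that points at its child rows. The following plain-Python sketch mimics that idea for a single array column. It is an approximation for illustration only: the relationalize function here is hypothetical, it handles just one level of nesting, and the key numbering is an assumption rather than a guarantee about AWS Glue:

```python
# Illustration only: split one nested array column into a child table.
def relationalize(rows, array_field, root_name="root"):
    root, children = [], []
    for row_id, row in enumerate(rows, start=1):
        flat = dict(row)
        flat[array_field] = row_id  # a generated key replaces the array
        root.append(flat)
        for index, element in enumerate(row[array_field]):
            # Each array element becomes one child row linked by "id"
            child = {"id": row_id, "index": index}
            for key, value in element.items():
                child[f"{array_field}.val.{key}"] = value
            children.append(child)
    return {root_name: root, f"{root_name}_{array_field}": children}

rows = [
    {"zip": 75091, "customers": [
        {"id": 623, "address": "108 Park Street, TX"},
        {"id": 231, "address": "763 Marsh Ln, TX"},
    ]},
    {"zip": 75023, "customers": [{"id": 343, "address": "66 P Street, NY"}]},
]
tables = relationalize(rows, "customers")
print(list(tables))
```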

In the follow-up function in the next section, we show how to pick the DynamicFrame from a collection of multiple DynamicFrames.

SelectFromCollection

The SelectFromCollection function allows us to retrieve a specific DynamicFrame from a collection of DynamicFrames. For this use case, we retrieve both DynamicFrames from the previous operation using this function.

To retrieve the first DynamicFrame, enter the following code:

# Input
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'root')

dyf_selectFromCollection.toDF().show()

# Output
+---------+-----+
|customers|  zip|
+---------+-----+
|        1|75091|
|        2|75023|
|        3|90093|
+---------+-----+

To retrieve the second DynamicFrame, enter the following code:

# Input
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'root_customers')

dyf_selectFromCollection.toDF().show()

# Output
+---+-----+---------------------+----------------+
| id|index|customers.val.address|customers.val.id|
+---+-----+---------------------+----------------+
|  2|    0|      66 P Street, NY|             343|
|  3|    0|       708 Fed Ln, CA|             932|
|  3|    1|    807 Deccan Dr, CA|             102|
|  1|    0|  108 Park Street, TX|             623|
|  1|    1|     763 Marsh Ln, TX|             231|
+---+-----+---------------------+----------------+

RenameField

The second DynamicFrame we retrieved from the previous operation has lengthy column names that contain periods (.). We can change them using the RenameField function, and then drop the index and id columns, which we no longer need:

# Input
dyf_renameField_1 = RenameField.apply(dyf_selectFromCollection, "`customers.val.address`", "address")

dyf_renameField_2 = RenameField.apply(dyf_renameField_1, "`customers.val.id`", "cust_id")

dyf_dropfields_rf = DropFields.apply(
  frame = dyf_renameField_2,
  paths = ["index", "id"]
)

dyf_dropfields_rf.toDF().show()

# Output
+-------------------+-------+
|            address|cust_id|
+-------------------+-------+
|    66 P Street, NY|    343|
|     708 Fed Ln, CA|    932|
|  807 Deccan Dr, CA|    102|
|108 Park Street, TX|    623|
|   763 Marsh Ln, TX|    231|
+-------------------+-------+

ResolveChoice

ResolveChoice can gracefully handle column type ambiguities. Here, we cast the cust_id column to a string. For more information about the full capabilities of ResolveChoice, see the GitHub repo.

# Input
dyf_resolveChoice = dyf_dropfields_rf.resolveChoice(specs = [('cust_id','cast:String')])

dyf_resolveChoice.printSchema()

# Output
root
|-- address: string
|-- cust_id: string
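
The cast action forces every value in a column to a single type. As a plain-Python analogy (resolve_cast is a hypothetical helper, not part of AWS Glue), consider a column whose values arrived as a mix of integers and strings:

```python
# Illustration only: resolve a mixed-type column by casting every value.
def resolve_cast(records, field, cast):
    return [{**rec, field: cast(rec[field])} for rec in records]

records = [{"cust_id": 343}, {"cust_id": "932"}]  # int and str in one column
resolved = resolve_cast(records, "cust_id", str)
print(resolved)
```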

Dataset 3

We generate another dataset to demonstrate a few other functions. In this use case, the company’s warehouse inventory data is in a nested JSON structure, which is initially in a String format. See the following code:

# Input
warehouse_inventory_list = [
              ['TX_WAREHOUSE', '{\
                          "strawberry":"220",\
                          "pineapple":"560",\
                          "mango":"350",\
                          "pears":null}'
               ],\
              ['CA_WAREHOUSE', '{\
                         "strawberry":"34",\
                         "pineapple":"123",\
                         "mango":"42",\
                         "pears":null}\
              '],
              ['CO_WAREHOUSE', '{\
                         "strawberry":"340",\
                         "pineapple":"180",\
                         "mango":"2",\
                         "pears":null}'
              ]
            ]


warehouse_schema = StructType([StructField("warehouse_loc", StringType())\
                              ,StructField("data", StringType())])

df_warehouse = spark.createDataFrame(warehouse_inventory_list, schema = warehouse_schema)
dyf_warehouse = DynamicFrame.fromDF(df_warehouse, glueContext, "dyf_warehouse")

dyf_warehouse.printSchema()

# Output
root
|-- warehouse_loc: string
|-- data: string

Unbox

We use Unbox to parse the JSON held in the String column of the new data. Compare the preceding printSchema() output with the following output:

# Input
dyf_unbox = Unbox.apply(frame = dyf_warehouse, path = "data", format="json")
dyf_unbox.printSchema()

# Output
root
|-- warehouse_loc: string
|-- data: struct
|    |-- strawberry: int
|    |-- pineapple: int
|    |-- mango: int
|    |-- pears: null

# Input 
dyf_unbox.toDF().show()

# Output
+-------------+----------------+
|warehouse_loc|            data|
+-------------+----------------+
| TX_WAREHOUSE|[220, 560, 350,]|
| CA_WAREHOUSE|  [34, 123, 42,]|
| CO_WAREHOUSE|  [340, 180, 2,]|
+-------------+----------------+
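
The effect is comparable to parsing a JSON string that sits inside an otherwise ordinary field. The sketch below uses Python's standard json module on one sample row and does not involve AWS Glue. One difference to be aware of: json.loads keeps the quoted counts as strings, whereas the AWS Glue output above reports them as int:

```python
import json

# Illustration only: parse the JSON text stored in the "data" field.
row = {
    "warehouse_loc": "TX_WAREHOUSE",
    "data": '{"strawberry":"220","pineapple":"560","mango":"350","pears":null}',
}
unboxed = {**row, "data": json.loads(row["data"])}

print(type(row["data"]).__name__, "->", type(unboxed["data"]).__name__)
print(unboxed["data"]["pears"])
```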

Unnest

Unnest allows us to flatten a single DynamicFrame to a more relational table format. We apply Unnest to the nested structure from the previous operation and flatten it:

# Input
dyf_unnest = UnnestFrame.apply(frame = dyf_unbox)

dyf_unnest.printSchema()

# Output 
root
|-- warehouse_loc: string
|-- data.strawberry: int
|-- data.pineapple: int
|-- data.mango: int
|-- data.pears: null

# Input
dyf_unnest.toDF().show()

# Output
+-------------+---------------+--------------+----------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|data.pears|
+-------------+---------------+--------------+----------+----------+
| TX_WAREHOUSE|            220|           560|       350|      null|
| CA_WAREHOUSE|             34|           123|        42|      null|
| CO_WAREHOUSE|            340|           180|         2|      null|
+-------------+---------------+--------------+----------+----------+
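
The dotted column names come from joining each nested field name to its parent's name. A minimal plain-Python sketch of that flattening follows; the unnest function is a hypothetical helper written for this illustration and is not the AWS Glue implementation:

```python
# Illustration only: flatten nested dicts into dotted top-level names.
def unnest(record, prefix=""):
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            # Recurse into the struct, extending the dotted prefix
            flat.update(unnest(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat

record = {
    "warehouse_loc": "TX_WAREHOUSE",
    "data": {"strawberry": 220, "pineapple": 560, "mango": 350, "pears": None},
}
flat = unnest(record)
print(list(flat))
```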

DropNullFields

The DropNullFields function makes it easy to drop columns in which every value is null. Our warehouse data shows that every warehouse is out of pears, so that column can be dropped. We apply the DropNullFields function to the DynamicFrame, which automatically identifies such columns and drops them:

# Input
dyf_dropNullfields = DropNullFields.apply(frame = dyf_unnest)

dyf_dropNullfields.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| TX_WAREHOUSE|            220|           560|       350|
| CA_WAREHOUSE|             34|           123|        42|
| CO_WAREHOUSE|            340|           180|         2|
+-------------+---------------+--------------+----------+
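
The following plain-Python sketch approximates this behavior by removing any column whose value is None in every record. It is a simplification for illustration: drop_null_fields is a hypothetical helper that inspects values, whereas AWS Glue makes the decision based on the field's null type:

```python
# Illustration only: drop columns that are None in every record.
def drop_null_fields(records):
    # Collect field names in first-seen order
    fields = []
    for rec in records:
        for name in rec:
            if name not in fields:
                fields.append(name)
    # Keep a field only if at least one record has a non-None value for it
    keep = [n for n in fields if any(rec.get(n) is not None for rec in records)]
    return [{n: rec.get(n) for n in keep} for rec in records]

records = [
    {"warehouse_loc": "TX_WAREHOUSE", "data.mango": 350, "data.pears": None},
    {"warehouse_loc": "CA_WAREHOUSE", "data.mango": 42, "data.pears": None},
]
cleaned = drop_null_fields(records)
print(cleaned)
```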

SplitFields

SplitFields allows us to split a DynamicFrame into two. The function takes the field names that go into the first DynamicFrame, followed by the names of the two resulting DynamicFrames. The remaining fields go into the second DynamicFrame:

# Input
dyf_splitFields = SplitFields.apply(frame = dyf_dropNullfields, paths = ["`data.strawberry`", "`data.pineapple`"], name1 = "a", name2 = "b")

For the first DynamicFrame, see the following code:

# Input
dyf_retrieve_a = SelectFromCollection.apply(dyf_splitFields, "a")
dyf_retrieve_a.toDF().show()

# Output
+---------------+--------------+
|data.strawberry|data.pineapple|
+---------------+--------------+
|            220|           560|
|             34|           123|
|            340|           180|
+---------------+--------------+

For the second DynamicFrame, see the following code:

# Input
dyf_retrieve_b = SelectFromCollection.apply(dyf_splitFields, "b")
dyf_retrieve_b.toDF().show()

# Output
+-------------+----------+
|warehouse_loc|data.mango|
+-------------+----------+
| TX_WAREHOUSE|       350|
| CA_WAREHOUSE|        42|
| CO_WAREHOUSE|         2|
+-------------+----------+
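
Put differently, SplitFields is a column-wise split: the listed fields go to the first result, and everything else goes to the second. A plain-Python sketch of that idea follows (split_fields is a hypothetical helper, not the AWS Glue API, and it works on plain dictionaries):

```python
# Illustration only: split each record's fields into two groups.
def split_fields(records, paths, name1, name2):
    selected = [{k: v for k, v in rec.items() if k in paths} for rec in records]
    remaining = [{k: v for k, v in rec.items() if k not in paths} for rec in records]
    return {name1: selected, name2: remaining}

records = [
    {"warehouse_loc": "TX_WAREHOUSE", "data.strawberry": 220,
     "data.pineapple": 560, "data.mango": 350},
]
split = split_fields(records, ["data.strawberry", "data.pineapple"], "a", "b")
print(split["a"])
print(split["b"])
```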

SplitRows

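SplitRows allows us to split our dataset into two DynamicFrames based on comparisons against column values. Here, rows whose data.pineapple count is greater than 100 and less than 200 go into the first DynamicFrame, and all other rows go into the second: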

# Input
dyf_splitRows = SplitRows.apply(frame = dyf_dropNullfields, comparison_dict = {"`data.pineapple`": {">": "100", "<": "200"}}, name1 = 'pa_200_less', name2 = 'pa_200_more')

For the first DynamicFrame, see the following code:

# Input
dyf_pa_200_less = SelectFromCollection.apply(dyf_splitRows, 'pa_200_less')
dyf_pa_200_less.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| CA_WAREHOUSE|             34|           123|        42|
| CO_WAREHOUSE|            340|           180|         2|
+-------------+---------------+--------------+----------+

For the second DynamicFrame, see the following code:

# Input
dyf_pa_200_more = SelectFromCollection.apply(dyf_splitRows, 'pa_200_more')
dyf_pa_200_more.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| TX_WAREHOUSE|            220|           560|       350|
+-------------+---------------+--------------+----------+
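
The comparison dictionary can be read as "every listed comparison must hold for a row to land in the first result." The plain-Python sketch below evaluates the same dictionary against the three warehouse rows. It is an illustration only: split_rows and OPS are hypothetical names, and converting the string bounds with float is an assumption made for this sketch:

```python
import operator

# Illustration only: route rows by whether they pass every comparison.
OPS = {">": operator.gt, "<": operator.lt, ">=": operator.ge,
       "<=": operator.le, "=": operator.eq, "!=": operator.ne}

def split_rows(records, comparison_dict):
    matched, rest = [], []
    for rec in records:
        passes = all(
            OPS[op](rec[field], float(bound))
            for field, tests in comparison_dict.items()
            for op, bound in tests.items()
        )
        (matched if passes else rest).append(rec)
    return matched, rest

records = [
    {"warehouse_loc": "TX_WAREHOUSE", "data.pineapple": 560},
    {"warehouse_loc": "CA_WAREHOUSE", "data.pineapple": 123},
    {"warehouse_loc": "CO_WAREHOUSE", "data.pineapple": 180},
]
in_range, out_of_range = split_rows(
    records, {"data.pineapple": {">": "100", "<": "200"}}
)
print([rec["warehouse_loc"] for rec in in_range])
print([rec["warehouse_loc"] for rec in out_of_range])
```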

Spigot

Spigot allows you to write a sample dataset to a destination during transformation. For our use case, we write the top 10 records locally:

# Input
dyf_splitFields = Spigot.apply(dyf_pa_200_less, '/home/glue/GlueLocalOutput/Spigot/', 'top10')

Depending on your local environment configuration, Spigot may run into errors. Alternatively, you can use an AWS Glue endpoint or an AWS Glue ETL job to run this function.

Write Dynamic Frame

The write_dynamic_frame function writes a DynamicFrame using the specified connection and format. For our use case, we write locally: a connection_type of s3 with a POSIX path in connection_options allows writing to local storage. Note that dyf_splitFields in the following code refers to the DynamicFrame returned by the preceding Spigot call, which reassigned that variable:

# Input
glueContext.write_dynamic_frame.from_options(\
frame = dyf_splitFields,\
connection_options = {'path': '/home/glue/GlueLocalOutput/'},\
connection_type = 's3',\
format = 'json')

Conclusion

This post walked through the PySpark ETL capabilities of AWS Glue. A good next step is to test further with an AWS Glue development endpoint or to add jobs directly in AWS Glue. For more information, see General Information about Programming AWS Glue ETL Scripts.


About the Authors

Adnan Alvee is a Big Data Architect for AWS ProServe Remote Consulting Services. He helps build solutions for customers leveraging their data and AWS services. Outside of AWS, he enjoys playing badminton and drinking chai.

Imtiaz (Taz) Sayed is the World Wide Tech Leader for Data Analytics at AWS. He is an ardent data engineer and relishes connecting with the data analytics community.