AWS Big Data Blog
Monitor data quality in your data lake using PyDeequ and AWS Glue
August 2024: This post was reviewed and updated with examples against a new dataset. Additionally, we changed the architecture to use AWS Glue Studio notebooks and added information on the appropriate Deequ/PyDeequ versions.
In our previous post, we introduced PyDeequ, an open-source Python wrapper over Deequ, which enables you to write unit tests on your data to ensure data quality. The use case we ran through was on static, historical data, but most datasets are dynamic, so how can you quantify how your data is changing and detect anomalous changes over time?
At Amazon, we’ve leveraged PyDeequ on AWS Glue to address this problem. AWS Glue is a serverless data integration service that allows you to easily prepare and combine your data for analytics, machine learning (ML), and application development. AWS Glue enables data engineers to build extract, transform, and load (ETL) workflows with ease. By using PyDeequ with AWS Glue, you can create a metrics repository on your data and check for anomalous changes over time inside your ETL workflows. In this post, we share this design pattern with you.
Use cases of PyDeequ on AWS Glue include:
- Identifying and counting mismatched schema items and then immediately correcting them
- Reviewing your incoming data with standard or custom, predefined analytics before storing it for big data validation
- Tracking changes in data distribution by using a data quality metric file
- Immediately identifying and creating useful constraints based on data distribution
This post describes the implementation process and provides a step-by-step tutorial for tracking changes in data quality. It walks you through an example of transforming a large dataset to identify the seasonality of trends over time. Next, you create, sort, and load a metrics repository using PyDeequ, which allows you to persist your analysis over time. Finally, you create an alert that notifies you when a data point falls outside the forecasted range.
Where are the anomalies?
It can be difficult to immediately find anomalies within your incoming data stream over time. PyDeequ makes it easier to identify changes in data distribution by creating a metrics repository. The repository allows you to store and load a variety of anomaly checks to compare current and past metric values. For this post, you learn about the Holt Winters anomaly detection strategy, one of the various anomaly detection strategies that PyDeequ provides. The Holt Winters model forecasts future datasets based on a repeated periodical pattern (seasonality), a trend (slope), and the average between two corresponding time points.
You can apply the Holt Winters method in many different use cases, such as the following:
- Business problem – Identifying a shift in the demand of a product
- Data pattern – Input data deviates from trend and seasonality
- Business analysis – Detecting changes in profits over time
To demonstrate this anomaly detection strategy, we generated a synthetic reviews dataset with total votes in the jewelry subset from 2013 to 2015. A graph of this data shows a correlation between 2013 and 2014; however, the data looks different in 2015. The following graph shows January 2015 diverging from the previous years, with an increase in the number of total votes.
How can we detect similar events to these in new data?
With PyDeequ, you can easily identify anomalies without any visuals. January 2015 is outside the calculated forecast range; therefore, PyDeequ flags the data point as anomalous. This post demonstrates using PyDeequ’s anomaly detection to get email notifications for anomalous events, which look like the following screenshot.
Solution architecture
With Amazon Athena and an AWS Glue crawler, you can create an AWS Glue Data Catalog entry to access the Amazon Simple Storage Service (Amazon S3) data source. This allows the data to be easily queried for usage downstream. You can use an AWS Glue Notebook to interact with the dataset and run the checks. We configure our AWS Glue ETL jobs to use PyDeequ to store results in Amazon S3, and use Amazon Simple Notification Service (Amazon SNS) to notify administrators of any anomalies identified in the data.
The following diagram illustrates this architecture.
Solution overview
To implement this solution, you complete the following high-level steps:
- Create an SNS topic.
- Upload PyDeequ and Deequ to Amazon S3.
- Create an AWS Identity and Access Management (IAM) role for AWS Glue.
- Crawl, query, and create your dataset.
- Transform the dataset into a table.
- Create an AWS Glue Notebook.
- Create a new AWS Glue session.
- Extract the table.
- Transform the table.
- Use PyDeequ to detect anomalous data points.
Create an SNS topic
Complete the following steps to create your SNS topic:
- On the Amazon SNS console, choose Topics.
- Choose Create topic.
- For Type, choose Standard.
- For Name, enter jewelry_hw.
- For Display name, enter Holt Winters Anomaly Example.
- Keep the other options as default and choose Create topic.
- On the details page for the topic you just created, under Subscription, choose Create subscription.
- For Protocol, choose Email.
- For Endpoint, enter the email address to which you want to receive the notification.
- Keep the other options as default and choose Create subscription. An email will be sent to the entered endpoint.
- Open the email message and choose Confirm subscription.
- Take note of the ARN for your SNS topic; you will need it when you begin working with the data in the AWS Glue notebook.
Prepare dependencies: upload PyDeequ and Deequ to Amazon S3
In this step, you will create an S3 bucket to store AWS Glue job dependencies, Athena query results, and PyDeequ metrics. You will upload the PyDeequ source and the Deequ JAR file to this bucket, because the AWS Glue notebook needs them.
It is critical to download the version of the Deequ JAR that corresponds to the versions of Spark and PyDeequ that you are using. See the blog post “Test data quality at scale with PyDeequ” for a detailed explanation of how to identify the correct version of Deequ.
Due to an ongoing issue with Deequ’s dependency on the breeze package, the Holt Winters anomaly detection works as expected only with AWS Glue 2.0, PyDeequ release/1_1_1, Spark 2.4, and deequ-1.1.0_spark-2.4-scala-2.11.jar.
The Deequ JAR can be downloaded from the Maven repository.
- From the Amazon S3 console, create a new bucket. We reference it as <__YOUR_BUCKET__> throughout this post.
- Create a folder called dependencies within this bucket.
- Download the Deequ JAR that corresponds to your versions of Spark and PyDeequ.
- Clone the PyDeequ repository from GitHub (use the -b <release> git option to clone a specific release rather than the latest version). Create a .zip file for PyDeequ by compressing the folder that contains the __init__.py file.
- Upload the Deequ JAR and the PyDeequ .zip file to your dependencies folder.
If you’re on a *nix operating system or have the AWS Command Line Interface (AWS CLI) configured, you can perform the download, clone, zip, and upload steps from the command line (for example, with git, zip, and the aws s3 cp command).
Create an IAM role for AWS Glue
AWS Glue is a serverless data integration service that makes it easy for analytics users to discover, prepare, move, and integrate data from multiple sources such as Amazon Athena and Amazon S3. AWS Glue job notebooks are based on Jupyter notebooks. These notebooks are serverless and require minimal setup in AWS Glue, so you can get started quickly. In this section, we create a role and attach the policies needed to work with AWS Glue notebooks and the other services in our architecture. To read more about the roles and policies needed to work with AWS Glue notebooks, see the AWS Glue documentation.
Create a custom policy
First, we create a policy to allow access to S3 buckets and to send a message to an Amazon SNS topic.
- On the IAM console, choose Policies, and then choose Create policy.
- Switch from Visual to JSON and paste in a policy that allows access to your S3 bucket and allows publishing to your SNS topic, replacing the placeholder values with your resource values.
- Choose Next.
- Under the Policy details section, provide a policy name, for example, pydeequ-hw-sns-s3.
- Choose Create policy.
Create a role
You now create an IAM role and attach the required policies.
- On the IAM console, choose Roles.
- Choose Create role.
- For Trusted entity, choose AWS service.
- For Use case, choose Glue.
- Choose Next.
- Add the AWS managed policy AWSGlueConsoleFullAccess to the role.
- Add the policy you created in the previous section: pydeequ-hw-sns-s3.
- Under the Role details section, provide a role name. The name should start with AWSGlueServiceRole-. For example, you can name it AWSGlueServiceRole-glue-notebook.
- Choose Create role.
Crawl, query, and create the dataset
You will use Amazon Athena to query the data stored in an Amazon S3 bucket. Because Athena needs to access table metadata to run queries, you first need to create a metastore or a data catalog. The AWS Glue Data Catalog natively integrates with Athena. You can create a data catalog either by creating a crawler from the Athena query console under Tables and views or from the AWS Glue console. Here we show you how to create a crawler in the AWS Glue console. With either option, the newly created databases and tables become available for query in Athena.
- On the AWS Glue console, choose Databases under Data Catalog and choose Add database.
- Provide the database name jewelry_hw under Name and choose Create database.
- Choose the newly created database jewelry_hw, and under Tables, select Add tables using crawler.
- On the Set crawler properties page, provide a crawler name and choose Next.
- Under Choose data sources and classifiers, choose Not yet under Is your data already mapped to Glue tables?
- Choose Add a data source.
- In the pop-up window, set Location of S3 data to In a different account.
- Under S3 path, enter s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Jewelry/
- Under Subsequent crawler runs, choose Crawl all subfolders.
- Select Add an S3 data source.
- Choose Next.
- Under Configure security settings, choose Create new IAM role. Provide the following name: AWSGlueServiceRole-jewelry-hw.
- Choose Next.
- Under Set output and scheduling, choose jewelry_hw as the Target database and choose On demand under Crawler schedule.
- Choose Next.
- Under Review and create, review the crawler properties, data sources, created role, output, and scheduling, and then choose Create crawler.
- On the next page, choose Run crawler.
The crawler will begin its run, which takes a couple of minutes.
You are now ready to navigate to the Athena console to query and explore the dataset.
Create a table with the sum of all total product review votes by month in Athena
We create a new table and apply the Holt Winters anomaly detection algorithm in PyDeequ to the sum of the total product review votes by year and month.
If this is your first time working with Athena, you will need to define the query result location in S3 for your queries.
- In the Athena query editor, under the Settings tab, choose Manage.
- Provide s3://<__YOUR_BUCKET__>/athena-queries/ as the location and choose Save.
- You might also be prompted to provide an S3 location when you first open the Athena query editor. This step needs to be done only once.
- Return to the editor, and under Database, select jewelry_hw. You will see the table product_category_jewelry.
Let’s create a second table from this dataset. This table includes three columns (the month, the year, and the sum of total product review votes) and subsets the reviews to the US marketplace only. In the query editor, enter a query that creates this table from product_category_jewelry.
- Choose Run.
- Under Tables and views in the Athena query editor, you will now see a new table called jewelry_dataset in the jewelry_hw database.
Create an AWS Glue Notebook session
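In AWS Glue Studio, create a new notebook job and choose the AWSGlueServiceRole-glue-notebook role you created earlier. The notebook runs on an AWS Glue interactive session that you configure with cell magics. The following is a minimal sketch of a first configuration cell; the magic values, file names, and versions are assumptions based on the dependencies uploaded earlier, so adjust them to match your bucket and artifacts.

```
# AWS Glue interactive session configuration (run in the first notebook cell).
# The paths, file names, and versions below are assumptions; adjust them to
# match the artifacts in your dependencies folder.
%idle_timeout 60
%glue_version 2.0
%worker_type G.1X
%number_of_workers 2
%extra_py_files s3://<__YOUR_BUCKET__>/dependencies/pydeequ.zip
%extra_jars s3://<__YOUR_BUCKET__>/dependencies/deequ-1.1.0_spark-2.4-scala-2.11.jar
```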
Extract the table
You must read the data table jewelry_dataset and turn it into a DataFrame so that it can be used with PyDeequ. Next, use the dropDuplicates method to remove any potential duplicates within the dataset.
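The following is a minimal sketch of this step. It assumes the standard Glue notebook boilerplate (SparkContext and GlueContext) and the jewelry_hw database and jewelry_dataset table created earlier.

```
from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Read the jewelry_dataset table from the Data Catalog and drop duplicate rows.
jewelry_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="jewelry_hw", table_name="jewelry_dataset"
)
jewelry_df = jewelry_dyf.toDF().dropDuplicates()
jewelry_df.show(10)
```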
The following screenshot shows your output.
Transform the table
You can further simplify the jewelry_df table by using the date_format method to reduce the date column to just the month and year associated with each total_votes value. Afterwards, you can filter jewelry_df2 by year so that it contains only the two columns needed.
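The following is a minimal sketch of the transformation. The column names (review_date for the date column and total_votes for the summed votes) are assumptions; adjust them to match the table you created in Athena.

```
from pyspark.sql import functions as F

# Reduce the date column to a year-month string such as "2013-01".
jewelry_df2 = jewelry_df.withColumn(
    "review_date", F.date_format(F.col("review_date"), "yyyy-MM")
)

# Keep only the two columns needed and split the data by year.
df_2013 = jewelry_df2.filter(F.col("review_date").startswith("2013")) \
    .select("review_date", "total_votes")
df_2014 = jewelry_df2.filter(F.col("review_date").startswith("2014")) \
    .select("review_date", "total_votes")
df_2015 = jewelry_df2.filter(F.col("review_date").startswith("2015")) \
    .select("review_date", "total_votes")
```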
You can use df_2013.show(10) to preview what your data table looks like before iterating through it with PyDeequ. The following screenshot shows our output.
Use PyDeequ to detect anomalous data points
First, specify your version of Spark (use Spark 2.4 for now, due to the dependency issue):
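A sketch of that configuration follows, assuming PyDeequ 1.1.x, which reads the target Spark version from the SPARK_VERSION environment variable.

```
import os

# PyDeequ uses this environment variable to pick the matching Deequ version.
os.environ["SPARK_VERSION"] = "2.4"

import pydeequ
```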
For this post, we demonstrate how you can detect anomalous data points with the FileSystemMetricsRepository
class. A metrics repository is stored in JSON format to be used as a data quality report over time in Amazon S3, HDFS, or in memory. The variable s3_write_path
is where you want your JSON file to be stored within Amazon S3. See the following code:
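The following is a minimal sketch, assuming the notebook’s Spark session and a placeholder key in your bucket for the metrics file.

```
from pydeequ.repository import FileSystemMetricsRepository

# Location in Amazon S3 where the JSON metrics repository will be stored.
s3_write_path = "s3://<__YOUR_BUCKET__>/deequ_metrics/metrics.json"
metricsRepository = FileSystemMetricsRepository(spark, s3_write_path)
```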
Now load the 2013–2014 dataset into the metrics repository.
If your dataset is collected monthly and follows an annual seasonal trend, use the MetricInterval.Monthly and SeriesSeasonality.Yearly settings. This selection requires you to collect at least 25 data points. The initial 24 data points are the monthly values from 2013–2014, which we use to create the Holt Winters model. The 2015 values are then compared against the forecast to identify anomalous points.
As shown in the following code, create a for loop that iterates through df_2013. Use month to build a date that we later use to query values from df_2013. The filter method allows you to create a df data frame that contains the total_votes values for a single month (for this post, the first iteration is a table of values from January 2013). Repeat the process for the data from 2014.
Next, each set of metrics that is computed needs to be indexed by a ResultKey, which contains a timestamp and supports arbitrary tags in the form of key-value pairs.
Finally, create a VerificationSuite. You can make PyDeequ write and store metrics in Amazon S3 by adding the useRepository and saveOrAppendResult methods. Then add the HoltWinters anomaly check with a Sum analyzer to calculate the monthly total_votes.
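The following is a minimal sketch of that loop for 2013 (repeat it over df_2014 for the 2014 months). The column names follow the assumptions above, and the PyDeequ class and parameter names are based on the pydeequ 1.1.x API.

```
from pyspark.sql import functions as F
from pydeequ.analyzers import Sum
from pydeequ.anomaly_detection import HoltWinters, MetricInterval, SeriesSeasonality
from pydeequ.repository import ResultKey
from pydeequ.verification import VerificationSuite

for month in range(1, 13):
    # Build a "yyyy-MM" string to select a single month of data.
    date = "2013-{:02d}".format(month)
    df = df_2013.filter(F.col("review_date") == date)

    # Index this month's metrics with a timestamp and a key-value tag.
    key_tags = {"tag": date}
    result_key = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

    # Compute the monthly Sum of total_votes, append it to the repository,
    # and register the Holt Winters anomaly check.
    jewelry_result = (
        VerificationSuite(spark)
        .onData(df)
        .useRepository(metricsRepository)
        .saveOrAppendResult(result_key)
        .addAnomalyCheck(
            HoltWinters(MetricInterval.Monthly, SeriesSeasonality.Yearly),
            Sum("total_votes"),
        )
        .run()
    )

# Repeat the same loop over df_2014 with dates "2014-01" through "2014-12".
```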
You can also load the metrics into a DataFrame to review them.
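One way to do that, assuming the repository loader API mirrors Deequ’s, is the following sketch.

```
# Load the metrics stored in the repository as a Spark DataFrame for review.
metrics_df = metricsRepository.load().getSuccessMetricsAsDataFrame()
metrics_df.show()
```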
Great! You have created the trend for the Holt Winters algorithm. Now it’s time to detect any anomalies within 2015.
Create another Holt Winters anomaly check, similar to the one for the 2013–2014 dataset. For each month, check for an anomaly using jewelry_result.status. If the status is not a success, an anomaly has been detected; collect the constraint_message to see the offending value. Use publish to create an SNS notification, including the topicArn created in Amazon SNS, a Message, a Subject, and a MessageAttribute. If an anomaly has been detected, break out of the loop.
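The following is a minimal sketch. The topic ARN is a placeholder for the ARN you noted earlier, the column names follow the assumptions above, and extracting the constraint message through checkResultsAsDataFrame is one possible approach.

```
import boto3
from pyspark.sql import functions as F
from pydeequ.analyzers import Sum
from pydeequ.anomaly_detection import HoltWinters, MetricInterval, SeriesSeasonality
from pydeequ.repository import ResultKey
from pydeequ.verification import VerificationResult, VerificationSuite

sns_client = boto3.client("sns")
topic_arn = "<__YOUR_SNS_TOPIC_ARN__>"  # the ARN you noted when creating the topic

for month in range(1, 13):
    date = "2015-{:02d}".format(month)
    df = df_2015.filter(F.col("review_date") == date)

    result_key = ResultKey(spark, ResultKey.current_milli_time(), {"tag": date})
    jewelry_result = (
        VerificationSuite(spark)
        .onData(df)
        .useRepository(metricsRepository)
        .saveOrAppendResult(result_key)
        .addAnomalyCheck(
            HoltWinters(MetricInterval.Monthly, SeriesSeasonality.Yearly),
            Sum("total_votes"),
        )
        .run()
    )

    if jewelry_result.status != "Success":
        # Pull the constraint message so the alert includes the offending value.
        check_df = VerificationResult.checkResultsAsDataFrame(spark, jewelry_result)
        constraint_message = check_df.select("constraint_message").collect()[0][0]

        sns_client.publish(
            TopicArn=topic_arn,
            Message="Anomaly detected for {}: {}".format(date, constraint_message),
            Subject="Anomaly detected in the jewelry dataset",
            MessageAttributes={
                # Example attribute; name and value are illustrative only.
                "AnomalyCheck": {"DataType": "String", "StringValue": "HoltWinters"}
            },
        )
        break
```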
After completing this tutorial, you should receive an email notification stating that an anomaly has been detected for January 2015. This matches our hypothesis: PyDeequ flags the same anomaly that we saw in the graph.
More on using AWS Glue and PyDeequ
This post shows how you can start exploring anomaly detection with PyDeequ. This simple tutorial is just the beginning of what you can do with AWS Glue. To add to this tutorial, you can create a time-based schedule for jobs and crawlers to run every time a dataset is appended.
Alternatively, you can use the different modules provided by PyDeequ and its tutorials, or the use case examples provided at the beginning of this post to further understand the dataset.
Resource cleanup
Clean up the resources created in this post when you’re finished:
- IAM roles – Deleting roles or instance profiles
- IAM policy – Deleting an IAM role (console)
- AWS Glue table – DeleteTable
- AWS Glue crawler – DeleteCrawler
- S3 bucket – Deleting a bucket
- SNS topic – DeleteTopic
Conclusion
This post demonstrates the basics of detecting anomalies using PyDeequ and AWS Glue. Anomaly detection relies on the metrics repository file. This repository can easily be stored within Amazon S3, HDFS, or in memory as a JSON object for future test usage and AWS Glue ETL jobs. In addition to AWS Glue, PyDeequ can also run on Amazon EMR and Amazon SageMaker, so you can choose the environment that best fits the needs of your data pipeline.
This approach allows you to improve the quality and your own knowledge of your dataset. You can also apply this tool to a variety of business scenarios. The contents of this tutorial are for demonstration purposes and not production workloads. Be sure to follow security best practices for handling data at rest and in transit when you adapt PyDeequ into your workflows.
About the Authors
Vitalina Komashko is a Data Scientist with AWS Professional Services. She holds a PhD in Pharmacology and Toxicology but transitioned to data science from experimental work because she wanted to own data generation and the interpretation of the results. Earlier in her career, she worked with biotech and pharma companies. At AWS, she enjoys solving problems for customers from a variety of industries and learning about their unique challenges.
Veronika Megler, PhD, is a Principal Applied Scientist for Amazon.com Consumer Packaging. Previously she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused on economic impacts of ML models and exploring causality from observational data.
Joan Aoanan is a ProServe Consultant at AWS. With her B.S. Mathematics-Computer Science degree from Gonzaga University, she is interested in integrating her interests in math and science with technology.
Calvin Wang is a Data Scientist at AWS AI/ML. He holds a B.S. in Computer Science from UC Santa Barbara and loves using machine learning to build cool stuff.
Audit History
Last reviewed and updated in August 2024 by Vitalina Komashko | Data Scientist