AWS Big Data Blog
Test data quality at scale with Deequ
March 2023: You can now use AWS Glue Data Quality to measure and manage the quality of your data. AWS Glue Data Quality is built on Deequ and offers a simplified user experience for customers who want to use this open-source package. Refer to the blog and documentation for additional details.
You generally write unit tests for your code, but do you also test your data? Incorrect or malformed data can have a large impact on production systems. Examples of data quality issues are:
- Missing values can lead to failures in production systems that require non-null values (NullPointerException).
- Changes in the distribution of data can lead to unexpected outputs of machine learning models.
- Aggregations of incorrect data can lead to wrong business decisions.
In this blog post, we introduce Deequ, an open source tool developed and used at Amazon. Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look; Deequ supports you by suggesting checks. Deequ is implemented on top of Apache Spark and is designed to scale with large datasets (think billions of rows) that typically live in a distributed filesystem or a data warehouse.
Deequ at Amazon
Deequ is being used internally at Amazon for verifying the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues do not propagate to consumer data pipelines, reducing their blast radius.
Overview of Deequ
To understand how to use Deequ, let’s look at its main components (also shown in Figure 1).
- Metrics Computation — Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon S3, and to compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
- Constraint Verification — As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
- Constraint Suggestion — You can choose to define your own custom data quality constraints, or use the automated constraint suggestion methods that profile the data to infer useful constraints.
Figure 1: Overview of Deequ components
Setup: Launch the Spark cluster
This section shows the steps to use Deequ on your own data. First, set up Spark and Deequ on an Amazon EMR cluster. Then, load a sample dataset provided by AWS, run some analysis, and then run data tests.
Deequ is built on top of Apache Spark to support fast, distributed calculations on large datasets. Deequ depends on Spark version 2.2.0 or later. As a first step, create a cluster with Spark on Amazon EMR. Amazon EMR takes care of the configuration of Spark for you. Also, you can use the EMR File System (EMRFS) to directly access data in Amazon S3. For testing, you can also install Spark on a single machine in standalone mode.
Connect to the Amazon EMR master node using SSH. Load the latest Deequ JAR from Maven Repository. To load the JAR of version 1.0.1, use the following:
wget http://repo1.maven.org/maven2/com/amazon/deequ/deequ/1.0.1/deequ-1.0.1.jar
Launch Spark Shell and use the spark.jars argument for referencing the Deequ JAR file:
spark-shell --conf spark.jars=deequ-1.0.1.jar
For more information about how to set up Spark, see the Spark Quick Start guide, and the overview of Spark configuration options.
Load data
As a running example, we use a customer review dataset provided by Amazon on Amazon S3. Let’s load the dataset containing reviews for the category “Electronics” in Spark. Make sure to enter the code in the Spark shell:
val dataset = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/")
You can see the following selected attributes if you run dataset.printSchema() in the Spark shell:
root
|-- marketplace: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- review_id: string (nullable = true)
|-- product_title: string (nullable = true)
|-- star_rating: integer (nullable = true)
|-- helpful_votes: integer (nullable = true)
|-- total_votes: integer (nullable = true)
|-- vine: string (nullable = true)
|-- year: integer (nullable = true)
Data analysis
Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. Deequ supports the following metrics (they are defined in this Deequ package):
| Metric | Description | Usage Example |
| --- | --- | --- |
| ApproxCountDistinct | Approximate number of distinct values, computed with HyperLogLogPlusPlus sketches. | ApproxCountDistinct("review_id") |
| ApproxQuantile | Approximate quantile of a distribution. | ApproxQuantile("star_rating", quantile = 0.5) |
| ApproxQuantiles | Approximate quantiles of a distribution. | ApproxQuantiles("star_rating", quantiles = Seq(0.1, 0.5, 0.9)) |
| Completeness | Fraction of non-null values in a column. | Completeness("review_id") |
| Compliance | Fraction of rows that comply with the given column constraint. | Compliance("top star_rating", "star_rating >= 4.0") |
| Correlation | Pearson correlation coefficient, which measures the linear correlation between two columns. The result is in the range [-1, 1], where 1 means positive linear correlation, -1 means negative linear correlation, and 0 means no correlation. | Correlation("total_votes", "star_rating") |
| CountDistinct | Number of distinct values. | CountDistinct("review_id") |
| DataType | Distribution of data types such as Boolean, Fractional, Integral, and String. The resulting histogram allows filtering by relative or absolute fractions. | DataType("year") |
| Distinctness | Fraction of distinct values of a column over the number of all values of a column. Distinct values occur at least once. Example: [a, a, b] contains two distinct values a and b, so distinctness is 2/3. | Distinctness("review_id") |
| Entropy | Entropy is a measure of the level of information contained in an event (value in a column) when considering all possible events (values in a column). It is measured in nats (natural units of information). Entropy is estimated using observed value counts as the negative sum of (value_count/total_count) * log(value_count/total_count). Example: [a, b, b, c, c] has three distinct values with counts [1, 2, 2]. Entropy is then (-1/5*log(1/5)-2/5*log(2/5)-2/5*log(2/5)) = 1.055. | Entropy("star_rating") |
| Maximum | Maximum value. | Maximum("star_rating") |
| Mean | Mean value; null values are excluded. | Mean("star_rating") |
| Minimum | Minimum value. | Minimum("star_rating") |
| MutualInformation | Mutual information describes how much information about one column (one random variable) can be inferred from another column (another random variable). If the two columns are independent, mutual information is zero. If one column is a function of the other column, mutual information is the entropy of the column. Mutual information is symmetric and nonnegative. | MutualInformation(Seq("total_votes", "star_rating")) |
| PatternMatch | Fraction of rows that comply with a given regular expression. | PatternMatch("marketplace", pattern = raw"\w{2}".r) |
| Size | Number of rows in a DataFrame. | Size() |
| Sum | Sum of all values of a column. | Sum("total_votes") |
| UniqueValueRatio | Fraction of unique values over the number of all distinct values of a column. Unique values occur exactly once; distinct values occur at least once. Example: [a, a, b] contains one unique value b and two distinct values a and b, so the unique value ratio is 1/2. | UniqueValueRatio("star_rating") |
| Uniqueness | Fraction of unique values over the number of all values of a column. Unique values occur exactly once. Example: [a, a, b] contains one unique value b, so uniqueness is 1/3. | Uniqueness("star_rating") |
In the following example, we show how to use the AnalysisRunner to define the metrics you are interested in. You can run the code in the Spark shell either by pasting it directly into the shell or by saving it in a local file on the master node and loading it with the following command:
:load PATH_TO_FILE
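The listing below is a sketch of such an analysis, based on Deequ’s AnalysisRunner API; the analyzers chosen here mirror the metrics shown in the results table that follows.

```scala
import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{ApproxCountDistinct, Completeness, Compliance, Correlation, Mean, Size}

val analysisResult: AnalyzerContext = { AnalysisRunner
  // data on which to run the analysis
  .onData(dataset)
  // analyzers that define which metrics to compute
  .addAnalyzer(Size())
  .addAnalyzer(Completeness("review_id"))
  .addAnalyzer(ApproxCountDistinct("review_id"))
  .addAnalyzer(Mean("star_rating"))
  .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0"))
  .addAnalyzer(Correlation("total_votes", "star_rating"))
  .addAnalyzer(Correlation("total_votes", "helpful_votes"))
  .run()
}

// convert the computed metrics into a Spark DataFrame
val metrics = successMetricsAsDataFrame(spark, analysisResult)
```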
The resulting data frame contains the calculated metrics (call metrics.show() in the Spark shell):
| name | instance | value |
| --- | --- | --- |
| ApproxCountDistinct | review_id | 3010972 |
| Completeness | review_id | 1 |
| Compliance | top star_rating | 0.74941 |
| Correlation | helpful_votes,total_votes | 0.99365 |
| Correlation | total_votes,star_rating | -0.03451 |
| Mean | star_rating | 4.03614 |
| Size | * | 3120938 |
We can learn that:
- review_id has no missing values and approximately 3,010,972 unique values.
- 74.9% of reviews have a star_rating of 4 or higher.
- total_votes and star_rating are not correlated.
- helpful_votes and total_votes are strongly correlated.
- The average star_rating is 4.0.
- The dataset contains 3,120,938 reviews.
Define and Run Tests for Data
After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.
For writing tests on data, we start with the VerificationSuite and add Checks on attributes of the data. In this example, we test for the following properties of our data:
- There are at least 3 million rows in total.
- review_id is never NULL.
- review_id is unique.
- star_rating has a minimum of 1.0 and a maximum of 5.0.
- marketplace only contains “US”, “UK”, “DE”, “JP”, or “FR”.
- year does not contain negative values.
The code below reflects the previous statements. For information about all available checks, see this GitHub repository. You can run it directly in the Spark shell as previously explained.
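The following sketch expresses these checks with Deequ’s VerificationSuite and Check API; the check name and the exact assertion style are illustrative.

```scala
import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}

val verificationResult: VerificationResult = { VerificationSuite()
  // data on which to run the verification
  .onData(dataset)
  // define the data quality check
  .addCheck(
    Check(CheckLevel.Error, "Review Check")
      .hasSize(_ >= 3000000)               // at least 3 million rows
      .hasMin("star_rating", _ == 1.0)     // minimum star_rating is 1.0
      .hasMax("star_rating", _ == 5.0)     // maximum star_rating is 5.0
      .isComplete("review_id")             // review_id is never NULL
      .isUnique("review_id")               // review_id contains no duplicates
      .isComplete("marketplace")           // marketplace is never NULL
      // marketplace only contains the listed values
      .isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR"))
      .isNonNegative("year"))              // year contains no negative values
  // compute the required metrics and verify the check conditions
  .run()
}

// convert the check results into a Spark DataFrame
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
```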
After calling run, Deequ translates your test description into a series of Spark jobs, which are executed to compute metrics on the data. Afterwards, it invokes your assertion functions (e.g., _ == 1.0 for the minimum star-rating check) on these metrics to see if the constraints hold on the data.
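In a data pipeline, you would typically gate downstream processing on the overall outcome of the verification, as described in the “Deequ at Amazon” section above. A minimal sketch, using the verificationResult value from the previous listing:

```scala
import com.amazon.deequ.checks.CheckStatus

// stop dataset publication (or alert producers) if any check failed
if (verificationResult.status != CheckStatus.Success) {
  println("Data quality checks failed; stopping dataset publication.")
}
```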
Call resultDataFrame.show(truncate=false) in the Spark shell to inspect the result. The resulting table shows the verification result for every test, for example:
| constraint | constraint_status | constraint_message |
| --- | --- | --- |
| SizeConstraint(Size(None)) | Success | |
| MinimumConstraint(Minimum(star_rating,None)) | Success | |
| MaximumConstraint(Maximum(star_rating,None)) | Success | |
| CompletenessConstraint(Completeness(review_id,None)) | Success | |
| UniquenessConstraint(Uniqueness(List(review_id))) | Failure | Value: 0.9926566948782706 does not meet the constraint requirement! |
| CompletenessConstraint(Completeness(marketplace,None)) | Success | |
| ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,marketplace IS NULL OR marketplace IN ('US','UK','DE','JP','FR'),None)) | Success | |
| ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)) | Success | |
Interestingly, the review_id column is not unique, which resulted in a failure of the check on uniqueness.
We can also look at all the metrics that Deequ computed for this check:
VerificationResult.successMetricsAsDataFrame(spark, verificationResult).show(truncate=false)
Result:
| name | instance | value |
| --- | --- | --- |
| Completeness | review_id | 1 |
| Completeness | marketplace | 1 |
| Compliance | marketplace contained in US,UK,DE,JP,FR | 1 |
| Compliance | year is non-negative | 1 |
| Maximum | star_rating | 5 |
| Minimum | star_rating | 1 |
| Size | * | 3120938 |
| Uniqueness | review_id | 0.99266 |
Automated Constraint Suggestion
If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see this GitHub repository.
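A sketch of running the constraint suggestion with Deequ’s ConstraintSuggestionRunner follows; the flattening into a DataFrame assumes the constraintSuggestions map and the description and codeForConstraint fields of the suggestion objects.

```scala
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}

// profile the data and apply the default set of suggestion rules
val suggestionResult = ConstraintSuggestionRunner()
  .onData(dataset)
  .addConstraintRules(Rules.DEFAULT)
  .run()

// flatten the suggestions (one row per suggested constraint) into a DataFrame;
// spark.implicits._ is available automatically in the Spark shell
import spark.implicits._
val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap {
  case (column, suggestions) =>
    suggestions.map { suggestion =>
      (column, suggestion.description, suggestion.codeForConstraint)
    }
}.toSeq.toDF("column", "constraint", "scala code")
```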
The result contains a list of constraints with descriptions and Scala code, so that you can directly apply it in your data quality checks. Call suggestionDataFrame.show(truncate=false) in the Spark shell to inspect the suggested constraints; here we show a subset:
| column | constraint | scala code |
| --- | --- | --- |
| customer_id | 'customer_id' is not null | .isComplete("customer_id") |
| customer_id | 'customer_id' has type Integral | .hasDataType("customer_id", ConstrainableDataTypes.Integral) |
| customer_id | 'customer_id' has no negative values | .isNonNegative("customer_id") |
| helpful_votes | 'helpful_votes' is not null | .isComplete("helpful_votes") |
| helpful_votes | 'helpful_votes' has no negative values | .isNonNegative("helpful_votes") |
| marketplace | 'marketplace' has value range 'US', 'UK', 'DE', 'JP', 'FR' | .isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR")) |
| product_title | 'product_title' is not null | .isComplete("product_title") |
| star_rating | 'star_rating' is not null | .isComplete("star_rating") |
| star_rating | 'star_rating' has no negative values | .isNonNegative("star_rating") |
| vine | 'vine' has value range 'N', 'Y' | .isContainedIn("vine", Array("N", "Y")) |
Note that the constraint suggestion is based on heuristic rules and assumes that the data it profiles is correct, which might not be the case. We recommend reviewing the suggestions before applying them in production.
More Examples on GitHub
You can find examples of more advanced features at Deequ’s GitHub page:
- Deequ not only provides data quality checks with fixed thresholds. Learn how to use anomaly detection on data quality metrics to apply tests on metrics that change over time (see the sketch after this list).
- Deequ offers support for storing and loading metrics. Learn how to use the MetricsRepository for this use case.
- If your dataset grows over time or is partitioned, you can use Deequ’s incremental metrics computation capability. For each partition, Deequ stores a state for each computed metric. To compute metrics for the union of partitions, Deequ can use these states to efficiently derive overall metrics without reloading the data.
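To give a flavor of the first two items, here is a minimal sketch that stores metrics in an InMemoryMetricsRepository and runs an anomaly check on the dataset size; the class and method names are assumed from Deequ’s documentation rather than from this post, and a real setup would accumulate metrics over several runs.

```scala
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers.Size
import com.amazon.deequ.anomalydetection.RelativeRateOfChangeStrategy
import com.amazon.deequ.checks.CheckStatus
import com.amazon.deequ.repository.ResultKey
import com.amazon.deequ.repository.memory.InMemoryMetricsRepository

// repository that keeps metrics of previous runs (file- and S3-backed variants exist as well);
// the anomaly check compares against the metrics already stored here
val metricsRepository = new InMemoryMetricsRepository()

// key under which the metrics of today's run are stored
val todaysKey = ResultKey(System.currentTimeMillis(), Map("dataset" -> "reviews"))

val result = VerificationSuite()
  .onData(dataset)
  .useRepository(metricsRepository)
  .saveOrAppendResult(todaysKey)
  // flag an anomaly if the dataset grew by more than a factor of 2 since the last run
  .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease = Some(2.0)), Size())
  .run()

if (result.status != CheckStatus.Success) {
  println("Anomaly detected in the dataset size!")
}
```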
Additional Resources
Learn more about the inner workings of Deequ in our VLDB 2018 paper “Automating large-scale data quality verification.”
Deequ is also used within Amazon SageMaker Model Monitor. You can also read Accelerate data preparation with data quality and insights, and learn how data science teams can use Amazon SageMaker Data Wrangler to quickly verify data quality and detect abnormalities in their data.
Conclusion
This blog post showed you how to use Deequ for calculating data quality metrics, verifying data quality metrics, and profiling data to automate the configuration of data quality checks. Deequ is available for you now to build your own data quality management pipeline.
About the Authors
Philipp Schmidt is an Applied Science Manager at Amazon Search. Philipp’s team develops algorithms for improving the search customer experience through machine learning and data quality tracking. After his graduation from TU Berlin, he worked at the University of Potsdam and in several startups in Berlin. At Amazon, he worked as a Software Development Engineer and Applied Scientist and has led a team in Amazon Search since early 2022.
Dustin Lange is an Applied Science Manager at Amazon Search in Berlin. Dustin’s team develops algorithms for improving the search customer experience through machine learning and data quality tracking. He completed his PhD in similarity search in databases in 2013 and started his Amazon career as an Applied Scientist in forecasting the same year.
Sebastian Schelter is a Senior Applied Scientist at Amazon Search, working on problems at the intersection of data management and machine learning. He holds a Ph.D. in large-scale data processing from TU Berlin and is an elected member of the Apache Software Foundation, where he currently serves as a mentor for the Apache Incubator.
Tammo Rukat is an Applied Scientist at Amazon Search in Berlin. He holds a PhD in statistical machine learning from the University of Oxford. At Amazon he makes use of the abundance and complexity of the company’s large-scale noisy datasets to contribute to a more intelligent customer experience.