AWS Big Data Blog

Entity resolution and fuzzy matches in AWS Glue using the Zingg open source library

In today’s data-driven world, organizations often deal with data from multiple sources, leading to challenges in data integration and governance. AWS Glue, a serverless data integration service, simplifies the process of discovering, preparing, moving, and integrating data for analytics, machine learning (ML), and application development.

One critical aspect of data governance is entity resolution, which involves linking data from different sources that represent the same entity, despite not being exactly identical. This process is crucial for maintaining data integrity and avoiding duplication that could skew analytics and insights.

AWS Glue is based on the Apache Spark framework, and offers the flexibility to extend its capabilities through third-party Spark libraries. One such powerful open source library is Zingg, an ML-based tool, specifically designed for entity resolution on Spark.

In this post, we explore how to use Zingg’s entity resolution capabilities within an AWS Glue notebook, which you can later run as an extract, transform, and load (ETL) job. By integrating Zingg in your notebooks or ETL jobs, you can effectively address data governance challenges and provide consistent and accurate data across your organization.

Solution overview

The use case is the same as that in Integrate and deduplicate datasets using AWS Lake Formation FindMatches.

It consists of a dataset of publications, which has many duplicates because the titles, names, descriptions, or other attributes are slightly different. This often happens when collating information from different sources.

In this post, we use the same dataset and training labels but show how to do it with a third-party entity resolution like the Zingg ML library.

Prerequisites

To follow this post, you need the following:

Set up the required files

To run the notebook (or later to run as a job), you need to set up the Zingg library and configuration. Complete the following steps:

  1. Download the Zingg distribution package for AWS Glue 4.0, which uses Spark 3.3.0. The appropriate release is Zingg 0.3.4.
  2. Extract the JAR file zingg-0.3.4-SNAPSHOT.jar inside the tar and upload it to the base of your S3 bucket.
  3. Create a text file named config.json and enter the following content, providing the name of your S3 bucket in the places indicated, and upload the file to the base of your bucket:
{
    "fieldDefinition":[
            {
                    "fieldName" : "title",
                    "matchType" : "fuzzy",
                    "fields" : "fname",
                    "dataType": "\"string\""
            },
            {
                    "fieldName" : "authors",
                    "matchType" : "fuzzy",
                    "fields" : "fname",
                    "dataType": "\"string\""
            },
            {
                    "fieldName" : "venue",
                    "matchType" : "fuzzy",
                    "fields" : "fname",
                    "dataType": "\"string\""
            },
            {
                    "fieldName" : "year",
                    "matchType" : "fuzzy",
                    "fields" : "fname",
                    "dataType": "\"double\""
            }
    ],
    "output" : [{
            "name":"output",
            "format":"csv",
            "props": {
                    "location": "s3://<your bucket name>/matchOuput/",
                    "delimiter": ",",
                    "header":true
            }
    }],
    "data" : [{
            "name":"dblp-scholar",
            "format":"json",
            "props": {
                    "location": "s3://ml-transforms-public-datasets-us-east-1/dblp-scholar/records/dblp_scholar_records.jsonl"
            },
            "schema":
                    "{\"type\" : \"struct\",
                    \"fields\" : [
                            {\"name\":\"id\", \"type\":\"string\", \"nullable\":false},
                            {\"name\":\"title\", \"type\":\"string\", \"nullable\":true},
                            {\"name\":\"authors\",\"type\":\"string\",\"nullable\":true} ,
                            {\"name\":\"venue\", \"type\":\"string\", \"nullable\":true},
                            {\"name\":\"year\", \"type\":\"double\", \"nullable\":true},
                            {\"name\":\"source\",\"type\":\"string\",\"nullable\":true}
                    ]
            }"
    }],
    "numPartitions":4,
    "modelId": 1,
    "zinggDir": "s3://<your bucket name>/models"
}

You can also define the configuration programmatically, but using JSON makes it more straightforward to visualize and allows you to use it in the Zingg command line tool. Refer to the library documentation for further details.

Set up the AWG Glue notebook

For simplicity, we use an AWS Glue notebook to prepare the training data, build a model, and find matches. Complete the following steps to set up the notebook with the Zingg libraries and config files that you prepared:

  1. On the AWS Glue console, choose Notebooks in the navigation pane.
  2. Choose Create notebook.
  3. Leave the default options and choose a role suitable for notebooks.
  4. Add a new cell to use for Zingg-specific configuration and enter the following content, providing the name of your bucket:

%extra_jars s3://<your bucket>/zingg-0.3.4-SNAPSHOT.jar
%extra_py_files s3://<your bucket>/config.json
%additional_python_modules zingg==0.3.4

notebook setup cell

  1. Run the configuration cell. It’s important that this is done before running any other cell because the configuration changes won’t apply if the session is already started. If that happens, create and run a cell with the content %stop_session. This will stop the session but not the notebook, so when you run a cell will code, it will start a new one, using all the configuration settings you have defined at that moment.
    Now the notebook is ready to start the session.
  1. Create a session using the setup cell provided (labeled: “Run this cell to set up and start your interactive session”).
    After a few seconds, you should get a message indicating the session has been created.

Prepare the training data

Zingg enables providing sample training pairs as well as interactively defining them by an expert; in the latter, the algorithm finds examples that it considers meaningful and asks an expert if it’s a match, if it’s not, or if the expert can’t decide. The algorithm can work with a few samples of matches and non-matches, but the larger the training data, the better.

In this example, we reuse the labels provided in the original post, which assigns the samples to groups of rows (called clusters) instead of labeling individual pairs. Because we need to transform that data, we can convert it to the format that Zingg uses internally, so we skip having to configure the training samples definition and format. To learn more about the configuration that would be required, refer to Using pre-existing training data.

  1. In the notebook with the session started, add a new cell and enter the following code, providing the name of your own bucket:
bucket_name = "<your bucket name>"

spark.read.csv(
    "s3://ml-transforms-public-datasets-us-east-1/dblp-scholar/labels/dblp_scholar_labels_350.csv"
    , header=True).createOrReplaceTempView("labeled")

spark.sql("""
SELECT book.id as z_zid, "sample" as z_source, z_cluster, z_isMatch,
           book.title, book.authors, book.venue, CAST(book.year AS DOUBLE) as year, book.source
FROM(
    SELECT explode(pair) as book, *
    FROM(
        SELECT (a.label == b.label) as z_isMatch, array(struct(a.*), 
               struct(b.*)) as pair, uuid() as z_cluster
        FROM labeled a, labeled b 
        WHERE a.labeling_set_id = b.labeling_set_id AND a.id != b.id
))
""").write.mode("overwrite").parquet(f"s3://{bucket_name}/models/1/trainingData/marked/")
print("Labeled data ready")
  1. Run the new cell. After a few seconds, it will print the message indicating the labeled data is ready.

Build the model and find matches

Create and run a new cell with the following content:

sc._jsc.hadoopConfiguration().set('fs.defaultFS', f's3://{bucket_name}/')
sc._jsc.hadoopConfiguration().set('mapred.output.committer.class', "org.apache.hadoop.mapred.FileOutputCommitter")

from zingg.client import Arguments, ClientOptions, FieldDefinition, Zingg
zopts = ClientOptions(["--phase", "trainMatch",  "--conf", "/tmp/config.json"])
zargs = Arguments.createArgumentsFromJSON(zopts.getConf(), zopts.getPhase())
zingg = Zingg(zargs, zopts)
zingg.init()
zingg.execute()

Because it’s doing both training and matching, it will take a few minutes to complete. When it’s complete, the cell will print the options used.

If there is an error, the information returned to the notebook might not be enough to troubleshoot, in which case you can use Amazon CloudWatch. On the CloudWatch console, choose Log Groups in the navigation pane, then under /aws-glue/sessions/error, find the driver log using the timestamp or the session ID (the driver is the one with just the ID without any suffix).

Explore the matches found by the algorithm

As per the Zingg configuration, the previous step produced a CSV file with the matches found on the original JSON data. Create and run a new cell with the following content to visualize the matches file:

from pyspark.sql.functions import col
spark.read.csv(f"s3://{bucket_name}/matchOuput/", header=True) \
    .withColumn("z_cluster", col("z_cluster").cast('int')) \
    .drop("z_minScore", "z_maxScore") \
    .sort(col("z_cluster")).show(100, False)

It will display the first 100 rows with clusters assigned. If the cluster assigned is the same, then the publications are considered duplicates.

Athena results

For instance, in the preceding screenshot, clusters 0 or 20 are spelling variations of the same title, with some incomplete or incorrect data in other fields. The publications appear as duplicates in these cases.

As in the original post with FindMatches, it struggles with editor’s notes and cluster 12 has more questionable duplicates, where the title and venue are similar, but the completely different authors suggest it’s not a duplicate and the algorithm needs more training with examples like this.

You can also run the notebook as a job, either choosing Run or programmatically, in which case you want to remove the cell you created earlier to explore the output, as well as any other cells that are not needed to do the entity resolution, such as the sample cells provided when you created the notebook.

Additional considerations

As part of the notebook setup, you created a configuration cell with three configuration magics. You could replace these with the ones in the setup cell provided, as long as they are listed before any Python code.

One of them specifies the Zingg configuration JSON file as an extra Python file, even though it’s not really a Python file. This is so it gets deployed on the cluster under the /tmp directory and it’s accessible by the library. You could also specify the Zingg configuration programmatically using the library’s API, and not require the config file.

In the cell that builds and runs the model, there are two lines that adjust the Hadoop configuration. This is required because the library was designed to run on HDFS instead of Amazon S3. The first one configures the default file system to use the S3 bucket, so when it needs to produce temporary files, they are written there. The second one restores the default committer instead of the direct one that AWS Glue configures out of the box.

The Zingg library is invoked with the phase trainMatch. This is a shortcut to do both the train and match phases in one call. It works the same as when you invoke a phase in the Zingg command line that is often used as an example in the Zingg documentation.

If you want to do incremental matches, you could run a match on the new data and then a linking phase between the main data and the new data. For more information, see Linking across datasets.

Clean up

When you navigate away from the notebook, the interactive session should be stopped. You can verify it was stopped on the AWS Glue console by choosing Interactive Sessions in the navigation pane and then sorting by status, to check if any are running and therefore generating charges. You can also delete the files in the S3 bucket if you don’t intend to use them.

Conclusion

In this post, we showed how you can incorporate a third-party Apache Spark library to extend the capabilities of AWS Glue and give you the freedom of choice. You can use your own data in the same way, and then integrate this entity resolution as part of a workflow using a tool such as Amazon Managed Workflows for Apache Airflow (Amazon MWAA).

If you have any questions, please leave them in the comments.


About the Authors

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team, with a background in machine learning and AI.

Emilio Garcia Montano is a Solutions Architect at Amazon Web Services. He works with media and entertainment customers and supports them to achieve their outcomes with machine learning and AI.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.