AWS Storage Blog

Integrating custom metadata with Amazon S3 Metadata

Organizations of all sizes face a common challenge: efficiently managing, organizing, and retrieving vast amounts of digital content. From images and videos to documents and application data, businesses are inundated with information that needs to be stored securely, accessed quickly, and analyzed effectively. The ability to extract, manage, and use metadata from this content is crucial for enabling powerful search capabilities, maintaining data governance, and deriving valuable insights.

However, as data volumes grow exponentially, the custom-built systems that many organizations use to capture and track this metadata struggle to scale, often becoming complex and unwieldy, particularly when handling frequent updates, additions, or deletions of objects and their associated metadata. Sharing and integrating metadata across different systems, along with tracking object states, adds further undifferentiated heavy lifting. With Amazon S3 Metadata and Amazon S3 Tables, you can offload the generation and automate the management of this metadata and these object states in a managed, Apache Iceberg-compatible S3 table bucket.

In this post, we demonstrate how to simplify metadata management using S3 Metadata and S3 Tables. Through a practical example, you learn how to integrate S3 Metadata with custom metadata generated by Amazon Rekognition and the Pillow imaging library, stored in an S3 table bucket using an Apache Flink application.

Solution overview

The solution uses an Apache Iceberg-compatible table in an S3 table bucket to store the content metadata of S3 objects uploaded to a general purpose bucket. We use this S3 table in conjunction with an S3 Metadata table to answer questions related to both the content and the current state of objects in the S3 bucket.

The provided sample application builds an enriched event stream from the S3 Event Notifications generated by PUTs and DELETEs to the bucket, and then commits the changes in that stream to an Apache Iceberg table in an S3 table bucket. It uses an AWS Lambda function to extract metadata from the objects’ content, including labels generated by Amazon Rekognition and image dimensions (width and height) extracted with Pillow; an Amazon DynamoDB table to enforce the order of the events; and Apache Flink to batch and commit the events to the Iceberg table in the S3 table bucket.

Figure 1: Solution overview

The flow of data through the system is as follows:

  1. Images are uploaded to the bucket.
  2. S3 Metadata captures and records the object metadata in an S3 table bucket.
  3. S3 Event Notifications are configured to invoke a Lambda function through Amazon EventBridge to capture custom metadata.
  4. Custom metadata about the content of the object is generated through Amazon Rekognition and Pillow, and inserted into a DynamoDB table (a simplified sketch of this step follows the list).
  5. The enriched events are read from the DynamoDB Change Stream by Apache Flink.
  6. Apache Flink writes records to an Apache Iceberg table stored in an S3 table bucket.
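Step 4 is implemented by a Lambda function in the sample repository. To make the step concrete, the following is a simplified Python sketch of what that function does, not the deployed code; the DynamoDB table name and attribute names are illustrative only.

# Simplified sketch of step 4: extract labels with Amazon Rekognition and image
# dimensions with Pillow, then record the enriched event in DynamoDB.
# Table and attribute names are illustrative; the sample repository's function differs.
import io

import boto3
from PIL import Image

rekognition = boto3.client("rekognition")
s3 = boto3.client("s3")
events_table = boto3.resource("dynamodb").Table("ContentMetadataEvents")  # illustrative name


def extract_and_store(bucket: str, key: str, sequencer: str) -> None:
    # Detect up to 10 labels with at least 80% confidence
    labels_response = rekognition.detect_labels(
        Image={"S3Object": {"Bucket": bucket, "Name": key}},
        MaxLabels=10,
        MinConfidence=80,
    )
    labels = [label["Name"] for label in labels_response["Labels"]]

    # Read the image and extract its dimensions with Pillow
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    width, height = Image.open(io.BytesIO(body)).size

    # Record the enriched event; the DynamoDB change stream feeds Apache Flink (step 5)
    events_table.put_item(
        Item={
            "object_key": f"{bucket}/{key}",
            "sequencer": sequencer,
            "labels": labels,
            "width": width,
            "height": height,
        }
    )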

When performing queries, you often use object metadata generated by the S3 Metadata feature or custom content metadata generated by the application, or a combination of both. Both types of metadata are stored in separate tables in the same S3 table bucket.

The role of DynamoDB and Apache Flink

S3 Event Notifications are delivered at least once, but they aren’t guaranteed to arrive in the same order in which the events occurred. On rare occasions, the S3 retry mechanism might cause duplicate S3 Event Notifications for the same object event. In addition, processing can take different amounts of time for mutation operations on the same key, so a later event can finish processing before an earlier one. The DynamoDB conditional-write capability prevents processing an earlier event after a later one has already completed. Records are kept in DynamoDB for only 24 hours before they expire. For more information about event ordering and handling duplicate events, read the AWS Storage Blog post, Manage event ordering and duplicate events with Amazon S3 Event Notifications.
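The following minimal sketch illustrates the conditional-write idea, assuming the S3 event sequencer is stored alongside each record; the table and attribute names are illustrative, and the deployed application's schema may differ.

# Illustrative only: skip an event if an equal or later event for the same key
# was already processed, using a DynamoDB conditional write.
import boto3
from botocore.exceptions import ClientError

events_table = boto3.resource("dynamodb").Table("ContentMetadataEvents")  # illustrative name


def write_if_newer(object_key: str, sequencer: str, attributes: dict) -> bool:
    # S3 event sequencers can be compared lexicographically (after padding to a
    # common length) to establish per-key event order.
    try:
        events_table.put_item(
            Item={"object_key": object_key, "sequencer": sequencer, **attributes},
            ConditionExpression="attribute_not_exists(object_key) OR sequencer < :seq",
            ExpressionAttributeValues={":seq": sequencer},
        )
        return True
    except ClientError as err:
        if err.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return False  # an equal or later event was already recorded; drop this one
        raise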

Apache Iceberg is an open table format for big data analytics tables, which are usually composed of many Apache Parquet files. Inserts, updates, and deletions from an Iceberg table are accomplished by reading the existing table data, inserting, modifying, or deleting rows from the data files, writing out new files with the modified content, and committing the changes to the table’s metadata. This means that row-level inserts or modifications can necessitate reading and writing a large volume of data spread across multiple files. Therefore, Apache Flink is used in this sample application to batch these modifications into bulk writes.
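The Flink application performs these commits for you, but to make the write pattern concrete, the following Spark SQL shows what a batched row-level upsert against an Iceberg table looks like. It is an illustration only: staged_events is a hypothetical temporary view with the same columns as the target table, and the s3tb catalog is configured later in this post.

# Illustration only: a batched upsert against an Iceberg table with Spark SQL.
# `staged_events` is assumed to be a temporary view whose columns match the target table.
spark.sql("""
    MERGE INTO s3tb.default.s3_content_metadata AS t
    USING staged_events AS s
    ON t.bucket = s.bucket AND t.key = s.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")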

Solution components

The AWS Cloud Development Kit (AWS CDK) stack deploys the following resources:

  • An S3 general purpose bucket that serves as the input bucket for image uploads.
  • An Amazon EventBridge rule and an AWS Lambda function that extract custom content metadata using Amazon Rekognition and Pillow.
  • An Amazon DynamoDB table, with a change stream, that enforces event ordering.
  • A Managed Apache Flink application that batches and commits the enriched events.
  • An Apache Iceberg table named s3_content_metadata in the default namespace of your S3 table bucket.

Prerequisites

To deploy the AWS CDK stack, you need the following prerequisites:

  • An AWS account with permissions to create the preceding resources.
  • The AWS CLI, configured with a default AWS Region where S3 Metadata is available.
  • Node.js and npm, with the AWS CDK Toolkit installed.
  • Java and Apache Maven, to build the Flink application package.

To run the queries, install Apache Spark (3.5.2 or higher) with Iceberg (1.6.0 or higher), or use Amazon EMR (7.5 or higher).

Walkthrough

In this solution walkthrough, you perform the following series of steps to set up S3 Metadata and utilize it with custom metadata.

  1. Create an S3 table bucket.
  2. Deploy AWS CDK stack.
  3. Configure S3 Metadata.
  4. Start Managed Apache Flink application.
  5. Upload sample images to input bucket.
  6. Run queries to integrate custom metadata with Amazon S3 Metadata.

1. Create an S3 table bucket

Configure your AWS CLI with a default AWS Region where S3 Metadata is available. Then, create an S3 table bucket.

aws s3tables create-table-bucket --name mdblogtb --region us-east-1

Store the Amazon Resource Name (ARN) from the output, as you use it for the AWS CDK deployment. You can also use the aws s3tables list-table-buckets --region us-east-1 command to list your table buckets. Table bucket names aren’t globally unique, so you can create table buckets with the same name in different AWS Regions.
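If you prefer to script this step in Python, a minimal boto3 sketch follows. It assumes a recent boto3 release that includes the s3tables client, and it prints the full API responses so that you can capture the table bucket ARN.

# Create and list S3 table buckets with boto3 (requires a boto3 version that
# includes the "s3tables" client). Check the printed responses for the table bucket ARN.
import boto3

s3tables = boto3.client("s3tables", region_name="us-east-1")

print(s3tables.create_table_bucket(name="mdblogtb"))
print(s3tables.list_table_buckets())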

2. Deploy AWS CDK stack

The sample code is available for download on GitHub. Start by cloning the repository.

git clone https://github.com/aws-samples/amazon-s3-contentmetadata.git
cd amazon-s3-contentmetadata

Run the mvn package command to compile, verify, and build the necessary packages in the AWS CDK stack.

# Go to the IcebergProcessor directory
cd IcebergProcessor
mvn package
# Go to main repo directory
cd ..
sudo npm install cdk-nag

Run the AWS CDK commands to bootstrap, synthesize, and deploy the stack. To learn about these steps, refer to the AWS CDK documentation and tutorial. You pass the context value for the S3 table bucket ARN so that AWS CDK can use it to create the new Iceberg table that stores the custom metadata.

# Pass the ARN of the newly created S3 table bucket
# Make sure you are in main directory where you copied git repo `amazon-s3-contentmetadata`
cdk bootstrap --context s3_tables_bucket_arn=<ARN_TABLE_BUCKET>
cdk synth --context s3_tables_bucket_arn=<ARN_TABLE_BUCKET>

Figure 2 shows the output of the CDK bootstrap command.

Figure 2: AWS CDK bootstrap

Figure 3 shows the output of the CDK synth command.

Figure 3: AWS CDK synth in progress

Deploy the stack:

cdk deploy --context s3_tables_bucket_arn=<ARN_TABLE_BUCKET>

The deployment process identifies changes. Review the changes and accept them by choosing y at the prompt, as shown in Figure 4.

Figure 4: AWS CDK deploy

Upon successful completion, you should see output with the name of the new S3 general purpose input bucket and the Flink ApplicationName, as shown in Figure 5. You can view stack deployment details in AWS CloudFormation in the AWS Management Console. The new input S3 bucket is used to upload objects for custom metadata extraction.

Figure 5: AWS CDK output

Store the output in a text editor, as you use this information in future configuration steps.

3. Configure S3 Metadata

Create an S3 Metadata configuration on the new general purpose bucket as per the documentation. Enter the table bucket ARN that you captured during table bucket creation and use mdblogs3metadata as the metadata table name. Finally, choose Create metadata configuration, as shown in Figure 6.

Figure 6: S3 Metadata configuration

In the output, note the new namespace aws_s3_metadata created in the provided table bucket to store the S3 generated metadata. This namespace name is system-generated and can’t be changed. Record the namespace and table name, as they are used in queries.
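If you would rather script the configuration than use the console, the following boto3 sketch shows the equivalent call. The method and parameter names follow the S3 API at the time of writing, so verify them against your boto3 version, and replace the placeholder bucket name and ARN with your own values.

# Create the S3 Metadata configuration on the input bucket with boto3.
# Verify the method and parameter shapes against your boto3 version before use.
import boto3

s3 = boto3.client("s3")

s3.create_bucket_metadata_table_configuration(
    Bucket="<s3_input_bucket_from_cdk_output>",
    MetadataTableConfiguration={
        "S3TablesDestination": {
            "TableBucketArn": "<ARN_TABLE_BUCKET>",
            "TableName": "mdblogs3metadata",
        }
    },
)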

4. Start Managed Apache Flink application

Start the Apache Flink application with the application name generated in the AWS CDK deploy output, replacing the name in the following command with your own. If you no longer have the AWS CDK deploy output, use the aws kinesisanalyticsv2 list-applications command to find the ApplicationName.

aws kinesisanalyticsv2 start-application --application-name IcebergProcessor29058B24-85XGQ9V1ZqgV

You can go to the Managed Apache Flink service in the console to check the status. It may take a few minutes for the application to start, so wait until the status changes to Running.
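If you prefer to wait on the status programmatically instead of checking the console, a small boto3 polling loop such as the following works; replace the application name with the one from your AWS CDK output.

# Poll the application status until it reaches RUNNING.
import time

import boto3

flink = boto3.client("kinesisanalyticsv2")
app_name = "<ApplicationName_from_cdk_output>"

while True:
    detail = flink.describe_application(ApplicationName=app_name)["ApplicationDetail"]
    print(f"Application status: {detail['ApplicationStatus']}")
    if detail["ApplicationStatus"] == "RUNNING":
        break
    time.sleep(30)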

At this stage, you have the following tables in your S3 table bucket mdblogtb, with purposes outlined as follows:

  • Namespace aws_s3_metadata, table mdblogs3metadata: stores the object metadata generated by the S3 Metadata feature.
  • Namespace default, table s3_content_metadata: stores the custom content metadata from the deployed application.

5. Upload sample images to input bucket

The GitHub repository includes sample images and a commands.sh script to upload objects to the content input bucket.

cd images
./commands.sh <s3_input_bucket_from_cdk_output>

The script uploads objects with tags, using the key Project and the values Africa or Europe, to simulate scenarios where images come from different production runs. Each production can have different types of content, for example images from jungles featuring elephants, other animals, places, soccer games, or furniture. Amazon Rekognition identifies the content in each image, and the image dimensions are extracted with Pillow. The uploaded files are categorized as follows:

  • Project=Africa: elephant-boy-jungle.jpg, girl-with-elephant.jpg, safari-elephant.jpg (elephant pictures)
  • Project=Africa: ball-in-stadium.jpg, soccer.jpg, man-soccerball.jpg (soccer ball pictures)
  • Project=Africa: tiger-on-tree.jpg (tiger on a tree)
  • Project=Europe: insideroom-table.jpg, coffeemug-table.jpg, furniture-table.jpg (random furniture pictures)
  • Project=Europe: elephant-europe.jpg (elephant), home-office-coffeemug.png (office table with various objects)
  • Project=Africa: landscape-bridge-river.jpg, river-trees-mountain.jpg (outdoor landscape)
  • Project=Europe: landscapes-beach-person.jpg, grapevine.jpg (outdoors)
  • No object tags: mountain-biking-man-jump.jpg, person-walk-park.jpg, jetty-women-boardwalk.jpg (random images with people)

This scenario is common in media storage use cases, but it can be extended and customized to other datasets. Customers often use different buckets or prefixes to store data in different categories, so the use of tags here is optional and for demonstration purposes only.
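For reference, the following boto3 snippet shows roughly what commands.sh does for a single object: upload the image and apply an object tag in one call. The exact calls in the script may differ, and the bucket and file names here are placeholders.

# Upload one image with an object tag (roughly equivalent to one commands.sh upload).
import boto3

s3 = boto3.client("s3")

with open("soccer.jpg", "rb") as image:
    s3.put_object(
        Bucket="<s3_input_bucket_from_cdk_output>",
        Key="soccer.jpg",
        Body=image,
        Tagging="Project=Africa",  # URL-encoded key=value pairs, joined with "&"
    )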

6. Run queries to integrate custom metadata with S3 Metadata

To query the S3 tables, follow the instructions for Amazon EMR. Alternatively, you can run the queries with Apache Spark installed on a server. Start the PySpark session (adjusting the path as required) and replace <table_bucket_arn> with the ARN of your table bucket. The following is a sample command:

/opt/spark/bin/pyspark \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0,software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.3 \
--conf "spark.driver.extraJavaOptions=-Djava.security.manager=allow" \
--conf "spark.executor.extraJavaOptions=-Djava.security.manager=allow" \
--conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
--conf "spark.sql.catalog.s3tb=org.apache.iceberg.spark.SparkCatalog" \
--conf "spark.sql.catalog.s3tb.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog" \
--conf "spark.sql.catalog.s3tb.warehouse=<table_bucket_arn>" \
--master "local[*]"

Next, run queries to identify objects in the bucket based on the state of the objects and their object tags (using the S3 Metadata table) and the type of content (using the custom content metadata table). First, run the following queries to see sample record entries and the table schema in both tables. It may take a few minutes for records to appear in the mdblogs3metadata table after objects are uploaded.

# show sample records from s3 metadata table 
spark.sql("SELECT * FROM s3tb.aws_s3_metadata.mdblogs3metadata LIMIT 2").show()
# show sample records from custom metadata table
spark.sql("SELECT * FROM s3tb.default.s3_content_metadata LIMIT 2").show()

Sample output of the query is provided in Figure 7 for your reference.

Figure 7: Sample table output

Scenario #1
For example, you want to identify the objects, their storage class, and object tags associated with Project Africa where there is an Elephant in the image. This may be due to a marketing campaign for this shoot, or perhaps the images need to be licensed, distributed, or reused for another project.

spark.sql(""" SELECT om.key, om.storage_class, om.object_tags, cm.labels \
FROM s3tb.aws_s3_metadata.mdblogs3metadata as om \
JOIN s3tb.default.s3_content_metadata as cm ON om.bucket = cm.bucket \
AND om.key = cm.key \
WHERE om.object_tags['Project'] = 'Africa' \
AND array_contains(cm.labels, 'Elephant') \
""").show(truncate=False)

Figure 8: Output from first query

This query joins the table generated by S3 Metadata with the table that holds the custom content metadata.

In the output, elephant-europe.jpg was not listed, as it was part of Project Europe. To find all the objects with an Elephant in the image regardless of Project, run the following query.

spark.sql(""" SELECT om.key, om.storage_class, om.object_tags, cm.labels \
FROM s3tb.aws_s3_metadata.mdblogs3metadata as om \
JOIN s3tb.default.s3_content_metadata as cm ON om.bucket = cm.bucket \
AND om.key = cm.key \
WHERE array_contains(cm.labels, 'Elephant') \
""").show(truncate=False)

Figure 9: Output from second query

Scenario #2
To find just the objects where Soccer Ball appears in the labels, run the following query. In the provided sample, these images are tagged with Project Africa. If objects carry multiple tags, you can filter on them, or simply collect a list of the matching images in the bucket.

spark.sql(""" \
SELECT om.key, om.storage_class, om.record_type, om.object_tags, cm.labels \
FROM s3tb.aws_s3_metadata.mdblogs3metadata AS om \
JOIN s3tb.default.s3_content_metadata AS cm \
ON om.bucket = cm.bucket AND om.key = cm.key AND om.e_tag = cm.etag \
WHERE array_contains(labels, "Soccer Ball") \
ORDER BY om.sequence_number \
""").show(truncate=False)

Figure 10: Output from query without coalesced view

There are multiple entries for soccer.jpg. To understand this, first review the commands.sh script used to upload objects. You should see the following sequence of object operations and associated record_types:

  • The object was first uploaded with the wrong tag Project=WrongProject. [record_type CREATE ]
  • The object was deleted. This record doesn’t show up because the query matches on ETag; if you run the query without the ETag match, you will see this record. [record_type DELETE]
  • The object was uploaded again with the tag Project=Africa. [record_type CREATE and object_tags Project=Africa]
  • Object tags were modified using the put-object-tagging operation, and the new tag sets Object Info=Soccer Ball and Project=Africa were added. [record_type UPDATE_METADATA and object_tags Object Info=Soccer Ball and Project=Africa]

The reason for multiple entries for soccer.jpg in the S3 Metadata table mdblogs3metadata is that each row represents a mutation event that created, updated, or deleted an object in your general purpose bucket. In this post, these events are the result of user actions simulated with commands.sh, but some can be the result of actions taken by S3 on your behalf, such as S3 Lifecycle expirations or storage class transitions. S3 Metadata is an event-processing pipeline designed to keep the metadata table eventually consistent with the changes that occur in your general purpose bucket. This provides the state of each object, eliminating the need for complex tracking systems.
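You can see this mutation history directly by querying the S3 Metadata table for a single key; adjust the key filter if your uploads use a prefix.

# Inspect the mutation history that S3 Metadata recorded for soccer.jpg
spark.sql("""
    SELECT key, record_type, object_tags, sequence_number
    FROM s3tb.aws_s3_metadata.mdblogs3metadata
    WHERE key LIKE '%soccer.jpg'
    ORDER BY sequence_number
""").show(truncate=False)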

To work only with the latest state of each object, create a coalesced view. The view identifies the most recent version of each object based on sequence numbers and filters out deleted objects. The results are ordered by the bucket, key, and sequence_number columns. Create the view using the following command:

spark.sql(""" \
CREATE TEMPORARY VIEW s3_metadata_coalesced AS ( \
WITH cte as ( \
SELECT * from s3tb.aws_s3_metadata.mdblogs3metadata \
), \
version_stacks as ( \
SELECT *, \
LEAD(sequence_number, 1) over ( \
partition by (bucket, key, version_id) order by sequence_number ASC \
 ) as next_sequence_number \
from cte \
), \
latest_versions as ( \
SELECT * from version_stacks where next_sequence_number is NULL \
), \
extant_versions as ( \
SELECT * from latest_versions where record_type != 'DELETE' \
), \
with_is_latest as ( \
SELECT *, \
sequence_number = (MAX(sequence_number) over (partition by (bucket, key))) as is_latest_version \
FROM extant_versions \
) \
SELECT * from with_is_latest \
ORDER BY bucket, key, sequence_number \
) \
""").show()

We didn’t create a view for the custom metadata, because the application deletes the record from the custom content metadata table for any DELETE operation and adds a record for PUT object events. You can view the EventBridge rule deployed by AWS CDK to see the event types that invoke custom content metadata creation through a Lambda function.

We can re-run the query to find all the images with a Soccer Ball, but this time we use the new coalesced view.

spark.sql(""" \
  SELECT om.key, om.storage_class, om.record_type, om.object_tags,cm.labels \
  FROM s3_metadata_coalesced AS om \
  JOIN s3tb.default.s3_content_metadata AS cm \
  ON om.bucket = cm.bucket AND om.key = cm.key AND om.e_tag = cm.etag \
  WHERE array_contains(labels, "Soccer Ball") \
  ORDER BY om.sequence_number \
  """).show(truncate=False)

Figure 11: Output from query with coalesced view

This time, only the single latest record for the object soccer.jpg appears in the output, since the coalesced view has the latest recorded state of the object as UPDATE_METADATA. If the last operation was a delete, then the object won’t appear in the output, as its corresponding record in the custom content metadata table would have been deleted.
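You can also check the latest recorded state of a single object directly from the coalesced view, for example:

# Latest recorded state of soccer.jpg according to the coalesced view
spark.sql("""
    SELECT key, record_type, storage_class, is_latest_version
    FROM s3_metadata_coalesced
    WHERE key LIKE '%soccer.jpg'
""").show(truncate=False)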

Building workflow pipelines

You can store the output of the queries in a comma-separated values (CSV) file and use it for further processing (a PySpark export sketch follows this list). For example:

  • If you have tiered objects in the S3 Glacier Flexible Retrieval storage class, then you can filter records and use the keys to submit an S3 Batch Operations job to do a restore.
  • You can use an S3 Batch Operations Copy job to copy the objects.
  • You can invoke a Lambda function to process the images into a new format or dimension, or copy them to another bucket for editing.
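The following PySpark sketch shows one way to export a query result as CSV, for example to build a manifest of keys for an S3 Batch Operations job; the query and output path are illustrative.

# Export a query result as CSV for downstream processing (illustrative query and path).
result = spark.sql("""
    SELECT om.key, om.storage_class
    FROM s3_metadata_coalesced AS om
    JOIN s3tb.default.s3_content_metadata AS cm
    ON om.bucket = cm.bucket AND om.key = cm.key
    WHERE array_contains(cm.labels, 'Elephant')
""")

result.coalesce(1).write.mode("overwrite").option("header", True).csv(
    "/tmp/elephant-objects-csv"  # or an S3 path that your Spark environment can write to
)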

Cleaning up

Clean up the deployed resources to avoid future charges. Complete the following steps:

  • Empty the S3 bucket that was used for input/image uploads.
  • Go to the main directory where you cloned the GitHub repository and run cdk destroy, then delete the S3 tables, namespaces, and table bucket using the following commands.
# Go to the main directory where you copied git repo
cdk destroy --context s3_tables_bucket_arn=<ARN_TABLE_BUCKET>

# Delete the S3 tables, namespace & table bucket
# delete table created for custom metadata
aws s3tables delete-table --table-bucket-arn <ARN_TABLE_BUCKET> \
    --namespace default \
    --name s3_content_metadata
# delete table created to store the s3 metadata    
aws s3tables delete-table --table-bucket-arn <ARN_TABLE_BUCKET> \
    --namespace aws_s3_metadata \
    --name mdblogs3metadata
# delete namespaces   
aws s3tables delete-namespace --table-bucket-arn <ARN_TABLE_BUCKET> --namespace default
aws s3tables delete-namespace --table-bucket-arn <ARN_TABLE_BUCKET> --namespace aws_s3_metadata
# delete table bucket
aws s3tables delete-table-bucket --table-bucket-arn <ARN_TABLE_BUCKET>

Conclusion

In this post, we demonstrated a sample solution for extracting custom metadata from objects using Amazon Rekognition and Pillow. This data was stored in an S3 table bucket. We also used S3 Metadata to extract the object metadata, which was also stored in an S3 table bucket. Furthermore, we demonstrated how to use both tables to run queries and extract information about objects based on certain content types. We also shared how to create a coalesced view of the S3 Metadata table to view the latest state of the objects and use it in the queries.

As a next step, we recommend that you review our sample code, and use it as inspiration to meet your own business needs. You can customize the solution by using Amazon Bedrock or your custom content extraction tool, or even share/export data from your existing content management systems or digital asset management solutions into S3 Tables to index, join, and search for the objects and their content metadata.

Overall, S3 Metadata and S3 Tables help automate and eliminate the undifferentiated heavy lifting necessary to maintain the state of objects and their associated metadata. Go build!

Thank you for reading this post. If you have any comments or questions, leave them in the comments section.

Bimal Gajjar

Bimal Gajjar is a Senior Storage Solutions Architect for AWS. Bimal has over 23 years of experience in information technology. At AWS, Bimal focuses on helping Global Accounts architect, adopt, and deploy cloud storage solutions. He started his career in infrastructure solutions with GSIs and spent over two decades with OEMs at HPE, Dell EMC, and Pure Storage as Global Solutions Architect for large accounts.

Carl Summers

Carl Summers is a Principal Software Engineer at Amazon Web Services, with 11 years of experience on the Amazon S3 team. Carl's work focuses on enhancing S3's security, functionality, and integration capabilities. They have had the pleasure of contributing to several S3 features including SSE-C, Event Notifications, CloudTrail integration, and Object Lambda.