AWS Storage Blog

Zalando handles millions of objects with Amazon S3 event notifications

As mentioned the last time we made a guest appearance on this blog, Zalando is Europe’s leading online platform for fashion and lifestyle, with over 34 million active customers. The first Zalando blog post, authored by a colleague of mine and data engineer Max Schultze, discussed our data lake on Amazon S3. I am Ire Sun, a data engineer, and I work on ingestion pipelines and storage management at Zalando. Max enjoyed sharing some of his learnings on this blog channel. After he finished up, he recommended I share another use case of ours to spread more information on how useful some AWS services have been to our business.

At Zalando, we have datasets materialized in Amazon S3 that feed input to downstream pipelines, such as personalized recommendations, sizing suggestions, and sales forecasting. To produce the most accurate model and forecasting, we must react on the S3 object creation as soon as possible. In this blog post, I cover how Zalando leverages Amazon S3 event notifications and AWS Lambda to build a reactive and highly scalable data flow, which enables Zalando to produce high-quality data products.

Event-bus integration

At Zalando, we have an internal event-bus (so-called Nakadi) to serve as a communication channel between services. There are 4200 event-types in Zalando. Event-types are a concept similar to topics in Kafka, and each event-type contains events of corresponding action (for example, change of wishlist, submission of a search query, etc.).

The events history raw data is moved into Amazon S3, which creates around 60 million S3 objects per day, to be analyzed and processed. With millions of small JSON files that are non-splittable and semistructured, the raw data is not in the best format for query engines like Presto or Spark. Besides that, each event-type could have a different optimal partition key based on the use case.

In Nakadi, we can roughly divide the event-types into two categories:

  • Compatible schema: cannot have a schema-breaking change (for example, column renaming or type change) without creating a new one.
  • Incompatible schema: can have schema-breaking changes if necessary.

And we wanted to meet the following goals for Zalando’s data lake to best deal with raw data:

  • For both compatible and incompatible event-types, we wanted to provide a repartitioned JSON format to the user, with the dataset repartitioned based on the use case. We did so to enable users to have a decent performance in the query engine.
  • For compatible-schema event-types, we wanted to provide a fully structured Parquet format, so that users can execute their queries efficiently.
  • For users that have the capability to write their own ETL pipeline, we wanted to provide a channel for them to read the raw data in their applications.

To achieve the preceding goals, we configured an Amazon S3 event notification. The Amazon S3 notification feature enables you to receive notifications when certain events happen in your bucket. To enable notifications, first add a notification configuration that identifies the events you want S3 to publish and the destinations where you want S3 to send the notifications.

The Amazon S3 event notification we created is an event to an Amazon SQS (SQS) queue whenever an object is uploaded to a specific bucket and prefix. The SQS queue is integrated with an AWS Lambda function. The AWS Lambda function forwards the same message body to an Amazon SNS (SNS) topic with the Nakadi event-type as an additional message attribute. From this point, the fan-out functionality of SNS enables us to branch out to different targets.

In addition to the main SQS queue that is receiving messages from S3 Bucket, we created another SQS queue as the dead-letter-queue (Amazon SQS DLQ) of the main queue. Whenever a message in the main queue failed to be processed by the Lambda for N times, the message would be moved to the DLQ. We integrate the DLQ with Lambda, but the integration is usually disabled. Only when we observe messages in DLQ and resolve the issue do we temporarily enable the integration of DLQ, to reprocess the failed messages.

Figure 1 - Forwarding Lambda with message attribute

Figure 1: Forwarding Lambda with message attribute

For the repartition-only JSON format, we created an SQS queue that is subscribed to the SNS topic without any condition. The SQS queue integrates with a Lambda function, which repartitions the data and writes to the destination S3 bucket.

Figure 2 - Repartition Lambda

Figure 2: Repartition Lambda

For the fully structured Parquet format, we created one Amazon SQS queue per Nakadi event-type. We subscribed the SQS queue to the Amazon SNS topic with an SNS filter policy to filter by the message attribute (event-type) we added in the previous step. As we scaled out the service, we noticed that one AWS account can only have up-to 200 filter-policies per default. Therefore, we implemented an AWS Lambda function that replaced the Amazon SNS filter-policy by tagging the SQS queue with its Nakadi event-type, and searched the corresponding queue with Resource Groups Tagging API. With this approach, we can easily scale-out the filter-condition to 1000+ destination queues.

The Amazon SQS queue is connected to a Spark application with the S3-SQS connector (open-source version), which takes Amazon S3 notifications as streaming input and provides the object-content in the data frame. At this point, we are able to apply the compatible schema and write larger (but fewer) files to the destination bucket as Delta format.

Figure 3 - Spark S3-SQS connector

Figure 3: Spark S3-SQS connector

For a custom ETL pipeline, we grant the user accounts the right to subscribe to the Amazon SNS topic and read the data from Amazon S3. From this point, the users can do whatever they need, with or without filter-policies in the subscription.

Now the pipelines are ready and datasets are materialized in a more efficient format and distribution.

When reading JSON datasets, the query engines can filter the datasets by partition key to reduce the data scanned to have a faster query performance. And query engines can benefit from the following advantages when reading Delta datasets:

  • Reading the dataset without providing/inferring schema
  • Column statistics for data skipping in addition to partition pruning
  • Creating Spark structure-streaming against the Delta format

Amazon S3 inventory integration

Zalando has more than 4000 S3 buckets across the whole organization. To have better observability to any of the S3 buckets, we configured Amazon S3 inventory for each bucket in the organization. S3 inventory is one of the tools Amazon S3 provides to help manage your storage. You can use it to simplify and speed up business workflows and big data jobs and is a scheduled alternative to the Amazon S3 synchronous List API operation.

We wanted to copy the inventory to a central dataset partitioned by bucket name once the inventory of a bucket is generated. By having the centralized inventory, Spark can read the latest state of a bucket without integrating with symlink.txt each time it reads the inventory of a bucket. Nevertheless, storing the centralized inventory as a Delta table, enables us to time travel to any date of the history of a bucket inventory.

Whenever an inventory is generated, Amazon S3 generates a manifest.json and manifest.checksum as a completion signal. We configured an S3 notification that notifies an SQS queue whenever a manifest.checksum got created. And an AWS Lambda function is fetching the messages from the SQS queue periodically. The AWS Lambda function triggers a new Spark job whenever there is a new inventory complete notification. The Spark job reads from the inventory generated by S3 according to symlink.txt, and overwrites the partition of this bucket in the central inventory.

Figure 4 - Amazon S3 inventory

Figure 4: Amazon S3 inventory

With the centralized inventory, Zalando managed to generate a cost report and forecast for S3 storage across different buckets, and make decisions of storage class transition, whether to keep a dataset, or other cost advice. Nevertheless, whenever we need to operate S3 objects for a huge batch, we can generate the manifest by reading the centralized inventory without the necessity of listing a bucket.

Conclusion

In the end, we used Amazon S3 event notifications to convert millions of S3 objects from thousands of Nakadi event types to a different format. We also trigger S3 inventory pipelines with the creation of checksum files. To achieve the same goal with the S3 listing API, we would need to spend an additional 10,000+ USD per year, not including the operation and engineer cost.

Serverless resources are extremely useful when it comes to operation. They saved us from doing the heavy-lifting of service scaling. We don’t need to worry about if the service failing due to insufficient resources, or even checking it before any campaign. On some of our busiest days, like Black Friday, the only thing we have to do is increase our account’s AWS Lambda function concurrency limit. Increasing the concurrent limit costs nothing, one only gets charged when their services allocated the resources from AWS. Most importantly, engineers get to focus on the feature and pipeline quality, without worrying about the infrastructure.

Amazon S3 event notifications can react when certain events happen in your bucket, from minor operations to object creation. With our notification settings, we are prevented from listing S3 buckets and having to build a database to track when there is a new object. With this feature, it is possible to:

  • Make AWS Lambda react to object creation to facilitate all kinds of transformations, for example, exploding JSON to JSON-lines, converting file formats, and adding metadata/tags to an S3 object.
  • Make Spark read the notifications and parse the newly created objects.
  • Use the notification as a completion message whenever a dataset is completed.

If you, like us, have applications reacting to Amazon S3 object creations, please consider using S3 event notifications with Amazon SQS. Doing so would save you from creating the infrastructure of continuously listing and detecting the S3 object creation and operations. Thanks for reading this blog post, if you have any comments or questions, don’t hesitate to leave them in the comments section.

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.