
Inside Pinterest’s Custom Spark Job logging and monitoring on Amazon EKS: Using AWS for Fluent Bit, Amazon S3, and ADOT

This is Part 1 of a two-part blog post.

Introduction

Pinterest is a visual search and curation platform focused on inspiring users to create a life they love. Critical to the service are the data insights, recommendations, and machine learning (ML) models produced by synthesizing signals from the more than 500 million monthly active users (MAUs) on Pinterest. In the past, much of this work was performed by Monarch, Pinterest’s Hadoop-based batch processing platform, which, at last count, totaled around 14K nodes across many clusters and processed more than 120 PB of data daily. However, challenges such as maintenance difficulties and the inability to easily take advantage of performance and cost-efficiency improvements, such as newer JDKs or EC2 instance types, combined with a growing user base and escalating data processing demands, necessitated a more scalable solution. In particular, a Kubernetes (K8s) based approach offered several benefits:

  • Container based isolation and security as first class citizens
  • Ease of deployment through projects such as AWS Data on EKS
  • Availability of frameworks to provide missing platform functionality
  • Various levers for extracting performance such as flexibility when choosing instance types

To address these challenges and realize the benefits of a more cloud native data processing approach, Pinterest developed Moka, their next-generation Spark on Amazon Elastic Kubernetes Service (Amazon EKS) platform. Over the past year and a half, the batch processing platform team at Pinterest Data Engineering has focused on building Moka and migrating production Spark workloads to it from Monarch. This has been a great success, with about 35% of jobs migrated and more than 50% expected by end of year (EOY). We have observed a corresponding positive impact on our hardware costs, system availability, and operational capabilities. Encouraged by this success, we are now migrating other data platforms to Amazon EKS as well.

However, building Moka involved overcoming a significant number of technical hurdles, some of which have already been described in posts covering the networking setup for Moka: Spark on Amazon EKS networking – Part 1 | Containers and Spark on Amazon EKS networking – Part 2 | Containers. Another critical challenge for teams using the platform is to quickly diagnose issues with Spark jobs and to monitor job performance and status through metrics. Consequently, it was crucial for us to build a highly robust and scalable observability platform on Amazon EKS. In this post, we describe our setup and explain how we resolved some of these issues to build a robust observability and metrics framework for Moka.

Before we delve into observability and logging, we first walk through the design of Moka and the choices we made as part of this journey. This overview provides context on the architecture and decisions that underpin Moka, helping you to understand how we approached creating a scalable and efficient Spark on Amazon EKS platform. We discuss key components, the rationale behind the selection of certain technologies, and how these choices align with our goal of enhancing performance and manageability for Pinterest’s data processing needs.

Moka high level design

The following figure illustrates the initial high level design of Moka.


Figure 1: Initial Moka high level design

In the Moka system, jobs are submitted and processed as follows:

  • Scheduled workflows are decomposed into specific jobs by Spinner, Pinterest’s workflow management and orchestration platform, which transmits them to Archer, a new Amazon EKS Batch Job Submission Service we developed as part of Moka.
  • Archer converts the incoming Spark job specification into a K8s custom resource definition (CRD) in the form of a job definition YAML for the Spark Operator (see the sketch following this list). Then, it submits the CRD to a suitable EKS cluster.
  • Pinterest EKS clusters intended for Spark are augmented by the addition of:
    • Spark Operator, which allows Spark applications to run natively on K8s.
    • Apache YuniKorn, which brings Apache Hadoop YARN style batch scheduling to K8s.
    • Remote Shuffle Service, which allows Spark applications to offload the shuffling to a dedicated service. We are currently using Apache Celeborn to provide this functionality.
  • SparkSQL jobs running on Amazon EKS interact with Hive Metastore to help convert structured query language (SQL) into Spark jobs.
  • The actual job uses container images stored in Amazon Elastic Container Registry (Amazon ECR).
  • When the job is executing:
    • Archer keeps track of its status for upstream status updates.
    • Users can connect to the live Spark user interface (UI) served by the Spark drivers of running jobs in EKS clusters through a proxy network using AWS Network Load Balancers and K8s Ingress resources.
    • Spark application and event logs, along with system pod logs, are continuously streamed to Amazon Simple Storage Service (Amazon S3) using AWS for Fluent Bit.
    • Metrics covering various aspects of the platform are collected by a set of agents and transmitted to Statsboard and other custom dashboards.
  • After job execution, users use the Spark History Server to obtain job records and logs.
  • The Moka UI provides a centralized portal where users can view the (read-only) status of their jobs and connect either to the live UI if the job is running or to the job’s Spark History Server page if it has completed execution.
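To make the Archer to Spark Operator handoff more concrete, the following is a minimal sketch of the kind of SparkApplication CRD such a submission could produce. The names, namespace, image URI, and resource values are illustrative assumptions rather than Pinterest’s actual settings, and the batch scheduler field assumes a Spark Operator version with YuniKorn integration.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  # Hypothetical name; Archer prefixes a job's driver/executor pods with its unique ID
  name: archer-1a2b3c-example-job
  namespace: spark-jobs
spec:
  type: Scala
  mode: cluster
  image: 111122223333.dkr.ecr.us-east-1.amazonaws.com/spark:3.5.0   # pulled from Amazon ECR
  mainClass: com.example.ExampleJob
  mainApplicationFile: s3a://example-bucket/jars/example-job.jar
  sparkVersion: "3.5.0"
  batchScheduler: yunikorn   # assumes a Spark Operator build with YuniKorn batch scheduling
  driver:
    cores: 2
    memory: 8g
    serviceAccount: spark
  executor:
    instances: 10
    cores: 4
    memory: 16g

A production submission would carry additional settings (labels, sparkConf entries, volumes, and so on); the point here is only the overall shape of the CRD that the Spark Operator consumes.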

A key learning when building Moka was that replacing Hadoop means having to find alternatives to the myriad other functionalities that it provides in addition to data processing, such as log management and aggregation. In the remainder of this post, we detail how we integrated other frameworks for logging and monitoring into the Moka platform.

Logging infrastructure

Effective management of the logs output by cluster components and Spark applications is critical to determining how well they are running, identifying issues, and performing post mortem analysis. Our users expect this as a matter of course when running jobs, and thus it was important for us to find an effective alternative to the functionality provided by Hadoop. Broadly, a logging solution for Moka has to consider 1) Amazon EKS control plane logs, 2) Spark application logs, and 3) system pod logs. The following figure illustrates the various log categories.


Figure 2: Amazon EKS log categories

Control plane logs are those generated by the components that constitute the Amazon EKS control plane. These include:

  • API server: Exposes the K8s application programming interface (API).
  • Audit: K8s audit logs provide a record of the individual users, administrators, or system components that have affected the cluster.
  • Authenticator: Authenticator logs are unique to Amazon EKS. They represent the control plane component that Amazon EKS uses for K8s Role-Based Access Control (RBAC) authentication using AWS Identity and Access Management (IAM).
  • Controller manager: The controller manager manages the core control loops that are shipped with K8s.
  • Scheduler: Logs for the default K8s scheduler.

Because the control plane is managed by Amazon EKS, logs for these components are not available directly. Instead, Amazon EKS exports logs for each of the components to Amazon CloudWatch. Each log type listed previously can be enabled or disabled independently. Once the logs are ingested into CloudWatch, AWS recommends analyzing them in place using CloudWatch Logs Insights. Because a solution for collecting these logs already exists, we focus on how best to collect system pod and Spark application logs in the remainder of this section.
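As an illustration of how the control plane log types can be toggled, here is a minimal sketch using an eksctl ClusterConfig fragment. Using eksctl is an assumption made for the example; the same log types can be enabled through the Amazon EKS console, API, or other infrastructure-as-code tooling.

# Sketch of an eksctl ClusterConfig fragment that enables all five control plane
# log types, which Amazon EKS then exports to CloudWatch.
cloudWatch:
  clusterLogging:
    enableTypes:
      - api
      - audit
      - authenticator
      - controllerManager
      - scheduler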

Spark applications generate a variety of logs depending on the application component:

  • Driver: These logs are generated by the Spark driver and contain information about the driver’s activities, such as task submission, task scheduling, and data shuffling.
  • Executor: These logs are generated by the Spark executors and contain information about the tasks executed by the executors, such as task start, completion, and failures.
  • Event Logs: These logs are also generated by the Spark driver during execution and contain information about the internal operations of Spark, such as the allocation of resources, the scheduling of tasks, and the execution of stages. The event logs provide a comprehensive view of the operations performed by Spark and are useful for performance tuning, debugging, and profiling.

In Monarch, we rely on Hadoop to collect Spark application logs and upload them to a final destination. Driver and executor logs are aggregated and then uploaded to Amazon S3. Event logs, due to their dynamic nature, are streamed to HDFS because Amazon S3 doesn’t support appending to objects. In addition, certain Hadoop and special-case logs, such as YARN app summaries and SparkSQL query logs, are uploaded by Singer to specific locations in Amazon S3 from which they can be consumed by downstream jobs.

System pod logs are those generated by non-Spark application pods that are either part of core K8s (for example kube-proxy, CoreDNS, and the VPC CNI) or part of Moka (for example YuniKorn and the Spark Operator).

In Monarch, we do not collect logs generated by the majority of Hadoop or supporting system components. However, for Moka, we felt it would be crucial to persist logs from these system-critical pods in order to diagnose failures that may occur under heavy load. In particular, this is due to the transient nature of pods in K8s and the logs they produce, as well as our initial lack of familiarity with operating Amazon EKS at scale.

In summary, a logging solution for Moka had to meet the following requirements:

  • Spark application logs for a single job have to be grouped together in one location in Amazon S3 so that the individual driver and executor logs for that job can be retrieved.
  • Spark event logs have to be uploaded to Amazon S3 in a way that can be consumed by the Spark History Server. Spark is able to upload event logs to Amazon S3, but by default the driver buffers the logs on the local disk and only uploads them once the main job completes. In the event of job errors or driver crashes, the event log is never uploaded. Spark 3.x introduced a feature (rolling event logs) that uploads event logs to Amazon S3 in increments (see the configuration sketch after this list). However, the minimum increment is 10 MB, which means small applications would effectively suffer the same problem.
  • System pod logs have to be uploaded to individual locations in Amazon S3.
  • YuniKorn pod logs, in addition to being uploaded to Amazon S3, also need to be filtered to collect Spark application resource usage summaries, which are placed in another Amazon S3 location so that they can be processed by our cluster usage analysis workflows.
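For reference, the rolling event log feature mentioned in the second requirement is enabled through standard Spark properties, sketched here as sparkConf entries in a SparkApplication spec. The bucket name is a placeholder, and as described next we ultimately stream the event log with Fluent Bit rather than relying on this mechanism alone.

# Fragment of a SparkApplication spec (bucket and values are illustrative).
spec:
  sparkConf:
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "s3a://example-bucket/spark-event-logs/"
    "spark.eventLog.rolling.enabled": "true"
    # Spark enforces a 10 MB minimum for this setting, which is why small jobs
    # still behave much like the non-rolling case.
    "spark.eventLog.rolling.maxFileSize": "10m"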

First, we considered Singer, Pinterest’s in-house logging system. Although Singer could be modified to pick up pod logs, the subsequent downstream log processing and aggregation requirements would necessitate building a collection of near real-time stream processing jobs on a Flink cluster. Alternatively, Singer could be modified to write logs directly to Amazon S3, but this would also require implementing additional custom logic. For the sake of expediency, we chose Fluent Bit, a Cloud Native Computing Foundation (CNCF) graduated project and a well-known solution for handling K8s logs. Fluent Bit is able to filter, forward, and augment logs, and it can be extended through plugins. In particular, an Amazon S3 plugin allows a Fluent Bit agent to upload files directly to Amazon S3.

We collaborated with members of the AWS Solutions Architect team, such as the Data on EKS team, and with Ashim Shrestha and William Tom from our Moka team, to deploy Fluent Bit on our EKS clusters as a DaemonSet, making sure each node runs a Fluent Bit pod. We configured Fluent Bit to perform the following tasks:

  • Logs from system pods running in their own namespaces are uploaded to unique locations in Amazon S3.
  • When submitting to Amazon EKS, Archer makes sure the driver and executor pods of a Spark job share the same unique prefix (the Archer unique ID). Fluent Bit is configured so that logs from Spark pods are uploaded under this unique ID in Amazon S3, which keeps logs from the same Spark application grouped together in one location (a configuration sketch follows this list).
  • The driver of a Spark application outputs events to a single uniquely named log file in a central location on the host. Fluent Bit uploads this file in chunks to Amazon S3 in a layout that mimics the rolling event log format. It uses filtering to create the additional files necessary for the Spark History Server to recognize event log files in Amazon S3.
  • Filtering is also used to extract specific strings corresponding to resource summaries from YuniKorn logs and to upload them to a separate location.
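The per-application grouping described above can be sketched roughly as follows. The path pattern, the archer- prefix, and the key layout are assumptions made for illustration; the exact production inputs and outputs live in the Data on EKS configuration linked below.

[INPUT]
    Name                tail
    # Assumes driver/executor container log files carry the Archer-assigned
    # prefix (for example archer-1a2b3c) in their names.
    Tag                 sparkapp.<archer_id>
    Tag_Regex           (?<archer_id>archer-[a-z0-9]+)
    Path                /var/log/containers/archer-*.log
    multiline.parser    docker, cri

[OUTPUT]
    Name                            s3
    Match                           sparkapp.*
    region                          ${region}
    bucket                          ${s3_bucket_name}
    # $TAG[1] expands to the Archer ID, so all driver and executor logs for
    # one job land under the same S3 prefix.
    s3_key_format                   /spark-app-logs/$TAG[1]/%Y%m%d_$UUID.log
    s3_key_format_tag_delimiters    ..
    total_file_size                 50M
    upload_timeout                  5m
    log_key                         log

A real configuration would typically also attach Kubernetes metadata and handle multiline stack traces; the tag-to-key mapping is the core of the grouping idea.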

The following figure illustrates the various log flows performed by Fluent Bit on a single node.


Figure 3: Fluent Bit log upload flow

You can find the Fluent Bit configuration that covers system pod logs and Spark application logs in the Data on EKS repository: AWS Fluent Bit Configuration. Additionally, we use the following supplementary Fluent Bit configuration at Pinterest, particularly for event log handling:

    # Tail in-progress Spark event logs written by drivers to a central host path
    # and tag each stream with the Spark application ID extracted from the file name.
    [INPUT]
      Name                tail
      Tag                 sel.<spark_internal_app_id>
      Path                /var/log/containers/eventlogs/*\.inprogress
      DB                  /var/log/sel_spark.db
      multiline.parser    docker, cri
      Mem_Buf_Limit       10MB
      Skip_Long_Lines     On
      Refresh_Interval    10
      # The named capture group fills in <spark_internal_app_id> in the tag above.
      Tag_Regex           (?<spark_internal_app_id>spark-[a-z0-9]+)
      Buffer_Chunk_Size   1MB
      Buffer_Max_Size     5MB

additionalFilters: |
  # Route YuniKorn application resource summary lines to their own tag (yuni.*)
  # so they can be sent to a dedicated S3 location.
  [FILTER]
      Name                rewrite_tag
      Match               systempods.*
      Rule                $log ^.+\s(YK_APP_SUMMARY:).*$ yuni.$TAG.log false
  # When a SparkListenerLogStart event appears, emit a copy under the app.* tag,
  # which is used to write the appstatus marker file for the Spark History Server.
  [FILTER]
      Name                rewrite_tag
      Match               sel.*
      Rule                $log ^(\{\"Event\":\"SparkListenerLogStart\").+$ app.$TAG.log true
  # When a SparkListenerApplicationEnd event appears, emit a copy under the suc.* tag,
  # which is used to write the _SUCCESS marker.
  [FILTER]
      Name                rewrite_tag
      Match               sel.*
      Rule                $log ^(\{\"Event\":\"SparkListenerApplicationEnd\").+$ suc.$TAG.log true

additionalOutputs: |
  # YuniKorn application summary lines (yuni.* tag) go to a dedicated bucket and prefix.
  [OUTPUT]
      Name                            s3
      Match                           yuni.*
      region                          ${region}
      bucket                          ${yk_s3_bucket_name}
      total_file_size                 5M
      s3_key_format                   /fluentbit-logs/yunikorn-app-summary/%Y/%m/%d/%H/%H%M%S_${cluster_name}_$UUID.log
      s3_key_format_tag_delimiters    ..
      store_dir                       /home/ec2-user/buffer
      upload_timeout                  5m
      log_key                         log

  # Spark event log chunks (sel.* tag) are uploaded in a layout that mimics
  # Spark's rolling event log format.
  [OUTPUT]
      Name                            s3
      Match                           sel.*
      region                          ${region}
      bucket                          ${s3_bucket_name}
      total_file_size                 10M
      s3_key_format                   /spark-event-logs/${pinfo_environment}/eventlog_v2_$TAG[1]/events_$INDEX_$TAG[1]_$UUID
      s3_key_format_tag_delimiters    ..
      store_dir                       /home/ec2-user/buffer
      upload_timeout                  7m
      log_key                         log

  # appstatus marker file (app.* tag) that lets the Spark History Server
  # recognize the event log directory.
  [OUTPUT]
      Name                            s3
      Match                           app.*
      region                          ${region}
      bucket                          ${s3_bucket_name}
      total_file_size                 1M
      static_file_path                true
      s3_key_format                   /spark-event-logs/${pinfo_environment}/eventlog_v2_$TAG[2]/appstatus_$TAG[2]
      s3_key_format_tag_delimiters    ..
      store_dir                       /home/ec2-user/buffer
      upload_timeout                  5m
      log_key                         log

  # _SUCCESS marker (suc.* tag) written when the application ends.
  [OUTPUT]
      Name                            s3
      Match                           suc.*
      region                          ${region}
      bucket                          ${s3_bucket_name}
      canned_acl                      ${canned_acl}
      total_file_size                 1M
      static_file_path                true
      s3_key_format                   /spark-event-logs/${pinfo_environment}/eventlog_v2_$TAG[2]/_SUCCESS
      s3_key_format_tag_delimiters    ..
      store_dir                       /home/ec2-user/buffer
      upload_timeout                  5m
      log_key                         log
      retry_limit                     false

Implementing Fluent Bit has enabled us to establish a comprehensive logging infrastructure efficiently. We can now effectively gather, process, and store logs from our EKS clusters. However, we have encountered some challenges that need ongoing attention:

  • Output multi-threading: The Amazon S3 output plugin in Fluent Bit could benefit from improved multi-threading capabilities to handle high log volumes more efficiently.
  • Incremental uploads: Fluent Bit uploads logs to Amazon S3 in chunks rather than as complete files. For long-running jobs that span many hours, this can create numerous small files in Amazon S3, which can cause issues when loading events through the Spark History Server. An additional compaction process is needed to consolidate these logs into larger chunks of 128 MB or 256 MB.

Overall, Fluent Bit has proven to be a powerful tool for our logging needs, but continued improvements and optimizations are necessary to fully meet our requirements.

Conclusion

In this first part of our series on Pinterest’s custom Spark job logging and monitoring on Amazon EKS, we explored the foundational elements of Moka’s high level design and its logging infrastructure at scale. We showed how the transition from the Hadoop-based Monarch platform to the Spark on Amazon EKS platform addresses increasing data processing demands. By using AWS for Fluent Bit, Amazon S3, and a carefully planned logging framework, we demonstrated how Moka achieves robust log management and aggregation, providing operational visibility and facilitating proactive issue resolution.

The key to Moka’s success lies in its ability to effectively manage and process the extensive logs generated by the control plane, Spark applications, and system pods, making it easier to diagnose issues and perform post-mortem analysis. This comprehensive logging infrastructure forms the backbone of our observability efforts, enabling us to maintain the performance and reliability of Pinterest’s data processing environment. Stay tuned for Part 2, where we dive into the metrics and observability aspects of Moka.