AWS Big Data Blog

How SmartNews Built a Lambda Architecture on AWS to Analyze Customer Behavior and Recommend Content

by SmartNews

This is a guest post by Takumi Sakamoto, a software engineer at SmartNews. SmartNews in their own words: “SmartNews is a machine learning-based news discovery app that delivers the very best stories on the Web for more than 18 million users worldwide.”

Data processing is one of the key technologies for SmartNews. Every team’s workload involves data processing for various purposes. The news team at SmartNews uses data as input to their machine learning algorithm for delivering the very best stories on the Web. The product team relies on data to run various A/B tests, to learn about how our customers consume news articles, and to make product decisions.

To meet the goals of both teams, we built a sustainable data platform based on the lambda architecture, which is a data-processing framework that handles a massive amount of data and integrates batch and real-time processing within a single framework.

Thanks to AWS services and OSS technologies, our data platform is highly scalable and reliable, and is flexible enough to satisfy various requirements with minimum cost and effort.

Our current system generates tens of GBs of data from multiple data sources, and runs daily aggregation queries or machine learning algorithms on datasets of hundreds of GBs. Some outputs of the machine learning algorithms are joined with data streams to gather user feedback in near real time (e.g., the last 5 minutes). This lets us adapt our product for users with minimum latency. In this post, I’ll show you how we built the SmartNews data platform on AWS.

The image below depicts the platform.

Design principles

Before I dive into how we built our data platform, it’s important to know the design principles behind the architecture.

When we started to discuss the data platform, most data was stored in a document database. Although it was a good fit at product launch, it became painful with growth. For data platform maintainers, it was very expensive to store and serve data at scale. At that time, our system generated more than 10 GB of user activity records every day and processing time increased linearly. For data platform users, it was hard to try something new for data processing because of the database’s insufficient scalability and limited integration with the big data ecosystem. Obviously, it wasn’t sustainable for either group.

To make our data platform sustainable, we decided to completely separate the compute and storage layers. We adopted Amazon S3 for file storage and Amazon Kinesis Streams for stream storage. Both services replicate data into multiple Availability Zones and keep it available without high operation costs. We don’t have to pay much attention to the storage layer and we can focus on the computation layer that transforms raw data to a valuable output.

In addition, Amazon S3 and Amazon Kinesis Streams let us run multiple compute layers without complex negotiations. After data is stored, everyone can consume it in their own way. For example, if a team wants to try a new version of Spark, they can launch a new cluster and start to evaluate it immediately. That means every engineer at SmartNews can craft a solution using whatever tools they feel are best suited to the task.

Input data

The first step is dispatching raw data to both the batch layer and the speed layer for processing. There are two types of data sources at SmartNews:

  • Groups of user activity logs generated from our mobile app
  • Various tables on Amazon RDS

User activity logs include more than 60 types of activities to understand user behavior such as which news articles are read. After we receive logs from the mobile app, all logs are passed to Fluentd, an OSS log collector, and forwarded to Amazon S3 and Amazon Kinesis Streams. If you are not familiar with Fluentd, see Store Apache Logs into Amazon S3 and Collect Log Files into Kinesis Stream in Real-Time to understand how Fluentd works.

Our recommended practice is adding the flush_at_shutdown parameter. If set to true, Fluentd waits for the buffer to flush at shutdown. Because our instances are scaled automatically, it’s important to store log files on Amazon S3 before terminating instances.

In addition, monitoring Fluentd status is important so that you know when bad things happen. We use Datadog and some Fluentd plugins. The fluent-plugin-flowcounter plugin counts incoming messages and bytes per second, and we post these metrics to DogStatsD via fluent-plugin-dogstatsd. An example configuration is available in a GitHub Gist.
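To make the metric flow concrete, here is a minimal Python sketch of the idea: count messages and bytes over a window, then format the per-second rates as DogStatsD gauge datagrams (`name:value|g|#tag:value`). This is not how the Fluentd plugins are implemented. The `FlowCounter` class and the metric names `fluentd.flow.messages_per_sec` and `fluentd.flow.bytes_per_sec` are hypothetical names chosen for illustration.

```python
import time


def dogstatsd_gauge(name, value, tags=None):
    """Format one gauge in the DogStatsD datagram format: name:value|g|#k:v,..."""
    datagram = f"{name}:{value}|g"
    if tags:
        datagram += "|#" + ",".join(f"{k}:{v}" for k, v in sorted(tags.items()))
    return datagram


class FlowCounter:
    """Counts messages and bytes, then reports per-second rates (hypothetical helper)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._started = clock()
        self._messages = 0
        self._bytes = 0

    def record(self, payload: bytes):
        """Register one incoming log record."""
        self._messages += 1
        self._bytes += len(payload)

    def flush(self, tags=None):
        """Return rate datagrams for the elapsed window and reset the counters."""
        now = self._clock()
        elapsed = max(now - self._started, 1e-9)  # avoid division by zero
        datagrams = [
            dogstatsd_gauge("fluentd.flow.messages_per_sec", self._messages / elapsed, tags),
            dogstatsd_gauge("fluentd.flow.bytes_per_sec", self._bytes / elapsed, tags),
        ]
        self._started, self._messages, self._bytes = now, 0, 0
        return datagrams
```

Each returned string could then be sent as a UDP datagram to a local DogStatsD agent, which listens on port 8125 by default. Tagging each metric with its data source is what makes per-source aggregation possible on the Datadog side.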

After metrics are sent to Datadog, we can visualize aggregated metrics across any level that we choose. The following graph aggregates the number of records per data source.

Also, Datadog notifies us when things go wrong. The alerts in the figure below let us know that there have been no incoming records on an instance for the last 1 hour. We also monitor Fluentd’s buffer status by using Datadog’s Fluentd integration.

Various tables on Amazon RDS are dumped by Embulk, an OSS bulk data loader, and exported to Amazon S3. Its pluggable architecture lets us mask some fields that we don’t want to export to the data platform.

Batch layer

This layer is responsible for various ETL tasks such as transforming text files into columnar files (RCFile or ORCFile) for downstream consumers, generating machine learning features, and pre-computing the batch views.

We run multiple Amazon EMR clusters for each task. Amazon EMR lets us run multiple heterogeneous Hive and Spark clusters with a few clicks. Because all data is stored on Amazon S3, we can use Spot Instances for most tasks and adjust cluster capacity dynamically. It significantly reduces the cost of running our data processing system.

In addition to data processing itself, task management is very important for this layer. Although a cron scheduler is a good first solution, it becomes hard to maintain as the number of ETL tasks grows.

When using a cron scheduler, a developer needs to write additional code to handle dependencies (such as waiting until the previous task is done) and failures (such as retrying failed tasks or specifying timeouts for long-running tasks). We use Airflow, an open source task scheduler, to manage our ETL tasks. We can define ETL tasks and dependencies with Python scripts.

Because every task is described as code, we can introduce pull request–based review flows for modifying ETL tasks.
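To illustrate the kind of bookkeeping a task scheduler takes off your hands, here is a minimal Python sketch of dependency ordering with retries. It uses only the standard library and is not Airflow's API. The function `run_pipeline` and the task names in the usage note are hypothetical, and timeouts are left out for brevity.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def run_pipeline(tasks, dependencies, max_retries=2):
    """Run tasks in dependency order, retrying each failed task.

    tasks: dict of task name -> zero-argument callable
    dependencies: dict of task name -> set of upstream task names
    Returns the list of task names in the order they completed.
    """
    graph = {name: dependencies.get(name, set()) for name in tasks}
    completed = []
    # static_order() yields every task after all of its upstream tasks
    for name in TopologicalSorter(graph).static_order():
        for attempt in range(max_retries + 1):
            try:
                tasks[name]()
                break
            except Exception:
                if attempt == max_retries:
                    raise RuntimeError(
                        f"task {name!r} failed after {attempt + 1} attempts"
                    )
        completed.append(name)
    return completed
```

For example, calling `run_pipeline` with tasks `convert_to_orc`, `aggregate`, and `export`, where `aggregate` depends on `convert_to_orc` and `export` depends on `aggregate`, runs them in that order and retries any task that raises. Because the whole definition is ordinary code, it can go through the same review process as any other change.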

Serving layer

The serving layer indexes and exposes the views so that they can be queried.

We use Presto for this layer. Presto is an open source, distributed SQL query engine for running interactive queries against various data sources such as Hive tables on S3, MySQL on Amazon RDS, Amazon Redshift, and Amazon Kinesis Streams. Presto converts a SQL query into a series of task stages and processes each stage in parallel. Because all processing occurs in memory to reduce disk I/O, end-to-end latency is very low: ~30 seconds to scan billions of records.

With Presto, we can analyze the data from various perspectives. The following simplified query shows the result of A/B testing by user clusters.

-- Suppose that this table exists
DESC hive.default.user_activities;
user_id bigint
action  varchar
abtest  array<map<varchar, bigint>>
url     varchar

-- Summarize page view per A/B Test identifier
--   for comparing two algorithms v1 & v2
SELECT
  dt,
  t['behaviorId'],
  count(*) as pv
FROM hive.default.user_activities CROSS JOIN UNNEST(abtest) AS t (t)
WHERE dt like '2016-01-%' AND action = 'viewArticle'
  AND t['definitionId'] = 163
GROUP BY dt, t['behaviorId'] ORDER BY dt
;

-- Output:
-- 2016-01-01 | algorithm_v1 | 40000
-- 2016-01-01 | algorithm_v2 | 62000
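For readers less familiar with CROSS JOIN UNNEST, the following Python sketch mirrors the logic of the query above on in-memory rows: each element of the `abtest` array becomes its own row, the rows are filtered by test definition, and page views are counted per date and behavior. The function name `pageviews_by_behavior` and the dictionary-based row layout are assumptions made for illustration. Presto itself executes the query in a distributed fashion.

```python
from collections import Counter


def pageviews_by_behavior(rows, definition_id, dt_prefix):
    """Count viewArticle rows per (dt, behaviorId) for one A/B test definition."""
    counts = Counter()
    for row in rows:
        # WHERE dt LIKE '<prefix>%' AND action = 'viewArticle'
        if row["action"] != "viewArticle" or not row["dt"].startswith(dt_prefix):
            continue
        # CROSS JOIN UNNEST(abtest): one output row per element of the array
        for t in row["abtest"]:
            if t["definitionId"] == definition_id:
                counts[(row["dt"], t["behaviorId"])] += 1
    # GROUP BY dt, behaviorId; sorted by dt, then behaviorId
    return sorted((dt, behavior, pv) for (dt, behavior), pv in counts.items())
```

A user enrolled in several A/B tests contributes one candidate row per test, which is why the filter on `definitionId` is applied after the unnesting step rather than before it.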

Speed layer

Like the batch layer, the speed layer computes views from the data it receives. The difference is latency. Sometimes, low latency adds valuable output for the product.

For example, we need to detect current trending news by interest-based clusters to deliver the best stories for each user. For this purpose, we run Spark Streaming.

User feedback in Amazon Kinesis Streams is joined with the interest-based user cluster data calculated by offline machine learning, and the result is output as metrics for each news article. These metrics are used to rank news articles in a later phase. What Spark Streaming does here looks something like the following:

def main(args: Array[String]): Unit = {
  // ..... (prepare SparkContext)

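  // Load user clusters that are generated by offline machine learning.
  // Declared as a var so the reference can be replaced when the table is
  // reloaded; needToUpdate is assumed to be true on the first pass.
  var userClusterRDD: RDD[(Long, Int)] = null
  if (needToUpdate) {
    userClusterRDD = sqlContext.sql(
      "SELECT user_id, cluster_id FROM user_cluster"
    ).rdd.map( row => {
      (row.getLong(0), row.getInt(1))
    })
  }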

  // Fetch and parse JSON records in Amazon Kinesis Streams
  val userPageviewStream: DStream[(Long, String)] = ssc.union(kinesisStreams)
    .map( byteArray => {
      val json = new String(byteArray, "UTF-8")
      val userActivity = parse(json)
      (userActivity.user_id, userActivity.url)
    })

  // Join stream records with pre-calculated user clusters
  val clusterPageviewStream: DStream[(Int, String)] = userPageviewStream
    .transform( userPageviewStreamRDD => {
      userPageviewStreamRDD.join(userClusterRDD).map( data => {
        val (userId, (url, clusterId) ) = data
        (clusterId, url)
      })
    })

  // ..... (aggregates pageview by clusters and store to DynamoDB)
}

Because every EMR cluster uses the shared Hive metastore, Spark Streaming applications can load all tables created on the batch layer by using SQLContext. After the tables are loaded as RDDs (Resilient Distributed Datasets), we can join them to a Kinesis stream.
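The join-then-aggregate step can be sketched in plain Python for a single micro-batch. This is a simplified, single-process illustration rather than the Spark Streaming job itself. The function name `pageviews_by_cluster` is hypothetical, and counting page views per cluster and URL is an assumption about what the elided aggregation step computes.

```python
from collections import Counter


def pageviews_by_cluster(pageview_batch, user_clusters):
    """Join one micro-batch of (user_id, url) pairs with a user_id -> cluster_id
    mapping and count page views per (cluster_id, url).

    Like an inner join, page views from users without a cluster are dropped.
    """
    counts = Counter()
    for user_id, url in pageview_batch:
        cluster_id = user_clusters.get(user_id)
        if cluster_id is not None:
            counts[(cluster_id, url)] += 1
    return counts
```

In this sketch `user_clusters` plays the role of the pre-calculated table loaded from the batch layer, and each call handles the records that arrived during one streaming interval. The resulting counts per cluster are the kind of metric that could then be written to a store such as DynamoDB for ranking.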

Spark Streaming is a great tool for empowering your machine learning–based application, but it can be overkill for simpler use cases such as monitoring. For these cases, we use AWS Lambda and PipelineDB (not covered here in detail).

Output data

Chartio is a commercial business intelligence (BI) service. Chartio enables every member (including non-engineers!) in the company to create, edit, and refine beautiful dashboards with minimal effort. This has saved us hours each week so we can spend our time improving our product, not reporting on it. Because Chartio supports various data sources such as Amazon RDS (MySQL, PostgreSQL), Presto, PipelineDB, Amazon Redshift, and Amazon Elasticsearch, you can start using it easily.

Summary

In this post, I’ve shown you how SmartNews uses AWS services and OSS technologies to create a data platform that is highly scalable and reliable, and is flexible enough to satisfy various requirements with minimum cost and effort. If you’re interested in our data platform, check out these two slide decks on SlideShare: Building a Sustainable Data Platform on AWS and Stream Processing in SmartNews.

If you have questions or suggestions, please leave a comment below.

Takumi Sakamoto is not an Amazon employee and does not represent Amazon.
