AWS Big Data Blog

Close the customer journey loop with Amazon Redshift at Equinox Fitness Clubs

Clickstream analysis tools handle their data well, and some even have impressive BI interfaces. However, analyzing clickstream data in isolation comes with many limitations. For example, a customer researches a product or service on your website but then goes to your physical store to purchase it. The clickstream analyst asks, “What happened after they viewed the product?” while the commerce analyst asks, “What happened before they bought it?”

It’s no surprise that clickstream data can enhance your other data sources. When used with purchase data, it can help you identify abandoned carts or optimize marketing spending. Similarly, it helps you analyze offline and online behavior, as well as behavior from before customers even registered an account. However, once the benefits of clickstream data feeds become evident, you must be able to accommodate new requests for that data quickly.

This blog post shows how we, at Equinox Fitness Clubs, moved our data from Amazon Redshift to Amazon S3 to use a late-binding view strategy with our clickstream data. Expect such fun things as Apache Spark, Apache Parquet, data lakes, Hive partitioning, and external tables, all of which we talk about extensively in this post!

When we first started passing our clickstream data from its own tool to our Amazon Redshift data warehouse, speed was the primary concern. Our initial use case was to tie Salesforce data and Adobe Analytics data together to get a better understanding of our lead process. Adobe Analytics could tell us what channel and campaigns people came from, what pages they looked at during the visit, and whether they submitted a lead form on our site. Salesforce could tell us whether a lead was qualified, whether they ever met with an advisor, and ultimately whether they became a member. Tying these two datasets together helped us better understand and optimize our marketing.

To begin, we knew the steps involved in centralizing Salesforce and Adobe Analytics data into Amazon Redshift. However, even with both datasets in Redshift, they needed a common identifier before they could talk to each other. Our first step involved generating and sending the same GUID to both Salesforce and Adobe Analytics when a person submitted a lead form on our website.

Next, we had to pass the Salesforce data to Redshift. Luckily, those feeds already existed, so we could add the new GUID attribute to the feed and describe it in Redshift.
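
On the warehouse side, surfacing the new attribute can be as simple as adding a column to the existing Salesforce table. The following is only a sketch; the schema, table, and column names are illustrative rather than our actual ones.

-- Illustrative names; a GUID fits comfortably in VARCHAR(36)
ALTER TABLE salesforce.lead ADD COLUMN web_form_guid VARCHAR(36);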

Similarly, we had to generate data feeds from Adobe Analytics to Amazon Redshift. Adobe Analytics provides Amazon S3 as a destination option for the data, so we passed the data to S3 and then created a job to send it to Redshift. The daily Adobe Analytics feed comes with a data file containing hundreds of columns and hundreds of thousands of rows, a collection of lookup files such as the headers for the data, and a manifest file that describes the files that were sent. The job passed all of it to Amazon S3 in its raw state. From there, we used Amazon EMR with Apache Spark to process the data feed files into a single CSV file, and then saved it to S3 so that we could run a COPY command to load the data into Amazon Redshift.
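
The load step at the end of that job was a standard Redshift COPY from S3. A minimal sketch looks like the following; the staging table, bucket path, and IAM role are placeholders, not our production values.

-- Placeholder table, path, and role; loads the single CSV produced by the EMR job
COPY omniture_stage.daily_hit_data
FROM 's3://<staging-bucket>/omniture/processed/2017-10-31/'
IAM_ROLE 'arn:aws:iam::<account-id>:role/<redshift-load-role>'
CSV
IGNOREHEADER 1;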

The job ran for a few weeks and worked well until we started to use the data more frequently. While the job was effective, requests to backdate the data with new columns (schema evolution) began to appear. That’s when we decided we needed greater flexibility because of the nature of the data.

Data lake to the rescue

When we decided to refactor the job, we had two things lined up for us. First, we were already moving toward more of a data lake strategy. Second, Redshift Spectrum had recently been released. It would enable us to query the flat files of clickstream data in our data lake without ever having to run the COPY command and store the data in Redshift. It would also let us join the clickstream data more efficiently to our other data sources stored inside Redshift.

We wanted to take advantage of self-describing data, which combines the schema of the data with the data itself. Converting the data to a self-describing format would help us manage the wide clickstream datasets and avoid the schema-evolution challenges we had run into. We could pull every column we desired into the data lake file and then use only the important columns in our queries to speed up processing. To get this flexibility, we used the Apache Parquet file format, which is both self-describing and blazing fast thanks to its columnar storage. We used Apache Spark on Amazon EMR to convert the data from CSV to Parquet and to partition it for scanning performance, as shown in the following code.

 

from datetime import date, timedelta
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import json
import argparse
	
# Usage
# spark-submit all_omniture_to_parquet.py 2017-10-31 s3a:// eqxdl-prod-l-omniture eqxios eqxdl-prod eqeqxiosprod omniture_eqxios
# python -m tasks.s2w_all_omniture_to_parquet 2017-10-31

parser = argparse.ArgumentParser()
parser.add_argument('year_month_day_arg', help='Run date (yyyy-mm-dd)', type=str, default='XXX')
parser.add_argument('s3_protocol', help='S3 protocol i.e. s3a://',type=str, default='XXX')
parser.add_argument('source_bucket', help='Omniture source data bucket',type=str, default='XXX')
parser.add_argument('source_path', help='Omniture source data path',type=str, default='XXX')
parser.add_argument('target_bucket', help='Omniture target data bucket',type=str, default='XXX')
parser.add_argument('report_suite', help='Omniture report suite ID',type=str, default='XXX')
parser.add_argument('application', help='App name for job',type=str, default='XXX')
args = parser.parse_args()

spark = SparkSession\
        .builder\
        .appName(args.application)\
        .getOrCreate()

sc = spark.sparkContext

def manifest_toJSON(file, location):
	text = sc.textFile(file).collect()
	manifest = {'lookup_files': [], 'data_files': location, 'total_rows': 0}
	for x in text:
		if 'Lookup-File:' in x:
			manifest['lookup_files'].append(location+x.split(': ')[1])
		elif 'Data-File: 01' in x:
			wildcard_path = x.replace('Data-File: 01','*')
			manifest['data_files'] += wildcard_path
		elif 'Record-Count:' in x:
			manifest['total_rows'] += int(x.split(': ')[1])
	return manifest

# Create metadata by stitching together the file paths for
# the header file and the data file from the manifest file
# base_filepath = '/Users/rkelly/projects/sparkyTest/project_remodeling/ios_test_data/'
base_filepath = '{}{}/{}/'.format(args.s3_protocol, args.source_bucket, args.source_path)
manifest_filepath = base_filepath+'{}_{}.txt'.format(args.report_suite, args.year_month_day_arg)
metadata = manifest_toJSON(manifest_filepath, base_filepath)

# Create a list of files and their data
# Look specifically for the column_headers.tsv data
# Split on \x00 to remove the garbage encoding and return a string of headers
lookup_files = sc.textFile(','.join(metadata['lookup_files'])).collect()
encoded_header = lookup_files[[idx for idx, s in enumerate(lookup_files) if 'column_headers.tsv' in s][0]].split('\x00')
header = encoded_header[[idx for idx, s in enumerate(encoded_header) if '\t' in s][0]]\
			.replace('\n', '')\
			.replace('(', '')\
			.replace(')', '')\
			.replace(' ', '-')

# Create a schema for the list from the header file splitting on tabs
# Cast everything as a string to avoid data type failures
schema = StructType([ StructField(field, StringType(), True) for field in header.split('\t')])

# Bypass RDD and write data file as a dataframe
# then save as parquet to tie headers to their respective values
df = spark.read.csv(metadata['data_files'], header=False, schema=schema, sep='\t', nullValue=None)
destination_filepath = '{}{}/{}/dt={}/'.format(args.s3_protocol, args.target_bucket, args.application, args.year_month_day_arg)
df.write.mode('overwrite').parquet(destination_filepath)

# Gracefully exit out of spark and this file
sc.stop()
exit()

 

Using the AWS Glue Data Catalog allowed us to make our clickstream data available to be queried within Amazon Redshift and other query tools like Amazon Athena and Apache Spark. This is accomplished by mapping the Parquet files to a relational schema. AWS Glue lets us make additional data available for querying in mere seconds, because schema changes can be applied in real time. You can delete and add columns, reorder column indices, and change column types all at once, then query the data immediately after saving the schema. Additionally, the Parquet format prevents failures when the shape of the data changes, or when certain columns are deprecated and removed from the data set.

We used the following query to create our first AWS Glue table for our Adobe Analytics website data. We ran this query in Amazon Redshift using SQL Workbench.

 

--First create your schema
create external schema omniture_prod
from data catalog 
database 'omniture' 
iam_role 'arn:aws:iam:::role';

--Then create your “table” 
CREATE EXTERNAL TABLE omniture_prod.eqx_web (
  date_time		VARCHAR,
  va_closer_id		VARCHAR,
  va_closer_detail	VARCHAR,
  va_finder_detail	VARCHAR,
  va_finder_id		VARCHAR,
  ip			VARCHAR,
  domain		VARCHAR,
  post_evar1		VARCHAR
)
STORED AS PARQUET
LOCATION 's3://eqxdl-prod/omniture/eqx_web/'
table properties ('parquet.compress'='SNAPPY');

--Check your databases, schemas, and tables
select * from pg_catalog.svv_external_databases;
select * from pg_catalog.svv_external_schemas;
select * from pg_catalog.svv_external_tables;

 

After running this query, we added additional columns to the schema upon request through the AWS Glue interface. We also used partitioning to make our queries faster and cheaper.
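
As a sketch of how the partitioning works: if the external table is declared with a partition column, each daily Spark run can be registered as a new partition pointing at the dt= prefix it wrote. This assumes the table was created with PARTITIONED BY (dt VARCHAR); the date and S3 path below are illustrative.

-- Assumes the external table was created with PARTITIONED BY (dt VARCHAR)
ALTER TABLE omniture_prod.eqx_web
ADD IF NOT EXISTS PARTITION (dt='2017-10-31')
LOCATION 's3://eqxdl-prod/omniture/eqx_web/dt=2017-10-31/';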

At this point, we had a new schema folder in our database. It contained the external table that could be queried, but we wanted to take it a step further. We needed to add some transformations to the data such as:

  • Renaming IDs to strings
  • Concatenating values
  • Manipulating strings
  • Excluding bot traffic that we send from AWS to test the website
  • Changing the column names to be more user-friendly

To do this, we created a view of the external table, as follows:

 

create view edw_t.f_omniture_web as 
select
    REPLACE(dt, '-', '') as hive_date_key,
    va_closer_id,
    va_closer_detail as last_touch_campaign,
    CASE
        WHEN (va_closer_id) = '1' THEN 'Paid Search'
        WHEN (va_closer_id) = '2' THEN 'Natural Search'
        WHEN (va_closer_id) = '3' THEN 'Display'
        WHEN (va_closer_id) = '4' THEN 'Email Acq'
        WHEN (va_closer_id) = '5' THEN 'Direct'
        WHEN (va_closer_id) = '6' THEN 'Session Refresh'
        WHEN (va_closer_id) = '7' THEN 'Social Media'
        WHEN (va_closer_id) = '8' THEN 'Referring Domains'
        WHEN (va_closer_id) = '9' THEN 'Email Memb'
        WHEN (va_closer_id) = '10' THEN 'Social Placement'
        WHEN (va_closer_id) = '11' THEN 'Other Placement'
        WHEN (va_closer_id) = '12' THEN 'Partnership'
        WHEN (va_closer_id) = '13' THEN 'Other Eqx Sites'
        WHEN (va_closer_id) = '14' THEN 'Influencers'
        ELSE NULL
    END AS last_touch_channel,
    va_finder_detail as first_touch_campaign,
    va_finder_id as va_finder_id,
    CASE
        WHEN (va_finder_id) = '1' THEN 'Paid Search'
        WHEN (va_finder_id) = '2' THEN 'Natural Search'
        WHEN (va_finder_id) = '3' THEN 'Display'
        WHEN (va_finder_id) = '4' THEN 'Email Acq'
        WHEN (va_finder_id) = '5' THEN 'Direct'
        WHEN (va_finder_id) = '6' THEN 'Session Refresh'
        WHEN (va_finder_id) = '7' THEN 'Social Media'
        WHEN (va_finder_id) = '8' THEN 'Referring Domains'
        WHEN (va_finder_id) = '9' THEN 'Email Memb'
        WHEN (va_finder_id) = '10' THEN 'Social Placement'
        WHEN (va_finder_id) = '11' THEN 'Other Placement'
        WHEN (va_finder_id) = '12' THEN 'Partnership'
        WHEN (va_finder_id) = '13' THEN 'Other Eqx Sites'
        WHEN (va_finder_id) = '14' THEN 'Influencers'
        ELSE NULL
    END AS first_touch_channel,
    ip as ip_address,
    domain as domain,
    post_evar1 AS internal_campaign,
    post_evar10 as site_subsection_nm,
    post_evar11 as IOS_app_view_txt,
    post_evar12 AS site_section_nm,
    post_evar15 AS transaction_id,
    post_evar23 as join_barcode_id,
    post_evar3 AS page_nm,
    post_evar32 as host_nm,
    post_evar41 as class_category_id,
    post_evar42 as class_id,
    post_evar43 as class_instance_id,
    post_evar60 AS referral_source_txt,
    post_evar69 as adwords_gclid,
    post_evar7 as usersec_tracking_id, 
    post_evar8 as facility_id,
    post_event_list as post_event_list,  
    post_visid_low||post_visid_high as unique_adobe_id,
    post_visid_type as post_visid_type,
    post_page_event as hit_type,
    visit_num as visit_number,
    visit_start_time_gmt,
    post_evar25 as login_status,
    exclude_hit as exclude_hit,
    hit_source as hit_source,
    geo_zip,
    geo_city,
    geo_region,
    geo_country,
    post_evar64 as api_error_msg,
    post_evar70 as page_load_time,
    post_evar78 as join_transaction_id,
    post_evar9 as page_url,
    visit_start_pagename as entry_pg,
    post_tnt as abtest_campaign,
    post_tnt_action as abtest_experience,
    user_agent as user_agent,
    mobile_id as mobile_id,
    cast(date_time as timestamp) as date_time,
    CONVERT_TIMEZONE(
        'America/New_York', -- timezone of origin
        (cast(
            case 
            when post_t_time_info like '%undefined%' then '0'
            when post_t_time_info is null then '0'
            when post_t_time_info = '' then '0'
            when cast(split_part(post_t_time_info,' ',4) as int) < 0
              then left(split_part(post_t_time_info,' ',4),4)
            else left(split_part(post_t_time_info,' ',4),3) end as int
        )/60),
        cast(date_time as timestamp)
    ) as date_time_local,
    post_t_time_info as local_timezone
from omniture_prod.eqx_web
where exclude_hit = '0'
and hit_source not in ('5','7','8','9')

and domain <> 'amazonaws.com'
and domain <> 'amazon.com'

WITH NO SCHEMA BINDING;

 

Now we can perform queries from Amazon Redshift, blending our structured Salesforce data with our semi-structured, dynamic Adobe Analytics data. With these changes, our data became extremely flexible, friendly on storage size, and very performant when queried. Since then, we have started using Redshift Spectrum for many use cases, from data quality checks and machine data to historical data archiving and empowering our data analysts and scientists to more easily blend and onboard data. For example, the following query joins web leads from the clickstream view to opportunity lifecycle data from Salesforce.

 

with web_leads as (
  select transaction_id,last_touch_channel
  from edw_t.f_omniture_web
  where hive_date_key = '20170301'
      and post_event_list like '%201%'
      and transaction_id != '807f0cdc-80cf-42d3-8d75-e55e277a8718'
),
opp_lifecycle as (
  SELECT lifecycle,weblead_transactionid
  FROM edw_t.f_opportunity
  where weblead_transactionid is not null
  and created_date_key between '20170220' and '20170310'
)
select
  web_leads.transaction_id,
  coalesce(opp_lifecycle.lifecycle, 'N/A') as Lifecycle
from web_leads
left join opp_lifecycle on web_leads.transaction_id = opp_lifecycle.weblead_transactionid

Conclusion

We were able to set up an efficient and flexible analytics platform for clickstream data by combining the Amazon S3 data lake with Amazon Redshift. This eliminated the need to always load clickstream data into the data warehouse, and it also made the platform adaptable to schema changes in the incoming data. You can read the Redshift documentation to get started with Redshift Spectrum, and also watch our presentation from the AWS Chicago Summit 2018 below.


Additional Reading

If you found this post helpful, be sure to check out From Data Lake to Data Warehouse: Enhancing Customer 360 with Amazon Redshift Spectrum, and Narrativ is helping producers monetize their digital content with Amazon Redshift.

 


About the Author

Ryan Kelly is a data architect at Equinox, where he helps outline and implement frameworks for data initiatives. He also leads clickstream tracking, which helps teams gain insights into their digital initiatives. Ryan loves making it easier for people to access and ingest their data for business intelligence, analytics, and product/service enrichment. He also loves exploring and vetting new technologies to see how they can enhance what they do at Equinox.