Blog de Amazon Web Services (AWS)

Cierre el bucle de traspaso del cliente con Amazon Redshift en Equinox Fitness Clubs

Las herramientas de análisis de secuencias de clics gestionan los datos correctamente y algunas poseen muy buenas interfaces de inteligencia empresarial (BI). Sin embargo, el análisis de datos de secuencias de clics por sí solo tiene muchas limitaciones. Por ejemplo: un cliente está interesado en un producto o servicio que figura en su sitio web. El cliente se dirige a su tienda física para comprarlo. El analista de secuencias de clics se pregunta: “¿Qué pasó después de que vio el producto?”, y el analista de comercio se pregunta: “¿Qué pasó antes de que lo comprara?”.

No es extraño que los datos de las secuencias de clics mejoren los orígenes de los demás datos. Cuando se utilizan con los datos de compra, nos ayudan a determinar qué compras fueron descartadas y a optimizar los gastos en marketing. Asimismo, nos ayudarán a analizar el comportamiento fuera de línea y en línea, además del comportamiento de los clientes incluso antes de que hayan registrado una cuenta. Sin embargo, cuando los beneficios del sistema de fuente de datos de la secuencia de clics ya sean evidentes, habrá que adaptarse con rapidez a las nuevas solicitudes.

Esta publicación de blog muestra cómo, en Equinox Fitness Clubs, trasladamos nuestros datos desde Amazon Redshift a Amazon S3 para utilizar una estrategia de enlace dinámico con nuestros datos de secuencias de clics. Habrá cosas interesantes, como Apache Spark, Apache Parquet, lagos de datos, particiones de Hive y tablas externas. ¡Hablaremos de todos ellos en esta publicación!

Cuando comenzamos a trasladar nuestros datos de secuencias de clics desde su propia herramienta a nuestro almacén de datos de Amazon Redshift, la velocidad era un aspecto primordial. Nuestro caso de uso inicial era combinar los datos de Salesforce y los datos de Adobe Analytics para comprender mejor nuestro proceso de registro. Adobe Analytics nos puede dar información acerca de los canales y las campañas que utilizaron las personas para encontrarnos, las páginas que revisan durante la visita y si han enviado un formulario de registro en nuestro sitio. Salesforce puede decirnos si el cliente potencial era calificado, si se puso en contacto con un asesor y, finalmente, si se volvió un miembro. Una vez que combinamos estos dos conjuntos de datos, pudimos optimizar nuestro proceso de marketing y comprenderlo mejor.

Para comenzar, sabíamos cuáles eran las etapas necesarias para centralizar los datos de Salesforce y Adobe Analytics en Amazon Redshift. Sin embargo, aun después de combinarlos en Redshift, necesitaban un identificador común para comunicarse entre ellos. El primer paso fue generar y enviar el mismo GUID tanto a Salesforce como a Adobe Analytics cuando una persona enviaba un formulario de registro en nuestro sitio web.

Luego, debíamos pasar los datos de Salesforce a Redshift. Afortunadamente, esos sistemas de fuente de datos ya existían, así que pudimos agregar el nuevo atributo de GUID a la fuente de datos y describirlo en Redshift.

De la misma manera, debimos generar fuentes de datos desde Adobe Analytics a Amazon Redshift. Adobe Analytics ofrece Amazon S3 como opción de destino para nuestros datos, por lo que pasamos los datos a S3 y, luego, creamos un trabajo para enviarlos a Redshift. El trabajo consistía en tomar la fuente de datos diaria de Adobe Analytics (que contiene un archivo de datos con cientos de columnas y miles de filas, una colección de archivos de búsqueda como los encabezados de los datos, y un archivo de manifiesto que describe los archivos que han sido enviados) y trasladar todo a Amazon S3 en su estado bruto. Desde aquí, utilizamos Amazon EMR con Apache Spark para procesar los archivos del sistema de fuente de datos en un solo archivo CSV. Luego, lo guardamos en S3 para poder ejecutar el comando COPIAR y enviar los datos a Amazon Redshift.

El trabajo se ejecutó durante algunas semanas y funcionó bien, hasta que comenzamos a utilizar los datos con más frecuencia. Aunque el trabajo era eficaz, comenzaron a antedatarse datos con columnas nuevas (evolución de esquemas). En ese momento, decidimos que necesitábamos más flexibilidad debido a la naturaleza de los datos.

Lago de datos al rescate

Cuando decidimos refactorizar el trabajo, había dos cosas ya resueltas. Primero, ya estábamos encaminados hacia una estrategia más parecida a la del lago de datos. Segundo, Redshift Spectrum había sido lanzado recientemente. Nos permitiría consultar los archivos de texto plano de los datos de secuencias de clics en nuestro lago de datos sin tener que ejecutar el comando COPIAR y almacenarlos en Redshift. Además, podríamos combinar más eficazmente los datos de las secuencias de clics con otros orígenes de datos almacenados dentro de Redshift.

Queríamos aprovechar la autodescripción de datos, que combina el esquema de los datos con los datos en sí mismos. La conversión de los datos en datos autodescriptivos nos ayudaría a gestionar los grandes conjuntos de datos de secuencias de clics y a prevenir los desafíos relacionados con la evolución de esquemas. Podríamos incluir cualquier columna deseada en el archivo del lago de datos y, luego usar solo las columnas importantes para nuestras consultas y así acelerar el proceso. Para conseguir esta flexibilidad, utilizamos el formato de archivo de Apache Parquet, que es tanto autodescriptivo como increíblemente veloz gracias a su tecnología de almacenamiento por columnas. Utilizamos Apache Spark en Amazon EMR para convertirlo de CSV en Parquet y hacer una partición de datos para escanear el rendimiento, como se muestra en el siguiente código.

from datetime import date, timedelta
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import json
import argparse
	
# Usage
# spark-submit all_omniture_to_parquet.py 2017-10-31 s3a:// eqxdl-prod-l-omniture eqxios eqxdl-prod eqeqxiosprod omniture_eqxios
# python -m tasks.s2w_all_omniture_to_parquet 2017-10-31

parser = argparse.ArgumentParser()
parser.add_argument('year_month_day_arg', help='Run date (yyyy-mm-dd)', type=str, default='XXX')
parser.add_argument('s3_protocol', help='S3 protocol i.e. s3a://',type=str, default='XXX')
parser.add_argument('source_bucket', help='Omniture source data bucket',type=str, default='XXX')
parser.add_argument('source_path', help='Omniture source data path',type=str, default='XXX')
parser.add_argument('target_bucket', help='Omniture target data bucket',type=str, default='XXX')
parser.add_argument('report_suite', help='Omniture report suite ID',type=str, default='XXX')
parser.add_argument('application', help='App name for job',type=str, default='XXX')
args = parser.parse_args()

spark = SparkSession\
        .builder\
        .appName(args.application)\
        .getOrCreate()

sc = spark.sparkContext

def manifest_toJSON(file, location):
	text = sc.textFile(file).collect()
	manifest = {'lookup_files': [], 'data_files': location, 'total_rows': 0}
	for x in text:
		if 'Lookup-File:' in x:
			manifest['lookup_files'].append(location+x.split(': ')[1])
		elif 'Data-File: 01' in x:
			wildcard_path = x.replace('Data-File: 01','*')
			manifest['data_files'] += wildcard_path
		elif 'Record-Count:' in x:
			manifest['total_rows'] += int(x.split(': ')[1])
	return manifest
 
# Crear metadatos mediante la combinación de las rutas de archivo
# del archivo del encabezado y el archivo de datos desde el archivo de manifiesto
# base_filepath = '/Users/rkelly/projects/sparkyTest/project_remodeling/ios_test_data/'
base_filepath = '{}{}/{}/'.format(args.s3_protocol, args.source_bucket, args.source_path)
manifest_filepath = base_filepath+'{}_{}.txt'.format(args.report_suite, args.year_month_day_arg)
metadata = manifest_toJSON(manifest_filepath, base_filepath)

# Crear una lista de archivos y sus datos
# Buscar específicamente los datos de column_headers.tsv
# Partir \x00 para eliminar el cifrado basura y obtener una cadena de encabezados
lookup_files = sc.textFile(','.join(metadata['lookup_files'])).collect()
encoded_header = lookup_files[[idx for idx, s in enumerate(lookup_files) if 'column_headers.tsv' in s][0]].split('\x00')
header = encoded_header[[idx for idx, s in enumerate(encoded_header) if '\t' in s][0]]\
			.replace('\n', '')\
			.replace('(', '')\
			.replace(')', '')\
			.replace(' ', '-')

# Crear un esquema para la lista a partir del archivo del encabezado dividido en pestañas
# Enviar todo como una cadena para evitar fallas de datos
schema = StructType([ StructField(field, StringType(), True) for field in header.split('\t')])

# Evitar RDD y escribir un archivo de datos como una trama de datos
# luego, guardarla como Parquet para unir los encabezados con sus valores respectivos
df = spark.read.csv(metadata['data_files'], header=False, schema=schema, sep='\t', nullValue=None)
destination_filepath = '{}{}/{}/dt={}/'.format(args.s3_protocol, args.target_bucket, args.application, args.year_month_day_arg)
df.write.mode('overwrite').parquet(destination_filepath)

# Salir de Spark y de este archivo
sc.stop()
exit()

Cuando utilizamos el catálogo de datos de AWS Glue, pudimos habilitar la consulta de nuestros datos de secuencias de clics dentro de Amazon Redshift y otras herramientas de consulta, como Amazon Athena y Apache Spark. Esto se logra asignando el archivo Parquet a un esquema relacional. AWS Glue permite consultar datos adicionales en cuestión de segundos. Esto se debe a que los cambios de esquema pueden ocurrir en tiempo real. Significa que se puede eliminar y agregar columnas, reordenar los índices de columna y cambiar los tipos de columna, todo de una vez. Luego, es posible consultar los datos inmediatamente después de guardar el esquema. Además, el formato Parquet previene fallas cuando la forma de los datos cambia o cuando ciertas columnas son obsoletas y se eliminan del conjunto de datos.

Utilizamos la siguiente consulta para crear nuestra primera tabla de AWS Glue para los datos de nuestro sitio web de Adobe Analytics. Ejecutamos esta consulta en Amazon Redshift en SQL Workbench.

--Primero cree su esquema
create external schema omniture_prod
from data catalog 
database 'omniture' 
iam_role 'arn:aws:iam:::role

--Luego cree su "tabla" 
CREATE EXTERNAL TABLE omniture_prod.eqx_web (
  date_time		VARCHAR,
  va_closer_id		VARCHAR,
  va_closer_detail	VARCHAR,
  va_finder_detail	VARCHAR,
  va_finder_id		VARCHAR,
  ip			VARCHAR,
  domain		VARCHAR,
  post_evar1		VARCHAR
)
STORED AS PARQUET
LOCATION 's3://eqxdl-prod/omniture/eqx_web/'
table properties ('parquet.compress'='SNAPPY');

--Compruebe sus bases de datos, esquemas y tablas
select * from pg_catalog.svv_external_databases;
select * from pg_catalog.svv_external_schemas;
select * from pg_catalog.svv_external_tables;

Después de ejecutar esta consulta, agregamos columnas adicionales al esquema a pedido a través de la interfaz de AWS Glue. También utilizamos particiones para hacer nuestras consultas más rápidas y económicas.

En este punto, teníamos una nueva carpeta de esquema en nuestra base de datos. Contenía la tabla externa que se podía consultar, pero queríamos ir un paso más allá. Necesitábamos agregar algunas transformaciones a los datos tales como:

  • Cambiar el nombre de ID a cadenas
  • Concatenar valores
  • Manipular cadenas, sin incluir el tráfico de bot que enviamos desde AWS para probar el sitio web
  • Cambiar los nombres de las columnas para que sean más fáciles de usar.

Para hacer esto, creamos una vista de la tabla externa, y lo hicimos de la siguiente manera:

create view edw_t.f_omniture_web as
select
    REPLACE(dt, '-', '') as hive_date_key,
    va_closer_id,
    va_closer_detail as last_touch_campaign,
    CASE
        WHEN (va_closer_id) = '1' THEN 'Paid Search'
        WHEN (va_closer_id) = '2' THEN 'Natural Search'
        WHEN (va_closer_id) = '3' THEN 'Display'
        WHEN (va_closer_id) = '4' THEN 'Email Acq'
        WHEN (va_closer_id) = '5' THEN 'Direct'
        WHEN (va_closer_id) = '6' THEN 'Session Refresh'
        WHEN (va_closer_id) = '7' THEN 'Social Media'
        WHEN (va_closer_id) = '8' THEN 'Referring Domains'
        WHEN (va_closer_id) = '9' THEN 'Email Memb'
        WHEN (va_closer_id) = '10' THEN 'Social Placement'
        WHEN (va_closer_id) = '11' THEN 'Other Placement'
        WHEN (va_closer_id) = '12' THEN 'Partnership'
        WHEN (va_closer_id) = '13' THEN 'Other Eqx Sites'
        WHEN (va_closer_id) = '14' THEN 'Influencers'
        ELSE NULL
    END AS last_touch_channel,
    va_finder_detail as first_touch_campaign,
    va_finder_id as va_finder_id,
    CASE
        WHEN (va_finder_id) = '1' THEN 'Paid Search'
        WHEN (va_finder_id) = '2' THEN 'Natural Search'
        WHEN (va_finder_id) = '3' THEN 'Display'
        WHEN (va_finder_id) = '4' THEN 'Email Acq'
        WHEN (va_finder_id) = '5' THEN 'Direct'
        WHEN (va_finder_id) = '6' THEN 'Session Refresh'
        WHEN (va_finder_id) = '7' THEN 'Social Media'
        WHEN (va_finder_id) = '8' THEN 'Referring Domains'
        WHEN (va_finder_id) = '9' THEN 'Email Memb'
        WHEN (va_finder_id) = '10' THEN 'Social Placement'
        WHEN (va_finder_id) = '11' THEN 'Other Placement'
        WHEN (va_finder_id) = '12' THEN 'Partnership'
        WHEN (va_finder_id) = '13' THEN 'Other Eqx Sites'
        WHEN (va_closer_id) = '14' THEN 'Influencers'
        ELSE NULL
    END AS first_touch_channel,
    ip as ip_address,
    domain as domain,
    post_evar1 AS internal_compaign,
    post_evar10 as site_subsection_nm,
    post_evar11 as IOS_app_view_txt,
    post_evar12 AS site_section_nm,
    post_evar15 AS transaction_id,
    post_evar23 as join_barcode_id,
    post_evar3 AS page_nm,
    post_evar32 as host_nm,
    post_evar41 as class_category_id,
    post_evar42 as class_id,
    post_evar43 as class_instance_id,
    post_evar60 AS referral_source_txt,
    post_evar69 as adwords_gclid,
    post_evar7 as usersec_tracking_id, 
    post_evar8 as facility_id,
    post_event_list as post_event_list,  
    post_visid_low||post_visid_high as unique_adobe_id,
    post_visid_type as post_visid_type,
    post_page_event as hit_type,
    visit_num as visit_number,
    visit_start_time_gmt,
    post_evar25 as login_status,
    exclude_hit as exclude_hit,
    hit_source as hit_source,
    geo_zip,
    geo_city,
    geo_region,
    geo_country,
    post_evar64 as api_error_msg,
    post_evar70 as page_load_time,
    post_evar78 as join_transaction_id,
    post_evar9 as page_url,
    visit_start_pagename as entry_pg,
    post_tnt as abtest_campaign,
    post_tnt_action as abtest_experience,
    user_agent as user_agent,
    mobile_id as mobile_id,
    cast(date_time as timestamp) as date_time,
    CONVERT_TIMEZONE(
        'America/New_York', -- timezone of origin
        (cast(
            case 
            when post_t_time_info like '%undefined%' then '0'
            when post_t_time_info is null then '0'
            when post_t_time_info = '' then '0'
            when cast(split_part(post_t_time_info,' ',4) as int) < 0
              then left(split_part(post_t_time_info,' ',4),4)
            else left(split_part(post_t_time_info,' ',4),3) end as int
        )/60),
        cast(date_time as timestamp)
    ) as date_time_local,
    post_t_time_info as local_timezone
from omniture_prod.eqx_web
where exclude_hit = '0'
and hit_source not in ('5','7','8','9')

and domain <> 'amazonaws.com'
and domain <> 'amazon.com'

WITH NO SCHEMA BINDING;

Ahora podemos realizar consultas desde Amazon Redshift, combinando nuestros datos estructurados de Salesforce con nuestros datos dinámicos semiestructurados de Adobe Analytics. Con estos cambios, nuestros datos se volvieron extremadamente flexibles, amigables con el espacio de almacenamiento y muy efectivos cuando se les consulta. Desde entonces, hemos comenzado a usar Redshift Spectrum para muchos casos de uso, como controles de calidad de datos, datos de máquinas, archivo de datos históricos y facilitarles a nuestros analistas de datos y científicos la tarea de combinar e incorporar datos más fácilmente.

with web_leads as (
  select transaction_id,last_touch_channel
  from edw_t.f_omniture_web
  where hive_date_key = '20170301'
      and post_event_list like '%201%'
      and transaction_id != '807f0cdc-80cf-42d3-8d75-e55e277a8718'
),
opp_lifecycle as (
  SELECT lifecycle,weblead_transactionid
  FROM edw_t.f_opportunity
  where weblead_transactionid is not null
  and created_date_key between '20170220'and '20170310'
)
select
  web_leads.transaction_id,
  coalesce(opp_lifecycle.lifecycle, 'N/A') as Lifecycle
from web_leads
left join opp_lifecycle on web_leads.transaction_id = opp_lifecycle.weblead_transactionid

Conclusión

Pudimos configurar una plataforma de análisis eficiente y flexible de datos de flujo de clics combinando el lago de datos de Amazon S3 con Amazon Redshift. Esto ha eliminado la necesidad de cargar siempre datos de flujo de clics en el almacén de datos y también ha hecho que la plataforma se adapte a los cambios de esquema en los datos entrantes. Lea la documentación de Redshift para comenzar a usar Redshift Spectrum y también vea a continuación nuestra presentación en AWS Chicago Summit 2018.


Lectura adicional

Si encuentra útil esta publicación, asegúrese de visitar “From Data Lake to Data Warehouse: Enhancing Customer 360 with Amazon Redshift Spectrum” y “Narrativ is helping producers monetize their digital content with Amazon Redshift“.


Sobre el autor

Ryan Kelly es arquitecto de datos en Equinox, donde ayuda a delinear e implementar marcos para iniciativas de datos. También lidera el seguimiento del flujo de clics, que ayuda a los equipos a comprender sus iniciativas digitales. A Ryan le encanta colaborar para que las personas puedan acceder y utilizar sus datos con fines de inteligencia empresarial, análisis y enriquecimiento de productos y servicios. También le gusta explorar nuevas tecnologías para encontrar nuevas formas de mejorar el trabajo en Equinox.