亚马逊AWS官方博客

在生产中结合使用 Amazon Redshift Spectrum、Amazon Athena 和 AWS Glue 与 Node.js

此博文由 NUVIAD 的创始人和 CEO Rafi Ton 特约发表。NUVIAD 对自己的定位是:“为专业营销人员机构和本地企业提供一流工具以通过精确定向、大数据分析和高级机器学习工具来推广他们的产品和服务的移动营销平台。

在 NUVIAD,我们 3 年多以来一直将 Amazon Redshift 用作我们的主要数据仓库解决方案。

我们存储了大量广告交易数据,我们的用户和合作伙伴对这些数据进行分析,以确定广告活动策略。大规模运行实时竞标 (RTB) 活动时,数据的新鲜度至关重要,因为这样一来,我们的用户就可以对广告效果的变化做出快速反应。我们选择 Amazon Redshift 是因为它的简单性、可扩展性、性能及几乎实时加载新数据的能力。

在过去三年里,我们的客户群大大增加,数据亦是如此。我们发现 Amazon Redshift 集群从 3 个节点增长到 65 个节点。为了平衡成本和分析性能,我们找到了一种方法,以更低的成本存储大量不太频繁分析的数据。然而,我们仍然希望能够立即为用户查询提供数据及满足他们对性能显著提升的期望。我们转向了 Amazon Redshift Spectrum

在此博文中,我们解释了将带 Redshift Spectrum 的 Amazon Redshift 扩展为现代数据仓库的原因。我将介绍我们的数据增长及平衡成本和性能的需求如何促使我们采用 Redshift Spectrum。我还将分享我们的环境中的关键性能指标,并讨论提供可扩展和快速环境的额外 AWS 服务,并提供数据供我们日益增长的用户群进行立即查询。

将 Amazon Redshift 作为基础

能够为客户和合作伙伴提供最新数据一直是我们平台的主要目标。我们发现其他解决方案可提供几个小时前的数据,但这对我们来说还不够好。我们坚持提供最新的数据。对我们而言,这意味着以频繁的微批次加载 Amazon Redshift 并使我们的客户直接查询 Amazon Redshift 将近乎实时的获得结果。

优势是显而易见的。我们的客户可以看到,他们的活动比其他解决方案执行的更快,对不断变化的媒体供应定价和可用性的反应更迅速。他们非常高兴。

然而,这种方法需要 Amazon Redshift 长期存储大量数据,且我们的数据大幅增长。在高峰期时,我们维护了一个运行 65 个 DC1.large 节点的集群。对 Amazon Redshift 集群的影响是显而易见的,而且我们发现 CPU 利用率增长到 90%。

我们为什么将 Amazon Redshift 扩展到 Redshift Spectrum

Redshift Spectrum 使我们能够使用强大的 Amazon Redshift 查询引擎对 Amazon S3 中存储的数据运行 SQL 查询,无需加载数据。利用 Redshift Spectrum,我们可以以我们希望的成本江南数据存储在我们想要的地方。我们可提供数据供用户在需要时进行分析,且数据性能符合用户预期。

无缝可扩展性、高性能和无限并发数

扩展 Redshift Spectrum 是一个简单的过程。首先,它可使我们将 Amazon S3 用作存储引擎并获得近乎无限的数据容量。

其次,如果我们需要更多计算能力,我们可以在数千个节点上利用 Redshift Spectrum 的分布式计算引擎来提供卓越的性能——这对于针对大量数据运行的复杂查询非常适合。

第三,所有的 Redshift Spectrum 集群均可访问相同的数据目录,这样一来,我们不必再担心数据迁移,可以毫不费力进行无缝扩展。

最后,由于 Redshift Spectrum 将查询分布在可能有数千个的节点上,它们不受其他查询的影响,从而能提供更稳定的性能和无限的并发数。

将它保持为 SQL

Redshift Spectrum 使用与 Amazon Redshift 相同的查询引擎。这意味着,我们不需要更改我们的 BI 工具或查询语法,无论我们是在单个表上使用复杂查询还是加入多个表。

最近推出的一项有趣功能是,能够创建同时跨越 Amazon Redshift 和 Redshift Spectrum 外部表的视图。利用此功能,您可以使用一个视图在 Amazon Redshift 集群中查询频繁访问的数据并在 Amazon S3 中查询较少访问的数据。

利用 Parquet 以获得更高性能

Parquet 是一种柱状数据格式,可提供优越的能力并使 Redshift Spectrum(或 Amazon Athena)能够扫描更少的数据。查询使用较少的 I/O 便能更快运行,且每个查询的费用更低。您可以在 https://parquet.apache.org/https://en.wikipedia.org/wiki/Apache_Parquet 中阅读有关 Parquet 的所有内容。

较低的成本

从成本角度来看,我们为 Amazon S3 中的数据支付标准费率,每个查询只需要少量数据来使用 Redshift Spectrum 分析数据。使用 Parquet 格式,我们可以大大减少扫描的数据量。现在,我们的成本更低,而我们的用户甚至在较大的复杂查询中都能获得快速结果。

我们了解的有关 Amazon Redshift 对Redshift Spectrum 性能的内容

当我们第一次开始了解 Redshift Spectrum 时,我们希望对其进行测试。我们希望了解它与 Amazon Redshift 相比如何,所以我们考虑了两个关键问题:

  1. Amazon Redshift 与 Redshift Spectrum 在简单和复杂查询方面的性能差异是什么?
  2. 数据格式是否影响性能?

在迁移期间,我们将 Amazon Redshift 和 S3 中存储的数据集设置为 CSV/GZIP 和 Parquet 文件格式。我们对三种配置进行了测试:

  • 带 28 个 DC1.large 节点的 Amazon Redshift 集群
  • 使用 CSV/GZIP 的 Redshift Spectrum
  • 使用 Parquet 的 Redshift Spectrum

我们使用一个月的数据对简单和复杂查询执行了基准测试。我们测试了执行查询所需的时间,以及多次运行同一个查询时,结果的一致性如何。我们用于测试的数据已按照日期和小时进行了分区。对数据进行适当分区将大大提高性能并降低查询时间。

简单查询

首先,我们测试了聚合一个月账单数据的简单查询:

SELECT 
  user_id,
  count(*) AS impressions,
  SUM(billing)::decimal /1000000 AS billing 
FROM <table_name> 
WHERE 
  date >= '2017-08-01' AND 
  date <= '2017-08-31'  
GROUP BY 
  user_id;
SQL

我们运行了七次同一个查询并测量了反应时间(红色表示最长时间,绿色表示最短时间):

执行时间(秒)
  Amazon Redshift Redshift Spectrum
CSV
Redshift Spectrum Parquet
Run #1 39.65 45.11 11.92
Run #2 15.26 43.13 12.05
Run #3 15.27 46.47 13.38
Run #4 21.22 51.02 12.74
Run #5 17.27 43.35 11.76
Run #6 16.67 44.23 13.67
Run #7 25.37 40.39 12.75
Average 21.53  44.82 12.61

对于简单查询,Amazon Redshift 的表现比 Redshift Spectrum 更好,正如我们所想,这是因为数据在 Amazon Redshift 本地。

令人惊讶的是,在 Redshift Spectrum 中使用 Parquet 数据格式明显优于 Amazon Redshift 的“传统”表现。对于我们的查询,使用 Parquet 数据格式与 Redshift Spectrum 产生的平均性能比传统的 Amazon Redshift 高 40%。此外,Redshift Spectrum 在执行时间上表现出很高的一致性,最慢运行和最快运行之间的差异较小。

使用 CSV/GZIP 和 Parquet 时对扫描的数据量进行比较发现,差异也很显著:

扫描的数据 (GB)
CSV (Gzip) 135.49
Parquet 2.83

由于我们只为 Redshift Spectrum 扫描的数据付款,使用 Parquet 显然可以大幅节省成本。

复杂的查询

接下来,我们将相同的三种配置与复杂查询进行了比较。

执行时间(秒)
  Amazon Redshift Redshift Spectrum CSV Redshift Spectrum Parquet
Run #1 329.80 84.20 42.40
Run #2 167.60 65.30 35.10
Run #3 165.20 62.20 23.90
Run #4 273.90 74.90 55.90
Run #5 167.70 69.00 58.40
Average 220.84 71.12 43.14

此时,使用 Parquet 的 Redshift Spectrum 与传统的 Amazon Redshift 相比,将平均查询时间削减了 80%!

底线:对于复杂查询,Redshift Spectrum 与 Amazon Redshift 相比,将性能提高 67%。使用 Parquet 数据格式后,Redshift Spectrum 与 Amazon Redshift 相比,使性能提高 80%。对我们而言,这一提高巨大。

为不同工作负载优化数据结构

由于 S3 的成本相对便宜且我们只需为每个查询扫描的数据付款,我们认为,对不同的工作负载和不同的分析引擎,以不同的格式保存数据是有意义的。需要注意的是,我们可以有任意数量的表指向 S3 上的相同数据。这完全取决于我们如何对数据分区及如何更新表分区。

数据排列

例如,我们有一个一分钟运行一次且为最后一分钟收集的数据生成统计数据的进程。利用 Amazon Redshift,将如下所示通过在表上运行查询来完成此操作:

SELECT 
  user,
  COUNT(*) 
FROM 
  events_table 
WHERE 
  ts BETWEEN2017-08-01 14:00:00AND2017-08-01 14:00:59GROUP BY 
  user;
SQL

(假设 ‘ts’ 是为每个事件存储时间戳的列。)

利用 Redshift Spectrum 时,我们为每个查询中扫描的数据付费。如果数据按分钟而不是小时分区,则查看一分钟数据的查询费用为成本的 1/60。如果我们使用仅指向最后一分钟数据的临时表,我们将节省不必要的费用。

高效创建 Parquet 数据

平均而言,我们有 800 个实例来处理流量。每个实例发送最终加载到 Amazon Redshift 中的事件。从三年前开始,我们可以将数据从每个服务器卸载到 S3 中,然后再执行从 S3 到 Amazon Redshift 的定期复制命令。

最近,Amazon Kinesis Firehose 将卸载数据的功能直接添加到了 Amazon Redshift 中。虽然现在这是个可行选项,但我们保留了相同的收集过程,这个过程已完美、高效地工作了三年。

但当我们合并 Redshift Spectrum 时,情况发生了变化。利用 Redshift Spectrum,我们需要找到一种方法来:

  • 从实例中收集事件数据。
  • 以 Parquet 格式保存数据。
  • 对数据进行有效分区。

为完成此操作,我们将数据另存为 CSV 格式,然后将其转换为 Parquet。生成 Parquet 文件的最有效方法是:

  1. 将 S3 临时存储桶用作目标,以一分钟的间隔将数据从实例发送到 Kinesis Firehose 中。
  2. 聚合每小时的数据,并使用 AWS LambdaAWS Glue 将其转换为 Parquet。
  3. 通过更新表分区将 Parquet 数据添加到 S3。

在此新过程中,我们必须在将数据发送到 Kinesis Firehose 之前更加注意验证数据,因为分区中的单个损坏记录无法对该分区进行查询。

数据验证

为了将我们的点击数据存储在表中,我们考虑了以下 SQL 创建表命令:

create external TABLE spectrum.blog_clicks (
    user_id varchar(50),
    campaign_id varchar(50),
    os varchar(50),
    ua varchar(255),
    ts bigint,
    billing float
)(date date, hour smallint) 分区  
存储为 parquet
位置 's3://nuviad-temp/blog/clicks/';
SQL

上面的语句定义了包括几个属性的新外部表(所有的 Redshift Spectrum 表为外部表)。我们将 ‘ts’ 存储为 Unix 时间戳,并非时间戳,计费数据存储为浮点数,而不是小数(稍后进行更多讨论)。我们还说过,数据按日期和小时进行分区,然后在 S3 上存储为 Parquet。

首先,我们需要获取表定义。此操作可以通过运行以下查询来实现:

SELECT 
  * 
FROM 
  svv_external_columns 
WHERE 
  tablename = 'blog_clicks';
SQL

此查询列出表中的所有列及其各自的定义:

schemaname tablename columnname external_type columnnum part_key
spectrum blog_clicks user_id varchar(50) 1 0
spectrum blog_clicks campaign_id varchar(50) 2 0
spectrum blog_clicks os varchar(50) 3 0
spectrum blog_clicks ua varchar(255) 4 0
spectrum blog_clicks ts bigint 5 0
spectrum blog_clicks billing double 6 0
spectrum blog_clicks date date 7 1
spectrum blog_clicks hour smallint 8 2

现在,我们可以使用此数据为我们的数据创建验证模式:

const rtb_request_schema = {
    "name": "clicks",
    "items": {
        "user_id": {
            "type": "string",
            "max_length": 100
        },
        "campaign_id": {
            "type": "string",
            "max_length": 50
        },
        "os": {
            "type": "string",
            "max_length": 50            
        },
        "ua": {
            "type": "string",
            "max_length": 255            
        },
        "ts": {
            "type": "integer",
            "min_value": 0,
            "max_value": 9999999999999
        },
        "billing": {
            "type": "float",
            "min_value": 0,
            "max_value": 9999999999999
        }
    }
};
JSON

接下来,我们创建一个函数,以使用此模式验证数据:

function valueIsValid(value, item_schema) {
    if (schema.type == 'string') {
        return (typeof value == 'string' && value.length <= schema.max_length);
    }
    else if (schema.type == 'integer') {
        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);
    }
    else if (schema.type == 'float' || schema.type == 'double') {
        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);
    }
    else if (schema.type == 'boolean') {
        return typeof value == 'boolean';
    }
    else if (schema.type == 'timestamp') {
        return (new Date(value)).getTime() > 0;
    }
    else {
        return true;
    }
}
JavaScript

使用 Kinesis Firehose 进行近实时的数据加载

在 Kinesis Firehose 上,我们创建了一个新的传输流以按如下所示处理事件:

传输流名称:事件
来源:Direct PUT
S3 存储桶:nuviad-events
S3 前缀:rtb/
IAM 角色:firehose_delivery_role_1
数据转换:已禁用
源记录备份:已禁用
S3 缓冲区大小 (MB):100
S3 缓冲区间隔(秒):60
S3 压缩:GZIP
S3 加密:未加密
状态:活动
错误记录:已启用
Code

此传输流每分钟聚合一次事件数据,或最多聚合 100 MB,并将数据作为 CSV/GZIP 压缩文件写入 S3 存储桶中。接下来,在我们验证数据后,我们可以将其安全发送至我们的 Kinesis Firehose API:

if (validated) {
    let itemString = item.join('|')+'\n'; //Sending csv delimited by pipe and adding new line

    let params = {
        DeliveryStreamName: 'events',
        Record: {
            Data: itemString
        }
    };

    firehose.putRecord(params, function(err, data) {
        if (err) {
            console.error(err, err.stack);        
        }
        else {
            // 继续您的下一步 
        }
    });
}
JavaScript

现在,我们有一个 CSV 文件,该文件表示 S3 中存储的一分钟事件数据。在向 S3 写入对象之前,Kinesis Firehose 通过添加一个格式为 YYYY/MM/DD/HH 的 UTC 时间前缀对文件进行自动命名。由于我们将日期和小时用作分区,我们需要更换文件的名称和位置,以适应我们的 Redshift Spectrum 模式。

使用 AWS Lambda 自动化数据分布

我们创建了一个由 S3 put 事件触发的简单 Lambda 函数,该事件将文件复制到另一个位置(或多个位置),同时对文件进行重新命名以适应我们的数据结构和处理流程。如前所述,Kinesis Firehose 生成的文件按照预先定义的层次构造结构,如:

S3://your-bucket/your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz
Code

我们需要做的就是解析对象名称,并按照我们认为合适的方式重新构造其结构。在我们的例子中,我们执行了以下操作(事件是 Lambda 函数中接收的对象,该对象的所有数据都写入了 S3):

/*
	事件对象中的对象密钥结构:
your-prefix/2017/08/01/20/event-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz
	*/

let key_parts = event.Records[0].s3.object.key.split('/'); 

let event_type = key_parts[0];
let date = key_parts[1] + '-' + key_parts[2] + '-' + key_parts[3];
let hour = key_parts[4];
if (hour.indexOf('0') == 0) {
 		hour = parseInt(hour, 10) + '';
}
    
let parts1 = key_parts[5].split('-');
let minute = parts1[7];
if (minute.indexOf('0') == 0) {
        minute = parseInt(minute, 10) + '';
}
JavaScript

现在,我们可以将文件重新分布到我们需要的两个目标——一个用于分钟处理任务,另一个用于小时聚合:

    copyObjectToHourlyFolder(event, date, hour, minute)
        .then(copyObjectToMinuteFolder.bind(null, event, date, hour, minute))
        .then(addPartitionToSpectrum.bind(null, event, date, hour, minute))
        .then(deleteOldMinuteObjects.bind(null, event))
        .then(deleteStreamObject.bind(null, event))        
        .then(result => {
            callback(null, { message: 'done' });            
        })
        .catch(err => {
            console.error(err);
            callback(null, { message: err });            
        }); 
JavaScript

Kinesis Firehose 将数据存储在临时文件夹中。我们将对象复制到保存最后一分钟处理数据的另一个文件夹。该文件夹连接到一个小的 Redshift Spectrum 表,数据在该表中进行处理,不需要扫描更大的数据集。我们还将数据复制到保存一整个小时数据的文件夹中,稍后再进行聚合并转换为 Parquet 格式。

由于我们按日期和小时对数据进行分区,如果处理的分钟是一小时中的第一分钟(即分钟 0),我们在 Redshift Spectrum 表上创建了一个新分区。我们运行了以下各项:

ALTER TABLE 
  spectrum.events 
ADD 分区
  (date='2017-08-01', hour=0) 
  LOCATION 's3://nuviad-temp/events/2017-08-01/0/';
SQL

处理过数据并将其添加到表中之后,我们从 Kinesis Firehose 临时存储和分钟存储文件夹中删除处理后的数据。

使用 AWS Glue 和 Amazon EMR 将 CSV 迁移到 Parquet

我们发现 CSV 数据至 Parquet 转换的小时作业的最简单运行方法是使用 Lambda 和 AWS Glue(感谢强大的 AWS 大数据团队在这方面的帮助)。

创建 AWS Glue 作业

此简单 AWS Glue 脚本执行以下操作:

  • 获取待处理的作业、日期和小时参数
  • 创建 Spark EMR 上下文,以便我们运行 Spark 代码
  • 将 CSV 数据读取到 DataFrame 中
  • 将数据以 Parquet 格式写入目标 S3 存储桶
  • 为该表添加或修改 Redshift Spectrum / Amazon Athena 表分区
import sys
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value' ])

#day_partition_key = "partition_0"
#hour_partition_key = "partition_1"
#day_partition_value = "2017-08-01"
#hour_partition_value = "0"

day_partition_key = args['day_partition_key']
hour_partition_key = args['hour_partition_key']
day_partition_value = args['day_partition_value']
hour_partition_value = args['hour_partition_value']

print("Running for " + day_partition_value + "/" + hour_partition_value)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.read.option("delimiter","|").csv("s3://nuviad-temp/events/"+day_partition_value+"/"+hour_partition_value)
df.registerTempTable("data")

df1 = spark.sql("select _c0 as user_id, _c1 as campaign_id, _c2 as os, _c3 as ua, cast(_c4 as bigint) as ts, cast(_c5 as double) as billing from data")

df1.repartition(1).write.mode("overwrite").parquet("s3://nuviad-temp/parquet/"+day_partition_value+"/hour="+hour_partition_value)

client = boto3.client('athena', region_name='us-east-1')

response = client.start_query_execution(
    QueryString='alter table parquet_events add if not exists partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ')  location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },
    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)

response = client.start_query_execution(
    QueryString='alter table parquet_events partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') set location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },
    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)

job.commit()
Python

注:由于 Redshift Spectrum 和 Athena 都使用 AWS Glue 数据目录,我们可以使用 Athena 客户端将分区添加到表中。

下面是关于浮点类型、小数类型和双精度型的一些单词。经证明,使用小数类型比我们预期的更具有挑战性,因为 Redshift Spectrum 和 Spark 似乎以不同的方式使用它们。当我们在 Redshift Spectrum 和 Spark 中使用小数类型时,我们不断的收到错误,例如:

S3 查询异常(获取)。由于内部错误,任务失败。文件 'https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parquet has an incompatible Parquet schema for column 's3://nuviad-events/events.lat'.列类型:DECIMAL(18, 8), Parquet schema:\noptional float lat [i:4 d:1 r:0]\n (https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parq

我们必须对几个浮点格式进行试验,直到我们发现唯一有效的组合是在 Spark 代码中将列定义为双精度型在 Spectrum 中将其定义为浮点类型。这是账单在 Spectrum 中被定义为浮点类型在 Spark 代码中被定义为双精度型的原因。

创建一个 Lambda 函数以触发转换

接下来,我们创建了一个简单的 Lambda 函数,以使用简单的 Python 代码每小时触发 AWS Glue 脚本:

import boto3
import json
from datetime import datetime, timedelta
 
client = boto3.client('glue')
 
def lambda_handler(event, context):
    last_hour_date_time = datetime.now() - timedelta(hours = 1)
    day_partition_value = last_hour_date_time.strftime("%Y-%m-%d") 
    hour_partition_value = last_hour_date_time.strftime("%-H") 
    response = client.start_job_run(
    JobName='convertEventsParquetHourly',
    Arguments={
         '--day_partition_key': 'date',
         '--hour_partition_key': 'hour',
         '--day_partition_value': day_partition_value,
         '--hour_partition_value': hour_partition_value
         }
    )
Python

使用 Amazon CloudWatch Events,我们可以按小时触发此函数。此函数将触发名为 ‘convertEventsParquetHourly’ 的 AWS Glue 作业,并运行前一小时的作业,从而将要处理的作业名称和分区值传递到 AWS Glue 中。

Redshift Spectrum 和 Node.js

我们的开发堆栈基于 Node.js,非常适合需要处理大量交易的高速轻服务器。然而,Node.js 环境有一些限制,要求我们创建变通方法并使用其他工具完成此过程。

Node.js 和 Parquet

由于 Node.js 缺乏 Parquet 模块,需要我们执行 AWS Glue/Amazon EMR 过程以将数据从 CSV 有效迁移到 Parquet。我们宁愿直接保存到 Parquet,但我们找不到有效的方法来执行此操作。

一个有趣的项目是通过称为 node-parquet 的 Marc Vertes 开发 Parquet NPM (https://www.npmjs.com/package/node-parquet)。它没有进入生产状态,但我们认为很值得跟进此生产包的进展。

时间戳数据类型

根据 Parquet 文档,时间戳数据以 64 位整数存储在 Parquet 中。然而,JavaScript 不支持 64 位整数,因为本机号码类型为 64 位双精度型,仅提供 53 位的整数范围。

因此,无法使用 Node.js 将时间戳正确存储在 Parquet 中。解决方法是将时间戳存储为字符串,并将类型转换为查询中的时间戳。使用这种方法,我们没有发现任何性能下降。

经验教训

您可以从我们的试错经验中获益。

经验 1:数据验证很关键

如前所述,分区中的一个条目损坏可能会导致对该分区运行的查询失败,尤其是在使用 Parquet 时,这比简单的 CSV 文件更难编辑。确保您验证数据后再使用 Redshift Spectrum 扫描数据。

经验 2:有效构造数据结构和对数据分区

使用 Redshift Spectrum(或者在此情况下使用 Athena)的一项最大益处是,您不需要一直保持节点的正常运行。您只需为您执行的查询及每个查询所扫描的数据付费。

在此情况下,为不同的查询保留不同的数据排列非常有意义。例如,您可以按日期和小时对数据进行分区,以运行基于时间的查询,也可以有一个按 user_id 和日期分区的集合,以运行基于用户的查询。这将提高数据仓库的运行速度和效率。

以正确的格式存储数据

随时使用 Parquet。Parquet 的益处非常多。性能更快、扫描的数据更少且柱状格式更高效。然而,它不支持 Kinesis Firehose 的开箱即用,因此,您需要执行自己的 ETL。AWS Glue 是一个非常好的选项。

为频繁的任务创建较小的表

当我们开始使用 Redshift Spectrum 时,我们发现 Amazon Redshift 的成本每天上涨数百美元。然后,我们意识到,我们没必要每分钟都扫描一整天的数据。利用在同一个 S3 存储桶或文件夹上定义多个表的能力,并为频繁查询创建临时表和较小的表。

经验 3:将 Athena 和 Redshift Spectrum 结合,以获得最大性能

迁移到 Redshift Spectrum 还能让我们利用 Athena,因为它们都使用 AWS Glue 数据目录。在利用 Amazon Redshift 高级查询引擎使用 Redshift Spectrum 进行复杂查询时,使用 Athena 运行快速而简单的查询。

运行复杂的查询时,Redshift Spectrum 更为突出。它可以将很多计算密集型任务(如谓词过滤和聚合)下推到 Redshift Spectrum 层,从而减少查询使用的集群处理能力。

经验 4:在分区类对您的 Parquet 数据进行分类

通过使用 sortWithinPartitions(sort_field) 在分区内对数据进行分类,我们实现了又一次性能改进。例如:

df.repartition(1).sortWithinPartitions("campaign_id")…
Code

小结

我们非常高兴在过去的三年内将 Amazon Redshift 用作我们的核心数据仓库。但随着我们的客户群和数据量大幅增长,我们对 Amazon Redshift 进行了扩展,从而利用 Redshift Spectrum 的可扩展性、性能和成本。

Redshift Spectrum 让我们可以清楚的扩展到几乎无限的存储、扩展计算,并且可以为我们的用户提供超快的结果。利用 Redshift Spectrum,我们可以将数据以我们想要的成本存储在我们希望的位置,并且可提供数据供用户在需要时进行分析,且数据性能符合用户预期。

关于作者

Rafi Ton 拥有 7 年的广告科技经验和 15 年的领先科技公司经验,是 NUVIAD 的创始人兼 CEO。他喜欢探索并将新技术应用于尖端产品和服务中,在现实世界中创造真正的财富。作为一名经验丰富的企业家,Rafi 相信实用编程和快速适应新技术可以取得显著的市场优势。