Amazon Web Services ブログ

本番環境でAmazon Redshift Spectrum, Amazon Athena, およびAWS GlueをNode.jsで使用する

これはNUVIADの創設者兼CEOであるRafi Tonによるゲスト投稿です。NUVIADは、彼ら自身の言葉を借りれば、「ハイパーターゲティング、ビッグデータ分析、先進的な機械学習ツールを使ってプロのマーケティング担当者代理店、地元の企業に最先端のツールを提供するモバイルマーケティングプラットフォーム」です。

NUVIADでは3年以上にわたり、Amazon Redshiftを主なデータウェアハウスソリューションとして使用してきました。

当社は、ユーザーとパートナーが分析し広告キャンペーンの戦略を決定するための、大量の広告取引データを保存しています。リアルタイム入札(RTB)キャンペーンを大規模に実行する場合、ユーザーがキャンペーンの掲載結果の変化に迅速に対応する上で、データの最新性が極めて重要となります。我々は、シンプルさ、スケーラビリティ、パフォーマンス、およびニアリアルタイムで新しいデータを読み込む能力を評価し、Amazon Redshiftを選択しました。

過去3年間で、当社の顧客基盤は大幅に成長し、データも同様に増加しました。Amazon Redshiftクラスターは、当初の3ノードから65ノードにまで伸張しました。コストと分析のパフォーマンスのバランスを取るため、我々は頻繁に分析されない大量のデータを低コストで保存する方法を探しました。一方で、我々は依然として、ユーザークエリーに対してすぐにデータを利用できるようにしておき、高速なパフォーマンスについての彼らの期待に応えたいと考えていました。そして、我々はAmazon Redshift Spectrumに目を向けたのです。

この記事では、Amazon RedshiftをRedshift Spectrumによってモダンなデータウェアハウスとして拡張した理由について説明します。データの成長と、コストとパフォーマンスのバランスを取る要求とが、どのように我々をしてRedshift Spectrumの採用に至らしめたかを説明します。私たちの環境における重要なパフォーマンスメトリクスをご紹介し、また、増え続けるユーザーベースによる即時性の高いクエリーのためにデータを利用可能な状態に置きつつ、スケーラブルで高速な環境を提供する、その他のAWSサービスについても議論します。

ビジネス基盤としてのAmazon Redshift

当社のプラットフォームでは、最新のデータをお客様やパートナーに提供することが常に主要な目標でした。数時間前のデータを提供する他のソリューションがも検討しましたが、これは我々にとって十分ではありませんでした。可能な限り最新のデータを提供することにこだわりたかったのです。Amazon Redshiftによって、頻繁なマイクロバッチでデータをロードし、顧客がAmazon Redshiftに直接クエリーしてニアリアルタイムで結果を得ることが可能となりました。

利点はすぐに明らかになりました。当社のお客様は、キャンペーンが他のソリューションよりいかに速く実行されたかを知ることができ、また、常に変化し続けるメディアの供給価格と利用可能性の課題に早急に対応できるようになりました。彼らはとても幸せでした。

しかし、この方法ではAmazon Redshiftに長期間にわたって多くのデータを保存する必要があり、そして我々のデータは急速に増加していました。ピーク時には、65のDC1.largeノードを実行するクラスターを運用していました。Amazon Redshiftクラスタへの影響は明白であり、CPU使用率も90%にまで増加していました。

Amazon RedshiftをRedshift Spectrumへと拡張した理由

Redshift Spectrumは、データをロードすることなく、Amazon S3に格納されたデータに対して、強力なAmazon Redshiftクエリエンジンを使用してSQLクエリを実行する能力を提供してくれます。Redshift Spectrumでは、必要な場所に、我々が望むコストでデータを保存することができます。そしてデータを、ユーザーが必要とした時に期待通りのパフォーマンスで分析が行える状態にしておくことができるのです。

シームレスなスケーラビリティ、高性能、および無制限の同時実行性

Redshift Spectrumがスケールするプロセスはシンプルです。まず、Amazon S3をストレージエンジンとして利用し、事実上無制限のデータキャパシティを得ることができるようになります。

次に、より多くのコンピューティング能力が必要な場合は、Redshift Spectrumの数千ノードにおよぶ分散コンピューティングエンジンを使ってよりよいパフォーマンスを得ることができます。大量のデータに対して複雑なクエリーを投げるには最適です。

さらに、全てのRedshift Spectrumクラスターを同一のデータカタログにアクセスさせれば、データの移行に頭を悩ませることはなくなります。スケーリングは労力を必要とせず、かつシームレスなものになります。

最後に、Redshift Spectrumは潜在的に数千ものノードにクエリーを分散させるため、他のクエリーによって影響を受けることがなくなり、より安定したパフォーマンスが得られます。また、無制限の同時実行性(訳者註:クラスターを分けることで実現できます)が提供されることになります。

SQLを維持できること

Redshift SpectrumはAmazon Redshiftと同じクエリエンジンを使用します。従って、単一のテーブルで複雑なクエリを使用する場合も、複数のテーブルを結合する場合も、既存のBIツールやクエリー構文を変更する必要はありませんでした。

最近紹介された興味深い機能は、Amazon RedshiftとRedshift Spectrumの外部表の両方にまたがるビューを作成できるというものです。この機能を使用すると、Amazon Redshiftクラスター内の頻繁にアクセスされるデータと、Amazon S3上の頻繁にアクセスされないデータを、1つのビューでクエリーすることができます。

より高いパフォーマンスのためのParquet利用

Parquet は列指向のデータフォーマットです。Parquetは優れたパフォーマンスを提供するとともに、Redshift Spectrum(あるいはAmazon Athena)が極めて少ないデータのみをスキャンできるようにします。I/Oが少なくなれば、クエリーはより高速になり、そしてクエリー当たりのコストも低くなります。

Parquetに関する全ての情報は、https://parquet.apache.org/ または https://en.wikipedia.org/wiki/Apache_Parquet で得ることができます。

より低いコスト

コストの観点では、我々は、Redshift Spectrumを使用してデータを分析する上で、Amazon S3上のデータを標準レートで支払う他は、クエリーごとに発生するわずかな金額しか支払っていません。Qarquetフォーマットを利用すれば、スキャンするデータの量を大幅に削減できます。我々のコストは以前より低くなり、そして我々のユーザーは大規模で複雑なクエリに対しても迅速な結果を得ることができます。

Amazon RedshiftとRedshift Spectrumの性能比較から我々が学んだこと

Redshift Spectrumを最初に見たときに、まずテストを行いたいと思いました。Amazon Redshiftとの比較について知りたかったので、2つの重要な問いに着目しました。

  1. シンプルかつ複雑なクエリーに対して、Amazon RedshiftとRedshift Spectrumのパフォーマンス上の違いはどのようなものであるか?
  2. データフォーマットはパフォーマンスに影響するか?

移行フェーズ中、我々はデータセットをAmazon Redshiftと、S3上のCSV/GZIPおよびParquetファイルフォーマットとして格納しました。テストしたのは次の3つの構成です。

  • 28のDC1.largeノードを持つAmazon Redshiftクラスター
  • CSV/GZIPを使用したRedshift Spectrum
  • Parquetを用いたRedshift Spectrum

1か月分のデータを用いて、シンプルなクエリーと複雑なクエリーのベンチマークを実行しました。クエリを実行するのにかかる時間と、同じクエリを複数回実行したときに結果がどの程度一定になるかをテストしました。テストには、すでに日付と時間でパーティション済みのデータを使用しました。データを適切に分割すると、パフォーマンスが大幅に向上し、クエリー時間が短縮されます。

シンプルなクエリー

最初に、月当たりの請求データを集計するシンプルなクエリーをテストしました。

SELECT 
  user_id, 
  count(*) AS impressions, 
  SUM(billing)::decimal /1000000 AS billing 
FROM <table_name> 
WHERE 
  date >= '2017-08-01' AND 
  date <= '2017-08-31'  
GROUP BY 
  user_id;

同一のクエリーを7回流し、レスポンスタイムを計測しています(は最遅、は最速を示しています)。

実行時間 (秒)
  Amazon Redshift Redshift Spectrum
CSV
Redshift Spectrum Parquet
実行 #1 39.65 45.11 11.92
実行 #2 15.26 43.13 12.05
実行 #3 15.27 46.47 13.38
実行 #4 21.22 51.02 12.74
実行 #5 17.27 43.35 11.76
実行 #6 16.67 44.23 13.67
実行 #7 25.37 40.39 12.75
平均 21.53  44.82 12.61

シンプルなクエリーについては、事前に我々が予想していた通り、データをローカルに保持しているAmazon RedshiftがRedshift Spectrumよりよい数値を示しました。

驚きだったのは、Parquetデータフォーマットを使用したRedshift Spectrumの性能が、“従来型の”Amazon Redshiftのそれを大きく上回ったことです。我々のクエリ-に関して言えば、Redshift SpectrumでParquetデータフォーマットを使用した場合、従来型Amazon Redshiftに対して平均40%の高速化が見られました。また、Redshift Spectrumは実行時間も安定しており、最遅と最速の差はわずかなものでした。

スキャンされたデータをCSV/GZIPとParquetで比較してみると、両者の差異もまた顕著であることがわかります。

スキャンされたデータ (GB)
CSV (Gzip) 135.49
Parquet 2.83

Redshift Spectrumの課金はスキャンされたデータに対してのみ行われるので、Parquetを使用した場合のコスト削減効果は明白です。

複雑なクエリー

次に、複雑なクエリーについて、同じ3つの構成を比較しました。

実行時間 (秒)
  Amazon Redshift Redshift Spectrum CSV Redshift Spectrum Parquet
実行 #1 329.80 84.20 42.40
実行 #2 167.60 65.30 35.10
実行 #3 165.20 62.20 23.90
実行 #4 273.90 74.90 55.90
実行 #5 167.70 69.00 58.40
平均 220.84 71.12 43.14

今度は、Redshift SpectrumでParquetを使用した構成は、従来型のAmazon Redshiftに対し80%もの平均クエリー実行時間削減が見られました!

結論:複雑なクエリーの場合、Redshift SpectrumはAmazon Redshiftに対し67%の性能向上をもたらします。Parquetデータフォーマットを使用することで、Redshift SpectrumのAmazon Redshiftに対する性能改善効果は80%に達します。我々にとって、この結果は大きなものでした。

様々なワークロードのためのデータ構造の最適化

S3のコストは比較的低く、またRedshift Spectrumのクエリーコストはスキャンされたデータに対してのみかかることから、我々はデータを様々な分析エンジンやワークロードごとに別々のフォーマットで保持しておくことが合理的であると思っています。S3上の単一データをポイントする、いくつものテーブルを持つことが出来る点を押さえておくことは重要です。全ては、いかにデータを分割し、テーブルパーティションを更新するか次第です。

データ配列

例えば、我々のシステムには毎分実行され、直近1分の収集データに関する統計情報を生成する処理があります。Amazon Redshiftでは、これは以下のようなクエリーをテーブルに対して実行することで実現できます。

SELECT 
  user, 
  COUNT(*) 
FROM 
  events_table 
WHERE 
  ts BETWEEN ‘2017-08-01 14:00:00’ AND ‘2017-08-01 14:00:59’ 
GROUP BY 
  user;

(”ts”は個々のイベントのタイムスタンプを保持する列を想定しています)

Redshift Spectrumでは、個々のクエリーがスキャンしたデータに対してのみ課金されます。データが時間単位ではなく分単位で分割されている場合、1秒分を見るクエリーのコストは60分の1で済みます。もし直近1秒のデータのみ指定するテンポラリーテーブルを使用するのであれば、不必要なデータの分のコストを削減することができるのです。

Parquetデータの効率的な作成

平均すると、我々はトラフィックを処理するために800ほどのインスタンスを保持しています。それぞれのインスタンスは、最終的にAmazon Redshiftにロードされることになるイベントを送信します。3年前に始めた際は、個々のインスタンスからS3にデータをオフロードし、その後定期的にCOPYコマンドを用いてS3からAmazon Redshiftにロードしていました。

近年、Amazon Kinesis FirehoseによってAmazon Redshiftに直接データを投入することができるようになりました。これは今では有効なオプションの一つですが、我々は3年にわたって完璧かつ効率的に動作してきた既存の収集方法を堅持していました。

しかし、Redshift Spectrumを導入することになり、状況が変わりました。Redshift Spectrumでは、以下のことを行う方法を見つけ出す必要がありました。

  • インスタンスからイベントデータを収集する。
  • データをParquetフォーマットで保存する。
  • データを効果的に分割する。

これを実現するため、我々はデータをCSVで保存し、その後Parquetに変換しています。Parquetファイルを生成するための最も効果的な方法は、以下の通りです。

  1. データを1秒間隔でインスタンスからKinesis Firehoseに送信し、S3の一時バケットに保存する。
  2. AWS LambdaおよびAWS Glueを用いて、データを1時間単位で集計し、Parquetに変換する。
  3. ParquetデータをS3に保存し、テーブルパーティションを更新する。

この新しいプロセスでは、データはKinesis Firehoseに送られる前により慎重に検証される必要がありました。壊れたレコードがパーティション内に1件でもあると、そのパーティションに対するクエリーは失敗するためです。

データ検証

クリックデータをテーブルに保存するために、以下のようなcreate table用のSQLを用意しました。

create external TABLE spectrum.blog_clicks (
    user_id varchar(50),
    campaign_id varchar(50),
    os varchar(50),
    ua varchar(255),
    ts bigint,
    billing float
)
partitioned by (date date, hour smallint)  
stored as parquet
location 's3://nuviad-temp/blog/clicks/';

上記のステートメントによって、いくつかの属性と共に新しい外部テーブルが定義されます(全てのRedshift Spectrumテーブルは外部テーブルです)。ここでは、’ts’はTimestamp型ではなくUNIXタイムスタンプとして保持しています。また、課金データはdeciaml型ではなくfloat型として保持します(詳細は後ほど説明します)。前述の通り、データは日時と時間で分割され、ParquetとしてS3上に保持されます。

まず、テーブル定義を取得する必要があります。これは下記のようなクエリーを実行することで実現できます。

SELECT 
  * 
FROM 
  svv_external_columns 
WHERE 
  tablename = 'blog_clicks';

このクエリーはテーブル内の全カラムをそれぞれの定義と共にリストします。

schemaname tablename columnname external_type columnnum part_key
spectrum blog_clicks user_id varchar(50) 1 0
spectrum blog_clicks campaign_id varchar(50) 2 0
spectrum blog_clicks os varchar(50) 3 0
spectrum blog_clicks ua varchar(255) 4 0
spectrum blog_clicks ts bigint 5 0
spectrum blog_clicks billing double 6 0
spectrum blog_clicks date date 7 1
spectrum blog_clicks hour smallint 8 2

このデータを使用して、データの検証スキーマを作成することができます。

const rtb_request_schema = {
    "name": "clicks",
    "items": {
        "user_id": {
            "type": "string",
            "max_length": 100
        },
        "campaign_id": {
            "type": "string",
            "max_length": 50
        },
        "os": {
            "type": "string",
            "max_length": 50            
        },
        "ua": {
            "type": "string",
            "max_length": 255            
        },
        "ts": {
            "type": "integer",
            "min_value": 0,
            "max_value": 9999999999999
        },
        "billing": {
            "type": "float",
            "min_value": 0,
            "max_value": 9999999999999
        }
    }
};
次に、このスキーマを用いてデータを検証する関数を作成します。
function valueIsValid(value, item_schema) {
    if (schema.type == 'string') {
        return (typeof value == 'string' && value.length <= schema.max_length);
    }
    else if (schema.type == 'integer') {
        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);
    }
    else if (schema.type == 'float' || schema.type == 'double') {
        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);
    }
    else if (schema.type == 'boolean') {
        return typeof value == 'boolean';
    }
    else if (schema.type == 'timestamp') {
        return (new Date(value)).getTime() > 0;
    }
    else {
        return true;
    }
}

Kinesis Firehoseを使用したニアリアルタイムのデータローディング

Kinesis Firehose上で、イベントを処理するための新しいデリバリーストリームを以下の通り作成しました。

Delivery stream name: events
Source: Direct PUT
S3 bucket: nuviad-events
S3 prefix: rtb/
IAM role: firehose_delivery_role_1
Data transformation: Disabled
Source record backup: Disabled
S3 buffer size (MB): 100
S3 buffer interval (sec): 60
S3 Compression: GZIP
S3 Encryption: No Encryption
Status: ACTIVE
Error logging: Enabled

このデリバリーストリームはイベントデータを毎分または100MBごとに集計し、データをS3バケットにCSV/GZIP圧縮ファイルとして書き込みます。データが検証された後は、Kinesis Firehose APIに安全に送信することができます。


if (validated) {
    let itemString = item.join('|')+'\n'; //Sending csv delimited by pipe and adding new line
    let params = {
        DeliveryStreamName: 'events',
        Record: {
            Data: itemString
        }
    };

    firehose.putRecord(params, function(err, data) {
        if (err) {
            console.error(err, err.stack);        
        }
        else {
            // Continue to your next step 
        }
    });
}

これで、イベントデータを1分ごとに単一のCSVファイルとしてS3に保管することができるようになります。ファイルはKinesis Firehoseによって、UTC時間プレフィックスをYYYY/MM/DD形式で付加する形で自動的に命名され、その後オブジェクトがS3に書き込まれます。日付と時間をパーティションとして使用しているので、命名規則と場所はRedshift Spectrumスキーマに適合するよう変更する必要があります。

AWS Lambdaを使用したデータ分散の自動化

ファイルを別のロケーションにコピーするためのS3 PUTイベントをトリガーとする、シンプルなLambda関数を作成しました。ただしS3イベントは、我々のデータ構造と処理フローに適合するようリネームしています。先に触れた通り、Kinesis Firehoseによって生成されたファイルは事前に定義された構造で階層化されています。例えば以下のようなものです。

S3://your-bucket/your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz

後は、オブジェクト名をパースして最適な形に再構成するだけです。我々のケースでは、以下のようにしています(このオブジェクトはLambda関数に渡されたもので、S3に書き込まれた当該オブジェクトに関するデータを保持しています)。

/*

	object key structure in the event object:

your-prefix/2017/08/01/20/event-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz

	*/

let key_parts = event.Records[0].s3.object.key.split('/'); 

let event_type = key_parts[0];
let date = key_parts[1] + '-' + key_parts[2] + '-' + key_parts[3];
let hour = key_parts[4];
if (hour.indexOf('0') == 0) {
 		hour = parseInt(hour, 10) + '';
}

let parts1 = key_parts[5].split('-');
let minute = parts1[7];
if (minute.indexOf('0') == 0) {
        minute = parseInt(minute, 10) + '';
}

これで、必要に応じてファイルを2つの宛先に再配布することができるようになります。一つは分単位の処理タスク、もう一つは時間単位での集計です。

    copyObjectToHourlyFolder(event, date, hour, minute)
        .then(copyObjectToMinuteFolder.bind(null, event, date, hour, minute))
        .then(addPartitionToSpectrum.bind(null, event, date, hour, minute))
        .then(deleteOldMinuteObjects.bind(null, event))
        .then(deleteStreamObject.bind(null, event))        
        .then(result => {
            callback(null, { message: 'done' });            
        })
        .catch(err => {
            console.error(err);
            callback(null, { message: err });            
        }); 

Kinesis Firehoseはデータを一時フォルダーに保管します。このオブジェクトを、直近1分に処理されたデータを保管する別のフォルダーにコピーします。このフォルダーは、ずっと大きなデータセットをスキャンせずに処理できるよう、小さなRedshift Spectrumテーブルに接続されています。同じデータを、1時間分のデータを保管するフォルダーにコピーし、後で集計してParquetに変換できるようにします。

データを日付と時間に分割しているので、処理対象の分が1時間の最初の1分(つまり0分)だった場合は、以下のクエリーを実行してRedshift Spectrumテーブル上に新しいパーティションを作成しています。

ALTER TABLE 
  spectrum.events 
ADD partition
  (date='2017-08-01', hour=0) 
  LOCATION 's3://nuviad-temp/events/2017-08-01/0/';

データが処理されテーブルに追加されたら、Kinesis Firehoseの一時領域および分単位のフォルダーから処理済みデータを削除します。

AWS GlueおよびAmazon EMRを使用したCSVのParquet変換

CSVデータをParquetに変換する時間単位ジョブを実行する上で、我々の知る限り最もシンプルなやり方は、LambdaとAWS Glueを利用する方法です(AWSビッグデータチームの素晴らしい協力に感謝します)。

AWS Glueジョブの作成

このシンプルなAWS Glueジョブが行っていることは以下の通りです。

  • 処理対象のジョブ、日付、時間のパラメーターを取得
  • Sparkコードを実行するためのSpark EMRコンテキストを作成
  • CSVデータをDataFrameに読み込み
  • データをS3バケット上にParquetとして書き込み
  • Redshift Spectrum / Amazon Athenaテーブルパーティションを追加または修正
import sys
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value' ])

#day_partition_key = "partition_0"
#hour_partition_key = "partition_1"
#day_partition_value = "2017-08-01"
#hour_partition_value = "0"

day_partition_key = args['day_partition_key']
hour_partition_key = args['hour_partition_key']
day_partition_value = args['day_partition_value']
hour_partition_value = args['hour_partition_value']

print("Running for " + day_partition_value + "/" + hour_partition_value)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.read.option("delimiter","|").csv("s3://nuviad-temp/events/"+day_partition_value+"/"+hour_partition_value)
df.registerTempTable("data")

df1 = spark.sql("select _c0 as user_id, _c1 as campaign_id, _c2 as os, _c3 as ua, cast(_c4 as bigint) as ts, cast(_c5 as double) as billing from data")

df1.repartition(1).write.mode("overwrite").parquet("s3://nuviad-temp/parquet/"+day_partition_value+"/hour="+hour_partition_value)

client = boto3.client('athena', region_name='us-east-1')

response = client.start_query_execution(
    QueryString='alter table parquet_events add if not exists partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ')  location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },
    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)

response = client.start_query_execution(
    QueryString='alter table parquet_events partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') set location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },
    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)

job.commit()

備考:Redshift SpectrumとAthenaは共にAWS Glueデータカタログを使用するため、Athenaクライアントを使用してパーティションをテーブルに追加することができました。

float、decimalおよびdoubleについていくつか注意点があります。decimalを使用することは、Redshift SpectrumとSparkで取り扱いが異なることから、予想より多くの困難を伴いました。Redshift SpectrumとSparkでdecimalを使用する度に、以下のようなエラーが発生したのです。

S3 Query Exception (Fetch). Task failed due to an internal error. File 'https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parquet has an incompatible Parquet schema for column 's3://nuviad-events/events.lat'. Column type: DECIMAL(18, 8), Parquet schema:\noptional float lat [i:4 d:1 r:0]\n (https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parq

いくつかの浮動小数点フォーマットを試した結果、Spark上ではカラムをdoubleで定義し、Spectrumではfloatで定義するのが唯一有効な組み合わせであることがわかりました。課金データをSpectrumではfloat、Sparkコードではdoubleで定義しなければならない理由はここにあります。

変換を実行するためのLambda関数の作成

次に、次のようなPythonコードを用いて、AWS Glueスクリプトを1時間ごとに起動するためのシンプルなLambda関数を作成しました。

import boto3
import json
from datetime import datetime, timedelta
 
client = boto3.client('glue')
 
def lambda_handler(event, context):
    last_hour_date_time = datetime.now() - timedelta(hours = 1)
    day_partition_value = last_hour_date_time.strftime("%Y-%m-%d") 
    hour_partition_value = last_hour_date_time.strftime("%-H") 
    response = client.start_job_run(
    JobName='convertEventsParquetHourly',
    Arguments={
         '--day_partition_key': 'date',
         '--hour_partition_key': 'hour',
         '--day_partition_value': day_partition_value,
         '--hour_partition_value': hour_partition_value
         }
    )

Amazon CloudWatch Eventsを使用してこの関数を1時間ごとに実行しました。この関数は、‘convertEventsParquetHourly’という名前のAWS Glueジョブを起動して直近の1時間を処理し、ジョブ名とパーティションの値をAWS Glueに渡します。

Redshift SpectrumとNode.js

我々の開発スタックはNode.jsを基本としています。Node.jsは大量のトランザクションを処理する必要のある、高速・軽量なサーバーでの利用に適しています。しかし、Node.js環境にはいくつかの制約があるため、ワークアラウンドを用意し、別のツールを使うなどして、処理を完結させる必要がありました。

Node.jsとParquet

Node.jsにはParquetモジュールが欠けていたため、データをCSVからParquetへ効率的に移行するためには、AWS Glue / Amazon EMRによる処理を実装する必要がありました。本来は直接Parquetに保存したいところですが、他に妥当な方法が見つかりませんでした。

現在進められている興味深いプロジェクトの一つに、Marc VertesによるParquet NPMの開発があります(https://www.npmjs.com/package/node-parquet)。まだ正式リリースには至っていませんが、このパッケージの進捗は注視するに値すると考えています。

Timestampデータ型

Parquetドキュメンテーションによると、Timestampデータは64-bit integerとしてParquetに保存されます。しかしながら、JavaScriptのネイティブの数値型は整数レンジに53ビットのみを割り当てた64-bit doubleであり、64-bit integerはサポートしていません。

このため、Node.jsではTimestampを正しくParquetに保存することができません。解決策は、Timestampを文字列として保存し、クエリー内でTimestampとしてキャストすることです。この方法による性能劣化は一切観察されませんでした。

 

知見

我々のトライアンドエラーの経験から、以下のような知見を得ることができるかと思います。

知見 #1: データ検証は極めて重要である

前述の通り、パーティション内に壊れたエントリーが一つでもあると、そのパーティションに対するクエリーは失敗します。特に、Parquetを使用している場合は、シンプルなCSVファイルに比べて修正が困難です。Redshift Spectrumでスキャンする前に、データを検証するようにして下さい。

知見 #2: データを効率的に構造化および分割する

Redshift Spectrum(あるいはAthena)の最大の利点の一つは、ノードを常時起動しておく必要がないことです。実行したクエリーによってスキャンされたデータに対してのみ、課金が行われます。

異なるクエリーのために、データを異なる並びで保持しておくことは、この点では非常に有益です。例えば、時間ベースでクエリーを実行するために日付と時間でデータを分割し、user_idベースでクエリーを実行するためにuser_idと日付で分割した別のパーティションを持つことができます。これにより、あなたのデータウェアハウスはより高速かつ効率的になるでしょう。

正しいフォーマットでデータを保管する

可能な場合はParquetを利用して下さい。高速なパフォーマンス、より少ないスキャンデータ、列指向フォーマットによる効率性など、Parquetの効果は絶大です。ただし、Kinesis Firehoseで直接に利用することはできないため、独自のETLの仕組みを実装する必要があります。AWS Glueは素晴らしいオプションです。

頻繁なタスクには小さなテーブルを作成する

Redshift Spectrumを利用し始めた頃、我々はAmazon Redshiftのコストが一日当たり何百ドルも跳ね上がっていることに気づきました。丸1日分のデータを毎分、不必要にスキャンしていることに気づいたのはその後のことです。同じS3バケットないしフォルダーに対して複数のテーブルを定義できる利点を活かし、頻度の高いクエリーには一時的で小さなテーブルを用意するようにして下さい。

知見 #3: 最適なパフォーマンスのためにAthenaとRedshift Spectrumを組み合わせる

Redshift Spectrumへの移行は、同様にAWS Glueのデータカタログを使用しているAthenaの利点を活かすことにも繋がりました。シンプルでクイックなクエリーをAthenaで捌きつつ、より複雑なクエリーにはRedshift Spectrumを使ってAmazon Redshiftのクエリーエンジンを使う、といったことができます。

Redshift Spectrumは複雑なクエリーを実行することに秀でています。プレディケイトフィルタリングや集計といった計算能力依存のタスクの多くをRedshit Spectrum層にプッシュダウンすることができ、クエリーが消費するクラスターの処理能力を大幅に抑制できます。

知見 #4: パーティション内でParquetデータをソートする

sortWithinPartiions(sort_field)を用いてパーティション内でデータをソートすることで、さらなる性能改善を達成することができます。例えば以下のようにします。

df.repartition(1).sortWithinPartitions("campaign_id")…

結論

我々は、コアデータウェアハウスとして3年以上利用してきたAmazon Redshiftに非常に満足していました。しかし、我々のクライアントベースとデータ量が飛躍的に増加したことから、Amazon Redshiftを拡張し、Redshift Spectrumのスケーラビリティ、性能、コストの利点を活用することを決断しました。

Redshift Spectrumは事実上無制限のストレージ領域を利用すること、コンピュート能力を透過的に拡張すること、および我々のユーザーに極めて高速に結果を提供することを可能にしてくれます。Redshift Spectrumによって、我々はデータを望む場所に望むコストで保管し、そしてユーザーが望む時に彼らが期待するだけの性能で分析を行えるよう、データを利用可能な状態にしておくことができるのです。

著者について

テクノロジーリーディングカンパニーでの15年とアドテク業界での7年の経験を経て、Rafi ToniはNUVIADを設立しそのCEOに就任しました。新しいテクノロジーを探索し、現実世界で現実のマネーを生み出すカッティングエッジな製品やサービスに反映することを楽しんでいます。起業家としての経験から、Rafiは新規テクノロジーへの早期適応と実践的なプログラミングこそが大きな市場価値を生み出すと信じています。

 

 

(翻訳はプロフェッショナルサービス仲谷が担当しました。原文はこちら