Category: Amazon Redshift*


本番環境でAmazon Redshift Spectrum, Amazon Athena, およびAWS GlueをNode.jsで使用する

これはNUVIADの創設者兼CEOであるRafi Tonによるゲスト投稿です。NUVIADは、彼ら自身の言葉を借りれば、「ハイパーターゲティング、ビッグデータ分析、先進的な機械学習ツールを使ってプロのマーケティング担当者代理店、地元の企業に最先端のツールを提供するモバイルマーケティングプラットフォーム」です。

NUVIADでは3年以上にわたり、Amazon Redshiftを主なデータウェアハウスソリューションとして使用してきました。

当社は、ユーザーとパートナーが分析し広告キャンペーンの戦略を決定するための、大量の広告取引データを保存しています。リアルタイム入札(RTB)キャンペーンを大規模に実行する場合、ユーザーがキャンペーンの掲載結果の変化に迅速に対応する上で、データの最新性が極めて重要となります。我々は、シンプルさ、スケーラビリティ、パフォーマンス、およびニアリアルタイムで新しいデータを読み込む能力を評価し、Amazon Redshiftを選択しました。

過去3年間で、当社の顧客基盤は大幅に成長し、データも同様に増加しました。Amazon Redshiftクラスターは、当初の3ノードから65ノードにまで伸張しました。コストと分析のパフォーマンスのバランスを取るため、我々は頻繁に分析されない大量のデータを低コストで保存する方法を探しました。一方で、我々は依然として、ユーザークエリーに対してすぐにデータを利用できるようにしておき、高速なパフォーマンスについての彼らの期待に応えたいと考えていました。そして、我々はAmazon Redshift Spectrumに目を向けたのです。

この記事では、Amazon RedshiftをRedshift Spectrumによってモダンなデータウェアハウスとして拡張した理由について説明します。データの成長と、コストとパフォーマンスのバランスを取る要求とが、どのように我々をしてRedshift Spectrumの採用に至らしめたかを説明します。私たちの環境における重要なパフォーマンスメトリクスをご紹介し、また、増え続けるユーザーベースによる即時性の高いクエリーのためにデータを利用可能な状態に置きつつ、スケーラブルで高速な環境を提供する、その他のAWSサービスについても議論します。

ビジネス基盤としてのAmazon Redshift

当社のプラットフォームでは、最新のデータをお客様やパートナーに提供することが常に主要な目標でした。数時間前のデータを提供する他のソリューションがも検討しましたが、これは我々にとって十分ではありませんでした。可能な限り最新のデータを提供することにこだわりたかったのです。Amazon Redshiftによって、頻繁なマイクロバッチでデータをロードし、顧客がAmazon Redshiftに直接クエリーしてニアリアルタイムで結果を得ることが可能となりました。

利点はすぐに明らかになりました。当社のお客様は、キャンペーンが他のソリューションよりいかに速く実行されたかを知ることができ、また、常に変化し続けるメディアの供給価格と利用可能性の課題に早急に対応できるようになりました。彼らはとても幸せでした。

しかし、この方法ではAmazon Redshiftに長期間にわたって多くのデータを保存する必要があり、そして我々のデータは急速に増加していました。ピーク時には、65のDC1.largeノードを実行するクラスターを運用していました。Amazon Redshiftクラスタへの影響は明白であり、CPU使用率も90%にまで増加していました。

Amazon RedshiftをRedshift Spectrumへと拡張した理由

Redshift Spectrumは、データをロードすることなく、Amazon S3に格納されたデータに対して、強力なAmazon Redshiftクエリエンジンを使用してSQLクエリを実行する能力を提供してくれます。Redshift Spectrumでは、必要な場所に、我々が望むコストでデータを保存することができます。そしてデータを、ユーザーが必要とした時に期待通りのパフォーマンスで分析が行える状態にしておくことができるのです。

シームレスなスケーラビリティ、高性能、および無制限の同時実行性

Redshift Spectrumがスケールするプロセスはシンプルです。まず、Amazon S3をストレージエンジンとして利用し、事実上無制限のデータキャパシティを得ることができるようになります。

次に、より多くのコンピューティング能力が必要な場合は、Redshift Spectrumの数千ノードにおよぶ分散コンピューティングエンジンを使ってよりよいパフォーマンスを得ることができます。大量のデータに対して複雑なクエリーを投げるには最適です。

さらに、全てのRedshift Spectrumクラスターを同一のデータカタログにアクセスさせれば、データの移行に頭を悩ませることはなくなります。スケーリングは労力を必要とせず、かつシームレスなものになります。

最後に、Redshift Spectrumは潜在的に数千ものノードにクエリーを分散させるため、他のクエリーによって影響を受けることがなくなり、より安定したパフォーマンスが得られます。また、無制限の同時実行性(訳者註:クラスターを分けることで実現できます)が提供されることになります。

SQLを維持できること

Redshift SpectrumはAmazon Redshiftと同じクエリエンジンを使用します。従って、単一のテーブルで複雑なクエリを使用する場合も、複数のテーブルを結合する場合も、既存のBIツールやクエリー構文を変更する必要はありませんでした。

最近紹介された興味深い機能は、Amazon RedshiftとRedshift Spectrumの外部表の両方にまたがるビューを作成できるというものです。この機能を使用すると、Amazon Redshiftクラスター内の頻繁にアクセスされるデータと、Amazon S3上の頻繁にアクセスされないデータを、1つのビューでクエリーすることができます。

より高いパフォーマンスのためのParquet利用

Parquet は列指向のデータフォーマットです。Parquetは優れたパフォーマンスを提供するとともに、Redshift Spectrum(あるいはAmazon Athena)が極めて少ないデータのみをスキャンできるようにします。I/Oが少なくなれば、クエリーはより高速になり、そしてクエリー当たりのコストも低くなります。

Parquetに関する全ての情報は、https://parquet.apache.org/ または https://en.wikipedia.org/wiki/Apache_Parquet で得ることができます。

より低いコスト

コストの観点では、我々は、Redshift Spectrumを使用してデータを分析する上で、Amazon S3上のデータを標準レートで支払う他は、クエリーごとに発生するわずかな金額しか支払っていません。Qarquetフォーマットを利用すれば、スキャンするデータの量を大幅に削減できます。我々のコストは以前より低くなり、そして我々のユーザーは大規模で複雑なクエリに対しても迅速な結果を得ることができます。

Amazon RedshiftとRedshift Spectrumの性能比較から我々が学んだこと

Redshift Spectrumを最初に見たときに、まずテストを行いたいと思いました。Amazon Redshiftとの比較について知りたかったので、2つの重要な問いに着目しました。

  1. シンプルかつ複雑なクエリーに対して、Amazon RedshiftとRedshift Spectrumのパフォーマンス上の違いはどのようなものであるか?
  2. データフォーマットはパフォーマンスに影響するか?

移行フェーズ中、我々はデータセットをAmazon Redshiftと、S3上のCSV/GZIPおよびParquetファイルフォーマットとして格納しました。テストしたのは次の3つの構成です。

  • 28のDC1.largeノードを持つAmazon Redshiftクラスター
  • CSV/GZIPを使用したRedshift Spectrum
  • Parquetを用いたRedshift Spectrum

1か月分のデータを用いて、シンプルなクエリーと複雑なクエリーのベンチマークを実行しました。クエリを実行するのにかかる時間と、同じクエリを複数回実行したときに結果がどの程度一定になるかをテストしました。テストには、すでに日付と時間でパーティション済みのデータを使用しました。データを適切に分割すると、パフォーマンスが大幅に向上し、クエリー時間が短縮されます。

シンプルなクエリー

最初に、月当たりの請求データを集計するシンプルなクエリーをテストしました。

SELECT 
  user_id, 
  count(*) AS impressions, 
  SUM(billing)::decimal /1000000 AS billing 
FROM <table_name> 
WHERE 
  date >= '2017-08-01' AND 
  date <= '2017-08-31'  
GROUP BY 
  user_id;

同一のクエリーを7回流し、レスポンスタイムを計測しています(は最遅、は最速を示しています)。

実行時間 (秒)
  Amazon Redshift Redshift Spectrum
CSV
Redshift Spectrum Parquet
実行 #1 39.65 45.11 11.92
実行 #2 15.26 43.13 12.05
実行 #3 15.27 46.47 13.38
実行 #4 21.22 51.02 12.74
実行 #5 17.27 43.35 11.76
実行 #6 16.67 44.23 13.67
実行 #7 25.37 40.39 12.75
平均 21.53  44.82 12.61

シンプルなクエリーについては、事前に我々が予想していた通り、データをローカルに保持しているAmazon RedshiftがRedshift Spectrumよりよい数値を示しました。

驚きだったのは、Parquetデータフォーマットを使用したRedshift Spectrumの性能が、“従来型の”Amazon Redshiftのそれを大きく上回ったことです。我々のクエリ-に関して言えば、Redshift SpectrumでParquetデータフォーマットを使用した場合、従来型Amazon Redshiftに対して平均40%の高速化が見られました。また、Redshift Spectrumは実行時間も安定しており、最遅と最速の差はわずかなものでした。

スキャンされたデータをCSV/GZIPとParquetで比較してみると、両者の差異もまた顕著であることがわかります。

スキャンされたデータ (GB)
CSV (Gzip) 135.49
Parquet 2.83

Redshift Spectrumの課金はスキャンされたデータに対してのみ行われるので、Parquetを使用した場合のコスト削減効果は明白です。

複雑なクエリー

次に、複雑なクエリーについて、同じ3つの構成を比較しました。

実行時間 (秒)
  Amazon Redshift Redshift Spectrum CSV Redshift Spectrum Parquet
実行 #1 329.80 84.20 42.40
実行 #2 167.60 65.30 35.10
実行 #3 165.20 62.20 23.90
実行 #4 273.90 74.90 55.90
実行 #5 167.70 69.00 58.40
平均 220.84 71.12 43.14

今度は、Redshift SpectrumでParquetを使用した構成は、従来型のAmazon Redshiftに対し80%もの平均クエリー実行時間削減が見られました!

結論:複雑なクエリーの場合、Redshift SpectrumはAmazon Redshiftに対し67%の性能向上をもたらします。Parquetデータフォーマットを使用することで、Redshift SpectrumのAmazon Redshiftに対する性能改善効果は80%に達します。我々にとって、この結果は大きなものでした。

様々なワークロードのためのデータ構造の最適化

S3のコストは比較的低く、またRedshift Spectrumのクエリーコストはスキャンされたデータに対してのみかかることから、我々はデータを様々な分析エンジンやワークロードごとに別々のフォーマットで保持しておくことが合理的であると思っています。S3上の単一データをポイントする、いくつものテーブルを持つことが出来る点を押さえておくことは重要です。全ては、いかにデータを分割し、テーブルパーティションを更新するか次第です。

データ配列

例えば、我々のシステムには毎分実行され、直近1分の収集データに関する統計情報を生成する処理があります。Amazon Redshiftでは、これは以下のようなクエリーをテーブルに対して実行することで実現できます。

SELECT 
  user, 
  COUNT(*) 
FROM 
  events_table 
WHERE 
  ts BETWEEN ‘2017-08-01 14:00:00’ AND ‘2017-08-01 14:00:59’ 
GROUP BY 
  user;

(”ts”は個々のイベントのタイムスタンプを保持する列を想定しています)

Redshift Spectrumでは、個々のクエリーがスキャンしたデータに対してのみ課金されます。データが時間単位ではなく分単位で分割されている場合、1秒分を見るクエリーのコストは60分の1で済みます。もし直近1秒のデータのみ指定するテンポラリーテーブルを使用するのであれば、不必要なデータの分のコストを削減することができるのです。

Parquetデータの効率的な作成

平均すると、我々はトラフィックを処理するために800ほどのインスタンスを保持しています。それぞれのインスタンスは、最終的にAmazon Redshiftにロードされることになるイベントを送信します。3年前に始めた際は、個々のインスタンスからS3にデータをオフロードし、その後定期的にCOPYコマンドを用いてS3からAmazon Redshiftにロードしていました。

近年、Amazon Kinesis FirehoseによってAmazon Redshiftに直接データを投入することができるようになりました。これは今では有効なオプションの一つですが、我々は3年にわたって完璧かつ効率的に動作してきた既存の収集方法を堅持していました。

しかし、Redshift Spectrumを導入することになり、状況が変わりました。Redshift Spectrumでは、以下のことを行う方法を見つけ出す必要がありました。

  • インスタンスからイベントデータを収集する。
  • データをParquetフォーマットで保存する。
  • データを効果的に分割する。

これを実現するため、我々はデータをCSVで保存し、その後Parquetに変換しています。Parquetファイルを生成するための最も効果的な方法は、以下の通りです。

  1. データを1秒間隔でインスタンスからKinesis Firehoseに送信し、S3の一時バケットに保存する。
  2. AWS LambdaおよびAWS Glueを用いて、データを1時間単位で集計し、Parquetに変換する。
  3. ParquetデータをS3に保存し、テーブルパーティションを更新する。

この新しいプロセスでは、データはKinesis Firehoseに送られる前により慎重に検証される必要がありました。壊れたレコードがパーティション内に1件でもあると、そのパーティションに対するクエリーは失敗するためです。

データ検証

クリックデータをテーブルに保存するために、以下のようなcreate table用のSQLを用意しました。

create external TABLE spectrum.blog_clicks (
    user_id varchar(50),
    campaign_id varchar(50),
    os varchar(50),
    ua varchar(255),
    ts bigint,
    billing float
)
partitioned by (date date, hour smallint)  
stored as parquet
location 's3://nuviad-temp/blog/clicks/';

上記のステートメントによって、いくつかの属性と共に新しい外部テーブルが定義されます(全てのRedshift Spectrumテーブルは外部テーブルです)。ここでは、’ts’はTimestamp型ではなくUNIXタイムスタンプとして保持しています。また、課金データはdeciaml型ではなくfloat型として保持します(詳細は後ほど説明します)。前述の通り、データは日時と時間で分割され、ParquetとしてS3上に保持されます。

まず、テーブル定義を取得する必要があります。これは下記のようなクエリーを実行することで実現できます。

SELECT 
  * 
FROM 
  svv_external_columns 
WHERE 
  tablename = 'blog_clicks';

このクエリーはテーブル内の全カラムをそれぞれの定義と共にリストします。

schemaname tablename columnname external_type columnnum part_key
spectrum blog_clicks user_id varchar(50) 1 0
spectrum blog_clicks campaign_id varchar(50) 2 0
spectrum blog_clicks os varchar(50) 3 0
spectrum blog_clicks ua varchar(255) 4 0
spectrum blog_clicks ts bigint 5 0
spectrum blog_clicks billing double 6 0
spectrum blog_clicks date date 7 1
spectrum blog_clicks hour smallint 8 2

このデータを使用して、データの検証スキーマを作成することができます。

const rtb_request_schema = {
    "name": "clicks",
    "items": {
        "user_id": {
            "type": "string",
            "max_length": 100
        },
        "campaign_id": {
            "type": "string",
            "max_length": 50
        },
        "os": {
            "type": "string",
            "max_length": 50            
        },
        "ua": {
            "type": "string",
            "max_length": 255            
        },
        "ts": {
            "type": "integer",
            "min_value": 0,
            "max_value": 9999999999999
        },
        "billing": {
            "type": "float",
            "min_value": 0,
            "max_value": 9999999999999
        }
    }
};
次に、このスキーマを用いてデータを検証する関数を作成します。
function valueIsValid(value, item_schema) {
    if (schema.type == 'string') {
        return (typeof value == 'string' && value.length <= schema.max_length);
    }
    else if (schema.type == 'integer') {
        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);
    }
    else if (schema.type == 'float' || schema.type == 'double') {
        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);
    }
    else if (schema.type == 'boolean') {
        return typeof value == 'boolean';
    }
    else if (schema.type == 'timestamp') {
        return (new Date(value)).getTime() > 0;
    }
    else {
        return true;
    }
}

Kinesis Firehoseを使用したニアリアルタイムのデータローディング

Kinesis Firehose上で、イベントを処理するための新しいデリバリーストリームを以下の通り作成しました。

Delivery stream name: events
Source: Direct PUT
S3 bucket: nuviad-events
S3 prefix: rtb/
IAM role: firehose_delivery_role_1
Data transformation: Disabled
Source record backup: Disabled
S3 buffer size (MB): 100
S3 buffer interval (sec): 60
S3 Compression: GZIP
S3 Encryption: No Encryption
Status: ACTIVE
Error logging: Enabled

このデリバリーストリームはイベントデータを毎分または100MBごとに集計し、データをS3バケットにCSV/GZIP圧縮ファイルとして書き込みます。データが検証された後は、Kinesis Firehose APIに安全に送信することができます。


if (validated) {
    let itemString = item.join('|')+'\n'; //Sending csv delimited by pipe and adding new line
    let params = {
        DeliveryStreamName: 'events',
        Record: {
            Data: itemString
        }
    };

    firehose.putRecord(params, function(err, data) {
        if (err) {
            console.error(err, err.stack);        
        }
        else {
            // Continue to your next step 
        }
    });
}

これで、イベントデータを1分ごとに単一のCSVファイルとしてS3に保管することができるようになります。ファイルはKinesis Firehoseによって、UTC時間プレフィックスをYYYY/MM/DD形式で付加する形で自動的に命名され、その後オブジェクトがS3に書き込まれます。日付と時間をパーティションとして使用しているので、命名規則と場所はRedshift Spectrumスキーマに適合するよう変更する必要があります。

AWS Lambdaを使用したデータ分散の自動化

ファイルを別のロケーションにコピーするためのS3 PUTイベントをトリガーとする、シンプルなLambda関数を作成しました。ただしS3イベントは、我々のデータ構造と処理フローに適合するようリネームしています。先に触れた通り、Kinesis Firehoseによって生成されたファイルは事前に定義された構造で階層化されています。例えば以下のようなものです。

S3://your-bucket/your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz

後は、オブジェクト名をパースして最適な形に再構成するだけです。我々のケースでは、以下のようにしています(このオブジェクトはLambda関数に渡されたもので、S3に書き込まれた当該オブジェクトに関するデータを保持しています)。

/*

	object key structure in the event object:

your-prefix/2017/08/01/20/event-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz

	*/

let key_parts = event.Records[0].s3.object.key.split('/'); 

let event_type = key_parts[0];
let date = key_parts[1] + '-' + key_parts[2] + '-' + key_parts[3];
let hour = key_parts[4];
if (hour.indexOf('0') == 0) {
 		hour = parseInt(hour, 10) + '';
}

let parts1 = key_parts[5].split('-');
let minute = parts1[7];
if (minute.indexOf('0') == 0) {
        minute = parseInt(minute, 10) + '';
}

これで、必要に応じてファイルを2つの宛先に再配布することができるようになります。一つは分単位の処理タスク、もう一つは時間単位での集計です。

    copyObjectToHourlyFolder(event, date, hour, minute)
        .then(copyObjectToMinuteFolder.bind(null, event, date, hour, minute))
        .then(addPartitionToSpectrum.bind(null, event, date, hour, minute))
        .then(deleteOldMinuteObjects.bind(null, event))
        .then(deleteStreamObject.bind(null, event))        
        .then(result => {
            callback(null, { message: 'done' });            
        })
        .catch(err => {
            console.error(err);
            callback(null, { message: err });            
        }); 

Kinesis Firehoseはデータを一時フォルダーに保管します。このオブジェクトを、直近1分に処理されたデータを保管する別のフォルダーにコピーします。このフォルダーは、ずっと大きなデータセットをスキャンせずに処理できるよう、小さなRedshift Spectrumテーブルに接続されています。同じデータを、1時間分のデータを保管するフォルダーにコピーし、後で集計してParquetに変換できるようにします。

データを日付と時間に分割しているので、処理対象の分が1時間の最初の1分(つまり0分)だった場合は、以下のクエリーを実行してRedshift Spectrumテーブル上に新しいパーティションを作成しています。

ALTER TABLE 
  spectrum.events 
ADD partition
  (date='2017-08-01', hour=0) 
  LOCATION 's3://nuviad-temp/events/2017-08-01/0/';

データが処理されテーブルに追加されたら、Kinesis Firehoseの一時領域および分単位のフォルダーから処理済みデータを削除します。

AWS GlueおよびAmazon EMRを使用したCSVのParquet変換

CSVデータをParquetに変換する時間単位ジョブを実行する上で、我々の知る限り最もシンプルなやり方は、LambdaとAWS Glueを利用する方法です(AWSビッグデータチームの素晴らしい協力に感謝します)。

AWS Glueジョブの作成

このシンプルなAWS Glueジョブが行っていることは以下の通りです。

  • 処理対象のジョブ、日付、時間のパラメーターを取得
  • Sparkコードを実行するためのSpark EMRコンテキストを作成
  • CSVデータをDataFrameに読み込み
  • データをS3バケット上にParquetとして書き込み
  • Redshift Spectrum / Amazon Athenaテーブルパーティションを追加または修正
import sys
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value' ])

#day_partition_key = "partition_0"
#hour_partition_key = "partition_1"
#day_partition_value = "2017-08-01"
#hour_partition_value = "0"

day_partition_key = args['day_partition_key']
hour_partition_key = args['hour_partition_key']
day_partition_value = args['day_partition_value']
hour_partition_value = args['hour_partition_value']

print("Running for " + day_partition_value + "/" + hour_partition_value)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.read.option("delimiter","|").csv("s3://nuviad-temp/events/"+day_partition_value+"/"+hour_partition_value)
df.registerTempTable("data")

df1 = spark.sql("select _c0 as user_id, _c1 as campaign_id, _c2 as os, _c3 as ua, cast(_c4 as bigint) as ts, cast(_c5 as double) as billing from data")

df1.repartition(1).write.mode("overwrite").parquet("s3://nuviad-temp/parquet/"+day_partition_value+"/hour="+hour_partition_value)

client = boto3.client('athena', region_name='us-east-1')

response = client.start_query_execution(
    QueryString='alter table parquet_events add if not exists partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ')  location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },
    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)

response = client.start_query_execution(
    QueryString='alter table parquet_events partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') set location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },
    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)

job.commit()

備考:Redshift SpectrumとAthenaは共にAWS Glueデータカタログを使用するため、Athenaクライアントを使用してパーティションをテーブルに追加することができました。

float、decimalおよびdoubleについていくつか注意点があります。decimalを使用することは、Redshift SpectrumとSparkで取り扱いが異なることから、予想より多くの困難を伴いました。Redshift SpectrumとSparkでdecimalを使用する度に、以下のようなエラーが発生したのです。

S3 Query Exception (Fetch). Task failed due to an internal error. File 'https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parquet has an incompatible Parquet schema for column 's3://nuviad-events/events.lat'. Column type: DECIMAL(18, 8), Parquet schema:\noptional float lat [i:4 d:1 r:0]\n (https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parq

いくつかの浮動小数点フォーマットを試した結果、Spark上ではカラムをdoubleで定義し、Spectrumではfloatで定義するのが唯一有効な組み合わせであることがわかりました。課金データをSpectrumではfloat、Sparkコードではdoubleで定義しなければならない理由はここにあります。

変換を実行するためのLambda関数の作成

次に、次のようなPythonコードを用いて、AWS Glueスクリプトを1時間ごとに起動するためのシンプルなLambda関数を作成しました。

import boto3
import json
from datetime import datetime, timedelta
 
client = boto3.client('glue')
 
def lambda_handler(event, context):
    last_hour_date_time = datetime.now() - timedelta(hours = 1)
    day_partition_value = last_hour_date_time.strftime("%Y-%m-%d") 
    hour_partition_value = last_hour_date_time.strftime("%-H") 
    response = client.start_job_run(
    JobName='convertEventsParquetHourly',
    Arguments={
         '--day_partition_key': 'date',
         '--hour_partition_key': 'hour',
         '--day_partition_value': day_partition_value,
         '--hour_partition_value': hour_partition_value
         }
    )

Amazon CloudWatch Eventsを使用してこの関数を1時間ごとに実行しました。この関数は、‘convertEventsParquetHourly’という名前のAWS Glueジョブを起動して直近の1時間を処理し、ジョブ名とパーティションの値をAWS Glueに渡します。

Redshift SpectrumとNode.js

我々の開発スタックはNode.jsを基本としています。Node.jsは大量のトランザクションを処理する必要のある、高速・軽量なサーバーでの利用に適しています。しかし、Node.js環境にはいくつかの制約があるため、ワークアラウンドを用意し、別のツールを使うなどして、処理を完結させる必要がありました。

Node.jsとParquet

Node.jsにはParquetモジュールが欠けていたため、データをCSVからParquetへ効率的に移行するためには、AWS Glue / Amazon EMRによる処理を実装する必要がありました。本来は直接Parquetに保存したいところですが、他に妥当な方法が見つかりませんでした。

現在進められている興味深いプロジェクトの一つに、Marc VertesによるParquet NPMの開発があります(https://www.npmjs.com/package/node-parquet)。まだ正式リリースには至っていませんが、このパッケージの進捗は注視するに値すると考えています。

Timestampデータ型

Parquetドキュメンテーションによると、Timestampデータは64-bit integerとしてParquetに保存されます。しかしながら、JavaScriptのネイティブの数値型は整数レンジに53ビットのみを割り当てた64-bit doubleであり、64-bit integerはサポートしていません。

このため、Node.jsではTimestampを正しくParquetに保存することができません。解決策は、Timestampを文字列として保存し、クエリー内でTimestampとしてキャストすることです。この方法による性能劣化は一切観察されませんでした。

 

知見

我々のトライアンドエラーの経験から、以下のような知見を得ることができるかと思います。

知見 #1: データ検証は極めて重要である

前述の通り、パーティション内に壊れたエントリーが一つでもあると、そのパーティションに対するクエリーは失敗します。特に、Parquetを使用している場合は、シンプルなCSVファイルに比べて修正が困難です。Redshift Spectrumでスキャンする前に、データを検証するようにして下さい。

知見 #2: データを効率的に構造化および分割する

Redshift Spectrum(あるいはAthena)の最大の利点の一つは、ノードを常時起動しておく必要がないことです。実行したクエリーによってスキャンされたデータに対してのみ、課金が行われます。

異なるクエリーのために、データを異なる並びで保持しておくことは、この点では非常に有益です。例えば、時間ベースでクエリーを実行するために日付と時間でデータを分割し、user_idベースでクエリーを実行するためにuser_idと日付で分割した別のパーティションを持つことができます。これにより、あなたのデータウェアハウスはより高速かつ効率的になるでしょう。

正しいフォーマットでデータを保管する

可能な場合はParquetを利用して下さい。高速なパフォーマンス、より少ないスキャンデータ、列指向フォーマットによる効率性など、Parquetの効果は絶大です。ただし、Kinesis Firehoseで直接に利用することはできないため、独自のETLの仕組みを実装する必要があります。AWS Glueは素晴らしいオプションです。

頻繁なタスクには小さなテーブルを作成する

Redshift Spectrumを利用し始めた頃、我々はAmazon Redshiftのコストが一日当たり何百ドルも跳ね上がっていることに気づきました。丸1日分のデータを毎分、不必要にスキャンしていることに気づいたのはその後のことです。同じS3バケットないしフォルダーに対して複数のテーブルを定義できる利点を活かし、頻度の高いクエリーには一時的で小さなテーブルを用意するようにして下さい。

知見 #3: 最適なパフォーマンスのためにAthenaとRedshift Spectrumを組み合わせる

Redshift Spectrumへの移行は、同様にAWS Glueのデータカタログを使用しているAthenaの利点を活かすことにも繋がりました。シンプルでクイックなクエリーをAthenaで捌きつつ、より複雑なクエリーにはRedshift Spectrumを使ってAmazon Redshiftのクエリーエンジンを使う、といったことができます。

Redshift Spectrumは複雑なクエリーを実行することに秀でています。プレディケイトフィルタリングや集計といった計算能力依存のタスクの多くをRedshit Spectrum層にプッシュダウンすることができ、クエリーが消費するクラスターの処理能力を大幅に抑制できます。

知見 #4: パーティション内でParquetデータをソートする

sortWithinPartiions(sort_field)を用いてパーティション内でデータをソートすることで、さらなる性能改善を達成することができます。例えば以下のようにします。

df.repartition(1).sortWithinPartitions("campaign_id")…

結論

我々は、コアデータウェアハウスとして3年以上利用してきたAmazon Redshiftに非常に満足していました。しかし、我々のクライアントベースとデータ量が飛躍的に増加したことから、Amazon Redshiftを拡張し、Redshift Spectrumのスケーラビリティ、性能、コストの利点を活用することを決断しました。

Redshift Spectrumは事実上無制限のストレージ領域を利用すること、コンピュート能力を透過的に拡張すること、および我々のユーザーに極めて高速に結果を提供することを可能にしてくれます。Redshift Spectrumによって、我々はデータを望む場所に望むコストで保管し、そしてユーザーが望む時に彼らが期待するだけの性能で分析を行えるよう、データを利用可能な状態にしておくことができるのです。

著者について

テクノロジーリーディングカンパニーでの15年とアドテク業界での7年の経験を経て、Rafi ToniはNUVIADを設立しそのCEOに就任しました。新しいテクノロジーを探索し、現実世界で現実のマネーを生み出すカッティングエッジな製品やサービスに反映することを楽しんでいます。起業家としての経験から、Rafiは新規テクノロジーへの早期適応と実践的なプログラミングこそが大きな市場価値を生み出すと信じています。

 

 

(翻訳はプロフェッショナルサービス仲谷が担当しました。原文はこちら

Amazon Redshift Spectrumによるセキュリティとコンプライアンスのためのデータベース監査ログの分析

(補足:本記事は2017年6月にAWS Bigdata Blogにポストされた記事の翻訳です。一部の記載を現時点の状況に合わせて更新してあります)

クラウドサービスの採用が増加するにつれて、組織は重要なワークロードをAWSに移行しています。これらのワークロードの中には、セキュリティとコンプライアンスの要件を満たすために監査が必要な機密データを格納、処理、分析するものがあります。監査人が良くする質問は、誰がどの機密データをいつ照会したのか、いつユーザが最後に自分の資格情報を変更/更新したのか、誰が、いつシステムにログインしたかということです。

デフォルトでは、Amazon Redshiftは、ユーザーの接続情報、変更情報、アクティビティに関連するすべての情報をデータベースに記録します。ただし、ディスク領域を効率的に管理するために、ログの使用状況と使用可能なディスク容量に応じて、ログは2〜5日間のみ保持されます。より長い時間ログデータを保持するには、データベース監査ロギングを有効にします。有効にすると、Amazon Redshiftは指定したS3バケットに自動的にデータを転送します。

Amazon Redshift Spectrumにより、Amazon S3に格納されたデータにクエリすることを可能にし、さらにAmazon Reshift のテーブルと結合することも可能です。 Redshift Spectrumを使い、S3に格納されている監査データを確認し、すべてのセキュリティおよびコンプライアンス関連の質問に答えることができます。AVRO、Parquet、テキストファイル(csv、pipe delimited、tsv)、シーケンスファイル、およびRCファイル形式、ORC、Grokなどのファイルをサポートしています。 gzip、snappy、bz2などのさまざまな圧縮タイプもサポートしています。

このブログでは、S3に保存されたAmazon Redshift の監査データを照会し、セキュリティーやコンプライアンスの質問への回答を提供する方法を説明します。

作業手順

次のリソースを設定します。

  • Amazon Redshift クラスタとパラメータグループ
  • Amazon Redshift に Redshift Spectrumアクセスを提供するIAMロールとポリシー
  • Redshift Spectrum外部表

前提条件

  • AWS アカウントを作成する
  • AWS CLI にて作業ができるように設定する
  • Amazon Redshift にアクセスできる環境を用意する。(psqlやその他クライアント)
  • S3バケットを作成する

クラスタ要件

Amazon Redshift クラスタは、次の条件を満たす必要があります。

  • 監査ログファイルを格納しているS3バケットと同じリージョンにあること
  • バージョン1.0.1294以降であること
  • ログ蓄積用のS3バケットに読み込み、PUT権限を設定されていること
  • AmazonS3ReadOnlyAccessとAmazonAthenaFullAccessの少なくとも2つのポリシーを追加したIAMロールにアタッチしていること

Amazon Redshift のセットアップ

ユーザーのアクティビティーをロギングするために、新しいパラメータグループを作ります。


aws redshift create-cluster-parameter-group --parameter-group-name rs10-enable-log --parameter-group-family Redshift-1.0 --description "Enable Audit Logging" 
aws redshift modify-cluster-parameter-group --parameter-group-name rs10-enable-log --parameters '{"ParameterName":"enable_user_activity_logging","ParameterValue":"true"}'

Amazon Redshift クラスタを上記で作成したパラメータグループを使い作成します。

aws redshift create-cluster --node-type dc1.large --cluster-type single-node --cluster-parameter-group-name rs10-enable-log --master-username <Username> --master-user-password <Password> --cluster-identifier <ClusterName>

クラスターが出来るまで待ち、作成されたらロギングを有効にします。

aws redshift enable-logging --cluster-identifier <ClusterName> --bucket-name <bucketname>

※S3のバケットポリシーなどはこちらを御覧ください。
※もしくは下記のようにマネージメントコンソールからログ用のS3のバケットを新規で作成するとバケットポリーが設定された状態のバケットが作成できます。

 

Redshift Spectrumをセットアップします。

Redshift Spectrumをセットアップするために、IAM ロールとポリシー、External Database,External Tablesを作成します。

IAM ロールとポリシー

Redshift データベースからS3バケットにアクセスするためのIAMロールを作成します。
RedshiftAssumeRole.json ファイルを作成し、下記のコマンドを実行してください。

aws iam create-role --role-name RedshiftSpectrumRole --assume-role-policy-document file://RedshiftAssumeRole.json

RedshiftAssumeRole.json
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Principal": {
			"Service": "redshift.amazonaws.com"
		},
		"Action": "sts:AssumeRole"
		}
	]
}

AmazonS3ReadOnlyAccess および AmazonAthenaFullAccess の2つのポリシーをアタッチします。

aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonAthenaFullAccess --role-name RedshiftSpectrumRole
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess --role-name RedshiftSpectrumRole

作成したロールをAmazon Redshift クラスタに追加します。 <ClusterName> と <accountNumber> を自身のものに置き換えます。

aws redshift modify-cluster-iam-roles --cluster-identifier <ClusterName> --add-iam-roles arn:aws:iam::<accountNumber>:role/RedshiftSpectrumRole

ここまでの操作で、Amazon Redshift クラスタから S3 にアクセスできるように、Redshift Spectrum は設定されました。

External DatabaseとSchema

Amazon Redshift データベースにログインして、外部データベースとスキーマ、外部表を作成し、S3 に格納されたデータにアクセスできるよう設定します。

例えばpsql でアクセスする場合は下記がコマンド例になります。
本手順で作成している場合は、dev という名前のデータベースが作成されています。

psql -h <ご自身の設定したものを確認して変更ください>.ap-northeast-1.redshift.amazonaws.com -p 5439 -U dbadmin -d dev -W

Amazon Redshift で作成された外部データベースは、Amazon Athena データカタログに格納することが出来、Athena からも直接アクセスできます。

※本Blog (英語版)が書かれたあとにAWS Glue がリリースしておりますので、こちらも参考にしてください。

監査ログデータを照会するには、Amazon Redshift で外部データベースとスキーマを作成します。 DDL を実行する前に、以下のアカウント番号、ロール名、リージョン名を更新してください。

CREATE external SCHEMA auditLogSchema
FROM data catalog
DATABASE 'auditLogdb'
iam_role 'arn:aws:iam::<AccountNumber>:role/<RoleName>'
REGION '<regionName>'
CREATE external DATABASE if not exists;

REGION パラメータは、Athena データカタログのリージョンを指定します。 デフォルトの値は、Amazon Redshift クラスタと同じリージョンになります。(東京リージョンは、ap-northeast-1 になります。)

外部表の作成

Redshift Spectrum は S3 上のデータに対してクエリ可能になる外部表が作成可能です。 外部表は読取り専用であり、 現在、Spectrum を使用して S3 上のデータを変更することはできません。外部表は、表名の前にスキーマ名を付けた形で指定します。

3つの異なる監査ログ・ファイルを照会するための下記3つの表を作成します。

・ユーザーDDL:ユーザーが実施した DDL(CREATEやDROPなど)を記録します。
・ユーザー接続:成功または失敗したすべてのログオン情報をログに記録します。
・ユーザーアクティビティ:ユーザーが実行したすべてのクエリを記録します。
ユーザーDDL とユーザー接続のデータファイル形式は、パイプ区切りのテキストファイルです。 どちらのファイルもgzipユーティリティを使用して圧縮されています。 ユーザーアクティビティログはフリーフローテキストです。 各クエリは新しい行で区切られます。 このログも、gzip ユーティリティを使用して圧縮されています。

次のクエリのS3 バケットの場所を自身のバケットになおして実行してください。

CREATE external TABLE auditlogschema.userlog
(
userid INTEGER,
username CHAR(50),
oldusername CHAR(50),
action CHAR(10),
usecreatedb INTEGER,
usesuper INTEGER,
usecatupd INTEGER,
valuntil TIMESTAMP,
pid INTEGER,
xid BIGINT,
recordtime VARCHAR(200)
)
row format delimited
fields terminated by '|'
stored as textfile
location 's3://<bucketName>/AWSLogs/<accountNumber>/redshift/<regionName>/YYYY/MM/DD/';

CREATE external TABLE auditlogschema.connectionlog
(
event CHAR(50) ,
recordtime VARCHAR(200) ,
remotehost CHAR(32) ,
remoteport CHAR(32) ,
pid INTEGER ,
dbname CHAR(50) ,
username CHAR(50) ,
authmethod CHAR(32) ,
duration BIGINT ,
sslversion CHAR(50) ,
sslcipher CHAR(128) ,
mtu INTEGER ,
sslcompression CHAR(64) ,
sslexpansion CHAR(64) ,
iamauthguid CHAR(36)
)
row format delimited
fields terminated by '|'
stored as textfile
location 's3://<bucketName>/AWSLogs/<accountNumber>/redshift/<regionName>/YYYY/MM/DD/';

CREATE external TABLE auditlogschema.activitylog
(
logtext VARCHAR(20000)
)
row format delimited
lines terminated by '\n'
stored as textfile
location 's3://<bucketName>/AWSLogs/<accountNumber>/redshift/<regionName>/YYYY/MM/DD/';

guest ユーザーを作成して簡単な作業をしログを作成する。

ユーザー “guest”  を作成してログインし、 “person” という表を作成します。次に、テーブルに対してクエリーを実行します。

管理ユーザーとしてログインし、新しいユーザー “guest” を作成します。

CREATE USER guest PASSWORD 'Temp1234';

ユーザー  “guest” としてログインし、以下の DDL を実行します。

CREATE TABLE person (id int, name varchar(50),address VARCHAR(500));

INSERT INTO person VALUES(1,'Sam','7 Avonworth sq, Columbia, MD');
INSERT INTO person VALUES(2,'John','10125 Main St, Edison, NJ');
INSERT INTO person VALUES(3,'Jake','33w 7th st, NY, NY');
INSERT INTO person VALUES(4,'Josh','21025 Stanford Sq, Stanford, CT');
INSERT INTO person VALUES(5,'James','909 Trafalgar sq, Elkton, MD');

guest  でログインしている間に、person テーブルでいくつかのクエリを実行して、アクティビティログを生成します。

SELECT * 
  FROM person;

SELECT * 
  FROM person 
 WHERE name='John';

次に、上記で作成した3つの外部表(それぞれのログ)毎に、1つのクエリーの具体例を説明します。

ユーザーDDL ログ

次のクエリは、ユーザー guest がその日に実行した作業が確認できます。

SELECT username
,action
,recordtime 
  FROM auditlogschema.userlog 
 WHERE action in ('create','alter','drop','rename') 
   AND username='guest';

ユーザー接続ログ

次のクエリでは、ユーザー guest がログインした remotehost名、時刻を確認できます。

SELECT event
,recordtime
,remotehost 
  FROM auditlogschema.connectionlog 
 WHERE length(username) >0 
   AND username='guest' 
ORDER BY recordtime;

ユーザーアクティビティログ

次のクエリは、誰がいつ、person表 にアクセスしたか、またその際に流したクエリを確認できます。

SELECT pu.usename
	,substring(logtext,2,strpos(logtext,'UTC')+1)UTCTime
	,substring(logtext,strpos(logtext,'LOG:')+5) query
  FROM auditlogschema.activitylog al,pg_user pu
 WHERE logtext like '%xid=%'
   AND logtext not like '%SELECT 1%'
   AND logtext not like '%SET %'
   AND logtext like '%from person%'
   AND substring(substring(logtext,strpos(logtext,'userid=')+7),1,strpos(substring(logtext,strpos(logtext,'userid=')+7),' '))=pu.usesysid;

 

まとめ

このBlogでは、Amazon Redshift に新たに追加された機能 (Redshift Spectrum) を使用して、S3に格納されている監査ログデータを照会し、セキュリティおよびコンプライアンス関連の質問に簡単に回答する方法について説明しました。

Amazon Redshift Spectrum は2017年10月20日現在、東京リージョンでも利用可能となっております。

Amazon Redshift Spectrum の詳細については、こちらをご覧ください。 新機能についての詳細は、「Get Started」をご覧ください。

翻訳:パートナーソリューションアーキテクト 相澤

Amazon Redshift Spectrumが東京リージョンで利用可能になりました & Spectrum 一般公開後のアップデート

Amazon Redshift は高速で完全マネージド型のデータウェアハウスです。ペタバイト級のデータを高速なローカルストレージに取り込み、多様なクエリを処理可能なデータウェアハウスを実現可能です。

今年の4月に新機能としてAmazon Redshift Spectrumが発表されました。これはデータをAmazon S3に置いたままロードせずにAmazon Redshiftからクエリする事を可能にする新機能であり、Amazon Redshiftが処理可能なデータサイズをペタバイトから、エクサバイト級に押し上げるものです。データ置き場(Amazon S3)とデータ処理基盤(Amazon Redshift)が分離するということは、単に扱えるデータサイズが増えるだけでなく、これまで以上に多彩なワークロードを実現可能にしました。例えば、ロード時間なしで素早くデータ分析を開始したり、あまりアクセスしない古いデータと頻繁にアクセスするデータの置き場所を変えることで、コスト効率の良いデータウェアハウスを実現しつつ、全期間のデータ分析を実現する等です。

Amazon Redshift Spectrumについての詳細を確認するには、以下の記事を参照してください。

Amazon Redshift Spectrumは北バージニアリージョンから提供を開始し、継続的に利用可能なリージョンを増やしてきました。そして本日からAmazon Redshift Spectrumが東京リージョンで利用可能になりました!

AWSのサービスはリリースした後も新機能が継続的に追加されていきます。Amazon Redshift Spectrumもその例外ではなく、上述のブログには書かれていなかった機能が多数追加されています。本稿ではGA(一般利用開始)から現在までの期間でどのような機能追加、改善があったのかを解説します。

継続的な処理性能の改善

Amazon Redshiftでは内部的な改善による処理性能の向上が継続的に行われています。Amazon Redshift Spectrumでの改善の1つとして、大きいファイルの分割アクセスがあります。GAの時点では1つのファイルを1つのSpectrum層のプロセスが処理していたため、ファイルサイズが巨大だった場合に読み取りがボトルネックになる可能性がありましたが、その後の改善で巨大なファイルは自動的に分割して読み取り処理を行なうように改善されています。(巨大ファイルをそのまま置く事を推奨しているわけではありません。可能であれば利用者の方で適切なサイズに分割しておく事が推奨されます)

Amazon Redshift Spectrumのパフォーマンスについては以下の記事も参照してください。

対応フォーマットの追加

Amazon Redshift Spectrumでは多彩なフォーマットに対応しているのが特長です。CSV、TSVといった区切りファイル、Parquet、RCFileといったカラムナフォーマット等です。そしてGA後も継続的に対応フォーマットが追加されています。例えばカラムナフォーマットのORCファイルや、Regex(正規表現)等がGA後に追加されました。現時点では以下のファイルフォーマットをサポートしています。

  • AVRO
  • PARQUET
  • TEXTFILE
  • SEQUENCEFILE
  • RCFILE
  • RegexSerDe
  • ORC
  • Grok
  • OpenCSV

また読み取りの際の機能追加も行われています。例えばskip.header.line.countプロパティが追加されています。これはCSVファイル等のテキストファイルの先頭数行を読み飛ばすという機能で、列の名前等が先頭行に入っているファイルの読み取りに便利です。

参照)CREATE EXTERNAL TABLE

外部表を含んだVIEWのサポート

GA時には、外部表(Amazon S3上にデータがある表)に対してVIEWを作成する事ができないという制約がありましたが、現在はVIEWの定義に外部表を含むことが可能になっています。これはCREATE VIEWにWITH NO SCHEMA BINDINGオプションが使えるよう機能追加されたことで実現可能になりました。外部表をVIEW定義に含めることが出来ると、データウェアハウスとしての利用がさらに便利になります。例えば最近のデータはローカルストレージ上の表にあり、古いデータがAmazon S3上にあるようなケースでも、全体をUNION ALLで結合したVIEWを作成することで、ユーザはどちらのデータがどちらの領域にあるのか気にすることなく分析を実行することが可能になります。

既存JDBC/ODBCアプリケーションとの互換性向上

JDBCドライバやODBCドライバも改善が続けられています。Amazon Redshift Spectrumリリース当初はドライバのメタデータ取得機能(表の一覧や表定義を取得するための機能)が、外部表のデータを返すことが出来なかったため、SQLワークベンチのようなGUIアプリケーションにおいて、外部表へのSQLは実行できるが、GUI上の表一覧の画面には外部表が表示されないという課題がありました。最新のドライバでは外部表もローカルの表と同じようにメタデータが取得できるように改善されており、既存のJDBC/ODBCアプリケーションでの互換性が向上しています。

AWS Glueデータカタログとの連係

Amazon Redshift Spectrumは外部表のメタデータ管理(どこにファイルが置かれていて、スキーマ定義はどうなっているかといった情報の管理)のために、Amazon Athena、もしくはお客様管理のHiveメタストアを利用することが可能です。AWS Glueがリリースされた際に拡張され、AWS GlueのデータカタログをAmazon Redshift Spectrumのメタデータ管理に利用可能になりました。AWS Glueのデータカタログはサーバレスであるために運用負荷を低く抑えることが可能なだけでなく、クローラーによるスキーマの自動更新が実現可能です。新しいファイルがAmazon S3に置かれると、それをクローラーが発見し、データカタログを更新するため、ユーザが手動で登録すること無しにAmazon Redshift Spectrumからクエリ可能にする事が実現可能になりました。

参照)Amazon Redshift Spectrum Now Integrates with AWS Glue

システム表等、周辺情報を取得する方法の改善

ローカルストレージに存在する表と、外部表を区別することなく、表や列の情報を取り出すためのSVV_TABLESSVV_COLUMNSが追加されました。また外部表のスキーマを分かりやすく返すSVV_EXTERNAL_SCHEMASも追加されています。

また疑似列として$pathと $sizeが利用可能になりました。この列をクエリで指定することで、クエリ対象の外部表のサイズやS3でのURLを得る事が可能です。また、spectrum_enable_pseudo_columns パラメータをfalseに設定することでこの疑似列使えないよう制限することも可能です。

参照)CREATE EXTERNAL TABLE ※本稿執筆時点では日本語マニュアルの既述には$path、$sizeが無いため、英語版に切り替えてご覧ください

ご利用いただくには

東京リージョンで、新しく Redshift クラスタを立ち上げていただいた場合には、そのまま Spectrum の機能をお使いいただくことができます。Spectrum の始め方については、公式ドキュメントをご覧ください。

東京リージョンですでにクラスターをご利用中のお客さまについては、メンテナンスウィンドウ中に順次メンテナンスイベントが実施されていきますので、その後にご利用いただけるようになります。もし既存のクラスター上で、すぐに Spectrum をご利用になりたい場合には、WLM などの設定を変更した上でクラスターを再起動していただくことにより、Spectrum の機能がご利用いただけるようになります。

引き続きご期待ください

本日の発表により、バージニア北部、オレゴン、オハイオ、東京、アイルランドリージョンでAmazon Redshift Spectrumが利用可能になりました。今後もAmazon Redshiftには継続的に新機能、改善が行われていく予定です。最新情報はAWSの最新情報ページで告知されますのでぜひご覧ください(RSSでのアクセスも可能です)。

AWS 下佐粉 昭 (simosako@)

データウェアハウスをエクサバイト級に拡張するAmazon Redshift Spectrum

(補足:本記事は2017年7月にAWS Bigdata Blogにポストされた記事の翻訳です。一部の記載を現時点の状況に合わせて更新してあります)

何年も前、最初にクラウドベースのデータウェアハウスを構築する可能性について検討を始めた際、我々は、我々の顧客が増え続ける一方の大量のデータを持つ一方で、そのごく一部のデータのみが既存のデータウェアハウスやHadoopシステムに投入され分析に利用されているという事実に直面しました。同時に、これがクラウド特有の特殊事情ではないこともわかりました。エンタープライズストレージ市場の成長率がデータウェアハウス市場のそれを大きく上回る様々な業界においても、状況は同じだったのです。

我々はこれを“ダークデータ”問題と名付けました。我々の顧客は、彼らが収集したデータに利用されていない価値があることに気づいていました。そうでなければなぜそれを保管するコストをかけるでしょうか?しかしながら、彼らが利用できるシステムは、これらのデータ全てを処理するには遅すぎ、複雑すぎ、高すぎたため、データのサブセットのみを利用することになりました。彼らはいつか誰かが解決策を見出すことへの楽観的な期待とともに、これらのデータを保持し続けました。

Amazon Redshift はダークデータ問題の解決に寄与することから、AWSサービスの中でも最も成長の速いサービスの一つとなりました。このソリューションは大半の代替案に比べ、少なくとも一桁は安価で、かつ高速でした。また、Amazon Redshiftは当初からフルマネージドのサービスで、ユーザーはキャパシティやプロビジョニング、パッチ対応、監視、バックアップ等を始めとする様々なDBA課題について頭を悩ませる必要がありませんでした。 VevoYelpRedfin,Edmunds, NTTドコモなどの多くの顧客が、Amazon Redshiftに移行して、クエリー性能の改善、DBAオーバーヘッドの削減、そして分析コストの低減を実現しました。

我々の顧客のデータは、極めて速いペースで増え続けています。おしなべて、ギガバイトのデータはペタバイトとなり、平均的なAmazon Redshift顧客が分析するデータ量は毎年二倍になっています。我々が、増加するデータを扱う上でお客様の手助けとなる機能群を実装してきた理由はここにあります。例えばクエリースループットを二倍にする、圧縮率を三倍から四倍に改善する、といったことです。これらは、お客様がデータを破棄したり分析システムから削除したりすることを考慮せざるを得なくなる時期を遅らせることができます。しかしながら、ペタバイトのデータを日々生成するAWSユーザーが増えており、こうしたデータはわずか3年でエクサバイトの水準に達します。このようなお客様のためのソリューションは存在しませんでした。もしデータが毎年倍々になるのであれば、コスト・性能・管理のシンプルさに革新をもたらす、新たな、破壊的なアプローチを見付けることを強いられるまで、そう長い時間はかからないでしょう。

今日利用可能な選択肢に目を向けてみましょう。お客様は、Amazon EMRを用いて、Apache HiveなどのHadoopベースの技術を利用することができます。これは実際のところ、非常に素晴らしいソリューションです。抽出と変換のステップを経ることなく、Amazon S3上のデータを簡単かつ低コストで直接操作できるようになるからです。クラスターは必要な時に起動することができ、実行対象となる特定のジョブに合うよう適切にサイジングすることができます。こうしたシステムは、スキャンやフィルター、集計といったスケールアウト型の処理には最適です。一方で、これらのシステムは複雑なクエリー処理には向いていません。例えば、結合処理ではノード間でデータをシャッフルする必要が生じます。巨大なデータと多数のノードが存在する場合、この処理は極めて低速になります。そし結合処理は、重要な分析課題の大半において本質的に重要なものです。

Amazon Redshiftのような、列指向かつ超並列型のデータウェアハウスを利用することもできます。こうしたシステムは、巨大なデータセットに対する結合や集計といった複雑な分析クエリーを、単純かつ高速に実行することを可能にします。特に、Amazon Redshiftは、高速なローカルディスクと洗練されたクエリー実行、そして結合処理に最適化されたデータフォーマットを活用します。標準SQLを用いるので、既存のETLツールやBIツールを活用することもできます。一方で、ストレージとCPU双方の要件を満たすようにクラスターをプロビジョニングする必要があり、データロードも不可欠となります。

いずれのソリューションも強力な特長を備えていますが、お客様はどちらの特長を優先するかの判断を強いられます。我々はこれを「ORの抑圧(※)」と見做しています。ローカルディスクのスループットとAmazon S3のスケーラビリティは両立できない。洗練されたクエリー最適化と高度にスケールするデータ処理は両立できない。最適化されたフォーマットによる高速な結合処理性能と、汎用的なデータフォーマットを用いる様々なデータ処理エンジンは両立できない、などです。しかし、この選択は本来迫られるべきではありません。この規模においては、選択する余裕など到底ないからです。お客様が必要とするのは「上記の全て」なのです。

※ジム・コリンズが著書「ビジョナリー・カンパニー」で提示した概念。一見矛盾する力や考え方は同時に追求できない。

Redshift Spectrum

Redshift Spectrumは、こうした「ORの抑圧」に終止符を打つべく開発されました。Redshift Spectrumによって、Amazon Redshiftを利用されているお客様はAmazon S3上のデータに対し

簡単にクエリーを実行できるようになります。Amazon EMRと同様に、お客様はオープンなデータフォーマットと安価なストレージの恩恵を享受できます。データを抽出し、フィルターし、射影し、集計し、グループ化し、ソートするために、何千ものノードにスケールアウトすることも可能です。Amazon Athenaと同様に、Redshift Spectrumはサーバーレスであり、プロビジョニングや管理は必要ありません。単に、Redshift Spectrumを利用したクエリーが実行されている間に消費中のリソースに対してお支払いいただくだけです。Amazon Redshift自身と同様に、洗練されたクエリーオプティマイザー、ローカルディスク上のデータへの高速アクセス、そして標準SQLの恩恵を得ることができます。そして、他のどのようなソリューションとも異なり、Redshift Spectrumはエクサバイト級ないしはそれ以上のデータに対して、高度に洗練されたクエリーを、わずか数分で実行することが可能です。

Redshift SpectrumはAmazon Redshiftの組み込み機能の一つであり、お客様の既存のクエリーやBIツールはシームレスにご利用いただくことができます。背後では、我々は複数のアベイラビリティゾーンに跨がった何千ものRedshift Spectrumノードのフリートを運用しています。これらのノードは、処理する必要があるデータに基づいて透過的にスケールし、クエリーに割り当てられます。プロビジョニングや利用の確約は不要です。Redshift Spectrumは同時実行性にも優れています。お客様は任意のAmazon S3上のデータに対して、複数のAmazon Redshiftクラスターからアクセスすることができます。

Redshift Spectrumクエリーのライフサイクル

Redshift Spectrumクエリーのライフサイクルは、クエリーがAmazon Redshiftクラスターのリーダーノードに送信された時に始まります。リーダーノードはクエリーを最適化し、コンパイルし、その実行命令をAmazon Redshiftクラスターのコンピュートノード群に送ります。次に、コンピュートノード群は外部テーブルに関する情報をデータカタログから取得し、当該クエリーのフィルターと結合に基づいて、無関係なパーティションを動的に取り除きます。コンピュートノードはまた、ノード上でローカルに利用可能なデータを精査して、Amazon S3内の関連するオブジェクトだけを効率的にスキャンするようプレディケイトプッシュダウンを行います。

Amazon Redshiftコンピュートノードは、続いて、処理する必要のあるオブジェクトの数に基づいて複数のリクエストを生成し、それらをRedshift Spectrumに一斉に送ります。Redshift Spectrumは、AWSリージョンごとに何千ものAmazon EC2インスタンスをプールしています。Redshift SpectrumのワーカーノードはAmazon S3上のデータをスキャン、フィルター、集約し、処理に必要なデータをAmazon Redshiftクラスターにストリーミングします。その後、最終的な結合とマージの処理がクラスター内でローカルに実行され、結果がクライアントに返されます。

Redshift Spectrumのアーキテクチャーはいくつかのアドバンテージをもたらします。第一に、コンピュートリソースをAmazon S3のストレージレイヤーとは独立した形で、弾力的にスケールさせることができます。第二に、Amazon S3上の同一データに対して複数のAmazon Redshiftクラスターを起動しクエリーを実行できるため、(単一のAmazon Redshiftクラスターに比べて)ずっと高い同時実行性能を提供します。第三に、Redshift SpectrumはAmazon Redshiftのクエリーオプティマイザーを利用して効率的なクエリープランを生成します。これは複数テーブルの結合やWindow関数を伴う複雑なクエリーでも同様です。第四に、ソースデータをそれらのネイティブなフォーマット(Parquet、RCFile、ORC、CSV、TSV、Sequence、Avro、RegexSerDe、Grok等、さらに多くのフォーマットが今後追加される予定です)で直接操作することが可能です。このことは、データロードやデータ変換処理が不要であることを意味します。また、データを重複して保有することや、それに伴う追加のコストも不要にします。第五に、オープンなデータフォーマットを用いることで、単一のAmazon S3データを元に、様々なチーム間で他のAWSサービスや実行エンジンを柔軟に利用し協働することを可能にします。お客様はこれらのアドバンテージ全てを享受することが可能です。また、Redshift SpectrumはAmazon Redshiftの一機能であることから、Amazon Redshiftと同じレベルでのエンドツーエンドセキュリティ、コンプライアンス、および認定を得ることができます。

性能とコストパフォーマンスを目的とした設計

Amazon Redshift Spectrumでは、実際に実行したクエリーでスキャンされたデータ量の分のみ、お支払いいただきます。ファイル分割、列指向フォーマット、データ圧縮を利用してAmazon S3から読み取るデータ量を最小化することをお勧めします。これらはクエリー性能を大幅に改善しコスト削減にも寄与するため、データウェアハウスでは重要なファクターとなります。Amazon S3上のデータを日、時、およびその他のカスタムキーで分割することで、Redshift Spectrumは無関係なパーティションを動的に取り除き、処理対象のデータ量を最小化することができるようになります。データがParquetのような列指向フォーマットで保管されている場合には、Redshift Spectrumは行全体を処理する代わりに、当該クエリーによって必要とされる列だけをスキャンします。同様に、データがRedshift Spectrumでサポートされている圧縮アルゴリズムで圧縮されている場合は、スキャンされるデータはより少ない量で済みます。

Amazon RedshiftおよびRedshift Spectrumでは、両者のいわゆる「いいとこ取り」が可能となります。もし同一データに対する頻繁なクエリーを実行する必要があるなら、Amazon Redshiftにデータを保存することで、フル機能を持ち、構造化されたデータを定額で保存およびクエリーできるデータウェアハウスの利便性を享受できるでしょう。同時に、それが過去のデータであるか直近のデータであるかに関わらず、さらなるデータを複数のオープンフォーマットの形式でAmazon S3に保持して、Amazon Redshiftのクエリー機能をAmazon S3データレイクへと拡張することも可能です。

そしてもうひとつ、データロードが不要になること – これにより、Amazon Redshift Spectrumはデータウェアハウスをエクサバイト級にまで拡張します。Redshift Spectrumは「ORの抑圧」を終わらせます。お客様が、データを望む場所に望むフォーマットで保存し、必要な時に標準SQLを用いて高速に処理できる状態に置くことを、現在と将来にわたって可能にするのです。

 

参考文献

Amazon Redshift Spectrum 10 のベストプラクティス

著者について

Maor Kleider はAmazon Redshiftのシニアプロダクトマネージャーです。Amazon Redshiftは、高速、シンプルかつコスト効率のよいデータウェアハウスです。Maorはお客様およびパートナーの皆様と協働し、彼らの特有なビッグデータユースケースに付いて学び、その利用体験をよりよくすることに情熱を傾けています。余暇では、家族とともに旅行やレストラン開拓を楽しんでいます。

(翻訳はAWS プロフェッショナルサービス仲谷が担当しました。)

Amazon Redshiftに新世代のDC2ノードが追加 – 価格はそのままで最大2倍の性能向上

Amazon Redshiftは高速で完全マネージド型のデータウェアハウス(DWH)です。ペタバイト級までスケールアウトが可能であり、Amazon Redshift Spectrumを利用することでAmazon S3上に保存されたエクサバイト級のデータにロード無しでクエリを実行することも可能です。

Amazon Redshiftがリリースされた当初からご利用いただいている方であれば、当初はHDD搭載のDW1と呼ばれるノード1種類しか無かったことをご記憶かと思います。続いてSSDを搭載した新しいノード追加され、DW1(HDDベース)とDW2(SSDベース)の2タイプから選択可能になりました。

その後、DW1の後継がリリースされる際にHDDベースはDense Storage (DS) に、SSDベースはDense Compute (DC)とそれぞれの特性を表した名前に整理され、DS1(旧DW1)の後継としてDS2がリリースされました。DS2リリース時のブログエントリはこちらにありますが、その登場はDS1ユーザから驚きをもって迎えられました。DWHとしての性能が大きく向上しつつ、ノードの価格は据え置きだったからです。

次はDense Compute (DC)の番です。DC2が本日より利用可能になりました!

第二世代のDense Computeノード

DC2はDC1の後継となるノードであり、高いスループットと低いレイテンシを必要とするDWHワークロードのために設計されています。CPUはIntel E5-2686 v4(Broadwell)になり、高速なDDR4メモリを搭載。ストレージはNVMe接続のSSDです。
私達はAmazon Redshiftがこのより高速なCPU、ネットワーク、ストレージの性能をDC2で十分に発揮できるようチューニングを行い、結果としてDC1との同一価格構成での比較で最大2倍のパフォーマンスを発揮しています。DC2.8xlargeノードではスライスあたりで2倍のメモリを搭載しており、ストレージレイアウトの改善によって30%多いデータが保管できるようになりました。これらの改善がされた新世代のノードを旧世代と同じ価格で提供します。

DC2.8xlargeではパフォーマンスを最大化するためにスライス数が変更されています。旧世代のDC1.8xlargeでは1ノードあたり32スライスでしたが、DC2.8xlargeでは16スライスに変更されています。DC2.largeはDC1.largeと変わらず1ノード2スライスのままです。
このため、DC1.8xlarge (もしくはDS)からDC2.8xlargeへ移行するためにはクラスターのリサイズが必要になります。DC1.largeからDC2.largeへの移行については、リサイズもしくはDC1で取得したスナップショットからの作成が可能です。

本日より利用可能です

DC2ノードはUS East (N. Virginia), US East (Ohio), US West (N. California), US West (Oregon), EU (Frankfurt), EU (Ireland), EU (London), Asia Pacific (Singapore), Asia Pacific (Tokyo), Asia Pacific (Sydney), Asia Pacific (Seoul), Asia Pacific (Mumbai), Canada (Central), South America (São Paulo) リージョンで本日よりご利用いただけます。

詳細な情報はこちらのブログも参照してください。また価格についてはAmazon Redshift料金ページを確認してください。

AWS 下佐粉 昭 (@simosako)