Amazon Web Services ブログ

Redshift 横串検索を使用して、簡略化された ETL およびライブデータクエリソリューションを構築する

最高の ETL は、ETL を行わないことだという話を聞いたことがあるかもしれません。Amazon Redshift は、横串検索でこれを可能にしました。最初のリリースでは、この機能により、Amazon Redshift 外部スキーマを使用して、Amazon Aurora PostgreSQL または Amazon RDS for PostgreSQL でデータをクエリできます。横串検索は、システムビューとドライバー API を通じてソースデータベースからメタデータも公開します。これにより、Tableau や Amazon Quicksight などのビジネスインテリジェンスツールがローカルコピーを作成しなくても、Amazon Redshift に接続して PostgreSQL のデータにクエリを実行できます。これにより、新しいデータウェアハウスパターン (ライブデータクエリ) が可能になります。これにより、PostgreSQL データベースからデータをシームレスに取得したり、遅延バインディングビューにデータを構築したりできます。遅延バインディングビューでは、運用中の PostgreSQL データ、分析的な Amazon Redshift ローカルデータ、および Amazon S3 データレイク内の過去の Amazon Redshift Spectrum データを組み合わせます。

簡略化された ETL のユースケース

この ETL のユースケースでは、横串検索を使用して、おなじみの upsert パターンを簡略化できます。ソースデータベース内のデータをクエリすることにより、Amazon S3 での増分抽出の必要性と、COPY を介したその後のロードのバイパスを回避できます。この変更は、COPY コマンドを外部テーブルへのクエリに置き換える 1 行のコードにすることができます。次のコードを参照してください。

BEGIN;
CREATE TEMP TABLE staging (LIKE ods.store_sales);
-- replace the following COPY from S3 
COPY staging FROM 's3://yourETLbucket/daily_store_sales/' 
     IAM_ROLE 'arn:aws:iam::<account_id>:role/<s3_reader_role>' DELIMITER '|' COMPUPDATE OFF;
-- with this federated query to load staging data from PostgreSQL source
INSERT INTO staging SELECT * FROM pg.store_sales p
	WHERE p.last_updated_date > (SELECT MAX(last_updated_date) FROM ods.store_sales)
DELETE FROM ods.store_sales USING staging s WHERE ods.store_sales.id = s.id;
INSERT INTO ods.store_sales SELECT * FROM staging;
DROP TABLE staging;
COMMIT;

上記の例では、テーブル pg.store_sales が PostgreSQL にあり、横串検索を使用して、Amazon Redshift のステージングテーブルにロードするための最新のデータを取得しています。それにより、実際に削除された状態を維持しており、挿入操作に変更はありません。このパターンは、おそらく横串検索の最も一般的な用法でしょう。

外部スキーマのセットアップ

前の例の外部スキーマ pg は、次のように設定されています。

CREATE EXTERNAL SCHEMA IF NOT EXISTS pg                                                                         
FROM POSTGRES                                                                                                           
DATABASE 'dev' 
SCHEMA 'retail'                                                                                     
URI 'database-1.cluster-ro-samplecluster.us-east-1.rds.amazonaws.com'                                                    
PORT 5432                                                                                                               
IAM_ROLE 'arn:aws:iam::555566667777:role/myFederatedQueryRDS'                                                           
SECRET_ARN 'arn:aws:secretsmanager:us-east-1:555566667777:secret:MyRDSCredentials-TfzFSB'

CREATE EXTERNAL SCHEMA コマンドを Spectrum で使用してきたことで慣れている場合は、横串検索を有効にする際、新しいパラメータオプションがあることに注意してください。

FROM POSTGRES                                                                                                           
DATABASE 'dev' 
SCHEMA 'retail'

Amazon Redshift Spectrum は AWS GlueAmazon Athena、または Hive 内にある外部データカタログを参照するのに対し、このコードは Postgres カタログを指します。また、Amazon Redshift が連携クエリに対してより多くのソースデータベースをサポートするため、FROM で使用するより多くのキーワードを期待します。デフォルトでは、SCHEMA を指定しない場合、public がデフォルトになります。

ターゲットデータベース内で、DATABASE ‘dev’ および SCHEMA ‘retail’ を識別します。これにより、Amazon Redshift テーブル pg.<some_table> へのクエリは、PostgreSQL に対して、dev データベースの retail.<some_table> のリクエストとして発行されます。Amazon Redshift では、クエリ述語はプッシュダウンされ、完全に PostgreSQL で実行されるため、後続の操作のために Amazon Redshift に返される結果セットが削減されます。さらに進んで、クエリプランナーは、Amazon Redshift と PostgreSQL の間の結合を最適化するために、外部テーブルのカーディナリティ推定を導出します。前の例から:

URI 'database-1.cluster-ro-samplecluster.us-east-1.rds.amazonaws.com'                                                    
PORT 5432

PostgreSQL エンドポイントとポートの両方を参照する URI および PORT パラメータは一目瞭然ですが、設定で考慮すべき点がいくつかあります。

  • Aurora または Amazon RDS for PostgreSQL のリードレプリカエンドポイントを使用して、プライマリインスタンスの負荷を軽減します。
  • Amazon RDS for PostgreSQL インスタンス、Aurora サーバーレスまたはプロビジョニングされたインスタンス、および Amazon Redshift クラスターをセットアップして、同じ VPC およびサブネットグループを使用します。これにより、クラスター向けのセキュリティグループを、Aurora または Amazon RDS for PostgreSQL インスタンス向けのセキュリティグループのインバウンドルールに追加できます。
  • Amazon Redshift と Aurora または Amazon RDS for PostgreSQL の両方が異なる VPC にある場合は、VPC ピアリングをセットアップします。詳細については、「VPC ピアリングとは」を参照してください。

リモートデータベース認証情報用の AWS Secrets Manager の設定

AWS Secrets Manager リモートデータベース認証情報を取得するために、この例では次のコードを使用しています。

IAM_ROLE 'arn:aws:iam::555566667777:role/myFederatedQueryRDS'                                                           
SECRET_ARN 'arn:aws:secretsmanager:us-east-1:555566667777:secret:MyRDSCredentials-TfzFSB'

SECRET_ARN もロールの IAM ポリシーに埋め込まれているため、これら 2 つのパラメータは相互に関連しています。

Secrets Manager などのサービスが存在せず、Amazon Redshift から PostgreSQL に横串検索を発行したい場合、CREDENTIALS のようなパラメータを介して CREATE EXTERNAL SCHEMA コマンドにデータベース認証情報を提供する必要があります。そのパラメータは、COPY コマンドでも使用します。ただし、このハードコーディングされたアプローチでは、PostgreSQL の認証情報が期限切れになる可能性があることを考慮していません。

この問題を回避するには、シークレットを管理する上で一元化されたサービスを提供する Secrets Manager 内に PostgreSQL データベースの認証情報を保持します。Amazon Redshift はこのような認証情報を取得して使用するため、一時的なものであり、生成されたコードには格納されず、クエリの実行後に破棄されます。

認証情報を Secrets Manager に保存するには、最大で数分かかります。新しいシークレットを保存するには、次の手順を実行します。

 

  1. Secrets Manager コンソールで、[Secrets] を選択します。
  2. [Store a new secret] を選択します。
  3. [Store a new secret] セクションで、以下を実行します。

 

  • PostgreSQL データベースの認証情報を入力します
  • シークレットに名前を付けます。たとえば、MyRDSCredentials
  • ローテーションを設定します (これは後で有効にすることができます)
  • 必要に応じて、好みのプログラミング言語を使用してシークレットにアクセスするためのプログラムコードをコピーします (この記事では必要ありません)。
  1. [Next] を選択します。

認証情報を簡単に取得することもできます。

  1. Secrets Manager コンソールで、シークレットを選択します。
  2. [Retrieve secret value] を選択します。

次のスクリーンショットは、シークレット値の詳細を示しています。

このシークレットは、シークレット ARN を介して参照される AWS リソースになりました。次のスクリーンショットを参照してください。

IAM ロールを設定する

これで、シークレット ARN を IAM ポリシーに埋め込み、ポリシーに名前を付け、IAM ロールにアタッチすることで、すべてをまとめることができます。次のコードを参照してください。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AccessSecret",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager:us-east-1:555566667777:secret:MyRDSCredentials-TfzFSB"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetRandomPassword",
                "secretsmanager:ListSecrets"
            ],
            "Resource": "*"
        }
    ]
}

次のスクリーンショットは、MyRDSSecretPolicy ポリシーを含む myFederatedQueryRDS という IAM ロールの詳細を示しています。これは、CREATE EXTERNAL SCHEMA DDL の IAM_ROLE パラメータで提供されるのと同じロールです。

最後に、同じ IAM ロールを Amazon Redshift クラスターにアタッチします。

  1. Amazon Redshift コンソールで、クラスターを選択します。
  2. [Actions] ドロップダウンメニューから、[Manage IAM roles] を選択します。
  3. 作成した IAM ロールを選択して追加します。

これで、以下の手順が完了しました。

  1. IAM ポリシーとロールを作成する
  2. PostgreSQL データベースの認証情報を Secrets Manager に保存する
  3. シークレットと IAM ロールを使用して PostgreSQL エンドポイントで認証する Amazon Redshift 外部スキーマ定義を作成します
  4. Amazon Redshift データベースとスキーマ間のマッピングを PostgreSQL データベースとスキーマに適用して、Amazon Redshift が PostgreSQL テーブルにクエリを発行できるようにします。

この設定は一度だけ行う必要があります。

運用データのクエリ

このセクションでは、別のユースケースについて説明します。それは、複数のソースデータベースにわたる運用データのクエリです。このユースケースでは、グローバルなオンライン小売業者が、さまざまな地域のさまざまなチームにデータベースをデプロイさせています。

  • リージョン us-east-1 はサーバーレス Aurora PostgreSQL を実行します。
  • リージョン us-west-1 は、プロビジョニングされた Aurora PostgreSQL を実行します。これは、us-east-1 にリードレプリカを持つグローバルデータベースとしても設定されています。
  • リージョン eu-west-1 は、us-east-1 のリードレプリカを使用して Amazon RDS for PostgreSQL インスタンスを実行します。

サーバーレスでプロビジョニングされた Aurora PostgreSQL と Amazon RDS for PostgreSQL は、リージョン us-east-1 の Amazon RDS コンソールに表示されます。次のスクリーンショットを参照してください。

このユースケースでは、us-east-1 の同じ VPC とサブネットをローカルサーバーレス Aurora PostgreSQL と共有するように Aurora と Amazon RDS のリードレプリカを設定したと想定します。さらに、これらの各インスタンスの認証情報のシークレットと IAM ロール MyCombinedRDSSecretPolicy をすでに作成しています。後者はより寛容で、Amazon Redshift が任意のリージョン内の Amazon RDS シークレット値を取得できます。ただし、本番運用ではセキュリティに注意し、IAM ポリシーの個別のステートメントで各シークレットのリソース ARN を明示的に指定します。次のコードを参照してください。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AccessSecret",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager:*:555566667777:secret:*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetRandomPassword",
                "secretsmanager:ListSecrets"
            ],
            "Resource": "*"
        }
    ]
}

その後、Amazon Redshift の外部スキーマ DDL は、結合された IAM ロールと個々のシークレット ARN を参照できます。次のコードを参照してください。

CREATE EXTERNAL SCHEMA IF NOT EXISTS useast
FROM POSTGRES
DATABASE 'dev'
URI 'us-east-1-aurora-pg-serverless.cluster-samplecluster.us-east-1.rds.amazonaws.com'
PORT 5432
IAM_ROLE 'arn:aws:iam::555566667777:role/MyCombinedRDSFederatedQuery'
SECRET_ARN 'arn:aws:secretsmanager:us-east-1:555566667777:secret:MyEastUSAuroraServerlessCredentials-dXOlEq'
;

CREATE EXTERNAL SCHEMA IF NOT EXISTS uswest
FROM POSTGRES
DATABASE 'dev'
URI 'global-aurora-pg-west-coast-stores-instance-1.samplecluster.us-east-1.rds.amazonaws.com'
PORT 5432
IAM_ROLE 'arn:aws:iam::555566667777:role/MyCombinedRDSFederatedQuery'
SECRET_ARN 'arn:aws:secretsmanager:us-west-1:555566667777:secret:MyWestUSAuroraGlobalDBCredentials-p3sV9m'
;

CREATE EXTERNAL SCHEMA IF NOT EXISTS europe
FROM POSTGRES
DATABASE 'dev'
URI 'eu-west-1-postgres-read-replica.samplecluster.us-east-1.rds.amazonaws.com'
PORT 5432
IAM_ROLE 'arn:aws:iam::555566667777:role/MyCombinedRDSFederatedQuery'
SECRET_ARN 'arn:aws:secretsmanager:eu-west-1:555566667777:secret:MyEuropeRDSPostgresCredentials-mz2u9L'
;

この遅延バインディングビューは、すべての PostgreSQL インスタンス内の TPC-H ラインアイテムテストデータへの基になるクエリを抽象化します。次のコードを参照してください。

CREATE VIEW global_lineitem AS
SELECT 'useast' AS region, * from useast.lineitem
UNION ALL
SELECT 'uswest', * from uswest.lineitem
UNION ALL
SELECT 'europe', * from europe.lineitem
WITH NO SCHEMA BINDING
;

Amazon Redshift は、複数の分散データベースにかけてライブ運用データをクエリし、この機能を使用して結果を統合ビューに集約できます。次のコードを参照してください。

dev=# SELECT region, extract(month from l_shipdate) as month,
      sum(l_extendedprice * l_quantity) - sum(l_discount) as sales
      FROM global_lineitem
      WHERE l_shipdate >= '1997-01-01'
      AND l_shipdate < '1998-01-01'
      AND month < 4
      GROUP BY 1, 2
      ORDER BY 1, 2
;
 region | month |      sales
--------+-------+------------------
 europe |     1 | 16036160823.3700
 europe |     2 | 15089300790.7200
 europe |     3 | 16579123912.6700
 useast |     1 | 16176034865.7100
 useast |     2 | 14624520114.6700
 useast |     3 | 16645469098.8600
 uswest |     1 | 16800599170.4600
 uswest |     2 | 14547930407.7000
 uswest |     3 | 16595334825.9200
(9 rows)

次のクエリプランで Remote PG Seq Scan を調べると、3 つの PostgreSQL データベースすべてで実行するために述語がプッシュダウンされていることがわかります。最初の単純化された ETL の使用例とは異なり、データが適切にクエリおよびフィルタリングされるため、ETL は実行されません。次のコードを参照してください。

dev=# EXPLAIN SELECT region, extract(month from l_shipdate) as month,
      sum(l_extendedprice * l_quantity) - sum(l_discount) as sales
FROM global_lineitem
WHERE l_shipdate >= '1997-01-01'
AND l_shipdate < '1998-01-01'
AND month < 4
GROUP BY 1, 2
ORDER BY 1, 2
;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 XN Merge  (cost=1000000060145.67..1000000060146.17 rows=200 width=100)
   Merge Key: derived_col1, derived_col2
   ->  XN Network  (cost=1000000060145.67..1000000060146.17 rows=200 width=100)
         Send to leader
         ->  XN Sort  (cost=1000000060145.67..1000000060146.17 rows=200 width=100)
               Sort Key: derived_col1, derived_col2
               ->  XN HashAggregate  (cost=60136.52..60138.02 rows=200 width=100)
                     ->  XN Subquery Scan global_lineitem  (cost=20037.51..60130.52 rows=600 width=100)
                           ->  XN Append  (cost=20037.51..60124.52 rows=600 width=52)
                                 ->  XN Subquery Scan "*SELECT* 1"  (cost=20037.51..20041.51 rows=200 width=52)
                                       ->  XN HashAggregate  (cost=20037.51..20039.51 rows=200 width=52)
                                             ->  XN PG Query Scan lineitem  (cost=0.00..20020.84 rows=1667 width=52)
                                                   ->  Remote PG Seq Scan useast.lineitem  (cost=0.00..20000.00 rows=1667 width=52)
                                                         Filter: ((l_shipdate < '1998-01-01'::date) AND (l_shipdate >= '1997-01-01'::date) AND ("date_part"('month'::text, l_shipdate) < 4))
                                 ->  XN Subquery Scan "*SELECT* 2"  (cost=20037.51..20041.51 rows=200 width=52)
                                       ->  XN HashAggregate  (cost=20037.51..20039.51 rows=200 width=52)
                                             ->  XN PG Query Scan lineitem  (cost=0.00..20020.84 rows=1667 width=52)
                                                   ->  Remote PG Seq Scan uswest.lineitem  (cost=0.00..20000.00 rows=1667 width=52)
                                                         Filter: ((l_shipdate < '1998-01-01'::date) AND (l_shipdate >= '1997-01-01'::date) AND ("date_part"('month'::text, l_shipdate) < 4))
                                 ->  XN Subquery Scan "*SELECT* 3"  (cost=20037.51..20041.51 rows=200 width=52)
                                       ->  XN HashAggregate  (cost=20037.51..20039.51 rows=200 width=52)
                                             ->  XN PG Query Scan lineitem  (cost=0.00..20020.84 rows=1667 width=52)
                                                   ->  Remote PG Seq Scan europe.lineitem  (cost=0.00..20000.00 rows=1667 width=52)
                                                         Filter: ((l_shipdate < '1998-01-01'::date) AND (l_shipdate >= '1997-01-01'::date) AND ("date_part"('month'::text, l_shipdate) < 4))
(24 rows)

データレイク、データウェアハウス、およびライブ運用データを組み合わせる

この次のユースケースでは、Amazon Redshift Spectrum 履歴データを、Amazon Redshift の現在のデータと PostgreSQL のライブデータと結合します。3TB TPC-DS データセット を使用して、1998 年から 2001 年までのデータを Amazon Redshift の store_sales テーブルから Amazon S3 にアンロードします。アンロードされたファイルは、パーティションキーとして ss_sold_date_sk を使用して Parquet 形式で保存されます。

Amazon Redshift Spectrum を介してこの履歴データにアクセスするには、外部テーブルを作成します。次のコードを参照してください。

CREATE EXTERNAL TABLE spectrum.store_sales_historical
(
  ss_sold_time_sk int ,
  ss_item_sk int ,
  ss_customer_sk int ,
  ss_cdemo_sk int ,
  ss_hdemo_sk int ,
  ss_addr_sk int ,
  ss_store_sk int ,
  ss_promo_sk int ,
  ss_ticket_number bigint,
  ss_quantity int ,
  ss_wholesale_cost numeric(7,2) ,
  ss_list_price numeric(7,2) ,
  ss_sales_price numeric(7,2) ,
  ss_ext_discount_amt numeric(7,2) ,
  ss_ext_sales_price numeric(7,2) ,
  ss_ext_wholesale_cost numeric(7,2) ,
  ss_ext_list_price numeric(7,2) ,
  ss_ext_tax numeric(7,2) ,
  ss_coupon_amt numeric(7,2) ,
  ss_net_paid numeric(7,2) ,
  ss_net_paid_inc_tax numeric(7,2) ,
  ss_net_profit numeric(7,2)
)
PARTITIONED BY (ss_sold_date_sk int)
STORED AS PARQUET
LOCATION 's3://mysamplebucket/historical_store_sales/';   

外部スペクトルスキーマは次のように定義されます。

CREATE EXTERNAL SCHEMA spectrum
FROM data catalog DATABASE 'spectrumdb'
IAM_ROLE 'arn:aws:iam::555566667777:role/mySpectrumRole'
CREATE EXTERNAL DATABASE IF NOT EXISTS;

Amazon S3 読み取り専用ポリシーの代わりに、IAM ロール mySpectrumRole には AmazonS3FullAccessAWSGlueConsoleFullAccess の両方のポリシーが含まれ、前者では Amazon Redshift が Amazon S3 に書き込みを許可しています。次のコードを参照してください。

UNLOAD ('SELECT * FROM tpcds.store_sales WHERE ss_sold_date_sk < 2452276')
TO 's3://mysamplebucket/historical_store_sales/'
IAM_ROLE 'arn:aws:iam::555566667777:role/mySpectrumRole'
FORMAT AS PARQUET
PARTITION BY (ss_sold_date_sk) ALLOWOVERWRITE;

To make partitioned data visible, the ALTER TABLE ...ADD PARTITION コマンドは、すべてのパーティション値を指定する必要があります。このユースケースでは、2450816 から 2452275 はそれぞれ、日付 1998-01-02 から 2001-12-31 に対応しています。これらの DDL をすばやく生成するには、次のコードを使用します。

WITH partitions AS (SELECT * FROM generate_series(2450816, 2452275))
SELECT 'ALTER TABLE spectrum.store_sales_historical ADD PARTITION (ss_sold_date_sk='|| generate_series || ') '
    || 'LOCATION \'s3://mysamplebucket/historical_store_sales/ss_sold_date_sk=' || generate_series || '/\';'
FROM partitions;

生成された ALTER TABLE ステートメントを個別にまたはバッチとして実行して、パーティションデータを表示できます。次のコードを参照してください。

ALTER TABLE spectrum.store_sales_historical 
ADD PARTITION (ss_sold_date_sk=2450816)
LOCATION 's3://mysamplebucket/historical_store_sales/ss_sold_date_sk=2450816/';
-- repeated for all partition values

次のビューの 3 つの結合されたソースは、1998 年から 2001 年の Amazon S3 の履歴データ、2002 年の Amazon Redshift のローカルの現在のデータ、および PostgreSQL の 2003 年の 2 か月のライブデータで構成されています。この遅延バインディングビューを作成するとき、前の UNLOAD 操作でパーティションキーとして ss_sold_date_sk を指定して列の順序を最後にシフトしたため、Amazon Redshift Spectrum 外部テーブル列を並べ替える必要があります。次のコードを参照してください。

CREATE VIEW store_sales_integrated AS
SELECT * FROM uswest.store_sales_live
UNION ALL
SELECT * FROM tpcds.store_sales_current
UNION ALL
SELECT ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk,
       ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number,
       ss_quantity, ss_wholesale_cost, ss_list_price, ss_sales_price,
       ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost,
       ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid,
       ss_net_paid_inc_tax, ss_net_profit
FROM spectrum.store_sales_historical
WITH NO SCHEMA BINDING;

ビューでクエリを実行して、日付を集計し、3 つのソース間でテーブルを結合できます。次のコードを参照してください。

dev=# SELECT extract(year from b.d_date), count(a.ss_sold_date_sk)
FROM store_sales_integrated a
JOIN tpcds.date_dim b on (a.ss_sold_date_sk = b.d_date_sk)
GROUP BY 1
ORDER BY 1
;
 date_part |   count
-----------+------------
      1998 | 1632403114
      1999 | 1650163390
      2000 | 1659168880
      2001 | 1641184375
      2002 | 1650209644
      2003 |   17994540
(6 rows)

Time: 77624.926 ms (01:17.625)

この次の横串検索は 2 ノードの DC2.8XL クラスターで実行され、Amazon S3、PostgreSQL、Amazon Redshift のストアの売上高と、Amazon Redshift の日付ディメンションテーブルを組み合わせて行数を年ごとに集計およびソートするのに 1 分 17 秒かかりました。

dev=# EXPLAIN SELECT extract(year from b.d_date), count(a.ss_sold_date_sk)
FROM store_sales_integrated a
JOIN tpcds.date_dim b on (a.ss_sold_date_sk = b.d_date_sk)
GROUP BY 1
ORDER BY 1;

QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 XN Merge  (cost=1461036320912.29..1461036321094.91 rows=73049 width=8)
   Merge Key: "date_part"('year'::text, b.d_date)
   ->  XN Network  (cost=1461036320912.29..1461036321094.91 rows=73049 width=8)
         Send to leader
         ->  XN Sort  (cost=1461036320912.29..1461036321094.91 rows=73049 width=8)
               Sort Key: "date_part"('year'::text, b.d_date)
               ->  XN HashAggregate  (cost=461036314645.93..461036315011.18 rows=73049 width=8)
                     ->  XN Hash Join DS_DIST_ALL_NONE  (cost=913.11..428113374829.91 rows=6584587963204 width=8)
                           Hash Cond: ("outer".ss_sold_date_sk = "inner".d_date_sk)
                           ->  XN Subquery Scan a  (cost=0.00..263498674836.70 rows=6584587963204 width=4)
                                 ->  XN Append  (cost=0.00..197652795204.66 rows=6584587963204 width=4)
                                       ->  XN Subquery Scan "*SELECT* 1"  (cost=0.00..539836.20 rows=17994540 width=4)
                                             ->  XN PG Query Scan store_sales_live  (cost=0.00..359890.80 rows=17994540 width=4)
                                                   ->  Remote PG Seq Scan uswest.store_sales_live  (cost=0.00..179945.40 rows=17994540 width=4)
                                       ->  XN Subquery Scan "*SELECT* 2"  (cost=0.00..33004193.28 rows=1650209664 width=4)
                                             ->  XN Seq Scan on store_sales_current  (cost=0.00..16502096.64 rows=1650209664 width=4)
                                       ->  XN Subquery Scan "*SELECT* 3"  (cost=0.00..197619251175.18 rows=6582919759000 width=4)
                                             ->  XN Partition Loop  (cost=0.00..131790053585.18 rows=6582919759000 width=4)
                                                   ->  XN Seq Scan PartitionInfo of spectrum.store_sales_historical  (cost=0.00..10.00 rows=1000 width=4)
                                                   ->  XN S3 Query Scan store_sales_historical  (cost=0.00..131658395.18 rows=6582919759 width=0)
                                                         ->  S3 Seq Scan spectrum.store_sales_historical location:"s3://mysamplebucket/historical_store_sales" format:PARQUET (cost=0.00..65829197.59 rows=6582919759 width=0)
                           ->  XN Hash  (cost=730.49..730.49 rows=73049 width=8)
                                 ->  XN Seq Scan on date_dim b  (cost=0.00..730.49 rows=73049 width=8)
(23 rows)

クエリプランは、これらが 3 つのソーステーブルで実行されるフルシーケンシャルスキャンであり、返された行の数が強調表示され、合計が 82 億であることを示しています。Amazon Redshift Spectrum は外部テーブルの統計を生成しないため、numRows プロパティを Amazon S3 の履歴データの行数に手動で設定します。次のコードを参照してください。

ALTER TABLE spectrum.store_sales_historical SET TABLE PROPERTIES ('numRows' = '6582919759');

Amazon Redshift にローカルな別のディメンションテーブル (今回は 3,000 万行の customer テーブル) と結合し、列 c_birth_country でフィルターできます。次のコードを参照してください。

dev=# SELECT extract(year from b.d_date), count(a.ss_sold_date_sk)
FROM store_sales_integrated a
JOIN tpcds.date_dim b on (a.ss_sold_date_sk = b.d_date_sk)
JOIN tpcds.customer c on (a.ss_customer_sk = c.c_customer_sk)
AND c.c_birth_country = 'UNITED STATES'
GROUP BY 1
ORDER BY 1
;
 date_part |  count
-----------+---------
      1998 | 7299277
      1999 | 7392156
      2000 | 7416905
      2001 | 7347920
      2002 | 7390590
      2003 |   81627
(6 rows)

Time: 77878.586 ms (01:17.879)

QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 XN Merge  (cost=1363288861214.20..1363288861396.83 rows=73049 width=8)
   Merge Key: "date_part"('year'::text, b.d_date)
   ->  XN Network  (cost=1363288861214.20..1363288861396.83 rows=73049 width=8)
         Send to leader
         ->  XN Sort  (cost=1363288861214.20..1363288861396.83 rows=73049 width=8)
               Sort Key: "date_part"('year'::text, b.d_date)
               ->  XN HashAggregate  (cost=363288854947.85..363288855313.09 rows=73049 width=8)
                     ->  XN Hash Join DS_DIST_ALL_NONE  (cost=376252.50..363139873158.03 rows=29796357965 width=8)
                           Hash Cond: ("outer".ss_sold_date_sk = "inner".d_date_sk)
                           ->  XN Hash Join DS_BCAST_INNER  (cost=375339.39..362394963295.79 rows=29796357965 width=4)
                                 Hash Cond: ("outer".ss_customer_sk = "inner".c_customer_sk)
                                 ->  XN Subquery Scan a  (cost=0.00..263498674836.70 rows=6584587963204 width=8)
                                       ->  XN Append  (cost=0.00..197652795204.66 rows=6584587963204 width=8)
                                             ->  XN Subquery Scan "*SELECT* 1"  (cost=0.00..539836.20 rows=17994540 width=8)
                                                   ->  XN PG Query Scan store_sales_live  (cost=0.00..359890.80 rows=17994540 width=8)
                                                         ->  Remote PG Seq Scan uswest.store_sales_live  (cost=0.00..179945.40 rows=17994540 width=8)
                                             ->  XN Subquery Scan "*SELECT* 2"  (cost=0.00..33004193.28 rows=1650209664 width=8)
                                                   ->  XN Seq Scan on store_sales_current  (cost=0.00..16502096.64 rows=1650209664 width=8)
                                             ->  XN Subquery Scan "*SELECT* 3"  (cost=0.00..197619251175.18 rows=6582919759000 width=8)
                                                   ->  XN Partition Loop  (cost=0.00..131790053585.18 rows=6582919759000 width=8)
                                                         ->  XN Seq Scan PartitionInfo of spectrum.store_sales_historical  (cost=0.00..10.00 rows=1000 width=4)
                                                         ->  XN S3 Query Scan store_sales_historical  (cost=0.00..131658395.18 rows=6582919759 width=4)
                                                               ->  S3 Seq Scan spectrum.store_sales_historical location:"s3://mysamplebucket/historical_store_sales" format:PARQUET (cost=0.00..65829197.59 rows=6582919759 width=4)
                                 ->  XN Hash  (cost=375000.00..375000.00 rows=135755 width=4)
                                       ->  XN Seq Scan on customer c  (cost=0.00..375000.00 rows=135755 width=4)
                                             Filter: ((c_birth_country)::text = 'UNITED STATES'::text)
                           ->  XN Hash  (cost=730.49..730.49 rows=73049 width=8)
                                 ->  XN Seq Scan on date_dim b  (cost=0.00..730.49 rows=73049 width=8)
(28 rows)

クエリのパフォーマンスは、前のクエリとほとんど変わりません。クエリは 1 つの列 (ss_sold_date_sk) のみをスキャンしたため、履歴データサブクエリの Parquet の列構造が役立ちます。つまり、履歴データが CSV として保存されている場合、すべてのデータがスキャンされるため、パフォーマンスが大幅に低下します。

さらに、TPC-DS モデルは、日付値を store_sales ファクトテーブルに格納しません。代わりに、外部キーは date_dim テーブルを参照します。類似した何かを実装する予定があるが、頻繁に日付列でフィルターする場合は、その列をファクトテーブルに追加し、それをソートキーとして使用し、さらに Amazon Redshift Spectrum にパーティション列を追加することを検討してください。そうすることで、Amazon Redshift は、ローカルデータのブロックをより効率的にスキップし、Amazon S3 データのパーティションをプルーニングし、後者の場合、フィルタリング基準を Amazon Redshift Spectrum にプッシュすることができます。

まとめ

実際のシナリオでのライブデータ統合のアプリケーションには、データ検出、機械学習のためのデータ準備、運用分析、IoT テレメトリ分析、不正検出、コンプライアンスおよびセキュリティ監査が含まれます。Amazon Redshift Spectrum は、Amazon Redshift の範囲を AWS データレイクに拡張しますが、横串検索は、その範囲を運用データベースなどに拡張します。

これらのデータベース間のデータ型の違いの詳細については、「Amazon Redshift とサポートされている RDS PostgreSQL または Aurora PostgreSQL データベース間のデータ型の違い」を参照してください。Amazon Redshift を使用して統合データにアクセスする方法の詳細については、「Amazon Redshift で統合データにアクセスする際の制限と考慮事項」を参照してください。

 


著者について

Tito Mijares は、AWS のデータウェアハウススペシャリストソリューションアーキテクトです。 AWS のお客様が Amazon Redshift を採用してその使用を最適化するのをサポートしています。社外では、ジャズギターを演奏し、オーディオの録音や再生プロジェクトに取り組んでいます。

 

 

 

Entong Shen は、AWS Redshift のシニアソフトウェア開発エンジニアです。 彼は 8 年以上 MPP データベースの仕事をしており、クエリの最適化、統計、それに ストアドプロシージャや横串検索などの SQL 言語の機能に重点的に取り組んでいます。余暇は、あらゆるジャンルの音楽を聴いたり、多肉植物庭園で庭仕事をして過ごします。

 

 

 

Niranjan Kamat は、Amazon Redshift クエリ処理チームのソフトウェアエンジニアです。 彼の PhD 研究の焦点は、大規模なデータベースに対するインタラクティブなクエリでした。Redshift では、クエリの最適化、コマンドと統計の分析、横串検索など、さまざまなクエリ処理領域で働いています。余暇には、3 歳の娘と遊んだり、卓球を練習したり (オハイオ州でトップ 10 にランクイン、USATT レーティング 2143)、チェスを楽しんでいます。

 

 

Sriram Krishnamurthy は、AWS Redshift クエリ処理チームのソフトウェア開発マネージャーです。 彼はデータベースに情熱を傾けており、半構造化データ処理と SQL のコンパイルと実行に 15 年以上携わっています。余暇にはよく娘 2 人を連れてテニスを楽しんでいます。