Amazon Web Services ブログ

Amazon Kinesis Data Firehose、Amazon Athena、Amazon Redshift を使用して Apache Parquet 最適化データを分析する

Amazon Kinesis Data Firehoseは、 Amazon S3で構築されたデータを取得し、データレイクへデータを流し入れる最も簡単な方法です。このデータは、AWS CloudTrail ログファイル、Amazon VPC フローログ、Application Load BalancerなどのAWSのサービスによるものです。IoTイベント、ゲームイベントなどにもなります。このデータを効率的に参照するには、時間のかかるETL (抽出、転送、ロード)処理が必要です。最適なファイルフォーマットへデータを抽出、変換することで、インサイトへの時間が増えます。この状況は、特に時間とともに価値が失われるリアルタイムデータ向けには最適とはいえません。

この通常の問題を解決するために、Kinesis Data Firehoseでは、Apache Parquet または Apache ORC フォーマット内でAmazon S3へデータを保存できます。S3でデータを参照する場合、最適なパフォーマンスとコスト効率に優れた、推奨される最適なカラムナフォーマットがあります。これにより、Amazon AthenaAmazon RedshiftAWS GlueAmazon EMR、またはAWS パートナーネットワークから、またオープンソースコミュニティから利用できるその他のビッグデータツールを使用する場合に、直接的なメリットがあります。

Amazon Connectは、使いやすい、クラウドベースの連絡センターサービスです。通常のサービスよりも低コストで優れた顧客体験を簡単に提供することができます。オープンプラットフォーム設計により、他のシステムと簡単に統合できます。その中のシステムの1つとして、Kinesis Data Streams および Kinesis Data Firehose内のAmazon Kinesisがあります。

Apache Parquetフォーマット内の Amazon ConnectからS3までイベントを保存できることは素晴らしいことです。リアルタイムに Amazon Athena および Amazon Redshift Spectrumを使用して、この主なパフォーマンスとコスト最適化を利用した分析を実行できます。もちろん、Amazon Connectはその1例です。これにより、特に、組織がデータレイクを継続的に構築する場合、最適な機会が開けるでしょう。

Amazon Connectには、管理者ダッシュボード内に一連の分析表示が含まれています。しかし、その他の種類の分析を実行したい場合もあるでしょう。この投稿では、Kinesis Data Streams、Kinesis Data Firehose、S3を経由してAmazon Connectからのデータストリームを設定し、Athena および Amazon Redshift Spectrumを使用して分析を実施する方法を説明しています。Parquet およびAWS Glue Data Catalog、Amazon Athenaおよび Amazon Redshiftとの統合向けのKinesis Data Firehoseサポートに焦点を当てています。

ソリューションの概要

以下はソリューションを設定する方法です:

 

 

以下のセクションでは、パイプラインを設定する各ステップを説明しています。

1.スキーマを定義

Kinesis Data Firehoseでは、受信データを処理し、そのデータをParquetへ変換する場合、適用されるスキーマを確認する必要があります。これは、多くの場合、受信イベントには、作成者が宣伝している価値に基づき期待フィールドの全て、または一部が含まれているためです。通常のプロセスは、簡単に理解かつ参照できる継続的なスキーマにするために、バッチETLジョブ中のスキーマを最適化することです。これを実行することで、バッチプロセスの本質により、潜在時間が導入されます。この問題を克服するために、Kinesis Data Firehoseでは、事前に定義されたスキーマを必要としています。

利用可能なカラムおよび構成を確認するには、Amazon Connect Agent Event Streamsにアクセスしてください。簡単にするために、ネスト構造を作成するのではなく、全てStriingタイプのカラムを作成することを選択しました。しかし、必要に応じて、変更することもできます。

スキーマを定義するための最も簡単な方法は、Amazon Athenaコンソール内でテーブルを作成することです。Athenaコンソールを開き、以下のテーブル作成記述を貼り付け、自分のイベントデータが保存されるよう、自分のS3バケットおよびプレフィックスに置き換えます。データカタログデータベースは、作成できる異なるテーブルを保有する論理的コンテナです。デフォルトのデータベース名は、すでに存在しています。ない場合、名前を作成するか、すでに作成された別のデータベースを使用してください。

CREATE EXTERNAL TABLE default.kfhconnectblog (
  awsaccountid string,
  agentarn string,
  currentagentsnapshot string,
  eventid string,
  eventtimestamp string,
  eventtype string,
  instancearn string,
  previousagentsnapshot string,
  version string
)
STORED AS parquet
LOCATION 's3://your_bucket/kfhconnectblog/'
TBLPROPERTIES ("parquet.compression"="SNAPPY")

Kinesis Data Firehose向けのスキーマの準備はこれだけです。

2.データストリームの定義

次に、Amazon Connectイベントのストリームに使用されるKinesisデータストリームを定義する必要があります。  Kinesisデータストリームコンソールを開き、2つのストリームを作成します。  今は多くのデータがないため、それぞれ1つのシャードで構成できます。

3.Parquet向けのKinesis Data Firehose デリバーストリームを定義します

ソースとしてデータストリーム、アウトプットとしてAmazon S3を使用して Data Firehoseデリバリーストリームを構成しましょう。Kinesis Data Firehoseコンソールを開くことで開始し、新しいデータデリバリーストリームを作成します。名前を付け、ステップ2で作成したKinesisデータストリームに関連付けます。

以下のスクリーンショットで表示されるとおり、Record フォーマット変換 (1) および Apache Parquet (2)を選択します。ご覧のとおり、 Apache ORCもサポートされています。下へスクロールし、AWS Glue Data Catalog データベース名 (3)およびステップ1で作成したテーブル名(4)を提供します。[Next] (次へ) を選択する

簡単にするために、ステップ1のテーブル記述で作成したLOCATIONパラメータで定義された値を使用して、アウトプットS3 バケットおよびプレフィックスフィールドが自動で投入されます。よくできました。さらに、ソースレコード S3バックアップセクション内で定義されるとおり、別のロケーションへ未定義のイベントを保存するためのオプションがあります。Data Firehoseは、プレフィックス内のデータパーティションを作成するため、続いて前にスラッシュ“ / “ を付け加えるのを忘れないでください。

次のページの、S3バッファ条件セクション内では、ラージバッファサイズの構成に関する記述があります。Parquetファイルフォーマットは、データを保存および圧縮する際に非常に効率的です。バッファサイズを増加させることで、各アウトプットガイルに多くの列をパックできます。これにより、Parquetからメリットを得ることができます。

Snappyを使用した圧縮が、ParquetおよびORC用に自動で有効になります。Kinesis Data Firehose APIを使用して圧縮アルゴリズムを変更し、OutputFormatConfigurationをアップデートできます。

発生するかもしれない問題をデバックするために、Amazon CloudWatch logsを有効にすることもできます。

最後に、Firehoseデリバリーストリームの作成を終了し、次のセクションへ進みます。

4.Amazon Connect連絡センターの設定

Kinesisパイプラインを設定した後、Amazon Connectで連絡センターを設定します。はじめにページでは、環境の設定、電話番号の取得、電話を受けるエージェントの作成などの方法の説明が掲載されています。

連絡センターを設定した後、Amazon Connectコンソールで、Instance Aliasを選択し、Data Streamingを選択します。Agent Event下から、ステップ2で作成したKinesisデータストリームを選択し、Saveを選択します。

この時点で、パイプラインは完了です。  Amazon Connectからのエージェントイベントは、エージェントがその日に実行している日で作成されています。Kinesisデータストリームを介してKinesis Data Firehoseにイベントが送信されます。JSONからParquetまでイベントデータが変換され、S3内に保存されます。Athena および Amazon Redshift Spectrumは、追加の作業なしに簡単にデータを参照できます。

データを作成してみましょう。Amazon Connect連絡センターの管理コンソールに戻り、受信コールに対応するエージェントを作成します。この例では、Agent Oneという名前を付けました。作成した後、Agent One は作業したり、コンソールにログインできます。利用可能に設定すると受信コールを受ける準備ができています。

データを興味深いものにするため、2番目のエージェントAgent Twoも作成しました。次に、受信および発信コールをすると、不具合が発生しました。そこで、分析のための利用可能なデータがあります。

5.Athenaでデータを分析

Athenaコンソールを開き、クエリを実行します。データセットのスキーマを作成した場合、ドキュメンテーション内で複雑な構造であってもフィールドをStringsに定義したことに気づくでしょう。  これは、JSONデータを解析できるようAthenaの柔軟性を示すためのものです。ただし、Kinesis Data FirehoseはParquetファイルへ適切なスキーマを適用するようテーブルスキーマ内でネスト構造を定義できます。

エージェントがシステムへログインしたか確認するため、最初のクエリを実行しましょう。

クエリは複雑ですが、簡単なものです。

WITH dataset AS (
  SELECT 
    from_iso8601_timestamp(eventtimestamp) AS event_ts,
    eventtype,
    -- CURRENT STATE
    json_extract_scalar(
      currentagentsnapshot,
      '$.agentstatus.name') AS current_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        currentagentsnapshot,
        '$.agentstatus.starttimestamp')) AS current_starttimestamp,
    json_extract_scalar(
      currentagentsnapshot,
      '$.configuration.firstname') AS current_firstname,
    json_extract_scalar(
      currentagentsnapshot,
      '$.configuration.lastname') AS current_lastname,
    json_extract_scalar(
      currentagentsnapshot,
      '$.configuration.username') AS current_username,
    json_extract_scalar(
      currentagentsnapshot,
      '$.configuration.routingprofile.defaultoutboundqueue.name') AS               current_outboundqueue,
    json_extract_scalar(
      currentagentsnapshot,
      '$.configuration.routingprofile.inboundqueues[0].name') as current_inboundqueue,
    -- PREVIOUS STATE
    json_extract_scalar(
      previousagentsnapshot,
      '$.agentstatus.name') as prev_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        previousagentsnapshot,
       '$.agentstatus.starttimestamp')) as prev_starttimestamp,
    json_extract_scalar(
      previousagentsnapshot,
      '$.configuration.firstname') as prev_firstname,
    json_extract_scalar(
      previousagentsnapshot,
      '$.configuration.lastname') as prev_lastname,
    json_extract_scalar(
      previousagentsnapshot,
      '$.configuration.username') as prev_username,
    json_extract_scalar(
      previousagentsnapshot,
      '$.configuration.routingprofile.defaultoutboundqueue.name') as current_outboundqueue,
    json_extract_scalar(
      previousagentsnapshot,
      '$.configuration.routingprofile.inboundqueues[0].name') as prev_inboundqueue
  from kfhconnectblog
  where eventtype <> 'HEART_BEAT'
)
SELECT
  current_status as status,
  current_username as username,
  event_ts
FROM データセット
WHERE eventtype = 'LOGIN' AND current_username <> ''
ORDER BY event_ts DESC

クエリアウトプットは次のようになります:

各エージェントが実施しているセッションを示す別のクエリがあります。受信また発信した場合、完了した場合、コールに失敗した場合などが示されています。

WITH src AS (
  SELECT
     eventid,
     json_extract_scalar(currentagentsnapshot, '$.configuration.username') as username,
     cast(json_extract(currentagentsnapshot, '$.contacts') AS ARRAY(JSON)) as c,
     cast(json_extract(previousagentsnapshot, '$.contacts') AS ARRAY(JSON)) as p
  from kfhconnectblog
),
src2 AS (
  SELECT *
  FROM src CROSS JOIN UNNEST (c, p) AS contacts(c_item, p_item)
),
dataset AS (
SELECT 
  eventid,
  username,
  json_extract_scalar(c_item, '$.contactid') as c_contactid,
  json_extract_scalar(c_item, '$.channel') as c_channel,
  json_extract_scalar(c_item, '$.initiationmethod') as c_direction,
  json_extract_scalar(c_item, '$.queue.name') as c_queue,
  json_extract_scalar(c_item, '$.state') as c_state,
  from_iso8601_timestamp(json_extract_scalar(c_item, '$.statestarttimestamp')) as c_ts,
  
  json_extract_scalar(p_item, '$.contactid') as p_contactid,
  json_extract_scalar(p_item, '$.channel') as p_channel,
  json_extract_scalar(p_item, '$.initiationmethod') as p_direction,
  json_extract_scalar(p_item, '$.queue.name') as p_queue,
  json_extract_scalar(p_item, '$.state') as p_state,
  from_iso8601_timestamp(json_extract_scalar(p_item, '$.statestarttimestamp')) as p_ts
FROM src2
)
SELECT 
  username,
  c_channel as channel,
  c_direction as direction,
  p_state as prev_state,
  c_state as current_state,
  c_ts as current_ts,
  c_contactid as id
FROM データセット
WHERE c_contactid = p_contactid
ORDER BY id DESC, current_ts ASC

クエリアウトプットは以下と同様になります:

6.Amazon Redshift Spectrumでデータを分析

Amazon Redshift Spectrumで、既存のAmazon Redshift データウェアハウスクラスターを使用したS3内に直接データを参照できます。データはParquetフォーマットであるため、Redshift Spectrumは、Athenaの素晴らしいメリットと同様のメリットを得られます。

Amazon Redshiftからの同様のデータを参照するための簡単なクエリがあります。これを実行するには、AWS Glue Data Catalogへポイントを示すAmazon Redshift内の外部スキーマを作成する必要があります。

SELECT 
  eventtype,
  json_extract_path_text(currentagentsnapshot,'agentstatus','name') AS current_status,
  json_extract_path_text(currentagentsnapshot, 'configuration','firstname') AS current_firstname,
  json_extract_path_text(currentagentsnapshot, 'configuration','lastname') AS current_lastname,
  json_extract_path_text(
    currentagentsnapshot,
    'configuration','routingprofile','defaultoutboundqueue','name') AS current_outboundqueue,
FROM default_schema.kfhconnectblog

以下はクエリアウトプットを示しています:

まとめ

この投稿では、Kinesis Data Firehoseを使用してカラムナファイルフォーマットへデータを取り込み、変換し、Athena および Amazon Redshiftを使用してリアルタイム分析を実行する方法を示しています。これは、膨大なデータを保存および分析する場合に必要となるコストやパフォーマンスのレベルを最適化するための機能です。これにより、AWSにおいてデータレイク構築に投資する場合に、同様に重要となります。

 


その他の参考資料

この投稿が有用であるとお考えの場合、Amazon Kinesis Firehose、Amazon AthenaおよびAmazon QuickSightによるVPC Flow Logs with の分析 AWS Glueのパーティション分割されたデータを使用した作業をご覧ください。


著者について

Roy Hasson は、AWS Analytics のグローバルビジネス開発マネージャーです。彼は世界中の顧客と協力して、データ処理、分析、ビジネスインテリジェンスのニーズを満たすソリューションを設計しています。Roy は、マンチェスターユナイテッドの大ファンであり、家族とともにチームを応援しています。