Amazon Web Services ブログ

AWS Cloudtrail Logs を AWS Glue と Amazon Quicksight 使って可視化する

AWS CloudTrail ログを簡単に視覚化できることは、AWS インフラストラクチャがどのように使用されているかについてより良い理解を提供してくれます。また、AWS API コールの監査とレビューを行って、AWS アカウント内のセキュリティ異常を検知するためにも役立ちます。これを行うには、CloudTrail ログに基づいた分析を実行できる必要があります。

この記事では、Amazon S3 内の AWS CloudTrail ログを JSON 形式からクエリ用に最適化された形式のデータセットに変換するための AWS GlueAWS Lambda の使用について詳しく説明します。その後、Amazon Athena と Amazon QuickSight を使用してデータをクエリし、視覚化します。

ソリューションの概要

CloudTrail ログを処理するには、以下のアーキテクチャを実装する必要があります。

CloudTrail は Amazon S3 バケットフォルダにログファイルを配信します。これらのログを正しくクロールするには、S3 バケットの単一フォルダ内に変換済みファイルを格納する Amazon S3 によってトリガーされる Lambda 関数を使ってファイルコンテンツとフォルダ構造を変更します。ファイルが単一のフォルダ内にある場合、AWS Glue はデータをスキャンし、それを Apache Parquet フォーマットに変換して、Amazon Athena と Amazon QuickSight を使用したクエリと視覚化を可能にするためにカタログ登録します。

 

チュートリアル

ソリューションを構築するために必要なステップを見て行きましょう。

CloudTrail ログのセットアップ

最初に、S3 バケットにログファイルを配信する証跡をセットアップする必要があります。CloudTrail で証跡を作成するには、証跡の作成にある手順に従ってください。

作成が終わったら、証跡設定ページが以下のスクリーンショットのようになります。

この例では、 cloudtraillfcaro バケットに配信されるようにログファイルをセットアップします。

Lambda を使用した CloudTrail レポートの単一フォルダへの統合

AWS CloudTrail は、以下のフォルダ構造を使って設定済みの Amazon S3 バケット内にログファイルを配信します。

AWSLogs/ACCOUNTID/CloudTrail/REGION/YEAR/MONTH/HOUR/filename.json.gz

さらに、ログファイルには以下の構造があります。

{
    "Records": [{
        "eventVersion": "1.01",
        "userIdentity": {
            "type": "IAMUser",
            "principalId": "AIDAJDPLRKLG7UEXAMPLE",
            "arn": "arn:aws:iam::123456789012:user/Alice",
            "accountId": "123456789012",
            "accessKeyId": "AKIAIOSFODNN7EXAMPLE",
            "userName": "Alice",
            "sessionContext": {
                "attributes": {
                    "mfaAuthenticated": "false",
                    "creationDate": "2014-03-18T14:29:23Z"
                }
            }
        },
        "eventTime": "2014-03-18T14:30:07Z",
        "eventSource": "cloudtrail.amazonaws.com",
        "eventName": "StartLogging",
        "awsRegion": "us-west-2",
        "sourceIPAddress": "72.21.198.64",
        "userAgent": "signin.amazonaws.com",
        "requestParameters": {
            "name": "Default"
        },
        "responseElements": null,
        "requestID": "cdc73f9d-aea9-11e3-9d5a-835b769c0d9c",
        "eventID": "3074414d-c626-42aa-984b-68ff152d6ab7"
    },
    ... additional entries ...
    ]

ファイルが書き込まれる時に AWS Glue クローラーを使用してこれらをカタログ登録すると、以下の障害が生じます。

  1. フォルダが従来のパーティション形式に従っていないため、AWS Glue が異なるフォルダごとに異なるテーブルを認識する。
  2. ファイルコンテンツの構造に基づいて、AWS Glue がテーブルを array 型の単一カラムを持つものとして識別する。
  3. CloudTrail ログに大文字を使う JSON 属性がある。Best Practices When Using Athena with AWS Glue によると、これらを小文字に変換することが推奨されます。

AWS Glue が、すべてのカラムに各イベントの説明がある単一のテーブル内にすべてのログファイルをカタログ登録するようにするには、以下の Lambda 関数を実装します。

from __future__ import print_function
import json
import urllib
import boto3
import gzip

s3 = boto3.resource('s3')
client = boto3.client('s3')

def convertColumntoLowwerCaps(obj):
    for key in obj.keys():
        new_key = key.lower()
        if new_key != key:
            obj[new_key] = obj[key]
            del obj[key]
    return obj


def lambda_handler(event, context):

    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    print(bucket)
    print(key)
    try:
        newKey = 'flatfiles/' + key.replace("/", "")
        client.download_file(bucket, key, '/tmp/file.json.gz')
        with gzip.open('/tmp/out.json.gz', 'w') as output, gzip.open('/tmp/file.json.gz', 'rb') as file:
            i = 0
            for line in file: 
                for record in json.loads(line,object_hook=convertColumntoLowwerCaps)['records']:
            		if i != 0:
            		    output.write("\n")
            		output.write(json.dumps(record))
            		i += 1
        client.upload_file('/tmp/out.json.gz', bucket,newKey)
        return "success"
    except Exception as e:
        print(e)
        print('Error processing object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

関数はレコード配列の各要素をチェックし、カラム名で大文字を小文字に変換して、配列の各要素を新しいファイルの単一行として挿入します。新しいファイルは S3 バケットに、サブフォルダなしで、関数によって作成された flatfiles フォルダ内に保存されます。

関数には、少なくとも以下のパーミッションを持つポリシーが含まれたロールがあるようにしてください。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "arn:aws:s3:::cloudtraillfcaro/*",
                "arn:aws:s3:::cloudtraillfcaro"
            ],
            "Effect": "Allow"
        }
    ]
}

この例では、CloudTrail がログを cloudtraillfcaro バケットに配信します。この名前は、ポリシー内のバケット名に置き替えるようにしてください。インラインポリシーの操作方法の詳細については、IAM ポリシーを管理するを参照してください。

Lambda 関数を作成したら、AWS Lambda コンソールの [トリガー] タブを使って以下のトリガーをセットアップできます。

[トリガーを追加] を選択し、トリガーのソースとして [S3] を選択します。

ソースを選択したら、以下の設定を行います。

このトリガーでは、ログファイルのパス (この場合は AWSLogs/119582755581/CloudTrail/—is ) に書き込まれたすべてのファイルが処理されます。[トリガーの有効化] チェックボックスが選択されており、バケットとプレフィックスパラメータがユースケースに一致することを確認してください。

関数をセットアップしてログファイルを受け取ったら、バケット (この場合は cloudtraillfcaro) には flatfiles フォルダ内に処理済みのファイルが含まれているはずです。

ソースデータのカタログ登録

Lambda 関数でファイルが処理されたら、それらをカタログ登録するために cloudtrail という名前のクローラーをセットアップします。

クローラーは flatfiles フォルダをポイントする必要があります。

このソリューションのために作成されたすべてのクローラーと AWS Glue ジョブには、 AWSGlueServiceRole マネージドポリシーと、Lambda 関数で使用された S3 バケットを変更するパーミッションを持つインラインポリシーがある必要があります。詳細については、IAM ポリシーを管理するを参照してください。

ロールは以下のようになります。

この例では、 s3perms という名前のインラインポリシーに S3 バケットを変更するためのパーミッションが含まれています。

ロールを選択した後、オンデマンドで実行されるようにクローラーをスケジュールできます。

新しいデータベースが作成され、それを使うようにクローラが設定されます。この場合、 cloudtrail データベースがすべてのテーブルに使用されます。

クローラーが実行された後、以下の構造でカタログに単一のテーブルが作成されているはずです。

テーブルには以下のカラムが含まれています。

AWS Glue ジョブの作成と実行

すべての CloudTrail ログを Parquet の列指向ストアに変換するには、以下のステップに従って AWS Glue ジョブをセットアップしてください。

以下のスクリプトを Amazon S3 のバケットにアップロードします。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import time

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "cloudtrail", table_name = "flatfiles", transformation_ctx = "datasource0")
resolvechoice1 = ResolveChoice.apply(frame = datasource0, choice = "make_struct", transformation_ctx = "resolvechoice1")
relationalized1 = resolvechoice1.relationalize("trail", args["TempDir"]).select("trail")
datasink = glueContext.write_dynamic_frame.from_options(frame = relationalized1, connection_type = "s3", connection_options = {"path": "s3://cloudtraillfcaro/parquettrails"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

この例では、スクリプトを cloudtrailtoparquet.py という名前のファイルとしてロードします。スクリプトを変更して、結果を保存したい宛先に 「{"path": "s3://cloudtraillfcaro/parquettrails"}」 をアップデートするようにしてください。

スクリプトをロードしたら、新しい AWS Glue ジョブを追加します。ジョブの名前とロールを選択して、[An existing script that you provide] からジョブを実行するオプションを選択します。

同じデータを 2 度処理しないようにするため、Job Properties の [Advanced properties] セクションで [Job bookmark] 設定を有効化してください。

[Next] を 2 度選択してから [Finish] を選択します。

ログがすでに flatfiles フォルダ内にある場合、オンデマンドでジョブを実行して結果の最初のセットを生成できます。

ジョブが実行され始めたら、完了するまで待ちます。

ジョブが完了したら、ジョブの [Run status] が [Succeeded] になります。その後、Parquet ファイルが Amazon S3 ロケーションに書き込まれていることを検証できます。

結果のカタログ登録

Athena から結果を処理できるようにするには、AWS Glue クローラーを使用して AWS Glue ジョブの結果をカタログ登録することができます。

この例では、cloudtrail という名前のソースと同じデータベースを使用するようにクローラーが設定されています。

クローラーはコンソールを使用して実行できます。クローラーが動作を完了し、Parquet 結果を処理したら、AWS Glue データカタログに新しいテーブルが作成されているはずです。この例では、 parquettrails と命名されています。

テーブルでは [Classification] が parquet に設定されているようにします。

これには flatfiles テーブルと同じカラムがありますが、 struct 型カラムは例外で、複数のカラムにリレーショナル化されているはずです。

この例では、 requestparameters カラム、つまり元のテーブル (flatfiles) の構造体であったカラムが、複数のカラム (その中のキー値ごとにひとつ) に変換されていることに注意してください。これは、relationalize と呼ばれる AWS Glue にネイティブの変換を使って行われます。

Athena でのクエリ結果

結果をクロールした後は、Athena を使ってそれらをクエリできます。例えば、2017-10-23t12:00:00 から 2017-10-23t13:00 の時間枠でどのようなイベントが発生したかをクエリするには、次の select ステートメントを使用します。

select *
from cloudtrail.parquettrails
where eventtime > '2017-10-23T12:00:00Z' AND eventtime < '2017-10-23T13:00:00Z'
order by eventtime asc;

cloudtrail.parquettrails は、お使いのデータベースと、Parquet 結果を参照するテーブルの名前に置き替えるようにしてください。 datetimes を、アカウントにアクティビティが発生し、AWS Glue ジョブによって処理された時間に置き替えてください。

Amazon QuickSight を使用した結果の視覚化

Athena を使ってデータのクエリができたら、Amazon QuickSight を使ってそれを視覚化することができます。Amazon QuickSight を Athena に接続する前に、Athena とアカウント内の関連する S3 バケットに QuickSight アクセス権を付与するようにしてください。詳細については、Amazon QuickSight の AWS リソースへのアクセス権限の管理を参照してください。次に、作成した Athena テーブルに基づいて Amazon QuickSight に新しいデータセットを作成できます。

パーミッションのセットアップ後は、[New analysis] を選択して Amazon QuickSight で新規の分析を作成できます。

次に、新しいデータセットを追加します。

ソースとして Athena を選択します。

データソースに名前を付けます (この場合は cloudtrailと命名しました)。

データベースの名前と Parquet 結果を参照しているテーブルを選択します。

その後、[Visualize] を選択します。

そうすると、以下の画面が表示されます。

これで視覚化を実行できるようになりました。まず sourceipaddress カラムを検索し、それを [AutoGraph] セクションにドラッグします。

AWS とのやり取りに使用した IP アドレスのリストが表示されます。これらの IP アドレス が IAM ユーザー、内部 AWS サービス、またはロールから使用されたかどうかを確認するには、元のログファイルの useridentity フィールド内にあるタイプ値を使用します。 Relationalize 変換のおかげで、この値は useridentity.type カラムとして使用できるようになっています。このカラムが Group/Color] ボックスに追加されたら、視覚化は以下のようになります。

これで、最もよく使用されている IP と、それらがロール、AWS サービス、または IAM ユーザーのどちらから使用されているかを確認し、識別することができるようになりました。

これらすべてのステップを行った後、Amazon QuickSight を使用して CloudTrail から異なるカラムを追加し、異なるタイプの視覚化を実行することができます。AWS インフラストラクチャの使用状況とアクセスを継続的にモニタリングするオペレーショナルダッシュボードを構築でき、これらのダッシュボードは、このデータを確認する必要がある組織内の人と共有することができます。

まとめ

このポストでは、Athena のクエリパフォーマンスとデータ圧縮を改善するために、シンプルな Lambda 関数と AWS Glue スクリプトを使ってテキストファイルを Parquet に変換する方法を見てきました。また、AWS Lambda を使って Amazon S3 内のファイルを事前処理し、AWS Glue クローラーが認識できる形式に変換する方法も説明しました。

この例では AWS CloudTrail ログを使用しましたが、提案したソリューションは、事前処理後に AWS Glue でカタログ登録できるどのファイルセットにも適用できます。


その他の参考資料

Harmonize, Query, and Visualize Data from Various Providers using AWS Glue, Amazon Athena, and Amazon QuickSight での手法を学んでください。


About the Author

Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improving the value of their solutions when using AWS.

 

{{{items.0.additionalFields.title}}}