Amazon Web Services ブログ

AWS Lambda を使用して Amazon S3 ログバケットから Splunk へログをフィルタリングしてストリーミングする

この記事は Ameya Paldhikar(パートナーソリューションアーキテクト)と Marc Luescher(シニアソリューションアーキテクト)によって執筆された内容を日本語化したものです。原文は「Filter and Stream Logs from Amazon S3 Logging Buckets into Splunk Using AWS Lambda」を参照してください。

Splunk-AWS-Partners-2022
Splunk
Connect with Splunk-1.1

AWS のお客様 (スタートアップ企業から大企業まで) は複数の AWS アカウントを管理しています。マルチアカウントの管理において、AWS が提供する AWS Prescriptive Guidance(AWS規範ガイダンス)に従って、一般にお客様は複数の AWS アカウントにまたがる AWS ログソース (AWS CloudTrail ログ、VPC フローログ、AWS Config ログ) を専用のログアーカイブアカウントの Amazon Simple Storage Service (Amazon S3) バケットに一元化することを一般的に選択しています。

これらの一元化された S3 バケットに保存されるログの量は、AWS アカウントの数やワークロードの規模によっては、非常に多くなる可能性があります (1 日あたり数 TB) 。S3 バケットから Splunk にログを取り込むには、通常、Splunk add-on for AWS を使用します。これは Splunk Heavy Forwarder 上にデプロイされ、データを S3 から取り込むために独立してポーリングを行います。

これらのサーバーは、ニアリアルタイムのログの取り込みをサポートするために、ログの取り込み量の増加に応じて水平方向にスケールアウトする機能も必要です。この方法は、デプロイメントを管理するための追加のオーバーヘッドがあり、専用のインフラストラクチャを実行するためコストの増加も発生します。

Splunk の取り込みデータ量ベースのライセンスコストを最適化するため、S3 バケットからログのサブセットのみをフィルタリングして Splunk に転送したい場合も考えられます。具体例としては、VPC フローログのうちフィールド “action” == “REJECT” に合致する、拒否されたトラフィックのみを取り込むことが挙げられます。プルベースのログ取り込みアプローチでは、現時点でそれを実現する方法は提供されていません。

この記事では、AWS Lambda を活用したプッシュメカニズムによって、一元化された Amazon S3 ログバケットから Splunk にログをフィルタリングしてストリーミングする方法を紹介します。プッシュメカニズムには、運用オーバーヘッドが低く、コストが安く、自動スケーリングが可能といった利点があります。Virtual Private Cloud (VPC) フローログで “action” フラグが “REJECT” に設定されているものをフィルタリングし、Splunk HTTP Event Collector (HEC) エンドポイント経由で Splunk に送信するための手順とサンプルの Lambda コードを提示します。

Splunk はクラウドオペレーション、データと分析、DevOps など多くの分野でコンピテンシーを有する AWS スペシャライゼーションパートナーであり、AWS Marketplace セラー です。大手組織では、デジタルシステムをセキュアで信頼性の高いものにするために、Splunk の統合セキュリティとオブザーバビリティプラットフォームを活用しています。

アーキテクチャ

図 1 のアーキテクチャ図は、AWS Lambda を使用して VPC フローログを Splunk に取り込むプロセスを示しています。

Architecture for Splunk Ingestion using AWS Lambda

図 1 – AWS Lambda を使用した Splunk へのログ取り込みのアーキテクチャ

  1. 1 つまたは複数の AWS アカウントの VPC フローログは、ログアーカイブ用 AWS アカウント内の S3 ログバケットに集約されます。
  2. S3 バケットは、バケットに保存された各オブジェクトに対して “object create” イベント通知を Amazon Simple Queue Service (SQS) キューに送信します。
  3. Lambda 関数が作成され、関数のイベントソースとして Amazon SQS が設定されます。この関数は SQS からのメッセージをバッチでポーリングし、各イベント通知の内容を読み取り、オブジェクトキーと対応する S3 バケット名を特定します。
  4. 次に、この関数は S3 バケットに “GetObject” 呼び出しを実行し、オブジェクトを取得します。Lambda 関数は、”action” フラグが “REJECT” ではないイベントをフィルタリングして除外します。
  5. Lambda 関数は、フィルタリングされた VPC フローログを Splunk HTTP Event Collector にストリーミングします。
  6. VPC フローログが Splunk に取り込まれ、Splunk を使用して検索可能になります。
  7. Splunk が利用できない場合や、ログ転送中にエラーが発生した場合、Lambda 関数はそれらのイベントをバックアップ用の S3 バケットに転送します。

前提条件

次の前提条件を満たす必要があります。

  • VPC フローログを Amazon S3 に出力する – AWS アカウント内の S3 バケットに VPC フローログを出力するように 設定します。
  • VPC フローログを取り込むための Splunk のインデックスを作成します。

Step 1: Splunk HTTP Event Collector (HEC) の設定

まずはじめに、AWS サービスに対して Splunk へデータを転送するための設定をする前に、Splunk HEC に対してデータを受信するための設定をする必要があります。

  • Splunk Web にアクセスし、Settings をクリックし、Data inputs を選択します。

図 2 – Splunk Data inputs

  • HTTP Event Collector を選択し、New Token を選択します。
  • 以下の図 3 に示す詳細に従って新しいトークンを構成し、Submit をクリックします。Source Typeaws:cloudwatchlogs:vpcflow に設定されていることを確認します。

図 3 – Splunk HEC トークンの設定

  • トークンが作成されたら、Global Settings を選択し、All Tokens が有効になっていることを確認し、Save をクリックしてください。

Step 2: Splunk の設定

次に、Splunk サーバーの props.conf 内に設定を追加して、改行、タイムスタンプ、およびフィールド抽出が正しく設定されていることを確認する必要があります。$SPLUNK_HOME/etc/system/local/ の props.conf に以下の内容をコピーしてください。これらの設定の詳細については、Splunk の props.conf のドキュメントを参照してください。

[aws:cloudwatchlogs:vpcflow]
BREAK_ONLY_BEFORE_DATE = false
CHARSET=UTF-8
disabled=false
pulldown_type = true
SHOULD_LINEMERGE=false
LINE_BREAKER=\'\,(\s+)
NO_BINARY_CHECK=true
KV_MODE = json
TIME_FORMAT = %s
TIME_PREFIX = ^(?>\S+\s){10}
MAX_TIMESTAMP_LOOKAHEAD = 10  # makes sure account_id is not used for timestamp
## Replace unrequired characters from the VPC Flow logs list with blank values
SEDCMD-A = s/\'//g 
SEDCMD-B = s/\"|\,//g

## Extraction of fields within VPC Flow log events ##
EXTRACT-vpcflowlog=^(?<version>2{1})\s+(?<account_id>[^\s]{7,12})\s+(?<interface_id>[^\s]+)\s+(?<src_ip>[^\s]+)\s+(?<dest_ip>[^\s]+)\s+(?<src_port>[^\s]+)\s+(?<dest_port>[^\s]+)\s+(?<protocol_code>[^\s]+)\s+(?P<packets>[^\s]+)\s+(?<bytes>[^\s]+)\s+(?<start_time>[^\s]+)\s+(?<end_time>[^\s]+)\s+(?<vpcflow_action>[^\s]+)\s+(?<log_status>[^\s]+)

Step 3: イベント通知のための SQS キューの作成

新しいオブジェクト (ログファイル) が Amazon S3 バケットに保存されると、イベント通知が SQS キューに転送されます。以下の手順に従って、SQS キューを作成し、一元化された S3 バケットのイベント通知設定を行います。

  • AWS アカウントで Amazon SQS コンソール にアクセスし、キューを作成を選択します。
  • 標準タイプを選択し、キューの名前を入力します。
  • 設定内で、可視性タイムアウト5 分に増やし、メッセージ保持期間を 14 日に増やします。以下のスクリーンショットを参照してください。

図 4 – Amazon SQS の設定

  • キューの保存時の暗号化を有効化するには、暗号化を有効に設定してください。
  • 以下のようにアクセスポリシーを設定して、S3 バケットがこの SQS キューにメッセージを送信できるようにします。<> 内のプレースホルダーを環境に合わせた値に置き換えてください。
    {
        "Version": "2012-10-17",
        "Id": "Queue1_Policy_UUID",
        "Statement": [
          {
            "Sid": "Queue1_Send",
            "Effect": "Allow",
            "Principal": {
              "Service": "s3.amazonaws.com"
            },
            "Action": "sqs:SendMessage",
            "Resource": "<arn_of_this_SQS>",
            "Condition": {
              "StringEquals": {
                "aws:SourceAccount": "<your_AWS_Account_ID>"
              },
              "ArnLike": {
                "aws:SourceArn": "<arn_of_the_log_centralization_S3_bucket>"
              }
            }
          }
        ]
      }
  • デッドレターキューを有効にすると、このキューから処理されなかったメッセージが、より詳しい検査のためにデッドレターキューに転送されます。

Step 4: Amazon S3 イベント通知を SQS に送信

SQS キューが作成されたので、次の手順に沿って VPC フローログ保存用の S3 バケットを構成し、すべてのオブジェクト作成イベントの通知をキューに送信します。

  • Amazon S3 コンソールから、VPC フローログが集約された S3 バケットにアクセスしてください。
  • プロパティタブを選択し、イベント通知までスクロールして、イベント通知を作成を選択してください。
  • 一般的な設定で適切なイベント名を入力してください。イベントタイプの下で、すべてのオブジェクト作成イベントを選択してください。送信先の下で SQS キューを選択し、前の手順で作成した SQS キューを選択してください。変更の保存をクリックすると、構成はこのようになります。

Figure 5 - S3 Event Notification

図 5 – Amazon S3 のイベント通知

Step 5: バックアップ用の Amazon S3 バケットの作成

ここでは、AWS Lambda 関数が Splunk にデータ配信できない場合でも、フィルタリングされたデータが失われないよう、バックアップ用の S3 バケットを作成します。Lambda 関数は、Splunk への配信に失敗した場合、フィルタリングされたログをこのバケットに送信します。このドキュメントの手順に従って S3 バケットを作成してください。

Step 6: Lambda 関数用の AWS IAM ロールの作成

  • AWS IAM コンソールから、ポリシーメニューにアクセスしてポリシーの作成を選択します。
  • ポリシーエディタオプションで JSON を選択し、以下のポリシーを貼り付けます。<> 内のプレースホルダーを環境に合わせた値に置き換えてください。完了したら次へをクリックします。
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "lambdaPutObject",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::<your_backsplash_s3_name>/*"
            },
            {
                "Sid": "lambdaGetObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::<your_log_centralization_s3_name>/*"
            },
            {
                "Sid": "lambdaSqsActions",
                "Effect": "Allow",
                "Action": [
                    "sqs:ReceiveMessage",
                    "sqs:DeleteMessage",
                    "sqs:GetQueueAttributes"
                ],
                "Resource": "<arn_of_the_SQS_Queue>" 
            }
        ]
    }
  • ポリシー名説明を入力し、ポリシーの作成を選択します。
  • IAM コンソールから、ロールにアクセスし、ロールを作成を選択します。
  • ユースケースの下で Lambda を選択し、次へをクリックします。
  • 許可の追加ページで、AWS 管理の AWSLambdaBasicExecutionRole ポリシーと、このロールの作成前に作成したカスタムポリシーを選択します。両方のポリシーを選択したら次へを選択します。
  • 適切なロール名を入力し、ロールを作成を選択します。

Step 7: ログをフィルタリングして Splunk に送信する Lambda 関数の作成

  • AWS Lambda コンソールにアクセスし、関数の作成を選択してください。
  • 基本的な情報の下で、適切な関数名を入力し、ランタイムでは Python でサポートされている最新のランタイムを選択してください。
  • デフォルトの実行ロールの変更オプションを展開し、既存のロールを使用するを選択して、前のセクションで作成したロールを選択します。
  • 他の設定はすべてデフォルトのままにして、関数の作成を選択してください。
  • 関数が作成されたら、関数内の設定タブを選択し、一般設定を編集します。タイムアウトの値を 5 分に変更し、保存をクリックしてください。
  • 環境変数を編集し、以下のキーと値のペアを追加してください。<> 内のプレースホルダーを、環境に基づいた適切な値に置き換えてください。環境変数を追加したら、保存をクリックしてください。
    • backup_s3 = <backsplash_Amazon_S3_bucket_name_created_in_the earlier_section>
    • splunk_hec_token = <your_splunk_hec_token>
    • splunk_hec_url = <your_splunk_url>:8088/services/collector/raw
  • 関数内のコードタブを選択し、lambda_function.py を以下の Python コードで更新してください。また、Python コードはこちらの GitHub リポジトリ内の lambda_splunk_function.py ファイルからアクセスすることもできます。
import os
import boto3
import json
import gzip
import urllib3
import logginglogger = logging.getLogger()
logger.setLevel(logging.INFO)

s3 = boto3.client('s3')
splunk_hec_url = os.environ['splunk_hec_url']
splunk_hec_token = os.environ['splunk_hec_token']
s3_bucket_name = os.environ['backup_s3']

def write_to_backup_s3(data, key):
data_bytes=bytes(json.dumps(data).encode())
compressed_data = gzip.compress(data_bytes)
try:
response = s3.put_object(
Bucket = s3_bucket_name,
Key = key,
Body = compressed_data
)
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
logger.info('Object written to S3 successfully')

except Exception as e:
logger.info(f"Error writing object to S3: {e}")
return

def send_to_splunk_hec(data_list):
data = str(data_list)[1:-1]

headers = {
"Authorization": "Splunk " + splunk_hec_token
}

http = urllib3.PoolManager(timeout=20)

try:
response = http.request(
"POST",
splunk_hec_url,
headers=headers,
body=json.dumps(data)
)
logger.info(f"Splunk HEC Response: {response.status} - {response.data}")
return response.status

except Exception as e:
logger.info(f"HTTP POST error: {e}")
return None

def filter_data(obj):
logs_to_send = []
content = gzip.decompress(obj['Body'].read()).decode('utf-8')
flow_log_records = content.strip().split('\n')
for record in flow_log_records:
fields = record.strip().split()
action = fields[12]
logger.info(f"Action: {action}")
if action == "REJECT":
logs_to_send.append(record)
logger.info(f"logs_to_send = {logs_to_send}")
return logs_to_send

def get_object(bucket, key):
try:
obj = s3.get_object(Bucket=bucket, Key=key)
return obj

except Exception as e:
logger.info(f"Error retrieving S3 object: {e}")
return None

def lambda_handler(event, context):
for record in event['Records']:
body = record['body']
logger.info(f"received sqs message: {body}")
#Parse the json message
message = json.loads(body)
try:
#extract s3 key and bucket name from the message
bucket = message['Records'][0]['s3']['bucket']['name']
s3_key = message['Records'][0]['s3']['object']['key']
#log the bucket and s3_key parameters
logger.info(f"bucket: {bucket}")
logger.info(f"s3 key: {s3_key}")

except Exception as e:
logger.info(f"Error retrieving S3 bucket name and/or object key from message: {e}")

#if bucket and s3_key are not null, invoke get_object function
if not (bucket or s3_key):
continue
obj = get_object(bucket, s3_key)
if not obj:
continue
filtered_data = filter_data(obj)
logger.info(f"filtered data = {filtered_data}")
if filtered_data:
status = send_to_splunk_hec(filtered_data)
logger.info(f"status: {status}")
if status != 200:
write_to_backup_s3(filtered_data, s3_key)

return
  • 関数内の設定タブを選択し、トリガーを選択します。
  • トリガーを追加をクリックし、SQS を選択します。
  • SQS queue のドロップダウンから、S3 オブジェクト作成のイベント通知用に構成した SQS を選択します。
  • Activate trigger を選択します。他の設定はすべてデフォルトのままにして、追加を選択します。

Step 8: Splunk での VPC フローログの検索

Lambda 関数が作成され、SQS トリガーがアクティブ化されると、関数はすぐに VPC フローログの Splunk への転送を開始します。

  • Splunk のコンソールを開き、Searching and Reporting app 内の Search tab に移動します。
  • 次の SPL クエリを実行して、取り込まれた VPC フローログレコードを表示します。<> 内のプレースホルダーを適切な Splunk インデックス名に置き換えてください( index = <insert_index_name> sourcetype = “aws:cloudwatchlogs:vpcflow” )。

Figure 6 - Searching VPC Flow Logs in Splunk

図 6 – Splunk での VPC フローログの検索

まとめ

本記事では、AWS Lambda 関数を利用して、VPC フローログをフィルタリングして Splunk に取り込む方法について詳しく説明しました。VPC フローログを例として使用しましたが、Amazon S3 バケットに保存されている複数のログタイプに対して同様のアーキテクチャパターンの適用を検討できます。

このコード例では、プッシュベースのメカニズムを使用して、Amazon S3 に一元化された AWS やそれ以外のログを Splunk に取り込むための拡張可能なフレームワークを提供します。Lambda 関数でフィルタリングを実装することで、関心のあるログのみを取り込むことができるため、Splunk ライセンスの使用を最適化してコストを削減できます。

Splunk の詳細については、AWS Marketplace で確認することもできます。

.
Splunk-APN-Blog-Connect-2023
.


Splunk – AWS Partner Spotlight

Splunk は AWS スペシャライゼーションパートナーであり、クラウドオペレーション、データと分析、DevOps など多くの分野でコンピテンシーを有しています。大手組織では、デジタルシステムをセキュアで信頼性の高いものにするために、Splunk の統合セキュリティとオブザーバビリティプラットフォームを活用しています。

Contact Splunk | Partner Overview | AWS Marketplace | Case Studies

記事を読んでいただきありがとうございました。

本記事はパートナーセールスソリューションアーキテクトの宮城康暢が翻訳しました。原文は「Filter and Stream Logs from Amazon S3 Logging Buckets into Splunk Using AWS Lambda」を参照してください。