任意の SaaS から AWS へのデータ連携を Amazon AppFlow カスタムコネクタで実現 !

2023-02-02
デベロッパーのためのクラウド活用方法

Author : 松岡 勝也

皆さん、こんにちは。ソリューションアーキテクトの松岡です。

皆さんが使われている SaaS のデータをうまく活用できていますでしょうか ?
SaaS のデータ活用のために AWS 上へデータ転送する際に役立つサービスとして Amazon AppFlow があります。AppFlow はサードパーティーの SaaS と AWS サービスとの間でデータを簡単かつ安全に転送できるマネージドサービスです。そして、AppFlow は多くのサービスをサポートしており、ネイティブにサポートしていないサービスについては カスタムコネクタ を活用することでデータフローを作成できます。

本記事ではこの AppFlow カスタムコネクタについて実装例を交えながらご紹介します。今回紹介する実装例は Prototyping Engineer 栗山と Solutions Architect 松岡で作成した Google Analytics 4 (GA4) 用のカスタムコネクタのサンプルをベースとしています。なお、本記事では可読性のためソースコードから一部省略や修正をしているため、GA4 カスタムコネクタのソースコードについては こちら を参照ください。また、AppFlow は GA4 をネイティブにサポートしており、その GA4 コネクタ と本カスタムコネクタの差分はスケジュールできるデータの取得間隔です。データの取得間隔が日次で良い場合はマネージドのコネクタを、ニアリアルタイムで取得したい場合は本カスタムコネクタを活用いただければと思います。


Amazon AppFlow とは

Amazon AppFlow は前述した通り、サードパーティーの SaaS と AWS サービスとの間でデータを簡単かつ安全に転送できるマネージドサービスです。AppFlow を使用すれば GUI ベースで数クリックで、スケジュール、イベントドリブンまたはオンデマンドのデータフローを作成できます。また、追加の手順なしでフローの一部としてフィルタリングや検証などのデータ変換機能を設定することができます。

現在は 50 以上のサービスのコネクタをサポート しています。そして、ネイティブにサポートしていないサービスについてはカスタムコネクタを構築することで AppFlow を利用することができます。


カスタムコネクタとは

ネイティブにサポートしていないサービスから AppFlow を用いてデータフローを作成するには、カスタムコネクタを活用します。カスタムコネクタを実装することで、ネイティブにサポートされたコネクタと同様に AppFlow コンソールから操作可能になります。カスタムコネクタは Custom Connector SDK (Python SDK または Java SDK) を用いてカスタムコネクタ用の AWS Lambda 関数を実装することで構築できます。そのため、クォータ などは Lambda に準拠します。

Lambda 関数を実装するのであれば、独自の Lambda 関数を実装してデータの入出力を行えば良いのではと疑問に思われる方もいるかもしれません。しかしカスタムコネクタを利用することで、その実装をパッケージとして取り扱いやすくなるという利点があります。例えば、作成したカスタムコネクタを AWS Marketplace で公開したり、SaaS 事業者が自社 SaaS のカスタムコネクタを提供し、AWS を活用しているエンドユーザー企業へ自社 SaaS の利用を訴求するといったケースが考えられます。

カスタムコネクタという定型化された様式で提供することで、エンドユーザーは AppFlow コンソールの画面上で、システム連携を設定できるようになります。また、カスタムコネクタで SaaS からデータ連携のフローを作成する前提条件として、SaaS 側にデータ取得の API が必要となります。


カスタムコネクタの利用の流れ

カスタムコネクタの利用の流れは以下の通りです。

  1. カスタムコネクタとして使用する Lambda 関数の実装・デプロイ
  2. デプロイした Lambda 関数を AppFlow コンソールから「カスタムコネクタ」として登録
  3. 登録したカスタムコネクタを用いて「接続」を作成
  4. 登録したカスタムコネクタと接続を用いて「フロー」を作成

2 以降はネイティブにサポートされているコネクタと同様に AppFlow のコンソールから GUI で操作するだけで利用できるため、本記事では 1 のカスタムコネクタ用 Lambda 関数の実装に焦点を当てて紹介します。2 以降の設定方法の詳細は こちら も参照ください。


カスタムコネクタ用 Lambda 関数の実装

カスタムコネクタを構築するには Custom Connector SDK (Python または Java) を用いてカスタムコネクタ用の Lambda 関数を実装する必要があります。ここでは Python SDK を用いた場合について、実装例を交えながら紹介します。

以下 3 つの Class と各 Class 内で決まった関数を実装する必要があります。そして、各関数のレスポンスは AppFlow の決まった形で返す必要があります。

validate_connector_runtime_settings はランタイムを、validate_credentials は認証情報を検証する関数で、接続の作成時に呼ばれますが、本実装例はサンプルのため省略しています (GA4 カスタムコネクタの実装例は こちら)。また、write_data は送信先のコネクタの場合には実装する必要がありますが、今回は送信元のコネクタのため不要です (GA4 カスタムコネクタの実装例は こちら)。このため、これら 3 つを除いた残り 4 つの関数について詳細を後述します。

また、Lambda 関数に以下のようなリソースベースポリシーを付与する必要があります (参考ドキュメント)。これは AppFlow から Lambda 関数を Invoke するために必要なポリシーです。

{
  "Version": "2012-10-17",
  "Id": "default",
  "Statement": [
    {
      "Sid": "<任意の名前>",
      "Effect": "Allow",
      "Principal": {
        "Service": "appflow.amazonaws.com"
      },
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:<リージョン名>:<アカウントID>:function:<Lambda関数名>",
      "Condition": {
        "ArnLike": {
          "AWS:SourceArn": "arn:aws:appflow:<リージョン名>:<アカウントID>:*"
        }
      }
    }
  ]
}

その他の注意事項は、カスタムコネクタとして Lambda 関数を登録した後、かつフロー作成の完了前に Lambda 関数を更新した場合はカスタムコネクタの登録からやり直す必要がある点です。一方で、フロー作成まで完了すると Lambda 関数の更新はフローに即時反映されるため再登録は不要です。これはドキュメントに明記されていないので、現時点での状況となります。

また、フィルター機能を使いたい場合は SDK の custom_connector_queryfilter を利用して追加実装する必要があります。

そして、カスタムコネクタ登録やフロー作成などフロー実行時以外でも Lambda 関数は呼び出されるため、コネクタ登録やフロー作成のデバッグにも Amazon CloudWatch Logs が役立ちます。

describe_connector_configuration

describe_connector_configuration はカスタムコネクタ登録時に呼び出され、連携元システムへの認証方法の定義を返します。

認証方式については authentication_config で指定します。これは、 is_basic_auth_supported, is_api_key_auth_supported, is_oauth_2_supportedis_custom_auth_supported のいずれかを True にすることで Basic 認証、API キー、OAuth2、カスタム認証の 4 つから指定します。ここではカスタム認証を指定しているため、追加で custom_auth_config でカスタム認証で使用するパラメータのリストを設定します。

authentication_config = auth.AuthenticationConfig(
    is_custom_auth_supported=True,
    custom_auth_config=[
        auth.CustomAuthConfig(
            authentication_type="CUSTOM",
            auth_parameters=[
                auth.AuthParameter(
                    key="Key名",
                    required=True,
                    label="ラベル",
                    description="説明",
                    sensitive_field=True,
                    connector_supplied_values=None,
                )
            ],
        )
    ],
)

また、runtime_setting ではコネクタが実行時に必要とする値を設定します。カスタム認証を使用する場合はパラメータを自身で定義できるため、実行時に必要な値を authentication_config で定義することも可能ですが、認証情報は authentication_config で、それ以外の必要な値は runtime_setting で指定するといった使い分けをします。これは authentication_config で指定したパラメータの値は AWS Secrets Manager で保管されるためです。

runtime_setting = settings.ConnectorRuntimeSetting(
    key="Key名",
    data_type=settings.ConnectorRuntimeSettingDataType.String,
    required=True,
    label="ラベル",
    description="説明",
    scope=settings.ConnectorRuntimeSettingScope.CONNECTOR_PROFILE,
)

返り値の connector_modes ではカスタムコネクタが送信元と送信先に対応しているかを指定します。今回は SaaS から AWS へデータ連携するカスタムコネクタであり、送信元にのみ指定できるため ConnectorModes.SOURCE のみを設定します。関数全体のサンプルコードは以下の通りです。

import custom_connector_sdk.connector.settings as settings
import custom_connector_sdk.lambda_handler.requests as requests
import custom_connector_sdk.lambda_handler.responses as responses
import custom_connector_sdk.connector.auth as auth
import custom_connector_sdk.connector.configuration as config

def describe_connector_configuration(
        self, request: requests.DescribeConnectorConfigurationRequest
    ) -> responses.DescribeConnectorConfigurationResponse:
    
    # 認証方法 (OAuth2, Basic認証, APIキー, カスタム認証) の設定
    authentication_config = auth.AuthenticationConfig(
        is_custom_auth_supported=True,
        custom_auth_config=[
            auth.CustomAuthConfig(
                authentication_type="CUSTOM",
                auth_parameters=[
                    auth.AuthParameter(
                        key="Key名",
                        required=True,
                        label="ラベル",
                        description="説明",
                        sensitive_field=True,
                        connector_supplied_values=None,
                    )
                ],
            )
        ],
    )
   
    # ランタイムの設定 
    runtime_setting = settings.ConnectorRuntimeSetting(
        key="Key名",
        data_type=settings.ConnectorRuntimeSettingDataType.String,
        required=True,
        label="ラベル",
        description="説明",
        scope=settings.ConnectorRuntimeSettingScope.CONNECTOR_PROFILE,
    )
    
    return responses.DescribeConnectorConfigurationResponse(
        is_success=True,
        connector_owner="本カスタムコネクタの所有者",
        connector_name="本カスタムコネクタ名",
        authentication_config=authentication_config, # 認証方法の設定
        connector_version="本カスタムコネクタのバージョン", 
        supported_api_versions=["サポートしている連携元のデータ API のバージョン"],
        connector_modes=[config.ConnectorModes.SOURCE], # モードの設定 (SOURCE, DESTINATION)
        connector_runtime_setting=[runtime_setting], # ランタイムの設定
    )

GA4 カスタムコネクタの実装例は こちら です。

list_entities

list_entities はフロー作成時に呼び出され、スキーマ定義 (Entity) のリストを返します。この Entity はサポートする API 毎に定義するものです。

返り値の entities に Entity のリストを渡します。ここでは、1 つの API のみをサポートしているため、1つの Entity のみを指定します。

import custom_connector_sdk.lambda_handler.requests as requests
import custom_connector_sdk.lambda_handler.responses as responses

# スキーマ定義
sample_entity = context.Entity(
    entity_identifier="ID",
    label="ラベル",
    has_nested_entities=False, # サブオブジェクトの有無
    description="説明",
)

def list_entities(
    self, request: requests.ListEntitiesRequest
) -> responses.ListEntitiesResponse:

    return responses.ListEntitiesResponse(is_success=True, entities=[sample_entity])

GA4 カスタムコネクタの実装例は こちら です。

describe_entity

describe_entity はフロー作成時のデータフィールドマッピングで呼び出され、フィールドの定義を返します。

返り値の entity_definitionentity は、list_entitiesentities の当該 Entity と同値となるようにします。ここでは 1 つの API のみをサポートしているため、Entity は 1 つのみとなっています。また、fields は当該 Entity のフィールド定義のリストを設定します。これはデータ送信元から API を用いて取得するデータを表します。

この例では 1 つのフィールドのみを設定していますが、複数設定することも可能です。また、本カスタムコネクタは送信元として設定しているため、write_properties の定義は不要です。

import custom_connector_sdk.lambda_handler.requests as requests
import custom_connector_sdk.lambda_handler.responses as responses
import custom_connector_sdk.connector.context as context
import custom_connector_sdk.connector.fields as fields

# スキーマ定義
sample_entity = context.Entity(
    entity_identifier="ID",
    label="ラベル",
    has_nested_entities=False, # サブオブジェクトの有無
    description="説明",
)

# フロー作成のデータフィールドマッピングで呼び出され、フィールドの定義を返す
def describe_entity(
    self, request: requests.DescribeEntityRequest
) -> responses.DescribeEntityResponse:

    sample_field = context.FieldDefinition(
        field_name="フィールド名",
        data_type=fields.FieldDataType.String,
        data_type_label="string",
        label="ラベル",
        description="説明",
        default_value="デフォルト値",
        is_primary_key=True,
        # データの送信元に指定する場合に設定するフィールドの型定義
        read_properties=fields.ReadOperationProperty(
            is_queryable=True, # データ取得時にフィルタとして指定できる項目か
            is_retrievable=True, # データ取得時に表示されるか
            is_nullable=False, # null 値を許容するか
            is_timestamp_field_for_incremental_queries=False, # Date か DateTime 型か
        ),
        write_properties=None, # データの送信先に指定する場合に設定するフィールドの型定義
    )

    entity_definition = context.EntityDefinition(
        entity=sample_entity,
        fields=[sample_field],
    )

    return responses.DescribeEntityResponse(
        is_success=True, entity_definition=entity_definition
    )

GA4 カスタムコネクタの実装例は こちら です。

query_data

query_data はフロー実行時に呼び出され、フローで定義されたフィールドを取得します。

返り値の records に渡す配列は、下記 record_list のように JSON オブジェクトの文字列の配列です。そして、JSON オブジェクトのキーは各フィールドの describe_entityfield_name と一致させる必要があります。

import custom_connector_sdk.lambda_handler.requests as requests
import custom_connector_sdk.lambda_handler.responses as responses

# スキーマ定義
sample_entity = context.Entity(
    entity_identifier="ID",
    label="ラベル",
    has_nested_entities=False, # サブオブジェクトの有無
    description="説明",
)

def query_data(
    self, request: requests.QueryDataRequest
) -> responses.QueryDataResponse:

    # JSON オブジェクトの文字列の配列
    # フロー作成時にマッピングしたフィールドのみ送信される
    record_list = [
        '{"DateTime": "2022-01-01 00:00:00", "ScreenPageViews": 1}',
        '{"DateTime": "2022-01-01 00:01:00", "ScreenPageViews": 2}',
    ]

    return responses.QueryDataResponse(is_success=True, records=record_list)

GA4 カスタムコネクタの実装例は こちら です。


まとめ

本記事では AppFlow カスタムコネクタについて、Amazon AppFlow Custom Connector Python SDK を用いた実装例を交えながら紹介しました。

本記事で紹介した各関数を参考に Lambda を実装することでカスタムコネクタを構築でき、任意の SaaS からのデータ連携を AppFlow を用いて容易に実現することができます。また、AWS Samples として公開している GA4 カスタムコネクタの実装例 も本記事と併せてご参照ください!


builders.flash メールメンバーへ登録することで
AWS のベストプラクティスを毎月無料でお試しいただけます


筆者プロフィール

松岡 勝也 (@ktsmats)
アマゾン ウェブ サービス ジャパン合同会社
ソリューションアーキテクト

ソリューションアーキテクトとして主に SaaS 事業者に技術的な支援をしています。最近はお客様にサンプル実装を提供するプロトタイピングによる支援も行っています。

AWS を無料でお試しいただけます

AWS 無料利用枠の詳細はこちら ≫
5 ステップでアカウント作成できます
無料サインアップ ≫
ご不明な点がおありですか?
日本担当チームへ相談する