Amazon Web Services ブログ

Amazon S3 インベントリ、Amazon EMR、および Amazon Athena を使用して既存のオブジェクトのクロスリージョンレプリケーションをトリガーする

Amazon Simple Storage Service (Amazon S3) では、クロスリージョンレプリケーション (CRR) を使用して、異なる AWS リージョン内のバケット間でオブジェクトを自動的かつ非同期にコピーできます。CRR はバケットレベルの設定であり、データのコピーを異なるリージョンに保存することで、コンプライアンス要件を満たし、レイテンシーを最小限に抑えるのに役立ちます。CRR は、プレフィックスとタグによって制御されるソースバケット内のすべてのオブジェクト、またはオプションでサブセットを複製します。

CRR を有効にする前に存在していたオブジェクト (既存のオブジェクト) は複製されません。同様に、レプリケーションに使用される IAM ロールまたはバケットポリシー (バケットが異なる AWS アカウントにある場合) のいずれかで権限が設定されていないと、オブジェクトの複製に失敗することがあります (失敗したオブジェクト)。

お客様との作業では、前述の理由で多数のオブジェクトが複製されないという状況を経験しました。この記事では、既存のオブジェクトと失敗したオブジェクトに対してクロスリージョンレプリケーションをトリガーする方法を説明します。

方法論

高いレベルで、既存のオブジェクトと失敗したオブジェクトに対してコピーインプレース操作を実行する戦略です。この操作では、Amazon S3 API を使用して、タグ、アクセスコントロールリスト (ACL)、メタデータ、および暗号化キーを保持しながら、オブジェクトを自分自身の上にコピーします。操作は、オブジェクトの Replication_Status フラグもリセットします。これにより、クロスリージョンレプリケーションがトリガーされ、次にオブジェクトが宛先バケットにコピーされます。

これを達成するために、以下を使用します。

  • その場でコピーするオブジェクトを識別する Amazon S3 インベントリ。これらのオブジェクトはレプリケーションステータスがないか、または FAILED ステータスを持っています。
  • S3 インベントリファイルをテーブルとして公開するための Amazon AthenaAWS Glue
  • AWS Glue テーブルをクエリしてインプレースコピー操作を実行する Apache Spark ジョブを実行するための Amazon EMR

オブジェクトフィルタリング

問題のサイズを小さくし (数十億のオブジェクトを含むバケットを見たことがあります!)、S3 リスト操作を排除するために、Amazon S3 インベントリを使用します。S3 インベントリはバケットレベルで有効になり、S3 オブジェクトのレポートを提供します。インベントリファイルには、保留中 (PENDING)、完了 (COMPLETED)、失敗 (FAILED)、または複製 (REPLICA) といったオブジェクトのレプリケーションステータス が含まれています。既存のオブジェクトはインベントリにレプリケーションステータスがありません

インタラクティブ分析

S3 インベントリで作成したファイルを簡単に扱うために、AWS Glue Data Catalog にテーブルを作成します。Amazon Athena を使用してこのテーブルをクエリし、オブジェクトを分析できます。 Amazon EMR で実行されている Spark ジョブでこのテーブルを使用して、その場でコピーするオブジェクトを識別することもできます。

コピーインプレース実行

Amazon EMR で実行されている Spark ジョブを使用して、S3 オブジェクトのインプレースコピー操作を実行します。このステップにより、同時コピー操作の数を増やすことができます。これにより、シングルスレッドアプリケーションでコピー操作を連続して実行する場合と比較して、多数のオブジェクトに対するパフォーマンスが向上します。

アカウント設定

この例では、3 つの S3 バケットを作成しました。バケットはデモンストレーションに固有のものです。手順を実行する場合、自分自身のバケットを作成する必要があります (異なる名前で)。

crr-preexisting-demo-source という名前のソースバケットと crr-preexisting-demo-destination という名前の宛先バケットを使用しています。ソースバケットには、既存のオブジェクトとレプリケーションステータスが FAILED のオブジェクトが含まれています。S3 インベントリファイルを crr-preexisting-demo-inventory という 3 番目のバケットに格納します。

次の図は基本的な設定を示しています。

任意のバケットを使用してインベントリを保存できますが、バケットポリシーに次のステートメントを含める必要があります (自分のものに合わせて Resourceaws:SourceAccount を変更してください)。

{
    "Version": "2012-10-17",
    "Id": "S3InventoryPolicy",
    "Statement": [
        {
            "Sid": "S3InventoryStatement",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::crr-preexisting-demo-inventory/*",
            "Condition": {
                "StringEquals": {
                    "s3:x-amz-acl": "bucket-owner-full-control",
                    "aws:SourceAccount": "111111111111"
                }
            }
        }
    ]
}

この例では、6 つのオブジェクトを crr-preexisting-demo-source にアップロードしました。CRR が有効になる前に、3 つのオブジェクト (既存の *.txt) を追加しました。また、権限が CRR IAM ロールから削除された後に 3 つのオブジェクト (失敗した *.txt) を追加したため、CRR が失敗しました。

S3 インベントリを有効にする

ソースバケットで S3 インベントリを有効にする必要があります。次のように Amazon S3 コンソールでこれを行うことができます。

ソースバケットの [管理] タブで、[インベントリ] を選択します。

[新規追加] を選択し、表示されているとおりに設定を完了し、[CSV] 形式を選択して、[レプリケーションステータス] チェックボックスをオンにします。インベントリを作成するための詳細な手順については、Amazon S3 コンソールユーザーガイドの「Amazon S3 インベントリの設定方法」を参照してください。

S3 インベントリを有効にしたら、インベントリファイルが配信されるのを待つ必要があります。最初のレポートが配信されるまでに最大 48 時間かかることがあります。デモをフォローしている場合は、先に進む前にインベントリレポートが配信されていることを確認してください。

サンプルインベントリファイルは次のようになります。

オブジェクトの [概要] タブで S3 コンソールを見ることもできます。既存のオブジェクトにはレプリケーションステータスがありませんが、失敗したオブジェクトは以下のように表示されます。

Amazon Athena を使用して AWS Glue Data Catalog にテーブルを登録する

SQL を使用してインベントリファイルをクエリできるようにするには、まず AWS Glue Data Catalog に外部テーブルを作成する必要があります。https://console.aws.amazon.com/athena/home で Amazon Athena コンソールを開きます。

[Query Editor] タブで、以下の SQL ステートメントを実行します。このステートメントは、AWS Glue Data Catalog に外部テーブルを登録します。

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo (
    `bucket` string,
    key string,
    replication_status string
)
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    ESCAPED BY '\\'
    LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://crr-preexisting-demo-inventory/crr-preexisting-demo-source/crr-preexisting-demo/hive';

テーブルを作成したら、テーブルにパーティションメタデータを追加して、AWS Glue Data Catalog に既存のデータとパーティションを認識させる必要があります。これを行うには、Metastore Consistency Check ユーティリティを使用して、パーティションメタデータをスキャンして AWS Glue Data Catalog に追加します。

MSCK REPAIR TABLE crr_preexisting_demo;

これが必要な理由の詳細については、Amazon Athena ユーザーガイドの「MSCK REPAIR TABLE」と「データパーティショニング」のドキュメントを参照してください。

テーブルとパーティションがデータカタログに登録されたので、Amazon Athena を使用してインベントリファイルをクエリできます。

SELECT * FROM crr_preexisting_demo where dt='2019-02-24-04-00';

クエリの結果は以下のとおりです。

このクエリは、特定の配信日について S3 インベントリ内のすべての行を返します。EMR クラスタを起動して、既存のオブジェクトと失敗したオブジェクトをその場にコピーする準備が整いました。

: 失敗したオブジェクトを修正することが目的である場合は、次の手順に進む前に、失敗の原因 (IAM アクセス許可または S3 バケットポリシー) を必ず修正してください。

EMR クラスターを作成してオブジェクトをコピーする

コピーインプレース操作を並列化するには、Amazon EMR で Spark ジョブを実行します。EMR クラスターの作成と EMR ステップの実行依頼を容易にするために、bash スクリプトを作成しました (この GitHub リポジトリにあります)。

スクリプトを実行するには、GitHub リポジトリを複製します。その後、次のように EMR クラスターを起動します。

$ git clone https://github.com/aws-samples/amazon-s3-crr-preexisting-objects
$ ./launch emr.sh

: bash スクリプトを実行すると AWS の課金が発生します。デフォルトでは、m4.xlargem4.2xlarge の 2 つの Amazon EC2 インスタンスが作成されます。自動終了は有効になっているため、クラスタがインプレースコピーを完了したら終了します。

スクリプトは以下のタスクを実行します。

  1. デフォルトの EMR ロール (EMR_EC2_DefaultRole および EMR_DefaultRole) を作成します。
  2. ブートストラップアクションとステップに使用するファイルを Amazon S3 にアップロードします (これらのファイルを保存するには crr-preexisting-demo-inventory を使用します)。
  3. create-cluster を使用してインストールされた Apache Spark で EMR クラスターを作成します

クラスターがプロビジョニングされた後:

  1. ブートストラップアクションは boto3awscli をインストールします。
  2. Spark アプリケーションをマスターノードにコピーしてからアプリケーションを実行するという 2 つのステップが実行されます。

以下は Spark アプリケーションのハイライトです。この例の完全なコードは GitHub の amazon-s3-crr-preexisting-objects リポジトリにあります。

ここでは、AWS Glue Data Catalog に登録されているテーブルからレコードを選択し、replication_status が 「FAILED」または「“”」のオブジェクトをフィルタリングします。

query = """
        SELECT bucket, key
        FROM {}
        WHERE dt = '{}'
        AND (replication_status = '""'
        OR replication_status = '"FAILED"')
        """.format(inventory_table, inventory_date)

print('Query: {}'.format(query))

crr_failed = spark.sql(query)

前のクエリで返された各キーに対して copy_object 関数を呼び出します。

def copy_object(self, bucket, key, copy_acls):
        dest_bucket = self._s3.Bucket(bucket)
        dest_obj = dest_bucket.Object(key)

        src_bucket = self._s3.Bucket(bucket)
        src_obj = src_bucket.Object(key)

        # Get the S3 Object's Storage Class, Metadata,
        # and Server Side Encryption
        storage_class, metadata, sse_type, last_modified = \
            self._get_object_attributes(src_obj)

        # Update the Metadata so the copy will work
        metadata['forcedreplication'] = runtime

        # Get and copy the current ACL
        if copy_acls:
            src_acl = src_obj.Acl()
            src_acl.load()
            dest_acl = {
                'Grants': src_acl.grants,
                'Owner': src_acl.owner
            }

        params = {
            'CopySource': {
                'Bucket': bucket,
                'Key': key
            },
            'MetadataDirective': 'REPLACE',
            'TaggingDirective': 'COPY',
            'Metadata': metadata,
            'StorageClass': storage_class
        }

        # Set Server Side Encryption
        if sse_type == 'AES256':
            params['ServerSideEncryption'] = 'AES256'
        elif sse_type == 'aws:kms':
            kms_key = src_obj.ssekms_key_id
            params['ServerSideEncryption'] = 'aws:kms'
            params['SSEKMSKeyId'] = kms_key

        # Copy the S3 Object over the top of itself,
        # with the Storage Class, updated Metadata,
        # and Server Side Encryption
        result = dest_obj.copy_from(**params)

        # Put the ACL back on the Object
        if copy_acls:
            dest_obj.Acl().put(AccessControlPolicy=dest_acl)

        return {
            'CopyInPlace': 'TRUE',
            'LastModified': str(result['CopyObjectResult']['LastModified'])
        }

: Spark アプリケーションは、forcedreplication キーをオブジェクトのメタデータに追加します。これは、Amazon S3 では、オブジェクトまたはそのメタデータを変更せずにその場でコピーすることができないためです。

Amazon Athena でクエリを実行して、EMR ジョブの成功を確認します。

Spark アプリケーションはその結果を S3 に出力します。Amazon Athena を使用して別の外部テーブルを作成し、それを AWS Glue Data Catalog に登録することができます。その後、Athena を使用してテーブルをクエリし、コピーインプレース操作が成功したことを確認できます。

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo_results (
  `bucket` string,
  key string,
  replication_status string,
  last_modified string
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
  STORED AS TEXTFILE
LOCATION 's3://crr-preexisting-demo-inventory/results';

SELECT * FROM crr_preexisting_demo_results;

結果はコンソールに次のように表示されます。

これは、コピーインプレース操作が成功したことを示していますが、それでも CRR はオブジェクトをレプリケートする必要があります。後続のインベントリファイルは、オブジェクトのレプリケーションステータスを COMPLETED として表示します。またコンソールで、既存の *.txt失敗した *.txt が COMPLETED になっていることを確認することもできます。

CRR がバージョン管理されたバケットを必要とするので、コピーインプレース操作がオブジェクトの別のバージョンを生成することは注目に値します。S3 ライフサイクルポリシーを使用してバージョンを管理できます。

結論

この記事では、Amazon S3 インベントリ、Amazon Athena、AWS Glue Data Catalog、および Amazon EMR を使用して、既存のオブジェクトと失敗したオブジェクトに対してコピーインプレース操作を大規模に実行する方法について説明しました。

: Amazon S3 バッチ操作は、オブジェクトをコピーするための代替手段です。違いは、S3 バッチ操作では各オブジェクトの既存のプロパティをチェックせず、オブジェクトの ACL、ストレージクラス、および暗号化をオブジェクトごとに設定することです。詳細については、Amazon S3 コンソールユーザーガイドの「Amazon S3 バッチ操作の概要」を参照してください。

 


著者について

Michael Sambol は AWS のシニアコンサルタントです。彼はジョージア工科大学でコンピューターサイエンスの修士号を取得しています。Michael の趣味はエクササイズ、テニス、旅行、そして洋画鑑賞です。

 

 

 

 

Chauncy McCaughey は AWS のシニアデータアーキテクトです。 現在の副プロジェクトでは、運転習慣や交通パターンの統計分析を使用して、どのようにしていつも遅い車線にたどり着くのかを理解することに取り組んでいます。