Amazon Web Services ブログ

Amazon Aurora Serverless MySQL データベースを操作する Data API の使用

Amazon Aurora Serverlessは、自動スケーリングされる Amazon Aurora (MySQL 互換エディション) のオンデマンド型実装です。このデータベースは、スタートアップとシャットダウンや、アプリケーションからの要求に応じた容量のスケーリングなどを自動で行います。これにより、データベースインスタンスの管理を一切行うことなく、クラウドでデータベースの実行が可能になります。頻度と継続性が低く予想しにくいワークロードのためのシンプルでコスト効果が高い、1 つの選択肢だと言えます。

AWS は最近、Data API の一般的な可用性を発表しました。この機能により、MySQL 互換バージョンの Amazon Aurora Serverless データベースを、シンプルな API エンドポイントを使ってアクセスできるようになりました。アプリケーションが常にデータベースに接続するため生じる、管理上の労力も必要なくなります。AWS Lambda を使っていれば、VPC 内で起動する Lambda 関数による追加的なオーバーヘッドもなく、Data API により安全にデータベースアクセスが行えます。また、Data API では、AWS Secrets Manager に保存されたデータベース認証情報が使えるので、API 呼び出しの際の認証情報の受け渡しも必要ありません。

このブログ記事では、コード的なインフラストラクチャー (AWS CloudFormation) を使って、Aurora Serverless MySQL クラスターをプロビジョニングする方法を解説します。Data API から Aurora Serverless データベースに SQL コマンドを発行する方法を、複数のユースケースについてコード例を示し説明します。コード例について深く知りたい場合は、GitHub にある「full end-to-end sample Serverless application」をご参照ください。

次の図に示すとおり、Data API は AWS クラウドネイティブもしくはオンプレミスの、多様なアプリケーションのために利用できます。それらには、EC2 インスタンス (VM) や、コンテナー化されサーバーレスのアプリケーションが含まれます。

Data API エンドポイントでは、SQL コマンドをプログラム的に Amazon Aurora Serverless データベースに発行するため、AWS CLI や各種の AWS SDK などが利用可能です。プログラム的に発行するコマンドには、データベースに継続的に接続することから生じる管理の必要がない、寿命が短くイベントドリブンなアプリケーション (例えば AWS Lambda) に対しての利点があります。また、DB 接続の中心にエンドポイントを置くことで、個別の Lambda 関数を多数配置し独自に接続の管理をさせる場合に比べ、データベースの負荷を低減することもできます。

Amazon Aurora Serverless クラスターのプロビジョニング

まず、Amazon Aurora Serverless データベースのプロビジョニングから開始します。データベースクラスターのプロビジョニングには、AWS CloudFormation を使用します。定義済みテンプレート (次のコード例を参照) には、Aurora Serverless クラスター、データベース認証情報を発行および保存する Secrets Manager、リソースとしてのサブネットグループが含まれています。テンプレートの完全なリファレンスについては、サンプルコードをご参照ください。

このテンプレートでは、テンプレートリソースから参照される、次のような入力パラメータを受け取ります。

  • 環境タイプ (envType)
  • データベースクラスター名 (DBClusterName)
  • データベース名 (DatabaseName)
  • データベースのマスターユーザー名 (DBMasterUserName)
  • サブネット一覧 (DBSubnetList)

ここでは、Amazon Aurora がサーバーレスモードで実行 (EngineMode: serverless) するよう設定し、データベースの自動的な停止 (AutoPause: true) が、15 分間 (SecondsUntilAutoPause: 900) に動作がなかった場合に発生するようにします。同時に、1 つの ACU (Aurora Capacity Unit) に最小容量 (MinCapacity) と、4 つの ACU に最大容量 (MaxCapacity) の指定も行います。

各 ACU は、処理とメモリの容量による組み合わせでできています。これらの設定に基づいて、Aurora Serverless はスケーリングルール、CPU 使用率の設定しきい値、接続、使用可能なメモリ量などを自動的に生成します。データベースのストレージは、標準 Aurora DB クラスターのストレージと同様に、10 GiB から 64 TiB の間で自動的にスケーリングします。詳細については、「Aurora Serverless の仕組み」をご参照ください。

 Aurora Serverless クラスターテンプレートのコード例 #1

Resources:
  RDSCluster:
    Type: AWS::RDS::DBCluster
    Properties:
      DBClusterIdentifier: !Ref DBClusterName
      MasterUsername: !Join ['', ['{{resolve:secretsmanager:', !Ref DBSecret, ':SecretString:username}}' ]]
      MasterUserPassword: !Join ['', ['{{resolve:secretsmanager:', !Ref DBSecret, ':SecretString:password}}' ]]
      DatabaseName: !Ref DatabaseName
      Engine: aurora
      EngineMode: serverless
      EngineVersion: 5.6.10a
      ScalingConfiguration:
        AutoPause: true
        MaxCapacity: 4
        MinCapacity: 1
        SecondsUntilAutoPause: 900 # 15 min
      DBSubnetGroupName:
        Ref: DBSubnetGroup

Aurora クラスターのプロパティ―への自動入力を使うには、AWS CloudFormation の動的な参照 が使えます。例えば、Secrets Manager からのユーザー名とパスワードをMasterUsernameMasterUserPassword に自動入力することができます。詳細については、上記および下記のコード例をご参照ください。

ユーザー名には、DBMasterUserName と書かれたテンプレート上の入力パラメータが参照されます。しかし、パスワードは動的に生成されるので誰に目にも触れることはありません。結果的に、アプリケーション内でも設定コード上においても、データベースのパスワードを人目に晒すことはあり得ません。パスワードは、アプリケーションのコードが関連する機密事項を Secrets Manager から取得する際に、ランタイムで解決されます。

Aurora Serverless クラスターテンプレートのコード例 #2 (リソースセクションの続き)

    DBSecret:
      Type: AWS::SecretsManager::Secret
      Properties:
        Name: !Sub "${EnvType}-AuroraUserSecret"
        Description: RDS database auto-generated user password
        GenerateSecretString:
          SecretStringTemplate: !Sub '{"username": "${DBMasterUserName}"}'
          GenerateStringKey: "password"
          PasswordLength: 30
          ExcludeCharacters: '"@/\'

テンプレートがサブネットグループリソース (DBSubnetGroup) を Aurora Serverless のため作成する際の入力として、サブネットの定義済みリストを使用しています。詳細は、前出のコード例 #1 にあるプロパティ DBSubnetGroupName と次のコード例 #3 をご覧ください。

Aurora Serverless クラスターテンプレートのコード例 #3 (リソースセクションの続き)

    DBSubnetGroup:
      Type: AWS::RDS::DBSubnetGroup
      Properties:
        DBSubnetGroupDescription: CloudFormation managed DB subnet group.
        SubnetIds:
          - !Select [0, !Ref DBSubnetList ]
          - !Select [1, !Ref DBSubnetList ]
          - !Select [2, !Ref DBSubnetList ]

このテンプレートをデプロイすると、スタックにより Aurora Serverless がプロビジョニングされます。

Data API を使い Aurora Serverless データベースを操作する

Aurora Serverless データベースの操作は、次のステップに従ってください。

Data API エンドポイントを有効化する

Aurora Serverless クラスターを使用するには、関連する Data API を有効化する必要が有ります。

RDS コンソールでインスタンスを選択し、[Network & Security] をクリック、[Data API] のチェックボックスをクリックした後、すぐにクラスターに変更を適用します。詳細については、「Aurora サーバーレス DB クラスターの変更」もしくはブログ記事「Data API public launch」をご参照ください。

また、次のコマンドを使い、AWS CLI から Data API を有効化することもできます。

aws rds modify-db-cluster --db-cluster-identifier [add-db-cluster-name-here] --enable-http-endpoint --apply-immediately

必ず、AWS CLI の最新バージョンを使うようにしてください。

Amazon Aurora Serverless のクエリエディタを使う

Data API を有効化すると、Amazon Aurora Serverless のクエリエディタを使い、次の図に示すように、SQL ステートメントを Aurora サーバーレスデータベースに対し発行できるようになります。これには、IAM ユーザーであることと、ロールがクエリエディターへのアクセス権限を持っていることが必要です。詳細は「Aurora Serverless のクエリエディタの使用」をご参照ください。

Data API 呼び出しに AWS SDK を使用する

Data API の呼び出しには、AWS CLI やその他多数の SDK が使えます。ここでは、Python (Boto3) 用に AWS SDK の使用例をいくつか挙げていきましょう。

この記事で取り上げたコード例の完全なバージョンは、GitHub レポジトリの「aws-aurora-serverless-data-api-sam」で入手していただけます。

Data API を使えるようにするには、始めに boto3 ライブラリをインポートし、RDSDataService クライアントの作成を行います (下にあるオブジェクトの rds_client をご覧ください) 。Data API の機能には、クライアントオブジェクトを使いアクセスします。

import boto3
rds_client = boto3.client('rds-data')

その後、データベース名、データベース ARN、そして Aurora Serverless クラスターが使う Secrets Manager の シークレット ARN を、モジュールスコープ変数として指定します。次のコード例を、ご自分の環境に合った値に変更してくお使いださい。

database_name = “add-database-name-here”
db_cluster_arn = “add-cluster-arn-here”
db_credentials_secrets_store_arn = “add-secrets-store-arn-here”

実際には、これらの値は Aurora Serverless クラスターをプロビジョニングした AWS CloudFormation スタックを通じエクスポートすることと、このようなスクリプトを使い自動的にインポートすることが可能です。例えば、この CloudFormation テンプレートの例で、出力パラメータを確認してみてください。

RDSDataService クライアントでは、execute_statement() という名前の関数により、Aurora Serverless データベースに対する SQL ステートメントの発行を有効化します。この関数には、既に上でご説明したパラメータが使われます。呼び出しの度にパラメータを渡す手間を省くために、ラッパー関数を作成します。次のラッパー関数 execute_statement() で詳細をご確認ください。

def execute_statement(sql):
    response = rds_client.execute_statement(
        secretArn=db_credentials_secrets_store_arn,
        database=database_name,
        resourceArn=db_cluster_arn,
        sql=sql
    )
    return response

この関数の入力パラメータには、SQL ステートメント が渡されます。このステートメントは Aurora Serverless データベースで実行され、未加工のオブジェクトを返します。次のいくつかの使用例で、この関数の使い方を示します。

例 1: DDL コマンドを発行して、 データベースおよびテーブル作成する

ラッパー関数 execute_statement() は、DDL コマンド実行のためにも使えます。データベーステーブルの作成時などがその一例です。

それにはまず、package_table.sql という名前で次に示すとおりの関数を作成します。

CREATE TABLE IF NOT EXISTS package (
    package_name VARCHAR(100) NOT NULL,
    package_version VARCHAR(50) NOT NULL,
    PRIMARY KEY (package_name, package_version)
)

このファイルでは、ソフトウェアパッケージの名前やバージョン (例えば MySQL 5.7.21) を格納する、package という名前のテーブルを作成します。

次に、execute_statement() 関数を呼び出し、MySQL データベースオブジェクトと package テーブルを作成します。次の例のとおりです。

# create MySQL database object
execute_statement(f'create database if not exists {database_name}')
# create ‘package’ table
table_ddl_script_file = 'package_table.sql
print(f’Creating table from DDL file: {table_ddl_script_file}’)
with open(table_ddl_script_file, 'r') as ddl_script:
    ddl_script_content=ddl_script.read()
    execute_statement(ddl_script_content)

複数のテーブルを作成するには、配列に別々の DDL .sql ファイルを配置しておき、関数 execute_statement() を繰り返し実行させるようにします。詳細は、GitHub レポジトリの「aws-aurora-serverless-data-api-sam」をご参照ください。

例 2: シンプルなクエリを作成する

ここまでの execute_statement() 関数では、シンプルなクエリを発行し、標準出力される未加工な結果をプリントすることができます。

response = execute_statement('select * from package')
print(response['records'])

例 3: パラメータ化されたクエリを作成する

RDSDataService クライアントでは、プレースホルダパラメータを SQL ステートメント内で使えるので、クエリをパラメータ化することも可能です。エスケープされた入力値が、ランタイムでこのパラメータの解決を許可します。パラメータ化されたクエリでは、SQL のインジェクション攻撃 を防ぐ効果があります。

例として、次の SQL クエリでは、package_name という名前がついたパラメータを使っています。これは、クエリが変数 package_name で渡される値で実行される時に、ランタイムで解決するパッケージの名前を収めたプレースホルダです。

sql = 'select * from package where package_name=:package_name'
package_name = 'package100'
sql_parameters = [{'name':'package_name', 'value':{'stringValue': f'{package_name}'}}]
response = execute_statement(sql, sql_parameters)
print(response['records'])

さらに、execute_statement() 関数をリファクタリングして、パラメータ化された SQL をサポートするための追加パラメータ (sql_parameters) を受け取れるようにします。

def execute_statement(sql, sql_parameters=[]):
    response = rds_client.execute_statement(
        secretArn=db_credentials_secrets_store_arn,
        database=database_name,
        resourceArn=db_cluster_arn,
        sql=sql,
        parameters=sql_parameters
    )
    return response

例 4: クエリの結果をフォーマットする

ここまででお気づきかも知れませんが、関数 execute_statement() では未加工の結果を返すので、通常それを使用するためには、内容の解析とフォーマット化が必要となります。次のコード例では、いくつかの小さな Python 関数 (formatRecords()formatRecord()formatField()) を使い、戻り値であるレコードのリストをフォーマット化するために、各レコードとフィールドの処理を行います。

# Formatting query returned Field
def formatField(field):
   return list(field.values())[0]
# Formatting query returned Record
def formatRecord(record):
   return [formatField(field) for field in record]
# Formatting query returned Field
def formatRecords(records):
   return [formatRecord(record) for record in records]
sql = 'select package_name, package_version from package'
response = execute_statement(sql)
print(formatRecords(response['records']))

例 5: SQL の insert 文をパラメータ化する

insert 文を含め、他の SQL ステートメントをパラメータ化することも可能です。例えば次のコード例では、execute_statement() 関数を使い、パッケージテーブルにタプル (“package-2”、“version-1”) を挿入しています。

sql = 'insert into package (package_name, package_version) values (:package_name, :package_version)'
sql_parameters = [
    {'name':'package_name', 'value':{'stringValue': 'package-2'}},
    {'name':'package_version', 'value':{'stringValue': 'version-1'}}
]
response = execute_statement(sql, sql_parameters)
print(f'Number of records updated: {response["numberOfRecordsUpdated"]}')

例 6: SQL の insert 文をバッチ処理する

Data API では、一度の API 呼び出しだけで特定のパラメータセットについて SQL のステートメントを複数回実行させるバッチ処理にも対応しています。バッチ処理を行うと、複数回の SQL ステートメントを処理 (例えばテーブルに列を数百行を挿入) するために、ネットワーク全体が要する時間が大幅に低減されるので、顕著なパフォーマンス改善をもたらします。

RDSDataService にある batch_execute_statement() 関数がこのメリットを実現します。この関数は、execute_statement() が受け取るものと同じパラメータを受け取りますが、パラメータ化された値のリストを処理するため、2 次元配列を使用します。

このパラメータのリストを簡素化し、関数呼び出しを容易にするため、RDSDataService の batch_execute_statement() 関数をラッピングします。次に示すラッパー 関数 batch_execute_statement() は、パラメータとして SQL ステートメントと 2 次元配列 (リストのセット) を受け取り、オリジナルの batch_execute_statement() 関数を呼び出してから、未加工のオブジェクトを結果として返します。

def batch_execute_statement(sql, sql_parameter_sets):
    response = rds_client.batch_execute_statement(
        secretArn=db_credentials_secrets_store_arn,
        database=database_name,
        resourceArn=db_cluster_arn,
        sql=sql,
        parameterSets=sql_parameter_sets
    )
    return response

これで、一度の API 使用だけでラッパー関数 batch_execute_statement() を呼び出し、複数の列を挿入させることが可能になりました。

以下にパッケージテーブルにパッケージを 10 個挿入する例を示します。最初に、2 次元配列 sql_parameters_sets にパッケージ名とバージョンを書き込みます。次に batch_execute_statement() を呼び出し、SQL の insert 文と配列をパラメータとして伝達させます。次のコード例をご参照ください。

sql = 'insert into package (package_name, package_version) values (:package_name, :package_version)'
    sql_parameter_sets = []
    for i in range(1,11):
        entry = [
                {'name':'package_name', 'value':{'stringValue': f'package{i}'}},
                {'name':'package_version', 'value':{'stringValue': 'version-1'}}
        ]
        sql_parameter_sets.append(entry)
    response = batch_execute_statement(sql, sql_parameter_sets)
    print(f'Number of records updated: {len(response["updateResults"])}')

例 7: 例外処理

ここでは、コード処理中に発生するデータベース関連エラーをキャッチおよびレイズする専用の例外タイプを作成します。まず、シンプルなクラス DataAccessLayerException を次のコード例のとおりに作成します。

class DataAccessLayerException(Exception):
   pass

この新しい例外タイプは、データベース関連エラーをキャッチしレイズするために使用できます。

次のコード例で、データベースに SQL ステートメントが発行された際に発生する可能性のある全エラーを、関数 add_package() がどのようにキャッチするかを示します。このラッパーは、Python 3 の「raise from」ステートメントを使い、DataAccessLayerException としてオリジナルの例外を再レイズしています。

    def add_package():
        try:
            sql = 'insert into package (package_name, package_version) values (:package_name, :package_version)'
            sql_parameters = [
                {'name':'package_name', 'value':{'stringValue': f'package-2'}},
                {'name':'package_version', 'value':{'stringValue': 'version-1'}}
            ]
            response = execute_statement(sql, sql_parameters)
            print(f'Number of records updated: {response["numberOfRecordsUpdated"]}')
        except Exception as e:
            raise DataAccessLayerException(e) from e

次のコード例に示すとおり、add_package() 関数呼び出し時に発生した DataAccessLayerException 例外を処理するために try/except ブロックを使うことができます。

 try:
     add_package()
 except DataAccessLayerException as e:
     print(e)

例 8: トランザクションのコミットとロールバックを行う

Data API では、トランザクションもサポートします.コード内でトランザクションを開始し、その中で SQL コマンドを実行した後、そのトランザクションをコミットできます。この処理中に例外が発生した場合、このトランザクションを完全にロールバックさせることが可能です。

まず、トランザクションをサポートさせるため、execute_statement() 関数と batch_execute_statement() 関数を修正してみましょう。今これらの関数は、オプションの transaction_id パラメータを受け取るようになています。仮に、呼び出し側が transaction_id 値を手渡していないと、この関数は通常どおりに実行され、実行結果は自動的にコミット呼び出しを変更します。これを使わない場合は、RDSDataService’s 関数の commit_transaction()rollback_transaction() を明示的に呼び出さなければなりません。

トランザクションをサポートするための、関数 execute_statement() へのリファクタリングは、次のようになります。

def execute_statement(sql, sql_parameters=[], transaction_id=None):
     parameters = {
         'secretArn': db_credentials_secrets_store_arn,
         'database': database_name,
         'resourceArn': db_cluster_arn,
          'sql': sql,
         'parameters': sql_parameters
     }
     if transaction_id is not None:
         parameters['transactionId'] = transaction_id
     response = rds_client.execute_statement(**parameters)
     return response

トランザクションをサポートするための、関数 batch_execute_statement() へのリファクタリングは、次のようになります。

def batch_execute_statement(sql, sql_parameter_sets, transaction_id=None):
     parameters = {
         'secretArn': db_credentials_secrets_store_arn,
         'database': database_name,
         'resourceArn': db_cluster_arn,
         'sql': sql,
         'parameterSets': sql_parameter_sets
     }
     if transaction_id is not None:
         parameters['transactionId'] = transaction_id
     response = rds_client.batch_execute_statement(**parameters)
     return response

このアップデートされた batch_execute_statement() 関数で、トランザクションコンテキスト内でデータ挿入をバッチ処理できます。仮にエラーが発生しなければ、そのトランザクションはコミットされます。そうならない場合はロールバックされます。

このコード例では、transaction_id は、関数 batch_execute_statement() へ、begin_transaction() 関数が返すトランザクションオブジェクトを使い手渡されています。

transaction = rds_client.begin_transaction(
     secretArn=db_credentials_secrets_store_arn,
     resourceArn=db_cluster_arn,
     database=database_name)
try:
    sql = 'insert into package (package_name, package_version) values (:package_name, :package_version)'
    sql_parameter_sets = []
    for i in range(30,40):
        entry = [
                {'name':'package_name', 'value':{'stringValue': f'package{i}'}},
                {'name':'package_version', 'value':{'stringValue': 'version-1'}}
        ]
        sql_parameter_sets.append(entry)
    response = batch_execute_statement(sql, sql_parameter_sets, transaction['transactionId'])
except Exception:
    transaction_response = rds_client.rollback_transaction(
        secretArn=db_credentials_secrets_store_arn,
        resourceArn=db_cluster_arn,
        transactionId=transaction['transactionId'])
else:
    transaction_response = rds_client.commit_transaction(
        secretArn=db_credentials_secrets_store_arn,
        resourceArn=db_cluster_arn,
        transactionId=transaction['transactionId'])
    print(f'Number of records updated: {len(response["updateResults"])}')
print(f'Transaction Status: {transaction_response["transactionStatus"]}')

結論

今回のブログ記事では、コード (AWS CloudFormation) 形式のインフラストラクチャーを使い、Aurora Serverless MySQL クラスターをプロビジョニングする方法を解説しました。いくつかのユースケースについて、Aurora Serverless データベースに対し SQL コマンドを発行するため、どのように Data API コマンドが使えるかを、コード例で示しました。さらに詳しいコード例が必要な場合は、GitHub の「full end-to-end sample Serverless application」をご参照ください。

Data API は、米国東部 (バージニア北部)、米国東部 (オハイオ)、米国西部 (オレゴン)、アジアパシフィック (東京)、および欧州 (アイルランド) のリージョンでご利用いただけます。API には料金はかかりませんが、AWS から外部へのデータ転送には通常料金がかかります。

 


著者について

Marcilio Mendonca は、シニアグローバルコンサルタントとして アマゾンウェブサービスのプロフェッショナルチームで働いています。彼は、VM コンテナーやサーバーレスアーキテクチャを用いる、クラウドネイティブな最高クラスの AWS アプリケーションについて、カスタマーデザイン、構築、デプロイなどを行ってきました。AWS に加わる以前、Marcilio はアマゾンのソフトウェア開発エンジニアでした。また Marcilio は、コンピュータサイエンスの博士号取得者でもあります。彼は余暇には、ドラム演奏、バイクツーリングを楽しみ、家族や友人達と楽しい時間を過ごすようにしています。