亚马逊AWS官方博客
使用 Amazon Athena 参数化查询提供数据即服务
功能概述
在发送给 Athena 的查询中,您可以在查询字符串中使用由问号 (?) 声明的位置参数,然后在 StartQueryExecution 请求中按顺序将值声明为执行参数。您可以将执行参数用于现有的预处理语句以及 Athena 中支持的 SQL 查询。您仍可利用参数化查询的可重用性和安全性优势,且在 Athena 中查看最近的查询时,使用执行参数还可屏蔽查询的参数。您也可以从手动构建 SQL 查询字符串改为使用执行参数;这样,您无需先创建预处理语句即可运行参数化查询。SELECT、INSERT INTO、CTAS 和 UNLOAD 语句目前支持参数化查询。有关最新列表以及有关执行参数的信息,请参阅使用参数化查询进行查询。
以前,要运行参数化查询,只能先在 Athena 工作组中创建预处理语句,然后运行参数化查询,同时使用 USING 子句将变量传递到 EXECUTE SQL 语句中。您不再需要在所有 Athena 工作组中创建和维护预处理语句来利用参数化。如果您在多个工作组中运行相同的查询,或者不需要预处理语句功能,这将非常有用。
您可以继续使用 Athena 工作组来隔离、实施单个成本约束,并跟踪多租户应用程序中租户的查询相关指标。例如,您 DaaS 应用程序的客户可以使用单独的工作组对您的数据集运行相同的查询。有关 Athena 工作组的更多信息,请参阅使用工作组运行查询。
更改代码以使用参数化查询
将现有代码更改为使用参数化查询是一个很小的变化,它将立即产生积极影响。以前,您需要使用环境变量作为参数占位符,手动构建查询字符串值。无论意图如何,操纵查询字符串可能都会很麻烦,并且存在注入不想要的值或 SQL 片段(如 SQL 运算符)这样的固有风险。现在,您可以将查询字符串中的变量替换为问号 (?),并通过 ExecutionParameters 选项按顺序声明变量值。这样,您就可以利用参数化查询的安全优势,查询的编写和维护也不那么复杂了。语法更改如以下代码所示,以 AWS Command Line Interface (AWS CLI) 为例。
以前,在没有执行参数的情况下针对 Athena 运行查询:
aws athena start-query-execution \
--query-string "SELECT * FROM table WHERE x = $ARG1 AND y = $ARG2 AND z = $ARG3" \
--query-execution-context "Database"="default" \
--work-group myWorkGroup
现在,使用执行参数针对 Athena 运行参数化查询:
aws athena start-query-execution \
--query-string "SELECT * FROM table WHERE x = ? AND y = ? AND z = ?" \
--query-execution-context "Database"="default" \
--work-group myWorkGroup \
--execution-parameters $ARG1 $ARG2 $ARG3
以下是在 Athena 工作组中创建预处理语句的命令示例。要详细了解如何创建预处理语句,请参阅使用预处理语句进行查询。
aws athena start-query-execution \
--query-string "PREPARE my-prepared-statement FROM SELECT * FROM table WHERE x = ? AND y = ? AND z = ?" \
--query-execution-context "Database"="default" \
--work-group myWorkGroup
以前,对不带执行参数的预处理语句运行参数化查询:
aws athena start-query-execution \
--query-string "EXECUTE my-prepared-statement USING $ARG1, $ARG2, $ARG3“ \
--query-execution-context "Database"="default" \
--work-group myWorkGroup
现在,对具有执行参数的预处理语句运行参数化查询:
aws athena start-query-execution \
--query-string "EXECUTE my-prepared-statement" \
--query-execution-context "Database"="default" \
--work-group myWorkGroup \
--execution-parameters $ARG1 $ARG2 $ARG3
示例架构
此示例架构的目的在于,运行 Athena 查询(带和不带预处理语句)时应用 ExecutionParameters 功能。这并不是要作为用于生产数据的 DaaS 解决方案。
此示例架构展示了 DaaS 应用程序,其带有用户界面 (UI),并呈现了针对公开 Amazon.com 客户评论数据集编写的三个 Athena 参数化查询。下图描述了用户向 Athena 提交查询时的此工作流。该示例使用 AWS Amplify 来托管前端应用程序。应用程序调用 Amazon API Gateway HTTP API,该 API 调用 AWS Lambda 函数对请求进行身份验证,获取 Athena 预处理语句和命名查询,然后针对 Athena 运行参数化查询。Lambda 函数使用 Athena 工作组的名称、语句名称、语句类型(无论是否为预处理语句)以及用户输入的查询参数列表。Athena 在 Amazon Simple Storage Service (Amazon S3) 存储桶(在 AWS Glue 中经过编目)中查询数据,并在 DaaS 应用程序 UI 中将结果显示给用户。
DaaS 应用程序 UI 的最终用户只能对 Athena 运行参数化查询。DaaS 应用程序 UI 演示了两种使用执行参数运行参数化查询的方法:使用和不使用预处理语句。在这两种情况下,Lambda 函数都会提交查询,等待查询完成,并提供与查询参数匹配的结果。下图描述了 DaaS 应用程序 UI。
您可能希望用户能够列出 Athena 工作组内所有 Athena 预处理语句,选择语句,输入参数并运行查询;在 DaaS 应用程序 UI 的左侧,您使用 EXECUTE 语句来查询带有 Athena 预处理语句的数据湖。您的代码库中可能保留了多个报告查询。在这种情况下,您的用户选择一条语句,输入参数,然后运行查询。在 DaaS 应用程序 UI 的右侧,您通过 SELECT 语句使用没有预处理语句的 Athena 参数化查询。
先决条件
本文使用以下 AWS 服务来演示使用 Athena 查询 Amazon.com 客户评论数据集的 DaaS 架构模式:
- AWS Amplify
- Amazon API Gateway
- Amazon Athena
- AWS CloudFormation
- Amazon DynamoDB
- AWS Glue 和 AWS Glue Data Catalog
- AWS Identity and Access Management (IAM)
- AWS Lambda
- Amazon S3
本文假设您已满足以下先决条件:
- 拥有一个 AWS 账户。有关说明,请参阅创建 AWS 账户。
- 创建了 CloudTrail 路径。有关说明,请参阅创建路径。
- 创建了 S3 存储桶。有关说明,请参阅创建存储桶。
- Node.js 16+ 版本已安装到您的设备上。
部署 CloudFormation 堆栈
在本部分中,您将部署 CloudFormation 模板来创建以下资源:
- AWS Glue Data Catalog 数据库
- AWS Glue Data Catalog 表
- Athena 工作组
- 三个 Athena 预处理语句
- 三个 Athena 命名查询
- API Gateway HTTP API
- Athena 查询的 Lambda 执行角色
- API Gateway HTTP API 授权的 Lambda 执行角色
- 五个 Lambda 函数:
- 更新 AWS Glue Data Catalog
- 授权 API Gateway 请求
- 提交 Athena 查询
- 列出 Athena 预处理语句
- 列出 Athena 命名查询
请注意,此 CloudFormation 模板已在 AWS 区域 ap-southeast-2、ca-central-1、eu-west-2、us-east-1, us-east-2 和 us-west-2 进行了测试。请注意,将其部署到您的 AWS 账户会产生成本。本文稍后将介绍清理资源的步骤。
要部署 CloudFormation 堆栈,请按照以下步骤操作:
- 导航到本文的 GitHub 存储库。
- 克隆存储库或复制 CloudFormation 模板 athena-parameterized-queries.yaml。
- 在 AWS CloudFormation 控制台上,选择 Create stack(创建堆栈)。
- 选择 Upload a template file(上传模板文件),然后选择 Choose file(选择文件)。
- 上传 athena-parameterized-queries.yaml,然后选择 Next(下一步)。
- 在 Specify stack details(指定堆栈详情)页面上,输入堆栈名称 athena-parameterized-queries。
- 在同一页面上,有两个参数:
- 对于 S3QueryResultsBucketName,在 AWS 账户中及在运行 CloudFormation 堆栈所在的相同 AWS 区域中输入 S3 存储桶名称。(在本文中,我们使用存储桶名称值,比如 my-bucket)。
- 对于 APIPassphrase,输入密码以对 API 请求进行身份验证。您稍后会用到这个。
- 选择 Next(下一步)。
- 在 Configure stack options(配置堆栈选项)页面上,选择 Next(下一步)。
- 在 Review(审核)页面上,选择 I acknowledge that AWS CloudFormation might create IAM resources with custom names(我确认 AWS CloudFormation 可能会使用自定义名称创建 IAM 资源),然后选择 Create stack(创建堆栈)。
该脚本只需不到两分钟的时间即可运行并更改为 CREATE_COMPLETE 状态。如果您在同一 AWS 账户和区域中部署堆栈两次,则某些资源可能已经存在,该过程将失败,并显示一条消息,指出该资源已存在于另一个模板中。
- 在 Outputs(输出)选项卡上,复制 APIEndpoint 值供稍后使用。
要获得部署 CloudFormation 模板的最低权限授权,您可以创建具有以下 IAM 策略操作的 AWS CloudFormation 服务角色。为此,您必须创建 IAM 策略和 IAM 角色,然后在配置堆栈选项时选择该角色。您需要将 ${Partition}、${AccountId} 和 ${Region} 的值替换为您自己的值;有关这些值的更多信息,请参阅 Pseudo 参数引用。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "IAM",
"Effect": "Allow",
"Action": [
"iam:GetRole",
"iam:UntagRole",
"iam:TagRole",
"iam:CreateRole",
"iam:DeleteRole",
"iam:PassRole",
"iam:GetRolePolicy",
"iam:PutRolePolicy",
"iam:AttachRolePolicy",
"iam:TagPolicy",
"iam:DeleteRolePolicy",
"iam:DetachRolePolicy",
"iam:UntagPolicy"
],
"Resource": [
"arn:${Partition}:iam::${AccountId}:role/LambdaAthenaExecutionRole-athena-parameterized-queries",
"arn:${Partition}:iam::${AccountId}:role/service-role/LambdaAthenaExecutionRole-athena-parameterized-queries",
"arn:${Partition}:iam::${AccountId}:role/service-role/LambdaAuthorizerExecutionRole-athena-parameterized-queries",
"arn:${Partition}:iam::${AccountId}:role/LambdaAuthorizerExecutionRole-athena-parameterized-queries"
]
},
{
"Sid": "LAMBDA",
"Effect": "Allow",
"Action": [
"lambda:CreateFunction",
"lambda:GetFunction",
"lambda:InvokeFunction",
"lambda:AddPermission",
"lambda:DeleteFunction",
"lambda:RemovePermission",
"lambda:UpdateFunctionConfiguration"
],
"Resource": [
"arn:${Partition}:lambda:${Region}:${AccountId}:function:LambdaRepairFunction-athena-parameterized-queries",
"arn:${Partition}:lambda:${Region}:${AccountId}:function:LambdaAthenaFunction-athena-parameterized-queries",
"arn:${Partition}:lambda:${Region}:${AccountId}:function:LambdaAuthorizerFunction-athena-parameterized-queries",
"arn:${Partition}:lambda:${Region}:${AccountId}:function:GetPrepStatements-athena-parameterized-queries",
"arn:${Partition}:lambda:${Region}:${AccountId}:function:GetNamedQueries-athena-parameterized-queries"
]
},
{
"Sid": "ATHENA",
"Effect": "Allow",
"Action": [
"athena:GetWorkGroup",
"athena:CreateWorkGroup",
"athena:DeleteWorkGroup",
"athena:DeleteNamedQuery",
"athena:CreateNamedQuery",
"athena:CreatePreparedStatement",
"athena:DeletePreparedStatement",
"athena:GetPreparedStatement"
],
"Resource": [
"arn:${Partition}:athena:${Region}:${AccountId}:workgroup/ParameterizedStatementsWG"
]
},
{
"Sid": "GLUE",
"Effect": "Allow",
"Action": [
"glue:CreateDatabase",
"glue:DeleteDatabase",
"glue:CreateTable",
"glue:DeleteTable"
],
"Resource": [
"arn:${Partition}:glue:${Region}:${AccountId}:catalog",
"arn:${Partition}:glue:${Region}:${AccountId}:database/athena_prepared_statements",
"arn:${Partition}:glue:${Region}:${AccountId}:table/athena_prepared_statements/*",
"arn:${Partition}:glue:${Region}:${AccountId}:userDefinedFunction/athena_prepared_statements/*"
]
},
{
"Sid": "APIGATEWAY",
"Effect": "Allow",
"Action": [
"apigateway:DELETE",
"apigateway:PUT",
"apigateway:PATCH",
"apigateway:POST",
"apigateway:TagResource",
"apigateway:UntagResource"
],
"Resource": [
"arn:${Partition}:apigateway:${Region}::/apis/*/integrations*",
"arn:${Partition}:apigateway:${Region}::/apis/*/stages*",
"arn:${Partition}:apigateway:${Region}::/apis/*/authorizers*",
"arn:${Partition}:apigateway:${Region}::/apis/*/routes*",
"arn:${Partition}:apigateway:${Region}::/tags/arn%3Aaws%3Aapigateway%3A${Region}%3A%3A%2Fv2%2Fapis%2F*"
]
},
{
"Sid": "APIGATEWAYMANAGEAPI",
"Effect": "Allow",
"Action": [
"apigateway:DELETE",
"apigateway:PUT",
"apigateway:PATCH",
"apigateway:POST",
"apigateway:GET"
],
"Resource": [
"arn:${Partition}:apigateway:${Region}::/apis"
],
"Condition": {
"StringEquals": {
"apigateway:Request/ApiName": "AthenaAPI-athena-parameterized-queries"
}
}
},
{
"Sid": "APIGATEWAYMANAGEAPI2",
"Effect": "Allow",
"Action": [
"apigateway:DELETE",
"apigateway:PUT",
"apigateway:PATCH",
"apigateway:POST",
"apigateway:GET"
],
"Resource": [
"arn:${Partition}:apigateway:${Region}::/apis/*"
],
"Condition": {
"StringEquals": {
"apigateway:Resource/ApiName": "AthenaAPI-athena-parameterized-queries"
}
}
},
{
"Sid": "APIGATEWAYGET",
"Effect": "Allow",
"Action": [
"apigateway:GET"
],
"Resource": [
"arn:${Partition}:apigateway:${Region}::/apis/*"
]
},
{
"Sid": "LAMBDALAYER",
"Effect": "Allow",
"Action": [
"lambda:GetLayerVersion"
],
"Resource": [
"arn:${Partition}:lambda:*:280475519630:layer:boto3-1_24*"
]
}
]
}
创建 CloudFormation 堆栈后,您可以使用 AWS 管理控制台部署 Amplify 应用程序并查看 Lambda 函数。以下是范围缩小的 IAM 策略,您可以将其附加到 IAM 用户或角色以执行这些操作:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AmplifyCreateApp",
"Effect": "Allow",
"Action": [
"amplify:CreateBranch",
"amplify:StartDeployment",
"amplify:CreateDeployment",
"amplify:CreateApp",
"amplify:StartJob"
],
"Resource": "arn:${Partition}:amplify:${Region}:${AccountId}:apps/*"
},
{
"Sid": "AmplifyList",
"Effect": "Allow",
"Action": "amplify:List*",
"Resource": "arn:${Partition}:amplify:${Region}:${AccountId}:apps/*"
},
{
"Sid": "AmplifyGet",
"Effect": "Allow",
"Action": "amplify:GetJob",
"Resource": "arn:${Partition}:amplify:${Region}:${AccountId}:apps/*"
},
{
"Sid": "LambdaList",
"Effect": "Allow",
"Action": [
"lambda:GetAccountSettings",
"lambda:ListFunctions"
],
"Resource": "*"
},
{
"Sid": "LambdaFunction",
"Effect": "Allow",
"Action": [
"lambda:GetFunction"
],
"Resource": "arn:${Partition}:lambda:${Region}:${AccountId}:function:LambdaAthenaFunction-athena-parameterized-queries"
}
]
}
请注意,在部署 Amplify 应用程序以设置全局密码以及清理资源以删除 Amplify 应用程序时,您需要遵循以下 IAM 策略。务必将 ${AppARN} 替换为 Amplify 应用程序的 ARN。您可以在创建 Amplify 应用程序后,在 General(常规)选项卡(位于 Amplify 控制台的 App Settings(应用程序设置)部分)中查找 ARN。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "UpdateAndDeleteAmplifyApp",
"Effect": "Allow",
"Action": [
"amplify:DeleteApp",
"amplify:UpdateApp"
],
"Resource": "${AppARN}"
}
]
}
部署 Amplify 应用程序
在本部分中,您将部署 Amplify 应用程序。
- 在克隆的存储库中,在文本编辑器中打开 web-application/.env。
- 将 AWS_API_ENDPOINT 设置为 APIEndpoint 值(来自 CloudFormation 堆栈 Outputs(输出)),例如:AWS_API_ENDPOINT=”https://123456abcd.execute-api.your-region.amazonaws.com”。
- 将 API_AUTH_CODE 设置为您作为 CloudFormation 堆栈 APIPassphrase 参数输入的值。例如:API_AUTH_CODE=”YOUR_PASSPHRASE”。
- 导航到 web-application/ 目录并运行 npm install。
- 运行 npm run build 来编译分发资产。
- 在 Amplify 控制台上,选择 All apps(全部应用程序)。
- 选择 New app(新应用程序)。
- 选择 Host web app(托管 Web 应用程序),选择 Deploy without Git provider(在无 Git 提供程序下部署),然后选择 Continue(继续)。
- 对于 App name(应用程序名称),输入 Athena Parameterized Queries App。
- 对于 Environment name(环境名称),您无需输入值。
- 选择 Drag and Drop(拖放)。
- 找到 dist/ 目录(位于 web-application/ 内),将其拖放到窗口。确保拖动整个目录,而不是其中的文件。
- 选择 Save and deploy(保存并部署),以在 Amplify 上部署 Web 应用程序。
此步骤只需不到一分钟即可完成。
- 在 App settings(应用程序设置)下,选择 Access control(访问控制),然后选择 Manage access(管理访问)。
- 选择 Apply a global password(应用全局密码),然后为 Username(用户名)和 Password(密码)输入值。
您将使用这些凭据访问您的 Amplify 应用程序。
访问您的 Amplify 应用程序并运行查询
在部分中,您将使用 Amplify 应用程序对 Amazon.com 客户评论数据集运行 Athena 参数化查询。应用程序的左侧显示如何使用 Athena 预处理语句运行参数化查询。应用程序的右侧显示如何在没有预处理语句的情况下运行参数化查询,例如在代码中编写查询。本文中的示例使用 Athena 工作组中的命名查询。有关命名查询的更多信息,请参阅 NamedQuery。
- 打开位于 Domain(域)下的 Amplify Web 应用程序链接。例如:https://dev123.abcd12345xyz.amplifyapp.com/。
- 在 Sign in(登录)提示中,输入作为 Amplify 应用程序全局密码提供的用户名和密码。
- 对于 Workgroup Name(工作组名称),选择 ParameterizedStatementsWG 工作组。
- 在 Prepared Statement(预处理语句)或 SQL Statement(SQL 语句)下拉菜单中,选择语句示例。
选择语句将显示有关查询的说明,包括您可以尝试使用此语句的参数示例以及原始 SQL 查询字符串。字符串类型的 SQL 参数必须用单引号括起来,例如:’your_string_value’。
- 输入您的查询参数。
下图显示了要为 product_helpful_reviews 预处理语句输入的参数示例。
- 选择 Run Query(运行查询),将查询请求发送到 API 端点。
查询运行后,示例应用程序将以表格格式显示结果,如以下屏幕截图所示。这是呈现结果的众多方法之一,您的应用程序可以以对用户最有意义的格式显示结果。完整的查询工作流如前面的架构图所示。
在适用于 Python (Boto3) 的 AWS SDK 中使用执行参数
在本部分中,您将检查 Lambda 函数代码,以使用带和不带预处理语句的 StartQueryExecution API。
- 在 Lambda 控制台上,选择 Functions(函数)。
- 导航到 LambdaAthenaFunction-athena-parameterized-queries 函数。
- 选择 Code Source (代码源)窗口。
使用适用于 Python (Boto3) 的 AWS SDK 向 Athena StartQueryExecution API 传递参数的示例从第 39 行和第 49 行开始。请注意第 45 行和第 55 行的 ExecutionParameters 选项。
以下代码将执行参数与 Athena 预处理语句结合使用:
response = athena.start_query_execution(
QueryString=f'EXECUTE {statement}', # Example: "EXECUTE prepared_statement_name"
WorkGroup=workgroup,
QueryExecutionContext={
'Database': 'athena_prepared_statements'
},
ExecutionParameters=input_parameters
)
以下代码使用不带 Athena 预处理语句的执行参数:
response = athena.start_query_execution(
QueryString=statement, # Example: "SELECT * FROM TABLE WHERE parameter_name = ?"
WorkGroup=workgroup,
QueryExecutionContext={
'Database': 'athena_prepared_statements'
},
ExecutionParameters=input_parameters
)
清除
在本文中,您创建了几个会产生成本的组件。为避免将来产生费用,请按以下步骤移除资源:
- 删除 S3 存储桶的结果前缀(您在工作组上运行查询后所创建)。
使用默认模板时,前缀名为 <S3QueryResultsBucketName>/athena-results。在此步骤中要格外小心。除非您在 S3 存储桶上使用版本控制,否则,删除 S3 对象将无法撤消。
- 在 Amplify 控制台上,选择要删除的应用程序,并在 Actions(操作)菜单中,选择 Delete app(删除应用程序),然后进行确认。
- 在 AWS CloudFormation 控制台上,选择要删除的堆栈,选择 Delete(删除),然后进行确认。
结论
在本文中,我们展示了如何使用 Athena 参数化查询构建 DaaS 应用程序。Athena 中的 StartQueryExecution API 现在支持执行参数,允许您将 Athena 查询作为参数化查询运行。您可以将执行参数与查询字符串分离,并使用参数化查询,而不局限于创建预处理语句的 Athena 工作组。您可以利用 Athena 通过参数化查询提供的安全优势,开发人员不再需要手动构建查询字符串。在本文中,您学习了如何使用执行参数,并部署了 DaaS 参考架构来了解如何应用参数化查询。
您可以通过 Athena 控制台、AWS CLI 或 AWS SDK,开始使用 Athena 参数化查询。要详细了解 Athena,请参阅 Amazon Athena 用户指南。有关使用执行参数的更多信息,请参阅使用参数化查询进行查询。
感谢您阅读这篇文章! 如果您对 Athena 预处理语句和参数化查询有疑问,请随时发表评论。
关于作者
Blayze Stefaniak 是技术策略师计划的高级解决方案架构师,支持 AWS 营销领域的高级客户计划。他拥有医疗保健、汽车和公共部门等行业工作的经验。他热衷于将复杂的情况分解为切实可行的东西。在业余时间,Blayze 喜欢听《星球大战》有声读物,试着逗狗发笑,可能还会不出声说话。
Daniel Tatarkin 是 Amazon Web Services (AWS) 的解决方案架构师,为联邦金融机构提供支持。他对大数据分析和无服务器技术充满热情。在工作之外,他喜欢学习个人理财、喝咖啡,也喜欢尝试新的编程语言,他觉得这很有趣。
Matt Boyd 是 AWS 的高级解决方案架构师,与联邦金融机构合作。他对有效的云管理和治理以及数据治理策略充满热情。不工作时,他喜欢跑步、举重,教上小学的儿子掌握道德黑客技能。