亚马逊AWS官方博客

如何透过 Serverless 与 API 的方式异步搜寻数据湖中的数据

背景介紹

为了要解决数据孤岛的问题(Data Silos),我们需要建立一个集中式的数据湖,更进一步在上面满足各式各样的数据分析与机器学习相关的需求。而构建数据湖的通常可以分成以下几个步骤,设定Amazon S3 作为数据湖的储存空间。然后开始摄取资料到S3,开始处理并产生数据的元数据资料,有了这些元数据我们才可以用 Amazon Athena 来搜寻这些资料。

客户在建立数据平台后遇到会遇到一个挑战是如何透过API的方式,让使用者可以搜寻数据平台上的资料,Athena是我们最常使用的服务。但是对于资料的使用者而言,我们可以透过异步的方式呼叫 Athena API 但是需要一直来问该次搜寻是否完成,GetQueryResults API 也有1000笔的限制,在这篇部落格透过 AWS 服务整合透过 serverless 的方式,实作出由使用者在发送请求时,提供一个 callback URL然后,透过 Step Function 控制流程,Athena 完成搜寻后,把搜寻结果回传给使用端,降低使用端需要一直等待并询问的工作。

解決方案

Github aws-sample 提供了本篇部落格的实作范例,可见于 asynchronously-query-data-lake-through-serverless-service

数据湖可以透过 API Gateway 配合预先约定好的 json 请求格式,包含以下资讯:

{
  "Query": "<your_sql_query>",
  "CallbackUrl": "<your_api_endpoint>",
}

命名上采用与Athena API 一致的大写开头,中间用大写分隔的做法。在API Gateway 中我们会去 Mapping object 成以下格式,并呼叫 Step Function 开始执行工作流程服务:

{
  "ApiRequest": {
    "Query": "<your_sql_query>",
    "CallbackUrl": "<your_api_endpoint>"
  }
}

首先我们需要设定InputPath: $.ApiRequest,过滤出 ApiRequest 内的 json 物件,然后我们可以开始构建所传送给 Athena Start Query Execution API,的 json 物件如下:

{
  "QueryString.$": "$.Query",
  "WorkGroup": "<your_work_group>",
  "ResultConfiguration": {
    "OutputLocation.$": "<your_output_location>"
  }
}

Athena Start Query Execution API 會回傳 QueryExecutionId,我們會透過ResultPath: $.AthenaRequest 合併回傳的結果,到 AthenaRequest 的欄位,此時我們 output 結果如下:

{
  "ApiRequest": {
    "Query": "<your_sql_query>",
    "CallbackUrl": "<your_api_endpoint>"
  },
  "AthenaRequest": {
    "QueryExecutionId": "<execution_id>"
  }
}

接下来我们会等候 15 秒,然后用InputPath: $.AthenaRequest,开始构建呼叫 Athena Get Query Execution API 所需参数如下:

{
  "QueryExecutionId.$": "$.QueryExecutionId"
}

回传结果,我们透过 ResultPath: $.AthenaResult 合并回传的结果,到 AthenaResult 的栏位,此时我们 output 结果如下:

{
  "ApiRequest": {
    "Query": "<your_sql_query>",
    "CallbackUrl": "<your_api_endpoint>"
  },
  "AthenaRequest": {
    "QueryExecutionId": "<execution_id>"
  },
  "AthenaResult": {
    "QueryExecution": {
       "Status": {
          "State": "<QUEUED | RUNNING | SUCCEEDED | FAILED | CANCELLED>",
          ...
       },
       ...
    }
  } 
}

下一步我们透过 Choice 流程,判断 AthenaResult.QueryExecution.Status.State 栏位如结果为 SUCCEEDED | FAILED | CANCELLED 则把结果送入 Lambda 作为 payload,利用Lambda 产生 S3 Presigned URL,并呼叫使用者提供的 Callback URL 回传下列资讯。

{
  "State": {
    "CompletionDateTime": 1645062487295,
    "State": "SUCCEEDED | FAILED | CANCELLED",
    "SubmissionDateTime": 1645062485193
  }
  "QueryResult": {
    "PresignedUrl": "https://xxxx/query-results-path/yyy.csv",
    "ExpiredIn": 3600
  }
}

总结

API Gateway 部分,我们简化了前端使用者需要传入的参数,并开始建构所需要的独立物件为 Step Function 的执行做准备。 Step Function 中间传递参数过程,我们尽量用独立的 object 来储存我们关注的资讯,避免参数彼此互相覆盖而遗失后面的工作流程所关注的参数。最后 Lambda 来处理完成的资料,这边是使用 Presigned URL 的方式而非 API 直接回传结果的原因是,我们需要回传的资料可能超过 Payload 上限,而使用者 Presigned URL 的方式会有 ExpiredIn 所以我们也会告诉前端使用者,需要去做这一块的确认。

透过上述方式,我们可以透过 AWS 服务整合透过 serverless 的方式,实作出由使用者在发送请求时,提供一个 callback URL然后,透过 Step Function 控制流程,Athena 完成搜寻后,把搜寻结果回传给使用端,降低使用端需要一直等待并询问的工作。

本篇作者

王祥瑞

AWS 解决方案架构师,主要基于 AWS 的云计算方案架构的咨询和设计,研究在云上构建现代解决方案与微服务架构设计,尤其是在大数据和机器学习方面。通过全部12张的AWS认证,扩展技术的深度与广度。

李定远

亚马逊云科技解决方案架构师,负责亚马逊云科技无服务器计算相关的服务。对 Amazon Lambda, Amazon API Gateway 等相关产品有深入了解。