亚马逊AWS官方博客

通过Lambda实现URL文件直接上传到S3

对象存储已经是公有云厂商的标准服务,所有厂商都支持在对象存储服务上提供基于对象的上传,下载等服务。但原生的服务接口通常默认上传下载的数据位置一端为本地,一端为云上。但如果需要上传到对象服务的文件是存在某个通过URL可访问的数据,在AWS上还需要客户自行完成下载之后,再使用cli或者sdk向S3上传。另外,在很多业务场景中,用户需要批量下载互联网资源并上传到S3中。

本文介绍一个基于lambda的实现,可以帮助客户直接指定数据源的URL并自动完成数据的下载和上传过程。如下图所示,用户可以通过设置url,bucket,prefix等参数调用Lambda函数完成网上数据的抓取和S3上传,上传后的文件位置会返回给客户。

AWS Lambda 是一项无服务器计算服务,可运行代码来响应事件并为您自动管理底层计算资源。Lambda 在高可用性计算基础设施上运行代码,用于执行计算资源的所有管理工作。这包括服务器和操作系统维护、容量调配和弹性伸缩、代码和安全补丁部署以及代码监控和日志记录。您只需要提供代码,而无需关心后端计算资源的管理和运维。

在这篇blog中,我们将介绍通过lambda实现URL文件直接上传到S3并通知到客户的具体方法。

第一步 构建docker容器镜像

  1. 在你的运行环境中安装docker与git, 并确保你有AWS账户可以访问ECR,Lambda等服务。
  2. 在terminal 中运行:#git clone https://github.com/zhaoanbei/url2s3.git
  3. clone到本地的文件目录如下:

  1. 查看Dockerfile内容如下:
# python3.8 lambda base image
FROM public.ecr.aws/lambda/python:3.8
# copy requirements.txt to container
COPY requirements.txt ./
# installing dependencies
RUN pip3 install -r requirements.txt
# Copy function code to container
COPY app.py ./
# setting the CMD to your handler file_name.function_name
CMD [ "app.lambda_handler" ]
  1. py代码如下:其中,get_file用于下载文件,upload_file用于上传文件:
#-*- coding: utf-8 -*-

import json
import urllib.request

import logging
import boto3
from botocore.exceptions import ClientError
import os
import requests
import time


local_path = "/tmp/" #"/mnt/test/tmp"
local_file_name = "temp_file"
local_file = local_path+local_file_name

def urllib_get_file_size(url):
    file = urllib.request.urlopen(url)
    print("file size to get is: "+str(file.length))
    return(int(file.length))

def requests_get_file_size(url):
    info = requests.head(url)
    print("file size to get is: " +info.headers['Content-Length'])
    return(info.headers['Content-Length'])

def get_file(url):
    r = requests.get(url)
    with open(local_file, 'wb') as f:
        f.write(r.content) 

def get_dir_available_size(dir):
    batcmd = "df -k "+dir
    result = os.popen(batcmd)
    lines = result.readlines()
    #print(batcmd+"output: "+" line 0:"+lines[0])
    #print(batcmd+"output: "+" line 1:"+lines[1])
    line = lines[1].split()
    # make size unit to byte
    dir_size = int(line[3])*1024
    print("dir available size: "+str(dir_size))
    return dir_size

def rm_file(file_path):
    os.remove(file_path)


def upload_file(file_name, bucket, object_name=None):
    if object_name is None:
        object_name = os.path.basename(file_name)
    s3_client = boto3.client('s3')
    try:
        s3_client.upload_file(file_name, bucket, object_name)
    except ClientError as e:
        logging.error(e)
        return False
    return True


def lambda_handler(event, context):
    url = event['url']
    bucket = event['bucket']
    prefix = event['prefix']+'/'+url.split('/')[-1]
    s3path='s3://'+bucket+'/'+prefix

    file_size = urllib_get_file_size(url)
    dir_available_size = get_dir_available_size(local_path)
    if dir_available_size < file_size:
        print("Not enough storage!")
        return -1
    else:
        print("File size is ok")

        get_file(url)
        upload_file(local_file, bucket, prefix)
        time.sleep(1)
        rm_file(local_file)
        responseObject ={}
        responseObject['statusCode'] = 200
        responseObject['headers']= {}
        responseObject['headers']['Content-Type'] = 'application/json'
        responseObject['headers']['s3path'] = s3path
        
        return responseObject

第二步 容器镜像上传到ECR服务

  1. 按照https://docs.aws.amazon.com/zh_cn/AmazonECR/latest/userguide/repository-create.html创建私有 repository(AWS控制台进入repository, 根据右上角view push demands进行image build与push操作)

第三步 创建Lambda函数

  1. 进入lambda控制台,选择create function, Container image, 选择之前构建的repository,latest镜像版本。

  1. 创建完成后,配置对应的Memory, Ephemeral storage与Timeout。这里需要注意的是,如果预期要获取的文件较大,请调整‘Ephemeral storage’的大小到合理的取值(最大为10GB)

  1. 在Test下编辑测试事件,点击右上角save changes, Test。测试过程中可以通过Lambda的控制台结合CloudWatch的日志组进行过程的跟踪和分析。

第四步 本地触发Lambda函数

通过git代码仓库中的python_invoke.ipynb,可以实现对应的lambda调用。有关boto3可参考:https://boto3.amazonaws.com/v1/documentation/api/latest/index.html

以下是python_invoke.ipynb的具体实现,可以返回已保存的文件在S3中的位置:

import boto3
import json
import logging

cn = boto3.session.Session(profile_name='cn')

lambda_client = cn.client('lambda')
para = {
  "url": "https://dinoanimals.com/wp-content/uploads/2021/01/Burmese-cat-16.jpg",
  "bucket": "anbei-show",
  "prefix": "url2s3"
}
class LambdaWrapper:
    def __init__(self, lambda_client):
        self.lambda_client = lambda_client

    def invoke_function(self, function_name, function_params, get_log=False):
        response = self.lambda_client.invoke(
            FunctionName=function_name,
            Payload=json.dumps(function_params),
            LogType='Tail' if get_log else 'None')
        logging.info("Invoked function %s.", function_name)
        return json.loads(response['Payload'].read().decode("utf-8"))
    
response = LambdaWrapper(lambda_client).invoke_function('url2s3',para)
s3path = response['headers']['s3path']

通过本工具,用户可以自动完成针对特定URL文件的自动S3上传。后续会继续改进本工具,以期实现批量数据的自动同步和异步上传等功能。

本篇作者

薛东

亚马逊云科技解决方案架构师,负责基于亚马逊云科技云平台的解决方案咨询和设计,目前在亚马逊云科技大中华区服务教育行业客户。专注于无服务,安全等技术方向。

赵安蓓

亚马逊云科技解决方案架构师,负责基于亚马逊云科技云平台的解决方案咨询和设计,机器学习TFC成员。在数据处理与建模领域有着丰富的实践经验,特别关注医疗领域的机器学习工程化与运用。