import os
import gzip
import time
import boto3
import json
import urllib.parse
from botocore.config import Config
# Module-scope S3 client so warm Lambda invocations reuse the same connection.
s3 = boto3.client('s3')
# Column order of the whitespace-delimited CDN access-log lines.
# NOTE(review): assumes no field contains embedded whitespace — confirm
# against the log producer.
_FIELD_NAMES = (
    'ClientTime', 'System', 'Host', 'URL', 'Bytes', 'ServerIP',
    'ClientIP', 'Country', 'Protocol', 'ASN', 'ErrorCode', 'NetCost',
)

def load_data_from_object(filename):
    """Parse a gzipped CDN access-log object into Timestream record dicts.

    Each line is whitespace-split into the 12 fields listed in
    _FIELD_NAMES and converted into one BIGINT measure record named
    'neatcost', dimensioned by ServerIP/ClientIP/protocol/ASN/Country/
    ErrorCode and timestamped with the line's ClientTime field.

    Args:
        filename: path or binary file-like object accepted by gzip.open().

    Returns:
        List of Timestream record dicts; None if the object cannot be
        opened or read at all. Malformed lines are skipped.
    """
    records = []
    try:
        with gzip.open(filename, 'rt') as f:
            for line_no, text in enumerate(f, 1):
                temp = text.split()
                if len(temp) < len(_FIELD_NAMES):
                    # Bug fix: a single short line used to raise IndexError,
                    # discarding every record in the file (None was returned,
                    # which then crashed write_records on len(None)).
                    print('load_data_from_log_file: skipping malformed line', line_no)
                    continue
                data_row = dict(zip(_FIELD_NAMES, temp))
                dimensions = [
                    {'Name': 'ServerIP', 'Value': data_row['ServerIP']},
                    {'Name': 'ClientIP', 'Value': data_row['ClientIP']},
                    {'Name': 'protocol', 'Value': data_row['Protocol']},
                    {'Name': 'ASN', 'Value': data_row['ASN']},
                    {'Name': 'Country', 'Value': data_row['Country']},
                    {'Name': 'ErrorCode', 'Value': data_row['ErrorCode']},
                ]
                records.append({
                    'Dimensions': dimensions,
                    # NOTE(review): 'neatcost' looks like a typo for 'netcost',
                    # but renaming would start a new Timestream series —
                    # confirm before changing.
                    'MeasureName': 'neatcost',
                    'MeasureValue': data_row['NetCost'],
                    'MeasureValueType': 'BIGINT',
                    # NOTE(review): ClientTime is passed through verbatim;
                    # Timestream's default TimeUnit is MILLISECONDS — confirm
                    # the log emits epoch milliseconds.
                    'Time': data_row['ClientTime'],
                })
    except Exception as e:
        print('load_data_from_log_file error:', str(e))
        return None
    return records
def write_records(data):
    """Batch-write Timestream records to CDN-Performance.ClientMetric.

    Timestream accepts at most 100 records per WriteRecords call, so the
    input is written in chunks of 100. Rejections are logged per record
    index; any other error is logged and the remaining chunks are still
    attempted (best-effort, matching the original behaviour).

    Args:
        data: list of Timestream record dicts. None/empty writes nothing.
    """
    if not data:
        # Guard: load_data_from_object() returns None when the object could
        # not be read; the old code crashed on len(None).
        print("write_records: no records to write")
        return
    session = boto3.Session()
    write_client = session.client(
        'timestream-write',
        config=Config(read_timeout=20,
                      max_pool_connections=5000,
                      retries={'max_attempts': 10}))
    written = 0
    # Bug fixes versus the original batching loop:
    #  - data[(i-1)*100:i*100-1] silently dropped the last record of every
    #    full batch (slice end is exclusive; the -1 is an off-by-one);
    #  - when len(data) was an exact multiple of 100 the tail batch was
    #    never written (e.g. len=100 wrote nothing at all).
    for start in range(0, len(data), 100):
        batch = data[start:start + 100]
        try:
            write_client.write_records(DatabaseName='CDN-Performance',
                                       TableName='ClientMetric',
                                       Records=batch,
                                       CommonAttributes={})
            written += len(batch)
            print("Have WriteRecords num:", written)
        except write_client.exceptions.RejectedRecordsException as err:
            print("RejectedRecords: ", err)
            for rr in err.response["RejectedRecords"]:
                print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
        except Exception as err:
            print("Error:", err)
def lambda_handler(event, context):
    """S3-trigger entry point: fetch the new log object, load it into Timestream.

    Expects a standard S3 event notification; only the first event record
    is processed (one object per notification).
    """
    bucket = event['Records'][0]['s3']['bucket']['name']
    # Object keys arrive URL-encoded in S3 events ('+' for spaces).
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        data = load_data_from_object(response['Body'])
        if data:
            write_records(data)
        else:
            # Bug fix: previously a parse failure (data is None) crashed
            # inside write_records with a TypeError that was then reported
            # below as an S3 get error.
            print('No records parsed from {}; nothing written.'.format(key))
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}.'.format(key, bucket))
        raise e
以同样的方式配置处理CloudFront日志函数TransformCFLogToTimestream，函数源码如下:
import os
import gzip
import time
import boto3
import json
import urllib.parse
from datetime import datetime
from botocore.config import Config
# Module-scope S3 client so warm Lambda invocations reuse the same connection.
s3 = boto3.client('s3')
# Column order of CloudFront standard access-log lines (v1.0 format).
_CF_FIELD_NAMES = (
    'date', 'time', 'x-edge-location', 'sc-bytes', 'c-ip', 'cs-method',
    'Host', 'cs-uri-stem', 'sc-status', 'Referer', 'User-Agent',
    'cs-uri-query', 'Cookie', 'x-edge-result-type', 'x-edge-request-id',
    'x-host-header', 'cs-protocol', 'cs-bytes', 'time-taken',
    'x-forwarded-for', 'ssl-protocol', 'ssl-cipher',
    'x-edge-response-result-type', 'cs-protocol-version', 'fle-status',
    'fle-encrypted-fields', 'c-port', 'time-to-first-byte',
    'x-edge-detailed-result-type', 'sc-content-type', 'sc-content-len',
    'sc-range-start', 'sc-range-end',
)

def load_data_from_object(filename):
    """Parse a gzipped CloudFront standard log into Timestream records.

    '#'-prefixed directive lines are skipped. Each data line yields one
    BIGINT 'FirstByteCost' record (time-to-first-byte converted to
    milliseconds), dimensioned by client IP, edge location and edge result
    type, and timestamped from the line's UTC date/time columns.

    Args:
        filename: path or binary file-like object accepted by gzip.open().

    Returns:
        List of Timestream record dicts; None if the object cannot be
        opened or read at all. Malformed lines are skipped.
    """
    from datetime import timezone  # local import keeps this edit self-contained
    print("load_data_from_log_file....")
    records = []
    try:
        with gzip.open(filename, 'rt') as f:
            for line_no, text in enumerate(f, 1):
                temp = text.split()
                # Skip blank lines (the old temp[0] crashed on them) and the
                # '#Version'/'#Fields' header lines.
                if not temp or temp[0].startswith('#'):
                    continue
                if len(temp) < len(_CF_FIELD_NAMES):
                    # Bug fix: a short line used to raise IndexError and throw
                    # away every record in the file; skip it instead.
                    print('load_data_from_log_file: skipping malformed line', line_no)
                    continue
                data_row = dict(zip(_CF_FIELD_NAMES, temp))
                dimensions = [
                    {'Name': 'ClientIP', 'Value': data_row['c-ip']},
                    {'Name': 'location', 'Value': data_row['x-edge-location']},
                    {'Name': 'ResultType', 'Value': data_row['x-edge-result-type']},
                ]
                # Bug fix: CloudFront log timestamps are UTC; the naive
                # strptime(...).timestamp() interpreted them in the host's
                # local zone, so pin tzinfo to UTC explicitly.
                when = datetime.strptime(
                    data_row['date'] + ' ' + data_row['time'],
                    "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
                records.append({
                    'Dimensions': dimensions,
                    'MeasureName': 'FirstByteCost',
                    # time-to-first-byte is reported in seconds; store ms.
                    'MeasureValue': str(int(float(data_row['time-to-first-byte']) * 1000)),
                    'MeasureValueType': 'BIGINT',
                    'Time': str(int(when.timestamp())),
                    # Bug fix: Timestream's default TimeUnit is MILLISECONDS,
                    # but 'Time' above is epoch seconds — declare the unit so
                    # records are not mis-dated/rejected.
                    'TimeUnit': 'SECONDS',
                })
    except Exception as e:
        print('load_data_from_log_file error:', str(e))
        return None
    return records
def write_records(data):
    """Batch-write Timestream records to CDN-Performance.CloudFrontMetric.

    Timestream accepts at most 100 records per WriteRecords call, so the
    input is written in chunks of 100. Rejections are logged per record
    index; any other error is logged and the remaining chunks are still
    attempted (best-effort, matching the original behaviour).

    Args:
        data: list of Timestream record dicts. None/empty writes nothing.
    """
    if not data:
        # Guard: load_data_from_object() returns None when the object could
        # not be read; the old code crashed on len(None).
        print("write_records: no records to write")
        return
    session = boto3.Session()
    write_client = session.client(
        'timestream-write',
        config=Config(read_timeout=20,
                      max_pool_connections=5000,
                      retries={'max_attempts': 10}))
    written = 0
    # Bug fixes versus the original batching loop:
    #  - data[(i-1)*100:i*100-1] silently dropped the last record of every
    #    full batch;
    #  - when len(data) was an exact multiple of 100 the tail batch was
    #    never written;
    #  - the remainder write targeted TableName 'ClientMetric' (copy-paste
    #    from the other loader) instead of 'CloudFrontMetric'.
    for start in range(0, len(data), 100):
        batch = data[start:start + 100]
        try:
            write_client.write_records(DatabaseName='CDN-Performance',
                                       TableName='CloudFrontMetric',
                                       Records=batch,
                                       CommonAttributes={})
            written += len(batch)
            print("Have WriteRecords num:", written)
        except write_client.exceptions.RejectedRecordsException as err:
            print("RejectedRecords: ", err)
            for rr in err.response["RejectedRecords"]:
                print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
        except Exception as err:
            print("Error:", err)
def lambda_handler(event, context):
    """S3-trigger entry point: fetch the new CloudFront log object and load it.

    Expects a standard S3 event notification; only the first event record
    is processed (one object per notification).
    """
    bucket = event['Records'][0]['s3']['bucket']['name']
    # Object keys arrive URL-encoded in S3 events ('+' for spaces).
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        data = load_data_from_object(response['Body'])
        if data:
            write_records(data)
        else:
            # Bug fix: previously a parse failure (data is None) crashed
            # inside write_records with a TypeError that was then reported
            # below as an S3 get error.
            print('No records parsed from {}; nothing written.'.format(key))
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e