“我们的网管系统只能接收和处理带 Excel 附件的邮件，AWS PHD（Personal Health Dashboard）只会发送纯文本/HTML 邮件，不符合要求。”
字节跳动（ByteDance）是一家领先的互联网科技公司，成立于 2012 年。旗下拥有抖音、TikTok、今日头条、西瓜视频、飞书、Lark 等产品，业务遍布全球多个国家和地区。
所有 DX 维护事件（AWS Health 服务类型为 DIRECTCONNECT 的事件）从各账号各 Region 的 Default Bus 统一转发至主账号 us-east-1 的自定义 EventBridge Bus（BD-DX-Notification）。
后续章节将逐层展开各组件的配置与代码细节，帮助读者复现完整架构：从 EventBridge 策略、Lambda 逻辑、DynamoDB 建表到 SES 发送机制，均提供可落地的配置与代码示例。
import boto3
import json
import hashlib
import time
import os
import tempfile
import random
from datetime import timedelta, datetime, timezone
from typing import TypedDict, NotRequired, Optional
from botocore.exceptions import ClientError
from openpyxl import Workbook
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
# ========================================
# 类型定义
# ========================================
# One UTC instant rendered in several forms, as produced by
# get_current_time_info(): an ISO-8601 "Z" string, Unix seconds,
# a human-readable string, the calendar date and the year-month.
TimeInfo = TypedDict(
    'TimeInfo',
    {
        'iso8601': str,
        'unix_timestamp': int,
        'human_readable': str,
        'date_only': str,
        'year_month': str,
    },
)
class CheckResult(TypedDict):
processed: bool
details: NotRequired[dict]
createdAt: NotRequired[str]
ageInDays: NotRequired[float]
# Per-event fields handed to record_event_processed() for the DynamoDB record.
EventDetails = TypedDict(
    'EventDetails',
    {
        'eventTypeCode': str,
        'startTime': str,
        'endTime': str,
        'region': str,
        'account': str,
        'resources': list[str],
        'ticketNumber': str,
        'emailStatus': str,
        'processingAttempts': int,
    },
)
# ========================================
# Configuration - the event filters below are fixed in code; everything
# else is read from environment variables when the module is imported.
# ========================================
EVENT_TYPE_LIST = [
    'AWS_DIRECTCONNECT_EMERGENCY_MAINTENANCE_SCHEDULED',
    'AWS_DIRECTCONNECT_MAINTENANCE_SCHEDULED',
    'AWS_DIRECTCONNECT_MAINTENANCE_CANCELLED'
]
SUPPORTED_EVENT_SOURCES = ['aws.health', 'bd.aws.health']

# Required environment variables
SENDER_EMAIL = os.environ.get('SENDER_EMAIL')
RECIPIENT_EMAILS_STR = os.environ.get('RECIPIENT_EMAILS')
DYNAMODB_TABLE_NAME = os.environ.get('DYNAMODB_TABLE_NAME')

# Optional environment variables (with defaults)
DEDUP_WINDOW_DAYS = int(os.environ.get('DEDUP_WINDOW_DAYS', '15'))
DATE_FORMAT = os.environ.get('DATE_FORMAT', '%Y/%m/%d %H:%M:%S')
TIMEZONE_OFFSET = int(os.environ.get('TIMEZONE_OFFSET', '8'))  # whole hours added to UTC; default UTC+8
MAINTENANCE_REASON = os.environ.get('MAINTENANCE_REASON', 'OS upgrade')
MAX_RETRY_ATTEMPTS = int(os.environ.get('MAX_RETRY_ATTEMPTS', '3'))

# Fail fast at import time if a required setting is missing.
if not SENDER_EMAIL:
    raise ValueError("SENDER_EMAIL environment variable is required")
if not RECIPIENT_EMAILS_STR:
    raise ValueError("RECIPIENT_EMAILS environment variable is required")
if not DYNAMODB_TABLE_NAME:
    raise ValueError("DYNAMODB_TABLE_NAME environment variable is required")

# Comma-separated list. Blank entries (e.g. from a trailing comma or ",,")
# are dropped; otherwise an empty string would be passed to SES as a
# destination address.
RECIPIENT_EMAILS = [
    address for raw in RECIPIENT_EMAILS_STR.split(',')
    if (address := raw.strip())
]
if not RECIPIENT_EMAILS:
    raise ValueError("RECIPIENT_EMAILS must contain at least one non-empty address")

# AWS client initialisation (module scope, so clients are created once per
# execution environment rather than per invocation).
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE_NAME)
ses_client = boto3.client('ses')
cloudwatch = boto3.client('cloudwatch')
# ========================================
# 工具函数
# ========================================
def get_current_time_info() -> TimeInfo:
    """Capture "now" in UTC and render it in every format the records use.

    Returns:
        TimeInfo with an ISO-8601 ``...Z`` string, integer Unix seconds, a
        ``YYYY-MM-DD HH:MM:SS UTC`` string, the date and the year-month.
    """
    moment = datetime.now(timezone.utc)
    render = moment.strftime
    return TimeInfo(
        iso8601=render('%Y-%m-%dT%H:%M:%SZ'),
        unix_timestamp=int(moment.timestamp()),
        human_readable=render('%Y-%m-%d %H:%M:%S UTC'),
        date_only=render('%Y-%m-%d'),
        year_month=render('%Y-%m'),
    )
def get_current_time_local() -> str:
    """Return the current time shifted by TIMEZONE_OFFSET hours, formatted with DATE_FORMAT.

    The shift is plain arithmetic on an aware UTC datetime, so the tzinfo
    stays UTC; a ``%z``/``%Z`` directive in DATE_FORMAT would still print UTC.
    """
    shifted = datetime.now(timezone.utc) + timedelta(hours=TIMEZONE_OFFSET)
    return shifted.strftime(DATE_FORMAT)
def get_dedup_window_timestamp() -> int:
    """Return the Unix timestamp (whole seconds) DEDUP_WINDOW_DAYS days before now."""
    lookback = timedelta(days=DEDUP_WINDOW_DAYS)
    return int((datetime.now(timezone.utc) - lookback).timestamp())
def generate_dedup_key(event_arn: str, event_type_code: str) -> str:
    """Join the event ARN and event type code as ``<arn>#<type>``.

    This string is the ``eventDedupKey`` item key, so the same ARN with a
    different type code (e.g. SCHEDULED vs CANCELLED) gets its own record.
    """
    return "{}#{}".format(event_arn, event_type_code)
def generate_ticket_number(start_time):
    """Build a ticket id of the form ``AWS-MMDD-NNNNNNNNNN``.

    Args:
        start_time: datetime of the maintenance start; its zero-padded month
            and day form the MMDD part.

    Returns:
        The ticket string. The 10-digit suffix comes from the module-level
        ``random`` generator; it is not cryptographically secure and
        uniqueness is not checked.
    """
    stamp = f"{start_time.month:02d}{start_time.day:02d}"
    suffix = random.randint(1000000000, 9999999999)
    ticket = f"AWS-{stamp}-{suffix}"
    print(f"[TICKET] Generated ticket number: {ticket}")
    print(f"[TICKET] Maintenance start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"[TICKET] Breakdown: MonthDay={stamp}, RandomNumber={suffix}")
    return ticket
def convert_time_with_timezone(time_str: str,
                               input_format: str = "%a, %d %b %Y %H:%M:%S %Z",
                               output_format: Optional[str] = None,
                               add_hours: int = 0) -> datetime:
    """Parse *time_str* with *input_format* and shift it by *add_hours* hours.

    The default format matches strings such as ``"Sat, 05 Jun 2021 15:10:09 GMT"``.
    ``%Z`` is only matched, not attached as tzinfo, so the result is a naive
    datetime holding the parsed wall-clock time (plus the shift).

    Args:
        time_str: Text to parse.
        input_format: ``datetime.strptime`` format for *time_str*.
        output_format: Ignored. Kept only so existing callers that pass it
            keep working; format the result with ``format_datetime`` instead.
        add_hours: Hours added to the parsed value (e.g. a UTC offset).

    Returns:
        The parsed, shifted naive datetime.

    Raises:
        ValueError / TypeError from parsing or shifting, logged and then re-raised.
    """
    try:
        parsed = datetime.strptime(time_str, input_format)
        # timedelta(hours=0) is a no-op, so no special case is needed for 0.
        return parsed + timedelta(hours=add_hours)
    except Exception as e:
        print(f"[ERROR] Time conversion failed: {e!r}")
        print(f"[ERROR] Input: {time_str}, Format: {input_format}")
        raise
def format_datetime(dt: datetime, format_str: str = None) -> str:
    """Render *dt* with ``strftime``; *format_str* defaults to the module's DATE_FORMAT."""
    pattern = DATE_FORMAT if format_str is None else format_str
    return dt.strftime(pattern)
# ========================================
# DynamoDB 操作
# ========================================
def check_event_processed_in_window(event_arn: str, event_type_code: str) -> CheckResult:
    """Decide whether this ARN + event-type pair was already handled recently.

    Reads the item keyed by ``generate_dedup_key(event_arn, event_type_code)``
    with a strongly consistent read and compares its ``createdAtTimestamp`` to
    the start of the DEDUP_WINDOW_DAYS look-back window.

    Returns:
        ``CheckResult(processed=True, details=<item>, createdAt=..., ageInDays=...)``
        when a record exists inside the window. Otherwise it returns
        ``CheckResult(processed=False)``: no record, a record older than the
        window, or a ``ResourceNotFoundException`` from DynamoDB.

    Raises:
        Any other DynamoDB ``ClientError`` or unexpected exception, after logging.

    NOTE(review): the record's ``emailStatus`` is not consulted, so an event
    whose email failed is still treated as a duplicate inside the window.
    """
    seconds_per_day = 24 * 60 * 60
    try:
        dedup_key = generate_dedup_key(event_arn, event_type_code)
        for line in (
            f"[CHECK] Checking event dedup key: {dedup_key}",
            f"[CHECK] Event ARN: {event_arn}",
            f"[CHECK] Event Type: {event_type_code}",
            f"[CHECK] Dedup window: {DEDUP_WINDOW_DAYS} days",
        ):
            print(line)

        window_start_ts = get_dedup_window_timestamp()
        now_ts = int(time.time())
        window_label = datetime.fromtimestamp(window_start_ts, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
        print(f"[CHECK] Window start: {window_label}")

        stored = table.get_item(
            Key={'eventDedupKey': dedup_key},
            ConsistentRead=True,
        ).get('Item')
        if not stored:
            print(f"[NEW] Event not found in DynamoDB (dedup_key: {dedup_key})")
            return CheckResult(processed=False)

        created_ts = int(stored.get('createdAtTimestamp', 0))
        age_days = (now_ts - created_ts) / seconds_per_day

        # Guard clause: a record older than the window no longer suppresses the event.
        if created_ts < window_start_ts:
            print(f"[INFO] Event found but outside window (age: {age_days:.2f} days)")
            return CheckResult(processed=False)

        created_at = stored.get('createdAt', 'Unknown')
        print(f"[DUPLICATE] Event found within {DEDUP_WINDOW_DAYS}-day window")
        print(f"[DUPLICATE] Original created: {created_at}")
        print(f"[DUPLICATE] Age: {age_days:.2f} days")
        print(f"[DUPLICATE] Ticket: {stored.get('ticketNumber', 'N/A')}")
        print(f"[DUPLICATE] Email status: {stored.get('emailStatus', 'unknown')}")
        return CheckResult(
            processed=True,
            details=stored,
            createdAt=created_at,
            ageInDays=age_days,
        )
    except ClientError as err:
        code = err.response['Error']['Code']
        if code != 'ResourceNotFoundException':
            print(f"[ERROR] DynamoDB error: {code}")
            raise
        print("[NEW] Event not found (table might not exist)")
        return CheckResult(processed=False)
    except Exception as err:
        print(f"[ERROR] Unexpected error in check: {err!r}")
        raise
def record_event_processed(event_arn: str,
                           event_type_code: str,
                           event_details: EventDetails,
                           event_source: str,
                           email_sent: bool) -> bool:
    """Write (overwrite) the audit/dedup record for one processed event.

    The item is keyed by ``generate_dedup_key(event_arn, event_type_code)`` and
    written with ``put_item``, which replaces any previous item. Because
    ``createdAt``/``createdAtTimestamp`` are set to now, this restarts the
    window that ``check_event_processed_in_window`` compares against.
    ``processingAttempts`` is the previously stored value plus one (1 for a
    new key, or when the previous value cannot be read).

    Args:
        event_arn: AWS Health event ARN.
        event_type_code: Event type code; part of the dedup key.
        event_details: Maintenance times, region, account, resources and ticket.
        event_source: EventBridge ``source`` of the triggering event.
        email_sent: Whether the SES notification succeeded; stored as
            ``emailStatus`` ('sent'/'failed') and ``emailSent``.

    Returns:
        True once the item is written. Failures raise instead of returning False.

    Raises:
        Any exception raised while building or writing the item, after a
        ``RecordFailed`` metric is emitted.
    """
    try:
        time_info = get_current_time_info()
        dedup_key = generate_dedup_key(event_arn, event_type_code)

        # Read the current attempt count (if a previous record exists).
        current_attempts = 1
        try:
            existing_item = table.get_item(
                Key={'eventDedupKey': dedup_key},
                ConsistentRead=True
            )
            if 'Item' in existing_item:
                current_attempts = int(existing_item['Item'].get('processingAttempts', 0)) + 1
        except Exception as lookup_error:
            # Best effort: the counter is informational, so a failed lookup
            # must not prevent the record from being written.
            print(f"[WARNING] Could not read previous attempt count, defaulting to 1: {lookup_error!r}")

        item = {
            # Primary key: combined dedup key
            'eventDedupKey': dedup_key,
            # Original identifiers (for queries and auditing)
            'eventArn': event_arn,
            'eventTypeCode': event_details.get('eventTypeCode', ''),
            # Record timestamps
            'createdAt': time_info['iso8601'],
            'createdAtTimestamp': time_info['unix_timestamp'],
            'createdDate': time_info['date_only'],
            'createdYearMonth': time_info['year_month'],
            'recordCreatedTime': time_info['human_readable'],
            # Event details
            'maintenanceStartTime': event_details.get('startTime', ''),
            'maintenanceEndTime': event_details.get('endTime', ''),
            'region': event_details.get('region', ''),
            'account': event_details.get('account', ''),
            'resources': event_details.get('resources', []),
            'ticketNumber': event_details.get('ticketNumber', ''),
            'eventSource': event_source,
            # Processing status
            'emailStatus': 'sent' if email_sent else 'failed',
            'emailSent': email_sent,
            'processingAttempts': current_attempts,
            'lastProcessedAt': time_info['iso8601'],
            # Metadata
            'recordVersion': 3,  # bumped to reflect the ARN + type-code dedup logic
            'processedBy': 'lambda-dx-maintenance-handler-v3',
            'dedupWindowDays': DEDUP_WINDOW_DAYS,
            'retentionPolicy': 'permanent',
            'auditEnabled': True
        }

        print(f"[RECORD] Recording to DynamoDB...")
        print(f"[RECORD] Dedup Key: {dedup_key}")
        print(f"[RECORD] Event ARN: {event_arn}")
        print(f"[RECORD] Event Type: {event_type_code}")
        print(f"[RECORD] Event Source: {event_source}")
        print(f"[RECORD] Ticket: {event_details.get('ticketNumber', 'N/A')}")
        print(f"[RECORD] Email Status: {'sent' if email_sent else 'failed'}")
        print(f"[RECORD] Attempt: {current_attempts}")
        print(f"[RECORD] Timestamp: {time_info['iso8601']}")

        table.put_item(Item=item)
        print(f"[SUCCESS] Event recorded successfully")

        # Emit CloudWatch metrics
        send_cloudwatch_metric('EventRecorded', 1)
        if not email_sent:
            send_cloudwatch_metric('EmailFailed', 1)
        return True
    except Exception as e:
        print(f"[ERROR] Failed to record event: {e!r}")
        send_cloudwatch_metric('RecordFailed', 1)
        raise
# ========================================
# Excel 生成
# ========================================
def create_excel_file(event_data: dict) -> str:
    """Write a one-row maintenance-notice workbook to a temporary ``.xlsx`` file.

    The sheet has a fixed header row followed by a single data row built from
    *event_data*. Time-column headers are labelled ``GMT+TIMEZONE_OFFSET``.
    Missing optional keys fall back to the defaults in ``data_row`` below.

    Args:
        event_data: Mapping of notice fields (``ticket_number``,
            ``start_time_local``, ``region``, ...). Values are written as-is.

    Returns:
        Path of the saved workbook. The file is created with ``delete=False``,
        so the caller must remove it.

    Raises:
        Any exception from building or saving the workbook. Any partially
        written file is removed first.
    """
    excel_path = None
    try:
        print("[EXCEL] Creating workbook...")
        wb = Workbook()
        ws = wb.active
        ws.title = "Maintenance Notice"

        # Header row. NOTE: time columns are labelled "GMT+N" (legacy requirement).
        headers = [
            "Sequence Number",
            f"Email SentTime (GMT+{TIMEZONE_OFFSET})",
            f"Maintenance StartTime (GMT+{TIMEZONE_OFFSET})",
            f"Maintenance EndTime (GMT+{TIMEZONE_OFFSET})",
            "Maintenance Ticket Number",
            "Impacted Links",
            "Maintenance Impact",
            "Urgency",
            "Type",
            "Vendor",
            "Fault Report",
            "Reason",
            "Region",
            "Account",
            "Event ARN"
        ]
        ws.append(headers)

        # Data row
        data_row = [
            event_data.get('sequence_number', 1),
            event_data.get('email_timestamp_local'),
            event_data.get('start_time_local'),
            event_data.get('end_time_local'),
            event_data.get('ticket_number'),
            event_data.get('affected_resources'),
            event_data.get('maintenance_impact', 'Yes'),
            event_data.get('urgency', 'Yes'),
            event_data.get('maint_type'),
            event_data.get('vendor', 'AWS'),
            event_data.get('any_wording', 'No'),
            event_data.get('maintenance_reason', MAINTENANCE_REASON),
            event_data.get('region'),
            event_data.get('account'),
            event_data.get('event_arn', '')
        ]
        ws.append(data_row)

        # Size each column to its longest value (+2 padding), capped at 50.
        # Empty (None) cells are ignored rather than measured as the text "None".
        for column in ws.columns:
            max_length = max(
                (len(str(cell.value)) for cell in column if cell.value is not None),
                default=0,
            )
            ws.column_dimensions[column[0].column_letter].width = min(max_length + 2, 50)

        # Reserve a unique path, close the handle, then let openpyxl write the
        # file by name. The workbook is not written while a second handle is
        # still open; on some platforms an open temporary file cannot be
        # reopened by name.
        with tempfile.NamedTemporaryFile(
            mode='w+b',
            suffix='.xlsx',
            prefix='AWS_DX_Maintenance_',
            delete=False
        ) as tmp_file:
            excel_path = tmp_file.name
        wb.save(excel_path)

        file_size = os.path.getsize(excel_path)
        print(f"[SUCCESS] Excel created: {excel_path}")
        print(f"[SUCCESS] File size: {file_size} bytes")
        return excel_path
    except Exception as e:
        print(f"[ERROR] Failed to create Excel: {e!r}")
        # Remove the partially written file before propagating the error.
        if excel_path and os.path.exists(excel_path):
            try:
                os.remove(excel_path)
            except OSError as cleanup_error:
                print(f"[WARNING] Failed to remove partial Excel file: {cleanup_error!r}")
        raise
# ========================================
# 邮件发送
# ========================================
def create_email_with_attachment(subject: str,
                                 body: str,
                                 attachment_path: str,
                                 attachment_filename: Optional[str] = None) -> str:
    """Build a raw MIME message (plain-text body plus one file attachment).

    From/To are taken from the module-level SENDER_EMAIL and RECIPIENT_EMAILS.
    ``.xlsx`` attachments are labelled with the Office Open XML spreadsheet
    MIME type, so consumers that dispatch on Content-Type recognise them as
    Excel. Any other file is sent as ``application/octet-stream``.

    Args:
        subject: Subject header.
        body: Plain-text body, encoded as UTF-8.
        attachment_path: Path of the file to attach.
        attachment_filename: Filename shown to recipients; defaults to the
            basename of *attachment_path*.

    Returns:
        The message serialized with ``as_string()``, as passed to SES
        ``send_raw_email``.

    Raises:
        Any exception from reading the attachment or building the message,
        logged and then re-raised.
    """
    xlsx_subtype = 'vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    try:
        msg = MIMEMultipart()
        msg['Subject'] = subject
        msg['From'] = SENDER_EMAIL
        msg['To'] = ', '.join(RECIPIENT_EMAILS)

        # Body
        msg.attach(MIMEText(body, 'plain', 'utf-8'))

        # Attachment
        if attachment_filename is None:
            filename = os.path.basename(attachment_path)
        else:
            filename = attachment_filename
        subtype = xlsx_subtype if filename.lower().endswith('.xlsx') else 'octet-stream'

        with open(attachment_path, 'rb') as f:
            attachment = MIMEApplication(f.read(), _subtype=subtype)
        attachment.add_header(
            'Content-Disposition',
            'attachment',
            filename=filename
        )
        msg.attach(attachment)

        print(f"[EMAIL] Message created successfully")
        print(f"[EMAIL] Attachment: {filename}")
        return msg.as_string()
    except Exception as e:
        print(f"[ERROR] Failed to create email: {e!r}")
        raise
def send_email_via_ses(subject: str,
                       body: str,
                       attachment_path: str,
                       attachment_filename: str = None,
                       max_retries: int = 3) -> bool:
    """Send the notice through SES ``send_raw_email``, retrying on failure.

    The raw MIME message is rebuilt on every attempt. SES errors with code
    ``MessageRejected``, ``MailFromDomainNotVerified`` or
    ``ConfigurationSetDoesNotExist`` abort immediately. Other errors back off
    ``2 ** attempt`` seconds between attempts.

    Returns:
        True on the first successful send, otherwise False. Each outcome emits
        a matching CloudWatch metric.
    """
    fatal_codes = (
        'MessageRejected',
        'MailFromDomainNotVerified',
        'ConfigurationSetDoesNotExist',
    )
    for attempt in range(1, max_retries + 1):
        final_attempt = attempt >= max_retries
        try:
            print(f"[EMAIL] Sending attempt {attempt}/{max_retries}")
            print(f"[EMAIL] From: {SENDER_EMAIL}")
            print(f"[EMAIL] To: {RECIPIENT_EMAILS}")
            payload = create_email_with_attachment(subject, body, attachment_path, attachment_filename)
            reply = ses_client.send_raw_email(
                Source=SENDER_EMAIL,
                Destinations=RECIPIENT_EMAILS,
                RawMessage={'Data': payload},
            )
            print(f"[SUCCESS] Email sent! MessageId: {reply['MessageId']}")
            send_cloudwatch_metric('EmailSent', 1)
            return True
        except ClientError as err:
            code = err.response['Error']['Code']
            detail = err.response['Error']['Message']
            print(f"[ERROR] SES ClientError (attempt {attempt}): {code} - {detail}")
            if code in fatal_codes:
                print("[ERROR] Non-retryable error, stopping attempts")
                send_cloudwatch_metric('EmailFailedNonRetryable', 1)
                return False
            if final_attempt:
                send_cloudwatch_metric('EmailFailedMaxRetries', 1)
            else:
                delay = 2 ** attempt  # exponential backoff
                print(f"[RETRY] Waiting {delay}s before retry...")
                time.sleep(delay)
        except Exception as err:
            print(f"[ERROR] Unexpected error (attempt {attempt}): {err!r}")
            if final_attempt:
                send_cloudwatch_metric('EmailFailedUnexpected', 1)
            else:
                time.sleep(2 ** attempt)
    print(f"[FAILED] All {max_retries} email attempts failed")
    return False
# ========================================
# CloudWatch 指标
# ========================================
def send_cloudwatch_metric(metric_name: str,
                           value: float = 1.0,
                           unit: str = 'Count',
                           dimensions: Optional[dict] = None):
    """Publish one data point to the ``DXMaintenance`` CloudWatch namespace.

    Best effort: any failure is logged as a warning and swallowed, so metrics
    never interrupt the main flow.

    Args:
        metric_name: CloudWatch metric name.
        value: Data point value.
        unit: CloudWatch unit string.
        dimensions: Optional ``{name: value}`` mapping, sent as metric
            dimensions when non-empty.
    """
    try:
        datum = dict(
            MetricName=metric_name,
            Value=value,
            Unit=unit,
            Timestamp=datetime.now(timezone.utc),
        )
        if dimensions:
            datum['Dimensions'] = [dict(Name=key, Value=val) for key, val in dimensions.items()]
        cloudwatch.put_metric_data(Namespace='DXMaintenance', MetricData=[datum])
        print(f"[METRIC] Sent: {metric_name} = {value}")
    except Exception as exc:
        print(f"[WARNING] Failed to send metric {metric_name}: {exc!r}")
# ========================================
# 主处理函数
# ========================================
def lambda_handler(event, context):
    """Process one AWS Health event and email a DX maintenance notice as an Excel attachment.

    Pipeline:
      1. Skip events whose ``source`` is not in SUPPORTED_EVENT_SOURCES.
      2. Skip events whose ``detail.eventTypeCode`` is not in EVENT_TYPE_LIST.
      3. Skip duplicates already recorded within DEDUP_WINDOW_DAYS
         (keyed on event ARN + event type code).
      4-5. Read fields and shift maintenance/email times by TIMEZONE_OFFSET hours.
      6-9. Generate a ticket number and build the Excel attachment.
      10. Send the email via SES.
      11. Record the outcome in DynamoDB whether or not the email succeeded.

    Args:
        event: Event dict. Reads ``source``, ``region``, ``account``,
            ``resources`` and ``detail.eventTypeCode/eventArn/startTime/endTime``.
        context: Lambda context object (not used).

    Returns:
        Dict with ``statusCode`` and a JSON-encoded ``body``:
        200 when skipped, or processed with the email sent;
        206 when the email failed but the event was recorded;
        400 when a KeyError occurs anywhere in processing (typically a
        missing event field);
        500 on any other exception.

    NOTE(review): errors are returned as 4xx/5xx-shaped dicts rather than
    raised, so the invocation itself completes normally; presumably Lambda
    async retries / DLQ will not engage - confirm this is intended.
    """
    # Log configuration and the full incoming event for troubleshooting.
    print("=" * 80)
    print("[START] Lambda function invoked")
    print(f"[CONFIG] Dedup window: {DEDUP_WINDOW_DAYS} days")
    print(f"[CONFIG] Timezone offset: UTC+{TIMEZONE_OFFSET}")
    print(f"[CONFIG] Max retries: {MAX_RETRY_ATTEMPTS}")
    print(f"[EVENT] Raw event: {json.dumps(event, default=str, indent=2)}")
    print("=" * 80)
    # Initialised before the try so the finally clause can always inspect it.
    excel_path = None
    try:
        # ========================================
        # 1. Validate the event source
        # ========================================
        event_source = event.get('source', '')
        print(f"[VALIDATE] Event source: {event_source}")
        if event_source not in SUPPORTED_EVENT_SOURCES:
            print(f"[SKIP] Unsupported event source")
            send_cloudwatch_metric('EventSkipped', 1, dimensions={'Reason': 'UnsupportedSource'})
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Event source not supported',
                    'eventSource': event_source,
                    'supportedSources': SUPPORTED_EVENT_SOURCES
                })
            }
        # ========================================
        # 2. Validate the event type
        # ========================================
        event_code = event['detail']['eventTypeCode']
        print(f"[VALIDATE] Event type: {event_code}")
        if event_code not in EVENT_TYPE_LIST:
            print(f"[SKIP] Event type not in monitoring list")
            send_cloudwatch_metric('EventSkipped', 1, dimensions={'Reason': 'UnmonitoredType'})
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Event type not monitored',
                    'eventTypeCode': event_code,
                    'monitoredTypes': EVENT_TYPE_LIST
                })
            }
        # ========================================
        # 3. Dedup check (keyed on event_arn + event_type_code)
        # ========================================
        event_arn = event['detail']['eventArn']
        print(f"[VALIDATE] Event ARN: {event_arn}")
        print(f"[VALIDATE] Event Type Code: {event_code}")
        print(f"[VALIDATE] Dedup Key: {generate_dedup_key(event_arn, event_code)}")
        check_result = check_event_processed_in_window(event_arn, event_code)
        if check_result['processed']:
            print(f"[SKIP] Duplicate event within {DEDUP_WINDOW_DAYS}-day window")
            print(f"[SKIP] Same event_arn AND event_type_code already processed")
            send_cloudwatch_metric('EventSkipped', 1, dimensions={'Reason': 'Duplicate'})
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': f'Duplicate event (within {DEDUP_WINDOW_DAYS}-day window)',
                    'eventArn': event_arn,
                    'eventTypeCode': event_code,
                    'dedupKey': generate_dedup_key(event_arn, event_code),
                    'eventSource': event_source,
                    'action': 'skipped',
                    'dedupWindowDays': DEDUP_WINDOW_DAYS,
                    'originalProcessingTime': check_result.get('createdAt'),
                    'recordAgeInDays': round(check_result.get('ageInDays', 0), 2),
                    'previousTicketNumber': check_result['details'].get('ticketNumber') if check_result.get('details') else None,
                    'previousEmailStatus': check_result['details'].get('emailStatus') if check_result.get('details') else None
                # default=str: the stored record may hold values json cannot
                # encode natively (presumably DynamoDB Decimal numbers).
                }, default=str)
            }
        print(f"[PROCESS] New event (or new event type for same ARN), proceeding with processing...")
        send_cloudwatch_metric('EventProcessing', 1)
        # ========================================
        # 4. Extract event fields
        # ========================================
        start_time_raw = event['detail']['startTime']
        end_time_raw = event['detail']['endTime']
        ls_resources = event.get('resources', [])
        affected_resources = ', '.join(ls_resources) if ls_resources else 'N/A'
        region = event.get('region', 'unknown')
        account = event.get('account', 'unknown')
        print(f"[INFO] Region: {region}")
        print(f"[INFO] Account: {account}")
        print(f"[INFO] Resources: {affected_resources}")
        # ========================================
        # 5. Time handling
        # ========================================
        print("[PROCESS] Converting timestamps...")
        # Convert maintenance times (UTC -> UTC+TIMEZONE_OFFSET). The parsed
        # values are naive datetimes; the offset is applied as plain arithmetic.
        start_time_utc = convert_time_with_timezone(start_time_raw)
        end_time_utc = convert_time_with_timezone(end_time_raw)
        start_time_local = start_time_utc + timedelta(hours=TIMEZONE_OFFSET)
        end_time_local = end_time_utc + timedelta(hours=TIMEZONE_OFFSET)
        # Current time (used as the email send time), shifted to UTC+TIMEZONE_OFFSET.
        email_timestamp_utc = datetime.now(timezone.utc)
        email_timestamp_local = email_timestamp_utc + timedelta(hours=TIMEZONE_OFFSET)
        print(f"[TIME] Maintenance start (UTC): {format_datetime(start_time_utc)}")
        print(f"[TIME] Maintenance start (UTC+{TIMEZONE_OFFSET}): {format_datetime(start_time_local)}")
        print(f"[TIME] Email timestamp (UTC+{TIMEZONE_OFFSET}): {format_datetime(email_timestamp_local)}")
        # ========================================
        # 6. Generate the ticket number (from the local maintenance start time)
        # ========================================
        ticket_number = generate_ticket_number(start_time_local)
        # ========================================
        # 7. Determine the maintenance type
        # ========================================
        maint_type = "Cancel" if event_code == 'AWS_DIRECTCONNECT_MAINTENANCE_CANCELLED' else "New"
        print(f"[INFO] Maintenance type: {maint_type}")
        # ========================================
        # 8. Prepare the Excel row data
        # ========================================
        event_data = {
            'sequence_number': 1,
            'email_timestamp_local': format_datetime(email_timestamp_local),
            'start_time_local': format_datetime(start_time_local),
            'end_time_local': format_datetime(end_time_local),
            'ticket_number': ticket_number,
            'affected_resources': affected_resources,
            'maintenance_impact': 'Yes',
            'urgency': 'Yes',
            'maint_type': maint_type,
            'vendor': 'AWS',
            'any_wording': 'No',
            'maintenance_reason': MAINTENANCE_REASON,
            'region': region,
            'account': account,
            'event_arn': event_arn
        }
        # ========================================
        # 9. Generate the Excel file
        # ========================================
        print("[PROCESS] Creating Excel file...")
        excel_path = create_excel_file(event_data)
        # ========================================
        # 10. Send the email
        # ========================================
        print("[PROCESS] Sending email notification...")
        subject = f'AWS Direct Connect Maintenance Notice - {ticket_number}'
        body = f"""Please check the attached summary of the AWS Direct Connect maintenance notice.
Maintenance Details:
{'=' * 60}
Ticket Number: {ticket_number}
Event Type: {maint_type}
Region: {region}
Account: {account}
Start Time: {format_datetime(start_time_local)} (UTC+{TIMEZONE_OFFSET})
End Time: {format_datetime(end_time_local)} (UTC+{TIMEZONE_OFFSET})
Affected Resources: {affected_resources}
Maintenance Type: {event_code}
Event Source: {event_source}
Event ARN: {event_arn}
{'=' * 60}
Important Notes:
- This is an automated notification from the DX Maintenance Handler system
- Duplicate notifications within {DEDUP_WINDOW_DAYS} days are automatically filtered
- Note: Same event ARN with different event types (e.g., SCHEDULED vs CANCELLED) will be sent separately
- All events are permanently logged for audit purposes
- Processing attempts are tracked for reliability
For questions or issues, please contact AWS Support.
---
This message was generated at {format_datetime(email_timestamp_local)} (UTC+{TIMEZONE_OFFSET})
"""
        email_sent = send_email_via_ses(
            subject=subject,
            body=body,
            attachment_path=excel_path,
            attachment_filename=f'DX_Maintenance_{ticket_number}.xlsx',
            max_retries=MAX_RETRY_ATTEMPTS
        )
        # ========================================
        # 11. Record to DynamoDB (whether or not the email succeeded)
        # ========================================
        # NOTE(review): if recording raises after the email was sent, this
        # handler returns 500 and no dedup record exists, so a later delivery
        # of the same event would send the notice again - confirm acceptable.
        print("[PROCESS] Recording event to DynamoDB...")
        event_details = EventDetails(
            eventTypeCode=event_code,
            startTime=start_time_raw,
            endTime=end_time_raw,
            region=region,
            account=account,
            resources=ls_resources,
            ticketNumber=ticket_number,
            emailStatus='sent' if email_sent else 'failed',
            processingAttempts=1
        )
        # record_event_processed either returns True or raises, so
        # record_success is True whenever execution continues past this call.
        record_success = record_event_processed(
            event_arn=event_arn,
            event_type_code=event_code,
            event_details=event_details,
            event_source=event_source,
            email_sent=email_sent
        )
        # ========================================
        # 12. Build the response
        # ========================================
        time_info = get_current_time_info()
        result = {
            'statusCode': 200 if email_sent else 206,  # 206 = Partial Content (email failed but the event was recorded)
            'body': json.dumps({
                'message': 'Processing complete' if email_sent else 'Processing complete with email failure',
                'eventArn': event_arn,
                'eventTypeCode': event_code,
                'dedupKey': generate_dedup_key(event_arn, event_code),
                'eventSource': event_source,
                'ticketNumber': ticket_number,
                'maintenanceType': maint_type,
                'emailSent': email_sent,
                'recordedInDynamoDB': record_success,
                'recordTimestamp': time_info['iso8601'],
                'dedupWindowDays': DEDUP_WINDOW_DAYS,
                'retentionPolicy': 'permanent',
                'action': 'processed',
                'handlerVersion': 'v3.0',
                'processingTimestamp': time_info['iso8601'],
                'timezoneOffset': f'UTC+{TIMEZONE_OFFSET}'
            })
        }
        print("=" * 80)
        print(f"[SUCCESS] Lambda execution completed")
        print(f"[RESULT] Email: {'✓ Sent' if email_sent else '✗ Failed'}")
        print(f"[RESULT] DynamoDB: {'✓ Recorded' if record_success else '✗ Failed'}")
        print(f"[RESULT] {json.dumps(result, indent=2)}")
        print("=" * 80)
        return result
    except KeyError as e:
        # Any KeyError in the try body lands here, most often a missing event field.
        error_msg = f"Missing required field: {e!r}"
        print(f"[ERROR] {error_msg}")
        send_cloudwatch_metric('ProcessingError', 1, dimensions={'ErrorType': 'KeyError'})
        return {
            'statusCode': 400,
            'body': json.dumps({
                'error': 'Invalid event format',
                'details': error_msg,
                'missingField': str(e)
            })
        }
    except Exception as e:
        error_msg = str(e)
        print(f"[ERROR] Unexpected error: {e!r}")
        send_cloudwatch_metric('ProcessingError', 1, dimensions={'ErrorType': 'Unexpected'})
        import traceback
        traceback.print_exc()
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': 'Internal processing error',
                'details': error_msg,
                'timestamp': datetime.now(timezone.utc).isoformat()
            })
        }
    finally:
        # ========================================
        # Remove the temporary Excel file on every exit path
        # ========================================
        if excel_path and os.path.exists(excel_path):
            try:
                os.remove(excel_path)
                print(f"[CLEANUP] Removed temporary file: {excel_path}")
            except Exception as e:
                print(f"[WARNING] Failed to remove temporary file: {e!r}")