AWS Cloud Operations Blog
Building a fully automated Dow Jones Asset Tracking System on AWS
Dow Jones is a global provider of news and business information, delivering content to consumers and organizations around the world across multiple formats, including print, digital, mobile and live events. Dow Jones has produced unrivaled quality content for more than 130 years and today has one of the world’s largest news gathering operations globally. It produces leading publications and products including the flagship Wall Street Journal, America’s largest newspaper by paid circulation; Factiva, Barron’s, MarketWatch, Mansion Global, Financial News, Dow Jones Risk & Compliance and Dow Jones Newswires.
The Challenge
Dow Jones (DJ) manages resources across multiple accounts in multiple AWS Regions. The DJ Operations team tracks Compute and Relational Database resources in their Asset Tracking system (a configuration management database, or CMDB). This helps the Operations team effectively monitor resource health, view resource utilization, perform cost analysis, and identify resource owners to troubleshoot issues in a given environment. The Operations team was looking for an automated way to maintain resource details spanning multiple AWS Regions and multiple AWS accounts in the Asset Tracking system whenever a resource is provisioned, terminated, or modified. The resources tracked include Amazon EC2 and AWS Lambda for Compute, and Amazon RDS for databases.
Previously, the Application teams would create requests for the Asset Management team, which would then add, update, or remove resources from the system manually. Asset inventory updates would only happen during business hours. While this method was adequate for on-premises work, AWS’s dynamic and elastic environments made it difficult to manually track and manage the resource inventory, and the Operations team could not rely on the data in the manually maintained Asset Tracking system.
The Solution
To overcome these issues, the DJ Operations team developed a fully automated Asset Tracking system that tracks resources as soon as they are provisioned, modified, or terminated. DJ has a comprehensive tagging strategy in place that keeps track of the owner, team, application stack, lifecycle policies, and environment, and the automated tracking system uses this resource metadata tagging to maintain a comprehensive view of all resources. Since tracking is no longer manual, the Asset Tracking system always holds up-to-date information about the AWS resources.
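For illustration only, a tag set following such a strategy might look like the snippet below; the tag keys and values shown are hypothetical, since the exact tags Dow Jones uses are not published in this post.

# Hypothetical example of a resource tag set following this strategy;
# the real keys and values used at Dow Jones are not published.
example_tags = {
    'Owner': 'jane.doe@example.com',
    'Team': 'platform-engineering',
    'ApplicationStack': 'newswires-api',
    'Lifecycle': 'retain-90-days',
    'Environment': 'production',
}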
The DJ Operations team decided to use serverless architecture to avoid having to manage the resources used by the Asset Tracking system. The implemented solution leverages AWS CloudTrail, Amazon S3, AWS Lambda, Amazon SNS, Amazon SQS, Amazon CloudWatch, and IAM.
The following diagram illustrates the Asset Tracking system’s seven-step workflow.
- Enable AWS CloudTrail in all existing accounts across all AWS Regions. DJ’s Landing Zone blueprint includes enablement of CloudTrail for new accounts.
- Set up AWS CloudTrail to deliver all logs to a centralized Amazon S3 bucket that exists in an isolated account with restricted access to ensure the integrity of the logs.
- The Amazon S3 bucket sends an event to an Amazon SNS topic for every PutObject event. The Amazon SNS topic is used in case there is a future need for a fan-out option for parallel processing.
- The S3 processor Lambda function processes all incoming CloudTrail events. It parses and filters specific events, such as create resource, update resource, and terminate resource, to an Amazon SQS queue. The Resource Describer Lambda function is meant for one-time processing in case of bulk processing of events.
- Once the relevant events are filtered by the Lambda function, they are queued and persisted in Amazon SQS for asset inventory processing. This follows a loosely coupled architecture, so applications can process events as they come in and move failed events to a dead-letter queue for later processing and analysis.
- CloudWatch scheduled events (a CloudWatch Event Rule) poll the Amazon SQS queue once a minute for new events. Scheduled events also help avoid API limit issues. If the queue has new messages, a Lambda function (SQS Poller) invokes a processor Lambda function (CMDB pusher) with a payload of Amazon SQS messages. One Lambda function invocation handles only one event from the queue. The Lambda function reads the event and, depending on the resource type, runs the corresponding describe call on the resource. The describe call collects all the tagging details for the Asset Tracking system and delivers the resource details to the Asset Tracking system as an API payload (a rough sketch of this step follows the list). Once all the steps have completed successfully, the same Lambda function removes the Amazon SQS message from the queue.
- The CMDB pusher Lambda function updates the current state of the resources in DJ’s CMDB.
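As a rough illustration of that describe-and-push step, the sketch below reads one filtered RunInstances event, collects the instance’s tags with a describe call, and hands the payload to a push_to_cmdb stub standing in for the Asset Tracking system API. It is a minimal sketch under those assumptions, not DJ’s actual CMDB pusher code.

import json
import boto3

def push_to_cmdb(payload):
    """Placeholder for the Asset Tracking system API call (hypothetical)."""
    print('Would send to CMDB:', payload)

def handle_message(message, queue_url, region):
    """Minimal sketch of the 'CMDB pusher' step: describe the resource named
    in a filtered CloudTrail event, collect its tags, deliver the payload to
    the Asset Tracking system, then remove the processed SQS message."""
    event = json.loads(message['Body'])
    if event['eventName'] == 'RunInstances':
        ec2 = boto3.client('ec2', region_name=event['awsRegion'])
        for item in event['responseElements']['instancesSet']['items']:
            described = ec2.describe_instances(InstanceIds=[item['instanceId']])
            instance = described['Reservations'][0]['Instances'][0]
            payload = {
                'instanceId': instance['InstanceId'],
                'instanceType': instance['InstanceType'],
                # Tagging details (owner, team, application stack, environment)
                'tags': {t['Key']: t['Value'] for t in instance.get('Tags', [])},
            }
            push_to_cmdb(payload)
    # Delete the message only after the CMDB update has succeeded
    boto3.client('sqs', region_name=region).delete_message(
        QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])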
The Lambda functions were developed in Python, the infrastructure was defined with HashiCorp Terraform, and deployments were performed with Jenkins.
An Alternate Solution
You can set up your own Asset Tracking system by running AWS CloudFormation in your own account. This alternate solution uses a serverless architecture with automated steps and handles create and terminate events for Amazon EC2 instances and Amazon RDS databases. You can customize it for your own use cases to handle additional events.
In your account, AWS CloudFormation sets up CloudTrail with an Amazon S3 bucket for storing CloudTrail events; two Lambda functions for processing events from Amazon S3 and Amazon SQS; an Amazon SQS queue for the filtered events; a DynamoDB table for asset tracking; and the required bucket policy, roles, and permissions.
---
AWSTemplateFormatVersion: 2010-09-09
Description: "AWS Management Blog"
Resources:
  # =================================================================
  # Lambda Role
  # =================================================================
  # Role used by the Lambda functions for execution
  LambdaRoleForEventsProcessing:
    Type: AWS::IAM::Role
    Properties:
      Policies:
        - PolicyName: LambdaPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - cloudwatch:*
                Resource: '*'
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                  - sqs:ReceiveMessage
                  - sqs:DeleteMessage
                  - sqs:GetQueueAttributes
                  - sqs:ChangeMessageVisibility
                Resource: !GetAtt EventQueue.Arn
              - Effect: Allow
                Action:
                  - dynamodb:GetItem
                  - dynamodb:PutItem
                  - dynamodb:DeleteItem
                Resource: !GetAtt cmdbDynamoDBTable.Arn
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal: { Service: lambda.amazonaws.com }
            Action:
              - sts:AssumeRole
  # Lambda function to process CloudTrail logs
  CloudTrailEventProcessor:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: !Sub |
          import io
          import gzip
          import json
          import boto3
          import os

          def get_records(session, bucket, key):
              """
              Loads a CloudTrail log file, decompresses it, and extracts its records.
              :param session: Boto3 session
              :param bucket: Bucket where log file is located
              :param key: Key to the log file object in the bucket
              :return: list of CloudTrail records
              """
              s3 = session.client('s3')
              response = s3.get_object(Bucket=bucket, Key=key)
              with io.BytesIO(response['Body'].read()) as obj:
                  with gzip.GzipFile(fileobj=obj) as logfile:
                      records = json.load(logfile)['Records']
              return records

          def handler(event, context):
              """
              Checks for API calls with RunInstances, TerminateInstances,
              CreateDBInstance and DeleteDBInstance in CloudTrail.
              If found, sends the matching records to SQS for processing.
              :param event: S3:ObjectCreated:Put notification event
              :return: 200, success if records process successfully
              """
              session = boto3.session.Session()
              sqs = session.client('sqs')
              event_to_track = ['RunInstances', 'TerminateInstances', 'CreateDBInstance', 'DeleteDBInstance']
              SQS_QUEUE_URL = os.environ['TASK_QUEUE_URL']
              # Get the S3 bucket and key for each log file contained in the event
              for event_record in event['Records']:
                  try:
                      bucket = event_record['s3']['bucket']['name']
                      key = event_record['s3']['object']['key']
                      print('Loading CloudTrail log file s3://{}/{}'.format(bucket, key))
                      records = get_records(session, bucket, key)
                      print('Number of records in log file: {}'.format(len(records)))
                      # Process the records and filter the relevant events
                      for record in records:
                          if record["eventName"] in event_to_track:
                              response = sqs.send_message(QueueUrl=SQS_QUEUE_URL,
                                                          MessageBody=json.dumps(record),
                                                          DelaySeconds=1
                                                          )
                  except Exception as e:
                      print(e)
                      return {'Exception status': str(e)}
                  else:
                      print("records processed successfully!!")
              return {
                  'statusCode': 200,
                  'body': json.dumps('records pushed successfully to SQS!!')
              }
      Handler: index.handler
      MemorySize: 1024
      Role: !GetAtt LambdaRoleForEventsProcessing.Arn
      Runtime: python3.7
      Timeout: 30 # seconds
      Environment:
        Variables:
          TASK_QUEUE_URL: !Ref EventQueue
  # Permission for the S3 bucket to invoke the Lambda
  LambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      Principal: s3.amazonaws.com
      FunctionName: !Ref CloudTrailEventProcessor
      SourceAccount: !Ref 'AWS::AccountId'
  # =================================================================
  # CloudTrail (logs)
  # =================================================================
  # S3 bucket where CloudTrail log files will be delivered
  CloudTrailBucket:
    Type: AWS::S3::Bucket
    Properties:
      NotificationConfiguration:
        LambdaConfigurations:
          - Function: !GetAtt CloudTrailEventProcessor.Arn
            Event: "s3:ObjectCreated:*"
            Filter:
              S3Key:
                Rules:
                  - Name: prefix
                    Value: !Sub AWSLogs/${AWS::AccountId}/CloudTrail/
                  - Name: suffix
                    Value: json.gz
  # Policy granting CloudTrail access to the S3 bucket
  CloudTrailBucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref CloudTrailBucket
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: AWSCloudTrailAclCheck
            Effect: Allow
            Principal: { Service: cloudtrail.amazonaws.com }
            Action: s3:GetBucketAcl
            Resource: !GetAtt CloudTrailBucket.Arn
          - Sid: AWSCloudTrailWrite
            Effect: Allow
            Principal: { Service: cloudtrail.amazonaws.com }
            Action: s3:PutObject
            Resource: !Sub "${CloudTrailBucket.Arn}/AWSLogs/${AWS::AccountId}/*"
            Condition: # ensure we have control of objects written to the bucket
              StringEquals:
                s3:x-amz-acl: "bucket-owner-full-control"
  # Trail to gather logs for all regions
  CloudTrail:
    Type: AWS::CloudTrail::Trail
    Properties:
      IsLogging: true
      IsMultiRegionTrail: true
      IncludeGlobalServiceEvents: true
      S3BucketName: !Ref CloudTrailBucket
      EnableLogFileValidation: true
    DependsOn:
      # Wait for the S3 bucket policy to be created, which implies
      # that the bucket itself has been created
      - CloudTrailBucketPolicy
  # =================================================================
  # Queue for trapping the events
  # =================================================================
  EventQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: "SQSQueueForTrackingEvents"
  # =================================================================
  # Processing queued events to DynamoDB
  # the next layer....
  # =================================================================
  cmdbDynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "instanceId"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "instanceId"
          KeyType: "HASH"
      ProvisionedThroughput:
        ReadCapacityUnits: "5"
        WriteCapacityUnits: "5"
      TableName: "CMDBv1"
  # Lambda function to process queued events into DynamoDB
  EventQueueProcessor:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: !Sub |
          import boto3
          import os
          import json
          from boto3.dynamodb.conditions import Key, Attr
          from botocore.exceptions import ClientError

          def handler(event, context):
              """
              Processes the API calls and persists items in DynamoDB or deletes them from DynamoDB.
              If the id is not found, the delete from DynamoDB is ignored.
              :return: 200, success if records process successfully
              """
              REGION = os.environ['AWS_REGION']
              print(REGION)
              dynamodb = boto3.resource("dynamodb", region_name=REGION)
              table = dynamodb.Table('CMDBv1')
              # Process the records from SQS to update DynamoDB
              for rec in event['Records']:
                  try:
                      record = json.loads(rec["body"])
                      if record["eventName"] == "RunInstances":
                          # New instance added
                          eventID = record["eventID"]
                          accountId = record["userIdentity"]["accountId"]
                          eventTime = record["eventTime"]
                          eventSource = record["eventSource"]
                          awsRegion = record["awsRegion"]
                          instanceType = record["requestParameters"]["instanceType"]
                          # Process each event; each event may have multiple items
                          for instance in record["responseElements"]["instancesSet"]["items"]:
                              response = table.put_item(
                                  Item={'eventID': eventID,
                                        'account_id': accountId,
                                        'eventTime': eventTime,
                                        'eventSource': eventSource,
                                        'awsRegion': awsRegion,
                                        'instanceType': instanceType,
                                        'instanceId': instance["instanceId"],
                                        'privateDnsName': instance["privateDnsName"],
                                        'subnetId': instance["subnetId"],
                                        'vpcId': instance["vpcId"],
                                        'details': str(instance)
                                        }
                              )
                      elif record["eventName"] == "TerminateInstances":
                          # Instance terminated
                          for instance in record["responseElements"]["instancesSet"]["items"]:
                              instanceId = instance["instanceId"]
                              response = table.delete_item(Key={"instanceId": instanceId})
                      elif record["eventName"] == "CreateDBInstance":
                          # DB instance added
                          response = table.put_item(
                              Item={'eventID': record["eventID"],
                                    'account_id': record["userIdentity"]["accountId"],
                                    'eventTime': record["eventTime"],
                                    'eventSource': record["eventSource"],
                                    'awsRegion': record["awsRegion"],
                                    'instanceType': record["requestParameters"]["dBInstanceClass"],
                                    'instanceId': record["responseElements"]["dbiResourceId"],
                                    'privateDnsName': record["responseElements"]["dBInstanceArn"],
                                    'vpcId': record["responseElements"]["dBSubnetGroup"]["vpcId"],
                                    'multiAZ': record["responseElements"]["multiAZ"],
                                    'engine': record["responseElements"]["engine"],
                                    'engineVersion': record["responseElements"]["engineVersion"],
                                    'dBInstanceClass': record["responseElements"]["dBInstanceClass"],
                                    'details': str(record["responseElements"])
                                    }
                          )
                      elif record["eventName"] == "DeleteDBInstance":
                          # DB instance terminated
                          var = record["responseElements"]["dbiResourceId"]
                          response = table.delete_item(Key={"instanceId": var})
                      else:
                          print("Not a valid event in the queue")
                  except ClientError as e:
                      print(e.response['Error']['Message'])
                      return {'statusCode': e.response['Error']['Code'], 'body': e.response['Error']['Message']}
                  else:
                      print("PutItem/DeleteItem succeeded:", response)
              return {'statusCode': 200, 'body': json.dumps('Lambda function succeeded!!')}
      Handler: index.handler
      MemorySize: 1024
      Role: !GetAtt LambdaRoleForEventsProcessing.Arn
      Runtime: python3.7
      Timeout: 30 # seconds
  # Permission for the SQS to invoke the Lambda
  LambdaInvokePermissionForSQS:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      Principal: sqs.amazonaws.com
      FunctionName: !Ref EventQueueProcessor
      SourceAccount: !Sub ${AWS::AccountId}
  LambdaFunctionEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      Enabled: true
      EventSourceArn: !GetAtt EventQueue.Arn
      FunctionName: !GetAtt EventQueueProcessor.Arn
Outputs:
  CloudTrailS3Bucket:
    Value: !Ref CloudTrailBucket
    Export:
      Name: CloudTrailS3Bucket
  LambdaRoleForEventsProcessing:
    Value: !Ref LambdaRoleForEventsProcessing
    Export:
      Name: LambdaRoleForEventsProcessing
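After the stack is created and a few tracked resources have been launched, you can spot-check the inventory directly. The short sketch below reads the CMDBv1 table defined in the template; the instance ID used is a placeholder.

import boto3

# Spot-check the asset inventory built by the template above.
# 'CMDBv1' and the 'instanceId' key come from the cmdbDynamoDBTable resource.
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('CMDBv1')

# Look up one tracked instance (placeholder ID)...
item = table.get_item(Key={'instanceId': 'i-0123456789abcdef0'}).get('Item')
print(item)

# ...or list everything currently tracked (a scan is fine for small tables).
for asset in table.scan().get('Items', []):
    print(asset['instanceId'], asset.get('instanceType'), asset.get('awsRegion'))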
Permissions CloudFormation template
---
AWSTemplateFormatVersion: 2010-09-09
Description: "AWS Management Blog Addendum"
Resources:
  BlogManagedPolicy:
    Type: 'AWS::IAM::ManagedPolicy'
    Properties:
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: AllowReadWrite
            Effect: Allow
            Action:
              - s3:Get*
              - s3:List*
              - s3:Put*
            Resource: !Join ["", ["arn:aws:s3:::", !ImportValue CloudTrailS3Bucket, "/AWSLogs/", !Ref "AWS::AccountId", "/*"]]
      Roles:
        - !ImportValue LambdaRoleForEventsProcessing
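You can deploy the two templates with any CloudFormation tooling you prefer; the boto3 sketch below is one possible approach, with the stack names and file names used as placeholders. Because the second template imports the exports of the first, the first stack must finish creating before the second is launched.

import boto3

# Rough deployment sketch; stack and file names are placeholders.
cfn = boto3.client('cloudformation')

with open('asset-tracking-main.yaml') as f:
    cfn.create_stack(
        StackName='asset-tracking-main',
        TemplateBody=f.read(),
        Capabilities=['CAPABILITY_IAM'],  # the template creates an IAM role
    )
cfn.get_waiter('stack_create_complete').wait(StackName='asset-tracking-main')

# The permissions template imports CloudTrailS3Bucket and
# LambdaRoleForEventsProcessing, so it is created second.
with open('asset-tracking-permissions.yaml') as f:
    cfn.create_stack(
        StackName='asset-tracking-permissions',
        TemplateBody=f.read(),
        Capabilities=['CAPABILITY_IAM'],
    )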
The following diagram illustrates this workflow:
- When a new Amazon EC2 or Amazon RDS instance is created, it generates a CloudTrail event for tracking.
- The cross-Region CloudTrail event is delivered to a centralized Amazon S3 bucket for event processing. The Amazon S3 bucket publishes the s3:ObjectCreated:Put event to Lambda by invoking the Lambda function, as specified in the bucket notification configuration; the Lambda permission created by the template allows Amazon S3 to invoke the function.
- Lambda executes the CloudTrailEventProcessor function by assuming the execution role created by AWS CloudFormation. The function reads the Amazon S3 event it receives as a parameter, determines where the CloudTrail object is, reads the CloudTrail object, and processes its log records. If the log includes a record with one of the tracked eventName values, it publishes the record to your Amazon SQS queue for further processing.
- Once events arrive in the Amazon SQS queue, the event source mapping triggers the EventQueueProcessor Lambda function to persist the created resource’s details.
- The EventQueueProcessor Lambda function takes the event from the queue, extracts the metadata that is critical for tracking the instance, and sends the payload to DynamoDB; the record fields involved are sketched after this list.
- DynamoDB persists the resource details for the instance lifecycle and, when the resource is terminated, the item is removed from DynamoDB.
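For reference, the fields that the EventQueueProcessor function reads from a RunInstances record are shown below in abbreviated form; the values are made up, and a real CloudTrail record carries many more fields.

# Abbreviated shape of a CloudTrail RunInstances record as read by
# EventQueueProcessor; values are illustrative only.
run_instances_record = {
    'eventName': 'RunInstances',
    'eventID': '11111111-2222-3333-4444-555555555555',
    'eventTime': '2020-01-01T00:00:00Z',
    'eventSource': 'ec2.amazonaws.com',
    'awsRegion': 'us-east-1',
    'userIdentity': {'accountId': '123456789012'},
    'requestParameters': {'instanceType': 't3.micro'},
    'responseElements': {
        'instancesSet': {
            'items': [{
                'instanceId': 'i-0123456789abcdef0',
                'privateDnsName': 'ip-10-0-0-10.ec2.internal',
                'subnetId': 'subnet-0abc1234',
                'vpcId': 'vpc-0abc1234',
            }],
        },
    },
}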
Conclusion
Using this approach, the Dow Jones Operations team is able to track their resource inventory automatically, without manual intervention. Even the resources that make up the Asset Tracking system are tracked by the system itself. Newly created, terminated, or modified resources are reflected in the inventory within seven minutes, so the Asset Tracking system always shows the current state of the environment.
About the authors
Sacheen Shah is a Lead Engineer at Dow Jones. He lives in New Jersey and helps engineering teams at Dow Jones determine how best to deploy their solutions in the cloud using serverless technology and modern methodology. When he isn’t working, he likes playing console games, watching sci-fi movies, and spending time with his family.
Utsav Joshi is a Technical Account Manager at AWS. He lives in New Jersey and enjoys working with AWS customers in solving architectural, operational, and cost optimization challenges. In his spare time, he enjoys traveling, road trips and playing with his kids.