AWS Cloud Operations Blog

Building a fully automated Dow Jones Asset Tracking System on AWS

Dow Jones is a global provider of news and business information, delivering content to consumers and organizations around the world across multiple formats, including print, digital, mobile, and live events. Dow Jones has produced unrivaled quality content for more than 130 years and today has one of the world’s largest news-gathering operations. It produces leading publications and products including the flagship Wall Street Journal, America’s largest newspaper by paid circulation; Factiva; Barron’s; MarketWatch; Mansion Global; Financial News; Dow Jones Risk & Compliance; and Dow Jones Newswires.

The Challenge

Dow Jones (DJ) manages resources across multiple accounts in multiple AWS Regions. The DJ Operations team tracks Compute and Relational Database resources in its Asset Tracking system (a configuration management database, or CMDB). This helps the Operations team monitor resource health, view resource utilization, perform cost analysis, and identify resource owners when troubleshooting issues in a given environment. The Operations team wanted an automated way to keep resource details current in the Asset Tracking system, across multiple AWS Regions and AWS accounts, whenever a resource is provisioned, terminated, or modified. The resources tracked include Amazon EC2 and AWS Lambda for Compute, and Amazon RDS for databases.

Previously, the Application teams would create requests for the Asset Management team, which would then add, update, or remove resources from the system manually. Asset inventory updates would only happen during business hours. While this method was adequate for on-premises work, AWS’s dynamic and elastic environments made it difficult to track and manage the resource inventory manually. As a result, the Operations team could not rely on the data in the manually managed Asset Tracking system.

The Solution

To overcome these issues, the DJ Operations team developed a fully automated Asset Tracking system that tracks resources as soon as they are provisioned, modified, or terminated. DJ has a comprehensive tagging strategy in place that keeps track of the owner, team, application stack, lifecycle policies, and environment. The automated tracking system uses this resource metadata tagging to build a comprehensive view of all resources. Since tracking is no longer manual, the Asset Tracking system always reflects the current state of the AWS resources.
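
Tag lookups of this kind are simple API calls. As a minimal illustration (not DJ’s actual code), the following boto3 sketch reads the tags of a single EC2 instance so they can be attached to an asset record; the instance ID and tag keys are placeholders:

import boto3

def get_instance_tags(instance_id, region="us-east-1"):
    """Return the tags of one EC2 instance as a plain dict (illustrative sketch)."""
    ec2 = boto3.client("ec2", region_name=region)
    response = ec2.describe_tags(
        Filters=[{"Name": "resource-id", "Values": [instance_id]}]
    )
    # Convert the Tags list [{'Key': ..., 'Value': ...}, ...] into a dict
    return {tag["Key"]: tag["Value"] for tag in response["Tags"]}

# Example: look up owner/team/environment tags for an asset record
# ("i-0123456789abcdef0" is a placeholder instance ID)
tags = get_instance_tags("i-0123456789abcdef0")
print(tags.get("owner"), tags.get("team"), tags.get("environment"))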

The DJ Operations team chose a serverless architecture so they would not have to manage the infrastructure behind the Asset Tracking system itself. The implemented solution uses AWS CloudTrail, Amazon S3, AWS Lambda, Amazon SNS, Amazon SQS, Amazon CloudWatch, and IAM.

The following diagram illustrates the Asset Tracking system’s seven-step workflow.

  1. Enable AWS CloudTrail in all existing accounts across all AWS Regions. DJ’s Landing Zone blueprint enables CloudTrail automatically for new accounts.
  2. Set up AWS CloudTrail to deliver all logs to a centralized Amazon S3 bucket that exists in an isolated account with restricted access to ensure integrity of the logs.
  3. The Amazon S3 bucket publishes a notification to an Amazon SNS topic for every PutObject event. The Amazon SNS topic is used so that a fan-out option for parallel processing is available if needed in the future.
  4. The S3 processor Lambda function processes all incoming CloudTrail log files. It parses the records, filters specific events such as create resource, update resource, and terminate resource, and forwards them to an Amazon SQS queue. The Resource Describer Lambda function is reserved for one-time bulk processing of events.
  5. The filtered events are queued and persisted in Amazon SQS for asset inventory processing. This loosely coupled architecture lets applications process events as they arrive and move failed events to a dead-letter queue for later processing and analysis.
  6. A CloudWatch scheduled event (CloudWatch Events rule) polls the Amazon SQS queue every minute for new events; the schedule also helps avoid API limit issues. If the queue has new messages, the SQS Poller Lambda function invokes a processor Lambda function (CMDB pusher) with a payload of Amazon SQS messages. Each invocation handles a single event from the queue. Depending on the resource type, the Lambda function runs a describe call on the resource, collects all of the tagging details for the Asset Tracking system, and delivers the resource details to the Asset Tracking system as an API payload (a minimal sketch of this poll-describe-push pattern follows this list). Once all steps complete successfully, the same Lambda function removes the message from the Amazon SQS queue.
  7. The CMDB pusher Lambda function updates the current state of the resources in DJ’s CMDB.
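
DJ’s SQS Poller and CMDB pusher functions are internal and are not published with this post. The following is a minimal sketch of the poll-describe-push pattern from steps 4–6, assuming a hypothetical queue URL and a stand-in push_to_cmdb function in place of DJ’s real CMDB API call:

import json
import boto3

sqs = boto3.client("sqs")
ec2 = boto3.client("ec2")

# Placeholder queue URL; DJ's real queue holds the filtered CloudTrail events
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/111122223333/asset-events"

def push_to_cmdb(payload):
    """Stand-in for the CMDB API call; the real pusher posts this payload to the CMDB."""
    print("CMDB payload:", json.dumps(payload))

def handler(event, context):
    # Invoked on a schedule; the incoming event is ignored and the queue is polled instead
    messages = sqs.receive_message(
        QueueUrl=QUEUE_URL, MaxNumberOfMessages=1
    ).get("Messages", [])
    for msg in messages:
        record = json.loads(msg["Body"])
        if record["eventName"] == "RunInstances":
            for item in record["responseElements"]["instancesSet"]["items"]:
                # Describe the instance to pick up its current tags and state
                reservations = ec2.describe_instances(
                    InstanceIds=[item["instanceId"]]
                )["Reservations"]
                instance = reservations[0]["Instances"][0]
                push_to_cmdb({
                    "instanceId": item["instanceId"],
                    "tags": {t["Key"]: t["Value"] for t in instance.get("Tags", [])},
                    "state": instance["State"]["Name"],
                })
        # Remove the message only after it has been processed successfully
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])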

The Lambda functions were written in Python, the infrastructure was defined with HashiCorp Terraform, and deployments were done through Jenkins.

An Alternate Solution

You can set up your own Asset Tracking system by deploying the following AWS CloudFormation templates in your own account. This alternate solution also uses a serverless architecture with automated steps. It handles create and terminate events for Amazon EC2 instances and Amazon RDS databases, and you can customize it for your own use cases to track additional events (a customization sketch follows the templates).

In your account, AWS CloudFormation sets up a CloudTrail trail and an Amazon S3 bucket for storing CloudTrail logs; two Lambda functions for processing events from Amazon S3 and Amazon SQS; an Amazon SQS queue for the filtered events; a DynamoDB table for asset tracking; and the required bucket policy, roles, and permissions.

---
AWSTemplateFormatVersion: 2010-09-09
Description: "AWS Management Blog"

Resources:

  # =================================================================
  # Lambda Role
  # =================================================================

  # Role used by the Lambda functions for execution
  LambdaRoleForEventsProcessing:
    Type: AWS::IAM::Role
    Properties:
      Policies:
      - PolicyName: LambdaPolicy
        PolicyDocument:
          Version: 2012-10-17
          Statement:
          - Effect: Allow
            Action:
            - cloudwatch:*
            Resource: '*'
          - Effect: Allow
            Action:
            - logs:CreateLogGroup
            - logs:CreateLogStream
            - logs:PutLogEvents
            Resource: '*'
          - Effect: Allow
            Action:
            - sqs:SendMessage
            - sqs:ReceiveMessage
            - sqs:DeleteMessage
            - sqs:GetQueueAttributes
            - sqs:ChangeMessageVisibility
            Resource: !GetAtt EventQueue.Arn
          - Effect: Allow
            Action:
            - dynamodb:GetItem
            - dynamodb:PutItem
            - dynamodb:DeleteItem
            Resource: !GetAtt cmdbDynamoDBTable.Arn
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
        - Effect: Allow
          Principal: { Service: lambda.amazonaws.com }
          Action:
          - sts:AssumeRole

  # Lambda function to process CloudTrail logs
  CloudTrailEventProcessor:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: !Sub |
          import io
          import gzip
          import json
          import boto3
          import os

          def get_records(session, bucket, key):
              """
              Loads a CloudTrail log file, decompresses it, and extracts its records.
              :param session: Boto3 session
              :param bucket: Bucket where log file is located
              :param key: Key to the log file object in the bucket
              :return: list of CloudTrail records
              """
              s3 = session.client('s3')
              response = s3.get_object(Bucket=bucket, Key=key)

              with io.BytesIO(response['Body'].read()) as obj:
                  with gzip.GzipFile(fileobj=obj) as logfile:
                      records = json.load(logfile)['Records']
                      return records

          def handler(event, context):
              """
              Checks for API calls with RunInstances, TerminateInstances,
              CreateDBInstance and DeleteDBInstance in CloudTrail.
              if found, send specific records to SQS for processing

              :param event: S3:ObjectCreated:Put notification event
              :return: 200, success if records process successfully
              """
              session = boto3.session.Session()
              sqs = session.client('sqs')
              event_to_track = ['RunInstances', 'TerminateInstances', 'CreateDBInstance', 'DeleteDBInstance']
              SQS_QUEUE_URL = os.environ['TASK_QUEUE_URL']

              # Get the S3 bucket and key for each log file contained in the event
              for event_record in event['Records']:
                  try:
                      bucket = event_record['s3']['bucket']['name']
                      key = event_record['s3']['object']['key']
                      print('Loading CloudTrail log file s3://{}/{}'.format(bucket, key))
                      records = get_records(session, bucket, key)
                      print('Number of records in log file: {}'.format(len(records)))
                      # process records and filter the relevant events
                      for record in records:
                          if record["eventName"] in event_to_track:
                              response = sqs.send_message(QueueUrl=SQS_QUEUE_URL,
                                          MessageBody=json.dumps(record),
                                          DelaySeconds=1
                              )
                  except Exception as e:
                      print(e)
                      # Return the error message as a string so the response stays JSON-serializable
                      return {'Exception status': str(e)}
                  else:
                      print("records processed successfully!!")

              return {
                  'statusCode': 200,
                  'body': json.dumps('records pushed successfully to SQS!!')
              }
      Handler: index.handler
      MemorySize: 1024
      Role: !GetAtt LambdaRoleForEventsProcessing.Arn
      Runtime: python3.7
      Timeout: 30  # seconds
      Environment:
        Variables:
          TASK_QUEUE_URL: !Ref EventQueue

  # Permission for the S3 bucket to invoke the Lambda
  LambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      Principal: s3.amazonaws.com
      FunctionName: !Ref CloudTrailEventProcessor
      SourceAccount: !Ref 'AWS::AccountId'

  # =================================================================
  # CloudTrail (logs)
  # =================================================================

  # S3 bucket where CloudTrail log files will be delivered
  CloudTrailBucket:
    Type: AWS::S3::Bucket
    Properties:
      NotificationConfiguration:
        LambdaConfigurations:
        - Function: !GetAtt CloudTrailEventProcessor.Arn
          Event: "s3:ObjectCreated:*"
          Filter:
            S3Key:
              Rules:
                - Name: prefix
                  Value: !Sub AWSLogs/${AWS::AccountId}/CloudTrail/
                - Name: suffix
                  Value: json.gz

  # Policy granting CloudTrail access to the S3 bucket
  CloudTrailBucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref CloudTrailBucket
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: AWSCloudTrailAclCheck
            Effect: Allow
            Principal: { Service: cloudtrail.amazonaws.com }
            Action: s3:GetBucketAcl
            Resource: !GetAtt CloudTrailBucket.Arn
          - Sid: AWSCloudTrailWrite
            Effect: Allow
            Principal: { Service: cloudtrail.amazonaws.com }
            Action: s3:PutObject
            Resource: !Sub "${CloudTrailBucket.Arn}/AWSLogs/${AWS::AccountId}/*"
            Condition: # ensure we have control of objects written to the bucket
              StringEquals:
                s3:x-amz-acl: "bucket-owner-full-control"

  # Trail to gather logs for all regions
  CloudTrail:
    Type: AWS::CloudTrail::Trail
    Properties:
      IsLogging: true
      IsMultiRegionTrail: true
      IncludeGlobalServiceEvents: true
      S3BucketName: !Ref CloudTrailBucket
      EnableLogFileValidation: true
    DependsOn:
      # Wait for the S3 bucket policy to be created, which implies
      # that the bucket itself has been created
      - CloudTrailBucketPolicy

  # =================================================================
  # Queue for capturing the events
  # =================================================================

  EventQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: "SQSQueueForTrackingEvents"

  # =================================================================
  # Processing queued events into DynamoDB
  # =================================================================

  cmdbDynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        -
          AttributeName: "instanceId"
          AttributeType: "S"
      KeySchema:
        -
          AttributeName: "instanceId"
          KeyType: "HASH"
      ProvisionedThroughput:
        ReadCapacityUnits: "5"
        WriteCapacityUnits: "5"
      TableName: "CMDBv1"

  # Lambda function to process queued events and update DynamoDB
  EventQueueProcessor:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: !Sub |
          import boto3
          import os
          import json
          from boto3.dynamodb.conditions import Key, Attr
          from botocore.exceptions import ClientError

          def handler(event, context):
              """
              Processes the queued API events and persists them in DynamoDB
              (or deletes the matching item for terminate/delete events).
              If the ID is not found, the delete is ignored.

              :return: 200, success if records process successfully
              """
              REGION = os.environ['AWS_REGION']
              print(REGION)
              dynamodb = boto3.resource("dynamodb", region_name=REGION)
              table = dynamodb.Table('CMDBv1')
              
              #process the records from SQS to update DynamoDB.
              for rec in event['Records']:
                  try:
                      record = json.loads(rec["body"])
                      if (record["eventName"] == "RunInstances"):
                        #New instance added
                        eventID=record["eventID"]
                        accountId=record["userIdentity"]["accountId"]
                        eventTime=record["eventTime"]
                        eventSource=record["eventSource"]
                        awsRegion=record["awsRegion"]
                        instanceType=record["requestParameters"]["instanceType"]

                        #process each event and each event may have multiple items
                        for instance in record["responseElements"]["instancesSet"]["items"]:
                          response = table.put_item(
                            Item={'eventID': eventID,
                                  'account_id': accountId,
                                  'eventTime': eventTime,
                                  'eventSource':eventSource,
                                  'awsRegion':awsRegion,
                                  'instanceType': instanceType,
                                  'instanceId': instance["instanceId"],
                                  'privateDnsName': instance["privateDnsName"],
                                  'subnetId': instance["subnetId"],
                                  'vpcId':instance["vpcId"],
                                  'details':str(instance)
                            }
                          )
                      elif (record["eventName"] == "TerminateInstances"):
                        #instance terminated
                        for instance in record["responseElements"]["instancesSet"]["items"]:
                          instanceId = instance["instanceId"]
                          response = table.delete_item(Key={"instanceId": instanceId})
                      elif (record["eventName"] == "CreateDBInstance"):
                        #DB instance added
                        response = table.put_item(
                          Item={'eventID': record["eventID"],
                            'account_id': record["userIdentity"]["accountId"],
                            'eventTime': record["eventTime"],
                            'eventSource':record["eventSource"],
                            'awsRegion':record["awsRegion"],
                            'instanceType': record["requestParameters"]["dBInstanceClass"],
                            'instanceId': record["responseElements"]["dbiResourceId"],
                            'privateDnsName': record["responseElements"]["dBInstanceArn"],
                            'vpcId': record["responseElements"]["dBSubnetGroup"]["vpcId"],
                            'multiAZ': record["responseElements"]["multiAZ"],
                            'engine': record["responseElements"]["engine"],
                            'engineVersion': record["responseElements"]["engineVersion"],
                            'dBInstanceClass': record["responseElements"]["dBInstanceClass"],
                            'details':str(record["responseElements"])
                          }
                        )
                      elif (record["eventName"] == "DeleteDBInstance"):
                        #DB instance terminated
                        var = record["responseElements"]["dbiResourceId"]
                        response = table.delete_item(Key={"instanceId": var})
                      else:
                        print("Not a valid event in queue")
                  except ClientError as e:
                    print(e.response['Error']['Message'])
                    return {'statusCode': e.response['Error']['Code'], 'body': (e.response['Error']['Message'])}
                  else:
                    print("PutItem/DeleteItem succeeded:", response)

              return {'statusCode': 200, 'body': json.dumps('Lambda function succeeded!!')}

      Handler: index.handler
      MemorySize: 1024
      Role: !GetAtt LambdaRoleForEventsProcessing.Arn
      Runtime: python3.7
      Timeout: 30  # seconds

  # Permission for the SQS to invoke the Lambda
  LambdaInvokePermissionForSQS:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      Principal: sqs.amazonaws.com
      FunctionName: !Ref EventQueueProcessor
      SourceAccount: !Sub ${AWS::AccountId}


  LambdaFunctionEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      Enabled: true
      EventSourceArn: !GetAtt EventQueue.Arn
      FunctionName: !GetAtt EventQueueProcessor.Arn

Outputs:
  CloudTrailS3Bucket:
    Value: !Ref CloudTrailBucket
    Export:
      Name: CloudTrailS3Bucket
  LambdaRoleForEventsProcessing:
    Value: !Ref LambdaRoleForEventsProcessing
    Export:
      Name: LambdaRoleForEventsProcessing

Permissions CloudFormation Template

The following template attaches a managed policy to the Lambda execution role exported by the first stack, granting it read and write access to the CloudTrail log objects in the Amazon S3 bucket:

---
AWSTemplateFormatVersion: 2010-09-09
Description: "AWS Management Blog Addendum"
Resources:
  BlogManagedPolicy:
    Type: 'AWS::IAM::ManagedPolicy'
    Properties:
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: AllowReadWrite
            Effect: Allow
            Action:
              - s3:Get*
              - s3:List*
              - s3:Put*
            Resource: !Join ["", ["arn:aws:s3:::",!ImportValue CloudTrailS3Bucket, "/AWSLogs/", !Ref "AWS::AccountId", "/*"]]
      Roles:
        - !ImportValue LambdaRoleForEventsProcessing
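
The template filters only four event names (RunInstances, TerminateInstances, CreateDBInstance, and DeleteDBInstance). To track additional lifecycle events, add the event name to the event_to_track list in CloudTrailEventProcessor and add a matching branch to EventQueueProcessor. The sketch below shows the idea for ModifyDBInstance as a standalone helper; the responseElements field names are assumptions that you should verify against a real CloudTrail record:

import boto3

def handle_modify_db_instance(record):
    """Sketch: upsert the DynamoDB item for an RDS instance after a ModifyDBInstance event.

    The responseElements field names below are assumptions; confirm them against an
    actual CloudTrail record before relying on this.
    """
    if record.get("eventName") != "ModifyDBInstance":
        return None
    elements = record["responseElements"]
    table = boto3.resource("dynamodb").Table("CMDBv1")
    return table.put_item(
        Item={
            "instanceId": elements["dbiResourceId"],
            "instanceType": elements["dBInstanceClass"],
            "engineVersion": elements["engineVersion"],
            "details": str(elements),
        }
    )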

The following diagram illustrates this workflow:

  1. When an Amazon EC2 or Amazon RDS instance is created or terminated, a CloudTrail event is generated for tracking.
  2. The cross-Region CloudTrail event is delivered to a centralized Amazon S3 bucket for processing. The bucket publishes the s3:ObjectCreated:Put event by invoking the Lambda function, as specified in the bucket notification configuration. The AWS::Lambda::Permission resource in the template is what allows Amazon S3 to invoke the function.
  3. Lambda runs the CloudTrailEventProcessor function using the execution role created by AWS CloudFormation. The function reads the Amazon S3 event it receives as a parameter, determines where the CloudTrail log object is located, downloads and decompresses it, and processes its log records. If a record’s eventName is one of RunInstances, TerminateInstances, CreateDBInstance, or DeleteDBInstance, the function publishes the record to the Amazon SQS queue for further processing.
  4. Once events arrive in the Amazon SQS queue, the event source mapping invokes the EventQueueProcessor Lambda function to persist the resource details.
  5. The EventQueueProcessor Lambda function reads each event from the queue, extracts the metadata that is critical for tracking the instance, and sends the payload to DynamoDB.
  6. DynamoDB persists the resource details for the lifetime of the instance; when the resource is terminated, the corresponding item is removed from DynamoDB.
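
After the stacks are deployed, you can verify the pipeline end to end by launching a test instance and reading the item back from the CMDBv1 table once the CloudTrail log has been delivered (delivery can take several minutes). A minimal sketch follows; the instance ID is a placeholder:

import boto3

# Read back the asset record written by EventQueueProcessor
# ("i-0123456789abcdef0" is a placeholder for the test instance you launched).
table = boto3.resource("dynamodb").Table("CMDBv1")
item = table.get_item(Key={"instanceId": "i-0123456789abcdef0"}).get("Item")

if item:
    print("Tracked:", item["instanceId"], item.get("instanceType"), item.get("awsRegion"))
else:
    print("Not tracked yet; CloudTrail log delivery can take several minutes.")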

Conclusion

Using this approach, the Dow Jones Operations team is able to track their resource inventory automatically, without manual intervention. Even the Asset Tracking system’s own resources are tracked by the system itself. Newly created, terminated, or modified resources are reflected in the inventory within seven minutes, so the Asset Tracking system is always current.


About the authors

Sacheen Shah is a Lead Engineer at Dow Jones. He lives in New Jersey and helps engineering teams at Dow Jones determine how best to deploy their solutions in the cloud using serverless technology and modern methodologies. When he isn’t working, he likes playing console games, watching sci-fi movies, and spending time with his family.


Utsav Joshi is a Technical Account Manager at AWS. He lives in New Jersey and enjoys working with AWS customers in solving architectural, operational, and cost optimization challenges. In his spare time, he enjoys traveling, road trips and playing with his kids.