AWS Storage Blog
Monitoring CloudEndure Disaster Recovery with AWS Lambda
Many organizations must monitor and track their disaster recovery (DR) initiatives to meet business and audit objectives. CloudEndure Disaster Recovery provides fast and reliable recovery of physical, virtual, and cloud-based servers into AWS. It provides email alerts and a high-level dashboard for monitoring DR jobs. However, many organizations require more robust reporting mechanisms, including graphical dashboards and up-to-the-minute status reports, to satisfy audits and meet business service level agreements (SLAs).
CloudEndure’s API and its integration with security information and event management (SIEM) tools, such as DataDog, provide additional customization. Presidio, an AWS Premier Partner, harnesses technology innovation and simplifies IT complexity to digitally transform businesses and drive return on IT investment. Presidio created a CloudEndure monitoring dashboard using AWS Lambda and DataDog. It presents a single pane of glass for monitoring CloudEndure Disaster Recovery workloads and the AWS environment.
This post explains how to use the CloudEndure Disaster Recovery API with AWS Lambda functions to pull logs from CloudEndure and forward them to DataDog. The AWS Lambda functions, created by Presidio, use the CloudEndure API to authenticate and run GET calls against active CloudEndure Disaster Recovery jobs. The Lambda functions are triggered on a regular interval determined by a CloudWatch Events schedule. Combining the DataDog and CloudEndure APIs provides detailed reporting on the status of protected servers, email alerts, and a single view into your CloudEndure Disaster Recovery workloads.
How the CloudEndure API works with AWS Lambda functions
This section explains how to generate a CloudEndure API token and walks through the code snippets that run as AWS Lambda functions. This article assumes you have set up CloudEndure Disaster Recovery and have begun replication. You must also have a DataDog API key set up and configured.
The following is a visual flow of how CloudEndure logs are posted to DataDog while using AWS Lambda functions for the automation components.
CloudEndure API
The following steps show you how to generate a CloudEndure API token from the CloudEndure console in the Other Settings tab under Setup & Info.
You can use the API token to sign in (rather than your username and password) when making API calls. Click GENERATE NEW TOKEN to create one.
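To confirm the token works before wiring up the automation, you can authenticate against the CloudEndure login endpoint directly. The following is a minimal sketch using the Python requests library and the same endpoint that the ce_logs.py function uses later in this post; the token value is a placeholder.

import requests

# Placeholder - substitute the token generated in the CloudEndure console
CE_API_TOKEN = "your-cloudendure-api-token"

# Sign in with the API token instead of a username and password
response = requests.post(
    "https://console.cloudendure.com/api/latest/login",
    json={"userApiToken": CE_API_TOKEN},
)

# A 200 response means the token is valid; the Set-Cookie header carries the
# session cookie and XSRF token used for subsequent API calls
print(response.status_code)
print(response.headers.get("Set-Cookie"))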
AWS Lambda functions
Log in to the AWS Management Console and search for the Lambda service. Choose Create function in the Lambda console. The following is an overview of the Lambda functions.
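If you prefer to script this step, the following boto3 sketch shows one way to create the function with the environment variables the code in this post expects (CE_SSM_PARAMETER, CE_PROJECT_ID, DD_SSM_PARAMETER, DD_HOST, and S3_BUCKET). The function name, role ARN, runtime, and ZIP file are placeholder assumptions you would adjust, and the deployment package must bundle the requests library along with the modules shown below. The execution role also needs permission to read the SSM parameters and to read and write the S3 state objects.

import boto3

lambda_client = boto3.client("lambda")

# Placeholder deployment package - adjust the path for your environment
with open("ce_datadog_monitor.zip", "rb") as f:
    code_bytes = f.read()

# Placeholder function name, role ARN, and environment variable values
lambda_client.create_function(
    FunctionName="cloudendure-datadog-monitor",
    Runtime="python3.8",
    Role="arn:aws:iam::111122223333:role/ce-monitor-lambda-role",
    Handler="handler.handler",
    Code={"ZipFile": code_bytes},
    Timeout=120,
    Environment={
        "Variables": {
            "CE_SSM_PARAMETER": "/cloudendure/api-token",
            "CE_PROJECT_ID": "your-cloudendure-project-id",
            "DD_SSM_PARAMETER": "/datadog/api-key",
            "DD_HOST": "cloudendure-monitor",
            "S3_BUCKET": "your-ce-monitor-state-bucket",
        }
    },
)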
The handler.py Lambda function is triggered by an Amazon CloudWatch Events cron schedule. The handler is the main module of the project; on each scheduled run, it calls the functions that gather CloudEndure logs and posts any new log entries to DataDog.
The ce_logs.py Lambda function retrieves all of the CloudEndure Disaster Recovery job status information. It uses the CloudEndure API token to authenticate with your CloudEndure project and retrieves the CloudEndure event logs. These logs are returned to the handler.py function, where they are filtered for new events and then sent to the DataDog API.
The post_data_to_datadog.py Lambda function retrieves the encrypted DataDog API key from AWS Systems Manager (SSM) Parameter Store. It then posts data to DataDog, using the key for authentication. After the data is sent to DataDog, the Lambda automation’s job is complete.
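The post does not show how that parameter is provisioned; as one assumption, you can store the key ahead of time as an encrypted SecureString parameter so the function can decrypt it at run time. The same approach applies to the CloudEndure API token referenced by CE_SSM_PARAMETER. The parameter name and value below are placeholders.

import boto3

ssm = boto3.client("ssm")

# Placeholder name and value - the name must match the DD_SSM_PARAMETER environment variable
ssm.put_parameter(
    Name="/datadog/api-key",
    Value="your-datadog-api-key",
    Type="SecureString",
    Overwrite=True,
)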
The main role of the next two Lambda functions, send_latest_dr_events.py and send_latest_repl_events.py, is to send only the latest events from CloudEndure. Their logic ensures that no duplicate data is sent to DataDog.
The preceding Lambda functions are triggered on a regular interval determined by the CloudWatch Events schedule. The idea is to retrieve new CloudEndure logs, filter for new events, and then post those events to DataDog in a specific format.
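The schedule itself can be created with a CloudWatch Events (Amazon EventBridge) rule. The sketch below assumes a hypothetical rule name, function name, and ARN, and a five-minute rate that you would tune to your reporting needs.

import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

# Placeholder names and ARNs - adjust for your account, region, and function
rule_name = "cloudendure-datadog-schedule"
function_name = "cloudendure-datadog-monitor"
function_arn = "arn:aws:lambda:us-east-1:111122223333:function:" + function_name

# Create the schedule rule (every 5 minutes in this sketch)
rule = events.put_rule(Name=rule_name, ScheduleExpression="rate(5 minutes)")

# Allow CloudWatch Events to invoke the handler function
lambda_client.add_permission(
    FunctionName=function_name,
    StatementId="ce-datadog-schedule-invoke",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule["RuleArn"],
)

# Point the rule at the handler Lambda function
events.put_targets(
    Rule=rule_name,
    Targets=[{"Id": "ce-datadog-monitor", "Arn": function_arn}],
)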
handler.py
import json

import ce_logs
import send_latest_dr_events
import send_latest_repl_events


def handler(event, context):
    # Print the incoming event for troubleshooting
    print(json.dumps(event))
    # Authenticate with CloudEndure and retrieve the DR job and replication logs
    project_dr_json, project_replication_json = ce_logs.get_ce_logs()
    send_new_dr_events(project_dr_json)
    send_new_repl_events(project_replication_json)
    # Return 200 if the process runs without error
    return {"ResponseCode": 200}


def send_new_dr_events(project_dr_json):
    # Loop through projects looking for new DR events
    for project_log_dr in project_dr_json['items']:
        has_new_event = send_latest_dr_events.send_latest_dr_events(project_log_dr)


def send_new_repl_events(project_replication_json):
    # Loop through projects looking for new replication events
    for project_log_repl in project_replication_json['items']:
        has_new_event = send_latest_repl_events.send_latest_repl_events(project_log_repl)
ce_logs.py
import requests
import json
import boto3
import os

#
# This module authenticates with CloudEndure and gets project event logs
#
def get_ce_logs():
    # Retrieve the CloudEndure API token from SSM Parameter Store
    ssm = boto3.client("ssm")
    api_key = ssm.get_parameter(
        Name=os.environ['CE_SSM_PARAMETER'],
        WithDecryption=True
    ).get("Parameter").get("Value")
    # Payload, URL, and headers for the CloudEndure login call
    ce_payload = {"userApiToken": api_key}
    ce_url = 'https://console.cloudendure.com/api/latest/login'
    headers = {'Content-Type': 'application/json'}
    response = requests.post(ce_url, data=json.dumps(ce_payload), headers=headers)
    # Pull the XSRF token and session cookie out of the Set-Cookie header
    # (the token value is quoted in the header, so the slice strips the quotes)
    cookie_text = response.headers['Set-Cookie']
    xToken = cookie_text[cookie_text.find("XSRF-TOKEN=")+12:cookie_text.find(";")-1]
    startLoc = cookie_text.find("session=")
    cookie_text_start_loc = cookie_text[startLoc:]
    sCookie = cookie_text_start_loc[cookie_text_start_loc.find("session"):cookie_text_start_loc.find("; Secure")]
    # Perform a GET against the CloudEndure project's jobs (DR events)
    ce_project_id = os.environ['CE_PROJECT_ID']
    url_ce_projects = "https://console.cloudendure.com/api/latest/projects/" + ce_project_id + "/jobs"
    payload_project = {'X-XSRF-TOKEN': xToken, 'Cookie': sCookie}
    project_response = requests.get(url_ce_projects, headers=payload_project)
    project_data = project_response.content
    project_dr_json = json.loads(project_data)
    # Get replication data for the project's machines
    url_ce_projects_machines = "https://console.cloudendure.com/api/latest/projects/" + ce_project_id + "/machines"
    payload_project_machines = {'X-XSRF-TOKEN': xToken, 'Cookie': sCookie}
    project_response_machines = requests.get(url_ce_projects_machines, headers=payload_project_machines)
    project_data_machines = project_response_machines.content
    project_replication_json = json.loads(project_data_machines)
    # Print for debug and return data back
    # print(json.dumps(project_dr_json, indent=2))
    return project_dr_json, project_replication_json
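For orientation, the two objects returned by get_ce_logs() are consumed by the functions that follow roughly as sketched below. This is an illustrative, heavily truncated shape inferred from how the code parses the responses, not an official CloudEndure API schema; the field values are placeholders.

# Illustrative only - truncated shape inferred from how the functions below parse the data
project_dr_json = {
    "items": [  # one entry per CloudEndure DR job
        {
            "id": "job-id",
            "status": "COMPLETED",
            "participatingMachines": ["machine-id"],
            "log": [
                {"logDateTime": "2020-01-01T00:00:00+00:00", "message": "Job started"},
            ],
        }
    ]
}

project_replication_json = {
    "items": [  # one entry per replicating machine
        {
            "id": "machine-id",
            "sourceProperties": {"machineCloudId": "i-1234567890abcdef0"},
            "replicationInfo": {
                "initiationStates": {
                    "items": [
                        {"steps": [{"name": "replication-step-name", "status": "SUCCEEDED"}]}
                    ]
                }
            },
        }
    ]
}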
post_data_to_datadog.py
import json
import requests
import boto3
import os

#
# This module sets up a payload to send to DD through their HTTPS post API
#
def post_data_to_datadog(payload):
    # Client for SSM and get the API key
    ssm = boto3.client("ssm")
    api_key = ssm.get_parameter(
        Name=os.environ['DD_SSM_PARAMETER'],
        WithDecryption=True
    ).get("Parameter").get("Value")
    # Host name set due to DataDog config and limitations for now
    # Set all JSON payload values and post to DD using the API key
    headers = {'DD-API-KEY': api_key, 'Content-Type': 'application/json'}
    response_dd = requests.post(
        'https://http-intake.logs.datadoghq.com/v1/input',
        data=json.dumps(payload, sort_keys=True, indent=2),
        headers=headers
    )
    return True
send_latest_repl_events.py
import json
import os

import boto3

import post_data_to_datadog

#
# Checks replication log info for new events
#
def send_latest_repl_events(project_log_repl):
    # If the project has no replication info, move on. It may be empty
    if 'replicationInfo' in project_log_repl.keys() and len(project_log_repl['replicationInfo']['initiationStates']['items']) > 0:
        # S3 identifier
        id = project_log_repl['id'] + "_repl_data"
        # S3 client
        s3_client = boto3.client('s3')
        # print("Project Repl Data: ", json.dumps(project_log_repl['replicationInfo']['initiationStates']['items'], indent=2))
        # Perform a GET on the S3 object; if it doesn't exist, handle the exception
        try:
            response = s3_client.get_object(
                Bucket=os.environ['S3_BUCKET'],
                Key=id
            )
            # If there is an object, load the last recorded event from it
            last_event_recorded_s3 = json.loads(response['Body'].read())
        except Exception:
            # If no S3 object exists, post all replication events to log storage
            last_event_to_store_in_s3 = put_all_events(project_log_repl)
            response = s3_client.put_object(
                Bucket=os.environ['S3_BUCKET'],
                Key=id,
                Body=json.dumps(last_event_to_store_in_s3)
            )
            return True
        # If the S3 object exists, compare its value to the last event for this project
        # If the latest event has already been written to log storage, move on
        last_event_to_store_in_s3 = create_replica_payload(project_log_repl['id'], project_log_repl['sourceProperties']['machineCloudId'], project_log_repl['replicationInfo']['initiationStates']['items'][-1]['steps'][-1])
        # Debug
        # print("Last event stored in S3: " + json.dumps(last_event_recorded_s3, indent=2))
        # print("Last event to be stored: " + json.dumps(last_event_to_store_in_s3, indent=2))
        # If the last event is the same, move on. Else post all events missing from the DD logs
        if last_event_recorded_s3 == last_event_to_store_in_s3:
            print("Last Repl Events already in DD")
            return False
        else:
            put_new_events(last_event_recorded_s3, project_log_repl)
            response = s3_client.put_object(
                Bucket=os.environ['S3_BUCKET'],
                Key=id,
                Body=json.dumps(last_event_to_store_in_s3)
            )
            return True

#
# Creates the replication data payload to send to log storage
#
def create_replica_payload(project_id, ec2Id, project_log_repl_step):
    # Create the payload for a replication event
    payload = project_log_repl_step
    payload['host'] = os.environ['DD_HOST']
    payload['service'] = 'user'
    payload['ddsource'] = 'custom'
    payload['ddtags'] = {'business_unit': 'managed services'}
    payload['status'] = project_log_repl_step['status']
    payload['CE-Action-Type'] = 'Replication'
    payload['id'] = project_id
    payload['ec2InstanceId'] = ec2Id
    return payload

#
# Finds the most recently written log in DD/S3, takes the slice of current steps not yet written, and writes them to log storage
#
def put_new_events(last_event_recorded_s3, project_log_repl):
    steps = project_log_repl['replicationInfo']['initiationStates']['items'][-1]['steps']
    # Debug
    # print("Steps in Repl Process: " + json.dumps(steps, indent=2))
    # print("Last event recorded S3: " + json.dumps(last_event_recorded_s3, indent=2))
    # Get the index of the last event written to S3/DD
    index_of_latest = steps.index({'name': last_event_recorded_s3['name'], 'status': last_event_recorded_s3['status']})
    # Debug
    # print("Index of event is at: ", str(index_of_latest))
    # For the slice of steps that do not yet exist in DD, write them to the API
    for project_repl_event in steps[index_of_latest + 1:]:
        print(json.dumps(project_repl_event, indent=2))
        payload = create_replica_payload(project_log_repl['id'], project_log_repl['sourceProperties']['machineCloudId'], project_repl_event)
        post_data_to_datadog.post_data_to_datadog(payload)

#
# Used if no logs exist for this project yet. Writes all replication log events to log storage.
#
def put_all_events(project_log_repl):
    # Debug
    print("Putting All Replica Events")
    # For all steps in the project, post to log storage
    for step in project_log_repl['replicationInfo']['initiationStates']['items'][-1]['steps']:
        payload = create_replica_payload(project_log_repl['id'], project_log_repl['sourceProperties']['machineCloudId'], step)
        # print("New event to write to DD: " + json.dumps(payload, indent=2))
        post_data_to_datadog.post_data_to_datadog(payload)
    # Store the very last event in S3
    last_event_to_store_in_s3 = create_replica_payload(project_log_repl['id'], project_log_repl['sourceProperties']['machineCloudId'], project_log_repl['replicationInfo']['initiationStates']['items'][-1]['steps'][-1])
    return last_event_to_store_in_s3
send_latest_dr_events.py
import json
import os

import boto3

import post_data_to_datadog

#
# Sends the latest DR log events to log storage. Checks the S3 object for the latest log written for the current project
#
def send_latest_dr_events(project_log):
    # Get the job id
    id = project_log['id']
    # S3 client
    s3_client = boto3.client('s3')
    # Perform a GET on the S3 object; if it doesn't exist, write all events and return
    try:
        response = s3_client.get_object(
            Bucket=os.environ['S3_BUCKET'],
            Key=id
        )
        # If there is an object, load the last recorded event from it
        last_event_recorded_s3 = json.loads(response['Body'].read())
    except Exception:
        # If no S3 object exists, write all log events for this project to log storage and to S3
        print('No DR events found for project, write all events')
        last_event_written_to_s3 = put_all_events(project_log)
        response = s3_client.put_object(
            Bucket=os.environ['S3_BUCKET'],
            Key=id,
            Body=json.dumps(last_event_written_to_s3)
        )
        return True
    # If the S3 object exists, compare its value to the last event for this project
    # If it is the same event, move on. If new, post to DD
    # Debug
    # print("Last Event In S3: " + json.dumps(last_event_recorded_s3, indent=2))
    # Convert the last event in the log to a payload for the S3 comparison
    last_event_in_log = create_dr_payload(project_log['id'], project_log['log'][-1], project_log['participatingMachines'], project_log)
    # Debug
    # print("Last DR Event In Log: " + json.dumps(last_event_in_log, indent=2))
    # If the last event recorded equals the last event in the project log, return, as nothing needs to be written
    if last_event_recorded_s3 == last_event_in_log:
        print("Last DR event already in DataDog")
        return False
    else:
        # Write the new DR events if the last event is not in S3
        last_dr_event_log = put_new_dr_events(last_event_recorded_s3, project_log)
        response = s3_client.put_object(
            Bucket=os.environ['S3_BUCKET'],
            Key=id,
            Body=json.dumps(last_dr_event_log)
        )
        return True

# Writes all logs of the current project to log storage - used if the S3 object doesn't already exist
def put_all_events(project_log):
    # Loop through all events, build payloads, then write the logs to DD
    for project_dr_event in project_log['log']:
        # Debug
        # print(json.dumps(project_dr_event, indent=2))
        # Create the payload and post to DataDog
        payload = create_dr_payload(project_log['id'], project_dr_event, project_log['participatingMachines'], project_log)
        post_data_to_datadog.post_data_to_datadog(payload)
    # Return the last event as a payload to store the final log in S3
    final_payload = create_dr_payload(project_log['id'], project_log['log'][-1], project_log['participatingMachines'], project_log)
    return final_payload

# Writes only the new events
def put_new_dr_events(last_event_recorded_s3, project_log):
    # Find the index of the last recorded event in the current log list
    index_of_latest = project_log['log'].index({'logDateTime': last_event_recorded_s3['message']['Time'], 'message': last_event_recorded_s3['message']['CEMessage']})
    # Debug
    # print("Index of event is at: ", str(index_of_latest))
    # print("Slice Being written to DD: ", project_log['log'][index_of_latest+1:])
    # Use the slice to post new data to log storage
    for project_dr_event in project_log['log'][index_of_latest + 1:]:
        # Debug
        # print(json.dumps(project_dr_event, indent=2))
        payload = create_dr_payload(project_log['id'], project_dr_event, project_log['participatingMachines'], project_log)
        post_data_to_datadog.post_data_to_datadog(payload)
    # Return the last event as a payload to store in S3
    final_payload = create_dr_payload(project_log['id'], project_log['log'][-1], project_log['participatingMachines'], project_log)
    return final_payload

# Creates the disaster recovery payload to be sent to DD
def create_dr_payload(job_id, project_dr_log_event, project_machines, project_log):
    payload = dict()
    payload['participatingMachines'] = project_machines
    payload['host'] = os.environ['DD_HOST']
    payload['hostname'] = project_machines[0]
    payload['status'] = 'INFO'
    payload['message'] = {'CE-JOB': job_id, 'STATUS': project_log['status'], 'Time': project_dr_log_event['logDateTime'], 'TYPE': 'RECOVERY_LAUNCH', 'origin': 'LambdaFunction', 'CEMessage': project_dr_log_event['message']}
    payload['service'] = 'user'
    payload['ddsource'] = 'custom'
    payload['ddtags'] = {'business_unit': 'managed services'}
    payload['CE-Action-Type'] = 'Disaster-Recovery'
    return payload
How the dashboard works in DataDog
This section includes screenshots of how the solution runs. First, launch a DR server from the CloudEndure console. In this example, the launch task fails, and you can log in to the CloudEndure console to see the job progress and the failure.
The DataDog dashboard shows the parsed failure message pulled from the CloudEndure console. The handler.py Lambda function pulls this information.
If you properly set up your DataDog automation alerts, you can extend these notifications to email alerts. For more information, work with your DataDog administrator or contact Presidio for assistance with this feature.
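The exact configuration depends on your DataDog account, but as a rough sketch (with a hypothetical monitor name, query, and notification address), a log alert monitor created through the DataDog Monitors API can email a recipient whenever an error-level CloudEndure event is ingested. Adjust the query to match the attributes your Lambda payloads actually send.

import os
import requests

# Placeholder credentials - a DataDog application key is required in addition to the API key
headers = {
    "DD-API-KEY": os.environ["DD_API_KEY"],
    "DD-APPLICATION-KEY": os.environ["DD_APP_KEY"],
    "Content-Type": "application/json",
}

monitor = {
    "name": "CloudEndure DR job failure",
    "type": "log alert",
    # Hypothetical query - match the attributes your Lambda payloads actually send
    "query": 'logs("source:custom status:error").index("*").rollup("count").last("15m") > 0',
    "message": "A CloudEndure DR job reported a failure. @dr-team@example.com",
    "options": {"thresholds": {"critical": 0}},
}

# Create the monitor through the DataDog Monitors API
response = requests.post(
    "https://api.datadoghq.com/api/v1/monitor",
    headers=headers,
    json=monitor,
)
print(response.status_code, response.json().get("id"))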
The CloudEndure Test Migration Dashboard provides real-time monitoring of all of the servers protected by CloudEndure Disaster Recovery. All of this information is gathered by the post_data_to_datadog.py, send_latest_repl_events.py, and send_latest_dr_events.py Lambda functions on a regular CloudWatch Events schedule. The dashboard can provide useful information such as job completion, replication completion, timestamps of completed jobs, lag on any servers, creation of resources such as subnets, and detailed reports of server status.
Presidio can create fully customizable dashboards for any data analytics platform. While this blog shows examples of dashboards within DataDog, the same concepts can be applied to any data analytics platform.
Conclusion
In this blog post, we demonstrated using AWS Lambda to capture, parse, and send CloudEndure Disaster Recovery event logs to DataDog for a single pane-of-glass dashboard view. This information is crucial for monitoring business SLAs and can be used to satisfy audit checks.
If there are other use cases or functions you are interested in, please reach out to AWS Partner Presidio, who would be happy to help you create them.
Thanks for reading this blog on monitoring CloudEndure Disaster Recovery with AWS Lambda. If you have any comments or questions, please leave them in the comments section.