AWS Storage Blog

Post-process your transferred data with file-level logging in AWS DataSync

AWS DataSync is an online data transfer service that simplifies, automates, and accelerates moving data between on-premises storage systems and AWS Storage services, as well as between AWS Storage services. Customers use DataSync for a number of use cases, including migrating active datasets to AWS, archiving data to free up on-premises storage capacity, replicating data to AWS for business continuity, or transferring data to the cloud for analysis and processing. When a DataSync task completes, customers still have a local copy of the dataset transferred to the cloud that begins to fill up local storage. Customers want to free up their local storage resources, but want to be confident that they are only deleting or archiving data successfully transferred to AWS to avoid data loss.

On April 24, 2020, AWS announced the release of enhanced monitoring capabilities for AWS DataSync, including file-level logging of data transfers. This feature allows users to identify successfully transferred files, in addition to the results of DataSync’s integrity verification on those files. AWS DataSync records file-level logging in an Amazon CloudWatch log group, and CloudWatch Events are triggered at the end of your task. This set of features enables event-driven architectures to automate post-processing work on successfully transferred files after a task completes. This includes use cases such as reapers to clean up transferred files that you no longer need locally, or archiving services that move the files from hot to cold storage at the source. Files at the source can be removed or archived to free up valuable storage space, with the confidence that only files verified as successfully transferred are impacted.

The enhanced file-level logging in CloudWatch Logs captures the relative file path, but it does not capture the file server or the root path specified when configuring the source location. In this blog, I outline a solution that takes a specific DataSync task ID for an existing task and enables detailed logging on that task. As part of the solution, I demonstrate deploying an AWS Lambda function that consumes the file-level logging in CloudWatch, attaches the full path for the source file to the Amazon S3 object as metadata, and creates and populates an Amazon SQS queue with a list of the source paths of all successfully transferred files. You can consume the contents of this queue with any services that must perform post-processing on successfully transferred files. This provides two benefits to end users:

  • The ability to have an event-driven architecture for working with files on the source side based on files that have been verified as successfully transferred to Amazon S3.
  • The ability to capture the source file lineage of an object that you have transferred to Amazon S3 in its metadata.

Solution overview

Solution overview diagram using DataSync, CloudWatch, CloudFormation, Lambda, SQS, and S3 to post-process migrated data

The preceding architecture diagram displays the various components of the solution:

  1. AWS DataSync initiates a data transfer task execution. The DataSync agent copies the appropriate files from the source location to the destination location, which is an Amazon S3 bucket.
  2. The solution enables detailed logging for the AWS DataSync task. As it is writes objects to Amazon S3, it confirms data writes and performs checksum validations for those objects and writes the results to CloudWatch Logs.
  3. The DataSync CloudWatch Logs group triggers an AWS Lambda function whenever it finds the pattern “Verified file” in the detailed logging on the DataSync task execution.
  4. The Lambda function uses the contents of the CloudWatch Logs and the information stored in DataSync for the source location and reconstructs the full original file path. This path is added to the corresponding Amazon S3 object as metadata, allowing users to see an S3 object’s original location at any time.
  5. The Lambda function also places the full original file path into an SQS queue that can be consumed by downstream services for further processing on successfully transferred files.

Getting started

We use an AWS CloudFormation stack to create the following required resources:

  • Amazon CloudWatch Logs log group for the DataSync task.
  • Amazon SQS standard queue to hold the paths of verified files.
  • AWS Lambda function to create the full source file path, write it to an SQS queue, and then attach it to the Amazon S3 object as metadata.
  • AWS Lambda function to be used as an AWS CloudFormation custom resource for adding detailed logging to an existing DataSync task.
  • Amazon CloudWatch Logs subscription filter to send log data to the Lambda function for the solution
  • All necessary AWS Identity and Access Management (AWS IAM) roles and policies.

Prerequisites

Download the contents of the GitHub repository located here. In the root directory, run the “makeZip.sh” script to create two zip files: DataSync_log.zip and DataSync_log_prep.zip. The “makeZip.sh” script requires an environment with the pip package installer for Python in order to function. Place these two zip files and the DataSynclog.yml AWS CloudFormation template into an S3 bucket in your environment. Ensure that the user deploying the solution has access to the bucket containing these files. Note the name of the S3 bucket, as it will be an input parameter for the CloudFormation template.

Ensure that you have a DataSync task created that has a destination type of Amazon S3. Note the Task ID for the DataSync task that you wish to use for this solution, as it will be an input parameter for the AWS CloudFormation template. The stack deployment will fail with an error if the target location is not an S3 bucket.

Deploy the solution with AWS CloudFormation

Log into the AWS Management Console with your IAM user name and password. Navigate to the CloudFormation service. Click on the Create stack button in the upper right corner. Select With new resources (standard).

On the Create Stack page, add the Amazon S3 URL for the CloudFormation template object uploaded in the prerequisite steps, and choose the Next button in the bottom right.

On the Specify stack details page, provide the following information, and then choose Next:

  • Enter the Task ID from the prerequisite steps in the DataSyncTaskID text box.
  • In the CodeBucket text box, provide the name of the S3 bucket that the zip files were uploaded to in the prerequisites for this deployment.

On the Configure stack options page, leave all values as their defaults, and choose the Next button.

On the Review page, choose the check box next to the statement I acknowledge that AWS CloudFormation might create IAM resources at the bottom of the page, and choose Create stack.

Wait for the stack to reach a status of CREATE_COMPLETE. You can look at the Events tab to track the status of the creation of each element of the stack. The Resources tab provides the Physical IDs for each of the elements created.

Run an AWS DataSync task execution and explore the results

After deploying the solution, look at the Task logging tab on the AWS DataSync task that you specified when deploying the AWS CloudFormation template. Ensure that the Log level is set to Log all transferred objects and files, and that the CloudWatch Log group specified is the one created by the template. You can find the log group on the Resources tab of the AWS CloudFormation console as the “DataSyncLogGroup.”

Task logging tab of AWS DataSync task

Click Start in the upper right corner of the DataSync task in the console to start a DataSync task execution. Click on the History tab, and monitor the task execution until it completes. Once the job completes, navigate to the Amazon S3 console, and select an object that your AWS DataSync transferred into your S3 target location. Select the object, and on the Properties tab, select Metadata. You should see a metadata entry of “x-amz-meta-source-path” with the source path to the file that was copied into S3.

Metadata of source file copied to Amazon S3

Navigate to the Amazon SQS console and find the queue that the AWS CloudFormation template created. The queue name is located on the Resources tab of the CloudFormation console as the “VerifiedFileQueue.” In the upper right corner, click on Send and receive messages. In the Receive messages section of the page, click on the Poll for messages button.

Send and recieve messages in the Amazon SQS console

Select one of the messages returned, and click on the Body tab. You should see the source path of the synced file in the body of the message.

The body tab of a returned message contains the source path of the synced file in the body of the message

Consuming messages from the queue

Once you have created the Amazon SQS queue, you can consume the contents and take actions on the files successfully transferred to the source location. The following Python script retrieves the messages from the verified files queue, and has a section denoted with a comment where you insert processing logic. As a placeholder, the script currently has a line that prints out that it is processing the file. There are two ways that a script like this can be run:

  • If you run this script from an on-premises environment, make sure that you have credentials available that have access to retrieve and delete messages from the SQS queue. Also, ensure that the server that it runs from can access the storage location that the files were transferred from.
  • If your AWS environment can reach the on-premises storage location, you can modify the script to run as a Lambda function and subscribe to the Amazon SQS queue. This can enable you to process added entries. It can also be created as a Lambda function to be triggered on a schedule by Amazon EventBridge.
import logging
import sys
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
sqs = boto3.resource('sqs')
myQueue = sqs.Queue(url='INSERT SQS QUEUE URL HERE')

# Function to retrieve messages from queue.  Insert logic to take actions on each file where noted in comments
def process_messages(queue, max_number, wait_time):
	try:
		messages = queue.receive_messages(
			MessageAttributeNames=['All'],
			MaxNumberOfMessages=max_number,
			WaitTimeSeconds=wait_time
		)
		for msg in messages:
			process_file(msg.body)
	except ClientError as error:
		logger.exception("Couldn't receive messages from queue: %s", queue)
		raise error
	else:
		return messages
# Function to process messages.  Insert file processing logic in this function.
def process_file(filePath):
	print("Processing file " + filePath)
	### Insert logic here ###
# Function to delete messages from the queue after processing
def delete_messages(queue, messages):
	try:
		entries = [{'Id': str(ind), 'ReceiptHandle': msg.receipt_handle} for ind, msg in enumerate(messages)]
		response = queue.delete_messages(Entries=entries)
		if 'Successful' in response:
			for msg_meta in response['Successful']:
				logger.info("Deleted %s", messages[int(msg_meta['Id'])].receipt_handle)
	except ClientError:
		logger.exception("Couldn't delete messages from queue %s", queue)
	else:
		return response
		
# Retrieve messages from the queue, take action on the file in the message, and then delete messages from the queue
more_messages = True
while more_messages:
	received_messages = process_messages(myQueue, 10, 15)
	if received_messages:
		delete_messages(myQueue, received_messages)
	else:
		more_messages = False
print('Done.')

Cleaning up

Please complete the following step to remove the solution code from your environment.

  • Navigate to CloudFormation and find the stack that you deployed in the “Deployment” section of this blog. Select the stack and click on the Delete button in the upper right corner. Monitor the process to ensure that all resources are deleted.

Deletion of this stack removes the CloudWatch Logs log group for your AWS DataSync task added as part of this deployment, so future runs won’t capture file-level logging. If you still want to have DataSync logging captured, edit your DataSync task, specify the level of logging desired, and select a new log group for that logging.

Conclusion

In this blog post, I covered using an AWS Lambda function to process file-level logging in AWS DataSync to save the original paths of transferred files as Amazon S3 object metadata. This same Lambda function also adds this information to an Amazon SQS queue that you can use for post-processing source data. This solution provides an event-driven architecture for any operations that you must complete against files in the source location after you have successfully transferred them to S3. This includes archiving or reaping functionality, to ensure that customer’s do not use storage with data that they have moved to the AWS Cloud. It also provides the lineage of an object in S3 in its metadata.

This solution currently only captures files that AWS DataSync successfully copies to Amazon S3, but you can easily extend it to also capture file transfer failures as well. This solution is limited to DataSync tasks with S3 buckets as a target. However, using the model presented, you can extend it to support additional target locations supported by DataSync (Amazon EFS and Amazon FSx for Windows File Server).

This solution also requires an existing DataSync task before you can deploy it, but you can easily add creation of that task to the AWS CloudFormation template, if one does not already exist.

Thanks for reading this blog post on post-processing your migrated data. If you have any comments or questions, don’t hesitate to leave them in the comments section.