AWS Storage Blog

Consistent point-in-time restore for Amazon S3 buckets

Enterprises store increasing quantities of object data for use cases such as data lakes, document management systems, and media libraries. Performing consistent point-in-time restores for large datasets can be challenging, as existing approaches based on full restores from backup are time-consuming and expensive. Alternatively, restoring individual objects to previous versions is error-prone and delays the restore while engineers manually determine which object versions need to be restored.

Customers tell us they need an easy and programmatic way to determine bucket changes and to restore to a consistent point in time with minimal effort. In such cases, combining Amazon S3 Versioning with Amazon EventBridge to build a near real-time event stream from Amazon S3 enables a consistent point-in-time restore process at the bucket level.

In this blog, we describe design and implementation considerations. Our solution restores only changed objects to a point in time, and does so in a programmatic way to minimize restore times.

Use cases

Here are a few relevant enterprise use cases:

  • S3 buckets storing millions of objects, for example media libraries, analytics platforms, and data lakes. A full restore in such cases is time-consuming and expensive.
  • S3-backed applications, such as content management systems, CAD/CAM/PLM applications, or software component management repositories storing software artifacts in Amazon S3 and metadata on Amazon EBS. Such applications either rely on point-in-time restore at the storage layer to avoid rewriting the application, or require synchronous backup and restore as a set.
  • Amazon EC2-based applications using Amazon S3 File Gateway for file protocol access to data in Amazon S3 with local caching.

Design

This design addresses the following requirements:

  • Ability to provide a complete bucket inventory at any time and determine object changes since a specified timestamp.
  • Ability to restore that point-in-time ‘snapshot’ into the same or a different bucket.

Amazon S3 Inventory reports provide a bucket inventory that can be queried with Amazon Athena and generated daily or weekly. While sufficient for auditing, our use case requires the ability to report on the bucket inventory at any time. This is where Amazon EventBridge can help. With Amazon EventBridge, we can build a near real-time event log stream, capturing relevant changes to S3 objects with at-least-once delivery. This allows us to maintain a consistent bucket inventory for restore and real-time audit purposes, including expiration actions defined in the S3 Lifecycle configuration.

Figure 1: Near real-time event log stream for point-in-time restores of S3 buckets

The components of our design (cf. Figure 1) are the source S3 bucket with S3 Versioning and S3 Event Notifications enabled, Amazon EventBridge and Amazon Kinesis Data Firehose delivering the event log to a separate S3 bucket, AWS Glue and Amazon Athena for querying the log, and Amazon S3 Batch Operations together with AWS Lambda for performing the restore.

The restore process is efficient in that it only performs the required changes, avoiding a bulk restore of every object with the additional overhead and cost that entails.

Implementation procedure

Our design can be implemented in five steps, following Figure 1. Let’s get started!

1. Create S3 bucket with S3 Versioning and S3 Event Notifications

First, we create our source S3 bucket (in our example: aws-blog-files-source). Make sure to activate Bucket Versioning upon creation, as a restore can only go back as far as the point when versioning and the near real-time event log stream were activated.

With the launch of Amazon S3 Event Notifications with Amazon EventBridge, integration with EventBridge is a single API call or a few clicks in the console. Note that the principal ID is not reported with S3 Event Notifications; enabling AWS CloudTrail logging in addition gives you the ability to cross-check.
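
For reference, this step can also be scripted. The following is a minimal boto3 sketch, assuming the bucket name from this example and the default us-east-1 Region (outside us-east-1, add a CreateBucketConfiguration with a LocationConstraint):

import boto3

s3 = boto3.client('s3')

bucket = 'aws-blog-files-source'

# Create the source bucket.
s3.create_bucket(Bucket=bucket)

# Activate Bucket Versioning so prior object versions are retained.
s3.put_bucket_versioning(
    Bucket=bucket,
    VersioningConfiguration={'Status': 'Enabled'}
)

# Send S3 Event Notifications for this bucket to Amazon EventBridge.
s3.put_bucket_notification_configuration(
    Bucket=bucket,
    NotificationConfiguration={'EventBridgeConfiguration': {}}
)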

For increased protection, we strongly recommend enabling MFA delete.

2. Set up near real-time event log stream with Amazon EventBridge and Kinesis Data Firehose

To build our near real-time event log stream, we create an Amazon Kinesis Data Firehose delivery stream (here: s3-events-firehose) that EventBridge uses as a target. This stream delivers changes on our source bucket to a separate S3 destination. When configuring the delivery stream, we set Source to Direct PUT, Destination to Amazon S3, and create the target S3 bucket for our events (here: aws-blog-s3events).

You can set the Amazon S3 buffer according to the size, usage pattern, and restore point objective of your source bucket. In our example, Amazon Kinesis Data Firehose buffers up to 5 MiB of event data and delivers updates every 60 seconds.

When dealing with large amounts of data, make sure to activate Amazon S3 compression in addition to S3 encryption, and consider enabling Kinesis Data Firehose record format conversion to convert event data to Apache Parquet. It can take up to a minute until the delivery stream status is updated.
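
As a boto3 sketch of this configuration (the delivery role ARN and account ID are hypothetical placeholders):

import boto3

firehose = boto3.client('firehose')

firehose.create_delivery_stream(
    DeliveryStreamName='s3-events-firehose',
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'RoleArn': 'arn:aws:iam::111122223333:role/s3-events-firehose-role',
        'BucketARN': 'arn:aws:s3:::aws-blog-s3events',
        # Deliver after 5 MiB of buffered events or 60 seconds, whichever comes first.
        'BufferingHints': {'SizeInMBs': 5, 'IntervalInSeconds': 60},
        # Compress delivered event files to reduce storage cost.
        'CompressionFormat': 'GZIP'
    }
)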

For the EventBridge setup, we will first configure an event matching pattern that listens for the following S3 events on our source bucket:

  • Object Created
  • Object Deleted

Select the Firehose delivery stream as the target to invoke when an event matches our pattern, and configure the input using Input transformer. The first text box (Input Path) assigns specific event attributes to variables; the second (Input Template) defines the JSON output passed to the target.

Input transformer text box 1:

{"bucketName": "$.detail.bucket.name", "eventName": "$.detail-type", "eventTime": "$.time", "key": "$.detail.object.key", "sourceIPAddress": "$.detail.source-ip-address", "version": "$.detail.object.version-id"}

Input transformer text box 2:

{"eventTime": <eventTime>, "bucketName": <bucketName>, "key": <key>, "version": <version>, "eventName": <eventName>, "sourceIPAddress": <sourceIPAddress>}
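
The same rule and target can be created with boto3. The following sketch reuses the event pattern and input transformer described above; the rule name, Region, account ID, and role ARN are hypothetical placeholders:

import json
import boto3

events = boto3.client('events')

# Rule matching object-level events on the source bucket.
events.put_rule(
    Name='s3-source-bucket-changes',
    EventPattern=json.dumps({
        'source': ['aws.s3'],
        'detail-type': ['Object Created', 'Object Deleted'],
        'detail': {'bucket': {'name': ['aws-blog-files-source']}}
    }),
    State='ENABLED'
)

# Deliver matching events to the Firehose delivery stream,
# reshaped by the input transformer.
events.put_targets(
    Rule='s3-source-bucket-changes',
    Targets=[{
        'Id': 's3-events-firehose',
        'Arn': 'arn:aws:firehose:us-east-1:111122223333:deliverystream/s3-events-firehose',
        'RoleArn': 'arn:aws:iam::111122223333:role/eventbridge-to-firehose-role',
        'InputTransformer': {
            'InputPathsMap': {
                'bucketName': '$.detail.bucket.name',
                'eventName': '$.detail-type',
                'eventTime': '$.time',
                'key': '$.detail.object.key',
                'sourceIPAddress': '$.detail.source-ip-address',
                'version': '$.detail.object.version-id'
            },
            'InputTemplate': '{"eventTime": <eventTime>, "bucketName": <bucketName>, '
                             '"key": <key>, "version": <version>, '
                             '"eventName": <eventName>, "sourceIPAddress": <sourceIPAddress>}'
        }
    }]
)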

With this, we have built our near real-time event log stream! Kinesis Data Firehose will be delivering changes as per the specified buffer and duration, and we can start using the source bucket.

3. Crawl event log and populate AWS Glue Data Catalog

For Athena to query the event logs and generate a point-in-time inventory, we must first run an AWS Glue crawler to catalog our event log files in the AWS Glue Data Catalog. Add a crawler (here: aws-blog-s3-crawler) and configure it to crawl new folders only, because Kinesis Data Firehose writes files to the destination S3 bucket partitioned by hour.

Define the crawler to run on demand. Alternatively, the crawler can run on a schedule. The downside of a schedule is that, depending on when in the hour the crawler runs, the very latest partition may not have been crawled yet, and Athena would not return the latest results.

We also create an AWS Glue database (here: aws_blog_s3events) used for Amazon Athena queries.

Select and run the crawler before continuing with query definition.
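
If you prefer to script this step, a minimal boto3 sketch might look as follows (the crawler role ARN is a hypothetical placeholder):

import boto3

glue = boto3.client('glue')

# Database used by the Athena queries in the next step.
glue.create_database(DatabaseInput={'Name': 'aws_blog_s3events'})

# Crawler over the event log bucket; only new (hourly) folders are recrawled.
glue.create_crawler(
    Name='aws-blog-s3-crawler',
    Role='arn:aws:iam::111122223333:role/aws-blog-glue-crawler-role',
    DatabaseName='aws_blog_s3events',
    Targets={'S3Targets': [{'Path': 's3://aws-blog-s3events/'}]},
    RecrawlPolicy={'RecrawlBehavior': 'CRAWL_NEW_FOLDERS_ONLY'}
)

# Run the crawler on demand before defining the Athena queries.
glue.start_crawler(Name='aws-blog-s3-crawler')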

4. Create point-in-time queries with Amazon Athena on the AWS Glue database

If this is your first time running an Athena query in your AWS account, first configure a query results location. We should then be able to see all object changes by running a simple SELECT query in Athena on the AWS Glue database (here: sixteen versions across ten S3 objects).

Note that the Amazon S3 console displays local time (UTC-04:00), while timestamps in S3 events and our Athena query results are in UTC. AWS API calls are logged upon initiation, while S3 Event Notification timestamps reflect the completion of the API call, so timestamps shown on the S3 console may not be identical to those in the event logs. In our case, we’d like to restore the bucket to what it looked like at 2021-08-03T21:00:00 UTC.

The first Athena query, as follows, determines the objects that need to be restored to a prior version (here: files 1–5).

WITH versionAtTS AS
(SELECT a.bucketname, a.key, a.version, a.eventname, a.eventtime
 FROM   (SELECT key, MAX(eventtime) maxeventtime
         FROM aws_blog_s3events
         WHERE eventtime <= '2021-08-03T21:00:00Z'
         GROUP BY key) b,
        aws_blog_s3events a
 WHERE  a.key = b.key
 AND    a.eventtime = b.maxeventtime
 ORDER BY key ASC),
latestVersion AS
(SELECT a.bucketname, a.key, a.version, a.eventname, a.eventtime
 FROM   (SELECT key, MAX(eventtime) maxeventtime
         FROM aws_blog_s3events
         GROUP BY key) b,
        aws_blog_s3events a
 WHERE  a.key = b.key
 AND    a.eventtime = b.maxeventtime
 ORDER BY key ASC),
copylist AS
(SELECT bucketname, key, version
 FROM   versionAtTS
 WHERE  key NOT LIKE ''
 AND    eventname NOT LIKE 'Object Deleted'
 AND    version NOT IN (SELECT version FROM latestVersion))
SELECT * FROM copylist

The second Athena query, as follows, determines which objects did not exist at the point in time and should be deleted (here: files 9–10).

WITH deletedfiles AS
(SELECT a.bucketname, a.key, a.version, a.eventname, a.eventtime
 FROM   (SELECT key, MAX(eventtime) maxeventtime
         FROM aws_blog_s3events
         WHERE eventtime <= '2021-08-03T21:00:00Z'
         AND eventname LIKE 'Object Deleted'
         GROUP BY key) b,
        aws_blog_s3events a
 WHERE  a.key = b.key
 AND    a.eventtime = b.maxeventtime
 ORDER BY key ASC),
newfiles AS
(SELECT DISTINCT bucketname, key
 FROM   aws_blog_s3events
 WHERE  eventname LIKE 'Object Created'
 AND    key NOT IN (SELECT key FROM aws_blog_s3events
                    WHERE eventtime <= '2021-08-03T21:01:06Z'))
SELECT bucketname, key FROM deletedfiles
WHERE  key IN (SELECT key FROM aws_blog_s3events
               WHERE eventtime > '2021-08-03T21:01:06Z'
               AND eventname LIKE 'Object Created')
UNION
SELECT * FROM newfiles
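
Both queries can also be run programmatically with the Athena API. The following boto3 sketch submits a query against the AWS Glue database and waits for completion; the results output location is a hypothetical bucket:

import time
import boto3

athena = boto3.client('athena')

def run_query(query, database='aws_blog_s3events',
              output='s3://aws-blog-athena-results/'):
    """Run an Athena query and block until it finishes."""
    execution = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output}
    )
    query_id = execution['QueryExecutionId']
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status['QueryExecution']['Status']['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            # Results are written as a CSV file under the output location.
            return query_id, state
        time.sleep(2)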

5. Run batch restore job with Amazon S3 Batch Operations

To use S3 Batch Operations, first download the results of the two Athena queries, remove the header lines using a code editor, save the files, and upload them to Amazon S3.
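
If you prefer to script this preparation of the manifest files, a minimal sketch might look as follows (bucket and key names are illustrative):

import boto3

s3 = boto3.client('s3')

def strip_header(bucket, result_key, manifest_key):
    """Remove the CSV header line from an Athena result file and
    store the remainder as an S3 Batch Operations manifest."""
    body = s3.get_object(Bucket=bucket, Key=result_key)['Body'].read().decode('utf-8')
    manifest = '\n'.join(body.splitlines()[1:]) + '\n'
    s3.put_object(Bucket=bucket, Key=manifest_key, Body=manifest.encode('utf-8'))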

Create the first job in S3 Batch Operations, with Manifest format set to CSV. Select Manifest includes version IDs. Under Manifest object, select the results file from our first Athena query, which lists the objects to be restored to prior versions. Set Operation type to Copy. For Copy destination, specify our source bucket (here: aws-blog-files-source). Configure a Path to completion report destination to serve as your audit log.

Create or select an IAM role with Amazon S3 permissions for Batch Operations, and run the job.
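
The copy job can also be created with the boto3 S3 Control API. The following sketch uses hypothetical placeholder values for the account ID, role ARN, manifest location, and manifest ETag:

import boto3

s3control = boto3.client('s3control')
account_id = '111122223333'

response = s3control.create_job(
    AccountId=account_id,
    ConfirmationRequired=False,
    Priority=10,
    RoleArn='arn:aws:iam::111122223333:role/s3-batch-operations-role',
    # Copy each listed object version back into the source bucket.
    Operation={'S3PutObjectCopy': {'TargetResource': 'arn:aws:s3:::aws-blog-files-source'}},
    Manifest={
        'Spec': {
            'Format': 'S3BatchOperations_CSV_20180820',
            'Fields': ['Bucket', 'Key', 'VersionId']
        },
        'Location': {
            'ObjectArn': 'arn:aws:s3:::aws-blog-s3events/manifests/copylist.csv',
            'ETag': 'REPLACE_WITH_MANIFEST_ETAG'
        }
    },
    # Completion report that serves as the audit log.
    Report={
        'Enabled': True,
        'Bucket': 'arn:aws:s3:::aws-blog-s3events',
        'Prefix': 'batch-reports',
        'Format': 'Report_CSV_20180820',
        'ReportScope': 'AllTasks'
    }
)
job_id = response['JobId']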

To delete the objects created after our point in time for restore, we make use of a Lambda function and attach an IAM policy with permissions to delete objects. Here is a sample using Python 3.8.

import logging
from urllib import parse
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
logger.setLevel('INFO')

s3 = boto3.client('s3')

def lambda_handler(event, context):
    """
    Deletes files that are provided via S3 Batch Operations
    :param event: The S3 batch event that contains details of
    the object to delete.
    :param context: Context about the event.
    :return: A result structure that Amazon S3 uses to interpret
    the result of the operation. When the result code is
    TemporaryFailure, S3 retries the operation.
    """
    
    # Parse job parameters from Amazon S3 batch operations
    invocation_id = event['invocationId']
    invocation_schema_version = event['invocationSchemaVersion']
    
    results = []
    result_code = None
    result_string = None
    
    task = event['tasks'][0]
    task_id = task['taskId']
    
    try:
        obj_key_parsed = parse.unquote(task['s3Key'], encoding='utf-8')
        obj_key = parse.unquote_plus(obj_key_parsed)
        bucket_name = task['s3BucketArn'].split(':')[-1]
        
        logger.info(f'Got task: delete the object ({obj_key}) ' \
            f'from the bucket ({bucket_name})')
        
        ## Confirm that the object does exist and valid permissions to access
        logger.info(f"Checking that object exists and valid permissions. \n"
                    + f"Object: s3://{bucket_name}/{obj_key}")
        try:
            obj = s3.head_object(Bucket = bucket_name, Key = obj_key)
        except ClientError as exc:
            err_code = exc.response['Error']['Code']
            logger.info("Exception raised")
            logger.info(f"Error code: {err_code}")
            if err_code == '404':
                result_code = 'PermanentFailure'
                result_string = f"Object {obj_key} " \
                                f"does not exist so quitting."
                raise Exception("Object does not exist")
            if err_code == '403':
                result_code = 'PermanentFailure'
                result_string = f"403 (Access Denied) error when " \
                                f"trying to access object {obj_key}." \
                                "Quitting."
                raise Exception("Access Denied")
        
        ## Confirm that bucket versioning is enabled
        try:
            logger.info("Checking bucket versioning state")
            response = s3.get_bucket_versioning(Bucket = bucket_name)
            logger.debug(response)
            if 'Status' in response and response['Status'] == 'Enabled':
                logger.info(f'Versioning is enabled on {bucket_name}')
            else:
                result_code = 'PermanentFailure'
                result_string = f"Bucket {bucket_name} is not " \
                                f"versioning enabled so quitting."
                logger.warning(result_string)
                raise Exception("Bucket Versioning not enabled")
        
        except Exception as error:
            logger.error(f'Error while checking bucket versioning status: {error}')
            raise
        
        ## Delete the object        
        try:
            logger.info(f'Going to delete object: {obj_key} from bucket ' \
                        f'{bucket_name}')
            response = s3.delete_object(Bucket = bucket_name, Key = obj_key)
            logger.info(f'Response: {response}')
            
            try:
                obj = s3.head_object(Bucket = bucket_name, Key = obj_key)
            except ClientError as exc:
                if exc.response['Error']['Code'] == '404':
                    result_code = 'Succeeded'
                    result_string = f"Successfully deleted object " \
                                     f"{obj_key}."
                    logger.info(result_string)
        except ClientError as error:
            # Mark request timeout as a temporary failure so it will be retried.
            if error.response['Error']['Code'] == 'RequestTimeout':
                result_code = 'TemporaryFailure'
                result_string = f"Attempt to delete object " \
                                f"{obj_key} timed out."
                logger.info(result_string)
            else:
                raise
    except Exception as error:
        # Mark all other exceptions as permanent failures.
        result_code = 'PermanentFailure'
        result_string = str(error)
        logger.exception(error)
    finally:
        results.append({
            'taskId': task_id,
            'resultCode': result_code,
            'resultString': result_string
        })
    return {
        'invocationSchemaVersion': invocation_schema_version,
        'treatMissingKeysAs': 'PermanentFailure',
        'invocationId': invocation_id,
        'results': results
    }

We can now create a second Amazon S3 Batch Operations job, with Operation set to Invoke AWS Lambda function and Manifest object set to the results file from our second Athena query, which lists the objects that did not exist at the restore point in time. Running the job adds a delete marker to those objects (here: file9.txt and file10.txt), because the delete_object API call adds a delete marker when you specify only the bucket name and object key. You can permanently remove specific versions if you are the bucket owner and extend the API call to include the version ID.
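
As with the first job, this can be scripted with the boto3 S3 Control API. The following sketch uses hypothetical placeholder values for the account ID, Lambda function ARN, role ARN, manifest location, and manifest ETag:

import boto3

s3control = boto3.client('s3control')
account_id = '111122223333'

s3control.create_job(
    AccountId=account_id,
    ConfirmationRequired=False,
    Priority=10,
    RoleArn='arn:aws:iam::111122223333:role/s3-batch-operations-role',
    # Invoke the delete Lambda function for each object in the manifest.
    Operation={'LambdaInvoke': {
        'FunctionArn': 'arn:aws:lambda:us-east-1:111122223333:function:s3-batch-delete-object'
    }},
    Manifest={
        'Spec': {
            'Format': 'S3BatchOperations_CSV_20180820',
            'Fields': ['Bucket', 'Key']
        },
        'Location': {
            'ObjectArn': 'arn:aws:s3:::aws-blog-s3events/manifests/deletelist.csv',
            'ETag': 'REPLACE_WITH_MANIFEST_ETAG'
        }
    },
    Report={
        'Enabled': True,
        'Bucket': 'arn:aws:s3:::aws-blog-s3events',
        'Prefix': 'batch-reports',
        'Format': 'Report_CSV_20180820',
        'ReportScope': 'AllTasks'
    }
)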

After running the two S3 Batch Operations jobs, our S3 bucket exactly matches the bucket ‘snapshot’ at the designated timestamp.

Cleaning up

Remember to delete example resources if you no longer need them, to avoid incurring future costs. This includes the source S3 bucket with S3 Event Notifications to EventBridge, the target S3 bucket for S3 events, the Kinesis Data Firehose delivery stream, the AWS Glue crawler and database, the AWS Lambda function, and the S3 Batch Operations jobs.

Conclusion

In this blog post, we showed how Amazon S3 Event Notifications with Amazon EventBridge can be used to create a near real-time log of S3 object changes. We also showed how you can query this log and use S3 Batch Operations to restore S3 objects to a consistent point in time.

The solution provides the following benefits:

  • Ability to restore to a point in time in a reliable, fast, and cost-effective manner.
  • No need to integrate rollback logic into your application.
  • Near real-time event capture, enabling easy, up-to-date auditing of S3 activity and simplifying data recovery procedures in case of a security breach.

Thank you for reading this blog post! To learn more, refer to the documentation for the AWS services covered in this post.

We are looking forward to reading your feedback and questions in the comments section.

Rostislav Markov

Rostislav Markov is principal architect with AWS Professional Services. As migrations leader on the global delivery team, he works with global and strategic customers and partners on their largest transformation programs. Outside of work, he enjoys spending time with his family outdoors and exploring New York City culture.

Gareth Eagar

Gareth Eagar is a Senior Data Architect for Data Lakes with AWS Professional Services, and author of the book “Data Engineering with AWS”. Gareth has over 25 years of experience in the IT industry, and currently enjoys helping customers innovate and solve challenging problems by harnessing the power of their data. Outside of work, Gareth enjoys traveling and exploring new places with his family.