AWS Database Blog

Filter Amazon Aurora database activity stream data for segregation and monitoring

Most organizations need to monitor activity on databases that contain sensitive information for security auditing and compliance. Although some security operations teams might want to monitor all activities, such as reads, writes, and logons, others might want to restrict monitoring to only those activities that change data or data structures. In this post, we show you how to filter, process, and store the activities generated by Amazon Aurora database activity streams that are relevant to specific business use cases.

Database activity streams provide a near-real-time stream of activity in your database, help with monitoring and compliance, and are a free feature of Aurora.

A best practice is to process and store activity (audit) information outside of the database and provide access to this information on a need-to-know basis. Database activity streams capture database activity from Aurora and stream it to an Amazon Kinesis data stream that is created on behalf of your Aurora DB cluster. Multiple AWS services, such as Amazon Kinesis Data Firehose and AWS Lambda, can consume the activity stream from Kinesis Data Streams.

Solution overview

Applications can consume an activity stream for auditing, compliance, and monitoring. However, as of this writing, database activity streams don’t segregate activity by type. Some organizations might want to audit or monitor specific activities, such as Data Definition Language (DDL) and Data Manipulation Language (DML) requests, but exclude Data Query Language (DQL) requests. We show how a Kinesis Data Firehose delivery stream captures the activity data from a Kinesis data stream and invokes a Lambda function that segregates the activity by type and stores it in different folders in an Amazon Simple Storage Service (Amazon S3) bucket.

Database activity streams work similarly with Amazon Aurora PostgreSQL-Compatible Edition and Amazon Aurora MySQL-Compatible Edition, with some differences: Aurora MySQL doesn’t support synchronous audit event publishing, and there are minor differences in the information published per event, although the JSON structure and format are the same.

The following diagram illustrates the architecture of our solution.

Prerequisites

To use this solution, you need an AWS account with privileges to create resources like an Aurora DB cluster, Kinesis data stream, Firehose delivery stream, and Lambda function. For this post, we assume the Aurora cluster already exists. For more information about creating the Aurora cluster, see Getting started with Amazon Aurora. For more information about database activity streams, see Requirements for database activity streams.

When you enable a database activity stream, a Kinesis data stream is created on your behalf; this solution also creates a Firehose delivery stream, a Lambda function, and an S3 bucket, all of which incur additional costs. Make sure you review the relevant pricing pages to understand the associated costs before you provision resources. If you’re testing the solution, make sure you delete the resources you created after your tests are over.

Create a database activity stream

To set up your database activity stream, complete the following steps:

  1. On the Amazon RDS console, choose Databases in the navigation pane.
  2. Select your Aurora cluster.
  3. On the Actions menu, choose Start database activity stream.
  4. For Master key, choose RDS-S3-Export.

You use this key to encrypt the activity stream data.

  5. For Database activity stream mode, select Asynchronous.
  6. For Apply immediately, select Apply immediately.
  7. Choose Continue.

This initiates the creation of the activity stream and the associated Kinesis data stream. After the activity stream is configured, activity data is encrypted using the RDS-S3-Export key. This activity data is streamed through the Kinesis data stream.
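
If you prefer to script this step, the following is a minimal boto3 sketch of the equivalent API call; the cluster ARN and KMS key alias are placeholders that you should replace with your own values.

import boto3

rds = boto3.client("rds", region_name="us-east-1")

# Placeholders: replace the cluster ARN and key alias with your own values.
response = rds.start_activity_stream(
    ResourceArn="arn:aws:rds:us-east-1:123456789012:cluster:my-aurora-cluster",
    Mode="async",                    # asynchronous mode, as selected on the console
    KmsKeyId="alias/RDS-S3-Export",  # the key used to encrypt the activity stream data
    ApplyImmediately=True,
)
print(response["KinesisStreamName"])  # the Kinesis data stream created for the activity stream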

Sample activity stream records

The following code is a sample of activity stream records. For details about the structure of Aurora PostgreSQL and Aurora MySQL activity stream records, see Monitoring database activity streams.

{
    "type": "DatabaseActivityMonitoringRecords",
    "version": "1.1",
    "databaseActivityEvents": "AYABeMjnoQWkYmD+8jhdLBSkzsUAAAABAAJCQwAbRGF0YUtleQAAAIAAAAAM/SldfBEyqeozTNECADBcPoF/3pnTIIR7wOsZSvYoANOJuC5N9npmPLvwXoyBcHGe3Sq29y1ZrEApaL6bZdQCAAAAAAwAABAAAAAAAAAAAAAAAAAAzDeB2utzcJt/dDQ5NlJ1Cf////8AAAABAAAAAAAAAAAAAAABAAAArIJKzPyFsmzvXcUS5fYx+ebhAN+fJwjK/AOOFiqLlUKjTeWA/Al6I46GUB2CDyIXXeMJJ8e7Z3oz+3/9Dcw2eYVuugj+SBSm9KzTHKln1S+JxNz73gHf86bGArRDl6xWKes3mrP4HBByHd/Bm32rsbaFZjd/XqxtH84aKG7ycI+3kTCFJbSa4nQgk1czdbwspRHVn7fyKDP1kqMCknaYpZC6lK5jAuYGFaRTqMJKvsuBJuVJgHNJPmuNGXJw",
    "key": "AQIDAHi1Fwr8tVldEiD725/2qbuU7C8jvWknJ/cspwgqoGjPaQEE28f67OmmXQ4lGxwjAkkMAAAAfjB8BgkqhkiG9w0BBwagbzBtAgEAMGgGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQM+PVn8ZtiziHr0oVHAgEQgDtgi7nEzQV27IhtQgaJuqR3R7x1u0RTEwKzHJR7uboEdlbQsDw2Nrmf3vCzgMjR048SkQQUNs9rpC4Q2g=="
}
{
    "type": "DatabaseActivityMonitoringRecords",
    "version": "1.1",
    "databaseActivityEvents": "AYABeIA5LB2rk0ZEPw/23P+ycJkAAAABAAJCQwAbRGF0YUtleQAAAIAAAAAMuJt3ULKht1fZtecVADCfan3XiN1yVWZ7Rw1REl5QeHDQzoMSj4ga4Aif8z72pIJBZTnXx0ZHdfRgdfd778ICAAAAAAwAABAAAAAAAAAAAAAAAAAAJ1LtY+vbHd7JKHDuwNNjqP////8AAAABAAAAAAAAAAAAAAABAAAArFKsMa7r3jWoK5m6DorTDlVhgxSMwtHSMb0Vd0S/ZtONyeD3FwlJmtIai9KZKqlW4xQ7TyyMr/Y35S0iHo/av8jYr0ROmse3L9aa+8fhtsk5wfv02wjEyXZVudXM8gCgEDnhf9iw2O1oIILScgsnWcMm/SCANoGMumwc54nIqLIvPXXc9GZmuEQnNPuUoR7EuF1XjOxcJNgxtOBUGECdXOwQ3FzvpxXK95c0kdh6/HrEsJq7wQuPxojiKUna",
    "key": "AQIDAHi1Fwr8tVldEiD725/2qbuU7C8jvWknJ/cspwgqoGjPaQEE28f67OmmXQ4lGxwjAkkMAAAAfjB8BgkqhkiG9w0BBwagbzBtAgEAMGgGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQM+PVn8ZtiziHr0oVHAgEQgDtgi7nEzQV27IhtQgaJuqR3R7x1u0RTEwKzHJR7uboEdlbQsDw2Nrmf3vCzgMjR048SkQQUNs9rpC4Q2g=="
}

Lambda function

The Lambda function is invoked by Kinesis Data Firehose; we configure it as the delivery stream’s transformation function after we create it. Aurora activity data is Base64 encoded and encrypted with an AWS Key Management Service (AWS KMS) key. For this post, we decrypt the data and store it in a folder in the S3 bucket based on activity type. However, we recommend encrypting the activity data before storing it, or enabling encryption at the bucket level. We also discard heartbeat events and store other events in specific folders such as CONNECT, DISCONNECT, ERROR, and AUTH FAILURE. Pay attention to the comments in the Lambda code; they highlight the decryption, filtering, and folder-placement logic. For instructions on creating a Lambda function, see the AWS Lambda Developer Guide.

The following is the sample Lambda function code, written for Python 3.8 using Boto3. The code depends on the aws_encryption_sdk library, which must be packaged with the Lambda deployment package. aws_encryption_sdk in turn depends on the cryptography library, which is OS dependent, so the Linux build of cryptography must be included in the package you upload to Lambda.

When choosing an existing or new Lambda execution role, make sure the role has access to the S3 bucket, Kinesis Data Firehose, and AWS KMS. It’s always a best practice to follow the principle of least privilege.
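
For reference, the following is a minimal inline policy sketch, expressed as a Python dictionary that you could attach with iam.put_role_policy, covering the calls the function itself makes: writing objects to the S3 bucket and decrypting the activity stream data key with AWS KMS. The role name, bucket name, account ID, and key ID are placeholders, and CloudWatch Logs permissions are assumed to come from the standard Lambda basic execution policy.

import json
import boto3

# Placeholders: replace the role name, bucket name, account ID, and KMS key ID with your own values.
least_privilege_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::dastestbucket/parsed/*",
        },
        {
            "Effect": "Allow",
            "Action": "kms:Decrypt",
            "Resource": "arn:aws:kms:us-east-1:123456789012:key/1234abcd-12ab-34cd-56ef-1234567890ab",
        },
    ],
}

iam = boto3.client("iam")
iam.put_role_policy(
    RoleName="das-lambda-execution-role",
    PolicyName="das-least-privilege",
    PolicyDocument=json.dumps(least_privilege_policy),
)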

The following Lambda code can be found at AWS Samples on GitHub:

#This Lambda function reads Kinesis Data Firehose records as input, decrypts the log records using the KMS key, decompresses them, and then categorizes each event type into an S3 folder structure.
from __future__ import print_function
import json
import boto3
import base64
import zlib
import os 
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
import datetime

REGION_NAME = os.environ['region_name'] # 'us-east-1'
RESOURCE_ID = os.environ['resource_id'] #'cluster-ABCD1234'
BUCKET_NAME = os.environ['bucket_name'] # 'dastestbucket'

enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT)
kms = boto3.client('kms', region_name=REGION_NAME)
s3 = boto3.client('s3')
todays_date = datetime.datetime.now()

class MyRawMasterKeyProvider(RawMasterKeyProvider):
    provider_id = "BC"
    def __new__(cls, *args, **kwargs):
        obj = super(RawMasterKeyProvider, cls).__new__(cls)
        return obj
    def __init__(self, plain_key):
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
                                        wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC)
    def _get_raw_key(self, key_id):
        return self.wrapping_key

def decrypt_payload(payload, data_key):
    my_key_provider = MyRawMasterKeyProvider(data_key)
    my_key_provider.add_master_key("DataKey")
    #Decrypt the records using the master key.
    decrypted_plaintext, header = enc_client.decrypt(
        source=payload,
        materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider))
    return decrypted_plaintext

def decrypt_decompress(payload, key):
    decrypted = decrypt_payload(payload, key)
    #Decompress the records using zlib library.
    decrypted = zlib.decompress(decrypted, zlib.MAX_WBITS + 16)
    return decrypted

#Lambda Handler entry point
def lambda_handler(event, context):
    output = []
    print("Received event: " + json.dumps(event, indent=2))
    for dasRecord in event['records']:
        recID = dasRecord['recordId']
        data = base64.b64decode(dasRecord['data'])
        # Do processing here
        val = processDASRecord(recID,data)
        #The record count returned to Firehose has to match the input count. To keep a record from reaching the destination, set its result to 'Dropped'.
        if len(val)>0:
            output_record = {
                'recordId': dasRecord['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(json.dumps(val).encode("utf-8"))
            }
        else:
            output_record = {
                'recordId': dasRecord['recordId'],
                'result': 'Dropped',
                'data': base64.b64encode(b'this is a dropped event')
            }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}

def processDASRecord(rID, rec):
    record = json.loads(rec)
    if record['type'] == 'DatabaseActivityMonitoringRecords':
        dbEvents = record["databaseActivityEvents"]
        dataKey = base64.b64decode(record["key"])
        try:
            #Decrypt the envelope master key using KMS
            data_key_decrypt_result = kms.decrypt(CiphertextBlob=dataKey, EncryptionContext={'aws:rds:dbc-id':RESOURCE_ID})
        except Exception as e:
            print(e)
            raise e

        try:
            plaintextEvents = decrypt_decompress(base64.b64decode(dbEvents), data_key_decrypt_result['Plaintext'])
        except Exception as e:
            print(e)
            raise e
        
        retObj = []
        #Parse through all activity and categorize it.
        try:
            events = json.loads(plaintextEvents)
            for dbEvent in events['databaseActivityEventList']:
                #filter out events which you don't want to log.
                if dbEvent['type']== "heartbeat": #or  eventType == "READ":
                    print ("Heart beat event - ignored event, dropping it.")
                    continue

                if not (dbEvent.get('command') is None):
                    eventType = dbEvent['command']
                    #Write each type of event to its own folder in the S3 bucket, partitioned by date.
                    s3suffix = '/' + str(todays_date.year) + '/' + str(todays_date.month) + '/' + str(todays_date.day) + '/' + rID + '.txt' 
                    s3.put_object(Body=json.dumps(dbEvent, ensure_ascii=False), Bucket=BUCKET_NAME, Key = 'parsed/'+ eventType + s3suffix )

                retObj.append(dbEvent)

        except Exception as e:
            print (e)
            raise e
        
        return retObj
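
To exercise the function before wiring it to Kinesis Data Firehose, you can feed it a hand-built transformation event. The following is a minimal local test sketch; it assumes the code above is saved as lambda_function.py, that sample_record.json contains one raw activity stream record like the samples shown earlier, and that the region_name, resource_id, and bucket_name environment variables are set and your AWS credentials allow kms:Decrypt and s3:PutObject.

import base64
import json

import lambda_function  # the function code above, saved locally as lambda_function.py

# Load one raw activity stream record (the JSON with the databaseActivityEvents and key fields).
with open("sample_record.json", "rb") as f:
    raw_record = f.read()

# This mirrors the shape of the event Kinesis Data Firehose sends to a transformation Lambda function.
test_event = {
    "records": [
        {
            "recordId": "49546986683135544286507457936321625675700192471156785154",
            "data": base64.b64encode(raw_record).decode("utf-8"),
        }
    ]
}

result = lambda_function.lambda_handler(test_event, None)
print(json.dumps([r["result"] for r in result["records"]], indent=2))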

Deploy the function

You can use the console to deploy the Lambda function, or use the following AWS Serverless Application Model (AWS SAM) template to deploy it. The entire package is available in AWS Samples on GitHub.

AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Description: >-
  This Lambda function reads Kinesis Data Firehose records as input, decrypts the log records using the KMS key,
  decompresses the records, and then categorizes each event type into an S3 folder structure
Parameters: 
  BucketNamePrefix:
    Type: String
    Default: dastestbucket
  KeyName:
    Type: String
  RegionName:
    Type: String
    Default: us-east-1
  AuroraResourceID:
    Type: String

Resources:
  dasProcessor:
    Type: 'AWS::Serverless::Function'
    Properties:
      Handler: lambda_function.lambda_handler
      Runtime: python3.8
      CodeUri: .
      Description: >-
        An Amazon Kinesis Data Firehose stream processor that accesses the records in
        the input and returns them with a processing status.  Use this processor
        for any custom transformation logic.
      MemorySize: 128
      Timeout: 3
      Environment:
        Variables:
          bucket_name: !Ref BucketNamePrefix
          region_name: !Ref RegionName
          resource_id: !Ref AuroraResourceID
      Policies:
          - CloudWatchPutMetricPolicy: {}
          - KMSDecryptPolicy:
              KeyId: !Ref KeyName
          - S3WritePolicy:
              BucketName: !Ref BucketNamePrefix
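
After you save the template and the function code in the same directory (the template’s CodeUri points to the current directory), you can build and deploy them with the AWS SAM CLI, for example by running sam build followed by sam deploy --guided.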

Sample parsed records

The following is a sample record parsed by the preceding Lambda function for a CREATE INDEX event:

{
    "logTime": "2021-06-04 02:22:47.218201+00",
    "statementId": 4,
    "substatementId": 1,
    "objectType": null,
    "command": "CREATE INDEX",
    "objectName": null,
    "databaseName": "compass",
    "dbUserName": "compass",
    "remoteHost": "10.0.1.108",
    "remotePort": "42216",
    "sessionId": "60b98a86.4f64",
    "rowCount": null,
    "commandText": "CREATE INDEX scale_slow ON scale_data (section, id1, id2);",
    "paramList": [],
    "pid": 20324,
    "clientApplication": "psql",
    "exitCode": null,
    "class": "DDL",
    "serverVersion": "2.7.2",
    "serverType": "PostgreSQL",
    "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
    "serverHost": "10.0.4.153",
    "netProtocol": "TCP",
    "dbProtocol": "Postgres 3.0",
    "type": "record",
    "startTime": "2021-06-04 02:12:04.649566+00",
    "errorMessage": null
}
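
If your use case calls for storing only data-changing activity, you can extend the filter in processDASRecord to look at the class field shown in the preceding record. The following is a minimal sketch; the set of classes to keep is an assumption that you should adjust to your requirements.

#Classes of activity to keep; everything else (for example, READ/DQL activity) is dropped.
ALLOWED_CLASSES = {"DDL", "WRITE"}

def is_relevant(db_event):
    """Return True if the event should be stored in Amazon S3, and False if it should be dropped."""
    return db_event.get("class") in ALLOWED_CLASSES

Calling is_relevant(dbEvent) right after the existing heartbeat check, and skipping the event when it returns False, keeps read-only activity out of the S3 bucket and out of the records returned to Kinesis Data Firehose.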

Validate the data stream

After you create the activity stream, you can go to the Kinesis Data Streams console to see the stream details.
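
You can also read a few raw records directly from the stream to confirm that activity is flowing. The following is a minimal boto3 sketch; the Region and cluster resource ID are placeholders, and the stream name follows the aws-rds-das-<cluster resource ID> pattern that database activity streams use.

import boto3

# Placeholders: adjust the Region and the cluster resource ID for your environment.
kinesis = boto3.client("kinesis", region_name="us-east-1")
stream_name = "aws-rds-das-cluster-ABCD1234"

# Read a handful of the most recent records from the first shard.
shard_id = kinesis.describe_stream(StreamName=stream_name)["StreamDescription"]["Shards"][0]["ShardId"]
shard_iterator = kinesis.get_shard_iterator(
    StreamName=stream_name, ShardId=shard_id, ShardIteratorType="LATEST"
)["ShardIterator"]

for record in kinesis.get_records(ShardIterator=shard_iterator, Limit=10)["Records"]:
    #The payload is still encrypted at this point; the Lambda function decrypts it downstream.
    print(record["Data"][:100])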

Create a Firehose delivery stream

To create your delivery stream, complete the following steps (a scripted equivalent is sketched after the list):

  1. On the Kinesis Data Firehose console, choose Create delivery stream.
  2. For Delivery stream name, enter a name.
  3. For Choose a source, select Kinesis Data Stream.
  4. For Kinesis data stream, choose the data stream you created when you created your database activity stream.
  5. Choose Next.
  6. For Transform source records with AWS Lambda, select Enable.
  7. For Lambda function, choose the function you created.
  8. Choose Next.
  9. For Destination, select Amazon S3.
  10. For S3 bucket, choose your S3 bucket.
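
The following is a minimal boto3 sketch of the same configuration for readers who prefer to script this step. All ARNs are placeholders, and the delivery stream role is assumed to be allowed to read from the Kinesis data stream, invoke the Lambda function, and write to the S3 bucket.

import boto3

firehose = boto3.client("firehose", region_name="us-east-1")

# Placeholders: replace every ARN with values from your own account.
firehose.create_delivery_stream(
    DeliveryStreamName="das-delivery-stream",
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/aws-rds-das-cluster-ABCD1234",
        "RoleARN": "arn:aws:iam::123456789012:role/das-firehose-role",
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/das-firehose-role",
        "BucketARN": "arn:aws:s3:::dastestbucket",
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "Lambda",
                    "Parameters": [
                        {
                            "ParameterName": "LambdaArn",
                            "ParameterValue": "arn:aws:lambda:us-east-1:123456789012:function:dasProcessor",
                        }
                    ],
                }
            ],
        },
    },
)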

S3 bucket folder structure

Our Lambda function creates the following folder structure based on the event type. You can customize this in the Lambda code before writing the object to Amazon S3.
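
For example, with the date-based prefix built in the Lambda function, objects land under keys similar to the following (the event types, dates, and record IDs are illustrative):

parsed/CREATE INDEX/2021/6/4/49546986683135544286507457936321625675700192471156785154.txt
parsed/INSERT/2021/6/4/49546986683135544286507457936321625675700192471156785186.txt
parsed/CONNECT/2021/6/4/49546986683135544286507457936321625675700192471156785218.txt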

Clean up

When you’re done testing the solution, make sure you delete the resources you created to avoid ongoing charges. If you deployed the solution through AWS CloudFormation or AWS SAM, delete the stack by following the AWS CloudFormation documentation.

If you created the resources manually from the AWS Management Console, complete the following steps:

  • On the Amazon RDS console, stop the database activity stream (see step 3 above, choosing the stop action instead).
  • On the Kinesis console, delete the Firehose delivery stream and the Kinesis data stream.
  • On the Lambda console, delete the Lambda function (an idle function doesn’t incur charges, but removing it keeps your account tidy).
  • Delete the S3 bucket used in this solution.

Conclusion

In this post, we demonstrated the process of capturing a database activity stream and filtering records based on business requirements and specific use cases. With this approach, you can customize the Lambda function to capture events that are important for your use case and discard whatever is irrelevant. You can also perform additional activities like sending notifications on specific events.

For any questions or suggestions about this post, leave a comment.


About the Authors

Gaurav Sharma is a Solutions Architect at AWS. He works with digital-native business customers, providing architectural guidance on various AWS services. He brings more than 24 years’ experience in database development, database administration, and solution architecture.


Vivek Kumar is a Solutions Architect at AWS based out of New York. He works with customers to provide technical assistance and architectural guidance on various AWS services. He brings more than 23 years of experience in software engineering and architecture roles for various large-scale enterprises.


Nitin Aggarwal is a Solutions Architect at AWS. He helps digital native customers architect data analytics solutions and provides technical guidance on various AWS services. He brings more than 16 years of experience in software engineering and architecture roles for various large-scale enterprises.