AWS Big Data Blog

Building a cost-efficient, petabyte-scale lake house with Amazon S3 lifecycle rules and Amazon Redshift Spectrum: Part 2

In part 1 of this series, we demonstrated how to build an end-to-end data lifecycle management system integrated with a data lake house implemented on Amazon Simple Storage Service (Amazon S3) with Amazon Redshift and Amazon Redshift Spectrum. In this post, we address the ongoing operation of the solution we built.

Data ageing process after a month (ongoing)

Let’s assume a month has elapsed since walking through the use case in the last post, and the old historical data has been classified and tiered according to policy. You now need to feed the newly generated monthly data into the lifecycle pipeline as follows:

  • June 2020 data – Produced and consolidated into Amazon Redshift local tables
  • December 2019 data – Migrated to Amazon S3
  • June 2019 data – Migrated from Amazon S3 to S3-IA
  • March 2019 data – Migrated from S3-IA to Glacier

The first step is to increase the ageing counter on all the Parquet files (in both the midterm and shortterm prefixes), using aws s3api get-object-tagging to check the current value and aws s3api put-object-tagging to increase it by 1. This can be cumbersome if you have many objects, but you can automate it with AWS CLI scripts or an SDK such as Boto3 (Python).

The following is a simple Python script you can use to check the current tag settings for all keys in the prefix extract_shortterm:

from boto3 import client

conn = client('s3')

def printtags(mybucket, myprefix):
    # Print the current ageing tag for every Parquet object under the prefix
    for key in conn.list_objects(Bucket=mybucket, Prefix=myprefix)['Contents']:
        if key['Key'].endswith('.parquet'):
            tagset = conn.get_object_tagging(Bucket=mybucket, Key=key['Key'])['TagSet']
            # Read the value of the 'ageing' tag from the TagSet
            tagvalue = int(next(tag['Value'] for tag in tagset if tag['Key'] == 'ageing'))
            print(key['Key'], "ageing =", tagvalue)

# Set the bucket and prefix parameters according to your environment
printtags('rs-lakehouse-blog-post', 'extract_shortterm/')

This second Python script lists the current tag settings for all keys in the prefix extract_shortterm, increases the ageing tag by 1, and prints the keys with their new tag values. If other tags were added to these objects before this step, this operation overwrites the entire TagSet: put_object_tagging is not an append, but a complete replacement (a new PUT). A variant that preserves any other tags follows the script.

from boto3 import client

conn = client('s3')

def updateTags(mybucket, myprefix):
    # Increase the ageing tag by 1 for every Parquet object under the prefix
    for key in conn.list_objects(Bucket=mybucket, Prefix=myprefix)['Contents']:
        if key['Key'].endswith('.parquet'):
            tagset = conn.get_object_tagging(Bucket=mybucket, Key=key['Key'])['TagSet']
            # Read the current value of the 'ageing' tag
            tagvalue = int(next(tag['Value'] for tag in tagset if tag['Key'] == 'ageing'))
            print(key['Key'], "Current ageing =", tagvalue)
            tagvalue = tagvalue + 1
            # put_object_tagging replaces the whole TagSet with the new value
            conn.put_object_tagging(
                Bucket=mybucket,
                Key=key['Key'],
                Tagging={'TagSet': [{'Key': 'ageing', 'Value': str(tagvalue)}]}
            )
            print(key['Key'], "New ageing =", tagvalue)

updateTags('rs-lakehouse-blog-post', 'extract_shortterm/')
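
If your objects carry additional tags besides ageing, keep in mind that the script above replaces the whole TagSet. The following variant is a minimal sketch under the same bucket and prefix assumptions: it reads the existing TagSet, updates only the ageing entry, and writes the merged set back so that other tags are preserved.

from boto3 import client

conn = client('s3')

def update_ageing_preserving_tags(mybucket, myprefix):
    # Increment the ageing tag while keeping any other tags already set on the object
    for key in conn.list_objects(Bucket=mybucket, Prefix=myprefix)['Contents']:
        if not key['Key'].endswith('.parquet'):
            continue
        tagset = conn.get_object_tagging(Bucket=mybucket, Key=key['Key'])['TagSet']
        merged = []
        for tag in tagset:
            if tag['Key'] == 'ageing':
                # Replace only the ageing value
                merged.append({'Key': 'ageing', 'Value': str(int(tag['Value']) + 1)})
            else:
                # Keep every other tag unchanged
                merged.append(tag)
        conn.put_object_tagging(Bucket=mybucket, Key=key['Key'], Tagging={'TagSet': merged})

update_ageing_preserving_tags('rs-lakehouse-blog-post', 'extract_shortterm/')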

To run the pipeline described earlier, you need to complete the following steps:

  1. Unload the December 2019 data.
  2. Set the ageing tag on the unloaded Parquet file to 6.
  3. Add the new Parquet file as a new partition to the external table taxispectrum.taxi_archive.

For the relevant syntax, see part 1 of this series. You can automate these tasks with an AWS Lambda function on a monthly schedule, as in the following sketch.
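
The following is a minimal sketch of such a Lambda handler, assuming the UNLOAD and ALTER TABLE ... ADD PARTITION statements from part 1 have already been run (for example, through the Amazon Redshift Data API or a scheduled query). It only applies the ageing tag of 6 to the newly unloaded December 2019 Parquet file; the key name is hypothetical and follows the naming convention used in this series. You can invoke the handler from an Amazon EventBridge rule with a monthly schedule.

import boto3

s3 = boto3.client('s3')

# Hypothetical bucket and key name, following the naming convention used in this series
BUCKET = 'rs-lakehouse-blog-post'
NEW_KEY = 'extract_shortterm/green_tripdata_2019-12000.parquet'

def lambda_handler(event, context):
    # December 2019 data is now 6 months old, so tag the unloaded file with ageing = 6
    s3.put_object_tagging(
        Bucket=BUCKET,
        Key=NEW_KEY,
        Tagging={'TagSet': [{'Key': 'ageing', 'Value': '6'}]}
    )
    return {'tagged': NEW_KEY}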

Check the results with a query to the external table and don’t forget to remove unloaded items from the Amazon Redshift local table, as you did in part 1 of this series.

In this use case, we know exactly which Amazon S3 key name maps to June 2019 because we used a specific naming convention. If your use case is different, you can use the two pseudo-columns automatically created in every Amazon Redshift external table: $path and $size. See the following code:

select pickup, 
    "$path", 
    "$size" 
from taxispectrum.taxi_archive 
where pickup between '2019-06-01 00:00:00' and '2019-06-30 23:59:59' 
limit 10
;

The following screenshot shows our results.

We’re migrating the March 2019 Parquet file to Glacier, so you should remove the related partition from the AWS Glue Data Catalog:

ALTER TABLE taxispectrum.taxi_archive
DROP PARTITION (yearmonth='2019-03')
;

Right to be forgotten

One of the pillars of the GDPR is the "right to be forgotten": the ability for a customer or employee to request deletion of any personal data.

Implementing this feature for external tables on Amazon Redshift requires a different approach than for local tables, because external tables don’t support delete and update operations.

You can implement two different approaches.

In and out

In this first approach, you copy the external table to a temporary Amazon Redshift table, delete the specific rows, unload the temporary table back to Amazon S3 overwriting the original key name, and drop the temporary (internal) table.

Let’s assume that the drivers in the dataset are identified with the column pulocid. We want to delete all records related to a driver identified with pulocid 129 who worked between October 2019 and November 2019.

  1. With the following code (run from Amazon Redshift), you can identify every matching row and the specific Parquet file that contains it:
    select pickup,
        dropoff,
        pulocid,
        "$path"
    from taxispectrum.taxi_shortterm
    where pulocid = 129 and pickup between '2019-10-01 00:00:00' and '2019-11-30 23:59:59'
    ;

The following screenshot shows our results.

  2. When checking the applied tags, note the value associated with the ageing tag, or save the output of the following command to a temporary JSON file:
    aws s3api get-object-tagging \
    --bucket rs-lakehouse-blog-post \
    --key extract_shortterm/green_tripdata_2019-10000.parquet > \
    /tmp/oldtag.json
  3. Create two temporary tables, one for each of the two Parquet files matching the query (October and November). The following steps show the October file; repeat them for November:
    create table temporaryoct (like greentaxi);
  4. Copy the Parquet file to the local table, using the format as parquet attribute:
    copy temporaryoct
    from 's3://rs-lakehouse-blog-post/extract_shortterm/green_tripdata_2019-10000.parquet'
    iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole'
    format as parquet
    ;
  5. Delete the records matching the “right to be forgotten” request criteria:
    delete from temporaryoct 
    where pulocid = 129 and pickup between '2019-10-01 00:00:00' and '2019-11-30 23:59:59'
    ;
  6. Overwrite the Parquet file with the UNLOAD command (note the allowoverwrite option):
    unload ('select * from temporaryoct') 
    to 's3://rs-lakehouse-blog-post/extract_shortterm/green_tripdata_2019-10000.parquet'
    iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole' 
    parquet parallel off allowoverwrite
    ;
  7. Drop the temporary table:
    drop table temporaryoct;

In more complex use cases, user data might span multiple months, and our approach might not be effective. In these cases, using Spark to process and rewrite the Parquet files could be a better and faster solution.

In other use cases, the records to be deleted could be the majority of the file. If so, as an alternative to the delete and unload steps, you could use CREATE EXTERNAL TABLE AS (CTAS). CTAS creates an external table based on the column definition from a query and writes the results of that query to Amazon S3.

Edit your own

The second option is to use an external tool to edit the Amazon S3 file and remove specific records. You could use a Spark script with the following steps (a minimal sketch follows the list):

  1. Create a DataFrame.
  2. Import the Parquet file into memory.
  3. Remove records matching your criteria.
  4. Overwrite the same Amazon S3 key with the new data.
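
The following is a minimal PySpark sketch of this approach, reusing the October key name from the previous section (the paths are assumptions for illustration). Note that Spark writes a directory of part files rather than a single object, so the sketch writes to a staging prefix; you can then copy the resulting part file back over the original key (for example, with aws s3 cp).

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("right-to-be-forgotten").getOrCreate()

# Hypothetical paths based on the key names used earlier in this post
source = "s3://rs-lakehouse-blog-post/extract_shortterm/green_tripdata_2019-10000.parquet"
staging = "s3://rs-lakehouse-blog-post/extract_shortterm_staging/green_tripdata_2019-10/"

# Read the Parquet file into a DataFrame
df = spark.read.parquet(source)

# Remove the records matching the "right to be forgotten" request criteria
filtered = df.filter(~((col("pulocid") == 129) &
                       (col("pickup").between("2019-10-01 00:00:00", "2019-11-30 23:59:59"))))

# Write the result back as a single Parquet file to the staging prefix
filtered.coalesce(1).write.mode("overwrite").parquet(staging)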

Building a simple data ageing dashboard

Sometimes data temperature is very predictable and based on ageing, but in some cases, especially when data is generated and accessed by different entities, it's not easy to build a model that fits the best storage transition strategy. For these scenarios, you can use Amazon S3 analytics storage class analysis and Amazon S3 server access logs.

Storage class analysis observes the access patterns of a filtered set of data over a period of time. You can use the analysis results to help you improve your lifecycle policies. You can configure storage class analysis to analyze all the objects in a bucket, or configure filters to group objects together for analysis by a common prefix (objects whose names begin with a common string), by object tags, or by both prefix and tags. Filtering by object groups is the best way to benefit from storage class analysis.
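
As an example, the following Boto3 sketch configures storage class analysis filtered on the extract_shortterm prefix and exports the daily results as CSV. The export destination bucket and prefix are assumptions for illustration.

import boto3

s3 = boto3.client('s3')

# Analyze only the objects under the extract_shortterm prefix and export the results as CSV
s3.put_bucket_analytics_configuration(
    Bucket='rs-lakehouse-blog-post',
    Id='shortterm-analysis',
    AnalyticsConfiguration={
        'Id': 'shortterm-analysis',
        'Filter': {'Prefix': 'extract_shortterm/'},
        'StorageClassAnalysis': {
            'DataExport': {
                'OutputSchemaVersion': 'V_1',
                'Destination': {
                    'S3BucketDestination': {
                        'Format': 'CSV',
                        # Destination bucket and prefix are assumptions for illustration
                        'Bucket': 'arn:aws:s3:::rs-lakehouse-blog-post-logs',
                        'Prefix': 'storage-class-analysis/'
                    }
                }
            }
        }
    }
)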

To achieve a better understanding of how data is accessed (and by whom, and when) and build a custom tiering strategy, you can use Amazon S3 server access logs. This feature doesn't incur any additional charges, but log retention incurs standard Amazon S3 storage costs. You first define a recipient to store the logs; the following console steps set this up, and a scripted alternative follows the list.

  1. Create a new bucket named rs-lakehouse-blog-post-logs.
  2. To set up server access logging on the source bucket rs-lakehouse-blog-post, on the Amazon S3 console, navigate to the bucket's Properties tab and choose Server access logging.
  3. For Target bucket, enter rs-lakehouse-blog-post-logs.
  4. For Target prefix, leave blank.
  5. Choose Save.
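
If you prefer to script this configuration instead of using the console, the following is a minimal Boto3 sketch with the same source and target buckets. Note that when you enable logging through the API, you also need to make sure the target bucket's policy allows the Amazon S3 logging service to write to it (the console handles this for you).

import boto3

s3 = boto3.client('s3')

# Enable server access logging on the source bucket, delivering logs to the target bucket
s3.put_bucket_logging(
    Bucket='rs-lakehouse-blog-post',
    BucketLoggingStatus={
        'LoggingEnabled': {
            'TargetBucket': 'rs-lakehouse-blog-post-logs',
            'TargetPrefix': ''
        }
    }
)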

Let’s assume that after a few days of activity, you want to discover how users and applications accessed the data.

  1. On the Amazon Redshift console, create an external table to map the Amazon S3 access logs:
    CREATE EXTERNAL TABLE taxispectrum.s3accesslogs(
        BucketOwner                   varchar(256), 
        Bucket                        varchar(256), 
        RequestDateTime               varchar(256), 
        RemoteIP                      varchar(256), 
        Requester                     varchar(256), 
        RequestID                     varchar(256), 
        Operation                     varchar(256), 
        Key                           varchar(256), 
        RequestURI_operation          varchar(256),
        RequestURI_key                varchar(256),
        RequestURI_httpProtoversion   varchar(256),
        HTTPstatus                    varchar(256), 
        ErrorCode                     varchar(256), 
        BytesSent                     varchar(256), 
        ObjectSize                    varchar(256), 
        TotalTime                     varchar(256), 
        TurnAroundTime                varchar(256), 
        Referrer                      varchar(256), 
        UserAgent                     varchar(256), 
        VersionId                     varchar(256)
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
    WITH SERDEPROPERTIES (
        'input.regex' = '([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) \"([^ ]*) ([^ ]*) ([^ ]*)\" (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\") ([^ ]*)'
    )
    STORED AS TEXTFILE
    LOCATION 's3://rs-lakehouse-blog-post-logs/'
    ;
  2. Check the AWS Identity and Access Management (IAM) policy S3-Lakehouse-Policy that you created in part 1 and add the following two resource ARNs to the policy's JSON definition:
    "arn:aws:s3:::rs-lakehouse-blog-post-logs",
    "arn:aws:s3:::rs-lakehouse-blog-post-logs/*"
  3. Query a few columns:
    select bucket, 
        key, 
        requestdatetime, 
        remoteip, 
        operation, 
        useragent 
    from taxispectrum.s3accesslogs
    ;

The following screenshot shows our results.

This report is a starting point both for auditing purposes and for analyzing access patterns. As a next step, you could use a business intelligence (BI) visualization solution like Amazon QuickSight to create a dataset and build a dashboard showing the most and least frequently accessed files.

Cleaning up

To clean up your resources after walking through this post, complete the following steps:

  1. Delete the Amazon Redshift cluster without the final cluster snapshot:
    aws redshift delete-cluster --cluster-identifier redshift-cluster-1 --skip-final-cluster-snapshot
  2. Delete the database and tables defined in AWS Glue:
    aws glue delete-table \
        --database-name blogdb \
        --name taxi_archive

    aws glue delete-table \
        --database-name blogdb \
        --name s3accesslogs

    aws glue delete-database --name blogdb
  3. Delete the S3 buckets and all their content:
    aws s3 rb s3://rs-lakehouse-blog-post --force
    aws s3 rb s3://rs-lakehouse-blog-post-logs --force

Conclusion

In the first post in this series, we demonstrated how to implement a data lifecycle system for a lake house using Amazon Redshift, Amazon Redshift Spectrum, and Amazon S3 lifecycle rules. In this post, we focused on how to operationalize the solution with automation scripts (using the AWS SDK for Python, Boto3) and Amazon S3 server access logs.


About the Authors

Cristian Gavazzeni is a senior solution architect at Amazon Web Services. He has more than 20 years of experience as a pre-sales consultant focusing on Data Management, Infrastructure and Security. During his spare time he likes eating Japanese food and travelling abroad with only fly and drive bookings.

Francesco Marelli is a senior solutions architect at Amazon Web Services. He lived and worked in London for 10 years, and has since worked in Italy, Switzerland, and other countries in EMEA. He specializes in the design and implementation of analytics, data management, and big data systems, mainly for enterprise and FSI customers. Francesco also has strong experience in systems integration and in the design and implementation of web applications. He loves sharing his professional knowledge, collecting vinyl records, and playing bass.