AWS Big Data Blog

Analyze daily trading activity using transaction data from Amazon Redshift in Amazon FinSpace

Financial services organizations use data from various sources to discover new insights and improve trading decisions. Finding the right dataset and getting access to the data can frequently be a time-consuming process. For example, to analyze daily trading activity, analysts need to find a list of available databases and tables, identify each owner’s contact information, get access, understand the table schema, and load the data. They repeat this process for every additional dataset needed for the analysis.

Amazon FinSpace makes it easy for analysts and quants to discover, analyze, and share data, reducing the time it takes to find and access financial data from months to minutes. To get started, FinSpace admins create a category and an attribute set to capture relevant external reference information such as database type and table name. After connecting to a data source or uploading data directly through the FinSpace user interface (UI), you can create datasets in FinSpace that include schema and other relevant information. Analysts can then search the catalog for necessary datasets and connect to them using the FinSpace web interface or through the FinSpace JupyterLab notebook.

Amazon Redshift is a popular choice for storing and querying exabytes of structured and semi-structured data such as trade transactions. In this post, we explore how to connect to an Amazon Redshift data warehouse from FinSpace through a Spark SQL JDBC connection and populate the FinSpace catalog with metadata such as schema details, dataset owner, and description. We then show how simple it is to use the FinSpace catalog to discover available data and to connect to an Amazon Redshift cluster from a Jupyter notebook in FinSpace to read daily trades for Amazon (AMZN) stock. Finally, we evaluate how well our stock purchases were executed by comparing our transactions stored in Amazon Redshift to the trading history for the stock stored in FinSpace.

Solution overview

The blog post covers the following steps:

  1. Set up Amazon Redshift integration notebooks.
  2. Configure your FinSpace catalog to describe your Amazon Redshift tables.
  3. Use FinSpace notebooks to connect to Amazon Redshift.
  4. Populate the FinSpace catalog with tables from Amazon Redshift. Add description, owner, and attributes to each dataset to help with data discovery and access control.
  5. Search the FinSpace catalog for data.
  6. Use FinSpace notebooks to analyze data from both FinSpace and Amazon Redshift to evaluate trade performance based on the daily price for AMZN stock.

The diagram below provides the complete solution overview.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Set up Amazon Redshift integration notebooks

Download Jupyter notebooks covering Amazon Redshift dataset import and analysis. The code provided in this blog post should be run from the FinSpace notebooks.

  1. On the FinSpace console, choose “Open Notebook”
  2. Choose Git on the left navigation panel and select “Clone a Repository”. Paste the link to the repo with FinSpace examples.
  3. Navigate to amazon-finspace-examples/blogs/finspace_redshift-2021-09 and open both Jupyter notebooks that contain the Amazon Redshift integration code. If prompted, select the FinSpace PySpark kernel. In the next sections, we update and run the notebooks to connect to Amazon Redshift and import datasets.

Configure your FinSpace catalog to describe your Amazon Redshift tables

FinSpace users can discover relevant datasets by using search or by navigating across categories under the Categories menu. Categories allow for cataloging of datasets by commonly used business terms (such as source, data class, type, industry, and so on). An attribute set holds additional metadata for each dataset, including categories and table details to enable you to connect to the data source directly from a FinSpace notebook. Analysts can browse and search attributes to find datasets based on the values assigned to them.

Complete the following steps to create a new subcategory called Redshift under the Source category, and create an attribute set called Redshift Table Attributes. In the following section, we use the subcategory and attribute set to tag datasets from Amazon Redshift. FinSpace users can then browse for the data from the Amazon Redshift source from the Categories menu and filter datasets in FinSpace for the tables that are located in the company’s Amazon Redshift data warehouse.

  1. On the FinSpace console, select Settings (gear icon).
  2. Choose Categories.
  3. Hover over the Data Source category and choose Edit.

  4. On the Edit Category page, hover over the Source category again and choose Add Sub-Category.
  5. Add Redshift as a source subcategory and Financial data from company's Amazon Redshift data warehouse as the description. Select DONE and SAVE.

Next, create an attribute set called Redshift Table Attributes to capture additional business context for each dataset.

  1. On the FinSpace console, choose Settings (gear icon).
  2. Choose Attribute Sets.
  3. Choose CREATE ATTRIBUTE SET.
  4. Create a new attribute set called Redshift Table Attributes.
  5. Add the following fields:

    1. Catalog – Data String type
    2. Schema – Data String type
    3. Table – Data String type
    4. Source – Categorization Source type

Use FinSpace notebooks to connect to Amazon Redshift

The notebooks downloaded earlier provide the integration between FinSpace and Amazon Redshift. The steps below explain the code so you can run and extend it as needed.

  1. Connect to the Spark cluster by running the following code:
%local
from aws.finspace.cluster import FinSpaceClusterManager

# if this was already run, no need to run again
if 'finspace_clusters' not in globals():
    finspace_clusters = FinSpaceClusterManager()
    finspace_clusters.auto_connect()
else:
    print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')

After the connection is established, you see a connected to cluster message. It may take 5–8 minutes for the cluster connection to establish.

  2. Add the JDBC driver to Spark jars by running the following code:
%%configure -f
{ "conf":{
          "spark.jars": "https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/2.0.0.7/redshift-jdbc42-2.0.0.7.jar"
         }
}

In this example, we use version 2.0.0.7 of the Amazon Redshift JDBC driver 2.0. To download the latest JDBC driver, see Download the Amazon Redshift JDBC driver, version 2.0.

  3. Run cells 1.3–1.4 in the notebook (collapsed to improve readability) to add FinSpace helper classes found in public GitHub examples and to add utility functions.

Python helper classes help with schema and table creation, cluster management, and more. The utility functions help translate Amazon Redshift data to a FinSpace schema.
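As a rough illustration of what such a translation involves, the sketch below maps a few Amazon Redshift column types to FinSpace-style column types. The function name `to_finspace_schema`, the mapping table, the target type names, and the input row layout are all assumptions made for this example; the notebook's own utility functions may differ.

```python
# Hypothetical mapping from Redshift data types (as reported in column
# metadata) to FinSpace-style column types. Unknown types fall back to STRING.
REDSHIFT_TO_FINSPACE = {
    "smallint": "SMALLINT",
    "integer": "INTEGER",
    "bigint": "BIGINT",
    "real": "FLOAT",
    "double precision": "DOUBLE",
    "numeric": "DOUBLE",  # note: loses exact decimal precision
    "boolean": "BOOLEAN",
    "date": "DATE",
    "timestamp without time zone": "DATETIME",
    "character": "CHAR",
    "character varying": "STRING",
}


def to_finspace_schema(columns):
    """Build a schema dict from (name, data_type, comment) rows."""
    fs_columns = []
    for name, data_type, comment in columns:
        fs_columns.append({
            "name": name,
            "dataType": REDSHIFT_TO_FINSPACE.get(data_type.lower(), "STRING"),
            "description": comment or "",
        })
    return {"primaryKeyColumns": [], "columns": fs_columns}


# Made-up columns for a trade table, for illustration only
example = to_finspace_schema([
    ("trans_date", "date", "Transaction date"),
    ("price", "double precision", None),
    ("ticker", "character varying", "Stock symbol"),
])
```

Falling back to STRING for unmapped types keeps the import from failing on unusual columns, at the cost of losing type information for them.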

Next, you update the user group ID that should get access to the datasets, and update the Amazon Redshift connection parameters.

  1. On the FinSpace console, choose Settings (gear icon).
  2. Choose Users and User Groups.
  3. Select a group that should have access to the data and copy the group ID from the URL. In this example, we use the ID of the Analyst Team group.
  4. On the Amazon Redshift console, open your cluster.
  5. Note the cluster endpoint information from the General information section.
  6. Note your database name, port, and admin user name in the Database configurations section.

If you don’t know your user name or password, contact your Amazon Redshift administrator.
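The connection details noted above can be collected into a set of Spark JDBC reader options. The following is a minimal sketch: the function name `redshift_jdbc_options` and all sample values are placeholders invented for this example, and the notebook may assemble its connection string (`urlStr`) differently, for instance by embedding the credentials in the URL.

```python
def redshift_jdbc_options(host, port, database, user, password):
    """Return Spark JDBC reader options for an Amazon Redshift cluster."""
    return {
        "driver": "com.amazon.redshift.jdbc42.Driver",
        "url": f"jdbc:redshift://{host}:{port}/{database}",
        "user": user,
        "password": password,
    }


# Placeholder values; replace them with the details noted from the console
opts = redshift_jdbc_options(
    host="example-cluster.abc123.us-east-1.redshift.amazonaws.com",
    port=5439,
    database="dev",
    user="awsuser",
    password="REPLACE_ME",
)

# In a FinSpace notebook, the options could then be passed to Spark, e.g.:
# df = spark.read.format("jdbc").options(**opts).option("query", "SELECT 1").load()
```

Passing the user name and password as separate options rather than in the URL keeps them out of any place where the URL itself is printed or logged.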

Populate the FinSpace catalog with tables from Amazon Redshift

Now we’re ready to import table metadata from Amazon Redshift into FinSpace. For each table, we create a FinSpace dataset, populate the attribute set we created with the metadata about the table (catalog, schema, table names, and Redshift subcategory for the Source category), and associate the populated attribute set with the created dataset.

  1. Use spark.read to retrieve a list of tables and columns as a Spark DataFrame:
spark.read.format("jdbc").option("driver","com.amazon.redshift.jdbc42.Driver").option("url", urlStr).option("query", Query).load()

As a result, you get two DataFrames, tablesDF and schemaDF, containing a list of tables and associated metadata (database, schema, table names, and comments) as shown in the following screenshot.
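The `Query` value passed to spark.read is defined in the notebook and not reproduced in this post. One plausible shape for the table-listing query is sketched below, assuming the metadata is read from the Amazon Redshift SVV_TABLES system view and that its `remarks` column holds the table comment. The function name `tables_query` is hypothetical.

```python
def tables_query(schema_name):
    """Return SQL that lists the tables of one schema with their comments.

    The statement has no trailing semicolon because Spark wraps the value of
    the "query" option in a subquery.
    """
    # Accept only simple identifiers, since the name is embedded in the SQL text
    if not schema_name.isidentifier():
        raise ValueError(f"unexpected schema name: {schema_name!r}")
    return (
        "SELECT table_catalog, table_schema, table_name, remarks AS comment "
        "FROM svv_tables "
        f"WHERE table_schema = '{schema_name}'"
    )


query = tables_query("public")
```

A similar query against a column-level view would supply the per-column metadata that ends up in schemaDF.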

  2. Get the attribute set Redshift Table Attributes that we created earlier by running finspace.attribute_set(att_name). We use its identifiers for populating the metadata for each dataset we create in FinSpace.
# Get the attribute set
sfAttrSet = finspace.attribute_set(att_name)

att_def = None
att_fields = None

# Get the fields of the attribute set
att_resp = finspace.describe_attribute_set(sfAttrSet['id'])

if 'definition' in att_resp:
    att_def = att_resp['definition']

# Guard against a missing definition before looking up its fields
if att_def is not None and 'fields' in att_def:
    att_fields = att_def['fields']
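The import loop later in this section looks up entries in att_fields by their UI title through a notebook helper called get_field_by_name, whose definition sits in the collapsed cells. The sketch below shows how such a lookup could work. It is named `lookup_field` to avoid clashing with the real helper, and the sample field definitions are made up.

```python
def lookup_field(fields, title, key="name"):
    """Return `key` from the first field whose UI title matches, else None."""
    for field in fields:
        if field.get("title") == title:
            return field.get(key)
    return None


# Made-up field definitions shaped like the att_fields list
sample_fields = [
    {"name": "f1", "title": "Catalog", "type": {"name": "String"}},
    {"name": "f2", "title": "Source", "type": {"name": "Categorization"}},
]

catalog_field = lookup_field(sample_fields, "Catalog")
source_type = lookup_field(sample_fields, "Source", "type")["name"]
```
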

  3. Get an ID for the Redshift subcategory to populate the attribute set and identify the datasets with the Amazon Redshift source:
source_cls = finspace.classification('Source')

source_fields = finspace.describe_classification(source_cls['id'])
source_key = None

for n in source_fields['definition']['nodes']:
    if n['fields']['name'] == source_name: 
        source_key = n['key']

# this is the key for source in the Category
print(f'Source: {source_name} Key: {source_key}')

As an output, you get the source_key ID for the Redshift subcategory.

  4. Use list_dataset_metadata_by_taxonomy_node(taxonomyId, taxonomyNodeKey) to get the list of existing datasets in FinSpace, so that you don’t create a duplicate when an Amazon Redshift table is already cataloged:
import os
import pandas as pd

# Get all the datasets from Redshift (classification type Source, with value 'Redshift')
resp = finspace.client.list_dataset_metadata_by_taxonomy_node(taxonomyId=source_cls['id'], taxonomyNodeKey=source_key)

# Get a list of datasets to iterate over
datasets = resp['datasetMetadataSummaries']

# Build the lookup table for existing datasets from Redshift to avoid creating duplicates
types_list = []

for s in datasets:

        # end of the arn is the dataset ID
        dataset_id = os.path.basename(s['datasetArn'])

        # get the details of the dataset (name, description, etc)
        dataset_details_resp = finspace.client.describe_dataset_details(datasetId=dataset_id)

        dataset_details = None
        dataset_types   = []  # default to empty so the loop below is safe
        owner_info = None
        taxonomy_info = None
        
        if 'dataset' in dataset_details_resp:
            dataset_details = dataset_details_resp["dataset"]

        if 'datasetTypeContexts' in dataset_details_resp:
            dataset_types = dataset_details_resp["datasetTypeContexts"]

        if 'ownerinfo' in dataset_details_resp:
            owner_info = dataset_details_resp["ownerinfo"]

        if 'taxonomyNodesinfo' in dataset_details_resp:
            taxonomy_info = dataset_details_resp["taxonomyNodesinfo"]
            
        # Pull Redshift attribute set from the list of dataset_types

        # first check the definition, then extract the values against the definition
        # have the keys of values/labels as the column header?
        for dt in dataset_types:
            if (dt['definition']['name'] != att_name):
                continue

            dd = {
                'dataset_id' : dataset_id
            }

            # used to map the field name (id) to the title seen in the UI
            field_map = {}

            # get the field titles for name
            for f in dt['definition']['fields']:
                field_map[f['name']] = f['title']

            # human readable, else the keys would be numbers
            for v in dt['values']:
                dd[field_map[v['field']]] = v['values']

            types_list.append(dd)

types_pdf = pd.DataFrame(types_list)

If you already have tables tagged with Redshift as a source, your output looks similar to the following screenshot.
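To make the shape of each row in this lookup table concrete, the sketch below flattens a single attribute-set context into a row keyed by field title. The function name `flatten_type_context` and the sample context are made up for illustration and only mirror the keys used in the loop above.

```python
def flatten_type_context(dataset_id, type_context):
    """Turn one attribute-set context into a flat row keyed by field title."""
    titles = {f["name"]: f["title"] for f in type_context["definition"]["fields"]}
    row = {"dataset_id": dataset_id}
    for value in type_context["values"]:
        row[titles[value["field"]]] = value["values"]
    return row


# Made-up context shaped like the entries the loop above iterates over
sample_context = {
    "definition": {
        "name": "Redshift Table Attributes",
        "fields": [
            {"name": "f1", "title": "Catalog"},
            {"name": "f2", "title": "Schema"},
            {"name": "f3", "title": "Table"},
        ],
    },
    "values": [
        {"field": "f1", "values": ["dev"]},
        {"field": "f2", "values": ["public"]},
        {"field": "f3", "values": ["trade_history"]},
    ],
}

row = flatten_type_context("abc123", sample_context)
```

Each attribute value is stored as a list, which is why the duplicate check in the import loop uses membership tests such as `catalog in ee.Catalog`.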

  5. Set permissions and owner details by updating the following code with your desired values:
basicPermissions = [
"ViewDatasetDetails",
"ReadDatasetData",
"AddDatasetData",
"CreateSnapshot",
"EditDatasetMetadata",
"ManageDatasetPermissions",
"DeleteDataset"
]

# All datasets have ownership
basicOwnerInfo = {
"phoneNumber" : "12125551000",
"email" : "jdoe@amazon.com",
"name" : "Jane Doe"
}

  6. Create a DataFrame with a list of tables in Amazon Redshift to iterate over:
tablesPDF = tablesDF.select('TABLE_CATALOG', 'TABLE_SCHEMA', 'TABLE_NAME', 'COMMENT').toPandas()

  7. Run the following code to:
    1. Check whether a table already exists in FinSpace;
    2. If it doesn’t exist, get the table’s schema and populate the attribute set values;
    3. Create the dataset with a description and associate the attribute set (Catalog, Schema, Table names, and Source) with it.
import time

c = 0
create = True  # set to False for a dry run that only prints what would be created

# For each table, create a dataset with the necessary attribute set populated and associated to the dataset
for index, row in tablesPDF.iterrows():
    
    c = c + 1
        
    catalog = row.TABLE_CATALOG
    schema  = row.TABLE_SCHEMA
    table   = row.TABLE_NAME
    
    # do we already have this dataset?
    exist_i = None
    for ee_i, ee in types_pdf.iterrows():
        if catalog in ee.Catalog:
            if schema in ee.Schema:
                if table in ee.Table:
                    exist_i = ee_i

    if exist_i is not None:
        print(f"Table exists in FinSpace: \n{types_pdf.iloc[[exist_i]]}")
        continue

    # Attributes and their populated values
    att_values = [
        { 'field' : get_field_by_name(att_fields, 'Catalog'), 'type' : get_field_by_name(att_fields, 'Catalog', 'type')['name'], 'values' : [ catalog ] },
        { 'field' : get_field_by_name(att_fields, 'Schema'),  'type' : get_field_by_name(att_fields, 'Schema', 'type')['name'],  'values' : [ schema ] },
        { 'field' : get_field_by_name(att_fields, 'Table'),   'type' : get_field_by_name(att_fields, 'Table', 'type')['name'],   'values' : [ table ] },
        { 'field' : get_field_by_name(att_fields, 'Source'),  'type' : get_field_by_name(att_fields, 'Source', 'type')['name'],  'values' : [ source_key ] },
    ]

    # get this table's schema from Redshift
    tableSchemaPDF = schemaDF.filter(schemaDF.table_name == table).filter(schemaDF.table_schema == schema).select('ORDINAL_POSITION', 'COLUMN_NAME', 'IS_NULLABLE', 'DATA_TYPE', 'COMMENT').orderBy('ORDINAL_POSITION').toPandas()

    print(tableSchemaPDF)
    # translate Redshift schema to FinSpace Schema
    fs_schema = get_finspace_schema(tableSchemaPDF)

    # name and description of the dataset to create
    name = f'{table}'
    description = f'Redshift table from catalog: {catalog}'
    
    if row.COMMENT is not None:
        description = row.COMMENT
    
    print(f'name: {name}')
    print(f'description: {description}')

    print("att_values:")
    for i in att_values:
        print(i)

    print("schema:")
    for i in fs_schema['columns']:
        print(i)
    
    if (create):
        # create the dataset
        dataset_id = finspace.create_dataset(
            name = name,
            description = description,
            permission_group_id = group_id,
            dataset_permissions = basicPermissions,
            kind = "TABULAR",
            owner_info = basicOwnerInfo,
            schema = fs_schema
        )

        print(f'Created, dataset_id: {dataset_id}')

        time.sleep(20)

        # associate the attributes to the dataset
        if (att_name is not None and att_values is not None):
            print(f"Associating values to attribute set: {att_name}")
            finspace.associate_attribute_set(att_name=att_name, att_values=att_values, dataset_id=dataset_id) 
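The loop above rescans every existing dataset for each Amazon Redshift table. For larger catalogs, one alternative is to index the existing datasets once and test membership against a set. The sketch below assumes rows shaped like the entries of types_list; the function name `build_existing_index` and the sample data are hypothetical.

```python
def build_existing_index(rows):
    """Index already-cataloged tables as (catalog, schema, table) tuples."""
    index = set()
    for row in rows:
        for catalog in row.get("Catalog", []):
            for schema in row.get("Schema", []):
                for table in row.get("Table", []):
                    index.add((catalog, schema, table))
    return index


# Made-up rows and candidate tables, for illustration only
existing_rows = [
    {"dataset_id": "abc123", "Catalog": ["dev"], "Schema": ["public"], "Table": ["trade_history"]},
]
existing = build_existing_index(existing_rows)

candidates = [("dev", "public", "trade_history"), ("dev", "public", "positions")]
to_create = [c for c in candidates if c not in existing]
```

With this index, the check for each table becomes a single set lookup instead of a pass over the whole lookup table.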

Search the FinSpace catalog for data

Analysts can search for datasets available to them in FinSpace and refine the results using category filters. To analyze our trading activity in the next section, we need to find two datasets: all trades of AMZN stock, and the buy and sell orders from the Amazon Redshift database.

  1. Search for “AMZN” or “US Equity TAQ Sample” to find the “US Equity TAQ Sample – 14 Symbols 6 Months – Sample” dataset.
  2. Go to the All Data Views tab and select Details to open the Data View Details page.
  3. Copy the Dataset ID and Data View ID.

We use these IDs in the next section to connect to the data view in FinSpace and analyze our trading activity.

Next, we find the trade_history dataset that we created from the Amazon Redshift table and copy its dataset ID.

  1. On the FinSpace console, choose Data Source under Categories and choose Redshift.
  2. Open the trade_history table.
  3. Copy the dataset ID located in the URL.

Users with permissions to create datasets can also update the dataset with additional information, including a description and owner contact information if those details have changed since the dataset was created in FinSpace.

Use FinSpace notebooks to analyze data from both FinSpace and Amazon Redshift

We’re now ready to analyze the data.

  1. Open the provided Analysis Jupyter notebook in FinSpace.
  2. Follow the steps covered in the earlier section, Use FinSpace notebooks to connect to Amazon Redshift, to connect to the FinSpace cluster and add the JDBC driver to Spark jars. Add helper and utility functions.
  3. Set up your database connection and date parameters. In this scenario, we analyze trading activity for January 2, 2021.
  4. Connect to Amazon Redshift and query the table directly. Import the data as a Spark DataFrame.
myTrades  = get_dataframe_from_database(dataset_id = dataset_id_db, att_name = db_att_name)

As a result, you get the data stored in the Amazon Redshift database as a Spark DataFrame.
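get_dataframe_from_database is one of the notebook's helpers, and its body is not shown in this post. Conceptually, such a helper can read the Catalog, Schema, and Table values from the dataset's attribute set and turn them into a JDBC read. The sketch below covers only the naming step; `qualified_table`, `attrs`, and `jdbc_opts` are hypothetical names, and the attribute values are made up.

```python
def qualified_table(attributes):
    """Build a schema-qualified, quoted table name from attribute-set values."""
    schema = attributes["Schema"][0]
    table = attributes["Table"][0]
    return f'"{schema}"."{table}"'


# Made-up attribute values shaped like a populated attribute set
attrs = {"Catalog": ["dev"], "Schema": ["public"], "Table": ["trade_history"]}
dbtable = qualified_table(attrs)

# With a Spark session and JDBC options in hand, the table could be read as:
# myTrades = spark.read.format("jdbc").options(**jdbc_opts).option("dbtable", dbtable).load()
```
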

  5. Filter for stock purchase transactions (labeled as P) and calculate an average price paid:
avgPrice = (myTrades.filter( myTrades.trans_date == aDate )
                    .filter(myTrades.trans_type == "P")
                    .select('price')
                    .agg({'price':'avg'}))

  6. Get trading data from the FinSpace Capital Markets dataset:
df = finspace.read_view_as_spark(dataset_id = dataset_id, view_id = view_id)

  7. Apply date, ticker, and trade type filters:
import datetime as dt
import pandas as pd

fTicker = 'AMZN'

pDF = (
    df.filter( df.date == aDate )
    .filter(df.eventtype == "TRADE NB")
    .filter(df.ticker == fTicker)
    .select('price', 'quantity')
).toPandas()

  8. Plot the distribution of the day’s trade prices together with our average purchase price to see how close we got to the lowest price:
import matplotlib.pyplot as plt

fig, ax = plt.subplots(1, 1, figsize=(12, 6))

# Histogram of traded prices, weighted by traded quantity
pDF["price"].plot(kind="hist", weights=pDF["quantity"], bins=50, ax=ax)

# avgPrice is a one-row, one-column Spark DataFrame; extract the scalar value
avgPriceValue = avgPrice.first()[0]
ax.axvline(x=avgPriceValue, color='red')

# Add labels
plt.title(f"{fTicker} Price Distribution vs Avg Purchase Price")
plt.ylabel('Trades')
plt.xlabel('Price')
plt.subplots_adjust(bottom=0.2)

%matplot plt

As a result, you get a distribution of AMZN stock prices traded on January 2, 2021, which we got from a dataset in FinSpace. The red line in the following graph is the average price we paid for the stock calculated from the transaction data stored in Amazon Redshift. Although we didn’t pay the highest price traded that day, our execution was only average: we paid $1,877 per share versus the day’s lowest price of $1,865.
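To put a number on how close a purchase came to the best price, one option is to compute the share of the day's traded quantity that changed hands at or below the average purchase price. The sketch below uses made-up (price, quantity) pairs rather than the data from this post, and the function name `share_traded_at_or_below` is hypothetical.

```python
def share_traded_at_or_below(trades, price):
    """Fraction of traded quantity executed at or below `price`."""
    total = sum(qty for _, qty in trades)
    below = sum(qty for p, qty in trades if p <= price)
    return below / total


# Made-up (price, quantity) pairs, not the data from the post
market_trades = [(1865.0, 100), (1870.0, 300), (1877.0, 200), (1885.0, 400)]
my_avg_price = 1877.0

fraction = share_traded_at_or_below(market_trades, my_avg_price)
```

For a buyer, a smaller fraction means a better execution, because less of the day's volume traded at a price as low as the one paid.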

Clean up

If your work with FinSpace or Amazon Redshift is complete, delete the Amazon Redshift cluster or the FinSpace environment to avoid incurring additional fees.

Conclusion

In this post, we reviewed how to connect the Amazon Redshift database and FinSpace in order to create new datasets in FinSpace using the table metadata from Amazon Redshift. We then explored how to look for available data in the FinSpace web app to find two datasets that can help us evaluate how close we got to the best daily price. Finally, we used FinSpace dataset details to import the data into two DataFrames and plot price distribution versus the average price we paid. As a result, we reduced the time it takes to discover and connect to datasets needed for analyzing trading transactions.

Download the import and analysis Jupyter notebooks discussed in this blog post on GitHub.

Extensions to other databases

You can use the same integration notebook to connect to Aurora MySQL and other AWS relational databases. To connect to Aurora MySQL, download the MariaDB driver and replace the connection URL with jdbc:mysql:aurora://YOURCLUSTER:PORT?user=USER&password=PASSWORD.

Visit the FinSpace user guide to learn more about the service, or contact us to discuss FinSpace or Amazon Redshift in more detail.


About the Authors

Mariia Berezina is a Sr. Launch Manager at AWS. She is passionate about building new products to help customers get the most out of data. When not working, she enjoys mentoring women in technology, diving, and traveling the world.

Vincent Saulys is a Principal Solutions Architect at AWS working on FinSpace. Vincent has over 25 years of experience solving some of the world’s most difficult technical problems in the financial services industry. He is a launcher and leader of many mission-critical breakthroughs in data science and technology on behalf of Goldman Sachs, FINRA, and AWS.