AWS for Industries

Post Processing Capital Market and Insurance Grid output data in AWS

The computing needs of capital market and insurance firms have evolved as regulatory regimes require firms to develop more complex models that run more frequently and provide transparency. In response, firms are building models that require greater compute capacity. As financial institutions run their grid workloads on AWS, two core aspects stand out: the size of the data generated by a grid batch, and the post-processing of grid output data, which involves array-based operations and applying mathematical/statistical functions to those arrays.

Most of the time these grid systems run in batch mode (for example, an overnight risk batch run). When running in batch mode, it's common to see grid output data sizes over 1 TB. The second aspect is that post-processing the result data involves array-based operations and applying a mathematical/statistical function to the array. A common example is taking the PnL array output from the grid, summing the arrays based on some filter, and then calculating Value at Risk (VaR) from the resulting array. This type of transformation is difficult to perform with SQL/BI tools because of the arithmetic and other mathematical operations on the array data type, followed by the application of user-defined functions on the result set.
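To make the pattern concrete, here is a minimal, self-contained sketch (not part of the pipeline described below) of the array operation, assuming a NumPy matrix of simulated PnL with one row per scenario and one column per trade:

import numpy as np

# Illustrative only: 10,000 scenarios x 5 trades of simulated PnL values.
rng = np.random.default_rng(seed=42)
pnl = rng.normal(loc=0.0, scale=1000.0, size=(10_000, 5))

# A filter (for example, "all trades on one desk") selects a subset of trade columns.
selected_trades = [0, 2, 4]

# Sum the selected PnL arrays scenario by scenario to get the portfolio PnL distribution,
# then read VaR off the sorted distribution at the chosen confidence point.
portfolio_pnl = pnl[:, selected_trades].sum(axis=1)
portfolio_pnl.sort()
confidence = 0.1
var = portfolio_pnl[int(confidence * len(portfolio_pnl))]
print(var)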

Here we present steps using AWS native services to simplify and scale the post-processing of grid output data.

Architecture

(Architecture diagram: post-processing grid output data with AWS Glue, Amazon Redshift, and Amazon Redshift Spectrum)

Processing steps

You will need to follow these processing steps:

  1. Compute Grid engines write PnL data to Amazon Simple Storage Service (Amazon S3).
  2. AWS Glue reads PnL data from Amazon S3 and calculates VaR based on different filters (by Desk, Sector, etc.) using PySpark.
  3. From AWS Glue, the computed VaR numbers are stored in Amazon Redshift.
  4. Amazon Redshift also uses Amazon Redshift Spectrum to query the raw PnL result data stored in Amazon S3 for generating ad-hoc reports. The raw PnL data size is generally quite large (> 1 TB) and not frequently queried.
  5. Amazon QuickSight is used to generate a dashboard from Amazon Redshift.

Amazon Redshift serves as the risk data warehouse here. For most risk systems, the result data output is much larger (often 10 to 100 times) than the input data.

Sample input files

Trade data files

ticker   stock_price   strike    maturity   quantity   trade_id   desk
SIRI     62.66         65.793    3          1000       1999901    casheq-long
ERTS     13.57         13.0272   36         1000       1999902    delta-neutral
VLCM     54.39         49.4949   43         1000       1999903    delta-hedge
CMCSA    6.59          6.9195    23         500        1999904    vega-hedge
HAL      95.12         95.12     43         500        1999905    liquid-longmat
AET      25.09         27.0972   23         2000       1999906    casheq-long

S&P 500 industry sector description file

Symbol   Name                      Sector
MMM      3M Company                Industrials
AOS      A.O. Smith Corporation    Industrials
ABT      Abbott Laboratories       Health Care
ABBV     AbbVie Inc.               Health Care
ACN      Accenture plc             Information Technology

This is just to demonstrate the process. We understand that actual trade files will have many more fields.

Grid result file (PnL array of Trades)

(Sample Grid result file worksheet: a matrix of PnL values with one column per trade ID and one row per scenario ID)

In the above sample file, each column is a trade ID (from the trade file) and each row is a scenario ID (10,000 scenario IDs). Each cell is the value of the trade's PnL for that scenario ID.
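Because the sample worksheet above is an image, the following minimal sketch (illustrative values and file name only, not from the original pipeline) shows how a grid worker could produce a result file in this layout, with trade IDs as column headers and one row per scenario:

import numpy as np
import pandas as pd

# Illustrative only: 10,000 scenarios for three of the sample trade IDs.
trade_ids = ["1999901", "1999902", "1999903"]
rng = np.random.default_rng(seed=1)
pnl = pd.DataFrame(rng.normal(scale=1000.0, size=(10_000, len(trade_ids))), columns=trade_ids)

# The unnamed index column holds the scenario ID (read back as _c0 in Step 3 below).
pnl.to_csv("result_worker_1.csv")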

AWS Glue ETL Process to transform the result file 

We have used Python (version 3) for the AWS Glue script.

Here are the main steps for the AWS Glue script:

1> We import the required libraries:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import numpy as np
import pandas as pd
import boto3
from awsglue.dynamicframe import DynamicFrame

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

2> Here we read the trade file and the S&P 500 sector description file in Spark:

result_bucket_name = 'resultdata-990764378933'
result_bucket_url = 's3://' + result_bucket_name + '/'

rdd_trade = spark.read.csv(result_bucket_url + 'tradefiles/<your trade file name>', header=True).rdd
rdd_spnames = spark.read.csv(result_bucket_url + 'tradefiles/sp500names.csv', header=True).rdd

3> There could be many result files under the result bucket and key prefix. Because the grid saves results from different grid workers (not shown in the architecture), there will be multiple result files, and they all must be included in the post-processing. Here we read all of the result files in scope for this post-processing:

s3 = boto3.resource('s3')
bucket_name = result_bucket_name
bucket = s3.Bucket(bucket_name)
pl_df = None
for obj in bucket.objects.filter(Delimiter='/', Prefix='resultstore/'):
    print(obj.key)
    temp_pl_df = spark.read.csv('s3://' + bucket_name + '/' + obj.key, header=True)
    if pl_df is not None:
        # Join on the scenario ID column (_c0) so each worker's trade columns are appended.
        pl_df = pl_df.join(temp_pl_df, pl_df._c0 == temp_pl_df._c0, 'inner').drop(temp_pl_df['_c0'])
    else:
        pl_df = temp_pl_df

4> We wrote a very simple VaR utility function. This function will be called later to get VaR for a set of trades filtered by some trade attributes.

import numpy as np
# Here cols is the list of trade IDs to include.
# pl_df contains the PnL arrays for all trades (from Step 3).
# Arrays are added in a loop, one mini batch at a time; otherwise, for a very large data
# frame we may encounter an out-of-memory issue.
# Once the arrays are summed, the result is sorted and a position in the array is chosen
# to get the VaR number.
def calc_var(cols, conf=0.1):
    batch_size = 50
    low = 0
    high = batch_size  # if len(cols) > batch_size else len(cols)
    total = None
    loop = True
    while loop:
        if high > len(cols):
            high = len(cols)
            loop = False

        # Sum this batch of trade PnL columns across all scenarios.
        local_pl = np.array(pl_df.withColumn("total", sum(pl_df[col] for col in cols[low:high])).select(['total']).toPandas()['total'])
        if total is not None:
            total += local_pl
        else:
            total = local_pl

        low = high
        high = high + batch_size

    # Sort the summed PnL distribution and pick the value at the confidence point.
    total.sort()
    arry_pt = int(conf * len(total))
    var = total[arry_pt]
    return var

5> Here is a wrapper function to call calc_var (defined in Step 4) for each grouping (explained in the next step) and return a list of calculated numbers:

def get_var_from_group(trade_grp):
    var_list = []

    for row in trade_grp.collect():
        # row[0] is the grouping value; row[1] is the set of trade IDs in that group.
        var = calc_var(row[1])
        print(row[0], var)
        var_dict = {}
        var_dict['grouping'] = row[0]
        var_dict['var'] = float(var)
        var_dict['conf'] = float(0.1)
        var_list.append(var_dict)
    return var_list

6> We define a function to store the results from an AWS Glue dynamic frame into Amazon Redshift. %connections is an AWS Glue magic command. In the AWS Glue console we defined a network connection, gluevpcconnection (the VPC subnet where we want to run the AWS Glue script). We also defined an AWS Glue catalog connection, glueredshiftconn, to Amazon Redshift.

%connections gluevpcconnection

def store_var_into_redshift(var_list):
    result_df = pd.DataFrame(var_list)
    result_df_dyn = DynamicFrame.fromDF(spark.createDataFrame(result_df), glueContext, "var_store")
    # resolveChoice returns a new DynamicFrame, so capture the result before writing.
    result_df_dyn = result_df_dyn.resolveChoice(specs=[("var", "cast:float"), ("conf", "cast:float")])
    glueContext.write_dynamic_frame.from_jdbc_conf(frame=result_df_dyn,
                                                   catalog_connection="glueredshiftconn",
                                                   connection_options={"dbtable": "var_store", "database": "dev"},
                                                   redshift_tmp_dir=result_bucket_url + "gluetempdir/")

7> Now we group the trades and call the get_var_from_group function from Step 5 to calculate VaR. After the calculations, we store the results by calling store_var_into_redshift (defined in Step 6).

Here we calculate VaR numbers by sector.

from pyspark.sql import functions as F

spnames_df = rdd_spnames.toDF()
trades_df = rdd_trade.toDF()

# Enrich trades with their S&P 500 sector, then collect the trade IDs for each sector.
detailed_trade_df = spnames_df.join(trades_df, spnames_df.Symbol == trades_df.ticker, "inner")
sector_trade_grp = detailed_trade_df['trade_id', 'Sector'].groupby('Sector').agg(F.collect_set('trade_id'))

var_list = get_var_from_group(sector_trade_grp)
store_var_into_redshift(var_list)

Here we calculate VaR numbers by desk.

desk_trade_grp = rdd_trade.toDF()['trade_id', 'desk'].groupby('desk').agg(F.collect_set('trade_id'))

var_list = get_var_from_group(desk_trade_grp)
store_var_into_redshift(var_list)

The following is the sample Amazon Redshift table containing the VaR numbers for each group of trades. As we described in the previous step, we grouped the trades by sector and also by desk. In a real production environment, there could be various levels of grouping and a hierarchy of groupings (desks could be further grouped into business units, etc.); a sketch of one such higher-level grouping follows the table below.

grouping                       var         conf
Consumer Discretionary         -347843     0.1
Energy                         -165757     0.1
Financials                     -342638     0.1
Information Technology         -318135     0.1
Materials                      -173281     0.1
Real Estate                    -258028     0.1
Telecommunication Services     -93403.7    0.1
Utilities                      -177862     0.1
casheq-long                    -419156     0.1
delta-hedge                    -442783     0.1
delta-neutral                  -464063     0.1
liquid-longmat                 -415884     0.1
vega-hedge                     -489351     0.1
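For example, a higher level of the hierarchy could be handled the same way. The sketch below is illustrative only: it assumes a hypothetical desk-to-business-unit mapping and reuses detailed_trade_df, get_var_from_group, and store_var_into_redshift from the steps above.

from pyspark.sql import functions as F

# Hypothetical desk -> business unit mapping, purely for illustration.
desk_to_bu = [("casheq-long", "Cash Equities"), ("delta-neutral", "Equity Derivatives"),
              ("delta-hedge", "Equity Derivatives"), ("vega-hedge", "Equity Derivatives"),
              ("liquid-longmat", "Cash Equities")]
bu_df = spark.createDataFrame(desk_to_bu, ["desk", "business_unit"])

# Join the mapping onto the enriched trade data, collect trade IDs per business unit,
# and reuse the VaR wrapper and Amazon Redshift writer from Steps 5 and 6.
bu_trade_grp = detailed_trade_df.join(bu_df, "desk", "inner")['trade_id', 'business_unit'] \
    .groupby('business_unit').agg(F.collect_set('trade_id'))
var_list = get_var_from_group(bu_trade_grp)
store_var_into_redshift(var_list)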

Amazon Redshift Spectrum to query the raw result data

In many scenarios, the raw result output data isn't frequently used for analytics or reporting. However, to support ad-hoc queries, we must be able to run SQL on the files stored in Amazon S3. Here we give an example where we used Amazon Redshift Spectrum to join tables in Amazon Redshift with result files stored in Amazon S3. We transpose the original result files to store the trade IDs in rows (instead of columns).
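The transpose step itself isn't shown in this post; the following is one possible sketch of it in the same AWS Glue job, assuming pl_df from Step 3 (with the scenario ID in _c0 and one column per trade ID) and a hypothetical transposed/ output prefix:

from pyspark.sql import functions as F

# Melt the wide PnL frame (one column per trade ID) into long format:
# one row per (scenario, trade) pair, matching the pos_pl external table below.
trade_cols = [c for c in pl_df.columns if c != '_c0']
long_pl_df = pl_df.select(
    F.col('_c0').cast('int').alias('sim_id'),
    F.explode(F.array([
        F.struct(F.lit(c).cast('int').alias('pos_id'), F.col(c).cast('decimal(8,2)').alias('pnl'))
        for c in trade_cols
    ])).alias('kv')
).select(
    F.monotonically_increasing_id().alias('idx'),  # synthetic row index
    'sim_id',
    F.col('kv.pos_id').alias('pos_id'),
    F.col('kv.pnl').alias('pnl')
)

# Write the transposed data back to Amazon S3 (with a header row) so that
# Amazon Redshift Spectrum can query it through the external table.
long_pl_df.write.mode('overwrite').option('header', 'true').csv(result_bucket_url + 'transposed/')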

The advantage of this approach is that the Amazon Redshift data node size isn’t increased by saving very large but less frequently used raw result data.

create external table raw_pl_schema.pos_pl(
  idx smallint,
  sim_id smallint,
  pos_id smallint,
  pnl decimal(8,2))
row format delimited
fields terminated by ','
stored as textfile
location '<s3 result files location>'
table properties ('skip.header.line.count' = '1');

select sum(pnl)
from raw_pl_schema.pos_pl
where pos_id in (
  select trade_id from trades
  where desk = 'delta-hedge'
)
group by sim_id;

In the above query, the trades table holds the trade data, and pos_pl is the external table pointing to the Amazon S3 result data.

Conclusion

We presented a simple, scalable solution for post-processing grid output data. The Apache Spark script (in AWS Glue) can automatically scale by adding more worker nodes. Amazon Redshift concurrency scaling can be used to scale to more concurrent queries and users. In many of these scenarios, the grid output results come from simulations (Monte Carlo, etc.) and involve working with array-based data and applying user-defined functions (UDFs), such as VaR or Expected Shortfall, on the grouped data. AWS Glue jobs can be triggered by various mechanisms (time based, AWS Lambda, etc.) and can run any combination of filtering and grouping of the data. Furthermore, running post-processing in AWS reduces the data sets to a size that can be used by BI systems such as QuickSight (instead of downloading TBs of raw data from Amazon S3).
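As a simple illustration of triggering, an AWS Lambda function could start the job with the AWS SDK; the job name below is a hypothetical placeholder, not a value from this post:

import boto3

glue = boto3.client('glue')

def lambda_handler(event, context):
    # grid-postprocessing-job is a hypothetical AWS Glue job name, not one defined in this post.
    response = glue.start_job_run(JobName='grid-postprocessing-job')
    return {'JobRunId': response['JobRunId']}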

We welcome your feedback on this post and on any further analysis that you’d like to see implemented.

This post is for educational purposes only and isn’t meant to provide investment advice.

Sumon Samanta

Sumon Samanta is a Senior Specialist Architect for Global Financial Services at AWS. Previously, he worked as a Quantitative Developer at several investment banks to develop pricing and risk systems.