AWS Big Data Blog

How to build a front-line concussion monitoring system using AWS IoT and serverless data lakes – Part 1

March 2024: This post was reviewed and updated for accuracy.

According to a 2006 publication in the Journal of Head Trauma Rehabilitation, recreational sports cause 1.6–3.8 million mild traumatic brain injury (mTBI) incidents each year (see “The epidemiology and impact of traumatic brain injury: a brief overview” in Additional resources). The estimated medical and indirect costs of mild traumatic brain injury reach $60 billion annually.

These recreational athletes often don’t have access to medical staff trained in concussion recognition and sideline injury assessment. A user-friendly measurement device paired with a smartphone-based assessment tool would streamline the path from identifying a potential head injury through assessment to return-to-play (RTP) criteria. For the purposes of this demonstration, we chose a class 1 Bluetooth device as the hardware communication method because of its simplicity, its widely accepted standard, and its compatibility with existing smartphones and IoT devices.

This use case can be extended to many others across myriad verticals. In this two-part series, we show you how to build a data pipeline in support of a data lake, using key AWS services such as Amazon Kinesis Data Streams, Amazon Managed Service for Apache Flink, Amazon Kinesis Data Firehose, and AWS Lambda. In part 2, we focus on generating simple inferences from that data that can support RTP parameters.

Architectural overview

Here is the AWS architecture that we cover in this two-part series:

Note: For the purposes of our demonstration, we chose heart rate monitoring sensors rather than helmet sensors because they are significantly easier to acquire. Both types of sensors transmit data in a very similar way and integrate into a data lake solution in much the same manner.

The resulting demonstration transfers the heartbeat data using the following components:

  • An AWS Greengrass core set up on a Raspberry Pi 4 streams heart rate data into the cloud.
  • Data is ingested via Amazon Kinesis Data Streams, and raw data is stored in an Amazon S3 bucket using Kinesis Data Firehose. Find more details about writing to Kinesis Data Firehose using Kinesis Data Streams.
  • Amazon Managed Service for Apache Flink averages the heartbeat-per-minute data during stream ingestion and passes the average to an AWS Lambda function through a Kinesis data stream.
  • AWS Lambda enriches the heartbeat data by comparing the real-time data with baseline information stored in Amazon DynamoDB.
  • AWS Lambda sends SMS/email alerts via an Amazon SNS topic if the heartbeat rate is greater than 120 BPM, for example.
  • AWS Glue runs an extract, transform, and load (ETL) job. This job transforms the data stored in JSON format into a compressed Apache Parquet columnar format and partitions it for faster query processing. AWS Glue is a fully managed ETL service that crawls data stored in an Amazon S3 bucket and builds a metadata catalog.
  • Amazon Athena is used for ad hoc query analysis on the data that is processed by AWS Glue. This data is also available for machine learning processing using predictive analysis to reduce heart disease risk.
  • Amazon QuickSight is a fully managed visualization tool. It uses Amazon Athena as a data source and depicts visual line and pie charts to show the heart rate data in a visual dashboard.

All data pipelines are serverless and refreshed periodically to provide up-to-date data.

You can use Kinesis Data Firehose to transform the data in the pipeline to a compressed Parquet format without needing to use AWS Glue. For the purposes of this post, we are using AWS Glue to highlight its capabilities, including a centralized AWS Glue Data Catalog. This Data Catalog can be used by Athena for ad hoc queries and by Apache Spark on Amazon EMR and Amazon SageMaker to run complex machine learning processes. AWS Glue also lets you edit generated ETL scripts and supports “bring your own ETL” to process data for more complex use cases. With Amazon Q, you can also generate data integration jobs using natural language.

Configuring key processes to support the pipeline

The following sections describe how to set up and configure the devices and services used in the demonstration to build a data pipeline in support of a data lake.

Remote sensors and IoT devices

You can use commercially available heart rate monitors to collect electrocardiography (ECG) information such as heart rate. The monitor is strapped around the chest with the sensor placed over the sternum for better accuracy. The monitor measures the heart rate and sends the data over Bluetooth Low Energy (BLE) to a Raspberry Pi 4. The following figure depicts the device-side architecture for our demonstration.

The Raspberry Pi 4 is host to both the IoT device and the AWS Greengrass core. The IoT device is responsible for connecting to the heart rate monitor over BLE and collecting the heart rate data. The collected data is then sent locally to the AWS Greengrass core, where it can be processed and routed to the cloud through a secure connection. The AWS Greengrass core serves as the “edge” gateway for the heart rate monitor.

Set up AWS Greengrass core software on Raspberry Pi 4

To prepare your Raspberry Pi for running AWS Greengrass software, follow the instructions in Environment Setup for Greengrass in the AWS Greengrass Developer Guide.

After setting up your Raspberry Pi, you are ready to install AWS Greengrass and create your first Greengrass group. Create a Greengrass group by following the steps in Configure AWS Greengrass on AWS IoT. Then install the appropriate certificates to the Raspberry Pi by following the steps to start AWS Greengrass on a core device.

The preceding steps deploy a Greengrass group that consists of three discrete configurable items: a device, a subscription list, and the connectivity information.

The device is a set of code that is responsible for collecting the heart rate information from the sensor and sending it to the AWS Greengrass core. This device uses the AWS IoT Device SDK for Python, including the Greengrass Discovery API.

Use the following AWS CLI commands to create a thing group and add your Raspberry Pi core device to it:

aws iot create-thing-group --thing-group-name heartRateGroup
aws iot add-thing-to-thing-group --thing-name raspberry-pi --thing-group-name heartRateGroup

To complete the setup, follow steps 1 to 3 in Interact with local IoT devices over MQTT.

  • In the first step, you update the core device policy so that the core device can communicate with the client device, in this case the Heartrate_Sensor.
  • In the second step, you enable the Greengrass components that allow a client device to use cloud discovery to connect to the core device.
  • In the third step, you communicate with the core device over the MQTT topic iot/heartrate.

After you complete the setup, the heart rate data is routed from the device to the AWS IoT Core service using AWS Greengrass. This means that when your device publishes to the iot/heartrate topic, AWS Greengrass also sends this message to the AWS IoT Core service on the same topic. Then you can use the breadth of AWS services to process the data.

The connectivity information is configured to use the local host because the IoT device resides on the Raspberry Pi 4 along with the AWS Greengrass core software. The IoT device uses the AWS IoT Device SDK, which is responsible for retrieving the connectivity information of the AWS Greengrass core that the IoT device is associated with.

The IoT device then uses the endpoint and port information to open a secure TLS connection to AWS Greengrass core, where the heart rate data is sent.
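To make this concrete, the following is a minimal sketch of how the client device could publish readings to the core over MQTT using the AWS IoT Device SDK for Python (v1). The endpoint address, certificate paths, client ID, and the BLE helper function are placeholders, and the Greengrass discovery call that normally retrieves the core’s connectivity information is omitted for brevity.

import json
import time

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient

# Placeholder values; a real device would obtain the core endpoint and group CA
# through the Greengrass Discovery API rather than hard-coding them.
CORE_ENDPOINT = "192.168.1.10"   # local address of the Greengrass core
CORE_PORT = 8883
TOPIC = "iot/heartrate"


def read_heart_rate_over_ble():
    # Placeholder: a real device would read this value from the BLE heart rate monitor
    return 72


client = AWSIoTMQTTClient("Heartrate_Sensor")
client.configureEndpoint(CORE_ENDPOINT, CORE_PORT)
client.configureCredentials("group-ca.pem", "device.private.key", "device.cert.pem")
client.connect()

while True:
    reading = {"deviceID": 1, "heartRate": read_heart_rate_over_ble()}
    client.publish(TOPIC, json.dumps(reading), 0)  # QoS 0
    time.sleep(1)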

The power of the AWS Greengrass core is that you can deploy AWS Lambda functions and new subscriptions to process the heart rate information locally on the Raspberry Pi 4. For example, you can deploy an AWS Lambda function that triggers a reaction if the detected heart rate reaches a set threshold. In this scenario, different individuals might require different thresholds and responses, so you could deploy unique Lambda functions on a per-individual basis if needed.
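As an illustration only (not part of the deployed solution), the following is a minimal sketch of such a local Lambda function using the Greengrass Core SDK; the alert topic and the 120 BPM threshold are assumptions you would tailor per athlete.

import json

import greengrasssdk

# Publish back through the local Greengrass message broker
iot_client = greengrasssdk.client('iot-data')

HEART_RATE_THRESHOLD = 120  # hypothetical per-athlete threshold, in BPM


def function_handler(event, context):
    # The event is the JSON heart rate reading published on iot/heartrate
    heart_rate = event.get('heartRate', 0)
    if heart_rate >= HEART_RATE_THRESHOLD:
        alert = {'deviceID': event.get('deviceID'), 'heartRate': heart_rate}
        # Hypothetical local alert topic; a Greengrass subscription would route it onward
        iot_client.publish(topic='iot/heartrate/alert', payload=json.dumps(alert))
    return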

Configure AWS Greengrass and AWS IoT Core

To enable further processing and storage of the heart rate messages published from the AWS Greengrass core to AWS IoT Core, create an AWS IoT rule. The rule retrieves messages published to the iot/heartrate topic and sends them to the Kinesis data stream through an AWS IoT rule action for Kinesis Data Streams.
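As a sketch (the rule name, stream name, and role ARN are placeholders), you could create such a rule with boto3 as follows; the equivalent configuration can also be done in the AWS IoT console.

import boto3

iot = boto3.client('iot')

# Placeholder names and ARN; the IAM role must allow kinesis:PutRecord on the stream
iot.create_topic_rule(
    ruleName='heartRateToKinesis',
    topicRulePayload={
        'sql': "SELECT * FROM 'iot/heartrate'",
        'awsIotSqlVersion': '2016-03-23',
        'ruleDisabled': False,
        'actions': [{
            'kinesis': {
                'roleArn': 'arn:aws:iam::123456789012:role/iot-to-kinesis-role',
                'streamName': 'heartrate_stream',
                'partitionKey': '${deviceID}'
            }
        }]
    }
)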

Simulate heart rate data

You might not have access to an IoT device, but you still want to run a proof of concept (PoC) around heart rate use cases. You can simulate data by creating a shell script and deploying that data simulation script on an Amazon EC2 instance. Refer to the EC2 user guide to get started with Amazon EC2 Linux instances.

On the Amazon EC2 instance, create a Python script named kinesis_client_HeartRate.py and copy the following code into it to start writing records into the Kinesis data stream. Be sure to create your Kinesis data stream first and replace the <your_stream_name> variable in the script.

import datetime
import json
import random
import time

import boto3

STREAM_NAME = "<your_stream_name>"


def get_data():
    # Generate a random heart rate reading for one of ten simulated devices
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'deviceID': random.randrange(1, 11, 1),
        'heartRate': random.randrange(60, 141, 1)
    }


def generate(stream_name, kinesis_client):
    # Write one record per second to the Kinesis data stream
    while True:
        time.sleep(1)
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey"
        )


if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name='us-east-1'))
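You can then start the simulator on the EC2 instance with a command such as python3 kinesis_client_HeartRate.py; it writes one randomized reading per second until you stop it.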

You can also use the Kinesis Data Generator to create data and then stream it to your solution or demonstration. For details on its use, see the blog post Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator.

Ingest data using Kinesis and manage alerts with Lambda, DynamoDB, and Amazon SNS

Now you need to ingest data from the IoT device, which can be processed for real-time notifications when abnormal heart rates are detected.

Streaming data from the heart rate monitoring device is ingested to Kinesis Data Streams. Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data. For this project, the data stream was configured with one open shard and a data retention period of 24 hours. This lets you send 1 MB of data or 1,000 events per second and read 2 MB of data per second. If you need to support more devices, you can scale up and add more shards using the UpdateShardCount API or the Amazon Kinesis scaling utility.
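For example, once the stream exists (it is created with the CLI command that follows), a minimal boto3 sketch for doubling the shard count could look like this:

import boto3

kinesis = boto3.client('kinesis')

# Uniform scaling doubles capacity by splitting every shard evenly
kinesis.update_shard_count(
    StreamName='heartrate_stream',
    TargetShardCount=2,
    ScalingType='UNIFORM_SCALING'
)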

You can create your data stream by using the following AWS CLI command, and then enable server-side encryption on the stream, as shown in the sketch after the command.

aws kinesis create-stream --stream-name heartrate_stream --shard-count 1
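For example, to turn on server-side encryption using the AWS managed KMS key for Kinesis, a boto3 sketch could look like this (the stream name matches the command above):

import boto3

kinesis = boto3.client('kinesis')

# Enable server-side encryption with the AWS managed key for Kinesis
kinesis.start_stream_encryption(
    StreamName='heartrate_stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)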

You can use an AWS CloudFormation template to create the entire stack depicted in the following architecture diagram.

Create an S3 bucket and upload the zipped application file for Amazon Managed Service for Apache Flink.

When launching the AWS CloudFormation template, make sure to enter your email address and the name of the S3 bucket that you created above.

Once the CloudFormation stack is completely deployed, follow the instructions below to simulate heart rate data.

  • Connect to the deployed EC2 instance and start the heartrate_producer.py script located at /tmp/heartrate_producer.py.
    You can then view the data in the Kinesis data stream.
  • Next, start the Amazon Managed Service for Apache Flink application.
  • You will also receive an email asking you to confirm the alert subscription from Amazon SNS. Once you confirm the subscription, you start receiving alerts.

Alternatively, you can follow the manual steps in the documentation links that are provided in this post.

Streaming data in Kinesis can be processed and analyzed in real time by Kinesis clients. Refer to the Kinesis Data Streams Developer Guide to learn how to create a Kinesis data stream.

To identify anomalies in heart rate information, you must use real-time analytics to detect abnormal behavior. You can use Amazon Managed Service for Apache Flink to perform analytics on streaming data in real time. Refer to the AWS documentation for detailed steps to configure an application using Python.

Amazon Managed Service for Apache Flink uses Kinesis Data Streams as the source stream for the data. During source configuration, if filtering or masking records is required, you can preprocess records using AWS Lambda. The data in this particular case is relatively simple, so no preprocessing is needed.

The following Python code processes the data in Amazon Managed Service for Apache Flink:

# -*- coding: utf-8 -*-

"""
heartrate_monitor.py
"""

from pyflink.table import EnvironmentSettings, StreamTableEnvironment, DataTypes
from pyflink.table.window import Tumble
from pyflink.table.udf import udf
import os
import json

# 1. Creates a Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Default path on Amazon Managed Service for Apache Flink
APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"


def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))


def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]


def create_table(table_name, stream_name, region, stream_initpos=None):
    init_pos = "\n'scan.stream.initpos' = '{0}',".format(stream_initpos) if stream_initpos is not None else ''
    return """ CREATE TABLE {0} (
                 event_time TIMESTAMP(3),
                 deviceID INTEGER,
                 heartRate INTEGER,
                 WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
               )
               PARTITIONED BY (deviceID)
               WITH (
                 'connector' = 'kinesis',
                 'stream' = '{1}',
                 'aws.region' = '{2}',{3}
                 'format' = 'json',
                 'json.timestamp-format.standard' = 'ISO-8601'
               ) """.format(table_name, stream_name, region, init_pos)


def create_output_table(table_name, stream_name, region):
    return """ CREATE TABLE {0} (
                 deviceID INTEGER,
                 heartRate INTEGER,
                 event_time VARCHAR(64)
               )
               PARTITIONED BY (deviceID)
               WITH (
                 'connector' = 'kinesis',
                 'stream' = '{1}',
                 'aws.region' = '{2}',
                 'format' = 'json',
                 'json.timestamp-format.standard' = 'ISO-8601'
               ) """.format(table_name, stream_name, region)


def perform_tumbling_window_aggregation(input_table_name):
    # use SQL Table in the Table API
    input_table = table_env.from_path(input_table_name)

    # Average the heart rate per device over 10-second tumbling windows
    tumbling_window_table = (
        input_table.window(
            Tumble.over("10.seconds").on("event_time").alias("ten_second_window")
        )
        .group_by("deviceID, ten_second_window")
        .select("deviceID, heartRate.avg as heartRate, to_string(ten_second_window.end) as event_time")
    )

    return tumbling_window_table


@udf(input_types=[DataTypes.TIMESTAMP(3)], result_type=DataTypes.STRING())
def to_string(i):
    return str(i)


table_env.create_temporary_system_function("to_string", to_string)


def main():
    # Application Property Keys
    input_property_group_key = "consumer.config.0"
    producer_property_group_key = "producer.config.0"

    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "scan.stream.initpos"

    output_stream_key = "output.stream.name"
    output_region_key = "aws.region"

    # tables
    input_table_name = "heartrateStream"
    output_table_name = "highheartRate"

    # get application properties
    props = get_application_properties()

    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, producer_property_group_key)

    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]

    output_stream = output_property_map[output_stream_key]
    output_region = output_property_map[output_region_key]

    # 2. Creates a source table from a Kinesis Data Stream
    table_env.execute_sql(create_table(input_table_name, input_stream, input_region, stream_initpos))

    # 3. Creates a sink table writing to a Kinesis Data Stream
    table_env.execute_sql(create_output_table(output_table_name, output_stream, output_region))

    # 4. Queries the source table and creates a tumbling window over 10 seconds to calculate the
    #    average heart rate over the window.
    tumbling_window_table = perform_tumbling_window_aggregation(input_table_name)
    table_env.create_temporary_view("tumbling_window_table", tumbling_window_table)

    # 5. Only anomalous tumbling-window averages are inserted into the sink table
    table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1} WHERE heartRate > 120 or heartRate < 40"
                                         .format(output_table_name, "tumbling_window_table"))

    # get job status through TableResult
    print(table_result.get_job_client().get_job_status())


if __name__ == "__main__":
    main()

The preceding Python application first creates an input table from the source Kinesis data stream and a sink table that writes to another Kinesis data stream. It also creates a temporary view that applies the tumbling window before passing the data to the sink. The application inserts values into the output stream only when the average heart rate received from the source in a 10-second window is greater than 120 or less than 40. For more information about the tumbling window concept, see Creating a Tumbling Window in Python.

Next, add an AWS Lambda function as one of your destinations, and configure it as follows:

In the function overview editor, make sure that the Kinesis data stream capturing high heart rates is added as a trigger. You only want to trigger the Lambda function when anomalies in the heart rate are detected.
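If you prefer to configure the trigger programmatically, a minimal boto3 sketch could look like the following; the function name and output stream ARN are placeholders.

import boto3

lambda_client = boto3.client('lambda')

# Attach the anomaly stream as an event source for the alerting function
lambda_client.create_event_source_mapping(
    FunctionName='heartRateAlertFunction',  # placeholder function name
    EventSourceArn='arn:aws:kinesis:us-east-1:123456789012:stream/highheartRate_stream',
    StartingPosition='LATEST',
    BatchSize=100
)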

Athlete and athletic trainer registration information is stored in the heart rate registrations DynamoDB table. Amazon DynamoDB offers fully managed encryption at rest using an AWS Key Management Service (AWS KMS) managed encryption key for DynamoDB. You need to create a table with encryption at rest enabled. Follow the detailed steps in Amazon DynamoDB Encryption at Rest.

Each record in the table should include deviceid, customerid, firstname, lastname, and mobile. The following is an example table record for reference.
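As a sketch, assuming a table named heartrate_registrations with deviceid as the partition key (both assumptions, since the original table layout is only described in prose), a record could be created as follows; the email attribute is included here because the alerting function later looks up an email address.

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('heartrate_registrations')  # assumed table name

# Example registration record; all values are illustrative
table.put_item(Item={
    'deviceid': 1,
    'customerid': 'CUST-0001',
    'firstname': 'Jane',
    'lastname': 'Doe',
    'mobile': '+15555550100',
    'email': 'trainer@example.com'  # assumed attribute used for email alerts
})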

Refer to the DynamoDB Developer Guide for complete instructions for creating and populating a DynamoDB table.

The Lambda function is created to process the records passed from the Kinesis data stream. The Python Lambda function retrieves the athlete and athletic trainer information from the DynamoDB registrations table. It then alerts the athletic trainer to the event by sending a text message or email via Amazon Simple Notification Service (Amazon SNS).

Note: The default AWS account spending limit for Amazon SNS SMS (mobile text) messages is $1.00 per month. You can increase this limit through an SNS Limit Increase case as described in AWS Service Limits.

You now create a new Lambda function with a runtime of Python 3.x and choose the Create a custom role option for IAM permissions. If you are new to deploying Lambda functions, see Create a Simple Lambda Function.

You must configure the new Lambda function with a specific IAM role that provides privileges to Amazon CloudWatch Logs, Amazon DynamoDB, and Amazon SNS, as provided in the supplied AWS CloudFormation template.

The provided AWS Lambda function retrieves the HR Monitor Device ID and HR Average from the base64-encoded JSON message that is passed from Kinesis Data Streams. After retrieving the HR Monitor Device ID, the function then queries the DynamoDB Athlete registration table to retrieve the athlete and athletic trainer information.
Finally, the AWS Lambda function sends an email notification (which does not contain any sensitive information) to the athletic trainer’s email address, retrieved from the athlete data, using Amazon SNS.
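The following is a hedged sketch of what such a function could look like; the table name, key schema, and SNS topic ARN are assumptions rather than the exact code from the original solution.

import base64
import json

import boto3

dynamodb = boto3.resource('dynamodb')
sns = boto3.client('sns')

TABLE_NAME = 'heartrate_registrations'                                  # assumed table name
SNS_TOPIC_ARN = 'arn:aws:sns:us-east-1:123456789012:heartrate-alerts'   # placeholder topic ARN


def lambda_handler(event, context):
    table = dynamodb.Table(TABLE_NAME)
    for record in event['Records']:
        # Kinesis delivers each record payload base64-encoded
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        device_id = payload['deviceID']
        avg_heart_rate = payload['heartRate']

        # Look up the registered athlete and trainer for this device
        item = table.get_item(Key={'deviceid': device_id}).get('Item')
        if not item:
            continue

        message = (
            'Abnormal average heart rate of {} BPM detected for {} {} (device {}).'
            .format(avg_heart_rate, item['firstname'], item['lastname'], device_id)
        )
        # The email subscription on the topic delivers the alert to the trainer
        sns.publish(TopicArn=SNS_TOPIC_ARN, Subject='Heart rate alert', Message=message)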

To store the streaming data to an S3 bucket for further analysis and visualization using other tools, you can use Kinesis Data Firehose to connect the pipeline to Amazon S3 storage. To learn more, see Create a Kinesis Data Firehose Delivery Stream.

Kinesis Data Firehose delivers the streaming data to the destination S3 bucket in intervals defined by an S3 buffer size or an S3 buffer interval (whichever threshold is reached first triggers delivery). The data in the Firehose delivery stream can be transformed, and you can back up the source records before applying any transformation. The data can be encrypted, compressed with GZIP, ZIP, or Snappy, or converted to a columnar format such as Apache Parquet or Apache ORC, which improves query performance and reduces the storage footprint. You should also enable error logging for operational and production troubleshooting.
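As a rough sketch (all names and ARNs are placeholders), a delivery stream that reads from the heart rate stream, buffers for 5 MB or 300 seconds, compresses with GZIP, and logs errors could be created with boto3 like this:

import boto3

firehose = boto3.client('firehose')

firehose.create_delivery_stream(
    DeliveryStreamName='heartrate-to-s3',  # placeholder name
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': 'arn:aws:kinesis:us-east-1:123456789012:stream/heartrate_stream',
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-read-kinesis'
    },
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-write-s3',
        'BucketARN': 'arn:aws:s3:::your-heart-rate-data-lake-bucket',
        'BufferingHints': {'SizeInMBs': 5, 'IntervalInSeconds': 300},
        'CompressionFormat': 'GZIP',
        'CloudWatchLoggingOptions': {
            'Enabled': True,
            'LogGroupName': '/aws/kinesisfirehose/heartrate-to-s3',
            'LogStreamName': 'S3Delivery'
        }
    }
)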

Conclusion

In part 1 of this blog series, we demonstrated how to build a data pipeline in support of a data lake. We used key AWS services such as Kinesis Data Streams, Amazon Managed Service for Apache Flink, Kinesis Data Firehose, and Lambda. In part 2, we’ll discuss how to deploy a serverless data lake and use key analytics to create actionable insights from the data lake.

Additional resources

Langlois, J. A., Rutland-Brown, W., & Wald, M., “The epidemiology and impact of traumatic brain injury: a brief overview,” Journal of Head Trauma Rehabilitation, Vol. 21, No. 5, 2006, pp. 375–378.

Echlin, S. E., Tator, C. H., Cusimano, M. D., Cantu, R. C., Taunton, J. E., Upshur, E. G., Hall, C. R., Johnson, A. M., Forwell, L. A., & Skopelja, E. N., “A prospective study of physician-observed concussions during junior ice hockey: implications for incidence rates,” Neurosurgical Focus, Vol. 29, No. 5, 2010, E4.

Daniel, R. W., Rowson, S., & Duma, S. M., “Head Impact Exposure in Youth Football,” Annals of Biomedical Engineering, Vol. 10, 2012, 1007.

Greenwald, R. M., Gwin, J. T., Chu, J. J., & Crisco, J. J., “Head impact severity measures for evaluating mild traumatic brain injury risk exposure,” Neurosurgery, Vol. 62, 2008, pp. 789–79.


Additional Reading

If you found this post useful, be sure to check out Setting Up Just-in-Time Provisioning with AWS IoT Core, and Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.


About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them architectural guidance for building scalable architecture in hybrid and AWS environments.

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. During his free time, he likes to cook and travel.

John Cupit is a partner solutions architect for AWS’ Global Telecom Alliance Team.  His passion is leveraging the cloud to transform the carrier industry.  He has a son and daughter who have both graduated from college. His daughter is gainfully employed, while his son is in his first year of law school at Tulane University.  As such, he has no spare money and no spare time to work a second job.

David Cowden is partner solutions architect and IoT specialist working with AWS emerging partners. He works with customers to provide them architectural guidance for building scalable architecture in IoT space.

Josh Ragsdale is an enterprise solutions architect at AWS.  His focus is on adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

Pierre-Yves Aquilanti, Ph.D., is a senior specialized HPC solutions architect at AWS. He spent several years in the oil & gas industry to optimize R&D applications for large scale HPC systems and enable the potential of machine learning for the upstream. He and his family crave to live in Singapore again for the human, cultural experience and eat fresh durians.

Manuel Puron is an enterprise solutions architect at AWS. He has been working in cloud security and IT service management for over 10 years. He is focused on the telecommunications industry. He enjoys video games and traveling to new destinations to discover new cultures.

Rahul Singh is a Sr. Solutions Architect working with Global systems integrators. His expertise are on AI/ML and Data Engineering. He’s a happy camper in Chicago with his family and enjoys playing baseball with his kids


Audit History

Last reviewed and updated in March 2024 by Rahul Singh | Sr. Solutions Architect