The Internet of Things on AWS – Official Blog
How to store data with AWS IoT SiteWise Edge in many locations
Introduction
In this post, we discuss how AWS IoT SiteWise and AWS IoT SiteWise Edge can be used to store data not only in the AWS IoT SiteWise data store but also in many other locations.
Customers have told us that they want to use AWS IoT SiteWise to collect their industrial data from OPC-UA data sources, but not all of them want to store that data solely in the AWS IoT SiteWise data store. In this blog post, we describe how to store data in other services such as Amazon S3 or Amazon Timestream, or to consume the data in your on-premises environment.
AWS IoT SiteWise is a managed service that lets you collect, model, analyze, and visualize data from industrial equipment at scale. An AWS IoT SiteWise gateway collects data from industrial equipment and stores data in the AWS IoT SiteWise data store in the cloud.
AWS IoT SiteWise Edge brings features of AWS IoT SiteWise in the cloud to the customer’s premises. You can process data in the AWS IoT SiteWise gateway locally, and visualize equipment data using local AWS IoT SiteWise Monitor dashboards served from the AWS IoT SiteWise gateway.
By default, data is stored in the AWS IoT SiteWise data store on AWS.
Specifically, we describe how you can keep the benefits of the AWS IoT SiteWise Edge gateway for data collection while storing the data outside of the AWS IoT SiteWise data store.
Time to read: 8 min
Learning level: 300
Services used: AWS IoT SiteWise Edge, AWS IoT Greengrass, Amazon Kinesis Data Streams, Amazon Timestream
Solution
Deploying AWS IoT SiteWise Edge gateway on AWS IoT Greengrass Version 2
First, we explain how the AWS IoT SiteWise Edge gateway is deployed on AWS IoT Greengrass Version 2.
The AWS IoT SiteWise Edge gateway runs as a set of components on AWS IoT Greengrass Version 2. The Data Collection Pack includes two components, the SiteWiseEdgeCollectorOpcua and the SiteWiseEdgePublisher. The Data Processing Pack includes a single component, the SiteWiseEdgeProcessor.
The Data Collection Pack collects your industrial data and routes it to AWS destinations. The Data Processing Pack enables the gateway to communicate with edge-configured asset models and assets. You can use edge configuration to control which asset data to compute and process locally, and then send your data to AWS IoT SiteWise or other AWS services in the cloud.
The following screenshot shows an AWS IoT Greengrass V2 deployment with the Data Collection Pack and Data Processing Pack deployed.
Figure 1: AWS IoT Greengrass V2 deployment
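You can also create such a deployment programmatically. The following is a minimal boto3 sketch; the target ARN and the component versions are placeholders you would replace with values from your own account and the AWS-provided component catalog.

import boto3

greengrass = boto3.client("greengrassv2")

# Minimal sketch: deploy the Data Collection Pack (collector and publisher)
# and the Data Processing Pack (processor) to a Greengrass core device.
# The target ARN and component versions below are placeholders.
response = greengrass.create_deployment(
    targetArn="arn:aws:iot:eu-west-1:123456789012:thing/MySiteWiseGateway",
    deploymentName="sitewise-edge-gateway-deployment",
    components={
        "aws.iot.SiteWiseEdgeCollectorOpcua": {"componentVersion": "2.1.1"},
        "aws.iot.SiteWiseEdgePublisher": {"componentVersion": "2.1.1"},
        "aws.iot.SiteWiseEdgeProcessor": {"componentVersion": "2.1.1"},
    },
)
print(response["deploymentId"])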
Understanding AWS IoT SiteWise gateway architecture
To send data to locations other than the AWS IoT SiteWise data store, you first need to understand the default architecture of the AWS IoT SiteWise gateway.
By default, data flows from the gateway into the AWS IoT SiteWise data store: the SiteWiseEdgeCollectorOpcua collects data from OPC-UA sources and ingests it into an AWS IoT Greengrass stream on the gateway (the SiteWise_Stream by default), and the SiteWiseEdgePublisher reads the data from that stream and transfers it to the AWS IoT SiteWise data store on AWS.
Figure 2: AWS IoT SiteWise gateway architecture
Configuring destinations in the AWS IoT SiteWise gateway to store data in many locations
To send data to a destination other than the AWS IoT SiteWise data store, the gateway configuration lets you set the AWS IoT Greengrass stream name to which the SiteWiseEdgeCollectorOpcua writes. You define the stream name for each data source in your AWS IoT SiteWise gateway, using the AWS IoT SiteWise console, the AWS CLI, or an AWS SDK.
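As an illustration, the following boto3 sketch shows how a source's destination stream could be set through the gateway's OPC-UA collector capability configuration. The gateway ID is a placeholder, the capability namespace version may differ for your gateway, and the exact configuration shape is an assumption based on the OPC-UA collector documentation; adapt it to the configuration your gateway returns.

import json
import boto3

sitewise = boto3.client("iotsitewise")

GATEWAY_ID = "your-gateway-id"              # placeholder
NAMESPACE = "iotsitewise:opcuacollector:2"  # version may differ for your gateway

# Read the current OPC-UA collector capability configuration (a JSON string).
current = sitewise.describe_gateway_capability_configuration(
    gatewayId=GATEWAY_ID, capabilityNamespace=NAMESPACE)
config = json.loads(current["capabilityConfiguration"])

# Point the first source at a custom Greengrass stream. The destination
# shape below is an assumption; verify it against your gateway's config.
config["sources"][0]["destination"] = {
    "type": "StreamManager",
    "streamName": "SiteWise_Anywhere_Stream",
    "streamBufferSize": 10,
}

sitewise.update_gateway_capability_configuration(
    gatewayId=GATEWAY_ID,
    capabilityNamespace=NAMESPACE,
    capabilityConfiguration=json.dumps(config),
)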
You can create your own custom stream on AWS IoT Greengrass V2 and point the destination for a data source to that stream. A stream can have an export definition, which defines the AWS destination to which your data will be transferred. Currently, AWS IoT SiteWise, AWS IoT Analytics, Amazon S3, and Amazon Kinesis Data Streams are supported as export configurations. When you export your data to Amazon Kinesis Data Streams, you have many options to read the data from Amazon Kinesis Data Streams and transfer it to another service. With consumers reading data from Amazon Kinesis Data Streams, you can send your data to different locations.
If you want, for example, to store your data in Amazon Timestream, you can use an AWS Lambda function or Amazon Kinesis Data Analytics for Apache Flink as a consumer of the Kinesis data stream and write the data into your Amazon Timestream table.
With such an architecture, you can store your data not only in Amazon Timestream but in any location that is accessible from your Amazon Kinesis Data Streams consumer.
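As a sketch of the Lambda variant of this pattern, the function below consumes records from the Kinesis data stream and writes them to Amazon Timestream. The database and table names are placeholders, and the payload field names are hypothetical; the actual layout depends on the data your gateway ingests into the stream.

import base64
import json
import boto3

timestream = boto3.client("timestream-write")

def handler(event, context):
    records = []
    for record in event["Records"]:
        # Kinesis delivers each payload base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        # Field names below are hypothetical; adapt them to the payload
        # your gateway actually writes to the stream.
        records.append({
            "Dimensions": [
                {"Name": "propertyAlias", "Value": payload["propertyAlias"]},
            ],
            "MeasureName": "value",
            "MeasureValue": str(payload["value"]),
            "MeasureValueType": "DOUBLE",
            "Time": str(payload["timestamp"]),  # milliseconds since epoch
        })
    if records:
        timestream.write_records(
            DatabaseName="sitewise_db",   # placeholder
            TableName="sitewise_table",   # placeholder
            Records=records,
        )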
If you are not using an export configuration for a custom stream, you can develop your own AWS IoT Greengrass component to consume the data from your custom stream.
Figure 3: Architecture to store data in many locations with AWS IoT SiteWise
Understanding AWS IoT SiteWise Edge gateway architecture
The AWS IoT SiteWise Edge gateway architecture differs from the AWS IoT SiteWise gateway architecture in that it includes the SiteWiseEdgeProcessor, which allows you to serve AWS IoT SiteWise Monitor portals at the edge and also process data at the edge.
Figure 4: AWS IoT SiteWise Edge gateway architecture
To send data from AWS IoT SiteWise Edge to many locations, you cannot use the same approach as with AWS IoT SiteWise. A custom stream for a data source defines where the SiteWiseEdgeCollectorOpcua sends the data, but the Data Processing Pack already uses the custom stream name SiteWise_Edge_Stream. If you changed the stream name to your own custom stream, your data would no longer reach the SiteWiseEdgeProcessor.
Configure AWS IoT SiteWise Edge to store data in many locations
There are multiple options to send data from AWS IoT SiteWise Edge to many locations. If you do not want to send data to the AWS IoT SiteWise data store, you must remove the SiteWiseEdgePublisher from your AWS IoT Greengrass deployment, because the SiteWiseEdgePublisher reads data from the SiteWise_Stream and stores it in the AWS IoT SiteWise data store.
One option is to use the API at the edge to retrieve data and store it, for example, in a stream on AWS IoT Greengrass for further processing. This option requires you to query the API for every single asset property, and if your asset properties change, you must also change your application or the application's configuration.
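For illustration, a consumer at the edge could query the gateway with an AWS SDK pointed at the local AWS IoT SiteWise Edge endpoint. The endpoint URL, TLS settings, credentials, and IDs below are all placeholders; refer to the AWS IoT SiteWise Edge documentation for how to authenticate against your gateway.

import boto3

# Placeholder endpoint of the local SiteWise Edge gateway. Region,
# credentials, and certificate verification depend on how your gateway
# is configured; see the AWS IoT SiteWise Edge documentation.
sitewise_edge = boto3.client(
    "iotsitewise",
    endpoint_url="https://localhost:443",
    verify=False,  # assumption: self-signed certificate at the edge
)

# Query a single asset property value at the edge (IDs are placeholders).
value = sitewise_edge.get_asset_property_value(
    assetId="your-asset-id",
    propertyId="your-property-id",
)
print(value["propertyValue"])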
Another option is to develop a component to read data from the SiteWise_Stream. The component transfers the data to another destination such as another stream or a target in your on-premises environment.
Figure 5: Architecture to store data in many locations with AWS IoT SiteWise Edge
In the following example, we explain how you can read data from the SiteWise_Stream and, in one case, ingest the data into a custom stream to be transferred to AWS, and, in another case, publish the data to a local MQTT message broker. The custom stream is created with an export configuration to Amazon Kinesis Data Streams on AWS.
The following code snippets are based on an AWS IoT Greengrass V2 component written in Python. The code uses the AWS Greengrass Stream Manager SDK for Python and the Paho Python Client.
The following variables are used in the custom component:
- STREAM_NAME_SOURCE is the name of the stream to read the data from.
- STREAM_NAME_TARGET is the name of your custom stream to which you want to send the data.
- STREAM_NAME_CLOUD is the name of the Amazon Kinesis data stream on AWS. The stream STREAM_NAME_TARGET is created with an export configuration to STREAM_NAME_CLOUD.
For example:
STREAM_NAME_SOURCE = "SiteWise_Stream"
STREAM_NAME_TARGET = "SiteWise_Anywhere_Stream"
STREAM_NAME_CLOUD = "SiteWiseToKinesisDatastream"
Before starting the component, you must create an Amazon Kinesis data stream named STREAM_NAME_CLOUD on AWS.
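For example, the stream could be created with boto3 (the shard count here is just an example):

import boto3

kinesis = boto3.client("kinesis")
kinesis.create_stream(StreamName="SiteWiseToKinesisDatastream", ShardCount=1)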
Upon start, the component checks if the stream STREAM_NAME_TARGET exists. If the stream does not exist, it is created with an export configuration to Amazon Kinesis Data Streams on AWS.
import logging

# Imports from the AWS Greengrass Stream Manager SDK for Python
from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    Persistence,
    ReadMessagesOptions,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)

logger = logging.getLogger(__name__)
stream_manager_client = StreamManagerClient()

try:
    response = stream_manager_client.describe_message_stream(STREAM_NAME_TARGET)
    logger.info("stream_name: %s details: %s", STREAM_NAME_TARGET, response)
except ResourceNotFoundException as error:
    logger.info("create message stream: %s error: %s", STREAM_NAME_TARGET, error)
    # The stream does not exist yet: create it with an export definition
    # that forwards its data to Amazon Kinesis Data Streams on AWS.
    exports = ExportDefinition(
        kinesis=[KinesisConfig(
            identifier=f"{STREAM_NAME_CLOUD}",
            kinesis_stream_name=STREAM_NAME_CLOUD,
            batch_size=10,
            batch_interval_millis=60000
        )]
    )
    stream_manager_client.create_message_stream(
        MessageStreamDefinition(
            name=STREAM_NAME_TARGET,
            strategy_on_full=StrategyOnFull.OverwriteOldestData,
            persistence=Persistence.File,
            max_size=1048576,
            export_definition=exports
        )
    )
except Exception as error:
    logger.error("%s", error)
The component reads messages from the STREAM_NAME_SOURCE. Once messages are available, it iterates over the entries in a message and starts threads that store the entries in the custom stream and publish them to an MQTT message broker.
from threading import Thread

response = stream_manager_client.read_messages(
    STREAM_NAME_SOURCE,
    ReadMessagesOptions(
        desired_start_sequence_number=LAST_READ_SEQ_NO + 1,
        min_message_count=MIN_MESSAGE_COUNT,
        read_timeout_millis=1000
    )
)
for entry in response:
    logger.info("stream_name: %s payload: %s",
                STREAM_NAME_SOURCE, entry.payload)
    # Remember the last sequence number read, so the next
    # read_messages call continues after it.
    LAST_READ_SEQ_NO = entry.sequence_number
    # Send data to another stream at the edge.
    thread_stream = Thread(
        target=store_message_to_stream,
        args=[entry.payload])
    thread_stream.start()
    logger.info('thread_stream started: %s', thread_stream)
    # Send data to a local MQTT message broker.
    thread_mqtt = Thread(
        target=publish_message_to_mqtt_broker,
        args=[entry.payload])
    thread_mqtt.start()
    logger.info('thread_mqtt started: %s', thread_mqtt)
The following function code writes data to the custom stream STREAM_NAME_TARGET. Data ingested into this custom stream is automatically transferred to Amazon Kinesis Data Streams on AWS.
def store_message_to_stream(payload):
    try:
        sequence_number = stream_manager_client.append_message(
            stream_name=STREAM_NAME_TARGET, data=payload)
        logger.info('appended message to stream: %s sequence_number: %s message: %s',
                    STREAM_NAME_TARGET, sequence_number, payload)
    except Exception as error:
        logger.error("append message to stream: %s: %s",
                     STREAM_NAME_TARGET, error)
The following function code publishes data to the topic sitewise on an MQTT message broker.
import paho.mqtt.client as paho

def publish_message_to_mqtt_broker(payload):
    try:
        logger.info('MQTT: publish message: %s', payload)
        c_mqtt = paho.Client()
        # Register the callbacks (defined elsewhere in the component).
        c_mqtt.on_publish = mqtt_on_publish
        c_mqtt.on_disconnect = mqtt_on_disconnect
        # MQTT_BROKER and MQTT_PORT come from the component configuration.
        c_mqtt.connect(MQTT_BROKER, MQTT_PORT)
        ret = c_mqtt.publish("sitewise", payload)
        logger.info('MQTT: publish: ret: %s', ret)
        c_mqtt.disconnect()
    except Exception as error:
        logger.error("MQTT: publish message: %s", error)
Conclusion
In this blog post, you have learned how you can use an AWS IoT SiteWise gateway to collect data from your industrial equipment and send it to many locations. You have learned how to configure your gateway to send data from AWS IoT SiteWise or AWS IoT SiteWise Edge to a custom destination. Based on sample code, you have seen how you can transfer your data to a custom location on AWS and into your on-premises environment. Learn more at the AWS IoT SiteWise product page or in the AWS IoT SiteWise workshops.
About the author
Philipp Sacha
Philipp is a Specialist Solutions Architect for IoT at Amazon Web Services, supporting customers in the IoT area. He joined AWS in 2015 as a general Solutions Architect and moved into the role of an IoT Specialist in 2018.