AWS Architecture Blog
Field Notes: Building Automated Pipeline Corrosion Monitoring with AWS IoT Core
This post was authored by Venkatesh Muthusami, Principal Consultant, Infosys Technologies, Sudharsan Chinnappan, Analyst, System Development, Infosys Technologies, Kenneth Francis Dias, Technology Architect, Infosys Technologies, and Ashutosh Pateriya, Partner Solutions Architect, AWS
Pipelines are crucial to the oil and gas industry across the upstream, midstream, and downstream sectors. The pipeline network forms the backbone of transportation for these industries, and corrosion in these pipelines poses a significant challenge. Pipeline inspections help companies identify potential failures and proactively remediate issues to minimize disruption to business operations. These monitoring activities are labor-intensive because they are usually carried out manually onsite, and proactive maintenance is a significant operational expense.
In this post, you learn how to build an end-to-end pipeline monitoring solution with AWS IoT Core, where ultrasonic sensor data flows into AWS IoT Core and an AWS Lambda function processes the data. The processed data is then stored in Amazon S3 with a corrosion measurement point, corrosion value, and timestamp.
Using AWS IoT Core and analytics services, customers can reduce the operational expense for pipeline monitoring and save time. The following technology use case specifically targets in-plant pipelines. However, it can be applied to a broader range of use cases involving infrastructure where IoT sensors measure specific parameters.
Architecture Overview
The solution has been conceptualized for in-plant pipelines and uses ultrasonic sensors placed at regular intervals on the pipeline’s external surface. The ultrasonic sensors capture pipeline thickness and transmit data in real time via IoT Gateway (we used Kepware IoT gateway for this project, but customers can use any IoT gateway) into AWS IoT for storage and analytics.
This solution provides an automated way to monitor pipeline thickness periodically using the data provided by ultrasonic sensors, and stores this data in Amazon S3 for durable storage. It also provides a visual dashboard using Amazon QuickSight. This solution continuously checks the predefined pipeline thickness limits and triggers email and mobile notifications to the operations team. AWS Lambda and Amazon Simple Notification Service (Amazon SNS) provide these notifications as soon as there is a limit breach.
The components of this solution are:
- Ultrasonic sensors – These sensors are mounted across the pipeline at periodic intervals to capture pipeline thickness. Also, these sensors publish data to the IoT gateway.
- IoT Gateway – This is used to ingest data from the individual ultrasonic sensors and in turn publish sensor data under a topic via MQTT protocol to be consumed by AWS IoT Core.
- AWS IoT Core – AWS IoT Core subscribes to the IoT topics published by the IoT gateway and ingests data into the AWS Cloud for analysis and storage.
- AWS IoT rule – Rules give your devices the ability to interact with AWS services. Rules are analyzed and actions are performed based on the MQTT topic stream. Here, an Amazon SNS action is used to trigger an email notification when the pipe thickness goes below the configured value.
- AWS Lambda – Formats the sensor data and loads it into Amazon S3.
- Amazon SNS – Sends notifications to the operations team as necessary.
- Amazon S3 – Stores sensor data, mapping information and corresponding metadata like timestamps, sensor region and measurement location.
- Amazon Athena – Queries Amazon S3 using standard SQL to analyze data.
- Amazon QuickSight – Helps visualize and provides insights on the sensor data.
You follow these steps to build an end-to-end data pipeline:
1. Data collection from ultrasonic sensors via IoT gateway
2. Process data using AWS IoT Core
   a. Sending notifications using an AWS IoT rule
   b. Storing sensor data in S3
3. Create an IoT rule to trigger the Lambda function when new data arrives
4. Analyzing the data with Athena
5. Create a dashboard using Amazon QuickSight
1. Data collection from ultrasonic sensors via IoT gateway
Install ultrasonic sensors across the pipe surface at periodic intervals. Data (sensor ID, pipeline thickness, and timestamp) from these ultrasonic sensors is sent to the IoT gateway. The IoT gateway publishes that payload data to the configured endpoint of AWS IoT Core.
In the preceding diagram, the IoT gateway is configured to receive periodic data from the ultrasonic sensors and batch it before publishing to AWS IoT Core.
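To make the batched payload concrete, here is a minimal sketch of how a batch of readings might be shaped before publishing. The field names mirror what the Lambda function later in this post expects under the `values` key; the `build_batch` helper and sensor IDs are hypothetical, and a real Kepware gateway would assemble this payload through its own configuration rather than code like this:

```python
import json

def build_batch(readings):
    # Batch individual readings into the payload shape consumed downstream.
    # Each reading is a tuple: (sensor_id, thickness_value, epoch_timestamp_ms).
    return {
        "values": [
            {"sensor_id": sid, "value": value, "timestamp": ts}
            for (sid, value, ts) in readings
        ]
    }

payload = build_batch([
    ("sensor-01", 8.42, 1617183000000),
    ("sensor-02", 7.95, 1617183000000),
])
print(json.dumps(payload))
```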
2. Process data using AWS IoT Core
Configure the IoT gateway to publish sensor data under a topic via the MQTT protocol. The data is sent to the AWS IoT Core endpoint, which is configured with an SSL certificate to keep data encrypted in transit. Subscribe to the IoT topic in AWS IoT Core and process the received payload through an IoT rule.
Note: AWS IoT Core provides a feature to generate an SSL certificate, which you can configure with Kepware to connect to the AWS IoT Core endpoint.
a. Sending notification using AWS IoT rule
In this section, you create an AWS IoT rule that sends MQTT message data to an Amazon SNS topic so that it can be sent as an email or AWS SMS text message.
You need to create a rule that sends message data from the ultrasonic sensors to all subscribers of an Amazon SNS topic, whenever the pipe thickness goes below the value set in the rule. The rule detects when the reported thickness goes below the value set by the rule and creates a new message payload that includes only the device ID, the reported thickness and the timestamp.
The following rule sends the new message payload as a JSON document to an SNS topic, which notifies all subscribers to the SNS topic.
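A minimal sketch of such a rule statement in AWS IoT SQL follows; the topic name 'pipeline/corrosion' and the threshold value 5 are placeholders, not values from the original setup:

```sql
SELECT sensor_id AS device_id,
       value AS thickness,
       timestamp AS report_time
FROM 'pipeline/corrosion'
WHERE value < 5
```

The Amazon SNS action that delivers the resulting payload to subscribers is attached to this rule separately in the AWS IoT console.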
Refer to Configuring Amazon SNS to learn more.
The following image shows a sample email received from Amazon SNS with the payload for the operations team, which has details about the sensor ID, report time, and the actual thickness (below the configured threshold).
b. Storing sensor data to S3
Store and format the sensor data (corrosion measurement point, wall thickness value, and timestamp) in Amazon S3 using an AWS Lambda function. The data stored in the bucket is encrypted using server-side encryption with Amazon S3 managed keys (SSE-S3), so data is encrypted as soon as it lands in the S3 bucket.
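As a sketch, default bucket encryption with SSE-S3 corresponds to a server-side encryption configuration like the following (this is the JSON shape accepted by the S3 PutBucketEncryption API, where AES256 denotes SSE-S3):

```json
{
  "Rules": [
    {
      "ApplyServerSideEncryptionByDefault": {
        "SSEAlgorithm": "AES256"
      }
    }
  ]
}
```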
Follow the steps below:
- Create an S3 bucket named 'corrosioniot' (make sure to use a unique name for your S3 bucket). Refer to 'How to create an S3 bucket'.
- Create the following Lambda function. This Lambda function formats the data before sending it to S3 for persistent storage and analysis. Refer to the Lambda Function creation guide.
import json
import boto3
from datetime import datetime

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    payload = event['values']
    bucket_name = 'corrosioniot'
    fmt = "%Y-%m-%d %H:%M:%S"
    for sensor_data in payload:
        # Format the epoch timestamp (in milliseconds) to a user-readable format
        timestamp = datetime.fromtimestamp(float(sensor_data['timestamp']) / 1000.0).strftime(fmt)
        # Create the JSON key-value pairs for the payload (sensor id, timestamp, value)
        item_json = {
            "sensor_id": sensor_data['sensor_id'],
            "timestamp": timestamp,
            "value": str(round(sensor_data['value'], 2))
        }
        # Name the object by concatenating the sensor id and timestamp
        file_name = sensor_data['sensor_id'] + "_" + timestamp + ".txt"
        # Write the object with the payload into the S3 bucket
        s3.Object(bucket_name, file_name).put(Body=json.dumps(item_json))
3. Create an IoT rule to trigger the Lambda function when new data arrives.
In the following screenshot, we are using a Lambda function to push data to S3 in JSON format so that it can easily be queried using Athena.
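Because the Lambda function processes the whole batched message, the rule statement can simply forward every message on the sensor topic. The topic name below is a placeholder, and the Lambda function is attached to the rule as its action:

```sql
SELECT * FROM 'pipeline/corrosion'
```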
The following screenshot shows the data being pushed to the S3 bucket.
4. Analyzing the Data with Athena
In the AWS Management Console, navigate to Services -> Athena. Create Athena database “corrosion” using the following command:
create database corrosion;
Use this database to create all Athena tables.
Create an Amazon Athena table using the following command:
CREATE EXTERNAL TABLE corrosion(
  sensor_id string,
  value string,
  `timestamp` string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://corrosioniot/';
The following screenshot shows a table has been created successfully.
Query the sensor data using Athena
Use the following SQL query to fetch the sensor data:
SELECT
  sensor_id AS Sensor_ID,
  value AS Value,
  "timestamp" AS Timestamp
FROM "corrosion"."corrosion"
ORDER BY Timestamp ASC
Using this SQL query, the sensor data is pulled from S3 and can be used for analysis by any business intelligence tool, such as Amazon QuickSight.
5. Create a dashboard using Amazon QuickSight
- Go to Amazon console and select QuickSight.
- Sign in to the QuickSight console using your username and password. If you do not have a QuickSight login, review the Signing Up for an Amazon QuickSight Subscription guide.
- Once logged in, select the QuickSight icon on the top left, select Datasets, and then choose New dataset.
- Select Athena, input data source name and select Validate connection. Choose Create data source.
- Select the database and table (same as we used in Athena) and select Use custom SQL.
- Enter the same query we used in the Athena console and select Confirm query. Select Save and visualize.
SELECT
  sensor_id AS Sensor_ID,
  value AS Value,
  "timestamp" AS Timestamp
FROM "corrosion"."corrosion"
ORDER BY Timestamp ASC
- Choosing Save and visualize takes you to the dashboard.
- Select the vertical bar chart from the available chart types (feel free to try out various chart types) and select the fields to put in the chart.
QuickSight retrieves the sensor data from Amazon S3 using Athena based on the given date range. QuickSight is configured with Athena as the data source, and the graph is plotted with timelines on the x-axis and thickness values on the y-axis. The graph provides a consolidated view of all pipeline thickness values across all sensors over a period of time.
Conclusion
As you have learned in this post, you can build an IoT-based automated pipeline inspection system to store your data in a data lake. The data coming from various ultrasonic sensors is stored in S3, where you can perform analytics on the data using Athena. Visual dashboards are generated using QuickSight. This solution helps reduce the need for manual inspection work and drives maintenance scheduling efficiencies. Instances of pipeline failure are reduced because instant notifications are sent when pipeline thickness goes below the configured threshold. Operations teams are able to respond quickly and prevent pipeline failure. Additionally, payload data from the sensors can be analyzed for business planning and predictive maintenance.
While we discussed the oil and gas industry vertical as an example, this solution can be applied to any industry that uses IoT sensors to collect data. A number of customers in different industry verticals (smart devices, technology, industrial, automotive, oil and gas, security, and so on) use AWS IoT Core in a number of ways. Visit our IoT use cases page for more information about how customers have achieved success with IoT on AWS.