AWS Public Sector Blog
Hydrating the Natural History Museum’s Planetary Knowledge Base with Amazon Neptune and Open Data on AWS
The Natural History Museum (NHM) in London is a world-class visitor attraction and a leading science research center. NHM and Amazon Web Services (AWS) have partnered up to transform and accelerate scientific research by bringing together a broad range of biodiversity and environmental data types in one place for the first time. The goal is to enable and accelerate much-needed research in the area of biodiversity that can help society address and combat the loss of global ecosystems.
In our first blog post, we discussed NHM’s overall vision for using open data in combination with large-scale compute, data systems, and machine learning (ML) to create the Planetary Knowledge Base (PKB), a knowledge graph of global biodiversity. In this post, we focus on the underlying services and architecture that comprise the PKB. Readers will learn how NHM deployed a knowledge graph of global biodiversity using Amazon Neptune in search of higher-level insights, such as incorrect classifications and outliers, by training a deep graph neural network (GNN) on top of the knowledge graph. Principles, architectural patterns, and code samples introduced in this how-to can be transferred to other domains where semantic relationships between entities can be used for insights. For a deep dive into this use case, also watch the re:Invent 2023 session on Building next-generation sustainability workloads with open data (SUS304).
Solution overview
The solution workflow is shown in Figure 1 and operates as follows:
- Specimen data is ingested from the Global Biodiversity Information Facility (GBIF) Species Occurrence dataset. This publicly available data (via the Amazon Sustainability Data Initiative, or ASDI) exists as parquet files and is stored in an Amazon Simple Storage Service (Amazon S3) bucket. A PySpark-based AWS Glue job is used to process the multiple large-scale datasets and generate Neptune load files (nodes and edges) from them.
- The processed data is loaded into a Neptune graph database using the Neptune bulk loader through a Neptune notebook.
- Users interface with the graph database by issuing queries (written in Gremlin query language) through a Neptune notebook.
- In order to use Neptune ML, the graph data is exported back to Amazon S3 using the Neptune-Export service.
- A data processing job is run, followed by a GNN training job. Once trained, the model is deployed to an Amazon SageMaker endpoint.
- Users submit Gremlin inference queries to the model endpoint, which prepares the data and returns inference results.
Solution walkthrough
Step 1. Ingest and transform GBIF specimen data from AWS Repository of Open Data and create node and edge load files
The PKB supplements the GBIF Species Occurrences dataset with several NHM-compiled datasets, such as a database of biodiversity research institutions (Institution) and a unified dataset of collectors and their affiliations (Person). These entities (that is, the nodes or vertices) and their relationships (that is, the edges or triples) can be represented as a graph (shown in Figure 2).
To set up this graph and interact with it at scale, we need to generate load data that can be ingested and processed by Neptune. Neptune understands both property graph formats (Apache Tinkerpop Gremlin or openCypher) and various RDF load data formats. For the PKB, we selected Gremlin for its ability to integrate natively with existing programming frameworks like Python or Java, as well as its intuitive graph traversal syntax and widespread adoption as part of the vendor-agnostic Apache TinkerPop framework (see this AWS Open Source Blog post for a gentle introduction to Apache Tinkerpop and Gremlin).
Gremlin load data are .csv
files (with optional compression) that include a comma-separated header row with both system column headers and, optionally, property column headers. For nodes, you need to supply an ~id
(which uniquely identifies the node) and a ~label
column (which indicates that type of node, for example, a specimen). In addition, you can supply property columns that capture supplementary information for each node (for example, the year of discovery of a GBIF specimen). For edges, you also need to specify the node-to-node relationship encoding. For this, you need to specify columns ~from
(the ID of the origin node) and ~to
(the ID of the destination node).
With GBIF containing more than 2.5 billion individual specimens, generating these node and edge load data files requires large-scale distributed processing. A convenient and highly cost-effective way to achieve this without having to manage the underlying compute infrastructure is AWS Glue, a serverless data integration service. AWS Glue lets you specify the extract, transform, and load (ETL) workflow by supplying, for example, a PySpark script. All that’s left to do apart from this is to specify the peak processing capacity of your AWS Glue job by specifying the desired number of workers. For more details on configuring and customizing AWS Glue Spark job properties, see the AWS documentation (Configuring job properties for Spark jobs in AWS Glue).
The following code snippet details the ingestion of the GBIF specimen data from the AWS Registry of Open Data Amazon S3 bucket and the subsequent transformation into a Gremlin load data file that is saved to a destination output S3 bucket for ingestion as specimen nodes into Neptune. Note that individual dataset license agreements are clearly stated on the respective dataset card. For example, GBIF is licensed under a CC-BY-NC license.
Ensure that both input and output S3 locations and the AWS Glue job run in the same AWS region to minimize latency and network cost. Also make sure that Glue has the necessary permissions to read from and write to the specified S3 buckets.
def dump_specimens(data_location=gbif_specimen_bucket,output_location=destination_data_bucket,filter_col="kingdom",filter_val="Plantae",limit=False,save=False):
#read data directly from AWS Open Data (hosted on S3)
specimen_df = spark.read.parquet(data_location)
#optionally filter for filter_val on filter_col
if filter_col:
specimen_df = specimen_df.filter(col(filter_col)==filter_val)
#optionally limit (useful for test loading)
if limit:
specimen_df = specimen_df.orderBy('gbifid').limit(limit)
#specimen_df=specimen_df.limit(limit)
#select subset of columns
cols = ['gbifid','kingdom','family','verbatimscientificname','occurrencestatus','decimallatitude','decimallongitude','day','month','year',
'recordedby','identifiedby','lastinterpreted','taxonkey','institutioncode','countrycode']
specimen_df=specimen_df.select(*cols)
#convert column names to lowercase
specimen_df = specimen_df.toDF(*(c.lower() for c in cols))
cols = specimen_df.columns
#unnest recordeby and identifiedby columns
specimen_df = specimen_df.withColumn("recordedby", col("recordedby").getItem(0))
specimen_df = specimen_df.withColumn("recordedby", specimen_df["recordedby"].array_element) #index "array_element" field
specimen_df = specimen_df.withColumn("identifiedby", col("identifiedby").getItem(0))
specimen_df = specimen_df.withColumn("identifiedby", specimen_df["identifiedby"].array_element) #index "array_element" field
#dataset-specific santization/cleansing steps
#remove commas (,) and double quotes (“) from all columns
col_to_change=['kingdom','family','verbatimscientificname','occurrencestatus','recordedby','identifiedby','institutioncode','countrycode']
for c in col_to_change:
specimen_df = specimen_df.withColumn(c, regexp_replace(col(c), '[,"]', ''))
#set ~id and ~label columns
specimen_df = specimen_df.select(format_string("gbif%s", "gbifid").alias("~id"),lit("specimen").alias("~label"),*cols).distinct()
#remove rows with empty ids
specimen_df = specimen_df.filter(col("~id").isNotNull())
#save results to s3 under spark/nodes/specimens prefix
num_partitions=1 #change this for very large files
specimen_df.coalesce(num_partitions).write.mode("overwrite").csv(f"{output_location}/spark/nodes/specimens",
header = True, encoding="UTF-8")
return specimen_df #return data for later edge generation
#set origin and detsination buckets
gbif_specimen_bucket = 's3://gbif-open-data-us-east-1/occurrence/2023-06-01/occurrence.parquet/*' #AWS open data
output_location = 's3://<pkb-data-lake>' #replace with s3 location!
# execute e-2-e data transformation job for a 10M subset of data
# (for demonstrative purposes)
specimens = dump_specimens(data_location=gbif_specimen_bucket,
output_location=destination_data_bucket,
limit=10000000,save=save)
Creating edges then involves matching nodes on specified keys to create relationships. For example, the following code excerpt produces specimen-to-institution edges (~label = discovering_institution
) and saves them to Amazon S3.
def create_specimen_institution_edges(specimens=specimens,institutions=institutions,output_location=destination_data_bucket):
#select relevant columns in institution dataset, subset only those with associated institution
specimens_wt_inst = specimens.filter(col('institutioncode').isNotNull())
specimens_wt_inst = specimens_wt_inst.select('~id','institutioncode')
specimens_wt_inst = specimens_wt_inst.withColumnRenamed("~id", "~id_specimen")
#select relevant columns in institution dataset
institutions = institutions.select('~id','code')
institutions = institutions.withColumnRenamed("~id", "~id_inst")
#join both datasets on institution code
edges_specimen_inst = specimens_wt_inst.join(institutions,specimens_wt_inst['institutioncode'] == institutions['code'], "left")
edges_specimen_inst = edges_specimen_inst.filter(col('code').isNotNull())
set ~id, ~label, ~from, ~to columns
edges_specimen_inst = edges_specimen_inst.select(format_string("%s-spec-to-%s-inst", "~id_specimen", "~id_inst").alias("~id"),
col("~id_specimen").alias("~from"),
col("~id_inst").alias("~to"),
lit('discovering_institution').alias('~label')).distinct()
#save under edges/specimen-institution
edges_specimen_inst.coalesce(1).write.mode("overwrite").csv(f"{output_location}/spark/edges/specimen-institution",
header = True, encoding="UTF-8") #optionally add gzip compression for faster loading into Neptune
return edges_specimen_inst
You can repeat a similar node and edge generation routine for all other parts of the graph. You will end up with a set of data load files in Amazon S3 that can be directly ingested by Neptune. Note that as you are adapting this code for your own use cases, it is recommended to add your own error wrapping/handling, for example, by including try-except blocks in the above routines and/or by implementing custom exception classes. You can also refer to the Glue job’s CloudWatch logs to assist the debugging process during development.
Step 2. Set up Neptune graph database and load data into Neptune
Now that the GBIF data is formatted and stored in Amazon S3, we want to get the data loaded into a Neptune graph database. As with many new applications and workloads, trying to predict capacity requirements for the PKB comes with uncertainty. As new specimens are identified and recorded by the global scientific community over time, the database will grow. Database query demand will likely vary over multiple time scales (for example, hour by hour, week by week) as the needs of users change and database adoption grows. Ensuring the database is always provisioned for peak capacity under such circumstances is not only an undifferentiated heavy lift that consumes valuable human work-hours but also manual capacity management could allow for cost inefficiencies to creep into the workload.
For those reasons, the PKB uses Neptune serverless. This allows the database and associated workload to scale up and down as needed without having to manually manage and optimize capacity. This reduces the guesswork and complexity associated with adapting to a potentially changing demand environment. Neptune database instances can be modified from provisioned to serverless or from serverless to provisioned without having to create a new database cluster or instance. Serverless database instances can be readers or writers within a Neptune cluster, and you can even mix serverless and non-serverless database instances in the same cluster.
To get started with Neptune, you can use the AWS console, CLI, or SDK. Alternatively, this AWS CloudFormation quick start template will set up a new database cluster, the necessary identity and access management (IAM) roles, and a Neptune graph (Jupyter) notebook for you to start working with Neptune ML.
Once the database is up and running, it’s time to get our Gremlin-formatted data into the database. To accomplish this, we use the Neptune bulk loader through the Neptune graph notebook we created when deploying the quick start CloudFormation template (the bulk loader can also be used through the command line). Launching the bulk loader is simple; use the built-in loader command:
%load
There are several parameters that make up a bulk load request. For a complete reference, see the Neptune Loader command documentation, but we’ll touch on a few parameters here (highlighted in Figure 3).
- Source – The Amazon S3 URI of the data or files to be loaded. This can be one or more files or folders.
- Format –The format of the data files (for example, “csv” for Gremlin CSV format, “opencypher” for openCypher CSV format).
- Region – The AWS Region of the database cluster. Note that the source Amazon S3 bucket must be in the same Region.
- Load ARN – The Amazon Resource Name (ARN) for an IAM role assumed by the Neptune DB instance that grants access to the source Amazon S3 bucket.
Data can be added to the graph database over time. For the PKB prototype, we worked with a subset of the total GBIF dataset containing approximately 10 million species occurrences of plants.
Step 3. Query the graph database
This step is not required in the setup of the workflow, but once you have some data loaded, the database can be queried using the Neptune graph notebook and the Gremlin query language. One useful feature of the Gremlin query language is the ability to visualize graphs in addition to querying the data itself.
To illustrate this, this video demonstrates a few example queries of the prototype PKB knowledge graph, including issuing a sample query to find species occurrences from Fiji (limited to 25 entities for viewing simplicity).
Step 4. Export graph data to Amazon S3 for use with Neptune ML
Training a GNN using Neptune ML requires that training data is provided and formatted such that it can be used by the Deep Graph Library. Getting data from a Neptune graph database formatted and into Amazon S3 for training can be accomplished with a single line of code by running an export job using Neptune workbench magics.
%%neptune_ml export start —export-url {neptune_ml.get_export_service_host()} —export-iam —store-to export_results —wait-timeout 1000000
${export_params}
Here, we specify the target, ML task, and additional features, including feature engineering routines (see the Neptune User Guide for a list of built-in feature-transformation techniques). For example, to specify a node classification training job for the PKB, export parameters may look as follows:
export_params = {
"command": "export-pg",
"params": {
"endpoint": neptune_ml.get_host(),
"profile": "neptune_ml",
"useIamAuth": neptune_ml.get_iam(),
"cloneCluster": False, #set to True if a separate Neptune Cluster should be spun up for exporting
"filter" : { #specify which nodes/edges should be included by the GNN
"nodes":
[
{"label":"specimen", "properties":["countrycode","year","occurrencestatus"]},
{"label":"taxon", "properties":['taxonrank','status']},
{"label":"country","properties":['dependsfrom']},
{"label":"institution","properties":[]}
],
"edges":
[
{"label":"origing_country","properties":[]},
{"label":"determination","properties":[]},
{"label":"located_in","properties":[]},
{"label":"discovering_institution","properties":[]}
],
},
},
"outputS3Path": processed_folder_s3_uri, #the export destination
"additionalParams": {"neptune_ml": {
"version": "v2.0",
"targets": [{"node": "specimen", "property": "countrycode", "type": "classification"}],
"features":[{"node":"specimen",
"property":"year",
"type":"bucket_numerical",
"range":[1600, 2100],
"num_buckets": 12},
]}},
"jobSize": "medium",
}
Step 5. Train and deploy a GNN with Neptune ML
At a high level, a GNN produces a lower-dimensional, simplified representation of the underlying highly complex graph structure. These so-called node embeddings preserve locality and relationship information. They also capture properties of the nodes and edges, essentially distilling the information contained in the graph to a lower-dimensional space. With the embeddings generated, a downstream classification or regression model can be learned to make predictions. Both GNN-generated embeddings and the output layer (for regression or classification, depending on the use case) are trained jointly and in an end-to-end fashion. For additional details, refer to this tutorial on Learning graph neural networks with Deep Graph Library.
In the context of the PKB, a GNN can be used for node classification (for example, predicting properties of nodes, such as the discovering institutions) or link prediction (for example, suggesting the 10 most likely countries in which a specimen may be found).
The GNN training routine in Neptune ML comprises three key steps and can be accomplished from start to finish in a Neptune graph notebook:
1. Run a data processing job – Once exported, we can run the necessary processing (already specified in the export_params JSON above). We first need to set the parameters for the processing job, such as the instance type to use:
processing_params = f"""
--config-file-name training-data-configuration.json
--job-id {training_job_name}
--s3-input-uri {export_results['outputS3Uri']}
--s3-processed-uri {processed_folder+"preloading"}
--instance-type ml.r5.8xlarge
"""
%neptune_ml dataprocessing start —store-to processing_results {processing_params}
The processing job produces a DGL graph object that the downstream training job takes as input.
2. Run the training job – The next step (model training) trains the ML model that will be used for predictions. The model training is done in two stages. The first stage uses a SageMaker processing job to generate a model training strategy. Once the first stage is complete, the SageMaker processing job launches a SageMaker hyperparameter tuning job. This process is fully managed by Neptune ML, and all we have to do is specify a range of training parameters, then execute the job.
training_params=f"""
--job-id {training_job_name}
--data-processing-id {training_job_name}
--instance-type ml.m5.24xlarge
--s3-output-uri {str(s3_bucket_uri)}/training
--max-hpo-number 20
--max-hpo-parallel 2 """
%neptune_ml training start —store-to training_results {training_params} —wait
This will tune a built-in Relational Graph Convolutional Network (R-GCN) as from the DGL library, although custom models can also be developed and trained in a similar way.
3. Deploy the model to a SageMaker endpoint – The final step is to create the inference endpoint, which is an Amazon SageMaker endpoint instance that is launched with the model artifacts produced by the best training job resulting from the hyperparameter tuning job. This endpoint will be used by our Gremlin inference queries later to return the model predictions for the inputs in the request. The following code defines endpoint parameters, most notably the type of instance the model will be hosted on, and creates the endpoint using Neptune line magics.
endpoint_params=f"""
--id {training_job_name}
--model-training-job-id {training_job_name}
--instance-type ml.m5d.4xlarge
"""
%neptune_ml endpoint create —store-to endpoint_results {endpoint_params} —wait
Note: This is a persistent endpoint, and you will have to actively delete it once you stop using it to avoid incurring costs.
Step 6. Use the trained Graph Neural Network to run inferences
With the trained GNN model deployed to a SageMaker inference endpoint, we can now interact with it and generate inferences such as node classification or link prediction. This is made easy in Neptune ML since the Deep Graph ML model can be queried using a very similar Gremlin expression as if we were to query ground truth data directly from the graph database. Specifically, we can use Gremlin inference queries, which automatically prepare the inference data, supply them to the model, and return the inference results.
For example, the node classification GNN we have trained can be queried with just a few lines of code in Python.
We first verify that the country property and specimen-country link do not exist for the gbifids we have artificially:
%%gremlin
g.V().has('specimen', 'gbifid','${selected_gbifid}').
valueMap('gbifid','verbatimscientificname' ,'countrycode')
Next we retrieve the endpoint name from the endpoint results JSON (stored in the preceding code when creating the endpoint).
endpoint=endpoint_results['endpoint']['name']
We can reference that endpoint and make inferences in a Gremlin inference query as shown in the following snippet. The first .with() statement references the endpoint (.with("Neptune#ml.endpoint", '${endpoint}'
), while the second .with() statement specifies that we would like the query to return the top three classification results (with("Neptune#ml.limit",3
). Finally, to retrieve the predicted countrycode properties, we call the properties ()
step with an additional with ()
step (with("Neptune#ml.classification"
), which specifies that we want to retrieve the predicted values for this property.
%%gremlin
#specify endpoint
g.with("Neptune#ml.endpoint", '${endpoint}').
#return the three most probable classification results
V().with("Neptune#ml.limit",3).
has('specimen', 'gbifid','${selected_gbifid}').
properties("countrycode").
with("Neptune#ml.classification").
value()
Here, we have shown how to train and query a node classification model on top of NHM’s PKB. Such a model is useful to fill missing values and identify other possible properties (such as other origin countries), which is useful in establishing a standard database of global biodiversity, collectors, and institutions.
Cleanup
To avoid recurring charges, make sure to delete the Neptune database, terminate all endpoints, and delete any unwanted datasets once you are done.
Conclusion
This article is the second of two posts documenting how the Natural History Museum is using large-scale open sustainability data to accelerate global biodiversity. In this technical post, we outline the creation of the Planetary Knowledge Base, a graph database that is central to these efforts.
Using AWS Glue for serverless ETL, Amazon Neptune as a graph database, and Amazon Neptune ML for model training and inference, we show the creation of the data backbone for the PKB. The patterns, architectures, and code samples introduced here can be readily transferred to other sustainability use cases looking to explore semantic relationships for deeper insights. You can get started today using the AWS Registry of Open Data and creating an Amazon Neptune instance in your AWS account.