AWS Developer Tools Blog
Introducing support for Amazon S3 Select in the AWS SDK for Go
We’re excited to announce support for the Amazon Simple Storage Service (Amazon S3) SelectObjectContent API with EventStream in the AWS SDK for Go. Using Amazon S3 Select, you can query for a subset of data from an S3 object by using simple SQL expressions.
Amazon S3 streams the responses as a series of events, instead of returning the full API response all at once. This enables your applications to process the parts of the response as the application receives them. To support this new API behavior, the AWS SDK for Go now supports processing these asynchronous events without requiring your application to wait for the full response.
Using Amazon S3 Select to query an object
Amazon S3 Select enables you to query an object that contains CSV-formatted or JSON-formatted data with simple SQL expressions. For our example, let’s use a CSV file uploaded as an S3 object with the key target-file.csv to the bucket named my-bucket in the us-west-2 AWS Region. The CSV file contains a comma-delimited list of user names and ages.
user_name, age
Banana, 13
Carrot, 22
Merge, 4
Boar, 14
Go Gopher, 9
...
With this CSV file, we want our application to select only users with an age greater than 12. To do this, we write an SQL expression like the following, which selects the user_name field of every row whose age is greater than 12. Because CSV fields are read as strings, the expression casts age to an integer before comparing it.
SELECT user_name FROM S3Object WHERE cast(age as int) > 12
Using Amazon S3 Select to select records
We can now use the AWS SDK for Go with the Amazon S3 SelectObjectContent API to select records from JSON and CSV files stored in Amazon S3.
First, our application creates a new Amazon S3 client for the AWS Region that my-bucket is in. We’ll use this client value, svc, to make the SelectObjectContent API calls.
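import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

sess, err := session.NewSession()
if err != nil {
	return err
}
// Create the client in the Region where my-bucket lives.
svc := s3.New(sess, &aws.Config{
	Region: aws.String("us-west-2"),
})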
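Based on the SDK’s SelectObjectContent API reference, the request parameters for our SQL expression could look like the following. Setting FileHeaderInfo to FileHeaderInfoUse tells S3 Select to treat the first line of the CSV file as a header, so the expression can refer to columns by name.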
params := &s3.SelectObjectContentInput{
	Bucket:         aws.String("my-bucket"),
	Key:            aws.String("target-file.csv"),
	ExpressionType: aws.String(s3.ExpressionTypeSql),
	Expression:     aws.String("SELECT user_name FROM S3Object WHERE cast(age as int) > 12"),
	InputSerialization: &s3.InputSerialization{
		CSV: &s3.CSVInput{
			FileHeaderInfo: aws.String(s3.FileHeaderInfoUse),
		},
	},
	OutputSerialization: &s3.OutputSerialization{
		CSV: &s3.CSVOutput{},
	},
}
With the API’s input parameters set, our application can now call the SelectObjectContent API. When the API call successfully completes, our application can begin to read RecordsEvent values from the API response’s EventStream. The EventStream’s Events method returns a channel that delivers each event as it arrives and is closed when the stream ends; after that, the Err method reports whether the stream ended because of an error. Your application must call the resp.EventStream.Close method when it’s done reading from the EventStream. This ensures that the EventStream connection is always cleaned up, similar to how you must call http.Response.Body.Close when you’re done handling an HTTP response.
resp, err := svc.SelectObjectContent(params)
if err != nil {
	return err
}
// Always close the stream so the underlying connection is released.
defer resp.EventStream.Close()

for event := range resp.EventStream.Events() {
	switch v := event.(type) {
	case *s3.RecordsEvent:
		// v.Payload is a byte slice containing the selected records
		fmt.Print(string(v.Payload))
	case *s3.StatsEvent:
		// v.Details contains information on the data that's processed
		fmt.Println("Processed", *v.Details.BytesProcessed, "bytes")
	case *s3.EndEvent:
		// s3.EndEvent signals that the request has completed
		fmt.Println("SelectObjectContent completed")
	}
}

if err := resp.EventStream.Err(); err != nil {
	return fmt.Errorf("failed to read from SelectObjectContent EventStream, %v", err)
}
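The dispatch in this loop is an ordinary Go type switch over values received from a channel. The following standard-library-only sketch models that shape so it can run without AWS credentials. The Event interface and the RecordsEvent, StatsEvent, and EndEvent types here are hypothetical stand-ins that only mirror the fields used above; they are not the SDK’s types, and the byte count is an arbitrary value.

```go
package main

import "fmt"

// Hypothetical event types that mirror the shape of s3.RecordsEvent,
// s3.StatsEvent, and s3.EndEvent. They are NOT the SDK's types.
type Event interface{ isEvent() }

type RecordsEvent struct{ Payload []byte }
type StatsEvent struct{ BytesProcessed int64 }
type EndEvent struct{}

func (*RecordsEvent) isEvent() {}
func (*StatsEvent) isEvent()   {}
func (*EndEvent) isEvent()     {}

// consume drains the channel, dispatching on each event's concrete type.
// It returns the concatenated record bytes, the last reported byte count,
// and whether an EndEvent was received.
func consume(events <-chan Event) (records string, processed int64, ended bool) {
	for event := range events {
		switch v := event.(type) {
		case *RecordsEvent:
			records += string(v.Payload)
		case *StatsEvent:
			processed = v.BytesProcessed
		case *EndEvent:
			ended = true
		}
	}
	return records, processed, ended
}

func main() {
	events := make(chan Event)
	// This producer goroutine stands in for the SDK's stream reader.
	go func() {
		defer close(events) // ends the range loop in consume
		events <- &RecordsEvent{Payload: []byte("Banana\nCarrot\n")}
		events <- &RecordsEvent{Payload: []byte("Boar\n")}
		events <- &StatsEvent{BytesProcessed: 64}
		events <- &EndEvent{}
	}()
	records, processed, ended := consume(events)
	fmt.Printf("%q %d %v\n", records, processed, ended)
}
```

The range loop ends only when the producer closes the channel, which is also why an error check such as resp.EventStream.Err belongs after the loop rather than inside it.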
Concurrent stream processing
Using the SelectObjectContent EventStream, we can update our application to concurrently stream the selected records with Go’s csv.Reader. The following example is also available in the Amazon S3 package Eventstream concurrent example.
We’ll use a goroutine to read the records from the SelectObjectContent EventStream and write them into an io.Pipe. Our application can then parse the selected records with csv.Reader while the goroutine is still receiving events.
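resp, err := svc.SelectObjectContent(params)
if err != nil {
	return err
}
defer resp.EventStream.Close()

results, resultWriter := io.Pipe()
// Closing the read side makes the goroutine's writes fail fast instead of
// blocking forever if we stop reading early.
defer results.Close()

go func() {
	// Closing the write side delivers io.EOF to the csv.Reader.
	defer resultWriter.Close()
	for event := range resp.EventStream.Events() {
		switch e := event.(type) {
		case *s3.RecordsEvent:
			// After results.Close, Write returns io.ErrClosedPipe immediately,
			// so the goroutine keeps draining events without blocking.
			resultWriter.Write(e.Payload)
		}
	}
}()

resReader := csv.NewReader(results)
for {
	record, err := resReader.Read()
	if err == io.EOF {
		break
	}
	if err != nil {
		return fmt.Errorf("failed to parse selected records, %v", err)
	}
	// Print out the records
	fmt.Println(record)
}

if err := resp.EventStream.Err(); err != nil {
	return fmt.Errorf("failed to read from SelectObjectContent EventStream, %v", err)
}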
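One benefit of the pipe is that csv.Reader parses a continuous byte stream, so it doesn’t matter where one RecordsEvent payload ends and the next begins. The following standard-library sketch shows this with a hypothetical chunks channel standing in for the RecordsEvent payloads, where one row is deliberately split across two chunks. The function name streamRecords is hypothetical.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"io"
)

// streamRecords forwards byte chunks from a channel into an io.Pipe on a
// separate goroutine and parses them with csv.Reader as they arrive.
// The chunks channel is a hypothetical stand-in for RecordsEvent payloads.
func streamRecords(chunks <-chan []byte) ([][]string, error) {
	pr, pw := io.Pipe()
	// If we return early, closing the read side makes further writes fail
	// with io.ErrClosedPipe, so the writer goroutine can't block forever.
	defer pr.Close()

	go func() {
		// Closing the write side delivers io.EOF to the reader.
		defer pw.Close()
		for chunk := range chunks {
			// Write errors are ignored: they only occur after pr.Close,
			// and we keep draining chunks until the channel is closed.
			pw.Write(chunk)
		}
	}()

	var rows [][]string
	r := csv.NewReader(pr)
	for {
		row, err := r.Read()
		if err == io.EOF {
			return rows, nil
		}
		if err != nil {
			return rows, err
		}
		rows = append(rows, row)
	}
}

func main() {
	chunks := make(chan []byte)
	go func() {
		defer close(chunks)
		// "Carrot" is split across two chunks; csv.Reader still sees one row.
		chunks <- []byte("Banana\nCar")
		chunks <- []byte("rot\nBoar\n")
	}()
	rows, err := streamRecords(chunks)
	if err != nil {
		panic(err)
	}
	fmt.Println(rows) // [[Banana] [Carrot] [Boar]]
}
```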
Error handling
If an error occurs while your application reads the SelectObjectContent EventStream, the EventStream’s Err method returns that error. (Errors from the initial request are returned directly by the SelectObjectContent call, as in the earlier examples.) If the error came from the service and you want your application to react to specific error codes, you can compare the value returned by the error’s Code method against the Amazon S3 Select errors.
// awserr is the github.com/aws/aws-sdk-go/aws/awserr package.
if err := resp.EventStream.Err(); err != nil {
	if aerr, ok := err.(awserr.Error); ok {
		switch aerr.Code() {
		// Check against specific error codes that need custom handling
		}
	}
	return fmt.Errorf("failed to read from SelectObjectContent EventStream, %v", err)
}
Final thoughts
With Amazon S3 Select, you can create SQL expressions to select a subset of CSV or JSON records from files stored in Amazon S3. The AWS SDK for Go lets your application process the results as a stream of events, handling records as they arrive instead of waiting for the full response.
Let us know what you think about this feature in the comments below!