AWS Developer Tools Blog

Introducing support for Amazon S3 Select in the AWS SDK for Go

We’re excited to announce support for the Amazon Simple Storage Service (Amazon S3) SelectObjectContent API with EventStream in the AWS SDK for Go. Using Amazon S3 Select, you can query for a subset of data from an S3 object by using simple SQL expressions.

Amazon S3 streams the responses as a series of events, instead of returning the full API response all at once. This enables your applications to process the parts of the response as the application receives them. To support this new API behavior, the AWS SDK for Go now supports processing these asynchronous events without requiring your application to wait for the full response.

Using Amazon S3 Select to query an object

Amazon S3 Select enables you to query an object that contains CSV-formatted or JSON-formatted data with simple SQL expressions. For our example, let’s use a CSV file that’s uploaded to the bucket named my-bucket in the us-west-2 AWS Region, with target-file.csv as the object key. The CSV file contains a comma-delimited list of user names and ages.

user_name, age
Banana, 13
Carrot, 22
Merge, 4
Boar, 14
Go Gopher, 9
...

With this CSV file, we want our application to select only users with an age greater than 12. To do this, we write an SQL expression like the following, which selects the user_name field for those users. Because Amazon S3 Select reads CSV values as strings, the expression casts age to an integer before comparing it.

SELECT user_name FROM S3Object WHERE cast(age as int) > 12

Using Amazon S3 Select to select records

We can now use the AWS SDK for Go with the Amazon S3 SelectObjectContent API to select records from JSON and CSV files stored in Amazon S3.

First, we want our application to create a new Amazon S3 client for the AWS Region that our bucket, my-bucket, is in. We’ll use this client value, svc, to make the SelectObjectContent API calls.

sess, err := session.NewSession()
if err != nil {
	return err
}

svc := s3.New(sess, &aws.Config{
	Region: aws.String("us-west-2"),
})

Using the SDK’s SelectObjectContent API Reference Guide, our API request parameters could look like the following for the SQL expression we want to use.

params := &s3.SelectObjectContentInput{
	Bucket: aws.String("my-bucket"),
	Key: aws.String("target-file.csv"),
	ExpressionType: aws.String(s3.ExpressionTypeSql),
	Expression: aws.String("SELECT user_name FROM S3Object WHERE cast(age as int) > 12"),
	InputSerialization: &s3.InputSerialization{
		CSV: &s3.CSVInput{
			FileHeaderInfo: aws.String(s3.FileHeaderInfoUse),
		},
	},
	OutputSerialization: &s3.OutputSerialization{
		CSV: &s3.CSVOutput{},
	},
}

With the API’s input parameters set, our application can now call the SelectObjectContent API. When the API call successfully completes, our application can begin to read the RecordsEvent from the API response’s EventStream. Your application must call the resp.EventStream.Close method when it’s done reading from the EventStream. This ensures that the EventStream connection is always cleaned up. This is similar to how you must call http.Response.Body.Close when done handling an HTTP response.

resp, err := svc.SelectObjectContent(params)
if err != nil {
	return err
}
defer resp.EventStream.Close()

for event := range resp.EventStream.Events() {
	switch v := event.(type) {
	case *s3.RecordsEvent:
		// v.Payload is a byte slice of the selected records
		fmt.Println(string(v.Payload))
	case *s3.StatsEvent:
		// v.Details contains information on the data that’s processed
		fmt.Println("Processed", *v.Details.BytesProcessed, "bytes")
	case *s3.EndEvent:
		// s3.EndEvent signals that the stream is complete
		fmt.Println("SelectObjectContent completed")
	}
}

if err := resp.EventStream.Err(); err != nil {
	return fmt.Errorf("failed to read from SelectObjectContent EventStream, %v", err)
}

Concurrent stream processing

Using the SelectObjectContent EventStream, we can update our application to concurrently stream the selected records with Go’s csv.Reader. The following example is also available in the Amazon S3 package EventStream concurrent example.

We’ll use a goroutine to copy the record payloads from the SelectObjectContent EventStream into an io.Pipe. This allows our application to parse the selected records while more events are still arriving.

resp, err := svc.SelectObjectContent(params)
if err != nil {
	return err
}
defer resp.EventStream.Close()

results, resultWriter := io.Pipe()
go func() {
	defer resultWriter.Close()
	for event := range resp.EventStream.Events() {
		switch e := event.(type) {
		case *s3.RecordsEvent:
			resultWriter.Write(e.Payload)
		}
	}
}()

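resReader := csv.NewReader(results)
for {
	record, err := resReader.Read()
	if err == io.EOF {
		break
	}
	if err != nil {
		// Close the pipe so the goroutine is not left blocked on Write
		results.CloseWithError(err)
		return err
	}
	// Print out the records
	fmt.Println(record)
}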

if err := resp.EventStream.Err(); err != nil {
	return fmt.Errorf("failed to read from SelectObjectContent EventStream, %v", err)
}

Error handling

If an error occurs while the SelectObjectContent EventStream is being read, the Err method of EventStream returns that error. If the error is returned from the service and you want your application to react to specific error codes, you can compare the returned error’s Code against the Amazon S3 Select error codes.

if err := resp.EventStream.Err(); err != nil {
	if aerr, ok := err.(awserr.Error); ok {
		switch aerr.Code() {
		// Check against specific error codes that need custom handling
		}
	}
	return fmt.Errorf("failed to read from SelectObjectContent EventStream, %v", err)
}

Final thoughts

With Amazon S3 Select, you can create SQL expressions to select a subset of CSV or JSON records from files stored in Amazon S3. The AWS SDK for Go provides the tools to use this API asynchronously.

Let us know what you think about this feature in the comments below!