AWS Developer Tools Blog

Introducing support for Amazon S3 Select in the AWS SDK for Ruby

We’re excited to announce support for the Amazon Simple Storage Service (Amazon S3) #select_object_content API with event streams in the AWS SDK for Ruby. Amazon S3 Select enables you to retrieve only a subset of data from an object by using simple SQL expressions.

Amazon S3 streams the responses as a series of events, instead of returning the full response all at once. This provides performance benefits by enabling you to process response messages as they come in. To support this behavior, the AWS SDK for Ruby now supports processing events asynchronously, instead of needing to wait for the full response to be loaded before you can process it.

SDK version requirement

To use event streams and the Amazon S3 #select_object_content API, you need to use version 3 of the AWS SDK for Ruby. You also need to have the aws-sdk-s3 gem version 1.13.0 or later available.


require 'aws-sdk-s3'

client = Aws::S3::Client.new(region: 'us-west-2')

# Some basic S3 client usage
# take S3 #put_object API as an example
resp = client.put_object(bucket: 'my-bucket', key: 'foo', body: 'Hello World!')
# => Aws::S3::Types::PutObjectOutput

For more information about the AWS SDK for Ruby and its guides, check out our GitHub README.

Amazon S3 Select usage pattern

Let’s try a SQL query against a CSV file in Amazon S3. Suppose a CSV document named target_file.csv is stored in an S3 bucket named my-bucket in the AWS Region us-west-2, with contents describing users and their ages:

 user  age
 foo    12
 bar    15
 baz    10
 ...

Assuming this is a huge file and you want to select only the rows for users who are older than 12, you would use a SQL expression like the following:


SELECT * FROM S3Object WHERE cast(age as int) > 12
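
To make the predicate concrete, here is a minimal local sketch in plain Ruby of which sample rows the expression would keep. It does not call Amazon S3. The SAMPLE_CSV data and the select_older_than helper are hypothetical, and the comma-separated layout is an assumption about how the sample file is stored.

```ruby
# Local illustration only: mimics WHERE cast(age as int) > 12 on the sample rows.
# select_older_than is a hypothetical helper, not part of the AWS SDK.
SAMPLE_CSV = "user,age\nfoo,12\nbar,15\nbaz,10\n"

def select_older_than(csv_text, min_age)
  header, *rows = csv_text.lines.map(&:chomp)
  age_index = header.split(',').index('age')
  # Keep rows whose age column, cast to an integer, exceeds min_age
  rows.select { |row| row.split(',')[age_index].to_i > min_age }
end

puts select_older_than(SAMPLE_CSV, 12)
# prints "bar,15"; foo is excluded because 12 is not greater than 12
```

With S3 Select, the same filtering happens on the service side, so only the matching rows are sent back.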

By following the SDK for Ruby API documentation for #select_object_content request syntax, we could come up with input parameters for the operation, like this:


params = {
  bucket: 'my-bucket',
  key: 'target_file.csv',
  expression_type: 'SQL',
  expression: "SELECT * FROM S3Object WHERE cast(age as int) > 12",
  input_serialization: {
    csv: { file_header_info: 'USE' }
  },
  output_serialization: {
    csv: {}
  }
}

Now we have everything ready to make the API call. To process events once they arrive, you can use a block statement attached to the S3 Select call, or provide a handler that has callbacks registered for events.

Using a Ruby block statement

The following example shows how to use a block to process all events.


client.select_object_content(params) do |stream|

  # Callback for every event that arrives
  stream.on_event do |event|
    puts event.event_type
    # => :records / :stats / :end / :cont etc
    # Do something with the event object
  end

end

Pass in :event_stream_handler

For the :event_stream_handler option, you can pass in a handler with callbacks registered on it. The handler can be either an EventStream object or a Ruby Proc object.

Using an EventStream object

Let’s try using the :event_stream_handler option with an Aws::S3::EventStreams::SelectObjectContentEventStream object.


handler = Aws::S3::EventStreams::SelectObjectContentEventStream.new
handler.on_records_event do |event|
  # get :records event payload that contains selected contents
  puts event.payload.read
  # => "bar,15\n …"
end
handler.on_stats_event do |event|
  # get :stats event that contains progress information
  puts event.details.inspect
  # => Aws::S3::Types::Stats bytes_scanned=xx, bytes_processed=xx, bytes_returned=xx
end

# Add :event_stream_handler option
params[:event_stream_handler] = handler
client.select_object_content(params)
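
One practical detail when printing payloads as they arrive: as far as the S3 API reference describes it, a single :records payload is not guaranteed to end on a row boundary. The following is a minimal sketch, in plain Ruby, of buffering chunks until a row is complete. RowBuffer is a hypothetical helper, not part of the AWS SDK, and the chunk strings stand in for what event.payload.read would return inside on_records_event.

```ruby
# Hypothetical helper (not part of the AWS SDK): reassembles complete CSV rows
# from payload chunks that may end in the middle of a row.
class RowBuffer
  def initialize
    @pending = +''
  end

  # Appends a chunk and returns every row completed so far, without newlines.
  def push(chunk)
    @pending << chunk
    rows = @pending.split("\n", -1)
    # The last element is an unfinished row (or empty); keep it for later.
    @pending = rows.pop || +''
    rows
  end

  # Returns whatever is left once the stream has ended.
  def flush
    rest = @pending
    @pending = +''
    rest.empty? ? [] : [rest]
  end
end

buffer = RowBuffer.new
rows = []
# Simulated chunks; the first one stops in the middle of the "bar,15" row.
['bar,1', "5\nqux,20\n"].each { |chunk| rows.concat(buffer.push(chunk)) }
rows.concat(buffer.flush)
p rows
# prints ["bar,15", "qux,20"]
```

In a real handler, push would be called from the on_records_event callback, and flush once the stream has ended.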

Using a Proc object

Using a Proc object is also supported with the same pattern.


handler = Proc.new do |stream|

  stream.on_records_event do |event|
    # Do Something with :records event
  end

  stream.on_stats_event do |event|
    # Do Something with :stats event
  end

end

# Add :event_stream_handler option
params[:event_stream_handler] = handler
client.select_object_content(params)

Using a hybrid pattern

You can also try a hybrid of the previous two usage patterns, as follows.


handler = Aws::S3::EventStreams::SelectObjectContentEventStream.new
handler.on_records_event do |event|
  # get :records event payload that contains selected contents
  puts event.payload.read
  # => "bar,15\n …"
end

# Add :event_stream_handler option
params[:event_stream_handler] = handler
client.select_object_content(params) do |stream|
  # raise the error in the event stream
  stream.on_error_event do |event|
    raise event
    # => Aws::Errors::EventError
    # event.event_type => :error
    # event.error_code => String
    # event.error_message => String
  end

end

Notice that in the previous example, the on_error_event callback captures any error event that occurs after the stream connection is established. If an error occurs after the request starts but before the stream response begins, you can still rescue it as an Aws::S3::Errors::ServiceError.

When using a hybrid pattern, also note that callbacks passed in with a block attached to the API call are registered on the :event_stream_handler object that was passed in. Thus, if the handler object is reused, it keeps all callbacks registered so far.

Wait for a full response

Of course, you can still wait for the full response to complete and then fetch all available events from an Enumerator. (The full response is also available when you use the streaming usage patterns above.)


resp = client.select_object_content(params)
# => Aws::S3::Types::SelectObjectContentOutput payload= Aws::S3::Types::SelectObjectContentEventStream: ...

events = resp.payload
# => Aws::S3::Types::SelectObjectContentEventStream: ...

# SelectObjectContentEventStream is an Enumerator containing all events that arrived
# it also has a helper method #event_types, which returns an array of all valid event types
events.event_types
# => [:records, :stats, :progress, :cont, :end]
events.next
# => Aws::S3::Types::RecordsEvent payload=StringIO:0x007fc160a590a0, event_type=:records
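
Once all events are available, a common next step is to join the selected content into one string. The sketch below shows one way to do that. To stay runnable offline it uses FakeEvent, a hypothetical stand-in for the SDK's event objects, and selected_content is likewise a hypothetical helper. It relies only on event_type and payload, which the :records event shown above exposes.

```ruby
require 'stringio'

# Stand-in for the SDK's event objects, used only so this sketch runs offline.
FakeEvent = Struct.new(:event_type, :payload)

# Hypothetical helper: joins the payloads of all :records events.
def selected_content(events)
  events
    .select { |event| event.event_type == :records }
    .map { |event| event.payload.read }
    .join
end

events = [
  FakeEvent.new(:records, StringIO.new("bar,15\n")),
  FakeEvent.new(:stats, nil),
  FakeEvent.new(:records, StringIO.new("qux,20\n")),
  FakeEvent.new(:end, nil)
].each

puts selected_content(events)
# prints "bar,15" and "qux,20" on separate lines
```

Because read consumes each StringIO, calling the helper a second time on the same events returns an empty string unless the payloads are rewound first.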

Response stubbing support

In addition to supporting the S3 Select API, the AWS SDK for Ruby also provides stubbed event stream responses for any RSpec tests that you might want to write.

Let’s say you want to mock an event stream response with events (including errors). You just need to provide an Enumerator of mock events, as follows.


stream = [
  { message_type: 'event', event_type: :records, payload: StringIO.new('selected content part one') },
  { message_type: 'event', event_type: :records, payload: StringIO.new('selected content part two') },
  { message_type: 'error', error_code: 'InternalError', error_message: "Something went wrong" }
].each

Then pass the Enumerator to :stub_responses, as you would for other APIs.


client = Aws::S3::Client.new(stub_responses: {select_object_content: {payload: stream}})

# Then you can expect
resp = client.select_object_content(params)
# => Aws::S3::Types::SelectObjectContentOutput payload=Aws::S3::Types::SelectObjectContentEventStream: ...

# Get stubbing eventstream
stream = resp.payload
# => Aws::S3::Types::SelectObjectContentEventStream: ...

# List all mocked events
stream.to_a
# => [ Aws::S3::Types::RecordsEvent payload=StringIO:0x007fc1c28f7b00, event_type=:records,
#  Aws::S3::Types::RecordsEvent payload=StringIO:0x007fc1c28e7138, event_type=:records,
# Aws::Errors::EventError: Aws::Errors::EventError ]

stream.next.payload.read
# => "selected content part one"

stream.next.payload.read
# => "selected content part two"

stream.next
# => Aws::Errors::EventError: Aws::Errors::EventError
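
If several tests need mock streams, building the Enumerator by hand gets repetitive. The following is a minimal sketch of a helper that produces hashes in the same shape as the mock events shown earlier. stub_select_stream and its error: keyword are hypothetical names, not part of the AWS SDK, and the sketch uses only the Ruby standard library.

```ruby
require 'stringio'

# Hypothetical helper: builds an Enumerator of stubbed :records events,
# optionally followed by one error message.
def stub_select_stream(chunks, error: nil)
  messages = chunks.map do |chunk|
    { message_type: 'event', event_type: :records, payload: StringIO.new(chunk) }
  end
  messages << { message_type: 'error', **error } if error
  # Array#each without a block returns an Enumerator
  messages.each
end

stream = stub_select_stream(
  ['selected content part one', 'selected content part two'],
  error: { error_code: 'InternalError', error_message: 'Something went wrong' }
)

puts stream.next[:payload].read
puts stream.next[:payload].read
puts stream.next[:message_type]
```

The returned Enumerator can then be passed as the payload in :stub_responses, in the same way as the hand-built stream.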

Final thoughts

With Amazon S3 Select, you can use SQL statements to filter the contents of Amazon S3 objects and retrieve just the subset of data that you need. You can process selected record events asynchronously with the AWS SDK for Ruby, with multiple usage patterns. You can also use stubbed responses for the S3 Select API and write tests for your code.

Feedback

Please share your questions, comments, and issues with us on GitHub. You can also reach us on our Gitter channel.