AWS Developer Tools Blog
Introducing support for Amazon S3 Select in the AWS SDK for Ruby
We’re excited to announce support for the Amazon Simple Storage Service (Amazon S3) #select_object_content API with event streams in the AWS SDK for Ruby. Amazon S3 Select enables you to retrieve only a subset of data from an object by using simple SQL expressions.
Amazon S3 streams the responses as a series of events, instead of returning the full response all at once. This provides performance benefits by enabling you to process response messages as they come in. To support this behavior, the AWS SDK for Ruby now supports processing events asynchronously, instead of needing to wait for the full response to be loaded before you can process it.
SDK version requirement
To use event streams and the Amazon S3 #select_object_content API, you need to use version 3 of the AWS SDK for Ruby. You also need to have the aws-sdk-s3 gem version 1.13.0 or later available.
require 'aws-sdk-s3'
client = Aws::S3::Client.new(region: 'us-west-2')
# Some basic S3 client usage
# take S3 #put_object API as an example
resp = client.put_object(bucket: 'my-bucket', key: 'foo', body: 'Hello World!')
# => Aws::S3::Types::PutObjectOutput
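If you want to guard against an older gem at startup, a version comparison along these lines may help. This is a minimal sketch: the constant and the helper name s3_select_supported? are hypothetical, and only the 1.13.0 minimum comes from this post. It relies on Gem::Version from RubyGems so that versions compare numerically rather than as strings.

```ruby
require 'rubygems'

# Minimum aws-sdk-s3 version named in this post.
MINIMUM_S3_GEM_VERSION = Gem::Version.new('1.13.0')

# Hypothetical helper: true when the given version string is new enough.
def s3_select_supported?(version_string)
  Gem::Version.new(version_string) >= MINIMUM_S3_GEM_VERSION
end

puts s3_select_supported?('1.13.0') # boundary version
# true
puts s3_select_supported?('1.9.1')  # older release
# false
```

In an application, you could pass in the version of the gem that is actually loaded, for example Gem.loaded_specs['aws-sdk-s3'].version.to_s once the gem has been activated.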
For more information about the AWS SDK for Ruby and its guides, check out our GitHub README.
Amazon S3 Select usage pattern
Let’s try a SQL query against a CSV file in Amazon S3. Suppose you have a CSV document named target_file.csv stored in an S3 bucket named my-bucket in the AWS Region us-west-2, with contents describing user and age information:
user age
foo 12
bar 15
baz 10
...
Assuming this is a huge file and you want to select only the rows for users who are older than 12, you would use a SQL expression like the following:
SELECT * FROM S3Object WHERE cast(age as int) > 12
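To make the intent of this expression concrete, here is a local sketch of the same filter in plain Ruby, applied to the three sample rows shown above. It does not call Amazon S3. The helper name select_older_than is hypothetical, and the sketch assumes the file uses a comma delimiter with a header row.

```ruby
# Sample rows from this post, written as CSV text (comma delimiter assumed).
SAMPLE_CSV = <<~CSV
  user,age
  foo,12
  bar,15
  baz,10
CSV

# Hypothetical local stand-in for:
#   SELECT * FROM S3Object WHERE cast(age as int) > 12
def select_older_than(csv_text, min_age)
  header, *rows = csv_text.lines.map(&:chomp)
  age_index = header.split(',').index('age')
  # Cast the age column to an integer before comparing, as the SQL does.
  rows.select { |row| row.split(',')[age_index].to_i > min_age }
end

puts select_older_than(SAMPLE_CSV, 12)
# bar,15
```

Only the row for bar passes the filter, because the comparison is strict and foo is exactly 12.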
By following the request syntax for #select_object_content in the SDK for Ruby API documentation, we can come up with input parameters for the operation, like this:
params = {
  bucket: 'my-bucket',
  key: 'target_file.csv',
  expression_type: 'SQL',
  expression: "SELECT * FROM S3Object WHERE cast(age as int) > 12",
  input_serialization: {
    csv: { file_header_info: 'USE' }
  },
  output_serialization: {
    csv: {}
  }
}
Now we have everything ready to make the API call. To process events once they arrive, you can use a block statement attached to the S3 Select call, or provide a handler that has callbacks registered for events.
Using a Ruby block statement
The following example shows how to use a block to process all events.
client.select_object_content(params) do |stream|
  # Callback for every event that arrives
  stream.on_event do |event|
    puts event.event_type
    # => :records / :stats / :end / :cont etc
    # Do Something with event object
  end
end
Pass in :event_stream_handler
For the :event_stream_handler option, you can pass in either an EventStream object or a Ruby Proc object that has callbacks registered.
Using an EventStream object
Let’s try using the :event_stream_handler option with an Aws::S3::EventStreams::SelectObjectContentEventStream object.
handler = Aws::S3::EventStreams::SelectObjectContentEventStream.new
handler.on_records_event do |event|
  # get :records event payload that contains selected contents
  puts event.payload.read
  # => "bar,15\n …"
end
handler.on_stats_event do |event|
  # get :stats event that contains progress information
  puts event.details.inspect
  # => Aws::S3::Types::Stats bytes_scanned=xx, bytes_processed=xx, bytes_returned=xx
end
# Add :event_stream_handler option
params[:event_stream_handler] = handler
client.select_object_content(params)
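When you process :records payloads as they arrive, keep in mind that a single row is not guaranteed to fit inside one event, so a payload can end in the middle of a line. One way to handle this is to buffer the chunks and emit only complete lines. The LineBuffer class below is a hypothetical helper, not part of the SDK, and the sample chunks are made up for illustration.

```ruby
# Hypothetical helper (not part of the SDK): buffers payload chunks and
# yields only complete lines, keeping any trailing partial line for later.
class LineBuffer
  def initialize
    @pending = +''
  end

  # Append a chunk and yield each complete line (without the newline).
  def push(chunk)
    @pending << chunk
    while (newline_at = @pending.index("\n"))
      line = @pending.slice!(0..newline_at)
      yield line.chomp
    end
  end

  # Return whatever is left once the stream has ended.
  def flush
    rest = @pending
    @pending = +''
    rest
  end
end

buffer = LineBuffer.new
rows = []
# Chunks split mid-row, as separate :records payloads might be.
['bar,1', "5\nqux,", "20\n"].each do |chunk|
  buffer.push(chunk) { |line| rows << line }
end
p rows
# ["bar,15", "qux,20"]
```

Inside an on_records_event callback, you could call buffer.push(event.payload.read) with a block that handles each complete row, and call buffer.flush after the stream ends to pick up any final row that has no trailing newline.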
Using a Proc object
Using a Proc object is also supported with the same pattern.
handler = Proc.new do |stream|
  stream.on_records_event do |event|
    # Do Something with :records event
  end
  stream.on_stats_event do |event|
    # Do Something with :stats event
  end
end
# Add :event_stream_handler option
params[:event_stream_handler] = handler
client.select_object_content(params)
Using a hybrid pattern
You can also try a hybrid of the previous two usage patterns, as follows.
handler = Aws::S3::EventStreams::SelectObjectContentEventStream.new
handler.on_records_event do |event|
  # get :records event payload that contains selected contents
  puts event.payload.read
  # => "bar,15\n …"
end
# Add :event_stream_handler option
params[:event_stream_handler] = handler
client.select_object_content(params) do |stream|
  # raise the error in the event stream
  stream.on_error_event do |event|
    raise event
    # => Aws::Errors::EventError
    # event.event_type => :error
    # event.error_code => String
    # event.error_message => String
  end
end
Notice that in the previous example, the on_error_event callback captures all error events that happen after a stream connection is established. If an error happens after the request starts but before the stream response starts, you can still rescue it as an Aws::S3::Errors::ServiceError.
When using a hybrid pattern, also note that callbacks passed in with a block statement attached to the API call are registered on the :event_stream_handler object that was passed in. Thus, if the handler object is reused, it will contain all of the callbacks registered so far.
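The effect of reusing a handler can be pictured with a toy model. The sketch below is not the SDK: ToyHandler and toy_call are hypothetical stand-ins that only mimic the behavior described above, where a block attached to the call registers its callbacks on the handler that was passed in.

```ruby
# Toy stand-in for an event stream handler; NOT the SDK class.
class ToyHandler
  attr_reader :callbacks

  def initialize
    @callbacks = []
  end

  def on_error_event(&block)
    @callbacks << block
  end
end

# Toy stand-in for an API call that registers block callbacks on the
# handler it was given, as described for the hybrid pattern.
def toy_call(handler)
  yield handler if block_given?
end

# Reusing one handler: each call adds another callback to it.
shared = ToyHandler.new
2.times { toy_call(shared) { |stream| stream.on_error_event { |e| warn e } } }
puts shared.callbacks.size
# 2

# Creating a fresh handler per call keeps the callbacks separate.
fresh_sizes = 2.times.map do
  handler = ToyHandler.new
  toy_call(handler) { |stream| stream.on_error_event { |e| warn e } }
  handler.callbacks.size
end
p fresh_sizes
# [1, 1]
```

If duplicate callbacks would be a problem in your code, creating a new handler object for each call is one way to avoid them.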
Wait for a full response
Of course, you can still wait for the full response to complete and then fetch all available events from an Enumerator. (Note that the full response is also available when you use the streaming usage patterns above.)
resp = client.select_object_content(params)
# => Aws::S3::Types::SelectObjectContentOutput payload= Aws::S3::Types::SelectObjectContentEventStream: ...
events = resp.payload
# => Aws::S3::Types::SelectObjectContentEventStream: ...
# SelectObjectContentEventStream is an Enumerator containing all events that arrived
# It also has a helper method, #event_types, that returns an array of all valid event types
events.event_types
# => [:records, :stats, :progress, :cont, :end]
events.next
# => Aws::S3::Types::RecordsEvent payload=StringIO:0x007fc160a590a0, event_type=:records
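Once you have the Enumerator, a common next step is to join the payloads of all :records events into one string. The following sketch shows one way to do that. It uses a hypothetical FakeEvent struct in place of the SDK event objects so that it runs locally, and it assumes only that each event responds to event_type and, for :records events, to payload.

```ruby
require 'stringio'

# Hypothetical stand-in for SDK event objects, for local experimentation.
FakeEvent = Struct.new(:event_type, :payload)

# Concatenate the payloads of all :records events, skipping other types.
def records_text(events)
  events.each_with_object(+'') do |event, text|
    text << event.payload.read if event.event_type == :records
  end
end

events = [
  FakeEvent.new(:records, StringIO.new("bar,15\n")),
  FakeEvent.new(:stats, nil),
  FakeEvent.new(:end, nil)
].each

puts records_text(events)
# bar,15
```

Because each payload is an IO-like object, reading it consumes it. A second read returns an empty string unless you rewind the payload first.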
Response stubbing support
In addition to supporting the S3 Select API, the AWS SDK for Ruby also provides stubbed event stream responses for RSpec tests that you might want to write.
Let’s say you want to mock an event stream response with events (including errors). You just need to provide an Enumerator of mock events, as follows.
stream = [
  { message_type: 'event', event_type: :records, payload: StringIO.new('selected content part one') },
  { message_type: 'event', event_type: :records, payload: StringIO.new('selected content part two') },
  { message_type: 'error', error_code: 'InternalError', error_message: "Something went wrong" }
].each
Then use :stub_responses, similarly to other APIs.
client = Aws::S3::Client.new(stub_responses: {select_object_content: {payload: stream}})
# Then you can expect
resp = client.select_object_content(params)
# => Aws::S3::Types::SelectObjectContentOutput payload=Aws::S3::Types::SelectObjectContentEventStream: ...
# Get the stubbed event stream
stream = resp.payload
# => Aws::S3::Types::SelectObjectContentEventStream: ...
# List all mocked events
stream.to_a
# => [ Aws::S3::Types::RecordsEvent payload=StringIO:0x007fc1c28f7b00, event_type=:records,
# Aws::S3::Types::RecordsEvent payload=StringIO:0x007fc1c28e7138, event_type=:records,
# Aws::Errors::EventError: Aws::Errors::EventError ]
stream.next.payload.read
# => "selected content part one"
stream.next.payload.read
# => "selected content part two"
stream.next
# => Aws::Errors::EventError: Aws::Errors::EventError
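Code that consumes a stream like this usually needs to decide what to do when an error event shows up partway through. The sketch below reads record payloads until it meets an :error event and then stops. The structs and the read_until_error helper are hypothetical stand-ins so that the example runs without the SDK. With a stubbed client, the error object would be an Aws::Errors::EventError, which exposes the same event_type, error_code, and error_message readers.

```ruby
require 'stringio'

# Hypothetical stand-ins for SDK events, for local experimentation.
FakeRecords = Struct.new(:event_type, :payload)
FakeError = Struct.new(:event_type, :error_code, :error_message)

# Read record payloads until an :error event appears, then stop.
# Returns [text_read_so_far, error_or_nil].
def read_until_error(events)
  text = +''
  events.each do |event|
    return [text, event] if event.event_type == :error
    text << event.payload.read if event.event_type == :records
  end
  [text, nil]
end

events = [
  FakeRecords.new(:records, StringIO.new('selected content part one')),
  FakeRecords.new(:records, StringIO.new('selected content part two')),
  FakeError.new(:error, 'InternalError', 'Something went wrong')
].each

text, error = read_until_error(events)
puts text
# selected content part oneselected content part two
puts error.error_code
# InternalError
```

Returning the partial text together with the error lets the caller choose whether to discard what was read or keep it, rather than hard-coding that decision in the helper.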
Final thoughts
With Amazon S3 Select, you can use SQL statements to filter the contents of Amazon S3 objects and retrieve just the subset of data that you need. You can process selected record events asynchronously with the AWS SDK for Ruby, with multiple usage patterns. You can also use stubbed responses for the S3 Select API and write tests for your code.
Feedback
Please share your questions, comments, and issues with us on GitHub. You can also reach us on our Gitter channel.