AWS Developer Tools Blog
Announcing Amazon Kinesis SubscribeToShard API Support in the AWS SDK for Ruby
Amazon Kinesis launched two significant performance-improving features for Amazon Kinesis Data Streams: enhanced fan-out and an HTTP/2 data retrieval API (“SubscribeToShard”). This API allows data to be delivered from producers to consumers in 70 milliseconds or better. Today, we’re excited to announce support for the Kinesis SubscribeToShard API in the AWS SDK for Ruby.
Before calling the #subscribe_to_shard API
The #subscribe_to_shard API is available in the aws-sdk-kinesis gem version 1.10.0 and later, for Ruby version 2.1 and later. This API streams shard events back over the HTTP/2 protocol, unlike regular API calls, which use HTTP/1.1. To use this API, you need to have the http-2 gem available.
gem 'http-2', '~> 0.10'
This API is available on AsyncClient instead of Client. Making the call gives you an AsyncResponse object right away, instead of waiting for a complete synchronous Response.
Introduction to AsyncClient
Due to the nature of the HTTP/2 protocol, the AWS SDK for Ruby introduced AsyncClient for streaming APIs over HTTP/2. This is different from the Client object, which you might already be familiar with, for APIs over HTTP/1.1.
You can create an AsyncClient for Kinesis as follows.
require 'aws-sdk-kinesis'
async_client = Aws::Kinesis::AsyncClient.new(region: 'us-west-2')
# list all available HTTP2/Async operations
async_client.operation_names
# => [:subscribe_to_shard]
In the next section, we walk through the async unidirectional streaming API usage pattern.
For general information about the SubscribeToShard API, see the Kinesis SubscribeToShard API documentation.
#subscribe_to_shard API usage pattern
In this section, we go through the key parts of making a unidirectional async API call.
Intro to :output_event_stream_handler
Waiting for an async response to complete before processing it can take minutes, and it forfeits the benefit of processing events as they stream in. Instead, we’ve introduced event stream objects that let you register callbacks for specific events, or for every event, as they arrive.
Taking Kinesis as an example, all available EventStream objects are under Aws::Kinesis::EventStreams. For the #subscribe_to_shard API, we have Aws::Kinesis::EventStreams::SubscribeToShardEventStream available.
output_stream = Aws::Kinesis::EventStreams::SubscribeToShardEventStream.new
# callback for the :subscribe_to_shard_event event
output_stream.on_subscribe_to_shard_event_event do |event|
  # => Aws::Kinesis::Types::SubscribeToShardEvent
end
# callback for the :resource_in_use_exception event
output_stream.on_resource_in_use_exception_event do |exception|
  # => Aws::Kinesis::Types::ResourceInUseException
end
# Or if you want to see every event that arrives
output_stream.on_event do |event|
  puts event.event_type
  # => Symbol
end
# For a full list of available methods for registering callbacks, see
# Aws::Kinesis::EventStreams::SubscribeToShardEventStream
To use an event stream, provide it in the :output_event_stream_handler option when making the request. The following sections provide a complete example.
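To make the callback pattern concrete, here is a toy model of an event stream handler. It is not the SDK's implementation: the class name ToyEventStream and the methods #on and #emit are hypothetical and exist only for this illustration. It shows the general idea of keeping typed callbacks and catch-all callbacks, then invoking both when an event arrives.

```ruby
# Toy model only: NOT how the AWS SDK implements event streams.
# ToyEventStream, #on and #emit are hypothetical names for illustration.
class ToyEventStream
  def initialize
    @typed = Hash.new { |hash, key| hash[key] = [] } # callbacks per event type
    @catch_all = []                                  # callbacks for every event
  end

  # Register a callback for a single event type.
  def on(event_type, &block)
    @typed[event_type] << block
    self
  end

  # Register a callback that sees every event.
  def on_event(&block)
    @catch_all << block
    self
  end

  # Deliver an event to its typed callbacks first, then to the catch-all ones.
  def emit(event_type, payload)
    (@typed[event_type] + @catch_all).each { |callback| callback.call(payload) }
  end
end

seen = []
stream = ToyEventStream.new
stream.on(:subscribe_to_shard_event) { |event| seen << [:typed, event] }
stream.on_event { |event| seen << [:any, event] }

stream.emit(:subscribe_to_shard_event, 'records...')
stream.emit(:resource_in_use_exception, 'busy')
```

In this sketch the first event reaches both callbacks, while the second reaches only the catch-all callback, which mirrors the difference between a type-specific callback and #on_event.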
Prepare the consumer
Before calling the API, you need a consumer that uses enhanced fan-out. If you already have such a consumer registered in Kinesis Data Streams, you can skip this section and jump to the following section, “Calling the API”.
To create a consumer using enhanced fan-out, call #register_stream_consumer, available on Aws::Kinesis::Client, as follows:
client = Aws::Kinesis::Client.new
resp = client.register_stream_consumer(
  stream_arn: MY_KINESIS_STREAM_ARN,
  consumer_name: 'foo'
)
# save consumer_arn for later use
consumer_arn = resp.consumer.consumer_arn
Make sure the consumer is active before using it to subscribe to shard events:
status = client.describe_stream_consumer(
stream_arn: MY_KINESIS_STREAM_ARN,
consumer_name: 'foo',
).consumer_description.consumer_status
# => 'ACTIVE'
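A consumer is usually not active the moment it is registered, so a single status check may not be enough. The following is a minimal polling sketch under stated assumptions: the helper name wait_until_consumer_active and its max_attempts and delay defaults are hypothetical, and client is assumed to respond to #describe_stream_consumer the way Aws::Kinesis::Client does above.

```ruby
# Hypothetical helper: poll until the consumer reports 'ACTIVE'.
# `client` is assumed to behave like Aws::Kinesis::Client for
# #describe_stream_consumer; max_attempts and delay are illustrative defaults.
def wait_until_consumer_active(client, stream_arn:, consumer_name:,
                               max_attempts: 30, delay: 1)
  max_attempts.times do
    status = client.describe_stream_consumer(
      stream_arn: stream_arn,
      consumer_name: consumer_name
    ).consumer_description.consumer_status
    return true if status == 'ACTIVE'

    sleep(delay) # back off before asking again
  end
  false # gave up: the consumer never became active
end

# Usage (requires a real client and stream):
# wait_until_consumer_active(client,
#                            stream_arn: MY_KINESIS_STREAM_ARN,
#                            consumer_name: 'foo')
```

Returning false instead of raising leaves the decision about how to handle a slow registration to the caller.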
Then, pick a shard to subscribe to (you need its shard ID):
shard_resp = client.list_shards(
  stream_name: MY_STREAM_NAME
)
# say we want data living in the first shard of the stream
shard_id = shard_resp.shards.first.shard_id
If you already have fresh data in the stream shard, you can skip the rest of this section and jump to “Calling the API”. Otherwise, you can use the following code snippet to put some test data into the stream.
records = []
10.times do |i|
  records << {
    data: "test-data-#{i}",
    partition_key: "partition-key-#{i}"
  }
end
client.put_records(
  records: records,
  stream_name: MY_STREAM_NAME
)
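A PutRecords call can succeed as a whole while individual records are rejected, so it can be worth checking the response before expecting all ten test records in the shard. The sketch below assumes the response exposes failed_record_count and a records list whose entries carry an error_code, in the same order as the request. The helper name failed_entries is hypothetical.

```ruby
# Hypothetical helper: pair each rejected request entry with its error code.
# Assumes `response` looks like a PutRecords response: #failed_record_count
# plus #records, with results in the same order as the request entries.
def failed_entries(request_records, response)
  return [] if response.failed_record_count.to_i.zero?

  request_records.zip(response.records)
                 .select { |_entry, result| result.error_code }
                 .map { |entry, result| [entry, result.error_code] }
end

# Usage (requires a real client and stream):
# resp = client.put_records(records: records, stream_name: MY_STREAM_NAME)
# failed_entries(records, resp).each do |entry, code|
#   warn "record #{entry[:partition_key]} failed with #{code}"
# end
```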
Calling the API
Let’s bring all the pieces together and make the API call.
require 'aws-sdk-kinesis'
async_client = Aws::Kinesis::AsyncClient.new
output_stream = Aws::Kinesis::EventStreams::SubscribeToShardEventStream.new
output_stream.on_subscribe_to_shard_event_event do |event|
  puts event.inspect
end
# Note: parameter values in the example are placeholders
async_resp = async_client.subscribe_to_shard(
  consumer_arn: "arn:aws:kinesis:us-west-2:ACCOUNTID:stream/foo/consumer/bar:ID",
  shard_id: "shardId-SHARDID",
  starting_position: {
    type: 'AT_SEQUENCE_NUMBER',
    sequence_number: "SEQUENCE_NUMBER"
  },
  output_event_stream_handler: output_stream
)
# Calling #wait blocks until the HTTP/2 stream ends
# Alternatively, you can call #join! on the async response,
# which immediately ends the stream and forces a sync response
resp = async_resp.wait
puts resp
For full documentation of the request parameters, see the #subscribe_to_shard API reference. For more information about preparing those parameter values, see the Kinesis RegisterStreamConsumer documentation.
When you run the above code, you get output like the following.
#<struct Aws::Kinesis::Types::SubscribeToShardEvent records=[ ... ], continuation_sequence_number="...", millis_behind_latest=0, event_type=:subscribe_to_shard_event>
#<struct Aws::Kinesis::Types::SubscribeToShardEvent records=[ ... ], continuation_sequence_number="...", millis_behind_latest=0, event_type=:subscribe_to_shard_event>
...
#<struct Aws::Kinesis::Types::SubscribeToShardOutput ...>
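Each event in the output above carries a continuation_sequence_number, which can serve as the starting point for a later subscription. The following sketch shows one possible way to track it from a callback. The class name ShardCheckpoint is hypothetical, and the fallback to 'LATEST' when no event has been seen is an assumption made for this example, not a recommendation from the SDK.

```ruby
# Hypothetical helper: remembers how far a subscription has progressed.
class ShardCheckpoint
  attr_reader :sequence_number

  # Call from the event callback with each SubscribeToShardEvent.
  # Stores the continuation sequence number and returns the record payloads.
  def record(event)
    @sequence_number = event.continuation_sequence_number
    event.records.map(&:data)
  end

  # Builds a :starting_position hash for a later #subscribe_to_shard call.
  # Assumption: fall back to 'LATEST' if no event has been seen yet.
  def starting_position
    return { type: 'LATEST' } if sequence_number.nil?

    { type: 'AT_SEQUENCE_NUMBER', sequence_number: sequence_number }
  end
end

# Usage with the event stream from the example above:
# checkpoint = ShardCheckpoint.new
# output_stream.on_subscribe_to_shard_event_event do |event|
#   checkpoint.record(event).each { |data| puts data }
# end
```

Keeping the checkpoint in a separate object means the same callback logic works whether the handler is an event stream object, a Proc, or a block.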
Additional notes
In addition to providing an Aws::Kinesis::EventStreams::SubscribeToShardEventStream object with registered callbacks (the recommended usage pattern), you can also provide a Ruby Proc object that registers those callbacks, such as the following.
...
output_stream = Proc.new do |stream|
  stream.on_event do |event|
    puts "an event arrived!"
  end
  ... # other callbacks
end
# still need to provide :output_event_stream_handler
async_resp = async_client.subscribe_to_shard(..., output_event_stream_handler: output_stream)
...
You can also use a block when making the request to access the stream. In this pattern, you don’t have to provide :output_event_stream_handler.
async_resp = async_client.subscribe_to_shard(...) do |stream|
  stream.on_event do |event|
    puts "an event arrived!"
  end
end
A hybrid usage pattern is also supported.
output_stream = Aws::Kinesis::EventStreams::SubscribeToShardEventStream.new
output_stream.on_subscribe_to_shard_event_event do |event|
  puts event.inspect
end
async_resp = async_client.subscribe_to_shard(
  ..., output_event_stream_handler: output_stream) do |stream|
  stream.on_error_event do |error|
    # => Aws::Errors::EventError
    error.event_type # => :error
  end
end
...
One more note on the hybrid usage pattern: when extra callbacks are registered from the block, they are still appended to the handler passed as :output_event_stream_handler.
Final thoughts
We walked through a #subscribe_to_shard API usage example in Ruby to give you an introduction to AsyncClient and AsyncResponse. Feel free to try out the usage pattern to process events with callbacks, and let us know if you have any questions.
Feedback
Share your questions, comments, and issues with us on GitHub. You can also catch us in our Gitter channel.