AWS Developer Tools Blog

Announcing Amazon Kinesis SubscribeToShard API Support in the AWS SDK for Ruby

Amazon Kinesis launched two significant performance-improving features for Amazon Kinesis Data Streams: enhanced fan-out and an HTTP/2 data retrieval API (“SubscribeToShard”). This API allows data to be delivered from producers to consumers in 70 milliseconds or better. Today, we’re excited to announce support for the Kinesis SubscribeToShard API in the AWS SDK for Ruby.

Before calling the #subscribe_to_shard API

The #subscribe_to_shard API is available in the aws-sdk-kinesis gem version 1.10.0 and later, for Ruby version 2.1 and later. This API streams shard events back over the HTTP/2 protocol, unlike regular API calls, which use HTTP/1.1. You need to have the http-2 gem available when using this API.


gem 'http-2', '~> 0.10'

This API is available on AsyncClient instead of Client. The call returns an AsyncResponse object right away, instead of blocking until a complete sync Response is available.

Introduction to AsyncClient

Due to the nature of the HTTP/2 protocol, the AWS SDK for Ruby introduced AsyncClient for streaming APIs over HTTP/2. This is different from the Client object, which you might already be familiar with, for APIs over HTTP/1.1.

You can create an AsyncClient for Kinesis as follows.


require 'aws-sdk-kinesis'

async_client = Aws::Kinesis::AsyncClient.new(region: 'us-west-2')

# list all available HTTP2/Async operations
async_client.operation_names
# => [:subscribe_to_shard]

In the next section, we walk through the async unidirectional streaming API usage pattern.
For general information about the SubscribeToShard API, see the Kinesis SubscribeToShard API documentation.

#subscribe_to_shard API usage pattern

In this section, we go through the key parts of making a unidirectional async API call.

Intro to :output_event_stream_handler

Waiting for an async response to complete can take minutes and misses all of the benefits of processing events as they stream in. We’ve introduced event stream objects that enable you to register callbacks for specific events, or for every event that arrives.

Taking Kinesis as an example, all available EventStream objects live under Aws::Kinesis::EventStreams. For the #subscribe_to_shard API, we have Aws::Kinesis::EventStreams::SubscribeToShardEventStream available.


output_stream = Aws::Kinesis::EventStreams::SubscribeToShardEventStream.new

# callback for :subscribe_to_shard_event events
output_stream.on_subscribe_to_shard_event_event do |event|
  # event => Aws::Kinesis::Types::SubscribeToShardEvent
end

# callback for :resource_in_use_exception events
output_stream.on_resource_in_use_exception_event do |exception|
  # exception => Aws::Kinesis::Types::ResourceInUseException
end

# Or if you want to see every event that arrives
output_stream.on_event do |event|
  puts event.event_type
  # => Symbol
end

# For a full list of available methods for registering callbacks, see
# Aws::Kinesis::EventStreams::SubscribeToShardEventStream

To use an event stream, pass it in the :output_event_stream_handler option when making the request. The following sections provide a complete example.
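Because callbacks run as events arrive, a common need is to hand record payloads to the rest of the program. The following is a minimal sketch of one way to do that, not part of the SDK: `enqueue_record_data` is a hypothetical helper name, and it assumes each event exposes `records` whose entries respond to `data`, as Aws::Kinesis::Types::SubscribeToShardEvent does.

```ruby
require 'thread' # Queue; already loaded by default on recent Rubies

# Hypothetical helper (not an SDK method): registers a callback that pushes
# each record's payload onto a thread-safe queue.
# `stream` is anything that responds to on_subscribe_to_shard_event_event,
# such as Aws::Kinesis::EventStreams::SubscribeToShardEventStream.
def enqueue_record_data(stream, queue)
  stream.on_subscribe_to_shard_event_event do |event|
    event.records.each { |record| queue << record.data }
  end
  stream
end

# Usage sketch (requires the aws-sdk-kinesis gem):
#   queue  = Queue.new
#   stream = Aws::Kinesis::EventStreams::SubscribeToShardEventStream.new
#   enqueue_record_data(stream, queue)
#   # ...pass `stream` as :output_event_stream_handler, then read with queue.pop
```

A queue keeps the callback short, so slow processing happens on a thread of your choosing rather than inside the handler.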

Prepare the consumer

Before calling the API, you need a consumer that uses enhanced fan-out. If you already have such a consumer registered on your Kinesis data stream, you can skip this section and jump to the following section, “Calling the API”.

To create a consumer that uses enhanced fan-out, call #register_stream_consumer, available on Aws::Kinesis::Client:


client = Aws::Kinesis::Client.new
resp = client.register_stream_consumer(
  stream_arn: MY_KINESIS_STREAM_ARN,
  consumer_name: 'foo'
)
# save the consumer ARN for later use
consumer_arn = resp.consumer.consumer_arn
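If this setup code may run more than once, registering the same consumer name again is not useful. Here is a hedged sketch that looks for an existing consumer first; `find_or_register_consumer` is a hypothetical helper name, and the sketch assumes the first page of #list_stream_consumers contains all of the stream's consumers (pagination is not handled).

```ruby
# Hypothetical helper (not an SDK method): returns the ARN of the named
# consumer, registering it only when it is not already present.
# `client` is expected to behave like Aws::Kinesis::Client.
# Assumption: all consumers fit in the first page of list_stream_consumers.
def find_or_register_consumer(client, stream_arn:, consumer_name:)
  existing = client.list_stream_consumers(stream_arn: stream_arn)
                   .consumers
                   .find { |c| c.consumer_name == consumer_name }
  return existing.consumer_arn if existing

  client.register_stream_consumer(
    stream_arn: stream_arn,
    consumer_name: consumer_name
  ).consumer.consumer_arn
end

# Usage sketch:
#   consumer_arn = find_or_register_consumer(
#     client, stream_arn: MY_KINESIS_STREAM_ARN, consumer_name: 'foo'
#   )
```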

Make sure to wait until the consumer is active before using it to subscribe to shard events:


status = client.describe_stream_consumer(
  stream_arn: MY_KINESIS_STREAM_ARN,
  consumer_name: 'foo',
).consumer_description.consumer_status
# => 'ACTIVE'
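A single status check may still return 'CREATING' right after registration. A minimal polling sketch follows; `wait_for_active_consumer` is a hypothetical helper name, and the attempt count and delay are arbitrary defaults rather than recommended values.

```ruby
# Hypothetical helper (not an SDK method): polls describe_stream_consumer
# until the consumer reports 'ACTIVE'. Returns true on success, or false if
# the consumer is still not active after max_attempts checks.
# `client` is expected to behave like Aws::Kinesis::Client.
def wait_for_active_consumer(client, stream_arn:, consumer_name:,
                             max_attempts: 30, delay: 1)
  max_attempts.times do
    status = client.describe_stream_consumer(
      stream_arn: stream_arn,
      consumer_name: consumer_name
    ).consumer_description.consumer_status
    return true if status == 'ACTIVE'

    sleep(delay) # back off before checking again
  end
  false
end

# Usage sketch:
#   ready = wait_for_active_consumer(
#     client, stream_arn: MY_KINESIS_STREAM_ARN, consumer_name: 'foo'
#   )
#   raise 'consumer never became ACTIVE' unless ready
```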

Then, pick the shard to subscribe to (you need its shard ID):


shard_resp = client.list_shards(
  stream_name: MY_STREAM_NAME
)
# say we want data living in the first shard of the stream
shard_id = shard_resp.shards.first.shard_id
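For streams with many shards, #list_shards can return its results in pages. The sketch below collects every shard ID by following `next_token`; `all_shard_ids` is a hypothetical helper name, and the sketch relies on the ListShards rule that follow-up requests carry the token without the stream name.

```ruby
# Hypothetical helper (not an SDK method): returns the IDs of all shards in
# a stream, following next_token across pages.
# `client` is expected to behave like Aws::Kinesis::Client.
def all_shard_ids(client, stream_name)
  shard_ids = []
  params = { stream_name: stream_name }
  loop do
    resp = client.list_shards(params)
    shard_ids.concat(resp.shards.map(&:shard_id))
    break if resp.next_token.nil?

    # Follow-up pages are requested by token only; ListShards does not
    # accept stream_name together with next_token.
    params = { next_token: resp.next_token }
  end
  shard_ids
end

# Usage sketch:
#   shard_id = all_shard_ids(client, MY_STREAM_NAME).first
```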

If you already have fresh data in the shard, you can skip the rest of this section and jump to “Calling the API”. Otherwise, you can use the following code snippet to put some test data into the stream.


records = []
10.times do |i|
  records << {
    data: "test-data-#{i}",
    partition_key: "partition-key-#{i}"
  }
end
client.put_records(
  records: records,
  stream_name: MY_STREAM_NAME
)
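Note that #put_records can succeed as a call while rejecting individual records, which it reports through `failed_record_count` and a per-record `error_code`. The following sketch resends only the rejected records; `put_records_with_retry` is a hypothetical helper name, the attempt limit is arbitrary, and a production version would likely add a backoff between attempts.

```ruby
# Hypothetical helper (not an SDK method): sends records with put_records and
# resends only the entries that were rejected. Returns the records that still
# failed after max_attempts tries (an empty array means all were accepted).
# `client` is expected to behave like Aws::Kinesis::Client.
def put_records_with_retry(client, stream_name:, records:, max_attempts: 3)
  pending = records
  max_attempts.times do
    break if pending.empty?

    resp = client.put_records(stream_name: stream_name, records: pending)
    return [] if resp.failed_record_count.to_i.zero?

    # Result entries come back in request order; keep only the failures.
    pending = pending.zip(resp.records)
                     .select { |_record, result| result.error_code }
                     .map(&:first)
  end
  pending
end

# Usage sketch:
#   leftover = put_records_with_retry(
#     client, stream_name: MY_STREAM_NAME, records: records
#   )
#   warn "#{leftover.size} records were not accepted" unless leftover.empty?
```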

Calling the API

Let’s bring all the pieces together and make the API call.


require 'aws-sdk-kinesis'

async_client = Aws::Kinesis::AsyncClient.new
output_stream = Aws::Kinesis::EventStreams::SubscribeToShardEventStream.new
output_stream.on_subscribe_to_shard_event_event do |event|
  puts event.inspect
end

# Note: parameter values in the example are placeholders
async_resp = async_client.subscribe_to_shard(
  consumer_arn: "arn:aws:kinesis:us-west-2:ACCOUNTID:stream/foo/consumer/bar:ID",
  shard_id: "shardId-SHARDID",
  starting_position: {
    type: 'AT_SEQUENCE_NUMBER',
    sequence_number: "SEQUENCE_NUMBER"
  },
  output_event_stream_handler: output_stream
)

# Calling #wait blocks until the HTTP/2 stream ends.
# Alternatively, you can call #join! on the async response,
# which immediately ends the stream and forces a sync response
resp = async_resp.wait
puts resp

For full documentation of the request parameters, see the #subscribe_to_shard API reference. For more information about preparing those parameter values, see the Kinesis RegisterStreamConsumer documentation.

When you run the above code, you get output like the following.


#<struct Aws::Kinesis::Types::SubscribeToShardEvent records=[ ... ], continuation_sequence_number="...", millis_behind_latest=0, event_type=:subscribe_to_shard_event>
#<struct Aws::Kinesis::Types::SubscribeToShardEvent records=[ ... ], continuation_sequence_number="...", millis_behind_latest=0, event_type=:subscribe_to_shard_event>
...
#<struct Aws::Kinesis::Types::SubscribeToShardOutput ...>
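Each event in the output above carries a `continuation_sequence_number`. Because a SubscribeToShard connection stays open for at most five minutes, a long-running consumer needs to subscribe again and resume from that value. The sketch below builds the `:starting_position` hash for the next call. `next_starting_position` is a hypothetical helper name. The Kinesis API reference allows either AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER with a continuation sequence number; using AFTER_SEQUENCE_NUMBER as the default here is an assumption, so check which one fits your duplicate-handling needs.

```ruby
# Hypothetical helper (not an SDK method): builds the :starting_position
# argument for the next subscribe_to_shard call.
# - With no sequence number seen yet, start from initial_type.
# - Otherwise, resume relative to the last continuation_sequence_number.
def next_starting_position(last_sequence_number,
                           initial_type: 'LATEST',
                           resume_type: 'AFTER_SEQUENCE_NUMBER')
  return { type: initial_type } if last_sequence_number.nil?

  { type: resume_type, sequence_number: last_sequence_number }
end

# Usage sketch (requires the aws-sdk-kinesis gem):
#   last_seq = nil
#   output_stream.on_subscribe_to_shard_event_event do |event|
#     last_seq = event.continuation_sequence_number
#   end
#   # after async_resp.wait returns, subscribe again with:
#   #   starting_position: next_starting_position(last_seq)
```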

Additional notes

In addition to providing an Aws::Kinesis::EventStreams::SubscribeToShardEventStream object with registered callbacks (the recommended usage pattern), you can also provide a Ruby Proc object that registers those callbacks, such as the following.


...
output_stream = Proc.new do |stream|
  stream.on_event do |event|
    puts "an event arrived!"
  end
  ... # other callbacks
end
# still need to provide :output_event_stream_handler
async_resp = async_client.subscribe_to_shard(..., output_event_stream_handler: output_stream)
...

You can also use a block when making the request to access the stream. In this pattern, you don’t have to provide :output_event_stream_handler.


async_resp = async_client.subscribe_to_shard(...) do |stream|
  stream.on_event do |event|
    puts "an event arrived!"
  end
end

A hybrid usage pattern is also supported.


output_stream = Aws::Kinesis::EventStreams::SubscribeToShardEventStream.new
output_stream.on_subscribe_to_shard_event_event do |event|
  puts event.inspect
end

async_resp = async_client.subscribe_to_shard(
  ..., output_event_stream_handler: output_stream) do |stream|
  stream.on_error_event do |error|
    # => Aws::Errors::EventError
    error.event_type # => :error
  end
end
...

One more note on the hybrid usage pattern: callbacks registered from the block are appended to the handler passed in :output_event_stream_handler.

Final thoughts

We walked through a #subscribe_to_shard API usage example in Ruby to give you an introduction to AsyncClient and AsyncResponse. Feel free to try out the usage pattern to process events with callbacks, and let us know if you have any questions.

Feedback

Share your questions, comments, and issues with us on GitHub. You can also catch us in our Gitter channel.