Using a Thread Pool with the AWS SDK for C++

by Jonathan Henson | in C++

The default thread executor implementation we provide for asynchronous operations spins up a thread for each call and then detaches it. On modern operating systems, this is often exactly what we want. However, there are other use cases for which this simply will not work. For example, suppose we want to fire off asynchronous calls to Amazon Kinesis as quickly as we receive events, and that we sometimes receive these events at a rate of 10 per millisecond. Even if we are calling Amazon Kinesis from an Amazon Elastic Compute Cloud (EC2) instance in the same data center as our Amazon Kinesis stream, per-call latency will cause the number of threads on our system to grow without bound and eventually exhaust the operating system's limits.

Here is an example of what this code might look like:


#include <aws/kinesis/model/PutRecordsRequest.h>
#include <aws/kinesis/KinesisClient.h>
#include <aws/core/utils/Outcome.h>
#include <aws/core/utils/memory/AWSMemory.h>
#include <aws/core/utils/Array.h>
#include <iostream>

using namespace Aws::Client;
using namespace Aws::Utils;
using namespace Aws::Kinesis;
using namespace Aws::Kinesis::Model;

class KinesisProducer
{
public:
    KinesisProducer(const Aws::String& streamName, const Aws::String& partition) : m_partition(partition), m_streamName(streamName)
    {
        ClientConfiguration clientConfiguration;
        m_client = Aws::New<KinesisClient>("kinesis-sample", clientConfiguration);
    }

    ~KinesisProducer()
    {
        Aws::Delete(m_client);
    }

    void StreamData(const Aws::Vector<ByteBuffer>& data)
    {
        PutRecordsRequest putRecordsRequest;
        putRecordsRequest.SetStreamName(m_streamName);

        for(auto& datum : data)
        {
            PutRecordsRequestEntry putRecordsRequestEntry;
            putRecordsRequestEntry.WithData(datum)
                    .WithPartitionKey(m_partition);

            putRecordsRequest.AddRecords(putRecordsRequestEntry);
        }

        // Fire off the request asynchronously; the completion callback below
        // runs on the client's executor rather than blocking this thread.
        m_client->PutRecordsAsync(putRecordsRequest,
               std::bind(&KinesisProducer::OnPutRecordsAsyncOutcomeReceived, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
    }

private:
    void OnPutRecordsAsyncOutcomeReceived(const KinesisClient*, const Model::PutRecordsRequest&,
                                          const Model::PutRecordsOutcome& outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext>&)
    {
        if(outcome.IsSuccess())
        {
            std::cout << "Records Put Successfully " << std::endl;
        }
        else
        {
            std::cout << "Put Records Failed with error " << outcome.GetError().GetMessage() << std::endl;
        }
    }

    KinesisClient* m_client;
    Aws::String m_partition;
    Aws::String m_streamName;
};


int main()
{
    KinesisProducer producer("kinesis-sample", "announcements");

    // Submit events as fast as we can produce them. With the default
    // detaching executor, each call spawns a new thread.
    while(true)
    {
        Aws::String event1("Event #1");
        Aws::String event2("Event #2");

        producer.StreamData( {
                                     ByteBuffer((unsigned char*)event1.c_str(), event1.length()),
                                     ByteBuffer((unsigned char*)event2.c_str(), event2.length())
                             });
    }

    return 0;
}


This example is intended to show how exhausting the available threads from the operating system will ultimately result in a program crash. Most systems with this problem would be bursty and would not create such a sustained load. Still, we need a better way to handle our threads for such a scenario.

This week, we released a thread pool executor implementation. Simply include the aws/core/utils/threading/Executor.h file. The class name is PooledThreadExecutor. You can set two options: the number of threads for the pool to use and the overflow policy.

Currently, there are two overflow policy modes (both are shown in the sketch below):

QUEUE_TASKS_EVENLY_ACROSS_THREADS allows you to push as many tasks as you want to the executor; tasks are queued and pulled off by each thread as quickly as possible. For most cases, QUEUE_TASKS_EVENLY_ACROSS_THREADS is the preferred option.

REJECT_IMMEDIATELY will reject the task submission if the queued task length ever exceeds the size of the thread pool.
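As a rough sketch, here is how you might construct each variant. The pool size of 10 and the allocation tag are arbitrary, and the exact spelling of the overflow policy enum should be verified against Executor.h in your SDK version:

#include <aws/core/utils/threading/Executor.h>
#include <aws/core/utils/memory/AWSMemory.h>

using namespace Aws::Utils::Threading;

// Default policy: tasks beyond the pool size are queued and drained
// as threads become available.
auto queueingExecutor = Aws::MakeShared<PooledThreadExecutor>("executor-sample", 10);

// REJECT_IMMEDIATELY: submission fails once the backlog exceeds the
// pool size, letting the caller shed load instead of queueing it.
auto rejectingExecutor = Aws::MakeShared<PooledThreadExecutor>("executor-sample", 10,
        OverflowPolicy::REJECT_IMMEDIATELY);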

Let’s revise our example to use a thread pool:


#include <aws/kinesis/model/PutRecordsRequest.h>
#include <aws/kinesis/KinesisClient.h>
#include <aws/core/utils/Outcome.h>
#include <aws/core/utils/memory/AWSMemory.h>
#include <aws/core/utils/threading/Executor.h>

using namespace Aws::Client;
using namespace Aws::Utils::Threading;
using namespace Aws::Kinesis;
using namespace Aws::Kinesis::Model;

class KinesisProducer
{
public:
    KinesisProducer(const Aws::String& streamName, const Aws::String& partition) : m_partition(partition), m_streamName(streamName)
    {
        ClientConfiguration clientConfiguration;
        // Run all asynchronous calls on a pool of 10 threads instead of
        // spawning a detached thread per call.
        clientConfiguration.executor = Aws::MakeShared<PooledThreadExecutor>("kinesis-sample", 10);
        m_client = Aws::New<KinesisClient>("kinesis-sample", clientConfiguration);
    }

    ....

The only change we need to make to add the thread pool to our configuration is to assign an instance of the new executor implementation to our ClientConfiguration object.
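Because ClientConfiguration stores the executor as a shared pointer, nothing stops you from sharing one pool across several clients. A minimal sketch, assuming the same headers as above (the second configuration is purely illustrative):

auto sharedExecutor = Aws::MakeShared<PooledThreadExecutor>("app-executor", 10);

// Both clients draw from the same 10-thread pool, so the process-wide
// thread count stays bounded no matter how many clients we create.
ClientConfiguration kinesisConfiguration;
kinesisConfiguration.executor = sharedExecutor;
KinesisClient kinesisClient(kinesisConfiguration);

ClientConfiguration otherConfiguration;
otherConfiguration.executor = sharedExecutor;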

As always, we welcome your feedback, and even pull requests, about how we can improve this feature.

Using CMake Exports with the AWS SDK for C++

by Jonathan Henson | in C++

This is our very first C++ blog post for the AWS Developer blog. There will be more to come. We are excited to receive and share feedback with the C++ community. This first post will start where most projects start, with the building of a simple program.

Building an application in C++ can be a daunting task—especially when dependencies are involved. Even after you have figured out what you want to do and which libraries you need to use, you encounter seemingly endless, painful tasks to compile, link, and distribute your application.

AWS SDK for C++ users most frequently report the difficulty of compiling and linking against the SDK. This involves building the SDK, installing the header files and libraries somewhere, updating the build system of the application with the include and linker paths, and passing definitions to the compiler. This is an error-prone, and now unnecessary, process. CMake has built-in functionality that will handle this scenario, and we have now updated the CMake build scripts to handle this complexity for you.

The example we will use in this post assumes you are familiar with Amazon Simple Storage Service (Amazon S3) and know how to download and build the SDK. For more information, see our readme on GitHub. Suppose we want to write a simple program to upload and retrieve objects from Amazon S3. The code would look something like this:


#include <aws/s3/S3Client.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/core/utils/memory/stl/AwsStringStream.h>
#include <iostream>

using namespace Aws::S3;
using namespace Aws::S3::Model;

static const char* KEY = "s3_cpp_sample_key";
static const char* BUCKET = "s3-cpp-sample-bucket";

int main()
{
    S3Client client;
    
    //first put an object into s3
    PutObjectRequest putObjectRequest;
    putObjectRequest.WithKey(KEY)
           .WithBucket(BUCKET);

    //this can be any arbitrary stream (e.g. fstream, stringstream etc...)
    auto requestStream = Aws::MakeShared<Aws::StringStream>("s3-sample");
    *requestStream << "Hello World!";
    
    //set the stream that will be put to s3
    putObjectRequest.SetBody(requestStream);

    auto putObjectOutcome = client.PutObject(putObjectRequest);

    if(putObjectOutcome.IsSuccess())
    {
        std::cout << "Put object succeeded" << std::endl;
    }
    else
    {
        std::cout << "Error while putting Object " << putObjectOutcome.GetError().GetExceptionName() << 
               " " << putObjectOutcome.GetError().GetMessage() << std::endl;
    }

    //now get the object back out of s3. The response stream can be overridden here if you want it to go directly to 
    // a file. In this case the default string buf is exactly what we want.
    GetObjectRequest getObjectRequest;
    getObjectRequest.WithBucket(BUCKET)
        .WithKey(KEY);

    auto getObjectOutcome = client.GetObject(getObjectRequest);

    if(getObjectOutcome.IsSuccess())
    {
        std::cout << "Successfully retrieved object from s3 with value: " << std::endl;
        std::cout << getObjectOutcome.GetResult().GetBody().rdbuf() << std::endl << std::endl;
    }
    else
    {
        std::cout << "Error while getting object " << getObjectOutcome.GetError().GetExceptionName() <<
             " " << getObjectOutcome.GetError().GetMessage() << std::endl;
    }

    return 0;  
}

Here, we have a direct dependency on aws-cpp-sdk-s3 and an indirect dependency on aws-cpp-sdk-core. Furthermore, several platform-specific dependencies are required to make this work: on Windows, WinHttp and BCrypt; on Linux, curl and OpenSSL; on OS X, curl and CommonCrypto. Other platforms, such as mobile, have their own dependencies. Traditionally, you would need to update your build system to detect each of these platforms and inject the right properties for each target.

However, the build process for the SDK already has access to this information from its configuration step, so why should you have to worry about this mess? Enter CMake's export() functionality. What would a CMakeLists.txt to build this program look like? This one file generates our build artifacts for each platform we need to support: Visual Studio, Xcode, AutoMake, and so on.


cmake_minimum_required(VERSION 2.8)
project(s3-sample)

#this will locate the aws sdk for c++ package so that we can use its targets
find_package(aws-sdk-cpp)

add_executable(s3-sample main.cpp)

#since we called find_package(), this will resolve all dependencies, header files, and cflags necessary
#to build and link your executable. 
target_link_libraries(s3-sample aws-cpp-sdk-s3)

That's all you need to build your program. When we run this script for Visual Studio, CMake will determine that aws-cpp-sdk-s3 has dependencies on aws-cpp-sdk-core, WinHttp, and BCrypt. Also, the CMake configuration for the aws-sdk-cpp package knows whether the SDK was built using custom memory management, and will make sure the -DAWS_CUSTOM_MEMORY_MANAGEMENT flag is passed to your compiler if needed. The resulting Visual Studio projects will already have the include and linker arguments set and will contain the compile definitions that need to be passed to your compiler. On GCC and Clang, we will also pass you the -std=c++11 flag.

To configure your project, simply run the following:


cmake -Daws-sdk-cpp_DIR=<path to your SDK build> <path to your source>

You can pass additional CMake arguments, such as -G "Visual Studio 12 2013 Win64", too.

Now you are ready to build with msbuild, make, or whatever other build system you are using.
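For example, with a Makefile-based generator, the end-to-end flow might look like the following sketch (the paths are placeholders for your own build and source directories):

# configure, pointing CMake at the directory where the SDK was built
cmake -Daws-sdk-cpp_DIR=/path/to/aws-sdk-cpp-build /path/to/s3-sample

# build the generated project
make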

Obviously, not everyone uses or even wants to use CMake. The generated aws-sdk-cpp-config.cmake file contains all of the information required to update your build script to use the SDK, as in the sketch below.
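On Linux, for instance, a manual invocation might look roughly like the following; the install paths and the exact library list here are assumptions, so check aws-sdk-cpp-config.cmake for the values from your build:

g++ -std=c++11 main.cpp \
    -I/path/to/sdk/include -L/path/to/sdk/lib \
    -laws-cpp-sdk-s3 -laws-cpp-sdk-core \
    -lcurl -lcrypto -o s3-sample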

We'd like to extend a special thanks to our GitHub users for requesting this feature, especially Rico Huijbers, who shared a blog post on the topic that inspired this work.

We are excited to be offering better support to the C++ community. We invite you to try this feature and leave feedback here or on GitHub.