AWS Developer Tools Blog

Event-driven architecture using Scala, Docker, Amazon Kinesis Firehose, and the AWS SDK for Java (Part 1)

The key to developing a highly scalable architecture is to decouple the functional parts of an application. In the context of an event-driven architecture, those functional parts are single-purpose event processing components (“microservices”). In this blog post, we will show you how to build a microservice using Scala, Akka, Scalatra, the AWS SDK for Java, and Docker. The application uses the AWS SDK for Java to write data into Amazon Kinesis Firehose, which can capture and automatically load streaming data into Amazon S3 and Amazon Redshift. In this example, Amazon S3 will be the target of the data we put into a Firehose delivery stream.

This two-part series will cover the following topics:

Part 1: How to use the AWS SDK for Java to get started with Scala development, how to set up Amazon Kinesis Firehose, and how to test your application locally.

Part 2: How to use Amazon EC2 Container Service (Amazon ECS) and Amazon EC2 Container Registry (Amazon ECR) to roll out your Dockerized Scala application.

After you have downloaded your IDE, set up your AWS account, created an IAM user, and installed the AWS CLI, you can check out the example application from https://github.com/awslabs/aws-akka-firehose.

Accessing Java classes from Scala is no problem, but Scala has language features (for example, function types and traits) that can’t be applied to Java directly. The core of this application is an Akka actor that writes JSON data into Amazon Kinesis Firehose. Akka implements the actor model, a model of concurrent programming in which actors receive messages and take actions based on those messages. With Akka, it is easy to build a distributed system using remote actors. In this example, the FireHoseActor receives a message from a REST interface written with Scalatra, a small and efficient, Sinatra-like web framework for Scala. Scalatra implements the servlet specification, so Scalatra apps can be deployed in Tomcat, Jetty, or other servlet engines, or in Java EE application servers. To reduce dependencies and complexity, the application uses an embedded Jetty servlet engine that is bundled with the application. To bundle Jetty with our application, we have to add Jetty as a dependency in build.scala:

"org.eclipse.jetty" % "jetty-webapp" % "9.2.10.v20150310" % "container;compile",

In this example, we use sbt and the sbt-assembly plugin, which was inspired by Maven’s assembly plugin, to build a fat JAR containing all dependencies. We have to add sbt-assembly as a dependency in project/assembly.sbt and specify the main class in build.scala:

.settings(mainClass in assembly := Some("JettyLauncher"))
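
The snippet above shows only the mainClass setting. The plugin itself is pulled in through project/assembly.sbt, which in a minimal form might contain nothing more than the following line (the version number is an example and may need to be adjusted):

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")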

In this case, the main class is called JettyLauncher. It is responsible for bootstrapping the embedded Jetty servlet engine.

import org.eclipse.jetty.server.{Server, ServerConnector}
import org.eclipse.jetty.servlet.DefaultServlet
import org.eclipse.jetty.webapp.WebAppContext
import org.scalatra.servlet.ScalatraListener

object JettyLauncher {
  def main(args: Array[String]): Unit = {
    // Use the port from the environment if set, otherwise default to 8080.
    val port = if (System.getenv("PORT") != null) System.getenv("PORT").toInt else 8080

    val server = new Server()
    val connector = new ServerConnector(server)
    connector.setHost("0.0.0.0")
    connector.setPort(port)
    server.addConnector(connector)

    // Wire the Scalatra application into the embedded Jetty server.
    val context = new WebAppContext()
    context.setContextPath("/")
    context.setResourceBase("src/main/webapp")
    context.addEventListener(new ScalatraListener)
    context.addServlet(classOf[DefaultServlet], "/")
    server.setHandler(context)

    server.start()
    server.join()
  }
}

The ScalatraBootstrap file initializes the actor system and mounts the FirehoseActorApp servlet under the context path /api/:

import akka.actor.{ActorSystem, Props}
import javax.servlet.ServletContext
import org.scalatra._

class ScalatraBootstrap extends LifeCycle {
  // Create the actor system and the FireHoseActor, then mount the REST servlet under /api/.
  val system = ActorSystem()
  val myActor = system.actorOf(Props[FireHoseActor])

  override def init(context: ServletContext) {
    context.mount(new FirehoseActorApp(system, myActor), "/api/*")
  }
}

The servlet exposes a REST API that accepts POST requests to /user with the parameters userId, userName, and timestamp. This API maps the passed values into a UserMessage object and sends this object as a message to the FireHoseActor.

class FirehoseActorApp(system: ActorSystem, firehoseActor: ActorRef) extends ScalatraServlet with JacksonJsonSupport {
  protected implicit lazy val jsonFormats: Formats = DefaultFormats
  implicit val timeout = new Timeout(2, TimeUnit.SECONDS)
  protected implicit def executor: ExecutionContext = system.dispatcher

  post("/user") {
    val userMessage = parsedBody.extract[UserMessage]
    firehoseActor ! userMessage
    Ok()
  }
}
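
The UserMessage type itself is not shown in these snippets. Based on the parameters described above, a minimal sketch could be a plain case class; the field names follow the API parameters, but the types are assumptions, and timestamp is made optional here so that requests without it (like the curl example later in this post) still deserialize:

// Hypothetical sketch of the message payload; field types are assumptions.
case class UserMessage(userId: Long, userName: String, timestamp: Option[Long] = None)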

The FireHoseActor uses the AWS SDK for Java to create an Amazon Kinesis Firehose client and send received messages asynchronously to a Firehose delivery stream:

def createFireHoseClient(): AmazonKinesisFirehoseAsyncClient = {
    log.debug("Connect to Firehose Stream: " + streamName)
    val client = new AmazonKinesisFirehoseAsyncClient
    // Use the region the application is running in; fall back to eu-west-1 when no region can be determined.
    val currentRegion = if (Regions.getCurrentRegion != null) Regions.getCurrentRegion else Region.getRegion(Regions.EU_WEST_1)
    client.withRegion(currentRegion)
    client
  }


def sendMessageToFirehose(payload: ByteBuffer, partitionKey: String): Unit = {
   // Wrap the JSON payload in a Firehose record and put it on the delivery stream.
   val putRecordRequest: PutRecordRequest = new PutRecordRequest
   putRecordRequest.setDeliveryStreamName(streamName)
   val record: Record = new Record
   record.setData(payload)
   putRecordRequest.setRecord(record)

   val futureResult: Future[PutRecordResult] = firehoseClient.putRecordAsync(putRecordRequest)

   try {
     // Block until the asynchronous put completes and log the result.
     val recordResult: PutRecordResult = futureResult.get
     log.debug("Sent message to Kinesis Firehose: " + recordResult.toString)
   } catch {
     case iexc: InterruptedException =>
       log.error(iexc.getMessage)
     case eexc: ExecutionException =>
       log.error(eexc.getMessage)
   }
 }
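
The snippets above do not show how the incoming UserMessage is turned into the ByteBuffer that sendMessageToFirehose expects. Inside the FireHoseActor, the receive block has to serialize the message first; the following is a minimal sketch of how that could look using json4s (the actual implementation in the repository may differ, and the choice of partition key here is only illustrative):

import java.nio.ByteBuffer
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.Serialization

override def receive: Receive = {
  case msg: UserMessage =>
    // Serialize the UserMessage to a JSON string and wrap it in a ByteBuffer.
    implicit val formats: Formats = DefaultFormats
    val json = Serialization.write(msg)
    // The userId is passed as the partitionKey argument that sendMessageToFirehose expects.
    sendMessageToFirehose(ByteBuffer.wrap(json.getBytes("UTF-8")), msg.userId.toString)
}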

Using sbt to build the application is easy: the command sbt assembly compiles the code and builds a fat JAR containing all required libraries.

Now we should focus on setting up the infrastructure used by the application. First, we create the S3 bucket:

aws s3 mb --region eu-west-1 s3://<your_name>-firehose-target --output json

Second, we create an IAM role that Amazon Kinesis Firehose can assume:

aws iam create-role --query "Role.Arn" --output json \
    --role-name FirehoseDefaultDeliveryRole \
    --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "PermitFirehoseAccess",
            "Effect": "Allow",
            "Principal": {
                "Service": "firehose.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}'

Now, we need to attach an IAM policy to this role to grant Firehose access to the S3 bucket:

aws iam put-role-policy \
    --role-name FirehoseDefaultDeliveryRole \
    --policy-name FirehoseDefaultDeliveryPolicy \
    --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "PermitFirehoseUsage",
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::<your_name>-firehose-target",
                "arn:aws:s3:::<your_name>-firehose-target/*"
            ]
        }
    ]
}'

The last step is to create the Firehose stream:

aws firehose create-delivery-stream --region eu-west-1 --query "DeliveryStreamARN" --output json \
    --delivery-stream-name firehose_stream \
    --s3-destination-configuration "RoleARN=<role_arn>,BucketARN=arn:aws:s3:::<your_name>-firehose-target"

To roll out the application in Amazon ECS, we need to build a Docker image containing the fat JAR and a JRE:

FROM phusion/baseimage

# Install Java.
RUN \
  echo oracle-java8-installer shared/accepted-oracle-license-v1-1 select true | debconf-set-selections && \
  add-apt-repository -y ppa:webupd8team/java && \
  apt-get update && \
  apt-get install -y oracle-java8-installer && \
  rm -rf /var/lib/apt/lists/* && \
  rm -rf /var/cache/oracle-jdk8-installer

WORKDIR /srv/jetty

# Define commonly used JAVA_HOME variable
ENV JAVA_HOME /usr/lib/jvm/java-8-oracle

ADD target/scala-2.11/akka-firehose-assembly-*.jar /srv/jetty/
CMD java -server \
   -XX:+DoEscapeAnalysis \
   -XX:+UseStringDeduplication -XX:+UseCompressedOops \
   -XX:+UseG1GC -jar /srv/jetty/akka-firehose-assembly-*.jar

This Dockerfile is based on phusion/baseimage and installs Oracle’s JDK 8. It also sets the JAVA_HOME variable, copies the fat JAR to /srv/jetty, and starts the application using java -jar. Building the Docker image is pretty straightforward:

docker build -t smoell/akka-firehose .

There are two options for testing the application: by using the JAR file directly or by using the Docker container. To start the application by using the JAR file:

java -jar target/scala-2.11/akka-firehose-assembly-0.1.0.jar

With the following curl command, we can post data to our application, which then sends it on to our Firehose stream:

curl -v -H "Content-type: application/json" -X POST -d '{"userId":100, "userName": "This is user data"}' http://127.0.0.1:8080/api/user

The test looks a little bit different when we use the Docker container. First, we have to start the container and pass the AWS access key and secret access key as environment variables. (This is not necessary if we run on an EC2 instance, because the AWS SDK for Java retrieves credentials from the instance metadata.):

docker run --dns=8.8.8.8 --env AWS_ACCESS_KEY_ID="<your_access_key>" --env AWS_SECRET_ACCESS_KEY="<your_secret_access_key>" -p 8080:8080 smoell/akka-firehose

The curl command looks a little bit different this time, because we have to replace 127.0.0.1 with the IP address Docker is using on our local machine:

curl -v -H "Content-type: application/json" -X POST -d '{"userId":100, "userName": "This is user data"}' http://<your_docker_ip>:8080/api/user

After sending data to our application, we can list the files in the S3 bucket we’ve created as a target for Amazon Kinesis Firehose:

aws s3 ls s3://<your_name>-firehose-target --recursive

In this blog post, we used the AWS SDK for Java to create a Scala application that writes data into Amazon Kinesis Firehose, Dockerized the application, and then tested and verified that it works. In the second part of this blog post, we will roll out our application in Amazon ECS by using Amazon ECR as our private Docker registry.