Event-driven architecture using Scala, Docker, Amazon Kinesis Firehose, and the AWS SDK for Java (Part 2)

by Sascha Moellering

In the first part of this blog post, we used the AWS SDK for Java to create a Scala application that writes data to Amazon Kinesis Firehose, Dockerized the application, and then tested and verified that it works. Now we will roll out our Scala application on Amazon EC2 Container Service (Amazon ECS) and use Amazon EC2 Container Registry (Amazon ECR) as our private Docker registry.
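
As a quick refresher, the heart of that application is a PutRecord call to Amazon Kinesis Firehose through the AWS SDK for Java. The following Scala snippet is a minimal sketch of that idea, not the actual code from part 1; the object and method names, the hard-coded region, and the stream name placeholder are illustrative assumptions.

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClientBuilder
import com.amazonaws.services.kinesisfirehose.model.{PutRecordRequest, Record}

object FirehoseWriter {
  // Placeholder: use the delivery stream created in part 1.
  private val deliveryStreamName = "<delivery-stream-name>"

  private val firehoseClient =
    AmazonKinesisFirehoseClientBuilder.standard().withRegion("us-east-1").build()

  // Serializes a simple user event as JSON and writes it to the delivery stream.
  def putUserRecord(userId: Long, userName: String): Unit = {
    val json = s"""{"userId":$userId,"userName":"$userName"}""" + "\n"
    val record = new Record().withData(ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)))

    val request = new PutRecordRequest()
      .withDeliveryStreamName(deliveryStreamName)
      .withRecord(record)

    firehoseClient.putRecord(request)
  }
}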

To roll out our application on Amazon ECS, we have to set up a private Docker registry and an Amazon ECS cluster. First, we have to create IAM roles for Amazon ECS. Before we can launch container instances and register them into a cluster, we must create an IAM role for those container instances to use when they are launched. This requirement applies to container instances launched with the Amazon Machine Image (AMI) optimized for Amazon ECS or with any other AMI on which you run the Amazon ECS container agent.

aws iam create-role --role-name ecsInstanceRole --assume-role-policy-document file://<path_to_json_file>/ecsInstanceRole.json

ecsInstanceRole.json:

{
  "Version": "2008-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "ec2.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

 

aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role --role-name ecsInstanceRole

We have to attach an additional policy so the ecsInstanceRole can pull Docker images from Amazon ECR:

aws iam put-role-policy --role-name ecsInstanceRole --policy-name ecrPullPolicy --policy-document file://<path_to_json_file>/ecrPullPolicy.json

ecrPullPolicy.json:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ecr:BatchGetImage",
                "ecr:GetDownloadUrlForLayer",
                "ecr:GetAuthorizationToken"
            ],
            "Resource": "*"
        }
    ]
}

This role needs permission to write into our Amazon Kinesis Firehose stream, too:

aws iam put-role-policy --role-name ecsInstanceRole --policy-name firehosePolicy --policy-document file://<path_to_json_file>/firehosePolicy.json

firehosePolicy.json:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "firehose:DescribeDeliveryStream",
                "firehose:ListDeliveryStreams",
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:aws-region:<account-ID>:deliverystream/<delivery-stream-name>"
            ]
        }
    ]
}

The Amazon ECS service scheduler makes calls on our behalf to the Amazon EC2 and Elastic Load Balancing APIs to register and deregister container instances with our load balancers. Before we can attach a load balancer to an Amazon ECS service, we must create an IAM role for our services to use. This requirement applies to any Amazon ECS service that we plan to use with a load balancer.

aws iam create-role --role-name ecsServiceRole --assume-role-policy-document file://<path_to_json_file>/ecsServiceRole.json

ecsServiceRole.json:

{
  "Version": "2008-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "ecs.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

 

aws iam put-role-policy --role-name ecsServiceRole --policy-name ecsServicePolicy --policy-document file://<path_to_json_file>/ecsServicePolicy.json

ecsServicePolicy.json:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "elasticloadbalancing:Describe*",
        "elasticloadbalancing:DeregisterInstancesFromLoadBalancer",
        "elasticloadbalancing:RegisterInstancesWithLoadBalancer",
        "ec2:Describe*",
        "ec2:AuthorizeSecurityGroupIngress"
      ],
      "Resource": [
        "*"
      ]
    }
  ]
}

Now we have set up the IAM roles and permissions required for a fully functional Amazon ECS cluster. Before setting up the cluster, we will create an ELB load balancer to be used for our akka-firehose service. The load balancer is called “akkaElb.” It maps port 80 to port 80 and uses the specified subnets and security groups.

aws elb create-load-balancer --load-balancer-name akkaElb --listeners "Protocol=HTTP,LoadBalancerPort=80,InstanceProtocol=HTTP,InstancePort=80" --subnets subnet-a,subnet-b --security-groups sg-a --region us-east-1

The health check configuration of the load balancer contains information such as the protocol, ping port, ping path, response timeout, and health check interval.

aws elb configure-health-check --load-balancer-name akkaElb --health-check Target=HTTP:80/api/healthcheck,Interval=30,UnhealthyThreshold=5,HealthyThreshold=2,Timeout=3 --region us-east-1

We should enable connection draining for our load balancer to ensure it will stop sending requests to deregistering or unhealthy instances, while keeping existing connections open.

aws elb modify-load-balancer-attributes --load-balancer-name akkaElb --load-balancer-attributes '{"ConnectionDraining":{"Enabled":true,"Timeout":300}}' --region us-east-1

After setting up the load balancer, we can now create the Amazon ECS cluster:

aws ecs create-cluster --cluster-name "AkkaCluster" --region us-east-1

To set up our Amazon ECR repository correctly, we will sign in to Amazon ECR. The following command returns a docker login command that, when executed, stores an authorization token in /home/ec2-user/.docker/config.json. The token is valid for 12 hours.

aws ecr get-login --region us-east-1

To store the Docker image, we will create a repository in Amazon ECR:

aws ecr create-repository --repository-name akka-firehose --region us-east-1

Under most circumstances, the Docker image will be created at the end of a build process triggered by a continuous integration server like Jenkins. Therefore, we have to create a repository policy so that the Jenkins IAM role can push and pull Docker images from our newly created repository:

aws ecr set-repository-policy --repository-name akka-firehose --region us-east-1 --policy-text '{
    "Version": "2008-10-17",
    "Statement": [
        {
            "Sid": "jenkins_push_pull",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<account-id>:role/<Jenkins-role>"
            },
            "Action": [
                "ecr:DescribeRepositories",
                "ecr:GetRepositoryPolicy",
                "ecr:BatchCheckLayerAvailability",
                "ecr:GetDownloadUrlForLayer",
                "ecr:ListImages",
                "ecr:BatchGetImage",
                "ecr:PutImage",
                "ecr:InitiateLayerUpload",
                "ecr:UploadLayerPart",
                "ecr:CompleteLayerUpload"
            ]
        }
    ]
}'

We have to add a similar repository policy for Amazon ECS because the Amazon EC2 instances in our Amazon ECS cluster have to be able to pull Docker images from our private Docker registry:

aws ecr set-repository-policy --repository-name akka-firehose --region us-east-1 --policy-text '{
    "Version": "2008-10-17",
    "Statement": [
        {
            "Sid": "ecs_instance_pull",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<account-id>:role/ecsInstanceRole"
            },
            "Action": [
                "ecr:DescribeRepositories",
                "ecr:GetRepositoryPolicy",
                "ecr:BatchCheckLayerAvailability",
                "ecr:GetDownloadUrlForLayer",
                "ecr:ListImages",
                "ecr:BatchGetImage"
            ]
        }
    ]
}'

Now we can tag and push the Docker container into our repository in Amazon ECR:

docker tag akka-firehose <account-id>.dkr.ecr.us-east-1.amazonaws.com/akka-firehose


docker push <account-id>.dkr.ecr.us-east-1.amazonaws.com/akka-firehose

To populate our Amazon ECS cluster, we have to launch a few Amazon EC2 instances and register them in our cluster. It is important to choose either the AMI optimized for Amazon ECS or an AMI for another operating system (such as CoreOS or Ubuntu) with the Amazon ECS container agent installed. (In this example, we will use the ECS-optimized AMI of Amazon Linux.) In the first step, we will create an instance profile and then add the ecsInstanceRole to this profile:

aws iam create-instance-profile --instance-profile-name ecsServer


aws iam add-role-to-instance-profile --role-name ecsInstanceRole --instance-profile-name ecsServer

Now we will use the following user data script to launch a few EC2 instances in different subnets:

ecs-userdata.txt:

#!/bin/bash
yum update -y
echo ECS_CLUSTER=AkkaCluster >> /etc/ecs/ecs.config

This user data script updates the Linux packages of the Amazon EC2 instance and registers it in the Amazon ECS cluster. By default, the container instance is launched into your default cluster if you don’t specify another one.

aws ec2 run-instances --image-id ami-840e42ee --count 1 --instance-type t2.medium --key-name <your_ssh_key> --security-group-ids sg-a --subnet-id subnet-a --iam-instance-profile Name=ecsServer --user-data file://<path_to_user_data_file>/ecs-userdata.txt --region us-east-1


aws ec2 run-instances --image-id ami-840e42ee --count 1 --instance-type t2.medium --key-name <your_ssh_key> --security-group-ids sg-a --subnet-id subnet-b --iam-instance-profile Name=ecsServer --user-data file://<path_to_user_data_file>/ecs-userdata.txt --region us-east-1

Now we will register our task definition and service:

aws ecs register-task-definition --cli-input-json file://<path_to_json_file>/akka-firehose.json --region us-east-1

akka-firehose.json:

{
  "containerDefinitions": [
    {
      "name": "akka-firehose",
      "image": "<your_account_id>.dkr.ecr.us-east-1.amazonaws.com/akka-firehose",
      "cpu": 1024,
      "memory": 1024,
      "portMappings": [{
                      "containerPort": 8080,
                      "hostPort": 80
              }],
      "essential": true
    }
  ],
  "family": "akka-demo"
}

The task definition specifies which image you want to use, how many resources (CPU and RAM) are required, and the port mappings between the Docker container and the host.

aws ecs create-service --cluster AkkaCluster --service-name akka-firehose-service --cli-input-json file://<path_to_json_file>/akka-elb.json --region us-east-1

akka-elb.json:

{
    "serviceName": "akka-firehose-service",
    "taskDefinition": "akka-demo:1",
    "loadBalancers": [
        {
            "loadBalancerName": "akkaElb",
            "containerName": "akka-firehose",
            "containerPort": 8080
        }
    ],
    "desiredCount": 2,
    "role": "ecsServiceRole"
}

Our service uses version 1 of the task definition and connects the containers on port 8080 to our previously defined ELB load balancer. The configuration sets the desired count to two, so if we have registered two Amazon EC2 instances in our Amazon ECS cluster, each of them should run one task. After a short amount of time, the service should be running successfully on the cluster. We can test the current setup by sending a POST request to our ELB load balancer:

curl -v -H "Content-type: application/json" -X POST -d '{"userId":100, "userName": "This is user data"}' http://<address_of_elb>.us-east-1.elb.amazonaws.com/api/user
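
The JSON document in this request is handled by the /api/user route of the part 1 application, and /api/healthcheck is the path polled by the ELB health check we configured earlier. Purely for reference, a minimal Akka HTTP sketch of such routes could look like the following; it is an illustration under assumptions (the case class, route structure, and responses are placeholders), not the actual part 1 code.

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

// Shape of the JSON document sent by the curl test above.
final case class UserData(userId: Long, userName: String)

object AkkaFirehoseRoutes extends App {
  implicit val system = ActorSystem("akka-firehose")
  implicit val materializer = ActorMaterializer()
  implicit val userDataFormat: RootJsonFormat[UserData] = jsonFormat2(UserData)

  val route =
    path("api" / "user") {
      post {
        entity(as[UserData]) { user =>
          // In the real application, this is where the record is written to Firehose.
          complete(s"accepted user ${user.userId}")
        }
      }
    } ~
    path("api" / "healthcheck") {
      get {
        // Polled by the ELB health check configured earlier.
        complete("OK")
      }
    }

  // The application listens on container port 8080, which the task definition maps to host port 80.
  Http().bindAndHandle(route, "0.0.0.0", 8080)
}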

After sending data to our application, we can list the files in the S3 bucket we created as a target for Amazon Kinesis Firehose:

aws s3 ls s3://<your_name>-firehose-target --recursive

In this blog post, we created the infrastructure to roll out our Scala-based microservice using Amazon ECS and Amazon ECR. We hope we’ve given you ideas for creating your own Dockerized Scala-based applications in AWS. Feel free to share your ideas in the comments below!

Using Amazon Kinesis Firehose

Amazon Kinesis Firehose, a new service announced at this year’s re:Invent conference, is the easiest way to load streaming data into AWS. Firehose manages all of the resources and automatically scales to match the throughput of your data. It can capture and automatically load streaming data into Amazon S3 and Amazon Redshift.

An example use for Firehose is to keep track of traffic patterns in a web application. To do that, for each request to the web application we will stream a record that contains the current page and the page being requested. Let’s take a look.

Creating the Delivery Stream

First, we need to create our Firehose delivery stream. Although we can do this through the Firehose console, let’s take a look at how we can automate the creation of the delivery stream with PowerShell.

In our PowerShell script, we need to set up the account ID and variables for the names of the resources we will create. The account ID is used in our IAM role to restrict access to just the account with the delivery stream.

$accountId = '<account-id>'
$roleName = '<iam-role-name>'
$s3BucketName = '<s3-bucket-name>'
$firehoseDeliveryStreamName = '<delivery-stream-name>'

Because Firehose will push our streaming data to S3, our script will need to make sure the bucket exists.

$s3Bucket = Get-S3Bucket -BucketName $s3BucketName
if($s3Bucket -eq $null)
{
    New-S3Bucket -BucketName $s3BucketName
}

We also need to set up an IAM role that gives Firehose permission to push data to S3. The role will need access to the Firehose API and the S3 destination bucket. For the Firehose access, our script will use the AmazonKinesisFirehoseFullAccess managed policy. For the S3 access, our script will use an inline policy that restricts access to the destination bucket.

$role = (Get-IAMRoles | ? { $_.RoleName -eq $roleName })

if($role -eq $null)
{
    # Assume role policy allowing Firehose to assume a role
    $assumeRolePolicy = @"
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId":"$accountId"
        }
      }
    }
  ]
}
"@

    $role = New-IAMRole -RoleName $roleName -AssumeRolePolicyDocument $assumeRolePolicy

    # Add managed policy AmazonKinesisFirehoseFullAccess to role
    Register-IAMRolePolicy -RoleName $roleName -PolicyArn 'arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess'

    # Add policy giving access to S3
    $s3AccessPolicy = @"
{
"Version": "2012-10-17",  
    "Statement":
    [    
        {      
            "Sid": "",      
            "Effect": "Allow",      
            "Action":
            [        
                "s3:AbortMultipartUpload",        
                "s3:GetBucketLocation",        
                "s3:GetObject",        
                "s3:ListBucket",        
                "s3:ListBucketMultipartUploads",        
                "s3:PutObject"
            ],      
            "Resource":
            [        
                "arn:aws:s3:::$s3BucketName",
                "arn:aws:s3:::$s3BucketName/*"		    
            ]    
        } 
    ]
}
"@

    Write-IAMRolePolicy -RoleName $roleName -PolicyName "S3Access" -PolicyDocument $s3AccessPolicy

    # Sleep to wait for the eventual consistency of the role creation
    Start-Sleep -Seconds 2
}

Now that the S3 bucket and IAM role are set up, we will create the delivery stream. We just need to set up an S3DestinationConfiguration object and call the New-KINFDeliveryStream cmdlet.

$s3Destination = New-Object Amazon.KinesisFirehose.Model.S3DestinationConfiguration
$s3Destination.BucketARN = "arn:aws:s3:::" + $s3Bucket.BucketName
$s3Destination.RoleARN = $role.Arn

New-KINFDeliveryStream -DeliveryStreamName $firehoseDeliveryStreamName -S3DestinationConfiguration $s3Destination 

After the New-KINFDeliveryStream cmdlet is called, it will take a few minutes to create the delivery stream. We can use the Get-KINFDeliveryStream cmdlet to check the status. As soon as it is active, we can run the following cmdlet to test our stream.

Write-KINFRecord -DeliveryStreamName $firehoseDeliveryStreamName -Record_Text "test record"

This will send one record to our stream, which will be pushed to the S3 bucket. By default, delivery streams buffer data to either 5 MB or 5 minutes before pushing to S3, so check the bucket in 5 minutes.

Writing to the Delivery Stream

In an ASP.NET application, we can write an IHttpModule so we know about every request. With an IHttpModule, we can add an event handler to the BeginRequest event and inspect where the request is coming from and going to. Here is code for our IHttpModule. The Init method adds the event handler. The RecordRequest method grabs the current URL and the request URL and sends that to the delivery stream.

using System;
using System.IO;
using System.Text;
using System.Web;

using Amazon;
using Amazon.KinesisFirehose;
using Amazon.KinesisFirehose.Model;

namespace KinesisFirehoseDemo
{
    /// <summary>
    /// This HTTP module adds an event handler for incoming requests.
    /// For each request, a record is sent to Kinesis Firehose. To keep the demo
    /// simple, a single record is sent at a time with the PutRecord operation.
    /// This can be optimized by batching records and using the PutRecordBatch
    /// operation.
    /// </summary>
    public class FirehoseSiteTracker : IHttpModule
    {
        IAmazonKinesisFirehose _client;

        // The delivery stream that was created using the setup.ps1 script.
        string _deliveryStreamName = "";

        public FirehoseSiteTracker()
        {
            this._client = new AmazonKinesisFirehoseClient(RegionEndpoint.USWest2);
        }

        public void Dispose() 
        {
            this._client.Dispose(); 
        }

        public bool IsReusable
        {
            get { return true; }
        }

        /// <summary>
        /// Set up the event handler for BeginRequest events.
        /// </summary>
        /// <param name="application">The HTTP application to attach the handler to.</param>
        public void Init(HttpApplication application)
        {
            application.BeginRequest +=
                (new EventHandler(this.RecordRequest));
        }

        /// <summary>
        /// Write to Firehose a record with the starting page and the page being requested.
        /// </summary>
        /// <param name="source">The HttpApplication that raised the event.</param>
        /// <param name="e">The event data.</param>
        private void RecordRequest(Object source, EventArgs e)
        {
            // Get the HttpApplication and HttpContext objects to access
            // request and response properties.
            HttpApplication application = (HttpApplication)source;
            HttpContext context = application.Context;

            string startingRequest = string.Empty;
            if (context.Request.UrlReferrer != null)
                startingRequest = context.Request.UrlReferrer.PathAndQuery;

            var record = new MemoryStream(UTF8Encoding.UTF8.GetBytes(string.Format("{0}\t{1}\n",
                startingRequest, context.Request.Path)));

            var request = new PutRecordRequest
            {
                DeliveryStreamName = this._deliveryStreamName,
                Record = new Record
                {
                    Data = record
                }
            };
            this._client.PutRecordAsync(request);
        }
    }
}

 

To enable the module, we register it in the modules section of the application’s web.config:

<system.webServer>
  <modules>
    <add name="siterecorder" type="KinesisFirehoseDemo.FirehoseSiteTracker"/>
  </modules>
</system.webServer>

Now we can navigate through our ASP.NET application and watch data flow into our S3 bucket.

What’s Next

Now that our data is flowing into S3, we have many options for what to do with that data. Firehose has built-in support for pushing our S3 data straight to Amazon Redshift, giving us lots of power for running queries and doing analytics. We could also set up event notifications to have Lambda functions or SQS pollers read the data getting pushed to Amazon S3 in real time.