Category: Amazon Elasticsearch Service*


Amazon Elasticsearch Service, VPC 지원 기능 출시

지난주 Amazon VPC 내부에서 NAT 인스턴스나 인터넷 게이트웨이 없이도 Amazon Elasticsearch Service 도메인에 접속할 수 있습니다. Amazon ES용 VPC 지원은 구성하기 쉽고 안정적이면서도 보안이 한층 강화되어 있습니다. VPC 지원을 통해 다른 서비스와 Amazon ES 간 트래픽이 퍼블릭 인터넷과 분리된 AWS 네트워크 내에 완전히 유지됩니다. 기존 VPC 보안 그룹을 사용하여 네트워크 액세스를 관리하고, AWS Identity and Access Management(IAM)를 사용하여 보호력을 강화할 수 있습니다. Amazon ES 도메인용 VPC 지원은 추가 비용 없이 사용할 수 있습니다.

시작하기

VPC에서 Amazon Elasticsearch Service 도메인을 쉽게 만들 수 있습니다. 클러스터를 만들고 나서 [“VPC access”]를 선택할 때 사용하는 일반적인 단계 전체를 실행합니다.

됐습니다. 이제 다 끝났습니다. 이제 VPC 내에서 도메인에 액세스할 수 있습니다!

알아둘 사항

VPC를 지원하기 위해 Amazon ES는 엔드포인트를 사용자의 VPC의 서브넷 1개 이상에 배치합니다. Amazon ES는 클러스터의 각 데이터 노드별로 VPC에 탄력적 네트워크 인터페이스(ENI)를 배치합니다. 각 ENI는 서브넷 IPv4 범위의 프라이빗 IP 주소를 사용하고 퍼블릭 DNS 호스트 이름을 수신합니다. 영역 인식이 활성화되어 있으면 Amazon ES는 각기 다른 가용 영역에 있는 서브넷 2개에 엔드포인트를 만들기 때문에, 데이터 내구성이 강화됩니다.

클러스터의 노드 개수는 IP 주소 개수의 3개로 확보해야 합니다. 영역 인식이 활성화되어 있으면 이 개수를 2로 나눌 수 있습니다. 사용자는 Amazon ES에서 별도의 서브넷을 만드는 것이 가장 좋습니다.

참고 사항:

자세한 내용은 Amazon ES 설명서를 참조하십시오.

Randall;

이 글은 Amazon Elasticsearch Service now supports VPC의 한국어 번역입니다.

Apache Flink를 이용한 AWS기반 실시간 스트림 처리 파이프라인 구성하기

오늘날 비즈니스 환경에서, 다양한 데이터 소스의 꾸준한 증가에 맞추어 데이터는 계속적으로 생성되고 있습니다. 따라서, 원시 데이터의 대규모 스트림을 통해 실행 가능한 통찰력을 얻기 위한 데이터를 지속적으로 수집하고, 저장하고, 처리하는 능력을 갖춘다는 것은 조직의 경쟁력 측면에서 장점이라 하겠습니다.

Apache Flink스트림 프로세싱 파이프라인의 기반을 갖추는 데 매우 적합한 오픈소스 프로젝트 입니다. 스트리밍 데이터의 지속적인 분석에 적합한 고유한 기능을 제공합니다. 하지만, Flink를 기반으로 파이프라인 을 구축하고 유지하려면 때때로 물리적 자원 및 운영상의 노력 외에 상당한 전문 지식이 필요합니다.

이 글을 통해서 Amazon EMR, Amazon KinesisAmazon Elasticsearch Service (ES)를 사용하는 Apache Flink를 기반으로 일관성 있고 확장 가능하며 안정적인 스트림 프로세싱 파이프라인에 대한 레퍼런스 아키텍처를 설명해드리고자 합니다. AWSLabs GitHub 저장소는 레퍼런스 아키텍처를 실제로 탐색하는데 필요한 아티팩트(artifact)를 제공합니다. 리소스에는 샘플 데이터를 Amazon Kinesis Stream으로 수집하는 프로듀서(producer) 애플리케이션과 실시간으로 데이터를 분석하고 그 결과를 시각화하기 위해 Amazon ES로 보내는 Flink 프로그램이 포함되어 있습니다.

실시간으로 택시 데이터 지리적 위치 분석하기
택시의 차량 운영 최적화와 관련하여 다음과 같은 시나리오를 생각해보겠습니다. 뉴욕시에서 현재 운영중인 수많은 택시들로부터 운행 정보를 지속적으로 확보합니다. 그리고 이 데이터를 사용하여 수집된 데이터를 실시간으로 분석하고 데이터에 기반한 의사 결정을 내림으로써 운영을 최적화하고자 합니다.

예를 들어, 현재 택시 수요가 많은 지역을 의미하는 핫스팟(Hot spot)을 식별해서 빈 택시들이 이 곳으로 이동하도록 할 수 있습니다. 또, 현재 교통 상황을 파악해서 가까운 공항으로 가기 위한 대략적인 운행 시간을 고객에게 알려드릴 수도 있습니다. 당연히, 이러한 결정은 현재의 수요와 교통 상황을 상세하게 반영하고 있는 정보를 기반으로 합니다. 유입되는 데이터는 지속적이면서도 시간에 맞춰 적절하게 분석되어야 합니다. 이와 관련한 KPI와 이를 통해 도출되는 인사이트는 실시간으로 대시보드에서 액세스할 수 있어야 합니다.

이 글의 목적에 맞게, 뉴욕시에서 수집된 택시 운행 이력 데이터셋을 Amazon Kinesis Streams로 재생해서 운행 이벤트를 만들어보겠습니다. 이 데이터셋은 New York City Taxi & Limousine Commission 웹사이트에서 얻을 수 있으며, 지리적 위치 정보와 택시 운행 관련 요금 정보 등이 포함되어 있습니다.

보다 현실적인 시나리오에서는, AWS IoT를 사용하여 택시에 설치된 원격 측정 유닛에서 데이터를 수집한 다음 이 데이터를 Amazon Kinesis Streams로 처리할 수도 있습니다.

높은 신뢰성과  확장 가능한 스트림 프로세싱 파이프라인 아키텍처
파이프라인이 택시를 운영하고 최적화하기 위한 중앙 집중형 툴을 제공하므로, 단일 노드의 장애에 대한 내성을 갖는 아키텍처를 구축하는 것은 매우 중요합니다. 파이프라인은 유입되는 이벤트의 변화율에 맞춰 적응할 것입니다. 따라서, 이벤트 수집, 실질적인 처리, 확보한 인사이트의 시각화를 다른 구성 요소들로 분리합니다. 인프라의 구성요소들 간의 결합도를 낮추고, 관리형 서비스를 사용해서 장애에 대해 파이프라인의 견고성(robustness)을 증가시킬 수 있습니다. 또, 인프라의 여러 부분을 확장시킬 수 있고 전체 파이프라인을 구축하고 운영하는데 필요한 노력을 줄일 수 있습니다.

기대하는 데이터 통찰력을 얻기 위한 쿼리 계산으로부터 택시를 통해 전송된 이벤트의 수집과 저장을 분리하여, 인프라의 견고성을 상당히 증가시킬 수 있습니다.

이벤트는 처음에는 Amazon Kinesis Streams를 통해 유지됩니다. Amazon Kinesis Streams는 재생 가능하고 순서가 있는 로그(log)를 보유하며 여러 가용 영역(Availability Zone)에 이벤트를 중복 저장합니다. 이후에, 스트림에서 이벤트를 읽어서 Apache Flink에서 처리합니다. Flink는 내부 상태를 지속적으로 스냅샷으로 남겨놓기 때문에, 스냅샷에서 내부 상태를 복원하고 스트림에서 재처리가 필요한 이벤트를 재생하여 오퍼레이터 또는 전체 노드에서 발생한 장애를 복구 할 수 있습니다.

이벤트 저장을 위해 이렇게 한 곳에 로그를 보유하는 또 다른 장점은 여러 애플리케이션에서 데이터를 소비(Consume)할 수 있다는 점입니다. 벤치마크 테스트 및 일반 테스트를 위해 다른 버전의 Flink 애플리케이션을 나란히 실행시키는 것도 가능합니다. 또는, 장기간 아카이빙을 위해 스트림에서 Amazon S3로 데이터를 저장하는 데에 Amazon Kinesis Firehose를 사용할 수 도 있고, 그런 다음 Amazon Athena를 사용하여 과거의 기록을 상세하게 분석할 수도 있습니다.

Amazon Kinesis Streams, Amazon EMR, Amazon ES는 간단한 API 호출을 통해 생성 및 확장할 수 있는 관리형 서비스입니다. 따라서 이러한 서비스를 사용하면 비즈니스 가치를 제공하기 위한 전문 작업에 집중할 수 있습니다. 파이프라인 전체를 구축하고, 운영하고, 확장하기 위해 필요한 것들이 구분되어 있지 않은 무거운 작업을AWS로 수행하시기 바랍니다. 파이프 라인 생성은 AWS CloudFormation으로 완전히 자동화 될 수 있습니다. 개별 구성 요소는 Amazon CloudWatch를 통해 모니터링되고 자동으로 확장될 수 있습니다. 오류 사항 감지 및 자동 완화 역시 지원됩니다.

이후 부분에서는, AWS에서 레퍼런스 아키텍처를 만들고 실행하는 것에 관련된 부분에 중점을 두겠습니다. Flink에 대한 자세한 사항은, API를 이용하여 Flink 프로그램을 어떻게 구현하는지 다루는 Flink training session 을 참조하시기 바랍니다. 이 세션에서 사용된 시나리오는 이 글에서 다루는 내용과 상당히 비슷합니다.

레퍼런스 아키텍처 빌드 및 실행

실제 택시 운행 분석 애플리케이션을 위해, 다음 2 개의 CloudFormation 템플릿을 사용하여 레퍼런스 아키텍처를 만들고 실행합니다:

  • 첫번째 템플릿은 택시 운행을 스트림으로 수집하고 Flink로 운행 정보를 분석하기 위한 런타임 아티팩트(artifact)를 구축합니다.
  • 두번째 템플릿은 애플리케이션을 실행시키는 인프라 리소스를 생성합니다.

Flink 애플리케이션과 CloudFormation템플릿을 비롯한, 레퍼런스 아키텍처를 구축하고 실행시키는데 필요한 리소스는 AWSLabs GitHub 저장소의 flink-stream-processing-refarch에 있습니다.

런타임 아티팩트(Artifacts) 빌드 및 인프라 생성

우선 첫번째 CloudFormation 템플릿을 실행해서 AWS CodePipeline 파이프라인을 생성합니다. 이 파이프라인은 서버리스 방식으로 AWS CodeBuild를 사용해서 아티팩트를 만듭니다. 여기에 Maven을 설치하고 Flink Amazon Kinesis 커넥터 및 기타 런타임 아티팩트를 수동으로 구축할 수 있습니다. 파이프라인의 모든 단계가 성공적으로 완료되면, CloudFormation 템플릿의 출력 섹션에 지정되어 있는 S3 버킷에서 아티팩트를 검색할 수 있게 됩니다.

첫번째 템플릿이 생성되고 런타임 아티팩트가 만들어지면, 두번째 CloudFormation 템플릿을 실행합니다. 이를 통해 앞에서 설명한 레퍼런스의 리소스를 생성합니다. (SSH 접속을 위한 Key Pair가 없을 경우 두번째 CloudFormation 템플릿 실행 전에 미리 생성하여 준비합니다)

다음 단계를 진행하기 전에 위의 템플릿 2개가 모두 성공적으로 생성될 때까지 기다립니다. 약 15분 정도 걸릴 수 있으니, CloudFormation이 모든 작업을 마칠 때까지 자유롭게 커피 한 잔 하면서 기다립니다.

Flink 런타임 시작 및 Flink 프로그램 제출하기
Flink 런타임을 시작하고, 분석을 하는 Flink 프로그램을 서브밋(submit)하기 위해, EMR 마스터 노드에 연결합니다. 인프라 프로비저닝과 런타임 아티팩트 빌드를 위해 사용된 2개의 CloudFormation 템플릿의 실행 결과를 다음 명령어와 관련 파라미터를 통해 확인할 수 있습니다.

$ ssh -C -D 8157 hadoop@«EMR master node IP»

CloudFormation 템플릿을 통해 프로비저닝 된 EMR 클러스터는 노드당 4개의 vCPU를 지닌 2개의 c4.xlarge 코어 노드로 구성됩니다. 일반적으로, 태스크 매니저당 슬롯의 개수와 노드 코어의 수를 동일하게 맞춥니다. 여기서는, 2개의 태스크 매니저와 각 태스크 매니저 당 4개의 슬롯을 지닌 long-runnig Flink 클러스터로 시작하면 적절하겠습니다 (EMR 마스터 노드에 접속한 상태에서 다음 명령어를 실행합니다):

$ flink-yarn-session -n 2 -s 4 -tm 4096 -d

Flink 런타임이 실행되고 나면, 택시 스트림 프로세서 프로그램은 Amazon Kinesis 스트림 내의 운행 이벤트를 실시간으로 분석하기 위해 Flink 런타임에 제출 됩니다.

$ aws s3 cp s3://«artifact-bucket»/artifacts/flink-taxi-stream-processor-1.0.jar .
$ flink run -p 8 flink-taxi-stream-processor-1.0.jar --region «AWS region» --stream «Kinesis stream name» --es-endpoint https://«Elasticsearch endpoint»

Flink 애플리케이션이 실행 중이니, 스트림에서 유입되는 이벤트를 읽습니다. 그런 다음 이벤트 시간에 따라 타임 윈도우 내에서 이들을 집계하여 결과를 Amazon ES로 전송합니다. Flink 애플리케이션은 소규모 요청으로 Elasitcsearch 클러스터에 과부하가 걸리지 않도록 배치 레코드를 처리하고, 배치 처리 요청에 서명을 통해서Elasticsearch 클러스터의 보안 구성이 가능하도록 합니다.

브라우저에서 프록시(proxy)를 활성화 해놓았다면, 마스터 노드에 대한 SSH 세션을 수립하는 동적 포트 포워딩을 통해서Flink 웹 인터페이스를 볼 수 있습니다.

택시 운행 이벤트를 Amazon Kinesis Stream으로 입력
이벤트를 수집하기 위해, 택시 스트림 프로듀서 애플리케이션을 사용합니다. 이 애플리케이션은 미국 뉴욕 시에서 기록한 택시 운행의 이력 데이터셋을 S3로부터 읽어와서 8개의 샤드(shard)가 있는 Amazon Kinesis Stream으로 재생합니다. 택시 운행에 외에도, 프로듀서 애플리케이션은 워터마크 이벤트를 스트림으로 가져옵니다. 이를 통해서 프로듀서가 이력 데이터셋을 재생하는 시간을 Flink 애플리케이션이 결정할 수 있게 됩니다.

$ ssh -C ec2-user@«producer instance IP»
$ aws s3 cp s3://«artifact-bucket»/artifacts/kinesis-taxi-stream-producer-1.0.jar .
$ java -jar kinesis-taxi-stream-producer-1.0.jar -speedup 1440 -stream «Kinesis stream name» -region «AWS region»

이 애플리케이션은 이번 블로그에서 논의한  레퍼런스 아키텍처에만 특화된 것은 아닙니다. 예를 들어 Apache Flink 대신 Amazon Kinesis Analytics를 기반으로 유사한 스트림 처리 아키텍처를 구축하는 식으로 다른 목적에도 쉽게 재사용이 가능합니다.

Kibana 대시보드 탐색

전체 파이프라인이 실행중이므로, Flink 애플리케이션에 의해 실시간으로 제공되는 인사이트를 보여주는 Kibana 대시보드를 볼 수 있습니다:

https://«Elasticsearch end-point»/_plugin/kibana/app/kibana#/dashboard/Taxi-Trips-Dashboard

이 글의 목적에 맞게 Elasticsearch 클러스터는 인프라를 생성하는 CloudFormation 템플릿의 파라미터로 지정된 IP 주소 범위의 연결을 허용하도록 구성됩니다. 프로덕션-준비 상태의 애플리케이션의 경우, 이러한 설정이 맞지 않을 수도 있고 설정 자체가 불가능할 수도 있습니다. Elasticsearch 클러스터에 안전하게 연결하는 방법에 대한 자세한 내용은 AWS 데이터베이스 블로그의 Amazon Elasticsearch Service에 대한 액세스 제어 설정을 참조하시기 바랍니다.

Kibana 대시 보드에서 왼쪽의 지도는 택시 운행의 시작 지점을 시각화하여 보여주고 있습니다. 사각형이 빨간색에 가까울수록 해당 위치에서 더 많은 택시 운행이 시작됨을 의미합니다. 오른쪽의 그래프 차트는 John F. Kennedy 국제 공항과 LaGuardia Airport까지의 택시 운행 평균 시간을 각각 보여줍니다.

이러한 정보를 가지고, 현재 수요가 많은 지역에 빈 택시를 사전 조치하여 보내고 현지 공항으로의 운행 시간을 보다 정확하게 예측함으로써 택시 차량 운영 최적화가 가능해집니다.

이제 기본 인프라를 확장시킬 수 있습니다. 예를 들면, 스트림의 샤드 용량을 확장 시킵니다. Elasticsearch 클러스터의 인스턴수 갯수 또는 타입도 변경합니다. 아울러, 전체 파이프라인이 스케일 조정 중에도 제대로 동작하고 응답하는지 확인합니다.

AWS 에서 Apache Flink 실행

앞에서 본 것처럼, Flink 런타임은 YARN을 통해서 배포될 수 있습니다. 따라서 EMR은 AWS 상에서 Flink를 실행시키기에 매우 적합합니다. 하지만, Flink 애플리케이션을 빌드하고 실행시키기 위해 해야할 AWS에 관련된 고려사항이 몇 가지 있습니다:

  • Flink Amazon Kinesis 커넥터 빌드
  • Amazon Kinesis 컨슈머(Consumer) 환경 설정 수정
  • Amazon Kinesis에 워터마크를 서브밋해서 이벤트 시간 처리를 가능하게 함
  • Flink를 Amazon ES에 연동

Flink Amazon Kinesis 커넥터 빌드

Flink는 Amazon Kinesis Streams를 위한 커넥터를 제공합니다. 다른 Flink 아티팩트와는 달리, Amazon Kinesis 커넥터는 Maven central에서는 사용할 수 없습니다. 따라서 여러분이 직접 빌드하셔야 합니다. Maven 3.3.x의 경우 부적절하게 가려진 의존성(dependency)으로 인한 출력이 만들어질 수 있으므로, Maven 3.3.x 이상의 배포판 보다는 Maven 3.2.x로 Flink 빌드를 권장합니다.

$ wget -qO- https://github.com/apache/flink/archive/release-1.2.0.zip | bsdtar -xf-
$ cd flink-release-1.2.0
$ mvn clean package -Pinclude-kinesis -DskipTests -Dhadoop-two.version=2.7.3 -Daws.sdk.version=1.11.113 -Daws.kinesis-kcl.version=1.7.5 -Daws.kinesis-kpl.version=0.12.3

Flink Amazon Kinesis 커넥터를 받고 나면, 로컬 Maven 저장소에 .jar 파일들을 임포트 할 수 있습니다.

$ mvn install:install-file -Dfile=flink-connector-kinesis_2.10-1.2.0.jar -DpomFile=flink-connector-kinesis_2.10-1.2.0.pom.xml

Amazon Kinesis consumer 환경 설정 적용
Flink는 최근 EMR 클러스터와 관련된 role에서 AWS 자격 증명을 얻을 수 있도록 지원하기 시작했습니다. Flink 애플리케이션 소스 코드에서 AWS_CREDENTIALS_PROVIDER 속성을 AUTO로 설정하고 Properties 객체에서 AWS_ACCESS_KEY_ID 및 AWS_SECRET_ACCESS_KEY 파라미터를 생략하여 이 기능을 활성화합니다.

Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

자격 증명은 인스턴스의 메타데이터에서 자동으로 검색되므로 장기 자격 증명을 Flink 애플리케이션 또는 EMR 클러스터의 소스 코드에 저장할 필요가 없습니다.

프로듀서 애플리케이션은 초당 수천 개의 이벤트를 스트림으로 가져오므로, 단일 GetRecords 호출에서 Flink가 가져온 레코드 수를 늘리는 데 도움이 됩니다. 이 값을 Amazon Kinesis 에서 지원하는 최대값으로 변경하시기 바랍니다.

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");

Amazon Kinesis에 대한 워터마크 제출을 통한 이벤트 타임 프로세싱 지원
Flink는 여러 가지 개념의 타임, 특히 이벤트 타임을 지원합니다. 이벤트 타임은 스트리밍 응용 프로그램에 매우 적합한데, 이는 쿼리에 대해 안정적인 의미의 결과를 내주기 때문입니다. 이벤트 타임은 프로듀서 또는 프로듀서에 근접한 경우에 의해 결정됩니다. 네트워크로 인한 이벤트 순서의 재지정이 쿼리 결과에 미치는 영향은 매우 작습니다.

이벤트 타임을 실현하기 위해, Flink는 일정한 시간 간격으로 프로듀서가 보낸 워터마크를 사용하여 소스의 현재 시간을 Flink 런타임에게 알립니다. Amazon Kinesis Streams와 통합할 때 Flink 에 워터 마크를 제공하는 두 가지 방법이 있습니다:

  • 스트림에 워터마크를 수동으로 추가
  • 스트림 수집 과정에서 이벤트에 자동으로 추가되도록 ApproximalArrivalTime을 이용

Amazon Kinesis 스트림에서 시간 모델을 이벤트 시간으로 설정하면, Flink는 Amazon Kinesis에서 제공하는 ApproximalArrivalTime 값을 자동으로 사용합니다.

StreamExecutionEnvironment env = StreamExecutionEnviron-ment.getExecutionEnvironment(); 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

또는, 스트림의 해당 이벤트에서 워터 마크 정보를 추출하는 사용자 정의 Timestamp Assigner 연산자를 지정하여 프로듀서가 정해 놓은 시간을 사용하도록 선택할 수도 있습니다.

DataStream<Event> kinesis = env
	.addSource(new FlinkKinesisConsumer<>(...))
	.assignTimestampsAndWatermarks(new PunctuatedAssigner())

PunctuatedAssigner 를 사용할 경우, 모든 개별 샤드에 워터 마크를 가져오는 것이 중요합니다. 왜냐하면Flink는 스트림별 샤드를 개별적으로 처리하기 때문입니다. 이는 스트림의 샤드를 나열하여 해결할 수 있습니다. 워터마크가 전송될 샤드의 해시 범위에 대해 해시 키를 명시적으로 세팅해서 특정 샤드에 워터마크를 수집합니다.

Amazon Kinesis를 이용하여 택시 운행 정보를 수집하는 프로듀서는 후자의 접근법을 사용합니다. 구현에 관련된 자세한 사항은 AWSLabs GitHub 저장소의flink-stream-processing-refarch를 참고하시기 바랍니다.

Amazon ES와 Flink의 연동
Flink는 Elasticsearch에 대한 몇 가지 커넥터를 제공합니다. 그러나 이러한 모든 커넥터는 단지 Elasticsearch의 TCP 전송 프로토콜을 지원합니다. 반면 Amazon ES는 HTTP 프로토콜을 사용합니다. Elasticsearch 5부터는 TCP 전송 프로토콜이 더 이상 사용되지 않습니다. HTTP 프로토콜을 지원하는 Flink 용 Elasticsearch 커넥터는 여전히 동작하지만, Jest 라이브러리를 사용하여 Amazon ES에 연결할 수 있는 사용자 정의 싱크(sink)를 빌드 할 수 있습니다. 싱크(sink)는 IAM 자격 증명으로 요청에 서명 할 수 있어야합니다.

Elasticsearch 싱크의 전체 구현에 대한 자세한 내용은 Flink 애플리케이션의 소스 코드가 포함 된 flink-taxi-stream-processor AWSLabs GitHub 저장소를 참조하십시오.

요약

이 글에서는 Apache Flink를 기반으로 일관성 있고 확장 가능하며 안정적인 스트림 처리 아키텍처를 구축하는 방법에 대해 설명했습니다. 또한 관리형 서비스를 활용하여 짧은 대기 시간 및 높은 처리량 스트림 처리 파이프라인을 구축하고 유지 관리하는 데 필요한 전문 지식과 운영 노력을 줄이는 방법을 보여줌으로써 전문 지식을 비즈니스 가치를 제공하는 데에 집중할 수 있습니다.

오늘부터 Amazon EMR에서 Apache Flink를 사용해보시기 바랍니다. AWSLabs GitHub 저장소에는 주어진 예제를 실행하는 데 필요한 리소스가 포함되어 있으며 빠르게 시작하는 데 도움이 되는 추가 정보가 포함되어 있습니다.

이 글은 AWS Bigdata 블로그의 Build a Real-time Stream Processing Pipeline with Apache Flink on AWS의 한국어 번역으로, AWS 프로페셔널 서비스의 남궁영환 컨설턴트께서 번역해 주셨습니다.

 

Amazon Kinesis 업데이트 – Amazon Elasticsearch Service 통합, 샤드 통계 및 시간 기반 반복 기능

Amazon Kinesis는 대용량 스트리밍 데이터를 클라우드에서 손쉽게 처리할 수 있도록 도와 줍니다.

Amazon Kinesis 플랫폼은 3개의 서비스로 구성되어 있습니다: Kinesis Streams은 개발자가 자신의 스트리밍 데이터 처리 애플리케이션을 구현할 수 있습니다; Kinesis Firehose를 통해 스트리밍 데이터를 저장하고 분석하기 위해 AWS에 저장하는 기능에 초점을 맞추었습니다; Kinesis Analytics 를 통해 스트리밍 데이터를 표준 SQL을 통해 분석 할 수 있습니다.

많은 AWS 고객이 스트리밍 데이터를 실시간으로 수집 · 처리하는 방식으로 Kinesis Streams와 Kinesis Firehose을 이용​​하고 있습니다. 이는 완전 관리 서비스이기 때문에 사용 편의성을 높아 스트리밍 데이터 처리를 위한 인프라를 직접 관리하는 대신 응용 프로그램에 개발하는 시간에 투자를 할 수 있다는 장점이 있습니다.

오늘 Amazon Kinesis Streams와 Kinesis Firehose 관한 3개의 새로운 기능을 신규 발표합니다.

  • Elasticsearch 통합– Amazon Kinesis Firehose는 Amazon Elasticsearch Service에 스트리밍 데이터를 전달할 수 있습니다..
  • 강화된 모니터링 수치 제공– Amazon Kinesis는 샤드 단위 메트릭을 CloudWatch로 매 분당 보낼 수 있습니다.
  • 유연성 확보– Amazon Kinesis에서 시간 기반의 반복자를 이용하여 레코드를 수신 할 수 있습니다.

Amazon Elasticsearch Service 통합
Elasticsearch
는 인기있는 오픈 소스 검색 · 분석 엔진입니다. Amazon Elasticsearch Service는 AWS 클라우드에서 Elasticsearch를 손 쉽게 설치하고, 높은 확장성을 가지고 운영할 수 있는  관리 서비스입니다.  오늘 부터 Kinesis Firehose 데이터 스트림을 Amazon Elasticsearch Service 클러스터에 배포 할 수 있게되었습니다. 이 새로운 기능은 서버의 로그 및 클릭 스트림, 소셜 미디어 트래픽 등으로 인덱스를 생성하고 분석 할 수 있습니다.

전송 받은 레코드 (Elasticsearch 문서)는 지정한 설정에 따라 Kinesis Firehose에서 버퍼링 된 후,여러 문서를 동시에 인덱스를 만들 수 있도록 벌크 요청을 사용하여 자동으로 클러스터를 추가합니다. 또한, 데이터는 Firehose로 전송하기 전에 UTF-8로 인코딩 된 단일 JSON 개체에 두어야 합니다. (관련 정보는 Amazon Kinesis Agent Update – New Data Preprocessing Feature를 참조하십시오).

이제 AWS 관리 콘솔을 통한 설정 방법을 알아보겠습니다. 대상(Amazon Elasticsearch Service)를 선택하고 전송 스트림의 이름을 입력합니다. Elasticsearch 도메인 (livedata 예제)을 선택 인덱스로 지정하고, 인덱스 주기(없음, 시간별, 매일, 매주, 매월)를 선택합니다. 또한, 모든 문서 또는 실패한 문서의 백업을받을 S3 버킷을 지정합니다 :

그리고 버퍼의 크기를 지정하고 S3 버킷에 전송되는 데이터의 압축 및 암호화 옵션을 선택합니다. 필요에 따라 로깅을 사용하고 IAM 역할을 선택합니다 :

1분 정도 이후에 스트림이 준비 됩니다 :

I can view the delivery metrics in the Console:

스트리밍 데이터가 Elasticsearch에 도달 한 후에는 Kibana와  Elasticsearch 쿼리 언어에 의해 데이터 시각화를 할 수 있습니다.

즉, 통합을 통해 여러분의 스트리밍 데이터를 수집하고 Elasticsearch에 전달 하기 위한 처리 방법은 매우 간단합니다. 더 이상 코드를 작성하거나 자체 데이터 수집 도구를 만들 필요가 없습니다.

샤드 기반 통계 모니터링
모든 Kinesis 스트림은 하나 이상의 샤드로 구성되어 있으며, 모든 샤드는 일정량의 읽기 · 쓰기의 용량을 가지고 있습니다. 필요에 따라 스트림에 샤드를 추가하면 스트림의 용량은 증가합니다.

여러분은 각 샤드의 성능을 파악하기위한 목적으로 샤드 단위의 통계 기능을 활성화 할 수 있게되었습니다. 샤드 당 6개의 메트릭이 있습니다. 각 통계는 1 분에 한 번 보고되고, 일반 통계 단위의 CloudWatch 요금이 부과됩니다.  이러한 신규 기능은 특정 샤드에 부하가 편중되지 않았는지, 다른 샤드와 비교하여 확인하거나 스트리밍 데이터의 전송 파이프 라인을 통해 비효율적 인 부분을 발견 및 변경할 수 있게 됩니다. 

아래에는 새로이 측정되는 수치입니다.

IncomingBytes샤드로 PUT이 성공한 바이트 수.

IncomingRecords샤드로 PUT이 성공한 레코드.

IteratorAgeMilliseconds샤드에 대한 GetRecords 호출이 취소 된 마지막 레코드의 체류 시간 (밀리 초). 값이 0 인 경우, 읽은 레코드가 완전히 스트림에 붙어 있다는 것을 의미합니다.

OutgoingBytes샤드에서받은 바이트 수.

OutgoingRecords샤드에서받은 레코드 수.

ReadProvisionedThroughputExceeded – 매초 5 회 또는 2MB를 초과한 GetRecords 호출 수.

WriteProvisionedThroughputExceeded – 매 초 1000 기록 또는 1MB를 초과한 레코드의 수.

EnableEnhancedMetrics 를 호출하는 것으로  활성화 할 수 있습니다. 평소처럼, 일정 기간 동안 집계를 위해 CloudWatch API를 사용할 수 있습니다.

시간 기반 반복 기능
어떤 샤드에 GetShardIterator를 호출 시작점으로 지정하고, 반복 기능을 작성하여 애플리케이션에서 Kinesis 스트림 데이터를 읽을 수 있습니다. 기존의 시작점 선택 (시퀀스 번호 시퀀스 번호 뒤에 가장 오래된 기록, 가장 새로운 레코드)에 추가로 타임 스탬프를 지정할 수 있게되었습니다. 지정한 값 (UNIX 시간 형식)은 읽고 처리하려고하는 가장 오래된 레코드의 타임 스탬프를 나타냅니다.

Jeff;

이 글은 Amazon Kinesis Update – Amazon Elasticsearch Service Integration, Shard-Level Metrics, Time-Based Iterators의 한국어 번역입니다.

Amazon Elasticsearch 및 CloudSearch 서비스 서울 리전 출시

Amazon Elasticsearch Service (Amazon ES)Amazon CloudSearch 서비스가 AWS Asia Pacific (Seoul) 리전에 출시 되었습니다.

Amazon ES는 Elasticsearch 서비스를 좀 더 쉽게 개발, 배포 운영할 수 있는 매니지드 서비스이며, Amazon CloudSearch 역시 웹 애플리케이션에 대한 간편한 검색 서비스를 가능하게 해주는 클라우드 서비스입니다.

서울 리전에 대한 서비스 가격은 Amazon ES 요금표Amazon CloudSearch 요금표를 참고하시기 바랍니다.

Amazon Elasticsearch Service and Amazon CloudSearch Available in Asia Pacific (Seoul) Region 참고

Amazon Elasticsearch Service 공개

Elasticsearch은 실시간 분산 검색 및 분석 엔진으로서 클라우드 환경에 잘 맞는 검색 도구입니다. 문서 지향 엔진으로 스키마(Schema)를 미리 정의할 필요가 없습니다. 정형 및 비정형 데이터 구조도 지원하고 시간 기반 쿼리 및 Kibana 같은 시각화 도구를 활용할 수도 있습니다.

오늘 Amazon Elasticsearch Service (약자로 Amazon ES) 신규 서비스를 공개합니다. 이제 여러분은 AWS 관리 콘솔에서 몇 분 만에 확장성 높은 Elasticsearch 클러스터를 실행할 수 있습니다. 각 클러스터의 클라이언트를 지정하고 데이터를 가져와서 처리하고 분석하는 서비스를 할 수 있습니다.

검색 도메인 생성
먼저 Amazon ES 도메인을 생성해 보겠습니다. AWS Command Line Interface (CLI), AWS Tools for Windows PowerShell 및 the Amazon Elasticsearch Service API를 활용할 수 도 있습니다. 시작하기 버튼을 눌러서 검색 도메인 명을 입력합니다.(my-es-cluster 선택):

두번째로 인스턴스 타입 및 갯수를 입력합니다.(둘 다 나중에 변경 가능)

여기에 몇 가지 인스턴스 가이드 라인을 참고하실 수 있습니다.

  • T2 – 개발 및 테스트 (마스터 노드로 사용하면 좋음)
  • R3 – 읽기가 많거나 복잡한 쿼리를 수행 하는 경우 (e.g. nested aggregations)
  • I2 – 쓰기가 많거나 대용량 데이터 스토리지가 필요한 경우
  • M3 – 읽기 및 쓰기에 균형이 있는 경우

‘Enable dedicated master’을 설정하면, Amazon ES를 통해 클러스터에 대한 마스터 노드를 생성합니다. 이 옵션을 선택하고 클러스터 안정성을 위해 적어도 3개 정도의 노드를 만드실 것을 권장합니다.

만약 Enable zone awareness를 체크하시면, 노드는 다중 가용 영역(AZ)에 배포 되며 고가용성을 제공할 수 있습니다. 이를 선택하시면 Elasticsearch Index API를 통해 리플리카 설정이 필요합니다. 또한, 같은 API를 이용해야 새로운 인덱스를 만들 수도 있습니다. (더 자세히 보기).

노드용 스토리지로 EBS General Purpose (SSD)를 사용하여 데이터를 저장할 수 있으며, 다른 볼륨 형식을 선택하여도 됩니다. EBS를 사용하면 더 많은 데이터를 저장할 수 있고 더 저렴한 비용으로 인스턴스 실행이 가능합니다. 인스턴스에 연결된 스토리지라면 더 나은 성능을 보장합니다. 대량 데이터는 I2 인스턴스를 실행할 수 있으며 노드당 1.6TB의 데이터를 저장할 수 있습니다.

다음에는 접근 정책을 설정하는 것인데 간단한 테스트를 위한 것이므로 대부분 기능을 열어두었지만, 여러분 클러스터는 자세히 설정하시기 바랍니다. IP기반 혹은 사용자 템플릿 기반 접근 정책을 기반으로 마법사 형식으로 접근 제한 정책을 만들 수 있습니다.

설정을 다 마친 후 Confirm and create를 선택 하시면 됩니다.

클러스터가 몇 분 안에 만들어 지면 대시보드에 나타납니다.

이제 기본 클러스터 설정을 완료하였습니다.

검색 문서 가져오기
다음 단계로 검색할 문서를 가져와 테스트해 보는 것인데, 처음 해보시는 분이라면 Having Fun: Python and Elasticsearch, Part 1을 따라서 해 보시길 추천합니다. Elasticsearch를 위한 Python 라이브러리를 설치한 후, 관리 콘솔에서 지정한 클러스터 엔드포인트에서 시작하면 됩니다.

제가 미리 해본 결과 잘 동작하는 것을 알 수 있었으며, 위의 블로그 글에 있는 Python 코드를 통해 샘플 데이터를 만들어 실행 해 볼 수 있습니다. 아래는 샘플 문서 가져오기 결과 스크린샷입니다.

검색 문서 질의하기
샘플 데이터를 가져왔다면 이제 Kibana 링크를 눌러 보시기 바랍니다.

Kibana (v4)을 웹 브라우저의 다른 탭으로 열고 나서 블로그 글을 인덱스 설정을 합니다.

Kibana를 통해 도메인 내 항목을 설정합니다.

시간이 좀 지난 후, Kibana를 통해 데이터 시각화를 할 수 있습니다.

Kibana 3 역시 지원이 가능합니다. 이를 사용하려면, 클러스터 엔드포인트에 _plugin/kibana3/ 플러그인을 연결하면 됩니다.

기타 기능 알아보기
여러분이 설정한 클러스터는 CLI (aws es update-elasticsearch-domain-configuration), API (UpdateElasticsearchDomainConfig) 및 콘솔에서 접근할 수 있습니다. Amazon ES 클러스터 설정 신규 데이터 복사 등 새로운 설정을 간단하게 다운타임 없이 할 수 있습니다.

오늘 Amazon ES 서비스 공개와 아울러 CloudWatch 로그 통합도 가능합니다. CloudWatch 로그를 Amazon ES로 연결 가능한데,  Amazon ES 도메인을 만든 후 Cloudwatch Logs 콘솔에서 Subscribe to Lambda / Amazon ES를 선택하기만 하면 됩니다.

여기서 들어오는 로그의 패턴을 찾아 설정할 수도 있습니다. (패턴 설정은 선택사항이지만 로그의 스키마를 정의할 수 있습니다.) 아래에는 몇 가지 Kibana 대시보드 샘플 예제들이 있고 여러 가지 형식의 로그를 모니터링 하는데 사용할 수 있습니다. ,

  • VPC Flow Dashboard – 로그 항목의 패턴을 정하는 데 필요
    [version, account_id, interface_id, srcaddr, dstaddr, srcport, dstport,
    protocol, packets, bytes, start, end, action, log_status]
    .
  • Lambda Dashboard – 로그 항목의 패턴을 정하는 데 필요
    [timestamp=*Z, request_id="*-*", event].
  • CloudTrail Dashboard – 패턴 지정 필요 없음 로그 항목에서 자동으로 JsON 형식으로 인지

Amazon ESICU AnalysisKuromoji 플러그인 역시 지원합니다. Elasticsearch Mapping API를 통해 정상적으로 설정이 가능합니다. Amazon ES는 아직 Shield나 Marvel 같은 상용 플러그인은 지원하지 않습니다. 이들 플러그인의 대체제로서 AWS Identity and Access Management (IAM)CloudWatch기능을 활용하시면 됩니다.

Amazon ES 는 자동으로 매일 클러스터의 스냅샷을 떠서 14일동안 저장합니다. 저장한 백업에서 클러스터를 복구하시려면 저희에게 알려주시면 됩니다. “automated snapshot hour”를 통해 백업이 일어날 시점을 정할 수 있으며, Elasticsearch Snapshot API을 통해 스냅샷 백업을 가져와서 S3 버킷에 저장하거나 가져와서 클러스터 복구를 할 수 있습니다.

Amazon ES 도메인은 17개의 개별 통계치를 CloudWatch로 전송합니다. Amazon ES 콘솔의 모니터 탭을 통해 이들 정보를 살펴 보실 수 있습니다. 클러스터 상태에 대해 (초록, 노란색 혹은 붉은 색상으로) 확인하실 수 있고, 모든 샤드(Shards)가 노드에 잘 연결되어 있으면 초록색, 최소 1개 이상의 샤드가 연결되어 있지 않으면 주황색, 1개 이상의 기본 샤드(Primary Shard)가 노드에 연결되어 있지 않으면 붉은 색으로 구분됩니다.  클러스터가 싱글 노드를 가지고 있으며, 리플리케이션이 1(Logstash 기본 설정)로 설정 되어 있다면 노란색으로 표시됩니다. 이 문제를 간단히 고치려면 새로운 노드를 하나 더 추가하기만 하면 됩니다.

CPU 활용도는 (읽기 및 쓰기 같은) 요청 처리에 직접적으로 영향을 주는 지표입니다. 이 수치가 높다면 리플리케이션을 증가 시키거나 새로운 인스턴스를 노드에 추가하는 것이 추가적인 병렬 처리에 도움이 됩니다. JVM 메모리 용량이 많이 필요할 때도 인스턴스 수를 높히거나 R3 인스턴스로 바꿀 필요가 있습니다. 이러한 사항을 숙지하고, CloudWatch 수치 변화에 대해 알람을 설정하고, 10-20%의 여유 공간을 두고 CPU를 활용하시면 됩니다.

지금 사용해 보기
오늘 부터 Amazon ES 클러스터를 US East (Northern Virginia), US West (Northern California), US West (Oregon), Asia Pacific (Tokyo), Asia Pacific (Singapore), Asia Pacific (Sydney), South America (Brazil), Europe (Ireland), 및 Europe (Frankfurt) 리전에서 바로 사용해 보실 수 있습니다.

AWS 프리 티어를 활용하여 t2.micro.elasticsearch 노드를 월 750시간 무료로 사용 가능합니다. 10 GB의 EBS 볼륨 역시 무료로 사용 가능합니다.

Jeff;

이 글은 New – Amazon Elasticsearch Service의 한국어 번역입니다.