ClickHouse 介绍
ClickHouse 是⼀个⽤于联机分析(OLAP)的列式数据库管理系统(DBMS)。根据其官⽅⽹站介绍,ClickHouse 作为⼀款⾼性能列式分析数据库,通过列式存储、向量化执⾏、压缩编码、分布式查询处理等⼿段实现了出⾊的写和查询吞吐能⼒、低延迟、⾼可扩展性。适⽤于⼤规模实时数据分析、事件⽇志处理、在线分析等应⽤场景。
ClickHouse 实时数据摄取
在许多业务场景下,如⼴告推荐、⾦融交易、⽹络监控、⽇志分析等,都对数据分析有很强的实时性需求。ClickHouse 作为具有实时数据摄⼊能⼒的⾯向 OLAP 的分析型数据库,实时数据摄取与分析是其重要应⽤场景之⼀。
在这篇⽂章中,我们会介绍通过 MSK,MSK Connect ClickHouse Kafka Connector 以及 Glue Schema Registry 来搭建端到端的全托管⽆服务器架构的实时数据摄取能⼒,并且通过 ClickHouse 与 S3 的集成来降低海量实时数据存储的成本。
整体架构
在整体架构中,我们要求数据⽣产者从数据源获取数据与 schema,并将 schema(Apache Avro 格式)注册到 AWS Glue Schema Registry 中,然后将带有 schema 信息的数据发送到 Amazon MSK 中。
在数据消费端,我们在 ClickHouse Kafka Connector 中增加了对于 AWS Glue Schema Registry 以及 Apache Avro 格式的⽀持,并将其部署在 Amazon MSK Connect 上。
ClickHouse Connector 从 Amazon MSK 中批量拉取消息并进⾏处理,然后插⼊到 ClickHouse 中。ClickHouse 使⽤ S3 作为持久存储。
ClickHouse 部署
我们可以根据官⽅⽂档将 ClickHouse 以单机⽅式或者集群⽅式部署到 EC2 上,并且参考⽂档将数据存储到 S3 上。需要注意,ClickHouse 需要的端⼝较多,需要在安全组中对应地打开。
安全组样例
也可以从 Marketplace 上搜索订阅 ClickHouse Cloud 或者 ByteHouse 等托管版本或者 ClickHouse Cluster on AWS 的⼀键部署⽅案。
Amazon MSK 以及 AWS Glue Schema Registry 部署
Amazon MSK 是 AWS 上提供的全托管的 Kafka 服务,其中 Amazon MSK Serverless 是其⽆服务器版本。MSK Connect 是 Amazon MSK 上的基于 Kafka Connect 的⼀项功能。通过不同的 Connector,MSK Connect 可以从不同的数据源抽取数据到 MSK 以及写⼊到不同的数据源。
AWS Glue Schema Registry 是亚⻢逊云科技上托管的 schema 注册发现中⼼,⽬前⽀持 Avro,JSON 以及 Protobuf 等格式。
具体步骤:
- 这⾥我们选⽤ Amazon MSK Serverless。Amazon MSK Serverless 配置相对简单,我们可以选择与 ClickHouse 联通的⼦⽹以及安全组
Amazon MSK Serverless ⽹络配置样例
- 获取连接信息
- 在 MSK Serverless 选⽤的⼦⽹内为 Glue Schema Registry 创建 VPCE(即使⼦⽹已经有通往 IGW/NATGW 的路由)
- 在 MSK Connect 下创建 Worker Configuration,注意 value.converter.endpoint 的值为 Glue Schema Registry 的 VPCE,其他配置根据消息内容调整
offset.storage.partitions=5
status.storage.partitions=2
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=true
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.region=us-west-2
value.converter.registry.name=default-registry
value.converter.schemaAutoRegistrationEnabled=true
value.converter.compatibility=BACKWARD
value.converter.avroRecordType=GENERIC_RECORD
value.converter.endpoint=https://vpce-YOUR_VPCE_DNS_NAME.glue.us-west-2.vpce.amazonaws.com
ClickHouse Kafka Connector 构建
ClickHouse Kafka Connector 是官⽅的从 Kafka Topic 中读取数据并写⼊ ClickHouse 的 Kafka Connect 插件。由于我们的 Apache Avro Schema 存储在 Glue Schema Registry 中,因此需要在官⽅ zip 包中增加两个 jar。
- 从 https://gcom/awslabs/aws-glue-schema-registry 下载源码,执⾏
得到 schema-registry-kafkaconnect-converter-1.1.18.jar
- 下载 mysql jdbc
- unzip ClickHouse Kafka Connector,添加上述 jar,然后再次 zip
- 上传到 S3 并且创建 MSK Connect Plugin
测试与验证
我们使⽤ ClickStream Kafka Producer 来进⾏测试。
- 创建可以连通 MSK Serverless 以及 ClickHouse 的 EC2(Kafka Producer)
- ⽤于连接 MSK Serverless 的 IAM Policy 并附加到 Kafka Producer 对应的 IAM Role 上
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster"
],
"Resource": "arn:aws:kafka:us-west-2:YOUR_ACCOUNT_ID:cluster/*"
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:DescribeTopic",
"kafka-cluster:CreateTopic",
"kafka-cluster:WriteData",
"kafka-cluster:ReadData"
],
"Resource": "arn:aws:kafka:us-west-2:YOUR_ACCOUNT_ID:topic/*"
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
],
"Resource": "arn:aws:kafka:us-west-2:YOUR_ACCOUNT_ID:group/*"
}
]
}
IAM Policy msk-serverless ⽰例
- 在 Kafka Producer 上使⽤ Kafka 客户端(例如 https://gcom/dpkp/kafkapython)创建 Topic
#!/usr/bin/python
# create_topic.py
from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import KafkaError
import socket
import time
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
class MSKTokenProvider():
def token(self):
token, _ = MSKAuthTokenProvider.generate_auth_token('us-west-2')
return token
tp = MSKTokenProvider()
client = KafkaAdminClient(
bootstrap_servers='YOUR_MSK_SERVERLESS_CONNECTION_INFO',
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=tp,
client_id=socket.gethostname(),
)
client.create_topics([NewTopic('ClickStream2',2,2)])
- 在 Kafka Producer 上安装 clickhouse-client 并创建表
CREATE TABLE ClickStream2
(
`ip` String,
`eventtimestamp` Int64,
`devicetype` String,
`event_type` String,
`product_type` String,
`userid` Int32,
`globalseq` Int64,
`prevglobalseq` Int64
)
ENGINE = MergeTree
ORDER BY globalseq
SETTINGS storage_policy = 's3_main';
- 参考 AWSServiceRoleForKafkaConnect,创建 MSK Connect 需要的 Execution Role。由于我们需要使⽤ Glue Schema Registry 和 MSK Serverless,因此也添加相应权限
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ec2:CreateNetworkInterface"
],
"Resource": "arn:aws:ec2:*:*:network-interface/*",
"Condition": {
"StringEquals": {
"aws:RequestTag/AmazonMSKConnectManaged": "true"
},
"ForAllValues:StringEquals": {
"aws:TagKeys": "AmazonMSKConnectManaged"
}
}
},
{
"Effect": "Allow",
"Action": [
"ec2:CreateNetworkInterface"
],
"Resource": [
"arn:aws:ec2:*:*:subnet/*",
"arn:aws:ec2:*:*:security-group/*"
]
},
{
"Effect": "Allow",
"Action": [
"ec2:CreateTags"
],
"Resource": "arn:aws:ec2:*:*:network-interface/*",
"Condition": {
"StringEquals": {
"ec2:CreateAction": "CreateNetworkInterface"
}
}
},
{
"Effect": "Allow",
"Action": [
"ec2:DescribeNetworkInterfaces",
"ec2:CreateNetworkInterfacePermission",
"ec2:AttachNetworkInterface",
"ec2:DetachNetworkInterface",
"ec2:DeleteNetworkInterface"
],
"Resource": "arn:aws:ec2:*:*:network-interface/*",
"Condition": {
"StringEquals": {
"ec2:ResourceTag/AmazonMSKConnectManaged": "true"
}
}
}
]
}
IAM Policy msk-connect-service-linked 示例
- 创建 MSK Connect,注意截图中的选项。其他可以保持默认
#Configuration settings
connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
hostname=HOST_NAME_OR_IP
database=default
password=CLICKHOUSE_CLIENT_PASSWORD
port=8123
tasks.max=1
topics=ClickStream2
errors.tolerance=all
exactlyOnce=false
ssl=false
username=default
选择上⼀章创建的 Worker configuration
- 等待 MSK Connect 创建成功并且 CloudWatch Logs 中没有报错信息
- 在 Kafka Producer 上下载并编译 ClickStream Producer,修改clickstreamproducer-for-apache-kafka/producer.properties
# producer.properties
BOOTSTRAP_SERVERS_CONFIG=YOUR_MSK_SERVERLESS_CONNECTION_INFO
- 执⾏命令开始⽣产数据
java -jar clickstream-producer-for-apache-kafka/target/KafkaClickstreamClient-1.0-SNAPSHOT.jar -t ClickStream2 -pfp /home/ubuntu/clickstream-producer-for-apache-kafka/producer.properties -nt 8 -rf 300 -iam --region us-west-2 -gsr -gsrr us-west-2 -gar
- 查看 CloudWatch Logs,ClickHouse 以及对应的 S3 路径,可以看到数据成功写⼊
- 观察⼀段时间后可以暂停 Kafka Producer。通过观察 MSK Serverless 的指标,可以看到数据⽣产和消费⼏乎没有延迟
总结
ClickHouse 在实时数据摄取及分析上具有强⼤的功能,但是整体端到端的部署和运维具有⼀定的难度。通过 MSK,MSK Connect 以及 Glue Schema Registry 等托管服务,可以在很⼤程度上减轻运维的复杂度,使得⽤户能够更轻松地发掘数据的价值。
本篇作者