亚马逊AWS官方博客
MSK 可靠性最佳实践
1. Amazon MSK 介绍
Kafka 作为老牌的开源分布式事件流平台,已经广泛用于如数据集成,流处理,数据管道等各种应用中。
亚马逊云科技也于 2019 年 2 月推出了 Apache Kafka 的云托管版本,Amazon MSK(Managed Streaming for Apache Kafka)。相较于传统的自建式 Kafka 集群,MSK 有如下几点优势:
- 快速部署:作为完全托管的云服务,Amazon MSK 提供了原生多可用区(AZ)的 Apache Kafka 集群部署模式,可以帮助客户快速配置并部署高度可用的 Apache Kafka 集群, 且 AZ 之间的流量传输是免费的。
- 降低运维复杂度:Amazon MSK 会自动检测底层服务器,并在出现故障时进行替换,也会编排服务器的补丁与升级,同时还确保数据得到长久存储和保护,方便快捷地查看监控指标并设置警报。
- 弹性扩展:充分利用云服务的资源弹性能力,可以根据负载变化自动进行扩展。
- 与其他 AWS 服务紧密集成:Amazon MSK 与 AWS Identity and Access Management (IAM) 和 AWS Certificate Manager 集成以实现安全性;与 AWS Glue Schema Registry 集成用于 schema 管理;与 Amazon Kinesis Data Analytics 和 AWS Lambda 集成用于流式传输处理,等等。使应用程序的开发更简单。
- MSK 已经推出 Serverless 版,无需管理和扩展集群容量,按照流量付费,对于有明显的波峰波谷业,开发测试等场景,是很好的选择。
下面对比图很清晰地描述了在本地、EC2 部署 Apache Kafka 以及 Amazon MSK 的对比。使用 Amazon MSK 服务,可以让用户将更多的时间与精力关注在应用开发与运行上。
2. 构建可靠的消息队列平台
Amazon MSK 作为托管服务,除了上面提到的优势外,还提供了基础设施稳定运行的保障(例如底层的 EC2,EBS 卷等,分别有其对应的 SLA 保障)。但是,对于底层硬件,达到 0% 的故障率是非常难以企及的事情,我们无法忽视这部分不确定性对业务带来的影响。所以在构建 Apache Kafka 的消息队列平台时,仍需根据不同场景,调整 Kafka 相关参数,达到可靠性要求。而在我们提到可靠性时,往往表示的是整个系统的可靠性,而非单个组件的可靠性。所以在调整时,除了集群本身的参数外,还包括客户端的参数。
为避免歧义,我们定义在本文中提到的“可靠”的含义。其表示的是:在出现不同的系统故障时,Kafka 消息队列系统仍能正常对外提供服务,不影响业务的正常运行且尽可能减少数据丢失的风险。
下面我们会首先介绍 Kafka 提供的基本保障,以及基于这几个基本保障之上,Kafka 集群、生产者客户端,以及消费者客户端如何进行配置,以达到不同的保障性需求。
注意,下面文字内容较多,可以优先参考文档第 5 部分的“配置总结”快速进行相关参数配置。
2.1. Kafka提供的保障
首先,我们先看看 Kafka 提供的 4 个基本保障:
- 同一 partition 内消息的顺序在写入后不再变化;
- 一条消息在写入到其 partition 对应的所有 in-sync replicas(不一定已经 flush 到 disk)后,此消息才视为 committed。而 Producer 可以配置是否接受 ack 以及 ack 级别;
- 被 committed 消息,只要有至少一个 replica 存活,则消息不会丢失;
- Consumer 只能读取到 committed 的消息。
在这 4 个基本保障中,与可靠性相关的部分主要为第 2 条,其中涉及到的 Replication(副本)是 Kafka 提供可靠性保障的一个重要功能。也是 Kafka 实现高可用与数据高持久性的核心要素。
首先简单地介绍 Replication 的功能(若希望了解更多细节,请参考文档[2]):在 Kafka 中,每个 partition 可以有多个 replica,其中 1 个为 leader,其他的为 in-sync replica。Leader replica 负责写入,以及消费。其他的 in-sync replica 只需要与 leader 保持同步状态并实时复制消息(也可以配置消费者从 replica 消费数据)。如果 leader 不可用,则其他 replica 会选举为新的 leader。
下面我们围绕此机制展开,介绍构建可靠的 Kafka 消息队列平台时,需要考虑的相关要点。
2.2. Broker配置
在 broker 层面,与可靠性相关的配置主要有 3 个,分别为 default.replication.factor,unclean.leader.election.enable 以及min.insync.replicas。
Replication Factor
Replication factor 控制的是消息备份数,topic 级别的配置项为 replication.factor,broker 级别的配置项为 default.replication.factor。在 MSK 中,default.replication.factor 默认为 3 for 3-AZ clusters, 2 for 2-AZ clusters。
Replication factor 设置为 N 时,可以允许 N-1 个 broker 丢失时还能正常提供 topic 的读写服务。所以此参数的值越高,系统的可用性、可靠性越高,故障容忍度也更高。但另一方面,我们需要的 broker 数(至少 N 个 broker),磁盘量也会更多(N 倍的数据存储量),同时 broker 之间的流量也会增加(N-1 倍的复制流量)。所以这里在可用性与硬件设施上需要有所权衡。
另一方面,在数据冗余方面,Kafka 会确保同一个 partition 的各个 replica 分布在不同的 broker 上。但仅有这点是不够安全的,最好还将 broker 分布在不同的机架甚至不同的数据中心,达到更高的数据持久型保障。而 Amazon MSK 便是将 broker 跨多个可用区进行部署。
对于 replication factor 参数,常规调整准则为:
- 以 3 为起始(当然至少需要有 3 个 brokers,同时也不建议一个 Kafka 集群中节点数少于 3 个节点);
- 如果由于 replication 导致性能成为了瓶颈,则建议使用一个性能更好的 broker,而不是降低 RF 的值;
- 永远不要在生产环境中设置 RF 为 1。
Unclean Leader 选举
对于 1 个 partition 来说,如果其 leader replica 出现故障且不再可用,则会在其 in-sync replicas 中选举一个新的 leader replica。对于这个过程,我们称为一个“clean”选举,因为这个过程没有 committed 数据丢失,committed 数据存在于所有 in-sync replica 上。
与“clean”选举相反的为“unclean”选举,其表示的含义是:并不一定要强制从 in-sync replica 中选举 leader,也允许 out-of-sync replica 选举成为 leader。Out-of-sync replica,如字面意思,表示此 replica 与 leader 中间存在同步落后的情况(落后的原因可能包括网络延迟,broker 故障等)。
为什么需要考虑允许 out-of-sync replica 选举成为 leader?举一个例子,假设一个 partition 有 3 个 replica,有 2 个 in-sync replica 已经出现故障,仅有 leader 在正常工作。过了一会儿,leader 出现故障,并很快有 1 个 replica 恢复。此时,这个 replica 为 out-of-sync replica(因为中间有一段时间未与 leader 同步数据)。
若是不允许 out-of-sync replica 选举成为 leader,则此 partition 将无法继续提供服务,除非等到 leader 恢复正常。若是允许 out-of-sync replica 选举成为 leader,则此时 partition 可以继续提供服务,但是会有部分数据丢失(之前未同步的数据)。所以,这里也涉及到可用性与数据持久性的权衡。
控制此行为的参数为 unclean.leader.election.enable,在 broker 级别(实际一般在集群级别配置)适用。在 MSK 中默认为 true,也即是说允许“unclean”选举。在使用时,需要根据实际场景,在可用性与数据持久性之间进行权衡。
最少 In-Sync Replicas
从我们上面的例子可以看到,造成数据丢失的主要原因是:leader 与 replica 之间存在同步落后,且允许选举 out-of-sync replica 为 leader。这里的问题在于:in-sync replica 故障后,仍可以往 leader 正常写入,导致了同步落后。
在 Kafka 提供的保障中,消息在写入 leader 及其所有 in-sync replicas 后,才视为 committed。但是这里“所有 in-sync replicas”仅包含 in-sync replicas,并不考虑 out-of-sync replica。也就是说,如果当前 partition 有 leader 以及 2 个 out-of-sync replica,则消息在写入 leader 后,便视为 committed。
如果需要保证消息至少写入到多个 in-sync replicas 后才视为 committed,则可以通过参数 min.insync.replicas 进行控制(可以在broker级别或是topic级别设置)。
举个例子,假设我们 MSK 使用 3 broker,default.replication.factor=3,min.insync.replicas=2。则至少有 2 个 replica 为 in-sync 状态时(包括 leader),此 topic 的 partition 才能被生产者正常写入。如果 1 个 replica 发生故障(例如 AZ 宕机,broker 故障等等),topic 仍能正常进行读写。而若是有 2 个 replica 同时发生故障时,broker 会拒绝所有写入(Producer 会收到 NOT_ENOUGH_REPLICAS 异常),topic 转为只读。
我们再回看在“Unclean Leader 选举”中的例子。在设置了 min.insync.replicas=2 后,若是有 2 个 replica 出现故障,则无法再进行写入,继而避免了后续更高的同步落后,数据持久性会更高。不过此时并不能保证一定不会有数据丢失。举一个非常极端的例子,假设 leader 与 1 个 in-sync replica A 正常工作,另一个 replica B 出现故障,此时 topic 仍能对外提供服务。过了一会儿,replica B 恢复(此时为 out-of-sync replica),但是马上 leader 与 replica A 故障。若是 unclean.leader.election.enable 为 true,则 replica B 选举为 leader。此时虽然 producer 无法继续写入,但是 replica B 与原 leader 之间未同步的数据会丢失。
在 MSK 中,min.insync.replicas 参数默认为 2 for 3-AZ clusters, 1 for 2-AZ clusters。更高的值可以提供更高的数据持久性,但是会牺牲一定的可用性。
2.3. Producer配置
系统可靠性强调的是整体的可靠性,即使我们在 broker 端下足功夫调整配置,使其达到尽可能高的可靠性。但如果 producer 端发送消息的机制不够可靠,则整个系统仍有丢失数据的风险。
在 Producer 部分,实现可靠的 Producer 主要需要考虑2点:
- 根据可靠性需求,设置合理的 acks 参数值
- 妥善处理 Producer 异常
acks 有 3 种模式,分别为:
- acks=0:producer 在成功发送一条消息出去后,不需要 broker 端的 response 确认,即可认为写入成功。 如果消息发送到 broker 端时,broker 下线或是发生了故障,则 Producer 端无法感知,且这部分数据会丢失。这种模式下,由于不需要 broker 的确认,所以写入延迟较低。但是并非代表端到端延迟也低,因为只有消息从 leader 同步到所有 in-sync replica 后,consumer 才能读取到这条消息。
- acks=1:producer 在成功发送一条消息出去后,需要等待 leader 成功后写入并返回 ack 后,才认为写入成功。但是如果 leader 在尚未将消息同步到 in-sync replica 前发生故障,则仍会丢失此消息。
- acks=all:producer 在成功发送一条消息出去后,需要等待 leader 及其所有的 in-sync replicas 成功写入后,返回 ack,才认为写入成功。此参数必须与前面介绍的 insync.replicas 一起使用(控制最小所需写入的 in-sync replica 数量)。此参数是最安全的选项,能够确保消息完全写入到多个 replica。不过显而易见,也会引入更高的延迟。
除了 acks 的配置外,Producer 端还需要注意的一个点是异常的处理。为了防止一些瞬时的错误(例如 NotEnoughReplicasException)影响整个应用,一般我们需要处理一些异常,以避免数据丢失。
Kafka- 里的异常分两种:
- 可重试异常(例如 leader 不可用):对这类异常 Producer 会自动进行重试。为了避免 Producer 端耗尽重试次数而导致数据丢失,建议配置 Producer 的重试次数为 MAX_VALUE,并配置 delivery.timeout.ms 为最长的等待时间。让 Producer 可以在遇到此类异常时不断重试,直到写入成功。
- 不可重试异常(例如配置参数不对,消息大小问题等):对这类问题,一般属于客户端问题,Producer 也不会自动进行重试。从系统稳定性考量,开发人员需要根据业务场景,妥善处理这类异常,防止应用退出。
最后,重试会带来的一个风险是消息的重复。重试可以保证消息传递的语义为 at-least-once,而非 exactly-once。启用 enable.idempotence=true 的配置可以使得 Producer 引入额外的信息到 record 中,并以此让 Brokers 可以过滤掉由于重试引入的重复消息。
2.4. Idempotent Producer
首先,什么叫做幂等(Idempotent)?其表示的是:对于一个操作,如果执行多次的结果和执行一次的结果都是完全一致,这样的操作叫做幂等操作。
前面提到,在 Kafka producer 中,在通过重试做到消息传递的 at least once 语义后,同一条消息可能会多次传输,也就最终会导致可能的下游重复事件。
一个常见的情况是:producer 发送一条消息到 leader,leader 在接收到后,将消息成功的复制到了 replica,但是在 leader 向 producer 发送确认 ack 时,leader 所在的 broker 宕机,导致没有正常将 ack 发送回 producer。此时从 producer 的角度,它是没有收到确认消息的,所以会尝试重发消息。这条消息会到达一个新的 leader,它本身已经包含了上一条发送的消息,所以这种情况下就导致了消息的重复。
在某些情况下,重复消息不会对下游带来影响(例如下游是 put 到 nosql 数据库),但在部分情况下会导致下游数据重复(例如下游是 append 到数据库)。
Kafka 的 idempotent producer(幂等生产者)功能可以解决这种问题,其方法是自动检测并解决这类重复消息。
2.4.1. 如何解决幂等问题
在启用 Kafka 幂等生产者的功能后,每条消息会带上一个唯一的标识 id,称为 PID(Producer ID),以及一个序列号。这个 PID 与序列号加上 topic 与 partition 信息,可以唯一标识一条消息。
Broker 会用这个唯一标识消息的信息来跟踪最近 5 条(由参数 max.inflight.requests 来控制,默认值为 5)发送到每个 partition 的消息。当 broker 收到一条之前已经收到过的消息时,会拒绝这条消息并返回 producer 一条提示报错信息。在 producer 端,这个信息会加入到 record-error-rate 的指标中。在 broker 端,这个报错会归类于 ErrorsPerSec 的指标,属于 RequestMetrics 类型。
2.4.2. 幂等 producer 的作用范围
Idempotent producer 仅用于防止由于 producer retry 机制导致的消息重复。例如由于 producer 没收到 ack,或者由于网络,broker 问题导致的消息没有发送成功,从而 producer 自动尝试,并由此导致的可能的消息重复问题。但如果是应用层的重试,例如应用端发现消息未发送成功,把失败的消息放入重试队列,然后再次调用 send()方法发送消息。这类重试的消息会有新的唯一标识信息,不在 idempotent producer 处理的范围内。
2.4.3. 如何配 idempotent producer
在 producer 端加上配置 enable.idempotence=true 即可。在开启这个功能后,有如下几点需要了解:
- 由于要从 broker 取 producer ID,所以 producer 会在启动时额外调用一个 API。
- 每个发送的 record batch 会包含 producer ID 以及第 1 条消息的 sequence ID(batch 中第 1 条之后的消息的 sequence ID 是由第 1 条消息的 sequence ID 加上一个 delta 增量得来)。这些新的字段会增加每个 record batch 额外 96bits(producer ID 是 long 类型,sequence 是 integer 类型)。
- Broker 会验证每个 producer 发过来的消息中 sequence number,并判断是否为重复消息。
- Partition 内的消息顺序仍能保持。
2.5. Consumer 配置
Consumer 在消费 Kafka 数据时,主要需要确保的一点便是: 不漏消费消息。保障此行为的机制便是 offset commit。
Consumer 在消费了目标 topic 的消息后,可以自动(按固定时间间隔)或手动的方式完成 offset commit。Committed offset 会记录在 Kafka 的__consumer_offsets topic 内(由 group.id 的值进行 hash 并写入不同 partition),下次消费会按照上次 offset 的位置继续消费。
与 offset commit 相关的有 4 个重要的配置:
- id:用于标识消费组
- offset.reset:如果当前消费组对目标 topic 没有 committed offset 时(例如 Consumer 第一次消费),Consumer 使用的 offset 位置(可配置为 earliest 或 latest,默认为 latest)
- auto.commit:是否定期自动 commit offset
- commit.interval.ms:若是启用了自动 commit offset,指定自动 commit 的时间间隔,默认为 5s。一般来说,间隔更短,Consumer 的资源开销会更大。但是在 Consumer 停止并重新启动后,处理的重复数据更少。
可以看到,在使用自动 commit offset 机制时,消息传递语义为 at-most-once。会引入重复数据消费以及数据可能丢失的问题。例如,假设 Consumer 在拉取了 Kafka 的消息后,需要写入下游数据库。在写入数据库时发生异常,导致一直未能写入成功并最终失败。由于自动 commit offset 的机制,Consumer 仍会自动将已消费消息的 offset commit 到 Kafka 端。若是这部分异常未能正常处理,则未能写入下游的数据便会丢失。
所以,在复杂场景下,需要更高的端到端一致性以及准确性时,建议手动做 commit offset。因为自动 commit offset 的机制无法保证消息已被下游正确处理。
在执行手动 commit offset 时,常规考虑要点为:
- 在消息被下游完全处理后,再手动 commit offset。
- Commit offset 是一个较为耗费资源的操作。更频繁的 commit 可以减少 Consumer 停止并重启后重复消费数据的概率,但是会带来更高的负载。此处需要对吞吐与可能的重复数据之间进行权衡。
- Consumers 重试。例如上面介绍的 Consumer 消费数据并向下游数据库写数据的例子,Consumer 可能会遇到处理一个批次消息时,部分消息写入下游数据库失败。此时 Consumer 端要做好重试的机制,例如将失败的消息进行缓存,并进行重试。在重试时,对消费 Kafka 消息的行为有 2 种处理方式:
- 使用 Consumer pause()方法,确保 polls 不再返回新数据,优先将当前批次数据完全处理(一致性优先考量)。
- 将失败的消息写入到另一个重试 topic,并使用另一个消费组来处理失败的消息。当前 Consumer 可以继续正常消费并处理(吞吐优先考量)。
可以看到,手动执行 commit offset 的机制保证的消息传递语义为 at-least-cone 语义。也就是说,消息至少会被消费一次,但是仍可能会带来可能的数据重复消费的问题。例如在处理一个批次消息时 Consumer 发生异常退出,但是部分消息已经写入了下游。重启 Consumer 后,仍会从这个批次的 offset 起始位置消费,并再次处理同一批数据(包含上次已经写入下游的部分数据)。
针对这种 at-least-once 语义场景,常规的处理方式是:保证下游是幂等的(idempotent)。也就是说,重复数据在写入下游时,重复消息不会对下游应用的准确性产生影响。例如下游写入 ElasticSearch 时,使用_id 字段唯一识别一条消息,重复数据写入时也是覆盖同一个文档。
2.5.1. 流处理引擎 offset 管理
最后,值得提到的一点是,在流处理引擎如 Spark Structured Streaming 以及 Flink 里,offset commit 的行为有所不同。
在 Spark Structured Streaming 中,(即使指定了group.id 的配置)Kafka Source 也不会 commit offset 到 Kafka 的,而是使用其 checkpoint 机制进行管理并做故障恢复。每次在执行 checkpoint 时,会将当前消费的 offset 记录在其 checkpoint 文件内。
若是有需求监控消费组的 lag,则需要手动做 commit offset 到 Kafka 的操作。其中开源社区提供了一个方法可以直接参考[4]。
在 Flink 中,如果未启用 checkpoint,则使用的自动 commit offset 的方式,将 offset 定期提交到 Kafka(仍通过 enable.auto.commit 与 auto.commit.interval.ms 进行配置)。
在启用 checkpoint 后,同样通过 checkpoint 机制进行管理并做故障恢复。在 checkpoint 完成后(offset 已写入 checkpoint 文件),再将 offset commit 到 Kafka。但是在故障恢复时,仅使用 checkpoint 内的 offset。Kafka 端的 offset 仅用于做消费 lag 监控。
3. 监控
对于平台的稳定运行,监控是必不可少的。关键指标监控与报警机制可以让我们及时地发现平台的问题,并以自动或手动的方式介入,避免对系统稳定造成进一步的影响。
对于 AWS MSK,官方提供了 2 种查看监控指标的方式:CloudWatch 与 Prometheus。具体介绍可以参考文档[5]。
在监控指标方面,提供了不同级别的指标,包括 4 种级别,分别为:DEFAULT,PER_BROKER,PER_TOPIC_PER_BROKER, PER_TOPIC_PER_PARTITION。除了 DEFAULT 级别外,开启另外 3 个级别需要额外收费。所有可监控 CloudWatch 指标可以参考文档[6]。
常用集群负载与稳定性监控指标包括:
- ActiveControllerCount:应永远为 1
- Broker 的 CPU User 与 CPU System:两者之和应低于 60%
- Broker 的 KafkaDataLogsDiskUsed:使用率应低于 85%
- HeapMemoryAfterGC:应持续保持在 60% 以下
- 消费者的 Consumer Lag:消费是否能跟上
4. 验证
下面我们使用此配置验证 Kafka 的高可用。
首先创建一个 3-AZ 的 MSK 集群。
使用 Kafka CLI 创建 1 个 topic:
- ha-topic:RF=3,insync.replicas=2
创建 safe-producer.config 配置文件:
使用 kafka-verifiable-consumer 消费消息:
使用 kafka-verifiable-producer 生产消息:
然后滚动重启 broker。每次重启 1 个 broker,并在重启完毕后再重启第 2 个 broker(在本次测试环境中,重启一个 broker 约 4 分钟左右):
可以看到在 1 个 broker 重启:
在此过程中,Consumer 未遇到任何报错。Producer 遇到了以下 Error 信息,但很快即恢复,并继续生产消息:
虽然出现了 Error 信息,但从报错前后的信息来看,数据 1340,1341,1342 均写入了 Kafka。并且可以在 Consumer 端打印的日志找到,数据并未出现丢失。并且可以看到在 partition 级别,数据仍保持了有序:
5. 配置总结
根据以上介绍,总结在使用 AWS MSK 时,构建可靠高可用的 MSK 集群的配置为:
- MSK 多 AZ
- 使用 3-AZ 集群
- Broker/集群端配置
- default.replication.factor 设置至少为 3(MSK 为 3-AZ 时,默认为 3)
- min.insync.replicas 设置为 RF-1。例如在 RF 为 3 时,minISR 设置为 2
- Producer 端配置
- acks=all
- 对于可重试异常,配置无限次重试:retries=MAX_INT
- 配置 delivery.timeout.ms 为最长的等待时间
- 在连接串里填写所有 broker 地址
- (可选)基于是否需要处理由于重试导致的重复数据,配置 enable.idempotence
- Consumer 端配置
- 使用主动 commit offset 的机制,在数据被下游正常处理后再 commit offset 到 kafka
由于 MSK 会自动做底层硬件的维护以及软件的补丁、升级等工作。其工作方式是做滚动升级,也就是说,在执行过程中会每次下线一台 broker,以滚动的方式进行维护。上述配置可以实现在这些场景下(包括替换 broker)MSK 的高可用,同时尽可能避免数据的丢失。
参考文档
[1] MSK 特点介绍:https://aws.amazon.com/cn/msk/features/
[2] Kafka 数据复制: https://developer.confluent.io/learn-kafka/architecture/data-replication/
[3] MSK 默认配置:https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html
[4] Spark Structured Streaming commit offset: https://github.com/HeartSaVioR/spark-sql-kafka-offset-committer
[5] Monitoring an Amazon MSK cluster: https://docs.aws.amazon.com/msk/latest/developerguide/monitoring.html
[6] Amazon MSK metrics for monitoring with CloudWatch:https://docs.aws.amazon.com/msk/latest/developerguide/metrics-details.html
[7] Kafka The Definitive Guide:https://learning.oreilly.com/library/view/kafka-the-definitive/9781491936153/
[8] MSK Best Practice:https://docs.aws.amazon.com/msk/latest/developerguide/bestpractices.html