亚马逊AWS官方博客

MSK 可靠性最佳实践

1. Amazon MSK 介绍

Kafka 作为老牌的开源分布式事件流平台,已经广泛用于如数据集成,流处理,数据管道等各种应用中。

亚马逊云科技也于 2019 年 2 月推出了 Apache Kafka 的云托管版本,Amazon MSK(Managed Streaming for Apache Kafka)。相较于传统的自建式 Kafka 集群,MSK 有如下几点优势:

  • 快速部署:作为完全托管的云服务,Amazon MSK 提供了原生多可用区(AZ)的 Apache Kafka 集群部署模式,可以帮助客户快速配置并部署高度可用的 Apache Kafka 集群,  且 AZ 之间的流量传输是免费的。
  • 降低运维复杂度:Amazon MSK 会自动检测底层服务器,并在出现故障时进行替换,也会编排服务器的补丁与升级,同时还确保数据得到长久存储和保护,方便快捷地查看监控指标并设置警报。
  • 弹性扩展:充分利用云服务的资源弹性能力,可以根据负载变化自动进行扩展。
  • 与其他 AWS 服务紧密集成:Amazon MSK 与 AWS Identity and Access Management (IAM) 和 AWS Certificate Manager 集成以实现安全性;与 AWS Glue Schema Registry 集成用于 schema 管理;与 Amazon Kinesis Data Analytics 和 AWS Lambda 集成用于流式传输处理,等等。使应用程序的开发更简单。
  • MSK 已经推出 Serverless 版,无需管理和扩展集群容量,按照流量付费,对于有明显的波峰波谷业,开发测试等场景,是很好的选择。

下面对比图很清晰地描述了在本地、EC2 部署 Apache Kafka 以及 Amazon MSK 的对比。使用 Amazon MSK 服务,可以让用户将更多的时间与精力关注在应用开发与运行上。

2. 构建可靠的消息队列平台

Amazon MSK 作为托管服务,除了上面提到的优势外,还提供了基础设施稳定运行的保障(例如底层的 EC2,EBS 卷等,分别有其对应的 SLA 保障)。但是,对于底层硬件,达到 0% 的故障率是非常难以企及的事情,我们无法忽视这部分不确定性对业务带来的影响。所以在构建 Apache Kafka 的消息队列平台时,仍需根据不同场景,调整 Kafka 相关参数,达到可靠性要求。而在我们提到可靠性时,往往表示的是整个系统的可靠性,而非单个组件的可靠性。所以在调整时,除了集群本身的参数外,还包括客户端的参数。

为避免歧义,我们定义在本文中提到的“可靠”的含义。其表示的是:在出现不同的系统故障时,Kafka 消息队列系统仍能正常对外提供服务,不影响业务的正常运行且尽可能减少数据丢失的风险。

下面我们会首先介绍 Kafka 提供的基本保障,以及基于这几个基本保障之上,Kafka 集群、生产者客户端,以及消费者客户端如何进行配置,以达到不同的保障性需求。

注意,下面文字内容较多,可以优先参考文档第 5 部分的“配置总结”快速进行相关参数配置。

2.1. Kafka提供的保障

首先,我们先看看 Kafka 提供的 4 个基本保障:

  • 同一 partition 内消息的顺序在写入后不再变化;
  • 一条消息在写入到其 partition 对应的所有 in-sync replicas(不一定已经 flush 到 disk)后,此消息才视为 committed。而 Producer 可以配置是否接受 ack 以及 ack 级别;
  • 被 committed 消息,只要有至少一个 replica 存活,则消息不会丢失;
  • Consumer 只能读取到 committed 的消息。

在这 4 个基本保障中,与可靠性相关的部分主要为第 2 条,其中涉及到的 Replication(副本)是 Kafka 提供可靠性保障的一个重要功能。也是 Kafka 实现高可用与数据高持久性的核心要素。

首先简单地介绍 Replication 的功能(若希望了解更多细节,请参考文档[2]):在 Kafka 中,每个 partition 可以有多个 replica,其中 1 个为 leader,其他的为 in-sync replica。Leader replica 负责写入,以及消费。其他的 in-sync replica 只需要与 leader 保持同步状态并实时复制消息(也可以配置消费者从 replica 消费数据)。如果 leader 不可用,则其他 replica 会选举为新的 leader。

下面我们围绕此机制展开,介绍构建可靠的 Kafka 消息队列平台时,需要考虑的相关要点。

2.2. Broker配置

在 broker 层面,与可靠性相关的配置主要有 3 个,分别为 default.replication.factor,unclean.leader.election.enable 以及min.insync.replicas。

Replication Factor

Replication factor 控制的是消息备份数,topic 级别的配置项为  replication.factor,broker 级别的配置项为 default.replication.factor。在 MSK 中,default.replication.factor 默认为 3 for 3-AZ clusters, 2 for 2-AZ clusters。

Replication factor 设置为 N 时,可以允许 N-1 个 broker 丢失时还能正常提供 topic 的读写服务。所以此参数的值越高,系统的可用性、可靠性越高,故障容忍度也更高。但另一方面,我们需要的 broker 数(至少 N 个 broker),磁盘量也会更多(N 倍的数据存储量),同时 broker 之间的流量也会增加(N-1 倍的复制流量)。所以这里在可用性与硬件设施上需要有所权衡。

另一方面,在数据冗余方面,Kafka 会确保同一个 partition 的各个 replica 分布在不同的 broker 上。但仅有这点是不够安全的,最好还将 broker 分布在不同的机架甚至不同的数据中心,达到更高的数据持久型保障。而 Amazon MSK 便是将 broker 跨多个可用区进行部署。

对于 replication factor 参数,常规调整准则为:

  • 以 3 为起始(当然至少需要有 3 个 brokers,同时也不建议一个 Kafka 集群中节点数少于 3 个节点);
  • 如果由于 replication 导致性能成为了瓶颈,则建议使用一个性能更好的 broker,而不是降低 RF 的值;
  • 永远不要在生产环境中设置 RF 为 1。

Unclean Leader 选举

对于 1 个 partition 来说,如果其 leader replica 出现故障且不再可用,则会在其 in-sync replicas 中选举一个新的 leader replica。对于这个过程,我们称为一个“clean”选举,因为这个过程没有 committed 数据丢失,committed 数据存在于所有 in-sync replica 上。

与“clean”选举相反的为“unclean”选举,其表示的含义是:并不一定要强制从 in-sync replica 中选举 leader,也允许 out-of-sync replica 选举成为 leader。Out-of-sync replica,如字面意思,表示此 replica 与 leader 中间存在同步落后的情况(落后的原因可能包括网络延迟,broker 故障等)。

为什么需要考虑允许 out-of-sync replica 选举成为 leader?举一个例子,假设一个 partition 有 3 个 replica,有 2 个 in-sync replica 已经出现故障,仅有 leader 在正常工作。过了一会儿,leader 出现故障,并很快有 1 个 replica 恢复。此时,这个 replica 为 out-of-sync replica(因为中间有一段时间未与 leader 同步数据)。

若是不允许 out-of-sync replica 选举成为 leader,则此 partition 将无法继续提供服务,除非等到 leader 恢复正常。若是允许 out-of-sync replica 选举成为 leader,则此时 partition 可以继续提供服务,但是会有部分数据丢失(之前未同步的数据)。所以,这里也涉及到可用性与数据持久性的权衡。

控制此行为的参数为 unclean.leader.election.enable,在 broker 级别(实际一般在集群级别配置)适用。在 MSK 中默认为 true,也即是说允许“unclean”选举。在使用时,需要根据实际场景,在可用性与数据持久性之间进行权衡。

最少 In-Sync Replicas

从我们上面的例子可以看到,造成数据丢失的主要原因是:leader 与 replica 之间存在同步落后,且允许选举 out-of-sync replica 为 leader。这里的问题在于:in-sync replica 故障后,仍可以往 leader 正常写入,导致了同步落后。

在 Kafka 提供的保障中,消息在写入 leader 及其所有 in-sync replicas 后,才视为 committed。但是这里“所有 in-sync replicas”仅包含 in-sync replicas,并不考虑 out-of-sync replica。也就是说,如果当前 partition 有 leader 以及 2 个 out-of-sync replica,则消息在写入 leader 后,便视为 committed。

如果需要保证消息至少写入到多个 in-sync replicas 后才视为 committed,则可以通过参数 min.insync.replicas 进行控制(可以在broker级别或是topic级别设置)。

举个例子,假设我们 MSK 使用 3 broker,default.replication.factor=3,min.insync.replicas=2。则至少有 2 个 replica 为 in-sync 状态时(包括 leader),此 topic 的 partition 才能被生产者正常写入。如果 1 个 replica 发生故障(例如 AZ 宕机,broker 故障等等),topic 仍能正常进行读写。而若是有 2 个 replica 同时发生故障时,broker 会拒绝所有写入(Producer 会收到 NOT_ENOUGH_REPLICAS 异常),topic 转为只读。

我们再回看在“Unclean Leader 选举”中的例子。在设置了 min.insync.replicas=2 后,若是有 2 个 replica 出现故障,则无法再进行写入,继而避免了后续更高的同步落后,数据持久性会更高。不过此时并不能保证一定不会有数据丢失。举一个非常极端的例子,假设 leader 与 1 个 in-sync replica A 正常工作,另一个 replica B 出现故障,此时 topic 仍能对外提供服务。过了一会儿,replica B 恢复(此时为 out-of-sync replica),但是马上 leader 与 replica A 故障。若是 unclean.leader.election.enable 为 true,则 replica B 选举为 leader。此时虽然 producer 无法继续写入,但是 replica B 与原 leader 之间未同步的数据会丢失。

在 MSK 中,min.insync.replicas 参数默认为 2 for 3-AZ clusters, 1 for 2-AZ clusters。更高的值可以提供更高的数据持久性,但是会牺牲一定的可用性。

2.3. Producer配置

系统可靠性强调的是整体的可靠性,即使我们在 broker 端下足功夫调整配置,使其达到尽可能高的可靠性。但如果 producer 端发送消息的机制不够可靠,则整个系统仍有丢失数据的风险。

在 Producer 部分,实现可靠的 Producer 主要需要考虑2点:

  • 根据可靠性需求,设置合理的 acks 参数值
  • 妥善处理 Producer 异常

acks 有 3 种模式,分别为:

  • acks=0:producer 在成功发送一条消息出去后,不需要 broker 端的 response 确认,即可认为写入成功。 如果消息发送到 broker 端时,broker 下线或是发生了故障,则 Producer 端无法感知,且这部分数据会丢失。这种模式下,由于不需要 broker 的确认,所以写入延迟较低。但是并非代表端到端延迟也低,因为只有消息从 leader 同步到所有 in-sync replica 后,consumer 才能读取到这条消息。
  • acks=1:producer 在成功发送一条消息出去后,需要等待 leader 成功后写入并返回 ack 后,才认为写入成功。但是如果 leader 在尚未将消息同步到 in-sync replica 前发生故障,则仍会丢失此消息。
  • acks=all:producer 在成功发送一条消息出去后,需要等待 leader 及其所有的 in-sync replicas 成功写入后,返回 ack,才认为写入成功。此参数必须与前面介绍的 insync.replicas 一起使用(控制最小所需写入的 in-sync replica 数量)。此参数是最安全的选项,能够确保消息完全写入到多个 replica。不过显而易见,也会引入更高的延迟。

除了 acks 的配置外,Producer 端还需要注意的一个点是异常的处理。为了防止一些瞬时的错误(例如 NotEnoughReplicasException)影响整个应用,一般我们需要处理一些异常,以避免数据丢失。

Kafka- 里的异常分两种:

  • 可重试异常(例如 leader 不可用):对这类异常 Producer 会自动进行重试。为了避免 Producer 端耗尽重试次数而导致数据丢失,建议配置 Producer 的重试次数为 MAX_VALUE,并配置 delivery.timeout.ms 为最长的等待时间。让 Producer 可以在遇到此类异常时不断重试,直到写入成功。
  • 不可重试异常(例如配置参数不对,消息大小问题等):对这类问题,一般属于客户端问题,Producer 也不会自动进行重试。从系统稳定性考量,开发人员需要根据业务场景,妥善处理这类异常,防止应用退出。

最后,重试会带来的一个风险是消息的重复。重试可以保证消息传递的语义为 at-least-once,而非 exactly-once。启用 enable.idempotence=true 的配置可以使得 Producer 引入额外的信息到 record 中,并以此让 Brokers 可以过滤掉由于重试引入的重复消息。

2.4. Idempotent Producer

首先,什么叫做幂等(Idempotent)?其表示的是:对于一个操作,如果执行多次的结果和执行一次的结果都是完全一致,这样的操作叫做幂等操作。

前面提到,在 Kafka producer 中,在通过重试做到消息传递的 at least once 语义后,同一条消息可能会多次传输,也就最终会导致可能的下游重复事件。

一个常见的情况是:producer 发送一条消息到 leader,leader 在接收到后,将消息成功的复制到了 replica,但是在 leader 向 producer 发送确认 ack 时,leader 所在的 broker 宕机,导致没有正常将 ack 发送回 producer。此时从 producer 的角度,它是没有收到确认消息的,所以会尝试重发消息。这条消息会到达一个新的 leader,它本身已经包含了上一条发送的消息,所以这种情况下就导致了消息的重复。

在某些情况下,重复消息不会对下游带来影响(例如下游是 put 到 nosql 数据库),但在部分情况下会导致下游数据重复(例如下游是 append 到数据库)。

Kafka 的 idempotent producer(幂等生产者)功能可以解决这种问题,其方法是自动检测并解决这类重复消息。

2.4.1. 如何解决幂等问题

在启用 Kafka 幂等生产者的功能后,每条消息会带上一个唯一的标识 id,称为 PID(Producer ID),以及一个序列号。这个 PID 与序列号加上 topic 与 partition 信息,可以唯一标识一条消息。

Broker 会用这个唯一标识消息的信息来跟踪最近 5 条(由参数 max.inflight.requests 来控制,默认值为 5)发送到每个 partition 的消息。当 broker 收到一条之前已经收到过的消息时,会拒绝这条消息并返回 producer 一条提示报错信息。在 producer 端,这个信息会加入到 record-error-rate 的指标中。在 broker 端,这个报错会归类于 ErrorsPerSec 的指标,属于 RequestMetrics 类型。

2.4.2. 幂等 producer 的作用范围

Idempotent producer 仅用于防止由于 producer retry 机制导致的消息重复。例如由于 producer 没收到 ack,或者由于网络,broker 问题导致的消息没有发送成功,从而 producer 自动尝试,并由此导致的可能的消息重复问题。但如果是应用层的重试,例如应用端发现消息未发送成功,把失败的消息放入重试队列,然后再次调用 send()方法发送消息。这类重试的消息会有新的唯一标识信息,不在 idempotent producer 处理的范围内。

2.4.3. 如何配 idempotent producer

在 producer 端加上配置 enable.idempotence=true 即可。在开启这个功能后,有如下几点需要了解:

  • 由于要从 broker 取 producer ID,所以 producer 会在启动时额外调用一个 API。
  • 每个发送的 record batch 会包含 producer ID 以及第 1 条消息的 sequence ID(batch 中第 1 条之后的消息的 sequence ID 是由第 1 条消息的 sequence ID 加上一个 delta 增量得来)。这些新的字段会增加每个 record batch 额外 96bits(producer ID 是 long 类型,sequence 是 integer 类型)。
  • Broker 会验证每个 producer 发过来的消息中 sequence number,并判断是否为重复消息。
  • Partition 内的消息顺序仍能保持。

2.5. Consumer 配置

Consumer 在消费 Kafka 数据时,主要需要确保的一点便是: 不漏消费消息。保障此行为的机制便是 offset commit。

Consumer 在消费了目标 topic 的消息后,可以自动(按固定时间间隔)或手动的方式完成 offset commit。Committed offset 会记录在 Kafka 的__consumer_offsets topic 内(由 group.id 的值进行 hash 并写入不同 partition),下次消费会按照上次 offset 的位置继续消费。

与 offset commit 相关的有 4 个重要的配置:

  • id:用于标识消费组
  • offset.reset:如果当前消费组对目标 topic 没有 committed offset 时(例如 Consumer 第一次消费),Consumer 使用的 offset 位置(可配置为 earliest 或 latest,默认为 latest)
  • auto.commit:是否定期自动 commit offset
  • commit.interval.ms:若是启用了自动 commit offset,指定自动 commit 的时间间隔,默认为 5s。一般来说,间隔更短,Consumer 的资源开销会更大。但是在 Consumer 停止并重新启动后,处理的重复数据更少。

可以看到,在使用自动 commit offset 机制时,消息传递语义为 at-most-once。会引入重复数据消费以及数据可能丢失的问题。例如,假设 Consumer 在拉取了 Kafka 的消息后,需要写入下游数据库。在写入数据库时发生异常,导致一直未能写入成功并最终失败。由于自动 commit offset 的机制,Consumer 仍会自动将已消费消息的 offset commit 到 Kafka 端。若是这部分异常未能正常处理,则未能写入下游的数据便会丢失。

所以,在复杂场景下,需要更高的端到端一致性以及准确性时,建议手动做 commit offset。因为自动 commit offset 的机制无法保证消息已被下游正确处理。

在执行手动 commit offset 时,常规考虑要点为:

  • 在消息被下游完全处理后,再手动 commit offset。
  • Commit offset 是一个较为耗费资源的操作。更频繁的 commit 可以减少 Consumer 停止并重启后重复消费数据的概率,但是会带来更高的负载。此处需要对吞吐与可能的重复数据之间进行权衡。
  • Consumers 重试。例如上面介绍的 Consumer 消费数据并向下游数据库写数据的例子,Consumer 可能会遇到处理一个批次消息时,部分消息写入下游数据库失败。此时 Consumer 端要做好重试的机制,例如将失败的消息进行缓存,并进行重试。在重试时,对消费 Kafka 消息的行为有 2 种处理方式:
    • 使用 Consumer pause()方法,确保 polls 不再返回新数据,优先将当前批次数据完全处理(一致性优先考量)。
    • 将失败的消息写入到另一个重试 topic,并使用另一个消费组来处理失败的消息。当前 Consumer 可以继续正常消费并处理(吞吐优先考量)。

可以看到,手动执行 commit offset 的机制保证的消息传递语义为 at-least-cone 语义。也就是说,消息至少会被消费一次,但是仍可能会带来可能的数据重复消费的问题。例如在处理一个批次消息时 Consumer 发生异常退出,但是部分消息已经写入了下游。重启 Consumer 后,仍会从这个批次的 offset 起始位置消费,并再次处理同一批数据(包含上次已经写入下游的部分数据)。

针对这种 at-least-once 语义场景,常规的处理方式是:保证下游是幂等的(idempotent)。也就是说,重复数据在写入下游时,重复消息不会对下游应用的准确性产生影响。例如下游写入 ElasticSearch 时,使用_id 字段唯一识别一条消息,重复数据写入时也是覆盖同一个文档。

2.5.1. 流处理引擎 offset 管理

最后,值得提到的一点是,在流处理引擎如 Spark Structured Streaming 以及 Flink 里,offset commit 的行为有所不同。

在 Spark Structured Streaming 中,(即使指定了group.id 的配置)Kafka Source 也不会 commit offset 到 Kafka 的,而是使用其 checkpoint 机制进行管理并做故障恢复。每次在执行 checkpoint 时,会将当前消费的 offset 记录在其 checkpoint 文件内。

若是有需求监控消费组的 lag,则需要手动做 commit offset 到 Kafka 的操作。其中开源社区提供了一个方法可以直接参考[4]。

在 Flink 中,如果未启用 checkpoint,则使用的自动 commit offset 的方式,将 offset 定期提交到 Kafka(仍通过 enable.auto.commit 与 auto.commit.interval.ms 进行配置)。

在启用 checkpoint 后,同样通过 checkpoint 机制进行管理并做故障恢复。在 checkpoint 完成后(offset 已写入 checkpoint 文件),再将 offset commit 到 Kafka。但是在故障恢复时,仅使用 checkpoint 内的 offset。Kafka 端的 offset 仅用于做消费 lag 监控。

3. 监控

对于平台的稳定运行,监控是必不可少的。关键指标监控与报警机制可以让我们及时地发现平台的问题,并以自动或手动的方式介入,避免对系统稳定造成进一步的影响。

对于 AWS MSK,官方提供了 2 种查看监控指标的方式:CloudWatch 与 Prometheus。具体介绍可以参考文档[5]。

在监控指标方面,提供了不同级别的指标,包括 4 种级别,分别为:DEFAULT,PER_BROKER,PER_TOPIC_PER_BROKER, PER_TOPIC_PER_PARTITION。除了 DEFAULT 级别外,开启另外 3 个级别需要额外收费。所有可监控 CloudWatch 指标可以参考文档[6]。

常用集群负载与稳定性监控指标包括:

  • ActiveControllerCount:应永远为 1
  • Broker 的 CPU User 与 CPU System:两者之和应低于 60%
  • Broker 的 KafkaDataLogsDiskUsed:使用率应低于 85%
  • HeapMemoryAfterGC:应持续保持在 60% 以下
  • 消费者的 Consumer Lag:消费是否能跟上

4. 验证

下面我们使用此配置验证 Kafka 的高可用。

首先创建一个 3-AZ 的 MSK 集群。

使用 Kafka CLI 创建 1 个 topic:

  • ha-topic:RF=3,insync.replicas=2
# 创建ha-topic
./kafka-topics.sh --bootstrap-server broker:9092 --config min.insync.replicas=2 --partitions 6 --replication-factor 3 --topic ha-topic --create

创建 safe-producer.config 配置文件:

retries=2147483647
delivery.timeout.ms=604800000
acks=all

使用 kafka-verifiable-consumer 消费消息:

./kafka-verifiable-consumer.sh  --bootstrap-server broker:9092 --topic ha-topic --max-messages 10000 --verbose --reset-policy latest --group-id  ha-consumer1

使用 kafka-verifiable-producer 生产消息:

./kafka-verifiable-producer.sh  --bootstrap-server broker:9092 --topic ha-topic --max-messages 100000 --throughput 100 --producer.config  safe-producer.config

然后滚动重启 broker。每次重启 1 个 broker,并在重启完毕后再重启第 2 个 broker(在本次测试环境中,重启一个 broker 约 4 分钟左右):

aws kafka reboot-broker --cluster-arn arn:aws:kafka:xxx --broker-ids 2 --profile global

可以看到在 1 个 broker 重启:

在此过程中,Consumer 未遇到任何报错。Producer 遇到了以下 Error 信息,但很快即恢复,并继续生产消息:

{"timestamp":1675758285482,"name":"producer_send_success","key":null,"value":"1340","offset":209259,"topic":"ha-topic","partition":2}
 
WARN  [Producer clientId=producer-1] Got error produce response with correlation id  1342 on topic-partition ha-topic-0, retrying (2147483646 attempts left).  Error: NOT_LEADER_OR_FOLLOWER  (org.apache.kafka.clients.producer.internals.Sender)
 
Received  invalid metadata error in produce request on partition ha-topic-0 due to  org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests  intended only for the leader, this error indicates that the broker is not the  current leader. For requests intended for any replica, this error indicates  that the broker is not a replica of the topic partition.. Going to request  metadata update now (org.apache.kafka.clients.producer.internals.Sender)
 
{"timestamp":1675758285692,"name":"producer_send_success","key":null,"value":"1342","offset":209260,"topic":"ha-topic","partition":2}
{"timestamp":1675758285798,"name":"producer_send_success","key":null,"value":"1341","offset":210908,"topic":"ha-topic","partition":0}

虽然出现了 Error 信息,但从报错前后的信息来看,数据 1340,1341,1342 均写入了 Kafka。并且可以在 Consumer 端打印的日志找到,数据并未出现丢失。并且可以看到在 partition 级别,数据仍保持了有序:

{"timestamp":1675758285482,"name":"record_data","key":null,"value":"1340","topic":"ha-topic","partition":2,"offset":209259}
{"timestamp":1675758285801,"name":"record_data","key":null,"value":"1341","topic":"ha-topic","partition":0,"offset":210908}
{"timestamp":1675758285815,"name":"record_data","key":null,"value":"1343","topic":"ha-topic","partition":1,"offset":207770}
{"timestamp":1675758285886,"name":"record_data","key":null,"value":"1344","topic":"ha-topic","partition":3,"offset":207459}
{"timestamp":1675758285890,"name":"record_data","key":null,"value":"1342","topic":"ha-topic","partition":2,"offset":209260}

5. 配置总结

根据以上介绍,总结在使用 AWS MSK 时,构建可靠高可用的 MSK 集群的配置为:

  • MSK 多 AZ
    • 使用 3-AZ 集群
  • Broker/集群端配置
    • default.replication.factor 设置至少为 3(MSK 为 3-AZ 时,默认为 3)
    • min.insync.replicas 设置为 RF-1。例如在 RF 为 3 时,minISR 设置为 2
  • Producer 端配置
    • acks=all
    • 对于可重试异常,配置无限次重试:retries=MAX_INT
    • 配置 delivery.timeout.ms 为最长的等待时间
    • 在连接串里填写所有 broker 地址
    • (可选)基于是否需要处理由于重试导致的重复数据,配置 enable.idempotence
  • Consumer 端配置
    • 使用主动 commit offset 的机制,在数据被下游正常处理后再 commit offset 到 kafka

由于 MSK 会自动做底层硬件的维护以及软件的补丁、升级等工作。其工作方式是做滚动升级,也就是说,在执行过程中会每次下线一台 broker,以滚动的方式进行维护。上述配置可以实现在这些场景下(包括替换 broker)MSK 的高可用,同时尽可能避免数据的丢失。

参考文档

[1] MSK 特点介绍:https://aws.amazon.com/cn/msk/features/

[2] Kafka 数据复制: https://developer.confluent.io/learn-kafka/architecture/data-replication/

[3] MSK 默认配置:https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html

[4] Spark Structured Streaming commit offset: https://github.com/HeartSaVioR/spark-sql-kafka-offset-committer

[5] Monitoring an Amazon MSK cluster: https://docs.aws.amazon.com/msk/latest/developerguide/monitoring.html

[6] Amazon MSK metrics for monitoring with CloudWatch:https://docs.aws.amazon.com/msk/latest/developerguide/metrics-details.html

[7] Kafka The Definitive Guide:https://learning.oreilly.com/library/view/kafka-the-definitive/9781491936153/

[8] MSK Best Practice:https://docs.aws.amazon.com/msk/latest/developerguide/bestpractices.html

本篇作者

汤市建

亚马逊云科技数据分析解决方案架构师,负责客户大数据解决方案的咨询与架构设计。

潘超

AWS数据分析解决方案架构师。负责客户大数据解决方案的咨询与架构设计,在开源大数据方面拥有丰富的经验。工作之外喜欢爬山。