跳转至

4. 设计

4.1 动机

我们将 Kafka 设计为能够充当统一平台来处理大公司可能拥有的 所有实时数据源。为此,我们必须考虑相当广泛的用例。

它必须具有高吞吐量才能支持大容量事件流,例如实时日志聚合。

它需要妥善处理大量积压数据,以便能够支持离线系统的定期数据加载。

这还意味着系统必须处理低延迟交付,以处理更传统的消息传递用例。

我们希望支持这些提要的分区、分布式、实时处理,以创建新的派生提要。这激发了我们的分区和消费者模型。

最后,在将流输入其他数据系统进行服务的情况下,我们知道系统必须能够在出现机器故障时保证容错。

支持这些用途使我们设计出具有许多独特元素的设计,与传统的消息系统相比,它更类似于数据库日志。我们将在以下部分中概述设计的一些元素。

4.2 持久化

不要害怕文件系统!

Kafka 严重依赖文件系统来存储和缓存消息。人们普遍认为“磁盘速度很慢”,这使人们怀疑持久结构能否提供有竞争力的性能。事实上,磁盘比人们预期的要慢得多,也要快得多,这取决于它们的使用方式;设计得当的磁盘结构往往可以和网络一样快。

关于磁盘性能的关键事实是,在过去十年中,硬盘驱动器的吞吐量与磁盘寻道的延迟一直存在差异。因此,在具有六个 7200rpm SATA RAID-5 阵列的JBOD 配置上,线性写入性能约为 600MB/秒,但随机写入性能仅为约 100k/秒,相差超过 6000 倍。这些线性读取和写入是所有使用模式中最可预测的,并且经过操作系统的大力优化。现代操作系统提供预读和后写技术,以大块倍数预取数据,并将较小的逻辑写入分组为较大的物理写入。有关此问题的进一步讨论可以在这篇ACM Queue 文章中找到;他们实际上发现 在某些情况下,顺序磁盘访问可能比随机内存访问更快!

为了弥补这种性能差异,现代操作系统越来越积极地使用主内存进行磁盘缓存。现代操作系统会很乐意将所有可用内存转移到磁盘缓存,当内存被回收时,性能损失很小。所有的磁盘读写都会经过这个统一的缓存。如果不使用直接 I/O,则无法轻松关闭此功能,因此即使进程维护数据的进程内缓存,该数据也可能会在操作系统页面缓存中重复,从而有效地将所有内容存储两次。

此外,我们是在 JVM 之上构建的,任何花时间研究 Java 内存使用的人都知道两件事:

  1. 对象的内存开销非常高,通常会使存储的数据大小增加一倍(或更糟)。
  2. 随着堆内数据的增加,Java 垃圾收集变得越来越繁琐和缓慢。

由于这些因素,使用文件系统并依赖页面缓存优于维护内存中缓存或其他结构 - 我们通过自动访问所有空闲内存至少使可用缓存加倍,并且可能通过存储紧凑的内容再次加倍字节结构而不是单个对象。这样做将在 32GB 机器上产生高达 28-30GB 的缓存,而不会造成 GC 损失。此外,即使服务重新启动,此缓存也将保持热状态,而进程内缓存将需要在内存中重建(对于 10GB 缓存可能需要 10 分钟),否则它将需要以完全冷的缓存启动(这可能意味着糟糕的初始性能)。这也极大地简化了代码,因为用于维护缓存和文件系统之间一致性的所有逻辑现在都在操作系统中,这往往比一次性的过程中尝试更有效、更正确。如果您的磁盘使用有利于线性读取,那么预读实际上是在每次磁盘读取时使用有用的数据预先填充此缓存。

这表明了一种非常简单的设计:我们不是在内存中保留尽可能多的内容,而是在空间不足时将其全部刷新到文件系统,而是将其反转。所有数据都会立即写入文件系统上的持久日志,而不必刷新到磁盘。实际上,这仅仅意味着它被转移到内核的页面缓存中。

这种以页面缓存为中心的设计风格在一篇关于 Varnish 设计的 文章 中进行了描述(同时还带有一定程度的傲慢)。

恒定的时间就足够了

消息传递系统中使用的持久数据结构通常是每个消费者的队列,具有关联的 BTree 或其他通用随机访问数据结构,以维护有关消息的元数据。BTree 是可用的最通用的数据结构,并且可以支持消息传递系统中的各种事务性和非事务性语义。不过,它们的成本确实相当高:Btree 操作的时间复杂度为 O(log N)。通常 O(log N) 被认为本质上等于常数时间,但对于磁盘操作来说并非如此。磁盘寻道一次弹出的时间为 10 毫秒,并且每个磁盘一次只能执行一次寻道,因此并行性受到限制。因此,即使少量的磁盘查找也会导致非常高的开销。由于存储系统将非常快的缓存操作与非常慢的物理磁盘操作混合在一起,

直观上,持久队列可以基于简单的读取和附加到文件来构建,就像日志记录解决方案的常见情况一样。这种结构的优点是所有操作都是 O(1) 并且读取不会阻塞写入或彼此之间。这具有明显的性能优势,因为性能与数据大小完全解耦——一台服务器现在可以充分利用许多廉价、低转速的 1+TB SATA 驱动器。尽管它们的寻道性能较差,但这些驱动器对于大量读取和写入具有可接受的性能,并且价格仅为其 1/3,容量为 3 倍。

可以访问几乎无限的磁盘空间而不会造成任何性能损失,这意味着我们可以提供一些消息传递系统中通常不具备的功能。例如,在 Kafka 中,我们可以将消息保留相对较长的时间(例如一周),而不是在消息被消费后立即尝试删除它们。正如我们将要描述的,这为消费者带来了很大的灵活性。

4.3 效率

我们在效率方面投入了大量精力。我们的主要用例之一是处理 Web 活动数据,该数据量非常大:每个页面视图可能会生成数十次写入。此外,我们假设发布的每条消息都被至少一个消费者(通常是多个)阅读,因此我们努力使消费尽可能便宜。

根据构建和运行许多类似系统的经验,我们还发现,效率是有效多租户运营的关键。如果下游基础设施服务很容易因为应用程序使用量的微小变化而成为瓶颈,那么这种微小的变化通常会产生问题。通过非常快的速度,我们有助于确保应用程序在负载下先于基础设施发生翻倒。当尝试在集中式集群上运行支持数十或数百个应用程序的集中式服务时,这一点尤其重要,因为使用模式的变化几乎每天都会发生。

我们在上一节中讨论了磁盘效率。一旦消除了不良的磁盘访问模式,此类系统效率低下的常见原因有两个:太多的小型 I/O 操作和过多的字节复制。

小 I/O 问题既发生在客户端与服务器之间,也发生在服务器自身的持久操作中。

为了避免这种情况,我们的协议是围绕“消息集”抽象构建的,该抽象自然地将消息分组在一起。这允许网络请求将消息分组在一起并分摊网络往返的开销,而不是一次发送单个消息。服务器依次将消息块一次性追加到其日志中,而消费者一次获取大的线性块。

这个简单的优化产生了几个数量级的加速。批处理会导致更大的网络数据包、更大的顺序磁盘操作、连续的内存块等等,所有这些都允许 Kafka 将随机消息写入的突发流转换为流向消费者的线性写入。

另一个低效之处在于字节复制。在低消息速率下这不是问题,但在负载下影响很大。为了避免这种情况,我们采用了由生产者、代理和消费者共享的标准化二进制消息格式(因此数据块可以在它们之间传输而无需修改)。

代理维护的消息日志本身只是一个文件目录,每个文件都由一系列消息集填充,这些消息集已以生产者和消费者使用的相同格式写入磁盘。维护这种通用格式可以优化最重要的操作:持久日志块的网络传输。现代 UNIX 操作系统提供了高度优化的代码路径,用于将数据从页面缓存传输到套接字;在 Linux 中,这是通过sendfile 系统调用完成的。

要了解 sendfile 的影响,了解从文件到套接字传输数据的通用数据路径非常重要:

  1. 操作系统将数据从磁盘读取到内核空间的pagecache中
  2. 应用程序将数据从内核空间读取到用户空间缓冲区
  3. 应用程序将数据写回到内核空间的套接字缓冲区中
  4. 操作系统将数据从套接字缓冲区复制到 NIC 缓冲区,并在其中通过网络发送

这显然效率低下,有四个副本和两个系统调用。使用 sendfile,允许操作系统将数据从页面缓存直接发送到网络,从而避免这种重新复制。所以在这个优化路径中,只需要最终拷贝到NIC缓冲区即可。

我们期望一个常见的用例是一个主题的多个消费者。使用上面的零复制优化,数据被复制到页面缓存一次,并在每次使用时重复使用,而不是存储在内存中并在每次读取时复制到用户空间。这允许以接近网络连接限制的速率使用消息。

pagecache 和 sendfile 的这种组合意味着,在消费者大多陷入困境的 Kafka 集群上,您将看不到磁盘上的任何读取活动,因为它们将完全从缓存提供数据。

SSL_sendfileTLS/SSL 库在用户空间运行( Kafka 目前不支持 内核)。由于此限制,sendfile启用 SSL 时不使用。要启用 SSL 配置,请参阅security.protocolsecurity.inter.broker.protocol

有关 Java 中的 sendfile 和零拷贝支持的更多背景信息,请参阅这篇文章

端到端批量压缩

在某些情况下,瓶颈实际上不是 CPU 或磁盘,而是网络带宽。对于需要通过广域网在数据中心之间发送消息的数据管道来说尤其如此。当然,用户总是可以一次压缩一个消息,而不需要 Kafka 的任何支持,但这可能会导致压缩率非常差,因为大部分冗余是由于相同类型的消息之间的重复造成的(例如, JSON 或网络日志中的用户代理或常见字符串值)。有效的压缩需要将多个消息压缩在一起,而不是单独压缩每个消息。

Kafka 通过高效的批处理格式来支持这一点。一批消息可以聚集在一起压缩并以这种形式发送到服务器。这批消息将以压缩形式写入,并在日志中保持压缩状态,仅由消费者解压缩。

Kafka 支持 GZIP、Snappy、LZ4 和 ZStandard 压缩协议。有关压缩的更多详细信息可以在此处找到。

4.4 生产者

负载均衡

生产者将数据直接发送到作为分区领导者的代理,而无需任何中间路由层。为了帮助生产者做到这一点,所有 Kafka 节点都可以在任何给定时间响应有关哪些服务器处于活动状态以及主题分区的领导者所在位置的元数据请求,以允许生产者适当地引导其请求。

客户端控制将消息发布到哪个分区。这可以随机完成,实现一种随机负载平衡,也可以通过某种语义分区函数来完成。我们通过允许用户指定分区键并使用它来哈希到分区来公开语义分区的接口(如果需要,还有一个选项可以覆盖分区函数)。例如,如果选择的键是用户 ID,则给定用户的所有数据都将发送到同一分区。这反过来又将使消费者能够对其消费做出局部假设。这种分区方式明确设计为允许消费者进行局部敏感处理。

异步发送

批处理是效率的重要驱动因素之一,为了启用批处理,Kafka 生产者将尝试在内存中累积数据并在单个请求中发送更大的批次。可以将批处理配置为累积不超过固定数量的消息,并且等待时间不超过某个固定延迟限制(例如 64k 或 10 ms)。这允许累积更多要发送的字节,并且服务器上很少有较大的 I/O 操作。这种缓冲是可配置的,并提供了一种机制来权衡少量的额外延迟以获得更好的吞吐量。

有关生产者的 配置api 的 详细信息可以在文档的其他地方找到。

4.5 消费者

Kafka 消费者的工作方式是向引导其想要使用的分区的代理发出“获取”请求。消费者在每个请求中指定其在日志中的偏移量,并接收从该位置开始的日志块。因此,消费者对该位置具有显着的控制权,并且可以在需要时将其倒回以重新使用数据。

推与拉

我们考虑的最初问题是消费者是否应该从broker那里提取数据,或者broker应该将数据推送给消费者。在这方面,Kafka 遵循大多数消息传递系统所共享的更传统的设计,其中数据从生产者推送到代理并由消费者从代理中提取。一些以日志记录为中心的系统,例如Scribe和 Apache Flume,遵循一种非常不同的基于推送的路径,将数据推送到下游。两种方法各有利弊。然而,基于推送的系统很难处理不同的消费者,因为代理控制数据传输的速率。目标通常是让消费者能够以尽可能高的速度消费;不幸的是,在推送系统中,这意味着当消费者的消费率低于生产率时,消费者往往会不知所措(本质上是拒绝服务攻击)。基于拉动的系统具有更好的特性,即消费者可以简单地落在后面并在可以的时候赶上。这可以通过某种退避协议来缓解,消费者可以通过退避协议来表明它已不堪重负,但要充分利用(但绝不过度利用)消费者的传输率比看起来更棘手。之前以这种方式构建系统的尝试导致我们采用了更传统的拉模型。

基于拉动的系统的另一个优点是,它有助于将数据批量发送给消费者。基于推送的系统必须选择立即发送请求或积累更多数据然后稍后发送,而不知道下游消费者是否能够立即处理它。如果调整为低延迟,这将导致一次发送一条消息,但无论如何传输最终都会被缓冲,这是浪费的。基于拉取的设计解决了这个问题,因为消费者总是在日志中当前位置之后(或达到某个可配置的最大大小)拉取所有可用消息。因此,人们可以获得最佳的批处理,而不会引入不必要的延迟。

基于拉动的系统的缺陷在于,如果代理没有数据,消费者最终可能会在紧密的循环中进行轮询,实际上忙于等待数据到达。为了避免这种情况,我们在拉取请求中设置了参数,允许消费者请求在“长轮询”中阻塞,等待数据到达(并且可以选择等待给定数量的字节可用,以确保大的传输大小)。

您可以想象其他可能的设计,这些设计只是拉式、端到端的。生产者将在本地写入本地日志,broker将从其中拉取,消费者也将从中拉取。人们经常提出类似类型的“存储转发”生产者。这很有趣,但我们觉得不太适合我们拥有数千个生产者的目标用例。我们大规模运行持久数据系统的经验让我们感到,在系统中跨许多应用程序涉及数千个磁盘实际上并不会让事情变得更可靠,而且操作起来会是一场噩梦。在实践中,我们发现我们可以大规模运行具有强大 SLA 的管道,而不需要生产者持久性。

消费者定位

令人惊讶的是,跟踪已消耗的内容是消息传递系统的关键性能点之一。

大多数消息传递系统都会保存有关代理上已使用的消息的元数据。也就是说,当消息被分发给消费者时,代理要么立即在本地记录该事实,要么等待消费者的确认。这是一个相当直观的选择,实际上对于单机服务器来说,并不清楚这种状态还可以去哪里。由于许多消息传递系统中用于存储的数据结构扩展性很差,这也是一个务实的选择——因为代理知道消耗了什么,所以可以立即删除它,从而保持数据大小较小。

也许不太明显的是,让broker和消费者就消费内容达成一致并不是一个小问题。如果代理在每次通过网络分发消息时立即将其记录为已消费,那么如果消费者无法处理该消息(例如因为崩溃或请求超时或其他原因),该消息将丢失为了解决这个问题,许多消息系统添加了确认功能,这意味着消息在发送时仅被标记为已发送,而不是被消费;代理等待消费者的特定确认以将消息记录为已消费。这种策略解决了丢失消息的问题,但也产生了新的问题。首先,如果消费者处理消息但在发送确认之前失败,那么该消息将被消费两次。第二个问题是关于性能的,现在代理必须保留每条消息的多个状态(首先锁定它,这样就不会再次发出,然后将其标记为永久消耗,以便可以将其删除)。必须处理棘手的问题,例如如何处理已发送但从未确认的消息。

Kafka对此的处理方式有所不同。我们的主题分为一组完全有序的分区,每个分区在任何给定时间都由每个订阅消费者组中的一个消费者消费。这意味着每个分区中消费者的位置只是一个整数,即要消费的下一条消息的偏移量。这使得有关已消耗内容的状态非常小,每个分区只有一个数字。可以定期检查此状态。这使得消息确认的成本非常低。

这个决定还有一个附带的好处。消费者可以故意退到旧的偏移量并重新使用数据。这违反了队列的常见契约,但事实证明对于许多消费者来说这是一个基本功能。例如,如果消费者代码存在错误,并且在消费了一些消息后发现,则消费者可以在错误修复后重新消费这些消息。

离线数据加载

可扩展的持久性允许消费者仅定期消费,例如批量数据加载,定期将数据批量加载到离线系统(例如 Hadoop 或关系数据仓库)中。

对于 Hadoop,我们通过将负载拆分到各个映射任务(每个节点/主题/分区组合一个)来并行化数据加载,从而允许加载完全并行。Hadoop 提供任务管理,失败的任务可以重新启动,而不会出现重复数据的危险——它们只是从原来的位置重新启动。

静态会员资格

静态成员资格旨在提高流应用程序、消费者组和其他构建在组再平衡协议之上的应用程序的可用性。再平衡协议依赖组协调器为组成员分配实体 ID。这些生成的 ID 是短暂的,并且会在成员重新启动和重新加入时发生变化。对于基于消费者的应用程序,这种“动态成员资格”可能会导致在管理操作(例如代码部署、配置更新和定期重新启动)期间将大部分任务重新分配给不同的实例。对于大型状态应用程序,洗牌任务在处理之前需要很长时间才能恢复其本地状态,从而导致应用程序部分或完全不可用。受这一观察的启发,Kafka 的组管理协议允许组成员提供持久的实体 ID。根据这些 ID,组成员资格保持不变,因此不会触发重新平衡。

如果您想使用静态成员资格,

  • 将代理集群和客户端应用程序升级到 2.3 或更高版本,并确保升级后的代理也使用inter.broker.protocol.version 2.3 或更高版本。
  • 将配置设置ConsumerConfig#GROUP_INSTANCE_ID_CONFIG为一组下每个消费者实例的唯一值。
  • 对于 Kafka Streams 应用程序,为每个 KafkaStreams 实例设置唯一的值就足够了ConsumerConfig#GROUP_INSTANCE_ID_CONFIG,与实例使用的线程数无关。

如果您的代理使用的版本低于 2.3,但您选择ConsumerConfig#GROUP_INSTANCE_ID_CONFIG在客户端设置,则应用程序将检测代理版本,然后抛出 UnsupportedException。如果您不小心为不同的实例配置了重复的 ID,代理端的防护机制将通过触发org.apache.kafka.common.errors.FencedInstanceIdException. 有关更多详细信息,请参阅 KIP-345

4.6 消息传递语义

现在我们对生产者和消费者的工作原理有了一些了解,让我们讨论一下 Kafka 在生产者和消费者之间提供的语义保证。显然,可以提供多种可能的消息传递保证:

  • 最多一次——消息可能会丢失,但永远不会重新传送。
  • 至少一次——消息永远不会丢失,但可以重新传送。
  • 恰好一次——这就是人们真正想要的,每条消息都传递一次且仅一次。

值得注意的是,这分为两个问题:发布消息的持久性保证和消费消息时的保证。

许多系统声称提供“恰好一次”交付语义,但阅读细则很重要,这些声明中的大多数都是误导性的(即它们不能转化为消费者或生产者可能失败的情况,有多个的情况)消费者进程,或者写入磁盘的数据可能丢失的情况)。

Kafka的语义是直接的。当发布消息时,我们有一个消息被“提交”到日志的概念。一旦提交了已发布的消息,只要复制该消息所写入的分区的一个代理保持“活动”状态,该消息就不会丢失。已提交消息、活动分区的定义以及我们尝试处理的故障类型的描述将在下一节中更详细地描述。现在让我们假设一个完美的、无损的broker,并尝试理解对生产者和消费者的保证。如果生产者尝试发布消息并遇到网络错误,则无法确定此错误是在提交消息之前还是之后发生。

在 0.11.0.0 之前,如果生产者未能收到指示消息已提交的响应,它别无选择,只能重新发送消息。这提供了至少一次传递语义,因为如果原始请求实际上已成功,则在重新发送期间消息可能会再次写入日志。从0.11.0.0开始,Kafka生产者还支持幂等传递选项,保证重新发送不会导致日志中出现重复条目​​。为了实现这一点,代理为每个生产者分配一个 ID,并使用生产者随每条消息发送的序列号来删除重复的消息。同样从 0.11.0.0 开始,生产者支持使用类似事务的语义将消息发送到多个主题分区的能力:即 要么所有消息均已成功写入,要么均未成功写入。其主要用例是 Kafka 主题之间的一次性处理(如下所述)。

并非所有用例都需要如此强有力的保证。对于对延迟敏感的用途,我们允许生产者指定其所需的持久性级别。如果生产者指定它想要等待消息被提交,这可能需要大约 10 毫秒。然而,生产者也可以指定它想要完全异步地执行发送,或者它只想等到领导者(但不一定是追随者)收到消息。

现在让我们从消费者的角度来描述语义。所有副本都具有完全相同的日志和相同的偏移量。消费者控制其在此日志中的位置。如果消费者从未崩溃,它可以只将该位置存储在内存中,但如果消费者失败并且我们希望该主题分区由另一个进程接管,则新进程将需要选择一个适当的位置来开始处理。假设消费者读取了一些消息——它有几个选项来处理消息并更新其位置。

  1. 它可以读取消息,然后将其位置保存在日志中,最后处理消息。在这种情况下,消费者进程有可能在保存其位置之后但在保存其消息处理的输出之前崩溃。在这种情况下,接管处理的进程将从保存的位置开始,即使该位置之前的一些消息尚未处理。这对应于“至多一次”语义,因为在消费者失败消息可能不会被处理的情况下。
  2. 它可以读取消息,处理消息,最后保存其位置。在这种情况下,消费者进程有可能在处理消息之后但在保存其位置之前崩溃。在这种情况下,当新进程接管时,它收到的前几条消息将已经被处理。这对应于消费者失败情况下的“至少一次”语义。在许多情况下,消息具有主键,因此更新是幂等的(两次接收相同的消息只会用其自身的另一个副本覆盖记录)。

那么恰好一次语义(即你真正想要的东西)又如何呢?当从 Kafka 主题消费并生产到另一个主题时(如在Kafka Streams中) 应用程序),我们可以利用上面提到的 0.11.0.0 中新的事务性生产者功能。消费者的位置作为消息存储在主题中,因此我们可以在与接收处理数据的输出主题相同的事务中将偏移量写入 Kafka。如果事务中止,消费者的位置将恢复到其旧值,并且输出主题上生成的数据对其他消费者不可见,具体取决于他们的“隔离级别”。在默认的“read_uncommissed”隔离级别中,所有消息对消费者都是可见的,即使它们是中止事务的一部分,但在“read_commissed”中,消费者只会从已提交的事务中返回消息(以及不属于已提交事务的任何消息)交易的一部分)。

当写入外部系统时,限制在于需要将消费者的位置与实际存储为输出的内容相协调。实现这一目标的经典方法是在消费者位置的存储和消费者输出的存储之间引入两阶段提交。但这可以通过让消费者将其偏移量存储在与其输出相同的位置来更简单和更普遍地处理。这更好,因为消费者可能想要写入的许多输出系统不支持两阶段提交。作为一个例子,考虑 Kafka Connect连接器将数据及其读取的数据的偏移量填充到 HDFS 中,以便保证数据和偏移量要么都更新,要么都不更新。对于许多其他数据系统,我们遵循类似的模式,这些系统需要这些更强的语义,并且消息没有主键来允许重复数据删除。

因此,Kafka有效地支持Kafka Streams 中的一次性交付,并且在Kafka主题之间传输和处理数据时,通常可以使用事务性生产者/消费者来提供一次性交付。其他目标系统的一次性交付通常需要与此类系统合作,但 Kafka 提供了偏移量,使得实现这一点变得可行(另请参阅Kafka Connect)。否则,Kafka 默认保证至少一次传送,并允许用户通过在处理一批消息之前禁用生产者重试并在消费者中提交偏移量来实现最多一次传送。

4.7 复制

Kafka 在可配置数量的服务器上复制每个主题分区的日志(您可以逐个主题设置此复制因子)。当集群中的服务器发生故障时,这允许自动故障转移到这些副本,因此消息在出现故障时仍然可用。

其他消息系统提供了一些与复制相关的功能,但是,在我们(完全有偏见的)看来,这似乎是一个附加的东西,没有被大量使用,并且有很大的缺点:副本不活动,吞吐量受到严重影响,它需要繁琐的手动配置等。Kafka 默认情况下与复制一起使用 - 事实上,我们将未复制的主题实现为复制主题,其中复制因子为 1。

复制的单位是主题分区。在非故障条件下,Kafka 中的每个分区都有一个领导者和零个或多个追随者。包括领导者在内的副本总数构成了复制因子。所有写入都将发送到分区的领导者,而读取可能会发送到分区的领导者或追随者。通常,分区的数量比broker多得多,并且领导者均匀分布在broker之间。追随者上的日志与领导者的日志相同 - 都具有相同的偏移量和相同顺序的消息(当然,在任何给定时间,领导者可能在其日志末尾有一些尚未复制的消息) )。

追随者像普通 Kafka 消费者一样消费来自领导者的消息,并将其应用到自己的日志中。让追随者从领导者那里拉取有一个很好的特性,即允许追随者自然地将他们应用于其日志的日志条目分批在一起。

与大多数分布式系统一样,自动处理故障需要精确定义节点“活动”的含义。在Kafka中,一个被称为“控制器”的特殊节点负责管理集群中broker的注册。经纪商活跃度有两个条件:

  1. 代理必须与控制器保持活动会话,以便定期接收元数据更新。
  2. 作为追随者的broker必须复制领导者的写入,并且不能落后“太远”。

“活动会话”的含义取决于集群配置。对于 KRaft 集群,通过向控制器发送定期心跳来维护活动会话。如果控制器在配置的超时时间到期之前未能收到心跳 broker.session.timeout.ms,则该节点被视为离线。

对于使用 Zookeeper 的集群,活动性是通过临时节点的存在来间接确定的,临时节点是由代理在其 Zookeeper 会话初始化时创建的。如果代理在 过期之前未能向 Zookeeper 发送心跳后丢失会话 zookeeper.session.timeout.ms,则该节点将被删除。然后,控制器将通过 Zookeeper 监视注意到节点删除,并将代理标记为离线。

我们将满足这两个条件的节点称为“同步”,以避免“活动”或“失败”的模糊性。领导者跟踪一组“同步”副本,称为 ISR。如果其中任何一个条件未能满足,则该经纪商将从 ISR 中删除。例如,如果跟随者死亡,那么控制器将通过其会话丢失注意到失败,并将从 ISR 中删除代理。另一方面,如果追随者落后领导者太多,但仍然有活动会话,那么领导者也可以将其从 ISR 中删除。滞后副本的确定是通过以下方式控制的replica.lag.time.max.ms配置。在此配置设置的最大时间内无法赶上领导者日志末尾的副本将从 ISR 中删除。

在分布式系统术语中,我们只尝试处理“失败/恢复”故障模型,其中节点突然停止工作,然后恢复(可能不知道它们已经死亡)。Kafka 不处理所谓的“拜占庭”故障,即节点产生任意或恶意响应(可能是由于错误或犯规)。

现在,我们可以更精确地定义,当该分区的 ISR 中的所有副本都已将消息应用到其日志时,该消息被视为已提交。只有提交的消息才会发送给消费者。这意味着消费者不必担心如果领导者失败可能会看到可能丢失的消息。另一方面,生产者可以选择等待消息提交或不提交,具体取决于他们对延迟和持久性之间权衡的偏好。此首选项由生产者使用的 ack 设置控制。请注意,主题具有同步副本“最小数量”的设置,当生产者请求确认消息已写入完整的同步副本集时,会检查该设置。

Kafka 提供的保证是,只要至少有一个同步副本始终处于活动状态,已提交的消息就不会丢失。

在短暂的故障转移期后,Kafka 将在出现节点故障的情况下保持可用,但在存在网络分区的情况下可能无法保持可用。

复制日志:仲裁、ISR 和状态机(天哪!)

Kafka 分区的核心是复制日志。复制日志是分布式数据系统中最基本的原语之一,有多种实现方法。复制日志可以被其他系统用作以状态机方式实现其他分布式系统的原语。

复制日志模拟了就一系列值的顺序达成共识的过程(通常将日志条目编号为 0、1、2...)。实现这一点的方法有很多,但最简单、最快的方法是由领导者选择提供给它的值的顺序。只要领导者还活着,所有追随者只需复制领导者选择的值和命令即可。

当然,如果领导者没有失败,我们就不需要追随者!当领导者死亡时,我们需要从追随者中选择一位新的领导者。但追随者本身可能会落后或崩溃,所以我们必须确保选择一个最新的追随者。日志复制算法必须提供的基本保证是,如果我们告诉客户端一条消息已提交,并且领导者失败,那么我们选出的新领导者也必须具有该消息。这会产生一个权衡:如果领导者在宣布消息已提交之前等待更多追随者确认消息,那么将会有更多潜在的当选领导者。

如果您选择所需的确认数量和必须比较的日志数量来选举领导者,以确保存在重叠,那么这称为仲裁。

这种权衡的常见方法是对提交决策和领导者选举都使用多数票。这不是 Kafka 所做的,但无论如何我们还是要探索一下它以了解其中的权衡。假设我们有 2 f +1 个副本。如果f +1 个副本必须在领导者声明提交之前接收消息,并且如果我们通过从至少 f +1 个副本中选举具有最完整日志的追随者来选举新的领导者,那么,不超过f失败时,领导者保证拥有所有已提交的消息。这是因为在任何f+1 个副本,必须至少有一个副本包含所有已提交的消息。该副本的日志将是最完整的,因此将被选为新的领导者。每个算法还必须处理许多剩余的细节(例如精确定义什么使日志更完整,确保领导者故障期间的日志一致性或更改副本集中的服务器集),但我们现在将忽略这些。

这种多数投票方法有一个非常好的特性:延迟仅取决于最快的服务器。也就是说,如果复制因子为 3,则延迟由较快的跟随者决定,而不是由较慢的跟随者决定。

这个家族中有各种各样的算法,包括 ZooKeeper 的 Zab、 RaftViewstamped Replication。据我们所知,与 Kafka 实际实现最相似的学术出版物是 Microsoft 的 PacificA 。

多数投票的缺点是,不需要多次失败就会导致没有可供选举的领导人。容忍一次故障需要三份数据,容忍两次故障需要五份数据。根据我们的经验,只有足够的冗余来容忍单个故障对于实际系统来说是不够的,但是对于大容量数据问题来说,每次写入五次,磁盘空间要求为 5 倍,吞吐量为 1/5,这不太实用。这可能就是为什么仲裁算法更常出现在共享集群配置(例如 ZooKeeper)中,但在主数据存储中却不太常见。例如,在 HDFS 中,namenode 的高可用性功能是建立在基于多数票的日志之上的,但这种更昂贵的方法并不用于数据本身。

Kafka 采用略有不同的方法来选择仲裁集。Kafka 不是多数投票,而是动态维护一组赶上领导者的同步副本 (ISR)。只有该组的成员才有资格当选为领导者。在所有同步副本都收到写入之前,对 Kafka 分区的写入不会被视为已提交。只要该 ISR 集发生更改,它就会保留在集群元数据中。因此,ISR 中的任何副本都有资格被选举为领导者。对于 Kafka 的使用模型来说,这是一个重要因素,其中有很多分区,确保领导平衡很重要。借助此 ISR 模型和f+1 个副本,Kafka 主题可以容忍f 个故障而不会丢失已提交的消息。

对于我们希望处理的大多数用例,我们认为这种权衡是合理的。实际上,为了容忍失败,多数投票和 ISR 方法都将在提交消息之前等待相同数量的副本进行确认(例如,为了在一次失败中幸存,多数仲裁需要三个副本和一个确认,而 ISR 方法需要两个副本和一个确认)。无需最慢的服务器即可提交的能力是多数投票方法的优点。然而,我们认为通过允许客户端选择是否在消息提交时阻塞可以改善这一情况,并且由于所需的复制因子较低而带来的额外吞吐量和磁盘空间是值得的。

另一个重要的设计区别是 Kafka 不要求崩溃的节点恢复时所有数据都完好无损。该领域的复制算法依赖于“稳定存储”的存在并不少见,“稳定存储”在任何故障恢复场景中都不会丢失,而不会导致潜在的一致性违规。这个假设有两个主要问题。首先,磁盘错误是我们在持久数据系统的实际操作中观察到的最常见问题,并且它们通常不会使数据完好无损。其次,即使这不是问题,我们也不希望在每次写入时都使用 fsync 来保证一致性,因为这可能会将性能降低两到三个数量级。我们允许副本重新加入 ISR 的协议确保在重新加入之前,

不干净的领导人选举:如果他们都死了怎么办?

请注意,Kafka 对数据丢失的保证是基于至少一个副本保持同步。如果复制分区的所有节点都死亡,则此保证不再成立。

然而,当所有副本死亡时,一个实用的系统需要做一些合理的事情。如果您不幸发生这种情况,重要的是要考虑会发生什么。有两种可以实现的行为:

  1. 等待 ISR 中的副本恢复并选择该副本作为领导者(希望它仍然拥有所有数据)。
  2. 选择第一个作为领导者复活的副本(不一定在 ISR 中)。

这是可用性和一致性之间的简单权衡。如果我们在 ISR 中等待副本,那么只要这些副本关闭,我们就将保持不可用状态。如果此类副本被破坏或数据丢失,那么我们就会永久瘫痪。另一方面,如果一个非同步副本恢复正常,并且我们允许它成为领导者,那么它的日志就会成为事实来源,即使它不能保证拥有每条已提交的消息。默认情况下,从 0.11.0.0 版本开始,Kafka 选择第一种策略并倾向于等待一致的副本。可以使用配置属性 unclean.leader.election.enable 更改此行为,以支持正常运行时间优于一致性的用例。

这种困境并不是Kafka特有的。它存在于任何基于仲裁的方案中。例如,在多数投票方案中,如果大多数服务器遭受永久性故障,那么您必须选择丢失 100% 的数据,或者通过将现有服务器上剩余的数据作为新的事实来源来违反一致性。

可用性和耐用性保证

当写入 Kafka 时,生产者可以选择是否等待 0,1 或所有(-1)副本确认消息。请注意,“所有副本的确认”并不能保证所有分配的副本都已收到该消息。默认情况下,当 acks=all 时,一旦所有当前同步副本收到消息,就会立即发生确认。例如,如果某个主题仅配置了两个副本,其中一个失败(即仅保留一个同步副本),则指定 acks=all 的写入将会成功。但是,如果剩余副本也发生故障,这些写入可能会丢失。尽管这确保了分区的最大可用性,但对于某些更喜欢持久性而不是可用性的用户来说,这种行为可能是不受欢迎的。所以,

  1. 禁用不干净的领导者选举 - 如果所有副本都不可用,则分区将保持不可用,直到最近的领导者再次可用。这实际上更倾向于不可用而不是消息丢失的风险。请参阅上一节关于不干净的领导者选举的说明。
  2. 指定最小 ISR 大小 - 仅当 ISR 大小高于某个最小值时,分区才会接受写入,以防止仅写入单个副本的消息丢失,该副本随后变得不可用。仅当生产者使用 acks=all 并保证消息将被至少这么多同步副本确认时,此设置才会生效。此设置提供了一致性和可用性之间的权衡。最小 ISR 大小的较高设置可以保证更好的一致性,因为消息可以保证写入更多副本,从而降低丢失的可能性。但是,它会降低可用性,因为如果同步副本的数量低于最小阈值,则分区将无法用于写入。

副本管理

上面关于复制日志的讨论实际上只涵盖了单个日志,即一个主题分区。然而,Kafka 集群将管理数百或数千个这样的分区。我们尝试以循环方式平衡集群内的分区,以避免将大量主题的所有分区聚集在少量节点上。同样,我们尝试平衡领导力,以便每个节点都是其分区中一定比例份额的领导者。

优化领导选举流程也很重要,因为这是不可用的关键窗口。领导者选举的简单实现最终会在该节点发生故障时为该节点托管的所有分区运行每个分区的选举。正如上面有关复制的部分所讨论的Kafka集群有一个特殊的角色,称为“控制器”,负责管理broker的注册。如果控制器检测到代理发生故障,则它负责选举 ISR 的剩余成员之一作为新的领导者。结果是,我们能够将许多所需的领导层变更通知批处理在一起,这使得大量分区的选举过程更加便宜且更快。如果控制器本身发生故障,则将选举另一个控制器。

4.8 日志压缩

日志压缩确保 Kafka 始终至少保留单个主题分区的数据日志中每个消息键的最后一个已知值。它解决了用例和场景,例如应用程序崩溃或系统故障后恢复状态,或在操作维护期间应用程序重新启动后重新加载缓存。让我们更详细地研究这些用例,然后描述压缩的工作原理。

到目前为止,我们仅描述了更简单的数据保留方法,其中旧日志数据在固定时间段后或当日志达到某个预定大小时被丢弃。这对于时间事件数据非常有效,例如记录每个记录独立的情况。然而,一类重要的数据流是对键控、可变数据的更改的日志(例如,对数据库表的更改)。

让我们讨论这样一个流的具体示例。假设我们有一个包含用户电子邮件地址的主题;每次用户更新其电子邮件地址时,我们都会使用其用户 ID 作为主键向该主题发送一条消息。现在假设我们在一段时间内为 id 为 123 的用户发送以下消息,每条消息对应于电子邮件地址的更改(其他 id 的消息被省略):

123 => bill@microsoft.com
        .
        .
        .
123 => bill@gatesfoundation.org
        .
        .
        .
123 => bill@gmail.com

日志压缩为我们提供了更细粒度的保留机制,以便保证我们至少保留每个主键的最后更新(例如bill@gmail.com)。通过这样做,我们保证日志包含每个键的最终值的完整快照,而不仅仅是最近更改的键。这意味着下游消费者可以在该主题之外恢复自己的状态,而无需我们保留所有更改的完整日志。

让我们首先看一些有用的用例,然后我们将了解如何使用它。

  1. 数据库变更订阅。通常需要在多个数据系统中拥有一个数据集,并且通常这些系统之一是某种数据库(RDBMS 或可能是新型键值存储)。例如,您可能有一个数据库、一个缓存、一个搜索集群和一个 Hadoop 集群。对数据库的每次更改都需要反映在缓存、搜索集群中,并最终反映在 Hadoop 中。如果只处理实时更新,您只需要最近的日志。但如果您希望能够重新加载缓存或恢复失败的搜索节点,您可能需要完整的数据集。
  2. 事件溯源。这是一种应用程序设计风格,它将查询处理与应用程序设计放在一起,并使用更改日志作为应用程序的主要存储。
  3. 日志记录以实现高可用性。进行本地计算的进程可以通过注销对其本地状态所做的更改来实现容错,以便另一个进程可以重新加载这些更改并在失败时继续执行。一个具体的例子是在流查询系统中处理计数、聚合和其他类似“group by”的处理。Samza 是一个实时流处理框架, 正是利用此功能来实现此目的。

在每一种情况下,我们主要需要处理实时的变化,但偶尔,当机器崩溃或需要重新加载或重新处理数据时,就需要进行完全加载。日志压缩允许将这两个用例提供给同一支持主题。这篇博客文章更详细地描述了日志的这种使用方式。

总体思路非常简单。如果我们有无限的日志保留,并且我们记录了上述情况下的每个更改,那么我们将捕获系统从第一次开始时的每个时间的状态。使用这个完整的日志,我们可以通过重放日志中的前 N ​​条记录来恢复到任意时间点。对于多次更新单个记录的系统来说,这种假设的完整日志不太实用,因为即使对于稳定的数据集,日志也会无限制地增长。丢弃旧更新的简单日志保留机制会限制空间,但日志不再是恢复当前状态的方法 - 现在从日志开头恢复不再重新创建当前状态,因为旧更新可能根本无法捕获。

日志压缩是一种提供更细粒度的每条记录保留的机制,而不是提供更粗粒度的基于时间的保留。这个想法是有选择地删除具有相同主键的最新更新的记录。这样可以保证日志至少具有每个键的最后状态。

可以为每个主题设置此保留策略,因此单个集群可以具有一些通过大小或时间强制保留的主题,以及通过压缩强制保留的其他主题。

此功能的灵感来自于 LinkedIn 最古老、最成功的基础设施之一——名为Databus 的数据库变更日志缓存服务。与大多数日志结构存储系统不同,Kafka 是为订阅而构建的,并组织数据以进行快速线性读取和写入。与 Databus 不同,Kafka 充当真实来源存储,因此即使在上游数据源无法重放的情况下它也很有用。

日志压缩基础知识

这是一张高级图片,显示了 Kafka 日志的逻辑结构以及每条消息的偏移量。

日志的头部与传统的 Kafka 日志相同。它具有密集、连续的偏移量并保留所有消息。日志压缩添加了处理日志尾部的选项。上图显示了一根带有压紧尾部的原木。请注意,日志尾部的消息保留首次写入时分配的原始偏移量,并且永远不会改变。另请注意,即使具有该偏移量的消息已被压缩,所有偏移量仍保留在日志中的有效位置;在这种情况下,该位置与日志中出现的下一个最高偏移量无法区分。例如,在上图中,偏移量 36、37 和 38 都是等效位置,从这些偏移量中的任何一个开始读取都将返回以 38 开头的消息集。

压缩还允许删除。带有密钥和空负载的消息将被视为从日志中删除。这样的记录有时被称为墓碑。此删除标记将导致任何具有该键的先前消息被删除(就像具有该键的任何新消息一样),但删除标记的特殊之处在于,它们本身会在一段时间后从日志中清除以释放空间。删除不再保留的时间点在上图中被标记为“删除保留点”。

压缩是通过定期重新复制日志段在后台完成的。清理不会阻止读取,并且可以限制使用不超过可配置的 I/O 吞吐量,以避免影响生产者和消费者。压缩日志段的实际过程如下所示:

日志压缩提供什么保证

日志压缩保证以下几点:

  1. 任何关注日志头部的消费者都会看到写入的每条消息;这些消息将具有连续的偏移量。主题min.compaction.lag.ms可用于保证消息写入后必须经过的最短时间长度才能被压缩。即,它提供了每条消息在(未压缩的)头部中保留多长时间的下限。主题max.compaction.lag.ms可用于保证写入消息的时间和消息适合压缩的时间之间的最大延迟。
  2. 消息的顺序始终保持不变。压缩永远不会重新排序消息,只是删除一些消息。
  3. 消息的偏移量永远不会改变。它是日志中某个位置的永久标识符。
  4. 从日志开始进行的任何消费者都将至少按照写入顺序看到所有记录的最终状态。此外,如果消费者在小于主题设置的时间段内到达日志的头部delete.retention.ms(默认为 24​​ 小时),则将看到已删除记录的所有删除标记。换句话说:由于删除标记的删除与读取同时发生,因此如果滞后超过 ,消费者可能会错过删除标记delete.retention.ms

日志压缩详细信息

日志压缩由日志清理器处理,日志清理器是一个后台线程池,用于重新复制日志段文件,删除其键出现在日志头部的记录。每个压缩器线程的工作原理如下:

  1. 它选择具有最高日志头与日志尾比的日志
  2. 它为日志头部的每个键创建最后一个偏移量的简洁摘要
  3. 它从头到尾重新复制日志,删除日志中稍后出现的键。新的、干净的段会立即交换到日志中,因此所需的额外磁盘空间只是一个额外的日志段(而不是日志的完整副本)。
  4. 日志头的摘要本质上只是一个空间紧凑的哈希表。每个条目正好使用 24 个字节。因此,使用 8GB 的​​清理缓冲区,一次清理迭代可以清理大约 366GB 的日志头(假设有 1k 条消息)。

配置日志清理器

默认情况下启用日志清理器。这将启动清理线程池。要启用特定主题的日志清理,请添加特定于日志的属性

log.cleanup.policy=compact

log.cleanup.policy属性是代理server.properties文件中定义的代理配置设置;它会影响集群中没有配置覆盖的所有主题(如此处所述 。日志清理器可以配置为保留最少量的未压实的日志“头部”。这是通过设置压缩时间延迟来启用的。

log.cleaner.min.compaction.lag.ms

这可用于防止比最小消息年龄新的消息受到压缩。如果未设置,则除最后一个段(即当前正在写入的段)之外的所有日志段都适合压缩。即使活动段的所有消息都早于最小压缩时间延迟,也不会压缩活动段。日志清理器可以配置为确保最大延迟,之后日志的未压缩“头部”就可以进行日志压缩。

log.cleaner.max.compaction.lag.ms

这可用于防止低生产率的原木在无限制的持续时间内保持不符合压缩条件。如果未设置,则不会压缩不超过 min.cleanable.dirty.ratio 的日志。请注意,这个压缩期限并不是一个硬性保证,因为它仍然受到日志清理线程的可用性和实际压缩时间的影响。您将需要监控 uncleanable-partitions-count、max-clean-time-secs 和 max-compaction-delay-secs 指标。

这里 描述了更多更清洁的配置。

4.9 配额

Kafka 集群能够对请求强制实施配额,以控制客户端使用的代理资源。Kafka 代理可以为共享配额的每组客户端强制执行两种类型的客户端配额:

  1. 网络带宽配额定义字节率阈值(自 0.9 起)
  2. 请求率配额将 CPU 利用率阈值定义为网络和 I/O 线程的百分比(自 0.11 起)

为什么需要配额

生产者和消费者有可能产生/消耗非常大量的数据或以非常高的速率生成请求,从而垄断代理资源,导致网络饱和,并且通常会 DOS 其他客户端和代理本身。拥有配额可以防止这些问题,并且在大型多租户集群中尤为重要,其中一小部分行为不良的客户端可能会降低行为良好客户端的用户体验。事实上,当将 Kafka 作为服务运行时,这甚至可以根据商定的合同强制实施 API 限制。

客户群体

Kafka 客户端的身份是用户主体,它代表安全集群中经过身份验证的用户。在支持未经身份验证的客户端的集群中,用户主体是代理使用可配置的PrincipalBuilder. Client-id 是客户端的逻辑分组,具有由客户端应用程序选择的有意义的名称。元组(用户、客户端 ID)定义共享用户主体和客户端 ID 的安全逻辑客户端组。

配额可以应用于(用户、客户端 ID)、用户或客户端 ID 组。对于给定连接,将应用与该连接匹配的最具体的配额。配额组的所有连接共享为该组配置的配额。例如,如果 (user="test-user", client-id="test-client") 的生产配额为 10MB/秒,则该配额将在用户“test-user”的所有生产者实例与客户端之间共享id“测试客户端”。

配额配置

可以为(用户、客户端 ID)、用户和客户端 ID 组定义配额配置。可以在需要更高(甚至更低)配额的任何配额级别覆盖默认配额。该机制类似于每个主题的日志配置覆盖。用户和(用户、客户端 ID)配额覆盖写入到 ZooKeeper 的/config/users下,客户端 ID 配额覆盖写入到/config/clients下。所有经纪商都会读取这些覆盖并立即生效。这使我们可以更改配额,而无需滚动重新启动整个集群。详细信息请参见此处。每个组的默认配额也可以使用相同的机制动态更新。

配额配置的优先顺序是:

  1. /config/users//clients/
  2. /config/users//clients/
  3. /config/users/
  4. /config/users//clients/
  5. /config/users//clients/
  6. /config/users/
  7. /config/clients/
  8. /config/clients/

网络带宽配额

网络带宽配额定义为共享配额的每组客户端的字节率阈值。默认情况下,每个唯一的客户端组都会收到集群配置的固定配额(以字节/秒为单位)。该配额是按每个经纪商定义的。在客户端受到限制之前,每组客户端可以在每个代理上发布/获取最大 X 字节/秒。

请求速率配额

请求速率配额定义为客户端在配额窗口内可以利用每个代理的请求处理程序 I/O 线程和网络线程的时间百分比。n%的配额表示 一个线程的n%,因此配额超出了((num.io.threads + num.network.threads) * 100)%的总容量。每组客户最多可以使用n%的总百分比在配额窗口中的所有 I/O 和网络线程受到限制之前。由于分配给 I/O 和网络线程的线程数通常基于代理主机上可用的核心数,因此请求率配额表示共享配额的每组客户端可能使用的 CPU 总百分比。

执行

默认情况下,每个唯一的客户端组都会收到集群配置的固定配额。该配额是按每个经纪商定义的。在受到限制之前,每个客户端都可以使用每个代理的此配额。我们认为为每个代理定义这些配额比为每个客户端拥有固定的集群宽带宽要好得多,因为这需要一种在所有代理之间共享客户端配额使用情况的机制。这可能比配额实施本身更难!

当经纪商检测到配额违规时如何反应?在我们的解决方案中,broker首先计算使违规客户端低于其配额所需的延迟量,并立即返回包含延迟的响应。如果是获取请求,响应将不包含任何数据。然后,代理将与客户端的通道静音,不再处理来自客户端的请求,直到延迟结束。在收到延迟时间非零的响应后,Kafka 客户端也将在延迟期间避免向代理发送进一步的请求。因此,来自受限制客户端的请求会被双方有效地阻止。即使使用不尊重代理延迟响应的旧客户端实现,代理通过静音其套接字通道施加的背压仍然可以处理行为不良的客户端的限制。那些向受限通道发送进一步请求的客户端只有在延迟结束后才会收到响应。

字节率和线程利用率是在多个小窗口(例如 30 个窗口,每个窗口 1 秒)上测量的,以便快速检测和纠正配额违规。通常,具有较大的测量窗口(例如,10 个窗口,每个窗口 30 秒)会导致大量流量突发,然后是较长的延迟,这对于用户体验而言并不是很好。



回到顶部