5. 实施
5.1 网络层
网络层是一个相当简单的NIO服务器,不会详细描述。sendfile 的实现是通过为TransferableRecords
接口提供一个writeTo
方法来完成的。这允许文件支持的消息集使用更有效的transferTo
实现而不是进程内缓冲写入。线程模型是单个接受器线程和N 个处理器线程,每个线程处理固定数量的连接。该设计已在其他地方进行了相当彻底的测试,发现实施简单且速度快。该协议保持非常简单,以允许将来以其他语言实现客户端。
5.2 消息
消息由可变长度标头、可变长度不透明密钥字节数组和可变长度不透明值字节数组组成。标头的格式将在以下部分中描述。让键和值不透明是正确的决定:目前序列化库正在取得很大进展,任何特定的选择都不可能适合所有用途。不用说,使用 Kafka 的特定应用程序可能会强制使用特定的序列化类型作为其使用的一部分。该RecordBatch
接口只是一个消息迭代器,具有用于批量读取和写入 NIO 的专用方法Channel
。
5.3 消息格式
消息(又名记录)始终是批量写入的。一批消息的技术术语是记录批次,记录批次包含一条或多条记录。在退化情况下,我们可以有一个包含单个记录的记录批次。记录批次和记录有自己的标题。每个的格式如下所述。
5.3.1 记录批次
以下是 RecordBatch 的磁盘格式。
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: int32
attributes: int16
bit 0~2:
0: no compression
1: gzip
2: snappy
3: lz4
4: zstd
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
bit 7~15: unused
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]
请注意,启用压缩后,压缩的记录数据将直接按照记录数计数进行序列化。
CRC 涵盖从属性到批次结束的数据(即CRC 后面的所有字节)。它位于魔术字节之后,这意味着客户端必须先解析魔术字节,然后才能决定如何解释批量长度和魔术字节之间的字节。分区领导纪元字段不包含在 CRC 计算中,以避免在为代理接收的每个批次分配该字段时需要重新计算 CRC。CRC-32C (Castagnoli) 多项式用于计算。
关于压缩:与旧的消息格式不同,magic v2 及更高版本在清理日志时保留原始批次的第一个和最后一个偏移量/序列号。这是为了能够在重新加载日志时恢复生产者的状态所必需的。例如,如果我们没有保留最后一个序列号,那么在分区领导者失败后,生产者可能会看到 OutOfSequence 错误。必须保留基本序列号以进行重复检查(代理通过验证传入批次的第一个和最后一个序列号是否与该生产者的最后一个序列号匹配来检查传入的 Produce 请求是否有重复)。因此,当批次中的所有记录都被清除但仍保留批次以保留生产者的最后序列号时,日志中可能会有空批次。
如果记录批次包含具有空负载或中止事务标记的记录,则压缩还可能会修改baseTimestamp。baseTimestamp 将设置为应删除这些记录的时间戳,并设置删除范围属性位。
5.3.1.1 控制批次
控制批次包含称为控制记录的单个记录。控制记录不应传递给应用程序。相反,消费者使用它们来过滤掉中止的事务消息。
控制记录的键符合以下模式:
version: int16 (current version is 0)
type: int16 (0 indicates an abort marker, 1 indicates a commit)
控制记录值的架构取决于类型。该值对客户来说是不透明的。
5.3.2 记录
记录级标头是在 Kafka 0.11.0 中引入的。带有标头的记录的磁盘格式如下所示。
length: varint
attributes: int8
bit 0~7: unused
timestampDelta: varlong
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]
5.3.2.1 记录头
headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]
我们使用与 Protobuf 相同的 varint 编码。有关后者的更多信息可以在这里找到。记录中的标头计数也被编码为 varint。
5.3.3 旧消息格式
在Kafka 0.11之前,消息被传输并存储在消息集中。在消息集中,每条消息都有自己的元数据。请注意,虽然消息集表示为数组,但它们不像协议中的其他数组元素那样前面有 int32 数组大小。
消息集:
MessageSet (Version: 0) => [offset message_size message]
offset => INT64
message_size => INT32
message => crc magic_byte attributes key value
crc => INT32
magic_byte => INT8
attributes => INT8
bit 0~2:
0: no compression
1: gzip
2: snappy
bit 3~7: unused
key => BYTES
value => BYTES
MessageSet (Version: 1) => [offset message_size message]
offset => INT64
message_size => INT32
message => crc magic_byte attributes timestamp key value
crc => INT32
magic_byte => INT8
attributes => INT8
bit 0~2:
0: no compression
1: gzip
2: snappy
3: lz4
bit 3: timestampType
0: create time
1: log append time
bit 4~7: unused
timestamp => INT64
key => BYTES
value => BYTES
在 Kafka 0.10 之前的版本中,唯一支持的消息格式版本(在幻值中指示)是 0。消息格式版本 1 在 0.10 版本中引入了时间戳支持。
- 与上面的版本 2 类似,属性的最低位表示压缩类型。
- 在版本 1 中,生产者应始终将时间戳类型位设置为 0。如果主题配置为使用日志附加时间(通过代理级别配置
log.message.timestamp.type = LogAppendTime
或主题级别配置message.timestamp.type = LogAppendTime
),broker 将覆盖时间戳类型和消息集中的时间戳。 - 属性的最高位必须设置为 0。
在消息格式版本 0 和 1 中,Kafka 支持递归消息以启用压缩。在这种情况下,必须将消息的属性设置为指示其中一种压缩类型,并且值字段将包含使用该类型压缩的消息集。我们经常将嵌套消息称为 inner messages
内部消息,将包装消息称为 outer message
外部消息。请注意,外部消息的键应该为空,并且其偏移量将是最后一个内部消息的偏移量。
当接收到递归版本 0 消息时,代理会解压缩它们,并为每个内部消息单独分配一个偏移量。在版本 1 中,为了避免服务器端重新压缩,只会为包装器消息分配偏移量。内部消息将具有相对偏移。绝对偏移量可以使用外部消息的偏移量来计算,该偏移量对应于分配给最后一个内部消息的偏移量。
crc 字段包含后续消息字节(即从幻字节到值)的 CRC32(而非 CRC-32C)。
5.4 日志
具有两个分区的名为“my-topic”的主题的日志由两个目录(即my-topic-0
和my-topic-1
)组成,其中填充了包含该主题消息的数据文件。日志文件的格式是一系列“日志条目”;每个日志条目都是一个 4 字节整数N,存储消息长度,后跟N个消息字节。每条消息由 64 位整数偏移量唯一标识给出该消息在曾经发送到该分区上该主题的所有消息流中开始的字节位置。下面给出了每条消息的磁盘格式。每个日志文件都以其包含的第一条消息的偏移量命名。因此,创建的第一个文件将是 00000000000000000000.log,每个附加文件将有一个与前一个文件大约S字节的整数名称,其中S是配置中给定的最大日志文件大小。
记录的确切二进制格式作为标准接口进行版本控制和维护,因此记录批次可以在生产者、代理和客户端之间传输,而无需在需要时重新复制或转换。上一节包含有关记录的磁盘格式的详细信息。
使用消息偏移量作为消息 ID 的情况并不常见。我们最初的想法是使用生产者生成的 GUID,并在每个代理上维护从 GUID 到偏移量的映射。但由于消费者必须为每台服务器维护一个 ID,因此 GUID 的全局唯一性没有任何价值。此外,维护从随机 ID 到偏移量的映射的复杂性需要一个必须与磁盘同步的重量级索引结构,本质上需要一个完全持久的随机访问数据结构。因此,为了简化查找结构,我们决定使用一个简单的每分区原子计数器,它可以与分区 id 和节点 id 结合起来唯一地标识一条消息;这使得查找结构更简单,尽管每个消费者请求仍可能进行多次查找。然而,一旦我们选定了柜台,直接使用偏移量的跳转似乎很自然——毕竟两者都是分区特有的单调递增整数。由于偏移量对消费者 API 是隐藏的,所以这个决定最终是一个实现细节,我们采用了更有效的方法。
写
日志允许串行追加,始终转到最后一个文件。当该文件达到可配置大小(例如 1GB)时,它会转存为新文件。日志采用两个配置参数:M,它给出在强制操作系统将文件刷新到磁盘之前要写入的消息数;以及S,它给出强制刷新之前的秒数。这提供了在系统崩溃时 最多丢失M条消息或S秒数据的持久性保证。
读
读取是通过给出消息的 64 位逻辑偏移量和S字节最大块大小来完成的。这将返回S字节缓冲区中包含的消息的迭代器。S旨在大于任何单个消息,但如果消息异常大,则可以重试多次读取,每次将缓冲区大小加倍,直到成功读取消息。可以指定最大消息和缓冲区大小,以使服务器拒绝大于某个大小的消息,并为客户端提供获取完整消息所需读取的最大值的限制。读取缓冲区很可能以部分消息结束,这很容易通过大小分隔来检测到。
从偏移量读取的实际过程需要首先找到存储数据的日志段文件,根据全局偏移值计算文件特定的偏移量,然后从该文件偏移量读取。搜索是作为针对每个文件维护的内存范围的简单二分搜索变体来完成的。
该日志提供了获取最近写入的消息的功能,以允许客户端“立即”开始订阅。如果消费者未能在 SLA 指定的天数内使用其数据,这也很有用。在这种情况下,当客户端尝试使用不存在的偏移量时,它会收到 OutOfRangeException 异常,并且可能会根据用例自行重置或失败。
以下是发送给消费者的结果的格式。
MessageSetSend (fetch result)
total length : 4 bytes
error code : 2 bytes
message 1 : x bytes
...
message n : x bytes
MultiMessageSetSend (multiFetch result)
total length : 4 bytes
error code : 2 bytes
messageSetSend 1
...
messageSetSend n
删除
一次删除一个日志段的数据。日志管理器应用两个指标来识别适合删除的段:时间和大小。对于基于时间的策略,会考虑记录时间戳,分段文件中最大的时间戳(记录顺序不相关)定义整个分段的保留时间。默认情况下禁用基于大小的保留。启用后,日志管理器将不断删除最旧的段文件,直到分区的总体大小再次位于配置的限制内。如果同时启用这两个策略,则由于任一策略而符合删除条件的段将被删除。
保证
日志提供了一个配置参数M,它控制在强制刷新到磁盘之前写入的最大消息数。启动时,运行日志恢复进程,迭代最新日志段中的所有消息并验证每个消息条目是否有效。如果消息条目的大小和偏移量之和小于文件的长度并且消息有效负载的 CRC32 与消息中存储的 CRC 匹配,则消息条目有效。如果检测到损坏,日志将被截断到最后一个有效偏移量。
请注意,必须处理两种类型的损坏:由于崩溃而丢失未写入块的截断,以及将无意义块添加到文件的损坏。这样做的原因是,一般来说,操作系统不保证文件 inode 和实际块数据之间的写入顺序,因此,如果 inode 更新为新大小,那么除了丢失写入数据之外,文件还可能获得无意义的数据。崩溃发生在写入包含该数据的块之前。CRC 检测到这种特殊情况,并防止其损坏日志(尽管未写入的消息当然会丢失)。
5.5 分布
消费者抵消追踪
Kafka 消费者跟踪它在每个分区中消耗的最大偏移量,并且能够提交偏移量,以便在重新启动时可以从这些偏移量恢复。Kafka 提供了将给定消费者组的所有偏移量存储在称为组协调器的指定代理(针对该组)中的选项。即,该消费者组中的任何消费者实例都应该将其偏移量提交和获取发送到该组协调器(代理)。消费者组根据组名分配给协调员。消费者可以通过向任何 Kafka 代理发出 FindCoordinatorRequest 并读取包含协调器详细信息的 FindCoordinatorResponse 来查找其协调器。然后,消费者可以继续从协调代理中提交或获取偏移量。如果协调器移动,消费者将需要重新发现协调器。偏移量提交可以由消费者实例自动或手动完成。
当组协调器收到 OffsetCommitRequest 时,它会将请求附加到名为__consumer_offsets的特殊压缩Kafka 主题。仅当偏移量主题的所有副本都收到偏移量后,代理才会向消费者发送成功的偏移量提交响应。如果偏移量未能在可配置的超时内复制,则偏移量提交将失败,并且消费者可以在退出后重试提交。代理定期压缩偏移量主题,因为它只需要维护每个分区的最新偏移量提交。协调器还将偏移量缓存在内存表中,以便快速提供偏移量获取。
当协调器收到偏移量获取请求时,它只是从偏移量缓存中返回最后提交的偏移量向量。如果协调器刚刚启动或者它刚刚成为一组新的消费者组的协调器(通过成为偏移主题分区的领导者),它可能需要将偏移主题分区加载到缓存中。在这种情况下,偏移量获取将失败并出现 CoordinatorLoadInProgressException,并且使用者可以在退出后重试 OffsetFetchRequest。
ZooKeeper 目录
下面给出了用于协调消费者和代理之间的 ZooKeeper 结构和算法。
符号
当路径中的元素被表示为 时[xyz]
,这意味着 xyz 的值不固定,并且实际上 xyz 的每个可能值都有一个 ZooKeeper znode。例如,/topics/[topic]
名为 /topics 的目录包含每个主题名称的子目录。还给出了数字范围,例如[0...5]
指示子目录 0、1、2、3、4。箭头->
用于指示 znode 的内容。例如/hello -> world
,将指示包含值“world”的 znode /hello。
经纪商节点注册表
/brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)
这是所有现有代理节点的列表,每个节点都提供一个唯一的逻辑代理 ID,用于向消费者标识它(必须作为其配置的一部分给出)。启动时,代理节点通过在 /brokers/ids 下创建具有逻辑代理 ID 的 znode 来注册自身。逻辑broker id的目的是允许broker移动到不同的物理机器而不影响消费者。尝试注册已在使用的代理 ID(例如,因为两个服务器配置了相同的代理 ID)会导致错误。
由于代理使用临时 znode 在 ZooKeeper 中注册自身,因此这种注册是动态的,并且如果代理关闭或死亡(从而通知消费者它不再可用),该注册就会消失。
经纪商主题注册表
/brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)
每个代理都会在其维护的主题下注册自己,并存储该主题的分区数量。
集群 ID
集群 ID 是分配给 Kafka 集群的唯一且不可变的标识符。集群 ID 最多可以有 22 个字符,允许的字符由正则表达式 [a-zA-Z0-9_\-]+ 定义,它对应于不带填充的 URL 安全 Base64 变体使用的字符。从概念上讲,它是在集群第一次启动时自动生成的。
从实现角度来看,它是在 0.10.1 或更高版本的代理首次成功启动时生成的。代理/cluster/id
在启动期间尝试从 znode 获取集群 ID。如果 znode 不存在,broker 会生成一个新的集群 id,并使用该集群 id 创建 znode。
经纪商节点注册
代理节点基本上是独立的,因此它们只发布有关其拥有的信息。当代理加入时,它会在代理节点注册表目录下注册自己,并写入有关其主机名和端口的信息。代理还在代理主题注册表中注册现有主题及其逻辑分区的列表。新主题在代理上创建时会动态注册。