6. Operations

Here is some information on actually running Kafka as a production system based on usage and experience at LinkedIn. Please send us any additional tips you know of.

6.1 Basic Kafka Operations

This section will review the most common operations you will perform on your Kafka cluster. All of the tools reviewed in this section are available under the bin/ directory of the Kafka distribution, and each tool will print details on all possible command line options if it is run with no arguments.

Adding and removing topics

You have the option of either adding topics manually or having them be created automatically when data is first published to a non-existent topic. If topics are auto-created then you may want to tune the default topic configurations used for auto-created topics.

Topics are added and modified using the topic tool:

> bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name \
      --partitions 20 --replication-factor 3 --config x=y
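
The same operation is also available programmatically through the Java AdminClient; below is a minimal sketch mirroring the command above (the broker address and the x=y config are placeholders):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker_host:port");
        try (Admin admin = Admin.create(props)) {
            // 20 partitions, replication factor 3, plus a per-topic config override
            NewTopic topic = new NewTopic("my_topic_name", 20, (short) 3)
                    .configs(Map.of("x", "y"));
            admin.createTopics(List.of(topic)).all().get(); // blocks until the topic exists
        }
    }
}

The command line tool remains the simplest option for one-off changes; the AdminClient is useful when topic management is automated.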

The replication factor controls how many servers will replicate each message that is written. If you have a replication factor of 3 then up to 2 servers can fail before you will lose access to your data. We recommend you use a replication factor of 2 or 3 so that you can transparently bounce machines without interrupting data consumption.

The partition count controls how many logs the topic will be sharded into. There are several impacts of the partition count. First, each partition must fit entirely on a single server. So if you have 20 partitions the full data set (and read and write load) will be handled by no more than 20 servers (not counting replicas). Finally, the partition count impacts the maximum parallelism of your consumers. This is discussed in greater detail in the concepts section.

Each sharded partition log is placed into its own folder under the Kafka log directory. The name of such folders consists of the topic name, appended by a dash (-) and the partition id. Since a typical folder name can not be over 255 characters long, there will be a limitation on the length of topic names. We assume the number of partitions will not ever be above 100,000. Therefore, topic names cannot be longer than 249 characters. This leaves just enough room in the folder name for a dash and a potentially 5 digit long partition id.

The configurations added on the command line override the default settings the server has for things like the length of time data should be retained. The complete set of per-topic configurations is documented here.

Modifying topics

You can change the configuration or partitioning of a topic using the same topic tool.

To add partitions you can do

> bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
      --partitions 40

Be aware that one use case for partitions is to semantically partition data, and adding partitions doesn't change the partitioning of existing data, so this may disturb consumers if they rely on that partition. That is, if data is partitioned by hash(key) % number_of_partitions then this partitioning will potentially be shuffled by adding partitions, but Kafka will not attempt to automatically redistribute data in any way.
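
To see why this matters, here is a small illustrative sketch. It uses String.hashCode purely as a stand-in hash function (Kafka's default partitioner actually applies murmur2 to the serialized key), so the exact numbers are hypothetical; the point is that the key-to-partition mapping changes when the partition count changes:

public class PartitionShuffleDemo {
    // Stand-in for hash(key) % number_of_partitions. Kafka's default
    // partitioner really uses murmur2; this only illustrates the idea.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"user-42", "user-43", "user-44"}) {
            System.out.printf("%s: partition %d with 20 partitions, %d with 40%n",
                    key, partitionFor(key, 20), partitionFor(key, 40));
        }
    }
}

Keys that previously landed in the same partition may be split across different partitions after the expansion, which is exactly the consumer-visible disturbance described above.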

To add configs:

> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y

To remove a config:

> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x

And finally deleting a topic:

> bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name

Kafka does not currently support reducing the number of partitions for a topic.

Instructions for changing the replication factor of a topic can be found here.

Graceful shutdown

The Kafka cluster will automatically detect any broker shutdown or failure and elect new leaders for the partitions on that machine. This will occur whether a server fails or it is brought down intentionally for maintenance or configuration changes. For the latter cases Kafka supports a more graceful mechanism for stopping a server than just killing it. When a server is stopped gracefully it takes advantage of two optimizations:

  1. It will sync all its logs to disk to avoid needing to do any log recovery when it restarts (i.e. validating the checksum for all messages in the tail of the log). Log recovery takes time so this speeds up intentional restarts.
  2. It will migrate any partitions the server is the leader for to other replicas prior to shutting down. This will make the leadership transfer faster and minimize the time each partition is unavailable to a few milliseconds.

Syncing the logs will happen automatically whenever the server is stopped other than by a hard kill, but the controlled leadership migration requires using a special setting:

controlled.shutdown.enable=true

Note that controlled shutdown will only succeed if all the partitions hosted on the broker have replicas (i.e. the replication factor is greater than 1 and at least one of these replicas is alive). This is generally what you want since shutting down the last replica would make that topic partition unavailable.

Balancing leadership

Whenever a broker stops or crashes, leadership for that broker's partitions transfers to other replicas. When the broker is restarted it will only be a follower for all its partitions, meaning it will not be used for client reads and writes.

To avoid this imbalance, Kafka has a notion of preferred replicas. If the list of replicas for a partition is 1,5,9 then node 1 is preferred as the leader to either node 5 or 9 because it is earlier in the replica list. By default the Kafka cluster will try to restore leadership to the preferred replicas. This behaviour is configured with:

auto.leader.rebalance.enable=true

You can also set this to false, but you will then need to manually restore leadership to the restored replicas by running the command:

> bin/kafka-leader-election.sh --bootstrap-server broker_host:port --election-type preferred --all-topic-partitions
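
The same election can be triggered from the Java AdminClient (KIP-460); a minimal sketch, with topic and partition as placeholders:

import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

public class PreferredLeaderElectionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker_host:port");
        try (Admin admin = Admin.create(props)) {
            // Run a preferred leader election for one partition; passing null
            // instead of a set runs it for all eligible partitions.
            admin.electLeaders(ElectionType.PREFERRED,
                    Set.of(new TopicPartition("my_topic_name", 0))).partitions().get();
        }
    }
}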

Balancing Replicas Across Racks

The rack awareness feature spreads replicas of the same partition across different racks. This extends the guarantees Kafka provides for broker-failure to cover rack-failure, limiting the risk of data loss should all the brokers on a rack fail at once. The feature can also be applied to other broker groupings such as availability zones in EC2.

You can specify that a broker belongs to a particular rack by adding a property to the broker config:

broker.rack=my-rack-id

When a topic is created, modified or replicas are redistributed, the rack constraint will be honoured, ensuring replicas span as many racks as they can (a partition will span min(#racks, replication-factor) different racks).

The algorithm used to assign replicas to brokers ensures that the number of leaders per broker will be constant, regardless of how brokers are distributed across racks. This ensures balanced throughput.

However, if racks are assigned different numbers of brokers, the assignment of replicas will not be even. Racks with fewer brokers will get more replicas, meaning they will use more storage and put more resources into replication. Hence it is sensible to configure an equal number of brokers per rack.

Mirroring data between clusters & Geo-replication

Kafka administrators can define data flows that cross the boundaries of individual Kafka clusters, data centers, or geographical regions. Please refer to the section on Geo-Replication for further information.

Checking consumer position

Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. To run this tool on a consumer group named my-group consuming a topic named my-topic would look like this:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
my-topic                       0          2               4               2          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
my-topic                       1          2               3               1          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
my-topic                       2          2               3               1          consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2   /127.0.0.1                     consumer-2
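
If you prefer to compute these numbers programmatically rather than parsing the tool output, the LAG column above is simply LOG-END-OFFSET minus CURRENT-OFFSET. A minimal AdminClient sketch (broker address and group name mirror the command above):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerLagExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Committed offsets for the group (the CURRENT-OFFSET column)
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("my-group")
                         .partitionsToOffsetAndMetadata().get();

            // Log end offsets for the same partitions (the LOG-END-OFFSET column)
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            committed.keySet().forEach(tp -> request.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
                    admin.listOffsets(request).all().get();

            committed.forEach((tp, meta) -> System.out.printf("%s lag=%d%n",
                    tp, ends.get(tp).offset() - meta.offset()));
        }
    }
}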

Managing Consumer Groups

With the ConsumerGroupCommand tool, we can list, describe, or delete consumer groups. A consumer group can be deleted manually, or automatically when the last committed offset for that group expires. Manual deletion works only if the group does not have any active members. For example, to list all consumer groups across all topics:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

test-consumer-group

To view offsets, as mentioned earlier, we "describe" the consumer group like this:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
topic3          0          241019          395308          154289          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
topic2          1          520678          803288          282610          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
topic3          1          241018          398817          157799          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
topic1          0          854144          855809          1665            consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
topic2          0          460537          803290          342753          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4

There are a number of additional "describe" options that can be used to provide more detailed information about a consumer group:

  • --members: This option provides the list of all active members in the consumer group.
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members

CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0
  • --members --verbose: On top of the information reported by the "--members" option above, this option also provides the partitions assigned to each member.
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose

CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2               topic1(0), topic2(0)
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1               topic3(2)
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3               topic2(1), topic3(0,1)
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0               -
  • --offsets: This is the default describe option and provides the same output as the "--describe" option.
  • --state: This option provides useful group-level information.
      > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state

      COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
      localhost:9092 (0)        range                     Stable               4

To manually delete one or multiple consumer groups, the "--delete" option can be used:

  > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

  Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.

To reset offsets of a consumer group, the "--reset-offsets" option can be used. This option supports one consumer group at a time. It requires defining following scopes: --all-topics or --topic. One scope must be selected, unless you use the '--from-file' scenario. Also, first make sure that the consumer instances are inactive. See KIP-122 for more details.

It has 3 execution options:

  • (default): Displays which offsets will be reset.
  • --execute: Executes the --reset-offsets process.
  • --export: Exports the results to CSV format.

--reset-offsets also has the following scenarios to choose from (at least one scenario must be selected):

  • --to-datetime: Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
  • --to-earliest: Reset offsets to earliest offset.
  • --to-latest: Reset offsets to latest offset.
  • --shift-by: Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
  • --from-file: Reset offsets to values defined in CSV file.
  • --to-current: Resets offsets to current offset.
  • --by-duration: Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
  • --to-offset: Reset offsets to a specific offset.

Note, out of range offsets will be adjusted to the available offset end. For example, if the offset end is at 10 and the offset shift request is of 15, then the offset at 10 will actually be selected.

For example, to reset offsets of a consumer group to the latest offset:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest

TOPIC                          PARTITION  NEW-OFFSET
topic1                         0          0

If you are using the old high-level consumer and storing the group metadata in ZooKeeper (i.e. offsets.storage=zookeeper), pass --zookeeper instead of --bootstrap-server:

> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

Expanding your cluster

Adding servers to a Kafka cluster is easy, just assign them a unique broker id and start up Kafka on your new servers. However, these new servers will not automatically be assigned any data partitions, so unless partitions are moved to them they won't be doing any work until new topics are created. So usually when you add machines to your cluster you will want to migrate some existing data to these machines.

The process of migrating data is manually initiated but fully automated. Under the covers what happens is that Kafka will add the new server as a follower of the partition it is migrating and allow it to fully replicate the existing data in that partition. When the new server has fully replicated the contents of this partition and joined the in-sync replicas, one of the existing replicas will delete their partition's data.

The partition reassignment tool can be used to move partitions across brokers. An ideal partition distribution would ensure even data load and partition sizes across all brokers. The partition reassignment tool does not have the capability to automatically study the data distribution in a Kafka cluster and move partitions around to attain an even load distribution. As such, the admin has to figure out which topics or partitions should be moved around.

The partition reassignment tool can run in 3 mutually exclusive modes:

  • --generate: In this mode, given a list of topics and a list of brokers, the tool generates a candidate reassignment to move all partitions of the specified topics to the new brokers. This option merely provides a convenient way to generate a partition reassignment plan given a list of topics and target brokers.
  • --execute: In this mode, the tool kicks off the reassignment of partitions based on the user provided reassignment plan (using the --reassignment-json-file option). This can either be a custom reassignment plan hand crafted by the admin or provided by using the --generate option.
  • --verify: In this mode, the tool verifies the status of the reassignment for all partitions listed during the last --execute. The status can be either of successfully completed, failed or in progress.

Automatically migrating data to new machines

The partition reassignment tool can be used to move some topics off of the current set of brokers to the newly added brokers. This is typically useful while expanding an existing cluster since it is easier to move entire topics to the new set of brokers, than moving one partition at a time. When used to do this, the user should provide a list of topics that should be moved to the new set of brokers and a target list of new brokers. The tool then evenly distributes all partitions for the given list of topics across the new set of brokers. During this move, the replication factor of the topic is kept constant. Effectively the replicas for all partitions for the input list of topics are moved from the old set of brokers to the newly added brokers.

For instance, the following example will move all partitions for topics foo1,foo2 to the new set of brokers 5,6. At the end of this move, all partitions for topics foo1 and foo2 will only exist on brokers 5,6.

Since the tool accepts the input list of topics as a json file, you first need to identify the topics you want to move and create the json file as follows:

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
"version":1
}

Once the json file is ready, use the partition reassignment tool to generate a candidate assignment:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[2,1]},
              {"topic":"foo1","partition":1,"replicas":[1,3]},
              {"topic":"foo1","partition":2,"replicas":[3,4]},
              {"topic":"foo2","partition":0,"replicas":[4,2]},
              {"topic":"foo2","partition":1,"replicas":[2,1]},
              {"topic":"foo2","partition":2,"replicas":[1,3]}]
}

Proposed partition reassignment configuration

{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[6,5]},
              {"topic":"foo1","partition":1,"replicas":[5,6]},
              {"topic":"foo1","partition":2,"replicas":[6,5]},
              {"topic":"foo2","partition":0,"replicas":[5,6]},
              {"topic":"foo2","partition":1,"replicas":[6,5]},
              {"topic":"foo2","partition":2,"replicas":[5,6]}]
}

The tool generates a candidate assignment that will move all partitions from topics foo1,foo2 to brokers 5,6. Note, however, that at this point the partition movement has not started; it merely tells you the current assignment and the proposed new assignment. The current assignment should be saved in case you want to rollback to it. The new assignment should be saved in a json file (e.g. expand-cluster-reassignment.json) to be input to the tool with the --execute option as follows:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[2,1]},
              {"topic":"foo1","partition":1,"replicas":[1,3]},
              {"topic":"foo1","partition":2,"replicas":[3,4]},
              {"topic":"foo2","partition":0,"replicas":[4,2]},
              {"topic":"foo2","partition":1,"replicas":[2,1]},
              {"topic":"foo2","partition":2,"replicas":[1,3]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for foo1-0,foo1-1,foo1-2,foo2-0,foo2-1,foo2-2

Finally, the --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] is completed
Reassignment of partition [foo1,1] is still in progress
Reassignment of partition [foo1,2] is still in progress
Reassignment of partition [foo2,0] is completed
Reassignment of partition [foo2,1] is completed
Reassignment of partition [foo2,2] is completed
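
The same workflow is available programmatically: since Kafka 2.4 the AdminClient exposes the reassignment API that the tool uses. A minimal sketch that moves foo1-0 to brokers 5,6 and polls until no reassignment remains in progress (values mirror the examples above):

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class ReassignPartitionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicPartition foo1p0 = new TopicPartition("foo1", 0);
            // Move the replicas of foo1-0 to brokers 5 and 6
            admin.alterPartitionReassignments(Map.of(foo1p0,
                    Optional.of(new NewPartitionReassignment(List.of(5, 6))))).all().get();

            // An empty result from listPartitionReassignments means all moves are done
            while (!admin.listPartitionReassignments().reassignments().get().isEmpty()) {
                Thread.sleep(5000); // poll, roughly what repeated --verify does
            }
        }
    }
}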

Custom partition assignment and migration

The partition reassignment tool can also be used to selectively move replicas of a partition to a specific set of brokers. When used in this manner, it is assumed that the user knows the reassignment plan and does not require the tool to generate a candidate reassignment, effectively skipping the --generate step and moving straight to the --execute step.

For instance, the following example moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3:

The first step is to hand craft the custom reassignment plan in a json file:

> cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}

Then, use the json file with the --execute option to start the reassignment process:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
              {"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for foo1-0,foo2-1

The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same custom-reassignment.json (used with the --execute option) should be used with the --verify option:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] is completed
Reassignment of partition [foo2,1] is completed

Decommissioning brokers

The partition reassignment tool does not have the ability to automatically generate a reassignment plan for decommissioning brokers yet. As such, the admin has to come up with a reassignment plan to move the replicas for all partitions hosted on the broker to be decommissioned, to the rest of the brokers. This can be relatively tedious as the reassignment needs to ensure that all the replicas are not moved from the decommissioned broker to only one other broker. To make this process effortless, we plan to add tooling support for decommissioning brokers in the future.

Increasing replication factor

Increasing the replication factor of an existing partition is easy. Just specify the extra replicas in the custom reassignment json file and use it with the --execute option to increase the replication factor of the specified partitions.

For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition's only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.

The first step is to hand craft the custom reassignment plan in a json file:

> cat increase-replication-factor.json
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

Then, use the json file with the --execute option to start the reassignment process:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignment for foo-0

The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the --execute option) should be used with the --verify option:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [foo,0] is completed

You can also verify the increase in replication factor with the kafka-topics tool:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
Topic:foo   PartitionCount:1    ReplicationFactor:3 Configs:
  Topic: foo    Partition: 0    Leader: 5   Replicas: 5,6,7 Isr: 5,6,7

Limiting bandwidth usage during data migration

Kafka lets you apply a throttle to replication traffic, setting an upper bound on the bandwidth used to move replicas from machine to machine. This is useful when rebalancing a cluster, bootstrapping a new broker or adding or removing brokers, as it limits the impact these data-intensive operations will have on users.

There are two interfaces that can be used to engage a throttle. The simplest, and safest, is to apply a throttle when invoking kafka-reassign-partitions.sh, but kafka-configs.sh can also be used to view and alter the throttle values directly.

So for example, if you were to execute a rebalance with the below command, it would move partitions at no more than 50MB/s:

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file big-cluster.json --throttle 50000000

When you execute this script you will see the throttle engage:

The inter-broker throttle limit was set to 50000000 B/s
Successfully started partition reassignment for foo1-0

Should you wish to alter the throttle during a rebalance, say to increase the throughput so it completes quicker, you can do this by re-running the execute command with the --additional option, passing the same reassignment json file:

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --additional --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
  The inter-broker throttle limit was set to 700000000 B/s

Once the rebalance completes the administrator can check the status of the rebalance using the --verify option. If the rebalance has completed, the throttle will be removed via the --verify command. It is important that administrators remove the throttle in a timely manner once rebalancing completes by running the command with the --verify option. Failure to do so could cause regular replication traffic to be throttled.

When the --verify option is executed, and the reassignment has completed, the script will confirm that the throttle was removed:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --verify --reassignment-json-file bigger-cluster.json
Status of partition reassignment:
Reassignment of partition [my-topic,1] is completed
Reassignment of partition [my-topic,0] is completed

Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic my-topic

The administrator can also validate the assigned configs using kafka-configs.sh. There are two pairs of throttle configuration used to manage the throttling process. The first pair refers to the throttle value itself. This is configured, at a broker level, using the dynamic properties:

leader.replication.throttled.rate
follower.replication.throttled.rate

Then there is the configuration pair of enumerated sets of throttled replicas:

leader.replication.throttled.replicas
follower.replication.throttled.replicas

which are configured per topic.

All four config values are automatically assigned by kafka-reassign-partitions.sh (discussed below).

To view the throttle limit configuration:

> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000

This shows the throttle applied to both the leader and follower side of the replication protocol. By default both sides are assigned the same throttled throughput value.

To view the list of throttled replicas:

> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
    follower.replication.throttled.replicas=1:101,0:102

Here we see the leader throttle is applied to partition 1 on broker 102 and partition 0 on broker 101. Likewise the follower throttle is applied to partition 1 on broker 101 and partition 0 on broker 102.

By default kafka-reassign-partitions.sh will apply the leader throttle to all replicas that existed before the rebalance, any one of which might be leader. It will apply the follower throttle to all move destinations. So if there is a partition with replicas on brokers 101,102, being reassigned to 102,103, a leader throttle for that partition would be applied to 101,102 and a follower throttle would be applied to 103 only.

If required, you can also use the --alter switch on kafka-configs.sh to alter the throttle configurations manually.

Safe usage of throttled replication

Some care should be taken when using throttled replication. In particular:

(1) Throttle Removal:

The throttle should be removed in a timely manner once reassignment completes (by running kafka-reassign-partitions.sh --verify).

(2) Ensuring Progress:

If the throttle is set too low, in comparison to the incoming write rate, it is possible for replication to not make progress. This occurs when:

max(BytesInPerSec) > throttle

where BytesInPerSec is the metric that monitors the write throughput of producers into each broker.

The administrator can monitor whether replication is making progress, during the rebalance, using the metric:

kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)

The lag should constantly decrease during replication. If the metric does not decrease the administrator should increase the throttle throughput as described above.

Setting quotas

Quota overrides and defaults may be configured at (user, client-id), user or client-id levels as described here. By default, clients receive an unlimited quota. It is possible to set custom quotas for each (user, client-id), user or client-id group.

Configure custom quota for (user=user1, client-id=clientA):

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.

Configure custom quota for user=user1:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
Updated config for entity: user-principal 'user1'.

Configure custom quota for client-id=clientA:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
Updated config for entity: client-id 'clientA'.

It is possible to set default quotas for each (user, client-id), user or client-id group by specifying the --entity-default option instead of --entity-name.

Configure default client-id quota for user=user1:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
Updated config for entity: user-principal 'user1', default client-id.

Configure default quota for user:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
Updated config for entity: default user-principal.

Configure default quota for client-id:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
Updated config for entity: default client-id.

Here's how to describe the quota for a given (user, client-id):

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

Describe quota for a given user:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

Describe quota for a given client-id:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA
Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

If entity name is not specified, all entities of the specified type are described. For example, describe all users:

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

Similarly for (user, client):

> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-type clients
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
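
The same quotas can also be managed programmatically via the AdminClient quota API (KIP-546); a minimal sketch that applies the (user=user1, client-id=clientA) quota from the first example above:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public class SetQuotaExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // The entity: (user=user1, client-id=clientA); a null value would mean "default"
            ClientQuotaEntity entity = new ClientQuotaEntity(Map.of(
                    ClientQuotaEntity.USER, "user1",
                    ClientQuotaEntity.CLIENT_ID, "clientA"));
            List<ClientQuotaAlteration.Op> ops = List.of(
                    new ClientQuotaAlteration.Op("producer_byte_rate", 1024.0),
                    new ClientQuotaAlteration.Op("consumer_byte_rate", 2048.0),
                    new ClientQuotaAlteration.Op("request_percentage", 200.0));
            admin.alterClientQuotas(List.of(new ClientQuotaAlteration(entity, ops)))
                 .all().get();
        }
    }
}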

6.2 Datacenters

Some deployments will need to manage a data pipeline that spans multiple datacenters. Our recommended approach to this is to deploy a local Kafka cluster in each datacenter, with application instances in each datacenter interacting only with their local cluster and mirroring data between clusters (see the documentation on Geo-Replication for how to do this).

This deployment pattern allows datacenters to act as independent entities and allows us to manage and tune inter-datacenter replication centrally. This allows each facility to stand alone and operate even if the inter-datacenter links are unavailable: when this occurs the mirroring falls behind until the link is restored at which time it catches up.

For applications that need a global view of all data you can use mirroring to provide clusters which have aggregate data mirrored from the local clusters in all datacenters. These aggregate clusters are used for reads by applications that require the full data set.

This is not the only possible deployment pattern. It is possible to read from or write to a remote Kafka cluster over the WAN, though obviously this will add whatever latency is required to get to the cluster.

Kafka naturally batches data in both the producer and consumer so it can achieve high-throughput even over a high-latency connection. To allow this though it may be necessary to increase the TCP socket buffer sizes for the producer, consumer, and broker using the socket.send.buffer.bytes and socket.receive.buffer.bytes configurations. The appropriate way to set this is documented here.
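
On the client side the corresponding settings are named send.buffer.bytes and receive.buffer.bytes; a minimal producer sketch (the 512 KB value is an illustrative assumption, not a tuned recommendation):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class CrossDcProducerConfig {
    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "remote-broker:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Larger TCP buffers help keep a high-latency WAN link full
        props.put(ProducerConfig.SEND_BUFFER_CONFIG, 512 * 1024);
        props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 512 * 1024);
        return new KafkaProducer<>(props);
    }
}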

It is generally not advisable to run a single Kafka cluster that spans multiple datacenters over a high-latency link. This will incur very high replication latency both for Kafka writes and ZooKeeper writes, and neither Kafka nor ZooKeeper will remain available in all locations if the network between locations is unavailable.

6.3 Geo-Replication (Cross-Cluster Data Mirroring)

Geo-Replication Overview

Kafka administrators can define data flows that cross the boundaries of individual Kafka clusters, data centers, or geo-regions. Such event streaming setups are often needed for organizational, technical, or legal requirements. Common scenarios include:

  • Geo-replication
  • Disaster recovery
  • Feeding edge clusters into a central, aggregate cluster
  • Physical isolation of clusters (such as production vs. testing)
  • Cloud migration or hybrid cloud deployments
  • Legal and compliance requirements

Administrators can set up such inter-cluster data flows with Kafka's MirrorMaker (version 2), a tool to replicate data between different Kafka environments in a streaming manner. MirrorMaker is built on top of the Kafka Connect framework and supports features such as:

  • Replicates topics (data plus configurations)
  • Replicates consumer groups including offsets to migrate applications between clusters
  • Replicates ACLs
  • Preserves partitioning
  • Automatically detects new topics and partitions
  • Provides a wide range of metrics, such as end-to-end replication latency across multiple data centers/clusters
  • Fault-tolerant and horizontally scalable operations

Note: Geo-replication with MirrorMaker replicates data across Kafka clusters. This inter-cluster replication is different from Kafka's intra-cluster replication, which replicates data within the same Kafka cluster.

What Are Replication Flows

With MirrorMaker, Kafka administrators can replicate topics, topic configurations, consumer groups and their offsets, and ACLs from one or more source Kafka clusters to one or more target Kafka clusters, i.e., across cluster environments. In a nutshell, MirrorMaker uses Connectors to consume from source clusters and produce to target clusters.

These directional flows from source to target clusters are called replication flows. They are defined with the format {source_cluster}->{target_cluster} in the MirrorMaker configuration file as described later. Administrators can create complex replication topologies based on these flows.

Here are some example patterns:

  • Active/Active high availability deployments: A->B, B->A
  • Active/Passive or Active/Standby high availability deployments: A->B
  • Aggregation (e.g., from many clusters to one): A->K, B->K, C->K
  • Fan-out (e.g., from one to many clusters): K->A, K->B, K->C
  • Forwarding: A->B, B->C, C->D

By default, a flow replicates all topics and consumer groups. However, each replication flow can be configured independently. For instance, you can define that only specific topics or consumer groups are replicated from the source cluster to the target cluster.

Here is a first example on how to configure data replication from a primary cluster to a secondary cluster (an active/passive setup):

# Basic settings
clusters = primary, secondary
primary.bootstrap.servers = broker3-primary:9092
secondary.bootstrap.servers = broker5-secondary:9092

# Define replication flows
primary->secondary.enabled = true
primary->secondary.topics = foobar-topic, quux-.*

Configuring Geo-Replication

The following sections describe how to configure and run a dedicated MirrorMaker cluster. If you want to run MirrorMaker within an existing Kafka Connect cluster or other supported deployment setups, please refer to KIP-382: MirrorMaker 2.0 and be aware that the names of configuration settings may vary between deployment modes.

Beyond what is covered in the following sections, further examples and information on configuration settings are available in MirrorMakerConfig and MirrorConnectorConfig.

Configuration File Syntax

The MirrorMaker configuration file is typically named connect-mirror-maker.properties. You can configure a variety of components in this file:

  • MirrorMaker settings: global settings including cluster definitions (aliases), plus custom settings per replication flow
  • Kafka Connect and connector settings
  • Kafka producer, consumer, and admin client settings

Example: Define MirrorMaker settings (explained in more detail later).

# Global settings
clusters = us-west, us-east   # defines cluster aliases
us-west.bootstrap.servers = broker3-west:9092
us-east.bootstrap.servers = broker5-east:9092

topics = .*   # all topics to be replicated by default

# Specific replication flow settings (here: flow from us-west to us-east)
us-west->us-east.enabled = true
us-west->us-east.topics = foo.*, bar.*  # override the default above

MirrorMaker is based on the Kafka Connect framework. Any Kafka Connect, source connector, and sink connector settings as described in the documentation chapter on Kafka Connect can be used directly in the MirrorMaker configuration, without having to change or prefix the name of the configuration setting.

Example: Define custom Kafka Connect settings to be used by MirrorMaker.

# Setting Kafka Connect defaults for MirrorMaker
tasks.max = 5

Most of the default Kafka Connect settings work well for MirrorMaker out-of-the-box, with the exception of tasks.max. In order to evenly distribute the workload across more than one MirrorMaker process, it is recommended to set tasks.max to at least 2 (preferably higher) depending on the available hardware resources and the total number of topic partitions to be replicated.

You can further customize MirrorMaker's Kafka Connect settings per source or target cluster (more precisely, you can specify Kafka Connect worker-level configuration settings "per connector"). Use the format of {cluster}.{config_name} in the MirrorMaker configuration file.

Example: Define custom connector settings for the us-west cluster.

# us-west custom settings
us-west.offset.storage.topic = my-mirrormaker-offsets

MirrorMaker internally uses the Kafka producer, consumer, and admin clients. Custom settings for these clients are often needed. To override the defaults, use the following format in the MirrorMaker configuration file:

  • {source}.consumer.{consumer_config_name}
  • {target}.producer.{producer_config_name}
  • {source_or_target}.admin.{admin_config_name}

Example: Define custom producer, consumer, admin client settings.

# us-west cluster (from which to consume)
us-west.consumer.isolation.level = read_committed
us-west.admin.bootstrap.servers = broker57-primary:9092

# us-east cluster (to which to produce)
us-east.producer.compression.type = gzip
us-east.producer.buffer.memory = 32768
us-east.admin.bootstrap.servers = broker8-secondary:9092

Exactly once

Exactly-once semantics are supported for dedicated MirrorMaker clusters as of version 3.5.0.

For new MirrorMaker clusters, set the exactly.once.source.support property to enabled for all targeted Kafka clusters that should be written to with exactly-once semantics. For example, to enable exactly-once for writes to the cluster us-east, the following configuration can be used:

us-east.exactly.once.source.support = enabled

For existing MirrorMaker clusters, a two-step upgrade is necessary. Instead of immediately setting the exactly.once.source.support property to enabled, first set it to preparing on all nodes in the cluster. Once this is complete, it can be set to enabled on all nodes in the cluster, in a second round of restarts.

In either case, it is also necessary to enable intra-cluster communication between the MirrorMaker nodes, as described in KIP-710. To do this, the dedicated.mode.enable.internal.rest property must be set to true. In addition, many of the REST-related configuration properties available for Kafka Connect can be specified in the MirrorMaker configuration. For example, to enable intra-cluster communication in a MirrorMaker cluster with each node listening on port 8080 of their local machine, the following should be added to the MirrorMaker configuration file:

dedicated.mode.enable.internal.rest = true
listeners = http://localhost:8080

Note that, if intra-cluster communication is enabled in production environments, it is highly recommended to secure the REST servers brought up by each MirrorMaker node. See the configuration properties for Kafka Connect for information on how this can be accomplished.

It is also recommended to filter records from aborted transactions out of data replicated by MirrorMaker. To do this, ensure that the consumer used to read from source clusters is configured with isolation.level set to read_committed. If replicating data from the cluster us-west, this can be done for all replication flows that read from that cluster by adding the following to the MirrorMaker configuration file:

us-west.consumer.isolation.level = read_committed

As a final note, under the hood, MirrorMaker uses Kafka Connect source connectors to replicate data. For more information on exactly-once support for these kinds of connectors, see the relevant docs page.

Creating and Enabling Replication Flows

To define a replication flow, you must first define the respective source and target Kafka clusters in the MirrorMaker configuration file.

  • clusters (required): comma-separated list of Kafka cluster "aliases"
  • {clusterAlias}.bootstrap.servers (required): connection information for the specific cluster; comma-separated list of "bootstrap" Kafka brokers

Example: Define two cluster aliases primary and secondary, including their connection information.

clusters = primary, secondary
primary.bootstrap.servers = broker10-primary:9092,broker-11-primary:9092
secondary.bootstrap.servers = broker5-secondary:9092,broker6-secondary:9092

Secondly, you must explicitly enable individual replication flows with {source}->{target}.enabled = true as needed. Remember that flows are directional: if you need two-way (bidirectional) replication, you must enable flows in both directions.

# Enable replication from primary to secondary
primary->secondary.enabled = true

By default, a replication flow will replicate all but a few special topics and consumer groups from the source cluster to the target cluster, and automatically detect any newly created topics and groups. The names of replicated topics in the target cluster will be prefixed with the name of the source cluster (see the section further below). For example, the topic foo in the source cluster us-west would be replicated to the topic us-west.foo in the target cluster us-east.

The subsequent sections explain how to customize this basic setup according to your needs.

Configuring Replication Flows

The configuration of a replication flow is a combination of top-level default settings (e.g., topics), on top of which flow-specific settings, if any, are applied (e.g., us-west->us-east.topics). To change the top-level defaults, add the respective top-level setting to the MirrorMaker configuration file. To override the defaults for a specific replication flow only, use the syntax format {source}->{target}.{config.name}.

The most important settings are:

  • topics: list of topics or a regular expression that defines which topics in the source cluster to replicate (default: topics = .*)
  • topics.exclude: list of topics or a regular expression to subsequently exclude topics that were matched by the topics setting (default: topics.exclude = .*[\-.]internal, .*.replica, __.*)
  • groups: list of consumer groups or a regular expression that defines which consumer groups in the source cluster to replicate (default: groups = .*)
  • groups.exclude: list of consumer groups or a regular expression to subsequently exclude consumer groups that were matched by the groups setting (default: groups.exclude = console-consumer-.*, connect-.*, __.*)
  • {source}->{target}.enable: set to true to enable the flow (default: false)

Example:

# Custom top-level defaults that apply to all replication flows
topics = .*
groups = consumer-group1, consumer-group2

# Don't forget to enable a flow!
us-west->us-east.enabled = true

# Custom settings for specific replication flows
us-west->us-east.topics = foo.*
us-west->us-east.groups = bar.*
us-west->us-east.emit.heartbeats = false

Additional configuration settings are supported, some of which are listed below. In most cases, you can leave these settings at their default values. See MirrorMakerConfig and MirrorConnectorConfig for further details.

  • refresh.topics.enabled: whether to check for new topics in the source cluster periodically (default: true)
  • refresh.topics.interval.seconds: frequency of checking for new topics in the source cluster; lower values than the default may lead to performance degradation (default: 600, every ten minutes)
  • refresh.groups.enabled: whether to check for new consumer groups in the source cluster periodically (default: true)
  • refresh.groups.interval.seconds: frequency of checking for new consumer groups in the source cluster; lower values than the default may lead to performance degradation (default: 600, every ten minutes)
  • sync.topic.configs.enabled: whether to replicate topic configurations from the source cluster (default: true)
  • sync.topic.acls.enabled: whether to sync ACLs from the source cluster (default: true)
  • emit.heartbeats.enabled: whether to emit heartbeats periodically (default: true)
  • emit.heartbeats.interval.seconds: frequency at which heartbeats are emitted (default: 1, every one second)
  • heartbeats.topic.replication.factor: replication factor of MirrorMaker's internal heartbeat topics (default: 3)
  • emit.checkpoints.enabled: whether to emit MirrorMaker's consumer offsets periodically (default: true)
  • emit.checkpoints.interval.seconds: frequency at which checkpoints are emitted (default: 60, every minute)
  • checkpoints.topic.replication.factor: replication factor of MirrorMaker's internal checkpoints topics (default: 3)
  • sync.group.offsets.enabled: whether to periodically write the translated offsets of replicated consumer groups (in the source cluster) to the __consumer_offsets topic in the target cluster, as long as no active consumers in that group are connected to the target cluster (default: false)
  • sync.group.offsets.interval.seconds: frequency at which consumer group offsets are synced (default: 60, every minute)
  • offset-syncs.topic.replication.factor: replication factor of MirrorMaker's internal offset-syncs topics (default: 3)

Securing Replication Flows

MirrorMaker supports the same security settings as Kafka Connect, so please refer to the linked section for further information.

Example: Encrypt the communication between MirrorMaker and the us-east cluster.

us-east.security.protocol=SSL
us-east.ssl.truststore.location=/path/to/truststore.jks
us-east.ssl.truststore.password=my-secret-password
us-east.ssl.keystore.location=/path/to/keystore.jks
us-east.ssl.keystore.password=my-secret-password
us-east.ssl.key.password=my-secret-password

Custom Naming of Replicated Topics in Target Clusters

Replicated topics in a target cluster (sometimes called remote topics) are renamed according to a replication policy. MirrorMaker uses this policy to ensure that events (aka records, messages) from different clusters are not written to the same topic-partition. By default as per DefaultReplicationPolicy, the names of replicated topics in the target clusters have the format {source}.{source_topic_name}:

us-west         us-east
=========       =================
                bar-topic
foo-topic  -->  us-west.foo-topic

You can customize the separator (default: .) with the replication.policy.separator setting:

# Defining a custom separator
us-west->us-east.replication.policy.separator = _

If you need further control over how replicated topics are named, you can implement a custom ReplicationPolicy and override replication.policy.class (default is DefaultReplicationPolicy) in the MirrorMaker configuration.
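
As a rough illustration only: assuming the ReplicationPolicy interface of the connect-mirror-client module exposes formatRemoteTopic, topicSource, and upstreamTopic (verify this against your Kafka version), a policy that suffixes rather than prefixes the source cluster alias might look like:

import org.apache.kafka.connect.mirror.ReplicationPolicy;

// Hypothetical policy: name remote topics "topic.sourceCluster" instead of
// the default "sourceCluster.topic". A sketch only; check the interface of
// your Kafka version before using.
public class SuffixReplicationPolicy implements ReplicationPolicy {
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic + "." + sourceClusterAlias;
    }

    @Override
    public String topicSource(String topic) {
        int i = topic.lastIndexOf('.');
        return i < 0 ? null : topic.substring(i + 1); // source cluster alias
    }

    @Override
    public String upstreamTopic(String topic) {
        int i = topic.lastIndexOf('.');
        return i < 0 ? null : topic.substring(0, i); // original topic name
    }
}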

Preventing Configuration Conflicts

MirrorMaker processes share configuration via their target Kafka clusters. This behavior may cause conflicts when configurations differ among MirrorMaker processes that operate against the same target cluster.

For example, the following two MirrorMaker processes would be racy:

# Configuration of process 1
A->B.enabled = true
A->B.topics = foo

# Configuration of process 2
A->B.enabled = true
A->B.topics = bar

In this case, the two processes will share configuration via cluster B, which causes a conflict. Depending on which of the two processes is the elected "leader", the result will be that either the topic foo or the topic bar is replicated, but not both.

It is therefore important to keep the MirrorMaker configuration consistent across replication flows to the same target cluster. This can be achieved, for example, through automation tooling or by using a single, shared MirrorMaker configuration file for your entire organization.

Best Practice: Consume from Remote, Produce to Local

To minimize latency ("producer lag"), it is recommended to locate MirrorMaker processes as close as possible to their target clusters, i.e., the clusters that they produce data to. That's because Kafka producers typically struggle more with unreliable or high-latency network connections than Kafka consumers.

First DC          Second DC
==========        =========================
primary --------- MirrorMaker --> secondary
(remote)                           (local)

To run such a "consume from remote, produce to local" setup, run the MirrorMaker processes close to and preferably in the same location as the target clusters, and explicitly set these "local" clusters with the --clusters command line parameter (blank-separated list of cluster aliases):

# Run in secondary's data center, reading from the remote `primary` cluster
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters secondary

The --clusters secondary tells the MirrorMaker process that the given cluster(s) are nearby, and prevents it from replicating data or sending configuration to clusters at other, remote locations.

Example: Active/Passive High Availability Deployment

The following example shows the basic settings to replicate topics from a primary to a secondary Kafka environment, but not from the secondary back to the primary. Please be aware that most production setups will need further configuration, such as security settings.

# Unidirectional flow (one-way) from primary to secondary cluster
primary.bootstrap.servers = broker1-primary:9092
secondary.bootstrap.servers = broker2-secondary:9092

primary->secondary.enabled = true
secondary->primary.enabled = false

primary->secondary.topics = foo.*  # only replicate some topics

Example: Active/Active High Availability Deployment

The following example shows the basic settings to replicate topics between two clusters in both directions. Please be aware that most production setups will need further configuration, such as security settings.

# Bidirectional flow (two-way) between us-west and us-east clusters
clusters = us-west, us-east
us-west.bootstrap.servers = broker1-west:9092,broker2-west:9092
us-east.bootstrap.servers = broker3-east:9092,broker4-east:9092

us-west->us-east.enabled = true
us-east->us-west.enabled = true

A note on preventing replication "loops" (where topics will originally be replicated from A to B, then the replicated topics will be replicated yet again from B to A, and so forth): as long as you define the above flows in the same MirrorMaker configuration file, you do not need to explicitly add topics.exclude settings to prevent replication loops between the two clusters.

Example: Multi-Cluster Geo-Replication

Let's put all the information from the previous sections together in a larger example. Imagine there are three data centers (west, east, north), with two Kafka clusters in each data center (e.g., west-1, west-2). The example in this section shows how to configure MirrorMaker (1) for Active/Active replication within each data center, as well as (2) for Cross Data Center Replication (XDCR).

First, define the source and target clusters along with their replication flows in the configuration:

# Basic settings
clusters: west-1, west-2, east-1, east-2, north-1, north-2
west-1.bootstrap.servers = ...
west-2.bootstrap.servers = ...
east-1.bootstrap.servers = ...
east-2.bootstrap.servers = ...
north-1.bootstrap.servers = ...
north-2.bootstrap.servers = ...

# Replication flows for Active/Active in West DC
west-1->west-2.enabled = true
west-2->west-1.enabled = true

# Replication flows for Active/Active in East DC
east-1->east-2.enabled = true
east-2->east-1.enabled = true

# Replication flows for Active/Active in North DC
north-1->north-2.enabled = true
north-2->north-1.enabled = true

# Replication flows for XDCR via west-1, east-1, north-1
west-1->east-1.enabled  = true
west-1->north-1.enabled = true
east-1->west-1.enabled  = true
east-1->north-1.enabled = true
north-1->west-1.enabled = true
north-1->east-1.enabled = true

Then, in each data center, launch one or more MirrorMakers as follows:

# In West DC:
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters west-1 west-2

# In East DC:
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters east-1 east-2

# In North DC:
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters north-1 north-2

With this configuration, records produced to any cluster will be replicated within the data center, as well as across to the other data centers. By providing the --clusters parameter, we ensure that each MirrorMaker process produces data to nearby clusters only.

Note: The --clusters parameter is, technically, not required here. MirrorMaker will work fine without it. However, throughput may suffer from "producer lag" between data centers, and you may incur unnecessary data transfer costs.

Starting Geo-Replication

You can run as few or as many MirrorMaker processes (think: nodes, servers) as needed. Because MirrorMaker is based on Kafka Connect, MirrorMaker processes that are configured to replicate the same Kafka clusters run in a distributed setup: they will find each other, share configuration (see section below), load balance their work, and so on. If, for example, you want to increase the throughput of replication flows, one option is to run additional MirrorMaker processes in parallel.

To start a MirrorMaker process, run the command:

$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties

After startup, it may take a few minutes until a MirrorMaker process first begins to replicate data.

Optionally, as described previously, you can set the parameter --clusters to ensure that the MirrorMaker process produces data to nearby clusters only.

# Note: The cluster alias us-west must be defined in the configuration file
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties \
            --clusters us-west

Note when testing replication of consumer groups: by default, MirrorMaker does not replicate consumer groups created by the kafka-console-consumer.sh tool, which you might use to test your MirrorMaker setup on the command line. If you do want to replicate these consumer groups as well, set the groups.exclude configuration accordingly (default: groups.exclude = console-consumer-.*, connect-.*, __.*). Remember to update the configuration again once you have completed your testing.

Stopping Geo-Replication

You can stop a running MirrorMaker process by sending a SIGTERM signal with the command:

$ kill <MirrorMaker pid>

Applying Configuration Changes

To make configuration changes take effect, the MirrorMaker process(es) must be restarted.

Monitoring Geo-Replication

It is recommended to monitor MirrorMaker processes to ensure all defined replication flows are up and running correctly. MirrorMaker is built on the Connect framework and inherits all of Connect's metrics, such as source-record-poll-rate. In addition, MirrorMaker produces its own metrics under the kafka.connect.mirror metric group. Metrics are tagged with the following properties:

  • source: alias of source cluster (e.g., primary)
  • target: alias of target cluster (e.g., secondary)
  • topic: replicated topic on target cluster
  • partition: partition being replicated

Metrics are tracked for each replicated topic. The source cluster can be inferred from the topic name. For example, replicating topic1 from primary->secondary will yield metrics like:

  • target=secondary
  • topic=primary.topic1
  • partition=1

The following metrics are emitted:

# MBean: kafka.connect.mirror:type=MirrorSourceConnector,target=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)

record-count            # number of records replicated source -> target
record-age-ms           # age of records when they are replicated
record-age-ms-min
record-age-ms-max
record-age-ms-avg
replication-latency-ms  # time it takes records to propagate source->target
replication-latency-ms-min
replication-latency-ms-max
replication-latency-ms-avg
byte-rate               # average number of bytes/sec in replicated records

# MBean: kafka.connect.mirror:type=MirrorCheckpointConnector,source=([-.\w]+),target=([-.\w]+)

checkpoint-latency-ms   # time it takes to replicate consumer offsets
checkpoint-latency-ms-min
checkpoint-latency-ms-max
checkpoint-latency-ms-avg

These metrics do not differentiate between created-at and log-append timestamps.

6.4 Multi-Tenancy

Multi-Tenancy Overview

As a highly scalable event streaming platform, Kafka is used by many users as their central nervous system, connecting in real-time a wide range of different systems and applications from various teams and lines of businesses. Such multi-tenant cluster environments command proper control and management to ensure the peaceful coexistence of these different needs. This section highlights features and best practices to set up such shared environments, which should help you operate clusters that meet SLAs/OLAs and that minimize potential collateral damage caused by "noisy neighbors".

Multi-tenancy is a many-sided subject, including but not limited to:

  • Creating user spaces for tenants (sometimes called namespaces)
  • Configuring topics with data retention policies and more
  • Securing topics and clusters with encryption, authentication, and authorization
  • Isolating tenants with quotas and rate limits
  • Monitoring and metering
  • Inter-cluster data sharing (cf. geo-replication)

Creating User Spaces (Namespaces) For Tenants With Topic Naming

Kafka administrators operating a multi-tenant cluster typically need to define user spaces for each tenant. For the purpose of this section, "user spaces" are a collection of topics, which are grouped together under the management of a single entity or user.

In Kafka, the main unit of data is the topic. Users can create and name each topic. They can also delete them, but it is not possible to rename a topic directly. Instead, to rename a topic, the user must create a new topic, move the messages from the original topic to the new one, and then delete the original. With this in mind, it is recommended to define logical spaces based on a hierarchical topic naming structure. This setup can then be combined with security features, such as prefixed ACLs, to isolate different spaces and tenants, while also minimizing the administrative overhead for securing the data in the cluster.

These logical user spaces can be grouped in different ways, and the concrete choice depends on how your organization prefers to use your Kafka clusters. The most common groupings are as follows.

By team or organizational unit: Here, the team is the main aggregator. In an organization where teams are the main user of the Kafka infrastructure, this might be the best grouping.

Example topic naming structure:

  • <organization>.<team>.<dataset>.<event-name>
    (e.g., "acme.infosec.telemetry.logins")

By project or product: Here, a team manages more than one project. Their credentials will be different for each project, so all the controls and settings will always be project related.

主题命名结构示例:

  • <project>.<product>.<event-name>
    (e.g., "mobility.payments.suspicious")

Certain information should normally not be put in a topic name, such as information that is likely to change over time (e.g., the name of the intended consumer) or that is a technical detail or metadata that is available elsewhere (e.g., the topic's partition count and other configuration settings).

To enforce a topic naming structure, several options are available:

  • Use prefix ACLs (cf. KIP-290) to enforce a common prefix for topic names. For example, team A may only be permitted to create topics whose names start with payments.teamA..
  • Define a custom CreateTopicPolicy (cf. KIP-108 and the setting create.topic.policy.class.name) to enforce strict naming patterns; a minimal sketch of such a policy follows this list. These policies provide the most flexibility and can cover complex patterns and rules to match an organization's needs.
  • Disable topic creation for normal users by denying it with an ACL, and then rely on an external process to create topics on behalf of users (e.g., scripting or your favorite automation toolkit).
  • It may also be useful to disable the Kafka feature to auto-create topics on demand by setting auto.create.topics.enable=false in the broker configuration. Note that you should not rely solely on this option.
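
As referenced in the list above, here is a minimal sketch of such a policy, assuming the org.apache.kafka.server.policy.CreateTopicPolicy interface from KIP-108; the naming rule itself is only an illustrative example:

import java.util.Map;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.CreateTopicPolicy;

// Rejects topics that don't follow an <org>.<team>.<dataset>.<event> pattern.
// Enabled via create.topic.policy.class.name in the broker configuration.
public class NamingConventionPolicy implements CreateTopicPolicy {
    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        String topic = requestMetadata.topic();
        if (!topic.matches("[a-z0-9-]+\\.[a-z0-9-]+\\.[a-z0-9-]+\\.[a-z0-9-]+")) {
            throw new PolicyViolationException(
                    "Topic name '" + topic + "' must follow <org>.<team>.<dataset>.<event>");
        }
    }

    @Override
    public void close() {}
}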

Configuring Topics: Data Retention And More

Kafka's configuration is very flexible due to its fine granularity, and it supports a plethora of per-topic configuration settings to help administrators set up multi-tenant clusters. For example, administrators often need to define data retention policies to control how much and/or for how long data will be stored in a topic, with settings such as retention.bytes (size) and retention.ms (time). This limits storage consumption within the cluster, and helps complying with legal requirements such as GDPR.
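
Such a retention policy can be set per topic with kafka-configs.sh as shown earlier in this document, or programmatically; a minimal AdminClient sketch (topic name and the 7-day value are placeholders):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetRetentionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "acme.infosec.telemetry.logins");
            AlterConfigOp setRetention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", "604800000"), // 7 days in milliseconds
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setRetention))).all().get();
        }
    }
}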

Securing Clusters and Topics: Authentication, Authorization, Encryption

Because the documentation has a dedicated chapter on security that applies to any Kafka deployment, this section focuses on additional considerations for multi-tenant environments.

Security settings for Kafka fall into three main categories, which are similar to how administrators would secure other client-server data systems, like relational databases and traditional messaging systems.

  1. Encryption of data transferred between Kafka brokers and Kafka clients, between brokers, between brokers and ZooKeeper nodes, and between brokers and other, optional tools.
  2. Authentication of connections from Kafka clients and applications to Kafka brokers, as well as connections from Kafka brokers to ZooKeeper nodes.
  3. Authorization of client operations such as creating, deleting, and altering the configuration of topics; writing events to or reading events from a topic; creating and deleting ACLs. Administrators can also define custom policies to put in place additional restrictions, such as a CreateTopicPolicy and AlterConfigPolicy (see KIP-108 and the settings create.topic.policy.class.name, alter.config.policy.class.name).

When securing a multi-tenant Kafka environment, the most common administrative task is the third category (authorization), i.e., managing the user/client permissions that grant or deny access to certain topics and thus to the data stored by users within a cluster. This task is performed predominantly through the setting of access control lists (ACLs). Here, administrators of multi-tenant environments in particular benefit from putting a hierarchical topic naming structure in place as described in a previous section, because they can conveniently control access to topics through prefixed ACLs (--resource-pattern-type Prefixed). This significantly minimizes the administrative overhead of securing topics in multi-tenant environments: administrators can make their own trade-offs between higher developer convenience (more lenient permissions, using fewer and broader ACLs) vs. tighter security (more stringent permissions, using more and narrower ACLs).

In the following example, user Alice, a new member of ACME corporation's InfoSec team, is granted write permissions to all topics whose names start with "acme.infosec.", such as "acme.infosec.telemetry.logins" and "acme.infosec.syslogs.events".

# Grant permissions to user Alice
$ bin/kafka-acls.sh \
    --bootstrap-server broker1:9092 \
    --add --allow-principal User:Alice \
    --producer \
    --resource-pattern-type prefixed --topic acme.infosec.

You can similarly use this approach to isolate different customers on the same shared cluster.

Isolating Tenants: Quotas, Rate Limiting, Throttling

Multi-tenant clusters should generally be configured with quotas, which protect against users (tenants) eating up too many cluster resources, such as when they attempt to write or read very high volumes of data, or create requests to brokers at an excessively high rate. This may cause network saturation, monopolize broker resources, and impact other clients, all of which you want to avoid in a shared environment.

Client quotas: Kafka supports different types of (per-user principal) client quotas. Because a client's quotas apply irrespective of which topics the client is writing to or reading from, they are a convenient and effective tool to allocate resources in a multi-tenant cluster. Request rate quotas, for example, help to limit a user's impact on broker CPU usage by limiting the time a broker spends on the request handling path for that user. In many situations, isolating users with request rate quotas has a bigger impact in multi-tenant clusters than setting incoming/outgoing network bandwidth quotas, because excessive broker CPU usage for processing requests reduces the effective bandwidth the broker can serve. Furthermore, administrators can also define quotas on topic operations, such as create, delete, and alter, to prevent Kafka clusters from being overwhelmed by highly concurrent topic operations (see KIP-599 and the quota type controller_mutation_rate).

Server quotas: Kafka also supports different types of broker-side quotas. For example, administrators can set a limit on the rate with which the broker accepts new connections, set the maximum number of connections per broker, or set the maximum number of connections allowed from a specific IP address.

For more information, please refer to the quota overview and how to set quotas.

Monitoring and Metering

Monitoring is a broader subject that is covered elsewhere in the documentation. Administrators of any Kafka environment, but especially multi-tenant ones, should set up monitoring according to these instructions. Kafka supports a wide range of metrics, such as the rate of failed authentication attempts, request latency, consumer lag, total number of consumer groups, metrics on the quotas described in the previous section, and many more.

For example, monitoring can be configured to track the size of topic-partitions (with the JMX metric kafka.log.Log.Size.<TOPIC-NAME>), and thus the total size of data stored in a topic. You can then define alerts when tenants on shared clusters are getting close to using too much storage space.

Multi-Tenancy and Geo-Replication

Kafka lets you share data across different clusters, which may be located in different geographical regions, data centers, and so on. Apart from use cases such as disaster recovery, this functionality is useful when a multi-tenant setup requires inter-cluster data sharing. See the section Geo-Replication (Cross-Cluster Data Mirroring) for more information.

Further considerations

Data contracts: You may need to define data contracts between the producers and the consumers of data in a cluster, using event schemas. This ensures that events written to Kafka can always be read properly again, and prevents malformed or corrupt events being written. The best way to achieve this is to deploy a so-called schema registry alongside the cluster. (Kafka does not include a schema registry, but there are third-party implementations available.) A schema registry manages the event schemas and maps the schemas to topics, so that producers know which topics are accepting which types (schemas) of events, and consumers know how to read and parse events in a topic. Some registry implementations provide further functionality, such as schema evolution, storing a history of all schemas, and schema compatibility settings.

6.5 Kafka Configuration

Important Client Configurations

The most important producer configurations are:

  • acks
  • compression
  • batch size

The most important consumer configuration is the fetch size.

All configurations are documented in the configuration section.

A Production Server Config

Here is an example production server configuration:

# ZooKeeper
zookeeper.connect=[list of ZooKeeper servers]

# Log configuration
num.partitions=8
default.replication.factor=3
log.dir=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).]

# Other configurations
broker.id=[An integer. Start with 0 and increment by 1 for each new broker.]
listeners=[list of listeners]
auto.create.topics.enable=false
min.insync.replicas=2
queued.max.requests=[number of concurrent requests]

Our client configuration varies a fair amount between different use cases.

6.6 Java Version

Java 8, Java 11, and Java 17 are supported. Note that Java 8 support has been deprecated since Apache Kafka 3.0 and will be removed in Apache Kafka 4.0. Java 11 and later versions perform significantly better if TLS is enabled, so they are highly recommended (they also include a number of other performance improvements: G1GC, CRC32C, Compact Strings, Thread-Local Handshakes and more). From a security perspective, we recommend the latest released patch version as older freely available versions have disclosed security vulnerabilities. Typical arguments for running Kafka with OpenJDK-based Java implementations (including Oracle JDK) are:

-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+ExplicitGCInvokesConcurrent

For reference, here are the stats for one of LinkedIn's busiest clusters (at peak) that uses said Java arguments:

  • 60 brokers
  • 50k partitions (replication factor 2)
  • 800k messages/sec in
  • 300 MB/sec inbound, 1 GB/sec+ outbound

All of the brokers in that cluster have a 90% GC pause time of about 21ms with less than 1 young GC per second.

6.7 Hardware and OS

We are using dual quad-core Intel Xeon machines with 24GB of memory.

You need sufficient memory to buffer active readers and writers. You can do a back-of-the-envelope estimate of memory needs by assuming you want to be able to buffer for 30 seconds and computing your memory need as write_throughput*30 (at 50 MB/s of writes, for instance, that works out to roughly 1.5 GB).

Disk throughput is important. We have 8x7200 rpm SATA drives. In general disk throughput is the performance bottleneck, and more disks is better. Depending on how you configure flush behavior you may or may not benefit from more expensive disks (if you force flush often then higher RPM SAS drives may be better).

OS

Kafka should run well on any unix system and has been tested on Linux and Solaris.

We have seen a few issues running on Windows and Windows is not currently a well supported platform, though we would be happy to change that.

It is unlikely to require much OS-level tuning, but there are three potentially important OS-level configurations:

  • File descriptor limits: Kafka uses file descriptors for log segments and open connections. If a broker hosts many partitions, consider that the broker needs at least (number_of_partitions)*(partition_size/segment_size) file descriptors to track all log segments, in addition to the number of connections the broker makes. We recommend at least 100000 allowed file descriptors for the broker processes as a starting point. Note: The mmap() function adds an extra reference to the file associated with the file descriptor fildes which is not removed by a subsequent close() on that file descriptor. This reference is removed when there are no more mappings to the file.
  • Max socket buffer size: can be increased to enable high-performance data transfer between data centers as described here.
  • Maximum number of memory map areas a process may have (aka vm.max_map_count). See the Linux kernel documentation. You should keep an eye on this OS-level property when considering the maximum number of partitions a broker may have. By default, on a number of Linux systems, the value of vm.max_map_count is somewhere around 65535. Each log segment, allocated per partition, requires a pair of index/timeindex files, and each of these files consumes 1 map area. In other words, each log segment uses 2 map areas. Thus, each partition requires a minimum of 2 map areas, as long as it hosts a single log segment. That is to say, creating 50000 partitions on a broker will result in the allocation of 100000 map areas and likely cause a broker crash with OutOfMemoryError (Map failed) on a system with the default vm.max_map_count. Keep in mind that the number of log segments per partition varies depending on the segment size, load intensity, retention policy and, generally, tends to be more than one.

Disks and Filesystem

We recommend using multiple drives to get good throughput, and not sharing the same drives used for Kafka data with application logs or other OS filesystem activity to ensure good latency. You can either RAID these drives together into a single volume or format and mount each drive as its own directory. Since Kafka has replication, the redundancy provided by RAID can also be provided at the application level. This choice has several tradeoffs.

If you configure multiple data directories, partitions will be assigned round-robin to data directories. Each partition will be entirely in one of the data directories. If data is not well balanced among partitions this can lead to load imbalance between disks.

RAID can potentially do better at balancing load between disks (although it doesn't always seem to) because it balances load at a lower level. The primary downside of RAID is that it is usually a big performance hit for write throughput and reduces the available disk space.

Another potential benefit of RAID is the ability to tolerate disk failures. However our experience has been that rebuilding the RAID array is so I/O intensive that it effectively disables the server, so this does not provide much real availability improvement.

Application vs. OS Flush Management

Kafka always immediately writes all data to the filesystem and supports the ability to configure the flush policy that controls when data is forced out of the OS cache and onto disk using the flush. This flush policy can be controlled to force data to disk after a period of time or after a certain number of messages has been written. There are several choices in this configuration.

Kafka must eventually call fsync to know that data was flushed. When recovering from a crash for any log segment not known to be fsync'd Kafka will check the integrity of each message by checking its CRC and also rebuild the accompanying offset index file as part of the recovery process executed on startup.

Note that durability in Kafka does not require syncing data to disk, as a failed node will always recover from its replicas.

We recommend using the default flush settings which disable application fsync entirely. This means relying on the background flush done by the OS and Kafka's own background flush. This provides the best of all worlds for most uses: no knobs to tune, great throughput and latency, and full recovery guarantees. We generally feel that the guarantees provided by replication are stronger than sync to local disk, however the paranoid still may prefer having both, and application level fsync policies are still supported.

The drawback of using application level flush settings is that it is less efficient in its disk usage pattern (it gives the OS less leeway to re-order writes) and it can introduce latency as fsync in most Linux filesystems blocks writes to the file whereas the background flushing does much more granular page-level locking.

In general you don't need to do any low-level tuning of the filesystem, but in the next few sections we will go over some of this in case it is useful.

Understanding Linux OS Flush Behavior

In Linux, data written to the filesystem is maintained in pagecache until it must be written out to disk (due to an application-level fsync or the OS's own flush policy). The flushing of data is done by a set of background threads called pdflush (or in post 2.6.32 kernels "flusher threads").

Pdflush has a configurable policy that controls how much dirty data can be maintained in cache and for how long before it must be written back to disk. This policy is described here. When Pdflush cannot keep up with the rate of data being written it will eventually cause the writing process to block, incurring latency in the writes to slow down the accumulation of data.

You can see the current state of OS memory usage by doing

 > cat /proc/meminfo

The meaning of these values is described in the link above.

Using pagecache has several advantages over an in-process cache for storing data that will be written out to disk:

  • The I/O scheduler will batch together consecutive small writes into bigger physical writes, which improves throughput.
  • The I/O scheduler will attempt to re-sequence writes to minimize movement of the disk head, which improves throughput.
  • It automatically uses all the free memory on the machine

Filesystem Selection

Kafka uses regular files on disk, and as such it has no hard dependency on a specific filesystem. The two filesystems which have the most usage, however, are EXT4 and XFS. Historically, EXT4 has had more usage, but recent improvements to the XFS filesystem have shown it to have better performance characteristics for Kafka's workload with no compromise in stability.

Comparison testing was performed on a cluster with significant message loads, using a variety of filesystem creation and mount options. The primary metric in Kafka that was monitored was the "Request Local Time", indicating the amount of time append operations were taking. XFS resulted in much better local times (160ms vs. 250ms+ for the best EXT4 configuration), as well as lower average wait times. The XFS performance also showed less variability in disk performance.

General Filesystem Notes

For any filesystem used for data directories, on Linux systems, the following options are recommended to be used at mount time:

  • noatime: This option disables updating of a file's atime (last access time) attribute when the file is read. This can eliminate a significant number of filesystem writes, especially in the case of bootstrapping consumers. Kafka does not rely on the atime attributes at all, so it is safe to disable this.

XFS Notes

The XFS filesystem has a significant amount of auto-tuning in place, so it does not require any change in the default settings, either at filesystem creation time or at mount. The only tuning parameters worth considering are:

  • largeio: This affects the preferred I/O size reported by the stat call. While this can allow for higher performance on larger disk writes, in practice it had minimal or no effect on performance.
  • nobarrier: For underlying devices that have battery-backed cache, this option can provide a little more performance by disabling periodic write flushes. However, if the underlying device is well-behaved, it will report to the filesystem that it does not require flushes, and this option will have no effect.

EXT4 Notes

EXT4 is a serviceable choice of filesystem for the Kafka data directories, however getting the most performance out of it will require adjusting several mount options. In addition, these options are generally unsafe in a failure scenario, and will result in much more data loss and corruption. For a single broker failure, this is not much of a concern as the disk can be wiped and the replicas rebuilt from the cluster. In a multiple-failure scenario, such as a power outage, this can mean underlying filesystem (and therefore data) corruption that is not easily recoverable. The following options can be adjusted:

  • data=writeback: Ext4 defaults to data=ordered which puts a strong order on some writes. Kafka does not require this ordering as it does very paranoid data recovery on all unflushed log. This setting removes the ordering constraint and seems to significantly reduce latency.
  • Disabling journaling: Journaling is a tradeoff: it makes reboots faster after server crashes but it introduces a great deal of additional locking which adds variance to write performance. Those who don't care about reboot time and want to reduce a major source of write latency spikes can turn off journaling entirely.
  • commit=num_secs: This tunes the frequency with which ext4 commits to its metadata journal. Setting this to a lower value reduces the loss of unflushed data during a crash. Setting this to a higher value will improve throughput.
  • nobh: This setting controls additional ordering guarantees when using data=writeback mode. This should be safe with Kafka as we do not depend on write ordering, and it improves throughput and latency.
  • delalloc: Delayed allocation means that the filesystem avoids allocating any blocks until the physical write occurs. This allows ext4 to allocate a large extent instead of smaller pages and helps ensure the data is written sequentially. This feature is great for throughput. It does seem to involve some locking in the filesystem which adds a bit of latency variance.

Replace KRaft Controller Disk

When Kafka is configured to use KRaft, the controllers store the cluster metadata in the directory specified in metadata.log.dir, or in the first log directory if metadata.log.dir is not configured. See the documentation for metadata.log.dir for details.

If the data in the cluster metadata directory is lost either because of hardware failure or the hardware needs to be replaced, care should be taken when provisioning the new controller node. The new controller node should not be formatted and started until the majority of the controllers have all of the committed data. To determine if the majority of the controllers have the committed data, run the kafka-metadata-quorum.sh tool to describe the replication status:

 > bin/kafka-metadata-quorum.sh --bootstrap-server broker_host:port describe --replication
 NodeId  LogEndOffset    Lag     LastFetchTimestamp      LastCaughtUpTimestamp   Status
 1       25806           0       1662500992757           1662500992757           Leader
 ...     ...             ...     ...                     ...                     ...

Check and wait until the Lag is small for a majority of the controllers. If the leader's end offset is not increasing, you can wait until the lag is 0 for a majority; otherwise, you can pick the latest leader end offset and wait until all replicas have reached it. Check and wait until the LastFetchTimestamp and LastCaughtUpTimestamp are close to each other for the majority of the controllers. At this point it is safer to format the controller's metadata log directory. This can be done by running the kafka-storage.sh command.

 > bin/kafka-storage.sh format --cluster-id uuid --config server_properties

It is possible for the bin/kafka-storage.sh format command above to fail with a message like Log directory ... is already formatted. This can happen when combined mode is used and only the metadata log directory was lost but not the others. In that case, and only in that case, can you run the kafka-storage.sh format command with the --ignore-formatted option.

Start the KRaft controller after formatting the log directories.

 > bin/kafka-server-start.sh server_properties

6.8 Monitoring

Kafka uses Yammer Metrics for metrics reporting in the server. The Java clients use Kafka Metrics, a built-in metrics registry that minimizes transitive dependencies pulled into client applications. Both expose metrics via JMX and can be configured to report stats using pluggable stats reporters to hook up to your monitoring system.

All Kafka rate metrics have a corresponding cumulative count metric with suffix -total. For example, records-consumed-rate has a corresponding metric named records-consumed-total.

The easiest way to see the available metrics is to fire up jconsole and point it at a running kafka client or server; this will allow browsing all metrics with JMX.
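
For automated monitoring, the same MBeans can be read with the standard JDK JMX classes; a minimal sketch, assuming the broker was started with JMX_PORT=9999 and reading the all-topic message-in rate listed in the table below:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxMetricExample {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Omitting the topic=(...) tag yields the all-topic rate
            ObjectName messagesIn = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
            // Yammer meters expose attributes such as Count and OneMinuteRate
            Object rate = conn.getAttribute(messagesIn, "OneMinuteRate");
            System.out.println("MessagesInPerSec (1m rate): " + rate);
        }
    }
}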

Security Considerations for Remote Monitoring using JMX

Apache Kafka disables remote JMX by default. You can enable remote monitoring using JMX by setting the environment variable JMX_PORT for processes started using the CLI, or by setting standard Java system properties to enable remote JMX programmatically. You must enable security when enabling remote JMX in production scenarios to ensure that unauthorized users cannot monitor or control your broker or application as well as the platform on which these are running. Note that authentication is disabled for JMX by default in Kafka, and security configs must be overridden for production deployments by setting the environment variable KAFKA_JMX_OPTS for processes started using the CLI or by setting appropriate Java system properties. See Monitoring and Management Using JMX Technology for details on securing JMX.

We do graphing and alerting on the following metrics:

DESCRIPTION MBEAN NAME NORMAL VALUE
Message in rate kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=([-.\w]+) Incoming message rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.
Byte in rate from clients kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=([-.\w]+) Byte in (from the clients) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.
Byte in rate from other brokers kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec Byte in (from the other brokers) rate across all topics.
Controller Request rate from Broker kafka.controller:type=ControllerChannelManager,name=RequestRateAndQueueTimeMs,brokerId=([0-9]+) The rate (requests per second) at which the ControllerChannelManager takes requests from the queue of the given broker. And the time it takes for a request to stay in this queue before it is taken from the queue.
Controller Event queue size kafka.controller:type=ControllerEventManager,name=EventQueueSize Size of the ControllerEventManager's queue.
Controller Event queue time kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs Time that takes for any event (except the Idle event) to wait in the ControllerEventManager's queue before being processed
Request rate kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower},version=([0-9]+)
Error rate kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=([-.\w]+),error=([-.\w]+) Number of errors in responses counted per-request-type, per-error-code. If a response contains multiple errors, all are counted. error=NONE indicates successful responses.
Produce request rate kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec,topic=([-.\w]+) Produce request rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.
Fetch request rate kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec,topic=([-.\w]+) Fetch request (from clients or followers) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.
Failed produce request rate kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec,topic=([-.\w]+) Failed Produce request rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.
Failed fetch request rate kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec,topic=([-.\w]+) Failed Fetch request (from clients or followers) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.
Request size in bytes kafka.network:type=RequestMetrics,name=RequestBytes,request=([-.\w]+) Size of requests for each request type.
Temporary memory size in bytes kafka.network:type=RequestMetrics,name=TemporaryMemoryBytes,request={Produce|Fetch} Temporary memory used for message format conversions and decompression.
Message conversion time kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|Fetch} Time in milliseconds spent on message format conversions.
Message conversion rate kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=([-.\w]+) Message format conversion rate, for Produce or Fetch requests, per topic. Omitting 'topic=(...)' will yield the all-topic rate.
Request Queue Size kafka.network:type=RequestChannel,name=RequestQueueSize Size of the request queue.
Byte out rate to clients kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=([-.\w]+) Byte out (to the clients) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.
Byte out rate to other brokers kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec Byte out (to the other brokers) rate across all topics
Rejected byte rate kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=([-.\w]+) Rejected byte rate per topic, due to the record batch size being greater than max.message.bytes configuration. Omitting 'topic=(...)' will yield the all-topic rate.
Message validation failure rate due to no key specified for compacted topic kafka.server:type=BrokerTopicMetrics,name=NoKeyCompactedTopicRecordsPerSec 0
Message validation failure rate due to invalid magic number kafka.server:type=BrokerTopicMetrics,name=InvalidMagicNumberRecordsPerSec 0
Message validation failure rate due to incorrect crc checksum kafka.server:type=BrokerTopicMetrics,name=InvalidMessageCrcRecordsPerSec 0
Message validation failure rate due to non-continuous offset or sequence number in batch kafka.server:type=BrokerTopicMetrics,name=InvalidOffsetOrSequenceRecordsPerSec 0
Log flush rate and time kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
# of offline log directories kafka.log:type=LogManager,name=OfflineLogDirectoryCount 0
Leader election rate kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs non-zero when there are broker failures
Unclean leader election rate kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec 0
Is controller active on broker kafka.controller:type=KafkaController,name=ActiveControllerCount only one broker in the cluster should have 1
Pending topic deletes kafka.controller:type=KafkaController,name=TopicsToDeleteCount
Pending replica deletes kafka.controller:type=KafkaController,name=ReplicasToDeleteCount
Ineligible pending topic deletes kafka.controller:type=KafkaController,name=TopicsIneligibleToDeleteCount
Ineligible pending replica deletes kafka.controller:type=KafkaController,name=ReplicasIneligibleToDeleteCount
# of under replicated partitions (|ISR| < |all replicas|) kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions 0
# of under minIsr partitions (|ISR| < min.insync.replicas) kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount 0
# of at minIsr partitions (|ISR| = min.insync.replicas) kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount 0
Producer Id counts kafka.server:type=ReplicaManager,name=ProducerIdCount Count of all producer ids created by transactional and idempotent producers in each replica on the broker
Partition counts kafka.server:type=ReplicaManager,name=PartitionCount mostly even across brokers
Offline Replica counts kafka.server:type=ReplicaManager,name=OfflineReplicaCount 0
Leader replica counts kafka.server:type=ReplicaManager,name=LeaderCount mostly even across brokers
ISR shrink rate kafka.server:type=ReplicaManager,name=IsrShrinksPerSec If a broker goes down, ISR for some of the partitions will shrink. When that broker is up again, ISR will be expanded once the replicas are fully caught up. Other than that, the expected value for both ISR shrink rate and expansion rate is 0.
ISR expansion rate kafka.server:type=ReplicaManager,name=IsrExpandsPerSec See above
Failed ISR update rate kafka.server:type=ReplicaManager,name=FailedIsrUpdatesPerSec 0
Max lag in messages btw follower and leader replicas kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica lag should be proportional to the maximum batch size of a produce request.
Lag in messages per follower replica kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+) lag should be proportional to the maximum batch size of a produce request.
Requests waiting in the producer purgatory kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce non-zero if ack=-1 is used
Requests waiting in the fetch purgatory kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch size depends on fetch.wait.max.ms in the consumer
Request total time kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce|FetchConsumer|FetchFollower} broken into queue, local, remote and response send time
Time the request waits in the request queue kafka.network:type=RequestMetrics,name=RequestQueueTimeMs,request={Produce|FetchConsumer|FetchFollower}
Time the request is processed at the leader kafka.network:type=RequestMetrics,name=LocalTimeMs,request={Produce|FetchConsumer|FetchFollower}
Time the request waits for the follower kafka.network:type=RequestMetrics,name=RemoteTimeMs,request={Produce|FetchConsumer|FetchFollower} non-zero for produce requests when ack=-1
Time the request waits in the response queue kafka.network:type=RequestMetrics,name=ResponseQueueTimeMs,request={Produce|FetchConsumer|FetchFollower}
Time to send the response kafka.network:type=RequestMetrics,name=ResponseSendTimeMs,request={Produce|FetchConsumer|FetchFollower}
Number of messages the consumer lags behind the producer by. Published by the consumer, not broker. kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id} Attribute: records-lag-max
The average fraction of time the network processors are idle kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent between 0 and 1, ideally > 0.3
The number of connections disconnected on a processor due to a client not re-authenticating and then using the connection beyond its expiration time for anything other than re-authentication kafka.server:type=socket-server-metrics,listener=[SASL_PLAINTEXT|SASL_SSL],networkProcessor=<#>,name=expired-connections-killed-count ideally 0 when re-authentication is enabled, implying there are no longer any older, pre-2.2.0 clients connecting to this broker
The total number of connections disconnected, across all processors, due to a client not re-authenticating and then using the connection beyond its expiration time for anything other than re-authentication kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount ideally 0 when re-authentication is enabled, implying there are no longer any older, pre-2.2.0 clients connecting to this broker
The average fraction of time the request handler threads are idle kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent between 0 and 1, ideally > 0.3
Bandwidth quota metrics per (user, client-id), user or client-id kafka.server:type={Produce|Fetch},user=([-.\w]+),client-id=([-.\w]+) Two attributes. throttle-time indicates the amount of time in ms the client was throttled. Ideally = 0. byte-rate indicates the data produce/consume rate of the client in bytes/sec. For (user, client-id) quotas, both user and client-id are specified. If per-client-id quota is applied to the client, user is not specified. If per-user quota is applied, client-id is not specified.
Request quota metrics per (user, client-id), user or client-id kafka.server:type=Request,user=([-.\w]+),client-id=([-.\w]+) Two attributes. throttle-time indicates the amount of time in ms the client was throttled. Ideally = 0. request-time indicates the percentage of time spent in broker network and I/O threads to process requests from client group. For (user, client-id) quotas, both user and client-id are specified. If per-client-id quota is applied to the client, user is not specified. If per-user quota is applied, client-id is not specified.
Requests exempt from throttling kafka.server:type=Request exempt-throttle-time indicates the percentage of time spent in broker network and I/O threads to process requests that are exempt from throttling.
ZooKeeper client request latency kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs Latency in milliseconds for ZooKeeper requests from broker.
ZooKeeper connection status kafka.server:type=SessionExpireListener,name=SessionState Connection status of broker's ZooKeeper session, which may be one of Disconnected|SyncConnected|AuthFailed|ConnectedReadOnly|SaslAuthenticated|Expired.
Max time to load group metadata kafka.server:type=group-coordinator-metrics,name=partition-load-time-max maximum time, in milliseconds, it took to load offsets and group metadata from the consumer offset partitions loaded in the last 30 seconds (including time spent waiting for the loading task to be scheduled)
Avg time to load group metadata kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg average time, in milliseconds, it took to load offsets and group metadata from the consumer offset partitions loaded in the last 30 seconds (including time spent waiting for the loading task to be scheduled)
Max time to load transaction metadata kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max maximum time, in milliseconds, it took to load transaction metadata from the transaction log partitions loaded in the last 30 seconds (including time spent waiting for the loading task to be scheduled)
Avg time to load transaction metadata kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-avg average time, in milliseconds, it took to load transaction metadata from the transaction log partitions loaded in the last 30 seconds (including time spent waiting for the loading task to be scheduled)
Rate of transactional verification errors kafka.server:type=AddPartitionsToTxnManager,name=VerificationFailureRate Rate of verifications that returned in failure either from the AddPartitionsToTxn API response or through errors in the AddPartitionsToTxnManager. In steady state 0, but transient errors are expected during rolls and reassignments of the transactional state partition.
Time to verify a transactional request kafka.server:type=AddPartitionsToTxnManager,name=VerificationTimeMs The amount of time queueing while a possible previous request is in-flight plus the round trip to the transaction coordinator to verify (or not verify)
Consumer Group Offset Count kafka.server:type=GroupMetadataManager,name=NumOffsets Total number of committed offsets for Consumer Groups
Consumer Group Count kafka.server:type=GroupMetadataManager,name=NumGroups Total number of Consumer Groups
Consumer Group Count, per State kafka.server:type=GroupMetadataManager,name=NumGroups[PreparingRebalance,CompletingRebalance,Empty,Stable,Dead] The number of Consumer Groups in each state: PreparingRebalance, CompletingRebalance, Empty, Stable, Dead
Number of reassigning partitions kafka.server:type=ReplicaManager,name=ReassigningPartitions The number of reassigning leader partitions on a broker.
Outgoing byte rate of reassignment traffic kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesOutPerSec 0; non-zero when a partition reassignment is in progress.
Incoming byte rate of reassignment traffic kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesInPerSec 0; non-zero when a partition reassignment is in progress.
Size of a partition on disk (in bytes) kafka.log:type=Log,name=Size,topic=([-.\w]+),partition=([0-9]+) The size of a partition on disk, measured in bytes.
Number of log segments in a partition kafka.log:type=Log,name=NumLogSegments,topic=([-.\w]+),partition=([0-9]+) The number of log segments in a partition.
First offset in a partition kafka.log:type=Log,name=LogStartOffset,topic=([-.\w]+),partition=([0-9]+) The first offset in a partition.
Last offset in a partition kafka.log:type=Log,name=LogEndOffset,topic=([-.\w]+),partition=([0-9]+) The last offset in a partition.
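All of the broker metrics above are exposed as JMX MBeans and can be read with any JMX client. Below is a minimal Java sketch, assuming the broker was started with remote JMX enabled (e.g. by setting JMX_PORT=9999 before launching it); the host, port, topic name and partition id are placeholders to adapt.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class PartitionSizeProbe {
    public static void main(String[] args) throws Exception {
        // Placeholder JMX endpoint of the broker.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker_host:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Size on disk of partition 0 of my-topic, per the
            // kafka.log:type=Log,name=Size bean from the table above.
            ObjectName sizeBean = new ObjectName(
                    "kafka.log:type=Log,name=Size,topic=my-topic,partition=0");
            System.out.println("partition size (bytes) = "
                    + conn.getAttribute(sizeBean, "Value"));
        }
    }
}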

Tiered Storage Monitoring

The following set of metrics are available for monitoring of the tiered storage feature:

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
Remote Fetch Bytes Per Sec Rate of bytes read from remote storage per topic. Omitting 'topic=(...)' will yield the all-topic rate kafka.server:type=BrokerTopicMetrics,name=RemoteFetchBytesPerSec,topic=([-.\w]+)
Remote Fetch Requests Per Sec Rate of read requests from remote storage per topic. Omitting 'topic=(...)' will yield the all-topic rate kafka.server:type=BrokerTopicMetrics,name=RemoteFetchRequestsPerSec,topic=([-.\w]+)
Remote Fetch Errors Per Sec Rate of read errors from remote storage per topic. Omitting 'topic=(...)' will yield the all-topic rate kafka.server:type=BrokerTopicMetrics,name=RemoteFetchErrorsPerSec,topic=([-.\w]+)
Remote Copy Bytes Per Sec Rate of bytes copied to remote storage per topic. Omitting 'topic=(...)' will yield the all-topic rate kafka.server:type=BrokerTopicMetrics,name=RemoteCopyBytesPerSec,topic=([-.\w]+)
Remote Copy Requests Per Sec Rate of write requests to remote storage per topic. Omitting 'topic=(...)' will yield the all-topic rate kafka.server:type=BrokerTopicMetrics,name=RemoteCopyRequestsPerSec,topic=([-.\w]+)
Remote Copy Errors Per Sec Rate of write errors from remote storage per topic. Omitting 'topic=(...)' will yield the all-topic rate kafka.server:type=BrokerTopicMetrics,name=RemoteCopyErrorsPerSec,topic=([-.\w]+)
RemoteLogReader Task Queue Size Size of the queue holding remote storage read tasks org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool,name=RemoteLogReaderTaskQueueSize
RemoteLogReader Avg Idle Percent Average idle percent of thread pool for processing remote storage read tasks org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool,name=RemoteLogReaderAvgIdlePercent
RemoteLogManager Tasks Avg Idle Percent Average idle percent of thread pool for copying data to remote storage kafka.log.remote:type=RemoteLogManager,name=RemoteLogManagerTasksAvgIdlePercent

KRaft Monitoring Metrics

The set of metrics that allow monitoring of the KRaft quorum and the metadata log.
Note that some exposed metrics depend on the role of the node as defined by process.roles.

KRaft Quorum Monitoring Metrics

These metrics are reported on both controllers and brokers in a KRaft cluster.

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
Current State The current state of this member; possible values are leader, candidate, voted, follower, unattached, observer. kafka.server:type=raft-metrics,name=current-state
Current Leader The current quorum leader's id; -1 indicates unknown. kafka.server:type=raft-metrics,name=current-leader
Current Voted The current voted leader's id; -1 indicates not voted for anyone. kafka.server:type=raft-metrics,name=current-vote
Current Epoch The current quorum epoch. kafka.server:type=raft-metrics,name=current-epoch
High Watermark The high watermark maintained on this member; -1 if it is unknown. kafka.server:type=raft-metrics,name=high-watermark
Log End Offset The current raft log end offset. kafka.server:type=raft-metrics,name=log-end-offset
Number of Unknown Voter Connections Number of unknown voters whose connection information is not cached. The value of this metric is always 0. kafka.server:type=raft-metrics,name=number-unknown-voter-connections
Average Commit Latency The average time in milliseconds to commit an entry in the raft log. kafka.server:type=raft-metrics,name=commit-latency-avg
Maximum Commit Latency The maximum time in milliseconds to commit an entry in the raft log. kafka.server:type=raft-metrics,name=commit-latency-max
Average Election Latency The average time in milliseconds spent on electing a new leader. kafka.server:type=raft-metrics,name=election-latency-avg
Maximum Election Latency The maximum time in milliseconds spent on electing a new leader. kafka.server:type=raft-metrics,name=election-latency-max
Fetch Records Rate The average number of records fetched from the leader of the raft quorum. kafka.server:type=raft-metrics,name=fetch-records-rate
Append Records Rate The average number of records appended per sec by the leader of the raft quorum. kafka.server:type=raft-metrics,name=append-records-rate
Average Poll Idle Ratio The average fraction of time the client's poll() is idle as opposed to waiting for the user code to process records. kafka.server:type=raft-metrics,name=poll-idle-ratio-avg
Current Metadata Version Outputs the feature level of the current effective metadata version. kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
Metadata Snapshot Load Count The total number of times we have loaded a KRaft snapshot since the process was started. kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount
Latest Metadata Snapshot Size The total size in bytes of the latest snapshot that the node has generated. If none have been generated yet, this is the size of the latest snapshot that was loaded. If no snapshots have been generated or loaded, this is 0. kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
Latest Metadata Snapshot Age The interval in milliseconds since the latest snapshot that the node has generated. If none have been generated yet, this is approximately the time delta since the process was started. kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs

KRaft Controller Monitoring Metrics

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
Active Controller Count The number of Active Controllers on this node. Valid values are '0' or '1'. kafka.controller:type=KafkaController,name=ActiveControllerCount
Event Queue Time Ms A Histogram of the time in milliseconds that requests spent waiting in the Controller Event Queue. kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs
Event Queue Processing Time Ms A Histogram of the time in milliseconds that requests spent being processed in the Controller Event Queue. kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs
Fenced Broker Count The number of fenced brokers as observed by this Controller. kafka.controller:type=KafkaController,name=FencedBrokerCount
Active Broker Count The number of active brokers as observed by this Controller. kafka.controller:type=KafkaController,name=ActiveBrokerCount
Global Topic Count The number of global topics as observed by this Controller. kafka.controller:type=KafkaController,name=GlobalTopicCount
Global Partition Count The number of global partitions as observed by this Controller. kafka.controller:type=KafkaController,name=GlobalPartitionCount
Offline Partition Count The number of offline topic partitions (non-internal) as observed by this Controller. kafka.controller:type=KafkaController,name=OfflinePartitionCount
Preferred Replica Imbalance Count The count of topic partitions for which the leader is not the preferred leader. kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
Metadata Error Count The number of times this controller node has encountered an error during metadata log processing. kafka.controller:type=KafkaController,name=MetadataErrorCount
Last Applied Record Offset The offset of the last record from the cluster metadata partition that was applied by the Controller. kafka.controller:type=KafkaController,name=LastAppliedRecordOffset
Last Committed Record Offset The offset of the last record committed to this Controller. kafka.controller:type=KafkaController,name=LastCommittedRecordOffset
Last Applied Record Timestamp The timestamp of the last record from the cluster metadata partition that was applied by the Controller. kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp
Last Applied Record Lag Ms The difference between now and the timestamp of the last record from the cluster metadata partition that was applied by the controller. For active Controllers the value of this lag is always zero. kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs
ZooKeeper Write Behind Lag The amount of lag in records that ZooKeeper is behind relative to the highest committed record in the metadata log. This metric will only be reported by the active KRaft controller. kafka.controller:type=KafkaController,name=ZkWriteBehindLag
ZooKeeper Metadata Snapshot Write Time The number of milliseconds the KRaft controller took reconciling a snapshot into ZooKeeper. kafka.controller:type=KafkaController,name=ZkWriteSnapshotTimeMs
ZooKeeper Metadata Delta Write Time The number of milliseconds the KRaft controller took writing a delta into ZooKeeper. kafka.controller:type=KafkaController,name=ZkWriteDeltaTimeMs
Timed-out Broker Heartbeat Count The number of broker heartbeats that timed out on this controller since the process was started. Note that only active controllers handle heartbeats, so only they will see increases in this metric. kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
Number Of Operations Started In Event Queue The total number of controller event queue operations that were started. This includes deferred operations. kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
Number of Operations Timed Out In Event Queue The total number of controller event queue operations that timed out before they could be performed. kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
Number Of New Controller Elections Counts the number of times this node has seen a new controller elected. A transition to the "no leader" state is not counted here. If the same controller as before becomes active, that still counts. kafka.controller:type=KafkaController,name=NewActiveControllersCount

KRaft Broker Monitoring Metrics

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
Last Applied Record Offset The offset of the last record from the cluster metadata partition that was applied by the broker. kafka.server:type=broker-metadata-metrics,name=last-applied-record-offset
Last Applied Record Timestamp The timestamp of the last record from the cluster metadata partition that was applied by the broker. kafka.server:type=broker-metadata-metrics,name=last-applied-record-timestamp
Last Applied Record Lag Ms The difference between now and the timestamp of the last record from the cluster metadata partition that was applied by the broker. kafka.server:type=broker-metadata-metrics,name=last-applied-record-lag-ms
Metadata Load Error Count The number of errors encountered by the BrokerMetadataListener while loading the metadata log and generating a new MetadataDelta based on it. kafka.server:type=broker-metadata-metrics,name=metadata-load-error-count
Metadata Apply Error Count The number of errors encountered by the BrokerMetadataPublisher while applying a new MetadataImage based on the latest MetadataDelta. kafka.server:type=broker-metadata-metrics,name=metadata-apply-error-count

Common monitoring metrics for producer/consumer/connect/streams

The following metrics are available on producer/consumer/connect/streams instances. For specific metrics, please see the following sections. A short sketch of reading these metrics programmatically follows the table below.

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
connection-close-rate Connections closed per second in the window. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
connection-close-total Total connections closed in the window. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
connection-creation-rate New connections established per second in the window. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
connection-creation-total Total new connections established in the window. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
network-io-rate The average number of network operations (reads or writes) on all connections per second. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
network-io-total The total number of network operations (reads or writes) on all connections. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
outgoing-byte-rate The average number of outgoing bytes sent per second to all servers. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
outgoing-byte-total The total number of outgoing bytes sent to all servers. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
request-rate The average number of requests sent per second. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
request-total The total number of requests sent. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
request-size-avg The average size of all requests in the window. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
request-size-max The maximum size of any request sent in the window. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
incoming-byte-rate Bytes/second read off all sockets. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
incoming-byte-total Total bytes read off all sockets. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
response-rate Responses received per second. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
response-total Total responses received. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
select-rate Number of times the I/O layer checked for new I/O to perform per second. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
select-total Total number of times the I/O layer checked for new I/O to perform. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
io-wait-time-ns-avg The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
io-wait-time-ns-total The total time the I/O thread spent waiting in nanoseconds. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
io-waittime-total *Deprecated* The total time the I/O thread spent waiting in nanoseconds. Replacement is io-wait-time-ns-total. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
io-wait-ratio The fraction of time the I/O thread spent waiting. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
io-time-ns-avg The average length of time for I/O per select call in nanoseconds. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
io-time-ns-total The total time the I/O thread spent doing I/O in nanoseconds. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
iotime-total *Deprecated* The total time the I/O thread spent doing I/O in nanoseconds. Replacement is io-time-ns-total. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
io-ratio The fraction of time the I/O thread spent doing I/O. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
connection-count The current number of active connections. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
successful-authentication-rate Connections per second that were successfully authenticated using SASL or SSL. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
successful-authentication-total Total connections that were successfully authenticated using SASL or SSL. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
failed-authentication-rate Connections per second that failed authentication. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
failed-authentication-total Total connections that failed authentication. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
successful-reauthentication-rate Connections per second that were successfully re-authenticated using SASL. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
successful-reauthentication-total Total connections that were successfully re-authenticated using SASL. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
reauthentication-latency-max The maximum latency in ms observed due to re-authentication. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
reauthentication-latency-avg The average latency in ms observed due to re-authentication. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
failed-reauthentication-rate Connections per second that failed re-authentication. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
failed-reauthentication-total Total connections that failed re-authentication. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
successful-authentication-no-reauth-total Total connections that were successfully authenticated by older, pre-2.2.0 SASL clients that do not support re-authentication. May only be non-zero. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
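As noted above the table, these client-level metrics can also be read in-process rather than over JMX, via the metrics() accessor that all clients expose. A minimal sketch, assuming a reachable broker at the placeholder address broker_host:9092:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringSerializer;

public class ClientMetricsDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker_host:9092"); // placeholder address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The common metrics above are grouped under "producer-metrics" for a producer.
            for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
                if ("producer-metrics".equals(entry.getKey().group())) {
                    System.out.printf("%s = %s%n",
                            entry.getKey().name(), entry.getValue().metricValue());
                }
            }
        }
    }
}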

Common Per-broker metrics for producer/consumer/connect/streams

The following metrics are available on producer/consumer/connect/streams instances. For specific metrics, please see the following sections.

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
outgoing-byte-rate The average number of outgoing bytes sent per second for a node. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
outgoing-byte-total The total number of outgoing bytes sent for a node. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
request-rate The average number of requests sent per second for a node. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
request-total The total number of requests sent for a node. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
request-size-avg The average size of all requests in the window for a node. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
request-size-max The maximum size of any request sent in the window for a node. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
incoming-byte-rate The average number of bytes received per second for a node. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
incoming-byte-total The total number of bytes received for a node. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
request-latency-avg The average request latency in ms for a node. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
request-latency-max The maximum request latency in ms for a node. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
response-rate Responses received per second for a node. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
response-total Total responses received for a node. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

Producer monitoring

The following metrics are available on producer instances.

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
waiting-threads The number of user threads blocked waiting for buffer memory to enqueue their records. kafka.producer:type=producer-metrics,client-id=([-.\w]+)
buffer-total-bytes The maximum amount of buffer memory the client can use (whether or not it is currently used). kafka.producer:type=producer-metrics,client-id=([-.\w]+)
buffer-available-bytes The total amount of buffer memory that is not being used (either unallocated or in the free list). kafka.producer:type=producer-metrics,client-id=([-.\w]+)
bufferpool-wait-time The fraction of time an appender waits for space allocation. kafka.producer:type=producer-metrics,client-id=([-.\w]+)
bufferpool-wait-time-total *Deprecated* The total time an appender waits for space allocation in nanoseconds. Replacement is bufferpool-wait-time-ns-total kafka.producer:type=producer-metrics,client-id=([-.\w]+)
bufferpool-wait-time-ns-total The total time an appender waits for space allocation in nanoseconds. kafka.producer:type=producer-metrics,client-id=([-.\w]+)
flush-time-ns-total The total time the Producer spent in Producer.flush in nanoseconds. kafka.producer:type=producer-metrics,client-id=([-.\w]+)
txn-init-time-ns-total The total time the Producer spent initializing transactions in nanoseconds (for EOS). kafka.producer:type=producer-metrics,client-id=([-.\w]+)
txn-begin-time-ns-total The total time the Producer spent in beginTransaction in nanoseconds (for EOS). kafka.producer:type=producer-metrics,client-id=([-.\w]+)
txn-send-offsets-time-ns-total The total time the Producer spent sending offsets to transactions in nanoseconds (for EOS). kafka.producer:type=producer-metrics,client-id=([-.\w]+)
txn-commit-time-ns-total The total time the Producer spent committing transactions in nanoseconds (for EOS). kafka.producer:type=producer-metrics,client-id=([-.\w]+)
txn-abort-time-ns-total The total time the Producer spent aborting transactions in nanoseconds (for EOS). kafka.producer:type=producer-metrics,client-id=([-.\w]+)

Producer Sender Metrics

kafka.producer:type=producer-metrics,client-id="{client-id}"

ATTRIBUTE NAME DESCRIPTION
batch-size-avg The average number of bytes sent per partition per-request.
batch-size-max The max number of bytes sent per partition per-request.
batch-split-rate The average number of batch splits per second
batch-split-total The total number of batch splits
compression-rate-avg The average compression rate of record batches, defined as the average ratio of the compressed batch size over the uncompressed size.
metadata-age The age in seconds of the current producer metadata being used.
produce-throttle-time-avg The average time in ms a request was throttled by a broker
produce-throttle-time-max The maximum time in ms a request was throttled by a broker
record-error-rate The average per-second number of record sends that resulted in errors
record-error-total The total number of record sends that resulted in errors
record-queue-time-avg The average time in ms record batches spent in the send buffer.
record-queue-time-max The maximum time in ms record batches spent in the send buffer.
record-retry-rate The average per-second number of retried record sends
record-retry-total The total number of retried record sends
record-send-rate The average number of records sent per second.
record-send-total The total number of records sent.
record-size-avg The average record size
record-size-max The maximum record size
records-per-request-avg The average number of records per request.
request-latency-avg The average request latency in ms
request-latency-max The maximum request latency in ms
requests-in-flight The current number of in-flight requests awaiting a response.

kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"

ATTRIBUTE NAME DESCRIPTION
byte-rate The average number of bytes sent per second for a topic.
byte-total The total number of bytes sent for a topic.
compression-rate The average compression rate of record batches for a topic, defined as the average ratio of the compressed batch size over the uncompressed size.
record-error-rate The average per-second number of record sends that resulted in errors for a topic
record-error-total The total number of record sends that resulted in errors for a topic
record-retry-rate The average per-second number of retried record sends for a topic
record-retry-total The total number of retried record sends for a topic
record-send-rate The average number of records sent per second for a topic.
record-send-total The total number of records sent for a topic.

Consumer monitoring

The following metrics are available on consumer instances.

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
time-between-poll-avg The average delay between invocations of poll(). kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
time-between-poll-max The max delay between invocations of poll(). kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
last-poll-seconds-ago The number of seconds since the last poll() invocation. kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
poll-idle-ratio-avg The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records. kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
committed-time-ns-total The total time the Consumer spent in committed in nanoseconds. kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
commit-sync-time-ns-total The total time the Consumer spent committing offsets in nanoseconds (for AOS). kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)

Consumer Group Metrics

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
commit-latency-avg The average time taken for a commit request kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
commit-latency-max The max time taken for a commit request kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
commit-rate The number of commit calls per second kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
commit-total The total number of commit calls kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
assigned-partitions The number of partitions currently assigned to this consumer kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
heartbeat-response-time-max The max time taken to receive a response to a heartbeat request kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
heartbeat-rate The average number of heartbeats per second kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
heartbeat-total The total number of heartbeats kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-time-avg The average time taken for a group rejoin kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-time-max The max time taken for a group rejoin kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-rate The number of group joins per second kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-total The total number of group joins kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-time-avg The average time taken for a group sync kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-time-max The max time taken for a group sync kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-rate The number of group syncs per second kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-total The total number of group syncs kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-latency-avg The average time taken for a group rebalance kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-latency-max The max time taken for a group rebalance kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-latency-total The total time taken for group rebalances so far kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-total The total number of group rebalances participated kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-rate-per-hour The number of group rebalance participated per hour kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
failed-rebalance-total The total number of failed group rebalances kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
failed-rebalance-rate-per-hour The number of failed group rebalance event per hour kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
last-rebalance-seconds-ago The number of seconds since the last rebalance event kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
last-heartbeat-seconds-ago The number of seconds since the last coordinator heartbeat kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-revoked-latency-avg The average time taken by the on-partitions-revoked rebalance listener callback kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-revoked-latency-max The max time taken by the on-partitions-revoked rebalance listener callback kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-assigned-latency-avg The average time taken by the on-partitions-assigned rebalance listener callback kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-assigned-latency-max The max time taken by the on-partitions-assigned rebalance listener callback kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-lost-latency-avg The average time taken by the on-partitions-lost rebalance listener callback kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-lost-latency-max The max time taken by the on-partitions-lost rebalance listener callback kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)

Consumer Fetch Metrics

kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"

ATTRIBUTE NAME DESCRIPTION
bytes-consumed-rate The average number of bytes consumed per second
bytes-consumed-total The total number of bytes consumed
fetch-latency-avg The average time taken for a fetch request.
fetch-latency-max The max time taken for any fetch request.
fetch-rate The number of fetch requests per second.
fetch-size-avg The average number of bytes fetched per request
fetch-size-max The maximum number of bytes fetched per request
fetch-throttle-time-avg The average throttle time in ms
fetch-throttle-time-max The maximum throttle time in ms
fetch-total The total number of fetch requests.
records-consumed-rate The average number of records consumed per second
records-consumed-total The total number of records consumed
records-lag-max The maximum lag in terms of number of records for any partition in this window. NOTE: This is based on current offset and not committed offset
records-lead-min The minimum lead in terms of number of records for any partition in this window
records-per-request-avg The average number of records in each request

kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"

ATTRIBUTE NAME DESCRIPTION
bytes-consumed-rate The average number of bytes consumed per second for a topic
bytes-consumed-total The total number of bytes consumed for a topic
fetch-size-avg The average number of bytes fetched per request for a topic
fetch-size-max The maximum number of bytes fetched per request for a topic
records-consumed-rate The average number of records consumed per second for a topic
records-consumed-total The total number of records consumed for a topic
records-per-request-avg The average number of records in each request for a topic

kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"

ATTRIBUTE NAME DESCRIPTION
preferred-read-replica The current read replica for the partition, or -1 if reading from leader
records-lag The latest lag of the partition
records-lag-avg The average lag of the partition
records-lag-max The max lag of the partition
records-lead The latest lead of the partition
records-lead-avg The average lead of the partition
records-lead-min The min lead of the partition

Connect Monitoring

A Connect worker process contains all the producer and consumer metrics as well as metrics specific to Connect. The worker process itself has a number of metrics, while each connector and task has additional metrics.
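As a sketch of how these Connect metrics can be inspected, the snippet below queries a worker's MBeans over JMX, assuming the worker was started with remote JMX enabled (e.g. JMX_PORT=9999); connect_host and the port are placeholders. The attribute names correspond to the tables that follow.

import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ConnectWorkerMetricsProbe {
    public static void main(String[] args) throws Exception {
        // Placeholder JMX endpoint of the Connect worker.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://connect_host:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Worker-level bean with connector-count and task-count attributes.
            ObjectName worker = new ObjectName("kafka.connect:type=connect-worker-metrics");
            System.out.println("connectors=" + conn.getAttribute(worker, "connector-count")
                    + " tasks=" + conn.getAttribute(worker, "task-count"));
            // One additional bean per connector, tagged connector="{connector}".
            Set<ObjectName> perConnector = conn.queryNames(
                    new ObjectName("kafka.connect:type=connect-worker-metrics,connector=*"), null);
            for (ObjectName name : perConnector) {
                System.out.println(name + " running-tasks="
                        + conn.getAttribute(name, "connector-running-task-count"));
            }
        }
    }
}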

kafka.connect:type=connect-worker-metrics

ATTRIBUTE NAME DESCRIPTION
connector-count The number of connectors run in this worker.
connector-startup-attempts-total The total number of connector startups that this worker has attempted.
connector-startup-failure-percentage The average percentage of this worker's connector starts that failed.
connector-startup-failure-total The total number of connector starts that failed.
connector-startup-success-percentage The average percentage of this worker's connector starts that succeeded.
connector-startup-success-total The total number of connector starts that succeeded.
task-count The number of tasks run in this worker.
task-startup-attempts-total The total number of task startups that this worker has attempted.
task-startup-failure-percentage The average percentage of this worker's task starts that failed.
task-startup-failure-total The total number of task starts that failed.
task-startup-success-percentage The average percentage of this worker's task starts that succeeded.
task-startup-success-total The total number of task starts that succeeded.

kafka.connect:type=connect-worker-metrics,connector="{connector}"

ATTRIBUTE NAME DESCRIPTION
connector-destroyed-task-count The number of destroyed tasks of the connector on the worker.
connector-failed-task-count The number of failed tasks of the connector on the worker.
connector-paused-task-count The number of paused tasks of the connector on the worker.
connector-restarting-task-count The number of restarting tasks of the connector on the worker.
connector-running-task-count The number of running tasks of the connector on the worker.
connector-total-task-count The number of tasks of the connector on the worker.
connector-unassigned-task-count The number of unassigned tasks of the connector on the worker.

kafka.connect:type=connect-worker-rebalance-metrics

ATTRIBUTE NAME DESCRIPTION
completed-rebalances-total The total number of rebalances completed by this worker.
connect-protocol The Connect protocol used by this cluster
epoch The epoch or generation number of this worker.
leader-name The name of the group leader.
rebalance-avg-time-ms The average time in milliseconds spent by this worker to rebalance.
rebalance-max-time-ms The maximum time in milliseconds spent by this worker to rebalance.
rebalancing Whether this worker is currently rebalancing.
time-since-last-rebalance-ms The time in milliseconds since this worker completed the most recent rebalance.

kafka.connect:type=connector-metrics,connector="{connector}"

ATTRIBUTE NAME DESCRIPTION
connector-class The name of the connector class.
connector-type The type of the connector. One of 'source' or 'sink'.
connector-version The version of the connector class, as reported by the connector.
status The status of the connector. One of 'unassigned', 'running', 'paused', 'stopped', 'failed', or 'restarting'.

kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"

ATTRIBUTE NAME DESCRIPTION
batch-size-avg The average number of records in the batches the task has processed so far.
batch-size-max The number of records in the largest batch the task has processed so far.
offset-commit-avg-time-ms The average time in milliseconds taken by this task to commit offsets.
offset-commit-failure-percentage The average percentage of this task's offset commit attempts that failed.
offset-commit-max-time-ms The maximum time in milliseconds taken by this task to commit offsets.
offset-commit-success-percentage The average percentage of this task's offset commit attempts that succeeded.
pause-ratio The fraction of time this task has spent in the pause state.
running-ratio The fraction of time this task has spent in the running state.
status The status of the connector task. One of 'unassigned', 'running', 'paused', 'failed', or 'restarting'.

kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"

ATTRIBUTE NAME DESCRIPTION
offset-commit-completion-rate The average per-second number of offset commit completions that were completed successfully.
offset-commit-completion-total The total number of offset commit completions that were completed successfully.
offset-commit-seq-no The current sequence number for offset commits.
offset-commit-skip-rate The average per-second number of offset commit completions that were received too late and skipped/ignored.
offset-commit-skip-total The total number of offset commit completions that were received too late and skipped/ignored.
partition-count The number of topic partitions assigned to this task belonging to the named sink connector in this worker.
put-batch-avg-time-ms The average time taken by this task to put a batch of sink records.
put-batch-max-time-ms The maximum time taken by this task to put a batch of sink records.
sink-record-active-count The number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task.
sink-record-active-count-avg The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task.
sink-record-active-count-max The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task.
sink-record-lag-max The maximum lag in terms of number of records that the sink task is behind the consumer's position for any topic partitions.
sink-record-read-rate The average per-second number of records read from Kafka for this task belonging to the named sink connector in this worker. This is before transformations are applied.
sink-record-read-total The total number of records read from Kafka by this task belonging to the named sink connector in this worker, since the task was last restarted.
sink-record-send-rate The average per-second number of records output from the transformations and sent/put to this task belonging to the named sink connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations.
sink-record-send-total The total number of records output from the transformations and sent/put to this task belonging to the named sink connector in this worker, since the task was last restarted.

kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"

ATTRIBUTE NAME DESCRIPTION
poll-batch-avg-time-ms The average time in milliseconds taken by this task to poll for a batch of source records.
poll-batch-max-time-ms The maximum time in milliseconds taken by this task to poll for a batch of source records.
source-record-active-count The number of records that have been produced by this task but not yet completely written to Kafka.
source-record-active-count-avg The average number of records that have been produced by this task but not yet completely written to Kafka.
source-record-active-count-max The maximum number of records that have been produced by this task but not yet completely written to Kafka.
source-record-poll-rate The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.
source-record-poll-total The total number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.
source-record-write-rate The average per-second number of records written to Kafka for this task belonging to the named source connector in this worker, since the task was last restarted. This is after transformations are applied, and excludes any records filtered out by the transformations.
source-record-write-total The total number of records written to Kafka for this task belonging to the named source connector in this worker, since the task was last restarted. This is after transformations are applied, and excludes any records filtered out by the transformations.
transaction-size-avg The average number of records in the transactions the task has committed so far.
transaction-size-max The number of records in the largest transaction the task has committed so far.
transaction-size-min The number of records in the smallest transaction the task has committed so far.

kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"

ATTRIBUTE NAME DESCRIPTION
deadletterqueue-produce-failures The number of failed writes to the dead letter queue.
deadletterqueue-produce-requests The number of attempted writes to the dead letter queue.
last-error-timestamp The epoch timestamp when this task last encountered an error.
total-errors-logged The number of errors that were logged.
total-record-errors The number of record processing errors in this task.
total-record-failures The number of record processing failures in this task.
total-records-skipped The number of records skipped due to errors.
total-retries The number of operations retried.

Streams Monitoring

A Kafka Streams instance contains all the producer and consumer metrics as well as additional metrics specific to Streams. The metrics have three recording levels: info, debug, and trace.

Note that the metrics have a 4-layer hierarchy. At the top level there are client-level metrics for each started Kafka Streams client. Each client has stream threads with their own metrics. Each stream thread has tasks with their own metrics. Each task has a number of processor nodes with their own metrics. Each task also has a number of state stores and record caches, all with their own metrics.

Use the following configuration option to specify which metrics you want collected:

metrics.recording.level="info"
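In a Java Streams application the same option can be set programmatically via StreamsConfig. A minimal sketch; the application id and bootstrap address are placeholders:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsMetricsConfig {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");       // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker_host:9092");  // placeholder
        // "info" is the default; "debug" and "trace" enable progressively more metrics.
        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "debug");
        return props;
    }
}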

Client Metrics

All of the following metrics have a recording level of info:

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
version The version of the Kafka Streams client. kafka.streams:type=stream-metrics,client-id=([-.\w]+)
commit-id The version control commit ID of the Kafka Streams client. kafka.streams:type=stream-metrics,client-id=([-.\w]+)
application-id The application ID of the Kafka Streams client. kafka.streams:type=stream-metrics,client-id=([-.\w]+)
topology-description The description of the topology executed in the Kafka Streams client. kafka.streams:type=stream-metrics,client-id=([-.\w]+)
state The state of the Kafka Streams client. kafka.streams:type=stream-metrics,client-id=([-.\w]+)
failed-stream-threads The number of failed stream threads since the start of the Kafka Streams client. kafka.streams:type=stream-metrics,client-id=([-.\w]+)

Thread Metrics

All of the following metrics have a recording level of info:

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
commit-latency-avg The average execution time in ms, for committing, across all running tasks of this thread. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
commit-latency-max The maximum execution time in ms, for committing, across all running tasks of this thread. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
poll-latency-avg The average execution time in ms, for consumer polling. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
poll-latency-max The maximum execution time in ms, for consumer polling. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
process-latency-avg The average execution time in ms, for processing. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
process-latency-max The maximum execution time in ms, for processing. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
punctuate-latency-avg The average execution time in ms, for punctuating. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
punctuate-latency-max The maximum execution time in ms, for punctuating. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
commit-rate The average number of commits per second. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
commit-total The total number of commit calls. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
poll-rate The average number of consumer poll calls per second. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
poll-total The total number of consumer poll calls. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
process-rate The average number of processed records per second. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
process-total The total number of processed records. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
punctuate-rate The average number of punctuate calls per second. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
punctuate-total The total number of punctuate calls. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
task-created-rate The average number of tasks created per second. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
task-created-total The total number of tasks created. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
task-closed-rate The average number of tasks closed per second. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
task-closed-total The total number of tasks closed. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
blocked-time-ns-total The total time the thread spent blocked on Kafka. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
thread-start-time The time that the thread was started. kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)

Task Metrics

All of the following metrics have a recording level of debug, except for the dropped-records-* and active-process-ratio metrics, which have a recording level of info:

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
process-latency-avg The average execution time in ns, for processing. kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
process-latency-max The maximum execution time in ns, for processing. kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
process-rate The average number of processed records per second across all source processor nodes of this task. kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
process-total The total number of processed records across all source processor nodes of this task. kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
commit-latency-avg The average execution time in ns, for committing. kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
commit-latency-max The maximum execution time in ns, for committing. kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
commit-rate The average number of commit calls per second. kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
commit-total The total number of commit calls. kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
record-lateness-avg The average observed lateness of records (stream time - record timestamp). kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
record-lateness-max The max observed lateness of records (stream time - record timestamp). kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
enforced-processing-rate The average number of enforced processings per second. kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
enforced-processing-total The total number of enforced processings. kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
dropped-records-rate The average number of records dropped within this task. kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
dropped-records-total The total number of records dropped within this task. kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
active-process-ratio The fraction of time the stream thread spent on processing this task among all assigned active tasks. kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)

Processor Node Metrics

The following metrics are only available on certain types of nodes: the process-* metrics are only available for source processor nodes, the suppression-emit-* metrics are only available for suppression operation nodes, and the record-e2e-latency-* metrics are only available for source processor nodes and terminal nodes (nodes without successor nodes). All of the metrics have a recording level of debug, except for the record-e2e-latency-* metrics, which have a recording level of info:

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
bytes-consumed-total The total number of bytes consumed by a source processor node. kafka.streams:type=stream-topic-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+),topic=([-.\w]+)
bytes-produced-total The total number of bytes produced by a sink processor node. kafka.streams:type=stream-topic-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+),topic=([-.\w]+)
process-rate The average number of records processed by a source processor node per second. kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
process-total The total number of records processed by a source processor node. kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
suppression-emit-rate The rate at which records have been emitted downstream from suppression operation nodes. kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
suppression-emit-total The total number of records that have been emitted downstream from suppression operation nodes. kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
record-e2e-latency-avg The average end-to-end latency of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node. kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
record-e2e-latency-max The maximum end-to-end latency of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node. kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
record-e2e-latency-min The minimum end-to-end latency of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node. kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
records-consumed-total The total number of records consumed by a source processor node. kafka.streams:type=stream-topic-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+),topic=([-.\w]+)
records-produced-total The total number of records produced by a sink processor node. kafka.streams:type=stream-topic-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+),topic=([-.\w]+)

State Store Metrics

All of the following metrics have a recording level of debug, except for the record-e2e-latency-* metrics, which have a recording level of trace. Note that the store-scope value is specified in StoreSupplier#metricsScope() for user-customized state stores; for built-in state stores, currently we have:

  • in-memory-state
  • in-memory-lru-state
  • in-memory-window-state
  • in-memory-suppression (for suppression buffers)
  • rocksdb-state (for RocksDB backed key-value stores)
  • rocksdb-window-state (for RocksDB backed window stores)
  • rocksdb-session-state (for RocksDB backed session stores)

The metrics suppression-buffer-size-avg, suppression-buffer-size-max, suppression-buffer-count-avg, and suppression-buffer-count-max are only available for suppression buffers. All other metrics are not available for suppression buffers.
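To illustrate where the [store-scope] tag in the MBean names below comes from, here is a minimal sketch of a customized store supplier whose StoreSupplier#metricsScope() sets that value. It simply delegates to a built-in persistent key-value store; the store name and scope string are placeholders.

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class CustomScopeStoreSupplier implements KeyValueBytesStoreSupplier {
    // Delegate the actual storage to a built-in RocksDB-backed store.
    private final KeyValueBytesStoreSupplier inner = Stores.persistentKeyValueStore("my-store");

    @Override
    public String name() { return inner.name(); }

    @Override
    public KeyValueStore<Bytes, byte[]> get() { return inner.get(); }

    // State store metrics for this store would then be reported under
    // kafka.streams:type=stream-state-metrics,...,my-custom-scope-id=([-.\w]+)
    @Override
    public String metricsScope() { return "my-custom-scope"; }
}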

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
put-latency-avg The average put execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-latency-max The maximum put execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-if-absent-latency-avg The average put-if-absent execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-if-absent-latency-max The maximum put-if-absent execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
get-latency-avg The average get execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
get-latency-max The maximum get execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
delete-latency-avg The average delete execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
delete-latency-max The maximum delete execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-all-latency-avg The average put-all execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-all-latency-max The maximum put-all execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
all-latency-avg The average all operation execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
all-latency-max The maximum all operation execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
range-latency-avg The average range execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
range-latency-max The maximum range execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
flush-latency-avg The average flush execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
flush-latency-max The maximum flush execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
restore-latency-avg The average restore execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
restore-latency-max The maximum restore execution time in ns. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-rate The average put rate for this store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-if-absent-rate The average put-if-absent rate for this store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
get-rate The average get rate for this store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
delete-rate The average delete rate for this store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-all-rate The average put-all rate for this store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
all-rate The average all operation rate for this store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
range-rate The average range rate for this store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
flush-rate The average flush rate for this store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
restore-rate The average restore rate for this store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
suppression-buffer-size-avg The average total size, in bytes, of the buffered data over the sampling window. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)
suppression-buffer-size-max The maximum total size, in bytes, of the buffered data over the sampling window. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)
suppression-buffer-count-avg The average number of records buffered over the sampling window. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)
suppression-buffer-count-max The maximum number of records buffered over the sampling window. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)
record-e2e-latency-avg The average end-to-end latency of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
record-e2e-latency-max The maximum end-to-end latency of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
record-e2e-latency-min The minimum end-to-end latency of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)

RocksDB Metrics

RocksDB metrics are grouped into statistics-based metrics and properties-based metrics. The former are recorded from statistics that RocksDB state stores collect, while the latter are recorded from properties that RocksDB exposes. Statistics collected by RocksDB provide cumulative measurements over time, e.g. bytes written to the state store. Properties exposed by RocksDB provide current measurements, e.g. the amount of memory currently used. Note that the store-scope for built-in RocksDB state stores is currently one of the following:

  • rocksdb-state (for RocksDB backed key-value stores)
  • rocksdb-window-state (for RocksDB backed window stores)
  • rocksdb-session-state (for RocksDB backed session stores)

RocksDB Statistics-based Metrics: All of the following statistics-based metrics have a recording level of debug because collecting statistics in RocksDB may have an impact on performance. Statistics-based metrics are collected every minute from the RocksDB state stores. If a state store consists of multiple RocksDB instances, as is the case for WindowStores and SessionStores, each metric reports an aggregation over the RocksDB instances of the state store.

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
bytes-written-rate The average number of bytes written per second to the RocksDB state store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-written-total The total number of bytes written to the RocksDB state store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-read-rate The average number of bytes read per second from the RocksDB state store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-read-total The total number of bytes read from the RocksDB state store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-bytes-flushed-rate The average number of bytes flushed per second from the memtable to disk. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-bytes-flushed-total The total number of bytes flushed from the memtable to disk. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-hit-ratio The ratio of memtable hits relative to all lookups to the memtable. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-flush-time-avg The average duration of memtable flushes to disc in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-flush-time-min The minimum duration of memtable flushes to disc in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-flush-time-max The maximum duration of memtable flushes to disc in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-data-hit-ratio The ratio of block cache hits for data blocks relative to all lookups for data blocks to the block cache. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-index-hit-ratio The ratio of block cache hits for index blocks relative to all lookups for index blocks to the block cache. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-filter-hit-ratio The ratio of block cache hits for filter blocks relative to all lookups for filter blocks to the block cache. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
write-stall-duration-avg The average duration of write stalls in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
write-stall-duration-total The total duration of write stalls in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-read-compaction-rate The average number of bytes read per second during compaction. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-written-compaction-rate The average number of bytes written per second during compaction. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
compaction-time-avg The average duration of disc compactions in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
compaction-time-min The minimum duration of disc compactions in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
compaction-time-max The maximum duration of disc compactions in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
number-open-files The number of current open files. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
number-file-errors-total The total number of file errors that occurred. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)

RocksDB Properties-based Metrics: All of the following properties-based metrics have a recording level of info and are recorded when the metrics are accessed. If a state store consists of multiple RocksDB instances, as is the case for WindowStores and SessionStores, each metric reports the sum over all the RocksDB instances of the state store, except for the block cache metrics block-cache-*. The block cache metrics report the sum over all RocksDB instances if each instance uses its own block cache, and they report the recorded value from only one instance if a single block cache is shared among all instances.

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
num-immutable-mem-table The number of immutable memtables that have not yet been flushed. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
cur-size-active-mem-table The approximate size of the active memtable in bytes. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
cur-size-all-mem-tables The approximate size of active and unflushed immutable memtables in bytes. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
size-all-mem-tables The approximate size of active, unflushed immutable, and pinned immutable memtables in bytes. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
num-entries-active-mem-table The number of entries in the active memtable. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
num-entries-imm-mem-tables The number of entries in the unflushed immutable memtables. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
num-deletes-active-mem-table The number of delete entries in the active memtable. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
num-deletes-imm-mem-tables The number of delete entries in the unflushed immutable memtables. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
mem-table-flush-pending This metric reports 1 if a memtable flush is pending, otherwise it reports 0. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
num-running-flushes The number of currently running flushes. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
compaction-pending This metric reports 1 if at least one compaction is pending, otherwise it reports 0. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
num-running-compactions The number of currently running compactions. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
estimate-pending-compaction-bytes The estimated total number of bytes a compaction needs to rewrite on disk to get all levels down to under target size (only valid for level compaction). kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
total-sst-files-size The total size in bytes of all SST files. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
live-sst-files-size The total size in bytes of all SST files that belong to the latest LSM tree. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
num-live-versions Number of live versions of the LSM tree. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-capacity The capacity of the block cache in bytes. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-usage The memory size of the entries residing in block cache in bytes. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-pinned-usage The memory size for the entries being pinned in the block cache in bytes. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
estimate-num-keys The estimated number of keys in the active and unflushed immutable memtables and storage. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
estimate-table-readers-mem The estimated memory in bytes used for reading SST tables, excluding memory used in block cache. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
background-errors The total number of background errors. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)

Record Cache Metrics

All of the following metrics have a recording level of debug:

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
hit-ratio-avg The average cache hit ratio defined as the ratio of cache read hits over the total cache read requests. kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)
hit-ratio-min The minimum cache hit ratio. kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)
hit-ratio-max The maximum cache hit ratio. kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)

Others

We recommend monitoring GC time and other stats, as well as various server stats such as CPU utilization, I/O service time, etc. On the client side, we recommend monitoring the message/byte rate (global and per topic), request rate/size/time, and, on the consumer side, the max lag in messages among all partitions and the min fetch request rate. For a consumer to keep up, the max lag needs to stay below a threshold and the min fetch rate needs to be larger than 0.
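As a concrete sketch of such monitoring, the JmxTool class that ships with Kafka can poll a single metric over JMX. The JMX port (9999) and the client-id in the object name below are illustrative assumptions, not defaults:

> bin/kafka-run-class.sh kafka.tools.JmxTool \
      --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
      --object-name 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1' \
      --attributes records-lag-max \
      --reporting-interval 10000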

6.9 ZooKeeper

Stable Version

The current stable branch is 3.5. Kafka is regularly updated to include the latest release in the 3.5 series.

ZooKeeper Deprecation

With the release of Apache Kafka 3.5, ZooKeeper is now marked deprecated. Removal of ZooKeeper is planned in the next major release of Apache Kafka (version 4.0), which is scheduled to happen no sooner than April 2024. During the deprecation phase, ZooKeeper is still supported for metadata management of Kafka clusters, but it is not recommended for new deployments. There is a small subset of features that remain to be implemented in KRaft; see the current missing features for more information.

Migration

Migration of an existing ZooKeeper-based Kafka cluster to KRaft is currently in preview, and we expect it to be ready for production usage in version 3.6. Users are recommended to begin planning for migration to KRaft and to begin testing to provide any feedback. Refer to ZooKeeper to KRaft Migration for details on how to perform a live migration from ZooKeeper to KRaft and the current limitations.

3.x and ZooKeeper Support

The final 3.x minor release that supports ZooKeeper mode will receive critical bug fixes and security fixes for 12 months after its release.

ZooKeeper and KRaft timeline

For details and updates on the tentative timeline for ZooKeeper removal and the planned KRaft feature releases, refer to KIP-833.

Operationalizing ZooKeeper

Operationally, we do the following for a healthy ZooKeeper installation:

  • Redundancy in the physical/hardware/network layout: try not to put them all in the same rack, use decent (but don't go nuts) hardware, and try to keep redundant power and network paths, etc. A typical ZooKeeper ensemble has either 5 or 7 servers, which tolerates 2 and 3 servers down, respectively. If you have a small deployment, then using 3 servers is acceptable, but keep in mind that you'll only be able to tolerate 1 server down in this case.
  • I/O segregation: if you do a lot of write-type traffic you'll almost definitely want the transaction logs on a dedicated disk group. Writes to the transaction log are synchronous (but batched for performance), so concurrent writes can significantly affect performance. ZooKeeper snapshots are one such source of concurrent writes, and ideally should be written on a disk group separate from the transaction log. Snapshots are written to disk asynchronously, so it is typically ok for them to share a disk with the operating system and message log files. You can configure a server to use a separate disk group for the transaction log with the dataLogDir parameter; see the sample zoo.cfg at the end of this section.
  • Application segregation: unless you really understand the application patterns of the other apps that you want to install on the same box, it can be a good idea to run ZooKeeper in isolation (though this can be a balancing act with the capabilities of the hardware).
  • Use care with virtualization: it can work, depending on your cluster layout, read/write patterns, and SLAs, but the tiny overheads introduced by the virtualization layer can add up and throw off ZooKeeper, as it can be very time sensitive.
  • ZooKeeper configuration: it's java, so make sure you give it 'enough' heap space (we usually run them with 3-5G, but that's mostly due to the data set size we have here). Unfortunately we don't have a good formula for it, but keep in mind that allowing for more ZooKeeper state means that snapshots can become large, and large snapshots affect recovery time. In fact, if the snapshot becomes too large (a few gigabytes), then you may need to increase the initLimit parameter to give the servers enough time to recover and join the ensemble.
  • Monitoring: both JMX and the 4-letter-word (4lw) commands are very useful; they do overlap in some cases (and in those cases we prefer the 4-letter commands, as they seem more predictable, or at the very least, they work better with the LI monitoring infrastructure).
  • Don't overbuild the cluster: large clusters, especially in a write-heavy usage pattern, mean a lot of intracluster communication (quorums on the writes and subsequent cluster member updates), but don't underbuild it either (and risk swamping the cluster). Having more servers adds to your read capacity.

Overall, we try to keep the ZooKeeper system as small as will handle the load (plus standard growth capacity planning) and as simple as possible. We try not to do anything fancy with the configuration or application layout compared to the official release, and keep it as self-contained as possible. For these reasons, we tend to skip the OS-packaged versions, since they tend to try to put things in the OS standard hierarchy, which can be 'messy', for want of a better way to word it.
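As a minimal sketch of the points above, a zoo.cfg might look like the following; the paths and hostnames are illustrative assumptions, not recommendations:

# zoo.cfg - illustrative only
tickTime=2000
# raise initLimit if large snapshots make recovery and ensemble joins slow
initLimit=10
syncLimit=5
# snapshots are written asynchronously and may share a disk with the OS
dataDir=/var/lib/zookeeper/data
# keep the transaction log on a dedicated disk group
dataLogDir=/var/lib/zookeeper/txnlog
clientPort=2181
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888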

6.10 KRaft

Configuration

Process Roles

In KRaft mode, each Kafka server can be configured as a controller, a broker, or both using the process.roles property. This property can have the following values:

  • If process.roles is set to broker, the server acts as a broker.
  • If process.roles is set to controller, the server acts as a controller.
  • If process.roles is set to broker,controller, the server acts as both a broker and a controller.
  • If process.roles is not set at all, the server is assumed to be in ZooKeeper mode.

Kafka servers that act as both brokers and controllers are referred to as "combined" servers. Combined servers are simpler to operate for small use cases like a development environment. The key disadvantage is that the controller is less isolated from the rest of the system. For example, it is not possible to roll or scale the controllers separately from the brokers in combined mode. Combined mode is not recommended in critical deployment environments.
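For reference, a minimal combined-mode configuration might look like the sketch below, modeled on the config/kraft/server.properties file shipped with Kafka; the ports and listener names are assumptions:

# Sample combined-mode server.properties - development use only
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://localhost:9092
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER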

Controllers

In KRaft mode, specific Kafka servers are selected to be controllers (unlike the ZooKeeper-based mode, where any server can become the Controller). The servers selected to be controllers will participate in the metadata quorum. Each controller is either the active controller or a hot standby for the current active controller.

A Kafka admin will typically select 3 or 5 servers for this role, depending on factors like cost and the number of concurrent failures the system should withstand without availability impact. A majority of the controllers must be alive in order to maintain availability: in general, a quorum of N controllers tolerates ⌊(N-1)/2⌋ failures, so with 3 controllers the cluster can tolerate 1 controller failure, and with 5 controllers it can tolerate 2 controller failures.

All of the servers in a Kafka cluster discover the quorum voters using the controller.quorum.voters property. This identifies the quorum controller servers that should be used. All the controllers must be enumerated. Each controller is identified with its id, host, and port information. For example:

controller.quorum.voters=id1@host1:port1,id2@host2:port2,id3@host3:port3

If a Kafka cluster has 3 controllers named controller1, controller2 and controller3, then controller1 may have the following configuration:


process.roles=controller
node.id=1
listeners=CONTROLLER://controller1.example.com:9093
controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093

Every broker and controller must set the controller.quorum.voters property. The node IDs supplied in the controller.quorum.voters property must match the corresponding IDs on the controller servers. For example, on controller1, node.id must be set to 1, and so forth. Each node ID must be unique across all the servers in a particular cluster. No two servers can have the same node ID, regardless of their process.roles values.

Storage Tool

The kafka-storage.sh random-uuid command can be used to generate a cluster ID for your new cluster. This cluster ID must be used when formatting each server in the cluster with the kafka-storage.sh format command.

This is different from how Kafka has operated in the past. Previously, Kafka would format blank storage directories automatically and generate a new cluster ID automatically. One reason for the change is that auto-formatting can sometimes obscure an error condition. This is particularly important for the metadata log maintained by the controller and broker servers. If a majority of the controllers were able to start with an empty log directory, a leader might be elected with missing committed data.
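For example, a new cluster is typically formatted in two steps; the properties file path below is an assumption:

> bin/kafka-storage.sh random-uuid
> bin/kafka-storage.sh format -t <cluster-id-from-above> -c config/kraft/server.properties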

Debugging

Metadata Quorum Tool

The kafka-metadata-quorum tool can be used to describe the runtime state of the cluster metadata partition. For example, the following command displays a summary of the metadata quorum:

  > bin/kafka-metadata-quorum.sh --bootstrap-server broker_host:port describe --status
ClusterId:              fMCL8kv1SWm87L_Md-I2hg
LeaderId:               3002
LeaderEpoch:            2
HighWatermark:          10
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   -1
CurrentVoters:          [3000,3001,3002]
CurrentObservers:       [0,1,2]
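The same tool can also describe the replication status of each voter and observer for the metadata partition:

> bin/kafka-metadata-quorum.sh --bootstrap-server broker_host:port describe --replication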

Dump Log Tool

The kafka-dump-log tool can be used to debug the log segments and snapshots for the cluster metadata directory. The tool will scan the provided files and decode the metadata records. For example, this command decodes and prints the records in the first log segment:

  > bin/kafka-dump-log.sh --cluster-metadata-decoder --files metadata_log_dir/__cluster_metadata-0/00000000000000000000.log

This command decodes and prints the records in a cluster metadata snapshot:

  > bin/kafka-dump-log.sh --cluster-metadata-decoder --files metadata_log_dir/__cluster_metadata-0/00000000000000000100-0000000001.checkpoint

Metadata Shell

The kafka-metadata-shell tool can be used to interactively inspect the state of the cluster metadata partition:


  > bin/kafka-metadata-shell.sh --snapshot metadata_log_dir/__cluster_metadata-0/00000000000000000000.log
>> ls /
brokers  local  metadataQuorum  topicIds  topics
>> ls /topics
foo
>> cat /topics/foo/0/data
{
  "partitionId" : 0,
  "topicId" : "5zoAlv-xEh9xRANKXt1Lbg",
  "replicas" : [ 1 ],
  "isr" : [ 1 ],
  "removingReplicas" : null,
  "addingReplicas" : null,
  "leader" : 1,
  "leaderEpoch" : 0,
  "partitionEpoch" : 0
}
>> exit

Deployment Considerations

  • A Kafka server's process.roles should be set to either broker or controller but not both. Combined mode can be used in development environments, but it should be avoided in critical deployment environments.
  • For redundancy, a Kafka cluster should use 3 controllers. More than 3 controllers is not recommended in critical environments. In the rare case of a partial network failure it is possible for the cluster metadata quorum to become unavailable. This limitation will be addressed in a future release of Kafka.
  • The Kafka controllers store all the metadata for the cluster in memory and on disk. We believe that for a typical Kafka cluster 5GB of main memory and 5GB of disk space on the metadata log directory is sufficient.

Missing Features

The following features are not fully implemented in KRaft mode:

  • Supporting JBOD configurations with multiple storage directories
  • Modifying certain dynamic configurations on the standalone KRaft controller
  • Delegation tokens

ZooKeeper to KRaft Migration

ZooKeeper to KRaft migration is considered an Early Access feature and is not recommended for production clusters.

The following features are not yet supported for ZK to KRaft migrations:

Please report issues with ZooKeeper to KRaft migration using the project JIRA and the "kraft" component.

Terminology

We use the term "migration" here to refer to the process of changing a Kafka cluster's metadata system from ZooKeeper to KRaft and migrating the existing metadata. An "upgrade" refers to installing a newer version of Kafka. It is not recommended to upgrade the software at the same time as performing a metadata migration.

We also use the term "ZK mode" to refer to Kafka brokers which are using ZooKeeper as their metadata system. "KRaft mode" refers to Kafka brokers which are using a KRaft controller quorum as their metadata system.

Preparing for migration

Before beginning the migration, the Kafka brokers must be upgraded to software version 3.5.0 and have the "inter.broker.protocol.version" configuration set to "3.5". See Upgrading to 3.5.0 for upgrade instructions.

It is recommended to enable TRACE level logging for the migration components while the migration is active. This can be done by adding the following log4j configuration to each KRaft controller's "log4j.properties" file:

log4j.logger.org.apache.kafka.metadata.migration=TRACE

It is generally useful to enable DEBUG logging on the KRaft controllers and the ZK brokers during the migration.
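What that looks like depends on which components you want to widen; as a hedged sketch, the following log4j.properties additions target the KRaft and ZK controller packages (the choice of loggers here is an assumption; remove them once the migration completes):

# Illustrative only - widen controller logging during the migration window
log4j.logger.org.apache.kafka.controller=DEBUG
log4j.logger.kafka.controller=DEBUG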

Provisioning the KRaft controller quorum

Two things are needed before the migration can begin. First, the brokers must be configured to support the migration, and second, a KRaft controller quorum must be deployed. The KRaft controllers should be provisioned with the same cluster ID as the existing Kafka cluster. This can be found by examining one of the "meta.properties" files in a broker's data directory, or by running the following command:

./bin/zookeeper-shell.sh localhost:2181 get /cluster/id
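Alternatively, the same ID can be read directly from a broker's "meta.properties" file; the log directory path here is an assumption:

grep cluster.id /tmp/kafka-logs/meta.properties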

The KRaft controller quorum should also be provisioned with the latest metadata.version of "3.4". For further instructions on KRaft deployment, please refer to the documentation above.

In addition to the standard KRaft configuration, the KRaft controllers will need to enable support for the migration as well as provide the ZooKeeper connection configuration.

Here is a sample configuration for a KRaft controller that is ready for migration:

# Sample KRaft cluster controller.properties listening on 9093
process.roles=controller
node.id=3000
controller.quorum.voters=3000@localhost:9093
controller.listener.names=CONTROLLER
listeners=CONTROLLER://:9093

# Enable the migration
zookeeper.metadata.migration.enable=true

# ZooKeeper client configuration
zookeeper.connect=localhost:2181

# Other configs ...

Note: The KRaft cluster node.id values must be different from any existing ZK broker's broker.id. In KRaft mode, the brokers and controllers share the same Node ID namespace.

Enabling the migration on the brokers

Once the KRaft controller quorum has been started, the brokers will need to be reconfigured and restarted. Brokers may be restarted in a rolling fashion to avoid impacting cluster availability. Each broker requires the following configuration to communicate with the KRaft controllers and to enable the migration.

Here is a sample configuration for a broker that is ready for migration:

# Sample ZK broker server.properties listening on 9092
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT

# Set the IBP
inter.broker.protocol.version=3.5

# Enable the migration
zookeeper.metadata.migration.enable=true

# ZooKeeper client configuration
zookeeper.connect=localhost:2181

# KRaft controller quorum configuration
controller.quorum.voters=3000@localhost:9093
controller.listener.names=CONTROLLER

Note: Once the final ZK broker has been restarted with the necessary configuration, the migration will begin automatically. When the migration is complete, an INFO level log can be observed on the active controller:

Completed migration of metadata from Zookeeper to KRaft

Migrating brokers to KRaft

Once the KRaft controller completes the metadata migration, the brokers will still be running in ZK mode. While the KRaft controller is in migration mode, it will continue sending controller RPCs to the ZK mode brokers. This includes RPCs like UpdateMetadata and LeaderAndIsr.

To migrate the brokers to KRaft, they simply need to be reconfigured as KRaft brokers and restarted. Using the above broker configuration as an example, we would replace broker.id with node.id and add process.roles=broker. It is important that the broker keep the same Broker/Node ID when it is restarted. The ZooKeeper configurations should be removed at this point.

# Sample KRaft broker server.properties listening on 9092
process.roles=broker
node.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT

# Don't set the IBP, KRaft uses "metadata.version" feature flag
# inter.broker.protocol.version=3.5

# Remove the migration enabled flag
# zookeeper.metadata.migration.enable=true

# Remove ZooKeeper client configuration
# zookeeper.connect=localhost:2181

# Keep the KRaft controller quorum configuration
controller.quorum.voters=3000@localhost:9093
controller.listener.names=CONTROLLER

Each broker is restarted with a KRaft configuration until the entire cluster is running in KRaft mode.
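How each broker is bounced depends on your deployment tooling; with the scripts that ship with Kafka, one hypothetical rolling step could be (the config path is an assumption):

# Stop the ZK-mode broker, then start it with the reworked KRaft broker config
bin/kafka-server-stop.sh
bin/kafka-server-start.sh -daemon config/kraft/broker.properties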

Finalizing the migration

Once all brokers have been restarted in KRaft mode, the last step to finalize the migration is to take the KRaft controllers out of migration mode. This is done by removing the "zookeeper.metadata.migration.enable" property from each of their configs and restarting them one at a time.

# Sample KRaft cluster controller.properties listening on 9093
process.roles=controller
node.id=3000
controller.quorum.voters=3000@localhost:9093
controller.listener.names=CONTROLLER
listeners=CONTROLLER://:9093

# Disable the migration
# zookeeper.metadata.migration.enable=true

# Remove ZooKeeper client configuration
# zookeeper.connect=localhost:2181

# Other configs ...

6.11 Tiered Storage

Tiered Storage Overview

Kafka data is mostly consumed in a streaming fashion using tail reads. Tail reads leverage the OS's page cache to serve the data instead of disk reads. Older data is typically read from the disk for backfill or failure recovery purposes, and such reads are infrequent.

In the tiered storage approach, a Kafka cluster is configured with two tiers of storage - local and remote. The local tier is the same as the current Kafka that uses the local disks on the Kafka brokers to store the log segments. The new remote tier uses external storage systems, such as HDFS or S3, to store the completed log segments. Please check KIP-405 for more information.

Note: Tiered storage is considered an early access feature and is not recommended for use in production environments.

Configuration

Broker Configurations

By default, Kafka servers do not enable the tiered storage feature. remote.log.storage.system.enable is the property that controls whether tiered storage functionality is enabled in a broker. Setting it to "true" enables this feature.

RemoteStorageManager is an interface that provides the lifecycle of remote log segments and indexes. Kafka does not provide an out-of-the-box implementation of RemoteStorageManager. Configure remote.log.storage.manager.class.name and remote.log.storage.manager.class.path to specify the implementation of RemoteStorageManager.

RemoteLogMetadataManager is an interface that provides the lifecycle of metadata about remote log segments with strongly consistent semantics. By default, Kafka provides an implementation that uses an internal topic for storage. This implementation can be changed by configuring remote.log.metadata.manager.class.name and remote.log.metadata.manager.class.path. When the default internal-topic-based implementation is adopted, remote.log.metadata.manager.listener.name is a mandatory property that specifies which listener the clients created by the default RemoteLogMetadataManager implementation use.

Topic Configurations

After the broker-side configurations for the tiered storage feature are set correctly, there are still topic-level configurations that need to be set. remote.storage.enable is the switch that determines whether a topic uses tiered storage; it is set to false by default. After enabling the remote.storage.enable property, the next thing to consider is log retention. When tiered storage is enabled for a topic, the following additional log retention configurations come into play:

  • local.retention.ms
  • retention.ms
  • local.retention.bytes
  • retention.bytes

The configurations prefixed with local specify the time/size the "local" log segments are retained before they are moved to remote storage and then deleted. If unset, the values in retention.ms and retention.bytes will be used.
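For an existing topic, these retention settings can also be applied with the config tool shown earlier in this document, for example:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my_topic_name \
      --alter --add-config local.retention.ms=1000,retention.ms=3600000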

Quick Start Example

Apache Kafka doesn't provide an out-of-the-box RemoteStorageManager implementation. To preview the tiered storage feature, the LocalTieredStorage implemented for integration tests can be used; it creates a temporary directory in local storage to simulate the remote storage.

To adopt LocalTieredStorage, the test library needs to be built locally:

# please checkout to the specific version tag you're using before building it
# ex: `git checkout 3.6.0`
./gradlew clean :storage:testJar

After a successful build, there should be a kafka-storage-x.x.x-test.jar file under storage/build/libs. Next, set the broker-side configurations to enable the tiered storage feature.

# Sample Zookeeper/Kraft broker server.properties listening on PLAINTEXT://:9092
remote.log.storage.system.enable=true

# Setting the listener for the clients in RemoteLogMetadataManager to talk to the brokers.
remote.log.metadata.manager.listener.name=PLAINTEXT

# Please provide the implementation info for remoteStorageManager.
# This is the mandatory configuration for tiered storage.
# Here, we use the `LocalTieredStorage` built above.
remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
remote.log.storage.manager.class.path=/PATH/TO/kafka-storage-x.x.x-test.jar

# These 2 prefixes are the default values, but they are customizable
remote.log.storage.manager.impl.prefix=rsm.config.
remote.log.metadata.manager.impl.prefix=rlmm.config.

# Configure the directory used for `LocalTieredStorage`
# Note: please make sure the brokers have access to this directory
rsm.config.dir=/tmp/kafka-remote-storage

# This needs to be changed if number of brokers in the cluster is more than 1
rlmm.config.remote.log.metadata.topic.replication.factor=1

# Try to speed up the log retention check interval for testing
log.retention.check.interval.ms=1000

Follow the quick start guide to start the Kafka environment. Then, create a topic with tiered storage enabled using these configurations:

# remote.storage.enable=true -> enables tiered storage on the topic
# local.retention.ms=1000 -> The number of milliseconds to keep the local log segment before it gets deleted.
#   Note that a local log segment is eligible for deletion only after it gets uploaded to remote.
# retention.ms=3600000 -> when segments exceed this time, the segments in remote storage will be deleted
# segment.bytes=1048576 -> for test only, to speed up the log segment rolling interval
# file.delete.delay.ms=1000 -> for test only, to speed up the local-log segment file delete delay

bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server localhost:9092 \
--config remote.storage.enable=true --config local.retention.ms=1000 --config retention.ms=3600000 \
--config segment.bytes=1048576 --config file.delete.delay.ms=1000

Try sending messages to the tieredTopic topic to roll the log segment:

bin/kafka-producer-perf-test.sh --topic tieredTopic --num-records 1200 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092

Then, after the active segment is rolled, the old segment should be moved to the remote storage and get deleted. This can be verified by checking the remote log directory configured above. For example:

 > ls /tmp/kafka-remote-storage/kafka-tiered-storage/tieredTopic-0-jF8s79t9SrG_PNqlwv7bAA
00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.index
00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.snapshot
00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.leader_epoch_checkpoint
00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.timeindex
00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.log

Lastly, we can try to consume some data from the beginning and print the offset number, to make sure it successfully fetches offset 0 from the remote storage.

bin/kafka-console-consumer.sh --topic tieredTopic --from-beginning --max-messages 1 --bootstrap-server localhost:9092 --property print.offset=true

Please note: if you want to disable tiered storage at the cluster level, you should delete the tiered-storage-enabled topics explicitly. Attempting to disable tiered storage at the cluster level without deleting the topics that use tiered storage will result in an exception during startup.

bin/kafka-topics.sh --delete --topic tieredTopic --bootstrap-server localhost:9092

After the topics are deleted, you can safely set remote.log.storage.system.enable=false in the broker configuration.

Limitations

While the early access release of tiered storage offers the opportunity to try out this new feature, it is important to be aware of the following limitations:

  • No support for clusters with multiple log directories (i.e. the JBOD feature)
  • No support for compacted topics
  • Tiered storage cannot be disabled at the topic level
  • Deleting tiered-storage-enabled topics is required before disabling tiered storage at the broker level
  • Admin actions related to the tiered storage feature are only supported on clients from version 3.0 onwards

For more information, please check the Tiered Storage Early Access Release Notes.


