跳转至

1. 入门

1.1 简介

什么是事件流?

事件流相当于人体中枢神经系统的数字化。它是“永远在线”世界的技术基础,在这个世界中,企业越来越多地由软件定义和自动化,软件的用户更多地是软件。

从技术上讲,事件流是以事件流的形式从数据库、传感器、移动设备、云服务和软件应用程序等事件源实时捕获数据的实践;持久存储这些事件流以供以后检索;实时和回顾性地操作、处理事件流并对其做出反应;并根据需要将事件流路由到不同的目标技术。因此,事件流可确保数据的连续流动和解释,以便正确的信息在正确的时间出现在正确的地点。

我可以使用事件流做什么?

事件流适用于 众多行业和组织的各种用例。它的许多例子包括:

  • 实时处理支付和金融交易,例如在证券交易所、银行和保险中。
  • 实时跟踪和监控汽车、卡车、车队和货运,例如物流和汽车行业。
  • 持续捕获和分析来自物联网设备或其他设备(例如工厂和风电场)的传感器数据。
  • 收集客户互动和订单并立即做出反应,例如零售、酒店和旅游业以及移动应用程序。
  • 监测医院护理中的患者并预测病情变化,以确保在紧急情况下及时得到治疗。
  • 连接、存储并提供公司不同部门生成的数据。
  • 作为数据平台、事件驱动架构和微服务的基础。

Apache Kafka® 是一个事件流平台。这意味着什么?

Kafka 结合了三个关键功能,因此您可以使用 一个经过实战检验的解决方案来实现端到端事件流的 用例:

  1. 发布(写入)和订阅(读取)事件流,包括从其他系统持续导入/导出数据。
  2. 根据需要持久可靠地存储事件流。
  3. 在事件发生时或回顾性地处理事件流。

所有这些功能都是以分布式、高度可扩展、弹性、容错和安全的方式提供的。Kafka 可以部署在裸机硬件、虚拟机和容器上,也可以部署在本地和云端。您可以选择自行管理 Kafka 环境,也可以选择使用各种供应商提供的完全托管服务。

简而言之,Kafka 是如何工作的?

Kafka是一个分布式系统,由通过高性能TCP网络协议进行通信的服务器客户端组成。它可以部署在本地和云环境中的裸机硬件、虚拟机和容器上。

服务器:Kafka 作为一台或多台服务器的集群运行,可以跨越多个数据中心或云区域。其中一些服务器形成存储层,称为代理。其他服务器运行 Kafka Connect以持续导入和导出数据作为事件流,以将 Kafka 与您现有的系统(例如关系数据库以及其他 Kafka 集群)集成。为了让您实现关键任务用例,Kafka 集群具有高度可扩展性和容错性:如果其中任何服务器发生故障,其他服务器将接管其工作,以确保连续运行而不会丢失任何数据。

客户端:它们允许您编写分布式应用程序和微服务,即使在网络问题或机器故障的情况下,也可以以容错的方式并行、大规模地读取、写入和处理事件流。Kafka 附带了一些此类客户端,并由 Kafka 社区提供的数十个客户端进行了扩充:客户端可用于 Java 和 Scala,包括更高级别的 Kafka Streams库,适用于 Go、Python、C/C++ 和许多其他编程语言以及 REST API。

主要概念和术语

事件记录了世界上或您的企业中“发生了一些事情”的事实。在文档中也称为记录或消息。当您向 Kafka 读取或写入数据时,您以事件的形式执行此操作。从概念上讲,事件具有键、值、时间戳和可选的元数据标头。这是一个示例事件:

  • Event key: "Alice"
  • Event value: "Made a payment of $200 to Bob"
  • Event timestamp: "Jun. 25, 2020 at 2:06 p.m."

生产者是将事件发布(写入)到 Kafka 的客户端应用程序,而消费者是订阅(读取和处理)这些事件的客户端应用程序。在 Kafka 中,生产者和消费者彼此完全解耦且互不可知,这是实现 Kafka 闻名的高可扩展性的关键设计元素。例如,生产者永远不需要等待消费者。Kafka 提供了各种保证,例如一次性处理事件的能力。

事件被组织并持久存储在主题中。非常简单,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。示例主题名称可以是“付款”。Kafka 中的主题始终是多生产者和多订阅者:一个主题可以有零个、一个或多个向其写入事件的生产者,以及零个、一个或多个订阅这些事件的消费者。主题中的事件可以根据需要随时读取——与传统消息传递系统不同,事件在使用后不会被删除。相反,您可以通过每个主题的配置设置来定义 Kafka 应保留事件的时间,之后旧事件将被丢弃。Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据是完全可以的。

主题是分区的,这意味着一个主题分布在位于不同 Kafka 代理上的多个“桶”中。这种数据的分布式放置对于可扩展性非常重要,因为它允许客户端应用程序同时从多个代理读取数据或向多个代理写入数据。当新事件发布到主题时,它实际上会附加到主题的分区之一。具有相同事件键(例如,客户或车辆 ID)的事件被写入同一分区,并且 Kafka保证给定主题分区的任何消费者将始终按照与写入的顺序完全相同的顺序读取该分区的事件。

图:此示例主题有四个分区 P1–P4。两个不同的生产者客户端通过网络将事件写入主题的分区,相互独立地向主题发布新事件。具有相同键(在图中用颜色表示)的事件将写入同一分区。请注意,如果合适,两个生产者都可以写入同一分区。

为了使您的数据具有容错性和高可用性,每个主题都可以复制,甚至可以跨地理区域或数据中心进行复制,因此始终有多个代理拥有数据副本,以防出现问题时,您希望对broker进行维护等等。常见的生产设置是复制因子为 3,即始终存在数据的三个副本。此复制是在主题分区级别执行的。

这本入门读物对于介绍来说应该足够了。如果您有兴趣,文档的设计部分详细解释了 Kafka 的各种概念。

Kafka API

除了用于管理和管理任务的命令行工具之外,Kafka 还有五个适用于 Java 和 Scala 的核心 API:

  • 用于管理和检查主题、代理和其他 Kafka 对象的 管理 API 。
  • Producer API,用于将事件流发布(写入)到一个或多个 Kafka 主题。
  • Consumer API用于订阅(读取)一个或多个主题并处理为其生成的事件流。
  • Kafka Streams API用于实现流处理应用程序和微服务。它提供了更高级别的函数来处理事件流,包括转换、有状态操作(例如聚合和连接)、窗口、基于事件时间的处理等等。从一个或多个主题读取输入,以便生成一个或多个主题的输出,从而有效地将输入流转换为输出流。
  • Kafka Connect API用于构建和运行可重用的数据导入/导出连接器,这些连接器消耗(读取)或生成(写入)来自外部系统和应用程序的事件流,以便它们可以与 Kafka 集成。例如,关系数据库(如 PostgreSQL)的连接器可能会捕获对一组表的每个更改。然而,在实践中,您通常不需要实现自己的连接器,因为 Kafka 社区已经提供了数百个现成的连接器。

从这往哪儿走

1.2 使用案例

以下是 Apache Kafka® 的一些流行用例的描述。有关其中一些正在实施的领域的概述,请参阅此博客文章

消息传递

Kafka 可以很好地替代更传统的消息代理。使用消息代理的原因有多种(将处理与数据生产者分离、缓冲未处理的消息等)。与大多数消息系统相比,Kafka 具有更好的吞吐量、内置分区、复制和容错能力,这使其成为大规模消息处理应用程序的良好解决方案。

根据我们的经验,消息传递的使用通常吞吐量相对较低,但可能需要较低的端到端延迟,并且通常依赖于 Kafka 提供的强大的持久性保证。

在这个领域,Kafka 可以与ActiveMQ或 RabbitMQ等传统消息系统相媲美。

网站活动跟踪

Kafka 的最初用例是能够将用户活动跟踪管道重建为一组实时发布-订阅源。这意味着站点活动(页面浏览、搜索或用户可能执行的其他操作)将发布到中心主题,每种活动类型一个主题。这些源可用于一系列用例的订阅,包括实时处理、实时监控以及加载到 Hadoop 或离线数据仓库系统中以进行离线处理和报告。

活动跟踪的量通常非常大,因为每个用户页面视图都会生成许多活动消息。

指标

Kafka常用于运行监控数据。这涉及聚合来自分布式应用程序的统计数据以生成集中的操作数据源。

日志聚合

许多人使用 Kafka 作为日志聚合解决方案的替代品。日志聚合通常从服务器收集物理日志文件,并将它们放在一个中心位置(可能是文件服务器或 HDFS)进行处理。Kafka 抽象了文件的详细信息,并将日志或事件数据作为消息流提供了更清晰的抽象。这可以实现更低延迟的处理,并更轻松地支持多个数据源和分布式数据消费。与 Scribe 或 Flume 等以日志为中心的系统相比,Kafka 提供同样良好的性能、由于复制而提供更强的持久性保证以及更低的端到端延迟。

流处理

Kafka 的许多用户在由多个阶段组成的处理管道中处理数据,其中原始输入数据从 Kafka 主题中消费,然后聚合、丰富或以其他方式转换为新主题以供进一步消费或后续处理。例如,用于推荐新闻文章的处理管道可能会从 RSS 源中抓取文章内容并将其发布到“文章”主题;进一步处理可能会规范化或删除重复内容,并将清理后的文章内容发布到新主题;最后的处理阶段可能会尝试向用户推荐该内容。此类处理管道根据各个主题创建实时数据流图。从0.10.0.0开始,一个轻量级但功能强大的流处理库,称为Kafka Streams Apache Kafka 中可以执行上述数据处理。除了 Kafka Streams 之外,替代的开源流处理工具还包括Apache Storm和 Apache Samza

事件溯源

事件溯源是一种应用程序设计风格,其中状态更改被记录为按时间排序的记录序列。Kafka 对非常大的存储日志数据的支持使其成为以此风格构建的应用程序的出色后端。

提交日志

Kafka 可以充当分布式系统的一种外部提交日志。日志有助于在节点之间复制数据,并充当故障节点恢复数据的重新同步机制。Kafka 中的日志压缩功能有助于支持这种用法。在这种用法中,Kafka 类似于Apache BookKeeper项目。

1.3 快速入门

第 1 步:获取Kafka

下载 最新的 Kafka 版本并解压:

$ tar -xzf kafka_2.13-3.5.0.tgz
$ cd kafka_2.13-3.5.0

第 2 步:启动KAFKA环境

注意:您的本地环境必须安装 Java 8+。

Apache Kafka 可以使用 ZooKeeper 或 KRaft 启动。要开始使用任一配置,请按照以下部分之一进行操作,但不能同时进行这两个部分的操作。

Kafka与动物园管理员

运行以下命令以便以正确的顺序启动所有服务:

# Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties

打开另一个终端会话并运行:

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

所有服务成功启动后,您将拥有一个正在运行并可供使用的基本 Kafka 环境。

Kafka与 Kraft

生成集群 UUID

$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

设置日志目录格式

$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

启动Kafka服务器

$ bin/kafka-server-start.sh config/kraft/server.properties

一旦 Kafka 服务器成功启动,您将拥有一个正在运行并可供使用的基本 Kafka 环境。

第 3 步:创建一个主题来存储您的事件

Kafka 是一个分布式事件流平台,可让您跨多台机器读取、写入、存储和处理 事件(在文档中 也称为记录或 消息)。

示例事件包括支付交易、手机的地理位置更新、运输订单、物联网设备或医疗设备的传感器测量等等。这些事件被组织并存储在 主题中。非常简单,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。

因此,在编写第一个事件之前,您必须创建一个主题。打开另一个终端会话并运行:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

所有 Kafka 的命令行工具都有附加选项:运行kafka-topics.sh不带任何参数的命令来显示使用信息。例如,它还可以向您显示 新主题的 分区计数等详细信息:

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: NPmZHyhbR9y00wMglMH2sg PartitionCount: 1       ReplicationFactor: 1    Configs:
    Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0

第 4 步:将一些事件写入主题

Kafka 客户端通过网络与 Kafka 代理进行通信以写入(或读取)事件。一旦收到,代理将以持久且容错的方式存储事件,只要您需要,甚至永远存储。

运行控制台生产者客户端以将一些事件写入您的主题。默认情况下,您输入的每一行都会导致将一个单独的事件写入主题。

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event

Ctrl-C您可以随时 停止生产者客户端。

第 5 步:阅读事件

打开另一个终端会话并运行控制台消费者客户端来读取您刚刚创建的事件:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

Ctrl-C您可以随时停止消费者客户端。

请随意尝试:例如,切换回生产者终端(上一步)以编写其他事件,并查看事件如何立即显示在消费者终端中。

由于事件持久存储在 Kafka 中,因此它们可以被任意数量的消费者读取任意多次。您可以通过打开另一个终端会话并再次重新运行之前的命令来轻松验证这一点。

第 6 步:使用 KAFKA CONNECT 将数据作为事件流导入/导出

您可能在关系数据库或传统消息传递系统等现有系统中拥有大量数据,以及已经使用这些系统的许多应用程序。 Kafka Connect允许您不断地将数据从外部系统摄取到 Kafka 中,反之亦然。它是一个运行连接器的可扩展工具 ,它实现与外部系统交互的自定义​​逻辑。因此,将现有系统与 Kafka 集成非常容易。为了使这个过程更加容易,有数百个这样的连接器可供使用。

在本快速入门中,我们将了解如何使用简单的连接器运行 Kafka Connect,将数据从文件导入到 Kafka 主题,并将数据从 Kafka 主题导出到文件。

首先,确保添加connect-file-3.5.0.jarplugin.pathConnect 工作线程配置中的属性。出于本快速入门的目的,我们将使用相对路径并将连接器的包视为 uber jar,它在从安装目录运行快速入门命令时起作用。但是,值得注意的是,对于生产部署,使用绝对路径始终是更好的选择。有关如何设置此配置的详细说明, 请参阅plugin.path 。

编辑config/connect-standalone.properties文件,添加或更改与plugin.path以下内容匹配的配置属性,然后保存文件:

> echo "plugin.path=libs/connect-file-3.5.0.jar"

然后,首先创建一些种子数据进行测试:

> echo -e "foo\nbar" > test.txt

或者在 Windows 上:

> echo foo> test.txt
> echo bar>> test.txt

接下来,我们将启动两个以独立模式运行的连接器,这意味着它们在单个本地专用进程中运行。我们提供三个配置文件作为参数。第一个始终是 Kafka Connect 进程的配置,包含常见配置,例如要连接的 Kafka 代理和数据的序列化格式。其余配置文件各自指定要创建的连接器。这些文件包括唯一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

这些示例配置文件包含在 Kafka 中,使用您之前启动的默认本地集群配置并创建两个连接器:第一个是源连接器,用于从输入文件读取行并将每行生成到 Kafka 主题,第二个是接收器连接器从 Kafka 主题读取消息并将每条消息生成为输出文件中的一行。

在启动过程中,您将看到许多日志消息,其中包括一些指示连接器正在实例化的日志消息。一旦 Kafka Connect 进程启动,源连接器应开始从test.txt主题读取行并将其生成到主题connect-test,接收器连接器应开始从主题读取消息connect-test 并将其写入文件test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传递:

> more test.sink.txt
foo
bar

请注意,数据存储在 Kafka topic 中connect-test,因此我们还可以运行控制台消费者来查看主题中的数据(或使用自定义消费者代码来处理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

连接器继续处理数据,因此我们可以将数据添加到文件并查看它在管道中移动:

> echo Another line>> test.txt

您应该看到该行出现在控制台使用者输出和接收器文件中。

第 7 步:使用 KAFKA STREAMS 处理您的事件

将数据作为事件存储在 Kafka 中后,您可以使用适用于 Java/Scala 的Kafka Streams客户端库 处理数据 。它允许您实现关键任务实时应用程序和微服务,其中输入和/或输出数据存储在 Kafka 主题中。Kafka Streams 将客户端编写和部署标准 Java 和 Scala 应用程序的简单性与 Kafka 服务器端集群技术的优点相结合,使这些应用程序具有高度可扩展性、弹性、容错性和分布式性。该库支持一次性处理、有状态操作和聚合、窗口、连接、基于事件时间的处理等等。

为了让您初步体验一下,以下是如何实现流行的WordCount算法:

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

Kafka Streams 演示 和应用程序开发教程 演示了如何从头到尾编码和运行此类流应用程序。

步骤8:终止KAFKA环境

现在您已经完成了快速入门,可以随意拆除 Kafka 环境,或者继续尝试。

  1. Ctrl-C如果您还没有这样做, 请使用 停止生产者和消费者客户端。
  2. 使用 停止 Kafka 代理Ctrl-C
  3. 最后,如果遵循 Kafka with ZooKeeper 部分,请使用 停止 ZooKeeper 服务器Ctrl-C

如果您还想删除本地 Kafka 环境的任何数据,包括您在此过程中创建的任何事件,请运行以下命令:

$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs

恭喜!

您已成功完成 Apache Kafka 快速入门。

要了解更多信息,我们建议执行以下后续步骤:

1.4 生态系统

在主发行版之外,有大量与 Kafka 集成的工具。生态系统页面列出了其中的许多内容,包括流处理系统、Hadoop 集成、监控和部署工具。

1.5 旧版本升级

从 0.8.x 到 3.4.x 的任何版本升级到 3.5.1

所有升级步骤与升级到 3.5.0 相同

3.5.1 中的显着变化

  • 将依赖项 snappy-java 升级到不易受 CVE-2023-34455 影响的版本。 您可以在 Kafka CVE 列表中找到有关 CVE 的更多信息
  • 修复了 3.3.0 中引入的回归,该回归导致security.protocol配置值仅限于大写。修复后,security.protocol值不区分大小写。有关详细信息,请参阅KAFKA-15053

从 0.8.x 到 3.4.x 的任何版本升级到 3.5.0

3.5.0 中的显着变化

  • Kafka Streams 引入了一种新的状态存储类型,即版本化键值存储,用于每个键存储多个记录版本,从而使带时间戳的检索操作能够返回指定时间戳的最新记录(每个键)。 有关更多详细信息,请参阅KIP-889 和KIP-914 。如果在 DSL 中使用新的存储类型,则将应用改进的处理语义,如 KIP-914中所述。
  • KTable 聚合语义通过KIP-904得到了进一步改进 ,现在避免了虚假的中间结果。
  • Kafka StreamsProductionExceptionHandler通过 KIP-399进行了改进,现在还涵盖了序列化错误。
  • MirrorMaker 现在默认使用增量AlterConfigs API 来同步主题配置,而不是已弃用的 alterConfigs API。引入了一个名为 的新设置,use.incremental.alter.configs允许用户控制要使用的 API。当始终使用增量AlterConfigs API 时,此新设置已标记为已弃用,并将在下一个主要版本中删除。有关更多详细信息, 请参阅KIP-894 。
  • JmxTool、EndToEndLatency、StreamsResetter、ConsumerPerformance 和 ClusterTool 已迁移到工具模块。“kafka.tools”包已弃用,并将在下一个主要版本中更改为“org.apache.kafka.tools”。有关更多详细信息, 请参阅KAFKA-14525 。

升级基于ZooKeeper的集群

如果您要从 2.1.x 之前的版本升级,请参阅下面步骤 5 中有关用于存储消费者偏移量的架构更改的注释。一旦您将 inter.broker.protocol.version 更改为最新版本,将无法降级到 2.1 之前的版本。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如3.43.3等)
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从0.11.0.x或更高版本升级,并且没有覆盖消息格式,那么您只需要覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如3.43.3等) 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑协议版本 inter.broker.protocol.version并将其设置为 来提升协议版本3.5。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 3.5 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

升级基于KRaft的集群

如果您是从 3.3.0 之前的版本升级,请参阅下面步骤 3 中的注释。将metadata.version更改为最新版本后,将无法降级到3.3-IV0之前的版本。

对于滚动升级:

  1. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。
  2. 一旦集群的行为和性能得到验证,通过运行来提升metadata.version ./bin/kafka-features.sh upgrade --metadata 3.5
  3. 请注意,集群元数据版本升级后无法降级到预生产 3.0.x、3.1.x 或 3.2.x 版本。但是,可以降级到生产版本,例如 3.3-IV0、3.3-IV1 等。

从 0.8.x 到 3.3.x 的任何版本升级到 3.4.0

如果您要从 2.1.x 之前的版本升级,请参阅下面有关用于存储消费者偏移量的架构更改的注释。一旦您将 inter.broker.protocol.version 更改为最新版本,将无法降级到 2.1 之前的版本。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如3.33.2等)
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从0.11.0.x或更高版本升级,并且没有覆盖消息格式,那么您只需要覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如3.33.2等) 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑协议版本 inter.broker.protocol.version并将其设置为 来提升协议版本3.4。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 3.4 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

将基于 KRaft 的集群从 3.0.x 到 3.3.x 的任何版本升级到 3.4.0

如果您是从 3.3.0 之前的版本升级,请参阅下面的注释。将metadata.version更改为最新版本后,将无法降级到3.3-IV0之前的版本。

对于滚动升级:

  1. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。
  2. 一旦集群的行为和性能得到验证,通过运行来提升metadata.version ./bin/kafka-features.sh upgrade --metadata 3.4
  3. 请注意,集群元数据版本升级后无法降级到预生产 3.0.x、3.1.x 或 3.2.x 版本。但是,可以降级到生产版本,例如 3.3-IV0、3.3-IV1 等。

3.4.0 中的显着变化

  • 自 Apache Kafka 3.4.0 起,我们添加了一个系统属性(“org.apache.kafka.disallowed.login.modules”)来禁用 SASL JAAS 配置中有问题的登录模块使用。默认情况下,“com.sun.security.auth.module.JndiLoginModule”在 Apache Kafka 3.4.0 中被禁用。

从 0.8.x 到 3.2.x 的任何版本升级到 3.3.1

如果您要从 2.1.x 之前的版本升级,请参阅下面有关用于存储消费者偏移量的架构更改的注释。一旦您将 inter.broker.protocol.version 更改为最新版本,将无法降级到 2.1 之前的版本。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如3.23.1等)
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从0.11.0.x或更高版本升级,并且没有覆盖消息格式,那么您只需要覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如3.23.1等) 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑协议版本 inter.broker.protocol.version并将其设置为 来提升协议版本3.3。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 3.3 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

将基于 KRaft 的集群从 3.0.x 到 3.2.x 的任何版本升级到 3.3.1

如果您要从 3.3.1 之前的版本升级,请参阅下面的注释。将metadata.version更改为最新版本后,将无法降级到3.3-IV0之前的版本。

对于滚动升级:

  1. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。
  2. 一旦集群的行为和性能得到验证,通过运行来提升metadata.version ./bin/kafka-features.sh upgrade --metadata 3.3
  3. 请注意,集群元数据版本升级后无法降级到预生产 3.0.x、3.1.x 或 3.2.x 版本。但是,可以降级到生产版本,例如 3.3-IV0、3.3-IV1 等。

3.3.1 中的显着变化

  • KRaft 模式已为新集群做好生产准备。 有关更多详细信息(包括限制), 请参阅KIP-833 。
  • 默认情况下用于没有键的记录的分区器已得到改进,以避免当一个或多个代理速度缓慢时出现病态行为。新逻辑可能会影响批处理行为,可以使用batch.size和/或linger.ms配置设置来调整批处理行为。可以通过设置恢复以前的行为partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner。有关更多详细信息, 请参阅KIP-794 。
  • 如上所述,现在 KRaft 集群的升级过程与基于 ZK 的集群略有不同。
  • 引入了一个新的 API,如果不存在,addMetricIfAbsent它将Metrics创建一个新的指标;如果已经注册,则返回相同的指标。请注意,此行为与 API 不同,后者在尝试创建已存在的指标时addMetric抛出。IllegalArgumentException( 有关更多详细信息, 请参阅KIP-843 )。

从 0.8.x 到 3.1.x 的任何版本升级到 3.2.0

如果您要从 2.1.x 之前的版本升级,请参阅下面有关用于存储消费者偏移量的架构更改的注释。一旦您将 inter.broker.protocol.version 更改为最新版本,将无法降级到 2.1 之前的版本。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如3.13.0等)
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从0.11.0.x或更高版本升级,并且没有覆盖消息格式,那么您只需要覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如3.13.0等) 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑协议版本 inter.broker.protocol.version并将其设置为 来提升协议版本3.2。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 3.2 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

3.2.0 中的显着变化

  • 如果没有设置冲突的配置,默认情况下会启用生产者的幂等性。当向 2.8.0 之前的经纪商生产时,IDEMPOTENT_WRITE需要许可。有关详细信息,请检查KIP-679的兼容性部分 。在 3.0.0 和 3.1.0 中,一个错误阻止了应用此默认值,这意味着除非用户明确设置enable.idempotence为 true,否则幂等性仍然处于禁用状态(有关更多详细信息,请参阅KAFKA-13598)。此问题已修复,默认值已在 3.0.1、3.1.1 和 3.2.0 中正确应用。
  • 一个值得注意的例外是 Connect,它默认禁用所有生产者的幂等行为,以便统一支持使用各种 Kafka 代理版本。用户可以更改此行为,以通过 Connect Worker 和/或连接器配置为部分或所有生产者启用幂等性。Connect 可能会在未来的主要版本中默认启用幂等生产者。
  • 出于安全考虑,Kafka 用 reload4j 取代了 log4j。这仅影响指定日志记录后端的模块(connect-runtimekafka-tools是两个这样的示例)。许多模块(包括 )kafka-clients将其留给应用程序来指定日志记录后端。更多信息可以在reload4j找到。依赖于 Kafka 项目中受影响模块的项目应使用 slf4j-log4j12 版本 1.7.35 或更高版本或 slf4j-reload4j 以避免 源自日志记录框架的可能的兼容性问题
  • 示例连接器FileStreamSourceConnectorFileStreamSinkConnector已从默认类路径中删除。要在 Kafka Connect 独立或分布式模式下使用它们,需要显式添加它们,例如CLASSPATH=./lib/connect-file-3.2.0.jar ./bin/connect-distributed.sh.

从 0.8.x 到 3.0.x 的任何版本升级到 3.1.0

如果您要从 2.1.x 之前的版本升级,请参阅下面有关用于存储消费者偏移量的架构更改的注释。一旦您将 inter.broker.protocol.version 更改为最新版本,将无法降级到 2.1 之前的版本。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如3.02.8等)
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从0.11.0.x或更高版本升级,并且没有覆盖消息格式,那么您只需要覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如3.02.8等) 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑协议版本 inter.broker.protocol.version并将其设置为 来提升协议版本3.1。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 3.1 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

3.1.1 中的显着变化

  • 如果没有设置冲突的配置,默认情况下会启用生产者的幂等性。当向 2.8.0 之前的经纪商生产时,IDEMPOTENT_WRITE需要许可。有关详细信息,请检查KIP-679的兼容性部分 。一个错误阻止了生产者幂等性默认值的应用,这意味着除非用户明确设置enable.idempotence为 true,否则它仍然处于禁用状态。有关更多详细信息,请参阅KAFKA-13598 。此问题已修复并且默认值已正确应用。
  • 一个值得注意的例外是 Connect,它默认禁用所有生产者的幂等行为,以便统一支持使用各种 Kafka 代理版本。用户可以更改此行为,以通过 Connect Worker 和/或连接器配置为部分或所有生产者启用幂等性。Connect 可能会在未来的主要版本中默认启用幂等生产者。
  • 出于安全考虑,Kafka 用 reload4j 取代了 log4j。这仅影响指定日志记录后端的模块(connect-runtimekafka-tools是两个这样的示例)。许多模块(包括 )kafka-clients将其留给应用程序来指定日志记录后端。更多信息可以在reload4j找到。依赖于 Kafka 项目中受影响模块的项目应使用 slf4j-log4j12 版本 1.7.35 或更高版本或 slf4j-reload4j 以避免 源自日志记录框架的可能的兼容性问题

3.1.0 中的显着变化

  • Apache Kafka 支持 Java 17。
  • 以下指标已被弃用:bufferpool-wait-time-totalio-waittime-totaliotime-total。请使用bufferpool-wait-time-ns-totalio-wait-time-ns-total、 和io-time-ns-total代替。 有关更多详细信息,请参阅KIP-773 。
  • IBP 3.1 将主题 ID 作为KIP-516的一部分引入 FetchRequest 。

从 0.8.x 到 2.8.x 的任何版本升级到 3.0.1

如果您要从 2.1.x 之前的版本升级,请参阅下面有关用于存储消费者偏移量的架构更改的注释。一旦您将 inter.broker.protocol.version 更改为最新版本,将无法降级到 2.1 之前的版本。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如2.82.7等)
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从0.11.0.x或更高版本升级,并且没有覆盖消息格式,那么您只需要覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如2.82.7等) 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑协议版本 inter.broker.protocol.version并将其设置为 来提升协议版本3.0。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 3.0 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

3.0.1 中的显着变化

  • 如果没有设置冲突的配置,默认情况下会启用生产者的幂等性。当向 2.8.0 之前的经纪商生产时,IDEMPOTENT_WRITE需要许可。有关详细信息,请检查KIP-679的兼容性部分 。一个错误阻止了生产者幂等性默认值的应用,这意味着除非用户明确设置enable.idempotence为 true,否则它仍然处于禁用状态。有关更多详细信息,请参阅KAFKA-13598 。此问题已修复并且默认值已正确应用。

3.0.0 中的显着变化

  • 默认情况下,生产者具有更强的交付保证:idempotence已启用并acks设置all为而不是1。有关详细信息,请参阅KIP-679。在 3.0.0 和 3.1.0 中,一个错误阻止应用幂等默认值,这意味着除非用户明确设置 enable.idempotence为 true,否则它仍然处于禁用状态。请注意,该错误并不影响acks=all更改。有关更多详细信息,请参阅KAFKA-13598 。此问题已修复,默认值已在 3.0.1、3.1.1 和 3.2.0 中正确应用。
  • 自 Apache Kafka 3.0 起,Java 8 和 Scala 2.12 支持已被弃用,并将在 Apache Kafka 4.0 中删除。有关更多详细信息,请参阅KIP-750 和KIP-751 。
  • ZooKeeper已升级至3.6.3版本。
  • KRaft 模式的预览版可用,但无法从 2.8 早期访问版本升级到该模式。config/kraft/README.md详细信息请参阅文件。
  • 发布 tarball 不再包含测试、源、javadoc 和测试源 jar。这些仍然发布到 Maven 中央存储库。
  • 现在,运行时类路径(而不是编译和运行时类路径)中提供了许多实现依赖项 jar 。升级后的编译错误可以通过显式添加缺少的依赖项 jar 或更新应用程序以不使用内部类来修复。
  • 消费者配置的默认值session.timeout.ms从 10 秒增加到 45 秒。有关更多详细信息,请参阅 KIP-735 。
  • 代理配置log.message.format.version和主题配置message.format.version已被弃用。两种配置的值始终假定为3.0if或inter.broker.protocol.version更高3.0。如果设置了log.message.format.versionmessage.format.version,我们建议在升级到 3.0 的同时清除它们 inter.broker.protocol.version。这将避免降级时潜在的兼容性问题inter.broker.protocol.version 。有关更多详细信息,请参阅KIP-724 。
  • Streams API 删除了在 2.5.0 或更早版本中已弃用的所有已弃用的 API。有关已删除 API 的完整列表,请比较详细的 Kafka Streams 升级说明。
  • Kafka Streams 不再对“connect:json”模块有编译时依赖性 ( KAFKA-5146 )。依赖这种传递依赖的项目必须显式声明它。
  • 通过指定的自定义主体构建器实现principal.builder.class现在必须实现该 KafkaPrincipalSerde接口以允许在代理之间转发。有关 KafkaPrincipalSerde 使用的更多详细信息,请参阅KIP-590 。
  • 许多已弃用的类、方法和工具已从、 和clients模块connect中删除:core``tools

  • ScalaAuthorizerSimpleAclAuthorizer相关类已被删除。请使用JavaAuthorizer 和AclAuthorizer代替。

  • Metric#value()方法已被删除(KAFKA-12573)。
  • SumTotal已被删除(KAFKA-12584)。请使用WindowedSumandCumulativeSum代替。
  • CountSampledTotal被删除。请分别使用WindowedCountWindowedSum 来代替。
  • PrincipalBuilderDefaultPrincipalBuilderResourceFilter已被删除。
  • SslConfigsSaslConfigsAclBinding中 删除了各种常量和构造函数AclBindingFilter
  • 这些Admin.electedPreferredLeaders()方法已被删除。请改用Admin.electLeaders
  • 命令行工具kafka-preferred-replica-election已被删除。请改用kafka-leader-election
  • --zookeeper选项已从kafka-topicskafka-reassign-partitions命令行工具中删除。请改用--bootstrap-server
  • kafka-configs命令行工具中,该--zookeeper选项仅支持更新SCRAM 凭证配置 以及在代理未运行时描述/更新动态代理配置。请用于--bootstrap-server 其他配置操作。
  • 构造函数ConfigEntry被删除(KAFKA-12577)。请改用剩余的公共构造函数。
  • default客户端配置的配置值client.dns.lookup已被删除。万一您显式设置此配置,我们建议您保留该配置未设置(use_all_dns_ips默认情况下使用)。
  • ExtendedDeserializerExtendedSerializer已被删除。请使用Deserializer andSerializer代替。
  • close(long, TimeUnit)方法已从生产者、消费者和管理客户端中删除。请使用 close(Duration).
  • ConsumerConfig.addDeserializerToConfig方法ProducerConfig.addSerializerToConfig已被删除。这些方法无意成为公共 API,并且没有替代方法。
  • NoOffsetForPartitionException.partition()方法已被删除。请改用partitions() 。
  • 默认值partition.assignment.strategy更改为“[RangeAssignor, CooperativeStickyAssignor]”,默认情况下将使用 RangeAssignor,但允许升级到 CooperativeStickyAssignor,只需一次滚动弹跳即可从列表中删除 RangeAssignor。请在此处查看客户端升级路径指南以了解更多详细信息。
  • Scalakafka.common.MessageFormatter被删除了。请使用Java org.apache.kafka.common.MessageFormatter.
  • MessageFormatter.init(Properties)方法已被删除。请改用configure(Map)
  • checksum()方法已从ConsumerRecord和中删除RecordMetadata。消息格式 v2(从 0.11 开始一直是默认格式)将校验和从记录移至记录批次。因此,这些方法没有意义,也不存在替代方法。
  • 该类ChecksumMessageFormatter已被删除。它不是公共 API 的一部分,但可能已与kafka-console-consumer.sh. 它报告了每条记录的校验和,自消息格式 v2 以来不再支持该校验和。
  • 该类org.apache.kafka.clients.consumer.internals.PartitionAssignor已被删除。请改用 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
  • 和配置已被删除 ( quota.producer.defaultKAFKA -12591 )。必须改用动态配额默认值。quota.consumer.default
  • 和配置已被port删除host.name。请改用listeners
  • 和配置已被advertised.port删除advertised.host.name。请改用advertised.listeners
  • 已弃用的工作程序配置rest.host.name已从Kafka Connect 工作程序配置中rest.port删除 ( KAFKA-12482 )。请改用listeners

  • Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId)方法已被弃用。请 Producer#sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata metadata)改用,其中ConsumerGroupMetadata 可以通过 检索KafkaConsumer#groupMetadata()以获得更强的语义。请注意,完整的消费者组元数据集只有代理或 2.5 或更高版本才能理解,因此您必须升级您的 kafka 集群以获得更强大的语义。否则,您可以直接new ConsumerGroupMetadata(consumerGroupId)与老broker合作。有关更多详细信息, 请参阅KIP-732 。

  • 连接internal.key.converterinternal.value.converter属性已被完全删除。自版本 2.0.0 以来,已弃用这些 Connect 工作线程属性的使用。现在,工作人员被硬编码为使用 JSON 转换器并schemas.enable设置为false如果您的集群一直在使用不同的内部键或值转换器,您可以按照KIP-738中概述的迁移步骤 将 Connect 集群安全升级到 3.0。
  • 基于 Connect 的 MirrorMaker (MM2) 包括对支持的更改IdentityReplicationPolicy,无需重命名主题即可实现复制。默认情况下仍使用现有的DefaultReplicationPolicy,但可以通过 replication.policy配置属性启用身份复制。这对于从较旧的 MirrorMaker (MM1) 迁移的用户,或者对于具有简单单向复制拓扑且不希望主题重命名的用例特别有用。请注意IdentityReplicationPolicy,与 不同 DefaultReplicationPolicy, 无法阻止基于主题名称的复制循环,因此在构建复制拓扑时请注意避免循环。
  • 最初的 MirrorMaker (MM1) 和相关类已被弃用。请使用基于 Connect 的 MirrorMaker (MM2),如 异地复制部分中所述。

从 0.8.x 到 2.7.x 的任何版本升级到 2.8.1

如果您要从 2.1.x 之前的版本升级,请参阅下面有关用于存储消费者偏移量的架构更改的注释。一旦您将 inter.broker.protocol.version 更改为最新版本,将无法降级到 2.1 之前的版本。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如2.72.6等)
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从0.11.0.x或更高版本升级,并且没有覆盖消息格式,那么您只需要覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如2.72.6等) 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑协议版本 inter.broker.protocol.version并将其设置为 来提升协议版本2.8。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 2.8 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

2.8.0 中的显着变化

  • 2.8.0 版本向KIP-679 中引入的授权者接口添加了一个新方法 。其动机是解锁我们未来的计划,以默认启用最强的消息传递保证。自定义授权者应考虑提供更有效的实现,支持审核日志记录和任何自定义配置或访问规则。
  • IBP 2.8 将主题 ID 作为KIP-516 的一部分引入主题 。使用 ZooKeeper 时,此信息存储在 TopicZNode 中。如果集群降级到以前的 IBP 或版本,未来的主题将不会获得主题 ID,并且不保证主题将在 ZooKeeper 中保留其主题 ID。这意味着再次升级时,某些主题或所有主题将被分配新的 ID。
  • Kafka Streams 引入了类型安全split()运算符作为已弃用方法的替代KStream#branch()(参见KIP-418)。

从 0.8.x 到 2.6.x 的任何版本升级到 2.7.0

如果您要从 2.1.x 之前的版本升级,请参阅下面有关用于存储消费者偏移量的架构更改的注释。一旦您将 inter.broker.protocol.version 更改为最新版本,将无法降级到 2.1 之前的版本。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如2.62.5等)
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从0.11.0.x或更高版本升级,并且没有覆盖消息格式,那么您只需要覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如2.62.5等) 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑协议版本 inter.broker.protocol.version并将其设置为 来提升协议版本2.7。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 2.7 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

2.7.0 中的显着变化

  • 2.7.0 版本包含 KIP-595中指定的核心 Raft 实现。有一个单独的“raft”模块包含大部分逻辑。在与控制器的集成完成之前,用户可以使用一个独立的服务器来测试 Raft 实现的性能。详细信息请参见 raft 模块中的 README.md
  • KIP-651添加了 对使用 PEM 文件进行密钥和信任存储的 支持。
  • KIP-612添加了 对强制代理范围和每个侦听器连接创建速率的支持。2.7.0 版本包含 KIP-612 的第一部分,动态配置将在 2.8.0 版本中出现。
  • 能够限制主题和分区创建或主题删除,以防止集群因 KIP-599受到损害
  • 当 Kafka 推出新功能时,存在两个主要问题:

    1. Kafka 客户端如何了解代理功能?
    2. 经纪商如何决定启用哪些功能?

    KIP-584 提供了灵活且易于操作的解决方案,只需一次重启即可实现客户端发现、功能门控和滚动升级。 * ConsoleConsumer现在可以通过KIP-431 打印记录偏移量和标题 * KIP-554 的添加 继续朝着从 Kafka 中删除 Zookeeper 的目标迈进。添加 KIP-554 意味着您不必再直接连接到 ZooKeeper 来管理 SCRAM 凭证。 * 更改现有侦听器的不可重新配置的配置会导致InvalidRequestException. 相比之下,之前的(意外的)行为会导致更新的配置被保留,但直到代理重新启动后才会生效。有关更多讨论,请参阅KAFKA-10479 。请参阅DynamicBrokerConfig.DynamicSecurityConfigsSocketServer.ListenerReconfigurableConfigs 了解现有侦听器支持的可重新配置配置。 * Kafka Streams 在 KStreams DSL 中 添加了对滑动 Windows 聚合的支持。 * 状态存储上的反向迭代可使用KIP-617 实现更高效的最新更新搜索 * Kafka Steams 中的端到端延迟指标请参阅 KIP-613 了解更多详细信息 * Kafka Streams 添加了使用KIP-607 报告默认 RocksDB 属性的指标 * KIP-616 提供更好的 Scala 隐式 Serdes 支持

从 0.8.x 到 2.5.x 的任何版本升级到 2.6.0

如果您要从 2.1.x 之前的版本升级,请参阅下面有关用于存储消费者偏移量的架构更改的注释。一旦您将 inter.broker.protocol.version 更改为最新版本,将无法降级到 2.1 之前的版本。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如2.52.4等)
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从0.11.0.x或更高版本升级,并且没有覆盖消息格式,那么您只需要覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如2.52.4等) 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑协议版本 inter.broker.protocol.version并将其设置为 来提升协议版本2.6。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 2.6 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

2.6.0 中的显着变化

  • Kafka Streams 添加了一种新的处理模式(需要代理 2.5 或更高版本),该模式使用一次性保证提高应用程序可扩展性(参见KIP-447
  • Java 11 或更高版本默认启用 TLSv1.3。如果客户端和服务器都支持 TLSv1.3,则将协商 TLSv1.3,否则回退到 TLSv1.2。有关更多详细信息, 请参阅 KIP-573 。
  • 配置的默认值client.dns.lookup已从 更改default 为use_all_dns_ips。如果主机名解析为多个 IP 地址,客户端和代理现在将尝试按顺序连接到每个 IP,直到成功建立连接。 有关更多详细信息, 请参阅 KIP-602 。
  • NotLeaderForPartitionException已被弃用并替换为NotLeaderOrFollowerException. 如果代理不是副本,则仅针对领导者或追随者的获取请求和其他请求将返回 NOT_LEADER_OR_FOLLOWER(6) 而不是 REPLICA_NOT_AVAILABLE(9),确保所有客户端将重新分配期间的短暂错误作为可重试异常进行处理。

从 0.8.x 到 2.4.x 的任何版本升级到 2.5.0

如果您要从 2.1.x 之前的版本升级,请参阅下面有关用于存储消费者偏移量的架构更改的注释。一旦您将 inter.broker.protocol.version 更改为最新版本,将无法降级到 2.1 之前的版本。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如2.42.3等)
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从0.11.0.x或更高版本升级,并且没有覆盖消息格式,那么您只需要覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如2.42.3等) 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑协议版本 inter.broker.protocol.version并将其设置为 来提升协议版本2.5。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 2.5 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。 6. KIP-455kafka-reassign-partitions.sh 完成后, 重新分配工具发生了一些显着的变化。该工具现在要求在更改主动重新分配的限制时提供标志。现在可以使用该 命令取消重新分配。最后,重新分配 已被弃用,取而代之的是。有关更多详细信息,请参阅 KIP。 --additional``--cancel``--zookeeper``--bootstrap-server

2.5.0 中的显着变化

  • RebalanceProtocol#COOPERATIVE使用时,Consumer#poll当它处于消费者仍然拥有的那些分区的重新平衡过程中时,仍然可以返回数据;此外 Consumer#commitSync,现在可能会抛出一个非致命RebalanceInProgressException事件来通知用户此类事件,以便与致命事件区分开来CommitFailedException并允许用户完成正在进行的重新平衡,然后重新尝试为那些仍然拥有的分区提交偏移量。
  • 为了提高典型网络环境中的弹性,默认值 zookeeper.session.timeout.ms已从 6 秒增加到 18 秒, replica.lag.time.max.ms从 10 秒增加到 30 秒。
  • cogroup()添加了新的 DSL 运算符,用于一次将多个流聚合在一起。
  • 添加了一个新的KStream.toTable()API,用于将输入事件流转换为 KTable。
  • 添加了新的 Serde 类型Void来表示输入主题中的空键或空值。
  • 已弃用UsePreviousTimeOnInvalidTimestamp并替换为UsePartitionTimeOnInvalidTimeStamp.
  • 通过添加挂起的偏移防护机制和更强的事务提交一致性检查来改进精确一次语义,这极大地简化了可扩展的精确一次应用程序的实现。我们还在示例文件夹下添加了一个新的一次性语义代码示例 。查看 KIP-447 了解完整详细信息。
  • 添加了新的公共 apiKafkaStreams.queryMetadataForKey(String, K, Serializer) to get detailed information on the key being queried. It provides information about the partition number where the key resides in addition to hosts containing the active and standby partitions for the key.
  • 通过弃用并将KafkaStreams.store(String, QueryableStoreType)其替换为KafkaStreams.store(StoreQueryParameters).
  • 添加了一个新的公共 api,用于访问实例本地存储的滞后信息KafkaStreams.allLocalStorePartitionLags()
  • 不再支持 Scala 2.11。详细信息请参阅 KIP-531 。
  • 包中的所有 Scala 类kafka.security.auth均已弃用。 有关 2.4.0 中添加的新 Java 授权者 API 的详细信息,请参阅 KIP-504 。请注意,kafka.security.auth.Authorizer 和kafka.security.auth.SimpleAclAuthorizer在 2.4.0 中已弃用。
  • 默认情况下,TLSv1 和 TLSv1.1 已被禁用,因为它们存在已知的安全漏洞。现在默认仅启用 TLSv1.2。ssl.protocol您可以通过在配置选项和 中显式启用 TLSv1 和 TLSv1.1 来继续使用它们 ssl.enabled.protocols
  • ZooKeeper 已升级到 3.5.7,如果 3.4 数据目录中没有快照文件,ZooKeeper 从 3.4.X 升级到 3.5.7 可能会失败。这通常发生在测试升级中,其中 ZooKeeper 3.5.7 尝试加载尚未创建快照文件的现有 3.4 数据目录。有关该问题的更多详细信息,请参阅ZOOKEEPER-3056ZOOKEEPER-3056中给出了修复,即在升级之前 设置snapshot.trust.empty=true 配置。zookeeper.properties
  • ZooKeeper 版本 3.5.7 支持使用或不使用客户端证书与 ZooKeeper 进行 TLS 加密连接,并且可以使用其他 Kafka 配置来利用此功能。详细信息请参阅KIP-515

从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x、0.11.0.x、1.0.x、1.1.x、2.0.x 或 2.1.x 升级,或者2.2.x 或 2.3.x 至 2.4.0

如果您要从 2.1.x 之前的版本升级,请参阅下面有关用于存储消费者偏移量的架构更改的注释。一旦您将 inter.broker.protocol.version 更改为最新版本,将无法降级到 2.1 之前的版本。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如0.10.0、0.11.0、1.0、2.0、2.2)。
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从0.11.0.x或更高版本升级,并且没有覆盖消息格式,那么您只需要覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION(0.11.0、1.0、1.1、2.0、2.1、2.2、2.3)。 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑 inter.broker.protocol.version并将其设置为 2.4 来提升协议版本。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 2.4 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

附加升级说明:

  1. ZooKeeper已升级至3.5.6。如果 3.4 数据目录中没有快照文件,ZooKeeper 从 3.4.X 升级到 3.5.6 可能会失败。这通常发生在测试升级中,其中 ZooKeeper 3.5.6 尝试加载尚未创建快照文件的现有 3.4 数据目录。有关该问题的更多详细信息,请参阅ZOOKEEPER-3056ZOOKEEPER-3056中给出了修复,即在升级之前设置snapshot.trust.empty=true 配置。zookeeper.properties但我们观察到使用 snapshot.trust.empty=trueconfig.js 进行独立集群升级时会出现数据丢失的情况。有关该问题的更多详细信息,请参阅ZOOKEEPER-3644因此,我们建议使用复制空快照的安全解决方法如果3.4数据目录中没有快照文件,则将文件复制到3.4数据目录中。有关解决方法的更多详细信息,请参阅ZooKeeper 升级常见问题解答
  2. ZooKeeper 3.5 中添加了 基于嵌入式 Jetty 的AdminServer 。zookeeper.propertiesAdminServer 在 ZooKeeper 中默认启用,并在端口 8080 上启动。AdminServer 在Apache Kafka 发行版提供的ZooKeeper 配置 ( ) 中默认禁用。如果您想禁用管理服务器,请确保更新您的本地zookeeper.properties文件。admin.enableServer=false请参考AdminServer 配置来配置 AdminServer。

2.4.0 中的显着变化

  • 添加了新的管理 API,用于分区重新分配。由于改变了 Kafka 传播重新分配信息的方式,因此在升级到新版本时,在故障边缘情况下可能会丢失重新分配状态。不建议在升级时开始重新分配。
  • ZooKeeper已从3.4.14升级到3.5.6。新版本支持TLS和动态重新配置。
  • 命令bin/kafka-preferred-replica-election.sh行工具已被弃用。它已被替换为bin/kafka-leader-election.sh.
  • electPreferredLeadersJava 类中的方法AdminClient已被弃用,取而代之的是方法electLeaders
  • 利用带有文字值的构造函数的 Scala 代码NewTopic(String, int, short)需要显式调用toShort第二个文字。
  • 构造函数中的参数GroupAuthorizationException(String)现在用于指定异常消息。以前它指的是授权失败的组。这样做是为了与其他异常类型保持一致并避免潜在的误用。TopicAuthorizationException(String)之前用于单个未授权主题的 构造函数也进行了类似的更改。
  • 内部PartitionAssignor接口已被弃用,并替换为新的ConsumerPartitionAssignor公共 API。两个接口之间的某些方法/签名略有不同。实现自定义 PartitionAssignor 的用户应尽快迁移到新界面。
  • 现在DefaultPartitioner使用粘性分区策略。这意味着具有空键且未分配分区的特定主题的记录将被发送到同一分区,直到准备好发送批次为止。创建新批次时,会选择一个新分区。这减少了生成延迟,但可能会导致边缘情况下记录在分区之间分布不均匀。一般来说,用户不会受到影响,但在测试和其他在很短的时间内生成记录的情况下,这种差异可能会很明显。
  • 阻塞KafkaConsumer#committed方法已扩展为允许分区列表作为输入参数,而不是单个分区。它可以减少客户端和代理之间为消费者组获取已提交偏移量的请求/响应迭代。旧的重载函数已被弃用,我们建议用户更改代码以利用新方法(详细信息可以在KIP-520中找到)。
  • 我们INVALID_RECORD在生成响应中引入了一个新错误以与该CORRUPT_MESSAGE错误区分开。更具体地说,以前当一批记录作为单个请求的一部分发送到代理时,由于各种原因(魔术字节不匹配、crc 校验和错误、压缩日志的空键),一个或多个记录未能通过验证主题等),整个批次将被拒绝,并且具有相同和误导性CORRUPT_MESSAGE,并且生产者客户端的调用者将从RecordMetadata调用返回的未来对象send以及在Callback#onCompletion(RecordMetadata metadata, Exception exception) 现在,有了新的错误代码和改进的异常错误消息,生产者调用者可以更好地了解其发送记录失败的根本原因。
  • 我们正在向客户端组协议引入增量合作重新平衡,该协议允许消费者在重新平衡期间保留所有分配的分区,并在最后仅撤销那些必须迁移到另一个消费者以实现整体集群平衡的分区。他们将选择 所有消费者支持的分配者普遍支持的ConsumerCoordinator最新版本。RebalanceProtocol您可以使用新的内置CooperativeStickyAssignor或插入您自己的自定义协作分配器。为此,您必须实现该ConsumerPartitionAssignor接口并包含RebalanceProtocol.COOPERATIVE在 . 返回的列表中ConsumerPartitionAssignor#supportedProtocols。然后,您的自定义分配者可以利用ownedPartitions每个消费者的字段Subscription尽可能将分区归还给以前的所有者。请注意,当要将分区重新分配给另一个使用者时,必须将其从新分配中删除,直到它从其原始所有者手中撤销。任何必须撤销分区的消费者都将触发后续重新平衡,以允许撤销的分区安全地分配给其新所有者。有关更多信息, 请参阅 ConsumerPartitionAssignor RebalanceProtocol javadocs 。
    要从旧的(急切的)协议(在重新平衡之前总是撤销所有分区)升级到协作重新平衡,您必须遵循特定的升级路径以使所有客户端都处于同一状态ConsumerPartitionAssignor 支持合作协议。这可以通过两次滚动弹跳来完成,例如CooperativeStickyAssignor:在第一次弹跳期间,将“合作粘性”添加到每个成员支持的分配器列表中(不删除先前的分配器 - 请注意,如果之前使用默认分配器) ,您也必须明确包含该内容)。然后您可以退回和/或升级它。一旦整个组处于 2.4+ 并且所有成员在其支持的分配者之间都具有“合作粘性”,请删除其他分配者并执行第二次滚动反弹,以便最终所有成员仅支持合作协议。有关协作再平衡协议和升级路径的更多详细信息,请参阅KIP-429
  • ConsumerRebalanceListener以及新的 API进行了一些行为更改。在侦听器的三个回调中的任何一个期间抛出的异常将不再被吞掉,而是会一直重新抛出直到调用为止Consumer.poll()onPartitionsLost添加该方法是为了允许用户对消费者可能失去其分区所有权(例如错过重新平衡)并且无法提交偏移量的异常情况做出反应。默认情况下,这将简单地调用现有onPartitionsRevokedAPI 来与之前的行为保持一致。但请注意,onPartitionsLost当丢失的分区集为空时,不会调用该方法。这意味着在新消费者加入组的第一次重新平衡开始时不会调用回调。
    的语义ConsumerRebalanceListener's当遵循上述合作再平衡协议时,回调会进一步改变。此外onPartitionsLostonPartitionsRevoked 当撤销分区集为空时,也永远不会被调用。该回调通常仅在重新平衡结束时调用,并且仅在正在移动到另一个使用者的分区集上调用。然而,即使分区集为空,回调 onPartitionsAssigned也将始终被调用,作为通知用户重新平衡事件的一种方式(对于合作型和渴望型都是如此)。有关新回调语义的详细信息,请参阅ConsumerRebalanceListener javadocs
  • Scala 特征kafka.security.auth.Authorizer已被弃用并被新的 Java API 取代 org.apache.kafka.server.authorizer.Authorizer。授权者实现类 kafka.security.auth.SimpleAclAuthorizer也已被弃用并被新的实现所取代kafka.security.authorizer.AclAuthorizerAclAuthorizer使用新 API 支持的功能来改进授权日志记录,并与SimpleAclAuthorizer. 有关更多详细信息,请参阅KIP-504

从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x、0.11.0.x、1.0.x、1.1.x、2.0.x 或 2.1.x 升级,或者2.2.x 至 2.3.0

如果您要从 2.1.x 之前的版本升级,请参阅下面有关用于存储消费者偏移量的架构更改的注释。一旦您将 inter.broker.protocol.version 更改为最新版本,将无法降级到 2.1 之前的版本。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如0.8.2、0.9.0、0.10.0、0.10.1、0.10.2、0.11.0、1.0、1.1)。
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从 0.11.0.x、1.0.x、1.1.x、2.0.x 或 2.1.x 升级,并且尚未覆盖消息格式,则只需覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION(0.11.0、1.0、1.1、2.0、2.1、2.2)。 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑 inter.broker.protocol.version并将其设置为 2.3 来提升协议版本。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 2.3 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

2.3.0 中的显着变化

  • 我们正在为 Kafka Connect 引入一种基于 增量合作再平衡的新再平衡协议。新协议不需要在 Connect 工作线程之间的重新平衡阶段停止所有任务。相反,只有需要在工作人员之间交换的任务才会停止,并在后续重新平衡中启动它们。从 2.3.0 开始,新的 Connect 协议默认启用。有关其工作原理以及如何启用急切再平衡的旧行为的更多详细信息,请查看 增量协作再平衡设计
  • 我们正在向消费者用户引入静态会员资格。此功能减少了正常应用程序升级或滚动弹跳期间不必要的重新平衡。有关如何使用它的更多详细信息,请查看静态成员资格设计
  • Kafka Streams DSL 切换其使用的存储类型。虽然此更改主要对用户是透明的,但在某些极端情况下可能需要更改代码。有关更多详细信息,请参阅Kafka Streams 升级部分
  • Kafka Streams 2.3.0 需要 0.11 消息格式或更高版本,并且不适用于较旧的消息格式。

从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x、0.11.0.x、1.0.x、1.1.x、2.0.x 或 2.1.x 升级到2.2.0

如果您要从 2.1.x 之前的版本升级,请参阅下面有关用于存储消费者偏移量的架构更改的注释。一旦您将 inter.broker.protocol.version 更改为最新版本,将无法降级到 2.1 之前的版本。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如0.8.2、0.9.0、0.10.0、0.10.1、0.10.2、0.11.0、1.0、1.1)。
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从 0.11.0.x、1.0.x、1.1.x 或 2.0.x 升级并且尚未覆盖消息格式,则只需覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION(0.11.0、1.0、1.1、2.0)。 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑 inter.broker.protocol.version并将其设置为 2.2 来提升协议版本。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 2.2 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

2.2.1 中的显着变化

  • Kafka Streams 2.2.1 需要 0.11 消息格式或更高版本,并且不适用于较旧的消息格式。

2.2.0 中的显着变化

  • 默认消费者组 ID 已从空字符串 ( "") 更改为null。使用新的默认组 ID 的消费者将无法订阅主题、获取或提交偏移量。不推荐使用空字符串作为消费者组 ID,但在未来的主要版本之前将受支持。依赖空字符串组 ID 的旧客户端现在必须显式提供它作为其消费者配置的一部分。有关详细信息,请参阅 KIP-289
  • 命令bin/kafka-topics.sh行工具现在可以直接连接到代理而--bootstrap-server不是zookeeper。旧--zookeeper 选项目前仍然可用。请阅读KIP-377了解更多信息。
  • Kafka Streams 依赖于较新版本的 RocksDB,需要 MacOS 10.13 或更高版本。

从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x、0.11.0.x、1.0.x、1.1.x 或 2.0.0 升级到 2.1.0](https://kafka.apache.org/documentation/#upgrade_2_1_0)

请注意,2.1.x 包含对用于存储消费者偏移量的内部架构的更改。升级完成后,将无法降级到之前的版本。有关更多详细信息,请参阅下面的滚动升级说明。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如0.8.2、0.9.0、0.10.0、0.10.1、0.10.2、0.11.0、1.0、1.1)。
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从 0.11.0.x、1.0.x、1.1.x 或 2.0.x 升级并且尚未覆盖消息格式,则只需覆盖代理间协议版本。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION(0.11.0、1.0、1.1、2.0)。 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。完成此操作后,代理将运行最新版本,您可以验证集群的行为和性能是否符合预期。如果出现任何问题,此时仍然可以降级。 3. 验证集群的行为和性能后,通过编辑 inter.broker.protocol.version并将其设置为 2.1 来提升协议版本。 4. 一一重启broker,新协议版本即可生效。一旦代理开始使用最新的协议版本,就无法再将集群降级到旧版本。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 2.1 并一一重新启动它们。请注意,不再维护的旧版 Scala 客户端不支持 0.11 中引入的消息格式,因此为了避免转换成本(或利用恰好一次语义),必须使用较新的 Java 客户端。

附加升级说明:

  1. 在此版本中,偏移过期语义略有变化。根据新的语义,当组订阅了相应的主题并且仍然处于活动状态(有活跃的消费者)时,组中分区的偏移量不会被删除。如果组变空,则在默认偏移保留期(或代理设置的保留期)过去后,其所有偏移将被删除(除非该组再次变为活动状态)。与不使用 Kafka 组管理的独立(简单)消费者相关的偏移量,将在自上次提交以来默认偏移量保留期(或由代理设置的保留期)过去后被删除。
  2. enable.auto.commit未提供时控制台使用者属性的默认值group.id现在设置为false。这是为了避免污染消费者协调器缓存,因为自动生成的组不太可能被其他消费者使用。
  3. 正如我们 在KIP-91中介绍的那样,生产者retries配置的默认值已更改为,它设置了发送记录和从代理接收确认之间的总时间上限。默认情况下,传送超时设置为 2 分钟。Integer.MAX_VALUE``delivery.timeout.ms
  4. 默认情况下,MirrorMaker 现在会在配置生产者时delivery.timeout.ms覆盖。如果您为了更快地失败Integer.MAX_VALUE而覆盖了 的值,则您将需要覆盖。retries``delivery.timeout.ms
  5. ListGroup作为推荐的替代方案,API 现在期望访问Describe Group用户应该能够列出的组。尽管Describe Cluster仍支持旧的访问以实现向后兼容性,但不建议将其用于此 API。
  6. KIP-336弃用了 ExtendedSerializer 和 ExtendedDeserializer 接口,并推广了 Serializer 和 Deserializer 的使用。ExtendedSerializer 和 ExtendedDeserializer 是随KIP-82引入的, 以 Java 7 兼容的方式为序列化器和反序列化器提供记录标头。现在我们整合了这些接口,因为 Java 7 支持已被删除。

2.1.0 中的显着变化

  • Jetty 已升级到 9.4.12,默认情况下不包括 TLS_RSA_* 密码,因为它们不支持前向保密,有关更多信息,请参阅 https://github.com/eclipse/jetty.project/issues/2807。
  • unclean.leader.election.enable当使用每个主题配置覆盖动态更新配置时,控制器会自动启用不干净的领导者选举。
  • 添加AdminClient了一个方法AdminClient#metrics()。现在,任何使用 的应用程序都AdminClient可以通过查看从 捕获的指标来获得更多信息和见解AdminClient。欲了解更多信息,请参阅KIP-324
  • Kafka 现在支持KIP-110的 Zstandard 压缩。您必须升级经纪商和客户端才能使用它。2.1.0 之前的消费者将无法读取使用 Zstandard 压缩的主题,因此在所有下游消费者升级之前,不应为主题启用它。有关更多详细信息,请参阅 KIP。

从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x、0.11.0.x、1.0.x 或 1.1.x 升级到 2.0.0](https://kafka.apache.org/documentation/#upgrade_2_0_0)

Kafka 2.0.0 引入了有线协议更改。通过遵循下面推荐的滚动升级计划,您可以保证升级期间不会出现停机。不过,请在升级之前查看2.0.0 中的显着变化

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如0.8.2、0.9.0、0.10.0、0.10.1、0.10.2、0.11.0、1.0、1.1)。
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从 0.11.0.x、1.0.x 或 1.1.x 升级并且尚未覆盖消息格式,则只需覆盖代理间协议格式。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION(0.11.0、1.0、1.1)。 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。 3. 整个集群升级后,通过编辑inter.broker.protocol.version并将其设置为 2.0 来提升协议版本。 4. 一一重启broker,新协议版本即可生效。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 2.0 并一一重新启动它们。请注意,较旧的 Scala 消费者不支持 0.11 中引入的新消息格式,因此为了避免下转换的性能成本(或利用恰好一次语义),必须使用较新的 Java 消费者。

附加升级说明:

  1. 如果您愿意接受停机,您可以简单地关闭所有代理,更新代码并重新启动它们。默认情况下,他们将从新协议开始。
  2. 升级代理后,可以随时更改协议版本并重新启动。它不一定是紧随其后。消息格式版本也类似。
  3. 如果您在 Kafka Streams 代码中使用 Java8 方法引用,您可能需要更新代码以解决方法歧义。仅热交换 jar 文件可能不起作用。
  4. 在更新集群中的所有代理之前, 不应将 ACL 添加到前缀资源(在KIP-290中添加)。

    注意:如果集群再次降级,即使在集群完全升级之后,添加到集群的任何前缀 ACL 也将被忽略。

2.0.0 中的显着变化

  • KIP-186将默认偏移量保留时间从 1 天增加到 7 天。这使得在不频繁提交的应用程序中“丢失”偏移量的可能性较小。它还会增加活动的偏移集,因此会增加代理上的内存使用量。请注意,控制台使用者当前默认启用偏移量提交,并且可能是大量偏移量的来源,此更改现在将保留 7 天而不是 1 天。您可以通过将代理配置设置为 1440 来保留现有行为offsets.retention.minutes
  • 对 Java 7 的支持已取消,Java 8 现在是所需的最低版本。
  • 的默认值ssl.endpoint.identification.algorithm已更改为https,它执行主机名验证(否则可能会发生中间人攻击)。设置ssl.endpoint.identification.algorithm为空字符串以恢复之前的行为。
  • KAFKA-5674将最小值的下限间隔扩展max.connections.per.ip到零,因此允许对入站连接进行基于 IP 的过滤。
  • KIP-272 在指标中添加了 API 版本标签kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...}。这个指标现在变成了kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...},version={0|1|2|3|...}。这将影响不自动聚合的 JMX 监控工具。要获取特定请求类型的总计数,需要更新该工具以跨不同版本进行聚合。
  • KIP-225更改了指标“records.lag”以使用主题和分区的标签。名称格式为“{topic}-{partition}.records-lag”的原始版本已被删除。
  • 自 0.11.0.0 起已弃用的 Scala 消费者已被删除。自 0.10.0.0 以来,Java 消费者一直是推荐的选项。请注意,即使代理升级到 2.0.0,1.1.0(及更早版本)中的 Scala 消费者也将继续工作。
  • 自 0.10.0.0 起已弃用的 Scala 生产者已被删除。从 0.9.0.0 开始,Java 生产者一直是推荐选项。请注意,Java 生产者中默认分区器的行为与 Scala 生产者中默认分区器的行为不同。迁移的用户应考虑配置保留以前行为的自定义分区程序。请注意,即使代理升级到 2.0.0,1.1.0(及更早版本)中的 Scala 生产者也将继续工作。
  • MirrorMaker 和 ConsoleConsumer 不再支持 Scala 消费者,它们始终使用 Java 消费者。
  • ConsoleProducer 不再支持 Scala 生产者,它始终使用 Java 生产者。
  • 许多依赖 Scala 客户端的已弃用工具已被删除:ReplayLogProducer、SimpleConsumerPerformance、SimpleConsumerShell、ExportZkOffsets、ImportZkOffsets、UpdateOffsetsInZK、VerifyConsumerRebalance。
  • 已弃用的 kafka.tools.ProducerPerformance 已被删除,请使用 org.apache.kafka.tools.ProducerPerformance。
  • 添加了新的 Kafka Streams 配置参数upgrade.from,允许从旧版本滚动反弹升级。
  • KIP-284通过将默认值设置为 来更改 Kafka Streams 重新分区主题的保留时间Long.MAX_VALUE
  • 更新了ProcessorStateManagerKafka Streams 中的 API,用于将状态存储注册到处理器拓扑。欲了解更多详细信息,请阅读 Streams升级指南
  • 在早期版本中,Connect 的工作线程配置需要internal.key.converterinternal.value.converter属性。在 2.0 中,不再需要这些,并且默认为 JSON 转换器。您可以安全地从 Connect 独立和分布式工作线程配置中删除这些属性:
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter.schemas.enable=false
  • KIP-266添加了一个新的消费者配置default.api.timeout.ms ,以指定用于可能阻塞的 API 的默认超时KafkaConsumer。KIP 还为此类阻塞 API 添加了重载,以支持为每个 API 指定特定的超时,而不是使用default.api.timeout.ms. 特别是,poll(Duration)添加了一个新的 API,该 API 不会阻止动态分区分配。旧的poll(long)API 已被弃用,并将在未来版本中删除。KafkaConsumer还为其他方法添加了重载,例如partitionsForlistTopicsoffsetsForTimesbeginningOffsets,endOffsets以及close接受Duration.
  • 另外,作为 KIP-266 的一部分,默认值request.timeout.ms已更改为 30 秒。考虑到重新平衡所需的最长时间,之前的值略高于 5 分钟。max.poll.interval.ms现在,我们将重新平衡中的 JoinGroup 请求视为特殊情况,并使用从请求超时派生的值 。所有其他请求类型使用以下定义的超时request.timeout.ms
  • 内部方法kafka.admin.AdminClient.deleteRecordsBefore已被删除。鼓励用户迁移到org.apache.kafka.clients.admin.AdminClient.deleteRecords.
  • AclCommand 工具--producer便利选项在给定主题上使用KIP-277更细粒度的 ACL。
  • KIP-176删除了--new-consumer所有基于消费者的工具的选项。此选项是多余的,因为如果定义了 --bootstrap-server,则会自动使用新的使用者。
  • KIP-290添加了在前缀资源上定义 ACL 的能力,例如以“foo”开头的任何主题。
  • KIP-283改进了 Kafka 代理上的消息下转换处理,这通常是一项内存密集型操作。KIP 添加了一种机制,通过一次向下转换分区数据块,操作会减少内存密集度,这有助于设定内存消耗的上限。通过这一改进,协议行为发生了变化 FetchResponse,代理可以在响应结束时发送带有无效偏移量的超大消息批次。消费者客户端必须忽略此类过大的消息,就像KafkaConsumer.

    KIP-283还添加了新的主题和代理配置message.downconversion.enablelog.message.downconversion.enable分别控制是否启用下转换。禁用后,代理不会执行任何下转换,而是UNSUPPORTED_VERSION 向客户端发送错误。

  • 在启动代理之前,可以使用 kafka-configs.sh 将动态代理配置选项存储在 ZooKeeper 中。此选项可用于避免在 server.properties 中存储明文密码,因为所有密码配置都可能以加密方式存储在 ZooKeeper 中。

  • 如果连接尝试失败,ZooKeeper 主机现在会重新解析。但是,如果您的 ZooKeeper 主机名解析为多个地址,并且其中一些地址无法访问,那么您可能需要增加连接超时 zookeeper.connection.timeout.ms

新协议版本

  • KIP-279:OffsetsForLeaderEpochResponse v1 引入了分区级leader_epoch字段。
  • KIP-219:提高因配额违规而受到限制的非集群操作请求和响应的协议版本。
  • KIP-290:提高 ACL 创建、描述和删除请求和响应的协议版本。

升级 1.1 Kafka Streams 应用程序

  • 将 Streams 应用程序从 1.1 升级到 2.0 不需要代理升级。Kafka Streams 2.0 应用程序可以连接到 2.0、1.1、1.0、0.11.0、0.10.2 和 0.10.1 代理(但无法连接到 0.10.0 代理)。
  • 请注意,在 2.0 中,我们删除了 1.0 之前已弃用的公共 API;利用这些已弃用的 API 的用户需要相应地更改代码。有关更多详细信息,请参阅2.0.0 中的 Streams API 更改。

从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x、0.11.0.x 或 1.0.x 升级到 1.1.x

Kafka 1.1.0 引入了有线协议更改。通过遵循下面推荐的滚动升级计划,您可以保证升级期间不会出现停机。不过,请在升级之前查看1.1.0 中的显着变化

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如0.8.2、0.9.0、0.10.0、0.10.1、0.10.2、0.11.0、1.0)。
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从 0.11.0.x 或 1.0.x 升级并且尚未覆盖消息格式,则只需覆盖代理间协议格式。 * inter.broker.protocol.version=CURRENT_KAFKA_VERSION(0.11.0 或 1.0)。 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。 3. 整个集群升级后,通过编辑inter.broker.protocol.version并将其设置为 1.1 来提升协议版本。 4. 一一重启broker,新协议版本即可生效。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 1.1 并一一重新启动它们。请注意,较旧的 Scala 消费者不支持 0.11 中引入的新消息格式,因此为了避免下转换的性能成本(或利用恰好一次语义),必须使用较新的 Java 消费者。

附加升级说明:

  1. 如果您愿意接受停机,您可以简单地关闭所有代理,更新代码并重新启动它们。默认情况下,他们将从新协议开始。
  2. 升级代理后,可以随时更改协议版本并重新启动。它不一定是紧随其后。消息格式版本也类似。
  3. 如果您在 Kafka Streams 代码中使用 Java8 方法引用,您可能需要更新代码以解决方法歧义。仅热交换 jar 文件可能不起作用。

1.1.1 中的显着变化

  • 添加了新的 Kafka Streams 配置参数upgrade.from,允许从版本 0.10.0.x 滚动弹跳升级
  • 有关此新配置的详细信息, 请参阅Kafka Streams 升级指南。

1.1.0 中的显着变化

  • Maven 中的 kafka 工件不再依赖 log4j 或 slf4j-log4j12。与 kafka-clients 工件类似,用户现在可以通过包含适当的 slf4j 模块(slf4j-log4j12、logback 等)来选择日志记录后端。发布 tarball 仍然包含 log4j 和 slf4j-log4j12。
  • KIP-225更改了指标“records.lag”以使用主题和分区的标签。名称格式为“{topic}-{partition}.records-lag”的原始版本已弃用,并将在 2.0.0 中删除。
  • Kafka Streams 对于代理通信错误更加稳健。Kafka Streams 不会因致命异常而停止 Kafka Streams 客户端,而是尝试自我修复并重新连接到集群。使用新版本,AdminClient您可以更好地控制 Kafka Streams 重试的频率,并可以配置 细粒度的超时(而不是像旧版本中那样硬编码重试)。
  • Kafka Streams 重新平衡时间进一步减少,使 Kafka Streams 的响应速度更快。
  • Kafka Connect 现在支持接收器和源连接器中的消息标头,并通过简单的消息转换来操作它们。必须更改连接器才能显式使用它们。引入了一个新功能HeaderConverter来控制标头的序列化(反序列化)方式,并且默认情况下使用新的“SimpleHeaderConverter”来使用值的字符串表示形式。
  • 如果由于解码器等任何其他选项而显式或隐式启用打印数据日志,kafka.tools.DumpLogSegments 现在会自动设置深度迭代选项。

新协议版本

  • KIP-226引入了DescribeConfigs 请求/响应 v1。
  • KIP-227引入了 Fetch Request/Response v7。

升级 1.0 Kafka Streams 应用程序

  • 将 Streams 应用程序从 1.0 升级到 1.1 不需要代理升级。Kafka Streams 1.1 应用程序可以连接到 1.0、0.11.0、0.10.2 和 0.10.1 代理(但无法连接到 0.10.0 代理)。
  • 有关更多详细信息,请参阅1.1.0 中的 Streams API 更改。

从 0.8.x、0.9.x、0.10.0.x、0.10.1.x、0.10.2.x 或 0.11.0.x 升级到 1.0.0

Kafka 1.0.0 引入了有线协议更改。通过遵循下面推荐的滚动升级计划,您可以保证升级期间不会出现停机。不过,请在升级之前查看1.0.0 中的显着变化

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的消息格式版本。如果您之前已覆盖消息格式版本,则应保留其当前值。或者,如果您要从 0.11.0.x 之前的版本升级,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如0.8.2、0.9.0、0.10.0、0.10.1、0.10.2、0.11.0)。
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)

    如果您从 0.11.0.x 升级并且未覆盖消息格式,则必须将消息格式版本和代理间协议版本设置为 0.11.0。 * inter.broker.protocol.version=0.11.0 * log.message.format.version=0.11.0 2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。 3. 整个集群升级后,通过编辑inter.broker.protocol.version并将其设置为 1.0 来提升协议版本。 4. 一一重启broker,新协议版本即可生效。 5. 如果您已按照上述说明覆盖消息格式版本,则需要再进行一次滚动重启才能将其升级到最新版本。一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,将每个代理上的 log.message.format.version 更改为 1.0 并一一重新启动它们。如果您从 0.11.0 升级并且 log.message.format.version 设置为 0.11.0,则可以更新配置并跳过滚动重启。请注意,较旧的 Scala 消费者不支持 0.11 中引入的新消息格式,因此为了避免下转换的性能成本(或利用恰好一次语义),必须使用较新的 Java 消费者。

附加升级说明:

  1. 如果您愿意接受停机,您可以简单地关闭所有代理,更新代码并重新启动它们。默认情况下,他们将从新协议开始。
  2. 升级代理后,可以随时更改协议版本并重新启动。它不一定是紧随其后。消息格式版本也类似。

1.0.2 中的显着变化

  • 添加了新的 Kafka Streams 配置参数upgrade.from,允许从版本 0.10.0.x 滚动弹跳升级
  • 有关此新配置的详细信息, 请参阅Kafka Streams 升级指南。

1.0.1 中的显着变化

  • 恢复了 AdminClient 的选项类(例如 CreateTopicsOptions、DeleteTopicsOptions 等)与 0.11.0.x 的二进制兼容性。二进制(但不是源代码)兼容性在 1.0.0 中被无意中破坏。

1.0.0 中的显着变化

  • 现在默认启用主题删除,因为该功能现已稳定。希望保留以前行为的用户应将代理配置设置delete.topic.enablefalse。请记住,主题删除会删除数据,并且该操作是不可逆的(即不存在“取消删除”操作)
  • 对于支持时间戳搜索的主题,如果找不到分区的偏移量,则该分区现在包含在搜索结果中,且偏移量值为空。以前,分区不包含在地图中。进行此更改是为了使搜索行为与不支持时间戳搜索的主题的情况一致。
  • 如果inter.broker.protocol.version是 1.0 或更高版本,即使存在脱机日志目录,代理现在也将保持在线状态以在实时日志目录上提供副本。由于硬件故障引起的IOException,日志目录可能会离线。用户需要监控每个broker的指标offlineLogDirectoryCount来检查是否存在离线日志目录。
  • 添加了 KafkaStorageException,这是一个可重试的异常。如果客户端的FetchRequest或ProducerRequest的版本不支持KafkaStorageException,则响应中的KafkaStorageException将转换为NotLeaderForPartitionException。
  • 默认 JVM 设置中的 -XX:+DisableExplicitGC 已替换为 -XX:+ExplicitGCInvokesConcurrent。在某些情况下,这有助于避免在通过直接缓冲区分配本机内存期间出现内存不足异常。
  • handleError已从包中以下已弃用的类中删除了重写的方法实现kafka.apiFetchRequestGroupCoordinatorRequestOffsetCommitRequest、 OffsetFetchRequestOffsetRequestProducerRequestTopicMetadataRequest。这仅适用于代理,但已不再使用且实现也未得到维护。为了二进制兼容性,保留了存根实现。
  • Java 客户端和工具现在接受任何字符串作为客户端 ID。
  • 已弃用的工具kafka-consumer-offset-checker.sh已被删除。用于kafka-consumer-groups.sh获取消费者组详细信息。
  • SimpleAclAuthorizer 现在默认将访问拒绝记录到授权者日志中。
  • 身份验证失败现在作为 的子类之一报告给客户端AuthenticationException。如果客户端连接身份验证失败,则不会执行重试。
  • 自定义SaslServer实现可能会SaslAuthenticationException抛出错误消息以返回给客户端,指示身份验证失败的原因。实施者应注意不要在异常消息中包含任何安全关键信息,这些信息不应泄露给未经身份验证的客户端。
  • 向 JMX 注册以提供版本和提交 ID 的mbeanapp-info将被弃用,并替换为提供这些属性的指标。
  • Kafka 指标现在可能包含非数字值。org.apache.kafka.common.Metric#value()已被弃用,并将0.0在这种情况下返回,以最大限度地减少破坏读取每个客户端指标值的用户的可能性(通过实现MetricsReporter或调用该metrics()方法)。 org.apache.kafka.common.Metric#metricValue()可用于检索数字和非数字度量值。
  • 现在,每个 Kafka 速率指标都有一个相应的累积计数指标,并带有后缀-total 以简化下游处理。例如,records-consumed-rate有一个名为 的相应指标records-consumed-total
  • kafka_mx4jenable仅当系统属性设置为时,才会启用 Mx4j true。由于逻辑反转错误,它以前默认启用,如果kafka_mx4jenable设置为则禁用true
  • 客户端 jar 中的包org.apache.kafka.common.security.auth已公开并添加到 javadocs 中。以前位于此包中的内部类已移至其他地方。
  • 当使用授权者并且用户没有主题所需的权限时,代理将向请求返回 TOPIC_AUTHORIZATION_FAILED 错误,无论代理上是否存在主题。如果用户具有所需的权限并且主题不存在,则将返回 UNKNOWN_TOPIC_OR_PARTITION 错误代码。
  • config/consumer.properties 文件已更新以使用新的消费者配置属性。

新协议版本

  • KIP-112:LeaderAndIsrRequest v1 引入了分区级is_new字段。
  • KIP-112:UpdateMetadataRequest v4 引入了分区级offline_replicas字段。
  • KIP-112:MetadataResponse v5 引入了分区级offline_replicas字段。
  • KIP-112:ProduceResponse v4 引入了 KafkaStorageException 的错误代码。
  • KIP-112:FetchResponse v6 引入了 KafkaStorageException 的错误代码。
  • KIP-152:添加了 SaslAuthenticate 请求以启用身份验证失败报告。如果 SaslHandshake 请求版本大于 0,则将使用此请求。

升级 0.11.0 Kafka Streams 应用程序

  • 将 Streams 应用程序从 0.11.0 升级到 1.0 不需要代理升级。Kafka Streams 1.0 应用程序可以连接到 0.11.0、0.10.2 和 0.10.1 代理(但无法连接到 0.10.0 代理)。但是,Kafka Streams 1.0 需要 0.10 消息格式或更新版本,并且不适用于较旧的消息格式。
  • 如果您正在监视流指标,则需要对报告和监视代码中的指标名称进行一些更改,因为指标传感器层次结构已更改。
  • 有一些公共 API,包括ProcessorContext#schedule()Processor#punctuate()KStreamBuilderTopologyBuilder已被新 API 弃用。我们建议您在升级时进行相应的代码更改,这些更改应该非常小,因为新的 API 看起来非常相似。
  • 有关更多详细信息,请参阅1.0.0 中的 Streams API 更改。

升级 0.10.2 Kafka Streams 应用程序

  • 将 Streams 应用程序从 0.10.2 升级到 1.0 不需要代理升级。Kafka Streams 1.0 应用程序可以连接到 1.0、0.11.0、0.10.2 和 0.10.1 代理(但无法连接到 0.10.0 代理)。
  • 如果您正在监视流指标,则需要对报告和监视代码中的指标名称进行一些更改,因为指标传感器层次结构已更改。
  • 有一些公共 API,包括ProcessorContext#schedule()Processor#punctuate()KStreamBuilderTopologyBuilder已被新 API 弃用。我们建议您在升级时进行相应的代码更改,这些更改应该非常小,因为新的 API 看起来非常相似。
  • 如果您在配置中指定了customized key.serde,value.serdetimestamp.extractor,建议使用它们替换的配置参数,因为这些配置已被弃用。
  • 有关更多详细信息,请参阅0.11.0 中的 Streams API 更改。

升级 0.10.1 Kafka Streams 应用程序

  • 将 Streams 应用程序从 0.10.1 升级到 1.0 不需要代理升级。Kafka Streams 1.0 应用程序可以连接到 1.0、0.11.0、0.10.2 和 0.10.1 代理(但无法连接到 0.10.0 代理)。
  • 您需要重新编译您的代码。仅交换 Kafka Streams 库 jar 文件是行不通的,并且会破坏您的应用程序。
  • 如果您正在监视流指标,则需要对报告和监视代码中的指标名称进行一些更改,因为指标传感器层次结构已更改。
  • 有一些公共 API,包括ProcessorContext#schedule()Processor#punctuate()KStreamBuilderTopologyBuilder已被新 API 弃用。我们建议您在升级时进行相应的代码更改,这些更改应该非常小,因为新的 API 看起来非常相似。
  • 如果您在配置中指定了customized key.serde,value.serdetimestamp.extractor,建议使用它们替换的配置参数,因为这些配置已被弃用。
  • 如果您使用自定义(即用户实现的)时间戳提取器,您将需要更新此代码,因为接口TimestampExtractor已更改。
  • 如果您注册自定义指标,则需要更新此代码,因为StreamsMetric界面已更改。
  • 有关更多详细信息,请参阅1.0.0 中的 Streams API 更改、 0.11.0 中的 Streams API 更改和 0.10.2 中的 Streams API 更改。

升级 0.10.0 Kafka Streams 应用程序

  • 将 Streams 应用程序从 0.10.0 升级到 1.0 确实需要代理升级,因为 Kafka Streams 1.0 应用程序只能连接到 0.1、0.11.0、0.10.2 或 0.10.1 代理。
  • 有一些 API 更改不向后兼容(请参阅1.0.0 中的 Streams API 更改、 0.11.0 中的 Streams API 更改、 0.10.2 中的 Streams API 更改和 0.10.1 中的 Streams API 更改了解更多详细信息)。因此,您需要更新并重新编译代码。仅交换 Kafka Streams 库 jar 文件是行不通的,并且会破坏您的应用程序。
  • 从 0.10.0.x 升级到 1.0.2 需要两次滚动反弹,并upgrade.from="0.10.0"为第一个升级阶段设置配置(参见KIP-268)。作为替代方案,也可以进行离线升级。
    • 准备应用程序实例以进行滚动反弹,并确保将配置upgrade.from设置"0.10.0"为新版本 0.11.0.3
    • 将应用程序的每个实例退回一次
    • 为新部署的 1.0.2 应用程序实例做好第二轮滚动跳出的准备;确保删除 config 的值upgrade.from
    • 再次弹跳应用程序的每个实例以完成升级
  • 从0.10.0.x升级到1.0.0或1.0.1需要离线升级(不支持滚动弹跳升级)
    • 停止所有旧的(0.10.0.x)应用程序实例
    • 更新您的代码并将旧代码和 jar 文件替换为新代码和新 jar 文件
    • 重新启动所有新的(1.0.0 或 1.0.1)应用程序实例

从 0.8.x、0.9.x、0.10.0.x、0.10.1.x 或 0.10.2.x 升级到 0.11.0.0

Kafka 0.11.0.0 引入了新的消息格式版本以及有线协议更改。通过遵循下面推荐的滚动升级计划,您可以保证升级期间不会出现停机。但是,请在升级之前查看0.11.0.0 中的显着变化

从版本 0.10.2 开始,Java 客户端(生产者和消费者)已经获得了与旧代理进行通信的能力。版本 0.11.0 客户端可以与版本 0.10.0 或更高版本的代理进行通信。但是,如果您的代理低于 0.10.0,则必须在升级客户端之前升级 Kafka 集群中的所有代理。版本 0.11.0 代理支持 0.8.x 及更高版本的客户端。

对于滚动升级:

  1. 更新所有代理上的 server.properties 并添加以下属性。CURRENT_KAFKA_VERSION 是指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION 指当前使用的当前消息格式版本。如果您之前没有覆盖消息格式,则应将 CURRENT_MESSAGE_FORMAT_VERSION 设置为与 CURRENT_KAFKA_VERSION 匹配。
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如0.8.2、0.9.0、0.10.0、0.10.1或0.10.2)。
    • log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION(有关此配置的详细信息,请参阅升级后的潜在性能影响。)
  2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。
  3. 整个集群升级后,通过编辑并将inter.broker.protocol.version其设置为 0.11.0 来提升协议版本,但暂时不要更改log.message.format.version
  4. 一一重启broker,新协议版本即可生效。
  5. 一旦所有(或大多数)消费者升级到 0.11.0 或更高版本,然后在每个代理上将 log.message.format.version 更改为 0.11.0 并一一重新启动它们。请注意,旧的 Scala 消费者不支持新的消息格式,因此为了避免下转换的性能成本(或利用恰好一次语义),必须使用新的 Java 消费者。

附加升级说明:

  1. 如果您愿意接受停机,您可以简单地关闭所有代理,更新代码并重新启动它们。默认情况下,他们将从新协议开始。
  2. 升级代理后,可以随时更改协议版本并重新启动。它不一定是紧随其后。消息格式版本也类似。
  3. bin/kafka-topics.sh在更新全局设置之前,还可以使用主题管理工具 () 在各个主题上启用 0.11.0 消息格式log.message.format.version
  4. 如果您要从 0.10.0 之前的版本升级,则无需在切换到 0.11.0 之前先将消息格式更新到 0.10.0。

升级 0.10.2 Kafka Streams 应用程序

  • 将 Streams 应用程序从 0.10.2 升级到 0.11.0 不需要代理升级。Kafka Streams 0.11.0 应用程序可以连接到 0.11.0、0.10.2 和 0.10.1 代理(但无法连接到 0.10.0 代理)。
  • 如果您在配置中指定了customized key.serde,value.serdetimestamp.extractor,建议使用它们替换的配置参数,因为这些配置已被弃用。
  • 有关更多详细信息,请参阅0.11.0 中的 Streams API 更改。

升级 0.10.1 Kafka Streams 应用程序

  • 将 Streams 应用程序从 0.10.1 升级到 0.11.0 不需要代理升级。Kafka Streams 0.11.0 应用程序可以连接到 0.11.0、0.10.2 和 0.10.1 代理(但无法连接到 0.10.0 代理)。
  • 您需要重新编译您的代码。仅交换 Kafka Streams 库 jar 文件是行不通的,并且会破坏您的应用程序。
  • 如果您在配置中指定了customized key.serde,value.serdetimestamp.extractor,建议使用它们替换的配置参数,因为这些配置已被弃用。
  • 如果您使用自定义(即用户实现的)时间戳提取器,您将需要更新此代码,因为接口TimestampExtractor已更改。
  • 如果您注册自定义指标,则需要更新此代码,因为StreamsMetric界面已更改。
  • 有关更多详细信息,请参阅0.11.0 中的 Streams API 更改和 0.10.2 中的 Streams API 更改。

升级 0.10.0 Kafka Streams 应用程序

  • 将 Streams 应用程序从 0.10.0 升级到 0.11.0 确实需要代理升级,因为 Kafka Streams 0.11.0 应用程序只能连接到 0.11.0、0.10.2 或 0.10.1 代理。
  • 有一些 API 更改不向后兼容(请参阅0.11.0 中的 Streams API 更改、 0.10.2 中的 Streams API 更改和 0.10.1 中的 Streams API 更改以了解更多详细信息)。因此,您需要更新并重新编译代码。仅交换 Kafka Streams 库 jar 文件是行不通的,并且会破坏您的应用程序。
  • 从 0.10.0.x 升级到 0.11.0.3 需要两次滚动反弹,并upgrade.from="0.10.0"为第一个升级阶段设置配置(参见KIP-268)。作为替代方案,也可以进行离线升级。
    • 准备应用程序实例以进行滚动反弹,并确保将配置upgrade.from设置"0.10.0"为新版本 0.11.0.3
    • 将应用程序的每个实例退回一次
    • 准备新部署的 0.11.0.3 应用程序实例以进行第二轮滚动反弹;确保删除 config 的值upgrade.from
    • 再次弹跳应用程序的每个实例以完成升级
  • 从0.10.0.x升级到0.11.0.0、0.11.0.1或0.11.0.2需要离线升级(不支持滚动弹跳升级)
    • 停止所有旧的(0.10.0.x)应用程序实例
    • 更新您的代码并将旧代码和 jar 文件替换为新代码和新 jar 文件
    • 重新启动所有新的(0.11.0.0、0.11.0.1 或 0.11.0.2)应用程序实例

0.11.0.3 中的显着变化

  • 添加了新的 Kafka Streams 配置参数upgrade.from,允许从版本 0.10.0.x 滚动弹跳升级
  • 有关此新配置的详细信息, 请参阅Kafka Streams 升级指南。

0.11.0.0 中的显着变化

  • 现在默认禁用不干净的领导者选举。新的默认设置更注重持久性而不是可用性。希望保留以前行为的用户应将代理配置设置unclean.leader.election.enabletrue
  • 生产者配置block.on.buffer.fullmetadata.fetch.timeout.mstimeout.ms被删除。它们最初在 Kafka 0.9.0.0 中被弃用。
  • offsets.topic.replication.factor现在,在自动创建主题时强制执行代理配置。内部自动主题创建将失败并出现 GROUP_COORDINATOR_NOT_AVAILABLE 错误,直到集群大小满足此复制因子要求。
  • 使用 snappy 压缩数据时,生产者和代理将使用压缩方案的默认块大小 (2 x 32 KB) 而不是 1 KB,以提高压缩率。有报告称,使用较小块大小压缩的数据比使用较大块大小压缩时大 50%。对于快速的情况,具有 5000 个分区的生产者将需要额外的 315 MB JVM 堆。
  • 同样,当使用 gzip 压缩数据时,生产者和代理将使用 8 KB 而不是 1 KB 作为缓冲区大小。gzip 的默认值过低(512 字节)。
  • 代理配置max.message.bytes现在适用于一批消息的总大小。以前,该设置应用于批量压缩消息,或单独应用于非压缩消息。消息批可能仅包含单个消息,因此在大多数情况下,对单个消息的大小的限制仅通过批处理格式的开销来减少。然而,消息格式转换有一些微妙的含义(更多详细信息请参见下文)。另请注意,虽然以前代理将确保每个提取请求中至少返回一条消息(无论总提取大小和分区级别提取大小如何),但现在相同的行为适用于一批消息。
  • 默认启用 GC 日志轮换,详细信息请参阅 KAFKA-3754。
  • RecordMetadata、MetricName 和 Cluster 类已弃用的构造函数已被删除。
  • 通过提供用户标头读写访问的新标头接口添加了用户标头支持。
  • Headers headers()ProducerRecord 和 ConsumerRecord 通过方法调用公开新的 Headers API 。
  • 引入了 ExtendedSerializer 和 ExtendedDeserializer 接口来支持标头的序列化和反序列化。如果配置的序列化器和反序列化器不是上述类,则标头将被忽略。
  • group.initial.rebalance.delay.ms引入了新配置。GroupCoordinator此配置指定延迟初始消费者重新平衡的时间(以毫秒为单位) 。当新成员加入该组时,重新平衡将进一步延迟 的值group.initial.rebalance.delay.ms,最多为max.poll.interval.ms。默认值为 3 秒。在开发和测试期间,可能需要将其设置为 0,以免延迟测试执行时间。
  • org.apache.kafka.common.Cluster#partitionsForTopicpartitionsForNode并且availablePartitionsForTopic方法将返回一个空列表null(这被认为是一种不好的做法),以防所需主题的元数据不存在。
  • Streams API 配置参数timestamp.extractorkey.serdevalue.serde已弃用并分别替换为default.timestamp.extractordefault.key.serdedefault.value.serde
  • 对于 Java 使用者 API 中的偏移提交失败,当 的实例传递给提交回调commitAsync时,我们不再公开根本原因。 有关更多详细信息, RetriableCommitFailedException请参阅 KAFKA-5052 。

新协议版本

  • KIP-107:FetchRequest v5 引入了分区级log_start_offset字段。
  • KIP-107:FetchResponse v5 引入了分区级log_start_offset字段。
  • KIP-82header :ProduceRequest v3在消息协议中引入了一个数组,包含keyfield和valuefield。
  • KIP-82header :FetchResponse v5在消息协议中引入了一个数组,包含key字段和value字段。

关于 Exactly Once 语义的注释

Kafka 0.11.0 支持生产者中的幂等和事务功能。幂等传递可确保在单个生产者的生命周期内将消息恰好传递到特定主题分区一次。事务传递允许生产者将数据发送到多个分区,以便所有消息都成功传递,或者没有消息成功传递。这些功能共同实现了 Kafka 中的“恰好一次语义”。用户指南中提供了有关这些功能的更多详细信息,但下面我们添加了一些有关在升级的集群中启用这些功能的具体说明。请注意,启用 EoS 不是必需的,并且如果不使用,也不会影响代理的行为。

  1. 只有新的 Java 生产者和消费者支持 Exactly Once 语义。
  2. 这些功能主要取决于0.11.0 消息格式。尝试在旧格式上使用它们将导致版本不受支持的错误。
  3. 事务状态存储在新的内部主题中__transaction_state。直到第一次尝试使用事务请求 API 时才会创建此主题。与消费者偏移量主题类似,有几个设置可以控制主题的配置。例如,transaction.state.log.min.isr控制该主题的最小 ISR。有关选项的完整列表,请参阅用户指南中的配置部分。
  4. 对于安全集群,事务 API 需要新的 ACL,可以使用bin/kafka-acls.sh. 工具。
  5. Kafka 中的 EoS 引入了新的请求 API 并修改了多个现有 API。 有关完整详细信息,请参阅 KIP-98

关于 0.11.0 中新消息格式的说明

0.11.0 消息格式包括几个主要增强功能,以便支持生产者更好的传递语义(请参阅KIP-98)和改进的复制容错能力(请参阅KIP-101)。尽管新格式包含更多信息以使这些改进成为可能,但我们已经使批处理格式更加高效。只要每批次的消息数量超过 2,您就可以预期较低的总体开销。然而,对于较小的批次,可能会对性能产生较小的影响。请参阅此处,了解我们对新消息格式的初始性能分析的结果。您还可以在KIP-98提案中找到有关消息格式的更多详细信息 。

新消息格式的显着差异之一是,即使未压缩的消息也会作为单个批次存储在一起。这对代理配置有一些影响max.message.bytes,它限制了单个批次的大小。首先,如果较旧的客户端使用旧格式向主题分区生成消息,并且这些消息各自小于 max.message.bytes,则在上转换过程中将它们合并为单个批次后,代理可能仍会拒绝它们。通常,当单个消息的总大小大于 时,就会发生这种情况max.message.bytes。对于旧消费者读取从新格式向下转换的消息,也会产生类似的效果:如果提取大小未设置为至少与 max.message.bytes,即使单个未压缩消息小于配置的获取大小,消费者也可能无法取得进展。此行为不会影响 0.10.1.0 及更高版本的 Java 客户端,因为它使用更新的获取协议,该协议确保即使超过获取大小也可以返回至少一条消息。为了解决这些问题,您应该确保 1) 生产者的批量大小设置不大于max.message.bytes,2) 消费者的获取大小设置至少与 一样大max.message.bytes

大多数关于升级到 0.10.0 消息格式对性能影响的讨论 仍然与 0.11.0 升级有关。这主要影响未使用 TLS 保护的集群,因为在这种情况下“零复制”传输已经不可能。为了避免降频转换的成本,您应该确保消费者应用程序升级到最新的 0.11.0 客户端。值得注意的是,由于旧的消费者在 0.11.0.0 中已被弃用,因此它不支持新的消息格式。您必须升级才能使用新的消费者才能使用新的消息格式,而无需进行下转换的成本。请注意,0.11.0 消费者支持向后兼容 0.10.0 代理及向上版本,因此可以在代理之前先升级客户端。

从 0.8.x、0.9.x、0.10.0.x 或 0.10.1.x 升级到 0.10.2.0

0.10.2.0 进行了有线协议更改。通过遵循下面推荐的滚动升级计划,您可以保证升级期间不会出现停机。不过,请在升级之前查看0.10.2.0 中的显着变化

从版本 0.10.2 开始,Java 客户端(生产者和消费者)已经获得了与旧代理进行通信的能力。版本 0.10.2 客户端可以与版本 0.10.0 或更高版本的代理进行通信。但是,如果您的代理低于 0.10.0,则必须在升级客户端之前升级 Kafka 集群中的所有代理。版本 0.10.2 代理支持 0.8.x 及更高版本的客户端。

对于滚动升级:

  1. 更新所有代理上的 server.properties 文件并添加以下属性:
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如0.8.2、0.9.0、0.10.0或0.10.1)。
    • log.message.format.version=CURRENT_KAFKA_VERSION(有关此配置的详细信息, 请参阅升级后的潜在性能影响。)
  2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。
  3. 整个集群升级后,通过编辑 inter.broker.protocol.version 并将其设置为 0.10.2 来提升协议版本。
  4. 如果您以前的消息格式是 0.10.0,请将 log.message.format.version 更改为 0.10.2(这是无操作,因为 0.10.0、0.10.1 和 0.10.2 的消息格式相同)。如果您之前的消息格式版本低于 0.10.0,请不要更改 log.message.format.version - 只有在所有使用者升级到 0.10.0.0 或更高版本后,此参数才应更改。
  5. 一一重启broker,新协议版本即可生效。
  6. 如果此时 log.message.format.version 仍然低于 0.10.0,请等到所有消费者都升级到 0.10.0 或更高版本,然后在每个代理上将 log.message.format.version 更改为 0.10.2 并将它们一一重新启动。

注意:如果您愿意接受停机,您只需关闭所有代理,更新代码并启动所有代理即可。默认情况下,他们将从新协议开始。

注意:在代理升级后,可以随时更改协议版本并重新启动。它不一定是紧随其后。

升级 0.10.1 Kafka Streams 应用程序

  • 将 Streams 应用程序从 0.10.1 升级到 0.10.2 不需要代理升级。Kafka Streams 0.10.2 应用程序可以连接到 0.10.2 和 0.10.1 代理(但无法连接到 0.10.0 代理)。
  • 您需要重新编译您的代码。仅交换 Kafka Streams 库 jar 文件是行不通的,并且会破坏您的应用程序。
  • 如果您使用自定义(即用户实现的)时间戳提取器,您将需要更新此代码,因为接口TimestampExtractor已更改。
  • 如果您注册自定义指标,则需要更新此代码,因为StreamsMetric界面已更改。
  • 有关更多详细信息,请参阅0.10.2 中的 Streams API 更改。

升级 0.10.0 Kafka Streams 应用程序

  • 将 Streams 应用程序从 0.10.0 升级到 0.10.2 确实需要代理升级,因为 Kafka Streams 0.10.2 应用程序只能连接到 0.10.2 或 0.10.1 代理。
  • 有一些 API 更改不向后兼容(请参阅0.10.2 中的 Streams API 更改以了解更多详细信息)。因此,您需要更新并重新编译代码。仅交换 Kafka Streams 库 jar 文件是行不通的,并且会破坏您的应用程序。
  • 从 0.10.0.x 升级到 0.10.2.2 需要两次滚动反弹,并upgrade.from="0.10.0"为第一个升级阶段设置配置(参见KIP-268)。作为替代方案,也可以进行离线升级。
    • 准备应用程序实例以进行滚动反弹,并确保将配置upgrade.from设置"0.10.0"为新版本 0.10.2.2
    • 将应用程序的每个实例退回一次
    • 为新部署的 0.10.2.2 应用程序实例做好第二轮滚动反弹的准备;确保删除 config 的值upgrade.from
    • 再次弹跳应用程序的每个实例以完成升级
  • 从0.10.0.x升级到0.10.2.0或0.10.2.1需要离线升级(不支持滚动弹跳升级)
    • 停止所有旧的(0.10.0.x)应用程序实例
    • 更新您的代码并将旧代码和 jar 文件替换为新代码和新 jar 文件
    • 重新启动所有新的(0.10.2.0 或 0.10.2.1)应用程序实例

0.10.2.2 中的显着变化

  • 添加了新的配置参数upgrade.from,允许从版本 0.10.0.x 滚动弹跳升级

0.10.2.1 中的显着变化

  • StreamsConfig 类的两个配置的默认值已更改,以提高 Kafka Streams 应用程序的弹性。内部 Kafka Streams 生产者retries默认值从 0 更改为 10。内部 Kafka Streams 消费者max.poll.interval.ms 默认值从 300000 更改为Integer.MAX_VALUE

0.10.2.0 中的显着变化

  • Java 客户端(生产者和消费者)已经获得了与旧代理进行通信的能力。版本 0.10.2 客户端可以与版本 0.10.0 或更高版本的代理进行通信。请注意,使用较旧的代理时,某些功能不可用或受到限制。
  • InterruptException如果调用线程被中断,Java 使用者上的多个方法现在可能会抛出异常。请参阅KafkaConsumerJavadoc 以获取有关此更改的更深入的说明。
  • Java 消费者现在正常关闭。默认情况下,使用者最多等待 30 秒才能完成待处理的请求。添加了一个带有超时的新关闭 APIKafkaConsumer以控制最大等待时间。
  • 可以通过 --whitelist 选项将多个以逗号分隔的正则表达式传递给新 Java 使用者的 MirrorMaker。这使得使用旧的 Scala 消费者时的行为与 MirrorMaker 一致。
  • 将 Streams 应用程序从 0.10.1 升级到 0.10.2 不需要代理升级。Kafka Streams 0.10.2 应用程序可以连接到 0.10.2 和 0.10.1 代理(但无法连接到 0.10.0 代理)。
  • Zookeeper 依赖项已从 Streams API 中删除。Streams API现在使用Kafka协议来管理内部主题,而不是直接修改Zookeeper。这消除了直接访问 Zookeeper 的权限的需要,并且不应再在 Streams 应用程序中设置“StreamsConfig.ZOOKEEPER_CONFIG”。如果 Kafka 集群受到保护,Streams 应用程序必须具有创建新主题所需的安全权限。
  • StreamsConfig 类中添加了几个新字段,包括“security.protocol”、“connections.max.idle.ms”、“retry.backoff.ms”、“reconnect.backoff.ms”和“request.timeout.ms”。用户应注意默认值并根据需要进行设置。更多详细信息请参考3.5 Kafka Streams配置

新协议版本

  • KIP-88topics :如果数组设置为 ,OffsetFetchRequest v2 支持检索所有主题的偏移量null
  • KIP-88:OffsetFetchResponse v2 引入了一个顶级error_code字段。
  • KIP-103:UpdateMetadataRequest v3listener_name向数组元素引入了一个字段end_points
  • KIP-108:CreateTopicsRequest v1 引入了一个validate_only字段。
  • KIP-108:CreateTopicsResponse v1error_message向数组元素引入了一个字段topic_errors

从 0.8.x、0.9.x 或 0.10.0.X 升级到 0.10.1.0

0.10.1.0 进行了有线协议更改。通过遵循下面推荐的滚动升级计划,您可以保证升级期间不会出现停机。但是,请在升级之前注意0.10.1.0 中的潜在重大更改
注意:由于引入了新协议,因此在升级客户端之前升级 Kafka 集群非常重要(即 0.10.1.x 客户端仅支持 0.10.1.x 或更高版本的代理,而 0.10.1.x 代理也支持旧客户端) 。

对于滚动升级:

  1. 更新所有代理上的 server.properties 文件并添加以下属性:
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如0.8.2.0、0.9.0.0或0.10.0.0)。
    • log.message.format.version=CURRENT_KAFKA_VERSION(有关此配置的详细信息, 请参阅升级后的潜在性能影响。)
  2. 一次升级一个代理:关闭代理,更新代码,然后重新启动。
  3. 整个集群升级后,通过编辑 inter.broker.protocol.version 并将其设置为 0.10.1.0 来提升协议版本。
  4. 如果您以前的消息格式是 0.10.0,请将 log.message.format.version 更改为 0.10.1(这是无操作,因为 0.10.0 和 0.10.1 的消息格式相同)。如果您之前的消息格式版本低于 0.10.0,请不要更改 log.message.format.version - 只有在所有使用者升级到 0.10.0.0 或更高版本后,此参数才应更改。
  5. 一一重启broker,新协议版本即可生效。
  6. 如果此时 log.message.format.version 仍然低于 0.10.0,请等到所有消费者都升级到 0.10.0 或更高版本,然后在每个代理上将 log.message.format.version 更改为 0.10.1 并将它们一一重新启动。

注意:如果您愿意接受停机,您只需关闭所有代理,更新代码并启动所有代理即可。默认情况下,他们将从新协议开始。

注意:在代理升级后,可以随时更改协议版本并重新启动。它不一定是紧随其后。

0.10.1.2 中的显着变化

  • 添加了新的配置参数upgrade.from,允许从版本 0.10.0.x 滚动弹跳升级

0.10.1.0 中潜在的重大变化

  • 日志保留时间不再基于日志段的最后修改时间。相反,它将基于日志段中消息的最大时间戳。
  • 日志滚动时间不再取决于日志段创建时间。相反,它现在基于消息中的时间戳。进一步来说。如果段中第一条消息的时间戳为T,则当新消息的时间戳大于或等于T + log.roll.ms时,日志将被转出
  • 由于每个段增加了时间索引文件,0.10.0 的打开文件处理程序将增加约 33%。
  • 时间索引和偏移索引共享相同的索引大小配置。由于每次索引条目的大小是偏移索引条目大小的 1.5 倍。用户可能需要增加 log.index.size.max.bytes 以避免潜在的频繁日志滚动。
  • 由于索引文件数量的增加,在一些日志段数量较大(例如>15K)的broker上,broker启动期间的日志加载过程可能会更长。根据我们的实验,将 num.recovery.threads.per.data.dir 设置为 1 可能会减少日志加载时间。

升级 0.10.0 Kafka Streams 应用程序

  • 将 Streams 应用程序从 0.10.0 升级到 0.10.1 确实需要代理升级,因为 Kafka Streams 0.10.1 应用程序只能连接到 0.10.1 代理。
  • 有一些 API 更改不向后兼容(请参阅0.10.1 中的 Streams API 更改以了解更多详细信息)。因此,您需要更新并重新编译代码。仅交换 Kafka Streams 库 jar 文件是行不通的,并且会破坏您的应用程序。
  • 从 0.10.0.x 升级到 0.10.1.2 需要两次滚动反弹,并upgrade.from="0.10.0"为第一个升级阶段设置配置(参见KIP-268)。作为替代方案,也可以进行离线升级。
    • 准备应用程序实例以进行滚动反弹,并确保将配置upgrade.from设置"0.10.0"为新版本 0.10.1.2
    • 将应用程序的每个实例退回一次
    • 准备新部署的 0.10.1.2 应用程序实例以进行第二轮滚动反弹;确保删除 config 的值upgrade.from
    • 再次弹跳应用程序的每个实例以完成升级
  • 从0.10.0.x升级到0.10.1.0或0.10.1.1需要离线升级(不支持滚动弹跳升级)
    • 停止所有旧的(0.10.0.x)应用程序实例
    • 更新您的代码并将旧代码和 jar 文件替换为新代码和新 jar 文件
    • 重新启动所有新的(0.10.1.0 或 0.10.1.1)应用程序实例

0.10.1.0 中的显着变化

  • 新的 Java 消费者不再处于测试阶段,我们建议将其用于所有新开发。旧的 Scala 使用者仍然受支持,但它们将在下一个版本中弃用,并在未来的主要版本中删除。
  • 在新消费者中使用 MirrorMaker 和 Console Consumer 等工具时不再需要--new-consumer/开关;--new.consumer只需要通过 Kafka 代理来连接即可,而不是通过 ZooKeeper 整体进行连接。此外,控制台消费者与旧消费者的使用已被弃用,并将在未来的主要版本中删除。
  • Kafka 集群现在可以通过集群 ID 来唯一标识。当broker升级到0.10.1.0时,它将自动生成。集群 ID 可通过 kafka.server:type=KafkaServer,name=ClusterId 指标获得,它是元数据响应的一部分。序列化器、客户端拦截器和指标报告器可以通过实现 ClusterResourceListener 接口来接收集群 ID。
  • BrokerState“RunningAsController”(值 4)已被删除。由于存在错误,代理在退出该状态之前只会短暂处于此状态,因此删除的影响应该很小。检测给定代理是否是控制器的推荐方法是通过 kafka.controller:type=KafkaController,name=ActiveControllerCount 指标。
  • 新的 Java Consumer 现在允许用户按分区上的时间戳搜索偏移量。
  • 新的 Java Consumer 现在支持后台线程的心跳。有一个新配置 max.poll.interval.ms可以控制消费者主动离开组之前轮询调用之间的最长时间(默认情况下为 5 分钟)。配置值 request.timeout.ms(默认为 30 秒)必须始终小于max.poll.interval.ms(默认为 5 分钟),因为这是消费者重新平衡时 JoinGroup 请求可以在服务器上阻塞的最长时间。最后,默认值session.timeout.ms调整为10秒,默认值max.poll.records改为500。
  • 当使用授权者并且用户没有主题的描述授权时,代理将不再向请求返回 TOPIC_AUTHORIZATION_FAILED 错误,因为这会泄漏主题名称相反,将返回 UNKNOWN_TOPIC_OR_PARTITION 错误代码。在使用生产者和消费者时,这可能会导致意外超时或延迟,因为 Kafka 客户端通常会针对未知主题错误自动重试。如果您怀疑可能发生这种情况,您应该查阅客户端日志。
  • 默认情况下,获取响应有大小限制(消费者为 50 MB,复制为 10 MB)。现有的每个分区限制也适用(使用者和复制为 1 MB)。请注意,这些限制都不是绝对最大值,如下一点所述。
  • 如果发现大于响应/分区大小限制的消息,使用者和副本可以取得进展。更具体地说,如果提取的第一个非空分区中的第一条消息大于其中一个或两个限制,则仍将返回该消息。
  • 添加了重载的构造函数kafka.api.FetchRequest,并kafka.javaapi.FetchRequest允许调用者指定分区的顺序(因为顺序在 v3 中很重要)。以前存在的构造函数已被弃用,并且在发送请求之前会对分区进行洗牌以避免饥饿问题。

新协议版本

  • ListOffsetRequest v1支持基于时间戳的精确偏移搜索。
  • MetadataResponse v2 引入了一个新字段:“cluster_id”。
  • FetchRequest v3 支持限制响应大小(除了现有的每个分区限制),如果需要取得进展,它会返回大于限制的消息,并且请求中的分区顺序现在很重要。
  • JoinGroup v1 引入了一个新字段:“rebalance_timeout”。

从 0.8.x 或 0.9.x 升级到 0.10.0.0

0.10.0.0 具有潜在的重大更改(请在升级前查看),并且 升级后可能会影响性能。通过遵循下面推荐的滚动升级计划,您可以保证升级期间和升级后不会出现停机且不会影响性能。
注意:由于引入了新协议,因此在升级客户端之前升级 Kafka 集群非常重要。

版本 0.9.0.0 的客户端注意事项:由于 0.9.0.0 中引入的错误,依赖 ZooKeeper(旧的 Scala 高级 Consumer 和 MirrorMaker,如果与旧的 Consumer 一起使用)的客户端将无法与 0.10.0.x 代理一起使用。因此,在代理升级到0.10.0.x之前, 0.9.0.0客户端应升级到0.9.0.1。对于 0.8.X 或 0.9.0.1 客户端,无需执行此步骤。

对于滚动升级:

  1. 更新所有代理上的 server.properties 文件并添加以下属性:
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION(例如0.8.2或0.9.0.0)。
    • log.message.format.version=CURRENT_KAFKA_VERSION(有关此配置的详细信息, 请参阅升级后的潜在性能影响。)
  2. 升级broker。只需将代理关闭、更新代码并重新启动即可一次完成此操作。
  3. 整个集群升级后,通过编辑 inter.broker.protocol.version 并将其设置为 0.10.0.0 来提升协议版本。注意:您还不应该触摸 log.message.format.version - 只有在所有使用者都升级到 0.10.0.0 后,此参数才应更改
  4. 一一重启broker,新协议版本即可生效。
  5. 所有消费者升级到 0.10.0 后,将每个代理上的 log.message.format.version 更改为 0.10.0 并一一重新启动它们。

注意:如果您愿意接受停机,您只需关闭所有代理,更新代码并启动所有代理即可。默认情况下,他们将从新协议开始。

注意:在代理升级后,可以随时更改协议版本并重新启动。它不一定是紧随其后。

升级到 0.10.0.0 后的潜在性能影响

0.10.0 中的消息格式包括一个新的时间戳字段,并使用压缩消息的相对偏移量。磁盘消息格式可以通过 server.properties 文件中的 log.message.format.version 配置。默认磁盘消息格式为 0.10.0。如果消费者客户端使用的是 0.10.0.0 之前的版本,则它只能理解 0.10.0 之前的消息格式。在这种情况下,代理能够将消息从 0.10.0 格式转换为较早的格式,然后再将响应发送给旧版本的使用者。但是,在这种情况下,经纪商不能使用零拷贝传输。Kafka 社区关于性能影响的报告显示,升级后 CPU 利用率从之前的 20% 上升到 100%,这迫使所有客户端立即升级以使性能恢复正常。为了避免在消费者升级到 0.10.0.0 之前发生此类消息转换,可以在将代理升级到 0.10.0.0 时将 log.message.format.version 设置为 0.8.2 或 0.9.0。这样,broker仍然可以使用零拷贝传输将数据发送给旧的消费者。一旦消费者升级,就可以在代理上将消息格式更改为 0.10.0,并享受包括新时间戳和改进压缩的新消息格式。支持转换是为了确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已升级但大多数客户端尚未升级时,尽可能避免消息转换至关重要。将代理升级到 0.10.0.0 时,将 message.format.version 更改为 0.8.2 或 0.9.0。这样,broker仍然可以使用零拷贝传输将数据发送给旧的消费者。一旦消费者升级,就可以在代理上将消息格式更改为 0.10.0,并享受包括新时间戳和改进压缩的新消息格式。支持转换是为了确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已升级但大多数客户端尚未升级时,尽可能避免消息转换至关重要。将代理升级到 0.10.0.0 时,将 message.format.version 更改为 0.8.2 或 0.9.0。这样,broker仍然可以使用零拷贝传输将数据发送给旧的消费者。一旦消费者升级,就可以在代理上将消息格式更改为 0.10.0,并享受包括新时间戳和改进压缩的新消息格式。支持转换是为了确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已升级但大多数客户端尚未升级时,尽可能避免消息转换至关重要。Broker 仍然可以使用零拷贝传输将数据发送给旧消费者。一旦消费者升级,就可以在代理上将消息格式更改为 0.10.0,并享受包括新时间戳和改进压缩的新消息格式。支持转换是为了确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已升级但大多数客户端尚未升级时,尽可能避免消息转换至关重要。Broker 仍然可以使用零拷贝传输将数据发送给旧消费者。一旦消费者升级,就可以在代理上将消息格式更改为 0.10.0,并享受包括新时间戳和改进压缩的新消息格式。支持转换是为了确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已升级但大多数客户端尚未升级时,尽可能避免消息转换至关重要。支持转换是为了确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已升级但大多数客户端尚未升级时,尽可能避免消息转换至关重要。支持转换是为了确保兼容性,并且对于支持一些尚未更新到较新客户端的应用程序很有用,但即使在过度配置的集群上支持所有消费者流量也是不切实际的。因此,当代理已升级但大多数客户端尚未升级时,尽可能避免消息转换至关重要。

对于升级到 0.10.0.0 的客户端,不会产生性能影响。

注意:通过设置消息格式版本,可以证明所有现有消息均等于或低于该消息格式版本。否则 0.10.0.0 之前的消费者可能会崩溃。特别是,在消息格式设置为 0.10.0 后,不应将其更改回较早的格式,因为这可能会破坏 0.10.0.0 之前版本的消费者。

注意:由于每条消息中引入了额外的时间戳,发送小消息的生产者可能会因为开销增加而看到消息吞吐量下降。同样,复制现在每条消息额外传输 8 个字节。如果您的运行接近集群的网络容量,则可能会导致网卡不堪重负,并因过载而出现故障和性能问题。

注意:如果您在生产者上启用了压缩,您可能会注意到在某些情况下生产者吞吐量降低和/或代理上的压缩率降低。接收压缩消息时,0.10.0 代理会避免重新压缩消息,这通常会减少延迟并提高吞吐量。然而,在某些情况下,这可能会减少生产者的批处理大小,从而导致吞吐量下降。如果发生这种情况,用户可以调整生产者的linger.ms和batch.size以获得更好的吞吐量。此外,snappy 用于压缩消息的生产者缓冲区小于代理使用的缓冲区,这可能会对磁盘上​​消息的压缩率产生负面影响。我们打算在未来的 Kafka 版本中对此进行配置。

0.10.0.0 中潜在的重大变化

  • 从Kafka 0.10.0.0开始,Kafka中的消息格式版本以Kafka版本表示。例如,消息格式0.9.0是指Kafka 0.9.0支持的最高消息版本。
  • 已引入消息格式 0.10.0,并且默认使用该格式。它在消息中包含时间戳字段,并且相对偏移量用于压缩消息。
  • 引入ProduceRequest/Response v2,默认支持消息格式0.10.0
  • 引入FetchRequest/Response v2,默认支持消息格式0.10.0
  • MessageFormatter 接口已更改def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)为 def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)
  • MessageReader 接口已更改def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]为 def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
  • MessageFormatter 的包已更改kafka.toolskafka.common
  • MessageReader 的包已更改kafka.toolskafka.common
  • MirrorMakerMessageHandler 不再公开该handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])方法,因为它从未被调用。
  • 0.7 KafkaMigrationTool 不再与 Kafka 一起打包。如果您需要从0.7迁移到0.10.0,请先迁移到0.8,然后按照记录的升级流程从0.8升级到0.10.0。
  • 新的消费者已标准化其 API,以接受java.util.Collection作为方法参数的序列类型。现有代码可能需要更新才能与 0.10.0 客户端库一起使用。
  • LZ4 压缩消息处理已更改为使用可互操作的帧规范 (LZ4f v1.5.1)。为了保持与旧客户端的兼容性,此更改仅适用于消息格式 0.10.0 及更高版本。使用 v0/v1(消息格式 0.9.0)生成/获取 LZ4 压缩消息的客户端应继续使用 0.9.0 帧实现。使用 Produce/Fetch 协议 v2 或更高版本的客户端应使用可互操作的 LZ4f 帧。可互操作的 LZ4 库列表位于https://www.lz4.org/

0.10.0.0 中的显着变化

  • 从 Kafka 0.10.0.0 开始,一个名为Kafka Streams的新客户端库可用于对 Kafka 主题中存储的数据进行流处理。由于上述消息格式的变化,这个新的客户端库仅适用于 0.10.x 及更高版本的代理。有关更多信息,请阅读Streams 文档
  • receive.buffer.bytes对于新消费者来说,配置参数的默认值现在是 64K。
  • 新的消费者现在公开配置参数exclude.internal.topics以限制内部主题(例如消费者偏移主题)意外地包含在正则表达式订阅中。默认情况下,它是启用的。
  • 旧的 Scala 生成器已被弃用。用户应尽快将其代码迁移到 kafka-clients JAR 中包含的 Java 生产者。
  • 新的消费者 API 已标记为稳定。

从 0.8.0、0.8.1.X 或 0.8.2.X 升级到 0.9.0.0

0.9.0.0 具有潜在的重大更改(请在升级前查看)以及与先前版本相比的经纪商间协议更改。这意味着升级后的经纪商和客户端可能与旧版本不兼容。在升级客户端之前升级 Kafka 集群非常重要。如果您使用 MirrorMaker,下游集群也应该首先升级。

对于滚动升级:

  1. 更新所有代理上的 server.properties 文件并添加以下属性: inter.broker.protocol.version=0.8.2.X
  2. 升级broker。只需将代理关闭、更新代码并重新启动即可一次完成此操作。
  3. 整个集群升级后,通过编辑 inter.broker.protocol.version 并将其设置为 0.9.0.0 来提升协议版本。
  4. 一一重启broker,新协议版本生效

注意:如果您愿意接受停机,您只需关闭所有代理,更新代码并启动所有代理即可。默认情况下,他们将从新协议开始。

注意:在代理升级后,可以随时更改协议版本并重新启动。它不一定是紧随其后。

0.9.0.0 中潜在的重大变化

  • 不再支持 Java 1.6。
  • 不再支持 Scala 2.9。
  • 1000 以上的经纪商 ID 现在默认保留为自动分配的经纪商 ID。如果您的集群的现有代理 ID 高于该阈值,请确保相应地增加served.broker.max.id 代理配置属性。
  • 配置参数replica.lag.max.messages已删除。分区领导者在决定哪些副本同步时将不再考虑滞后消息的数量。
  • 配置参数replica.lag.time.max.ms现在不仅指自上次从副本获取请求以来经过的时间,还指自副本上次赶上以来的时间。仍在从领导者获取消息但未赶上replica.lag.time.max.ms 中最新消息的副本将被视为不同步。
  • 压缩主题不再接受没有密钥的消息,如果尝试这样做,生产者会抛出异常。在 0.8.x 中,没有 key 的消息将导致日志压缩线程随后抱怨并退出(并停止压缩所有压缩主题)。
  • MirrorMaker 不再支持多个目标集群。因此,它只接受单个 --consumer.config 参数。要镜像多个源集群,每个源集群至少需要一个 MirrorMaker 实例,每个实例都有自己的使用者配置。
  • 打包在org.apache.kafka.clients.tools.*下的工具已移至org.apache.kafka.tools.*。所有包含的脚本仍将照常运行,只有直接导入这些类的自定义代码才会受到影响。
  • kafka-run-class.sh 中的默认 Kafka JVM 性能选项 (KAFKA_JVM_PERFORMANCE_OPTS) 已更改。
  • kafka-topics.sh 脚本 (kafka.admin.TopicCommand) 现在在失败时以非零退出代码退出。
  • 当主题名称因使用“.”而存在指标冲突风险时,kafka-topics.sh 脚本 (kafka.admin.TopicCommand) 现在将打印警告 或主题名称中的“_”,并且在实际冲突的情况下出错。
  • kafka-console- Producer.sh脚本(kafka.tools.ConsoleProducer)将默认使用Java生产者而不是旧的Scala生产者,用户必须指定“old- Producer”才能使用旧生产者。
  • 默认情况下,所有命令行工具都会将所有日志消息打印到 stderr 而不是 stdout。

0.9.0.1 中的显着变化

  • 可以通过将broker.id. Generation.enable 设置为 false 来禁用新的代理 ID 生成功能。
  • 配置参数 log.cleaner.enable 现在默认为 true。这意味着具有 cleanup.policy=compact 的主题现在将默认进行压缩,并且 128 MB 的堆将通过 log.cleaner.dedupe.buffer.size 分配给清理进程。您可能需要根据压缩主题的使用情况查看 log.cleaner.dedupe.buffer.size 和其他 log.cleaner 配置值。
  • 新消费者的配置参数 fetch.min.bytes 的默认值现在默认为 1。

0.9.0.0 中的弃用

  • 已弃用从 kafka-topics.sh 脚本 (kafka.admin.TopicCommand) 更改主题配置。今后,请使用 kafka-configs.sh 脚本 (kafka.admin.ConfigCommand) 来实现此功能。
  • kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) 已被弃用。今后,请使用 kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) 来实现此功能。
  • kafka.tools.ProducerPerformance 类已被弃用。今后,请使用 org.apache.kafka.tools.ProducerPerformance 来实现此功能(kafka- Producer-perf-test.sh 也将更改为使用新类)。
  • 生产者配置 block.on.buffer.full 已被弃用,并将在未来版本中删除。目前其默认值已更改为 false。KafkaProducer将不再抛出BufferExhaustedException,而是使用max.block.ms值进行阻塞,之后它将抛出TimeoutException。如果 block.on.buffer.full 属性显式设置为 true,则会将 max.block.ms 设置为 Long.MAX_VALUE,并且metadata.fetch.timeout.ms 将不会被遵守

从0.8.1升级到0.8.2

0.8.2 与 0.8.1 完全兼容。只需将其关闭、更新代码并重新启动即可一次对一个代理进行升级。

从0.8.0升级到0.8.1

0.8.1 与 0.8 完全兼容。只需将其关闭、更新代码并重新启动即可一次对一个代理进行升级。

从0.7升级

版本 0.7 与较新的版本不兼容。为了添加复制(0.7 中缺少),对 API、ZooKeeper 数据结构、协议和配置进行了重大更改。从0.7升级到更高版本需要专门的迁移工具。此迁移无需停机即可完成。



回到顶部