跳转至

8.Kafka连接

8.1 概述

Kafka Connect 是一种在 Apache Kafka 和其他系统之间可扩展且可靠地传输数据的工具。它使得快速定义将大量数据移入和移出 Kafka 的连接器变得简单。Kafka Connect 可以摄取整个数据库或将所有应用程序服务器的指标收集到 Kafka 主题中,从而使数据可用于低延迟的流处理。导出作业可以将 Kafka 主题中的数据传输到辅助存储和查询系统或批处理系统中以进行离线分析。

Kafka连接功能包括:

  • Kafka 连接器的通用框架 - Kafka Connect 标准化了其他数据系统与 Kafka 的集成,简化了连接器的开发、部署和管理
  • 分布式和独立模式 - 扩展到支持整个组织的大型集中管理服务,或缩小到开发、测试和小型生产部署
  • REST 接口 - 通过易于使用的 REST API 提交和管理到 Kafka Connect 集群的连接器
  • 自动偏移管理 - 只需来自连接器的少量信息,Kafka Connect 就可以自动管理偏移提交过程,因此连接器开发人员无需担心连接器开发中这个容易出错的部分
  • 默认情况下是分布式且可扩展的 - Kafka Connect 构建在现有的组管理协议之上。可以添加更多工作人员来扩展 Kafka Connect 集群。
  • Streaming/batch 集成 - 利用 Kafka 的现有功能,Kafka Connect 是桥接流和批数据系统的理想解决方案

8.2 用户指南

快速入门提供了如何运行独立版本的 Kafka Connect 的简短示例。本节更详细地介绍如何配置、运行和管理 Kafka Connect。

运行 Kafka 连接

Kafka Connect目前支持两种执行模式:独立(单进程)和分布式。

在独立模式下,所有工作都在单个进程中执行。此配置更易于设置和上手,并且在只有一个工作人员有意义的情况下可能很有用(例如收集日志文件),但它无法从 Kafka Connect 的某些功能(例如容错)中受益。您可以使用以下命令启动独立进程:

> bin/connect-standalone.sh config/connect-standalone.properties [connector1.propertiesconnector2.properties ...]

第一个参数是工作线程的配置。这包括 Kafka 连接参数、序列化格式以及提交偏移量的频率等设置。提供的示例应该适用于使用 提供的默认配置运行的本地集群config/server.properties。需要进行调整才能与不同的配置或生产部署一起使用。所有工作人员(独立的和分布式的)都需要一些配置:

  • bootstrap.servers - 用于引导与 Kafka 的连接的 Kafka 服务器列表
  • key.converter - 转换器类,用于在 Kafka Connect 格式和写入 Kafka 的序列化形式之间进行转换。这控制写入 Kafka 或从 Kafka 读取的消息中键的格式,并且由于它独立于连接器,因此允许任何连接器使用任何序列化格式。常见格式的示例包括 JSON 和 Avro。
  • value.converter - 转换器类,用于在 Kafka Connect 格式和写入 Kafka 的序列化形式之间进行转换。这控制写入 Kafka 或从 Kafka 读取的消息中的值的格式,并且由于它独立于连接器,因此允许任何连接器使用任何序列化格式。常见格式的示例包括 JSON 和 Avro。
  • plugin.path(默认empty) - 包含 Connect 插件(连接器、转换器、转换)的路径列表。在运行快速启动之前,用户必须添加包含打包在 中的示例 FileStreamSourceConnector 和 FileStreamSinkConnector 的绝对路径connect-file-"version".jar,因为默认情况下这些连接器不包含在Connect Worker 中(请参阅CLASSPATHplugin.path 属性以获取示例)。

独立模式特有的重要配置选项是:

  • offset.storage.file.filename - 存储源连接器偏移的文件

此处配置的参数适用于 Kafka Connect 使用的生产者和消费者来访问配置、偏移量和状态主题。配置Kafka Source任务使用的生产者和Kafka Sink任务使用的消费者,可以使用相同的参数,但需要分别加上producer.和前缀consumer.。从工作配置中继承的唯一不带前缀的 Kafka 客户端参数是bootstrap.servers,在大多数情况下这就足够了,因为同一个集群通常用于所有目的。一个值得注意的例外是安全集群,它需要额外的参数来允许连接。这些参数需要在工作配置中设置最多三次,一次用于管理访问,一次用于 Kafka 源,一次用于 Kafka 接收器。

从 2.3.0 开始,可以分别使用 Kafka 源或 Kafka 接收器的前缀producer.override.consumer.override.为每个连接器单独配置客户端配置覆盖。这些覆盖包含在连接器的其余配置属性中。

其余参数是连接器配置文件。您可以包含任意数量的内容,但所有内容都将在同一进程中(在不同的线程上)执行。您还可以选择不在命令行上指定任何连接器配置文件,而是使用 REST API 在独立工作线程启动后在运行时创建连接器。

分布式模式处理工作的自动平衡,允许您动态扩展(或缩小),并在活动任务以及配置和偏移提交数据中提供容错能力。执行与独立模式非常相似:

> bin/connect-distributed.sh config/connect-distributed.properties

区别在于启动的类和配置参数,它们改变了 Kafka Connect 进程决定在哪里存储配置、如何分配工作以及在哪里存储偏移量和任务状态的方式。在分布式模式下,Kafka Connect 将偏移量、配置和任务状态存储在 Kafka 主题中。建议手动创建偏移量、配置和状态的主题,以获得所需的分区数量和复制因子。如果启动 Kafka Connect 时尚未创建主题,则会使用默认的分区数和复制因子自动创建主题,这可能不太适合其使用。

特别是,除了上面提到的常见设置之外,在启动集群之前设置以下配置参数也很重要:

  • group.id(默认connect-cluster)- 集群的唯一名称,用于形成 Connect 集群组;请注意,这不能与消费者组 ID冲突
  • config.storage.topic(默认connect-configs)- 用于存储连接器和任务配置的主题;请注意,这应该是单个分区、高度复制、压缩的主题。您可能需要手动创建主题以确保正确配置,因为自动创建的主题可能有多个分区或自动配置为删除而不是压缩
  • offset.storage.topic(默认connect-offsets)- 用于存储偏移量的主题;该主题应该有许多分区,可以复制,并配置为压缩
  • status.storage.topic(默认connect-status)- 用于存储状态的主题;该主题可以有多个分区,并且应该复制并配置为压缩

请注意,在分布式模式下,连接器配置不会在命令行上传递。相反,请使用下面描述的 REST API 来创建、修改和销毁连接器。

配置连接器

连接器配置是简单的键值映射。在独立模式和分布式模式下,它们都包含在创建(或修改)连接器的 REST 请求的 JSON 有效负载中。在独立模式下,这些也可以在属性文件中定义并传递到命令行上的 Connect 进程。

大多数配置都依赖于连接器,因此此处无法概述。但是,有一些常见的选项:

  • name - 连接器的唯一名称。尝试使用相同名称再次注册将会失败。
  • connector.class - 连接器的 Java 类
  • tasks.max - 应为此连接器创建的最大任务数。如果连接器无法实现这种并行级别,它可能会创建更少的任务。
  • key.converter -(可选)覆盖工作人员设置的默认密钥转换器。
  • value.converter -(可选)覆盖工作人员设置的默认值转换器。

connector.class配置支持多种格式:该连接器的类的全名或别名。如果连接器是 org.apache.kafka.connect.file.FileStreamSinkConnector,您可以指定此全名,也可以使用 FileStreamSink 或 FileStreamSinkConnector 使配置更短一些。

接收器连接器还有一些附加选项来控制其输入。每个接收器连接器必须设置以下其中一项:

  • topics - 用作此连接器输入的以逗号分隔的主题列表
  • topics.regex - 用作此连接器输入的主题的 Java 正则表达式

对于任何其他选项,您应该查阅连接器的文档。

转换

连接器可以配置转换以进行轻量级的一次消息修改。它们可以方便地进行数据处理和事件路由。

可以在连接器配置中指定转换链。

  • transforms - 转换的别名列表,指定转换的应用顺序。
  • transforms.$alias.type - 转换的完全限定类名称。
  • transforms.$alias.$transformationSpecificConfig转换的配置属性

例如,让我们采用内置文件源连接器并使用转换来添加静态字段。

在整个示例中,我们将使用无模式 JSON 数据格式。为了使用无模式格式,我们将以下两行connect-standalone.properties从 true 更改为 false:

key.converter.schemas.enable
value.converter.schemas.enable

文件源连接器将每一行读取为字符串。我们将把每一行包装在一个 Map 中,然后添加第二个字段来标识事件的起源。为此,我们使用两种转换:

  • HoistField将输入线放置在地图内
  • InsertField添加静态字段。在此示例中,我们将指示记录来自文件连接器

添加转换后,connect-file-source.properties文件如下所示:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

所有以 开头的行transforms都是为了转换而添加的。您可以看到我们创建的两个转换:“InsertSource”和“MakeMap”是我们选择提供转换的别名。转换类型基于您可以在下面看到的内置转换列表。每个转换类型都有附加配置:HoistField 需要一个名为“field”的配置,它是映射中包含文件中原始字符串的字段的名称。InsertField 转换让我们指定字段名称和要添加的值。

当我们在不进行转换的情况下在示例文件上运行文件源连接器,然后使用 读取它们时kafka-console-consumer.sh,结果是:

"foo"
"bar"
"hello world"

然后,我们在将转换添加到配置文件之后创建一个新的文件连接器。这次,结果将是:

{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}

您可以看到我们读取的行现在是 JSON 映射的一部分,并且有一个带有我们指定的静态值的额外字段。这只是您可以使用转换执行的操作的示例之一。

包含的转换

Kafka Connect 包含多种广泛适用的数据和路由转换:

  • InsertField - 使用静态数据或记录元数据添加字段
  • ReplaceField - 过滤或重命名字段
  • MaskField - 将字段替换为类型的有效空值(0、空字符串等)或自定义替换(仅限非空字符串或数值)
  • ValueToKey - 将记录键替换为由记录值中的字段子集形成的新键
  • HoistField - 将整个事件包装为结构或映射中的单个字段
  • ExtractField - 从 Struct 和 Map 中提取特定字段并在结果中仅包含该字段
  • SetSchemaMetadata - 修改模式名称或版本
  • TimestampRouter - 根据原始主题和时间戳修改记录的主题。当使用需要根据时间戳写入不同表或索引的接收器时很有用
  • RegexRouter - 根据原始主题、替换字符串和正则表达式修改记录的主题
  • Filter - 从所有进一步处理中删除消息。这与谓词一起使用来选择性地过滤某些消息。
  • InsertHeader - 使用静态数据添加标题
  • HeadersFrom - 将键或值中的字段复制或移动到记录标题
  • DropHeaders - 按名称删除标头

下面列出了如何配置每个转换的详细信息:

org.apache.kafka.connect.transforms.InsertField

使用记录元数据中的属性或配置的静态值插入字段。

使用为记录键 ( org.apache.kafka.connect.transforms.InsertField$Key) 或值 ( org.apache.kafka.connect.transforms.InsertField$Value) 设计的具体转换类型。

  • offset.field

    Kafka 偏移量的字段名称 - 仅适用于接收器连接器。
    后缀 with!使其成为必填字段,或?使其保持可选(默认)。

    Type:string
    Default:null
    Valid Values:
    Importance:medium

  • partition.field

    Kafka 分区的字段名称。后缀 with!使其成为必填字段,或?使其保持可选(默认)。

    Type:string
    Default:null
    Valid Values:
    Importance:medium

  • static.field

    静态数据字段的字段名称。后缀 with!使其成为必填字段,或?使其保持可选(默认)。

    Type:string
    Default:null
    Valid Values:
    Importance:medium

  • static.value

    静态字段值(如果配置了字段名称)。

    Type:string
    Default:null
    Valid Values:
    Importance:medium

  • timestamp.field

    记录时间戳的字段名称。后缀 with!使其成为必填字段,或?使其保持可选(默认)。

    Type:string
    Default:null
    Valid Values:
    Importance:medium

  • topic.field

    Kafka 主题的字段名称。后缀 with!使其成为必填字段,或?使其保持可选(默认)。

    Type:string
    Default:null
    Valid Values:
    Importance:medium

org.apache.kafka.connect.transforms.ReplaceField

过滤或重命名字段。

使用为记录键 ( org.apache.kafka.connect.transforms.ReplaceField$Key) 或值 ( org.apache.kafka.connect.transforms.ReplaceField$Value) 设计的具体转换类型。

  • exclude

    要排除的字段。这优先于要包含的字段。

    Type:list
    Default:""
    Valid Values:
    Importance:medium

  • include

    要包含的字段。如果指定,则仅使用这些字段。

    Type:list
    Default:""
    Valid Values:
    Importance:medium

  • renames

    字段重命名映射。

    Type:list
    Default:""
    Valid Values:以冒号分隔的对的列表,例如foo:bar,abc:xyz
    Importance:medium

  • blacklist

    已弃用。请改用排除。

    Type:list
    Default:null
    Valid Values:
    Importance:low

  • whitelist

    已弃用。使用包含代替。

    Type:list
    Default:null
    Valid Values:
    Importance:low

org.apache.kafka.connect.transforms.MaskField

使用字段类型的有效空值(即 0, false, empty string 等)屏蔽指定字段。

对于数字和字符串字段,可以指定转换为正确类型的可选替换值。

使用为记录键 ( org.apache.kafka.connect.transforms.MaskField$Key) 或值 ( org.apache.kafka.connect.transforms.MaskField$Value) 设计的具体转换类型。

  • fields

    要屏蔽的字段的名称。

    Type:list
    Default:
    Valid Values:non-empty list
    Importance:high

  • replacement

    自定义值替换,将应用于所有“字段”值(仅限数字或非空字符串值)。

    Type:string
    Default:null
    Valid Values:non-empty string
    Importance:low

org.apache.kafka.connect.transforms.ValueToKey

将记录键替换为由记录值中的字段子集形成的新键。

  • fields

    要提取作为记录键的记录值上的字段名称。

    Type:list
    Default:
    Valid Values:non-empty list
    Importance:high

org.apache.kafka.connect.transforms.HoistField

如果存在架构,则使用结构中指定的字段名称包装数据;如果存在无架构数据,则使用映射包装数据。

使用为记录键 ( org.apache.kafka.connect.transforms.HoistField$Key) 或值 ( org.apache.kafka.connect.transforms.HoistField$Value) 设计的具体转换类型。

  • fields

    将在生成的结构或映射中创建的单个字段的字段名称。

    Type:string
    Default:
    Valid Values:
    Importance:medium

org.apache.kafka.connect.transforms.ExtractField

如果存在模式,则从结构中提取指定字段;如果存在无模式数据,则从映射中提取指定字段。任何空值都会不加修改地传递。

使用为记录键 ( org.apache.kafka.connect.transforms.ExtractField$Key) 或值 ( org.apache.kafka.connect.transforms.ExtractField$Value) 设计的具体转换类型。

  • fields

    要提取的字段名称。

    Type:string
    Default:
    Valid Values:
    Importance:medium

org.apache.kafka.connect.transforms.SetSchemaMetadata

org.apache.kafka.connect.transforms.SetSchemaMetadata$Key在记录的键 ( ) 或值 ( org.apache.kafka.connect.transforms.SetSchemaMetadata$Value) 架构 上设置架构名称、版本或两者。

  • schema.name

    要设置的架构名称。

    Type:string
    Default:null
    Valid Values:
    Importance:high

  • schema.version

    要设置的架构版本。

    Type:int
    Default:null
    Valid Values:
    Importance:high

org.apache.kafka.connect.transforms.TimestampRouter

根据原始主题值和记录时间戳更新记录的主题字段。

这主要对接收器连接器有用,因为主题字段通常用于确定目标系统中的等效实体名称(例如数据库表或搜索索引名称)。

  • timestamp.format

    与 兼容的时间戳的格式字符串java.text.SimpleDateFormat

    Type:string
    Default:yyyyMMdd
    Valid Values:
    Importance:high

  • topic.format

    格式字符串可以包含${topic}${timestamp}作为主题和时间戳的占位符。

    Type:string
    Default:\({topic}-\){timestamp}
    Valid Values:
    Importance:high

org.apache.kafka.connect.transforms.RegexRouter

使用配置的正则表达式和替换字符串更新记录主题。

在底层,正则表达式被编译为java.util.regex.Pattern. 如果模式与输入主题匹配,java.util.regex.Matcher#replaceFirst()则与替换字符串一起使用来获取新主题。

  • regex

    用于匹配的正则表达式。

    Type:string
    Default:
    Valid Values:有效的正则表达式
    Importance:high

  • replacement

    替换字符串。

    Type:string
    Default:
    Valid Values:
    Importance:high

org.apache.kafka.connect.transforms.Flatten

展平嵌套数据结构,通过将每个级别的字段名称与可配置的分隔符连接来生成每个字段的名称。当模式存在时适用于结构,或者在无模式数据的情况下适用于映射。数组字段及其内容不会被修改。默认分隔符是“.”。

使用为记录键 ( org.apache.kafka.connect.transforms.Flatten$Key) 或值 ( org.apache.kafka.connect.transforms.Flatten$Value) 设计的具体转换类型。

  • delimiter

    为输出记录生成字段名称时在输入记录的字段名称之间插入的分隔符

    Type:string
    Default:.
    Valid Values:
    Importance:medium

org.apache.kafka.connect.transforms.Cast

将字段或整个键或值转换为特定类型,例如强制整数字段具有较小的宽度。从整数、浮点数、布尔值和字符串转换为任何其他类型,并将二进制转换为字符串(base64 编码)。

使用为记录键 ( org.apache.kafka.connect.transforms.Cast$Key) 或值 ( org.apache.kafka.connect.transforms.Cast$Value) 设计的具体转换类型。

  • spec

    字段列表以及将其转换为 form field1:type,field2:type 的类型以转换映射或结构的字段。用于转换整个值的单一类型。有效类型包括 int8、int16、int32、int64、float32、float64、boolean 和 string。请注意,二进制字段只能转换为字符串。

    Type:list
    Default:
    Valid Values:以冒号分隔的对的列表,例如foo:bar,abc:xyz
    Importance:high

org.apache.kafka.connect.transforms.TimestampConverter

在不同格式(例如 Unix 纪元、字符串和连接日期/时间戳类型)之间转换时间戳。适用于单个字段或整个值。

使用为记录键 ( org.apache.kafka.connect.transforms.TimestampConverter$Key) 或值 ( org.apache.kafka.connect.transforms.TimestampConverter$Value) 设计的具体转换类型。

  • target.type

    所需的时间戳表示形式:字符串、unix、日期、时间或时间戳

    Type:string
    Default:
    Valid Values:[string, unix, Date, Time, Timestamp]
    Importance:high

  • fields

    包含时间戳的字段,如果整个值是时间戳,则为空

    Type:string
    Default:""
    Valid Values:
    Importance:high

  • format

    与 SimpleDateFormat 兼容的时间戳格式。当 type=string 时用于生成输出,或者如果输入是字符串则用于解析输入。

    Type:string
    Default:""
    Valid Values:
    Importance:medium

  • unix.precision

    时间戳所需的 Unix 精度:秒、毫秒、微秒或纳秒。用于在 type=unix 时生成输出,或用于在输入为 Long 时解析输入。注意:此 SMT 将在与具有亚毫秒分量的值进行转换期间导致精度损失。

    Type:string
    Default:milliseconds
    Valid Values:[nanoseconds, microseconds, milliseconds, seconds]
    Importance:low

org.apache.kafka.connect.transforms.Filter

删除所有记录,从链中的后续转换中过滤它们。这旨在有条件地用于过滤掉与特定谓词匹配(或不匹配)的记录。

org.apache.kafka.connect.transforms.InsertHeader

为每条记录添加标题。

  • 标头的名称。

    Type:string
    Default:
    Valid Values:non-empty string
    Importance:high

  • value.literal

    要设置为所有记录上的标题值的文字值。

    Type:string
    Default:
    Valid Values:non-empty string
    Importance:high

org.apache.kafka.connect.transforms.DropHeaders

从每条记录中删除一个或多个标头。

  • header

    要删除的标头的名称。

    Type:list
    Default:
    Valid Values:non-empty list
    Importance:high

org.apache.kafka.connect.transforms.HeaderFrom

将记录的键/值中的字段移动或复制到该记录的标题中。fields和 的相应元素headers一起标识字段以及应将其移动或复制到的标题。使用为记录键 ( org.apache.kafka.connect.transforms.HeaderFrom$Key) 或值 ( org.apache.kafka.connect.transforms.HeaderFrom$Value) 设计的具体转换类型。

  • fields

    记录中的字段名称,其值将被复制或移动到标题。

    Type:list
    Default:
    Valid Values:non-empty list
    Importance:high

  • header

    标头名称的顺序与字段配置属性中列出的字段名称的顺序相同。

    Type:list
    Default:
    Valid Values:non-empty list
    Importance:high

  • operation

    如果move要将字段移动到标题(从键/值中删除),或者copy将字段复制到标题(保留在键/值中)。

    Type:string
    Default:
    Valid Values:[move, copy]
    Importance:high

谓词

转换可以用谓词配置,以便转换仅适用于满足某些条件的消息。特别是,当与过滤器转换谓词相结合时,可以使用选择性地过滤某些消息。

连接器配置中指定了谓词。

  • predicates-应用于某些转换的谓词的别名集。
  • predicates.$alias.type-谓词的完全合格的类名。
  • predicates.$alias.$predicateSpecificConfig-谓词的配置属性。

所有转换都具有隐式配置属性predicatenegate。通过将转换的predicate配置设置为谓词的别名,谓词与转换相关联。可以使用negate配置属性反转谓词的值。

例如,假设您有一个源连接器,可以生成许多不同主题的消息,并且您想要:

  • 完全过滤掉“foo”主题中的消息
  • 将字段名称为“other_field”的ExtractField转换应用于主题“bar”以外的所有主题中的记录

要做到这一点,我们首先需要过滤掉“foo”主题的记录。过滤器转换将记录从进一步处理中删除,并且可以使用TopicNameMatches谓词仅将转换应用于与特定正则表达式匹配的主题中的记录。TopicNameMatches的唯一配置属性是pattern,它是与主题名称匹配的Java正则表达式。配置看起来像这样:

transforms=Filter
transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
transforms.Filter.predicate=IsFoo

predicates=IsFoo
predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsFoo.pattern=foo

接下来,仅当记录的主题名称不是“bar”时,我们才需要应用ExtractField。我们不能直接使用 TopicNameMatches,因为这会将转换应用于匹配的主题名称,而不是不匹配的主题名称。转换的隐式negate配置属性允许我们反转谓词匹配的记录集。将其配置添加到前面的示例中,我们得到:

transforms=Filter,Extract
transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
transforms.Filter.predicate=IsFoo

transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=other_field
transforms.Extract.predicate=IsBar
transforms.Extract.negate=true

predicates=IsFoo,IsBar
predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsFoo.pattern=foo

predicates.IsBar.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsBar.pattern=bar

Kafka Connect 包括以下谓词:

  • TopicNameMatches - 匹配主题中名称与特定 Java 正则表达式匹配的记录。
  • HasHeaderKey - 匹配具有给定键的标头的记录。
  • RecordIsTombstone - 匹配墓碑记录,即具有空值的记录。

下面列出了如何配置每个谓词的详细信息:

org.apache.kafka.connect.transforms.predicates.HasHeaderKey

一个谓词,对于至少具有一个具有配置名称的标头的记录为真。

  • name

    标头名称。

    Type:string
    Default:
    Valid Values:non-empty string
    Importance:medium

org.apache.kafka.connect.transforms.predicates.RecordIsTombstone

对于逻辑删除(即具有空值)的记录为真的谓词。

org.apache.kafka.connect.transforms.predicates.TopicNameMatches

对于主题名称与配置的正则表达式匹配的记录为 true 的谓词。

  • pattern

    用于匹配记录主题名称的 Java 正则表达式。

    Type:string
    Default:
    Valid Values:非空字符串,有效的正则表达式
    Importance:medium

REST API

由于 Kafka Connect 旨在作为服务运行,因此它还提供了用于管理连接器的 REST API。此 REST API 可在独立模式和分布式模式下使用。可以使用配置选项配置 REST API 服务器listeners。该字段应包含以下格式的侦听器列表:protocol://host:port,protocol2://host2:port2。目前支持的协议有httphttps。例如:

listeners=http://localhost:8080,https://localhost:8443

默认情况下,如果没有指定listeners,REST服务器使用HTTP协议在端口8083上运行。使用HTTPS时,配置必须包括SSL配置。默认情况下,它将使用ssl.*设置。如果需要使用REST API与连接到Kafka经纪人不同的配置,这些字段可以前缀为listeners.https。使用前缀时,将仅使用前缀选项,不带前缀的ssl.*选项将被忽略。以下字段可用于为REST API配置HTTPS:

  • ssl.keystore.location
  • ssl.keystore.password
  • ssl.keystore.type
  • ssl.key.password
  • ssl.truststore.location
  • ssl.truststore.password
  • ssl.truststore.type
  • ssl.enabled.protocols
  • ssl.provider
  • ssl.protocol
  • ssl.cipher.suites
  • ssl.keymanager.algorithm
  • ssl.secure.random.implementation
  • ssl.trustmanager.algorithm
  • ssl.endpoint.identification.algorithm
  • ssl.client.auth

REST API不仅被用户用于监控/管理Kafka Connect。在分布式模式下,它还用于Kafka Connect跨集群通信。在关注节点REST API上收到的一些请求将转发到领导节点REST API。如果给定主机可访问的URI与它监听的URI不同,则可以使用配置选项rest.advertised.host.namerest.advertised.portrest.advertised.listener来更改追随者节点将用于与领导者连接的URI。当同时使用HTTP和HTTPS侦听器时,rest.advertised.listener选项还可用于定义哪个侦听器将用于跨集群通信。当使用HTTPS进行节点之间的通信时,将使用相同的ssl.*listeners.https选项来配置HTTPS客户端。

以下是当前支持的 REST API 端点:

  • GET /connectors - 返回活动连接器的列表
  • POST /connectors - 创建一个新的连接器;请求正文应该是一个 JSON 对象,其中包含字符串name字段和config带有连接器配置参数的对象字段
  • GET /connectors/{name} - 获取有关特定连接器的信息
  • GET /connectors/{name}/config - 获取特定连接器的配置参数
  • PUT /connectors/{name}/config - 更新特定连接器的配置参数
  • GET /connectors/{name}/status - 获取连接器的当前状态,包括它是否正在运行、失败、暂停等,它被分配给哪个工作线程,失败时的错误信息,以及它的所有任务的状态
  • GET /connectors/{name}/tasks - 获取连接器当前运行的任务列表
  • GET /connectors/{name}/tasks/{taskid}/status - 获取任务的当前状态,包括它是否正在运行、失败、暂停等,它被分配给哪个worker,以及失败时的错误信息
  • PUT /connectors/{name}/pause - 暂停连接器及其任务,这会停止消息处理,直到连接器恢复为止。其任务占用的任何资源都会保留分配状态,这使得连接器可以在恢复后快速开始处理数据。
  • PUT /connectors/{name}/stop - 停止连接器并关闭其任务,取消分配其任务占用的任何资源。从资源使用的角度来看,这比暂停连接器更有效,但可能会导致连接器恢复后需要更长时间才能开始处理数据。
  • PUT /connectors/{name}/resume - 恢复暂停或停止的连接器(或者如果连接器未暂停或停止则不执行任何操作)
  • POST /connectors/{name}/restart?includeTasks=<true|false>&onlyFailed=<true|false> - 重新启动连接器及其任务实例。
    • “includeTasks”参数指定是否重新启动连接器实例和任务实例(“includeTasks=true”)或仅重新启动连接器实例(“includeTasks=false”),默认值(“false”)保留与早期版本相同的行为。
    • “onlyFailed”参数指定是仅重新启动具有 FAILED 状态的实例(“onlyFailed=true”)还是重新启动所有实例(“onlyFailed=false”),默认值(“false”)保留与早期版本相同的行为。
  • POST /connectors/{name}/tasks/{taskId}/restart - 重新启动单个任务(通常是因为它失败了)
  • DELETE /connectors/{name} - 删除连接器,停止所有任务并删除其配置
  • GET /connectors/{name}/topics - 获取自创建连接器或发出重置其活动主题集的请求以来特定连接器正在使用的主题集
  • PUT /connectors/{name}/topics/reset - 发送请求以清空连接器的活动主题集
  • GET /connectors/{name}/offsets - 获取连接器的当前偏移(有关更多详细信息,请参阅KIP-875 )

FileStreamSourceConnector的示例请求主体:

{
  "offsets": [
    {
      "partition": {
        "filename": "test.txt"
      },
      "offset": {
        "position": 30
      }
    }
  ]
}

FileStreamSinkConnector的请求主体示例:

{
  "offsets": [
    {
      "partition": {
        "kafka_topic": "test",
        "kafka_partition": 0
      },
      "offset": {
        "kafka_offset": 5
      }
    },
    {
      "partition": {
        "kafka_topic": "test",
        "kafka_partition": 1
      },
      "offset": null
    }
  ]
}

“偏移”字段可能为空,以重置特定分区的偏移量(适用于源连接器和接收器连接器)。请注意,在源连接器的情况下,请求主体格式取决于连接器的实现,而所有接收器连接器都有通用格式。

Kafka Connect 还提供了一个 REST API,用于获取有关连接器插件的信息:

  • GET /connector-plugins - 返回 Kafka Connect 集群中安装的连接器插件列表。请注意,API 仅检查处理请求的工作线程上的连接器,这意味着您可能会看到不一致的结果,尤其是在滚动升级期间,如果您添加新的连接器 jar
  • PUT /connector-plugins/{connector-type}/config/validate - 根据配置定义验证提供的配置值。此 API 执行每个配置验证,在验证期间返回建议值和错误消息。

以下是顶级(根)端点支持的 REST 请求:

  • GET / - 返回有关 Kafka Connect 集群的基本信息,例如服务 REST 请求的 Connect Worker 的版本(包括源代码的 git commit ID)以及连接到的 Kafka 集群 ID。

有关 REST API 的完整规范,请参阅OpenAPI 文档

Connect 中的错误报告

Kafka Connect 提供错误报告来处理各个处理阶段遇到的错误。默认情况下,转换期间或转换中遇到的任何错误都会导致连接器失败。每个连接器配置还可以通过跳过此类错误、选择性地将每个错误以及失败操作的详细信息和有问题的记录(具有各种详细级别)写入 Connect 应用程序日志来容忍此类错误。当接收器连接器处理从其 Kafka 主题消耗的消息时,这些机制还会捕获错误,并且所有错误都可以写入可配置的“死信队列”(DLQ) Kafka 主题。

要将连接器的转换器、转换或接收器连接器本身内的错误报告到日志,请errors.log.enable=true在连接器配置中设置以记录每个错误和问题记录的主题、分区和偏移量的详细信息。出于其他调试目的,设置errors.log.include.messages=true为还将问题记录键、值和标头记录到日志中(请注意,这可能会记录敏感信息)。

要将连接器的转换器、转换或接收器连接器本身内的错误报告给死信队列主题,请设置errors.deadletterqueue.topic.name和(可选)errors.deadletterqueue.context.headers.enable=true

默认情况下,连接器在出现错误或异常时立即表现出“快速失败”行为。这相当于将以下配置属性及其默认值添加到连接器配置中:

# disable retries on failure
errors.retry.timeout=0

# do not log the error and their contexts
errors.log.enable=false

# do not record errors in a dead letter queue topic
errors.deadletterqueue.topic.name=

# Fail on first error
errors.tolerance=none

可以更改这些和其他相关的连接器配置属性以提供不同的行为。例如,可以将以下配置属性添加到连接器配置中,以通过多次重试设置错误处理、记录到应用程序日志和 Kafka 主题my-connector-errors,并通过报告错误而不是使连接器任务失败来容忍所有错误:

# retry for at most 10 minutes times waiting up to 30 seconds between consecutive failures
errors.retry.timeout=600000
errors.retry.delay.max.ms=30000

# log error context along with application logs, but do not include configs and messages
errors.log.enable=true
errors.log.include.messages=false

# produce error context into the Kafka topic
errors.deadletterqueue.topic.name=my-connector-errors

# Tolerate all errors.
errors.tolerance=all

exactly.once.support

Kafka Connect 能够为接收器连接器(从版本 0.11.0 开始)和源连接器(从版本 3.3.0 开始)提供一次性语义。请注意,对一次语义的支持高度依赖于您运行的连接器类型。即使您在集群中每个节点的配置中设置了所有正确的工作线程属性,如果连接器未设计为或无法利用 Kafka Connect 框架的功能,则可能无法实现精确一次。

水槽连接器

如果接收器连接器支持精确一次语义,要在 Connect 工作线程级别启用精确一次,您必须确保其使用者组配置为忽略中止事务中的记录。您可以通过将工作程序属性设置consumer.isolation.level为来实现此read_committed目的,或者,如果运行支持它的 Kafka Connect 版本,则使用连接器客户端配置覆盖策略,允许在各个连接器配置中consumer.override.isolation.level将该属性设置为。read_committed没有额外的 ACL 要求。

源连接器

如果源连接器支持一次性语义,则必须配置 Connect 集群以启用对一次性源连接器的框架级支持。如果针对安全的 Kafka 集群运行,可能需要额外的 ACL。请注意,对源连接器的一次性支持目前仅在分布式模式下可用;独立的 Connect 工作线程无法提供一次性语义。

工作人员配置

对于新的 Connect 集群,请在集群中每个节点的工作程序配置中将该exactly.once.source.support属性设置为。enabled对于现有集群,需要进行两次滚动升级。在第一次升级期间,该exactly.once.source.support属性应设置为preparing,在第二次升级期间,应设置为enabled

ACL要求

启用一次性源支持后,每个 Connect 工作线程的主体将需要以下 ACL:

OPERATION RESOURCE TYPE RESOURCE NAME NOTE
Write TransactionalId connect-cluster-${groupId}, where ${groupId} is the group.id of the cluster
Describe TransactionalId connect-cluster-${groupId}, where ${groupId} is the group.id of the cluster
IdempotentWrite Cluster ID of the Kafka cluster that hosts the worker's config topic The IdempotentWrite ACL has been deprecated as of 2.8 and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters

每个连接器的主体将需要以下 ACL:

OPERATION RESOURCE TYPE RESOURCE NAME NOTE
Write TransactionalId ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group.id of the Connect cluster, ${connector}is the name of the connector, and ${taskId} is the ID of the task (starting from zero) A wildcard prefix of ${groupId}-${connector}*can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
Describe TransactionalId ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group.id of the Connect cluster, ${connector}is the name of the connector, and ${taskId} is the ID of the task (starting from zero) A wildcard prefix of ${groupId}-${connector}*can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
Write Topic Offsets topic used by the connector, which is either the value of the offsets.storage.topicproperty in the connector’s configuration if provided, or the value of the offsets.storage.topicproperty in the worker’s configuration if not.
Read Topic Offsets topic used by the connector, which is either the value of the offsets.storage.topicproperty in the connector’s configuration if provided, or the value of the offsets.storage.topicproperty in the worker’s configuration if not.
Describe Topic Offsets topic used by the connector, which is either the value of the offsets.storage.topicproperty in the connector’s configuration if provided, or the value of the offsets.storage.topicproperty in the worker’s configuration if not.
Create Topic Offsets topic used by the connector, which is either the value of the offsets.storage.topicproperty in the connector’s configuration if provided, or the value of the offsets.storage.topicproperty in the worker’s configuration if not. Only necessary if the offsets topic for the connector does not exist yet
IdempotentWrite Cluster ID of the Kafka cluster that the source connector writes to The IdempotentWrite ACL has been deprecated as of 2.8 and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters

插件发现

插件发现是Connect工作者用于查找插件类并使其在连接器中配置和运行的策略的名称。这由plugin.discovery worker配置控制,并对工人启动时间有重大影响。service_load是最快的策略,但在将此配置设置为service_load之前,应注意验证插件是否兼容。

在3.6版本之前,此策略是不可配置的,其行为类似于与所有插件兼容only_scan模式。对于3.6及更高版本,此模式默认为hybrid_warn,它也与所有插件兼容,但会记录与service_load不兼容的插件的警告。如果检测到与service_load不兼容的插件,hybrid_fail策略会错误地阻止工人,并断言所有插件都是兼容的。最后,service_load策略禁用所有其他模式中使用的缓慢遗留扫描机制,而是使用更快的ServiceLoader机制。与该机制不兼容的插件可能无法使用。

验证插件兼容性

要验证您的所有插件是否与service_load兼容,首先确保您使用的是Kafka Connect 3.6或更高版本。然后,您可以执行以下检查之一:

  • 使用默认的hybrid_warn和为theorgorg.apache.kafka.connect包启用的WARN日志启动您的员工。至少应打印一条提及plugin.discovery配置的警告日志消息。此日志消息将明确表示所有插件都兼容,或列出不兼容的插件。
  • 使用hybrid_fail在测试环境中启动您的员工。如果所有插件都兼容,启动将成功。如果至少有一个插件不兼容,工人将无法启动,所有不兼容的插件都将列在例外中。

如果验证步骤成功,那么您当前安装的插件集是兼容的,将plugin.discovery配置更改为service_load应该是安全的。如果验证失败,您无法使用service_load策略,并应注意不兼容的插件列表。在使用service_load策略之前,必须解决所有插件。建议在安装或更改插件版本后执行此验证,并且可以在持续集成环境中自动完成验证。

运营商:文物迁移

作为Connect的运营商,如果您发现不兼容的插件,有多种方法可以解决不兼容的问题。它们从大多数到最不受欢迎都在下面列出。

  1. 检查插件提供商的最新版本,如果兼容,请升级。
  2. 联系您的插件提供商,要求他们按照源迁移说明迁移插件使其兼容,然后升级到兼容版本。
  3. 使用随附的迁移脚本自行迁移插件工件。

迁移脚本位于Kafka安装的bin/connect-plugin-path.shbin\windows\connect-plugin-path.bat中。该脚本可以通过添加或修改JAR或资源文件来迁移已安装在Connect worker'spluginplugin.path上的不兼容插件工件。这不适合使用代码签名的环境,因为这可能会改变工件,使其无法通过签名验证。查看--help的内置帮助。

要执行迁移,首先使用list子命令来获取脚本可用的插件的概述。您必须告诉脚本在哪里可以找到插件,这可以通过可重复的--worker-config--plugin-path--plugin-location参数来完成。该脚本将忽略类路径上的插件,因此您的类路径上的任何自定义插件都应移动到插件路径,以便与此迁移脚本一起使用,或手动迁移。请务必将list输出与工人启动警告或错误消息进行比较,以确保脚本找到所有受影响的插件。

一旦您看到所有不兼容的插件都包含在列表中,您就可以继续使用sync-manifests --dry-run进行干运行迁移。除了将迁移结果写入磁盘外,这将执行迁移的所有部分。请注意,sync-manifests命令要求所有指定的路径都是可写入的,并且可能会更改目录的内容。在指定的路径中备份您的插件,或将它们复制到可写目录。

在删除--dry-run标志并实际运行迁移之前,确保您有插件的备份,并且干运行成功。如果在没有--dry-run标志的情况下迁移失败,那么部分迁移的工件应该被丢弃。迁移是幂等的,因此多次运行并在已迁移的插件上运行它是安全的。脚本完成后,您应该验证迁移是否完成。迁移脚本适合在持续集成环境中用于自动迁移。

开发人员:源迁移

为了使插件与service_load兼容,有必要将ServiceLoader清单添加到源代码中,然后将其打包在发布工件中。清单是META-INF/services/中以其超类类型命名的资源文件,并包含一个完全合格的子类名称列表,每行一个。

为了使插件兼容,它必须在与它扩展的插件超类相对应的清单中显示为一行。如果单个插件实现了多个插件接口,那么它应该出现在它实现的每个接口的清单中。如果您没有特定类型插件的类,则无需包含该类型的清单文件。如果您的类不应该显示为插件,它们应该被标记为抽象。以下类型预计将有清单:

  • org.apache.kafka.connect.sink.SinkConnector
  • org.apache.kafka.connect.source.SourceConnector
  • org.apache.kafka.connect.storage.Converter
  • org.apache.kafka.connect.storage.HeaderConverter
  • org.apache.kafka.connect.transforms.Transformation
  • org.apache.kafka.connect.transforms.predicates.Predicate
  • org.apache.kafka.common.config.provider.ConfigProvider
  • org.apache.kafka.connect.rest.ConnectRestExtension
  • org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy

例如,如果您只有一个具有完全合格名称com.example.MySinkConnector的连接器,那么必须将一个清单文件添加到META-INF/services/org.apache.kafka.connect.sink.SinkConnector中的资源中,并且内容应类似于以下内容:

# license header or comment
com.example.MySinkConnector

然后,您应该使用预发布工件的验证步骤来验证您的清单是否正确。如果验证成功,您可以正常释放插件,运营商可以升级到兼容版本。

8.3 连接器开发指南

本指南介绍了开发人员如何为 Kafka Connect 编写新的连接器以在 Kafka 和其他系统之间移动数据。它简要回顾了一些关键概念,然后描述了如何创建简单的连接器。

核心概念和 API

连接器和任务

要在 Kafka 和另一个系统之间复制数据,用户需要Connector为他们想要从中提取数据或向其中推送数据的系统创建一个 Kafka 系统。连接器有两种类型:SourceConnectors从另一个系统导入数据(例如JDBCSourceConnector将关系数据库导入到 Kafka)和SinkConnectors导出数据(例如HDFSSinkConnector将 Kafka 主题的内容导出到 HDFS 文件)。

Connectors自己不执行任何数据复制:它们的配置描述了要复制的数据,并且Connector负责将该作业分解为一组Tasks可以分发给工作人员的数据。它们Tasks也有两种相应的风格:SourceTaskSinkTask

完成任务后,每个人都Task必须将其数据子集复制到 Kafka 或从 Kafka 复制数据。在 Kafka Connect 中,应该始终可以将这些分配构建为一组输入和输出流,其中包含具有一致模式的记录。有时这种映射是显而易见的:一组日志文件中的每个文件都可以被视为一个流,其中每个解析的行使用相同的模式形成一条记录,并将偏移量存储为文件中的字节偏移量。在其他情况下,映射到此模型可能需要更多工作:JDBC 连接器可以将每个表映射到流,但偏移量不太清楚。一种可能的映射使用时间戳列来生成增量返回新数据的查询,并且最后查询的时间戳可以用作偏移量。

流和记录

每个流应该是一系列键值记录。键和值都可以具有复杂的结构——提供了许多基本类型,但也可以表示数组、对象和嵌套数据结构。运行时数据格式不采用任何特定的序列化格式;这种转换由框架内部处理。

除了键和值之外,记录(由源生成的记录和传送到接收器的记录)还具有关联的流 ID 和偏移量。框架使用它们定期提交已处理的数据的偏移量,以便在发生故障时,可以从上次提交的偏移量恢复处理,从而避免不必要的重新处理和重复事件。

动态连接器

并非所有作业都是静态的,因此Connector实现还负责监视外部系统是否有任何可能需要重新配置的更改。例如,在该JDBCSourceConnector示例中,Connector可能会为每个Task. 创建新表时,它必须发现这一点,以便可以通过更新其配置将新表分配给其中之一Tasks。当它注意到需要重新配置的更改(或 数量的更改Tasks)时,它会通知框架,框架会更新任何相应的Tasks.

开发一个简单的连接器

开发连接器只需要实现两个接口,ConnectorTaskfile包中Kafka的源代码包含一个简单的示例。此连接器旨在用于独立模式,并实现了SourceConnector/SourceTask来读取文件的每一行并将其作为记录和将每条记录写入文件的SinkConnector/SinkTask

本节的其余部分将介绍一些代码,以演示创建连接器的关键步骤,但开发人员还应参考完整的示例源代码,因为为简洁起见省略了许多细节。

连接器示例

我们将以一个简单的例子介绍SourceConnectorSinkConnector的实现非常相似。选择软件包和类名,这些示例将使用FileStreamSourceConnector,但酌情替换您自己的类名。为了使插件在运行时可被发现,请在META-INF/services/org.apache.kafka.connect.source.SourceConnector中的资源中添加一个ServiceLoader清单,并在一行中添加您完全合格的类名:

com.example.FileStreamSourceConnector

创建一个继承SourceConnector的类,并添加一个字段,该字段将存储要传播到任务的配置信息(要向的任务发送数据的主题,也可以选择-要读取的文件名和最大批处理大小):

package com.example;

public class FileStreamSourceConnector extends SourceConnector {
    private Map<String, String> props;

最简单的填充方法是taskClass()它定义了应该在工作进程中实例化以实际读取数据的类:

@Override
public Class<? extends Task> taskClass() {
    return FileStreamSourceTask.class;
}

我们将在下面定义FileStreamSourceTask类。接下来,我们添加一些标准的生命周期方法,start()stop()

@Override
public void start(Map<String, String> props) {
    // Initialization logic and setting up of resources can take place in this method.
    // This connector doesn't need to do any of that, but we do log a helpful message to the user.

    this.props = props;
    AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
    String filename = config.getString(FILE_CONFIG);
    filename = (filename == null || filename.isEmpty()) ? "standard input" : config.getString(FILE_CONFIG);
    log.info("Starting file source connector reading from {}", filename);
}

@Override
public void stop() {
    // Nothing to do since no background monitoring is required.
}

最后,实现的真正核心在于taskConfigs()在这种情况下,我们只处理单个文件,因此即使我们可能被允许根据maxTasks参数生成更多任务,我们也会返回一个只有一个条目的列表:

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    // Note that the task configs could contain configs additional to or different from the connector configs if needed. For instance,
    // if different tasks have different responsibilities, or if different tasks are meant to process different subsets of the source data stream).
    ArrayList<Map<String, String>> configs = new ArrayList<>();
    // Only one input stream makes sense.
    configs.add(props);
    return configs;
}

即使有多个任务,这种方法的实现通常也非常简单。它只需要确定输入任务的数量,这可能需要联系它正在提取数据的远程服务,然后将它们分配起来。由于一些在任务之间拆分工作的模式非常常见,因此ConnectorUtils中提供了一些实用程序来简化这些情况。

请注意,这个简单的示例不包括动态输入。有关如何触发任务配置更新,请参阅下一节中的讨论。

任务示例-源任务

接下来,我们将描述相应SourceTask的实现。实施时间很短,但太长,无法在本指南中完全涵盖。我们将使用伪代码来描述大多数实现,但您可以参考源代码以获取完整示例。

与连接器一样,我们需要创建一个继承适当基础Task类的类。它还有一些标准的生命周期方法:

public class FileStreamSourceTask extends SourceTask {
    private String filename;
    private InputStream stream;
    private String topic;
    private int batchSize;

    @Override
    public void start(Map<String, String> props) {
        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
        stream = openOrThrowError(filename);
        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
        batchSize = props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG);
    }

    @Override
    public synchronized void stop() {
        stream.close();
    }

这些是略微简化的版本,但表明这些方法应该相对简单,它们应该执行的唯一工作是分配或释放资源。关于这一实施,有两点需要注意。首先,start()方法尚未处理从之前的偏移量恢复,这将在后面的章节中解决。其次,stop()方法同步。这是必要的,因为SourceTasks获得了一个可以无限期阻止的专用线程,因此需要通过来自Worker中不同线程的调用来阻止它们。

接下来,我们实现了任务的主要功能,即poll()方法,该方法从输入系统获取事件并返回List<SourceRecord>

@Override
public List<SourceRecord> poll() throws InterruptedException {
    try {
        ArrayList<SourceRecord> records = new ArrayList<>();
        while (streamValid(stream) && records.isEmpty()) {
            LineAndOffset line = readToNextLine(stream);
            if (line != null) {
                Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
                Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
                records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
                if (records.size() >= batchSize) {
                    return records;
                }
            } else {
                Thread.sleep(1);
            }
        }
        return records;
    } catch (IOException e) {
        // Underlying stream was killed, probably as a result of calling stop. Allow to return
        // null, and driving thread will handle any shutdown if necessary.
    }
    return null;
}

同样,我们省略了一些细节,但我们可以看到重要的步骤:poll()方法将被反复调用,对于每次调用,它将循环尝试从文件中读取记录。对于它读取的每一行,它还会跟踪文件偏移量。它使用此信息创建包含四条信息的输出SourceRecord:源分区(只有一个,正在读取的单个文件)、源偏移量(文件中的字节偏移量)、输出主题名称和输出值(该行,我们包括一个表示此值始终是字符串的模式)。SourceRecord构造函数的其他变体还可以包括特定的输出分区、密钥和标头。

请注意,此实现使用正常的Java InputStream接口,如果数据不可用,可能会休眠。这是可以接受的,因为Kafka Connect为每个任务提供了一个专用线程。虽然任务实现必须符合基本的poll()接口,但它们在实现方式上有很大的灵活性。在这种情况下,基于NIO的实现会更有效率,但这种简单的方法有效,可以快速实现,并且与旧版本的Java兼容。

尽管在示例中没有使用,SourceTask也提供了两个API来提交源系统中的偏移量:commitcommitRecord。 API 是为具有消息确认机制的源系统提供的。 重写这些方法允许源连接器在将消息写入 Kafka 后,批量或单独地确认源系统中的消息。 commit API 将偏移量存储在源系统中,直到“poll”返回的偏移量为止。 此 API 的实现应阻塞,直到提交完成。 在将每个 SourceRecord 写入 Kafka 后,commitRecord API 会保存源系统中的偏移量。 由于 Kafka Connect 会自动记录偏移量,因此不需要 SourceTask 来实现它们。 如果连接器确实需要确认源系统中的消息,通常只需要其中一个 API。

Sink Tasks

上一节描述了如何实现一个简单的SourceTask。与SourceConnectorSinkConnectorSourceTaskSinkTask具有非常不同的接口,因为SourceTask使用拉取接口,而SinkTask使用推送接口。两者都有共同的生命周期方法,但SinkTask界面截然不同:

public abstract class SinkTask implements Task {
    public void initialize(SinkTaskContext context) {
        this.context = context;
    }

    public abstract void put(Collection<SinkRecord> records);

    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    }

SinkTask文档包含完整的详细信息,但这个界面几乎和SourceTask一样简单。Theputput()方法应包含大部分实现,接受SinkRecords集,执行任何所需的翻译,并将其存储在目标系统中。此方法不需要确保数据在返回之前已完全写入目标系统。事实上,在许多情况下,内部缓冲将是有用的,因此可以一次发送整批记录,从而减少将事件插入下游数据存储的开销。SinkRecords包含与SourceRecords基本相同的信息:Kafka主题、分区、偏移量、事件键和值以及可选标头。

flush()方法在偏移提交过程中使用,它允许任务从故障中恢复并从安全点恢复,这样就不会错过任何事件。该方法应将任何未完成的数据推送到目标系统,然后阻止,直到写入得到确认。offsets参数通常可以忽略,但在某些情况下,当实现希望将偏移信息存储在目标存储中以提供精确一次交付时非常有用。例如,HDFS连接器可以做到这一点,并使用原子移动操作来确保flushflush()操作原子地提交数据并偏移到HDFS中的最终位置。

Errant Record Reporter

当为连接器启用错误报告时,连接器可以使用ErrantRecordReporter报告发送到水槽连接器的单个记录的问题。以下示例显示了连接器的SinkTask子类如何获取和使用ErrantRecordReporter,在DLQ未启用或连接器安装在没有此报告器功能的旧Connect运行时安全地处理空报告器:

private ErrantRecordReporter reporter;

@Override
public void start(Map<String, String> props) {
    ...
    try {
        reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
    } catch (NoSuchMethodException | NoClassDefFoundError e) {
        // Will occur in Connect runtimes earlier than 2.6
        reporter = null;
    }
}

@Override
public void put(Collection<SinkRecord> records) {
    for (SinkRecord record: records) {
        try {
            // attempt to process and send record to data sink
            process(record);
        } catch(Exception e) {
            if (reporter != null) {
                // Send errant record to error reporter
                reporter.report(record, e);
            } else {
                // There's no error reporter, so fail
                throw new ConnectException("Failed on record", e);
            }
        }
    }
}

Resuming from Previous Offsets

SourceTask实现包括每条记录的流ID(输入文件名)和偏移量(文件中的位置)。该框架使用它定期提交偏移,以便在失败的情况下,任务可以恢复和最小化重新处理和可能重复的事件数量(或者如果Kafka Connect被优雅地停止,例如在独立模式下或由于作业重新配置,则从最近的偏移量恢复)。这个提交过程完全由框架自动化,但只有连接器知道如何寻求回到输入流中的正确位置,从该位置恢复。

要在启动时正确恢复,任务可以使用传递到其initialize()方法中的SourceContext来访问偏移数据。在initialize()我们将添加更多代码来读取偏移量(如果存在),并寻求该位置:

stream = new FileInputStream(filename);
Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
if (offset != null) {
    Long lastRecordedOffset = (Long) offset.get("position");
    if (lastRecordedOffset != null)
        seekToOffset(stream, lastRecordedOffset);
}

当然,您可能需要读取每个输入流的许多键。OffsetStorageReader接口还允许您发布批量读取以高效加载所有偏移量,然后通过查找每个输入流到适当的位置来应用它们。

Exactly-once source connectors

Supporting exactly-once

随着KIP-618的通过,Kafka Connect从3.3.0版本开始支持精确一次的源连接器。为了使源连接器利用这种支持,它必须能够为其发出的每条记录提供有意义的源偏移量,并在与任何这些偏移量相对应的确切位置恢复外部系统的消耗,而不会丢弃或复制消息。

Defining transaction boundaries

默认情况下,Kafka Connect框架将为源任务从其poll方法返回的每批记录创建并提交新的Kafka事务。然而,连接器还可以定义自己的事务边界,用户可以通过在连接器的配置中将transaction.boundary属性设置为connector来启用该边界。

如果启用,连接器的任务将从其SourceTaskContext访问TransactionContext,他们可以使用该Context来控制事务何时中止和提交。

例如,至少每十条记录提交一笔交易:

private int recordsSent;

@Override
public void start(Map<String, String> props) {
    this.recordsSent = 0;
}

@Override
public List<SourceRecord> poll() {
    List<SourceRecord> records = fetchRecords();
    boolean shouldCommit = false;
    for (SourceRecord record : records) {
        if (++this.recordsSent >= 10) {
            shouldCommit = true;
        }
    }
    if (shouldCommit) {
        this.recordsSent = 0;
        this.context.transactionContext().commitTransaction();
    }
    return records;
}

或者每十条记录提交一笔交易:

private int recordsSent;

@Override
public void start(Map<String, String> props) {
    this.recordsSent = 0;
}

@Override
public List<SourceRecord> poll() {
    List<SourceRecord> records = fetchRecords();
    for (SourceRecord record : records) {
        if (++this.recordsSent % 10 == 0) {
            this.context.transactionContext().commitTransaction(record);
        }
    }
    return records;
}

大多数连接器不需要定义自己的事务边界。然而,如果源系统中的文件或对象被分解为多个源记录,但应该以原子方式交付,这可能是有用的。此外,如果每个具有给定偏移量的每个记录都在单个事务中交付,如果不可能给每个源记录一个唯一的源偏移量,这可能会很有用。

请注意,如果用户没有在连接器配置中启用连接器定义的事务边界,则由context.transactionContext()返回的TransactionContext将为null

验证API

源连接器开发人员可以实现一些额外的飞行前验证API。

某些用户可能需要来自连接器的一次性语义。 在这种情况下,他们可以在连接器的配置中将 exactly.once.support 属性设置为required。 发生这种情况时,Kafka Connect 框架将询问连接器是否可以使用指定的配置提供一次性语义。 这是通过调用连接器上的exactlyOnceSupport方法来完成的。

如果连接器不支持精确一次的语义,它仍然应该实现此方法,让用户确信它无法提供精确一次的语义:

@Override
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
    // This connector cannot provide exactly-once semantics under any conditions
    return ExactlyOnceSupport.UNSUPPORTED;
}

否则,连接器应检查配置,并返回 ExactlyOnceSupport.SUPPORTED,如果它可以提供精确一次的语义:

@Override
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
    // This connector can always provide exactly-once semantics
    return ExactlyOnceSupport.SUPPORTED;
}

此外,如果用户已将连接器配置为定义自己的事务边界,Kafka Connect框架将询问连接器是否可以使用canDefineTransactionBoundaries方法使用指定的配置定义自己的事务边界:

@Override
public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> props) {
    // This connector can always define its own transaction boundaries
    return ConnectorTransactionBoundaries.SUPPORTED;
}

在某些情况下,这种方法只能针对可以定义自己的事务边界的连接器实施。如果连接器永远无法定义自己的事务边界,它就不需要实现这种方法。

动态 Input/Output Streams

Kafka Connect旨在定义批量数据复制作业,例如复制整个数据库,而不是创建许多作业来单独复制每个表。这种设计的一个后果是,连接器的输入或输出流集可能会随着时间的推移而变化。

源连接器需要监控源系统的变化,例如数据库中的表添加/删除。当他们接收更改时,他们应该通过ConnectorContext对象通知框架需要重新配置。例如,在SourceConnector

if (inputsChanged())
    this.context.requestTaskReconfiguration();

该框架将及时请求新的配置信息并更新任务,允许他们在重新配置之前优雅地提交进度。请注意,在SourceConnector,此监控目前由连接器实现。如果需要额外的线程来执行此监控,连接器必须自行分配它。

理想情况下,这个用于监控更改的代码将隔离在Connector上,任务无需担心它们。然而,更改也会影响任务,最常见的是当其中一个输入流在输入系统中被销毁时,例如,如果表从数据库中删除。如果TaskConnector之前遇到问题,如果Connector需要轮询更改,这将很常见,则Task将需要处理后续错误。谢天谢地,这通常可以通过捕获和处理适当的异常来处理。

SinkConnectors通常只需要处理流的添加,这些流可能会转换为输出中的新条目(例如,新的数据库表)。该框架管理对Kafka输入的任何更改,例如当输入主题集因正则表达式订阅而发生变化时。SinkTasks应该期待新的输入流,这可能需要在下游系统中创建新资源,例如数据库中的新表。在这些情况下,最棘手的情况可能是多个SinkTasks首次看到新的输入流,同时尝试创建新资源之间的冲突。另一方面,SinkConnectors通常不需要特殊代码来处理一组动态流。

配置验证

Kafka Connect允许您在提交要执行的连接器之前验证连接器配置,并可以提供有关错误和推荐值的反馈。为了利用这一点,连接器开发人员需要提供config()的实现,以将配置定义公开给框架。

FileStreamSourceConnector中的以下代码定义了配置并将其公开给框架。

static final ConfigDef CONFIG_DEF = new ConfigDef()
    .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used")
    .define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to")
    .define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE, Importance.LOW,
        "The maximum number of records the source task can read from the file each time it is polled");

public ConfigDef config() {
    return CONFIG_DEF;
}

ConfigDef类用于指定预期配置集。对于每个配置,您可以指定名称、类型、默认值、文档、组信息、组中的顺序、配置值的宽度以及适合在UI中显示的名称。此外,您可以通过覆盖Validator类来提供用于单个配置验证的特殊验证逻辑。此外,由于配置之间可能存在依赖性,例如,配置的有效值和可见性可能会根据其他配置的值而变化。为了处理这一点,ConfigDef允许您指定配置的依赖项,并提供Recommender实现,以获得有效值并设置给定当前配置值的可见性。

此外,Connector中的validate()方法提供了一个默认的验证实现,该实现返回允许的配置列表以及每个配置的配置错误和推荐值。然而,它不使用推荐的值进行配置验证。您可以为自定义配置验证提供默认实现的覆盖,这可能会使用推荐的值。

使用Schemas

FileStream连接器是一个很好的例子,因为它们很简单,但它们也有琐碎的结构化数据——每行都只是一个字符串。几乎所有实用连接器都需要具有更复杂数据格式的模式。

要创建更复杂的数据,您需要使用Kafka ConnectdataAPI。除了原始类型外,大多数结构化记录还需要与两个类交互:SchemaStruct

API文档提供了一个完整的参考,但以下是创建SchemaStruct的简单示例:

Schema schema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT_SCHEMA)
    .field("admin", SchemaBuilder.bool().defaultValue(false).build())
    .build();

Struct struct = new Struct(schema)
    .put("name", "Barbara Liskov")
    .put("age", 75);

如果您正在实现源连接器,您需要决定何时以及如何创建模式。在可能的情况下,你应该尽可能避免重新计算它们。例如,如果您的连接器保证具有固定的模式,请静态创建它并重用单个实例。

然而,许多连接器将具有动态模式。一个简单的例子是数据库连接器。即使只考虑单个表,也不会为整个连接器预定义模式(因为它因表而异)。但在连接器的生命周期内,它也可能不会为单个表固定,因为用户可以执行ALTER TABLE命令。连接器必须能够检测这些变化并做出适当的反应。

水槽连接器通常更简单,因为它们正在消耗数据,因此不需要创建模式。然而,他们应该同样小心地验证他们收到的模式是否具有预期的格式。当模式不匹配时——通常表示上游生产者正在生成无法正确转换为目标系统的无效数据——接收器连接器应抛出异常以向系统指示此错误。

8.4 管理

Kafka Connect的REST层提供了一组API来管理集群。这包括查看连接器配置及其任务状态以及更改其当前行为(例如更改配置和重新启动任务)的API。

当连接器首次提交给集群时,Connect工人之间会触发重新平衡,以分配由新连接器任务组成的负载。当连接器增加或减少所需的任务数量,当连接器的配置发生变化时,或者当作为Connect集群的故意升级的一部分或由于故障而添加或删除工人时,也会使用同样的重新平衡过程。

在2.3.0之前的版本中,Connect工人将重新平衡集群中的全套连接器及其任务,作为确保每个工人拥有大致相同的工作量的简单方法。仍然可以通过设置connect.protocol=eager来启用此行为。

从2.3.0开始,Kafka Connect默认使用一种执行增量合作再平衡的协议,该协议逐步平衡Connect工人之间的连接器和任务,只影响新任务、要删除或需要从一个工人移动到另一个工人的任务。其他任务在重新平衡期间不会像旧协议那样停止和重新启动。

如果 Connect 工作线程有意或由于故障而离开组,Connect 会在触发重新平衡之前等待scheduled.rebalance.max.delay.ms。 此延迟默认为五分钟(300000ms),以容忍工作人员的故障或升级,而无需立即重新分配即将离开的工作人员的负载。 如果该工作人员在配置的延迟内返回,它将获得之前分配的完整任务。 但是,这意味着任务将保持未分配状态,直到scheduled.rebalance.max.delay.ms指定的时间过去。 如果工作人员未在该时间限制内返回,Connect 将在 Connect 集群中的剩余工作人员之间重新分配这些任务。

当构成 Connect 集群的所有工作线程都配置了connect.protocol=兼容时,新的 Connect 协议就会启用,这也是缺少此属性时的默认值。 因此,当所有工作人员升级到 2.3.0 时,会自动升级到新的 Connect 协议。 当最后一个工作线程加入版本 2.3.0 时,Connect 集群的滚动升级将激活增量协作重新平衡。

您可以使用REST API查看连接器的当前状态及其任务,包括分配给每个连接器的工作人员的ID。例如,GET /connectors/file-source/status请求显示名为file-source的连接器的状态:

{
    "name": "file-source",
    "connector": {
        "state": "RUNNING",
        "worker_id": "192.168.1.208:8083"
    },
    "tasks": [
        {
        "id": 0,
        "state": "RUNNING",
        "worker_id": "192.168.1.209:8083"
        }
    ]
}

连接器及其任务发布共享主题的状态更新(配置为status.storage.topic),集群中的所有工作人员都会监控该主题。由于工人异步使用此主题,在状态更改通过状态API可见之前,通常会有(短暂的)延迟。以下状态对于连接器或其任务之一是可能的:

  • UNASSIGNED: 连接器/任务尚未分配给工人。
  • RUNNING: 连接器/任务正在运行。
  • PAUSED: 连接器/任务已在管理上暂停。
  • STOPPED: 连接器已停止。请注意,此状态不适用于任务,因为停止连接器的任务已关闭,不会在状态API中可见。
  • FAILED: 连接器/任务失败(通常通过引发异常,该异常在状态输出中报告)。
  • RESTARTING: 连接器/任务要么正在主动重新启动,要么预计很快就会重新启动

在大多数情况下,连接器和任务状态将匹配,尽管当发生变化或任务失败时,它们在短时间内可能会有所不同。例如,当连接器首次启动时,在连接器及其任务全部过渡到运行状态之前可能会出现明显的延迟。当任务失败时,状态也会有所不同,因为Connect不会自动重新启动失败的任务。要手动重新启动连接器/任务,您可以使用上面列出的重新启动API。请注意,如果您在重新平衡时尝试重新启动任务,Connect将返回409(冲突)状态代码。您可以在重新平衡完成后重试,但可能没有必要,因为重新平衡会有效地重新启动集群中的所有连接器和任务。

从2.5.0开始,Kafka Connect使用status.storage.topic来存储与每个连接器正在使用的主题相关的信息。Connect Workers使用这些每个连接器主题状态更新,通过返回连接器正在使用的主题名称集来响应对REST端点GET /connectors/{name}/topics请求。对REST端点PUT /connectors/{name}/topics/reset的请求会重置连接器的活动主题集,并允许根据连接器的最新主题使用模式填充一组新主题。连接器删除后,连接器的活动主题集也会被删除。默认情况下启用主题跟踪,但可以通过设置topic.tracking.enable=false来禁用。如果您想禁止在运行时重置连接器活动主题的请求,请设置Worker属性topic.tracking.allow.reset=false

有时暂时停止连接器的消息处理很有用。例如,如果远程系统正在进行维护,源连接器最好停止轮询新数据,而不是用异常垃圾邮件填充日志。对于此用例,Connect提供了一个暂停/恢复API。当源连接器暂停时,Connect将停止轮询它以获取其他记录。当接收器连接器暂停时,Connect将停止向其推送新消息。暂停状态是持久的,因此即使您重新启动集群,在任务恢复之前,连接器也不会再次开始消息处理。请注意,在连接器的所有任务过渡到暂停状态之前可能会出现延迟,因为它们可能需要时间来完成暂停时的任何处理。此外,失败的任务在重新启动之前不会过渡到暂停状态。

在35.0中,Connect引入了一个停止API,可以完全关闭连接器的任务,并取消分配他们声称的任何资源。这与暂停连接器不同,连接器的任务处于空转,并分配了它们声称的任何资源(这允许连接器在恢复后快速开始处理数据)。从资源使用的角度来看,停止连接器比暂停它更有效,但一旦恢复,可能需要更长的时间才能开始处理数据。请注意,连接器的偏移量只有在处于停止状态时才能通过偏移管理端点进行修改。



回到顶部