跳转至

7. 安全

7.1 安全概述

在版本 0.9.0.0 中,Kafka 社区添加了许多功能,这些功能单独或一起使用可以提高 Kafka 集群的安全性。目前支持以下安全措施:

  1. 使用 SSL 或 SASL 对客户端(生产者和消费者)、其他代理和工具与代理的连接进行身份验证。Kafka支持以下SASL机制:
    • SASL/GSSAPI (Kerberos) - 从版本 0.9.0.0 开始
    • SASL/PLAIN - 从版本 0.10.0.0 开始
    • SASL/SCRAM-SHA-256 和 SASL/SCRAM-SHA-512 - 从版本 0.10.2.0 开始
    • SASL/OAUTHBEARER - 从版本 2.0 开始
  2. 从代理到 ZooKeeper 的连接身份验证
  3. 使用 SSL 对代理和客户端之间、代理之间或代理和工具之间传输的数据进行加密(请注意,启用 SSL 时会出现性能下降,其程度取决于 CPU 类型和 JVM 实现。)
  4. 客户端读/写操作的授权
  5. 授权可插拔,支持与外部授权服务集成

值得注意的是,安全性是可选的 - 支持非安全集群,以及经过身份验证、未经身份验证、加密和非加密客户端的混合。以下指南解释了如何在客户端和代理中配置和使用安全功能。

7.2 监听器配置

为了保护 Kafka 集群的安全,有必要保护用于与服务器通信的通道的安全。每个服务器必须定义一组侦听器,用于接收来自客户端以及其他服务器的请求。每个监听器可以被配置为使用各种机制来验证客户端并确保服务器和客户端之间的流量被加密。本节提供侦听器配置的入门知识。

Kafka 服务器支持侦听多个端口上的连接。这是通过listeners服务器配置中的属性进行配置的,该属性接受要启用的侦听器的逗号分隔列表。每台服务器上必须至少定义一个侦听器。中定义的每个监听器的格式listeners如下:

{LISTENER_NAME}://{hostname}:{port}

通常LISTENER_NAME是一个描述性名称,定义了侦听器的目的。例如,许多配置对客户端流量使用单独的侦听器,因此它们可能会引用CLIENT配置中相应的侦听器:

listeners=CLIENT://localhost:9092

每个侦听器的安全协议在单独的配置中定义: listener.security.protocol.map。该值是映射到其安全协议的每个侦听器的逗号分隔列表。例如,以下值配置指定CLIENT侦听器将使用 SSL,而 BROKER侦听器将使用明文。

listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT

下面给出了安全协议的可能选项:

  1. PLAINTEXT
  2. SSL
  3. SASL_PLAINTEXT
  4. SASL_SSL

明文协议不提供安全性,并且不需要任何额外的配置。在以下部分中,本文档介绍了如何配置其余协议。

如果每个所需的侦听器使用单独的安全协议,也可以使用安全协议名称作为 中的侦听器名称listeners。使用上面的示例,我们可以使用以下定义跳过CLIENT和侦听器的定义:BROKER

listeners=SSL://localhost:9092,PLAINTEXT://localhost:9093

但是,我们建议用户为侦听器提供明确的名称,因为这样可以使每个侦听器的预期用途更加清晰。

inter.broker.listener.name在此列表中的侦听器中,可以通过将配置设置为侦听器的名称来声明用于代理间通信的侦听器。代理间侦听器的主要目的是分区复制。如果未定义,则代理间侦听器由 定义的安全协议确定security.inter.broker.protocol,默认为PLAINTEXT

对于依赖 Zookeeper 存储集群元数据的遗留集群,可以声明一个单独的侦听器,用于从活动控制器到代理的元数据传播。这是由 定义的control.plane.listener.name。当活动控制器需要将元数据更新推送到集群中的代理时,它将使用此侦听器。使用控制平面侦听器的好处是它使用单独的处理线程,这使得应用程序流量不太可能阻碍元数据更改(例如分区领导者和 ISR 更新)的及时传播。请注意,默认值为 null,这意味着控制器将使用由inter.broker.listener

在 KRaft 集群中,代理是任何broker启用了角色的服务器process.roles,控制器是任何启用了controller 角色的服务器。侦听器配置取决于角色。定义的侦听器 inter.broker.listener.name专门用于代理之间的请求。另一方面,控制器必须使用由 controller.listener.names配置定义的单独侦听器。不能将其设置为与代理间侦听器相同的值。

控制器接收来自其他控制器和代理的请求。因此,即使服务器没有controller启用该角色(即它只是一个代理),它仍然必须定义控制器侦听器以及配置它所需的任何安全属性。例如,我们可以在独立代理上使用以下配置:

process.roles=broker
listeners=BROKER://localhost:9092
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL

在此示例中,控制器侦听器仍配置为使用SASL_SSL 安全协议,但它未包含在其中,listeners因为代理不公开控制器侦听器本身。在这种情况下将使用的端口来自controller.quorum.voters配置,它定义了控制器的完整列表。

对于同时启用代理和控制器角色的 KRaft 服务器,配置类似。唯一的区别是控制器侦听器必须包含在 listeners

process.roles=broker,controller
listeners=BROKER://localhost:9092,CONTROLLER://localhost:9093
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL

要求定义的端口与controller.quorum.voters公开的控制器侦听器之一完全匹配。例如,此处 CONTROLLER侦听器绑定到端口 9093。 定义的连接字符串controller.quorum.voters也必须使用端口 9093,如此处所示。

控制器将接受由 定义的所有侦听器上的请求controller.listener.names。通常只有一个控制器侦听器,但也可以有更多。例如,这提供了一种通过集群滚动(一次滚动公开新监听器,一次滚动删除旧监听器)将活动侦听器从一个端口或安全协议更改为另一个端口或安全协议的方法。当定义了多个控制器侦听器时,列表中的第一个侦听器将用于出站请求。

在 Kafka 中,通常为客户端使用单独的侦听器。这允许集群间侦听器在网络级别进行隔离。对于 KRaft 中的控制器侦听器,侦听器应该被隔离,因为客户端无论如何都不使用它。客户端应该连接到代理上配置的任何其他侦听器。任何绑定到控制器的请求都将按 如下所述转发

在以下部分中,本文档介绍如何在侦听器上启用 SSL 进行加密和身份验证。后续部分将介绍使用 SASL 的其他身份验证机制。

7.3 使用 SSL 加密和验证

Apache Kafka 允许客户端使用 SSL 进行流量加密和身份验证。默认情况下,SSL 处于禁用状态,但可以根据需要打开。以下段落详细解释了如何设置您自己的 PKI 基础设施、使用它来创建证书以及配置 Kafka 以使用这些证书。

  1. 为每个 Kafka 代理生成 SSL 密钥和证书

部署一个或多个支持 SSL 的代理的第一步是为每台服务器生成公钥/私钥对。由于 Kafka 希望所有密钥和证书都存储在密钥库中,因此我们将使用 Java 的 keytool 命令来完成此任务。该工具支持两种不同的密钥库格式:Java 特定的 jks 格式(现已弃用)以及 PKCS12。从 Java 版本 9 开始,PKCS12 是默认格式,为了确保无论使用哪个 Java 版本,都使用此格式,所有以下命令都显式指定 PKCS12 格式。

> keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12

上述命令中需要指定两个参数:

  1. keystorefile:存储此代理的密钥(以及后来的证书)的密钥库文件。密钥库文件包含该代理的私钥和公钥,因此需要保证其安全。理想情况下,此步骤在将使用密钥的 Kafka 代理上运行,因为该密钥永远不应该传输/离开其预期的服务器。
  2. 有效性:密钥的有效时间(以天为单位)。请注意,这与证书的有效期不同,证书的有效期将在签署证书中确定。您可以使用相同的密钥来请求多个证书:如果您的密钥的有效期为 10 年,但您的 CA 只会签署有效期为一年的证书,那么随着时间的推移,您可以对 10 个证书使用相同的密钥。

要获取可与刚刚创建的私钥一起使用的证书,需要创建证书签名请求。此签名请求由受信任的 CA 签名后会生成实际证书,然后可以将其安装在密钥库中并用于身份验证目的。
要生成证书签名请求,请对迄今为止创建的所有服务器密钥库运行以下命令。

> keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}

该命令假设您想要将主机名信息添加到证书中,如果不是这种情况,您可以省略扩展参数-ext SAN=DNS:{FQDN},IP:{IPADDRESS1}。请参阅下文了解更多相关信息。

主机名验证

启用后,主机名验证是根据该服务器的实际主机名或 IP 地址检查您要连接的服务器提供的证书中的属性的过程,以确保您确实连接到正确的服务器。
进行此检查的主要原因是为了防止中间人攻击。对于 Kafka,此检查已默认禁用很长一段时间,但从 Kafka 2.0.0 开始,默认为客户端连接以及代理间连接启用服务器主机名验证。可以通过设置为空字符串来
禁用服务器主机名验证。ssl.endpoint.identification.algorithm
对于动态配置的代理侦听器,可以使用以下方法禁用主机名验证kafka-configs.sh

> bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

笔记:

通常,除了“让它正常工作”的最快方法之外,没有充分的理由禁用主机名验证,然后承诺“稍后有更多时间时修复它”!
如果在正确的时间完成正确的主机名验证并不那么困难,但一旦集群启动并运行就会变得更加困难 - 帮自己一个忙,现在就做!

如果启用主机名验证,客户端将根据以下两个字段之一验证服务器的完全限定域名 (FQDN) 或 IP 地址:

  1. 通用名(中文)
  2. 主题备用名称 (SAN)

虽然 Kafka 检查这两个字段,但自 2000 年以来,不推荐 使用通用名称字段进行主机名验证, 并且应尽可能避免使用。此外,SAN 字段更加灵活,允许在证书中声明多个 DNS 和 IP 条目。
另一个优点是,如果 SAN 字段用于主机名验证,则可以将公用名称设置为更有意义的值以用于授权目的。由于我们需要将 SAN 字段包含在签名证书中,因此将在生成签名请求时指定该字段。也可以在生成密钥对时指定,但这不会自动复制到签名请求中。
要添加 SAN 字段,请将以下参数附加 -ext SAN=DNS:{FQDN},IP:{IPADDRESS}到 keytool 命令:

> keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
  1. 创建您自己的 CA

在此步骤之后,集群中的每台计算机都拥有一个可用于加密流量的公钥/私钥对和一个证书签名请求,这是创建证书的基础。要添加身份验证功能,此签名请求需要由将在此步骤中创建的受信任机构进行签名。

证书颁发机构 (CA) 负责签署证书。CA 的工作方式类似于颁发护照的政府 - 政府在每本护照上盖章(签名),以使护照难以伪造。其他政府会验证印章以确保护照的真实性。类似地,CA 对证书进行签名,并且密码学保证签名的证书在计算上难以伪造。因此,只要 CA 是真正且值得信赖的权威机构,客户端就可以强有力地保证他们正在连接到真实的机器。

对于本指南,我们将成为我们自己的证书颁发机构。在企业环境中设置生产集群时,这些证书通常由整个公司信任的企业 CA 签名。请参阅生产中的常见陷阱,了解这种情况下需要考虑的一些事项。

由于OpenSSL 中的错误,x509 模块不会将请求的扩展字段从 CSR 复制到最终证书中。由于我们希望 SAN 扩展出现在我们的证书中以启用主机名验证,因此我们将使用ca模块。这需要在生成 CA 密钥对之前进行一些额外的配置。
将以下列表保存到名为 openssl-ca.cnf 的文件中,并根据需要调整有效性和通用属性的值。

HOME            = .
RANDFILE        = $ENV::HOME/.rnd

####################################################################
[ ca ]
default_ca    = CA_default      # The default ca section

[ CA_default ]

base_dir      = .
certificate   = $base_dir/cacert.pem   # The CA certifcate
private_key   = $base_dir/cakey.pem    # The CA private key
new_certs_dir = $base_dir              # Location for new certs after signing
database      = $base_dir/index.txt    # Database index file
serial        = $base_dir/serial.txt   # The current serial number

default_days     = 1000         # How long to certify for
default_crl_days = 30           # How long before next CRL
default_md       = sha256       # Use public key default MD
preserve         = no           # Keep passed DN ordering

x509_extensions = ca_extensions # The extensions to add to the cert

email_in_dn     = no            # Don't concat the email in the DN
copy_extensions = copy          # Required to copy SANs from CSR to cert

####################################################################
[ req ]
default_bits       = 4096
default_keyfile    = cakey.pem
distinguished_name = ca_distinguished_name
x509_extensions    = ca_extensions
string_mask        = utf8only

####################################################################
[ ca_distinguished_name ]
countryName         = Country Name (2 letter code)
countryName_default = DE

stateOrProvinceName         = State or Province Name (full name)
stateOrProvinceName_default = Test Province

localityName                = Locality Name (eg, city)
localityName_default        = Test Town

organizationName            = Organization Name (eg, company)
organizationName_default    = Test Company

organizationalUnitName         = Organizational Unit (eg, division)
organizationalUnitName_default = Test Unit

commonName         = Common Name (e.g. server FQDN or YOUR name)
commonName_default = Test Name

emailAddress         = Email Address
emailAddress_default = test@test.com

####################################################################
[ ca_extensions ]

subjectKeyIdentifier   = hash
authorityKeyIdentifier = keyid:always, issuer
basicConstraints       = critical, CA:true
keyUsage               = keyCertSign, cRLSign

####################################################################
[ signing_policy ]
countryName            = optional
stateOrProvinceName    = optional
localityName           = optional
organizationName       = optional
organizationalUnitName = optional
commonName             = supplied
emailAddress           = optional

####################################################################
[ signing_req ]
subjectKeyIdentifier   = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints       = CA:FALSE
keyUsage               = digitalSignature, keyEncipherment

然后创建数据库和序列号文件,这些文件将用于跟踪使用该 CA 签署的证书。这两个文件都是简单的文本文件,与您的 CA 密钥位于同一目录中。

> echo 01 > serial.txt
> touch index.txt

完成这些步骤后,您现在就可以生成 CA,稍后将使用该 CA 来签署证书。

> openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM

CA 只是一个由自身签名的公钥/私钥对和证书,仅用于签署其他证书。
该密钥对应该保持非常安全,如果有人访问它,他们可以创建并签署您的基础设施信任的证书,这意味着他们在连接到信任该 CA 的任何服务时将能够冒充任何人。
下一步是将生成的 CA 添加到 **客户端的信任库**,以便客户端可以信任此 CA:

> keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

注意: 如果您通过在 Kafka 代理配置中将 ssl.client.auth 设置为“请求”或“必需”来将 Kafka 代理配置为需要客户端身份验证,那么您还必须为 Kafka 代理提供一个信任库,并且它应该包含所有内容签署客户端密钥的 CA 证书。

> keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert

与步骤 1 中存储每台计算机自己的身份的密钥库不同,客户端的信任库存储客户端应该信任的所有证书。将证书导入到信任库中还意味着信任由该证书签名的所有证书。正如上面的类比,信任政府(CA)也意味着信任其颁发的所有护照(证书)。该属性称为信任链,在大型 Kafka 集群上部署 SSL 时特别有用。您可以使用单个 CA 签署集群中的所有证书,并让所有计算机共享信任该 CA 的同一信任库。这样所有机器都可以验证所有其他机器。

  1. 签署证书

然后使用 CA 对其进行签名:

> openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out {server certificate} -infiles {certificate signing request}

最后,您需要将 CA 的证书和签名的证书导入密钥库:

> keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
> keytool -keystore {keystore} -alias localhost -import -file cert-signed

参数定义如下:

  1. keystore:密钥库的位置
  2. CA证书:CA的证书
  3. 证书签名请求:使用服务器密钥创建的 csr
  4. 服务器证书:用于写入服务器签名证书的文件

这将为您留下一个名为truststore.jks的信任库- 这对于所有客户和经纪商来说都是相同的,并且不包含任何敏感信息,因此无需保护这一点。
此外,每个节点都会有一个server.keystore.jks文件,其中包含节点密钥、证书和 CA 证书,请参阅配置 Kafka 代理配置 Kafka 客户端 以获取有关如何使用这些文件的信息。

有关此主题的一些工具帮助,请查看easyRSA项目,该项目具有丰富的脚本来帮助完成这些步骤。

PEM 格式的 SSL 密钥和证书

从 2.7.0 开始,可以直接在 PEM 格式的配置中为 Kafka 代理和客户端配置 SSL 密钥和信任存储。这避免了在文件系统上存储单独文件的需要,并受益于 Kafka 配置的密码保护功能。除了 JKS 和 PKCS12 之外,PEM 还可以用作基于文件的密钥和信任存储的存储类型。要直接在代理或客户端配置中配置 PEM 密钥存储,应在 中提供 PEM 格式的私钥,ssl.keystore.key并应在 中提供 PEM 格式的证书链ssl.keystore.certificate.chain。要配置信任存储,应在以下位置提供信任证书,例如 CA 的公共证书ssl.truststore.certificates。由于 PEM 通常存储为多行 base-64 字符串,因此配置值可以作为多行字符串包含在 Kafka 配置中,其中行以反斜杠 ('\') 结尾以继续行。

存储密码配置ssl.keystore.passwordssl.truststore.password不用于 PEM。如果私钥使用密码加密,则必须在 中提供密钥密码ssl.key.password。私钥可以以未加密的形式提供,无需密码。在生产部署中,在这种情况下,应使用 Kafka 中的密码保护功能对配置进行加密或外部化。请注意,当使用 OpenSSL 等外部工具进行加密时,默认 SSL 引擎工厂对加密私钥的解密功能有限。BouncyCastle 等第三方库可以与自定义集成,SslEngineFactory以支持更广泛的加密私钥。

  1. 生产中常见的陷阱

上面的段落展示了创建您自己的 CA 并使用它为您的集群签署证书的过程。虽然对于沙箱、开发、测试和类似系统非常有用,但这通常不是在企业环境中为生产集群创建证书的正确过程。企业通常会运营自己的CA,用户可以发送CSR来与该CA签署,这样做的好处是用户无需负责保证CA的安全,并且有一个每个人都可以信任的中央权威。然而,它也剥夺了用户对证书签名过程的很多控制权。 1. 扩展密钥使用
证书可能包含控制证书使用目的的扩展字段。如果此字段为空,则对使用没有限制,但如果此处指定了任何使用,则有效的 SSL 实现必须强制执行这些使用。
Kafka的相关用法有:

*   客户端认证
*   服务器认证

Kafka 代理需要允许这两种用法,因为对于集群内通信,每个代理将同时充当其他代理的客户端和服务器。企业 CA 拥有 Web 服务器的签名配置文件并将其用于 Kafka 的情况并不罕见,该配置文件仅包含 serverAuth*使用*值并导致 SSL 握手失败。
  1. 出于安全原因,中间证书
    企业根 CA 通常保持离线状态。为了实现日常使用,创建了所谓的中间 CA,然后使用它们来签署最终证书。将由中间 CA 签名的证书导入密钥库时,有必要提供直至根 CA 的整个信任链。只需将证书文件合并到一个组合证书文件中,然后使用 keytool 导入该文件即可完成此操作
  2. 无法复制扩展字段
    CA 运营商通常不愿意从 CSR 复制和请求扩展字段,而是更愿意自己指定这些字段,因为这使得恶意方更难获取具有潜在误导性或欺诈性值的证书。建议仔细检查签名证书,这些证书是否包含所有请求的 SAN 字段以启用正确的主机名验证。以下命令可用于将证书详细信息打印到控制台,该详细信息应与最初请求的内容进行比较:
> openssl x509 -in certificate.crt -text -noout
  1. 配置 Kafka 代理

如果没有为代理间通信启用 SSL(请参阅下文了解如何启用它),则 PLAINTEXT 和 SSL 端口都是必需的。

listeners=PLAINTEXT://host.name:port,SSL://host.name:port

代理端需要以下 SSL 配置

ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234

注意:ssl.truststore.password 在技术上是可选的,但强烈建议使用。如果未设置密码,仍然可以访问信任库,但完整性检查将被禁用。值得考虑的可选设置:

  1. ssl.client.auth=none (“required”=> 需要客户端身份验证,“requested”=> 请求客户端身份验证,没有证书的客户端仍然可以连接。不鼓励使用“requested”,因为它提供了一种错误的感觉安全性和配置错误的客户端仍将成功连接。)
  2. ssl.cipher.suites(可选)。密码套件是身份验证、加密、MAC 和密钥交换算法的命名组合,用于协商使用 TLS 或 SSL 网络协议的网络连接的安全设置。(默认为空列表)
  3. ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1(列出您要从客户端接受的 SSL 协议。请注意,SSL 已被弃用,取而代之的是 TLS,并且不建议在生产中使用 SSL)
  4. ssl.keystore.type=JKS
  5. ssl.truststore.type=JKS
  6. ssl.secure.random.implementation=SHA1PRNG

如果要为代理间通信启用 SSL,请将以下内容添加到 server.properties 文件(默认为 PLAINTEXT)

security.inter.broker.protocol=SSL

由于某些国家/地区的进口法规,Oracle 实施限制了默认情况下可用的加密算法的强度。如果需要更强的算法(例如,具有 256 位密钥的 AES),则必须获取JCE 无限强度管辖策略文件并将其安装在 JDK/JRE 中。有关详细信息, 请参阅 JCA 提供商文档。

JRE/JDK 将有一个默认的伪随机数生成器 (PRNG),用于加密操作,因此不需要配置ssl.secure.random.implementation. 然而,某些实现存在性能问题(特别是,Linux 系统上默认选择的,NativePRNG使用全局锁)。如果 SSL 连接的性能成为问题,请考虑显式设置要使用的实现。该SHA1PRNG实现是非阻塞的,并且在重负载(50 MB/秒生成的消息,加上每个代理的复制流量)下表现出了非常好的性能特征。

启动代理后,您应该能够在 server.log 中看到

with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)

要快速检查服务器密钥库和信任库是否设置正确,您可以运行以下命令

> openssl s_client -debug -connect localhost:9093 -tls1

(注意:TLSv1 应列在 ssl.enabled.protocols 下)
在此命令的输出中,您应该看到服务器的证书:

-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com

如果证书未显示或存在任何其他错误消息,则说明您的密钥库设置不正确。 6. #### 配置 Kafka 客户端

仅新的 Kafka Producer 和 Consumer 支持 SSL,不支持旧的 API。SSL 配置对于生产者和消费者来说都是相同的。
如果代理中不需要客户端身份验证,则以下是最小配置示例:

security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234

注意:ssl.truststore.password 在技术上是可选的,但强烈建议使用。如果未设置密码,仍然可以访问信任库,但完整性检查将被禁用。如果需要客户端身份验证,则必须像步骤 1 中那样创建密钥库,并且还必须配置以下内容:

ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

根据我们的要求和代理配置,可能还需要其他配置设置:

  1. ssl.provider(可选)。用于 SSL 连接的安全提供程序的名称。默认值是 JVM 的默认安全提供程序。
  2. ssl.cipher.suites(可选)。密码套件是身份验证、加密、MAC 和密钥交换算法的命名组合,用于协商使用 TLS 或 SSL 网络协议的网络连接的安全设置。
  3. ssl.enabled.protocols=TLSv1.2、TLSv1.1、TLSv1。它应该至少列出代理端配置的协议之一
  4. ssl.truststore.type=JKS
  5. ssl.keystore.type=JKS

使用控制台生产者和控制台消费者的示例:

> kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
> kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties

7.4 使用 SASL 进行身份验证

  1. JAAS配置

Kafka 使用 Java 身份验证和授权服务 ( JAAS ) 进行 SASL 配置。

  1. Kafka 代理的 JAAS 配置

KafkaServer是每个 KafkaServer/Broker 使用的 JAAS 文件中的部分名称。本部分为代理提供 SASL 配置选项,包括代理为代理间通信而建立的任何 SASL 客户端连接。如果将多个侦听器配置为使用 SASL,则部分名称可以以小写侦听器名称为前缀,后跟句点,例如sasl_ssl.KafkaServer。

客户端部分用于验证与 Zookeeper 的 SASL 连接。它还允许代理在 Zookeeper 节点上设置 SASL ACL,从而锁定这些节点,以便只有代理可以修改它。所有经纪商必须具有相同的主体名称。如果要使用除 Client 之外的节名称,请将系统属性Zookeeper.sasl.clientconfig设置为适当的名称(例如-Dzookeeper.sasl.clientconfig=ZkClient)。

ZooKeeper 默认使用“zookeeper”作为服务名称。如果要更改此设置,请将系统属性 Zookeeper.sasl.client.username设置为适当的名称(例如-Dzookeeper.sasl.client.username \=zk)。

代理还可以使用代理配置属性来配置 JAAS sasl.jaas.config。属性名称必须以侦听器前缀(包括 SASL 机制)作为前缀,即listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. 配置值中只能指定一个登录模块。如果在侦听器上配置了多个机制,则必须使用侦听器和机制前缀为每个机制提供配置。例如,

listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-secret";
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" \
user_alice="alice-secret";

如果在不同级别定义 JAAS 配置,则使用的优先顺序为:

  • 代理配置属性listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
  • {listenerName}.KafkaServer静态 JAAS 配置部分
  • KafkaServer静态 JAAS 配置部分

请注意,ZooKeeper JAAS 配置只能使用静态 JAAS 配置进行配置。

有关代理配置示例,请参阅GSSAPI (Kerberos)、 PLAIN、 SCRAM或 OAUTHBEARER 。

  1. Kafka 客户端的 JAAS 配置

客户端可以使用客户端配置属性 sasl.jaas.config 或使用 类似于代理的静态 JAAS 配置文件来配置 JAAS。

  1. 使用客户端配置属性的 JAAS 配置

客户端可以将 JAAS 配置指定为生产者或消费者属性,而无需创建物理配置文件。此模式还允许同一 JVM 中的不同生产者和消费者通过为每个客户端指定不同的属性来使用不同的凭据。如果同时指定了静态 JAAS 配置系统属性 java.security.auth.login.config和客户端属性sasl.jaas.config ,则将使用客户端属性。

有关示例配置,请参阅GSSAPI (Kerberos)、 PLAIN、 SCRAM或 OAUTHBEARER 。

  1. 使用静态配置文件的 JAAS 配置

要使用静态 JAAS 配置文件在客户端上配置 SASL 身份验证: 1. 添加一个 JAAS 配置文件,其中包含名为KafkaClient的客户端登录部分。在KafkaClient中为所选机制配置登录模块,如设置GSSAPI (Kerberos)、 PLAIN、 SCRAM或 OAUTHBEARER的示例中所述。例如,GSSAPI 凭证可以配置为:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="kafka-client-1@EXAMPLE.COM";
};
  1. 将 JAAS 配置文件位置作为 JVM 参数传递给每个客户端 JVM。例如:
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
  1. SASL配置

SASL 可以与 PLAINTEXT 或 SSL 一起使用,作为分别使用安全协议 SASL_PLAINTEXT 或 SASL_SSL 的传输层。如果使用SASL_SSL,则还必须配置SSL

  1. SASL机制

    Kafka支持以下SASL机制: * GSSAPI(Kerberos) * 清楚的 * SCRAM-SHA-256 * SCRAM-SHA-512 * 承载者

  2. Kafka 代理的 SASL 配置
  3. 在 server.properties 中配置 SASL 端口,方法是将 SASL_PLAINTEXT 或 SASL_SSL 至少之一添加到listeners参数,该参数包含一个或多个逗号分隔值:

listeners=SASL_PLAINTEXT://host.name:port

如果您仅配置 SASL 端口(或者如果您希望 Kafka 代理使用 SASL 相互进行身份验证),请确保为代理间通信设置相同的 SASL 协议:

security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
  1. 选择 要在代理中启用的一种或多种受支持的机制,并按照步骤为该机制配置 SASL。要在代理中启用多种机制,请按照此处的步骤操作 。

  2. Kafka 客户端的 SASL 配置

仅新的 Java Kafka 生产者和消费者支持 SASL 身份验证,不支持旧的 API。

要在客户端上配置 SASL 身份验证,请选择 在代理中启用的SASL机制以进行客户端身份验证,然后按照步骤为所选机制配置 SASL。

注意:当通过 SASL 建立与代理的连接时,客户端可以对代理地址执行反向 DNS 查找。由于 JRE 实现反向 DNS 查找的方式,如果未使用完全限定的域名(对于客户端bootstrap.servers和代理的 [advertised.listeners](https://kafka.apache.org/documentation/#brokerconfigs_advertised.listeners).

  1. 使用 SASL/Kerberos 进行身份验证

  2. 先决条件
  3. Kerberos
    如果您的组织已经使用 Kerberos 服务器(例如,通过使用 Active Directory),则无需仅为 Kafka 安装新服务器。否则,您将需要安装一个,您的 Linux 供应商可能有 Kerberos 软件包以及有关如何安装和配置它的简短指南(UbuntuRedhat)。请注意,如果您使用 Oracle Java,则需要下载适合您的 Java 版本的 JCE 策略文件并将其复制到 $JAVA_HOME/jre/lib/security。

  4. 创建 Kerberos 主体
    如果您使用组织的 Kerberos 或 Active Directory 服务器,请向 Kerberos 管理员询问集群中每个 Kafka 代理以及将使用 Kerberos 身份验证(通过客户端和工具)访问 Kafka 的每个操作系统用户的主体。
    如果您安装了自己的 Kerberos,则需要使用以下命令自行创建这些主体:
> sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
> sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
  1. 确保可以使用主机名访问所有主机- Kerberos 要求所有主机都可以使用其 FQDN 进行解析。

  2. 配置 Kafka 代理
  3. 将一个经过适当修改的 JAAS 文件(类似于下面的文件)添加到每个 Kafka 代理的配置目录中,在本示例中我们将其称为 kafka_server_jaas.conf(请注意,每个代理应该有自己的密钥表):

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};

// Zookeeper client authentication
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};

JAAS 文件中的KafkaServer部分告诉代理要使用哪个主体以及存储该主体的密钥表的位置。它允许代理使用本节中指定的密钥表登录。有关 Zookeeper SASL 配置的更多详细信息, 请参阅注释。

  1. 将 JAAS 和可选的 krb5 文件位置作为 JVM 参数传递给每个 Kafka 代理(请参阅此处了解更多详细信息):
-Djava.security.krb5.conf=/etc/kafka/krb5.conf
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
  1. 确保启动 kafka 代理的操作系统用户可以读取 JAAS 文件中配置的密钥表。
  2. 按照此处所述在 server.properties 中配置 SASL 端口和 SASL 机制。例如:
listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI

我们还必须在 server.properties 中配置服务名称,该名称应与 kafka 代理的主体名称匹配。在上面的示例中,主体是“kafka/kafka1.hostname.com@EXAMPLE.com”,因此:

sasl.kerberos.service.name=kafka
  1. 配置 Kafka 客户端

要在客户端上配置 SASL 身份验证: 1. 客户端(生产者、消费者、连接工作人员等)将使用自己的主体(通常与运行客户端的用户同名)向集群进行身份验证,因此根据需要获取或创建这些主体。然后为每个客户端配置 JAAS 配置属性。JVM 中的不同客户端可以通过指定不同的主体以不同的用户身份运行。该物业sasl.jaas.configProducer.properties 或 Consumer.properties 中描述了生产者和消费者等客户端如何连接到 Kafka Broker。以下是使用密钥表的客户端的示例配置(建议用于长时间运行的进程):

sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    useKeyTab=true \
    storeKey=true  \
    keyTab="/etc/security/keytabs/kafka_client.keytab" \
    principal="kafka-client-1@EXAMPLE.COM";

对于 kafka-console-consumer 或 kafka-console- Producer 等命令行实用程序,kinit 可以与“useTicketCache = true”一起使用,如下所示:

sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    useTicketCache=true;

客户端的 JAAS 配置也可以指定为类似于代理的 JVM 参数,如此处所述。客户端使用名为KafkaClient的登录部分 。此选项仅允许一名用户进行来自 JVM 的所有客户端连接。 2. 确保启动 kafka 客户端的操作系统用户可以读取 JAAS 配置中配置的密钥表。 3. (可选)将 krb5 文件位置作为 JVM 参数传递给每个客户端 JVM(请参阅此处了解更多详细信息):

-Djava.security.krb5.conf=/etc/kafka/krb5.conf
  1. 在 Producer.properties 或 Consumer.properties 中配置以下属性:
security.protocol=SASL_PLAINTEXT (or SASL_SSL)
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
  1. 使用 SASL/PLAIN 进行身份验证

SASL/PLAIN是一种简单的用户名/密码认证机制,通常与TLS结合使用进行加密,实现安全认证。Kafka 支持 SASL/PLAIN 的默认实现,可以将其扩展到生产用途,如此处所述

在默认实现下principal.builder.class,用户名用作PrincipalACL 等配置的 身份验证。 1. ##### 配置 Kafka 代理

  1. 将一个经过适当修改的 JAAS 文件(类似于下面的文件)添加到每个 Kafka 代理的配置目录中,在此示例中我们将其称为 kafka_server_jaas.conf:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};

此配置定义了两个用户(adminalice)。KafkaServer部分 中的属性用户名和密码由代理用来启动与其他代理的连接。在此示例中, admin是经纪商间通信的用户。属性集user_ userName定义连接到代理的所有用户的密码,并且代理使用这些属性验证所有客户端连接,包括来自其他代理的连接。 2. 将 JAAS 配置文件位置作为 JVM 参数传递给每个 Kafka 代理:

-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
  1. 按照此处所述在 server.properties 中配置 SASL 端口和 SASL 机制。例如:
listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
  1. 配置 Kafka 客户端

要在客户端上配置 SASL 身份验证: 1. 在 Producer.properties 或 Consumer.properties 中为每个客户端配置 JAAS 配置属性。登录模块描述了生产者和消费者等客户端如何连接到 Kafka Broker。以下是 PLAIN 机制的客户端配置示例:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";

客户端使用用户名和密码选项来配置客户端连接的用户。在此示例中,客户端以用户alice的身份连接到代理。JVM 中的不同客户端可以通过在sasl.jaas.config.

客户端的 JAAS 配置也可以指定为类似于代理的 JVM 参数,如此处所述。客户端使用名为KafkaClient的登录部分 。此选项仅允许一名用户进行来自 JVM 的所有客户端连接。

  1. 在 Producer.properties 或 Consumer.properties 中配置以下属性:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
  1. 在生产中使用 SASL/PLAIN
    • SASL/PLAIN 只能与 SSL 一起使用作为传输层,以确保明文密码不会在未经加密的情况下在线路上传输。
    • Kafka 中 SASL/PLAIN 的默认实现在 JAAS 配置文件中指定用户名和密码,如下 所示sasl.server.callback.handler.class从 Kafka 2.0 版本开始,您可以通过配置自己的回调处理程序来避免在磁盘上存储明文密码,这些回调处理程序使用配置选项和来从外部源获取用户名和密码 sasl.client.callback.handler.class
    • 在生产系统中,外部认证服务器可以实现密码认证。从 Kafka 2.0 版开始,您可以插入自己的回调处理程序,通过配置 sasl.server.callback.handler.class.
  2. 使用 SASL/SCRAM 进行身份验证

Salted 质询响应身份验证机制 (SCRAM) 是一系列 SASL 机制,可解决执行用户名/密码身份验证(例如 PLAIN 和 DIGEST-MD5)的传统机制的安全问题。该机制在RFC 5802中定义。Kafka支持SCRAM-SHA-256和SCRAM-SHA-512,可以与TLS结合使用来执行安全身份验证。默认实现下principal.builder.class,使用用户名作为经过身份验证的 Principal用于配置 ACL 等。Kafka 中的默认 SCRAM 实现将 SCRAM 凭证存储在 Zookeeper 中,适合在 Zookeeper 位于专用网络上的 Kafka 安装中使用。 有关更多详细信息,请参阅安全注意事项。

  1. 创建 SCRAM 凭证

Kafka 中的 SCRAM 实现使用 Zookeeper 作为凭证​​存储。可以使用kafka-configs.sh在 Zookeeper 中创建凭证。对于启用的每个 SCRAM 机制,必须通过添加具有机制名称的配置来创建凭据。必须在启动 Kafka 代理之前创建代理间通信的凭证。可以动态创建和更新客户端凭证,并且更新的凭证将用于验证新连接。

使用密码alice-secret为用户alice创建 SCRAM 凭证:

> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice

如果未指定迭代,则使用默认迭代计数 4096。创建随机盐,并将由盐、迭代、StoredKey 和 ServerKey 组成的 SCRAM 身份存储在 Zookeeper 中。有关 SCRAM 身份和各个字段的详细信息, 请参阅RFC 5802 。

以下示例还需要用户管理员进行代理间通信,可以使用以下命令创建:

> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

可以使用--describe选项列出现有凭据:

> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --describe --entity-type users --entity-name alice

可以使用--alter --delete-config选项删除一种或多种 SCRAM 机制的凭证:

> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
  1. 配置 Kafka 代理
  2. 将一个经过适当修改的 JAAS 文件(类似于下面的文件)添加到每个 Kafka 代理的配置目录中,在此示例中我们将其称为 kafka_server_jaas.conf:

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};

KafkaServer部分中的 属性用户名和密码由代理用来启动与其他代理的连接。在此示例中,admin是经纪商间通信的用户。 2. 将 JAAS 配置文件位置作为 JVM 参数传递给每个 Kafka 代理:

-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
  1. 按照此处所述在 server.properties 中配置 SASL 端口和 SASL 机制。例如:
listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)
  1. 配置 Kafka 客户端

要在客户端上配置 SASL 身份验证: 1. 在 Producer.properties 或 Consumer.properties 中为每个客户端配置 JAAS 配置属性。登录模块描述了生产者和消费者等客户端如何连接到 Kafka Broker。以下是 SCRAM 机制的客户端配置示例:

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="alice" \
    password="alice-secret";

客户端使用用户名和密码选项来配置客户端连接的用户。在此示例中,客户端以用户alice的身份连接到代理。JVM 中的不同客户端可以通过在sasl.jaas.config.

客户端的 JAAS 配置也可以指定为类似于代理的 JVM 参数,如此处所述。客户端使用名为KafkaClient的登录部分 。此选项仅允许一名用户进行来自 JVM 的所有客户端连接。

  1. 在 Producer.properties 或 Consumer.properties 中配置以下属性:
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)
  1. SASL/SCRAM 的安全注意事项
  2. Kafka 中 SASL/SCRAM 的默认实现将 SCRAM 凭证存储在 Zookeeper 中。这适用于 Zookeeper 安全且位于专用网络上的安装中的生产使用。

  3. Kafka 仅支持强哈希函数 SHA-256 和 SHA-512,最小迭代次数为 4096。强哈希函数与强密码和高迭代次数相结合,可以在 Zookeeper 安全性受到损害时防止暴力攻击。
  4. SCRAM 应仅与 TLS 加密一起使用,以防止 SCRAM 交换被拦截。这可以防止字典或暴力攻击,并在 Zookeeper 受到威胁时防止假冒。
  5. sasl.server.callback.handler.class从 Kafka 2.0 版开始,可以通过在 Zookeeper 不安全的安装中进行配置,使用自定义回调处理程序覆盖默认 SASL/SCRAM 凭证存储。
  6. 有关安全注意事项的更多详细信息,请参阅 RFC 5802

  7. 使用 SASL/OAUTHBEARER 进行身份验证

OAuth 2 授权框架“使第三方应用程序能够代表资源所有者通过协调资源所有者和 HTTP 服务之间的批准交互,或者通过允许第三方应用程序获得对 HTTP 服务的有限访问权限。以自己的名义获得访问权限。” SASL OAUTHBEARER 机制允许在 SASL(即非 HTTP)上下文中使用该框架;它在RFC 7628中定义。Kafka 中的默认 OAUTHBEARER 实现创建并验证不安全的 JSON Web 令牌 ,并且仅适合在非生产 Kafka 安装中使用。请参阅安全注意事项 更多细节。

在默认实现下principal.builder.class,OAuthBearerToken的principalName用作PrincipalACL等配置的身份 验证。

  1. 配置 Kafka 代理
  2. 将一个经过适当修改的 JAAS 文件(类似于下面的文件)添加到每个 Kafka 代理的配置目录中,在此示例中我们将其称为 kafka_server_jaas.conf:

KafkaServer {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
    unsecuredLoginStringClaim_sub="admin";
};

KafkaServer部分中的 属性unsecuredLoginStringClaim_sub由代理在启动与其他代理的连接时使用。在此示例中,admin将出现在主题 ( sub ) 声明中,并将成为broker间通信的用户。 2. 将 JAAS 配置文件位置作为 JVM 参数传递给每个 Kafka 代理:

-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
  1. 按照此处所述在 server.properties 中配置 SASL 端口和 SASL 机制。例如:
listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER
  1. 配置 Kafka 客户端

要在客户端上配置 SASL 身份验证: 1. 在 Producer.properties 或 Consumer.properties 中为每个客户端配置 JAAS 配置属性。登录模块描述了生产者和消费者等客户端如何连接到 Kafka Broker。以下是 OAUTHBEARER 机制的客户端配置示例:

sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    unsecuredLoginStringClaim_sub="alice";

客户端使用选项unsecuredLoginStringClaim_sub来配置主题 ( sub ) 声明,该声明确定客户端连接的用户。在此示例中,客户端以用户alice的身份连接到代理。JVM 中的不同客户端可以通过在 中指定不同的主题 ( sub ) 声明来作为不同用户进行连接sasl.jaas.config

客户端的 JAAS 配置也可以指定为类似于代理的 JVM 参数,如此处所述。客户端使用名为KafkaClient的登录部分 。此选项仅允许一名用户进行来自 JVM 的所有客户端连接。

  1. 在 Producer.properties 或 Consumer.properties 中配置以下属性:
security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism=OAUTHBEARER
  1. SASL/OAUTHBEARER 的默认实现取决于 jackson-databind 库。由于它是一个可选依赖项,因此用户必须通过其构建工具将其配置为依赖项。
  2. SASL/OAUTHBEARER 的无担保令牌创建选项
    • Kafka 中 SASL/OAUTHBEARER 的默认实现创建并验证不安全的 JSON Web 令牌。虽然仅适用于非生产用途,但它确实提供了在开发或测试环境中创建任意令牌的灵活性。
    • 以下是客户端(以及代理端,如果 OAUTHBEARER 是代理间协议)支持的各种 JAAS 模块选项:
    JAAS Module Option for Unsecured Token Creation Documentation
    unsecuredLoginStringClaim_="value" Creates a String claim with the given name and value. Any valid claim name can be specified except 'iat' and 'exp' (these are automatically generated).
    unsecuredLoginNumberClaim_="value" Creates a Number claim with the given name and value. Any valid claim name can be specified except 'iat' and 'exp' (these are automatically generated).
    unsecuredLoginListClaim_="value" Creates a String List claim with the given name and values parsed from the given value where the first character is taken as the delimiter. For example: unsecuredLoginListClaim_fubar="
    unsecuredLoginExtension_="value" Creates a String extension with the given name and value. For example: unsecuredLoginExtension_traceId="123". A valid extension name is any sequence of lowercase or uppercase alphabet characters. In addition, the "auth" extension name is reserved. A valid extension value is any combination of characters with ASCII codes 1-127.
    unsecuredLoginPrincipalClaimName Set to a custom claim name if you wish the name of the String claim holding the principal name to be something other than 'sub'.
    unsecuredLoginLifetimeSeconds Set to an integer value if the token expiration is to be set to something other than the default value of 3600 seconds (which is 1 hour). The 'exp' claim will be set to reflect the expiration time.
    unsecuredLoginScopeClaimName Set to a custom claim name if you wish the name of the String or String List claim holding any token scope to be something other than 'scope'.
  3. SASL/OAUTHBEARER 的不安全令牌验证选项
    JAAS Module Option for Unsecured Token Validation Documentation
    unsecuredValidatorPrincipalClaimName="value" Set to a non-empty value if you wish a particular String claim holding a principal name to be checked for existence; the default is to check for the existence of the 'sub' claim.
    unsecuredValidatorScopeClaimName="value" Set to a custom claim name if you wish the name of the String orString List claim holding any token scope to be something other than 'scope'.
    unsecuredValidatorRequiredScope="value" Set to a space-delimited list of scope values if you wish theString/String Listclaim holding the token scope to be checked to make sure it contains certain values.
    unsecuredValidatorAllowableClockSkewMs="value" Set to a positive integer value if you wish to allow up to some number of positive milliseconds of clock skew (the default is 0).
    • 可以使用自定义登录和 SASL 服务器回调处理程序来覆盖默认的不安全 SASL/OAUTHBEARER 实现(并且必须在生产环境中覆盖)。
    • 有关安全注意事项的更多详细信息,请参阅RFC 6749 第 10 节
    • SASL/OAUTHBEARER 的令牌刷新

    Kafka 会在令牌过期之前定期刷新令牌,以便客​​户端可以继续与代理建立连接。影响刷新算法运行方式的参数被指定为生产者/消费者/代理配置的一部分,如下所示。有关详细信息,请参阅其他地方有关这些属性的文档。默认值通常是合理的,在这种情况下,不需要显式设置这些配置参数。

    Producer/Consumer/Broker Configuration Property
    sasl.login.refresh.window.factor
    sasl.login.refresh.window.jitter
    sasl.login.refresh.min.period.seconds
    sasl.login.refresh.min.buffer.seconds
  4. SASL/OAUTHBEARER 的安全/生产使用

    生产用例需要编写org.apache.kafka.common.security.auth.AuthenticateCallbackHandler 的实现 ,它可以处理org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback的实例 ,并通过sasl.login声明它 非代理客户端的 .callback.handler.class配置选项或通过代理的listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class 配置选项(当 SASL/OAUTHBEARER 是代理间协议时) 。

    生产用例还需要编写org.apache.kafka.common.security.auth.AuthenticateCallbackHandler 的实现 ,它可以处理org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback的实例 ,并通过listener.name声明它 .sasl_ssl.oauthbearer.sasl.server.callback.handler.class 代理配置选项。

  5. SASL/OAUTHBEARER 的安全注意事项
    • Kafka 中 SASL/OAUTHBEARER 的默认实现创建并验证不安全的 JSON Web 令牌。这仅适用于非生产用途。
    • OAUTHBEARER 应仅在具有 TLS 加密的生产环境中使用,以防止令牌被拦截。
    • 如上所述,可以使用自定义登录和 SASL 服务器回调处理程序来覆盖默认的不安全 SASL/OAUTHBEARER 实现(并且必须在生产环境中覆盖)。
    • 有关 OAuth 2 一般安全注意事项的更多详细信息,请参阅RFC 6749,第 10 节
    • 在代理中启用多个 SASL 机制

  6. 在JAAS 配置文件的KafkaServer部分中指定所有启用机制的登录模块的配置。例如:

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";

    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};
  1. 在 server.properties 中启用 SASL 机制:
sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
  1. 如果需要,请在 server.properties 中指定用于代理间通信的 SASL 安全协议和机制:
security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)
  1. 按照GSSAPI (Kerberos)、 PLAIN、 SCRAMOAUTHBEARER中特定于机制的步骤 为启用的机制配置 SASL。
  2. 修改正在运行的集群中的SASL机制

可以使用以下顺序在正在运行的集群中修改 SASL 机制:

  1. 通过将机制添加到每个代理的 server.properties 中的sasl.enabled.mechanisms中,启用新的 SASL 机制。更新 JAAS 配置文件以包括此处所述的两种机制。增量反弹集群节点。
  2. 使用新机制重新启动客户端。
  3. 要更改代理间通信的机制(如果需要),请将server.properties 中的sasl.mechanism.inter.broker.protocol设置为新机制,并再次增量地弹跳集群。
  4. 要删除旧机制(如果需要),请从server.properties 中的sasl.enabled.mechanisms中删除旧机制,并从 JAAS 配置文件中删除旧机制的条目。再次增量地反弹集群。

  5. 使用委托令牌进行身份验证

基于委托令牌的身份验证是一种轻量级身份验证机制,可补充现有的 SASL/SSL 方法。委托令牌是 kafka broker和客户端之间的共享秘密。委派令牌将帮助处理框架将工作负载分配给安全环境中的可用工作人员,而无需在使用 2 向 SSL 时增加分配 Kerberos TGT/密钥表或密钥库的成本。 有关更多详细信息,请参阅KIP-48 。

在默认实现下principal.builder.class,委托令牌的所有者用作PrincipalACL 等配置的 身份验证。

使用委托令牌的典型步骤是:

  1. 用户通过 SASL 或 SSL 向 Kafka 集群进行身份验证,并获取委托令牌。这可以使用管理 API 或使用kafka-delegation-tokens.sh脚本来完成。
  2. 用户将委托令牌安全地传递给 Kafka 客户端,以便通过 Kafka 集群进行身份验证。
  3. 令牌所有者/更新者可以更新/过期委托令牌。

  4. 代币管理

秘密用于生成和验证委托令牌。这是使用配置选项delegate.token.secret.key提供的。必须在所有代理之间配置相同的密钥。如果密钥未设置或设置为空字符串,代理将禁用委托令牌身份验证。

在当前的实现中,令牌详细信息存储在 Zookeeper 中,适合在 Zookeeper 位于专用网络上的 Kafka 安装中使用。目前,此机密以纯文本形式存储在 server.properties 配置文件中。我们打算在未来的 Kafka 版本中对这些进行配置。

代币具有当前寿命和最大可再生寿命。默认情况下,令牌必须每 24 小时更新一次,最多 7 天。这些可以使用delegation.token.expiry.time.ms 和delegation.token.max.lifetime.ms配置选项进行配置。

令牌也可以显式取消。如果令牌在令牌过期时间内未更新,或者令牌超出了最大生命周期,则将从所有代理缓存以及 Zookeeper 中删除该令牌。

  1. 创建委托令牌

可以使用管理 API 或使用kafka-delegation-tokens.sh脚本创建令牌。委托令牌请求(创建/更新/过期/描述)应仅在 SASL 或 SSL 身份验证通道上发出。如果通过委托令牌完成初始身份验证,则无法请求令牌。用户也可以通过指定--owner-principal参数为该用户或其他人创建令牌。所有者/更新者可以更新或过期令牌。所有者/更新者始终可以描述自己的代币。要描述其他令牌,需要在代表令牌所有者的用户资源上添加 DESCRIBE_TOKEN 权限。 下面给出了kafka-delegation-tokens.sh脚本示例。

创建委托令牌:

> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1

为不同的所有者创建委托令牌:

> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1 --owner-principal User:owner1

更新委托令牌:

> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew    --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK

使委托令牌过期:

> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire   --expiry-time-period -1   --command-config client.properties  --hmac ABCDEFGHIJK

可以使用 --describe 选项描述现有令牌:

> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties  --owner-principal User:user1
  1. 令牌认证

委托令牌身份验证搭载当前 SASL/SCRAM 身份验证机制。我们必须在 Kafka 集群上启用 SASL/SCRAM 机制,如此处所述

配置 Kafka 客户端:

  1. 在 Producer.properties 或 Consumer.properties 中为每个客户端配置 JAAS 配置属性。登录模块描述了生产者和消费者等客户端如何连接到 Kafka Broker。以下是用于令牌身份验证的客户端配置示例:
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="tokenID123" \
    password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
    tokenauth="true";

客户端使用用户名和密码选项来配置令牌 id 和令牌 HMAC。tokenauth选项用于向服务器指示令牌认证。在此示例中,客户端使用令牌 id 连接到代理:tokenID123。JVM 中的不同客户端可以通过在 中指定不同的令牌详细信息来使用不同的令牌进行连接sasl.jaas.config

客户端的 JAAS 配置也可以指定为类似于代理的 JVM 参数,如此处所述。客户端使用名为KafkaClient的登录部分 。此选项仅允许一名用户进行来自 JVM 的所有客户端连接。

  1. 手动轮换机密的过程:

当秘密需要轮换时,我们需要重新部署。在此过程中,已连接的客户端将继续工作。但是任何新的连接请求和使用旧令牌的续订/过期请求都可能会失败。步骤如下。

  1. 使所有现有令牌过期。
  2. 通过滚动升级轮换秘密,并且
  3. 生成新的代币

我们打算在未来的 Kafka 版本中实现自动化。

7.5 授权和 ACL

Kafka 附带了一个可插入的授权框架,该框架在服务器配置中使用authorizer.class.name属性进行配置。配置的实现必须扩展org.apache.kafka.server.authorizer.Authorizer。Kafka 提供了将 ACL 存储在集群元数据(Zookeeper 或 KRaft 元数据日志)中的默认实现。对于基于Zookeeper的集群,提供的实现配置如下:

authorizer.class.name=kafka.security.authorizer.AclAuthorizer

对于 KRaft 集群,在所有节点(代理、控制器或组合代理/控制器节点)上使用以下配置:

authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

Kafka ACL 的通用格式是“主体 {P} 是[允许|拒绝]来自主机 {H} 的任何与 ResourcePattern {RP} 匹配的资源 {R} 上的操作 {O}”。您可以在KIP-11中阅读有关 ACL 结构的更多信息,在KIP-290中阅读有关资源模式的更多信息。要添加、删除或列出 ACL,您可以使用 Kafka ACL CLI kafka-acls.sh。默认情况下,如果没有 ResourcePatterns 与特定资源 R 匹配,则 R 没有关联的 ACL,因此除了超级用户之外,任何人都不允许访问 R。如果要更改该行为,可以在 server.properties 中包含以下内容。

allow.everyone.if.no.acl.found=true

还可以在 server.properties 中添加超级用户,如下所示(请注意,分隔符是分号,因为 SSL 用户名可能包含逗号)。默认的 PrimaryType 字符串“User”区分大小写。

super.users=User:Bob;User:Alice

Kraft 主要转发

在 KRaft 集群中,诸如CreateTopics和 之类的管理请求DeleteTopics由客户端发送到代理侦听器。然后,代理通过在 中配置的第一个侦听器将请求转发到活动控制器controller.listener.names。这些请求的授权是在控制器节点上完成的。这是通过Envelope打包来自客户端和客户端主体的底层请求的请求来实现的。当控制器收到Envelope代理转发的请求时,它首先Envelope使用经过身份验证的代理主体来授权该请求。然后它使用转发的主体授权底层请求。
所有这些都意味着 Kafka 必须了解如何序列化和反序列化客户端主体。身份验证框架允许通过覆盖principal.builder.class配置来定制主体。为了使自定义主体能够与 KRaft 一起使用,配置的类必须实现,org.apache.kafka.common.security.auth.KafkaPrincipalSerde以便 Kafka 知道如何序列化和反序列化主体。默认实现org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder使用源代码中定义的 Kafka RPC 格式:clients/src/main/resources/common/message/DefaultPrincipalData.json。有关 KRaft 中请求转发的更多详细信息,请参阅KIP-590

自定义 SSL 用户名

默认情况下,SSL 用户名的格式为“CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”。人们可以通过ssl.principal.mapping.rules在 server.properties 中设置自定义规则来更改这一点。此配置允许将 X.500 可分辨名称映射到短名称的规则列表。规则按顺序进行评估,并且与专有名称匹配的第一个规则用于将其映射到短名称。列表中的任何后续规则都将被忽略。
格式为ssl.principal.mapping.rules是一个列表,其中每个规则都以“RULE:”开头,并包含以下格式的表达式。默认规则将返回 X.500 证书专有名称的字符串表示形式。如果专有名称与模式匹配,则将对该名称运行替换命令。它还支持小写/大写选项,以强制翻译结果全部为小写/大写。这是通过在规则末尾添加“/L”或“/U”来完成的。

RULE:pattern/replacement/
RULE:pattern/replacement/[LU]

示例ssl.principal.mapping.rules值是:

RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
DEFAULT

上述规则将专有名称“CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”转换为“serviceuser”和“CN=adminUser,OU=Admin,O=Unknown,L=”未知,ST=未知,C=未知”到“adminuser@admin”。
对于高级用例,可以通过在 server.properties 中设置自定义的 PrimaryBuilder 来自定义名称,如下所示。

principal.builder.class=CustomizedPrincipalBuilderClass

自定义 SASL 用户名

默认情况下,SASL 用户名将是 Kerberos 主体的主要部分。人们可以通过sasl.kerberos.principal.to.local.rules在 server.properties 中设置自定义规则来更改这一点。的格式是一个列表,其中每个规则的工作方式与Kerberos 配置文件 (krb5.conf)sasl.kerberos.principal.to.local.rules中的 auth_to_local 相同。这还支持额外的小写/大写规则,以强制翻译结果全部为小写/大写。这是通过在规则末尾添加“/L”或“/U”来完成的。检查以下格式的语法。每条规则均以 RULE: 开头,并包含如下格式的表达式。有关更多详细信息,请参阅 kerberos 文档。

RULE:[n:string](regexp)s/pattern/replacement/
RULE:[n:string](regexp)s/pattern/replacement/g
RULE:[n:string](regexp)s/pattern/replacement//L
RULE:[n:string](regexp)s/pattern/replacement/g/L
RULE:[n:string](regexp)s/pattern/replacement//U
RULE:[n:string](regexp)s/pattern/replacement/g/U

添加规则以将 user@MYDOMAIN.COM 正确转换为 user 同时保留默认规则的示例如下:

sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT

命令行界面

Kafka 授权管理 CLI 与所有其他 CLI 都可以在 bin 目录下找到。CLI 脚本称为kafka-acls.sh。以下列出了该脚本支持的所有选项:

OPTION DESCRIPTION DEFAULT OPTION TYPE
--add Indicates to the script that user is trying to add an acl. Action
--remove Indicates to the script that user is trying to remove an acl. Action
--list Indicates to the script that user is trying to list acls. Action
--bootstrap-server A list of host/port pairs to use for establishing the connection to the Kafka cluster. Only one of --bootstrap-server or --authorizer option must be specified. Configuration
--command-config A property file containing configs to be passed to Admin Client. This option can only be used with --bootstrap-server option. Configuration
--cluster Indicates to the script that the user is trying to interact with acls on the singular cluster resource. ResourcePattern
--topic [topic-name] Indicates to the script that the user is trying to interact with acls on topic resource pattern(s). ResourcePattern
--group [group-name] Indicates to the script that the user is trying to interact with acls on consumer-group resource pattern(s) ResourcePattern
--transactional-id [transactional-id] The transactionalId to which ACLs should be added or removed. A value of * indicates the ACLs should apply to all transactionalIds. ResourcePattern
--delegation-token [delegation-token] Delegation token to which ACLs should be added or removed. A value of * indicates ACL should apply to all tokens. ResourcePattern
--user-principal [user-principal] A user resource to which ACLs should be added or removed. This is currently supported in relation with delegation tokens. A value of * indicates ACL should apply to all users. ResourcePattern
--resource-pattern-type [pattern-type] Indicates to the script the type of resource pattern, (for --add), or resource pattern filter, (for --list and --remove), the user wishes to use. \n When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. \n When listing or removing acls, a specific pattern type filter can be used to list or remove acls from a specific type of resource pattern, or the filter values of 'any' or 'match' can be used, where 'any' will match any pattern type, but will match the resource name exactly, and 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s). \n WARNING: 'match', when used in combination with the '--remove' switch, should be used with care. literal Configuration
--allow-principal Principal is in PrincipalType:name format that will be added to ACL with Allow permission. Default PrincipalType string "User" is case sensitive. \n You can specify multiple --allow-principal in a single command. Principal
--deny-principal Principal is in PrincipalType:name format that will be added to ACL with Deny permission. Default PrincipalType string "User" is case sensitive. \n You can specify multiple --deny-principal in a single command. Principal
--principal Principal is in PrincipalType:name format that will be used along with --list option. Default PrincipalType string "User" is case sensitive. This will list the ACLs for the specified principal. \n You can specify multiple --principal in a single command. Principal
--allow-host IP address from which principals listed in --allow-principal will have access. if --allow-principal is specified defaults to * which translates to "all hosts" Host
--deny-host IP address from which principals listed in --deny-principal will be denied access. if --deny-principal is specified defaults to * which translates to "all hosts" Host
--operation Operation that will be allowed or denied. \n有效值为:\n Read\n Write\n Create\n Delete\n Alter\n Describe\n ClusterAction\n DescribeConfigs\n AlterConfigs\n IdempotentWrite\n CreateTokens\n DescribeTokens\n* All All Operation
--producer Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE, DESCRIBE and CREATE on topic. Convenience
--consumer Convenience option to add/remove acls for consumer role. This will generate acls that allows READ, DESCRIBE on topic and READ on consumer-group. Convenience
--idempotent Enable idempotence for the producer. This should be used in combination with the --producer option. \n Note that idempotence is enabled automatically if the producer is authorized to a particular transactional-id. Convenience
--force Convenience option to assume yes to all queries and do not prompt. Convenience
--authorizer (DEPRECATED: not supported in KRaft) Fully qualified class name of the authorizer. kafka.security.authorizer.AclAuthorizer Configuration
--authorizer-properties (DEPRECATED: not supported in KRaft) key=val pairs that will be passed to authorizer for initialization. For the default authorizer in ZK clsuters, the example values are: zookeeper.connect=localhost:2181 Configuration
--zk-tls-config-file (DEPRECATED: not supported in KRaft) Identifies the file where ZooKeeper client TLS connectivity properties for the authorizer are defined. Any properties other than the following (with or without an "authorizer." prefix) are ignored: zookeeper.clientCnxnSocket, zookeeper.ssl.cipher.suites, zookeeper.ssl.client.enable, zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.identification.algorithm, zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type, zookeeper.ssl.ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type Configuration

例子

  • 添加ACL
    假设您要添加一条ACL“允许主体User:Bob 和User:Alice 对来自IP 198.51.100.0 和IP 198.51.100.1 的Topic Test-Topic 进行读写操作”。您可以通过使用以下选项执行 CLI 来做到这一点:
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic

默认情况下,所有不具有允许访问资源操作的显式 acl 的主体都会被拒绝。在极少数情况下,定义了允许访问除某些主体之外的所有主体的允许 acl,我们将必须使用 --deny-principal 和 --deny-host 选项。例如,如果我们想要允许所有用户从 Test-topic 读取,但仅拒绝来自 IP 198.51.100.3 的 User:BadBob,我们可以使用以下命令来实现:

> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic

请注意,--allow-host--deny-host仅支持 IP 地址(不支持主机名)。上面的示例通过指定 --topic [topic-name] 作为资源模式选项将 acl 添加到主题。同样,用户可以通过指定 --cluster 将 acl 添加到集群,并通过指定 --group [group-name] 将 acl 添加到消费者组。您可以在某种类型的任何资源上添加 acl,例如,假设您想添加一个 acl“主要用户:Peter 被允许从 IP 198.51.200.0 生成任何主题”,您可以通过使用通配符资源“*”来做到这一点,例如通过使用以下选项执行 CLI:

> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic '*'

您可以在前缀资源模式上添加 acl,例如,假设您要添加一个 acl“主要用户:Jane 被允许从任何主机生成名称以“Test-”开头的任何主题”。您可以通过使用以下选项执行 CLI 来做到这一点:

> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed

请注意,--resource-pattern-type 默认为“literal”,它仅影响具有完全相同名称的资源,或者在通配符资源名称“*”的情况下,影响具有任何名称的资源。 * 删除 Acl
删除 acl 几乎是相同的。唯一的区别是用户必须指定 --remove 选项,而不是 --add 选项。要删除上面第一个示例添加的 acl,我们可以使用以下选项执行 CLI:

> bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic 

如果您想删除添加到上面的前缀资源模式中的 acl,我们可以使用以下选项执行 CLI:

> bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type Prefixed
  • 列出 ACL
    我们可以通过为资源指定 --list 选项来列出任何资源的 ACL。要列出文字资源模式 Test-topic 上的所有 acl,我们可以使用以下选项执行 CLI:
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic

但是,这只会返回已添加到该确切资源模式的 acl。可能存在影响对主题的访问的其他 acl,例如主题通配符“*”上的任何 acl,或前缀资源模式上的任何 acl。可以显式查询通配符资源模式上的 ACL:

> bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic '*'

但是,不一定可以在与 Test-topic 匹配的前缀资源模式上显式查询 acl,因为此类模式的名称可能未知。我们可以使用“--resource-pattern-type match”列出影响测试主题的 所有acl,例如

> bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic --resource-pattern-type match

这将列出所有匹配文字、通配符和前缀资源模式的 acl。 * 添加或删除主体作为生产者或消费者
ACL 管理最常见的用例是添加/删除主体作为生产者或消费者,因此我们添加了方便的选项来处理这些情况。为了将 User:Bob 添加为 Test-topic 的生产者,我们可以执行以下命令:

> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --producer --topic Test-topic

类似地,要将 Alice 添加为消费者组 Group-1 的 Test-topic 消费者,我们只需传递 --consumer 选项:

> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 

请注意,对于消费者选项,我们还必须指定消费者组。为了从生产者或消费者角色中删除主体,我们只需要传递 --remove 选项。 * 基于Admin API的ACL管理 对
ClusterResource拥有Alter权限的用户可以使用Admin API进行ACL管理。kafka-acls.sh 脚本支持 AdminClient API 来管理 ACL,而无需直接与 Zookeeper/Authorizer 交互。上述所有示例都可以使用--bootstrap-server选项来执行。例如:

bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:tokenRequester --operation CreateTokens --user-principal "owner1"

授权原语

协议调用通常是对 Kafka 中的某些资源执行一些操作。需要了解操作和资源才能建立有效的保护。在本节中,我们将列出这些操作和资源,然后列出这些操作和资源与协议的组合以查看有效的场景。

Kafka中的操作

有一些操作原语可用于建立权限。这些可以与某些资源相匹配,以允许给定用户进行特定协议调用。这些都是:

  • Read
  • Write
  • Create
  • Delete
  • Alter
  • Describe
  • ClusterAction
  • DescribeConfigs
  • AlterConfigs
  • IdempotentWrite
  • CreateTokens
  • DescribeTokens
  • All

Kafka中的资源

上述操作可以应用于下面描述的某些资源。

  • 主题:这仅代表一个主题。所有作用于主题(例如读取、写入主题)的协议调用都需要添加相应的权限。如果主题资源存在授权错误,则会返回 TOPIC_AUTHORIZATION_FAILED(错误代码:29)。
  • 组:代表broker中的消费者组。所有与消费者组一起使用的协议调用(例如加入组)都必须具有主题组的权限。如果未授予权限,则协议响应中将返回 GROUP_AUTHORIZATION_FAILED(错误代码:30)。
  • 集群:该资源代表集群。影响整个集群的操作(例如受控关闭)受到集群资源特权的保护。如果集群资源存在授权问题,则会返回 CLUSTER_AUTHORIZATION_FAILED(错误代码:31)。
  • TransactionalId:该资源表示与事务相关的操作,例如提交。如果发生任何错误,经纪商将返回 TRANSACTIONAL_ID_AUTHORIZATION_FAILED(错误代码:53)。
  • DelegationToken:这代表集群中的委托令牌。诸如描述委托令牌之类的操作可以通过DelegationToken资源上的特权来保护。由于这些对象在 Kafka 中具有一些特殊行为,因此建议阅读 KIP-48和使用委托令牌进行身份验证 中的相关上游文档。
  • 用户:可以将CreateToken和DescribeToken操作授予用户资源,以允许为其他用户创建和描述令牌。更多信息可以在KIP-373中找到。

协议操作和资源

在下表中,我们将列出 Kafka API 协议对资源执行的有效操作。

PROTOCOL (API KEY) OPERATION RESOURCE NOTE
PRODUCE (0) Write TransactionalId An transactional producer which has its transactional.id set requires this privilege.
PRODUCE (0) IdempotentWrite Cluster An idempotent produce action requires this privilege.
PRODUCE (0) Write Topic This applies to a normal produce action.
FETCH (1) ClusterAction Cluster A follower must have ClusterAction on the Cluster resource in order to fetch partition data.
FETCH (1) Read Topic Regular Kafka consumers need READ permission on each partition they are fetching.
LIST_OFFSETS (2) Describe Topic
METADATA (3) Describe Topic
METADATA (3) Create Cluster If topic auto-creation is enabled, then the broker-side API will check for the existence of a Cluster level privilege. If it's found then it'll allow creating the topic, otherwise it'll iterate through the Topic level privileges (see the next one).
METADATA (3) Create Topic This authorizes auto topic creation if enabled but the given user doesn't have a cluster level permission (above).
LEADER_AND_ISR (4) ClusterAction Cluster
STOP_REPLICA (5) ClusterAction Cluster
UPDATE_METADATA (6) ClusterAction Cluster
CONTROLLED_SHUTDOWN (7) ClusterAction Cluster
OFFSET_COMMIT (8) Read Group An offset can only be committed if it's authorized to the given group and the topic too (see below). Group access is checked first, then Topic access.
OFFSET_COMMIT (8) Read Topic Since offset commit is part of the consuming process, it needs privileges for the read action.
OFFSET_FETCH (9) Describe Group Similarly to OFFSET_COMMIT, the application must have privileges on group and topic level too to be able to fetch. However in this case it requires describe access instead of read. Group access is checked first, then Topic access.
OFFSET_FETCH (9) Describe Topic
FIND_COORDINATOR (10) Describe Group The FIND_COORDINATOR request can be of "Group" type in which case it is looking for consumergroup coordinators. This privilege would represent the Group mode.
FIND_COORDINATOR (10) Describe TransactionalId This applies only on transactional producers and checked when a producer tries to find the transaction coordinator.
JOIN_GROUP (11) Read Group
HEARTBEAT (12) Read Group
LEAVE_GROUP (13) Read Group
SYNC_GROUP (14) Read Group
DESCRIBE_GROUPS (15) Describe Group
LIST_GROUPS (16) Describe Cluster When the broker checks to authorize a list_groups request it first checks for this cluster level authorization. If none found then it proceeds to check the groups individually. This operation doesn't return CLUSTER_AUTHORIZATION_FAILED.
LIST_GROUPS (16) Describe Group If none of the groups are authorized, then just an empty response will be sent back instead of an error. This operation doesn't return CLUSTER_AUTHORIZATION_FAILED. This is applicable from the 2.1 release.
SASL_HANDSHAKE (17) The SASL handshake is part of the authentication process and therefore it's not possible to apply any kind of authorization here.
API_VERSIONS (18) The API_VERSIONS request is part of the Kafka protocol handshake and happens on connection and before any authentication. Therefore it's not possible to control this with authorization.
CREATE_TOPICS (19) Create Cluster If there is no cluster level authorization then it won't return CLUSTER_AUTHORIZATION_FAILED but fall back to use topic level, which is just below. That'll throw error if there is a problem.
CREATE_TOPICS (19) Create Topic This is applicable from the 2.0 release.
DELETE_TOPICS (20) Delete Topic
DELETE_RECORDS (21) Delete Topic
INIT_PRODUCER_ID (22) Write TransactionalId
INIT_PRODUCER_ID (22) IdempotentWrite Cluster
OFFSET_FOR_LEADER_EPOCH (23) ClusterAction Cluster If there is no cluster level privilege for this operation, then it'll check for topic level one.
OFFSET_FOR_LEADER_EPOCH (23) Describe Topic This is applicable from the 2.1 release.
ADD_PARTITIONS_TO_TXN (24) Write TransactionalId This API is only applicable to transactional requests. It first checks for the Write action on the TransactionalId resource, then it checks the Topic in subject (below).
ADD_PARTITIONS_TO_TXN (24) Write Topic
ADD_OFFSETS_TO_TXN (25) Write TransactionalId Similarly to ADD_PARTITIONS_TO_TXN this is only applicable to transactional request. It first checks for Write action on the TransactionalId resource, then it checks whether it can Read on the given group (below).
ADD_OFFSETS_TO_TXN (25) Read Group
END_TXN (26) Write TransactionalId
WRITE_TXN_MARKERS (27) ClusterAction Cluster
TXN_OFFSET_COMMIT (28) Write TransactionalId
TXN_OFFSET_COMMIT (28) Read Group
TXN_OFFSET_COMMIT (28) Read Topic
DESCRIBE_ACLS (29) Describe Cluster
CREATE_ACLS (30) Alter Cluster
DELETE_ACLS (31) Alter Cluster
DESCRIBE_CONFIGS (32) DescribeConfigs Cluster If broker configs are requested, then the broker will check cluster level privileges.
DESCRIBE_CONFIGS (32) DescribeConfigs Topic If topic configs are requested, then the broker will check topic level privileges.
ALTER_CONFIGS (33) AlterConfigs Cluster If broker configs are altered, then the broker will check cluster level privileges.
ALTER_CONFIGS (33) AlterConfigs Topic If topic configs are altered, then the broker will check topic level privileges.
ALTER_REPLICA_LOG_DIRS (34) Alter Cluster
DESCRIBE_LOG_DIRS (35) Describe Cluster An empty response will be returned on authorization failure.
SASL_AUTHENTICATE (36) SASL_AUTHENTICATE is part of the authentication process and therefore it's not possible to apply any kind of authorization here.
CREATE_PARTITIONS (37) Alter Topic
CREATE_DELEGATION_TOKEN (38) Creating delegation tokens has special rules, for this please see theAuthentication using Delegation Tokenssection.
CREATE_DELEGATION_TOKEN (38) CreateTokens User Allows creating delegation tokens for the User resource.
RENEW_DELEGATION_TOKEN (39) Renewing delegation tokens has special rules, for this please see theAuthentication using Delegation Tokenssection.
EXPIRE_DELEGATION_TOKEN (40) Expiring delegation tokens has special rules, for this please see theAuthentication using Delegation Tokenssection.
DESCRIBE_DELEGATION_TOKEN (41) Describe DelegationToken Describing delegation tokens has special rules, for this please see theAuthentication using Delegation Tokenssection.
DESCRIBE_DELEGATION_TOKEN (41) DescribeTokens User Allows describing delegation tokens of the User resource.
DELETE_GROUPS (42) Delete Group
ELECT_PREFERRED_LEADERS (43) ClusterAction Cluster
INCREMENTAL_ALTER_CONFIGS (44) AlterConfigs Cluster If broker configs are altered, then the broker will check cluster level privileges.
INCREMENTAL_ALTER_CONFIGS (44) AlterConfigs Topic If topic configs are altered, then the broker will check topic level privileges.
ALTER_PARTITION_REASSIGNMENTS (45) Alter Cluster
LIST_PARTITION_REASSIGNMENTS (46) Describe Cluster
OFFSET_DELETE (47) Delete Group
OFFSET_DELETE (47) Read Topic
DESCRIBE_CLIENT_QUOTAS (48) DescribeConfigs Cluster
ALTER_CLIENT_QUOTAS (49) AlterConfigs Cluster
DESCRIBE_USER_SCRAM_CREDENTIALS (50) Describe Cluster
ALTER_USER_SCRAM_CREDENTIALS (51) Alter Cluster
VOTE (52) ClusterAction Cluster
BEGIN_QUORUM_EPOCH (53) ClusterAction Cluster
END_QUORUM_EPOCH (54) ClusterAction Cluster
DESCRIBE_QUORUM (55) Describe Cluster
ALTER_PARTITION (56) ClusterAction Cluster
UPDATE_FEATURES (57) Alter Cluster
ENVELOPE (58) ClusterAction Cluster
FETCH_SNAPSHOT (59) ClusterAction Cluster
DESCRIBE_CLUSTER (60) Describe Cluster
DESCRIBE_PRODUCERS (61) Read Topic
BROKER_REGISTRATION (62) ClusterAction Cluster
BROKER_HEARTBEAT (63) ClusterAction Cluster
UNREGISTER_BROKER (64) Alter Cluster
DESCRIBE_TRANSACTIONS (65) Describe TransactionalId
LIST_TRANSACTIONS (66) Describe TransactionalId
ALLOCATE_PRODUCER_IDS (67) ClusterAction Cluster
CONSUMER_GROUP_HEARTBEAT (68) Read Group

7.6 将安全功能纳入正在运行的集群中

您可以通过前面讨论的一种或多种受支持的协议来保护正在运行的集群。这是分阶段完成的:

  • 增量反弹集群节点以打开其他安全端口。
  • 使用安全端口而不是 PLAINTEXT 端口重新启动客户端(假设您正在保护客户端代理连接)。
  • 再次增量地反弹集群以启用代理到代理的安全性(如果需要)
  • 最后增量反弹以关闭 PLAINTEXT 端口。

配置SSL和SASL的具体步骤在7.37.4 节中描述。请按照以下步骤为您所需的协议启用安全性。

安全实现允许您为代理-客户端和代理-代理通信配置不同的协议。这些必须在单独的反弹中启用。PLAINTEXT 端口必须始终保持开放状态,以便代理和/或客户端可以继续通信。

当执行增量反弹时,通过 SIGTERM 彻底停止代理。在移动到下一个节点之前,等待重新启动的副本返回到 ISR 列表也是一个很好的做法。

举个例子,假设我们希望使用 SSL 加密代理-客户端和代理-代理通信。在第一次增量反弹中,每个节点上都会打开一个 SSL 端口:

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092

然后我们重新启动客户端,更改其配置以指向新打开的安全端口:

bootstrap.servers = [broker1:9092,...]
security.protocol = SSL
...etc

在第二次增量服务器反弹中,我们指示 Kafka 使用 SSL 作为代理-代理协议(将使用相同的 SSL 端口):

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
security.inter.broker.protocol=SSL

在最后的反弹中,我们通过关闭 PLAINTEXT 端口来保护集群:

listeners=SSL://broker1:9092
security.inter.broker.protocol=SSL

或者,我们可以选择打开多个端口,以便可以使用不同的协议进行broker-broker和broker-客户端通信。假设我们希望在整个过程中使用 SSL 加密(即用于代理-代理和代理-客户端通信),但我们还希望将 SASL 身份验证添加到代理-客户端连接。我们将通过在第一次反弹期间打开两个额外的端口来实现这一点:

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093

然后,我们将重新启动客户端,更改其配置以指向新打开的 SASL 和 SSL 安全端口:

bootstrap.servers = [broker1:9093,...]
security.protocol = SASL_SSL
...etc

第二次服务器反弹会将集群切换为通过我们之前在端口 9092 上打开的 SSL 端口使用加密的代理间通信:

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL

最后的反弹通过关闭 PLAINTEXT 端口来保护集群。

listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL

ZooKeeper 的安全可以独立于 Kafka 集群。第7.7.2节介绍了执行此操作的步骤。

7.7 ZooKeeper 认证

ZooKeeper 从 3.5.x 版本开始支持双向 TLS (mTLS) 身份验证。从 2.5 版本开始,Kafka 支持使用 SASL 和 mTLS 单独或同时向 ZooKeeper 进行身份验证。 有关更多详细信息, 请参阅 KIP-515:启用 ZK 客户端使用新的 TLS 支持的身份验证。

单独使用 mTLS 时,每个代理和任何 CLI 工具(例如ZooKeeper 安全迁移工具)都应使用相同的可分辨名称 (DN) 来标识自己,因为它是经过 ACL 处理的 DN。这可以按如下所述进行更改,但它涉及编写和部署自定义 ZooKeeper 身份验证提供程序。通常,每个证书应具有相同的 DN,但具有不同的主题备用名称 (SAN),以便 ZooKeeper 对代理和任何 CLI 工具的主机名验证能够成功。

当将 SASL 身份验证与 mTLS 一起使用到 ZooKeeper 时,SASL 身份和创建 znode 的 DN(即创建代理的证书)或安全迁移工具的 DN(如果在创建 znode 后执行迁移)都将被ACL'ed,并且所有代理和 CLI 工具都将获得授权,即使它们都使用不同的 DN,因为它们都将使用相同的 ACL'ed SASL 身份。仅当单独使用 mTLS 身份验证时,所有 DN 都必须匹配(并且 SAN 变得至关重要——同样,在没有编写和部署自定义 ZooKeeper 身份验证提供程序的情况下,如下所述)。

使用代理属性文件为代理设置 TLS 配置,如下所述。

使用--zk-tls-config-file 选项在 Zookeeper 安全迁移工具中设置 TLS 配置。kafka-acls.sh和kafka-configs.sh CLI 工具还支持--zk - tls-config-file 选项。

使用-zk-tls-config-file 选项(注意单破折号而不是双破折号)为Zookeeper-shell.sh CLI 工具设置 TLS 配置。

7.7.1 新集群

7.7.1.1 ZooKeeper SASL 身份验证

要在代理上启用 ZooKeeper SASL 身份验证,有两个必要步骤:

  1. 创建 JAAS 登录文件并设置相应的系统属性以指向它,如上所述
  2. 将每个代理中的配置属性zookeeper.set.acl设置为true

Kafka 集群的 ZooKeeper 中存储的元数据是世界可读的,但只能由代理修改。这一决定背后的理由是,ZooKeeper 中存储的数据并不敏感,但对该数据的不当操作可能会导致集群中断。我们还建议通过网络分段限制对 ZooKeeper 的访问(只有代理和某些管理工具需要访问 ZooKeeper)。

7.7.1.2 ZooKeeper 相互 TLS 身份验证

ZooKeeper mTLS 身份验证可以在有或没有 SASL 身份验证的情况下启用。如上所述,单独使用 mTLS 时,每个代理和任何 CLI 工具(例如ZooKeeper 安全迁移工具)通常必须使用相同的可分辨名称 (DN) 来标识自己,因为它是经过 ACL 处理的 DN,这意味着每个证书应具有适当的主题备用名称 (SAN),以便 ZooKeeper 对代理和任何 CLI 工具的主机名验证能够成功。

通过编写一个扩展org.apache.zookeeper.server.auth.X509AuthenticationProvider的类并覆盖方法 protected String getClientId(X509Certificate clientCert) , 可以使用 DN 以外的其他内容来标识 mTLS 客户端。选择一个方案名称,并将ZooKeeper 中的authProvider.[scheme]设置为自定义实现的完全限定类名称;然后设置ssl.authProvider=[scheme]来使用它。

以下是用于启用 TLS 身份验证的 ZooKeeper 配置示例(部分)。这些配置在ZooKeeper 管理指南中有描述 。

secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/path/to/zk/keystore.jks
ssl.keyStore.password=zk-ks-passwd
ssl.trustStore.location=/path/to/zk/truststore.jks
ssl.trustStore.password=zk-ts-passwd

重要提示:ZooKeeper 不支持将 ZooKeeper 服务器密钥库中的密钥密码设置为与密钥库密码本身不同的值。请务必将密钥密码设置为与密钥库密码相同。

以下是使用 mTLS 身份验证连接到 ZooKeeper 的 Kafka Broker 配置示例(部分)。这些配置在上面的Broker Configs中有描述。

# connect to the ZooKeeper port configured for TLS
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
# required to use TLS to ZooKeeper (default is false)
zookeeper.ssl.client.enable=true
# required to use TLS to ZooKeeper
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
# define key/trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
zookeeper.ssl.keystore.location=/path/to/kafka/keystore.jks
zookeeper.ssl.keystore.password=kafka-ks-passwd
zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
zookeeper.ssl.truststore.password=kafka-ts-passwd
# tell broker to create ACLs on znodes
zookeeper.set.acl=true

重要提示:ZooKeeper 不支持将 ZooKeeper 客户端(即代理)密钥库中的密钥密码设置为与密钥库密码本身不同的值。请务必将密钥密码设置为与密钥库密码相同。

7.7.2 迁移集群

如果您运行的 Kafka 版本不支持安全性或只是禁用了安全性,并且希望确保集群安全,那么您需要执行以下步骤来启用 ZooKeeper 身份验证,同时将对您的操作造成的干扰降至最低:

  1. 在 ZooKeeper 上启用 SASL 和/或 mTLS 身份验证。如果启用 mTLS,您现在将同时拥有非 TLS 端口和 TLS 端口,如下所示:
clientPort=2181
secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/path/to/zk/keystore.jks
ssl.keyStore.password=zk-ks-passwd
ssl.trustStore.location=/path/to/zk/truststore.jks
ssl.trustStore.password=zk-ts-passwd
  1. 根据需要执行代理的滚动重启,设置 JAAS 登录文件和/或定义 ZooKeeper 双向 TLS 配置(包括连接到启用 TLS 的 ZooKeeper 端口),这使代理能够向 ZooKeeper 进行身份验证。在滚动重启结束时,代理能够使用严格的 ACL 操作 znode,但它们不会使用这些 ACL 创建 znode
  2. 如果启用了 mTLS,请在 ZooKeeper 中禁用非 TLS 端口
  3. 对代理执行第二次滚动重启,这次将配置参数zookeeper.set.acl设置为true,这样可以在创建znode时使用安全ACL
  4. 执行 ZkSecurityMigrator 工具。要执行该工具,有以下脚本:bin/zookeeper-security-migration.sh,其中zookeeper.acl设置为secure。该工具遍历相应的子树,更改 znode 的 ACL。--zk-tls-config-file <file>如果启用 mTLS,请使用该选项。

还可以关闭安全集群中的身份验证。为此,请按照下列步骤操作:

  1. 对代理执行滚动重启,设置 JAAS 登录文件和/或定义 ZooKeeper 双向 TLS 配置,这使代理能够进行身份验证,但将Zookeeper.set.acl设置为 false。在滚动重启结束时,代理停止使用安全 ACL 创建 znode,但仍然能够验证和操作所有 znode
  2. 执行 ZkSecurityMigrator 工具。要执行该工具,请运行此脚本bin/zookeeper-security-migration.sh,并将Zookeeper.acl设置为不安全。该工具遍历相应的子树,更改 znode 的 ACL。--zk-tls-config-file <file>如果需要设置 TLS 配置,请使用该选项。
  3. 如果要禁用 mTLS,请在 ZooKeeper 中启用非 TLS 端口
  4. 对代理执行第二次滚动重启,这次省略设置 JAAS 登录文件的系统属性和/或根据需要删除 ZooKeeper 双向 TLS 配置(包括连接到未启用 TLS 的 ZooKeeper 端口)
  5. 如果要禁用 mTLS,请在 ZooKeeper 中禁用 TLS 端口

以下是如何运行迁移工具的示例:

> bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181

运行此命令以查看完整的参数列表:

> bin/zookeeper-security-migration.sh --help

7.7.3 迁移 ZooKeeper 整体

还需要在 ZooKeeper 整体上启用 SASL 和/或 mTLS 身份验证。为此,我们需要执行服务器的滚动重新启动并设置一些属性。有关 mTLS 信息,请参阅上文。更多详细信息请参考 ZooKeeper 文档:

  1. Apache ZooKeeper 文档
  2. Apache ZooKeeper 维基

7.7.4 ZooKeeper 仲裁相互 TLS 认证

可以在 ZooKeeper 服务器本身之间启用 mTLS 身份验证。请参阅ZooKeeper 文档以获取更多详细信息。

7.8 ZooKeeper加密

使用双向 TLS 的 ZooKeeper 连接是加密的。从 ZooKeeper 3.5.7 版(Kafka 2.5 版附带的版本)开始,ZooKeeper 支持服务器端配置 ssl.clientAuth(不区分大小写:want / need / none是有效选项,默认为need),并设置此ZooKeeper 中的值为none允许客户端通过 TLS 加密连接进行连接,而无需提供自己的证书。以下是仅使用 TLS 加密连接到 ZooKeeper 的 Kafka Broker 配置示例(部分)。这些配置在上面的Broker Configs中有描述。

# connect to the ZooKeeper port configured for TLS
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
# required to use TLS to ZooKeeper (default is false)
zookeeper.ssl.client.enable=true
# required to use TLS to ZooKeeper
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
# define trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
# no need to set keystore information assuming ssl.clientAuth=none on ZooKeeper
zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
zookeeper.ssl.truststore.password=kafka-ts-passwd
# tell broker to create ACLs on znodes (if using SASL authentication, otherwise do not set this)
zookeeper.set.acl=true


回到顶部