7. 安全
7.1 安全概述
在版本 0.9.0.0 中,Kafka 社区添加了许多功能,这些功能单独或一起使用可以提高 Kafka 集群的安全性。目前支持以下安全措施:
- 使用 SSL 或 SASL 对客户端(生产者和消费者)、其他代理和工具与代理的连接进行身份验证。Kafka支持以下SASL机制:
- SASL/GSSAPI (Kerberos) - 从版本 0.9.0.0 开始
- SASL/PLAIN - 从版本 0.10.0.0 开始
- SASL/SCRAM-SHA-256 和 SASL/SCRAM-SHA-512 - 从版本 0.10.2.0 开始
- SASL/OAUTHBEARER - 从版本 2.0 开始
- 从代理到 ZooKeeper 的连接身份验证
- 使用 SSL 对代理和客户端之间、代理之间或代理和工具之间传输的数据进行加密(请注意,启用 SSL 时会出现性能下降,其程度取决于 CPU 类型和 JVM 实现。)
- 客户端读/写操作的授权
- 授权可插拔,支持与外部授权服务集成
值得注意的是,安全性是可选的 - 支持非安全集群,以及经过身份验证、未经身份验证、加密和非加密客户端的混合。以下指南解释了如何在客户端和代理中配置和使用安全功能。
7.2 监听器配置
为了保护 Kafka 集群的安全,有必要保护用于与服务器通信的通道的安全。每个服务器必须定义一组侦听器,用于接收来自客户端以及其他服务器的请求。每个监听器可以被配置为使用各种机制来验证客户端并确保服务器和客户端之间的流量被加密。本节提供侦听器配置的入门知识。
Kafka 服务器支持侦听多个端口上的连接。这是通过listeners
服务器配置中的属性进行配置的,该属性接受要启用的侦听器的逗号分隔列表。每台服务器上必须至少定义一个侦听器。中定义的每个监听器的格式listeners
如下:
{LISTENER_NAME}://{hostname}:{port}
通常LISTENER_NAME
是一个描述性名称,定义了侦听器的目的。例如,许多配置对客户端流量使用单独的侦听器,因此它们可能会引用CLIENT
配置中相应的侦听器:
listeners=CLIENT://localhost:9092
每个侦听器的安全协议在单独的配置中定义: listener.security.protocol.map
。该值是映射到其安全协议的每个侦听器的逗号分隔列表。例如,以下值配置指定CLIENT
侦听器将使用 SSL,而 BROKER
侦听器将使用明文。
listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT
下面给出了安全协议的可能选项:
- PLAINTEXT
- SSL
- SASL_PLAINTEXT
- SASL_SSL
明文协议不提供安全性,并且不需要任何额外的配置。在以下部分中,本文档介绍了如何配置其余协议。
如果每个所需的侦听器使用单独的安全协议,也可以使用安全协议名称作为 中的侦听器名称listeners
。使用上面的示例,我们可以使用以下定义跳过CLIENT
和侦听器的定义:BROKER
listeners=SSL://localhost:9092,PLAINTEXT://localhost:9093
但是,我们建议用户为侦听器提供明确的名称,因为这样可以使每个侦听器的预期用途更加清晰。
inter.broker.listener.name
在此列表中的侦听器中,可以通过将配置设置为侦听器的名称来声明用于代理间通信的侦听器。代理间侦听器的主要目的是分区复制。如果未定义,则代理间侦听器由 定义的安全协议确定security.inter.broker.protocol
,默认为PLAINTEXT
。
对于依赖 Zookeeper 存储集群元数据的遗留集群,可以声明一个单独的侦听器,用于从活动控制器到代理的元数据传播。这是由 定义的control.plane.listener.name
。当活动控制器需要将元数据更新推送到集群中的代理时,它将使用此侦听器。使用控制平面侦听器的好处是它使用单独的处理线程,这使得应用程序流量不太可能阻碍元数据更改(例如分区领导者和 ISR 更新)的及时传播。请注意,默认值为 null,这意味着控制器将使用由inter.broker.listener
在 KRaft 集群中,代理是任何broker
启用了角色的服务器process.roles
,控制器是任何启用了controller
角色的服务器。侦听器配置取决于角色。定义的侦听器 inter.broker.listener.name
专门用于代理之间的请求。另一方面,控制器必须使用由 controller.listener.names
配置定义的单独侦听器。不能将其设置为与代理间侦听器相同的值。
控制器接收来自其他控制器和代理的请求。因此,即使服务器没有controller
启用该角色(即它只是一个代理),它仍然必须定义控制器侦听器以及配置它所需的任何安全属性。例如,我们可以在独立代理上使用以下配置:
process.roles=broker
listeners=BROKER://localhost:9092
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
在此示例中,控制器侦听器仍配置为使用SASL_SSL
安全协议,但它未包含在其中,listeners
因为代理不公开控制器侦听器本身。在这种情况下将使用的端口来自controller.quorum.voters
配置,它定义了控制器的完整列表。
对于同时启用代理和控制器角色的 KRaft 服务器,配置类似。唯一的区别是控制器侦听器必须包含在 listeners
:
process.roles=broker,controller
listeners=BROKER://localhost:9092,CONTROLLER://localhost:9093
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
要求定义的端口与controller.quorum.voters
公开的控制器侦听器之一完全匹配。例如,此处 CONTROLLER
侦听器绑定到端口 9093。 定义的连接字符串controller.quorum.voters
也必须使用端口 9093,如此处所示。
控制器将接受由 定义的所有侦听器上的请求controller.listener.names
。通常只有一个控制器侦听器,但也可以有更多。例如,这提供了一种通过集群滚动(一次滚动公开新监听器,一次滚动删除旧监听器)将活动侦听器从一个端口或安全协议更改为另一个端口或安全协议的方法。当定义了多个控制器侦听器时,列表中的第一个侦听器将用于出站请求。
在 Kafka 中,通常为客户端使用单独的侦听器。这允许集群间侦听器在网络级别进行隔离。对于 KRaft 中的控制器侦听器,侦听器应该被隔离,因为客户端无论如何都不使用它。客户端应该连接到代理上配置的任何其他侦听器。任何绑定到控制器的请求都将按 如下所述转发
在以下部分中,本文档介绍如何在侦听器上启用 SSL 进行加密和身份验证。后续部分将介绍使用 SASL 的其他身份验证机制。
7.3 使用 SSL 加密和验证
Apache Kafka 允许客户端使用 SSL 进行流量加密和身份验证。默认情况下,SSL 处于禁用状态,但可以根据需要打开。以下段落详细解释了如何设置您自己的 PKI 基础设施、使用它来创建证书以及配置 Kafka 以使用这些证书。
部署一个或多个支持 SSL 的代理的第一步是为每台服务器生成公钥/私钥对。由于 Kafka 希望所有密钥和证书都存储在密钥库中,因此我们将使用 Java 的 keytool 命令来完成此任务。该工具支持两种不同的密钥库格式:Java 特定的 jks 格式(现已弃用)以及 PKCS12。从 Java 版本 9 开始,PKCS12 是默认格式,为了确保无论使用哪个 Java 版本,都使用此格式,所有以下命令都显式指定 PKCS12 格式。
> keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12
上述命令中需要指定两个参数:
- keystorefile:存储此代理的密钥(以及后来的证书)的密钥库文件。密钥库文件包含该代理的私钥和公钥,因此需要保证其安全。理想情况下,此步骤在将使用密钥的 Kafka 代理上运行,因为该密钥永远不应该传输/离开其预期的服务器。
- 有效性:密钥的有效时间(以天为单位)。请注意,这与证书的有效期不同,证书的有效期将在签署证书中确定。您可以使用相同的密钥来请求多个证书:如果您的密钥的有效期为 10 年,但您的 CA 只会签署有效期为一年的证书,那么随着时间的推移,您可以对 10 个证书使用相同的密钥。
要获取可与刚刚创建的私钥一起使用的证书,需要创建证书签名请求。此签名请求由受信任的 CA 签名后会生成实际证书,然后可以将其安装在密钥库中并用于身份验证目的。
要生成证书签名请求,请对迄今为止创建的所有服务器密钥库运行以下命令。
> keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
该命令假设您想要将主机名信息添加到证书中,如果不是这种情况,您可以省略扩展参数-ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
。请参阅下文了解更多相关信息。
主机名验证
启用后,主机名验证是根据该服务器的实际主机名或 IP 地址检查您要连接的服务器提供的证书中的属性的过程,以确保您确实连接到正确的服务器。
进行此检查的主要原因是为了防止中间人攻击。对于 Kafka,此检查已默认禁用很长一段时间,但从 Kafka 2.0.0 开始,默认为客户端连接以及代理间连接启用服务器主机名验证。可以通过设置为空字符串来
禁用服务器主机名验证。ssl.endpoint.identification.algorithm
对于动态配置的代理侦听器,可以使用以下方法禁用主机名验证kafka-configs.sh
:
> bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
笔记:
通常,除了“让它正常工作”的最快方法之外,没有充分的理由禁用主机名验证,然后承诺“稍后有更多时间时修复它”!
如果在正确的时间完成正确的主机名验证并不那么困难,但一旦集群启动并运行就会变得更加困难 - 帮自己一个忙,现在就做!
如果启用主机名验证,客户端将根据以下两个字段之一验证服务器的完全限定域名 (FQDN) 或 IP 地址:
- 通用名(中文)
- 主题备用名称 (SAN)
虽然 Kafka 检查这两个字段,但自 2000 年以来,不推荐 使用通用名称字段进行主机名验证, 并且应尽可能避免使用。此外,SAN 字段更加灵活,允许在证书中声明多个 DNS 和 IP 条目。
另一个优点是,如果 SAN 字段用于主机名验证,则可以将公用名称设置为更有意义的值以用于授权目的。由于我们需要将 SAN 字段包含在签名证书中,因此将在生成签名请求时指定该字段。也可以在生成密钥对时指定,但这不会自动复制到签名请求中。
要添加 SAN 字段,请将以下参数附加 -ext SAN=DNS:{FQDN},IP:{IPADDRESS}
到 keytool 命令:
> keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
在此步骤之后,集群中的每台计算机都拥有一个可用于加密流量的公钥/私钥对和一个证书签名请求,这是创建证书的基础。要添加身份验证功能,此签名请求需要由将在此步骤中创建的受信任机构进行签名。
证书颁发机构 (CA) 负责签署证书。CA 的工作方式类似于颁发护照的政府 - 政府在每本护照上盖章(签名),以使护照难以伪造。其他政府会验证印章以确保护照的真实性。类似地,CA 对证书进行签名,并且密码学保证签名的证书在计算上难以伪造。因此,只要 CA 是真正且值得信赖的权威机构,客户端就可以强有力地保证他们正在连接到真实的机器。
对于本指南,我们将成为我们自己的证书颁发机构。在企业环境中设置生产集群时,这些证书通常由整个公司信任的企业 CA 签名。请参阅生产中的常见陷阱,了解这种情况下需要考虑的一些事项。
由于OpenSSL 中的错误,x509 模块不会将请求的扩展字段从 CSR 复制到最终证书中。由于我们希望 SAN 扩展出现在我们的证书中以启用主机名验证,因此我们将使用ca模块。这需要在生成 CA 密钥对之前进行一些额外的配置。
将以下列表保存到名为 openssl-ca.cnf 的文件中,并根据需要调整有效性和通用属性的值。
HOME = .
RANDFILE = $ENV::HOME/.rnd
####################################################################
[ ca ]
default_ca = CA_default # The default ca section
[ CA_default ]
base_dir = .
certificate = $base_dir/cacert.pem # The CA certifcate
private_key = $base_dir/cakey.pem # The CA private key
new_certs_dir = $base_dir # Location for new certs after signing
database = $base_dir/index.txt # Database index file
serial = $base_dir/serial.txt # The current serial number
default_days = 1000 # How long to certify for
default_crl_days = 30 # How long before next CRL
default_md = sha256 # Use public key default MD
preserve = no # Keep passed DN ordering
x509_extensions = ca_extensions # The extensions to add to the cert
email_in_dn = no # Don't concat the email in the DN
copy_extensions = copy # Required to copy SANs from CSR to cert
####################################################################
[ req ]
default_bits = 4096
default_keyfile = cakey.pem
distinguished_name = ca_distinguished_name
x509_extensions = ca_extensions
string_mask = utf8only
####################################################################
[ ca_distinguished_name ]
countryName = Country Name (2 letter code)
countryName_default = DE
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = Test Province
localityName = Locality Name (eg, city)
localityName_default = Test Town
organizationName = Organization Name (eg, company)
organizationName_default = Test Company
organizationalUnitName = Organizational Unit (eg, division)
organizationalUnitName_default = Test Unit
commonName = Common Name (e.g. server FQDN or YOUR name)
commonName_default = Test Name
emailAddress = Email Address
emailAddress_default = test@test.com
####################################################################
[ ca_extensions ]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid:always, issuer
basicConstraints = critical, CA:true
keyUsage = keyCertSign, cRLSign
####################################################################
[ signing_policy ]
countryName = optional
stateOrProvinceName = optional
localityName = optional
organizationName = optional
organizationalUnitName = optional
commonName = supplied
emailAddress = optional
####################################################################
[ signing_req ]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
然后创建数据库和序列号文件,这些文件将用于跟踪使用该 CA 签署的证书。这两个文件都是简单的文本文件,与您的 CA 密钥位于同一目录中。
> echo 01 > serial.txt
> touch index.txt
完成这些步骤后,您现在就可以生成 CA,稍后将使用该 CA 来签署证书。
> openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM
CA 只是一个由自身签名的公钥/私钥对和证书,仅用于签署其他证书。
该密钥对应该保持非常安全,如果有人访问它,他们可以创建并签署您的基础设施信任的证书,这意味着他们在连接到信任该 CA 的任何服务时将能够冒充任何人。
下一步是将生成的 CA 添加到 **客户端的信任库**,以便客户端可以信任此 CA:
> keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
注意: 如果您通过在 Kafka 代理配置中将 ssl.client.auth 设置为“请求”或“必需”来将 Kafka 代理配置为需要客户端身份验证,那么您还必须为 Kafka 代理提供一个信任库,并且它应该包含所有内容签署客户端密钥的 CA 证书。
> keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
与步骤 1 中存储每台计算机自己的身份的密钥库不同,客户端的信任库存储客户端应该信任的所有证书。将证书导入到信任库中还意味着信任由该证书签名的所有证书。正如上面的类比,信任政府(CA)也意味着信任其颁发的所有护照(证书)。该属性称为信任链,在大型 Kafka 集群上部署 SSL 时特别有用。您可以使用单个 CA 签署集群中的所有证书,并让所有计算机共享信任该 CA 的同一信任库。这样所有机器都可以验证所有其他机器。
然后使用 CA 对其进行签名:
> openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out {server certificate} -infiles {certificate signing request}
最后,您需要将 CA 的证书和签名的证书导入密钥库:
> keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
> keytool -keystore {keystore} -alias localhost -import -file cert-signed
参数定义如下:
- keystore:密钥库的位置
- CA证书:CA的证书
- 证书签名请求:使用服务器密钥创建的 csr
- 服务器证书:用于写入服务器签名证书的文件
这将为您留下一个名为truststore.jks的信任库- 这对于所有客户和经纪商来说都是相同的,并且不包含任何敏感信息,因此无需保护这一点。
此外,每个节点都会有一个server.keystore.jks文件,其中包含节点密钥、证书和 CA 证书,请参阅配置 Kafka 代理和配置 Kafka 客户端 以获取有关如何使用这些文件的信息。
有关此主题的一些工具帮助,请查看easyRSA项目,该项目具有丰富的脚本来帮助完成这些步骤。
PEM 格式的 SSL 密钥和证书
从 2.7.0 开始,可以直接在 PEM 格式的配置中为 Kafka 代理和客户端配置 SSL 密钥和信任存储。这避免了在文件系统上存储单独文件的需要,并受益于 Kafka 配置的密码保护功能。除了 JKS 和 PKCS12 之外,PEM 还可以用作基于文件的密钥和信任存储的存储类型。要直接在代理或客户端配置中配置 PEM 密钥存储,应在 中提供 PEM 格式的私钥,ssl.keystore.key
并应在 中提供 PEM 格式的证书链ssl.keystore.certificate.chain
。要配置信任存储,应在以下位置提供信任证书,例如 CA 的公共证书ssl.truststore.certificates
。由于 PEM 通常存储为多行 base-64 字符串,因此配置值可以作为多行字符串包含在 Kafka 配置中,其中行以反斜杠 ('\') 结尾以继续行。
存储密码配置ssl.keystore.password
,ssl.truststore.password
不用于 PEM。如果私钥使用密码加密,则必须在 中提供密钥密码ssl.key.password
。私钥可以以未加密的形式提供,无需密码。在生产部署中,在这种情况下,应使用 Kafka 中的密码保护功能对配置进行加密或外部化。请注意,当使用 OpenSSL 等外部工具进行加密时,默认 SSL 引擎工厂对加密私钥的解密功能有限。BouncyCastle 等第三方库可以与自定义集成,SslEngineFactory
以支持更广泛的加密私钥。
上面的段落展示了创建您自己的 CA 并使用它为您的集群签署证书的过程。虽然对于沙箱、开发、测试和类似系统非常有用,但这通常不是在企业环境中为生产集群创建证书的正确过程。企业通常会运营自己的CA,用户可以发送CSR来与该CA签署,这样做的好处是用户无需负责保证CA的安全,并且有一个每个人都可以信任的中央权威。然而,它也剥夺了用户对证书签名过程的很多控制权。
1. 扩展密钥使用
证书可能包含控制证书使用目的的扩展字段。如果此字段为空,则对使用没有限制,但如果此处指定了任何使用,则有效的 SSL 实现必须强制执行这些使用。
Kafka的相关用法有:
* 客户端认证
* 服务器认证
Kafka 代理需要允许这两种用法,因为对于集群内通信,每个代理将同时充当其他代理的客户端和服务器。企业 CA 拥有 Web 服务器的签名配置文件并将其用于 Kafka 的情况并不罕见,该配置文件仅包含 serverAuth*使用*值并导致 SSL 握手失败。
- 出于安全原因,中间证书
企业根 CA 通常保持离线状态。为了实现日常使用,创建了所谓的中间 CA,然后使用它们来签署最终证书。将由中间 CA 签名的证书导入密钥库时,有必要提供直至根 CA 的整个信任链。只需将证书文件合并到一个组合证书文件中,然后使用 keytool 导入该文件即可完成此操作。 - 无法复制扩展字段
CA 运营商通常不愿意从 CSR 复制和请求扩展字段,而是更愿意自己指定这些字段,因为这使得恶意方更难获取具有潜在误导性或欺诈性值的证书。建议仔细检查签名证书,这些证书是否包含所有请求的 SAN 字段以启用正确的主机名验证。以下命令可用于将证书详细信息打印到控制台,该详细信息应与最初请求的内容进行比较:
> openssl x509 -in certificate.crt -text -noout
如果没有为代理间通信启用 SSL(请参阅下文了解如何启用它),则 PLAINTEXT 和 SSL 端口都是必需的。
listeners=PLAINTEXT://host.name:port,SSL://host.name:port
代理端需要以下 SSL 配置
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234
注意:ssl.truststore.password 在技术上是可选的,但强烈建议使用。如果未设置密码,仍然可以访问信任库,但完整性检查将被禁用。值得考虑的可选设置:
- ssl.client.auth=none (“required”=> 需要客户端身份验证,“requested”=> 请求客户端身份验证,没有证书的客户端仍然可以连接。不鼓励使用“requested”,因为它提供了一种错误的感觉安全性和配置错误的客户端仍将成功连接。)
- ssl.cipher.suites(可选)。密码套件是身份验证、加密、MAC 和密钥交换算法的命名组合,用于协商使用 TLS 或 SSL 网络协议的网络连接的安全设置。(默认为空列表)
- ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1(列出您要从客户端接受的 SSL 协议。请注意,SSL 已被弃用,取而代之的是 TLS,并且不建议在生产中使用 SSL)
- ssl.keystore.type=JKS
- ssl.truststore.type=JKS
- ssl.secure.random.implementation=SHA1PRNG
如果要为代理间通信启用 SSL,请将以下内容添加到 server.properties 文件(默认为 PLAINTEXT)
security.inter.broker.protocol=SSL
由于某些国家/地区的进口法规,Oracle 实施限制了默认情况下可用的加密算法的强度。如果需要更强的算法(例如,具有 256 位密钥的 AES),则必须获取JCE 无限强度管辖策略文件并将其安装在 JDK/JRE 中。有关详细信息, 请参阅 JCA 提供商文档。
JRE/JDK 将有一个默认的伪随机数生成器 (PRNG),用于加密操作,因此不需要配置ssl.secure.random.implementation
. 然而,某些实现存在性能问题(特别是,Linux 系统上默认选择的,NativePRNG
使用全局锁)。如果 SSL 连接的性能成为问题,请考虑显式设置要使用的实现。该SHA1PRNG
实现是非阻塞的,并且在重负载(50 MB/秒生成的消息,加上每个代理的复制流量)下表现出了非常好的性能特征。
启动代理后,您应该能够在 server.log 中看到
with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)
要快速检查服务器密钥库和信任库是否设置正确,您可以运行以下命令
> openssl s_client -debug -connect localhost:9093 -tls1
(注意:TLSv1 应列在 ssl.enabled.protocols 下)
在此命令的输出中,您应该看到服务器的证书:
-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com
如果证书未显示或存在任何其他错误消息,则说明您的密钥库设置不正确。 6. #### 配置 Kafka 客户端
仅新的 Kafka Producer 和 Consumer 支持 SSL,不支持旧的 API。SSL 配置对于生产者和消费者来说都是相同的。
如果代理中不需要客户端身份验证,则以下是最小配置示例:
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234
注意:ssl.truststore.password 在技术上是可选的,但强烈建议使用。如果未设置密码,仍然可以访问信任库,但完整性检查将被禁用。如果需要客户端身份验证,则必须像步骤 1 中那样创建密钥库,并且还必须配置以下内容:
ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
根据我们的要求和代理配置,可能还需要其他配置设置:
- ssl.provider(可选)。用于 SSL 连接的安全提供程序的名称。默认值是 JVM 的默认安全提供程序。
- ssl.cipher.suites(可选)。密码套件是身份验证、加密、MAC 和密钥交换算法的命名组合,用于协商使用 TLS 或 SSL 网络协议的网络连接的安全设置。
- ssl.enabled.protocols=TLSv1.2、TLSv1.1、TLSv1。它应该至少列出代理端配置的协议之一
- ssl.truststore.type=JKS
- ssl.keystore.type=JKS
使用控制台生产者和控制台消费者的示例:
> kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
> kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties
7.4 使用 SASL 进行身份验证
Kafka 使用 Java 身份验证和授权服务 ( JAAS ) 进行 SASL 配置。
KafkaServer是每个 KafkaServer/Broker 使用的 JAAS 文件中的部分名称。本部分为代理提供 SASL 配置选项,包括代理为代理间通信而建立的任何 SASL 客户端连接。如果将多个侦听器配置为使用 SASL,则部分名称可以以小写侦听器名称为前缀,后跟句点,例如sasl_ssl.KafkaServer。
客户端部分用于验证与 Zookeeper 的 SASL 连接。它还允许代理在 Zookeeper 节点上设置 SASL ACL,从而锁定这些节点,以便只有代理可以修改它。所有经纪商必须具有相同的主体名称。如果要使用除 Client 之外的节名称,请将系统属性Zookeeper.sasl.clientconfig设置为适当的名称(例如-Dzookeeper.sasl.clientconfig=ZkClient)。
ZooKeeper 默认使用“zookeeper”作为服务名称。如果要更改此设置,请将系统属性 Zookeeper.sasl.client.username设置为适当的名称(例如-Dzookeeper.sasl.client.username \=zk)。
代理还可以使用代理配置属性来配置 JAAS sasl.jaas.config
。属性名称必须以侦听器前缀(包括 SASL 机制)作为前缀,即listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
. 配置值中只能指定一个登录模块。如果在侦听器上配置了多个机制,则必须使用侦听器和机制前缀为每个机制提供配置。例如,
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-secret";
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" \
user_alice="alice-secret";
如果在不同级别定义 JAAS 配置,则使用的优先顺序为:
- 代理配置属性
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
{listenerName}.KafkaServer
静态 JAAS 配置部分KafkaServer
静态 JAAS 配置部分
请注意,ZooKeeper JAAS 配置只能使用静态 JAAS 配置进行配置。
有关代理配置示例,请参阅GSSAPI (Kerberos)、 PLAIN、 SCRAM或 OAUTHBEARER 。
客户端可以使用客户端配置属性 sasl.jaas.config 或使用 类似于代理的静态 JAAS 配置文件来配置 JAAS。
客户端可以将 JAAS 配置指定为生产者或消费者属性,而无需创建物理配置文件。此模式还允许同一 JVM 中的不同生产者和消费者通过为每个客户端指定不同的属性来使用不同的凭据。如果同时指定了静态 JAAS 配置系统属性 java.security.auth.login.config
和客户端属性sasl.jaas.config
,则将使用客户端属性。
有关示例配置,请参阅GSSAPI (Kerberos)、 PLAIN、 SCRAM或 OAUTHBEARER 。
要使用静态 JAAS 配置文件在客户端上配置 SASL 身份验证: 1. 添加一个 JAAS 配置文件,其中包含名为KafkaClient的客户端登录部分。在KafkaClient中为所选机制配置登录模块,如设置GSSAPI (Kerberos)、 PLAIN、 SCRAM或 OAUTHBEARER的示例中所述。例如,GSSAPI 凭证可以配置为:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="kafka-client-1@EXAMPLE.COM";
};
- 将 JAAS 配置文件位置作为 JVM 参数传递给每个客户端 JVM。例如:
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
SASL 可以与 PLAINTEXT 或 SSL 一起使用,作为分别使用安全协议 SASL_PLAINTEXT 或 SASL_SSL 的传输层。如果使用SASL_SSL,则还必须配置SSL。
-
SASL机制
Kafka支持以下SASL机制: * GSSAPI(Kerberos) * 清楚的 * SCRAM-SHA-256 * SCRAM-SHA-512 * 承载者
-
Kafka 代理的 SASL 配置
-
在 server.properties 中配置 SASL 端口,方法是将 SASL_PLAINTEXT 或 SASL_SSL 至少之一添加到listeners参数,该参数包含一个或多个逗号分隔值:
listeners=SASL_PLAINTEXT://host.name:port
如果您仅配置 SASL 端口(或者如果您希望 Kafka 代理使用 SASL 相互进行身份验证),请确保为代理间通信设置相同的 SASL 协议:
security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
-
选择 要在代理中启用的一种或多种受支持的机制,并按照步骤为该机制配置 SASL。要在代理中启用多种机制,请按照此处的步骤操作 。
-
Kafka 客户端的 SASL 配置
仅新的 Java Kafka 生产者和消费者支持 SASL 身份验证,不支持旧的 API。
要在客户端上配置 SASL 身份验证,请选择 在代理中启用的SASL机制以进行客户端身份验证,然后按照步骤为所选机制配置 SASL。
注意:当通过 SASL 建立与代理的连接时,客户端可以对代理地址执行反向 DNS 查找。由于 JRE 实现反向 DNS 查找的方式,如果未使用完全限定的域名(对于客户端bootstrap.servers
和代理的 [advertised.listeners](https://kafka.apache.org/documentation/#brokerconfigs_advertised.listeners)
.
-
使用 SASL/Kerberos 进行身份验证
-
先决条件
-
Kerberos
如果您的组织已经使用 Kerberos 服务器(例如,通过使用 Active Directory),则无需仅为 Kafka 安装新服务器。否则,您将需要安装一个,您的 Linux 供应商可能有 Kerberos 软件包以及有关如何安装和配置它的简短指南(Ubuntu、Redhat)。请注意,如果您使用 Oracle Java,则需要下载适合您的 Java 版本的 JCE 策略文件并将其复制到 $JAVA_HOME/jre/lib/security。 - 创建 Kerberos 主体
如果您使用组织的 Kerberos 或 Active Directory 服务器,请向 Kerberos 管理员询问集群中每个 Kafka 代理以及将使用 Kerberos 身份验证(通过客户端和工具)访问 Kafka 的每个操作系统用户的主体。
如果您安装了自己的 Kerberos,则需要使用以下命令自行创建这些主体:
> sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
> sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
-
确保可以使用主机名访问所有主机- Kerberos 要求所有主机都可以使用其 FQDN 进行解析。
-
配置 Kafka 代理
-
将一个经过适当修改的 JAAS 文件(类似于下面的文件)添加到每个 Kafka 代理的配置目录中,在本示例中我们将其称为 kafka_server_jaas.conf(请注意,每个代理应该有自己的密钥表):
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_server.keytab"
principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};
// Zookeeper client authentication
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_server.keytab"
principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};
JAAS 文件中的KafkaServer部分告诉代理要使用哪个主体以及存储该主体的密钥表的位置。它允许代理使用本节中指定的密钥表登录。有关 Zookeeper SASL 配置的更多详细信息, 请参阅注释。
- 将 JAAS 和可选的 krb5 文件位置作为 JVM 参数传递给每个 Kafka 代理(请参阅此处了解更多详细信息):
-Djava.security.krb5.conf=/etc/kafka/krb5.conf
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
- 确保启动 kafka 代理的操作系统用户可以读取 JAAS 文件中配置的密钥表。
- 按照此处所述在 server.properties 中配置 SASL 端口和 SASL 机制。例如:
listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
我们还必须在 server.properties 中配置服务名称,该名称应与 kafka 代理的主体名称匹配。在上面的示例中,主体是“kafka/kafka1.hostname.com@EXAMPLE.com”,因此:
sasl.kerberos.service.name=kafka
要在客户端上配置 SASL 身份验证:
1. 客户端(生产者、消费者、连接工作人员等)将使用自己的主体(通常与运行客户端的用户同名)向集群进行身份验证,因此根据需要获取或创建这些主体。然后为每个客户端配置 JAAS 配置属性。JVM 中的不同客户端可以通过指定不同的主体以不同的用户身份运行。该物业sasl.jaas.config
Producer.properties 或 Consumer.properties 中描述了生产者和消费者等客户端如何连接到 Kafka Broker。以下是使用密钥表的客户端的示例配置(建议用于长时间运行的进程):
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/etc/security/keytabs/kafka_client.keytab" \
principal="kafka-client-1@EXAMPLE.COM";
对于 kafka-console-consumer 或 kafka-console- Producer 等命令行实用程序,kinit 可以与“useTicketCache = true”一起使用,如下所示:
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useTicketCache=true;
客户端的 JAAS 配置也可以指定为类似于代理的 JVM 参数,如此处所述。客户端使用名为KafkaClient的登录部分 。此选项仅允许一名用户进行来自 JVM 的所有客户端连接。 2. 确保启动 kafka 客户端的操作系统用户可以读取 JAAS 配置中配置的密钥表。 3. (可选)将 krb5 文件位置作为 JVM 参数传递给每个客户端 JVM(请参阅此处了解更多详细信息):
-Djava.security.krb5.conf=/etc/kafka/krb5.conf
- 在 Producer.properties 或 Consumer.properties 中配置以下属性:
security.protocol=SASL_PLAINTEXT (or SASL_SSL)
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
SASL/PLAIN是一种简单的用户名/密码认证机制,通常与TLS结合使用进行加密,实现安全认证。Kafka 支持 SASL/PLAIN 的默认实现,可以将其扩展到生产用途,如此处所述。
在默认实现下principal.builder.class
,用户名用作Principal
ACL 等配置的 身份验证。
1. ##### 配置 Kafka 代理
- 将一个经过适当修改的 JAAS 文件(类似于下面的文件)添加到每个 Kafka 代理的配置目录中,在此示例中我们将其称为 kafka_server_jaas.conf:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
此配置定义了两个用户(admin和alice)。KafkaServer部分 中的属性用户名和密码由代理用来启动与其他代理的连接。在此示例中, admin是经纪商间通信的用户。属性集user_ userName定义连接到代理的所有用户的密码,并且代理使用这些属性验证所有客户端连接,包括来自其他代理的连接。 2. 将 JAAS 配置文件位置作为 JVM 参数传递给每个 Kafka 代理:
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
- 按照此处所述在 server.properties 中配置 SASL 端口和 SASL 机制。例如:
listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
要在客户端上配置 SASL 身份验证: 1. 在 Producer.properties 或 Consumer.properties 中为每个客户端配置 JAAS 配置属性。登录模块描述了生产者和消费者等客户端如何连接到 Kafka Broker。以下是 PLAIN 机制的客户端配置示例:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="alice" \
password="alice-secret";
客户端使用用户名和密码选项来配置客户端连接的用户。在此示例中,客户端以用户alice的身份连接到代理。JVM 中的不同客户端可以通过在sasl.jaas.config
.
客户端的 JAAS 配置也可以指定为类似于代理的 JVM 参数,如此处所述。客户端使用名为KafkaClient的登录部分 。此选项仅允许一名用户进行来自 JVM 的所有客户端连接。
- 在 Producer.properties 或 Consumer.properties 中配置以下属性:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
-
在生产中使用 SASL/PLAIN
- SASL/PLAIN 只能与 SSL 一起使用作为传输层,以确保明文密码不会在未经加密的情况下在线路上传输。
- Kafka 中 SASL/PLAIN 的默认实现在 JAAS 配置文件中指定用户名和密码,如下 所示。
sasl.server.callback.handler.class
从 Kafka 2.0 版本开始,您可以通过配置自己的回调处理程序来避免在磁盘上存储明文密码,这些回调处理程序使用配置选项和来从外部源获取用户名和密码sasl.client.callback.handler.class
。 - 在生产系统中,外部认证服务器可以实现密码认证。从 Kafka 2.0 版开始,您可以插入自己的回调处理程序,通过配置
sasl.server.callback.handler.class
.
-
使用 SASL/SCRAM 进行身份验证
Salted 质询响应身份验证机制 (SCRAM) 是一系列 SASL 机制,可解决执行用户名/密码身份验证(例如 PLAIN 和 DIGEST-MD5)的传统机制的安全问题。该机制在RFC 5802中定义。Kafka支持SCRAM-SHA-256和SCRAM-SHA-512,可以与TLS结合使用来执行安全身份验证。默认实现下principal.builder.class
,使用用户名作为经过身份验证的 Principal
用于配置 ACL 等。Kafka 中的默认 SCRAM 实现将 SCRAM 凭证存储在 Zookeeper 中,适合在 Zookeeper 位于专用网络上的 Kafka 安装中使用。 有关更多详细信息,请参阅安全注意事项。
Kafka 中的 SCRAM 实现使用 Zookeeper 作为凭证存储。可以使用kafka-configs.sh在 Zookeeper 中创建凭证。对于启用的每个 SCRAM 机制,必须通过添加具有机制名称的配置来创建凭据。必须在启动 Kafka 代理之前创建代理间通信的凭证。可以动态创建和更新客户端凭证,并且更新的凭证将用于验证新连接。
使用密码alice-secret为用户alice创建 SCRAM 凭证:
> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice
如果未指定迭代,则使用默认迭代计数 4096。创建随机盐,并将由盐、迭代、StoredKey 和 ServerKey 组成的 SCRAM 身份存储在 Zookeeper 中。有关 SCRAM 身份和各个字段的详细信息, 请参阅RFC 5802 。
以下示例还需要用户管理员进行代理间通信,可以使用以下命令创建:
> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin
可以使用--describe选项列出现有凭据:
> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --describe --entity-type users --entity-name alice
可以使用--alter --delete-config选项删除一种或多种 SCRAM 机制的凭证:
> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
-
配置 Kafka 代理
-
将一个经过适当修改的 JAAS 文件(类似于下面的文件)添加到每个 Kafka 代理的配置目录中,在此示例中我们将其称为 kafka_server_jaas.conf:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};
KafkaServer部分中的 属性用户名和密码由代理用来启动与其他代理的连接。在此示例中,admin是经纪商间通信的用户。 2. 将 JAAS 配置文件位置作为 JVM 参数传递给每个 Kafka 代理:
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
- 按照此处所述在 server.properties 中配置 SASL 端口和 SASL 机制。例如:
listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)
要在客户端上配置 SASL 身份验证: 1. 在 Producer.properties 或 Consumer.properties 中为每个客户端配置 JAAS 配置属性。登录模块描述了生产者和消费者等客户端如何连接到 Kafka Broker。以下是 SCRAM 机制的客户端配置示例:
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="alice" \
password="alice-secret";
客户端使用用户名和密码选项来配置客户端连接的用户。在此示例中,客户端以用户alice的身份连接到代理。JVM 中的不同客户端可以通过在sasl.jaas.config
.
客户端的 JAAS 配置也可以指定为类似于代理的 JVM 参数,如此处所述。客户端使用名为KafkaClient的登录部分 。此选项仅允许一名用户进行来自 JVM 的所有客户端连接。
- 在 Producer.properties 或 Consumer.properties 中配置以下属性:
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)
-
SASL/SCRAM 的安全注意事项
-
Kafka 中 SASL/SCRAM 的默认实现将 SCRAM 凭证存储在 Zookeeper 中。这适用于 Zookeeper 安全且位于专用网络上的安装中的生产使用。
- Kafka 仅支持强哈希函数 SHA-256 和 SHA-512,最小迭代次数为 4096。强哈希函数与强密码和高迭代次数相结合,可以在 Zookeeper 安全性受到损害时防止暴力攻击。
- SCRAM 应仅与 TLS 加密一起使用,以防止 SCRAM 交换被拦截。这可以防止字典或暴力攻击,并在 Zookeeper 受到威胁时防止假冒。
sasl.server.callback.handler.class
从 Kafka 2.0 版开始,可以通过在 Zookeeper 不安全的安装中进行配置,使用自定义回调处理程序覆盖默认 SASL/SCRAM 凭证存储。-
有关安全注意事项的更多详细信息,请参阅 RFC 5802。
-
使用 SASL/OAUTHBEARER 进行身份验证
OAuth 2 授权框架“使第三方应用程序能够代表资源所有者通过协调资源所有者和 HTTP 服务之间的批准交互,或者通过允许第三方应用程序获得对 HTTP 服务的有限访问权限。以自己的名义获得访问权限。” SASL OAUTHBEARER 机制允许在 SASL(即非 HTTP)上下文中使用该框架;它在RFC 7628中定义。Kafka 中的默认 OAUTHBEARER 实现创建并验证不安全的 JSON Web 令牌 ,并且仅适合在非生产 Kafka 安装中使用。请参阅安全注意事项 更多细节。
在默认实现下principal.builder.class
,OAuthBearerToken的principalName用作Principal
ACL等配置的身份 验证。
-
配置 Kafka 代理
-
将一个经过适当修改的 JAAS 文件(类似于下面的文件)添加到每个 Kafka 代理的配置目录中,在此示例中我们将其称为 kafka_server_jaas.conf:
KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
unsecuredLoginStringClaim_sub="admin";
};
KafkaServer部分中的 属性unsecuredLoginStringClaim_sub由代理在启动与其他代理的连接时使用。在此示例中,admin将出现在主题 ( sub ) 声明中,并将成为broker间通信的用户。 2. 将 JAAS 配置文件位置作为 JVM 参数传递给每个 Kafka 代理:
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
- 按照此处所述在 server.properties 中配置 SASL 端口和 SASL 机制。例如:
listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER
要在客户端上配置 SASL 身份验证: 1. 在 Producer.properties 或 Consumer.properties 中为每个客户端配置 JAAS 配置属性。登录模块描述了生产者和消费者等客户端如何连接到 Kafka Broker。以下是 OAUTHBEARER 机制的客户端配置示例:
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
unsecuredLoginStringClaim_sub="alice";
客户端使用选项unsecuredLoginStringClaim_sub来配置主题 ( sub ) 声明,该声明确定客户端连接的用户。在此示例中,客户端以用户alice的身份连接到代理。JVM 中的不同客户端可以通过在 中指定不同的主题 ( sub ) 声明来作为不同用户进行连接sasl.jaas.config
。
客户端的 JAAS 配置也可以指定为类似于代理的 JVM 参数,如此处所述。客户端使用名为KafkaClient的登录部分 。此选项仅允许一名用户进行来自 JVM 的所有客户端连接。
- 在 Producer.properties 或 Consumer.properties 中配置以下属性:
security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism=OAUTHBEARER
- SASL/OAUTHBEARER 的默认实现取决于 jackson-databind 库。由于它是一个可选依赖项,因此用户必须通过其构建工具将其配置为依赖项。
-
SASL/OAUTHBEARER 的无担保令牌创建选项
- Kafka 中 SASL/OAUTHBEARER 的默认实现创建并验证不安全的 JSON Web 令牌。虽然仅适用于非生产用途,但它确实提供了在开发或测试环境中创建任意令牌的灵活性。
- 以下是客户端(以及代理端,如果 OAUTHBEARER 是代理间协议)支持的各种 JAAS 模块选项:
JAAS Module Option for Unsecured Token Creation Documentation unsecuredLoginStringClaim_ ="value" Creates a String claim with the given name and value. Any valid claim name can be specified except 'iat' and 'exp' (these are automatically generated). unsecuredLoginNumberClaim_ ="value" Creates a Number claim with the given name and value. Any valid claim name can be specified except 'iat' and 'exp' (these are automatically generated). unsecuredLoginListClaim_ ="value" Creates a String List claim with the given name and values parsed from the given value where the first character is taken as the delimiter. For example: unsecuredLoginListClaim_fubar=" unsecuredLoginExtension_ ="value" Creates a String extension with the given name and value. For example: unsecuredLoginExtension_traceId="123". A valid extension name is any sequence of lowercase or uppercase alphabet characters. In addition, the "auth" extension name is reserved. A valid extension value is any combination of characters with ASCII codes 1-127. unsecuredLoginPrincipalClaimName Set to a custom claim name if you wish the name of the String claim holding the principal name to be something other than 'sub'. unsecuredLoginLifetimeSeconds Set to an integer value if the token expiration is to be set to something other than the default value of 3600 seconds (which is 1 hour). The 'exp' claim will be set to reflect the expiration time. unsecuredLoginScopeClaimName Set to a custom claim name if you wish the name of the String or String List claim holding any token scope to be something other than 'scope'. -
SASL/OAUTHBEARER 的不安全令牌验证选项
- 以下是代理端用于不安全 JSON Web 令牌验证的各种受支持的 JAAS 模块选项:
JAAS Module Option for Unsecured Token Validation Documentation unsecuredValidatorPrincipalClaimName="value" Set to a non-empty value if you wish a particular String claim holding a principal name to be checked for existence; the default is to check for the existence of the 'sub' claim. unsecuredValidatorScopeClaimName="value" Set to a custom claim name if you wish the name of the String orString List claim holding any token scope to be something other than 'scope'. unsecuredValidatorRequiredScope="value" Set to a space-delimited list of scope values if you wish theString/String Listclaim holding the token scope to be checked to make sure it contains certain values. unsecuredValidatorAllowableClockSkewMs="value" Set to a positive integer value if you wish to allow up to some number of positive milliseconds of clock skew (the default is 0). - 可以使用自定义登录和 SASL 服务器回调处理程序来覆盖默认的不安全 SASL/OAUTHBEARER 实现(并且必须在生产环境中覆盖)。
- 有关安全注意事项的更多详细信息,请参阅RFC 6749 第 10 节。
-
SASL/OAUTHBEARER 的令牌刷新
Kafka 会在令牌过期之前定期刷新令牌,以便客户端可以继续与代理建立连接。影响刷新算法运行方式的参数被指定为生产者/消费者/代理配置的一部分,如下所示。有关详细信息,请参阅其他地方有关这些属性的文档。默认值通常是合理的,在这种情况下,不需要显式设置这些配置参数。
Producer/Consumer/Broker Configuration Property sasl.login.refresh.window.factor sasl.login.refresh.window.jitter sasl.login.refresh.min.period.seconds sasl.login.refresh.min.buffer.seconds -
SASL/OAUTHBEARER 的安全/生产使用
生产用例需要编写org.apache.kafka.common.security.auth.AuthenticateCallbackHandler 的实现 ,它可以处理org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback的实例 ,并通过sasl.login声明它 非代理客户端的 .callback.handler.class配置选项或通过代理的listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class 配置选项(当 SASL/OAUTHBEARER 是代理间协议时) 。
生产用例还需要编写org.apache.kafka.common.security.auth.AuthenticateCallbackHandler 的实现 ,它可以处理org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback的实例 ,并通过listener.name声明它 .sasl_ssl.oauthbearer.sasl.server.callback.handler.class 代理配置选项。
-
SASL/OAUTHBEARER 的安全注意事项
- Kafka 中 SASL/OAUTHBEARER 的默认实现创建并验证不安全的 JSON Web 令牌。这仅适用于非生产用途。
- OAUTHBEARER 应仅在具有 TLS 加密的生产环境中使用,以防止令牌被拦截。
- 如上所述,可以使用自定义登录和 SASL 服务器回调处理程序来覆盖默认的不安全 SASL/OAUTHBEARER 实现(并且必须在生产环境中覆盖)。
- 有关 OAuth 2 一般安全注意事项的更多详细信息,请参阅RFC 6749,第 10 节。
-
在代理中启用多个 SASL 机制
-
在JAAS 配置文件的KafkaServer部分中指定所有启用机制的登录模块的配置。例如:
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_server.keytab"
principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
- 在 server.properties 中启用 SASL 机制:
sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
- 如果需要,请在 server.properties 中指定用于代理间通信的 SASL 安全协议和机制:
security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)
- 按照GSSAPI (Kerberos)、 PLAIN、 SCRAM和OAUTHBEARER中特定于机制的步骤 为启用的机制配置 SASL。
-
修改正在运行的集群中的SASL机制
可以使用以下顺序在正在运行的集群中修改 SASL 机制:
- 通过将机制添加到每个代理的 server.properties 中的sasl.enabled.mechanisms中,启用新的 SASL 机制。更新 JAAS 配置文件以包括此处所述的两种机制。增量反弹集群节点。
- 使用新机制重新启动客户端。
- 要更改代理间通信的机制(如果需要),请将server.properties 中的sasl.mechanism.inter.broker.protocol设置为新机制,并再次增量地弹跳集群。
-
要删除旧机制(如果需要),请从server.properties 中的sasl.enabled.mechanisms中删除旧机制,并从 JAAS 配置文件中删除旧机制的条目。再次增量地反弹集群。
-
使用委托令牌进行身份验证
基于委托令牌的身份验证是一种轻量级身份验证机制,可补充现有的 SASL/SSL 方法。委托令牌是 kafka broker和客户端之间的共享秘密。委派令牌将帮助处理框架将工作负载分配给安全环境中的可用工作人员,而无需在使用 2 向 SSL 时增加分配 Kerberos TGT/密钥表或密钥库的成本。 有关更多详细信息,请参阅KIP-48 。
在默认实现下principal.builder.class
,委托令牌的所有者用作Principal
ACL 等配置的 身份验证。
使用委托令牌的典型步骤是:
- 用户通过 SASL 或 SSL 向 Kafka 集群进行身份验证,并获取委托令牌。这可以使用管理 API 或使用kafka-delegation-tokens.sh脚本来完成。
- 用户将委托令牌安全地传递给 Kafka 客户端,以便通过 Kafka 集群进行身份验证。
-
令牌所有者/更新者可以更新/过期委托令牌。
-
代币管理
秘密用于生成和验证委托令牌。这是使用配置选项delegate.token.secret.key提供的。必须在所有代理之间配置相同的密钥。如果密钥未设置或设置为空字符串,代理将禁用委托令牌身份验证。
在当前的实现中,令牌详细信息存储在 Zookeeper 中,适合在 Zookeeper 位于专用网络上的 Kafka 安装中使用。目前,此机密以纯文本形式存储在 server.properties 配置文件中。我们打算在未来的 Kafka 版本中对这些进行配置。
代币具有当前寿命和最大可再生寿命。默认情况下,令牌必须每 24 小时更新一次,最多 7 天。这些可以使用delegation.token.expiry.time.ms 和delegation.token.max.lifetime.ms配置选项进行配置。
令牌也可以显式取消。如果令牌在令牌过期时间内未更新,或者令牌超出了最大生命周期,则将从所有代理缓存以及 Zookeeper 中删除该令牌。
可以使用管理 API 或使用kafka-delegation-tokens.sh脚本创建令牌。委托令牌请求(创建/更新/过期/描述)应仅在 SASL 或 SSL 身份验证通道上发出。如果通过委托令牌完成初始身份验证,则无法请求令牌。用户也可以通过指定--owner-principal参数为该用户或其他人创建令牌。所有者/更新者可以更新或过期令牌。所有者/更新者始终可以描述自己的代币。要描述其他令牌,需要在代表令牌所有者的用户资源上添加 DESCRIBE_TOKEN 权限。 下面给出了kafka-delegation-tokens.sh脚本示例。
创建委托令牌:
> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1
为不同的所有者创建委托令牌:
> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1 --owner-principal User:owner1
更新委托令牌:
> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK
使委托令牌过期:
> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire --expiry-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK
可以使用 --describe 选项描述现有令牌:
> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties --owner-principal User:user1
委托令牌身份验证搭载当前 SASL/SCRAM 身份验证机制。我们必须在 Kafka 集群上启用 SASL/SCRAM 机制,如此处所述。
配置 Kafka 客户端:
- 在 Producer.properties 或 Consumer.properties 中为每个客户端配置 JAAS 配置属性。登录模块描述了生产者和消费者等客户端如何连接到 Kafka Broker。以下是用于令牌身份验证的客户端配置示例:
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="tokenID123" \
password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
tokenauth="true";
客户端使用用户名和密码选项来配置令牌 id 和令牌 HMAC。tokenauth选项用于向服务器指示令牌认证。在此示例中,客户端使用令牌 id 连接到代理:tokenID123。JVM 中的不同客户端可以通过在 中指定不同的令牌详细信息来使用不同的令牌进行连接sasl.jaas.config
。
客户端的 JAAS 配置也可以指定为类似于代理的 JVM 参数,如此处所述。客户端使用名为KafkaClient的登录部分 。此选项仅允许一名用户进行来自 JVM 的所有客户端连接。
当秘密需要轮换时,我们需要重新部署。在此过程中,已连接的客户端将继续工作。但是任何新的连接请求和使用旧令牌的续订/过期请求都可能会失败。步骤如下。
- 使所有现有令牌过期。
- 通过滚动升级轮换秘密,并且
- 生成新的代币
我们打算在未来的 Kafka 版本中实现自动化。
7.5 授权和 ACL
Kafka 附带了一个可插入的授权框架,该框架在服务器配置中使用authorizer.class.name属性进行配置。配置的实现必须扩展org.apache.kafka.server.authorizer.Authorizer
。Kafka 提供了将 ACL 存储在集群元数据(Zookeeper 或 KRaft 元数据日志)中的默认实现。对于基于Zookeeper的集群,提供的实现配置如下:
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
对于 KRaft 集群,在所有节点(代理、控制器或组合代理/控制器节点)上使用以下配置:
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
Kafka ACL 的通用格式是“主体 {P} 是[允许|拒绝]来自主机 {H} 的任何与 ResourcePattern {RP} 匹配的资源 {R} 上的操作 {O}”。您可以在KIP-11中阅读有关 ACL 结构的更多信息,在KIP-290中阅读有关资源模式的更多信息。要添加、删除或列出 ACL,您可以使用 Kafka ACL CLI kafka-acls.sh
。默认情况下,如果没有 ResourcePatterns 与特定资源 R 匹配,则 R 没有关联的 ACL,因此除了超级用户之外,任何人都不允许访问 R。如果要更改该行为,可以在 server.properties 中包含以下内容。
allow.everyone.if.no.acl.found=true
还可以在 server.properties 中添加超级用户,如下所示(请注意,分隔符是分号,因为 SSL 用户名可能包含逗号)。默认的 PrimaryType 字符串“User”区分大小写。
super.users=User:Bob;User:Alice
Kraft 主要转发
在 KRaft 集群中,诸如CreateTopics
和 之类的管理请求DeleteTopics
由客户端发送到代理侦听器。然后,代理通过在 中配置的第一个侦听器将请求转发到活动控制器controller.listener.names
。这些请求的授权是在控制器节点上完成的。这是通过Envelope
打包来自客户端和客户端主体的底层请求的请求来实现的。当控制器收到Envelope
代理转发的请求时,它首先Envelope
使用经过身份验证的代理主体来授权该请求。然后它使用转发的主体授权底层请求。
所有这些都意味着 Kafka 必须了解如何序列化和反序列化客户端主体。身份验证框架允许通过覆盖principal.builder.class
配置来定制主体。为了使自定义主体能够与 KRaft 一起使用,配置的类必须实现,org.apache.kafka.common.security.auth.KafkaPrincipalSerde
以便 Kafka 知道如何序列化和反序列化主体。默认实现org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
使用源代码中定义的 Kafka RPC 格式:clients/src/main/resources/common/message/DefaultPrincipalData.json
。有关 KRaft 中请求转发的更多详细信息,请参阅KIP-590
自定义 SSL 用户名
默认情况下,SSL 用户名的格式为“CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”。人们可以通过ssl.principal.mapping.rules
在 server.properties 中设置自定义规则来更改这一点。此配置允许将 X.500 可分辨名称映射到短名称的规则列表。规则按顺序进行评估,并且与专有名称匹配的第一个规则用于将其映射到短名称。列表中的任何后续规则都将被忽略。
格式为ssl.principal.mapping.rules
是一个列表,其中每个规则都以“RULE:”开头,并包含以下格式的表达式。默认规则将返回 X.500 证书专有名称的字符串表示形式。如果专有名称与模式匹配,则将对该名称运行替换命令。它还支持小写/大写选项,以强制翻译结果全部为小写/大写。这是通过在规则末尾添加“/L”或“/U”来完成的。
RULE:pattern/replacement/
RULE:pattern/replacement/[LU]
示例ssl.principal.mapping.rules
值是:
RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
DEFAULT
上述规则将专有名称“CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”转换为“serviceuser”和“CN=adminUser,OU=Admin,O=Unknown,L=”未知,ST=未知,C=未知”到“adminuser@admin”。
对于高级用例,可以通过在 server.properties 中设置自定义的 PrimaryBuilder 来自定义名称,如下所示。
principal.builder.class=CustomizedPrincipalBuilderClass
自定义 SASL 用户名
默认情况下,SASL 用户名将是 Kerberos 主体的主要部分。人们可以通过sasl.kerberos.principal.to.local.rules
在 server.properties 中设置自定义规则来更改这一点。的格式是一个列表,其中每个规则的工作方式与Kerberos 配置文件 (krb5.conf)sasl.kerberos.principal.to.local.rules
中的 auth_to_local 相同。这还支持额外的小写/大写规则,以强制翻译结果全部为小写/大写。这是通过在规则末尾添加“/L”或“/U”来完成的。检查以下格式的语法。每条规则均以 RULE: 开头,并包含如下格式的表达式。有关更多详细信息,请参阅 kerberos 文档。
RULE:[n:string](regexp)s/pattern/replacement/
RULE:[n:string](regexp)s/pattern/replacement/g
RULE:[n:string](regexp)s/pattern/replacement//L
RULE:[n:string](regexp)s/pattern/replacement/g/L
RULE:[n:string](regexp)s/pattern/replacement//U
RULE:[n:string](regexp)s/pattern/replacement/g/U
添加规则以将 user@MYDOMAIN.COM 正确转换为 user 同时保留默认规则的示例如下:
sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT
命令行界面
Kafka 授权管理 CLI 与所有其他 CLI 都可以在 bin 目录下找到。CLI 脚本称为kafka-acls.sh。以下列出了该脚本支持的所有选项:
OPTION | DESCRIPTION | DEFAULT | OPTION TYPE |
---|---|---|---|
--add | Indicates to the script that user is trying to add an acl. | Action | |
--remove | Indicates to the script that user is trying to remove an acl. | Action | |
--list | Indicates to the script that user is trying to list acls. | Action | |
--bootstrap-server | A list of host/port pairs to use for establishing the connection to the Kafka cluster. Only one of --bootstrap-server or --authorizer option must be specified. | Configuration | |
--command-config | A property file containing configs to be passed to Admin Client. This option can only be used with --bootstrap-server option. | Configuration | |
--cluster | Indicates to the script that the user is trying to interact with acls on the singular cluster resource. | ResourcePattern | |
--topic [topic-name] | Indicates to the script that the user is trying to interact with acls on topic resource pattern(s). | ResourcePattern | |
--group [group-name] | Indicates to the script that the user is trying to interact with acls on consumer-group resource pattern(s) | ResourcePattern | |
--transactional-id [transactional-id] | The transactionalId to which ACLs should be added or removed. A value of * indicates the ACLs should apply to all transactionalIds. | ResourcePattern | |
--delegation-token [delegation-token] | Delegation token to which ACLs should be added or removed. A value of * indicates ACL should apply to all tokens. | ResourcePattern | |
--user-principal [user-principal] | A user resource to which ACLs should be added or removed. This is currently supported in relation with delegation tokens. A value of * indicates ACL should apply to all users. | ResourcePattern | |
--resource-pattern-type [pattern-type] | Indicates to the script the type of resource pattern, (for --add), or resource pattern filter, (for --list and --remove), the user wishes to use. \n When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. \n When listing or removing acls, a specific pattern type filter can be used to list or remove acls from a specific type of resource pattern, or the filter values of 'any' or 'match' can be used, where 'any' will match any pattern type, but will match the resource name exactly, and 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s). \n WARNING: 'match', when used in combination with the '--remove' switch, should be used with care. | literal | Configuration |
--allow-principal | Principal is in PrincipalType:name format that will be added to ACL with Allow permission. Default PrincipalType string "User" is case sensitive. \n You can specify multiple --allow-principal in a single command. | Principal | |
--deny-principal | Principal is in PrincipalType:name format that will be added to ACL with Deny permission. Default PrincipalType string "User" is case sensitive. \n You can specify multiple --deny-principal in a single command. | Principal | |
--principal | Principal is in PrincipalType:name format that will be used along with --list option. Default PrincipalType string "User" is case sensitive. This will list the ACLs for the specified principal. \n You can specify multiple --principal in a single command. | Principal | |
--allow-host | IP address from which principals listed in --allow-principal will have access. | if --allow-principal is specified defaults to * which translates to "all hosts" | Host |
--deny-host | IP address from which principals listed in --deny-principal will be denied access. | if --deny-principal is specified defaults to * which translates to "all hosts" | Host |
--operation | Operation that will be allowed or denied. \n有效值为:\n Read\n Write\n Create\n Delete\n Alter\n Describe\n ClusterAction\n DescribeConfigs\n AlterConfigs\n IdempotentWrite\n CreateTokens\n DescribeTokens\n* All | All | Operation |
--producer | Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE, DESCRIBE and CREATE on topic. | Convenience | |
--consumer | Convenience option to add/remove acls for consumer role. This will generate acls that allows READ, DESCRIBE on topic and READ on consumer-group. | Convenience | |
--idempotent | Enable idempotence for the producer. This should be used in combination with the --producer option. \n Note that idempotence is enabled automatically if the producer is authorized to a particular transactional-id. | Convenience | |
--force | Convenience option to assume yes to all queries and do not prompt. | Convenience | |
--authorizer | (DEPRECATED: not supported in KRaft) Fully qualified class name of the authorizer. | kafka.security.authorizer.AclAuthorizer | Configuration |
--authorizer-properties | (DEPRECATED: not supported in KRaft) key=val pairs that will be passed to authorizer for initialization. For the default authorizer in ZK clsuters, the example values are: zookeeper.connect=localhost:2181 | Configuration | |
--zk-tls-config-file | (DEPRECATED: not supported in KRaft) Identifies the file where ZooKeeper client TLS connectivity properties for the authorizer are defined. Any properties other than the following (with or without an "authorizer." prefix) are ignored: zookeeper.clientCnxnSocket, zookeeper.ssl.cipher.suites, zookeeper.ssl.client.enable, zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.identification.algorithm, zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type, zookeeper.ssl.ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type | Configuration |
例子
- 添加ACL
假设您要添加一条ACL“允许主体User:Bob 和User:Alice 对来自IP 198.51.100.0 和IP 198.51.100.1 的Topic Test-Topic 进行读写操作”。您可以通过使用以下选项执行 CLI 来做到这一点:
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
默认情况下,所有不具有允许访问资源操作的显式 acl 的主体都会被拒绝。在极少数情况下,定义了允许访问除某些主体之外的所有主体的允许 acl,我们将必须使用 --deny-principal 和 --deny-host 选项。例如,如果我们想要允许所有用户从 Test-topic 读取,但仅拒绝来自 IP 198.51.100.3 的 User:BadBob,我们可以使用以下命令来实现:
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
请注意,--allow-host
和--deny-host
仅支持 IP 地址(不支持主机名)。上面的示例通过指定 --topic [topic-name] 作为资源模式选项将 acl 添加到主题。同样,用户可以通过指定 --cluster 将 acl 添加到集群,并通过指定 --group [group-name] 将 acl 添加到消费者组。您可以在某种类型的任何资源上添加 acl,例如,假设您想添加一个 acl“主要用户:Peter 被允许从 IP 198.51.200.0 生成任何主题”,您可以通过使用通配符资源“*”来做到这一点,例如通过使用以下选项执行 CLI:
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic '*'
您可以在前缀资源模式上添加 acl,例如,假设您要添加一个 acl“主要用户:Jane 被允许从任何主机生成名称以“Test-”开头的任何主题”。您可以通过使用以下选项执行 CLI 来做到这一点:
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed
请注意,--resource-pattern-type 默认为“literal”,它仅影响具有完全相同名称的资源,或者在通配符资源名称“*”的情况下,影响具有任何名称的资源。
* 删除 Acl
删除 acl 几乎是相同的。唯一的区别是用户必须指定 --remove 选项,而不是 --add 选项。要删除上面第一个示例添加的 acl,我们可以使用以下选项执行 CLI:
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
如果您想删除添加到上面的前缀资源模式中的 acl,我们可以使用以下选项执行 CLI:
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type Prefixed
- 列出 ACL
我们可以通过为资源指定 --list 选项来列出任何资源的 ACL。要列出文字资源模式 Test-topic 上的所有 acl,我们可以使用以下选项执行 CLI:
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic
但是,这只会返回已添加到该确切资源模式的 acl。可能存在影响对主题的访问的其他 acl,例如主题通配符“*”上的任何 acl,或前缀资源模式上的任何 acl。可以显式查询通配符资源模式上的 ACL:
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic '*'
但是,不一定可以在与 Test-topic 匹配的前缀资源模式上显式查询 acl,因为此类模式的名称可能未知。我们可以使用“--resource-pattern-type match”列出影响测试主题的 所有acl,例如
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic --resource-pattern-type match
这将列出所有匹配文字、通配符和前缀资源模式的 acl。
* 添加或删除主体作为生产者或消费者
ACL 管理最常见的用例是添加/删除主体作为生产者或消费者,因此我们添加了方便的选项来处理这些情况。为了将 User:Bob 添加为 Test-topic 的生产者,我们可以执行以下命令:
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --producer --topic Test-topic
类似地,要将 Alice 添加为消费者组 Group-1 的 Test-topic 消费者,我们只需传递 --consumer 选项:
> bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
请注意,对于消费者选项,我们还必须指定消费者组。为了从生产者或消费者角色中删除主体,我们只需要传递 --remove 选项。
* 基于Admin API的ACL管理 对
ClusterResource拥有Alter权限的用户可以使用Admin API进行ACL管理。kafka-acls.sh 脚本支持 AdminClient API 来管理 ACL,而无需直接与 Zookeeper/Authorizer 交互。上述所有示例都可以使用--bootstrap-server选项来执行。例如:
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:tokenRequester --operation CreateTokens --user-principal "owner1"
授权原语
协议调用通常是对 Kafka 中的某些资源执行一些操作。需要了解操作和资源才能建立有效的保护。在本节中,我们将列出这些操作和资源,然后列出这些操作和资源与协议的组合以查看有效的场景。
Kafka中的操作
有一些操作原语可用于建立权限。这些可以与某些资源相匹配,以允许给定用户进行特定协议调用。这些都是:
- Read
- Write
- Create
- Delete
- Alter
- Describe
- ClusterAction
- DescribeConfigs
- AlterConfigs
- IdempotentWrite
- CreateTokens
- DescribeTokens
- All
Kafka中的资源
上述操作可以应用于下面描述的某些资源。
- 主题:这仅代表一个主题。所有作用于主题(例如读取、写入主题)的协议调用都需要添加相应的权限。如果主题资源存在授权错误,则会返回 TOPIC_AUTHORIZATION_FAILED(错误代码:29)。
- 组:代表broker中的消费者组。所有与消费者组一起使用的协议调用(例如加入组)都必须具有主题组的权限。如果未授予权限,则协议响应中将返回 GROUP_AUTHORIZATION_FAILED(错误代码:30)。
- 集群:该资源代表集群。影响整个集群的操作(例如受控关闭)受到集群资源特权的保护。如果集群资源存在授权问题,则会返回 CLUSTER_AUTHORIZATION_FAILED(错误代码:31)。
- TransactionalId:该资源表示与事务相关的操作,例如提交。如果发生任何错误,经纪商将返回 TRANSACTIONAL_ID_AUTHORIZATION_FAILED(错误代码:53)。
- DelegationToken:这代表集群中的委托令牌。诸如描述委托令牌之类的操作可以通过DelegationToken资源上的特权来保护。由于这些对象在 Kafka 中具有一些特殊行为,因此建议阅读 KIP-48和使用委托令牌进行身份验证 中的相关上游文档。
- 用户:可以将CreateToken和DescribeToken操作授予用户资源,以允许为其他用户创建和描述令牌。更多信息可以在KIP-373中找到。
协议操作和资源
在下表中,我们将列出 Kafka API 协议对资源执行的有效操作。
PROTOCOL (API KEY) | OPERATION | RESOURCE | NOTE |
---|---|---|---|
PRODUCE (0) | Write | TransactionalId | An transactional producer which has its transactional.id set requires this privilege. |
PRODUCE (0) | IdempotentWrite | Cluster | An idempotent produce action requires this privilege. |
PRODUCE (0) | Write | Topic | This applies to a normal produce action. |
FETCH (1) | ClusterAction | Cluster | A follower must have ClusterAction on the Cluster resource in order to fetch partition data. |
FETCH (1) | Read | Topic | Regular Kafka consumers need READ permission on each partition they are fetching. |
LIST_OFFSETS (2) | Describe | Topic | |
METADATA (3) | Describe | Topic | |
METADATA (3) | Create | Cluster | If topic auto-creation is enabled, then the broker-side API will check for the existence of a Cluster level privilege. If it's found then it'll allow creating the topic, otherwise it'll iterate through the Topic level privileges (see the next one). |
METADATA (3) | Create | Topic | This authorizes auto topic creation if enabled but the given user doesn't have a cluster level permission (above). |
LEADER_AND_ISR (4) | ClusterAction | Cluster | |
STOP_REPLICA (5) | ClusterAction | Cluster | |
UPDATE_METADATA (6) | ClusterAction | Cluster | |
CONTROLLED_SHUTDOWN (7) | ClusterAction | Cluster | |
OFFSET_COMMIT (8) | Read | Group | An offset can only be committed if it's authorized to the given group and the topic too (see below). Group access is checked first, then Topic access. |
OFFSET_COMMIT (8) | Read | Topic | Since offset commit is part of the consuming process, it needs privileges for the read action. |
OFFSET_FETCH (9) | Describe | Group | Similarly to OFFSET_COMMIT, the application must have privileges on group and topic level too to be able to fetch. However in this case it requires describe access instead of read. Group access is checked first, then Topic access. |
OFFSET_FETCH (9) | Describe | Topic | |
FIND_COORDINATOR (10) | Describe | Group | The FIND_COORDINATOR request can be of "Group" type in which case it is looking for consumergroup coordinators. This privilege would represent the Group mode. |
FIND_COORDINATOR (10) | Describe | TransactionalId | This applies only on transactional producers and checked when a producer tries to find the transaction coordinator. |
JOIN_GROUP (11) | Read | Group | |
HEARTBEAT (12) | Read | Group | |
LEAVE_GROUP (13) | Read | Group | |
SYNC_GROUP (14) | Read | Group | |
DESCRIBE_GROUPS (15) | Describe | Group | |
LIST_GROUPS (16) | Describe | Cluster | When the broker checks to authorize a list_groups request it first checks for this cluster level authorization. If none found then it proceeds to check the groups individually. This operation doesn't return CLUSTER_AUTHORIZATION_FAILED. |
LIST_GROUPS (16) | Describe | Group | If none of the groups are authorized, then just an empty response will be sent back instead of an error. This operation doesn't return CLUSTER_AUTHORIZATION_FAILED. This is applicable from the 2.1 release. |
SASL_HANDSHAKE (17) | The SASL handshake is part of the authentication process and therefore it's not possible to apply any kind of authorization here. | ||
API_VERSIONS (18) | The API_VERSIONS request is part of the Kafka protocol handshake and happens on connection and before any authentication. Therefore it's not possible to control this with authorization. | ||
CREATE_TOPICS (19) | Create | Cluster | If there is no cluster level authorization then it won't return CLUSTER_AUTHORIZATION_FAILED but fall back to use topic level, which is just below. That'll throw error if there is a problem. |
CREATE_TOPICS (19) | Create | Topic | This is applicable from the 2.0 release. |
DELETE_TOPICS (20) | Delete | Topic | |
DELETE_RECORDS (21) | Delete | Topic | |
INIT_PRODUCER_ID (22) | Write | TransactionalId | |
INIT_PRODUCER_ID (22) | IdempotentWrite | Cluster | |
OFFSET_FOR_LEADER_EPOCH (23) | ClusterAction | Cluster | If there is no cluster level privilege for this operation, then it'll check for topic level one. |
OFFSET_FOR_LEADER_EPOCH (23) | Describe | Topic | This is applicable from the 2.1 release. |
ADD_PARTITIONS_TO_TXN (24) | Write | TransactionalId | This API is only applicable to transactional requests. It first checks for the Write action on the TransactionalId resource, then it checks the Topic in subject (below). |
ADD_PARTITIONS_TO_TXN (24) | Write | Topic | |
ADD_OFFSETS_TO_TXN (25) | Write | TransactionalId | Similarly to ADD_PARTITIONS_TO_TXN this is only applicable to transactional request. It first checks for Write action on the TransactionalId resource, then it checks whether it can Read on the given group (below). |
ADD_OFFSETS_TO_TXN (25) | Read | Group | |
END_TXN (26) | Write | TransactionalId | |
WRITE_TXN_MARKERS (27) | ClusterAction | Cluster | |
TXN_OFFSET_COMMIT (28) | Write | TransactionalId | |
TXN_OFFSET_COMMIT (28) | Read | Group | |
TXN_OFFSET_COMMIT (28) | Read | Topic | |
DESCRIBE_ACLS (29) | Describe | Cluster | |
CREATE_ACLS (30) | Alter | Cluster | |
DELETE_ACLS (31) | Alter | Cluster | |
DESCRIBE_CONFIGS (32) | DescribeConfigs | Cluster | If broker configs are requested, then the broker will check cluster level privileges. |
DESCRIBE_CONFIGS (32) | DescribeConfigs | Topic | If topic configs are requested, then the broker will check topic level privileges. |
ALTER_CONFIGS (33) | AlterConfigs | Cluster | If broker configs are altered, then the broker will check cluster level privileges. |
ALTER_CONFIGS (33) | AlterConfigs | Topic | If topic configs are altered, then the broker will check topic level privileges. |
ALTER_REPLICA_LOG_DIRS (34) | Alter | Cluster | |
DESCRIBE_LOG_DIRS (35) | Describe | Cluster | An empty response will be returned on authorization failure. |
SASL_AUTHENTICATE (36) | SASL_AUTHENTICATE is part of the authentication process and therefore it's not possible to apply any kind of authorization here. | ||
CREATE_PARTITIONS (37) | Alter | Topic | |
CREATE_DELEGATION_TOKEN (38) | Creating delegation tokens has special rules, for this please see theAuthentication using Delegation Tokenssection. | ||
CREATE_DELEGATION_TOKEN (38) | CreateTokens | User | Allows creating delegation tokens for the User resource. |
RENEW_DELEGATION_TOKEN (39) | Renewing delegation tokens has special rules, for this please see theAuthentication using Delegation Tokenssection. | ||
EXPIRE_DELEGATION_TOKEN (40) | Expiring delegation tokens has special rules, for this please see theAuthentication using Delegation Tokenssection. | ||
DESCRIBE_DELEGATION_TOKEN (41) | Describe | DelegationToken | Describing delegation tokens has special rules, for this please see theAuthentication using Delegation Tokenssection. |
DESCRIBE_DELEGATION_TOKEN (41) | DescribeTokens | User | Allows describing delegation tokens of the User resource. |
DELETE_GROUPS (42) | Delete | Group | |
ELECT_PREFERRED_LEADERS (43) | ClusterAction | Cluster | |
INCREMENTAL_ALTER_CONFIGS (44) | AlterConfigs | Cluster | If broker configs are altered, then the broker will check cluster level privileges. |
INCREMENTAL_ALTER_CONFIGS (44) | AlterConfigs | Topic | If topic configs are altered, then the broker will check topic level privileges. |
ALTER_PARTITION_REASSIGNMENTS (45) | Alter | Cluster | |
LIST_PARTITION_REASSIGNMENTS (46) | Describe | Cluster | |
OFFSET_DELETE (47) | Delete | Group | |
OFFSET_DELETE (47) | Read | Topic | |
DESCRIBE_CLIENT_QUOTAS (48) | DescribeConfigs | Cluster | |
ALTER_CLIENT_QUOTAS (49) | AlterConfigs | Cluster | |
DESCRIBE_USER_SCRAM_CREDENTIALS (50) | Describe | Cluster | |
ALTER_USER_SCRAM_CREDENTIALS (51) | Alter | Cluster | |
VOTE (52) | ClusterAction | Cluster | |
BEGIN_QUORUM_EPOCH (53) | ClusterAction | Cluster | |
END_QUORUM_EPOCH (54) | ClusterAction | Cluster | |
DESCRIBE_QUORUM (55) | Describe | Cluster | |
ALTER_PARTITION (56) | ClusterAction | Cluster | |
UPDATE_FEATURES (57) | Alter | Cluster | |
ENVELOPE (58) | ClusterAction | Cluster | |
FETCH_SNAPSHOT (59) | ClusterAction | Cluster | |
DESCRIBE_CLUSTER (60) | Describe | Cluster | |
DESCRIBE_PRODUCERS (61) | Read | Topic | |
BROKER_REGISTRATION (62) | ClusterAction | Cluster | |
BROKER_HEARTBEAT (63) | ClusterAction | Cluster | |
UNREGISTER_BROKER (64) | Alter | Cluster | |
DESCRIBE_TRANSACTIONS (65) | Describe | TransactionalId | |
LIST_TRANSACTIONS (66) | Describe | TransactionalId | |
ALLOCATE_PRODUCER_IDS (67) | ClusterAction | Cluster | |
CONSUMER_GROUP_HEARTBEAT (68) | Read | Group |
7.6 将安全功能纳入正在运行的集群中
您可以通过前面讨论的一种或多种受支持的协议来保护正在运行的集群。这是分阶段完成的:
- 增量反弹集群节点以打开其他安全端口。
- 使用安全端口而不是 PLAINTEXT 端口重新启动客户端(假设您正在保护客户端代理连接)。
- 再次增量地反弹集群以启用代理到代理的安全性(如果需要)
- 最后增量反弹以关闭 PLAINTEXT 端口。
配置SSL和SASL的具体步骤在7.3和7.4 节中描述。请按照以下步骤为您所需的协议启用安全性。
安全实现允许您为代理-客户端和代理-代理通信配置不同的协议。这些必须在单独的反弹中启用。PLAINTEXT 端口必须始终保持开放状态,以便代理和/或客户端可以继续通信。
当执行增量反弹时,通过 SIGTERM 彻底停止代理。在移动到下一个节点之前,等待重新启动的副本返回到 ISR 列表也是一个很好的做法。
举个例子,假设我们希望使用 SSL 加密代理-客户端和代理-代理通信。在第一次增量反弹中,每个节点上都会打开一个 SSL 端口:
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
然后我们重新启动客户端,更改其配置以指向新打开的安全端口:
bootstrap.servers = [broker1:9092,...]
security.protocol = SSL
...etc
在第二次增量服务器反弹中,我们指示 Kafka 使用 SSL 作为代理-代理协议(将使用相同的 SSL 端口):
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
security.inter.broker.protocol=SSL
在最后的反弹中,我们通过关闭 PLAINTEXT 端口来保护集群:
listeners=SSL://broker1:9092
security.inter.broker.protocol=SSL
或者,我们可以选择打开多个端口,以便可以使用不同的协议进行broker-broker和broker-客户端通信。假设我们希望在整个过程中使用 SSL 加密(即用于代理-代理和代理-客户端通信),但我们还希望将 SASL 身份验证添加到代理-客户端连接。我们将通过在第一次反弹期间打开两个额外的端口来实现这一点:
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
然后,我们将重新启动客户端,更改其配置以指向新打开的 SASL 和 SSL 安全端口:
bootstrap.servers = [broker1:9093,...]
security.protocol = SASL_SSL
...etc
第二次服务器反弹会将集群切换为通过我们之前在端口 9092 上打开的 SSL 端口使用加密的代理间通信:
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL
最后的反弹通过关闭 PLAINTEXT 端口来保护集群。
listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL
ZooKeeper 的安全可以独立于 Kafka 集群。第7.7.2节介绍了执行此操作的步骤。
7.7 ZooKeeper 认证
ZooKeeper 从 3.5.x 版本开始支持双向 TLS (mTLS) 身份验证。从 2.5 版本开始,Kafka 支持使用 SASL 和 mTLS 单独或同时向 ZooKeeper 进行身份验证。 有关更多详细信息, 请参阅 KIP-515:启用 ZK 客户端使用新的 TLS 支持的身份验证。
单独使用 mTLS 时,每个代理和任何 CLI 工具(例如ZooKeeper 安全迁移工具)都应使用相同的可分辨名称 (DN) 来标识自己,因为它是经过 ACL 处理的 DN。这可以按如下所述进行更改,但它涉及编写和部署自定义 ZooKeeper 身份验证提供程序。通常,每个证书应具有相同的 DN,但具有不同的主题备用名称 (SAN),以便 ZooKeeper 对代理和任何 CLI 工具的主机名验证能够成功。
当将 SASL 身份验证与 mTLS 一起使用到 ZooKeeper 时,SASL 身份和创建 znode 的 DN(即创建代理的证书)或安全迁移工具的 DN(如果在创建 znode 后执行迁移)都将被ACL'ed,并且所有代理和 CLI 工具都将获得授权,即使它们都使用不同的 DN,因为它们都将使用相同的 ACL'ed SASL 身份。仅当单独使用 mTLS 身份验证时,所有 DN 都必须匹配(并且 SAN 变得至关重要——同样,在没有编写和部署自定义 ZooKeeper 身份验证提供程序的情况下,如下所述)。
使用代理属性文件为代理设置 TLS 配置,如下所述。
使用--zk-tls-config-file
使用-zk-tls-config-file
7.7.1 新集群
7.7.1.1 ZooKeeper SASL 身份验证
要在代理上启用 ZooKeeper SASL 身份验证,有两个必要步骤:
- 创建 JAAS 登录文件并设置相应的系统属性以指向它,如上所述
- 将每个代理中的配置属性zookeeper.set.acl设置为true
Kafka 集群的 ZooKeeper 中存储的元数据是世界可读的,但只能由代理修改。这一决定背后的理由是,ZooKeeper 中存储的数据并不敏感,但对该数据的不当操作可能会导致集群中断。我们还建议通过网络分段限制对 ZooKeeper 的访问(只有代理和某些管理工具需要访问 ZooKeeper)。
7.7.1.2 ZooKeeper 相互 TLS 身份验证
ZooKeeper mTLS 身份验证可以在有或没有 SASL 身份验证的情况下启用。如上所述,单独使用 mTLS 时,每个代理和任何 CLI 工具(例如ZooKeeper 安全迁移工具)通常必须使用相同的可分辨名称 (DN) 来标识自己,因为它是经过 ACL 处理的 DN,这意味着每个证书应具有适当的主题备用名称 (SAN),以便 ZooKeeper 对代理和任何 CLI 工具的主机名验证能够成功。
通过编写一个扩展org.apache.zookeeper.server.auth.X509AuthenticationProvider的类并覆盖方法 protected String getClientId(X509Certificate clientCert) , 可以使用 DN 以外的其他内容来标识 mTLS 客户端。选择一个方案名称,并将ZooKeeper 中的authProvider.[scheme]设置为自定义实现的完全限定类名称;然后设置ssl.authProvider=[scheme]来使用它。
以下是用于启用 TLS 身份验证的 ZooKeeper 配置示例(部分)。这些配置在ZooKeeper 管理指南中有描述 。
secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/path/to/zk/keystore.jks
ssl.keyStore.password=zk-ks-passwd
ssl.trustStore.location=/path/to/zk/truststore.jks
ssl.trustStore.password=zk-ts-passwd
重要提示:ZooKeeper 不支持将 ZooKeeper 服务器密钥库中的密钥密码设置为与密钥库密码本身不同的值。请务必将密钥密码设置为与密钥库密码相同。
以下是使用 mTLS 身份验证连接到 ZooKeeper 的 Kafka Broker 配置示例(部分)。这些配置在上面的Broker Configs中有描述。
# connect to the ZooKeeper port configured for TLS
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
# required to use TLS to ZooKeeper (default is false)
zookeeper.ssl.client.enable=true
# required to use TLS to ZooKeeper
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
# define key/trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
zookeeper.ssl.keystore.location=/path/to/kafka/keystore.jks
zookeeper.ssl.keystore.password=kafka-ks-passwd
zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
zookeeper.ssl.truststore.password=kafka-ts-passwd
# tell broker to create ACLs on znodes
zookeeper.set.acl=true
重要提示:ZooKeeper 不支持将 ZooKeeper 客户端(即代理)密钥库中的密钥密码设置为与密钥库密码本身不同的值。请务必将密钥密码设置为与密钥库密码相同。
7.7.2 迁移集群
如果您运行的 Kafka 版本不支持安全性或只是禁用了安全性,并且希望确保集群安全,那么您需要执行以下步骤来启用 ZooKeeper 身份验证,同时将对您的操作造成的干扰降至最低:
- 在 ZooKeeper 上启用 SASL 和/或 mTLS 身份验证。如果启用 mTLS,您现在将同时拥有非 TLS 端口和 TLS 端口,如下所示:
clientPort=2181
secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/path/to/zk/keystore.jks
ssl.keyStore.password=zk-ks-passwd
ssl.trustStore.location=/path/to/zk/truststore.jks
ssl.trustStore.password=zk-ts-passwd
- 根据需要执行代理的滚动重启,设置 JAAS 登录文件和/或定义 ZooKeeper 双向 TLS 配置(包括连接到启用 TLS 的 ZooKeeper 端口),这使代理能够向 ZooKeeper 进行身份验证。在滚动重启结束时,代理能够使用严格的 ACL 操作 znode,但它们不会使用这些 ACL 创建 znode
- 如果启用了 mTLS,请在 ZooKeeper 中禁用非 TLS 端口
- 对代理执行第二次滚动重启,这次将配置参数zookeeper.set.acl设置为true,这样可以在创建znode时使用安全ACL
- 执行 ZkSecurityMigrator 工具。要执行该工具,有以下脚本:bin/zookeeper-security-migration.sh,其中zookeeper.acl设置为secure。该工具遍历相应的子树,更改 znode 的 ACL。
--zk-tls-config-file <file>
如果启用 mTLS,请使用该选项。
还可以关闭安全集群中的身份验证。为此,请按照下列步骤操作:
- 对代理执行滚动重启,设置 JAAS 登录文件和/或定义 ZooKeeper 双向 TLS 配置,这使代理能够进行身份验证,但将Zookeeper.set.acl设置为 false。在滚动重启结束时,代理停止使用安全 ACL 创建 znode,但仍然能够验证和操作所有 znode
- 执行 ZkSecurityMigrator 工具。要执行该工具,请运行此脚本bin/zookeeper-security-migration.sh,并将Zookeeper.acl设置为不安全。该工具遍历相应的子树,更改 znode 的 ACL。
--zk-tls-config-file <file>
如果需要设置 TLS 配置,请使用该选项。 - 如果要禁用 mTLS,请在 ZooKeeper 中启用非 TLS 端口
- 对代理执行第二次滚动重启,这次省略设置 JAAS 登录文件的系统属性和/或根据需要删除 ZooKeeper 双向 TLS 配置(包括连接到未启用 TLS 的 ZooKeeper 端口)
- 如果要禁用 mTLS,请在 ZooKeeper 中禁用 TLS 端口
以下是如何运行迁移工具的示例:
> bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181
运行此命令以查看完整的参数列表:
> bin/zookeeper-security-migration.sh --help
7.7.3 迁移 ZooKeeper 整体
还需要在 ZooKeeper 整体上启用 SASL 和/或 mTLS 身份验证。为此,我们需要执行服务器的滚动重新启动并设置一些属性。有关 mTLS 信息,请参阅上文。更多详细信息请参考 ZooKeeper 文档:
7.7.4 ZooKeeper 仲裁相互 TLS 认证
可以在 ZooKeeper 服务器本身之间启用 mTLS 身份验证。请参阅ZooKeeper 文档以获取更多详细信息。
7.8 ZooKeeper加密
使用双向 TLS 的 ZooKeeper 连接是加密的。从 ZooKeeper 3.5.7 版(Kafka 2.5 版附带的版本)开始,ZooKeeper 支持服务器端配置 ssl.clientAuth(不区分大小写:want / need / none是有效选项,默认为need),并设置此ZooKeeper 中的值为none允许客户端通过 TLS 加密连接进行连接,而无需提供自己的证书。以下是仅使用 TLS 加密连接到 ZooKeeper 的 Kafka Broker 配置示例(部分)。这些配置在上面的Broker Configs中有描述。
# connect to the ZooKeeper port configured for TLS
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
# required to use TLS to ZooKeeper (default is false)
zookeeper.ssl.client.enable=true
# required to use TLS to ZooKeeper
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
# define trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
# no need to set keystore information assuming ssl.clientAuth=none on ZooKeeper
zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
zookeeper.ssl.truststore.password=kafka-ts-passwd
# tell broker to create ACLs on znodes (if using SASL authentication, otherwise do not set this)
zookeeper.set.acl=true