Technology Sharing

SASL authentication tutorial for Kafka and Zookeeper

2024-07-12


Author: Xu Yuan from Lewei Community (forum.lwops.cn)
When building modern distributed systems, securing data in transit is crucial. Apache Kafka and Zookeeper, a popular distributed message queue and coordination service respectively, provide SASL (Simple Authentication and Security Layer) authentication mechanisms to control who may connect and exchange data.
This article details the entire process, from preparing the authentication files to configuring the server and clients, so that every connection is authenticated.

1. Configure a Kafka account and password:
1. First, modify the Kafka configuration file: vim /asop/kafka/kafka_2.11-2.1.0/config/server.properties

broker.id=0
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://10.176.31.137:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/asop/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

#Authentication protocol used
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL mechanism
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
#Class that completes authentication
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#If no ACL (Access Control List) configuration is found, any operation is allowed.
allow.everyone.if.no.acl.found=false
#You need to enable the setting of super administrator and set the visitor user as super administrator
super.users=User:visitor
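The SASL-related keys above are easy to miss when editing a long server.properties. As a quick sanity check, here is a minimal Python sketch (a hypothetical helper, not part of Kafka) that parses key=value lines and reports which of the keys above are absent:

```python
# A minimal sanity check (hypothetical helper, not part of Kafka):
# parse simple key=value lines and report which SASL-related keys
# from the configuration above are missing.

REQUIRED_SASL_KEYS = [
    "listeners",
    "security.inter.broker.protocol",
    "sasl.enabled.mechanisms",
    "sasl.mechanism.inter.broker.protocol",
    "authorizer.class.name",
    "super.users",
]

def parse_properties(text):
    """Parse 'key=value' lines into a dict, skipping comments and blanks."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

def missing_sasl_keys(text):
    """Return the required SASL keys absent from the properties text."""
    props = parse_properties(text)
    return [k for k in REQUIRED_SASL_KEYS if k not in props]

sample = """
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:visitor
"""
print(missing_sasl_keys(sample))  # -> []
```

Run it against your edited file before restarting the broker; an empty list means all the keys from this step are present.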

2. Next, create a login authentication file for the server. The file name is up to you, for example: vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf. The file content is as follows:

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="visitor"
password="qaz@123"
user_visitor="qaz@123";
};
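The user_visitor entry must repeat the broker's own username and password, which is easy to get wrong by hand. Here is a small hypothetical Python helper that renders such a section consistently (the names and passwords are just this tutorial's examples):

```python
# Hypothetical helper to render a JAAS section like the one above, so
# the broker credentials and the user_<name> entries stay in sync.

def render_jaas_section(section, username, password, users):
    """Render a JAAS section for PlainLoginModule.

    `users` maps login names to passwords; the broker's own account
    must appear in it as well, mirroring user_visitor above.
    """
    lines = [
        f"{section} {{",
        "org.apache.kafka.common.security.plain.PlainLoginModule required",
        f'username="{username}"',
        f'password="{password}"',
    ]
    for name, pw in users.items():
        lines.append(f'user_{name}="{pw}"')
    lines[-1] += ";"  # JAAS terminates the last option with a semicolon
    lines.append("};")
    return "\n".join(lines)

print(render_jaas_section("KafkaServer", "visitor", "qaz@123",
                          {"visitor": "qaz@123"}))
```

Note the trailing semicolon after the last option: forgetting it is a common cause of JAAS parse failures at broker startup.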

3. Then modify the Kafka startup script vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh and add this variable at the top of the file:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
4. Next, create a login verification file for the consumer and producer. The file name is up to you, for example kafka_client_jaas.conf. The file content is as follows (if access is from an application, e.g. a Spring Boot client, this file is not required):
vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="visitor"
password="qaz@123";
};

5. Add the following configurations to consumer.properties and producer.properties respectively:
vim /asop/kafka/kafka_2.11-2.1.0/config/consumer.properties
vim /asop/kafka/kafka_2.11-2.1.0/config/producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
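Both client files must agree with the broker's listener (SASL_PLAINTEXT with the PLAIN mechanism). A tiny hypothetical Python check that extracts the two values from a properties file:

```python
# Hypothetical consistency check: the client-side security.protocol and
# sasl.mechanism must match the broker listener configured earlier.

def sasl_settings(props_text):
    """Extract (security.protocol, sasl.mechanism) from key=value lines."""
    props = {}
    for line in props_text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, value = line.split("=", 1)
            props[key.strip()] = value.strip()
    return props.get("security.protocol"), props.get("sasl.mechanism")

client = "security.protocol=SASL_PLAINTEXT\nsasl.mechanism=PLAIN"
print(sasl_settings(client))  # -> ('SASL_PLAINTEXT', 'PLAIN')
```

Running it over both consumer.properties and producer.properties should yield the same pair as the broker's listener protocol and enabled mechanism.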

6. Modify bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh in the Kafka installation directory, and add this variable at the top of each file:
vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh
vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh

export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf"
7. Restart Zookeeper and Kafka. At this point, the server-side Kafka user login verification configuration is complete (stop Kafka first, then Zookeeper):

Shut down the kafka service
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties

Start the kafka service
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties

Shut down the service zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh stop /asop/zk/zookeeper-3.4.13/conf/zoo.cfg
Start service zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh start /asop/zk/zookeeper-3.4.13/conf/zoo.cfg

8. Test sending and receiving messages

Sending messages
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh --broker-list 10.176.31.137:9092 --topic cmdb --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
Receiving Messages

/asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 10.176.31.137:9092 --topic cmdb --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

2. Configure a SASL account and password for Zookeeper and Kafka:

  1. Configure SASL for Zookeeper
    1.1 Create a new zoo_jaas.conf file
    There are no special requirements for the zoo_jaas.conf file name and file path. It is usually placed in the ${ZOOKEEPER_HOME}/conf directory. vim /asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf

Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin@12"
user_kafka="kafka@123";
};

    Server.username and Server.password are the username and password for Zookeeper's internal communication, so this property only needs to be consistent across all zk nodes.
    In Server.user_xxx, xxx is a custom username; it holds the username and password used by zkClient connections, i.e. the user created for Kafka.

1.2 Configure the /asop/zk/zookeeper-3.4.13/conf/zoo.cfg file
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zookeeper.sasl.client=true

Set zookeeper.sasl.client to true to enable client authentication; otherwise the username configured in zoo_jaas.conf will not take effect. A client can still connect without a JAAS file, but with a warning.
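The four zoo.cfg lines above can be appended idempotently. This hypothetical Python sketch only adds the lines that are missing (the path is this tutorial's; adjust it to your installation):

```python
# Hypothetical helper: idempotently append the SASL lines from step 1.2
# to zoo.cfg, adding only the lines that are not already present.
from pathlib import Path

SASL_LINES = [
    "authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider",
    "requireClientAuthScheme=sasl",
    "jaasLoginRenew=3600000",
    "zookeeper.sasl.client=true",
]

def ensure_sasl_config(zoo_cfg_path):
    """Append any missing SASL lines to zoo.cfg; return the lines added."""
    path = Path(zoo_cfg_path)
    existing = path.read_text().splitlines() if path.exists() else []
    missing = [line for line in SASL_LINES if line not in existing]
    if missing:
        with path.open("a") as fh:
            fh.write("\n" + "\n".join(missing) + "\n")
    return missing
```

Running it a second time is a no-op, so it is safe to include in a provisioning script.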

1.3 Importing dependent packages
Because the login module used is org.apache.kafka.common.security.plain.PlainLoginModule, the Kafka client jars must be on Zookeeper's classpath. Create a new folder zk_sasl_lib, then copy the following jars (versions depend on your Kafka release) from the kafka libs directory into both the Zookeeper lib directory and the new zk_sasl_lib directory:

kafka-clients-2.1.0.jar
lz4-java-1.5.0.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.7.2.jar

mkdir /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/kafka-clients-2.1.0.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/lz4-java-1.5.0.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-api-1.7.25.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-log4j12-1.7.25.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/snappy-java-1.1.7.2.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/kafka-clients-2.1.0.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/lz4-java-1.5.0.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-api-1.7.25.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-log4j12-1.7.25.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/snappy-java-1.1.7.2.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib

chmod 755 -R /asop/zk/zookeeper-3.4.13/zk_sasl_lib/
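The copy commands above can also be scripted. A hedged Python sketch of step 1.3 follows; the jar names and versions are taken from the commands above and will differ for other Kafka releases:

```python
# Sketch of step 1.3: copy the Kafka jars Zookeeper needs into both
# target directories. Jar names/versions are this tutorial's examples;
# adjust them to your Kafka release.
import shutil
from pathlib import Path

JARS = [
    "kafka-clients-2.1.0.jar",
    "lz4-java-1.5.0.jar",
    "slf4j-api-1.7.25.jar",
    "slf4j-log4j12-1.7.25.jar",
    "snappy-java-1.1.7.2.jar",
]

def copy_sasl_jars(kafka_libs, *targets):
    """Copy each required jar from kafka_libs into every target directory."""
    copied = []
    for target in targets:
        Path(target).mkdir(parents=True, exist_ok=True)
        for jar in JARS:
            src = Path(kafka_libs) / jar
            if src.exists():  # skip jars missing in this release
                shutil.copy2(src, Path(target) / jar)
                copied.append(str(Path(target) / jar))
    return copied
```

For the tutorial's layout this would be called as copy_sasl_jars("/asop/kafka/kafka_2.11-2.1.0/libs", "/asop/zk/zookeeper-3.4.13/lib", "/asop/zk/zookeeper-3.4.13/zk_sasl_lib").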

1.4 Modify the zkEnv.sh file /asop/zk/zookeeper-3.4.13/bin/zkEnv.sh
Before modification (add this line if it is not already present):

export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS"

After modification:

for jar in /asop/zk/zookeeper-3.4.13/zk_sasl_lib/*.jar;
do
CLASSPATH="$jar:$CLASSPATH"
done

export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf "

Restart the Zookeeper service

Shut down the service zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh stop /asop/zk/zookeeper-3.4.13/conf/zoo.cfg
Start service zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh start /asop/zk/zookeeper-3.4.13/conf/zoo.cfg

  2. Configure SASL for Kafka
    2.1 Create a new kafka_server_jaas.conf file
    There is no requirement for the kafka_server_jaas.conf file name or storage path. It is usually placed in the ${KAFKA_HOME}/config directory, e.g. /asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="visitor"
password="qaz@123"
user_visitor="qaz@123";
};
Client{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka@123";
};

    KafkaServer.username and KafkaServer.password are the username and password for inter-broker communication, as above.

One of the KafkaServer.user_xxx entries must match the username and password configured in KafkaServer.username and KafkaServer.password.
Entries such as KafkaServer.user_producer and KafkaServer.user_consumer prepare for the subsequent ACL setup, so that consumers and producers use different accounts: the consumer account can only consume data, and the producer account can only produce data.
Client.username and Client.password are the account and password registered in Zookeeper, used for communication between the broker and Zookeeper (if Zookeeper is not configured with SASL, or zookeeper.sasl.client is false, this section can be omitted; in that case the log shows the following):

[2021-06-29 17:14:30,204] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/Users/wjun/env/kafka/config/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)

2.2 Modify the server.properties file
broker.id=0
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://192.168.157.198:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/asop/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

#Authentication protocol used
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL mechanism
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
#Class that completes authentication
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#If no ACL (Access Control List) configuration is found, any operation is allowed.
allow.everyone.if.no.acl.found=false
#You need to enable the setting of super administrator and set the visitor user as super administrator
super.users=User:visitor

Note that localhost needs to be changed to an IP address.

super.users configures super users, which are not affected by subsequent ACL configurations

2.3 Modify the startup script
Modify the kafka-server-start.sh file so that it loads the kafka_server_jaas.conf file: /asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh

Before modification:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

After modification (add the export line as the first line; if it is already there, there is no need to add it again):
export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
fi

Set ZooKeeper ACL rules
/asop/zk/zookeeper-3.4.13/bin/zkCli.sh #Enter zk's command line mode

addauth digest admin:admin@12 #Authenticate as the super administrator (defined in the zk configuration file /asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf)

setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa #Set the IPs and user accounts allowed on the node. admin is the administrator defined in the zk configuration file above; kafka is the user defined under Client in /asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf for connecting to zk.

addauth digest kafka:kafka@123 #Switch to kafka user and set acl again
setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa

Note: to add a whitelist IP or an additional user, include the existing entries in the same setAcl command; otherwise the previous ACL will be overwritten:
setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa,auth:admin:admin@12:cdrwa,ip:1.1.1.1

To restore open permissions (if you do not want to set an acl), run:
setAcl / world:anyone:cdrwa
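For reference, ZooKeeper's digest scheme stores ACL ids of the form user:base64(sha1("user:password")), which is what addauth digest hashes behind the scenes. This small Python sketch reproduces that encoding, which can be handy for pre-computing digest ids:

```python
# Reproduce ZooKeeper's digest-scheme id: "user:base64(sha1('user:password'))".
# Useful for pre-computing digest ids for setAcl from credentials like the
# kafka:kafka@123 account used above.
import base64
import hashlib

def zk_digest_id(user, password):
    """Return the digest ACL id ZooKeeper derives from user:password."""
    raw = hashlib.sha1(f"{user}:{password}".encode("utf-8")).digest()
    return f"{user}:{base64.b64encode(raw).decode('ascii')}"

print(zk_digest_id("kafka", "kafka@123"))
```

Note that the password never appears in the stored ACL, only this SHA-1 digest, which is why the znode ACLs are safe to inspect with getAcl.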

Finally, restart the Kafka service.

Shut down the kafka service
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties

Start the kafka service
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties

At this point, the SASL authentication configuration for Kafka and Zookeeper is complete. For more operation and maintenance tips, follow the Lewei community; if you have operation and maintenance questions, please leave a message.