내 연락처 정보
우편메소피아@프로톤메일.com
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
작성자 Lewei 커뮤니티(forum.lwops.cn) Xu Yuan
최신 분산 시스템을 구축할 때 데이터 전송의 보안을 보장하는 것이 중요합니다. 널리 사용되는 분산 메시지 대기열 및 조정 서비스인 Apache Kafka 및 Zookeeper는 SSL(Secure Sockets Layer) 인증 메커니즘을 제공하여 데이터 전송 중 보안을 강화합니다.
이 문서에서는 전송 중에 데이터가 완전히 보호되도록 SSL 인증서 생성부터 서버 및 클라이언트 구성까지 전체 프로세스를 자세히 소개합니다.
1. Kafka 계정 비밀번호를 구성합니다.
1. 먼저 kafka 구성 파일 vim /asop/kafka/kafka_2.11-2.1.0/config/server.properties를 수정해야 합니다.
브로커.아이디=0
리스너=SASL_PLAINTEXT://:9092
광고.리스너=SASL_PLAINTEXT://10.176.31.137:9092
숫자.네트워크.스레드=3
숫자.io.스레드=8
소켓.전송.버퍼.바이트=102400
소켓.수신.버퍼.바이트=102400
소켓.요청.최대.바이트=104857600
log.dirs=/asop/kafka/logs
숫자.파티션 = 1
데이터 디렉토리당 복구 스레드 수=1
오프셋.주제.복제.인자=1
트랜잭션.상태.로그.복제.인자=1
거래.상태.로그.최소.isr=1
로그.보존.시간=168
로그.세그먼트.바이트=1073741824
로그.보존.확인.간격.ms=300000
동물원 관리자.연결=로컬호스트:2181
동물원 관리자.연결.시간 초과.ms=6000
그룹.초기.재균형.지연.ms=0
#사용된 인증 프로토콜
보안.inter.broker.protocol=SASL_PLAINTEXT
#SASL메커니즘
sasl.enabled.mechanisms=일반
sasl.mechanism.inter.broker.protocol=일반
#인증완료를 위한 클래스
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#ACL(액세스 제어 목록) 구성이 없으면 모든 작업이 허용됩니다.
허용.모두.만약.acl.발견.안됨=false
#최고관리자 설정을 활성화하고 방문자 사용자를 최고관리자로 설정해야 합니다.
super.users=사용자:방문자
2. 둘째, 서버에 대한 로그인 확인 파일을 생성합니다. vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf와 같이 원하는 대로 파일 이름을 지정할 수 있습니다. 다음
카프카서버 {
org.apache.kafka.common.security.plain.PlainLoginModule이 필요합니다.
사용자 이름="방문자"
비밀번호="qaz@123"
사용자_방문자="qaz@123";
};
3. 그런 다음 kafka 설치 디렉터리 vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh를 수정하고 파일 상단에 변수를 추가합니다.
KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"를 내보냅니다.
4. 다음으로 소비자와 생산자에 대한 로그인 확인 파일을 생성합니다. kafka_client_jaas.conf 등 원하는 대로 파일 이름을 지정하면 됩니다. 파일 내용은 다음과 같습니다(springboot 액세스와 같은 프로그램 액세스인 경우). , 구성할 필요가 없습니다)
vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf
카프카클라이언트 {
org.apache.kafka.common.security.plain.PlainLoginModule이 필요합니다.
사용자 이름="방문자"
비밀번호="qaz@123";
};
5. Consumer.properties 및 producer.properties에 각각 다음 구성을 추가합니다.
vim /asop/kafka/kafka_2.11-2.1.0/config/consumer.properties
vim /asop/kafka/kafka_2.11-2.1.0/config/producer.properties
보안 프로토콜=SASL_PLAINTEXT
sasl.mechanism=일반
6. kafka 설치 디렉터리 bin/kafka-console-producer.sh 및 bin/kafka-console-consumer.sh를 수정하고 파일 상단에 변수를 추가합니다.
vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh
vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh
KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf"를 내보냅니다.
7. Zookeeper와 Kafka를 각각 시작하면 서버측 Kafka 사용자 로그인 확인 구성이 완료됩니다(먼저 Kafka를 닫은 다음 Zookeeper를 닫습니다).
서비스 카프카 종료
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh -데몬 /asop/kafka/kafka_2.11-2.1.0/config/server.properties
서비스 카프카 시작
#/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -데몬 /asop/kafka/kafka_2.11-2.1.0/config/server.properties
사육사 서비스 종료-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh /asop/zk/zookeeper-3.4.13/conf/zoo.cfg를 중지합니다.
사육사-3.4.13 서비스 시작
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh /asop/zk/zookeeper-3.4.13/conf/zoo.cfg를 시작합니다.
8. 테마 생성 및 보기
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh --브로커 목록 10.176.31.137:9092 --주제 cmdb --생산자 속성 보안.프로토콜=SASL_PLAINTEXT --생산자 속성 sasl.메커니즘=PLAIN
메시지를 받다
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --부트스트랩 서버 10.176.31.137:9092 --주제 cmdb --시작부터 --소비자 속성 보안.프로토콜=SASL_PLAINTEXT --소비자 속성 sasl.메커니즘=PLAIN
2. zk 및 kafka에 대한 ssal 계정 비밀번호를 구성하십시오.
서버 {
org.apache.kafka.common.security.plain.PlainLoginModule이 필요합니다.
사용자 이름="관리자"
비밀번호="admin@12"
사용자_카프카="카프카@123";
};
Server.username、Server.password为 Zookeeper 内部通信的用户名和密码,因此保证每个 zk 节点该属性一致即可
Server.user_xxx 中 xxx 为自定义用户名,用于 zkClient 连接所使用的用户名和密码,即为 kafka 创建的用户名
1.2 /asop/zk/zookeeper-3.4.13/conf/zoo.cfg 파일 구성
authProvider.1=org.apache.zookeeper.server.auth.SASL인증 공급자
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zookeeper.sasl.client=참
Zookeeper.sasl.client는 클라이언트 인증을 활성화하기 위해 true로 설정됩니다. 그렇지 않으면 Zoo_jaas.conf에 구성된 사용자 이름은 jaas 파일 없이도 연결할 수 있지만 경고가 표시됩니다.
1.3 종속성 패키지 가져오기
사용된 권한 확인 클래스는 org.apache.kafka.common.security.plain.PlainLoginModule이므로 kafka 관련 jar 패키지가 필요하므로 다음과 같이 zk_sasl_lib 새 폴더를 생성합니다. kafka/lib 디렉터리에서 다음 jar 패키지를 복사합니다. 사육사 lib 및 새로 생성된 zk_sasl_lib 디렉토리:
카프카 클라이언트-2.4.1.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.28.jar
slf4j-log4j12-1.7.28.jar
스내피-자바-1.1.7.3.jar
mkdir /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/kafka-clients-2.1.0.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/lz4-java-1.5.0.jar /asop/zk/zookeeper-3.4.13/lib/
/asop/kafka/kafka_2.11-2.1.0/libs/slf4j-api-1.7.25.jar /asop/zk/zookeeper-3.4.13/lib/를 압축 해제합니다.
/asop/kafka/kafka_2.11-2.1.0/libs/slf4j-log4j12-1.7.25.jar /asop/zk/zookeeper-3.4.13/lib/를 압축 해제합니다.
cp /asop/kafka/kafka_2.11-2.1.0/libs/snappy-java-1.1.7.2.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/kafka-clients-2.1.0.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
/asop/kafka/kafka_2.11-2.1.0/libs/lz4-java-1.5.0.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib를 cp합니다.
/asop/kafka/kafka_2.11-2.1.0/libs/slf4j-api-1.7.25.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib의 cp
/asop/kafka/kafka_2.11-2.1.0/libs/slf4j-log4j12-1.7.25.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib를 압축 해제합니다.
cp /asop/kafka/kafka_2.11-2.1.0/libs/snappy-java-1.1.7.2.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
chmod 755 -R /asop/zk/zookeeper-3.4.13/zk_sasl_lib/
chmod 755 -R /asop/zk/zookeeper-3.4.13/zk_sasl_lib/
1.4 zkEnv.sh 파일 수정/asop/zk/zookeeper-3.4.13/bin/zkEnv.sh
수정 전: 없으면 직접 추가
SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS"를 내보냅니다.
수정 후:
/asop/zk/zookeeper-3.4.13/zk_sasl_lib/*.jar에 있는 jar 파일입니다.
하다
클래스 경로=" 항아리 : 항아리:예아르 자형:클래스 경로”
완료
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf "를 내보냅니다.
Zookeeper 서비스를 다시 시작하세요.
사육사 서비스 종료-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh /asop/zk/zookeeper-3.4.13/conf/zoo.cfg를 중지합니다.
사육사-3.4.13 서비스 시작
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh /asop/zk/zookeeper-3.4.13/conf/zoo.cfg를 시작합니다.
카프카서버 {
org.apache.kafka.common.security.plain.PlainLoginModule이 필요합니다.
사용자 이름="방문자"
비밀번호="qaz@123"
사용자_방문자="qaz@123";
};
고객{
org.apache.kafka.common.security.plain.PlainLoginModule이 필요합니다.
사용자 이름="카프카"
비밀번호="kafka@123";
};
KafkaServer.username、KafkaServer.password 为 broker 内部通信的用户名密码,同上
KafkaServer.user_xxx 여기서 xxx는 KafkaServer.username에 구성된 사용자 이름과 일치해야 하며 비밀번호도 일치해야 합니다.
KafkaServer.user_producer 및 KafkaServer.user_consumer는 소비자와 생산자가 서로 다른 계정을 사용하고 소비자 계정은 데이터를 소비할 수만 있고 생산자 계정은 데이터를 생산만 할 수 있도록 후속 ACL을 준비합니다.
Client.username과 Client.password는 Zookeeper에 등록된 계정 비밀번호를 입력하는데, 이는 Broker와 Zookeeper 간의 통신에 사용됩니다. (Zookeeper가 SASL로 구성되지 않은 경우 무시할 수 있습니다. Zookeeper.sasl.client가 false인 경우에는 무시할 수 있습니다. 무시해도 됩니다. 로그는 다음과 같습니다.
[2021-06-29 17:14:30,204] WARN SASL 구성에 실패했습니다: javax.security.auth.login.LoginException: 지정된 JAAS 구성 파일 '/Users/wjun/env/kafka/config/kafka_server_jaas.conf'에서 'Client'라는 JAAS 구성 섹션을 찾을 수 없습니다. Zookeeper 서버에서 허용하는 경우 SASL 인증 없이 Zookeeper 서버에 계속 연결합니다. (org.apache.zookeeper.ClientCnxn)
2.2 server.properties 파일 수정
브로커.아이디=0
리스너=SASL_PLAINTEXT://:9092
광고.리스너=SASL_PLAINTEXT://192.168.157.198:9092
숫자.네트워크.스레드=3
숫자.io.스레드=8
소켓.전송.버퍼.바이트=102400
소켓.수신.버퍼.바이트=102400
소켓.요청.최대.바이트=104857600
log.dirs=/asop/kafka/logs
숫자.파티션 = 1
데이터 디렉토리당 복구 스레드 수=1
오프셋.주제.복제.인자=1
트랜잭션.상태.로그.복제.인자=1
거래.상태.로그.최소.isr=1
로그.보존.시간=168
로그.세그먼트.바이트=1073741824
로그.보존.확인.간격.ms=300000
동물원 관리자.연결=127.0.0.1:2181
동물원 관리자.연결.시간 초과.ms=6000
그룹.초기.재균형.지연.ms=0
#사용된 인증 프로토콜
보안.inter.broker.protocol=SASL_PLAINTEXT
#SASL메커니즘
sasl.enabled.mechanisms=일반
sasl.mechanism.inter.broker.protocol=일반
#인증완료를 위한 클래스
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#ACL(액세스 제어 목록) 구성이 없으면 모든 작업이 허용됩니다.
허용.모두.만약.acl.발견.안됨=false
#최고관리자 설정을 활성화하고 방문자 사용자를 최고관리자로 설정해야 합니다.
super.users=사용자:방문자
localhost를 IP 주소로 변경해야 하는 경우
super.users는 후속 ACL 구성의 영향을 받지 않는 슈퍼 사용자를 구성합니다.
2.3 시작 스크립트 수정
kafka-server-start.sh 파일을 수정하여 kafka_server_jaas.conf 파일/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh에 로드합니다.
수정하기 전에:
if [ “x$KAFKA_HEAP_OPTS” = “x” ]; 그러면
KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"를 내보냅니다.
피
수정 후:
(이 줄을 첫 번째 줄에 먼저 추가하세요. 이미 있는 경우 추가할 필요가 없습니다.) 내보내기 KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11- 2.1.0/config/kafka_server_jaas.conf "
if [ “x$KAFKA_HEAP_OPTS” = “x” ]; 그러면
KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"를 내보냅니다.
피
사육사에 대한 ACL 규칙 설정
/asop/zk/zookeeper-3.4.13/bin/zkCli.sh # zk의 명령줄 모드로 들어가기
addauth 다이제스트 admin:admin@12 # 로그인 사용자 전환(최고 관리자는 zk 구성 파일/asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf에 있습니다)
setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa #(로그인 가능한 IP와 사용자 계정 비밀번호를 설정합니다. admin은 위의 zk 구성 파일에 정의된 관리자이고, Kafka 사용자는 is /asop /kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf 파일(클라이언트 아래)에 정의된 kafka 연결 zk 사용자입니다.
addauth 다이제스트 kafka:kafka@123 #kafka 사용자로 전환하고 acl을 다시 설정
setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa
참고: 화이트리스트 IP를 추가하려는 경우 또는 사용자가 원래대로 추가해야 합니다. 그렇지 않으면 덮어쓰게 됩니다.
setAcl / ip:127.0.0.1:cdrwa,인증:카프카:카프카@123:cdrwa,인증:관리자:관리자@12:cdrwa,ip:1.1.1.1
권한을 복원해야 합니다. acl을 설정하지 않고 실행하세요.
setAcl / world:anyone:cdrwa
카프카 서비스를 다시 시작하세요
서비스 카프카 종료
#/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh -데몬 /asop/kafka/kafka_2.11-2.1.0/config/server.properties
서비스 카프카 시작
#/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -데몬 /asop/kafka/kafka_2.11-2.1.0/config/server.properties
이제 Kafka와 Zookeeper의 SSL 인증 구성이 완료되었습니다. 더 많은 운영 및 유지 관리 기술을 원하시면 Lewei 커뮤니티에 관심을 가져주세요. 더 많은 운영 및 유지 관리 관련 질문은 메시지를 남겨주세요.