내 연락처 정보
우편메소피아@프로톤메일.com
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
카프카전제조건 지식은 이전 장에서 논의되었으므로 다시 반복하지 않겠습니다.
사본 만들기kafka
전체 디렉토리 이름이 변경됨kafka2
개정하다구성 파일 kafka2/config/server.properties
이 파일
broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
시작하다동물원 관리자그리고 카프카와 카프카2
.binwindowskafka-server-start.bat .configserver.properties
클러스터 정보 및 클라이언트 개체 보기
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
반환된 값은 포트 ID 등 클러스터 연결에 대한 정보를 보는 데 사용될 수 있습니다.
{
brokers: [
{ nodeId: 0, host: '26.26.26.1', port: 9092 },
{ nodeId: 1, host: '26.26.26.1', port: 9093 }
],
controller: 0,
clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
주제 만들기createTopics
분석할 것이다true
주제가 성공적으로 생성되었는지 여부false
이미 존재합니까?오류가 발생하면 이 메서드는 예외를 발생시킵니다.
주제 삭제admin.deleteTopics
삭제된 주제 전달
주제 목록 보기listTopics
모든 기존 주제의 이름을 나열하고 문자열 배열을 반환합니다. 오류가 발생하면 이 메서드는 예외를 발생시킵니다.
//创建主题
await admin.createTopics({
topics: [
{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
{ topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
console.log('topics', topics)
})
KafkaJS는 트랜잭션 특성이 있는 작업을 수행하는 데 사용할 수 있는 Kafka 트랜잭션에 대한 지원을 제공합니다. Kafka 트랜잭션은 관련 메시지 그룹이 다음 중 하나인지 확인하는 데 사용됩니다.全部成功提交
,要么全部回滚
, 이를 통해 데이터 일관성을 유지합니다.
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
//生产者
const producer = kafka.producer({
transactionalId: '填写事务ID',
maxInFlightRequests: 1, //最大同时发送请求数
idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()
const transaction = await producer.transaction()
try {
await transaction.send({
topic: 'xiaoman',
messages: [{ value: '100元' }],
})
await transaction.commit() // 事务提交
}
catch (e) {
console.log(e)
await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()