기술나눔

Nodejs 80장(카프카 고급)

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

여기에 이미지 설명을 삽입하세요.

카프카전제조건 지식은 이전 장에서 논의되었으므로 다시 반복하지 않겠습니다.

Kafka 클러스터 작업

1. 여러 Kafka 서비스 만들기

사본 만들기kafka전체 디렉토리 이름이 변경됨kafka2

개정하다구성 파일 kafka2/config/server.properties 이 파일

broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
  • 1
  • 2
  • 3

시작하다동물원 관리자그리고 카프카와 카프카2

.binwindowskafka-server-start.bat .configserver.properties
  • 1

2.클라이언트 관리

클러스터 정보 및 클라이언트 개체 보기

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

반환된 값은 포트 ID 등 클러스터 연결에 대한 정보를 보는 데 사용될 수 있습니다.

{
  brokers: [
    { nodeId: 0, host: '26.26.26.1', port: 9092 },
    { nodeId: 1, host: '26.26.26.1', port: 9093 }
  ],
  controller: 0,
  clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

주제 만들기createTopics분석할 것이다true주제가 성공적으로 생성되었는지 여부false 이미 존재합니까?오류가 발생하면 이 메서드는 예외를 발생시킵니다.

주제 삭제admin.deleteTopics 삭제된 주제 전달

주제 목록 보기listTopics 모든 기존 주제의 이름을 나열하고 문자열 배열을 반환합니다. 오류가 발생하면 이 메서드는 예외를 발생시킵니다.

//创建主题
await admin.createTopics({
    topics: [
        { topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
        { topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
    ],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
    console.log('topics', topics)
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3. 업무

KafkaJS는 트랜잭션 특성이 있는 작업을 수행하는 데 사용할 수 있는 Kafka 트랜잭션에 대한 지원을 제공합니다. Kafka 트랜잭션은 관련 메시지 그룹이 다음 중 하나인지 확인하는 데 사용됩니다.全部成功提交要么全部回滚, 이를 통해 데이터 일관성을 유지합니다.

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

//生产者
const producer = kafka.producer({
    transactionalId: '填写事务ID',
    maxInFlightRequests: 1, //最大同时发送请求数
    idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()

const transaction = await producer.transaction()
try {
    await transaction.send({
        topic: 'xiaoman',
        messages: [{ value: '100元' }],
    })
    await transaction.commit() // 事务提交
}
catch (e) {
    console.log(e)
    await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29