Technology Sharing

Nodejs Chapter 80 (Kafka Advanced)

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

insert image description here

kafkaThe prerequisite knowledge has been discussed in the previous chapters and will not be repeated here.

Kafka Cluster Operations

1. Create multiple kafka services

Make a copykafkaThe entire directory was renamedkafka2

ReviseConfiguration Files kafka2/config/server.properties This file

broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
  • 1
  • 2
  • 3

start upzooKeeperand kafka and kafka2

.binwindowskafka-server-start.bat .configserver.properties
  • 1

2. Client management

View cluster information and client objects

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

The return value can be used to view the information of the connected cluster, such as port ID, etc.

{
  brokers: [
    { nodeId: 0, host: '26.26.26.1', port: 9092 },
    { nodeId: 1, host: '26.26.26.1', port: 9093 }
  ],
  controller: 0,
  clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Create a TopiccreateTopicsWill parsetrueWhether the topic was successfully created orfalseIf an error occurs, this method will throw an exception

Delete a topicadmin.deleteTopics Pass in the topic to be deleted

View the list of topicslistTopicsLists the names of all existing topics and returns an array of strings. If an error occurs, this method will throw an exception.

//创建主题
await admin.createTopics({
    topics: [
        { topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
        { topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
    ],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
    console.log('topics', topics)
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3. Affairs

KafkaJS provides support for Kafka transactions, which can be used to perform operations with transactional characteristics. Kafka transactions are used to ensure that a group of related messages are either全部成功提交要么全部回滚, thereby maintaining data consistency

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

//生产者
const producer = kafka.producer({
    transactionalId: '填写事务ID',
    maxInFlightRequests: 1, //最大同时发送请求数
    idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()

const transaction = await producer.transaction()
try {
    await transaction.send({
        topic: 'xiaoman',
        messages: [{ value: '100元' }],
    })
    await transaction.commit() // 事务提交
}
catch (e) {
    console.log(e)
    await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29