2024-07-12
The Kafka prerequisites were covered in the previous chapters and are not repeated here.
Make a copy of the kafka directory.
Rename the copied directory to kafka2.
Edit the configuration file kafka2/config/server.properties.
In this file, change the following settings:
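# Unique ID of this broker within the cluster
broker.id=1
# Use a different port from the first broker
port=9093
# Use a different listener address from the first broker
listeners=PLAINTEXT://:9093
# If both brokers run on the same machine, log.dirs must also point to a separate directory for this broker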
Start ZooKeeper, then start kafka and kafka2. Each broker is started from its own directory:
.\bin\windows\kafka-server-start.bat .\config\server.properties
View cluster information and client objects
import { Kafka } from 'kafkajs'
const kafka = new Kafka({
  clientId: 'my-app', // client identifier
  brokers: ['localhost:9092', 'localhost:9093'], // brokers of the Kafka cluster
})
const admin = kafka.admin() // create the admin client
await admin.connect() // connect to Kafka
const cluster = await admin.describeCluster() // fetch cluster information
The return value describes the connected cluster: the node ID, host, and port of each broker, the node ID of the controller, and the cluster ID.
{
brokers: [
{ nodeId: 0, host: '26.26.26.1', port: 9092 },
{ nodeId: 1, host: '26.26.26.1', port: 9093 }
],
controller: 0,
clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
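As a small illustration of reading that return value, the sketch below looks up the controller broker and formats every broker as host:port. The helper names findController and brokerAddresses are hypothetical and not part of KafkaJS; the sketch only assumes the object shape shown above.

```javascript
// Hypothetical helpers (not part of KafkaJS) that read the object
// returned by admin.describeCluster(), assuming the shape shown above.

// Return the broker whose nodeId matches cluster.controller, or undefined.
function findController(cluster) {
  return cluster.brokers.find((broker) => broker.nodeId === cluster.controller)
}

// Format every broker as a "host:port" string.
function brokerAddresses(cluster) {
  return cluster.brokers.map((broker) => `${broker.host}:${broker.port}`)
}

// Sample data copied from the output above.
const sampleCluster = {
  brokers: [
    { nodeId: 0, host: '26.26.26.1', port: 9092 },
    { nodeId: 1, host: '26.26.26.1', port: 9093 },
  ],
  controller: 0,
  clusterId: 'XHa77me4TZWO8cfWSTHoaQ',
}

console.log(findController(sampleCluster)) // the broker entry with nodeId 0
console.log(brokerAddresses(sampleCluster))
```

In a real script, the value returned by admin.describeCluster() would be passed in place of sampleCluster.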
Create topics: admin.createTopics
Resolves to true if the topic was created successfully, or false if it already exists.
If an error occurs, this method throws an exception.
Delete topics: admin.deleteTopics
Pass in the names of the topics to delete.
List topics: admin.listTopics
Returns an array of strings with the names of all existing topics. If an error occurs, this method throws an exception.
// create topics
await admin.createTopics({
  topics: [
    { topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
    { topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
  ],
})
// delete topics
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
// list topics
const topics = await admin.listTopics()
console.log('topics', topics)
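The three admin calls can be combined. The following is a minimal sketch, assuming a hypothetical helper named ensureTopics (not part of KafkaJS) that uses listTopics to find out which topics are missing and passes only those to createTopics. The admin parameter is assumed to be a connected KafkaJS admin client, or any object exposing the same two methods.

```javascript
// Hypothetical helper (not part of KafkaJS): create only the topics that
// do not exist yet and return the names of the topics it asked to create.
// `admin` is assumed to be a connected KafkaJS admin client, or any object
// with compatible listTopics() and createTopics() methods.
async function ensureTopics(admin, topicConfigs) {
  // listTopics resolves to an array of existing topic names
  const existing = new Set(await admin.listTopics())
  const missing = topicConfigs.filter((config) => !existing.has(config.topic))
  if (missing.length === 0) {
    return [] // nothing to create
  }
  // createTopics throws if an error occurs, so errors propagate to the caller
  await admin.createTopics({ topics: missing })
  return missing.map((config) => config.topic)
}
```

With the admin client from the earlier example, a call such as ensureTopics(admin, [{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 }]) would skip any topic that listTopics already reports. Another client could still create the topic between the two calls, so this is a convenience rather than a guarantee.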
KafkaJS supports Kafka transactions, which let a producer send a group of related messages atomically: either all of them are committed successfully or all of them are rolled back, which keeps the data consistent.
import { Kafka } from 'kafkajs'
const kafka = new Kafka({
  clientId: 'my-app', // client identifier
  brokers: ['localhost:9092', 'localhost:9093'], // brokers of the Kafka cluster
})
// producer
const producer = kafka.producer({
  transactionalId: 'your-transactional-id', // fill in your transaction ID
  maxInFlightRequests: 1, // maximum number of requests in flight at once
  idempotent: true, // enable idempotent delivery
})
// connect to the brokers
await producer.connect()
const transaction = await producer.transaction()
try {
  await transaction.send({
    topic: 'xiaoman',
    messages: [{ value: '100 yuan' }],
  })
  await transaction.commit() // commit the transaction
} catch (e) {
  console.log(e)
  await transaction.abort() // something failed: roll back
}
await producer.disconnect() // this script only creates a producer, so disconnect that
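The commit-or-abort pattern above can be wrapped so that callers cannot forget either branch. This is a minimal sketch under stated assumptions: runInTransaction is a hypothetical helper, not part of KafkaJS, and producer is assumed to be a connected KafkaJS producer created with a transactionalId (or any object whose transaction() resolves to something with send, commit, and abort).

```javascript
// Hypothetical helper (not part of KafkaJS): run `work` inside a transaction,
// committing if it succeeds and aborting if it (or the commit) throws.
// `producer` is assumed to be a connected KafkaJS producer that was created
// with a transactionalId.
async function runInTransaction(producer, work) {
  const transaction = await producer.transaction()
  try {
    const result = await work(transaction)
    await transaction.commit() // all messages become visible together
    return result
  } catch (error) {
    await transaction.abort() // roll back everything sent in this transaction
    throw error // let the caller decide how to handle the failure
  }
}
```

With the producer from the example above, the send could then be written as runInTransaction(producer, (tx) => tx.send({ topic: 'xiaoman', messages: [{ value: '100 yuan' }] })). Rethrowing after the abort is a design choice: it keeps the failure visible to the caller instead of only logging it.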