Teknologian jakaminen

Nodejs Chapter 80 (Kafka Advanced)

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Lisää kuvan kuvaus tähän

kafkaEdellyttäviä tietoja on käsitelty edellisissä luvuissa, eikä niitä toisteta uudelleen.

Kafka-klusterin toiminta

1. Luo useita kafka-palveluita

Tee kopiokafkaTäydellinen hakemisto nimetty uudelleenkafka2

TarkistaaAsetustiedosto kafka2/config/server.properties Tämä tiedosto

broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
  • 1
  • 2
  • 3

aloittaazooKeeperja kafka ja kafka2

.binwindowskafka-server-start.bat .configserver.properties
  • 1

2. Asiakashallinta

Tarkastele klusterin tietoja ja asiakasobjekteja

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

Palautusarvoa voidaan käyttää katsomaan klusterin yhteyden tietoja, kuten porttitunnusta jne.

{
  brokers: [
    { nodeId: 0, host: '26.26.26.1', port: 9092 },
    { nodeId: 1, host: '26.26.26.1', port: 9093 }
  ],
  controller: 0,
  clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Luo aihecreateTopicsjäsentäätrueOnko aiheen luominen onnistunut vaifalse Onko se jo olemassa?Jos tapahtuu virhe, tämä menetelmä tekee poikkeuksen

Poista aiheadmin.deleteTopics Välitä poistetut aiheet

Näytä luettelo aiheistalistTopics Luetteloi kaikkien olemassa olevien aiheiden nimet ja palauttaa joukon merkkijonoja. Jos tapahtuu virhe, tämä menetelmä tekee poikkeuksen`

//创建主题
await admin.createTopics({
    topics: [
        { topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
        { topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
    ],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
    console.log('topics', topics)
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3. Asiat

KafkaJS tukee Kafka-tapahtumia, joita voidaan käyttää tapahtumien ominaisuuksien suorittamiseen. Kafka-tapahtumia käytetään varmistamaan, että joukko toisiinsa liittyviä viestejä on joko全部成功提交要么全部回滚, mikä säilyttää tietojen johdonmukaisuuden

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

//生产者
const producer = kafka.producer({
    transactionalId: '填写事务ID',
    maxInFlightRequests: 1, //最大同时发送请求数
    idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()

const transaction = await producer.transaction()
try {
    await transaction.send({
        topic: 'xiaoman',
        messages: [{ value: '100元' }],
    })
    await transaction.commit() // 事务提交
}
catch (e) {
    console.log(e)
    await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29