2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
kafkaEdellyttäviä tietoja on käsitelty edellisissä luvuissa, eikä niitä toisteta uudelleen.
Tee kopiokafka
Täydellinen hakemisto nimetty uudelleenkafka2
TarkistaaAsetustiedosto kafka2/config/server.properties
Tämä tiedosto
broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
aloittaazooKeeperja kafka ja kafka2
.binwindowskafka-server-start.bat .configserver.properties
Tarkastele klusterin tietoja ja asiakasobjekteja
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
Palautusarvoa voidaan käyttää katsomaan klusterin yhteyden tietoja, kuten porttitunnusta jne.
{
brokers: [
{ nodeId: 0, host: '26.26.26.1', port: 9092 },
{ nodeId: 1, host: '26.26.26.1', port: 9093 }
],
controller: 0,
clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
Luo aihecreateTopics
jäsentäätrue
Onko aiheen luominen onnistunut vaifalse
Onko se jo olemassa?Jos tapahtuu virhe, tämä menetelmä tekee poikkeuksen
Poista aiheadmin.deleteTopics
Välitä poistetut aiheet
Näytä luettelo aiheistalistTopics
Luetteloi kaikkien olemassa olevien aiheiden nimet ja palauttaa joukon merkkijonoja. Jos tapahtuu virhe, tämä menetelmä tekee poikkeuksen`
//创建主题
await admin.createTopics({
topics: [
{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
{ topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
console.log('topics', topics)
})
KafkaJS tukee Kafka-tapahtumia, joita voidaan käyttää tapahtumien ominaisuuksien suorittamiseen. Kafka-tapahtumia käytetään varmistamaan, että joukko toisiinsa liittyviä viestejä on joko全部成功提交
,要么全部回滚
, mikä säilyttää tietojen johdonmukaisuuden
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
//生产者
const producer = kafka.producer({
transactionalId: '填写事务ID',
maxInFlightRequests: 1, //最大同时发送请求数
idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()
const transaction = await producer.transaction()
try {
await transaction.send({
topic: 'xiaoman',
messages: [{ value: '100元' }],
})
await transaction.commit() // 事务提交
}
catch (e) {
console.log(e)
await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()