le mie informazioni di contatto
Posta[email protected]
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
kafkaLe conoscenze prerequisite sono state discusse nei capitoli precedenti e non verranno ripetute nuovamente.
Fare una copiakafka
Directory completa rinominatakafka2
RivedereFile di configurazione kafka2/config/server.properties
questa vita
broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
avviareCustode dello zooe kafka e kafka2
.binwindowskafka-server-start.bat .configserver.properties
Visualizza le informazioni sul cluster e gli oggetti client
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
Il valore restituito può essere utilizzato per visualizzare informazioni sulla connessione al cluster, come ID porta, ecc.
{
brokers: [
{ nodeId: 0, host: '26.26.26.1', port: 9092 },
{ nodeId: 1, host: '26.26.26.1', port: 9093 }
],
controller: 0,
clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
Crea argomentocreateTopics
analizzeràtrue
Se l'argomento è stato creato correttamente ofalse
Esiste già?Se si verifica un errore, questo metodo genererà un'eccezione
Elimina argomentoadmin.deleteTopics
Passa gli argomenti eliminati
Visualizza l'elenco degli argomentilistTopics
Elenca i nomi di tutti gli argomenti esistenti e restituisce una matrice di stringhe. Se si verifica un errore, questo metodo genererà un'eccezione`
//创建主题
await admin.createTopics({
topics: [
{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
{ topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
console.log('topics', topics)
})
KafkaJS fornisce supporto per le transazioni Kafka, che possono essere utilizzate per eseguire operazioni con caratteristiche transazionali. Le transazioni Kafka vengono utilizzate per garantire che un gruppo di messaggi correlati lo sia全部成功提交
,要么全部回滚
, mantenendo così la coerenza dei dati
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
//生产者
const producer = kafka.producer({
transactionalId: '填写事务ID',
maxInFlightRequests: 1, //最大同时发送请求数
idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()
const transaction = await producer.transaction()
try {
await transaction.send({
topic: 'xiaoman',
messages: [{ value: '100元' }],
})
await transaction.commit() // 事务提交
}
catch (e) {
console.log(e)
await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()