minhas informações de contato
Correspondência[email protected]
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
kafkaO conhecimento pré-requisito foi discutido em capítulos anteriores e não será repetido novamente.
Faça uma cópiakafka
Diretório completo renomeadokafka2
ReverArquivo de configuração kafka2/config/server.properties
este ficheiro
broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
comecefuncionário do zoológicoe kafka e kafka2
.binwindowskafka-server-start.bat .configserver.properties
Visualize informações do cluster e objetos do cliente
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
O valor de retorno pode ser usado para visualizar informações sobre a conexão com o cluster, como ID da porta, etc.
{
brokers: [
{ nodeId: 0, host: '26.26.26.1', port: 9092 },
{ nodeId: 1, host: '26.26.26.1', port: 9093 }
],
controller: 0,
clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
Criar tópicocreateTopics
irá analisartrue
Se o tópico foi criado com sucesso oufalse
Já existe?Se ocorrer um erro, este método lançará uma exceção
Excluir tópicoadmin.deleteTopics
Passar tópicos excluídos
Ver lista de tópicoslistTopics
Lista os nomes de todos os tópicos existentes e retorna uma matriz de strings. Se ocorrer um erro, este método lançará uma exceção`
//创建主题
await admin.createTopics({
topics: [
{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
{ topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
console.log('topics', topics)
})
KafkaJS fornece suporte para transações Kafka, que podem ser utilizadas para realizar operações com características transacionais. As transações Kafka são usadas para garantir que um grupo de mensagens relacionadas seja全部成功提交
,要么全部回滚
, mantendo assim a consistência dos dados
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
//生产者
const producer = kafka.producer({
transactionalId: '填写事务ID',
maxInFlightRequests: 1, //最大同时发送请求数
idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()
const transaction = await producer.transaction()
try {
await transaction.send({
topic: 'xiaoman',
messages: [{ value: '100元' }],
})
await transaction.commit() // 事务提交
}
catch (e) {
console.log(e)
await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()