Compartilhamento de tecnologia

Nodejs Capítulo 80 (Kafka Avançado)

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Insira a descrição da imagem aqui

kafkaO conhecimento pré-requisito foi discutido em capítulos anteriores e não será repetido novamente.

Operações de cluster Kafka

1. Crie vários serviços kafka

Faça uma cópiakafkaDiretório completo renomeadokafka2

ReverArquivo de configuração kafka2/config/server.properties este ficheiro

broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
  • 1
  • 2
  • 3

comecefuncionário do zoológicoe kafka e kafka2

.binwindowskafka-server-start.bat .configserver.properties
  • 1

2.Gestão de clientes

Visualize informações do cluster e objetos do cliente

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

O valor de retorno pode ser usado para visualizar informações sobre a conexão com o cluster, como ID da porta, etc.

{
  brokers: [
    { nodeId: 0, host: '26.26.26.1', port: 9092 },
    { nodeId: 1, host: '26.26.26.1', port: 9093 }
  ],
  controller: 0,
  clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Criar tópicocreateTopicsirá analisartrueSe o tópico foi criado com sucesso oufalse Já existe?Se ocorrer um erro, este método lançará uma exceção

Excluir tópicoadmin.deleteTopics Passar tópicos excluídos

Ver lista de tópicoslistTopics Lista os nomes de todos os tópicos existentes e retorna uma matriz de strings. Se ocorrer um erro, este método lançará uma exceção`

//创建主题
await admin.createTopics({
    topics: [
        { topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
        { topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
    ],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
    console.log('topics', topics)
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3. Assuntos

KafkaJS fornece suporte para transações Kafka, que podem ser utilizadas para realizar operações com características transacionais. As transações Kafka são usadas para garantir que um grupo de mensagens relacionadas seja全部成功提交要么全部回滚, mantendo assim a consistência dos dados

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

//生产者
const producer = kafka.producer({
    transactionalId: '填写事务ID',
    maxInFlightRequests: 1, //最大同时发送请求数
    idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()

const transaction = await producer.transaction()
try {
    await transaction.send({
        topic: 'xiaoman',
        messages: [{ value: '100元' }],
    })
    await transaction.commit() // 事务提交
}
catch (e) {
    console.log(e)
    await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29