Обмен технологиями

Nodejs Глава 80 (Kafka Advanced)

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Вставьте сюда описание изображения

кафкаНеобходимые знания обсуждались в предыдущих главах и больше не будут повторяться.

Операции кластера Kafka

1. Создайте несколько сервисов Kafka.

Сделать копиюkafkaПолный каталог переименованkafka2

ИсправлятьКонфигурационный файл kafka2/config/server.properties этот файл

broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
  • 1
  • 2
  • 3

запускатьработник зоопаркаи Кафка и Кафка2

.binwindowskafka-server-start.bat .configserver.properties
  • 1

2. Управление клиентами

Просмотр информации о кластере и клиентских объектах

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

Возвращаемое значение можно использовать для просмотра информации о подключении к кластеру, например идентификатора порта и т. д.

{
  brokers: [
    { nodeId: 0, host: '26.26.26.1', port: 9092 },
    { nodeId: 1, host: '26.26.26.1', port: 9093 }
  ],
  controller: 0,
  clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Создать темуcreateTopicsразберуtrueБыла ли тема успешно создана илиfalse Оно уже существует?Если произойдет ошибка, этот метод выдаст исключение

Удалить темуadmin.deleteTopics Перенести удаленные темы

Посмотреть список темlistTopics Перечисляет названия всех существующих тем и возвращает массив строк. Если произойдет ошибка, этот метод выдаст исключение`

//创建主题
await admin.createTopics({
    topics: [
        { topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
        { topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
    ],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
    console.log('topics', topics)
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3. Дела

KafkaJS обеспечивает поддержку транзакций Kafka, которые можно использовать для выполнения операций с транзакционными характеристиками. Транзакции Kafka используются для обеспечения того, чтобы группа связанных сообщений была либо全部成功提交要么全部回滚, тем самым поддерживая согласованность данных

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

//生产者
const producer = kafka.producer({
    transactionalId: '填写事务ID',
    maxInFlightRequests: 1, //最大同时发送请求数
    idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()

const transaction = await producer.transaction()
try {
    await transaction.send({
        topic: 'xiaoman',
        messages: [{ value: '100元' }],
    })
    await transaction.commit() // 事务提交
}
catch (e) {
    console.log(e)
    await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29