Technologieaustausch

Nodejs Kapitel 80 (Kafka Advanced)

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Fügen Sie hier eine Bildbeschreibung ein

KafkaDie erforderlichen Kenntnisse wurden in den vorherigen Kapiteln besprochen und werden nicht noch einmal wiederholt.

Kafka-Cluster-Operationen

1. Erstellen Sie mehrere Kafka-Dienste

Eine Kopie machenkafkaKomplettes Verzeichnis umbenanntkafka2

ÜberarbeitenKonfigurationsdatei kafka2/config/server.properties Diese Datei

broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
  • 1
  • 2
  • 3

Start-upzooKeeperund Kafka und Kafka2

.binwindowskafka-server-start.bat .configserver.properties
  • 1

2.Kundenverwaltung

Clusterinformationen und Clientobjekte anzeigen

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

Der Rückgabewert kann verwendet werden, um Informationen über die Verbindung zum Cluster anzuzeigen, z. B. Port-ID usw.

{
  brokers: [
    { nodeId: 0, host: '26.26.26.1', port: 9092 },
    { nodeId: 1, host: '26.26.26.1', port: 9093 }
  ],
  controller: 0,
  clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Thema erstellencreateTopicswird analysierentrueOb das Thema erfolgreich erstellt wurde oderfalse Existiert es bereits?Wenn ein Fehler auftritt, löst diese Methode eine Ausnahme aus

Thema löschenadmin.deleteTopics Übergeben Sie gelöschte Themen

Themenliste anzeigenlistTopics Listet die Namen aller vorhandenen Themen auf und gibt ein Array von Zeichenfolgen zurück. Wenn ein Fehler auftritt, löst diese Methode eine Ausnahme aus

//创建主题
await admin.createTopics({
    topics: [
        { topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
        { topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
    ],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
    console.log('topics', topics)
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3. Angelegenheiten

KafkaJS bietet Unterstützung für Kafka-Transaktionen, mit denen Vorgänge mit Transaktionsmerkmalen ausgeführt werden können. Kafka-Transaktionen werden verwendet, um sicherzustellen, dass eine Gruppe zusammengehöriger Nachrichten vorhanden ist全部成功提交要么全部回滚, wodurch die Datenkonsistenz gewahrt bleibt

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

//生产者
const producer = kafka.producer({
    transactionalId: '填写事务ID',
    maxInFlightRequests: 1, //最大同时发送请求数
    idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()

const transaction = await producer.transaction()
try {
    await transaction.send({
        topic: 'xiaoman',
        messages: [{ value: '100元' }],
    })
    await transaction.commit() // 事务提交
}
catch (e) {
    console.log(e)
    await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29