技術共有

Nodejs 第 80 章 (Kafka 上級)

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

ここに画像の説明を挿入します

カフカ前提条件となる知識については前の章で説明したので、繰り返しません。

Kafka クラスターの操作

1. 複数の Kafka サービスを作成する

コピーを作成するkafka完全なディレクトリの名前が変更されましたkafka2

改訂設定ファイル kafka2/config/server.properties このファイル

broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
  • 1
  • 2
  • 3

起動する動物園の飼育係そしてカフカとカフカ2

.binwindowskafka-server-start.bat .configserver.properties
  • 1

2.顧客管理

クラスター情報とクライアント オブジェクトを表示する

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

戻り値を使用して、ポート ID などのクラスターへの接続に関する情報を表示できます。

{
  brokers: [
    { nodeId: 0, host: '26.26.26.1', port: 9092 },
    { nodeId: 1, host: '26.26.26.1', port: 9093 }
  ],
  controller: 0,
  clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

トピックの作成createTopics解析しますtrueトピックが正常に作成されたかどうかfalseすでに存在していますか?エラーが発生した場合、このメソッドは例外をスローします。

トピックの削除admin.deleteTopics 削除されたトピックを渡す

トピックのリストを見るlistTopics既存のすべてのトピックの名前をリストし、文字列の配列を返します。エラーが発生した場合、このメソッドは例外をスローします。

//创建主题
await admin.createTopics({
    topics: [
        { topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
        { topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
    ],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
    console.log('topics', topics)
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3.事務

KafkaJS は、トランザクション特性を持つ操作を実行するために使用できる Kafka トランザクションのサポートを提供します。 Kafka トランザクションは、関連するメッセージのグループが次のいずれかであることを確認するために使用されます。全部成功提交要么全部回滚、それによりデータの一貫性が維持されます

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

//生产者
const producer = kafka.producer({
    transactionalId: '填写事务ID',
    maxInFlightRequests: 1, //最大同时发送请求数
    idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()

const transaction = await producer.transaction()
try {
    await transaction.send({
        topic: 'xiaoman',
        messages: [{ value: '100元' }],
    })
    await transaction.commit() // 事务提交
}
catch (e) {
    console.log(e)
    await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29