私の連絡先情報
郵便メール:
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
カフカ前提条件となる知識については前の章で説明したので、繰り返しません。
コピーを作成するkafka
完全なディレクトリの名前が変更されましたkafka2
改訂設定ファイル kafka2/config/server.properties
このファイル
broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
起動する動物園の飼育係そしてカフカとカフカ2
.binwindowskafka-server-start.bat .configserver.properties
クラスター情報とクライアント オブジェクトを表示する
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
戻り値を使用して、ポート ID などのクラスターへの接続に関する情報を表示できます。
{
brokers: [
{ nodeId: 0, host: '26.26.26.1', port: 9092 },
{ nodeId: 1, host: '26.26.26.1', port: 9093 }
],
controller: 0,
clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
トピックの作成createTopics
解析しますtrue
トピックが正常に作成されたかどうかfalse
すでに存在していますか?エラーが発生した場合、このメソッドは例外をスローします。
トピックの削除admin.deleteTopics
削除されたトピックを渡す
トピックのリストを見るlistTopics
既存のすべてのトピックの名前をリストし、文字列の配列を返します。エラーが発生した場合、このメソッドは例外をスローします。
//创建主题
await admin.createTopics({
topics: [
{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
{ topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
console.log('topics', topics)
})
KafkaJS は、トランザクション特性を持つ操作を実行するために使用できる Kafka トランザクションのサポートを提供します。 Kafka トランザクションは、関連するメッセージのグループが次のいずれかであることを確認するために使用されます。全部成功提交
,要么全部回滚
、それによりデータの一貫性が維持されます
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
//生产者
const producer = kafka.producer({
transactionalId: '填写事务ID',
maxInFlightRequests: 1, //最大同时发送请求数
idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()
const transaction = await producer.transaction()
try {
await transaction.send({
topic: 'xiaoman',
messages: [{ value: '100元' }],
})
await transaction.commit() // 事务提交
}
catch (e) {
console.log(e)
await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()