2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
KafkaDie erforderlichen Kenntnisse wurden in den vorherigen Kapiteln besprochen und werden nicht noch einmal wiederholt.
Eine Kopie machenkafka
Komplettes Verzeichnis umbenanntkafka2
ÜberarbeitenKonfigurationsdatei kafka2/config/server.properties
Diese Datei
broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
Start-upzooKeeperund Kafka und Kafka2
.binwindowskafka-server-start.bat .configserver.properties
Clusterinformationen und Clientobjekte anzeigen
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
Der Rückgabewert kann verwendet werden, um Informationen über die Verbindung zum Cluster anzuzeigen, z. B. Port-ID usw.
{
brokers: [
{ nodeId: 0, host: '26.26.26.1', port: 9092 },
{ nodeId: 1, host: '26.26.26.1', port: 9093 }
],
controller: 0,
clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
Thema erstellencreateTopics
wird analysierentrue
Ob das Thema erfolgreich erstellt wurde oderfalse
Existiert es bereits?Wenn ein Fehler auftritt, löst diese Methode eine Ausnahme aus
Thema löschenadmin.deleteTopics
Übergeben Sie gelöschte Themen
Themenliste anzeigenlistTopics
Listet die Namen aller vorhandenen Themen auf und gibt ein Array von Zeichenfolgen zurück. Wenn ein Fehler auftritt, löst diese Methode eine Ausnahme aus
//创建主题
await admin.createTopics({
topics: [
{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
{ topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
console.log('topics', topics)
})
KafkaJS bietet Unterstützung für Kafka-Transaktionen, mit denen Vorgänge mit Transaktionsmerkmalen ausgeführt werden können. Kafka-Transaktionen werden verwendet, um sicherzustellen, dass eine Gruppe zusammengehöriger Nachrichten vorhanden ist全部成功提交
,要么全部回滚
, wodurch die Datenkonsistenz gewahrt bleibt
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
//生产者
const producer = kafka.producer({
transactionalId: '填写事务ID',
maxInFlightRequests: 1, //最大同时发送请求数
idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()
const transaction = await producer.transaction()
try {
await transaction.send({
topic: 'xiaoman',
messages: [{ value: '100元' }],
})
await transaction.commit() // 事务提交
}
catch (e) {
console.log(e)
await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()