informasi kontak saya
Surat[email protected]
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
KafkaPengetahuan prasyarat telah dibahas pada bab-bab sebelumnya dan tidak akan terulang lagi.
Buat salinannyakafka
Direktori lengkap diganti namanyakafka2
MerevisiBerkas konfigurasi kafka2/config/server.properties
file ini
broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
rintisanpenjaga kebun binatangdan kafka dan kafka2
.binwindowskafka-server-start.bat .configserver.properties
Lihat informasi cluster dan objek klien
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
Nilai yang dikembalikan dapat digunakan untuk melihat informasi tentang koneksi ke cluster, seperti ID port, dll.
{
brokers: [
{ nodeId: 0, host: '26.26.26.1', port: 9092 },
{ nodeId: 1, host: '26.26.26.1', port: 9093 }
],
controller: 0,
clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
Buat topikcreateTopics
akan menguraikantrue
Apakah topik berhasil dibuat ataufalse
Apakah sudah ada?Jika terjadi kesalahan, metode ini akan memunculkan pengecualian
Hapus topikadmin.deleteTopics
Meneruskan topik yang dihapus
Lihat daftar topiklistTopics
Mencantumkan nama semua topik yang ada dan mengembalikan serangkaian string. Jika terjadi kesalahan, metode ini akan memunculkan pengecualian`
//创建主题
await admin.createTopics({
topics: [
{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
{ topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
console.log('topics', topics)
})
KafkaJS menyediakan dukungan untuk transaksi Kafka, yang dapat digunakan untuk melakukan operasi dengan karakteristik transaksional. Transaksi Kafka digunakan untuk memastikan bahwa sekelompok pesan terkait adalah salah satunya全部成功提交
,要么全部回滚
, sehingga menjaga konsistensi data
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
//生产者
const producer = kafka.producer({
transactionalId: '填写事务ID',
maxInFlightRequests: 1, //最大同时发送请求数
idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()
const transaction = await producer.transaction()
try {
await transaction.send({
topic: 'xiaoman',
messages: [{ value: '100元' }],
})
await transaction.commit() // 事务提交
}
catch (e) {
console.log(e)
await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()