Berbagi teknologi

Nodejs Bab 80 (Kafka Tingkat Lanjut)

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Masukkan deskripsi gambar di sini

KafkaPengetahuan prasyarat telah dibahas pada bab-bab sebelumnya dan tidak akan terulang lagi.

Operasi klaster Kafka

1. Buat beberapa layanan kafka

Buat salinannyakafkaDirektori lengkap diganti namanyakafka2

MerevisiBerkas konfigurasi kafka2/config/server.properties file ini

broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
  • 1
  • 2
  • 3

rintisanpenjaga kebun binatangdan kafka dan kafka2

.binwindowskafka-server-start.bat .configserver.properties
  • 1

2.Manajemen klien

Lihat informasi cluster dan objek klien

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

Nilai yang dikembalikan dapat digunakan untuk melihat informasi tentang koneksi ke cluster, seperti ID port, dll.

{
  brokers: [
    { nodeId: 0, host: '26.26.26.1', port: 9092 },
    { nodeId: 1, host: '26.26.26.1', port: 9093 }
  ],
  controller: 0,
  clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Buat topikcreateTopicsakan menguraikantrueApakah topik berhasil dibuat ataufalse Apakah sudah ada?Jika terjadi kesalahan, metode ini akan memunculkan pengecualian

Hapus topikadmin.deleteTopics Meneruskan topik yang dihapus

Lihat daftar topiklistTopics Mencantumkan nama semua topik yang ada dan mengembalikan serangkaian string. Jika terjadi kesalahan, metode ini akan memunculkan pengecualian`

//创建主题
await admin.createTopics({
    topics: [
        { topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
        { topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
    ],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
    console.log('topics', topics)
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3. Urusan

KafkaJS menyediakan dukungan untuk transaksi Kafka, yang dapat digunakan untuk melakukan operasi dengan karakteristik transaksional. Transaksi Kafka digunakan untuk memastikan bahwa sekelompok pesan terkait adalah salah satunya全部成功提交要么全部回滚, sehingga menjaga konsistensi data

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

//生产者
const producer = kafka.producer({
    transactionalId: '填写事务ID',
    maxInFlightRequests: 1, //最大同时发送请求数
    idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()

const transaction = await producer.transaction()
try {
    await transaction.send({
        topic: 'xiaoman',
        messages: [{ value: '100元' }],
    })
    await transaction.commit() // 事务提交
}
catch (e) {
    console.log(e)
    await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29