Partage de technologie

Nodejs Chapitre 80 (Kafka avancé)

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Insérer la description de l'image ici

KafkaLes connaissances préalables ont été abordées dans les chapitres précédents et ne seront pas répétées.

Opérations du cluster Kafka

1. Créez plusieurs services Kafka

Faire une copiekafkaRépertoire complet renommékafka2

RéviserFichier de configuration kafka2/config/server.properties ce fichier

broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
  • 1
  • 2
  • 3

démarrerGardien de zooet kafka et kafka2

.binwindowskafka-server-start.bat .configserver.properties
  • 1

2.Gestion des clients

Afficher les informations sur le cluster et les objets client

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

La valeur de retour peut être utilisée pour afficher des informations sur la connexion au cluster, telles que l'ID de port, etc.

{
  brokers: [
    { nodeId: 0, host: '26.26.26.1', port: 9092 },
    { nodeId: 1, host: '26.26.26.1', port: 9093 }
  ],
  controller: 0,
  clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Créer un sujetcreateTopicsva analysertrueSi le sujet a été créé avec succès oufalse Est-ce que ça existe déjà ?Si une erreur se produit, cette méthode lèvera une exception

Supprimer le sujetadmin.deleteTopics Transmettre les sujets supprimés

Afficher la liste des sujetslistTopics Répertorie les noms de tous les sujets existants et renvoie un tableau de chaînes. Si une erreur se produit, cette méthode lèvera une exception

//创建主题
await admin.createTopics({
    topics: [
        { topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
        { topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
    ],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
    console.log('topics', topics)
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3. Affaires

KafkaJS prend en charge les transactions Kafka, qui peuvent être utilisées pour effectuer des opérations présentant des caractéristiques transactionnelles. Les transactions Kafka sont utilisées pour garantir qu'un groupe de messages associés est soit全部成功提交要么全部回滚, maintenant ainsi la cohérence des données

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

//生产者
const producer = kafka.producer({
    transactionalId: '填写事务ID',
    maxInFlightRequests: 1, //最大同时发送请求数
    idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()

const transaction = await producer.transaction()
try {
    await transaction.send({
        topic: 'xiaoman',
        messages: [{ value: '100元' }],
    })
    await transaction.commit() // 事务提交
}
catch (e) {
    console.log(e)
    await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29