2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
KafkaLes connaissances préalables ont été abordées dans les chapitres précédents et ne seront pas répétées.
Faire une copiekafka
Répertoire complet renommékafka2
RéviserFichier de configuration kafka2/config/server.properties
ce fichier
broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
démarrerGardien de zooet kafka et kafka2
.binwindowskafka-server-start.bat .configserver.properties
Afficher les informations sur le cluster et les objets client
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
La valeur de retour peut être utilisée pour afficher des informations sur la connexion au cluster, telles que l'ID de port, etc.
{
brokers: [
{ nodeId: 0, host: '26.26.26.1', port: 9092 },
{ nodeId: 1, host: '26.26.26.1', port: 9093 }
],
controller: 0,
clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
Créer un sujetcreateTopics
va analysertrue
Si le sujet a été créé avec succès oufalse
Est-ce que ça existe déjà ?Si une erreur se produit, cette méthode lèvera une exception
Supprimer le sujetadmin.deleteTopics
Transmettre les sujets supprimés
Afficher la liste des sujetslistTopics
Répertorie les noms de tous les sujets existants et renvoie un tableau de chaînes. Si une erreur se produit, cette méthode lèvera une exception
//创建主题
await admin.createTopics({
topics: [
{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
{ topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
console.log('topics', topics)
})
KafkaJS prend en charge les transactions Kafka, qui peuvent être utilisées pour effectuer des opérations présentant des caractéristiques transactionnelles. Les transactions Kafka sont utilisées pour garantir qu'un groupe de messages associés est soit全部成功提交
,要么全部回滚
, maintenant ainsi la cohérence des données
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
//生产者
const producer = kafka.producer({
transactionalId: '填写事务ID',
maxInFlightRequests: 1, //最大同时发送请求数
idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()
const transaction = await producer.transaction()
try {
await transaction.send({
topic: 'xiaoman',
messages: [{ value: '100元' }],
})
await transaction.commit() // 事务提交
}
catch (e) {
console.log(e)
await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()