τα στοιχεία επικοινωνίας μου
Ταχυδρομείο[email protected]
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
κάφκαΟι προαπαιτούμενες γνώσεις έχουν συζητηθεί σε προηγούμενα κεφάλαια και δεν θα επαναληφθούν ξανά.
Κάνε ένα αντίγραφοkafka
Ο πλήρης κατάλογος μετονομάστηκεkafka2
ΑναθεωρώΑρχείο διαμόρφωσης kafka2/config/server.properties
αυτό το αρχείο
broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
ξεκίναΖωοφύλακαςκαι κάφκα και κάφκα2
.binwindowskafka-server-start.bat .configserver.properties
Προβολή πληροφοριών συμπλέγματος και αντικειμένων πελάτη
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
Η τιμή επιστροφής μπορεί να χρησιμοποιηθεί για την προβολή πληροφοριών σχετικά με τη σύνδεση στο σύμπλεγμα, όπως το αναγνωριστικό θύρας κ.λπ.
{
brokers: [
{ nodeId: 0, host: '26.26.26.1', port: 9092 },
{ nodeId: 1, host: '26.26.26.1', port: 9093 }
],
controller: 0,
clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
Δημιουργία θέματοςcreateTopics
θα αναλύσειtrue
Εάν το θέμα δημιουργήθηκε με επιτυχία ήfalse
Υπάρχει ήδη;Εάν παρουσιαστεί σφάλμα, αυτή η μέθοδος θα δημιουργήσει μια εξαίρεση
Διαγραφή θέματοςadmin.deleteTopics
Περάστε σε διαγραμμένα θέματα
Προβολή λίστας θεμάτωνlistTopics
Παραθέτει τα ονόματα όλων των υπαρχόντων θεμάτων και επιστρέφει μια σειρά από συμβολοσειρές. Εάν παρουσιαστεί σφάλμα, αυτή η μέθοδος θα δημιουργήσει μια εξαίρεση».
//创建主题
await admin.createTopics({
topics: [
{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
{ topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
console.log('topics', topics)
})
Το KafkaJS παρέχει υποστήριξη για συναλλαγές Kafka, οι οποίες μπορούν να χρησιμοποιηθούν για την εκτέλεση λειτουργιών με συναλλακτικά χαρακτηριστικά. Οι συναλλαγές Kafka χρησιμοποιούνται για να διασφαλιστεί ότι μια ομάδα σχετικών μηνυμάτων είναι είτε全部成功提交
,要么全部回滚
, διατηρώντας έτσι τη συνοχή των δεδομένων
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
//生产者
const producer = kafka.producer({
transactionalId: '填写事务ID',
maxInFlightRequests: 1, //最大同时发送请求数
idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()
const transaction = await producer.transaction()
try {
await transaction.send({
topic: 'xiaoman',
messages: [{ value: '100元' }],
})
await transaction.commit() // 事务提交
}
catch (e) {
console.log(e)
await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()