Κοινή χρήση τεχνολογίας

Nodejs Κεφάλαιο 80 (Κάφκα για προχωρημένους)

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Εισαγάγετε την περιγραφή της εικόνας εδώ

κάφκαΟι προαπαιτούμενες γνώσεις έχουν συζητηθεί σε προηγούμενα κεφάλαια και δεν θα επαναληφθούν ξανά.

Λειτουργίες συμπλέγματος Κάφκα

1. Δημιουργήστε πολλαπλές υπηρεσίες kafka

Κάνε ένα αντίγραφοkafkaΟ πλήρης κατάλογος μετονομάστηκεkafka2

ΑναθεωρώΑρχείο διαμόρφωσης kafka2/config/server.properties αυτό το αρχείο

broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
  • 1
  • 2
  • 3

ξεκίναΖωοφύλακαςκαι κάφκα και κάφκα2

.binwindowskafka-server-start.bat .configserver.properties
  • 1

2.Διαχείριση πελατών

Προβολή πληροφοριών συμπλέγματος και αντικειμένων πελάτη

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

Η τιμή επιστροφής μπορεί να χρησιμοποιηθεί για την προβολή πληροφοριών σχετικά με τη σύνδεση στο σύμπλεγμα, όπως το αναγνωριστικό θύρας κ.λπ.

{
  brokers: [
    { nodeId: 0, host: '26.26.26.1', port: 9092 },
    { nodeId: 1, host: '26.26.26.1', port: 9093 }
  ],
  controller: 0,
  clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Δημιουργία θέματοςcreateTopicsθα αναλύσειtrueΕάν το θέμα δημιουργήθηκε με επιτυχία ήfalse Υπάρχει ήδη;Εάν παρουσιαστεί σφάλμα, αυτή η μέθοδος θα δημιουργήσει μια εξαίρεση

Διαγραφή θέματοςadmin.deleteTopics Περάστε σε διαγραμμένα θέματα

Προβολή λίστας θεμάτωνlistTopics Παραθέτει τα ονόματα όλων των υπαρχόντων θεμάτων και επιστρέφει μια σειρά από συμβολοσειρές. Εάν παρουσιαστεί σφάλμα, αυτή η μέθοδος θα δημιουργήσει μια εξαίρεση».

//创建主题
await admin.createTopics({
    topics: [
        { topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
        { topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
    ],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
    console.log('topics', topics)
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3. Υποθέσεις

Το KafkaJS παρέχει υποστήριξη για συναλλαγές Kafka, οι οποίες μπορούν να χρησιμοποιηθούν για την εκτέλεση λειτουργιών με συναλλακτικά χαρακτηριστικά. Οι συναλλαγές Kafka χρησιμοποιούνται για να διασφαλιστεί ότι μια ομάδα σχετικών μηνυμάτων είναι είτε全部成功提交要么全部回滚, διατηρώντας έτσι τη συνοχή των δεδομένων

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

//生产者
const producer = kafka.producer({
    transactionalId: '填写事务ID',
    maxInFlightRequests: 1, //最大同时发送请求数
    idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()

const transaction = await producer.transaction()
try {
    await transaction.send({
        topic: 'xiaoman',
        messages: [{ value: '100元' }],
    })
    await transaction.commit() // 事务提交
}
catch (e) {
    console.log(e)
    await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29