2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
kafkaPraecognita cognitio in praecedentibus capitibus discussa est et iterum non repetitur.
Fac exemplumkafka
Complete Directory renamedkafka2
ReviseConfigurationis file kafka2/config/server.properties
haec file
broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
Start upzooKeeperet kafka et kafka2
.binwindowskafka-server-start.bat .configserver.properties
View botrum portassent notitia et client obiecti
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
Reditus valor adhiberi potest informationes de nexu ad botrum, ut portum ID, etc.
{
brokers: [
{ nodeId: 0, host: '26.26.26.1', port: 9092 },
{ nodeId: 1, host: '26.26.26.1', port: 9093 }
],
controller: 0,
clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
Create topiccreateTopics
et parsetrue
Utrum topic bene creatus est velfalse
Iamne est?Si error incidit, haec methodus exceptionem mittet
Delere topicadmin.deleteTopics
Transi in delevit thema
Visum index rerumlistTopics
Enumerat nomina omnium locorum exsistentium et chordarum ordinem reddit. Si error incidit, methodus hanc exceptionem` mittet
//创建主题
await admin.createTopics({
topics: [
{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
{ topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
console.log('topics', topics)
})
KafkaJS subsidium praebet transactionibus Kafka, quae operationes cum notis transactionalibus praestare possunt. Kafka transactions sunt ut coetus paginarum affinium vel est全部成功提交
,要么全部回滚
, Ita servans data constantia
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
//生产者
const producer = kafka.producer({
transactionalId: '填写事务ID',
maxInFlightRequests: 1, //最大同时发送请求数
idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()
const transaction = await producer.transaction()
try {
await transaction.send({
topic: 'xiaoman',
messages: [{ value: '100元' }],
})
await transaction.commit() // 事务提交
}
catch (e) {
console.log(e)
await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()