Mi informacion de contacto
Correo[email protected]
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
KafkaLos conocimientos previos se han analizado en capítulos anteriores y no se repetirán nuevamente.
Hacer una copiakafka
Directorio completo renombradokafka2
RevisarArchivo de configuración kafka2/config/server.properties
Este archivo
broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
puesta en marchaGuardián del zoológicoy kafka y kafka2
.binwindowskafka-server-start.bat .configserver.properties
Ver información del clúster y objetos del cliente.
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
El valor de retorno se puede utilizar para ver información sobre la conexión al clúster, como el ID del puerto, etc.
{
brokers: [
{ nodeId: 0, host: '26.26.26.1', port: 9092 },
{ nodeId: 1, host: '26.26.26.1', port: 9093 }
],
controller: 0,
clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
Crear temacreateTopics
analizarátrue
Si el tema se creó correctamente ofalse
¿Ya existe?Si ocurre un error, este método generará una excepción.
Eliminar temaadmin.deleteTopics
Pasar temas eliminados
Ver lista de temaslistTopics
Enumera los nombres de todos los temas existentes y devuelve una serie de cadenas. Si ocurre un error, este método generará una excepción.
//创建主题
await admin.createTopics({
topics: [
{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
{ topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
console.log('topics', topics)
})
KafkaJS brinda soporte para transacciones Kafka, que se pueden utilizar para realizar operaciones con características transaccionales. Las transacciones Kafka se utilizan para garantizar que un grupo de mensajes relacionados sea全部成功提交
,要么全部回滚
, manteniendo así la coherencia de los datos
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
//生产者
const producer = kafka.producer({
transactionalId: '填写事务ID',
maxInFlightRequests: 1, //最大同时发送请求数
idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()
const transaction = await producer.transaction()
try {
await transaction.send({
topic: 'xiaoman',
messages: [{ value: '100元' }],
})
await transaction.commit() // 事务提交
}
catch (e) {
console.log(e)
await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()