моя контактная информация
Почтамезофия@protonmail.com
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
кафкаНеобходимые знания обсуждались в предыдущих главах и больше не будут повторяться.
Сделать копиюkafka
Полный каталог переименованkafka2
ИсправлятьКонфигурационный файл kafka2/config/server.properties
этот файл
broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源
запускатьработник зоопаркаи Кафка и Кафка2
.binwindowskafka-server-start.bat .configserver.properties
Просмотр информации о кластере и клиентских объектах
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息
Возвращаемое значение можно использовать для просмотра информации о подключении к кластеру, например идентификатора порта и т. д.
{
brokers: [
{ nodeId: 0, host: '26.26.26.1', port: 9092 },
{ nodeId: 1, host: '26.26.26.1', port: 9093 }
],
controller: 0,
clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}
Создать темуcreateTopics
разберуtrue
Была ли тема успешно создана илиfalse
Оно уже существует?Если произойдет ошибка, этот метод выдаст исключение
Удалить темуadmin.deleteTopics
Перенести удаленные темы
Посмотреть список темlistTopics
Перечисляет названия всех существующих тем и возвращает массив строк. Если произойдет ошибка, этот метод выдаст исключение`
//创建主题
await admin.createTopics({
topics: [
{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
{ topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
console.log('topics', topics)
})
KafkaJS обеспечивает поддержку транзакций Kafka, которые можно использовать для выполнения операций с транзакционными характеристиками. Транзакции Kafka используются для обеспечения того, чтобы группа связанных сообщений была либо全部成功提交
,要么全部回滚
, тем самым поддерживая согласованность данных
import { Kafka, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app', //客户端标识
brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})
//生产者
const producer = kafka.producer({
transactionalId: '填写事务ID',
maxInFlightRequests: 1, //最大同时发送请求数
idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()
const transaction = await producer.transaction()
try {
await transaction.send({
topic: 'xiaoman',
messages: [{ value: '100元' }],
})
await transaction.commit() // 事务提交
}
catch (e) {
console.log(e)
await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()