моя контактная информация
Почтамезофия@protonmail.com
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
RabbitMQ — это широко используемый брокер сообщений с открытым исходным кодом, который поддерживает несколько протоколов обмена сообщениями и может использоваться для надежного обмена сообщениями в распределенных системах. В дополнение к базовым функциям организации очередей сообщений RabbitMQ также предоставляет расширенные функции, которые расширяют его возможности с точки зрения высокой доступности, масштабируемости и гибкости. Вот некоторые из основных расширенных функций:
Зеркальные очереди:
RabbitMQ предоставляет функцию зеркальной очереди для достижения высокой доступности очереди путем репликации состояния и сообщений очереди на несколько узлов. В случае сбоя главного узла возможно плавное переключение на узел реплики в зеркальной очереди.
Кластерный режим:
RabbitMQ может работать в режиме кластера, распределяя очереди и обменники на нескольких узлах для повышения доступности и масштабируемости системы. Узлы в кластере могут взаимодействовать друг с другом и обмениваться метаданными сообщений и очередей.
Подтверждения сообщений:
Потребители могут подтвердить обработанные сообщения, чтобы гарантировать, что они не потеряются. Если сообщение не подтверждено, RabbitMQ помещает его обратно в очередь для обработки другими потребителями.
Транзакции:
RabbitMQ поддерживает режим транзакций AMQP, который позволяет производителям публиковать сообщения и подтверждать сообщения внутри транзакции, чтобы гарантировать атомарность и согласованность сообщений.
Постоянные сообщения:
RabbitMQ позволяет помечать сообщения как постоянные, чтобы гарантировать, что сообщения не будут потеряны после перезапуска брокера. Постоянные сообщения записываются на диск, а не просто сохраняются в памяти.
Устойчивые очереди:
Постоянная очередь сохраняется после перезапуска брокера, гарантируя, что метаданные очереди не будут потеряны.
Пакетные подтверждения:
Позвольте потребителям подтверждать сообщения в пакетном режиме, сократите нагрузку на сеть и операции ввода-вывода, а также улучшите пропускную способность.
Количество предварительной выборки:
Установив счетчик предварительной выборки, потребитель может управлять одновременной обработкой сообщений, извлекая новые сообщения из очереди после обработки указанного количества сообщений.
Система плагинов:
RabbitMQ предоставляет гибкую систему плагинов, и пользователи могут загружать и выгружать плагины для расширения функциональности. Например, плагин Shovel используется для пересылки сообщений между кластерами, а плагин Federation — для доставки сообщений, распределенных по географическим местоположениям.
Плагин управления:
Предоставляет веб-интерфейс пользователя для мониторинга и управления экземплярами RabbitMQ, включая просмотр состояния очереди, конфигурации коммутатора, скорости передачи сообщений и т. д.
TLS/SSL-шифрование:
RabbitMQ поддерживает использование TLS/SSL для шифрования передачи сообщений, чтобы гарантировать безопасность сообщений во время передачи.
Контроль доступа:
RabbitMQ предоставляет механизм контроля доступа на основе пользователей, ролей и разрешений, позволяя администраторам настраивать детальные разрешения доступа.
Различные виды бирж (Биржи):
RabbitMQ поддерживает несколько типов переключателей, включая переключатели Direct, Topic, Fanout и Headers для удовлетворения различных потребностей маршрутизации сообщений.
Привязки:
Подключайте очереди и обмены через привязки для реализации сложных стратегий маршрутизации сообщений.
Индикаторы мониторинга (Метрики):
RabbitMQ предоставляет подробные индикаторы мониторинга, включая скорость сообщений, длину очереди, количество соединений и т. д., чтобы помочь администраторам понять рабочее состояние системы.
Сигналы тревоги и уведомления:
RabbitMQ может настроить сигналы тревоги для запуска уведомлений, когда длина очереди превышает пороговое значение или происходит сбой узла.
Обмен недоставленными письмами и очереди:
Если сообщение не может быть использовано или превышено количество повторов, оно может быть перенаправлено в очередь недоставленных писем для дальнейшей обработки.
Повторить сообщение:
Поддерживает настройку стратегий повтора сообщений, чтобы гарантировать возможность повторного использования в случае сбоя использования.
Эти расширенные функции делают RabbitMQ мощным и гибким промежуточным программным обеспечением для обмена сообщениями, подходящим для различных сложных распределенных систем и сценариев приложений. Рационально используя эти функции, можно построить высокопроизводительную, высокодоступную и масштабируемую систему обмена сообщениями.
Сначала убедитесь, что вы добавили в свой проект зависимость Spring AMQP:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
существовать application.properties
Настройте информацию о соединении RabbitMQ в файле:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
В Spring очереди, обменники и отношения привязки можно определить через @Bean:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableRabbit
public class RabbitConfig {
static final String queueName = "testQueue";
static final String exchangeName = "testExchange";
@Bean
Queue queue() {
return new Queue(queueName, true);
}
@Bean
DirectExchange exchange() {
return new DirectExchange(exchangeName);
}
@Bean
Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey");
}
}
Потребители могут вручную подтверждать сообщения, чтобы обеспечить надежность обработки сообщений.использовать@RabbitListener
При аннотации вы можете настроитьacknowledgeMode
дляMANUAL
и подтвердите сообщение вручную в методе:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.Acknowledgment;
public class RabbitMQReceiver {
@RabbitListener(queues = "testQueue", ackMode = "MANUAL")
public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
// 处理消息
System.out.println("Received message: " + new String(message.getBody()));
// 手动确认消息
channel.basicAck(tag, false);
} catch (Exception e) {
// 拒绝消息
channel.basicNack(tag, false, true);
}
}
}
Реализовать поддержку транзакций через RabbitTemplate:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void sendMessage(String message) {
// 发送消息
rabbitTemplate.convertAndSend("testExchange", "testRoutingKey", message);
// 模拟事务回滚
if (message.contains("error")) {
throw new RuntimeException("Error occurred");
}
}
}
Настройте очередь недоставленных писем и ее привязки:
@Bean
Queue dlq() {
return new Queue("dlq", true);
}
@Bean
Binding dlqBinding() {
return BindingBuilder.bind(dlq()).to(exchange()).with("dlqRoutingKey");
}
@Bean
Queue mainQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", exchangeName);
args.put("x-dead-letter-routing-key", "dlqRoutingKey");
return new Queue("mainQueue", true, false, false, args);
}
Используйте плагин для реализации очереди с задержкой, и вы можете добиться отложенной доставки сообщений, настроив TTL (время жизни) сообщения:
@Bean
Queue delayedQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 消息的 TTL 为 60 秒
args.put("x-dead-letter-exchange", exchangeName);
args.put("x-dead-letter-routing-key", "dlqRoutingKey");
return new Queue("delayedQueue", true, false, false, args);
}
через конфигурацию SimpleRabbitListenerContainerFactory
Реализуйте параллельных потребителей:
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3); // 并发消费者数量
factory.setMaxConcurrentConsumers(10);
return factory;
}
}
Воспользуйтесь преимуществами функций подключаемого модуля RabbitMQ, например, используйте подключаемый модуль Shovel для пересылки сообщений между кластерами или используйте подключаемый модуль управления для мониторинга и управления.