2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
RabbitMQ est un courtier de messages open source largement utilisé qui prend en charge plusieurs protocoles de messagerie et peut être utilisé pour une messagerie fiable dans les systèmes distribués. En plus des fonctionnalités de base de mise en file d'attente des messages, RabbitMQ fournit également des fonctionnalités avancées qui améliorent ses capacités en termes de haute disponibilité, d'évolutivité et de flexibilité. Voici quelques-unes des principales fonctionnalités avancées :
Files d'attente en miroir:
RabbitMQ fournit une fonction de file d'attente miroir pour obtenir une haute disponibilité de la file d'attente en répliquant l'état et les messages de la file d'attente sur plusieurs nœuds. Si le nœud maître tombe en panne, un basculement transparent vers le nœud de réplique sur la file d'attente miroir est possible.
Mode cluster:
RabbitMQ peut fonctionner en mode cluster, distribuant des files d'attente et des échangeurs sur plusieurs nœuds pour améliorer la disponibilité et l'évolutivité du système. Les nœuds du cluster peuvent communiquer entre eux et partager des métadonnées de messages et de files d'attente.
Accusés de réception des messages:
Les consommateurs peuvent accuser réception des messages traités pour s'assurer qu'ils ne sont pas perdus. Si le message n'est pas reconnu, RabbitMQ le remet dans la file d'attente pour traitement par d'autres consommateurs.
Transactions:
RabbitMQ prend en charge le mode de transaction AMQP, qui permet aux producteurs de publier des messages et de confirmer les messages au sein d'une transaction pour garantir l'atomicité et la cohérence des messages.
Messages persistants:
RabbitMQ permet aux messages d'être marqués comme persistants pour garantir qu'ils ne sont pas perdus après le redémarrage du courtier. Les messages persistants sont écrits sur le disque plutôt que simplement stockés en mémoire.
Files d'attente durables:
La file d'attente persistante persiste après le redémarrage du courtier, garantissant ainsi que les métadonnées de la file d'attente ne sont pas perdues.
Accusés de réception par lots:
Permettez aux consommateurs de confirmer les messages par lots, réduisez la surcharge du réseau et des E/S et améliorez le débit.
Nombre de prélecture:
En définissant le nombre de prélecture, le consommateur peut contrôler le traitement simultané des messages en récupérant de nouveaux messages dans la file d'attente après avoir traité un nombre spécifié de messages.
Système de plugins:
RabbitMQ fournit un système de plug-ins flexible et les utilisateurs peuvent charger et décharger des plug-ins pour augmenter les fonctionnalités. Par exemple, le plugin Shovel est utilisé pour transférer des messages entre clusters, et le plugin Federation est utilisé pour transmettre des messages distribués sur des emplacements géographiques.
Plugin de gestion:
Fournit une interface utilisateur Web pour surveiller et gérer les instances RabbitMQ, y compris l'affichage de l'état de la file d'attente, de la configuration du commutateur, du débit des messages, etc.
Cryptage TLS/SSL:
RabbitMQ prend en charge l'utilisation de TLS/SSL pour le cryptage de la transmission des messages afin de garantir la sécurité des messages pendant la transmission.
Contrôle d'accès:
RabbitMQ fournit un mécanisme de contrôle d'accès basé sur les utilisateurs, les rôles et les autorisations, permettant aux administrateurs de configurer des autorisations d'accès plus précises.
Différents types d'échanges (Exchanges):
RabbitMQ prend en charge plusieurs types de commutateurs, notamment les commutateurs Direct, Topic, Fanout et Headers, pour répondre aux différents besoins de routage des messages.
Reliures:
Connectez les files d'attente et les échanges via des liaisons pour mettre en œuvre des stratégies complexes de routage des messages.
Indicateurs de suivi (Metrics):
RabbitMQ fournit des indicateurs de surveillance détaillés, notamment le débit des messages, la longueur de la file d'attente, le nombre de connexions, etc., pour aider les administrateurs à comprendre l'état de fonctionnement du système.
Alarmes et notifications:
RabbitMQ peut configurer des alarmes pour déclencher des notifications lorsque la longueur de la file d'attente dépasse un seuil ou qu'un nœud tombe en panne.
Échanges de lettres mortes et files d'attente:
Lorsqu'un message ne peut pas être consommé ou que le nombre de tentatives est dépassé, il peut être transmis à la file d'attente des lettres mortes pour un traitement ultérieur.
Nouvelle tentative de message:
Prend en charge la configuration de stratégies de nouvelle tentative de message pour garantir que la consommation peut être réessayée en cas d'échec de la consommation.
Ces fonctionnalités avancées font de RabbitMQ un middleware de messagerie puissant et flexible, adapté à divers systèmes distribués et scénarios d'application complexes. En utilisant rationnellement ces fonctions, un système de messagerie performant, hautement disponible et évolutif peut être construit.
Tout d’abord, assurez-vous d’avoir introduit la dépendance Spring AMQP dans votre projet :
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
exister application.properties
Configurez les informations de connexion RabbitMQ dans le fichier :
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Dans Spring, les files d'attente, les échangeurs et les relations de liaison peuvent être définis via @Bean :
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableRabbit
public class RabbitConfig {
static final String queueName = "testQueue";
static final String exchangeName = "testExchange";
@Bean
Queue queue() {
return new Queue(queueName, true);
}
@Bean
DirectExchange exchange() {
return new DirectExchange(exchangeName);
}
@Bean
Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey");
}
}
Les consommateurs peuvent confirmer manuellement les messages pour garantir la fiabilité du traitement des messages.utiliser@RabbitListener
Lors de l'annotation, vous pouvez configureracknowledgeMode
pourMANUAL
, et confirmez le message manuellement dans la méthode :
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.Acknowledgment;
public class RabbitMQReceiver {
@RabbitListener(queues = "testQueue", ackMode = "MANUAL")
public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
// 处理消息
System.out.println("Received message: " + new String(message.getBody()));
// 手动确认消息
channel.basicAck(tag, false);
} catch (Exception e) {
// 拒绝消息
channel.basicNack(tag, false, true);
}
}
}
Implémentez la prise en charge des transactions via RabbitTemplate :
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void sendMessage(String message) {
// 发送消息
rabbitTemplate.convertAndSend("testExchange", "testRoutingKey", message);
// 模拟事务回滚
if (message.contains("error")) {
throw new RuntimeException("Error occurred");
}
}
}
Configurez la file d'attente de lettres mortes et ses liaisons :
@Bean
Queue dlq() {
return new Queue("dlq", true);
}
@Bean
Binding dlqBinding() {
return BindingBuilder.bind(dlq()).to(exchange()).with("dlqRoutingKey");
}
@Bean
Queue mainQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", exchangeName);
args.put("x-dead-letter-routing-key", "dlqRoutingKey");
return new Queue("mainQueue", true, false, false, args);
}
Utilisez un plug-in pour implémenter une file d'attente différée et vous pouvez obtenir une livraison retardée des messages en configurant le TTL (Time To Live) du message :
@Bean
Queue delayedQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 消息的 TTL 为 60 秒
args.put("x-dead-letter-exchange", exchangeName);
args.put("x-dead-letter-routing-key", "dlqRoutingKey");
return new Queue("delayedQueue", true, false, false, args);
}
via la configuration SimpleRabbitListenerContainerFactory
Implémentez des consommateurs simultanés :
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3); // 并发消费者数量
factory.setMaxConcurrentConsumers(10);
return factory;
}
}
Profitez des fonctions du plug-in RabbitMQ, telles que l'utilisation du plug-in Shovel pour transférer des messages entre clusters ou l'utilisation du plug-in de gestion pour la surveillance et la gestion.