Partage de technologie

Fonctionnalités avancées de RabbitMQ

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

RabbitMQ est un courtier de messages open source largement utilisé qui prend en charge plusieurs protocoles de messagerie et peut être utilisé pour une messagerie fiable dans les systèmes distribués. En plus des fonctionnalités de base de mise en file d'attente des messages, RabbitMQ fournit également des fonctionnalités avancées qui améliorent ses capacités en termes de haute disponibilité, d'évolutivité et de flexibilité. Voici quelques-unes des principales fonctionnalités avancées :

1. Haute disponibilité

  • Files d'attente en miroir
    RabbitMQ fournit une fonction de file d'attente miroir pour obtenir une haute disponibilité de la file d'attente en répliquant l'état et les messages de la file d'attente sur plusieurs nœuds. Si le nœud maître tombe en panne, un basculement transparent vers le nœud de réplique sur la file d'attente miroir est possible.

  • Mode cluster
    RabbitMQ peut fonctionner en mode cluster, distribuant des files d'attente et des échangeurs sur plusieurs nœuds pour améliorer la disponibilité et l'évolutivité du système. Les nœuds du cluster peuvent communiquer entre eux et partager des métadonnées de messages et de files d'attente.

2. Cohérence des messages

  • Accusés de réception des messages
    Les consommateurs peuvent accuser réception des messages traités pour s'assurer qu'ils ne sont pas perdus. Si le message n'est pas reconnu, RabbitMQ le remet dans la file d'attente pour traitement par d'autres consommateurs.

  • Transactions
    RabbitMQ prend en charge le mode de transaction AMQP, qui permet aux producteurs de publier des messages et de confirmer les messages au sein d'une transaction pour garantir l'atomicité et la cohérence des messages.

3. Durabilité des messages

  • Messages persistants
    RabbitMQ permet aux messages d'être marqués comme persistants pour garantir qu'ils ne sont pas perdus après le redémarrage du courtier. Les messages persistants sont écrits sur le disque plutôt que simplement stockés en mémoire.

  • Files d'attente durables
    La file d'attente persistante persiste après le redémarrage du courtier, garantissant ainsi que les métadonnées de la file d'attente ne sont pas perdues.

4. Haut débit et concurrence

  • Accusés de réception par lots
    Permettez aux consommateurs de confirmer les messages par lots, réduisez la surcharge du réseau et des E/S et améliorez le débit.

  • Nombre de prélecture
    En définissant le nombre de prélecture, le consommateur peut contrôler le traitement simultané des messages en récupérant de nouveaux messages dans la file d'attente après avoir traité un nombre spécifié de messages.

5. Plugins et extensions

  • Système de plugins
    RabbitMQ fournit un système de plug-ins flexible et les utilisateurs peuvent charger et décharger des plug-ins pour augmenter les fonctionnalités. Par exemple, le plugin Shovel est utilisé pour transférer des messages entre clusters, et le plugin Federation est utilisé pour transmettre des messages distribués sur des emplacements géographiques.

  • Plugin de gestion
    Fournit une interface utilisateur Web pour surveiller et gérer les instances RabbitMQ, y compris l'affichage de l'état de la file d'attente, de la configuration du commutateur, du débit des messages, etc.

6. Sécurité

  • Cryptage TLS/SSL
    RabbitMQ prend en charge l'utilisation de TLS/SSL pour le cryptage de la transmission des messages afin de garantir la sécurité des messages pendant la transmission.

  • Contrôle d'accès
    RabbitMQ fournit un mécanisme de contrôle d'accès basé sur les utilisateurs, les rôles et les autorisations, permettant aux administrateurs de configurer des autorisations d'accès plus précises.

7. Routage et échange de messages

  • Différents types d'échanges (Exchanges)
    RabbitMQ prend en charge plusieurs types de commutateurs, notamment les commutateurs Direct, Topic, Fanout et Headers, pour répondre aux différents besoins de routage des messages.

  • Reliures
    Connectez les files d'attente et les échanges via des liaisons pour mettre en œuvre des stratégies complexes de routage des messages.

8. Surveillance et gestion

  • Indicateurs de suivi (Metrics)
    RabbitMQ fournit des indicateurs de surveillance détaillés, notamment le débit des messages, la longueur de la file d'attente, le nombre de connexions, etc., pour aider les administrateurs à comprendre l'état de fonctionnement du système.

  • Alarmes et notifications
    RabbitMQ peut configurer des alarmes pour déclencher des notifications lorsque la longueur de la file d'attente dépasse un seuil ou qu'un nœud tombe en panne.

9. Nouvelles tentatives de messages et files d'attente de lettres mortes (files d'attente de nouvelles tentatives et de lettres mortes)

  • Échanges de lettres mortes et files d'attente
    Lorsqu'un message ne peut pas être consommé ou que le nombre de tentatives est dépassé, il peut être transmis à la file d'attente des lettres mortes pour un traitement ultérieur.

  • Nouvelle tentative de message
    Prend en charge la configuration de stratégies de nouvelle tentative de message pour garantir que la consommation peut être réessayée en cas d'échec de la consommation.

10. Cloud hybride et cross-datacenter

  • Réplication entre centres de données
    Grâce à des plug-ins ou à une configuration manuelle, RabbitMQ prend en charge la réplication des messages entre différents centres de données pour garantir une haute disponibilité des données et des capacités de reprise après sinistre.

Ces fonctionnalités avancées font de RabbitMQ un middleware de messagerie puissant et flexible, adapté à divers systèmes distribués et scénarios d'application complexes. En utilisant rationnellement ces fonctions, un système de messagerie performant, hautement disponible et évolutif peut être construit.

Comment implémenter des fonctions avancées communes au printemps :

1. Installation et configuration

Tout d’abord, assurez-vous d’avoir introduit la dépendance Spring AMQP dans votre projet :

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

exister application.properties Configurez les informations de connexion RabbitMQ dans le fichier :

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  • 1
  • 2
  • 3
  • 4

2. Déclarez les files d'attente, les échangeurs et les liaisons

Dans Spring, les files d'attente, les échangeurs et les relations de liaison peuvent être définis via @Bean :

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitConfig {

    static final String queueName = "testQueue";
    static final String exchangeName = "testExchange";

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

3. Confirmation des messages

Confirmation manuelle des messages

Les consommateurs peuvent confirmer manuellement les messages pour garantir la fiabilité du traitement des messages.utiliser@RabbitListener Lors de l'annotation, vous pouvez configureracknowledgeMode pourMANUAL, et confirmez le message manuellement dans la méthode :

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.Acknowledgment;

public class RabbitMQReceiver {

    @RabbitListener(queues = "testQueue", ackMode = "MANUAL")
    public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            // 处理消息
            System.out.println("Received message: " + new String(message.getBody()));
            
            // 手动确认消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 拒绝消息
            channel.basicNack(tag, false, true);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

4. Transaction de messages

Implémentez la prise en charge des transactions via RabbitTemplate :

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class RabbitMQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Transactional
    public void sendMessage(String message) {
        // 发送消息
        rabbitTemplate.convertAndSend("testExchange", "testRoutingKey", message);

        // 模拟事务回滚
        if (message.contains("error")) {
            throw new RuntimeException("Error occurred");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

5. File d'attente des lettres mortes

Configurez la file d'attente de lettres mortes et ses liaisons :

@Bean
Queue dlq() {
    return new Queue("dlq", true);
}

@Bean
Binding dlqBinding() {
    return BindingBuilder.bind(dlq()).to(exchange()).with("dlqRoutingKey");
}

@Bean
Queue mainQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", exchangeName);
    args.put("x-dead-letter-routing-key", "dlqRoutingKey");
    return new Queue("mainQueue", true, false, false, args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

6. Retarder la file d'attente

Utilisez un plug-in pour implémenter une file d'attente différée et vous pouvez obtenir une livraison retardée des messages en configurant le TTL (Time To Live) du message :

@Bean
Queue delayedQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000); // 消息的 TTL 为 60 秒
    args.put("x-dead-letter-exchange", exchangeName);
    args.put("x-dead-letter-routing-key", "dlqRoutingKey");
    return new Queue("delayedQueue", true, false, false, args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

7. Consommateurs simultanés

via la configuration SimpleRabbitListenerContainerFactory Implémentez des consommateurs simultanés :

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3); // 并发消费者数量
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

8. Plugins et extensions

Profitez des fonctions du plug-in RabbitMQ, telles que l'utilisation du plug-in Shovel pour transférer des messages entre clusters ou l'utilisation du plug-in de gestion pour la surveillance et la gestion.