Condivisione della tecnologia

Funzionalità avanzate di RabbitMQ

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

RabbitMQ è un broker di messaggi open source ampiamente utilizzato che supporta più protocolli di messaggistica e può essere utilizzato per la messaggistica affidabile nei sistemi distribuiti. Oltre alle funzionalità di base di accodamento dei messaggi, RabbitMQ fornisce anche funzionalità avanzate che ne migliorano le capacità in termini di elevata disponibilità, scalabilità e flessibilità. Ecco alcune delle principali funzionalità avanzate:

1. Alta disponibilità

  • Code con mirroring
    RabbitMQ fornisce una funzione di coda mirror per ottenere un'elevata disponibilità della coda replicando lo stato e i messaggi della coda su più nodi. Se il nodo master fallisce, è possibile un passaggio senza interruzioni al nodo di replica sulla coda mirror.

  • Modalità cluster
    RabbitMQ può essere eseguito in modalità cluster, distribuendo code e scambiatori su più nodi per migliorare la disponibilità e la scalabilità del sistema. I nodi nel cluster possono comunicare tra loro e condividere metadati di messaggi e code.

2. Coerenza del messaggio

  • Riconoscimenti dei messaggi
    I consumatori possono riconoscere i messaggi elaborati per garantire che non vadano persi. Se il messaggio non viene riconosciuto, RabbitMQ lo rimette in coda per l'elaborazione da parte di altri consumatori.

  • Transazioni
    RabbitMQ supporta la modalità di transazione AMQP, che consente ai produttori di pubblicare messaggi e confermare i messaggi all'interno di una transazione per garantire l'atomicità e la coerenza dei messaggi.

3. Durabilità del messaggio

  • Messaggi persistenti
    RabbitMQ consente di contrassegnare i messaggi come persistenti per garantire che i messaggi non vadano persi dopo il riavvio del broker. I messaggi persistenti vengono scritti su disco anziché semplicemente archiviati in memoria.

  • Code durevoli
    La coda persistente persiste dopo il riavvio del broker, garantendo che i metadati della coda non vadano persi.

4. Velocità effettiva e concorrenza elevate

  • Riconoscimenti batch
    Consenti ai consumatori di confermare i messaggi in batch, ridurre il sovraccarico di rete e I/O e migliorare la velocità effettiva.

  • Conteggio precaricamento
    Impostando il conteggio di prelettura, il consumatore può controllare l'elaborazione simultanea dei messaggi recuperando nuovi messaggi dalla coda dopo aver elaborato un numero specificato di messaggi.

5. Plugin ed estensioni

  • Sistema di plug-in
    RabbitMQ fornisce un sistema di plug-in flessibile e gli utenti possono caricare e scaricare plug-in per aumentare la funzionalità. Ad esempio, il plug-in Shovel viene utilizzato per inoltrare messaggi tra cluster e il plug-in Federation viene utilizzato per consegnare messaggi distribuiti in posizioni geografiche.

  • Plug-in di gestione
    Fornisce un'interfaccia utente basata sul Web per il monitoraggio e la gestione delle istanze RabbitMQ, inclusa la visualizzazione dello stato della coda, la configurazione dello switch, la frequenza dei messaggi, ecc.

6. Sicurezza

  • Crittografia TLS/SSL
    RabbitMQ supporta l'uso di TLS/SSL per la crittografia della trasmissione dei messaggi per garantire la sicurezza dei messaggi durante la trasmissione.

  • Controllo di accesso
    RabbitMQ fornisce un meccanismo di controllo dell'accesso basato su utenti, ruoli e autorizzazioni, consentendo agli amministratori di configurare autorizzazioni di accesso dettagliate.

7. Instradamento e scambio di messaggi

  • Diversi tipi di scambi (Exchanges)
    RabbitMQ supporta più tipi di switch, inclusi switch Direct, Topic, Fanout e Headers per soddisfare le diverse esigenze di routing dei messaggi.

  • Legami
    Connetti code e scambi tramite collegamenti per implementare strategie complesse di instradamento dei messaggi.

8. Monitoraggio e Gestione

  • Indicatori di monitoraggio (metriche)
    RabbitMQ fornisce indicatori di monitoraggio dettagliati, tra cui la velocità dei messaggi, la lunghezza della coda, il numero di connessioni, ecc., per aiutare gli amministratori a comprendere lo stato operativo del sistema.

  • Allarmi e notifiche
    RabbitMQ può configurare allarmi per attivare notifiche quando la lunghezza della coda supera una soglia o un nodo fallisce.

9. Nuovi tentativi di messaggio e code di messaggi non recapitati (code di tentativi e messaggi non recapitati)

  • Scambi e code di lettere non recapitate
    Quando un messaggio non può essere consumato o il numero di tentativi viene superato, può essere inoltrato alla coda dei messaggi non recapitati per un'ulteriore elaborazione.

  • Riprova messaggio
    Supporta la configurazione delle strategie di ripetizione dei messaggi per garantire che l'utilizzo possa essere ritentato quando l'utilizzo non riesce.

10. Cloud ibrido e data center incrociati

  • Replica tra data center
    Tramite plug-in o configurazione manuale, RabbitMQ supporta la replica dei messaggi tra diversi data center per garantire un'elevata disponibilità dei dati e funzionalità di disaster recovery.

Queste funzionalità avanzate rendono RabbitMQ un middleware di messaggistica potente e flessibile, adatto a vari sistemi distribuiti complessi e scenari applicativi. Utilizzando razionalmente queste funzioni, è possibile creare un sistema di messaggistica ad alte prestazioni, altamente disponibile e scalabile.

Come implementare le funzioni avanzate comuni in Spring:

1. Installazione e configurazione

Innanzitutto, assicurati di aver introdotto la dipendenza Spring AMQP nel tuo progetto:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

esistere application.properties Configura le informazioni di connessione RabbitMQ nel file:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  • 1
  • 2
  • 3
  • 4

2. Dichiarare code, scambiatori e vincoli

In Spring, le code, gli scambiatori e le relazioni vincolanti possono essere definite tramite @Bean:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitConfig {

    static final String queueName = "testQueue";
    static final String exchangeName = "testExchange";

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

3. Conferma del messaggio

Conferma manuale del messaggio

I consumatori possono confermare manualmente i messaggi per garantire l'affidabilità dell'elaborazione dei messaggi.utilizzo@RabbitListener Quando annoti, puoi configurareacknowledgeMode perMANUALe confermare manualmente il messaggio nel metodo:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.Acknowledgment;

public class RabbitMQReceiver {

    @RabbitListener(queues = "testQueue", ackMode = "MANUAL")
    public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            // 处理消息
            System.out.println("Received message: " + new String(message.getBody()));
            
            // 手动确认消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 拒绝消息
            channel.basicNack(tag, false, true);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

4. Transazione del messaggio

Implementa il supporto delle transazioni tramite RabbitTemplate:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class RabbitMQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Transactional
    public void sendMessage(String message) {
        // 发送消息
        rabbitTemplate.convertAndSend("testExchange", "testRoutingKey", message);

        // 模拟事务回滚
        if (message.contains("error")) {
            throw new RuntimeException("Error occurred");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

5. Coda di lettere non consegnate

Configura la coda dei messaggi non recapitabili e i suoi collegamenti:

@Bean
Queue dlq() {
    return new Queue("dlq", true);
}

@Bean
Binding dlqBinding() {
    return BindingBuilder.bind(dlq()).to(exchange()).with("dlqRoutingKey");
}

@Bean
Queue mainQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", exchangeName);
    args.put("x-dead-letter-routing-key", "dlqRoutingKey");
    return new Queue("mainQueue", true, false, false, args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

6. Coda di ritardo

Utilizza un plug-in per implementare una coda di ritardo e puoi ottenere una consegna ritardata dei messaggi configurando il TTL (Time To Live) del messaggio:

@Bean
Queue delayedQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000); // 消息的 TTL 为 60 秒
    args.put("x-dead-letter-exchange", exchangeName);
    args.put("x-dead-letter-routing-key", "dlqRoutingKey");
    return new Queue("delayedQueue", true, false, false, args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

7. Consumatori simultanei

tramite configurazione SimpleRabbitListenerContainerFactory Implementare consumatori simultanei:

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3); // 并发消费者数量
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

8. Plug-in ed estensioni

Sfrutta le funzioni plug-in di RabbitMQ, come l'utilizzo del plug-in Shovel per inoltrare messaggi tra cluster o l'utilizzo del plug-in di gestione per il monitoraggio e la gestione.