Compartilhamento de tecnologia

Recursos avançados do RabbitMQ

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

RabbitMQ é um corretor de mensagens de código aberto amplamente utilizado que oferece suporte a vários protocolos de mensagens e pode ser usado para mensagens confiáveis ​​em sistemas distribuídos. Além da funcionalidade básica de enfileiramento de mensagens, o RabbitMQ também oferece recursos avançados que aprimoram seus recursos em termos de alta disponibilidade, escalabilidade e flexibilidade. Aqui estão alguns dos principais recursos avançados:

1. Alta disponibilidade

  • Filas espelhadas
    RabbitMQ fornece uma função de fila espelhada para obter alta disponibilidade da fila, replicando o status e as mensagens da fila para vários nós. Se o nó mestre falhar, será possível uma alternância contínua para o nó de réplica na fila de espelhos.

  • Modo Cluster
    RabbitMQ pode ser executado em modo cluster, distribuindo filas e trocadores em vários nós para melhorar a disponibilidade e escalabilidade do sistema. Os nós do cluster podem se comunicar entre si e compartilhar metadados de mensagens e filas.

2. Consistência de mensagens

  • Agradecimentos de mensagens
    Os consumidores podem reconhecer as mensagens processadas para garantir que não sejam perdidas. Se a mensagem não for reconhecida, o RabbitMQ a coloca de volta na fila para processamento por outros consumidores.

  • Transações
    RabbitMQ suporta o modo de transação AMQP, que permite aos produtores publicar mensagens e confirmar mensagens dentro de uma transação para garantir a atomicidade e consistência das mensagens.

3. Durabilidade da mensagem

  • Mensagens persistentes
    RabbitMQ permite que as mensagens sejam marcadas como persistentes para garantir que as mensagens não sejam perdidas após a reinicialização do broker. As mensagens persistentes são gravadas no disco em vez de apenas armazenadas na memória.

  • Filas Duráveis
    A fila persistente persiste após o broker ser reiniciado, garantindo que os metadados da fila não sejam perdidos.

4. Alto rendimento e simultaneidade

  • Agradecimentos em lote
    Permita que os consumidores confirmem mensagens em lotes, reduzam a sobrecarga de rede e de E/S e melhorem o rendimento.

  • Contagem de pré-busca
    Ao definir a contagem de pré-busca, o consumidor pode controlar o processamento simultâneo de mensagens, buscando novas mensagens na fila após processar um número especificado de mensagens.

5. Plug-ins e extensões

  • Sistema de plug-ins
    RabbitMQ fornece um sistema de plug-ins flexível e os usuários podem carregar e descarregar plug-ins para aumentar a funcionalidade. Por exemplo, o plugin Shovel é usado para encaminhar mensagens entre clusters, e o plugin Federation é usado para entregar mensagens distribuídas em locais geográficos.

  • Plug-in de gerenciamento
    Fornece uma interface de usuário baseada na web para monitorar e gerenciar instâncias RabbitMQ, incluindo visualização de status de fila, configuração de switch, taxa de mensagens, etc.

6. Segurança

  • Criptografia TLS/SSL
    RabbitMQ suporta o uso de TLS/SSL para criptografia de transmissão de mensagens para garantir a segurança das mensagens durante a transmissão.

  • Controle de acesso
    RabbitMQ fornece um mecanismo de controle de acesso baseado em usuários, funções e permissões, permitindo que os administradores configurem permissões de acesso refinadas.

7. Roteamento e troca de mensagens

  • Diferentes tipos de trocas (trocas)
    RabbitMQ oferece suporte a vários tipos de switches, incluindo switches diretos, de tópico, fanout e cabeçalhos para atender a diferentes necessidades de roteamento de mensagens.

  • Ligações
    Conecte filas e trocas por meio de ligações para implementar estratégias complexas de roteamento de mensagens.

8. Monitoramento e Gestão

  • Indicadores de monitoramento (Métricas)
    RabbitMQ fornece indicadores de monitoramento detalhados, incluindo taxa de mensagens, comprimento da fila, número de conexões, etc., para ajudar os administradores a entender o status operacional do sistema.

  • Alarmes e Notificações
    RabbitMQ pode configurar alarmes para acionar notificações quando o comprimento da fila exceder um limite ou um nó falhar.

9. Novas tentativas de mensagens e filas de mensagens mortas (Retry and Dead-Letters Queues)

  • Trocas de cartas mortas e filas
    Quando uma mensagem não pode ser consumida ou o número de novas tentativas é excedido, ela pode ser encaminhada para a fila de devoluções para processamento adicional.

  • Nova tentativa de mensagem
    Suporta a configuração de estratégias de repetição de mensagens para garantir que o consumo possa ser repetido quando o consumo falhar.

10. Nuvem híbrida e datacenter cruzado

  • Replicação entre datacenters
    Por meio de plug-ins ou configuração manual, o RabbitMQ oferece suporte à replicação de mensagens entre diferentes data centers para garantir alta disponibilidade de dados e recursos de recuperação de desastres.

Esses recursos avançados tornam o RabbitMQ um middleware de mensagens poderoso e flexível, adequado para vários sistemas distribuídos complexos e cenários de aplicativos. Ao utilizar racionalmente essas funções, um sistema de mensagens de alto desempenho, altamente disponível e escalável pode ser construído.

Como implementar funções avançadas comuns no Spring:

1. Instalação e configuração

Primeiro, certifique-se de ter introduzido a dependência Spring AMQP em seu projeto:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

existir application.properties Configure as informações de conexão do RabbitMQ no arquivo:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  • 1
  • 2
  • 3
  • 4

2. Declarar filas, trocadores e ligações

No Spring, filas, trocadores e relacionamentos de ligação podem ser definidos através do @Bean:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitConfig {

    static final String queueName = "testQueue";
    static final String exchangeName = "testExchange";

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

3. Confirmação de mensagem

Confirmação manual de mensagem

Os consumidores podem confirmar mensagens manualmente para garantir a confiabilidade do processamento de mensagens.usar@RabbitListener Ao anotar, você pode configuraracknowledgeMode paraMANUALe confirme a mensagem manualmente no método:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.Acknowledgment;

public class RabbitMQReceiver {

    @RabbitListener(queues = "testQueue", ackMode = "MANUAL")
    public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            // 处理消息
            System.out.println("Received message: " + new String(message.getBody()));
            
            // 手动确认消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 拒绝消息
            channel.basicNack(tag, false, true);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

4. Transação de mensagem

Implemente suporte a transações por meio do RabbitTemplate:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class RabbitMQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Transactional
    public void sendMessage(String message) {
        // 发送消息
        rabbitTemplate.convertAndSend("testExchange", "testRoutingKey", message);

        // 模拟事务回滚
        if (message.contains("error")) {
            throw new RuntimeException("Error occurred");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

5. Fila de mensagens mortas

Configure a fila de devoluções e suas ligações:

@Bean
Queue dlq() {
    return new Queue("dlq", true);
}

@Bean
Binding dlqBinding() {
    return BindingBuilder.bind(dlq()).to(exchange()).with("dlqRoutingKey");
}

@Bean
Queue mainQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", exchangeName);
    args.put("x-dead-letter-routing-key", "dlqRoutingKey");
    return new Queue("mainQueue", true, false, false, args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

6. Fila de atraso

Use um plug-in para implementar uma fila de atraso e você poderá obter atraso na entrega de mensagens configurando o TTL (Time To Live) da mensagem:

@Bean
Queue delayedQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000); // 消息的 TTL 为 60 秒
    args.put("x-dead-letter-exchange", exchangeName);
    args.put("x-dead-letter-routing-key", "dlqRoutingKey");
    return new Queue("delayedQueue", true, false, false, args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

7. Consumidores simultâneos

via configuração SimpleRabbitListenerContainerFactory Implemente consumidores simultâneos:

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3); // 并发消费者数量
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

8. Plug-ins e extensões

Aproveite as vantagens das funções do plug-in do RabbitMQ, como usar o plug-in Shovel para encaminhar mensagens entre clusters ou usar o plug-in de gerenciamento para monitoramento e gerenciamento.