Mi informacion de contacto
Correo[email protected]
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
RabbitMQ es un intermediario de mensajes de código abierto ampliamente utilizado que admite múltiples protocolos de mensajería y puede usarse para mensajería confiable en sistemas distribuidos. Además de la funcionalidad básica de cola de mensajes, RabbitMQ también proporciona funciones avanzadas que mejoran sus capacidades en términos de alta disponibilidad, escalabilidad y flexibilidad. Estas son algunas de las principales funciones avanzadas:
Colas reflejadas:
RabbitMQ proporciona una función de cola espejo para lograr una alta disponibilidad de la cola replicando el estado y los mensajes de la cola en múltiples nodos. Si el nodo maestro falla, es posible una conmutación perfecta al nodo réplica en la cola reflejada.
Modo de clúster:
RabbitMQ puede ejecutarse en modo clúster, distribuyendo colas e intercambiadores en múltiples nodos para mejorar la disponibilidad y escalabilidad del sistema. Los nodos del clúster pueden comunicarse entre sí y compartir metadatos de colas y mensajes.
Confirmaciones de mensajes:
Los consumidores pueden reconocer los mensajes procesados para asegurarse de que no se pierdan. Si no se reconoce el mensaje, RabbitMQ lo vuelve a poner en la cola para que otros consumidores lo procesen.
Actas:
RabbitMQ admite el modo de transacción AMQP, que permite a los productores publicar mensajes y confirmar mensajes dentro de una transacción para garantizar la atomicidad y coherencia de los mensajes.
Mensajes persistentes:
RabbitMQ permite marcar mensajes como persistentes para garantizar que no se pierdan después de que se reinicie el broker. Los mensajes persistentes se escriben en el disco en lugar de simplemente almacenarse en la memoria.
Colas duraderas:
La cola persistente persiste después de que se reinicia el intermediario, lo que garantiza que no se pierdan los metadatos de la cola.
Reconocimientos por lotes:
Permita que los consumidores confirmen mensajes en lotes, reduzca la sobrecarga de red y de E/S y mejore el rendimiento.
Recuento de captación previa:
Al configurar el recuento de captación previa, el consumidor puede controlar el procesamiento simultáneo de mensajes recuperando mensajes nuevos de la cola después de procesar una cantidad específica de mensajes.
Sistema de complementos:
RabbitMQ proporciona un sistema de complementos flexible y los usuarios pueden cargar y descargar complementos para aumentar la funcionalidad. Por ejemplo, el complemento Shovel se usa para reenviar mensajes entre clústeres y el complemento Federation se usa para entregar mensajes distribuidos en ubicaciones geográficas.
Complemento de gestión:
Proporciona una interfaz de usuario basada en web para monitorear y administrar instancias de RabbitMQ, incluida la visualización del estado de la cola, la configuración del conmutador, la velocidad de mensajes, etc.
Cifrado TLS/SSL:
RabbitMQ admite el uso de TLS/SSL para el cifrado de transmisión de mensajes para garantizar la seguridad de los mensajes durante la transmisión.
Control de acceso:
RabbitMQ proporciona un mecanismo de control de acceso basado en usuarios, roles y permisos, lo que permite a los administradores configurar permisos de acceso detallados.
Diferentes tipos de intercambios (Exchanges):
RabbitMQ admite múltiples tipos de conmutadores, incluidos conmutadores Direct, Topic, Fanout y Headers para satisfacer diferentes necesidades de enrutamiento de mensajes.
Fijaciones:
Conecte colas e intercambios a través de enlaces para implementar estrategias complejas de enrutamiento de mensajes.
Indicadores de seguimiento (Métricas):
RabbitMQ proporciona indicadores de monitoreo detallados, incluida la tasa de mensajes, la longitud de la cola, la cantidad de conexiones, etc., para ayudar a los administradores a comprender el estado operativo del sistema.
Alarmas y Notificaciones:
RabbitMQ puede configurar alarmas para activar notificaciones cuando la longitud de la cola excede un umbral o falla un nodo.
Intercambios de mensajes fallidos y colas:
Cuando un mensaje no se puede consumir o se excede el número de reintentos, se puede reenviar a la cola de mensajes no entregados para su posterior procesamiento.
Reintentar mensaje:
Admite la configuración de estrategias de reintento de mensajes para garantizar que se pueda reintentar el consumo cuando falla.
Estas funciones avanzadas hacen de RabbitMQ un middleware de mensajería potente y flexible, adecuado para diversos sistemas distribuidos complejos y escenarios de aplicaciones. Al utilizar racionalmente estas funciones, se puede crear un sistema de mensajería escalable, de alto rendimiento y de alta disponibilidad.
Primero, asegúrese de haber introducido la dependencia Spring AMQP en su proyecto:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
existir application.properties
Configure la información de conexión de RabbitMQ en el archivo:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
En Spring, las colas, los intercambiadores y las relaciones vinculantes se pueden definir a través de @Bean:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableRabbit
public class RabbitConfig {
static final String queueName = "testQueue";
static final String exchangeName = "testExchange";
@Bean
Queue queue() {
return new Queue(queueName, true);
}
@Bean
DirectExchange exchange() {
return new DirectExchange(exchangeName);
}
@Bean
Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey");
}
}
Los consumidores pueden confirmar mensajes manualmente para garantizar la confiabilidad del procesamiento de mensajes.usar@RabbitListener
Al anotar, puede configuraracknowledgeMode
paraMANUAL
y confirme el mensaje manualmente en el método:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.Acknowledgment;
public class RabbitMQReceiver {
@RabbitListener(queues = "testQueue", ackMode = "MANUAL")
public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
// 处理消息
System.out.println("Received message: " + new String(message.getBody()));
// 手动确认消息
channel.basicAck(tag, false);
} catch (Exception e) {
// 拒绝消息
channel.basicNack(tag, false, true);
}
}
}
Implemente soporte de transacciones a través de RabbitTemplate:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void sendMessage(String message) {
// 发送消息
rabbitTemplate.convertAndSend("testExchange", "testRoutingKey", message);
// 模拟事务回滚
if (message.contains("error")) {
throw new RuntimeException("Error occurred");
}
}
}
Configure la cola de mensajes no entregados y sus enlaces:
@Bean
Queue dlq() {
return new Queue("dlq", true);
}
@Bean
Binding dlqBinding() {
return BindingBuilder.bind(dlq()).to(exchange()).with("dlqRoutingKey");
}
@Bean
Queue mainQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", exchangeName);
args.put("x-dead-letter-routing-key", "dlqRoutingKey");
return new Queue("mainQueue", true, false, false, args);
}
Utilice un complemento para implementar una cola de retraso y podrá lograr la entrega de mensajes retrasada configurando el TTL (tiempo de vida) del mensaje:
@Bean
Queue delayedQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 消息的 TTL 为 60 秒
args.put("x-dead-letter-exchange", exchangeName);
args.put("x-dead-letter-routing-key", "dlqRoutingKey");
return new Queue("delayedQueue", true, false, false, args);
}
vía configuración SimpleRabbitListenerContainerFactory
Implementar consumidores concurrentes:
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3); // 并发消费者数量
factory.setMaxConcurrentConsumers(10);
return factory;
}
}
Aproveche las funciones del complemento de RabbitMQ, como el uso del complemento Shovel para reenviar mensajes entre clústeres o el complemento de administración para monitoreo y administración.