내 연락처 정보
우편메소피아@프로톤메일.com
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
RabbitMQ는 여러 메시징 프로토콜을 지원하고 분산 시스템에서 안정적인 메시징에 사용할 수 있는 널리 사용되는 오픈 소스 메시지 브로커입니다. 기본 메시지 대기열 기능 외에도 RabbitMQ는 고가용성, 확장성 및 유연성 측면에서 기능을 향상시키는 고급 기능도 제공합니다. 주요 고급 기능은 다음과 같습니다.
미러링된 대기열:
RabbitMQ는 큐의 상태와 메시지를 여러 노드에 복제하여 큐의 고가용성을 달성하기 위한 미러 큐 기능을 제공합니다. 마스터 노드에 장애가 발생하면 미러 큐의 레플리카 노드로 원활한 전환이 가능합니다.
클러스터 모드:
RabbitMQ는 클러스터 모드에서 실행되어 여러 노드에 큐와 교환기를 분산시켜 시스템 가용성과 확장성을 향상시킬 수 있습니다. 클러스터의 노드는 서로 통신하고 메시지 및 대기열 메타데이터를 공유할 수 있습니다.
메시지 승인:
소비자는 처리된 메시지를 확인하여 메시지가 손실되지 않았는지 확인할 수 있습니다. 메시지가 확인되지 않으면 RabbitMQ는 다른 소비자가 처리할 수 있도록 해당 메시지를 대기열에 다시 넣습니다.
업무:
RabbitMQ는 AMQP 트랜잭션 모드를 지원합니다. 이를 통해 생성자는 메시지를 게시하고 트랜잭션 내에서 메시지를 확인하여 메시지의 원자성과 일관성을 보장할 수 있습니다.
지속성 메시지:
RabbitMQ를 사용하면 메시지를 지속성으로 표시하여 브로커가 다시 시작된 후 메시지가 손실되지 않도록 할 수 있습니다. 지속성 메시지는 단지 메모리에 저장되는 것이 아니라 디스크에 기록됩니다.
지속 가능한 대기열:
영구 대기열은 브로커가 다시 시작된 후에도 유지되므로 대기열 메타데이터가 손실되지 않습니다.
일괄 승인:
소비자가 일괄적으로 메시지를 확인하고, 네트워크 및 I/O 오버헤드를 줄이고, 처리량을 향상시킬 수 있습니다.
프리페치 수:
프리페치 수를 설정하면 소비자는 지정된 수의 메시지를 처리한 후 대기열에서 새 메시지를 가져와 동시 메시지 처리를 제어할 수 있습니다.
플러그인 시스템:
RabbitMQ는 유연한 플러그인 시스템을 제공하며 사용자는 플러그인을 로드 및 언로드하여 기능을 향상시킬 수 있습니다. 예를 들어, Shovel 플러그인은 클러스터 전체에 메시지를 전달하는 데 사용되고, Federation 플러그인은 지리적 위치에 분산된 메시지를 전달하는 데 사용됩니다.
관리 플러그인:
대기열 상태 보기, 스위치 구성, 메시지 속도 등을 포함하여 RabbitMQ 인스턴스를 모니터링하고 관리하기 위한 웹 기반 사용자 인터페이스를 제공합니다.
TLS/SSL 암호화:
RabbitMQ는 메시지 전송 암호화에 TLS/SSL 사용을 지원하여 전송 중 메시지 보안을 보장합니다.
액세스 제어:
RabbitMQ는 사용자, 역할 및 권한을 기반으로 하는 액세스 제어 메커니즘을 제공하므로 관리자는 세분화된 액세스 권한을 구성할 수 있습니다.
다양한 유형의 교환(교환):
RabbitMQ는 다양한 메시지 라우팅 요구 사항을 충족하기 위해 Direct, Topic, Fanout 및 Headers 스위치를 포함한 여러 유형의 스위치를 지원합니다.
바인딩:
바인딩을 통해 큐와 교환을 연결하여 복잡한 메시지 라우팅 전략을 구현합니다.
모니터링 지표(지표):
RabbitMQ는 관리자가 시스템의 운영 상태를 이해할 수 있도록 메시지 속도, 대기열 길이, 연결 수 등을 포함한 자세한 모니터링 지표를 제공합니다.
경보 및 알림:
RabbitMQ는 대기열 길이가 임계값을 초과하거나 노드에 장애가 발생할 경우 알림을 트리거하도록 경보를 구성할 수 있습니다.
배달 못한 편지 교환 및 대기열:
메시지를 사용할 수 없거나 재시도 횟수가 초과되면 추가 처리를 위해 배달 못한 편지 큐로 전달될 수 있습니다.
메시지 재시도:
소비 실패 시 소비를 재시도할 수 있도록 메시지 재시도 전략 구성을 지원합니다.
이러한 고급 기능을 통해 RabbitMQ는 다양하고 복잡한 분산 시스템 및 애플리케이션 시나리오에 적합한 강력하고 유연한 메시징 미들웨어입니다. 이러한 기능을 합리적으로 활용함으로써 고성능, 고가용성, 확장성이 뛰어난 메시징 시스템을 구축할 수 있습니다.
먼저 프로젝트에 Spring AMQP 종속성을 도입했는지 확인하세요.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
존재하다 application.properties
파일에서 RabbitMQ 연결 정보를 구성합니다.
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Spring에서는 @Bean을 통해 큐, 교환기 및 바인딩 관계를 정의할 수 있습니다.
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableRabbit
public class RabbitConfig {
static final String queueName = "testQueue";
static final String exchangeName = "testExchange";
@Bean
Queue queue() {
return new Queue(queueName, true);
}
@Bean
DirectExchange exchange() {
return new DirectExchange(exchangeName);
}
@Bean
Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey");
}
}
소비자는 메시지 처리의 신뢰성을 보장하기 위해 메시지를 수동으로 확인할 수 있습니다.사용@RabbitListener
주석을 달 때 다음을 구성할 수 있습니다.acknowledgeMode
~을 위한MANUAL
, 메소드에서 메시지를 수동으로 확인하십시오.
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.Acknowledgment;
public class RabbitMQReceiver {
@RabbitListener(queues = "testQueue", ackMode = "MANUAL")
public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
// 处理消息
System.out.println("Received message: " + new String(message.getBody()));
// 手动确认消息
channel.basicAck(tag, false);
} catch (Exception e) {
// 拒绝消息
channel.basicNack(tag, false, true);
}
}
}
RabbitTemplate을 통해 트랜잭션 지원을 구현합니다.
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void sendMessage(String message) {
// 发送消息
rabbitTemplate.convertAndSend("testExchange", "testRoutingKey", message);
// 模拟事务回滚
if (message.contains("error")) {
throw new RuntimeException("Error occurred");
}
}
}
배달 못한 편지 대기열과 해당 바인딩을 구성합니다.
@Bean
Queue dlq() {
return new Queue("dlq", true);
}
@Bean
Binding dlqBinding() {
return BindingBuilder.bind(dlq()).to(exchange()).with("dlqRoutingKey");
}
@Bean
Queue mainQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", exchangeName);
args.put("x-dead-letter-routing-key", "dlqRoutingKey");
return new Queue("mainQueue", true, false, false, args);
}
플러그인을 사용하여 지연 대기열을 구현하고 메시지의 TTL(Time To Live)을 구성하여 지연된 메시지 전달을 달성할 수 있습니다.
@Bean
Queue delayedQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 消息的 TTL 为 60 秒
args.put("x-dead-letter-exchange", exchangeName);
args.put("x-dead-letter-routing-key", "dlqRoutingKey");
return new Queue("delayedQueue", true, false, false, args);
}
구성을 통해 SimpleRabbitListenerContainerFactory
동시 소비자 구현:
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3); // 并发消费者数量
factory.setMaxConcurrentConsumers(10);
return factory;
}
}
Shovel 플러그인을 사용하여 클러스터 간 메시지를 전달하거나 Management Plugin을 사용하여 모니터링 및 관리하는 등 RabbitMQ의 플러그인 기능을 활용하세요.