プライベートな連絡先の最初の情報
送料メール:
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
RabbitMQ は、複数のメッセージング プロトコルをサポートし、分散システムにおける信頼性の高いメッセージングに使用できる、広く使用されているオープン ソースのメッセージ ブローカーです。 RabbitMQ は、基本的なメッセージ キュー機能に加えて、高可用性、スケーラビリティ、柔軟性の点で機能を強化する高度な機能も提供します。主な高度な機能の一部を次に示します。
ミラーリングされたキュー:
RabbitMQ は、キューのステータスとメッセージを複数のノードに複製することでキューの高可用性を実現するミラー キュー機能を提供します。マスター ノードに障害が発生した場合、ミラー キュー上のレプリカ ノードへのシームレスな切り替えが可能です。
クラスターモード:
RabbitMQ はクラスター モードで実行でき、キューとエクスチェンジャーを複数のノードに分散してシステムの可用性とスケーラビリティを向上させます。クラスター内のノードは相互に通信し、メッセージとキューのメタデータを共有できます。
メッセージの確認応答:
コンシューマは、処理されたメッセージを確認して、メッセージが失われないようにすることができます。メッセージが確認応答されない場合、RabbitMQ は他のコンシューマによる処理のためにメッセージをキューに戻します。
取引:
RabbitMQ は、AMQP トランザクション モードをサポートしています。これにより、プロデューサーはメッセージを発行し、トランザクション内でメッセージを確認して、メッセージのアトミック性と一貫性を確保できます。
永続的なメッセージ:
RabbitMQ を使用すると、ブローカーの再起動後にメッセージが失われないように、メッセージを永続的としてマークできます。永続メッセージは、メモリに保存されるだけではなく、ディスクに書き込まれます。
永続的なキュー:
永続キューはブローカーの再起動後も存続し、キューのメタデータが失われないようにします。
バッチ確認応答:
コンシューマがメッセージをバッチで確認できるようにし、ネットワークと I/O のオーバーヘッドを削減し、スループットを向上させます。
プリフェッチ数:
プリフェッチ数を設定すると、コンシューマは、指定された数のメッセージを処理した後にキューから新しいメッセージをフェッチすることで、メッセージの同時処理を制御できます。
プラグインシステム:
RabbitMQ は柔軟なプラグイン システムを提供し、ユーザーはプラグインをロードおよびアンロードして機能を強化できます。たとえば、Shovel プラグインはクラスター間でメッセージを転送するために使用され、Federation プラグインは地理的な場所に分散されたメッセージを配信するために使用されます。
管理プラグイン:
キューのステータス、スイッチ構成、メッセージ レートなどの表示を含む、RabbitMQ インスタンスを監視および管理するための Web ベースのユーザー インターフェイスを提供します。
TLS/SSL暗号化:
RabbitMQ は、メッセージ送信暗号化に TLS/SSL の使用をサポートし、送信中のメッセージのセキュリティを確保します。
アクセス制御:
RabbitMQ は、ユーザー、ロール、権限に基づいたアクセス制御メカニズムを提供し、管理者がきめ細かいアクセス権限を構成できるようにします。
各種取引所(取引所):
RabbitMQ は、さまざまなメッセージ ルーティングのニーズを満たすために、ダイレクト、トピック、ファンアウト、ヘッダー スイッチなどの複数のタイプのスイッチをサポートします。
バインディング:
バインディングを介してキューと交換を接続し、複雑なメッセージ ルーティング戦略を実装します。
監視指標(メトリクス):
RabbitMQ は、メッセージ レート、キューの長さ、接続数などを含む詳細な監視指標を提供し、管理者がシステムの動作状態を理解するのに役立ちます。
アラームと通知:
RabbitMQ は、キューの長さがしきい値を超えた場合、またはノードに障害が発生した場合に通知をトリガーするようにアラームを設定できます。
デッドレター交換とキュー:
メッセージを消費できない場合、または再試行回数を超えた場合、メッセージはデッド レター キューに転送され、さらなる処理が行われます。
メッセージの再試行:
消費が失敗した場合に消費を確実に再試行できるようにするメッセージ再試行戦略の構成をサポートします。
これらの高度な機能により、RabbitMQ は、さまざまな複雑な分散システムやアプリケーション シナリオに適した、強力で柔軟なメッセージング ミドルウェアになります。これらの機能を合理的に活用することで、高性能、高可用性、スケーラブルなメッセージングシステムを構築できます。
まず、Spring AMQP の依存関係がプロジェクトに導入されていることを確認してください。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
存在する application.properties
ファイルで RabbitMQ 接続情報を構成します。
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Spring では、キュー、エクスチェンジャー、バインディング関係は @Bean を通じて定義できます。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableRabbit
public class RabbitConfig {
static final String queueName = "testQueue";
static final String exchangeName = "testExchange";
@Bean
Queue queue() {
return new Queue(queueName, true);
}
@Bean
DirectExchange exchange() {
return new DirectExchange(exchangeName);
}
@Bean
Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey");
}
}
消費者はメッセージを手動で確認して、メッセージ処理の信頼性を確保できます。使用 @RabbitListener
注釈を付けるときに設定できるのは、acknowledgeMode
のためにMANUAL
を選択し、次のメソッドでメッセージを手動で確認します。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.Acknowledgment;
public class RabbitMQReceiver {
@RabbitListener(queues = "testQueue", ackMode = "MANUAL")
public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
// 处理消息
System.out.println("Received message: " + new String(message.getBody()));
// 手动确认消息
channel.basicAck(tag, false);
} catch (Exception e) {
// 拒绝消息
channel.basicNack(tag, false, true);
}
}
}
RabbitTemplate を通じてトランザクション サポートを実装します。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void sendMessage(String message) {
// 发送消息
rabbitTemplate.convertAndSend("testExchange", "testRoutingKey", message);
// 模拟事务回滚
if (message.contains("error")) {
throw new RuntimeException("Error occurred");
}
}
}
デッドレターキューとそのバインディングを設定します。
@Bean
Queue dlq() {
return new Queue("dlq", true);
}
@Bean
Binding dlqBinding() {
return BindingBuilder.bind(dlq()).to(exchange()).with("dlqRoutingKey");
}
@Bean
Queue mainQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", exchangeName);
args.put("x-dead-letter-routing-key", "dlqRoutingKey");
return new Queue("mainQueue", true, false, false, args);
}
プラグインを使用して遅延キューを実装すると、メッセージの TTL (Time To Live) を構成することで、メッセージ配信の遅延を実現できます。
@Bean
Queue delayedQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 消息的 TTL 为 60 秒
args.put("x-dead-letter-exchange", exchangeName);
args.put("x-dead-letter-routing-key", "dlqRoutingKey");
return new Queue("delayedQueue", true, false, false, args);
}
設定経由 SimpleRabbitListenerContainerFactory
同時コンシューマを実装します。
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3); // 并发消费者数量
factory.setMaxConcurrentConsumers(10);
return factory;
}
}
Shovel プラグインを使用してクラスタ間メッセージを転送したり、管理プラグインを使用して監視および管理したりするなど、RabbitMQ のプラグイン機能を活用します。