技術共有

RabbitMQ の高度な機能

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

RabbitMQ は、複数のメッセージング プロトコルをサポートし、分散システムにおける信頼性の高いメッセージングに使用できる、広く使用されているオープン ソースのメッセージ ブローカーです。 RabbitMQ は、基本的なメッセージ キュー機能に加えて、高可用性、スケーラビリティ、柔軟性の点で機能を強化する高度な機能も提供します。主な高度な機能の一部を次に示します。

1. 高可用性

  • ミラーリングされたキュー
    RabbitMQ は、キューのステータスとメッセージを複数のノードに複製することでキューの高可用性を実現するミラー キュー機能を提供します。マスター ノードに障害が発生した場合、ミラー キュー上のレプリカ ノードへのシームレスな切り替えが可能です。

  • クラスターモード
    RabbitMQ はクラスター モードで実行でき、キューとエクスチェンジャーを複数のノードに分散してシステムの可用性とスケーラビリティを向上させます。クラスター内のノードは相互に通信し、メッセージとキューのメタデータを共有できます。

2. メッセージの一貫性

  • メッセージの確認応答
    コンシューマは、処理されたメッセージを確認して、メッセージが失われないようにすることができます。メッセージが確認応答されない場合、RabbitMQ は他のコンシューマによる処理のためにメッセージをキューに戻します。

  • 取引
    RabbitMQ は、AMQP トランザクション モードをサポートしています。これにより、プロデューサーはメッセージを発行し、トランザクション内でメッセージを確認して、メッセージのアトミック性と一貫性を確保できます。

3. メッセージの耐久性

  • 永続的なメッセージ
    RabbitMQ を使用すると、ブローカーの再起動後にメッセージが失われないように、メッセージを永続的としてマークできます。永続メッセージは、メモリに保存されるだけではなく、ディスクに書き込まれます。

  • 永続的なキュー
    永続キューはブローカーの再起動後も存続し、キューのメタデータが失われないようにします。

4. 高スループットと同時実行性

  • バッチ確認応答
    コンシューマがメッセージをバッチで確認できるようにし、ネットワークと I/O のオーバーヘッドを削減し、スループットを向上させます。

  • プリフェッチ数
    プリフェッチ数を設定すると、コンシューマは、指定された数のメッセージを処理した後にキューから新しいメッセージをフェッチすることで、メッセージの同時処理を制御できます。

5. プラグインと拡張機能

  • プラグインシステム
    RabbitMQ は柔軟なプラグイン システムを提供し、ユーザーはプラグインをロードおよびアンロードして機能を強化できます。たとえば、Shovel プラグインはクラスター間でメッセージを転送するために使用され、Federation プラグインは地理的な場所に分散されたメッセージを配信するために使用されます。

  • 管理プラグイン
    キューのステータス、スイッチ構成、メッセージ レートなどの表示を含む、RabbitMQ インスタンスを監視および管理するための Web ベースのユーザー インターフェイスを提供します。

6. セキュリティ

  • TLS/SSL暗号化
    RabbitMQ は、メッセージ送信暗号化に TLS/SSL の使用をサポートし、送信中のメッセージのセキュリティを確保します。

  • アクセス制御
    RabbitMQ は、ユーザー、ロール、権限に基づいたアクセス制御メカニズムを提供し、管理者がきめ細かいアクセス権限を構成できるようにします。

7. メッセージのルーティングと交換

  • 各種取引所(取引所)
    RabbitMQ は、さまざまなメッセージ ルーティングのニーズを満たすために、ダイレクト、トピック、ファンアウト、ヘッダー スイッチなどの複数のタイプのスイッチをサポートします。

  • バインディング
    バインディングを介してキューと交換を接続し、複雑なメッセージ ルーティング戦略を実装します。

8. 監視と管理

  • 監視指標(メトリクス)
    RabbitMQ は、メッセージ レート、キューの長さ、接続数などを含む詳細な監視指標を提供し、管理者がシステムの動作状態を理解するのに役立ちます。

  • アラームと通知
    RabbitMQ は、キューの長さがしきい値を超えた場合、またはノードに障害が発生した場合に通知をトリガーするようにアラームを設定できます。

9. メッセージの再試行と配信不能キュー (再試行キューと配信不能キュー)

  • デッドレター交換とキュー
    メッセージを消費できない場合、または再試行回数を超えた場合、メッセージはデッド レター キューに転送され、さらなる処理が行われます。

  • メッセージの再試行
    消費が失敗した場合に消費を確実に再試行できるようにするメッセージ再試行戦略の構成をサポートします。

10. ハイブリッド クラウドとクロスデータセンター

  • データセンター間のレプリケーション
    RabbitMQ は、プラグインまたは手動構成を通じて、異なるデータ センター間のメッセージ レプリケーションをサポートし、高いデータ可用性と災害復旧機能を保証します。

これらの高度な機能により、RabbitMQ は、さまざまな複雑な分散システムやアプリケーション シナリオに適した、強力で柔軟なメッセージング ミドルウェアになります。これらの機能を合理的に活用することで、高性能、高可用性、スケーラブルなメッセージングシステムを構築できます。

Spring で一般的な高度な機能を実装する方法:

1. インストールと設定

まず、Spring AMQP の依存関係がプロジェクトに導入されていることを確認してください。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

存在する application.properties ファイルで RabbitMQ 接続情報を構成します。

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  • 1
  • 2
  • 3
  • 4

2. キュー、エクスチェンジャー、バインディングを宣言する

Spring では、キュー、エクスチェンジャー、バインディング関係は @Bean を通じて定義できます。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitConfig {

    static final String queueName = "testQueue";
    static final String exchangeName = "testExchange";

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

3.メッセージ確認

手動メッセージ確認

消費者はメッセージを手動で確認して、メッセージ処理の信頼性を確保できます。使用 @RabbitListener 注釈を付けるときに設定できるのは、acknowledgeMode のためにMANUALを選択し、次のメソッドでメッセージを手動で確認します。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.Acknowledgment;

public class RabbitMQReceiver {

    @RabbitListener(queues = "testQueue", ackMode = "MANUAL")
    public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            // 处理消息
            System.out.println("Received message: " + new String(message.getBody()));
            
            // 手动确认消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 拒绝消息
            channel.basicNack(tag, false, true);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

4. メッセージトランザクション

RabbitTemplate を通じてトランザクション サポートを実装します。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class RabbitMQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Transactional
    public void sendMessage(String message) {
        // 发送消息
        rabbitTemplate.convertAndSend("testExchange", "testRoutingKey", message);

        // 模拟事务回滚
        if (message.contains("error")) {
            throw new RuntimeException("Error occurred");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

5. デッドレターキュー

デッドレターキューとそのバインディングを設定します。

@Bean
Queue dlq() {
    return new Queue("dlq", true);
}

@Bean
Binding dlqBinding() {
    return BindingBuilder.bind(dlq()).to(exchange()).with("dlqRoutingKey");
}

@Bean
Queue mainQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", exchangeName);
    args.put("x-dead-letter-routing-key", "dlqRoutingKey");
    return new Queue("mainQueue", true, false, false, args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

6. 遅延キュー

プラグインを使用して遅延キューを実装すると、メッセージの TTL (Time To Live) を構成することで、メッセージ配信の遅延を実現できます。

@Bean
Queue delayedQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000); // 消息的 TTL 为 60 秒
    args.put("x-dead-letter-exchange", exchangeName);
    args.put("x-dead-letter-routing-key", "dlqRoutingKey");
    return new Queue("delayedQueue", true, false, false, args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

7. 同時利用者

設定経由 SimpleRabbitListenerContainerFactory 同時コンシューマを実装します。

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3); // 并发消费者数量
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

8. プラグインと拡張機能

Shovel プラグインを使用してクラスタ間メッセージを転送したり、管理プラグインを使用して監視および管理したりするなど、RabbitMQ のプラグイン機能を活用します。