Berbagi teknologi

Fitur lanjutan RabbitMQ

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

RabbitMQ adalah broker pesan sumber terbuka yang banyak digunakan yang mendukung banyak protokol pengiriman pesan dan dapat digunakan untuk pengiriman pesan yang andal dalam sistem terdistribusi. Selain fungsionalitas antrian pesan dasar, RabbitMQ juga menyediakan fitur-fitur canggih yang meningkatkan kemampuannya dalam hal ketersediaan tinggi, skalabilitas, dan fleksibilitas. Berikut adalah beberapa fitur lanjutan utama:

1. Ketersediaan Tinggi

  • Antrian yang Dicerminkan
    RabbitMQ menyediakan fungsi antrian cermin untuk mencapai ketersediaan antrian yang tinggi dengan mereplikasi status dan pesan antrian ke beberapa node. Jika node master gagal, peralihan mulus ke node replika pada antrian cermin dapat dilakukan.

  • Modus Klaster
    RabbitMQ dapat berjalan dalam mode cluster, mendistribusikan antrian dan penukar di beberapa node untuk meningkatkan ketersediaan dan skalabilitas sistem. Node di cluster dapat berkomunikasi satu sama lain dan berbagi metadata pesan dan antrian.

2. Konsistensi Pesan

  • Ucapan Terima Kasih
    Konsumen dapat mengetahui pesan yang diproses untuk memastikan pesan tersebut tidak hilang. Jika pesan tidak diterima, RabbitMQ mengembalikannya ke antrean untuk diproses oleh konsumen lain.

  • Transaksi
    RabbitMQ mendukung mode transaksi AMQP, yang memungkinkan produsen mempublikasikan pesan dan mengonfirmasi pesan dalam suatu transaksi untuk memastikan atomisitas dan konsistensi pesan.

3. Daya Tahan Pesan

  • Pesan yang Persisten
    RabbitMQ memungkinkan pesan ditandai sebagai persisten untuk memastikan bahwa pesan tidak hilang setelah broker dimulai ulang. Pesan persisten ditulis ke disk, bukan hanya disimpan di memori.

  • Antrian Tahan Lama
    Antrean persisten tetap ada setelah broker dimulai ulang, memastikan bahwa metadata antrean tidak hilang.

4. Throughput dan Konkurensi Tinggi

  • Ucapan Terima Kasih Batch
    Memungkinkan konsumen mengonfirmasi pesan dalam batch, mengurangi overhead jaringan dan I/O, serta meningkatkan throughput.

  • Jumlah Pengambilan Awal
    Dengan mengatur jumlah prefetch, konsumen dapat mengontrol pemrosesan pesan secara bersamaan dengan mengambil pesan baru dari antrian setelah memproses sejumlah pesan tertentu.

5. Plugin dan Ekstensi

  • Sistem Pengaya
    RabbitMQ menyediakan sistem plug-in yang fleksibel, dan pengguna dapat memuat dan membongkar plug-in untuk meningkatkan fungsionalitas. Misalnya, plugin Shovel digunakan untuk meneruskan pesan antar cluster, dan plugin Federasi digunakan untuk mengirimkan pesan yang didistribusikan ke seluruh lokasi geografis.

  • Plugin Manajemen
    Menyediakan antarmuka pengguna berbasis web untuk memantau dan mengelola instans RabbitMQ, termasuk melihat status antrean, konfigurasi sakelar, kecepatan pesan, dll.

6. Keamanan

  • Enkripsi TLS/SSL
    RabbitMQ mendukung penggunaan TLS/SSL untuk enkripsi transmisi pesan guna menjamin keamanan pesan selama transmisi.

  • Kontrol akses
    RabbitMQ menyediakan mekanisme kontrol akses berdasarkan pengguna, peran, dan izin, memungkinkan administrator untuk mengonfigurasi izin akses yang terperinci.

7. Perutean dan Pertukaran Pesan

  • Berbagai jenis bursa (Exchange)
    RabbitMQ mendukung berbagai jenis sakelar, termasuk sakelar Langsung, Topik, Fanout, dan Header untuk memenuhi kebutuhan perutean pesan yang berbeda.

  • Binding
    Hubungkan antrian dan pertukaran melalui pengikatan untuk menerapkan strategi perutean pesan yang kompleks.

8. Pemantauan dan Pengelolaan

  • Indikator pemantauan (Metrik)
    RabbitMQ menyediakan indikator pemantauan terperinci, termasuk kecepatan pesan, panjang antrian, jumlah koneksi, dll., untuk membantu administrator memahami status pengoperasian sistem.

  • Alarm dan Pemberitahuan
    RabbitMQ dapat mengonfigurasi alarm untuk memicu pemberitahuan ketika panjang antrian melebihi ambang batas atau sebuah node gagal.

9. Percobaan ulang pesan dan antrian surat mati (Retry and Dead-Letter Queues)

  • Pertukaran Surat Mati dan Antrian
    Ketika sebuah pesan tidak dapat digunakan atau jumlah percobaan ulang terlampaui, pesan tersebut dapat diteruskan ke antrian surat mati untuk diproses lebih lanjut.

  • Coba Ulang Pesan
    Mendukung konfigurasi strategi percobaan ulang pesan untuk memastikan bahwa konsumsi dapat dicoba ulang ketika konsumsi gagal.

10. Cloud Hibrid dan Pusat Data Lintas

  • Replikasi Lintas Pusat Data
    Melalui plug-in atau konfigurasi manual, RabbitMQ mendukung replikasi pesan antara pusat data yang berbeda, memastikan ketersediaan data yang tinggi dan kemampuan pemulihan bencana.

Fitur-fitur canggih ini menjadikan RabbitMQ sebagai middleware perpesanan yang kuat dan fleksibel, cocok untuk berbagai sistem terdistribusi dan skenario aplikasi yang kompleks. Dengan memanfaatkan fungsi-fungsi ini secara rasional, sistem pesan berkinerja tinggi, ketersediaan tinggi, dan skalabel dapat dibangun.

Cara mengimplementasikan fungsi-fungsi lanjutan umum di Spring:

1. Instalasi dan konfigurasi

Pertama, pastikan Anda telah memperkenalkan ketergantungan Spring AMQP ke dalam proyek Anda:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

ada application.properties Konfigurasikan informasi koneksi RabbitMQ dalam file:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  • 1
  • 2
  • 3
  • 4

2. Deklarasikan antrian, penukar dan pengikatan

Di Spring, antrian, penukar, dan hubungan yang mengikat dapat ditentukan melalui @Bean:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitConfig {

    static final String queueName = "testQueue";
    static final String exchangeName = "testExchange";

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

3. Konfirmasi pesan

Konfirmasi pesan manual

Konsumen dapat mengkonfirmasi pesan secara manual untuk memastikan keandalan pemrosesan pesan.menggunakan@RabbitListener Saat membuat anotasi, Anda dapat mengonfigurasiacknowledgeMode untukMANUAL, dan konfirmasikan pesan secara manual dengan metode:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.Acknowledgment;

public class RabbitMQReceiver {

    @RabbitListener(queues = "testQueue", ackMode = "MANUAL")
    public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            // 处理消息
            System.out.println("Received message: " + new String(message.getBody()));
            
            // 手动确认消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 拒绝消息
            channel.basicNack(tag, false, true);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

4. Transaksi pesan

Menerapkan dukungan transaksi melalui RabbitTemplate:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class RabbitMQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Transactional
    public void sendMessage(String message) {
        // 发送消息
        rabbitTemplate.convertAndSend("testExchange", "testRoutingKey", message);

        // 模拟事务回滚
        if (message.contains("error")) {
            throw new RuntimeException("Error occurred");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

5. Antrian surat mati

Konfigurasikan antrian surat mati dan pengikatannya:

@Bean
Queue dlq() {
    return new Queue("dlq", true);
}

@Bean
Binding dlqBinding() {
    return BindingBuilder.bind(dlq()).to(exchange()).with("dlqRoutingKey");
}

@Bean
Queue mainQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", exchangeName);
    args.put("x-dead-letter-routing-key", "dlqRoutingKey");
    return new Queue("mainQueue", true, false, false, args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

6. Penundaan antrian

Gunakan plugin untuk menerapkan antrian penundaan, dan Anda dapat mencapai penundaan pengiriman pesan dengan mengonfigurasi TTL (Time To Live) pesan:

@Bean
Queue delayedQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000); // 消息的 TTL 为 60 秒
    args.put("x-dead-letter-exchange", exchangeName);
    args.put("x-dead-letter-routing-key", "dlqRoutingKey");
    return new Queue("delayedQueue", true, false, false, args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

7. Konsumen serentak

melalui konfigurasi SimpleRabbitListenerContainerFactory Menerapkan konsumen bersamaan:

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3); // 并发消费者数量
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

8. Plug-in dan Ekstensi

Manfaatkan fungsi plug-in RabbitMQ, seperti menggunakan plug-in Shovel untuk meneruskan pesan lintas cluster, atau menggunakan Plugin Manajemen untuk pemantauan dan manajemen.