Κοινή χρήση τεχνολογίας

Προηγμένες λειτουργίες RabbitMQ

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Το RabbitMQ είναι ένας ευρέως χρησιμοποιούμενος μεσίτης μηνυμάτων ανοιχτού κώδικα που υποστηρίζει πολλαπλά πρωτόκολλα ανταλλαγής μηνυμάτων και μπορεί να χρησιμοποιηθεί για αξιόπιστη ανταλλαγή μηνυμάτων σε κατανεμημένα συστήματα. Εκτός από τη βασική λειτουργία ουράς μηνυμάτων, το RabbitMQ παρέχει επίσης προηγμένες δυνατότητες που ενισχύουν τις δυνατότητές του όσον αφορά την υψηλή διαθεσιμότητα, την επεκτασιμότητα και την ευελιξία. Εδώ είναι μερικά από τα κύρια προηγμένα χαρακτηριστικά:

1. Υψηλή διαθεσιμότητα

  • Καθρέφτες Ουρές
    Το RabbitMQ παρέχει μια λειτουργία κατοπτρικής ουράς για την επίτευξη υψηλής διαθεσιμότητας της ουράς αναπαράγοντας την κατάσταση και τα μηνύματα της ουράς σε πολλούς κόμβους. Εάν ο κύριος κόμβος αποτύχει, είναι δυνατή μια απρόσκοπτη μετάβαση στον κόμβο αντιγράφου στην ουρά κατοπτρισμού.

  • Λειτουργία συμπλέγματος
    Το RabbitMQ μπορεί να τρέξει σε λειτουργία συμπλέγματος, διανέμοντας ουρές και εναλλάκτες σε πολλούς κόμβους για να βελτιώσει τη διαθεσιμότητα και την επεκτασιμότητα του συστήματος. Οι κόμβοι στο σύμπλεγμα μπορούν να επικοινωνούν μεταξύ τους και να μοιράζονται μεταδεδομένα μηνυμάτων και ουράς.

2. Συνέπεια μηνύματος

  • Ευχαριστίες μηνυμάτων
    Οι καταναλωτές μπορούν να αναγνωρίσουν τα επεξεργασμένα μηνύματα για να διασφαλίσουν ότι δεν θα χαθούν. Εάν το μήνυμα δεν επιβεβαιωθεί, το RabbitMQ το επαναφέρει στην ουρά για επεξεργασία από άλλους καταναλωτές.

  • Συναλλαγές
    Το RabbitMQ υποστηρίζει τη λειτουργία συναλλαγής AMQP, η οποία επιτρέπει στους παραγωγούς να δημοσιεύουν μηνύματα και να επιβεβαιώνουν μηνύματα σε μια συναλλαγή για να διασφαλίζουν την ατομικότητα και τη συνέπεια των μηνυμάτων.

3. Ανθεκτικότητα μηνύματος

  • Επίμονα μηνύματα
    Το RabbitMQ επιτρέπει στα μηνύματα να επισημαίνονται ως μόνιμα για να διασφαλιστεί ότι τα μηνύματα δεν θα χαθούν μετά την επανεκκίνηση του μεσίτη. Τα μόνιμα μηνύματα εγγράφονται στο δίσκο αντί να αποθηκεύονται απλώς στη μνήμη.

  • Ανθεκτικές ουρές
    Η μόνιμη ουρά παραμένει μετά την επανεκκίνηση του μεσίτη, διασφαλίζοντας ότι τα μεταδεδομένα της ουράς δεν θα χαθούν.

4. Υψηλή απόδοση και συγχρονισμός

  • Ευχαριστίες παρτίδας
    Επιτρέψτε στους καταναλωτές να επιβεβαιώνουν τα μηνύματα σε παρτίδες, να μειώσουν τα έξοδα δικτύου και I/O και να βελτιώσουν την απόδοση.

  • Καταμέτρηση προφόρτωσης
    Ρυθμίζοντας το πλήθος προφόρτωσης, ο καταναλωτής μπορεί να ελέγξει την ταυτόχρονη επεξεργασία των μηνυμάτων λαμβάνοντας νέα μηνύματα από την ουρά μετά την επεξεργασία ενός καθορισμένου αριθμού μηνυμάτων.

5. Πρόσθετα και επεκτάσεις

  • Σύστημα πρόσθετων
    Το RabbitMQ παρέχει ένα ευέλικτο σύστημα προσθήκης και οι χρήστες μπορούν να φορτώνουν και να ξεφορτώνουν πρόσθετα για να αυξήσουν τη λειτουργικότητα. Για παράδειγμα, το πρόσθετο Shovel χρησιμοποιείται για την προώθηση μηνυμάτων σε συμπλέγματα και το πρόσθετο Federation χρησιμοποιείται για την παράδοση μηνυμάτων που διανέμονται σε γεωγραφικές τοποθεσίες.

  • Προσθήκη διαχείρισης
    Παρέχει μια διεπαφή χρήστη βασισμένη στον ιστό για την παρακολούθηση και τη διαχείριση περιπτώσεων RabbitMQ, συμπεριλαμβανομένης της προβολής της κατάστασης ουράς, της διαμόρφωσης διακόπτη, του ρυθμού μηνυμάτων κ.λπ.

6. Ασφάλεια

  • Κρυπτογράφηση TLS/SSL
    Το RabbitMQ υποστηρίζει τη χρήση TLS/SSL για κρυπτογράφηση μετάδοσης μηνυμάτων για να διασφαλιστεί η ασφάλεια των μηνυμάτων κατά τη μετάδοση.

  • Έλεγχος πρόσβασης
    Το RabbitMQ παρέχει έναν μηχανισμό ελέγχου πρόσβασης που βασίζεται σε χρήστες, ρόλους και δικαιώματα, επιτρέποντας στους διαχειριστές να διαμορφώνουν λεπτομερείς άδειες πρόσβασης.

7. Δρομολόγηση και ανταλλαγή μηνυμάτων

  • Διαφορετικοί τύποι ανταλλαγών (Ανταλλαγές)
    Το RabbitMQ υποστηρίζει πολλαπλούς τύπους διακοπτών, συμπεριλαμβανομένων των μεταγωγέων Direct, Topic, Fanout και Headers για την κάλυψη διαφορετικών αναγκών δρομολόγησης μηνυμάτων.

  • Δεσίματα
    Συνδέστε ουρές και ανταλλαγές μέσω δεσμεύσεων για να εφαρμόσετε σύνθετες στρατηγικές δρομολόγησης μηνυμάτων.

8. Παρακολούθηση και Διαχείριση

  • Δείκτες παρακολούθησης (μετρήσεις)
    Το RabbitMQ παρέχει λεπτομερείς ενδείξεις παρακολούθησης, συμπεριλαμβανομένου του ρυθμού μηνυμάτων, του μήκους της ουράς, του αριθμού των συνδέσεων κ.λπ., για να βοηθήσει τους διαχειριστές να κατανοήσουν την κατάσταση λειτουργίας του συστήματος.

  • Συναγερμοί και Ειδοποιήσεις
    Το RabbitMQ μπορεί να διαμορφώσει συναγερμούς για να ενεργοποιούν ειδοποιήσεις όταν το μήκος της ουράς υπερβαίνει ένα όριο ή όταν ένας κόμβος αποτυγχάνει.

9. Επανάληψη μηνυμάτων και ουρές νεκρών γραμμάτων (Επανάληψη και ουρές νεκρών γραμμάτων)

  • Ανταλλαγές νεκρών επιστολών και ουρές
    Όταν ένα μήνυμα δεν μπορεί να καταναλωθεί ή έχει ξεπεραστεί ο αριθμός των επαναλήψεων, μπορεί να προωθηθεί στην ουρά νεκρών γραμμάτων για περαιτέρω επεξεργασία.

  • Μήνυμα Επανάληψη
    Υποστηρίζει τη διαμόρφωση στρατηγικών επανάληψης μηνυμάτων για να διασφαλιστεί ότι η κατανάλωση μπορεί να δοκιμαστεί ξανά όταν η κατανάλωση αποτυγχάνει.

10. Hybrid Cloud και Cross-Datacenter

  • Αντιγραφή πολλαπλών κέντρων δεδομένων
    Μέσω προσθηκών ή μη αυτόματης διαμόρφωσης, το RabbitMQ υποστηρίζει την αναπαραγωγή μηνυμάτων μεταξύ διαφορετικών κέντρων δεδομένων για να εξασφαλίσει υψηλή διαθεσιμότητα δεδομένων και δυνατότητες ανάκτησης από καταστροφές.

Αυτά τα προηγμένα χαρακτηριστικά καθιστούν το RabbitMQ ένα ισχυρό και ευέλικτο ενδιάμεσο λογισμικό ανταλλαγής μηνυμάτων, κατάλληλο για διάφορα πολύπλοκα κατανεμημένα συστήματα και σενάρια εφαρμογών. Χρησιμοποιώντας ορθολογικά αυτές τις λειτουργίες, μπορεί να κατασκευαστεί ένα σύστημα ανταλλαγής μηνυμάτων υψηλής απόδοσης, υψηλής διαθεσιμότητας και κλιμάκωσης.

Πώς να εφαρμόσετε κοινές προηγμένες λειτουργίες την Άνοιξη:

1. Εγκατάσταση και διαμόρφωση

Αρχικά, βεβαιωθείτε ότι έχετε εισαγάγει την εξάρτηση Spring AMQP στο έργο σας:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

υπάρχει application.properties Διαμορφώστε τις πληροφορίες σύνδεσης RabbitMQ στο αρχείο:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  • 1
  • 2
  • 3
  • 4

2. Δηλώστε ουρές, εναλλάκτες και δεσμεύσεις

Την Άνοιξη, οι ουρές, οι εναλλάκτες και οι δεσμευτικές σχέσεις μπορούν να οριστούν μέσω του @Bean:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitConfig {

    static final String queueName = "testQueue";
    static final String exchangeName = "testExchange";

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

3. Επιβεβαίωση μηνύματος

Μη αυτόματη επιβεβαίωση μηνύματος

Οι καταναλωτές μπορούν να επιβεβαιώσουν χειροκίνητα τα μηνύματα για να εξασφαλίσουν την αξιοπιστία της επεξεργασίας των μηνυμάτων.χρήση@RabbitListener Όταν σχολιάζετε, μπορείτε να διαμορφώσετεacknowledgeMode ΓιαMANUALκαι επιβεβαιώστε το μήνυμα με μη αυτόματο τρόπο στη μέθοδο:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.Acknowledgment;

public class RabbitMQReceiver {

    @RabbitListener(queues = "testQueue", ackMode = "MANUAL")
    public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            // 处理消息
            System.out.println("Received message: " + new String(message.getBody()));
            
            // 手动确认消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 拒绝消息
            channel.basicNack(tag, false, true);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

4. Συναλλαγή μηνύματος

Υλοποιήστε υποστήριξη συναλλαγών μέσω του RabbitTemplate:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class RabbitMQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Transactional
    public void sendMessage(String message) {
        // 发送消息
        rabbitTemplate.convertAndSend("testExchange", "testRoutingKey", message);

        // 模拟事务回滚
        if (message.contains("error")) {
            throw new RuntimeException("Error occurred");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

5. Ουρά νεκρών επιστολών

Διαμορφώστε την ουρά νεκρών γραμμάτων και τις δεσμεύσεις της:

@Bean
Queue dlq() {
    return new Queue("dlq", true);
}

@Bean
Binding dlqBinding() {
    return BindingBuilder.bind(dlq()).to(exchange()).with("dlqRoutingKey");
}

@Bean
Queue mainQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", exchangeName);
    args.put("x-dead-letter-routing-key", "dlqRoutingKey");
    return new Queue("mainQueue", true, false, false, args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

6. Ουρά καθυστέρησης

Χρησιμοποιήστε μια προσθήκη για να εφαρμόσετε μια ουρά καθυστέρησης και μπορείτε να επιτύχετε καθυστερημένη παράδοση μηνύματος διαμορφώνοντας το TTL (Time To Live) του μηνύματος:

@Bean
Queue delayedQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000); // 消息的 TTL 为 60 秒
    args.put("x-dead-letter-exchange", exchangeName);
    args.put("x-dead-letter-routing-key", "dlqRoutingKey");
    return new Queue("delayedQueue", true, false, false, args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

7. Ταυτόχρονα καταναλωτές

μέσω διαμόρφωσης SimpleRabbitListenerContainerFactory Εφαρμογή ταυτόχρονων καταναλωτών:

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3); // 并发消费者数量
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

8. Προσθήκες και επεκτάσεις

Επωφεληθείτε από τις λειτουργίες plug-in του RabbitMQ, όπως η χρήση της προσθήκης Shovel για την προώθηση μηνυμάτων πολλαπλών συστάδων ή η χρήση της προσθήκης διαχείρισης για παρακολούθηση και διαχείριση.