Technology Sharing

Using Redis to implement message queues: List, Pub/Sub and Stream practices

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina


Summary

Redis is a high-performance key-value storage system. Its multiple data structures make it an ideal choice for implementing message queues. This article will explore how to use Redis's List, Pub/Sub, and Stream data structures to implement an efficient message queue system.

1. Basic concepts of message queues

A message queue is a mechanism for communication between applications, allowing applications to send and receive messages in an asynchronous manner. It is used in distributed systems to decouple service components and improve the scalability and reliability of the system.

2. Advantages of Redis as a message queue

  • high performance: Redis is a memory-based operation with extremely fast reading and writing speeds.
  • Multiple data structures: Supports multiple data structures such as List, Set, Pub/Sub, etc., suitable for different usage scenarios.
  • Persistence: Supports data persistence and ensures that messages are not lost.
  • Atomic Operations: Supports transactions and atomic operations to ensure the consistency of message queue operations.

3. Use List to implement message queue

List is one of the basic data structures in Redis and can be used as a simple message queue.

3.1 Basic Operations

  • Producer:useLPUSHThe command inserts the message into the head of the List.
  • consumer:useBRPOPThe command gets messages from the tail of the List in a blocking manner.

3.2 Implementation Example

// 生产者
jedis.lpush("queue", "message");

// 消费者
String message = jedis.brpop(0, "queue");
  • 1
  • 2
  • 3
  • 4
  • 5

4. Use Pub/Sub to implement publish/subscribe mode

Pub/Sub is a message publishing and subscription model that can achieve one-to-many message delivery.

4.1 Basic Operations

  • announcer:usePUBLISHCommand publishes a message to the specified channel.
  • subscriber:useSUBSCRIBESubscribe to a channel and receive messages.

4.2 Implementation Example

// 发布者
jedis.publish("channel", "message");

// 订阅者
jedis.subscribe(new JedisPubSub() {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("Received: " + message);
    }
}, "channel");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

5. Use Stream to implement message queue

Stream is a new persistent data structure introduced in Redis 5.0, designed specifically for message queues and log functions.

5.1 Basic Operations

  • Producer:useXADDCommand to add a message to a Stream.
  • consumer:useXREADCommand to read messages from a Stream.

5.2 Implementation Example

// 生产者
String messageId = jedis.xadd("stream", StreamEntry.entry("field1", "value1"));

// 消费者
List<Map.Entry<String, String>> messages = jedis.xread(StreamsXReadParams.STREAMS.entry("stream", messageId));
  • 1
  • 2
  • 3
  • 4
  • 5

5.3 Using Lua scripts and Redis Stream to implement efficient message queues

1. Advantages of Lua scripts in Redis
  • Atomicity: Lua scripts are executed inside Redis, ensuring the atomicity of operations.
  • performance: Reduces the number of network round trips and improves execution efficiency.
  • flexibility: You can write complex logic to adapt to different business needs.
2. Use Lua scripts to operate Redis Stream
2.1 Basic Operations
  • Producer:useXADDCommand to add a message to a Stream.
  • consumer:useXREADCommand to read messages from a Stream.
  • Consumer Group:useXREADGROUPThe command implements the functionality of the consumer group.
2.2 Lua script example

The following is a simple Lua script example that implements the basic operations of producers and consumers.

-- 生产者脚本
local function produce(streamKey, message)
    local result = redis.call('XADD', streamKey, '*', 'message', message)
    return result
end

-- 消费者脚本
local function consume(streamKey, groupName, consumerName)
    local result = redis.call('XREADGROUP', 'GROUP', groupName, consumerName, 'BLOCK', 5000, 'STREAMS', streamKey, 0)
    return result
end

-- 调用脚本
local streamKey = 'myStream'
local message = 'Hello, Redis Stream!'
local groupName = 'myGroup'
local consumerName = 'myConsumer'

-- 生产消息
local messageId = produce(streamKey, message)

-- 消费消息
local messages = consume(streamKey, groupName, consumerName)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

3. Use of Consumer Groups

Consumer groups are a feature of Redis Stream that allow multiple consumer instances to coordinate their work and consume messages from the Stream together.

3.1 Create a consumer group
redis.call('XGROUP', 'CREATE', streamKey, groupName, '$', 'MKSTREAM')
  • 1
3.2 Reading of Consumer Groups
redis.call('XREADGROUP', 'GROUP', groupName, consumerName, 'BLOCK', 5000, 'STREAMS', streamKey, '>')
  • 1

4. Conclusion

Using Lua scripts and Redis Stream to implement message queues can fully utilize the high performance of Redis and the atomicity of Lua scripts to build an efficient and reliable message queue system. The consumer group feature further enhances the availability and scalability of the message queue.

5. Notes
  • Make sure Lua scripts are adequately tested before execution.
  • Considering the persistence and security of messages, configure the Redis persistence strategy reasonably.
  • In a production environment, monitor the performance and status of the message queue to ensure the stable operation of the system.
6. References

6. Conclusion

Redis provides multiple ways to implement message queues, each of which has its own applicable scenarios. List is suitable for simple queue requirements, Pub/Sub is suitable for publish/subscribe mode, and Stream provides more powerful message queue functions, including persistence, consumer groups and other features.
insert image description here

7. References