SSE (Server-Sent Events): server-side data push technology

2024-07-12


Have you ever needed the server to push data to the client on its own initiative? There are three common solutions:

  1. The client polls the server for updated data.
  2. The server and the client establish a WebSocket connection for two-way communication.
  3. The server and the client establish an SSE connection for one-way communication.

Comparison of the three solutions:

  1. Polling:

    The client sends requests to the server at short intervals to approximate real-time updates. Polling is simple to implement, but it puts extra load on the server and the network, and latency is higher because an update is only seen on the next poll.

  2. WebSocket connection:

    The server and the client establish a WebSocket connection for data transmission. The connection is full-duplex and long-lived, and it runs over TCP. Compared with repeated HTTP requests, it has lower per-message overhead and lower latency. It is well suited to real-time communication and is mainly used for highly interactive two-way communication.

  3. SSE push:

    SSE (Server-Sent Events) is a push technology built on HTTP that only allows one-way communication, from the server to the client. Compared with WebSocket, SSE is simpler and more lightweight.
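
To make "push over HTTP" more concrete, here is a minimal sketch of what the server writes on an SSE connection. The response has the content type `text/event-stream`, and each event is a group of `field: value` lines terminated by a blank line. The class and method names (`SseFrame`, `format`) are hypothetical and only for illustration; in the Spring Boot code later in this article, `SseEmitter` produces this framing for you.

```java
/** Hypothetical helper that renders one event in the text/event-stream format. */
final class SseFrame {

    /**
     * Builds a single SSE event.
     *
     * @param eventName optional event name (may be null)
     * @param id        optional event id (may be null)
     * @param data      payload; each line of it becomes its own "data:" line
     */
    static String format(String eventName, String id, String data) {
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id: ").append(id).append('\n');
        }
        if (eventName != null) {
            sb.append("event: ").append(eventName).append('\n');
        }
        // A multi-line payload must be split, one "data:" field per line
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // The blank line tells the client that the event is complete
        sb.append('\n');
        return sb.toString();
    }
}
```

For example, `SseFrame.format(null, null, "hello")` yields `data: hello` followed by an empty line. Because the stream is plain text over an ordinary HTTP response, no protocol upgrade is needed, which is a large part of why SSE is simpler than WebSocket.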

Here are the steps and sample code for using SSE in Spring Boot:

  1. Configuration Dependencies

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
    

    SSE support (the SseEmitter class) ships with Spring MVC, which spring-boot-starter-web already pulls in, so it can be used directly without an extra dependency.

  2. Backend code

    import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.validation.annotation.Validated;
    import org.springframework.web.bind.annotation.*;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    import java.util.concurrent.CompletableFuture;
    
    @RestController
    @RequestMapping("/sse")
    @Slf4j
    @Validated
    public class SseTestController {
    
        @Autowired
        private SseService service;
    
    
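        @GetMapping("/testSse")
        public SseEmitter testSse(@RequestParam("clientId") @NotBlank(message = "Client id must not be blank") String clientId) {
            final SseEmitter emitter = service.getConn(clientId);
            // Push on a background thread so the emitter can be returned immediately
            CompletableFuture.runAsync(() -> {
                try {
                    // Log before send(): send() blocks until the stream ends
                    log.info("Connection established, clientId = {}", clientId);
                    service.send(clientId);
                } catch (Exception e) {
                    log.error("Failed to push data, clientId = {}", clientId, e);
                }
            });
            return emitter;
        }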
    
    
        @GetMapping("/sseConection")
        public SseEmitter createConnection(@RequestParam("clientId") @NotBlank(message = "Client id must not be blank") String clientId) {
            return service.getConn(clientId);
        }
    
        @GetMapping("/sendMsg")
        public void sendMsg(@RequestParam("clientId") String clientId) {
            // Send the messages asynchronously
            CompletableFuture.runAsync(() -> {
                try {
                    service.send(clientId);
                } catch (Exception e) {
                    log.error("Failed to push data, clientId = {}", clientId, e);
                }
            });
        }
    
        @GetMapping("/sendMsgToAll")
        public void sendMsgToAll() {
            // Send the messages asynchronously
            CompletableFuture.runAsync(() -> {
                try {
                    service.sendToAll();
                } catch (Exception e) {
                    log.error("Failed to push data to all clients", e);
                }
            });
        }
    
    
        @GetMapping("closeConn/{clientId}")
        public String closeConn(@PathVariable("clientId") @NotBlank(message = "Client id must not be blank") String clientId) {
            service.closeConn(clientId);
            return "Connection closed";
        }
    
    
    }
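
One design point about the controller above: `CompletableFuture.runAsync(Runnable)` without an executor argument runs the task on the shared `ForkJoinPool.commonPool()`, and each push loop here sleeps between messages for a long time. The following is a minimal sketch, under the assumption that you would rather run push loops on a dedicated pool. `PushExecutor`, `newPushPool`, `submit`, and the thread name prefix are hypothetical names, not part of the original project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that keeps long-running push loops off the common pool. */
final class PushExecutor {

    /** Creates a fixed pool of daemon threads named sse-push-1, sse-push-2, ... */
    static ExecutorService newPushPool(int threads) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(threads, task -> {
            Thread t = new Thread(task, "sse-push-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
    }

    /** Runs the push loop on the given pool and reports a failure instead of losing it. */
    static CompletableFuture<Void> submit(Executor pool, Runnable pushLoop) {
        return CompletableFuture.runAsync(pushLoop, pool)
                .exceptionally(ex -> {
                    System.err.println("push failed: " + ex);
                    return null;
                });
    }
}
```

In the controller, a call such as `PushExecutor.submit(pool, () -> service.send(clientId))` could then stand in for the bare `runAsync` call. The pool size caps how many clients can be streamed to at the same time, so it has to be chosen with the expected number of connections in mind.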
    
    
    package com.wry.wry_test.service;
    
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    public interface SseService {
    
        /**
         * Gets the connection for a client, creating it if necessary.
         *
         * @param clientId client id
         * @return the SSE emitter bound to this client
         */
        SseEmitter getConn(String clientId);
    
        /**
         * Sends messages to the specified client.
         *
         * @param clientId client id
         */
        void send(String clientId);
    
        /**
         * Sends messages to all SSE clients.
         *
         * @throws Exception if pushing fails
         */
        void sendToAll() throws Exception;
    
        /**
         * Closes the connection of the specified client.
         *
         * @param clientId client id
         */
        void closeConn(String clientId);
    }
    
    
    package com.wry.wry_test.service.impl;
    
    import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    
    @Service
    @Slf4j
    public class SseServiceImpl implements SseService {
    
        private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
    
        @Override
        public SseEmitter getConn(@NotBlank String clientId) {
            final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
    
            if (sseEmitter != null) {
                return sseEmitter;
            } else {
                // Connection timeout in milliseconds (10 minutes); used together with
                // the configuration property spring.mvc.async.request-timeout: 600000
                final SseEmitter emitter = new SseEmitter(600_000L);
                // Timeout callback, fired when the connection times out
                emitter.onTimeout(() -> {
                    log.info("Connection timed out, closing, clientId = {}", clientId);
                    SSE_CACHE.remove(clientId);
                });
                // Completion callback, fired when the connection completes,
                // for example after emitter.complete() is called
                emitter.onCompletion(() -> {
                    log.info("Connection closed, releasing, clientId = {}", clientId);
                    SSE_CACHE.remove(clientId);
                    log.info("Connection released, clientId = {}", clientId);
                });
                // Error callback, fired when the connection fails,
                // for example after emitter.completeWithError() is called
                emitter.onError(throwable -> {
                    log.error("Connection error, closing, clientId = {}", clientId, throwable);
                    SSE_CACHE.remove(clientId);
                });
    
                SSE_CACHE.put(clientId, emitter);
                log.info("Connection established, clientId = {}", clientId);
                return emitter;
            }
        }
    
        /**
         * Simulates a ChatGPT-style streamed answer.
         *
         * @param clientId client id
         */
        @Override
        public void send(@NotBlank String clientId) {
            final SseEmitter emitter = SSE_CACHE.get(clientId);
            if (emitter == null) return;
    
            // Start pushing data
            // TODO: simulated payload, one message per second
            for (int i = 0; i < 10000000; i++) {
                String msg = "SSE test data";
                try {
                    this.sseSend(emitter, msg, clientId);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop pushing
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    log.error("Failed to push data, clientId = {}", clientId, e);
                    break;
                }
            }
    
            log.info("Finished pushing data, clientId = {}", clientId);
            // End the stream
            emitter.complete();
        }
    
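        /**
         * Sends data to all connections.
         */
        @Override
        public void sendToAll() {
            // Snapshot of the emitters that are connected when the call starts
            List<SseEmitter> emitters = new ArrayList<>(SSE_CACHE.values());
            for (int i = 0; i < 10000000; i++) {
                String msg = "SSE test data";
                this.sseSend(emitters, msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop pushing
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }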
    
        @Override
        public void closeConn(@NotBlank String clientId) {
            final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
            if (sseEmitter != null) {
                sseEmitter.complete();
            }
        }
    
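        /**
         * Pushes data to a single connection.
         *
         * @param emitter  SSE long-lived connection
         * @param data     data to send
         * @param clientId client id
         */
        private void sseSend(SseEmitter emitter, Object data, String clientId) {
            try {
                emitter.send(data);
                log.info("Data pushed, clientId = {}", clientId);
            } catch (Exception e) {
                log.error("Failed to push data, clientId = {}", clientId, e);
                throw new RuntimeException("Failed to push data", e);
            }
        }
    
        /**
         * Pushes data to several connections.
         *
         * @param emitters SSE long-lived connections
         * @param data     data to send
         */
        private void sseSend(List<SseEmitter> emitters, Object data) {
    
            emitters.forEach(e -> {
                try {
                    e.send(data);
                } catch (IOException | IllegalStateException ex) {
                    // IllegalStateException: the emitter has already completed
                    log.error("Failed to push data", ex);
                }
            });
            log.info("Finished pushing data to {} connection(s)", emitters.size());
        }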
    
    }
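
A note on the connection cache: `getConn` above reads the map and then writes to it in two separate steps, and the callbacks remove entries by client id only. The sketch below shows one possible alternative using only `ConcurrentHashMap` operations from the JDK. `ConnectionRegistry`, `getOrCreate`, and `release` are hypothetical names, and the generic type `C` stands in for `SseEmitter` so that the example runs without Spring.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical registry that maps a client id to its connection object. */
final class ConnectionRegistry<C> {

    private final Map<String, C> cache = new ConcurrentHashMap<>();
    private final Function<String, C> factory;

    ConnectionRegistry(Function<String, C> factory) {
        this.factory = factory;
    }

    /** Returns the cached connection, creating it atomically if it is absent. */
    C getOrCreate(String clientId) {
        return cache.computeIfAbsent(clientId, factory);
    }

    /**
     * Removes the entry only if it still points at the given connection, so a
     * late callback from an old connection cannot evict a newer one.
     */
    boolean release(String clientId, C connection) {
        return cache.remove(clientId, connection);
    }

    int size() {
        return cache.size();
    }
}
```

With this shape, the timeout, completion, and error callbacks would call `release(clientId, emitter)` rather than removing by id alone. Whether that extra safety is needed depends on how often clients reconnect under the same id.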
    

    In effect, the server keeps pushing data to the front end, and the front end can call the closeConn endpoint to close the connection on demand.
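
The article does not show the receiving side, so here is a minimal sketch of how a client could turn the raw lines of a `text/event-stream` response into messages. `SseParser` and `parse` are hypothetical names. The sketch only handles `data:` fields and comment lines and ignores `event:`, `id:`, and `retry:`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal parser for the lines of a text/event-stream body. */
final class SseParser {

    /** Returns the data payload of every complete event found in the lines. */
    static List<String> parse(Iterable<String> lines) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : lines) {
            if (line.isEmpty()) {
                // A blank line ends the current event
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith(":")) {
                // Comment line, often used as a keep-alive; ignored
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // A single leading space after the colon is not part of the value
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                if (hasData) {
                    data.append('\n');
                }
                data.append(value);
                hasData = true;
            }
            // Other fields (event, id, retry) are skipped in this sketch
        }
        return events;
    }
}
```

An event that is not yet terminated by a blank line is left out of the result. In a browser, the built-in `EventSource` object does this parsing for you. A Java client could feed the parser with the lines of a streaming HTTP response from the `/sse/sseConection` endpoint.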


Applicable scenarios: because SSE is one-way communication from the server to the client, it suits cases that need a persistent one-way connection. For example:

  • ChatGPT-style loading of conversation data in real time
  • File downloads, where files are downloaded asynchronously via SSE
  • Real-time data push from the server