Compartir tecnología

Tecnología de datos push del lado del servidor SSE (Server-Send-Event)

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Tecnología de datos push del lado del servidor SSE (Server-Send-Event)

¿Alguna vez se ha encontrado con una situación en la que el servidor necesita transmitir datos activamente al cliente? Actualmente existen tres soluciones.

  1. El cliente busca datos actualizados.
  2. El servidor y el cliente establecen una conexión Socket para comunicación bidireccional.
  3. El servidor establece una conexión SSE y una comunicación unidireccional con el cliente.

Comparación de varias opciones:

  1. votación:

    El cliente solicita datos del servidor a través de solicitudes frecuentes para lograr un efecto similar a las actualizaciones en tiempo real. La ventaja del sondeo es que es fácil de implementar, pero ejercerá una presión adicional sobre el servidor y la red, y el retraso será alto.

  2. Conexión WebSocket:

    El servidor establece una conexión Socket con el cliente para la transmisión de datos. El método de transmisión Socket es full-duplex. WebSocket es una conexión larga basada en TCP. En comparación con el protocolo HTTP, puede lograr una transmisión de datos liviana y de baja latencia. Es muy adecuada para escenarios de comunicación en tiempo real y se utiliza principalmente para comunicaciones bidireccionales altamente interactivas.

  3. Empuje SSE:

    SSE (Server-Sent Events) es una tecnología push basada en el protocolo HTTP que solo permite la comunicación unidireccional. En comparación con WebSocket, SSE es más simple y liviano.

Los siguientes son los pasos y el código de muestra para usar SSE con SpringBoot

  1. Dependencias de configuración

    	    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
           <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-validation</artifactId>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    SSE se ha integrado en spring-web, por lo que se puede utilizar directamente.

  2. código de fondo

    import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.validation.annotation.Validated;
    import org.springframework.web.bind.annotation.*;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    import java.util.concurrent.CompletableFuture;
    
    @RestController
    @RequestMapping("/sse")
    @Slf4j
    @Validated
    public class SseTestController {
    
        @Autowired
        private SseService service;
    
    
        @GetMapping("/testSse")
        public SseEmitter testSse(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
            final SseEmitter emitter = service.getConn(clientId);
            CompletableFuture.runAsync(() -> {
                try {
                    service.send(clientId);
                    log.info("建立连接成功!clientId = {}", clientId);
                } catch (Exception e) {
                    log.error("推送数据异常");
                }
            });
            return emitter;
        }
    
    
        @GetMapping("/sseConection")
        public SseEmitter createConnection(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
            return service.getConn(clientId);
        }
    
        @GetMapping("/sendMsg")
        public void sendMsg(@RequestParam("clientId") String clientId) {
            try {
                // 异步发送消息
                CompletableFuture.runAsync(() -> {
                    try {
                        service.send(clientId);
                    } catch (Exception e) {
                        log.error("推送数据异常");
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @GetMapping("/sendMsgToAll")
        public void sendMsgToAll() {
            try {
                //异步发送消息
                CompletableFuture.runAsync(() -> {
                    try {
                        service.sendToAll();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    
        @GetMapping("closeConn/{clientId}")
        public String closeConn(@PathVariable("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
            service.closeConn(clientId);
            return "连接已关闭";
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    package com.wry.wry_test.service;
    
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    
    public interface SseService {
    
    
        /**
         * 获取连接
         * @param clientId 客户端id
         * @return
         */
        SseEmitter getConn(String clientId);
    
        /**
         *  发送消息到指定客户端
         * @param clientId 客户端id
         * @throws Exception
         */
        void send(String clientId);
    
        /**
         * 发送消息到所有SSE客户端
         * @throws Exception
         */
        void sendToAll() throws Exception;
    
        /**
         * 关闭指定客户端的连接
         * @param clientId 客户端id
         */
        void closeConn(String clientId);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    package com.wry.wry_test.service.impl;
    
    import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    
    @Service
    @Slf4j
    public class SseServiceImpl implements SseService {
    
        private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
    
        @Override
        public SseEmitter getConn(@NotBlank String clientId) {
            final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
    
            if (sseEmitter != null) {
                return sseEmitter;
            } else {
                // 设置连接超时时间,需要配合配置项 spring.mvc.async.request-timeout: 600000 一起使用
                final SseEmitter emitter = new SseEmitter(600_000L);
                // 注册超时回调,超时后触发
                emitter.onTimeout(() -> {
                    log.info("连接已超时,正准备关闭,clientId = {}", clientId);
                    SSE_CACHE.remove(clientId);
                });
                // 注册完成回调,调用 emitter.complete() 触发
                emitter.onCompletion(() -> {
                    log.info("连接已关闭,正准备释放,clientId = {}", clientId);
                    SSE_CACHE.remove(clientId);
                    log.info("连接已释放,clientId = {}", clientId);
                });
                // 注册异常回调,调用 emitter.completeWithError() 触发
                emitter.onError(throwable -> {
                    log.error("连接已异常,正准备关闭,clientId = {}", clientId, throwable);
                    SSE_CACHE.remove(clientId);
                });
    
                SSE_CACHE.put(clientId, emitter);
                log.info("建立连接成功!clientId = {}", clientId);
                return emitter;
            }
        }
    
        /**
         * 模拟类似于 chatGPT 的流式推送回答
         *
         * @param clientId 客户端 id
         * @throws IOException 异常
         */
        @Override
        public void send(@NotBlank String clientId) {
            final SseEmitter emitter = SSE_CACHE.get(clientId);
            if (emitter == null) return;
    
            // 开始推送数据
            // todo 模拟推送数据
            for (int i = 0; i < 10000000; i++) {
                String msg = "SSE 测试数据";
                try {
                    this.sseSend(emitter, msg, clientId);
                    Thread.sleep(1000);
                } catch (Exception e) {
                    log.error("推送数据异常", e);
                    break;
                }
            }
    
            log.info("推送数据结束,clientId = {}", clientId);
            // 结束推流
            emitter.complete();
        }
    
        /**
         * 发送数据给所有连接
         */
        public void sendToAll() {
            List<SseEmitter> emitters = new ArrayList<>(SSE_CACHE.values());
            for (int i = 0; i < 10000000; i++) {
                String msg = "SSE 测试数据";
                this.sseSend(emitters, msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        @Override
        public void closeConn(@NotBlank String clientId) {
            final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
            if (sseEmitter != null) {
                sseEmitter.complete();
            }
        }
    
        /**
         * 推送数据封装
         *
         * @param emitter  sse长连接
         * @param data     发送数据
         * @param clientId 客户端id
         */
        private void sseSend(SseEmitter emitter, Object data, String clientId) {
            try {
                emitter.send(data);
                log.info("推送数据成功,clientId = {}", clientId);
            } catch (Exception e) {
                log.error("推送数据异常", e);
                throw new RuntimeException("推送数据异常");
            }
        }
    
        /**
         * 推送数据封装
         *
         * @param emitter sse长连接
         * @param data    发送数据
         */
        private void sseSend(List<SseEmitter> emitter, Object data) {
    
            emitter.forEach(e -> {
                try {
                    e.send(data);
                } catch (IOException ioException) {
                    log.error("推送数据异常", ioException);
                }
            });
            log.info("推送数据成功");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142

    El efecto de implementación es el siguiente: el servidor envía datos continuamente al front-end, y el front-end también puede llamar a la interfaz para cerrar activamente la conexión.

    imagen-20240710180401231

Escenarios aplicables: dado que SSE es una comunicación unidireccional entre el servidor y el servidor, es adecuada para conexiones que requieren persistencia unidireccional. Por ejemplo:

  • ChatGPT carga datos de sesión en tiempo real
  • Descarga de archivos, descarga archivos de forma asincrónica a través de SSE
  • Envío de datos en tiempo real del lado del servidor