Partage de technologie

Technologie de données push côté serveur SSE (Server-Send-Event)

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Technologie de données push côté serveur SSE (Server-Send-Event)

Avez-vous déjà rencontré une situation où le serveur doit transmettre activement des données au client. Il existe actuellement trois solutions.

  1. Le client interroge les données mises à jour.
  2. Le serveur et le client établissent une connexion Socket pour une communication bidirectionnelle.
  3. Le serveur établit une connexion unidirectionnelle SSE avec le client

Comparaison de plusieurs options :

  1. vote:

    Le client demande des données au serveur via des requêtes fréquentes pour obtenir un effet similaire aux mises à jour en temps réel. L’avantage du polling est qu’il est simple à mettre en œuvre, mais il mettra une pression supplémentaire sur le serveur et le réseau, et le délai sera important.

  2. Connexion WebSocket :

    Le serveur établit une connexion Socket avec le client pour la transmission des données. La méthode de transmission Socket est en duplex intégral. WebSocket est une connexion longue basée sur TCP. Par rapport au protocole HTTP, il permet une transmission de données légère et à faible latence. Il est très approprié pour les scénarios de communication en temps réel et est principalement utilisé pour une communication bidirectionnelle hautement interactive.

  3. Poussée ESS :

    SSE (Server-Sent Events) est une technologie push basée sur le protocole HTTP qui permet uniquement une communication unidirectionnelle. Comparé à WebSocket, SSE est plus simple et plus léger.

Voici les étapes et un exemple de code pour utiliser SSE avec SpringBoot

  1. Dépendances de configuration

    	    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
           <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-validation</artifactId>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    SSE a été intégré à spring-web, il peut donc être utilisé directement.

  2. code back-end

    import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.validation.annotation.Validated;
    import org.springframework.web.bind.annotation.*;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    import java.util.concurrent.CompletableFuture;
    
    @RestController
    @RequestMapping("/sse")
    @Slf4j
    @Validated
    public class SseTestController {
    
        @Autowired
        private SseService service;
    
    
        @GetMapping("/testSse")
        public SseEmitter testSse(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
            final SseEmitter emitter = service.getConn(clientId);
            CompletableFuture.runAsync(() -> {
                try {
                    service.send(clientId);
                    log.info("建立连接成功!clientId = {}", clientId);
                } catch (Exception e) {
                    log.error("推送数据异常");
                }
            });
            return emitter;
        }
    
    
        @GetMapping("/sseConection")
        public SseEmitter createConnection(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
            return service.getConn(clientId);
        }
    
        @GetMapping("/sendMsg")
        public void sendMsg(@RequestParam("clientId") String clientId) {
            try {
                // 异步发送消息
                CompletableFuture.runAsync(() -> {
                    try {
                        service.send(clientId);
                    } catch (Exception e) {
                        log.error("推送数据异常");
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @GetMapping("/sendMsgToAll")
        public void sendMsgToAll() {
            try {
                //异步发送消息
                CompletableFuture.runAsync(() -> {
                    try {
                        service.sendToAll();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    
        @GetMapping("closeConn/{clientId}")
        public String closeConn(@PathVariable("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
            service.closeConn(clientId);
            return "连接已关闭";
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    package com.wry.wry_test.service;
    
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    
    public interface SseService {
    
    
        /**
         * 获取连接
         * @param clientId 客户端id
         * @return
         */
        SseEmitter getConn(String clientId);
    
        /**
         *  发送消息到指定客户端
         * @param clientId 客户端id
         * @throws Exception
         */
        void send(String clientId);
    
        /**
         * 发送消息到所有SSE客户端
         * @throws Exception
         */
        void sendToAll() throws Exception;
    
        /**
         * 关闭指定客户端的连接
         * @param clientId 客户端id
         */
        void closeConn(String clientId);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    package com.wry.wry_test.service.impl;
    
    import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    
    @Service
    @Slf4j
    public class SseServiceImpl implements SseService {
    
        private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
    
        @Override
        public SseEmitter getConn(@NotBlank String clientId) {
            final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
    
            if (sseEmitter != null) {
                return sseEmitter;
            } else {
                // 设置连接超时时间,需要配合配置项 spring.mvc.async.request-timeout: 600000 一起使用
                final SseEmitter emitter = new SseEmitter(600_000L);
                // 注册超时回调,超时后触发
                emitter.onTimeout(() -> {
                    log.info("连接已超时,正准备关闭,clientId = {}", clientId);
                    SSE_CACHE.remove(clientId);
                });
                // 注册完成回调,调用 emitter.complete() 触发
                emitter.onCompletion(() -> {
                    log.info("连接已关闭,正准备释放,clientId = {}", clientId);
                    SSE_CACHE.remove(clientId);
                    log.info("连接已释放,clientId = {}", clientId);
                });
                // 注册异常回调,调用 emitter.completeWithError() 触发
                emitter.onError(throwable -> {
                    log.error("连接已异常,正准备关闭,clientId = {}", clientId, throwable);
                    SSE_CACHE.remove(clientId);
                });
    
                SSE_CACHE.put(clientId, emitter);
                log.info("建立连接成功!clientId = {}", clientId);
                return emitter;
            }
        }
    
        /**
         * 模拟类似于 chatGPT 的流式推送回答
         *
         * @param clientId 客户端 id
         * @throws IOException 异常
         */
        @Override
        public void send(@NotBlank String clientId) {
            final SseEmitter emitter = SSE_CACHE.get(clientId);
            if (emitter == null) return;
    
            // 开始推送数据
            // todo 模拟推送数据
            for (int i = 0; i < 10000000; i++) {
                String msg = "SSE 测试数据";
                try {
                    this.sseSend(emitter, msg, clientId);
                    Thread.sleep(1000);
                } catch (Exception e) {
                    log.error("推送数据异常", e);
                    break;
                }
            }
    
            log.info("推送数据结束,clientId = {}", clientId);
            // 结束推流
            emitter.complete();
        }
    
        /**
         * 发送数据给所有连接
         */
        public void sendToAll() {
            List<SseEmitter> emitters = new ArrayList<>(SSE_CACHE.values());
            for (int i = 0; i < 10000000; i++) {
                String msg = "SSE 测试数据";
                this.sseSend(emitters, msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        @Override
        public void closeConn(@NotBlank String clientId) {
            final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
            if (sseEmitter != null) {
                sseEmitter.complete();
            }
        }
    
        /**
         * 推送数据封装
         *
         * @param emitter  sse长连接
         * @param data     发送数据
         * @param clientId 客户端id
         */
        private void sseSend(SseEmitter emitter, Object data, String clientId) {
            try {
                emitter.send(data);
                log.info("推送数据成功,clientId = {}", clientId);
            } catch (Exception e) {
                log.error("推送数据异常", e);
                throw new RuntimeException("推送数据异常");
            }
        }
    
        /**
         * 推送数据封装
         *
         * @param emitter sse长连接
         * @param data    发送数据
         */
        private void sseSend(List<SseEmitter> emitter, Object data) {
    
            emitter.forEach(e -> {
                try {
                    e.send(data);
                } catch (IOException ioException) {
                    log.error("推送数据异常", ioException);
                }
            });
            log.info("推送数据成功");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142

    L'effet de mise en œuvre est le suivant : le serveur pousse continuellement les données vers le front-end, et le front-end peut également appeler l'interface pour fermer activement la connexion.

    image-20240710180401231

Scénarios applicables : étant donné que SSE est une communication unidirectionnelle côté serveur, il convient aux connexions qui nécessitent une persistance unidirectionnelle. Par exemple:

  • ChatGPT charge les données de session en temps réel
  • Téléchargement de fichiers, téléchargement de fichiers de manière asynchrone via SSE
  • Transmission de données en temps réel côté serveur