2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
Avez-vous déjà rencontré une situation où le serveur doit transmettre activement des données au client. Il existe actuellement trois solutions.
Comparaison de plusieurs options :
vote:
Le client demande des données au serveur via des requêtes fréquentes pour obtenir un effet similaire aux mises à jour en temps réel. L’avantage du polling est qu’il est simple à mettre en œuvre, mais il mettra une pression supplémentaire sur le serveur et le réseau, et le délai sera important.
Connexion WebSocket :
Le serveur établit une connexion Socket avec le client pour la transmission des données. La méthode de transmission Socket est en duplex intégral. WebSocket est une connexion longue basée sur TCP. Par rapport au protocole HTTP, il permet une transmission de données légère et à faible latence. Il est très approprié pour les scénarios de communication en temps réel et est principalement utilisé pour une communication bidirectionnelle hautement interactive.
Poussée ESS :
SSE (Server-Sent Events) est une technologie push basée sur le protocole HTTP qui permet uniquement une communication unidirectionnelle. Comparé à WebSocket, SSE est plus simple et plus léger.
Voici les étapes et un exemple de code pour utiliser SSE avec SpringBoot
Dépendances de configuration
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
SSE a été intégré à spring-web, il peut donc être utilisé directement.
code back-end
import com.wry.wry_test.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.validation.constraints.NotBlank;
import java.util.concurrent.CompletableFuture;
@RestController
@RequestMapping("/sse")
@Slf4j
@Validated
public class SseTestController {
@Autowired
private SseService service;
@GetMapping("/testSse")
public SseEmitter testSse(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
final SseEmitter emitter = service.getConn(clientId);
CompletableFuture.runAsync(() -> {
try {
service.send(clientId);
log.info("建立连接成功!clientId = {}", clientId);
} catch (Exception e) {
log.error("推送数据异常");
}
});
return emitter;
}
@GetMapping("/sseConection")
public SseEmitter createConnection(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
return service.getConn(clientId);
}
@GetMapping("/sendMsg")
public void sendMsg(@RequestParam("clientId") String clientId) {
try {
// 异步发送消息
CompletableFuture.runAsync(() -> {
try {
service.send(clientId);
} catch (Exception e) {
log.error("推送数据异常");
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@GetMapping("/sendMsgToAll")
public void sendMsgToAll() {
try {
//异步发送消息
CompletableFuture.runAsync(() -> {
try {
service.sendToAll();
} catch (Exception e) {
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@GetMapping("closeConn/{clientId}")
public String closeConn(@PathVariable("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
service.closeConn(clientId);
return "连接已关闭";
}
}
package com.wry.wry_test.service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.validation.constraints.NotBlank;
public interface SseService {
/**
* 获取连接
* @param clientId 客户端id
* @return
*/
SseEmitter getConn(String clientId);
/**
* 发送消息到指定客户端
* @param clientId 客户端id
* @throws Exception
*/
void send(String clientId);
/**
* 发送消息到所有SSE客户端
* @throws Exception
*/
void sendToAll() throws Exception;
/**
* 关闭指定客户端的连接
* @param clientId 客户端id
*/
void closeConn(String clientId);
}
package com.wry.wry_test.service.impl;
import com.wry.wry_test.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.validation.constraints.NotBlank;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
public class SseServiceImpl implements SseService {
private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
@Override
public SseEmitter getConn(@NotBlank String clientId) {
final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
if (sseEmitter != null) {
return sseEmitter;
} else {
// 设置连接超时时间,需要配合配置项 spring.mvc.async.request-timeout: 600000 一起使用
final SseEmitter emitter = new SseEmitter(600_000L);
// 注册超时回调,超时后触发
emitter.onTimeout(() -> {
log.info("连接已超时,正准备关闭,clientId = {}", clientId);
SSE_CACHE.remove(clientId);
});
// 注册完成回调,调用 emitter.complete() 触发
emitter.onCompletion(() -> {
log.info("连接已关闭,正准备释放,clientId = {}", clientId);
SSE_CACHE.remove(clientId);
log.info("连接已释放,clientId = {}", clientId);
});
// 注册异常回调,调用 emitter.completeWithError() 触发
emitter.onError(throwable -> {
log.error("连接已异常,正准备关闭,clientId = {}", clientId, throwable);
SSE_CACHE.remove(clientId);
});
SSE_CACHE.put(clientId, emitter);
log.info("建立连接成功!clientId = {}", clientId);
return emitter;
}
}
/**
* 模拟类似于 chatGPT 的流式推送回答
*
* @param clientId 客户端 id
* @throws IOException 异常
*/
@Override
public void send(@NotBlank String clientId) {
final SseEmitter emitter = SSE_CACHE.get(clientId);
if (emitter == null) return;
// 开始推送数据
// todo 模拟推送数据
for (int i = 0; i < 10000000; i++) {
String msg = "SSE 测试数据";
try {
this.sseSend(emitter, msg, clientId);
Thread.sleep(1000);
} catch (Exception e) {
log.error("推送数据异常", e);
break;
}
}
log.info("推送数据结束,clientId = {}", clientId);
// 结束推流
emitter.complete();
}
/**
* 发送数据给所有连接
*/
public void sendToAll() {
List<SseEmitter> emitters = new ArrayList<>(SSE_CACHE.values());
for (int i = 0; i < 10000000; i++) {
String msg = "SSE 测试数据";
this.sseSend(emitters, msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void closeConn(@NotBlank String clientId) {
final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
if (sseEmitter != null) {
sseEmitter.complete();
}
}
/**
* 推送数据封装
*
* @param emitter sse长连接
* @param data 发送数据
* @param clientId 客户端id
*/
private void sseSend(SseEmitter emitter, Object data, String clientId) {
try {
emitter.send(data);
log.info("推送数据成功,clientId = {}", clientId);
} catch (Exception e) {
log.error("推送数据异常", e);
throw new RuntimeException("推送数据异常");
}
}
/**
* 推送数据封装
*
* @param emitter sse长连接
* @param data 发送数据
*/
private void sseSend(List<SseEmitter> emitter, Object data) {
emitter.forEach(e -> {
try {
e.send(data);
} catch (IOException ioException) {
log.error("推送数据异常", ioException);
}
});
log.info("推送数据成功");
}
}
L'effet de mise en œuvre est le suivant : le serveur pousse continuellement les données vers le front-end, et le front-end peut également appeler l'interface pour fermer activement la connexion.
Scénarios applicables : étant donné que SSE est une communication unidirectionnelle côté serveur, il convient aux connexions qui nécessitent une persistance unidirectionnelle. Par exemple: