2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
किं भवन्तः कदापि एतादृशी स्थितिं प्राप्नुवन्ति यत्र सर्वरेण क्लायन्ट् प्रति सक्रियरूपेण दत्तांशं प्रसारयितुं आवश्यकता भवति?
अनेकविकल्पानां तुलना : १.
मतदानम् : १.
क्लायन्ट् वास्तविकसमय-अद्यतन-सदृशं प्रभावं प्राप्तुं नित्य-अनुरोधद्वारा सर्वर-तः आँकडानां अनुरोधं करोति । मतदानस्य लाभः अस्ति यत् एतत् कार्यान्वितुं सरलं भवति, परन्तु एतत् सर्वरे, जालपुटे च अतिरिक्तं दबावं जनयिष्यति, विलम्बः च अधिकः भविष्यति ।
WebSocket संयोजनम् : १.
सर्वरः दत्तांशसञ्चारार्थं क्लायन्ट् इत्यनेन सह Socket संयोजनं स्थापयति Socket transmission method is full-duplex । WebSocket TCP आधारितं दीर्घं संयोजनं HTTP प्रोटोकॉलस्य तुलने इदं वास्तविकसमयसञ्चारपरिदृश्यानां कृते अतीव उपयुक्तं भवति तथा च मुख्यतया अत्यन्तं अन्तरक्रियाशीलं द्विपक्षीयसञ्चारार्थं भवति
एसएसई धक्काः : १.
SSE (Server-Sent Events) इति HTTP प्रोटोकॉल आधारितं पुश-प्रौद्योगिकी अस्ति यत् केवलं एकदिशायाः संचारस्य अनुमतिं ददाति । WebSocket इत्यस्य तुलने SSE सरलतरं लघुतरं च भवति ।
SpringBoot इत्यनेन सह SSE इत्यस्य उपयोगाय निम्नलिखितपदार्थाः नमूनासङ्केतः च सन्ति
विन्यासनिर्भरता
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
SSE इत्येतत् spring-web इत्यस्मिन् एकीकृतम् अस्ति, अतः तस्य प्रत्यक्षं उपयोगः कर्तुं शक्यते ।
पृष्ठभागसङ्केतः
import com.wry.wry_test.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.validation.constraints.NotBlank;
import java.util.concurrent.CompletableFuture;
@RestController
@RequestMapping("/sse")
@Slf4j
@Validated
public class SseTestController {
@Autowired
private SseService service;
@GetMapping("/testSse")
public SseEmitter testSse(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
final SseEmitter emitter = service.getConn(clientId);
CompletableFuture.runAsync(() -> {
try {
service.send(clientId);
log.info("建立连接成功!clientId = {}", clientId);
} catch (Exception e) {
log.error("推送数据异常");
}
});
return emitter;
}
@GetMapping("/sseConection")
public SseEmitter createConnection(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
return service.getConn(clientId);
}
@GetMapping("/sendMsg")
public void sendMsg(@RequestParam("clientId") String clientId) {
try {
// 异步发送消息
CompletableFuture.runAsync(() -> {
try {
service.send(clientId);
} catch (Exception e) {
log.error("推送数据异常");
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@GetMapping("/sendMsgToAll")
public void sendMsgToAll() {
try {
//异步发送消息
CompletableFuture.runAsync(() -> {
try {
service.sendToAll();
} catch (Exception e) {
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@GetMapping("closeConn/{clientId}")
public String closeConn(@PathVariable("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
service.closeConn(clientId);
return "连接已关闭";
}
}
package com.wry.wry_test.service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.validation.constraints.NotBlank;
public interface SseService {
/**
* 获取连接
* @param clientId 客户端id
* @return
*/
SseEmitter getConn(String clientId);
/**
* 发送消息到指定客户端
* @param clientId 客户端id
* @throws Exception
*/
void send(String clientId);
/**
* 发送消息到所有SSE客户端
* @throws Exception
*/
void sendToAll() throws Exception;
/**
* 关闭指定客户端的连接
* @param clientId 客户端id
*/
void closeConn(String clientId);
}
package com.wry.wry_test.service.impl;
import com.wry.wry_test.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.validation.constraints.NotBlank;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
public class SseServiceImpl implements SseService {
private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
@Override
public SseEmitter getConn(@NotBlank String clientId) {
final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
if (sseEmitter != null) {
return sseEmitter;
} else {
// 设置连接超时时间,需要配合配置项 spring.mvc.async.request-timeout: 600000 一起使用
final SseEmitter emitter = new SseEmitter(600_000L);
// 注册超时回调,超时后触发
emitter.onTimeout(() -> {
log.info("连接已超时,正准备关闭,clientId = {}", clientId);
SSE_CACHE.remove(clientId);
});
// 注册完成回调,调用 emitter.complete() 触发
emitter.onCompletion(() -> {
log.info("连接已关闭,正准备释放,clientId = {}", clientId);
SSE_CACHE.remove(clientId);
log.info("连接已释放,clientId = {}", clientId);
});
// 注册异常回调,调用 emitter.completeWithError() 触发
emitter.onError(throwable -> {
log.error("连接已异常,正准备关闭,clientId = {}", clientId, throwable);
SSE_CACHE.remove(clientId);
});
SSE_CACHE.put(clientId, emitter);
log.info("建立连接成功!clientId = {}", clientId);
return emitter;
}
}
/**
* 模拟类似于 chatGPT 的流式推送回答
*
* @param clientId 客户端 id
* @throws IOException 异常
*/
@Override
public void send(@NotBlank String clientId) {
final SseEmitter emitter = SSE_CACHE.get(clientId);
if (emitter == null) return;
// 开始推送数据
// todo 模拟推送数据
for (int i = 0; i < 10000000; i++) {
String msg = "SSE 测试数据";
try {
this.sseSend(emitter, msg, clientId);
Thread.sleep(1000);
} catch (Exception e) {
log.error("推送数据异常", e);
break;
}
}
log.info("推送数据结束,clientId = {}", clientId);
// 结束推流
emitter.complete();
}
/**
* 发送数据给所有连接
*/
public void sendToAll() {
List<SseEmitter> emitters = new ArrayList<>(SSE_CACHE.values());
for (int i = 0; i < 10000000; i++) {
String msg = "SSE 测试数据";
this.sseSend(emitters, msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void closeConn(@NotBlank String clientId) {
final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
if (sseEmitter != null) {
sseEmitter.complete();
}
}
/**
* 推送数据封装
*
* @param emitter sse长连接
* @param data 发送数据
* @param clientId 客户端id
*/
private void sseSend(SseEmitter emitter, Object data, String clientId) {
try {
emitter.send(data);
log.info("推送数据成功,clientId = {}", clientId);
} catch (Exception e) {
log.error("推送数据异常", e);
throw new RuntimeException("推送数据异常");
}
}
/**
* 推送数据封装
*
* @param emitter sse长连接
* @param data 发送数据
*/
private void sseSend(List<SseEmitter> emitter, Object data) {
emitter.forEach(e -> {
try {
e.send(data);
} catch (IOException ioException) {
log.error("推送数据异常", ioException);
}
});
log.info("推送数据成功");
}
}
कार्यान्वयनप्रभावः निम्नलिखितरूपेण अस्ति : सर्वरः निरन्तरं दत्तांशं अग्रभागं प्रति धक्कायति, अग्रभागः च सक्रियरूपेण संयोजनं बन्दं कर्तुं अन्तरफलकं आह्वयितुं अपि शक्नोति
प्रयोज्यपरिदृश्यानि : यतः SSE सर्वरपक्षे एकदिशा संचारः अस्ति, अतः एकदिशायाः स्थायित्वस्य आवश्यकतां विद्यमानानाम् संयोजनानां कृते उपयुक्तम् अस्ति । उदाहरणतया: