τα στοιχεία επικοινωνίας μου
Ταχυδρομείο[email protected]
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
Έχετε συναντήσει ποτέ μια κατάσταση όπου ο διακομιστής πρέπει να μεταδίδει ενεργά δεδομένα στον πελάτη. Υπάρχουν τρεις λύσεις;
Σύγκριση πολλών επιλογών:
σφυγμομέτρηση:
Ο πελάτης ζητά δεδομένα από τον διακομιστή μέσω συχνών αιτημάτων για να επιτύχει ένα αποτέλεσμα παρόμοιο με τις ενημερώσεις σε πραγματικό χρόνο. Το πλεονέκτημα της ψηφοφορίας είναι ότι είναι απλή στην εφαρμογή της, αλλά θα ασκήσει επιπλέον πίεση στον διακομιστή και στο δίκτυο και η καθυστέρηση θα είναι μεγάλη.
Σύνδεση WebSocket:
Ο διακομιστής δημιουργεί μια σύνδεση Socket με τον πελάτη για μετάδοση δεδομένων Η μέθοδος μετάδοσης Socket είναι full-duplex. Το WebSocket είναι μια μακρά σύνδεση που βασίζεται στο TCP, σε σύγκριση με το πρωτόκολλο HTTP, μπορεί να επιτύχει ελαφριά και χαμηλής καθυστέρησης μετάδοση δεδομένων. Είναι πολύ κατάλληλο για σενάρια επικοινωνίας σε πραγματικό χρόνο και χρησιμοποιείται κυρίως για αμφίδρομη επικοινωνία.
ώθηση SSE:
Το SSE (Server-Sent Events) είναι μια τεχνολογία push που βασίζεται στο πρωτόκολλο HTTP που επιτρέπει μόνο μονόδρομη επικοινωνία. Σε σύγκριση με το WebSocket, το SSE είναι απλούστερο και πιο ελαφρύ.
Ακολουθούν τα βήματα και το δείγμα κώδικα για τη χρήση του SSE με το SpringBoot
Εξαρτήσεις διαμόρφωσης
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
Το SSE έχει ενσωματωθεί στο spring-web, ώστε να μπορεί να χρησιμοποιηθεί απευθείας.
κωδικός υποστήριξης
import com.wry.wry_test.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.validation.constraints.NotBlank;
import java.util.concurrent.CompletableFuture;
@RestController
@RequestMapping("/sse")
@Slf4j
@Validated
public class SseTestController {
@Autowired
private SseService service;
@GetMapping("/testSse")
public SseEmitter testSse(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
final SseEmitter emitter = service.getConn(clientId);
CompletableFuture.runAsync(() -> {
try {
service.send(clientId);
log.info("建立连接成功!clientId = {}", clientId);
} catch (Exception e) {
log.error("推送数据异常");
}
});
return emitter;
}
@GetMapping("/sseConection")
public SseEmitter createConnection(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
return service.getConn(clientId);
}
@GetMapping("/sendMsg")
public void sendMsg(@RequestParam("clientId") String clientId) {
try {
// 异步发送消息
CompletableFuture.runAsync(() -> {
try {
service.send(clientId);
} catch (Exception e) {
log.error("推送数据异常");
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@GetMapping("/sendMsgToAll")
public void sendMsgToAll() {
try {
//异步发送消息
CompletableFuture.runAsync(() -> {
try {
service.sendToAll();
} catch (Exception e) {
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@GetMapping("closeConn/{clientId}")
public String closeConn(@PathVariable("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
service.closeConn(clientId);
return "连接已关闭";
}
}
package com.wry.wry_test.service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.validation.constraints.NotBlank;
public interface SseService {
/**
* 获取连接
* @param clientId 客户端id
* @return
*/
SseEmitter getConn(String clientId);
/**
* 发送消息到指定客户端
* @param clientId 客户端id
* @throws Exception
*/
void send(String clientId);
/**
* 发送消息到所有SSE客户端
* @throws Exception
*/
void sendToAll() throws Exception;
/**
* 关闭指定客户端的连接
* @param clientId 客户端id
*/
void closeConn(String clientId);
}
package com.wry.wry_test.service.impl;
import com.wry.wry_test.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.validation.constraints.NotBlank;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
public class SseServiceImpl implements SseService {
private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
@Override
public SseEmitter getConn(@NotBlank String clientId) {
final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
if (sseEmitter != null) {
return sseEmitter;
} else {
// 设置连接超时时间,需要配合配置项 spring.mvc.async.request-timeout: 600000 一起使用
final SseEmitter emitter = new SseEmitter(600_000L);
// 注册超时回调,超时后触发
emitter.onTimeout(() -> {
log.info("连接已超时,正准备关闭,clientId = {}", clientId);
SSE_CACHE.remove(clientId);
});
// 注册完成回调,调用 emitter.complete() 触发
emitter.onCompletion(() -> {
log.info("连接已关闭,正准备释放,clientId = {}", clientId);
SSE_CACHE.remove(clientId);
log.info("连接已释放,clientId = {}", clientId);
});
// 注册异常回调,调用 emitter.completeWithError() 触发
emitter.onError(throwable -> {
log.error("连接已异常,正准备关闭,clientId = {}", clientId, throwable);
SSE_CACHE.remove(clientId);
});
SSE_CACHE.put(clientId, emitter);
log.info("建立连接成功!clientId = {}", clientId);
return emitter;
}
}
/**
* 模拟类似于 chatGPT 的流式推送回答
*
* @param clientId 客户端 id
* @throws IOException 异常
*/
@Override
public void send(@NotBlank String clientId) {
final SseEmitter emitter = SSE_CACHE.get(clientId);
if (emitter == null) return;
// 开始推送数据
// todo 模拟推送数据
for (int i = 0; i < 10000000; i++) {
String msg = "SSE 测试数据";
try {
this.sseSend(emitter, msg, clientId);
Thread.sleep(1000);
} catch (Exception e) {
log.error("推送数据异常", e);
break;
}
}
log.info("推送数据结束,clientId = {}", clientId);
// 结束推流
emitter.complete();
}
/**
* 发送数据给所有连接
*/
public void sendToAll() {
List<SseEmitter> emitters = new ArrayList<>(SSE_CACHE.values());
for (int i = 0; i < 10000000; i++) {
String msg = "SSE 测试数据";
this.sseSend(emitters, msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void closeConn(@NotBlank String clientId) {
final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
if (sseEmitter != null) {
sseEmitter.complete();
}
}
/**
* 推送数据封装
*
* @param emitter sse长连接
* @param data 发送数据
* @param clientId 客户端id
*/
private void sseSend(SseEmitter emitter, Object data, String clientId) {
try {
emitter.send(data);
log.info("推送数据成功,clientId = {}", clientId);
} catch (Exception e) {
log.error("推送数据异常", e);
throw new RuntimeException("推送数据异常");
}
}
/**
* 推送数据封装
*
* @param emitter sse长连接
* @param data 发送数据
*/
private void sseSend(List<SseEmitter> emitter, Object data) {
emitter.forEach(e -> {
try {
e.send(data);
} catch (IOException ioException) {
log.error("推送数据异常", ioException);
}
});
log.info("推送数据成功");
}
}
Το αποτέλεσμα υλοποίησης είναι το εξής: ο διακομιστής ωθεί συνεχώς δεδομένα στο μπροστινό μέρος και το μπροστινό μέρος μπορεί επίσης να καλέσει τη διεπαφή για να κλείσει ενεργά τη σύνδεση.
Ισχύοντα σενάρια: Δεδομένου ότι το SSE είναι μια μονόδρομη επικοινωνία μεταξύ του διακομιστή και του διακομιστή, είναι κατάλληλο για συνδέσεις που απαιτούν μονόδρομη επιμονή. για παράδειγμα: