一、WebSocket简介
WebSocket是HTML5提供的全双工通信协议,客户端和服务器可以互相主动推送数据。
特点:
- 全双工通信
- 建立在TCP之上
- 与HTTP兼容
- 低开销
适用场景:
二、SpringBoot整合WebSocket
添加依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
|
开启WebSocket支持
1 2 3 4 5 6 7 8
| @Configuration public class WebSocketConfig {
@Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
|
三、基于注解的方式
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| @Component @ServerEndpoint("/ws/{userId}") public class WebSocketServer {
private static final Map<String, Session> SESSION_MAP = new ConcurrentHashMap<>();
@OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { SESSION_MAP.put(userId, session); sendMessage(userId, "连接成功"); }
@OnClose public void onClose(@PathParam("userId") String userId) { SESSION_MAP.remove(userId); }
@OnMessage public void onMessage(String message, @PathParam("userId") String userId) { }
@OnError public void onError(Session session, Throwable error) { error.printStackTrace(); }
public void sendMessage(String userId, String message) { Session session = SESSION_MAP.get(userId); if (session != null && session.isOpen()) { session.getAsyncRemote().sendText(message); } }
public void broadcast(String message) { SESSION_MAP.forEach((userId, session) -> { if (session.isOpen()) { session.getAsyncRemote().sendText(message); } }); } }
|
注意:Spring默认管理单例Bean,而WebSocket每个连接创建一个实例。使用@Autowired注入Bean时需要通过ApplicationContext获取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Component public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override public void setApplicationContext(ApplicationContext applicationContext) { SpringContextUtil.applicationContext = applicationContext; }
public static <T> T getBean(Class<T> clazz) { return applicationContext.getBean(clazz); } }
|
在WebSocket类中使用:
1 2 3 4 5 6
| private MessageService messageService;
@OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { messageService = SpringContextUtil.getBean(MessageService.class); }
|
四、前端连接
原生WebSocket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| let ws = null;
function connect(userId) { ws = new WebSocket(`ws://localhost:8080/ws/${userId}`);
ws.onopen = function() { console.log('连接建立'); };
ws.onmessage = function(event) { let message = JSON.parse(event.data); console.log('收到消息:', message); };
ws.onclose = function() { console.log('连接关闭'); setTimeout(() => connect(userId), 3000); };
ws.onerror = function() { console.log('连接错误'); }; }
function send(message) { if (ws && ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify(message)); } }
|
五、消息格式设计
统一消息格式
1 2 3 4 5 6 7 8 9
| @Data public class WebSocketMessage {
private String type; private String from; private String to; private Object data; private Long timestamp; }
|
消息类型
1 2 3 4 5 6
| public interface MessageType { String CHAT = "CHAT"; String SYSTEM = "SYSTEM"; String NOTIFICATION = "NOTIFY"; String HEARTBEAT = "HEARTBEAT"; }
|
消息处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @OnMessage public void onMessage(String message, @PathParam("userId") String userId) { WebSocketMessage wsMessage = JSON.parseObject(message, WebSocketMessage.class); switch (wsMessage.getType()) { case MessageType.CHAT: handleChatMessage(userId, wsMessage); break; case MessageType.HEARTBEAT: handleHeartbeat(userId); break; default: break; } }
|
六、心跳机制
服务端
1 2 3 4 5 6 7 8 9
| @Scheduled(fixedRate = 60000) public void checkConnection() { SESSION_MAP.forEach((userId, session) -> { if (!session.isOpen()) { SESSION_MAP.remove(userId); } }); }
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| let heartbeatTimer = null;
function startHeartbeat() { heartbeatTimer = setInterval(() => { if (ws && ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify({ type: 'HEARTBEAT' })); } }, 30000); }
function stopHeartbeat() { if (heartbeatTimer) { clearInterval(heartbeatTimer); } }
|
七、断线重连
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| let reconnectCount = 0; const MAX_RECONNECT = 5;
function connect(userId) { ws = new WebSocket(`ws://localhost:8080/ws/${userId}`);
ws.onclose = function() { if (reconnectCount < MAX_RECONNECT) { setTimeout(() => { reconnectCount++; connect(userId); }, 3000 * reconnectCount); } };
ws.onopen = function() { reconnectCount = 0; }; }
|
八、集群方案
单机WebSocket无法跨节点通信,需要使用消息中间件同步。
基于Redis Pub/Sub
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Component public class RedisMessageListener {
@Autowired private WebSocketServer webSocketServer;
@Autowired private RedisTemplate<String, String> redisTemplate;
public void publish(String userId, String message) { redisTemplate.convertAndSend("ws:message", JSON.toJSONString(new WebSocketMessage(userId, message))); }
@RedisListener(topics = "ws:message") public void onMessage(String message) { WebSocketMessage wsMessage = JSON.parseObject(message, WebSocketMessage.class); webSocketServer.sendMessage(wsMessage.getTo(), wsMessage.getData().toString()); } }
|
基于RabbitMQ
1 2 3 4 5 6 7 8 9 10 11 12
| @Component public class RabbitMQMessageListener {
@Autowired private WebSocketServer webSocketServer;
@RabbitListener(queues = "ws.queue") public void onMessage(String message) { WebSocketMessage wsMessage = JSON.parseObject(message, WebSocketMessage.class); webSocketServer.sendMessage(wsMessage.getTo(), wsMessage.getData().toString()); } }
|
九、安全配置
Token认证
1 2 3
| @ServerEndpoint(value = "/ws/{userId}", configurator = WebSocketAuthConfig.class) public class WebSocketServer { }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class WebSocketAuthConfig extends ServerEndpointConfig.Configurator {
@Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HttpServletResponse response) { String token = request.getParameterMap().get("token").get(0); if (!validateToken(token)) { throw new RuntimeException("认证失败"); } } }
|
十、总结
WebSocket整合SpringBoot要点:
- 使用@ServerEndpoint注解定义端点
- 管理连接Session实现定向推送和广播
- 设计统一消息格式
- 实现心跳和断线重连机制
- 集群环境使用消息中间件同步
- 注意安全认证