一、WebSocket简介

WebSocket是HTML5提供的全双工通信协议,客户端和服务器可以互相主动推送数据。

特点:

  • 全双工通信
  • 建立在TCP之上
  • 与HTTP兼容
  • 低开销

适用场景:

  • 实时消息推送
  • 在线聊天
  • 实时数据监控
  • 协同编辑

二、SpringBoot整合WebSocket

添加依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

开启WebSocket支持

1
2
3
4
5
6
7
8
@Configuration
public class WebSocketConfig {

@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

三、基于注解的方式

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Component
@ServerEndpoint("/ws/{userId}")
public class WebSocketServer {

// 在线连接
private static final Map<String, Session> SESSION_MAP = new ConcurrentHashMap<>();

/**
* 连接建立成功
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
SESSION_MAP.put(userId, session);
sendMessage(userId, "连接成功");
}

/**
* 连接关闭
*/
@OnClose
public void onClose(@PathParam("userId") String userId) {
SESSION_MAP.remove(userId);
}

/**
* 收到消息
*/
@OnMessage
public void onMessage(String message, @PathParam("userId") String userId) {
// 处理消息
}

/**
* 发生错误
*/
@OnError
public void onError(Session session, Throwable error) {
error.printStackTrace();
}

/**
* 发送消息给指定用户
*/
public void sendMessage(String userId, String message) {
Session session = SESSION_MAP.get(userId);
if (session != null && session.isOpen()) {
session.getAsyncRemote().sendText(message);
}
}

/**
* 广播消息
*/
public void broadcast(String message) {
SESSION_MAP.forEach((userId, session) -> {
if (session.isOpen()) {
session.getAsyncRemote().sendText(message);
}
});
}
}

注意:Spring默认管理单例Bean,而WebSocket每个连接创建一个实例。使用@Autowired注入Bean时需要通过ApplicationContext获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class SpringContextUtil implements ApplicationContextAware {

private static ApplicationContext applicationContext;

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
SpringContextUtil.applicationContext = applicationContext;
}

public static <T> T getBean(Class<T> clazz) {
return applicationContext.getBean(clazz);
}
}

在WebSocket类中使用:

1
2
3
4
5
6
private MessageService messageService;

@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
messageService = SpringContextUtil.getBean(MessageService.class);
}

四、前端连接

原生WebSocket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
let ws = null;

function connect(userId) {
ws = new WebSocket(`ws://localhost:8080/ws/${userId}`);

ws.onopen = function() {
console.log('连接建立');
};

ws.onmessage = function(event) {
let message = JSON.parse(event.data);
console.log('收到消息:', message);
};

ws.onclose = function() {
console.log('连接关闭');
// 自动重连
setTimeout(() => connect(userId), 3000);
};

ws.onerror = function() {
console.log('连接错误');
};
}

function send(message) {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(message));
}
}

五、消息格式设计

统一消息格式

1
2
3
4
5
6
7
8
9
@Data
public class WebSocketMessage {

private String type; // 消息类型
private String from; // 发送者
private String to; // 接收者
private Object data; // 消息内容
private Long timestamp; // 时间戳
}

消息类型

1
2
3
4
5
6
public interface MessageType {
String CHAT = "CHAT"; // 聊天消息
String SYSTEM = "SYSTEM"; // 系统消息
String NOTIFICATION = "NOTIFY"; // 通知消息
String HEARTBEAT = "HEARTBEAT"; // 心跳消息
}

消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@OnMessage
public void onMessage(String message, @PathParam("userId") String userId) {
WebSocketMessage wsMessage = JSON.parseObject(message, WebSocketMessage.class);

switch (wsMessage.getType()) {
case MessageType.CHAT:
handleChatMessage(userId, wsMessage);
break;
case MessageType.HEARTBEAT:
handleHeartbeat(userId);
break;
default:
break;
}
}

六、心跳机制

服务端

1
2
3
4
5
6
7
8
9
// 定时检测连接
@Scheduled(fixedRate = 60000)
public void checkConnection() {
SESSION_MAP.forEach((userId, session) -> {
if (!session.isOpen()) {
SESSION_MAP.remove(userId);
}
});
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
let heartbeatTimer = null;

function startHeartbeat() {
heartbeatTimer = setInterval(() => {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'HEARTBEAT' }));
}
}, 30000);
}

function stopHeartbeat() {
if (heartbeatTimer) {
clearInterval(heartbeatTimer);
}
}

七、断线重连

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
let reconnectCount = 0;
const MAX_RECONNECT = 5;

function connect(userId) {
ws = new WebSocket(`ws://localhost:8080/ws/${userId}`);

ws.onclose = function() {
if (reconnectCount < MAX_RECONNECT) {
setTimeout(() => {
reconnectCount++;
connect(userId);
}, 3000 * reconnectCount);
}
};

ws.onopen = function() {
reconnectCount = 0;
};
}

八、集群方案

单机WebSocket无法跨节点通信,需要使用消息中间件同步。

基于Redis Pub/Sub

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Component
public class RedisMessageListener {

@Autowired
private WebSocketServer webSocketServer;

@Autowired
private RedisTemplate<String, String> redisTemplate;

/**
* 发布消息
*/
public void publish(String userId, String message) {
redisTemplate.convertAndSend("ws:message",
JSON.toJSONString(new WebSocketMessage(userId, message)));
}

/**
* 订阅消息
*/
@RedisListener(topics = "ws:message")
public void onMessage(String message) {
WebSocketMessage wsMessage = JSON.parseObject(message, WebSocketMessage.class);
webSocketServer.sendMessage(wsMessage.getTo(), wsMessage.getData().toString());
}
}

基于RabbitMQ

1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class RabbitMQMessageListener {

@Autowired
private WebSocketServer webSocketServer;

@RabbitListener(queues = "ws.queue")
public void onMessage(String message) {
WebSocketMessage wsMessage = JSON.parseObject(message, WebSocketMessage.class);
webSocketServer.sendMessage(wsMessage.getTo(), wsMessage.getData().toString());
}
}

九、安全配置

Token认证

1
2
3
@ServerEndpoint(value = "/ws/{userId}", configurator = WebSocketAuthConfig.class)
public class WebSocketServer {
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class WebSocketAuthConfig extends ServerEndpointConfig.Configurator {

@Override
public void modifyHandshake(ServerEndpointConfig sec,
HandshakeRequest request,
HttpServletResponse response) {
// 从请求参数获取token
String token = request.getParameterMap().get("token").get(0);
// 验证token
if (!validateToken(token)) {
throw new RuntimeException("认证失败");
}
}
}

十、总结

WebSocket整合SpringBoot要点:

  • 使用@ServerEndpoint注解定义端点
  • 管理连接Session实现定向推送和广播
  • 设计统一消息格式
  • 实现心跳和断线重连机制
  • 集群环境使用消息中间件同步
  • 注意安全认证