WebSocket Node.js 实现
Node.js 侧实现 WebSocket 主要有两个选择:
本文以 ws 为主,关键差异点补充 Socket.IO 的做法。
ws:最小服务端
import { WebSocketServer } from "ws";
const wss = new WebSocketServer({ port: 3000 });
wss.on("connection", (ws, req) => {
console.log("新连接:", req.socket.remoteAddress);
ws.on("message", (raw) => {
const msg = raw.toString();
console.log("收到:", msg);
ws.send(`Echo: ${msg}`);
});
ws.on("close", (code, reason) => {
console.log(`关闭: ${code} ${reason}`);
});
ws.on("error", (err) => {
console.error("错误:", err);
});
});
广播给所有客户端
wss.clients 是当前所有连接的 Set:
function broadcast(data, exclude) {
const payload = typeof data === "string" ? data : JSON.stringify(data);
for (const client of wss.clients) {
if (client !== exclude && client.readyState === client.OPEN) {
client.send(payload);
}
}
}
wss.on("connection", (ws) => {
ws.on("message", (raw) => {
broadcast(raw.toString(), ws); // 转发给其他客户端
});
});
Note
要点
- 必须检查
readyState === OPEN,否则在CLOSING/CLOSED状态下send会抛异常 - 大广播场景用
wss.clients.forEach是 O(N),N 大时考虑分片或走消息队列
心跳检测
ws 库提供协议层的 ws.ping()。配合 pong 事件判断存活:
const HEARTBEAT_INTERVAL = 30_000;
function heartbeat() {
this.isAlive = true;
}
wss.on("connection", (ws) => {
ws.isAlive = true;
ws.on("pong", heartbeat);
});
const interval = setInterval(() => {
for (const ws of wss.clients) {
if (ws.isAlive === false) {
// 上次 Ping 后没收到 Pong,视为死连接
ws.terminate();
continue;
}
ws.isAlive = false;
ws.ping();
}
}, HEARTBEAT_INTERVAL);
wss.on("close", () => clearInterval(interval));
工作流程:
① 每 30 秒给所有连接发 Ping,并标记 isAlive = false
② 收到 Pong 的连接:heartbeat() 把 isAlive 恢复为 true
③ 30 秒后仍 isAlive === false 的连接:terminate() 立即关闭(不走四次挥手)
Note
浏览器侧的心跳
前端 WebSocket API 无法发协议层 Ping。如果前端需要主动心跳:
- 方案 A:只靠服务端 Ping(大多数场景够用)
- 方案 B:前端发应用层
{ type: "ping" }消息,服务端回{ type: "pong" },纯业务层实现
握手阶段鉴权
WebSocket 握手是标准 HTTP,可以在此阶段验证身份:
import { WebSocketServer } from "ws";
import { createServer } from "http";
const server = createServer();
const wss = new WebSocketServer({ noServer: true });
server.on("upgrade", async (req, socket, head) => {
const url = new URL(req.url, `http://${req.headers.host}`);
const token = url.searchParams.get("token");
const user = await verifyToken(token);
if (!user) {
socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
socket.destroy();
return;
}
// Origin 白名单
const origin = req.headers.origin;
if (!isAllowedOrigin(origin)) {
socket.write("HTTP/1.1 403 Forbidden\r\n\r\n");
socket.destroy();
return;
}
wss.handleUpgrade(req, socket, head, (ws) => {
ws.user = user; // 绑定用户信息
wss.emit("connection", ws, req);
});
});
server.listen(3000);
Note
鉴权方式对比
| 方式 | 优点 | 缺点 |
|---|---|---|
URL 查询参数(ws://.../?token=xxx) | 最简单 | token 可能出现在日志、历史记录中 |
| Cookie | 浏览器自动带,与现有会话一致 | 跨域场景需额外配置 |
| 握手后首条消息发 Token | 不依赖传输层 | 握手阶段无鉴权,需短超时 + 未认证队列 |
生产环境通常用短期 JWT 放 URL,或首条消息 + 超时清理。
房间(Room)模式
让消息只推送给某个子集的客户端。ws 库没有内置房间,手动维护:
const rooms = new Map(); // Map<roomId, Set<WebSocket>>
function joinRoom(ws, roomId) {
if (!rooms.has(roomId)) rooms.set(roomId, new Set());
rooms.get(roomId).add(ws);
ws.roomId = roomId;
}
function leaveRoom(ws) {
const room = rooms.get(ws.roomId);
if (!room) return;
room.delete(ws);
if (room.size === 0) rooms.delete(ws.roomId);
}
function broadcastToRoom(roomId, data, exclude) {
const room = rooms.get(roomId);
if (!room) return;
const payload = typeof data === "string" ? data : JSON.stringify(data);
for (const client of room) {
if (client !== exclude && client.readyState === client.OPEN) {
client.send(payload);
}
}
}
wss.on("connection", (ws) => {
ws.on("message", (raw) => {
const msg = JSON.parse(raw);
if (msg.type === "join") joinRoom(ws, msg.roomId);
if (msg.type === "chat") broadcastToRoom(ws.roomId, msg, ws);
});
ws.on("close", () => leaveRoom(ws));
});
Socket.IO 的对应做法
Socket.IO 把房间和广播内建:
import { Server } from "socket.io";
const io = new Server(3000, {
cors: { origin: "https://example.com" },
});
io.use((socket, next) => {
// 中间件鉴权
const token = socket.handshake.auth.token;
verifyToken(token).then((user) => {
if (!user) return next(new Error("Unauthorized"));
socket.data.user = user;
next();
});
});
io.on("connection", (socket) => {
socket.on("join", (roomId) => {
socket.join(roomId);
socket.to(roomId).emit("system", `${socket.data.user.name} joined`);
});
socket.on("chat", (msg) => {
const roomId = [...socket.rooms][1]; // rooms 的第一个是 socket.id 本身
io.to(roomId).emit("chat", { from: socket.data.user.name, text: msg });
});
socket.on("disconnect", () => {
// 离开所有 room 自动处理
});
});
Note
Socket.IO 额外的代价
- 不是原生 WebSocket 协议(有自己的握手和帧格式),只能 Socket.IO 服务端配 Socket.IO 客户端
- 包体积比
ws大得多 - 但换来了房间、广播、自动重连、HTTP 轮询降级、Redis Adapter 开箱即用
选型判断:要自己维护房间/广播/扩展 → 直接用 Socket.IO;只是简单的消息转发 → ws 更轻。
资源清理
常见的内存泄漏来源:
| 场景 | 修复 |
|---|---|
ws.on("close") 里没清理定时器 | 显式 clearInterval |
| 房间 Map 里残留已关闭的连接 | 关闭回调里 leaveRoom(ws) |
EventEmitter 监听器超过 maxListeners 警告 | setMaxListeners 或按连接创建独立 emitter |
未处理 error 事件 | 必须监听 ws.on("error"),否则会 crash 进程 |
多实例部署
单台服务端的连接数受限于内存和文件描述符(数万到十万级)。水平扩展见 水平扩展。
回到 WebSocket 总览