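WebSocket: Node.js Implementation

On the Node.js side there are two main options for implementing WebSocket:

  • ws: a plain implementation of the WebSocket protocol; lightweight, with fine-grained control
  • Socket.IO: a higher-level abstraction with built-in rooms, broadcasting, automatic fallback (to HTTP polling), and a Redis Adapter

This article focuses on ws and adds the Socket.IO approach where the two differ significantly.

ws: a minimal server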

import { WebSocketServer } from "ws";

const wss = new WebSocketServer({ port: 3000 });

wss.on("connection", (ws, req) => {
  console.log("New connection:", req.socket.remoteAddress);

  ws.on("message", (raw) => {
    const msg = raw.toString();
    console.log("Received:", msg);
    ws.send(`Echo: ${msg}`);
  });

  ws.on("close", (code, reason) => {
    console.log(`Closed: ${code} ${reason}`);
  });

  ws.on("error", (err) => {
    console.error("Error:", err);
  });
});

Broadcasting to all clients

wss.clients is a Set of all current connections.

function broadcast(data, exclude) {
  const payload = typeof data === "string" ? data : JSON.stringify(data);
  for (const client of wss.clients) {
    if (client !== exclude && client.readyState === client.OPEN) {
      client.send(payload);
    }
  }
}

wss.on("connection", (ws) => {
  ws.on("message", (raw) => {
    broadcast(raw.toString(), ws); // forward to the other clients
  });
});

Note

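Key points

  • Always check readyState === OPEN; in the CLOSING / CLOSED states the message cannot be delivered, and depending on the ws version send either throws or reports the error only through its callback
  • Broadcasting by iterating over wss.clients is O(N); when N is large, consider sharding or routing through a message queue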
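
One possible reading of "sharding" in the note above is to split a large broadcast into batches and yield to the event loop between them, so that a single broadcast does not block other I/O. The following is a minimal sketch under that assumption; inBatches, broadcastInBatches and the default batch size of 1000 are hypothetical names and values, not part of ws.

```javascript
// Hypothetical helper: split any iterable (e.g. wss.clients) into arrays of `size`
function* inBatches(iterable, size) {
  let batch = [];
  for (const item of iterable) {
    batch.push(item);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch;
}

// Send `payload` to every open client, yielding to the event loop after each batch.
// Resolves to the number of clients the payload was sent to.
async function broadcastInBatches(clients, payload, batchSize = 1000) {
  let sent = 0;
  // Snapshot first, so connections that join mid-broadcast do not extend the loop
  for (const batch of inBatches([...clients], batchSize)) {
    for (const client of batch) {
      if (client.readyState === client.OPEN) {
        client.send(payload);
        sent++;
      }
    }
    // Let pending I/O callbacks run before the next batch
    await new Promise((resolve) => setImmediate(resolve));
  }
  return sent;
}
```

With the server from the earlier examples this would be called as broadcastInBatches(wss.clients, payload). The work is still O(N) overall; batching only spreads it across several event-loop turns.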

Heartbeat detection

The ws library exposes the protocol-level ws.ping(). Combine it with the pong event to detect dead connections:

const HEARTBEAT_INTERVAL = 30_000;

function heartbeat() {
  this.isAlive = true;
}

wss.on("connection", (ws) => {
  ws.isAlive = true;
  ws.on("pong", heartbeat);
});

const interval = setInterval(() => {
  for (const ws of wss.clients) {
    if (ws.isAlive === false) {
      // No Pong since the last Ping: treat as a dead connection
      ws.terminate();
      continue;
    }
    ws.isAlive = false;
    ws.ping();
  }
}, HEARTBEAT_INTERVAL);

wss.on("close", () => clearInterval(interval));

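How it works:

① Every 30 seconds, send a Ping to every connection and mark it isAlive = false
② For connections that answer with a Pong, heartbeat() sets isAlive back to true
③ Connections still at isAlive === false 30 seconds later: terminate() closes them immediately (skipping the normal closing handshake)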

Note

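Heartbeats on the browser side

The browser WebSocket API cannot send protocol-level Ping frames. If the frontend needs to initiate heartbeats:

  • Option A: rely on the server's Ping only (sufficient for most cases)
  • Option B: the frontend sends an application-level { type: "ping" } message and the server replies with { type: "pong" }, implemented purely in application code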
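
The server half of Option B could look roughly like the sketch below. handleAppHeartbeat is a hypothetical helper name; it assumes messages are JSON text and that ws is a connection object with a send method, as in the examples above.

```javascript
// Returns true when the message was an application-level ping and has been answered,
// false when the message should be passed on to the normal message handler.
function handleAppHeartbeat(ws, raw) {
  let msg;
  try {
    msg = JSON.parse(raw.toString());
  } catch {
    return false; // not JSON: not a heartbeat
  }
  if (msg === null || typeof msg !== "object" || msg.type !== "ping") return false;
  ws.send(JSON.stringify({ type: "pong" }));
  return true;
}
```

Inside ws.on("message", (raw) => { ... }) the handler would start with if (handleAppHeartbeat(ws, raw)) return; so heartbeat traffic never reaches business logic.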

Authentication during the handshake

The WebSocket handshake is a standard HTTP request, so identity can be verified at this stage:

import { WebSocketServer } from "ws";
import { createServer } from "http";

// verifyToken() and isAllowedOrigin() are application-provided helpers
const server = createServer();

const wss = new WebSocketServer({ noServer: true });

server.on("upgrade", async (req, socket, head) => {
  const url = new URL(req.url, `http://${req.headers.host}`);
  const token = url.searchParams.get("token");

  let user = null;
  try {
    user = await verifyToken(token);
  } catch {
    // A verification error counts as unauthenticated, not as an unhandled rejection
  }
  if (!user) {
    socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
    socket.destroy();
    return;
  }

  // Origin allowlist
  const origin = req.headers.origin;
  if (!isAllowedOrigin(origin)) {
    socket.write("HTTP/1.1 403 Forbidden\r\n\r\n");
    socket.destroy();
    return;
  }

  wss.handleUpgrade(req, socket, head, (ws) => {
    ws.user = user; // attach the user to the connection
    wss.emit("connection", ws, req);
  });
});

server.listen(3000);
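
The isAllowedOrigin helper used in the upgrade handler is not defined in this article. One way it might be written is an exact-match allowlist, sketched below; the two origins listed are placeholders, and rejecting requests without an Origin header is an assumption (non-browser clients usually omit it, so that policy is a per-application decision).

```javascript
// Placeholder allowlist: replace with the origins your frontend is served from
const ALLOWED_ORIGINS = new Set(["https://example.com", "https://app.example.com"]);

function isAllowedOrigin(origin) {
  if (typeof origin !== "string") return false; // header missing
  try {
    // Normalize through URL so that e.g. an explicit default port still matches
    return ALLOWED_ORIGINS.has(new URL(origin).origin);
  } catch {
    return false; // not a valid absolute URL, e.g. the literal string "null"
  }
}
```

Exact matching is deliberately strict: prefix or substring checks would also accept hosts such as https://example.com.attacker.test.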

Note

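Comparison of authentication methods

Method | Pros | Cons
URL query parameter (ws://.../?token=xxx) | Simplest | The token may end up in logs and browser history
Cookie | Sent automatically by the browser; consistent with the existing session | Cross-origin setups need extra configuration
Token in the first message after the handshake | Does not depend on the transport layer | The handshake itself is unauthenticated; needs a short timeout and tracking of not-yet-authenticated connections

In production, the usual choice is a short-lived JWT in the URL, or a first-message token with timeout cleanup.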
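
The first-message approach with a timeout can be sketched as follows. This is an illustration under assumptions, not a fixed recipe: requireFirstMessageAuth, the { type: "auth", token } message shape, the 5-second limit and the close code 4401 (taken from the 4000-4999 range reserved for application use) are all hypothetical, and verifyToken is assumed to resolve to a user object or null.

```javascript
const AUTH_TIMEOUT_MS = 5_000;

function requireFirstMessageAuth(ws, verifyToken, onAuthenticated) {
  // Drop connections that never authenticate
  const timer = setTimeout(() => ws.close(4401, "auth timeout"), AUTH_TIMEOUT_MS);

  ws.once("message", async (raw) => {
    clearTimeout(timer);
    let user = null;
    try {
      const msg = JSON.parse(raw.toString());
      if (msg && msg.type === "auth") user = await verifyToken(msg.token);
    } catch {
      user = null; // malformed JSON or a verification error
    }
    if (!user) return ws.close(4401, "unauthorized");
    ws.user = user;
    onAuthenticated(ws); // attach the real message handlers only now
  });

  // The client may disconnect before sending anything
  ws.once("close", () => clearTimeout(timer));
}
```

It would be called from wss.on("connection", ...) in place of registering message handlers directly. Because the business handlers are attached only in onAuthenticated, messages that arrive while verification is still pending are ignored.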

Room pattern

Rooms push messages to only a subset of clients. The ws library has no built-in rooms, so they are maintained by hand:

const rooms = new Map(); // Map<roomId, Set<WebSocket>>

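function joinRoom(ws, roomId) {
  leaveRoom(ws); // one room per connection: drop the old membership first
  if (!rooms.has(roomId)) rooms.set(roomId, new Set());
  rooms.get(roomId).add(ws);
  ws.roomId = roomId;
}

function leaveRoom(ws) {
  const room = rooms.get(ws.roomId);
  if (!room) return;
  room.delete(ws);
  if (room.size === 0) rooms.delete(ws.roomId); // remove empty rooms so the Map does not grow forever
  ws.roomId = undefined;
}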

function broadcastToRoom(roomId, data, exclude) {
  const room = rooms.get(roomId);
  if (!room) return;
  const payload = typeof data === "string" ? data : JSON.stringify(data);
  for (const client of room) {
    if (client !== exclude && client.readyState === client.OPEN) {
      client.send(payload);
    }
  }
}

wss.on("connection", (ws) => {
  ws.on("message", (raw) => {
    let msg;
    try {
      msg = JSON.parse(raw.toString());
    } catch {
      return; // ignore malformed JSON instead of letting the exception crash the process
    }
    if (msg?.type === "join") joinRoom(ws, msg.roomId);
    if (msg?.type === "chat") broadcastToRoom(ws.roomId, msg, ws);
  });
  ws.on("close", () => leaveRoom(ws));
});

The Socket.IO equivalent

Socket.IO has rooms and broadcasting built in:

import { Server } from "socket.io";

const io = new Server(3000, {
  cors: { origin: "https://example.com" },
});

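io.use((socket, next) => {
  // Authenticate in a middleware
  const token = socket.handshake.auth.token;
  verifyToken(token)
    .then((user) => {
      if (!user) return next(new Error("Unauthorized"));
      socket.data.user = user;
      next();
    })
    .catch(() => next(new Error("Unauthorized"))); // a failed verification also rejects the connection
});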

io.on("connection", (socket) => {
  socket.on("join", (roomId) => {
    socket.join(roomId);
    socket.to(roomId).emit("system", `${socket.data.user.name} joined`);
  });

  socket.on("chat", (msg) => {
    const roomId = [...socket.rooms][1]; // the first entry in rooms is the socket's own id
    io.to(roomId).emit("chat", { from: socket.data.user.name, text: msg });
  });

  socket.on("disconnect", () => {
    // Leaving all rooms is handled automatically
  });
});

Note

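The extra cost of Socket.IO

  • It is not the plain WebSocket protocol (it adds its own handshake and framing on top), so a Socket.IO server only works with a Socket.IO client
  • The package is much larger than ws
  • In exchange, rooms, broadcasting, automatic reconnection, HTTP polling fallback, and the Redis Adapter work out of the box

Rule of thumb: if you would otherwise have to build and maintain rooms, broadcasting, and scaling yourself, use Socket.IO directly; if you only need simple message forwarding, ws is lighter.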

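Resource cleanup

Common sources of memory leaks:

Scenario | Fix
Timers not cleaned up in ws.on("close") | Call clearInterval explicitly
Closed connections left behind in the room Map | Call leaveRoom(ws) in the close callback
EventEmitter listener count exceeds maxListeners (warning) | setMaxListeners, or create a separate emitter per connection
Unhandled error event | Always listen with ws.on("error"); otherwise the process crashes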
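
The maxListeners warning often appears when every connection adds a listener to one shared emitter and never removes it. Besides the two fixes listed in the table, a different technique is to remove the listener when the connection closes. The sketch below shows that pattern; forwardEvents and the shared bus emitter are hypothetical names, and bus is assumed to offer the usual EventEmitter on/off methods.

```javascript
// Forward events from a shared emitter (`bus`) to one connection,
// and unsubscribe as soon as that connection closes.
function forwardEvents(ws, bus, eventName) {
  const onEvent = (data) => {
    if (ws.readyState === ws.OPEN) ws.send(JSON.stringify(data));
  };
  bus.on(eventName, onEvent);
  // Without this, every closed connection leaves one listener (and its closure) behind
  ws.once("close", () => bus.off(eventName, onEvent));
}
```

Keeping the listener in a named variable is what makes the later off call possible; an inline arrow function passed straight to bus.on could not be removed again.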

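Multi-instance deployment

The number of connections a single server can hold is limited by memory and file descriptors (tens of thousands up to around a hundred thousand). For scaling out, see Horizontal Scaling.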
