跳至正文

SSE Node.js 实现

Node.js 服务端实现 SSE 的关键点:正确的响应头、持续写入、心跳保活、Last-Event-ID 处理。本文以 Express 为例,其他框架(Fastify、Koa、原生 http)原理相同。

最小示例

import express from "express";

const app = express();

app.get("/api/events", (req, res) => {
  // 三个必要的响应头
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  });

  let id = 0;

  const timer = setInterval(() => {
    id++;
    res.write(`id: ${id}\n`);
    res.write(`data: ${JSON.stringify({ time: Date.now() })}\n\n`);
  }, 1000);

  req.on("close", () => {
    clearInterval(timer);
    res.end();
  });
});

app.listen(3000);

关键点:

  • res.writeHead(200, ...) 设置三个响应头
  • 每个事件末尾 \n\n(空行是事件边界)
  • 监听 req.on("close") 清理资源,防止客户端断开后继续写入导致内存泄漏

Note

响应不要被缓冲

SSE 的本质是”边写边让客户端收到”。以下几种缓冲会导致事件卡住直到响应结束才一次性下发:

  • 反向代理(Nginx 默认 proxy_buffering on)— 在响应头加 X-Accel-Buffering: no 指示 Nginx 关闭缓冲
  • Node.js 压缩中间件(compression())— 排除 text/event-stream 或对该路由跳过压缩
  • Node.js 自身的隐式写缓冲 — 调用 res.flushHeaders() 立即把响应头送出,避免客户端等不到 200 OK
res.writeHead(200, {
  "Content-Type": "text/event-stream",
  "Cache-Control": "no-cache",
  Connection: "keep-alive",
  "X-Accel-Buffering": "no",
});
res.flushHeaders();

封装 sendEvent 辅助函数

直接拼字符串容易漏 \n\n,用一个辅助函数统一处理:

function sendEvent(res, { id, event, data, retry }) {
  if (id !== undefined) res.write(`id: ${id}\n`);
  if (event) res.write(`event: ${event}\n`);
  if (retry) res.write(`retry: ${retry}\n`);

  // data 可以是多行
  const payload = typeof data === "string" ? data : JSON.stringify(data);
  for (const line of payload.split("\n")) {
    res.write(`data: ${line}\n`);
  }
  res.write("\n"); // 事件边界
}

使用:

sendEvent(res, { id: 1, data: { msg: "hello" } });
sendEvent(res, { id: 2, event: "notification", data: { title: "新消息" } });
sendEvent(res, { retry: 5000, data: "server restart soon" });

心跳保活

很多反向代理(Nginx、负载均衡器)会在连接空闲 60 秒 左右主动断开。需要定期发送注释行(以 : 开头)作为心跳:

const heartbeat = setInterval(() => {
  res.write(": keep-alive\n\n");
}, 30_000); // 每 30 秒一次

req.on("close", () => {
  clearInterval(heartbeat);
});

Note

要点

  • 心跳用注释行: 开头),客户端会忽略,不会触发任何事件
  • 间隔一般 15~30 秒,比代理超时时间小
  • 不发心跳会导致连接被代理无声断开,客户端才会触发 onerror 重连,有感知延迟

断线重连与 Last-Event-ID

客户端重连时会带 Last-Event-ID 请求头,服务端据此决定从哪里继续推送。

简单内存队列

const events = []; // 保留最近 N 条事件

function pushEvent(data) {
  const id = events.length + 1;
  events.push({ id, data });
  if (events.length > 1000) events.shift(); // 容量上限
  return id;
}

app.get("/api/events", (req, res) => {
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  });

  // 从 Last-Event-ID 的下一条开始补发
  const lastEventId = Number(req.headers["last-event-id"] ?? 0);
  const missed = events.filter((e) => e.id > lastEventId);
  for (const e of missed) {
    sendEvent(res, { id: e.id, data: e.data });
  }

  // 然后把新事件推给该客户端
  const listener = (e) => sendEvent(res, { id: e.id, data: e.data });
  emitter.on("event", listener);

  req.on("close", () => {
    emitter.off("event", listener);
  });
});

Note

要点

  • 生产环境用 Redis / 数据库持久化事件,单机内存队列会在进程重启时丢数据
  • Web 服务多实例时要引入消息中间件(Redis Pub/Sub 等),否则不同实例之间事件不互通
  • Last-Event-ID 是字符串,需要自己解析成业务 ID

AI 流式响应模式(POST + 流)

AI 对话接口通常是 POST,因此不能用 EventSource。服务端按 SSE 协议格式写出即可,客户端用 fetch + ReadableStream 解析:

app.post("/api/chat", async (req, res) => {
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  });

  const { prompt } = req.body;

  // 假设 llm.stream() 返回 async iterator
  for await (const token of llm.stream(prompt)) {
    res.write(`data: ${JSON.stringify({ token })}\n\n`);
  }

  res.write("data: [DONE]\n\n");
  res.end();
});

客户端侧的 fetch + ReadableStream 解析见 SSE 总览

广播给多个客户端

常见需求:一个事件同时推给所有已连接的客户端。

import { EventEmitter } from "events";

const emitter = new EventEmitter();
emitter.setMaxListeners(10_000); // 默认 10,连接多了要调高

app.get("/api/events", (req, res) => {
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  });

  const listener = (data) => sendEvent(res, { data });
  emitter.on("event", listener);

  req.on("close", () => {
    emitter.off("event", listener);
  });
});

// 在其他接口里触发广播
app.post("/api/broadcast", (req, res) => {
  emitter.emit("event", req.body);
  res.json({ ok: true });
});

Note

要点

  • setMaxListeners 必须调大,否则连接数超过 10 会触发警告
  • 多实例部署时 EventEmitter 只在单进程生效,跨实例广播要走 Redis Pub/Sub / Kafka

压缩与 HTTP/2

问题方案
Nginx 的 gzip 会缓冲 SSE 输出关闭对 text/event-stream 的压缩,或设置 X-Accel-Buffering: no
HTTP/1.1 每域名 6 连接上限启用 HTTP/2,单连接多路复用
长连接下代理超时延长代理的 proxy_read_timeout 并配合心跳

常用的 Nginx 配置:

location /api/events {
  proxy_pass http://backend;
  proxy_http_version 1.1;
  proxy_set_header Connection "";
  proxy_buffering off;          # 禁用缓冲
  proxy_cache off;              # 禁用缓存
  proxy_read_timeout 3600s;     # 长超时
}

对比:为什么不用原生 http 模块直接写

原生 http 也可以实现 SSE,但 Express / Fastify 帮你处理了路由、请求体解析、错误兜底等边缘。纯 SSE 学习场景用原生模块理解清楚协议后,生产代码用框架更省事。

回到 SSE 总览