SSE Node.js 实现
Node.js 服务端实现 SSE 的关键点:正确的响应头、持续写入、心跳保活、Last-Event-ID 处理。本文以 Express 为例,其他框架(Fastify、Koa、原生 http)原理相同。
最小示例
import express from "express";
const app = express();
app.get("/api/events", (req, res) => {
// 三个必要的响应头
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
let id = 0;
const timer = setInterval(() => {
id++;
res.write(`id: ${id}\n`);
res.write(`data: ${JSON.stringify({ time: Date.now() })}\n\n`);
}, 1000);
req.on("close", () => {
clearInterval(timer);
res.end();
});
});
app.listen(3000);
关键点:
res.writeHead(200, ...)设置三个响应头- 每个事件末尾
\n\n(空行是事件边界) - 监听
req.on("close")清理资源,防止客户端断开后继续写入导致内存泄漏
Note
响应不要被缓冲
SSE 的本质是”边写边让客户端收到”。以下几种缓冲会导致事件卡住直到响应结束才一次性下发:
- 反向代理(Nginx 默认
proxy_buffering on)— 在响应头加X-Accel-Buffering: no指示 Nginx 关闭缓冲 - Node.js 压缩中间件(
compression())— 排除text/event-stream或对该路由跳过压缩 - Node.js 自身的隐式写缓冲 — 调用
res.flushHeaders()立即把响应头送出,避免客户端等不到200 OK
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
});
res.flushHeaders(); 封装 sendEvent 辅助函数
直接拼字符串容易漏 \n\n,用一个辅助函数统一处理:
function sendEvent(res, { id, event, data, retry }) {
if (id !== undefined) res.write(`id: ${id}\n`);
if (event) res.write(`event: ${event}\n`);
if (retry) res.write(`retry: ${retry}\n`);
// data 可以是多行
const payload = typeof data === "string" ? data : JSON.stringify(data);
for (const line of payload.split("\n")) {
res.write(`data: ${line}\n`);
}
res.write("\n"); // 事件边界
}
使用:
sendEvent(res, { id: 1, data: { msg: "hello" } });
sendEvent(res, { id: 2, event: "notification", data: { title: "新消息" } });
sendEvent(res, { retry: 5000, data: "server restart soon" });
心跳保活
很多反向代理(Nginx、负载均衡器)会在连接空闲 60 秒 左右主动断开。需要定期发送注释行(以 : 开头)作为心跳:
const heartbeat = setInterval(() => {
res.write(": keep-alive\n\n");
}, 30_000); // 每 30 秒一次
req.on("close", () => {
clearInterval(heartbeat);
});
Note
要点
- 心跳用注释行(
:开头),客户端会忽略,不会触发任何事件 - 间隔一般 15~30 秒,比代理超时时间小
- 不发心跳会导致连接被代理无声断开,客户端才会触发
onerror重连,有感知延迟
断线重连与 Last-Event-ID
客户端重连时会带 Last-Event-ID 请求头,服务端据此决定从哪里继续推送。
简单内存队列
const events = []; // 保留最近 N 条事件
function pushEvent(data) {
const id = events.length + 1;
events.push({ id, data });
if (events.length > 1000) events.shift(); // 容量上限
return id;
}
app.get("/api/events", (req, res) => {
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
// 从 Last-Event-ID 的下一条开始补发
const lastEventId = Number(req.headers["last-event-id"] ?? 0);
const missed = events.filter((e) => e.id > lastEventId);
for (const e of missed) {
sendEvent(res, { id: e.id, data: e.data });
}
// 然后把新事件推给该客户端
const listener = (e) => sendEvent(res, { id: e.id, data: e.data });
emitter.on("event", listener);
req.on("close", () => {
emitter.off("event", listener);
});
});
Note
要点
- 生产环境用 Redis / 数据库持久化事件,单机内存队列会在进程重启时丢数据
- Web 服务多实例时要引入消息中间件(Redis Pub/Sub 等),否则不同实例之间事件不互通
Last-Event-ID是字符串,需要自己解析成业务 ID
AI 流式响应模式(POST + 流)
AI 对话接口通常是 POST,因此不能用 EventSource。服务端按 SSE 协议格式写出即可,客户端用 fetch + ReadableStream 解析:
app.post("/api/chat", async (req, res) => {
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
const { prompt } = req.body;
// 假设 llm.stream() 返回 async iterator
for await (const token of llm.stream(prompt)) {
res.write(`data: ${JSON.stringify({ token })}\n\n`);
}
res.write("data: [DONE]\n\n");
res.end();
});
客户端侧的 fetch + ReadableStream 解析见 SSE 总览。
广播给多个客户端
常见需求:一个事件同时推给所有已连接的客户端。
import { EventEmitter } from "events";
const emitter = new EventEmitter();
emitter.setMaxListeners(10_000); // 默认 10,连接多了要调高
app.get("/api/events", (req, res) => {
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
const listener = (data) => sendEvent(res, { data });
emitter.on("event", listener);
req.on("close", () => {
emitter.off("event", listener);
});
});
// 在其他接口里触发广播
app.post("/api/broadcast", (req, res) => {
emitter.emit("event", req.body);
res.json({ ok: true });
});
Note
要点
setMaxListeners必须调大,否则连接数超过 10 会触发警告- 多实例部署时 EventEmitter 只在单进程生效,跨实例广播要走 Redis Pub/Sub / Kafka
压缩与 HTTP/2
| 问题 | 方案 |
|---|---|
Nginx 的 gzip 会缓冲 SSE 输出 | 关闭对 text/event-stream 的压缩,或设置 X-Accel-Buffering: no |
| HTTP/1.1 每域名 6 连接上限 | 启用 HTTP/2,单连接多路复用 |
| 长连接下代理超时 | 延长代理的 proxy_read_timeout 并配合心跳 |
常用的 Nginx 配置:
location /api/events {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off; # 禁用缓冲
proxy_cache off; # 禁用缓存
proxy_read_timeout 3600s; # 长超时
}
对比:为什么不用原生 http 模块直接写
原生 http 也可以实现 SSE,但 Express / Fastify 帮你处理了路由、请求体解析、错误兜底等边缘。纯 SSE 学习场景用原生模块理解清楚协议后,生产代码用框架更省事。
回到 SSE 总览