Postgres 适配器

这个怎么运作

Postgres 适配器依赖于NOTIFYLISTEN命令。

每个发送给多个客户的数据包 (例如 io.to("room1").emit()socket.broadcast.emit()) 是:

  • 发送到连接到当前服务器的所有匹配客户端
  • 如果数据包包含二进制数据或超过 8000 字节限制,则数据包为:
    • msgpack编码并插入到辅助表中
    • 行 ID 在 NOTIFY 命令中发送
    • 此行 ID 由集群的其他 Socket.IO 服务器接收,它们查询表,解码数据包,然后将其广播到自己的一组连接的客户端
  • 否则,数据包只是在 NOTIFY 命令中发送并由集群的其他 Socket.IO 服务器接收

Diagram of how the Postgres adapter works

这个适配器的源代码可以在这里找到。

安装

  1. npm install @socket.io/postgres-adapter pg

对于 TypeScript 用户,您可能还需要@types/pg.

用法

  1. const { Server } = require("socket.io");
  2. const { createAdapter } = require("@socket.io/postgres-adapter");
  3. const { Pool } = require("pg");
  4. const io = new Server();
  5. const pool = new Pool({
  6. user: "postgres",
  7. host: "localhost",
  8. database: "postgres",
  9. password: "changeit",
  10. port: 5432,
  11. });
  12. pool.query(`
  13. CREATE TABLE IF NOT EXISTS socket_io_attachments (
  14. id bigserial UNIQUE,
  15. created_at timestamptz DEFAULT NOW(),
  16. payload bytea
  17. );
  18. `);
  19. io.adapter(createAdapter(pool));
  20. io.listen(3000);

配置

配置项描述默认值
uid节点的 ID随机 ID
channelPrefix通知通道的前缀socket.io
tableName超过 8000 字节限制或包含二进制数据的有效负载表的名称socket_io_attachments
payloadThreshold有效负载大小的阈值(以字节为单位)8000
requestsTimeout服务器间请求的超时时间,例如fetchSockets()serverSideEmit()5000
heartbeatInterval两次心跳之间的毫秒数5000
heartbeatTimeout在我们考虑节点关闭之前没有心跳的毫秒数10000
cleanupInterval两次清理查询之间的毫秒数30000

常见问题

  • 使用 Postgres 适配器时是否还需要启用粘性会话?

是的。否则将导致 HTTP 400 响应(您到达的服务器不知道 Socket.IO 会话)。

更多信息可以在这里找到。

  • 当 Postgres 服务器关闭时会发生什么?

如果与 Postgres 服务器的连接被切断,数据包将仅发送到连接到当前服务器的客户端。

Emitter

Postgres 发射器允许从另一个 Node.js 进程向连接的客户端发送数据包:

Diagram of how the Postgres emitter works

安装

  1. npm install @socket.io/postgres-emitter pg

用法

  1. const { Emitter } = require("@socket.io/postgres-emitter");
  2. const { Pool } = require("pg");
  3. const pool = new Pool({
  4. user: "postgres",
  5. host: "localhost",
  6. database: "postgres",
  7. password: "changeit",
  8. port: 5432,
  9. });
  10. const emitter = new Emitter(pool);
  11. setInterval(() => {
  12. emitter.emit("ping", new Date());
  13. }, 1000);

请参阅此处的备忘单。