Subscriptions / WebSockets

info

Subscriptions & WebSockets are in beta, alpha & might change without a major version bump. However, feel free to use them and report any issue you may find on GitHub

Using Subscriptions

tip

Adding a subscription procedure

  1. import * as trpc from '@trpc/server';
  2. import { EventEmitter } from 'events';
  3. // create a global event emitter (could be replaced by redis, etc)
  4. const ee = new EventEmitter()
  5. trpc.router()
  6. .subscription('onAdd', {
  7. resolve({ ctx }) {
  8. // `resolve()` is triggered for each client when they start subscribing `onAdd`
  9. // return a `Subscription` with a callback which is triggered immediately
  10. return new trpc.Subscription<Post>((emit) => {
  11. const onAdd = (data: Post) => {
  12. // emit data to client
  13. emit.data(data)
  14. };
  15. // trigger `onAdd()` when `add` is triggered in our event emitter
  16. ee.on('add', onAdd);
  17. // unsubscribe function when client disconnects or stops subscribing
  18. return () => {
  19. ee.off('add', onAdd);
  20. };
  21. });
  22. },
  23. })
  24. .mutation('add', {
  25. input: z.object({
  26. id: z.string().uuid().optional(),
  27. text: z.string().min(1),
  28. }),
  29. async resolve({ ctx, input }) {
  30. const post = { ...input } /* [..] add to db */
  31. ee.emit('add', post);
  32. return post;
  33. },
  34. })

Creating a WebSocket-server

  1. yarn add ws
  1. import ws from 'ws';
  2. import { applyWSSHandler } from '@trpc/server/adapters/ws';
  3. import { appRouter } from './routers/app';
  4. import { createContext } from './trpc';
  5. const wss = new ws.Server({
  6. port: 3001,
  7. });
  8. const handler = applyWSSHandler({ wss, router: appRouter, createContext });
  9. wss.on('connection', (ws) => {
  10. console.log(`➕➕ Connection (${wss.clients.size})`);
  11. ws.once('close', () => {
  12. console.log(`➖➖ Connection (${wss.clients.size})`);
  13. });
  14. });
  15. console.log('✅ WebSocket Server listening on ws://localhost:3001');
  16. process.on('SIGTERM', () => {
  17. console.log('SIGTERM');
  18. handler.broadcastReconnectNotification();
  19. wss.close();
  20. });

Setting TRPCClient to use WebSockets

tip

You can use Links to route queries and/or mutations to HTTP transport and subscriptions over WebSockets.

  1. import { createWSClient, wsLink } from '@trpc/client/links/wsLink';
  2. import { httpBatchLink } from '@trpc/client/links/httpBatchLink';
  3. // create persistent WebSocket connection
  4. const wsClient = createWSClient({
  5. url: `ws://localhost:3001`,
  6. });
  7. // configure TRPCClient to use WebSockets transport
  8. const client = createTRPCClient<AppRouter>({
  9. links: [
  10. wsLink({
  11. client: wsClient,
  12. }),
  13. ],
  14. });

Using React

See /examples/next-prisma-starter-websockets.

WebSockets RPC Specification

You can read more details by drilling into the TypeScript definitions:

query / mutation

Request

  1. {
  2. id: number | string;
  3. jsonrpc?: '2.0';
  4. method: 'query' | 'mutation';
  5. params: {
  6. path: string;
  7. input?: unknown; // <-- pass input of procedure, serialized by transformer
  8. };
  9. }

Response

… below, or an error.

  1. {
  2. id: number | string;
  3. jsonrpc: '2.0';
  4. result: {
  5. type: 'data'; // always 'data' for mutation / queries
  6. data: TOutput; // output from procedure
  7. };
  8. }

subscription / subscription.stop

Start a subscription

  1. {
  2. id: number | string;
  3. jsonrpc?: '2.0';
  4. method: 'subscription';
  5. params: {
  6. path: string;
  7. input?: unknown; // <-- pass input of procedure, serialized by transformer
  8. };
  9. }

To cancel a subscription, call subscription.stop

  1. {
  2. id: number | string; // <-- id of your created subscription
  3. jsonrpc?: '2.0';
  4. method: 'subscription.stop';
  5. }

Subscription response shape

… below, or an error.

  1. {
  2. id: number | string;
  3. jsonrpc: '2.0';
  4. result: (
  5. | {
  6. type: 'data';
  7. data: TData; // subscription emitted data
  8. }
  9. | {
  10. type: 'started'; // sub started
  11. }
  12. | {
  13. type: 'stopped'; // sub stopped
  14. }
  15. )
  16. }

Errors

See https://www.jsonrpc.org/specification#error_object or Error Formatting.

Notifications from Server to Client

{id: null, type: 'reconnect' }

Tells clients to reconnect before shutting down server. Invoked by wssHandler.broadcastReconnectNotification().