Custom transporters
Nest provides a variety of transporters out-of-the-box, as well as an API allowing developers to build new custom transport strategies. Transporters enable you to connect components over a network using a pluggable communications layer and a very simple application-level message protocol (read full article).
info Hint Building a microservice with Nest does not necessarily mean you must use the
@nestjs/microservices
package. For example, if you want to communicate with external services (let’s say other microservices written in different languages), you may not need all the features provided by@nestjs/microservice
library. In fact, if you don’t need decorators (@EventPattern
or@MessagePattern
) that let you declaratively define subscribers, running a Standalone Application and manually maintaining connection/subscribing to channels should be enough for most use-cases and will provide you with more flexibility.
With a custom transporter, you can integrate any messaging system/protocol (including Google Cloud Pub/Sub, Amazon Kinesis, and others) or extend the existing one, adding extra features on top (for example, QoS for MQTT).
info Hint To better understand how Nest microservices work and how you can extend the capabilities of existing transporters, we recommend reading the NestJS Microservices in Action and Advanced NestJS Microservices article series.
Creating a strategy
First, let’s define a class representing our custom transporter.
import { CustomTransportStrategy, Server } from '@nestjs/microservices';
class GoogleCloudPubSubServer
extends Server
implements CustomTransportStrategy {
/**
* This method is triggered when you run "app.listen()".
*/
listen(callback: () => void) {
callback();
}
/**
* This method is triggered on application shutdown.
*/
close() {}
}
warning Warning Please, note we won’t be implementing a fully-featured Google Cloud Pub/Sub server in this chapter as this would require diving into transporter specific technical details.
In our example above, we declared the GoogleCloudPubSubServer
class and provided listen()
and close()
methods enforced by the CustomTransportStrategy
interface. Also, our class extends the Server
class imported from the @nestjs/microservices
package that provides a few useful methods, for example, methods used by Nest runtime to register message handlers. Alternatively, in case you want to extend the capabilities of an existing transport strategy, you could extend the corresponding server class, for example, ServerRedis
. Conventionally, we added the "Server"
suffix to our class as it will be responsible for subscribing to messages/events (and responding to them, if necessary).
With this in place, we can now use our custom strategy instead of using a built-in transporter, as follows:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
strategy: new GoogleCloudPubSubServer(),
},
);
Basically, instead of passing the normal transporter options object with transport
and options
properties, we pass a single property, strategy
, whose value is an instance of our custom transporter class.
Back to our GoogleCloudPubSubServer
class, in a real-world application, we would be establishing a connection to our message broker/external service and registering subscribers/listening to specific channels in listen()
method (and then removing subscriptions & closing the connection in the close()
teardown method), but since this requires a good understanding of how Nest microservices communicate with each other, we recommend reading this article series. In this chapter instead, we’ll focus on the capabilities the Server
class provides and how you can leverage them to build custom strategies.
For example, let’s say that somewhere in our application, the following message handler is defined:
@MessagePattern('echo')
echo(@Payload() data: object) {
return data;
}
This message handler will be automatically registered by Nest runtime. With Server
class, you can see what message patterns have been registered and also, access and execute the actual methods that were assigned to them. To test this out, let’s add a simple console.log
inside listen()
method before callback
function is called:
listen(callback: () => void) {
console.log(this.messageHandlers);
callback();
}
After your application restarts, you’ll see the following log in your terminal:
Map { 'echo' => [AsyncFunction] { isEventHandler: false } }
info Hint If we used the
@EventPattern
decorator, you would see the same output, but with theisEventHandler
property set totrue
.
As you can see, the messageHandlers
property is a Map
collection of all message (and event) handlers, in which patterns are being used as keys. Now, you can use a key (for example, "echo"
) to receive a reference to the message handler:
async listen(callback: () => void) {
const echoHandler = this.messageHandlers.get('echo');
console.log(await echoHandler('Hello world!'));
callback();
}
Once we execute the echoHandler
passing an arbitrary string as an argument ("Hello world!"
here), we should see it in the console:
Hello world!
Which means that our method handler was properly executed.
Client proxy
As we mentioned in the first section, you don’t necessarily need to use the @nestjs/microservices
package to create microservices, but if you decide to do so and you need to integrate a custom strategy, you will need to provide a “client” class too.
info Hint Again, implementing a fully-featured client class compatible with all
@nestjs/microservices
features (e.g., streaming) requires a good understading of communication techniques used by the framework. To learn more, check out this article.
To communicate with an external service/emit & publish messages (or events) you can either use a library-specific SDK package, or implement a custom client class that extends the ClientProxy
, as follows:
import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices';
class GoogleCloudPubSubClient extends ClientProxy {
async connect(): Promise<any> {}
async close() {}
async dispatchEvent(packet: ReadPacket<any>): Promise<any> {}
publish(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void,
): Function {}
}
warning Warning Please, note we won’t be implementing a fully-featured Google Cloud Pub/Sub client in this chapter as this would require diving into transporter specific technical details.
As you can see, ClientProxy
class requires us to provide several methods for establishing & closing the connection and publishing messages (publish
) and events (dispatchEvent
). Note, if you don’t need a request-response communication style support, you can leave the publish()
method empty. Likewise, if you don’t need to support event-based communication, skip the dispatchEvent()
method.
To observe what and when those methods are executed, let’s add multiple console.log
calls, as follows:
class GoogleCloudPubSubClient extends ClientProxy {
async connect(): Promise<any> {
console.log('connect');
}
async close() {
console.log('close');
}
async dispatchEvent(packet: ReadPacket<any>): Promise<any> {
return console.log('event to dispatch: ', packet);
}
publish(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void,
): Function {
console.log('message:', packet);
// In a real-world application, the "callback" function should be executed
// with payload sent back from the responder. Here, we'll simply simulate (5 seconds delay)
// that response came through by passing the same "data" as we've originally passed in.
setTimeout(() => callback({ response: packet.data }), 5000);
return () => console.log('teardown');
}
}
With this in place, let’s create an instance of GoogleCloudPubSubClient
class and run the send()
method (which you might have seen in earlier chapters), subscribing to the returned observable stream.
const googlePubSubClient = new GoogleCloudPubSubClient();
googlePubSubClient
.send('pattern', 'Hello world!')
.subscribe((response) => console.log(response));
Now, you should see the following output in your terminal:
connect
message: { pattern: 'pattern', data: 'Hello world!' }
Hello world! // <-- after 5 seconds
To test if our “teardown” method (which our publish()
method returns) is properly executed, let’s apply a timeout operator to our stream, setting it to 2 seconds to make sure it throws earlier then our setTimeout
calls the callback
function.
const googlePubSubClient = new GoogleCloudPubSubClient();
googlePubSubClient
.send('pattern', 'Hello world!')
.pipe(timeout(2000))
.subscribe(
(response) => console.log(response),
(error) => console.error(error.message),
);
info Hint The
timeout
operator is imported from therxjs/operators
package.
With timeout
operator applied, your terminal output should look as follows:
connect
message: { pattern: 'pattern', data: 'Hello world!' }
teardown // <-- teardown
Timeout has occurred
To dispatch an event (instead of sending a message), use the emit()
method:
googlePubSubClient.emit('event', 'Hello world!');
And that’s what you should see in the console:
connect
event to dispatch: { pattern: 'event', data: 'Hello world!' }
Message serialization
If you need to add some custom logic around the serialization of responses on the client side, you can use a custom class that extends the ClientProxy
class or one of its child classes. For modifying successful requests you can override the serializeResponse
method, and for modifying any errors that go through this client you can override the serializeError
method. To make use of this custom class, you can pass the class itself to the ClientsModule.register()
method using the customClass
property. Below is an example of a custom ClientProxy
that serializes each error into an RpcException
.
@@filename(error-handling.proxy)
import { ClientTcp, RpcException } from '@nestjs/microservices';
class ErrorHandlingProxy extends ClientTCP {
serializeError(err: Error) {
return new RpcException(err);
}
}
and then use it in the ClientsModule
like so:
@@filename(app.module)
@Module({
imports: [
ClientsModule.register({
name: 'CustomProxy',
customClass: ErrorHandlingProxy,
}),
]
})
export class AppModule
info hint This is the class itself being passed to
customClass
, not an instance of the class. Nest will create the instance under the hood for you, and will pass any options given to theoptions
property to the newClientProxy
.