Message Router
The Magic Glue of Watermill.
Publishers and Subscribers are rather low-level parts of Watermill. In production use, you’d usually want to use a high-level interface and features like correlation, metrics, poison queue, retrying, throttling, etc.
You also might not want to send an Ack when processing was successful. Sometimes, you’d like to send a message after processing of another message finishes.
To handle these requirements, there is a component named Router.
Configuration
Full source: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
	// CloseTimeout determines how long router should work for handlers when closing.
	CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
	if c.CloseTimeout == 0 {
		c.CloseTimeout = time.Second * 30
	}
}

func (c RouterConfig) Validate() error {
	return nil
}
// ...
Handler
To begin, you need to implement HandlerFunc:
Full source: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc is function called when message is received.
//
// msg.Ack() is called automatically when HandlerFunc doesn't return error.
// When HandlerFunc returns error, msg.Nack() is called.
// When msg.Ack() was called in handler and HandlerFunc returns error,
// msg.Nack() will be not sent because Ack was already sent.
//
// HandlerFunc's are executed parallel when multiple messages was received
// (because msg.Ack() was sent in HandlerFunc or Subscriber supports multiple consumers).
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
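As a rough illustration of the signature, here is a minimal sketch of a handler that turns one incoming message into one outgoing message. The `message` and `handlerFunc` types are simplified, hypothetical stand-ins (the real message type carries more than a payload), and `upperCase` is an invented example handler.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// message is a hypothetical, simplified stand-in for Watermill's message type.
type message struct {
	Payload []byte
}

// handlerFunc has the same shape as the HandlerFunc shown above.
type handlerFunc func(msg *message) ([]*message, error)

// upperCase returns one new message with an upper-cased payload.
// For an empty payload it returns an error, which the router would turn into a Nack.
func upperCase(msg *message) ([]*message, error) {
	if len(msg.Payload) == 0 {
		return nil, errors.New("empty payload")
	}
	return []*message{{Payload: bytes.ToUpper(msg.Payload)}}, nil
}

func main() {
	var h handlerFunc = upperCase
	out, err := h(&message{Payload: []byte("hello")})
	fmt.Println(string(out[0].Payload), err)
}
```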
Next, add a new handler with Router.AddHandler:
Full source: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler adds a new handler.
//
// handlerName must be unique. For now, it is used only for debugging.
//
// subscribeTopic is a topic from which handler will receive messages.
//
// publishTopic is a topic to which router will produce messages returned by handlerFunc.
// When handler needs to publish to multiple topics,
// it is recommended to just inject Publisher to Handler or implement middleware
// which will catch messages and publish to topic based on metadata for example.
func (r *Router) AddHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	publishTopic string,
	publisher Publisher,
	handlerFunc HandlerFunc,
) {
// ...
See an example usage from Getting Started:
Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
router.AddHandler(
	"struct_handler",          // handler name, must be unique
	"incoming_messages_topic", // topic from which we will read events
	pubSub,
	"outgoing_messages_topic", // topic to which we will publish events
	pubSub,
	structHandler{}.Handler,
)
// ...
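The excerpt passes `structHandler{}.Handler`, a method value, as the handler function. The body of that type is not shown here, so the sketch below is only an assumption of what such a struct-based handler could look like. The `message` and `handlerFunc` types are simplified stand-ins, and the `prefix` field is a hypothetical dependency.

```go
package main

import "fmt"

// message and handlerFunc are simplified stand-ins for Watermill's types.
type message struct{ Payload []byte }

type handlerFunc func(msg *message) ([]*message, error)

// structHandler is a hypothetical handler type; prefix stands for any dependency
// (a repository, a client, configuration) the handler may need.
type structHandler struct {
	prefix string
}

// Handler matches handlerFunc, so the method value can be registered directly.
func (s structHandler) Handler(msg *message) ([]*message, error) {
	out := &message{Payload: []byte(s.prefix + string(msg.Payload))}
	return []*message{out}, nil
}

func main() {
	var h handlerFunc = structHandler{prefix: "processed: "}.Handler
	out, _ := h(&message{Payload: []byte("order-1")})
	fmt.Println(string(out[0].Payload))
}
```

Using a struct keeps dependencies in fields instead of package-level variables, while the router still sees a plain function.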
No publisher handler
Not every handler will produce new messages. You can add this kind of handler by using Router.AddNoPublisherHandler:
Full source: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddNoPublisherHandler adds a new handler.
// This handler cannot return messages.
// When message is returned it will occur an error and Nack will be sent.
//
// handlerName must be unique. For now, it is used only for debugging.
//
// subscribeTopic is a topic from which handler will receive messages.
//
// subscriber is Subscriber from which messages will be consumed.
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) {
// ...
Ack
By default, msg.Ack() is called when HandlerFunc doesn’t return an error. If an error is returned, msg.Nack() will be called. Because of this, you don’t have to call msg.Ack() or msg.Nack() after a message is processed (you can if you want, of course).
Producing messages
When returning multiple messages from a handler, be aware that most Publisher implementations don’t support atomic publishing of messages. If the broker or the storage is not available, the router may end up publishing only some of the messages and sending msg.Nack().
If it is an issue, consider publishing just one message with each handler.
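To make the partial-publish risk concrete, here is a minimal sketch with a hypothetical `flakyPublisher` that accepts a limited number of messages and then fails. It is not a real Publisher implementation, and messages are plain strings for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// flakyPublisher is a hypothetical publisher that fails after accepting `limit` messages.
type flakyPublisher struct {
	limit     int
	published []string
}

// Publish sends messages one by one; messages that are already sent are not rolled back.
func (p *flakyPublisher) Publish(topic string, msgs ...string) error {
	for _, m := range msgs {
		if len(p.published) >= p.limit {
			return errors.New("broker unavailable")
		}
		p.published = append(p.published, m)
	}
	return nil
}

func main() {
	pub := &flakyPublisher{limit: 1}
	err := pub.Publish("outgoing_messages_topic", "first", "second")
	// "first" is already out, yet the error means the consumed message gets a Nack.
	fmt.Println(pub.published, err)
}
```

If the Pub/Sub redelivers the nacked message, the handler runs again and "first" may be published a second time, so downstream consumers should be ready for duplicates.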
Running the Router
To run the Router, you need to call Run().
Full source: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Run runs all plugins and handlers and starts subscribing to provided topics.
// This call is blocking while the router is running.
//
// When all handlers have stopped (for example, because subscriptions were closed), the router will also stop.
//
// To stop Run() you should call Close() on the router.
//
// ctx will be propagated to all subscribers.
//
// When all handlers are stopped (for example: because of closed connection), Run() will be also stopped.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
Ensuring that the Router is running
It can be useful to know if the router is running. You can use the Running() method for this.
Full source: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Running is closed when router is running.
// In other words: you can wait till router is running using
//	fmt.Println("Starting router")
//	go r.Run(ctx)
//	<- r.Running()
//	fmt.Println("Router is running")
func (r *Router) Running() chan struct{} {
// ...
Execution models
Subscribers can consume either one message at a time or multiple messages in parallel.
- Single stream of messages is the simplest approach: until msg.Ack() is called, the subscriber will not receive any new messages.
- Multiple message streams are supported only by some subscribers. By subscribing to multiple topic partitions at once, several messages can be consumed in parallel, even if previous messages have not been acked yet (for example, the Kafka subscriber works like this). The Router handles this model by running concurrent HandlerFuncs, one for each partition.
See the chosen Pub/Sub documentation for supported execution models.
Middleware
Full source: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware allows us to write something like decorators to HandlerFunc.
// It can execute something before handler (for example: modify consumed message)
// or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.).
//
// It can be attached to the router by using `AddMiddleware` method.
//
// Example:
//	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//		return func(message *message.Message) ([]*message.Message, error) {
//			fmt.Println("executed before handler")
//			producedMessages, err := h(message)
//			fmt.Println("executed after handler")
//
//			return producedMessages, err
//		}
//	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
A full list of standard middlewares can be found in Middlewares.
Plugin
Full source: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin is function which is executed on Router start.
type RouterPlugin func(*Router) error
// ...
A full list of standard plugins can be found in message/router/plugin.