Implementing custom Pub/Sub

Bring Your Own Pub/Sub.

The Pub/Sub interface

To add support for a custom Pub/Sub, you have to implement both message.Publisher and message.Subscriber interfaces.

Full source: github.com/ThreeDotsLabs/watermill/message/pubsub.go

  1. // ...
  2. type Publisher interface {
  3. // Publish publishes provided messages to given topic.
  4. //
  5. // Publish can be synchronous or asynchronous - it depends on the implementation.
  6. //
  7. // Most publishers implementations don't support atomic publishing of messages.
  8. // This means that if publishing one of the messages fails, the next messages will not be published.
  9. //
  10. // Publish must be thread safe.
  11. Publish(topic string, messages ...*Message) error
  12. // Close should flush unsent messages, if publisher is async.
  13. Close() error
  14. }
  15. type Subscriber interface {
  16. // Subscribe returns output channel with messages from provided topic.
  17. // Channel is closed, when Close() was called on the subscriber.
  18. //
  19. // To receive the next message, `Ack()` must be called on the received message.
  20. // If message processing failed and message should be redelivered `Nack()` should be called.
  21. //
  22. // When provided ctx is cancelled, subscriber will close subscribe and close output channel.
  23. // Provided ctx is set to all produced messages.
  24. // When Nack or Ack is called on the message, context of the message is canceled.
  25. Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
  26. // Close closes all subscriptions with their output channels and flush offsets etc. when needed.
  27. Close() error
  28. }
  29. type SubscribeInitializer interface {
  30. // ...

TODO list

Here are a few things you shouldn’t forget about:

  • Logging (good messages and proper levels).
  • Replaceable and configurable messages marshaller.
  • Close() implementation for the publisher and subscriber that is:
    • idempotent
    • working correctly even when the publisher or the subscriber is blocked (for example, waiting for an Ack).
    • working correctly when the subscriber output channel is blocked (because nothing is listening on it).
  • Ack() and Nack() support for consumed messages.
  • Redelivery on Nack() for a consumed message.
  • Use Universal Pub/Sub tests
  • Performance optimizations.
  • GoDocs, Markdown docs and Getting Started examples.We will also be thankful for submitting a pull requests with the new Pub/Sub implementation.

If anything is not clear, feel free to use any of our support channels to reach us, we will be glad to help.