CQRS Component
Golang CQRS implementation in Watermill.
CQRS
CQRS means “Command-query responsibility segregation”. We segregate the responsibility between commands (write requests) and queries (read requests). The write requests and the read requests are handled by different objects.
That’s it. We can further split up the data storage, having separate read and write stores. Once that happens, there may be many read stores, optimized for handling different types of queries or spanning many bounded contexts. Though separate read/write stores are often discussed in relation with CQRS, this is not CQRS itself. CQRS is just the first split of commands and queries.
Source: www.cqrs.nu FAQ
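The split described above can be sketched in a few lines of plain Go. This is only an illustration under assumptions: the `bookings` store and both handler types are hypothetical names invented for this sketch, not part of Watermill. The write side changes state and returns only an error; the read side returns data and never changes state.

```go
package main

import (
	"errors"
	"fmt"
)

// BookRoom is a command: a request to change state (hypothetical type for this sketch).
type BookRoom struct {
	RoomID    string
	GuestName string
}

// bookings is the state; in this sketch both sides share a single store.
type bookings struct {
	byRoom map[string]string // room ID -> guest name
}

// BookRoomCommandHandler is the write side: it changes state and returns only an error.
type BookRoomCommandHandler struct{ store *bookings }

func (h BookRoomCommandHandler) Handle(cmd BookRoom) error {
	if _, taken := h.store.byRoom[cmd.RoomID]; taken {
		return errors.New("room already booked")
	}
	h.store.byRoom[cmd.RoomID] = cmd.GuestName
	return nil
}

// GuestInRoomQueryHandler is the read side: it returns data and never changes state.
type GuestInRoomQueryHandler struct{ store *bookings }

func (h GuestInRoomQueryHandler) Handle(roomID string) (string, bool) {
	guest, ok := h.store.byRoom[roomID]
	return guest, ok
}

func main() {
	store := &bookings{byRoom: map[string]string{}}
	write := BookRoomCommandHandler{store}
	read := GuestInRoomQueryHandler{store}

	if err := write.Handle(BookRoom{RoomID: "1", GuestName: "John"}); err != nil {
		panic(err)
	}
	guest, ok := read.Handle("1")
	fmt.Println(guest, ok)
}
```

Both handlers share one store here, which is still CQRS; separate read and write stores would be a further step.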
Glossary
Command
The command is a simple data structure, representing the request for executing some operation.
Command Bus
Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// CommandBus transports commands to command handlers.
type CommandBus struct {
// ...
Command Processor
Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandProcessor determines which CommandHandler should handle the command received from the command bus.
type CommandProcessor struct {
// ...
Command Handler
Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandHandler receives a command defined by NewCommand and handles it with the Handle method.
// If using DDD, CommandHandler may modify and persist the aggregate.
//
// In contrast to EventHandler, every Command must have only one CommandHandler.
type CommandHandler interface {
// ...
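The "one command, one handler" rule can be illustrated with a small stand-alone registry. This is a hedged sketch, not Watermill's implementation: the `registry` type, `commandName` helper, and the simplified `CommandHandler` interface (without a context argument) are assumptions made for the example.

```go
package main

import (
	"fmt"
	"reflect"
)

// CommandHandler is a simplified, hypothetical version of the interface described above.
type CommandHandler interface {
	HandlerName() string
	NewCommand() interface{}
	Handle(cmd interface{}) error
}

// registry keeps exactly one handler per command name.
type registry struct {
	handlers map[string]CommandHandler
}

func newRegistry() *registry {
	return &registry{handlers: map[string]CommandHandler{}}
}

// commandName derives a name from the Go type, dereferencing a pointer if needed.
func commandName(cmd interface{}) string {
	t := reflect.TypeOf(cmd)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t.Name()
}

// Add rejects a second handler for a command that already has one.
func (r *registry) Add(h CommandHandler) error {
	name := commandName(h.NewCommand())
	if existing, ok := r.handlers[name]; ok {
		return fmt.Errorf("command %s is already handled by %s", name, existing.HandlerName())
	}
	r.handlers[name] = h
	return nil
}

// Dispatch routes the command to its single handler.
func (r *registry) Dispatch(cmd interface{}) error {
	h, ok := r.handlers[commandName(cmd)]
	if !ok {
		return fmt.Errorf("no handler for command %s", commandName(cmd))
	}
	return h.Handle(cmd)
}

type BookRoom struct{ RoomID string }

type bookRoomHandler struct {
	name   string
	booked []string
}

func (h *bookRoomHandler) HandlerName() string     { return h.name }
func (h *bookRoomHandler) NewCommand() interface{} { return &BookRoom{} }
func (h *bookRoomHandler) Handle(cmd interface{}) error {
	h.booked = append(h.booked, cmd.(*BookRoom).RoomID)
	return nil
}

func main() {
	r := newRegistry()
	first := &bookRoomHandler{name: "first"}
	fmt.Println(r.Add(first))
	fmt.Println(r.Add(&bookRoomHandler{name: "second"})) // rejected: BookRoom already has a handler
	fmt.Println(r.Dispatch(&BookRoom{RoomID: "1"}), first.booked)
}
```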
Event
The event represents something that already took place. Events are immutable.
Event Bus
Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBus transports events to event handlers.
type EventBus struct {
// ...
Event Processor
Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessor determines which EventHandler should handle the event received from the event bus.
type EventProcessor struct {
// ...
Event Handler
Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventHandler receives events defined by NewEvent and handles them with its Handle method.
// If using DDD, EventHandler may modify and persist the aggregate.
// It can also invoke a process manager, a saga or just build a read model.
//
// In contrast to CommandHandler, every Event can have multiple EventHandlers.
type EventHandler interface {
// ...
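In contrast to commands, one event may reach several handlers. The following stand-alone sketch shows that fan-out in memory. The `eventFanOut`, `revenueReport`, and `beerPolicy` types are hypothetical, the `EventHandler` interface is simplified, and delivery is synchronous, which a real Pub/Sub would not be.

```go
package main

import "fmt"

// RoomBooked is an event. It is passed by value, so handlers cannot alter the original.
type RoomBooked struct {
	RoomID string
	Price  int64
}

// EventHandler is a simplified, hypothetical version of the interface described above.
type EventHandler interface {
	HandlerName() string
	Handle(event RoomBooked) error
}

// eventFanOut delivers every published event to all subscribed handlers.
type eventFanOut struct {
	handlers []EventHandler
}

func (f *eventFanOut) Subscribe(h EventHandler) {
	f.handlers = append(f.handlers, h)
}

// Publish stops at the first failing handler and reports which one failed.
func (f *eventFanOut) Publish(event RoomBooked) error {
	for _, h := range f.handlers {
		if err := h.Handle(event); err != nil {
			return fmt.Errorf("%s: %w", h.HandlerName(), err)
		}
	}
	return nil
}

// revenueReport builds a read model from the event.
type revenueReport struct{ total int64 }

func (r *revenueReport) HandlerName() string { return "revenueReport" }
func (r *revenueReport) Handle(event RoomBooked) error {
	r.total += event.Price
	return nil
}

// beerPolicy reacts to the same event for a different purpose.
type beerPolicy struct{ roomsToServe []string }

func (p *beerPolicy) HandlerName() string { return "beerPolicy" }
func (p *beerPolicy) Handle(event RoomBooked) error {
	p.roomsToServe = append(p.roomsToServe, event.RoomID)
	return nil
}

func main() {
	report, policy := &revenueReport{}, &beerPolicy{}
	bus := &eventFanOut{}
	bus.Subscribe(report)
	bus.Subscribe(policy)

	if err := bus.Publish(RoomBooked{RoomID: "1", Price: 120}); err != nil {
		panic(err)
	}
	fmt.Println(report.total, policy.roomsToServe)
}
```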
CQRS Facade
Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/cqrs.go
// ...
// Facade is a facade for creating the Command and Event buses and processors.
// It was created to avoid boilerplate, when using CQRS in the standard way.
// You can also create buses and processors manually, drawing inspiration from how it's done in NewFacade.
type Facade struct {
// ...
Command and Event Marshaler
Full source: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler marshals Commands and Events to Watermill's messages and vice versa.
// Payload of the command needs to be marshaled to []byte.
type CommandEventMarshaler interface {
	// Marshal marshals Command or Event to Watermill's message.
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal unmarshals Watermill's message to v Command or Event.
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name returns the name of Command or Event.
	// Name is used to determine whether the received command or event is the one we want to handle.
	Name(v interface{}) string

	// NameFromMessage returns the name of Command or Event from Watermill's message (generated by Marshal).
	//
	// When we have Command or Event marshaled to Watermill's message,
	// we should use NameFromMessage instead of Name to avoid unnecessary unmarshaling.
	NameFromMessage(msg *message.Message) string
}
// ...
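To show why `NameFromMessage` exists, here is a minimal stand-alone sketch of a marshaler with the same four operations. Everything in it is an assumption made for illustration: `Message` is a hypothetical stand-in for a broker message, `sketchMarshaler` is not a Watermill type, and storing the name under a `"name"` metadata key is just one possible convention.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Message is a hypothetical stand-in for a broker message: metadata plus raw payload.
type Message struct {
	Metadata map[string]string
	Payload  []byte
}

// sketchMarshaler encodes payloads as JSON and records the type name in the metadata.
type sketchMarshaler struct{}

// Name derives the name from the Go type, dereferencing a pointer if needed.
func (sketchMarshaler) Name(v interface{}) string {
	t := reflect.TypeOf(v)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t.Name()
}

func (m sketchMarshaler) Marshal(v interface{}) (*Message, error) {
	payload, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return &Message{
		Metadata: map[string]string{"name": m.Name(v)},
		Payload:  payload,
	}, nil
}

func (sketchMarshaler) Unmarshal(msg *Message, v interface{}) error {
	return json.Unmarshal(msg.Payload, v)
}

// NameFromMessage reads the name from the metadata without decoding the payload.
func (sketchMarshaler) NameFromMessage(msg *Message) string {
	return msg.Metadata["name"]
}

type BookRoom struct {
	RoomID    string `json:"room_id"`
	GuestName string `json:"guest_name"`
}

func main() {
	m := sketchMarshaler{}
	msg, err := m.Marshal(&BookRoom{RoomID: "1", GuestName: "John"})
	if err != nil {
		panic(err)
	}

	// A processor can compare names first and decode the payload only on a match.
	if m.NameFromMessage(msg) == m.Name(&BookRoom{}) {
		var cmd BookRoom
		if err := m.Unmarshal(msg, &cmd); err != nil {
			panic(err)
		}
		fmt.Println(cmd.RoomID, cmd.GuestName)
	}
}
```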
Usage
Example domain
As an example, we will use a simple domain that is responsible for handling room bookings in a hotel.
We will use Event Storming notation to show the model of this domain.
Legend:
- blue post-its are commands
- orange post-its are events
- green post-its are read models, asynchronously generated from events
- violet post-its are policies, which are triggered by events and produce commands
- pink post-its are hot-spots; we mark places where problems often occur
The domain is simple:
- A Guest is able to book a room.
- Whenever a room is booked, we order a beer for the guest (because we love our guests).
- We know that sometimes there are not enough beers.
- We generate a financial report based on the bookings.
Sending a command
To begin, we need to simulate the guest’s action.
Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
bookRoomCmd := &BookRoom{
	RoomId:    fmt.Sprintf("%d", i),
	GuestName: "John",
	StartDate: startDate,
	EndDate:   endDate,
}
if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
	panic(err)
}
// ...
Command handler
BookRoomHandler will handle our command.
Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler is a command handler, which handles BookRoom command and emits RoomBooked.
//
// In CQRS, one command must be handled by only one handler.
// When another handler for this command is added to the command processor, an error will be returned.
type BookRoomHandler struct {
	eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
	return "BookRoomHandler"
}

// NewCommand returns the type of command which this handler should handle. It must be a pointer.
func (b BookRoomHandler) NewCommand() interface{} {
	return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
	// c is always the type returned by `NewCommand`, so the type assertion is always safe
	cmd := c.(*BookRoom)

	// some random price, in production you will probably calculate it in a wiser way
	price := (rand.Int63n(40) + 1) * 10

	log.Printf(
		"Booked %s for %s from %s to %s",
		cmd.RoomId,
		cmd.GuestName,
		time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
		time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
	)

	// RoomBooked will be handled by the OrderBeerOnRoomBooked event handler,
	// in the future RoomBooked may be handled by multiple event handlers
	if err := b.eventBus.Publish(ctx, &RoomBooked{
		ReservationId: watermill.NewUUID(),
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Price:         price,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	}); err != nil {
		return err
	}

	return nil
}

// OrderBeerOnRoomBooked is an event handler, which handles RoomBooked event and emits OrderBeer command.
// ...
Event handler
As mentioned before, we want to order a beer every time a room is booked (“Whenever a Room is booked” post-it). We do it by using the OrderBeer command.
Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked is an event handler, which handles RoomBooked event and emits OrderBeer command.
type OrderBeerOnRoomBooked struct {
	commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
	// this name is passed to EventsSubscriberConstructor and used to generate queue name
	return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
	return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
	event := e.(*RoomBooked)

	orderBeerCmd := &OrderBeer{
		RoomId: event.RoomId,
		Count:  rand.Int63n(10) + 1,
	}

	return o.commandBus.Send(ctx, orderBeerCmd)
}
// OrderBeerHandler is a command handler, which handles OrderBeer command and emits BeerOrdered.
// ...
OrderBeerHandler is very similar to BookRoomHandler. The only difference is that it sometimes returns an error when there are not enough beers, which causes redelivery of the command. You can find the entire implementation in the example source code.
Building a read model with the event handler
Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookingsFinancialReport is a read model, which calculates how much money we may earn from bookings.
// Like OrderBeerOnRoomBooked, it listens for RoomBooked event.
//
// This implementation just writes to memory. In production, you will probably use some persistent storage.
type BookingsFinancialReport struct {
	handledBookings map[string]struct{}
	totalCharge     int64
	lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
	return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

// Pointer receivers are used throughout, so the struct (and its mutex) is never copied.
func (b *BookingsFinancialReport) HandlerName() string {
	// this name is passed to EventsSubscriberConstructor and used to generate queue name
	return "BookingsFinancialReport"
}

func (*BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// Handle may be called concurrently, so it needs to be thread safe.
	b.lock.Lock()
	defer b.lock.Unlock()

	event := e.(*RoomBooked)

	// When we are using Pub/Sub which doesn't provide exactly-once delivery semantics, we need to deduplicate messages.
	// GoChannel Pub/Sub provides exactly-once delivery,
	// but let's make this example ready for other Pub/Sub implementations.
	if _, ok := b.handledBookings[event.ReservationId]; ok {
		return nil
	}
	b.handledBookings[event.ReservationId] = struct{}{}

	b.totalCharge += event.Price

	fmt.Printf(">>> Already booked rooms for $%d\n", b.totalCharge)
	return nil
}
var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
func main() {
// ...
Wiring it up - the CQRS facade
We have all the blocks to build our CQRS application. We now need to use some kind of glue to wire it up.
The example below uses RabbitMQ (AMQP) as the messaging infrastructure, but any Watermill Pub/Sub implementation can be used instead.
Under the hood, CQRS uses Watermill’s message router. If you are not familiar with it and want to learn how it works, check the Getting Started guide. It also shows how to use standard messaging patterns, like metrics, poison queue, throttling, correlation, and other tools used by every message-driven application. Those come built in with Watermill.
Let’s go back to CQRS. As you already know, CQRS is built from multiple components, like Command and Event buses, handlers, processors, etc. To simplify creating all these building blocks, we created cqrs.Facade, which creates all of them.
Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// You can use any Pub/Sub implementation from here: https://watermill.io/docs/pub-sub-implementations/
	// Detailed RabbitMQ implementation: https://watermill.io/docs/pub-sub-implementations/#rabbitmq-amqp
	// Commands will be sent to a queue, because they need to be consumed once.
	commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
	commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}
	commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}

	// Events will be published to RabbitMQ configured as Pub/Sub, because they may be consumed by multiple consumers
	// (in this case BookingsFinancialReport and OrderBeerOnRoomBooked).
	eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
	if err != nil {
		panic(err)
	}

	// CQRS is built on the message router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// You can find more about router middlewares in the documentation:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// You can find the list of available middlewares in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	// cqrs.Facade is a facade for Command and Event buses and processors.
	// You can use the facade, or create buses and processors manually (you can use cqrs.NewFacade as inspiration).
	cqrsFacade, err := cqrs.NewFacade(cqrs.FacadeConfig{
		GenerateCommandsTopic: func(commandName string) string {
			// we are using the queue RabbitMQ config, so we need a topic per command type
			return commandName
		},
		CommandHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.CommandHandler {
			return []cqrs.CommandHandler{
				BookRoomHandler{eb},
				OrderBeerHandler{eb},
			}
		},
		CommandsPublisher: commandsPublisher,
		CommandsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
			// we can reuse the subscriber, because all commands have separate topics
			return commandsSubscriber, nil
		},
		GenerateEventsTopic: func(eventName string) string {
			// because we are using the PubSub RabbitMQ config, we can use one topic for all events
			return "events"

			// we can also use a topic per event type
			// return eventName
		},
		EventHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.EventHandler {
			return []cqrs.EventHandler{
				OrderBeerOnRoomBooked{cb},
				NewBookingsFinancialReport(),
			}
		},
		EventsPublisher: eventsPublisher,
		EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
			config := amqp.NewDurablePubSubConfig(
				amqpAddress,
				amqp.GenerateQueueNameTopicNameWithSuffix(handlerName),
			)

			return amqp.NewSubscriber(config, logger)
		},
		Router:                router,
		CommandEventMarshaler: cqrsMarshaler,
		Logger:                logger,
	})
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(cqrsFacade.CommandBus())

	// processors are based on the router, so they start working once the router starts
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ...
And that’s all. We have a working CQRS application.
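The topic choices in the facade configuration (a topic per command, one shared "events" topic with a queue per event handler) can be illustrated with a toy in-memory broker. This is a sketch under assumptions: the `broker` type is hypothetical and far simpler than RabbitMQ, and the `topic + "_" + handlerName` queue naming is an assumed reading of what a suffix-based queue name generator produces.

```go
package main

import "fmt"

// broker is a toy model: topics are bound to queues, and queues hold pending messages.
type broker struct {
	bindings map[string][]string // topic -> queues bound to it
	queues   map[string][]string // queue -> pending messages
}

func newBroker() *broker {
	return &broker{bindings: map[string][]string{}, queues: map[string][]string{}}
}

// Bind attaches a queue to a topic; binding the same queue twice has no extra effect.
func (b *broker) Bind(topic, queue string) {
	for _, q := range b.bindings[topic] {
		if q == queue {
			return
		}
	}
	b.bindings[topic] = append(b.bindings[topic], queue)
}

// Publish copies the message into every queue bound to the topic.
func (b *broker) Publish(topic, msg string) {
	for _, q := range b.bindings[topic] {
		b.queues[q] = append(b.queues[q], msg)
	}
}

// commandsTopic gives each command type its own topic, as in GenerateCommandsTopic above.
func commandsTopic(commandName string) string { return commandName }

// eventsTopic puts all events on one topic, as in GenerateEventsTopic above.
func eventsTopic(eventName string) string { return "events" }

// eventQueue gives each event handler its own queue (assumed naming scheme).
func eventQueue(topic, handlerName string) string { return topic + "_" + handlerName }

func main() {
	b := newBroker()

	// Commands: one topic and one shared queue, so each command is consumed once.
	b.Bind(commandsTopic("BookRoom"), commandsTopic("BookRoom"))

	// Events: one topic, one queue per handler, so every handler gets its own copy.
	for _, handler := range []string{"OrderBeerOnRoomBooked", "BookingsFinancialReport"} {
		topic := eventsTopic("RoomBooked")
		b.Bind(topic, eventQueue(topic, handler))
	}

	b.Publish(commandsTopic("BookRoom"), "book room 1")
	b.Publish(eventsTopic("RoomBooked"), "room 1 booked")

	fmt.Println(
		len(b.queues["BookRoom"]),
		len(b.queues["events_OrderBeerOnRoomBooked"]),
		len(b.queues["events_BookingsFinancialReport"]),
	)
}
```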
What’s next?
As mentioned before, if you are not familiar with Watermill, we highly recommend reading the Getting Started guide.