Message
Message is one of core parts of Watermill.
Message
Message is one of core parts of Watermill. Messages are emitted by Publishers and received by Subscribers.When a message is processed, you should send an Ack()
or a Nack()
when the processing failed.
Acks
and Nacks
are processed by Subscribers (in default implementations, the subscribers are waiting for an Ack
or a Nack
).
Full source: github.com/ThreeDotsLabs/watermill/message/message.go
// ...
type Message struct {
// UUID is an unique identifier of message.
//
// It is only used by Watermill for debugging.
// UUID can be empty.
UUID string
// Metadata contains the message metadata.
//
// Can be used to store data which doesn't require unmarshaling entire payload.
// It is something similar to HTTP request's headers.
//
// Metadata is marshaled and will be saved to PubSub.
Metadata Metadata
// Payload is message's payload.
Payload Payload
// ack is closed, when acknowledge is received.
ack chan struct{}
// noACk is closed, when negative acknowledge is received.
noAck chan struct{}
ackMutex sync.Mutex
ackSentType ackType
ctx context.Context
}
// ...
Ack
Sending Ack
Full source: github.com/ThreeDotsLabs/watermill/message/message.go
// ...
// Ack sends message's acknowledgement.
//
// Ack is not blocking.
// Ack is idempotent.
// False is returned, if Nack is already sent.
func (m *Message) Ack() bool {
// ...
Nack
Full source: github.com/ThreeDotsLabs/watermill/message/message.go
// ...
// Nack sends message's negative acknowledgement.
//
// Nack is not blocking.
// Nack is idempotent.
// False is returned, if Ack is already sent.
func (m *Message) Nack() bool {
// ...
Receiving Ack/Nack
Full source: github.com/ThreeDotsLabs/watermill/docs/content/docs/message/receiving-ack.go
// ...
select {
case <-msg.Acked():
log.Print("ack received")
case <-msg.Nacked():
log.Print("nack received")
}
// ...
Context
Message contains the standard library context, just like an HTTP request.
Full source: github.com/ThreeDotsLabs/watermill/message/message.go
// ...
// Context returns the message's context. To change the context, use
// SetContext.
//
// The returned context is always non-nil; it defaults to the
// background context.
func (m *Message) Context() context.Context {
if m.ctx != nil {
return m.ctx
}
return context.Background()
}
// SetContext sets provided context to the message.
func (m *Message) SetContext(ctx context.Context) {
m.ctx = ctx
}
// ...