Pub/sub

Creating a pub/sub component requires just a few basic steps.

Import pub/sub packages

Create the file components/pubsub.go and add import statements for the pub/sub-related packages.

    package components

    import (
        "context"

        "github.com/dapr/components-contrib/pubsub"
    )

Implement the PubSub interface

Create a type that implements the PubSub interface.

    type MyPubSubComponent struct {
    }

    func (component *MyPubSubComponent) Init(metadata pubsub.Metadata) error {
        // Called to initialize the component with its configured metadata...
        return nil
    }

    func (component *MyPubSubComponent) Close() error {
        // Not used with pluggable components...
        return nil
    }

    func (component *MyPubSubComponent) Features() []pubsub.Feature {
        // Return a list of features supported by the component...
        return nil
    }

    func (component *MyPubSubComponent) Publish(req *pubsub.PublishRequest) error {
        // Send the message to the "topic"...
        return nil
    }

    func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
        // Until canceled, check the topic for messages and deliver them to the Dapr runtime...
        return nil
    }
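As a rough illustration of what Init and Publish might do, the sketch below keeps messages in per-topic in-memory queues. It is not taken from the SDK: the metadata and publishRequest types are hypothetical stand-ins for pubsub.Metadata and pubsub.PublishRequest (so the example compiles on its own), and the "maxPerTopic" property and the errTopicFull error are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"sync"
)

// metadata and publishRequest are hypothetical stand-ins for pubsub.Metadata
// and pubsub.PublishRequest, reduced to the fields this sketch needs.
type metadata struct {
	Properties map[string]string
}

type publishRequest struct {
	Topic string
	Data  []byte
}

// errTopicFull is returned when a topic already holds maxPerTopic messages.
var errTopicFull = errors.New("topic is full")

// memoryPubSub keeps published messages in per-topic in-memory queues.
type memoryPubSub struct {
	mu          sync.Mutex
	maxPerTopic int
	topics      map[string][][]byte
}

// Init reads the hypothetical "maxPerTopic" property, defaulting to 100.
func (c *memoryPubSub) Init(md metadata) error {
	limit := 100
	if raw, ok := md.Properties["maxPerTopic"]; ok {
		n, err := strconv.Atoi(raw)
		if err != nil || n <= 0 {
			return fmt.Errorf("invalid maxPerTopic %q", raw)
		}
		limit = n
	}
	c.maxPerTopic = limit
	c.topics = make(map[string][][]byte)
	return nil
}

// Publish appends the payload to the topic's queue, or reports an error.
func (c *memoryPubSub) Publish(req *publishRequest) error {
	if req.Topic == "" {
		return errors.New("topic is required")
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.topics[req.Topic]) >= c.maxPerTopic {
		return errTopicFull
	}
	c.topics[req.Topic] = append(c.topics[req.Topic], req.Data)
	return nil
}

func main() {
	c := &memoryPubSub{}
	if err := c.Init(metadata{Properties: map[string]string{"maxPerTopic": "1"}}); err != nil {
		panic(err)
	}
	fmt.Println(c.Publish(&publishRequest{Topic: "orders", Data: []byte("a")})) // <nil>
	fmt.Println(c.Publish(&publishRequest{Topic: "orders", Data: []byte("b")})) // topic is full
}
```

Validating configuration in Init and returning an error early means a misconfigured component fails at startup rather than on the first publish. A real component would replace the in-memory map with calls to its message broker.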

Calls to the Subscribe() method are expected to set up a long-lived mechanism for retrieving messages and then return immediately: nil on success, or an error if that mechanism could not be set up. The mechanism should stop once the context is canceled (for example, when the ctx.Done() channel is closed or ctx.Err() returns a non-nil error). The “topic” from which messages should be pulled is passed via the req argument, while delivery to the Dapr runtime is performed via the handler callback. The callback doesn’t return until the application (served by the Dapr runtime) acknowledges processing of the message.

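    // Note: this example also requires "time" in the file's import block.
    func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
        go func() {
            for {
                // Stop polling once the subscription has been canceled.
                if ctx.Err() != nil {
                    return
                }

                var messages [][]byte // Poll req.Topic for messages...

                for _, message := range messages {
                    // The handler blocks until the application acknowledges the message.
                    if err := handler(ctx, &pubsub.NewMessage{
                        Topic: req.Topic,
                        Data:  message,
                    }); err != nil {
                        // Delivery failed; decide whether to retry or drop the message...
                    }
                }

                // Wait before polling again, waking early on cancellation.
                select {
                case <-ctx.Done():
                    return
                case <-time.After(5 * time.Second):
                }
            }
        }()

        return nil
    }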
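The polling and cancellation behavior described above can be tried out without the Dapr packages. The following is a minimal sketch under stated assumptions: message and handler are hypothetical stand-ins for pubsub.NewMessage and pubsub.Handler, the poll function is an invented placeholder for a real broker query, and the returned done channel exists only so the demo can wait for the loop to exit (the real Subscribe() returns an error instead).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// message and handler are hypothetical stand-ins for pubsub.NewMessage and
// pubsub.Handler so the sketch compiles without the Dapr packages.
type message struct {
	Topic string
	Data  []byte
}

type handler func(ctx context.Context, msg *message) error

// subscribe starts a background polling loop and returns immediately.
// The returned channel is closed when the loop exits (demo-only addition).
func subscribe(ctx context.Context, topic string, interval time.Duration,
	poll func() [][]byte, h handler) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			// Stop once the subscription has been canceled.
			if ctx.Err() != nil {
				return
			}
			for _, data := range poll() {
				// Each call blocks until the handler has processed the message.
				if err := h(ctx, &message{Topic: topic, Data: data}); err != nil {
					fmt.Println("delivery failed:", err)
				}
			}
			// Sleep until the next poll, waking early on cancellation.
			select {
			case <-ctx.Done():
				return
			case <-time.After(interval):
			}
		}
	}()
	return done
}

// runDemo delivers two queued messages, cancels the subscription, and waits
// for the polling loop to exit before returning what the handler received.
func runDemo() []string {
	queue := [][]byte{[]byte("first"), []byte("second")}
	poll := func() [][]byte {
		pending := queue
		queue = nil
		return pending
	}

	var received []string
	delivered := make(chan struct{})
	ctx, cancel := context.WithCancel(context.Background())
	done := subscribe(ctx, "orders", 10*time.Millisecond, poll,
		func(_ context.Context, m *message) error {
			received = append(received, string(m.Data))
			if len(received) == 2 {
				close(delivered)
			}
			return nil
		})

	<-delivered
	cancel()
	<-done
	return received
}

func main() {
	fmt.Println(runDemo()) // [first second]
}
```

Selecting on ctx.Done() alongside the timer lets the loop exit promptly on cancellation instead of sleeping through the full polling interval.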

Register pub/sub component

In the main application file (for example, main.go), register the pub/sub component with the application.

    package main

    import (
        "example/components"

        dapr "github.com/dapr-sandbox/components-go-sdk"
        pubsub "github.com/dapr-sandbox/components-go-sdk/pubsub/v1"
    )

    func main() {
        // Register a factory that creates the component for the given socket.
        dapr.Register("<socket name>", dapr.WithPubSub(func() pubsub.PubSub {
            return &components.MyPubSubComponent{}
        }))

        dapr.MustRun()
    }

Next steps