1. 消费者

  1. //Nsq接收测试
  2. package main
  3. import (
  4. "fmt"
  5. "time"
  6. "github.com/nsqio/go-nsq"
  7. )
  8. // 消费者
  9. type ConsumerT struct{}
  10. // 主函数
  11. func main() {
  12. InitConsumer("test", "test-channel", "127.0.0.1:4161")
  13. for {
  14. time.Sleep(time.Second * 10)
  15. }
  16. }
  17. //处理消息
  18. func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
  19. fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
  20. return nil
  21. }
  22. //初始化消费者
  23. func InitConsumer(topic string, channel string, address string) {
  24. cfg := nsq.NewConfig()
  25. cfg.LookupdPollInterval = time.Second //设置重连时间
  26. c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
  27. if err != nil {
  28. panic(err)
  29. }
  30. c.SetLogger(nil, 0) //屏蔽系统日志
  31. c.AddHandler(&ConsumerT{}) // 添加消费者接口
  32. //建立NSQLookupd连接
  33. if err := c.ConnectToNSQLookupd(address); err != nil {
  34. panic(err)
  35. }
  36. //建立多个nsqd连接
  37. // if err := c.ConnectToNSQDs([]string{"127.0.0.1:4150", "127.0.0.1:4152"}); err != nil {
  38. // panic(err)
  39. // }
  40. // 建立一个nsqd连接
  41. // if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
  42. // panic(err)
  43. // }
  44. }