1. Topic模式(话题模式,一个消息被多个消费者获取,消息的目标queue可用BindingKey以通配符,(#:一个或多个词,*:一个词)的方式指定

Topic模式 - 图1

  • 星号井号代表通配符
  • 星号代表多个单词,井号代表一个单词
  • 路由功能添加模糊匹配
  • 消息产生者产生消息,把消息交给交换机
  • 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

目录结构

Topic模式 - 图2

kuteng-RabbitMQ

-RabbitMQ

—rabitmq.go //这个是RabbitMQ的封装

-publish

—mainpublish.go //Publish 先启动

-recieve1

—mainrecieve.go

-recieve2

—mainrecieve.go

rabitmq.go代码

  1. package RabbitMQ
  2. import (
  3. "fmt"
  4. "log"
  5. "github.com/streadway/amqp"
  6. )
  7. //连接信息
  8. const MQURL = "amqp://kuteng:kuteng@127.0.0.1:5672/kuteng"
  9. //rabbitMQ结构体
  10. type RabbitMQ struct {
  11. conn *amqp.Connection
  12. channel *amqp.Channel
  13. //队列名称
  14. QueueName string
  15. //交换机名称
  16. Exchange string
  17. //bind Key 名称
  18. Key string
  19. //连接信息
  20. Mqurl string
  21. }
  22. //创建结构体实例
  23. func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
  24. return &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
  25. }
  26. //断开channel 和 connection
  27. func (r *RabbitMQ) Destory() {
  28. r.channel.Close()
  29. r.conn.Close()
  30. }
  31. //错误处理函数
  32. func (r *RabbitMQ) failOnErr(err error, message string) {
  33. if err != nil {
  34. log.Fatalf("%s:%s", message, err)
  35. panic(fmt.Sprintf("%s:%s", message, err))
  36. }
  37. }
  38. //话题模式
  39. //创建RabbitMQ实例
  40. func NewRabbitMQTopic(exchangeName string, routingKey string) *RabbitMQ {
  41. //创建RabbitMQ实例
  42. rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
  43. var err error
  44. //获取connection
  45. rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
  46. rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
  47. //获取channel
  48. rabbitmq.channel, err = rabbitmq.conn.Channel()
  49. rabbitmq.failOnErr(err, "failed to open a channel")
  50. return rabbitmq
  51. }
  52. //话题模式发送消息
  53. func (r *RabbitMQ) PublishTopic(message string) {
  54. //1.尝试创建交换机
  55. err := r.channel.ExchangeDeclare(
  56. r.Exchange,
  57. //要改成topic
  58. "topic",
  59. true,
  60. false,
  61. false,
  62. false,
  63. nil,
  64. )
  65. r.failOnErr(err, "Failed to declare an excha"+
  66. "nge")
  67. //2.发送消息
  68. err = r.channel.Publish(
  69. r.Exchange,
  70. //要设置
  71. r.Key,
  72. false,
  73. false,
  74. amqp.Publishing{
  75. ContentType: "text/plain",
  76. Body: []byte(message),
  77. })
  78. }
  79. //话题模式接受消息
  80. //要注意key,规则
  81. //其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
  82. //匹配 kuteng.* 表示匹配 kuteng.hello, kuteng.hello.one需要用kuteng.#才能匹配到
  83. func (r *RabbitMQ) RecieveTopic() {
  84. //1.试探性创建交换机
  85. err := r.channel.ExchangeDeclare(
  86. r.Exchange,
  87. //交换机类型
  88. "topic",
  89. true,
  90. false,
  91. false,
  92. false,
  93. nil,
  94. )
  95. r.failOnErr(err, "Failed to declare an exch"+
  96. "ange")
  97. //2.试探性创建队列,这里注意队列名称不要写
  98. q, err := r.channel.QueueDeclare(
  99. "", //随机生产队列名称
  100. false,
  101. false,
  102. true,
  103. false,
  104. nil,
  105. )
  106. r.failOnErr(err, "Failed to declare a queue")
  107. //绑定队列到 exchange 中
  108. err = r.channel.QueueBind(
  109. q.Name,
  110. //在pub/sub模式下,这里的key要为空
  111. r.Key,
  112. r.Exchange,
  113. false,
  114. nil)
  115. //消费消息
  116. messges, err := r.channel.Consume(
  117. q.Name,
  118. "",
  119. true,
  120. false,
  121. false,
  122. false,
  123. nil,
  124. )
  125. forever := make(chan bool)
  126. go func() {
  127. for d := range messges {
  128. log.Printf("Received a message: %s", d.Body)
  129. }
  130. }()
  131. fmt.Println("退出请按 CTRL+C\n")
  132. <-forever
  133. }

mainpublish.go代码

  1. package main
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/student/kuteng-RabbitMQ/RabbitMQ"
  7. )
  8. func main() {
  9. kutengOne := RabbitMQ.NewRabbitMQTopic("exKutengTopic", "kuteng.topic.one")
  10. kutengTwo := RabbitMQ.NewRabbitMQTopic("exKutengTopic", "kuteng.topic.two")
  11. for i := 0; i <= 100; i++ {
  12. kutengOne.PublishTopic("Hello kuteng topic one!" + strconv.Itoa(i))
  13. kutengTwo.PublishTopic("Hello kuteng topic Two!" + strconv.Itoa(i))
  14. time.Sleep(1 * time.Second)
  15. fmt.Println(i)
  16. }
  17. }

recieve1/mainrecieve.go代码

  1. package main
  2. import "github.com/student/kuteng-RabbitMQ/RabbitMQ"
  3. func main() {
  4. kutengOne := RabbitMQ.NewRabbitMQTopic("exKutengTopic", "#")
  5. kutengOne.RecieveTopic()
  6. }

recieve2/mainrecieve.go代码

  1. package main
  2. import "github.com/student/kuteng-RabbitMQ/RabbitMQ"
  3. func main() {
  4. kutengOne := RabbitMQ.NewRabbitMQTopic("exKutengTopic", "kuteng.*.two")
  5. kutengOne.RecieveTopic()
  6. }