1. Work模式(工作模式,一个消息只能被一个消费者获取)

Work模式 - 图1

  • 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)
  • 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)

目录结构

Work模式 - 图2

kuteng-RabbitMQ

-RabbitMQ

—rabitmq.go //这个是RabbitMQ的封装和Simple模式代码一样

-SimlpePublish

—mainSimlpePublish.go //Publish 先启动

-SimpleRecieve1

—mainSimpleRecieve.go

-SimpleRecieve2

—mainSimpleRecieve.go

注意

Work模式和Simple模式相比代码并没有发生变化只是多了一个消费者

rabitmq.go代码

  1. package RabbitMQ
  2. import (
  3. "fmt"
  4. "log"
  5. "github.com/streadway/amqp"
  6. )
  7. //连接信息amqp://kuteng:kuteng@127.0.0.1:5672/kuteng这个信息是固定不变的amqp://事固定参数后面两个是用户名密码ip地址端口号Virtual Host
  8. const MQURL = "amqp://kuteng:kuteng@127.0.0.1:5672/kuteng"
  9. //rabbitMQ结构体
  10. type RabbitMQ struct {
  11. conn *amqp.Connection
  12. channel *amqp.Channel
  13. //队列名称
  14. QueueName string
  15. //交换机名称
  16. Exchange string
  17. //bind Key 名称
  18. Key string
  19. //连接信息
  20. Mqurl string
  21. }
  22. //创建结构体实例
  23. func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
  24. return &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
  25. }
  26. //断开channel 和 connection
  27. func (r *RabbitMQ) Destory() {
  28. r.channel.Close()
  29. r.conn.Close()
  30. }
  31. //错误处理函数
  32. func (r *RabbitMQ) failOnErr(err error, message string) {
  33. if err != nil {
  34. log.Fatalf("%s:%s", message, err)
  35. panic(fmt.Sprintf("%s:%s", message, err))
  36. }
  37. }
  38. //创建简单模式下RabbitMQ实例
  39. func NewRabbitMQSimple(queueName string) *RabbitMQ {
  40. //创建RabbitMQ实例
  41. rabbitmq := NewRabbitMQ(queueName, "", "")
  42. var err error
  43. //获取connection
  44. rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
  45. rabbitmq.failOnErr(err, "failed to connect rabb"+
  46. "itmq!")
  47. //获取channel
  48. rabbitmq.channel, err = rabbitmq.conn.Channel()
  49. rabbitmq.failOnErr(err, "failed to open a channel")
  50. return rabbitmq
  51. }
  52. //直接模式队列生产
  53. func (r *RabbitMQ) PublishSimple(message string) {
  54. //1.申请队列,如果队列不存在会自动创建,存在则跳过创建
  55. _, err := r.channel.QueueDeclare(
  56. r.QueueName,
  57. //是否持久化
  58. false,
  59. //是否自动删除
  60. false,
  61. //是否具有排他性
  62. false,
  63. //是否阻塞处理
  64. false,
  65. //额外的属性
  66. nil,
  67. )
  68. if err != nil {
  69. fmt.Println(err)
  70. }
  71. //调用channel 发送消息到队列中
  72. r.channel.Publish(
  73. r.Exchange,
  74. r.QueueName,
  75. //如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
  76. false,
  77. //如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
  78. false,
  79. amqp.Publishing{
  80. ContentType: "text/plain",
  81. Body: []byte(message),
  82. })
  83. }
  84. //simple 模式下消费者
  85. func (r *RabbitMQ) ConsumeSimple() {
  86. //1.申请队列,如果队列不存在会自动创建,存在则跳过创建
  87. q, err := r.channel.QueueDeclare(
  88. r.QueueName,
  89. //是否持久化
  90. false,
  91. //是否自动删除
  92. false,
  93. //是否具有排他性
  94. false,
  95. //是否阻塞处理
  96. false,
  97. //额外的属性
  98. nil,
  99. )
  100. if err != nil {
  101. fmt.Println(err)
  102. }
  103. //接收消息
  104. msgs, err := r.channel.Consume(
  105. q.Name, // queue
  106. //用来区分多个消费者
  107. "", // consumer
  108. //是否自动应答
  109. true, // auto-ack
  110. //是否独有
  111. false, // exclusive
  112. //设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
  113. false, // no-local
  114. //列是否阻塞
  115. false, // no-wait
  116. nil, // args
  117. )
  118. if err != nil {
  119. fmt.Println(err)
  120. }
  121. forever := make(chan bool)
  122. //启用协程处理消息
  123. go func() {
  124. for d := range msgs {
  125. //消息逻辑处理,可以自行设计逻辑
  126. log.Printf("Received a message: %s", d.Body)
  127. }
  128. }()
  129. log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  130. <-forever
  131. }

mainSimlpePublish.go代码

  1. package main
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/student/kuteng-RabbitMQ/RabbitMQ"
  7. )
  8. func main() {
  9. rabbitmq := RabbitMQ.NewRabbitMQSimple("" +
  10. "kuteng")
  11. for i := 0; i <= 100; i++ {
  12. rabbitmq.PublishSimple("Hello kuteng!" + strconv.Itoa(i))
  13. time.Sleep(1 * time.Second)
  14. fmt.Println(i)
  15. }
  16. }

mainSimpleRecieve.go代码(两个消费端的代码是一样的)

  1. package main
  2. import "github.com/student/kuteng-RabbitMQ/RabbitMQ"
  3. func main() {
  4. rabbitmq := RabbitMQ.NewRabbitMQSimple("" +
  5. "kuteng")
  6. rabbitmq.ConsumeSimple()
  7. }