Queue Subscriptions

Subscribing to a queue group is only slightly different from subscribing to a subject. The application simply includes a queue name with the subscription. The server will load balance between all members of the queue group. In a cluster setup, every member has the same chance of receiving a particular message.

Keep in mind that queue groups in NATS are dynamic and do not require any server configuration.

Figure 1: Queue Subscriptions

As an example, to subscribe to the subject `updates` as a member of the queue group `workers`:

{% tabs %} {% tab title="Go" %}

  1. nc, err := nats.Connect("demo.nats.io")
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. defer nc.Close()
  6. // Use a WaitGroup to wait for 10 messages to arrive
  7. wg := sync.WaitGroup{}
  8. wg.Add(10)
  9. // Create a queue subscription on "updates" with queue name "workers"
  10. if _, err := nc.QueueSubscribe("updates", "workers", func(m *nats.Msg) {
  11. wg.Done()
  12. }); err != nil {
  13. log.Fatal(err)
  14. }
  15. // Wait for messages to come in
  16. wg.Wait()

{% endtab %}

{% tab title="Java" %}

  1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
  2. // Use a latch to wait for 10 messages to arrive
  3. CountDownLatch latch = new CountDownLatch(10);
  4. // Create a dispatcher and inline message handler
  5. Dispatcher d = nc.createDispatcher((msg) -> {
  6. String str = new String(msg.getData(), StandardCharsets.UTF_8);
  7. System.out.println(str);
  8. latch.countDown();
  9. });
  10. // Subscribe
  11. d.subscribe("updates", "workers");
  12. // Wait for a message to come in
  13. latch.await();
  14. // Close the connection
  15. nc.close();

{% endtab %}

{% tab title="JavaScript" %}

  1. nc.subscribe(subj, {
  2. queue: "workers",
  3. callback: (_err, _msg) => {
  4. t.log("worker1 got message");
  5. },
  6. });
  7. nc.subscribe(subj, {
  8. queue: "workers",
  9. callback: (_err, _msg) => {
  10. t.log("worker2 got message");
  11. },
  12. });

{% endtab %}

{% tab title="Python" %}

  1. nc = NATS()
  2. await nc.connect(servers=["nats://demo.nats.io:4222"])
  3. future = asyncio.Future()
  4. async def cb(msg):
  5. nonlocal future
  6. future.set_result(msg)
  7. await nc.subscribe("updates", queue="workers", cb=cb)
  8. await nc.publish("updates", b'All is Well')
  9. msg = await asyncio.wait_for(future, 1)
  10. print("Msg", msg)

{% endtab %}

{% tab title="Ruby" %}

  1. require 'nats/client'
  2. require 'fiber'
  3. NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  4. Fiber.new do
  5. f = Fiber.current
  6. nc.subscribe("updates", queue: "worker") do |msg, reply|
  7. f.resume Time.now
  8. end
  9. nc.publish("updates", "A")
  10. # Use the response
  11. msg = Fiber.yield
  12. puts "Msg: #{msg}"
  13. end.resume
  14. end

{% endtab %}

{% tab title="C" %}

  1. static void
  2. onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
  3. {
  4. printf("Received msg: %s - %.*s\n",
  5. natsMsg_GetSubject(msg),
  6. natsMsg_GetDataLength(msg),
  7. natsMsg_GetData(msg));
  8. // Need to destroy the message!
  9. natsMsg_Destroy(msg);
  10. }
  11. (...)
  12. natsConnection *conn = NULL;
  13. natsSubscription *sub = NULL;
  14. natsStatus s;
  15. s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
  16. // Create a queue subscription on "updates" with queue name "workers"
  17. if (s == NATS_OK)
  18. s = natsConnection_QueueSubscribe(&sub, conn, "updates", "workers", onMsg, NULL);
  19. (...)
  20. // Destroy objects that were created
  21. natsSubscription_Destroy(sub);
  22. natsConnection_Destroy(conn);

{% endtab %} {% endtabs %}

If you run several instances of this example together with the publish examples that send to `updates`, you will see that each message is delivered to only one of the instances, while the others do not receive it. The instance that receives a message will change from one message to the next.