Replying to a Message

Incoming messages have an optional reply-to field. If that field is set, it will contain a subject to which a reply is expected.

For example, the following code will listen for that request and respond with the time.

{% tabs %} {% tab title=”Go” %}

  1. nc, err := nats.Connect("demo.nats.io")
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. defer nc.Close()
  6. // Subscribe
  7. sub, err := nc.SubscribeSync("time")
  8. if err != nil {
  9. log.Fatal(err)
  10. }
  11. // Read a message
  12. msg, err := sub.NextMsg(10 * time.Second)
  13. if err != nil {
  14. log.Fatal(err)
  15. }
  16. // Get the time
  17. timeAsBytes := []byte(time.Now().String())
  18. // Send the time as the response.
  19. msg.Respond(timeAsBytes)

{% endtab %}

{% tab title=”Java” %}

  1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
  2. // Subscribe
  3. Subscription sub = nc.subscribe("time");
  4. // Read a message
  5. Message msg = sub.nextMessage(Duration.ZERO);
  6. // Get the time
  7. Calendar cal = Calendar.getInstance();
  8. SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
  9. byte[] timeAsBytes = sdf.format(cal.getTime()).getBytes(StandardCharsets.UTF_8);
  10. // Send the time
  11. nc.publish(msg.getReplyTo(), timeAsBytes);
  12. // Flush and close the connection
  13. nc.flush(Duration.ZERO);
  14. nc.close();

{% endtab %}

{% tab title=”JavaScript” %}

  1. const sc = StringCodec();
  2. // set up a subscription to process a request
  3. const sub = nc.subscribe("time");
  4. for await (const m of sub) {
  5. m.respond(sc.encode(new Date().toLocaleDateString()));
  6. }

{% endtab %}

{% tab title=”Python” %}

  1. nc = NATS()
  2. await nc.connect(servers=["nats://demo.nats.io:4222"])
  3. future = asyncio.Future()
  4. async def cb(msg):
  5. nonlocal future
  6. future.set_result(msg)
  7. await nc.subscribe("time", cb=cb)
  8. await nc.publish_request("time", new_inbox(), b'What is the time?')
  9. await nc.flush()
  10. # Read the message
  11. msg = await asyncio.wait_for(future, 1)
  12. # Send the time
  13. time_as_bytes = "{}".format(datetime.now()).encode()
  14. await nc.publish(msg.reply, time_as_bytes)

{% endtab %}

{% tab title=”Ruby” %}

  1. require 'nats/client'
  2. require 'fiber'
  3. NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  4. Fiber.new do
  5. f = Fiber.current
  6. nc.subscribe("time") do |msg, reply|
  7. f.resume Time.now
  8. end
  9. nc.publish("time", 'What is the time?', NATS.create_inbox)
  10. # Use the response
  11. msg = Fiber.yield
  12. puts "Reply: #{msg}"
  13. end.resume
  14. end

{% endtab %}

{% tab title=”C” %}

  1. natsConnection *conn = NULL;
  2. natsSubscription *sub = NULL;
  3. natsMsg *msg = NULL;
  4. natsStatus s = NATS_OK;
  5. s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
  6. // Subscribe
  7. if (s == NATS_OK)
  8. s = natsConnection_SubscribeSync(&sub, conn, "time");
  9. // Wait for messages
  10. if (s == NATS_OK)
  11. s = natsSubscription_NextMsg(&msg, sub, 10000);
  12. if (s == NATS_OK)
  13. {
  14. char buf[64];
  15. snprintf(buf, sizeof(buf), "%lld", nats_Now());
  16. // Send the time as a response
  17. s = natsConnection_Publish(conn, natsMsg_GetReply(msg), buf, (int) strlen(buf));
  18. // Destroy message that was received
  19. natsMsg_Destroy(msg);
  20. }
  21. (...)
  22. // Destroy objects that were created
  23. natsSubscription_Destroy(sub);
  24. natsConnection_Destroy(conn);

{% endtab %} {% endtabs %}