Replying to a Message

Incoming messages have an optional reply-to field. If that field is set, it will contain a subject to which a reply is expected.

For example, the following code will listen for that request and respond with the time.

Go

  1. nc, err := nats.Connect("demo.nats.io")
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. defer nc.Close()
  6. // Subscribe
  7. sub, err := nc.SubscribeSync("time")
  8. if err != nil {
  9. log.Fatal(err)
  10. }
  11. // Read a message
  12. msg, err := sub.NextMsg(10 * time.Second)
  13. if err != nil {
  14. log.Fatal(err)
  15. }
  16. // Get the time
  17. timeAsBytes := []byte(time.Now().String())
  18. // Send the time as the response.
  19. msg.Respond(timeAsBytes)

Java

  1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
  2. // Subscribe
  3. Subscription sub = nc.subscribe("time");
  4. // Read a message
  5. Message msg = sub.nextMessage(Duration.ZERO);
  6. // Get the time
  7. Calendar cal = Calendar.getInstance();
  8. SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
  9. byte[] timeAsBytes = sdf.format(cal.getTime()).getBytes(StandardCharsets.UTF_8);
  10. // Send the time
  11. nc.publish(msg.getReplyTo(), timeAsBytes);
  12. // Flush and close the connection
  13. nc.flush(Duration.ZERO);
  14. nc.close();

JavaScript

  1. let nc = NATS.connect({
  2. url: "nats://demo.nats.io:4222"
  3. });
  4. // set up a subscription to process a request
  5. nc.subscribe('time', (msg, reply) => {
  6. if (msg.reply) {
  7. nc.publish(msg.reply, new Date().toLocaleTimeString());
  8. }
  9. });

Python

  1. nc = NATS()
  2. await nc.connect(servers=["nats://demo.nats.io:4222"])
  3. future = asyncio.Future()
  4. async def cb(msg):
  5. nonlocal future
  6. future.set_result(msg)
  7. await nc.subscribe("time", cb=cb)
  8. await nc.publish_request("time", new_inbox(), b'What is the time?')
  9. await nc.flush()
  10. # Read the message
  11. msg = await asyncio.wait_for(future, 1)
  12. # Send the time
  13. time_as_bytes = "{}".format(datetime.now()).encode()
  14. await nc.publish(msg.reply, time_as_bytes)

Ruby

  1. require 'nats/client'
  2. require 'fiber'
  3. NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  4. Fiber.new do
  5. f = Fiber.current
  6. nc.subscribe("time") do |msg, reply|
  7. f.resume Time.now
  8. end
  9. nc.publish("time", 'What is the time?', NATS.create_inbox)
  10. # Use the response
  11. msg = Fiber.yield
  12. puts "Reply: #{msg}"
  13. end.resume
  14. end

TypeScript

  1. // set up a subscription to process a request
  2. await nc.subscribe('time', (err, msg) => {
  3. if (msg.reply) {
  4. nc.publish(msg.reply, new Date().toLocaleTimeString());
  5. } else {
  6. t.log('got a request for the time, but no reply subject was set.');
  7. }
  8. });

C

  1. natsConnection *conn = NULL;
  2. natsSubscription *sub = NULL;
  3. natsMsg *msg = NULL;
  4. natsStatus s = NATS_OK;
  5. s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
  6. // Subscribe
  7. if (s == NATS_OK)
  8. s = natsConnection_SubscribeSync(&sub, conn, "time");
  9. // Wait for messages
  10. if (s == NATS_OK)
  11. s = natsSubscription_NextMsg(&msg, sub, 10000);
  12. if (s == NATS_OK)
  13. {
  14. char buf[64];
  15. snprintf(buf, sizeof(buf), "%lld", nats_Now());
  16. // Send the time as a response
  17. s = natsConnection_Publish(conn, natsMsg_GetReply(msg), buf, (int) strlen(buf));
  18. // Destroy message that was received
  19. natsMsg_Destroy(msg);
  20. }
  21. (...)
  22. // Destroy objects that were created
  23. natsSubscription_Destroy(sub);
  24. natsConnection_Destroy(conn);