Publishing to Streams

{% tabs %} {% tab title=”Go” %}

  1. func ExampleJetStream() {
  2. nc, err := nats.Connect("localhost")
  3. if err != nil {
  4. log.Fatal(err)
  5. }
  6. // Use the JetStream context to produce and consumer messages
  7. // that have been persisted.
  8. js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
  9. if err != nil {
  10. log.Fatal(err)
  11. }
  12. js.AddStream(&nats.StreamConfig{
  13. Name: "example-stream",
  14. Subjects: []string{"example-subject"},
  15. })
  16. js.Publish("example-subject", []byte("Hello JS!"))
  17. // Publish messages asynchronously.
  18. for i := 0; i < 500; i++ {
  19. js.PublishAsync("example-subject", []byte("Hello JS Async!"))
  20. }
  21. select {
  22. case <-js.PublishAsyncComplete():
  23. case <-time.After(5 * time.Second):
  24. fmt.Println("Did not resolve in time")
  25. }
  26. }

{% endtab %}

{% tab title=”Java” %}

  1. try (Connection nc = Nats.connect("localhost")) {
  2. JetStreamManagement jsm = nc.jetStreamManagement();
  3. jsm.addStream(StreamConfiguration.builder()
  4. .name("example-stream")
  5. .subjects("example-subject")
  6. .build());
  7. JetStream js = jsm.jetStream();
  8. // Publish Synchronously
  9. PublishAck pa = js.publish("example-subject", "Hello JS Sync!".getBytes());
  10. System.out.println("Publish Sequence: " + pa.getSeqno());
  11. // Publish Asynchronously
  12. CompletableFuture<PublishAck> future =
  13. js.publishAsync("example-subject", "Hello JS Async!".getBytes());
  14. try {
  15. pa = future.get(1, TimeUnit.SECONDS);
  16. System.out.println("Publish Sequence: " + pa.getSeqno());
  17. }
  18. catch (ExecutionException e) {
  19. // Might have been a problem with the publish,
  20. // such as a failed expectation (advanced feature)
  21. // Also could be that the publish ack did not return in time
  22. // from the internal request timeout
  23. }
  24. catch (TimeoutException e) {
  25. // The future timed out meaning it's timeout was shorter than
  26. // the publish async's request timeout
  27. }
  28. catch (InterruptedException e) {
  29. // The future.get() thread was interrupted.
  30. }
  31. }

{% endtab %}

{% tab title=”JavaScript” %}

  1. import { connect, Empty } from "../../src/mod.ts";
  2. const nc = await connect();
  3. const jsm = await nc.jetstreamManager();
  4. await jsm.streams.add({ name: "example-stream", subjects: ["example-subject"] });
  5. const js = await nc.jetstream();
  6. // the jetstream client provides a publish that returns
  7. // a confirmation that the message was received and stored
  8. // by the server. You can associate various expectations
  9. // when publishing a message to prevent duplicates.
  10. // If the expectations are not met, the message is rejected.
  11. let pa = await js.publish("example-subject", Empty, {
  12. msgID: "a",
  13. expect: { streamName: "example-stream" },
  14. });
  15. console.log(`${pa.stream}[${pa.seq}]: duplicate? ${pa.duplicate}`);
  16. pa = await js.publish("example-subject", Empty, {
  17. msgID: "a",
  18. expect: { lastSequence: 1 },
  19. });
  20. console.log(`${pa.stream}[${pa.seq}]: duplicate? ${pa.duplicate}`);
  21. await jsm.streams.delete("example-stream");
  22. await nc.drain();

{% endtab %}

{% tab title=”Python” %}

  1. import asyncio
  2. import nats
  3. from nats.errors import TimeoutError
  4. async def main():
  5. nc = await nats.connect("localhost")
  6. # Create JetStream context.
  7. js = nc.jetstream()
  8. # Persist messages on 'example-subject'.
  9. await js.add_stream(name="example-stream", subjects=["example-subject"])
  10. for i in range(0, 10):
  11. ack = await js.publish("example-subject", f"hello world: {i}".encode())
  12. print(ack)
  13. await nc.close()
  14. if __name__ == '__main__':
  15. asyncio.run(main())

{% endtab %}

{% tab title=”C# v1” %}

  1. using (IConnection nc = new ConnectionFactory().CreateConnection("nats://localhost:4222"))
  2. {
  3. IJetStreamManagement jsm = nc.CreateJetStreamManagementContext();
  4. jsm.AddStream(StreamConfiguration.Builder()
  5. .WithName("example-stream")
  6. .WithSubjects("example-subject")
  7. .Build());
  8. IJetStream js = jsm.GetJetStreamContext();
  9. // Publish Synchronously
  10. PublishAck pa = js.Publish("example-subject", Encoding.UTF8.GetBytes("Hello JS Sync!"));
  11. Console.WriteLine($"Publish Sequence: {pa.Seq}");
  12. // Publish Asynchronously
  13. Task<PublishAck> task =
  14. js.PublishAsync("example-subject", Encoding.UTF8.GetBytes("Hello JS Async!"));
  15. task.Wait();
  16. Console.WriteLine($"Publish Sequence: {task.Result.Seq}");
  17. }

{% endtab %}

{% tab title=”C” %}

  1. #include "examples.h"
  2. static const char *usage = ""\
  3. "-stream stream name (default is 'foo')\n" \
  4. "-txt text to send (default is 'hello')\n" \
  5. "-count number of messages to send\n" \
  6. "-sync publish synchronously (default is async)\n";
  7. static void
  8. _jsPubErr(jsCtx *js, jsPubAckErr *pae, void *closure)
  9. {
  10. int *errors = (int*) closure;
  11. printf("Error: %u - Code: %u - Text: %s\n", pae->Err, pae->ErrCode, pae->ErrText);
  12. printf("Original message: %.*s\n", natsMsg_GetDataLength(pae->Msg), natsMsg_GetData(pae->Msg));
  13. *errors = (*errors + 1);
  14. // If we wanted to resend the original message, we would do something like that:
  15. //
  16. // js_PublishMsgAsync(js, &(pae->Msg), NULL);
  17. //
  18. // Note that we use `&(pae->Msg)` so that the library set it to NULL if it takes
  19. // ownership, and the library will not destroy the message when this callback returns.
  20. // No need to destroy anything, everything is handled by the library.
  21. }
  22. int main(int argc, char **argv)
  23. {
  24. natsConnection *conn = NULL;
  25. natsStatistics *stats = NULL;
  26. natsOptions *opts = NULL;
  27. jsCtx *js = NULL;
  28. jsOptions jsOpts;
  29. jsErrCode jerr = 0;
  30. natsStatus s;
  31. int dataLen=0;
  32. volatile int errors = 0;
  33. bool delStream = false;
  34. opts = parseArgs(argc, argv, usage);
  35. dataLen = (int) strlen(payload);
  36. s = natsConnection_Connect(&conn, opts);
  37. if (s == NATS_OK)
  38. s = jsOptions_Init(&jsOpts);
  39. if (s == NATS_OK)
  40. {
  41. if (async)
  42. {
  43. jsOpts.PublishAsync.ErrHandler = _jsPubErr;
  44. jsOpts.PublishAsync.ErrHandlerClosure = (void*) &errors;
  45. }
  46. s = natsConnection_JetStream(&js, conn, &jsOpts);
  47. }
  48. if (s == NATS_OK)
  49. {
  50. jsStreamInfo *si = NULL;
  51. // First check if the stream already exists.
  52. s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
  53. if (s == NATS_NOT_FOUND)
  54. {
  55. jsStreamConfig cfg;
  56. // Since we are the one creating this stream, we can delete at the end.
  57. delStream = true;
  58. // Initialize the configuration structure.
  59. jsStreamConfig_Init(&cfg);
  60. cfg.Name = stream;
  61. // Set the subject
  62. cfg.Subjects = (const char*[1]){subj};
  63. cfg.SubjectsLen = 1;
  64. // Make it a memory stream.
  65. cfg.Storage = js_MemoryStorage;
  66. // Add the stream,
  67. s = js_AddStream(&si, js, &cfg, NULL, &jerr);
  68. }
  69. if (s == NATS_OK)
  70. {
  71. printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
  72. si->Config->Name, si->State.Msgs, si->State.Bytes);
  73. // Need to destroy the returned stream object.
  74. jsStreamInfo_Destroy(si);
  75. }
  76. }
  77. if (s == NATS_OK)
  78. s = natsStatistics_Create(&stats);
  79. if (s == NATS_OK)
  80. {
  81. printf("\nSending %" PRId64 " messages to subject '%s'\n", total, stream);
  82. start = nats_Now();
  83. }
  84. for (count = 0; (s == NATS_OK) && (count < total); count++)
  85. {
  86. if (async)
  87. s = js_PublishAsync(js, subj, (const void*) payload, dataLen, NULL);
  88. else
  89. {
  90. jsPubAck *pa = NULL;
  91. s = js_Publish(&pa, js, subj, (const void*) payload, dataLen, NULL, &jerr);
  92. if (s == NATS_OK)
  93. {
  94. if (pa->Duplicate)
  95. printf("Got a duplicate message! Sequence=%" PRIu64 "\n", pa->Sequence);
  96. jsPubAck_Destroy(pa);
  97. }
  98. }
  99. }
  100. if ((s == NATS_OK) && async)
  101. {
  102. jsPubOptions jsPubOpts;
  103. jsPubOptions_Init(&jsPubOpts);
  104. // Let's set it to 30 seconds, if getting "Timeout" errors,
  105. // this may need to be increased based on the number of messages
  106. // being sent.
  107. jsPubOpts.MaxWait = 30000;
  108. s = js_PublishAsyncComplete(js, &jsPubOpts);
  109. if (s == NATS_TIMEOUT)
  110. {
  111. // Let's get the list of pending messages. We could resend,
  112. // etc, but for now, just destroy them.
  113. natsMsgList list;
  114. js_PublishAsyncGetPendingList(&list, js);
  115. natsMsgList_Destroy(&list);
  116. }
  117. }
  118. if (s == NATS_OK)
  119. {
  120. jsStreamInfo *si = NULL;
  121. elapsed = nats_Now() - start;
  122. printStats(STATS_OUT, conn, NULL, stats);
  123. printPerf("Sent");
  124. if (errors != 0)
  125. printf("There were %d asynchronous errors\n", errors);
  126. // Let's report some stats after the run
  127. s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
  128. if (s == NATS_OK)
  129. {
  130. printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
  131. si->Config->Name, si->State.Msgs, si->State.Bytes);
  132. jsStreamInfo_Destroy(si);
  133. }
  134. }
  135. if (delStream && (js != NULL))
  136. {
  137. printf("\nDeleting stream %s: ", stream);
  138. s = js_DeleteStream(js, stream, NULL, &jerr);
  139. if (s == NATS_OK)
  140. printf("OK!");
  141. printf("\n");
  142. }
  143. if (s != NATS_OK)
  144. {
  145. printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
  146. nats_PrintLastErrorStack(stderr);
  147. }
  148. // Destroy all our objects to avoid report of memory leak
  149. jsCtx_Destroy(js);
  150. natsStatistics_Destroy(stats);
  151. natsConnection_Destroy(conn);
  152. natsOptions_Destroy(opts);
  153. // To silence reports of memory still in used with valgrind
  154. nats_Close();
  155. return 0;
  156. }

{% endtab %} {% endtabs %}