Streams

The first step is to set up storage for our ORDERS-related messages. These arrive on a wildcard of subjects, all flowing into the same Stream, and are kept for 1 year.

Creating

  $ nats str add ORDERS
  ? Subjects to consume ORDERS.*
  ? Storage backend file
  ? Retention Policy Limits
  ? Discard Policy Old
  ? Message count limit -1
  ? Message size limit -1
  ? Maximum message age limit 1y
  ? Maximum individual message size [? for help] (-1) -1
  Stream ORDERS was created
  Information for Stream ORDERS
  Configuration:
    Subjects: ORDERS.*
    Acknowledgements: true
    Retention: File - Limits
    Replicas: 1
    Maximum Messages: -1
    Maximum Bytes: -1
    Maximum Age: 8760h0m0s
    Maximum Message Size: -1
    Maximum Consumers: -1
  Statistics:
    Messages: 0
    Bytes: 0 B
    FirstSeq: 0
    LastSeq: 0
    Active Consumers: 0
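
To make the Limits retention and Old discard answers above more concrete, here is a minimal in-memory sketch in Python. It is a toy model and not how the NATS server is implemented. The class ToyStream, its parameters max_msgs and max_age_seconds, and the explicit now argument are hypothetical names chosen for illustration. The limit of 3 messages is picked only so the effect is visible (the Stream above uses -1, meaning unlimited).

```python
from collections import deque


class ToyStream:
    """Toy in-memory model of Limits retention with discard policy Old.

    Hypothetical illustration only; this is not the NATS server's implementation.
    """

    def __init__(self, max_msgs=-1, max_age_seconds=None):
        self.max_msgs = max_msgs                # -1 means unlimited, as in the prompts
        self.max_age_seconds = max_age_seconds  # None means no age limit (sketch convention)
        self.messages = deque()                 # (sequence, stored_at, payload), oldest first
        self.last_seq = 0

    def publish(self, payload, now):
        """Store a message at time `now` (in seconds) and return its sequence number."""
        self.last_seq += 1
        self.messages.append((self.last_seq, now, payload))
        self.enforce_limits(now)
        return self.last_seq

    def enforce_limits(self, now):
        # Age limit: drop messages that are older than max_age_seconds.
        if self.max_age_seconds is not None:
            while self.messages and now - self.messages[0][1] > self.max_age_seconds:
                self.messages.popleft()
        # Count limit with discard "old": drop the oldest messages first.
        if self.max_msgs != -1:
            while len(self.messages) > self.max_msgs:
                self.messages.popleft()

    def sequences(self):
        """Return the sequence numbers that are still stored, oldest first."""
        return [seq for seq, _, _ in self.messages]


if __name__ == "__main__":
    stream = ToyStream(max_msgs=3)
    for second in range(5):
        stream.publish(f"order {second}", now=second)
    # The two oldest messages were discarded to stay within the limit.
    print(stream.sequences())
```

With a count limit in place, new messages are always accepted and the oldest ones make room for them, which is the behaviour the Old discard policy selects.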

You can be prompted interactively for missing information as above, or supply everything in one command. Pressing ? in the CLI will help you map prompts to CLI options:

  $ nats str add ORDERS --subjects "ORDERS.*" --ack --max-msgs=-1 --max-bytes=-1 --max-age=1y --storage file --retention limits --max-msg-size=-1 --discard old --dupe-window="0s" --replicas 1
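
If you drive the CLI from a script, it can help to assemble the non-interactive command from a table of settings. The following Python sketch does only that; it does not run anything. The helper name build_stream_add_command is hypothetical, the flags are limited to ones that appear in the command above, and writing every option as --flag=value is an assumption based on the forms already used there (for example --max-age=1y).

```python
import shlex


def build_stream_add_command(name, options):
    """Return the argument list for `nats str add`, built from a dict of flags.

    Hypothetical helper: `options` maps a flag name (without the leading
    dashes) to its value; a value of True produces a bare switch like --ack.
    """
    argv = ["nats", "str", "add", name]
    for flag, value in options.items():
        if value is True:
            argv.append(f"--{flag}")
        else:
            argv.append(f"--{flag}={value}")
    return argv


if __name__ == "__main__":
    command = build_stream_add_command(
        "ORDERS",
        {
            "subjects": "ORDERS.*",
            "ack": True,
            "max-msgs": -1,
            "max-bytes": -1,
            "max-age": "1y",
            "storage": "file",
            "retention": "limits",
            "max-msg-size": -1,
            "discard": "old",
            "replicas": 1,
        },
    )
    # shlex.join quotes arguments such as the wildcard so the shell leaves them alone.
    print(shlex.join(command))
```

Keeping the arguments as a list rather than one string avoids quoting problems if the list is later passed to a process launcher.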

Additionally, one can store the configuration in a JSON file. Its format is the same as the output of $ nats str info ORDERS -j | jq .config:

  $ nats str add ORDERS --config orders.json
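
As a rough illustration of what such a file could contain, the Python sketch below prints a configuration using the field names and values from the JSON output shown in the Querying section of this document. The helper names duration_ns and orders_config are hypothetical. Note that durations are expressed in nanoseconds, so 1 year (8760 hours) becomes 31536000000000000.

```python
import json

NANOS_PER_SECOND = 1_000_000_000


def duration_ns(hours=0, minutes=0, seconds=0):
    """Convert a duration to nanoseconds, the unit used in the JSON config."""
    return (hours * 3600 + minutes * 60 + seconds) * NANOS_PER_SECOND


def orders_config():
    """Return a config dict mirroring the ORDERS Stream shown in this document."""
    return {
        "name": "ORDERS",
        "subjects": ["ORDERS.*"],
        "retention": "limits",
        "max_consumers": -1,
        "max_msgs": -1,
        "max_bytes": -1,
        "max_age": duration_ns(hours=8760),         # 1y, shown as 8760h0m0s
        "max_msg_size": -1,
        "storage": "file",
        "discard": "old",
        "num_replicas": 1,
        "duplicate_window": duration_ns(minutes=2),  # shown as 2m0s
    }


if __name__ == "__main__":
    # Redirect the output to a file, for example orders.json.
    print(json.dumps(orders_config(), indent=2))
```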

Listing

We can confirm our Stream was created:

  $ nats str ls
  Streams:
    ORDERS

Querying

Information about the configuration of the Stream can be seen with nats str info. If you do not specify the Stream as below, the CLI prompts you to choose from all known ones:

  $ nats str info ORDERS
  Information for Stream ORDERS created 2021-02-27T16:49:36-07:00
  Configuration:
    Subjects: ORDERS.*
    Acknowledgements: true
    Retention: File - Limits
    Replicas: 1
    Discard Policy: Old
    Duplicate Window: 2m0s
    Maximum Messages: unlimited
    Maximum Bytes: unlimited
    Maximum Age: 1y0d0h0m0s
    Maximum Message Size: unlimited
    Maximum Consumers: unlimited
  State:
    Messages: 0
    Bytes: 0 B
    FirstSeq: 0
    LastSeq: 0
    Active Consumers: 0

Most commands that show data as above support -j to show the results as JSON:

  $ nats str info ORDERS -j
  {
    "config": {
      "name": "ORDERS",
      "subjects": [
        "ORDERS.*"
      ],
      "retention": "limits",
      "max_consumers": -1,
      "max_msgs": -1,
      "max_bytes": -1,
      "max_age": 31536000000000000,
      "max_msg_size": -1,
      "storage": "file",
      "discard": "old",
      "num_replicas": 1,
      "duplicate_window": 120000000000
    },
    "created": "2021-02-27T23:49:36.700424Z",
    "state": {
      "messages": 0,
      "bytes": 0,
      "first_seq": 0,
      "first_ts": "0001-01-01T00:00:00Z",
      "last_seq": 0,
      "last_ts": "0001-01-01T00:00:00Z",
      "consumer_count": 0
    }
  }

This is the general pattern for the entire nats utility as it relates to JetStream: it prompts for needed information, but every action can also be run non-interactively, making it usable as a CLI API. All information output like that seen above can be turned into JSON using -j.
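
As one possible way to use the JSON output from a script, the Python sketch below reads the fields shown above and converts the nanosecond age limit to hours. The function names fetch_stream_info and summarize are hypothetical, and SAMPLE is an abbreviated copy of the output above. fetch_stream_info assumes the nats CLI is installed and on the PATH; the demo at the bottom does not call it and works on the sample instead.

```python
import json
import subprocess

NANOS_PER_HOUR = 3600 * 1_000_000_000

# Abbreviated copy of the `nats str info ORDERS -j` output shown above.
SAMPLE = """
{
  "config": {
    "name": "ORDERS",
    "subjects": ["ORDERS.*"],
    "max_age": 31536000000000000
  },
  "state": {"messages": 0, "first_seq": 0, "last_seq": 0}
}
"""


def fetch_stream_info(name):
    """Run `nats str info NAME -j` and decode its output (needs the nats CLI)."""
    result = subprocess.run(
        ["nats", "str", "info", name, "-j"],
        capture_output=True, text=True, check=True,
    )
    return json.loads(result.stdout)


def summarize(info):
    """Pick a few fields out of the decoded stream information."""
    config, state = info["config"], info["state"]
    return {
        "name": config["name"],
        "subjects": config["subjects"],
        "max_age_hours": config["max_age"] // NANOS_PER_HOUR,
        "messages": state["messages"],
    }


if __name__ == "__main__":
    print(summarize(json.loads(SAMPLE)))
```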

Copying

A stream can be copied into another, which also allows the configuration of the new one to be adjusted via CLI flags:

  $ nats str cp ORDERS ARCHIVE --subjects "ORDERS_ARCHIVE.*" --max-age 2y
  Stream ORDERS was created
  Information for Stream ORDERS created 2021-02-27T16:52:46-07:00
  Configuration:
    Subjects: ORDERS_ARCHIVE.*
    Acknowledgements: true
    Retention: File - Limits
    Replicas: 1
    Discard Policy: Old
    Duplicate Window: 2m0s
    Maximum Messages: unlimited
    Maximum Bytes: unlimited
    Maximum Age: 2y0d0h0m0s
    Maximum Message Size: unlimited
    Maximum Consumers: unlimited
  State:
    Messages: 0
    Bytes: 0 B
    FirstSeq: 0
    LastSeq: 0
    Active Consumers: 0

Editing

A stream configuration can be edited, which allows the configuration to be adjusted via CLI flags. Here I have an incorrectly created ORDERS stream that I fix:

  $ nats str info ORDERS -j | jq .config.subjects
  [
    "ORDERS.new"
  ]
  $ nats str edit ORDERS --subjects "ORDERS.*"
  Stream ORDERS was updated
  Information for Stream ORDERS
  Configuration:
    Subjects: ORDERS.*
  ....
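
To see why the subject list matters, the Python sketch below models how a pattern such as ORDERS.* is compared with a published subject: subjects are split into dot-separated tokens and * stands for exactly one token. The function name subject_matches is hypothetical, and the sketch deliberately handles only the * wildcard used in this document.

```python
def subject_matches(pattern, subject):
    """Return True if `subject` matches `pattern`, where * matches one token.

    Simplified illustration: only the single-token * wildcard is handled.
    """
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")
    # A * covers exactly one token, so both sides need the same token count.
    if len(pattern_tokens) != len(subject_tokens):
        return False
    return all(p == "*" or p == s for p, s in zip(pattern_tokens, subject_tokens))


if __name__ == "__main__":
    for pattern in ("ORDERS.new", "ORDERS.*"):
        for subject in ("ORDERS.new", "ORDERS.scratch"):
            print(pattern, subject, subject_matches(pattern, subject))
```

With the incorrect subject ORDERS.new, a message published to ORDERS.scratch would not be captured by the Stream; after the edit to ORDERS.* it would be.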

Additionally, one can store the configuration in a JSON file. Its format is the same as the output of $ nats str info ORDERS -j | jq .config:

  $ nats str edit ORDERS --config orders.json

Publishing Into a Stream

Now let’s add some messages to our Stream. You can use nats pub to add messages; pass the --wait flag to see the publish ack being returned.

You can publish without waiting for acknowledgement:

  $ nats pub ORDERS.scratch hello
  Published [sub1] : 'hello'

But if you want to be sure your messages got to JetStream and were persisted, you can make a request:

  $ nats req ORDERS.scratch hello
  13:45:03 Sending request on [ORDERS.scratch]
  13:45:03 Received on [_INBOX.M8drJkd8O5otORAo0sMNkg.scHnSafY]: '+OK'

Keep checking the status of the Stream while doing this and you’ll see its stored messages increase.

  $ nats str info ORDERS
  Information for Stream ORDERS
  ...
  Statistics:
    Messages: 3
    Bytes: 147 B
    FirstSeq: 1
    LastSeq: 3
    Active Consumers: 0

After putting some throwaway data into the Stream, we can purge all the data out while keeping the Stream active.

Deleting All Data

To delete all data in a stream use purge:

  $ nats str purge ORDERS -f
  ...
  State:
    Messages: 0
    Bytes: 0 B
    FirstSeq: 1,000,001
    LastSeq: 1,000,000
    Active Consumers: 0

Deleting A Message

A single message can be securely removed from the stream:

  $ nats str rmm ORDERS 1 -f
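
The purge output above shows FirstSeq one higher than LastSeq: the data is gone, but the sequence counter is not reset. The Python sketch below is a toy model of that bookkeeping, covering both a purge and the removal of a single message. The class ToySequenceLog and its methods are hypothetical names for illustration and do not reflect the server's actual implementation.

```python
class ToySequenceLog:
    """Toy model of stream sequence numbers across purge and single deletes."""

    def __init__(self):
        self.messages = {}  # sequence number -> payload
        self.last_seq = 0

    def publish(self, payload):
        """Store a payload under the next sequence number and return it."""
        self.last_seq += 1
        self.messages[self.last_seq] = payload
        return self.last_seq

    def purge(self):
        """Remove all messages; the sequence counter keeps its value."""
        self.messages.clear()

    def remove_message(self, seq):
        """Remove one message by sequence number; return True if it existed."""
        if seq in self.messages:
            del self.messages[seq]
            return True
        return False

    def state(self):
        if self.messages:
            first_seq = min(self.messages)
        elif self.last_seq:
            first_seq = self.last_seq + 1  # emptied stream: next sequence to be written
        else:
            first_seq = 0                  # brand new stream
        return {"messages": len(self.messages),
                "first_seq": first_seq,
                "last_seq": self.last_seq}


if __name__ == "__main__":
    log = ToySequenceLog()
    for text in ("a", "b", "c"):
        log.publish(text)
    log.remove_message(1)
    print(log.state())
    log.purge()
    print(log.state())
```

In this model, a message published after the purge receives the next sequence number rather than starting again at 1.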

Deleting A Stream

Finally, for demonstration purposes, you can also delete the whole Stream and recreate it, so that we’re ready to create the Consumers:

  $ nats str rm ORDERS -f
  $ nats str add ORDERS --subjects "ORDERS.*" --ack --max-msgs=-1 --max-bytes=-1 --max-age=1y --storage file --retention limits --max-msg-size=-1 --discard old --dupe-window="0s" --replicas 1