NATS JetStream
The last article introduced the NATS messaging platform, covered its core messaging patterns, showed how to build a cluster, and integrated NATS into a simple microservice application. Today we will complete our coverage of NATS by introducing NATS JetStream message streaming and demonstrating how to integrate it into yet another simple Spring microservice application.

What is Message Streaming?

In traditional messaging, once a message has been delivered, it is consumed. If we want to reprocess the message, we need to republish it or maintain a separate copy. With message streaming, the messaging provider captures the stream of messages and persists them into some form of persistent storage. With the stream of messages stored, we can retrieve the messages for the duration of their persistence by querying the stream.
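The difference can be illustrated with a small in-memory sketch. This is only a toy model and not how NATS stores or delivers data; the class and method names (StreamVsQueueSketch, drain, replayFrom) are hypothetical and chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only: contrasts consume-once delivery with a persisted, replayable log.
final class StreamVsQueueSketch {

    // Traditional messaging: polling removes each message, so it can be read only once.
    static List<String> drain(Queue<String> queue) {
        List<String> received = new ArrayList<>();
        String msg;
        while ((msg = queue.poll()) != null) {
            received.add(msg);
        }
        return received;
    }

    // Streaming: messages stay in the log; a reader picks a 1-based start sequence.
    static List<String> replayFrom(List<String> log, int startSequence) {
        // Clamp the start position so out-of-range sequences yield an empty or full replay.
        int from = Math.min(Math.max(startSequence, 1) - 1, log.size());
        return new ArrayList<>(log.subList(from, log.size()));
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(List.of("a", "b", "c"));
        System.out.println(drain(queue)); // first read receives every message
        System.out.println(drain(queue)); // second read receives nothing

        List<String> log = List.of("a", "b", "c");
        System.out.println(replayFrom(log, 1)); // full replay
        System.out.println(replayFrom(log, 2)); // replay starting at sequence 2
    }
}
```

Reading the queue a second time returns nothing, while the log can be replayed from any position as often as needed.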

NATS JetStream

NATS JetStream is a distributed message streaming system that uses the publish/subscribe messaging pattern. It is the second generation of message streaming from NATS, replacing the older NATS Streaming, which will sunset in June 2023. With JetStream, we get message streaming support integrated directly into the NATS server. Now, in addition to NATS' core messaging pattern support, we also get persistence (file or memory-based), the ability to read messages from a specific time or message sequence, and support for durable or ephemeral message consumers. JetStream features include:

  • At-least-once delivery, exactly once within a window.
  • Store messages and replay by time or sequence.
  • Wildcard support.
  • Account aware.
  • Data at rest encryption.
  • Cleanse specific messages (GDPR).
  • Horizontal scalability.
  • Persist Streams and replay via Consumer.
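On the publish side, "exactly once within a window" relies on de-duplication: the publisher tags each message with a unique id (JetStream reads it from the Nats-Msg-Id header), and the server ignores a repeated id while it is still inside the stream's duplicate window. The following is a minimal sketch of that idea, not the server's implementation; the class DuplicateWindowSketch and its accept method are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Toy model of de-duplication within a time window, keyed by a publisher-supplied message id.
final class DuplicateWindowSketch {
    private final Duration window;
    private final Map<String, Instant> seen = new HashMap<>();

    DuplicateWindowSketch(Duration window) {
        this.window = window;
    }

    // Returns true if the message should be stored, or false if a message with
    // the same id was already accepted within the window.
    boolean accept(String msgId, Instant now) {
        // Forget ids that have aged out of the window.
        seen.values().removeIf(t -> Duration.between(t, now).compareTo(window) > 0);
        if (seen.containsKey(msgId)) {
            return false;
        }
        seen.put(msgId, now);
        return true;
    }

    public static void main(String[] args) {
        DuplicateWindowSketch dedupe = new DuplicateWindowSketch(Duration.ofMinutes(2));
        Instant t0 = Instant.parse("2021-07-11T18:00:00Z");
        System.out.println(dedupe.accept("order-1", t0));                  // new id
        System.out.println(dedupe.accept("order-1", t0.plusSeconds(30)));  // repeat inside the window
        System.out.println(dedupe.accept("order-1", t0.plusSeconds(180))); // repeat after the window
    }
}
```

A retried publish inside the window is dropped, which is what lets a publisher safely resend after a timeout.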

Running JetStream

As we did in the last article, we will run our NATS instance using Docker. We will also use the Synadia nats-box containerized tool to test our instance.

We will spin up our NATS container, but we will include the -js flag to instruct NATS to start JetStream.

docker run --network host -p 4222:4222 nats -js

The output should appear similar to the following:

Terminal
[1] 2021/07/11 21:23:46.393662 [INF] Starting nats-server
[1] 2021/07/11 21:23:46.393712 [INF]   Version:  2.3.4
[1] 2021/07/11 21:23:46.393716 [INF]   Git:      [7112ae0]
[1] 2021/07/11 21:23:46.393720 [INF]   Name:     NCB5DUNSDTDKLG3DRBK5EWOV7OELSBA4WTAJKETEURV2I5JNRF4COFAO
[1] 2021/07/11 21:23:46.393726 [INF]   Node:     K0akWYW4
[1] 2021/07/11 21:23:46.393729 [INF]   ID:       NCB5DUNSDTDKLG3DRBK5EWOV7OELSBA4WTAJKETEURV2I5JNRF4COFAO
[1] 2021/07/11 21:23:46.395427 [INF] Starting JetStream
[1] 2021/07/11 21:23:46.395911 [INF]     _ ___ _____ ___ _____ ___ ___   _   __  __
[1] 2021/07/11 21:23:46.395915 [INF]  _ | | __|_   _/ __|_   _| _ \ __| /_\ |  \/  |
[1] 2021/07/11 21:23:46.395918 [INF] | || | _|  | | \__ \ | | |   / _| / _ \| |\/| |
[1] 2021/07/11 21:23:46.395921 [INF]  \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_|  |_|
[1] 2021/07/11 21:23:46.395924 [INF] 
[1] 2021/07/11 21:23:46.395927 [INF]          https://docs.nats.io/jetstream
[1] 2021/07/11 21:23:46.395930 [INF] 
[1] 2021/07/11 21:23:46.395933 [INF] ---------------- JETSTREAM ----------------
[1] 2021/07/11 21:23:46.395939 [INF]   Max Memory:      35.67 GB
[1] 2021/07/11 21:23:46.395944 [INF]   Max Storage:     99.15 GB
[1] 2021/07/11 21:23:46.395947 [INF]   Store Directory: "/tmp/nats/jetstream"
[1] 2021/07/11 21:23:46.395950 [INF] -------------------------------------------
[1] 2021/07/11 21:23:46.399668 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2021/07/11 21:23:46.400005 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2021/07/11 21:23:46.401585 [INF] Server is ready

We can verify that our JetStream instance is running by invoking the JetStream monitoring endpoint: http://localhost:8222/jsz.

curl http://localhost:8222/jsz

The output should be similar to the following:

Terminal
{
  "server_id": "NCB5DUNSDTDKLG3DRBK5EWOV7OELSBA4WTAJKETEURV2I5JNRF4COFAO",
  "now": "2021-07-11T21:25:59.914098844Z",
  "config": {
    "max_memory": 38301914112,
    "max_storage": 3299398656,
    "store_dir": "/tmp/nats/jetstream"
  },
  "memory": 0,
  "storage": 0,
  "accounts": 1,
  "api": {
    "total": 0,
    "errors": 0
  },
  "current_api_calls": 0
}

Note: The store_dir location is the filesystem directory that will contain persisted stream data.

Creating Streams

Before we can publish or consume messages from a stream, we must first create it. The creation step informs NATS that we want the messages to be persisted and instructs NATS how to configure the stream. First, let's look at what we need to create our stream.

Stream Naming Conventions

Unsurprisingly, every stream must have a unique name. JetStream's documentation includes the following naming recommendations:

  • Alphanumeric values are recommended.
  • Spaces, tabs, period (.), greater than (>), or asterisk (*) are prohibited.
  • Limit name length. The JetStream storage directories will include the account, stream name, and consumer name, so a generally safe approach would be to keep names under 32 characters.
  • Do not use reserved file names like NUL, LPT1, etc.
  • Be aware that some file systems are case insensitive, so do not use stream or account names that would collide in a file system. For example, Foo and foo would collide on a Windows or macOS system.
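The recommendations above can be sketched as a small pre-flight check. This is an illustrative helper and not part of any NATS client library; the class name StreamNameCheck, its methods, and the short list of reserved names are assumptions made for the example.

```java
import java.util.Locale;
import java.util.Set;

// Illustrative check of the naming recommendations; not part of a NATS client library.
final class StreamNameCheck {
    // A few reserved device names on Windows; this list is illustrative, not exhaustive.
    private static final Set<String> RESERVED = Set.of("NUL", "CON", "PRN", "AUX", "COM1", "LPT1");
    private static final int MAX_LENGTH = 31; // "under 32 characters"

    // Returns true if the name avoids the prohibited characters, reserved names, and excess length.
    static boolean isAcceptable(String name) {
        if (name == null || name.isEmpty() || name.length() > MAX_LENGTH) {
            return false;
        }
        if (RESERVED.contains(name.toUpperCase(Locale.ROOT))) {
            return false;
        }
        for (char c : name.toCharArray()) {
            if (Character.isWhitespace(c) || c == '.' || c == '>' || c == '*') {
                return false;
            }
        }
        return true;
    }

    // Distinct names that differ only by case may collide on case-insensitive file systems.
    static boolean collides(String a, String b) {
        return !a.equals(b) && a.equalsIgnoreCase(b);
    }

    public static void main(String[] args) {
        System.out.println(isAcceptable("NOTIFICATIONS"));   // plain alphanumeric name
        System.out.println(isAcceptable("NOTIFICATIONS.*")); // contains '.' and '*'
        System.out.println(collides("Foo", "foo"));          // differ only by case
    }
}
```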

Persistence

When configuring a stream, we must provide the Storage Type used to persist the stream's messages. There are currently two options: Memory and File. JetStream defaults to File mode if not explicitly declared. This mode will persist stream messages to the filesystem and is required if the stream must persist between server restarts. If maximum throughput is needed, Memory storage can improve performance at the cost of losing all memory streams after a server restart (unless replication is configured in a clustered configuration).

Subjects

Streams can have one or more subjects associated with them. Messages in the stream must adhere to the policies defined by the stream.
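A stream's subjects may contain NATS wildcards: * matches exactly one token, and > matches one or more trailing tokens. The sketch below shows how a published subject could be tested against a stream subject such as NOTIFICATIONS.*, which is used later in this article. It is a simplified illustration of the matching rule, not the server's code, and the class name SubjectMatch is hypothetical.

```java
// Simplified illustration of NATS subject wildcard matching.
final class SubjectMatch {

    // Returns true if a concrete subject matches a pattern that may use
    // '*' (exactly one token) or a trailing '>' (one or more remaining tokens).
    static boolean matches(String pattern, String subject) {
        String[] p = pattern.split("\\.", -1);
        String[] s = subject.split("\\.", -1);
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // '>' is only valid as the last token and needs at least one token to match.
                return i == p.length - 1 && s.length > i;
            }
            if (i >= s.length || s[i].isEmpty()) {
                return false;
            }
            if (!p[i].equals("*") && !p[i].equals(s[i])) {
                return false;
            }
        }
        return p.length == s.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("NOTIFICATIONS.*", "NOTIFICATIONS.inbound")); // one token after the prefix
        System.out.println(matches("NOTIFICATIONS.*", "NOTIFICATIONS.a.b"));     // two tokens after the prefix
        System.out.println(matches("NOTIFICATIONS.>", "NOTIFICATIONS.a.b"));     // '>' spans the remainder
    }
}
```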

Push vs. Pull Consumers

When creating a stream consumer subscription, we must first decide how messages will be delivered. For example, do we want the stream to push messages to our consumers, or do we want our consumers to pull messages from the stream?

Ephemeral and Durable Consumers

There are two types of stream consumers: Ephemeral and Durable.

Ephemeral consumers

Ephemeral consumers are not long-lived. By default, JetStream consumers are ephemeral. Each connection appears to the server as a new consumer, and the server will only track the consumer while it is active. Ephemeral consumers can only be push subscribers.

Durable consumers

If the consumer is expected to be long-lived, we can create a durable consumer. With a durable consumer, the server will keep track of the consumer's stream position. If the consumer restarts, the server will continue at the last stream position. Pull subscriptions require a durable consumer. It's also important to understand that once a durable consumer has been created, some options cannot be changed.

Consumer Configuration

As you have no doubt already guessed from the four consumer options we have already covered, consumers must be configured before they can access a given stream. We have a fair number of consumer configuration options to consider when creating a new consumer.

Option - Description

Durable - We can make a consumer durable by setting the consumer's name.

Deliver Policy - When we create a new consumer, we can specify where in the stream we want to start receiving messages. We have five options to choose from:
  • All (default option) - The consumer will start at the first (earliest) position in the stream.
  • Last - The last message in the stream will be the first message the consumer receives.
  • New - Only messages received after the consumer was created will be delivered to the consumer.
  • ByStartSequence - A start sequence id must be supplied. All messages with a sequence id equal to or greater than the supplied value will be delivered to the consumer.
  • ByStartTime - The consumer supplies a start time. All messages with a timestamp equal to or later than the supplied value will be delivered.

Deliver Subject - The subject on which the server delivers messages to a push consumer; this is the subject we subscribe to.

Ack Policy - We can configure what acknowledgment policy we want for the consumer:
  • None - No ACK is required (push subscribers only).
  • All - All previous messages will be acknowledged when the consumer sends an ACK. The ACK must be sent within the Ack Wait window.
  • Explicit (default option) - Every message must be acknowledged. Pull consumers must be Explicit.

Ack Wait - The time (in nanoseconds) the server will wait for a message to be acknowledged.

Max Ack Pending - If the server does not receive an ACK after sending Max Ack Pending messages, it will stop sending messages until it receives one or more ACKs. It will then resume sending messages from the point at which it stopped.

Replay Policy - Applies when the deliver policy has been set to All, ByStartSequence, or ByStartTime:
  • Original - Messages will be pushed to the consumer at the same rate they were originally received, to reproduce the original timing of the messages.
  • Instant - Messages will be pushed as fast as possible (based on Ack Policy, Max Ack Pending, and the client's ability to consume them).

Max Deliver - The maximum number of times a specific message will be delivered.

Filter Subject - For streams with a wildcard subject, this option configures the subset of the complete wildcard subject.

Rate Limit - Limits the speed of message delivery, in bits per second.

Sample Frequency - Sets the percentage of ACKs that should be sampled for observability. The range is 0-100.

Idle Heartbeat - If set, the server will send the client a status message when the interval has elapsed without a new message being delivered.

Flow Control - When set, the consumer can manage back-pressure based on pending limits of max messages or bytes instead of depending on rate limiting.
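To make the five deliver policy options concrete, the following sketch models which already-stored messages a newly created consumer would start with. It is a simplified simulation under stated assumptions, not the jnats API or the server's logic; the names DeliverPolicySketch, StoredMessage, and initialBacklog are hypothetical.

```java
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

// Simplified simulation of the deliver policy options; not the jnats API.
final class DeliverPolicySketch {
    enum DeliverPolicy { ALL, LAST, NEW, BY_START_SEQUENCE, BY_START_TIME }

    // Simplified stored message: 1-based stream sequence, server timestamp, payload.
    static final class StoredMessage {
        final long sequence;
        final Instant time;
        final String data;

        StoredMessage(long sequence, Instant time, String data) {
            this.sequence = sequence;
            this.time = time;
            this.data = data;
        }
    }

    // Returns the messages already in the stream that a new consumer would receive first.
    // startSequence and startTime are only read by the policies that need them.
    static List<StoredMessage> initialBacklog(List<StoredMessage> stream, DeliverPolicy policy,
                                              long startSequence, Instant startTime) {
        switch (policy) {
            case ALL:
                return List.copyOf(stream);
            case LAST:
                return stream.isEmpty() ? List.of() : List.of(stream.get(stream.size() - 1));
            case NEW:
                return List.of(); // only messages published after creation are delivered
            case BY_START_SEQUENCE:
                return stream.stream()
                        .filter(m -> m.sequence >= startSequence)
                        .collect(Collectors.toList());
            case BY_START_TIME:
                return stream.stream()
                        .filter(m -> !m.time.isBefore(startTime))
                        .collect(Collectors.toList());
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }
}
```

For a stream holding three messages, ALL yields all three, LAST yields only the third, NEW yields none, and BY_START_SEQUENCE with a start sequence of 2 yields the second and third.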

Now that we have a cursory understanding of our configuration options, let's create a stream.


Using the CLI to Create a Stream

With our JetStream instance running, let's spin up the first (of several) nats-box containers. In a separate terminal instance, run the following command:

docker run --network host --rm -it synadia/nats-box

You should be greeted with the following:

Terminal
             _             _               
 _ __   __ _| |_ ___      | |__   _____  __
| '_ \ / _` | __/ __|_____| '_ \ / _ \ \/ /
| | | | (_| | |_\__ \_____| |_) | (_) >  < 
|_| |_|\__,_|\__|___/     |_.__/ \___/_/\_\
                                           
nats-box v0.5.0
59f611d7bc1c:~# 


As mentioned in the previous article, nats-box is a containerized NATS toolbox that provides several tools (nats, nats-top, stan-pub, stan-sub, stan-bench) we can use to interact with the NATS server. In this article, we will be using the nats CLI.

Using the nats CLI, we can create our first stream with the following command:

nats str add NOTIFICATIONS --subjects "NOTIFICATIONS.*" --ack --max-msgs=-1 --max-bytes=-1 --max-age=1y --storage file --retention limits --max-msg-size=-1 --discard old --dupe-window="0s" --replicas 1

Your output should be similar to the following:

Terminal
  
  Stream NOTIFICATIONS was created

Information for Stream NOTIFICATIONS created 2021-07-11T21:23:14Z

Configuration:

             Subjects: NOTIFICATIONS.*
     Acknowledgements: true
            Retention: File - Limits
             Replicas: 1
       Discard Policy: Old
     Duplicate Window: 2m0s
     Maximum Messages: unlimited
        Maximum Bytes: unlimited
          Maximum Age: 1y0d0h0m0s
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited


State:

             Messages: 0
                Bytes: 0 B
             FirstSeq: 0
              LastSeq: 0
     Active Consumers: 0

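We see that the stream NOTIFICATIONS was created with the subject NOTIFICATIONS.*. Note that the subject includes the (*) wildcard character, so the stream captures any subject with a single token after the NOTIFICATIONS prefix, such as NOTIFICATIONS.inbound. We can use the nats CLI to list the current streams: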

nats str ls

The output should include the NOTIFICATIONS stream:

Streams: NOTIFICATIONS

We can also query the stream's configuration using the following command:

nats str info NOTIFICATIONS

The configuration should look similar to the following:

Terminal
  
  Information for Stream NOTIFICATIONS created 2021-07-11T21:23:14Z

Configuration:

             Subjects: NOTIFICATIONS.*
     Acknowledgements: true
            Retention: File - Limits
             Replicas: 1
       Discard Policy: Old
     Duplicate Window: 2m0s
     Maximum Messages: unlimited
        Maximum Bytes: unlimited
          Maximum Age: 1y0d0h0m0s
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited


State:

             Messages: 0
                Bytes: 0 B
             FirstSeq: 0
              LastSeq: 0
     Active Consumers: 0


In a new nats-box terminal, we can publish messages to our stream with the following command:

nats pub NOTIFICATIONS.inbound 'new message'

The output should look similar to:

17:58:29 Published 11 bytes to "NOTIFICATIONS.inbound"

If we query the stream's configuration again using:

nats str info NOTIFICATIONS

We will see that the state of our stream has been updated to reflect the published message:

Terminal
  
Information for Stream NOTIFICATIONS created 2021-07-11T21:23:14Z

Configuration:

             Subjects: NOTIFICATIONS.*
     Acknowledgements: true
            Retention: File - Limits
             Replicas: 1
       Discard Policy: Old
     Duplicate Window: 2m0s
     Maximum Messages: unlimited
        Maximum Bytes: unlimited
          Maximum Age: 1y0d0h0m0s
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited


State:

             Messages: 1
                Bytes: 63 B
             FirstSeq: 1 @ 2021-07-11T17:58:29 UTC
              LastSeq: 1 @ 2021-07-11T17:58:29 UTC
     Active Consumers: 0


With our first message published, let's now create a Push subscriber. In a separate nats-box terminal, issue the following command:

nats con add NOTIFICATIONS MONITOR --ack none --target monitor.NOTIFICATIONS --deliver last --replay instant --filter '' --heartbeat=10s --flow-control

Your output should appear similar to:

Terminal
  Information for Consumer NOTIFICATIONS > MONITOR created 2021-07-11T18:25:21Z

Configuration:

        Durable Name: MONITOR
    Delivery Subject: monitor.NOTIFICATIONS
        Deliver Last: true
          Ack Policy: None
       Replay Policy: Instant
      Idle Heartbeat: 10.00s
        Flow Control: true

State:

   Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
     Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
         Outstanding Acks: 0
     Redelivered Messages: 0
     Unprocessed Messages: 1
 

With our stream consumer created, we can subscribe to it using:

nats sub monitor.NOTIFICATIONS

We should start receiving messages in the terminal:

Terminal
18:30:56 Subscribing on monitor.NOTIFICATIONS
[#1] Received JetStream message: consumer: NOTIFICATIONS > MONITOR / subject: NOTIFICATIONS.inbound / delivered: 1 / consumer seq: 1 / stream seq: 1 / ack: false
new message

[#2] Received on "monitor.NOTIFICATIONS"
Nats-Last-Consumer: 1
Nats-Last-Stream: 1
Status: 100
Description: Idle Heartbeat

From the console output, we can see that we received the original message (new message) and that the idle heartbeat is active, letting us know that we are still subscribed.

Switching back to the terminal where we published the first message, let's publish two more messages:

nats pub NOTIFICATIONS.inbound 'another message'
nats pub NOTIFICATIONS.inbound 'yet another message'

Switching back to our subscriber terminal, we can see that two more messages have been received:

Terminal
 
[#37] Received JetStream message: consumer: NOTIFICATIONS > MONITOR / subject: NOTIFICATIONS.inbound / delivered: 1 / consumer seq: 2 / stream seq: 2 / ack: false
another message

[#38] Received JetStream message: consumer: NOTIFICATIONS > MONITOR / subject: NOTIFICATIONS.inbound / delivered: 1 / consumer seq: 3 / stream seq: 3 / ack: false
yet another message


Let's look at the state of our stream with the following command:

nats str info NOTIFICATIONS

Terminal
Information for Stream NOTIFICATIONS created 2021-07-11T21:23:14Z

Configuration:

             Subjects: NOTIFICATIONS.*
     Acknowledgements: true
            Retention: File - Limits
             Replicas: 1
       Discard Policy: Old
     Duplicate Window: 2m0s
     Maximum Messages: unlimited
        Maximum Bytes: unlimited
          Maximum Age: 1y0d0h0m0s
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited


State:

             Messages: 3
                Bytes: 199 B 
             FirstSeq: 1 @ 2021-07-11T17:58:29 UTC
              LastSeq: 3 @ 2021-07-11T18:36:57 UTC
     Active Consumers: 1


Here we see the primary difference with using message streaming. In the output, we see that the stream currently has three messages. While those messages were delivered to our first consumer, the stream still contains the published messages.

Let's create another consumer with the following command:

nats con add NOTIFICATIONS MONITOR2 --ack none --target monitor2.NOTIFICATIONS --deliver last --replay instant --filter '' --heartbeat=10s --flow-control

We should get output similar to this:

Terminal
      Information for Consumer NOTIFICATIONS > MONITOR2 created 2021-07-11T19:12:17Z

Configuration:

        Durable Name: MONITOR2
    Delivery Subject: monitor2.NOTIFICATIONS
        Deliver Last: true
          Ack Policy: None
       Replay Policy: Instant
      Idle Heartbeat: 10.00s
        Flow Control: true

State:

   Last Delivered Message: Consumer sequence: 0 Stream sequence: 2
     Acknowledgment floor: Consumer sequence: 0 Stream sequence: 2
         Outstanding Acks: 0
     Redelivered Messages: 0
     Unprocessed Messages: 1

  

You may be wondering why the number of unprocessed messages is 1. When we created the new consumer, we set the --deliver flag to last, which starts delivery with the last message in the stream. Let's subscribe to monitor2.NOTIFICATIONS with the following command:

nats sub monitor2.NOTIFICATIONS

We will get the following output:

Terminal
19:13:06 Subscribing on monitor2.NOTIFICATIONS
[#1] Received JetStream message: consumer: NOTIFICATIONS > MONITOR2 / subject: NOTIFICATIONS.inbound / delivered: 1 / consumer seq: 1 / stream seq: 3 / ack: false
yet another message
  

We see that the last message in the stream (yet another message) was delivered.

Let's create another consumer. This time we will change the --deliver flag to all:

nats con add NOTIFICATIONS MONITOR3 --ack none --target monitor3.NOTIFICATIONS --deliver all --replay instant --filter '' --heartbeat=10s --flow-control

We get the following output:

Terminal
 
Information for Consumer NOTIFICATIONS > MONITOR3 created 2021-07-11T19:16:34Z

Configuration:

        Durable Name: MONITOR3
    Delivery Subject: monitor3.NOTIFICATIONS
         Deliver All: true
          Ack Policy: None
       Replay Policy: Instant
      Idle Heartbeat: 10.00s
        Flow Control: true

State:

   Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
     Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
         Outstanding Acks: 0
     Redelivered Messages: 0
     Unprocessed Messages: 3


 
  

As we might expect, by setting the delivery to All, the number of Unprocessed Messages is 3, which is all of the published stream messages.

We can also configure our consumers to start delivering messages from a given sequence id. Let's create another consumer that will deliver messages starting at sequence id 2. To do this, we will run the following command:

nats con add NOTIFICATIONS MONITOR4 --ack none --target monitor4.NOTIFICATIONS --deliver 2 --replay instant --filter '' --heartbeat=10s --flow-control

We get the following output:

Terminal
 
 Information for Consumer NOTIFICATIONS > MONITOR4 created 2021-07-11T19:16:34Z

Configuration:

        Durable Name: MONITOR4
    Delivery Subject: monitor4.NOTIFICATIONS
      Start Sequence: 2
          Ack Policy: None
       Replay Policy: Instant
      Idle Heartbeat: 10.00s
        Flow Control: true

State:

   Last Delivered Message: Consumer sequence: 0 Stream sequence: 1
     Acknowledgment floor: Consumer sequence: 0 Stream sequence: 1
         Outstanding Acks: 0
     Redelivered Messages: 0
     Unprocessed Messages: 2


  

Here we see that we now have two unprocessed messages. We can retrieve the messages with the following command:

nats sub monitor4.NOTIFICATIONS

We get the following output:

Terminal
 
01:09:09 Subscribing on monitor4.NOTIFICATIONS
[#1] Received JetStream message: consumer: NOTIFICATIONS > MONITOR4 / subject: NOTIFICATIONS.inbound / delivered: 1 / consumer seq: 1 / stream seq: 2 / ack: false
another message

[#2] Received JetStream message: consumer: NOTIFICATIONS > MONITOR4 / subject: NOTIFICATIONS.inbound / delivered: 1 / consumer seq: 2 / stream seq: 3 / ack: false
yet another message

  

And we see that we get the last two messages: another message and yet another message.

Integrating JetStream

With a basic understanding of message streams, we will now build a simple Java Spring Boot REST service to demonstrate JetStream messaging integration.

Implementing the JetStream REST Client

Let's start with our service's configuration.

Configuration

We configure our build environment using a Maven pom.xml file.

[Code listing: pom.xml]

Here we see the project dependencies. Of note is the jnats dependency, which provides NATS Java client support.

The application.properties file provides the service's externalized configuration.

[Code listing: application.properties]

The nats.server property provides a list of NATS servers that the service will attempt to connect to. In this example, we are only using a single NATS instance. Included here (but commented out) is the spring.jackson.serialization.FAIL_ON_EMPTY_BEANS property. In this service, we are not returning the io.nats.client.Message to the caller. However, if you wish to serialize io.nats.client.Message in your services, you will need to uncomment this property.

JetStream REST Service Application

This class provides the service's application entry point.

[Code listing: application entry point class]

While this is pretty standard Spring Boot boilerplate code, it does contain the @TypeHint annotation, which informs Spring Native's AOT compiler that it needs to include io.nats.client.impl.SocketDataPort when generating the native image executable.

REST controller

The RESTController contains the interesting parts of this service. The controller creates the JetStream stream, pre-loads the stream with sample messages, and provides endpoint methods to publish and consume messages.

[Code listing: RESTController]

The controller exposes the following endpoints:

  • /stream-info - returns the current stream information details.
  • /pub/{message} - publishes the contents of {message} to the stream.
  • /consumer?count=n - returns n messages from the stream if they are available. If the stream contains fewer than n messages, the consumer will wait until more messages arrive or the timeout duration is reached.
  • /consumeBySequence?startSequenceId=seqId&count=n - returns n messages starting with the message corresponding to the startSequenceId.
  • /consumeByTime?startTime=t&count=n - returns n messages starting with the message whose timestamp is equal to t or the first message in the stream whose timestamp is greater than t. The time format for t is yyyy-MM-ddTHH:mm:ss.SSSZ.
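When calling these endpoints from Java rather than curl, the query parameters need to be assembled and encoded. The following is a minimal sketch using only the JDK; the class name ConsumerUrls is hypothetical, the base address assumes the service is running locally on port 8080, and the by-time path follows the /consumeByTime form used in the curl example later in this article.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;

// Hypothetical helper that builds request URIs for the consumer endpoints.
final class ConsumerUrls {
    // Assumption: the service listens on localhost:8080.
    static final String BASE = "http://localhost:8080";

    static URI bySequence(long startSequenceId, int count) {
        return URI.create(BASE + "/consumeBySequence?startSequenceId=" + startSequenceId
                + "&count=" + count);
    }

    static URI byTime(Instant startTime, int count) {
        // Instant.toString() produces an ISO-8601 UTC timestamp; the colons are percent-encoded.
        String t = URLEncoder.encode(startTime.toString(), StandardCharsets.UTF_8);
        return URI.create(BASE + "/consumeByTime?startTime=" + t + "&count=" + count);
    }

    public static void main(String[] args) {
        System.out.println(bySequence(3, 3));
        System.out.println(byTime(Instant.parse("2021-07-11T02:36:25.544617729Z"), 3));
    }
}
```

The resulting URI objects can be passed to any HTTP client, for example java.net.http.HttpClient on Java 11 or later.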


Building the Jar

We can build the service jar file with the following command:

mvn clean package


Running the jar

Once we have built the jar file, we can run it with:

java -jar ./target/jetstream-client-0.0.1-SNAPSHOT.jar


Building the Spring Native executable

In addition to building a jar file, we can also generate a Spring Native GraalVM native executable file and containerize it. Note: Building the Spring Native executable requires more system resources and takes considerably longer, so be prepared for a longer build time. You can build the executable with the following command:

mvn spring-boot:build-image


Running the containerized service

We run the containerized native executable with the following command:

docker run --network host --rm -p 8080:8080 jetstream-client:0.0.1-SNAPSHOT


Exercising the Service

The first thing we must do is start up the service using the native image container or the jar file. Once started, the output should be similar to:

Terminal

====================================================

      _      _   ____  _                            
     | | ___| |_/ ___|| |_ _ __ ___  __ _ _ __ ___  
  _  | |/ _ \ __\___ \| __| '__/ _ \/ _` | '_ ` _ \ 
 | |_| |  __/ |_ ___) | |_| | |  __/ (_| | | | | | |
  \___/ \___|\__|____/_\__|_|_ \___|\__,_|_| |_| |_|
 |  _ \ ___  ___| |_ / ___| (_) ___ _ __ | |_       
 | |_) / _ \/ __| __| |   | | |/ _ \ '_ \| __|      
 |  _ <  __/\__ \ |_| |___| | |  __/ | | | |_       
 |_| \_\___||___/\__|\____|_|_|\___|_| |_|\__|      
                                                    


Application: jetstream-client 
    Version: (0.0.1-SNAPSHOT)

====================================================

                                            

2021-07-11 23:26:57.468  INFO 11831 --- [           main] .n.j.c.JetstreamClientServiceApplication : Starting JetstreamClientServiceApplication v0.0.1-SNAPSHOT using Java 11.0.11 on ubuntu-vm with PID 11831 (/home/cwoodward/Desktop/workspace/jetstream-client/target/jetstream-client-0.0.1-SNAPSHOT.jar started by workspace in /home/workspace/jetstream-client)
2021-07-11 23:26:57.475  INFO 11831 --- [           main] .n.j.c.JetstreamClientServiceApplication : No active profile set, falling back to default profiles: default
2021-07-11 23:27:00.341  INFO 11831 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)
2021-07-11 23:27:00.463  INFO 11831 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2021-07-11 23:27:00.464  INFO 11831 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.52]
2021-07-11 23:27:00.588  INFO 11831 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2021-07-11 23:27:00.589  INFO 11831 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2624 ms
2021-07-11 23:27:00.797  INFO 11831 --- [           main] c.t.n.j.c.controller.RESTController      : REST controller postConstruct.
2021-07-11 23:27:00.797  INFO 11831 --- [           main] c.t.n.j.c.controller.RESTController      : creating stream
2021-07-11 23:27:00.802  INFO 11831 --- [           main] c.t.n.j.c.controller.RESTController      : adding nats server:nats://localhost:4222
2021-07-11 23:27:00.877  INFO 11831 --- [pool-1-thread-1] c.t.n.j.c.controller.RESTController      : Connection Event:nats: connection opened
2021-07-11 23:27:00.877  INFO 11831 --- [           main] c.t.n.j.c.controller.RESTController      : return connection:io.nats.client.impl.NatsConnection@dc885411
2021-07-11 23:27:00.877  INFO 11831 --- [pool-1-thread-1] c.t.n.j.c.controller.RESTController      : CONNECTED to NATS!
2021-07-11 23:27:00.902  INFO 11831 --- [           main] c.t.n.j.c.controller.RESTController      : Stream does not exist
2021-07-11 23:27:00.902  INFO 11831 --- [           main] c.t.n.j.c.controller.RESTController      : creating stream
2021-07-11 23:27:00.911  INFO 11831 --- [           main] c.t.n.j.c.controller.RESTController      : Created Stream
2021-07-11 23:27:01.939  INFO 11831 --- [           main] o.s.b.a.w.s.WelcomePageHandlerMapping    : Adding welcome page: class path resource [static/index.html]
2021-07-11 23:27:02.588  INFO 11831 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2021-07-11 23:27:02.642  INFO 11831 --- [           main] .n.j.c.JetstreamClientServiceApplication : Started JetstreamClientServiceApplication in 6.902 seconds (JVM running for 8.215)


  

With the application started, we can now interact with it.

Stream Info

When the service starts, it will create a stream and preload it with several test messages. We can verify that our stream was created with the following command:

curl http://localhost:8080/stream-info

You should receive a JSON message similar to the following:


  {
    "type":"io.nats.jetstream.api.v1.stream_info_response",
    "error":null,
    "clusterInfo":null,
    "mirrorInfo":null,
    "sourceInfos":null,
    "configuration":  {
                        "name":"NOTIFICATIONS",
                        "description":null,
                        "subjects":["NOTIFICATIONS.monitor1"],
                        "retentionPolicy":"Limits",
                        "maxConsumers":-1,
                        "maxMsgs":-1,  
                        "maxMsgsPerSubject":0,
                        "maxBytes":-1,
                        "maxAge":"PT0S",
                        "maxMsgSize":-1,
                        "storageType":"Memory",
                        "replicas":1,
                        "noAck":false,
                        "templateOwner":null,
                        "discardPolicy":"Old",
                        "duplicateWindow":"PT2M",
                        "placement":null,
                        "mirror":null,
                        "sources":[]
                      },
   "streamState":{
                        "consumerCount":0,
                        "firstTime":"2021-07-11T02:36:25.540077149Z",
                        "lastTime":"2021-07-11T02:36:25.554296941Z",
                        "lastSequence":10,
                        "msgCount":10,
                        "firstSequence":1,
                        "byteCount":600
                },
     "createTime":"2021-07-11T02:36:25.538954677Z",
     "description":null,
     "errorCode":-1,
     "apiErrorCode":-1
  }

We see that the streamState contains the firstSequence value of (1) and the lastSequence value of (10). These values indicate that our stream currently has the ten messages preloaded by the service.

Publishing a message to the stream

Now that we know everything is working, we can use the following command to publish a message:

curl localhost:8080/pub/This-is-a-message

You should receive a JSON message similar to this:


 {
  "type":"io.nats.jetstream.api.v1.stream_info_response",
  "error":null,
  "clusterInfo":null,
  "mirrorInfo":null,
  "sourceInfos":null,
  "configuration": {
                    "name":"NOTIFICATIONS",
                    "description":null,
                    "subjects":["NOTIFICATIONS.monitor1"],
                    "retentionPolicy":"Limits",
                    "maxConsumers":-1,
                    "maxMsgs":-1,
                    "maxMsgsPerSubject":0,
                    "maxBytes":-1,
                    "maxAge":"PT0S",
                    "maxMsgSize":-1,
                    "storageType":"Memory",
                    "replicas":1,
                    "noAck":false,
                    "templateOwner":null,
                    "discardPolicy":"Old",
                    "duplicateWindow":"PT2M",
                    "placement":null,
                    "mirror":null,
                    "sources":[]
                   },
  "streamState": {
                    "consumerCount":0,
                    "firstTime":"2021-07-11T03:27:00.914670473Z",
                    "lastTime":"2021-07-11T03:47:03.608571452Z",
                    "msgCount":11,
                    "byteCount":642,
                    "firstSequence":1,
                    "lastSequence":11
                   },
  "createTime":"2021-07-11T03:27:00.903504093Z",
  "errorCode":-1,
  "apiErrorCode":-1,
  "description":null
 }

The response's streamState msgCount shows that the stream contains 11 messages. We also see that the lastSequence value is 11.

Consuming a message

With our stream populated, we can begin consuming the stream's messages. We will start by using our first pull consumer command:

curl "http://localhost:8080/consumer?count=5"

Note: the URL is enclosed in double quotes (") so that the shell does not interpret special characters such as ? and &. We should receive a list containing the first five stream messages:


  [
   "2021-07-11T02:36:25.540077149Z : <pre-loaded message (0)>",
   "2021-07-11T02:36:25.540077149Z : <pre-loaded message (1)>",
   "2021-07-11T02:36:25.545770159Z : <pre-loaded message (2)>",
   "2021-07-11T02:36:25.546857988Z : <pre-loaded message (3)>",
   "2021-07-11T02:36:25.548017482Z : <pre-loaded message (4)>"
  ]
  

The output shows the first five messages from the stream. If we call the command again, we should see five more messages.


  [
   "2021-07-11T02:27:00.935164617Z : <pre-loaded message (5)>",
   "2021-07-11T02:27:00.939333472Z : <pre-loaded message (6)>",
   "2021-07-11T02:27:00.943049112Z : <pre-loaded message (7)>",
   "2021-07-11T02:27:00.945667805Z : <pre-loaded message (8)>",
   "2021-07-11T02:27:00.948213843Z : <pre-loaded message (9)>"
  ]
  

Consuming a message by sequence id

We can also consume messages starting with a sequence id by invoking the following command:

curl "http://localhost:8080/consumeBySequence?startSequenceId=3&count=3"

The output should be similar to the following:


  [
  "2021-07-11T02:36:25.545770159Z : <pre-loaded message (2)>",
  "2021-07-11T02:36:25.546857988Z : <pre-loaded message (3)>",
  "2021-07-11T02:36:25.548017482Z : <pre-loaded message (4)>"]
  

With the query parameters startSequenceId=3&count=3, we get three messages: 2, 3, and 4. Sequence ids start at 1, while the pre-loaded messages are numbered from 0, so sequence id 3 corresponds to message (2).

Consuming a message by start time

Lastly, we can consume messages starting with a given start time using this command:

curl "http://localhost:8080/consumeByTime?count=3&startTime=2021-07-11T02:36:25.544617729Z"

The output should be similar to the following:


  [
  "2021-07-11T02:36:25.544617729Z : <pre-loaded message (1)>",
  "2021-07-11T02:36:25.545770159Z : <pre-loaded message (2)>",
  "2021-07-11T02:36:25.546857988Z : <pre-loaded message (3)>"
    ]
  

JetStream returns three messages, starting with the first message whose timestamp is equal to or after the time we supplied.

Coming up

In this article, we have had a brief introduction to message streaming and seen how we can integrate NATS JetStream into a Spring Boot microservice. In the third and final NATS article of this series, we will demonstrate how we can configure OpenFaaS with the NATS-Connector to trigger OpenFaaS functions using NATS messages.

