The last
article introduced the
NATS messaging platform and covered its core messaging patterns, how to build a cluster, and integrated
NATS into a simple microservice application. Today we will complete our coverage of
NATS by introducing
NATS JetStream
message streaming and demonstrate how to integrate it into yet another simple
Spring microservice application.
We will spin up our NATS container, but we will include the -js flag to instruct NATS to start JetStream.
The output should appear similar to the following:
We can verify that our JetStream instance is running by invoking the JetStream monitoring endpoint: http://localhost:8222/jsz.
The output should be similar to the following:
Note: The store_dir location is the filesystem directory that will contain persisted stream data.
Now that we have a cursory understanding of our configuration options let's create a stream.
You should be greeted with the following:
As mentioned in the previous article, nats-box is a containerized NATS toolbox that provides several tools (nats, nats-top, stan-pub, stan-sub, stan-bench) we can use to interact with the NATS server. In this article, we will be using the nats CLI.
Using the nats CLI, we can create our first stream with the following command:
Your output should be similar to the following:
We see that the stream NOTIFICATIONS.* was created. Note that we include the ( *) wildcard character. We can use the nats CLI to list the current streams
The output should include the NOTIFICATIONS stream:
We can also query the streams configuration using the following command:
The configuration should look similar to the following:
In a new nats-box terminal, we can publish messages to our stream with the following command:
The output should look similar to:
If we query the streams configuration again using:
We will see that the state of our stream has been updated to reflect the published message:
With our first message published, let's now create a Push subscriber. In a separate nats-box terminal, issue the following command:
Your output should appear similar to:
With our stream consumer created, we can subscribe to it using:
We should start receiving messages in the terminal:
From the highlighted console output, we can see that we received the original message ( new message) and that the idle heartbeat is active, letting us know that we are still subscribed.
Switching back to the terminal where we published the first message, let's publish two more messages:
Switching back to our subscriber terminal, we can see that two more messages have been received:
Let's look at the state of our stream with the following command:
Here we see the primary difference with using message streaming. In the output, we see that the stream currently has three messages. While those messages were delivered to our first consumer, the stream still contains the published messages.
Let's create another consumer with the following command:
We should get output similar to this:
You may be wondering why the number of unprocessed messages is 1? When we created the new consumer, we set the --deliver flag to last. Setting this flag to last will start delivery with the last message in the stream to the consumer. Let's subscribe to monitor2.NOTIFICATIONS with the following command :
We will get the following output:
We see that the last message in the stream ( yet another message) was delivered.
Let's create another consumer, this time we will change the --deliver flag to All:
We get the following output:
As we might expect, by setting the delivery to All, the number of Unprocessed Messages is 3, which is all of the published stream messages.
We can also configure our consumers to start delivering messages starting from a given sequence id. Let's create another consumer which will deliver messages starting at sequence id # 2. To do this, we will call the following command:
We get the following output:
Here we see that we now have two unprocessed messages. We can retrieve the messages with the following command:
We get the following output:
And we see that we get the last two messages: another message and yet new message.
Here we see the project dependencies. Of note is the jnats dependency, which provides NATS Java client support.
The application.properties file provides the service's externalized configuration.
The nats.server provides a list of NATS servers the service will attempt to connect to. In this example, we are only using a single NATS instance. Included here (but commented out) is the spring.jackson.serialization.FAIL_ON_EMPTY_BEANS property. In this service, we are not returning the io.nats.client.Message to the caller. However, if you wish to serialize io.nats.client.Message in your services, you will need to uncomment this property.
While this is pretty standard Spring-Boot boiler-plate code, it does contain the @TypeHint annotation to inform Spring Native's AOT compiler that it needs to include the io.nats.client.impl.SocketDataPort when generating the native image executable.
The controller provides six methods:
With the application started, we can now interact with it.
You should receive a JSON message similar to the following:
We see that the streamState contains the firstSequence value of (1) and the lastSequence value of (10). These values indicate that our stream currently has the ten messages preloaded by the service.
You should receive a JSON message similar to this:
The response's streamState msgCount shows the stream contains 11 messages. We also see that the lastSequence value is also 11.
Note: the URL is enclosed with ( ") since our URL contains the ( &) character. We should receive a list containing the five stream messages:
The output shows the first five messages from the stream. If we call the command again, we should see five more messages.
The output should be similar to the following:
With query parameters startSequenceId=3& count=3 we get three messages, 2, 3, 4 (The stream starts with message 0).
The output should be similar to the following:
JetStream returns three messages starting with the first message with a timestamp that is equal to after the time we supplied.
What is Message Streaming?
In traditional messaging, once a message has been delivered, it is consumed. If we want to reprocess the message, we need to republish it or maintain a separate copy. With message streaming, the messaging provider captures the stream of messages and persists them into some form of persistent storage. With the stream of messages stored, we can retrieve the messages for the duration of their persistence by querying the stream.NATS Jetstream
NATS JetStream is a distributed message streaming system that uses the publish/subscribe messaging pattern. It is the second generation of message streaming from NATS, replacing the older NATS Streaming, which will sunset in June 2023. With JetStream, we get message streaming support integrated directly into the NATS server. Now, in addition to NATS' core messaging pattern support, we also get persistence (file or memory-based), the ability to read messages from a specific time or message sequence, as well as the durable or ephemeral message consumer support. JetStream features include:- At-least-once delivery, exactly once within a window.
- Store messages and replay by time or sequence.
- Wildcard support.
- Account aware.
- Data at rest encryption.
- Cleanse specific messages (GDPR).
- Horizontal scalability.
- Persist Streams and replay via Consumer.
Running JetStream
As we did in the last article, we will run our NATS instance using docker. We will also use the Synadia nats-box containerized tool to test our instance.We will spin up our NATS container, but we will include the -js flag to instruct NATS to start JetStream.
docker run --network host -p 4222:4222 nats
-js
The output should appear similar to the following:
Terminal
[1] 2021/07/11 21:23:46.393662 [INF] Starting nats-server
[1] 2021/07/11 21:23:46.393712 [INF] Version: 2.3.4
[1] 2021/07/11 21:23:46.393716 [INF] Git: [7112ae0]
[1] 2021/07/11 21:23:46.393720 [INF] Name: NCB5DUNSDTDKLG3DRBK5EWOV7OELSBA4WTAJKETEURV2I5JNRF4COFAO
[1] 2021/07/11 21:23:46.393726 [INF] Node: K0akWYW4
[1] 2021/07/11 21:23:46.393729 [INF] ID: NCB5DUNSDTDKLG3DRBK5EWOV7OELSBA4WTAJKETEURV2I5JNRF4COFAO
[1] 2021/07/11 21:23:46.395427 [INF] Starting JetStream
[1] 2021/07/11 21:23:46.395911 [INF] _ ___ _____ ___ _____ ___ ___ _ __ __
[1] 2021/07/11 21:23:46.395915 [INF] _ | | __|_ _/ __|_ _| _ \ __| /_\ | \/ |
[1] 2021/07/11 21:23:46.395918 [INF] | || | _| | | \__ \ | | | / _| / _ \| |\/| |
[1] 2021/07/11 21:23:46.395921 [INF] \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_| |_|
[1] 2021/07/11 21:23:46.395924 [INF]
[1] 2021/07/11 21:23:46.395927 [INF] https://docs.nats.io/jetstream
[1] 2021/07/11 21:23:46.395930 [INF]
[1] 2021/07/11 21:23:46.395933 [INF] ---------------- JETSTREAM ----------------
[1] 2021/07/11 21:23:46.395939 [INF] Max Memory: 35.67 GB
[1] 2021/07/11 21:23:46.395944 [INF] Max Storage: 99.15 GB
[1] 2021/07/11 21:23:46.395947 [INF] Store Directory: "/tmp/nats/jetstream"
[1] 2021/07/11 21:23:46.395950 [INF] -------------------------------------------
[1] 2021/07/11 21:23:46.399668 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2021/07/11 21:23:46.400005 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2021/07/11 21:23:46.401585 [INF] Server is ready
We can verify that our JetStream instance is running by invoking the JetStream monitoring endpoint: http://localhost:8222/jsz.
curl http://localhost:8222/jsz
The output should be similar to the following:
Terminal
[1] 2021/07/11 21:23:46.401585 [INF] Server is ready { "server_id": "NCB5DUNSDTDKLG3DRBK5EWOV7OELSBA4WTAJKETEURV2I5JNRF4COFAO", "now": "2021-07-11T21:25:59.914098844Z", "config": { "max_memory": 38301914112, "max_storage": 3299398656, "store_dir": "/tmp/nats/jetstream" }, "memory": 0, "storage": 0, "accounts": 1, "api": { "total": 0, "errors": 0 }, "current_api_calls": 0 }
Note: The store_dir location is the filesystem directory that will contain persisted stream data.
Creating Streams
Before we can publish or consume messages from a stream, we must first create it. The creation step informs NATS that we want the messages to be persisted and instructs NATS how to configure the stream. First, let's look at what we need to create our stream.stream Naming Conventions
Unsurprisingly, every stream must have a unique name. JetStream's documentation includes the following naming recommendations:- Alphanumeric values are recommended.
- Spaces, tabs, period (.), greater than (>), or asterisk (*) are prohibited.
- Limit name length. The JetStream storage directories will include the account, stream name, and consumer name, so a generally safe approach would be to keep names under 32 characters.
- Do not use reserved file names like NUL, LPT1, etc.
- Be aware that some file systems are case insensitive, so do not use stream or account names that would collide in a file system. For example, Foo and foo would collide on a Windows or Mac OSx System.
Persistence
When configuring a stream, we must provide the Storage Type used to persist the stream's messages. There are currently two options: Memory and File. JetStream defaults to File mode if not explicitly declared. This mode will persist stream messages to the filesystem and is required if the stream must persist between server restarts. If maximum throughput is needed, Memory storage can improve performance at the cost of losing all memory streams after a server restart (unless replication is configured in a clustered configuration).Subjects
Streams can have one or more subjects associated with them. Messages in the stream must adhere to the policies defined by the stream.PUSH VS Pull Consumers
When creating a stream consumer subscription, we must first decide how messages will be delivered. For example, do we want the stream to push messages to our consumers, or do we want our consumers to pull messages from the stream?Ephemeral and Durable Consumers
There are two types of stream consumers: Ephemeral and Durable.Ephemeral consumers
Ephemeral consumers are not long-lived. By default, JetStream consumers are ephemeral. Each connection appears to the server as a new consumer and will only track the consumer while active. Ephemeral consumers can only be push subscribers.Durable consumers
If the consumer is expected to be long-lived, we can create a durable consumer. With a durable consumer, the server will keep track of the consumer's stream position. If the consumer restarts, the server will continue at the last stream position. Pull subscriptions require a durable consumer. Its also important to understand that once a durable consumer has been created, some options cannot be changed.Consumer Configuration
As you have no doubt already guessed from the four consumer options we have already covered, consumers must be configured before they can access a given stream. We have a fair number of consumer configuration options to consider when creating a new consumer.Option | Description | ||||||||||
durable | We can make a consumer durable by setting the consumer's name | ||||||||||
deliver policy | When we create a new consumer, we can specify where in the stream we want to start receiving messages. We have five options to choose from:
|
||||||||||
Deliver Subject | This value is the subject we are subscribing to. | ||||||||||
Ack Policy | We can configure what acknowledgment policy we want for the consumer.
|
||||||||||
Ack Wait | The time (in nanoseconds) the server will wait for a message to acknowledged. | ||||||||||
Max Ack Pending | If the server does not receive an ACK after sending Max ACK Pending messages, it will stop sending messages until it receives one or more ACKs. It will start sending messages from the point it stopped. | ||||||||||
Replay Policy | When the delivery policy has been set to All,ByStartSequence,ByStartTime:
|
||||||||||
Max Deliver | The maximum number of times a specific message will be delivered. | ||||||||||
Filter Subject | For streams with a wildcard subject, this option configures the subset of the complete wildcard subject. | ||||||||||
Rate Limit | limits the speed of message deliver in bits-per-second | ||||||||||
Sample Frequency | Sets the percentage of ACKS that should be sampled for observability. The range is 0-100. | ||||||||||
Idle Heartbeat | If set the server will send the client a status message when the interval has elapsed without receiving a new message. | ||||||||||
FlowControl | When set, the consumer can manage back-pressure based on pending limits of max message or bytes instead of depending on rate limiting. |
Now that we have a cursory understanding of our configuration options let's create a stream.
using the CLI to create a stream
With our JetStream instance running, let's spin up the first (of several) nats-box containers. In a separate terminal instance, run the following command:
docker run --network host --rm -it synadia/nats-box
You should be greeted with the following:
Terminal
_ _ _ __ __ _| |_ ___ | |__ _____ __ | '_ \ / _` | __/ __|_____| '_ \ / _ \ \/ / | | | | (_| | |_\__ \_____| |_) | (_) > < |_| |_|\__,_|\__|___/ |_.__/ \___/_/\_\ nats-box v0.5.0 59f611d7bc1c:~#
As mentioned in the previous article, nats-box is a containerized NATS toolbox that provides several tools (nats, nats-top, stan-pub, stan-sub, stan-bench) we can use to interact with the NATS server. In this article, we will be using the nats CLI.
Using the nats CLI, we can create our first stream with the following command:
nats str add NOTIFICATIONS --subjects "NOTIFICATIONS.*" --ack --max-msgs=-1 --max-bytes=-1 --max-age=1y --storage file --retention limits --max-msg-size=-1 --discard old --dupe-window="0s" --replicas 1
Your output should be similar to the following:
Terminal
Stream NOTIFICATIONS was created Information for Stream NOTIFICATIONS created 2021-07-11T21:23:14Z Configuration: Subjects: NOTIFICATIONS.* Acknowledgements: true Retention: File - Limits Replicas: 1 Discard Policy: Old Duplicate Window: 2m0s Maximum Messages: unlimited Maximum Bytes: unlimited Maximum Age: 1y0d0h0m0s Maximum Message Size: unlimited Maximum Consumers: unlimited State: Messages: 0 Bytes: 0 B FirstSeq: 0 LastSeq: 0 Active Consumers: 0
We see that the stream NOTIFICATIONS.* was created. Note that we include the ( *) wildcard character. We can use the nats CLI to list the current streams
nats str ls
The output should include the NOTIFICATIONS stream:
Streams:
NOTIFICATIONS
We can also query the streams configuration using the following command:
Streams: nats str info NOTIFICATIONS
The configuration should look similar to the following:
Terminal
Information for Stream NOTIFICATIONS created 2021-07-11T21:23:14Z Configuration: Subjects: NOTIFICATIONS.* Acknowledgements: true Retention: File - Limits Replicas: 1 Discard Policy: Old Duplicate Window: 2m0s Maximum Messages: unlimited Maximum Bytes: unlimited Maximum Age: 1y0d0h0m0s Maximum Message Size: unlimited Maximum Consumers: unlimited State: Messages: 0 Bytes: 0 B FirstSeq: 0 LastSeq: 0 Active Consumers: 0
In a new nats-box terminal, we can publish messages to our stream with the following command:
nats pub NOTIFICATIONS.inbound 'new message'
The output should look similar to:
17:58:29 Published 11 bytes to "NOTIFICATIONS.inbound"
If we query the streams configuration again using:
nats str info NOTIFICATIONS
We will see that the state of our stream has been updated to reflect the published message:
Terminal
Information for Stream NOTIFICATIONS created 2021-07-11T21:23:14Z Configuration: Subjects: NOTIFICATIONS.* Acknowledgements: true Retention: File - Limits Replicas: 1 Discard Policy: Old Duplicate Window: 2m0s Maximum Messages: unlimited Maximum Bytes: unlimited Maximum Age: 1y0d0h0m0s Maximum Message Size: unlimited Maximum Consumers: unlimited State: Messages: 1 Bytes: 63 B FirstSeq: 1 @ 2021-07-11T17:58:29 UTC LastSeq: 1 @ 2021-07-11T17:58:29 UTC Active Consumers: 0
With our first message published, let's now create a Push subscriber. In a separate nats-box terminal, issue the following command:
nats con add NOTIFICATIONS MONITOR --ack none --target monitor.NOTIFICATIONS --deliver last --replay instant --filter '' --heartbeat=10s --flow-control
Your output should appear similar to:
Terminal
Information for Consumer NOTIFICATIONS > MONITOR created 2021-07-11T18:25:21Z Configuration: Durable Name: MONITOR Delivery Subject: monitor.NOTIFICATIONS Deliver Last: true Ack Policy: None Replay Policy: Instant Idle Heartbeat: 10.00s Flow Control: true State: Last Delivered Message: Consumer sequence: 0 Stream sequence: 0 Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0 Outstanding Acks: 0 Redelivered Messages: 0 Unprocessed Messages: 1
With our stream consumer created, we can subscribe to it using:
nats sub monitor.NOTIFICATIONS
We should start receiving messages in the terminal:
Terminal
18:30:56 Subscribing on monitor.NOTIFICATIONS [#1] Received JetStream message: consumer: NOTIFICATIONS > MONITOR / subject: NOTIFICATIONS.inbound / delivered: 1 / consumer seq: 1 / stream seq: 1 / ack: false new message [#2] Received on "monitor.NOTIFICATIONS" Nats-Last-Consumer: 1 Nats-Last-Stream: 1 Status: 100 Description: Idle Heartbeat
From the highlighted console output, we can see that we received the original message ( new message) and that the idle heartbeat is active, letting us know that we are still subscribed.
Switching back to the terminal where we published the first message, let's publish two more messages:
nats pub NOTIFICATIONS.inbound 'another message'
nats pub NOTIFICATIONS.inbound 'yet another message'
Switching back to our subscriber terminal, we can see that two more messages have been received:
Terminal
[#37] Received JetStream message: consumer: NOTIFICATIONS > MONITOR / subject: NOTIFICATIONS.inbound / delivered: 1 / consumer seq: 2 / stream seq: 2 / ack: false another message [#38] Received JetStream message: consumer: NOTIFICATIONS > MONITOR / subject: NOTIFICATIONS.inbound / delivered: 1 / consumer seq: 3 / stream seq: 3 / ack: false yet another message
Let's look at the state of our stream with the following command:
nats str info NOTIFICATIONS
Terminal
Information for Stream NOTIFICATIONS created 2021-07-11T21:23:14Z Configuration: Subjects: NOTIFICATIONS.* Acknowledgements: true Retention: File - Limits Replicas: 1 Discard Policy: Old Duplicate Window: 2m0s Maximum Messages: unlimited Maximum Bytes: unlimited Maximum Age: 1y0d0h0m0s Maximum Message Size: unlimited Maximum Consumers: unlimited State: Messages: 3 Bytes: 199 B FirstSeq: 1 @ 2021-07-11T17:58:29 UTC LastSeq: 3 @ 2021-07-11T18:36:57 UTC Active Consumers: 1
Here we see the primary difference with using message streaming. In the output, we see that the stream currently has three messages. While those messages were delivered to our first consumer, the stream still contains the published messages.
Let's create another consumer with the following command:
nats con add NOTIFICATIONS MONITOR2 --ack none --target monitor2.NOTIFICATIONS --deliver last --replay instant --filter '' --heartbeat=10s --flow-control
We should get output similar to this:
Terminal
Information for Consumer NOTIFICATIONS > MONITOR2 created 2021-07-11T19:12:17Z
Configuration:
Durable Name: MONITOR2
Delivery Subject: monitor2.NOTIFICATIONS
Deliver Last: true
Ack Policy: None
Replay Policy: Instant
Idle Heartbeat: 10.00s
Flow Control: true
State:
Last Delivered Message: Consumer sequence: 0 Stream sequence: 2
Acknowledgment floor: Consumer sequence: 0 Stream sequence: 2
Outstanding Acks: 0
Redelivered Messages: 0
Unprocessed Messages: 1
You may be wondering why the number of unprocessed messages is 1? When we created the new consumer, we set the --deliver flag to last. Setting this flag to last will start delivery with the last message in the stream to the consumer. Let's subscribe to monitor2.NOTIFICATIONS with the following command :
nats sub monitor2.NOTIFICATIONS
We will get the following output:
Terminal
19:13:06 Subscribing on monitor2.NOTIFICATIONS
[#1] Received JetStream message: consumer: NOTIFICATIONS > MONITOR2 / subject: NOTIFICATIONS.inbound / delivered: 1 / consumer seq: 1 / stream seq: 3 / ack: false
yet another message
We see that the last message in the stream ( yet another message) was delivered.
Let's create another consumer, this time we will change the --deliver flag to All:
nats con add NOTIFICATIONS MONITOR3 --ack none --target monitor3.NOTIFICATIONS --deliver all --replay instant --filter '' --heartbeat=10s --flow-control
We get the following output:
Terminal
Information for Consumer NOTIFICATIONS > MONITOR3 created 2021-07-11T19:16:34Z
Configuration:
Durable Name: MONITOR3
Delivery Subject: monitor3.NOTIFICATIONS
Deliver All: true
Ack Policy: None
Replay Policy: Instant
Idle Heartbeat: 10.00s
Flow Control: true
State:
Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
Outstanding Acks: 0
Redelivered Messages: 0
Unprocessed Messages: 3
As we might expect, by setting the delivery to All, the number of Unprocessed Messages is 3, which is all of the published stream messages.
We can also configure our consumers to start delivering messages starting from a given sequence id. Let's create another consumer which will deliver messages starting at sequence id # 2. To do this, we will call the following command:
nats con add NOTIFICATIONS MONITOR4 --ack none --target monitor4.NOTIFICATIONS --deliver
2 --replay instant --filter '' --heartbeat=10s --flow-control
We get the following output:
Terminal
Information for Consumer NOTIFICATIONS > MONITOR4 created 2021-07-11T19:16:34Z
Configuration:
Durable Name: MONITOR4
Delivery Subject: monitor4.NOTIFICATIONS
Start Sequence: 2
Ack Policy: None
Replay Policy: Instant
Idle Heartbeat: 10.00s
Flow Control: true
State:
Last Delivered Message: Consumer sequence: 0 Stream sequence: 1
Acknowledgment floor: Consumer sequence: 0 Stream sequence: 1
Outstanding Acks: 0
Redelivered Messages: 0
Unprocessed Messages: 2
Here we see that we now have two unprocessed messages. We can retrieve the messages with the following command:
nats sub monitor4.NOTIFICATIONS
We get the following output:
Terminal
01:09:09 Subscribing on monitor4.NOTIFICATIONS [#1] Received JetStream message: consumer: NOTIFICATIONS > MONITOR4 / subject: NOTIFICATIONS.inbound / delivered: 1 / consumer seq: 1 / stream seq: 2 / ack: false another message [#2] Received JetStream message: consumer: NOTIFICATIONS > MONITOR4 / subject: NOTIFICATIONS.inbound / delivered: 1 / consumer seq: 2 / stream seq: 3 / ack: false yet new message
And we see that we get the last two messages: another message and yet new message.
Integrating JEtstream
With a basic understanding of message streams we will now build a simple Java Spring Boot REST service to demonstrate JetStream messaging integration.Implementing the JEtstream Rest Cliemt
Let's start with our service's configuration.Configuration
We configure our build enviroment using a Maven pom.xml file.loading...
Here we see the project dependencies. Of note is the jnats dependency, which provides NATS Java client support.
The application.properties file provides the service's externalized configuration.
loading...
The nats.server provides a list of NATS servers the service will attempt to connect to. In this example, we are only using a single NATS instance. Included here (but commented out) is the spring.jackson.serialization.FAIL_ON_EMPTY_BEANS property. In this service, we are not returning the io.nats.client.Message to the caller. However, if you wish to serialize io.nats.client.Message in your services, you will need to uncomment this property.
Jetstream rest service application
This class provides the service's application entry point.loading...
While this is pretty standard Spring-Boot boiler-plate code, it does contain the @TypeHint annotation to inform Spring Native's AOT compiler that it needs to include the io.nats.client.impl.SocketDataPort when generating the native image executable.
REST controller
The RESTController contains the interesting parts of this service. The controller creates the JetStream stream, pre-loads the stream with sample messages, and provides endpoint methods to publish and consume messages.loading...
The controller provides six methods:
- /stream-info - returns the current stream information details.
- /pub/{message} - publishes the contents of {message} to the stream.
- /consumer?count=n - returns (n) messages from the stream if they are available. If the stream contains less than (n) message, the consumer will wait until more messages arrive or the timeout duration is reached.
- /consumeBySequence?startSequenceId=seqId,&count=n - returns n messages starting with the message corresponding to the startSequenceId.
- /consumer2ByTime?startTime=t&count=n - returns i messages starting with the message whose timestamp is equal to t or the first message in the stream whose timestamp is greater than t. The time format for t is yyyy-MM-ddTHH:mm:ss.SSSZ.
Building the Jar
We can build the service jar file with the following command:
mvn clean package
Running the jar
Once we have built the jar file, we can run it with:
java -jar ./target/jetstream-client-0.0.1-SNAPSHOT.jar
Building the Spring Native executable
In addition to building a jar file, we can also generate a Spring Native GraalVM native executable file and containerize it. Note: Building the Spring Native executable requires more system resources and takes considerably longer so be prepared for a longer build time. You can build the executable with the following command:
mvn spring-boot:build-image
Running the containerized service
We run the containerized native executable with the following command:
docker run --network host --rm -p 8080:8080 jetstream-client:0.0.1-SNAPSHOT
Exercising the Service
The first thing we must do is start up the service using the native image container or the jar file. Once started, the output should be similar to:
Terminal
==================================================== _ _ ____ _ | | ___| |_/ ___|| |_ _ __ ___ __ _ _ __ ___ _ | |/ _ \ __\___ \| __| '__/ _ \/ _` | '_ ` _ \ | |_| | __/ |_ ___) | |_| | | __/ (_| | | | | | | \___/ \___|\__|____/_\__|_|_ \___|\__,_|_| |_| |_| | _ \ ___ ___| |_ / ___| (_) ___ _ __ | |_ | |_) / _ \/ __| __| | | | |/ _ \ '_ \| __| | _ < __/\__ \ |_| |___| | | __/ | | | |_ |_| \_\___||___/\__|\____|_|_|\___|_| |_|\__| Application: jetstream-client Version: (0.0.1-SNAPSHOT) ==================================================== 2021-07-11 23:26:57.468 INFO 11831 --- [ main] .n.j.c.JetstreamClientServiceApplication : Starting JetstreamClientServiceApplication v0.0.1-SNAPSHOT using Java 11.0.11 on ubuntu-vm with PID 11831 (/home/cwoodward/Desktop/workspace/jetstream-client/target/jetstream-client-0.0.1-SNAPSHOT.jar started by workspace in /home/workspace/jetstream-client) 2021-07-11 23:26:57.475 INFO 11831 --- [ main] .n.j.c.JetstreamClientServiceApplication : No active profile set, falling back to default profiles: default 2021-07-11 23:27:00.341 INFO 11831 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http) 2021-07-11 23:27:00.463 INFO 11831 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat] 2021-07-11 23:27:00.464 INFO 11831 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.52] 2021-07-11 23:27:00.588 INFO 11831 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext 2021-07-11 23:27:00.589 INFO 11831 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2624 ms 2021-07-11 23:27:00.797 INFO 11831 --- [ main] c.t.n.j.c.controller.RESTController : REST controller postConstruct. 2021-07-11 23:27:00.797 INFO 11831 --- [ main] c.t.n.j.c.controller.RESTController : creating stream 2021-07-11 23:27:00.802 INFO 11831 --- [ main] c.t.n.j.c.controller.RESTController : adding nats server:nats://localhost:4222 2021-07-11 23:27:00.877 INFO 11831 --- [pool-1-thread-1] c.t.n.j.c.controller.RESTController : Connection Event:nats: connection opened 2021-07-11 23:27:00.877 INFO 11831 --- [ main] c.t.n.j.c.controller.RESTController : return connection:io.nats.client.impl.NatsConnection@dc885411 2021-07-11 23:27:00.877 INFO 11831 --- [pool-1-thread-1] c.t.n.j.c.controller.RESTController : CONNECTED to NATS! 2021-07-11 23:27:00.902 INFO 11831 --- [ main] c.t.n.j.c.controller.RESTController : Stream does not exist 2021-07-11 23:27:00.902 INFO 11831 --- [ main] c.t.n.j.c.controller.RESTController : creating stream 2021-07-11 23:27:00.911 INFO 11831 --- [ main] c.t.n.j.c.controller.RESTController : Created Stream 2021-07-11 23:27:01.939 INFO 11831 --- [ main] o.s.b.a.w.s.WelcomePageHandlerMapping : Adding welcome page: class path resource [static/index.html] 2021-07-11 23:27:02.588 INFO 11831 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '' 2021-07-11 23:27:02.642 INFO 11831 --- [ main] .n.j.c.JetstreamClientServiceApplication : Started JetstreamClientServiceApplication in 6.902 seconds (JVM running for 8.215)
With the application started, we can now interact with it.
Stream Info
When the service starts, it will create a stream and preload it with several test messages. We can verify that our stream was created with the following command:
curl http://localhost:8080/stream-info
You should receive a JSON message similar to the following:
{
"type":"io.nats.jetstream.api.v1.stream_info_response",
"error":null,
"clusterInfo":null,
"mirrorInfo":null,
"sourceInfos":null,
"configuration": {
"name":"NOTIFICATIONS",
"description":null,
"subjects":["NOTIFICATIONS.monitor1"],
"retentionPolicy":"Limits",
"maxConsumers":-1,
"maxMsgs":-1,
"maxMsgsPerSubject":0,
"maxBytes":-1,
"maxAge":"PT0S",
"maxMsgSize":-1,
"storageType":"Memory",
"replicas":1,
"noAck":false,
"templateOwner":null,
"discardPolicy":"Old",
"duplicateWindow":"PT2M",
"placement":null,
"mirror":null,
"sources":[]
},
"streamState":{
"consumerCount":0,
"firstTime":"2021-07-11T02:36:25.540077149Z",
"lastTime":"2021-07-11T02:36:25.554296941Z",
"lastSequence":10,
"msgCount":10,
"firstSequence":1,
"byteCount":600
},
"createTime":"2021-07-11T02:36:25.538954677Z",
"description":null,
"errorCode":-1,
"apiErrorCode":-1
}
We see that the streamState contains the firstSequence value of (1) and the lastSequence value of (10). These values indicate that our stream currently has the ten messages preloaded by the service.
Publishing a message to the stream
No that we know every thing is working we can use the following command to publish a message:
curl localhost:8080/pub/This-is-a-message
You should receive a JSON message similar to this:
{
"type":"io.nats.jetstream.api.v1.stream_info_response",
"error":null,
"clusterInfo":null,
"mirrorInfo":null,
"sourceInfos":null,
"configuration": {
"name":"NOTIFICATIONS",
"description":null,
"subjects":["NOTIFICATIONS.monitor1"],
"retentionPolicy":"Limits",
"maxConsumers":-1,
"maxMsgs":-1,
"maxMsgsPerSubject":0,
"maxBytes":-1,
"maxAge":"PT0S",
"maxMsgSize":-1,
"storageType":"Memory",
"replicas":1,
"noAck":false,
"templateOwner":null,
"discardPolicy":"Old",
"duplicateWindow":"PT2M",
"placement":null,
"mirror":null,
"sources":[]
},
"streamState": {
"consumerCount":0,
"firstTime":"2021-07-11T03:27:00.914670473Z",
"lastTime":"2021-07-11T03:47:03.608571452Z",
"msgCount":11,
"byteCount":642,
"firstSequence":1,
"lastSequence":11},
"createTime":"2021-07-11T03:27:00.903504093Z",
"errorCode":-1,
"apiErrorCode":-1,
"description":null
}
The response's streamState msgCount shows the stream contains 11 messages. We also see that the lastSequence value is also 11.
Consuming a message
With our stream populated, we can begin consuming the stream's messages. We will start by using our first pull consumer command:
curl "http://localhost:8080/consumer?
count=5"
Note: the URL is enclosed with ( ") since our URL contains the ( &) character. We should receive a list containing the five stream messages:
[
"2021-07-11T02:36:25.540077149Z : <pre-loaded message (0)>",
"2021-07-11T02:36:25.540077149Z : <pre-loaded message (1)>",
"2021-07-11T02:36:25.545770159Z : <pre-loaded message (2)>",
"2021-07-11T02:36:25.546857988Z : <pre-loaded message (3)>",
"2021-07-11T02:36:25.548017482Z : <pre-loaded message (4)<"
]
The output shows the first five messages from the stream. If we call the command again, we should see five more messages.
[
"2021-07-11T02:27:00.935164617Z : <pre-loaded message (5)>",
"2021-07-11T02:27:00.939333472Z : <pre-loaded message (6)>",
"2021-07-11T02:27:00.943049112Z : <pre-loaded message (7)>",
"2021-07-11T02:27:00.945667805Z : <pre-loaded message (8)>",
"2021-07-11T02:27:00.948213843Z : <pre-loaded message (9)>"
]
Consuming a message by sequence id
We can also consume messages starting with a sequence id by invoking the following command:
curl "http://localhost:8080/consumeBySequence?
startSequenceId=3&
count=3"
The output should be similar to the following:
[
"2021-07-11T02:36:25.545770159Z : <pre-loaded message (2)>",
"2021-07-11T02:36:25.546857988Z : <pre-loaded message (3)>",
"2021-07-11T02:36:25.548017482Z : <pre-loaded message (4)>"]
With query parameters startSequenceId=3& count=3 we get three messages, 2, 3, 4 (The stream starts with message 0).
Consuming a message by start time
Lastly, we can consume messages starting with a given start time using this command:
curl "http://localhost:8080/consumeByTime?
count=3&
startTime=2021-07-11T02:36:25.544617729Z"
The output should be similar to the following:
[
"2021-07-11T02:36:25.544617729Z : <pre-loaded message (1)>",
"2021-07-11T02:36:25.545770159Z : <pre-loaded message (2)>",
"2021-07-11T02:36:25.546857988Z : <pre-loaded message (3)>"
]
JetStream returns three messages starting with the first message with a timestamp that is equal to after the time we supplied.
Coming up
In this article, we have had a brief introduction to messaging streaming and seen how we can integrate NATS's JetStream to a Spring Boot microservice. In the third, and final NATS article of this series, we will demonstrate how we can configure OpenFaas with the NATS-Connector to trigger OpenFaaS functions using NATS messages.Resources
- NATS website.
- NATS JetStream Docs.
- NATS Github Repo.
- NATS Java client Github Repo.
- NATS Server flags.
- Tools
- Demo
Twitter
Facebook
Reddit
LinkedIn
Email