Linkerd

Message-Driven microservices

In the article Message-Driven Architecture, we discussed the benefits of using asynchronous messaging as a mechanism to decouple applications and facilitate scaling. Later, during the development of a simple Outbound Email Service, we demonstrated how we could employ RabbitMQ as our messaging platform. Today's article will introduce the NATS messaging platform as another tool for our growing toolbox and develop several services to demonstrate how we can employ NATS in our microservices.

What is NATS?

NATS is an open-source, cloud-native messaging system with extremely high throughput. This feature makes it extremely attractive for message-driven microservice architectures. NATS is a highly scalable (native clustering) platform with a minimal footprint (under 10MB), making it a lightweight addition to any architecture. It provide implementations for Pub/Sub, Request/Reply, Subject-based, and Queue Group messaging patterns. Lastly, NATS is now part of the Cloud Native Computing Foundation's portfolio which includes Kubernetes, Prometheus, Linkerd, Containerd, Fluentd, Helm, and many more. Membership in the CNCF indicates that NATS has industry acceptance and is a cloud-ready technology.

Digging deeper

NATS was created by Derek Collision originally designed as the messaging control plane for Cloud Foundry. As mentioned previously, NATS is open-source, which can be found on its Github repository. It is currently supported by Synadia, which is helmed, unsurprisingly, by Derek Collision.

According to Derek Collision, NATS is intended as the central nervous system for distributed cloud platforms. The NATS name reflects this perspective, standing for Neural Autonomic Transport System. Awkward acronyms aside, the core design principles of NATS are performance, scalability, and ease of use make it a strong contender for any application's central nervous system.

At its core, NATS consists of four main components:

NATS server

The NATS Server provides the core messaging patterns for implementing Pub/Sub, Request/Reply, Subject-based, and Queue Groups. It is extremely easy to use in both single instances as well as clustered configurations.

JetStream

JetStream replaces the NATS Streaming server, which is sunsetting in June 2023. It extends NATS to include message store and replay by time or sequence, at-least-once delivery, exactly-once in a window, encryption for data at rest, message-specific cleansing, and stream persistence and replay by consumers. JetStream will be covered in a separate, future article.

NATS clients

NATS clients connect and communicate with NATS servers. Currently, there are over thirty language-specific clients supported, including Java, C, C#, Go, Rust, Node.js, Python, Scala, Swift, and many more. While most of the prominent language clients have been developed and are maintained by the NATS team, the NATS community has supplied support for many additional languages.

NATS Connector framework

The NATS Connector Framework is an easy-to-use framework with an extensible plug-in mechanism that facilitates integration with other technologies. Custom connectors can be built quickly simply by implementing the appropriate frameworks interfaces.

NATS Messaging Patterns

NATS provides support for four messaging patterns.


Publish/Subscribe

In this pattern, message senders publish their messages to a particular subject. Subscribers receive messages when the subjects they have subscribed to receive messages from publishers.

Pub/Sub

We can have multiple publishers sending messages to a given subject and multiple subscribers receiving those messages. Every connected subscriber will receive each message published to the subject.


Subject-Based

Like Pub/Sub, Subject-based messaging also use publishers and subscribers. We can also have multiple publishers and subscribers.

Subject-based

However, we introduce the concept of Subject hierarchies and wildcards.

Subject Hierarchies

Within our subject names, we introduce the . character to identify elements within a hierarchy of similarly related subjects.

	 time.us
 time.us.east
 time.us.east.atlanta
 time.eu
 time.eu.east
 time.eu.prague


Here we see a fragment of a time hierarchy. Each successive element provides an increasing level of specificity.

Wildcard

Subscribers can employ two special wildcard characters when subscribing to a subject. The first wildcard is the * character which matches a single token. When used, this wildcard will allow any token in the corresponding hierarchy field to match.

The second wildcard is the > character. This wildcard must be the last character in the subject. It will match all tokens to the right of its position.

In the diagram above, the published message is sent to the time.us.east.atlanta subject. The first subscribe matches since both subjects are equal. However, the second subject won't match since the * wildcard character is in the third position and the published message contains a four-level hierarchy. It would have matched if the published subject was time.us.east since the wildcard would accept anything in the field containing the value east. Lastly, the third subscriber uses the > character in the same position, which matches everything to the right.

Publish Subject Subscriber Subject Matches
time.us.east.atlanta time.us.east.atlanta
time.us.east.atlanta time.us.east.*
time.us.east.atlanta time.us.*.atlanta
time.us.east.atlanta time.us.* x;
time.us.east.atlanta time.us.>

Request/Reply

When we use Publish/Subscriber or Subject-based messaging patterns, the subscriber isn't concerned with returning anything to the publisher. However, the message publisher may occasionally want to be notified of a result or that the message has been published. For these cases we us the Request/Reply message pattern.

Request/Reply

When the subscriber receives a message in the Request/Reply pattern, it has access to a replyTo address that points back to the publisher. Behind the scenes, NATS creates an Inbox-channel associated with the requester. With this information, we can send a response to the requester.

Queue Group

Queue groups allow us to distribute work across multiple subscribers to a subject. We can configure a group by supplying a queue group when we subscribe.

Queue Group

Under the covers, NATS sends each incoming message to only one subscriber. With this approach, we can scale our workload by growing and shrinking the number of subscribers.

NATS in docker

The simplest way to try out NATS is to run its official Docker image. To demonstrate this, let's spin up a single instance with the following commands:

docker network create nats-net

docker run --name nats-1 --network nats-net --rm -p 4222:4222 -p 6222:6222 -p 8222:8222 nats --name nats-1


Here we created a new docker network named nats-net and a new NATS instance named nats-1. We configured the NATS instance to expose three ports:

  • 4222 - NATS client port.
  • 6222 - NATS routing port for clustering.
  • 8222 - NATS HTTP managment port for reporting.


When we run the Docker command, we should see output similar to the following:

Terminal
[1] 2021/06/15 01:36:28.661367 [INF] Starting nats-server
[1] 2021/06/15 01:36:28.661464 [INF]   Version:  2.3.4
[1] 2021/06/15 01:36:28.661469 [INF]   Git:      [7112ae0]
[1] 2021/06/15 01:36:28.661477 [INF]   Name:     NDOEKCGFRUVDGKNGXJVOYM5SITZHP4X24UVYFK257XWFGUN54FNTEKMP
[1] 2021/06/15 01:36:28.661485 [INF]   ID:       NDOEKCGFRUVDGKNGXJVOYM5SITZHP4X24UVYFK257XWFGUN54FNTEKMP
[1] 2021/06/15 01:36:28.661493 [INF] Using configuration file: nats-server.conf
[1] 2021/06/15 01:36:28.663825 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2021/06/15 01:36:28.663890 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2021/06/15 01:36:28.664110 [INF] Server is ready
[1] 2021/06/15 01:36:28.664178 [INF] Cluster name is xkXle2Qc2T1dnKg5t1jh86
[1] 2021/06/15 01:36:28.664189 [WRN] Cluster name was dynamically generated, consider setting one
[1] 2021/06/15 01:36:28.664213 [INF] Listening for route connections on 0.0.0.0:6222


synadia nats-box

With our new NATS instance up and running, we need a way to test it out. Synadia has published a lightweight containerized nats-box tool packaged with NATS and NATS Streaming utilities. The nats-box container toolbox contains:


  • nats - NATS management utility ( target="_blank">README).
  • nsc - create NATS accounts and users.
  • nats-top - top-like tool for monitoring NATS servers.
  • stan-pub - publish messages to NATS Streaming.
  • stan-sub - subscribe to messages from NATS Streaming.
  • stan-bench - benchmark stan.

For now, our focus will be on the nats command that allows us to perform the following operations:

  • nats-pub - publish NATS messages
  • nats-req - send a request, receive a response over NATS
  • nats-sub - subscribe to NATS messages on a topic
  • stan-pub - publish messages to NATS Streaming
  • stan-sub - subscribe to messages from NATS Streaming
  • nats-top - top-like tool for monitoring NATS servers
  • nsc - create NATS accounts and users

We can start up NATS-BOX with the following command:

docker run --network nats-net --rm -it synadia/nats-box

The output should appear similar to the following:

Terminal
             _             _               
 _ __   __ _| |_ ___      | |__   _____  __
| '_ \ / _` | __/ __|_____| '_ \ / _ \ \/ /
| | | | (_| | |_\__ \_____| |_) | (_) >  < 
|_| |_|\__,_|\__|___/     |_.__/ \___/_/\_\
                                           
nats-box v0.5.0
59f611d7bc1c:~# 



Now that we have nats-box running, we can begin exercising our NATS instance. For example, we can get a nats-box cheatsheet with the following command:

nats cheat

The console output provides a listing of common commands:

Terminal
 # show a specific section of cheats
nats cheat pub

# list available sections
nats cheat --sections

# Save cheats to files in the format expected by 'cheats'
rm -rf .config/cheat/cheatsheets/personal/nats
nats cheat --save .config/cheat/cheatsheets/personal/nats
cheat nats/sub

# To view account information and connection
nats account info

# benchmark JetStream acknowledged publishes
nats bench --ack --msgs 10000 ORDERS.bench

# benchmark core nats publish and subscribe with 10 publishers and subscribers
nats bench --pub 10 --sub 10 --msgs 10000 --size 512

# To view common system events
nats events
nats events --short --all
nats events --no-srv-advisory --js-metric --js-advisory
nats events --no-srv-advisory --subjects service.latency.weather

# To test latency between 2 servers
nats latency --server srv1.example.net:4222 --server-b srv2.example.net:4222 --duration 10s

# To publish 100 messages with a random body between 100 and 1000 characters
nats pub destination.subject "{{ Random 100 1000 }}" -H Count:{{ Count }} --count 100

# To publish messages from STDIN
echo "hello world" | nats pub destination.subject

# To request a response from a server and show just the raw result
nats request destination.subject "hello world" -H "Content-type:text/plain" --raw

# To subscribe to messages, in a queue group and acknowledge any JetStream ones
nats sub source.subject --queue work --ack

# To subscribe to a randomly generated inbox
nats sub --inbox

# Create or update
nats context add development --server nats.dev.example.net:4222 [other standard connection properties]
nats context add ngs --description "NGS Connection in Orders Account" --nsc nsc://acme/orders/new
nats context edit development [standard connection properties]

# View contexts
nats context ls
nats context show development --json

# Validate all connections are valid and that connections can be established
nats context validate --connect

# Select a new default context
nats context select

# Connecting using a context
nats pub --context development subject body

# To set up a responder that runs an external command with the 3rd subject token as argument
nats reply "service.requests.>" --command "service.sh {{2}}"

# To set up basic responder
nats reply service.requests "Message {{Count}} @ {{Time}}"
nats reply service.requests --echo --sleep 10

# To see all available schemas using regular expressions
nats schema search 'response|request'

# To view a specific schema
nats schema show io.nats.jetstream.api.v1.stream_msg_get_request --yaml

# To validate a JSON input against a specific schema
nats schema validate io.nats.jetstream.api.v1.stream_msg_get_request request.json

# To see all servers, including their server ID and show a response graph
nats server ping --id --graph --user system

# To see information about a specific server
nats server info nats1.example.net --user system
nats server info NCAXNST2VH7QGBVYBEDQGX73GMBXTWXACUTMQPTNKWLOYG2ES67NMX6M --user system

# To list all servers and show basic summaries, expecting responses from 10 servers
nats server list 10 --user system

# To report on current connections
nats server report connections 
nats server report connz --account WEATHER
nats server report connz --sort in-msgs
nats server report connz --top 10 --sort in-msgs

# To report on accounts
nats server report accounts
nats server report accounts --account WEATHER --sort in-msgs --top 10

# To report on JetStream usage by account WEATHER
nats server report jetstream --account WEATHER --sort cluster

# To generate a NATS Server bcrypt command
nats server password
nats server pass -p 'W#OZwVN-UjMb8nszwvT2LQ'
nats server pass -g
PASSWORD='W#OZwVN-UjMb8nszwvT2LQ' nats server pass

# To request raw monitoring data from servers
nats server request subscriptions --detail --filter-account WEATHER --cluster EAST
nats server req variables --name nats1.example.net
nats server req connections --filter-state open
nats server req connz --subscriptions --name nats1.example.net
nats server req gateways --filter-name EAST
nats server req leafnodes --subscriptions
nats server req accounts --account WEATHER
nats server req jsz --leader

# To manage JetStream cluster RAFT membership
nats server cluster raft step-down



We can, of course, always call nats with the --help flag to obtain a listing of the CLI command.

nats --help


The console output provides a listing of CLI commands

Terminal
usage: nats [<flags>] <command> [<args> ...]

NATS Utility

NATS Server and JetStream administration.

See 'nats cheat' for a quick cheatsheet of commands



Flags:
  -h, --help                    Show context-sensitive help (also try --help-long and --help-man).
      --version                 Show application version.
  -s, --server=NATS_URL         NATS server urls
      --user=NATS_USER          Username or Token
      --password=NATS_PASSWORD  Password
      --creds=NATS_CREDS        User credentials
      --nkey=NATS_NKEY          User NKEY
      --tlscert=NATS_CERT       TLS public certificate
      --tlskey=NATS_KEY         TLS private key
      --tlsca=NATS_CA           TLS certificate authority chain
      --timeout=NATS_TIMEOUT    Time to wait on responses from NATS
      --js-api-prefix=PREFIX    Subject prefix for access to JetStream API
      --js-event-prefix=PREFIX  Subject prefix for access to JetStream Advisories
      --context=CONTEXT         Configuration context
      --trace                   Trace API interactions

Commands:
  help [<command>...]
    Show help.

  account info
    Account information

  backup [<flags>] <output>
    JetStream configuration backup utility

  bench [<flags>] <subject>
    Benchmark utility

  consumer add [<flags>] [<stream>] [<consumer>]
    Creates a new Consumer

  consumer copy [<flags>] <stream> <source> <destination>
    Creates a new Consumer based on the configuration of another

  consumer info [<flags>] [<stream>] [<consumer>]
    Consumer information

  consumer ls [<flags>] [<stream>]
    List known Consumers

  consumer next [<flags>] <stream> <consumer>
    Retrieves messages from Pull Consumers without interactive prompts

  consumer rm [<flags>] [<stream>] [<consumer>]
    Removes a Consumer

  consumer sub [<flags>] [<stream>] [<consumer>]
    Retrieves messages from Consumers

  consumer cluster step-down [<stream>] [<consumer>]
    Force a new leader election by standing down the current leader

  consumer report [<flags>] [<stream>]
    Reports on Consmer statistics

  context save [<flags>] <name>
    Update or create a context

  context edit <name>
    Edit a context in your EDITOR

  context ls
    List known contexts

  context rm [<flags>] <name>
    Remove a context

  context select [<name>]
    Select the default context

  context show [<flags>] [<name>]
    Show the current or named context

  context validate [<flags>] [<name>]
    Validate one or all contexts

  events [<flags>]
    Show Advisories and Events

  latency --server-b=SERVER-B [<flags>]
    Perform latency tests between two NATS servers

  pub [<flags>] <subject> [<body>]
    Generic data publish utilty

    Body and Header values of the messages may use Go templates to create unique messages.

      nats pub test --count 10 "Message {{Count}} @ {{Time}}"

    Multiple messages with random strings between 10 and 100 long:

      nats pub test --count 10 "Message {{Count}}: {{ Random 10 100 }}"

    Available template functions are:

      Count            the message number
      TimeStamp        RFC3339 format current time
      Unix             seconds since 1970 in UTC
      UnixNano         nano seconds since 1970 in UTC
      Time             the current time
      ID               an unique ID
      Random(min, max) random string at least min long, at most max 

  request [<flags>] <subject> [<body>]
    Generic data request utility

    Body and Header values of the messages may use Go templates to create unique messages.

      nats pub test --count 10 "Message {{Count}} @ {{Time}}"

    Multiple messages with random strings between 10 and 100 long:

      nats pub test --count 10 "Message {{Count}}: {{ Random 10 100 }}"

    Available template functions are:

      Count            the message number
      TimeStamp        RFC3339 format current time
      Unix             seconds since 1970 in UTC
      UnixNano         nano seconds since 1970 in UTC
      Time             the current time
      ID               an unique ID
      Random(min, max) random string at least min long, at most max 

  rtt [<flags>] [<iterations>]
    Compute round-trip time to NATS server

  reply [<flags>] <subject> [<body>]
    Generic service reply utility

    The "command" supports extracting some information from the subject the request came in on.

    When the subject being listened on is "weather.>" a request on "weather.london" can extract the "london" part and use it in the command string:

      nats reply 'weather.>' --command "curl -s wttr.in/{{1}}?format=3"

    This will request the weather for london when invoked as:

      nats request weather.london ''

    The body and Header values of the messages may use Go templates to create unique messages.

      nats reply test "Message {{Count}} @ {{Time}}"

    Multiple messages with random strings between 10 and 100 long:

      nats pub test --count 10 "Message {{Count}}: {{ Random 10 100 }}"

    Available template functions are:

      Count            the message number
      TimeStamp        RFC3339 format current time
      Unix             seconds since 1970 in UTC
      UnixNano         nano seconds since 1970 in UTC
      Time             the current time
      ID               an unique ID
      Random(min, max) random string at least min long, at most max 

  restore [<flags>] [<directory>]
    Restores a backup of JetStream configuration

  schema search [<flags>] [<pattern>]
    Search schemas using a pattern

  schema show [<flags>] <schema>
    Show the contents of a schema

  schema validate [<flags>] <schema> <file>
    Validates a JSON file against a schema

  server info [<server>]
    Show information about a single server

  server list [<flags>] [<expect>]
    List known servers

  server ping [<flags>] [<expect>]
    Ping all servers

  server report connections [<flags>] [<limit>]
    Report on connections

  server report accounts [<flags>] [<account>] [<limit>]
    Report on account activity

  server report jetstream [<flags>] [<limit>]
    Report on JetStream activity

  server request subscriptions [<flags>] [<wait>]
    Show subscription information

  server request variables [<wait>]
    Show runtime variables

  server request connections [<flags>] [<wait>]
    Show connection details

  server request routes [<flags>] [<wait>]
    Show route details

  server request gateways [<flags>] [<wait>] [<filter-name>]
    Show gateway details

  server request leafnodes [<flags>] [<wait>]
    Show leafnode details

  server request accounts [<flags>] [<wait>]
    Show account details

  server request jetstream [<flags>] [<wait>]
    Show JetStream details

  server raft step-down [<flags>]
    Force a new leader election by standing down the current meta leader

  server raft peer-remove [<flags>] [<name>]
    Removes a server from a JetStream cluster

  server passwd [<flags>]
    Creates encrypted passwords for use in NATS Server

  server check connection* [<flags>]
    Checks basic server connection

  server check stream --stream=STREAM --peer-expect=SERVERS [<flags>]
    Checks the health of mirrored streams, streams with sources or clustered streams

  server check meta --expect=SERVERS --lag-critical=OPS --seen-critical=DURATION
    Check JetStream cluster state

  stream add [<flags>] [<stream>]
    Create a new Stream

  stream edit [<flags>] [<stream>]
    Edits an existing stream

  stream info [<flags>] [<stream>]
    Stream information

  stream ls [<flags>]
    List all known Streams

  stream rm [<flags>] [<stream>]
    Removes a Stream

  stream purge [<flags>] [<stream>]
    Purge a Stream without deleting it

  stream copy [<flags>] <source> <destination>
    Creates a new Stream based on the configuration of another

  stream get [<flags>] [<stream>] [<id>]
    Retrieves a specific message from a Stream

  stream rmm [<flags>] [<stream>] [<id>]
    Securely removes an individual message from a Stream

  stream view [<flags>] [<stream>] [<size>]
    View messages in a stream

  stream report [<flags>]
    Reports on Stream statistics

  stream backup [<flags>] <stream> <target>
    Creates a backup of a Stream over the NATS network

  stream restore [<flags>] <stream> <file>
    Restore a Stream over the NATS network

  stream cluster step-down [<stream>]
    Force a new leader election by standing down the current leader

  stream cluster peer-remove [<stream>] [<peer>]
    Removes a peer from the Stream cluster

  stream template create [<flags>] [<stream>]
    Creates a new Stream Template

  stream template info [<flags>] [<template>]
    Stream Template information

  stream template ls [<flags>]
    List all known Stream Templates

  stream template rm [<flags>] [<template>]
    Removes a Stream Template

  sub [<flags>] [<subject>]
    Generic subscription client

  cheat [<flags>] [<section>]
    Cheatsheets for the nats CLI

    These cheatsheets are in a format compatible with the popular https://github.com/cheat/cheat command.


From our nats-box command line, we will create a subscriber:

nats sub -s "nats://nats-1:4222" test

This command connects to our NATS instance and subscribes to the test subject. The output should appear similar to the following:

14:42:57 Subscribing on test

We now have a client listening for messages sent to test.

Now open a second nats-box and issue it the following command:

nats pub -s "nats://nats-1:4222" test "hello nats"

This command connects to our NATS instance and publishes the message hello nats to the test subject. The output of this command should be similar to:

14:43:54 Published 10 bytes to "test"

Switching back to our first nats-box instance, we should see that the message we just published has been delivered to our subscriber.

[#1] Received on "test" hello nats

We can also use the nats-top utility to get a real-time view of our Nats instance. nats-top can display system summary information, including subscriptions, pending bytes, message count, and other NATS metrics. To run nats-top to view for our existing instance issue the following command:

nats-top -s nats-1

Your output should appear similar to the following:

Terminal
	NATS server version 2.3.4 (uptime: 5h0m12s)
Server:
  Load: CPU:  1.0%  Memory: 13.4M  Slow Consumers: 0
  In:   Msgs: 12  Bytes: 98  Msgs/Sec: 0.0  Bytes/Sec: 0
  Out:  Msgs: 12  Bytes: 98  Msgs/Sec: 0.0  Bytes/Sec: 0

Connections Polled: 1
  HOST                   CID    NAME                        SUBS    PENDING     MSGS_TO     MSGS_FROM   BYTES_TO    BYTES_FROM  LANG     VERSION  UPTIME   LAST ACTIVITY
  192.168.176.3:56792    5      NATS CLI Version 20210429   1       0           11          0           88          0           go       1.11.0   2m46s    2021-06-15 19:37:26.489476229 +0000 UTC


Now that our NATS instance is running and we can publish and subscribe messages, we can explore clustering NATS instances.


NATS clustering

When you add a messaging dependency to your microservice architecture it is important to consider the impact of a failure of the messaging system on the application. If messaging is critical to your application, then you should consider a messaging cluster. Clustering can help us achieve both high availability and fault tolerance.

Fortunately, NATS clustering is baked in. Let's transform the NATS instance we created earlier into a three-node cluster by issuing the following two commands (in separate terminal instances):

docker run --name nats-2 --network nats-net --rm nats --cluster nats://0.0.0.0:6222 --cluster_name nats-cluster --routes=nats://ruser:T0pS3cr3t@nats-1:6222 --name nats-2

docker run --name nats-3 --network nats-net --rm nats --cluster nats://0.0.0.0:6222 --cluster_name nats-cluster --routes=nats://ruser:T0pS3cr3t@nats-1:6222 --name nats-3

If we look at our original NATS instance output, we see that the two new nodes have connected to our initial instance:

Terminal
[1] 2021/06/15 01:36:28.661367 [INF] Starting nats-server
[1] 2021/06/15 01:36:28.661464 [INF]   Version:  2.3.4
[1] 2021/06/15 01:36:28.661469 [INF]   Git:      [7112ae0]
[1] 2021/06/15 01:36:28.661477 [INF]   Name:     NDOEKCGFRUVDGKNGXJVOYM5SITZHP4X24UVYFK257XWFGUN54FNTEKMP
[1] 2021/06/15 01:36:28.661485 [INF]   ID:       NDOEKCGFRUVDGKNGXJVOYM5SITZHP4X24UVYFK257XWFGUN54FNTEKMP
[1] 2021/06/15 01:36:28.661493 [INF] Using configuration file: nats-server.conf
[1] 2021/06/15 01:36:28.663825 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2021/06/15 01:36:28.663890 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2021/06/15 01:36:28.664110 [INF] Server is ready
[1] 2021/06/15 01:36:28.664178 [INF] Cluster name is xkXle2Qc2T1dnKg5t1jh86
[1] 2021/06/15 01:36:28.664189 [WRN] Cluster name was dynamically generated, consider setting one
[1] 2021/06/15 01:36:28.664213 [INF] Listening for route connections on 0.0.0.0:6222
[1] 2021/06/15 01:37:51.828651 [INF] 192.168.176.6:56232 - rid:3 - Route connection created
[1] 2021/06/15 01:37:51.831460 [INF] Cluster name updated to nats-cluster
[1] 2021/06/15 01:37:51.832863 [INF] 192.168.176.6:56232 - rid:3 - Router connection closed: Client Closed
[1] 2021/06/15 01:37:52.851006 [INF] 192.168.176.6:56242 - rid:4 - Route connection created
[1] 2021/06/15 01:37:59.944363 [INF] 192.168.176.7:57264 - rid:5 - Route connection created

We also see that the connecting nodes have updated the cluster name.

If we look at the nats-2 output, we can see two new route connections:

Terminal
[1] 2021/06/15 01:37:51.821981 [INF] Starting nats-server
[1] 2021/06/15 01:37:51.822052 [INF]   Version:  2.3.4
[1] 2021/06/15 01:37:51.822056 [INF]   Git:      [7112ae0]
[1] 2021/06/15 01:37:51.822060 [INF]   Name:     nats-2
[1] 2021/06/15 01:37:51.822063 [INF]   ID:       ND26GBOYNCAOT66E6QA4KXXHSVUIGUAW4F5XKN4BLN7YTWV7U47YSXC7
[1] 2021/06/15 01:37:51.824150 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2021/06/15 01:37:51.824370 [INF] Server is ready
[1] 2021/06/15 01:37:51.824409 [INF] Cluster name is nats-cluster
[1] 2021/06/15 01:37:51.824440 [INF] Listening for route connections on 0.0.0.0:6222
[1] 2021/06/15 01:37:51.830903 [INF] 192.168.176.2:6222 - rid:3 - Route connection created
[1] 2021/06/15 01:37:51.831122 [INF] 192.168.176.2:6222 - rid:3 - Router connection closed: Cluster Name Conflict
[1] 2021/06/15 01:37:52.851034 [INF] 192.168.176.2:6222 - rid:4 - Route connection created
[1] 2021/06/15 01:37:59.947086 [INF] 192.168.176.7:6222 - rid:5 - Route connection created

 
   

And looking at the nats-3 output, we can see two new route connections have been added as well.

Terminal
[1] 2021/06/15 01:37:59.933224 [INF] Starting nats-server
[1] 2021/06/15 01:37:59.933266 [INF]   Version:  2.3.4
[1] 2021/06/15 01:37:59.933270 [INF]   Git:      [7112ae0]
[1] 2021/06/15 01:37:59.933274 [INF]   Name:     nats-3
[1] 2021/06/15 01:37:59.933279 [INF]   ID:       NB4DPFI3A4BTMGRU5W5O56WUBL53CJMHUK27FGZXJ3TZ3G3AXNH6O5US
[1] 2021/06/15 01:37:59.936065 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2021/06/15 01:37:59.936457 [INF] Server is ready
[1] 2021/06/15 01:37:59.936481 [INF] Cluster name is nats-cluster
[1] 2021/06/15 01:37:59.936512 [INF] Listening for route connections on 0.0.0.0:6222
[1] 2021/06/15 01:37:59.945382 [INF] 192.168.176.2:6222 - rid:3 - Route connection created
[1] 2021/06/15 01:37:59.947130 [INF] 192.168.176.6:43246 - rid:4 - Route connection created


How does NATS clustering work? NATS servers use the Gossip Protocol to share data across nodes. Each node must connect to at least one other node. Once connected, they share any new information (gossip) between nodes. This gossip includes the presence of other available nodes. As nodes are added or removed, the gossip propagates between nodes and builds a dynamic mesh of NATS instances.


monitoring

A core requirement of any microservices is to provide a monitoring endpoint to allow an external entity to detect liveness and capture operational statistics. NATS provides us with a dedicated HTTP server that can expose both HTTP and HTTPS endpoints. We can configure the listener ports using the following flags:

Short flag Long flag Description
-m --http_port PORT HTTP port for monitoring.
-ms --https_port PORT HTTPS port for monitoring.

In our example above, we used the default port 8222 and exposed it in our docker run command. We can verify the monitoring point is available by opening a web browser and navigating to http://localhost:8222.

You should see a similar landing page:

NATS Monitoring Landing page

URL Information Type Description
varz General returns general information about the server state and configuration.
connz Connection Reports detailed information on current and recently closed connections.
routez Routing Endpoint on active cluster routes.
gatewayz Gateway Reports information about gateways used to create a NATS supercluster.
leafz Leaf Node Reports detail information on leaf node connections.
subz Subscription Routing Reports detailed information about current subscriptions and the routing data structure
accountz Account Reports information on server's active accounts
jsz JetStream Reports details information on JetStream

Each endpoint returns its data in JSON format.


Connecting services with NATS

Now that we have a working knowledge of NATS, we will develop three services that use it to communicate. These three services will demonstrate inter-service communication via the Publish/Subscribe, Request/Reply, and Queue Group messaging patterns.

Queue Worker Service

The first service we will create will be a Queue Worker service. This service will connect to NATS as part of a queue group allowing the work to be elastically scaled. When messages arrive, the incoming message is transformed using the current string operation. Additionally, the Queue Worker will subscribe to a subject to allow the string operation to be changed to one of the supported operations (capitalize, uppercase, lowercase, reverse).

Processor Service

This service acts as an intermediate service. It receives Request/Reply messages from an upstream service and published Request/Reply messages to downstream services.

REST Service

The last service will provide a REST endpoint from which we can trigger our initiating messages. The service will provide a method for changing the QueueWorker operation, and the second method provides a Request/Reply message to Processor Service. When invoking the REST Service, the output will be in the form:

<{original-message-text}> processed by <{name-of-queue-worker}> transformed into ({transformed-message})


The default transformation will be uppercase. If the Queue Worker receives an unknown operation type, it will set the operation to the default.


ImplementING the Services

We will be using Spring Boot, JNATS as our Java NATS client, along with the Docker-ized NATS cluster we've created.

Demo Architecture

Nats Rest Service

This service provides the REST front-end we will use to send our inbound messages to the NATS cluster.

Configuration

We configure our build environment using Maven pom.xml.

loading...

Here we see the dependencies common across all three services. The application is a simple Spring-Boot app with the JNATS dependency.

The application.properties provide our externalized service configuration.

loading...


Of note in the application.properties is the inclusion of the nats.servers list, which includes the hostnames and ports to configure the JNATS client.

NATS REST Service Application

This class provides our application entry point.

loading...

Of note in this class is the use of the @TypeHint(types = io.nats.client.impl.SocketDataPort.class, typeNames = "io.nats.client.impl.SocketDataPort") annotation. If the application is built using Spring Native AOT, this instructs the AOT compiler to include the SocketDataPort class in the native executable. This pattern is duplicated in three services.

loading...


Nats Processor Service

This service provides a somewhat contrived example of an intermediate message processor. It receives Request/Reply messages from the front-end and republishes them for processing by a Queue Worker.

Configuration

The application.properties provide our externalized service configuration.

loading...


Of note in the application.properties is the inclusion of the nats.servers list, which includes the hostnames and ports to configure the JNATS client.

loading...


Nats Queue Worker Service

This service demonstrates the use of NATS queue groups to distribute work across multiple services.

The application.properties provide our externalized service configuration.

loading...


Of note in the application.properties is the inclusion of the nats.servers list, which includes the hostnames and ports to configure the JNATS client.

loading...



Build the services

Each Maven project file is configured for building both traditional Spring-Boot fat-jar, Spring-Native native executable, and container artifacts. To build each, navigate to the project root and execute the corresponding command:

Fat jar file

mvn clean package


Native Executable and container

mvn clean spring-boot:build-image

Note: Spring Native builds take longer and require considerably more resources than their fat-jar counterpart.


Running The application

To run the application, we must start up our cluster and each of our services.

NATS Cluster

If you didn't build the cluster previously, go back and build the cluster. Once your cluster has started, you can proceed.

NAts Rest Service

Start an instance of the NATSRestService.

java -jar ./target/nats-rest-service-0.0.1-SNAPSHOT.jar


NATS Processor Service

Start an instance of the NATSProcessorService.

java -jar ./target/nats-processor-service-0.0.1-SNAPSHOT.jar

NATS Queue Worker Service

We will be running three instances of the NATSQueueWorkerService. We will start the first Queue Worker with is default settings.

java -jar ./target/nats-queue-worker-service-0.0.1-SNAPSHOT.jar

We then start two more instances and override both the queue.worker.name and server.port.

java -jar ./target/nats-queue-worker-service-0.0.1-SNAPSHOT.jar --queue.worker.name=QW-2 --server.port=8100

java -jar ./target/nats-queue-worker-service-0.0.1-SNAPSHOT.jar --queue.worker.name=QW-3 --server.port=8110



Exercising the application

Once we have our NATS cluster and services running, we can begin exercising the application. If we call the Request/Reply first, we can see the default transformation:

curl http://localhost:8080/request-reply/ test-message

The resulting output will be:

< test-message> processed by < QW-1> transformed into ( TEST-MESSAGE)

Queue Worker QW-1 transformed the message text, converting all characters to upper-case.

We can then change the transformation operation to capitalization with:

curl http://localhost:8080/fnf/ cap

Because this is a fire and forget message, we will not receive any output.

If we call the same URL again...

curl http://localhost:8080/request-reply/ test-message

The resulting output contains the original message with the first letter capitalized.

< test-message> processed by < QW-1> transformed into ( Test-message)

We can change the operation to reverse with:

curl http://localhost:8080/fnf/ rev

If we call the same URL again...

curl http://localhost:8080/request-reply/ test-message

The resulting output contains the original message with the message's content reversed.

< test-message> processed by < QW-1> transformed into ( egassem-tset)

Lastly, We can change the operation to lower-case with:

curl http://localhost:8080/fnf/ lc

If we call the same URL again, this time with the message content all uppercase...

curl http://localhost:8080/request-reply/ TEST-MESSAGE

The resulting output contains the original message text transformed to all lower-case characters.

< test-message> processed by < QW-1> transformed into ( test-message)



Scaling Our Queue Workers

We can quickly scale our Queue Workers by simply deploying additional instances.

java -jar ./target/nats-queue-worker-service-0.0.1-SNAPSHOT.jar --queue.worker.name=QW-2 --server.port=8085

java -jar ./target/nats-queue-worker-service-0.0.1-SNAPSHOT.jar --queue.worker.name=QW-3 --server.port=8086

Here we start two additional instances QW-2 and QW3, configured on separate ports. Once these have started, NATS will send each message from the log to a single consumer in the group. The distribution of these messages is not specified, therefore applications should not rely on an expected delivery scheme. Since we aren't guaranteed a specific message distribution pattern, we need to put the server under load to coax NATS to distribute the message. We can use the xargs command to issue multiple, parallel, curl calls. This approach is usually sufficient to see the workload split across the Queue Workers.

xargs -I % -P 5 curl "http://localhost:8080/request-reply/test-message" < <(printf '%s\n\n' {1..3})

The output should be similar to the following:

Terminal
 <TEST-MESSAGE> processed by <QW-1> transformed into (test-message)
 
 <TEST-MESSAGE> processed by <QW-1> transformed into (test-message)
 
 <TEST-MESSAGE> processed by <QW-2> transformed into (test-message)
 
 <TEST-MESSAGE> processed by <QW-1> transformed into (test-message)
 
 <TEST-MESSAGE> processed by <QW-3> transformed into (test-message)
 
 <TEST-MESSAGE> processed by <QW-2> transformed into (test-message)
 
 <TEST-MESSAGE> processed by <QW-3> transformed into (test-message)
 
 <TEST-MESSAGE> processed by <QW-1> transformed into (test-message)
  
 <test-message> processed by <QW-3> transformed into (test-message)
  
 <TEST-MESSAGE> processed by <QW-2> transformed into (test-message)
 
 <TEST-MESSAGE> processed by <QW-3> transformed into (test-message)
  
 <test-message> processed by <QW-1> transformed into (test-message)
  
 <TEST-MESSAGE> processed by <QW-3> transformed into (test-message)
  
 <TEST-MESSAGE> processed by <QW-1> transformed into (test-message)
 
 <TEST-MESSAGE> processed by <QW-2> transformed into (test-message)

So we can see that building a NATS messaging cluster is pretty painless. When you need an easily scalable messaging platform that is fault-tolerant and highly available, NATS is a great option.


Coming up

In this article, we have focused on the core messaging patterns in NATS. However, we did not discuss message streaming. In the next article, we will explore NATS message streaming with JetStream.

Resources