Kubernetes Client

The Trouble with Rest

Microservice architectures rely on the ability of their component services to communicate with each other andthe client. For many, REST is the de-facto standard for communicating between clients and services.

The Good

With REST, we get a standard, vendor-neutral approach for exposing a service's API over HTTP. Furthermore, by providing a clean separation between the client and the server, we can more easily support different client types.

From a development perspective, REST's broad adoption and support across languages and frameworks makes it an un-controversial choice with development teams.

The Bad

However, the average user's expectations of how web applications should behave have changed considerably since Roy Fielding first conceived REST in 2000. In that world, the Request/Response model was how we interacted with servers. This model is at the very core of HTTP. Fielding built REST on top of the HTTP protocol to leverage HTTP's request methods. Unfortunately, if we limit ourselves to REST, we constrain ourselves to employ only the synchronous request/response interaction pattern.

"if all you have is a hammer, everything looks like a nail."


If we look at every interaction between client and server or between services as a simple synchronous Request/Response pair, we may be falling victim to the same cognitive bias that Maslow describes. The reality is that REST's synchronous request/response model is only one of several common interaction models. If we broaden our perspective, we find can identify three more interaction patterns:

Fire and Forget

The Fire-and-Forget model is applicable when the caller sends a request but does not need a response. We may be tempted to derive this behavior with REST by having the server send an empty response. However, this approach accomplishes little as we are still required by HTTP to send a response, even if it is empty. In this case, we waste resources handling an unecessary (empty) response.

Request/Stream

Another common interaction pattern is the Request/Stream model. In this pattern, we make a request to a service that asynchronously streams its responses as they become available. Here we see a fundamental limitation of REST become more apparent. We can only send one REST response. A common solution to circumvent this limitation is for the caller poll the service periodically. However, this creates an additional load on both the caller and the service in terms of implementation and runtime resource consumption. For each poll interval, the service must process every request, even when no new data is available. Additionally, the caller is starved of any new data until the end of the current poll interval.

Channel

Finally, How do we model a conversation between two services?. The Channel interaction model contains two independent streams (inbound/outbound) through which two parties can communicate. While HTTP is built on top of the bi-directional Transmission Control Protocol, HTTP (and REST by extension) don't support bi-direction communications.

The Ugly

In addition to limiting our interaction models to request/response, we must also acknowledge the performance impact from REST. While we can meet our availability targets by elastically scaling our services horizontally, this alone is a blunt instrument. If we inspect our service containers, we often find that they are running out of memory while the CPU is barely utilized.

This situation is due in large part to the thread-per-request model common when using HTTP. As the name implies, each request is assigned a specific thread to process the request and response. With the creation of a thread, there is a corresponding allocation of memory needed to support it while it is active. If the server receives too many requests and the threads are long-lived, the aggregate memory consumed can quickly exceed the available memory. Thus, we end up with services that spend most of their time idle, blocking on I/O.

What we need is a non-blocking i/o protocol that is more efficient with our services memory heap. A good candidate to solve this is RSocket

Introducing Rsocket

RSocket is a relatively new protocol created by Netflix to support Reactive Streams. It provides a message-driven, binary protocol that supplements the traditional Request/Response interaction pattern with several additional patterns. Because RSocket is reactive, it is able to manage the flow of data across an asynchronous boundary. This approachreduces the need for either side from buffering an arbitrary amount of data.

Protocols

RSocket is a binary protocol used on byte-stream transports (e.g. TCP, WebSocket, and Aeron) . It is composed of frames and messages. Each frame contains a frame header, meta-data, and payload message containing a request, response or protocol processing. There are currently 16 types of frames:

Type Value Description
RESERVED 0x00 Reserved
SETUP 0x01 Setup: Sent by client to initiate protocol processing.
LEASE 0x02 Lease: Sent by Responder to grant the ability to send requests.
KEEPALIVE 0x03 Keepalive: Connection keepalive.
REQUEST_RESPONSE 0x04 Request Response: Request single response.
REQUEST_FNF 0x05 Fire And Forget: A single one-way message.
REQUEST_STREAM 0x06 Request Stream: Request a completable stream.
REQUEST_CHANNEL 0x07 Request Channel: Request a completable stream in both directions.
REQUEST_N 0x08 Request N: Request N more items with Reactive Streams semantics.
CANCEL 0x09 Cancel Request: Cancel outstanding request.
PAYLOAD 0x0A Payload: Payload on a stream. For example, response to a request, or message on a channel.
ERROR 0x0B Error: Error at connection or application level.
METADATA_PUSH 0x0C Metadata: Asynchronous Metadata frame
RESUME 0x0D Resume: Replaces SETUP for Resuming Operation (optional)
RESUME_OK 0x0E Resume OK : Sent in response to a RESUME if resuming operation possible (optional)
EXT 0x01 Extension Header: Used To Extend more frame types as well as extensions.
(For more details refer to RSocket Protocol documentation.)

In this article we will focus on the four interaction models:


Interaction models

Fire-and-Forget

Fire-and-Forget is used when we don't care about a response to our request.

Future<Void> completionSignalOfSend = socketClient.fireAndForget(message);


Frame Contents

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           Stream ID                           |
+-----------+-+-+-+-------------+-------------------------------+
|Frame Type |0|M|F|    Flags    |
+-------------------------------+
                      Metadata & Request Data 



Request/Response (single response)

While this model resembles the traditional request/response interaction, under the covers, the request doesn't block. By avoiding blocking operations, we reduce resource consumption (e.g., thread, memory, etc.) and improve scalability.

Future<Payload> response = socketClient.requestResponse(requestPayload);


Frame Contents

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           Stream ID                           |
+-----------+-+-+-+-------------+-------------------------------+
|Frame Type |0|M|F|     Flags   |
+-------------------------------+
                     Metadata & Request Data



Request/Stream (multi-response)

Like request/response, the response may actually be composed of multiple responses streamed to the requester as they become available.

Publisher<Payload> response = socketClient.requestStream(requestPayload);


Frame Contents

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           Stream ID                           |
+-----------+-+-+-+-------------+-------------------------------+
|Frame Type |0|M|F|    Flags    |
+-------------------------------+-------------------------------+
|0|                    Initial Request N                        |
+---------------------------------------------------------------+
                      Metadata & Request Data 



Channel

The last interaction model is the channel. Channels provide bi-directional streaming between the requester and the responder, effectively allowing either side of the connection to act in both roles.

Publisher<Payload> output = socketClient.requestChannel(Publisher<Payload> input);


Frame Contents

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           Stream ID                           |
+-----------+-+-+-+-+-----------+-------------------------------+
|Frame Type |0|M|F|C|  Flags    |
+-------------------------------+-------------------------------+
|0|                    Initial Request N                        |
+---------------------------------------------------------------+
                       Metadata & Request Data 



Flow Control

RSocket uses Reactive Stream semantics for flow control. The requester grants the responder credit for the number of PAYLOADs it can send. This is often referred to as request(n) where (n) is the number of PAYLOADs the responder may send. With each request(n), the total number of PAYLOADs increase. This mechanism provides application-level feedback of work units completed in contrast to TCP flow control which is focused on byte-level data transmission.

With this type of flow control, we can avoid head of line blocking ,which can overwhelm both network and application buffers. With application-layer request(n) semantics, a consumer can indicate how much data it can accept on a stream and allows the responder to react accordingly


Service to Service with RSocket

Now that we understand the what and the why let's look at the how. In this article, we will be building two services to demonstrate how to implement RSocket communications. The RSocketService will provide an RSocket endpoint and perform some trivial processing. We will then build the RSocketClientService. It will, unsurprisingly, act as our client service, and connect to the RSocketService's RSocket port. Finally, we will add a simple REST endpoint to enable us to trigger the client.

Implementing With Spring RSocket

We will implement both services using the Spring framework. Spring supports a reactive programming model using ProjectReactor and also provides us with the Spring RSocket library. Both services will be packaged as jar files, GraalVM native executables, and container images. A docker-compose file will be provided to run the two containers.

RSocket Service

As mentioned previously, this service provides us with an RSocket endpoint. We will provide routes to demonstrate the four interaction models:

  • Fire and Forget
  • Request/Reply
  • Request/Stream
  • Channel


Dependencies

loading...


Of note, in the dependency list, we find the spring-boot-starter-rsocket that provides most of the RSocket goodness. We also include lombok to enhance our message models. Lastly, we include reactor-test to control our reactive unit-test elements.

Spring AOT plugin

loading...


As we have seen in previous articles, the spring-aot-maven-plugin plugin provides us with GraalVM native executable generation and container packaging.


Configuration

loading...


A single configuration property ( spring.rsocket.server.port) sets the port the controller will listen for connections on.


RSocket Service Application

Our service application provides the minimal Spring boilerplate necessary to boot the application.

loading...


RSocket Service Models

We will be using three message types in the service. The MessageType enumeration denotes the types:

Message Type

loading...


Since each message shares common elements, each will inherit from AbstractMessage. This abstract class leverages a bit of the Lombok goodness to provide inheritable builder support with @SuperBuilder annotation. We get a default value mechanism supplied with @Builder.Default.

Abstract Message

loading...


Request

Here we add a single message variable to carry the request text.

loading...


Response

Here we add the original requestMessage and a corresponding responseMessage.

loading...


Event

An Event differs from a request in that it is neither a request nor a response to a request. In this message type, we include a message string to carry the event's text data.

loading...


Connection Log Entry

When an RSocket connection state changes, we create a new ConnectionLogEntry instance. We store the connection type, and the transition state.

loading...


RSocket Controller

The controller is the heart of the service. Here we define our RSocket routes and map them to our handler methods with the @MessageMapping annotation. Also included in this class is the registerRequester method, which logs our client connections state transitions.

loading...


Testing the RSocket Service

Testing our RSocket code is simplified with the inclusion of the io.projectreactor/reactor-test dependency. This dependency provides us with the StepVerifier interface in our unit tests. StepVerfier gives us a declarative way of creating scripts for async Publisher sequences.

loading...


In our unit tests, we call the static StepVerifier create method and then declare our assertion in the corresponding sequence.


Build the Jar

Before we can run the service, we must first build, test, and package it. We can do this all with the following command:

mvn clean package



Run the Jar

If everything compiled, and all the tests passed, your target directory will contain a jar file with the serviceand all its dependencies included. We can run it like any Java jar file:

java -jar ./target/rsocket-service-0.0.1-SNAPSHOT.jar


The console output should appear similar to the following:

Terminal
  ___  ___          _       _   ___              _        
 | _ \/ __| ___  __| |_____| |_/ __| ___ _ ___ _(_)__ ___ 
 |   /\__ \/ _ \/ _| / / -_)  _\__ \/ -_) '_\ V / / _/ -_)
 |_|_\|___/\___/\__|_\_\___|\__|___/\___|_|  \_/|_\__\___|
                                                          


2021-05-11 11:12:35.892  INFO 13187 --- [           main] c.t.rsocket.RsocketServiceApplication    : Starting RsocketServiceApplication v0.0.1-SNAPSHOT using Java 11.0.11 on ubuntu-vm with PID 13187 (/home/workspace/rsocket-service/target/rsocket-service-0.0.1-SNAPSHOT.jar started by workspace in /home//workspace/rsocket-service)
2021-05-11 11:12:35.899  INFO 13187 --- [           main] c.t.rsocket.RsocketServiceApplication    : No active profile set, falling back to default profiles: default
2021-05-11 11:12:38.429  INFO 13187 --- [           main] o.s.b.rsocket.netty.NettyRSocketServer   : Netty RSocket started on port(s): 7000
2021-05-11 11:12:38.460  INFO 13187 --- [           main] c.t.rsocket.RsocketServiceApplication    : Started RsocketServiceApplication in 3.479 seconds (JVM running for 4.495)
2021-05-11 11:12:38.462  INFO 13187 --- [           main] o.s.b.a.ApplicationAvailabilityBean      : Application availability state LivenessState changed to CORRECT
2021-05-11 11:12:38.465  INFO 13187 --- [           main] o.s.b.a.ApplicationAvailabilityBean      : Application availability state ReadinessState changed to ACCEPTING_TRAFFIC




Build the GraalVM Native Image

Since Spring supports the generation of GraalVM native executables packaged in a container, let's build that too! Note: the GraalVM native image generation takes considerably longer than the basic build. It also requires more computing resources (CPU and Memory) so be prepared to wait.

mvn spring-boot:build-image


Run the container

Now we can run the GraalVM native executable service in its own container.

docker run --rm -p 7000:7000 rsocket-service:0.0.1-SNAPSHOT


Invoking the service with RSC

Now that we have a working RSocket service, it would be great if we could invoke it using curl. Unfortunately, curl doesn't work with RSockets. Fortunately, there is an open-source RSocket CLI tool called RSC that gives us similar powers.

The RSC project provides a feature-rich CLI for RSockets. The tool is written in Java and can be downloaded as a JAR file or installed using Homebrew, Scoop or Coursier.

Once installed, we can test it with:

rsc --version


If it was installed correctly, you should see output similar to this:

"version": "0.9.1", "build": "2021-06-28T02:57:04Z", "rsocket-java": "1.1.1"}


We can get a listing of the CLI flags using:

rsc --help


Terminal
usage: rsc [options] uri

Non-option arguments:
[String: uri]        

Option                                Description                            
------                                -----------                            
--ab, --authBearer [String]           Enable Authentication Metadata         
                                        Extension (Bearer).                  
--authBasic [String]                  [DEPRECATED] Enable Authentication     
                                        Metadata Extension (Basic). This     
                                        Metadata exists only for the         
                                        backward compatibility with Spring   
                                        Security 5.2                         
--channel                             Shortcut of --im REQUEST_CHANNEL       
--completion [ShellType]              Output shell completion code for the   
                                        specified shell (bash, zsh, fish,    
                                        powershell)                          
-d, --data [String]                   Data. Use '-' to read data from        
                                        standard input. (default: )          
--dataMimeType, --dmt [String]        MimeType for data (default:            
                                        application/json)                    
--debug                               Enable FrameLogger                     
--delayElements [Long]                Enable delayElements(delay) in milli   
                                        seconds                              
--dumpOpts                            Dump options as a file that can be     
                                        loaded by --optsFile option          
--fnf                                 Shortcut of --im FIRE_AND_FORGET       
-h, --help [String]                   Print help                             
--im, --interactionModel              InteractionModel (default:             
  [InteractionModel]                    REQUEST_RESPONSE)                    
-l, --load [String]                   Load a file as Data. (e.g. ./foo.txt,  
                                        /tmp/foo.txt, https://example.com)   
--limitRate [Integer]                 Enable limitRate(rate)                 
--log [String]                        Enable log()                           
-m, --metadata [String]               Metadata (default: )                   
--metadataMimeType, --mmt [String]    MimeType for metadata (default:        
                                        application/json)                    
--optsFile [String]                   Configure options from a YAML file (e. 
                                        g. ./opts.yaml, /tmp/opts.yaml,      
                                        https://example.com/opts.yaml)       
--printB3                             Print B3 propagation info. Ignored     
                                        unless --trace is set.               
-q, --quiet                           Disable the output on next             
-r, --route [String]                  Enable Routing Metadata Extension      
--request                             Shortcut of --im REQUEST_RESPONSE      
--resume [Integer]                    Enable resume. Resume session duration 
                                        can be configured in seconds.        
--retry [Integer]                     Enable retry. Retry every 1 second     
                                        with the given max attempts.         
--sd, --setupData [String]            Data for Setup payload                 
--setupMetadata, --sm [String]        Metadata for Setup payload             
--setupMetadataMimeType, --smmt       Metadata MimeType for Setup payload    
  [String]                              (default: application/json)          
--showSystemProperties                Show SystemProperties for troubleshoot 
--stacktrace                          Show Stacktrace when an exception      
                                        happens                              
--stream                              Shortcut of --im REQUEST_STREAM        
--take [Integer]                      Enable take(n)                         
--trace [TracingMetadataCodec$Flags]  Enable Tracing (Zipkin) Metadata       
                                        Extension. Unless sampling state     
                                        (UNDECIDED, NOT_SAMPLE, SAMPLE,      
                                        DEBUG) is specified, DEBUG is used   
                                        if no state is specified.            
--trustCert [String]                  PEM file for a trusted certificate. (e.
                                        g. ./foo.crt, /tmp/foo.crt, https:   
                                        //example.com/foo.crt)               
-u, --as, --authSimple [String]       Enable Authentication Metadata         
                                        Extension (Simple). The format must  
                                        be 'username:password'.              
-v, --version                         Print version                          
-w, --wiretap                         Enable wiretap                         
--wsHeader, --wsh [String]            Header for web socket connection       
--zipkinUrl [String]                  Zipkin URL to send a span (e.g. http:  
                                        //localhost:9411). Ignored unless -- 
                                        trace is set. 



Fire and Forget

Now that we have verified RSC is working correctly, let's test our Fire and Forget route. We can do that with the following command:

rsc --debug --fnf --data='{"source":"rsc","message":"this is a request"}' --route fire-and-forget tcp://localhost:7000


flags

  • --debug enable the framelogger output.
  • --fnf shortcut of --im FIRE_AND_FORGET.
  • --data provides the data payload.
  • --route enable Routing Metadata Extension.


Your output should be similar to this:

Terminal
2021-05-11 17:18:57.212 DEBUG 26084 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 75
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 10 0f 66 69 72 65 2d 61 6e 64 2d 66 6f |.....fire-and-fo|
|00000010| 72 67 65 74                                     |rget            |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 72 73 63 22 2c |{"source":"rsc",|
|00000010| 22 6d 65 73 73 61 67 65 22 3a 22 74 68 69 73 20 |"message":"this |
|00000020| 69 73 20 61 20 72 65 71 75 65 73 74 22 7d       |is a request"}  |
+--------+-------------------------------------------------+----------------+
 



Here we can see the two frames that make up our Fire and Forget message.


Request and Response

The next interaction model we will test is Request/Response:

rsc --debug --request --data='{"source":"rsc","message":"hello"}' --route request-response tcp://localhost:7000


flags

  • -debug enable the framelogger output.
  • -request shortcut of --im REQUEST_RESPONSE .
  • -data provides the data payload.
  • --route enable Routing Metadata Extension.
Your output should be similar to this:

Terminal
2021-05-11 17:26:25.699 DEBUG 26722 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 64
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 11 10 72 65 71 75 65 73 74 2d 72 65 73 |.....request-res|
|00000010| 70 6f 6e 73 65                                  |ponse           |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 72 73 63 22 2c |{"source":"rsc",|
|00000010| 22 6d 65 73 73 61 67 65 22 3a 22 68 65 6c 6c 6f |"message":"hello|
|00000020| 22 7d                                           |"}              |
+--------+-------------------------------------------------+----------------+
2021-05-11 17:26:25.802 DEBUG 26722 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 260
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK|
|00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest|
|00000020| 69 6e 61 74 69 6f 6e 22 3a 22 72 73 63 22 2c 22 |ination":"rsc","|
|00000030| 75 75 69 64 22 3a 22 31 37 32 63 37 61 30 64 2d |uuid":"172c7a0d-|
|00000040| 33 31 65 36 2d 34 34 33 32 2d 62 66 35 65 2d 61 |31e6-4432-bf5e-a|
|00000050| 66 30 64 37 35 32 32 65 34 30 32 22 2c 22 63 72 |f0d7522e402","cr|
|00000060| 65 61 74 65 64 41 74 22 3a 31 36 32 38 37 31 37 |eatedAt":1620763|
|00000070| 31 38 35 37 32 37 2c 22 74 79 70 65 22 3a 22 52 |473121,"type":"R|
|00000080| 45 53 50 4f 4e 53 45 22 2c 22 72 65 71 75 65 73 |ESPONSE","reques|
|00000090| 74 55 75 69 64 22 3a 22 37 39 63 31 37 31 61 62 |tUuid":"79c171ab|
|000000a0| 2d 30 34 66 31 2d 34 30 39 30 2d 62 35 63 39 2d |-04f1-4090-b5c9-|
|000000b0| 35 39 33 32 39 33 39 35 35 65 30 63 22 2c 22 72 |593293955e0c","r|
|000000c0| 65 71 75 65 73 74 4d 65 73 73 61 67 65 22 3a 22 |equestMessage":"|
|000000d0| 68 65 6c 6c 6f 22 2c 22 72 65 73 70 6f 6e 73 65 |hello","response|
|000000e0| 4d 65 73 73 61 67 65 22 3a 22 52 45 43 45 49 56 |Message":"RECEIV|
|000000f0| 45 44 20 3a 20 27 68 65 6c 6c 6f 27 22 7d       |ED : 'hello'"}  |
+--------+-------------------------------------------------+----------------+
{"source":"RSOCKET Server","destination":"rsc","uuid":"172c7a0d-31e6-4432-bf5e-af0d7522e402","createdAt":1620763473121,"type":"RESPONSE","requestUuid":"79c171ab-04f1-4090-b5c9-593293955e0c","requestMessage":"hello","responseMessage":"RECEIVED : 'hello'"}      



Here we see three frames. The first two setup our request payload, and the third contains the response payload.


Request Stream

To exercise the Request/Stream interaction model, we use the following command:

rsc --debug --stream --data='{"source":"rsc","message" : "Get Events"}' --route request-stream tcp://localhost:7000


flags

  • --debug enable the framelogger output.
  • --stream shortcut of --im REQUEST_STREAM.
  • --data provides the data payload.
  • --route enable Routing Metadata Extension.
Your output should be similar to this:

Terminal
2021-05-11 17:27:17.079 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:

2021-05-11 17:27:17.162 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 73 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 0f 0e 72 65 71 75 65 73 74 2d 73 74 72 |.....request-str|
|00000010| 65 61 6d                                        |eam             |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 72 73 63 22 2c |{"source":"rsc",|
|00000010| 22 6d 65 73 73 61 67 65 22 20 3a 20 22 47 65 74 |"message" : "Get|
|00000020| 20 45 76 65 6e 74 73 22 7d                      | Events"}       |
+--------+-------------------------------------------------+----------------+
2021-08-11 17:27:17.211 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 223
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK|
|00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest|
|00000020| 69 6e 61 74 69 6f 6e 22 3a 22 72 73 63 22 2c 22 |ination":"rsc","|
|00000030| 75 75 69 64 22 3a 22 36 30 62 61 66 30 35 65 2d |uuid":"60baf05e-|
|00000040| 30 32 61 62 2d 34 31 37 34 2d 61 63 62 34 2d 35 |02ab-4174-acb4-5|
|00000050| 61 36 30 62 64 61 35 63 38 30 32 22 2c 22 63 72 |a60bda5c802","cr|
|00000060| 65 61 74 65 64 41 74 22 3a 31 36 32 38 37 31 37 |eatedAt":1620763|
|00000070| 32 33 37 31 39 32 2c 22 74 79 70 65 22 3a 22 45 |483121,"type":"E|
|00000080| 56 45 4e 54 22 2c 22 6d 65 73 73 61 67 65 22 3a |VENT","message":|
|00000090| 22 43 6f 6e 6e 65 63 74 69 6f 6e 4c 6f 67 45 6e |"ConnectionLogEn|
|000000a0| 74 72 79 28 74 69 6d 65 73 74 61 6d 70 3d 30 2c |try(timestamp=0,|
|000000b0| 20 74 79 70 65 3d 46 49 52 45 5f 41 4e 44 5f 46 | type=FIRE_AND_F|
|000000c0| 4f 52 47 45 54 2c 20 73 74 61 74 65 3d 43 4f 4e |ORGET, state=CON|
|000000d0| 4e 45 43 54 45 44 29 22 7d                      |NECTED)"}       |
+--------+-------------------------------------------------+----------------+
{"source":"RSOCKET Server","destination":"rsc","uuid":"60baf05e-02ab-4174-acb4-5a60bda5c802","createdAt":1620763483121,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=FIRE_AND_FORGET, state=CONNECTED)"}
2021-05-11 17:27:17.217 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 226
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK|
|00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest|
|00000020| 69 6e 61 74 69 6f 6e 22 3a 22 72 73 63 22 2c 22 |ination":"rsc","|
|00000030| 75 75 69 64 22 3a 22 61 37 35 62 37 64 37 39 2d |uuid":"a75b7d79-|
|00000040| 33 33 62 66 2d 34 36 61 38 2d 39 34 36 38 2d 66 |33bf-46a8-9468-f|
|00000050| 30 35 30 63 37 31 31 65 62 63 32 22 2c 22 63 72 |050c711ebc2","cr|
|00000060| 65 61 74 65 64 41 74 22 3a 31 36 32 38 37 31 37 |eatedAt":1620763|
|00000070| 32 33 37 31 39 32 2c 22 74 79 70 65 22 3a 22 45 |489621,"type":"E|
|00000080| 56 45 4e 54 22 2c 22 6d 65 73 73 61 67 65 22 3a |VENT","message":|
|00000090| 22 43 6f 6e 6e 65 63 74 69 6f 6e 4c 6f 67 45 6e |"ConnectionLogEn|
|000000a0| 74 72 79 28 74 69 6d 65 73 74 61 6d 70 3d 30 2c |try(timestamp=0,|
|000000b0| 20 74 79 70 65 3d 46 49 52 45 5f 41 4e 44 5f 46 | type=FIRE_AND_F|
|000000c0| 4f 52 47 45 54 2c 20 73 74 61 74 65 3d 44 49 53 |ORGET, state=DIS|
|000000d0| 43 4f 4e 4e 45 43 54 45 44 29 22 7d             |CONNECTED)"}    |
+--------+-------------------------------------------------+----------------+
{"source":"RSOCKET Server","destination":"rsc","uuid":"a75b7d79-33bf-46a8-9468-f050c711ebc2","createdAt":1620763489621,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=FIRE_AND_FORGET, state=DISCONNECTED)"}
2021-05-11 17:27:17.220 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 224
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK|
|00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest|
|00000020| 69 6e 61 74 69 6f 6e 22 3a 22 72 73 63 22 2c 22 |ination":"rsc","|
|00000030| 75 75 69 64 22 3a 22 31 64 36 35 62 31 38 35 2d |uuid":"1d65b185-|
|00000040| 66 39 65 66 2d 34 33 38 65 2d 61 61 39 64 2d 36 |f9ef-438e-aa9d-6|
|00000050| 32 30 34 39 66 38 61 63 30 39 31 22 2c 22 63 72 |2049f8ac091","cr|
|00000060| 65 61 74 65 64 41 74 22 3a 31 36 32 38 37 31 37 |eatedAt":1620763|
|00000070| 32 33 37 31 39 32 2c 22 74 79 70 65 22 3a 22 45 |491211,"type":"E|
|00000080| 56 45 4e 54 22 2c 22 6d 65 73 73 61 67 65 22 3a |VENT","message":|
|00000090| 22 43 6f 6e 6e 65 63 74 69 6f 6e 4c 6f 67 45 6e |"ConnectionLogEn|
|000000a0| 74 72 79 28 74 69 6d 65 73 74 61 6d 70 3d 30 2c |try(timestamp=0,|
|000000b0| 20 74 79 70 65 3d 52 45 51 55 45 53 54 5f 52 45 | type=REQUEST_RE|
|000000c0| 53 50 4f 4e 53 45 2c 20 73 74 61 74 65 3d 43 4f |SPONSE, state=CO|
|000000d0| 4e 4e 45 43 54 45 44 29 22 7d                   |NNECTED)"}      |
+--------+-------------------------------------------------+----------------+
{"source":"RSOCKET Server","destination":"rsc","uuid":"1d65b185-f9ef-438e-aa9d-62049f8ac091","createdAt":1620763491211,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=REQUEST_RESPONSE, state=CONNECTED)"}
2021-05-11 17:27:17.223 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 227
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK|
|00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest|
|00000020| 69 6e 61 74 69 6f 6e 22 3a 22 72 73 63 22 2c 22 |ination":"rsc","|
|00000030| 75 75 69 64 22 3a 22 39 38 33 34 31 31 38 38 2d |uuid":"98341188-|
|00000040| 31 63 36 64 2d 34 32 30 39 2d 62 62 63 36 2d 62 |1c6d-4209-bbc6-b|
|00000050| 32 35 38 37 38 63 37 65 32 63 62 22 2c 22 63 72 |25878c7e2cb","cr|
|00000060| 65 61 74 65 64 41 74 22 3a 31 36 32 38 37 31 37 |eatedAt":1620763|
|00000070| 32 33 37 31 39 32 2c 22 74 79 70 65 22 3a 22 45 |511024,"type":"E|
|00000080| 56 45 4e 54 22 2c 22 6d 65 73 73 61 67 65 22 3a |VENT","message":|
|00000090| 22 43 6f 6e 6e 65 63 74 69 6f 6e 4c 6f 67 45 6e |"ConnectionLogEn|
|000000a0| 74 72 79 28 74 69 6d 65 73 74 61 6d 70 3d 30 2c |try(timestamp=0,|
|000000b0| 20 74 79 70 65 3d 52 45 51 55 45 53 54 5f 52 45 | type=REQUEST_RE|
|000000c0| 53 50 4f 4e 53 45 2c 20 73 74 61 74 65 3d 44 49 |SPONSE, state=DI|
|000000d0| 53 43 4f 4e 4e 45 43 54 45 44 29 22 7d          |SCONNECTED)"}   |
+--------+-------------------------------------------------+----------------+
{"source":"RSOCKET Server","destination":"rsc","uuid":"98341188-1c6d-4209-bbc6-b25878c7e2cb","createdAt":1620763511024,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=REQUEST_RESPONSE, state=DISCONNECTED)"}
2021-05-11 17:27:17.227 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 222
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK|
|00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest|
|00000020| 69 6e 61 74 69 6f 6e 22 3a 22 72 73 63 22 2c 22 |ination":"rsc","|
|00000030| 75 75 69 64 22 3a 22 38 36 32 35 33 66 62 65 2d |uuid":"86253fbe-|
|00000040| 30 61 36 65 2d 34 64 30 32 2d 62 31 35 63 2d 65 |0a6e-4d02-b15c-e|
|00000050| 34 65 31 61 62 31 61 37 35 64 62 22 2c 22 63 72 |4e1ab1a75db","cr|
|00000060| 65 61 74 65 64 41 74 22 3a 31 36 32 38 37 31 37 |eatedAt":1620763|
|00000070| 32 33 37 31 39 32 2c 22 74 79 70 65 22 3a 22 45 |512310,"type":"E|
|00000080| 56 45 4e 54 22 2c 22 6d 65 73 73 61 67 65 22 3a |VENT","message":|
|00000090| 22 43 6f 6e 6e 65 63 74 69 6f 6e 4c 6f 67 45 6e |"ConnectionLogEn|
|000000a0| 74 72 79 28 74 69 6d 65 73 74 61 6d 70 3d 30 2c |try(timestamp=0,|
|000000b0| 20 74 79 70 65 3d 52 45 51 55 45 53 54 5f 53 54 | type=REQUEST_ST|
|000000c0| 52 45 41 4d 2c 20 73 74 61 74 65 3d 43 4f 4e 4e |REAM, state=CONN|
|000000d0| 45 43 54 45 44 29 22 7d                         |ECTED)"}        |
+--------+-------------------------------------------------+----------------+
{"source":"RSOCKET Server","destination":"rsc","uuid":"86253fbe-0a6e-4d02-b15c-e4e1ab1a75db","createdAt":1620763512310,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=REQUEST_STREAM, state=CONNECTED)"}
2021-05-11 17:27:17.228 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: COMPLETE Flags: 0b1000000 Length: 6
Data:     



The result of the command is seven frames. The first two handle setup and request payload, while the last five frames contain payloads streamed from the service.


Channel

Channel is the last of our interaction models. We initiate the channel with the following command:

rsc --debug --channel --data='{"source":"rsc","message":"Request #1"}' --route channel tcp://localhost:7000


flags

  • --debug enable the framelogger output.
  • --channel shortcut of --im REQUEST_CHANNEL .
  • --data provides the data payload.
  • --route enable Routing Metadata Extension.
Your output should be similar to this:

Terminal
2021-05-11 17:29:20.892 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:

2021-05-11 17:29:20.989 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: REQUEST_CHANNEL Flags: 0b100000000 Length: 64 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 08 07 63 68 61 6e 6e 65 6c             |.....channel    |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 72 73 63 22 2c |{"source":"rsc",|
|00000010| 22 6d 65 73 73 61 67 65 22 3a 22 52 65 71 75 65 |"message":"Reque|
|00000020| 73 74 20 23 31 22 7d                            |st #1"}         |
+--------+-------------------------------------------------+----------------+
2021-05-11 17:29:20.990 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: COMPLETE Flags: 0b1000000 Length: 6
Data:

2021-05-11 17:29:21.069 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 150
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 73 6f 75 72 63 65 22 3a 6e 75 6c 6c 2c 22 |{"source":null,"|
|00000010| 64 65 73 74 69 6e 61 74 69 6f 6e 22 3a 6e 75 6c |destination":nul|
|00000020| 6c 2c 22 75 75 69 64 22 3a 22 30 65 64 62 37 30 |l,"uuid":"0edb70|
|00000030| 34 32 2d 61 32 66 30 2d 34 32 61 36 2d 38 38 32 |42-a2f0-42a6-882|
|00000040| 66 2d 62 31 35 34 62 65 64 61 61 64 33 39 22 2c |f-b154bedaad39",|
|00000050| 22 63 72 65 61 74 65 64 41 74 22 3a 31 36 32 38 |"createdAt":1628|
|00000060| 37 31 37 33 36 31 30 33 37 2c 22 74 79 70 65 22 |717361037,"type"|
|00000070| 3a 22 45 56 45 4e 54 22 2c 22 6d 65 73 73 61 67 |:"EVENT","messag|
|00000080| 65 22 3a 22 52 65 71 75 65 73 74 20 23 31 22 7d |e":"Request #1"}|
+--------+-------------------------------------------------+----------------+
{"source":null,"destination":null,"uuid":"0edb7042-a2f0-42a6-882f-b154bedaad39","createdAt":1628717361037,"type":"EVENT","message":"Request #1"}
2021-05-11 17:29:21.076 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: REQUEST_N Flags: 0b0 Length: 10 RequestN: 9223372036854775807
Data:

2021-05-11 17:29:40.978 DEBUG 27033 --- [     parallel-1] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b10000000 Length: 14
Data:

2021-08-11 17:29:40.982 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b0 Length: 14
Data:





Here we see the first two frames setup our outgoing stream and its first payload. The next frame contains our first response from the server. We then receive three more message frames. The first is a REQUEST_N indicating the service is asking for more work. The last two are KEEPALIVE frames.

We can test the service's outbound stream by calling the Fire and Forget again. The service receives this message and will notify any outbound streams on its arrival.

Terminal
2021-05-11 17:31:57.630 DEBUG 27446 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:

2021-05-11 17:31:57.697 DEBUG 27446 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 75
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 10 0f 66 69 72 65 2d 61 6e 64 2d 66 6f |.....fire-and-fo|
|00000010| 72 67 65 74                                     |rget            |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 72 73 63 22 2c |{"source":"rsc",|
|00000010| 22 6d 65 73 73 61 67 65 22 3a 22 74 68 69 73 20 |"message":"this |
|00000020| 69 73 20 61 20 72 65 71 75 65 73 74 22 7d       |is a request"}  |
+--------+-------------------------------------------------+----------------+
2021-05-11 17:31:57.737 DEBUG 27446 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: COMPLETE Flags: 0b1000000 Length: 6
Data:
    



The channel connection will then receive the new event, and it will be echoed in the console.

Terminal
2021-05-11 17:31:41.014 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b0 Length: 14
Data:

2021-05-11 17:31:57.727 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 195
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK|
|00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest|
|00000020| 69 6e 61 74 69 6f 6e 22 3a 22 42 52 4f 41 44 43 |ination":"BROADC|
|00000030| 41 53 54 22 2c 22 75 75 69 64 22 3a 22 34 63 66 |AST","uuid":"4cf|
|00000040| 64 30 66 66 38 2d 64 36 62 39 2d 34 66 36 32 2d |d0ff8-d6b9-4f62-|
|00000050| 62 33 66 65 2d 32 31 37 33 32 39 33 63 38 37 32 |b3fe-2173293c872|
|00000060| 65 22 2c 22 63 72 65 61 74 65 64 41 74 22 3a 31 |e","createdAt":1|
|00000070| 36 32 38 37 31 37 35 31 37 37 32 32 2c 22 74 79 |628717517722,"ty|
|00000080| 70 65 22 3a 22 45 56 45 4e 54 22 2c 22 6d 65 73 |pe":"EVENT","mes|
|00000090| 73 61 67 65 22 3a 22 46 49 52 45 5f 41 4e 44 5f |sage":"FIRE_AND_|
|000000a0| 46 4f 52 47 45 54 3a 20 27 74 68 69 73 20 69 73 |FORGET: 'this is|
|000000b0| 20 61 20 72 65 71 75 65 73 74 27 22 7d          | a request'"}   |
+--------+-------------------------------------------------+----------------+
{"source":"RSOCKET Server","destination":"BROADCAST","uuid":"4cfd0ff8-d6b9-4f62-b3fe-2173293c872e","createdAt":1628717517722,"type":"EVENT","message":"FIRE_AND_FORGET: 'this is a request'"}
2021-05-11 17:32:00.964 DEBUG 27033 --- [     parallel-1] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b10000000 Length: 14
Data:

2021-05-11 17:32:00.966 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b0 Length: 14
Data:
     




Service to Service RSocket communications

We have seen how we can communicate with the RSocketService using RSC. However, the real reason for this article is to demonstrate inter-service communication using RSocket. To this end, we will build a second service that will expose a conventional REST endpoint to trigger RSocket messaging to the RSocketService.


RSocket Client Service

The RSocketClientService is a simple Spring Boot application that exposes a set of REST endpoint methods. Each of these methods parallels a corresponding RSocket route. When the client REST method is invoked, it uses a TCP RSocket to communicate with the RSocketService.


Configuration

loading...


The client configuration is only slightly more complicated than the server. Here we include the REST port ( server.port) and the RSocketService host and port ( rsocket.server.host and rsocket.server.port, respectively).

Code

The most interesting aspect of the RSocketClientService is its controller. Message classes are duplicates of those found in the RSocketService and will not be included here.

Controller



loading...


In our controller's constructor, we @Autowire the RSocketRequester.builder. We will use this builder to create a TCP RSocket to the RSocketService's host and port. We then create REST mappings for each of the interaction mode methods. Within each interaction model method, we use the corresponding message builder to construct our outgoing payloads. We then invoke the configured RSocketRequester to transmit our message to the desired route.


Build the Jar

We can build the RSocketClientService using the same maven command we used before:

mvn clean package


Run the Jar

If everything compiled, we can run our new jar file with the following command:

java -jar ./target/rsocket-client-service-0.0.1-SNAPSHOT.jar


The console output should appear similar to the following:

Terminal
  ___  ___          _       _    ___ _ _         _   
 | _ \/ __| ___  __| |_____| |_ / __| (_)___ _ _| |_ 
 |   /\__ \/ _ \/ _| / / -_)  _| (__| | / -_) ' \  _|
 |_|_\|___/\___/\__|_\_\___|\__|\___|_|_\___|_||_\__|
 / __| ___ _ ___ _(_)__ ___                          
 \__ \/ -_) '_\ V / / _/ -_)                    
 |___/\___|_|  \_/|_\__\___|                         
                                                     

2021-05-11 11:10:02.906  INFO 12792 --- [           main] c.t.r.client.RSocketClientApplication    : Starting RSocketClientApplication v0.0.1-SNAPSHOT using Java 11.0.11 on ubuntu-vm with PID 12792 (/home/workspace/rsocket-client-service/target/rsocket-client-service-0.0.1-SNAPSHOT.jar started by workspace in /home/workspace/rsocket-client-service)
2021-05-11 11:10:02.914  INFO 12792 --- [           main] c.t.r.client.RSocketClientApplication    : No active profile set, falling back to default profiles: default
2021-05-11 11:10:05.312  INFO 12792 --- [           main] c.t.r.c.c.RSocketClientController        : remote rsocket.server.host:localhost=7000
2021-05-11 11:10:06.303  INFO 12792 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2021-05-11 11:10:06.334  INFO 12792 --- [           main] c.t.r.client.RSocketClientApplication    : Started RSocketClientApplication in 4.506 seconds (JVM running for 5.488)



Build the GraalVM Native Image

If you wish to build the GraalVM native image, we can use the same command as before:

mvn spring-boot:build-image


Run the container

If you wish to run the containerized version, the command is:

docker run --rm -p 8080:8080 rsocket-client-service:0.0.1-SNAPSHOT


Note: while you can run both services independently, you will need to handle the container networking manually. A better approach is to use Docker-Compose discussed here.


RSocket client service landing page

After starting both services, open up a browser and navigate to http://localhost:8080. You should be presented with the RSocket Client Service landing page.

RSocketClientService landing page.

You may run these methods from the browser or using curl. We will be using curl to call the REST methods for the remainder of the article.


Fire and forget

This call will cause the RSocketService to generate an outbound event for all active channel connections.

curl http://localhost:8080/fire-and-forget


The response will be:

Fire-and-Forget completed!


Request/Response

This call will send a request to the RSocketService's request-response route and return the original requestMessage and a responseMessage.

curl http://localhost:8080/request-response


The output should be similar to this:

{"source":"RSOCKET Server","destination":"RSocket-Client34d79a4c-d6b6-434d-ba31-292c413f69a8","uuid":"bb481c70-24c0-461f-a304-7b705a27af15","createdAt":1620766312011,"type":"RESPONSE","requestUuid":"cbe08f1e-2388-452e-aa50-a474878a0c55","requestMessage":"This is a request!","responseMessage":"RECEIVED : 'This is a request!'"}


Here we see the original requestMessage text was This is a request! and the responseMessage wraps the message with "RECEIVED:' '.


Request/stream

To invoke the request-stream route, we call the following url:

curl http://localhost:8080/request-stream


Your output should be similar to this:

Terminal
data:{"source":"RSOCKET Server","destination":"RSocket-Client60469425-0c92-4b61-b608-fac34b3538c6","uuid":"5d411a92-ac65-4bdc-b0e2-e0ed9cd8758e","createdAt":1620766332745,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=FIRE_AND_FORGET, state=CONNECTED)"}

data:{"source":"RSOCKET Server","destination":"RSocket-Client60469425-0c92-4b61-b608-fac34b3538c6","uuid":"701bef65-de74-4f4d-ae41-f228355acb00","createdAt":1620766332933,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=REQUEST_RESPONSE, state=CONNECTED)"}

data:{"source":"RSOCKET Server","destination":"RSocket-Client60469425-0c92-4b61-b608-fac34b3538c6","uuid":"25e8157e-1728-4f89-a276-74e3d7581894","createdAt":1620766333121,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=REQUEST_STREAM, state=CONNECTED)"}



We get back a collection of type ConnectionLogEntry, representing each connection state transition since the service started.




Channel

With the following URL, we create a channel between the RSocketClientService and the RSocketService. When the connection is created, the RSocketService registers the outbound stream and will broadcast Fire-andForget events to it.

curl http://localhost:8080/channel


The service will respond with the following message:

data:{"source":null,"destination":null,"uuid":"bbd9af38-5c69-45c4-8095-36997c01e813","createdAt":1620766473121,"type":"EVENT","message":"Request #1"}


Note that curl doesn't exit since we still have a live connection. In a separate terminal window, issue another Fire-And-Forget call:

curl http://localhost:8080/fire-and-forget


In the original channel terminal output we will see a new event has arrived:

data:{"source":"RSOCKET Server","destination":"BROADCAST","uuid":"b3b4f3ea-b58d-4b8a-9f7e-ee01e8c39147","createdAt":1620766479786,"type":"EVENT","message":"FIRE_AND_FORGET: 'FIRE!!'"}


This connection will continue until you terminate curl or stop the RSocketClientService.


Running From Docker-compose

While we can run the individual services from their jar files, the preferred method is to run them in their native executable format, packaged within containers. If we run them individually using docker run both containers will start but be unable to communicate with each other. If you want to run the containerized versions, there is a Docker-Compose file to make things simple.

loading...


Here we declare our two services. We declare the rsocket network to allow the client service to talk directly to the server. We also set the rsocket.server.host and rsocket.server.port environment variables to override the default values.

To run the Docker-Compose file, we execute the following command:

docker-compose up -d


This will spin up the services in detached mode which runs the container processes in the background.

To shut it down we execute:

docker-compose down



Conclusions

We have covered a lot of ground in this article. Hopefully, you now have a better understanding of what RSocket is and how it can be used to supplement your REST services and enrich your service-to-service communication. While this article is quite long, it should only be considered an introduction to RSocket. For more information, please refer to the RSocket.io site.

Coming Up

With RSocket, we have introduced a new, reactive messaging communication protocol. Our next article, we will introduce NATS, a cloud-native messaging platform that can provide a central nervous system to your microservice architectures.

Resources