The Trouble with Rest
Microservice architectures rely on the ability of their component services to communicate with each other andthe client. For many, REST is the de-facto standard for communicating between clients and services.With REST, we get a standard, vendor-neutral approach for exposing a service's API over HTTP. Furthermore, by providing a clean separation between the client and the server, we can more easily support different client types.
From a development perspective, REST's broad adoption and support across languages and frameworks makes it an un-controversial choice with development teams.
However, the average user's expectations of how web applications should behave have changed considerably since Roy Fielding first conceived REST in 2000. In that world, the Request/Response model was how we interacted with servers. This model is at the very core of HTTP. Fielding built REST on top of the HTTP protocol to leverage HTTP's request methods. Unfortunately, if we limit ourselves to REST, we constrain ourselves to employ only the synchronous request/response interaction pattern.
If we look at every interaction between client and server or between services as a simple synchronous Request/Response pair, we may be falling victim to the same cognitive bias that Maslow describes. The reality is that REST's synchronous request/response model is only one of several common interaction models. If we broaden our perspective, we find can identify three more interaction patterns:
Fire and Forget
The Fire-and-Forget model is applicable when the caller sends a request but does not need a response. We may be tempted to derive this behavior with REST by having the server send an empty response. However, this approach accomplishes little as we are still required by HTTP to send a response, even if it is empty. In this case, we waste resources handling an unecessary (empty) response.Request/Stream
Another common interaction pattern is the Request/Stream model. In this pattern, we make a request to a service that asynchronously streams its responses as they become available. Here we see a fundamental limitation of REST become more apparent. We can only send one REST response. A common solution to circumvent this limitation is for the caller poll the service periodically. However, this creates an additional load on both the caller and the service in terms of implementation and runtime resource consumption. For each poll interval, the service must process every request, even when no new data is available. Additionally, the caller is starved of any new data until the end of the current poll interval.Channel
Finally, How do we model a conversation between two services?. The Channel interaction model contains two independent streams (inbound/outbound) through which two parties can communicate. While HTTP is built on top of the bi-directional Transmission Control Protocol, HTTP (and REST by extension) don't support bi-direction communications.In addition to limiting our interaction models to request/response, we must also acknowledge the performance impact from REST. While we can meet our availability targets by elastically scaling our services horizontally, this alone is a blunt instrument. If we inspect our service containers, we often find that they are running out of memory while the CPU is barely utilized.
This situation is due in large part to the thread-per-request model common when using HTTP. As the name implies, each request is assigned a specific thread to process the request and response. With the creation of a thread, there is a corresponding allocation of memory needed to support it while it is active. If the server receives too many requests and the threads are long-lived, the aggregate memory consumed can quickly exceed the available memory. Thus, we end up with services that spend most of their time idle, blocking on I/O.
What we need is a non-blocking i/o protocol that is more efficient with our services memory heap. A good candidate to solve this is RSocket
Introducing Rsocket
RSocket is a relatively new protocol created by Netflix to support Reactive Streams. It provides a message-driven, binary protocol that supplements the traditional Request/Response interaction pattern with several additional patterns. Because RSocket is reactive, it is able to manage the flow of data across an asynchronous boundary. This approachreduces the need for either side from buffering an arbitrary amount of data.Protocols
RSocket is a binary protocol used on byte-stream transports (e.g. TCP, WebSocket, and Aeron) . It is composed of frames and messages. Each frame contains a frame header, meta-data, and payload message containing a request, response or protocol processing. There are currently 16 types of frames:Type | Value | Description |
---|---|---|
RESERVED | 0x00 | Reserved |
SETUP | 0x01 | Setup: Sent by client to initiate protocol processing. |
LEASE | 0x02 | Lease: Sent by Responder to grant the ability to send requests. |
KEEPALIVE | 0x03 | Keepalive: Connection keepalive. |
REQUEST_RESPONSE | 0x04 | Request Response: Request single response. |
REQUEST_FNF | 0x05 | Fire And Forget: A single one-way message. |
REQUEST_STREAM | 0x06 | Request Stream: Request a completable stream. |
REQUEST_CHANNEL | 0x07 | Request Channel: Request a completable stream in both directions. |
REQUEST_N | 0x08 | Request N: Request N more items with Reactive Streams semantics. |
CANCEL | 0x09 | Cancel Request: Cancel outstanding request. |
PAYLOAD | 0x0A | Payload: Payload on a stream. For example, response to a request, or message on a channel. |
ERROR | 0x0B | Error: Error at connection or application level. |
METADATA_PUSH | 0x0C | Metadata: Asynchronous Metadata frame |
RESUME | 0x0D | Resume: Replaces SETUP for Resuming Operation (optional) |
RESUME_OK | 0x0E | Resume OK : Sent in response to a RESUME if resuming operation possible (optional) |
EXT | 0x01 | Extension Header: Used To Extend more frame types as well as extensions. |
In this article we will focus on the four interaction models:
Interaction models
Fire-and-Forget
Fire-and-Forget is used when we don't care about a response to our request.Future<Void> completionSignalOfSend = socketClient.fireAndForget(message);
Frame Contents
0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Stream ID | +-----------+-+-+-+-------------+-------------------------------+ |Frame Type |0|M|F| Flags | +-------------------------------+ Metadata & Request Data
Request/Response (single response)
While this model resembles the traditional request/response interaction, under the covers, the request doesn't block. By avoiding blocking operations, we reduce resource consumption (e.g., thread, memory, etc.) and improve scalability.Future<Payload> response = socketClient.requestResponse(requestPayload);
Frame Contents
0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Stream ID | +-----------+-+-+-+-------------+-------------------------------+ |Frame Type |0|M|F| Flags | +-------------------------------+ Metadata & Request Data
Request/Stream (multi-response)
Like request/response, the response may actually be composed of multiple responses streamed to the requester as they become available.Publisher<Payload> response = socketClient.requestStream(requestPayload);
Frame Contents
0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Stream ID | +-----------+-+-+-+-------------+-------------------------------+ |Frame Type |0|M|F| Flags | +-------------------------------+-------------------------------+ |0| Initial Request N | +---------------------------------------------------------------+ Metadata & Request Data
Channel
The last interaction model is the channel. Channels provide bi-directional streaming between the requester and the responder, effectively allowing either side of the connection to act in both roles.Publisher<Payload> output = socketClient.requestChannel(Publisher<Payload> input);
Frame Contents
0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Stream ID | +-----------+-+-+-+-+-----------+-------------------------------+ |Frame Type |0|M|F|C| Flags | +-------------------------------+-------------------------------+ |0| Initial Request N | +---------------------------------------------------------------+ Metadata & Request Data
Flow Control
RSocket uses Reactive Stream semantics for flow control. The requester grants the responder credit for the number of PAYLOADs it can send. This is often referred to as request(n) where (n) is the number of PAYLOADs the responder may send. With each request(n), the total number of PAYLOADs increase. This mechanism provides application-level feedback of work units completed in contrast to TCP flow control which is focused on byte-level data transmission.With this type of flow control, we can avoid head of line blocking ,which can overwhelm both network and application buffers. With application-layer request(n) semantics, a consumer can indicate how much data it can accept on a stream and allows the responder to react accordingly
Service to Service with RSocket
Now that we understand the what and the why let's look at the how. In this article, we will be building two services to demonstrate how to implement RSocket communications. The RSocketService will provide an RSocket endpoint and perform some trivial processing. We will then build the RSocketClientService. It will, unsurprisingly, act as our client service, and connect to the RSocketService's RSocket port. Finally, we will add a simple REST endpoint to enable us to trigger the client.Implementing With Spring RSocket
We will implement both services using the Spring framework. Spring supports a reactive programming model using ProjectReactor and also provides us with the Spring RSocket library. Both services will be packaged as jar files, GraalVM native executables, and container images. A docker-compose file will be provided to run the two containers.RSocket Service
As mentioned previously, this service provides us with an RSocket endpoint. We will provide routes to demonstrate the four interaction models:- Fire and Forget
- Request/Reply
- Request/Stream
- Channel
Dependencies
loading...
Of note, in the dependency list, we find the spring-boot-starter-rsocket that provides most of the RSocket goodness. We also include lombok to enhance our message models. Lastly, we include reactor-test to control our reactive unit-test elements.
Spring AOT plugin
loading...
As we have seen in previous articles, the spring-aot-maven-plugin plugin provides us with GraalVM native executable generation and container packaging.
Configuration
loading...
A single configuration property ( spring.rsocket.server.port) sets the port the controller will listen for connections on.
RSocket Service Application
Our service application provides the minimal Spring boilerplate necessary to boot the application.loading...
RSocket Service Models
We will be using three message types in the service. The MessageType enumeration denotes the types:Message Type
loading...
Since each message shares common elements, each will inherit from AbstractMessage. This abstract class leverages a bit of the Lombok goodness to provide inheritable builder support with @SuperBuilder annotation. We get a default value mechanism supplied with @Builder.Default.
Abstract Message
loading...
Request
Here we add a single message variable to carry the request text.loading...
Response
Here we add the original requestMessage and a corresponding responseMessage.loading...
Event
An Event differs from a request in that it is neither a request nor a response to a request. In this message type, we include a message string to carry the event's text data.loading...
Connection Log Entry
When an RSocket connection state changes, we create a new ConnectionLogEntry instance. We store the connection type, and the transition state.loading...
RSocket Controller
The controller is the heart of the service. Here we define our RSocket routes and map them to our handler methods with the @MessageMapping annotation. Also included in this class is the registerRequester method, which logs our client connections state transitions.loading...
Testing the RSocket Service
Testing our RSocket code is simplified with the inclusion of the io.projectreactor/reactor-test dependency. This dependency provides us with the StepVerifier interface in our unit tests. StepVerfier gives us a declarative way of creating scripts for async Publisher sequences.loading...
In our unit tests, we call the static StepVerifier create method and then declare our assertion in the corresponding sequence.
Build the Jar
Before we can run the service, we must first build, test, and package it. We can do this all with the following command:
mvn clean package
Run the Jar
If everything compiled, and all the tests passed, your target directory will contain a jar file with the serviceand all its dependencies included. We can run it like any Java jar file:
java -jar ./target/rsocket-service-0.0.1-SNAPSHOT.jar
The console output should appear similar to the following:
Terminal
___ ___ _ _ ___ _ | _ \/ __| ___ __| |_____| |_/ __| ___ _ ___ _(_)__ ___ | /\__ \/ _ \/ _| / / -_) _\__ \/ -_) '_\ V / / _/ -_) |_|_\|___/\___/\__|_\_\___|\__|___/\___|_| \_/|_\__\___| 2021-05-11 11:12:35.892 INFO 13187 --- [ main] c.t.rsocket.RsocketServiceApplication : Starting RsocketServiceApplication v0.0.1-SNAPSHOT using Java 11.0.11 on ubuntu-vm with PID 13187 (/home/workspace/rsocket-service/target/rsocket-service-0.0.1-SNAPSHOT.jar started by workspace in /home//workspace/rsocket-service) 2021-05-11 11:12:35.899 INFO 13187 --- [ main] c.t.rsocket.RsocketServiceApplication : No active profile set, falling back to default profiles: default 2021-05-11 11:12:38.429 INFO 13187 --- [ main] o.s.b.rsocket.netty.NettyRSocketServer : Netty RSocket started on port(s): 7000 2021-05-11 11:12:38.460 INFO 13187 --- [ main] c.t.rsocket.RsocketServiceApplication : Started RsocketServiceApplication in 3.479 seconds (JVM running for 4.495) 2021-05-11 11:12:38.462 INFO 13187 --- [ main] o.s.b.a.ApplicationAvailabilityBean : Application availability state LivenessState changed to CORRECT 2021-05-11 11:12:38.465 INFO 13187 --- [ main] o.s.b.a.ApplicationAvailabilityBean : Application availability state ReadinessState changed to ACCEPTING_TRAFFIC
Build the GraalVM Native Image
Since Spring supports the generation of GraalVM native executables packaged in a container, let's build that too! Note: the GraalVM native image generation takes considerably longer than the basic build. It also requires more computing resources (CPU and Memory) so be prepared to wait.
mvn spring-boot:build-image
Run the container
Now we can run the GraalVM native executable service in its own container.
docker run --rm -p 7000:7000 rsocket-service:0.0.1-SNAPSHOT
Invoking the service with RSC
Now that we have a working RSocket service, it would be great if we could invoke it using curl. Unfortunately, curl doesn't work with RSockets. Fortunately, there is an open-source RSocket CLI tool called RSC that gives us similar powers.The RSC project provides a feature-rich CLI for RSockets. The tool is written in Java and can be downloaded as a JAR file or installed using Homebrew, Scoop or Coursier.
Once installed, we can test it with:
rsc --version
If it was installed correctly, you should see output similar to this:
"version": "0.9.1", "build": "2021-06-28T02:57:04Z", "rsocket-java": "1.1.1"}
We can get a listing of the CLI flags using:
rsc --help
Terminal
usage: rsc [options] uri Non-option arguments: [String: uri] Option Description ------ ----------- --ab, --authBearer [String] Enable Authentication Metadata Extension (Bearer). --authBasic [String] [DEPRECATED] Enable Authentication Metadata Extension (Basic). This Metadata exists only for the backward compatibility with Spring Security 5.2 --channel Shortcut of --im REQUEST_CHANNEL --completion [ShellType] Output shell completion code for the specified shell (bash, zsh, fish, powershell) -d, --data [String] Data. Use '-' to read data from standard input. (default: ) --dataMimeType, --dmt [String] MimeType for data (default: application/json) --debug Enable FrameLogger --delayElements [Long] Enable delayElements(delay) in milli seconds --dumpOpts Dump options as a file that can be loaded by --optsFile option --fnf Shortcut of --im FIRE_AND_FORGET -h, --help [String] Print help --im, --interactionModel InteractionModel (default: [InteractionModel] REQUEST_RESPONSE) -l, --load [String] Load a file as Data. (e.g. ./foo.txt, /tmp/foo.txt, https://example.com) --limitRate [Integer] Enable limitRate(rate) --log [String] Enable log() -m, --metadata [String] Metadata (default: ) --metadataMimeType, --mmt [String] MimeType for metadata (default: application/json) --optsFile [String] Configure options from a YAML file (e. g. ./opts.yaml, /tmp/opts.yaml, https://example.com/opts.yaml) --printB3 Print B3 propagation info. Ignored unless --trace is set. -q, --quiet Disable the output on next -r, --route [String] Enable Routing Metadata Extension --request Shortcut of --im REQUEST_RESPONSE --resume [Integer] Enable resume. Resume session duration can be configured in seconds. --retry [Integer] Enable retry. Retry every 1 second with the given max attempts. --sd, --setupData [String] Data for Setup payload --setupMetadata, --sm [String] Metadata for Setup payload --setupMetadataMimeType, --smmt Metadata MimeType for Setup payload [String] (default: application/json) --showSystemProperties Show SystemProperties for troubleshoot --stacktrace Show Stacktrace when an exception happens --stream Shortcut of --im REQUEST_STREAM --take [Integer] Enable take(n) --trace [TracingMetadataCodec$Flags] Enable Tracing (Zipkin) Metadata Extension. Unless sampling state (UNDECIDED, NOT_SAMPLE, SAMPLE, DEBUG) is specified, DEBUG is used if no state is specified. --trustCert [String] PEM file for a trusted certificate. (e. g. ./foo.crt, /tmp/foo.crt, https: //example.com/foo.crt) -u, --as, --authSimple [String] Enable Authentication Metadata Extension (Simple). The format must be 'username:password'. -v, --version Print version -w, --wiretap Enable wiretap --wsHeader, --wsh [String] Header for web socket connection --zipkinUrl [String] Zipkin URL to send a span (e.g. http: //localhost:9411). Ignored unless -- trace is set.
Fire and Forget
Now that we have verified RSC is working correctly, let's test our Fire and Forget route. We can do that with the following command:
rsc --debug --fnf --data='{"source":"rsc","message":"this is a request"}' --route fire-and-forget tcp://localhost:7000
flags
- --debug enable the framelogger output.
- --fnf shortcut of --im FIRE_AND_FORGET.
- --data provides the data payload.
- --route enable Routing Metadata Extension.
Your output should be similar to this:
Terminal
2021-05-11 17:18:57.212 DEBUG 26084 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 75 Metadata: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| fe 00 00 10 0f 66 69 72 65 2d 61 6e 64 2d 66 6f |.....fire-and-fo| |00000010| 72 67 65 74 |rget | +--------+-------------------------------------------------+----------------+ Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 72 73 63 22 2c |{"source":"rsc",| |00000010| 22 6d 65 73 73 61 67 65 22 3a 22 74 68 69 73 20 |"message":"this | |00000020| 69 73 20 61 20 72 65 71 75 65 73 74 22 7d |is a request"} | +--------+-------------------------------------------------+----------------+
Here we can see the two frames that make up our Fire and Forget message.
Request and Response
The next interaction model we will test is Request/Response:
rsc --debug --request --data='{"source":"rsc","message":"hello"}' --route request-response tcp://localhost:7000
flags
- -debug enable the framelogger output.
- -request shortcut of --im REQUEST_RESPONSE .
- -data provides the data payload.
- --route enable Routing Metadata Extension.
Terminal
2021-05-11 17:26:25.699 DEBUG 26722 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 64 Metadata: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| fe 00 00 11 10 72 65 71 75 65 73 74 2d 72 65 73 |.....request-res| |00000010| 70 6f 6e 73 65 |ponse | +--------+-------------------------------------------------+----------------+ Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 72 73 63 22 2c |{"source":"rsc",| |00000010| 22 6d 65 73 73 61 67 65 22 3a 22 68 65 6c 6c 6f |"message":"hello| |00000020| 22 7d |"} | +--------+-------------------------------------------------+----------------+ 2021-05-11 17:26:25.802 DEBUG 26722 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : receiving -> Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 260 Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK| |00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest| |00000020| 69 6e 61 74 69 6f 6e 22 3a 22 72 73 63 22 2c 22 |ination":"rsc","| |00000030| 75 75 69 64 22 3a 22 31 37 32 63 37 61 30 64 2d |uuid":"172c7a0d-| |00000040| 33 31 65 36 2d 34 34 33 32 2d 62 66 35 65 2d 61 |31e6-4432-bf5e-a| |00000050| 66 30 64 37 35 32 32 65 34 30 32 22 2c 22 63 72 |f0d7522e402","cr| |00000060| 65 61 74 65 64 41 74 22 3a 31 36 32 38 37 31 37 |eatedAt":1620763| |00000070| 31 38 35 37 32 37 2c 22 74 79 70 65 22 3a 22 52 |473121,"type":"R| |00000080| 45 53 50 4f 4e 53 45 22 2c 22 72 65 71 75 65 73 |ESPONSE","reques| |00000090| 74 55 75 69 64 22 3a 22 37 39 63 31 37 31 61 62 |tUuid":"79c171ab| |000000a0| 2d 30 34 66 31 2d 34 30 39 30 2d 62 35 63 39 2d |-04f1-4090-b5c9-| |000000b0| 35 39 33 32 39 33 39 35 35 65 30 63 22 2c 22 72 |593293955e0c","r| |000000c0| 65 71 75 65 73 74 4d 65 73 73 61 67 65 22 3a 22 |equestMessage":"| |000000d0| 68 65 6c 6c 6f 22 2c 22 72 65 73 70 6f 6e 73 65 |hello","response| |000000e0| 4d 65 73 73 61 67 65 22 3a 22 52 45 43 45 49 56 |Message":"RECEIV| |000000f0| 45 44 20 3a 20 27 68 65 6c 6c 6f 27 22 7d |ED : 'hello'"} | +--------+-------------------------------------------------+----------------+ {"source":"RSOCKET Server","destination":"rsc","uuid":"172c7a0d-31e6-4432-bf5e-af0d7522e402","createdAt":1620763473121,"type":"RESPONSE","requestUuid":"79c171ab-04f1-4090-b5c9-593293955e0c","requestMessage":"hello","responseMessage":"RECEIVED : 'hello'"}
Here we see three frames. The first two setup our request payload, and the third contains the response payload.
Request Stream
To exercise the Request/Stream interaction model, we use the following command:
rsc --debug --stream --data='{"source":"rsc","message" : "Get Events"}' --route request-stream tcp://localhost:7000
flags
- --debug enable the framelogger output.
- --stream shortcut of --im REQUEST_STREAM.
- --data provides the data payload.
- --route enable Routing Metadata Extension.
Terminal
2021-05-11 17:27:17.079 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75 Data: 2021-05-11 17:27:17.162 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 73 InitialRequestN: 9223372036854775807 Metadata: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| fe 00 00 0f 0e 72 65 71 75 65 73 74 2d 73 74 72 |.....request-str| |00000010| 65 61 6d |eam | +--------+-------------------------------------------------+----------------+ Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 72 73 63 22 2c |{"source":"rsc",| |00000010| 22 6d 65 73 73 61 67 65 22 20 3a 20 22 47 65 74 |"message" : "Get| |00000020| 20 45 76 65 6e 74 73 22 7d | Events"} | +--------+-------------------------------------------------+----------------+ 2021-08-11 17:27:17.211 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : receiving -> Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 223 Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK| |00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest| |00000020| 69 6e 61 74 69 6f 6e 22 3a 22 72 73 63 22 2c 22 |ination":"rsc","| |00000030| 75 75 69 64 22 3a 22 36 30 62 61 66 30 35 65 2d |uuid":"60baf05e-| |00000040| 30 32 61 62 2d 34 31 37 34 2d 61 63 62 34 2d 35 |02ab-4174-acb4-5| |00000050| 61 36 30 62 64 61 35 63 38 30 32 22 2c 22 63 72 |a60bda5c802","cr| |00000060| 65 61 74 65 64 41 74 22 3a 31 36 32 38 37 31 37 |eatedAt":1620763| |00000070| 32 33 37 31 39 32 2c 22 74 79 70 65 22 3a 22 45 |483121,"type":"E| |00000080| 56 45 4e 54 22 2c 22 6d 65 73 73 61 67 65 22 3a |VENT","message":| |00000090| 22 43 6f 6e 6e 65 63 74 69 6f 6e 4c 6f 67 45 6e |"ConnectionLogEn| |000000a0| 74 72 79 28 74 69 6d 65 73 74 61 6d 70 3d 30 2c |try(timestamp=0,| |000000b0| 20 74 79 70 65 3d 46 49 52 45 5f 41 4e 44 5f 46 | type=FIRE_AND_F| |000000c0| 4f 52 47 45 54 2c 20 73 74 61 74 65 3d 43 4f 4e |ORGET, state=CON| |000000d0| 4e 45 43 54 45 44 29 22 7d |NECTED)"} | +--------+-------------------------------------------------+----------------+ {"source":"RSOCKET Server","destination":"rsc","uuid":"60baf05e-02ab-4174-acb4-5a60bda5c802","createdAt":1620763483121,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=FIRE_AND_FORGET, state=CONNECTED)"} 2021-05-11 17:27:17.217 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : receiving -> Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 226 Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK| |00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest| |00000020| 69 6e 61 74 69 6f 6e 22 3a 22 72 73 63 22 2c 22 |ination":"rsc","| |00000030| 75 75 69 64 22 3a 22 61 37 35 62 37 64 37 39 2d |uuid":"a75b7d79-| |00000040| 33 33 62 66 2d 34 36 61 38 2d 39 34 36 38 2d 66 |33bf-46a8-9468-f| |00000050| 30 35 30 63 37 31 31 65 62 63 32 22 2c 22 63 72 |050c711ebc2","cr| |00000060| 65 61 74 65 64 41 74 22 3a 31 36 32 38 37 31 37 |eatedAt":1620763| |00000070| 32 33 37 31 39 32 2c 22 74 79 70 65 22 3a 22 45 |489621,"type":"E| |00000080| 56 45 4e 54 22 2c 22 6d 65 73 73 61 67 65 22 3a |VENT","message":| |00000090| 22 43 6f 6e 6e 65 63 74 69 6f 6e 4c 6f 67 45 6e |"ConnectionLogEn| |000000a0| 74 72 79 28 74 69 6d 65 73 74 61 6d 70 3d 30 2c |try(timestamp=0,| |000000b0| 20 74 79 70 65 3d 46 49 52 45 5f 41 4e 44 5f 46 | type=FIRE_AND_F| |000000c0| 4f 52 47 45 54 2c 20 73 74 61 74 65 3d 44 49 53 |ORGET, state=DIS| |000000d0| 43 4f 4e 4e 45 43 54 45 44 29 22 7d |CONNECTED)"} | +--------+-------------------------------------------------+----------------+ {"source":"RSOCKET Server","destination":"rsc","uuid":"a75b7d79-33bf-46a8-9468-f050c711ebc2","createdAt":1620763489621,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=FIRE_AND_FORGET, state=DISCONNECTED)"} 2021-05-11 17:27:17.220 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : receiving -> Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 224 Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK| |00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest| |00000020| 69 6e 61 74 69 6f 6e 22 3a 22 72 73 63 22 2c 22 |ination":"rsc","| |00000030| 75 75 69 64 22 3a 22 31 64 36 35 62 31 38 35 2d |uuid":"1d65b185-| |00000040| 66 39 65 66 2d 34 33 38 65 2d 61 61 39 64 2d 36 |f9ef-438e-aa9d-6| |00000050| 32 30 34 39 66 38 61 63 30 39 31 22 2c 22 63 72 |2049f8ac091","cr| |00000060| 65 61 74 65 64 41 74 22 3a 31 36 32 38 37 31 37 |eatedAt":1620763| |00000070| 32 33 37 31 39 32 2c 22 74 79 70 65 22 3a 22 45 |491211,"type":"E| |00000080| 56 45 4e 54 22 2c 22 6d 65 73 73 61 67 65 22 3a |VENT","message":| |00000090| 22 43 6f 6e 6e 65 63 74 69 6f 6e 4c 6f 67 45 6e |"ConnectionLogEn| |000000a0| 74 72 79 28 74 69 6d 65 73 74 61 6d 70 3d 30 2c |try(timestamp=0,| |000000b0| 20 74 79 70 65 3d 52 45 51 55 45 53 54 5f 52 45 | type=REQUEST_RE| |000000c0| 53 50 4f 4e 53 45 2c 20 73 74 61 74 65 3d 43 4f |SPONSE, state=CO| |000000d0| 4e 4e 45 43 54 45 44 29 22 7d |NNECTED)"} | +--------+-------------------------------------------------+----------------+ {"source":"RSOCKET Server","destination":"rsc","uuid":"1d65b185-f9ef-438e-aa9d-62049f8ac091","createdAt":1620763491211,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=REQUEST_RESPONSE, state=CONNECTED)"} 2021-05-11 17:27:17.223 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : receiving -> Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 227 Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK| |00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest| |00000020| 69 6e 61 74 69 6f 6e 22 3a 22 72 73 63 22 2c 22 |ination":"rsc","| |00000030| 75 75 69 64 22 3a 22 39 38 33 34 31 31 38 38 2d |uuid":"98341188-| |00000040| 31 63 36 64 2d 34 32 30 39 2d 62 62 63 36 2d 62 |1c6d-4209-bbc6-b| |00000050| 32 35 38 37 38 63 37 65 32 63 62 22 2c 22 63 72 |25878c7e2cb","cr| |00000060| 65 61 74 65 64 41 74 22 3a 31 36 32 38 37 31 37 |eatedAt":1620763| |00000070| 32 33 37 31 39 32 2c 22 74 79 70 65 22 3a 22 45 |511024,"type":"E| |00000080| 56 45 4e 54 22 2c 22 6d 65 73 73 61 67 65 22 3a |VENT","message":| |00000090| 22 43 6f 6e 6e 65 63 74 69 6f 6e 4c 6f 67 45 6e |"ConnectionLogEn| |000000a0| 74 72 79 28 74 69 6d 65 73 74 61 6d 70 3d 30 2c |try(timestamp=0,| |000000b0| 20 74 79 70 65 3d 52 45 51 55 45 53 54 5f 52 45 | type=REQUEST_RE| |000000c0| 53 50 4f 4e 53 45 2c 20 73 74 61 74 65 3d 44 49 |SPONSE, state=DI| |000000d0| 53 43 4f 4e 4e 45 43 54 45 44 29 22 7d |SCONNECTED)"} | +--------+-------------------------------------------------+----------------+ {"source":"RSOCKET Server","destination":"rsc","uuid":"98341188-1c6d-4209-bbc6-b25878c7e2cb","createdAt":1620763511024,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=REQUEST_RESPONSE, state=DISCONNECTED)"} 2021-05-11 17:27:17.227 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : receiving -> Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 222 Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK| |00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest| |00000020| 69 6e 61 74 69 6f 6e 22 3a 22 72 73 63 22 2c 22 |ination":"rsc","| |00000030| 75 75 69 64 22 3a 22 38 36 32 35 33 66 62 65 2d |uuid":"86253fbe-| |00000040| 30 61 36 65 2d 34 64 30 32 2d 62 31 35 63 2d 65 |0a6e-4d02-b15c-e| |00000050| 34 65 31 61 62 31 61 37 35 64 62 22 2c 22 63 72 |4e1ab1a75db","cr| |00000060| 65 61 74 65 64 41 74 22 3a 31 36 32 38 37 31 37 |eatedAt":1620763| |00000070| 32 33 37 31 39 32 2c 22 74 79 70 65 22 3a 22 45 |512310,"type":"E| |00000080| 56 45 4e 54 22 2c 22 6d 65 73 73 61 67 65 22 3a |VENT","message":| |00000090| 22 43 6f 6e 6e 65 63 74 69 6f 6e 4c 6f 67 45 6e |"ConnectionLogEn| |000000a0| 74 72 79 28 74 69 6d 65 73 74 61 6d 70 3d 30 2c |try(timestamp=0,| |000000b0| 20 74 79 70 65 3d 52 45 51 55 45 53 54 5f 53 54 | type=REQUEST_ST| |000000c0| 52 45 41 4d 2c 20 73 74 61 74 65 3d 43 4f 4e 4e |REAM, state=CONN| |000000d0| 45 43 54 45 44 29 22 7d |ECTED)"} | +--------+-------------------------------------------------+----------------+ {"source":"RSOCKET Server","destination":"rsc","uuid":"86253fbe-0a6e-4d02-b15c-e4e1ab1a75db","createdAt":1620763512310,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=REQUEST_STREAM, state=CONNECTED)"} 2021-05-11 17:27:17.228 DEBUG 26821 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : receiving -> Frame => Stream ID: 1 Type: COMPLETE Flags: 0b1000000 Length: 6 Data:
The result of the command is seven frames. The first two handle setup and request payload, while the last five frames contain payloads streamed from the service.
Channel
Channel is the last of our interaction models. We initiate the channel with the following command:
rsc --debug --channel --data='{"source":"rsc","message":"Request #1"}' --route channel tcp://localhost:7000
flags
- --debug enable the framelogger output.
- --channel shortcut of --im REQUEST_CHANNEL .
- --data provides the data payload.
- --route enable Routing Metadata Extension.
Terminal
2021-05-11 17:29:20.892 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75 Data: 2021-05-11 17:29:20.989 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 1 Type: REQUEST_CHANNEL Flags: 0b100000000 Length: 64 InitialRequestN: 9223372036854775807 Metadata: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| fe 00 00 08 07 63 68 61 6e 6e 65 6c |.....channel | +--------+-------------------------------------------------+----------------+ Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 72 73 63 22 2c |{"source":"rsc",| |00000010| 22 6d 65 73 73 61 67 65 22 3a 22 52 65 71 75 65 |"message":"Reque| |00000020| 73 74 20 23 31 22 7d |st #1"} | +--------+-------------------------------------------------+----------------+ 2021-05-11 17:29:20.990 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 1 Type: COMPLETE Flags: 0b1000000 Length: 6 Data: 2021-05-11 17:29:21.069 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : receiving -> Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 150 Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 73 6f 75 72 63 65 22 3a 6e 75 6c 6c 2c 22 |{"source":null,"| |00000010| 64 65 73 74 69 6e 61 74 69 6f 6e 22 3a 6e 75 6c |destination":nul| |00000020| 6c 2c 22 75 75 69 64 22 3a 22 30 65 64 62 37 30 |l,"uuid":"0edb70| |00000030| 34 32 2d 61 32 66 30 2d 34 32 61 36 2d 38 38 32 |42-a2f0-42a6-882| |00000040| 66 2d 62 31 35 34 62 65 64 61 61 64 33 39 22 2c |f-b154bedaad39",| |00000050| 22 63 72 65 61 74 65 64 41 74 22 3a 31 36 32 38 |"createdAt":1628| |00000060| 37 31 37 33 36 31 30 33 37 2c 22 74 79 70 65 22 |717361037,"type"| |00000070| 3a 22 45 56 45 4e 54 22 2c 22 6d 65 73 73 61 67 |:"EVENT","messag| |00000080| 65 22 3a 22 52 65 71 75 65 73 74 20 23 31 22 7d |e":"Request #1"}| +--------+-------------------------------------------------+----------------+ {"source":null,"destination":null,"uuid":"0edb7042-a2f0-42a6-882f-b154bedaad39","createdAt":1628717361037,"type":"EVENT","message":"Request #1"} 2021-05-11 17:29:21.076 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : receiving -> Frame => Stream ID: 1 Type: REQUEST_N Flags: 0b0 Length: 10 RequestN: 9223372036854775807 Data: 2021-05-11 17:29:40.978 DEBUG 27033 --- [ parallel-1] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b10000000 Length: 14 Data: 2021-08-11 17:29:40.982 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : receiving -> Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b0 Length: 14 Data:
Here we see the first two frames setup our outgoing stream and its first payload. The next frame contains our first response from the server. We then receive three more message frames. The first is a REQUEST_N indicating the service is asking for more work. The last two are KEEPALIVE frames.
We can test the service's outbound stream by calling the Fire and Forget again. The service receives this message and will notify any outbound streams on its arrival.
Terminal
2021-05-11 17:31:57.630 DEBUG 27446 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75 Data: 2021-05-11 17:31:57.697 DEBUG 27446 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 75 Metadata: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| fe 00 00 10 0f 66 69 72 65 2d 61 6e 64 2d 66 6f |.....fire-and-fo| |00000010| 72 67 65 74 |rget | +--------+-------------------------------------------------+----------------+ Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 72 73 63 22 2c |{"source":"rsc",| |00000010| 22 6d 65 73 73 61 67 65 22 3a 22 74 68 69 73 20 |"message":"this | |00000020| 69 73 20 61 20 72 65 71 75 65 73 74 22 7d |is a request"} | +--------+-------------------------------------------------+----------------+ 2021-05-11 17:31:57.737 DEBUG 27446 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : receiving -> Frame => Stream ID: 1 Type: COMPLETE Flags: 0b1000000 Length: 6 Data:
The channel connection will then receive the new event, and it will be echoed in the console.
Terminal
2021-05-11 17:31:41.014 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b0 Length: 14 Data: 2021-05-11 17:31:57.727 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : receiving -> Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 195 Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 73 6f 75 72 63 65 22 3a 22 52 53 4f 43 4b |{"source":"RSOCK| |00000010| 45 54 20 53 65 72 76 65 72 22 2c 22 64 65 73 74 |ET Server","dest| |00000020| 69 6e 61 74 69 6f 6e 22 3a 22 42 52 4f 41 44 43 |ination":"BROADC| |00000030| 41 53 54 22 2c 22 75 75 69 64 22 3a 22 34 63 66 |AST","uuid":"4cf| |00000040| 64 30 66 66 38 2d 64 36 62 39 2d 34 66 36 32 2d |d0ff8-d6b9-4f62-| |00000050| 62 33 66 65 2d 32 31 37 33 32 39 33 63 38 37 32 |b3fe-2173293c872| |00000060| 65 22 2c 22 63 72 65 61 74 65 64 41 74 22 3a 31 |e","createdAt":1| |00000070| 36 32 38 37 31 37 35 31 37 37 32 32 2c 22 74 79 |628717517722,"ty| |00000080| 70 65 22 3a 22 45 56 45 4e 54 22 2c 22 6d 65 73 |pe":"EVENT","mes| |00000090| 73 61 67 65 22 3a 22 46 49 52 45 5f 41 4e 44 5f |sage":"FIRE_AND_| |000000a0| 46 4f 52 47 45 54 3a 20 27 74 68 69 73 20 69 73 |FORGET: 'this is| |000000b0| 20 61 20 72 65 71 75 65 73 74 27 22 7d | a request'"} | +--------+-------------------------------------------------+----------------+ {"source":"RSOCKET Server","destination":"BROADCAST","uuid":"4cfd0ff8-d6b9-4f62-b3fe-2173293c872e","createdAt":1628717517722,"type":"EVENT","message":"FIRE_AND_FORGET: 'this is a request'"} 2021-05-11 17:32:00.964 DEBUG 27033 --- [ parallel-1] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b10000000 Length: 14 Data: 2021-05-11 17:32:00.966 DEBUG 27033 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger : receiving -> Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b0 Length: 14 Data:
Service to Service RSocket communications
We have seen how we can communicate with the RSocketService using RSC. However, the real reason for this article is to demonstrate inter-service communication using RSocket. To this end, we will build a second service that will expose a conventional REST endpoint to trigger RSocket messaging to the RSocketService.RSocket Client Service
The RSocketClientService is a simple Spring Boot application that exposes a set of REST endpoint methods. Each of these methods parallels a corresponding RSocket route. When the client REST method is invoked, it uses a TCP RSocket to communicate with the RSocketService.Configuration
loading...
The client configuration is only slightly more complicated than the server. Here we include the REST port ( server.port) and the RSocketService host and port ( rsocket.server.host and rsocket.server.port, respectively).
Code
The most interesting aspect of the RSocketClientService is its controller. Message classes are duplicates of those found in the RSocketService and will not be included here.Controller
loading...
In our controller's constructor, we @Autowire the RSocketRequester.builder. We will use this builder to create a TCP RSocket to the RSocketService's host and port. We then create REST mappings for each of the interaction mode methods. Within each interaction model method, we use the corresponding message builder to construct our outgoing payloads. We then invoke the configured RSocketRequester to transmit our message to the desired route.
Build the Jar
We can build the RSocketClientService using the same maven command we used before:
mvn clean package
Run the Jar
If everything compiled, we can run our new jar file with the following command:
java -jar ./target/rsocket-client-service-0.0.1-SNAPSHOT.jar
The console output should appear similar to the following:
Terminal
___ ___ _ _ ___ _ _ _ | _ \/ __| ___ __| |_____| |_ / __| (_)___ _ _| |_ | /\__ \/ _ \/ _| / / -_) _| (__| | / -_) ' \ _| |_|_\|___/\___/\__|_\_\___|\__|\___|_|_\___|_||_\__| / __| ___ _ ___ _(_)__ ___ \__ \/ -_) '_\ V / / _/ -_) |___/\___|_| \_/|_\__\___| 2021-05-11 11:10:02.906 INFO 12792 --- [ main] c.t.r.client.RSocketClientApplication : Starting RSocketClientApplication v0.0.1-SNAPSHOT using Java 11.0.11 on ubuntu-vm with PID 12792 (/home/workspace/rsocket-client-service/target/rsocket-client-service-0.0.1-SNAPSHOT.jar started by workspace in /home/workspace/rsocket-client-service) 2021-05-11 11:10:02.914 INFO 12792 --- [ main] c.t.r.client.RSocketClientApplication : No active profile set, falling back to default profiles: default 2021-05-11 11:10:05.312 INFO 12792 --- [ main] c.t.r.c.c.RSocketClientController : remote rsocket.server.host:localhost=7000 2021-05-11 11:10:06.303 INFO 12792 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port 8080 2021-05-11 11:10:06.334 INFO 12792 --- [ main] c.t.r.client.RSocketClientApplication : Started RSocketClientApplication in 4.506 seconds (JVM running for 5.488)
Build the GraalVM Native Image
If you wish to build the GraalVM native image, we can use the same command as before:
mvn spring-boot:build-image
Run the container
If you wish to run the containerized version, the command is:
docker run --rm -p 8080:8080 rsocket-client-service:0.0.1-SNAPSHOT
Note: while you can run both services independently, you will need to handle the container networking manually. A better approach is to use Docker-Compose discussed here.
RSocket client service landing page
After starting both services, open up a browser and navigate to http://localhost:8080. You should be presented with the RSocket Client Service landing page.You may run these methods from the browser or using curl. We will be using curl to call the REST methods for the remainder of the article.
Fire and forget
This call will cause the RSocketService to generate an outbound event for all active channel connections.
curl http://localhost:8080/fire-and-forget
The response will be:
Fire-and-Forget completed!
Request/Response
This call will send a request to the RSocketService's request-response route and return the original requestMessage and a responseMessage.
curl http://localhost:8080/request-response
The output should be similar to this:
{"source":"RSOCKET Server","destination":"RSocket-Client34d79a4c-d6b6-434d-ba31-292c413f69a8","uuid":"bb481c70-24c0-461f-a304-7b705a27af15","createdAt":1620766312011,"type":"RESPONSE","requestUuid":"cbe08f1e-2388-452e-aa50-a474878a0c55","requestMessage":"This is a request!","responseMessage":"RECEIVED : 'This is a request!'"}
Here we see the original requestMessage text was This is a request! and the responseMessage wraps the message with "RECEIVED:' '.
Request/stream
To invoke the request-stream route, we call the following url:
curl http://localhost:8080/request-stream
Your output should be similar to this:
Terminal
data:{"source":"RSOCKET Server","destination":"RSocket-Client60469425-0c92-4b61-b608-fac34b3538c6","uuid":"5d411a92-ac65-4bdc-b0e2-e0ed9cd8758e","createdAt":1620766332745,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=FIRE_AND_FORGET, state=CONNECTED)"} data:{"source":"RSOCKET Server","destination":"RSocket-Client60469425-0c92-4b61-b608-fac34b3538c6","uuid":"701bef65-de74-4f4d-ae41-f228355acb00","createdAt":1620766332933,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=REQUEST_RESPONSE, state=CONNECTED)"} data:{"source":"RSOCKET Server","destination":"RSocket-Client60469425-0c92-4b61-b608-fac34b3538c6","uuid":"25e8157e-1728-4f89-a276-74e3d7581894","createdAt":1620766333121,"type":"EVENT","message":"ConnectionLogEntry(timestamp=0, type=REQUEST_STREAM, state=CONNECTED)"}
We get back a collection of type ConnectionLogEntry, representing each connection state transition since the service started.
Channel
With the following URL, we create a channel between the RSocketClientService and the RSocketService. When the connection is created, the RSocketService registers the outbound stream and will broadcast Fire-andForget events to it.
curl http://localhost:8080/channel
The service will respond with the following message:
data:{"source":null,"destination":null,"uuid":"bbd9af38-5c69-45c4-8095-36997c01e813","createdAt":1620766473121,"type":"EVENT","message":"Request #1"}
Note that curl doesn't exit since we still have a live connection. In a separate terminal window, issue another Fire-And-Forget call:
curl http://localhost:8080/fire-and-forget
In the original channel terminal output we will see a new event has arrived:
data:{"source":"RSOCKET Server","destination":"BROADCAST","uuid":"b3b4f3ea-b58d-4b8a-9f7e-ee01e8c39147","createdAt":1620766479786,"type":"EVENT","message":"FIRE_AND_FORGET: 'FIRE!!'"}
This connection will continue until you terminate curl or stop the RSocketClientService.
Running From Docker-compose
While we can run the individual services from their jar files, the preferred method is to run them in their native executable format, packaged within containers. If we run them individually using docker run both containers will start but be unable to communicate with each other. If you want to run the containerized versions, there is a Docker-Compose file to make things simple.loading...
Here we declare our two services. We declare the rsocket network to allow the client service to talk directly to the server. We also set the rsocket.server.host and rsocket.server.port environment variables to override the default values.
To run the Docker-Compose file, we execute the following command:
docker-compose up -d
This will spin up the services in detached mode which runs the container processes in the background.
To shut it down we execute:
docker-compose down
Conclusions
We have covered a lot of ground in this article. Hopefully, you now have a better understanding of what RSocket is and how it can be used to supplement your REST services and enrich your service-to-service communication. While this article is quite long, it should only be considered an introduction to RSocket. For more information, please refer to the RSocket.io site.Coming Up
With RSocket, we have introduced a new, reactive messaging communication protocol. Our next article, we will introduce NATS, a cloud-native messaging platform that can provide a central nervous system to your microservice architectures.Resources
- RSocket website.
- RSocket-Java Github repo.
- Spring Rsocket.
- RSocketService Github repo.
- RSocketClientService Github repo.
Twitter
Facebook
Reddit
LinkedIn
Email