Reactive programming with Project Reactor

Archit Agarwal
5 min readMay 13, 2021

--

Reactive Programming

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).

https://en.wikipedia.org/wiki/Reactive_programming

Features of Reactive programming

  1. Asynchronous and Non blocking.
  2. Data flow as Event/Message Driven stream
  3. Functional Style Code
  4. Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high.
  5. Nothing happens until you subscribe.

Advantages of Reactive Programming

  1. Cleaner code, more concise
  2. Easier to read (once you get the hang of it)
  3. Easier to scale (pipe any operation)
  4. Better error handling
  5. Event-driven inspired
  6. Backpressure (client can control flow)

Reactive Stream Specification

  • Publisher: A source of data (from 0 to N signals where N can be unlimited). It optionally provides for 2 terminal events: error and completion.
  • Subscriber: A consumer of a data sequence (from 0 to N signals where N can be unlimited). It receives a subscription on initialisation to request how much data it wants to process next. The other callbacks interact with the data sequence signals: next (new message) and the optional completion/error.
  • Subscription: A small tracker passed on initialisation to the Subscriber. It controls how much data we are ready to consume and when do we want to stop consuming (cancel).
  • Processor: A marker for components that are both Subscriber and Publisher!
Publish-Subscribe Model

Reactor Core Features

Flux — 0 to N elements

A Flux<T> is a standard Publisher<T> that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error

Flux<String> stringFlux = Flux.just("Spring", "Spring Boot", "Reactive Spring").log(); stringFlux
.subscribe(System.out::println,
(e) -> System.err.println("Exception is " + e)
, () -> System.out.println("Completed"));
Output of the above code

Explanation:

So in Flux until you subscribe nothing will happen, once you subscribe, publisher(Flux) will start sending the data, by default subscriber requests for unbounded number. Then onNext() publisher will start sending the data.Once data publish is done onComplete signal will be sent and subscription will be over.

Note: I have added log() in the flux to show what happens internally. We shouldn’t use log() in production systems.

Error Flow in Flux

Flux<String> stringFlux = Flux.just("A", "B", "C").log()
.concatWith(Flux.error(new RuntimeException("Exception Occurred")))
.concatWith(Flux.just("D"))
.onErrorResume((e) -> { // this block gets executed
System.out.println("Exception is : " + e);
return Flux.just("default", "default1");
});


stringFlux.subscribe(System.out::println);

Explanation:

Here also there is a subscriber and a publisher(Flux) and after printing A, B and C there is an exception, so publisher will send onComplete signal and after that onErrorResume will be executed and it will print Exception occurred and then finally default and default1 will be printed.

Note: Project reactor has really good error handling methods.

Mono — 0 to 1 element

A Mono<T> is a specialised Publisher<T> that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).

Mono.just(1).log()
.map(integer -> "foo" + integer)
.subscribe(System.out::println);

Explanation:

Mono emits only a single event, in this case it will emit 1 and then it will signal onComplete signal.

Backpressure

Publisher produces the stream of data and a Subscriber that consumes data. If the rate at which a Consumer consumes data is less than the rate at which a Producer produces data (referred to as a Fast Producer/Slow Consumer), then signals from the consumer can constrain the rate of production. In this consumer will never be able to catch up.

To prevent this issue project reactor has backpressure enabled where power is there in the hands of subscriber. Subscriber can request how many it can process so producer and consumer are in sync.

Backpressure
Flux<Integer> finiteFlux = Flux.range(1, 10)
.log();

finiteFlux.subscribe((element) -> System.out.println("Element is : " + element)
, (e) -> System.err.println("Exception is : " + e)
, () -> System.out.println("Done")
, (subscription -> subscription.request(2)));
Output

Explanation:

In the above examples by default subscriber is request(unbounded) but subscriber has the option to request a particular number also. In this case subscriber is requesting 2 elements out of 10. Because publisher still has data to publish it didn’t send the onComplete Signal to Subscriber.

Conclusion

So this is the basic of reactive programming and project reactor capabilities.There are multiple reactive toolkits available, project reactor is one of them. Though it takes time to grasp reactive programming but it is worth the features it comes with and its capabilities. Project reactor handles much of the complexity of timeouts, threading, backpressure, and so forth, freeing us to think about the bigger picture of how signals flow through our systems.For more details you can follow the documentation:https://projectreactor.io/docs/core/release/reference/#getting-started

In the next blog we will see how project reactor makes multithreading so easy.If you want to know about Akka which is another toolkit for reactive programming, you can read here.

--

--