What is WebFlux?

WebFlux is Spring's asynchronous web framework, providing non-blocking, reactive programming support for web APIs. WebFlux uses the Reactor library for its reactive support, which provides two main publisher types: Flux and Mono.

Flux represents a stream of data containing 0 to N elements, whilst Mono represents a stream of data containing 0 to 1 elements.
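As a rough illustration of the two types, the sketch below builds a Flux and two Monos from hard-coded values. The function names and the sample data are made up for this example, and block() is only used so the results can be printed from a standalone program.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical sample data, used only for illustration.
fun allNames(): Flux<String> = Flux.just("Ada", "Grace", "Linus") // 0..N elements
fun firstName(): Mono<String> = Mono.just("Ada")                  // one element
fun noName(): Mono<String> = Mono.empty()                         // zero elements

fun main() {
    // block() waits for the result; avoid it inside a reactive request pipeline.
    println(allNames().collectList().block()) // [Ada, Grace, Linus]
    println(firstName().block())              // Ada
    println(noName().block())                 // null
}
```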

It is important to note that both Flux and Mono implement the Publisher interface, which “is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).” [1]
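To make "according to the demand received" concrete, here is a minimal sketch using Reactor's BaseSubscriber. The helper receiveWithDemand is a name invented for this example; it subscribes to a five-element Flux but only requests a fixed number of elements, so the publisher emits no more than that.

```kotlin
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux

// Subscribes to a five-element Flux but only ever requests `demand` elements.
fun receiveWithDemand(demand: Long): List<Int> {
    val received = mutableListOf<Int>()
    Flux.range(1, 5).subscribe(object : BaseSubscriber<Int>() {
        override fun hookOnSubscribe(subscription: Subscription) {
            request(demand) // signal demand to the publisher
        }

        override fun hookOnNext(value: Int) {
            received += value // no further request, so emission stops here
        }
    })
    return received
}

fun main() {
    println(receiveWithDemand(2)) // [1, 2]
}
```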

What are some of the features of Spring WebFlux?

WebFlux is ideal when we want to handle concurrency with a small number of threads and when we want to stream data. Additionally, Router Functions are included as an alternative to the @RequestMapping annotation; their purpose is to route incoming requests to handler functions.

WebClient is a reactive web client and a non-blocking alternative to RestTemplate, used to perform web requests to endpoints such as controllers that use reactive streams.
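A minimal sketch of a WebClient call might look like the following. It assumes a server at http://localhost:8080 exposing a /students endpoint that produces server-sent events, and a Student class with id and name fields; the URL, the fields and the function name streamStudents are all assumptions made for illustration.

```kotlin
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux

// Hypothetical shape of the entity; the real fields may differ.
data class Student(val id: String? = null, val name: String = "")

// Returns a Flux that emits each Student as the server sends it.
// No request is made until something subscribes to the returned Flux.
fun streamStudents(baseUrl: String = "http://localhost:8080"): Flux<Student> =
    WebClient.create(baseUrl)
        .get()
        .uri("/students")
        .accept(MediaType.TEXT_EVENT_STREAM)
        .retrieve()
        .bodyToFlux(Student::class.java)
```

A caller could then use streamStudents().subscribe { println(it) } to print students as they arrive.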

How to set up WebFlux in a Spring project

Go to Spring Initializr and add two dependencies: Spring Reactive Web and Spring Data Reactive MongoDB.

Spring Initializr Screenshot

To install MongoDB (on macOS, using Homebrew):

$ brew tap mongodb/brew

$ brew install mongodb-community

$ brew services start mongodb-community

StudentRepository.kt

interface StudentRepository: ReactiveCrudRepository<Student, String>

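StudentRepository extends ReactiveCrudRepository, which allows us to perform CRUD operations against the database as reactive streams. One crucial difference between ReactiveCrudRepository and CrudRepository shows up when we save an entity: functions in ReactiveCrudRepository return Mono or Flux publishers, so the entity won’t be saved until something subscribes to the returned publisher. When a controller returns the publisher, WebFlux subscribes for us. Anywhere else we must subscribe ourselves, for example with subscribe(), with block() (which blocks the calling thread), or with awaitFirst() inside a coroutine.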
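The laziness described above can be sketched without a database. FakeStudentStore below is a made-up stand-in for a reactive repository, not part of Spring Data; its save function returns a Mono whose work only runs once it is subscribed to.

```kotlin
import reactor.core.publisher.Mono

// Stand-in for a reactive repository: records which ids were actually "saved".
class FakeStudentStore {
    val saved = mutableListOf<String>()

    // The lambda is deferred until the returned Mono is subscribed to.
    fun save(id: String): Mono<String> = Mono.fromCallable {
        saved += id
        id
    }
}

fun main() {
    val store = FakeStudentStore()
    val pending = store.save("student-1") // nothing has run yet
    println(store.saved)                  // []
    pending.block()                       // subscribing triggers the save
    println(store.saved)                  // [student-1]
}
```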

To make the function findAllStudents reactive we can call the findAll() method provided by the repository:

StudentController.kt

@RestController
@RequestMapping("/students")
class StudentController(private val studentRepository: StudentRepository) {

   @GetMapping(produces = ["text/event-stream"])
   fun findAllStudents(): Flux<Student> {
       return studentRepository.findAll()
   }
}

Note that if we used the WebFlux Router Functions alternative instead of annotated controllers, our implementation would look different, as it allows further customisation; the functionality, however, remains the same.
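For comparison, a Router Functions version of the same endpoint could be sketched roughly as below, using Spring's Kotlin router DSL. The class name StudentRoutes and the bean name studentRouter are invented for this example; Student and StudentRepository are the types used above.

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.router

@Configuration
class StudentRoutes(private val studentRepository: StudentRepository) {

    // Routes GET /students to a handler lambda instead of an annotated method.
    @Bean
    fun studentRouter(): RouterFunction<ServerResponse> = router {
        GET("/students") {
            ServerResponse.ok()
                .contentType(MediaType.TEXT_EVENT_STREAM)
                .body(studentRepository.findAll(), Student::class.java)
        }
    }
}
```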

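The produces = ["text/event-stream"] attribute on @GetMapping tells WebFlux to send the response as server-sent events: the connection between client and server remains open while the Flux is active, and each Student is written to the client as soon as it is emitted. Note that findAll() completes once the current documents have been read, so for the client to be informed of later changes to the Student data, the Flux itself must stay open (for example, one backed by a tailable cursor on a capped MongoDB collection):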

WebFlux diagram

[2]

If you need to stream 0..1 elements, use Mono. In the example below, addStudents saves a Student to the database. The API uses a Mono publisher to return a single instance of the Student entity.

@PostMapping(produces = [MediaType.APPLICATION_JSON_VALUE])
fun addStudents(@Valid @RequestBody student: Student): Mono<Student> {
    return studentRepository.save(student)
}

Using Coroutines as an alternative

An alternative to WebFlux's Reactor-based API is to write the non-blocking code with Kotlin coroutines. Coroutines are another way of running concurrent work efficiently, without dedicating a thread to each task. But why might we want to go down this route? Simply put, the developer can write code sequentially without callbacks, which can make combining and merging streams much easier. The table below shows the coroutine equivalents of some WebFlux functions:

WebFlux/Coroutine Comparison

For input parameters:

If laziness is not needed, fun handler(mono: Mono<T>) becomes fun handler(value: T), since a suspending function can be invoked to obtain the value parameter. If laziness is needed, fun handler(mono: Mono<T>) becomes fun handler(supplier: suspend () -> T). [3] Flow is an asynchronous data stream from kotlinx.coroutines rather than a Reactive Streams type, but it interoperates safely with Reactive Streams using asPublisher and asFlow from the kotlinx-coroutines-reactive module.
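The asFlow and asPublisher conversions mentioned above can be tried out in isolation. The sketch below uses hard-coded numbers, and the function names fluxAsFlow and flowAsFlux are made up for this example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux

// Reactive Streams -> coroutines: view a Flux as a Flow.
fun fluxAsFlow(): Flow<Int> = Flux.just(1, 2, 3).asFlow()

// Coroutines -> Reactive Streams: expose a Flow as a Publisher, wrapped in a Flux.
fun flowAsFlux(): Flux<Int> = Flux.from(flowOf(4, 5, 6).asPublisher())

fun main() = runBlocking<Unit> {
    println(fluxAsFlow().toList())                    // [1, 2, 3]
    println(flowAsFlux().collectList().awaitSingle()) // [4, 5, 6]
}
```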

One noticeable difference lies in the kotlinx.coroutines Flow type: Flow is push-based, with events pushed to collectors as they arrive and backpressure implemented through suspending functions, whilst Flux is a push-pull hybrid, meaning events can be pushed to subscribers but the publisher has to respect the backpressure signalled by the consumer.
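On the Flow side, a slow collector holds the emitter back simply because emit is a suspending call. The sketch below records the order of events to show this; traceSlowCollector is a name invented for the example, and the 50 ms delay is an arbitrary stand-in for slow processing.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.* // flow {} builder and collect
import kotlinx.coroutines.runBlocking

// Records the order in which values are collected and emit() calls return.
fun traceSlowCollector(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val numbers = flow {
        for (i in 1..2) {
            emit(i)                // suspends until the collector has handled i
            events += "emitted $i"
        }
    }
    numbers.collect { value ->
        delay(50)                  // simulate a slow consumer
        events += "collected $value"
    }
    events
}

fun main() {
    println(traceSlowCollector())
    // [collected 1, emitted 1, collected 2, emitted 2]
}
```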

To enable coroutines in Kotlin we need to add the dependencies:

dependencies {
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.2")
}

How does the above code translate to Coroutines?

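For our getter we return a Flow, which is “a cold asynchronous data stream that sequentially emits values and completes normally or with an exception.” [4] Because a Flow is cold, nothing is emitted until a terminal operator such as collect, itself a suspending function, starts the collection.
We must mark a function with the suspend modifier if its body calls other suspending functions. fetchAllStudents only builds and returns the Flow, so it does not need the modifier; WebFlux collects the Flow when it writes the response.

@GetMapping
fun fetchAllStudents(): Flow<Student> {
    // asFlow() requires: import kotlinx.coroutines.reactive.asFlow
    return studentRepository.findAll().asFlow()
}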

The same approach applies to our POST endpoint. Here the function does suspend: we call awaitFirst(), which suspends until the saved Student is available and then returns it.

@PostMapping(produces = [MediaType.APPLICATION_JSON_VALUE])
suspend fun addStudents(@Valid @RequestBody student: Student): Student {
    return studentRepository.save(student).awaitFirst()
}

The Benefits of using Coroutines

  • Coroutines allow us to write asynchronous code in a sequential manner whilst non-blocking operations are dispatched in the background. Calls to other suspending functions read like normal function calls: the caller waits for the called function to finish before using its result and executing the rest of the code, but without blocking the thread.
  • Programmers should be able to adjust to coding with coroutines, as the functionality is similar to WebFlux, as seen in the comparison table above.
  • Skills and principles gained from using coroutines can be applied across Android, back-end and multiplatform projects.
    For example, on Android coroutines allow suspend functions to be called safely from the main thread.
  • Improved performance: coroutines behave like lightweight threads, whereas using schedulers with Reactor can incur overhead due to thread context switching.
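To illustrate the first point, the sketch below writes the same two-step lookup in Reactor style and in coroutine style. findStudentName and findCourseFor are hypothetical functions returning hard-coded values, standing in for two reactive repository calls.

```kotlin
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono

// Hypothetical lookups standing in for two reactive repository calls.
fun findStudentName(id: String): Mono<String> = Mono.just("Ada")
fun findCourseFor(name: String): Mono<String> = Mono.just("Maths")

// Reactor style: the second call is nested inside operators of the first.
fun describeWithReactor(id: String): Mono<String> =
    findStudentName(id).flatMap { name ->
        findCourseFor(name).map { course -> "$name studies $course" }
    }

// Coroutine style: the same steps, written top to bottom.
suspend fun describeWithCoroutines(id: String): String {
    val name = findStudentName(id).awaitSingle()
    val course = findCourseFor(name).awaitSingle()
    return "$name studies $course"
}

fun main() = runBlocking<Unit> {
    println(describeWithReactor("1").awaitSingle()) // Ada studies Maths
    println(describeWithCoroutines("1"))            // Ada studies Maths
}
```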

To conclude, in this blog we’ve given a brief introduction to WebFlux whilst looking at a coroutines equivalent. Unfortunately this topic is too big to cover fully here; for example, if we are not using reactive, non-blocking dependencies throughout, then our application is not truly reactive. The lack of reactive database drivers is a downside: JDBC is blocking, and only a few databases such as MongoDB and Cassandra officially support reactive drivers. However, if you can overlook this, then WebFlux is definitely the web framework for you.

References:

[1] https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html?is-external=true

[2] https://howtodoinjava.com/spring-webflux/spring-webflux-tutorial

[3] https://docs.spring.io/spring/docs/5.2.0.M1/spring-framework-reference/languages.html#how-reactive-translates-to-coroutines

[4] https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/