Reactive emoji tracker with WebClient and Reactor: aggregating data

In the first part we managed to connect to and consume an SSE stream that looks like this:

    data:{"1F60D":1}
    data:{"1F3A8":1,"1F48B":1,"1F499":1,"1F602":1,"2764":1}
    data:{"1F607":1,"2764":2}

Each message represents the number of various emojis that appeared on Twitter since the previous message. After a few transformations, we got a stream of hexadecimal Unicode values for each emoji. E.g. for {"1F607":1,"2764":2} we produce three events: "1F607", "2764", "2764". This is how we achieved it:

    import org.springframework.http.codec.ServerSentEvent;
    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    final Flux<String> stream = WebClient
            .create("")
            .get().uri("/subscribe/eps") …
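The expansion step described above (one event per counted occurrence) can be sketched on its own with plain java.util.stream instead of Reactor. This is only an illustration of the transformation, not the article's actual pipeline; the class name EmojiExpander and the method expand() are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the original article.
class EmojiExpander {

    // Repeats every hexadecimal code as many times as its counter says,
    // so {"1F607":1,"2764":2} becomes ["1F607", "2764", "2764"].
    static List<String> expand(Map<String, Integer> counters) {
        return counters.entrySet().stream()
                .flatMap(e -> Collections.nCopies(e.getValue(), e.getKey()).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the order in which the counters were inserted
        Map<String, Integer> message = new LinkedHashMap<>();
        message.put("1F607", 1);
        message.put("2764", 2);
        System.out.println(expand(message)); // [1F607, 2764, 2764]
    }
}
```

In a reactive pipeline the same idea would typically be expressed with a flatMap-style operator over each parsed message.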

Reactive emoji tracker with WebClient and Reactor: consuming SSE

In this article we will learn how to consume an infinite SSE (server-sent events) stream with Spring's WebClient and Project Reactor. WebClient is a new HTTP client in Spring 5, entirely asynchronous and natively supporting Flux and Mono types. You can technically open thousands of concurrent HTTP connections with just a handful of threads. With the standard RestTemplate, one HTTP connection always needs at least one thread.

As an example, let's connect to a cute little site that shows emojis being used in real time on Twitter. Looks quite cool! All credits go to Matthew Rothenberg, the creator of that site. It's very dynamic so there obviously has to be some push mechanism underneath. I wore my hacker glasses and after hours of penetration testing, I discovered its streaming URL in Chrome DevTools. If you connect to it, you'll get a fast stream of emoji counters:

$ curl -v http://emojitrack-gostr…

Spring Boot 2: Fluxes, from Elasticsearch to controller

The final piece of the puzzle in our series is exposing reactive APIs via RESTful interfaces. Previously we were seeding our Elasticsearch with some sample fake data. Now it's about time to expose indexing functionality through some API. Let's start with a simple adapter to our indexing engine:

    import lombok.RequiredArgsConstructor;
    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.xcontent.XContentType;
    import org.springframework.stereotype.Component;
    import reactor.core.publisher.Mono;
    import reactor.core.publisher.MonoSink;

    @Component
    @RequiredArgsConstructor
    class ElasticAdapter {

        private final RestHighLevelClient client;
        private final ObjectMapper objectMapper;

        Mono<IndexResponse> index(Person doc) {
            return indexDoc(doc);
        }

        private void doIndex(Pe…

Spring Boot 2: Migrating from Dropwizard metrics to Micrometer

Spring Boot 2 is around the corner. One of the minor changes is the replacement of Dropwizard Metrics with Micrometer. The migration path is fairly straightforward and Micrometer actually provides a cleaner API. With Metrics, you have to inject MetricRegistry wherever you need some metrics (see: Monitoring and measuring reactive application with Dropwizard Metrics). This has many drawbacks:

- we are mixing business and technical dependencies in our components
- therefore I am sometimes reluctant to add new metrics because it requires me to inject MetricRegistry
- also MetricRegistry must be stubbed in unit tests

Micrometer's tagline is:

Think SLF4J, but for metrics

It's actually quite accurate. Whenever I need a Logger I don't inject LoggerFactory; instead I simply use static methods available everywhere. The same goes for Micrometer: I simply use static factory methods on the globally available Metrics class:

    private final Timer indexTimer = Metrics.timer("es.timer");
    private f…

Monitoring and measuring reactive application with Dropwizard Metrics

In the previous article we created a simple indexing code that hammers ElasticSearch with thousands of concurrent requests. The only way to monitor the performance of our system was an old-school logging statement:

    .window(Duration.ofSeconds(1))
    .flatMap(Flux::count)
    .subscribe(winSize -> log.debug("Got {} responses in last second", winSize));

It's fine, but on a production system, we'd rather have some centralized monitoring and charting solution for gathering various metrics. This becomes especially important once you have hundreds of different applications in thousands of instances. Having a single graphical dashboard, aggregating all important information, becomes crucial. We need two components in order to collect some metrics:

- publishing metrics
- collecting and visualizing them

Publishing metrics using Dropwizard Metrics

In Spring Boot 2 Dropwizard Metrics were replaced by Micrometer. This article uses the former, the next one will show the latter solution in pr…

Spring, Reactor and Elasticsearch: benchmarking with fake test data

In the previous article we created a simple adapter from ElasticSearch's API to Reactor's Mono, that looks like this:

    import reactor.core.publisher.Mono;

    private Mono<IndexResponse> indexDoc(Doc doc) {
        //...
    }

Now we would like to run this method at controlled concurrency level, millions of times. Basically, we want to see how our indexing code behaves under load, benchmark it.
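To make "controlled concurrency level" concrete, here is a minimal sketch that uses only the JDK (a Semaphore and a thread pool) rather than Reactor, which is what this series is built on. It runs a task a given number of times while never allowing more than a fixed number of invocations in flight. The class BoundedLoad and its run() method are hypothetical names introduced for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the original article.
class BoundedLoad {

    // Runs `task` `total` times with at most `concurrency` invocations in flight.
    // Returns the highest number of simultaneously running tasks that was observed.
    static int run(int total, int concurrency, Runnable task) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(concurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < total; i++) {
                permits.acquire(); // blocks the submitting thread once the limit is reached
                pool.submit(() -> {
                    try {
                        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                        task.run();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // lets the next task start
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while generating load", e);
        } finally {
            pool.shutdownNow();
        }
        return peak.get();
    }
}
```

Calling BoundedLoad.run(1_000_000, 1_000, someTask) would keep up to a thousand invocations running at once; a blocking semaphore is only a stand-in for the non-blocking back-pressure a reactive pipeline provides.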

Fake data with jFairy

First, we need some good looking test data. For that purpose, we'll use a handy jFairy library. The document we'll index is a simple POJO:

    @Value
    class Doc {
        private final String username;
        private final String json;
    }

The generation logic is wrapped inside a Java class:

    import io.codearte.jfairy.Fairy;
    import io.codearte.jfairy.producer.person.Address;
    import io.codearte.jfairy.producer.person.Person;
    import org.apache.commons.lang3.RandomUtils;

    @Component
    class PersonGenerator {

        private final ObjectMapper objectMapper;
        private final Fairy fairy;…

Spring, Reactor and Elasticsearch: from callbacks to reactive streams

Spring 5 (and Boot 2, when it arrives in a couple of weeks) is a revolution. Not the "annotations over XML" or "Java classes over annotations" type of revolution. It's truly a revolutionary framework that enables writing a brand new class of applications. Over the recent years, I became a little bit intimidated by this framework. "Spring Cloud being framework that simplifies the usage of Spring Boot, being a framework that simplifies the usage of Spring, being a framework, that simplifies enterprise development." start.spring.io (also known as "start... dot spring... dot I... O") lists 120 different modules (!) that you can add to your service. Spring these days became an enormous umbrella project and I can imagine why some people (still!) prefer Java EE (or whatever it's called these days).

But Spring 5 brings the reactive revolution. It's no longer only a wrapper around blocking servlet API and various web frameworks. Spring 5, on…

Idiomatic concurrency: flatMap() vs. parallel() - RxJava FAQ

Simple, effective and safe concurrency was one of the design principles of RxJava. Yet, ironically, it's probably one of the most misunderstood aspects of this library. Let's take a simple example: imagine we have a bunch of UUIDs and for each one of them we must perform a set of tasks. The first problem is to perform an I/O-intensive operation for each UUID, for example loading an object from a database:

    Flowable<UUID> ids = Flowable
            .fromCallable(UUID::randomUUID)
            .repeat()
            .take(100);

    ids.subscribe(id -> slowLoadBy(id));

First I'm generating 100 random UUIDs just for the sake of testing. Then for each UUID I'd like to load a record using the following method:

    Person slowLoadBy(UUID id) {
        //...
    }

The implementation of slowLoadBy() is irrelevant, just keep in mind it's slow and blocking. Using subscribe() to invoke slowLoadBy() has many disadvantages:

subscribe() is single-threaded by design and there is no way around it. Each UUID…

Detecting and testing stalled streams - RxJava FAQ

Imagine you have a stream that publishes events with unpredictable frequency. Sometimes you can expect dozens of messages per second, but occasionally no events can be seen for several seconds. This can be an issue if your stream is transmitted over a web socket, SSE or any other network protocol. A silent period that lasts too long (a stall) can be interpreted as a network issue. Therefore we often send artificial events (pings) once in a while just to make sure:

- clients are still alive
- let clients know we are still alive

A more concrete example: imagine we have a Flowable<String> stream that produces some events. When there is no event for more than one second, we should send a placeholder "PING" message. When the silence is even longer, there should be a "PING" message every second. How can we implement such a requirement in RxJava? The most obvious, but incorrect solution is to merge the original stream with pings:

    Flowable<String> events = //...
    Flowable<String> …
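Before looking at any RxJava operators, it helps to pin down what the requirement expects as output. The sketch below is plain Java, not the RxJava solution: given the timestamps of real events, it computes when "PING" messages should be emitted. The class PingSchedule, the method pingTimes() and the millisecond timestamps are assumptions made for illustration, and silence before the first event is deliberately ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original article.
class PingSchedule {

    static final long PING_INTERVAL_MS = 1000;

    // eventTimes: ascending timestamps (ms) of real events
    // endTime:    moment (ms) until which the stream is observed
    // Returns the timestamps at which a "PING" is expected.
    static List<Long> pingTimes(List<Long> eventTimes, long endTime) {
        List<Long> pings = new ArrayList<>();
        for (int i = 0; i < eventTimes.size(); i++) {
            long silenceStart = eventTimes.get(i);
            long silenceEnd = i + 1 < eventTimes.size() ? eventTimes.get(i + 1) : endTime;
            // one ping for every full second of silence after the last real event
            for (long t = silenceStart + PING_INTERVAL_MS; t < silenceEnd; t += PING_INTERVAL_MS) {
                pings.add(t);
            }
        }
        return pings;
    }

    public static void main(String[] args) {
        // events at 0 ms, 500 ms and 4000 ms, observed until 5500 ms
        System.out.println(pingTimes(List.of(0L, 500L, 4000L), 5500L)); // [1500, 2500, 3500, 5000]
    }
}
```

Note that pings are counted from the last real event, not from a global clock. That is the property a correct implementation has to preserve.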

Fixed-rate vs. fixed-delay - RxJava FAQ

If you are using plain Java, since version 5 we have a handy scheduler class that allows running tasks at fixed rate or with fixed delay:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;

    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);

Basically it supports two types of operations:

    scheduler.scheduleAtFixedRate(() -> doStuff(), 2, 1, SECONDS);
    scheduler.scheduleWithFixedDelay(() -> doStuff(), 2, 1, SECONDS);

scheduleAtFixedRate() will make sure doStuff() is invoked precisely every second with an initial delay of two seconds. Of course garbage collection, context-switching, etc. still can affect the precision. scheduleWithFixedDelay() is seemingly similar, however it takes doStuff() processing time into account. For example, if doStuff() runs for 200ms, fixed rate will wait only 800ms until next retry. scheduleWithFixedDelay() on the other hand, always waits for the same amount of time (1 second in our …
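The difference between the two modes can be made concrete with a small timeline calculation. The sketch below does not schedule anything; it only computes idealized start times in milliseconds (ignoring garbage collection and other jitter) for a task of a given duration. The class ScheduleTimeline and its methods are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original article.
class ScheduleTimeline {

    // Fixed rate: runs are planned at initialDelay + n * period. A run that
    // overshoots its period delays the next one, but runs never overlap.
    static List<Long> fixedRateStarts(long initialDelay, long period, long taskDuration, int runs) {
        List<Long> starts = new ArrayList<>();
        long previousEnd = 0;
        for (int n = 0; n < runs; n++) {
            long start = Math.max(initialDelay + n * period, previousEnd);
            starts.add(start);
            previousEnd = start + taskDuration;
        }
        return starts;
    }

    // Fixed delay: the next run starts `delay` after the previous run finished.
    static List<Long> fixedDelayStarts(long initialDelay, long delay, long taskDuration, int runs) {
        List<Long> starts = new ArrayList<>();
        long start = initialDelay;
        for (int n = 0; n < runs; n++) {
            starts.add(start);
            start += taskDuration + delay;
        }
        return starts;
    }

    public static void main(String[] args) {
        // 2 s initial delay, 1 s period/delay, doStuff() taking 200 ms
        System.out.println(fixedRateStarts(2000, 1000, 200, 4));  // [2000, 3000, 4000, 5000]
        System.out.println(fixedDelayStarts(2000, 1000, 200, 4)); // [2000, 3200, 4400, 5600]
    }
}
```

With a 200 ms task, fixed rate keeps a steady one-second cadence, while fixed delay drifts by the task duration on every run.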