Skip to main content


Showing posts from September, 2017

Idiomatic concurrency: flatMap() vs. parallel() - RxJava FAQ

Bieszczady mountains Simple, effective and safe concurrency was one of the design principles of RxJava. Yet, ironically, it's probably one of the most misunderstood aspects of this library. Let's take a simple example: imagine we have a bunch of UUID s and for each one of them we must perform a set of tasks. The first problem is to perform I/O intensive operation per each UUID , for example loading an object from a database: Flowable<UUID> ids = Flowable .fromCallable(UUID::randomUUID) .repeat() .take(100); ids.subscribe(id -> slowLoadBy(id)); First I'm generating 100 random UUID s just for the sake of testing. Then for each UUID I'd like to load a record using the following method: Person slowLoadBy(UUID id) { //... } The implementation of slowLoadBy() is irrelevant, just keep in mind it's slow and blocking. Using subscribe() to invoke slowLoadBy() has many disadvantages: subscribe() is single-threaded by desi

Detecting and testing stalled streams - RxJava FAQ

Topiło Lake, Białowieża Forest Imagine you have a stream that publishes events with unpredictable frequency. Sometimes you can expect dozens of messages per second, but occasionally no events can be seen for several seconds. This can be an issue if your stream is transmitted over web socket, SSE or any other network protocol. Silent period taking too long (stall) can be interpreted as network issue. Therefore we often send artificial events ( pings ) once in a while just to make sure: clients are still alive let clients know we are still alive A more concrete example, imagine we have a Flowable<String> stream that produces some events. When there is no event for more than one second, we should send a placeholder "PING" message. When the silence is even longer, there should be a "PING" message every second. How can we implement such a requirement in RxJava? The most obvious, but incorrect solution is to merge original stream with pings : Flowabl

Fixed-rate vs. fixed-delay - RxJava FAQ

Topiło lake in Białowieża Forest If you are using plain Java, since version 5 we have a handy scheduler class that allows running tasks at fixed rate or with fixed delay: import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10); Basically it supports two types of operations: scheduler.scheduleAtFixedRate(() -> doStuff(), 2, 1, SECONDS); scheduler.scheduleWithFixedDelay(() -> doStuff(), 2, 1, SECONDS); scheduleAtFixedRate() will make sure doStuff() is invoked precisely every second with an initial delay of two seconds. Of course garbage collection, context-switching, etc. still can affect the precision. scheduleWithFixedDelay() is seemingly similar, however it takes doStuff() processing time into account. For example, if doStuff() runs for 200ms, fixed rate will wait only 800ms until next retry. scheduleWithFixedDelay() on the other hand, always w

Streaming large JSON file with Jackson - RxJava FAQ

Oslo coast In the previous article, we learned how to parse excessively large XML files and turn them into RxJava streams. This time let's look at a large JSON file. We will base our examples on tiny colors.json containing almost 150 records of such format: { "aliceblue": [240, 248, 255, 1], "antiquewhite": [250, 235, 215, 1], "aqua": [0, 255, 255, 1], "aquamarine": [127, 255, 212, 1], "azure": [240, 255, 255, 1], //... Little known fact: azure is also a colour and python is a snake. But back to RxJava. This file is tiny but we'll use it to learn some principles. If you follow them you'll be capable of loading and continually processing arbitrarily large, even infinitely long JSON files. First of all the standard " Jackson " way is similar to JAXB: loading the whole file into memory and mapping it to Java beans. However, if your file is in megabyte or gigabytes (because somehow you found JSO