
Posts

Showing posts from 2017

Idiomatic concurrency: flatMap() vs. parallel() - RxJava FAQ

Bieszczady mountains

Simple, effective and safe concurrency was one of the design principles of RxJava. Yet, ironically, it's probably one of the most misunderstood aspects of this library. Let's take a simple example: imagine we have a bunch of UUIDs and for each one of them we must perform a set of tasks. The first problem is to perform an I/O-intensive operation for each UUID, for example loading an object from a database:

Flowable<UUID> ids = Flowable
        .fromCallable(UUID::randomUUID)
        .repeat()
        .take(100);

ids.subscribe(id -> slowLoadBy(id));

First I'm generating 100 random UUIDs just for the sake of testing. Then for each UUID I'd like to load a record using the following method:

Person slowLoadBy(UUID id) {
    //...
}

The implementation of slowLoadBy() is irrelevant, just keep in mind it's slow and blocking. Using subscribe() to invoke slowLoadBy() has many disadvantages: subscribe() is single-threaded by design…
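For the curious, a minimal sketch of the idiomatic direction this post takes (Person and slowLoadBy() are the ones from the excerpt above): wrap the blocking call in an inner Flowable and subscribe to it on the io() scheduler, so flatMap() performs the loads concurrently.

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.UUID;

Flowable<UUID> ids = Flowable
        .fromCallable(UUID::randomUUID)
        .repeat()
        .take(100);

Flowable<Person> people = ids.flatMap(id ->
        Flowable.fromCallable(() -> slowLoadBy(id))   // defer the blocking load
                .subscribeOn(Schedulers.io()));       // and run it off the calling thread

Note that flatMap() on a Flowable subscribes to a limited number of inner streams at once (128 by default), so the effective concurrency can be tuned with its maxConcurrency parameter.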

Detecting and testing stalled streams - RxJava FAQ

Topiło Lake, Białowieża Forest

Imagine you have a stream that publishes events with unpredictable frequency. Sometimes you can expect dozens of messages per second, but occasionally no events can be seen for several seconds. This can be an issue if your stream is transmitted over a web socket, SSE or any other network protocol. A silent period that lasts too long (a stall) can be interpreted as a network issue. Therefore we often send artificial events (pings) once in a while to:

- make sure clients are still alive
- let clients know we are still alive

As a more concrete example, imagine we have a Flowable<String> stream that produces some events. When there is no event for more than one second, we should send a placeholder "PING" message. When the silence is even longer, there should be a "PING" message every second. How can we implement such a requirement in RxJava? The most obvious, but incorrect, solution is to merge the original stream with pings: Flowable…
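One possible shape of a working solution, sketched here under assumptions rather than as the post's final answer: share the upstream with publish(), and after every real event (re)start a one-second "PING" interval that switchMap() cancels as soon as the next real event arrives.

import io.reactivex.Flowable;
import java.util.concurrent.TimeUnit;

static Flowable<String> withPings(Flowable<String> events) {
    return events.publish(shared -> shared
            .mergeWith(shared
                    .startWith("")                            // arm the ping timer right away
                    .switchMap(e -> Flowable
                            .interval(1, 1, TimeUnit.SECONDS) // first PING after 1s of silence, then every 1s
                            .map(tick -> "PING"))));
}

One caveat of this sketch: the ping timer keeps ticking after the upstream completes, so a real implementation would also have to stop the pings when the source terminates.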

Fixed-rate vs. fixed-delay - RxJava FAQ

Topiło lake in Białowieża Forest

If you are using plain Java, since version 5 we have had a handy scheduler class that allows running tasks at a fixed rate or with a fixed delay:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);

Basically it supports two types of operations:

scheduler.scheduleAtFixedRate(() -> doStuff(), 2, 1, SECONDS);
scheduler.scheduleWithFixedDelay(() -> doStuff(), 2, 1, SECONDS);

scheduleAtFixedRate() will make sure doStuff() is invoked precisely every second with an initial delay of two seconds. Of course garbage collection, context switching, etc. can still affect the precision. scheduleWithFixedDelay() is seemingly similar, however it takes doStuff() processing time into account. For example, if doStuff() runs for 200ms, fixed rate will wait only 800ms until the next run. scheduleWithFixedDelay(), on the other hand, always waits…
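To connect this with RxJava, here is a hedged sketch of how the two policies can be approximated with standard operators (an illustration, not necessarily the article's exact recipe); doStuff() is the same placeholder as above.

import io.reactivex.Flowable;
import java.util.concurrent.TimeUnit;

// fixed rate: a tick is scheduled every second regardless of how long doStuff() takes
Flowable.interval(2, 1, TimeUnit.SECONDS)
        .subscribe(tick -> doStuff());

// fixed delay: repeat() resubscribes to the timer only after the previous
// doStuff() has finished, so there is always a full 1s pause between runs
// (the 2-second initial delay is left out to keep the sketch short)
Flowable.timer(1, TimeUnit.SECONDS)
        .doOnNext(tick -> doStuff())
        .repeat()
        .subscribe();

The first variant mirrors scheduleAtFixedRate() (including the two-second initial delay), while the second mirrors scheduleWithFixedDelay(), because the next one-second countdown starts only once doStuff() returns.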

Streaming large JSON file with Jackson - RxJava FAQ

Oslo coast

In the previous article, we learned how to parse excessively large XML files and turn them into RxJava streams. This time let's look at a large JSON file. We will base our examples on the tiny colors.json containing almost 150 records in the following format:

{
  "aliceblue": [240, 248, 255, 1],
  "antiquewhite": [250, 235, 215, 1],
  "aqua": [0, 255, 255, 1],
  "aquamarine": [127, 255, 212, 1],
  "azure": [240, 255, 255, 1],
  //...

Little-known fact: azure is also a colour and python is a snake. But back to RxJava. This file is tiny but we'll use it to learn some principles. If you follow them you'll be capable of loading and continually processing arbitrarily large, even infinitely long JSON files. First of all, the standard Jackson way is similar to JAXB: loading the whole file into memory and mapping it to Java beans. However, if your file is megabytes or gigabytes in size (because somehow you found JSON…
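To give a taste of where this is going, a hedged sketch (file name and structure taken from the colors.json excerpt, not necessarily the article's exact code): Jackson's streaming JsonParser reads one token at a time and Flowable.generate() emits one colour name per downstream request, so the whole file never has to fit in memory.

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import io.reactivex.Flowable;
import java.io.File;

Flowable<String> colorNames = Flowable.generate(
        () -> {
            JsonParser parser = new JsonFactory().createParser(new File("colors.json"));
            parser.nextToken();                    // consume the opening '{'
            return parser;
        },
        (parser, emitter) -> {
            if (parser.nextToken() == JsonToken.FIELD_NAME) {
                String name = parser.getCurrentName();
                parser.nextToken();                // move to the [r, g, b, a] value...
                parser.skipChildren();             // ...and skip it entirely
                emitter.onNext(name);
            } else {
                emitter.onComplete();              // END_OBJECT: no more colours
            }
        },
        JsonParser::close);

Downstream operators pull colours one by one, so even a multi-gigabyte JSON document would be processed with roughly constant memory.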

Loading files with backpressure - RxJava FAQ

Oslofjord

Processing a file as a stream turns out to be tremendously effective and convenient. Many people seem to forget that since Java 8 (3+ years!) we can very easily turn any file into a stream of lines:

String filePath = "foobar.txt";
try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
    reader.lines()
            .filter(line -> !line.startsWith("#"))
            .map(String::toLowerCase)
            .flatMap(line -> Stream.of(line.split(" ")))
            .forEach(System.out::println);
}

reader.lines() returns a Stream<String> which you can further transform. In this example, we discard lines starting with "#" and explode each line by splitting it into words. This way we get a stream of words as opposed to a stream of lines. Working with text files is almost as simple as working with normal Java collections. In RxJava we have already learned about the generate() operator. It can be used here as well…
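Here is a minimal sketch of that generate()-based idea, assuming the same foobar.txt file as above: a line is read only when the downstream actually requests one, which is the backpressure-aware behaviour the title refers to.

import io.reactivex.Flowable;
import java.io.BufferedReader;
import java.io.FileReader;

Flowable<String> lines = Flowable.generate(
        () -> new BufferedReader(new FileReader("foobar.txt")),  // open lazily, once per subscriber
        (reader, emitter) -> {
            String line = reader.readLine();                     // one line per downstream request
            if (line != null) {
                emitter.onNext(line);
            } else {
                emitter.onComplete();
            }
        },
        BufferedReader::close);                                  // close on completion or cancellation

Unlike reader.lines(), nothing is read ahead: a slow subscriber simply causes the file to be read more slowly.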

Generating backpressure-aware streams with Flowable.generate() - RxJava FAQ

Babia Góra, Poland

RxJava is missing a factory to create an infinite stream of natural numbers. Such a stream is useful e.g. when you want to assign unique sequence numbers to a possibly infinite stream of events by zipping the two of them:

Flowable<Long> naturalNumbers = //???
Flowable<Event> someInfiniteEventStream = //...

Flowable<Pair<Long, Event>> sequenced = Flowable.zip(
        naturalNumbers,
        someInfiniteEventStream,
        Pair::of
);

Implementing naturalNumbers is surprisingly complex. In RxJava 1.x you could briefly get away with an Observable that does not respect backpressure:

import rx.Observable;  //RxJava 1.x

Observable<Long> naturalNumbers = Observable.create(subscriber -> {
    long state = 0;
    //poor solution :-(
    while (!subscriber.isUnsubscribed()) {
        subscriber.onNext(state++);
    }
});

What does it mean that such a stream is not backpressure-aware? Well, basically the stream produces events (ever-incrementing…
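For comparison, here is one backpressure-aware way to build the same stream in RxJava 2.x (a sketch, not necessarily the solution the post arrives at): Flowable.generate() invokes the generator once per requested element, so numbers are produced only as fast as the subscriber consumes them.

import io.reactivex.Flowable;

Flowable<Long> naturalNumbers = Flowable.generate(
        () -> 0L,                       // initial state
        (state, emitter) -> {
            emitter.onNext(state);      // emit exactly one number per request
            return state + 1;           // pass the incremented state to the next invocation
        });

Zipping this with someInfiniteEventStream then works without MissingBackpressureException, because zip() only requests as many numbers as it can pair with events.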

1.x to 2.x migration: Observable vs. Observable: RxJava FAQ

View from Basilica of Notre-Dame de Fourvière

The title is not a mistake. rx.Observable from RxJava 1.x is a completely different beast than io.reactivex.Observable from 2.x. Blindly upgrading the rx dependency and renaming all imports in your project will compile (with minor changes) but does not guarantee the same behavior. In the very early days of the project, Observable in 1.x had no notion of backpressure; backpressure was included later on. What does it actually mean? Let's imagine we have a stream that produces one event every millisecond but it takes 1 second to process one such item. You can see it can't possibly work this way in the long run:

import rx.Observable;  //RxJava 1.x
import rx.schedulers.Schedulers;

Observable
        .interval(1, MILLISECONDS)
        .observeOn(Schedulers.computation())
        .subscribe(
                x -> sleep(Duration.ofSeconds(1)));

MissingBackpressureException creeps in within a few hundred milliseconds. But what does…
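For contrast, a hedged sketch of what the same pipeline can look like after migrating to 2.x (sleep() is the helper from the snippet above and the chosen overflow strategy is just an example): Flowable forces an explicit decision about what to do when the producer outpaces the consumer.

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

Flowable
        .interval(1, TimeUnit.MILLISECONDS)
        .onBackpressureDrop()                        // explicit decision: drop events we cannot keep up with
        .observeOn(Schedulers.computation())
        .subscribe(x -> sleep(Duration.ofSeconds(1)));

io.reactivex.Observable, on the other hand, has no notion of backpressure at all in 2.x, so the equivalent Observable pipeline would buffer events without bound rather than throw MissingBackpressureException.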

flatMap() and the order of events - RxJava FAQ

Parc de la Tête d'Or, Lyon

As we already discovered, flatMap() does not preserve the order of the original stream. Let's illustrate this using the GeoNames API example from the previous article:

public interface GeoNames {
    Flowable<Long> populationOf(String city);
}

By requesting the population of multiple cities using flatMap() we have no guarantee that they will arrive in order:

Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid");
cities
        .flatMap(geoNames::populationOf)
        .subscribe(response -> log.info("Population: {}", response));

The output is somewhat surprising:

17:09:49.838 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
17:09:49.838 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
17:09:49.838 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
17:09:49.838 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
17:09:49.939 | Rx-4 | <-- 200 OK …
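As a hedged teaser of one way to get the responses back in order (geoNames and log are the same collaborators as in the snippet above): concatMapEager() still fires all requests up front but re-emits the responses in the order of the source cities.

import io.reactivex.Flowable;

Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid");
cities
        .concatMapEager(geoNames::populationOf)
        .subscribe(population -> log.info("Population: {}", population));

The price is that responses for later cities may need to be buffered until the earlier ones arrive.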

flatMap() vs. concatMap() vs. concatMapEager() - RxJava FAQ

Frozen Stawy Cietrzewia

There are three seemingly similar operators in RxJava 2.x: flatMap(), concatMap() and concatMapEager(). All of them accept the same argument - a function from the original stream's individual items to a (sub-)stream of arbitrary type. In other words, if you have a Flowable<T> you provide a function from T to Flowable<R> for an arbitrary type R. After applying any of these operators you end up with a Flowable<R>. So how are they different?

Sample project

First let's build a sample application. We will use the Retrofit2 HTTP client wrapper that has built-in plugins for RxJava2. Our task is to leverage the GeoNames API in order to find the population of any city in the world. The interface looks as follows:

public interface GeoNames {
    Flowable<Long> populationOf(String city);
}

The implementation of this interface is auto-generated by Retrofit, scroll down to see the glue source code. For the time being just assume we have a fu…
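To preview the difference in one place, a short sketch under the assumption that geoNames is a working implementation of the interface above:

import io.reactivex.Flowable;

Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid");

Flowable<Long> interleaved = cities.flatMap(geoNames::populationOf);        // subscribes eagerly, emits in arrival order
Flowable<Long> sequential  = cities.concatMap(geoNames::populationOf);      // one city at a time, source order preserved
Flowable<Long> ordered     = cities.concatMapEager(geoNames::populationOf); // subscribes eagerly, source order preserved

The trade-offs between these three behaviours are what the rest of the post explores.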

Eager subscription - RxJava FAQ

Warsaw center from Park Szczęśliwicki

While teaching and mentoring RxJava, as well as after authoring a book, I noticed some areas are especially problematic. I decided to publish a bunch of short tips that address the most common pitfalls. This is the first part. Observables and Flowables are lazy by nature. This means that no matter how heavy or long-running the logic you place inside your Flowable is, it will get evaluated only when someone subscribes. And also as many times as someone subscribes. This is illustrated by the following code snippet:

private String slow() throws InterruptedException {
    logger.info("Running");
    TimeUnit.SECONDS.sleep(1);
    return "abc";
}

//...
Flowable<String> flo = Flowable.fromCallable(this::slow);
logger.info("Created");
flo.subscribe();
flo.subscribe();
logger.info("Done");

Such an Observable or Flowable will inevitably print:

19:37:57.368 [main] - Created
19:37:57.379 [main] - Running
1…
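A hedged sketch of one way to avoid re-running the expensive logic (slow() and logger as in the snippet above; an illustration rather than necessarily the tip this post ends with): cache() subscribes to the upstream only once and replays the result to every subsequent subscriber.

import io.reactivex.Flowable;

Flowable<String> flo = Flowable
        .fromCallable(this::slow)
        .cache();

flo.subscribe();   // "Running" is logged once, here
flo.subscribe();   // the cached "abc" is replayed; slow() is not called again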