Posts

Showing posts from August, 2017

Loading files with backpressure - RxJava FAQ

Processing a file as a stream turns out to be tremendously effective and convenient. Many people seem to forget that since Java 8 (3+ years!) we can very easily turn any file into a stream of lines:

    String filePath = "foobar.txt";
    try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
        reader.lines()
            .filter(line -> !line.startsWith("#"))
            .map(String::toLowerCase)
            .flatMap(line -> Stream.of(line.split(" ")))
            .forEach(System.out::println);
    }

reader.lines() returns a Stream<String> which you can further transform. In this example, we discard lines starting with "#" and explode each line by splitting it into words. This way we get a stream of words as opposed to a stream of lines. Working with text files is almost as simple as working with normal Java collections. In RxJava we already learned about the generate() operator. It can be used here as well to create robust stre…
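The lines-to-words pipeline above can be tried without a file on disk. The following is only a minimal sketch under a few assumptions: the class name WordsFromLines and the helper words() are made up for illustration, and a StringReader stands in for the FileReader so that the example is self-contained.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class, not part of the original post.
public class WordsFromLines {

    // Same pipeline as in the post, but collecting the words instead of printing them.
    public static List<String> words(BufferedReader reader) {
        return reader.lines()
                .filter(line -> !line.startsWith("#"))       // drop comment lines
                .map(String::toLowerCase)
                .flatMap(line -> Stream.of(line.split(" "))) // one line becomes many words
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws Exception {
        // Assumption: in-memory text replaces foobar.txt so the sketch runs anywhere.
        String content = "# a comment\nHello RxJava World\nSecond LINE\n";
        try (BufferedReader reader = new BufferedReader(new StringReader(content))) {
            System.out.println(words(reader)); // prints [hello, rxjava, world, second, line]
        }
    }
}
```

Swapping the StringReader for new FileReader(filePath) should give the behavior described in the post, with the usual caveat that FileReader uses the platform default charset.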

Generating backpressure-aware streams with Flowable.generate() - RxJava FAQ

RxJava is missing a factory to create an infinite stream of natural numbers. Such a stream is useful e.g. when you want to assign unique sequence numbers to a possibly infinite stream of events by zipping both of them:

    Flowable<Long> naturalNumbers = //???
    Flowable<Event> someInfiniteEventStream = //...
    Flowable<Pair<Long, Event>> sequenced = Flowable.zip(
        naturalNumbers,
        someInfiniteEventStream,
        Pair::of
    );

Implementing naturalNumbers is surprisingly complex. In RxJava 1.x you could briefly get away with an Observable that does not respect backpressure:

    import rx.Observable; //RxJava 1.x

    Observable<Long> naturalNumbers = Observable.create(subscriber -> {
        long state = 0;
        //poor solution :-(
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(state++);
        }
    });

What does it mean that such a stream is not backpressure-aware? Well, basically the stream produces events (ever-incrementing state variable) as fast …
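To make the contrast with the while loop above concrete, here is a plain-Java sketch of the opposite idea: a producer that emits only as many numbers as the consumer asked for. This is not RxJava code and not how Flowable.generate() is implemented. The class DemandDrivenNaturals and its request() method are hypothetical names, chosen only to illustrate demand-driven emission.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical illustration of demand-driven emission, not an RxJava type.
public class DemandDrivenNaturals {

    private long state = 0; // next natural number to hand out

    // Emits exactly n values downstream, then stops until more are requested.
    public void request(long n, LongConsumer downstream) {
        for (long i = 0; i < n; i++) {
            downstream.accept(state++);
        }
    }

    public static void main(String[] args) {
        DemandDrivenNaturals naturals = new DemandDrivenNaturals();
        List<Long> received = new ArrayList<>();

        naturals.request(3, v -> received.add(v)); // consumer is ready for three items
        naturals.request(2, v -> received.add(v)); // later it asks for two more

        System.out.println(received); // prints [0, 1, 2, 3, 4]
    }
}
```

The point of the sketch is that the producer never runs ahead of the consumer: nothing is emitted between the two request() calls, whereas the loop in the 1.x snippet keeps calling onNext() regardless of demand.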

1.x to 2.x migration: Observable vs. Observable: RxJava FAQ

The title is not a mistake. rx.Observable from RxJava 1.x is a completely different beast than io.reactivex.Observable from 2.x. Blindly upgrading the rx dependency and renaming all imports in your project will compile (with minor changes) but does not guarantee the same behavior. In the very early days of the project, Observable in 1.x had no notion of backpressure, but later on backpressure was included. What does it actually mean? Let's imagine we have a stream that produces one event every millisecond but it takes 1 second to process one such item. You see it can't possibly work this way in the long run:

    import rx.Observable; //RxJava 1.x
    import rx.schedulers.Schedulers;

    Observable
        .interval(1, MILLISECONDS)
        .observeOn(Schedulers.computation())
        .subscribe(
            x -> sleep(Duration.ofSeconds(1)));

MissingBackpressureException creeps in within a few hundred milliseconds. But what does this exception mean? Well, basically it's a safety net…

flatMap() and the order of events - RxJava FAQ

As we already discovered, flatMap() does not preserve the order of the original stream. Let's illustrate this using the GeoNames API example from the previous article:

    public interface GeoNames {
        Flowable<Long> populationOf(String city);
    }

By requesting the population of multiple cities using flatMap() we have no guarantee that they will arrive in order:

    Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid");
    cities
        .flatMap(geoNames::populationOf)
        .subscribe(response -> log.info("Population: {}", response));

The output is somewhat surprising:

    17:09:49.838 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
    17:09:49.838 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
    17:09:49.838 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
    17:09:49.838 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
    17:09:49.939 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (98ms)
    17:09:49.939 | R…
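The effect in the log can be imitated with nothing but the JDK. The sketch below is an analogy, not RxJava: CompletableFuture stands in for the HTTP responses, the class ArrivalOrderDemo and both methods are hypothetical, and the completion order passed in main() is made up for illustration. It contrasts collecting results as they arrive (roughly what the flatMap() example observes) with collecting them in the order they were requested.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical JDK-only analogy; the futures simulate responses from the GeoNames API.
public class ArrivalOrderDemo {

    // Results are recorded the moment each simulated response completes.
    public static List<String> arrivalOrder(List<String> requested, List<String> completionOrder) {
        Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
        List<String> arrived = new ArrayList<>();
        for (String city : requested) {
            CompletableFuture<String> response = new CompletableFuture<>();
            response.thenAccept(arrived::add); // callback fires when the response completes
            pending.put(city, response);
        }
        for (String city : completionOrder) {
            pending.get(city).complete(city);  // simulate a response coming back
        }
        return arrived;
    }

    // All requests are in flight at once, but results are read in request order.
    // Every requested city must appear in completionOrder, otherwise join() blocks.
    public static List<String> sourceOrder(List<String> requested, List<String> completionOrder) {
        Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
        for (String city : requested) {
            pending.put(city, new CompletableFuture<>());
        }
        for (String city : completionOrder) {
            pending.get(city).complete(city);
        }
        return pending.values().stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> requested = Arrays.asList("Warsaw", "Paris", "London", "Madrid");
        // Assumption: an invented completion order, not taken from the log.
        List<String> completed = Arrays.asList("Madrid", "London", "Paris", "Warsaw");

        System.out.println(arrivalOrder(requested, completed)); // prints [Madrid, London, Paris, Warsaw]
        System.out.println(sourceOrder(requested, completed));  // prints [Warsaw, Paris, London, Madrid]
    }
}
```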

flatMap() vs. concatMap() vs. concatMapEager() - RxJava FAQ

There are three seemingly similar operators in RxJava 2.x: flatMap(), concatMap() and concatMapEager(). All of them accept the same argument - a function from the original stream's individual item to a (sub-)stream of arbitrary type. In other words, if you have a Flowable<T> you provide a function from T to Flowable<R> for an arbitrary type R. After applying any of these operators you end up with Flowable<R>. So how are they different?

Sample project

First let's build a sample application. We will use the Retrofit2 HTTP client wrapper that has built-in plugins for RxJava2. Our task is to leverage the GeoNames API in order to find the population of any city in the world. The interface looks as follows:

    public interface GeoNames {
        Flowable<Long> populationOf(String city);
    }

The implementation of this interface is auto-generated by Retrofit, scroll down to see glue source code. For the time being just assume we have a function that takes a String with city name an…