`UUID`s and for each one of them we must perform a set of tasks. The first problem is to perform an I/O-intensive operation for each `UUID`, for example loading an object from a database:

    Flowable<UUID> ids = Flowable
            .fromCallable(UUID::randomUUID)
            .repeat()
            .take(100);

    ids.subscribe(id -> slowLoadBy(id));

First I'm generating 100 random `UUID`s just for the sake of testing. Then for each `UUID` I'd like to load a record using the following method:

    Person slowLoadBy(UUID id) {
        //...
    }

The implementation of `slowLoadBy()` is irrelevant, just keep in mind it's slow and blocking. Using `subscribe()` to invoke `slowLoadBy()` has many disadvantages:

- `subscribe()` is single-threaded by design and there is no way around it. Each `UUID` is loaded sequentially
- when you call `subscribe()` you cannot transform the `Person` object further. It's a terminal operation

A natural next step is to `map()` each `UUID`:

    Flowable<Person> people = ids
            .map(id -> slowLoadBy(id));  //BROKEN

This is very readable but unfortunately broken. Operators, just like subscribers, are single-threaded. This means that at any given time only one `UUID` can be mapped; no concurrency is allowed here either. To make matters worse, we are inheriting the thread/worker from upstream. This has several drawbacks. If the upstream produces events using some dedicated scheduler, we will hijack threads from that scheduler. For example many operators, like `interval()`, use the `Schedulers.computation()` thread pool transparently. We suddenly start to perform I/O-intensive operations on a pool that is totally unsuitable for that. Moreover, we slow down the whole pipeline with this one blocking, sequential step. Very, very bad.

You might have heard about the `subscribeOn()` operator and how it enables concurrency. Indeed, but you have to be very careful when applying it. The following sample is (again) wrong:

    import io.reactivex.schedulers.Schedulers;

    Flowable<Person> people = ids
            .subscribeOn(Schedulers.io())
            .map(id -> slowLoadBy(id));  //BROKEN

The code snippet above is still broken. `subscribeOn()` (and `observeOn()` for that matter) merely switches execution to a different worker (thread) without introducing any concurrency. The stream still processes all events sequentially, but on a different thread. In other words, rather than consuming events sequentially on a thread inherited from upstream, we now consume them sequentially on an `io()` thread. So what about this mythical `flatMap()` operator?
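
The difference between switching threads and adding concurrency can be shown without RxJava at all. Below is a minimal plain-Java sketch, an analogy only and not how RxJava is implemented. `ThreadHopDemo`, `slowLoad()`, `hopOnly()` and `perItem()` are hypothetical names invented for this illustration. Shipping the whole loop to one worker resembles what `subscribeOn()` did to the pipeline above: the work leaves the caller's thread, yet items are still handled one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadHopDemo {

    // Hypothetical stand-in for a slow, blocking call; returns the worker's name
    static String slowLoad(int id) throws InterruptedException {
        Thread.sleep(20);
        return Thread.currentThread().getName();
    }

    // The whole loop runs as ONE task: a different thread, but still sequential
    static Set<String> hopOnly(int items) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            Set<String> threads = ConcurrentHashMap.newKeySet();
            Callable<Void> wholeLoop = () -> {
                for (int i = 0; i < items; i++) {
                    threads.add(slowLoad(i));
                }
                return null;
            };
            pool.submit(wholeLoop).get();
            return threads;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Every item is its own task: this is where concurrency actually appears
    static Set<String> perItem(int items) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < items; i++) {
                final int id = i;
                Callable<String> task = () -> slowLoad(id);
                futures.add(pool.submit(task));
            }
            Set<String> threads = ConcurrentHashMap.newKeySet();
            for (Future<String> f : futures) {
                threads.add(f.get());
            }
            return threads;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("hop only: " + hopOnly(8).size() + " thread(s)");
        System.out.println("per item: " + perItem(8).size() + " thread(s)");
    }
}
```

The first variant always reports a single worker thread, no matter how large the pool is. Only the second one, which creates a separate unit of work per item, spreads the load over several threads.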
`flatMap()` operator to the rescue
The `flatMap()` operator enables concurrency by splitting a stream of events into a stream of substreams. But first, one more broken example:

    Flowable<Person> asyncLoadBy(UUID id) {
        return Flowable.fromCallable(() -> slowLoadBy(id));
    }

    Flowable<Person> people = ids
            .subscribeOn(Schedulers.io())
            .flatMap(id -> asyncLoadBy(id));  //BROKEN

Oh gosh, this is still broken! The `flatMap()` operator logically does two things:

- applying the transformation (`id -> asyncLoadBy(id)`) on each upstream event - this produces `Flowable<Flowable<Person>>`. This makes sense: for each upstream `UUID` we get a `Flowable<Person>`, so we end up with a stream of streams of `Person` objects
- then `flatMap()` tries to subscribe to all of these inner sub-streams at once. Whenever any of the substreams emits a `Person` event, it is transparently passed as an outcome of the outer `Flowable`.

`flatMap()` only creates and subscribes to the first 128 substreams (the default, adjustable with the optional `maxConcurrency` parameter). Also, when the last substream completes, the outer stream of `Person` completes as well. Now, why on earth is this broken? RxJava doesn't introduce any thread pool unless explicitly asked for. For example this piece of code is still blocking:

    log.info("Setup");
    Flowable<String> blocking = Flowable
            .fromCallable(() -> {
                log.info("Starting");
                TimeUnit.SECONDS.sleep(1);
                log.info("Done");
                return "Hello, world!";
            });
    log.info("Created");
    blocking.subscribe(s -> log.info("Received {}", s));
    log.info("Done");

Look at the output carefully, especially at the order of events and the threads involved:

    19:57:28.847 | INFO | main | Setup
    19:57:28.943 | INFO | main | Created
    19:57:28.949 | INFO | main | Starting
    19:57:29.954 | INFO | main | Done
    19:57:29.955 | INFO | main | Received Hello, world!
    19:57:29.957 | INFO | main | Done

No concurrency whatsoever, no extra threads. Merely wrapping blocking code in a `Flowable` doesn't magically add concurrency. You have to explicitly use... `subscribeOn()`:

    log.info("Setup");
    Flowable<String> blocking = Flowable
            .fromCallable(() -> {
                log.info("Starting");
                TimeUnit.SECONDS.sleep(1);
                log.info("Done");
                return "Hello, world!";
            })
            .subscribeOn(Schedulers.io());
    log.info("Created");
    blocking.subscribe(s -> log.info("Received {}", s));
    log.info("Done");

The output this time is more promising:

    19:59:10.547 | INFO | main | Setup
    19:59:10.653 | INFO | main | Created
    19:59:10.662 | INFO | main | Done
    19:59:10.664 | INFO | RxCachedThreadScheduler-1 | Starting
    19:59:11.668 | INFO | RxCachedThreadScheduler-1 | Done
    19:59:11.669 | INFO | RxCachedThreadScheduler-1 | Received Hello, world!

But we did use `subscribeOn()` last time, what's going on? Well, `subscribeOn()` on the outer stream level basically said that all events should be processed sequentially, within this stream, on a different thread. We didn't say that there should be many sub-streams running concurrently. And because all sub-streams are blocking, when RxJava tries to subscribe to all of them, it effectively subscribes to them sequentially, one after another. `asyncLoadBy()` is not really async, thus it blocks when the `flatMap()` operator tries to subscribe to it. The fix is easy. Normally you would put `subscribeOn()` inside `asyncLoadBy()` but for educational purposes I'll place it directly in the main pipeline:

    Flowable<Person> people = ids
            .flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io()));

Now it works like a charm! By default RxJava will take the first 128 upstream events (`UUID`s), turn them into sub-streams and subscribe to all of them. If sub-streams are asynchronous and highly parallelizable (e.g. network calls), we get 128 concurrent invocations of `asyncLoadBy()`. The concurrency level (128) is configurable via the `maxConcurrency` parameter:

    Flowable<Person> people = ids
            .flatMap(id ->
                    asyncLoadBy(id).subscribeOn(Schedulers.io()),
                    10  //maxConcurrency
            );

That was a lot of work, don't you think? Shouldn't concurrency be even more declarative? We no longer deal with `Executor`s and futures, but still, it seems this approach is too error prone. Can't it be as simple as `parallel()` in Java 8 streams?
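
To get a feeling for what a concurrency limit such as `maxConcurrency` means in practice, here is a rough plain-Java model. It is only an analogy under my own assumptions and says nothing about how RxJava implements `flatMap()` internally. `BoundedConcurrencyDemo`, `run()`, `peak` and `sleepQuietly()` are hypothetical names. A `Semaphore` admits at most `maxConcurrency` blocking tasks at a time, and a new task starts only once a running one has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedConcurrencyDemo {

    // Highest number of simultaneously running tasks observed in the last run
    static final AtomicInteger peak = new AtomicInteger();

    // Runs `items` blocking tasks, never more than `maxConcurrency` at once,
    // and returns how many results were collected.
    static int run(int items, int maxConcurrency) {
        ExecutorService io = Executors.newCachedThreadPool(); // unbounded pool
        Semaphore permits = new Semaphore(maxConcurrency);
        AtomicInteger inFlight = new AtomicInteger();
        peak.set(0);
        List<CompletableFuture<Integer>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < items; i++) {
                final int id = i;
                permits.acquireUninterruptibly();    // wait for a free slot
                pending.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        sleepQuietly(10);            // hypothetical slow call
                        return id;
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();           // free the slot
                    }
                }, io));
            }
            int collected = 0;
            for (CompletableFuture<Integer> f : pending) {
                f.join();
                collected++;
            }
            return collected;
        } finally {
            io.shutdown();
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int collected = run(100, 10);
        System.out.println("collected " + collected + ", peak concurrency " + peak.get());
    }
}
```

Even though the thread pool itself is unbounded, the observed peak never exceeds the limit passed in. That is the kind of guarantee you want when the resource behind the blocking call (a database, a remote service) cannot take 128 simultaneous requests.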
Enter ParallelFlowable
Let's first look again at our example and make it even more complex by adding `filter()`:

    Flowable<Person> people = ids
            .map(this::slowLoadBy)      //BROKEN
            .filter(this::hasLowRisk);  //BROKEN

where `hasLowRisk()` is a slow predicate:

    boolean hasLowRisk(Person p) {
        //slow...
    }

We already know that the idiomatic approach to this problem is to use `flatMap()`, twice:

    Flowable<Person> people = ids
            .flatMap(id -> asyncLoadBy(id).subscribeOn(io()))
            .flatMap(p -> asyncHasLowRisk(p).subscribeOn(io()));

`asyncHasLowRisk()` is rather obscure - it either returns a single-element stream when the predicate passes or an empty stream when it fails. This is how you emulate `filter()` using `flatMap()`. Can we do better? Since RxJava 2.0.5 there is a new operator called... `parallel()`! It's quite surprising because an operator with the same name was removed before RxJava became 1.0 due to many misconceptions and misuse. `parallel()` in 2.x seems to finally address the problem of idiomatic concurrency in a safe and declarative way. First, let's see some beautiful code!

    Flowable<Person> people = ids
            .parallel(10)
            .runOn(Schedulers.io())
            .map(this::slowLoadBy)
            .filter(this::hasLowRisk)
            .sequential();

Just like that! A block of code between `parallel()` and `sequential()` runs... in parallel. What do we have here? First of all the new `parallel()` operator turns `Flowable<UUID>` into `ParallelFlowable<UUID>`, which has a much smaller API than `Flowable`. You'll see in a second why. The optional `int` parameter (`10` in our case) defines concurrency, or (as the documentation puts it) how many concurrent "rails" are created. So in our case we split a single `Flowable<UUID>` into 10 concurrent, independent rails (think: threads). Events from the original stream of `UUID`s are split (modulo `10`) into different rails, sub-streams that are independent from each other. Think of them as sending upstream events into 10 separate threads. But first we have to define where these threads come from, using the handy `runOn()` operator. This is so much better than `parallel()` on Java 8 streams where you have no control over concurrency level.

At this point we have a
`ParallelFlowable`. When an event appears in upstream (`UUID`) it is delegated to one of 10 "rails": concurrent, independent pipelines. Each pipeline provides a limited subset of operators that are safe to run concurrently, e.g. `map()` and `filter()`, but also `reduce()`. There is no `buffer()`, `take()` etc. as their semantics are unclear when invoked on many sub-streams at once. Our blocking `slowLoadBy()` as well as `hasLowRisk()` are still invoked sequentially, but only within a single "rail". Because we now have 10 concurrent "rails", we effectively parallelized them without much effort.

When events reach the end of a sub-stream ("rail") they encounter
the `sequential()` operator. This operator turns `ParallelFlowable` back into `Flowable`. As long as our mappers and filters are thread-safe, the `parallel()`/`sequential()` pair provides a very easy way of parallelizing streams. One small caveat: you will inevitably get messages reordered. Sequential `map()` and `filter()` always preserve order (like most operators). But once you run them within a `parallel()` block, the order is lost. This allows for greater concurrency, but you have to keep that in mind.

Should you use `parallel()` rather than nested `flatMap()` to parallelize your code? It's up to you, but `parallel()` seems to be much easier to read and grasp.
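
If the idea of "rails" still feels abstract, the modulo split described above can be modelled in plain Java. This is a simplified sketch under my own assumptions, not RxJava's actual implementation. `RailsDemo` and `process()` are hypothetical names. Each rail is a single-threaded executor, item `i` is handed to rail `i % rails`, and every rail records what it processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class RailsDemo {

    // Distributes items 0..count-1 over `rails` single-threaded workers
    // (item i goes to rail i % rails) and returns what each rail processed.
    static List<List<Integer>> process(int count, int rails) {
        List<ExecutorService> workers = new ArrayList<>();
        List<List<Integer>> seenByRail = new ArrayList<>();
        for (int r = 0; r < rails; r++) {
            workers.add(Executors.newSingleThreadExecutor());
            seenByRail.add(new CopyOnWriteArrayList<>());
        }
        for (int i = 0; i < count; i++) {
            final int item = i;
            final int rail = i % rails;              // round-robin assignment
            workers.get(rail).execute(() -> seenByRail.get(rail).add(item));
        }
        // Wait until every rail has drained its queue
        for (ExecutorService w : workers) {
            w.shutdown();
            try {
                w.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seenByRail;
    }

    public static void main(String[] args) {
        List<List<Integer>> result = process(20, 4);
        for (int r = 0; r < result.size(); r++) {
            System.out.println("rail " + r + ": " + result.get(r));
        }
    }
}
```

With 20 items and 4 rails, rail 1 ends up with `[1, 5, 9, 13, 17]`. Order is preserved within a rail because each rail is sequential, but nothing coordinates the rails with each other, which is why the merged output of a parallel block may arrive reordered.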

Hi Tomasz, love your blog btw. Good introduction to .parallel, however the ex. seems a little contrived as you could have:

    .flatMap(id -> asyncLoadBy(id)
            .filter(this::hasLowRisk)
            .subscribeOn(io())
    )

unless I'm missing something?

Hello Tomasz,
Is it possible to keep a background operation running after unsubscribe?
Ex: I create a stream of 3 strings: A, B, C and I introduce high latency for C and the same latency for A and B. By using the operator first() I unsubscribe from C too fast, so C does not have time to finish and the unsubscription kills C. Is there an easy way to leave C running in the background?

    @Test
    public void test_69_b() throws Exception {
        List<String> intList = Arrays.asList("A", "B", "C");
        print("start");
        Observable<String> test = Observable.from(intList)
                .flatMap(this::findWrapperS)
                .first();
        print("build finnished");
        test.subscribe(this::printAtSub);
        print("End");
        Sleeper.sleep(Duration.ofSeconds(4));
    }

    private Observable<String> findWrapperS(String id) {
        return Observable.just(id)
                .doOnUnsubscribe(() -> print("Wrapper <" + id + "> is released"))
                .observeOn(Schedulers.io())
                .flatMap(i -> Observable.fromCallable(() -> sendBackString(i)));
    }

    private String sendBackString(String string) {
        switch (string) {
            case "C":
                Sleeper.sleep(Duration.ofMillis(1000));
                print("Hello " + string);
                return string;
            default:
                Sleeper.sleep(Duration.ofMillis(10));
                print("Hello " + string);
                return string;
        }
    }

Given logs
=================
18:35:58.052 [Test worker] INFO tests.TestRunner - Got: start
18:35:58.078 [Test worker] INFO tests.TestRunner - Got: build finnished
18:35:58.248 [Test worker] INFO tests.TestRunner - Got: End
18:35:58.258 [Test worker] INFO tests.Sleeper - Sleeping PT4S ms
18:35:58.258 [RxIoScheduler-3] INFO tests.Sleeper - Sleeping PT0.01S ms
18:35:58.259 [RxIoScheduler-2] INFO tests.Sleeper - Sleeping PT0.01S ms
18:35:58.259 [RxIoScheduler-4] INFO tests.Sleeper - Sleeping PT1S ms
18:35:58.269 [RxIoScheduler-2] INFO tests.TestRunner - Got: Hello A
18:35:58.269 [RxIoScheduler-3] INFO tests.TestRunner - Got: Hello B
18:35:58.271 [RxIoScheduler-2] INFO tests.TestRunner - TERMINAL EVENT --->: A
18:35:58.272 [RxIoScheduler-2] INFO tests.TestRunner - Got: Wrapper [A] is released
18:35:58.274 [RxIoScheduler-2] INFO tests.TestRunner - Got: Wrapper [C] is released
18:35:58.274 [RxIoScheduler-2] INFO tests.TestRunner - Got: Wrapper [B] is released
18:35:58.280 [RxIoScheduler-4] WARN tests.Sleeper - Sleep interrupted
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at tests.Sleeper.sleep(Sleeper.java:24)
at tests.TestRunner.sendBackString(TestRunner.java:249)
at tests.TestRunner.lambda$null$15(TestRunner.java:242)
at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:48)
at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:227)
at rx.internal.schedulers.CachedThreadScheduler$EventLoopWorker$1.call(CachedThreadScheduler.java:228)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
18:35:58.281 [RxIoScheduler-4] INFO tests.TestRunner - Got: Hello C