
Posts

Showing posts from 2014

Asynchronous timeouts with CompletableFuture

One day I was rewriting poorly implemented multi-threaded code that was blocking at some point on Future.get():

public void serve() throws InterruptedException, ExecutionException, TimeoutException {
    final Future<Response> responseFuture = asyncCode();
    final Response response = responseFuture.get(1, SECONDS);
    send(response);
}

private void send(Response response) {
    //...
}

This was actually an Akka application written in Java with a thread pool of 1000 threads (sic!), all of them blocked on this get() call; otherwise the system couldn't keep up with the number of concurrent requests. After refactoring we got rid of all these threads and introduced just one, significantly reducing the memory footprint. Let's simplify a bit and show examples in Java 8. The first step is to introduce CompletableFuture instead of plain Future (see: tip 9). It's simple if:

you control how tasks are submitted to ExecutorService: just use CompletableFuture.supplyAsync(..., executo…
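
Java 8's CompletableFuture has no built-in timeout, so the blocking get(1, SECONDS) needs a non-blocking replacement. Below is a minimal sketch, not necessarily the approach this post goes on to take: a single scheduler thread fails the result after a given Duration unless the original future finishes first. The names AsyncTimeouts and within are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: non-blocking timeouts for Java 8 CompletableFuture. */
final class AsyncTimeouts {

    // One daemon thread only fires timeouts; it never blocks waiting for results.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(task -> {
                Thread thread = new Thread(task, "async-timeouts");
                thread.setDaemon(true);
                return thread;
            });

    private AsyncTimeouts() {
    }

    /** Mirrors the given future, but fails with TimeoutException if it is not done within the duration. */
    static <T> CompletableFuture<T> within(CompletableFuture<T> future, Duration duration) {
        CompletableFuture<T> result = new CompletableFuture<>();
        ScheduledFuture<?> timer = SCHEDULER.schedule(
                () -> result.completeExceptionally(new TimeoutException("Timed out after " + duration)),
                duration.toMillis(), TimeUnit.MILLISECONDS);
        future.whenComplete((value, error) -> {
            timer.cancel(false); // the original finished; drop the pending timeout
            // If the timeout already fired, these calls are no-ops: a future completes only once.
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });
        return result;
    }
}
```

Assuming asyncCode() were changed to return CompletableFuture<Response>, serve() could become AsyncTimeouts.within(asyncCode(), Duration.ofSeconds(1)).thenAccept(this::send) and return immediately, with no thread parked in get(). On Java 9 and later, CompletableFuture.orTimeout(long, TimeUnit) covers the same need.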

Hazelcast member discovery using Curator and ZooKeeper

On one project I was setting up a Hazelcast cluster in a private cloud. Within a cluster all nodes must see each other, so during bootstrapping Hazelcast tries to locate other cluster members. There is no server and all nodes are equal. Hazelcast implements a couple of techniques for discovering members; unfortunately it wasn't AWS, so we couldn't use EC2 autodiscovery, and multicast was blocked, so the built-in multicast support was useless. The last resort was a TCP/IP cluster, where the addresses of all nodes need to be hard-coded in the XML configuration:

<tcp-ip enabled="true">
    <member>machine1</member>
    <member>machine2</member>
    <member>machine3:5799</member>
    <member>192.168.1.0-7</member>
    <member>192.168.1.21</member>
</tcp-ip>

This doesn't scale very well; also, nodes in our cloud were assigned dynamically, so it was not possible to figure out their addresses prior to runtime. H…
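
Since addresses are only known at runtime, the rough idea is for each node to register its own address in shared storage at startup and read the full member list back before joining (in ZooKeeper, ephemeral nodes would additionally disappear when a member's session ends). The sketch below uses an in-memory registry as a stand-in for a ZooKeeper path accessed through Curator; MemberRegistry, InMemoryRegistry and MemberDiscovery are hypothetical names, and no real ZooKeeper or Hazelcast calls are made.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

/** Hypothetical shared registry; a real one might live under a ZooKeeper path. */
interface MemberRegistry {
    void register(String address);

    Set<String> registeredMembers();
}

/** In-memory stand-in used only for illustration; NOT a ZooKeeper client. */
final class InMemoryRegistry implements MemberRegistry {
    private final Set<String> members = new ConcurrentSkipListSet<>(); // sorted, thread-safe

    @Override
    public void register(String address) {
        members.add(address);
    }

    @Override
    public Set<String> registeredMembers() {
        return Collections.unmodifiableSet(members);
    }
}

final class MemberDiscovery {
    private MemberDiscovery() {
    }

    /** Registers this node, then returns every known member address (including itself), sorted. */
    static List<String> discoverMembers(MemberRegistry registry, String selfAddress) {
        registry.register(selfAddress);
        return new ArrayList<>(registry.registeredMembers());
    }

    /** Renders addresses in the same shape as the hard-coded tcp-ip section. */
    static String toTcpIpXml(List<String> members) {
        StringBuilder xml = new StringBuilder("<tcp-ip enabled=\"true\">\n");
        for (String member : members) {
            xml.append("    <member>").append(member).append("</member>\n");
        }
        return xml.append("</tcp-ip>").toString();
    }
}
```

With programmatic configuration, the discovered list could instead be handed to Hazelcast's TcpIpConfig (for example via setMembers(...) in Hazelcast 3.x); check the API of the version you use.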

Accessing Meetup's streaming API with RxNetty

This article will touch upon multiple subjects: reactive programming, HTTP, parsing JSON and integrating with a social API. All in one use case: we will load and process new meetup.com events in real time via the non-blocking RxNetty library, combining the power of the Netty framework and the flexibility of the RxJava library. Meetup provides a publicly available streaming API that pushes every single meetup registered all over the world in real time. Just browse to stream.meetup.com/2/open_events and observe how chunks of JSON slowly appear on your screen. Every time someone creates a new event, a self-contained JSON document is pushed from the server to your browser. This means such a request never ends; instead we keep receiving partial data as long as we want. We already examined a similar scenario in Turning Twitter4J into RxJava's Observable. Each new meetup event publishes a standalone JSON document, similar to this (lots of details omitted):

{ "id" : "219088449", "name" :…
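
Because the response never ends, chunks arrive with arbitrary boundaries and a single JSON document can be split across several of them. The sketch below reassembles complete documents from such chunks using only the JDK. It assumes, without confirmation from the text, that each document is terminated by a newline; ChunkedJsonSplitter is a hypothetical name. In an RxJava pipeline, onChunk(...) could be applied to each incoming chunk and its results flattened into a stream of documents.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Reassembles standalone documents from arbitrary chunks of a never-ending response.
 * ASSUMPTION: every JSON document ends with a newline.
 */
final class ChunkedJsonSplitter {
    // Holds the unfinished tail of the stream between chunks.
    private final StringBuilder buffer = new StringBuilder();

    /** Feeds one chunk; returns every document it completed (possibly none). */
    List<String> onChunk(String chunk) {
        buffer.append(chunk);
        List<String> documents = new ArrayList<>();
        int newline;
        while ((newline = buffer.indexOf("\n")) >= 0) {
            String document = buffer.substring(0, newline).trim();
            buffer.delete(0, newline + 1);
            if (!document.isEmpty()) { // skip blank keep-alive lines
                documents.add(document);
            }
        }
        return documents;
    }
}
```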

Benchmarking impact of batching in Hystrix

In the previous article "Batching (collapsing) requests in Hystrix" we looked at the collapsing API in Hystrix. Check it out before proceeding with this article. The example presented there was rather artificial, merely presenting the API. Today let's look at a semi-real-life example and do some benchmarking. We already used the random.org API some time ago as an example (see: Your first message - discovering Akka); let's use it again. Imagine our application calls the following API facade in order to fetch exactly one random number per request (generateIntegers(1)):

public interface RandomOrgClient {
    RandomIntegers generateIntegers(int howMany);
}

As you can see, this method can easily fetch more than one number. You might wonder why it returns some fancy RandomIntegers class rather than, say, List<Integer>? Well, a list of integers is just a data structure; it doesn't represent any business concept, while *random integers* leaves no room for speculation. Still unsurprisingly…
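
Collapsing boils down to queueing single-number requests for a short while, issuing one generateIntegers(n) call, and handing one number back to each caller. The sketch below shows that idea in plain Java 8, not Hystrix's HystrixCollapser API: FakeRandomOrgClient and RandomCollapser are hypothetical, List<Integer> stands in for RandomIntegers, and flush() is called by hand where Hystrix would trigger the batch after a short time window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical fake of RandomOrgClient: local random numbers, counts "remote" calls. */
final class FakeRandomOrgClient {
    final AtomicInteger remoteCalls = new AtomicInteger();
    private final Random random = new Random();

    List<Integer> generateIntegers(int howMany) {
        remoteCalls.incrementAndGet();
        List<Integer> numbers = new ArrayList<>(howMany);
        for (int i = 0; i < howMany; i++) {
            numbers.add(random.nextInt(100));
        }
        return numbers;
    }
}

/** Queues single-number requests and serves them all with one batched call. */
final class RandomCollapser {
    private final FakeRandomOrgClient client;
    private final List<CompletableFuture<Integer>> pending = new ArrayList<>();

    RandomCollapser(FakeRandomOrgClient client) {
        this.client = client;
    }

    /** Registers a request for one number; nothing is fetched yet. */
    synchronized CompletableFuture<Integer> requestOne() {
        CompletableFuture<Integer> request = new CompletableFuture<>();
        pending.add(request);
        return request;
    }

    /** One generateIntegers(n) call for n queued requests, then one number per request. */
    void flush() {
        List<CompletableFuture<Integer>> batch;
        synchronized (this) {
            if (pending.isEmpty()) {
                return;
            }
            batch = new ArrayList<>(pending);
            pending.clear();
        }
        // Completing outside the lock so callbacks cannot run while we hold it.
        List<Integer> numbers = client.generateIntegers(batch.size());
        for (int i = 0; i < batch.size(); i++) {
            batch.get(i).complete(numbers.get(i));
        }
    }
}
```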

Converting between CompletableFuture and Observable

CompletableFuture<T> from Java 8 is an advanced abstraction over a promise that a value of type T will be available in the future. Observable<T> is quite similar, but it promises an arbitrary number of items in the future, from 0 to infinity. These two representations of asynchronous results are similar to the point where an Observable with just one item can be used instead of a CompletableFuture and vice versa. On the other hand, CompletableFuture is more specialized and, because it's now part of the JDK, should become prevalent quite soon. Let's celebrate the RxJava 1.0 release with a short article showing how to convert between the two without losing their asynchronous and event-driven nature.

From CompletableFuture<T> to Observable<T>

CompletableFuture represents one value in the future, so turning it into an Observable is rather simple. When the future completes with some value, the Observable immediately emits that value as well and completes the stream:

class FuturesTest e…
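
The conversion described above can be sketched without blocking by registering a callback on the future. To stay within the JDK, SimpleObserver below is a hypothetical interface that only mirrors the onNext/onError/onCompleted callbacks of RxJava 1.x's Observer; with RxJava itself the same callback body would roughly go inside Observable.create(...).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical stand-in mirroring RxJava 1.x Observer callbacks; NOT the RxJava type. */
interface SimpleObserver<T> {
    void onNext(T value);

    void onError(Throwable error);

    void onCompleted();
}

final class FutureAdapters {
    private FutureAdapters() {
    }

    /** Emits the future's single value and completes, or reports its failure. Never blocks. */
    static <T> void subscribe(CompletableFuture<T> future, SimpleObserver<? super T> observer) {
        future.whenComplete((value, error) -> {
            if (error != null) {
                // Dependent stages wrap failures in CompletionException; unwrap for the observer.
                Throwable cause = error instanceof CompletionException && error.getCause() != null
                        ? error.getCause()
                        : error;
                observer.onError(cause);
            } else {
                observer.onNext(value);
                observer.onCompleted();
            }
        });
    }
}
```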

ExecutorService - 10 tips and tricks

The ExecutorService abstraction has been around since Java 5. We are talking about 2004 here. Just a quick reminder: both Java 5 and 6 are no longer supported, and Java 7 won't be supported in half a year either. The reason I'm bringing this up is that many Java programmers still don't fully understand how ExecutorService works. There are many places to learn that; today I wanted to share a few lesser-known features and practices. However, this article is still aimed at intermediate programmers, nothing especially advanced.

1. Name pool threads

I can't emphasize this enough. When dumping threads of a running JVM or during debugging, the default thread pool naming scheme is pool-N-thread-M, where N stands for the pool sequence number (every time you create a new thread pool, the global N counter is incremented) and M is the thread sequence number within a pool. For example, pool-2-thread-3 means the third thread in the second pool created in the JVM lifecycle. See: Executors.defaultThreadFactory(). Not very descriptive. JD…
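
A custom ThreadFactory is enough to replace pool-N-thread-M with something meaningful. Here is a minimal JDK-only sketch; NamedThreadFactory and the "orders" prefix are hypothetical (Guava's ThreadFactoryBuilder with setNameFormat(...) offers a ready-made alternative).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Names threads "<prefix>-<n>" so thread dumps show which pool a thread belongs to. */
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);
    // Reuses the JDK defaults (non-daemon, normal priority) and only overrides the name.
    private final ThreadFactory delegate = Executors.defaultThreadFactory();

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        Thread thread = delegate.newThread(task);
        thread.setName(prefix + "-" + counter.getAndIncrement());
        return thread;
    }
}
```

For example, Executors.newFixedThreadPool(10, new NamedThreadFactory("orders")) yields threads named orders-1, orders-2 and so on, which tells you at a glance in a thread dump which pool a thread belongs to.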

Batching (collapsing) requests in Hystrix

Hystrix has an advanced feature of collapsing (or batching) requests. If two or more commands run similar requests at the same time, Hystrix can combine them, run one batched request and dispatch the split results back to all commands. Let's first see how Hystrix works without collapsing. Imagine we have a service that looks up the StockPrice of a given Ticker:

import com.google.common.collect.ImmutableMap;
import lombok.Value;

import java.math.BigDecimal;
import java.time.Instant;
import java.util.Collections;
import java.util.Set;

@Value
class Ticker {
    String symbol;
}

@Value
class StockPrice {
    BigDecimal price;
    Instant effectiveTime;
}

interface StockPriceGateway {

    default StockPrice load(Ticker stock) {
        final Set<Ticker> oneTicker = Collections.singleton(stock);
        return loadAll(oneTicker).get(stock);
    }

    ImmutableMap<Ticker, StockPrice> loadAll(Set<Ticker> tickers);
}

The core implementation of StockPriceGateway must provide the loadAll() batch method, while the load() method is implemented for our convenience. So our gateway i…
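
To see why collapsing helps, consider what happens without it: every load() call turns into its own single-element loadAll() round trip. The sketch below replaces Lombok's @Value and Guava's ImmutableMap with plain Java classes and a HashMap so it runs on the JDK alone; CountingGateway is a hypothetical fake that returns a fixed price and counts backend calls.

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java stand-in for the Lombok value class (equals/hashCode needed for map keys). */
final class Ticker {
    final String symbol;

    Ticker(String symbol) {
        this.symbol = symbol;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof Ticker && ((Ticker) other).symbol.equals(symbol);
    }

    @Override
    public int hashCode() {
        return symbol.hashCode();
    }
}

final class StockPrice {
    final BigDecimal price;
    final Instant effectiveTime;

    StockPrice(BigDecimal price, Instant effectiveTime) {
        this.price = price;
        this.effectiveTime = effectiveTime;
    }
}

interface StockPriceGateway {
    // Convenience method: a single lookup is just a one-element batch.
    default StockPrice load(Ticker stock) {
        return loadAll(Collections.singleton(stock)).get(stock);
    }

    Map<Ticker, StockPrice> loadAll(Set<Ticker> tickers);
}

/** Hypothetical fake backend: fixed price, counts how many batch calls were made. */
final class CountingGateway implements StockPriceGateway {
    final AtomicInteger backendCalls = new AtomicInteger();

    @Override
    public Map<Ticker, StockPrice> loadAll(Set<Ticker> tickers) {
        backendCalls.incrementAndGet();
        Map<Ticker, StockPrice> prices = new HashMap<>();
        for (Ticker ticker : tickers) {
            prices.put(ticker, new StockPrice(BigDecimal.TEN, Instant.now()));
        }
        return prices;
    }
}
```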

Java Performance: The Definitive Guide - review

Java Performance: The Definitive Guide is the best Java book I have read this year. In about 400 pages Scott Oaks touches on every aspect of Java-based applications, from core terminology and methodologies, through tooling, JIT, garbage collection, threading etc., to high-level topics such as Java EE, JDBC/JPA, Java 8 and even... JavaScript and CSS compression. But let's go through this book chapter by chapter.

First the author explains common terms, such as what a microbenchmark is, measuring throughput versus response time, etc. Surprisingly, a few sections are devoted solely to statistics and the interpretation of inherently varying benchmark results. Oaks goes as far as briefly explaining Student's t-test, an important tool for judging whether a difference between benchmark results is statistically significant. I found that part very enjoyable (and way too short), but it's just about enough for ordinary purposes. Now it's time to get our hands dirty. Before we start exploring Java performance, the author goes through various tools, both…

Hazelcast's MapLoader pitfalls

One of the core data structures provided by Hazelcast is IMap<K, V>, extending java.util.concurrent.ConcurrentMap, which is basically a distributed map, often used as a cache. You can configure such a map to use a custom MapLoader<K, V>: a piece of Java code that will be asked every time you try to .get() something from that map (by key) which is not yet there. This is especially useful when you use IMap as a distributed in-memory cache: if client code asks for something that wasn't cached yet, Hazelcast will transparently execute your MapLoader.load(key):

public interface MapLoader<K, V> {
    V load(K key);
    Map<K, V> loadAll(Collection<K> keys);
    Set<K> loadAllKeys();
}

The remaining two methods are used during startup to optionally warm up the cache by loading a predefined set of keys. Your custom MapLoader can reach out to a (No)SQL database, web service, file system, you name it. Working with such a cache is much more convenient because you don't…
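
The read-through behaviour described above (load on a miss, serve from memory afterwards) can be imitated on a single JVM with ConcurrentHashMap.computeIfAbsent. This is a hypothetical sketch of the concept, not Hazelcast's IMap or MapLoader; ReadThroughCache is an illustrative name and the loader function plays the role of MapLoader.load(key).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Single-JVM read-through cache mimicking the MapLoader idea; NOT Hazelcast's IMap. */
final class ReadThroughCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // plays the role of MapLoader.load(key)

    ReadThroughCache(Function<K, V> loader) {
        this.loader = loader;
    }

    /** Returns the cached value, calling the loader only on a miss; a null result is not cached. */
    V get(K key) {
        return cache.computeIfAbsent(key, loader);
    }
}
```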