Reactive emoji tracker with WebClient and Reactor: aggregating data

In the first part we managed to connect to emojitracker.com and consume an SSE stream that looks like this:

data:{"1F60D":1}

data:{"1F3A8":1,"1F48B":1,"1F499":1,"1F602":1,"2764":1}

data:{"1F607":1,"2764":2}
Each message represents the number of various emojis that appeared on Twitter since the previous message. After a few transformations, we got a stream of hexadecimal Unicode values for each emoji. E.g. for {"1F607":1,"2764":2} we produce three events: "1F607", "2764", "2764". This is how we achieved it:

import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final Flux<String> stream = WebClient
        .create("http://emojitrack-gostreamer.herokuapp.com")
        .get().uri("/subscribe/eps")
        .retrieve()
        .bodyToFlux(ServerSentEvent.class)
        .flatMap(e -> Mono.justOrEmpty(e.data()))
        .map(x -> (Map<String, Integer>) x)
        .flatMapIterable(Map::entrySet)
        .flatMap(entry -> Flux.just(entry.getKey()).repeat(entry.getValue()));
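The expansion step is easy to lose in the middle of the reactive pipeline, so here is a minimal sketch of it in plain Java, without Reactor. The class name ExpandCounts and the expand() helper are made up for this illustration and are not part of the tracker; the sketch only assumes that a message has already been parsed into a Map<String, Integer>.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ExpandCounts {

    // Turns {"1F607": 1, "2764": 2} into ["1F607", "2764", "2764"]:
    // every key is emitted as many times as its count says.
    static List<String> expand(Map<String, Integer> counts) {
        final List<String> events = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            events.addAll(Collections.nCopies(entry.getValue(), entry.getKey()));
        }
        return events;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the output order is predictable
        final Map<String, Integer> message = new LinkedHashMap<>();
        message.put("1F607", 1);
        message.put("2764", 2);
        System.out.println(expand(message)); // [1F607, 2764, 2764]
    }
}
```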
Our next goal is to show the top 50 most popular emojis since we started the application. But first let's convert these boring Unicode hexadecimal values into, you know, emojis! This is pretty simple:

String hexToEmoji(String hex) {
    // Parse the hexadecimal code point and turn it into a String (possibly a surrogate pair)
    return new String(Character.toChars(Integer.parseInt(hex, 16)));
}
Seems to work:

@Unroll
class EmojiTrackerSpec extends Specification {

    def 'translate #hex to #emoji'() {
        expect:
            hexToEmoji(hex) == emoji
        where:
            hex     || emoji
            '2611'  || '☑'
            '263A'  || '☺'
            '2764'  || '❤'
            '1F440' || '👀'
            '1F49E' || '💞'
            '1F605' || '😅'
            '1F60A' || '😊'
            '1F60D' || '😍'
            '1F60E' || '😎'
            '1F60F' || '😏'
            '1F61E' || '😞'
            '1F62D' || '😭'
            '1F646' || '🙆'
            '1F6B6' || '🚶'
    }

}
This is probably the weirdest test case I have ever written. Let's plug in hexToEmoji():

final Flux<String> stream = WebClient
         //...see above for missing lines...
        .flatMap(e -> Mono.justOrEmpty(e.data()))
        .map(x -> (Map<String, Integer>) x)
        .flatMapIterable(Map::entrySet)
        .flatMap(entry -> Flux.just(entry.getKey()).repeat(entry.getValue()))
        .map(Tracker::hexToEmoji);
My terminal exploded with happy faces, hearts and other emojis:

Received: 🎥
Received: 💙
Received: ๐Ÿ˜
Received: 🚑
Received: 😂
Received: 😒
Received: 🎉
Received: 😉
Received: ❤
Received: ⚽
Received: ๐Ÿ‘
Received: ๐Ÿ˜
Received: ♻
Received: ♻
Received: 💙
Received: 🔥
Received: 😂
Received: 😅
Received: 😘
Received: 💪
Received: 😉
Received: ♻
Received: 😪
Received: 😃
Received: ๐Ÿ™
Received: 💔
Received: 😂
Received: ๐Ÿ˜
Received: 🎶
Received: 🎹
Received: ๐Ÿ‘
Received: 🔥
Received: 😎
Then it exploded for real with: NumberFormatException: For input string: "1F1F5-1F1F1". It turns out some emojis are bigger than others. For example, two individual characters, 🇵 and 🇱, may be rendered as a flag when placed next to each other (🇵🇱). The Polish flag, in this case. A single emoji formed from two emojis. We need to enhance our parsing logic by parsing each hexadecimal number separated by a dash (-) individually and concatenating the resulting characters. To be honest, I started with something quite complex:

private static String codeToEmoji(String hex) {
    return Arrays
            .stream(hex.split("-"))
            .map(Tracker::hexToEmoji)
            .collect(joining());
}

private static String hexToEmoji(String hex) {
    return new String(Character.toChars(Integer.parseInt(hex, 16)));
}
But it turns out the more straightforward solution is both more readable and most likely faster:

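private static String codeToEmoji(String hex) {
    final String[] codes = hex.split("-");
    if (codes.length == 2) {
        // Two code points, e.g. a flag built from two regional indicators
        return hexToEmoji(codes[0]) + hexToEmoji(codes[1]);
    } else {
        // Single code point; note that three or more parts are not handled here
        return hexToEmoji(hex);
    }
}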
Maybe not as impressive, but I like it more. A few more test cases and we are good to go:

'1F1F5-1F1F1' || '🇵🇱'
'1F1FA-1F1E6' || '🇺🇦'
'1F1FA-1F1F8' || '🇺🇸'
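To make the "single emoji formed from two emojis" point concrete, the following sketch inspects the Polish flag at the level of Java chars and Unicode code points. The class name FlagCodePoints is hypothetical; hexToEmoji() is copied from the listing above.

```java
class FlagCodePoints {

    // Same conversion as hexToEmoji() in the article
    static String hexToEmoji(String hex) {
        return new String(Character.toChars(Integer.parseInt(hex, 16)));
    }

    public static void main(String[] args) {
        final String p = hexToEmoji("1F1F5"); // regional indicator symbol letter P
        final String l = hexToEmoji("1F1F1"); // regional indicator symbol letter L
        final String flag = p + l;

        // Each indicator lies outside the Basic Multilingual Plane,
        // so it occupies two Java chars (a surrogate pair)
        System.out.println(p.length());                            // 2
        System.out.println(flag.length());                         // 4
        System.out.println(flag.codePointCount(0, flag.length())); // 2

        // The dash-separated form is not a valid hexadecimal number
        try {
            Integer.parseInt("1F1F5-1F1F1", 16);
        } catch (NumberFormatException e) {
            System.out.println("Cannot parse: " + e.getMessage());
        }
    }
}
```

The exact wording of the exception message may differ between JDK versions, so the sketch prints it rather than assuming it.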
OK, we are finally ready to aggregate individual events. We want to turn individual emojis into some sort of histogram (occurrence map): a Map<String, Long> counting every emoji seen since the very beginning. The worst way to do this is global, mutable state:

final ConcurrentHashMap<String, Long> histogram = new ConcurrentHashMap<>();

final Flux<String> stream = WebClient
         //...see above for missing lines...
        .map(Tracker::codeToEmoji)
        .doOnNext(emoji -> histogram.merge(emoji, 1L, Math::addExact));
If you are not yet familiar with the Map.merge() method, it does something quite common that can be expressed like this:

if (histogram.containsKey(emoji)) {
    histogram.put(emoji, Math.addExact(histogram.get(emoji), 1L));
} else {
    histogram.put(emoji, 1L);
}
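One thing worth spelling out: the if/else version only describes what merge() does, it is not a drop-in replacement. It is a check-then-act sequence, so two threads could interleave between the lookup and the put, whereas ConcurrentHashMap.merge() performs the whole update atomically. A small sketch of counting with merge(), using a hypothetical MergeDemo class and hex codes as stand-ins for real emojis:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class MergeDemo {

    static Map<String, Long> count(String... emojis) {
        final Map<String, Long> histogram = new ConcurrentHashMap<>();
        for (String emoji : emojis) {
            // Insert 1 if the key is absent, otherwise add 1 to the current value;
            // on ConcurrentHashMap this happens atomically per key
            histogram.merge(emoji, 1L, Math::addExact);
        }
        return histogram;
    }

    public static void main(String[] args) {
        final Map<String, Long> histogram = count("2764", "1F602", "2764");
        System.out.println(histogram.get("2764"));  // 2
        System.out.println(histogram.get("1F602")); // 1
    }
}
```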
After five seconds the histogram may look, for example, like this:

{๐Ÿ’ธ=1, ☀=1, ☁=1, ✅=2, ⬅=1, ✈=1, ๐Ÿ’ฏ=3, ๐Ÿšฎ=1, ✋=2, ✌=2, ๐Ÿ’ฒ=1, ๐Ÿšจ=1, ๐Ÿ’จ=1, ๐Ÿ†=1, ๐Ÿšง=1, ๐Ÿ’ฅ=1, ✔=4, ☕=1, ๐Ÿ’ช=2, ๐ŸŽผ=1, ๐Ÿ’ก=1, ๐Ÿ€=1, ๐Ÿ“š=1, ✨=7, ๐Ÿ“…=1, ๐Ÿ“Œ=2, ๐Ÿจ=1, ☺=6, ‼=2, ๐Ÿ“ท=5, ๐ŸŒš=2, ๐Ÿ“น=3, ๐Ÿ“ฐ=1, ๐ŸŒ=1, ๐ŸŒ†=1, ๐ŸŒŠ=1, ❗=1, ๐Ÿ“ž=2, ๐Ÿ“=1, ๐Ÿ‡บ๐Ÿ‡ธ=2, ๐Ÿ˜˜=5, ๐ŸŒท=1, ๐Ÿ˜–=2, ๐Ÿ˜•=2, ❤=42, ๐Ÿ˜œ=3, ♥=7, ๐Ÿ˜›=1, ♦=1, ๐ŸŒน=3, ๐Ÿ˜š=1, ๐ŸŒธ=2, ๐Ÿ˜™=1, ๐Ÿ=2, ๐Ÿ˜=2, ๐Ÿ˜=11, ๐Ÿ˜Ž=2, ๐Ÿ˜=12, ๐Ÿ””=1, ๐Ÿ˜”=1, ๐Ÿ“=1, ๐ŸŒฑ=1, ๐Ÿ˜’=8, ๐Ÿ’=1, ๐Ÿ”‘=1, ๐Ÿ˜‘=4, ๐Ÿ”ˆ=1, ๐Ÿ˜ˆ=1, ๐Ÿ˜‡=2, ๐Ÿ˜†=2, ๐Ÿ†=1, ๐Ÿ˜…=3, ๐Ÿ˜Œ=3, ๐Ÿ˜‹=2, ๐Ÿ˜Š=11, ๐Ÿ˜‰=9, ๐Ÿ”‰=1, ๐ŸŒŸ=2, ๐Ÿ˜€=4, ๐ŸŒž=2, ♻=9, ๐Ÿ˜„=8, ๐Ÿ˜ƒ=1, ๐Ÿ˜‚=75, ๐Ÿ˜=9, ๐Ÿ˜ธ=1, ๐Ÿถ=1, ๐Ÿ˜ถ=1, ๐Ÿ˜ต=1, ๐Ÿ˜ป=3, ๐Ÿ˜ฐ=1, ๐Ÿ˜ฎ=2, ๐Ÿ˜ญ=18, ๐Ÿ”ด=2, ๐Ÿ˜ด=3, ๐Ÿ˜ณ=1, ๐Ÿณ=1, ๐Ÿ˜ฒ=2, ๐Ÿ˜ฑ=4, ๐Ÿ˜จ=2, ๐Ÿ„=1, ๐Ÿ”ฅ=4, ๐Ÿ˜ฅ=4, ๐Ÿ˜ฌ=2, ๐Ÿ”ซ=1, ๐Ÿ˜ซ=2, ➕=1, ๐Ÿ˜ฉ=7, ๐ŸŒฟ=1, ๐Ÿ˜ =1, ๐Ÿ˜ž=3, ๐Ÿ”ž=1, ๐Ÿ˜=2, ๐ŸŒผ=1, ๐Ÿ˜ค=1, ๐Ÿ˜ฃ=1, ๐Ÿ˜ข=5, ๐Ÿ=1, ๐Ÿ˜ก=4, ⚠=1, ➡=4, ⚡=2, ๐Ÿบ=1, ©=1, ๐Ÿ‘=6, ๐Ÿ™=2, ๐Ÿ‘Ž=1, ๐Ÿ‘=10, ๐Ÿ‘“=1, ®=1, ๐Ÿ‘‘=2, ๐Ÿ™ˆ=4, ๐Ÿ‘‡=3, ๐Ÿซ=1, ๐Ÿ™Œ=5, ๐Ÿ‘Œ=2, ๐Ÿ‘‹=2, ๐Ÿ™‹=1, ๐Ÿ™Š=2, ▶=2, ๐Ÿ‘Š=1, ๐Ÿ‘‰=2, ๐Ÿ‘€=3, ⚽=3, ◀=1, 9⃣=1, ๐Ÿ†’=1, ๐ŸŽ‰=2, ๐Ÿ’˜=3, ๐ŸŽถ=2, ๐Ÿ’—=2, ๐Ÿš—=1, ๐Ÿš–=2, ๐ŸŽต=1, ๐Ÿ’•=6, ๐Ÿ’œ=2, ๐Ÿ’›=7, ๐Ÿ’™=8, ๐Ÿ’Ž=1, ๐ŸŽฌ=1, ๐Ÿ’”=11, ๐ŸŽฒ=1, ๐Ÿ’“=2, ๐ŸŽง=2, ๐Ÿ’‹=5, ๐Ÿ’€=4, ๐Ÿ’„=1, ๐Ÿ’ƒ=1}

Within 5 seconds the 😂 emoji was sent to Twitter 75 times! So why is this solution bad? Modifying global mutable state from within a reactive stream invites race conditions and synchronization problems. A much better solution is to aggregate events within the stream itself. It's a bit mind-bending. Basically, we turn a stream of individual events into a stream of gradually built aggregates. Every event is applied to our histogram, and the updated histogram is passed further downstream. Look:

final Flux<HashMap<String, Long>> stream = WebClient
         //...see above for missing lines...
        .map(Tracker::codeToEmoji)
        .scan(new HashMap<String, Long>(), (histogram, emoji) -> {
            histogram.merge(emoji, 1L, Math::addExact);
            return histogram;
        });
See how a single emoji in the input stream (for example "💖") results in a histogram of "{💖=1}" on the output stream:

💖   ->  {💖=1}
๐Ÿ”   ->  {๐Ÿ’–=1, ๐Ÿ”=1}
๐Ÿ˜‚   ->  {๐Ÿ’–=1, ๐Ÿ”=1, ๐Ÿ˜‚=1}
๐Ÿ‘€   ->  {๐Ÿ’–=1, ๐Ÿ‘€=1, ๐Ÿ”=1, ๐Ÿ˜‚=1}
๐Ÿ˜   ->  {๐Ÿ’–=1, ๐Ÿ‘€=1, ๐Ÿ˜=1, ๐Ÿ”=1, ๐Ÿ˜‚=1}
๐Ÿ˜‚   ->  {๐Ÿ’–=1, ๐Ÿ‘€=1, ๐Ÿ˜=1, ๐Ÿ”=1, ๐Ÿ˜‚=2}
๐Ÿ˜€   ->  {๐Ÿ’–=1, ๐Ÿ˜€=1, ๐Ÿ‘€=1, ๐Ÿ˜=1, ๐Ÿ”=1, ๐Ÿ˜‚=2}
๐Ÿ˜‚   ->  {๐Ÿ’–=1, ๐Ÿ˜€=1, ๐Ÿ‘€=1, ๐Ÿ˜=1, ๐Ÿ”=1, ๐Ÿ˜‚=3}
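The same progression can be reproduced without Reactor. The sketch below is a plain-Java stand-in for the scan() step, not how Reactor implements it: a hypothetical snapshots() helper folds a list of events into a histogram and records the state after every event. Unlike the pipeline above, which passes the same mutable HashMap downstream each time, this version copies the map into a TreeMap so that each snapshot stays independent and prints in a predictable order. Letters stand in for emojis.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RunningHistogram {

    // One snapshot of the running histogram per input event
    static List<Map<String, Long>> snapshots(List<String> events) {
        final Map<String, Long> histogram = new HashMap<>();
        final List<Map<String, Long>> result = new ArrayList<>();
        for (String event : events) {
            histogram.merge(event, 1L, Math::addExact);
            // Copy into a sorted map so that every snapshot is independent
            // of later updates and prints in a stable order
            result.add(new TreeMap<>(histogram));
        }
        return result;
    }

    public static void main(String[] args) {
        snapshots(Arrays.asList("A", "B", "A")).forEach(System.out::println);
        // {A=1}
        // {A=1, B=1}
        // {A=2, B=1}
    }
}
```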
Notice how each individual emoji is either added to the map or increments an existing entry. Theoretically, the occurrence map (histogram) can grow quite large. However, the number of distinct emojis is fixed and not that large (2666 as of this writing). Now we'd like to find the top 50 emojis, that is, the 50 map entries with the highest occurrence count. This can easily be done with the JDK 8 Stream API:

import java.util.HashMap;
import java.util.Map;

import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.joining;

String topEmojis(HashMap<String, Long> histogram, int max) {
    return histogram
            .entrySet()
            .stream()
            .sorted(comparing(Map.Entry<String, Long>::getValue).reversed())
            .limit(max)
            .map(Map.Entry::getKey)
            .collect(joining(" "));
}
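One detail the method above leaves open is the order of emojis that share the same count: HashMap makes no promise about iteration order, so ties may come out in any order. If a stable ranking matters, one option is to add a tie-breaker. The sketch below is a variation on topEmojis(), not the version used in the rest of the article; the class name TopEmojisDemo and the plain-text keys are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

import static java.util.stream.Collectors.joining;

class TopEmojisDemo {

    static String topEmojis(Map<String, Long> histogram, int max) {
        return histogram
                .entrySet()
                .stream()
                // Highest count first; equal counts fall back to key order
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(max)
                .map(Map.Entry::getKey)
                .collect(joining(" "));
    }

    public static void main(String[] args) {
        final Map<String, Long> histogram = new HashMap<>();
        histogram.put("heart", 42L);
        histogram.put("joy", 75L);
        histogram.put("sob", 18L);
        histogram.put("fire", 18L);
        System.out.println(topEmojis(histogram, 3)); // joy heart fire
    }
}
```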
In the end we generate a String containing the top 50 emojis, separated by spaces. We don't want to see the outcome after each and every emoji. Instead, let's sample the results 10 times a second:

final Flux<String> stream = WebClient
         //...see above for missing lines...
        .scan(new HashMap<String, Long>(), (histogram, emoji) -> {
            histogram.merge(emoji, 1L, Math::addExact);
            return histogram;
        })
        .map(hist -> topEmojis(hist, 50))
        .sample(Duration.ofMillis(100));
The full source code looks as follows:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static java.util.stream.Collectors.joining;
import static java.util.Comparator.comparing;

public class Tracker {

    private static final Logger log = LoggerFactory.getLogger(Tracker.class);
    
    public static void main(String[] args) throws InterruptedException {
        final Flux<String> stream = WebClient
                .create("http://emojitrack-gostreamer.herokuapp.com")
                .get().uri("/subscribe/eps")
                .retrieve()
                .bodyToFlux(ServerSentEvent.class)
                .flatMap(e -> Mono.justOrEmpty(e.data()))
                .map(x -> (Map<String, Integer>) x)
                .flatMapIterable(Map::entrySet)
                .flatMap(entry -> Flux.just(entry.getKey()).repeat(entry.getValue()))
                .map(Tracker::codeToEmoji)
                .scan(new HashMap<String, Long>(), (histogram, emoji) -> {
                    histogram.merge(emoji, 1L, Math::addExact);
                    return histogram;
                })
                .map(hist -> topEmojis(hist, 50))
                .sample(Duration.ofMillis(100));


        stream.subscribe(sse -> log.info("Received: {}", sse));

        TimeUnit.MINUTES.sleep(10);
    }

    private static String topEmojis(HashMap<String, Long> histogram, int max) {
        return histogram
                .entrySet()
                .stream()
                .sorted(comparing(Map.Entry<String, Long>::getValue).reversed())
                .limit(max)
                .map(Map.Entry::getKey)
                .collect(joining(" "));
    }

    private static String codeToEmoji(String hex) {
        final String[] codes = hex.split("-");
        if (codes.length == 2) {
            return hexToEmoji(codes[0]) + hexToEmoji(codes[1]);
        } else {
            return hexToEmoji(hex);
        }
    }

    private static String hexToEmoji(String hex) {
        return new String(Character.toChars(Integer.parseInt(hex, 16)));
    }

}
And the results are adorable:



You might think this and the previous article aren't very practical. On the surface, yes. But we learned a few techniques that can be really valuable when dealing with real streams of data. Also, producing and consuming an SSE stream is one of the easiest ways to enable a streaming architecture in your ecosystem.
