
Posts

Showing posts from January, 2014

Three flavours of request-response pattern in Akka

Imagine a simple Akka actor system consisting of two parties: MonitoringActor and NetworkActor. Whenever a client sends CheckHealth to the former, it queries the latter by sending Ping. NetworkActor is obliged to reply with Pong as soon as possible (scenario [A]). Once MonitoringActor receives that reply, it immediately replies to the client with an Up status message. However, MonitoringActor must send a Down reply if NetworkActor fails to respond with Pong within one second (scenario [B]). Both workflows are depicted below:



There are at least three ways to implement this simple task in Akka, and we shall study their pros and cons.
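To make the two scenarios concrete before looking at actors, here is a minimal sketch of the same "reply Up on Pong, Down after a timeout" rule in plain Java. It deliberately uses `CompletableFuture` from the JDK (Java 9+) instead of Akka, so it is not one of the three Akka flavours; the class `HealthCheck`, the `Status` enum and the `checkHealth` method are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class HealthCheck {

    // Hypothetical stand-ins for the Up and Down status messages.
    public enum Status { UP, DOWN }

    // Assumption: the pending Pong reply is modelled as a future.
    // If it completes in time we answer UP (scenario [A]);
    // if nothing arrives within the timeout we answer DOWN (scenario [B]).
    public static CompletableFuture<Status> checkHealth(
            CompletableFuture<String> pong, long timeoutMillis) {
        return pong
                .thenApply(reply -> Status.UP)
                .completeOnTimeout(Status.DOWN, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // Scenario [A]: Pong is already available.
        Status a = checkHealth(CompletableFuture.completedFuture("Pong"), 1000).join();
        // Scenario [B]: Pong never arrives, so the one-second timeout fires.
        Status b = checkHealth(new CompletableFuture<>(), 1000).join();
        System.out.println(a + " " + b);
    }
}
```

The point of the sketch is only the shape of the contract: one request, at most one reply, and a deadline that turns silence into an explicit Down answer. The actor-based variants have to express the same deadline with messages rather than futures.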

Ordinary actor

In this scenario MonitoringActor listens for Pong directly, without any intermediaries:

class MonitoringActor extends Actor with ActorLogging {

  private val networkActor = context.actorOf(Props[NetworkActor], "network")
  private var origin: Option[ActorRef] = None

  def receive = {
    case CheckHealth =…

Turning Twitter4J into RxJava's Observable

Twitter4J is a Java wrapper around the Twitter API. While Twitter supports simple request-response interactions, in this article we will explore the streaming APIs. In contrast to the request-response model, which is always initiated by the client, the streaming API pushes data from the Twitter server to clients as soon as it is available. In Twitter's case that data is, of course, tweets, called Status in the API.

The question is, how would you design a Java API for streaming purposes? No surprise here: callbacks, callbacks everywhere!

import twitter4j.*;

TwitterStream twitter = new TwitterStreamFactory().getInstance();
twitter.addListener(new StatusAdapter() {
    public void onStatus(Status status) {
        System.out.println(status.getUser().getName() + " : " + status.getText());
    }
});
twitter.sample();

Say that on top of this API we would like to count how many messages we receive per second. A lot of accidental complexity sneaks in:

final AtomicInteger countPerSecond = new Atom…
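The excerpt above is cut off, so the following is not the author's code. It is a self-contained sketch of the general idea of counting callback invocations per second with an atomic counter and a scheduled task, using only the JDK. The class `RateCounter` and its methods `increment` and `drain` are hypothetical; in a real program `increment()` would presumably be called from inside `onStatus(...)`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RateCounter {

    // Shared between the callback thread and the reporting thread.
    private final AtomicInteger countPerSecond = new AtomicInteger();

    // Assumption: called once per received message, e.g. from a stream listener.
    public void increment() {
        countPerSecond.incrementAndGet();
    }

    // Returns the number of messages seen since the last call and resets it atomically.
    public int drain() {
        return countPerSecond.getAndSet(0);
    }

    public static void main(String[] args) throws InterruptedException {
        RateCounter counter = new RateCounter();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Report and reset the counter once per second.
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("messages/s: " + counter.drain()),
                1, 1, TimeUnit.SECONDS);

        // Simulate a few incoming messages instead of a real Twitter stream.
        for (int i = 0; i < 5; i++) {
            counter.increment();
        }

        Thread.sleep(1500);
        scheduler.shutdownNow();
    }
}
```

Even in this small form the bookkeeping (a thread-safe counter, a scheduler, explicit shutdown) is all plumbing around the callback, which is the kind of accidental complexity the post is pointing at.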