Concurrency and Fault Tolerance Made Easy: An Akka Tutorial with Examples
The challenge
Writing concurrent programs is hard. Having to deal with threads, locks, race conditions, and so on is highly error-prone and can lead to code that is difficult to read, test, and maintain.
Many therefore prefer to avoid multithreading altogether. Instead, they employ single-threaded processes exclusively, relying on external services (such as databases, queues, etc.) to handle any needed concurrent or asynchronous operations. While this approach is in some cases a legitimate alternative, there are many scenarios in which it is simply not a viable option. Many real-time systems – such as trading or banking applications, or real-time games – don’t have the luxury of waiting for a single-threaded process to complete (they need the answer now!). Other systems are so compute- or resource-intensive that they would take an inordinate amount of time (hours or even days in some cases) to run without introducing parallelization into their code.
One fairly common single-threaded approach (widely used in the Node.js world, for example) is to use an event-based, non-blocking paradigm. While this does help performance by avoiding context switches, locks, and blocking, it still does not address the issues of using multiple processors concurrently (doing so would require launching, and coordinating between, multiple independent processes).
So does this mean you have no choice but to journey deep into the bowels of threads, locks, and race conditions in order to build a concurrent application?
Thanks to the Akka framework, the answer is no. This tutorial introduces Akka through examples and explores the ways in which it facilitates and simplifies the implementation of concurrent, distributed applications.
What is the Akka Framework?
Akka is a toolkit and runtime for building highly concurrent, distributed, and fault tolerant applications on the JVM. Akka is written in Scala, with language bindings provided for both Scala and Java.
Akka’s approach to handling concurrency is based on the Actor Model. In an actor-based system, everything is an actor, in much the same way that everything is an object in object-oriented design. A key difference, though – particularly relevant to our discussion – is that the Actor Model was specifically designed and architected to serve as a concurrent model whereas the object-oriented model is not. More specifically, in a Scala actor system, actors interact and share information, without any presupposition of sequentiality. The mechanism by which actors share information with one another, and task one another, is message passing.
Akka creates a layer between the actors and the underlying system such that actors simply need to process messages. All the complexity of creating and scheduling threads, receiving and dispatching messages, and handling race conditions and synchronization, is relegated to the framework to handle transparently.
Akka strictly adheres to the Reactive Manifesto. Reactive applications aim at replacing traditional multithreaded applications with an architecture that satisfies one or more of the following requirements:
- Event-driven. Using Actors, one can write code that handles requests asynchronously and employs non-blocking operations exclusively.
- Scalable. In Akka, adding nodes without having to modify the code is possible, thanks both to message passing and location transparency.
- Resilient. Any application will encounter errors and fail at some point in time. Akka provides “supervision” (fault tolerance) strategies to facilitate a self-healing system.
- Responsive. Many of today’s high performance and rapid response applications need to give quick feedback to the user and therefore need to react to events in an extremely timely manner. Akka’s non-blocking, message-based strategy helps achieve this.
What is an Actor in Akka?
An actor is essentially nothing more than an object that receives messages and takes actions to handle them. It is decoupled from the source of the message and its only responsibility is to properly recognize the type of message it has received and take action accordingly.
Upon receiving a message, an actor may take one or more of the following actions:
- Execute some operations itself (such as performing calculations, persisting data, calling an external web service, and so on)
- Forward the message, or a derived message, to another actor
- Instantiate a new actor and forward the message to it
Alternatively, the actor may choose to ignore the message entirely (i.e., it may choose inaction) if it deems it appropriate to do so.
To implement an actor, it is necessary to extend the akka.actor.Actor trait and implement the receive method. An actor’s receive method is invoked (by Akka) when a message is sent to that actor. Its typical implementation consists of pattern matching, as shown in the following Akka example, to identify the message type and react accordingly:
import akka.actor.Actor

class MyActor extends Actor {
  // doSomething is a placeholder for whatever handling your application needs
  def doSomething(value: String): Unit = println("received: " + value)

  def receive = {
    case value: String => doSomething(value)
    case _ => println("received unknown message")
  }
}
Pattern matching is a relatively elegant technique for handling messages, which tends to produce “cleaner” and easier-to-navigate code than a comparable implementation based on callbacks. Consider, for example, a simplistic HTTP request/response implementation.
First, let’s implement this using a callback-based paradigm in JavaScript:
route(url, function(request){
  var query = buildQuery(request);
  dbCall(query, function(dbResponse){
    var wsRequest = buildWebServiceRequest(dbResponse);
    wsCall(wsRequest, function(wsResponse) {
      sendReply(wsResponse);
    });
  });
});
Now let’s compare this to a pattern-matching-based implementation:
msg match {
  case HttpRequest(request) => {
    val query = buildQuery(request)
    dbCall(query)
  }
  case DbResponse(dbResponse) => {
    val wsRequest = buildWebServiceRequest(dbResponse)
    wsCall(wsRequest)
  }
  case WsResponse(wsResponse) => sendReply(wsResponse)
}
While the callback-based JavaScript code is admittedly compact, it is certainly harder to read and navigate. In comparison, the pattern-matching-based code makes it more immediately apparent what cases are being considered and how each is being handled.
The Actor System
Taking a complex problem and recursively splitting it into smaller sub-problems is a sound problem solving technique in general. This approach can be particularly beneficial in computer science (consistent with the Single Responsibility Principle), as it tends to yield clean, modularized code, with little or no redundancy, that is relatively easy to maintain.
In an actor-based design, use of this technique facilitates the logical organization of actors into a hierarchical structure known as an Actor System. The actor system provides the infrastructure through which actors interact with one another.
In Akka, the only way to communicate with an actor is through an ActorRef. An ActorRef represents a reference to an actor that precludes other objects from directly accessing or manipulating that actor’s internals and state. Messages may be sent to an actor via an ActorRef using one of the following syntax protocols:
- ! (“tell”) – sends the message and returns immediately
- ? (“ask”) – sends the message and returns a Future representing a possible reply
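The difference between the two send operators can be seen with a tiny echo actor. The following is a minimal sketch, assuming the classic Akka actor module (version 2.4 or later, for `system.terminate()`) is on the classpath; `EchoActor` and `TellVsAsk` are hypothetical names introduced only for this illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical actor that replies to every message with the same message.
class EchoActor extends Actor {
  def receive = {
    case msg => sender() ! msg
  }
}

object TellVsAsk {
  // Starts a throwaway actor system, asks the echo actor, and returns its reply.
  def askEcho(message: String): Any = {
    val system = ActorSystem("tell-vs-ask")
    try {
      val echo = system.actorOf(Props(new EchoActor), "echo")
      implicit val timeout: Timeout = Timeout(3.seconds)

      // "tell": returns immediately; sent from outside an actor,
      // so the echoed reply has no one to go back to.
      echo ! "fire and forget"

      // "ask": returns a Future; blocking on it here is only for demonstration.
      Await.result(echo ? message, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(askEcho("ping"))
}
```

Blocking with `Await` is acceptable in a demonstration `main` method, but inside an actor the returned Future should be handled asynchronously instead.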
Each actor has a mailbox to which its incoming messages are delivered. There are multiple mailbox implementations from which to choose, with the default implementation being FIFO.
An actor may contain instance variables to maintain state across the messages it processes. Akka guarantees that a given actor processes only one message at a time (actors behave like lightweight threads, but they are scheduled onto a shared pool of real threads rather than each owning one). In this way, each actor’s state can be reliably maintained without the developer needing to explicitly worry about synchronization or race conditions.
Each actor is provided with the following useful information for performing its tasks via the Akka Actor API:
- sender: an ActorRef to the sender of the message currently being processed
- context: information and methods relating to the context within which the actor is running (includes, for example, an actorOf method for instantiating a new actor)
- supervisorStrategy: defines the strategy to be used for recovering from errors
- self: the ActorRef for the actor itself
To help tie these concepts together, let’s consider a simple example of counting the number of words in a text file.
For purposes of our Akka example, we’ll decompose the problem into two subtasks; namely, (1) a “child” task of counting the number of words on a single line and (2) a “parent” task of summing those per-line word counts to get the total number of words in the file.
The parent actor will load each line from the file and then delegate to a child actor the task of counting the words in that line. When the child is done, it will send a message back to the parent with the result. The parent will receive the messages with the word counts (for each line) and keep a counter for the total number of words in the entire file, which it will then return to its invoker upon completion.
(Note that the Akka tutorial code samples provided below are intended to be didactic only and therefore do not necessarily concern themselves with all edge conditions, performance optimizations, and so on. Also, a complete compilable version of the code samples shown below is available in this gist.)
Let’s first look at a sample implementation of the child StringCounterActor class:
import akka.actor.Actor

case class ProcessStringMsg(string: String)
case class StringProcessedMsg(words: Int)

class StringCounterActor extends Actor {
  def receive = {
    case ProcessStringMsg(string) => {
      // naive split on single spaces; good enough for this didactic example
      val wordsInLine = string.split(" ").length
      sender ! StringProcessedMsg(wordsInLine)
    }
    case _ => println("Error: message not recognized")
  }
}
This actor has a very simple task: consume ProcessStringMsg messages (containing a line of text), count the number of words on the specified line, and return the result to the sender via a StringProcessedMsg message. Note that we have implemented our class to use the ! (“tell”) method to send the StringProcessedMsg message (i.e., to send the message and return immediately).
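The counting logic itself can be checked outside of any actor. The following is a minimal sketch using only the Scala standard library; `countWordsNaive` mirrors the `string.split(" ").length` expression used in the actor, while `countWords` is a hypothetical alternative (not part of the original sample) that tolerates repeated whitespace and empty lines.

```scala
object WordCountSketch {
  // Same logic as the StringCounterActor: split on single spaces.
  def countWordsNaive(line: String): Int = line.split(" ").length

  // Hypothetical alternative: split on runs of whitespace and ignore empty tokens.
  def countWords(line: String): Int = line.split("\\s+").count(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    for (line <- Seq("one two three", "one  two", "")) {
      println(s"'$line': naive=${countWordsNaive(line)}, robust=${countWords(line)}")
    }
  }
}
```

The naive version counts an empty line as one word and treats a double space as an extra word, which is one of the edge conditions a production version would need to handle.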
OK, now let’s turn our attention to the parent WordCounterActor class:
 1. case class StartProcessFileMsg()
 2.
 3. class WordCounterActor(filename: String) extends Actor {
 4.
 5.   private var running = false
 6.   private var totalLines = 0
 7.   private var linesProcessed = 0
 8.   private var result = 0
 9.   private var fileSender: Option[ActorRef] = None
10.
11.   def receive = {
12.     case StartProcessFileMsg() => {
13.       if (running) {
14.         // println just used for example purposes;
15.         // Akka logger should be used instead
16.         println("Warning: duplicate start message received")
17.       } else {
18.         running = true
19.         fileSender = Some(sender) // save reference to process invoker
20.         import scala.io.Source._
21.         fromFile(filename).getLines.foreach { line =>
22.           context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line)
23.           totalLines += 1
24.         }
25.       }
26.     }
27.     case StringProcessedMsg(words) => {
28.       result += words
29.       linesProcessed += 1
30.       if (linesProcessed == totalLines) {
31.         fileSender.map(_ ! result) // provide result to process invoker
32.       }
33.     }
34.     case _ => println("message not recognized!")
35.   }
36. }
Many things are going on in here, so let’s examine each of them in more detail (note that the line numbers referenced in the discussion that follows are based on the above code sample)…
First, notice that the name of the file to process is passed to the WordCounterActor constructor (line 3). This indicates that the actor is only to be used to process a single file. This also simplifies the coding job for the developer, by avoiding the need to reset state variables (running, totalLines, linesProcessed, and result) when the job is done, since the instance is only used once (i.e., to process a single file) and then discarded.
Next, observe that the WordCounterActor handles two types of messages:
- StartProcessFileMsg (line 12)
  - Received from the external actor that initially initiates the WordCounterActor.
  - When received, the WordCounterActor first checks that it is not receiving a redundant request.
  - If the request is redundant, WordCounterActor generates a warning and nothing more is done (line 16).
  - If the request is not redundant:
    - WordCounterActor stores a reference to the sender in the fileSender instance variable (note that this is an Option[ActorRef] rather than an Option[Actor] – see line 9). This ActorRef is needed in order to later access and respond to it when processing the final StringProcessedMsg (which is received from a StringCounterActor child, as described below).
    - WordCounterActor then reads the file and, as each line in the file is loaded, a StringCounterActor child is created and a message containing the line to be processed is passed to it (lines 21-24).
- StringProcessedMsg (line 27)
  - Received from a child StringCounterActor when it completes processing the line assigned to it.
  - When received, the WordCounterActor increments the line counter for the file and, if all lines in the file have been processed (i.e., when totalLines and linesProcessed are equal), it sends the final result to the original fileSender (lines 28-31).
Once again, notice that in Akka, the sole mechanism for inter-actor communication is message passing. Messages are the only thing that actors share and, since actors can potentially access the same messages concurrently, it is important for them to be immutable, in order to avoid race conditions and unexpected behaviors.
It is therefore common to pass messages in the form of case classes since they are immutable by default and because of how seamlessly they integrate with pattern matching.
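To make the immutability point concrete, here is a minimal sketch using only the Scala standard library. The `LineCounted` message and the `describe` helper are hypothetical names introduced for illustration; they are not part of the word-count sample.

```scala
// Hypothetical message type: case class fields are immutable vals by default.
final case class LineCounted(lineNumber: Int, words: Int)

object ImmutableMessages {
  // Pattern matching destructures the message without any casts.
  def describe(msg: Any): String = msg match {
    case LineCounted(n, w) => s"line $n has $w words"
    case other             => s"unrecognized: $other"
  }

  def main(args: Array[String]): Unit = {
    val original = LineCounted(1, 4)
    // "Changing" a message means creating a new one; the original is untouched.
    val corrected = original.copy(words = 5)
    println(describe(original))
    println(describe(corrected))
  }
}
```

Because `copy` returns a new instance, an actor that still holds a reference to the original message never observes the change.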
Let’s conclude the example with the code sample to run the whole app.
import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Sample {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("System")
    val actor = system.actorOf(Props(new WordCounterActor(args(0))))
    implicit val timeout: Timeout = Timeout(25.seconds)
    val future = actor ? StartProcessFileMsg()
    future.map { result =>
      println("Total number of words " + result)
      system.shutdown() // on Akka 2.4 and later, use system.terminate() instead
    }
  }
}
Notice how this time the ? method is used to send a message. In this way, the caller can use the returned Future to print the final result when this is available and to exit the program by shutting down the ActorSystem.
Akka fault tolerance and supervisor strategies
In an actor system, each actor is the supervisor of its children. If an actor fails to handle a message, it suspends itself and all of its children and sends a message, usually in the form of an exception, to its supervisor.
In Akka, the way in which a supervisor reacts to and handles exceptions that percolate up to it from its children is referred to as a supervisor strategy. Supervisor strategies are the primary and straightforward mechanism by which you define the fault tolerant behavior of your system.
When a message signifying a failure reaches a supervisor, it can take one of the following actions:
- Resume the child (and its children), keeping its internal state. This strategy can be applied when the child state was not corrupted by the error and it can continue functioning correctly.
- Restart the child (and its children), clearing its internal state. This strategy can be used in the opposite scenario of the one just described. If the child state has been corrupted by the error, it is necessary to reset its state before it can be used in the future.
- Stop the child (and its children) permanently. This strategy can be employed in cases where the error condition is not believed to be rectifiable, but does not jeopardize the rest of the operation being performed, which can be completed in the absence of the failed child.
- Stop itself and escalate the error. Employed when the supervisor does not know how to handle the failure and so it escalates it to its own supervisor.
Moreover, an actor can decide to apply the action just to the failed child or to its siblings as well. There are two pre-defined strategies for this:
- OneForOneStrategy: Applies the specified action to the failed child only
- AllForOneStrategy: Applies the specified action to all of its children
Here’s a simple example, using the OneForOneStrategy:
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

override val supervisorStrategy =
  OneForOneStrategy() {
    case _: ArithmeticException => Resume
    case _: NullPointerException => Restart
    case _: IllegalArgumentException => Stop
    case _: Exception => Escalate
  }
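The block passed to the strategy is an ordinary partial function from exceptions to directives (Akka calls its type `Decider`), so its behavior can be checked without starting any actors. The following is a minimal sketch, assuming the classic Akka actor module is on the classpath; `DeciderCheck` is a hypothetical name used only here.

```scala
import akka.actor.SupervisorStrategy.{Decider, Escalate, Restart, Resume, Stop}

object DeciderCheck {
  // Same mapping as the example above, extracted into a standalone value.
  val decider: Decider = {
    case _: ArithmeticException      => Resume
    case _: NullPointerException     => Restart
    case _: IllegalArgumentException => Stop
    case _: Exception                => Escalate
  }

  def main(args: Array[String]): Unit = {
    println(decider(new ArithmeticException("division by zero")))
    // NumberFormatException is a subclass of IllegalArgumentException,
    // so it is matched by the third case rather than the catch-all.
    println(decider(new NumberFormatException("not a number")))
    println(decider(new RuntimeException("anything else")))
  }
}
```

Cases are tried from top to bottom, so more specific exception types need to appear before their superclasses.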
If no strategy is specified, the following default strategy is employed:
- If there was an error while initializing the actor or if the actor was killed, the actor is stopped.
- If there was any other kind of exception, the actor is simply restarted.
The Akka-supplied implementation of this default strategy is as follows:
final val defaultStrategy: SupervisorStrategy = {
  def defaultDecider: Decider = {
    case _: ActorInitializationException ⇒ Stop
    case _: ActorKilledException ⇒ Stop
    case _: Exception ⇒ Restart
  }
  OneForOneStrategy()(defaultDecider)
}
Akka allows for the implementation of custom supervisor strategies, but as the Akka documentation warns, do so with caution as incorrect implementations may lead to problems such as blocked actor systems (i.e. permanently suspended actors).
Location transparency
The Akka architecture supports location transparency, enabling actors to be entirely agnostic to where the messages that they receive originated. The sender of the message may reside in the same JVM as the actor or in a separate JVM (either running on the same node or a different node). Akka enables each of these cases to be handled in a manner that is completely transparent to the actor (and therefore the developer). The only caveat is that messages sent across multiple nodes must be serializable.
Actor systems are designed to run in a distributed environment without requiring any specialized code. Akka only requires the presence of a configuration file (application.conf) that specifies the nodes to send messages to. Here’s a simple example of a configuration file:
akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    netty {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
}
A few parting tips…
We have seen how the Akka framework helps achieve concurrency and high performance. However, as this tutorial pointed out, there are a few points to keep in mind when designing and implementing your system in order to exploit the power of Akka to its fullest:
- To the greatest extent possible, each actor should be assigned the smallest task possible (as previously discussed, following the Single Responsibility Principle)
- Actors should handle events (i.e., process messages) asynchronously and should not block, otherwise context switches will happen which can adversely affect performance. Specifically, it is best to perform blocking operations (IO, etc.) in a Future so as not to block the actor; i.e.:
case evt => blockingCall()            // BAD
case evt => Future { blockingCall() } // GOOD
- Be sure your messages are all immutable, since the actors that pass them to one another will all be running concurrently in their own threads. Mutable messages are likely to result in unexpected behavior.
- Since messages sent between nodes must be serializable, it is important to bear in mind that the larger the messages are, the longer it will take to serialize, send, and deserialize them, which can negatively impact performance.
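The tip about blocking operations relies only on Scala’s standard Future, so it can be tried without any actors. The following is a minimal sketch; `blockingCall` is a hypothetical stand-in for slow I/O, and the global execution context is used purely for convenience (a real system would likely use a dedicated thread pool for blocking work).

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NonBlockingSketch {
  // Hypothetical stand-in for a slow, blocking operation (file or network I/O).
  def blockingCall(): Int = {
    Thread.sleep(200)
    42
  }

  // Returns immediately; the blocking work runs on the execution context's threads.
  def startWork(): Future[Int] = Future { blockingCall() }

  def main(args: Array[String]): Unit = {
    val pending = startWork()
    println("the caller is free to do other work while the call runs")
    // Waiting here is only to show the result before the program exits.
    println("result: " + Await.result(pending, 5.seconds))
  }
}
```

Inside an actor, the code in the Future body runs on another thread, so it should not touch the actor’s mutable state or call sender directly; capture what is needed in local values first.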
Conclusion
Akka, written in Scala, simplifies and facilitates the development of highly-concurrent, distributed, and fault tolerant applications, hiding much of the complexity from the developer. Doing Akka full justice would require much more than this single tutorial, but hopefully this introduction and its examples were sufficiently captivating to get you to want to read more.
Amazon, VMWare, and CSC are but a few examples of leading companies who are actively using Akka. Visit the official Akka website to learn more and to explore whether Akka could be the right answer for your project as well.
About this post: This is a shared post, originally published at https://www.toptal.com/. All rights reserved to their editors. It is shared here at the original author’s request.