Quick demo of Akka
Akka helps develop concurrent and scalable systems using a message-passing mechanism. It supports Scala and Java and there’s also an Akka.NET project. I’m investigating it to see how well it works when you have hundreds or thousands of workers in an application. There’s been some demonstrations on using it with the Google Compute Engine too. But for now, I’ve got a simple console application which runs locally as an introduction.
Using message passing means that the normal shared-memory synchronization primitives are lost – you should supply the data in the message, and pass back a response message with any result. No need for shared memory synchronization primitives. Plus, it allows for location independence – you don’t necessarily need to know if the receiving end is hosted locally or on a separate machine. I’m not going to delve deeply into any of these topics yet – this is purely a get-up-and-running demonstration.
My simple application demonstrates sending messages from a user to a Master actor which sends the message to a set of Slave actors.
To start with, you’ll need a Master actor.
package uk.andrewgorton.akka.simpleakkalocal.actors; import akka.actor.UntypedActor; public class Master extends UntypedActor { @Override public void onReceive(Object message) throws Exception { } } |
import akka.actor.UntypedActor;
public class Master extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
}
}
We’ll follow best practices and define the ‘sayHello’ message the Master handles inside the Master itself, and get it to handle the message.
public class Master extends UntypedActor { //************************************************** // Messages which we expect to receive //************************************************** static public class SayHello implements Serializable { } //************************************************** // Class logic //************************************************** @Override public void onReceive(Object message) throws Exception { if (message instanceof SayHello) { handleStart(); } else { unhandled(message); } } private void handleStart() { System.out.println(String.format("%s received 'sayHello'", getSelf().path().name())); getContext().system().shutdown(); } } |
}
//**************************************************
// Class logic
//**************************************************
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof SayHello) {
handleStart();
} else {
unhandled(message);
}
}
private void handleStart() {
System.out.println(String.format("%s received ‘sayHello’", getSelf().path().name()));
getContext().system().shutdown();
}
}
So far, so good.
Now we’ll need a Slave actor, similar to the Master. It will also define a sayHello message (this means we can detect if we send a Master’s sayHello message to a slave by accident).
package uk.andrewgorton.akka.simpleakkalocal.actors; import akka.actor.UntypedActor; import java.io.Serializable; public class Slave extends UntypedActor { //************************************************** // Messages which we expect to receive //************************************************** static public class SayHello implements Serializable { } //************************************************** // Class logic //************************************************** @Override public void onReceive(Object message) throws Exception { if (message instanceof SayHello) { handleSayHello(); } else { unhandled(message); } } private void handleSayHello() { System.out.println(String.format("%s received 'sayHello'", getSelf().path().name())); } } |
import akka.actor.UntypedActor;
import java.io.Serializable;
public class Slave extends UntypedActor {
//**************************************************
// Messages which we expect to receive
//**************************************************
static public class SayHello implements Serializable {
}
//**************************************************
// Class logic
//**************************************************
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof SayHello) {
handleSayHello();
} else {
unhandled(message);
}
}
private void handleSayHello() {
System.out.println(String.format("%s received ‘sayHello’", getSelf().path().name()));
}
}
We need to modify the Master actor so it creates a set of Slave actors at startup and maintains the references to them, plus it sends a sayHello message to them when it gets a sayHello message.
public class Master extends UntypedActor { private static final int slaveCount = 10; private List<ActorRef> slaves = null; //************************************************** // Messages which we expect to receive //************************************************** static public class SayHello implements Serializable { } //************************************************** // Class logic //************************************************** @Override public void preStart() throws Exception { super.preStart(); // Create the required number of slaves if (slaves == null) { slaves = new ArrayList<ActorRef>(); for (int counter = 0; counter < slaveCount; ++counter) { slaves.add(getContext().actorOf(Props.create(Slave.class), String.format("slave%d", counter))); } } } @Override public void onReceive(Object message) throws Exception { if (message instanceof SayHello) { handleStart(); } else { unhandled(message); } } private void handleStart() { System.out.println(String.format("%s received 'sayHello'", getSelf().path().name())); for (ActorRef singleSlave : slaves) { singleSlave.tell(new Slave.SayHello(), getSelf()); } // Wait for all the slaves to complete try { Thread.sleep(1000); } catch (InterruptedException e) { // Don't care } System.out.println(String.format("%s shutting down", getSelf().path().name())); getContext().system().shutdown(); } |
private List<ActorRef> slaves = null;
//**************************************************
// Messages which we expect to receive
//**************************************************
static public class SayHello implements Serializable {
}
//**************************************************
// Class logic
//**************************************************
@Override
public void preStart() throws Exception {
super.preStart();
// Create the required number of slaves
if (slaves == null) {
slaves = new ArrayList<ActorRef>();
for (int counter = 0; counter < slaveCount; ++counter) {
slaves.add(getContext().actorOf(Props.create(Slave.class), String.format("slave%d", counter)));
}
}
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof SayHello) {
handleStart();
} else {
unhandled(message);
}
}
private void handleStart() {
System.out.println(String.format("%s received ‘sayHello’", getSelf().path().name()));
for (ActorRef singleSlave : slaves) {
singleSlave.tell(new Slave.SayHello(), getSelf());
}
// Wait for all the slaves to complete
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Don’t care
}
System.out.println(String.format("%s shutting down", getSelf().path().name()));
getContext().system().shutdown();
}
And now we tie everything together with an application entrypoint which bootstraps Akka, creates a Master actor and sends it a SayHello message.
package uk.andrewgorton.akka.simpleakkalocal; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import uk.andrewgorton.akka.simpleakkalocal.actors.Master; public class Main { public static void main(String[] args) { ActorSystem system = ActorSystem.create("SimpleAkkaLocal"); ActorRef master = system.actorOf(Props.create(Master.class), "master"); master.tell(new Master.SayHello(), ActorRef.noSender()); } } |
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import uk.andrewgorton.akka.simpleakkalocal.actors.Master;
public class Main {
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("SimpleAkkaLocal");
ActorRef master = system.actorOf(Props.create(Master.class), "master");
master.tell(new Master.SayHello(), ActorRef.noSender());
}
}
The full code is available in BitBucket, tagged at 1.0.0. When you run it, it should display the following
$ java -jar target/SimpleAkkaLocal-1.0.0.jar master received 'sayHello' slave0 received 'sayHello' slave2 received 'sayHello' slave3 received 'sayHello' slave1 received 'sayHello' slave4 received 'sayHello' slave5 received 'sayHello' slave7 received 'sayHello' slave6 received 'sayHello' slave9 received 'sayHello' slave8 received 'sayHello' master shutting down |
UPDATE: Changed BitBucket repository name.