-
Notifications
You must be signed in to change notification settings - Fork 56
Akka
akka (http://akka.io) - это бибилиотека и среда выполнения, реализующая actor model(wiki:Модель акторов) для JVM. Akka написана на языке Scala, но в связи с тем, что выполняется в JVM, приложения писать можно на любом языке (работающем в JVM).
Akka позволяет не только создавать многопоточные и конкурентные приложения с использованием акторов, но и предоставляет в том числе декларативные средства для управления параметрами, а также обеспечивает возможность кластеризации и удалённой отправки сообщений и старта акторов на удалённой машине.
Akka может работать в 2-х режимах:
- в виде библиотеки и в этом случае запуск ActorSystem необходимо реализовывать собственноручно
- в виде runtime загрузчика - MicroKernel
В наших экспериментах мы будем использовать Akka в виде библиотеки. Актор - основная единица рабочих модулей.Актор инкапсулирует в себе состояние, поведение и очередь сообщений Акторы могут взаимодействовать только с использованием сообщений. ActorSystem - это базовый блок архитектуры, в котором работают все акторы. Mailbox - очередь (может быть как приоритизированной, так и не приоритизированной), в которую попадают сообщения для актора, которые он и обрабатывает Акторы взаимодействуют асинхронно путём отправки сообщений друг другу. Акторы взаимодействуют асинхронно, но обработка сообщений яляется однопоточной, что позволяет отказаться от блокировок, shared state и т.д. Сообщения необходимо делать неизменяемыми (immutable). В системе выделяют supervision tree - это дерево акторов, в котром выделяют особые акторы(supervisor), которые следят за дочерними акторами и в случае ошибок перезапускают "упавшие" акторы. Слежение за акторами происходит посредством подписки на сообщения о запуске, перезапуске и остановке актора
В качестве конфигурации в Akka можно передавать DSL, который конфигурирует кластер, настройки акторов и т.д. и даже дерево акторов. Кроме того, в Akka есть возможность задавать в декларативном стиле особые виды акторов. Например, для увеличения производительности можно создать акторы-роутеры, в задаи которых входит балансировка сообщений между другими акторами (workers).
Создадим пример actor system. Branch pres_0_11_0 (https://github.com/wizardjedi/my-spring-learning/tree/pres_0_11_0)
Для начала создадим пустой проект maven.
Добавим зависимости в pom.xml
и настроим сборку.
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.3.6</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.3.6</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.11</artifactId>
<version>2.3.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<addMavenDescriptor>false</addMavenDescriptor>
<compress>true</compress>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>libs/</classpathPrefix>
<mainClass>learn.akka.App</mainClass>
</manifest>
</archive>
</configuration>
<version>2.4</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/libs</outputDirectory>
</configuration>
</execution>
</executions>
<version>2.5.1</version>
</plugin>
</plugins>
</build>
Реально для работы нам понадобится только akka-actor
, но cluster и remote добавлены на будущее.
Теперь необходмо реализовать класс простейшего актора. Необходимо пронаследовать класс akka.actor.UntypedActor
.
Наш актор будет получать сообщение и выводить его на консоль. Для обработки сообщений необходимо переопределить метод получения сообщения (onReceive
).
Наш актор будут выводить на консоль данные о том, кто прислал сообщение и где собственно актор находится, а также присланное сообщение.
SimpleActor.java
package learn.akka;
import akka.actor.UntypedActor;
public class SimpleActor extends UntypedActor {
@Override
public void onReceive(Object o) throws Exception {
System.out.println("I'm actor "+self()+" from "+self().path());
System.out.println("I've got message "+o);
}
}
Для взаимодействия с акторами есть специализированный класс ActorRef
. который реализует ссылку на актор (как на локальной машине, так и в кластере).
Теперь рассмотрим пример инициализации actor system.
App.java
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("system");
ActorRef actor = system.actorOf(Props.create(SimpleActor.class));
actor.tell("Hello", ActorRef.noSender());
system.shutdown();
}
Сборка проекта и запуск
$ mvn clean install
$ java -jar target/pres_0_11_0-1.0-SNAPSHOT.jar
I'm actor Actor[akka://system/user/$a#1803303174] from akka://system/user/$a
I've got message Hello
Разберём код более подробно.
Для начала создадим нашу систему акторов с наименованием system. Данное название будет фигурировать в путях поиска акторов.
ActorSystem system = ActorSystem.create("system");
Теперь в созданной системе акторов создадим актор, который реализуется классом SimpleActor
.
ActorRef actor = system.actorOf(Props.create(SimpleActor.class));
Метод actorOf
создаёт актора и возвращает ссылку на актора в виде объекта класса ActorRef
.
Теперь отправим актору сообщение. Отправка сообщений реализуется с помощью метода tell
. Если мы не хотим указывать отправителя собщения, то можем воспользоваться методом ActorRef.noSender()
, который возвращает "пустого отправителя".
actor.tell("Hello", ActorRef.noSender());
Завершим работу с системой акторов для завершения приложения.
system.shutdown();
Ветка pres_0_11_1. https://github.com/wizardjedi/my-spring-learning/tree/pres_0_11_1
Акторы не существуют сами по себе. У акторов есть жизненный цикл.
Внесём изменение в приложения таким образом, чтобы увидеть стадии создания и остановка акторов.
Для начала реализуем следующие методы в SimpleActor
:
-
preRestart
- обработчик, вызываемый перед перезапуском актора -
postStop
- обработчик, вызываемый после остановки актора -
preStart
- обработчик, вызываемый перед стартом актора
public class SimpleActor extends UntypedActor {
@Override
public void preRestart(Throwable reason, Option<Object> message) throws Exception {
System.out.println("Restarting "+self());
super.preRestart(reason, message);
}
@Override
public void postStop() throws Exception {
System.out.println("Stopping "+self());
super.postStop();
}
@Override
public void preStart() throws Exception {
System.out.println("Starting "+self());
super.preStart();
}
@Override
public void onReceive(Object o) throws Exception {
System.out.println("I'm actor "+self()+" from "+self().path());
System.out.println("I've got message "+o);
}
}
Теперь внесём изменения в основной класс для отслеживания стадий работы приложения.
public static void main(String[] args) throws InterruptedException {
System.out.println("Actor system create");
ActorSystem system = ActorSystem.create("system");
System.out.println("Actor system created");
ActorRef actor = system.actorOf(Props.create(SimpleActor.class));
System.out.println("Actor created");
actor.tell("Hello", ActorRef.noSender());
System.out.println("Wait 2 seconds");
TimeUnit.SECONDS.sleep(2);
System.out.println("Shutting down");
system.shutdown();
}
Соберём и запустим приложение.
$ mvn clean install
$ java -jar target/pres_0_11_1-1.0-SNAPSHOT.jar
Actor system create
Actor system created
Actor created
Wait 2 seconds
Starting Actor[akka://system/user/$a#-702176811]
I'm actor Actor[akka://system/user/$a#-702176811] from akka://system/user/$a
I've got message Hello
Shutting down
Stopping Actor[akka://system/user/$a#-702176811]
Ветка pres_0_11_2. https://github.com/wizardjedi/my-spring-learning/tree/pres_0_11_2
В предыдущем примере актор останавливался в ответ на завершение всей системы, однако можно послать актору специализированное сообщение, которое приводит к остановке актора после обработки (PoisonPill
).
Внесём следующие изменения, добавив следующий код:
System.out.println("Send poison pill");
actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
$ mvn clean install
$ $ java -jar target/pres_0_11_2-1.0-SNAPSHOT.jar
Actor system create
Actor system created
Actor created
Send poison pill
Starting Actor[akka://system/user/$a#1284879155]
I'm actor Actor[akka://system/user/$a#1284879155] from akka://system/user/$a
I've got message Hello
Wait 2 seconds
Stopping Actor[akka://system/user/$a#1284879155]
... delay for 2 seconds ...
Shutting down
По выводу программы видно, что сначала происходит остановка актора, а уже потом остановка всей системы.
Ветка pres_0_11_3 https://github.com/wizardjedi/my-spring-learning/tree/pres_0_11_3
Наиболее часто встречающейся задачей является создание акторов. Попробуем создать дочерний актор в нашем акторе и реализуем код, который будет "пробрасывать" сообщения дочернему актору.
В Akka есть возможность следить за жизненным циклом акторов, в частности получать события от актора в виде сообщений.
Для того, чтобы следить за жизненным циклом актора есть метод watch
.
Например, создадим дочерний актор и подпишемся на события о его жизненном цикле
final ActorSystem system = context().system();
childActor = system.actorOf(Props.create(SanpleActor.class), "childActor");
getContext().watch(childActor);
Если актор останавливается, то родительский актор получает сообщение Terminated
с причиной остановки актора.
Посмотрим на пример создания акторов, проброски сообщений и подписки на события.
Изменим основной метод таким образом:
public static void main(String[] args) throws InterruptedException {
System.out.println("Actor system create");
ActorSystem system = ActorSystem.create("system");
System.out.println("Actor system created");
ActorRef actor = system.actorOf(Props.create(SimpleActor.class));
System.out.println("Actor created");
System.out.println("Wait 2 seconds");
TimeUnit.SECONDS.sleep(2);
System.out.println("Born child");
actor.tell("Born", ActorRef.noSender());
System.out.println("Wait 2 seconds");
TimeUnit.SECONDS.sleep(2);
System.out.println("Kill child");
actor.tell("Kill", ActorRef.noSender());
System.out.println("Wait 2 seconds");
TimeUnit.SECONDS.sleep(2);
System.out.println("Send poison pill");
actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
TimeUnit.SECONDS.sleep(2);
System.out.println("Shutting down");
system.shutdown();
}
Рассмотрим код самого актора.
public class SimpleActor extends UntypedActor {
protected ActorRef childActor;
@Override
public void preRestart(Throwable reason, Option<Object> message) throws Exception {
System.out.println("Restarting " + self());
super.preRestart(reason, message);
}
@Override
public void postStop() throws Exception {
System.out.println("Stopping " + self());
super.postStop();
}
@Override
public void preStart() throws Exception {
System.out.println("Starting " + self());
super.preStart();
}
@Override
public void onReceive(Object o) throws Exception {
if ("Born".equals(o)) {
final ActorSystem system = context().system();
childActor = system.actorOf(Props.create(SimpleActor.class), "child");
getContext().watch(childActor);
System.out.println("Child created");
} else if ("Kill".equals(o)) {
System.out.println("Kill child");
childActor.tell(PoisonPill.getInstance(), self());
} else if (o instanceof Terminated) {
Terminated t = (Terminated)o;
System.out.println("Child "+t.actor()+" was killed. And I've got message");
} else {
System.out.println("Got message "+o);
}
}
}
Если актор получает сообщение Born
, то создаётся дочерний актор и ссылка на него сохраняется в акторе. Дочерний актор в этом случае следит за дочерним актором. Если поступает сообщение Kill
, то родительский актор убивает дочерний актор, отправляя PoisonPill
. Это необходимо для того, чтобы получить сообщение о завершении актора.
Соберём проект и посмотрим на вывод.
$ mvn clean install
$ $ java -jar target/pres_0_11_3-1.0-SNAPSHOT.jar
Actor system create
Actor system created
Actor created
Wait 2 seconds
Starting Actor[akka://system/user/$a#727676505]
Born child
Wait 2 seconds
Starting Actor[akka://system/user/child#-360878496]
Child created
Kill child
Wait 2 seconds
Kill child
Stopping Actor[akka://system/user/child#-360878496]
Child Actor[akka://system/user/child#-360878496] was killed. And I've got message
Send poison pill
Stopping Actor[akka://system/user/$a#727676505]
Shutting down
Из вывода видно, что был создан актор akka://system/user/$a#727676505
и его дочерний актор akka://system/user/child#-360878496
. После чего родительский актор "убил" дочерний актор и получил сообщение об этом.
Ветка pres_0_11_4 https://github.com/wizardjedi/my-spring-learning/tree/pres_0_11_4
Akka построена таким образом, что если сообщение не может быть доставлено адресато, то такие сообщения отправляются специализированному актору akka.actor.DeadLetter
. Данный актор регистрируется в системе при старте и выдаёт сообщения о "мёртвых сообщениях".
Пример выдачи лога о неудачных сообщениях.
[10/17/2014 22:56:44.494] [default-akka.actor.default-dispatcher-4] [akka://default/user/actor/childActor] Message [java.lang.String] from Actor[akka://default/user/actor#566097194] to Actor[akka://default/user/actor/childActor#2083540886] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Однако, данный вывод можно отключить. Для этого в конфиге необходимо задать параметр akka.log-dead-letters = off
.
Например, так:
Config config = ConfigFactory.parseString("akka.log-dead-letters = off");
ActorSystem system = ActorSystem.create("system",config);
Мы воспользовались ConfigFactory
для создания объекта, реализующего интерфейс Config
. При создании конфига передали строку с параметром для отключения логирования dead letters.
Но если понадобилось подписаться на события о dead letters в своём акторе, то это можно сделать так:
ActorRef myActorRef = system.actorOf(Props.create(MyActor.class),"deadLetterActor");
system.eventStream().subscribe(myActorRef, DeadLetter.class);
Чтобы получить ссылку на актор dead letters есть метод deadLetters()
.
ActorSystem system = ActorSystem.create();
system.deadLetters();
Для обработки мёртвых сообщений, актор должен обрабатывать сообщения типа DeadLetter
.
В предыдущих примерах в акторах вызывались только методы preStart()
и postStop()
, а вот методы preRestart()
и postRestart()
не вызывались. Это связано с тем, что не была релизован механизм перезапуска акторов в случае ошибок.
В akka есть специализированный метод, который можно переопределить для возвращения стратегии перезапуска. Стратегия перезапуска - это обработчик перезапуска !!!ДЛЯ ДЧЕРНИХ!!! акторов.
таким образом, отдельная сущность супервайзера не реализуется в Akka, а просто возвращается стратегия перезапуска, которая влияет на перезапуск дочерних акторов.
То есть, для определения стратегии перезапуска необходимо реализовать метод, который возвращает соответствующий объект.
protected static final SupervisorStrategy strategy = new OneForOneStrategy(10,
Duration.create("10 second"), new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof ArithmeticException) {
return resume();
} else if (t instanceof NullPointerException) {
return restart();
} else if (t instanceof IllegalArgumentException) {
return stop();
} else {
return escalate();
}
}
});
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
OneForOneStrategy
- означает, что в случае проблемы будет перезапущен только тот актор, который завершился, а не все акторы. Также, есть стратегия AllForOneStrategy
, при использовании кторой будут перезапущены все акторы.
Первый параметр - количество разрешённых перезапусков, а второй параметр - период, за который разрешено количество перезапусков. То есть, в нашем примере актор разрешено перезапускать не более 10 раз за 10 секунд, иначе актор не перезапускается.
Последний параметр передаёт обработчик для различных типов исключений. Данный обработчик позволяет настроить поведение супервайзера для каждого из различнх видов исключений.
Реакции следующие:
-
resume()
- актор не перезапускается, а продолжает работать дальше -
restart()
- актор перезапускается -
stop()
- актор завершается -
escalate()
- передаёт исключение выше по дереву супервайзеров для проверки
Реализуем актор, который будет запускать дочерний актор со стратегией слежения, который будет печатать на консоль сообщения типа DeadLetter
. Также актор будет запускать дочерний актор, если получил сообщение born
.
public class MyActor extends UntypedActor {
protected ActorRef child;
protected static final SupervisorStrategy strategy = new OneForOneStrategy(10,
Duration.create("10 second"), new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof ArithmeticException) {
return resume();
} else if (t instanceof NullPointerException) {
return restart();
} else if (t instanceof IllegalArgumentException) {
return stop();
} else {
return escalate();
}
}
});
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof DeadLetter) {
System.out.println("DEAD LETTER x_x "+message);
} else {
if ("born".equals(message)) {
child = context().actorOf(Props.create(MyChildActor.class),"childActor");
} else {
child.tell(message, self());
}
}
}
}
Класс для дочернего актора будет по сообщению генерировать соответствующий вид исключения и реализовывать обработчики событий.
public class MyChildActor extends UntypedActor {
@Override
public void preRestart(Throwable reason, Option<Object> message) throws Exception {
System.out.println("> preRestart "+self());
super.preRestart(reason, message);
}
@Override
public void postRestart(Throwable reason) throws Exception {
System.out.println("> postRestart "+self());
super.postRestart(reason);
}
@Override
public void postStop() throws Exception {
System.out.println("> postStop" + self());
super.postStop();
}
@Override
public void preStart() throws Exception {
System.out.println("> preStart" + self());
super.preStart();
}
@Override
public void onReceive(Object message) throws Exception {
System.out.println("Processing "+self().path()+" msg:"+message);
if ("arith".equals(message)) {
throw new ArithmeticException();
} else if ("null".equals(message)) {
throw new NullPointerException();
} else if ("illegal".equals(message)) {
throw new IllegalArgumentException();
} else if ("exception".equals(message)) {
throw new Exception("Other");
} else {
System.out.println("::"+message);
}
}
}
Основной метод приложения будет выглядеть так:
public static void main(String[] args) throws InterruptedException {
Config config = ConfigFactory.parseString("akka.log-dead-letters = off");
ActorSystem system = ActorSystem.create("system",config);
ActorRef actorOf = system.actorOf(Props.create(MyActor.class),"actor");
system.eventStream().subscribe(actorOf, DeadLetter.class);
TimeUnit.SECONDS.sleep(1);
actorOf.tell("born", ActorRef.noSender());
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("----Some message----");
System.out.println(" ! arith");TimeUnit.MILLISECONDS.sleep(1500);
actorOf.tell("arith", ActorRef.noSender()); // resume
TimeUnit.MILLISECONDS.sleep(1500);
System.out.println(" ! null");TimeUnit.MILLISECONDS.sleep(1500);
actorOf.tell("null", ActorRef.noSender()); // restart
TimeUnit.MILLISECONDS.sleep(1500);
System.out.println(" ! illegal");TimeUnit.MILLISECONDS.sleep(500);
actorOf.tell("illegal", ActorRef.noSender()); // stop
TimeUnit.MILLISECONDS.sleep(1500);
System.out.println(" ! exception");TimeUnit.MILLISECONDS.sleep(500);
actorOf.tell("exception", ActorRef.noSender()); // should invoke escalate, but actor is dead. msg send to dead lettes.
TimeUnit.MILLISECONDS.sleep(1500);
System.out.println(" ! another");TimeUnit.MILLISECONDS.sleep(500);
actorOf.tell("another", ActorRef.noSender()); // actor is dead. msg send to dead lettes.
TimeUnit.SECONDS.sleep(1);
System.out.println("Wait 3 and shutdown system");
TimeUnit.SECONDS.sleep(3);
system.shutdown();
}
Соберём приложение и посмотрим на вывод.
$ mvn clean install
$ java -jar target/pres_0_11_4-1.0-SNAPSHOT.jar
> preStartActor[akka://system/user/actor/childActor#372872900]
## Был создан дочерний актор
----Some message----
! arith
Processing akka://system/user/actor/childActor msg:arith
[WARN] [10/17/2014 23:47:54.520] [system-akka.actor.default-dispatcher-4] [akka://system/user/actor/childActor] null
## Был вызван метод resume.Актор не перезапущен.
! null
Processing akka://system/user/actor/childActor msg:null
> preRestart Actor[akka://system/user/actor/childActor#372872900]
> postStopActor[akka://system/user/actor/childActor#372872900]
[ERROR] [10/17/2014 23:47:57.515] [system-akka.actor.default-dispatcher-2] [akka://system/user/actor/childActor] null
java.lang.NullPointerException
at learn.akka.MyChildActor.onReceive(MyChildActor.java:43)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> postRestart Actor[akka://system/user/actor/childActor#372872900]
> preStartActor[akka://system/user/actor/childActor#372872900]
## Был вызван метод restart().Актор перезапущен.
! illegal
Processing akka://system/user/actor/childActor msg:illegal
[ERROR] [10/17/2014 23:47:59.514] [system-akka.actor.default-dispatcher-3] [akka://system/user/actor/childActor] null
java.lang.IllegalArgumentException
at learn.akka.MyChildActor.onReceive(MyChildActor.java:45)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> postStopActor[akka://system/user/actor/childActor#372872900]
## Был вызван метод stop().Актор остановлен.
! exception
DEAD LETTER x_x DeadLetter(exception,Actor[akka://system/user/actor#-1878314046],Actor[akka://system/user/actor/childActor#372872900])
! another
DEAD LETTER x_x DeadLetter(another,Actor[akka://system/user/actor#-1878314046],Actor[akka://system/user/actor/childActor#372872900])
## Актор остановлен, поэтому сообщения уходят в dead letters.
Wait 3 and shutdown system