Actor模型探索
最后更新于:2022-04-01 19:46:57
### Akka是什么
Akka就是为了改变编写高容错性和强可扩展性的并发程序而生的。通过使用Actor模型我们提升了抽象级别,为构建正确的可扩展并发应用提供了一个更好的平台。在容错性方面我们采取了“let it crash”(让它崩溃)模型,人们已经将这种模型用在了电信行业,构建出“自愈合”的应用和永不停机的系统,取得了巨大成功。Actor还为透明的分布式系统以及真正的可扩展高容错应用的基础进行了抽象。
Akka是JVM(JAVA虚拟机,下同)平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用 Scala语言写成,同时提供了Scala和JAVA的开发接口。
Akka处理并发的方法基于Actor模型。在基于Actor的系统里,所有的事物都是Actor,就好像在面向对象设计里面所有的事物都是对象一样。但是有一个重要区别——那就是Actor模型是作为一个并发模型设计和架构的,而面向对象模式则不是。更具体一点,在Scala的Actor系统里,Actor互相交互并共享信息但并不对交互顺序作出预设。Actor之间共享信息和发起任务的机制是消息传递。
Akka在多个Actor和下面的系统之间建立了一个层次(Layer),这样一来,Actor只需要处理消息就可以了。**创建和调度线程、接收和分发消息以及处理竞态条件和同步的所有复杂性,都委托给框架,框架的处理对应用来说是透明的。**
### Akka混合模型特点
> - Actors
Actors为你提供:
对并发/并行程序的简单的、高级别的抽象。
异步、非阻塞、高性能的事件驱动编程模型(代码可以异步处理请求并采用独占的方式执行非阻塞操作)。
非常轻量的事件驱动处理(1G内存可容纳约270万个Actors)。
> - 容错性
使用“let-it-crash”语义和监管者树形结构来实现容错。非常适合编写永不停机、自愈合的高容错系统。监管者树形结构可以跨多个JVM来提供真正的高容错系统。
> - 位置透明性
Akka的所有元素都为分布式环境而设计:所有Actor都仅通过发送消息进行互操作,所有操作都是异步的。
> - 可伸缩性
在Akka里,不修改代码就增加节点是可能的,感谢消息传递和位置透明性(location transparency)。
> - 高弹性
任何应用都会碰到错误并在某个时间点失败。Akka的“监管”(容错)策略为实现自愈系统提供了便利。
> - 响应式应用
今天的高性能和快速响应应用需要对用户快速反馈,因此对于事件的响应需要非常及时。Akka的非阻塞、基于消息的策略可以帮助达成这个目标。
> - 事务性Actors
事务性Actor是Actor与STM(Software Transactional Memory)的组合。它使你能够使用自动重试和回滚来组合出原子消息流。
### Actor系统
**Actor本质上就是接收消息并采取行动处理消息的对象。它从消息源中解耦出来,只负责正确识别接收到的消息类型,并采取相应的行动。**
Actor是封装状态和行为的对象,他们的唯一通讯方式是交换消息,交换的消息存放在接收方的邮箱里。从某种意义上来说,Actor是面向对象的最严格的形式,但是最后把它们看成一些人:在使用Actor来对解决方案建模时,把Actor想象成一群人,把子任务分配给他们,将他们的功能整理成一个有组织的结构,考虑如何将失败逐级上传。这样的结果就可以在脑中形成进行软件实现的框架。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-22_56ca7f4210b71.png "")
### 树形结构
程序中负责某一个功能的Actor可能需要把它的任务分拆成更小的、更易管理的部分。为此它启动子Actor并监管它们。每个Actor有且仅有一个监管者,就是创建它的那个Actor。
**Actor系统的精髓在于任务被分拆开来并进行委托,直到任务小到可以被完整地进行处理。**这样做不仅使任务本身被清晰地划分出结构,而且最终的Actor也能按照它们“应该处理的消息类型”,“如何完成正常流程的处理”以及“失败流程应如何处理”来进行解析。如果一个Actor对某种状况无法进行处理,它会发送相应的失败消息给它的监管者请求帮助。这样的递归结构使得失败能够在正确的层次进行处理。
可以将这与分层的设计方法进行比较。分层的设计方法最终很容易形成防护性编程,以防止任何失败被泄露出来。把问题交由正确的人处理会是比将所有的事情“藏在深处”更好的解决方案。
现在,设计这种系统的难度在于如何决定谁应该监管什么。这当然没有一个唯一的最佳方案,但是有一些可能会有帮助的原则:
> - 如果一个Actor管理另一个Actor所做的工作,如分配一个子任务,那么父Actor应该监督子Actor,原因是父Actor知道可能会出现哪些失败情况,知道如何处理它们。
> - 如果一个Actor携带着重要数据(i.e. 它的状态要尽可能地不被丢失),这个Actor应该将任何可能的危险子任务分配给它所监管的子Actor,并酌情处理子任务的失败。视请求的性质,可能最好是为每一个请求创建一个子Actor,这样能简化收集回应时的状态管理。这在Erlang中被称为“Error Kernel Pattern”。
> - 如果Actor A需要依赖Actor B才能完成它的任务,A应该观测B的存活状态并对收到B的终止提醒消息进行响应。这与监管机制不同,因为观测方对监管机制没有影响,需要指出的是,仅仅是功能上的依赖并不足以用来决定是否在树形监管体系中添加子Actor.
### 配置容器
多个Actor协作的Actor系统是管理如日程计划服务、配置文件、日志等共享设施的自然单元。使用不同的配置的多个Actor系统可以在同一个jvm中共存。Akka自身没有全局共享的状态。将这与Actor系统之间的透明通讯(在同一节点上或者跨网络连接的多个节点)结合,可以看到Actor系统本身可以被作为功能层次中的积木构件。
### Actor实践
1. Actor们应该被视为非常友好的同事:高效地完成他们的工作而不会无必要地打扰其它人,也不会争抢资源。转换到编程里这意味着以事件驱动的方式来处理事件并生成响应(或更多的请求)。Actor不应该因为某一个外部实体而阻塞(i.e.占据一个线程又被动等待),这个外部实体可能是一个锁、一个网络socket等等。阻塞操作应该在某些特殊的线程里完成,这个线程发送消息给可处理这些消息的Actor们。
1. 不要在Actor之间传递可变对象。为了保证这一点,尽量使用不变量消息。如果Actor将他们的可变状态暴露给外界,打破了封装,你又回到了普通的Java并发领域并遭遇所有其缺点。
1. Actor是行为和状态的容器,接受这一点意味着不要在消息中传递行为(例如在消息中使用scala闭包)。有一个风险是意外地在Actor之间共享了可变状态,而与Actor模型的这种冲突将破坏使Actor编程成为良好体验的所有属性。
### Akka中的Actor模型
使用Actor就像租车——我们如果需要,可以快速便捷地租到一辆;如果车辆发生故障,也不需要自己修理,直接打电话给租车公司更换另外一辆即可。
Actor模型是一种适用性非常好的通用并发编程模型。它可以应用于共享内存架构和分布式内存架构,适合解决地理分布型的问题。同时它还能提供很好的容错性。
**一个Actor是一个容器,它包含了 状态,行为,一个邮箱,子Actor和一个监管策略。所有这些包含在一个Actor Reference里。**
### Actor引用
Actor是以Actor引用的形式展现给外界的,Actor引用可以被自由的无限制地传来传去。内部对象和外部对象的这种划分使得所有想要的操作能够透明:重启Actor而不需要更新别处的引用,将实际Actor对象放置到远程主机上,向另外一个应用程序发送消息。但最重要的方面是从外界不可能到Actor对象的内部获取它的状态,除非这个Actor非常不明智地将信息公布出去。
### 状态
Actor对象通常包含一些变量来反映Actor所处的可能状态。这可能是一个明确的状态机(e.g. 使用 FSM 模块),或是一个计数器,一组监听器,待处理的请求,等等。这些数据使得Actor有价值,并且必须将这些数据保护起来不被其它的Actor所破坏。好消息是在概念上每个Akka Actor都有它自己的轻量线程,这个线程是完全与系统其它部分隔离的。这意味着你不需要使用锁来进行资源同步,可以完全不必担心并发性地来编写你的Actor代码。
在幕后,Akka会在一组线程上运行一组Actor,通常是很多Actor共享一个线程,对某一个Actor的调用可能会在不同的线程上进行处理。Akka保证这个实现细节不影响处理Actor状态的单线程性。
由于内部状态对于Actor的操作是至关重要的,所以状态不一致是致命的。当Actor失败并由其监管者重新启动,状态会进行重新创建,就象第一次创建这个Actor一样。这是为了实现系统的“自愈合”。
### 行为
每次当一个消息被处理时,消息会与Actor的当前的行为进行匹配。行为是一个函数,它定义了处理当前消息所要采取的动作,例如如果客户已经授权过了,那么就对请求进行处理,否则拒绝请求。这个行为可能随着时间而改变,例如由于不同的客户在不同的时间获得授权,或是由于Actor进入了“非服务”模式,之后又变回来。这些变化要么通过将它们放进从行为逻辑中读取的状态变量中实现,要么函数本身在运行时被替换出来,见become 和 unbecome操作。但是Actor对象在创建时所定义的初始行为是特殊的,因为当Actor重启时会恢复这个初始行为。
### 邮箱
Actor的用途是处理消息,这些消息是从其它的Actor(或者从Actor系统外部)发送过来的。连接发送者与接收者的纽带是Actor的邮箱:每个Actor有且仅有一个邮箱,所有的发来的消息都在邮箱里排队。排队按照发送操作的时间顺序来进行,这意味着从不同的Actor发来的消息在运行时没有一个固定的顺序,这是由于Actor分布在不同的线程中。从另一个角度讲,从同一个Actor发送多个消息到相同的Actor,则消息会按发送的顺序排队。
可以有不同的邮箱实现供选择,缺省的是FIFO:Actor处理消息的顺序与消息入队列的顺序一致。这通常是一个好的选择,但是应用可能需要对某些消息进行优先处理。在这种情况下,可以使用优先邮箱来根据消息优先级将消息放在某个指定的位置,甚至可能是队列头,而不是队列末尾。如果使用这样的队列,消息的处理顺序是由队列的算法决定的,而不是FIFO。
Akka与其它Actor模型实现的一个重要差别在于当前的行为必须处理下一个从队列中取出的消息,Akka不会去扫描邮箱来找到下一个匹配的消息。无法处理某个消息通常是作为失败情况进行处理,除非Actor覆盖了这个行为。
### 子Actor
每个Actor都是一个潜在的监管者:如果它创建了子Actor来委托处理子任务,它会自动地监管它们。子Actor列表维护在Actor的上下文中,Actor可以访问它。对列表的更改是通过创建(tt class=”docutils literal”>context.ActorOf(…))或者停止(context.stop(child))子Actor来实现,并且这些更改会立刻生效。实际的创建和停止操作在幕后以异步的方式完成,这样它们就不会“阻塞”其监管者。
### 监管策略
Actor的最后一部分是它用来处理其子Actor错误状况的机制。错误处理是由Akka透明地进行处理的,将监管与监控中所描述的策略中的一个应用于每个出现的失败。由于策略是Actor系统组织结构的基础,所以一旦Actor被创建了它就不能被修改。
考虑对每个Actor只有唯一的策略,这意味着如果一个Actor的子Actor们应用了不同的策略,这些子Actor应该按照相同的策略来进行分组,生成中间的监管者,又一次倾向于根据任务到子任务的划分来组织Actor系统的结构。
### Actor终止
一旦一个Actor终止了,i.e. 失败了并且不能用重启来解决,停止它自己或者被它的监管者停止,它会释放它的资源,将它邮箱中所有未处理的消息放进系统的“死信邮箱”。而Actor引用中的邮箱将会被一个系统邮箱所替代,系统邮箱会将所有新的消息重定向到“排水沟”。 但是这些操作只是尽力而为,所以不能依赖它来实现“保证投递”。
不是简单地把(未处理的:译者注)消息扔掉的想法来源于我们(Akka:译者注)测试:我们在事件总线上注册了TestEventListener来接收死信,然后将每个收到的死信在日志中生成一条警告。这对于更快地解析测试失败非常有帮助。我们觉得可能这个功能也可以用于其它的目的。
### 参考资料
[让并发和容错更容易:Akka示例教程](http://www.csdn.net/article/2014-12-17/2823174)
[Akka 2.0官方文档中文版](http://www.gtan.com/akka_doc/)
**转载请注明作者Jason Ding及其出处**
[GitCafe博客主页(http://jasonding1354.gitcafe.io/)](http://jasonding1354.gitcafe.io/)
[Github博客主页(http://jasonding1354.github.io/)](http://jasonding1354.github.io/)
[CSDN博客(http://blog.csdn.net/jasonding1354)](http://blog.csdn.net/jasonding1354)
[简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)](http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
**Google搜索jasonding1354进入我的博客主页**
';
在并发程序中使用Future
最后更新于:2022-04-01 19:46:55
### 引言
在Akka中, 一个`Future`是用来获取某个并发操作的结果的数据结构。这个操作通常是由Actor执行或由Dispatcher直接执行的. 这个结果可以以同步(阻塞)或异步(非阻塞)的方式访问。
Future提供了一种简单的方式来执行并行算法。
### Future直接使用
Future中的一个常见用例是在不需要使用Actor的情况下并发地执行计算。
Future有两种使用方式:
> 1. 阻塞方式(Blocking):该方式下,父actor或主程序停止执行知道所有future完成各自任务。通过`scala.concurrent.Await`使用。
> 1. 非阻塞方式(Non-Blocking),也称为回调方式(Callback):父actor或主程序在执行期间启动future,future任务和父actor并行执行,当每个future完成任务,将通知父actor。通过`onComplete`、`onSuccess`、`onFailure`方式使用。
### 执行上下文(ExecutionContext)
为了运行回调和操作,Futures需要有一个`ExecutionContext`。
如果你在作用域内有一个`ActorSystem`,它会它自己派发器用作ExecutionContext,你也可以用ExecutionContext伴生对象提供的工厂方法来将Executors和ExecutorServices进行包裹,或者甚至创建自己的实例。
通过导入`ExecutionContext.Implicits.global`来导入默认的全局执行上下文。你可以把该执行上下文看做是一个线程池,ExecutionContext是在某个线程池执行任务的抽象。
如果在代码中没有导入该执行上下文,代码将无法编译。
### 阻塞方式
第一个例子展示如何创建一个future,然后通过阻塞方式等待其计算结果。虽然阻塞方式不是一个很好的用法,但是可以说明问题。
这个例子中,通过在未来某个时间计算1+1,当计算结果后再返回。
~~~
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object FutureBlockDemo extends App{
implicit val baseTime = System.currentTimeMillis
// create a Future
val f = Future {
Thread.sleep(500)
1+1
}
// this is blocking(blocking is bad)
val result = Await.result(f, 1 second)
// 如果Future没有在Await规定的时间里返回,
// 将抛出java.util.concurrent.TimeoutException
println(result)
Thread.sleep(1000)
}
~~~
代码解释:
> 1. 在上面的代码中,被传递给Future的代码块会被缺省的`Dispatcher`所执行,代码块的返回结果会被用来完成`Future`。 与从Actor返回的Future不同,这个Future拥有正确的类型, 我们还避免了管理Actor的开销。
> 1. `Await.result`方法将阻塞1秒时间来等待Future结果返回,如果Future在规定时间内没有返回,将抛出`java.util.concurrent.TimeoutException`异常。
> 1. 通过导入`scala.concurrent.duration._`,可以用一种方便的方式来声明时间间隔,如`100 nanos`,`500 millis`,`5 seconds`、`1 minute`、`1 hour`,`3 days`。还可以通过`Duration(100, MILLISECONDS)`,`Duration(200, "millis")`来创建时间间隔。
### 非阻塞方式(回调方式)
有时你只需要监听`Future`的完成事件,对其进行响应,不是创建新的Future,而仅仅是产生副作用。
通过`onComplete`,`onSuccess`,`onFailure`三个回调函数来异步执行Future任务,而后两者仅仅是第一项的特例。
使用onComplete的代码示例:
~~~
import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random
object FutureNotBlock extends App{
println("starting calculation ...")
val f = Future {
Thread.sleep(Random.nextInt(500))
42
}
println("before onComplete")
f.onComplete{
case Success(value) => println(s"Got the callback, meaning = $value")
case Failure(e) => e.printStackTrace
}
// do the rest of your work
println("A ...")
Thread.sleep(100)
println("B ....")
Thread.sleep(100)
println("C ....")
Thread.sleep(100)
println("D ....")
Thread.sleep(100)
println("E ....")
Thread.sleep(100)
Thread.sleep(2000)
}
~~~
使用onSuccess、onFailure的代码示例:
~~~
import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random
object Test12_FutureOnSuccessAndFailure extends App{
val f = Future {
Thread.sleep(Random.nextInt(500))
if (Random.nextInt(500) > 250) throw new Exception("Tikes!") else 42
}
f onSuccess {
case result => println(s"Success: $result")
}
f onFailure {
case t => println(s"Exception: ${t.getMessage}")
}
// do the rest of your work
println("A ...")
Thread.sleep(100)
println("B ....")
Thread.sleep(100)
println("C ....")
Thread.sleep(100)
println("D ....")
Thread.sleep(100)
println("E ....")
Thread.sleep(100)
Thread.sleep(1000)
}
~~~
代码解释:
上面两段例子中,Future结构中随机延迟一段时间,然后返回结果或者抛出异常。
然后在回调函数中进行相关处理。
### 创建返回Future[T]的方法
先看一下示例:
~~~
import scala.concurrent.{Await, Future, future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
object ReturnFuture extends App{
implicit val baseTime = System.currentTimeMillis
// `future` method is another way to create a future
// It starts the computation asynchronously and retures a Future[Int] that
// will hold the result of the computation.
def longRunningComputation(i: Int): Future[Int] = future {
Thread.sleep(100)
i + 1
}
// this does not block
longRunningComputation(11).onComplete {
case Success(result) => println(s"result = $result")
case Failure(e) => e.printStackTrace
}
// keep the jvm from shutting down
Thread.sleep(1000)
}
~~~
代码解释:
上面代码中的longRunningComputation返回一个`Future[Int]`,然后进行相关的异步操作。
其中`future`方法是创建一个future的另一种方法。它将启动一个异步计算并且返回包含计算结果的`Future[T]`。
### Future用于Actor
通常有两种方法来从一个Actor获取回应: 第一种是发送一个消息`actor ! msg`,这种方法只在发送者是一个Actor时有效;第二种是通过一个Future。
使用Actor的`?`方法来发送消息会返回一个Future。 要等待并获取结果的最简单方法是:
~~~
import scala.concurrent.Await
import akka.pattern.ask
import scala.concurrent.duration._
import akka.util.Timeout
implicit val timeout = Timeout(5 seconds)
val future = actor ? msg
val result = Await.result(future, timeout.duration).asInstanceOf[String]
~~~
下面是使用`?`发送消息给actor,并等待回应的代码示例:
~~~
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.concurrent.duration._
case object AskNameMessage
class TestActor extends Actor {
def receive = {
case AskNameMessage => // respond to the 'ask' request
sender ! "Fred"
case _ => println("that was unexpected")
}
}
object AskDemo extends App{
//create the system and actor
val system = ActorSystem("AskDemoSystem")
val myActor = system.actorOf(Props[TestActor], name="myActor")
// (1) this is one way to "ask" another actor for information
implicit val timeout = Timeout(5 seconds)
val future = myActor ? AskNameMessage
val result = Await.result(future, timeout.duration).asInstanceOf[String]
println(result)
// (2) a slightly different way to ask another actor for information
val future2: Future[String] = ask(myActor, AskNameMessage).mapTo[String]
val result2 = Await.result(future2, 1 second)
println(result2)
system.shutdown
}
~~~
代码解释:
> 1. `Await.result(future, timeout.duration).asInstanceOf[String]`会导致当前线程被阻塞,并等待actor通过它的应答来完成`Future`。但是阻塞会导致性能问题,所以是不推荐的。致阻塞的操作位于`Await.result`和`Await.ready`中,这样就方便定位阻塞的位置。
> 1. 还要注意actor返回的Future的类型是`Future[Any]`,这是因为actor是动态的。 这也是为什么上例中注释(1)使用了`asInstanceOf`。
> 1. 在使用非阻塞方式时,最好使用`mapTo`方法来将Future转换到期望的类型。如果转换成功,`mapTo`方法会返回一个包含结果的新的 Future,如果不成功,则返回`ClassCastException`异常。
**转载请注明作者Jason Ding及其出处**
[Github博客主页(http://jasonding1354.github.io/)](http://jasonding1354.github.io/)
[GitCafe博客主页(http://jasonding1354.gitcafe.io/)](http://jasonding1354.gitcafe.io/)
[CSDN博客(http://blog.csdn.net/jasonding1354)](http://blog.csdn.net/jasonding1354)
[简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)](http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
**Google搜索jasonding1354进入我的博客主页**
';
Akka中actor的生命周期与DeathWatch监控
最后更新于:2022-04-01 19:46:53
### Actor的生命周期
在Actor系统中的路径代表一个“地方”,这可能被一个存活着的的actor占用着。最初,路径(除了系统初始化角色)是空的。当`actorOf()`被调用时,指定一个由通过`Props`描述给定的路径角色的化身。一个actor化身由路径和一个UID确定。重新启动仅仅交换Props定义的Actor 实例,但化身与UID依然是相同的。
当该actor停止时,化身的生命周期也相应结束了。在这一刻时间上相对应的生命周期事件也将被调用和监管角色也被通知终止结束。化身被停止之后,路径也可以重复被通过`actorOf()`方法创建的角色使用。在这种情况下,新的化身的名称跟与前一个将是相同的而是UIDs将会有所不同。
一个`ActorRef`总是代表一个化身(路径和UID)而不只是一个给定的路径。因此,如果一个角色停止,一个新的具有相同名称创建的旧化身的`ActorRef`不会指向新的。
在另一方面`ActorSelection`指向该路径(或多个路径在使用通配符时),并且是完全不知道其化身当前占用着它。由于这个原因导致`ActorSelection`不能被监视到。通过发送识别信息到将被回复包含正确地引用(见通过角色选择集识别角色)的`ActorIdentity`的`ActorSelection`来解决当前化身`ActorRef`存在该路径之下。这也可以用`ActorSelection`类的`resolveOne`方法来解决,这将返回一个匹配`ActorRef`的`Future`。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-22_56ca7f422b7be.png "")
### Actor生命周期Hook:
Akka Actor定义了下列的生命周期回调钩子(Hook):
> - preStart:在actor实例化后执行,重启时不会执行。
> - postStop:在actor正常终止后执行,异常重启时不会执行。
> - preRestart:在actor异常重启前保存当前状态。
> - postRestart:在actor异常重启后恢复重启前保存的状态。当异常引起了重启,新actor的postRestart方法被触发,默认情况下preStart方法被调用。
### 启动Hook
启动策略,调用preStart Hook,一般用于初始化资源.在创建一个Actor的时候,会调用构造函数,之后调用preStart。
preStart的默认形式:
~~~
def preStart(): Unit = ()
~~~
### 重启Hook
所有的Actor都是被监管的,i.e.以某种失败处理策略与另一个actor链接在一起。如果在处理一个消息的时候抛出的异常,Actor将被重启。这个重启过程包括上面提到的Hook:
> 1. 要被重启的actor的`preRestart`被调用,携带着导致重启的异常以及触发异常的消息; 如果重启并不是因为消息的处理而发生的,所携带的消息为None,例如,当一个监管者没有处理某个异常继而被它自己的监管者重启时。 这个方法是用来完成清理、准备移交给新的actor实例的最佳位置。它的缺省实现是终止所有的子actor并调用`postStop`。
> 1. 最初`actorOf`调用的工厂方法将被用来创建新的实例。
> 1. 新的actor的`postRestart`方法被调用,携带着导致重启的异常信息。
actor的重启会替换掉原来的actor对象;重启不影响邮箱的内容, 所以对消息的处理将在`postRestart hook`返回后继续。触发异常的消息不会被重新接收。在actor重启过程中所有发送到该actor的消息将象平常一样被放进邮箱队列中。
preRestart和postRestart的默认形式:
~~~
def preRestart(reason: Throwable, message: Option[Any]): Unit = {
context.children foreach { child ⇒
context.unwatch(child)
context.stop(child)
}
postStop()
}
def postRestart(reason: Throwable): Unit = {
preStart()
}
~~~
解释一下重启策略的详细内容:
> 1. actor被挂起
> 1. 调用旧实例的 supervisionStrategy.handleSupervisorFailing 方法 (缺省实现为挂起所有的子actor)
> 1. 调用preRestart方法,从上面的源码可以看出来,preRestart方法将所有的children Stop掉了,并调用postStop回收资源
> 1. 调用旧实例的supervisionStrategy.handleSupervisorRestarted方法(缺省实现为向所有剩下的子actor发送重启请求)
> 1. 等待所有子actor终止直到 preRestart 最终结束
> 1. 再次调用之前提供的actor工厂创建新的actor实例
> 1. 对新实例调用 postRestart
> 1. 恢复运行新的actor
### 终止Hook
`postStop hook`一般用于回收资源。Actor在被调用postStop之前,会将邮箱中剩下的message处理掉(新的消息变成死信了)。Actor是由UID和Path来唯一标识的,也就是说ActorRef也是通过UID和Path来定位。在Actor被Stop之后,新的Actor是可以用这个Path的,但是旧的ActorRef是不能用的,因为UID不一样。
这个hook保证在该actor的消息队列被禁止后才运行,i.e.之后发给该actor的消息将被重定向到ActorSystem的`deadLetters`中。
postStop的默认形式:
~~~
def postStop(): Unit = ()
~~~
### 各种Hook的顺序关系图解
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-22_56ca7f4255e3f.jpg "")
### Akka的actor生命周期示例代码
下面用Kenny类演示生命周期函数的调用顺序:
~~~
import akka.actor._
class Kenny extends Actor {
println("entered the Kenny constructor")
override def preStart: Unit = {
println("kenny: preStart")
}
override def postStop: Unit ={
println("kenny: postStop")
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
println("kenny: preRestart")
println(s" MESSAGE: ${message.getOrElse("")}")
println(s" REASON: ${reason.getMessage}")
super.preRestart(reason, message)
}
override def postRestart(reason: Throwable): Unit = {
println("kenny: postRetart")
println(s" REASON: ${reason.getMessage}")
super.postRestart(reason)
}
def receive = {
case ForceRestart => throw new Exception("Boom!")
case _ => println("Kenny received a message")
}
}
case object ForceRestart
object LifecycleDemo extends App{
val system = ActorSystem("LifecycleDemo")
val kenny = system.actorOf(Props[Kenny], name="Kenny")
println("sending kenny a simple String message")
kenny ! "hello"
Thread.sleep(1000)
println("make kenny restart")
kenny ! ForceRestart
Thread.sleep(1000)
println("stopping kenny")
system.stop(kenny)
println("shutting down system")
system.shutdown
}
~~~
`pre*`和`post*`方法和actor的构造函数一样,都是用来初始化或关闭actor所需的资源的。
上面的代码中,`preRestart`和`postRestart`调用了父类的函数实现,其中`postRestart`的默认实现中,调用了`preStart`方法。
打印信息:
~~~
sending kenny a simple String message
entered the Kenny constructor
kenny: preStart
Kenny received a message
make kenny restart
kenny: preRestart
MESSAGE: ForceRestart
REASON: Boom!
kenny: postStop
[ERROR] [01/16/2016 21:51:46.584] [LifecycleDemo-akka.actor.default-dispatcher-4] [akka://LifecycleDemo/user/Kenny] Boom!
java.lang.Exception: Boom!
at Examples.Tutorials.Kenny$$anonfun$receive$1.applyOrElse(Test4_LifecycleDemo.scala:24)
at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
at Examples.Tutorials.Kenny.aroundReceive(Test4_LifecycleDemo.scala:4)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
entered the Kenny constructor
kenny: postRetart
REASON: Boom!
kenny: preStart
stopping kenny
shutting down system
kenny: postStop
~~~
### Actor系统中的监管
在Actor系统中说过,**监管描述的是actor之间的关系:监管者将任务委托给下属并对下属的失败状况进行响应。** 当一个下属出现了失败(i.e.抛出一个异常),它自己会将自己和自己所有的下属挂起然后向自己的监管者发送一个提示失败的消息。取决于所监管的工作的性质和失败的性质,监管者可以有4种基本选择:
> 1. 让下属继续执行,保持下属当前的内部状态
> 1. 重启下属,清除下属的内部状态
> 1. 永久地终止下属
> 1. 将失败沿监管树向上传递
重要的是始终要把一个actor视为整个监管树形体系中的一部分,这解释了第4种选择存在的意义(因为一个监管者同时也是其上方监管者的下属),并且隐含在前3种选择中:让actor继续执行同时也会继续执行它的下属,重启一个actor也必须重启它的下属,相似地终止一个actor会终止它所有的下属。被强调的是一个actor的缺省行为是在重启前终止它的所有下属,但这种行为可以用Actor类的`preRestart hook`来重写;对所有子actor的递归重启操作在这个hook之后执行。
每个监管者都配置了一个函数,它将所有可能的失败原因(i.e.Exception)翻译成以上四种选择之一;注意,这个函数并不将失败actor本身作为输入。我们很快会发现在有些结构中这种方式看起来不够灵活,会希望对不同的下属采取不同的策略。在这一点上我们一定要理解监管是为了组建一个递归的失败处理结构。如果你试图在某一个层次做太多事情,这个层次会变得复杂难以理解,这时我们推荐的方法是增加一个监管层次。
Akka实现的是一种叫“父监管”的形式。Actor只能由其它的actor创建,而顶部的actor是由库来提供的——每一个创建出来的actor都是由它的父亲所监管。这种限制使得actor的树形层次拥有明确的形式,并提倡合理的设计方法。必须强调的是这也同时保证了actor们不会成为孤儿或者拥有在系统外界的监管者(被外界意外捕获)。还有,这样就产生了一种对actor应用(或其中子树)自然又干净的关闭过程。
### 生命周期监控(临终看护DeathWatch)
在Akka中生命周期监控通常指的是DeathWatch。
除了父actor和子actor的关系的监控关系,每个actor可能还监视着其它任意的actor。因为actor创建后,它活着的期间以及重启在它的监管者之外是看不到的,所以对监视者来说它能看到的状态变化就是从活着变到死亡。所以监视的目的是当一个actor终止时可以有另一个相关actor做出响应,而监管者的目的是对actor的失败做出响应。
**监视actor通过接收Terminated消息来实现生命周期监控。如果没有其它的处理方式,默认的行为是抛出一个DeathPactException异常。为了能够监听Terminated消息,你需要调用ActorContext.watch(targetActorRef)。调用ActorContext.unwatch(targetActorRed)来取消对目标角色的监听。需要注意的是,Terminated消息的发送与监视actor注册的时间和被监视角色终止的时间顺序无关。例如,即使在你注册的时候目标actor已经死了,你仍然能够收到Terminated消息。** 当监管者不能简单的重启子actor而必须终止它们时,监视将显得非常重要。例如,actor在初始化的时候报错。在这种情况下,它应该监视这些子actor并且重启它们或者稍后再做尝试。
另一个常见的应用案例是,一个actor或者它的子actor在无法获得需要的外部资源时需要失败。如果是第三方通过调用system.stop(child)方法或者发送PoisonPill消息来终止子actor时,监管者也将会受到影响。
### 说明
为了在其它actor结束时(i.e.永久终止,而不是临时的失败和重启)收到通知,actor可以将自己注册为其它actor在终止时所发布的 `Terminated`消息的接收者。这个服务是由actor系统的`DeathWatch`组件提供的。
注册一个监控器的代码:
~~~
import akka.actor.{ Actor, Props, Terminated }
class WatchActor extends Actor {
val child = context.actorOf(Props.empty, "child")
context.watch(child) // <-- 这是注册所需要的唯一调用
var lastSender = system.deadLetters
def receive = {
case "kill" ⇒ context.stop(child); lastSender = sender
case Terminated(`child`) ⇒ lastSender ! "finished"
}
}
~~~
要注意`Terminated`消息的产生与注册和终止行为所发生的顺序无关。多次注册并不表示会有多个消息产生,也不保证有且只有一个这样的消息被接收到:如果被监控的actor已经生成了消息并且已经进入了队列,在这个消息被处理之前又发生了另一次注册,则会有第二个消息进入队列,因为一个已经终止的actor注册监控器会立刻导致`Terminated`消息的发生。
可以使用`context.unwatch(target)`来停止对另一个actor的生存状态的监控,但很明显这不能保证不会接收到`Terminated`消息因为该消息可能已经进入了队列。
### DeathWatch代码示例:
DeathWatch的作用是,当一个actor终止时,你希望另一个actor收到通知。
使用`context.watch()`方法来声明对一个actor的监控。
下面是示例代码:
~~~
import akka.actor._
class Jason extends Actor {
def receive = {
case _ => println("jason got a message")
}
}
class Parent extends Actor {
// start Jason as a child, then keep an eye on it
val jason = context.actorOf(Props[Jason], name="Jason")
context.watch(jason)
def receive = {
case Terminated(jason) => println("OMG, they killed jason")
case _ => println("parent received a message")
}
}
object DeathWatchDemo extends App{
val system = ActorSystem("DeathWatchDemo")
val parentActor = system.actorOf(Props[Parent], name="Parent")
// look up jason, then kill it
println("kill the child actor")
val jasonActor = system.actorSelection("/user/Parent/Jason")
jasonActor ! PoisonPill
Thread.sleep(5000)
println("calling system.shutdown")
system.shutdown
}
~~~
当Jason被杀死后,Parent actor收到`Terminated(jason)`消息。
**转载请注明作者Jason Ding及其出处**
[Github博客主页(http://jasonding1354.github.io/)](http://jasonding1354.github.io/)
[GitCafe博客主页(http://jasonding1354.gitcafe.io/)](http://jasonding1354.gitcafe.io/)
[CSDN博客(http://blog.csdn.net/jasonding1354)](http://blog.csdn.net/jasonding1354)
[简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)](http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
**Google搜索jasonding1354进入我的博客主页**
';
Actor引用
最后更新于:2022-04-01 19:46:50
### Actor系统的实体
在Actor系统中,actor之间具有树形的监管结构,并且actor可以跨多个网络节点进行透明通信。
对于一个Actor而言,其源码中存在`Actor`,`ActorContext`,`ActorRef`等多个概念,它们都是为了描述Actor对象而进行的不同层面的抽象。
我们先给出一个官方的示例图,再对各个概念进行解释。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-22_56ca7f41ccb0b.png "")
上图很清晰的展示了一个actor在源码层面的不同抽象,和不同actor之间的父子关系:
Actor类的一个成员`context`是ActorContext类型,ActorContext存储了Actor类的上下文,包括self、sender。
ActorContext还混入了`ActorRefFactory`特质,其中实现了`actorOf`方法用来创建子actor。
这是Actor中context的源码:
~~~
trait Actor {
/**
* Stores the context for this actor, including self, and sender.
* It is implicit to support operations such as `forward`.
*
* WARNING: Only valid within the Actor itself, so do not close over it and
* publish it to other threads!
*
* [[akka.actor.ActorContext]] is the Scala API. `getContext` returns a
* [[akka.actor.UntypedActorContext]], which is the Java API of the actor
* context.
*/
implicit val context: ActorContext = {
val contextStack = ActorCell.contextStack.get
if ((contextStack.isEmpty) || (contextStack.head eq null))
throw ActorInitializationException(
s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
"You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
val c = contextStack.head
ActorCell.contextStack.set(null :: contextStack)
c
}
~~~
ActorCell的`self`成员是ActorRef类型,ActorRef是一个actor的不可变,可序列化的句柄(handle),它可能不在本地或同一个ActorSystem中,它是实现网络空间位置透明性的关键设计。
这是ActorContext中self的源码:
~~~
trait ActorContext extends ActorRefFactory {
def self: ActorRef
~~~
ActorRef的`path`成员是ActorPath类型,ActorPath是actor树结构中唯一的地址,它定义了根actor到子actor的顺序。
这是ActorRef中path的源码:
~~~
abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable {
/**
* Returns the path for this actor (from this actor up to the root actor).
*/
def path: ActorPath
~~~
### Actor引用
Actor引用是ActorRef的子类,它的最重要功能是支持向它所代表的actor发送消息。每个actor通过self来访问它的标准(本地)引用,在发送给其它actor的消息中也缺省包含这个引用。反过来,在消息处理过程中,actor可以通过sender来访问到当前消息的发送者的引用。
### 不同类型的Actor引用
根据actor系统的配置,支持几种不同的actor引用:
> 1. 纯本地引用被配置成不支持网络功能的,这些actor引用发送的消息不能通过一个网络发送到另一个远程的JVM。
> 1. 支持远程调用的本地引用使用在支持同一个jvm中actor引用之间的网络功能的actor系统中。为了在发送到其它网络节点后被识别,这些引用包含了协议和远程地址信息。
> 1. 本地actor引用有一个子类是用在路由(比如,混入了`Router` trait的actor)。它的逻辑结构与之前的本地引用是一样的,但是向它们发送的消息会被直接重定向到它的子actor。
> 1. 远程actor引用代表可以通过远程通讯访问的actor,i.e. 从别的jvm向他们发送消息时,Akka会透明地对消息进行序列化。
> 1. 有几种特殊的actor引用类型,在实际用途中比较类似本地actor引用:
> - `PromiseActorRef`表示一个`Promise`,作用是从一个actor返回的响应来完成,它是由`akka.pattern.ask`调用来创建的
> - `DeadLetterActorRef`是死信服务的缺省实现,所有接收方被关闭或不存在的消息都在此被重新路由。
> - `EmptyLocalActorRef`是查找一个不存在的本地actor路径时返回的:它相当于`DeadLetterActorRef`,但是它保有其路径因此可以在网络上发送,以及与其它相同路径的存活的actor引用进行比较,其中一些存活的actor引用可能在该actor消失之前得到了。
> 1. 然后有一些内部实现,你可能永远不会用上:
> - 有一个actor引用并不表示任何actor,只是作为根actor的伪监管者存在,我们称它为“时空气泡穿梭者”。
> - 在actor创建设施启动之前运行的第一个日志服务是一个伪actor引用,它接收日志事件并直接显示到标准输出上;它就是`Logging.StandardOutLogger`。
### 获得Actor引用
### 创建Actor
一个actor系统通常是在根actor上使用`ActorSystem.actorOf`创建actor,然后使用`ActorContext.actorOf`从创建出的actor中生出actor树来启动的。这些方法返回指向新创建的actor的引用。每个actor都拥有到它的父亲,它自己和它的子actor的引用。这些引用可以与消息一直发送给别的actor,以便接收方直接回复。
### 具体路径查找
另一种查找actor引用的途径是使用`ActorSystem.actorSelection`方法,也可以使用`ActorContext.actorSelection`来在actor之中查询。它会返回一个(未验证的)本地、远程或集群actor引用。向这个引用发送消息或试图观察它的存活状态会在actor系统树中从根开始一层一层从父向子actor发送消息,直到消息到达目标或是出现某种失败,i.e.路径中的某一个actor名字不存在(在实际中这个过程会使用缓存来优化,但相较使用物理actor路径来说仍然增加了开销,因为物理路径能够从actor的响应消息中的发送方引用中获得),这个消息传递过程由Akka自动完成的,对客户端代码不可见。
使用相对路径向兄弟actor发送消息:
~~~
context.actorSelection("../brother") ! msg
~~~
也可以用绝对路径:
~~~
context.actorSelection("/user/serviceA") ! msg
~~~
### 查询逻辑Actor层次结构
由于actor系统是一个类似文件系统的树形结构,对actor的匹配与unix shell中支持的一样:你可以将路径(中的一部分)用通配符(«*» 和«?»)替换来组成对0个或多个实际actor的匹配。由于匹配的结果不是一个单一的actor引用,它拥有一个不同的类型ActorSelection,这个类型不完全支持ActorRef的所有操作。同样,路径选择也可以用ActorSystem.actorSelection或ActorContext.actorSelection两种方式来获得,并且支持发送消息。
下面是将msg发送给包括当前actor在内的所有兄弟actor:
~~~
context.actorSelection("../*") ! msg
~~~
### 与远程部署之间的互操作
当一个actor创建一个子actor,actor系统的部署者会决定新的actor是在同一个jvm中或是在其它的节点上。**如果是在其他节点创建actor,actor的创建会通过网络连接来到另一个jvm中进行,结果是新的actor会进入另一个actor系统。** 远程系统会将新的actor放在一个专为这种场景所保留的特殊路径下。新的actor的监管者会是一个远程actor引用(代表会触发创建动作的actor)。这时,`context.parent`(监管者引用)和`context.path.parent`(actor路径上的父actor)表示的actor是不同的。但是在其监管者中查找这个actor的名称能够在远程节点上找到它,保持其逻辑结构,e.g.当向另外一个未确定(unresolved)的actor引用发送消息时。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-22_56ca7f41e4935.png "")
因为设计分布式执行会带来一些限制,最明显的一点就是所有通过电缆发送的消息都必须可序列化。虽然有一点不太明显的就是包括闭包在内的远程角色工厂,用来在远程节点创建角色(即Props内部)。
另一个结论是,要意识到所有交互都是完全异步的,它意味着在一个计算机网络中一条消息需要几分钟才能到达接收者那里(基于配置),而且可能比在单JVM中有更高丢失率,后者丢失率接近于0(还没有确凿的证据)。
### Akka使用的特殊路径
在路径树的根上是根监管者,所有的的actor都可以从通过它找到。在第二个层次上是以下这些:
> - `"/user"`是所有由用户创建的顶级actor的监管者,用`ActorSystem.actorOf`创建的actor在其下一个层次 are found at the next level。
> - `"/system"` 是所有由系统创建的顶级actor(如日志监听器或由配置指定在actor系统启动时自动部署的actor)的监管者。
> - `"/deadLetters"` 是死信actor,所有发往已经终止或不存在的actor的消息会被送到这里。
> - `"/temp"`是所有系统创建的短时actor(i.e.那些用在ActorRef.ask的实现中的actor)的监管者。
> - `"/remote"` 是一个人造的路径,用来存放所有其监管者是远程actor引用的actor。
### 附录-Actor模型概述:
Actor模型为编写并发和分布式系统提供了一种更高的抽象级别。它将开发人员从显式地处理锁和线程管理的工作中解脱出来,使编写并发和并行系统更加容易。Actor模型是在1973年Carl Hewitt的论文中提的,但是被Erlang语言采用后才变得流行起来,一个成功案例是爱立信使用Erlang非常成功地创建了高并发的可靠的电信系统。
### Actor的树形结构
像一个商业组织一样,actor自然会形成树形结构。程序中负责某一个功能的actor可能需要把它的任务分拆成更小的、更易管理的部分。为此它启动子Actor并监管它们。要知道每个actor有且仅有一个监管者,就是创建它的那个actor。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-22_56ca7f4210b71.png "")
**Actor系统的精髓在于任务被分拆开来并进行委托,直到任务小到可以被完整地进行处理。** 这样做不仅使任务本身被清晰地划分出结构,而且最终的actor也能按照它们“应该处理的消息类型”,“如何完成正常流程的处理”以及“失败流程应如何处理”来进行解析。如果一个actor对某种状况无法进行处理,它会发送相应的失败消息给它的监管者请求帮助。这样的递归结构使得失败能够在正确的层次进行处理。
可以将这与分层的设计方法进行比较。分层的设计方法最终很容易形成防御性编程,以防止任何失败被泄露出来。把问题交由正确的人处理会是比将所有的事情“藏在深处”更好的解决方案。
现在,设计这种系统的难度在于如何决定谁应该监管什么。这当然没有一个唯一的最佳方案,但是有一些可能会有帮助的原则:
> - 如果一个actort管理另一个actor所做的工作,如分配一个子任务,那么父actor应该监督子actor,原因是父actor知道可能会出现哪些失败情况,知道如何处理它们。
> - 如果一个actor携带着重要数据(i.e. 它的状态要尽可能地不被丢失),这个actor应该将任何可能的危险子任务分配给它所监管的子actor,并酌情处理子任务的失败。视请求的性质,可能最好是为每一个请求创建一个子actor,这样能简化收集回应时的状态管理。这在Erlang中被称为“Error Kernel Pattern”。
> - 如果actor A需要依赖actor B才能完成它的任务,A应该观测B的存活状态并对收到B的终止提醒消息进行响应。这与监管机制不同,因为观测方对监管机制没有影响,需要指出的是,仅仅是功能上的依赖并不足以用来决定是否在树形监管体系中添加子actor。
### Actor实体
一个Actor是一个容器,它包含了 状态,行为,一个邮箱,子Actor和一个监管策略。所有这些包含在一个Actor引用里。
#### 状态
Actor对象通常包含一些变量来反映actor所处的可能状态。这可能是一个明确的状态机,或是一个计数器,一组监听器,待处理的请求,等等。这些数据使得actor有价值,并且必须将这些数据保护起来不被其它的actor所破坏。
**好消息是在概念上每个Akka actor都有它自己的轻量线程,这个线程是完全与系统其它部分隔离的。这意味着你不需要使用锁来进行资源同步,可以完全不必担心并发性地来编写你的actor代码。**
> 在幕后,Akka会在一组线程上运行一组Actor,通常是很多actor共享一个线程,对某一个actor的调用可能会在不同的线程上进行处理。Akka保证这个实现细节不影响处理actor状态的单线程性。
由于内部状态对于actor的操作是至关重要的,所以状态不一致是致命的。当actor失败并由其监管者重新启动,状态会进行重新创建,就象第一次创建这个actor一样。这是为了实现系统的“自愈合”。
#### 行为
每次当一个消息被处理时,消息会与actor的当前的行为进行匹配。行为是一个函数,它定义了处理当前消息所要采取的动作,例如如果客户已经授权过了,那么就对请求进行处理,否则拒绝请求。
#### 邮箱
Actor的用途是处理消息,这些消息是从其它的actor(或者从actor系统外部)发送过来的。连接发送者与接收者的纽带是actor的邮箱:每个actor有且仅有一个邮箱,所有的发来的消息都在邮箱里排队。排队按照发送操作的时间顺序来进行,这意味着从不同的actor发来的消息在运行时没有一个固定的顺序,这是由于actor分布在不同的线程中。从另一个角度讲,从同一个actor发送多个消息到相同的actor,则消息会按发送的顺序排队。
可以有不同的邮箱实现供选择,缺省的是FIFO:actor处理消息的顺序与消息入队列的顺序一致。这通常是一个好的选择,但是应用可能需要对某些消息进行优先处理。在这种情况下,可以使用优先邮箱来根据消息优先级将消息放在某个指定的位置,甚至可能是队列头,而不是队列末尾。如果使用这样的队列,消息的处理顺序是由队列的算法决定的,而不是FIFO。
Akka与其它actor模型实现的一个重要差别在于当前的行为必须处理下一个从队列中取出的消息,Akka不会去扫描邮箱来找到下一个匹配的消息。无法处理某个消息通常是作为失败情况进行处理,除非actor覆盖了这个行为。
#### 子Actor
每个actor都是一个潜在的监管者:如果它创建了子actor来委托处理子任务,它会自动地监管它们。子actor列表维护在actor的上下文中,actor可以访问它。对列表的更改是通过`context.actorOf(...)`创建或者`context.stop(child)`停止子actor来实现,并且这些更改会立刻生效。实际的创建和停止操作在幕后以异步的方式完成,这样它们就不会“阻塞”其监管者。
#### 监督策略
Actor的最后一部分是它用来处理其子actor错误状况的机制。错误处理是由Akka透明地进行处理的。由于策略是actor系统组织结构的基础,所以一旦actor被创建了它就不能被修改。
考虑对每个actor只有唯一的策略,这意味着如果一个actor的子actor们应用了不同的策略,这些子actor应该按照相同的策略来进行分组,生成中间的监管者,又一次倾向于根据任务到子任务的划分来组织actor系统的结构。
**转载请注明作者Jason Ding及其出处**
[Github博客主页(http://jasonding1354.github.io/)](http://jasonding1354.github.io/)
[GitCafe博客主页(http://jasonding1354.gitcafe.io/)](http://jasonding1354.gitcafe.io/)
[CSDN博客(http://blog.csdn.net/jasonding1354)](http://blog.csdn.net/jasonding1354)
[简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)](http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
**Google搜索jasonding1354进入我的博客主页**
';
Akka入门编程实例
最后更新于:2022-04-01 19:46:48
### 引言
这篇文章主要是第一次学习Akka编程,先试试水,探探坑,对Akka和SBT的使用有一个直观的了解,以几个简单的akka编程实例来说明akka的使用。希望在日后的学习和编程中,能有更多自己的体会和经验总结来分享。
### Actor模型
Actor实例可以想象成是服务器上的Web服务,你无法控制,只能通过发送消息去请求执行任务或查询信息,而不能直接在Web服务中修改状态或者处理资源。通过发送不可改变的消息,虽然看上去有些限制,但是可以很简单安全的编写并发程序。
### Actor系统的形象理解
一个actor是基于Actor系统的最小单元,就像面向对象系统中的对象实例一样,它也封装了状态和行为。我们无法窥探actor内部的信息,只能通过发送消息来请求状态信息(就像是问一个人,他感觉如何)。actor中有一个存放不可变状态信息的信箱。我们通过发送信息和actor进行通信,当actor收到信息之后,它会运用相关算法来处理具体的信息。
在一个应用程序中,多个actor构成了一套层级系统,像是一个家族或者一个商业组织。一个actor可以认为是一个商业组织的个人。一个actor有一个父亲,称为监督者(supervisor),还有好多孩子,可以认为,在一个商业组织中,主席(actor)下面有多个副主席,副主席也有很多下属随从。
Actor系统的最佳实践是“**委派任务**”,尤其是当actor的行为被阻塞的时候。可以想象,在实际商业活动中,主席将要做的工作分配给下面的几个副主席去分别执行,而副主席也会将子任务分配给自己的随从,直到该任务被下属们执行完毕。
### 处理故障
Actor模型的一个重要内容是处理故障。在工作工程中,如果出现错误或者抛出异常,actor和其子actor都将暂停,然后发送一条信息给监督者(supervisor)actor,报告出现故障的信号。
根据工作任务和故障的性质,监督者actor将会作出几种选择:
> - 恢复下属actor,保留内部状态
> - 重启下属actor,清空状态
> - 终止下属actor
> - 上报故障
### Hello,Actor实例
现在我用一个最简单的actor编程实例来介绍akka编程,先给出代码:
~~~
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
class HelloActor extends Actor{
def receive = {
case "hello" => println("hello back to you.")
case _ => println("huh?")
}
}
object Test1_HelloActor extends App {
// actor need an ActorSystem
val system = ActorSystem("HelloSystem")
// create and start the actor
val helloActor = system.actorOf(Props[HelloActor], name="helloActor")
// send two messages
helloActor ! "hello"
helloActor ! "what"
// shutdown the actor system
system.shutdown
}
~~~
代码注解:
> - Actor由HelloActor定义
> - HelloActor的行为有receive方法定义实现,其中使用了模式匹配表达式
> - HelloActor接收字符串`hello`作为消息,做出相应打印动作
> - Test1_HelloActor的object用来测试actor
> - ActorSystem接收一个name参数,并且通过`system.actorOf`创建actor实例
> - 创建Actor实例名为helloActor,其构造函数没有参数
> - Actor创建后自动运行,不需调用start或者run方法
> - 通过`!`方法来发送消息
### ActorSystem
一个actor system是actors的层级集团,分享公共配置信息(比如分发器dispatchers,部署deployments,远程功能remote capabilities,地址addresses)。它同时也是创建和查询actors的入口。ActorSystem是为你的应用程序分配线程资源的结构。
### ActorRef
当你调用`ActorSystem`的`actorOf`方法时,将创建并返回一个`ActorRef`的实例:
`def actorOf(props: Props, name: String): ActorRef`。
这个引用用来处理actor,你可以将其看做是处理实际actor的代理人(broker)或包装外观(facade)。ActorRef防止你破坏Actor模型,比如直接处理Actor实例,或直接修改Actor实例中的变量。所以只能通过给actor发送消息方式来执行任务,这种“袖手旁观(不干涉,hands-off)”的方法帮助巩固适宜的编程实践。
ActorRef有以下特点:
> - 它是不可变的
> - 它与actor实体是一对一的关系
> - 它是可序列化的,网络可感知的。这使得你可以在网络环境中传送一个ActorRef
### Actor之间的通信实例
下面给出的是两个actor实例相互发送消息进行通信的PingPong示例:
~~~
import akka.actor._
case object PingMessage
case object PongMessage
case object StartMessage
case object StopMessage
class Ping(pong: ActorRef) extends Actor{
var count = 0
def incrementAndPrint {count += 1; println(s"$count:ping")}
def receive = {
case StartMessage =>
incrementAndPrint
pong ! PongMessage
case PingMessage =>
incrementAndPrint
if(count > 99) {
sender ! StopMessage
println("ping stopped")
context.stop(self)
}
else
sender ! PongMessage
case _ => println("Ping got unexpected information")
}
}
class Pong extends Actor {
var count = 0
def receive = {
case StopMessage =>
println("pong stopped")
context.stop(self)
case PongMessage =>
count += 1
println(s"$count:pong")
sender ! PingMessage
case _ => println("Pong got unexpected information")
}
}
object PingPangTest extends App{
val system = ActorSystem("PingPongTest")
val pongActor = system.actorOf(Props[Pong], name="pong")
val pingActor = system.actorOf(Props(new Ping(pongActor)),
name = "ping")
pingActor ! StartMessage
}
~~~
代码注释:
> - 创建`ActorSystem`之后;
> - 创建`Pong`的actor实例(pongActor对象其实是`ActorRef`的实例);
> - 之后创建`Ping`的actor实例,其构造函数接受`ActorRef`参数;
> - 通过给`pingActor`发送一个`StartMessage`消息来启动pingActor和pongActor的具体动作;
> - `Ping` Actor和`Pong` Actor通过PingMessage和PongMessage相互发送消息,`sender`用来引用消息发送源Actor;
> - `Ping`通过计数,知道进行了100次消息的发送之后,发送StopMessage来终止actor。分别调用自己的`context.stop`方法来结束
### 启动Actor
在ActorSystem层面,通过调用`system.actorOf`方法来创建actors;在actor内部,通过调用`context.actorOf`方法来创建子actor。
下面给出一个ParentChild示例:
~~~
import akka.actor._
case class CreateChild (name: String)
case class Name (name: String)
class Child extends Actor {
var name = "No name"
override def postStop: Unit = {
println(s"D'oh! They killed me ($name): ${self.path}")
}
def receive = {
case Name(name) => this.name = name
case _ => println(s"Child $name got message.")
}
}
class Parent extends Actor {
def receive = {
case CreateChild(name) =>
// Parent creates a new Child here
println(s"Parent about to create Child ($name) ...")
val child = context.actorOf(Props[Child], name=s"$name")
child ! Name(name)
case _ => println(s"Parent got some other message.")
}
}
object ParentChildDemo extends App{
val actorSystem = ActorSystem("ParentChildTest")
val parent = actorSystem.actorOf(Props[Parent], name="Parent")
// send messages to Parent to create to child actors
parent ! CreateChild("XiaoMing")
parent ! CreateChild("XiaoLiang")
Thread.sleep(500)
// lookup XiaoMing, the kill it
println("Sending XiaoMing a PoisonPill ... ")
val xiaoming = actorSystem.actorSelection("/user/Parent/XiaoMing")
xiaoming ! PoisonPill
println("XiaoMing was killed")
Thread.sleep(5000)
actorSystem.shutdown
}
~~~
打印结果:
~~~
Parent about to create Child (XiaoMing) ...
Parent about to create Child (XiaoLiang) ...
Sending XiaoMing a PoisonPill ...
XiaoMing was killed
D'oh! They killed me (XiaoMing): akka://ParentChildTest/user/Parent/XiaoMing
D'oh! They killed me (XiaoLiang): akka://ParentChildTest/user/Parent/XiaoLiang
~~~
### 终止Actor
在ActorSystem层面,通过`system.stop(actorRef)`来终止一个actor;在actor内部,使用`context.stop(actorRef)`来结束一个actor。
如果当前有正在处理的消息,对该消息的处理将在actor被终止之前完成,但是邮箱中的后续消息将不会被处理。缺省情况下这些消息会被送到 ActorSystem的`dead letter mailbox`, 但是这取决于邮箱的实现。
### actor终止的相关处理
actor的终止分两步: 第一步actor将停止对邮箱的处理,向所有子actor发送终止命令,然后处理来自子actor的终止消息直到所有的子actor都完成终止, 最后终止自己(调用postStop,销毁邮箱,向DeathWatch发布Terminated,通知其监管者)。这个过程保证actor系统中的子树以一种有序的方式终止,将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个actor没有响应(i.e.由于处理消息用了太长时间以至于没有收到终止命令), 整个过程将会被阻塞。
在 ActorSystem.shutdown被调用时, 系统根监管actor会被终止,以上的过程将保证整个系统的正确终止。
`postStop()` hook是在actor被完全终止以后调用的。这是为了清理资源:
~~~
override def postStop() = {
// 关闭文件或数据库连接
}
~~~
### PoisonPill和gracefulStop
还有其他两种方式,发送`PoisonPill`消息或者使用`gracefulStop`终止。
你也可以向actor发送`akka.actor.PoisonPill`消息,这个消息处理完成后actor会被终止。PoisonPill与普通消息一样被放进队列,因此会在已经入队列的其它消息之后被执行。
如果你想等待终止过程的结束,或者组合若干actor的终止次序,可以使用gracefulStop。
下面给出gracefulStop的代码示例:
~~~
import akka.actor._
import akka.pattern.gracefulStop
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
case object TestActorStop
class TestActor extends Actor {
def receive = {
case TestActorStop =>
context.stop(self)
case _ => println("TestActor got message")
}
override def postStop {println("TestActor: postStop")}
}
object GracefulStopTest extends App{
val system = ActorSystem("GracefulStopTest")
val testActor = system.actorOf(Props[TestActor], name="TestActor")
// try to stop the actor graceful
try {
val stopped: Future[Boolean] = gracefulStop(testActor, 2 seconds, TestActorStop)
Await.result(stopped, 3 seconds)
println("testActor was stopped")
} catch {
case e: akka.pattern.AskTimeoutException => e.printStackTrace
} finally {
system.shutdown
}
}
~~~
`gracefulStop(actorRef, timeout)`将返回一个Future实例,当目标actor有处理相关终止动作的消息时,会执行成功。
上面示例中,通过发送TestActorStop消息来终止actor;如果没有处理终止的工作,当超过2s后,Future抛出`akka.pattern.AskTimeoutException`异常。默认情况下,gracefulStop将发送PoisonPill消息。
### Kill消息
当深入Akka actors,我们将认识监督者策略(supervisor strategies)概念。当实现了监督者策略,向actor发送一个`Kill`消息,这可以用来重新启动actor。如果使用默认的监督者策略,Kill消息将终止目标actor。
下面是示例代码:
~~~
import akka.actor._
class Number5 extends Actor {
def receive = {
case _ => println("Number 5 got a message")
}
override def preStart { println("Number 5 is alive")}
override def postStop { println("Number 5::postStop called")}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
println("Number 5::preRestart called")
}
override def postRestart(reason: Throwable): Unit = {
println("Number 5::postRestart called")
}
}
object KillTest extends App{
val system = ActorSystem("KillTestSystem")
val number5 = system.actorOf(Props[Number5], name="Number5")
number5 ! "hello"
number5 ! Kill
system.shutdown
}
~~~
打印的信息:
~~~
Number 5 is alive
Number 5 got a message
[ERROR] [01/17/2016 19:20:09.342] [KillTestSystem-akka.actor.default-dispatcher-3] [akka://KillTestSystem/user/Number5] Kill (akka.actor.ActorKilledException)
Number 5::postStop called
~~~
**转载请注明作者Jason Ding及其出处**
[Github博客主页(http://jasonding1354.github.io/)](http://jasonding1354.github.io/)
[GitCafe博客主页(http://jasonding1354.gitcafe.io/)](http://jasonding1354.gitcafe.io/)
[CSDN博客(http://blog.csdn.net/jasonding1354)](http://blog.csdn.net/jasonding1354)
[简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)](http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
**Google搜索jasonding1354进入我的博客主页**
';
前言
最后更新于:2022-04-01 19:46:46
> 原文出处:[Akka编程实战](http://blog.csdn.net/column/details/akka.html)
作者:[jasonding1354](http://blog.csdn.net/jasonding1354)
**本系列文章经作者授权在看云整理发布,未经作者允许,请勿转载!**
# Akka编程实战
> 该专题介绍基于Scala的Akka分布式编程实战,结合Akka的基本概念和编程实例,让读者更好了解和学习Akka编程。
';