在并发程序中使用Future
最后更新于:2022-04-01 19:46:55
### 引言
在Akka中, 一个`Future`是用来获取某个并发操作的结果的数据结构。这个操作通常是由Actor执行或由Dispatcher直接执行的. 这个结果可以以同步(阻塞)或异步(非阻塞)的方式访问。
Future提供了一种简单的方式来执行并行算法。
### Future直接使用
Future中的一个常见用例是在不需要使用Actor的情况下并发地执行计算。
Future有两种使用方式:
> 1. 阻塞方式(Blocking):该方式下,父actor或主程序停止执行知道所有future完成各自任务。通过`scala.concurrent.Await`使用。
> 1. 非阻塞方式(Non-Blocking),也称为回调方式(Callback):父actor或主程序在执行期间启动future,future任务和父actor并行执行,当每个future完成任务,将通知父actor。通过`onComplete`、`onSuccess`、`onFailure`方式使用。
### 执行上下文(ExecutionContext)
为了运行回调和操作,Futures需要有一个`ExecutionContext`。
如果你在作用域内有一个`ActorSystem`,它会它自己派发器用作ExecutionContext,你也可以用ExecutionContext伴生对象提供的工厂方法来将Executors和ExecutorServices进行包裹,或者甚至创建自己的实例。
通过导入`ExecutionContext.Implicits.global`来导入默认的全局执行上下文。你可以把该执行上下文看做是一个线程池,ExecutionContext是在某个线程池执行任务的抽象。
如果在代码中没有导入该执行上下文,代码将无法编译。
### 阻塞方式
第一个例子展示如何创建一个future,然后通过阻塞方式等待其计算结果。虽然阻塞方式不是一个很好的用法,但是可以说明问题。
这个例子中,通过在未来某个时间计算1+1,当计算结果后再返回。
~~~
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object FutureBlockDemo extends App{
implicit val baseTime = System.currentTimeMillis
// create a Future
val f = Future {
Thread.sleep(500)
1+1
}
// this is blocking(blocking is bad)
val result = Await.result(f, 1 second)
// 如果Future没有在Await规定的时间里返回,
// 将抛出java.util.concurrent.TimeoutException
println(result)
Thread.sleep(1000)
}
~~~
代码解释:
> 1. 在上面的代码中,被传递给Future的代码块会被缺省的`Dispatcher`所执行,代码块的返回结果会被用来完成`Future`。 与从Actor返回的Future不同,这个Future拥有正确的类型, 我们还避免了管理Actor的开销。
> 1. `Await.result`方法将阻塞1秒时间来等待Future结果返回,如果Future在规定时间内没有返回,将抛出`java.util.concurrent.TimeoutException`异常。
> 1. 通过导入`scala.concurrent.duration._`,可以用一种方便的方式来声明时间间隔,如`100 nanos`,`500 millis`,`5 seconds`、`1 minute`、`1 hour`,`3 days`。还可以通过`Duration(100, MILLISECONDS)`,`Duration(200, "millis")`来创建时间间隔。
### 非阻塞方式(回调方式)
有时你只需要监听`Future`的完成事件,对其进行响应,不是创建新的Future,而仅仅是产生副作用。
通过`onComplete`,`onSuccess`,`onFailure`三个回调函数来异步执行Future任务,而后两者仅仅是第一项的特例。
使用onComplete的代码示例:
~~~
import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random
object FutureNotBlock extends App{
println("starting calculation ...")
val f = Future {
Thread.sleep(Random.nextInt(500))
42
}
println("before onComplete")
f.onComplete{
case Success(value) => println(s"Got the callback, meaning = $value")
case Failure(e) => e.printStackTrace
}
// do the rest of your work
println("A ...")
Thread.sleep(100)
println("B ....")
Thread.sleep(100)
println("C ....")
Thread.sleep(100)
println("D ....")
Thread.sleep(100)
println("E ....")
Thread.sleep(100)
Thread.sleep(2000)
}
~~~
使用onSuccess、onFailure的代码示例:
~~~
import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random
object Test12_FutureOnSuccessAndFailure extends App{
val f = Future {
Thread.sleep(Random.nextInt(500))
if (Random.nextInt(500) > 250) throw new Exception("Tikes!") else 42
}
f onSuccess {
case result => println(s"Success: $result")
}
f onFailure {
case t => println(s"Exception: ${t.getMessage}")
}
// do the rest of your work
println("A ...")
Thread.sleep(100)
println("B ....")
Thread.sleep(100)
println("C ....")
Thread.sleep(100)
println("D ....")
Thread.sleep(100)
println("E ....")
Thread.sleep(100)
Thread.sleep(1000)
}
~~~
代码解释:
上面两段例子中,Future结构中随机延迟一段时间,然后返回结果或者抛出异常。
然后在回调函数中进行相关处理。
### 创建返回Future[T]的方法
先看一下示例:
~~~
import scala.concurrent.{Await, Future, future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
object ReturnFuture extends App{
implicit val baseTime = System.currentTimeMillis
// `future` method is another way to create a future
// It starts the computation asynchronously and retures a Future[Int] that
// will hold the result of the computation.
def longRunningComputation(i: Int): Future[Int] = future {
Thread.sleep(100)
i + 1
}
// this does not block
longRunningComputation(11).onComplete {
case Success(result) => println(s"result = $result")
case Failure(e) => e.printStackTrace
}
// keep the jvm from shutting down
Thread.sleep(1000)
}
~~~
代码解释:
上面代码中的longRunningComputation返回一个`Future[Int]`,然后进行相关的异步操作。
其中`future`方法是创建一个future的另一种方法。它将启动一个异步计算并且返回包含计算结果的`Future[T]`。
### Future用于Actor
通常有两种方法来从一个Actor获取回应: 第一种是发送一个消息`actor ! msg`,这种方法只在发送者是一个Actor时有效;第二种是通过一个Future。
使用Actor的`?`方法来发送消息会返回一个Future。 要等待并获取结果的最简单方法是:
~~~
import scala.concurrent.Await
import akka.pattern.ask
import scala.concurrent.duration._
import akka.util.Timeout
implicit val timeout = Timeout(5 seconds)
val future = actor ? msg
val result = Await.result(future, timeout.duration).asInstanceOf[String]
~~~
下面是使用`?`发送消息给actor,并等待回应的代码示例:
~~~
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.concurrent.duration._
case object AskNameMessage
class TestActor extends Actor {
def receive = {
case AskNameMessage => // respond to the 'ask' request
sender ! "Fred"
case _ => println("that was unexpected")
}
}
object AskDemo extends App{
//create the system and actor
val system = ActorSystem("AskDemoSystem")
val myActor = system.actorOf(Props[TestActor], name="myActor")
// (1) this is one way to "ask" another actor for information
implicit val timeout = Timeout(5 seconds)
val future = myActor ? AskNameMessage
val result = Await.result(future, timeout.duration).asInstanceOf[String]
println(result)
// (2) a slightly different way to ask another actor for information
val future2: Future[String] = ask(myActor, AskNameMessage).mapTo[String]
val result2 = Await.result(future2, 1 second)
println(result2)
system.shutdown
}
~~~
代码解释:
> 1. `Await.result(future, timeout.duration).asInstanceOf[String]`会导致当前线程被阻塞,并等待actor通过它的应答来完成`Future`。但是阻塞会导致性能问题,所以是不推荐的。致阻塞的操作位于`Await.result`和`Await.ready`中,这样就方便定位阻塞的位置。
> 1. 还要注意actor返回的Future的类型是`Future[Any]`,这是因为actor是动态的。 这也是为什么上例中注释(1)使用了`asInstanceOf`。
> 1. 在使用非阻塞方式时,最好使用`mapTo`方法来将Future转换到期望的类型。如果转换成功,`mapTo`方法会返回一个包含结果的新的 Future,如果不成功,则返回`ClassCastException`异常。
**转载请注明作者Jason Ding及其出处**
[Github博客主页(http://jasonding1354.github.io/)](http://jasonding1354.github.io/)
[GitCafe博客主页(http://jasonding1354.gitcafe.io/)](http://jasonding1354.gitcafe.io/)
[CSDN博客(http://blog.csdn.net/jasonding1354)](http://blog.csdn.net/jasonding1354)
[简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)](http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
**Google搜索jasonding1354进入我的博客主页**
';