12.10 Defining an Actor Task
Last updated: 2022-04-01 15:39:25
## Problem
You’d like to define tasks with behavior similar to “actors” in the so-called “actor model.”
## Solution
The “actor model” is one of the oldest and simplest approaches to concurrency and distributed computing. In fact, its underlying simplicity is part of its appeal. In a nutshell, an actor is a concurrently executing task that simply acts upon messages sent to it. In response to these messages, it may decide to send further messages to other actors. Communication with actors is one way and asynchronous. Thus, the sender of a message does not know when a message actually gets delivered, nor does it receive a response or acknowledgment that the message has been processed.

Actors are straightforward to define using a combination of a thread and a queue. For example:

    from queue import Queue
    from threading import Thread, Event

    # Sentinel used for shutdown
    class ActorExit(Exception):
        pass

    class Actor:
        def __init__(self):
            self._mailbox = Queue()

        def send(self, msg):
            '''Send a message to the actor'''
            self._mailbox.put(msg)

        def recv(self):
            '''Receive an incoming message'''
            msg = self._mailbox.get()
            if msg is ActorExit:
                raise ActorExit()
            return msg

        def close(self):
            '''Close the actor, thus shutting it down'''
            self.send(ActorExit)

        def start(self):
            '''Start concurrent execution'''
            self._terminated = Event()
            t = Thread(target=self._bootstrap)
            t.daemon = True
            t.start()

        def _bootstrap(self):
            try:
                self.run()
            except ActorExit:
                pass
            finally:
                self._terminated.set()

        def join(self):
            self._terminated.wait()

        def run(self):
            '''Run method to be implemented by the user'''
            while True:
                msg = self.recv()

    # Sample ActorTask
    class PrintActor(Actor):
        def run(self):
            while True:
                msg = self.recv()
                print('Got:', msg)

    # Sample use
    p = PrintActor()
    p.start()
    p.send('Hello')
    p.send('World')
    p.close()
    p.join()

In this example, Actor instances are things that you simply send a message to using their send() method. Under the covers, this places the message on a queue and hands it off to an internal thread that runs to process the received messages. The close() method is programmed to shut down the actor by placing a special sentinel value (ActorExit) on the queue. Users define new actors by inheriting from Actor and redefining the run() method to implement their custom processing. The usage of the ActorExit exception is such that user-defined code can be programmed to catch the termination request and handle it if appropriate (the exception is raised by the recv() method and propagated).

If you relax the requirement of concurrent and asynchronous message delivery, actor-like objects can also be minimally defined by generators. For example:

    def print_actor():
        while True:
            try:
                msg = yield      # Get a message
                print('Got:', msg)
            except GeneratorExit:
                print('Actor terminating')
                # Stop here: yielding again after GeneratorExit would make
                # close() raise RuntimeError
                return

    # Sample use
    p = print_actor()
    next(p)     # Advance to the yield (ready to receive)
    p.send('Hello')
    p.send('World')
    p.close()

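Returning to the thread-based Actor, the point about user code catching the termination request can be sketched as follows. This is only an illustration: `CollectingActor` and its `pending` and `flushed` attributes are hypothetical names that do not appear in the recipe, and the `Actor` class is repeated in condensed form so the block runs on its own.

```python
from queue import Queue
from threading import Thread, Event

class ActorExit(Exception):
    pass

class Actor:
    # Condensed copy of the Actor class from the solution
    def __init__(self):
        self._mailbox = Queue()

    def send(self, msg):
        self._mailbox.put(msg)

    def recv(self):
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg

    def close(self):
        self.send(ActorExit)

    def start(self):
        self._terminated = Event()
        t = Thread(target=self._bootstrap, daemon=True)
        t.start()

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()

    def join(self):
        self._terminated.wait()

class CollectingActor(Actor):
    '''Hypothetical actor that buffers messages and flushes them on shutdown'''
    def __init__(self):
        super().__init__()
        self.pending = []
        self.flushed = []

    def run(self):
        try:
            while True:
                self.pending.append(self.recv())
        except ActorExit:
            # Termination request: finish outstanding work, then re-raise
            # so that _bootstrap() sees the exit as usual
            self.flushed.extend(self.pending)
            self.pending.clear()
            raise

c = CollectingActor()
c.start()
c.send('Hello')
c.send('World')
c.close()
c.join()
print(c.flushed)    # ['Hello', 'World']
```

Because the sentinel travels through the same queue as ordinary messages, it is only seen after everything sent before close() has been received, which is why both messages are flushed here.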
## Discussion
Part of the appeal of actors is their underlying simplicity. In practice, there is just one core operation, send(). Plus, the general concept of a “message” in actor-based systems is something that can be expanded in many different directions. For example, you could pass tagged messages in the form of tuples and have actors take different courses of action like this:

    class TaggedActor(Actor):
        def run(self):
            while True:
                tag, *payload = self.recv()
                getattr(self, 'do_' + tag)(*payload)

        # Methods corresponding to different message tags
        def do_A(self, x):
            print('Running A', x)

        def do_B(self, x, y):
            print('Running B', x, y)

    # Example
    a = TaggedActor()
    a.start()
    a.send(('A', 1))      # Invokes do_A(1)
    a.send(('B', 2, 3))   # Invokes do_B(2,3)

As another example, here is a variation of an actor that allows arbitrary functions to be executed in a worker and results to be communicated back using a special Result object:

    from threading import Event

    class Result:
        def __init__(self):
            self._evt = Event()
            self._result = None

        def set_result(self, value):
            self._result = value
            self._evt.set()

        def result(self):
            self._evt.wait()
            return self._result

    class Worker(Actor):
        def submit(self, func, *args, **kwargs):
            r = Result()
            self.send((func, args, kwargs, r))
            return r

        def run(self):
            while True:
                func, args, kwargs, r = self.recv()
                r.set_result(func(*args, **kwargs))

    # Example use
    worker = Worker()
    worker.start()
    r = worker.submit(pow, 2, 3)
    print(r.result())

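One thing the worker above does not address is a submitted function that raises: the exception would end run(), and a caller waiting on result() would block indefinitely. One possible variation, offered only as a sketch, stores the exception and re-raises it in the caller. `SafeResult`, `SafeWorker`, and `set_exception()` are hypothetical names that are not part of the recipe, and the `Actor` class is repeated in condensed form so the block is self-contained.

```python
from queue import Queue
from threading import Thread, Event

class ActorExit(Exception):
    pass

class Actor:
    # Condensed copy of the Actor class from the solution
    def __init__(self):
        self._mailbox = Queue()

    def send(self, msg):
        self._mailbox.put(msg)

    def recv(self):
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg

    def close(self):
        self.send(ActorExit)

    def start(self):
        self._terminated = Event()
        t = Thread(target=self._bootstrap, daemon=True)
        t.start()

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()

    def join(self):
        self._terminated.wait()

class SafeResult:
    '''Hypothetical Result variant that carries a value or an exception'''
    def __init__(self):
        self._evt = Event()
        self._result = None
        self._exc = None

    def set_result(self, value):
        self._result = value
        self._evt.set()

    def set_exception(self, exc):
        self._exc = exc
        self._evt.set()

    def result(self):
        self._evt.wait()
        if self._exc is not None:
            raise self._exc
        return self._result

class SafeWorker(Actor):
    def submit(self, func, *args, **kwargs):
        r = SafeResult()
        self.send((func, args, kwargs, r))
        return r

    def run(self):
        while True:
            # recv() stays outside the try block so ActorExit still propagates
            func, args, kwargs, r = self.recv()
            try:
                r.set_result(func(*args, **kwargs))
            except Exception as e:
                r.set_exception(e)

worker = SafeWorker()
worker.start()
ok = worker.submit(pow, 2, 3)
bad = worker.submit(int, 'not a number')
print(ok.result())    # 8
try:
    bad.result()
except ValueError as e:
    print('Failed:', e)
worker.close()
worker.join()
```

The standard library's concurrent.futures.Future follows a similar result-or-exception design, so it may be a reasonable replacement for a hand-written result class in real code.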
Last, but not least, the concept of “sending” a task a message is something that can be scaled up into systems involving multiple processes or even large distributed systems. For example, the send() method of an actor-like object could be programmed to transmit data on a socket connection or deliver it via some kind of messaging infrastructure (e.g., AMQP, ZMQ, etc.).
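As a rough sketch of the socket idea, the following actor-like proxy serializes each message with pickle and writes it to a socket, while a helper thread on the other end reads the messages back into a local queue. The names `SocketSender`, `pump()`, and `_read_exact()`, the 4-byte length prefix, and the use of `None` as an end-of-stream marker are all assumptions made for this illustration. A connected pair from `socket.socketpair()` stands in for a real network connection.

```python
import pickle
import socket
import struct
from queue import Queue
from threading import Thread

class SocketSender:
    '''Hypothetical actor-like proxy: send() writes pickled messages to a socket'''
    def __init__(self, sock):
        self._sock = sock

    def send(self, msg):
        data = pickle.dumps(msg)
        # Prefix each message with its length so the receiver can frame it
        self._sock.sendall(struct.pack('!I', len(data)) + data)

    def close(self):
        self._sock.close()

def _read_exact(sock, n):
    '''Read exactly n bytes, or return None if the peer closed the connection'''
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            return None
        buf += chunk
    return buf

def pump(sock, mailbox):
    '''Receive framed messages from sock and place them on a local mailbox'''
    while True:
        header = _read_exact(sock, 4)
        if header is None:
            break
        (size,) = struct.unpack('!I', header)
        body = _read_exact(sock, size)
        if body is None:
            break
        # Only unpickle data that comes from a trusted peer
        mailbox.put(pickle.loads(body))
    mailbox.put(None)    # End-of-stream marker used only in this sketch

# A connected socket pair stands in for a real network connection
left, right = socket.socketpair()
mailbox = Queue()
t = Thread(target=pump, args=(right, mailbox), daemon=True)
t.start()

remote = SocketSender(left)
remote.send('Hello')
remote.send(('B', 2, 3))
remote.close()
t.join()
right.close()

received = []
while True:
    msg = mailbox.get()
    if msg is None:
        break
    received.append(msg)
print(received)    # ['Hello', ('B', 2, 3)]
```

The sender keeps the same one-way send() interface as the thread-based actor, so calling code does not need to know whether the receiver lives in the same process.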