Actor

Actor模型是处理并发计算的概念模型。 Riker框架的核心是四个主要组件:

  • ActorSystem - 每个Riker应用程序有一个管理actor生命周期的ActorSystem
  • Actor - 实现Actor特征的Rust类型,以便它们可以接收消息
  • Props - 每个Actor都需要一个Props来描述如何创建一个Actor
  • ActorRef - 一种克隆成本低廉的轻量级类型,可用于与其底层Actor交互,例如向其发送消息让我们看看每一个,看看如何创建一个简单的应用程序。

定义Actor

Actor是计算的基本单位。 Actor只能以异步方式通过消息进行通信。 Actor可以根据收到的消息执行三个不同的操作:

  • 将有限数量的消息发送给其他Actor参与者
  • 创造有限数量的新Actor
  • 更改其状态或指定用于接收的下一条消息的行为Actor参与者通过传递消息来相互交互。 对上述行为没有假设的顺序,它们可以同时进行。 并发发送的两条消息可以按任意顺序到达。

要定义一个actor,系统需要理解一个actor应该如何处理它收到的消息。 要做到这一点,只需在数据类型上实现Actor特征,并至少提供一个接收方法。

Rust代码:

  1. struct MyActor;
  2. impl Actor for MyActor {
  3. type Msg = String;
  4. fn receive(&mut self,
  5. ctx: &Context<Self::Msg>,
  6. msg: Self::Msg,
  7. sender: Option<ActorRef<Self::Msg>>) {
  8. println!("received {}", msg);
  9. }
  10. }

在此示例中,一个简单的struct MyActor实现了Actor特征。 当消息发送到MyActor时,系统会将其调度为立即执行。 调用receive函数并将消息打印到stdout

创建Actor

每个应用程序都有一个ActorSystemactor系统提供actor管理和运行时在发送消息时执行actor。 它还提供基本服务,如启动actor和暴露系统服务。

启动actor系统:

  1. let model: DefaultModel<String> = DefaultModel::new();
  2. let sys = ActorSystem::new(&model).unwrap();

在这里,我们看到actor是使用ActorSystem :: new启动的。 但是这个型号是什么? 该模型允许我们配置应用程序中使用的消息类型,并配置用于核心服务的模块。 我们将在本文档后面详细介绍该模型。

一旦我们启动了Actor系统,我们就准备启动一些Actor了。

启动一个Actor

  1. let props = Props::new(Box::new(MyActor::new));
  2. let my_actor = sys.actor_of(props, "my-actor");

每个actor都需要一个包含actor的工厂方法的Props,在本例中是MyActor :: new,以及该方法所需的任何参数。 然后将Propsactor_of一起使用以创建actor的实例。 还需要一个名称,以便我们可以在以后查找,如果需要的话。

虽然这只是两行代码,但很多事情都在幕后发生。 Actor生命周期和状态由系统管理。 当一个actor开始时它会保留这些属性,以防它再次需要它来重启actor,如果它失败了。 当创建一个actor时,它会获得自己的邮箱来接收消息,并且其他感兴趣的actor会被告知有关加入系统的新actor

Actor References

当使用actor_of启动actor时,系统返回对actor的引用,即ActorRef。 实际的actor实例仍然无法访问,其生命周期由系统管理和保护。 在Rust术语中,系统具有并始终维护actor实例的“所有权”。 当你与Actor互动时,你实际上是与Actor的ActorRef进行互动! 这是Actor模型的核心概念。

ActorRef始终引用actor的特定实例。 当同一个Actor的两个实例启动时,它们仍然被认为是单独的actor,每个actor都有不同的ActorRef。

注意 : ActorRef很轻量,可以克隆(它们实现Clone)而不必过多关注资源。 引用也可以在Props中用作另一个actor的工厂方法中的一个字段,一种称为天赋的模式。 ActorRef也是Send,因此它可以作为消息发送给另一个actor。

发送消息

Actor只能通过消息进行通信。 他们是孤立的。 他们永远不会暴露他们的状态或行为。

如果我们想向actor发送消息,我们在actor的ActorRef上使用tell方法:

  1. let my_actor = sys.actor_of(props, "my-actor");
  2. my_actor.tell("Hello my actor!".into(), None);

这里我们向MyActor actor发送了一个String类型的消息。 第二个参数让我们将发件人指定为Option <ActorRef>。 由于我们从主节点发送消息而不是从Actor的接收节点发送消息,因此我们将发送方设置为无。

Riker在处理消息时提供某些保证:

  • 消息传递是“最多一次”: 消息将传递失败或传递一次, 没有重复传递相同的消息。
  • Actor随时处理一条消息
  • 消息存储在actor的邮箱中,以便接收它们

示例

让我们回到我们的MyActor,并将我们目前所见的内容与一个完整的例子结合起来:

Cargo.toml依赖项:

  1. [dependencies]
  2. riker = "0.2.0"
  3. riker-default = "0.2.0"
  1. extern crate riker;
  2. extern crate riker_default;
  3. #[macro_use]
  4. extern crate log;
  5. use std::time::Duration;
  6. use riker::actors::*;
  7. use riker_default::DefaultModel;
  8. struct MyActor;
  9. // implement the Actor trait
  10. impl Actor for MyActor {
  11. type Msg = String;
  12. fn receive(&mut self,
  13. _ctx: &Context<Self::Msg>,
  14. msg: Self::Msg,
  15. _sender: Option<ActorRef<Self::Msg>>) {
  16. debug!("Received: {}", msg);
  17. }
  18. }
  19. // provide factory and props functions
  20. impl MyActor {
  21. fn actor() -> BoxActor<String> {
  22. Box::new(MyActor)
  23. }
  24. fn props() -> BoxActorProd<String> {
  25. Props::new(Box::new(MyActor::actor))
  26. }
  27. }
  28. // start the system and create an actor
  29. fn main() {
  30. let model: DefaultModel<String> = DefaultModel::new();
  31. let sys = ActorSystem::new(&model).unwrap();
  32. let props = MyActor::props();
  33. let my_actor = sys.actor_of(props, "my-actor").unwrap();
  34. my_actor.tell("Hello my actor!".to_string(), None);
  35. std::thread::sleep(Duration::from_millis(500));
  36. }

在这里,我们启动了actor系统和MyActor的一个实例。 最后,我们向Actor发送了一条消息。 你还会注意到我们还提供了一个工厂函数actor()和Props函数props()作为MyActor实现的一部分。

要查看此示例项目,请单击此处

注意:如果actor的工厂方法需要参数,则可以使用Props :: new_args。 有关示例,请参阅Rustdocs。

在此页面上,您学习了使用actor创建Riker应用程序的基础知识。 让我们继续下一节,看看如何使用自己的消息类型: