消息调度

Riker的计时器模块提供调度功能,允许在给定的持续时间或特定时间后发送消息。 定时器方法在ActorSystem和Context上都公开。

一次性调度

有两种方法可以提供一次性调度:

  • schedule_once调度在给定延迟之后要发送的消息。
  • schedule_at_time计划在给定的特定时间发送的消息。
  1. let delay = Duration::from_secs(1);
  2. let actor = ctx.actor_of(MyActor::props(), "my-actor").unwrap();
  3. ctx.schedule_once(delay,
  4. actor,
  5. None,
  6. "that's one small step for man".into());

这里的消息计划在20秒后发送给 Actor

  1. let time = SystemTime::now();
  2. let actor = ctx.actor_of(MyActor::props(), "my-actor").unwrap();
  3. ctx.schedule_at_time(time,
  4. actor,
  5. None,
  6. "one giant leap for mankind".into());

这里计划在特定时间发送消息。

重复调度

可以安排消息以特定间隔重复发送:

schedule方法安排以给定间隔重复发送的消息。

  1. let delay = Duration::from_millis(100);
  2. let iterv = Duration::from_millis(500);
  3. let actor = ctx.actor_of(MyActor::props(), "my-actor").unwrap();
  4. ctx.schedule(delay,
  5. interv,
  6. actor,
  7. None,
  8. "a scheduled msg".into());

这里消息被安排为每500毫秒重复一次。 还有100毫秒的初始延迟,即重复消息开始之前的持续时间。

注意 : Riker的默认计时器模块不是持久性的,这意味着当应用程序停止时任何调度都会丢失。 它针对从几毫秒到48小时的短期持续时间或部署之间的平均时间进行了优化。

取消

在调度消息时,返回调度ID,该调度ID可以在以后用于取消调度。

  1. let id = ctx.schedule(delay,
  2. interv,
  3. actor,
  4. None,
  5. "a scheduled msg".into());
  6. ctx.cancel_schedule(id);

取消计划会将其从计时器中删除,并且将不再发送该消息。

消息调度的一些示例用例包括:

  • 等待特定时间让其他参与者提供输入,例如出价系统
  • 作为在超时间隔后提供默认消息的工作流程的一部分
  • 定期唤醒actor以检查资源,例如队列,IO或传感器
  • 定期向其他参与者发布或广播消息消息调度是并发系统的核心功能,可以驱动应用程序完成其目标。

接下来我们将看看持久化的actor状态,这是构建可扩展的数据驱动应用程序的关键。