流
依赖
为了使用 Akka 流类型,你需要将以下依赖添加到你的项目中:
<!-- Maven -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-typed_2.12</artifactId>
<version>2.5.23</version>
</dependency>
<!-- Gradle -->
dependencies {
compile group: 'com.typesafe.akka', name: 'akka-stream-typed_2.12', version: '2.5.23'
}
<!-- sbt -->
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.5.23"
简介
「Akka Streams」使对类型安全的消息处理管道建模变得容易。对于类型化的 Actors,可以在不丢失类型信息的情况下将流连接到 Actors。
此模块包含现有ActorRef
源的类型化替换,以及「ActorMaterializerFactory」的工厂方法,后者采用类型化ActorSystem
。
从这些工厂方法和源可以与来自原始模块的原始 Akka 流构建块混合和匹配。
- 注释:此模块已准备好用于生产,但仍标记为「可能更改」。这意味着 API 或语义可以在没有警告的情况下进行更改,但这些更改将在 Akka 2.6.0 中收集并执行,而不是在 2.5.x 补丁版本中执行。
Actor Source
发送到特定 Actor 的消息驱动的流可以使用「ActorSource.actorRef」启动。此源具体化为类型化的ActorRef
,它只接受与流类型相同的消息。
import akka.actor.typed.ActorRef;
import akka.japi.JavaPartialFunction;
import akka.stream.ActorMaterializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSource;
interface Protocol {}
class Message implements Protocol {
private final String msg;
public Message(String msg) {
this.msg = msg;
}
}
class Complete implements Protocol {}
class Fail implements Protocol {
private final Exception ex;
public Fail(Exception ex) {
this.ex = ex;
}
}
final JavaPartialFunction<Protocol, Throwable> failureMatcher =
new JavaPartialFunction<Protocol, Throwable>() {
public Throwable apply(Protocol p, boolean isCheck) {
if (p instanceof Fail) {
return ((Fail) p).ex;
} else {
throw noMatch();
}
}
};
final Source<Protocol, ActorRef<Protocol>> source =
ActorSource.actorRef(
(m) -> m instanceof Complete, failureMatcher, 8, OverflowStrategy.fail());
final ActorRef<Protocol> ref =
source
.collect(
new JavaPartialFunction<Protocol, String>() {
public String apply(Protocol p, boolean isCheck) {
if (p instanceof Message) {
return ((Message) p).msg;
} else {
throw noMatch();
}
}
})
.to(Sink.foreach(System.out::println))
.run(mat);
ref.tell(new Message("msg1"));
// ref.tell("msg2"); Does not compile
Actor Sink
有两个sink
可接受类型化的ActorRef
。要将流中的所有消息发送给 Actor 而不考虑反压力(backpressure
),请使用「ActorSink.actorRef」。
import akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;
interface Protocol {}
class Message implements Protocol {
private final String msg;
public Message(String msg) {
this.msg = msg;
}
}
class Complete implements Protocol {}
class Fail implements Protocol {
private final Throwable ex;
public Fail(Throwable ex) {
this.ex = ex;
}
}
final ActorRef<Protocol> actor = null;
final Sink<Protocol, NotUsed> sink = ActorSink.actorRef(actor, new Complete(), Fail::new);
Source.<Protocol>single(new Message("msg1")).runWith(sink, mat);
为了使 Actor 能够对反压力作出反应,需要在 Actor 和流之间引入一个协议。使用「ActorSink.actorRefWithAck」可以在 Actor 准备接收更多元素时发出需求信号。
import akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;
class Ack {}
interface Protocol {}
class Init implements Protocol {
private final ActorRef<Ack> ack;
public Init(ActorRef<Ack> ack) {
this.ack = ack;
}
}
class Message implements Protocol {
private final ActorRef<Ack> ackTo;
private final String msg;
public Message(ActorRef<Ack> ackTo, String msg) {
this.ackTo = ackTo;
this.msg = msg;
}
}
class Complete implements Protocol {}
class Fail implements Protocol {
private final Throwable ex;
public Fail(Throwable ex) {
this.ex = ex;
}
}
final ActorRef<Protocol> actor = null;
final Sink<String, NotUsed> sink =
ActorSink.actorRefWithAck(
actor, Message::new, Init::new, new Ack(), new Complete(), Fail::new);
Source.single("msg1").runWith(sink, mat);
英文原文链接:Streams.