从冷数据源创建
你可以从各种来源创建 Stream
,包括一个已知值的 Iterable
对象,一个用来作为基础任务流的单一值,或者甚至是来自于诸如 Future
或者 Supplier
这样的块结构。
Streams.just()
Stream<String> st = Streams.just("Hello ", "World", "!"); (1)
st.dispatchOn(Environment.cachedDispatcher()) (2)
.map(String::toUpperCase) (3)
.consume(s -> System.out.printf("%s greeting = %s%n", Thread.currentThread(), s)); (4)
- 从一个已知值创建
Stream
,但并不指定一个默认的调度器(Dispatcher)
。 .dispatchOn(Dispatcher)
告诉Stream
在哪一个线程上执行任务。用这来将任务的执行从一个线程转移到另外一个线程。- 使用常见的约定 —— map() 方法 —— 来对输入进行转化。
- 在管道上产生需求,这意味着“处理开始了”。这是一个对
subscribe(Subscriber)
进行了优化后的快捷方式,默认只请求 Long.MAX_VALUE 次。
冷数据源从一开始就会因为每一个传入
Stream.subscribe(Subscriber
) 的新的 Subscriber 而被重新发放而因此就可能发生重复的消耗。
表5, 创建预先确定的 Stream 和 Promise
工厂方法 | 数据类型 | 作用 |
---|---|---|
Streams.<T>empty() | T |
只在被订阅者请求时,发出一次 onComplete() 。
|
Streams.<T>never() | T | 从不发出任何东西。 对于保持活动状态的行为很有用。 |
Streams.<T, Throwable>fail(Throwable) | T |
只发出 onError(Throwable) 。
|
Streams.from(Future<T>) | T |
在传入的可能会发出 onNext(T) 和 onComplete() ,或者异常时发出 onError(Throwable) 的 Future.get() 上阻止 Subscription.request(long) 。
|
Streams.from(T[]) | T |
每次 Subscription.request(N) 被调用到时发出 N 个 onNext(T) 元素。 如果 N == Long.MAXVALUE, 就发出所有数据。 一旦整个数组都已经被读取了一遍,就发出 onComplete()。
|
Streams.from(Iterable<T>) | T |
每次 Subscription.request(N) 被调用到时发出 N 个 onNext(T) 元素。 如果 N == Long.MAX_VALUE, 就发出所有数据。一旦整个数组都已经被读取了一遍,就发出 onComplete() 。
|
Streams.range(long, _long) | Long |
每次 Subscription.request(N) 被调用到时就发出有 N 个 onNext(Long) 的一个序列。如果 N == Long.MAXVALUE, 就发出所有东西。 一旦读取达到所能包容的上限,就发出 onComplete() 。
|
Streams.just(T, _T, T, T, T, T, T, T) | T |
在只是行为相似的 Streams.from(Iterable) 的一种优化。 用来发送没有和 Streams.from() 签名相冲突的 Iterable, Array 或者 Future 也很有用。
|
Streams.generate(Supplier<T>) | T |
每当 Subscription.request(N) 被调用时就发送从 Supplier.get() 工厂产出的 onNext(T) 。忽略掉要求的数量 N,因为只有一个数据会被发送。当返回一个 null 值时,就发送 onComplete() 。
|
Promises.syncTask(Supplier<T>), Promises.task(, Supplier<T>) | T |
当 Subscription.request(N) 第一次接收时发送一个从 Supplier.get() 产生的 onNext(T) 和 onComplete() 。忽略掉数量 N。
|
Promises.success(T) | T |
无论一个订阅者何时被提供给 Promise.subscribe(Subscriber) ,都发送 onNext(T) 和 onComplete() 。
|
Promises.<T>error(Throwable) | T |
无论一个被订阅了的订阅者何时被提供给 Promise.subscribe(Subscriber) ,都发送 onError(Throwable) 。
|