Adapters
Functions that take a Stream and return another Stream are often called ‘stream adapters’, as they’re a form of the ‘adapter pattern’. Common stream adapters include map, take, and filter.
Let's update the Mini-Redis program so that it exits: after receiving three messages, it stops iterating over messages. This is done using take. This adapter limits the stream to yield at most n messages.
let messages = subscriber
    .into_stream()
    .take(3);
Running the program again, we get:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
This time the program ends.
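The effect of take can be tried without a server. The standard library's Iterator has an adapter with the same name, so the following is a minimal synchronous sketch rather than the tutorial's async code. The PAYLOADS list and the first_n helper are hypothetical names introduced for illustration, and the payloads "four" and "five" are assumed values.

```rust
// Hypothetical payloads standing in for messages published on the channel.
// "four" and "five" are assumed values, used only to pad out the input.
static PAYLOADS: [&[u8]; 6] = [b"1", b"two", b"3", b"four", b"five", b"6"];

/// Synchronous analogue of `subscriber.into_stream().take(n)`:
/// yields at most `n` payloads and then ends, even if more input remains.
fn first_n(n: usize) -> Vec<&'static [u8]> {
    PAYLOADS.iter().copied().take(n).collect()
}
```

Calling first_n(3) returns the payloads 1, two, and 3. Asking for more items than exist, as in first_n(10), simply returns all six, which is why take is described as yielding "at most" n items.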
Now, let’s limit the stream to single-digit numbers. We check this by testing the message length: a single-digit payload is exactly one byte long. We use the filter adapter to drop any message that does not match the predicate.
let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .take(3);
Running the program again, we get:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })
Note that the order in which adapters are applied matters. Calling filter first and then take is different from calling take and then filter.
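The difference is easy to see in a small synchronous sketch. It uses the standard library's Iterator adapters of the same names as a stand-in for the stream adapters. The PAYLOADS list and both function names are hypothetical, and the payloads "four" and "five" are assumed values.

```rust
// Hypothetical payloads standing in for messages published on the channel.
// "four" and "five" are assumed values between the ones shown in the output.
static PAYLOADS: [&[u8]; 6] = [b"1", b"two", b"3", b"four", b"five", b"6"];

/// filter, then take: keep scanning the input until three matches are found.
fn filter_then_take() -> Vec<&'static [u8]> {
    PAYLOADS
        .iter()
        .copied()
        .filter(|payload| payload.len() == 1)
        .take(3)
        .collect()
}

/// take, then filter: only the first three payloads are ever examined.
fn take_then_filter() -> Vec<&'static [u8]> {
    PAYLOADS
        .iter()
        .copied()
        .take(3)
        .filter(|payload| payload.len() == 1)
        .collect()
}
```

With this input, filter_then_take() returns the payloads 1, 3, and 6, while take_then_filter() returns only 1 and 3, because take(3) has already cut the input off after the third payload.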
Finally, we will tidy up the output by stripping the Ok(Message { ... }) part of the output. This is done with map. Because this is applied after filter, we know the message is Ok, so we can use unwrap().
let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .map(|msg| msg.unwrap().content)
    .take(3);
Now, the output is:
got = b"1"
got = b"3"
got = b"6"
Another option would be to combine the filter and map steps into a single call using filter_map.
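As a rough illustration of that combination, here is a synchronous sketch built on the standard library's Iterator::filter_map, which has the same shape: the closure returns Some(value) to keep and transform an item, or None to drop it. The Message struct below is a simplified stand-in with only a content field, and the String error type, the incoming helper, and its sample data are all assumptions made for the example.

```rust
/// Simplified stand-in for the tutorial's message type (assumed shape).
struct Message {
    content: Vec<u8>,
}

/// Hypothetical input: each item is a Result, as in the output shown earlier.
fn incoming() -> Vec<Result<Message, String>> {
    vec![
        Ok(Message { content: b"1".to_vec() }),
        Ok(Message { content: b"two".to_vec() }),
        Err("assumed error value".to_string()),
        Ok(Message { content: b"3".to_vec() }),
    ]
}

/// One filter_map call replaces the filter + map pair.
fn single_digit_contents() -> Vec<Vec<u8>> {
    incoming()
        .into_iter()
        .filter_map(|msg| match msg {
            // Keep single-byte payloads and unwrap them in the same step.
            Ok(msg) if msg.content.len() == 1 => Some(msg.content),
            // Drop errors and longer payloads.
            _ => None,
        })
        .take(3)
        .collect()
}
```

A side effect of this form is that unwrap() is no longer needed: the match arm that accepts an item is also the one that extracts its content, so the "we know the message is Ok" reasoning is enforced by the pattern instead of by adapter order.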
More adapters are available. See the list in the StreamExt trait documentation.