Building stream combinators
Wednesday, 2025-04-16
How to add functionality to asynchronous Rust by building your own stream combinators.
Previous post: Role of coroutines.
Introduction
What if you know how to use streams, but you need some kind of functionality that is not in the StreamExt trait from futures or in the standard library?
In that case, you might try an imperative approach and create the stream with the unfold function. This works for simple cases, but it does not generalise well: keeping the different parts of the intermediate stream state apart quickly becomes difficult.
You might also try imperative design patterns such as loops, channels, spawned tasks and generic functions. This approach quickly becomes unmaintainable because of its complexity.
Invalidation through moves
Remark: the following sections are about Unpin. I wrote them because I needed Unpin later on to better understand how to construct combinators. You can skip them if you want (or read the official documentation).
The physical location of variables used in your code may change throughout the lifetime of your program. In Rust, for example, it is common to move a variable through an assignment. A variable that is passed by value to a function is moved (literally and conceptually) into the function, which takes conceptual ownership. As far as I know, this is what is called the move semantics of Rust.
However, this conceptual move may also be a physical move: the bytes of the value are copied to a new location, for example another place on the stack, a CPU register or the heap. This is dangerous when the data being moved is self-referencing, because pointers into the value still point at the old location.
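A minimal sketch of the danger, using a hypothetical SelfRef type. To stay in safe Rust, it stores the address of its own value field as a plain usize instead of a real pointer; a real self-reference would be left dangling in exactly the same way. Moving the value into a Box copies its bytes from the stack to the heap, so the recorded address goes stale.

```rust
/// Hypothetical stand-in for a self-referential type: `recorded_addr`
/// plays the role of a pointer into the struct's own `value` field.
pub struct SelfRef {
    pub value: u64,
    pub recorded_addr: usize,
}

impl SelfRef {
    pub fn new(value: u64) -> Self {
        // The "self-reference" cannot be filled in yet: the value has no
        // final address until it is stored somewhere.
        SelfRef { value, recorded_addr: 0 }
    }

    /// Record the current address of `value` (like storing `&self.value`).
    pub fn record(&mut self) {
        self.recorded_addr = &self.value as *const u64 as usize;
    }

    /// True when the recorded address no longer points at `value`.
    pub fn is_stale(&self) -> bool {
        self.recorded_addr != &self.value as *const u64 as usize
    }
}
```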
For more information, see std::pin.
Clearing up Unpin
The confusing aspect of Pin and Unpin in Rust is that the one you should understand first is not Pin, but Unpin.
The naming of Unpin makes it seem like it is some counterpart to Pin. However, it is not.
Unpin is an auto-trait, which means that the compiler implements Unpin automatically for every type whose fields are all Unpin. Such types are safe to move even after they have been pinned. This happens at compile time, behind the scenes, as for every auto-trait.
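A small sketch of the auto-trait at work. The names Plain, Pinned and is_unpin are hypothetical; is_unpin only compiles for Unpin arguments, so it acts as a compile-time check. Plain is Unpin without any annotation because all its fields are, while the PhantomPinned marker field makes Pinned !Unpin.

```rust
use std::marker::PhantomPinned;

/// Hypothetical compile-time check: only callable with `Unpin` values.
pub fn is_unpin<T: Unpin>(_: &T) -> bool {
    true
}

/// All fields are `Unpin`, so the compiler implements `Unpin` automatically.
pub struct Plain {
    pub id: u32,
    pub name: String,
}

/// `PhantomPinned` is `!Unpin`, so this struct is `!Unpin` as well.
pub struct Pinned {
    pub id: u32,
    _pin: PhantomPinned,
}

impl Pinned {
    pub fn new(id: u32) -> Self {
        Pinned { id, _pin: PhantomPinned }
    }
}

// `is_unpin(&Pinned::new(1))` would fail to compile, because `Pinned: !Unpin`.
```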
Cannot be moved / !Unpin
Anything that the compiler cannot treat as safe to move will automatically be marked as !Unpin (not Unpin, or un-moveable once pinned). For an ordinary struct this happens as soon as one of its fields is !Unpin. The compiler needs this to detect when some kind of data type would be invalidated by a move and to prevent dangerous actions by users (programmers implementing async functionality).
Examples of !Unpin Rust data types:
- generators:
  - normal generators written with gen-blocks or macros
  - async generators written with the macro async_stream::stream! {} (implementing the Stream trait)
- self-referencing data that is code-generated by the compiler:
  - state machines generated by async {} blocks
  - … ?
- self-referencing user data structures:
  - naive trees
  - strings with slices
- types with a manual PhantomPinned field
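Important: async {} blocks are always !Unpin. The state machine generated for an async block may hold references to its own local variables across .await points. The compiler does not analyze each block to check whether it actually does (you could call it lazy), so it conservatively marks all of them !Unpin.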
Purpose of data-type Pin
Pin as a data type is meant to wrap a pointer: most of its methods require the inner type to implement Deref (or DerefMut). In other words, Pin is a wrapper around pointer or reference-like data types.
The concrete type Pin has no run-time representation of its own: it is a transparent wrapper with exactly the same memory layout as the pointer inside it. It does not represent a different location or address in memory. It is merely a Rust compiler construct that manifests itself as a type available to users.
The Pin type is essentially a contract for pointer-like types, maintained by two methods that require the pointed-to value (also called the “pointee”) to implement Unpin:
- A constructor Pin::new(): takes a pointer (for example &mut T or Box<T>) to an Unpin value (safe to move, not !Unpin) and wraps it. Because the target is Unpin, Pin::into_inner() can hand the pointer back later. A !Unpin value cannot be pinned this way; once it is pinned (with Box::pin, the pin! macro or an unsafe constructor), it stays pinned until it is dropped, and its Drop implementation runs at that point.
- A mutable getter Pin::get_mut(): gives you mutable access to the contained, pinned Unpin value. This allows you to still call methods with the &mut Self signature on the pinned data.
The reason that the get_mut method requires the pointee to be Unpin is that a plain mutable reference can be used to move the content out of the Pin, for example with std::mem::replace. That would invalidate any pinned !Unpin value, which must not be moved.
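The two safe methods can be tried out on an ordinary String, which is Unpin. The function names below are hypothetical; the point is that Pin::new, get_mut and Pin::into_inner only compile here because the target is Unpin, and that the &mut returned by get_mut is enough to move the old value out with std::mem::replace.

```rust
use std::mem;
use std::pin::Pin;

/// Pin an `Unpin` value behind `&mut`, then move its contents out again.
pub fn replace_through_pin(value: &mut String, new: &str) -> String {
    // `Pin::new` only accepts pointers to `Unpin` targets; `String` is `Unpin`.
    let pinned: Pin<&mut String> = Pin::new(value);
    // `get_mut` hands back a plain `&mut String` because `String: Unpin`...
    let inner: &mut String = pinned.get_mut();
    // ...and a plain `&mut` is all `mem::replace` needs to move the old value out.
    // For a `!Unpin` target, neither `Pin::new` nor `get_mut` would compile.
    mem::replace(inner, new.to_string())
}

/// `Pin::into_inner` returns the pointer again, also only for `Unpin` targets.
pub fn pin_round_trip(boxed: Box<u32>) -> Box<u32> {
    let pinned: Pin<Box<u32>> = Pin::new(boxed);
    Pin::into_inner(pinned)
}
```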
| Metaphor | Type state | Ownership event |
|---|---|---|
| undressed | Type | moveable / free |
| dress-up | Pin::new(Type) | give up ownership |
| dressed-up | Pin<Type> | stuck in memory |
| dress-down | Pin::new(Type).get_mut() | acquire edit access |
Conclusion: Pin does not pin anything physically at run-time. It relies on the Unpin auto-trait (and its absence, !Unpin) to rule out dangerous moves at compile time.
Convert !Unpin into Unpin
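Almost all primitives in Rust are Unpin. Whenever you encounter something that is not Unpin, you can allocate it on the heap with the function Box::pin. Box::pin(x) is equivalent to Box::into_pin(Box::new(x)); it cannot be written as Pin::new(Box::new(x)), because Pin::new requires an Unpin target. The resulting Pin<Box<T>> is itself Unpin for every T: moving the box only moves the pointer, not the heap allocation it points to.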
The result of Box::pin is a combination:
- The heap-allocated pinned object will not be moved by any code generated by the Rust compiler.
- We can still drop the pinned object, and its Drop implementation will run.
Let’s take the following as an example of something that is !Unpin (whenever Fut is !Unpin):
struct MapFut<Fut> where Fut: Future {
fut: Fut,
...
}
A common approach when writing extensions or combinators of futures, which are themselves futures, is to project Pin<&mut Self> into &mut Self. The projection is usually called this (because self is a reserved keyword).
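use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

impl<Fut: Future> Future for MapFut<Fut> {
    type Output = Fut::Output; // simplified: the mapping function is left out
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // We need to do something with `self.fut`.
        // We need `Pin<&mut Self> -> &mut Self`.
        // The following will not compile: `get_mut` requires `Self: Unpin`,
        // and `Self` is not known to be `Unpin` for a generic `Fut`.
        let this = self.get_mut();
        unimplemented!()
    }
}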
In this example the user struct MapFut is !Unpin as soon as Fut is, and for a generic Fut the compiler cannot assume otherwise, so get_mut cannot be used. The get_mut function requires Self: Unpin, as mentioned in the previous section.
The simplest solution is to refine the definition of MapFut as follows:
struct MapFut<Fut> where Fut: Future {
fut: Pin<Box<Fut>>,
...
}
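Now fut is a pinned pointer to a location on the heap. Since Pin<Box<Fut>> is Unpin for every Fut, MapFut is Unpin as long as its other fields are, and get_mut can be used.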
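A runnable sketch of the refined combinator, with the mapping function filled in as a hypothetical f: Option<F> field (the original elides the other fields). Because fut is a Pin<Box<Fut>> and the closure is required to be Unpin, MapFut itself is Unpin and get_mut compiles. The noop_waker helper is also an assumption: a waker that does nothing, which is enough to poll a future that completes immediately without pulling in an async runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical map combinator for futures (not the futures crate's `Map`).
pub struct MapFut<Fut, F> {
    // Boxing and pinning the inner future makes this field `Unpin`.
    fut: Pin<Box<Fut>>,
    // `Option` so that the `FnOnce` can be taken out exactly once.
    f: Option<F>,
}

impl<Fut, F> MapFut<Fut, F> {
    pub fn new(fut: Fut, f: F) -> Self {
        MapFut { fut: Box::pin(fut), f: Some(f) }
    }
}

impl<Fut, F, T> Future for MapFut<Fut, F>
where
    Fut: Future,
    F: FnOnce(Fut::Output) -> T + Unpin,
{
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        // Allowed now: `Pin<Box<Fut>>` and `Option<F>` are both `Unpin`.
        let this = self.get_mut();
        match this.fut.as_mut().poll(cx) {
            Poll::Ready(v) => {
                let f = this.f.take().expect("MapFut polled after completion");
                Poll::Ready(f(v))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// A waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

The second test case wraps an async block, which is !Unpin, to show that boxing it is enough for MapFut to stay Unpin.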
Unsafe Pin methods (optional)
Until now I explicitly avoided mentioning or using the unsafe methods of the Pin type. Besides Pin::new() and Pin::get_mut(), Pin also has unsafe counterparts such as Pin::new_unchecked() and Pin::get_unchecked_mut().
You might have been tempted by the Rust compiler to use the unsafe methods because they do not have an Unpin constraint. But if you go in that direction, you switch off important compile-time checks and become responsible for upholding the pinning guarantees yourself.
The Ready future
Any value can be turned into a future using the Ready future. This future takes the value and puts it inside an Option.
pub struct Ready<T>(Option<T>);
This future never has to wake up, because it yields its value immediately: it is ready on the first poll call. This is why the context (and its waker) is ignored.
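use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// `Ready` never pins its contents, so it can be `Unpin` for every `T`
// (this is what makes `self.0.take()` through `Pin<&mut Self>` compile).
impl<T> Unpin for Ready<T> {}

impl<T> Future for Ready<T> {
    type Output = T;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
        Poll::Ready(self.0.take().expect("Ready polled after completion"))
    }
}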
To use this future, you can construct it directly, or use the standard library's version through the function std::future::ready():
assert_eq!(1, ready(1).await);
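Remark: .await calls IntoFuture::into_future on its operand, so anything that implements IntoFuture (including every Future) can be awaited directly. Plain values such as 1 do not implement IntoFuture, however, so for those you still need a wrapper like Ready.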
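To see the behaviour of Ready without an async runtime, it can be polled by hand. The sketch below re-declares Ready locally and adds a hypothetical ready() constructor and a hypothetical poll_once helper that polls with a do-nothing waker. The impl<T> Unpin for Ready<T> {} line mirrors what the standard library does for its own Ready: without it, mut self: Pin<&mut Self> could only be mutated when T: Unpin.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local copy of the `Ready` future from the text.
pub struct Ready<T>(Option<T>);

// `Ready` never pins its contents, so it can be `Unpin` for any `T`.
impl<T> Unpin for Ready<T> {}

/// Hypothetical constructor mirroring `std::future::ready`.
pub fn ready<T>(value: T) -> Ready<T> {
    Ready(Some(value))
}

impl<T> Future for Ready<T> {
    type Output = T;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
        Poll::Ready(self.0.take().expect("Ready polled after completion"))
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Poll a future exactly once with a waker that does nothing.
/// Enough for `Ready`, which never returns `Poll::Pending`.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```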
Simple stream combinators
Definition of a stream
A stream is similar to a future, but it may be polled more than once and yield more than one value: Poll::Ready(Some(item)) for every item, and Poll::Ready(None) once it is exhausted.
The definition from the futures crate is a trait:
pub trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;
}
Notice the similarity with the Future trait.
Important: As of April 2025, the Stream trait is not yet in stable Rust.
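Since the trait is not in std, a sketch can declare a local copy with the same shape and run with only the standard library. The Iter stream and the collect_ready helper below are hypothetical (futures has its own stream::iter): Iter is always ready, so collect_ready can drain it with a do-nothing waker instead of an async runtime.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for `futures::Stream` (same shape as the trait above).
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical stream over any iterator; it is always ready.
pub struct Iter<I> {
    iter: I,
}

pub fn iter<I: IntoIterator>(items: I) -> Iter<I::IntoIter> {
    Iter { iter: items.into_iter() }
}

impl<I: Iterator + Unpin> Stream for Iter<I> {
    type Item = I::Item;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        // `Iter` is `Unpin` when `I` is, so a plain `&mut Self` is available.
        Poll::Ready(self.get_mut().iter.next())
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drain a stream that never returns `Pending` (no async runtime involved).
pub fn collect_ready<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => out.push(item),
            Poll::Ready(None) => return out,
            Poll::Pending => panic!("collect_ready: stream was not ready"),
        }
    }
}
```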
Futures as streams with Once
Every future Fut can be converted into a stream that yields Some(Fut::Output) once and then None:
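use std::future::Future;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use futures::stream::Stream;
use pin_project::pin_project;

#[pin_project]
pub struct Once<Fut> {
    #[pin]
    future: Option<Fut>,
}

impl<Fut: Future> Stream for Once<Fut> {
    type Item = Fut::Output;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Project `Pin<&mut Self>` to `Pin<&mut Option<Fut>>`.
        let mut this = self.project();
        let v = match this.future.as_mut().as_pin_mut() {
            // `ready!` returns `Poll::Pending` early if the future is not done.
            Some(fut) => ready!(fut.poll(cx)),
            // The future already completed: the stream is exhausted.
            None => return Poll::Ready(None),
        };
        // Drop the finished future so that the next poll returns `None`.
        this.future.set(None);
        Poll::Ready(Some(v))
    }
}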
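Here, ready! (std::task::ready! or futures::ready!) extracts the value from Poll::Ready, and makes poll_next return Poll::Pending early when the future is not ready yet.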
Functional building block Map
Let’s look at a simple example from the “semi-standard” library crate futures: the map combinator. This combinator maps the items of an input stream and returns a new output stream (consuming the input stream).
The combinator’s source code looks roughly like this (simplified, imports omitted):
#[pin_project]
pub struct Map<St, F> {
    #[pin]
    stream: St,
    f: F,
}

impl<St, F, T> Stream for Map<St, F>
where
    St: Stream,
    F: FnMut(St::Item) -> T,
{
    type Item = T;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { ... }
}
Ignore the field attribute #[pin] for now.
The definition of map and all the behaviour that we expect from a map function essentially boil down to storing two things:
- The original input stream.
- The function or closure that maps elements of the input stream.
Then the authors of futures had to implement the single required method of the Stream trait, poll_next:
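impl<St, F, T> Stream for Map<St, F>
where
    St: Stream,
    F: FnMut(St::Item) -> T,
{
    type Item = T;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `stream` becomes `Pin<&mut St>`, `f` becomes `&mut F`.
        let mut this = self.project();
        // Return `Poll::Pending` early if the input stream is not ready.
        let res = ready!(this.stream.as_mut().poll_next(cx));
        // `None` (end of the input stream) passes through unchanged.
        Poll::Ready(res.map(|x| (this.f)(x)))
    }
}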
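At the beginning of the poll_next function, only a Pin<&mut Self> is given. It has to be “undressed” into mutable access to the fields of the state object Map. This happens through a process called projection: the #[pin] field stream becomes a Pin<&mut St>, and the unpinned field f becomes a plain &mut F. When Map is Unpin, this is essentially the same as calling get_mut on Pin.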
Projecting with pin_project (optional)
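Almost every library I encountered uses the pin_project crate, which provides the #[pin] field attribute that I asked you to ignore (together with the #[pin_project] attribute on the struct). The pin_project crate takes care of two things:
- You don’t have to call Box::pin or pin_mut! yourself inside the constructor function of your aggregated stream.
- You get a project method that turns Pin<&mut Self> into references to the fields: Pin<&mut Field> for #[pin] fields and &mut Field for the others. This plays the role of get_mut(), but also works when Self is !Unpin.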
Remark: Internally (if I read the code correctly), pin_project makes use of several unsafe methods of the Pin type. This is not that strange, considering that much of the standard library relies on unsafe code internally. On the other hand, using Box::pin costs only one heap allocation per pinned value, which usually has no significant performance impact, and it does not require a macro from an external crate.
Stream combinators
I will now focus on more complicated stream combinators. These are functions that take one or more input streams and return an output stream. From now on, I will call such higher-order functions stream combinators.
Remark: The futures-rx crate is a good place to look if you want to save time and just use combinators made by someone else.
Flattening nested streams
The futures crate has several types of flatten functions for nested streams. Nested streams are Rust data types S with a trait bound S: Stream<Item: Stream>.
You could separate flatten combinators into two categories: sequential or concurrent.
Sequential flatten combinators will never yield values from multiple inner streams at the same time.
- flatten: flattens the outer stream by pasting the output from the inner streams one after another. This is usually not what you want: most streams are infinite, and an infinite inner stream blocks the rest of the outer stream forever.
- A “forgetful” flatten (called switch in RxJS): a kind of flatten of nested streams that only polls the most recent inner stream that arrived on the outer stream. This is implemented by switch_map in futures-rx.
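A sketch of the sequential flatten from the first bullet, written against a local Stream trait with hypothetical Flatten, Iter and collect_ready types (not the futures implementation). It drains the current inner stream completely before asking the outer stream for the next one, which is exactly why an infinite inner stream would block everything after it.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for `futures::Stream`.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical always-ready stream over an iterator.
pub struct Iter<I>(pub I);

impl<I: Iterator + Unpin> Stream for Iter<I> {
    type Item = I::Item;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.get_mut().0.next())
    }
}

/// Hypothetical sequential flatten over a stream of streams.
pub struct Flatten<Outer: Stream> {
    outer: Pin<Box<Outer>>,
    current: Option<Pin<Box<Outer::Item>>>,
}

impl<Outer: Stream> Flatten<Outer> {
    pub fn new(outer: Outer) -> Self {
        Flatten { outer: Box::pin(outer), current: None }
    }
}

impl<Outer> Stream for Flatten<Outer>
where
    Outer: Stream,
    Outer::Item: Stream,
{
    type Item = <Outer::Item as Stream>::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Both fields are `Unpin` (boxed), so `Flatten` is `Unpin`.
        let this = self.get_mut();
        loop {
            if let Some(inner) = this.current.as_mut() {
                let polled = inner.as_mut().poll_next(cx);
                match polled {
                    // An endless inner stream keeps returning here forever.
                    Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
                    // Inner stream exhausted: ask the outer stream for the next one.
                    Poll::Ready(None) => this.current = None,
                    // The outer stream is not polled while the inner one waits.
                    Poll::Pending => return Poll::Pending,
                }
            }
            match this.outer.as_mut().poll_next(cx) {
                Poll::Ready(Some(inner)) => this.current = Some(Box::pin(inner)),
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drain a stream that never returns `Pending`.
pub fn collect_ready<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
        out.push(item);
    }
    out
}
```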
Concurrent flatten combinators will combine values from multiple inner streams:
- Concurrent flatten_unordered(None): flattens by merging as many inner streams as possible, as they arrive on the outer stream. This might seem like a useful function, but it is often not what you want.
- Buffered concurrent flatten_unordered(Some(N)): flattens up to N different inner streams at the same time.
Keep in mind that you can also use the select_all function from futures::stream to flatten in the situation where your outer stream is not a real stream but a finite iterable of streams.
A stream of Arcs
Sometimes you might want to use the items yielded by a single stream in several places simultaneously. You could wrap every item yielded by the input stream inside a shared reference Arc. The disadvantage of this approach is that you need (a limited form of) garbage collection by reference counting.
The futures-rx crate exposes a share method that can be used for splitting an input stream.
The docs contain an example:
use futures::{stream::{StreamExt, self}, future::join};
use futures_rx::{Notification, RxExt};
let stream = stream::iter(0..=3);
let stream = stream.share();
let sub_stream_a = stream.clone().map(|event| *event);
let sub_stream_b = stream.clone().map(|event| *event);
assert_eq!((vec![0, 1, 2, 3], vec![0, 1, 2, 3]), join(sub_stream_a.collect::<Vec<_>>(), sub_stream_b.collect::<Vec<_>>()).await);
The events yielded by the shared stream seem to be wrappers around an Arc of the original item (here an integer). Dereferencing with *event yields the integer itself, and because integers are Copy, the value is simply copied out of the shared allocation.
Remark: Maybe I am wrong?
A stream of clones
The share operator on streams from futures-rx seems to put all values on the input stream inside a reference-counted Arc.
I wanted to derive several output streams from the input stream that yielded clones directly without reference counting.
Remark: Maybe I could just have used share and then cloned every element?
I decided to implement a separate stream cloning trait in a crate clone-stream. The high-level API of this crate is quite simple. It is just a call to fork and then stream.clone():
use clone_stream::ForkStream;
use futures::{FutureExt, StreamExt, stream};
let non_clone_stream = stream::iter(0..10);
let clone_stream = non_clone_stream.fork();
let mut cloned_stream = clone_stream.clone();
Remark: There exist a few other crates that are similar.
Structure of the clone-stream crate
I called the shared state of all the clones of an input stream Fork. The output streams that can be cloned are called CloneStream. The input stream is called BaseStream.
The fork is a combination of a queue of unprocessed items and a map of the state machines of the CloneStreams:
struct Fork<BaseStream>
where BaseStream: Stream<Item: Clone>
{
base_stream: Pin<Box<BaseStream>>,
queue: BTreeMap<usize, Option<BaseStream::Item>>,
clones: BTreeMap<usize, CloneState>,
next_clone_index: usize,
next_queue_index: usize,
}
Every CloneStream registers at the shared Fork and receives a unique identifier:
fn register(&mut self) -> usize {
let min_available = self.next_clone_index;
self.clones.insert(min_available, CloneState::default());
self.next_clone_index += 1;
min_available
}
Inside register, the Fork also creates an entry for the new CloneStream in the map that holds the states of all CloneStreams. The default entry is just NeverPolled.
As soon as a CloneStream is polled, the poll call is forwarded to the Fork together with the waker provided by the async runtime. The Fork looks up the current state of the state machine associated with that CloneStream using its id. Depending on that state, it will:
- Pop an item from the shared item queue and return it to the CloneStream. When other CloneStreams are still waiting for the same item, clone it from the queue instead of removing it.
- Poll the input stream and return Poll::Pending when the input stream is not ready.
- Poll the input stream, receive a ready item, push a copy onto the queue if other CloneStreams are sleeping, and return the item.
After each step, the Fork advances the state machine of the CloneStream in its map.
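A heavily simplified, synchronous sketch of the queue bookkeeping described above. It is not the clone-stream implementation: there is no polling, no wakers and no locking, and the names SharedQueue, register, push, next_for and buffered are hypothetical. Each clone keeps the index of the next item it wants and receives a clone of that item; an item leaves the queue once every registered clone has received it.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical, synchronous model of a fork's shared item queue.
pub struct SharedQueue<T: Clone> {
    /// Items not yet received by every clone; the front item has index `offset`.
    items: VecDeque<T>,
    offset: usize,
    /// Per clone id: index of the next item that clone should receive.
    cursors: BTreeMap<usize, usize>,
    next_clone_id: usize,
}

impl<T: Clone> SharedQueue<T> {
    pub fn new() -> Self {
        SharedQueue { items: VecDeque::new(), offset: 0, cursors: BTreeMap::new(), next_clone_id: 0 }
    }

    /// Register a clone; it only sees items pushed after registration.
    pub fn register(&mut self) -> usize {
        let id = self.next_clone_id;
        self.next_clone_id += 1;
        self.cursors.insert(id, self.offset + self.items.len());
        id
    }

    /// Push an item produced by the base stream.
    pub fn push(&mut self, item: T) {
        self.items.push_back(item);
    }

    /// Give clone `id` a clone of its next item, then drop the items
    /// that every registered clone has already received.
    pub fn next_for(&mut self, id: usize) -> Option<T> {
        let cursor = self.cursors.get_mut(&id)?;
        let item = self.items.get(*cursor - self.offset)?.clone();
        *cursor += 1;
        let min_cursor = self.cursors.values().copied().min().unwrap_or(self.offset);
        while self.offset < min_cursor {
            self.items.pop_front();
            self.offset += 1;
        }
        Some(item)
    }

    /// Number of items still kept in the queue.
    pub fn buffered(&self) -> usize {
        self.items.len()
    }
}
```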
General approach to building combinators
Usually, a combinator has some kind of internal state. You can choose between:
- using an implicit state defined with unfold, or
- creating a new data type that represents the state explicitly.
If you decide on the latter approach, you can use the “state” data type as a handle for useful helper methods.
Do not forget to give the helper methods (where appropriate) a Waker argument. The helper methods can use this argument to store the waker when some source is unavailable, and to wake up the relevant sleeping tasks once the source becomes ready.
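A minimal sketch of such a helper, with a hypothetical Slot state type: poll_take either returns the value or stores the waker it was given, and put wakes that waker when the value arrives. CountingWake is a test helper that counts wake-ups through the Wake trait.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Poll, Wake, Waker};

/// Hypothetical shared state whose helper methods take a `Waker`.
pub struct Slot<T> {
    value: Option<T>,
    waiting: Option<Waker>,
}

impl<T> Slot<T> {
    pub fn new() -> Self {
        Slot { value: None, waiting: None }
    }

    /// Called from `poll`/`poll_next`: take the value, or remember who to wake.
    pub fn poll_take(&mut self, waker: &Waker) -> Poll<T> {
        match self.value.take() {
            Some(v) => Poll::Ready(v),
            None => {
                // Keep only the latest waker, as most futures do.
                self.waiting = Some(waker.clone());
                Poll::Pending
            }
        }
    }

    /// The source became ready: store the value and wake the sleeping task.
    pub fn put(&mut self, value: T) {
        self.value = Some(value);
        if let Some(w) = self.waiting.take() {
            w.wake();
        }
    }
}

/// Test helper: a waker that counts how often it was woken.
pub struct CountingWake(pub AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```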
Only the latest Waker passed to poll is stored (at least for most futures in the wild). If you want multiple tasks to be woken from sleep by the latest passed waker, you have to build your own wrapper waker that will wake all relevant sleeping tasks.
To make your own waker you can use the unsafe RawWaker API, but that requires you to specify low-level behaviour that any asynchronous runtime already provides. Instead, create a simple struct that contains the wakers you need and implement the Wake trait for that struct.
For example:
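use std::sync::Arc;
use std::task::{Wake, Waker};

struct SleepWaker {
    wakers: Vec<Waker>,
}

impl Wake for SleepWaker {
    fn wake(self: Arc<Self>) {
        // Wake every task that is waiting on this combinator.
        self.wakers.iter().for_each(Waker::wake_by_ref);
    }
}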
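A usage sketch for the wrapper above; the block repeats SleepWaker so that it compiles on its own. The combine helper and the Counter waker are hypothetical: combine turns several wakers into one Waker via Waker::from(Arc<SleepWaker>), and waking the result (by reference or by value) wakes every wrapped waker.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// One waker that wakes several tasks.
pub struct SleepWaker {
    pub wakers: Vec<Waker>,
}

impl Wake for SleepWaker {
    fn wake(self: Arc<Self>) {
        self.wakers.iter().for_each(Waker::wake_by_ref);
    }
}

/// Hypothetical helper: combine wakers into a single `Waker`
/// that can be passed on to an inner `poll`.
pub fn combine(wakers: Vec<Waker>) -> Waker {
    Waker::from(Arc::new(SleepWaker { wakers }))
}

/// Test helper: counts wake-ups.
pub struct Counter(pub AtomicUsize);

impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```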
After writing all your helper methods (that operate on some shared internal state), the only task that remains is to implement the poll_next method for the Stream trait. The body of this implementation can then make use of the helper functions on the “state” data type.
As you can see in clone-stream, the poll_next method of the output CloneStreams just calls methods defined previously on the internal shared object (the Fork):
impl<BaseStream> Stream for CloneStream<BaseStream>
where
    BaseStream: Stream<Item: Clone>,
{
    type Item = BaseStream::Item;

    fn poll_next(self: Pin<&mut Self>, current_task: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let waker = current_task.waker();
        // The `Fork` is shared by all clones, behind a lock.
        let mut fork = self.fork.write().unwrap();
        fork.poll_clone(self.id, waker)
    }
}
Testing your async combinators
Do not spend time on mocking an asynchronous runtime in your tests, because you will end up building your own buggy runtime which needs its own test suite.
If you do not care much about the ordering of events, you can just use the ThreadPool from futures to create a barebones async runtime on which you can spawn async tasks. In this way you can already test a lot of invariants of your homegrown async combinator.
If you are testing the ordering of events, you should use a runtime with a notion of time (the most common one is Tokio). Then you can test your code with time-outs using the select! macro. Do not forget to account for situations in which parts of the system are dropped or futures are cancelled. See the tests of clone-stream.
In case you are worried about the size of an additional async runtime dependency, you can add it as a dev-dependency only, to keep your core library code light and easy to distribute.