Building stream combinators

How to add functionality to asynchronous Rust by building your own stream combinators.

Previous post: Role of coroutines.

Introduction

What if you know how to use streams, but you need some kind of functionality that is not in the StreamExt trait from futures or the standard library?

In that case, you might try an imperative approach and create the stream using the unfold function. This is a solution for simple cases, but it does not generalise well. Disambiguating between different parts of intermediate stream state quickly becomes difficult.

You might also want to try using imperative design patterns like loops, channels, spawns and generic functions. This approach quickly becomes unmaintainable because of its complexity.

Invalidation through moves

Remark: the following sections will be about Unpin. I wrote about it because I needed it later on to understand better how to construct combinators. You can skip this chapter if you want (or look at the official documentation).

The physical location of variables used in your code may move throughout the lifetime of your program. In Rust, for example, it is common to move a variable through an assignment. A variable that is called by value by a function is moved (literally and conceptually) into the the function. The function takes conceptual ownership. As far as I know, this is called the move semantics of Rust.

However, through this conceptual/semantical moving process, a physical move of the data in the registers of the CPU or other parts of memory may also occur. This is dangerous when the data being moved is self-referencing.

For more information, see std::pin.

Clearing up Unpin

The confusing aspect of Pin and Unpin in Rust, is that it is not Pin which is the first that you should understand, but it is Unpin.

The naming of Unpin makes it seem like it is some counterpart to Pin. However, it is not.

Unpin is an auto-trait which means that the compiler derives Unpin for everything that it deems safe to move. This is done at compile-time and behind the scene by the compiler for every “auto-trait”.

Cannot be moved / !Unpin

Anything that looks like it cannot be moved by the compiler, will be marked automatically as !Unpin, not Unpin, or un-moveable. The reasoning by the compiler is that it necessary to detect when some kind of data type would be invalidated by a move and prevent dangerous actions by users (programmers implementing async functionality).

Examples of !Unpin Rust data-types:

Important: The reason that async {} blocks are !Unpin is that the compiler is lazy and does not analyze such blocks automatically to check whether they are Unpin.

Purpose of data-type Pin

Pin as a data-type has only one requirement: it’s inner type should implement Deref. In other words, Pin is a wrapper around pointer / reference-like datatypes.

The concrete type Pin is, in fact, not a real physical type. It does not represent a different location or address in memory. It is merely a Rust compiler construct that manifests itself as a type available to the users.

The Pin type is essentially a contract for pointer-like types, maintained by two methods that require the pointed-to (also called “pointee”) to implement Unpin:

The reason that the get_mut method requires the pointee to be Unpin is that mutable access through a mutable reference can be used to move the content of the Pin with a function like std::mem::replace. This could invalidate any pinned otherwise unmoveable !Unpin type.

MetaphorType stateOwnership event
undressedTypemoveable / free
dress-upPin::new(Type)give up ownership
dressed-upPin<Type>stuck in memory
dress-downPin::new(Type).get_mut()acquire edit access

Conclusion:

Pin does not pin anything physically at run-time, but relies on the auto-trait !Unpin to prevent dangerous moves at run-time.

Convert !Unpin into Unpin

Almost all primitives in Rust are Unpin. Whenever you encounter something that is not Unpin (and has not non-'static references), you can just allocate it on the heap with the function Box::pin. The Box::pin is shorthand for Pin::new(Box::new()).

The result of the Box::pin is a combination:

Let’s take the following as an example of something that is !Unpin.

struct MapFut<Fut> where Fut: Future {
    fut: Fut,
    ...
}

A common approach when handling extensions or combinators of futures which are themselves futures, is by projecting Pin<&mut Self>into &mut Self. The projection is usually called this (the self keyword is reserved).

impl<Fut> Future for MapFut<Fut> {
    fn poll(self: Pin<&mut Self>) -> Poll {
        // We need to do something with `self.fut`.
        // We need `Pin<&mut Self> -> &mut Self`.
        // The following wil not compile, because `Self: !Unpin`.
        let mut this = self.get_mut();
        unimplemented!()
    }
}

In this example the user struct MapFut is !Unpin, so get_mut cannot be used. The get_mut function requires Self: Unpin as mentioned in the previous section.

The simplest solution is to refine the definition of MapFut as follows:

struct MapFut<Fut> where Fut: Future {
    fut: Pin<Box<Fut>>,
    ...
}

Now fut is a pinned pointer to a location on the heap.

Unsafe Pin methods (optional)

Until now I explicitly avoided mentioning or using the unsafe methods of the Pin type. Beside Pin::new() and Pin::get_mut(), Pin also has a few unsafe counterparts.

You might have been tempted by the Rust compiler to use the unsafe methods because they do not have an Unpin constraint. But if you go in that direction, you effectively disable automatic and important checks by the compiler at run-time.

The Ready future

Anything can be turned into a future using the Ready future. This future takes the source and puts it inside an Option.

pub struct Ready<T>(Option<T>);

This future never has to wake up, because it yields a value immediately and is read on the first poll call. This is why the context (and its waker) are ignored.

impl<T> Future for Ready<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
        Poll::Ready(self.0.take().expect("Ready polled after completion"))
    }
}

To use this future, you can directly call the constructor Ready::new() or you can call ready():

assert_eq!(1, ready(1).await)`;

Remark: .await automatically converts everything in a future with IntoFuture, so it might not even be necessary to use Ready explicitly.

Simple stream combinators

Definition of a stream

A stream is an future that may be polled more than once and yield more than one Poll::Ready value.

The official definition is a trait:

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}

Notice the similary with the Future trait.

Important: As of April 2025, the Stream trait is not yet in stable Rust.

Futures as streams with Once

Every future Fut can be converted into a stream by returning Some(Fut::Output):

pub struct Once<Fut> {
    future: Option<Fut>
}

impl<Fut: Future> Stream for Once<Fut> {
  type Item = Fut::Output;

  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
      let mut this = self.project();
      let v = match this.future.as_mut().as_pin_mut() {
          Some(fut) => ready!(fut.poll(cx)),
          None => return Poll::Ready(None),
      };
      this.future.set(None);
      Poll::Ready(Some(v))
  }
}

Here, ready! converts Poll into Option.

Functional building block Map

Let’s look at a simple example from the “semi-standard” library crate futures: the map combinator. This combinator just maps the items of an input stream and returns an new output stream (while consuming the input stream.)

The combinator’s source code looks like this:

pub struct Map<St, F> {
    #[pin]
    stream: St,
    f: F,
}

impl<St, F> Stream for Map<St, F>
where
    St: Stream,
    F: FnMut<St::Item>,
{
    type Item = F::Output;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { ... }
}

Ignore the field attribute pin for now.

The definition of map and all the behaviour that we expect from a map function essentially boils down to storing two things:

Then the authors of futures had to implement the single required method of the Stream trait: poll_next:

impl<St, F> Stream for Map<St, F>
where
    St: Stream,
    F: FnMut<St::Item>,
{
    type Item = F::Output;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let res = ready!(this.stream.as_mut().poll_next(cx));
        Poll::Ready(res.map(|x| this.f.call_mut(x)))
    }
}

At the beginning of the poll_next function, only a Pin<&mut Self> is given. It has to be “undressed” to a mutable reference to the state object Map. This happens through a process called projection, which is essentially just calling get_mut on Pin.

Projecting with pin_project (optional)

Almost any library I encountered uses the pin_project crate which provides the #[pin] field attribute that I asked you to ignore. The pin_project crate takes care of two things:

Remark: Internally (if I read the code correctly), pin_project makes use of several unsafe function calls of the Pin type. This is not that strange, considering that most of the standard library is written in unsafe. On the other hand, using Box::pin will not have a significant performance impact and it does not require a macro from an external crate.

Stream combinators

I will now focus on more complicated stream combinators. These are functions that take one or more input streams and output an output stream. I will call such higher-order functions stream combinators from now on.

Remark: The futures-rx crate is a good crate to look for if you want to save time and just use combinators made by someone else.

Flattening nested streams

The futures crate has several types of flatten functions for nested streams. Nested streams are Rust data types S with a trait bound S: Stream<Item: Stream>.

You could separate flatten combinators in two categories: sequential or concurrent.

Sequential flatten combinators will never yield values from multiple inner streams at the same time.

Concurrent flatten combinators will combine values from multiple inner streams:

Keep in mind that you can also use the select_all function from futures::stream to flatten in the situation where your outer stream is not a real stream but a finite iterable of streams.

A stream of Arcs

Sometimes you might want to use the items yielded by a single stream in several places simultaneously. You could wrap all the items yielded by the input stream inside a shared reference Arc. The disadvantage of this approach is that you need (a limited form of) garbage cleaning by reference counting.

The futures-rx crate exposes a share method that can be used for splitting an input stream.

Looking at the docs, there is an example:

use futures::{stream::{StreamExt, self}, future::join};
use futures_rx::{Notification, RxExt};

let stream = stream::iter(0..=3);
let stream = stream.share();

let sub_stream_a = stream.clone().map(|event| *event); 
let sub_stream_b = stream.clone().map(|event| *event); 

assert_eq!((vec![0, 1, 2, 3], vec![0, 1, 2, 3]), join(sub_stream_a.collect::<Vec<_>>(), sub_stream_b.collect::<Vec<_>>()).await);

The events yielded by the shared stream seem to be wrappers around Arc<usize>. Since usize is Copy, *event is treated as &usize and the referenced usize automically cloned.

Remark: Maybe I am wrong?

A stream of clones

The share operator on streams from futures-rx seems to put all values on the input stream inside a reference-counted Arc.

I wanted to derive several output streams from the input stream that yielded clones directly without reference counting.

Remark: Maybe I could just have used share and then cloned every element?

I decided to implement a separate stream cloning trait in a crate clone-stream. The high-level API of this crate is quite simple. It is just a call to fork and then stream.clone():

use clone_stream::ForkStream;
use futures::{FutureExt, StreamExt, stream};
let non_clone_stream = stream::iter(0..10);
let clone_stream = non_clone_stream.fork();
let mut cloned_stream = clone_stream.clone();

Remark: There exist a few other crates that are similar.

Stucture of the clone-stream crate

I called the shared state of all the clones of an input stream Fork. The output streams that can be cloned are called CloneStream. The input stream is called BaseStream.

The fork is a combination of a queue of unprocessed items and a map of the state machines of the CloneStreams:

struct Fork<BaseStream>
    where  BaseStream: Stream<Item: Clone> 
{
    base_stream: Pin<Box<BaseStream>>,
    queue: BTreeMap<usize, Option<BaseStream::Item>>,
    clones: BTreeMap<usize, CloneState>,
    next_clone_index: usize,
    next_queue_index: usize,
}

Every CloneStream registers at the shared Fork and receives a unique identifier.

fn register(&mut self) -> usize {
    let min_available = self.next_clone_index;
    self.clones.insert(min_available, CloneState::default());
    self.next_clone_index += 1;
    min_available
}

Then Fork creates an entry for the CloneStream in a map containing the states of all CloneStream. The default entry is just NeverPolled.

As soon as the CloneStream is polled, the poll call is forwarded to the Fork together with the waker provided by the async runtime. The Fork will look at the current state of the state machine associated to CloneStream by using it’s idea. Depending on the state it will:

After each step, the state machine of the CloneStream is advanced by the Fork in its map.

General approach building combinators

Usually, a combinator has some kind of internal state. You can choose between:

If you decide on the last approach, you can use the “state” data type as a handle for useful helper methods.

Do not forget to give the helper methods (where appropriate) a Waker argument. This argument can be used by the helper methods to store the waker in case some source is unavailable and wake up the relevant sleeping tasks in case the source becomes ready in future.

Only the latest Waker passed to poll is stored (at least for most futures in the wild). If you want multiple tasks to be woken from sleep by the latest passed waker, you have to build your own wrapper waker that will wake all relevant sleeping tasks.

To make your own waker you can use the unsafe RawWaker, but that requires you to specify low-level behaviour that is already provided by any asynchronous runtime. Instead, create a simple struct definition that contains the wakers you need and implement the Wake trait for that struct.

For example:

struct SleepWaker {
    wakers: Vec<Waker>,
}

impl Wake for SleepWaker {
    fn wake(self: Arc<Self>) {
       self.wakers.iter().for_each(Waker::wake_by_ref);
    }
}

After writing all your helper methods (that operate on some shared internal state), the only task that remains is to implement the poll_next method for the Stream trait. The body of this implementation can then make use of the helper functions on the “state” data type.

As you can see in clone-stream, the poll_next-method of the output CloneStreams just calls methods defined previously on the internal shared object (the Fork):

impl<BaseStream> Stream for CloneStream<BaseStream>
where
    BaseStream: Stream<Item: Clone>,
{
    type Item = BaseStream::Item;

    fn poll_next(self: Pin<&mut Self>, current_task: &mut Context) -> Poll<Option<Self::Item>> {
        let waker = current_task.waker();
        let mut fork = self.fork.write().unwrap();
        fork.poll_clone(self.id, waker)
    }

Testing your async combinators

Do not spend time on mocking an asynchronous runtime in your tests, because you will end up building your own buggy runtime which needs its own test suite.

If you do not care much about the ordering of events, you can just use the ThreadPool from futures to create a barebones async runtime on which you can spawn async tasks. In this way you can already test a lot of invariants of your homegrown async combinator.

If you are testing the ordering of events, you should use a run-time with a notion of time (the most common one is Tokio). Then you can test your code with time-outs using the select!-macro. Do not forget to account for situations in which parts of the system are dropped or futures cancelled. See the tests of clone-stream.

In case you are worried about the size of an additional async runtime dependency, you can add the dependency as a “dev-dependency”-only to keep your core library code light and easy to distribute.