Building stream combinators
Wednesday, 2025-04-16
How to add functionality to asynchronous Rust by building your own stream combinators.
Previous post: Role of coroutines.
Introduction
What if you know how to use streams, but you need some kind of functionality that is not in the StreamExt trait from futures or the standard library?
In that case, you might try an imperative approach and create the stream using the unfold function. This is a solution for simple cases, but it does not generalise well. Disambiguating between different parts of intermediate stream state quickly becomes difficult.
You might also want to try using imperative design patterns like loops, channels, spawns and generic functions. This approach quickly becomes unmaintainable because of its complexity.
Invalidation through moves
Remark: the following sections will be about Unpin. I wrote about it because I needed it later on to understand better how to construct combinators. You can skip this chapter if you want (or look at the official documentation).
The physical location of variables used in your code may change throughout the lifetime of your program. In Rust, for example, it is common to move a variable through an assignment. A variable that is passed by value to a function is moved (literally and conceptually) into the function. The function takes conceptual ownership. As far as I know, this is called the move semantics of Rust.
However, through this conceptual/semantic moving process, a physical move of the data between CPU registers or other parts of memory may also occur. This is dangerous when the data being moved is self-referencing: the internal pointer keeps pointing at the old location.
For more information, see std::pin.
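To make the danger concrete, here is a minimal sketch using only the standard library. The struct `SelfRef` and the function `move_breaks_self_reference` are made up for illustration. The stale pointer is only compared, never dereferenced, so the sketch itself stays safe.

```rust
use std::ptr;

/// A deliberately naive self-referencing struct (hypothetical example):
/// `ptr_to_value` is meant to point at the `value` field of the same struct.
struct SelfRef {
    value: u32,
    ptr_to_value: *const u32,
}

impl SelfRef {
    /// True while `ptr_to_value` still holds the address of `value`.
    /// The pointer is only compared, never dereferenced.
    fn is_consistent(&self) -> bool {
        ptr::eq(self.ptr_to_value, &self.value)
    }
}

/// Builds the struct on the stack, then moves it to the heap.
/// Returns the consistency check before and after the move.
fn move_breaks_self_reference() -> (bool, bool) {
    let mut on_stack = SelfRef { value: 7, ptr_to_value: ptr::null() };
    on_stack.ptr_to_value = &on_stack.value;
    let before = on_stack.is_consistent();

    // Moving into a `Box` copies the bytes to a new heap location. The stored
    // pointer is copied verbatim, so it still holds the old stack address.
    let on_heap = Box::new(on_stack);
    let after = on_heap.is_consistent();

    (before, after)
}
```

Calling `move_breaks_self_reference()` reports that the struct is consistent before the move and inconsistent after it. Dereferencing `ptr_to_value` after the move would be undefined behaviour.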
Clearing up Unpin
The confusing aspect of Pin and Unpin in Rust is that the first thing you should understand is not Pin, but Unpin.
The naming of Unpin makes it seem like it is some counterpart to Pin. However, it is not.
Unpin is an auto-trait, which means that the compiler implements Unpin automatically for every type whose fields are all Unpin, in other words for everything that it deems safe to move. This is done at compile-time and behind the scenes by the compiler, as for every “auto-trait”.
Cannot be moved / !Unpin
Anything that looks to the compiler like it cannot be moved is automatically marked as !Unpin (not Unpin, or un-moveable). The reasoning is that it is necessary to detect when a data type would be invalidated by a move, and to prevent dangerous actions by users (programmers implementing async functionality).
Examples of !Unpin Rust data-types:
- generators:
  - normal generators written with gen-blocks or macros
  - async generators written with the macro async_stream::stream! {} (implementing the Stream trait)
- self-referencing data that is code-generated by the compiler:
  - state machines generated by async {} blocks
  - … ?
- self-referencing user data structures (the compiler cannot detect the self-reference in user code, so these have to opt out manually, see the last item):
  - naive trees
  - strings with slices
- types with a manual PhantomPinned field
Important: async {} blocks are always !Unpin because the compiler is conservative. The generated state machine may hold references to its own local variables across await points, and the compiler does not analyze each block to check whether it actually does.
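The auto-trait behaviour can be observed at compile time. The following sketch assumes a hypothetical helper `assert_unpin` and two made-up structs, `Plain` and `Pinned`. It does nothing at run time; the point is which lines the compiler accepts.

```rust
use std::marker::PhantomPinned;

/// Compiles only when `T: Unpin`; it does nothing at run time.
fn assert_unpin<T: Unpin>() {}

/// An ordinary struct: every field is `Unpin`, so the struct is `Unpin` too.
struct Plain {
    id: u32,
    name: String,
}

/// Opting out: a `PhantomPinned` field makes the whole struct `!Unpin`.
struct Pinned {
    id: u32,
    _marker: PhantomPinned,
}

fn check_auto_trait() {
    assert_unpin::<u32>();
    assert_unpin::<String>();
    assert_unpin::<Plain>();
    assert_unpin::<Vec<Plain>>();
    // The next line is rejected by the compiler, because `Pinned` is `!Unpin`:
    // assert_unpin::<Pinned>();
}
```

Uncommenting the last call turns the auto-trait decision into a compile error, which is the quickest way to find out whether one of your own types is Unpin.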
Purpose of data-type Pin
Pin as a data-type has only one requirement: its inner type should implement Deref. In other words, Pin is a wrapper around pointer / reference-like datatypes.
The concrete type Pin does not change anything physically. It is a transparent wrapper with the same memory layout as the pointer it wraps, and it does not represent a different location or address in memory. It is merely a compile-time construct that manifests itself as a type available to the users.
The Pin type is essentially a contract for pointer-like types, maintained by two safe methods that require the pointed-to value (also called the “pointee”) to implement Unpin:
- A constructor Pin::new(): takes a pointer (for example &mut T or Box<T>) to an Unpin pointee (safe-to-move, not !Unpin) and wraps it. For Unpin pointees, the pointer can be recovered with Pin::into_inner(). A Drop implementation of the pointee still runs when the pinned value is de-allocated.
- A mutable getter Pin::get_mut(): gives you mutable access to the contained, pinned, Unpin value. This allows you to still call methods with the &mut self signature on the pinned data.
The reason that the get_mut method requires the pointee to be Unpin is that mutable access through a mutable reference can be used to move the content of the Pin with a function like std::mem::replace. This could invalidate any pinned otherwise unmoveable !Unpin type.
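A small sketch of this contract for an Unpin type, using only the standard library (the function name `replace_through_pin` is made up). It shows why `get_mut` has to be restricted: once you hold a plain `&mut`, `std::mem::replace` can move the value out.

```rust
use std::mem;
use std::pin::Pin;

/// Wraps a mutable reference to an `Unpin` value in `Pin`, then uses
/// `get_mut` to swap the value out. Returns (old value, new value).
fn replace_through_pin() -> (String, String) {
    let mut text = String::from("before");

    // `String: Unpin`, so the safe constructor is available.
    let pinned: Pin<&mut String> = Pin::new(&mut text);

    // `get_mut` consumes the `Pin` and hands back the plain `&mut String`.
    let plain: &mut String = pinned.get_mut();

    // With a plain `&mut`, the value can be moved out of its location.
    let old = mem::replace(plain, String::from("after"));

    (old, text)
}
```

For a String this is harmless. For a !Unpin type the same `mem::replace` would relocate a value that promised to stay put, which is why the safe `get_mut` is not offered there.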
| Metaphor | Type state | Ownership event |
|---|---|---|
| undressed | Type | moveable / free |
| dress-up | Pin::new(Type) | give up ownership |
| dressed-up | Pin<Type> | stuck in memory |
| dress-down | Pin::new(Type).get_mut() | acquire edit access |
Conclusion:
Pin does not pin anything physically at run-time, but relies on the auto-trait Unpin (and its absence, !Unpin) to prevent dangerous moves at compile-time.
Convert !Unpin into Unpin
Primitive types in Rust are Unpin, and so are most types of the standard library. Whenever you encounter something that is not Unpin (and that does not contain non-'static references), you can just allocate it on the heap with the function Box::pin. Box::pin(x) behaves like Pin::new(Box::new(x)), except that it also accepts values that are !Unpin.
The result of Box::pin is a combination:
- The heap-allocated pinned object will not be moved by any code generated by the Rust compiler;
- We can still drop the pinned object and the Drop implementations will run.
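As a sketch of what Box::pin buys you, the following uses a made-up `Immovable` type and a hypothetical helper `assert_unpin`. It checks that the pointer `Pin<Box<Immovable>>` is itself Unpin, and that moving the pointer around leaves the heap address of the pointee untouched.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

/// A type that opts out of `Unpin` (hypothetical, for illustration only).
struct Immovable {
    value: u32,
    _marker: PhantomPinned,
}

/// Compiles only when `T: Unpin`.
fn assert_unpin<T: Unpin>(_: &T) {}

/// Pins an `Immovable` on the heap, moves the *pointer* around, and reports
/// whether the heap address of the pointee stayed the same.
fn pointee_stays_put() -> bool {
    let pinned: Pin<Box<Immovable>> = Box::pin(Immovable {
        value: 42,
        _marker: PhantomPinned,
    });

    // The pointer type itself is `Unpin`, even though `Immovable` is not.
    assert_unpin(&pinned);

    let address_before: *const Immovable = &*pinned;

    // Moving the `Pin<Box<_>>` (here: into a `Vec`) only moves the pointer.
    let holder = vec![pinned];
    let address_after: *const Immovable = &*holder[0];

    address_before == address_after && holder[0].value == 42
}
```

The price is one heap allocation per pinned value.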
Let’s take the following as an example of something that is !Unpin (whenever Fut is !Unpin).
struct MapFut<Fut> where Fut: Future {
    fut: Fut,
    ...
}
A common approach when writing extensions or combinators of futures, which are themselves futures, is to project Pin<&mut Self> into &mut Self. The projection is usually called this (the self keyword is reserved).
impl<Fut: Future> Future for MapFut<Fut> {
    // (associated type `Output` omitted for brevity)
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // We need to do something with `self.fut`.
        // We need `Pin<&mut Self> -> &mut Self`.
        // The following will not compile, because `Self: !Unpin`.
        let mut this = self.get_mut();
        unimplemented!()
    }
}
In this example the user struct MapFut is !Unpin, so get_mut cannot be used. The get_mut function requires Self: Unpin as mentioned in the previous section.
The simplest solution is to refine the definition of MapFut as follows:
struct MapFut<Fut> where Fut: Future {
    fut: Pin<Box<Fut>>,
    ...
}
Now fut is a pinned pointer to a location on the heap.
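A complete version of this idea could look like the sketch below. The second type parameter `F`, the field `f`, the constructor `new` and the helper `poll_once` are assumptions added for illustration; the original struct elides its remaining fields. Because `fut` is boxed, `MapFut` is Unpin whenever the closure is, and `get_mut` becomes available.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Maps the output of `fut` with `f` (the `f` field is an assumption).
struct MapFut<Fut, F> {
    fut: Pin<Box<Fut>>,
    f: Option<F>,
}

impl<Fut, F> MapFut<Fut, F> {
    fn new(fut: Fut, f: F) -> Self {
        MapFut { fut: Box::pin(fut), f: Some(f) }
    }
}

impl<Fut, F, T> Future for MapFut<Fut, F>
where
    Fut: Future,
    F: FnOnce(Fut::Output) -> T + Unpin,
{
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        // Allowed because every field is `Unpin`: `Pin<Box<_>>` always is,
        // and `F: Unpin` is required above.
        let this = self.get_mut();
        match this.fut.as_mut().poll(cx) {
            Poll::Ready(value) => {
                let f = this.f.take().expect("MapFut polled after completion");
                Poll::Ready(f(value))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a pinned future exactly once with the no-op waker.
fn poll_once<Fut: Future>(fut: Pin<&mut Fut>) -> Poll<Fut::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.poll(&mut cx)
}
```

For example, `MapFut::new(std::future::ready(20), |x: i32| x + 1)` can be wrapped with `Pin::new(&mut ...)` and passed to `poll_once`, which completes on the first poll.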
Unsafe Pin methods (optional)
Until now I explicitly avoided mentioning or using the unsafe methods of the Pin type. Besides Pin::new() and Pin::get_mut(), Pin also has a few unsafe counterparts (Pin::new_unchecked() and Pin::get_unchecked_mut()).
You might have been tempted by the Rust compiler to use the unsafe methods because they do not have an Unpin constraint. But if you go in that direction, you effectively disable automatic and important checks that the compiler performs at compile-time, and you become responsible for upholding the pinning guarantees yourself.
The Ready future
Anything can be turned into a future using the Ready future. This future takes the source and puts it inside an Option.
pub struct Ready<T>(Option<T>);
This future never has to wake up, because it yields a value immediately and is ready on the first poll call. This is why the context (and its waker) are ignored.
// `Ready` never hands out references into itself, so it can always be moved.
// Without this impl, `self.0.take()` would only compile for `T: Unpin`.
impl<T> Unpin for Ready<T> {}
impl<T> Future for Ready<T> {
    type Output = T;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
        Poll::Ready(self.0.take().expect("Ready polled after completion"))
    }
}
To use this future, you can directly call the constructor Ready::new() or you can call ready():
assert_eq!(1, ready(1).await);
Remark: .await calls IntoFuture::into_future on its operand, so every type that implements IntoFuture can be awaited directly. Plain values such as integers do not implement it, so they still need Ready.
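To try the Ready future without an async runtime, it can be polled by hand. The sketch below repeats the definition so that it is self-contained; the no-op waker `NoopWake` and the helper `poll_ready_once` are made up for this example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub struct Ready<T>(Option<T>);

// `Ready` holds no references into itself, so moving it is always fine.
impl<T> Unpin for Ready<T> {}

impl<T> Future for Ready<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
        Poll::Ready(self.0.take().expect("Ready polled after completion"))
    }
}

/// Free-function constructor, mirroring `ready()` from the text.
pub fn ready<T>(value: T) -> Ready<T> {
    Ready(Some(value))
}

/// Waker that does nothing; `Ready` never asks to be woken anyway.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Wraps `value` in `Ready` and polls it a single time.
fn poll_ready_once<T>(value: T) -> Poll<T> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = ready(value);
    Pin::new(&mut fut).poll(&mut cx)
}
```

A second poll of the same `Ready` would panic with the message from `expect`, which is the usual behaviour for futures polled after completion.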
Simple stream combinators
Definition of a stream
A stream is like a future that may be polled more than once and may yield more than one Poll::Ready value.
The official definition is a trait:
pub trait Stream {
    type Item;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}
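Notice the similarity with the Future trait.
Important: As of April 2025, the standard library has no stable Stream trait (its counterpart AsyncIterator is nightly-only). In practice, the trait comes from the futures crate.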
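A hand-written stream makes the definition tangible. The sketch below declares a local copy of the trait so that it compiles without dependencies; in real code you would import the trait from futures. `Countdown`, `NoopWake` and `drain` are hypothetical names for this example.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local copy of the trait, so the sketch has no external dependencies.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Yields `remaining, remaining - 1, ..., 1` and then ends with `None`.
pub struct Countdown {
    remaining: u32,
}

impl Stream for Countdown {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        let item = self.remaining;
        self.remaining -= 1;
        Poll::Ready(Some(item))
    }
}

/// Waker that does nothing; this stream is always ready.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls an always-ready stream until it ends and collects the items.
/// `Pending` is not handled: the loop simply stops on it.
fn drain<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
        items.push(item);
    }
    items
}
```

Unlike a future, the same value is polled again after it returned Poll::Ready, until it finally returns Poll::Ready(None).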
Futures as streams with Once
Every future Fut can be converted into a stream that yields Some(Fut::Output) once and then ends:
pub struct Once<Fut> {
    #[pin] // needed for `project()`; the attribute is explained further below
    future: Option<Fut>
}
impl<Fut: Future> Stream for Once<Fut> {
    type Item = Fut::Output;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let v = match this.future.as_mut().as_pin_mut() {
            Some(fut) => ready!(fut.poll(cx)),
            None => return Poll::Ready(None),
        };
        this.future.set(None);
        Poll::Ready(Some(v))
    }
}
Here, ready! returns early with Poll::Pending when the inner future is not finished yet; otherwise it unwraps the value inside Poll::Ready.
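The same combinator can be sketched without any projection macro by boxing the future, in line with the Box::pin approach from the Unpin sections. This is a simplified variant and not the code from futures: the local `Stream` trait, the constructor `new` and the helper `collect_ready` are assumptions made for this example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// Local copy of the trait, so the sketch has no external dependencies.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Boxed variant: `Once` is `Unpin` for every `Fut`.
pub struct Once<Fut> {
    future: Option<Pin<Box<Fut>>>,
}

impl<Fut> Once<Fut> {
    pub fn new(future: Fut) -> Self {
        Once { future: Some(Box::pin(future)) }
    }
}

impl<Fut: Future> Stream for Once<Fut> {
    type Item = Fut::Output;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        let value = match this.future.as_mut() {
            // `ready!` returns `Poll::Pending` early if the future is not done.
            Some(fut) => ready!(fut.as_mut().poll(cx)),
            None => return Poll::Ready(None),
        };
        // Drop the finished future so that the next poll ends the stream.
        this.future = None;
        Poll::Ready(Some(value))
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Collects items from a stream that never returns `Pending`.
fn collect_ready<S: Stream>(stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = Box::pin(stream);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = stream.as_mut().poll_next(&mut cx) {
        items.push(item);
    }
    items
}
```

The trade-off is one heap allocation per `Once`, in exchange for not needing `project()`.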
Functional building block Map
Let’s look at a simple example from the “semi-standard” library crate futures: the map combinator. This combinator maps the items of an input stream and returns a new output stream (while consuming the input stream).
The combinator’s source code looks like this:
pub struct Map<St, F> {
    #[pin]
    stream: St,
    f: F,
}
impl<St, F> Stream for Map<St, F>
where
    St: Stream,
    // `FnMut1` is a helper trait inside futures; read it as `FnMut(St::Item) -> Output`.
    F: FnMut1<St::Item>,
{
    type Item = F::Output;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { ... }
}
Ignore the field attribute pin for now.
The definition of map and all the behaviour that we expect from a map function essentially boils down to storing two things:
- The original input stream.
- The function or closure that maps elements of the input stream.
Then the authors of futures had to implement the single required method of the Stream trait: poll_next:
impl<St, F> Stream for Map<St, F>
where
    St: Stream,
    F: FnMut1<St::Item>, // helper trait inside futures, similar to `FnMut(St::Item) -> Output`
{
    type Item = F::Output;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let res = ready!(this.stream.as_mut().poll_next(cx));
        Poll::Ready(res.map(|x| this.f.call_mut(x)))
    }
}
At the beginning of the poll_next function, only a Pin<&mut Self> is given. It has to be “undressed” to get at the fields of the state object Map. This happens through a process called projection: project() hands out a Pin<&mut St> for the field marked with #[pin] and a plain &mut F for the other field. For Unpin types, this is essentially just calling get_mut on Pin.
Projecting with pin_project (optional)
Almost any library I encountered uses the pin_project crate which provides the #[pin] field attribute that I asked you to ignore. The pin_project crate takes care of two things:
- You don’t have to call Box::pin yourself or pin_mut! inside the constructor function of your aggregated stream.
- You get a project function which returns the fields marked with #[pin] as Pin<&mut Field>, so that you can call as_mut() on them and poll them (for Unpin types, get_mut() would achieve the same).
Remark: Internally (if I read the code correctly), pin_project makes use of several unsafe function calls of the Pin type. This is not that strange, considering that the standard library itself relies on unsafe internally. On the other hand, using Box::pin will usually not have a significant performance impact (it costs one heap allocation) and it does not require a macro from an external crate.
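To get a feeling for what such a macro generates, here is a hand-written projection for a simplified Map. This is a sketch under stated assumptions, not the code that pin_project emits: the local `Stream` trait, the `Iter` input stream and the helper `collect_ready` are made up, and the bound uses a plain `FnMut` instead of the helper trait from futures. The safety comment lists the conditions this sketch relies on.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// Local copy of the trait, so the sketch has no external dependencies.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

pub struct Map<St, F> {
    stream: St,
    f: F,
}

impl<St, F> Map<St, F> {
    /// Hand-written projection: pinned access to `stream`, plain access to `f`.
    fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut St>, &'a mut F) {
        // SAFETY (assumptions of this sketch): `stream` is never moved out of
        // a pinned `Map`, `Map` has no `Drop` impl, is not `repr(packed)`,
        // and is only `Unpin` when `St` is (the auto-trait guarantees that).
        unsafe {
            let this = self.get_unchecked_mut();
            (Pin::new_unchecked(&mut this.stream), &mut this.f)
        }
    }
}

impl<St, F, T> Stream for Map<St, F>
where
    St: Stream,
    F: FnMut(St::Item) -> T,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let (stream, f) = self.project();
        let res = ready!(stream.poll_next(cx));
        // `&mut F` is itself callable, so it can be handed to `Option::map`.
        Poll::Ready(res.map(f))
    }
}

/// Always-ready stream backed by an iterator (input for the demo).
pub struct Iter<I>(I);

impl<I: Iterator + Unpin> Stream for Iter<I> {
    type Item = I::Item;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.0.next())
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Collects items from a stream that never returns `Pending`.
fn collect_ready<S: Stream>(stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = Box::pin(stream);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = stream.as_mut().poll_next(&mut cx) {
        items.push(item);
    }
    items
}
```

Getting the safety conditions right by hand is easy to miss in a larger struct, which is a good reason to prefer the macro or the Box::pin approach in real code.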
Stream combinators
I will now focus on more complicated stream combinators. These are functions that take one or more input streams and output an output stream. I will call such higher-order functions stream combinators from now on.
Remark: The futures-rx crate is a good crate to look at if you want to save time and just use combinators made by someone else.
Flattening nested streams
The futures crate has several types of flatten functions for nested streams. Nested streams are Rust data types S with a trait bound S: Stream<Item: Stream>.
You could separate flatten combinators into two categories: sequential or concurrent.
Sequential flatten combinators will never yield values from multiple inner streams at the same time:
- flatten: flattens the outer stream by pasting the output from the inner streams consecutively. This is usually not what you want, since most streams are infinite and you don’t want to block the outer stream.
- A “forgetful” flatten (also called switch in RxJs): a kind of flatten of nested streams that only polls the most recent stream that arrived on the outer stream. This is implemented by switch_map in futures-rx.
Concurrent flatten combinators will combine values from multiple inner streams:
- Concurrent flatten_unordered(None): flattens by merging as many inner streams as possible, as they arrive on the outer stream. This might seem like a useful function, but it is often not what you want.
- Buffered concurrent flatten_unordered(Some(N)): flattens up to N different inner streams concurrently.
Keep in mind that you can also use the select_all function from futures::stream to flatten in the situation where your outer stream is not a real stream but a finite iterable of streams.
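The sequential flatten is the simplest of these to write by hand. The sketch below is a simplified, boxed variant and not the implementation from futures: the local `Stream` trait, `Iter`, the constructor function `flatten` and the helper `collect_ready` are assumptions for this example. It drains the current inner stream completely before asking the outer stream for the next one.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// Local copy of the trait, so the sketch has no external dependencies.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Sequential flatten: at most one inner stream is alive at any time.
pub struct Flatten<St: Stream> {
    outer: Pin<Box<St>>,
    inner: Option<Pin<Box<St::Item>>>,
}

pub fn flatten<St>(outer: St) -> Flatten<St>
where
    St: Stream,
    St::Item: Stream,
{
    Flatten { outer: Box::pin(outer), inner: None }
}

impl<St> Stream for Flatten<St>
where
    St: Stream,
    St::Item: Stream,
{
    type Item = <St::Item as Stream>::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            if let Some(inner) = this.inner.as_mut() {
                match ready!(inner.as_mut().poll_next(cx)) {
                    Some(item) => return Poll::Ready(Some(item)),
                    // Inner stream exhausted: go back to the outer stream.
                    None => this.inner = None,
                }
            } else {
                match ready!(this.outer.as_mut().poll_next(cx)) {
                    Some(next_inner) => this.inner = Some(Box::pin(next_inner)),
                    None => return Poll::Ready(None),
                }
            }
        }
    }
}

/// Always-ready stream backed by an iterator (input for the demo).
pub struct Iter<I>(I);

impl<I: Iterator + Unpin> Stream for Iter<I> {
    type Item = I::Item;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.0.next())
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Collects items from a stream that never returns `Pending`.
fn collect_ready<S: Stream>(stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = Box::pin(stream);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = stream.as_mut().poll_next(&mut cx) {
        items.push(item);
    }
    items
}
```

The loop also shows the blocking problem mentioned above: while an inner stream keeps returning Pending or never ends, the outer stream is not polled at all.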
A stream of Arcs
Sometimes you might want to use the items yielded by a single stream in several places simultaneously. You could wrap all the items yielded by the input stream inside a shared reference-counted pointer Arc. The disadvantage of this approach is that you need (a limited form of) garbage collection by reference counting.
The futures-rx crate exposes a share method that can be used for splitting an input stream.
Looking at the docs, there is an example:
use futures::{stream::{StreamExt, self}, future::join};
use futures_rx::{Notification, RxExt};
let stream = stream::iter(0..=3);
let stream = stream.share();
let sub_stream_a = stream.clone().map(|event| *event);
let sub_stream_b = stream.clone().map(|event| *event);
assert_eq!((vec![0, 1, 2, 3], vec![0, 1, 2, 3]), join(sub_stream_a.collect::<Vec<_>>(), sub_stream_b.collect::<Vec<_>>()).await);
The events yielded by the shared stream seem to be wrappers around Arc<usize>. Dereferencing with *event reaches the usize inside, and since usize is Copy, the value is copied out automatically.
Remark: Maybe I am wrong?
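The dereferencing step can be checked in isolation with a plain Arc<usize> from the standard library. This sketch assumes that the events from futures-rx behave like an Arc in this respect, which I have not verified; the function name `copy_out_of_arc` is made up.

```rust
use std::sync::Arc;

/// Copies the `usize` out of a shared `Arc`.
/// Returns the copied value and the reference count afterwards.
fn copy_out_of_arc() -> (usize, usize) {
    let event: Arc<usize> = Arc::new(3);
    let second_handle = Arc::clone(&event); // reference count is now 2

    // `*event` goes through `Deref` to the `usize` inside the `Arc`.
    // Because `usize: Copy`, the value is copied out; the `Arc` stays intact.
    let value: usize = *event;

    drop(second_handle); // reference count is back to 1
    (value, Arc::strong_count(&event))
}
```

For an item type that is not Copy, `*event` would not compile in this position and an explicit `(*event).clone()` would be needed.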
A stream of clones
The share operator on streams from futures-rx seems to put all values on the input stream inside a reference-counted Arc.
I wanted to derive several output streams from the input stream that yielded clones directly without reference counting.
Remark: Maybe I could just have used share and then cloned every element?
I decided to implement a separate stream cloning trait in a crate clone-stream. The high-level API of this crate is quite simple. It is just a call to fork and then stream.clone():
use clone_stream::ForkStream;
use futures::{FutureExt, StreamExt, stream};
let non_clone_stream = stream::iter(0..10);
let clone_stream = non_clone_stream.fork();
let mut cloned_stream = clone_stream.clone();
Remark: There exist a few other crates that are similar.
For more information about the clone-stream crate, see my presentation at EuroRust 2025.