Functional async

How to start with the basics of functional asynchronous programming in Rust with streams and sinks.

Introduction

Let’s get started with some definitions you may have heard about already.

Declarative programming is when you write code and make sure that the inputs and outputs of sub-modules behave predictably according to fixed invariants. Logic or constraint solver languages like Prolog or ManyWorlds (written by my friend Jo Devriendt) are part of this family.

Functional programming is building the majority of your code from well-defined and side-effect-free functions. It is a subset of declarative programming. Languages like ML, Haskell, OCaml and Lisp are part of this family. They can all be seen as extensions of the lambda calculus.

Asynchronous programming is a kind of programming where you not only have functions that evaluate immediately, but also functions that may evaluate in the future. Such functions are called asynchronous functions. JavaScript, C# and Rust are examples of languages that support them.

In the following I will show how declarative, functional and asynchronous programming can be combined in the Rust programming language with the semi-standard library crate futures.

Use-cases

Normal imperative asynchronous programming contains a combination of the async, await keywords with imperative keywords like while, loop or continue.

To illustrate where functional asynchronous programming would be useful, I will give a few examples.

Channels

One approach that you might take when dealing with different sub-modules in a large project is using channels to communicate between the different modules.

This could look like this:

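use tokio::{spawn, sync::{broadcast, watch}};

// Keep the unused halves alive; dropping them would close the channels.
let (_in_sender, mut in_receiver) = watch::channel(1);
let (out_sender, _out_receiver) = broadcast::channel(123);
let forward_task = spawn(async move {
    loop {
        // Wait for the next change; stop once the sender is gone.
        let result = in_receiver.changed().await;
        if result.is_err() { break; }
        let input = *in_receiver.borrow();
        let output = get_output(input);
        // Sending only fails when there are no receivers; ignore that here.
        let _ = out_sender.send(output);
    }
});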

In this example I use “watch” and “broadcast” channels from the Tokio crate.

If we would translate this to a functional asynchronous version, we get:

let in_receiver = WatchStream::new(in_receiver);
let out_sender = BroadcastSink::new(out_sender);

let forward_task = spawn(
    in_receiver
    .map(Result::ok)
    .filter_map(ready)
    .map(get_output)
    .map(Ok)
    .forward(out_sender)
);

So what happened during the translation?

By shifting the focus from input, intermediate and output variables to transformations with map we replaced N imperative variables in the loop by N-1 functional closures passed to map. You could still argue that the variables did not really disappear, but they are now hidden in the closures and the code is more readable.

Remark: JavaScript frameworks for building web-apps usually call a sink a “signal” or “writable observable”. The gstreamer-rs crate also has “sinks”, but they are not directly related to the “sinks” in futures.

Reactive UI input

Another place where functional asynchronous programming is useful is the frontend.

An imperative version might look like this:

let mut target_temperature = 21.0;
slider.on_slide(move |new_t| {
    if acceptable(new_t) {
        let new_target = some_op(new_t);
        target_temperature = new_target;
    }
});

Although this particular example is small, writing large parts of a codebase in this style can introduce problems.

The imperative version can be translated to a functional version like this:

let mut target_temperature = MySink::new(21.0);
slider.value
    .filter(acceptable)
    .map(some_op)
    .forward(target_temperature);

The MySink::new(21.0) is a call to the constructor of MySink, an imaginary object that implements the Sink trait.

Clear benefits

Instead of exposing variable names for input, intermediate and output variables, we omit them and focus on naming the transformations themselves. This way of dealing with computation is closer to how we communicate in natural language using verbs.

Another benefit of the functional approach is that it does not rely on a concrete type. If you are stuck in the middle of writing a module that provides a Stream but you also have to write something that consumes it, you can just continue with the latter.

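use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;

struct MyStream {
    ...
}

impl MyStream {
    fn new() -> Self {
        unimplemented!()
    }
}

impl Stream for MyStream {
    type Item = i32;
    // poll_next takes a pinned `self` and a task context.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        unimplemented!()
    }
}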

You could then already start with another module that consumes MyStream. But instead of directly depending on MyStream, you can just depend on the Stream trait. This way, you can write your code without having to know the implementation details of MyStream.

struct MyConsumer {
    ...
}

impl MyConsumer {
    fn consume(stream: impl Stream<Item = i32>) {
        // We can start already!
        ...
    }
}

The important part is impl Stream<Item = i32>. This means that the consume function can take any type that implements the Stream trait and produces i32 items.

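We have separated the problem into two levels of abstraction that can be dealt with independently and simultaneously: the producer that implements Stream (here MyStream) and the consumer that only relies on the Stream trait.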

As of April 2025, the traits Stream and Sink are widely used by crates published on crates.io. If you make use of these traits, which describe a common behavior, and implement them for your own types, you make your code more interoperable with the rest of the world.

Important: When using streams in Rust, you do not need to know everything about Pin or Poll. You can just use the high-level methods provided by the standard library and the futures crate.

Streams

Relationship with iterators

The main thing that is added in functional asynchronous programming is the Stream trait. You are supposed to use it everywhere. There are other things of course, but this is the main concept that you will need.

So what is a stream? It is just something that implements the Stream trait from the futures crate. It is nothing more than an asynchronous iterator.

A rough conceptual definition of a stream would be:

A function that returns multiple values at unpredictable times.

First, remember that the life cycle of an iterator (a normal synchronous blocking one) looks like this:

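| T | create   | iterate | yield   |
|---|----------|---------|---------|
| 1 | (1..=10) |         |         |
| 2 |          | next()  |         |
| 3 |          |         | Some(1) |
| 4 |          | next()  |         |
| 5 |          |         |         |
| 6 |          | next()  |         |
| 7 |          |         | None    |
| 8 |          | next()  |         |
| 9 |          |         | Some(2) |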

Notice that calling next after the iterator yielded None may result in a new Some. If you do not want that, apply fuse to the iterator to obtain a FusedIterator that will keep yielding None after the first None.
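The effect of fuse can be sketched with a small custom iterator. Flicker and first_calls are hypothetical names made up for this illustration; only Iterator::fuse comes from the standard library.

```rust
/// Hypothetical iterator that yields `None` on every second call,
/// so it keeps producing values after a `None`.
struct Flicker {
    calls: u32,
}

impl Iterator for Flicker {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        self.calls += 1;
        if self.calls % 2 == 0 {
            None
        } else {
            Some(self.calls)
        }
    }
}

/// Records the results of the first `n` calls to `next`.
fn first_calls(mut iter: impl Iterator<Item = u32>, n: usize) -> Vec<Option<u32>> {
    (0..n).map(|_| iter.next()).collect()
}
```

Without fuse, the first four calls give Some(1), None, Some(3), None. With Flicker { calls: 0 }.fuse(), everything after the first None stays None.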

The life cycle of a stream/async iterator during usage looks like this:

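| T  | Creation  | Iteration | Yielded |
|----|-----------|-----------|---------|
| 1  | St::new() |           |         |
| 2  |           | next()    |         |
| 3  |           | await     |         |
| 4  |           |           | Some(1) |
| 5  |           | next()    |         |
| 6  |           |           |         |
| 7  |           | await     |         |
| 8  |           |           | Some(2) |
| 9  |           | next()    |         |
| 10 |           | await     |         |
| 11 |           |           | None    |
| 12 |           | next()    |         |
| 13 |           | await     |         |
| 14 |           |           | Some(3) |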

The life cycle of an async iterator (stream) is longer than that of a normal iterator, since it requires an await before a value is yielded.

A FusedStream is the async analogue of FusedIterator and will yield None after the first None. In addition, it has a non-async method is_terminated that says whether the stream is exhausted already.

pub trait FusedStream: Stream {
    fn is_terminated(&self) -> bool;
}

Usually a FusedStream will keep yielding Poll::Ready(None) after the first Poll::Ready(None), and its is_terminated method will then return true. However, the implementor has the freedom to break these conventions.

Streams in the wild

The first place to look for Streams is in the futures::channel module. It contains a concrete implementation of channels with senders that implement Sink and receivers that implement Stream.

If you need more advanced types of channels, you can look in the postage or tokio crates.

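Important: The channels in tokio are not directly usable with the futures crate. You need to wrap them first, for example with the adapters from the tokio-stream crate (such as WatchStream) or from the tokio-util crate (such as PollSender).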

Consuming streams

Remark: As of April 2025, all the methods you need for streams are in StreamExt from futures. For the rest of this article, almost all Stream-related methods come from this trait.

The simplest case would be the case where you just want to perform an operation for each element that is yielded by the stream. For this, you should use the for_each method to act on each item in Some(item).

let mut x = 0;
let fut = stream::repeat(1).take(3).for_each(|item| {
    x += item;
    future::ready(())
});
fut.await;
assert_eq!(x, 3);

Observe that the argument for the closure in for_each does not take an Option. Also note that for_each does not return a stream but a future. This future completes as soon as the input stream yields None.

Also notice that the output of the closure has to be a future. But it does not need to await anything, so we use the ready future, the simplest possible future without any await points (and consequently, also Unpin, see other blog posts).

Important:

Very often we just have to apply one synchronous (non-async) operation to every output item from an input stream and return a new stream with the mapped items. The map stream operator is the right tool for this:

let stream = stream::iter(1..=3);
let stream = stream.map(|x| x + 3);

When the operation in the closure is asynchronous, you should use then (as in FutureExt::then).

let stream = stream::iter(1..=3);
let stream = stream.then(|x| async move { convert(x).await });

Feel free to use async closures (async |_| {}) or AsyncFn in recent Rust versions. Asynchronous closures were stabilized only recently (as of April 2025). Since I do not understand the implementation of async closures very well, I prefer to keep using the old syntax |x| async move {} for now. This syntax also works with older versions of Rust.

Remark: In previous major version releases of futures, then and map were a single function. The crate futures-preview is an old version. Avoid reading the documentation of futures-preview to prevent confusion. (Everything you need for the rest of this presentation is available in futures >= 0.3.31.)

Stream test helpers

While implementing your own streams (maybe not now but later on), you will run into situations where you need to consume the streams as if you were a typical consumer. The futures crate provides helpers for tests that are analogous to the ones in Iterator. The only thing that distinguishes them is that they operate on asynchronous iterators.

The following Stream helpers / operators take a stream and perform some simple actions on it without changing the values:

For example, you can use enumerate as follows:

let stream = stream::iter(vec!['a', 'b', 'c']);

let mut stream = stream.enumerate();

assert_eq!(stream.next().await, Some((0, 'a')));
assert_eq!(stream.next().await, Some((1, 'b')));
assert_eq!(stream.next().await, Some((2, 'c')));
assert_eq!(stream.next().await, None);

Filtering streams

You can filter a stream of numbers to only keep the even numbers as follows:

use std::future::ready;
use futures::{stream, StreamExt};

let stream = stream::iter(1..=10);
let evens = stream.filter(|x| ready(x % 2 == 0));

Notice the ready function. This function maps primitive Rust values into the async world. The output of ready is a minimal Future that can be moved: it is Unpin.

Remark: Don’t try to implement ready yourself, just import it from std::future::ready.

Why ready? (optional intermezzo)

Sometimes the error messages emitted by the compiler may steer you onto the wrong path. For example, the following stream would not be Unpin, and the compiler will not allow you to call the next method on it:

let mut output_stream = stream.filter(|&x| async move { x % 2 == 0 });

// Compiler error: `output_stream` does not implement `Unpin`!
assert_eq!(output_stream.next().await, Some(2));

The compiler messages may lead you to pinning the whole stream or the closure's future on the heap with Box::pin, and your hacky solution will look like one of the following:

stream.filter(|&x| Box::pin(async move { x % 2 == 0 }));
Box::pin(stream.filter(|&x| async move { x % 2 == 0 }));
stream.filter(|&x| async move { x % 2 == 0 }).boxed();

However, this is not necessary and involves unnecessary heap allocations. Use the std::future::ready function from above instead.

Because ready(x % 2 == 0) returns an Unpin future and the closure |x| ready(x % 2 == 0) captures nothing, both the closure and the future it returns are Unpin.

Remember from the “auto-trait” rules of Rust that auto-traits such as Unpin are automatically implemented for all structs for which all fields implement Unpin. This implies that if the input stream, the closure and the future it returns are Unpin, the Filter stream will also be Unpin.

Each time you write stream.filter(|x| {...}), you create a new stream that consumes the input stream. Internally, .filter constructs a Filter which looks (simplified) as follows:

pub struct Filter<St, Fut, F> where St: Stream {
    stream: St,
    f: F,
    pending_fut: Option<Fut>,
    pending_item: Option<St::Item>,
}

The resulting stream needs to keep the closure around (so that it can apply it to the items that arrive later). The easiest way to do that is to just own it. So in practice, the output Filter stream is a struct with an extra field f for storing the closure.

Remark: The closure f does not have to be a real capturing closure. It can be any type that implements the FnMut trait, which includes function pointers and function items.

Filter map

When you are working with a stream and need to extract some information from it, but this extraction is fallible, you can use filter_map.

The filter_map operator:

let stream = stream::iter(1..=10);
let events = stream.filter_map(|x| async move {
    if x % 2 == 0 { Some(x + 1) } else { None }
});

In case the items of your stream are Results, you can first map them with Result::ok to convert each Result item into an Option item, and then pass ready to the filter_map operator to drop the None items.

Important: Don’t forget about the try_* operators from futures::TryStreamExt. I have not used them myself yet, but they seem quite useful when dealing with fallible streams.

Boolean operators

The futures crate also provides analogues for the boolean operators shipped with the standard library Iterator such as any, all, … :

let number_stream = stream::iter(0..10);
let less_than_twenty = number_stream.all(|i| async move { i < 20 });
assert_eq!(less_than_twenty.await, true);

Notice here that we don’t have to “pin” the less_than_twenty future, because Unpin is not a requirement for all.

Sinks

Dual of streams

Up until now we have only seen detailed usage of the Stream trait. But the opposite of a stream, a “sink”, is also shipped by the futures crate as the Sink trait. A Sink is something that receives data, agnostic of the transport or channel used.

The different life-cycle stages of a Sink can be summarized as follows:

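| stage    | name       | method           | meaning                | remark          |
|----------|------------|------------------|------------------------|-----------------|
| creation | new        |                  | Initial state          |                 |
| send     | ready      | ready().await    | Wait until cache ready | may be full     |
| send     | start send | start_send(item) | Load into cache        | not actual send |
| send     | flush      | flush().await    | Send items from cache  |                 |
| close    | close      | close().await    | Close the Sink         | not automatic   |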

The analogue of the stream operator StreamExt::map for Sinks is the sink operator SinkExt::with. Instead of mapping the output items of a stream, it applies a mapping function to all items that are going to be flushed into the sink.

If a Sink becomes full easily and you depend on the concrete underlying type (such as for example a sink derived from a particular type of sender of a Tokio channel), you can allocate an extra buffer with SinkExt::buffer() to cache elements that don’t fit in the sink.

Remark: The Sink trait is not as common as the Stream trait in the crates that I have used. It is, however, very easy to implement yourself.

Flushing a Stream into a Sink

A Sink may appear in combination with a Stream. In that case, it is possible to create a fully functional pipeline that takes a stream and flushes it into a sink. This is done with the forward method of StreamExt.

// Keep the receiver alive; otherwise every send fails.
let (output, _receiver) = tokio::sync::mpsc::channel(16);
let output = tokio_util::sync::PollSender::new(output);

// A finite input, so that forward can complete.
let input = stream::repeat(1).take(3).map(Ok);
input.forward(output).await.unwrap();

Important:

More Sink operators

The forward method will also close the Sink upon termination of the input stream. If you don’t want to close the Sink after the stream has returned None, use the sink operator SinkExt::send_all.

When you have one input stream and know the n output sinks at compile-time, you can combine the sinks with SinkExt::fanout and forward the stream into the combined sink. Otherwise you will need a mechanism to Clone the input stream at run-time. Cloning of streams will be discussed later on.

Merging and splitting

Collapsing an iterable of streams

Given an iterable of streams, you can collapse the whole iterable into one stream with select_all. This function will just emit the stream items from all the streams (assuming the iterator is finite) as they arrive.

A simple example would look like:

let stream_a = stream::repeat(1);
let stream_b = stream::repeat(2);

let merged = stream::select_all([stream_a, stream_b]);

In practice, you would typically pass large vectors, compile-time-sized arrays or other iterable collections to the select_all function.

Often, you will want to merge (a lot of) streams that come from a different underlying transport. This might be the case, for example, when you have input streams that are derived from channel receivers from different crates, internal or external. In that case, the input iterable of streams for select_all needs to be an iterable over boxed stream trait objects Pin<Box<dyn Stream>>.

Remark: A special case of select_all for two input streams is the merge function from the Tokio helper crate tokio-stream. You can use it if you want, but the select_all function does the same thing and is more powerful and general.

Tracking origin stream

If you want to keep track of the origin of the values in the merged stream, the simplest solution you can come up with will probably look like this:

// Box both streams: the two closures have different types.
let stream_a = stream::repeat(1).map(|n| ('a', n)).boxed();
let stream_b = stream::repeat(2).map(|n| ('b', n)).boxed();

let merged_tagged = stream::select_all([stream_a, stream_b]);

This is essentially just a homogeneous merge with select_all preceded by tagging the items of each source stream with a unique identifier for the source stream.

Remark: in this simple case you might want to consider a simple custom combinator (see other posts on how to build one) or the one provided by tokio_stream::StreamMap.

Inhomogeneous stream merging

If you feel like merging two streams with a different item type, you are probably doing something wrong. It is quite rare that you would want to wait for two items to be available from both streams (which are in practice usually emitted at unrelated times). However, it is possible using the zip function from the futures crate.

let stream1 = stream::iter(1..=3);
let stream2 = stream::iter(5..=10);

let vec = stream1.zip(stream2)
                 .collect::<Vec<_>>()
                 .await;
assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec);

Splitting streams

What if you need to use the same output of a stream in several places? You can do it with one of the following:

In case you go for the last approach, there are a couple of crates available on crates.io that turn a stream with clone-able items into a clone-able stream with the same items:

To use my crate clone-stream, you have to import the trait ForkStream and then call fork on the input stream. Afterwards, you can clone the output stream of fork as much as you want.

use futures::{FutureExt, StreamExt, stream};
use clone_stream::ForkStream;

let un_clone_able_stream = stream::iter(0..10);
let clone_able_stream = un_clone_able_stream.fork();
let mut cloned_stream = clone_able_stream.clone();

In later posts I will show how clone-stream works and share the overall experience I had with implementing my own Stream combinator (something that combines streams and returns a stream) in Rust.