Functional asynchronous Rust

Declarative/functional asynchronous programming in Rust with streams and sinks.

Introduction

Let’s get started with some definitions you may have heard about already.

Declarative programming is when you write code and make sure that the inputs and outputs of sub-modules behave predictably according to fixed invariants. Logic or constraint solver languages like Prolog or ManyWorlds (written by my friend Jo Devriendt) are part of this family.

Functional programming is building the majority of your code from well-defined and side-effect-free functions. It is a subset of declarative programming. Languages like ML, Haskell, OCaml and Lisp are part of this family. They are all extensions of Lambda Calculus.

Asynchronous programming is a kind of programming where you not only have functions that evaluate immediately, but there are also functions that may evaluate in the future. Such functions are called asynchronous functions. JavaScript, C# and Rust are examples.

In the following I will show how declarative, functional and asynchronous programming can be combined in the Rust programming language with the semi-standard library crate futures.

Use-cases

Imagine you would not want to do any kind of functional asynchronous programming. What would that look like?

It would look like ordinary asynchronous code: the async and await keywords combined with imperative control flow such as while, loop or continue.

Channels

One approach that you might take when dealing with different sub-modules in a large project is using channels to communicate between the different modules.

This could look like this:

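// Keep the sender alive; if it is dropped, `changed()` fails right away.
let (_in_sender, mut in_receiver) = watch::channel(1);
let (out_sender, _) = broadcast::channel(123);
let forward_task = spawn(async move {
    loop {
        // Wait for a new input value and stop once the sender is gone.
        let result = in_receiver.changed().await;
        if result.is_err() { break; }
        let input = *in_receiver.borrow();
        let output = get_output(input);
        // Sending only fails when nobody is subscribed; ignore that here.
        let _ = out_sender.send(output);
    }
});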

In this example I use Tokio “watch” and “broadcast” channels.

If we would translate this to a functional asynchronous version, we get:

let in_receiver = WatchStream::new(in_receiver);
let out_sender = BroadcastSink::new(out_sender);

let forward_task = spawn(
    in_receiver
    .map(Result::ok)
    .filter_map(ready)
    .map(get_output)
    .map(Ok)
    .forward(out_sender)
);

So what happened during the translation?

A side effect of the translation is that the N intermediate variables in the loop are replaced by N-1 functional map operations. This is where the functional aspect comes in.

Reactive UI input

Another place where functional asynchronous programming is useful is the frontend.

An imperative version might look like this:

let mut target_temperature = 21.0;
slider.on_slide(move |new_t| {
    if acceptable(new_t) {
        let new_target = some_op(new_t);
        target_temperature = new_target;
    }
});

One problem with this code is that it has a lot of indentation and several branches, even though it is quite simple. It also requires the maintainer to keep track of a few more variable names.

Translating this to functional asynchronous Rust:

let mut target_temperature = sink(21.0);
slider.value
    .filter(acceptable)
    .map(some_op)
    .forward(target_temperature);

In mainstream web frameworks the sink would be called a “signal” or “writable observable”.

First observations

Instead of naming the inputs and outputs, we focus on naming the transformations themselves. The successive application of the transformations/functions is slightly easier to read for me.

Another benefit of the functional approach is that it does not rely on a concrete type. Stream and Sink are universal traits (or interfaces). If you make use of these traits, which describe a common behaviour, and implement them for your own types, you make your code more interoperable with the rest of the world.

On the consumer side, a user of a functional library built on these traits does not need to know the implementation details. It is not necessary to know how exactly the data moves inside the dependency. I like to think of it as:

Important: You do not need to know everything about Pin or Poll to work with streams in Rust. You can just use the high-level methods provided by the standard library and the futures crate.

Simple building blocks

The main thing that is added in functional asynchronous programming is the Stream trait. You are supposed to use it everywhere. There are other things of course, but this is the main concept that you will need.

So what is a stream? It is just something that implements the Stream trait from the futures crate. It is nothing more than an asynchronous iterator.

A rough conceptual definition of a stream would be:

A function that returns multiple values at unpredictable times.

First, remember that the life-time of an iterator (a normal synchronous blocking one) looks like this:

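| T | create   | iterate   | yield   | end  |
|---|----------|-----------|---------|------|
| 1 | (1..=10) |           |         |      |
| 2 |          | it.next() |         |      |
| 3 |          |           | Some(1) |      |
| 4 |          | it.next() |         |      |
| 5 |          | 1+1       |         |      |
| 6 |          | it.next() |         |      |
| 7 |          |           |         | None |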

Here, I put the 1+1 to represent a random unrelated computation.
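The same life cycle can be replayed in code. The following is a minimal sketch using only the standard library; the function name iterator_life_cycle and the shorter range 1..=3 are choices made for this illustration, not part of the table above.

```rust
/// Drives a small range iterator by hand and records every value
/// returned by `next`, including the calls after exhaustion.
fn iterator_life_cycle() -> Vec<Option<i32>> {
    let mut it = 1..=3; // create
    let mut log = Vec::new();

    log.push(it.next()); // iterate: yields Some(1)
    let _unrelated = 1 + 1; // unrelated work between two calls
    log.push(it.next()); // Some(2)
    log.push(it.next()); // Some(3)
    log.push(it.next()); // end: None
    log.push(it.next()); // a range keeps returning None afterwards

    log
}
```

Calling iterator_life_cycle() returns the three values followed by two None entries, because every call to next returns its result immediately and a range stays exhausted once it has ended.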

The life-time of a stream/async iterator during usage looks like this:

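| T  | create    | iterate   | yield   | null |
|----|-----------|-----------|---------|------|
| 1  | St::new() |           |         |      |
| 2  |           | st.next() |         |      |
| 3  |           | await     |         |      |
| 4  |           |           | Some(1) |      |
| 5  |           | st.next() |         |      |
| 6  |           | 1+1       |         |      |
| 7  |           | await     |         |      |
| 8  |           |           | Some(2) |      |
| 9  |           | st.next() |         |      |
| 10 |           | await     |         |      |
| 11 |           |           |         | None |
| 12 |           | st.next() |         |      |
| 13 |           | await     |         |      |
| 14 |           |           | Some(3) |      |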

This is longer and more complicated than a normal iterator. An async iterator requires an await before a value is yielded.

As you can see in the last table, streams may yield None at first and later on still yield a Some. This is very different from iterators. Keep this in mind, especially further on, when creating your own streams. If you do not like this behaviour, you have to restrict your focus to FusedStream.

Basic usage of streams

Before we dive into how streams are actually built in Rust, I would like to show you how to use a stream. You will notice that the methods on a stream are very similar to Iterator.

The most basic operation is of course st.next(). This will create a future that references the stream and evaluates to a next item or None.

Remark: As of April 25, all the methods you need for streams are in StreamExt from futures. For the rest of this article, almost all Stream-related methods come from this trait.

Consuming streams

The simplest case would be the case where you just want to perform an operation for each element that is yielded by the stream. For this, you should use the for_each method to act on each Some item.

let mut x = 0;
let fut = stream::repeat(1).take(3).for_each(|item| {
    x += item;
    future::ready(())
});
fut.await;
assert_eq!(x, 3);

Observe that the closure passed to for_each takes the item itself, not an Option. Also note that for_each returns a future rather than a stream. That future completes as soon as the input stream yields None.

Also notice that the output of the closure has to be a future. But it does not need to await anything, so we use the ready future, the simplest possible future without any await points.
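What "without any await points" means can be checked by polling such a future by hand. This is a minimal sketch using only the standard library; NoopWaker and poll_once are helper names invented for this illustration, and in real code an executor does this work for you.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough to poll a future manually.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` exactly once and reports what that single poll returned.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    // Pin the future on the stack so that it can be polled.
    let mut fut = pin!(fut);
    let result = fut.as_mut().poll(&mut cx);
    result
}
```

Polling std::future::ready(7) once gives Poll::Ready(7), whereas std::future::pending::<()>() stays Poll::Pending. A ready future never has to be woken up, which is why it is a cheap way to satisfy combinators that expect a future.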

Important:

Very often we just have to apply one synchronous operation to every item of an input stream and return a new stream with the mapped items. Use map for this.

let stream = stream::iter(1..=3);
let stream = stream.map(|x| x + 3);

When the operation in the closure is asynchronous you should use then (as in Future::then).

let stream = stream::iter(1..=3);
let stream = stream.then(|x| async move { convert(x).await });

Feel free to use async closures async |_| {} or AsyncFn in recent Rust versions.

I prefer to stick to the good-old async move {} blocks.

Important: In previous major releases of futures, then and map were a single function. The crate futures-preview is an old fork of such an old version. Avoid its documentation to prevent confusion. (Everything you need for the rest of this presentation is available in the recent official release of futures.)

Stream test helpers

When you are testing a stream, you often want to use the following methods (which are completely identical to Iterator apart from being asynchronous):

An example of using enumerate:

let stream = stream::iter(vec!['a', 'b', 'c']);

let mut stream = stream.enumerate();

assert_eq!(stream.next().await, Some((0, 'a')));
assert_eq!(stream.next().await, Some((1, 'b')));
assert_eq!(stream.next().await, Some((2, 'c')));
assert_eq!(stream.next().await, None);

Filtering streams

use std::future::ready;
use futures::{stream, StreamExt};

let stream = stream::iter(1..=10);
let events = stream.filter(|x| ready(x % 2 == 0));

Notice the ready function. It wraps a synchronous value in a future that is immediately ready and that is Unpin.

Because the future returned by the closure is Unpin, the resulting stream called events is also Unpin.

Important: Sometimes the error messages emitted by the compiler may steer you down the wrong path. For example, the following would not be an Unpin stream, so the compiler will not allow you to call the next method on it:

stream.filter(|x| async move { x % 2 == 0});

The compiler messages may lead you to pinning the whole stream or the closure on the heap with Box::pin and your hacky solution will look like some of the following:

stream.filter(|x| Box::pin(async move { x % 2 == 0}));
Box::pin(stream.filter(|x| async move { x % 2 == 0}));
stream.filter(|x| async move { x % 2 == 0}).boxed();

However, this is not necessary and involves unnecessary heap allocations. Use the std::future::ready function from above instead.
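The difference between the two kinds of futures can be made visible with a trait bound. This is a minimal sketch using only the standard library; is_unpin and unpin_checks are helper names made up for this illustration.

```rust
use std::future::{ready, Future};
use std::pin::Pin;

/// Only compiles for values whose type implements `Unpin`.
fn is_unpin<T: Unpin>(_value: &T) -> bool {
    true
}

/// Checks two ways of producing a future that resolves to a `bool`.
fn unpin_checks() -> (bool, bool) {
    // `Ready<bool>` is `Unpin` and needs no heap allocation.
    let cheap = ready(2 % 2 == 0);

    // An async block is not `Unpin` by itself; boxing and pinning it makes
    // the resulting handle `Unpin`, at the cost of one heap allocation.
    let boxed: Pin<Box<dyn Future<Output = bool>>> = Box::pin(async { 2 % 2 == 0 });

    // Note: `is_unpin(&async { true })` would be rejected by the compiler.
    (is_unpin(&cheap), is_unpin(&boxed))
}
```

Both checks succeed, but only the first one does so without touching the heap, which is the reason to prefer ready for closures that do no asynchronous work.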

Filter map

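The filter_map operator combines filter and map in one step: the closure returns Some(value) to keep and transform an item, or None to drop it.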

let stream = stream::iter(1..=10);
let events = stream.filter_map(|x| async move {
    if x % 2 == 0 { Some(x + 1) } else { None }
});

Boolean operators

There are also analogues for all boolean operators from Iterator: any, all, …

let number_stream = stream::iter(0..10);
let less_than_twenty = number_stream.all(|i| async move { i < 20 });
assert_eq!(less_than_twenty.await, true);

Dual of streams: Sinks

You can forward a stream into a sink.

The futures::Sink trait is the opposite of Stream.

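| stage    | name       | method           | meaning                | remark          |
|----------|------------|------------------|------------------------|-----------------|
| creation | new        |                  | Initial state          |                 |
| send     | ready      | ready().await    | Wait until cache ready | may be full     |
| send     | start send | start_send(item) | Load into cache        | not actual send |
| send     | flush      | flush().await    | Send items from cache  |                 |
| close    | close      | close().await    | Close the Sink         | not automatic   |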

The counterpart of map for sinks is with: it transforms each item before it enters the sink.

If a Sink becomes full easily, you can allocate a buffer with buffer().

Flushing a Stream into a Sink

A stream can be sent into a sink with futures::StreamExt::forward.

If you don’t want to close the Sink after the stream has returned None, use send_all from futures::SinkExt.

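Third-party crates rarely implement futures::Sink for their own types. For a Tokio mpsc channel, for example, you need the PollSender wrapper from tokio-util: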

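// Keep the receiver alive, otherwise every send fails immediately.
let (output, _receiver) = tokio::sync::mpsc::channel(16);
let output = tokio_util::sync::PollSender::new(output);

// Forward a few items; an endless stream would never let `forward` finish.
let input = stream::repeat(1).take(3).map(Ok);
input.forward(output).await.unwrap();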

Important:

When you have one input stream and know n output sinks at compile-time, you can use fanout.

Otherwise you will have to do book-keeping yourself and spawn helper threads/tasks.

Merging and splitting

Collapsing an iterable of streams

Given an iterable of streams, you can collapse the whole iterable into one stream with select_all. This function will just emit the stream items from all the streams (assuming the iterator is finite) as they arrive.

A simple example would look like:

let stream_a = stream::repeat(1);
let stream_b = stream::repeat(2);

let merged_tagged = stream::select_all([stream_a, stream_b]);

In practice, you would typically pass large vectors, compile-time-sized arrays or other iterable collections to the select_all function.

Often, you will want to merge (a lot of) streams that come from a different underlying transport. This might be the case, for example, when you have input streams that are derived from channel receivers from different crates, internal or external. In that case, the input iterable of streams for select_all needs to be an iterable over boxed stream trait objects Pin<Box<dyn Stream>>.

Remark: A special case of select_all for two input streams is the merge function from the Tokio helper crate tokio-stream. You can use it if you want, but the select_all function does the same thing and is more powerful and general.

Tracking source

If you want to keep track of the origin of the values in the merged stream, the simplest solution you can come up with will probably look like this:

let stream_a = stream::repeat(1).map(|n| ('a', n));
let stream_b = stream::repeat(2).map(|n| ('b', n));

let merged_tagged = stream::select_all([stream_a, stream_b]);

This is essentially just a homogeneous merge with select_all preceded by tagging each individual item with an identifier for the source stream.

Remark: In this simple case you might want to consider a simple custom combinator (see other posts on how to build one) or the one provided by tokio_stream::StreamMap.

Inhomogeneous stream merging

If you feel like merging two streams with a different item type, you are probably doing something wrong. It is quite rare that you would want to wait for two items to be available from both streams (which are in practice usually emitted at unrelated times). However, it is possible using the zip function from the futures crate.

let stream1 = stream::iter(1..=3);
let stream2 = stream::iter(5..=10);

let vec = stream1.zip(stream2)
                 .collect::<Vec<_>>()
                 .await;
assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec);

Splitting streams

What if you need to use the same output of a stream in several places? You can do it with one of the following:

In case you go for the last approach, there are a couple of crates available on crates.io that turn a stream with cloneable items into a cloneable stream with the same items:

Using my implementation would look like:

use futures::{FutureExt, StreamExt, stream};
use clone_stream::ForkStream;

let uncloneable_stream = stream::iter(0..10);
let cloneable_stream = uncloneable_stream.fork();
let mut cloned_stream = cloneable_stream.clone();