Local Async Executors and Why They Should be the Default
Imagine you are new to Rust and you want to learn some async programming. Writing a chat server is a common way to start: the scope is small enough to fit in a blog post, yet it is complex enough to cover most of what you'll need to build async servers in Rust. One DuckDuckGo query and we are off to the races: *Building a Real-time Chat App in Rust and React* by Tin Rabzejl. The first line reads:
> This article covers building a chat app in Rust using asynchronous code.
Asynchronous code, that's what we want, great! This is a very well written article, and despite being from 2020 it's still very much in vogue. There are some low-hanging performance tweaks I might make, but as example code covering the basics I think it works really well. We have Tokio for the runtime and warp for WebSockets, pretty standard fare.
So let's look through this article and see what we need to learn to write asynchronous Rust:
- `async` functions and `.await` syntax.
- Futures and `tokio::select!`.
- `RwLock` and why it's better than `Mutex` for this case.
- `sync::mpsc` channels.
- Our main shared-state thing lives inside an `Arc` and we clone that a bunch.
Notice something about this list? Except for the first two items, nothing here has anything to do with asynchronous programming. Yes, the `RwLock` and `mpsc` come from Tokio and let you `.await` instead of blocking a thread, but these are not async primitives, these are multi-threading synchronization primitives.
Let's step back a little and actually think about the problem we are trying to solve. This is a simple chat server; how likely is it that it's ever going to be more CPU bound than I/O bound? I'd say that probability is pretty low. If you know anything about asynchronous sockets, it should be that multi-threading a socket doesn't actually yield you more requests per second, and can actually lower it (slightly, but still).
This is why Node.js and Deno are so good: they default to a single-threaded JavaScript event loop that handles huge amounts of requests and is easy to write, and when you need to utilize multiple CPU cores, you just spawn multiple processes that listen on the same socket. This is a much better way of structuring servers, except for all that JavaScript, of course.
The Original Sin of Rust async programming is making it multi-threaded by default. If premature optimization is the root of all evil, this is the mother of all premature optimizations, and it curses all your code with the unholy `Send + 'static`, or worse yet `Send + Sync + 'static`, which just kills all the joy of actually writing Rust.
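To make the curse concrete, here is a minimal sketch of my own (not from the article) of what happens the moment a future touches an `Rc`:

```rust
use std::rc::Rc;

#[tokio::main]
async fn main() {
    let counter = Rc::new(1);

    // This does not compile:
    // error[E0277]: `Rc<i32>` cannot be sent between threads safely
    //
    // `tokio::spawn` demands `Send + 'static`, even if this future
    // never actually leaves the current thread.
    tokio::spawn(async move {
        println!("counter: {counter}");
    });
}
```

Swap the `Rc` for an `Arc` and it compiles, which is exactly how the curse spreads through a codebase.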
Actually Pleasant Async Rust
So I'm writing a multiplayer game in Rust, and the server part of it is really quite similar to what a chat server would do: there is an instance of a `Battle`, which is a chat-room analog; a message comes in from one of the clients, it gets processed, and then the outcome gets broadcast to all clients in that instance, just like a chat message in a room would be. And here is the piece of code that currently handles that, built on top of `async-io` and `async-executor`:
```rust
// This is spawned as a task in a `LocalExecutor`. `BattleManager` outlives
// the executor, so I can just send it in by reference 🤯
pub async fn battle(stream: Async<TcpStream>, manager: &BattleManager) {
    // `!Sync` read and write halves of WebSocket using a modified Soketto
    let server = UnsyncServer::new(stream);
    let (mut sender, mut receiver) = server.split();

    // `!Sync` read and write halves of a quasi-ring buffer.
    let (writer, mut reader) = new_shared();

    // We find a battle to put this socket into, and do just that.
    // Each battle instance is wrapped in `Rc<RefCell<_>>`.
    let battle = manager.matchmake();
    let cid = battle.borrow_mut().join(writer);

    // Loop handling outgoing messages turned into a simple future
    let outgoing = async move {
        while let Some(mut buf) = reader.read().await {
            if let Err(err) = sender.send(&mut buf[..]).await {
                log::error!("Connection error: {err:?}");
                break;
            }
            // `buf` is dropped here, which safely advances the read head
        }

        let _ = sender.close().await;
    };

    // Loop handling incoming messages turned into a simple future
    let incoming = async move {
        let mut data = Vec::new();

        loop {
            data.clear();

            if receiver.receive_data(&mut data).await.is_err() {
                battle.borrow_mut().leave(cid);
                break;
            }

            let mut battle = battle.borrow_mut();

            // Process incoming messages
            for client_message in core::Decoder::new(&data) {
                battle.handle_message(cid, client_message);
            }

            // Broadcast all outgoing messages buffered for all clients
            battle.flush();
        }
    };

    // Zip (join) the two futures together so the two loops can run
    // concurrently. Yes, sometimes I double-poll one, who cares.
    zip(incoming, outgoing).await;

    log::info!("Connection closed");
}
```
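For context, here is a minimal sketch of what the spawning side can look like with `async-io` and `async-executor`; the `run` function and the port are my assumptions, not the game's actual code:

```rust
use async_executor::LocalExecutor;
use async_io::Async;
use futures_lite::future::block_on;
use std::net::TcpListener;

fn run(manager: &BattleManager) -> std::io::Result<()> {
    let ex = LocalExecutor::new();

    block_on(ex.run(async {
        let listener = Async::<TcpListener>::bind(([0, 0, 0, 0], 8080))?;

        loop {
            let (stream, _) = listener.accept().await?;

            // No `Send`, no `'static`: the task borrows `manager`
            // directly, because `manager` outlives the executor.
            ex.spawn(battle(stream, manager)).detach();
        }
    }))
}
```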
This is pretty easy, and it looks almost like synchronous Rust. Yes, I have some `RefCell`s, but compared to `Mutex`es or `RwLock`s, borrowing a `RefCell` is virtually free. And if you can prove to yourself that your stuff is never double-borrowed, you could write some custom wrappers using `UnsafeCell` to make it literally free.
The guts of my shared buffer look roughly like this:
```rust
#[repr(transparent)]
pub struct BufWriter(Rc<Inner>);

#[repr(transparent)]
pub struct BufReader(Rc<Inner>);

struct Inner {
    write_head: Cell<u32>,
    read_head: Cell<u32>,
    waker: Cell<Option<Waker>>,
    buffer: UnsafeCell<[u8; BUF_SIZE]>,
}
```
If using `unsafe` makes your heart skip a beat, `RefCell` would be an option too. Since reading the buffer can `.await`, and since there is only ever one `BufReader`, I also only ever need to manage one `Waker`. When manually implementing a `Future`, registering a waker is as simple as:
```rust
inner.waker.set(Some(cx.waker().clone()));
```
And waking the waker is as simple as:
```rust
// This is effectively the same as `inner.waker.take()`
// if we didn't have the `Cell` wrapper.
if let Some(waker) = inner.waker.replace(None) {
    waker.wake();
}
```
No `unsafe`, no `atomic-waker`, no need for a Ph.D. in atomic op ordering, no magic, just single-threaded things.
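To see the whole pattern in one place, here is a minimal single-threaded sketch of my own; the `Shared` and `ReadyFuture` names are made up, and the real buffer does more bookkeeping:

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

struct Shared {
    ready: Cell<bool>,
    waker: Cell<Option<Waker>>,
}

struct ReadyFuture(Rc<Shared>);

impl Future for ReadyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0.ready.get() {
            Poll::Ready(())
        } else {
            // Register (or refresh) the one and only waker. No atomics,
            // no locks: nothing else can touch this cell concurrently.
            self.0.waker.set(Some(cx.waker().clone()));
            Poll::Pending
        }
    }
}

// The writing side flips the flag and wakes the reader.
fn notify(shared: &Shared) {
    shared.ready.set(true);

    if let Some(waker) = shared.waker.take() {
        waker.wake();
    }
}
```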
Hidden Costs
Just switching to a `LocalExecutor`, or something like Tokio's `LocalSet`, should be enough, but unfortunately that alone does not free your code from unnecessary thread synchronization. Like the WebSocket in warp that our chat example uses, Soketto is using `BiLock`s to guarantee safe access to the underlying socket. In fact, both `Sender` and `Receiver` in Soketto need the ability to write in order to respond to PINGs with PONGs when reading, and just for that, writing to a socket has to go through two separate `BiLock`s (one inside the `WriteHalf`, and one around it). Even if you discount the actual locking mechanisms, just having all that pointer indirection should be a red flag 🚩.
I'm writing all this because I've contributed a bunch to Soketto myself and never thought of that as a problem, so I too am not without sin. Making runtime-agnostic utilities like WebSocket libraries thread-safe is yet another price we pay for making everything multi-threaded by default. The standard way of doing what I'm doing in my code above would be to spawn one of the loops as a separate background task, which could land on a separate thread, meaning we must do all that synchronization to manage reading and writing to a socket from different threads for no good reason.
Luckily, those assumptions are made exclusively in the ecosystem; neither the standard library nor Rust itself bakes any such assumptions into async. Yes, the `Wake` trait requires an `Arc`, but even that has an escape hatch 1). We can fix this, and all it takes is writing utilities that are `!Sync` and evangelizing the use of local async executors as the default option, as it should be.
But I need Threads!
Of course writing single-threaded servers isn't always the optimal choice; it likely isn't for most cases. Isn't that why multi-threaded task executors should be the default?
No.
If you write regular synchronous Rust code, unless you have a really good reason, you don't just start with a thread-pool. You write single-threaded code until you find a place where threads can help you, and then you parallelize it, which can be as simple as replacing `iter` with `par_iter` using Rayon. Rust is exceptional at adding safe parallelization to your code, and for the most part this is how Rust is written everywhere, except async.
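For illustration, the classic Rayon example (the function is mine, not from the article):

```rust
use rayon::prelude::*;

fn sum_of_squares(input: &[i64]) -> i64 {
    // Single-threaded version: input.iter().map(|&x| x * x).sum()
    // Parallel across all cores with one word changed:
    input.par_iter().map(|&x| x * x).sum()
}
```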
It's easy to parallelize a server that's using local task executors in Rust: you just spawn multiple threads, each with its own executor. Like in your bog-standard non-async Rust, the goal is to keep shared memory accesses between tasks local to a thread as much as possible, and only when you absolutely have to communicate between threads do you reach for the more expensive tools like `mpsc` or `RwLock`. Your TCP connection needs to access multiple chat rooms that might live on different threads in a pool? `TcpStream` is `Send`, so you can just move it to the thread on which a given room lives so that it's always local.
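A minimal sketch of that shape, again assuming `async-executor`; the per-thread setup is simplified and the names are mine:

```rust
use async_executor::LocalExecutor;
use futures_lite::future::block_on;
use std::thread;

fn main() {
    let cores = thread::available_parallelism().map(usize::from).unwrap_or(1);

    // One OS thread per core, each running its own single-threaded executor.
    let handles: Vec<_> = (0..cores)
        .map(|_| {
            thread::spawn(|| {
                let ex = LocalExecutor::new();

                block_on(ex.run(async {
                    // Accept connections and spawn `!Send` tasks here:
                    // `Rc`, `RefCell`, and plain references all work.
                }));
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
```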
Not only does this make writing async code more pleasant, because suddenly you don't need to worry about synchronization all the time and you might even be able to use references (again, 🤯), this way of doing things even has a name: Thread-per-Core (or Thread/Core), and it's actually the far better model for writing performant servers. And no, you don't necessarily need to buy into Glommio and its cooperative task programming; I hereby give you permission to still spawn blocking tasks on a separate thread-pool, it's okay.
Isn't all of this extra work? After all, just calling `spawn` every time you need to do something in the background is pretty easy, right? Well, I suggest to you, dear reader, that this function signature:
```rust
pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
```
is a gun. We give it to all Rustaceans reaching for async, seasoned and new alike, and they all use it to repeatedly shoot themselves in the foot, over, and over, and over again, and most of the time they don't even know that's what they are doing. That's how easy it is to use.
Using a multi-threaded `spawn` reduces the friction of managing multiple tasks across all CPU cores, and it only costs you friction everywhere else in your entire codebase, and quite often performance as well.
Let's stop.
1) u/SkiFire13 astutely noticed that `Waker` must implement `Sync`, so this escape hatch is mostly relevant for environments such as embedded platforms or Wasm, where that constraint isn't an issue and where atomics might be unavailable.