Structured Concurrency — The Pattern That Defines Senior Tokio
Tasks have parents. Parents wait for children. Cancellation is a tree.
Foundation · Core Engine & Data Structures
Why this week
The difference between someone who uses tokio::spawn and someone who architects with Tokio is this one principle: tasks have parents, parents wait for children, cancellation is a tree. It's called structured concurrency. Nathaniel J. Smith's Trio essays made it famous in Python; Tokio's JoinSet + CancellationToken are the idiomatic Rust implementation.
You'll build small, std-only versions of both — a CancelToken that supports parent/child hierarchies, and a TaskSet that waits for every spawned thread to finish before returning. The capstone wires them into a ConnectionManager that accepts N "connections" (threads), broadcasts a shutdown signal, and drains cleanly inside a deadline.
We're deliberately using std::thread instead of tokio here. The concepts — hierarchical cancellation, join-on-drop, drain-before-stop — are independent of the runtime. Once you see the shape with threads, the tokio equivalents (JoinSet, CancellationToken, select! with a cancel branch) read as exactly the same pattern.
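The invariant is visible with nothing but std: `thread::scope` (Rust 1.63+) is a nursery in miniature, because the scope cannot return until every thread spawned inside it has joined. A minimal sketch:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Spawn `n` child threads inside a scope and return how many completed.
/// `thread::scope` joins every child before returning, so the result is
/// guaranteed to equal `n` -- the structured-concurrency invariant,
/// enforced by the type system rather than by discipline.
fn run_children(n: usize) -> usize {
    let done = AtomicUsize::new(0);
    thread::scope(|s| {
        for _ in 0..n {
            // Scoped threads may borrow `done` by reference precisely
            // because the scope guarantees they join before it returns.
            s.spawn(|| {
                done.fetch_add(1, Ordering::SeqCst);
            });
        }
    }); // <- every child has joined here, by construction
    done.into_inner()
}

fn main() {
    assert_eq!(run_children(4), 4);
    println!("all children joined");
}
```

`tokio::spawn` has no such scope: the spawned task is a free-floating root, which is exactly the shape this week teaches you to avoid.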
Day 1 — Trio essays + tokio primitives
The nursery pattern
Why ad-hoc spawn is a bug factory
Treat unstructured spawn the way you would treat `goto` — powerful, sometimes necessary, usually wrong. Read this twice.
The thing JoinSet gives you that FuturesUnordered doesn't: ownership of every spawned task, so dropping the set aborts anything still running. No task can outlive the set that spawned it.
CancellationToken (from tokio-util): `child_token()`, `cancel()`, `is_cancelled()`, `cancelled().await`. Tree cancellation for free.
State the structured-concurrency invariant in one sentence. Now find a place in code you've written that violates it and describe what could go wrong.
Day 2 — Build a CancelToken
Cancellation is a tree
Drill 1: clone-shared flag + child tokens
The contract you're implementing:
- `CancelToken::new()` returns a root token.
- `token.cancel()` marks this subtree cancelled. Idempotent.
- `token.is_cancelled()` returns true iff this node or any ancestor is cancelled.
- `token.child()` returns a new token one level down: cancelling any ancestor is visible to `is_cancelled()` checks on the child and its descendants, but cancelling the child leaves the parent alone.
Parent/child cancel token
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
pub struct CancelToken {
// Flag for this node.
flag: Arc<AtomicBool>,
// Parent chain — checked on is_cancelled() to support "cancel
// bubbles down from ancestor". Children hold references to all
// ancestor flags (cheap: just Arc clones).
ancestors: Vec<Arc<AtomicBool>>,
}
impl CancelToken {
pub fn new() -> Self {
Self {
flag: Arc::new(AtomicBool::new(false)),
ancestors: Vec::new(),
}
}
/// Shares the *same* node — not a child. Use this to hand a copy
/// of this token to another thread without splitting the subtree.
pub fn clone_handle(&self) -> Self {
// TODO: hint — share `flag` and `ancestors` via Arc::clone
todo!()
}
/// Create a child token. Cancelling *this* child does not cancel
/// the parent. Cancelling the parent cancels this child.
pub fn child(&self) -> Self {
// TODO: new flag for the child; ancestors = self.ancestors ++ [self.flag]
todo!()
}
pub fn cancel(&self) {
self.flag.store(true, Ordering::SeqCst);
}
pub fn is_cancelled(&self) -> bool {
// TODO: true iff self.flag OR any ancestor flag is set
todo!()
}
}
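One way the TODOs above can come together (a sketch, not the only correct answer -- in particular, the ancestor chain could also be a linked structure instead of a flat Vec):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

pub struct CancelToken {
    flag: Arc<AtomicBool>,
    ancestors: Vec<Arc<AtomicBool>>,
}

impl CancelToken {
    pub fn new() -> Self {
        Self { flag: Arc::new(AtomicBool::new(false)), ancestors: Vec::new() }
    }

    /// Same node, shared flags -- handing this to another thread does not
    /// add a level to the tree.
    pub fn clone_handle(&self) -> Self {
        Self { flag: Arc::clone(&self.flag), ancestors: self.ancestors.clone() }
    }

    /// New node one level down: fresh flag, plus the full ancestor chain
    /// so cancellation anywhere above is visible here.
    pub fn child(&self) -> Self {
        let mut ancestors = self.ancestors.clone();
        ancestors.push(Arc::clone(&self.flag));
        Self { flag: Arc::new(AtomicBool::new(false)), ancestors }
    }

    pub fn cancel(&self) {
        self.flag.store(true, Ordering::SeqCst);
    }

    pub fn is_cancelled(&self) -> bool {
        self.flag.load(Ordering::SeqCst)
            || self.ancestors.iter().any(|a| a.load(Ordering::SeqCst))
    }
}

fn main() {
    let root = CancelToken::new();
    let child = root.child();
    let grandchild = child.child();

    child.cancel();                     // cancel the middle of the tree
    assert!(!root.is_cancelled());      // parent untouched
    assert!(child.is_cancelled());
    assert!(grandchild.is_cancelled()); // descendants observe it
    println!("tree cancellation ok");
}
```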
Day 3 — Build a TaskSet
Join-on-drop
Drill 2: own every thread you spawn
The TaskSet owns every JoinHandle it creates. Dropping the set without calling shutdown is a programming error (we signal it with a debug_assert); the correct lifecycle is spawn × N, shutdown, drop. Shutdown flags every task via a CancelToken, then joins each handle.
TaskSet with graceful shutdown
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
// Re-declare the pieces of CancelToken we need. (In the capstone you'll
// import from the drill above; here we duplicate so this drill compiles
// standalone.)
use std::sync::atomic::AtomicBool;
#[derive(Clone)]
pub struct Cancel {
flag: Arc<AtomicBool>,
}
impl Cancel {
pub fn new() -> Self { Self { flag: Arc::new(AtomicBool::new(false)) } }
pub fn cancel(&self) { self.flag.store(true, Ordering::SeqCst); }
pub fn is_cancelled(&self) -> bool { self.flag.load(Ordering::SeqCst) }
}
pub struct TaskSet {
handles: Vec<JoinHandle<()>>,
cancel: Cancel,
/// Incremented every time a spawned task returns. Lets tests assert
/// that all tasks actually ran (and didn't panic silently).
completed: Arc<AtomicUsize>,
}
impl TaskSet {
pub fn new(cancel: Cancel) -> Self {
Self {
handles: Vec::new(),
cancel,
completed: Arc::new(AtomicUsize::new(0)),
}
}
/// Spawn a worker. The closure receives the shared Cancel handle so
/// it can observe shutdown.
pub fn spawn<F>(&mut self, f: F)
where
F: FnOnce(Cancel) + Send + 'static,
{
// TODO: wrap f in a closure that runs it, then increments
// `completed`. Push the handle onto self.handles.
let _ = f;
todo!()
}
pub fn completed_count(&self) -> usize {
self.completed.load(Ordering::SeqCst)
}
/// Cancel + join every task. Returns the time the drain took.
pub fn shutdown(self) -> Duration {
// TODO:
// 1. Record a start Instant
// 2. self.cancel.cancel()
// 3. For each handle, handle.join()
// 4. Return elapsed
todo!()
}
}
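If you want to check your work, here is one shape the two TODO methods can take (a sketch that follows the comment outline; the simplified `Cancel` is repeated so this compiles standalone):

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

#[derive(Clone)]
pub struct Cancel { flag: Arc<AtomicBool> }
impl Cancel {
    pub fn new() -> Self { Self { flag: Arc::new(AtomicBool::new(false)) } }
    pub fn cancel(&self) { self.flag.store(true, Ordering::SeqCst); }
    pub fn is_cancelled(&self) -> bool { self.flag.load(Ordering::SeqCst) }
}

pub struct TaskSet {
    handles: Vec<JoinHandle<()>>,
    cancel: Cancel,
    completed: Arc<AtomicUsize>,
}

impl TaskSet {
    pub fn new(cancel: Cancel) -> Self {
        Self { handles: Vec::new(), cancel, completed: Arc::new(AtomicUsize::new(0)) }
    }

    pub fn spawn<F>(&mut self, f: F)
    where
        F: FnOnce(Cancel) + Send + 'static,
    {
        let cancel = self.cancel.clone();
        let completed = Arc::clone(&self.completed);
        self.handles.push(thread::spawn(move || {
            f(cancel);
            // Incremented only if `f` returns; a panicking worker never
            // gets here, which is exactly what the counter is for.
            completed.fetch_add(1, Ordering::SeqCst);
        }));
    }

    pub fn completed_count(&self) -> usize {
        self.completed.load(Ordering::SeqCst)
    }

    pub fn shutdown(self) -> Duration {
        let start = Instant::now();
        self.cancel.cancel();          // broadcast first...
        for handle in self.handles {
            let _ = handle.join();     // ...then drain every worker
        }
        start.elapsed()
    }
}

fn main() {
    let mut set = TaskSet::new(Cancel::new());
    for _ in 0..3 {
        set.spawn(|cancel| {
            while !cancel.is_cancelled() {
                thread::sleep(Duration::from_micros(50));
            }
        });
    }
    let completed = Arc::clone(&set.completed);
    let drain = set.shutdown();
    assert_eq!(completed.load(Ordering::SeqCst), 3);
    println!("drained 3 workers in {:?}", drain);
}
```

Note the ordering in `shutdown`: cancel once, then join everyone. Joining before cancelling would deadlock on any worker still in its loop.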
Day 4 — Testing: proving zero leaks
The zero-leaks test pattern
Every spawned task is accounted for
Patterns you want: track alive count via Arc<AtomicUsize>, assert count == 0 after shutdown. Any non-zero count is a leak.
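One std-only way to make the counter trustworthy (a sketch; `AliveGuard` is a made-up name for this drill, not a library type): tie the decrement to Drop so every exit path is counted, including early returns and panic unwinds, instead of relying on a manual `fetch_sub` at the end of the worker.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// RAII guard: increments `alive` on creation, decrements on drop.
/// Because Drop runs on every exit path, the counter can't drift the way
/// hand-placed fetch_sub calls can.
struct AliveGuard(Arc<AtomicUsize>);

impl AliveGuard {
    fn new(counter: &Arc<AtomicUsize>) -> Self {
        counter.fetch_add(1, Ordering::SeqCst);
        AliveGuard(Arc::clone(counter))
    }
}

impl Drop for AliveGuard {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
}

fn main() {
    let alive = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let alive = Arc::clone(&alive);
            thread::spawn(move || {
                let _guard = AliveGuard::new(&alive);
                // ... do work; dropping _guard decrements on any exit ...
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // The zero-leaks assertion: every spawned task is accounted for.
    assert_eq!(alive.load(Ordering::SeqCst), 0);
    println!("zero leaks");
}
```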
What would count as a 'leak' in your exchange? Give three examples — one at the task level, one at the socket level, one at the state level.
Day 5 — Drain patterns
Graceful vs forced
Deadlines and escalation
Convention: the cancellation branch comes first in every select!, and the select! is marked `biased;` so branches are polled in order and cancellation wins any tie. (Without `biased;`, tokio's select! polls branches in random order.) The matching-engine loop in Week 7 depends on this.
Write the escalation policy: soft cancel → hard cancel → abort. What are reasonable timeouts at each step for a trading system?
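The escalation ladder can be sketched with std alone, with one caveat: std cannot kill a thread, so the only "hard" step available here is to give up on the stragglers and report them (in tokio, this step is `JoinHandle::abort`). The timeout value below is illustrative, not a recommendation.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Soft-cancel, then wait until `deadline` for every worker to finish.
/// Returns the handles that missed the deadline so the caller can decide
/// how to escalate (log, detach, or abort the process).
fn drain_with_deadline(
    cancel: &AtomicBool,
    mut handles: Vec<JoinHandle<()>>,
    deadline: Duration,
) -> Vec<JoinHandle<()>> {
    cancel.store(true, Ordering::SeqCst); // step 1: soft cancel
    let start = Instant::now();
    // step 2: wait for the drain, bounded by the deadline
    while start.elapsed() < deadline && handles.iter().any(|h| !h.is_finished()) {
        thread::sleep(Duration::from_millis(1));
    }
    // step 3: join everyone who made it; hand back the stragglers
    let (done, stragglers): (Vec<_>, Vec<_>) =
        handles.drain(..).partition(|h| h.is_finished());
    for h in done {
        let _ = h.join();
    }
    stragglers
}

fn main() {
    let cancel = Arc::new(AtomicBool::new(false));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let cancel = Arc::clone(&cancel);
            thread::spawn(move || {
                while !cancel.load(Ordering::SeqCst) {
                    thread::sleep(Duration::from_micros(50));
                }
            })
        })
        .collect();
    let stragglers = drain_with_deadline(&cancel, handles, Duration::from_millis(100));
    assert!(stragglers.is_empty());
    println!("all workers drained inside the deadline");
}
```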
Day 6 — Reflection
Connecting forward
How this week shapes weeks 4 and 7
The matching engine in Week 7 will be a single task owning the order book. How does the structured-concurrency pattern you built this week shape its shutdown story?
Capstone — minimal ConnectionManager
Accept N worker connections; cancel-broadcast drains in <50ms with zero leaks
Target: all three requirements machine-verified; tracing notes recorded.
// Re-declare the primitives the capstone harness will drive. Keep these
// signatures — the harness calls them exactly like this.
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
#[derive(Clone)]
pub struct Cancel { flag: Arc<AtomicBool> }
impl Cancel {
pub fn new() -> Self { Self { flag: Arc::new(AtomicBool::new(false)) } }
pub fn cancel(&self) { self.flag.store(true, Ordering::SeqCst); }
pub fn is_cancelled(&self) -> bool { self.flag.load(Ordering::SeqCst) }
}
pub struct ConnectionManager {
handles: Vec<JoinHandle<()>>,
cancel: Cancel,
alive: Arc<AtomicUsize>,
work_counter: Arc<AtomicUsize>,
}
impl ConnectionManager {
pub fn new() -> Self {
Self {
handles: Vec::new(),
cancel: Cancel::new(),
alive: Arc::new(AtomicUsize::new(0)),
work_counter: Arc::new(AtomicUsize::new(0)),
}
}
pub fn alive(&self) -> usize { self.alive.load(Ordering::SeqCst) }
pub fn work_done(&self) -> usize { self.work_counter.load(Ordering::SeqCst) }
/// Spawn `n` worker "connections". Each worker loops, does one unit
/// of work per iteration, and checks the cancel flag every time.
pub fn spawn_workers(&mut self, n: usize) {
// TODO:
// - For each worker: clone cancel, alive, work_counter
// - alive.fetch_add(1)
// - Spawn a thread that loops:
// while !cancel.is_cancelled() {
// work_counter.fetch_add(1)
// thread::sleep(Duration::from_micros(50))
// }
// alive.fetch_sub(1)
// - Push handle into self.handles
let _ = n;
todo!()
}
/// Broadcast shutdown. Returns the drain time. After this returns,
/// all worker threads MUST have joined.
pub fn shutdown(self) -> Duration {
// TODO:
// - Record start
// - self.cancel.cancel()
// - Join every handle
// - Return elapsed
todo!()
}
}
Spawned workers increment the work counter before shutdown.
Broadcast cancel observed by every worker; all threads joined within the deadline.
After shutdown() returns, alive() == 0. No zombie threads.
Your Journal has 2-3 bullet points on what you'd log at each state transition (start, first-cancel-observed, joined). Click when done.
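If you get stuck, one shape the two TODO methods can take (a sketch that follows the comment outline in the skeleton above; do the drill first):

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

#[derive(Clone)]
pub struct Cancel { flag: Arc<AtomicBool> }
impl Cancel {
    pub fn new() -> Self { Self { flag: Arc::new(AtomicBool::new(false)) } }
    pub fn cancel(&self) { self.flag.store(true, Ordering::SeqCst); }
    pub fn is_cancelled(&self) -> bool { self.flag.load(Ordering::SeqCst) }
}

pub struct ConnectionManager {
    handles: Vec<JoinHandle<()>>,
    cancel: Cancel,
    alive: Arc<AtomicUsize>,
    work_counter: Arc<AtomicUsize>,
}

impl ConnectionManager {
    pub fn new() -> Self {
        Self {
            handles: Vec::new(),
            cancel: Cancel::new(),
            alive: Arc::new(AtomicUsize::new(0)),
            work_counter: Arc::new(AtomicUsize::new(0)),
        }
    }

    pub fn alive(&self) -> usize { self.alive.load(Ordering::SeqCst) }
    pub fn work_done(&self) -> usize { self.work_counter.load(Ordering::SeqCst) }

    pub fn spawn_workers(&mut self, n: usize) {
        for _ in 0..n {
            let cancel = self.cancel.clone();
            let alive = Arc::clone(&self.alive);
            let work = Arc::clone(&self.work_counter);
            alive.fetch_add(1, Ordering::SeqCst); // count up before the thread runs
            self.handles.push(thread::spawn(move || {
                while !cancel.is_cancelled() {
                    work.fetch_add(1, Ordering::SeqCst);
                    thread::sleep(Duration::from_micros(50));
                }
                alive.fetch_sub(1, Ordering::SeqCst);
            }));
        }
    }

    pub fn shutdown(self) -> Duration {
        let start = Instant::now();
        self.cancel.cancel();          // broadcast once...
        for handle in self.handles {
            let _ = handle.join();     // ...then drain every connection
        }
        start.elapsed()
    }
}

fn main() {
    let mut mgr = ConnectionManager::new();
    mgr.spawn_workers(8);
    thread::sleep(Duration::from_millis(5)); // let the workers do some work
    assert!(mgr.work_done() > 0);
    let alive = Arc::clone(&mgr.alive);
    let drain = mgr.shutdown();
    assert_eq!(alive.load(Ordering::SeqCst), 0); // zero leaks
    println!("drained in {:?}", drain);
}
```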
Feeds into
- Week 4 (Axum server) uses the ConnectionManager pattern to track open HTTP connections.
- Week 8 (event log) applies the drain-before-stop idiom to flush pending writes.
- Week 12 integrates cancellation into the end-to-end pipeline.