Subscriptions

A subscription is a long-lived event source that produces messages into the update loop. Timers, file watchers, child processes, WebSocket streams — anything whose lifecycle is “start when I declare it, stop when I stop declaring it” — is a subscription.

The runtime manages lifecycles automatically: you declare what should be running, and the runtime reconciles the live set against that declaration.

Files:

  • Trait: crates/ftui-runtime/src/subscription.rs:33
  • Built-in Every: crates/ftui-runtime/src/subscription.rs:443
  • ProcessSubscription: crates/ftui-runtime/src/process_subscription.rs:69

The trait

crates/ftui-runtime/src/subscription.rs
pub type SubId = u64;

pub trait Subscription<M: Send + 'static>: Send {
    /// Unique identifier for deduplication.
    /// Subscriptions with the same ID are considered identical.
    fn id(&self) -> SubId;

    /// Run the subscription on a background thread.
    /// Send messages through `sender`; exit when `stop` is triggered.
    fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal);
}

Two invariants do most of the work:

  1. id is stable across updates. If a subscription returns the same SubId on two consecutive calls to Model::subscriptions, the runtime keeps it running. If the ID changes, the old one is stopped and the new one is started.
  2. run blocks until the stop signal fires or the channel disconnects. Implementations check stop.wait_timeout(…) or stop.is_stopped() at safe points.

StopSignal

crates/ftui-runtime/src/subscription.rs
#[derive(Clone)]
pub struct StopSignal { /* wraps CancellationToken */ }

impl StopSignal {
    pub fn is_stopped(&self) -> bool;
    pub fn wait_timeout(&self, duration: Duration) -> bool; // true if stopped
    pub fn cancellation_token(&self) -> &CancellationToken;
}

StopSignal is backed by a CancellationToken (see crates/ftui-runtime/src/cancellation.rs). The Structured and Asupersync runtime lanes propagate the same token to child tasks so cancellation is cooperative all the way down.

wait_timeout is the preferred polling primitive — it sleeps on a condition variable, which is cheaper than busy-waiting and also responds immediately to a cancel.
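To make the condition-variable behaviour concrete, here is a minimal sketch of a stop signal built from `std::sync::Mutex` and `Condvar`. `SketchStop` is a hypothetical stand-in written for this page; the real StopSignal wraps a CancellationToken and may be implemented differently.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the runtime's StopSignal (not the real type).
#[derive(Clone)]
pub struct SketchStop {
    inner: Arc<(Mutex<bool>, Condvar)>,
}

impl SketchStop {
    pub fn new() -> Self {
        SketchStop { inner: Arc::new((Mutex::new(false), Condvar::new())) }
    }

    /// Mark the signal as stopped and wake every waiter.
    pub fn stop(&self) {
        let (flag, cvar) = &*self.inner;
        *flag.lock().unwrap() = true;
        cvar.notify_all();
    }

    pub fn is_stopped(&self) -> bool {
        *self.inner.0.lock().unwrap()
    }

    /// Sleep for up to `duration`; returns true if the signal was stopped.
    pub fn wait_timeout(&self, duration: Duration) -> bool {
        let (flag, cvar) = &*self.inner;
        let guard = flag.lock().unwrap();
        // wait_timeout_while re-checks the predicate, so spurious wakeups
        // neither end the wait early nor report a false stop.
        let (guard, _timed_out) = cvar
            .wait_timeout_while(guard, duration, |stopped| !*stopped)
            .unwrap();
        *guard
    }
}

fn main() {
    let stop = SketchStop::new();
    let worker_stop = stop.clone();
    // The worker asks for a 10 s wait but returns as soon as stop() is called.
    let worker = thread::spawn(move || worker_stop.wait_timeout(Duration::from_secs(10)));
    stop.stop();
    assert!(worker.join().unwrap());
}
```

The waiter holds no CPU while blocked, and a cancel wakes it without waiting for the timeout to elapse.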

Declarative reconciliation

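Each time Model::update returns, the runtime calls Model::subscriptions and diffs the result by SubId against the previous cycle. Subscriptions whose ID is new are started, subscriptions whose ID has disappeared are stopped and joined, and subscriptions whose ID is unchanged keep running.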
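The diff by SubId can be sketched with plain set operations. This is an illustration only: `Plan` and `reconcile` are hypothetical names, and the runtime's actual bookkeeping is not shown in this page.

```rust
use std::collections::HashSet;

type SubId = u64;

/// Result of diffing one cycle's declared IDs against the live set.
#[derive(Debug, PartialEq)]
struct Plan {
    start: Vec<SubId>, // declared now, not running yet
    stop: Vec<SubId>,  // running, no longer declared
    keep: Vec<SubId>,  // running and still declared
}

fn reconcile(active: &HashSet<SubId>, declared: &HashSet<SubId>) -> Plan {
    let mut start: Vec<SubId> = declared.difference(active).copied().collect();
    let mut stop: Vec<SubId> = active.difference(declared).copied().collect();
    let mut keep: Vec<SubId> = active.intersection(declared).copied().collect();
    // Sort so the plan is deterministic; HashSet iteration order is not.
    start.sort_unstable();
    stop.sort_unstable();
    keep.sort_unstable();
    Plan { start, stop, keep }
}

fn main() {
    let active: HashSet<SubId> = HashSet::from([1, 2, 3]);
    let declared: HashSet<SubId> = HashSet::from([2, 3, 4]);
    let plan = reconcile(&active, &declared);
    println!("{:?}", plan);
}
```

Only the IDs take part in the diff, which is why a stable `id` is enough to keep a subscription alive across cycles.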

Tracing for this cycle lives on ftui.runtime:

  • subscription.start — a new subscription entered the active set.
  • subscription.stop — an old subscription left and was joined.
  • subscription.stop_all — the runtime is shutting down.

Counters are available via ftui_runtime::effect_system:

use ftui_runtime::effect_system::*;

subscription_starts_total();
subscription_stops_total();
subscription_panics_total();
reconcile_count();

Panics inside run are caught and counted but do not crash the runtime; the subscription is removed and Model::on_error is called with a synthetic error string.
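One way to contain a panic on a background thread is to rely on `JoinHandle::join`, which returns the panic payload as an `Err`. The sketch below counts the panic and turns the payload into an error string. `run_guarded` and `describe` are hypothetical helpers, and the runtime may use a different mechanism and a different message format.

```rust
use std::any::Any;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

/// Turn a panic payload into a readable error string.
fn describe(payload: &(dyn Any + Send)) -> String {
    if let Some(s) = payload.downcast_ref::<&str>() {
        format!("subscription panicked: {}", s)
    } else if let Some(s) = payload.downcast_ref::<String>() {
        format!("subscription panicked: {}", s)
    } else {
        "subscription panicked".to_string()
    }
}

/// Run `body` on its own thread; a panic becomes an Err and bumps `panics`.
fn run_guarded<F>(body: F, panics: &AtomicU64) -> Result<(), String>
where
    F: FnOnce() + Send + 'static,
{
    match thread::spawn(body).join() {
        Ok(()) => Ok(()),
        Err(payload) => {
            panics.fetch_add(1, Ordering::Relaxed);
            Err(describe(&*payload))
        }
    }
}

fn main() {
    let panics = AtomicU64::new(0);
    let outcome = run_guarded(|| { panic!("boom"); }, &panics);
    // The caller keeps running and can forward the string to an error hook.
    println!("{:?}, panics so far: {}", outcome, panics.load(Ordering::Relaxed));
}
```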

Built-in: Every

A fixed-interval ticker. The message factory is an Fn() -> M closure, so it can capture per-cycle state.

crates/ftui-runtime/src/subscription.rs
pub struct Every<M: Send + 'static> { /* ... */ }

impl<M: Send + 'static> Every<M> {
    pub fn new(
        interval: Duration,
        make_msg: impl Fn() -> M + Send + Sync + 'static,
    ) -> Self;

    pub fn with_id(
        id: SubId,
        interval: Duration,
        make_msg: impl Fn() -> M + Send + Sync + 'static,
    ) -> Self;
}

The default id is derived from the interval in nanoseconds, XORed with "TICK" as a magic constant, so two Every instances with the same interval dedupe even if they produce different messages. Use with_id when you need two distinct tickers that share an interval.
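A sketch of that derivation shows why equal intervals collide. The encoding of the magic constant here (the four ASCII bytes of "TICK", big-endian) and the name `default_every_id` are assumptions made for illustration; the real constant and function live in subscription.rs.

```rust
use std::time::Duration;

type SubId = u64;

/// Assumed encoding of the "TICK" magic: its four ASCII bytes, big-endian.
const TICK_MAGIC: u64 = u32::from_be_bytes(*b"TICK") as u64;

/// Hypothetical helper: derive an ID from the interval alone.
fn default_every_id(interval: Duration) -> SubId {
    // as_nanos() returns u128; the cast keeps the low 64 bits.
    (interval.as_nanos() as u64) ^ TICK_MAGIC
}

fn main() {
    let a = default_every_id(Duration::from_secs(1));
    let b = default_every_id(Duration::from_millis(1000));
    // Same interval, same ID: the second ticker would be treated as a duplicate.
    assert_eq!(a, b);
    println!("{:#x}", a);
}
```

Because the message factory takes no part in the ID, a second one-second ticker needs an explicit ID to run alongside the first.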

examples/clock.rs
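use ftui_runtime::subscription::{Every, Subscription};
use std::time::Duration;

// Method on the application's Model impl.
fn subscriptions(&self) -> Vec<Box<dyn Subscription<Msg>>> {
    vec![
        // Default ID, derived from the one-second interval.
        Box::new(Every::new(Duration::from_secs(1), || Msg::Tick)),
        // Same interval, so it needs an explicit ID to avoid being deduped.
        Box::new(Every::with_id(0xBEEF, Duration::from_secs(1), || Msg::HeartBeat)),
    ]
}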

Built-in: ProcessSubscription

Spawns a child process, captures its stdout/stderr line-by-line, and emits ProcessEvent messages.

crates/ftui-runtime/src/process_subscription.rs
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessEvent {
    Stdout(String),
    Stderr(String),
    Exited(i32),
    Signaled(i32),
    Killed,
    Error(String),
}

pub struct ProcessSubscription<M: Send + 'static> { /* ... */ }

impl<M: Send + 'static> ProcessSubscription<M> {
    pub fn new(
        program: impl Into<String>,
        make_msg: impl Fn(ProcessEvent) -> M + Send + Sync + 'static,
    ) -> Self;

    pub fn arg(self, arg: impl Into<String>) -> Self;
    pub fn args(self, args: impl IntoIterator<Item = impl Into<String>>) -> Self;
    pub fn env(self, key: impl Into<String>, value: impl Into<String>) -> Self;
    pub fn timeout(self, duration: Duration) -> Self;
    pub fn with_id(self, id: SubId) -> Self;
}

The SubId is hashed from program + args + env + timeout, so re-declaring the same command is a true no-op and changing any field causes the old process to be killed and a new one to start.
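The effect of hashing the command description can be sketched with the standard library's `DefaultHasher`. `process_id` is a hypothetical helper, and the hasher and field order used by ProcessSubscription are assumptions.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::Duration;

type SubId = u64;

/// Hypothetical helper: fold every field that defines the command into one ID.
fn process_id(
    program: &str,
    args: &[&str],
    env: &[(&str, &str)],
    timeout: Option<Duration>,
) -> SubId {
    let mut hasher = DefaultHasher::new();
    program.hash(&mut hasher);
    args.hash(&mut hasher);
    env.hash(&mut hasher);
    timeout.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let minute = Some(Duration::from_secs(60));
    let first = process_id("tail", &["-f", "/var/log/syslog"], &[], minute);
    let again = process_id("tail", &["-f", "/var/log/syslog"], &[], minute);
    // Re-declaring the same command yields the same ID, so nothing restarts.
    assert_eq!(first, again);
}
```

Any change to the program, arguments, environment, or timeout produces a different hash, which reconciliation treats as one subscription leaving and another arriving.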

examples/tail_log.rs
use ftui_runtime::process_subscription::{ProcessSubscription, ProcessEvent};
use ftui_runtime::subscription::Subscription;
use std::time::Duration;

enum Msg {
    Log(ProcessEvent),
    /* … */
}

// Method on the application's Model impl.
fn subscriptions(&self) -> Vec<Box<dyn Subscription<Msg>>> {
    vec![Box::new(
        // Each ProcessEvent from the child is wrapped in Msg::Log.
        ProcessSubscription::new("tail", Msg::Log)
            .arg("-f")
            .arg("/var/log/syslog")
            .timeout(Duration::from_secs(60)),
    )]
}

When stop fires, the child is sent SIGKILL (Unix) and the reader threads are joined with a bounded timeout. A thread that misses the timeout is detached and logged rather than leaked silently.

Writing a custom subscription

examples/file_watcher.rs
use ftui_runtime::subscription::{Subscription, SubId, StopSignal};
use std::sync::mpsc;
use std::time::{Duration, Instant};

struct Poll<M> {
    id: SubId,
    interval: Duration,
    make_msg: Box<dyn Fn() -> M + Send + Sync>,
}

impl<M: Send + 'static> Subscription<M> for Poll<M> {
    fn id(&self) -> SubId {
        self.id
    }

    fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
        // This example gives up on its own 30 seconds after it starts.
        let deadline = Instant::now() + Duration::from_secs(30);
        loop {
            // Sleep for one interval; true means the runtime asked us to stop.
            if stop.wait_timeout(self.interval) {
                break;
            }
            if Instant::now() >= deadline {
                break;
            }
            // A send error means the receiver was dropped, so stop producing.
            if sender.send((self.make_msg)()).is_err() {
                break;
            }
        }
    }
}

Three conventions:

  1. Wait with stop.wait_timeout instead of thread::sleep, so a stop request interrupts the wait.
  2. Exit when sender.send returns Err — the receiver has been dropped, which means the runtime no longer wants your messages.
  3. Don’t panic. If you must propagate a fatal condition, send an error-shaped message and then return.

Pitfalls

Changing a parameter that feeds a derived ID changes the ID, which stops the running instance and starts a new one. If you want to keep the same instance running while tuning (for example a debounce interval), either pin the ID with with_id or keep the parameter constant during a session.

Channel-closed is the fast exit path, not a hang. Your run loop must observe sender.send(..).is_err() and return. Otherwise, the subscription continues to consume CPU after the receiver was dropped — which happens during reconciliation and shutdown.
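The channel-closed exit can be seen with `std::sync::mpsc` alone. In this sketch `tick_until_closed` is a hypothetical producer, and `thread::sleep` stands in for `stop.wait_timeout` only because the sketch has no StopSignal.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Send a counter until the receiver goes away; returns how many sends succeeded.
fn tick_until_closed(sender: mpsc::Sender<u32>, interval: Duration) -> u32 {
    let mut sent = 0;
    loop {
        // send() fails once the Receiver has been dropped.
        if sender.send(sent).is_err() {
            return sent;
        }
        sent += 1;
        // Stand-in for stop.wait_timeout(interval) in a real subscription.
        thread::sleep(interval);
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || tick_until_closed(tx, Duration::from_millis(1)));
    let first = rx.recv().unwrap();
    drop(rx); // the consumer loses interest
    let delivered = worker.join().unwrap();
    println!("first = {}, successful sends = {}", first, delivered);
}
```

A loop that ignored the send result would keep ticking forever after `drop(rx)`, which is the CPU leak described above.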

Cross-references