feat(host/m3): mid-stream session-switch watcher (Feature B, opt-in)
ci / web (push) Successful in 28s
ci / rust (push) Failing after 45s
ci / docs-site (push) Successful in 30s
apple / swift (push) Successful in 1m16s
ci / bench (push) Successful in 1m36s
docker / build-push (--build-arg FEDORA_VERSION=44, ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora44-rpm) (push) Successful in 4s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 5s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 3s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 3s
deb / build-publish (push) Successful in 5m58s
rpm / build-publish (bazzite, punktfunk-fedora-rpm) (push) Successful in 4m18s
docker / deploy-docs (push) Successful in 18s
rpm / build-publish (fedora-44, punktfunk-fedora44-rpm) (push) Successful in 4m32s

Feature B: while streaming, follow a Gaming<->Desktop switch on the box without
a reconnect. A ~1s watcher thread (session_watcher_loop) self-baselines on the
live ActiveKind and, when it changes and stays changed for a 3s debounce (the
old/new compositors coexist briefly during a switch), sends a SessionSwitch to
the encode loop. The loop's new rebuild slot — taking precedence over a queued
mode change — retargets the process env (apply_session_env/apply_input_env) and
rebuilds the WHOLE backend in place at the SAME client mode (vdisplay::open +
build_pipeline_with_retry), reusing the proven mode-switch rebuild path: the
Session + send thread (QUIC control + UDP data plane + side planes) stay up, the
client sees a brief freeze then an IDR. Old pipeline kept on a rebuild failure
(transient vs permanent classified via is_permanent_build_error). Input
re-routes via the host-lifetime injector's lazy reopen against the new
PUNKTFUNK_INPUT_BACKEND.

Opt-in via PUNKTFUNK_SESSION_WATCH (off by default; never under an explicit
PUNKTFUNK_COMPOSITOR pin), so it lands inert and is promoted to default only
after live validation on a real Bazzite Gaming<->KDE flip. The watcher snapshots
the SessionEnv so only the encode thread writes process env.

Compiles, clippy/fmt clean, 78 host tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-14 22:42:13 +00:00
parent c25706b355
commit 95a820b68a
+137
View File
@@ -1820,6 +1820,71 @@ fn send_loop(
} }
} }
/// A mid-stream session change the watcher detected (the box flipped Gaming↔Desktop): the new
/// backend + the [`crate::vdisplay::SessionEnv`] snapshot to retarget at it. The env is applied on
/// the encode thread (not the watcher), so the watcher never does a process-global env write.
struct SessionSwitch {
kind: crate::vdisplay::ActiveKind,
compositor: crate::vdisplay::Compositor,
env: crate::vdisplay::SessionEnv,
}
/// Poll the live graphical session ~1 s and, when its kind changes from what the stream opened with
/// (the user switched Gaming↔Desktop mid-stream) and stays changed for a debounce, send one
/// [`SessionSwitch`] so the encode loop rebuilds the backend in place. Self-baselines on the first
/// read (so no handshake plumbing). Opt-in via `PUNKTFUNK_SESSION_WATCH`; readiness of the new
/// backend is left to the encode thread's `build_pipeline_with_retry` (the watcher never writes
/// env). Exits when `stop` is set or the channel closes.
fn session_watcher_loop(tx: std::sync::mpsc::Sender<SessionSwitch>, stop: Arc<AtomicBool>) {
use crate::vdisplay;
const DEBOUNCE: std::time::Duration = std::time::Duration::from_secs(3);
// Baseline = what the stream is currently driving (matches the handshake's resolution).
let mut current = vdisplay::detect_active_session().kind;
let mut pending: Option<(vdisplay::ActiveKind, std::time::Instant)> = None;
while !stop.load(Ordering::SeqCst) {
std::thread::sleep(std::time::Duration::from_secs(1));
if stop.load(Ordering::SeqCst) {
break;
}
let active = vdisplay::detect_active_session();
let cur = active.kind;
if cur == current {
pending = None; // back to the current backend before debounce elapsed — no switch
continue;
}
match pending {
// Stable at the new kind for the debounce window — the switch is real, signal it.
Some((k, since)) if k == cur && since.elapsed() >= DEBOUNCE => {
match vdisplay::compositor_for_kind(cur) {
Some(comp) => {
tracing::info!(from = ?current, to = ?cur, compositor = comp.id(),
"session watcher: mid-stream switch — signaling backend rebuild");
if tx
.send(SessionSwitch {
kind: cur,
compositor: comp,
env: active.env,
})
.is_err()
{
break; // encode loop gone
}
current = cur; // new baseline; don't re-signal until it changes again
}
// Logout / no usable backend for the new session — keep streaming the old one.
None => tracing::debug!(to = ?cur,
"session watcher: no usable backend for the new session — staying put"),
}
pending = None;
}
// Still debouncing this kind.
Some((k, _)) if k == cur => {}
// A new (or different) change — start the debounce window.
_ => pending = Some((cur, std::time::Instant::now())),
}
}
}
/// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs /// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs
/// stamped with the capture wall clock (the client derives per-frame pipeline latency). /// stamped with the capture wall clock (the client derives per-frame pipeline latency).
/// ///
@@ -1884,10 +1949,82 @@ fn virtual_stream(
}) })
.context("spawn send thread")?; .context("spawn send thread")?;
// Mid-stream session-switch watcher (opt-in via PUNKTFUNK_SESSION_WATCH; never under an explicit
// PUNKTFUNK_COMPOSITOR pin). It self-baselines and signals the loop below to swap the backend in
// place when the box flips Gaming↔Desktop. When not spawned, session_rx just stays empty.
let mut compositor = compositor;
let (session_tx, session_rx) = std::sync::mpsc::channel::<SessionSwitch>();
let watch = std::env::var_os("PUNKTFUNK_SESSION_WATCH").is_some()
&& std::env::var_os("PUNKTFUNK_COMPOSITOR").is_none();
let _watcher = if watch {
let stop = stop.clone();
std::thread::Builder::new()
.name("punktfunk-m3-watcher".into())
.spawn(move || session_watcher_loop(session_tx, stop))
.ok()
} else {
None
};
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64); let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64);
let mut next = std::time::Instant::now(); let mut next = std::time::Instant::now();
let mut sent: u64 = 0; let mut sent: u64 = 0;
while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline { while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
// Mid-stream session switch (the box flipped Gaming↔Desktop): rebuild the WHOLE backend in
// place — a different compositor at the SAME client mode — keeping the Session + send thread
// (and thus the QUIC control + UDP data plane) up. Takes precedence over a queued mode change.
let mut switch = None;
while let Ok(s) = session_rx.try_recv() {
switch = Some(s); // coalesce to the newest
}
if let Some(sw) = switch {
if sw.compositor != compositor {
tracing::info!(from = compositor.id(), to = sw.compositor.id(), kind = ?sw.kind,
"session switch — rebuilding backend in place");
// Retarget the process env at the new session BEFORE opening the new backend (this
// thread is the only env writer; the watcher only snapshots).
crate::vdisplay::apply_session_env(&crate::vdisplay::ActiveSession {
kind: sw.kind,
env: sw.env,
});
crate::vdisplay::apply_input_env(sw.compositor);
// Build the new backend's pipeline BEFORE dropping the old one (retry absorbs the
// brief compositor-coexistence race during a switch); on failure keep the old.
let rebuilt =
(|| -> Result<(Box<dyn crate::vdisplay::VirtualDisplay>, Pipeline)> {
let mut new_vd = crate::vdisplay::open(sw.compositor)?;
let pipe = build_pipeline_with_retry(&mut new_vd, mode, bitrate_kbps)?;
Ok((new_vd, pipe))
})();
match rebuilt {
Ok((new_vd, (new_cap, new_enc, new_frame, new_interval))) => {
// Replace the pipeline first (drops the old capturer → old PipeWire stream +
// virtual output), then the factory (drops e.g. the old KWin connection).
capturer = new_cap;
enc = new_enc;
frame = new_frame;
interval = new_interval;
vd = new_vd;
compositor = sw.compositor;
next = std::time::Instant::now();
tracing::info!(
compositor = compositor.id(),
"session switch — backend rebuilt, stream continues"
);
}
Err(e) => {
let chain = format!("{e:#}");
let kind = if is_permanent_build_error(&chain) {
"permanent"
} else {
"transient"
};
tracing::error!(error = %chain, kind,
"session-switch rebuild failed — staying on the current backend");
}
}
}
}
// Drain to the NEWEST requested mode (a resize drag queues many) so we rebuild once, // Drain to the NEWEST requested mode (a resize drag queues many) so we rebuild once,
// not once per stale intermediate mode. // not once per stale intermediate mode.
let mut want = None; let mut want = None;