diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index 74c6510..6875c6f 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -1820,6 +1820,71 @@ fn send_loop( } } +/// A mid-stream session change the watcher detected (the box flipped Gaming↔Desktop): the new +/// backend + the [`crate::vdisplay::SessionEnv`] snapshot to retarget at it. The env is applied on +/// the encode thread (not the watcher), so the watcher never does a process-global env write. +struct SessionSwitch { + kind: crate::vdisplay::ActiveKind, + compositor: crate::vdisplay::Compositor, + env: crate::vdisplay::SessionEnv, +} + +/// Poll the live graphical session ~1 s and, when its kind changes from what the stream opened with +/// (the user switched Gaming↔Desktop mid-stream) and stays changed for a debounce, send one +/// [`SessionSwitch`] so the encode loop rebuilds the backend in place. Self-baselines on the first +/// read (so no handshake plumbing). Opt-in via `PUNKTFUNK_SESSION_WATCH`; readiness of the new +/// backend is left to the encode thread's `build_pipeline_with_retry` (the watcher never writes +/// env). Exits when `stop` is set or the channel closes. +fn session_watcher_loop(tx: std::sync::mpsc::Sender, stop: Arc) { + use crate::vdisplay; + const DEBOUNCE: std::time::Duration = std::time::Duration::from_secs(3); + // Baseline = what the stream is currently driving (matches the handshake's resolution). + let mut current = vdisplay::detect_active_session().kind; + let mut pending: Option<(vdisplay::ActiveKind, std::time::Instant)> = None; + while !stop.load(Ordering::SeqCst) { + std::thread::sleep(std::time::Duration::from_secs(1)); + if stop.load(Ordering::SeqCst) { + break; + } + let active = vdisplay::detect_active_session(); + let cur = active.kind; + if cur == current { + pending = None; // back to the current backend before debounce elapsed — no switch + continue; + } + match pending { + // Stable at the new kind for the debounce window — the switch is real, signal it. + Some((k, since)) if k == cur && since.elapsed() >= DEBOUNCE => { + match vdisplay::compositor_for_kind(cur) { + Some(comp) => { + tracing::info!(from = ?current, to = ?cur, compositor = comp.id(), + "session watcher: mid-stream switch — signaling backend rebuild"); + if tx + .send(SessionSwitch { + kind: cur, + compositor: comp, + env: active.env, + }) + .is_err() + { + break; // encode loop gone + } + current = cur; // new baseline; don't re-signal until it changes again + } + // Logout / no usable backend for the new session — keep streaming the old one. + None => tracing::debug!(to = ?cur, + "session watcher: no usable backend for the new session — staying put"), + } + pending = None; + } + // Still debouncing this kind. + Some((k, _)) if k == cur => {} + // A new (or different) change — start the debounce window. + _ => pending = Some((cur, std::time::Instant::now())), + } + } +} + /// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs /// stamped with the capture wall clock (the client derives per-frame pipeline latency). /// @@ -1884,10 +1949,82 @@ fn virtual_stream( }) .context("spawn send thread")?; + // Mid-stream session-switch watcher (opt-in via PUNKTFUNK_SESSION_WATCH; never under an explicit + // PUNKTFUNK_COMPOSITOR pin). It self-baselines and signals the loop below to swap the backend in + // place when the box flips Gaming↔Desktop. When not spawned, session_rx just stays empty. + let mut compositor = compositor; + let (session_tx, session_rx) = std::sync::mpsc::channel::(); + let watch = std::env::var_os("PUNKTFUNK_SESSION_WATCH").is_some() + && std::env::var_os("PUNKTFUNK_COMPOSITOR").is_none(); + let _watcher = if watch { + let stop = stop.clone(); + std::thread::Builder::new() + .name("punktfunk-m3-watcher".into()) + .spawn(move || session_watcher_loop(session_tx, stop)) + .ok() + } else { + None + }; + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64); let mut next = std::time::Instant::now(); let mut sent: u64 = 0; while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline { + // Mid-stream session switch (the box flipped Gaming↔Desktop): rebuild the WHOLE backend in + // place — a different compositor at the SAME client mode — keeping the Session + send thread + // (and thus the QUIC control + UDP data plane) up. Takes precedence over a queued mode change. + let mut switch = None; + while let Ok(s) = session_rx.try_recv() { + switch = Some(s); // coalesce to the newest + } + if let Some(sw) = switch { + if sw.compositor != compositor { + tracing::info!(from = compositor.id(), to = sw.compositor.id(), kind = ?sw.kind, + "session switch — rebuilding backend in place"); + // Retarget the process env at the new session BEFORE opening the new backend (this + // thread is the only env writer; the watcher only snapshots). + crate::vdisplay::apply_session_env(&crate::vdisplay::ActiveSession { + kind: sw.kind, + env: sw.env, + }); + crate::vdisplay::apply_input_env(sw.compositor); + // Build the new backend's pipeline BEFORE dropping the old one (retry absorbs the + // brief compositor-coexistence race during a switch); on failure keep the old. + let rebuilt = + (|| -> Result<(Box, Pipeline)> { + let mut new_vd = crate::vdisplay::open(sw.compositor)?; + let pipe = build_pipeline_with_retry(&mut new_vd, mode, bitrate_kbps)?; + Ok((new_vd, pipe)) + })(); + match rebuilt { + Ok((new_vd, (new_cap, new_enc, new_frame, new_interval))) => { + // Replace the pipeline first (drops the old capturer → old PipeWire stream + + // virtual output), then the factory (drops e.g. the old KWin connection). + capturer = new_cap; + enc = new_enc; + frame = new_frame; + interval = new_interval; + vd = new_vd; + compositor = sw.compositor; + next = std::time::Instant::now(); + tracing::info!( + compositor = compositor.id(), + "session switch — backend rebuilt, stream continues" + ); + } + Err(e) => { + let chain = format!("{e:#}"); + let kind = if is_permanent_build_error(&chain) { + "permanent" + } else { + "transient" + }; + tracing::error!(error = %chain, kind, + "session-switch rebuild failed — staying on the current backend"); + } + } + } + } // Drain to the NEWEST requested mode (a resize drag queues many) so we rebuild once, // not once per stale intermediate mode. let mut want = None;