feat(host/m3): mid-stream session-switch watcher (Feature B, opt-in)
ci / web (push) Successful in 28s
ci / rust (push) Failing after 45s
ci / docs-site (push) Successful in 30s
apple / swift (push) Successful in 1m16s
ci / bench (push) Successful in 1m36s
docker / build-push (--build-arg FEDORA_VERSION=44, ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora44-rpm) (push) Successful in 4s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 5s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 3s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 3s
deb / build-publish (push) Successful in 5m58s
rpm / build-publish (bazzite, punktfunk-fedora-rpm) (push) Successful in 4m18s
docker / deploy-docs (push) Successful in 18s
rpm / build-publish (fedora-44, punktfunk-fedora44-rpm) (push) Successful in 4m32s
ci / web (push) Successful in 28s
ci / rust (push) Failing after 45s
ci / docs-site (push) Successful in 30s
apple / swift (push) Successful in 1m16s
ci / bench (push) Successful in 1m36s
docker / build-push (--build-arg FEDORA_VERSION=44, ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora44-rpm) (push) Successful in 4s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 5s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 3s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 3s
deb / build-publish (push) Successful in 5m58s
rpm / build-publish (bazzite, punktfunk-fedora-rpm) (push) Successful in 4m18s
docker / deploy-docs (push) Successful in 18s
rpm / build-publish (fedora-44, punktfunk-fedora44-rpm) (push) Successful in 4m32s
Feature B: while streaming, follow a Gaming<->Desktop switch on the box without a reconnect. A ~1s watcher thread (session_watcher_loop) self-baselines on the live ActiveKind and, when it changes and stays changed for a 3s debounce (the old/new compositors coexist briefly during a switch), sends a SessionSwitch to the encode loop. The loop's new rebuild slot — taking precedence over a queued mode change — retargets the process env (apply_session_env/apply_input_env) and rebuilds the WHOLE backend in place at the SAME client mode (vdisplay::open + build_pipeline_with_retry), reusing the proven mode-switch rebuild path: the Session + send thread (QUIC control + UDP data plane + side planes) stay up, the client sees a brief freeze then an IDR. Old pipeline kept on a rebuild failure (transient vs permanent classified via is_permanent_build_error). Input re-routes via the host-lifetime injector's lazy reopen against the new PUNKTFUNK_INPUT_BACKEND. Opt-in via PUNKTFUNK_SESSION_WATCH (off by default; never under an explicit PUNKTFUNK_COMPOSITOR pin), so it lands inert and is promoted to default only after live validation on a real Bazzite Gaming<->KDE flip. The watcher snapshots the SessionEnv so only the encode thread writes process env. Compiles, clippy/fmt clean, 78 host tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1820,6 +1820,71 @@ fn send_loop(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A mid-stream session change the watcher detected (the box flipped Gaming↔Desktop): the new
|
||||||
|
/// backend + the [`crate::vdisplay::SessionEnv`] snapshot to retarget at it. The env is applied on
|
||||||
|
/// the encode thread (not the watcher), so the watcher never does a process-global env write.
|
||||||
|
struct SessionSwitch {
|
||||||
|
kind: crate::vdisplay::ActiveKind,
|
||||||
|
compositor: crate::vdisplay::Compositor,
|
||||||
|
env: crate::vdisplay::SessionEnv,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Poll the live graphical session ~1 s and, when its kind changes from what the stream opened with
|
||||||
|
/// (the user switched Gaming↔Desktop mid-stream) and stays changed for a debounce, send one
|
||||||
|
/// [`SessionSwitch`] so the encode loop rebuilds the backend in place. Self-baselines on the first
|
||||||
|
/// read (so no handshake plumbing). Opt-in via `PUNKTFUNK_SESSION_WATCH`; readiness of the new
|
||||||
|
/// backend is left to the encode thread's `build_pipeline_with_retry` (the watcher never writes
|
||||||
|
/// env). Exits when `stop` is set or the channel closes.
|
||||||
|
fn session_watcher_loop(tx: std::sync::mpsc::Sender<SessionSwitch>, stop: Arc<AtomicBool>) {
|
||||||
|
use crate::vdisplay;
|
||||||
|
const DEBOUNCE: std::time::Duration = std::time::Duration::from_secs(3);
|
||||||
|
// Baseline = what the stream is currently driving (matches the handshake's resolution).
|
||||||
|
let mut current = vdisplay::detect_active_session().kind;
|
||||||
|
let mut pending: Option<(vdisplay::ActiveKind, std::time::Instant)> = None;
|
||||||
|
while !stop.load(Ordering::SeqCst) {
|
||||||
|
std::thread::sleep(std::time::Duration::from_secs(1));
|
||||||
|
if stop.load(Ordering::SeqCst) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let active = vdisplay::detect_active_session();
|
||||||
|
let cur = active.kind;
|
||||||
|
if cur == current {
|
||||||
|
pending = None; // back to the current backend before debounce elapsed — no switch
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
match pending {
|
||||||
|
// Stable at the new kind for the debounce window — the switch is real, signal it.
|
||||||
|
Some((k, since)) if k == cur && since.elapsed() >= DEBOUNCE => {
|
||||||
|
match vdisplay::compositor_for_kind(cur) {
|
||||||
|
Some(comp) => {
|
||||||
|
tracing::info!(from = ?current, to = ?cur, compositor = comp.id(),
|
||||||
|
"session watcher: mid-stream switch — signaling backend rebuild");
|
||||||
|
if tx
|
||||||
|
.send(SessionSwitch {
|
||||||
|
kind: cur,
|
||||||
|
compositor: comp,
|
||||||
|
env: active.env,
|
||||||
|
})
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
break; // encode loop gone
|
||||||
|
}
|
||||||
|
current = cur; // new baseline; don't re-signal until it changes again
|
||||||
|
}
|
||||||
|
// Logout / no usable backend for the new session — keep streaming the old one.
|
||||||
|
None => tracing::debug!(to = ?cur,
|
||||||
|
"session watcher: no usable backend for the new session — staying put"),
|
||||||
|
}
|
||||||
|
pending = None;
|
||||||
|
}
|
||||||
|
// Still debouncing this kind.
|
||||||
|
Some((k, _)) if k == cur => {}
|
||||||
|
// A new (or different) change — start the debounce window.
|
||||||
|
_ => pending = Some((cur, std::time::Instant::now())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs
|
/// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs
|
||||||
/// stamped with the capture wall clock (the client derives per-frame pipeline latency).
|
/// stamped with the capture wall clock (the client derives per-frame pipeline latency).
|
||||||
///
|
///
|
||||||
@@ -1884,10 +1949,82 @@ fn virtual_stream(
|
|||||||
})
|
})
|
||||||
.context("spawn send thread")?;
|
.context("spawn send thread")?;
|
||||||
|
|
||||||
|
// Mid-stream session-switch watcher (opt-in via PUNKTFUNK_SESSION_WATCH; never under an explicit
|
||||||
|
// PUNKTFUNK_COMPOSITOR pin). It self-baselines and signals the loop below to swap the backend in
|
||||||
|
// place when the box flips Gaming↔Desktop. When not spawned, session_rx just stays empty.
|
||||||
|
let mut compositor = compositor;
|
||||||
|
let (session_tx, session_rx) = std::sync::mpsc::channel::<SessionSwitch>();
|
||||||
|
let watch = std::env::var_os("PUNKTFUNK_SESSION_WATCH").is_some()
|
||||||
|
&& std::env::var_os("PUNKTFUNK_COMPOSITOR").is_none();
|
||||||
|
let _watcher = if watch {
|
||||||
|
let stop = stop.clone();
|
||||||
|
std::thread::Builder::new()
|
||||||
|
.name("punktfunk-m3-watcher".into())
|
||||||
|
.spawn(move || session_watcher_loop(session_tx, stop))
|
||||||
|
.ok()
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64);
|
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64);
|
||||||
let mut next = std::time::Instant::now();
|
let mut next = std::time::Instant::now();
|
||||||
let mut sent: u64 = 0;
|
let mut sent: u64 = 0;
|
||||||
while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
|
while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
|
||||||
|
// Mid-stream session switch (the box flipped Gaming↔Desktop): rebuild the WHOLE backend in
|
||||||
|
// place — a different compositor at the SAME client mode — keeping the Session + send thread
|
||||||
|
// (and thus the QUIC control + UDP data plane) up. Takes precedence over a queued mode change.
|
||||||
|
let mut switch = None;
|
||||||
|
while let Ok(s) = session_rx.try_recv() {
|
||||||
|
switch = Some(s); // coalesce to the newest
|
||||||
|
}
|
||||||
|
if let Some(sw) = switch {
|
||||||
|
if sw.compositor != compositor {
|
||||||
|
tracing::info!(from = compositor.id(), to = sw.compositor.id(), kind = ?sw.kind,
|
||||||
|
"session switch — rebuilding backend in place");
|
||||||
|
// Retarget the process env at the new session BEFORE opening the new backend (this
|
||||||
|
// thread is the only env writer; the watcher only snapshots).
|
||||||
|
crate::vdisplay::apply_session_env(&crate::vdisplay::ActiveSession {
|
||||||
|
kind: sw.kind,
|
||||||
|
env: sw.env,
|
||||||
|
});
|
||||||
|
crate::vdisplay::apply_input_env(sw.compositor);
|
||||||
|
// Build the new backend's pipeline BEFORE dropping the old one (retry absorbs the
|
||||||
|
// brief compositor-coexistence race during a switch); on failure keep the old.
|
||||||
|
let rebuilt =
|
||||||
|
(|| -> Result<(Box<dyn crate::vdisplay::VirtualDisplay>, Pipeline)> {
|
||||||
|
let mut new_vd = crate::vdisplay::open(sw.compositor)?;
|
||||||
|
let pipe = build_pipeline_with_retry(&mut new_vd, mode, bitrate_kbps)?;
|
||||||
|
Ok((new_vd, pipe))
|
||||||
|
})();
|
||||||
|
match rebuilt {
|
||||||
|
Ok((new_vd, (new_cap, new_enc, new_frame, new_interval))) => {
|
||||||
|
// Replace the pipeline first (drops the old capturer → old PipeWire stream +
|
||||||
|
// virtual output), then the factory (drops e.g. the old KWin connection).
|
||||||
|
capturer = new_cap;
|
||||||
|
enc = new_enc;
|
||||||
|
frame = new_frame;
|
||||||
|
interval = new_interval;
|
||||||
|
vd = new_vd;
|
||||||
|
compositor = sw.compositor;
|
||||||
|
next = std::time::Instant::now();
|
||||||
|
tracing::info!(
|
||||||
|
compositor = compositor.id(),
|
||||||
|
"session switch — backend rebuilt, stream continues"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let chain = format!("{e:#}");
|
||||||
|
let kind = if is_permanent_build_error(&chain) {
|
||||||
|
"permanent"
|
||||||
|
} else {
|
||||||
|
"transient"
|
||||||
|
};
|
||||||
|
tracing::error!(error = %chain, kind,
|
||||||
|
"session-switch rebuild failed — staying on the current backend");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
// Drain to the NEWEST requested mode (a resize drag queues many) so we rebuild once,
|
// Drain to the NEWEST requested mode (a resize drag queues many) so we rebuild once,
|
||||||
// not once per stale intermediate mode.
|
// not once per stale intermediate mode.
|
||||||
let mut want = None;
|
let mut want = None;
|
||||||
|
|||||||
Reference in New Issue
Block a user