From 42b1158ea7f5520a7f3a40b8401390c43608d5ce Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Sun, 5 Jul 2026 10:21:28 +0000 Subject: [PATCH] =?UTF-8?q?feat(vdisplay):=20mode-conflict=20admission=20?= =?UTF-8?q?=E2=80=94=20separate/join/steal/reject=20(Stage=204)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The mode_conflict policy is now enforced at ADMISSION, before the punktfunk/1 Welcome, when a DIFFERENT client connects while another client's session is live: - separate (default, unconfigured → no change): each client its own display. - join: admit at the live display's mode (honest-downgrade — the Welcome carries it). - steal: signal the victim session(s)' stop flags, wait the release grace, serve. - reject: refuse the handshake with a busy reason (live mode + client label). New vdisplay/admission.rs: the pure decide() (unit-tested — same-client never conflicts, anonymous clients each distinct, join targets the oldest session) + a live-session registry (identity + mode + stop flag) sessions register in once up. Wired into punktfunk1 serve_session: admit() before validate_dimensions, register after the data plane binds. A same-client reconnect never conflicts. Validated on loopback (two probes, distinct identities, differing modes) across all four policies: separate→own mode, join→live mode, steal→victim interrupted, reject→handshake refused. Remaining Stage-4 surface (deferred): GameStream 503 path, Windows-specific defaults (separate→join map, silent-reconfigure→steal), reject reason delivered to the client as a typed message (currently host-side log + connection close). Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/punktfunk-host/src/punktfunk1.rs | 57 ++++- crates/punktfunk-host/src/vdisplay.rs | 4 + .../punktfunk-host/src/vdisplay/admission.rs | 224 ++++++++++++++++++ 3 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 crates/punktfunk-host/src/vdisplay/admission.rs diff --git a/crates/punktfunk-host/src/punktfunk1.rs b/crates/punktfunk-host/src/punktfunk1.rs index ef9f1ff..368fb4d 100644 --- a/crates/punktfunk-host/src/punktfunk1.rs +++ b/crates/punktfunk-host/src/punktfunk1.rs @@ -652,7 +652,7 @@ async fn serve_session( let source = opts.source; let frames = opts.frames; let handshake = async { - let hello = Hello::decode(&first).map_err(|e| anyhow!("Hello decode: {e:?}"))?; + let mut hello = Hello::decode(&first).map_err(|e| anyhow!("Hello decode: {e:?}"))?; anyhow::ensure!( hello.abi_version == punktfunk_core::WIRE_VERSION, "wire version mismatch: client {} host {}", @@ -684,6 +684,45 @@ async fn serve_session( "video codec negotiated" ); + // Mode-conflict ADMISSION (Stage 4): a DIFFERENT client connecting while another client's + // session is live is resolved by the `mode_conflict` policy BEFORE the Welcome — `separate` + // (default, no change), `join` (serve at the live mode — an honest downgrade the client + // renders from the Welcome), `steal` (preempt the victim), or `reject` (refuse the handshake). + // A same-client reconnect never conflicts. THIS session registers in the live set once its + // data plane is up (below the handshake), so a later client can see + steal it. + { + use crate::vdisplay::admission::{admit, Admission}; + match admit(endpoint::peer_fingerprint(&conn)) { + Admission::Separate => {} + Admission::Join(m) => { + tracing::info!( + requested = + %format_args!("{}x{}@{}", hello.mode.width, hello.mode.height, hello.mode.refresh_hz), + live = %format_args!("{}x{}@{}", m.0, m.1, m.2), + "mode-conflict: JOIN — admitting at the live display's mode" + ); + hello.mode.width = m.0; + hello.mode.height = m.1; + hello.mode.refresh_hz = m.2; + } + Admission::Steal(victims) => { + tracing::info!( + victims = victims.len(), + "mode-conflict: STEAL — preempting the live session(s)" + ); + for v in &victims { + v.store(true, Ordering::SeqCst); + } + // Give the victims the release grace to tear their display down before we acquire. + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + } + Admission::Reject(reason) => { + tracing::warn!("mode-conflict: REJECT — {reason}"); + anyhow::bail!("{reason}"); + } + } + } + crate::encode::validate_dimensions(codec, hello.mode.width, hello.mode.height) .context("client-requested mode")?; @@ -1055,6 +1094,22 @@ async fn serve_session( }); } + // Register this now-live session for mode-conflict admission (Stage 4): carry its identity, the + // negotiated mode, and its stop flag so a LATER connecting client's admission can see it and + // (under `steal`) signal it. The guard removes the entry when this session ends. + let _live_guard = { + let id = endpoint::peer_fingerprint(&conn); + let label = id + .map(|fp| fp.iter().take(4).map(|b| format!("{b:02x}")).collect::()) + .unwrap_or_else(|| "client".to_string()); + crate::vdisplay::admission::register( + id, + (welcome.mode.width, welcome.mode.height, welcome.mode.refresh_hz), + stop.clone(), + label, + ) + }; + // Audio plane (virtual source only — synthetic runs are protocol tests): desktop Opus // → host→client QUIC datagrams, on its own native thread. Best-effort on every failure // (no PipeWire audio, spawn error): the session continues without audio — and a spawn diff --git a/crates/punktfunk-host/src/vdisplay.rs b/crates/punktfunk-host/src/vdisplay.rs index c34fb2d..d427f25 100644 --- a/crates/punktfunk-host/src/vdisplay.rs +++ b/crates/punktfunk-host/src/vdisplay.rs @@ -785,6 +785,10 @@ mod gamescope; #[allow(dead_code)] #[path = "vdisplay/identity.rs"] pub(crate) mod identity; +// Platform-neutral mode-conflict admission (Stage 4): the separate/join/steal/reject decision + the +// live-session registry, wired into the punktfunk/1 handshake. +#[path = "vdisplay/admission.rs"] +pub(crate) mod admission; #[cfg(target_os = "linux")] #[path = "vdisplay/linux/kwin.rs"] mod kwin; diff --git a/crates/punktfunk-host/src/vdisplay/admission.rs b/crates/punktfunk-host/src/vdisplay/admission.rs new file mode 100644 index 0000000..ff9fc6a --- /dev/null +++ b/crates/punktfunk-host/src/vdisplay/admission.rs @@ -0,0 +1,224 @@ +//! Mode-conflict **admission** (design: `design/display-management.md` §5.3, Stage 4). When a +//! *different* client connects while another client's session is already live, the `mode_conflict` +//! policy decides what happens — BEFORE the Welcome / RTSP launch, so the client gets an honest answer +//! instead of a mid-build failure: +//! +//! * `separate` — proceed on a fresh display at the requested mode (today's Linux multi-view / the +//! default; no behavior change unconfigured). +//! * `join` — admit at the live display's mode (honest-downgrade: the Welcome carries the real mode). +//! * `steal` — signal the victim session(s)' stop flag(s), wait the release grace, then serve. +//! * `reject` — refuse with a typed handshake error naming the live mode + client. +//! +//! A **live-session registry** ([`register`]) lets the decision see the current sessions (identity + +//! mode + stop flag); each session registers once admitted and drops its [`LiveGuard`] on end. The +//! decision itself ([`decide`]) is pure over a session slice, so it is unit-tested exhaustively. + +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::{Arc, Mutex, OnceLock}; + +use crate::vdisplay::policy::{self, ModeConflict}; + +/// A currently-live session, as admission sees it. +#[derive(Clone)] +pub struct LiveSession { + id: u64, + /// The owning client's cert fingerprint (`None` = anonymous / no client cert presented). + pub identity: Option<[u8; 32]>, + pub mode: (u32, u32, u32), + /// The session's stop flag — signaled to preempt it on `steal`. + pub stop: Arc, + /// Short client label for `reject` messages. + pub label: String, +} + +/// The admission outcome for a connecting session. +#[derive(Debug)] +pub enum Admission { + /// No conflict / `separate`: proceed on a fresh display at the requested mode. + Separate, + /// `join`: admit at this (live) mode — share the existing display (honest-downgrade). + Join((u32, u32, u32)), + /// `steal`: signal these victim stop flags, wait the release grace, then proceed at the requested mode. + Steal(Vec>), + /// `reject`: refuse with this reason (host-busy + live mode + client label). + Reject(String), +} + +fn table() -> &'static Mutex> { + static T: OnceLock>> = OnceLock::new(); + T.get_or_init(|| Mutex::new(Vec::new())) +} + +static NEXT_ID: AtomicU64 = AtomicU64::new(1); + +/// Two identities are the same client iff both are present and equal. Anonymous (`None`) never +/// matches — we can't prove it's the same client, so two anonymous clients are treated as distinct +/// (each conflicts), which is the safe side for `steal`/`reject`. +fn same_client(a: Option<[u8; 32]>, b: Option<[u8; 32]>) -> bool { + matches!((a, b), (Some(x), Some(y)) if x == y) +} + +/// The mode-conflict decision, pure over the live-session slice (so it's unit-testable). A conflict is +/// a live session owned by a DIFFERENT client — a same-client reconnect adopts / reconfigures its own +/// display and never conflicts (so it always resolves to `Separate` here and preempts downstream). +pub fn decide( + conflict: ModeConflict, + req_identity: Option<[u8; 32]>, + live: &[LiveSession], +) -> Admission { + let others: Vec<&LiveSession> = live + .iter() + .filter(|s| !same_client(s.identity, req_identity)) + .collect(); + if others.is_empty() { + return Admission::Separate; // no other client is live → no conflict + } + match conflict { + ModeConflict::Separate => Admission::Separate, + // Join at the OLDEST other session's mode (the established "primary" the desktop is built on). + ModeConflict::Join => Admission::Join(others[0].mode), + ModeConflict::Steal => { + Admission::Steal(others.iter().map(|s| Arc::clone(&s.stop)).collect()) + } + ModeConflict::Reject => { + let v = others[0]; + Admission::Reject(format!( + "host busy: streaming {}x{}@{} to {}", + v.mode.0, v.mode.1, v.mode.2, v.label + )) + } + } +} + +/// Resolve the effective decision for a connecting session: read the console `mode_conflict` policy +/// (default `Separate` when unconfigured — no behavior change) and [`decide`] against the live set. +pub fn admit(req_identity: Option<[u8; 32]>) -> Admission { + let conflict = policy::prefs() + .configured_effective() + .map(|e| e.mode_conflict) + .unwrap_or(ModeConflict::Separate); + decide(conflict, req_identity, &table().lock().unwrap()) +} + +/// Register a now-admitted, live session; the returned guard removes it on drop (session end). Call +/// AFTER [`admit`] (so a session never conflicts with itself) and once the mode + stop flag are known. +pub fn register( + identity: Option<[u8; 32]>, + mode: (u32, u32, u32), + stop: Arc, + label: String, +) -> LiveGuard { + let id = NEXT_ID.fetch_add(1, Ordering::Relaxed); + table().lock().unwrap().push(LiveSession { + id, + identity, + mode, + stop, + label, + }); + LiveGuard { id } +} + +/// RAII handle: removes its live-session entry from the registry on drop (session end). +pub struct LiveGuard { + id: u64, +} + +impl Drop for LiveGuard { + fn drop(&mut self) { + table().lock().unwrap().retain(|s| s.id != self.id); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sess(identity: Option, mode: (u32, u32, u32)) -> LiveSession { + LiveSession { + id: 0, + identity: identity.map(|n| { + let mut f = [0u8; 32]; + f[0] = n; + f + }), + mode, + stop: Arc::new(AtomicBool::new(false)), + label: "peer".into(), + } + } + fn fp(n: u8) -> Option<[u8; 32]> { + let mut f = [0u8; 32]; + f[0] = n; + Some(f) + } + + #[test] + fn no_live_session_is_always_separate() { + for c in [ + ModeConflict::Separate, + ModeConflict::Join, + ModeConflict::Steal, + ModeConflict::Reject, + ] { + assert!(matches!(decide(c, fp(1), &[]), Admission::Separate)); + } + } + + #[test] + fn same_client_never_conflicts() { + let live = [sess(Some(1), (2560, 1440, 60))]; + // Even under reject/steal, the SAME client (fp 1) reconnecting is not a conflict. + assert!(matches!( + decide(ModeConflict::Reject, fp(1), &live), + Admission::Separate + )); + assert!(matches!( + decide(ModeConflict::Steal, fp(1), &live), + Admission::Separate + )); + } + + #[test] + fn different_client_applies_policy() { + let live = [sess(Some(1), (2560, 1440, 60))]; + assert!(matches!( + decide(ModeConflict::Separate, fp(2), &live), + Admission::Separate + )); + assert!(matches!( + decide(ModeConflict::Join, fp(2), &live), + Admission::Join((2560, 1440, 60)) + )); + assert!(matches!( + decide(ModeConflict::Steal, fp(2), &live), + Admission::Steal(v) if v.len() == 1 + )); + assert!(matches!( + decide(ModeConflict::Reject, fp(2), &live), + Admission::Reject(r) if r.contains("2560x1440@60") + )); + } + + #[test] + fn two_anonymous_clients_conflict() { + // Anonymous (None) can't be proven same-client, so a second anon client DOES conflict. + let live = [sess(None, (1920, 1080, 60))]; + assert!(matches!( + decide(ModeConflict::Reject, None, &live), + Admission::Reject(_) + )); + } + + #[test] + fn join_targets_the_oldest_other_session() { + let live = [ + sess(Some(1), (3840, 2160, 60)), // oldest + sess(Some(2), (1280, 720, 120)), + ]; + assert!(matches!( + decide(ModeConflict::Join, fp(3), &live), + Admission::Join((3840, 2160, 60)) + )); + } +}