feat(vdisplay): mode-conflict admission — separate/join/steal/reject (Stage 4)
The mode_conflict policy is now enforced at ADMISSION, before the punktfunk/1 Welcome, when a DIFFERENT client connects while another client's session is live: - separate (default, unconfigured → no change): each client its own display. - join: admit at the live display's mode (honest-downgrade — the Welcome carries it). - steal: signal the victim session(s)' stop flags, wait the release grace, serve. - reject: refuse the handshake with a busy reason (live mode + client label). New vdisplay/admission.rs: the pure decide() (unit-tested — same-client never conflicts, anonymous clients each distinct, join targets the oldest session) + a live-session registry (identity + mode + stop flag) sessions register in once up. Wired into punktfunk1 serve_session: admit() before validate_dimensions, register after the data plane binds. A same-client reconnect never conflicts. Validated on loopback (two probes, distinct identities, differing modes) across all four policies: separate→own mode, join→live mode, steal→victim interrupted, reject→handshake refused. Remaining Stage-4 surface (deferred): GameStream 503 path, Windows-specific defaults (separate→join map, silent-reconfigure→steal), reject reason delivered to the client as a typed message (currently host-side log + connection close). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -652,7 +652,7 @@ async fn serve_session(
|
||||
let source = opts.source;
|
||||
let frames = opts.frames;
|
||||
let handshake = async {
|
||||
let hello = Hello::decode(&first).map_err(|e| anyhow!("Hello decode: {e:?}"))?;
|
||||
let mut hello = Hello::decode(&first).map_err(|e| anyhow!("Hello decode: {e:?}"))?;
|
||||
anyhow::ensure!(
|
||||
hello.abi_version == punktfunk_core::WIRE_VERSION,
|
||||
"wire version mismatch: client {} host {}",
|
||||
@@ -684,6 +684,45 @@ async fn serve_session(
|
||||
"video codec negotiated"
|
||||
);
|
||||
|
||||
// Mode-conflict ADMISSION (Stage 4): a DIFFERENT client connecting while another client's
|
||||
// session is live is resolved by the `mode_conflict` policy BEFORE the Welcome — `separate`
|
||||
// (default, no change), `join` (serve at the live mode — an honest downgrade the client
|
||||
// renders from the Welcome), `steal` (preempt the victim), or `reject` (refuse the handshake).
|
||||
// A same-client reconnect never conflicts. THIS session registers in the live set once its
|
||||
// data plane is up (below the handshake), so a later client can see + steal it.
|
||||
{
|
||||
use crate::vdisplay::admission::{admit, Admission};
|
||||
match admit(endpoint::peer_fingerprint(&conn)) {
|
||||
Admission::Separate => {}
|
||||
Admission::Join(m) => {
|
||||
tracing::info!(
|
||||
requested =
|
||||
%format_args!("{}x{}@{}", hello.mode.width, hello.mode.height, hello.mode.refresh_hz),
|
||||
live = %format_args!("{}x{}@{}", m.0, m.1, m.2),
|
||||
"mode-conflict: JOIN — admitting at the live display's mode"
|
||||
);
|
||||
hello.mode.width = m.0;
|
||||
hello.mode.height = m.1;
|
||||
hello.mode.refresh_hz = m.2;
|
||||
}
|
||||
Admission::Steal(victims) => {
|
||||
tracing::info!(
|
||||
victims = victims.len(),
|
||||
"mode-conflict: STEAL — preempting the live session(s)"
|
||||
);
|
||||
for v in &victims {
|
||||
v.store(true, Ordering::SeqCst);
|
||||
}
|
||||
// Give the victims the release grace to tear their display down before we acquire.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
|
||||
}
|
||||
Admission::Reject(reason) => {
|
||||
tracing::warn!("mode-conflict: REJECT — {reason}");
|
||||
anyhow::bail!("{reason}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
crate::encode::validate_dimensions(codec, hello.mode.width, hello.mode.height)
|
||||
.context("client-requested mode")?;
|
||||
|
||||
@@ -1055,6 +1094,22 @@ async fn serve_session(
|
||||
});
|
||||
}
|
||||
|
||||
// Register this now-live session for mode-conflict admission (Stage 4): carry its identity, the
|
||||
// negotiated mode, and its stop flag so a LATER connecting client's admission can see it and
|
||||
// (under `steal`) signal it. The guard removes the entry when this session ends.
|
||||
let _live_guard = {
|
||||
let id = endpoint::peer_fingerprint(&conn);
|
||||
let label = id
|
||||
.map(|fp| fp.iter().take(4).map(|b| format!("{b:02x}")).collect::<String>())
|
||||
.unwrap_or_else(|| "client".to_string());
|
||||
crate::vdisplay::admission::register(
|
||||
id,
|
||||
(welcome.mode.width, welcome.mode.height, welcome.mode.refresh_hz),
|
||||
stop.clone(),
|
||||
label,
|
||||
)
|
||||
};
|
||||
|
||||
// Audio plane (virtual source only — synthetic runs are protocol tests): desktop Opus
|
||||
// → host→client QUIC datagrams, on its own native thread. Best-effort on every failure
|
||||
// (no PipeWire audio, spawn error): the session continues without audio — and a spawn
|
||||
|
||||
@@ -785,6 +785,10 @@ mod gamescope;
|
||||
#[allow(dead_code)]
|
||||
#[path = "vdisplay/identity.rs"]
|
||||
pub(crate) mod identity;
|
||||
// Platform-neutral mode-conflict admission (Stage 4): the separate/join/steal/reject decision + the
|
||||
// live-session registry, wired into the punktfunk/1 handshake.
|
||||
#[path = "vdisplay/admission.rs"]
|
||||
pub(crate) mod admission;
|
||||
#[cfg(target_os = "linux")]
|
||||
#[path = "vdisplay/linux/kwin.rs"]
|
||||
mod kwin;
|
||||
|
||||
@@ -0,0 +1,224 @@
|
||||
//! Mode-conflict **admission** (design: `design/display-management.md` §5.3, Stage 4). When a
|
||||
//! *different* client connects while another client's session is already live, the `mode_conflict`
|
||||
//! policy decides what happens — BEFORE the Welcome / RTSP launch, so the client gets an honest answer
|
||||
//! instead of a mid-build failure:
|
||||
//!
|
||||
//! * `separate` — proceed on a fresh display at the requested mode (today's Linux multi-view / the
|
||||
//! default; no behavior change unconfigured).
|
||||
//! * `join` — admit at the live display's mode (honest-downgrade: the Welcome carries the real mode).
|
||||
//! * `steal` — signal the victim session(s)' stop flag(s), wait the release grace, then serve.
|
||||
//! * `reject` — refuse with a typed handshake error naming the live mode + client.
|
||||
//!
|
||||
//! A **live-session registry** ([`register`]) lets the decision see the current sessions (identity +
|
||||
//! mode + stop flag); each session registers once admitted and drops its [`LiveGuard`] on end. The
|
||||
//! decision itself ([`decide`]) is pure over a session slice, so it is unit-tested exhaustively.
|
||||
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex, OnceLock};
|
||||
|
||||
use crate::vdisplay::policy::{self, ModeConflict};
|
||||
|
||||
/// A currently-live session, as admission sees it.
|
||||
#[derive(Clone)]
|
||||
pub struct LiveSession {
|
||||
id: u64,
|
||||
/// The owning client's cert fingerprint (`None` = anonymous / no client cert presented).
|
||||
pub identity: Option<[u8; 32]>,
|
||||
pub mode: (u32, u32, u32),
|
||||
/// The session's stop flag — signaled to preempt it on `steal`.
|
||||
pub stop: Arc<AtomicBool>,
|
||||
/// Short client label for `reject` messages.
|
||||
pub label: String,
|
||||
}
|
||||
|
||||
/// The admission outcome for a connecting session.
|
||||
#[derive(Debug)]
|
||||
pub enum Admission {
|
||||
/// No conflict / `separate`: proceed on a fresh display at the requested mode.
|
||||
Separate,
|
||||
/// `join`: admit at this (live) mode — share the existing display (honest-downgrade).
|
||||
Join((u32, u32, u32)),
|
||||
/// `steal`: signal these victim stop flags, wait the release grace, then proceed at the requested mode.
|
||||
Steal(Vec<Arc<AtomicBool>>),
|
||||
/// `reject`: refuse with this reason (host-busy + live mode + client label).
|
||||
Reject(String),
|
||||
}
|
||||
|
||||
fn table() -> &'static Mutex<Vec<LiveSession>> {
|
||||
static T: OnceLock<Mutex<Vec<LiveSession>>> = OnceLock::new();
|
||||
T.get_or_init(|| Mutex::new(Vec::new()))
|
||||
}
|
||||
|
||||
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
|
||||
|
||||
/// Two identities are the same client iff both are present and equal. Anonymous (`None`) never
|
||||
/// matches — we can't prove it's the same client, so two anonymous clients are treated as distinct
|
||||
/// (each conflicts), which is the safe side for `steal`/`reject`.
|
||||
fn same_client(a: Option<[u8; 32]>, b: Option<[u8; 32]>) -> bool {
|
||||
matches!((a, b), (Some(x), Some(y)) if x == y)
|
||||
}
|
||||
|
||||
/// The mode-conflict decision, pure over the live-session slice (so it's unit-testable). A conflict is
|
||||
/// a live session owned by a DIFFERENT client — a same-client reconnect adopts / reconfigures its own
|
||||
/// display and never conflicts (so it always resolves to `Separate` here and preempts downstream).
|
||||
pub fn decide(
|
||||
conflict: ModeConflict,
|
||||
req_identity: Option<[u8; 32]>,
|
||||
live: &[LiveSession],
|
||||
) -> Admission {
|
||||
let others: Vec<&LiveSession> = live
|
||||
.iter()
|
||||
.filter(|s| !same_client(s.identity, req_identity))
|
||||
.collect();
|
||||
if others.is_empty() {
|
||||
return Admission::Separate; // no other client is live → no conflict
|
||||
}
|
||||
match conflict {
|
||||
ModeConflict::Separate => Admission::Separate,
|
||||
// Join at the OLDEST other session's mode (the established "primary" the desktop is built on).
|
||||
ModeConflict::Join => Admission::Join(others[0].mode),
|
||||
ModeConflict::Steal => {
|
||||
Admission::Steal(others.iter().map(|s| Arc::clone(&s.stop)).collect())
|
||||
}
|
||||
ModeConflict::Reject => {
|
||||
let v = others[0];
|
||||
Admission::Reject(format!(
|
||||
"host busy: streaming {}x{}@{} to {}",
|
||||
v.mode.0, v.mode.1, v.mode.2, v.label
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolve the effective decision for a connecting session: read the console `mode_conflict` policy
|
||||
/// (default `Separate` when unconfigured — no behavior change) and [`decide`] against the live set.
|
||||
pub fn admit(req_identity: Option<[u8; 32]>) -> Admission {
|
||||
let conflict = policy::prefs()
|
||||
.configured_effective()
|
||||
.map(|e| e.mode_conflict)
|
||||
.unwrap_or(ModeConflict::Separate);
|
||||
decide(conflict, req_identity, &table().lock().unwrap())
|
||||
}
|
||||
|
||||
/// Register a now-admitted, live session; the returned guard removes it on drop (session end). Call
|
||||
/// AFTER [`admit`] (so a session never conflicts with itself) and once the mode + stop flag are known.
|
||||
pub fn register(
|
||||
identity: Option<[u8; 32]>,
|
||||
mode: (u32, u32, u32),
|
||||
stop: Arc<AtomicBool>,
|
||||
label: String,
|
||||
) -> LiveGuard {
|
||||
let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
|
||||
table().lock().unwrap().push(LiveSession {
|
||||
id,
|
||||
identity,
|
||||
mode,
|
||||
stop,
|
||||
label,
|
||||
});
|
||||
LiveGuard { id }
|
||||
}
|
||||
|
||||
/// RAII handle: removes its live-session entry from the registry on drop (session end).
|
||||
pub struct LiveGuard {
|
||||
id: u64,
|
||||
}
|
||||
|
||||
impl Drop for LiveGuard {
|
||||
fn drop(&mut self) {
|
||||
table().lock().unwrap().retain(|s| s.id != self.id);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn sess(identity: Option<u8>, mode: (u32, u32, u32)) -> LiveSession {
|
||||
LiveSession {
|
||||
id: 0,
|
||||
identity: identity.map(|n| {
|
||||
let mut f = [0u8; 32];
|
||||
f[0] = n;
|
||||
f
|
||||
}),
|
||||
mode,
|
||||
stop: Arc::new(AtomicBool::new(false)),
|
||||
label: "peer".into(),
|
||||
}
|
||||
}
|
||||
fn fp(n: u8) -> Option<[u8; 32]> {
|
||||
let mut f = [0u8; 32];
|
||||
f[0] = n;
|
||||
Some(f)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_live_session_is_always_separate() {
|
||||
for c in [
|
||||
ModeConflict::Separate,
|
||||
ModeConflict::Join,
|
||||
ModeConflict::Steal,
|
||||
ModeConflict::Reject,
|
||||
] {
|
||||
assert!(matches!(decide(c, fp(1), &[]), Admission::Separate));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn same_client_never_conflicts() {
|
||||
let live = [sess(Some(1), (2560, 1440, 60))];
|
||||
// Even under reject/steal, the SAME client (fp 1) reconnecting is not a conflict.
|
||||
assert!(matches!(
|
||||
decide(ModeConflict::Reject, fp(1), &live),
|
||||
Admission::Separate
|
||||
));
|
||||
assert!(matches!(
|
||||
decide(ModeConflict::Steal, fp(1), &live),
|
||||
Admission::Separate
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn different_client_applies_policy() {
|
||||
let live = [sess(Some(1), (2560, 1440, 60))];
|
||||
assert!(matches!(
|
||||
decide(ModeConflict::Separate, fp(2), &live),
|
||||
Admission::Separate
|
||||
));
|
||||
assert!(matches!(
|
||||
decide(ModeConflict::Join, fp(2), &live),
|
||||
Admission::Join((2560, 1440, 60))
|
||||
));
|
||||
assert!(matches!(
|
||||
decide(ModeConflict::Steal, fp(2), &live),
|
||||
Admission::Steal(v) if v.len() == 1
|
||||
));
|
||||
assert!(matches!(
|
||||
decide(ModeConflict::Reject, fp(2), &live),
|
||||
Admission::Reject(r) if r.contains("2560x1440@60")
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn two_anonymous_clients_conflict() {
|
||||
// Anonymous (None) can't be proven same-client, so a second anon client DOES conflict.
|
||||
let live = [sess(None, (1920, 1080, 60))];
|
||||
assert!(matches!(
|
||||
decide(ModeConflict::Reject, None, &live),
|
||||
Admission::Reject(_)
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn join_targets_the_oldest_other_session() {
|
||||
let live = [
|
||||
sess(Some(1), (3840, 2160, 60)), // oldest
|
||||
sess(Some(2), (1280, 720, 120)),
|
||||
];
|
||||
assert!(matches!(
|
||||
decide(ModeConflict::Join, fp(3), &live),
|
||||
Admission::Join((3840, 2160, 60))
|
||||
));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user