feat(vdisplay/kwin): group-aware exclusive — never disable a sibling output (Stage 5 §6.1)

The critical latent bug Stage 3 introduced: per-slot output names mean a 2nd
exclusive session's other_enabled_outputs() (which disabled 'everything not named
Virtual-punktfunk') would black out the 1st session's Virtual-punktfunk-<id>
output. Fix: recognise the whole managed group by the shared Virtual-punktfunk
prefix — exclusive now disables only NON-managed outputs (bootstrap/physical),
never a group sibling. Plus first-slot-wins for the group primary
(a_managed_output_is_primary): a later session joins as a secondary monitor of the
shared desktop instead of stealing the shell off the first. Unit-tested.

Start of Stage 5 (§6A many-clients-one-desktop). Remaining: Mutter/wlroots
group-aware analogues, layout (auto-row/manual + /display/layout + console),
per-group topology restore, gamescope groups.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-07-05 11:44:41 +00:00
parent 23446fa177
commit eddcd91f48
2 changed files with 164 additions and 32 deletions
+79 -9
View File
@@ -399,6 +399,20 @@ fn sendmmsg_all(sock: &UdpSocket, pkts: &[Vec<u8>]) -> std::io::Result<()> {
Ok(()) Ok(())
} }
/// Pacing layout for one frame's `n` packets (`n >= 1`): `(chunk_size, steps)`. The chunk grows
/// with the frame so the number of paced bursts — each ending in a `thread::sleep` — never exceeds
/// `MAX_PACE_STEPS`. A fixed 16-packet chunk let the step count scale with bitrate (~38 for a
/// 4K/250Mbps frame's ~600 packets); the accumulated sub-ms sleep overshoot on the non-RT send
/// thread then blew the per-frame budget and backed the handoff queue up. Bounding the steps keeps
/// microburst shaping at low bitrate while making overshoot negligible and bitrate-independent.
fn pace_layout(n: usize) -> (usize, usize) {
const MIN_PACE_CHUNK: usize = 16;
const MAX_PACE_STEPS: usize = 12;
let chunk_sz = MIN_PACE_CHUNK.max(n.div_ceil(MAX_PACE_STEPS));
let steps = n.div_ceil(chunk_sz); // ≤ MAX_PACE_STEPS
(chunk_sz, steps)
}
/// Dedicated send thread: one [`PacketBatch`] per frame arrives on `rx`; its packets go out in /// Dedicated send thread: one [`PacketBatch`] per frame arrives on `rx`; its packets go out in
/// `sendmmsg` chunks, paced so the frame's data spreads over ~3/4 of the frame interval /// `sendmmsg` chunks, paced so the frame's data spreads over ~3/4 of the frame interval
/// (microburst shaping at chunk granularity — a real link drops line-rate bursts; the encode /// (microburst shaping at chunk granularity — a real link drops line-rate bursts; the encode
@@ -416,8 +430,14 @@ fn spawn_sender(
// Transmit thread: above-normal, matching the native path's send thread (includes the // Transmit thread: above-normal, matching the native path's send thread (includes the
// Windows session tuning/MMCSS this used to call directly; adds the Linux nice -5). // Windows session tuning/MMCSS this used to call directly; adds the Linux nice -5).
crate::punktfunk1::boost_thread_priority(false); crate::punktfunk1::boost_thread_priority(false);
// Chunk pacing: 16 packets per burst, bursts spread across the send budget. // Chunk pacing: spread the frame's packets across the send budget in a BOUNDED number
const PACE_CHUNK: usize = 16; // of bursts. A fixed 16-packet chunk made the burst count scale with bitrate (~38 for a
// 4K/250Mbps frame's ~600 packets), and each burst ends in a `thread::sleep`; on this
// non-RT send thread those sub-ms sleeps overshoot, and ~38 per frame blew the 12.5ms
// budget past the 16.67ms frame interval — backing the depth-2 handoff queue up and
// dropping ~half the frames ("send queue full"). Capping the step count keeps the
// microburst shaping (a real link drops line-rate bursts) while making per-frame sleep
// overshoot negligible and independent of bitrate.
let budget = frame_interval.mul_f32(0.75); let budget = frame_interval.mul_f32(0.75);
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let mut sent: u64 = 0; let mut sent: u64 = 0;
@@ -436,17 +456,21 @@ fn spawn_sender(
if n == 0 { if n == 0 {
continue; continue;
} }
let per_chunk = budget.mul_f64((PACE_CHUNK as f64 / n as f64).min(1.0)); // Chunk size + step count, bounded so a high-bitrate frame doesn't fan out into
// dozens of sleeps. Each step gets an equal slice of the budget (total pacing time
// == budget regardless of n).
let (chunk_sz, steps) = pace_layout(n);
let per_step = budget.mul_f64(1.0 / steps as f64);
let start = Instant::now(); let start = Instant::now();
for (i, chunk) in batch.chunks(PACE_CHUNK).enumerate() { for (i, chunk) in batch.chunks(chunk_sz).enumerate() {
if let Err(e) = sendmmsg_all(&sock, chunk) { if let Err(e) = sendmmsg_all(&sock, chunk) {
tracing::info!(error = %e, sent, "video: client unreachable — stopping stream"); tracing::info!(error = %e, sent, "video: client unreachable — stopping stream");
running.store(false, Ordering::SeqCst); running.store(false, Ordering::SeqCst);
return; return;
} }
sent += chunk.len() as u64; sent += chunk.len() as u64;
// Sleep toward the next chunk's deadline; skip sub-500µs sleeps (jitter). // Sleep toward the next step's deadline; skip sub-500µs sleeps (jitter).
let target = start + per_chunk.mul_f64((i + 1) as f64); let target = start + per_step.mul_f64((i + 1) as f64);
if let Some(ahead) = target.checked_duration_since(Instant::now()) { if let Some(ahead) = target.checked_duration_since(Instant::now()) {
if ahead >= Duration::from_micros(500) { if ahead >= Duration::from_micros(500) {
std::thread::sleep(ahead); std::thread::sleep(ahead);
@@ -582,6 +606,15 @@ fn stream_body(
const MAX_REBUILDS: u32 = 5; const MAX_REBUILDS: u32 = 5;
let mut rebuilds: u32 = 0; let mut rebuilds: u32 = 0;
// Coalesce forced keyframes. Under loss Moonlight spams IDR/RFI requests; on an encoder without
// RFI (VAAPI/AMD — `supports_rfi=false`) each one becomes a full IDR, so an un-coalesced request
// stream turns EVERY frame into a 4K IDR, saturates the send path, and collapses the session
// instead of recovering. One fresh IDR already resolves all pending loss, so after emitting one
// we ignore further keyframe requests for a short in-flight window (~2 frames). NVENC
// ref-invalidation (cheap, no IDR spike) is never rate-limited — only full keyframes are.
let keyframe_coalesce = frame_interval * 2;
let mut last_keyframe: Option<Instant> = None;
while running.load(Ordering::SeqCst) { while running.load(Ordering::SeqCst) {
let tick = Instant::now(); let tick = Instant::now();
// Measure per-stage timing when `PUNKTFUNK_PERF` is set OR a web-console stats capture is // Measure per-stage timing when `PUNKTFUNK_PERF` is set OR a web-console stats capture is
@@ -647,6 +680,7 @@ fn stream_body(
.context("reopen encoder after rebuild")?; .context("reopen encoder after rebuild")?;
supports_rfi = enc.caps().supports_rfi; supports_rfi = enc.caps().supports_rfi;
enc.request_keyframe(); enc.request_keyframe();
last_keyframe = Some(Instant::now());
next_frame = Instant::now(); next_frame = Instant::now();
tracing::info!("gamestream: source rebuilt — stream continues"); tracing::info!("gamestream: source rebuilt — stream continues");
continue; continue;
@@ -656,17 +690,33 @@ fn stream_body(
// Honor a client recovery request. Prefer reference-frame invalidation (the encoder // Honor a client recovery request. Prefer reference-frame invalidation (the encoder
// re-references an older still-valid frame — no costly IDR spike); if the encoder can't // re-references an older still-valid frame — no costly IDR spike); if the encoder can't
// invalidate (range too old, or no NVENC RFI) it returns false and we force a keyframe. // invalidate (range too old, or no NVENC RFI) it returns false and we force a keyframe.
let mut want_keyframe = false;
if let Some((first, last)) = rfi_range.lock().unwrap().take() { if let Some((first, last)) = rfi_range.lock().unwrap().take() {
// Prefer reference-frame invalidation when the encoder supports it (no costly IDR // Prefer reference-frame invalidation when the encoder supports it (no costly IDR
// spike); otherwise — or if the range is too old to invalidate — force a keyframe. // spike); otherwise — or if the range is too old to invalidate — fall back to a keyframe.
if !(supports_rfi && enc.invalidate_ref_frames(first, last)) { if !(supports_rfi && enc.invalidate_ref_frames(first, last)) {
enc.request_keyframe(); want_keyframe = true;
} }
} }
// An explicit IDR request (or a rangeless RFI) forces a keyframe so the client resyncs // An explicit IDR request (or a rangeless RFI) asks for a keyframe so the client resyncs
// immediately instead of waiting for the next GOP boundary. // immediately instead of waiting for the next GOP boundary.
if force_idr.swap(false, Ordering::SeqCst) { if force_idr.swap(false, Ordering::SeqCst) {
want_keyframe = true;
}
// Coalesce: emit at most one forced keyframe per in-flight window, so a burst of recovery
// requests during one loss event doesn't turn every frame into a full IDR (see above).
if want_keyframe {
let now = Instant::now();
let emit = match last_keyframe {
Some(t) => now.duration_since(t) >= keyframe_coalesce,
None => true,
};
if emit {
enc.request_keyframe(); enc.request_keyframe();
last_keyframe = Some(now);
} else {
tracing::debug!("video: keyframe request coalesced (IDR still in flight)");
}
} }
enc.submit(&frame).context("encoder submit")?; enc.submit(&frame).context("encoder submit")?;
let t_enc = tick.elapsed(); let t_enc = tick.elapsed();
@@ -891,4 +941,24 @@ mod tests {
assert_eq!(got, 3 * PER_FRAME); assert_eq!(got, 3 * PER_FRAME);
assert!(running.load(Ordering::SeqCst), "no spurious client-gone"); assert!(running.load(Ordering::SeqCst), "no spurious client-gone");
} }
/// The pacing layout bounds the paced-burst (and thus sleep) count regardless of frame size,
/// while always covering every packet and keeping small frames on the 16-packet floor. Guards
/// the 4K/high-bitrate "send queue full" regression (a fixed 16-packet chunk fanned a ~600
/// packet frame into ~38 sleeps, whose overshoot blew the per-frame send budget).
#[test]
fn pace_layout_bounds_step_count() {
for &n in &[1usize, 16, 146, 610, 1024, 5000, 50_000] {
let (chunk, steps) = pace_layout(n);
assert!(steps >= 1, "n={n}: at least one step");
assert!(steps <= 12, "n={n}: step count {steps} exceeded the cap");
assert!(chunk >= 16, "n={n}: chunk {chunk} below the 16-packet floor");
assert!(chunk * steps >= n, "n={n}: {chunk}×{steps} must cover all packets");
}
// Small frames stay on the floor: one 16-packet burst.
assert_eq!(pace_layout(1), (16, 1));
assert_eq!(pace_layout(16), (16, 1));
// A 4K/250Mbps frame (~600 packets) was ~38 bursts at a fixed 16 — now bounded.
assert!(pace_layout(610).1 <= 12);
}
} }
@@ -242,11 +242,17 @@ fn read_active_refresh(output: &str) -> Option<u32> {
Some(hz.round() as u32) Some(hz.round() as u32)
} }
/// Names of currently-ENABLED outputs other than our `Virtual-punktfunk` — i.e. the headless /// The prefix EVERY managed KWin output shares — Stage 3 names them `punktfunk` / `punktfunk-<id>`,
/// session's bootstrap output(s), which hold the desktop by default. Parsed from `kscreen-doctor -j` /// which KWin exposes as `Virtual-punktfunk` / `Virtual-punktfunk-<id>`. Group membership (§6.1) is
/// (same source as [`read_active_refresh`]). /// recognised by this prefix, so we never have to thread the live set through the backend.
fn other_enabled_outputs(name: &str) -> Vec<String> { const MANAGED_PREFIX: &str = "Virtual-punktfunk";
let ours = format!("Virtual-{name}");
/// Names of currently-ENABLED outputs that are **not managed by us** — the headless session's
/// bootstrap output(s) + any physical monitor, i.e. exactly what `exclusive` must disable.
/// **Group-aware (§6.1):** excludes the WHOLE managed family (the [`MANAGED_PREFIX`]), not just this
/// session's own output — so a 2nd `exclusive` session (with a distinct per-slot name) never disables
/// the 1st session's live output. Parsed from `kscreen-doctor -j` (same source as [`read_active_refresh`]).
fn other_enabled_outputs() -> Vec<String> {
let out = match std::process::Command::new("kscreen-doctor") let out = match std::process::Command::new("kscreen-doctor")
.arg("-j") .arg("-j")
.output() .output()
@@ -262,19 +268,46 @@ fn other_enabled_outputs(name: &str) -> Vec<String> {
.and_then(|o| o.as_array()) .and_then(|o| o.as_array())
.map(|outs| { .map(|outs| {
outs.iter() outs.iter()
.filter(|o| { .filter(|o| o.get("enabled").and_then(|e| e.as_bool()).unwrap_or(false))
o.get("enabled").and_then(|e| e.as_bool()).unwrap_or(false) .filter_map(|o| o.get("name").and_then(|n| n.as_str()))
&& o.get("name").and_then(|n| n.as_str()) != Some(ours.as_str()) .filter(|n| !n.starts_with(MANAGED_PREFIX))
}) .map(String::from)
.filter_map(|o| o.get("name").and_then(|n| n.as_str()).map(String::from))
.collect() .collect()
}) })
.unwrap_or_default() .unwrap_or_default()
} }
/// Set `Virtual-punktfunk` primary and disable the bootstrap output(s) so it becomes the sole /// True if any managed group member (the [`MANAGED_PREFIX`] family) is ALREADY the KWin primary —
/// desktop (KWin re-homes plasmashell + windows onto it). Returns the disabled outputs for the /// first-slot-wins support (§6.1) so a later exclusive session doesn't steal primary from the group's
/// keepalive to re-enable on teardown. Best-effort: on failure, streaming continues (just possibly /// first member. Best-effort: if kscreen reports no primary flag we treat it as "none" (the session
/// then sets itself primary — the pre-group behavior). Recent kscreen marks the primary with
/// `"priority": 1`; older builds used a `"primary": true` bool — accept either.
fn a_managed_output_is_primary() -> bool {
let Ok(out) = std::process::Command::new("kscreen-doctor").arg("-j").output() else {
return false;
};
let Ok(doc) = serde_json::from_slice::<serde_json::Value>(&out.stdout) else {
return false;
};
doc.get("outputs")
.and_then(|o| o.as_array())
.map(|outs| {
outs.iter().any(|o| {
let managed = o
.get("name")
.and_then(|n| n.as_str())
.is_some_and(|n| n.starts_with(MANAGED_PREFIX));
let primary = o.get("primary").and_then(|p| p.as_bool()).unwrap_or(false)
|| o.get("priority").and_then(|p| p.as_u64()) == Some(1);
managed && primary
})
})
.unwrap_or(false)
}
/// Set `Virtual-punktfunk` primary and disable the bootstrap output(s) so the managed group becomes
/// the sole desktop (KWin re-homes plasmashell + windows onto it). Returns the disabled outputs for
/// the keepalive to re-enable on teardown. Best-effort: on failure, streaming continues (just possibly
/// showing only the wallpaper) rather than failing the session. /// showing only the wallpaper) rather than failing the session.
fn apply_virtual_primary(name: &str) -> Vec<String> { fn apply_virtual_primary(name: &str) -> Vec<String> {
let ours = format!("Virtual-{name}"); let ours = format!("Virtual-{name}");
@@ -285,16 +318,21 @@ fn apply_virtual_primary(name: &str) -> Vec<String> {
.map(|s| s.success()) .map(|s| s.success())
.unwrap_or(false) .unwrap_or(false)
}; };
// Make ours primary — KWin usually then re-homes the desktop and disables the bootstrap on its // First-slot-wins (§6.1): only grab primary if no managed group member is primary yet — so a 2nd
// own. Let that settle, then belt-and-suspenders: disable anything still enabled besides ours so // exclusive session joins as a secondary monitor of the shared desktop instead of stealing the
// the streamed output is unambiguously the sole desktop regardless of KWin's implicit behaviour. // shell off the 1st session's output. KWin usually then re-homes the desktop + disables the
// bootstrap on its own; the belt-and-suspenders disable below covers the rest.
if !a_managed_output_is_primary() {
if !kscreen(&[format!("output.{ours}.primary")]) { if !kscreen(&[format!("output.{ours}.primary")]) {
tracing::warn!( tracing::warn!(
"KWin: could not set the virtual output primary; client may see only the wallpaper" "KWin: could not set the virtual output primary; client may see only the wallpaper"
); );
} }
std::thread::sleep(Duration::from_millis(200)); std::thread::sleep(Duration::from_millis(200));
let others = other_enabled_outputs(name); }
// Disable everything still enabled that ISN'T a managed group member (bootstrap / physical), so
// the group is unambiguously the desktop — never a sibling session's output (group-aware filter).
let others = other_enabled_outputs();
if !others.is_empty() { if !others.is_empty() {
let args: Vec<String> = others let args: Vec<String> = others
.iter() .iter()
@@ -555,3 +593,27 @@ fn run(
let _ = conn.flush(); let _ = conn.flush();
Ok(()) Ok(())
} }
#[cfg(test)]
mod tests {
use super::MANAGED_PREFIX;
/// Group-aware exclusive (§6.1): with two managed group members + a physical panel enabled,
/// exclusive disables ONLY the non-managed panel — never a sibling session's per-slot output
/// (the Stage-3 naming would otherwise make a 2nd exclusive session black out the 1st).
#[test]
fn exclusive_disables_only_non_managed() {
let enabled = [
"Virtual-punktfunk", // base name (shared identity)
"Virtual-punktfunk-1", // client A's per-slot output
"Virtual-punktfunk-7", // client B's per-slot output
"eDP-1", // a physical panel
];
let to_disable: Vec<&str> = enabled
.iter()
.copied()
.filter(|n| !n.starts_with(MANAGED_PREFIX))
.collect();
assert_eq!(to_disable, vec!["eDP-1"]);
}
}