diff --git a/crates/punktfunk-host/src/gamestream/stream.rs b/crates/punktfunk-host/src/gamestream/stream.rs index 6e0ff79..04c4731 100644 --- a/crates/punktfunk-host/src/gamestream/stream.rs +++ b/crates/punktfunk-host/src/gamestream/stream.rs @@ -399,6 +399,20 @@ fn sendmmsg_all(sock: &UdpSocket, pkts: &[Vec]) -> std::io::Result<()> { Ok(()) } +/// Pacing layout for one frame's `n` packets (`n >= 1`): `(chunk_size, steps)`. The chunk grows +/// with the frame so the number of paced bursts — each ending in a `thread::sleep` — never exceeds +/// `MAX_PACE_STEPS`. A fixed 16-packet chunk let the step count scale with bitrate (~38 for a +/// 4K/250Mbps frame's ~600 packets); the accumulated sub-ms sleep overshoot on the non-RT send +/// thread then blew the per-frame budget and backed the handoff queue up. Bounding the steps keeps +/// microburst shaping at low bitrate while making overshoot negligible and bitrate-independent. +fn pace_layout(n: usize) -> (usize, usize) { + const MIN_PACE_CHUNK: usize = 16; + const MAX_PACE_STEPS: usize = 12; + let chunk_sz = MIN_PACE_CHUNK.max(n.div_ceil(MAX_PACE_STEPS)); + let steps = n.div_ceil(chunk_sz); // ≤ MAX_PACE_STEPS + (chunk_sz, steps) +} + /// Dedicated send thread: one [`PacketBatch`] per frame arrives on `rx`; its packets go out in /// `sendmmsg` chunks, paced so the frame's data spreads over ~3/4 of the frame interval /// (microburst shaping at chunk granularity — a real link drops line-rate bursts; the encode @@ -416,8 +430,14 @@ fn spawn_sender( // Transmit thread: above-normal, matching the native path's send thread (includes the // Windows session tuning/MMCSS this used to call directly; adds the Linux nice -5). crate::punktfunk1::boost_thread_priority(false); - // Chunk pacing: 16 packets per burst, bursts spread across the send budget. - const PACE_CHUNK: usize = 16; + // Chunk pacing: spread the frame's packets across the send budget in a BOUNDED number + // of bursts. A fixed 16-packet chunk made the burst count scale with bitrate (~38 for a + // 4K/250Mbps frame's ~600 packets), and each burst ends in a `thread::sleep`; on this + // non-RT send thread those sub-ms sleeps overshoot, and ~38 per frame blew the 12.5ms + // budget past the 16.67ms frame interval — backing the depth-2 handoff queue up and + // dropping ~half the frames ("send queue full"). Capping the step count keeps the + // microburst shaping (a real link drops line-rate bursts) while making per-frame sleep + // overshoot negligible and independent of bitrate. let budget = frame_interval.mul_f32(0.75); let mut rng = rand::thread_rng(); let mut sent: u64 = 0; @@ -436,17 +456,21 @@ fn spawn_sender( if n == 0 { continue; } - let per_chunk = budget.mul_f64((PACE_CHUNK as f64 / n as f64).min(1.0)); + // Chunk size + step count, bounded so a high-bitrate frame doesn't fan out into + // dozens of sleeps. Each step gets an equal slice of the budget (total pacing time + // == budget regardless of n). + let (chunk_sz, steps) = pace_layout(n); + let per_step = budget.mul_f64(1.0 / steps as f64); let start = Instant::now(); - for (i, chunk) in batch.chunks(PACE_CHUNK).enumerate() { + for (i, chunk) in batch.chunks(chunk_sz).enumerate() { if let Err(e) = sendmmsg_all(&sock, chunk) { tracing::info!(error = %e, sent, "video: client unreachable — stopping stream"); running.store(false, Ordering::SeqCst); return; } sent += chunk.len() as u64; - // Sleep toward the next chunk's deadline; skip sub-500µs sleeps (jitter). - let target = start + per_chunk.mul_f64((i + 1) as f64); + // Sleep toward the next step's deadline; skip sub-500µs sleeps (jitter). + let target = start + per_step.mul_f64((i + 1) as f64); if let Some(ahead) = target.checked_duration_since(Instant::now()) { if ahead >= Duration::from_micros(500) { std::thread::sleep(ahead); @@ -582,6 +606,15 @@ fn stream_body( const MAX_REBUILDS: u32 = 5; let mut rebuilds: u32 = 0; + // Coalesce forced keyframes. Under loss Moonlight spams IDR/RFI requests; on an encoder without + // RFI (VAAPI/AMD — `supports_rfi=false`) each one becomes a full IDR, so an un-coalesced request + // stream turns EVERY frame into a 4K IDR, saturates the send path, and collapses the session + // instead of recovering. One fresh IDR already resolves all pending loss, so after emitting one + // we ignore further keyframe requests for a short in-flight window (~2 frames). NVENC + // ref-invalidation (cheap, no IDR spike) is never rate-limited — only full keyframes are. + let keyframe_coalesce = frame_interval * 2; + let mut last_keyframe: Option = None; + while running.load(Ordering::SeqCst) { let tick = Instant::now(); // Measure per-stage timing when `PUNKTFUNK_PERF` is set OR a web-console stats capture is @@ -647,6 +680,7 @@ fn stream_body( .context("reopen encoder after rebuild")?; supports_rfi = enc.caps().supports_rfi; enc.request_keyframe(); + last_keyframe = Some(Instant::now()); next_frame = Instant::now(); tracing::info!("gamestream: source rebuilt — stream continues"); continue; @@ -656,17 +690,33 @@ fn stream_body( // Honor a client recovery request. Prefer reference-frame invalidation (the encoder // re-references an older still-valid frame — no costly IDR spike); if the encoder can't // invalidate (range too old, or no NVENC RFI) it returns false and we force a keyframe. + let mut want_keyframe = false; if let Some((first, last)) = rfi_range.lock().unwrap().take() { // Prefer reference-frame invalidation when the encoder supports it (no costly IDR - // spike); otherwise — or if the range is too old to invalidate — force a keyframe. + // spike); otherwise — or if the range is too old to invalidate — fall back to a keyframe. if !(supports_rfi && enc.invalidate_ref_frames(first, last)) { - enc.request_keyframe(); + want_keyframe = true; } } - // An explicit IDR request (or a rangeless RFI) forces a keyframe so the client resyncs + // An explicit IDR request (or a rangeless RFI) asks for a keyframe so the client resyncs // immediately instead of waiting for the next GOP boundary. if force_idr.swap(false, Ordering::SeqCst) { - enc.request_keyframe(); + want_keyframe = true; + } + // Coalesce: emit at most one forced keyframe per in-flight window, so a burst of recovery + // requests during one loss event doesn't turn every frame into a full IDR (see above). + if want_keyframe { + let now = Instant::now(); + let emit = match last_keyframe { + Some(t) => now.duration_since(t) >= keyframe_coalesce, + None => true, + }; + if emit { + enc.request_keyframe(); + last_keyframe = Some(now); + } else { + tracing::debug!("video: keyframe request coalesced (IDR still in flight)"); + } } enc.submit(&frame).context("encoder submit")?; let t_enc = tick.elapsed(); @@ -891,4 +941,24 @@ mod tests { assert_eq!(got, 3 * PER_FRAME); assert!(running.load(Ordering::SeqCst), "no spurious client-gone"); } + + /// The pacing layout bounds the paced-burst (and thus sleep) count regardless of frame size, + /// while always covering every packet and keeping small frames on the 16-packet floor. Guards + /// the 4K/high-bitrate "send queue full" regression (a fixed 16-packet chunk fanned a ~600 + /// packet frame into ~38 sleeps, whose overshoot blew the per-frame send budget). + #[test] + fn pace_layout_bounds_step_count() { + for &n in &[1usize, 16, 146, 610, 1024, 5000, 50_000] { + let (chunk, steps) = pace_layout(n); + assert!(steps >= 1, "n={n}: at least one step"); + assert!(steps <= 12, "n={n}: step count {steps} exceeded the cap"); + assert!(chunk >= 16, "n={n}: chunk {chunk} below the 16-packet floor"); + assert!(chunk * steps >= n, "n={n}: {chunk}×{steps} must cover all packets"); + } + // Small frames stay on the floor: one 16-packet burst. + assert_eq!(pace_layout(1), (16, 1)); + assert_eq!(pace_layout(16), (16, 1)); + // A 4K/250Mbps frame (~600 packets) was ~38 bursts at a fixed 16 — now bounded. + assert!(pace_layout(610).1 <= 12); + } } diff --git a/crates/punktfunk-host/src/vdisplay/linux/kwin.rs b/crates/punktfunk-host/src/vdisplay/linux/kwin.rs index a9fb20a..f559b51 100644 --- a/crates/punktfunk-host/src/vdisplay/linux/kwin.rs +++ b/crates/punktfunk-host/src/vdisplay/linux/kwin.rs @@ -242,11 +242,17 @@ fn read_active_refresh(output: &str) -> Option { Some(hz.round() as u32) } -/// Names of currently-ENABLED outputs other than our `Virtual-punktfunk` — i.e. the headless -/// session's bootstrap output(s), which hold the desktop by default. Parsed from `kscreen-doctor -j` -/// (same source as [`read_active_refresh`]). -fn other_enabled_outputs(name: &str) -> Vec { - let ours = format!("Virtual-{name}"); +/// The prefix EVERY managed KWin output shares — Stage 3 names them `punktfunk` / `punktfunk-`, +/// which KWin exposes as `Virtual-punktfunk` / `Virtual-punktfunk-`. Group membership (§6.1) is +/// recognised by this prefix, so we never have to thread the live set through the backend. +const MANAGED_PREFIX: &str = "Virtual-punktfunk"; + +/// Names of currently-ENABLED outputs that are **not managed by us** — the headless session's +/// bootstrap output(s) + any physical monitor, i.e. exactly what `exclusive` must disable. +/// **Group-aware (§6.1):** excludes the WHOLE managed family (the [`MANAGED_PREFIX`]), not just this +/// session's own output — so a 2nd `exclusive` session (with a distinct per-slot name) never disables +/// the 1st session's live output. Parsed from `kscreen-doctor -j` (same source as [`read_active_refresh`]). +fn other_enabled_outputs() -> Vec { let out = match std::process::Command::new("kscreen-doctor") .arg("-j") .output() @@ -262,19 +268,46 @@ fn other_enabled_outputs(name: &str) -> Vec { .and_then(|o| o.as_array()) .map(|outs| { outs.iter() - .filter(|o| { - o.get("enabled").and_then(|e| e.as_bool()).unwrap_or(false) - && o.get("name").and_then(|n| n.as_str()) != Some(ours.as_str()) - }) - .filter_map(|o| o.get("name").and_then(|n| n.as_str()).map(String::from)) + .filter(|o| o.get("enabled").and_then(|e| e.as_bool()).unwrap_or(false)) + .filter_map(|o| o.get("name").and_then(|n| n.as_str())) + .filter(|n| !n.starts_with(MANAGED_PREFIX)) + .map(String::from) .collect() }) .unwrap_or_default() } -/// Set `Virtual-punktfunk` primary and disable the bootstrap output(s) so it becomes the sole -/// desktop (KWin re-homes plasmashell + windows onto it). Returns the disabled outputs for the -/// keepalive to re-enable on teardown. Best-effort: on failure, streaming continues (just possibly +/// True if any managed group member (the [`MANAGED_PREFIX`] family) is ALREADY the KWin primary — +/// first-slot-wins support (§6.1) so a later exclusive session doesn't steal primary from the group's +/// first member. Best-effort: if kscreen reports no primary flag we treat it as "none" (the session +/// then sets itself primary — the pre-group behavior). Recent kscreen marks the primary with +/// `"priority": 1`; older builds used a `"primary": true` bool — accept either. +fn a_managed_output_is_primary() -> bool { + let Ok(out) = std::process::Command::new("kscreen-doctor").arg("-j").output() else { + return false; + }; + let Ok(doc) = serde_json::from_slice::(&out.stdout) else { + return false; + }; + doc.get("outputs") + .and_then(|o| o.as_array()) + .map(|outs| { + outs.iter().any(|o| { + let managed = o + .get("name") + .and_then(|n| n.as_str()) + .is_some_and(|n| n.starts_with(MANAGED_PREFIX)); + let primary = o.get("primary").and_then(|p| p.as_bool()).unwrap_or(false) + || o.get("priority").and_then(|p| p.as_u64()) == Some(1); + managed && primary + }) + }) + .unwrap_or(false) +} + +/// Set `Virtual-punktfunk` primary and disable the bootstrap output(s) so the managed group becomes +/// the sole desktop (KWin re-homes plasmashell + windows onto it). Returns the disabled outputs for +/// the keepalive to re-enable on teardown. Best-effort: on failure, streaming continues (just possibly /// showing only the wallpaper) rather than failing the session. fn apply_virtual_primary(name: &str) -> Vec { let ours = format!("Virtual-{name}"); @@ -285,16 +318,21 @@ fn apply_virtual_primary(name: &str) -> Vec { .map(|s| s.success()) .unwrap_or(false) }; - // Make ours primary — KWin usually then re-homes the desktop and disables the bootstrap on its - // own. Let that settle, then belt-and-suspenders: disable anything still enabled besides ours so - // the streamed output is unambiguously the sole desktop regardless of KWin's implicit behaviour. - if !kscreen(&[format!("output.{ours}.primary")]) { - tracing::warn!( - "KWin: could not set the virtual output primary; client may see only the wallpaper" - ); + // First-slot-wins (§6.1): only grab primary if no managed group member is primary yet — so a 2nd + // exclusive session joins as a secondary monitor of the shared desktop instead of stealing the + // shell off the 1st session's output. KWin usually then re-homes the desktop + disables the + // bootstrap on its own; the belt-and-suspenders disable below covers the rest. + if !a_managed_output_is_primary() { + if !kscreen(&[format!("output.{ours}.primary")]) { + tracing::warn!( + "KWin: could not set the virtual output primary; client may see only the wallpaper" + ); + } + std::thread::sleep(Duration::from_millis(200)); } - std::thread::sleep(Duration::from_millis(200)); - let others = other_enabled_outputs(name); + // Disable everything still enabled that ISN'T a managed group member (bootstrap / physical), so + // the group is unambiguously the desktop — never a sibling session's output (group-aware filter). + let others = other_enabled_outputs(); if !others.is_empty() { let args: Vec = others .iter() @@ -555,3 +593,27 @@ fn run( let _ = conn.flush(); Ok(()) } + +#[cfg(test)] +mod tests { + use super::MANAGED_PREFIX; + + /// Group-aware exclusive (§6.1): with two managed group members + a physical panel enabled, + /// exclusive disables ONLY the non-managed panel — never a sibling session's per-slot output + /// (the Stage-3 naming would otherwise make a 2nd exclusive session black out the 1st). + #[test] + fn exclusive_disables_only_non_managed() { + let enabled = [ + "Virtual-punktfunk", // base name (shared identity) + "Virtual-punktfunk-1", // client A's per-slot output + "Virtual-punktfunk-7", // client B's per-slot output + "eDP-1", // a physical panel + ]; + let to_disable: Vec<&str> = enabled + .iter() + .copied() + .filter(|n| !n.starts_with(MANAGED_PREFIX)) + .collect(); + assert_eq!(to_disable, vec!["eDP-1"]); + } +}