diff --git a/CLAUDE.md b/CLAUDE.md index b9c267f..98c26e6 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -479,7 +479,7 @@ fallback for modern Intel), `PUNKTFUNK_NV12=0` (opt OUT of the default GPU RGB NVIDIA tiled zero-copy path), `PUNKTFUNK_INTRA_REFRESH=1` (opt-in NVENC intra-refresh loss recovery), `PUNKTFUNK_PIN_CLOCKS=1` (opt-in NVML GPU clock floor, root-gated), `PUNKTFUNK_GAMESCOPE_APP=...`, `PUNKTFUNK_INPUT_BACKEND=...`, `PUNKTFUNK_PERF=1` (per-stage timing), `PUNKTFUNK_VIDEO_DROP=N` (FEC -test), `PUNKTFUNK_FEC_PCT=N`, `PUNKTFUNK_DSCP=1` (opt-in DSCP/SO_PRIORITY media QoS on the data + +test — injects N% wire-packet loss on BOTH the GameStream and native video paths, no netem needed), `PUNKTFUNK_FEC_PCT=N`, `PUNKTFUNK_DSCP=1` (opt-in DSCP/SO_PRIORITY media QoS on the data + GameStream video/audio sockets; no-op on the wire on Windows without a qWAVE policy), `PUNKTFUNK_444=1` (full-chroma HEVC 4:4:4, see below). diff --git a/clients/probe/src/main.rs b/clients/probe/src/main.rs index 58cd883..75c3427 100644 --- a/clients/probe/src/main.rs +++ b/clients/probe/src/main.rs @@ -52,7 +52,7 @@ use punktfunk_core::input::{InputEvent, InputKind}; use punktfunk_core::packet::FLAG_PROBE; use punktfunk_core::quic::{ endpoint, io, window_loss_ppm, Hello, LossReport, ProbeRequest, ProbeResult, Reconfigure, - Reconfigured, Start, Welcome, + Reconfigured, RequestKeyframe, Start, Welcome, }; use punktfunk_core::transport::UdpTransport; use punktfunk_core::{CompositorPref, Mode, PunktfunkError, Session}; @@ -505,6 +505,11 @@ async fn session(args: Args) -> Result<()> { // stream mode (no speed test / remode) a control-stream task relays it to the host as a // LossReport so it can size FEC to the link. u32::MAX = "no fresh sample this window". let loss_ppm = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(u32::MAX)); + // Decode-recovery feedback, mirroring the real clients: the data loop publishes the session's + // cumulative unrecoverable-frame count; the control task requests a keyframe when it grows + // (the correct loss trigger under infinite GOP — see NativeClient::frames_dropped). Lets the + // probe exercise the host's IDR-vs-intra-refresh recovery path under injected loss. + let dropped_frames = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); // Mid-stream renegotiation test: after a delay, ask the host to switch modes on the // still-open control stream. The stream then carries new-mode AUs (IDR + in-band @@ -617,21 +622,40 @@ async fn session(args: Args) -> Result<()> { }); } else { // Normal stream mode: relay the data loop's windowed loss estimate to the host as periodic - // LossReports, so it can size FEC to the link (adaptive FEC). The control stream is otherwise - // idle here (remode/speed-test own it in their modes). + // LossReports, so it can size FEC to the link (adaptive FEC), and — like the real clients — + // request a keyframe whenever the unrecoverable-frame count grows (100 ms poll = a natural + // throttle; several drops in a burst coalesce into one request). The control stream is + // otherwise idle here (remode/speed-test own it in their modes). let mut ls = send; let lp = loss_ppm.clone(); + let df = dropped_frames.clone(); tokio::spawn(async move { use std::sync::atomic::Ordering::Relaxed; + let mut last_report = std::time::Instant::now(); + let mut last_dropped = 0u64; loop { - tokio::time::sleep(std::time::Duration::from_millis(750)).await; - let v = lp.swap(u32::MAX, Relaxed); - if v != u32::MAX - && io::write_msg(&mut ls, &LossReport { loss_ppm: v }.encode()) + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let d = df.load(Relaxed); + if d > last_dropped { + last_dropped = d; + if io::write_msg(&mut ls, &RequestKeyframe.encode()) .await .is_err() - { - break; // control stream gone + { + break; // control stream gone + } + tracing::debug!(dropped = d, "unrecoverable frame — requested keyframe"); + } + if last_report.elapsed() >= std::time::Duration::from_millis(750) { + last_report = std::time::Instant::now(); + let v = lp.swap(u32::MAX, Relaxed); + if v != u32::MAX + && io::write_msg(&mut ls, &LossReport { loss_ppm: v }.encode()) + .await + .is_err() + { + break; // control stream gone + } } } }); @@ -943,6 +967,7 @@ async fn session(args: Args) -> Result<()> { let out_path = args.out.clone(); let (rxp_dt, rxb_dt) = (rx_wire_packets.clone(), rx_wire_bytes.clone()); let lp_dt = loss_ppm.clone(); + let df_dt = dropped_frames.clone(); // Express our receive time in the host clock before differencing against the host-stamped // capture pts. 0 ⇒ same-host or an old host that didn't answer the skew handshake (the latency @@ -988,6 +1013,7 @@ async fn session(args: Args) -> Result<()> { let s = session.stats(); rxp_dt.store(s.packets_received, Relaxed); rxb_dt.store(s.bytes_received, Relaxed); + df_dt.store(s.frames_dropped, Relaxed); if last_loss_report.elapsed() >= std::time::Duration::from_millis(750) { lp_dt.store( window_loss_ppm( diff --git a/crates/punktfunk-host/src/punktfunk1.rs b/crates/punktfunk-host/src/punktfunk1.rs index ad2d347..c5d5452 100644 --- a/crates/punktfunk-host/src/punktfunk1.rs +++ b/crates/punktfunk-host/src/punktfunk1.rs @@ -2290,6 +2290,9 @@ const PACE_CHUNK: usize = 16; /// it's needed (an unpaced line-rate burst overruns the kernel tx buffer → EAGAIN drop → under /// infinite GOP, a freeze until the next keyframe). With no slack (encode ≈ interval) the budget /// collapses to 0 and even the overflow goes out immediately, so this is never slower than unpaced. +/// Parsed-once `PUNKTFUNK_VIDEO_DROP` percentage for the native data plane (see `paced_submit`). +static NATIVE_VIDEO_DROP: std::sync::OnceLock = std::sync::OnceLock::new(); + fn paced_submit( session: &mut Session, data: &[u8], @@ -2301,7 +2304,29 @@ fn paced_submit( let wires = session .seal_frame(data, pts_ns, flags) .map_err(|e| anyhow!("seal_frame: {e:?}"))?; - let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect(); + let mut refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect(); + // FEC/recovery test knob: PUNKTFUNK_VIDEO_DROP=N discards N% of the sealed wire packets + // before send — controlled loss injection with no netem/root, same knob the GameStream video + // path honors. Parsed once; 0/unset = off (the normal path is untouched). + let drop_pct = *NATIVE_VIDEO_DROP.get_or_init(|| { + let pct = std::env::var("PUNKTFUNK_VIDEO_DROP") + .ok() + .and_then(|s| s.parse::().ok()) + .filter(|p| (1..=90).contains(p)) + .unwrap_or(0); + if pct > 0 { + tracing::warn!( + pct, + "PUNKTFUNK_VIDEO_DROP: injecting wire-packet loss (FEC test)" + ); + } + pct + }); + if drop_pct > 0 { + use rand::Rng; + let mut rng = rand::thread_rng(); + refs.retain(|_| rng.gen_range(0..100) >= drop_pct); + } let start = std::time::Instant::now(); // Split at the microburst cap: packets [0..split] burst out immediately, [split..] are paced.