diff --git a/crates/punktfunk-host/src/encode.rs b/crates/punktfunk-host/src/encode.rs index 4b967c6..2067cfa 100644 --- a/crates/punktfunk-host/src/encode.rs +++ b/crates/punktfunk-host/src/encode.rs @@ -154,6 +154,12 @@ pub struct EncoderCaps { /// the encoder's real chroma disagrees with what was negotiated (the in-band SPS is authoritative /// for the decoder either way). pub chroma_444: bool, + /// The encoder runs a periodic **intra-refresh wave** (a moving band of intra blocks + + /// recovery-point SEI, no periodic IDR): FEC-unrecoverable loss self-heals within one wave, so + /// the session glue rate-limits client keyframe requests instead of answering each with a full + /// IDR (the 20-40× frame-size spike that cascades under loss). Linux NVENC sets it when + /// `PUNKTFUNK_INTRA_REFRESH` opened the encoder in that mode; VAAPI/software never do. + pub intra_refresh: bool, } /// A hardware encoder. One per session; runs on the encode thread. diff --git a/crates/punktfunk-host/src/encode/linux/mod.rs b/crates/punktfunk-host/src/encode/linux/mod.rs index d12a7ab..7f83202 100644 --- a/crates/punktfunk-host/src/encode/linux/mod.rs +++ b/crates/punktfunk-host/src/encode/linux/mod.rs @@ -168,6 +168,9 @@ pub struct NvencEncoder { frame_idx: i64, /// Force the next submitted frame to be an IDR (set by [`request_keyframe`]). force_kf: bool, + /// Opened in intra-refresh mode (surfaced via [`caps`](Encoder::caps) so the session glue + /// rate-limits forced IDRs — the wave heals loss without them). + intra_refresh: bool, } // `CudaHw` holds raw `AVBufferRef`s and `sws_444` a raw `SwsContext`; the encoder lives on a single @@ -183,6 +186,36 @@ pub struct NvencEncoder { // existing `Send` to the raw CUDA fields; `Sync` (shared `&`) is deliberately NOT implemented. 
/// Latched true once an intra-refresh open failed with the device-capability error (ENOSYS from
/// `NV_ENC_CAPS_SUPPORT_INTRA_REFRESH`), so later sessions skip the doomed attempt. Never set by
/// other open failures (a bitrate EINVAL must not permanently disable the feature).
static IR_UNSUPPORTED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);

/// Whether this open should run the NVENC **intra-refresh** loss-recovery mode.
///
/// Returns `true` only when both hold:
/// * `PUNKTFUNK_INTRA_REFRESH` is set to a truthy value — `1`, `true`, `yes` or `on`, compared
///   after trimming surrounding whitespace and ignoring ASCII case (opt-in until on-glass
///   validated); an unset or non-UTF-8 variable counts as "not requested";
/// * the [`IR_UNSUPPORTED`] latch has not been set by an earlier failed open.
///
/// In this mode a moving intra band + recovery-point SEI refreshes the whole picture every
/// [`intra_refresh_period`] frames, so FEC-unrecoverable loss heals without the 20-40× full-IDR
/// spike (which under loss causes more loss — the cascade). The session glue then rate-limits
/// client keyframe requests ([`EncoderCaps::intra_refresh`](super::EncoderCaps)).
fn intra_refresh_requested() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    let opted_in = std::env::var("PUNKTFUNK_INTRA_REFRESH")
        .map(|raw| {
            let value = raw.trim();
            TRUTHY.iter().any(|t| value.eq_ignore_ascii_case(t))
        })
        .unwrap_or(false);
    // The latch is a plain flag with no data published alongside it, so `Relaxed` suffices.
    opted_in && !IR_UNSUPPORTED.load(std::sync::atomic::Ordering::Relaxed)
}

/// The intra-refresh wave length in frames — ffmpeg derives `intraRefreshPeriod`/`Cnt` from
/// `gop_size` before forcing the real GOP infinite, so this is what `gop_size` is set to in IR
/// mode.
///
/// `PUNKTFUNK_IR_PERIOD_FRAMES` overrides the default when it parses (after trimming surrounding
/// whitespace) as an integer `>= 2`; anything else — unset, non-numeric, or below 2 — is ignored.
/// Default = half a second of frames (heals fast, spreads the intra cost to ~2-3% per frame),
/// computed as `max(fps, 16) / 2`, i.e. never shorter than 8 frames.
fn intra_refresh_period(fps: u32) -> i32 {
    std::env::var("PUNKTFUNK_IR_PERIOD_FRAMES")
        .ok()
        // `i32::from_str` rejects surrounding whitespace, so trim first (a trailing newline
        // from a unit file or shell script would otherwise silently discard the override).
        .and_then(|s| s.trim().parse::<i32>().ok())
        .filter(|v| *v >= 2)
        // Cannot wrap: the quotient is at most `u32::MAX / 2`, which equals `i32::MAX`.
        .unwrap_or_else(|| (fps.max(16) / 2) as i32)
}
NVENC emits one IDR at stream start, then P-frames only; `forced-idr` (below) // turns a client recovery request (RFI, via `request_keyframe`) into an IDR on demand. // This is the Moonlight/Sunshine low-latency model. + // In intra-refresh mode the GOP is still infinite — ffmpeg reads `gop_size` as the refresh + // WAVE length (`intraRefreshPeriod`/`Cnt`) and then forces `gopLength` infinite itself, so + // a positive `gop_size` here does NOT reintroduce periodic IDRs. + let intra_refresh = intra_refresh_requested(); // SAFETY: same `video` builder as above — a non-null, properly-aligned, sole-owned, not-yet- - // opened `AVCodecContext`. We write the plain `gop_size` int field (= -1, infinite GOP) before - // `open_with`, which ffmpeg-next has no setter for. No aliasing; synchronous scalar write. + // opened `AVCodecContext`. We write the plain `gop_size` int field (-1 = infinite GOP, or the + // intra-refresh wave length) before `open_with`, which ffmpeg-next has no setter for. No + // aliasing; synchronous scalar write. unsafe { - (*video.as_mut_ptr()).gop_size = -1; + (*video.as_mut_ptr()).gop_size = if intra_refresh { + intra_refresh_period(fps) + } else { + -1 + }; } // NV12 / 4:4:4 paths: we do the RGB→YUV conversion ourselves as BT.709 *limited* range @@ -377,6 +419,11 @@ impl NvencEncoder { opts.set("bf", "0"); opts.set("delay", "0"); opts.set("forced-idr", "1"); // RFI/request_keyframe → real IDR under the infinite GOP + if intra_refresh { + // Moving intra band + recovery-point SEI (period set via gop_size above). Loss now + // self-heals within the wave; forced IDRs remain available (rate-limited by the glue). + opts.set("intra-refresh", "1"); + } if want_444 { // HEVC Range Extensions — the profile that carries chroma_format_idc=3. 
With a YUV444P // input `hevc_nvenc` auto-selects it, but pin it explicitly so the chroma is never silently @@ -403,9 +450,45 @@ impl NvencEncoder { None => {} } - let enc = video - .open_with(opts) - .with_context(|| format!("open {name} ({width}x{height}@{fps}, {bitrate_bps} bps)"))?; + let enc = match video.open_with(opts) { + Ok(enc) => enc, + // The GPU lacks NV_ENC_CAPS_SUPPORT_INTRA_REFRESH — ffmpeg fails the open with + // ENOSYS ("Function not implemented"). Latch it (skip the doomed attempt on later + // sessions) and reopen this session without intra-refresh; any other failure — and + // any failure when IR wasn't requested — propagates untouched (the bitrate probe + // keys on EINVAL, which must not trip the latch). + Err(e) if intra_refresh && format!("{e:#}").contains("Function not implemented") => { + tracing::warn!( + encoder = name, + "NVENC intra-refresh not supported by this GPU — falling back to IDR-only \ + recovery" + ); + IR_UNSUPPORTED.store(true, std::sync::atomic::Ordering::Relaxed); + return Self::open( + codec, + format, + width, + height, + fps, + bitrate_bps, + cuda, + bit_depth, + chroma, + ); + } + Err(e) => { + return Err(e).with_context(|| { + format!("open {name} ({width}x{height}@{fps}, {bitrate_bps} bps)") + }) + } + }; + if intra_refresh { + tracing::info!( + encoder = name, + period_frames = intra_refresh_period(fps), + "NVENC intra-refresh recovery active (no periodic IDR; wave heals loss)" + ); + } let frame = if cuda { None @@ -424,6 +507,7 @@ impl NvencEncoder { fps, frame_idx: 0, force_kf: false, + intra_refresh, }) } } @@ -434,6 +518,7 @@ impl Encoder for NvencEncoder { // 4:4:4 iff this session opened the RGB→YUV444P swscale path (FREXT). RFI/HDR-SEI stay // unsupported on libavcodec NVENC (the trait defaults). 
chroma_444: self.sws_444.is_some(), + intra_refresh: self.intra_refresh, ..super::EncoderCaps::default() } } diff --git a/crates/punktfunk-host/src/punktfunk1.rs b/crates/punktfunk-host/src/punktfunk1.rs index 29a4ece..ad2d347 100644 --- a/crates/punktfunk-host/src/punktfunk1.rs +++ b/crates/punktfunk-host/src/punktfunk1.rs @@ -2365,8 +2365,13 @@ struct FrameMsg { /// When this frame's packets should have fully left (the next frame's due time) = the pacing /// budget. In the past when the send thread is behind → immediate send (catch up). deadline: std::time::Instant, - /// capture→encoded latency (µs), measured on the encode thread, carried for the perf histogram. + /// submit→encoded latency (µs), measured on the encode thread, carried for the perf histogram. encode_us: u32, + /// Capture-delivery → encoder-submit age (µs) of a fresh frame — the PipeWire delivery + + /// channel-queue time the old pre-submit stamp made invisible. Always measured (two integer + /// ops); 0 for repeats/tail frames. The wire pts (`capture_ns`) anchors at the same delivery + /// stamp, so client-side latency figures include this window too. + queue_us: u32, /// Per-stage µs splits, measured on the capture/encode thread (0 when neither `PUNKTFUNK_PERF` /// nor a stats capture is armed). The send thread accumulates them for the web-console sample: /// `cap_us` = `try_latest` (ring read + colour convert), `submit_us` = NVENC `encode_picture` @@ -2491,8 +2496,12 @@ fn send_loop( // split carried on each FrameMsg, the new-vs-repeat frame split, the cached registration id, and // the previous window's loss snapshot for delta computation. 
let mut sid: Option = None; - let (mut cap_v, mut submit_v, mut wait_v): (Vec, Vec, Vec) = - (Vec::new(), Vec::new(), Vec::new()); + let (mut cap_v, mut submit_v, mut wait_v, mut queue_v): ( + Vec, + Vec, + Vec, + Vec, + ) = (Vec::new(), Vec::new(), Vec::new(), Vec::new()); let (mut new_frames, mut repeat_frames) = (0u64, 0u64); let mut last_frames_dropped = 0u64; let mut last_packets_dropped = 0u64; @@ -2529,6 +2538,11 @@ fn send_loop( cap_v.push(msg.cap_us); submit_v.push(msg.submit_us); wait_v.push(msg.wait_us); + // Queue age is only meaningful for fresh frames (repeats/tail carry 0 + // by construction — including those would drag the percentiles down). + if !msg.repeat { + queue_v.push(msg.queue_us); + } } if msg.repeat { repeat_frames += 1; @@ -2588,6 +2602,11 @@ fn send_loop( t_ms: 0, // stamped by push_sample from the capture's monotonic start session_id, stages: vec![ + crate::stats_recorder::StageTiming { + name: "queue".into(), + p50_us: percentile(&mut queue_v, 0.50) as f32, + p99_us: percentile(&mut queue_v, 0.99) as f32, + }, crate::stats_recorder::StageTiming { name: "capture".into(), p50_us: percentile(&mut cap_v, 0.50) as f32, @@ -2631,6 +2650,7 @@ fn send_loop( cap_v.clear(); submit_v.clear(); wait_v.clear(); + queue_v.clear(); paced_frames = 0; immediate_frames = 0; new_frames = 0; @@ -2963,10 +2983,12 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { let mut capture_rebuilds: u32 = 0; // Last HDR mastering metadata we forwarded — re-sent as 0xCE on change/keyframe (see below). let mut last_hdr_meta: Option = None; - // Frames submitted to NVENC but not yet polled (capture_ns, pacing deadline). With a capturer that - // hands a fresh output texture per frame, the loop submits N+1 before polling N (pipeline depth > 1), - // overlapping the convert/copy of N+1 on the 3D engine with the encode of N on the NVENC ASIC. 
- let mut inflight: std::collections::VecDeque<(u64, std::time::Instant)> = + // Frames submitted to NVENC but not yet polled (wire pts, submit stamp, pacing deadline). With a + // capturer that hands a fresh output texture per frame, the loop submits N+1 before polling N + // (pipeline depth > 1), overlapping the convert/copy of N+1 on the 3D engine with the encode of N + // on the NVENC ASIC. The wire pts and the submit stamp are carried separately so `encode_us` + // keeps meaning submit→AU while the wire pts anchors at PipeWire delivery (queue age included). + let mut inflight: std::collections::VecDeque<(u64, u64, std::time::Instant)> = std::collections::VecDeque::new(); // Diagnostic: distinguish NEW captured frames (the source produced a fresh frame) from REPEATS (the // loop re-encoded the last frame because `try_latest` had nothing). A low new-frame rate at a high @@ -2974,12 +2996,18 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { // compositing), NOT an encoder problem. Logged every 2 s when `PUNKTFUNK_PERF`. let (mut diag_new, mut diag_repeat) = (0u64, 0u64); let mut diag_at = std::time::Instant::now(); + // Last client-requested forced IDR — the intra-refresh rate limit window anchor (see below). + let mut last_forced_idr: Option = None; // Per-stage latency breakdown (PUNKTFUNK_PERF): per-call µs for the GPU-bound stages so we see // exactly where the capture→encoded latency goes — cap=try_latest (ring read + colour convert), // submit=encode_picture launch, wait=lock_bitstream (the scheduling wait + ASIC encode, the one // that dominates under a GPU-saturating game). 
- let (mut st_cap, mut st_submit, mut st_wait): (Vec, Vec, Vec) = - (Vec::new(), Vec::new(), Vec::new()); + let (mut st_cap, mut st_submit, mut st_wait, mut st_queue): ( + Vec, + Vec, + Vec, + Vec, + ) = (Vec::new(), Vec::new(), Vec::new(), Vec::new()); while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline { // Mid-stream session switch (the box flipped Gaming↔Desktop): rebuild the WHOLE backend in // place — a different compositor at the SAME client mode — keeping the Session + send thread @@ -3083,8 +3111,21 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { want_kf = true; } if want_kf { - tracing::debug!("forcing keyframe (client decode recovery)"); - enc.request_keyframe(); + // Intra-refresh mode: clients request a keyframe on EVERY FEC-unrecoverable frame + // (`frames_dropped` polling), but the refresh wave already heals those within ~half a + // second — answering each with a full IDR is the 20-40× spike cascade the wave exists + // to avoid. Serve the first request immediately (a genuinely wedged decoder recovers + // at once), then suppress further requests for one window and let the wave heal. + const IDR_WINDOW: std::time::Duration = std::time::Duration::from_secs(2); + let suppress = enc.caps().intra_refresh + && last_forced_idr.is_some_and(|t| t.elapsed() < IDR_WINDOW); + if suppress { + tracing::debug!("keyframe request suppressed — intra-refresh wave healing"); + } else { + tracing::debug!("forcing keyframe (client decode recovery)"); + enc.request_keyframe(); + last_forced_idr = Some(std::time::Instant::now()); + } } // Measure the per-stage split when `PUNKTFUNK_PERF` is set OR a web-console stats capture is // armed (a cheap Relaxed atomic, re-read each frame). 
The values feed the existing perf log @@ -3206,6 +3247,8 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { ); let wait_max = st_wait.iter().copied().max().unwrap_or(0); tracing::info!( + queue_us_p50 = percentile(&mut st_queue, 0.50), + queue_us_p99 = percentile(&mut st_queue, 0.99), cap_us_p50 = percentile(&mut st_cap, 0.50), cap_us_p99 = percentile(&mut st_cap, 0.99), submit_us_p50 = percentile(&mut st_submit, 0.50), @@ -3213,11 +3256,12 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { wait_us_p50 = percentile(&mut st_wait, 0.50), wait_us_p99 = percentile(&mut st_wait, 0.99), wait_us_max = wait_max, - "stage perf (µs/call): cap=try_latest(ring+convert) submit=encode_picture wait=lock_bitstream(sched+ASIC)" + "stage perf (µs/call): queue=delivery→submit cap=try_latest(ring+convert) submit=encode_picture wait=lock_bitstream(sched+ASIC)" ); st_cap.clear(); st_submit.clear(); st_wait.clear(); + st_queue.clear(); diag_new = 0; diag_repeat = 0; diag_at = std::time::Instant::now(); @@ -3235,7 +3279,23 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { // How deep to pipeline (1 = synchronous submit→poll, the original behaviour). The IDD-push // capturer hands a rotating ring of output textures, so it returns >1; other capturers default 1. let depth = capturer.pipeline_depth().max(1); - let capture_ns = now_ns(); + let submit_ns = now_ns(); + // Wire pts: a fresh frame anchors at its capture-delivery stamp (`CapturedFrame.pts_ns`, + // stamped when the capture thread handed it over) so client-measured latency covers + // delivery + queue age, not just submit→glass; `queue_us` splits that age out as its own + // stage. A re-encoded hold anchors at "now" (its content age is unbounded by design). The + // stamp must be a recent wall-clock time — a synthetic/index-based or ahead-of-clock stamp + // (SyntheticCapturer counts from 0, not the epoch) falls back to "now". 
+ let age_ns = submit_ns.saturating_sub(frame.pts_ns); + let plausible = frame.pts_ns > 0 && frame.pts_ns <= submit_ns && age_ns < 10_000_000_000; + let (capture_ns, queue_us) = if !repeat && plausible { + (frame.pts_ns, (age_ns / 1000) as u32) + } else { + (submit_ns, 0) + }; + if perf && !repeat { + st_queue.push(queue_us); + } let t_submit = std::time::Instant::now(); enc.submit(&frame).context("encoder submit")?; let submit_us = if measure { @@ -3249,7 +3309,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { // This frame's pacing deadline (the next frame's due time); the send thread spreads a big frame // up to here. Each in-flight frame carries its own (capture_ns, deadline) for when it's polled. next += interval; - inflight.push_back((capture_ns, next)); + inflight.push_back((capture_ns, submit_ns, next)); // Drain the OLDEST in-flight frames, keeping at most depth-1 deferred. At depth 1 this polls // immediately after every submit (synchronous); at depth 2 it polls N right after submitting N+1, // so the encode of N overlaps the convert/copy of N+1. 
NVENC's `pending` is FIFO, so poll() returns @@ -3270,7 +3330,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { Some(au) => au, None => break, // no AU ready for a submitted frame (shouldn't happen — poll blocks) }; - let (cap_ns, deadline) = inflight.pop_front().expect("inflight non-empty"); + let (cap_ns, sub_ns, deadline) = inflight.pop_front().expect("inflight non-empty"); let flags = if au.keyframe { (FLAG_PIC | FLAG_SOF) as u32 } else { @@ -3285,13 +3345,14 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { resend_meta = false; } } - let encode_us = (now_ns().saturating_sub(cap_ns) / 1000) as u32; + let encode_us = (now_ns().saturating_sub(sub_ns) / 1000) as u32; let msg = FrameMsg { data: au.data, capture_ns: cap_ns, flags, deadline, encode_us, + queue_us, cap_us, submit_us, wait_us, @@ -3316,14 +3377,14 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { } // Drain the in-flight tail (the depth-1 frames submitted but not yet polled) so the last frames still // reach the client instead of being dropped on the way out. - while let Some((cap_ns, deadline)) = inflight.pop_front() { + while let Some((cap_ns, sub_ns, deadline)) = inflight.pop_front() { let Ok(Some(au)) = enc.poll() else { break }; let flags = if au.keyframe { (FLAG_PIC | FLAG_SOF) as u32 } else { FLAG_PIC as u32 }; - let encode_us = (now_ns().saturating_sub(cap_ns) / 1000) as u32; + let encode_us = (now_ns().saturating_sub(sub_ns) / 1000) as u32; // End-of-stream tail drain: the per-stage split isn't measured here (the capture loop has // exited), so leave it zero — these last few frames are negligible for the aggregates. let msg = FrameMsg { @@ -3332,6 +3393,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { flags, deadline, encode_us, + queue_us: 0, cap_us: 0, submit_us: 0, wait_us: 0,