feat(host): intra-refresh loss recovery + delivery-anchored latency instrumentation
Intra-refresh (opt-in PUNKTFUNK_INTRA_REFRESH=1 until on-glass validated): NVENC runs a moving intra band + recovery-point SEI (gop_size becomes the wave period, ffmpeg forces the real GOP infinite; default fps/2, PUNKTFUNK_IR_PERIOD_FRAMES overrides; ENOSYS latches a fallback to IDR-only). Clients request a keyframe on every FEC-unrecoverable frame, so under intra-refresh the session glue serves the first request instantly and suppresses the rest for a 2s window — the wave heals loss without the 20-40x IDR spike cascade. VAAPI/software keep IDR recovery.

Instrumentation: the wire pts now anchors at the PipeWire delivery stamp (client-measured latency covers delivery + queue age, not just submit->glass; repeats/synthetic stamps fall back to now), encode_us keeps its submit->AU meaning via a separate inflight stamp, and a new 'queue' stage (delivery->submit age of fresh frames) rides PUNKTFUNK_PERF and the web-console stats samples.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -2365,8 +2365,13 @@ struct FrameMsg {
|
||||
/// When this frame's packets should have fully left (the next frame's due time) = the pacing
|
||||
/// budget. In the past when the send thread is behind → immediate send (catch up).
|
||||
deadline: std::time::Instant,
|
||||
/// capture→encoded latency (µs), measured on the encode thread, carried for the perf histogram.
|
||||
/// submit→encoded latency (µs), measured on the encode thread, carried for the perf histogram.
|
||||
encode_us: u32,
|
||||
/// Capture-delivery → encoder-submit age (µs) of a fresh frame — the PipeWire delivery +
|
||||
/// channel-queue time the old pre-submit stamp made invisible. Always measured (two integer
|
||||
/// ops); 0 for repeats/tail frames. The wire pts (`capture_ns`) anchors at the same delivery
|
||||
/// stamp, so client-side latency figures include this window too.
|
||||
queue_us: u32,
|
||||
/// Per-stage µs splits, measured on the capture/encode thread (0 when neither `PUNKTFUNK_PERF`
|
||||
/// nor a stats capture is armed). The send thread accumulates them for the web-console sample:
|
||||
/// `cap_us` = `try_latest` (ring read + colour convert), `submit_us` = NVENC `encode_picture`
|
||||
@@ -2491,8 +2496,12 @@ fn send_loop(
|
||||
// split carried on each FrameMsg, the new-vs-repeat frame split, the cached registration id, and
|
||||
// the previous window's loss snapshot for delta computation.
|
||||
let mut sid: Option<u32> = None;
|
||||
let (mut cap_v, mut submit_v, mut wait_v): (Vec<u32>, Vec<u32>, Vec<u32>) =
|
||||
(Vec::new(), Vec::new(), Vec::new());
|
||||
let (mut cap_v, mut submit_v, mut wait_v, mut queue_v): (
|
||||
Vec<u32>,
|
||||
Vec<u32>,
|
||||
Vec<u32>,
|
||||
Vec<u32>,
|
||||
) = (Vec::new(), Vec::new(), Vec::new(), Vec::new());
|
||||
let (mut new_frames, mut repeat_frames) = (0u64, 0u64);
|
||||
let mut last_frames_dropped = 0u64;
|
||||
let mut last_packets_dropped = 0u64;
|
||||
@@ -2529,6 +2538,11 @@ fn send_loop(
|
||||
cap_v.push(msg.cap_us);
|
||||
submit_v.push(msg.submit_us);
|
||||
wait_v.push(msg.wait_us);
|
||||
// Queue age is only meaningful for fresh frames (repeats/tail carry 0
|
||||
// by construction — including those would drag the percentiles down).
|
||||
if !msg.repeat {
|
||||
queue_v.push(msg.queue_us);
|
||||
}
|
||||
}
|
||||
if msg.repeat {
|
||||
repeat_frames += 1;
|
||||
@@ -2588,6 +2602,11 @@ fn send_loop(
|
||||
t_ms: 0, // stamped by push_sample from the capture's monotonic start
|
||||
session_id,
|
||||
stages: vec![
|
||||
crate::stats_recorder::StageTiming {
|
||||
name: "queue".into(),
|
||||
p50_us: percentile(&mut queue_v, 0.50) as f32,
|
||||
p99_us: percentile(&mut queue_v, 0.99) as f32,
|
||||
},
|
||||
crate::stats_recorder::StageTiming {
|
||||
name: "capture".into(),
|
||||
p50_us: percentile(&mut cap_v, 0.50) as f32,
|
||||
@@ -2631,6 +2650,7 @@ fn send_loop(
|
||||
cap_v.clear();
|
||||
submit_v.clear();
|
||||
wait_v.clear();
|
||||
queue_v.clear();
|
||||
paced_frames = 0;
|
||||
immediate_frames = 0;
|
||||
new_frames = 0;
|
||||
@@ -2963,10 +2983,12 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
let mut capture_rebuilds: u32 = 0;
|
||||
// Last HDR mastering metadata we forwarded — re-sent as 0xCE on change/keyframe (see below).
|
||||
let mut last_hdr_meta: Option<punktfunk_core::quic::HdrMeta> = None;
|
||||
// Frames submitted to NVENC but not yet polled (capture_ns, pacing deadline). With a capturer that
|
||||
// hands a fresh output texture per frame, the loop submits N+1 before polling N (pipeline depth > 1),
|
||||
// overlapping the convert/copy of N+1 on the 3D engine with the encode of N on the NVENC ASIC.
|
||||
let mut inflight: std::collections::VecDeque<(u64, std::time::Instant)> =
|
||||
// Frames submitted to NVENC but not yet polled (wire pts, submit stamp, pacing deadline). With a
|
||||
// capturer that hands a fresh output texture per frame, the loop submits N+1 before polling N
|
||||
// (pipeline depth > 1), overlapping the convert/copy of N+1 on the 3D engine with the encode of N
|
||||
// on the NVENC ASIC. The wire pts and the submit stamp are carried separately so `encode_us`
|
||||
// keeps meaning submit→AU while the wire pts anchors at PipeWire delivery (queue age included).
|
||||
let mut inflight: std::collections::VecDeque<(u64, u64, std::time::Instant)> =
|
||||
std::collections::VecDeque::new();
|
||||
// Diagnostic: distinguish NEW captured frames (the source produced a fresh frame) from REPEATS (the
|
||||
// loop re-encoded the last frame because `try_latest` had nothing). A low new-frame rate at a high
|
||||
@@ -2974,12 +2996,18 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
// compositing), NOT an encoder problem. Logged every 2 s when `PUNKTFUNK_PERF`.
|
||||
let (mut diag_new, mut diag_repeat) = (0u64, 0u64);
|
||||
let mut diag_at = std::time::Instant::now();
|
||||
// Last client-requested forced IDR — the intra-refresh rate limit window anchor (see below).
|
||||
let mut last_forced_idr: Option<std::time::Instant> = None;
|
||||
// Per-stage latency breakdown (PUNKTFUNK_PERF): per-call µs for the GPU-bound stages so we see
|
||||
// exactly where the capture→encoded latency goes — cap=try_latest (ring read + colour convert),
|
||||
// submit=encode_picture launch, wait=lock_bitstream (the scheduling wait + ASIC encode, the one
|
||||
// that dominates under a GPU-saturating game).
|
||||
let (mut st_cap, mut st_submit, mut st_wait): (Vec<u32>, Vec<u32>, Vec<u32>) =
|
||||
(Vec::new(), Vec::new(), Vec::new());
|
||||
let (mut st_cap, mut st_submit, mut st_wait, mut st_queue): (
|
||||
Vec<u32>,
|
||||
Vec<u32>,
|
||||
Vec<u32>,
|
||||
Vec<u32>,
|
||||
) = (Vec::new(), Vec::new(), Vec::new(), Vec::new());
|
||||
while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
|
||||
// Mid-stream session switch (the box flipped Gaming↔Desktop): rebuild the WHOLE backend in
|
||||
// place — a different compositor at the SAME client mode — keeping the Session + send thread
|
||||
@@ -3083,8 +3111,21 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
want_kf = true;
|
||||
}
|
||||
if want_kf {
|
||||
tracing::debug!("forcing keyframe (client decode recovery)");
|
||||
enc.request_keyframe();
|
||||
// Intra-refresh mode: clients request a keyframe on EVERY FEC-unrecoverable frame
|
||||
// (`frames_dropped` polling), but the refresh wave already heals those within ~half a
|
||||
// second — answering each with a full IDR is the 20-40× spike cascade the wave exists
|
||||
// to avoid. Serve the first request immediately (a genuinely wedged decoder recovers
|
||||
// at once), then suppress further requests for one window and let the wave heal.
|
||||
const IDR_WINDOW: std::time::Duration = std::time::Duration::from_secs(2);
|
||||
let suppress = enc.caps().intra_refresh
|
||||
&& last_forced_idr.is_some_and(|t| t.elapsed() < IDR_WINDOW);
|
||||
if suppress {
|
||||
tracing::debug!("keyframe request suppressed — intra-refresh wave healing");
|
||||
} else {
|
||||
tracing::debug!("forcing keyframe (client decode recovery)");
|
||||
enc.request_keyframe();
|
||||
last_forced_idr = Some(std::time::Instant::now());
|
||||
}
|
||||
}
|
||||
// Measure the per-stage split when `PUNKTFUNK_PERF` is set OR a web-console stats capture is
|
||||
// armed (a cheap Relaxed atomic, re-read each frame). The values feed the existing perf log
|
||||
@@ -3206,6 +3247,8 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
);
|
||||
let wait_max = st_wait.iter().copied().max().unwrap_or(0);
|
||||
tracing::info!(
|
||||
queue_us_p50 = percentile(&mut st_queue, 0.50),
|
||||
queue_us_p99 = percentile(&mut st_queue, 0.99),
|
||||
cap_us_p50 = percentile(&mut st_cap, 0.50),
|
||||
cap_us_p99 = percentile(&mut st_cap, 0.99),
|
||||
submit_us_p50 = percentile(&mut st_submit, 0.50),
|
||||
@@ -3213,11 +3256,12 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
wait_us_p50 = percentile(&mut st_wait, 0.50),
|
||||
wait_us_p99 = percentile(&mut st_wait, 0.99),
|
||||
wait_us_max = wait_max,
|
||||
"stage perf (µs/call): cap=try_latest(ring+convert) submit=encode_picture wait=lock_bitstream(sched+ASIC)"
|
||||
"stage perf (µs/call): queue=delivery→submit cap=try_latest(ring+convert) submit=encode_picture wait=lock_bitstream(sched+ASIC)"
|
||||
);
|
||||
st_cap.clear();
|
||||
st_submit.clear();
|
||||
st_wait.clear();
|
||||
st_queue.clear();
|
||||
diag_new = 0;
|
||||
diag_repeat = 0;
|
||||
diag_at = std::time::Instant::now();
|
||||
@@ -3235,7 +3279,23 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
// How deep to pipeline (1 = synchronous submit→poll, the original behaviour). The IDD-push
|
||||
// capturer hands a rotating ring of output textures, so it returns >1; other capturers default 1.
|
||||
let depth = capturer.pipeline_depth().max(1);
|
||||
let capture_ns = now_ns();
|
||||
let submit_ns = now_ns();
|
||||
// Wire pts: a fresh frame anchors at its capture-delivery stamp (`CapturedFrame.pts_ns`,
|
||||
// stamped when the capture thread handed it over) so client-measured latency covers
|
||||
// delivery + queue age, not just submit→glass; `queue_us` splits that age out as its own
|
||||
// stage. A re-encoded hold anchors at "now" (its content age is unbounded by design). The
|
||||
// stamp must be a recent wall-clock time — a synthetic/index-based or ahead-of-clock stamp
|
||||
// (SyntheticCapturer counts from 0, not the epoch) falls back to "now".
|
||||
let age_ns = submit_ns.saturating_sub(frame.pts_ns);
|
||||
let plausible = frame.pts_ns > 0 && frame.pts_ns <= submit_ns && age_ns < 10_000_000_000;
|
||||
let (capture_ns, queue_us) = if !repeat && plausible {
|
||||
(frame.pts_ns, (age_ns / 1000) as u32)
|
||||
} else {
|
||||
(submit_ns, 0)
|
||||
};
|
||||
if perf && !repeat {
|
||||
st_queue.push(queue_us);
|
||||
}
|
||||
let t_submit = std::time::Instant::now();
|
||||
enc.submit(&frame).context("encoder submit")?;
|
||||
let submit_us = if measure {
|
||||
@@ -3249,7 +3309,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
// This frame's pacing deadline (the next frame's due time); the send thread spreads a big frame
|
||||
// up to here. Each in-flight frame carries its own (capture_ns, submit_ns, deadline) for when it's polled.
|
||||
next += interval;
|
||||
inflight.push_back((capture_ns, next));
|
||||
inflight.push_back((capture_ns, submit_ns, next));
|
||||
// Drain the OLDEST in-flight frames, keeping at most depth-1 deferred. At depth 1 this polls
|
||||
// immediately after every submit (synchronous); at depth 2 it polls N right after submitting N+1,
|
||||
// so the encode of N overlaps the convert/copy of N+1. NVENC's `pending` is FIFO, so poll() returns
|
||||
@@ -3270,7 +3330,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
Some(au) => au,
|
||||
None => break, // no AU ready for a submitted frame (shouldn't happen — poll blocks)
|
||||
};
|
||||
let (cap_ns, deadline) = inflight.pop_front().expect("inflight non-empty");
|
||||
let (cap_ns, sub_ns, deadline) = inflight.pop_front().expect("inflight non-empty");
|
||||
let flags = if au.keyframe {
|
||||
(FLAG_PIC | FLAG_SOF) as u32
|
||||
} else {
|
||||
@@ -3285,13 +3345,14 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
resend_meta = false;
|
||||
}
|
||||
}
|
||||
let encode_us = (now_ns().saturating_sub(cap_ns) / 1000) as u32;
|
||||
let encode_us = (now_ns().saturating_sub(sub_ns) / 1000) as u32;
|
||||
let msg = FrameMsg {
|
||||
data: au.data,
|
||||
capture_ns: cap_ns,
|
||||
flags,
|
||||
deadline,
|
||||
encode_us,
|
||||
queue_us,
|
||||
cap_us,
|
||||
submit_us,
|
||||
wait_us,
|
||||
@@ -3316,14 +3377,14 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
}
|
||||
// Drain the in-flight tail (the depth-1 frames submitted but not yet polled) so the last frames still
|
||||
// reach the client instead of being dropped on the way out.
|
||||
while let Some((cap_ns, deadline)) = inflight.pop_front() {
|
||||
while let Some((cap_ns, sub_ns, deadline)) = inflight.pop_front() {
|
||||
let Ok(Some(au)) = enc.poll() else { break };
|
||||
let flags = if au.keyframe {
|
||||
(FLAG_PIC | FLAG_SOF) as u32
|
||||
} else {
|
||||
FLAG_PIC as u32
|
||||
};
|
||||
let encode_us = (now_ns().saturating_sub(cap_ns) / 1000) as u32;
|
||||
let encode_us = (now_ns().saturating_sub(sub_ns) / 1000) as u32;
|
||||
// End-of-stream tail drain: the per-stage split isn't measured here (the capture loop has
|
||||
// exited), so leave it zero — these last few frames are negligible for the aggregates.
|
||||
let msg = FrameMsg {
|
||||
@@ -3332,6 +3393,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
flags,
|
||||
deadline,
|
||||
encode_us,
|
||||
queue_us: 0,
|
||||
cap_us: 0,
|
||||
submit_us: 0,
|
||||
wait_us: 0,
|
||||
|
||||
Reference in New Issue
Block a user