diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index 2709df3..b0844fd 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -1385,39 +1385,88 @@ fn service_probes( /// until the next keyframe — the cause of the "freezes over ~150 Mbps, no image at 400 Mbps" /// symptom. When there's little/no slack (encode ≈ interval at very high fps) the budget collapses /// to ~0 and every chunk goes out immediately, so this is never slower than the unpaced path. +/// One paced send's outcome: how long the frame's packets took to leave (`spread_us`) and whether +/// any were paced (vs the whole frame fitting the microburst and going out immediately). Fed to the +/// PUNKTFUNK_PERF histogram so the pacing tail is visible per-frame. +struct PaceStat { + spread_us: u32, + paced: bool, +} + +const PACE_CHUNK: usize = 16; + +/// Seal one access unit and send it with MICROBURST pacing: the first `burst_cap` bytes go out +/// immediately (one absorbed burst the NIC / socket tx-buffer can swallow), and only the OVERFLOW +/// beyond that is spread in [`PACE_CHUNK`]-packet chunks across ~90% of the time to `deadline`. So a +/// normal-bitrate frame (≤ cap) leaves in one immediate burst at ~0 added latency, while a genuine +/// IDR / sustained-high-bitrate frame (≫ cap) still spreads — keeping the freeze fix exactly where +/// it's needed (an unpaced line-rate burst overruns the kernel tx buffer → EAGAIN drop → under +/// infinite GOP, a freeze until the next keyframe). With no slack (encode ≈ interval) the budget +/// collapses to 0 and even the overflow goes out immediately, so this is never slower than unpaced. 
fn paced_submit( session: &mut Session, data: &[u8], pts_ns: u64, flags: u32, deadline: std::time::Instant, -) -> Result<()> { - const PACE_CHUNK: usize = 16; + burst_cap: usize, +) -> Result<PaceStat> { let wires = session .seal_frame(data, pts_ns, flags) .map_err(|e| anyhow!("seal_frame: {e:?}"))?; let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect(); - let n_chunks = refs.len().div_ceil(PACE_CHUNK).max(1); let start = std::time::Instant::now(); - // Spread sends over ~90% of the time to the deadline (10% margin for the caller's tail sleep); - // 0 when we're already at/past the deadline → no sleeps → immediate send. - let budget = deadline - .checked_duration_since(start) - .unwrap_or_default() - .mul_f32(0.9); - for (i, chunk) in refs.chunks(PACE_CHUNK).enumerate() { + + // Split at the microburst cap: packets [0..split] burst out immediately, [split..] are paced. + let mut cum = 0usize; + let mut split = refs.len(); + for (k, r) in refs.iter().enumerate() { + cum += r.len(); + if cum >= burst_cap { + split = k + 1; + break; + } + } + for chunk in refs[..split].chunks(PACE_CHUNK) { session .send_sealed(chunk) .map_err(|e| anyhow!("send_sealed: {e:?}"))?; - // Sleep toward this chunk's slice of the budget; skip sub-500µs waits (scheduler jitter). 
- let target = start + budget.mul_f64((i + 1) as f64 / n_chunks as f64); - if let Some(ahead) = target.checked_duration_since(std::time::Instant::now()) { - if ahead > std::time::Duration::from_micros(500) { - std::thread::sleep(ahead); + } + let paced = split < refs.len(); + if paced { + let pace_start = std::time::Instant::now(); + let budget = deadline + .checked_duration_since(pace_start) + .unwrap_or_default() + .mul_f32(0.9); + let m = refs[split..].len().div_ceil(PACE_CHUNK).max(1); + for (j, chunk) in refs[split..].chunks(PACE_CHUNK).enumerate() { + session + .send_sealed(chunk) + .map_err(|e| anyhow!("send_sealed: {e:?}"))?; + // Sleep toward this chunk's slice of the budget; skip sub-500µs waits (scheduler jitter). + let target = pace_start + budget.mul_f64((j + 1) as f64 / m as f64); + if let Some(ahead) = target.checked_duration_since(std::time::Instant::now()) { + if ahead > std::time::Duration::from_micros(500) { + std::thread::sleep(ahead); + } } } } - Ok(()) + Ok(PaceStat { + spread_us: start.elapsed().as_micros() as u32, + paced, + }) +} + +/// Percentile of a slice (sorts it in place first). `q` in 0.0..=1.0. +fn percentile(sorted_or_not: &mut [u32], q: f64) -> u32 { + if sorted_or_not.is_empty() { + return 0; + } + sorted_or_not.sort_unstable(); + let i = ((sorted_or_not.len() as f64 * q) as usize).min(sorted_or_not.len() - 1); + sorted_or_not[i] } /// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs @@ -1453,12 +1502,26 @@ fn virtual_stream( let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64); let mut next = std::time::Instant::now(); let mut sent: u64 = 0; - // Throughput/drop instrumentation (PUNKTFUNK_PERF) — makes a high-bitrate / 1 Gbps soak - // observable: wire goodput + send-buffer drops (the dominant 1 Gbps+ loss mode) as they happen. 
+ // Throughput/drop + per-frame-latency instrumentation (PUNKTFUNK_PERF) — makes a high-bitrate + // soak observable: wire goodput, send-buffer drops, and the per-frame encode + pacing-tail + // distributions (so the pacing latency the microburst cap is meant to recover is visible). let perf = std::env::var("PUNKTFUNK_PERF").is_ok(); let mut last_perf = std::time::Instant::now(); let mut last_bytes = 0u64; let mut last_send_dropped = 0u64; + let mut encode_us: Vec<u32> = Vec::new(); + let mut pace_us: Vec<u32> = Vec::new(); + let (mut paced_frames, mut immediate_frames) = (0u64, 0u64); + // Microburst cap: a frame whose sealed size is ≤ this goes out in one immediate burst (no + // pacing latency); only the overflow of a bigger frame (IDR / sustained high bitrate) is spread. + // 128 KB is a conservative default (well under the ~150 Mbps@60 frame size where bursts started + // dropping). Raise it via PUNKTFUNK_PACE_BURST_KB after confirming send_dropped stays 0 on your + // link, to recover more pacing tail on higher-bitrate streams. + let burst_cap = std::env::var("PUNKTFUNK_PACE_BURST_KB") + .ok() + .and_then(|s| s.parse::<usize>().ok()) + .unwrap_or(128) + * 1024; while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline { // Service speed-test probes between frames (each burst pauses video for its duration). 
service_probes(session, stop, probe_rx, probe_result_tx); @@ -1499,8 +1562,18 @@ fn virtual_stream( } else { FLAG_PIC as u32 }; - paced_submit(session, &au.data, capture_ns, flags, next)?; + let t_encode_done = now_ns(); + let stat = paced_submit(session, &au.data, capture_ns, flags, next, burst_cap)?; sent += 1; + if perf { + encode_us.push((t_encode_done.saturating_sub(capture_ns) / 1000) as u32); + pace_us.push(stat.spread_us); + if stat.paced { + paced_frames += 1; + } else { + immediate_frames += 1; + } + } } if perf && last_perf.elapsed() >= std::time::Duration::from_secs(2) { let s = session.stats(); @@ -1509,14 +1582,24 @@ fn virtual_stream( tracing::info!( wire_mbps = format!("{wire_mbps:.0}"), frames = sent, - packets_sent = s.packets_sent, send_dropped = s.packets_send_dropped - last_send_dropped, send_dropped_total = s.packets_send_dropped, + encode_us_p50 = percentile(&mut encode_us, 0.50), + encode_us_p99 = percentile(&mut encode_us, 0.99), + pace_us_p50 = percentile(&mut pace_us, 0.50), + pace_us_p99 = percentile(&mut pace_us, 0.99), + pace_us_max = pace_us.last().copied().unwrap_or(0), + immediate_frames, + paced_frames, "perf" ); last_perf = std::time::Instant::now(); last_bytes = s.bytes_sent; last_send_dropped = s.packets_send_dropped; + encode_us.clear(); + pace_us.clear(); + paced_frames = 0; + immediate_frames = 0; } match next.checked_duration_since(std::time::Instant::now()) { Some(d) => std::thread::sleep(d),