perf(latency): microburst-cap pacing + per-frame latency histogram
ci / rust (push) Has been cancelled
ci / rust (push) Has been cancelled
From the latency investigation: the freeze-fix pacing (paced_submit) was the single biggest software-controllable latency term — it unconditionally spread EVERY multi-chunk frame over ~90% of the frame interval, adding up to ~7.5 ms @120 / ~15 ms @60 to a frame's last packet even when the frame was small or the link idle. Recover that on the common case while keeping the freeze fix: - Microburst-cap pacing: a frame whose sealed size is <= a cap (default 128 KB, PUNKTFUNK_PACE_BURST_KB) goes out in ONE immediate burst — no pacing latency. Only the OVERFLOW of a bigger frame (IDR / sustained high bitrate, the bursts that actually overran the tx buffer and froze) is spread. 128 KB is well under the ~150 Mbps@60 frame size where drops began, so the default is safe; raise it after confirming send_dropped stays 0 on a given link. Still never slower than unpaced (budget collapses to 0 with no slack). seal-once/in-order nonce preserved — chunks are split, never reordered or re-sealed. - Per-frame instrumentation (PUNKTFUNK_PERF, zero-cost off): encode_us + pace_us (the pacing tail) p50/p99/max histograms + immediate-vs-paced frame counts in the periodic perf line, so the pacing tail is finally visible and the cap is tunable against real numbers. Host builds + clippy + fmt green. NOT yet deployed to the running hosts (still on the safe full-pacing A+B build) — needs the user's LAN soak to validate the cap doesn't reintroduce send_dropped before raising it. Deferred bigger bets (need real-NIC/GPU/Mac validation): encode|send thread split on the native path, CUDA stream+event (one redundant sync), NVENC slice wrapper, stage-2 Apple presenter, glass-to-glass probe — see docs/roadmap.md. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+103
-20
@@ -1385,39 +1385,88 @@ fn service_probes(
|
|||||||
/// until the next keyframe — the cause of the "freezes over ~150 Mbps, no image at 400 Mbps"
|
/// until the next keyframe — the cause of the "freezes over ~150 Mbps, no image at 400 Mbps"
|
||||||
/// symptom. When there's little/no slack (encode ≈ interval at very high fps) the budget collapses
|
/// symptom. When there's little/no slack (encode ≈ interval at very high fps) the budget collapses
|
||||||
/// to ~0 and every chunk goes out immediately, so this is never slower than the unpaced path.
|
/// to ~0 and every chunk goes out immediately, so this is never slower than the unpaced path.
|
||||||
|
/// One paced send's outcome: how long the frame's packets took to leave (`spread_us`) and whether
|
||||||
|
/// any were paced (vs the whole frame fitting the microburst and going out immediately). Fed to the
|
||||||
|
/// PUNKTFUNK_PERF histogram so the pacing tail is visible per-frame.
|
||||||
|
struct PaceStat {
|
||||||
|
spread_us: u32,
|
||||||
|
paced: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
const PACE_CHUNK: usize = 16;
|
||||||
|
|
||||||
|
/// Seal one access unit and send it with MICROBURST pacing: the first `burst_cap` bytes go out
|
||||||
|
/// immediately (one absorbed burst the NIC / socket tx-buffer can swallow), and only the OVERFLOW
|
||||||
|
/// beyond that is spread in [`PACE_CHUNK`]-packet chunks across ~90% of the time to `deadline`. So a
|
||||||
|
/// normal-bitrate frame (≤ cap) leaves in one immediate burst at ~0 added latency, while a genuine
|
||||||
|
/// IDR / sustained-high-bitrate frame (≫ cap) still spreads — keeping the freeze fix exactly where
|
||||||
|
/// it's needed (an unpaced line-rate burst overruns the kernel tx buffer → EAGAIN drop → under
|
||||||
|
/// infinite GOP, a freeze until the next keyframe). With no slack (encode ≈ interval) the budget
|
||||||
|
/// collapses to 0 and even the overflow goes out immediately, so this is never slower than unpaced.
|
||||||
fn paced_submit(
|
fn paced_submit(
|
||||||
session: &mut Session,
|
session: &mut Session,
|
||||||
data: &[u8],
|
data: &[u8],
|
||||||
pts_ns: u64,
|
pts_ns: u64,
|
||||||
flags: u32,
|
flags: u32,
|
||||||
deadline: std::time::Instant,
|
deadline: std::time::Instant,
|
||||||
) -> Result<()> {
|
burst_cap: usize,
|
||||||
const PACE_CHUNK: usize = 16;
|
) -> Result<PaceStat> {
|
||||||
let wires = session
|
let wires = session
|
||||||
.seal_frame(data, pts_ns, flags)
|
.seal_frame(data, pts_ns, flags)
|
||||||
.map_err(|e| anyhow!("seal_frame: {e:?}"))?;
|
.map_err(|e| anyhow!("seal_frame: {e:?}"))?;
|
||||||
let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect();
|
let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect();
|
||||||
let n_chunks = refs.len().div_ceil(PACE_CHUNK).max(1);
|
|
||||||
let start = std::time::Instant::now();
|
let start = std::time::Instant::now();
|
||||||
// Spread sends over ~90% of the time to the deadline (10% margin for the caller's tail sleep);
|
|
||||||
// 0 when we're already at/past the deadline → no sleeps → immediate send.
|
// Split at the microburst cap: packets [0..split] burst out immediately, [split..] are paced.
|
||||||
let budget = deadline
|
let mut cum = 0usize;
|
||||||
.checked_duration_since(start)
|
let mut split = refs.len();
|
||||||
.unwrap_or_default()
|
for (k, r) in refs.iter().enumerate() {
|
||||||
.mul_f32(0.9);
|
cum += r.len();
|
||||||
for (i, chunk) in refs.chunks(PACE_CHUNK).enumerate() {
|
if cum >= burst_cap {
|
||||||
|
split = k + 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for chunk in refs[..split].chunks(PACE_CHUNK) {
|
||||||
session
|
session
|
||||||
.send_sealed(chunk)
|
.send_sealed(chunk)
|
||||||
.map_err(|e| anyhow!("send_sealed: {e:?}"))?;
|
.map_err(|e| anyhow!("send_sealed: {e:?}"))?;
|
||||||
// Sleep toward this chunk's slice of the budget; skip sub-500µs waits (scheduler jitter).
|
}
|
||||||
let target = start + budget.mul_f64((i + 1) as f64 / n_chunks as f64);
|
let paced = split < refs.len();
|
||||||
if let Some(ahead) = target.checked_duration_since(std::time::Instant::now()) {
|
if paced {
|
||||||
if ahead > std::time::Duration::from_micros(500) {
|
let pace_start = std::time::Instant::now();
|
||||||
std::thread::sleep(ahead);
|
let budget = deadline
|
||||||
|
.checked_duration_since(pace_start)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.mul_f32(0.9);
|
||||||
|
let m = refs[split..].len().div_ceil(PACE_CHUNK).max(1);
|
||||||
|
for (j, chunk) in refs[split..].chunks(PACE_CHUNK).enumerate() {
|
||||||
|
session
|
||||||
|
.send_sealed(chunk)
|
||||||
|
.map_err(|e| anyhow!("send_sealed: {e:?}"))?;
|
||||||
|
// Sleep toward this chunk's slice of the budget; skip sub-500µs waits (scheduler jitter).
|
||||||
|
let target = pace_start + budget.mul_f64((j + 1) as f64 / m as f64);
|
||||||
|
if let Some(ahead) = target.checked_duration_since(std::time::Instant::now()) {
|
||||||
|
if ahead > std::time::Duration::from_micros(500) {
|
||||||
|
std::thread::sleep(ahead);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(PaceStat {
|
||||||
|
spread_us: start.elapsed().as_micros() as u32,
|
||||||
|
paced,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Percentile of a slice (sorts it in place first). `q` in 0.0..=1.0.
|
||||||
|
fn percentile(sorted_or_not: &mut [u32], q: f64) -> u32 {
|
||||||
|
if sorted_or_not.is_empty() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
sorted_or_not.sort_unstable();
|
||||||
|
let i = ((sorted_or_not.len() as f64 * q) as usize).min(sorted_or_not.len() - 1);
|
||||||
|
sorted_or_not[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs
|
/// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs
|
||||||
@@ -1453,12 +1502,26 @@ fn virtual_stream(
|
|||||||
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64);
|
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64);
|
||||||
let mut next = std::time::Instant::now();
|
let mut next = std::time::Instant::now();
|
||||||
let mut sent: u64 = 0;
|
let mut sent: u64 = 0;
|
||||||
// Throughput/drop instrumentation (PUNKTFUNK_PERF) — makes a high-bitrate / 1 Gbps soak
|
// Throughput/drop + per-frame-latency instrumentation (PUNKTFUNK_PERF) — makes a high-bitrate
|
||||||
// observable: wire goodput + send-buffer drops (the dominant 1 Gbps+ loss mode) as they happen.
|
// soak observable: wire goodput, send-buffer drops, and the per-frame encode + pacing-tail
|
||||||
|
// distributions (so the pacing latency the microburst cap is meant to recover is visible).
|
||||||
let perf = std::env::var("PUNKTFUNK_PERF").is_ok();
|
let perf = std::env::var("PUNKTFUNK_PERF").is_ok();
|
||||||
let mut last_perf = std::time::Instant::now();
|
let mut last_perf = std::time::Instant::now();
|
||||||
let mut last_bytes = 0u64;
|
let mut last_bytes = 0u64;
|
||||||
let mut last_send_dropped = 0u64;
|
let mut last_send_dropped = 0u64;
|
||||||
|
let mut encode_us: Vec<u32> = Vec::new();
|
||||||
|
let mut pace_us: Vec<u32> = Vec::new();
|
||||||
|
let (mut paced_frames, mut immediate_frames) = (0u64, 0u64);
|
||||||
|
// Microburst cap: a frame whose sealed size is ≤ this goes out in one immediate burst (no
|
||||||
|
// pacing latency); only the overflow of a bigger frame (IDR / sustained high bitrate) is spread.
|
||||||
|
// 128 KB is a conservative default (well under the ~150 Mbps@60 frame size where bursts started
|
||||||
|
// dropping). Raise it via PUNKTFUNK_PACE_BURST_KB after confirming send_dropped stays 0 on your
|
||||||
|
// link, to recover more pacing tail on higher-bitrate streams.
|
||||||
|
let burst_cap = std::env::var("PUNKTFUNK_PACE_BURST_KB")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.parse::<usize>().ok())
|
||||||
|
.unwrap_or(128)
|
||||||
|
* 1024;
|
||||||
while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
|
while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
|
||||||
// Service speed-test probes between frames (each burst pauses video for its duration).
|
// Service speed-test probes between frames (each burst pauses video for its duration).
|
||||||
service_probes(session, stop, probe_rx, probe_result_tx);
|
service_probes(session, stop, probe_rx, probe_result_tx);
|
||||||
@@ -1499,8 +1562,18 @@ fn virtual_stream(
|
|||||||
} else {
|
} else {
|
||||||
FLAG_PIC as u32
|
FLAG_PIC as u32
|
||||||
};
|
};
|
||||||
paced_submit(session, &au.data, capture_ns, flags, next)?;
|
let t_encode_done = now_ns();
|
||||||
|
let stat = paced_submit(session, &au.data, capture_ns, flags, next, burst_cap)?;
|
||||||
sent += 1;
|
sent += 1;
|
||||||
|
if perf {
|
||||||
|
encode_us.push((t_encode_done.saturating_sub(capture_ns) / 1000) as u32);
|
||||||
|
pace_us.push(stat.spread_us);
|
||||||
|
if stat.paced {
|
||||||
|
paced_frames += 1;
|
||||||
|
} else {
|
||||||
|
immediate_frames += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if perf && last_perf.elapsed() >= std::time::Duration::from_secs(2) {
|
if perf && last_perf.elapsed() >= std::time::Duration::from_secs(2) {
|
||||||
let s = session.stats();
|
let s = session.stats();
|
||||||
@@ -1509,14 +1582,24 @@ fn virtual_stream(
|
|||||||
tracing::info!(
|
tracing::info!(
|
||||||
wire_mbps = format!("{wire_mbps:.0}"),
|
wire_mbps = format!("{wire_mbps:.0}"),
|
||||||
frames = sent,
|
frames = sent,
|
||||||
packets_sent = s.packets_sent,
|
|
||||||
send_dropped = s.packets_send_dropped - last_send_dropped,
|
send_dropped = s.packets_send_dropped - last_send_dropped,
|
||||||
send_dropped_total = s.packets_send_dropped,
|
send_dropped_total = s.packets_send_dropped,
|
||||||
|
encode_us_p50 = percentile(&mut encode_us, 0.50),
|
||||||
|
encode_us_p99 = percentile(&mut encode_us, 0.99),
|
||||||
|
pace_us_p50 = percentile(&mut pace_us, 0.50),
|
||||||
|
pace_us_p99 = percentile(&mut pace_us, 0.99),
|
||||||
|
pace_us_max = pace_us.last().copied().unwrap_or(0),
|
||||||
|
immediate_frames,
|
||||||
|
paced_frames,
|
||||||
"perf"
|
"perf"
|
||||||
);
|
);
|
||||||
last_perf = std::time::Instant::now();
|
last_perf = std::time::Instant::now();
|
||||||
last_bytes = s.bytes_sent;
|
last_bytes = s.bytes_sent;
|
||||||
last_send_dropped = s.packets_send_dropped;
|
last_send_dropped = s.packets_send_dropped;
|
||||||
|
encode_us.clear();
|
||||||
|
pace_us.clear();
|
||||||
|
paced_frames = 0;
|
||||||
|
immediate_frames = 0;
|
||||||
}
|
}
|
||||||
match next.checked_duration_since(std::time::Instant::now()) {
|
match next.checked_duration_since(std::time::Instant::now()) {
|
||||||
Some(d) => std::thread::sleep(d),
|
Some(d) => std::thread::sleep(d),
|
||||||
|
|||||||
Reference in New Issue
Block a user