fix: M2 — eliminate the periodic high-res stream freeze (infinite GOP + single-deadline pacing)

At 5120x1440 the stream froze on a ~2s cadence. Two compounding causes (confirmed by a
profiling pass + adversarial review):

1. Periodic IDR every 2s (set_gop(fps*2)). A keyframe at 5K is ~20-40x a P-frame — a
   recurring multi-millisecond encode+packetize+send spike. Fix: infinite GOP (gop_size=-1),
   one IDR at stream start, P-frames only; forced-idr makes a client recovery request (RFI via
   request_keyframe) emit an IDR on demand — the Moonlight/Sunshine low-latency model.

2. Two pacing timers summing on the capture/encode thread: a per-packet thread::sleep pacer
   (spread a frame's packets across a whole frame interval) PLUS a backstop sleep on top, so
   every frame cost 1-2x the interval and the big IDR blew through it (the 2->120 oscillation).
   Fix: delete both; send at line rate and drive cadence from a single absolute deadline.
   (Proper microburst pacing belongs on a dedicated send thread — a follow-up.)

Also: honor the client's fps (pacing clamp 60->240) and add an env-gated (LUMEN_PERF)
per-stage timing log (enc/pkt/send µs + unique-vs-reencoded frames + max packet burst) for
diagnosing the remaining throughput ceiling. Verified live: freeze gone at 5120x1440.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-09 20:29:48 +00:00
parent 669d40ae21
commit 0e1853e070
2 changed files with 76 additions and 31 deletions
+67 -30
View File
@@ -160,11 +160,12 @@ fn stream_body(
.unwrap_or(20);
let mut pk = VideoPacketizer::new(cfg.packet_size, fec_pct, cfg.min_fec);
// Pace at a steady rate (capped at 60fps), re-encoding the last captured frame when the
// compositor produced no new one. wlroots only emits frames on damage, so a static or
// Pace at the client's negotiated frame rate, re-encoding the last captured frame when the
// compositor produced no new one. Compositors only emit frames on damage, so a static or
// slow-updating desktop would otherwise starve the client into a "network too slow" abort.
// Re-encoding an unchanged frame is cheap — NVENC emits a near-empty P-frame.
let target_fps = cfg.fps.clamp(1, 60);
// Re-encoding an unchanged frame is cheap — NVENC emits a near-empty P-frame. The upper
// bound just guards against an absurd client request (the encoder is opened at `cfg.fps`).
let target_fps = cfg.fps.clamp(1, 240);
let frame_interval = Duration::from_secs_f64(1.0 / target_fps as f64);
let mut sent_pkts: u64 = 0;
let mut fps_count: u32 = 0;
@@ -178,18 +179,29 @@ fn stream_body(
let mut rng = rand::thread_rng();
let mut dropped: u64 = 0;
// Per-stage timing (LUMEN_PERF=1): max µs/stage per second + unique vs re-encoded frames,
// to pinpoint stalls. `unique` counts genuinely-new captured frames (vs re-encoded holds).
let perf = std::env::var_os("LUMEN_PERF").is_some();
let (mut mx_cap, mut mx_enc, mut mx_pkt, mut mx_send, mut mx_pkts, mut uniq) =
(0u128, 0u128, 0u128, 0u128, 0usize, 0u32);
// Absolute next-frame deadline — the single pacing clock for the loop.
let mut next_frame = Instant::now();
while running.load(Ordering::SeqCst) {
let tick = Instant::now();
// Advance to the freshest captured frame if one arrived; otherwise reuse the last.
if let Some(f) = capturer.try_latest().context("capture frame")? {
frame = f;
uniq += 1;
}
let t_cap = tick.elapsed();
// Honor a client recovery request (RFI / request-IDR): force a keyframe so the client
// resyncs immediately instead of waiting for the next GOP boundary.
if force_idr.swap(false, Ordering::SeqCst) {
enc.request_keyframe();
}
enc.submit(&frame).context("encoder submit")?;
let t_enc = tick.elapsed();
// 90 kHz RTP timestamp from wall-clock, so a variable capture rate stays correct.
let ts = (stream_start.elapsed().as_secs_f64() * 90_000.0) as u32;
@@ -202,47 +214,72 @@ fn stream_body(
};
batch.extend(pk.packetize(&au.data, ft, ts));
}
let t_pkt = tick.elapsed();
// Pace the frame's packets evenly across the frame interval rather than blasting them
// at line rate (a real link drops microbursts). The per-packet schedule also paces the
// frame itself; sub-500µs sleeps are skipped (unreliable), batching the spread.
// Send the frame's packets at line rate. Per-packet pacing (microburst shaping) must NOT
// run on this thread — it serializes against capture/encode and was the prime cause of
// the fps oscillation. Pacing belongs on a dedicated send thread (TODO: split encode|send);
// cadence is driven below by a single absolute deadline.
let mut client_gone = false;
let n = batch.len();
if n > 0 {
let per_ns = frame_interval.as_nanos() as u64 / n as u64;
for (i, pkt) in batch.iter().enumerate() {
if drop_pct > 0 && rng.gen_range(0..100) < drop_pct {
dropped += 1; // simulated loss: built the packet, skip the send
} else if sock.send(pkt).is_err() {
client_gone = true;
break;
} else {
sent_pkts += 1;
}
let target = tick + Duration::from_nanos(per_ns * (i as u64 + 1));
if let Some(ahead) = target.checked_duration_since(Instant::now()) {
if ahead >= Duration::from_micros(500) {
std::thread::sleep(ahead);
}
}
for pkt in &batch {
if drop_pct > 0 && rng.gen_range(0..100) < drop_pct {
dropped += 1; // simulated loss: built the packet, skip the send
} else if sock.send(pkt).is_err() {
client_gone = true;
break;
} else {
sent_pkts += 1;
}
}
if client_gone {
tracing::info!(sent_pkts, "video: client unreachable — stopping stream");
break;
}
if perf {
let t_send = tick.elapsed();
mx_cap = mx_cap.max(t_cap.as_micros());
mx_enc = mx_enc.max((t_enc - t_cap).as_micros());
mx_pkt = mx_pkt.max((t_pkt - t_enc).as_micros());
mx_send = mx_send.max((t_send - t_pkt).as_micros());
mx_pkts = mx_pkts.max(n);
}
fps_count += 1;
if fps_t.elapsed() >= Duration::from_secs(1) {
tracing::info!(fps = fps_count, sent_pkts, dropped, "video: streaming");
if perf {
// Max µs/stage this second: cap=drain channel, enc=submit (zero-copy device
// copy + NVENC), pkt=poll+FEC+packetize, send=paced packet send. `uniq`=new
// captured frames (vs re-encoded). `pkts`=max packets in one frame (IDR spike).
tracing::info!(
fps = fps_count,
uniq,
enc_us = mx_enc,
pkt_us = mx_pkt,
send_us = mx_send,
cap_us = mx_cap,
max_pkts = mx_pkts,
"video: streaming (perf)"
);
mx_cap = 0;
mx_enc = 0;
mx_pkt = 0;
mx_send = 0;
mx_pkts = 0;
uniq = 0;
} else {
tracing::info!(fps = fps_count, sent_pkts, dropped, "video: streaming");
}
fps_count = 0;
fps_t = Instant::now();
}
// Backstop the frame rate when few/no packets were produced (the packet pacing above
// otherwise consumes ~one frame interval on its own).
let elapsed = tick.elapsed();
if elapsed < frame_interval {
std::thread::sleep(frame_interval - elapsed);
// Single pacing authority: hold a steady cadence at the target rate from an absolute
// clock. No double-sleep. If a slow frame put us behind, resync to now rather than
// bursting to catch up.
next_frame += frame_interval;
match next_frame.checked_duration_since(Instant::now()) {
Some(d) => std::thread::sleep(d),
None => next_frame = Instant::now(),
}
}
Ok(())