perf(gamestream): move FEC packetization off the encode loop (3-stage pipeline)

FEC/Reed-Solomon packetization ran inline on the encode loop (~3 ms/frame at 4K),
serializing behind encode and capping the GameStream frame rate below what the
encoder alone can sustain. Split it into a 3-stage pipeline, each stage on its own
thread joined by a depth-2 bounded queue:

  encode loop → [raw AUs] → packetizer (FEC/RS) → [wire batch] → paced sender
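
As a rough illustration of the shape above (not the code in this commit), the three stages
can be sketched with two `std::sync::mpsc::sync_channel(2)` queues. `run_pipeline` and the
byte-vector payloads are hypothetical stand-ins: the "packetizer" only duplicates the
payload where the real one runs FEC, and the "sender" only counts batches.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Pushes `frames` dummy frames through encode -> packetizer -> sender and
/// returns (batches delivered by the sender stage, frames dropped at the head).
fn run_pipeline(frames: u32) -> (u32, u32) {
    // Two depth-2 bounded queues join the three stages.
    let (raw_tx, raw_rx) = sync_channel::<Vec<u8>>(2);
    let (batch_tx, batch_rx) = sync_channel::<Vec<Vec<u8>>>(2);

    // Stage 2: packetizer. The blocking `send` is what carries the sender's
    // backpressure upstream: while it blocks, the raw queue fills.
    let packetizer = thread::spawn(move || {
        while let Ok(frame) = raw_rx.recv() {
            let batch = vec![frame.clone(), frame]; // stand-in for data + parity
            if batch_tx.send(batch).is_err() {
                break; // sender stage exited
            }
        }
    });

    // Stage 3: sender. Counts batches instead of writing to a socket.
    let sender = thread::spawn(move || {
        let mut delivered = 0u32;
        while batch_rx.recv().is_ok() {
            delivered += 1;
        }
        delivered
    });

    // Stage 1: the "encode loop". `try_send` never blocks; a full queue
    // means the frame just produced (the newest one) is dropped.
    let mut dropped = 0u32;
    for i in 0..frames {
        match raw_tx.try_send(vec![i as u8; 16]) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => dropped += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }

    // Closing the head lets each stage drain its queue and exit in turn.
    drop(raw_tx);
    packetizer.join().unwrap();
    let delivered = sender.join().unwrap();
    (delivered, dropped)
}
```

Calling `run_pipeline(100)` returns a pair whose two counts sum to 100; how they split
depends on thread scheduling, so the sketch makes no claim about the drop rate.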

- `spawn_packetizer`: turns each `RawFrame`'s access units into wire datagrams via
  the stateful VideoPacketizer, off the encode loop. Above-normal priority (on the
  per-frame critical path). Tallies goodput (bytes to the wire) for the stats window.
- Backpressure chains up: a slow sender blocks the packetizer, which fills the
  encode→packetizer queue, which makes the encode loop drop the NEWEST frame — encode
  itself never waits.
- A dropped frame now consumes no client-visible frameIndex (packetization is
  downstream), so the host re-anchors the reference chain: a drop arms a keyframe on
  the next iteration (`recover_after_drop`), routed through the same coalesce gate as
  client IDR requests so a burst of drops (congestion) can't become an IDR storm.
- Perf/stats relabeled: `pkt` = AU drain, `send` = enqueue to the pipeline (both
  should be near-zero now; nonzero = encode being stalled by pipeline backpressure).
  Goodput read from the packetizer's atomic at the 1 s stats boundary.
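
A minimal sketch of the coalesce idea in the `recover_after_drop` bullet, under stated
assumptions: the gate's real logic is not part of this diff, so `KeyframeGate` and `admit`
are hypothetical names, and this version simply discards a request that arrives inside the
coalesce window (the real gate may defer it instead).

```rust
use std::time::{Duration, Instant};

/// Hypothetical gate: admits a keyframe request only if no keyframe was
/// admitted within the last `coalesce` interval.
struct KeyframeGate {
    coalesce: Duration,
    last_keyframe: Option<Instant>,
}

impl KeyframeGate {
    fn new(coalesce: Duration) -> Self {
        Self { coalesce, last_keyframe: None }
    }

    /// Returns true if a keyframe should be forced at `now`. Drop recovery and
    /// client IDR requests would both arrive here as `want_keyframe`.
    fn admit(&mut self, want_keyframe: bool, now: Instant) -> bool {
        if !want_keyframe {
            return false;
        }
        let allowed = match self.last_keyframe {
            Some(prev) => now.duration_since(prev) >= self.coalesce,
            None => true,
        };
        if allowed {
            self.last_keyframe = Some(now);
        }
        allowed
    }
}
```

With a 33 ms window, a burst of requests 10 ms apart yields one keyframe, and the next
request at or after 33 ms is admitted again. That is the property that keeps a run of
congestion drops from turning into an IDR storm.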

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-05 13:53:43 +00:00
parent fa45608628
commit 677a4f4cf5
+103 -40
@@ -413,6 +413,54 @@ fn pace_layout(n: usize) -> (usize, usize) {
     (chunk_sz, steps)
 }
+/// One encoded frame handed from the encode loop to the packetizer thread: the frame's access
+/// units (owned buffers, each with its frame type) plus the shared 90 kHz RTP timestamp. FEC
+/// packetization runs on the packetizer thread — off the encode loop — so it never serializes
+/// behind encode (measured ~3 ms/frame at 4K, which capped GameStream's frame rate well below what
+/// the encoder alone can sustain).
+struct RawFrame {
+    aus: Vec<(Vec<u8>, FrameType)>,
+    ts: u32,
+}
+/// Packetizer thread: turns each [`RawFrame`]'s access units into wire datagrams (data + ReedSolomon
+/// FEC parity shards) via the stateful [`VideoPacketizer`], then hands the batch to the paced sender.
+/// It sits between encode and send so the FEC never blocks the encode loop. Backpressure: the hand-off
+/// to the sender BLOCKS, so if the paced sender falls behind, the packetizer stalls and the
+/// encode→packetizer queue fills — the encode loop then drops the newest frame (see the loop) rather
+/// than stalling. Tallies goodput (bytes handed to the wire) into `goodput` for the encode loop's stats
+/// window. Exits when either neighbor's channel closes (session teardown / client gone).
+fn spawn_packetizer(
+    rx: std::sync::mpsc::Receiver<RawFrame>,
+    tx: std::sync::mpsc::SyncSender<PacketBatch>,
+    mut pk: VideoPacketizer,
+    goodput: Arc<std::sync::atomic::AtomicU64>,
+) -> Result<()> {
+    std::thread::Builder::new()
+        .name("punktfunk-pkt".into())
+        .spawn(move || {
+            // Above-normal, like the send thread — this stage is on the per-frame critical path.
+            crate::punktfunk1::boost_thread_priority(false);
+            while let Ok(frame) = rx.recv() {
+                let mut batch: PacketBatch = Vec::new();
+                for (au, ft) in frame.aus {
+                    batch.extend(pk.packetize(&au, ft, frame.ts));
+                }
+                if batch.is_empty() {
+                    continue;
+                }
+                let bytes: u64 = batch.iter().map(|p| p.len() as u64).sum();
+                // Blocking send: propagates the paced sender's backpressure upstream (see above).
+                if tx.send(batch).is_err() {
+                    break; // sender exited (client gone)
+                }
+                goodput.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
+            }
+        })
+        .context("spawn packetizer thread")?;
+    Ok(())
+}
 /// Dedicated send thread: one [`PacketBatch`] per frame arrives on `rx`; its packets go out in
 /// `sendmmsg` chunks, paced so the frame's data spreads over ~3/4 of the frame interval
 /// (microburst shaping at chunk granularity — a real link drops line-rate bursts; the encode
@@ -544,7 +592,7 @@ fn stream_body(
         .ok()
         .and_then(|v| v.parse().ok())
         .unwrap_or(20);
-    let mut pk = VideoPacketizer::new(cfg.packet_size, fec_pct, cfg.min_fec);
+    let pk = VideoPacketizer::new(cfg.packet_size, fec_pct, cfg.min_fec);
     // Pace at the client's negotiated frame rate, re-encoding the last captured frame when the
     // compositor produced no new one. Compositors only emit frames on damage, so a static or
@@ -564,9 +612,15 @@ fn stream_body(
     let mut sent_batches: u64 = 0;
     let mut dropped_batches: u64 = 0;
-    // The send thread: one frame's batch at a time over a small bounded queue. Depth 2 means a
-    // slow send can buffer one frame while the next encodes; beyond that the NEWEST batch is
-    // dropped (the client recovers via FEC/RFI) rather than ever stalling the encode loop.
+    // Three-stage pipeline so FEC packetization never blocks encode: `encode loop → [raw AUs] →
+    // packetizer (FEC/RS) → [wire batch] → paced sender`, each stage on its own thread joined by a
+    // depth-2 bounded queue. Depth 2 means a slow stage can buffer one frame while the next is
+    // produced; beyond that the NEWEST frame is dropped (the client recovers via FEC/RFI) rather than
+    // stalling the encode loop. Backpressure chains up: a slow sender blocks the packetizer, which
+    // fills the encode→packetizer queue, which makes the encode loop drop — encode itself never
+    // waits. Goodput (bytes handed to the wire) is tallied by the packetizer into `goodput`, read at
+    // the encode loop's 1 s stats boundary (the old inline batch-byte sum moved with packetization).
+    let goodput = Arc::new(std::sync::atomic::AtomicU64::new(0));
     let (batch_tx, batch_rx) = std::sync::mpsc::sync_channel::<PacketBatch>(2);
     spawn_sender(
         sock.try_clone().context("clone video socket")?,
@@ -575,12 +629,14 @@ fn stream_body(
         running.clone(),
         drop_pct,
     )?;
+    let (raw_tx, raw_rx) = std::sync::mpsc::sync_channel::<RawFrame>(2);
+    spawn_packetizer(raw_rx, batch_tx, pk, goodput.clone())?;
     // Per-stage timing (PUNKTFUNK_PERF=1): max µs/stage per second + unique vs re-encoded frames,
     // to pinpoint stalls. `unique` counts genuinely-new captured frames (vs re-encoded holds).
     let perf = crate::config::config().perf;
-    let (mut mx_cap, mut mx_enc, mut mx_pkt, mut mx_send, mut mx_pkts, mut uniq) =
-        (0u128, 0u128, 0u128, 0u128, 0usize, 0u32);
+    let (mut mx_cap, mut mx_enc, mut mx_pkt, mut mx_send, mut uniq) =
+        (0u128, 0u128, 0u128, 0u128, 0u32);
     // Web-console stats accumulation (active when `perf` OR a capture is armed): per-stage vectors
     // for p50/p99, the goodput bytes queued to the sender this window, the previous window's
     // dropped-frame count for delta computation, and the registration id cached on the first sample.
@@ -592,7 +648,6 @@ fn stream_body(
     let mut sid: Option<u32> = None;
     let (mut v_cap, mut v_enc, mut v_pkt, mut v_send): (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>) =
         (Vec::new(), Vec::new(), Vec::new(), Vec::new());
-    let mut bytes_win: u64 = 0;
     let mut last_dropped_batches: u64 = 0;
     // Absolute next-frame deadline — the single pacing clock for the loop.
     let mut next_frame = Instant::now();
@@ -614,6 +669,13 @@ fn stream_body(
     // ref-invalidation (cheap, no IDR spike) is never rate-limited — only full keyframes are.
     let keyframe_coalesce = frame_interval * 2;
     let mut last_keyframe: Option<Instant> = None;
+    // A frame dropped at the pipeline head (below) breaks the reference chain for the following
+    // P-frames: the client never receives it, but the encoder advanced its references past it, and —
+    // packetization being downstream now — a dropped frame consumes no frameIndex for the client to
+    // detect the gap. So the host re-anchors itself: a drop arms a keyframe on the next iteration,
+    // routed through the same coalesce gate as client IDR requests so a burst of drops (congestion)
+    // can't become an IDR storm.
+    let mut recover_after_drop = false;
     while running.load(Ordering::SeqCst) {
         let tick = Instant::now();
@@ -690,7 +752,9 @@ fn stream_body(
         // Honor a client recovery request. Prefer reference-frame invalidation (the encoder
         // re-references an older still-valid frame — no costly IDR spike); if the encoder can't
         // invalidate (range too old, or no NVENC RFI) it returns false and we force a keyframe.
-        let mut want_keyframe = false;
+        // A prior pipeline drop needs a fresh keyframe to re-anchor the reference chain (see below).
+        let mut want_keyframe = recover_after_drop;
+        recover_after_drop = false;
         if let Some((first, last)) = rfi_range.lock().unwrap().take() {
             // Prefer reference-frame invalidation when the encoder supports it (no costly IDR
             // spike); otherwise — or if the range is too old to invalidate — fall back to a keyframe.
@@ -723,41 +787,36 @@ fn stream_body(
         // 90 kHz RTP timestamp from wall-clock, so a variable capture rate stays correct.
         let ts = (stream_start.elapsed().as_secs_f64() * 90_000.0) as u32;
-        let mut batch: Vec<Vec<u8>> = Vec::new();
+        // Drain the encoder's access units (owned buffers) — FEC/packetization runs on the
+        // packetizer thread, off this loop, so it never serializes behind encode.
+        let mut aus: Vec<(Vec<u8>, FrameType)> = Vec::new();
         while let Some(au) = enc.poll().context("encoder poll")? {
             let ft = if au.keyframe {
                 FrameType::Idr
             } else {
                 FrameType::P
             };
-            batch.extend(pk.packetize(&au.data, ft, ts));
+            aus.push((au.data, ft));
         }
         let t_pkt = tick.elapsed();
-        // Hand the frame's packets to the send thread; never block here. A full queue means
-        // the sender is behind — drop this batch (FEC/RFI covers the client) and keep encoding.
-        let n = batch.len();
-        // Goodput this window = bytes actually queued to the sender (a dropped batch never reaches
-        // the wire, so it's excluded). Summed only when measuring, to keep the idle path free.
-        let batch_bytes: u64 = if measure {
-            batch.iter().map(|p| p.len() as u64).sum()
-        } else {
-            0
-        };
-        if n > 0 {
-            match batch_tx.try_send(batch) {
+        // Hand the frame's AUs to the pipeline; never block here. A full queue means the pipeline
+        // (packetizer, or the paced sender behind it) is behind — drop this frame (FEC/RFI covers the
+        // client) and keep encoding, so a downstream stall can never cap the encode rate.
+        if !aus.is_empty() {
+            match raw_tx.try_send(RawFrame { aus, ts }) {
                 Ok(()) => {
                     sent_batches += 1;
-                    bytes_win += batch_bytes;
                 }
                 Err(std::sync::mpsc::TrySendError::Full(_)) => {
                     dropped_batches += 1;
+                    recover_after_drop = true; // re-anchor the reference chain on the next frame
                     if dropped_batches.is_power_of_two() {
-                        tracing::warn!(dropped_batches, "video: send queue full — frame dropped");
+                        tracing::warn!(dropped_batches, "video: pipeline queue full — frame dropped");
                     }
                 }
                 Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
-                    break; // sender exited (client gone)
+                    break; // packetizer/sender exited (client gone)
                 }
             }
         }
@@ -765,26 +824,33 @@ fn stream_body(
             let t_send = tick.elapsed();
             let cap_us = t_cap.as_micros();
             let enc_us = (t_enc - t_cap).as_micros();
-            let pkt_us = (t_pkt - t_enc).as_micros();
-            let send_us = (t_send - t_pkt).as_micros();
+            // `poll` = drain the encoder's AUs; `enqueue` = hand-off to the pipeline. FEC/packetize
+            // and the paced send now run on their own threads, off this loop — so both of these
+            // should be small; if they aren't, the encode loop is being stalled by pipeline
+            // backpressure (a full queue), which is the signal that a downstream stage can't keep up.
+            let poll_us = (t_pkt - t_enc).as_micros();
+            let enqueue_us = (t_send - t_pkt).as_micros();
             mx_cap = mx_cap.max(cap_us);
             mx_enc = mx_enc.max(enc_us);
-            mx_pkt = mx_pkt.max(pkt_us);
-            mx_send = mx_send.max(send_us);
-            mx_pkts = mx_pkts.max(n);
+            mx_pkt = mx_pkt.max(poll_us);
+            mx_send = mx_send.max(enqueue_us);
             v_cap.push(cap_us as u32);
             v_enc.push(enc_us as u32);
-            v_pkt.push(pkt_us as u32);
-            v_send.push(send_us as u32);
+            v_pkt.push(poll_us as u32);
+            v_send.push(enqueue_us as u32);
         }
         fps_count += 1;
         if fps_t.elapsed() >= Duration::from_secs(1) {
             let secs = fps_t.elapsed().as_secs_f64();
+            // Bytes handed to the wire this window, tallied by the packetizer thread (goodput).
+            let win_bytes = goodput.swap(0, std::sync::atomic::Ordering::Relaxed);
             if perf {
-                // Max µs/stage this second: cap=drain channel, enc=submit (zero-copy device
-                // copy + NVENC), pkt=poll+FEC+packetize, send=paced packet send. `uniq`=new
-                // captured frames (vs re-encoded). `pkts`=max packets in one frame (IDR spike).
+                // Max µs/stage this second on the ENCODE loop: cap=drain channel, enc=submit
+                // (zero-copy device copy + NVENC), pkt=poll (AU drain), send=enqueue to the pipeline.
+                // FEC/packetize and the paced send run on their own threads now, so pkt/send here
+                // should be near-zero — a nonzero value means encode is being stalled by pipeline
+                // backpressure. `uniq`=new captured frames (vs re-encoded).
                 tracing::info!(
                     fps = fps_count,
                     uniq,
@@ -792,7 +858,6 @@ fn stream_body(
                     pkt_us = mx_pkt,
                     send_us = mx_send,
                     cap_us = mx_cap,
-                    max_pkts = mx_pkts,
                     "video: streaming (perf)"
                 );
             } else {
@@ -805,7 +870,7 @@ fn stream_body(
             }
             // Web-console capture: build the aggregated sample. The host send side exposes no
             // receiver-side packet loss / FEC-recovery / send-buffer EAGAIN counters, so those stay
-            // 0 (not fabricated); `frames_dropped` is the per-frame send-queue overflow delta.
+            // 0 (not fabricated); `frames_dropped` is the per-frame pipeline-queue overflow delta.
             if stats.is_armed() {
                 let session_id = *sid.get_or_insert_with(|| {
                     stats.register_session(
@@ -844,7 +909,7 @@ fn stream_body(
                     ],
                     fps: (uniq as f64 / secs) as f32,
                     repeat_fps: (fps_count.saturating_sub(uniq) as f64 / secs) as f32,
-                    mbps: (bytes_win as f64 * 8.0 / secs / 1_000_000.0) as f32,
+                    mbps: (win_bytes as f64 * 8.0 / secs / 1_000_000.0) as f32,
                     bitrate_kbps: cfg.bitrate_kbps,
                     frames_dropped: dropped_batches.saturating_sub(last_dropped_batches) as u32,
                     packets_dropped: 0,
@@ -857,13 +922,11 @@ fn stream_body(
             mx_enc = 0;
             mx_pkt = 0;
             mx_send = 0;
-            mx_pkts = 0;
             uniq = 0;
             v_cap.clear();
             v_enc.clear();
             v_pkt.clear();
             v_send.clear();
-            bytes_win = 0;
             last_dropped_batches = dropped_batches;
             fps_count = 0;
             fps_t = Instant::now();