diff --git a/crates/punktfunk-host/src/gamestream/stream.rs b/crates/punktfunk-host/src/gamestream/stream.rs
index 04c4731..2380a0f 100644
--- a/crates/punktfunk-host/src/gamestream/stream.rs
+++ b/crates/punktfunk-host/src/gamestream/stream.rs
@@ -413,6 +413,54 @@ fn pace_layout(n: usize) -> (usize, usize) {
     (chunk_sz, steps)
 }
 
+/// One encoded frame handed from the encode loop to the packetizer thread: the frame's access
+/// units (owned buffers, each with its frame type) plus the shared 90 kHz RTP timestamp. FEC
+/// packetization runs on the packetizer thread — off the encode loop — so it never serializes
+/// behind encode (measured ~3 ms/frame at 4K, which capped GameStream's frame rate well below what
+/// the encoder alone can sustain).
+struct RawFrame {
+    aus: Vec<(Vec<u8>, FrameType)>,
+    ts: u32,
+}
+
+/// Packetizer thread: turns each [`RawFrame`]'s access units into wire datagrams (data + Reed–Solomon
+/// FEC parity shards) via the stateful [`VideoPacketizer`], then hands the batch to the paced sender.
+/// It sits between encode and send so the FEC never blocks the encode loop. Backpressure: the hand-off
+/// to the sender BLOCKS, so if the paced sender falls behind, the packetizer stalls and the
+/// encode→packetizer queue fills — the encode loop then drops the newest frame (see the loop) rather
+/// than stalling. Tallies goodput (bytes handed to the wire) into `goodput` for the encode loop's stats
+/// window. Exits when either neighbor's channel closes (session teardown / client gone).
+fn spawn_packetizer(
+    rx: std::sync::mpsc::Receiver<RawFrame>,
+    tx: std::sync::mpsc::SyncSender<PacketBatch>,
+    mut pk: VideoPacketizer,
+    goodput: Arc<std::sync::atomic::AtomicU64>,
+) -> Result<()> {
+    std::thread::Builder::new()
+        .name("punktfunk-pkt".into())
+        .spawn(move || {
+            // Above-normal, like the send thread — this stage is on the per-frame critical path.
+            crate::punktfunk1::boost_thread_priority(false);
+            while let Ok(frame) = rx.recv() {
+                let mut batch: PacketBatch = Vec::new();
+                for (au, ft) in frame.aus {
+                    batch.extend(pk.packetize(&au, ft, frame.ts));
+                }
+                if batch.is_empty() {
+                    continue;
+                }
+                let bytes: u64 = batch.iter().map(|p| p.len() as u64).sum();
+                // Blocking send: propagates the paced sender's backpressure upstream (see above).
+                if tx.send(batch).is_err() {
+                    break; // sender exited (client gone)
+                }
+                goodput.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
+            }
+        })
+        .context("spawn packetizer thread")?; // only failure path: the thread could not be spawned
+    Ok(())
+}
+
 /// Dedicated send thread: one [`PacketBatch`] per frame arrives on `rx`; its packets go out in
 /// `sendmmsg` chunks, paced so the frame's data spreads over ~3/4 of the frame interval
 /// (microburst shaping at chunk granularity — a real link drops line-rate bursts; the encode
@@ -544,7 +592,7 @@ fn stream_body(
         .ok()
         .and_then(|v| v.parse().ok())
         .unwrap_or(20);
-    let mut pk = VideoPacketizer::new(cfg.packet_size, fec_pct, cfg.min_fec);
+    let pk = VideoPacketizer::new(cfg.packet_size, fec_pct, cfg.min_fec);
 
     // Pace at the client's negotiated frame rate, re-encoding the last captured frame when the
     // compositor produced no new one. Compositors only emit frames on damage, so a static or
@@ -564,9 +612,15 @@ fn stream_body(
     let mut sent_batches: u64 = 0;
     let mut dropped_batches: u64 = 0;
 
-    // The send thread: one frame's batch at a time over a small bounded queue. Depth 2 means a
-    // slow send can buffer one frame while the next encodes; beyond that the NEWEST batch is
-    // dropped (the client recovers via FEC/RFI) rather than ever stalling the encode loop.
+    // Three-stage pipeline so FEC packetization never blocks encode: `encode loop → [raw AUs] →
+    // packetizer (FEC/RS) → [wire batch] → paced sender`, each stage on its own thread joined by a
+    // depth-2 bounded queue. Depth 2 means a slow stage can buffer one frame while the next is
+    // produced; beyond that the NEWEST frame is dropped (a coalesced keyframe re-anchors) rather than
+    // stalling the encode loop. Backpressure chains up: a slow sender blocks the packetizer, which
+    // fills the encode→packetizer queue, which makes the encode loop drop — encode itself never
+    // waits. Goodput (bytes handed to the wire) is tallied by the packetizer into `goodput`, read at
+    // the encode loop's 1 s stats boundary (the old inline batch-byte sum moved with packetization).
+    let goodput = Arc::new(std::sync::atomic::AtomicU64::new(0));
     let (batch_tx, batch_rx) = std::sync::mpsc::sync_channel::<PacketBatch>(2);
     spawn_sender(
         sock.try_clone().context("clone video socket")?,
@@ -575,12 +629,14 @@ fn stream_body(
         running.clone(),
         drop_pct,
     )?;
+    let (raw_tx, raw_rx) = std::sync::mpsc::sync_channel::<RawFrame>(2);
+    spawn_packetizer(raw_rx, batch_tx, pk, goodput.clone())?;
 
     // Per-stage timing (PUNKTFUNK_PERF=1): max µs/stage per second + unique vs re-encoded frames,
     // to pinpoint stalls. `unique` counts genuinely-new captured frames (vs re-encoded holds).
     let perf = crate::config::config().perf;
-    let (mut mx_cap, mut mx_enc, mut mx_pkt, mut mx_send, mut mx_pkts, mut uniq) =
-        (0u128, 0u128, 0u128, 0u128, 0usize, 0u32);
+    let (mut mx_cap, mut mx_enc, mut mx_pkt, mut mx_send, mut uniq) =
+        (0u128, 0u128, 0u128, 0u128, 0u32);
     // Web-console stats accumulation (active when `perf` OR a capture is armed): per-stage vectors
-    // for p50/p99, the goodput bytes queued to the sender this window, the previous window's
+    // for p50/p99 (goodput is tallied by the packetizer thread now), the previous window's
     // dropped-frame count for delta computation, and the registration id cached on the first sample.
@@ -592,7 +648,6 @@ fn stream_body( let mut sid: Option = None; let (mut v_cap, mut v_enc, mut v_pkt, mut v_send): (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>) = (Vec::new(), Vec::new(), Vec::new(), Vec::new()); - let mut bytes_win: u64 = 0; let mut last_dropped_batches: u64 = 0; // Absolute next-frame deadline — the single pacing clock for the loop. let mut next_frame = Instant::now(); @@ -614,6 +669,13 @@ fn stream_body( // ref-invalidation (cheap, no IDR spike) is never rate-limited — only full keyframes are. let keyframe_coalesce = frame_interval * 2; let mut last_keyframe: Option<Instant> = None; + // A frame dropped at the pipeline head (below) breaks the reference chain for the following + // P-frames: the client never receives it, but the encoder advanced its references past it, and — + // packetization being downstream now — a dropped frame consumes no frameIndex for the client to + // detect the gap. So the host re-anchors itself: a drop arms a keyframe on the next iteration, + // routed through the same coalesce gate as client IDR requests so a burst of drops (congestion) + // can't become an IDR storm. + let mut recover_after_drop = false; while running.load(Ordering::SeqCst) { let tick = Instant::now(); @@ -690,7 +752,9 @@ fn stream_body( // Honor a client recovery request. Prefer reference-frame invalidation (the encoder // re-references an older still-valid frame — no costly IDR spike); if the encoder can't // invalidate (range too old, or no NVENC RFI) it returns false and we force a keyframe. - let mut want_keyframe = false; + // A prior pipeline drop needs a fresh keyframe to re-anchor the reference chain (see below). + let mut want_keyframe = recover_after_drop; + recover_after_drop = false; if let Some((first, last)) = rfi_range.lock().unwrap().take() { // Prefer reference-frame invalidation when the encoder supports it (no costly IDR // spike); otherwise — or if the range is too old to invalidate — fall back to a keyframe.
@@ -723,41 +787,36 @@ fn stream_body( // 90 kHz RTP timestamp from wall-clock, so a variable capture rate stays correct. let ts = (stream_start.elapsed().as_secs_f64() * 90_000.0) as u32; - let mut batch: Vec<Vec<u8>> = Vec::new(); + // Drain the encoder's access units (owned buffers) — FEC/packetization runs on the + // packetizer thread, off this loop, so it never serializes behind encode. + let mut aus: Vec<(Vec<u8>, FrameType)> = Vec::new(); while let Some(au) = enc.poll().context("encoder poll")? { let ft = if au.keyframe { FrameType::Idr } else { FrameType::P }; - batch.extend(pk.packetize(&au.data, ft, ts)); + aus.push((au.data, ft)); } let t_pkt = tick.elapsed(); - // Hand the frame's packets to the send thread; never block here. A full queue means - // the sender is behind — drop this batch (FEC/RFI covers the client) and keep encoding. - let n = batch.len(); - // Goodput this window = bytes actually queued to the sender (a dropped batch never reaches - // the wire, so it's excluded). Summed only when measuring, to keep the idle path free. - let batch_bytes: u64 = if measure { - batch.iter().map(|p| p.len() as u64).sum() - } else { - 0 - }; - if n > 0 { - match batch_tx.try_send(batch) { + // Hand the frame's AUs to the pipeline; never block here. A full queue means the pipeline + // (packetizer, or the paced sender behind it) is behind — drop this frame (a keyframe re-anchors + // the client) and keep encoding, so a downstream stall can never cap the encode rate.
+ if !aus.is_empty() { + match raw_tx.try_send(RawFrame { aus, ts }) { Ok(()) => { sent_batches += 1; - bytes_win += batch_bytes; } Err(std::sync::mpsc::TrySendError::Full(_)) => { dropped_batches += 1; + recover_after_drop = true; // re-anchor the reference chain on the next frame if dropped_batches.is_power_of_two() { - tracing::warn!(dropped_batches, "video: send queue full — frame dropped"); + tracing::warn!(dropped_batches, "video: pipeline queue full — frame dropped"); } } Err(std::sync::mpsc::TrySendError::Disconnected(_)) => { - break; // sender exited (client gone) + break; // packetizer/sender exited (client gone) } } } @@ -765,26 +824,33 @@ fn stream_body( let t_send = tick.elapsed(); let cap_us = t_cap.as_micros(); let enc_us = (t_enc - t_cap).as_micros(); - let pkt_us = (t_pkt - t_enc).as_micros(); - let send_us = (t_send - t_pkt).as_micros(); + // `poll` = drain the encoder's AUs; `enqueue` = hand-off to the pipeline. FEC/packetize + // and the paced send now run on their own threads, off this loop — so both of these + // should be small. The hand-off is a non-blocking `try_send`, so a full queue shows up + // as a dropped frame (`dropped_batches`), never as time here; a large `poll` is the AU drain. + let poll_us = (t_pkt - t_enc).as_micros(); + let enqueue_us = (t_send - t_pkt).as_micros(); mx_cap = mx_cap.max(cap_us); mx_enc = mx_enc.max(enc_us); - mx_pkt = mx_pkt.max(pkt_us); - mx_send = mx_send.max(send_us); - mx_pkts = mx_pkts.max(n); + mx_pkt = mx_pkt.max(poll_us); + mx_send = mx_send.max(enqueue_us); v_cap.push(cap_us as u32); v_enc.push(enc_us as u32); - v_pkt.push(pkt_us as u32); - v_send.push(send_us as u32); + v_pkt.push(poll_us as u32); + v_send.push(enqueue_us as u32); } fps_count += 1; if fps_t.elapsed() >= Duration::from_secs(1) { let secs = fps_t.elapsed().as_secs_f64(); + // Bytes handed to the wire this window, tallied by the packetizer thread (goodput).
+ let win_bytes = goodput.swap(0, std::sync::atomic::Ordering::Relaxed); if perf { - // Max µs/stage this second: cap=drain channel, enc=submit (zero-copy device - // copy + NVENC), pkt=poll+FEC+packetize, send=paced packet send. `uniq`=new - // captured frames (vs re-encoded). `pkts`=max packets in one frame (IDR spike). + // Max µs/stage this second on the ENCODE loop: cap=drain channel, enc=submit + // (zero-copy device copy + NVENC), pkt=poll (AU drain), send=enqueue to the pipeline. + // FEC/packetize and the paced send run on their own threads now, so pkt/send here + // should be near-zero; backpressure surfaces as dropped frames (`try_send` never + // blocks), not as time here. `uniq`=new captured frames (vs re-encoded). tracing::info!( fps = fps_count, uniq, @@ -792,7 +858,6 @@ fn stream_body( pkt_us = mx_pkt, send_us = mx_send, cap_us = mx_cap, - max_pkts = mx_pkts, "video: streaming (perf)" ); } else { @@ -805,7 +870,7 @@ fn stream_body( } // Web-console capture: build the aggregated sample. The host send side exposes no // receiver-side packet loss / FEC-recovery / send-buffer EAGAIN counters, so those stay - // 0 (not fabricated); `frames_dropped` is the per-frame send-queue overflow delta. + // 0 (not fabricated); `frames_dropped` is the per-frame pipeline-queue overflow delta.
if stats.is_armed() { let session_id = *sid.get_or_insert_with(|| { stats.register_session( @@ -844,7 +909,7 @@ fn stream_body( ], fps: (uniq as f64 / secs) as f32, repeat_fps: (fps_count.saturating_sub(uniq) as f64 / secs) as f32, - mbps: (bytes_win as f64 * 8.0 / secs / 1_000_000.0) as f32, + mbps: (win_bytes as f64 * 8.0 / secs / 1_000_000.0) as f32, bitrate_kbps: cfg.bitrate_kbps, frames_dropped: dropped_batches.saturating_sub(last_dropped_batches) as u32, packets_dropped: 0, @@ -857,13 +922,11 @@ fn stream_body( mx_enc = 0; mx_pkt = 0; mx_send = 0; - mx_pkts = 0; uniq = 0; v_cap.clear(); v_enc.clear(); v_pkt.clear(); v_send.clear(); - bytes_win = 0; last_dropped_batches = dropped_batches; fps_count = 0; fps_t = Instant::now();