From b295a5b7a9c82f5ed4c34dfdf8532775aafeb3f5 Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Fri, 12 Jun 2026 09:42:16 +0000 Subject: [PATCH] perf(latency): encode|send thread split on the native path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bigger-bet #1 from the latency plan. virtual_stream ran capture+encode+seal+ paced-send on ONE thread, so frame N+1's capture/encode couldn't start until frame N's entire paced tail had left the wire — the pacing budget (~0.9×interval) was serialized in front of the next encode. Port GameStream's spawn_sender model to the native path: - A dedicated send thread (`send_loop`) owns the WHOLE Session (so no socket clone or shared/Arc stats needed — `seal_frame` mutates the nonce, `send_sealed` + the probe bursts all live there) and does FEC+seal + microburst-paced send. - The encode thread captures+encodes + handles reconfig and hands each AU over a bounded sync_channel(3) as a FrameMsg (data, capture_ns, flags, deadline, encode_us). It BLOCKS on backpressure if the send falls behind — frames slow down rather than a dropped frame freezing the infinite-GOP stream (we don't drop). Clean shutdown: drop the channel → send thread drains/exits → join. - Probes (run_probe_burst) move to the send thread since they need the Session; a burst naturally pauses video (the encode thread blocks on the full channel). - Per-frame encode_us/pace_us histogram moved to the send thread (carries encode_us in the FrameMsg) and now reflects the overlap. Removes the encode↔paced-tail serialization (~2-8 ms @60-120 fps), independent of the pacing policy, no quality cost. Substrate for the future NVENC slice wrapper. Verified live on this box (appliance restarted onto it): a client streamed the KWin desktop (1.49 MB H.265, clean, no panic) and a 200 Mbps speed-test probe completed through the send thread (0 drops). Build + clippy + fmt green. Real-NIC sustained soak (reconfig under load, line-rate, mode switches) pending the Ubuntu third host. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/punktfunk-host/src/m3.rs | 225 ++++++++++++++++++++++---------- 1 file changed, 158 insertions(+), 67 deletions(-) diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index e88402c..e33791e 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -671,15 +671,15 @@ async fn serve_session( let compositor = compositor .expect("the Virtual source resolves a compositor during the handshake"); virtual_stream( - &mut session, + session, mode, seconds, - &stop_stream, + stop_stream, &reconfig_rx, compositor, bitrate_kbps, - &probe_rx, - &probe_result_tx, + probe_rx, + probe_result_tx, ) } } @@ -1469,6 +1469,104 @@ fn percentile(sorted_or_not: &mut [u32], q: f64) -> u32 { sorted_or_not[i] } +/// One encoded frame handed from the capture/encode thread to the send thread (the encode|send +/// split). The send thread does FEC+seal+paced-send while this thread captures+encodes the next. +struct FrameMsg { + data: Vec, + capture_ns: u64, + flags: u32, + /// When this frame's packets should have fully left (the next frame's due time) = the pacing + /// budget. In the past when the send thread is behind → immediate send (catch up). + deadline: std::time::Instant, + /// capture→encoded latency (µs), measured on the encode thread, carried for the perf histogram. + encode_us: u32, +} + +/// The dedicated send thread: it owns the whole [`Session`] (so no socket clone or shared stats are +/// needed) and does FEC+seal + microburst-paced send OFF the capture/encode thread, plus the +/// speed-test probe bursts (which also need the Session). Decoupling the paced send from encoding +/// lets the encode of frame N+1 overlap the transmit of frame N instead of waiting behind its tail. +/// Runs until the encode thread drops the frame channel (end of stream) or `stop` is set. +fn send_loop( + mut session: Session, + frame_rx: std::sync::mpsc::Receiver, + probe_rx: std::sync::mpsc::Receiver, + probe_result_tx: tokio::sync::mpsc::UnboundedSender, + stop: Arc, + perf: bool, + burst_cap: usize, +) { + let mut last_perf = std::time::Instant::now(); + let mut last_bytes = 0u64; + let mut last_send_dropped = 0u64; + let mut encode_us: Vec = Vec::new(); + let mut pace_us: Vec = Vec::new(); + let (mut paced_frames, mut immediate_frames) = (0u64, 0u64); + loop { + if stop.load(Ordering::SeqCst) { + break; + } + // Probes run here (they need the Session); a burst pauses video — the encode thread blocks + // on the full frame channel meanwhile, which is exactly the intended pause. + service_probes(&mut session, &stop, &probe_rx, &probe_result_tx); + // Short timeout so we keep re-checking `stop` + probes when no frames are flowing. + match frame_rx.recv_timeout(std::time::Duration::from_millis(50)) { + Ok(msg) => match paced_submit( + &mut session, + &msg.data, + msg.capture_ns, + msg.flags, + msg.deadline, + burst_cap, + ) { + Ok(stat) => { + if perf { + encode_us.push(msg.encode_us); + pace_us.push(stat.spread_us); + if stat.paced { + paced_frames += 1; + } else { + immediate_frames += 1; + } + } + } + Err(e) => { + tracing::error!(error = %format!("{e:#}"), "send failed — stopping stream"); + break; + } + }, + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {} + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, // encode thread done + } + if perf && last_perf.elapsed() >= std::time::Duration::from_secs(2) { + let s = session.stats(); + let secs = last_perf.elapsed().as_secs_f64(); + // Attempted (sealed) transmit rate; `send_dropped` is what didn't reach the wire. + let tx_mbps = (s.bytes_sent - last_bytes) as f64 * 8.0 / secs / 1_000_000.0; + tracing::info!( + tx_mbps = format!("{tx_mbps:.0}"), + send_dropped = s.packets_send_dropped - last_send_dropped, + send_dropped_total = s.packets_send_dropped, + encode_us_p50 = percentile(&mut encode_us, 0.50), + encode_us_p99 = percentile(&mut encode_us, 0.99), + pace_us_p50 = percentile(&mut pace_us, 0.50), + pace_us_p99 = percentile(&mut pace_us, 0.99), + pace_us_max = pace_us.last().copied().unwrap_or(0), + immediate_frames, + paced_frames, + "perf" + ); + last_perf = std::time::Instant::now(); + last_bytes = s.bytes_sent; + last_send_dropped = s.packets_send_dropped; + encode_us.clear(); + pace_us.clear(); + paced_frames = 0; + immediate_frames = 0; + } + } +} + /// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs /// stamped with the capture wall clock (the client derives per-frame pipeline latency). /// @@ -1479,15 +1577,15 @@ fn percentile(sorted_or_not: &mut [u32], q: f64) -> u32 { /// carry speed-test bursts (see [`service_probes`]). #[allow(clippy::too_many_arguments)] fn virtual_stream( - session: &mut Session, + session: Session, mode: punktfunk_core::Mode, seconds: u32, - stop: &AtomicBool, + stop: Arc, reconfig: &std::sync::mpsc::Receiver, compositor: crate::vdisplay::Compositor, bitrate_kbps: u32, - probe_rx: &std::sync::mpsc::Receiver, - probe_result_tx: &tokio::sync::mpsc::UnboundedSender, + probe_rx: std::sync::mpsc::Receiver, + probe_result_tx: tokio::sync::mpsc::UnboundedSender, ) -> Result<()> { tracing::info!( compositor = compositor.id(), @@ -1499,32 +1597,43 @@ fn virtual_stream( let (mut capturer, mut enc, mut frame, mut interval) = build_pipeline_with_retry(&mut vd, mode, bitrate_kbps)?; - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64); - let mut next = std::time::Instant::now(); - let mut sent: u64 = 0; - // Throughput/drop + per-frame-latency instrumentation (PUNKTFUNK_PERF) — makes a high-bitrate - // soak observable: wire goodput, send-buffer drops, and the per-frame encode + pacing-tail - // distributions (so the pacing latency the microburst cap is meant to recover is visible). let perf = std::env::var("PUNKTFUNK_PERF").is_ok(); - let mut last_perf = std::time::Instant::now(); - let mut last_bytes = 0u64; - let mut last_send_dropped = 0u64; - let mut encode_us: Vec = Vec::new(); - let mut pace_us: Vec = Vec::new(); - let (mut paced_frames, mut immediate_frames) = (0u64, 0u64); - // Microburst cap: a frame whose sealed size is ≤ this goes out in one immediate burst (no - // pacing latency); only the overflow of a bigger frame (IDR / sustained high bitrate) is spread. - // 128 KB is a conservative default (well under the ~150 Mbps@60 frame size where bursts started - // dropping). Raise it via PUNKTFUNK_PACE_BURST_KB after confirming send_dropped stays 0 on your - // link, to recover more pacing tail on higher-bitrate streams. + // Microburst cap (applied in send_loop/paced_submit): a frame ≤ this bursts out immediately; + // only a bigger frame's overflow is spread. PUNKTFUNK_PACE_BURST_KB overrides the 128 KB default. let burst_cap = std::env::var("PUNKTFUNK_PACE_BURST_KB") .ok() .and_then(|s| s.parse::().ok()) .unwrap_or(128) * 1024; + + // Encode|send split: this thread captures+encodes (the GPU work) + handles reconfig, and hands + // each AU to a dedicated send thread that owns the Session and does FEC+seal+paced-send — so the + // encode of frame N+1 overlaps the paced transmit of frame N instead of waiting behind its tail. + // The bounded channel applies backpressure (the encode thread blocks if the send falls behind, + // so frames slow down rather than a dropped frame freezing the infinite-GOP stream). + let (frame_tx, frame_rx) = std::sync::mpsc::sync_channel::(3); + let send_thread = std::thread::Builder::new() + .name("punktfunk-send".into()) + .spawn({ + let stop = stop.clone(); + move || { + send_loop( + session, + frame_rx, + probe_rx, + probe_result_tx, + stop, + perf, + burst_cap, + ) + } + }) + .context("spawn send thread")?; + + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64); + let mut next = std::time::Instant::now(); + let mut sent: u64 = 0; while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline { - // Service speed-test probes between frames (each burst pauses video for its duration). - service_probes(session, stop, probe_rx, probe_result_tx); // Drain to the NEWEST requested mode (a resize drag queues many) so we rebuild once, // not once per stale intermediate mode. let mut want = None; @@ -1552,61 +1661,43 @@ fn virtual_stream( } let capture_ns = now_ns(); enc.submit(&frame).context("encoder submit")?; - // The deadline for this frame's packets: pace the send up to here so a high-bitrate frame - // spreads over the interval instead of bursting all at once into the NIC (a real link drops - // the burst, freezing the infinite-GOP stream until the next keyframe — the 1 Gbps+ fix). + // The deadline for this frame's packets (the next frame's due time); the send thread paces + // up to here so a high-bitrate frame spreads over the interval instead of bursting. next += interval; + let mut send_gone = false; while let Some(au) = enc.poll().context("encoder poll")? { let flags = if au.keyframe { (FLAG_PIC | FLAG_SOF) as u32 } else { FLAG_PIC as u32 }; - let t_encode_done = now_ns(); - let stat = paced_submit(session, &au.data, capture_ns, flags, next, burst_cap)?; - sent += 1; - if perf { - encode_us.push((t_encode_done.saturating_sub(capture_ns) / 1000) as u32); - pace_us.push(stat.spread_us); - if stat.paced { - paced_frames += 1; - } else { - immediate_frames += 1; - } + let encode_us = (now_ns().saturating_sub(capture_ns) / 1000) as u32; + let msg = FrameMsg { + data: au.data, + capture_ns, + flags, + deadline: next, + encode_us, + }; + // Hand to the send thread; this blocks (backpressure) if it's behind. An Err means it + // exited (send failure / stop) — end the encode loop too. + if frame_tx.send(msg).is_err() { + send_gone = true; + break; } + sent += 1; } - if perf && last_perf.elapsed() >= std::time::Duration::from_secs(2) { - let s = session.stats(); - let secs = last_perf.elapsed().as_secs_f64(); - // Attempted (sealed) transmit rate; `send_dropped` below is what didn't reach the wire. - let tx_mbps = (s.bytes_sent - last_bytes) as f64 * 8.0 / secs / 1_000_000.0; - tracing::info!( - tx_mbps = format!("{tx_mbps:.0}"), - frames = sent, - send_dropped = s.packets_send_dropped - last_send_dropped, - send_dropped_total = s.packets_send_dropped, - encode_us_p50 = percentile(&mut encode_us, 0.50), - encode_us_p99 = percentile(&mut encode_us, 0.99), - pace_us_p50 = percentile(&mut pace_us, 0.50), - pace_us_p99 = percentile(&mut pace_us, 0.99), - pace_us_max = pace_us.last().copied().unwrap_or(0), - immediate_frames, - paced_frames, - "perf" - ); - last_perf = std::time::Instant::now(); - last_bytes = s.bytes_sent; - last_send_dropped = s.packets_send_dropped; - encode_us.clear(); - pace_us.clear(); - paced_frames = 0; - immediate_frames = 0; + if send_gone { + break; } match next.checked_duration_since(std::time::Instant::now()) { Some(d) => std::thread::sleep(d), None => next = std::time::Instant::now(), } } + // Signal the send thread to drain + exit (drop the channel), then join it. + drop(frame_tx); + let _ = send_thread.join(); tracing::info!(sent, "punktfunk/1 virtual stream complete"); Ok(()) }