From c39615d7d1b7393a24b8512cb31cc1e74402004a Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Tue, 9 Jun 2026 23:13:34 +0000 Subject: [PATCH] =?UTF-8?q?perf:=20M2=20=E2=80=94=20split=20the=20data=20p?= =?UTF-8?q?lane=20into=20encode=20|=20send=20threads=20with=20batched,=20p?= =?UTF-8?q?aced=20sends=20(Phase=202)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit stream_body no longer sends: each frame's packet batch goes over a depth-2 bounded queue to a dedicated send thread, so a send spike can never stall capture/encode (a full queue drops the NEWEST batch — FEC/RFI covers the client — rather than ever blocking). The sender ships packets with sendmmsg (≤64/syscall: ~375 syscalls/s instead of ~24k at 5K@240) in 16-packet chunks paced across ~3/4 of the frame interval — microburst shaping for real links without per-packet sleep jitter. Client-gone detection moved to the sender (clears `running`); the LUMEN_VIDEO_DROP FEC test knob moved with the send path. Loopback-tested: batches arrive complete and byte-identical through the paced sendmmsg path. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/lumen-host/src/gamestream/stream.rs | 216 ++++++++++++++++++--- 1 file changed, 192 insertions(+), 24 deletions(-) diff --git a/crates/lumen-host/src/gamestream/stream.rs b/crates/lumen-host/src/gamestream/stream.rs index bcc0253..999d9b5 100644 --- a/crates/lumen-host/src/gamestream/stream.rs +++ b/crates/lumen-host/src/gamestream/stream.rs @@ -54,7 +54,7 @@ pub fn start( fn run( cfg: StreamConfig, - running: &AtomicBool, + running: &Arc, force_idr: &AtomicBool, video_cap: &std::sync::Mutex>>, ) -> Result<()> { @@ -126,12 +126,111 @@ fn run( result } -/// The encode → packetize → paced-send loop, over a borrowed capturer. +/// One frame's packets, handed from the encode thread to the send thread. +type PacketBatch = Vec>; + +/// Send `pkts` with as few syscalls as possible (`sendmmsg`, up to 64 per call). The socket is +/// connected, so no per-message address. Returns an error on the first send failure. +fn sendmmsg_all(sock: &UdpSocket, pkts: &[Vec]) -> std::io::Result<()> { + use std::os::fd::AsRawFd; + const CHUNK: usize = 64; + let fd = sock.as_raw_fd(); + for chunk in pkts.chunks(CHUNK) { + let mut iovs: Vec = chunk + .iter() + .map(|p| libc::iovec { + iov_base: p.as_ptr() as *mut libc::c_void, + iov_len: p.len(), + }) + .collect(); + let mut hdrs: Vec = iovs + .iter_mut() + .map(|iov| { + let mut h: libc::mmsghdr = unsafe { std::mem::zeroed() }; + h.msg_hdr.msg_iov = iov; + h.msg_hdr.msg_iovlen = 1; + h + }) + .collect(); + let mut off = 0usize; + while off < hdrs.len() { + let n = unsafe { + libc::sendmmsg(fd, hdrs[off..].as_mut_ptr(), (hdrs.len() - off) as u32, 0) + }; + if n < 0 { + return Err(std::io::Error::last_os_error()); + } + off += n as usize; + } + } + Ok(()) +} + +/// Dedicated send thread: one [`PacketBatch`] per frame arrives on `rx`; its packets go out in +/// `sendmmsg` chunks, paced so the frame's data spreads over ~3/4 of the frame interval +/// (microburst shaping at chunk granularity — a real link drops line-rate bursts; the encode +/// thread is never blocked by this). On send failure (client gone) it clears `running`. +fn spawn_sender( + sock: UdpSocket, + rx: std::sync::mpsc::Receiver, + frame_interval: Duration, + running: Arc, + drop_pct: u32, +) -> Result<()> { + std::thread::Builder::new() + .name("lumen-send".into()) + .spawn(move || { + // Chunk pacing: 16 packets per burst, bursts spread across the send budget. + const PACE_CHUNK: usize = 16; + let budget = frame_interval.mul_f32(0.75); + let mut rng = rand::thread_rng(); + let mut sent: u64 = 0; + let mut dropped: u64 = 0; + while let Ok(mut batch) = rx.recv() { + if drop_pct > 0 { + batch.retain(|_| { + let keep = rng.gen_range(0..100) >= drop_pct; + if !keep { + dropped += 1; + } + keep + }); + } + let n = batch.len(); + if n == 0 { + continue; + } + let per_chunk = budget.mul_f64((PACE_CHUNK as f64 / n as f64).min(1.0)); + let start = Instant::now(); + for (i, chunk) in batch.chunks(PACE_CHUNK).enumerate() { + if let Err(e) = sendmmsg_all(&sock, chunk) { + tracing::info!(error = %e, sent, "video: client unreachable — stopping stream"); + running.store(false, Ordering::SeqCst); + return; + } + sent += chunk.len() as u64; + // Sleep toward the next chunk's deadline; skip sub-500µs sleeps (jitter). + let target = start + per_chunk.mul_f64((i + 1) as f64); + if let Some(ahead) = target.checked_duration_since(Instant::now()) { + if ahead >= Duration::from_micros(500) { + std::thread::sleep(ahead); + } + } + } + } + tracing::debug!(sent, dropped, "video sender exiting"); + }) + .context("spawn send thread")?; + Ok(()) +} + +/// The encode → packetize loop, over a borrowed capturer. Sending runs on a dedicated thread +/// (see [`spawn_sender`]) so a send spike can never stall capture/encode. fn stream_body( capturer: &mut dyn Capturer, sock: &UdpSocket, cfg: StreamConfig, - running: &AtomicBool, + running: &Arc, force_idr: &AtomicBool, ) -> Result<()> { // The first frame establishes the authoritative size/format for the encoder. @@ -167,7 +266,6 @@ fn stream_body( // bound just guards against an absurd client request (the encoder is opened at `cfg.fps`). let target_fps = cfg.fps.clamp(1, 240); let frame_interval = Duration::from_secs_f64(1.0 / target_fps as f64); - let mut sent_pkts: u64 = 0; let mut fps_count: u32 = 0; let mut fps_t = Instant::now(); let stream_start = Instant::now(); @@ -176,8 +274,20 @@ fn stream_body( .ok() .and_then(|v| v.parse().ok()) .unwrap_or(0); - let mut rng = rand::thread_rng(); - let mut dropped: u64 = 0; + let mut sent_batches: u64 = 0; + let mut dropped_batches: u64 = 0; + + // The send thread: one frame's batch at a time over a small bounded queue. Depth 2 means a + // slow send can buffer one frame while the next encodes; beyond that the NEWEST batch is + // dropped (the client recovers via FEC/RFI) rather than ever stalling the encode loop. + let (batch_tx, batch_rx) = std::sync::mpsc::sync_channel::(2); + spawn_sender( + sock.try_clone().context("clone video socket")?, + batch_rx, + Duration::from_secs_f64(1.0 / target_fps as f64), + running.clone(), + drop_pct, + )?; // Per-stage timing (LUMEN_PERF=1): max µs/stage per second + unique vs re-encoded frames, // to pinpoint stalls. `unique` counts genuinely-new captured frames (vs re-encoded holds). @@ -216,26 +326,23 @@ fn stream_body( } let t_pkt = tick.elapsed(); - // Send the frame's packets at line rate. Per-packet pacing (microburst shaping) must NOT - // run on this thread — it serializes against capture/encode and was the prime cause of - // the fps oscillation. Pacing belongs on a dedicated send thread (TODO: split encode|send); - // cadence is driven below by a single absolute deadline. - let mut client_gone = false; + // Hand the frame's packets to the send thread; never block here. A full queue means + // the sender is behind — drop this batch (FEC/RFI covers the client) and keep encoding. let n = batch.len(); - for pkt in &batch { - if drop_pct > 0 && rng.gen_range(0..100) < drop_pct { - dropped += 1; // simulated loss: built the packet, skip the send - } else if sock.send(pkt).is_err() { - client_gone = true; - break; - } else { - sent_pkts += 1; + if n > 0 { + match batch_tx.try_send(batch) { + Ok(()) => sent_batches += 1, + Err(std::sync::mpsc::TrySendError::Full(_)) => { + dropped_batches += 1; + if dropped_batches.is_power_of_two() { + tracing::warn!(dropped_batches, "video: send queue full — frame dropped"); + } + } + Err(std::sync::mpsc::TrySendError::Disconnected(_)) => { + break; // sender exited (client gone) + } } } - if client_gone { - tracing::info!(sent_pkts, "video: client unreachable — stopping stream"); - break; - } if perf { let t_send = tick.elapsed(); mx_cap = mx_cap.max(t_cap.as_micros()); @@ -268,7 +375,12 @@ fn stream_body( mx_pkts = 0; uniq = 0; } else { - tracing::info!(fps = fps_count, sent_pkts, dropped, "video: streaming"); + tracing::info!( + fps = fps_count, + sent_batches, + dropped_batches, + "video: streaming" + ); } fps_count = 0; fps_t = Instant::now(); @@ -284,3 +396,59 @@ fn stream_body( } Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + /// End-to-end check of the send thread: batches pushed on the channel arrive, complete and + /// byte-identical, at a peer socket via the paced sendmmsg path. + #[test] + fn sender_delivers_batches() { + let rx_sock = UdpSocket::bind("127.0.0.1:0").unwrap(); + rx_sock + .set_read_timeout(Some(Duration::from_secs(3))) + .unwrap(); + let tx_sock = UdpSocket::bind("127.0.0.1:0").unwrap(); + tx_sock.connect(rx_sock.local_addr().unwrap()).unwrap(); + + let running = Arc::new(AtomicBool::new(true)); + let (tx, rx) = std::sync::mpsc::sync_channel::(2); + spawn_sender( + tx_sock, + rx, + Duration::from_millis(8), // ~120fps frame interval + running.clone(), + 0, + ) + .unwrap(); + + // 3 frames of 100 packets, content-tagged for verification. + let mut sent = Vec::new(); + for f in 0..3u8 { + let batch: PacketBatch = (0..100u8) + .map(|i| { + let mut p = vec![0u8; 1200]; + p[0] = f; + p[1] = i; + p + }) + .collect(); + sent.extend(batch.iter().cloned()); + tx.send(batch).unwrap(); + } + drop(tx); // sender drains then exits + + let mut got = 0usize; + let mut buf = [0u8; 2048]; + while got < sent.len() { + let n = rx_sock.recv(&mut buf).expect("packet within timeout"); + assert_eq!(n, 1200); + let (f, i) = (buf[0] as usize, buf[1] as usize); + assert_eq!(&buf[..n], &sent[f * 100 + i][..], "payload intact"); + got += 1; + } + assert_eq!(got, 300); + assert!(running.load(Ordering::SeqCst), "no spurious client-gone"); + } +}