perf: M2 — split the data plane into encode | send threads with batched, paced sends (Phase 2)

stream_body no longer sends: each frame's packet batch goes over a depth-2 bounded queue to a
dedicated send thread, so a send spike can never stall capture/encode (a full queue drops the
NEWEST batch — FEC/RFI covers the client — rather than ever blocking). The sender ships packets
with sendmmsg (≤64/syscall: ~375 syscalls/s instead of ~24k at 5K@240) in 16-packet chunks
paced across ~3/4 of the frame interval — microburst shaping for real links without per-packet
sleep jitter. Client-gone detection moved to the sender (clears `running`); the LUMEN_VIDEO_DROP
FEC test knob moved with the send path. Loopback-tested: batches arrive complete and
byte-identical through the paced sendmmsg path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-09 23:13:34 +00:00
parent 6521146abc
commit c39615d7d1
+192 -24
View File
@@ -54,7 +54,7 @@ pub fn start(
fn run(
cfg: StreamConfig,
running: &AtomicBool,
running: &Arc<AtomicBool>,
force_idr: &AtomicBool,
video_cap: &std::sync::Mutex<Option<Box<dyn Capturer>>>,
) -> Result<()> {
@@ -126,12 +126,111 @@ fn run(
result
}
/// The encode → packetize → paced-send loop, over a borrowed capturer.
/// One frame's packets, handed from the encode thread to the send thread.
type PacketBatch = Vec<Vec<u8>>;
/// Send `pkts` with as few syscalls as possible (`sendmmsg`, up to 64 per call). The socket is
/// connected, so no per-message address. Returns an error on the first send failure.
fn sendmmsg_all(sock: &UdpSocket, pkts: &[Vec<u8>]) -> std::io::Result<()> {
use std::os::fd::AsRawFd;
const CHUNK: usize = 64;
let fd = sock.as_raw_fd();
for chunk in pkts.chunks(CHUNK) {
let mut iovs: Vec<libc::iovec> = chunk
.iter()
.map(|p| libc::iovec {
iov_base: p.as_ptr() as *mut libc::c_void,
iov_len: p.len(),
})
.collect();
let mut hdrs: Vec<libc::mmsghdr> = iovs
.iter_mut()
.map(|iov| {
let mut h: libc::mmsghdr = unsafe { std::mem::zeroed() };
h.msg_hdr.msg_iov = iov;
h.msg_hdr.msg_iovlen = 1;
h
})
.collect();
let mut off = 0usize;
while off < hdrs.len() {
let n = unsafe {
libc::sendmmsg(fd, hdrs[off..].as_mut_ptr(), (hdrs.len() - off) as u32, 0)
};
if n < 0 {
return Err(std::io::Error::last_os_error());
}
off += n as usize;
}
}
Ok(())
}
/// Dedicated send thread: one [`PacketBatch`] per frame arrives on `rx`; its packets go out in
/// `sendmmsg` chunks, paced so the frame's data spreads over ~3/4 of the frame interval
/// (microburst shaping at chunk granularity — a real link drops line-rate bursts; the encode
/// thread is never blocked by this). On send failure (client gone) it clears `running`.
fn spawn_sender(
sock: UdpSocket,
rx: std::sync::mpsc::Receiver<PacketBatch>,
frame_interval: Duration,
running: Arc<AtomicBool>,
drop_pct: u32,
) -> Result<()> {
std::thread::Builder::new()
.name("lumen-send".into())
.spawn(move || {
// Chunk pacing: 16 packets per burst, bursts spread across the send budget.
const PACE_CHUNK: usize = 16;
let budget = frame_interval.mul_f32(0.75);
let mut rng = rand::thread_rng();
let mut sent: u64 = 0;
let mut dropped: u64 = 0;
while let Ok(mut batch) = rx.recv() {
if drop_pct > 0 {
batch.retain(|_| {
let keep = rng.gen_range(0..100) >= drop_pct;
if !keep {
dropped += 1;
}
keep
});
}
let n = batch.len();
if n == 0 {
continue;
}
let per_chunk = budget.mul_f64((PACE_CHUNK as f64 / n as f64).min(1.0));
let start = Instant::now();
for (i, chunk) in batch.chunks(PACE_CHUNK).enumerate() {
if let Err(e) = sendmmsg_all(&sock, chunk) {
tracing::info!(error = %e, sent, "video: client unreachable — stopping stream");
running.store(false, Ordering::SeqCst);
return;
}
sent += chunk.len() as u64;
// Sleep toward the next chunk's deadline; skip sub-500µs sleeps (jitter).
let target = start + per_chunk.mul_f64((i + 1) as f64);
if let Some(ahead) = target.checked_duration_since(Instant::now()) {
if ahead >= Duration::from_micros(500) {
std::thread::sleep(ahead);
}
}
}
}
tracing::debug!(sent, dropped, "video sender exiting");
})
.context("spawn send thread")?;
Ok(())
}
/// The encode → packetize loop, over a borrowed capturer. Sending runs on a dedicated thread
/// (see [`spawn_sender`]) so a send spike can never stall capture/encode.
fn stream_body(
capturer: &mut dyn Capturer,
sock: &UdpSocket,
cfg: StreamConfig,
running: &AtomicBool,
running: &Arc<AtomicBool>,
force_idr: &AtomicBool,
) -> Result<()> {
// The first frame establishes the authoritative size/format for the encoder.
@@ -167,7 +266,6 @@ fn stream_body(
// bound just guards against an absurd client request (the encoder is opened at `cfg.fps`).
let target_fps = cfg.fps.clamp(1, 240);
let frame_interval = Duration::from_secs_f64(1.0 / target_fps as f64);
let mut sent_pkts: u64 = 0;
let mut fps_count: u32 = 0;
let mut fps_t = Instant::now();
let stream_start = Instant::now();
@@ -176,8 +274,20 @@ fn stream_body(
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(0);
let mut rng = rand::thread_rng();
let mut dropped: u64 = 0;
let mut sent_batches: u64 = 0;
let mut dropped_batches: u64 = 0;
// The send thread: one frame's batch at a time over a small bounded queue. Depth 2 means a
// slow send can buffer one frame while the next encodes; beyond that the NEWEST batch is
// dropped (the client recovers via FEC/RFI) rather than ever stalling the encode loop.
let (batch_tx, batch_rx) = std::sync::mpsc::sync_channel::<PacketBatch>(2);
spawn_sender(
sock.try_clone().context("clone video socket")?,
batch_rx,
Duration::from_secs_f64(1.0 / target_fps as f64),
running.clone(),
drop_pct,
)?;
// Per-stage timing (LUMEN_PERF=1): max µs/stage per second + unique vs re-encoded frames,
// to pinpoint stalls. `unique` counts genuinely-new captured frames (vs re-encoded holds).
@@ -216,26 +326,23 @@ fn stream_body(
}
let t_pkt = tick.elapsed();
// Send the frame's packets at line rate. Per-packet pacing (microburst shaping) must NOT
// run on this thread — it serializes against capture/encode and was the prime cause of
// the fps oscillation. Pacing belongs on a dedicated send thread (TODO: split encode|send);
// cadence is driven below by a single absolute deadline.
let mut client_gone = false;
// Hand the frame's packets to the send thread; never block here. A full queue means
// the sender is behind — drop this batch (FEC/RFI covers the client) and keep encoding.
let n = batch.len();
for pkt in &batch {
if drop_pct > 0 && rng.gen_range(0..100) < drop_pct {
dropped += 1; // simulated loss: built the packet, skip the send
} else if sock.send(pkt).is_err() {
client_gone = true;
break;
} else {
sent_pkts += 1;
if n > 0 {
match batch_tx.try_send(batch) {
Ok(()) => sent_batches += 1,
Err(std::sync::mpsc::TrySendError::Full(_)) => {
dropped_batches += 1;
if dropped_batches.is_power_of_two() {
tracing::warn!(dropped_batches, "video: send queue full — frame dropped");
}
}
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
break; // sender exited (client gone)
}
}
}
if client_gone {
tracing::info!(sent_pkts, "video: client unreachable — stopping stream");
break;
}
if perf {
let t_send = tick.elapsed();
mx_cap = mx_cap.max(t_cap.as_micros());
@@ -268,7 +375,12 @@ fn stream_body(
mx_pkts = 0;
uniq = 0;
} else {
tracing::info!(fps = fps_count, sent_pkts, dropped, "video: streaming");
tracing::info!(
fps = fps_count,
sent_batches,
dropped_batches,
"video: streaming"
);
}
fps_count = 0;
fps_t = Instant::now();
@@ -284,3 +396,59 @@ fn stream_body(
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
/// End-to-end check of the send thread: batches pushed on the channel arrive, complete and
/// byte-identical, at a peer socket via the paced sendmmsg path.
#[test]
fn sender_delivers_batches() {
let rx_sock = UdpSocket::bind("127.0.0.1:0").unwrap();
rx_sock
.set_read_timeout(Some(Duration::from_secs(3)))
.unwrap();
let tx_sock = UdpSocket::bind("127.0.0.1:0").unwrap();
tx_sock.connect(rx_sock.local_addr().unwrap()).unwrap();
let running = Arc::new(AtomicBool::new(true));
let (tx, rx) = std::sync::mpsc::sync_channel::<PacketBatch>(2);
spawn_sender(
tx_sock,
rx,
Duration::from_millis(8), // ~120fps frame interval
running.clone(),
0,
)
.unwrap();
// 3 frames of 100 packets, content-tagged for verification.
let mut sent = Vec::new();
for f in 0..3u8 {
let batch: PacketBatch = (0..100u8)
.map(|i| {
let mut p = vec![0u8; 1200];
p[0] = f;
p[1] = i;
p
})
.collect();
sent.extend(batch.iter().cloned());
tx.send(batch).unwrap();
}
drop(tx); // sender drains then exits
let mut got = 0usize;
let mut buf = [0u8; 2048];
while got < sent.len() {
let n = rx_sock.recv(&mut buf).expect("packet within timeout");
assert_eq!(n, 1200);
let (f, i) = (buf[0] as usize, buf[1] as usize);
assert_eq!(&buf[..n], &sent[f * 100 + i][..], "payload intact");
got += 1;
}
assert_eq!(got, 300);
assert!(running.load(Ordering::SeqCst), "no spurious client-gone");
}
}