From 10a932d013c2c579d3cec8bea1e4f3343b7ff34e Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Thu, 11 Jun 2026 22:15:52 +0000 Subject: [PATCH] feat(1gbps): pace per-frame sends so high-bitrate frames don't burst-drop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Increment B of the send-path rework — the actual fix for "freezes get more common over ~150 Mbps, no image at all at 400 Mbps" on the native path. Cause: the encoder emits a frame and submit_frame blasted ALL its packets at once into the NIC; a real link drops the line-rate burst (host send buffer EAGAINs), and under infinite GOP one dropped frame freezes the decode until the next keyframe. (The speed-test probe showed 0 drops at 400 Mbps because the probe is self-paced; real video wasn't.) Adaptive pacing, no extra thread, no regression: - Session splits into seal_frame (FEC + packetize + seal → wire packets, no send) and send_sealed (one batched sendmmsg of a chunk, counts drops); submit_frame is now their composition (synthetic + probe paths unchanged). - virtual_stream's paced_submit seals a frame then sends it in 16-packet chunks spread over ~90% of the time until the next frame is due. At 60 fps desktop (fast encode → lots of slack) the frame spreads across the interval → no NIC burst → no freeze. At 240 fps@5K (encode ≈ interval → ~0 slack) the budget collapses and every chunk goes out immediately → never slower than before. Core suite (34 + loopback round-trip + 6) + clippy + fmt green. The seal/send split is covered by the existing loopback tests; the pacing is host timing, verified by review (live-test needs a real NIC — your Mac at a raised bitrate). Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/punktfunk-core/src/session.rs | 50 ++++++++++++++++++++------- crates/punktfunk-host/src/m3.rs | 51 +++++++++++++++++++++++++--- 2 files changed, 84 insertions(+), 17 deletions(-) diff --git a/crates/punktfunk-core/src/session.rs b/crates/punktfunk-core/src/session.rs index b30a1f6..f619a73 100644 --- a/crates/punktfunk-core/src/session.rs +++ b/crates/punktfunk-core/src/session.rs @@ -107,34 +107,58 @@ impl Session { // -- Host path -------------------------------------------------------- - /// Host: FEC-protect, packetize, seal, and send one encoded access unit. - pub fn submit_frame(&mut self, data: &[u8], pts_ns: u64, user_flags: u32) -> Result<()> { + /// Host: FEC-protect, packetize, and seal one encoded access unit into wire packets WITHOUT + /// sending them. Counts the frame + its packets/bytes as submitted; the caller transmits the + /// returned packets via [`send_sealed`](Self::send_sealed) — in one call, or in chunks paced + /// over the frame interval so a real NIC doesn't drop the whole frame as a line-rate burst (the + /// 1 Gbps+ freeze fix). The nonce counter advances per packet, in order, so seal once and send + /// the result intact. (Holding the `Vec>` also keeps the buffers alive for the batch.) + pub fn seal_frame( + &mut self, + data: &[u8], + pts_ns: u64, + user_flags: u32, + ) -> Result>> { if self.config.role != Role::Host { return Err(PunktfunkError::InvalidArg( - "submit_frame called on a client session", + "seal_frame called on a client session", )); } let packets = self .packetizer .packetize(data, pts_ns, user_flags, self.coder.as_ref())?; StatsCounters::add(&self.stats.frames_submitted, 1); - // Seal every shard (the nonce counter advances per packet, in order), then hand the whole - // frame to the transport in ONE batched call — `sendmmsg` does ~64 packets/syscall instead - // of a `send` each, the dominant 1 Gbps+ lever. (Sealing must finish before the immutable - // `send_batch` borrow; collecting the wires also keeps them alive for the batch's iovecs.) let mut wires: Vec> = Vec::with_capacity(packets.len()); for pkt in &packets { wires.push(self.seal_for_wire(pkt)?); } - let total = wires.len(); let bytes: u64 = wires.iter().map(|w| w.len() as u64).sum(); - StatsCounters::add(&self.stats.packets_sent, total as u64); + StatsCounters::add(&self.stats.packets_sent, wires.len() as u64); StatsCounters::add(&self.stats.bytes_sent, bytes); - let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect(); - let sent = self.transport.send_batch(&refs)?; - if sent < total { - StatsCounters::add(&self.stats.packets_send_dropped, (total - sent) as u64); + Ok(wires) + } + + /// Host: transmit one chunk of already-[`seal_frame`](Self::seal_frame)ed packets in a single + /// batched `sendmmsg`, returning how many the kernel accepted. The rest (`packets.len() - n`) + /// are counted as send-buffer drops. Call once for the whole frame, or per paced chunk. + pub fn send_sealed(&self, packets: &[&[u8]]) -> Result { + let sent = self.transport.send_batch(packets)?; + if sent < packets.len() { + StatsCounters::add( + &self.stats.packets_send_dropped, + (packets.len() - sent) as u64, + ); } + Ok(sent) + } + + /// Host: FEC-protect, packetize, seal, and send one encoded access unit (the whole frame in one + /// batched send). Convenience composition of [`seal_frame`](Self::seal_frame) + + /// [`send_sealed`](Self::send_sealed) for callers that don't pace (synthetic source, probe). + pub fn submit_frame(&mut self, data: &[u8], pts_ns: u64, user_flags: u32) -> Result<()> { + let wires = self.seal_frame(data, pts_ns, user_flags)?; + let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect(); + self.send_sealed(&refs)?; Ok(()) } diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index b22e274..2709df3 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -1378,6 +1378,48 @@ fn service_probes( } } +/// Seal one access unit and send its packets PACED over the budget until `deadline` (the next +/// frame's due time), in 16-packet `sendmmsg` chunks — so a high-bitrate frame spreads across the +/// frame interval instead of bursting all at once into the NIC. A real link drops a line-rate burst +/// (the host send buffer EAGAINs), and under infinite GOP a single dropped frame freezes the decode +/// until the next keyframe — the cause of the "freezes over ~150 Mbps, no image at 400 Mbps" +/// symptom. When there's little/no slack (encode ≈ interval at very high fps) the budget collapses +/// to ~0 and every chunk goes out immediately, so this is never slower than the unpaced path. +fn paced_submit( + session: &mut Session, + data: &[u8], + pts_ns: u64, + flags: u32, + deadline: std::time::Instant, +) -> Result<()> { + const PACE_CHUNK: usize = 16; + let wires = session + .seal_frame(data, pts_ns, flags) + .map_err(|e| anyhow!("seal_frame: {e:?}"))?; + let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect(); + let n_chunks = refs.len().div_ceil(PACE_CHUNK).max(1); + let start = std::time::Instant::now(); + // Spread sends over ~90% of the time to the deadline (10% margin for the caller's tail sleep); + // 0 when we're already at/past the deadline → no sleeps → immediate send. + let budget = deadline + .checked_duration_since(start) + .unwrap_or_default() + .mul_f32(0.9); + for (i, chunk) in refs.chunks(PACE_CHUNK).enumerate() { + session + .send_sealed(chunk) + .map_err(|e| anyhow!("send_sealed: {e:?}"))?; + // Sleep toward this chunk's slice of the budget; skip sub-500µs waits (scheduler jitter). + let target = start + budget.mul_f64((i + 1) as f64 / n_chunks as f64); + if let Some(ahead) = target.checked_duration_since(std::time::Instant::now()) { + if ahead > std::time::Duration::from_micros(500) { + std::thread::sleep(ahead); + } + } + } + Ok(()) +} + /// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs /// stamped with the capture wall clock (the client derives per-frame pipeline latency). /// @@ -1447,15 +1489,17 @@ fn virtual_stream( } let capture_ns = now_ns(); enc.submit(&frame).context("encoder submit")?; + // The deadline for this frame's packets: pace the send up to here so a high-bitrate frame + // spreads over the interval instead of bursting all at once into the NIC (a real link drops + // the burst, freezing the infinite-GOP stream until the next keyframe — the 1 Gbps+ fix). + next += interval; while let Some(au) = enc.poll().context("encoder poll")? { let flags = if au.keyframe { (FLAG_PIC | FLAG_SOF) as u32 } else { FLAG_PIC as u32 }; - session - .submit_frame(&au.data, capture_ns, flags) - .map_err(|e| anyhow!("submit_frame: {e:?}"))?; + paced_submit(session, &au.data, capture_ns, flags, next)?; sent += 1; } if perf && last_perf.elapsed() >= std::time::Duration::from_secs(2) { @@ -1474,7 +1518,6 @@ fn virtual_stream( last_bytes = s.bytes_sent; last_send_dropped = s.packets_send_dropped; } - next += interval; match next.checked_duration_since(std::time::Instant::now()) { Some(d) => std::thread::sleep(d), None => next = std::time::Instant::now(),