diff --git a/crates/punktfunk-core/src/session.rs b/crates/punktfunk-core/src/session.rs index f619a73..17c8ee7 100644 --- a/crates/punktfunk-core/src/session.rs +++ b/crates/punktfunk-core/src/session.rs @@ -13,7 +13,7 @@ use crate::crypto::SessionCrypto; use crate::error::{PunktfunkError, Result}; use crate::fec::{coder_for, ErasureCoder}; use crate::input::InputEvent; -use crate::packet::{Packetizer, Reassembler, ReassemblerLimits}; +use crate::packet::{Packetizer, Reassembler, ReassemblerLimits, MAX_DATAGRAM_BYTES}; use crate::stats::{Stats, StatsCounters}; use crate::transport::Transport; @@ -43,8 +43,20 @@ pub struct Session { stats: StatsCounters, /// Monotonic wire sequence, also the AES-GCM nonce counter. next_seq: u64, + /// Client recv ring (reused across [`poll_frame`](Self::poll_frame)): `recvmmsg` drains a batch + /// of datagrams into `recv_scratch` in one syscall, and poll_frame consumes them one at a time + /// across calls (`recv_idx`..`recv_count`), refilling when drained. Allocated lazily on the + /// first client poll so host sessions don't carry it. No per-packet recv alloc at line rate. + recv_scratch: Vec>, + recv_lens: Vec, + recv_count: usize, + recv_idx: usize, } +/// Datagrams drained per `recvmmsg` syscall on the client (the reused ring's size). At ~125k +/// pkt/s this is ~4k syscalls/s instead of 125k; the buffers cost `RECV_BATCH × RECV_BUF` (~64 KB). +const RECV_BATCH: usize = 32; + impl Session { pub fn new(config: Config, transport: Box) -> Result { config.validate()?; @@ -62,6 +74,10 @@ impl Session { reassembler, stats: StatsCounters::default(), next_seq: 0, + recv_scratch: Vec::new(), + recv_lens: Vec::new(), + recv_count: 0, + recv_idx: 0, config, }) } @@ -193,12 +209,29 @@ impl Session { "poll_frame called on a host session", )); } + // Lazily allocate the recv ring on first client poll (host sessions never get here). + if self.recv_scratch.is_empty() { + // Each buffer holds a max datagram + 1 (an oversized read fills it → reassembler rejects). + self.recv_scratch = (0..RECV_BATCH) + .map(|_| vec![0u8; MAX_DATAGRAM_BYTES + 1]) + .collect(); + self.recv_lens = vec![0usize; RECV_BATCH]; + } loop { - let wire = match self.transport.recv()? { - Some(w) => w, - None => return Err(PunktfunkError::NoFrame), - }; - let pkt = match self.open_from_wire(&wire) { + // Refill the ring with one `recvmmsg` batch when the current one is drained. + if self.recv_idx >= self.recv_count { + self.recv_count = self + .transport + .recv_batch(&mut self.recv_scratch, &mut self.recv_lens)?; + self.recv_idx = 0; + if self.recv_count == 0 { + return Err(PunktfunkError::NoFrame); + } + } + let i = self.recv_idx; + self.recv_idx += 1; + let len = self.recv_lens[i]; + let pkt = match self.open_from_wire(&self.recv_scratch[i][..len]) { Ok(p) => p, Err(_) => continue, }; diff --git a/crates/punktfunk-core/src/transport/mod.rs b/crates/punktfunk-core/src/transport/mod.rs index 73ab19f..511e143 100644 --- a/crates/punktfunk-core/src/transport/mod.rs +++ b/crates/punktfunk-core/src/transport/mod.rs @@ -34,4 +34,25 @@ pub trait Transport: Send + Sync { } fn recv(&self) -> std::io::Result>>; + + /// Receive up to `out.len()` datagrams in as few syscalls as possible, writing each into its + /// `out[i]` buffer (sized ≥ a max datagram) and its byte count into `lens[i]`; returns how many + /// arrived (`0` = none available; non-blocking). The recv counterpart of [`send_batch`]: the + /// [`UdpTransport`](super::UdpTransport) override uses `recvmmsg` into a caller-owned, reused + /// buffer ring — no per-packet allocation or syscall at line rate. The default does a single + /// scalar [`recv`](Self::recv) into `out[0]` (correct for the loopback transport + non-Linux). + fn recv_batch(&self, out: &mut [Vec], lens: &mut [usize]) -> std::io::Result { + if out.is_empty() { + return Ok(0); + } + match self.recv()? { + Some(pkt) => { + let n = pkt.len().min(out[0].len()); + out[0][..n].copy_from_slice(&pkt[..n]); + lens[0] = n; + Ok(1) + } + None => Ok(0), + } + } } diff --git a/crates/punktfunk-core/src/transport/udp.rs b/crates/punktfunk-core/src/transport/udp.rs index fee6d62..70cb87a 100644 --- a/crates/punktfunk-core/src/transport/udp.rs +++ b/crates/punktfunk-core/src/transport/udp.rs @@ -1,9 +1,10 @@ //! Real UDP datagram transport — native sockets, no async runtime. //! -//! Send is batched via `sendmmsg` ([`Transport::send_batch`], up to 64 datagrams/syscall) — the -//! 1 Gbps+ syscall lever (~125k → ~2k syscalls/sec at line rate). Recv is still one syscall per -//! packet; a `recvmmsg` batch on the client + a paced send thread on the host are the remaining -//! steps of the 1 Gbps data-plane work, layered on this same [`Transport`] seam. +//! Send is batched via `sendmmsg` ([`Transport::send_batch`], ≤64/syscall) and recv via `recvmmsg` +//! ([`Transport::recv_batch`], ≤32/syscall into a reused ring) — the 1 Gbps+ syscall lever +//! (~125k → a few-k syscalls/sec at line rate). The host additionally paces each frame's send +//! across the frame interval (see `m3.rs::paced_submit`) so a real NIC doesn't drop a line-rate +//! burst. All three layer on this same [`Transport`] seam (scalar fallbacks for loopback/non-Linux). use super::Transport; use crate::packet::MAX_DATAGRAM_BYTES; @@ -43,6 +44,11 @@ impl UdpTransport { Ok(UdpTransport { socket }) } + /// The bound local address (e.g. to learn the OS-assigned ephemeral port). + pub fn local_addr(&self) -> std::io::Result { + self.socket.local_addr() + } + /// Best-effort grow of SO_SNDBUF/SO_RCVBUF (see [`TARGET_SOCKBUF`]). A failure isn't fatal /// (the stream just runs lossier); a grant far below the request means the OS cap is too /// low for clean 4K/5K streaming, so warn once with the knob to raise. @@ -144,6 +150,58 @@ impl Transport for UdpTransport { Err(e) => Err(e), } } + + /// Batched receive via `recvmmsg` — drains up to `out.len()` datagrams in one syscall into the + /// caller's reused buffers (no per-packet allocation). `MSG_DONTWAIT` keeps it non-blocking + /// (the socket already is); `EAGAIN` → `0`. A datagram larger than a buffer is truncated and + /// `lens[i]` reaches the buffer size — the reassembler then rejects it as malformed, matching + /// `recv`'s oversized-drop. Non-Linux falls back to the trait's scalar `recv` default. + #[cfg(target_os = "linux")] + fn recv_batch(&self, out: &mut [Vec], lens: &mut [usize]) -> std::io::Result { + use std::os::fd::AsRawFd; + let fd = self.socket.as_raw_fd(); + let n_bufs = out.len().min(lens.len()); + if n_bufs == 0 { + return Ok(0); + } + // `hdrs` borrow `iovs` (one per buffer) by raw pointer; both live through the recvmmsg call. + let mut iovs: Vec = out[..n_bufs] + .iter_mut() + .map(|b| libc::iovec { + iov_base: b.as_mut_ptr() as *mut libc::c_void, + iov_len: b.len(), + }) + .collect(); + let mut hdrs: Vec = iovs + .iter_mut() + .map(|iov| { + let mut h: libc::mmsghdr = unsafe { std::mem::zeroed() }; + h.msg_hdr.msg_iov = iov; + h.msg_hdr.msg_iovlen = 1; + h + }) + .collect(); + let n = unsafe { + libc::recvmmsg( + fd, + hdrs.as_mut_ptr(), + n_bufs as libc::c_uint, + libc::MSG_DONTWAIT, + std::ptr::null_mut(), + ) + }; + if n < 0 { + let err = std::io::Error::last_os_error(); + if err.kind() == std::io::ErrorKind::WouldBlock { + return Ok(0); + } + return Err(err); + } + for (i, h) in hdrs[..n as usize].iter().enumerate() { + lens[i] = h.msg_len as usize; + } + Ok(n as usize) + } } #[cfg(test)] @@ -197,4 +255,48 @@ mod tests { "every batched packet should arrive over loopback" ); } + + /// `recv_batch` drains many datagrams per call over real loopback UDP — exercising `recvmmsg` + /// on Linux (the scalar `recv` default elsewhere). Send 50 distinct packets, then drain in + /// batches and assert every one arrives intact with the right length. + #[test] + fn recv_batch_drains_over_loopback() { + // Receiver is the UdpTransport (the thing under test); sender is a raw socket bound to a + // known addr so the connected receiver accepts its datagrams. + let tx = std::net::UdpSocket::bind("127.0.0.1:0").unwrap(); + let tx_addr = tx.local_addr().unwrap().to_string(); + let rx = UdpTransport::connect("127.0.0.1:0", &tx_addr).unwrap(); + let rx_addr = rx.local_addr().unwrap(); + + const N: u32 = 50; + for i in 0..N { + let mut p = vec![0u8; 300]; + p[0..4].copy_from_slice(&i.to_le_bytes()); + tx.send_to(&p, rx_addr).unwrap(); + } + + let mut bufs: Vec> = (0..16).map(|_| vec![0u8; RECV_BUF]).collect(); + let mut lens = vec![0usize; 16]; + let mut seen = std::collections::HashSet::new(); + // A few drains absorb scheduling jitter; stop once all N are in or we go dry. + for _ in 0..50 { + let n = rx.recv_batch(&mut bufs, &mut lens).unwrap(); + if n == 0 { + if seen.len() == N as usize { + break; + } + std::thread::sleep(std::time::Duration::from_millis(5)); + continue; + } + for i in 0..n { + assert_eq!(lens[i], 300, "recvmmsg reports the datagram length"); + seen.insert(u32::from_le_bytes(bufs[i][0..4].try_into().unwrap())); + } + } + assert_eq!( + seen.len(), + N as usize, + "every datagram should be drained via recv_batch" + ); + } }