feat(1gbps): batched client recv via recvmmsg (increment C)
ci / rust (push) Has been cancelled

Final increment of the 1 Gbps data-plane rework — the recv counterpart of the
sendmmsg work. The client recv path did one recvfrom + one Vec allocation per
packet (and the pump's 300µs idle sleep could let packets pile up at line rate).

- Transport gains recv_batch(&mut [Vec<u8>], &mut [usize]) -> count; default is
  a single scalar recv into out[0] (loopback + non-Linux).
- UdpTransport overrides it on Linux with recvmmsg (MSG_DONTWAIT) draining up to
  N datagrams per syscall into the caller's reused buffers — no per-packet alloc.
- Session::poll_frame owns a lazily-allocated recv ring (RECV_BATCH=32) and
  consumes it one packet at a time across calls, refilling with one recvmmsg when
  drained. Encapsulated: the punktfunk-client-rs + NativeClient pumps are
  unchanged, and draining a batch per syscall means the 300µs sleep no longer
  underdrains. Added UdpTransport::local_addr (used by the test, generally handy).

~125k → ~4k recv syscalls/sec at line rate, zero per-packet recv allocation.
Verified: new recv_batch_drains_over_loopback test (50 datagrams drained intact
via recvmmsg) + the existing loopback round-trip now runs through the batched
poll_frame; full suite (35 + round-trip + 6) + clippy + fmt green.

Decode-in-place (kill the per-packet open_from_wire alloc) is a separate later
optimization. With A (sendmmsg) + B (paced send) + C (recvmmsg), the native data
plane is batched + paced end to end.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-11 22:39:51 +00:00
parent 10a932d013
commit 2f4f92a804
3 changed files with 166 additions and 10 deletions
+39 -6
View File
@@ -13,7 +13,7 @@ use crate::crypto::SessionCrypto;
use crate::error::{PunktfunkError, Result}; use crate::error::{PunktfunkError, Result};
use crate::fec::{coder_for, ErasureCoder}; use crate::fec::{coder_for, ErasureCoder};
use crate::input::InputEvent; use crate::input::InputEvent;
use crate::packet::{Packetizer, Reassembler, ReassemblerLimits}; use crate::packet::{Packetizer, Reassembler, ReassemblerLimits, MAX_DATAGRAM_BYTES};
use crate::stats::{Stats, StatsCounters}; use crate::stats::{Stats, StatsCounters};
use crate::transport::Transport; use crate::transport::Transport;
@@ -43,8 +43,20 @@ pub struct Session {
stats: StatsCounters, stats: StatsCounters,
/// Monotonic wire sequence, also the AES-GCM nonce counter. /// Monotonic wire sequence, also the AES-GCM nonce counter.
next_seq: u64, next_seq: u64,
/// Client recv ring (reused across [`poll_frame`](Self::poll_frame)): `recvmmsg` drains a batch
/// of datagrams into `recv_scratch` in one syscall, and poll_frame consumes them one at a time
/// across calls (`recv_idx`..`recv_count`), refilling when drained. Allocated lazily on the
/// first client poll so host sessions don't carry it. No per-packet recv alloc at line rate.
recv_scratch: Vec<Vec<u8>>,
recv_lens: Vec<usize>,
recv_count: usize,
recv_idx: usize,
} }
/// Datagrams drained per `recvmmsg` syscall on the client (the reused ring's size). At ~125k
/// pkt/s this is ~4k syscalls/s instead of 125k; the buffers cost `RECV_BATCH × RECV_BUF` (~64 KB).
const RECV_BATCH: usize = 32;
impl Session { impl Session {
pub fn new(config: Config, transport: Box<dyn Transport>) -> Result<Session> { pub fn new(config: Config, transport: Box<dyn Transport>) -> Result<Session> {
config.validate()?; config.validate()?;
@@ -62,6 +74,10 @@ impl Session {
reassembler, reassembler,
stats: StatsCounters::default(), stats: StatsCounters::default(),
next_seq: 0, next_seq: 0,
recv_scratch: Vec::new(),
recv_lens: Vec::new(),
recv_count: 0,
recv_idx: 0,
config, config,
}) })
} }
@@ -193,12 +209,29 @@ impl Session {
"poll_frame called on a host session", "poll_frame called on a host session",
)); ));
} }
// Lazily allocate the recv ring on first client poll (host sessions never get here).
if self.recv_scratch.is_empty() {
// Each buffer holds a max datagram + 1 (an oversized read fills it → reassembler rejects).
self.recv_scratch = (0..RECV_BATCH)
.map(|_| vec![0u8; MAX_DATAGRAM_BYTES + 1])
.collect();
self.recv_lens = vec![0usize; RECV_BATCH];
}
loop { loop {
let wire = match self.transport.recv()? { // Refill the ring with one `recvmmsg` batch when the current one is drained.
Some(w) => w, if self.recv_idx >= self.recv_count {
None => return Err(PunktfunkError::NoFrame), self.recv_count = self
}; .transport
let pkt = match self.open_from_wire(&wire) { .recv_batch(&mut self.recv_scratch, &mut self.recv_lens)?;
self.recv_idx = 0;
if self.recv_count == 0 {
return Err(PunktfunkError::NoFrame);
}
}
let i = self.recv_idx;
self.recv_idx += 1;
let len = self.recv_lens[i];
let pkt = match self.open_from_wire(&self.recv_scratch[i][..len]) {
Ok(p) => p, Ok(p) => p,
Err(_) => continue, Err(_) => continue,
}; };
@@ -34,4 +34,25 @@ pub trait Transport: Send + Sync {
} }
fn recv(&self) -> std::io::Result<Option<Vec<u8>>>; fn recv(&self) -> std::io::Result<Option<Vec<u8>>>;
/// Receive up to `out.len()` datagrams in as few syscalls as possible, writing each into its
/// `out[i]` buffer (sized ≥ a max datagram) and its byte count into `lens[i]`; returns how many
/// arrived (`0` = none available; non-blocking). The recv counterpart of [`send_batch`]: the
/// [`UdpTransport`](super::UdpTransport) override uses `recvmmsg` into a caller-owned, reused
/// buffer ring — no per-packet allocation or syscall at line rate. The default does a single
/// scalar [`recv`](Self::recv) into `out[0]` (correct for the loopback transport + non-Linux).
fn recv_batch(&self, out: &mut [Vec<u8>], lens: &mut [usize]) -> std::io::Result<usize> {
if out.is_empty() {
return Ok(0);
}
match self.recv()? {
Some(pkt) => {
let n = pkt.len().min(out[0].len());
out[0][..n].copy_from_slice(&pkt[..n]);
lens[0] = n;
Ok(1)
}
None => Ok(0),
}
}
} }
+106 -4
View File
@@ -1,9 +1,10 @@
//! Real UDP datagram transport — native sockets, no async runtime. //! Real UDP datagram transport — native sockets, no async runtime.
//! //!
//! Send is batched via `sendmmsg` ([`Transport::send_batch`], up to 64 datagrams/syscall) — the //! Send is batched via `sendmmsg` ([`Transport::send_batch`], ≤64/syscall) and recv via `recvmmsg`
//! 1 Gbps+ syscall lever (~125k → ~2k syscalls/sec at line rate). Recv is still one syscall per //! ([`Transport::recv_batch`], ≤32/syscall into a reused ring) — the 1 Gbps+ syscall lever
//! packet; a `recvmmsg` batch on the client + a paced send thread on the host are the remaining //! (~125k → a few-k syscalls/sec at line rate). The host additionally paces each frame's send
//! steps of the 1 Gbps data-plane work, layered on this same [`Transport`] seam. //! across the frame interval (see `m3.rs::paced_submit`) so a real NIC doesn't drop a line-rate
//! burst. All three layer on this same [`Transport`] seam (scalar fallbacks for loopback/non-Linux).
use super::Transport; use super::Transport;
use crate::packet::MAX_DATAGRAM_BYTES; use crate::packet::MAX_DATAGRAM_BYTES;
@@ -43,6 +44,11 @@ impl UdpTransport {
Ok(UdpTransport { socket }) Ok(UdpTransport { socket })
} }
/// The bound local address (e.g. to learn the OS-assigned ephemeral port).
pub fn local_addr(&self) -> std::io::Result<std::net::SocketAddr> {
self.socket.local_addr()
}
/// Best-effort grow of SO_SNDBUF/SO_RCVBUF (see [`TARGET_SOCKBUF`]). A failure isn't fatal /// Best-effort grow of SO_SNDBUF/SO_RCVBUF (see [`TARGET_SOCKBUF`]). A failure isn't fatal
/// (the stream just runs lossier); a grant far below the request means the OS cap is too /// (the stream just runs lossier); a grant far below the request means the OS cap is too
/// low for clean 4K/5K streaming, so warn once with the knob to raise. /// low for clean 4K/5K streaming, so warn once with the knob to raise.
@@ -144,6 +150,58 @@ impl Transport for UdpTransport {
Err(e) => Err(e), Err(e) => Err(e),
} }
} }
/// Batched receive via `recvmmsg` — drains up to `out.len()` datagrams in one syscall into the
/// caller's reused buffers (no per-packet allocation). `MSG_DONTWAIT` keeps it non-blocking
/// (the socket already is); `EAGAIN` → `0`. A datagram larger than a buffer is truncated and
/// `lens[i]` reaches the buffer size — the reassembler then rejects it as malformed, matching
/// `recv`'s oversized-drop. Non-Linux falls back to the trait's scalar `recv` default.
#[cfg(target_os = "linux")]
fn recv_batch(&self, out: &mut [Vec<u8>], lens: &mut [usize]) -> std::io::Result<usize> {
use std::os::fd::AsRawFd;
let fd = self.socket.as_raw_fd();
let n_bufs = out.len().min(lens.len());
if n_bufs == 0 {
return Ok(0);
}
// `hdrs` borrow `iovs` (one per buffer) by raw pointer; both live through the recvmmsg call.
let mut iovs: Vec<libc::iovec> = out[..n_bufs]
.iter_mut()
.map(|b| libc::iovec {
iov_base: b.as_mut_ptr() as *mut libc::c_void,
iov_len: b.len(),
})
.collect();
let mut hdrs: Vec<libc::mmsghdr> = iovs
.iter_mut()
.map(|iov| {
let mut h: libc::mmsghdr = unsafe { std::mem::zeroed() };
h.msg_hdr.msg_iov = iov;
h.msg_hdr.msg_iovlen = 1;
h
})
.collect();
let n = unsafe {
libc::recvmmsg(
fd,
hdrs.as_mut_ptr(),
n_bufs as libc::c_uint,
libc::MSG_DONTWAIT,
std::ptr::null_mut(),
)
};
if n < 0 {
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::WouldBlock {
return Ok(0);
}
return Err(err);
}
for (i, h) in hdrs[..n as usize].iter().enumerate() {
lens[i] = h.msg_len as usize;
}
Ok(n as usize)
}
} }
#[cfg(test)] #[cfg(test)]
@@ -197,4 +255,48 @@ mod tests {
"every batched packet should arrive over loopback" "every batched packet should arrive over loopback"
); );
} }
/// `recv_batch` drains many datagrams per call over real loopback UDP — exercising `recvmmsg`
/// on Linux (the scalar `recv` default elsewhere). Send 50 distinct packets, then drain in
/// batches and assert every one arrives intact with the right length.
#[test]
fn recv_batch_drains_over_loopback() {
// Receiver is the UdpTransport (the thing under test); sender is a raw socket bound to a
// known addr so the connected receiver accepts its datagrams.
let tx = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
let tx_addr = tx.local_addr().unwrap().to_string();
let rx = UdpTransport::connect("127.0.0.1:0", &tx_addr).unwrap();
let rx_addr = rx.local_addr().unwrap();
const N: u32 = 50;
for i in 0..N {
let mut p = vec![0u8; 300];
p[0..4].copy_from_slice(&i.to_le_bytes());
tx.send_to(&p, rx_addr).unwrap();
}
let mut bufs: Vec<Vec<u8>> = (0..16).map(|_| vec![0u8; RECV_BUF]).collect();
let mut lens = vec![0usize; 16];
let mut seen = std::collections::HashSet::new();
// A few drains absorb scheduling jitter; stop once all N are in or we go dry.
for _ in 0..50 {
let n = rx.recv_batch(&mut bufs, &mut lens).unwrap();
if n == 0 {
if seen.len() == N as usize {
break;
}
std::thread::sleep(std::time::Duration::from_millis(5));
continue;
}
for i in 0..n {
assert_eq!(lens[i], 300, "recvmmsg reports the datagram length");
seen.insert(u32::from_le_bytes(bufs[i][0..4].try_into().unwrap()));
}
}
assert_eq!(
seen.len(),
N as usize,
"every datagram should be drained via recv_batch"
);
}
} }