From c24b571e374669eb9d8cdd35430fa0c84eb8ccff Mon Sep 17 00:00:00 2001
From: enricobuehler
Date: Thu, 11 Jun 2026 21:55:22 +0000
Subject: [PATCH] feat(1gbps): batched send via sendmmsg (Transport::send_batch)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

First increment of the 1 Gbps send-path rework (the measured bottleneck): the
native data plane did one send() syscall per packet — at ~125k pkt/s (1 Gbps
wire) that burns a core on syscalls. Port the proven GameStream sendmmsg path
into the core Transport seam.

- Transport gains `send_batch(&[&[u8]]) -> usize` (count handed to the kernel;
  caller counts the rest as send-buffer drops). Default = the scalar send loop
  (loopback transport + non-Linux).
- UdpTransport overrides it on Linux with `sendmmsg` (64 datagrams/syscall);
  the connected socket needs no per-message address. Non-blocking-aware: a
  full send buffer yields a short count / EAGAIN, and we stop + report what
  went out rather than block or retry (same lossy, FEC-protected contract as
  send()).
- Session::submit_frame seals every shard then hands the whole frame to
  send_batch in ONE call instead of looping send() — ~64x fewer syscalls per
  frame on the native + GameStream-over-core paths; send_dropped accounting
  preserved (total - sent).

~125k → ~2k syscalls/sec at 1 Gbps line rate.

Verified: new loopback-UDP test send_batch_delivers_over_loopback (100 batched
packets arrive intact, datagram boundaries preserved); full core suite +
clippy + fmt green.

Next increments: a paced send thread (microburst shaping so a real NIC doesn't
drop line-rate bursts) and recvmmsg on the client.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 crates/punktfunk-core/Cargo.toml           |   5 +
 crates/punktfunk-core/src/session.rs       |  23 +++--
 crates/punktfunk-core/src/transport/mod.rs |  18 ++++
 crates/punktfunk-core/src/transport/udp.rs | 108 ++++++++++++++++++++-
 4 files changed, 144 insertions(+), 10 deletions(-)

diff --git a/crates/punktfunk-core/Cargo.toml b/crates/punktfunk-core/Cargo.toml
index 1c9d74e..5f7390a 100644
--- a/crates/punktfunk-core/Cargo.toml
+++ b/crates/punktfunk-core/Cargo.toml
@@ -46,6 +46,11 @@ hmac = { version = "0.12", optional = true }
 spake2 = { version = "0.4", optional = true }
 tokio = { version = "1", optional = true, features = ["rt-multi-thread", "net", "sync", "macros"] }
 
+# `sendmmsg` batched UDP send (the 1 Gbps+ syscall lever) — Linux only; other targets use the
+# scalar `send` loop fallback.
+[target.'cfg(target_os = "linux")'.dependencies]
+libc = "0.2"
+
 [dev-dependencies]
 proptest = "1"
 
diff --git a/crates/punktfunk-core/src/session.rs b/crates/punktfunk-core/src/session.rs
index b57149d..b30a1f6 100644
--- a/crates/punktfunk-core/src/session.rs
+++ b/crates/punktfunk-core/src/session.rs
@@ -118,13 +118,22 @@ impl Session {
             .packetizer
             .packetize(data, pts_ns, user_flags, self.coder.as_ref())?;
         StatsCounters::add(&self.stats.frames_submitted, 1);
-        for pkt in packets {
-            let wire = self.seal_for_wire(&pkt)?;
-            StatsCounters::add(&self.stats.packets_sent, 1);
-            StatsCounters::add(&self.stats.bytes_sent, wire.len() as u64);
-            if !self.transport.send(&wire)? {
-                StatsCounters::add(&self.stats.packets_send_dropped, 1);
-            }
+        // Seal every shard (the nonce counter advances per packet, in order), then hand the whole
+        // frame to the transport in ONE batched call — `sendmmsg` does ~64 packets/syscall instead
+        // of a `send` each, the dominant 1 Gbps+ lever. (Sealing must finish before the immutable
+        // `send_batch` borrow; collecting the wires also keeps them alive for the batch's iovecs.)
+        let mut wires: Vec<Vec<u8>> = Vec::with_capacity(packets.len());
+        for pkt in &packets {
+            wires.push(self.seal_for_wire(pkt)?);
+        }
+        let total = wires.len();
+        let bytes: u64 = wires.iter().map(|w| w.len() as u64).sum();
+        StatsCounters::add(&self.stats.packets_sent, total as u64);
+        StatsCounters::add(&self.stats.bytes_sent, bytes);
+        let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect();
+        let sent = self.transport.send_batch(&refs)?;
+        if sent < total {
+            StatsCounters::add(&self.stats.packets_send_dropped, (total - sent) as u64);
         }
         Ok(())
     }
diff --git a/crates/punktfunk-core/src/transport/mod.rs b/crates/punktfunk-core/src/transport/mod.rs
index aa4354b..73ab19f 100644
--- a/crates/punktfunk-core/src/transport/mod.rs
+++ b/crates/punktfunk-core/src/transport/mod.rs
@@ -15,5 +15,23 @@ pub trait Transport: Send + Sync {
     /// recovers, surfaced so the caller can count it (`packets_send_dropped`) instead of it being
     /// invisible. `Err` = a real send failure.
     fn send(&self, packet: &[u8]) -> std::io::Result<bool>;
+
+    /// Send a whole frame's packets in as few syscalls as possible, returning how many were
+    /// handed to the kernel (the caller counts `packets.len() - sent` as send-buffer drops). This
+    /// is the 1 Gbps+ lever: the [`UdpTransport`](super::UdpTransport) override uses `sendmmsg`
+    /// (~64 packets/syscall) instead of one `send` each — at ~125k pkt/s that is the difference
+    /// between ~2k and ~125k syscalls/sec. The default is the scalar `send` loop (correct for the
+    /// loopback transport and non-Linux builds). On a full send buffer it stops early and reports
+    /// the partial count rather than blocking — same lossy, FEC-protected contract as `send`.
+    fn send_batch(&self, packets: &[&[u8]]) -> std::io::Result<usize> {
+        let mut sent = 0;
+        for p in packets {
+            if self.send(p)? {
+                sent += 1;
+            }
+        }
+        Ok(sent)
+    }
+
     fn recv(&self) -> std::io::Result<Option<Vec<u8>>>;
 }
diff --git a/crates/punktfunk-core/src/transport/udp.rs b/crates/punktfunk-core/src/transport/udp.rs
index 23a0753..fee6d62 100644
--- a/crates/punktfunk-core/src/transport/udp.rs
+++ b/crates/punktfunk-core/src/transport/udp.rs
@@ -1,8 +1,9 @@
 //! Real UDP datagram transport — native sockets, no async runtime.
 //!
-//! M1 uses one `recv` syscall per packet; the latency budget (§7) calls for
-//! `sendmmsg`/UDP-GSO batching to cut syscalls, which is a P2 optimization layered on
-//! this same [`Transport`] seam.
+//! Send is batched via `sendmmsg` ([`Transport::send_batch`], up to 64 datagrams/syscall) — the
+//! 1 Gbps+ syscall lever (~125k → ~2k syscalls/sec at line rate). Recv is still one syscall per
+//! packet; a `recvmmsg` batch on the client + a paced send thread on the host are the remaining
+//! steps of the 1 Gbps data-plane work, layered on this same [`Transport`] seam.
 
 use super::Transport;
 use crate::packet::MAX_DATAGRAM_BYTES;
@@ -81,6 +82,54 @@ impl Transport for UdpTransport {
         }
     }
 
+    /// Batched send via `sendmmsg` (up to 64 datagrams per syscall) — the connected socket needs
+    /// no per-message address. The socket is non-blocking, so a full send buffer surfaces as a
+    /// short count (or `EAGAIN` with nothing sent); we stop and report what went out rather than
+    /// block or retry — the data plane is lossy + FEC-protected, and blocking would queue stale
+    /// frames + add latency. Ports the proven GameStream `sendmmsg_all`. Non-Linux falls back to
+    /// the trait's scalar `send` loop (no `sendmmsg`).
+    #[cfg(target_os = "linux")]
+    fn send_batch(&self, packets: &[&[u8]]) -> std::io::Result<usize> {
+        use std::os::fd::AsRawFd;
+        const CHUNK: usize = 64;
+        let fd = self.socket.as_raw_fd();
+        let mut total_sent = 0usize;
+        for chunk in packets.chunks(CHUNK) {
+            // `hdrs` borrow `iovs` by raw pointer; both stay alive through the `sendmmsg` call.
+            let mut iovs: Vec<libc::iovec> = chunk
+                .iter()
+                .map(|p| libc::iovec {
+                    iov_base: p.as_ptr() as *mut libc::c_void,
+                    iov_len: p.len(),
+                })
+                .collect();
+            let mut hdrs: Vec<libc::mmsghdr> = iovs
+                .iter_mut()
+                .map(|iov| {
+                    let mut h: libc::mmsghdr = unsafe { std::mem::zeroed() };
+                    h.msg_hdr.msg_iov = iov;
+                    h.msg_hdr.msg_iovlen = 1;
+                    h
+                })
+                .collect();
+            let n = unsafe { libc::sendmmsg(fd, hdrs.as_mut_ptr(), hdrs.len() as libc::c_uint, 0) };
+            if n < 0 {
+                let err = std::io::Error::last_os_error();
+                // Nothing fit in the send buffer — drop this + the remaining chunks (counted by
+                // the caller). A real error (not WouldBlock) still tears the session down.
+                if err.kind() == std::io::ErrorKind::WouldBlock {
+                    break;
+                }
+                return Err(err);
+            }
+            total_sent += n as usize;
+            if (n as usize) < chunk.len() {
+                break; // buffer filled mid-chunk — drop the remainder
+            }
+        }
+        Ok(total_sent)
+    }
+
     fn recv(&self) -> std::io::Result<Option<Vec<u8>>> {
         let mut buf = vec![0u8; RECV_BUF];
         match self.socket.recv(&mut buf) {
@@ -96,3 +145,56 @@ impl Transport for UdpTransport {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::transport::Transport;
+
+    /// `send_batch` delivers a whole frame's worth of packets over real loopback UDP — exercising
+    /// the `sendmmsg` path on Linux (the scalar-loop default elsewhere). 100 × 200 B = 20 KB fits
+    /// the socket buffer, so loopback is lossless and every packet must arrive intact + in order.
+    #[test]
+    fn send_batch_delivers_over_loopback() {
+        let rx = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
+        rx.set_read_timeout(Some(std::time::Duration::from_millis(500)))
+            .unwrap();
+        let rx_addr = rx.local_addr().unwrap().to_string();
+        let tx = UdpTransport::connect("127.0.0.1:0", &rx_addr).unwrap();
+
+        const N: u32 = 100;
+        let payloads: Vec<Vec<u8>> = (0..N)
+            .map(|i| {
+                let mut v = vec![0u8; 200];
+                v[0..4].copy_from_slice(&i.to_le_bytes());
+                v
+            })
+            .collect();
+        let refs: Vec<&[u8]> = payloads.iter().map(|p| p.as_slice()).collect();
+        let sent = tx.send_batch(&refs).unwrap();
+        assert_eq!(
+            sent, N as usize,
+            "send_batch should hand all packets to the kernel"
+        );
+
+        let mut seen = std::collections::HashSet::new();
+        let mut buf = [0u8; 2048];
+        while seen.len() < N as usize {
+            match rx.recv(&mut buf) {
+                Ok(n) => {
+                    assert_eq!(
+                        n, 200,
+                        "datagram boundaries preserved (one packet per recv)"
+                    );
+                    seen.insert(u32::from_le_bytes(buf[0..4].try_into().unwrap()));
+                }
+                Err(_) => break, // read timeout — stop and let the assert report the shortfall
+            }
+        }
+        assert_eq!(
+            seen.len(),
+            N as usize,
+            "every batched packet should arrive over loopback"
+        );
+    }
+}