diff --git a/crates/punktfunk-core/src/session.rs b/crates/punktfunk-core/src/session.rs index 3e89313..fc62997 100644 --- a/crates/punktfunk-core/src/session.rs +++ b/crates/punktfunk-core/src/session.rs @@ -158,7 +158,8 @@ impl Session { /// batched `sendmmsg`, returning how many the kernel accepted. The rest (`packets.len() - n`) /// are counted as send-buffer drops. Call once for the whole frame, or per paced chunk. pub fn send_sealed(&self, packets: &[&[u8]]) -> Result { - let sent = self.transport.send_batch(packets)?; + // GSO when enabled (UdpTransport/Linux), else sendmmsg — same short-count drop contract. + let sent = self.transport.send_gso(packets)?; if sent < packets.len() { StatsCounters::add( &self.stats.packets_send_dropped, diff --git a/crates/punktfunk-core/src/transport/mod.rs b/crates/punktfunk-core/src/transport/mod.rs index 511e143..bbd2a00 100644 --- a/crates/punktfunk-core/src/transport/mod.rs +++ b/crates/punktfunk-core/src/transport/mod.rs @@ -33,6 +33,18 @@ pub trait Transport: Send + Sync { Ok(sent) } + /// Send a frame's equal-size packets using UDP Generic Segmentation Offload where available: + /// one `sendmsg` hands the kernel a big buffer it splits into `gso_size` UDP datagrams, building + /// ~1 GSO skb per ≤64 segments instead of one skb per packet. This is the multi-Gbps lever — + /// research shows ~2.4× throughput at equal CPU and ~40× fewer syscalls, and that `sendmmsg` + /// batching alone is insufficient (it still builds one skb per datagram). The + /// [`UdpTransport`](super::UdpTransport) Linux override implements it (opt-in via `PUNKTFUNK_GSO`, + /// auto-fallback on any GSO error); the default just delegates to [`send_batch`](Self::send_batch), + /// correct for loopback and non-Linux. Same lossy, FEC-protected short-count contract as `send_batch`. + fn send_gso(&self, packets: &[&[u8]]) -> std::io::Result { + self.send_batch(packets) + } + fn recv(&self) -> std::io::Result>>; /// Receive up to `out.len()` datagrams in as few syscalls as possible, writing each into its diff --git a/crates/punktfunk-core/src/transport/udp.rs b/crates/punktfunk-core/src/transport/udp.rs index de4f188..801fe63 100644 --- a/crates/punktfunk-core/src/transport/udp.rs +++ b/crates/punktfunk-core/src/transport/udp.rs @@ -33,6 +33,81 @@ fn mmsghdrs(iovs: &mut [libc::iovec]) -> Vec { .collect() } +/// UDP GSO enable state (process-wide). Opt-in via `PUNKTFUNK_GSO` — it's new unsafe hot-path code, +/// and the auto-fallback (latch off on any GSO syscall error) covers kernels/paths without support. +#[cfg(target_os = "linux")] +mod gso { + use std::sync::atomic::{AtomicU8, Ordering}; + static STATE: AtomicU8 = AtomicU8::new(0); // 0 = uninit, 1 = on, 2 = off + + pub fn active() -> bool { + match STATE.load(Ordering::Relaxed) { + 1 => true, + 2 => false, + _ => { + let on = std::env::var_os("PUNKTFUNK_GSO").is_some(); + STATE.store(if on { 1 } else { 2 }, Ordering::Relaxed); + on + } + } + } + /// Latch GSO off for the process after a GSO syscall error (unsupported kernel/path). + pub fn disable() { + STATE.store(2, Ordering::Relaxed); + } +} + +/// True if the send error means UDP GSO isn't supported here (vs a transient/real failure) — so we +/// latch GSO off and fall back to `sendmmsg` rather than tear the stream down. +#[cfg(target_os = "linux")] +fn gso_unsupported(e: &std::io::Error) -> bool { + matches!( + e.raw_os_error(), + Some(libc::ENOPROTOOPT) | Some(libc::EOPNOTSUPP) | Some(libc::EINVAL) | Some(libc::EIO) + ) +} + +/// One `sendmsg` carrying a `UDP_SEGMENT` control message: the kernel splits `buf` (a back-to-back +/// concatenation of equal-size datagrams, only the final one allowed shorter) into `gso_size`-byte +/// UDP datagrams to the connected peer — one large GSO skb instead of N. `EAGAIN` (full send buffer) +/// surfaces as a `WouldBlock` error; the caller treats it as a lossy drop. +#[cfg(target_os = "linux")] +fn send_one_gso(fd: libc::c_int, buf: &[u8], gso_size: u16) -> std::io::Result<()> { + let mut iov = libc::iovec { + iov_base: buf.as_ptr() as *mut libc::c_void, + iov_len: buf.len(), + }; + // Aligned control buffer for one cmsg(UDP_SEGMENT = u16). 64 B > CMSG_SPACE(2); the union forces + // cmsghdr alignment (CMSG_FIRSTHDR requires it). + #[repr(C)] + union CmsgBuf { + _align: libc::cmsghdr, + bytes: [u8; 64], + } + let mut control = CmsgBuf { bytes: [0u8; 64] }; + let mut msg: libc::msghdr = unsafe { std::mem::zeroed() }; + msg.msg_iov = &mut iov; + msg.msg_iovlen = 1; + let rc = unsafe { + msg.msg_control = control.bytes.as_mut_ptr() as *mut libc::c_void; + msg.msg_controllen = libc::CMSG_SPACE(std::mem::size_of::() as u32) as _; + let cmsg = libc::CMSG_FIRSTHDR(&msg); + (*cmsg).cmsg_level = libc::SOL_UDP; + (*cmsg).cmsg_type = libc::UDP_SEGMENT; + (*cmsg).cmsg_len = libc::CMSG_LEN(std::mem::size_of::() as u32) as _; + std::ptr::copy_nonoverlapping( + (&gso_size as *const u16) as *const u8, + libc::CMSG_DATA(cmsg), + std::mem::size_of::(), + ); + libc::sendmsg(fd, &msg, 0) + }; + if rc < 0 { + return Err(std::io::Error::last_os_error()); + } + Ok(()) +} + pub struct UdpTransport { socket: UdpSocket, } @@ -146,6 +221,52 @@ impl Transport for UdpTransport { Ok(total_sent) } + /// UDP GSO send (see [`Transport::send_gso`]). Coalesces the frame's equal-size packets into a + /// reused scratch buffer and hands the kernel ≤64-segment super-buffers via `sendmsg(UDP_SEGMENT)` + /// — one GSO skb per chunk instead of one per packet, the multi-Gbps lever. Opt-in + /// (`PUNKTFUNK_GSO`); falls back to `send_batch` when off, when packets aren't uniform-size, or on + /// any GSO error (which also latches it off for the process). Same lossy short-count contract. + #[cfg(target_os = "linux")] + fn send_gso(&self, packets: &[&[u8]]) -> std::io::Result { + use std::os::fd::AsRawFd; + if packets.is_empty() { + return Ok(0); + } + if !gso::active() { + return self.send_batch(packets); + } + // GSO needs every segment but the last to be exactly `seg` bytes. Our wire packets are all + // identical size (shards zero-padded to shard_payload), but guard and fall back if not. + let seg = packets[0].len(); + let last = packets.len() - 1; + if seg == 0 || packets[..last].iter().any(|p| p.len() != seg) || packets[last].len() > seg { + return self.send_batch(packets); + } + let fd = self.socket.as_raw_fd(); + // A GSO super-buffer is capped at 64 segments AND 65535 payload bytes (kernel limits). + let max_seg = (65535 / seg).clamp(1, 64); + let mut scratch: Vec = Vec::with_capacity(seg * max_seg); + let mut sent = 0usize; + for chunk in packets.chunks(max_seg) { + scratch.clear(); + for p in chunk { + scratch.extend_from_slice(p); + } + match send_one_gso(fd, &scratch, seg as u16) { + Ok(()) => sent += chunk.len(), + // Send buffer momentarily full — drop the rest (counted by the caller), never block. + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + // GSO unsupported on this kernel/path — latch off and finish via sendmmsg. + Err(e) if gso_unsupported(&e) => { + gso::disable(); + return Ok(sent + self.send_batch(&packets[sent..])?); + } + Err(e) => return Err(e), + } + } + Ok(sent) + } + fn recv(&self) -> std::io::Result>> { let mut buf = vec![0u8; RECV_BUF]; match self.socket.recv(&mut buf) { @@ -252,6 +373,45 @@ mod tests { use super::*; use crate::transport::Transport; + /// `send_one_gso` must split one buffer into N separate UDP datagrams of `gso_size` bytes each + /// (the kernel UDP GSO segmentation) — the multi-Gbps send lever. Loopback supports GSO; if the + /// CI kernel doesn't, skip rather than fail. + #[cfg(target_os = "linux")] + #[test] + fn gso_segments_into_separate_datagrams() { + use std::os::fd::AsRawFd; + let rx = std::net::UdpSocket::bind("127.0.0.1:0").unwrap(); + rx.set_read_timeout(Some(std::time::Duration::from_secs(2))) + .unwrap(); + let rx_addr = rx.local_addr().unwrap(); + let tx = std::net::UdpSocket::bind("127.0.0.1:0").unwrap(); + tx.connect(rx_addr).unwrap(); + + let seg = 1000usize; + let segs = 5usize; + let mut buf = vec![0u8; seg * segs]; + for i in 0..segs { + buf[i * seg..(i + 1) * seg].fill(i as u8 + 1); + } + if let Err(e) = send_one_gso(tx.as_raw_fd(), &buf, seg as u16) { + if gso_unsupported(&e) { + eprintln!("UDP GSO unsupported on this kernel — skipping"); + return; + } + panic!("gso sendmsg failed: {e}"); + } + // Each segment arrives as its own datagram, full size, content intact. + let mut rbuf = vec![0u8; 4096]; + for i in 0..segs { + let n = rx.recv(&mut rbuf).expect("recv GSO segment"); + assert_eq!(n, seg, "segment {i} should be a full {seg}-byte datagram"); + assert!( + rbuf[..n].iter().all(|&b| b == i as u8 + 1), + "segment {i} content" + ); + } + } + /// `send_batch` delivers a whole frame's worth of packets over real loopback UDP — exercising /// the `sendmmsg` path on Linux (the scalar-loop default elsewhere). 100 × 200 B = 20 KB fits /// the socket buffer, so loopback is lossless and every packet must arrive intact + in order.