From 2ebffe34572e57bd05a199cfae9ea637b569400b Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Fri, 12 Jun 2026 23:52:39 +0000 Subject: [PATCH] perf(core): recvmsg_x batched receive on Apple (macOS client) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit macOS/iOS have no recvmmsg(2), so the Mac client did one recv() syscall per packet (non-allocating after the earlier fix, but still a syscall each — a single-core wall at line rate that Moonlight avoids). Add the Darwin recvmsg_x(2) batched-receive path (the recv counterpart of Linux recvmmsg): one syscall drains up to RECV_BATCH datagrams into the reused ring. struct msghdr_x + the extern aren't in the libc crate, so declared here (cfg target_vendor=apple). Opt-in via PUNKTFUNK_RECVMSG_X (it's FFI we can't exercise off-Apple) with auto-fallback to the tested scalar recv-loop on any unexpected error. Linux recvmmsg + the non-Apple scalar loop are unchanged; apple.yml compiles the path. Re GRO: Linux recv already batches via recvmmsg (32/syscall), so UDP GRO is only a marginal add there and needs a recv-path redesign to split coalesced buffers — deferred as low-ROI vs the Mac, which had no batching at all. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/punktfunk-core/src/transport/udp.rs | 109 +++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/crates/punktfunk-core/src/transport/udp.rs b/crates/punktfunk-core/src/transport/udp.rs index 801fe63..2b0d385 100644 --- a/crates/punktfunk-core/src/transport/udp.rs +++ b/crates/punktfunk-core/src/transport/udp.rs @@ -108,6 +108,57 @@ fn send_one_gso(fd: libc::c_int, buf: &[u8], gso_size: u16) -> std::io::Result<( Ok(()) } +/// Apple (macOS/iOS) batched-receive enable state. Darwin has no `recvmmsg(2)`, so our macOS client +/// does one `recv` per packet (non-allocating, but a syscall each); `recvmsg_x(2)` is the batched +/// equivalent. 
Opt-in via `PUNKTFUNK_RECVMSG_X` (it's FFI we can't exercise off-Apple — the scalar +/// recv-loop is the tested default), with auto-fallback if the syscall ever errors unexpectedly. +#[cfg(target_vendor = "apple")] +mod recvx { + use std::sync::atomic::{AtomicU8, Ordering}; + static STATE: AtomicU8 = AtomicU8::new(0); // 0 = uninit, 1 = on, 2 = off + + pub fn active() -> bool { + match STATE.load(Ordering::Relaxed) { + 1 => true, + 2 => false, + _ => { + let on = std::env::var_os("PUNKTFUNK_RECVMSG_X").is_some(); + STATE.store(if on { 1 } else { 2 }, Ordering::Relaxed); + on + } + } + } + pub fn disable() { + STATE.store(2, Ordering::Relaxed); + } +} + +/// `struct msghdr_x` from Darwin's socket header (presumably `<sys/socket.h>` — confirm; the batched-I/O variant — not in the `libc` crate). +#[cfg(target_vendor = "apple")] +#[repr(C)] +struct MsghdrX { + msg_name: *mut libc::c_void, + msg_namelen: libc::socklen_t, + msg_iov: *mut libc::iovec, + msg_iovlen: libc::c_int, + msg_control: *mut libc::c_void, + msg_controllen: libc::socklen_t, + msg_flags: libc::c_int, + msg_datalen: libc::size_t, +} + +#[cfg(target_vendor = "apple")] +extern "C" { + /// Darwin batched receive: up to `cnt` datagrams in one syscall; returns the count received and + /// sets each `msg_datalen` to its byte length. Present in libSystem on all macOS/iOS. + fn recvmsg_x( + s: libc::c_int, + msgp: *mut MsghdrX, + cnt: libc::c_uint, + flags: libc::c_int, + ) -> libc::ssize_t; +} + pub struct UdpTransport { socket: UdpSocket, } @@ -162,6 +213,55 @@ impl UdpTransport { ); } } + + /// Apple batched receive via `recvmsg_x` — drains up to `out.len()` datagrams in one syscall into + /// the caller's reused buffers (the recv counterpart of Linux `recvmmsg`, which Darwin lacks). + /// SAFETY: each `MsghdrX` holds a raw pointer into `iovs`, which holds raw pointers into `out`'s + /// buffers; both `iovs` and `msgs` stay alive and unmoved through the syscall. 
+ #[cfg(target_vendor = "apple")] + fn recv_batch_x(&self, out: &mut [Vec<u8>], lens: &mut [usize]) -> std::io::Result<usize> { + use std::os::fd::AsRawFd; + let fd = self.socket.as_raw_fd(); + let n_bufs = out.len().min(lens.len()); + if n_bufs == 0 { + return Ok(0); + } + let mut iovs: Vec<libc::iovec> = out[..n_bufs] + .iter_mut() + .map(|b| libc::iovec { + iov_base: b.as_mut_ptr() as *mut libc::c_void, + iov_len: b.len(), + }) + .collect(); + let mut msgs: Vec<MsghdrX> = iovs + .iter_mut() + .map(|iov| { + let mut m: MsghdrX = unsafe { std::mem::zeroed() }; + m.msg_iov = iov as *mut libc::iovec; + m.msg_iovlen = 1; + m + }) + .collect(); + let n = unsafe { + recvmsg_x( + fd, + msgs.as_mut_ptr(), + n_bufs as libc::c_uint, + libc::MSG_DONTWAIT, + ) + }; + if n < 0 { + let err = std::io::Error::last_os_error(); + if err.kind() == std::io::ErrorKind::WouldBlock { + return Ok(0); + } + return Err(err); + } + for (i, m) in msgs[..n as usize].iter().enumerate() { + lens[i] = m.msg_datalen; + } + Ok(n as usize) + } } impl Transport for UdpTransport { @@ -337,6 +437,15 @@ impl Transport for UdpTransport { /// `lens[i] == buf.len()` and the caller (`poll_frame`) drops them — same contract as `recvmmsg`. #[cfg(all(unix, not(target_os = "linux")))] fn recv_batch(&self, out: &mut [Vec<u8>], lens: &mut [usize]) -> std::io::Result<usize> { + // Apple: prefer the batched `recvmsg_x` syscall when enabled; a surprise error disables it + // and falls through to the always-correct scalar loop below. + #[cfg(target_vendor = "apple")] + if recvx::active() { + match self.recv_batch_x(out, lens) { + Ok(n) => return Ok(n), + Err(_) => recvx::disable(), + } + } use std::os::fd::AsRawFd; let fd = self.socket.as_raw_fd(); let n_bufs = out.len().min(lens.len());