perf(core): recvmsg_x batched receive on Apple (macOS client)
apple / swift (push) Failing after 1m2s
ci / rust (push) Failing after 1m11s
ci / web (push) Failing after 39s
ci / docs-site (push) Failing after 41s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 6s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 5s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 4s
deb / build-publish (push) Successful in 3m5s
docker / deploy-docs (push) Successful in 17s
rpm / build-publish (push) Successful in 4m30s

macOS/iOS have no recvmmsg(2), so the Mac client did one recv() syscall per
packet (non-allocating after the earlier fix, but still a syscall each — a
single-core wall at line rate that Moonlight avoids). Add the Darwin recvmsg_x(2)
batched-receive path (the recv counterpart of Linux recvmmsg): one syscall drains
up to RECV_BATCH datagrams into the reused ring. struct msghdr_x + the extern
aren't in the libc crate, so declared here (cfg target_vendor=apple).

Opt-in via PUNKTFUNK_RECVMSG_X (it's FFI we can't exercise off-Apple) with
auto-fallback to the tested scalar recv-loop on any unexpected error. Linux
recvmmsg + the non-Apple scalar loop are unchanged; apple.yml compiles the path.

Re GRO: Linux recv already batches via recvmmsg (32/syscall), so UDP GRO is only a
marginal add there and needs a recv-path redesign to split coalesced buffers —
deferred as low-ROI vs the Mac, which had no batching at all.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-12 23:52:39 +00:00
parent 9c86f667ca
commit 2ebffe3457
+109
View File
@@ -108,6 +108,57 @@ fn send_one_gso(fd: libc::c_int, buf: &[u8], gso_size: u16) -> std::io::Result<(
Ok(()) Ok(())
} }
/// Apple (macOS/iOS) batched-receive enable state. Darwin has no `recvmmsg(2)`, so our macOS client
/// does one `recv` per packet (non-allocating, but a syscall each); `recvmsg_x(2)` is the batched
/// equivalent. Opt-in via `PUNKTFUNK_RECVMSG_X` (it's FFI we can't exercise off-Apple — the scalar
/// recv-loop is the tested default), with auto-fallback if the syscall ever errors unexpectedly.
#[cfg(target_vendor = "apple")]
mod recvx {
use std::sync::atomic::{AtomicU8, Ordering};
static STATE: AtomicU8 = AtomicU8::new(0); // 0 = uninit, 1 = on, 2 = off
pub fn active() -> bool {
match STATE.load(Ordering::Relaxed) {
1 => true,
2 => false,
_ => {
let on = std::env::var_os("PUNKTFUNK_RECVMSG_X").is_some();
STATE.store(if on { 1 } else { 2 }, Ordering::Relaxed);
on
}
}
}
pub fn disable() {
STATE.store(2, Ordering::Relaxed);
}
}
/// `struct msghdr_x` from Darwin `<sys/socket.h>` (the batched-I/O variant — not in the `libc` crate).
#[cfg(target_vendor = "apple")]
#[repr(C)]
struct MsghdrX {
msg_name: *mut libc::c_void,
msg_namelen: libc::socklen_t,
msg_iov: *mut libc::iovec,
msg_iovlen: libc::c_int,
msg_control: *mut libc::c_void,
msg_controllen: libc::socklen_t,
msg_flags: libc::c_int,
msg_datalen: libc::size_t,
}
#[cfg(target_vendor = "apple")]
extern "C" {
/// Darwin batched receive: up to `cnt` datagrams in one syscall; returns the count received and
/// sets each `msg_datalen` to its byte length. Present in libSystem on all macOS/iOS.
fn recvmsg_x(
s: libc::c_int,
msgp: *mut MsghdrX,
cnt: libc::c_uint,
flags: libc::c_int,
) -> libc::ssize_t;
}
pub struct UdpTransport { pub struct UdpTransport {
socket: UdpSocket, socket: UdpSocket,
} }
@@ -162,6 +213,55 @@ impl UdpTransport {
); );
} }
} }
/// Apple batched receive via `recvmsg_x` — drains up to `out.len()` datagrams in one syscall into
/// the caller's reused buffers (the recv counterpart of Linux `recvmmsg`, which Darwin lacks).
/// SAFETY: each `MsghdrX` holds a raw pointer into `iovs`, which holds raw pointers into `out`'s
/// buffers; both `iovs` and `msgs` stay alive and unmoved through the syscall.
#[cfg(target_vendor = "apple")]
fn recv_batch_x(&self, out: &mut [Vec<u8>], lens: &mut [usize]) -> std::io::Result<usize> {
use std::os::fd::AsRawFd;
let fd = self.socket.as_raw_fd();
let n_bufs = out.len().min(lens.len());
if n_bufs == 0 {
return Ok(0);
}
let mut iovs: Vec<libc::iovec> = out[..n_bufs]
.iter_mut()
.map(|b| libc::iovec {
iov_base: b.as_mut_ptr() as *mut libc::c_void,
iov_len: b.len(),
})
.collect();
let mut msgs: Vec<MsghdrX> = iovs
.iter_mut()
.map(|iov| {
let mut m: MsghdrX = unsafe { std::mem::zeroed() };
m.msg_iov = iov as *mut libc::iovec;
m.msg_iovlen = 1;
m
})
.collect();
let n = unsafe {
recvmsg_x(
fd,
msgs.as_mut_ptr(),
n_bufs as libc::c_uint,
libc::MSG_DONTWAIT,
)
};
if n < 0 {
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::WouldBlock {
return Ok(0);
}
return Err(err);
}
for (i, m) in msgs[..n as usize].iter().enumerate() {
lens[i] = m.msg_datalen;
}
Ok(n as usize)
}
} }
impl Transport for UdpTransport { impl Transport for UdpTransport {
@@ -337,6 +437,15 @@ impl Transport for UdpTransport {
/// `lens[i] == buf.len()` and the caller (`poll_frame`) drops them — same contract as `recvmmsg`. /// `lens[i] == buf.len()` and the caller (`poll_frame`) drops them — same contract as `recvmmsg`.
#[cfg(all(unix, not(target_os = "linux")))] #[cfg(all(unix, not(target_os = "linux")))]
fn recv_batch(&self, out: &mut [Vec<u8>], lens: &mut [usize]) -> std::io::Result<usize> { fn recv_batch(&self, out: &mut [Vec<u8>], lens: &mut [usize]) -> std::io::Result<usize> {
// Apple: prefer the batched `recvmsg_x` syscall when enabled; a surprise error disables it
// and falls through to the always-correct scalar loop below.
#[cfg(target_vendor = "apple")]
if recvx::active() {
match self.recv_batch_x(out, lens) {
Ok(n) => return Ok(n),
Err(_) => recvx::disable(),
}
}
use std::os::fd::AsRawFd; use std::os::fd::AsRawFd;
let fd = self.socket.as_raw_fd(); let fd = self.socket.as_raw_fd();
let n_bufs = out.len().min(lens.len()); let n_bufs = out.len().min(lens.len());