diff --git a/crates/punktfunk-core/src/transport/udp.rs b/crates/punktfunk-core/src/transport/udp.rs index cb9c4f1..de4f188 100644 --- a/crates/punktfunk-core/src/transport/udp.rs +++ b/crates/punktfunk-core/src/transport/udp.rs @@ -165,7 +165,8 @@ impl Transport for UdpTransport { /// caller's reused buffers (no per-packet allocation). `MSG_DONTWAIT` keeps it non-blocking /// (the socket already is); `EAGAIN` → `0`. A datagram larger than a buffer is truncated and /// `lens[i]` reaches the buffer size — the reassembler then rejects it as malformed, matching - /// `recv`'s oversized-drop. Non-Linux falls back to the trait's scalar `recv` default. + /// `recv`'s oversized-drop. Apple/BSD use the `recv`-loop override below; other non-unix the + /// trait's scalar default. #[cfg(target_os = "linux")] fn recv_batch(&self, out: &mut [Vec], lens: &mut [usize]) -> std::io::Result { use std::os::fd::AsRawFd; @@ -204,6 +205,46 @@ impl Transport for UdpTransport { } Ok(n as usize) } + + /// Batched receive for Apple/BSD targets, which have no `recvmmsg(2)`. Drains up to `out.len()` + /// datagrams per call with `libc::recv(MSG_DONTWAIT)` straight into the caller's reused `out[i]` + /// buffers — eliminating the per-packet 2 KB `vec!` allocation (and its zeroing + a copy) that + /// the scalar `recv` + trait-default `recv_batch` incur. THIS is the macOS-client throughput + /// fix: at line rate the alloc/free churn — not the syscall — was the single-core wall (Moonlight + /// batches; our client per-packet-allocated). It is still one syscall per datagram (a future + /// `recvmsg_x` batch would cut that too); `EAGAIN` ends the drain. Oversized datagrams set + /// `lens[i] == buf.len()` and the caller (`poll_frame`) drops them — same contract as `recvmmsg`. + #[cfg(all(unix, not(target_os = "linux")))] + fn recv_batch(&self, out: &mut [Vec], lens: &mut [usize]) -> std::io::Result { + use std::os::fd::AsRawFd; + let fd = self.socket.as_raw_fd(); + let n_bufs = out.len().min(lens.len()); + let mut got = 0usize; + while got < n_bufs { + let buf = &mut out[got]; + let r = unsafe { + libc::recv( + fd, + buf.as_mut_ptr() as *mut libc::c_void, + buf.len(), + libc::MSG_DONTWAIT, + ) + }; + if r < 0 { + let err = std::io::Error::last_os_error(); + if err.kind() == std::io::ErrorKind::WouldBlock { + break; // socket drained + } + if got > 0 { + break; // report what we have; surface the error on the next empty poll + } + return Err(err); + } + lens[got] = r as usize; + got += 1; + } + Ok(got) + } } #[cfg(test)]