feat(1gbps): batched send via sendmmsg (Transport::send_batch)
ci / rust (push) Has been cancelled

First increment of the 1 Gbps send-path rework (the measured bottleneck): the
native data plane did one send() syscall per packet — at ~125k pkt/s (1 Gbps
wire) that burns a core on syscalls. Port the proven GameStream sendmmsg path
into the core Transport seam.

- Transport gains `send_batch(&[&[u8]]) -> usize` (count handed to the kernel;
  caller counts the rest as send-buffer drops). Default = the scalar send loop
  (loopback transport + non-Linux).
- UdpTransport overrides it on Linux with `sendmmsg` (64 datagrams/syscall);
  the connected socket needs no per-message address. Non-blocking-aware: a full
  send buffer yields a short count / EAGAIN, and we stop + report what went out
  rather than block or retry (same lossy, FEC-protected contract as send()).
- Session::submit_frame seals every shard then hands the whole frame to
  send_batch in ONE call instead of looping send() — ~64x fewer syscalls per
  frame on the native + GameStream-over-core paths; send_dropped accounting
  preserved (total - sent).

~125k → ~2k syscalls/sec at 1 Gbps line rate. Verified: new loopback-UDP test
send_batch_delivers_over_loopback (100 batched packets arrive intact, datagram
boundaries preserved); full core suite + clippy + fmt green.

Next increments: a paced send thread (microburst shaping so a real NIC doesn't
drop line-rate bursts) and recvmmsg on the client.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-11 21:55:22 +00:00
parent e1242546f2
commit c24b571e37
4 changed files with 144 additions and 10 deletions
@@ -15,5 +15,23 @@ pub trait Transport: Send + Sync {
/// recovers, surfaced so the caller can count it (`packets_send_dropped`) instead of it being
/// invisible. `Err` = a real send failure.
fn send(&self, packet: &[u8]) -> std::io::Result<bool>;
/// Send a whole frame's packets in as few syscalls as possible, returning how many were
/// handed to the kernel (the caller counts `packets.len() - sent` as send-buffer drops). This
/// is the 1 Gbps+ lever: the [`UdpTransport`](super::UdpTransport) override uses `sendmmsg`
/// (~64 packets/syscall) instead of one `send` each — at ~125k pkt/s that is the difference
/// between ~2k and ~125k syscalls/sec. The default is the scalar `send` loop (correct for the
/// loopback transport and non-Linux builds). On a full send buffer it stops early and reports
/// the partial count rather than blocking — same lossy, FEC-protected contract as `send`.
fn send_batch(&self, packets: &[&[u8]]) -> std::io::Result<usize> {
let mut sent = 0;
for p in packets {
if self.send(p)? {
sent += 1;
}
}
Ok(sent)
}
fn recv(&self) -> std::io::Result<Option<Vec<u8>>>;
}
+105 -3
View File
@@ -1,8 +1,9 @@
//! Real UDP datagram transport — native sockets, no async runtime.
//!
//! M1 uses one `recv` syscall per packet; the latency budget (§7) calls for
//! `sendmmsg`/UDP-GSO batching to cut syscalls, which is a P2 optimization layered on
//! this same [`Transport`] seam.
//! Send is batched via `sendmmsg` ([`Transport::send_batch`], up to 64 datagrams/syscall) — the
//! 1 Gbps+ syscall lever (~125k → ~2k syscalls/sec at line rate). Recv is still one syscall per
//! packet; a `recvmmsg` batch on the client + a paced send thread on the host are the remaining
//! steps of the 1 Gbps data-plane work, layered on this same [`Transport`] seam.
use super::Transport;
use crate::packet::MAX_DATAGRAM_BYTES;
@@ -81,6 +82,54 @@ impl Transport for UdpTransport {
}
}
/// Batched send via `sendmmsg` (up to 64 datagrams per syscall) — the connected socket needs
/// no per-message address. The socket is non-blocking, so a full send buffer surfaces as a
/// short count (or `EAGAIN` with nothing sent); we stop and report what went out rather than
/// block or retry — the data plane is lossy + FEC-protected, and blocking would queue stale
/// frames + add latency. Ports the proven GameStream `sendmmsg_all`. Non-Linux falls back to
/// the trait's scalar `send` loop (no `sendmmsg`).
#[cfg(target_os = "linux")]
fn send_batch(&self, packets: &[&[u8]]) -> std::io::Result<usize> {
use std::os::fd::AsRawFd;
const CHUNK: usize = 64;
let fd = self.socket.as_raw_fd();
let mut total_sent = 0usize;
for chunk in packets.chunks(CHUNK) {
// `hdrs` borrow `iovs` by raw pointer; both stay alive through the `sendmmsg` call.
let mut iovs: Vec<libc::iovec> = chunk
.iter()
.map(|p| libc::iovec {
iov_base: p.as_ptr() as *mut libc::c_void,
iov_len: p.len(),
})
.collect();
let mut hdrs: Vec<libc::mmsghdr> = iovs
.iter_mut()
.map(|iov| {
let mut h: libc::mmsghdr = unsafe { std::mem::zeroed() };
h.msg_hdr.msg_iov = iov;
h.msg_hdr.msg_iovlen = 1;
h
})
.collect();
let n = unsafe { libc::sendmmsg(fd, hdrs.as_mut_ptr(), hdrs.len() as libc::c_uint, 0) };
if n < 0 {
let err = std::io::Error::last_os_error();
// Nothing fit in the send buffer — drop this + the remaining chunks (counted by
// the caller). A real error (not WouldBlock) still tears the session down.
if err.kind() == std::io::ErrorKind::WouldBlock {
break;
}
return Err(err);
}
total_sent += n as usize;
if (n as usize) < chunk.len() {
break; // buffer filled mid-chunk — drop the remainder
}
}
Ok(total_sent)
}
fn recv(&self) -> std::io::Result<Option<Vec<u8>>> {
let mut buf = vec![0u8; RECV_BUF];
match self.socket.recv(&mut buf) {
@@ -96,3 +145,56 @@ impl Transport for UdpTransport {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::transport::Transport;
/// `send_batch` delivers a whole frame's worth of packets over real loopback UDP — exercising
/// the `sendmmsg` path on Linux (the scalar-loop default elsewhere). 100 × 200 B = 20 KB fits
/// the socket buffer, so loopback is lossless and every packet must arrive intact + in order.
#[test]
fn send_batch_delivers_over_loopback() {
let rx = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
rx.set_read_timeout(Some(std::time::Duration::from_millis(500)))
.unwrap();
let rx_addr = rx.local_addr().unwrap().to_string();
let tx = UdpTransport::connect("127.0.0.1:0", &rx_addr).unwrap();
const N: u32 = 100;
let payloads: Vec<Vec<u8>> = (0..N)
.map(|i| {
let mut v = vec![0u8; 200];
v[0..4].copy_from_slice(&i.to_le_bytes());
v
})
.collect();
let refs: Vec<&[u8]> = payloads.iter().map(|p| p.as_slice()).collect();
let sent = tx.send_batch(&refs).unwrap();
assert_eq!(
sent, N as usize,
"send_batch should hand all packets to the kernel"
);
let mut seen = std::collections::HashSet::new();
let mut buf = [0u8; 2048];
while seen.len() < N as usize {
match rx.recv(&mut buf) {
Ok(n) => {
assert_eq!(
n, 200,
"datagram boundaries preserved (one packet per recv)"
);
seen.insert(u32::from_le_bytes(buf[0..4].try_into().unwrap()));
}
Err(_) => break, // read timeout — stop and let the assert report the shortfall
}
}
assert_eq!(
seen.len(),
N as usize,
"every batched packet should arrive over loopback"
);
}
}