First increment of the 1 Gbps send-path rework (the measured bottleneck): the native data plane did one send() syscall per packet — at ~125k pkt/s (1 Gbps wire) that burns a core on syscalls. Port the proven GameStream sendmmsg path into the core Transport seam. - Transport gains `send_batch(&[&[u8]]) -> usize` (count handed to the kernel; caller counts the rest as send-buffer drops). Default = the scalar send loop (loopback transport + non-Linux). - UdpTransport overrides it on Linux with `sendmmsg` (64 datagrams/syscall); the connected socket needs no per-message address. Non-blocking-aware: a full send buffer yields a short count / EAGAIN, and we stop + report what went out rather than block or retry (same lossy, FEC-protected contract as send()). - Session::submit_frame seals every shard then hands the whole frame to send_batch in ONE call instead of looping send() — ~64x fewer syscalls per frame on the native + GameStream-over-core paths; send_dropped accounting preserved (total - sent). ~125k → ~2k syscalls/sec at 1 Gbps line rate. Verified: new loopback-UDP test send_batch_delivers_over_loopback (100 batched packets arrive intact, datagram boundaries preserved); full core suite + clippy + fmt green. Next increments: a paced send thread (microburst shaping so a real NIC doesn't drop line-rate bursts) and recvmmsg on the client. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -46,6 +46,11 @@ hmac = { version = "0.12", optional = true }
|
||||
spake2 = { version = "0.4", optional = true }
|
||||
tokio = { version = "1", optional = true, features = ["rt-multi-thread", "net", "sync", "macros"] }
|
||||
|
||||
# `sendmmsg` batched UDP send (the 1 Gbps+ syscall lever) — Linux only; other targets use the
|
||||
# scalar `send` loop fallback.
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
libc = "0.2"
|
||||
|
||||
[dev-dependencies]
|
||||
proptest = "1"
|
||||
|
||||
|
||||
@@ -118,13 +118,22 @@ impl Session {
|
||||
.packetizer
|
||||
.packetize(data, pts_ns, user_flags, self.coder.as_ref())?;
|
||||
StatsCounters::add(&self.stats.frames_submitted, 1);
|
||||
for pkt in packets {
|
||||
let wire = self.seal_for_wire(&pkt)?;
|
||||
StatsCounters::add(&self.stats.packets_sent, 1);
|
||||
StatsCounters::add(&self.stats.bytes_sent, wire.len() as u64);
|
||||
if !self.transport.send(&wire)? {
|
||||
StatsCounters::add(&self.stats.packets_send_dropped, 1);
|
||||
}
|
||||
// Seal every shard (the nonce counter advances per packet, in order), then hand the whole
|
||||
// frame to the transport in ONE batched call — `sendmmsg` does ~64 packets/syscall instead
|
||||
// of a `send` each, the dominant 1 Gbps+ lever. (Sealing must finish before the immutable
|
||||
// `send_batch` borrow; collecting the wires also keeps them alive for the batch's iovecs.)
|
||||
let mut wires: Vec<Vec<u8>> = Vec::with_capacity(packets.len());
|
||||
for pkt in &packets {
|
||||
wires.push(self.seal_for_wire(pkt)?);
|
||||
}
|
||||
let total = wires.len();
|
||||
let bytes: u64 = wires.iter().map(|w| w.len() as u64).sum();
|
||||
StatsCounters::add(&self.stats.packets_sent, total as u64);
|
||||
StatsCounters::add(&self.stats.bytes_sent, bytes);
|
||||
let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect();
|
||||
let sent = self.transport.send_batch(&refs)?;
|
||||
if sent < total {
|
||||
StatsCounters::add(&self.stats.packets_send_dropped, (total - sent) as u64);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -15,5 +15,23 @@ pub trait Transport: Send + Sync {
|
||||
/// recovers, surfaced so the caller can count it (`packets_send_dropped`) instead of it being
|
||||
/// invisible. `Err` = a real send failure.
|
||||
fn send(&self, packet: &[u8]) -> std::io::Result<bool>;
|
||||
|
||||
/// Send a whole frame's packets in as few syscalls as possible, returning how many were
|
||||
/// handed to the kernel (the caller counts `packets.len() - sent` as send-buffer drops). This
|
||||
/// is the 1 Gbps+ lever: the [`UdpTransport`](super::UdpTransport) override uses `sendmmsg`
|
||||
/// (~64 packets/syscall) instead of one `send` each — at ~125k pkt/s that is the difference
|
||||
/// between ~2k and ~125k syscalls/sec. The default is the scalar `send` loop (correct for the
|
||||
/// loopback transport and non-Linux builds). On a full send buffer it stops early and reports
|
||||
/// the partial count rather than blocking — same lossy, FEC-protected contract as `send`.
|
||||
fn send_batch(&self, packets: &[&[u8]]) -> std::io::Result<usize> {
|
||||
let mut sent = 0;
|
||||
for p in packets {
|
||||
if self.send(p)? {
|
||||
sent += 1;
|
||||
}
|
||||
}
|
||||
Ok(sent)
|
||||
}
|
||||
|
||||
fn recv(&self) -> std::io::Result<Option<Vec<u8>>>;
|
||||
}
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
//! Real UDP datagram transport — native sockets, no async runtime.
|
||||
//!
|
||||
//! M1 uses one `recv` syscall per packet; the latency budget (§7) calls for
|
||||
//! `sendmmsg`/UDP-GSO batching to cut syscalls, which is a P2 optimization layered on
|
||||
//! this same [`Transport`] seam.
|
||||
//! Send is batched via `sendmmsg` ([`Transport::send_batch`], up to 64 datagrams/syscall) — the
|
||||
//! 1 Gbps+ syscall lever (~125k → ~2k syscalls/sec at line rate). Recv is still one syscall per
|
||||
//! packet; a `recvmmsg` batch on the client + a paced send thread on the host are the remaining
|
||||
//! steps of the 1 Gbps data-plane work, layered on this same [`Transport`] seam.
|
||||
|
||||
use super::Transport;
|
||||
use crate::packet::MAX_DATAGRAM_BYTES;
|
||||
@@ -81,6 +82,54 @@ impl Transport for UdpTransport {
|
||||
}
|
||||
}
|
||||
|
||||
/// Batched send via `sendmmsg` (up to 64 datagrams per syscall) — the connected socket needs
|
||||
/// no per-message address. The socket is non-blocking, so a full send buffer surfaces as a
|
||||
/// short count (or `EAGAIN` with nothing sent); we stop and report what went out rather than
|
||||
/// block or retry — the data plane is lossy + FEC-protected, and blocking would queue stale
|
||||
/// frames + add latency. Ports the proven GameStream `sendmmsg_all`. Non-Linux falls back to
|
||||
/// the trait's scalar `send` loop (no `sendmmsg`).
|
||||
#[cfg(target_os = "linux")]
|
||||
fn send_batch(&self, packets: &[&[u8]]) -> std::io::Result<usize> {
|
||||
use std::os::fd::AsRawFd;
|
||||
const CHUNK: usize = 64;
|
||||
let fd = self.socket.as_raw_fd();
|
||||
let mut total_sent = 0usize;
|
||||
for chunk in packets.chunks(CHUNK) {
|
||||
// `hdrs` borrow `iovs` by raw pointer; both stay alive through the `sendmmsg` call.
|
||||
let mut iovs: Vec<libc::iovec> = chunk
|
||||
.iter()
|
||||
.map(|p| libc::iovec {
|
||||
iov_base: p.as_ptr() as *mut libc::c_void,
|
||||
iov_len: p.len(),
|
||||
})
|
||||
.collect();
|
||||
let mut hdrs: Vec<libc::mmsghdr> = iovs
|
||||
.iter_mut()
|
||||
.map(|iov| {
|
||||
let mut h: libc::mmsghdr = unsafe { std::mem::zeroed() };
|
||||
h.msg_hdr.msg_iov = iov;
|
||||
h.msg_hdr.msg_iovlen = 1;
|
||||
h
|
||||
})
|
||||
.collect();
|
||||
let n = unsafe { libc::sendmmsg(fd, hdrs.as_mut_ptr(), hdrs.len() as libc::c_uint, 0) };
|
||||
if n < 0 {
|
||||
let err = std::io::Error::last_os_error();
|
||||
// Nothing fit in the send buffer — drop this + the remaining chunks (counted by
|
||||
// the caller). A real error (not WouldBlock) still tears the session down.
|
||||
if err.kind() == std::io::ErrorKind::WouldBlock {
|
||||
break;
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
total_sent += n as usize;
|
||||
if (n as usize) < chunk.len() {
|
||||
break; // buffer filled mid-chunk — drop the remainder
|
||||
}
|
||||
}
|
||||
Ok(total_sent)
|
||||
}
|
||||
|
||||
fn recv(&self) -> std::io::Result<Option<Vec<u8>>> {
|
||||
let mut buf = vec![0u8; RECV_BUF];
|
||||
match self.socket.recv(&mut buf) {
|
||||
@@ -96,3 +145,56 @@ impl Transport for UdpTransport {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::transport::Transport;
|
||||
|
||||
/// `send_batch` delivers a whole frame's worth of packets over real loopback UDP — exercising
|
||||
/// the `sendmmsg` path on Linux (the scalar-loop default elsewhere). 100 × 200 B = 20 KB fits
|
||||
/// the socket buffer, so loopback is lossless and every packet must arrive intact + in order.
|
||||
#[test]
|
||||
fn send_batch_delivers_over_loopback() {
|
||||
let rx = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
rx.set_read_timeout(Some(std::time::Duration::from_millis(500)))
|
||||
.unwrap();
|
||||
let rx_addr = rx.local_addr().unwrap().to_string();
|
||||
let tx = UdpTransport::connect("127.0.0.1:0", &rx_addr).unwrap();
|
||||
|
||||
const N: u32 = 100;
|
||||
let payloads: Vec<Vec<u8>> = (0..N)
|
||||
.map(|i| {
|
||||
let mut v = vec![0u8; 200];
|
||||
v[0..4].copy_from_slice(&i.to_le_bytes());
|
||||
v
|
||||
})
|
||||
.collect();
|
||||
let refs: Vec<&[u8]> = payloads.iter().map(|p| p.as_slice()).collect();
|
||||
let sent = tx.send_batch(&refs).unwrap();
|
||||
assert_eq!(
|
||||
sent, N as usize,
|
||||
"send_batch should hand all packets to the kernel"
|
||||
);
|
||||
|
||||
let mut seen = std::collections::HashSet::new();
|
||||
let mut buf = [0u8; 2048];
|
||||
while seen.len() < N as usize {
|
||||
match rx.recv(&mut buf) {
|
||||
Ok(n) => {
|
||||
assert_eq!(
|
||||
n, 200,
|
||||
"datagram boundaries preserved (one packet per recv)"
|
||||
);
|
||||
seen.insert(u32::from_le_bytes(buf[0..4].try_into().unwrap()));
|
||||
}
|
||||
Err(_) => break, // read timeout — stop and let the assert report the shortfall
|
||||
}
|
||||
}
|
||||
assert_eq!(
|
||||
seen.len(),
|
||||
N as usize,
|
||||
"every batched packet should arrive over loopback"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user