First increment of the 1 Gbps send-path rework (the measured bottleneck): the native data plane did one send() syscall per packet — at ~125k pkt/s (1 Gbps wire) that burns a core on syscalls. Port the proven GameStream sendmmsg path into the core Transport seam.

- Transport gains `send_batch(&[&[u8]]) -> std::io::Result<usize>` (count handed to the kernel; caller counts the rest as send-buffer drops). Default = the scalar send loop (loopback transport + non-Linux).
- UdpTransport overrides it on Linux with `sendmmsg` (64 datagrams/syscall); the connected socket needs no per-message address. Non-blocking-aware: a full send buffer yields a short count / EAGAIN, and we stop + report what went out rather than block or retry (same lossy, FEC-protected contract as send()).
- Session::submit_frame seals every shard then hands the whole frame to send_batch in ONE call instead of looping send() — ~64x fewer syscalls per frame on the native + GameStream-over-core paths; send_dropped accounting preserved (total - sent).

~125k → ~2k syscalls/sec at 1 Gbps line rate.

Verified: new loopback-UDP test send_batch_delivers_over_loopback (100 batched packets arrive intact, datagram boundaries preserved); full core suite + clippy + fmt green.

Next increments: a paced send thread (microburst shaping so a real NIC doesn't drop line-rate bursts) and recvmmsg on the client.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -46,6 +46,11 @@ hmac = { version = "0.12", optional = true }
|
|||||||
spake2 = { version = "0.4", optional = true }
|
spake2 = { version = "0.4", optional = true }
|
||||||
tokio = { version = "1", optional = true, features = ["rt-multi-thread", "net", "sync", "macros"] }
|
tokio = { version = "1", optional = true, features = ["rt-multi-thread", "net", "sync", "macros"] }
|
||||||
|
|
||||||
|
# `sendmmsg` batched UDP send (the 1 Gbps+ syscall lever) — Linux only; other targets use the
|
||||||
|
# scalar `send` loop fallback.
|
||||||
|
[target.'cfg(target_os = "linux")'.dependencies]
|
||||||
|
libc = "0.2"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
proptest = "1"
|
proptest = "1"
|
||||||
|
|
||||||
|
|||||||
@@ -118,13 +118,22 @@ impl Session {
|
|||||||
.packetizer
|
.packetizer
|
||||||
.packetize(data, pts_ns, user_flags, self.coder.as_ref())?;
|
.packetize(data, pts_ns, user_flags, self.coder.as_ref())?;
|
||||||
StatsCounters::add(&self.stats.frames_submitted, 1);
|
StatsCounters::add(&self.stats.frames_submitted, 1);
|
||||||
for pkt in packets {
|
// Seal every shard (the nonce counter advances per packet, in order), then hand the whole
|
||||||
let wire = self.seal_for_wire(&pkt)?;
|
// frame to the transport in ONE batched call — `sendmmsg` does ~64 packets/syscall instead
|
||||||
StatsCounters::add(&self.stats.packets_sent, 1);
|
// of a `send` each, the dominant 1 Gbps+ lever. (Sealing must finish before the immutable
|
||||||
StatsCounters::add(&self.stats.bytes_sent, wire.len() as u64);
|
// `send_batch` borrow; collecting the wires also keeps them alive for the batch's iovecs.)
|
||||||
if !self.transport.send(&wire)? {
|
let mut wires: Vec<Vec<u8>> = Vec::with_capacity(packets.len());
|
||||||
StatsCounters::add(&self.stats.packets_send_dropped, 1);
|
for pkt in &packets {
|
||||||
}
|
wires.push(self.seal_for_wire(pkt)?);
|
||||||
|
}
|
||||||
|
let total = wires.len();
|
||||||
|
let bytes: u64 = wires.iter().map(|w| w.len() as u64).sum();
|
||||||
|
StatsCounters::add(&self.stats.packets_sent, total as u64);
|
||||||
|
StatsCounters::add(&self.stats.bytes_sent, bytes);
|
||||||
|
let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect();
|
||||||
|
let sent = self.transport.send_batch(&refs)?;
|
||||||
|
if sent < total {
|
||||||
|
StatsCounters::add(&self.stats.packets_send_dropped, (total - sent) as u64);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,5 +15,23 @@ pub trait Transport: Send + Sync {
|
|||||||
/// recovers, surfaced so the caller can count it (`packets_send_dropped`) instead of it being
|
/// recovers, surfaced so the caller can count it (`packets_send_dropped`) instead of it being
|
||||||
/// invisible. `Err` = a real send failure.
|
/// invisible. `Err` = a real send failure.
|
||||||
fn send(&self, packet: &[u8]) -> std::io::Result<bool>;
|
fn send(&self, packet: &[u8]) -> std::io::Result<bool>;
|
||||||
|
|
||||||
|
/// Send a whole frame's packets in as few syscalls as possible, returning how many were
|
||||||
|
/// handed to the kernel (the caller counts `packets.len() - sent` as send-buffer drops). This
|
||||||
|
/// is the 1 Gbps+ lever: the [`UdpTransport`](super::UdpTransport) override uses `sendmmsg`
|
||||||
|
/// (~64 packets/syscall) instead of one `send` each — at ~125k pkt/s that is the difference
|
||||||
|
/// between ~2k and ~125k syscalls/sec. The default is the scalar `send` loop (correct for the
|
||||||
|
/// loopback transport and non-Linux builds): a packet that `send` reports as dropped is simply
|
||||||
|
/// not counted and the loop moves on to the next one — same lossy, FEC-protected contract as
/// `send`. An `Err` from `send` aborts the batch at once; the count accumulated so far is lost.
|
||||||
|
fn send_batch(&self, packets: &[&[u8]]) -> std::io::Result<usize> {
|
||||||
|
let mut sent = 0; // packets for which `send` returned `Ok(true)`
|
||||||
|
for p in packets {
|
||||||
|
if self.send(p)? {
|
||||||
|
sent += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(sent)
|
||||||
|
}
|
||||||
|
|
||||||
fn recv(&self) -> std::io::Result<Option<Vec<u8>>>;
|
fn recv(&self) -> std::io::Result<Option<Vec<u8>>>;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
//! Real UDP datagram transport — native sockets, no async runtime.
|
//! Real UDP datagram transport — native sockets, no async runtime.
|
||||||
//!
|
//!
|
||||||
//! M1 uses one `recv` syscall per packet; the latency budget (§7) calls for
|
//! Send is batched via `sendmmsg` ([`Transport::send_batch`], up to 64 datagrams/syscall) — the
|
||||||
//! `sendmmsg`/UDP-GSO batching to cut syscalls, which is a P2 optimization layered on
|
//! 1 Gbps+ syscall lever (~125k → ~2k syscalls/sec at line rate). Recv is still one syscall per
|
||||||
//! this same [`Transport`] seam.
|
//! packet; a `recvmmsg` batch on the client + a paced send thread on the host are the remaining
|
||||||
|
//! steps of the 1 Gbps data-plane work, layered on this same [`Transport`] seam.
|
||||||
|
|
||||||
use super::Transport;
|
use super::Transport;
|
||||||
use crate::packet::MAX_DATAGRAM_BYTES;
|
use crate::packet::MAX_DATAGRAM_BYTES;
|
||||||
@@ -81,6 +82,54 @@ impl Transport for UdpTransport {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Batched send via `sendmmsg` (up to 64 datagrams per syscall) — the connected socket needs
|
||||||
|
/// no per-message address. The socket is non-blocking, so a full send buffer surfaces as a
|
||||||
|
/// short count (or `EAGAIN` with nothing sent); we stop and report what went out rather than
|
||||||
|
/// block or retry — the data plane is lossy + FEC-protected, and blocking would queue stale
|
||||||
|
/// frames + add latency. Ports the proven GameStream `sendmmsg_all`. Non-Linux falls back to
|
||||||
|
/// the trait's scalar `send` loop (no `sendmmsg`).
///
/// Returns the number of datagrams the kernel accepted, summed over all chunks. Any `sendmmsg`
/// failure other than `WouldBlock` is returned as `Err`, and the count from earlier chunks is
/// discarded with it.
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
fn send_batch(&self, packets: &[&[u8]]) -> std::io::Result<usize> {
|
||||||
|
use std::os::fd::AsRawFd;
|
||||||
|
const CHUNK: usize = 64; // datagrams handed to one `sendmmsg` call
|
||||||
|
let fd = self.socket.as_raw_fd();
|
||||||
|
let mut total_sent = 0usize;
|
||||||
|
for chunk in packets.chunks(CHUNK) {
|
||||||
|
// `hdrs` borrow `iovs` by raw pointer; both stay alive through the `sendmmsg` call.
|
||||||
|
let mut iovs: Vec<libc::iovec> = chunk
|
||||||
|
.iter()
|
||||||
|
.map(|p| libc::iovec {
|
||||||
|
iov_base: p.as_ptr() as *mut libc::c_void, // `*mut` only because the C field is declared so
|
||||||
|
iov_len: p.len(),
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let mut hdrs: Vec<libc::mmsghdr> = iovs
|
||||||
|
.iter_mut()
|
||||||
|
.map(|iov| {
|
||||||
|
// SAFETY: `mmsghdr` is a plain C struct of integers and raw pointers, so the all-zero
// bit pattern is a valid value (null name/control pointers, zero lengths and flags).
let mut h: libc::mmsghdr = unsafe { std::mem::zeroed() };
|
||||||
|
h.msg_hdr.msg_iov = iov; // `&mut iovec` coerces to the raw pointer the header stores
|
||||||
|
h.msg_hdr.msg_iovlen = 1; // one buffer per datagram
|
||||||
|
h
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
// SAFETY: `hdrs` holds exactly `hdrs.len()` initialized headers; each points at one entry of
// `iovs`, which in turn points at a packet slice borrowed from `packets`. All three outlive
// this call, and neither vector is resized while the raw pointers are in use.
let n = unsafe { libc::sendmmsg(fd, hdrs.as_mut_ptr(), hdrs.len() as libc::c_uint, 0) };
|
||||||
|
if n < 0 {
|
||||||
|
let err = std::io::Error::last_os_error();
|
||||||
|
// Nothing fit in the send buffer — drop this + the remaining chunks (counted by
|
||||||
|
// the caller). A real error (not WouldBlock) is propagated to the caller instead.
|
||||||
|
if err.kind() == std::io::ErrorKind::WouldBlock {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
total_sent += n as usize; // n >= 0 here, so the cast cannot wrap
|
||||||
|
if (n as usize) < chunk.len() {
|
||||||
|
break; // short count: buffer filled mid-chunk (or a later datagram failed) — drop the remainder
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(total_sent)
|
||||||
|
}
|
||||||
|
|
||||||
fn recv(&self) -> std::io::Result<Option<Vec<u8>>> {
|
fn recv(&self) -> std::io::Result<Option<Vec<u8>>> {
|
||||||
let mut buf = vec![0u8; RECV_BUF];
|
let mut buf = vec![0u8; RECV_BUF];
|
||||||
match self.socket.recv(&mut buf) {
|
match self.socket.recv(&mut buf) {
|
||||||
@@ -96,3 +145,56 @@ impl Transport for UdpTransport {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::transport::Transport;
|
||||||
|
|
||||||
|
/// `send_batch` delivers a whole frame's worth of packets over real loopback UDP — exercising
|
||||||
|
/// the `sendmmsg` path on Linux (the scalar-loop default elsewhere). 100 × 200 B = 20 KB is
|
||||||
|
/// expected to fit the socket buffer, so every packet should arrive as its own 200 B datagram.
/// Each payload carries its index in the first 4 bytes (little-endian); received indices are
/// collected into a set, so arrival order and the remaining payload bytes are not checked.
|
||||||
|
#[test]
|
||||||
|
fn send_batch_delivers_over_loopback() {
|
||||||
|
let rx = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
|
rx.set_read_timeout(Some(std::time::Duration::from_millis(500))) // a lost packet fails the test instead of hanging it
|
||||||
|
.unwrap();
|
||||||
|
let rx_addr = rx.local_addr().unwrap().to_string();
|
||||||
|
let tx = UdpTransport::connect("127.0.0.1:0", &rx_addr).unwrap();
|
||||||
|
|
||||||
|
const N: u32 = 100; // more than one 64-datagram chunk, so the Linux path issues two `sendmmsg` calls
|
||||||
|
let payloads: Vec<Vec<u8>> = (0..N)
|
||||||
|
.map(|i| {
|
||||||
|
let mut v = vec![0u8; 200];
|
||||||
|
v[0..4].copy_from_slice(&i.to_le_bytes()); // tag each packet with its index
|
||||||
|
v
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let refs: Vec<&[u8]> = payloads.iter().map(|p| p.as_slice()).collect();
|
||||||
|
let sent = tx.send_batch(&refs).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
sent, N as usize,
|
||||||
|
"send_batch should hand all packets to the kernel"
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut seen = std::collections::HashSet::new(); // distinct packet indices received so far
|
||||||
|
let mut buf = [0u8; 2048];
|
||||||
|
while seen.len() < N as usize {
|
||||||
|
match rx.recv(&mut buf) {
|
||||||
|
Ok(n) => {
|
||||||
|
assert_eq!(
|
||||||
|
n, 200,
|
||||||
|
"datagram boundaries preserved (one packet per recv)"
|
||||||
|
);
|
||||||
|
seen.insert(u32::from_le_bytes(buf[0..4].try_into().unwrap()));
|
||||||
|
}
|
||||||
|
Err(_) => break, // any recv error (normally the read timeout) — stop and let the assert report the shortfall
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert_eq!(
|
||||||
|
seen.len(),
|
||||||
|
N as usize,
|
||||||
|
"every batched packet should arrive over loopback"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user