From c83024603755c2b98faf649c192465fca9749736 Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Mon, 15 Jun 2026 16:52:58 +0000 Subject: [PATCH] feat(host/windows): UDP send offload + NVENC 2-way split-encode (1 Gbps+ / 5K@240) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Windows host couldn't sustain high-throughput / high-fps streams — two gaps vs the Linux host, both found via live RTX 4090 measurement (PERF timing + nvidia-smi per-engine attribution): - UDP Send Offload (USO). punktfunk-core's UdpTransport sent one packet per `send` syscall on Windows (send_batch/send_gso were Linux-only), capping throughput at high packet rates. Add a Windows `send_gso` override using `WSASendMsg` + `UDP_SEND_MSG_SIZE` (the Windows analogue of Linux UDP GSO) via windows-sys — one syscall segments a coalesced <=512-segment super-buffer to the connected peer. On by default with auto-fallback (PUNKTFUNK_GSO=0 disables, error latches off); plugs into the existing paced send path. SO_SNDBUF (32MB) was already cross-platform. - NVENC 2-way split-frame encoding. A single Ada NVENC session tops out ~0.8 Gpix/s, so 5K@240 (1.77 Gpix/s) took ~8 ms/frame -> a ~125 fps ceiling at high motion (the in-game stutter). Set NV_ENC_INITIALIZE_PARAMS.splitEncodeMode = TWO_FORCED above ~1 Gpix/s (matching the Linux libavcodec split_encode_mode path) to use both 4090 encoders — measured ~8 ms -> ~4 ms/frame at throughput. Env override PUNKTFUNK_SPLIT_ENCODE; init-failure fallback disables it (e.g. H264). Windows-only paths; Linux/macOS unaffected. Builds clean on x86_64-pc-windows-msvc. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 10 ++ crates/punktfunk-core/Cargo.toml | 9 ++ crates/punktfunk-core/src/transport/udp.rs | 157 +++++++++++++++++++++ crates/punktfunk-host/src/encode/nvenc.rs | 38 +++++ 4 files changed, 214 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 98e3cd8..8c69330 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2609,6 +2609,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tracing", + "windows-sys 0.59.0", "zerocopy", "zeroize", ] @@ -4375,6 +4376,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.60.2" diff --git a/crates/punktfunk-core/Cargo.toml b/crates/punktfunk-core/Cargo.toml index 72c3811..fac14bf 100644 --- a/crates/punktfunk-core/Cargo.toml +++ b/crates/punktfunk-core/Cargo.toml @@ -58,6 +58,15 @@ tokio = { version = "1", optional = true, features = ["rt-multi-thread", "net", [target.'cfg(unix)'.dependencies] libc = "0.2" +# Windows UDP Send Offload (USO): `WSASendMsg` + `UDP_SEND_MSG_SIZE` is the Windows analogue of +# Linux UDP GSO — the 1 Gbps+ send lever (the host otherwise sends one packet per `send` syscall, +# which caps throughput at high packet rates). See transport/udp.rs. +[target.'cfg(windows)'.dependencies] +# windows-sys (raw FFI, the quinn-udp choice): the high-level `windows` crate doesn't bind the +# `WSASendMsg` extension function. WinSock feature gives WSASendMsg + WSAMSG/WSABUF/CMSGHDR. +# Win32_System_IO too: WSASendMsg's signature references OVERLAPPED, so it's gated on that feature. +windows-sys = { version = "0.59", features = ["Win32_Networking_WinSock", "Win32_System_IO"] } + [dev-dependencies] proptest = "1" # Tier-1 microbenchmarks (benches/pipeline.rs). 
default-features off → no plotters/HTML (headless diff --git a/crates/punktfunk-core/src/transport/udp.rs b/crates/punktfunk-core/src/transport/udp.rs index 041b7f3..67bbbde 100644 --- a/crates/punktfunk-core/src/transport/udp.rs +++ b/crates/punktfunk-core/src/transport/udp.rs @@ -131,6 +131,118 @@ fn send_one_gso(fd: libc::c_int, buf: &[u8], gso_size: u16) -> std::io::Result<( Ok(()) } +/// Windows UDP Send Offload (USO) enable state (process-wide). The Windows analogue of Linux UDP +/// GSO: `WSASendMsg` + `UDP_SEND_MSG_SIZE`. **On by default** (the 1 Gbps+ send lever — Windows +/// otherwise does one `send` syscall per packet, which caps throughput at high packet rates). Kill +/// switch `PUNKTFUNK_GSO=0`; auto-fallback latches it off the first time a send reports it +/// unsupported (old OS / NIC / path). We detect support from the send error rather than a +/// `setsockopt` probe — the probe sets a socket-wide default segment size that would fragment plain +/// `send`s of larger-than-segment packets. +#[cfg(target_os = "windows")] +mod uso { + use std::sync::atomic::{AtomicU8, Ordering}; + static STATE: AtomicU8 = AtomicU8::new(0); // 0 = uninit, 1 = on, 2 = off + + pub fn active() -> bool { + match STATE.load(Ordering::Relaxed) { + 1 => true, + 2 => false, + _ => { + let off = std::env::var_os("PUNKTFUNK_GSO") + .map(|v| v == "0") + .unwrap_or(false); + STATE.store(if off { 2 } else { 1 }, Ordering::Relaxed); + tracing::info!( + enabled = !off, + "Windows UDP Send Offload (USO): {} (the 1 Gbps+ send lever; PUNKTFUNK_GSO=0 disables)", + if off { "off" } else { "on" } + ); + !off + } + } + } + /// Latch USO off for the process after a send that means it isn't usable on this OS/NIC/path. 
+ pub fn disable() { + if STATE.swap(2, Ordering::Relaxed) != 2 { + tracing::warn!("Windows USO unsupported on this path — falling back to per-packet sends"); + } + } +} + +/// True if a `WSASendMsg` USO error means USO isn't usable here (vs a transient full-buffer +/// `WouldBlock`, handled by [`is_transient_io`]) — latch it off and fall back to per-packet sends. +/// 10022 WSAEINVAL, 10042 WSAENOPROTOOPT, 10045 WSAEOPNOTSUPP, 10040 WSAEMSGSIZE. +#[cfg(target_os = "windows")] +fn uso_unsupported(e: &std::io::Error) -> bool { + matches!( + e.raw_os_error(), + Some(10022) | Some(10042) | Some(10045) | Some(10040) + ) +} + +/// One `WSASendMsg` carrying a `UDP_SEND_MSG_SIZE` control message: Winsock splits `buf` (a +/// back-to-back concatenation of equal-size datagrams, only the final one allowed shorter) into +/// `seg_size`-byte UDP datagrams to the connected peer in ONE syscall — the analogue of +/// [`send_one_gso`]. The `WSA_CMSG_*` helpers are C macros not exported by the `windows-sys` crate, so +/// the cmsg layout math is reimplemented here (ported from quinn-udp's Windows backend). +#[cfg(target_os = "windows")] +fn send_one_uso(socket: &std::net::UdpSocket, buf: &[u8], seg_size: u16) -> std::io::Result<()> { + use std::os::windows::io::AsRawSocket; + use windows_sys::Win32::Networking::WinSock::{ + WSASendMsg, CMSGHDR, IPPROTO_UDP, UDP_SEND_MSG_SIZE, WSABUF, WSAMSG, + }; + let align_usize = std::mem::align_of::<usize>(); + let align_hdr = std::mem::align_of::<CMSGHDR>(); + let cmsgdata_align = |n: usize| (n + align_usize - 1) & !(align_usize - 1); + let cmsghdr_align = |n: usize| (n + align_hdr - 1) & !(align_hdr - 1); + let hdr = std::mem::size_of::<CMSGHDR>(); + + // 8-byte-aligned control buffer; 32 B holds one u32 cmsg (WSA_CMSG_SPACE(4) = 24 on x64). 
+ #[repr(align(8))] + struct Aligned([u8; 32]); + let mut ctrl = Aligned([0u8; 32]); + + let mut data = WSABUF { + len: buf.len() as u32, + buf: buf.as_ptr() as *mut u8, // WSASendMsg only reads it + }; + let mut msg = WSAMSG { + name: std::ptr::null_mut(), + namelen: 0, + lpBuffers: &mut data, + dwBufferCount: 1, + Control: WSABUF { + len: 0, + buf: ctrl.0.as_mut_ptr(), + }, + dwFlags: 0, + }; + let cmsg_len = cmsgdata_align(hdr) + std::mem::size_of::<u32>(); // WSA_CMSG_LEN(4) + let space = cmsgdata_align(hdr + cmsghdr_align(std::mem::size_of::<u32>())); // WSA_CMSG_SPACE(4) + unsafe { + let cmsg = ctrl.0.as_mut_ptr() as *mut CMSGHDR; + (*cmsg).cmsg_len = cmsg_len; + (*cmsg).cmsg_level = IPPROTO_UDP; + (*cmsg).cmsg_type = UDP_SEND_MSG_SIZE; + let data_ptr = (cmsg as usize + cmsgdata_align(hdr)) as *mut u32; + std::ptr::write_unaligned(data_ptr, seg_size as u32); + msg.Control.len = space as u32; + let mut sent = 0u32; + let rc = WSASendMsg( + socket.as_raw_socket() as usize, + &msg, + 0, + &mut sent, + std::ptr::null_mut(), + None, + ); + if rc != 0 { + return Err(std::io::Error::last_os_error()); + } + } + Ok(()) +} + /// Apple (macOS/iOS) batched-receive enable state. Darwin has no `recvmmsg(2)`, so without this our /// macOS client does one `recv` syscall per packet — at a few hundred Mbps that's ~40-90k syscalls/s /// on one core, and when the recv loop can't drain fast enough the kernel socket buffer backs up and @@ -487,6 +599,51 @@ impl Transport for UdpTransport { Ok(sent) } + /// UDP USO send (see [`Transport::send_gso`]) — Windows. Coalesces the frame's equal-size packets + /// and hands Winsock ≤512-segment super-buffers via `WSASendMsg(UDP_SEND_MSG_SIZE)` — one syscall + /// per chunk instead of one `send` per packet, the 1 Gbps+ lever (Windows analogue of Linux GSO). 
+ /// On by default (kill: `PUNKTFUNK_GSO=0`); falls back to the scalar `send_batch` when off, when + /// packets aren't uniform-size, or on a USO-unsupported error (which latches it off for the + /// process). Same lossy short-count contract. + #[cfg(target_os = "windows")] + fn send_gso(&self, packets: &[&[u8]]) -> std::io::Result<usize> { + if packets.is_empty() { + return Ok(0); + } + if !uso::active() { + return self.send_batch(packets); + } + // USO needs every segment but the last to be exactly `seg` bytes (same as Linux GSO). + let seg = packets[0].len(); + let last = packets.len() - 1; + if seg == 0 || packets[..last].iter().any(|p| p.len() != seg) || packets[last].len() > seg { + return self.send_batch(packets); + } + // Win11 x64 accepts up to ~512 segments per WSASendMsg. + let max_seg = 512usize; + let mut scratch: Vec<u8> = Vec::with_capacity(seg * packets.len().min(max_seg)); + let mut sent = 0usize; + for chunk in packets.chunks(max_seg) { + scratch.clear(); + for p in chunk { + scratch.extend_from_slice(p); + } + match send_one_uso(&self.socket, &scratch, seg as u16) { + Ok(()) => sent += chunk.len(), + // Send buffer momentarily full / connected-socket ICMP blip — drop the rest, never + // block, never tear down (see is_transient_io). + Err(e) if is_transient_io(&e) => break, + // USO unsupported on this OS/NIC/path — latch off and finish via scalar send_batch. 
+ Err(e) if uso_unsupported(&e) => { + uso::disable(); + return Ok(sent + self.send_batch(&packets[sent..])?); + } + Err(e) => return Err(e), + } + } + Ok(sent) + } + fn recv(&self) -> std::io::Result>> { let mut buf = vec![0u8; RECV_BUF]; match self.socket.recv(&mut buf) { diff --git a/crates/punktfunk-host/src/encode/nvenc.rs b/crates/punktfunk-host/src/encode/nvenc.rs index 1bbf5d5..45f6d21 100644 --- a/crates/punktfunk-host/src/encode/nvenc.rs +++ b/crates/punktfunk-host/src/encode/nvenc.rs @@ -140,6 +140,28 @@ impl NvencD3d11Encoder { const FLOOR_BPS: u64 = 10_000_000; let requested_bps = self.bitrate_bps; let mut bitrate = self.bitrate_bps; + // 2-way NVENC split-frame encoding (Ada dual-NVENC) — the high-pixel-rate throughput lever + // the Linux host enables via libavcodec `split_encode_mode`. A single Ada NVENC session tops + // out ~0.8 Gpix/s, so at high motion a 5K@240 (1.77 Gpix/s) frame takes ~8 ms to encode and + // the rate caps ~125 fps; splitting across both engines roughly halves that. Force 2-way + // above ~1 Gpix/s (matching encode/linux.rs), AUTO below (the ~2% BD-rate cost isn't worth + // it at low pixel rates). Env override PUNKTFUNK_SPLIT_ENCODE = 0/disable | 1/auto | 2 | 3. + // HEVC/AV1 only; the init-failure fallback below disables it if a codec/config rejects it. 
+ let pixel_rate = self.width as u64 * self.height as u64 * self.fps.max(1) as u64; + let mut split_mode: u32 = match std::env::var("PUNKTFUNK_SPLIT_ENCODE").ok().as_deref() { + Some("0") | Some("disable") => { + nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32 + } + Some("1") | Some("auto") => { + nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_AUTO_FORCED_MODE as u32 + } + Some("3") => nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_THREE_FORCED_MODE as u32, + Some("2") => nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_TWO_FORCED_MODE as u32, + _ if pixel_rate > 1_000_000_000 => { + nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_TWO_FORCED_MODE as u32 + } + _ => nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_AUTO_MODE as u32, + }; let enc = loop { // 1. open the session bound to the D3D11 device. let mut params = nv::NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS { @@ -205,6 +227,9 @@ impl NvencD3d11Encoder { encodeConfig: &mut cfg, ..Default::default() }; + // splitEncodeMode is a C bitfield — set via the generated accessor, not a struct field. + init.set_splitEncodeMode(split_mode); + match (API.initialize_encoder)(enc, &mut init).result_without_string() { Ok(()) => { self.bitrate_bps = bitrate; @@ -222,6 +247,19 @@ impl NvencD3d11Encoder { bitrate = next; continue; } + // Last resort at the floor bitrate: if split-encode was forced and init still + // fails, the codec/config may not accept it (e.g. H264) — disable split and retry + // single-engine rather than fail the session. 
+ Err(e) + if split_mode != nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_AUTO_MODE as u32 + && split_mode + != nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32 => + { + let _ = (API.destroy_encoder)(enc); + tracing::warn!(error = ?e, "NVENC init rejected with split-encode forced — disabling split, retrying single-engine"); + split_mode = nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32; + continue; + } Err(e) => { let _ = (API.destroy_encoder)(enc); return Err(anyhow!("initialize_encoder: {e:?} (even at {} Mbps floor)", FLOOR_BPS / 1_000_000));