diff --git a/crates/punktfunk-core/src/transport/udp.rs b/crates/punktfunk-core/src/transport/udp.rs index c7801d1..e029a45 100644 --- a/crates/punktfunk-core/src/transport/udp.rs +++ b/crates/punktfunk-core/src/transport/udp.rs @@ -16,6 +16,21 @@ use std::net::UdpSocket; /// silently truncating it. const RECV_BUF: usize = MAX_DATAGRAM_BYTES + 1; +/// True for transient socket conditions that must be a lossy drop / "no data this poll" — NOT a +/// stream teardown. Two cases: +/// - `WouldBlock`: the kernel send/recv buffer is momentarily full (a frame burst saturated the tx +/// queue — the dominant condition at 1 Gbps+). Drop the packet; FEC + the next frame recover. +/// - `ConnectionRefused` / `ConnectionReset`: a *connected* UDP socket received an asynchronous ICMP +/// port-unreachable / reset for an *earlier* datagram. With data-plane hole-punching the path +/// blips — the peer's data socket briefly gone, a NAT rebind, or a stale ICMP from punch setup — +/// so erroring out here kills a stream that the very next packet would resume. If the peer is +/// genuinely gone, the QUIC control plane times out and ends the session cleanly instead. (This is +/// the classic connected-UDP "ICMP errors are advisory" rule, doubly true with hole-punching.) +fn is_transient_io(e: &std::io::Error) -> bool { + use std::io::ErrorKind::{ConnectionRefused, ConnectionReset, WouldBlock}; + matches!(e.kind(), WouldBlock | ConnectionRefused | ConnectionReset) +} + /// Build one `mmsghdr` per `iovec` (each a single-buffer message) for `sendmmsg`/`recvmmsg`. Shared /// by `send_batch` + `recv_batch` so the raw-pointer scaffolding lives in exactly one place. /// @@ -347,7 +362,7 @@ impl UdpTransport { }; if n < 0 { let err = std::io::Error::last_os_error(); - if err.kind() == std::io::ErrorKind::WouldBlock { + if is_transient_io(&err) { return Ok(0); } return Err(err); @@ -371,7 +386,7 @@ impl Transport for UdpTransport { // latency, and erroring tears the session down. `Ok(false)` surfaces the drop so the // session counts it (`packets_send_dropped`) instead of it being invisible. Mirrors // the `recv` WouldBlock handling above. - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(false), + Err(e) if is_transient_io(&e) => Ok(false), Err(e) => Err(e), } } @@ -401,9 +416,10 @@ impl Transport for UdpTransport { let n = unsafe { libc::sendmmsg(fd, hdrs.as_mut_ptr(), hdrs.len() as libc::c_uint, 0) }; if n < 0 { let err = std::io::Error::last_os_error(); - // Nothing fit in the send buffer — drop this + the remaining chunks (counted by - // the caller). A real error (not WouldBlock) still tears the session down. - if err.kind() == std::io::ErrorKind::WouldBlock { + // Nothing fit in the send buffer (or a stale ICMP from a connected-socket blip) — + // drop this + the remaining chunks (counted by the caller). Only a genuine error + // tears the session down; transient conditions are lossy drops (see is_transient_io). + if is_transient_io(&err) { break; } return Err(err); @@ -449,8 +465,9 @@ impl Transport for UdpTransport { } match send_one_gso(fd, &scratch, seg as u16) { Ok(()) => sent += chunk.len(), - // Send buffer momentarily full — drop the rest (counted by the caller), never block. - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + // Send buffer momentarily full, or a stale ICMP from a connected-socket blip — drop + // the rest (counted by the caller), never block, never tear down (see is_transient_io). + Err(e) if is_transient_io(&e) => break, // GSO unsupported on this kernel/path — latch off and finish via sendmmsg. Err(e) if gso_unsupported(&e) => { gso::disable(); @@ -472,7 +489,7 @@ impl Transport for UdpTransport { buf.truncate(n); Ok(Some(buf)) } - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(None), + Err(e) if is_transient_io(&e) => Ok(None), Err(e) => Err(e), } } @@ -511,7 +528,7 @@ impl Transport for UdpTransport { }; if n < 0 { let err = std::io::Error::last_os_error(); - if err.kind() == std::io::ErrorKind::WouldBlock { + if is_transient_io(&err) { return Ok(0); } return Err(err); @@ -557,8 +574,8 @@ impl Transport for UdpTransport { }; if r < 0 { let err = std::io::Error::last_os_error(); - if err.kind() == std::io::ErrorKind::WouldBlock { - break; // socket drained + if is_transient_io(&err) { + break; // socket drained, or a stale connected-socket ICMP — no data this poll } if got > 0 { break; // report what we have; surface the error on the next empty poll @@ -577,6 +594,26 @@ mod tests { use super::*; use crate::transport::Transport; + /// A connected UDP socket's stale ICMP (ECONNREFUSED/RESET) and a full buffer (EAGAIN) must all + /// be classified transient — a lossy drop, never a stream teardown. A real error must not be. + #[test] + fn transient_io_covers_connected_udp_blips() { + use std::io::{Error, ErrorKind}; + for k in [ + ErrorKind::WouldBlock, + ErrorKind::ConnectionRefused, + ErrorKind::ConnectionReset, + ] { + assert!( + is_transient_io(&Error::from(k)), + "{k:?} should be transient" + ); + } + for k in [ErrorKind::PermissionDenied, ErrorKind::AddrInUse] { + assert!(!is_transient_io(&Error::from(k)), "{k:?} must stay fatal"); + } + } + /// `send_one_gso` must split one buffer into N separate UDP datagrams of `gso_size` bytes each /// (the kernel UDP GSO segmentation) — the multi-Gbps send lever. Loopback supports GSO; if the /// CI kernel doesn't, skip rather than fail.