feat(1gbps): raise bitrate/probe clamps + socket buffers, count send-buffer drops
ci / rust (push) Has been cancelled
ci / rust (push) Has been cancelled
First step of 1 Gbps+ readiness (the whole point of the GF(2^16) Leopard FEC): make 1 Gbps configurable and its dominant failure mode observable, before the real transport work (sendmmsg + paced encode|send split) lands. Investigation (6-way) verdict: we're ~halfway, and it's mostly clamps plus one real piece of work. The integer/type path, FEC (a 1 Gbps frame is only a few hundred shards in one GF(2^16) block, far under the 65535 ceiling), AES-GCM (AES-NI, ~10-25x headroom), and the M1 reassembler bounds (fully derived from the negotiated FecConfig) are ALL already 1 Gbps-ready and untouched. This commit (the configurable + observable foundation): - m3.rs: MAX_BITRATE_KBPS 500_000 -> 2_000_000 (2 Gbps headroom over the 1 Gbps+ target); MAX_PROBE_KBPS 1_000_000 -> 3_000_000 (probe can demonstrate headroom ABOVE the session cap so a client can confidently pick a 1 Gbps+ bitrate). - transport/udp.rs: TARGET_SOCKBUF 8 MB -> 32 MB (a multi-MB IDR keyframe burst no longer fills the buffer); scripts/99-punktfunk-net.conf bumped to match. - Observability: Transport::send now returns Ok(true|false) (false = WouldBlock send-buffer drop, previously a silent Ok(())). Session counts these as a new `packets_send_dropped` stat (distinct from recv-side packets_dropped) — in Stats, the C ABI PunktfunkStats (header regenerated), a PUNKTFUNK_PERF periodic wire-Mbps + drop dump in virtual_stream, and the speed-test probe completion log. This is the dominant 1 Gbps+ loss mode and was invisible. Loopback-verified: a probe now runs at 1.2 Gbps target (no longer truncated to 1 Gbps) with the drop counter live. NOT yet a sustained-1-Gbps proof — the single-send()-per-packet native path is the next, real piece of work (port the proven GameStream sendmmsg + paced send thread into the core Transport). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -136,6 +136,9 @@ pub struct PunktfunkStats {
|
||||
pub packets_sent: u64,
|
||||
pub packets_received: u64,
|
||||
pub packets_dropped: u64,
|
||||
/// Packets dropped on the host send path because the kernel buffer was full (WouldBlock) — the
|
||||
/// dominant loss mode at very high bitrate; distinct from `packets_dropped` (recv-side).
|
||||
pub packets_send_dropped: u64,
|
||||
pub fec_recovered_shards: u64,
|
||||
pub bytes_sent: u64,
|
||||
pub bytes_received: u64,
|
||||
@@ -150,6 +153,7 @@ impl From<Stats> for PunktfunkStats {
|
||||
packets_sent: s.packets_sent,
|
||||
packets_received: s.packets_received,
|
||||
packets_dropped: s.packets_dropped,
|
||||
packets_send_dropped: s.packets_send_dropped,
|
||||
fec_recovered_shards: s.fec_recovered_shards,
|
||||
bytes_sent: s.bytes_sent,
|
||||
bytes_received: s.bytes_received,
|
||||
|
||||
@@ -122,7 +122,9 @@ impl Session {
|
||||
let wire = self.seal_for_wire(&pkt)?;
|
||||
StatsCounters::add(&self.stats.packets_sent, 1);
|
||||
StatsCounters::add(&self.stats.bytes_sent, wire.len() as u64);
|
||||
self.transport.send(&wire)?;
|
||||
if !self.transport.send(&wire)? {
|
||||
StatsCounters::add(&self.stats.packets_send_dropped, 1);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -192,7 +194,9 @@ impl Session {
|
||||
let wire = self.seal_for_wire(&pkt)?;
|
||||
StatsCounters::add(&self.stats.packets_sent, 1);
|
||||
StatsCounters::add(&self.stats.bytes_sent, wire.len() as u64);
|
||||
self.transport.send(&wire)?;
|
||||
if !self.transport.send(&wire)? {
|
||||
StatsCounters::add(&self.stats.packets_send_dropped, 1);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,11 @@ pub struct Stats {
|
||||
pub packets_sent: u64,
|
||||
pub packets_received: u64,
|
||||
pub packets_dropped: u64,
|
||||
/// Packets the host could NOT hand to the kernel because the send buffer was full (WouldBlock)
|
||||
/// — the dominant loss mode at very high bitrate. Distinct from `packets_dropped` (recv-side
|
||||
/// reassembler rejects). A non-zero, growing value means the link/encoder is outrunning the
|
||||
/// send path; raise `net.core.wmem_max` / lower the bitrate, or wait for paced batched sending.
|
||||
pub packets_send_dropped: u64,
|
||||
pub fec_recovered_shards: u64,
|
||||
pub bytes_sent: u64,
|
||||
pub bytes_received: u64,
|
||||
@@ -27,6 +32,7 @@ pub struct StatsCounters {
|
||||
pub packets_sent: AtomicU64,
|
||||
pub packets_received: AtomicU64,
|
||||
pub packets_dropped: AtomicU64,
|
||||
pub packets_send_dropped: AtomicU64,
|
||||
pub fec_recovered_shards: AtomicU64,
|
||||
pub bytes_sent: AtomicU64,
|
||||
pub bytes_received: AtomicU64,
|
||||
@@ -47,6 +53,7 @@ impl StatsCounters {
|
||||
packets_sent: self.packets_sent.load(l),
|
||||
packets_received: self.packets_received.load(l),
|
||||
packets_dropped: self.packets_dropped.load(l),
|
||||
packets_send_dropped: self.packets_send_dropped.load(l),
|
||||
fec_recovered_shards: self.fec_recovered_shards.load(l),
|
||||
bytes_sent: self.bytes_sent.load(l),
|
||||
bytes_received: self.bytes_received.load(l),
|
||||
|
||||
@@ -57,15 +57,18 @@ pub fn loopback_pair(
|
||||
}
|
||||
|
||||
impl Transport for LoopbackTransport {
|
||||
fn send(&self, packet: &[u8]) -> std::io::Result<()> {
|
||||
fn send(&self, packet: &[u8]) -> std::io::Result<bool> {
|
||||
let n = self.tx.sent.fetch_add(1, Ordering::Relaxed);
|
||||
if self.tx.drop_period != 0 && (n % self.tx.drop_period as u64) == 0 {
|
||||
// Deterministically drop in flight (the 1st of each `drop_period` group).
|
||||
// Deterministically drop in flight (the 1st of each `drop_period` group). This models
|
||||
// NETWORK loss (the packet left the sender, then vanished), not a local send-buffer
|
||||
// drop — so it still reports `Ok(true)`: the host sent it; the recv/FEC side handles
|
||||
// the loss. (`Ok(false)` is reserved for a real WouldBlock send-buffer overflow.)
|
||||
self.tx.dropped.fetch_add(1, Ordering::Relaxed);
|
||||
return Ok(());
|
||||
return Ok(true);
|
||||
}
|
||||
self.tx.queue.lock().unwrap().push_back(packet.to_vec());
|
||||
Ok(())
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn recv(&self) -> std::io::Result<Option<Vec<u8>>> {
|
||||
|
||||
@@ -10,6 +10,10 @@ pub use udp::UdpTransport;
|
||||
/// A datagram transport. `recv` is non-blocking: it returns `Ok(None)` when no packet
|
||||
/// is currently available, so the caller (decode/present thread) never blocks here.
|
||||
pub trait Transport: Send + Sync {
|
||||
fn send(&self, packet: &[u8]) -> std::io::Result<()>;
|
||||
/// Send one packet. `Ok(true)` = handed to the kernel; `Ok(false)` = dropped locally because
|
||||
/// the send buffer was momentarily full (WouldBlock) — a non-fatal loss the FEC/keyframe path
|
||||
/// recovers, surfaced so the caller can count it (`packets_send_dropped`) instead of it being
|
||||
/// invisible. `Err` = a real send failure.
|
||||
fn send(&self, packet: &[u8]) -> std::io::Result<bool>;
|
||||
fn recv(&self) -> std::io::Result<Option<Vec<u8>>>;
|
||||
}
|
||||
|
||||
@@ -25,7 +25,12 @@ impl UdpTransport {
|
||||
/// packets) or drops on the client recv — and with infinite-GOP a single lost frame
|
||||
/// freezes the decode until the next RFI refresh. Requested large; the OS clamps to
|
||||
/// `net.core.{wmem,rmem}_max` (Linux) / `kern.ipc.maxsockbuf` (macOS).
|
||||
const TARGET_SOCKBUF: usize = 8 * 1024 * 1024;
|
||||
///
|
||||
/// Sized for 1 Gbps+: at ~1.2 Gbps on the wire an 8 MB buffer is only ~49 ms of steady state,
|
||||
/// and a single multi-MB IDR keyframe (~4 MB ≈ 3300 packets) instantly fills most of it. 32 MB
|
||||
/// gives ~200 ms of headroom and absorbs a keyframe burst without EAGAIN drops. (Paced sending
|
||||
/// will reduce the buffer actually needed once it lands — see the 1 Gbps roadmap work.)
|
||||
const TARGET_SOCKBUF: usize = 32 * 1024 * 1024;
|
||||
|
||||
/// Bind `local` and `connect` to `peer`, so `send`/`recv` need no address and the
|
||||
/// kernel filters to this peer. Non-blocking, matching the [`Transport`] contract.
|
||||
@@ -60,16 +65,18 @@ impl UdpTransport {
|
||||
}
|
||||
|
||||
impl Transport for UdpTransport {
|
||||
fn send(&self, packet: &[u8]) -> std::io::Result<()> {
|
||||
fn send(&self, packet: &[u8]) -> std::io::Result<bool> {
|
||||
match self.socket.send(packet) {
|
||||
Ok(_) => Ok(()),
|
||||
Ok(_) => Ok(true),
|
||||
// The kernel UDP send buffer is momentarily full (a frame burst saturated the
|
||||
// tx queue — common right after attaching to an already-running source that
|
||||
// emits at full rate). Drop this packet rather than fail the whole stream: the
|
||||
// data plane is lossy + FEC-protected and the next frame/RFI keyframe recovers,
|
||||
// whereas blocking would queue stale frames and add latency, and erroring tears
|
||||
// the session down. Mirrors the `recv` WouldBlock handling above.
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()),
|
||||
// emits at full rate, and the dominant failure mode at 1 Gbps+). Drop this packet
|
||||
// rather than fail the whole stream: the data plane is lossy + FEC-protected and the
|
||||
// next frame/RFI keyframe recovers, whereas blocking would queue stale frames and add
|
||||
// latency, and erroring tears the session down. `Ok(false)` surfaces the drop so the
|
||||
// session counts it (`packets_send_dropped`) instead of it being invisible. Mirrors
|
||||
// the `recv` WouldBlock handling above.
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(false),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -232,12 +232,16 @@ const HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10
|
||||
/// (`Hello::bitrate_kbps == 0`) — the long-standing 20 Mbps default. A client that knows its
|
||||
/// link (e.g. after a speed test) requests an explicit rate instead.
|
||||
const DEFAULT_BITRATE_KBPS: u32 = 20_000;
|
||||
/// Bounds a client's requested bitrate to a sane range before configuring NVENC: a 500 kbps floor
|
||||
/// keeps the stream above unusable, and a 500 Mbps ceiling guards against an absurd request
|
||||
/// exhausting the encoder / link (GF(2¹⁶) FEC lifts the old ~1 Gbps wall, but 500 Mbps already
|
||||
/// covers 5K@240). Resolved value is echoed in `Welcome::bitrate_kbps`.
|
||||
/// Bounds a client's requested bitrate before configuring NVENC: a 500 kbps floor keeps the stream
|
||||
/// above unusable, and a **2 Gbps** ceiling is generous headroom over the 1 Gbps+ target that
|
||||
/// GF(2¹⁶) Leopard FEC was built to reach — it lifts the GF(2⁸)/~1 Gbps wall, and at 1 Gbps a frame
|
||||
/// is only a few-hundred shards in one block (far under the 65535 limit). Enough for 5K@240 with
|
||||
/// margin. Resolved value is echoed in `Welcome::bitrate_kbps`. CAVEAT: the native data plane still
|
||||
/// does one `send()` syscall per packet (no `sendmmsg` batching / paced send thread yet), so
|
||||
/// sustained multi-hundred-Mbps can show send-buffer drops (counted as `packets_send_dropped`)
|
||||
/// until that lands — see the 1 Gbps work in the roadmap.
|
||||
const MIN_BITRATE_KBPS: u32 = 500;
|
||||
const MAX_BITRATE_KBPS: u32 = 500_000;
|
||||
const MAX_BITRATE_KBPS: u32 = 2_000_000;
|
||||
|
||||
/// Resolve a client's [`Hello::bitrate_kbps`] request to the rate the host will configure:
|
||||
/// `0` → host default; anything else clamped into `[MIN, MAX]`.
|
||||
@@ -1294,10 +1298,12 @@ fn resolve_compositor(pref: CompositorPref) -> Result<crate::vdisplay::Composito
|
||||
Ok(chosen)
|
||||
}
|
||||
|
||||
/// Bounds a speed-test [`ProbeRequest`] before bursting: a 1 Gbps / 5 s ceiling keeps a probe from
|
||||
/// monopolizing the link or stalling the stream for too long. GF(2¹⁶) FEC makes ~1 Gbps reachable
|
||||
/// on a LAN — ample headroom to find a session's sustainable bitrate.
|
||||
const MAX_PROBE_KBPS: u32 = 1_000_000;
|
||||
/// Bounds a speed-test [`ProbeRequest`] before bursting: a 3 Gbps / 5 s ceiling keeps a probe from
|
||||
/// monopolizing the link or stalling the stream for too long. The ceiling is set ABOVE the session
|
||||
/// bitrate cap ([`MAX_BITRATE_KBPS`], 2 Gbps) on purpose — a probe should be able to demonstrate
|
||||
/// headroom past the rate a session will actually be configured to use, so the client can pick a
|
||||
/// confident 1 Gbps+ bitrate. GF(2¹⁶) FEC makes multi-Gbps reachable on a LAN.
|
||||
const MAX_PROBE_KBPS: u32 = 3_000_000;
|
||||
const MAX_PROBE_MS: u32 = 5_000;
|
||||
|
||||
/// Run a bandwidth probe over `session`: burst zero-filled access units flagged [`FLAG_PROBE`] at
|
||||
@@ -1322,6 +1328,9 @@ fn run_probe_burst(session: &mut Session, req: ProbeRequest, stop: &AtomicBool)
|
||||
// AU fragments into many UDP shards via sendmmsg).
|
||||
let chunk = (bytes_per_sec / 240).clamp(1200, 256 * 1024) as usize;
|
||||
let filler = vec![0u8; chunk];
|
||||
// Host send-buffer drops over the burst — at high target rates this is where the native
|
||||
// single-send()-per-packet path first loses, so report it alongside what we offered.
|
||||
let send_dropped0 = session.stats().packets_send_dropped;
|
||||
let start = std::time::Instant::now();
|
||||
let deadline = start + std::time::Duration::from_millis(duration_ms as u64);
|
||||
let mut bytes_sent = 0u64;
|
||||
@@ -1339,11 +1348,13 @@ fn run_probe_burst(session: &mut Session, req: ProbeRequest, stop: &AtomicBool)
|
||||
}
|
||||
}
|
||||
let actual_ms = start.elapsed().as_millis() as u32;
|
||||
let send_dropped = session.stats().packets_send_dropped - send_dropped0;
|
||||
tracing::info!(
|
||||
target_kbps,
|
||||
duration_ms = actual_ms,
|
||||
bytes_sent,
|
||||
packets_sent,
|
||||
send_dropped,
|
||||
"speed-test probe burst complete"
|
||||
);
|
||||
ProbeResult {
|
||||
@@ -1400,6 +1411,12 @@ fn virtual_stream(
|
||||
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64);
|
||||
let mut next = std::time::Instant::now();
|
||||
let mut sent: u64 = 0;
|
||||
// Throughput/drop instrumentation (PUNKTFUNK_PERF) — makes a high-bitrate / 1 Gbps soak
|
||||
// observable: wire goodput + send-buffer drops (the dominant 1 Gbps+ loss mode) as they happen.
|
||||
let perf = std::env::var("PUNKTFUNK_PERF").is_ok();
|
||||
let mut last_perf = std::time::Instant::now();
|
||||
let mut last_bytes = 0u64;
|
||||
let mut last_send_dropped = 0u64;
|
||||
while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
|
||||
// Service speed-test probes between frames (each burst pauses video for its duration).
|
||||
service_probes(session, stop, probe_rx, probe_result_tx);
|
||||
@@ -1441,6 +1458,22 @@ fn virtual_stream(
|
||||
.map_err(|e| anyhow!("submit_frame: {e:?}"))?;
|
||||
sent += 1;
|
||||
}
|
||||
if perf && last_perf.elapsed() >= std::time::Duration::from_secs(2) {
|
||||
let s = session.stats();
|
||||
let secs = last_perf.elapsed().as_secs_f64();
|
||||
let wire_mbps = (s.bytes_sent - last_bytes) as f64 * 8.0 / secs / 1_000_000.0;
|
||||
tracing::info!(
|
||||
wire_mbps = format!("{wire_mbps:.0}"),
|
||||
frames = sent,
|
||||
packets_sent = s.packets_sent,
|
||||
send_dropped = s.packets_send_dropped - last_send_dropped,
|
||||
send_dropped_total = s.packets_send_dropped,
|
||||
"perf"
|
||||
);
|
||||
last_perf = std::time::Instant::now();
|
||||
last_bytes = s.bytes_sent;
|
||||
last_send_dropped = s.packets_send_dropped;
|
||||
}
|
||||
next += interval;
|
||||
match next.checked_duration_since(std::time::Instant::now()) {
|
||||
Some(d) => std::thread::sleep(d),
|
||||
|
||||
Reference in New Issue
Block a user