From f37a304fba0c491670dd066a11e56cce88ade6dc Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Sat, 20 Jun 2026 17:46:17 +0000 Subject: [PATCH] fix(core/speed-test): packet-level throughput + paced burst (kill the 0/100% cliff) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The punktfunk/1 speed test was unusable across every client/host: at the start of a burst a little data got through, then everything read as dropped (~10 MB total). Two compounding bugs: 1. Receive side measured throughput from fully-reassembled FLAG_PROBE *access units* only. The instant loss crossed the 20% FEC budget no AU completed, so the figure cliffed to 0 / 100% loss even though most bytes still arrived — a binary cliff, not a graded measurement. 2. Send side blasted each filler AU (up to 256 KB ≈ 200 packets) into the socket buffer in one unpaced batch, unlike the real video path which paces. On a small buffer (e.g. the Steam Deck's 416 KB) a single AU overflowed it, so the test measured self-inflicted buffer overflow instead of the link. Fixes: - Host `run_probe_burst` keeps each AU a small (~16 KB) burst and paces by the byte budget, mirroring `paced_submit`; reports the WIRE packets the kernel accepted and the ones the send buffer dropped (stat deltas), separating host-side drops from link loss. - `ProbeResult` gains `wire_packets_sent` + `send_dropped` (back-compat decode: a 21-byte pre-wire-stats result still decodes, new fields 0). - Clients (probe + connector) count delivered traffic at the packet level via `session.stats()` deltas over the burst window, so throughput/loss degrade gracefully. Connector freezes the delivered figure when the host report lands so resumed video can't inflate it. New `ProbeOutcome`/`PunktfunkProbeResult` fields: `host_drop_pct`, `wire_packets_sent`, `send_dropped`. Validated on loopback (graded 142→1391 Mbps, host_drop/link_loss split correctly, no cliff) and live against the Deck: clean to ~200 Mbps goodput / 273 Mbps wire at 0% link loss, host send buffer the wall above that (the lever-#1 target). Co-Authored-By: Claude Opus 4.8 (1M context) --- clients/probe/src/main.rs | 108 +++++++++-------- crates/punktfunk-core/src/abi.rs | 22 +++- crates/punktfunk-core/src/client.rs | 153 ++++++++++++++++-------- crates/punktfunk-core/src/quic.rs | 54 +++++++-- crates/punktfunk-host/src/punktfunk1.rs | 37 ++++-- include/punktfunk_core.h | 19 ++- 6 files changed, 264 insertions(+), 129 deletions(-) diff --git a/clients/probe/src/main.rs b/clients/probe/src/main.rs index 6b05b99..6687d65 100644 --- a/clients/probe/src/main.rs +++ b/clients/probe/src/main.rs @@ -433,13 +433,11 @@ async fn session(args: Args) -> Result<()> { None => None, }; - // Speed-test accumulators: the data-plane loop folds each FLAG_PROBE filler AU in here; the - // --speed-test reporter below reads them once the host's ProbeResult lands. first/last hold - // now_ns timestamps of the receive window (0 = unset). - let probe_recv_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); - let probe_recv_packets = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); - let probe_first_ns = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); - let probe_last_ns = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); + // Packet-level receive counters mirrored from `session.stats()` by the data-plane loop. The + // speed test reads their delta over the burst window so throughput/loss reflect every delivered + // wire packet (graceful past the FEC budget), not just fully-reassembled probe AUs. + let rx_wire_packets = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); + let rx_wire_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); // Mid-stream renegotiation test: after a delay, ask the host to switch modes on the // still-open control stream. The stream then carries new-mode AUs (IDR + in-band @@ -470,19 +468,25 @@ async fn session(args: Args) -> Result<()> { } }); } else if let Some((target_kbps, duration_ms)) = args.speed_test { - // Bandwidth probe: after the stream warms up, ask the host to burst FLAG_PROBE filler; - // measure what arrives vs. what it reports sending. + // Bandwidth probe: after the stream warms up, ask the host to burst FLAG_PROBE filler; measure + // delivered WIRE packets (session-stat delta) vs. what the host reports putting on the wire. let mut ss = send; let mut sr = recv; - let (pb, pp, pf, pl) = ( - probe_recv_bytes.clone(), - probe_recv_packets.clone(), - probe_first_ns.clone(), - probe_last_ns.clone(), - ); + let (rxp, rxb) = (rx_wire_packets.clone(), rx_wire_bytes.clone()); + // Per-packet wire size to express delivered bytes as link bytes (header + shard + crypto); + // every shard is zero-padded to shard_payload so all data packets are this exact size. + let crypto_overhead = if welcome.encrypt { + punktfunk_core::packet::CRYPTO_OVERHEAD as u64 + } else { + 0 + }; tokio::spawn(async move { use std::sync::atomic::Ordering::Relaxed; tokio::time::sleep(std::time::Duration::from_secs(2)).await; // let the stream warm up + // Baseline the packet-level counters right before the burst (video is paused during it, + // so the delta is pure probe traffic plus a sliver of resumed video in the settle). + let base_pkts = rxp.load(Relaxed); + let base_bytes = rxb.load(Relaxed); tracing::info!(target_kbps, duration_ms, "requesting speed-test probe"); if io::write_msg( &mut ss, @@ -505,34 +509,42 @@ async fn session(args: Args) -> Result<()> { return; } }; - // The reliable result can beat the last UDP shards — let them reassemble. - tokio::time::sleep(std::time::Duration::from_millis(400)).await; - let recv_bytes = pb.load(Relaxed); - let recv_packets = pp.load(Relaxed); - let (first, last) = (pf.load(Relaxed), pl.load(Relaxed)); - let window_ms = if first > 0 && last > first { - (last - first) / 1_000_000 + // The reliable result can beat the last UDP shards — let the tail arrive before reading. + // Keep this short: video resumes the instant the burst ends, so a long settle counts + // resumed-video packets against the probe (inflating recv past the host's wire count). + tokio::time::sleep(std::time::Duration::from_millis(60)).await; + let recv_packets = rxp.load(Relaxed).saturating_sub(base_pkts); + // bytes_received counts plaintext (header + shard); add per-packet crypto back for the + // true on-wire byte count. + let recv_wire_bytes = + rxb.load(Relaxed).saturating_sub(base_bytes) + recv_packets * crypto_overhead; + // The host's burst duration is the rate denominator (it sent for this long). + let window_ms = res.duration_ms.max(1) as u64; + let throughput_kbps = recv_wire_bytes.saturating_mul(8) / window_ms; + // Link loss: wire packets the host put out that didn't arrive. host_drop: wire packets + // the host couldn't even hand to the kernel (send buffer too small / can't keep up). + let link_loss = if res.wire_packets_sent > 0 { + (res.wire_packets_sent as i64 - recv_packets as i64).max(0) as f64 + / res.wire_packets_sent as f64 + * 100.0 } else { - 0 + 0.0 }; - let throughput_kbps = recv_bytes - .saturating_mul(8) - .checked_div(window_ms) - .unwrap_or(0); - let loss_pct = if res.bytes_sent > 0 { - res.bytes_sent.saturating_sub(recv_bytes) as f64 / res.bytes_sent as f64 * 100.0 + let offered_wire = res.wire_packets_sent + res.send_dropped; + let host_drop = if offered_wire > 0 { + res.send_dropped as f64 / offered_wire as f64 * 100.0 } else { 0.0 }; tracing::info!( target_kbps, - host_sent_bytes = res.bytes_sent, - host_sent_packets = res.packets_sent, - recv_bytes, - recv_packets, - window_ms, - throughput_kbps, - loss_pct = format!("{loss_pct:.1}%"), + target_mbps = target_kbps / 1000, + delivered_mbps = throughput_kbps / 1000, + link_loss_pct = format!("{link_loss:.1}%"), + host_drop_pct = format!("{host_drop:.1}%"), + wire_pkts_sent = res.wire_packets_sent, + wire_pkts_recv = recv_packets, + send_dropped = res.send_dropped, "SPEED TEST complete", ); }); @@ -810,12 +822,7 @@ async fn session(args: Args) -> Result<()> { let cfg = welcome.session_config(Role::Client); let expected = welcome.frames; let out_path = args.out.clone(); - let (pb, pp, pf, pl) = ( - probe_recv_bytes.clone(), - probe_recv_packets.clone(), - probe_first_ns.clone(), - probe_last_ns.clone(), - ); + let (rxp_dt, rxb_dt) = (rx_wire_packets.clone(), rx_wire_bytes.clone()); // Express our receive time in the host clock before differencing against the host-stamped // capture pts. 0 ⇒ same-host or an old host that didn't answer the skew handshake (the latency @@ -851,6 +858,13 @@ async fn session(args: Args) -> Result<()> { let mut last_rx = std::time::Instant::now(); let started = std::time::Instant::now(); loop { + // Mirror packet-level receive counters for the speed-test reporter (reads their delta). + { + use std::sync::atomic::Ordering::Relaxed; + let s = session.stats(); + rxp_dt.store(s.packets_received, Relaxed); + rxb_dt.store(s.bytes_received, Relaxed); + } if expected > 0 && ok + mismatched >= expected { break; } @@ -867,15 +881,9 @@ async fn session(args: Args) -> Result<()> { match session.poll_frame() { Ok(frame) => { last_rx = std::time::Instant::now(); - // Speed-test filler isn't video: fold it into the probe accumulators and skip - // verification / the --out sink. + // Speed-test filler isn't video: it's measured via the packet-level counters + // mirrored at the loop head — skip verification / the --out sink. if frame.flags & FLAG_PROBE as u32 != 0 { - use std::sync::atomic::Ordering::Relaxed; - let n = now_ns(); - let _ = pf.compare_exchange(0, n, Relaxed, Relaxed); - pl.store(n, Relaxed); - pb.fetch_add(frame.data.len() as u64, Relaxed); - pp.fetch_add(1, Relaxed); continue; } bytes += frame.data.len() as u64; diff --git a/crates/punktfunk-core/src/abi.rs b/crates/punktfunk-core/src/abi.rs index a2592bb..f2bbbf9 100644 --- a/crates/punktfunk-core/src/abi.rs +++ b/crates/punktfunk-core/src/abi.rs @@ -1525,24 +1525,31 @@ pub unsafe extern "C" fn punktfunk_connection_frames_dropped( /// A speed-test measurement, filled by [`punktfunk_connection_probe_result`]. `done` is 0 until /// the host's end-of-burst report lands, then 1 (the numbers are final). `throughput_kbps` is the -/// measured goodput to drive a bitrate choice from; `loss_pct` is the delivery loss at that rate. +/// delivered wire throughput to drive a bitrate choice from; `loss_pct` is the link loss and +/// `host_drop_pct` the host-side send-buffer drop (raise `net.core.wmem_max`) — they're measured +/// separately so a host that can't keep up reads differently from a lossy link. #[repr(C)] #[derive(Clone, Copy, Debug, Default)] pub struct PunktfunkProbeResult { /// 1 once the host's end-of-burst report arrived (measurement final); else 0 (partial). pub done: u8, - /// Probe payload bytes / packets the client received. + /// Delivered wire bytes (header + shard) / packets the client received during the burst. pub recv_bytes: u64, pub recv_packets: u32, - /// Probe payload bytes / packets the host reported sending. + /// Application goodput bytes / access units the host offered. pub host_bytes: u64, pub host_packets: u32, - /// Client-measured receive window (first→last probe AU), milliseconds. + /// The host's measured burst duration, milliseconds (the throughput denominator). pub elapsed_ms: u32, - /// Measured goodput = `recv_bytes * 8 / elapsed_ms` (kilobits/second). + /// Delivered wire throughput = `recv_bytes * 8 / elapsed_ms` (kilobits/second). pub throughput_kbps: u32, - /// Delivery loss `(host_bytes - recv_bytes) / host_bytes` as a percentage (0 if unknown). + /// Link loss `(wire_packets_sent − recv_packets) / wire_packets_sent` as a percentage. pub loss_pct: f32, + /// Host-side send-buffer drop `send_dropped / (wire_packets_sent + send_dropped)`, percent. + pub host_drop_pct: f32, + /// Wire packets the host put on the link, and the ones its send buffer dropped (raw counts). + pub wire_packets_sent: u32, + pub send_dropped: u32, } /// Start a bandwidth speed test: ask the host to burst filler over the data plane at @@ -1602,6 +1609,9 @@ pub unsafe extern "C" fn punktfunk_connection_probe_result( elapsed_ms: o.elapsed_ms, throughput_kbps: o.throughput_kbps, loss_pct: o.loss_pct, + host_drop_pct: o.host_drop_pct, + wire_packets_sent: o.wire_packets_sent, + send_dropped: o.send_dropped, }; } PunktfunkStatus::Ok diff --git a/crates/punktfunk-core/src/client.rs b/crates/punktfunk-core/src/client.rs index 1f02969..0fee134 100644 --- a/crates/punktfunk-core/src/client.rs +++ b/crates/punktfunk-core/src/client.rs @@ -24,7 +24,7 @@ use crate::transport::UdpTransport; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender}; use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; +use std::time::Duration; /// A control-stream request the embedder makes on the open handshake stream: a mode switch or a /// speed test. One outbound channel carries both so the worker's `select!` has a single writer @@ -41,22 +41,35 @@ enum CtrlRequest { /// (ns, host minus client; 0 = no skew correction / an old host that didn't answer the handshake). type Negotiated = (Mode, CompositorPref, GamepadPref, [u8; 32], u32, i64); -/// Accumulated state of an in-flight / finished speed test. The data-plane pump folds each -/// received [`FLAG_PROBE`] access unit in; the control task records the host's [`ProbeResult`] -/// when it lands. Read (and finalized into numbers) by [`NativeClient::probe_result`]. +/// Accumulated state of an in-flight / finished speed test. The data-plane pump mirrors the +/// session's packet-level receive counters here; the control task finalizes the delivered figure +/// and folds in the host's [`ProbeResult`] when it lands. Read by [`NativeClient::probe_result`]. +/// +/// Counting at the *packet* level (every delivered wire packet) — not whole reassembled probe AUs — +/// is what makes the measurement degrade gracefully: once loss exceeds the FEC budget no AU +/// completes, so the old AU-based count cliffed to zero even though most bytes still arrived. #[derive(Default)] struct ProbeState { /// A probe is in progress (set by `request_probe`, cleared by nothing — the latest one wins). active: bool, - /// Probe access-unit payload bytes the client received, and their count. - recv_bytes: u64, - recv_packets: u32, - /// First/last probe AU arrival — the measured receive window. - start: Option, - last: Option, - /// The host's report ([`ProbeResult`]); present once the burst finished. - host_bytes: u64, - host_packets: u32, + /// `session.stats()` receive counters at the burst's start (snapshotted by the pump on its first + /// tick while active) and latest, mirrored every pump iteration. + base_packets: Option, + base_bytes: Option, + rx_packets_now: u64, + rx_bytes_now: u64, + /// Delivered wire packets / plaintext bytes (header + shard), frozen when the host's report lands + /// (so resumed video after the burst can't inflate them). + delivered_packets: u64, + delivered_bytes: u64, + /// The host's end-of-burst report. + host_goodput_bytes: u64, + host_au: u32, + /// Wire packets the host actually put on the link, and the ones its send buffer dropped. + host_wire_packets: u32, + host_send_dropped: u32, + /// The host's measured burst duration (the throughput denominator). + host_duration_ms: u32, /// The host's `ProbeResult` arrived → the measurement is final. done: bool, } @@ -66,19 +79,27 @@ struct ProbeState { pub struct ProbeOutcome { /// The host's end-of-burst report has arrived — the numbers below are final. pub done: bool, - /// Probe payload bytes / packets the client received. + /// Delivered wire bytes (header + shard) / packets the client received during the burst. pub recv_bytes: u64, pub recv_packets: u32, - /// Probe payload bytes / packets the host reported sending. + /// Application goodput bytes / access units the host offered. pub host_bytes: u64, pub host_packets: u32, - /// The client-measured receive window (first→last probe AU), in milliseconds. + /// The burst duration the host measured, in milliseconds (the throughput denominator). pub elapsed_ms: u32, - /// Measured goodput = `recv_bytes * 8 / elapsed_ms` (kilobits/second). This is the figure to - /// drive a [`Hello::bitrate_kbps`] choice from. + /// Delivered wire throughput = `recv_bytes * 8 / elapsed_ms` (kilobits/second). The figure to + /// drive a [`Hello::bitrate_kbps`] choice from (allow headroom for the FEC overhead + loss). pub throughput_kbps: u32, - /// Delivery loss = `(host_bytes - recv_bytes) / host_bytes`, as a percentage (0 if unknown). + /// Link loss = `(wire_packets_sent − received) / wire_packets_sent`, percent. Packets the host + /// put on the wire that didn't arrive. pub loss_pct: f32, + /// Host-side drop = `send_dropped / (wire_packets_sent + send_dropped)`, percent. Packets the + /// host's send buffer couldn't accept (raise `net.core.wmem_max` / lower the rate). Distinct + /// from `loss_pct`: this is the host failing to keep up, not the link dropping traffic. + pub host_drop_pct: f32, + /// Wire packets the host put on the link and the ones its send buffer dropped (raw counts). + pub wire_packets_sent: u32, + pub send_dropped: u32, } /// Frames buffered between the data-plane pump and the embedder. Small: the embedder @@ -458,30 +479,52 @@ impl NativeClient { /// end-of-burst report lands). Derives goodput + loss from the accumulated probe bytes. pub fn probe_result(&self) -> ProbeOutcome { let p = self.probe.lock().unwrap(); - let elapsed_ms = match (p.start, p.last) { - (Some(s), Some(l)) => l.duration_since(s).as_millis() as u32, - _ => 0, + // Delivered figures: live (rx_now − base) while the burst runs, frozen at the host's report. + let (delivered_packets, delivered_bytes) = if p.done { + (p.delivered_packets, p.delivered_bytes) + } else { + let base_p = p.base_packets.unwrap_or(p.rx_packets_now); + let base_b = p.base_bytes.unwrap_or(p.rx_bytes_now); + ( + p.rx_packets_now.saturating_sub(base_p), + p.rx_bytes_now.saturating_sub(base_b), + ) }; - // bytes × 8 / ms = kilobits/second. - let throughput_kbps = if elapsed_ms > 0 { - (p.recv_bytes.saturating_mul(8) / elapsed_ms as u64) as u32 + // The host's burst duration is the throughput denominator. bytes × 8 / ms = kilobits/second. + let window_ms = p.host_duration_ms; + let throughput_kbps = if window_ms > 0 { + (delivered_bytes.saturating_mul(8) / window_ms as u64) as u32 } else { 0 }; - let loss_pct = if p.host_bytes > 0 { - p.host_bytes.saturating_sub(p.recv_bytes) as f64 / p.host_bytes as f64 * 100.0 + // Link loss: wire packets the host put out that didn't arrive. Packet-level, so it degrades + // smoothly past the FEC budget instead of cliffing to 100% the moment AUs stop completing. + let loss_pct = if p.host_wire_packets > 0 { + (p.host_wire_packets as i64 - delivered_packets as i64).max(0) as f64 + / p.host_wire_packets as f64 + * 100.0 + } else { + 0.0 + } as f32; + // Host-side drop: what the send buffer couldn't even accept (the host-side ceiling). + let offered_wire = p.host_wire_packets + p.host_send_dropped; + let host_drop_pct = if offered_wire > 0 { + p.host_send_dropped as f64 / offered_wire as f64 * 100.0 } else { 0.0 } as f32; ProbeOutcome { done: p.done, - recv_bytes: p.recv_bytes, - recv_packets: p.recv_packets, - host_bytes: p.host_bytes, - host_packets: p.host_packets, - elapsed_ms, + recv_bytes: delivered_bytes, + recv_packets: delivered_packets as u32, + host_bytes: p.host_goodput_bytes, + host_packets: p.host_au, + elapsed_ms: window_ms, throughput_kbps, loss_pct, + host_drop_pct, + wire_packets_sent: p.host_wire_packets, + send_dropped: p.host_send_dropped, } } @@ -824,13 +867,24 @@ async fn worker_main(args: WorkerArgs) { } } else if let Ok(result) = ProbeResult::decode(&msg) { let mut p = probe.lock().unwrap(); - p.host_bytes = result.bytes_sent; - p.host_packets = result.packets_sent; + // Freeze the delivered figures now (the burst is done), before resumed + // video can inflate the packet counters. + let base_p = p.base_packets.unwrap_or(p.rx_packets_now); + let base_b = p.base_bytes.unwrap_or(p.rx_bytes_now); + p.delivered_packets = p.rx_packets_now.saturating_sub(base_p); + p.delivered_bytes = p.rx_bytes_now.saturating_sub(base_b); + p.host_goodput_bytes = result.bytes_sent; + p.host_au = result.packets_sent; + p.host_wire_packets = result.wire_packets_sent; + p.host_send_dropped = result.send_dropped; + p.host_duration_ms = result.duration_ms; p.done = true; tracing::info!( - bytes_sent = result.bytes_sent, - packets_sent = result.packets_sent, + host_goodput_bytes = result.bytes_sent, + wire_packets_sent = result.wire_packets_sent, + send_dropped = result.send_dropped, duration_ms = result.duration_ms, + delivered_packets = p.delivered_packets, "speed-test probe result" ); } else { @@ -892,21 +946,24 @@ async fn worker_main(args: WorkerArgs) { pin_thread_user_interactive(); // feeds frame_tx → the client's user-interactive video pump while !pump_shutdown.load(Ordering::SeqCst) { // Mirror the reassembler's unrecoverable-drop count for the client's keyframe-recovery - // loop. Updated every iteration (not just on a produced frame) so it stays current through - // a total-loss drought where no AU completes. Cheap: a few relaxed atomic loads. - frames_dropped.store(session.stats().frames_dropped, Ordering::Relaxed); + // loop, and (during a speed test) the packet-level receive counters for the throughput + // measurement. Updated every iteration (not just on a produced frame) so they stay current + // through a total-loss drought where no AU completes. Cheap: a few relaxed atomic loads. + let st = session.stats(); + frames_dropped.store(st.frames_dropped, Ordering::Relaxed); + { + let mut p = pump_probe.lock().unwrap(); + if p.active && !p.done { + p.rx_packets_now = st.packets_received; + p.rx_bytes_now = st.bytes_received; + p.base_packets.get_or_insert(st.packets_received); + p.base_bytes.get_or_insert(st.bytes_received); + } + } match session.poll_frame() { Ok(frame) => { if frame.flags & FLAG_PROBE as u32 != 0 { - let mut p = pump_probe.lock().unwrap(); - if p.active { - let now = Instant::now(); - p.start.get_or_insert(now); - p.last = Some(now); - p.recv_bytes += frame.data.len() as u64; - p.recv_packets += 1; - } - continue; // not video — never enqueue for the decoder + continue; // speed-test filler, not video — measured via the counters above } let _ = frame_tx.try_send(frame); } diff --git a/crates/punktfunk-core/src/quic.rs b/crates/punktfunk-core/src/quic.rs index 8fcf477..e2bb38a 100644 --- a/crates/punktfunk-core/src/quic.rs +++ b/crates/punktfunk-core/src/quic.rs @@ -181,17 +181,30 @@ pub struct ProbeRequest { pub duration_ms: u32, } -/// `host → client`: the probe burst is finished. Reports what the host actually sent so the -/// client can compute delivery ratio (loss) = `received / bytes_sent` and throughput = -/// `received_bytes * 8 / elapsed`. +/// `host → client`: the probe burst is finished. Reports what the host actually put on the wire so +/// the client can split the two failure modes apart: **host-side** drops (the send buffer couldn't +/// keep up — raise `net.core.wmem_max`) vs **link** loss (wire packets the air dropped). The client +/// measures delivered wire packets itself and computes: +/// +/// - link loss = `(wire_packets_sent − received) / wire_packets_sent` +/// - host drop = `send_dropped / (wire_packets_sent + send_dropped)` +/// - throughput = `received_wire_bytes * 8 / duration_ms` +/// +/// Counting delivered traffic at the *packet* level (not whole reassembled AUs) makes the figure +/// degrade gracefully past the FEC budget instead of cliffing to zero. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct ProbeResult { - /// Total access-unit payload bytes the host emitted for the probe. + /// Total access-unit payload bytes the host emitted for the probe (application goodput offered). pub bytes_sent: u64, /// Number of probe access units the host emitted. pub packets_sent: u32, /// The burst's actual duration in milliseconds (the host clamps/measures the request). pub duration_ms: u32, + /// Wire packets the kernel ACCEPTED for transmission — what actually went on the link (offered + /// minus the send-buffer drops below). `0` from a pre-wire-stats host (back-compat decode). + pub wire_packets_sent: u32, + /// Wire packets the host could NOT hand to the kernel (send buffer full): the host-side ceiling. + pub send_dropped: u32, } /// `client → host`, right after [`Start`]: one round of the wall-clock skew handshake. The client @@ -834,23 +847,36 @@ impl ProbeRequest { impl ProbeResult { pub fn encode(&self) -> Vec { // magic[0..4] type[4] bytes_sent[5..13] packets_sent[13..17] duration_ms[17..21] - let mut b = Vec::with_capacity(21); + // wire_packets_sent[21..25] send_dropped[25..29] + let mut b = Vec::with_capacity(29); b.extend_from_slice(CTL_MAGIC); b.push(MSG_PROBE_RESULT); b.extend_from_slice(&self.bytes_sent.to_le_bytes()); b.extend_from_slice(&self.packets_sent.to_le_bytes()); b.extend_from_slice(&self.duration_ms.to_le_bytes()); + b.extend_from_slice(&self.wire_packets_sent.to_le_bytes()); + b.extend_from_slice(&self.send_dropped.to_le_bytes()); b } pub fn decode(b: &[u8]) -> Result { - if b.len() != 21 || &b[0..4] != CTL_MAGIC || b[4] != MSG_PROBE_RESULT { + // Back-compat: 21 bytes (pre-wire-stats host, new fields default 0) or 29 bytes (with the + // wire_packets_sent + send_dropped tail). Accept either; reject anything shorter/garbled. + if b.len() < 21 || &b[0..4] != CTL_MAGIC || b[4] != MSG_PROBE_RESULT { return Err(PunktfunkError::InvalidArg("bad ProbeResult")); } + let u32at = |o: usize| u32::from_le_bytes([b[o], b[o + 1], b[o + 2], b[o + 3]]); + let (wire_packets_sent, send_dropped) = if b.len() >= 29 { + (u32at(21), u32at(25)) + } else { + (0, 0) + }; Ok(ProbeResult { bytes_sent: u64::from_le_bytes(b[5..13].try_into().unwrap()), - packets_sent: u32::from_le_bytes(b[13..17].try_into().unwrap()), - duration_ms: u32::from_le_bytes(b[17..21].try_into().unwrap()), + packets_sent: u32at(13), + duration_ms: u32at(17), + wire_packets_sent, + send_dropped, }) } } @@ -1862,8 +1888,20 @@ mod tests { bytes_sent: 62_500_000, packets_sent: 480, duration_ms: 2003, + wire_packets_sent: 41_000, + send_dropped: 1_200, }; assert_eq!(ProbeResult::decode(&res.encode()).unwrap(), res); + assert_eq!(res.encode().len(), 29); + // A pre-wire-stats host's 21-byte ProbeResult still decodes, with the new fields zeroed. + let legacy = { + let full = res.encode(); + full[..21].to_vec() + }; + let decoded = ProbeResult::decode(&legacy).unwrap(); + assert_eq!(decoded.wire_packets_sent, 0); + assert_eq!(decoded.send_dropped, 0); + assert_eq!(decoded.bytes_sent, res.bytes_sent); // Type bytes keep the control messages disjoint from each other. assert!(ProbeRequest::decode(&res.encode()).is_err()); assert!(Reconfigure::decode(&req.encode()).is_err()); diff --git a/crates/punktfunk-host/src/punktfunk1.rs b/crates/punktfunk-host/src/punktfunk1.rs index 9ad5f3d..2eceebe 100644 --- a/crates/punktfunk-host/src/punktfunk1.rs +++ b/crates/punktfunk-host/src/punktfunk1.rs @@ -1667,26 +1667,35 @@ fn run_probe_burst(session: &mut Session, req: ProbeRequest, stop: &AtomicBool) bytes_sent: 0, packets_sent: 0, duration_ms: 0, + wire_packets_sent: 0, + send_dropped: 0, }; } // kbps -> bytes/s (x1000/8). let bytes_per_sec = target_kbps as u64 * 125; - // ~240 AUs/s for smooth pacing, each capped so one submit_frame stays a bounded burst (a large - // AU fragments into many UDP shards via sendmmsg). - let chunk = (bytes_per_sec / 240).clamp(1200, 256 * 1024) as usize; + // Keep each AU a SMALL burst (~16 KB ≈ a dozen MTU shards) and let the byte budget below pace + // the rate finely. The old 256 KB cap blasted ~200 packets into the send buffer per submit, so + // a small buffer (e.g. the Deck's 416 KB) overflowed on a single AU and the test measured + // self-inflicted buffer overflow instead of the link — mirror how `paced_submit` spreads the + // real video path's frames so the probe stresses the same way a real stream does. + let chunk = (bytes_per_sec / 240).clamp(1200, 16 * 1024) as usize; let filler = vec![0u8; chunk]; - // Host send-buffer drops over the burst — at high target rates this is where the native - // single-send()-per-packet path first loses, so report it alongside what we offered. - let send_dropped0 = session.stats().packets_send_dropped; + // Wire-packet accounting via session-stat deltas: `packets_sent` counts every sealed wire packet + // (seal_frame), `packets_send_dropped` every one the send buffer rejected (WouldBlock/ENOBUFS). + // Their delta over the burst is exact — and isolates host-side drops from link loss for the + // client. Video is paused for the burst (the data-plane loop is blocked here), so these deltas + // are pure probe traffic. + let wire0 = session.stats().packets_sent; + let drop0 = session.stats().packets_send_dropped; let start = std::time::Instant::now(); let deadline = start + std::time::Duration::from_millis(duration_ms as u64); let mut bytes_sent = 0u64; - let mut packets_sent = 0u32; + let mut packets_sent = 0u32; // probe access-unit count (goodput chunks) while std::time::Instant::now() < deadline && !stop.load(Ordering::SeqCst) { let allowed = (start.elapsed().as_secs_f64() * bytes_per_sec as f64) as u64; if bytes_sent < allowed { - // A full send buffer drops on WouldBlock (UdpTransport returns Ok) — that loss is part - // of what the probe measures, so count what we offered and keep going. + // A full send buffer drops on WouldBlock/ENOBUFS (UdpTransport returns Ok) — that loss is + // part of what the probe measures (it surfaces as send_dropped), so keep going. let _ = session.submit_frame(&filler, now_ns(), FLAG_PROBE as u32); bytes_sent += chunk as u64; packets_sent += 1; @@ -1695,12 +1704,16 @@ fn run_probe_burst(session: &mut Session, req: ProbeRequest, stop: &AtomicBool) } } let actual_ms = start.elapsed().as_millis() as u32; - let send_dropped = session.stats().packets_send_dropped - send_dropped0; + let wire_offered = (session.stats().packets_sent - wire0) as u32; + let send_dropped = (session.stats().packets_send_dropped - drop0) as u32; + let wire_packets_sent = wire_offered.saturating_sub(send_dropped); tracing::info!( target_kbps, duration_ms = actual_ms, bytes_sent, - packets_sent, + au_count = packets_sent, + wire_offered, + wire_packets_sent, send_dropped, "speed-test probe burst complete" ); @@ -1708,6 +1721,8 @@ fn run_probe_burst(session: &mut Session, req: ProbeRequest, stop: &AtomicBool) bytes_sent, packets_sent, duration_ms: actual_ms, + wire_packets_sent, + send_dropped, } } diff --git a/include/punktfunk_core.h b/include/punktfunk_core.h index e4d0e41..fe81183 100644 --- a/include/punktfunk_core.h +++ b/include/punktfunk_core.h @@ -468,22 +468,29 @@ typedef struct { // A speed-test measurement, filled by [`punktfunk_connection_probe_result`]. `done` is 0 until // the host's end-of-burst report lands, then 1 (the numbers are final). `throughput_kbps` is the -// measured goodput to drive a bitrate choice from; `loss_pct` is the delivery loss at that rate. +// delivered wire throughput to drive a bitrate choice from; `loss_pct` is the link loss and +// `host_drop_pct` the host-side send-buffer drop (raise `net.core.wmem_max`) — they're measured +// separately so a host that can't keep up reads differently from a lossy link. typedef struct { // 1 once the host's end-of-burst report arrived (measurement final); else 0 (partial). uint8_t done; - // Probe payload bytes / packets the client received. + // Delivered wire bytes (header + shard) / packets the client received during the burst. uint64_t recv_bytes; uint32_t recv_packets; - // Probe payload bytes / packets the host reported sending. + // Application goodput bytes / access units the host offered. uint64_t host_bytes; uint32_t host_packets; - // Client-measured receive window (first→last probe AU), milliseconds. + // The host's measured burst duration, milliseconds (the throughput denominator). uint32_t elapsed_ms; - // Measured goodput = `recv_bytes * 8 / elapsed_ms` (kilobits/second). + // Delivered wire throughput = `recv_bytes * 8 / elapsed_ms` (kilobits/second). uint32_t throughput_kbps; - // Delivery loss `(host_bytes - recv_bytes) / host_bytes` as a percentage (0 if unknown). + // Link loss `(wire_packets_sent − recv_packets) / wire_packets_sent` as a percentage. float loss_pct; + // Host-side send-buffer drop `send_dropped / (wire_packets_sent + send_dropped)`, percent. + float host_drop_pct; + // Wire packets the host put on the link, and the ones its send buffer dropped (raw counts). + uint32_t wire_packets_sent; + uint32_t send_dropped; } PunktfunkProbeResult; #ifdef __cplusplus