fix(core/speed-test): packet-level throughput + paced burst (kill the 0/100% cliff)
The punktfunk/1 speed test was unusable across every client/host: at the start of a burst a little data got through, then everything read as dropped (~10 MB total). Two compounding bugs: 1. Receive side measured throughput from fully-reassembled FLAG_PROBE *access units* only. The instant loss crossed the 20% FEC budget no AU completed, so the figure cliffed to 0 / 100% loss even though most bytes still arrived — a binary cliff, not a graded measurement. 2. Send side blasted each filler AU (up to 256 KB ≈ 200 packets) into the socket buffer in one unpaced batch, unlike the real video path which paces. On a small buffer (e.g. the Steam Deck's 416 KB) a single AU overflowed it, so the test measured self-inflicted buffer overflow instead of the link. Fixes: - Host `run_probe_burst` keeps each AU a small (~16 KB) burst and paces by the byte budget, mirroring `paced_submit`; reports the WIRE packets the kernel accepted and the ones the send buffer dropped (stat deltas), separating host-side drops from link loss. - `ProbeResult` gains `wire_packets_sent` + `send_dropped` (back-compat decode: a 21-byte pre-wire-stats result still decodes, new fields 0). - Clients (probe + connector) count delivered traffic at the packet level via `session.stats()` deltas over the burst window, so throughput/loss degrade gracefully. Connector freezes the delivered figure when the host report lands so resumed video can't inflate it. New `ProbeOutcome`/`PunktfunkProbeResult` fields: `host_drop_pct`, `wire_packets_sent`, `send_dropped`. Validated on loopback (graded 142→1391 Mbps, host_drop/link_loss split correctly, no cliff) and live against the Deck: clean to ~200 Mbps goodput / 273 Mbps wire at 0% link loss, host send buffer the wall above that (the lever-#1 target). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+58
-50
@@ -433,13 +433,11 @@ async fn session(args: Args) -> Result<()> {
|
||||
None => None,
|
||||
};
|
||||
|
||||
// Speed-test accumulators: the data-plane loop folds each FLAG_PROBE filler AU in here; the
|
||||
// --speed-test reporter below reads them once the host's ProbeResult lands. first/last hold
|
||||
// now_ns timestamps of the receive window (0 = unset).
|
||||
let probe_recv_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||
let probe_recv_packets = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||
let probe_first_ns = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||
let probe_last_ns = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||
// Packet-level receive counters mirrored from `session.stats()` by the data-plane loop. The
|
||||
// speed test reads their delta over the burst window so throughput/loss reflect every delivered
|
||||
// wire packet (graceful past the FEC budget), not just fully-reassembled probe AUs.
|
||||
let rx_wire_packets = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||
let rx_wire_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||
|
||||
// Mid-stream renegotiation test: after a delay, ask the host to switch modes on the
|
||||
// still-open control stream. The stream then carries new-mode AUs (IDR + in-band
|
||||
@@ -470,19 +468,25 @@ async fn session(args: Args) -> Result<()> {
|
||||
}
|
||||
});
|
||||
} else if let Some((target_kbps, duration_ms)) = args.speed_test {
|
||||
// Bandwidth probe: after the stream warms up, ask the host to burst FLAG_PROBE filler;
|
||||
// measure what arrives vs. what it reports sending.
|
||||
// Bandwidth probe: after the stream warms up, ask the host to burst FLAG_PROBE filler; measure
|
||||
// delivered WIRE packets (session-stat delta) vs. what the host reports putting on the wire.
|
||||
let mut ss = send;
|
||||
let mut sr = recv;
|
||||
let (pb, pp, pf, pl) = (
|
||||
probe_recv_bytes.clone(),
|
||||
probe_recv_packets.clone(),
|
||||
probe_first_ns.clone(),
|
||||
probe_last_ns.clone(),
|
||||
);
|
||||
let (rxp, rxb) = (rx_wire_packets.clone(), rx_wire_bytes.clone());
|
||||
// Per-packet wire size to express delivered bytes as link bytes (header + shard + crypto);
|
||||
// every shard is zero-padded to shard_payload so all data packets are this exact size.
|
||||
let crypto_overhead = if welcome.encrypt {
|
||||
punktfunk_core::packet::CRYPTO_OVERHEAD as u64
|
||||
} else {
|
||||
0
|
||||
};
|
||||
tokio::spawn(async move {
|
||||
use std::sync::atomic::Ordering::Relaxed;
|
||||
tokio::time::sleep(std::time::Duration::from_secs(2)).await; // let the stream warm up
|
||||
// Baseline the packet-level counters right before the burst (video is paused during it,
|
||||
// so the delta is pure probe traffic plus a sliver of resumed video in the settle).
|
||||
let base_pkts = rxp.load(Relaxed);
|
||||
let base_bytes = rxb.load(Relaxed);
|
||||
tracing::info!(target_kbps, duration_ms, "requesting speed-test probe");
|
||||
if io::write_msg(
|
||||
&mut ss,
|
||||
@@ -505,34 +509,42 @@ async fn session(args: Args) -> Result<()> {
|
||||
return;
|
||||
}
|
||||
};
|
||||
// The reliable result can beat the last UDP shards — let them reassemble.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(400)).await;
|
||||
let recv_bytes = pb.load(Relaxed);
|
||||
let recv_packets = pp.load(Relaxed);
|
||||
let (first, last) = (pf.load(Relaxed), pl.load(Relaxed));
|
||||
let window_ms = if first > 0 && last > first {
|
||||
(last - first) / 1_000_000
|
||||
// The reliable result can beat the last UDP shards — let the tail arrive before reading.
|
||||
// Keep this short: video resumes the instant the burst ends, so a long settle counts
|
||||
// resumed-video packets against the probe (inflating recv past the host's wire count).
|
||||
tokio::time::sleep(std::time::Duration::from_millis(60)).await;
|
||||
let recv_packets = rxp.load(Relaxed).saturating_sub(base_pkts);
|
||||
// bytes_received counts plaintext (header + shard); add per-packet crypto back for the
|
||||
// true on-wire byte count.
|
||||
let recv_wire_bytes =
|
||||
rxb.load(Relaxed).saturating_sub(base_bytes) + recv_packets * crypto_overhead;
|
||||
// The host's burst duration is the rate denominator (it sent for this long).
|
||||
let window_ms = res.duration_ms.max(1) as u64;
|
||||
let throughput_kbps = recv_wire_bytes.saturating_mul(8) / window_ms;
|
||||
// Link loss: wire packets the host put out that didn't arrive. host_drop: wire packets
|
||||
// the host couldn't even hand to the kernel (send buffer too small / can't keep up).
|
||||
let link_loss = if res.wire_packets_sent > 0 {
|
||||
(res.wire_packets_sent as i64 - recv_packets as i64).max(0) as f64
|
||||
/ res.wire_packets_sent as f64
|
||||
* 100.0
|
||||
} else {
|
||||
0
|
||||
0.0
|
||||
};
|
||||
let throughput_kbps = recv_bytes
|
||||
.saturating_mul(8)
|
||||
.checked_div(window_ms)
|
||||
.unwrap_or(0);
|
||||
let loss_pct = if res.bytes_sent > 0 {
|
||||
res.bytes_sent.saturating_sub(recv_bytes) as f64 / res.bytes_sent as f64 * 100.0
|
||||
let offered_wire = res.wire_packets_sent + res.send_dropped;
|
||||
let host_drop = if offered_wire > 0 {
|
||||
res.send_dropped as f64 / offered_wire as f64 * 100.0
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
tracing::info!(
|
||||
target_kbps,
|
||||
host_sent_bytes = res.bytes_sent,
|
||||
host_sent_packets = res.packets_sent,
|
||||
recv_bytes,
|
||||
recv_packets,
|
||||
window_ms,
|
||||
throughput_kbps,
|
||||
loss_pct = format!("{loss_pct:.1}%"),
|
||||
target_mbps = target_kbps / 1000,
|
||||
delivered_mbps = throughput_kbps / 1000,
|
||||
link_loss_pct = format!("{link_loss:.1}%"),
|
||||
host_drop_pct = format!("{host_drop:.1}%"),
|
||||
wire_pkts_sent = res.wire_packets_sent,
|
||||
wire_pkts_recv = recv_packets,
|
||||
send_dropped = res.send_dropped,
|
||||
"SPEED TEST complete",
|
||||
);
|
||||
});
|
||||
@@ -810,12 +822,7 @@ async fn session(args: Args) -> Result<()> {
|
||||
let cfg = welcome.session_config(Role::Client);
|
||||
let expected = welcome.frames;
|
||||
let out_path = args.out.clone();
|
||||
let (pb, pp, pf, pl) = (
|
||||
probe_recv_bytes.clone(),
|
||||
probe_recv_packets.clone(),
|
||||
probe_first_ns.clone(),
|
||||
probe_last_ns.clone(),
|
||||
);
|
||||
let (rxp_dt, rxb_dt) = (rx_wire_packets.clone(), rx_wire_bytes.clone());
|
||||
|
||||
// Express our receive time in the host clock before differencing against the host-stamped
|
||||
// capture pts. 0 ⇒ same-host or an old host that didn't answer the skew handshake (the latency
|
||||
@@ -851,6 +858,13 @@ async fn session(args: Args) -> Result<()> {
|
||||
let mut last_rx = std::time::Instant::now();
|
||||
let started = std::time::Instant::now();
|
||||
loop {
|
||||
// Mirror packet-level receive counters for the speed-test reporter (reads their delta).
|
||||
{
|
||||
use std::sync::atomic::Ordering::Relaxed;
|
||||
let s = session.stats();
|
||||
rxp_dt.store(s.packets_received, Relaxed);
|
||||
rxb_dt.store(s.bytes_received, Relaxed);
|
||||
}
|
||||
if expected > 0 && ok + mismatched >= expected {
|
||||
break;
|
||||
}
|
||||
@@ -867,15 +881,9 @@ async fn session(args: Args) -> Result<()> {
|
||||
match session.poll_frame() {
|
||||
Ok(frame) => {
|
||||
last_rx = std::time::Instant::now();
|
||||
// Speed-test filler isn't video: fold it into the probe accumulators and skip
|
||||
// verification / the --out sink.
|
||||
// Speed-test filler isn't video: it's measured via the packet-level counters
|
||||
// mirrored at the loop head — skip verification / the --out sink.
|
||||
if frame.flags & FLAG_PROBE as u32 != 0 {
|
||||
use std::sync::atomic::Ordering::Relaxed;
|
||||
let n = now_ns();
|
||||
let _ = pf.compare_exchange(0, n, Relaxed, Relaxed);
|
||||
pl.store(n, Relaxed);
|
||||
pb.fetch_add(frame.data.len() as u64, Relaxed);
|
||||
pp.fetch_add(1, Relaxed);
|
||||
continue;
|
||||
}
|
||||
bytes += frame.data.len() as u64;
|
||||
|
||||
Reference in New Issue
Block a user