feat(host,probe): controlled loss injection for the native path + probe keyframe-on-drop

PUNKTFUNK_VIDEO_DROP now also covers the native data plane (N% of
sealed wire packets discarded before send in paced_submit — the same
FEC-test knob the GameStream path has; no netem/root needed), and the
probe grows the real clients' recovery trigger: the data loop publishes
the session's unrecoverable-frame count and the control task sends
RequestKeyframe when it rises (100ms poll = natural coalescing).
Together these make the IDR-vs-intra-refresh recovery A/B runnable
against any host.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-07-02 14:12:23 +00:00
parent fa4c798a25
commit 49e6021ece
3 changed files with 62 additions and 11 deletions
+35 -9
View File
@@ -52,7 +52,7 @@ use punktfunk_core::input::{InputEvent, InputKind};
use punktfunk_core::packet::FLAG_PROBE;
use punktfunk_core::quic::{
endpoint, io, window_loss_ppm, Hello, LossReport, ProbeRequest, ProbeResult, Reconfigure,
Reconfigured, Start, Welcome,
Reconfigured, RequestKeyframe, Start, Welcome,
};
use punktfunk_core::transport::UdpTransport;
use punktfunk_core::{CompositorPref, Mode, PunktfunkError, Session};
@@ -505,6 +505,11 @@ async fn session(args: Args) -> Result<()> {
// stream mode (no speed test / remode) a control-stream task relays it to the host as a
// LossReport so it can size FEC to the link. u32::MAX = "no fresh sample this window".
let loss_ppm = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(u32::MAX));
// Decode-recovery feedback, mirroring the real clients: the data loop publishes the session's
// cumulative unrecoverable-frame count; the control task requests a keyframe when it grows
// (the correct loss trigger under infinite GOP — see NativeClient::frames_dropped). Lets the
// probe exercise the host's IDR-vs-intra-refresh recovery path under injected loss.
let dropped_frames = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
// Mid-stream renegotiation test: after a delay, ask the host to switch modes on the
// still-open control stream. The stream then carries new-mode AUs (IDR + in-band
@@ -617,21 +622,40 @@ async fn session(args: Args) -> Result<()> {
});
} else {
// Normal stream mode: relay the data loop's windowed loss estimate to the host as periodic
// LossReports, so it can size FEC to the link (adaptive FEC). The control stream is otherwise
// idle here (remode/speed-test own it in their modes).
// LossReports, so it can size FEC to the link (adaptive FEC), and — like the real clients —
// request a keyframe whenever the unrecoverable-frame count grows (100 ms poll = a natural
// throttle; several drops in a burst coalesce into one request). The control stream is
// otherwise idle here (remode/speed-test own it in their modes).
let mut ls = send;
let lp = loss_ppm.clone();
let df = dropped_frames.clone();
tokio::spawn(async move {
use std::sync::atomic::Ordering::Relaxed;
let mut last_report = std::time::Instant::now();
let mut last_dropped = 0u64;
loop {
tokio::time::sleep(std::time::Duration::from_millis(750)).await;
let v = lp.swap(u32::MAX, Relaxed);
if v != u32::MAX
&& io::write_msg(&mut ls, &LossReport { loss_ppm: v }.encode())
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let d = df.load(Relaxed);
if d > last_dropped {
last_dropped = d;
if io::write_msg(&mut ls, &RequestKeyframe.encode())
.await
.is_err()
{
break; // control stream gone
{
break; // control stream gone
}
tracing::debug!(dropped = d, "unrecoverable frame — requested keyframe");
}
if last_report.elapsed() >= std::time::Duration::from_millis(750) {
last_report = std::time::Instant::now();
let v = lp.swap(u32::MAX, Relaxed);
if v != u32::MAX
&& io::write_msg(&mut ls, &LossReport { loss_ppm: v }.encode())
.await
.is_err()
{
break; // control stream gone
}
}
}
});
@@ -943,6 +967,7 @@ async fn session(args: Args) -> Result<()> {
let out_path = args.out.clone();
let (rxp_dt, rxb_dt) = (rx_wire_packets.clone(), rx_wire_bytes.clone());
let lp_dt = loss_ppm.clone();
let df_dt = dropped_frames.clone();
// Express our receive time in the host clock before differencing against the host-stamped
// capture pts. 0 ⇒ same-host or an old host that didn't answer the skew handshake (the latency
@@ -988,6 +1013,7 @@ async fn session(args: Args) -> Result<()> {
let s = session.stats();
rxp_dt.store(s.packets_received, Relaxed);
rxb_dt.store(s.bytes_received, Relaxed);
df_dt.store(s.frames_dropped, Relaxed);
if last_loss_report.elapsed() >= std::time::Duration::from_millis(750) {
lp_dt.store(
window_loss_ppm(