feat(host,probe): controlled loss injection for the native path + probe keyframe-on-drop
PUNKTFUNK_VIDEO_DROP now also covers the native data plane (N% of sealed wire packets discarded before send in paced_submit — the same FEC-test knob the GameStream path has; no netem/root needed), and the probe grows the real clients' recovery trigger: the data loop publishes the session's unrecoverable-frame count and the control task sends RequestKeyframe when it rises (100ms poll = natural coalescing). Together these make the IDR-vs-intra-refresh recovery A/B runnable against any host. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -52,7 +52,7 @@ use punktfunk_core::input::{InputEvent, InputKind};
|
||||
use punktfunk_core::packet::FLAG_PROBE;
|
||||
use punktfunk_core::quic::{
|
||||
endpoint, io, window_loss_ppm, Hello, LossReport, ProbeRequest, ProbeResult, Reconfigure,
|
||||
Reconfigured, Start, Welcome,
|
||||
Reconfigured, RequestKeyframe, Start, Welcome,
|
||||
};
|
||||
use punktfunk_core::transport::UdpTransport;
|
||||
use punktfunk_core::{CompositorPref, Mode, PunktfunkError, Session};
|
||||
@@ -505,6 +505,11 @@ async fn session(args: Args) -> Result<()> {
|
||||
// stream mode (no speed test / remode) a control-stream task relays it to the host as a
|
||||
// LossReport so it can size FEC to the link. u32::MAX = "no fresh sample this window".
|
||||
let loss_ppm = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(u32::MAX));
|
||||
// Decode-recovery feedback, mirroring the real clients: the data loop publishes the session's
|
||||
// cumulative unrecoverable-frame count; the control task requests a keyframe when it grows
|
||||
// (the correct loss trigger under infinite GOP — see NativeClient::frames_dropped). Lets the
|
||||
// probe exercise the host's IDR-vs-intra-refresh recovery path under injected loss.
|
||||
let dropped_frames = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||
|
||||
// Mid-stream renegotiation test: after a delay, ask the host to switch modes on the
|
||||
// still-open control stream. The stream then carries new-mode AUs (IDR + in-band
|
||||
@@ -617,21 +622,40 @@ async fn session(args: Args) -> Result<()> {
|
||||
});
|
||||
} else {
|
||||
// Normal stream mode: relay the data loop's windowed loss estimate to the host as periodic
|
||||
// LossReports, so it can size FEC to the link (adaptive FEC). The control stream is otherwise
|
||||
// idle here (remode/speed-test own it in their modes).
|
||||
// LossReports, so it can size FEC to the link (adaptive FEC), and — like the real clients —
|
||||
// request a keyframe whenever the unrecoverable-frame count grows (100 ms poll = a natural
|
||||
// throttle; several drops in a burst coalesce into one request). The control stream is
|
||||
// otherwise idle here (remode/speed-test own it in their modes).
|
||||
let mut ls = send;
|
||||
let lp = loss_ppm.clone();
|
||||
let df = dropped_frames.clone();
|
||||
tokio::spawn(async move {
|
||||
use std::sync::atomic::Ordering::Relaxed;
|
||||
let mut last_report = std::time::Instant::now();
|
||||
let mut last_dropped = 0u64;
|
||||
loop {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(750)).await;
|
||||
let v = lp.swap(u32::MAX, Relaxed);
|
||||
if v != u32::MAX
|
||||
&& io::write_msg(&mut ls, &LossReport { loss_ppm: v }.encode())
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
let d = df.load(Relaxed);
|
||||
if d > last_dropped {
|
||||
last_dropped = d;
|
||||
if io::write_msg(&mut ls, &RequestKeyframe.encode())
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break; // control stream gone
|
||||
{
|
||||
break; // control stream gone
|
||||
}
|
||||
tracing::debug!(dropped = d, "unrecoverable frame — requested keyframe");
|
||||
}
|
||||
if last_report.elapsed() >= std::time::Duration::from_millis(750) {
|
||||
last_report = std::time::Instant::now();
|
||||
let v = lp.swap(u32::MAX, Relaxed);
|
||||
if v != u32::MAX
|
||||
&& io::write_msg(&mut ls, &LossReport { loss_ppm: v }.encode())
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break; // control stream gone
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -943,6 +967,7 @@ async fn session(args: Args) -> Result<()> {
|
||||
let out_path = args.out.clone();
|
||||
let (rxp_dt, rxb_dt) = (rx_wire_packets.clone(), rx_wire_bytes.clone());
|
||||
let lp_dt = loss_ppm.clone();
|
||||
let df_dt = dropped_frames.clone();
|
||||
|
||||
// Express our receive time in the host clock before differencing against the host-stamped
|
||||
// capture pts. 0 ⇒ same-host or an old host that didn't answer the skew handshake (the latency
|
||||
@@ -988,6 +1013,7 @@ async fn session(args: Args) -> Result<()> {
|
||||
let s = session.stats();
|
||||
rxp_dt.store(s.packets_received, Relaxed);
|
||||
rxb_dt.store(s.bytes_received, Relaxed);
|
||||
df_dt.store(s.frames_dropped, Relaxed);
|
||||
if last_loss_report.elapsed() >= std::time::Duration::from_millis(750) {
|
||||
lp_dt.store(
|
||||
window_loss_ppm(
|
||||
|
||||
Reference in New Issue
Block a user