diff --git a/crates/punktfunk-client-linux/src/session.rs b/crates/punktfunk-client-linux/src/session.rs index e3cb8cc..28ac757 100644 --- a/crates/punktfunk-client-linux/src/session.rs +++ b/crates/punktfunk-client-linux/src/session.rs @@ -157,6 +157,9 @@ fn pump( let mut decode_us_sum = 0u64; let mut lat_us: Vec = Vec::with_capacity(256); let mut pcm = vec![0f32; 5760 * 2]; // decode scratch: max Opus frame (120 ms stereo) + // Loss recovery: watch the host→client unrecoverable-drop count and ask for an IDR when it climbs. + let mut last_dropped = connector.frames_dropped(); + let mut last_kf_req: Option = None; let end: Option = loop { if stop.load(Ordering::SeqCst) { @@ -197,6 +200,22 @@ fn pump( Err(e) => break Some(format!("session: {e:?}")), } + // Loss recovery: under infinite GOP the only recovery keyframe is one we request. The + // reassembler drops unrecoverable AUs (frames_dropped); the decoder then conceals the + // reference-missing delta frames that follow and returns Ok, so keying off a decode error + // rarely fires. Request an IDR when the drop count climbs, throttled — the decode stays + // wedged for several frames until the IDR lands, so requesting every frame would flood. + let dropped = connector.frames_dropped(); + if dropped > last_dropped { + last_dropped = dropped; + let now = Instant::now(); + if last_kf_req.is_none_or(|t| now.duration_since(t) >= Duration::from_millis(100)) { + last_kf_req = Some(now); + let _ = connector.request_keyframe(); + tracing::debug!(dropped, "requested keyframe (loss recovery)"); + } + } + // Drain audio between frames (packets land every 5 ms; the queue holds 320 ms). while let Ok(pkt) = connector.next_audio(Duration::ZERO) { if let (Some(player), Some(dec)) = (&player, opus_dec.as_mut()) { diff --git a/crates/punktfunk-client-windows/src/session.rs b/crates/punktfunk-client-windows/src/session.rs index c5c7b39..127e547 100644 --- a/crates/punktfunk-client-windows/src/session.rs +++ b/crates/punktfunk-client-windows/src/session.rs @@ -160,6 +160,9 @@ fn pump( let mut decode_us_sum = 0u64; let mut lat_us: Vec = Vec::with_capacity(256); let mut pcm = vec![0f32; 5760 * 2]; // decode scratch: max Opus frame (120 ms stereo) + // Loss recovery: watch the host→client unrecoverable-drop count and ask for an IDR when it climbs. + let mut last_dropped = connector.frames_dropped(); + let mut last_kf_req: Option = None; let end: Option = loop { if stop.load(Ordering::SeqCst) { @@ -202,6 +205,21 @@ fn pump( Err(e) => break Some(format!("session: {e:?}")), } + // Loss recovery: under infinite GOP the only recovery keyframe is one we request. The + // reassembler drops unrecoverable AUs (frames_dropped); the decoder conceals the + // reference-missing delta frames that follow and returns Ok, so keying off a decode error + // rarely fires. Request an IDR when the drop count climbs, throttled. + let dropped = connector.frames_dropped(); + if dropped > last_dropped { + last_dropped = dropped; + let now = Instant::now(); + if last_kf_req.is_none_or(|t| now.duration_since(t) >= Duration::from_millis(100)) { + last_kf_req = Some(now); + let _ = connector.request_keyframe(); + tracing::debug!(dropped, "requested keyframe (loss recovery)"); + } + } + // Drain audio between frames (packets land every 5 ms; the queue holds 320 ms). while let Ok(pkt) = connector.next_audio(Duration::ZERO) { if let (Some(player), Some(dec)) = (&player, opus_dec.as_mut()) { diff --git a/crates/punktfunk-core/src/client.rs b/crates/punktfunk-core/src/client.rs index da9450c..eac4afc 100644 --- a/crates/punktfunk-core/src/client.rs +++ b/crates/punktfunk-core/src/client.rs @@ -21,7 +21,7 @@ use crate::quic::{ }; use crate::session::{Frame, Session}; use crate::transport::UdpTransport; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -128,6 +128,11 @@ pub struct NativeClient { /// Speed-test accumulator, shared with the data-plane pump + control task. probe: Arc>, shutdown: Arc, + /// Cumulative count of access units the reassembler gave up on (FEC couldn't recover), mirrored + /// from the data-plane pump's `Session`. A client video loop watches this for increases to request + /// a recovery keyframe under infinite GOP — the correct loss trigger, since unrecoverable loss + /// yields reference-missing frames the decoder silently conceals (a decode-error trigger misses them). + frames_dropped: Arc, worker: Option>, /// The currently active session mode (the Welcome's, then updated by every accepted /// [`NativeClient::request_mode`]). @@ -208,11 +213,13 @@ impl NativeClient { let shutdown = Arc::new(AtomicBool::new(false)); let mode_slot = Arc::new(std::sync::Mutex::new(mode)); let probe = Arc::new(Mutex::new(ProbeState::default())); + let frames_dropped = Arc::new(AtomicU64::new(0)); let host = host.to_string(); let shutdown_w = shutdown.clone(); let mode_slot_w = mode_slot.clone(); let probe_w = probe.clone(); + let frames_dropped_w = frames_dropped.clone(); let worker = std::thread::Builder::new() .name("punktfunk-client".into()) .spawn(move || { @@ -253,6 +260,7 @@ impl NativeClient { shutdown: shutdown_w, mode_slot: mode_slot_w, probe: probe_w, + frames_dropped: frames_dropped_w, })); }) .map_err(PunktfunkError::Io)?; @@ -285,6 +293,7 @@ impl NativeClient { probe, shutdown, worker: Some(worker), + frames_dropped, mode: mode_slot, host_fingerprint: fingerprint, resolved_compositor, @@ -412,6 +421,15 @@ impl NativeClient { .map_err(|_| PunktfunkError::Closed) } + /// Cumulative access units the host→client reassembler dropped as unrecoverable (FEC couldn't + /// rebuild them). A video loop polls this and calls [`request_keyframe`](Self::request_keyframe) + /// when it increases — the correct loss trigger under infinite GOP, where unrecoverable loss + /// produces reference-missing delta frames the decoder silently conceals (so a decode-error + /// trigger would rarely fire). Monotonic for the session; compare against the last observed value. + pub fn frames_dropped(&self) -> u64 { + self.frames_dropped.load(Ordering::Relaxed) + } + /// Start a bandwidth speed test: ask the host to burst filler over the data plane at /// `target_kbps` of goodput for `duration_ms`, *briefly pausing video*. Non-blocking — the /// measurement accumulates in the background; poll [`NativeClient::probe_result`] until its @@ -566,6 +584,7 @@ struct WorkerArgs { shutdown: Arc, mode_slot: Arc>, probe: Arc>, + frames_dropped: Arc, } /// The worker: QUIC handshake, then the input/datagram/control tasks + the blocking @@ -593,6 +612,7 @@ async fn worker_main(args: WorkerArgs) { shutdown, mode_slot, probe, + frames_dropped, } = args; let setup = async { let remote: std::net::SocketAddr = format!("{host}:{port}") @@ -864,6 +884,10 @@ async fn worker_main(args: WorkerArgs) { let _ = tokio::task::spawn_blocking(move || { pin_thread_user_interactive(); // feeds frame_tx → the client's user-interactive video pump while !pump_shutdown.load(Ordering::SeqCst) { + // Mirror the reassembler's unrecoverable-drop count for the client's keyframe-recovery + // loop. Updated every iteration (not just on a produced frame) so it stays current through + // a total-loss drought where no AU completes. Cheap: a few relaxed atomic loads. + frames_dropped.store(session.stats().frames_dropped, Ordering::Relaxed); match session.poll_frame() { Ok(frame) => { if frame.flags & FLAG_PROBE as u32 != 0 {