diff --git a/clients/apple/Sources/PunktfunkKit/PunktfunkConnection.swift b/clients/apple/Sources/PunktfunkKit/PunktfunkConnection.swift index 0a4b919..ccb0265 100644 --- a/clients/apple/Sources/PunktfunkKit/PunktfunkConnection.swift +++ b/clients/apple/Sources/PunktfunkKit/PunktfunkConnection.swift @@ -332,6 +332,21 @@ public final class PunktfunkConnection { _ = punktfunk_connection_request_mode(h, width, height, refreshHz) } + /// Ask the host's encoder to emit a fresh IDR keyframe now — recovery when the local + /// decoder has wedged. The host opens the infinite-GOP stream with one IDR and then sends + /// P-frames only, so a stalled decode (a lost/corrupt opening IDR, a bad early P-frame — + /// most likely on the cold first connect) would otherwise stay frozen until the next + /// loss-triggered recovery keyframe, which may be far off. Fire-and-forget; the recovered + /// keyframe is the only ack. THROTTLE at the call site — the decode stays wedged for + /// several frames until the IDR lands, so requesting every frame would flood the control + /// stream. Silently dropped after close. + public func requestKeyframe() { + abiLock.lock() + defer { abiLock.unlock() } + guard let h = handle, !closeRequested else { return } + _ = punktfunk_connection_request_keyframe(h) + } + /// The currently active session mode (updated by accepted `requestMode` switches). public func currentMode() -> (width: UInt32, height: UInt32, refreshHz: UInt32) { abiLock.lock() diff --git a/clients/apple/Sources/PunktfunkKit/Stage2Pipeline.swift b/clients/apple/Sources/PunktfunkKit/Stage2Pipeline.swift index 6c8bc2f..32fdc74 100644 --- a/clients/apple/Sources/PunktfunkKit/Stage2Pipeline.swift +++ b/clients/apple/Sources/PunktfunkKit/Stage2Pipeline.swift @@ -44,11 +44,36 @@ private final class PumpToken: @unchecked Sendable { func cancel() { lock.lock(); live = false; lock.unlock() } } +/// Throttled host keyframe requests for decode recovery. 
The decoder's async error callback +/// (a VT thread) and the pump thread (a submit failure) both signal a wedge; this coalesces +/// them so the control stream isn't flooded while the decode stays stalled for several frames +/// until the requested IDR lands. Bound to the live connection in `start`, unbound in `stop`. +private final class KeyframeRecovery: @unchecked Sendable { + private let lock = NSLock() + private var connection: PunktfunkConnection? + private var lastNs: UInt64 = 0 + + func bind(_ c: PunktfunkConnection?) { + lock.lock(); connection = c; lastNs = 0; lock.unlock() + } + + func request() { + lock.lock() + let now = DispatchTime.now().uptimeNanoseconds + let due = lastNs == 0 || now &- lastNs > 250_000_000 // first request since bind (lastNs == 0), or more than 250 ms since the last one + if due { lastNs = now } + let conn = due ? connection : nil + lock.unlock() + conn?.requestKeyframe() + } +} + public final class Stage2Pipeline { private let ring = ReadyRing() private let presenter: MetalVideoPresenter private let decoder: VideoDecoder private let presentMeter: LatencyMeter + private let recovery = KeyframeRecovery() private var token = PumpToken() private var offsetNs: Int64 = 0 @@ -63,9 +88,13 @@ public final class Stage2Pipeline { self.presenter = presenter self.presentMeter = presentMeter let ring = ring + let recovery = recovery self.decoder = VideoDecoder( onDecoded: { ring.submit($0) }, - onDecodeError: { _ in /* the pump resets the session via reset() on the next IDR */ }) + // Async decode failure (a bad P-frame referencing a lost/corrupt IDR): the pump + // resets to re-gate on the next IDR, and we ask the host to send one now (infinite + // GOP — it wouldn't otherwise come soon). Throttled in KeyframeRecovery. + onDecodeError: { _ in recovery.request() }) } /// Start pulling AUs into the decoder. `onFrame` fires per AU at receipt (capture→client @@ -77,9 +106,11 @@ public final class Stage2Pipeline { onSessionEnd: (@Sendable () -> Void)? 
) { offsetNs = connection.clockOffsetNs + recovery.bind(connection) // arm host-keyframe recovery for this session token = PumpToken() // fresh token per start — cancel is permanent (like StreamPump) let token = token let decoder = decoder + let recovery = recovery let thread = Thread { var format: CMVideoFormatDescription? while token.isLive { @@ -92,8 +123,10 @@ public final class Stage2Pipeline { guard let f = format, token.isLive else { continue } if !decoder.decode(au: au, format: f) { // Submit/decoder error: drop the session and re-gate on the next IDR's - // in-band parameter sets (a delta frame can't recover) — stage-1's policy. + // in-band parameter sets (a delta frame can't recover) — stage-1's policy + // — and ask the host for that IDR now (infinite GOP; throttled). decoder.reset() + recovery.request() } } catch { if token.isLive { onSessionEnd?() } @@ -125,6 +158,7 @@ public final class Stage2Pipeline { public func stop() { token.cancel() decoder.reset() + recovery.bind(nil) // stop requesting keyframes once the session is torn down } deinit { token.cancel() } diff --git a/clients/apple/Sources/PunktfunkKit/StreamPump.swift b/clients/apple/Sources/PunktfunkKit/StreamPump.swift index fbcb7fa..5b15efd 100644 --- a/clients/apple/Sources/PunktfunkKit/StreamPump.swift +++ b/clients/apple/Sources/PunktfunkKit/StreamPump.swift @@ -41,6 +41,7 @@ final class StreamPump { let thread = Thread { var format: CMVideoFormatDescription? + var lastKeyframeRequestNs: UInt64 = 0 // monotonic uptime of the last keyframe request; 0 = none sent yet while token.isLive { do { guard let au = try connection.nextAU(timeoutMs: 100) else { continue } @@ -49,13 +50,19 @@ final class StreamPump { format = f // refreshed on every IDR (mode changes included) } if layer.status == .failed { - // Decode wedged: flush and re-gate on the next in-band parameter - // sets — resuming with a delta frame can't recover. 
(A - // request-IDR channel on punktfunk/1 is a host-side TODO; with the - // host's infinite GOP this may otherwise stay black until the - // next recovery keyframe.) + // Decode wedged: flush and re-gate on the next in-band parameter sets + // (resuming with a delta frame can't recover), AND ask the host for a + // fresh IDR. With the host's infinite GOP the next keyframe could be + // far off, so without the request the picture stays frozen — the + // intermittent first-connect freeze. Throttled: the layer stays .failed + // across several polls until the IDR lands, and one request suffices. layer.flush() format = AnnexB.formatDescription(fromIDR: au.data) + let now = DispatchTime.now().uptimeNanoseconds // monotonic: a wall-clock step (NTP, manual change) cannot stall the throttle + if lastKeyframeRequestNs == 0 || now &- lastKeyframeRequestNs > 250_000_000 { + connection.requestKeyframe() + lastKeyframeRequestNs = now + } } guard let f = format, let sample = AnnexB.sampleBuffer(au: au, format: f), diff --git a/crates/punktfunk-core/src/abi.rs b/crates/punktfunk-core/src/abi.rs index f3d13d6..9deb6d7 100644 --- a/crates/punktfunk-core/src/abi.rs +++ b/crates/punktfunk-core/src/abi.rs @@ -1383,6 +1383,32 @@ pub unsafe extern "C" fn punktfunk_connection_request_mode( }) } +/// Ask the host's encoder to emit a fresh IDR keyframe now — client recovery when the +/// decoder has stalled (the infinite-GOP stream sends one opening IDR then P-frames only, so +/// a wedged decoder would otherwise freeze until the next loss-triggered recovery keyframe). +/// Non-blocking, fire-and-forget; the recovered keyframe is the only ack. The caller should +/// THROTTLE — the decode stays wedged for several frames until the IDR lands, so requesting +/// every frame would flood the control stream. +/// +/// # Safety +/// `c` is a valid connection handle. 
+#[cfg(feature = "quic")] +#[no_mangle] +pub unsafe extern "C" fn punktfunk_connection_request_keyframe( + c: *const PunktfunkConnection, +) -> PunktfunkStatus { + guard(|| { + let c = match unsafe { c.as_ref() } { + Some(c) => c, + None => return PunktfunkStatus::NullPointer, + }; + match c.inner.request_keyframe() { + Ok(()) => PunktfunkStatus::Ok, + Err(e) => e.status(), + } + }) +} + /// A speed-test measurement, filled by [`punktfunk_connection_probe_result`]. `done` is 0 until /// the host's end-of-burst report lands, then 1 (the numbers are final). `throughput_kbps` is the /// measured goodput to drive a bitrate choice from; `loss_pct` is the delivery loss at that rate. diff --git a/crates/punktfunk-core/src/client.rs b/crates/punktfunk-core/src/client.rs index d86a808..e18d310 100644 --- a/crates/punktfunk-core/src/client.rs +++ b/crates/punktfunk-core/src/client.rs @@ -17,7 +17,7 @@ use crate::input::InputEvent; use crate::packet::FLAG_PROBE; use crate::quic::{ endpoint, io, Hello, HidOutput, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, - RichInput, Start, Welcome, + RequestKeyframe, RichInput, Start, Welcome, }; use crate::session::{Frame, Session}; use crate::transport::UdpTransport; @@ -32,6 +32,7 @@ use std::time::{Duration, Instant}; enum CtrlRequest { Mode(Mode), Probe(ProbeRequest), + Keyframe, } /// What the worker reports to [`NativeClient::connect`] once the handshake lands: the negotiated @@ -365,6 +366,16 @@ impl NativeClient { .map_err(|_| PunktfunkError::Closed) } + /// Ask the host's encoder to emit a fresh IDR keyframe now (client recovery on a stalled + /// decode). Non-blocking, fire-and-forget — the recovered keyframe is the only ack. The + /// caller should throttle (the decode stays wedged across several frames until the IDR + /// lands, so requesting on every frame would flood the control stream). 
+ pub fn request_keyframe(&self) -> Result<()> { + self.ctrl_tx + .send(CtrlRequest::Keyframe) + .map_err(|_| PunktfunkError::Closed) + } + /// Start a bandwidth speed test: ask the host to burst filler over the data plane at /// `target_kbps` of goodput for `duration_ms`, *briefly pausing video*. Non-blocking — the /// measurement accumulates in the background; poll [`NativeClient::probe_result`] until its @@ -716,6 +727,7 @@ async fn worker_main(args: WorkerArgs) { let bytes = match req { CtrlRequest::Mode(m) => Reconfigure { mode: m }.encode(), CtrlRequest::Probe(p) => p.encode(), + CtrlRequest::Keyframe => RequestKeyframe.encode(), }; if io::write_msg(&mut ctrl_send, &bytes).await.is_err() { break; diff --git a/crates/punktfunk-core/src/quic.rs b/crates/punktfunk-core/src/quic.rs index 3e21f69..300247c 100644 --- a/crates/punktfunk-core/src/quic.rs +++ b/crates/punktfunk-core/src/quic.rs @@ -126,6 +126,16 @@ pub struct Reconfigured { pub mode: Mode, } +/// `client → host`, any time after [`Start`]: ask the host's encoder to emit a fresh IDR +/// keyframe NOW. The infinite-GOP stream opens with one IDR then sends P-frames only, so a +/// decoder that wedges (a lost/corrupt opening IDR, a bad early P-frame — most likely on the +/// cold first session) would otherwise stay frozen until the next loss-triggered recovery +/// keyframe, which may be far off. The client sends this when it detects a stalled decode; +/// the host forces the next frame to be an IDR with in-band parameter sets, recovering the +/// picture in ~one frame. Fire-and-forget — no reply (the recovered IDR is the ack). +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct RequestKeyframe; + /// `client → host`, any time after [`Start`]: run a bandwidth speed test. 
The host bursts /// filler access units (flagged [`crate::packet::FLAG_PROBE`]) over the data plane at /// `target_kbps` of application goodput for `duration_ms`, *pausing video for the duration*, then @@ -195,6 +205,8 @@ pub fn clock_offset_ns(samples: &[(u64, u64, u64, u64)]) -> Option<(i64, u64)> { pub const MSG_RECONFIGURE: u8 = 0x01; /// Type byte of [`Reconfigured`]. pub const MSG_RECONFIGURED: u8 = 0x02; +/// Type byte of [`RequestKeyframe`]. +pub const MSG_REQUEST_KEYFRAME: u8 = 0x03; /// Type byte of [`ProbeRequest`]. pub const MSG_PROBE_REQUEST: u8 = 0x20; /// Type byte of [`ProbeResult`]. @@ -699,6 +711,23 @@ impl Reconfigured { } } +impl RequestKeyframe { + pub fn encode(&self) -> Vec<u8> { + // magic[0..4] type[4] — no payload + let mut b = Vec::with_capacity(5); + b.extend_from_slice(CTL_MAGIC); + b.push(MSG_REQUEST_KEYFRAME); + b + } + + pub fn decode(b: &[u8]) -> Result<Self> { + if b.len() != 5 || &b[0..4] != CTL_MAGIC || b[4] != MSG_REQUEST_KEYFRAME { + return Err(PunktfunkError::InvalidArg("bad RequestKeyframe")); + } + Ok(RequestKeyframe) + } +} + impl ProbeRequest { pub fn encode(&self) -> Vec<u8> { // magic[0..4] type[4] target_kbps[5..9] duration_ms[9..13] @@ -1660,6 +1689,22 @@ mod tests { .is_err()); } + #[test] + fn request_keyframe_roundtrip() { + let bytes = RequestKeyframe.encode(); + assert!(RequestKeyframe::decode(&bytes).is_ok()); + // Distinct from the other control messages — its type byte must not collide. + let mode = Mode { + width: 1280, + height: 720, + refresh_hz: 60, + }; + assert!(RequestKeyframe::decode(&Reconfigure { mode }.encode()).is_err()); + assert!(Reconfigure::decode(&bytes).is_err()); + // Length is exact (no trailing bytes accepted). 
+ assert!(RequestKeyframe::decode(&[bytes.as_slice(), &[0]].concat()).is_err()); + } + #[test] fn probe_messages_roundtrip() { let req = ProbeRequest { diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index 37e2a28..652c0da 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -28,7 +28,7 @@ use punktfunk_core::input::{InputEvent, InputKind}; use punktfunk_core::packet::{FLAG_PIC, FLAG_PROBE, FLAG_SOF}; use punktfunk_core::quic::{ endpoint, io, ClockEcho, ClockProbe, Hello, PairChallenge, PairProof, PairRequest, PairResult, - ProbeRequest, ProbeResult, Reconfigure, Reconfigured, Start, Welcome, + ProbeRequest, ProbeResult, Reconfigure, Reconfigured, RequestKeyframe, Start, Welcome, }; use punktfunk_core::transport::UdpTransport; use punktfunk_core::Session; @@ -578,6 +578,7 @@ async fn serve_session( // hands back a ProbeResult that this task writes to the client. The two control directions // (inbound requests, outbound probe results) are multiplexed with `select!`. let (reconfig_tx, reconfig_rx) = std::sync::mpsc::channel::(); + let (keyframe_tx, keyframe_rx) = std::sync::mpsc::channel::<()>(); let (probe_tx, probe_rx) = std::sync::mpsc::channel::(); let (probe_result_tx, mut probe_result_rx) = tokio::sync::mpsc::unbounded_channel::(); @@ -608,6 +609,14 @@ async fn serve_session( if ok && reconfig_tx.send(req.mode).is_err() { break; // data plane gone } + } else if RequestKeyframe::decode(&msg).is_ok() { + // Client recovery: its decoder wedged — force the next encoded frame to + // be an IDR. Coalesced in the encode loop (a wedge fires several before + // the IDR lands); a send error just means the data plane is gone. 
+ tracing::debug!("client requested keyframe (decode recovery)"); + if keyframe_tx.send(()).is_err() { + break; // data plane gone + } } else if let Ok(req) = ProbeRequest::decode(&msg) { tracing::info!( target_kbps = req.target_kbps, @@ -782,6 +791,7 @@ async fn serve_session( seconds, stop_stream, &reconfig_rx, + &keyframe_rx, compositor, bitrate_kbps, probe_rx, @@ -1688,6 +1698,7 @@ fn virtual_stream( seconds: u32, stop: Arc, reconfig: &std::sync::mpsc::Receiver, + keyframe: &std::sync::mpsc::Receiver<()>, compositor: crate::vdisplay::Compositor, bitrate_kbps: u32, probe_rx: std::sync::mpsc::Receiver, @@ -1762,6 +1773,18 @@ fn virtual_stream( } } } + // Client recovery: it asked for a fresh IDR (its decoder wedged on the cold opening + // GOP). Coalesce the backlog — several requests fire before the IDR lands — and force + // the next encoded frame to be a keyframe. (A reconfig rebuild above already opens with + // an IDR, so this is for the steady-state wedge, not mode switches.) + let mut want_kf = false; + while keyframe.try_recv().is_ok() { + want_kf = true; + } + if want_kf { + tracing::debug!("forcing keyframe (client decode recovery)"); + enc.request_keyframe(); + } if let Some(f) = capturer.try_latest().context("capture")? { frame = f; } diff --git a/include/punktfunk_core.h b/include/punktfunk_core.h index 54e773a..cef501e 100644 --- a/include/punktfunk_core.h +++ b/include/punktfunk_core.h @@ -162,6 +162,11 @@ #define MSG_RECONFIGURED 2 #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Type byte of [`RequestKeyframe`]. +#define MSG_REQUEST_KEYFRAME 3 +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // Type byte of [`ProbeRequest`]. 
#define MSG_PROBE_REQUEST 32 @@ -837,6 +842,19 @@ PunktfunkStatus punktfunk_connection_request_mode(const PunktfunkConnection *c, uint32_t refresh_hz); #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Ask the host's encoder to emit a fresh IDR keyframe now — client recovery when the +// decoder has stalled (the infinite-GOP stream sends one opening IDR then P-frames only, so +// a wedged decoder would otherwise freeze until the next loss-triggered recovery keyframe). +// Non-blocking, fire-and-forget; the recovered keyframe is the only ack. The caller should +// THROTTLE — the decode stays wedged for several frames until the IDR lands, so requesting +// every frame would flood the control stream. +// +// # Safety +// `c` is a valid connection handle. +PunktfunkStatus punktfunk_connection_request_keyframe(const PunktfunkConnection *c); +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // Start a bandwidth speed test: ask the host to burst filler over the data plane at // `target_kbps` of goodput for `duration_ms` (each clamped host-side to ≤ 3 Gbps / ≤ 5 s),