feat(punktfunk/1): request-IDR recovery for a wedged client decode
apple / swift (push) Successful in 1m17s
ci / rust (push) Failing after 31s
ci / web (push) Failing after 42s
ci / docs-site (push) Failing after 40s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 4s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Failing after 10s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 5s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 6s
docker / deploy-docs (push) Has been skipped
rpm / build-publish (push) Failing after 15s
deb / build-publish (push) Failing after 43s
apple / swift (push) Successful in 1m17s
ci / rust (push) Failing after 31s
ci / web (push) Failing after 42s
ci / docs-site (push) Failing after 40s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 4s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Failing after 10s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 5s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 6s
docker / deploy-docs (push) Has been skipped
rpm / build-publish (push) Failing after 15s
deb / build-publish (push) Failing after 43s
Fixes the intermittent first-connect freeze. The host streams infinite GOP — one opening IDR, then P-frames only (recovery keyframes just on loss) — so when the client's decoder wedges on the cold first session (a lost/corrupt opening IDR, a bad early P-frame) the picture stays frozen until the far-off next keyframe. The client had no way to ask for one; now it does. Add a RequestKeyframe control message (client -> host, reliable control stream), mirroring Reconfigure: - core: quic.rs RequestKeyframe (type 0x03) + roundtrip test; client.rs CtrlRequest::Keyframe + NativeClient::request_keyframe; abi.rs punktfunk_connection_request_keyframe (header regenerated). - host: m3.rs decodes it in the control loop and signals the encode loop, which coalesces a burst and calls enc.request_keyframe() — wiring the existing NvencEncoder hook (force_kf -> next frame pict_type=I), the same recovery the GameStream path already had via force_idr. - apple: PunktfunkConnection.requestKeyframe(); StreamPump (stage-1) requests on layer.status==.failed; Stage2Pipeline (stage-2) on a sync submit failure and on the async decode-error callback via a thread-safe KeyframeRecovery. All throttled to <=1/250ms (the decode stays wedged for several frames until the IDR lands, so per-frame requests would flood the control stream). Self-healing: a lost recovery IDR is re-requested after the throttle; the host coalesces bursts into a single IDR. Validated: cargo fmt + clippy clean; core + host test suites green (incl. new request_keyframe_roundtrip); swift build + test (39 passed); xcframework rebuilt (all 5 slices), header regenerated with no unrelated drift. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -332,6 +332,21 @@ public final class PunktfunkConnection {
|
|||||||
_ = punktfunk_connection_request_mode(h, width, height, refreshHz)
|
_ = punktfunk_connection_request_mode(h, width, height, refreshHz)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Ask the host's encoder to emit a fresh IDR keyframe now — recovery when the local
|
||||||
|
/// decoder has wedged. The host opens the infinite-GOP stream with one IDR and then sends
|
||||||
|
/// P-frames only, so a stalled decode (a lost/corrupt opening IDR, a bad early P-frame —
|
||||||
|
/// most likely on the cold first connect) would otherwise stay frozen until the next
|
||||||
|
/// loss-triggered recovery keyframe, which may be far off. Fire-and-forget; the recovered
|
||||||
|
/// keyframe is the only ack. THROTTLE at the call site — the decode stays wedged for
|
||||||
|
/// several frames until the IDR lands, so requesting every frame would flood the control
|
||||||
|
/// stream. Silently dropped after close.
|
||||||
|
public func requestKeyframe() {
|
||||||
|
abiLock.lock()
|
||||||
|
defer { abiLock.unlock() }
|
||||||
|
guard let h = handle, !closeRequested else { return }
|
||||||
|
_ = punktfunk_connection_request_keyframe(h)
|
||||||
|
}
|
||||||
|
|
||||||
/// The currently active session mode (updated by accepted `requestMode` switches).
|
/// The currently active session mode (updated by accepted `requestMode` switches).
|
||||||
public func currentMode() -> (width: UInt32, height: UInt32, refreshHz: UInt32) {
|
public func currentMode() -> (width: UInt32, height: UInt32, refreshHz: UInt32) {
|
||||||
abiLock.lock()
|
abiLock.lock()
|
||||||
|
|||||||
@@ -44,11 +44,36 @@ private final class PumpToken: @unchecked Sendable {
|
|||||||
func cancel() { lock.lock(); live = false; lock.unlock() }
|
func cancel() { lock.lock(); live = false; lock.unlock() }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Throttled host keyframe requests for decode recovery. The decoder's async error callback
|
||||||
|
/// (a VT thread) and the pump thread (a submit failure) both signal a wedge; this coalesces
|
||||||
|
/// them so the control stream isn't flooded while the decode stays stalled for several frames
|
||||||
|
/// until the requested IDR lands. Bound to the live connection in `start`, unbound in `stop`.
|
||||||
|
private final class KeyframeRecovery: @unchecked Sendable {
|
||||||
|
private let lock = NSLock()
|
||||||
|
private var connection: PunktfunkConnection?
|
||||||
|
private var lastNs: UInt64 = 0
|
||||||
|
|
||||||
|
func bind(_ c: PunktfunkConnection?) {
|
||||||
|
lock.lock(); connection = c; lastNs = 0; lock.unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func request() {
|
||||||
|
lock.lock()
|
||||||
|
let now = DispatchTime.now().uptimeNanoseconds
|
||||||
|
let due = lastNs == 0 || now &- lastNs > 250_000_000 // ≥ 250 ms since the last request
|
||||||
|
if due { lastNs = now }
|
||||||
|
let conn = due ? connection : nil
|
||||||
|
lock.unlock()
|
||||||
|
conn?.requestKeyframe()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public final class Stage2Pipeline {
|
public final class Stage2Pipeline {
|
||||||
private let ring = ReadyRing()
|
private let ring = ReadyRing()
|
||||||
private let presenter: MetalVideoPresenter
|
private let presenter: MetalVideoPresenter
|
||||||
private let decoder: VideoDecoder
|
private let decoder: VideoDecoder
|
||||||
private let presentMeter: LatencyMeter
|
private let presentMeter: LatencyMeter
|
||||||
|
private let recovery = KeyframeRecovery()
|
||||||
private var token = PumpToken()
|
private var token = PumpToken()
|
||||||
private var offsetNs: Int64 = 0
|
private var offsetNs: Int64 = 0
|
||||||
|
|
||||||
@@ -63,9 +88,13 @@ public final class Stage2Pipeline {
|
|||||||
self.presenter = presenter
|
self.presenter = presenter
|
||||||
self.presentMeter = presentMeter
|
self.presentMeter = presentMeter
|
||||||
let ring = ring
|
let ring = ring
|
||||||
|
let recovery = recovery
|
||||||
self.decoder = VideoDecoder(
|
self.decoder = VideoDecoder(
|
||||||
onDecoded: { ring.submit($0) },
|
onDecoded: { ring.submit($0) },
|
||||||
onDecodeError: { _ in /* the pump resets the session via reset() on the next IDR */ })
|
// Async decode failure (a bad P-frame referencing a lost/corrupt IDR): the pump
|
||||||
|
// resets to re-gate on the next IDR, and we ask the host to send one now (infinite
|
||||||
|
// GOP — it wouldn't otherwise come soon). Throttled in KeyframeRecovery.
|
||||||
|
onDecodeError: { _ in recovery.request() })
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start pulling AUs into the decoder. `onFrame` fires per AU at receipt (capture→client
|
/// Start pulling AUs into the decoder. `onFrame` fires per AU at receipt (capture→client
|
||||||
@@ -77,9 +106,11 @@ public final class Stage2Pipeline {
|
|||||||
onSessionEnd: (@Sendable () -> Void)?
|
onSessionEnd: (@Sendable () -> Void)?
|
||||||
) {
|
) {
|
||||||
offsetNs = connection.clockOffsetNs
|
offsetNs = connection.clockOffsetNs
|
||||||
|
recovery.bind(connection) // arm host-keyframe recovery for this session
|
||||||
token = PumpToken() // fresh token per start — cancel is permanent (like StreamPump)
|
token = PumpToken() // fresh token per start — cancel is permanent (like StreamPump)
|
||||||
let token = token
|
let token = token
|
||||||
let decoder = decoder
|
let decoder = decoder
|
||||||
|
let recovery = recovery
|
||||||
let thread = Thread {
|
let thread = Thread {
|
||||||
var format: CMVideoFormatDescription?
|
var format: CMVideoFormatDescription?
|
||||||
while token.isLive {
|
while token.isLive {
|
||||||
@@ -92,8 +123,10 @@ public final class Stage2Pipeline {
|
|||||||
guard let f = format, token.isLive else { continue }
|
guard let f = format, token.isLive else { continue }
|
||||||
if !decoder.decode(au: au, format: f) {
|
if !decoder.decode(au: au, format: f) {
|
||||||
// Submit/decoder error: drop the session and re-gate on the next IDR's
|
// Submit/decoder error: drop the session and re-gate on the next IDR's
|
||||||
// in-band parameter sets (a delta frame can't recover) — stage-1's policy.
|
// in-band parameter sets (a delta frame can't recover) — stage-1's policy
|
||||||
|
// — and ask the host for that IDR now (infinite GOP; throttled).
|
||||||
decoder.reset()
|
decoder.reset()
|
||||||
|
recovery.request()
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
if token.isLive { onSessionEnd?() }
|
if token.isLive { onSessionEnd?() }
|
||||||
@@ -125,6 +158,7 @@ public final class Stage2Pipeline {
|
|||||||
public func stop() {
|
public func stop() {
|
||||||
token.cancel()
|
token.cancel()
|
||||||
decoder.reset()
|
decoder.reset()
|
||||||
|
recovery.bind(nil) // stop requesting keyframes once the session is torn down
|
||||||
}
|
}
|
||||||
|
|
||||||
deinit { token.cancel() }
|
deinit { token.cancel() }
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ final class StreamPump {
|
|||||||
|
|
||||||
let thread = Thread {
|
let thread = Thread {
|
||||||
var format: CMVideoFormatDescription?
|
var format: CMVideoFormatDescription?
|
||||||
|
var lastKeyframeRequest = Date.distantPast
|
||||||
while token.isLive {
|
while token.isLive {
|
||||||
do {
|
do {
|
||||||
guard let au = try connection.nextAU(timeoutMs: 100) else { continue }
|
guard let au = try connection.nextAU(timeoutMs: 100) else { continue }
|
||||||
@@ -49,13 +50,19 @@ final class StreamPump {
|
|||||||
format = f // refreshed on every IDR (mode changes included)
|
format = f // refreshed on every IDR (mode changes included)
|
||||||
}
|
}
|
||||||
if layer.status == .failed {
|
if layer.status == .failed {
|
||||||
// Decode wedged: flush and re-gate on the next in-band parameter
|
// Decode wedged: flush and re-gate on the next in-band parameter sets
|
||||||
// sets — resuming with a delta frame can't recover. (A
|
// (resuming with a delta frame can't recover), AND ask the host for a
|
||||||
// request-IDR channel on punktfunk/1 is a host-side TODO; with the
|
// fresh IDR. With the host's infinite GOP the next keyframe could be
|
||||||
// host's infinite GOP this may otherwise stay black until the
|
// far off, so without the request the picture stays frozen — the
|
||||||
// next recovery keyframe.)
|
// intermittent first-connect freeze. Throttled: the layer stays .failed
|
||||||
|
// across several polls until the IDR lands, and one request suffices.
|
||||||
layer.flush()
|
layer.flush()
|
||||||
format = AnnexB.formatDescription(fromIDR: au.data)
|
format = AnnexB.formatDescription(fromIDR: au.data)
|
||||||
|
let now = Date()
|
||||||
|
if now.timeIntervalSince(lastKeyframeRequest) > 0.25 {
|
||||||
|
connection.requestKeyframe()
|
||||||
|
lastKeyframeRequest = now
|
||||||
|
}
|
||||||
}
|
}
|
||||||
guard let f = format,
|
guard let f = format,
|
||||||
let sample = AnnexB.sampleBuffer(au: au, format: f),
|
let sample = AnnexB.sampleBuffer(au: au, format: f),
|
||||||
|
|||||||
@@ -1383,6 +1383,32 @@ pub unsafe extern "C" fn punktfunk_connection_request_mode(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Ask the host's encoder to emit a fresh IDR keyframe now — client recovery when the
|
||||||
|
/// decoder has stalled (the infinite-GOP stream sends one opening IDR then P-frames only, so
|
||||||
|
/// a wedged decoder would otherwise freeze until the next loss-triggered recovery keyframe).
|
||||||
|
/// Non-blocking, fire-and-forget; the recovered keyframe is the only ack. The caller should
|
||||||
|
/// THROTTLE — the decode stays wedged for several frames until the IDR lands, so requesting
|
||||||
|
/// every frame would flood the control stream.
|
||||||
|
///
|
||||||
|
/// # Safety
|
||||||
|
/// `c` is a valid connection handle.
|
||||||
|
#[cfg(feature = "quic")]
|
||||||
|
#[no_mangle]
|
||||||
|
pub unsafe extern "C" fn punktfunk_connection_request_keyframe(
|
||||||
|
c: *const PunktfunkConnection,
|
||||||
|
) -> PunktfunkStatus {
|
||||||
|
guard(|| {
|
||||||
|
let c = match unsafe { c.as_ref() } {
|
||||||
|
Some(c) => c,
|
||||||
|
None => return PunktfunkStatus::NullPointer,
|
||||||
|
};
|
||||||
|
match c.inner.request_keyframe() {
|
||||||
|
Ok(()) => PunktfunkStatus::Ok,
|
||||||
|
Err(e) => e.status(),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// A speed-test measurement, filled by [`punktfunk_connection_probe_result`]. `done` is 0 until
|
/// A speed-test measurement, filled by [`punktfunk_connection_probe_result`]. `done` is 0 until
|
||||||
/// the host's end-of-burst report lands, then 1 (the numbers are final). `throughput_kbps` is the
|
/// the host's end-of-burst report lands, then 1 (the numbers are final). `throughput_kbps` is the
|
||||||
/// measured goodput to drive a bitrate choice from; `loss_pct` is the delivery loss at that rate.
|
/// measured goodput to drive a bitrate choice from; `loss_pct` is the delivery loss at that rate.
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ use crate::input::InputEvent;
|
|||||||
use crate::packet::FLAG_PROBE;
|
use crate::packet::FLAG_PROBE;
|
||||||
use crate::quic::{
|
use crate::quic::{
|
||||||
endpoint, io, Hello, HidOutput, ProbeRequest, ProbeResult, Reconfigure, Reconfigured,
|
endpoint, io, Hello, HidOutput, ProbeRequest, ProbeResult, Reconfigure, Reconfigured,
|
||||||
RichInput, Start, Welcome,
|
RequestKeyframe, RichInput, Start, Welcome,
|
||||||
};
|
};
|
||||||
use crate::session::{Frame, Session};
|
use crate::session::{Frame, Session};
|
||||||
use crate::transport::UdpTransport;
|
use crate::transport::UdpTransport;
|
||||||
@@ -32,6 +32,7 @@ use std::time::{Duration, Instant};
|
|||||||
enum CtrlRequest {
|
enum CtrlRequest {
|
||||||
Mode(Mode),
|
Mode(Mode),
|
||||||
Probe(ProbeRequest),
|
Probe(ProbeRequest),
|
||||||
|
Keyframe,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// What the worker reports to [`NativeClient::connect`] once the handshake lands: the negotiated
|
/// What the worker reports to [`NativeClient::connect`] once the handshake lands: the negotiated
|
||||||
@@ -365,6 +366,16 @@ impl NativeClient {
|
|||||||
.map_err(|_| PunktfunkError::Closed)
|
.map_err(|_| PunktfunkError::Closed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Ask the host's encoder to emit a fresh IDR keyframe now (client recovery on a stalled
|
||||||
|
/// decode). Non-blocking, fire-and-forget — the recovered keyframe is the only ack. The
|
||||||
|
/// caller should throttle (the decode stays wedged across several frames until the IDR
|
||||||
|
/// lands, so requesting on every frame would flood the control stream).
|
||||||
|
pub fn request_keyframe(&self) -> Result<()> {
|
||||||
|
self.ctrl_tx
|
||||||
|
.send(CtrlRequest::Keyframe)
|
||||||
|
.map_err(|_| PunktfunkError::Closed)
|
||||||
|
}
|
||||||
|
|
||||||
/// Start a bandwidth speed test: ask the host to burst filler over the data plane at
|
/// Start a bandwidth speed test: ask the host to burst filler over the data plane at
|
||||||
/// `target_kbps` of goodput for `duration_ms`, *briefly pausing video*. Non-blocking — the
|
/// `target_kbps` of goodput for `duration_ms`, *briefly pausing video*. Non-blocking — the
|
||||||
/// measurement accumulates in the background; poll [`NativeClient::probe_result`] until its
|
/// measurement accumulates in the background; poll [`NativeClient::probe_result`] until its
|
||||||
@@ -716,6 +727,7 @@ async fn worker_main(args: WorkerArgs) {
|
|||||||
let bytes = match req {
|
let bytes = match req {
|
||||||
CtrlRequest::Mode(m) => Reconfigure { mode: m }.encode(),
|
CtrlRequest::Mode(m) => Reconfigure { mode: m }.encode(),
|
||||||
CtrlRequest::Probe(p) => p.encode(),
|
CtrlRequest::Probe(p) => p.encode(),
|
||||||
|
CtrlRequest::Keyframe => RequestKeyframe.encode(),
|
||||||
};
|
};
|
||||||
if io::write_msg(&mut ctrl_send, &bytes).await.is_err() {
|
if io::write_msg(&mut ctrl_send, &bytes).await.is_err() {
|
||||||
break;
|
break;
|
||||||
|
|||||||
@@ -126,6 +126,16 @@ pub struct Reconfigured {
|
|||||||
pub mode: Mode,
|
pub mode: Mode,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// `client → host`, any time after [`Start`]: ask the host's encoder to emit a fresh IDR
|
||||||
|
/// keyframe NOW. The infinite-GOP stream opens with one IDR then sends P-frames only, so a
|
||||||
|
/// decoder that wedges (a lost/corrupt opening IDR, a bad early P-frame — most likely on the
|
||||||
|
/// cold first session) would otherwise stay frozen until the next loss-triggered recovery
|
||||||
|
/// keyframe, which may be far off. The client sends this when it detects a stalled decode;
|
||||||
|
/// the host forces the next frame to be an IDR with in-band parameter sets, recovering the
|
||||||
|
/// picture in ~one frame. Fire-and-forget — no reply (the recovered IDR is the ack).
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||||
|
pub struct RequestKeyframe;
|
||||||
|
|
||||||
/// `client → host`, any time after [`Start`]: run a bandwidth speed test. The host bursts
|
/// `client → host`, any time after [`Start`]: run a bandwidth speed test. The host bursts
|
||||||
/// filler access units (flagged [`crate::packet::FLAG_PROBE`]) over the data plane at
|
/// filler access units (flagged [`crate::packet::FLAG_PROBE`]) over the data plane at
|
||||||
/// `target_kbps` of application goodput for `duration_ms`, *pausing video for the duration*, then
|
/// `target_kbps` of application goodput for `duration_ms`, *pausing video for the duration*, then
|
||||||
@@ -195,6 +205,8 @@ pub fn clock_offset_ns(samples: &[(u64, u64, u64, u64)]) -> Option<(i64, u64)> {
|
|||||||
pub const MSG_RECONFIGURE: u8 = 0x01;
|
pub const MSG_RECONFIGURE: u8 = 0x01;
|
||||||
/// Type byte of [`Reconfigured`].
|
/// Type byte of [`Reconfigured`].
|
||||||
pub const MSG_RECONFIGURED: u8 = 0x02;
|
pub const MSG_RECONFIGURED: u8 = 0x02;
|
||||||
|
/// Type byte of [`RequestKeyframe`].
|
||||||
|
pub const MSG_REQUEST_KEYFRAME: u8 = 0x03;
|
||||||
/// Type byte of [`ProbeRequest`].
|
/// Type byte of [`ProbeRequest`].
|
||||||
pub const MSG_PROBE_REQUEST: u8 = 0x20;
|
pub const MSG_PROBE_REQUEST: u8 = 0x20;
|
||||||
/// Type byte of [`ProbeResult`].
|
/// Type byte of [`ProbeResult`].
|
||||||
@@ -699,6 +711,23 @@ impl Reconfigured {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl RequestKeyframe {
|
||||||
|
pub fn encode(&self) -> Vec<u8> {
|
||||||
|
// magic[0..4] type[4] — no payload
|
||||||
|
let mut b = Vec::with_capacity(5);
|
||||||
|
b.extend_from_slice(CTL_MAGIC);
|
||||||
|
b.push(MSG_REQUEST_KEYFRAME);
|
||||||
|
b
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decode(b: &[u8]) -> Result<RequestKeyframe> {
|
||||||
|
if b.len() != 5 || &b[0..4] != CTL_MAGIC || b[4] != MSG_REQUEST_KEYFRAME {
|
||||||
|
return Err(PunktfunkError::InvalidArg("bad RequestKeyframe"));
|
||||||
|
}
|
||||||
|
Ok(RequestKeyframe)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl ProbeRequest {
|
impl ProbeRequest {
|
||||||
pub fn encode(&self) -> Vec<u8> {
|
pub fn encode(&self) -> Vec<u8> {
|
||||||
// magic[0..4] type[4] target_kbps[5..9] duration_ms[9..13]
|
// magic[0..4] type[4] target_kbps[5..9] duration_ms[9..13]
|
||||||
@@ -1660,6 +1689,22 @@ mod tests {
|
|||||||
.is_err());
|
.is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn request_keyframe_roundtrip() {
|
||||||
|
let bytes = RequestKeyframe.encode();
|
||||||
|
assert!(RequestKeyframe::decode(&bytes).is_ok());
|
||||||
|
// Distinct from the other control messages — its type byte must not collide.
|
||||||
|
let mode = Mode {
|
||||||
|
width: 1280,
|
||||||
|
height: 720,
|
||||||
|
refresh_hz: 60,
|
||||||
|
};
|
||||||
|
assert!(RequestKeyframe::decode(&Reconfigure { mode }.encode()).is_err());
|
||||||
|
assert!(Reconfigure::decode(&bytes).is_err());
|
||||||
|
// Length is exact (no trailing bytes accepted).
|
||||||
|
assert!(RequestKeyframe::decode(&[bytes.as_slice(), &[0]].concat()).is_err());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn probe_messages_roundtrip() {
|
fn probe_messages_roundtrip() {
|
||||||
let req = ProbeRequest {
|
let req = ProbeRequest {
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ use punktfunk_core::input::{InputEvent, InputKind};
|
|||||||
use punktfunk_core::packet::{FLAG_PIC, FLAG_PROBE, FLAG_SOF};
|
use punktfunk_core::packet::{FLAG_PIC, FLAG_PROBE, FLAG_SOF};
|
||||||
use punktfunk_core::quic::{
|
use punktfunk_core::quic::{
|
||||||
endpoint, io, ClockEcho, ClockProbe, Hello, PairChallenge, PairProof, PairRequest, PairResult,
|
endpoint, io, ClockEcho, ClockProbe, Hello, PairChallenge, PairProof, PairRequest, PairResult,
|
||||||
ProbeRequest, ProbeResult, Reconfigure, Reconfigured, Start, Welcome,
|
ProbeRequest, ProbeResult, Reconfigure, Reconfigured, RequestKeyframe, Start, Welcome,
|
||||||
};
|
};
|
||||||
use punktfunk_core::transport::UdpTransport;
|
use punktfunk_core::transport::UdpTransport;
|
||||||
use punktfunk_core::Session;
|
use punktfunk_core::Session;
|
||||||
@@ -578,6 +578,7 @@ async fn serve_session(
|
|||||||
// hands back a ProbeResult that this task writes to the client. The two control directions
|
// hands back a ProbeResult that this task writes to the client. The two control directions
|
||||||
// (inbound requests, outbound probe results) are multiplexed with `select!`.
|
// (inbound requests, outbound probe results) are multiplexed with `select!`.
|
||||||
let (reconfig_tx, reconfig_rx) = std::sync::mpsc::channel::<punktfunk_core::Mode>();
|
let (reconfig_tx, reconfig_rx) = std::sync::mpsc::channel::<punktfunk_core::Mode>();
|
||||||
|
let (keyframe_tx, keyframe_rx) = std::sync::mpsc::channel::<()>();
|
||||||
let (probe_tx, probe_rx) = std::sync::mpsc::channel::<ProbeRequest>();
|
let (probe_tx, probe_rx) = std::sync::mpsc::channel::<ProbeRequest>();
|
||||||
let (probe_result_tx, mut probe_result_rx) =
|
let (probe_result_tx, mut probe_result_rx) =
|
||||||
tokio::sync::mpsc::unbounded_channel::<ProbeResult>();
|
tokio::sync::mpsc::unbounded_channel::<ProbeResult>();
|
||||||
@@ -608,6 +609,14 @@ async fn serve_session(
|
|||||||
if ok && reconfig_tx.send(req.mode).is_err() {
|
if ok && reconfig_tx.send(req.mode).is_err() {
|
||||||
break; // data plane gone
|
break; // data plane gone
|
||||||
}
|
}
|
||||||
|
} else if RequestKeyframe::decode(&msg).is_ok() {
|
||||||
|
// Client recovery: its decoder wedged — force the next encoded frame to
|
||||||
|
// be an IDR. Coalesced in the encode loop (a wedge fires several before
|
||||||
|
// the IDR lands); a send error just means the data plane is gone.
|
||||||
|
tracing::debug!("client requested keyframe (decode recovery)");
|
||||||
|
if keyframe_tx.send(()).is_err() {
|
||||||
|
break; // data plane gone
|
||||||
|
}
|
||||||
} else if let Ok(req) = ProbeRequest::decode(&msg) {
|
} else if let Ok(req) = ProbeRequest::decode(&msg) {
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
target_kbps = req.target_kbps,
|
target_kbps = req.target_kbps,
|
||||||
@@ -782,6 +791,7 @@ async fn serve_session(
|
|||||||
seconds,
|
seconds,
|
||||||
stop_stream,
|
stop_stream,
|
||||||
&reconfig_rx,
|
&reconfig_rx,
|
||||||
|
&keyframe_rx,
|
||||||
compositor,
|
compositor,
|
||||||
bitrate_kbps,
|
bitrate_kbps,
|
||||||
probe_rx,
|
probe_rx,
|
||||||
@@ -1688,6 +1698,7 @@ fn virtual_stream(
|
|||||||
seconds: u32,
|
seconds: u32,
|
||||||
stop: Arc<AtomicBool>,
|
stop: Arc<AtomicBool>,
|
||||||
reconfig: &std::sync::mpsc::Receiver<punktfunk_core::Mode>,
|
reconfig: &std::sync::mpsc::Receiver<punktfunk_core::Mode>,
|
||||||
|
keyframe: &std::sync::mpsc::Receiver<()>,
|
||||||
compositor: crate::vdisplay::Compositor,
|
compositor: crate::vdisplay::Compositor,
|
||||||
bitrate_kbps: u32,
|
bitrate_kbps: u32,
|
||||||
probe_rx: std::sync::mpsc::Receiver<ProbeRequest>,
|
probe_rx: std::sync::mpsc::Receiver<ProbeRequest>,
|
||||||
@@ -1762,6 +1773,18 @@ fn virtual_stream(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Client recovery: it asked for a fresh IDR (its decoder wedged on the cold opening
|
||||||
|
// GOP). Coalesce the backlog — several requests fire before the IDR lands — and force
|
||||||
|
// the next encoded frame to be a keyframe. (A reconfig rebuild above already opens with
|
||||||
|
// an IDR, so this is for the steady-state wedge, not mode switches.)
|
||||||
|
let mut want_kf = false;
|
||||||
|
while keyframe.try_recv().is_ok() {
|
||||||
|
want_kf = true;
|
||||||
|
}
|
||||||
|
if want_kf {
|
||||||
|
tracing::debug!("forcing keyframe (client decode recovery)");
|
||||||
|
enc.request_keyframe();
|
||||||
|
}
|
||||||
if let Some(f) = capturer.try_latest().context("capture")? {
|
if let Some(f) = capturer.try_latest().context("capture")? {
|
||||||
frame = f;
|
frame = f;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -162,6 +162,11 @@
|
|||||||
#define MSG_RECONFIGURED 2
|
#define MSG_RECONFIGURED 2
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#if defined(PUNKTFUNK_FEATURE_QUIC)
|
||||||
|
// Type byte of [`RequestKeyframe`].
|
||||||
|
#define MSG_REQUEST_KEYFRAME 3
|
||||||
|
#endif
|
||||||
|
|
||||||
#if defined(PUNKTFUNK_FEATURE_QUIC)
|
#if defined(PUNKTFUNK_FEATURE_QUIC)
|
||||||
// Type byte of [`ProbeRequest`].
|
// Type byte of [`ProbeRequest`].
|
||||||
#define MSG_PROBE_REQUEST 32
|
#define MSG_PROBE_REQUEST 32
|
||||||
@@ -837,6 +842,19 @@ PunktfunkStatus punktfunk_connection_request_mode(const PunktfunkConnection *c,
|
|||||||
uint32_t refresh_hz);
|
uint32_t refresh_hz);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#if defined(PUNKTFUNK_FEATURE_QUIC)
|
||||||
|
// Ask the host's encoder to emit a fresh IDR keyframe now — client recovery when the
|
||||||
|
// decoder has stalled (the infinite-GOP stream sends one opening IDR then P-frames only, so
|
||||||
|
// a wedged decoder would otherwise freeze until the next loss-triggered recovery keyframe).
|
||||||
|
// Non-blocking, fire-and-forget; the recovered keyframe is the only ack. The caller should
|
||||||
|
// THROTTLE — the decode stays wedged for several frames until the IDR lands, so requesting
|
||||||
|
// every frame would flood the control stream.
|
||||||
|
//
|
||||||
|
// # Safety
|
||||||
|
// `c` is a valid connection handle.
|
||||||
|
PunktfunkStatus punktfunk_connection_request_keyframe(const PunktfunkConnection *c);
|
||||||
|
#endif
|
||||||
|
|
||||||
#if defined(PUNKTFUNK_FEATURE_QUIC)
|
#if defined(PUNKTFUNK_FEATURE_QUIC)
|
||||||
// Start a bandwidth speed test: ask the host to burst filler over the data plane at
|
// Start a bandwidth speed test: ask the host to burst filler over the data plane at
|
||||||
// `target_kbps` of goodput for `duration_ms` (each clamped host-side to ≤ 3 Gbps / ≤ 5 s),
|
// `target_kbps` of goodput for `duration_ms` (each clamped host-side to ≤ 3 Gbps / ≤ 5 s),
|
||||||
|
|||||||
Reference in New Issue
Block a user