diff --git a/clients/probe/src/main.rs b/clients/probe/src/main.rs index 6687d65..8b1dabd 100644 --- a/clients/probe/src/main.rs +++ b/clients/probe/src/main.rs @@ -45,7 +45,8 @@ use punktfunk_core::config::Role; use punktfunk_core::input::{InputEvent, InputKind}; use punktfunk_core::packet::FLAG_PROBE; use punktfunk_core::quic::{ - endpoint, io, Hello, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, Start, Welcome, + endpoint, io, window_loss_ppm, Hello, LossReport, ProbeRequest, ProbeResult, Reconfigure, + Reconfigured, Start, Welcome, }; use punktfunk_core::transport::UdpTransport; use punktfunk_core::{CompositorPref, Mode, PunktfunkError, Session}; @@ -438,6 +439,10 @@ async fn session(args: Args) -> Result<()> { // wire packet (graceful past the FEC budget), not just fully-reassembled probe AUs. let rx_wire_packets = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); let rx_wire_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); + // Adaptive-FEC loss feedback: the data loop publishes a windowed loss estimate here; in normal + // stream mode (no speed test / remode) a control-stream task relays it to the host as a + // LossReport so it can size FEC to the link. u32::MAX = "no fresh sample this window". + let loss_ppm = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(u32::MAX)); // Mid-stream renegotiation test: after a delay, ask the host to switch modes on the // still-open control stream. The stream then carries new-mode AUs (IDR + in-band @@ -548,6 +553,26 @@ async fn session(args: Args) -> Result<()> { "SPEED TEST complete", ); }); + } else { + // Normal stream mode: relay the data loop's windowed loss estimate to the host as periodic + // LossReports, so it can size FEC to the link (adaptive FEC). The control stream is otherwise + // idle here (remode/speed-test own it in their modes). + let mut ls = send; + let lp = loss_ppm.clone(); + tokio::spawn(async move { + use std::sync::atomic::Ordering::Relaxed; + loop { + tokio::time::sleep(std::time::Duration::from_millis(750)).await; + let v = lp.swap(u32::MAX, Relaxed); + if v != u32::MAX + && io::write_msg(&mut ls, &LossReport { loss_ppm: v }.encode()) + .await + .is_err() + { + break; // control stream gone + } + } + }); } // Input plane: scripted events as QUIC datagrams (mouse square + 'A' taps), proving the @@ -823,6 +848,7 @@ async fn session(args: Args) -> Result<()> { let expected = welcome.frames; let out_path = args.out.clone(); let (rxp_dt, rxb_dt) = (rx_wire_packets.clone(), rx_wire_bytes.clone()); + let lp_dt = loss_ppm.clone(); // Express our receive time in the host clock before differencing against the host-stamped // capture pts. 0 ⇒ same-host or an old host that didn't answer the skew handshake (the latency @@ -857,13 +883,31 @@ async fn session(args: Args) -> Result<()> { let mut latencies_us: Vec = Vec::new(); let mut last_rx = std::time::Instant::now(); let started = std::time::Instant::now(); + // Adaptive-FEC loss window: publish a fresh estimate every 750 ms for the LossReport task. + let mut last_loss_report = std::time::Instant::now(); + let (mut last_recovered, mut last_received, mut last_dropped) = (0u64, 0u64, 0u64); loop { - // Mirror packet-level receive counters for the speed-test reporter (reads their delta). + // Mirror packet-level receive counters for the speed-test reporter (reads their delta), + // and publish a windowed loss estimate for the adaptive-FEC LossReport task. { use std::sync::atomic::Ordering::Relaxed; let s = session.stats(); rxp_dt.store(s.packets_received, Relaxed); rxb_dt.store(s.bytes_received, Relaxed); + if last_loss_report.elapsed() >= std::time::Duration::from_millis(750) { + lp_dt.store( + window_loss_ppm( + s.fec_recovered_shards.wrapping_sub(last_recovered), + s.packets_received.wrapping_sub(last_received), + s.frames_dropped.wrapping_sub(last_dropped), + ), + Relaxed, + ); + last_loss_report = std::time::Instant::now(); + last_recovered = s.fec_recovered_shards; + last_received = s.packets_received; + last_dropped = s.frames_dropped; + } } if expected > 0 && ok + mismatched >= expected { break; diff --git a/crates/punktfunk-core/src/client.rs b/crates/punktfunk-core/src/client.rs index 0fee134..971b1bf 100644 --- a/crates/punktfunk-core/src/client.rs +++ b/crates/punktfunk-core/src/client.rs @@ -16,15 +16,15 @@ use crate::error::{PunktfunkError, Result}; use crate::input::InputEvent; use crate::packet::FLAG_PROBE; use crate::quic::{ - endpoint, io, Hello, HidOutput, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, - RequestKeyframe, RichInput, Start, Welcome, + endpoint, io, window_loss_ppm, Hello, HidOutput, LossReport, ProbeRequest, ProbeResult, + Reconfigure, Reconfigured, RequestKeyframe, RichInput, Start, Welcome, }; use crate::session::{Frame, Session}; use crate::transport::UdpTransport; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender}; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, Instant}; /// A control-stream request the embedder makes on the open handshake stream: a mode switch or a /// speed test. One outbound channel carries both so the worker's `select!` has a single writer @@ -33,6 +33,7 @@ enum CtrlRequest { Mode(Mode), Probe(ProbeRequest), Keyframe, + Loss(LossReport), } /// What the worker reports to [`NativeClient::connect`] once the handshake lands: the negotiated @@ -245,6 +246,7 @@ impl NativeClient { let mode_slot_w = mode_slot.clone(); let probe_w = probe.clone(); let frames_dropped_w = frames_dropped.clone(); + let ctrl_tx_pump = ctrl_tx.clone(); // the data-plane pump sends adaptive-FEC LossReports let worker = std::thread::Builder::new() .name("punktfunk-client".into()) .spawn(move || { @@ -282,6 +284,7 @@ impl NativeClient { mic_rx, rich_input_rx, ctrl_rx, + ctrl_tx: ctrl_tx_pump, ready_tx, shutdown: shutdown_w, mode_slot: mode_slot_w, @@ -629,6 +632,7 @@ struct WorkerArgs { mic_rx: tokio::sync::mpsc::UnboundedReceiver<(u32, u64, Vec)>, rich_input_rx: tokio::sync::mpsc::UnboundedReceiver, ctrl_rx: tokio::sync::mpsc::UnboundedReceiver, + ctrl_tx: tokio::sync::mpsc::UnboundedSender, ready_tx: std::sync::mpsc::Sender>, shutdown: Arc, mode_slot: Arc>, @@ -658,6 +662,7 @@ async fn worker_main(args: WorkerArgs) { mut mic_rx, mut rich_input_rx, mut ctrl_rx, + ctrl_tx, ready_tx, shutdown, mode_slot, @@ -851,6 +856,7 @@ async fn worker_main(args: WorkerArgs) { CtrlRequest::Mode(m) => Reconfigure { mode: m }.encode(), CtrlRequest::Probe(p) => p.encode(), CtrlRequest::Keyframe => RequestKeyframe.encode(), + CtrlRequest::Loss(r) => r.encode(), }; if io::write_msg(&mut ctrl_send, &bytes).await.is_err() { break; @@ -944,6 +950,12 @@ async fn worker_main(args: WorkerArgs) { let pump_probe = probe.clone(); let _ = tokio::task::spawn_blocking(move || { pin_thread_user_interactive(); // feeds frame_tx → the client's user-interactive video pump + // Adaptive-FEC loss reporting: every ADAPT_REPORT_INTERVAL, report the loss observed over the + // window (shards FEC recovered, plus a bump if any frame went unrecoverable) so the host can + // size FEC to the link. Suppressed during a speed test (its FLAG_PROBE filler would skew it). + const ADAPT_REPORT_INTERVAL: Duration = Duration::from_millis(750); + let mut last_report = Instant::now(); + let (mut last_recovered, mut last_received, mut last_dropped) = (0u64, 0u64, 0u64); while !pump_shutdown.load(Ordering::SeqCst) { // Mirror the reassembler's unrecoverable-drop count for the client's keyframe-recovery // loop, and (during a speed test) the packet-level receive counters for the throughput @@ -951,7 +963,7 @@ async fn worker_main(args: WorkerArgs) { // through a total-loss drought where no AU completes. Cheap: a few relaxed atomic loads. let st = session.stats(); frames_dropped.store(st.frames_dropped, Ordering::Relaxed); - { + let probe_active = { let mut p = pump_probe.lock().unwrap(); if p.active && !p.done { p.rx_packets_now = st.packets_received; @@ -959,6 +971,19 @@ async fn worker_main(args: WorkerArgs) { p.base_packets.get_or_insert(st.packets_received); p.base_bytes.get_or_insert(st.bytes_received); } + p.active && !p.done + }; + if !probe_active && last_report.elapsed() >= ADAPT_REPORT_INTERVAL { + let loss_ppm = window_loss_ppm( + st.fec_recovered_shards.wrapping_sub(last_recovered), + st.packets_received.wrapping_sub(last_received), + st.frames_dropped.wrapping_sub(last_dropped), + ); + let _ = ctrl_tx.send(CtrlRequest::Loss(LossReport { loss_ppm })); + last_report = Instant::now(); + last_recovered = st.fec_recovered_shards; + last_received = st.packets_received; + last_dropped = st.frames_dropped; } match session.poll_frame() { Ok(frame) => { diff --git a/crates/punktfunk-core/src/packet.rs b/crates/punktfunk-core/src/packet.rs index 315179d..3d02e15 100644 --- a/crates/punktfunk-core/src/packet.rs +++ b/crates/punktfunk-core/src/packet.rs @@ -96,6 +96,18 @@ impl Packetizer { } } + /// Live-adjust the FEC recovery percentage (adaptive FEC). Takes effect on the next + /// [`packetize`](Self::packetize); the wire is self-describing (each packet carries its block's + /// data/recovery counts), so the receiver needs no notification. Clamped to ≤ 90. + pub fn set_fec_percent(&mut self, pct: u8) { + self.fec.fec_percent = pct.min(90); + } + + /// The current FEC recovery percentage. + pub fn fec_percent(&self) -> u8 { + self.fec.fec_percent + } + /// Packetize one access unit into wire packets (header + shard payload each). pub fn packetize( &mut self, diff --git a/crates/punktfunk-core/src/quic.rs b/crates/punktfunk-core/src/quic.rs index e2bb38a..4da9015 100644 --- a/crates/punktfunk-core/src/quic.rs +++ b/crates/punktfunk-core/src/quic.rs @@ -167,6 +167,18 @@ pub struct Reconfigured { #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct RequestKeyframe; +/// `client → host`, periodic: the client's observed data-plane loss, so the host can size FEC to +/// the link instead of a flat percentage (adaptive FEC). `loss_ppm` is parts-per-million of shards +/// that arrived missing-but-recovered (plus a bump when frames went unrecoverable) over the report +/// window — i.e. the loss FEC is currently absorbing. The host maps it to a recovery percentage, +/// clamped to a sane band, and applies it live; a clean link decays toward the floor (fewer packets, +/// which directly helps a packet-rate-bound uplink like the Steam Deck's WiFi tx). Fire-and-forget. +/// A host that predates this ignores it (unknown control message) and keeps its static FEC. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct LossReport { + pub loss_ppm: u32, +} + /// `client → host`, any time after [`Start`]: run a bandwidth speed test. The host bursts /// filler access units (flagged [`crate::packet::FLAG_PROBE`]) over the data plane at /// `target_kbps` of application goodput for `duration_ms`, *pausing video for the duration*, then @@ -251,6 +263,8 @@ pub const MSG_RECONFIGURE: u8 = 0x01; pub const MSG_RECONFIGURED: u8 = 0x02; /// Type byte of [`RequestKeyframe`]. pub const MSG_REQUEST_KEYFRAME: u8 = 0x03; +/// Type byte of [`LossReport`]. +pub const MSG_LOSS_REPORT: u8 = 0x04; /// Type byte of [`ProbeRequest`]. pub const MSG_PROBE_REQUEST: u8 = 0x20; /// Type byte of [`ProbeResult`]. @@ -821,6 +835,43 @@ impl RequestKeyframe { } } +impl LossReport { + pub fn encode(&self) -> Vec { + // magic[0..4] type[4] loss_ppm[5..9] + let mut b = Vec::with_capacity(9); + b.extend_from_slice(CTL_MAGIC); + b.push(MSG_LOSS_REPORT); + b.extend_from_slice(&self.loss_ppm.to_le_bytes()); + b + } + + pub fn decode(b: &[u8]) -> Result { + if b.len() != 9 || &b[0..4] != CTL_MAGIC || b[4] != MSG_LOSS_REPORT { + return Err(PunktfunkError::InvalidArg("bad LossReport")); + } + Ok(LossReport { + loss_ppm: u32::from_le_bytes(b[5..9].try_into().unwrap()), + }) + } +} + +/// Compute a [`LossReport`] `loss_ppm` from one window's session-stat deltas: shards FEC recovered +/// (the loss it absorbed), shards received, and frames that went unrecoverable. Loss ≈ recovered / +/// (received + recovered) — the fraction of shards that arrived missing. A frame drop means loss +/// exceeded the current FEC budget (so `recovered` plateaus), so add a fixed bump to push the host's +/// FEC up past the cap on the next adjustment. Returns parts-per-million, capped at 1e6. +pub fn window_loss_ppm(recovered: u64, received: u64, frames_dropped: u64) -> u32 { + let denom = received.saturating_add(recovered); + let mut ppm = recovered + .saturating_mul(1_000_000) + .checked_div(denom) + .unwrap_or(0) as u32; + if frames_dropped > 0 { + ppm = ppm.saturating_add(50_000); // +5%: unrecoverable loss → raise FEC past the current cap + } + ppm.min(1_000_000) +} + impl ProbeRequest { pub fn encode(&self) -> Vec { // magic[0..4] type[4] target_kbps[5..9] duration_ms[9..13] @@ -1877,6 +1928,35 @@ mod tests { assert!(RequestKeyframe::decode(&[bytes.as_slice(), &[0]].concat()).is_err()); } + #[test] + fn loss_report_roundtrip() { + for loss_ppm in [0u32, 1, 12_345, 50_000, 1_000_000] { + let r = LossReport { loss_ppm }; + assert_eq!(LossReport::decode(&r.encode()).unwrap(), r); + } + // Disjoint from the other control messages (type byte + length). + assert!(LossReport::decode(&RequestKeyframe.encode()).is_err()); + assert!(RequestKeyframe::decode(&LossReport { loss_ppm: 0 }.encode()).is_err()); + assert!(LossReport::decode( + &[LossReport { loss_ppm: 0 }.encode().as_slice(), &[0]].concat() + ) + .is_err()); + } + + #[test] + fn window_loss_ppm_estimates_and_caps() { + // No traffic → 0. A clean window (nothing recovered) → 0. + assert_eq!(window_loss_ppm(0, 0, 0), 0); + assert_eq!(window_loss_ppm(0, 1000, 0), 0); + // 50 recovered of 1000 total (950 received + 50 recovered) = 5%. + assert_eq!(window_loss_ppm(50, 950, 0), 50_000); + // An unrecoverable frame adds the +5% bump (push FEC past the current cap). + assert_eq!(window_loss_ppm(50, 950, 1), 100_000); + // A total-loss window with a drop but nothing received still reports the bump, capped at 1e6. + assert_eq!(window_loss_ppm(0, 0, 3), 50_000); + assert!(window_loss_ppm(u64::MAX, 1, 9) <= 1_000_000); + } + #[test] fn probe_messages_roundtrip() { let req = ProbeRequest { diff --git a/crates/punktfunk-core/src/session.rs b/crates/punktfunk-core/src/session.rs index ea29b7e..450f25c 100644 --- a/crates/punktfunk-core/src/session.rs +++ b/crates/punktfunk-core/src/session.rs @@ -201,6 +201,18 @@ impl Session { r.map(|_| ()) } + /// Host: live-adjust the FEC recovery percentage (adaptive FEC). Affects the next + /// [`submit_frame`](Self::submit_frame)/[`seal_frame`](Self::seal_frame); the receiver needs no + /// notification (each packet's header carries its block's data/recovery shard counts). + pub fn set_fec_percent(&mut self, pct: u8) { + self.packetizer.set_fec_percent(pct); + } + + /// The current FEC recovery percentage (host side). + pub fn fec_percent(&self) -> u8 { + self.packetizer.fec_percent() + } + /// Host: drain one pending input event from the client, if any. pub fn poll_input(&mut self) -> Result> { if self.config.role != Role::Host { diff --git a/crates/punktfunk-host/src/punktfunk1.rs b/crates/punktfunk-host/src/punktfunk1.rs index 2eceebe..a0d13f6 100644 --- a/crates/punktfunk-host/src/punktfunk1.rs +++ b/crates/punktfunk-host/src/punktfunk1.rs @@ -27,13 +27,14 @@ use punktfunk_core::config::{CompositorPref, FecConfig, FecScheme, GamepadPref, use punktfunk_core::input::{InputEvent, InputKind}; use punktfunk_core::packet::{FLAG_PIC, FLAG_PROBE, FLAG_SOF}; use punktfunk_core::quic::{ - endpoint, io, ClockEcho, ClockProbe, Hello, PairChallenge, PairProof, PairRequest, PairResult, - ProbeRequest, ProbeResult, Reconfigure, Reconfigured, RequestKeyframe, Start, Welcome, + endpoint, io, ClockEcho, ClockProbe, Hello, LossReport, PairChallenge, PairProof, PairRequest, + PairResult, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, RequestKeyframe, Start, + Welcome, }; use punktfunk_core::transport::UdpTransport; use punktfunk_core::Session; use rand::RngCore; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; use std::sync::Arc; #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -334,15 +335,41 @@ fn resolve_bitrate_kbps(requested: u32) -> u32 { } } -/// FEC recovery percent for the session's Welcome. Default 20% (Sunshine's default too); a clean -/// wired LAN can lower it (every recovery shard is wire bytes + packets), so `PUNKTFUNK_FEC_PCT` -/// overrides it — e.g. `0` disables FEC entirely, `10` halves the overhead. Clamped to ≤ 90. -fn fec_percent_from_env() -> u8 { +/// Static FEC override: `PUNKTFUNK_FEC_PCT`, when set, PINS the recovery percent and DISABLES +/// adaptive FEC — so a speed test / measurement keeps a fixed, known overhead. `None` ⇒ adaptive +/// FEC (the host sizes recovery to the loss the client reports). `0` disables FEC entirely. +/// Clamped to ≤ 90. +fn fec_static_override() -> Option { std::env::var("PUNKTFUNK_FEC_PCT") .ok() .and_then(|s| s.trim().parse::().ok()) .map(|p| p.min(90)) - .unwrap_or(20) +} + +/// Adaptive-FEC band + starting point. Every recovery shard is extra wire bytes AND an extra +/// packet, so on a clean link FEC decays toward [`FEC_MIN`] (fewer packets — the win for a +/// packet-rate-bound uplink like the Steam Deck's WiFi tx); loss ramps it toward [`FEC_MAX`]. +/// Sessions start moderate so the first frames (before any loss report) are protected. +const FEC_MIN: u8 = 1; +const FEC_MAX: u8 = 50; +const FEC_ADAPTIVE_START: u8 = 10; + +/// Map the client's reported data-plane loss (ppm of shards, see [`LossReport`]) to a recovery +/// percentage. FEC must EXCEED the loss rate to recover a block, so target ≈ loss × 1.4 + 1 pt of +/// margin, clamped to the band. A clean link (≈0 ppm) lands on [`FEC_MIN`]. +fn adapt_fec(loss_ppm: u32) -> u8 { + let loss_pct = loss_ppm as f64 / 10_000.0; // ppm → percent + let target = (loss_pct * 1.4).ceil() as u32 + 1; + target.clamp(FEC_MIN as u32, FEC_MAX as u32) as u8 +} + +/// Apply the latest adaptive-FEC target to the session if it changed (cheap relaxed load + compare), +/// called once per frame on the data-plane send path. +fn apply_fec_target(session: &mut Session, fec_target: &AtomicU8) { + let t = fec_target.load(Ordering::Relaxed); + if session.fec_percent() != t { + session.set_fec_percent(t); + } } /// Persistent audio-capturer slot, reused across sessions (same pattern as the GameStream @@ -588,7 +615,9 @@ async fn serve_session( // The post-GameStream point of punktfunk/1: Leopard GF(2¹⁶) FEC + real encryption. fec: FecConfig { scheme: FecScheme::Gf16, - fec_percent: fec_percent_from_env(), + // Static override pins it; otherwise sessions start at the adaptive midpoint and the + // host re-sizes FEC live from the client's LossReports (adaptive FEC). + fec_percent: fec_static_override().unwrap_or(FEC_ADAPTIVE_START), max_data_per_block: 4096, }, // ~1452-byte payload keeps the IP datagram within a 1500 MTU (1452 + 40 header + 24 @@ -644,6 +673,12 @@ async fn serve_session( let (probe_tx, probe_rx) = std::sync::mpsc::channel::(); let (probe_result_tx, mut probe_result_rx) = tokio::sync::mpsc::unbounded_channel::(); + // Adaptive FEC: the control task maps each client LossReport to a recovery percent and publishes + // it here; the data-plane send loop reads + applies it per frame. Disabled (pinned) when + // PUNKTFUNK_FEC_PCT is set. Seeded with the session's starting FEC so it's a no-op until a report. + let adaptive_fec = fec_static_override().is_none(); + let fec_target = Arc::new(AtomicU8::new(welcome.fec.fec_percent)); + let fec_target_ctl = fec_target.clone(); tokio::spawn(async move { let mut active = hello.mode; loop { @@ -679,6 +714,22 @@ async fn serve_session( if keyframe_tx.send(()).is_err() { break; // data plane gone } + } else if let Ok(rep) = LossReport::decode(&msg) { + // Adaptive FEC: size recovery to the loss the client is seeing. The data-plane + // send loop reads `fec_target_ctl` and applies it per frame. Ignored when FEC + // is pinned via PUNKTFUNK_FEC_PCT. + if adaptive_fec { + let target = adapt_fec(rep.loss_ppm); + let prev = fec_target_ctl.swap(target, Ordering::Relaxed); + if prev != target { + tracing::info!( + loss_ppm = rep.loss_ppm, + fec_pct = target, + prev_fec_pct = prev, + "adaptive FEC adjusted" + ); + } + } } else if let Ok(req) = ProbeRequest::decode(&msg) { tracing::info!( target_kbps = req.target_kbps, @@ -830,6 +881,7 @@ async fn serve_session( let bitrate_kbps = welcome.bitrate_kbps; // resolved encoder bitrate (Hello clamped, or default) let bit_depth = welcome.bit_depth; // resolved encode bit depth (8, or 10 when negotiated) let stop_stream = stop.clone(); + let fec_target_dp = fec_target.clone(); // data-plane handle to the adaptive-FEC target let result: Result<()> = async { tokio::task::spawn_blocking(move || -> Result<()> { // Wait briefly for the client to hole-punch our data port, then stream to its OBSERVED @@ -865,6 +917,7 @@ async fn serve_session( &stop_stream, &probe_rx, &probe_result_tx, + &fec_target_dp, ), Punktfunk1Source::Virtual => { let compositor = compositor @@ -881,6 +934,7 @@ async fn serve_session( bit_depth, probe_rx, probe_result_tx, + fec_target_dp, ) } } @@ -1498,12 +1552,14 @@ fn synthetic_stream( stop: &AtomicBool, probe_rx: &std::sync::mpsc::Receiver, probe_result_tx: &tokio::sync::mpsc::UnboundedSender, + fec_target: &AtomicU8, ) -> Result<()> { let interval = std::time::Duration::from_millis(1000 / 60); for idx in 0..frames { if stop.load(Ordering::SeqCst) { break; } + apply_fec_target(session, fec_target); // Service speed-test probes between synthetic frames (loopback bandwidth tests). service_probes(session, stop, probe_rx, probe_result_tx); let data = test_frame(idx, 64 * 1024); @@ -1906,6 +1962,7 @@ pub(crate) fn boost_thread_priority(critical: bool) { } } +#[allow(clippy::too_many_arguments)] fn send_loop( mut session: Session, frame_rx: std::sync::mpsc::Receiver, @@ -1914,6 +1971,7 @@ fn send_loop( stop: Arc, perf: bool, burst_cap: usize, + fec_target: Arc, ) { boost_thread_priority(false); // transmit thread: above-normal (Apollo's encoder-thread level) let mut last_perf = std::time::Instant::now(); @@ -1929,6 +1987,8 @@ fn send_loop( // Probes run here (they need the Session); a burst pauses video — the encode thread blocks // on the full frame channel meanwhile, which is exactly the intended pause. service_probes(&mut session, &stop, &probe_rx, &probe_result_tx); + // Adaptive FEC: pick up any new recovery target the control task set from client LossReports. + apply_fec_target(&mut session, &fec_target); // Short timeout so we keep re-checking `stop` + probes when no frames are flowing. match frame_rx.recv_timeout(std::time::Duration::from_millis(50)) { Ok(msg) => match paced_submit( @@ -2073,6 +2133,7 @@ fn virtual_stream( bit_depth: u8, probe_rx: std::sync::mpsc::Receiver, probe_result_tx: tokio::sync::mpsc::UnboundedSender, + fec_target: Arc, ) -> Result<()> { // This thread runs the capture+encode loop (single-process: Linux / synthetic / NO_WGC DDA) — or // tail-calls the relay below. Elevate it so a CPU-heavy game can't deschedule our GPU submission. @@ -2095,6 +2156,7 @@ fn virtual_stream( bit_depth, probe_rx, probe_result_tx, + fec_target, ); } tracing::info!( @@ -2149,6 +2211,7 @@ fn virtual_stream( stop, perf, burst_cap, + fec_target, ) } }) @@ -2397,6 +2460,7 @@ fn virtual_stream_relay( bit_depth: u8, probe_rx: std::sync::mpsc::Receiver, probe_result_tx: tokio::sync::mpsc::UnboundedSender, + fec_target: Arc, ) -> Result<()> { use crate::capture::dxgi::WinCaptureTarget; use crate::capture::wgc_relay::HelperRelay; @@ -2522,6 +2586,7 @@ fn virtual_stream_relay( stop, perf, burst_cap, + fec_target, ) } }) @@ -2919,6 +2984,20 @@ fn build_pipeline( mod tests { use super::*; + #[test] + fn adapt_fec_maps_loss_to_recovery_band() { + // A perfectly clean window (0 loss) lands on the floor. + assert_eq!(adapt_fec(0), FEC_MIN); + // Any nonzero loss rounds up past the floor (ceil) — tiny but never below the cushion. + assert_eq!(adapt_fec(1), 2); + // FEC exceeds the loss it covers (×1.4 + 1pt headroom). + assert_eq!(adapt_fec(50_000), 8); // 5% loss → ceil(7)+1 = 8 + assert_eq!(adapt_fec(100_000), 15); // 10% → ceil(14)+1 = 15 + // Heavy loss saturates at the ceiling, never beyond. + assert_eq!(adapt_fec(1_000_000), FEC_MAX); // 100% → clamped + assert!(adapt_fec(u32::MAX) <= FEC_MAX); + } + #[test] fn compositor_resolution_precedence() { use crate::vdisplay::Compositor::*; diff --git a/include/punktfunk_core.h b/include/punktfunk_core.h index fe81183..ecbbe54 100644 --- a/include/punktfunk_core.h +++ b/include/punktfunk_core.h @@ -183,6 +183,11 @@ #define MSG_REQUEST_KEYFRAME 3 #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Type byte of [`LossReport`]. +#define MSG_LOSS_REPORT 4 +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // Type byte of [`ProbeRequest`]. #define MSG_PROBE_REQUEST 32