feat(core/fec): adaptive FEC — size recovery to measured loss, not a flat 20%

On a clean link the flat 20% FEC is pure waste: extra wire bytes AND extra
packets. On a packet-rate-bound uplink (the Steam Deck's WiFi tx caps ~22k pps
regardless of bitrate) those extra packets directly cost goodput — measured at
200 Mbps goodput, 20% FEC drove ~10% loss vs ~2.6% at 0% (it saturated the link).

Adaptive FEC closes the loop:
- Client measures the loss FEC is absorbing each ~750 ms window from session stats
  (recovered shards / received, + a bump when a frame went unrecoverable) and sends
  a periodic `LossReport { loss_ppm }` on the control stream (new message;
  `window_loss_ppm` helper, shared + unit-tested). Connector (Apple/Linux/Windows)
  and probe both report; suppressed during a speed test so its filler can't skew it.
- Host maps loss → recovery % (`adapt_fec`: ≈ loss×1.4 + 1pt, clamped 1..50) and
  applies it live via `Session::set_fec_percent` (the wire is self-describing — each
  packet carries its block's data/recovery counts, so the receiver needs no notice).
  A clean link decays to ~1%; loss ramps it up and converges.
- `PUNKTFUNK_FEC_PCT`, when set, now PINS FEC static (disables adaptation) so
  speed-test / measurement runs keep a fixed, known overhead. Unset ⇒ adaptive,
  starting at 10%.

An older host ignores LossReport (unknown control message) and keeps static FEC;
an older client simply never reports and the host holds its start value. Builds +
clippy + fmt + tests green (adapt_fec / window_loss_ppm / loss_report unit tests).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-20 21:31:07 +00:00
parent f37a304fba
commit 516efcc3a3
7 changed files with 272 additions and 15 deletions
+46 -2
View File
@@ -45,7 +45,8 @@ use punktfunk_core::config::Role;
use punktfunk_core::input::{InputEvent, InputKind}; use punktfunk_core::input::{InputEvent, InputKind};
use punktfunk_core::packet::FLAG_PROBE; use punktfunk_core::packet::FLAG_PROBE;
use punktfunk_core::quic::{ use punktfunk_core::quic::{
endpoint, io, Hello, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, Start, Welcome, endpoint, io, window_loss_ppm, Hello, LossReport, ProbeRequest, ProbeResult, Reconfigure,
Reconfigured, Start, Welcome,
}; };
use punktfunk_core::transport::UdpTransport; use punktfunk_core::transport::UdpTransport;
use punktfunk_core::{CompositorPref, Mode, PunktfunkError, Session}; use punktfunk_core::{CompositorPref, Mode, PunktfunkError, Session};
@@ -438,6 +439,10 @@ async fn session(args: Args) -> Result<()> {
// wire packet (graceful past the FEC budget), not just fully-reassembled probe AUs. // wire packet (graceful past the FEC budget), not just fully-reassembled probe AUs.
let rx_wire_packets = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); let rx_wire_packets = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let rx_wire_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); let rx_wire_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
// Adaptive-FEC loss feedback: the data loop publishes a windowed loss estimate here; in normal
// stream mode (no speed test / remode) a control-stream task relays it to the host as a
// LossReport so it can size FEC to the link. u32::MAX = "no fresh sample this window".
let loss_ppm = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(u32::MAX));
// Mid-stream renegotiation test: after a delay, ask the host to switch modes on the // Mid-stream renegotiation test: after a delay, ask the host to switch modes on the
// still-open control stream. The stream then carries new-mode AUs (IDR + in-band // still-open control stream. The stream then carries new-mode AUs (IDR + in-band
@@ -548,6 +553,26 @@ async fn session(args: Args) -> Result<()> {
"SPEED TEST complete", "SPEED TEST complete",
); );
}); });
} else {
// Normal stream mode: relay the data loop's windowed loss estimate to the host as periodic
// LossReports, so it can size FEC to the link (adaptive FEC). The control stream is otherwise
// idle here (remode/speed-test own it in their modes).
let mut ls = send;
let lp = loss_ppm.clone();
tokio::spawn(async move {
use std::sync::atomic::Ordering::Relaxed;
loop {
tokio::time::sleep(std::time::Duration::from_millis(750)).await;
let v = lp.swap(u32::MAX, Relaxed);
if v != u32::MAX
&& io::write_msg(&mut ls, &LossReport { loss_ppm: v }.encode())
.await
.is_err()
{
break; // control stream gone
}
}
});
} }
// Input plane: scripted events as QUIC datagrams (mouse square + 'A' taps), proving the // Input plane: scripted events as QUIC datagrams (mouse square + 'A' taps), proving the
@@ -823,6 +848,7 @@ async fn session(args: Args) -> Result<()> {
let expected = welcome.frames; let expected = welcome.frames;
let out_path = args.out.clone(); let out_path = args.out.clone();
let (rxp_dt, rxb_dt) = (rx_wire_packets.clone(), rx_wire_bytes.clone()); let (rxp_dt, rxb_dt) = (rx_wire_packets.clone(), rx_wire_bytes.clone());
let lp_dt = loss_ppm.clone();
// Express our receive time in the host clock before differencing against the host-stamped // Express our receive time in the host clock before differencing against the host-stamped
// capture pts. 0 ⇒ same-host or an old host that didn't answer the skew handshake (the latency // capture pts. 0 ⇒ same-host or an old host that didn't answer the skew handshake (the latency
@@ -857,13 +883,31 @@ async fn session(args: Args) -> Result<()> {
let mut latencies_us: Vec<u64> = Vec::new(); let mut latencies_us: Vec<u64> = Vec::new();
let mut last_rx = std::time::Instant::now(); let mut last_rx = std::time::Instant::now();
let started = std::time::Instant::now(); let started = std::time::Instant::now();
// Adaptive-FEC loss window: publish a fresh estimate every 750 ms for the LossReport task.
let mut last_loss_report = std::time::Instant::now();
let (mut last_recovered, mut last_received, mut last_dropped) = (0u64, 0u64, 0u64);
loop { loop {
// Mirror packet-level receive counters for the speed-test reporter (reads their delta). // Mirror packet-level receive counters for the speed-test reporter (reads their delta),
// and publish a windowed loss estimate for the adaptive-FEC LossReport task.
{ {
use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::Ordering::Relaxed;
let s = session.stats(); let s = session.stats();
rxp_dt.store(s.packets_received, Relaxed); rxp_dt.store(s.packets_received, Relaxed);
rxb_dt.store(s.bytes_received, Relaxed); rxb_dt.store(s.bytes_received, Relaxed);
if last_loss_report.elapsed() >= std::time::Duration::from_millis(750) {
lp_dt.store(
window_loss_ppm(
s.fec_recovered_shards.wrapping_sub(last_recovered),
s.packets_received.wrapping_sub(last_received),
s.frames_dropped.wrapping_sub(last_dropped),
),
Relaxed,
);
last_loss_report = std::time::Instant::now();
last_recovered = s.fec_recovered_shards;
last_received = s.packets_received;
last_dropped = s.frames_dropped;
}
} }
if expected > 0 && ok + mismatched >= expected { if expected > 0 && ok + mismatched >= expected {
break; break;
+29 -4
View File
@@ -16,15 +16,15 @@ use crate::error::{PunktfunkError, Result};
use crate::input::InputEvent; use crate::input::InputEvent;
use crate::packet::FLAG_PROBE; use crate::packet::FLAG_PROBE;
use crate::quic::{ use crate::quic::{
endpoint, io, Hello, HidOutput, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, endpoint, io, window_loss_ppm, Hello, HidOutput, LossReport, ProbeRequest, ProbeResult,
RequestKeyframe, RichInput, Start, Welcome, Reconfigure, Reconfigured, RequestKeyframe, RichInput, Start, Welcome,
}; };
use crate::session::{Frame, Session}; use crate::session::{Frame, Session};
use crate::transport::UdpTransport; use crate::transport::UdpTransport;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender}; use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::{Duration, Instant};
/// A control-stream request the embedder makes on the open handshake stream: a mode switch or a /// A control-stream request the embedder makes on the open handshake stream: a mode switch or a
/// speed test. One outbound channel carries both so the worker's `select!` has a single writer /// speed test. One outbound channel carries both so the worker's `select!` has a single writer
@@ -33,6 +33,7 @@ enum CtrlRequest {
Mode(Mode), Mode(Mode),
Probe(ProbeRequest), Probe(ProbeRequest),
Keyframe, Keyframe,
Loss(LossReport),
} }
/// What the worker reports to [`NativeClient::connect`] once the handshake lands: the negotiated /// What the worker reports to [`NativeClient::connect`] once the handshake lands: the negotiated
@@ -245,6 +246,7 @@ impl NativeClient {
let mode_slot_w = mode_slot.clone(); let mode_slot_w = mode_slot.clone();
let probe_w = probe.clone(); let probe_w = probe.clone();
let frames_dropped_w = frames_dropped.clone(); let frames_dropped_w = frames_dropped.clone();
let ctrl_tx_pump = ctrl_tx.clone(); // the data-plane pump sends adaptive-FEC LossReports
let worker = std::thread::Builder::new() let worker = std::thread::Builder::new()
.name("punktfunk-client".into()) .name("punktfunk-client".into())
.spawn(move || { .spawn(move || {
@@ -282,6 +284,7 @@ impl NativeClient {
mic_rx, mic_rx,
rich_input_rx, rich_input_rx,
ctrl_rx, ctrl_rx,
ctrl_tx: ctrl_tx_pump,
ready_tx, ready_tx,
shutdown: shutdown_w, shutdown: shutdown_w,
mode_slot: mode_slot_w, mode_slot: mode_slot_w,
@@ -629,6 +632,7 @@ struct WorkerArgs {
mic_rx: tokio::sync::mpsc::UnboundedReceiver<(u32, u64, Vec<u8>)>, mic_rx: tokio::sync::mpsc::UnboundedReceiver<(u32, u64, Vec<u8>)>,
rich_input_rx: tokio::sync::mpsc::UnboundedReceiver<RichInput>, rich_input_rx: tokio::sync::mpsc::UnboundedReceiver<RichInput>,
ctrl_rx: tokio::sync::mpsc::UnboundedReceiver<CtrlRequest>, ctrl_rx: tokio::sync::mpsc::UnboundedReceiver<CtrlRequest>,
ctrl_tx: tokio::sync::mpsc::UnboundedSender<CtrlRequest>,
ready_tx: std::sync::mpsc::Sender<Result<Negotiated>>, ready_tx: std::sync::mpsc::Sender<Result<Negotiated>>,
shutdown: Arc<AtomicBool>, shutdown: Arc<AtomicBool>,
mode_slot: Arc<std::sync::Mutex<Mode>>, mode_slot: Arc<std::sync::Mutex<Mode>>,
@@ -658,6 +662,7 @@ async fn worker_main(args: WorkerArgs) {
mut mic_rx, mut mic_rx,
mut rich_input_rx, mut rich_input_rx,
mut ctrl_rx, mut ctrl_rx,
ctrl_tx,
ready_tx, ready_tx,
shutdown, shutdown,
mode_slot, mode_slot,
@@ -851,6 +856,7 @@ async fn worker_main(args: WorkerArgs) {
CtrlRequest::Mode(m) => Reconfigure { mode: m }.encode(), CtrlRequest::Mode(m) => Reconfigure { mode: m }.encode(),
CtrlRequest::Probe(p) => p.encode(), CtrlRequest::Probe(p) => p.encode(),
CtrlRequest::Keyframe => RequestKeyframe.encode(), CtrlRequest::Keyframe => RequestKeyframe.encode(),
CtrlRequest::Loss(r) => r.encode(),
}; };
if io::write_msg(&mut ctrl_send, &bytes).await.is_err() { if io::write_msg(&mut ctrl_send, &bytes).await.is_err() {
break; break;
@@ -944,6 +950,12 @@ async fn worker_main(args: WorkerArgs) {
let pump_probe = probe.clone(); let pump_probe = probe.clone();
let _ = tokio::task::spawn_blocking(move || { let _ = tokio::task::spawn_blocking(move || {
pin_thread_user_interactive(); // feeds frame_tx → the client's user-interactive video pump pin_thread_user_interactive(); // feeds frame_tx → the client's user-interactive video pump
// Adaptive-FEC loss reporting: every ADAPT_REPORT_INTERVAL, report the loss observed over the
// window (shards FEC recovered, plus a bump if any frame went unrecoverable) so the host can
// size FEC to the link. Suppressed during a speed test (its FLAG_PROBE filler would skew it).
const ADAPT_REPORT_INTERVAL: Duration = Duration::from_millis(750);
let mut last_report = Instant::now();
let (mut last_recovered, mut last_received, mut last_dropped) = (0u64, 0u64, 0u64);
while !pump_shutdown.load(Ordering::SeqCst) { while !pump_shutdown.load(Ordering::SeqCst) {
// Mirror the reassembler's unrecoverable-drop count for the client's keyframe-recovery // Mirror the reassembler's unrecoverable-drop count for the client's keyframe-recovery
// loop, and (during a speed test) the packet-level receive counters for the throughput // loop, and (during a speed test) the packet-level receive counters for the throughput
@@ -951,7 +963,7 @@ async fn worker_main(args: WorkerArgs) {
// through a total-loss drought where no AU completes. Cheap: a few relaxed atomic loads. // through a total-loss drought where no AU completes. Cheap: a few relaxed atomic loads.
let st = session.stats(); let st = session.stats();
frames_dropped.store(st.frames_dropped, Ordering::Relaxed); frames_dropped.store(st.frames_dropped, Ordering::Relaxed);
{ let probe_active = {
let mut p = pump_probe.lock().unwrap(); let mut p = pump_probe.lock().unwrap();
if p.active && !p.done { if p.active && !p.done {
p.rx_packets_now = st.packets_received; p.rx_packets_now = st.packets_received;
@@ -959,6 +971,19 @@ async fn worker_main(args: WorkerArgs) {
p.base_packets.get_or_insert(st.packets_received); p.base_packets.get_or_insert(st.packets_received);
p.base_bytes.get_or_insert(st.bytes_received); p.base_bytes.get_or_insert(st.bytes_received);
} }
p.active && !p.done
};
if !probe_active && last_report.elapsed() >= ADAPT_REPORT_INTERVAL {
let loss_ppm = window_loss_ppm(
st.fec_recovered_shards.wrapping_sub(last_recovered),
st.packets_received.wrapping_sub(last_received),
st.frames_dropped.wrapping_sub(last_dropped),
);
let _ = ctrl_tx.send(CtrlRequest::Loss(LossReport { loss_ppm }));
last_report = Instant::now();
last_recovered = st.fec_recovered_shards;
last_received = st.packets_received;
last_dropped = st.frames_dropped;
} }
match session.poll_frame() { match session.poll_frame() {
Ok(frame) => { Ok(frame) => {
+12
View File
@@ -96,6 +96,18 @@ impl Packetizer {
} }
} }
/// Live-adjust the FEC recovery percentage (adaptive FEC). Takes effect on the next
/// [`packetize`](Self::packetize); the wire is self-describing (each packet carries its block's
/// data/recovery counts), so the receiver needs no notification. Clamped to ≤ 90.
pub fn set_fec_percent(&mut self, pct: u8) {
self.fec.fec_percent = pct.min(90);
}
/// The current FEC recovery percentage.
pub fn fec_percent(&self) -> u8 {
self.fec.fec_percent
}
/// Packetize one access unit into wire packets (header + shard payload each). /// Packetize one access unit into wire packets (header + shard payload each).
pub fn packetize( pub fn packetize(
&mut self, &mut self,
+80
View File
@@ -167,6 +167,18 @@ pub struct Reconfigured {
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RequestKeyframe; pub struct RequestKeyframe;
/// `client → host`, periodic: the client's observed data-plane loss, so the host can size FEC to
/// the link instead of a flat percentage (adaptive FEC). `loss_ppm` is parts-per-million of shards
/// that arrived missing-but-recovered (plus a bump when frames went unrecoverable) over the report
/// window — i.e. the loss FEC is currently absorbing. The host maps it to a recovery percentage,
/// clamped to a sane band, and applies it live; a clean link decays toward the floor (fewer packets,
/// which directly helps a packet-rate-bound uplink like the Steam Deck's WiFi tx). Fire-and-forget.
/// A host that predates this ignores it (unknown control message) and keeps its static FEC.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LossReport {
/// Observed loss over the report window, in parts per million. Carried on the wire as a
/// little-endian `u32`; decoding accepts any value (no range check is applied).
pub loss_ppm: u32,
}
/// `client → host`, any time after [`Start`]: run a bandwidth speed test. The host bursts /// `client → host`, any time after [`Start`]: run a bandwidth speed test. The host bursts
/// filler access units (flagged [`crate::packet::FLAG_PROBE`]) over the data plane at /// filler access units (flagged [`crate::packet::FLAG_PROBE`]) over the data plane at
/// `target_kbps` of application goodput for `duration_ms`, *pausing video for the duration*, then /// `target_kbps` of application goodput for `duration_ms`, *pausing video for the duration*, then
@@ -251,6 +263,8 @@ pub const MSG_RECONFIGURE: u8 = 0x01;
pub const MSG_RECONFIGURED: u8 = 0x02; pub const MSG_RECONFIGURED: u8 = 0x02;
/// Type byte of [`RequestKeyframe`]. /// Type byte of [`RequestKeyframe`].
pub const MSG_REQUEST_KEYFRAME: u8 = 0x03; pub const MSG_REQUEST_KEYFRAME: u8 = 0x03;
/// Type byte of [`LossReport`].
pub const MSG_LOSS_REPORT: u8 = 0x04;
/// Type byte of [`ProbeRequest`]. /// Type byte of [`ProbeRequest`].
pub const MSG_PROBE_REQUEST: u8 = 0x20; pub const MSG_PROBE_REQUEST: u8 = 0x20;
/// Type byte of [`ProbeResult`]. /// Type byte of [`ProbeResult`].
@@ -821,6 +835,43 @@ impl RequestKeyframe {
} }
} }
impl LossReport {
    /// Serialize into the 9-byte control frame: magic in bytes `0..4`, the
    /// [`MSG_LOSS_REPORT`] type byte at index 4, then `loss_ppm` little-endian in bytes `5..9`.
    pub fn encode(&self) -> Vec<u8> {
        let mut frame = Vec::with_capacity(9);
        frame.extend_from_slice(CTL_MAGIC);
        frame.push(MSG_LOSS_REPORT);
        frame.extend_from_slice(&self.loss_ppm.to_le_bytes());
        frame
    }

    /// Parse a frame produced by [`encode`](Self::encode).
    ///
    /// # Errors
    /// Returns `PunktfunkError::InvalidArg("bad LossReport")` unless the buffer is exactly
    /// 9 bytes long, starts with the control magic, and carries the [`MSG_LOSS_REPORT`] type byte.
    pub fn decode(b: &[u8]) -> Result<LossReport> {
        let well_formed = b.len() == 9 && &b[0..4] == CTL_MAGIC && b[4] == MSG_LOSS_REPORT;
        if !well_formed {
            return Err(PunktfunkError::InvalidArg("bad LossReport"));
        }
        // The length was checked above, so bytes 5..9 are exactly the four payload bytes.
        let mut payload = [0u8; 4];
        payload.copy_from_slice(&b[5..9]);
        Ok(LossReport {
            loss_ppm: u32::from_le_bytes(payload),
        })
    }
}
/// Compute a [`LossReport`] `loss_ppm` from one window's session-stat deltas.
///
/// * `recovered` — shards FEC reconstructed during the window (the loss it absorbed).
/// * `received` — shards received during the window.
/// * `frames_dropped` — frames that went unrecoverable during the window.
///
/// Loss ≈ `recovered / (received + recovered)` — the fraction of shards that arrived missing. A
/// frame drop means loss exceeded the current FEC budget (so `recovered` plateaus), so a fixed
/// +5% bump is added to push the host's FEC up past the cap on the next adjustment.
///
/// Returns parts-per-million, capped at 1e6. An empty window (`received + recovered == 0`)
/// contributes 0 before the bump.
pub fn window_loss_ppm(recovered: u64, received: u64, frames_dropped: u64) -> u32 {
    const PPM_SCALE: u128 = 1_000_000;
    const PPM_CAP: u32 = 1_000_000;
    // +5%: unrecoverable loss → raise FEC past the current cap.
    const DROP_BUMP_PPM: u32 = 50_000;

    // Do the ratio in u128: `u64 × 1e6` and `u64 + u64` both fit, so the result is exact for every
    // input. (Saturating u64 math under-reports once `recovered` exceeds ~1.8e13, because the
    // numerator stops growing while the denominator does not.)
    let recovered = u128::from(recovered);
    let total = recovered + u128::from(received);
    let ratio_ppm = (recovered * PPM_SCALE).checked_div(total).unwrap_or(0);
    // `recovered <= total`, so the ratio never exceeds 1_000_000 and the narrowing cast is lossless.
    let mut ppm = ratio_ppm as u32;

    if frames_dropped > 0 {
        ppm = ppm.saturating_add(DROP_BUMP_PPM);
    }
    ppm.min(PPM_CAP)
}
impl ProbeRequest { impl ProbeRequest {
pub fn encode(&self) -> Vec<u8> { pub fn encode(&self) -> Vec<u8> {
// magic[0..4] type[4] target_kbps[5..9] duration_ms[9..13] // magic[0..4] type[4] target_kbps[5..9] duration_ms[9..13]
@@ -1877,6 +1928,35 @@ mod tests {
assert!(RequestKeyframe::decode(&[bytes.as_slice(), &[0]].concat()).is_err()); assert!(RequestKeyframe::decode(&[bytes.as_slice(), &[0]].concat()).is_err());
} }
#[test]
fn loss_report_roundtrip() {
    // Every sample value must survive encode → decode unchanged.
    let samples = [0u32, 1, 12_345, 50_000, 1_000_000];
    for &loss_ppm in samples.iter() {
        let report = LossReport { loss_ppm };
        let wire = report.encode();
        assert_eq!(LossReport::decode(&wire).unwrap(), report);
    }

    // Disjoint from the other control messages (type byte + length).
    let keyframe_wire = RequestKeyframe.encode();
    let zero_wire = LossReport { loss_ppm: 0 }.encode();
    assert!(LossReport::decode(&keyframe_wire).is_err());
    assert!(RequestKeyframe::decode(&zero_wire).is_err());

    // One trailing byte makes the frame the wrong length, so it must be rejected.
    let mut padded = zero_wire.clone();
    padded.push(0);
    assert!(LossReport::decode(&padded).is_err());
}
#[test]
fn window_loss_ppm_estimates_and_caps() {
    // No traffic → 0. A clean window (nothing recovered) → 0.
    assert_eq!(window_loss_ppm(0, 0, 0), 0);
    assert_eq!(window_loss_ppm(0, 1000, 0), 0);
    // 50 recovered of 1000 total (950 received + 50 recovered) = 5%.
    assert_eq!(window_loss_ppm(50, 950, 0), 50_000);
    // An unrecoverable frame adds the +5% bump (push FEC past the current cap).
    assert_eq!(window_loss_ppm(50, 950, 1), 100_000);
    // A total-loss window with a drop but nothing received still reports the bump.
    assert_eq!(window_loss_ppm(0, 0, 3), 50_000);
    // Everything that arrived had to be recovered → exactly 100%.
    assert_eq!(window_loss_ppm(1_000, 0, 0), 1_000_000);
    // 100% plus the drop bump would be 1_050_000: the result is capped at 1e6.
    assert_eq!(window_loss_ppm(1_000, 0, 1), 1_000_000);
    // Extreme deltas never exceed the cap.
    assert!(window_loss_ppm(u64::MAX, 1, 9) <= 1_000_000);
}
#[test] #[test]
fn probe_messages_roundtrip() { fn probe_messages_roundtrip() {
let req = ProbeRequest { let req = ProbeRequest {
+12
View File
@@ -201,6 +201,18 @@ impl Session {
r.map(|_| ()) r.map(|_| ())
} }
/// Host: live-adjust the FEC recovery percentage (adaptive FEC). Affects the next
/// [`submit_frame`](Self::submit_frame)/[`seal_frame`](Self::seal_frame); the receiver needs no
/// notification (each packet's header carries its block's data/recovery shard counts).
pub fn set_fec_percent(&mut self, pct: u8) {
self.packetizer.set_fec_percent(pct);
}
/// The current FEC recovery percentage (host side).
pub fn fec_percent(&self) -> u8 {
self.packetizer.fec_percent()
}
/// Host: drain one pending input event from the client, if any. /// Host: drain one pending input event from the client, if any.
pub fn poll_input(&mut self) -> Result<Option<InputEvent>> { pub fn poll_input(&mut self) -> Result<Option<InputEvent>> {
if self.config.role != Role::Host { if self.config.role != Role::Host {
+88 -9
View File
@@ -27,13 +27,14 @@ use punktfunk_core::config::{CompositorPref, FecConfig, FecScheme, GamepadPref,
use punktfunk_core::input::{InputEvent, InputKind}; use punktfunk_core::input::{InputEvent, InputKind};
use punktfunk_core::packet::{FLAG_PIC, FLAG_PROBE, FLAG_SOF}; use punktfunk_core::packet::{FLAG_PIC, FLAG_PROBE, FLAG_SOF};
use punktfunk_core::quic::{ use punktfunk_core::quic::{
endpoint, io, ClockEcho, ClockProbe, Hello, PairChallenge, PairProof, PairRequest, PairResult, endpoint, io, ClockEcho, ClockProbe, Hello, LossReport, PairChallenge, PairProof, PairRequest,
ProbeRequest, ProbeResult, Reconfigure, Reconfigured, RequestKeyframe, Start, Welcome, PairResult, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, RequestKeyframe, Start,
Welcome,
}; };
use punktfunk_core::transport::UdpTransport; use punktfunk_core::transport::UdpTransport;
use punktfunk_core::Session; use punktfunk_core::Session;
use rand::RngCore; use rand::RngCore;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::Arc; use std::sync::Arc;
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -334,15 +335,41 @@ fn resolve_bitrate_kbps(requested: u32) -> u32 {
} }
} }
/// FEC recovery percent for the session's Welcome. Default 20% (Sunshine's default too); a clean /// Static FEC override: `PUNKTFUNK_FEC_PCT`, when set, PINS the recovery percent and DISABLES
/// wired LAN can lower it (every recovery shard is wire bytes + packets), so `PUNKTFUNK_FEC_PCT` /// adaptive FEC — so a speed test / measurement keeps a fixed, known overhead. `None` ⇒ adaptive
/// overrides it — e.g. `0` disables FEC entirely, `10` halves the overhead. Clamped to ≤ 90. /// FEC (the host sizes recovery to the loss the client reports). `0` disables FEC entirely.
fn fec_percent_from_env() -> u8 { /// Clamped to ≤ 90.
fn fec_static_override() -> Option<u8> {
std::env::var("PUNKTFUNK_FEC_PCT") std::env::var("PUNKTFUNK_FEC_PCT")
.ok() .ok()
.and_then(|s| s.trim().parse::<u8>().ok()) .and_then(|s| s.trim().parse::<u8>().ok())
.map(|p| p.min(90)) .map(|p| p.min(90))
.unwrap_or(20) }
/// Adaptive-FEC operating band and starting point. A recovery shard costs both wire bytes and
/// an extra packet, so a clean link decays toward [`FEC_MIN`] (fewer packets — the win for a
/// packet-rate-bound uplink like the Steam Deck's WiFi tx) while loss ramps toward [`FEC_MAX`].
/// Sessions start moderate so the first frames (before any loss report) are protected.
const FEC_MIN: u8 = 1;
const FEC_MAX: u8 = 50;
const FEC_ADAPTIVE_START: u8 = 10;

/// Translate the client's reported data-plane loss (ppm of shards, see [`LossReport`]) into a
/// recovery percentage. FEC must EXCEED the loss rate to recover a block, so the target is
/// ≈ loss × 1.4, rounded up to a whole percent, plus 1 pt of margin — then clamped to
/// [`FEC_MIN`]..=[`FEC_MAX`]. A clean link (0 ppm) lands on [`FEC_MIN`].
fn adapt_fec(loss_ppm: u32) -> u8 {
    // loss% × 1.4 == loss_ppm × 14 / 100_000. Widened to u64 so u32::MAX × 14 cannot overflow;
    // `div_ceil` supplies the round-up.
    let scaled_pct = (u64::from(loss_ppm) * 14).div_ceil(100_000);
    let with_margin = scaled_pct + 1;
    // The clamp bounds the value to at most FEC_MAX, so the narrowing cast cannot truncate.
    with_margin.clamp(u64::from(FEC_MIN), u64::from(FEC_MAX)) as u8
}
/// Apply the latest adaptive-FEC target to the session if it changed (cheap relaxed load + compare),
/// called once per frame on the data-plane send path.
fn apply_fec_target(session: &mut Session, fec_target: &AtomicU8) {
let t = fec_target.load(Ordering::Relaxed);
if session.fec_percent() != t {
session.set_fec_percent(t);
}
} }
/// Persistent audio-capturer slot, reused across sessions (same pattern as the GameStream /// Persistent audio-capturer slot, reused across sessions (same pattern as the GameStream
@@ -588,7 +615,9 @@ async fn serve_session(
// The post-GameStream point of punktfunk/1: Leopard GF(2¹⁶) FEC + real encryption. // The post-GameStream point of punktfunk/1: Leopard GF(2¹⁶) FEC + real encryption.
fec: FecConfig { fec: FecConfig {
scheme: FecScheme::Gf16, scheme: FecScheme::Gf16,
fec_percent: fec_percent_from_env(), // Static override pins it; otherwise sessions start at the adaptive midpoint and the
// host re-sizes FEC live from the client's LossReports (adaptive FEC).
fec_percent: fec_static_override().unwrap_or(FEC_ADAPTIVE_START),
max_data_per_block: 4096, max_data_per_block: 4096,
}, },
// ~1452-byte payload keeps the IP datagram within a 1500 MTU (1452 + 40 header + 24 // ~1452-byte payload keeps the IP datagram within a 1500 MTU (1452 + 40 header + 24
@@ -644,6 +673,12 @@ async fn serve_session(
let (probe_tx, probe_rx) = std::sync::mpsc::channel::<ProbeRequest>(); let (probe_tx, probe_rx) = std::sync::mpsc::channel::<ProbeRequest>();
let (probe_result_tx, mut probe_result_rx) = let (probe_result_tx, mut probe_result_rx) =
tokio::sync::mpsc::unbounded_channel::<ProbeResult>(); tokio::sync::mpsc::unbounded_channel::<ProbeResult>();
// Adaptive FEC: the control task maps each client LossReport to a recovery percent and publishes
// it here; the data-plane send loop reads + applies it per frame. Disabled (pinned) when
// PUNKTFUNK_FEC_PCT is set. Seeded with the session's starting FEC so it's a no-op until a report.
let adaptive_fec = fec_static_override().is_none();
let fec_target = Arc::new(AtomicU8::new(welcome.fec.fec_percent));
let fec_target_ctl = fec_target.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut active = hello.mode; let mut active = hello.mode;
loop { loop {
@@ -679,6 +714,22 @@ async fn serve_session(
if keyframe_tx.send(()).is_err() { if keyframe_tx.send(()).is_err() {
break; // data plane gone break; // data plane gone
} }
} else if let Ok(rep) = LossReport::decode(&msg) {
// Adaptive FEC: size recovery to the loss the client is seeing. The data-plane
// send loop reads `fec_target_ctl` and applies it per frame. Ignored when FEC
// is pinned via PUNKTFUNK_FEC_PCT.
if adaptive_fec {
let target = adapt_fec(rep.loss_ppm);
let prev = fec_target_ctl.swap(target, Ordering::Relaxed);
if prev != target {
tracing::info!(
loss_ppm = rep.loss_ppm,
fec_pct = target,
prev_fec_pct = prev,
"adaptive FEC adjusted"
);
}
}
} else if let Ok(req) = ProbeRequest::decode(&msg) { } else if let Ok(req) = ProbeRequest::decode(&msg) {
tracing::info!( tracing::info!(
target_kbps = req.target_kbps, target_kbps = req.target_kbps,
@@ -830,6 +881,7 @@ async fn serve_session(
let bitrate_kbps = welcome.bitrate_kbps; // resolved encoder bitrate (Hello clamped, or default) let bitrate_kbps = welcome.bitrate_kbps; // resolved encoder bitrate (Hello clamped, or default)
let bit_depth = welcome.bit_depth; // resolved encode bit depth (8, or 10 when negotiated) let bit_depth = welcome.bit_depth; // resolved encode bit depth (8, or 10 when negotiated)
let stop_stream = stop.clone(); let stop_stream = stop.clone();
let fec_target_dp = fec_target.clone(); // data-plane handle to the adaptive-FEC target
let result: Result<()> = async { let result: Result<()> = async {
tokio::task::spawn_blocking(move || -> Result<()> { tokio::task::spawn_blocking(move || -> Result<()> {
// Wait briefly for the client to hole-punch our data port, then stream to its OBSERVED // Wait briefly for the client to hole-punch our data port, then stream to its OBSERVED
@@ -865,6 +917,7 @@ async fn serve_session(
&stop_stream, &stop_stream,
&probe_rx, &probe_rx,
&probe_result_tx, &probe_result_tx,
&fec_target_dp,
), ),
Punktfunk1Source::Virtual => { Punktfunk1Source::Virtual => {
let compositor = compositor let compositor = compositor
@@ -881,6 +934,7 @@ async fn serve_session(
bit_depth, bit_depth,
probe_rx, probe_rx,
probe_result_tx, probe_result_tx,
fec_target_dp,
) )
} }
} }
@@ -1498,12 +1552,14 @@ fn synthetic_stream(
stop: &AtomicBool, stop: &AtomicBool,
probe_rx: &std::sync::mpsc::Receiver<ProbeRequest>, probe_rx: &std::sync::mpsc::Receiver<ProbeRequest>,
probe_result_tx: &tokio::sync::mpsc::UnboundedSender<ProbeResult>, probe_result_tx: &tokio::sync::mpsc::UnboundedSender<ProbeResult>,
fec_target: &AtomicU8,
) -> Result<()> { ) -> Result<()> {
let interval = std::time::Duration::from_millis(1000 / 60); let interval = std::time::Duration::from_millis(1000 / 60);
for idx in 0..frames { for idx in 0..frames {
if stop.load(Ordering::SeqCst) { if stop.load(Ordering::SeqCst) {
break; break;
} }
apply_fec_target(session, fec_target);
// Service speed-test probes between synthetic frames (loopback bandwidth tests). // Service speed-test probes between synthetic frames (loopback bandwidth tests).
service_probes(session, stop, probe_rx, probe_result_tx); service_probes(session, stop, probe_rx, probe_result_tx);
let data = test_frame(idx, 64 * 1024); let data = test_frame(idx, 64 * 1024);
@@ -1906,6 +1962,7 @@ pub(crate) fn boost_thread_priority(critical: bool) {
} }
} }
#[allow(clippy::too_many_arguments)]
fn send_loop( fn send_loop(
mut session: Session, mut session: Session,
frame_rx: std::sync::mpsc::Receiver<FrameMsg>, frame_rx: std::sync::mpsc::Receiver<FrameMsg>,
@@ -1914,6 +1971,7 @@ fn send_loop(
stop: Arc<AtomicBool>, stop: Arc<AtomicBool>,
perf: bool, perf: bool,
burst_cap: usize, burst_cap: usize,
fec_target: Arc<AtomicU8>,
) { ) {
boost_thread_priority(false); // transmit thread: above-normal (Apollo's encoder-thread level) boost_thread_priority(false); // transmit thread: above-normal (Apollo's encoder-thread level)
let mut last_perf = std::time::Instant::now(); let mut last_perf = std::time::Instant::now();
@@ -1929,6 +1987,8 @@ fn send_loop(
// Probes run here (they need the Session); a burst pauses video — the encode thread blocks // Probes run here (they need the Session); a burst pauses video — the encode thread blocks
// on the full frame channel meanwhile, which is exactly the intended pause. // on the full frame channel meanwhile, which is exactly the intended pause.
service_probes(&mut session, &stop, &probe_rx, &probe_result_tx); service_probes(&mut session, &stop, &probe_rx, &probe_result_tx);
// Adaptive FEC: pick up any new recovery target the control task set from client LossReports.
apply_fec_target(&mut session, &fec_target);
// Short timeout so we keep re-checking `stop` + probes when no frames are flowing. // Short timeout so we keep re-checking `stop` + probes when no frames are flowing.
match frame_rx.recv_timeout(std::time::Duration::from_millis(50)) { match frame_rx.recv_timeout(std::time::Duration::from_millis(50)) {
Ok(msg) => match paced_submit( Ok(msg) => match paced_submit(
@@ -2073,6 +2133,7 @@ fn virtual_stream(
bit_depth: u8, bit_depth: u8,
probe_rx: std::sync::mpsc::Receiver<ProbeRequest>, probe_rx: std::sync::mpsc::Receiver<ProbeRequest>,
probe_result_tx: tokio::sync::mpsc::UnboundedSender<ProbeResult>, probe_result_tx: tokio::sync::mpsc::UnboundedSender<ProbeResult>,
fec_target: Arc<AtomicU8>,
) -> Result<()> { ) -> Result<()> {
// This thread runs the capture+encode loop (single-process: Linux / synthetic / NO_WGC DDA) — or // This thread runs the capture+encode loop (single-process: Linux / synthetic / NO_WGC DDA) — or
// tail-calls the relay below. Elevate it so a CPU-heavy game can't deschedule our GPU submission. // tail-calls the relay below. Elevate it so a CPU-heavy game can't deschedule our GPU submission.
@@ -2095,6 +2156,7 @@ fn virtual_stream(
bit_depth, bit_depth,
probe_rx, probe_rx,
probe_result_tx, probe_result_tx,
fec_target,
); );
} }
tracing::info!( tracing::info!(
@@ -2149,6 +2211,7 @@ fn virtual_stream(
stop, stop,
perf, perf,
burst_cap, burst_cap,
fec_target,
) )
} }
}) })
@@ -2397,6 +2460,7 @@ fn virtual_stream_relay(
bit_depth: u8, bit_depth: u8,
probe_rx: std::sync::mpsc::Receiver<ProbeRequest>, probe_rx: std::sync::mpsc::Receiver<ProbeRequest>,
probe_result_tx: tokio::sync::mpsc::UnboundedSender<ProbeResult>, probe_result_tx: tokio::sync::mpsc::UnboundedSender<ProbeResult>,
fec_target: Arc<AtomicU8>,
) -> Result<()> { ) -> Result<()> {
use crate::capture::dxgi::WinCaptureTarget; use crate::capture::dxgi::WinCaptureTarget;
use crate::capture::wgc_relay::HelperRelay; use crate::capture::wgc_relay::HelperRelay;
@@ -2522,6 +2586,7 @@ fn virtual_stream_relay(
stop, stop,
perf, perf,
burst_cap, burst_cap,
fec_target,
) )
} }
}) })
@@ -2919,6 +2984,20 @@ fn build_pipeline(
mod tests { mod tests {
use super::*; use super::*;
#[test]
fn adapt_fec_maps_loss_to_recovery_band() {
    // (reported loss in ppm, expected recovery percent)
    let cases: [(u32, u8); 5] = [
        // A perfectly clean window (0 loss) lands on the floor.
        (0, FEC_MIN),
        // Any nonzero loss rounds up past the floor (ceil) — tiny but never below the cushion.
        (1, 2),
        // FEC exceeds the loss it covers (×1.4 + 1pt headroom).
        (50_000, 8),   // 5% loss → ceil(7)+1 = 8
        (100_000, 15), // 10% → ceil(14)+1 = 15
        // Heavy loss saturates at the ceiling, never beyond.
        (1_000_000, FEC_MAX), // 100% → clamped
    ];
    for &(loss_ppm, expected) in cases.iter() {
        assert_eq!(adapt_fec(loss_ppm), expected);
    }
    assert!(adapt_fec(u32::MAX) <= FEC_MAX);
}
#[test] #[test]
fn compositor_resolution_precedence() { fn compositor_resolution_precedence() {
use crate::vdisplay::Compositor::*; use crate::vdisplay::Compositor::*;
+5
View File
@@ -183,6 +183,11 @@
#define MSG_REQUEST_KEYFRAME 3 #define MSG_REQUEST_KEYFRAME 3
#endif #endif
#if defined(PUNKTFUNK_FEATURE_QUIC)
// Type byte of [`LossReport`].
#define MSG_LOSS_REPORT 4
#endif
#if defined(PUNKTFUNK_FEATURE_QUIC) #if defined(PUNKTFUNK_FEATURE_QUIC)
// Type byte of [`ProbeRequest`]. // Type byte of [`ProbeRequest`].
#define MSG_PROBE_REQUEST 32 #define MSG_PROBE_REQUEST 32