punktfunk/crates/punktfunk-core/src/session.rs
enricobuehler 516efcc3a3 feat(core/fec): adaptive FEC — size recovery to measured loss, not a flat 20%
On a clean link the flat 20% FEC is pure waste: extra wire bytes AND extra
packets. On a packet-rate-bound uplink (the Steam Deck's WiFi tx caps ~22k pps
regardless of bitrate) those extra packets directly cost goodput — measured at
200 Mbps goodput, 20% FEC drove ~10% loss vs ~2.6% at 0% (it saturated the link).
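
The packet-rate argument can be checked with a little arithmetic. The sketch below assumes a hypothetical 1200-byte data payload per packet (the commit does not state one) and treats an N% FEC setting as N recovery packets per 100 data packets. Only the ~22k pps cap and the 200 Mbps / 0% vs 20% comparison come from the text above.

```rust
/// Packets per second needed to carry `goodput_bps` of payload, assuming a fixed
/// `payload_bytes` per data packet plus `fec_pct` recovery packets per 100 data packets.
fn required_pps(goodput_bps: f64, payload_bytes: f64, fec_pct: f64) -> f64 {
    let data_pps = goodput_bps / 8.0 / payload_bytes;
    data_pps * (1.0 + fec_pct / 100.0)
}

fn main() {
    const TX_CAP_PPS: f64 = 22_000.0; // Steam Deck WiFi tx ceiling quoted above
    const PAYLOAD: f64 = 1200.0; // hypothetical bytes of video data per packet
    for fec in [0.0, 20.0] {
        let pps = required_pps(200e6, PAYLOAD, fec);
        println!(
            "{fec:>4}% FEC: {pps:>6.0} pps needed, {}",
            if pps > TX_CAP_PPS { "over the cap" } else { "under the cap" }
        );
    }
}
```

Under that assumption 0% FEC needs about 20.8k pps (under the cap) while 20% needs 25k pps (over it), which is consistent with the link saturating; the real crossover depends on the actual payload size.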

Adaptive FEC closes the loop:
- Client measures the loss FEC is absorbing each ~750 ms window from session stats
  (recovered shards / received, + a bump when a frame went unrecoverable) and sends
  a periodic `LossReport { loss_ppm }` on the control stream (new message;
  `window_loss_ppm` helper, shared + unit-tested). Connector (Apple/Linux/Windows)
  and probe both report; suppressed during a speed test so its filler can't skew it.
- Host maps loss → recovery % (`adapt_fec`: ≈ loss×1.4 + 1pt, clamped 1..50) and
  applies it live via `Session::set_fec_percent` (the wire is self-describing — each
  packet carries its block's data/recovery counts, so the receiver needs no notice).
  A clean link decays to ~1%; loss ramps it up and converges.
- `PUNKTFUNK_FEC_PCT`, when set, now PINS FEC static (disables adaptation) so
  speed-test / measurement runs keep a fixed, known overhead. Unset ⇒ adaptive,
  starting at 10%.
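
A minimal sketch of the two helpers named above, assuming both work on plain counters. The signatures, the size of the per-frame "unrecoverable" bump, and the rounding are hypothetical; only the recovered/received ratio, the ~1.4x slope plus one point, and the 1..50 clamp come from the commit message. It also maps loss straight to a percentage, whereas the host described above converges over successive reports.

```rust
/// Hypothetical sketch of `window_loss_ppm`: the loss FEC absorbed over one report window, in
/// parts per million. Inputs are deltas of cumulative counters between two stats snapshots.
fn window_loss_ppm(recovered_delta: u64, received_delta: u64, unrecoverable_delta: u64) -> u32 {
    // Assumed size of the bump per unrecoverable frame (+5 points); not the crate's value.
    const UNRECOVERABLE_BUMP_PPM: u64 = 50_000;
    let base = if received_delta == 0 {
        0
    } else {
        recovered_delta.saturating_mul(1_000_000) / received_delta
    };
    let ppm = base.saturating_add(unrecoverable_delta.saturating_mul(UNRECOVERABLE_BUMP_PPM));
    ppm.min(1_000_000) as u32 // cap at 100%
}

/// Hypothetical sketch of `adapt_fec`: map measured loss to a recovery percentage with the
/// rule from the commit message (roughly loss x 1.4 plus one point, clamped to 1..=50).
fn adapt_fec(loss_ppm: u32) -> u8 {
    let loss_pct = f64::from(loss_ppm) / 10_000.0; // 10_000 ppm = 1%
    (loss_pct * 1.4 + 1.0).round().clamp(1.0, 50.0) as u8
}

fn main() {
    // 20 of 1_000 shards needed recovery in this window (2% loss); no frame was lost.
    let loss = window_loss_ppm(20, 1_000, 0);
    println!("loss = {loss} ppm -> FEC {}%", adapt_fec(loss)); // prints "loss = 20000 ppm -> FEC 4%"
}
```

With this rule a clean window (0 ppm) maps to the 1% floor, and loss of roughly 35% or more saturates at the 50% ceiling.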

An older host ignores LossReport (unknown control message) and keeps static FEC;
an older client simply never reports and the host holds its start value. Builds +
clippy + fmt + tests green (adapt_fec / window_loss_ppm / loss_report unit tests).
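
The compatibility argument relies on the control-stream decoder skipping message types it does not recognise instead of failing. A sketch of that pattern follows; the `ControlMsg` enum, the tag byte value, and the big-endian layout are illustrative assumptions, not the crate's actual wire format.

```rust
/// Hypothetical control-stream message set; tag values and layout are illustrative only.
#[derive(Debug, PartialEq)]
enum ControlMsg {
    LossReport { loss_ppm: u32 },
}

const TAG_LOSS_REPORT: u8 = 0x10; // assumed tag value

impl ControlMsg {
    fn encode(&self) -> Vec<u8> {
        match self {
            ControlMsg::LossReport { loss_ppm } => {
                let mut out = vec![TAG_LOSS_REPORT];
                out.extend_from_slice(&loss_ppm.to_be_bytes());
                out
            }
        }
    }

    /// Returns `None` for an unknown tag or a truncated body. The caller skips such messages,
    /// which is what lets a peer that predates a message type keep working.
    fn decode(buf: &[u8]) -> Option<ControlMsg> {
        let (&tag, body) = buf.split_first()?;
        match tag {
            TAG_LOSS_REPORT => {
                let b = body.get(..4)?;
                let loss_ppm = u32::from_be_bytes([b[0], b[1], b[2], b[3]]);
                Some(ControlMsg::LossReport { loss_ppm })
            }
            _ => None, // unknown message: ignore it
        }
    }
}

fn main() {
    let wire = ControlMsg::LossReport { loss_ppm: 12_345 }.encode();
    println!("{:?}", ControlMsg::decode(&wire));
    // A tag this decoder does not know is skipped rather than treated as an error.
    println!("{:?}", ControlMsg::decode(&[0x7f, 0, 0, 0, 0]));
}
```

An older host behaves like this decoder would for a tag added after it was built: the message is dropped and the static FEC setting stays in force.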

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-20 21:31:07 +00:00

//! Session lifecycle and the two hot-path state machines.
//!
//! - **Host** ([`Session::submit_frame`]): encoded access unit → FEC + packetize →
//!   optional AES-GCM seal → transport send.
//! - **Client** ([`Session::poll_frame`]): transport recv → optional open → reorder +
//!   FEC recover + reassemble → whole access unit.
//!
//! Input flows the other way over the same session: the client sends events with
//! [`Session::send_input`], and the host drains them with [`Session::poll_input`].

use crate::config::{Config, Role};
use crate::crypto::SessionCrypto;
use crate::error::{PunktfunkError, Result};
use crate::fec::{coder_for, ErasureCoder};
use crate::input::InputEvent;
use crate::packet::{Packetizer, Reassembler, ReassemblerLimits, MAX_DATAGRAM_BYTES};
use crate::stats::{Stats, StatsCounters};
use crate::transport::Transport;

/// A reassembled, FEC-recovered access unit, ready to hand to the platform decoder.
pub struct Frame {
    pub data: Vec<u8>,
    pub frame_index: u32,
    pub pts_ns: u64,
    pub flags: u32,
}

/// One end of a stream. Constructed for a single [`Role`]; calling the other role's
/// methods returns [`PunktfunkError::InvalidArg`].
///
/// Note: the AEAD layer authenticates each datagram but does **not** provide anti-replay.
/// Video replays are largely absorbed by the reassembler's per-frame dedup, but replayed
/// input events are not yet filtered. A sliding-window replay filter keyed on the
/// authenticated sequence belongs with the pairing/handshake layer (the GameStream host);
/// until then, rely on the LAN/VPN transport assumption (plan §1).
pub struct Session {
    config: Config,
    coder: Box<dyn ErasureCoder>,
    crypto: Option<SessionCrypto>,
    transport: Box<dyn Transport>,
    packetizer: Packetizer,
    reassembler: Reassembler,
    stats: StatsCounters,
    /// Monotonic wire sequence, also the AES-GCM nonce counter.
    next_seq: u64,
    /// Client recv ring (reused across [`poll_frame`](Self::poll_frame)): `recvmmsg` drains a
    /// batch of datagrams into `recv_scratch` in one syscall, and `poll_frame` consumes them one
    /// at a time across calls (`recv_idx`..`recv_count`), refilling when drained. Allocated
    /// lazily on the first client poll so host sessions don't carry it. The ring removes the
    /// per-packet receive-buffer allocation at line rate (opening a packet still returns a
    /// fresh plaintext `Vec`).
    recv_scratch: Vec<Vec<u8>>,
    recv_lens: Vec<usize>,
    recv_count: usize,
    recv_idx: usize,
    /// Host send pool: reused wire buffers (`seal_frame` seals in place into these; the caller
    /// sends them, then returns them via [`reclaim_wires`](Self::reclaim_wires)). After warmup
    /// each buffer keeps its capacity, so the per-packet ciphertext and wire `Vec` allocations
    /// vanish from the hot path.
    wire_pool: Vec<Vec<u8>>,
}

/// Datagrams drained per `recvmmsg` syscall on the client (the reused ring's size). At ~125k
/// pkt/s this is ~4k syscalls/s instead of 125k; the buffers cost `RECV_BATCH × RECV_BUF` (~64 KB).
const RECV_BATCH: usize = 32;

impl Session {
    /// Validate `config` and build a session for `config.role` over `transport`.
    pub fn new(config: Config, transport: Box<dyn Transport>) -> Result<Session> {
        config.validate()?;
        let coder = coder_for(config.fec.scheme);
        let crypto = config
            .encrypt
            .then(|| SessionCrypto::new(&config.key, config.salt, config.role));
        let packetizer = Packetizer::new(&config);
        let reassembler = Reassembler::new(ReassemblerLimits::from_config(&config));
        Ok(Session {
            coder,
            crypto,
            transport,
            packetizer,
            reassembler,
            stats: StatsCounters::default(),
            next_seq: 0,
            recv_scratch: Vec::new(),
            recv_lens: Vec::new(),
            recv_count: 0,
            recv_idx: 0,
            wire_pool: Vec::new(),
            config,
        })
    }
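
    /// The role this session was constructed for.
    pub fn role(&self) -> Role {
        self.config.role
    }

    /// A point-in-time snapshot of the session counters.
    pub fn stats(&self) -> Stats {
        self.stats.snapshot()
    }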

    /// Seal one plaintext packet into the reused `wire` buffer in place (no allocation once the
    /// buffer has warmed up). With crypto on, the wire is `seq(8) || ciphertext || tag`: the
    /// 8-byte big-endian sequence goes first, in plaintext, because the receiver derives the GCM
    /// nonce from it. With crypto off (probe), the wire is just the packet. `clear()` keeps the
    /// buffer's capacity, and the output is byte-identical to the older `seal` + concat path.
    fn seal_into(&mut self, packet: &[u8], wire: &mut Vec<u8>) -> Result<()> {
        let seq = self.next_seq;
        self.next_seq = self.next_seq.wrapping_add(1);
        wire.clear();
        match &self.crypto {
            Some(c) => {
                wire.extend_from_slice(&seq.to_be_bytes()); // [0..8] plaintext seq prefix
                wire.extend_from_slice(packet); // [8..8+n] plaintext to encrypt
                wire.resize(wire.len() + crate::crypto::TAG_LEN, 0); // tag scratch
                c.seal_in_place(seq, &mut wire[8..])?; // encrypt [8..] in place, tag written at the end
            }
            None => wire.extend_from_slice(packet),
        }
        Ok(())
    }

    /// Unwrap a wire datagram back into a plaintext packet.
    fn open_from_wire(&self, wire: &[u8]) -> Result<Vec<u8>> {
        match &self.crypto {
            Some(c) => {
                if wire.len() < 8 {
                    return Err(PunktfunkError::BadPacket);
                }
                let seq = u64::from_be_bytes(wire[..8].try_into().unwrap());
                c.open(seq, &wire[8..])
            }
            None => Ok(wire.to_vec()),
        }
    }

    // -- Host path --------------------------------------------------------

    /// Host: FEC-protect, packetize, and seal one encoded access unit into wire packets WITHOUT
    /// sending them. Counts the frame as submitted and its packets/bytes as sent; the caller
    /// transmits the returned packets via [`send_sealed`](Self::send_sealed), either in one call
    /// or in chunks paced over the frame interval so a real NIC doesn't drop the whole frame as a
    /// line-rate burst (the 1 Gbps+ freeze fix). The nonce counter advances per packet, in order,
    /// so seal once and send the result intact. (Holding the `Vec<Vec<u8>>` also keeps the
    /// buffers alive for the batch.)
    pub fn seal_frame(
        &mut self,
        data: &[u8],
        pts_ns: u64,
        user_flags: u32,
    ) -> Result<Vec<Vec<u8>>> {
        if self.config.role != Role::Host {
            return Err(PunktfunkError::InvalidArg(
                "seal_frame called on a client session",
            ));
        }
        let packets = self
            .packetizer
            .packetize(data, pts_ns, user_flags, self.coder.as_ref())?;
        StatsCounters::add(&self.stats.frames_submitted, 1);
        // Reuse the wire-buffer pool the caller returns via `reclaim_wires`: one buffer per
        // packet, sealed in place, so after warmup there is no per-packet ciphertext/wire
        // allocation. (`wires` is a local, so `seal_into`'s `&mut self` doesn't alias the `&mut`
        // iteration over it.)
        let mut wires = std::mem::take(&mut self.wire_pool);
        wires.resize_with(packets.len(), Vec::new);
        for (wire, pkt) in wires.iter_mut().zip(packets.iter()) {
            self.seal_into(pkt, wire)?;
        }
        let bytes: u64 = wires.iter().map(|w| w.len() as u64).sum();
        StatsCounters::add(&self.stats.packets_sent, wires.len() as u64);
        StatsCounters::add(&self.stats.bytes_sent, bytes);
        Ok(wires)
    }

    /// Return the wire buffers from [`seal_frame`](Self::seal_frame) to the reuse pool once the
    /// caller has finished sending them, so the next frame reseals in place with no allocation.
    /// Optional: dropping the buffers instead just forfeits the reuse (correctness is unaffected).
    pub fn reclaim_wires(&mut self, wires: Vec<Vec<u8>>) {
        self.wire_pool = wires;
    }

    /// Host: transmit one chunk of already-[`seal_frame`](Self::seal_frame)ed packets in a single
    /// batched send (GSO where the transport enables it, else `sendmmsg`) and return how many the
    /// kernel accepted. The rest (`packets.len() - n`) are counted as send-buffer drops. Call once
    /// for the whole frame, or once per paced chunk.
    pub fn send_sealed(&self, packets: &[&[u8]]) -> Result<usize> {
        // GSO when enabled (UdpTransport/Linux), else sendmmsg; same short-count drop contract.
        let sent = self.transport.send_gso(packets)?;
        if sent < packets.len() {
            StatsCounters::add(
                &self.stats.packets_send_dropped,
                (packets.len() - sent) as u64,
            );
        }
        Ok(sent)
    }

    /// Host: FEC-protect, packetize, seal, and send one encoded access unit (the whole frame in
    /// one batched send). Convenience composition of [`seal_frame`](Self::seal_frame) +
    /// [`send_sealed`](Self::send_sealed) for callers that don't pace (synthetic source, probe).
    pub fn submit_frame(&mut self, data: &[u8], pts_ns: u64, user_flags: u32) -> Result<()> {
        let wires = self.seal_frame(data, pts_ns, user_flags)?;
        let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect();
        let r = self.send_sealed(&refs);
        drop(refs); // release the borrow of `wires` before returning the buffers to the pool
        self.reclaim_wires(wires);
        r.map(|_| ())
    }

    /// Host: live-adjust the FEC recovery percentage (adaptive FEC). Affects the next
    /// [`submit_frame`](Self::submit_frame)/[`seal_frame`](Self::seal_frame); the receiver needs
    /// no notification (each packet's header carries its block's data/recovery shard counts).
    pub fn set_fec_percent(&mut self, pct: u8) {
        self.packetizer.set_fec_percent(pct);
    }

    /// The current FEC recovery percentage (host side).
    pub fn fec_percent(&self) -> u8 {
        self.packetizer.fec_percent()
    }

    /// Host: drain one pending input event from the client, if any.
    pub fn poll_input(&mut self) -> Result<Option<InputEvent>> {
        if self.config.role != Role::Host {
            return Err(PunktfunkError::InvalidArg(
                "poll_input called on a client session",
            ));
        }
        while let Some(wire) = self.transport.recv()? {
            let pkt = match self.open_from_wire(&wire) {
                Ok(p) => p,
                Err(_) => continue, // drop undecryptable noise
            };
            StatsCounters::add(&self.stats.packets_received, 1);
            if let Some(ev) = InputEvent::decode(&pkt) {
                return Ok(Some(ev));
            }
            // Not an input datagram (e.g. stray video): ignore it and keep draining.
        }
        Ok(None)
    }

    // -- Client path ------------------------------------------------------

    /// Client: drain the transport until a whole access unit is recovered, or no more
    /// packets are pending ([`PunktfunkError::NoFrame`]).
    pub fn poll_frame(&mut self) -> Result<Frame> {
        if self.config.role != Role::Client {
            return Err(PunktfunkError::InvalidArg(
                "poll_frame called on a host session",
            ));
        }
        // Lazily allocate the recv ring on first client poll (host sessions never get here).
        if self.recv_scratch.is_empty() {
            // Each buffer holds a max datagram + 1 byte, so an oversized datagram fills it
            // completely and is dropped by the length check below.
            self.recv_scratch = (0..RECV_BATCH)
                .map(|_| vec![0u8; MAX_DATAGRAM_BYTES + 1])
                .collect();
            self.recv_lens = vec![0usize; RECV_BATCH];
        }
        loop {
            // Refill the ring with one `recvmmsg` batch when the current one is drained.
            if self.recv_idx >= self.recv_count {
                self.recv_count = self
                    .transport
                    .recv_batch(&mut self.recv_scratch, &mut self.recv_lens)?;
                self.recv_idx = 0;
                if self.recv_count == 0 {
                    return Err(PunktfunkError::NoFrame);
                }
            }
            let i = self.recv_idx;
            self.recv_idx += 1;
            let len = self.recv_lens[i];
            // An oversized datagram fills the whole buffer (recvmmsg truncates and caps msg_len
            // at the buffer size). Drop it rather than hand up a truncated, corrupt packet,
            // mirroring the scalar `recv`'s `n >= RECV_BUF` check.
            if len > MAX_DATAGRAM_BYTES {
                continue;
            }
            let pkt = match self.open_from_wire(&self.recv_scratch[i][..len]) {
                Ok(p) => p,
                Err(_) => continue,
            };
            StatsCounters::add(&self.stats.packets_received, 1);
            StatsCounters::add(&self.stats.bytes_received, pkt.len() as u64);
            // The reassembler validates the packet via its parsed header (`magic`),
            // ignoring anything that isn't a well-formed video packet.
            if let Some(frame) = self
                .reassembler
                .push(&pkt, self.coder.as_ref(), &self.stats)?
            {
                StatsCounters::add(&self.stats.frames_completed, 1);
                return Ok(frame);
            }
        }
    }

    /// Client: serialize and send one input event to the host.
    pub fn send_input(&mut self, event: &InputEvent) -> Result<()> {
        if self.config.role != Role::Client {
            return Err(PunktfunkError::InvalidArg(
                "send_input called on a host session",
            ));
        }
        let pkt = event.encode();
        let mut wire = Vec::new(); // input is rare + per-event; no pool needed
        self.seal_into(&pkt, &mut wire)?;
        StatsCounters::add(&self.stats.packets_sent, 1);
        StatsCounters::add(&self.stats.bytes_sent, wire.len() as u64);
        if !self.transport.send(&wire)? {
            StatsCounters::add(&self.stats.packets_send_dropped, 1);
        }
        Ok(())
    }
}
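
`seal_frame` leaves pacing to the caller: seal once, then hand the packets to `send_sealed` in order, either all at once or in chunks spread over the frame interval. The sketch below shows one way a caller might do the chunked variant. `send_paced`, its chunk count, and its time budget are hypothetical; the closure stands in for `Session::send_sealed` and returns how many packets of each batch were accepted.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical pacing helper: send `wires` in up to `chunks` batches, spread evenly over
/// `budget`. Packets stay in their sealed order, since the nonce counter advanced in order.
fn send_paced<F>(wires: &[Vec<u8>], chunks: usize, budget: Duration, mut send: F) -> usize
where
    F: FnMut(&[&[u8]]) -> usize,
{
    if wires.is_empty() {
        return 0;
    }
    let chunks = chunks.clamp(1, wires.len());
    let per_chunk = (wires.len() + chunks - 1) / chunks; // ceiling division
    let start = Instant::now();
    let mut accepted = 0;
    for (i, chunk) in wires.chunks(per_chunk).enumerate() {
        // Chunk i is due at i/chunks of the budget; sleep until then if we are early.
        let due = budget * i as u32 / chunks as u32;
        if let Some(wait) = due.checked_sub(start.elapsed()) {
            thread::sleep(wait);
        }
        let refs: Vec<&[u8]> = chunk.iter().map(|w| w.as_slice()).collect();
        accepted += send(&refs);
    }
    accepted
}

fn main() {
    let wires: Vec<Vec<u8>> = (0..10u8).map(|i| vec![i; 1200]).collect();
    let mut batches = Vec::new();
    let sent = send_paced(&wires, 4, Duration::from_millis(8), |batch| {
        batches.push(batch.len());
        batch.len() // pretend the kernel accepted every packet
    });
    println!("sent {sent} packets in batches {batches:?}");
}
```

A host would then return the buffers with `reclaim_wires` so the next frame reseals in place. `thread::sleep` is coarse on most platforms, so the budget should be treated as approximate.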