9c86f667ca
ci / web (push) Failing after 39s
ci / docs-site (push) Failing after 33s
apple / swift (push) Successful in 1m16s
ci / rust (push) Successful in 1m20s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 6s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 5s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 5s
deb / build-publish (push) Successful in 3m3s
docker / deploy-docs (push) Successful in 18s
rpm / build-publish (push) Successful in 4m35s
The host sealed every packet with ~3 heap allocations: aes-gcm's convenience encrypt() allocates the ciphertext Vec, seal_for_wire allocates the seq||ct||tag wire Vec, and seal_frame allocated a fresh Vec<Vec<u8>> per frame. At line rate (~250k–500k pkt/s for 2.5–5 Gbps) that's the single-core allocator wall. - SessionCrypto::seal_in_place uses AeadInPlace::encrypt_in_place_detached to encrypt into the caller's buffer and write the detached tag at the end — byte-identical to seal's ciphertext||tag, no allocation (unit-tested for byte equality + decrypt). - Session keeps a wire_pool the caller returns via reclaim_wires; seal_frame seals each packet in place into the reused buffers (clear() keeps capacity), so after warmup there's no per-packet ciphertext/wire allocation. paced_submit and submit_frame reclaim the pool after sending. End-to-end encrypted/lossless multi-frame tests stay green (validates the pool reuse doesn't corrupt across frames). Next: write packetize directly into a contiguous send buffer (kills the remaining shard allocs + GSO's coalescing copy). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
299 lines
13 KiB
Rust
299 lines
13 KiB
Rust
//! Session lifecycle and the two hot-path state machines.
|
||
//!
|
||
//! - **Host** ([`Session::submit_frame`]): encoded access unit → FEC + packetize →
|
||
//! optional AES-GCM seal → transport send.
|
||
//! - **Client** ([`Session::poll_frame`]): transport recv → optional open → reorder +
|
||
//! FEC recover + reassemble → whole access unit.
|
||
//!
|
||
//! Both directions also carry input: a client [`Session::send_input`]s events; the host
|
||
//! drains them with [`Session::poll_input`].
|
||
|
||
use crate::config::{Config, Role};
|
||
use crate::crypto::SessionCrypto;
|
||
use crate::error::{PunktfunkError, Result};
|
||
use crate::fec::{coder_for, ErasureCoder};
|
||
use crate::input::InputEvent;
|
||
use crate::packet::{Packetizer, Reassembler, ReassemblerLimits, MAX_DATAGRAM_BYTES};
|
||
use crate::stats::{Stats, StatsCounters};
|
||
use crate::transport::Transport;
|
||
|
||
/// An access unit that the client has reassembled (and FEC-recovered where needed), ready to
/// hand straight to the platform decoder.
pub struct Frame {
    /// The complete encoded access unit.
    pub data: Vec<u8>,
    /// Position of this frame in the stream — presumably assigned on the host side by the
    /// packetizer; TODO confirm in `packet`.
    pub frame_index: u32,
    /// Presentation timestamp in nanoseconds — presumably the `pts_ns` the host passed to
    /// `Session::submit_frame` / `Session::seal_frame` (verify in `packet`).
    pub pts_ns: u64,
    /// Per-frame flags — presumably the host's `user_flags` carried through the packet headers
    /// (verify in `packet`).
    pub flags: u32,
}
|
||
|
||
/// One end of a stream. Constructed for a single [`Role`]; calling the other role's
|
||
/// methods returns [`PunktfunkError::InvalidArg`].
|
||
///
|
||
/// Note: the AEAD layer authenticates each datagram but does **not** provide anti-replay.
|
||
/// Video replays are largely absorbed by the reassembler's per-frame dedup, but replayed
|
||
/// input events are not yet filtered. A sliding-window replay filter keyed on the
|
||
/// authenticated sequence belongs with the pairing/handshake layer (M2); until then,
|
||
/// rely on the LAN/VPN transport assumption (plan §1).
|
||
pub struct Session {
|
||
config: Config,
|
||
coder: Box<dyn ErasureCoder>,
|
||
crypto: Option<SessionCrypto>,
|
||
transport: Box<dyn Transport>,
|
||
packetizer: Packetizer,
|
||
reassembler: Reassembler,
|
||
stats: StatsCounters,
|
||
/// Monotonic wire sequence, also the AES-GCM nonce counter.
|
||
next_seq: u64,
|
||
/// Client recv ring (reused across [`poll_frame`](Self::poll_frame)): `recvmmsg` drains a batch
|
||
/// of datagrams into `recv_scratch` in one syscall, and poll_frame consumes them one at a time
|
||
/// across calls (`recv_idx`..`recv_count`), refilling when drained. Allocated lazily on the
|
||
/// first client poll so host sessions don't carry it. No per-packet recv alloc at line rate.
|
||
recv_scratch: Vec<Vec<u8>>,
|
||
recv_lens: Vec<usize>,
|
||
recv_count: usize,
|
||
recv_idx: usize,
|
||
/// Host send pool: reused wire buffers (`seal_frame` seals in place into these, the caller sends
|
||
/// then returns them via [`reclaim_wires`](Self::reclaim_wires)). After warmup each buffer keeps
|
||
/// its capacity, so the per-packet ciphertext + wire `Vec` allocations vanish from the hot path.
|
||
wire_pool: Vec<Vec<u8>>,
|
||
}
|
||
|
||
/// Datagrams drained per `recvmmsg` syscall on the client (the size of the reused recv ring).
/// At ~125k pkt/s with full batches this means ~4k syscalls/s instead of ~125k. The ring costs
/// `RECV_BATCH × (MAX_DATAGRAM_BYTES + 1)` bytes (about 64 KB if datagrams top out near 2 KB),
/// allocated once on the first client poll.
const RECV_BATCH: usize = 32;
|
||
|
||
impl Session {
|
||
pub fn new(config: Config, transport: Box<dyn Transport>) -> Result<Session> {
|
||
config.validate()?;
|
||
let coder = coder_for(config.fec.scheme);
|
||
let crypto = config
|
||
.encrypt
|
||
.then(|| SessionCrypto::new(&config.key, config.salt, config.role));
|
||
let packetizer = Packetizer::new(&config);
|
||
let reassembler = Reassembler::new(ReassemblerLimits::from_config(&config));
|
||
Ok(Session {
|
||
coder,
|
||
crypto,
|
||
transport,
|
||
packetizer,
|
||
reassembler,
|
||
stats: StatsCounters::default(),
|
||
next_seq: 0,
|
||
recv_scratch: Vec::new(),
|
||
recv_lens: Vec::new(),
|
||
recv_count: 0,
|
||
recv_idx: 0,
|
||
wire_pool: Vec::new(),
|
||
config,
|
||
})
|
||
}
|
||
|
||
pub fn role(&self) -> Role {
|
||
self.config.role
|
||
}
|
||
|
||
pub fn stats(&self) -> Stats {
|
||
self.stats.snapshot()
|
||
}
|
||
|
||
/// Wrap a packet for the wire: when encrypting, prepend the 8-byte big-endian
|
||
/// sequence (the receiver derives the GCM nonce from it) then the ciphertext.
|
||
/// Seal one plaintext packet into the reused `wire` buffer in place (no allocation): the wire is
|
||
/// `seq(8) || ciphertext || tag` with crypto on, or just the packet with crypto off (probe).
|
||
/// Byte-identical to the previous `seal` + concat path; `clear()` keeps the buffer's capacity.
|
||
fn seal_into(&mut self, packet: &[u8], wire: &mut Vec<u8>) -> Result<()> {
|
||
let seq = self.next_seq;
|
||
self.next_seq = self.next_seq.wrapping_add(1);
|
||
wire.clear();
|
||
match &self.crypto {
|
||
Some(c) => {
|
||
wire.extend_from_slice(&seq.to_be_bytes()); // [0..8] plaintext seq prefix
|
||
wire.extend_from_slice(packet); // [8..8+n] plaintext to encrypt
|
||
wire.resize(wire.len() + crate::crypto::TAG_LEN, 0); // tag scratch
|
||
c.seal_in_place(seq, &mut wire[8..])?; // encrypt [8..] in place, tag written at the end
|
||
}
|
||
None => wire.extend_from_slice(packet),
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
/// Unwrap a wire datagram back into a plaintext packet.
|
||
fn open_from_wire(&self, wire: &[u8]) -> Result<Vec<u8>> {
|
||
match &self.crypto {
|
||
Some(c) => {
|
||
if wire.len() < 8 {
|
||
return Err(PunktfunkError::BadPacket);
|
||
}
|
||
let seq = u64::from_be_bytes(wire[..8].try_into().unwrap());
|
||
c.open(seq, &wire[8..])
|
||
}
|
||
None => Ok(wire.to_vec()),
|
||
}
|
||
}
|
||
|
||
// -- Host path --------------------------------------------------------
|
||
|
||
/// Host: FEC-protect, packetize, and seal one encoded access unit into wire packets WITHOUT
|
||
/// sending them. Counts the frame + its packets/bytes as submitted; the caller transmits the
|
||
/// returned packets via [`send_sealed`](Self::send_sealed) — in one call, or in chunks paced
|
||
/// over the frame interval so a real NIC doesn't drop the whole frame as a line-rate burst (the
|
||
/// 1 Gbps+ freeze fix). The nonce counter advances per packet, in order, so seal once and send
|
||
/// the result intact. (Holding the `Vec<Vec<u8>>` also keeps the buffers alive for the batch.)
|
||
pub fn seal_frame(
|
||
&mut self,
|
||
data: &[u8],
|
||
pts_ns: u64,
|
||
user_flags: u32,
|
||
) -> Result<Vec<Vec<u8>>> {
|
||
if self.config.role != Role::Host {
|
||
return Err(PunktfunkError::InvalidArg(
|
||
"seal_frame called on a client session",
|
||
));
|
||
}
|
||
let packets = self
|
||
.packetizer
|
||
.packetize(data, pts_ns, user_flags, self.coder.as_ref())?;
|
||
StatsCounters::add(&self.stats.frames_submitted, 1);
|
||
// Reuse the wire-buffer pool the caller returns via `reclaim_wires`: one buffer per packet,
|
||
// sealed in place — after warmup there is no per-packet ciphertext/wire allocation. (`wires`
|
||
// is a local, so `seal_into`'s `&mut self` doesn't alias the `&mut` iteration over it.)
|
||
let mut wires = std::mem::take(&mut self.wire_pool);
|
||
wires.resize_with(packets.len(), Vec::new);
|
||
for (wire, pkt) in wires.iter_mut().zip(packets.iter()) {
|
||
self.seal_into(pkt, wire)?;
|
||
}
|
||
let bytes: u64 = wires.iter().map(|w| w.len() as u64).sum();
|
||
StatsCounters::add(&self.stats.packets_sent, wires.len() as u64);
|
||
StatsCounters::add(&self.stats.bytes_sent, bytes);
|
||
Ok(wires)
|
||
}
|
||
|
||
/// Return the wire buffers from [`seal_frame`](Self::seal_frame) to the reuse pool once the caller
|
||
/// has finished sending them, so the next frame reseals in place with no allocation. Optional —
|
||
/// dropping the buffers instead just forfeits the reuse (correctness is unaffected).
|
||
pub fn reclaim_wires(&mut self, wires: Vec<Vec<u8>>) {
|
||
self.wire_pool = wires;
|
||
}
|
||
|
||
/// Host: transmit one chunk of already-[`seal_frame`](Self::seal_frame)ed packets in a single
|
||
/// batched `sendmmsg`, returning how many the kernel accepted. The rest (`packets.len() - n`)
|
||
/// are counted as send-buffer drops. Call once for the whole frame, or per paced chunk.
|
||
pub fn send_sealed(&self, packets: &[&[u8]]) -> Result<usize> {
|
||
// GSO when enabled (UdpTransport/Linux), else sendmmsg — same short-count drop contract.
|
||
let sent = self.transport.send_gso(packets)?;
|
||
if sent < packets.len() {
|
||
StatsCounters::add(
|
||
&self.stats.packets_send_dropped,
|
||
(packets.len() - sent) as u64,
|
||
);
|
||
}
|
||
Ok(sent)
|
||
}
|
||
|
||
/// Host: FEC-protect, packetize, seal, and send one encoded access unit (the whole frame in one
|
||
/// batched send). Convenience composition of [`seal_frame`](Self::seal_frame) +
|
||
/// [`send_sealed`](Self::send_sealed) for callers that don't pace (synthetic source, probe).
|
||
pub fn submit_frame(&mut self, data: &[u8], pts_ns: u64, user_flags: u32) -> Result<()> {
|
||
let wires = self.seal_frame(data, pts_ns, user_flags)?;
|
||
let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect();
|
||
let r = self.send_sealed(&refs);
|
||
drop(refs); // release the borrow of `wires` before returning the buffers to the pool
|
||
self.reclaim_wires(wires);
|
||
r.map(|_| ())
|
||
}
|
||
|
||
/// Host: drain one pending input event from the client, if any.
|
||
pub fn poll_input(&mut self) -> Result<Option<InputEvent>> {
|
||
if self.config.role != Role::Host {
|
||
return Err(PunktfunkError::InvalidArg(
|
||
"poll_input called on a client session",
|
||
));
|
||
}
|
||
while let Some(wire) = self.transport.recv()? {
|
||
let pkt = match self.open_from_wire(&wire) {
|
||
Ok(p) => p,
|
||
Err(_) => continue, // drop undecryptable noise
|
||
};
|
||
StatsCounters::add(&self.stats.packets_received, 1);
|
||
if let Some(ev) = InputEvent::decode(&pkt) {
|
||
return Ok(Some(ev));
|
||
}
|
||
// Not an input datagram (e.g. stray video) — ignore and keep draining.
|
||
}
|
||
Ok(None)
|
||
}
|
||
|
||
// -- Client path ------------------------------------------------------
|
||
|
||
/// Client: drain the transport until a whole access unit is recovered, or no more
|
||
/// packets are pending ([`PunktfunkError::NoFrame`]).
|
||
pub fn poll_frame(&mut self) -> Result<Frame> {
|
||
if self.config.role != Role::Client {
|
||
return Err(PunktfunkError::InvalidArg(
|
||
"poll_frame called on a host session",
|
||
));
|
||
}
|
||
// Lazily allocate the recv ring on first client poll (host sessions never get here).
|
||
if self.recv_scratch.is_empty() {
|
||
// Each buffer holds a max datagram + 1 (an oversized read fills it → reassembler rejects).
|
||
self.recv_scratch = (0..RECV_BATCH)
|
||
.map(|_| vec![0u8; MAX_DATAGRAM_BYTES + 1])
|
||
.collect();
|
||
self.recv_lens = vec![0usize; RECV_BATCH];
|
||
}
|
||
loop {
|
||
// Refill the ring with one `recvmmsg` batch when the current one is drained.
|
||
if self.recv_idx >= self.recv_count {
|
||
self.recv_count = self
|
||
.transport
|
||
.recv_batch(&mut self.recv_scratch, &mut self.recv_lens)?;
|
||
self.recv_idx = 0;
|
||
if self.recv_count == 0 {
|
||
return Err(PunktfunkError::NoFrame);
|
||
}
|
||
}
|
||
let i = self.recv_idx;
|
||
self.recv_idx += 1;
|
||
let len = self.recv_lens[i];
|
||
// An oversized datagram fills the whole buffer (recvmmsg truncates + caps msg_len at the
|
||
// buffer size) — drop it rather than hand up a truncated, corrupt packet, mirroring the
|
||
// scalar `recv`'s `n >= RECV_BUF` check.
|
||
if len > MAX_DATAGRAM_BYTES {
|
||
continue;
|
||
}
|
||
let pkt = match self.open_from_wire(&self.recv_scratch[i][..len]) {
|
||
Ok(p) => p,
|
||
Err(_) => continue,
|
||
};
|
||
StatsCounters::add(&self.stats.packets_received, 1);
|
||
StatsCounters::add(&self.stats.bytes_received, pkt.len() as u64);
|
||
// The reassembler validates the packet via its parsed header (`magic`),
|
||
// ignoring anything that isn't a well-formed video packet.
|
||
if let Some(frame) = self
|
||
.reassembler
|
||
.push(&pkt, self.coder.as_ref(), &self.stats)?
|
||
{
|
||
StatsCounters::add(&self.stats.frames_completed, 1);
|
||
return Ok(frame);
|
||
}
|
||
}
|
||
}
|
||
|
||
/// Client: serialize and send one input event to the host.
|
||
pub fn send_input(&mut self, event: &InputEvent) -> Result<()> {
|
||
if self.config.role != Role::Client {
|
||
return Err(PunktfunkError::InvalidArg(
|
||
"send_input called on a host session",
|
||
));
|
||
}
|
||
let pkt = event.encode();
|
||
let mut wire = Vec::new(); // input is rare + per-event; no pool needed
|
||
self.seal_into(&pkt, &mut wire)?;
|
||
StatsCounters::add(&self.stats.packets_sent, 1);
|
||
StatsCounters::add(&self.stats.bytes_sent, wire.len() as u64);
|
||
if !self.transport.send(&wire)? {
|
||
StatsCounters::add(&self.stats.packets_send_dropped, 1);
|
||
}
|
||
Ok(())
|
||
}
|
||
}
|