punktfunk/crates/punktfunk-core/src/session.rs
enricobuehler 9c86f667ca
perf(core): in-place AES-GCM seal + reused wire-buffer pool (host send)
The host sealed every packet with ~3 heap allocations: aes-gcm's convenience
encrypt() allocated the ciphertext Vec, seal_for_wire allocated the seq||ct||tag
wire Vec, and seal_frame allocated a fresh Vec<Vec<u8>> per frame. At line rate
(~250k–500k pkt/s for 2.5–5 Gbps), that allocator traffic is the single-core bottleneck.
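For scale, the packet rate is the bitrate divided by the bits per datagram. The sketch below assumes roughly 1,250 bytes per datagram (an assumption chosen because it reproduces the 250k–500k figures; the real size is set by the packetizer config) and multiplies by the ~3 allocations per packet estimated above.

```rust
/// Datagrams per second needed to carry `bits_per_sec` when each datagram holds
/// `bytes_per_packet` bytes (integer division, so a fractional packet is dropped).
fn packets_per_sec(bits_per_sec: u64, bytes_per_packet: u64) -> u64 {
    bits_per_sec / (bytes_per_packet * 8)
}

fn main() {
    // Assumption: ~1250-byte datagrams; the real size comes from the packetizer config.
    const ASSUMED_PACKET_BYTES: u64 = 1250;
    // Rough estimate of heap allocations per sealed packet before this change.
    const ALLOCS_PER_PACKET: u64 = 3;
    for bits_per_sec in [2_500_000_000u64, 5_000_000_000] {
        let pps = packets_per_sec(bits_per_sec, ASSUMED_PACKET_BYTES);
        println!(
            "{} Mbps -> {} pkt/s -> ~{} allocs/s",
            bits_per_sec / 1_000_000,
            pps,
            pps * ALLOCS_PER_PACKET
        );
    }
}
```

Under that assumption, 2.5 Gbps and 5 Gbps come out to 250,000 and 500,000 pkt/s, i.e. roughly 0.75M to 1.5M allocations per second on the sending core.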

- SessionCrypto::seal_in_place uses AeadInPlace::encrypt_in_place_detached to
  encrypt into the caller's buffer and write the detached tag at the end —
  byte-identical to seal's ciphertext||tag, no allocation (unit-tested for byte
  equality + decrypt).
- Session keeps a wire_pool the caller returns via reclaim_wires; seal_frame
  seals each packet in place into the reused buffers (clear() keeps capacity), so
  after warmup there's no per-packet ciphertext/wire allocation. paced_submit and
  submit_frame reclaim the pool after sending.
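The two bullets combine into one pattern: seal into a caller-owned buffer laid out as `seq(8) || ciphertext || tag`, and recycle those buffers through a pool so their capacity survives from frame to frame. The std-only sketch below illustrates that pattern under stated assumptions: `toy_seal_in_place` is a hypothetical stand-in that only mimics the in-place, detached-tag layout (it is not AES-GCM and provides no security), and `Sealer` is a hypothetical reduction of `Session` to its nonce counter and wire pool. In the real code the seal step is `SessionCrypto::seal_in_place`, built on aes-gcm's `encrypt_in_place_detached`.

```rust
/// AES-GCM appends a 16-byte authentication tag.
const TAG_LEN: usize = 16;

/// Hypothetical toy stand-in for an in-place AEAD seal with a detached tag. NOT cryptography:
/// it XORs the body with a seq-derived byte pattern and fills the trailing TAG_LEN bytes
/// with a checksum, only to mimic the buffer layout.
fn toy_seal_in_place(seq: u64, buf: &mut [u8]) {
    let body_len = buf.len() - TAG_LEN;
    let (body, tag) = buf.split_at_mut(body_len);
    let mut sum: u8 = 0;
    for (i, b) in body.iter_mut().enumerate() {
        *b ^= (seq as u8).wrapping_add(i as u8);
        sum = sum.wrapping_add(*b);
    }
    tag.fill(sum); // "tag" written into the reserved tail, as a detached tag would be
}

/// Hypothetical reduction of the session's send side: a nonce counter plus the wire pool.
struct Sealer {
    next_seq: u64,
    wire_pool: Vec<Vec<u8>>,
}

impl Sealer {
    /// Lay out `seq(8) || body || tag` in `wire`, reusing its capacity, then seal in place.
    fn seal_into(&mut self, packet: &[u8], wire: &mut Vec<u8>) {
        let seq = self.next_seq;
        self.next_seq = self.next_seq.wrapping_add(1);
        wire.clear(); // length 0, capacity kept
        wire.extend_from_slice(&seq.to_be_bytes()); // plaintext seq prefix
        wire.extend_from_slice(packet); // plaintext to seal
        wire.resize(wire.len() + TAG_LEN, 0); // tag scratch
        toy_seal_in_place(seq, &mut wire[8..]);
    }

    /// Seal a frame's packets into pooled buffers, one buffer per packet.
    fn seal_frame(&mut self, packets: &[Vec<u8>]) -> Vec<Vec<u8>> {
        let mut wires = std::mem::take(&mut self.wire_pool);
        wires.resize_with(packets.len(), Vec::new);
        for (wire, pkt) in wires.iter_mut().zip(packets) {
            self.seal_into(pkt, wire);
        }
        wires
    }

    /// Hand the buffers back after sending so the next frame reuses them.
    fn reclaim_wires(&mut self, wires: Vec<Vec<u8>>) {
        self.wire_pool = wires;
    }
}

fn main() {
    let mut sealer = Sealer { next_seq: 0, wire_pool: Vec::new() };
    let frame = vec![vec![0xABu8; 1200]; 4]; // 4 packets of 1200 bytes

    let wires = sealer.seal_frame(&frame); // warmup: allocates 4 buffers
    let ptrs: Vec<*const u8> = wires.iter().map(|w| w.as_ptr()).collect();
    sealer.reclaim_wires(wires);

    let wires = sealer.seal_frame(&frame); // second frame: same heap buffers
    assert!(wires.iter().zip(&ptrs).all(|(w, p)| w.as_ptr() == *p));
    assert_eq!(wires[0].len(), 8 + 1200 + TAG_LEN);
    assert_eq!(u64::from_be_bytes(wires[0][..8].try_into().unwrap()), 4); // seq continues
    sealer.reclaim_wires(wires);
}
```

Comparing buffer addresses across two frames is a cheap check that the pool works: once a buffer's capacity covers the largest packet, `clear()` followed by `extend_from_slice` and `resize` stays within that capacity, so the second frame writes into the same heap memory as the first.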

End-to-end encrypted/lossless multi-frame tests stay green (validates the pool
reuse doesn't corrupt across frames). Next: write packetize directly into a
contiguous send buffer (kills the remaining shard allocs + GSO's coalescing copy).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 23:47:38 +00:00

//! Session lifecycle and the two hot-path state machines.
//!
//! - **Host** ([`Session::submit_frame`]): encoded access unit → FEC + packetize →
//!   optional AES-GCM seal → transport send.
//! - **Client** ([`Session::poll_frame`]): transport recv → optional open → reorder +
//!   FEC recover + reassemble → whole access unit.
//!
//! Input flows the other way: a client sends events with [`Session::send_input`], and the
//! host drains them with [`Session::poll_input`].

use crate::config::{Config, Role};
use crate::crypto::SessionCrypto;
use crate::error::{PunktfunkError, Result};
use crate::fec::{coder_for, ErasureCoder};
use crate::input::InputEvent;
use crate::packet::{Packetizer, Reassembler, ReassemblerLimits, MAX_DATAGRAM_BYTES};
use crate::stats::{Stats, StatsCounters};
use crate::transport::Transport;

/// A reassembled, FEC-recovered access unit, ready to hand to the platform decoder.
pub struct Frame {
    pub data: Vec<u8>,
    pub frame_index: u32,
    pub pts_ns: u64,
    pub flags: u32,
}

/// One end of a stream. Constructed for a single [`Role`]; calling the other role's
/// methods returns [`PunktfunkError::InvalidArg`].
///
/// Note: the AEAD layer authenticates each datagram but does **not** provide anti-replay.
/// Video replays are largely absorbed by the reassembler's per-frame dedup, but replayed
/// input events are not yet filtered. A sliding-window replay filter keyed on the
/// authenticated sequence belongs with the pairing/handshake layer (M2); until then,
/// rely on the LAN/VPN transport assumption (plan §1).
pub struct Session {
    config: Config,
    coder: Box<dyn ErasureCoder>,
    crypto: Option<SessionCrypto>,
    transport: Box<dyn Transport>,
    packetizer: Packetizer,
    reassembler: Reassembler,
    stats: StatsCounters,
    /// Monotonic wire sequence, also the AES-GCM nonce counter.
    next_seq: u64,
    /// Client recv ring (reused across [`poll_frame`](Self::poll_frame)): `recvmmsg` drains a batch
    /// of datagrams into `recv_scratch` in one syscall, and `poll_frame` consumes them one at a time
    /// across calls (`recv_idx`..`recv_count`), refilling when drained. Allocated lazily on the
    /// first client poll so host sessions don't carry it. No per-packet receive-buffer allocation
    /// at line rate.
    recv_scratch: Vec<Vec<u8>>,
    recv_lens: Vec<usize>,
    recv_count: usize,
    recv_idx: usize,
    /// Host send pool: reused wire buffers (`seal_frame` seals in place into these; the caller sends
    /// them, then returns them via [`reclaim_wires`](Self::reclaim_wires)). After warmup each buffer
    /// keeps its capacity, so the per-packet ciphertext + wire `Vec` allocations vanish from the
    /// hot path.
    wire_pool: Vec<Vec<u8>>,
}

/// Datagrams drained per `recvmmsg` syscall on the client (the reused ring's size). At ~125k
/// pkt/s this is ~4k syscalls/s instead of 125k; the buffers cost `RECV_BATCH × RECV_BUF` (~64 KB).
const RECV_BATCH: usize = 32;

impl Session {
    pub fn new(config: Config, transport: Box<dyn Transport>) -> Result<Session> {
        config.validate()?;
        let coder = coder_for(config.fec.scheme);
        let crypto = config
            .encrypt
            .then(|| SessionCrypto::new(&config.key, config.salt, config.role));
        let packetizer = Packetizer::new(&config);
        let reassembler = Reassembler::new(ReassemblerLimits::from_config(&config));
        Ok(Session {
            coder,
            crypto,
            transport,
            packetizer,
            reassembler,
            stats: StatsCounters::default(),
            next_seq: 0,
            recv_scratch: Vec::new(),
            recv_lens: Vec::new(),
            recv_count: 0,
            recv_idx: 0,
            wire_pool: Vec::new(),
            config,
        })
    }

    pub fn role(&self) -> Role {
        self.config.role
    }

    pub fn stats(&self) -> Stats {
        self.stats.snapshot()
    }

    /// Seal one plaintext packet into the reused `wire` buffer in place (no allocation once the
    /// buffer has enough capacity). With crypto on, the wire is `seq(8) || ciphertext || tag`: the
    /// 8-byte big-endian sequence goes first, in plaintext, because the receiver derives the GCM
    /// nonce from it. With crypto off (probe), the wire is just the packet. The output is
    /// byte-identical to the previous `seal` + concat path; `clear()` keeps the buffer's capacity.
    fn seal_into(&mut self, packet: &[u8], wire: &mut Vec<u8>) -> Result<()> {
        let seq = self.next_seq;
        self.next_seq = self.next_seq.wrapping_add(1);
        wire.clear();
        match &self.crypto {
            Some(c) => {
                wire.extend_from_slice(&seq.to_be_bytes()); // [0..8] plaintext seq prefix
                wire.extend_from_slice(packet); // [8..8+n] plaintext to encrypt
                wire.resize(wire.len() + crate::crypto::TAG_LEN, 0); // tag scratch
                c.seal_in_place(seq, &mut wire[8..])?; // encrypt [8..] in place, tag written at the end
            }
            None => wire.extend_from_slice(packet),
        }
        Ok(())
    }

    /// Unwrap a wire datagram back into a plaintext packet.
    fn open_from_wire(&self, wire: &[u8]) -> Result<Vec<u8>> {
        match &self.crypto {
            Some(c) => {
                if wire.len() < 8 {
                    return Err(PunktfunkError::BadPacket);
                }
                let seq = u64::from_be_bytes(wire[..8].try_into().unwrap());
                c.open(seq, &wire[8..])
            }
            None => Ok(wire.to_vec()),
        }
    }

    // -- Host path --------------------------------------------------------

    /// Host: FEC-protect, packetize, and seal one encoded access unit into wire packets WITHOUT
    /// sending them. Counts the frame + its packets/bytes as submitted; the caller transmits the
    /// returned packets via [`send_sealed`](Self::send_sealed), in one call or in chunks paced
    /// over the frame interval so a real NIC doesn't drop the whole frame as a line-rate burst (the
    /// 1 Gbps+ freeze fix). The nonce counter advances per packet, in order, so seal once and send
    /// the result intact. (Holding the `Vec<Vec<u8>>` also keeps the buffers alive for the batch.)
    pub fn seal_frame(
        &mut self,
        data: &[u8],
        pts_ns: u64,
        user_flags: u32,
    ) -> Result<Vec<Vec<u8>>> {
        if self.config.role != Role::Host {
            return Err(PunktfunkError::InvalidArg(
                "seal_frame called on a client session",
            ));
        }
        let packets = self
            .packetizer
            .packetize(data, pts_ns, user_flags, self.coder.as_ref())?;
        StatsCounters::add(&self.stats.frames_submitted, 1);
        // Reuse the wire-buffer pool the caller returns via `reclaim_wires`: one buffer per packet,
        // sealed in place, so after warmup there is no per-packet ciphertext/wire allocation.
        // (`wires` is a local, so `seal_into`'s `&mut self` doesn't alias the `&mut` iteration over it.)
        let mut wires = std::mem::take(&mut self.wire_pool);
        wires.resize_with(packets.len(), Vec::new);
        for (wire, pkt) in wires.iter_mut().zip(packets.iter()) {
            self.seal_into(pkt, wire)?;
        }
        let bytes: u64 = wires.iter().map(|w| w.len() as u64).sum();
        StatsCounters::add(&self.stats.packets_sent, wires.len() as u64);
        StatsCounters::add(&self.stats.bytes_sent, bytes);
        Ok(wires)
    }

    /// Return the wire buffers from [`seal_frame`](Self::seal_frame) to the reuse pool once the caller
    /// has finished sending them, so the next frame reseals in place with no allocation. Optional:
    /// dropping the buffers instead just forfeits the reuse (correctness is unaffected).
    pub fn reclaim_wires(&mut self, wires: Vec<Vec<u8>>) {
        self.wire_pool = wires;
    }

    /// Host: transmit one chunk of already-[`seal_frame`](Self::seal_frame)ed packets in a single
    /// batched send (UDP GSO when the transport enables it, else `sendmmsg`), returning how many
    /// the kernel accepted. The rest (`packets.len() - n`) are counted as send-buffer drops. Call
    /// once for the whole frame, or once per paced chunk.
    pub fn send_sealed(&self, packets: &[&[u8]]) -> Result<usize> {
        // GSO when enabled (UdpTransport/Linux), else sendmmsg; same short-count drop contract.
        let sent = self.transport.send_gso(packets)?;
        if sent < packets.len() {
            StatsCounters::add(
                &self.stats.packets_send_dropped,
                (packets.len() - sent) as u64,
            );
        }
        Ok(sent)
    }

    /// Host: FEC-protect, packetize, seal, and send one encoded access unit (the whole frame in one
    /// batched send). Convenience composition of [`seal_frame`](Self::seal_frame) +
    /// [`send_sealed`](Self::send_sealed) for callers that don't pace (synthetic source, probe).
    pub fn submit_frame(&mut self, data: &[u8], pts_ns: u64, user_flags: u32) -> Result<()> {
        let wires = self.seal_frame(data, pts_ns, user_flags)?;
        let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect();
        let r = self.send_sealed(&refs);
        drop(refs); // release the borrow of `wires` before returning the buffers to the pool
        self.reclaim_wires(wires);
        r.map(|_| ())
    }

    /// Host: drain one pending input event from the client, if any.
    pub fn poll_input(&mut self) -> Result<Option<InputEvent>> {
        if self.config.role != Role::Host {
            return Err(PunktfunkError::InvalidArg(
                "poll_input called on a client session",
            ));
        }
        while let Some(wire) = self.transport.recv()? {
            let pkt = match self.open_from_wire(&wire) {
                Ok(p) => p,
                Err(_) => continue, // drop undecryptable noise
            };
            StatsCounters::add(&self.stats.packets_received, 1);
            if let Some(ev) = InputEvent::decode(&pkt) {
                return Ok(Some(ev));
            }
            // Not an input datagram (e.g. stray video): ignore it and keep draining.
        }
        Ok(None)
    }

    // -- Client path ------------------------------------------------------

    /// Client: drain the transport until a whole access unit is recovered, or no more
    /// packets are pending ([`PunktfunkError::NoFrame`]).
    pub fn poll_frame(&mut self) -> Result<Frame> {
        if self.config.role != Role::Client {
            return Err(PunktfunkError::InvalidArg(
                "poll_frame called on a host session",
            ));
        }
        // Lazily allocate the recv ring on first client poll (host sessions never get here).
        if self.recv_scratch.is_empty() {
            // Each buffer holds a max datagram + 1 byte, so an oversized read fills it completely
            // and the `len > MAX_DATAGRAM_BYTES` check below can detect and drop it.
            self.recv_scratch = (0..RECV_BATCH)
                .map(|_| vec![0u8; MAX_DATAGRAM_BYTES + 1])
                .collect();
            self.recv_lens = vec![0usize; RECV_BATCH];
        }
        loop {
            // Refill the ring with one `recvmmsg` batch when the current one is drained.
            if self.recv_idx >= self.recv_count {
                self.recv_count = self
                    .transport
                    .recv_batch(&mut self.recv_scratch, &mut self.recv_lens)?;
                self.recv_idx = 0;
                if self.recv_count == 0 {
                    return Err(PunktfunkError::NoFrame);
                }
            }
            let i = self.recv_idx;
            self.recv_idx += 1;
            let len = self.recv_lens[i];
            // An oversized datagram fills the whole buffer (recvmmsg truncates and caps msg_len at
            // the buffer size); drop it rather than hand up a truncated, corrupt packet, mirroring
            // the scalar `recv`'s `n >= RECV_BUF` check.
            if len > MAX_DATAGRAM_BYTES {
                continue;
            }
            let pkt = match self.open_from_wire(&self.recv_scratch[i][..len]) {
                Ok(p) => p,
                Err(_) => continue, // drop undecryptable noise
            };
            StatsCounters::add(&self.stats.packets_received, 1);
            StatsCounters::add(&self.stats.bytes_received, pkt.len() as u64);
            // The reassembler validates the packet via its parsed header (`magic`),
            // ignoring anything that isn't a well-formed video packet.
            if let Some(frame) = self
                .reassembler
                .push(&pkt, self.coder.as_ref(), &self.stats)?
            {
                StatsCounters::add(&self.stats.frames_completed, 1);
                return Ok(frame);
            }
        }
    }

    /// Client: serialize and send one input event to the host.
    pub fn send_input(&mut self, event: &InputEvent) -> Result<()> {
        if self.config.role != Role::Client {
            return Err(PunktfunkError::InvalidArg(
                "send_input called on a host session",
            ));
        }
        let pkt = event.encode();
        let mut wire = Vec::new(); // input is rare + per-event; no pool needed
        self.seal_into(&pkt, &mut wire)?;
        StatsCounters::add(&self.stats.packets_sent, 1);
        StatsCounters::add(&self.stats.bytes_sent, wire.len() as u64);
        if !self.transport.send(&wire)? {
            StatsCounters::add(&self.stats.packets_send_dropped, 1);
        }
        Ok(())
    }
}