From 9c86f667ca26de1643aa02303d22fe29dfe5e359 Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Fri, 12 Jun 2026 23:47:38 +0000 Subject: [PATCH] perf(core): in-place AES-GCM seal + reused wire-buffer pool (host send) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The host sealed every packet with ~3 heap allocations: aes-gcm's convenience encrypt() allocates the ciphertext Vec, seal_for_wire allocates the seq||ct||tag wire Vec, and seal_frame allocated a fresh Vec<Vec<u8>> per frame. At line rate (~250k–500k pkt/s for 2.5–5 Gbps) that's the single-core allocator wall. - SessionCrypto::seal_in_place uses AeadInPlace::encrypt_in_place_detached to encrypt into the caller's buffer and write the detached tag at the end — byte-identical to seal's ciphertext||tag, no allocation (unit-tested for byte equality + decrypt). - Session keeps a wire_pool the caller returns via reclaim_wires; seal_frame seals each packet in place into the reused buffers (clear() keeps capacity), so after warmup there's no per-packet ciphertext/wire allocation. paced_submit and submit_frame reclaim the pool after sending. End-to-end encrypted/lossless multi-frame tests stay green (validates the pool reuse doesn't corrupt across frames). Next: write packetize directly into a contiguous send buffer (kills the remaining shard allocs + GSO's coalescing copy). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/punktfunk-core/src/crypto.rs | 43 +++++++++++++++++++++++- crates/punktfunk-core/src/session.rs | 49 ++++++++++++++++++++-------- crates/punktfunk-host/src/m3.rs | 8 ++--- 3 files changed, 82 insertions(+), 18 deletions(-) diff --git a/crates/punktfunk-core/src/crypto.rs b/crates/punktfunk-core/src/crypto.rs index 2ec5d63..08c9600 100644 --- a/crates/punktfunk-core/src/crypto.rs +++ b/crates/punktfunk-core/src/crypto.rs @@ -20,7 +20,7 @@ use crate::config::Role; use crate::error::{PunktfunkError, Result}; -use aes_gcm::aead::{Aead, KeyInit, Payload}; +use aes_gcm::aead::{Aead, AeadInPlace, KeyInit, Payload}; use aes_gcm::{Aes128Gcm, Key, Nonce}; /// 16-byte AEAD authentication tag appended by GCM. @@ -60,6 +60,23 @@ impl SessionCrypto { .map_err(|_| PunktfunkError::Crypto) } + /// Seal in place, no per-packet allocation: `buf` is laid out as `[plaintext .. ][TAG_LEN]` (the + /// last `TAG_LEN` bytes are scratch); on return it holds `[ciphertext .. ][tag]` — byte-identical + /// to `seal`'s `ciphertext || tag`, just written in place. The hot-path sealer (`Session`) uses + /// this to avoid the `Vec` that `seal`'s convenience API allocates for every packet. + pub fn seal_in_place(&self, seq: u64, buf: &mut [u8]) -> Result<()> { + debug_assert!(buf.len() >= TAG_LEN); + let nonce = nonce(self.send_salt, seq); + let split = buf.len() - TAG_LEN; + let (plaintext, tag_slot) = buf.split_at_mut(split); + let tag = self + .cipher + .encrypt_in_place_detached(Nonce::from_slice(&nonce), &seq.to_be_bytes(), plaintext) + .map_err(|_| PunktfunkError::Crypto)?; + tag_slot.copy_from_slice(&tag); + Ok(()) + } + /// Open `ciphertext || tag` for sequence `seq` (also bound as associated data). 
pub fn open(&self, seq: u64, ciphertext: &[u8]) -> Result> { let nonce = nonce(self.recv_salt, seq); @@ -146,4 +163,28 @@ mod tests { client.seal(0, b"abc").unwrap() ); } + + #[test] + fn seal_in_place_matches_seal_and_opens() { + let key = random_key(); + let salt = random_salt(); + let host = SessionCrypto::new(&key, salt, Role::Host); + let client = SessionCrypto::new(&key, salt, Role::Client); + for msg in [ + &b""[..], + b"x", + b"the quick brown fox jumps over 13 lazy dogs!!", + ] { + let reference = host.seal(7, msg).unwrap(); // ciphertext || tag + // In-place: [plaintext .. ][TAG_LEN scratch]. + let mut buf = msg.to_vec(); + buf.resize(msg.len() + TAG_LEN, 0); + host.seal_in_place(7, &mut buf).unwrap(); + assert_eq!( + buf, reference, + "in-place seal must be byte-identical to seal" + ); + assert_eq!(client.open(7, &buf).unwrap(), msg); + } + } } diff --git a/crates/punktfunk-core/src/session.rs b/crates/punktfunk-core/src/session.rs index fc62997..a2ec1c9 100644 --- a/crates/punktfunk-core/src/session.rs +++ b/crates/punktfunk-core/src/session.rs @@ -51,6 +51,10 @@ pub struct Session { recv_lens: Vec, recv_count: usize, recv_idx: usize, + /// Host send pool: reused wire buffers (`seal_frame` seals in place into these, the caller sends + /// then returns them via [`reclaim_wires`](Self::reclaim_wires)). After warmup each buffer keeps + /// its capacity, so the per-packet ciphertext + wire `Vec` allocations vanish from the hot path. + wire_pool: Vec>, } /// Datagrams drained per `recvmmsg` syscall on the client (the reused ring's size). At ~125k @@ -78,6 +82,7 @@ impl Session { recv_lens: Vec::new(), recv_count: 0, recv_idx: 0, + wire_pool: Vec::new(), config, }) } @@ -92,19 +97,23 @@ impl Session { /// Wrap a packet for the wire: when encrypting, prepend the 8-byte big-endian /// sequence (the receiver derives the GCM nonce from it) then the ciphertext. 
- fn seal_for_wire(&mut self, packet: &[u8]) -> Result> { + /// Seal one plaintext packet into the reused `wire` buffer in place (no allocation): the wire is + /// `seq(8) || ciphertext || tag` with crypto on, or just the packet with crypto off (probe). + /// Byte-identical to the previous `seal` + concat path; `clear()` keeps the buffer's capacity. + fn seal_into(&mut self, packet: &[u8], wire: &mut Vec) -> Result<()> { let seq = self.next_seq; self.next_seq = self.next_seq.wrapping_add(1); + wire.clear(); match &self.crypto { Some(c) => { - let ct = c.seal(seq, packet)?; - let mut wire = Vec::with_capacity(8 + ct.len()); - wire.extend_from_slice(&seq.to_be_bytes()); - wire.extend_from_slice(&ct); - Ok(wire) + wire.extend_from_slice(&seq.to_be_bytes()); // [0..8] plaintext seq prefix + wire.extend_from_slice(packet); // [8..8+n] plaintext to encrypt + wire.resize(wire.len() + crate::crypto::TAG_LEN, 0); // tag scratch + c.seal_in_place(seq, &mut wire[8..])?; // encrypt [8..] in place, tag written at the end } - None => Ok(packet.to_vec()), + None => wire.extend_from_slice(packet), } + Ok(()) } /// Unwrap a wire datagram back into a plaintext packet. @@ -144,9 +153,13 @@ impl Session { .packetizer .packetize(data, pts_ns, user_flags, self.coder.as_ref())?; StatsCounters::add(&self.stats.frames_submitted, 1); - let mut wires: Vec> = Vec::with_capacity(packets.len()); - for pkt in &packets { - wires.push(self.seal_for_wire(pkt)?); + // Reuse the wire-buffer pool the caller returns via `reclaim_wires`: one buffer per packet, + // sealed in place — after warmup there is no per-packet ciphertext/wire allocation. (`wires` + // is a local, so `seal_into`'s `&mut self` doesn't alias the `&mut` iteration over it.) 
+ let mut wires = std::mem::take(&mut self.wire_pool); + wires.resize_with(packets.len(), Vec::new); + for (wire, pkt) in wires.iter_mut().zip(packets.iter()) { + self.seal_into(pkt, wire)?; } let bytes: u64 = wires.iter().map(|w| w.len() as u64).sum(); StatsCounters::add(&self.stats.packets_sent, wires.len() as u64); @@ -154,6 +167,13 @@ impl Session { Ok(wires) } + /// Return the wire buffers from [`seal_frame`](Self::seal_frame) to the reuse pool once the caller + /// has finished sending them, so the next frame reseals in place with no allocation. Optional — + /// dropping the buffers instead just forfeits the reuse (correctness is unaffected). + pub fn reclaim_wires(&mut self, wires: Vec>) { + self.wire_pool = wires; + } + /// Host: transmit one chunk of already-[`seal_frame`](Self::seal_frame)ed packets in a single /// batched `sendmmsg`, returning how many the kernel accepted. The rest (`packets.len() - n`) /// are counted as send-buffer drops. Call once for the whole frame, or per paced chunk. @@ -175,8 +195,10 @@ impl Session { pub fn submit_frame(&mut self, data: &[u8], pts_ns: u64, user_flags: u32) -> Result<()> { let wires = self.seal_frame(data, pts_ns, user_flags)?; let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect(); - self.send_sealed(&refs)?; - Ok(()) + let r = self.send_sealed(&refs); + drop(refs); // release the borrow of `wires` before returning the buffers to the pool + self.reclaim_wires(wires); + r.map(|_| ()) } /// Host: drain one pending input event from the client, if any. @@ -264,7 +286,8 @@ impl Session { )); } let pkt = event.encode(); - let wire = self.seal_for_wire(&pkt)?; + let mut wire = Vec::new(); // input is rare + per-event; no pool needed + self.seal_into(&pkt, &mut wire)?; StatsCounters::add(&self.stats.packets_sent, 1); StatsCounters::add(&self.stats.bytes_sent, wire.len() as u64); if !self.transport.send(&wire)? 
{ diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index 0f58f10..7f08008 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -1588,10 +1588,10 @@ fn paced_submit( } } } - Ok(PaceStat { - spread_us: start.elapsed().as_micros() as u32, - paced, - }) + let spread_us = start.elapsed().as_micros() as u32; + drop(refs); // release the borrow of `wires` so it can return to the seal pool + session.reclaim_wires(wires); + Ok(PaceStat { spread_us, paced }) } /// Percentile of a slice (sorts it in place first). `q` in 0.0..=1.0.