perf(core): in-place AES-GCM seal + reused wire-buffer pool (host send)
ci / web (push) Failing after 39s
ci / docs-site (push) Failing after 33s
apple / swift (push) Successful in 1m16s
ci / rust (push) Successful in 1m20s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 6s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 5s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 5s
deb / build-publish (push) Successful in 3m3s
docker / deploy-docs (push) Successful in 18s
rpm / build-publish (push) Successful in 4m35s

The host sealed every packet with ~3 heap allocations: aes-gcm's convenience
encrypt() allocates the ciphertext Vec, seal_for_wire allocates the seq||ct||tag
wire Vec, and seal_frame allocated a fresh Vec<Vec<u8>> per frame. At line rate
(~250k–500k pkt/s for 2.5–5 Gbps) that's the single-core allocator wall.

- SessionCrypto::seal_in_place uses AeadInPlace::encrypt_in_place_detached to
  encrypt into the caller's buffer and write the detached tag at the end —
  byte-identical to seal's ciphertext||tag, no allocation (unit-tested for byte
  equality + decrypt).
- Session keeps a wire_pool the caller returns via reclaim_wires; seal_frame
  seals each packet in place into the reused buffers (clear() keeps capacity), so
  after warmup there's no per-packet ciphertext/wire allocation. paced_submit and
  submit_frame reclaim the pool after sending.

End-to-end encrypted/lossless multi-frame tests stay green (validates the pool
reuse doesn't corrupt across frames). Next: write packetize directly into a
contiguous send buffer (kills the remaining shard allocs + GSO's coalescing copy).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-12 23:47:38 +00:00
parent 448986f41c
commit 9c86f667ca
3 changed files with 82 additions and 18 deletions
+42 -1
View File
@@ -20,7 +20,7 @@
use crate::config::Role;
use crate::error::{PunktfunkError, Result};
use aes_gcm::aead::{Aead, KeyInit, Payload};
use aes_gcm::aead::{Aead, AeadInPlace, KeyInit, Payload};
use aes_gcm::{Aes128Gcm, Key, Nonce};
/// 16-byte AEAD authentication tag appended by GCM.
@@ -60,6 +60,23 @@ impl SessionCrypto {
.map_err(|_| PunktfunkError::Crypto)
}
/// Seal in place, no per-packet allocation: `buf` is laid out as `[plaintext .. ][TAG_LEN]` (the
/// last `TAG_LEN` bytes are scratch); on return it holds `[ciphertext .. ][tag]` — byte-identical
/// to `seal`'s `ciphertext || tag`, just written in place. The hot-path sealer (`Session`) uses
/// this to avoid the `Vec` that `seal`'s convenience API allocates for every packet.
pub fn seal_in_place(&self, seq: u64, buf: &mut [u8]) -> Result<()> {
debug_assert!(buf.len() >= TAG_LEN);
let nonce = nonce(self.send_salt, seq);
let split = buf.len() - TAG_LEN;
let (plaintext, tag_slot) = buf.split_at_mut(split);
let tag = self
.cipher
.encrypt_in_place_detached(Nonce::from_slice(&nonce), &seq.to_be_bytes(), plaintext)
.map_err(|_| PunktfunkError::Crypto)?;
tag_slot.copy_from_slice(&tag);
Ok(())
}
/// Open `ciphertext || tag` for sequence `seq` (also bound as associated data).
pub fn open(&self, seq: u64, ciphertext: &[u8]) -> Result<Vec<u8>> {
let nonce = nonce(self.recv_salt, seq);
@@ -146,4 +163,28 @@ mod tests {
client.seal(0, b"abc").unwrap()
);
}
#[test]
fn seal_in_place_matches_seal_and_opens() {
let key = random_key();
let salt = random_salt();
let host = SessionCrypto::new(&key, salt, Role::Host);
let client = SessionCrypto::new(&key, salt, Role::Client);
for msg in [
&b""[..],
b"x",
b"the quick brown fox jumps over 13 lazy dogs!!",
] {
let reference = host.seal(7, msg).unwrap(); // ciphertext || tag
// In-place: [plaintext .. ][TAG_LEN scratch].
let mut buf = msg.to_vec();
buf.resize(msg.len() + TAG_LEN, 0);
host.seal_in_place(7, &mut buf).unwrap();
assert_eq!(
buf, reference,
"in-place seal must be byte-identical to seal"
);
assert_eq!(client.open(7, &buf).unwrap(), msg);
}
}
}
+36 -13
View File
@@ -51,6 +51,10 @@ pub struct Session {
recv_lens: Vec<usize>,
recv_count: usize,
recv_idx: usize,
/// Host send pool: reused wire buffers (`seal_frame` seals in place into these, the caller sends
/// then returns them via [`reclaim_wires`](Self::reclaim_wires)). After warmup each buffer keeps
/// its capacity, so the per-packet ciphertext + wire `Vec` allocations vanish from the hot path.
wire_pool: Vec<Vec<u8>>,
}
/// Datagrams drained per `recvmmsg` syscall on the client (the reused ring's size). At ~125k
@@ -78,6 +82,7 @@ impl Session {
recv_lens: Vec::new(),
recv_count: 0,
recv_idx: 0,
wire_pool: Vec::new(),
config,
})
}
@@ -92,19 +97,23 @@ impl Session {
/// Wrap a packet for the wire: when encrypting, prepend the 8-byte big-endian
/// sequence (the receiver derives the GCM nonce from it) then the ciphertext.
fn seal_for_wire(&mut self, packet: &[u8]) -> Result<Vec<u8>> {
/// Seal one plaintext packet into the reused `wire` buffer in place (no allocation): the wire is
/// `seq(8) || ciphertext || tag` with crypto on, or just the packet with crypto off (probe).
/// Byte-identical to the previous `seal` + concat path; `clear()` keeps the buffer's capacity.
fn seal_into(&mut self, packet: &[u8], wire: &mut Vec<u8>) -> Result<()> {
let seq = self.next_seq;
self.next_seq = self.next_seq.wrapping_add(1);
wire.clear();
match &self.crypto {
Some(c) => {
let ct = c.seal(seq, packet)?;
let mut wire = Vec::with_capacity(8 + ct.len());
wire.extend_from_slice(&seq.to_be_bytes());
wire.extend_from_slice(&ct);
Ok(wire)
wire.extend_from_slice(&seq.to_be_bytes()); // [0..8] plaintext seq prefix
wire.extend_from_slice(packet); // [8..8+n] plaintext to encrypt
wire.resize(wire.len() + crate::crypto::TAG_LEN, 0); // tag scratch
c.seal_in_place(seq, &mut wire[8..])?; // encrypt [8..] in place, tag written at the end
}
None => Ok(packet.to_vec()),
None => wire.extend_from_slice(packet),
}
Ok(())
}
/// Unwrap a wire datagram back into a plaintext packet.
@@ -144,9 +153,13 @@ impl Session {
.packetizer
.packetize(data, pts_ns, user_flags, self.coder.as_ref())?;
StatsCounters::add(&self.stats.frames_submitted, 1);
let mut wires: Vec<Vec<u8>> = Vec::with_capacity(packets.len());
for pkt in &packets {
wires.push(self.seal_for_wire(pkt)?);
// Reuse the wire-buffer pool the caller returns via `reclaim_wires`: one buffer per packet,
// sealed in place — after warmup there is no per-packet ciphertext/wire allocation. (`wires`
// is a local, so `seal_into`'s `&mut self` doesn't alias the `&mut` iteration over it.)
let mut wires = std::mem::take(&mut self.wire_pool);
wires.resize_with(packets.len(), Vec::new);
for (wire, pkt) in wires.iter_mut().zip(packets.iter()) {
self.seal_into(pkt, wire)?;
}
let bytes: u64 = wires.iter().map(|w| w.len() as u64).sum();
StatsCounters::add(&self.stats.packets_sent, wires.len() as u64);
@@ -154,6 +167,13 @@ impl Session {
Ok(wires)
}
/// Return the wire buffers from [`seal_frame`](Self::seal_frame) to the reuse pool once the caller
/// has finished sending them, so the next frame reseals in place with no allocation. Optional —
/// dropping the buffers instead just forfeits the reuse (correctness is unaffected).
pub fn reclaim_wires(&mut self, wires: Vec<Vec<u8>>) {
self.wire_pool = wires;
}
/// Host: transmit one chunk of already-[`seal_frame`](Self::seal_frame)ed packets in a single
/// batched `sendmmsg`, returning how many the kernel accepted. The rest (`packets.len() - n`)
/// are counted as send-buffer drops. Call once for the whole frame, or per paced chunk.
@@ -175,8 +195,10 @@ impl Session {
pub fn submit_frame(&mut self, data: &[u8], pts_ns: u64, user_flags: u32) -> Result<()> {
let wires = self.seal_frame(data, pts_ns, user_flags)?;
let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect();
self.send_sealed(&refs)?;
Ok(())
let r = self.send_sealed(&refs);
drop(refs); // release the borrow of `wires` before returning the buffers to the pool
self.reclaim_wires(wires);
r.map(|_| ())
}
/// Host: drain one pending input event from the client, if any.
@@ -264,7 +286,8 @@ impl Session {
));
}
let pkt = event.encode();
let wire = self.seal_for_wire(&pkt)?;
let mut wire = Vec::new(); // input is rare + per-event; no pool needed
self.seal_into(&pkt, &mut wire)?;
StatsCounters::add(&self.stats.packets_sent, 1);
StatsCounters::add(&self.stats.bytes_sent, wire.len() as u64);
if !self.transport.send(&wire)? {