feat(1gbps): pace per-frame sends so high-bitrate frames don't burst-drop
ci / rust (push) Has been cancelled
ci / rust (push) Has been cancelled
Increment B of the send-path rework — the actual fix for "freezes get more common over ~150 Mbps, no image at all at 400 Mbps" on the native path. Cause: the encoder emits a frame and submit_frame blasted ALL its packets at once into the NIC; a real link drops the line-rate burst (host send buffer EAGAINs), and under infinite GOP one dropped frame freezes the decode until the next keyframe. (The speed-test probe showed 0 drops at 400 Mbps because the probe is self-paced; real video wasn't.) Adaptive pacing, no extra thread, no regression: - Session splits into seal_frame (FEC + packetize + seal → wire packets, no send) and send_sealed (one batched sendmmsg of a chunk, counts drops); submit_frame is now their composition (synthetic + probe paths unchanged). - virtual_stream's paced_submit seals a frame then sends it in 16-packet chunks spread over ~90% of the time until the next frame is due. At 60 fps desktop (fast encode → lots of slack) the frame spreads across the interval → no NIC burst → no freeze. At 240 fps@5K (encode ≈ interval → ~0 slack) the budget collapses and every chunk goes out immediately → never slower than before. Core suite (34 + loopback round-trip + 6) + clippy + fmt green. The seal/send split is covered by the existing loopback tests; the pacing is host timing, verified by review (live-test needs a real NIC — your Mac at a raised bitrate). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -107,34 +107,58 @@ impl Session {
|
|||||||
|
|
||||||
// -- Host path --------------------------------------------------------
|
// -- Host path --------------------------------------------------------
|
||||||
|
|
||||||
/// Host: FEC-protect, packetize, seal, and send one encoded access unit.
|
/// Host: FEC-protect, packetize, and seal one encoded access unit into wire packets WITHOUT
|
||||||
pub fn submit_frame(&mut self, data: &[u8], pts_ns: u64, user_flags: u32) -> Result<()> {
|
/// sending them. Counts the frame + its packets/bytes as submitted; the caller transmits the
|
||||||
|
/// returned packets via [`send_sealed`](Self::send_sealed) — in one call, or in chunks paced
|
||||||
|
/// over the frame interval so a real NIC doesn't drop the whole frame as a line-rate burst (the
|
||||||
|
/// 1 Gbps+ freeze fix). The nonce counter advances per packet, in order, so seal once and send
|
||||||
|
/// the result intact. (Holding the `Vec<Vec<u8>>` also keeps the buffers alive for the batch.)
|
||||||
|
pub fn seal_frame(
|
||||||
|
&mut self,
|
||||||
|
data: &[u8],
|
||||||
|
pts_ns: u64,
|
||||||
|
user_flags: u32,
|
||||||
|
) -> Result<Vec<Vec<u8>>> {
|
||||||
if self.config.role != Role::Host {
|
if self.config.role != Role::Host {
|
||||||
return Err(PunktfunkError::InvalidArg(
|
return Err(PunktfunkError::InvalidArg(
|
||||||
"submit_frame called on a client session",
|
"seal_frame called on a client session",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
let packets = self
|
let packets = self
|
||||||
.packetizer
|
.packetizer
|
||||||
.packetize(data, pts_ns, user_flags, self.coder.as_ref())?;
|
.packetize(data, pts_ns, user_flags, self.coder.as_ref())?;
|
||||||
StatsCounters::add(&self.stats.frames_submitted, 1);
|
StatsCounters::add(&self.stats.frames_submitted, 1);
|
||||||
// Seal every shard (the nonce counter advances per packet, in order), then hand the whole
|
|
||||||
// frame to the transport in ONE batched call — `sendmmsg` does ~64 packets/syscall instead
|
|
||||||
// of a `send` each, the dominant 1 Gbps+ lever. (Sealing must finish before the immutable
|
|
||||||
// `send_batch` borrow; collecting the wires also keeps them alive for the batch's iovecs.)
|
|
||||||
let mut wires: Vec<Vec<u8>> = Vec::with_capacity(packets.len());
|
let mut wires: Vec<Vec<u8>> = Vec::with_capacity(packets.len());
|
||||||
for pkt in &packets {
|
for pkt in &packets {
|
||||||
wires.push(self.seal_for_wire(pkt)?);
|
wires.push(self.seal_for_wire(pkt)?);
|
||||||
}
|
}
|
||||||
let total = wires.len();
|
|
||||||
let bytes: u64 = wires.iter().map(|w| w.len() as u64).sum();
|
let bytes: u64 = wires.iter().map(|w| w.len() as u64).sum();
|
||||||
StatsCounters::add(&self.stats.packets_sent, total as u64);
|
StatsCounters::add(&self.stats.packets_sent, wires.len() as u64);
|
||||||
StatsCounters::add(&self.stats.bytes_sent, bytes);
|
StatsCounters::add(&self.stats.bytes_sent, bytes);
|
||||||
let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect();
|
Ok(wires)
|
||||||
let sent = self.transport.send_batch(&refs)?;
|
|
||||||
if sent < total {
|
|
||||||
StatsCounters::add(&self.stats.packets_send_dropped, (total - sent) as u64);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Host: transmit one chunk of already-[`seal_frame`](Self::seal_frame)ed packets in a single
|
||||||
|
/// batched `sendmmsg`, returning how many the kernel accepted. The rest (`packets.len() - n`)
|
||||||
|
/// are counted as send-buffer drops. Call once for the whole frame, or per paced chunk.
|
||||||
|
pub fn send_sealed(&self, packets: &[&[u8]]) -> Result<usize> {
|
||||||
|
let sent = self.transport.send_batch(packets)?;
|
||||||
|
if sent < packets.len() {
|
||||||
|
StatsCounters::add(
|
||||||
|
&self.stats.packets_send_dropped,
|
||||||
|
(packets.len() - sent) as u64,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(sent)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Host: FEC-protect, packetize, seal, and send one encoded access unit (the whole frame in one
|
||||||
|
/// batched send). Convenience composition of [`seal_frame`](Self::seal_frame) +
|
||||||
|
/// [`send_sealed`](Self::send_sealed) for callers that don't pace (synthetic source, probe).
|
||||||
|
pub fn submit_frame(&mut self, data: &[u8], pts_ns: u64, user_flags: u32) -> Result<()> {
|
||||||
|
let wires = self.seal_frame(data, pts_ns, user_flags)?;
|
||||||
|
let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect();
|
||||||
|
self.send_sealed(&refs)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1378,6 +1378,48 @@ fn service_probes(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Seal one access unit and send its packets PACED over the budget until `deadline` (the next
|
||||||
|
/// frame's due time), in 16-packet `sendmmsg` chunks — so a high-bitrate frame spreads across the
|
||||||
|
/// frame interval instead of bursting all at once into the NIC. A real link drops a line-rate burst
|
||||||
|
/// (the host send buffer EAGAINs), and under infinite GOP a single dropped frame freezes the decode
|
||||||
|
/// until the next keyframe — the cause of the "freezes over ~150 Mbps, no image at 400 Mbps"
|
||||||
|
/// symptom. When there's little/no slack (encode ≈ interval at very high fps) the budget collapses
|
||||||
|
/// to ~0 and every chunk goes out immediately, so this is never slower than the unpaced path.
|
||||||
|
fn paced_submit(
|
||||||
|
session: &mut Session,
|
||||||
|
data: &[u8],
|
||||||
|
pts_ns: u64,
|
||||||
|
flags: u32,
|
||||||
|
deadline: std::time::Instant,
|
||||||
|
) -> Result<()> {
|
||||||
|
const PACE_CHUNK: usize = 16;
|
||||||
|
let wires = session
|
||||||
|
.seal_frame(data, pts_ns, flags)
|
||||||
|
.map_err(|e| anyhow!("seal_frame: {e:?}"))?;
|
||||||
|
let refs: Vec<&[u8]> = wires.iter().map(|w| w.as_slice()).collect();
|
||||||
|
let n_chunks = refs.len().div_ceil(PACE_CHUNK).max(1);
|
||||||
|
let start = std::time::Instant::now();
|
||||||
|
// Spread sends over ~90% of the time to the deadline (10% margin for the caller's tail sleep);
|
||||||
|
// 0 when we're already at/past the deadline → no sleeps → immediate send.
|
||||||
|
let budget = deadline
|
||||||
|
.checked_duration_since(start)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.mul_f32(0.9);
|
||||||
|
for (i, chunk) in refs.chunks(PACE_CHUNK).enumerate() {
|
||||||
|
session
|
||||||
|
.send_sealed(chunk)
|
||||||
|
.map_err(|e| anyhow!("send_sealed: {e:?}"))?;
|
||||||
|
// Sleep toward this chunk's slice of the budget; skip sub-500µs waits (scheduler jitter).
|
||||||
|
let target = start + budget.mul_f64((i + 1) as f64 / n_chunks as f64);
|
||||||
|
if let Some(ahead) = target.checked_duration_since(std::time::Instant::now()) {
|
||||||
|
if ahead > std::time::Duration::from_micros(500) {
|
||||||
|
std::thread::sleep(ahead);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs
|
/// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs
|
||||||
/// stamped with the capture wall clock (the client derives per-frame pipeline latency).
|
/// stamped with the capture wall clock (the client derives per-frame pipeline latency).
|
||||||
///
|
///
|
||||||
@@ -1447,15 +1489,17 @@ fn virtual_stream(
|
|||||||
}
|
}
|
||||||
let capture_ns = now_ns();
|
let capture_ns = now_ns();
|
||||||
enc.submit(&frame).context("encoder submit")?;
|
enc.submit(&frame).context("encoder submit")?;
|
||||||
|
// The deadline for this frame's packets: pace the send up to here so a high-bitrate frame
|
||||||
|
// spreads over the interval instead of bursting all at once into the NIC (a real link drops
|
||||||
|
// the burst, freezing the infinite-GOP stream until the next keyframe — the 1 Gbps+ fix).
|
||||||
|
next += interval;
|
||||||
while let Some(au) = enc.poll().context("encoder poll")? {
|
while let Some(au) = enc.poll().context("encoder poll")? {
|
||||||
let flags = if au.keyframe {
|
let flags = if au.keyframe {
|
||||||
(FLAG_PIC | FLAG_SOF) as u32
|
(FLAG_PIC | FLAG_SOF) as u32
|
||||||
} else {
|
} else {
|
||||||
FLAG_PIC as u32
|
FLAG_PIC as u32
|
||||||
};
|
};
|
||||||
session
|
paced_submit(session, &au.data, capture_ns, flags, next)?;
|
||||||
.submit_frame(&au.data, capture_ns, flags)
|
|
||||||
.map_err(|e| anyhow!("submit_frame: {e:?}"))?;
|
|
||||||
sent += 1;
|
sent += 1;
|
||||||
}
|
}
|
||||||
if perf && last_perf.elapsed() >= std::time::Duration::from_secs(2) {
|
if perf && last_perf.elapsed() >= std::time::Duration::from_secs(2) {
|
||||||
@@ -1474,7 +1518,6 @@ fn virtual_stream(
|
|||||||
last_bytes = s.bytes_sent;
|
last_bytes = s.bytes_sent;
|
||||||
last_send_dropped = s.packets_send_dropped;
|
last_send_dropped = s.packets_send_dropped;
|
||||||
}
|
}
|
||||||
next += interval;
|
|
||||||
match next.checked_duration_since(std::time::Instant::now()) {
|
match next.checked_duration_since(std::time::Instant::now()) {
|
||||||
Some(d) => std::thread::sleep(d),
|
Some(d) => std::thread::sleep(d),
|
||||||
None => next = std::time::Instant::now(),
|
None => next = std::time::Instant::now(),
|
||||||
|
|||||||
Reference in New Issue
Block a user