Bigger-bet #1 from the latency plan. virtual_stream ran capture+encode+seal+paced-send on ONE thread, so frame N+1's capture/encode couldn't start until frame N's entire paced tail had left the wire — the pacing budget (~0.9×interval) was serialized in front of the next encode. Port GameStream's spawn_sender model to the native path: - A dedicated send thread (`send_loop`) owns the WHOLE Session (so no socket clone or shared/Arc stats are needed — `seal_frame` mutates the nonce, `send_sealed` + the probe bursts all live there) and does FEC+seal + microburst-paced send. - The encode thread captures+encodes + handles reconfig and hands each AU over a bounded sync_channel(3) as a FrameMsg (data, capture_ns, flags, deadline, encode_us). It BLOCKS on backpressure if the send falls behind — frames slow down rather than a dropped frame freezing the infinite-GOP stream (we don't drop). Clean shutdown: drop the channel → send thread drains/exits → join. - Probes (run_probe_burst) move to the send thread since they need the Session; a burst naturally pauses video (the encode thread blocks on the full channel). - Per-frame encode_us/pace_us histogram moved to the send thread (encode_us is carried in the FrameMsg) and now reflects the overlap. Removes the encode↔paced-tail serialization (~2-8 ms @60-120 fps), independent of the pacing policy, at no quality cost. Substrate for the future NVENC slice wrapper. Verified live on this box (appliance restarted onto it): a client streamed the KWin desktop (1.49 MB H.265, clean, no panic) and a 200 Mbps speed-test probe completed through the send thread (0 drops). Build + clippy + fmt green. Real-NIC sustained soak (reconfig under load, line-rate, mode switches) pending the Ubuntu third host. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+158
-67
@@ -671,15 +671,15 @@ async fn serve_session(
|
||||
let compositor = compositor
|
||||
.expect("the Virtual source resolves a compositor during the handshake");
|
||||
virtual_stream(
|
||||
&mut session,
|
||||
session,
|
||||
mode,
|
||||
seconds,
|
||||
&stop_stream,
|
||||
stop_stream,
|
||||
&reconfig_rx,
|
||||
compositor,
|
||||
bitrate_kbps,
|
||||
&probe_rx,
|
||||
&probe_result_tx,
|
||||
probe_rx,
|
||||
probe_result_tx,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -1469,6 +1469,104 @@ fn percentile(sorted_or_not: &mut [u32], q: f64) -> u32 {
|
||||
sorted_or_not[i]
|
||||
}
|
||||
|
||||
/// One encoded frame handed from the capture/encode thread to the send thread (the encode|send
/// split). The send thread does FEC+seal+paced-send for this frame while the capture/encode
/// thread is already capturing and encoding the next one.
///
/// `Debug` is derived so a message can be inspected in logs and test assertions. Note that the
/// derived output prints the entire `data` payload, so avoid formatting it on the hot path.
#[derive(Debug)]
struct FrameMsg {
    /// The encoded frame bytes, handed to the send path as-is.
    data: Vec<u8>,
    /// Capture timestamp in nanoseconds, forwarded unchanged to the send path.
    capture_ns: u64,
    /// Frame flag bits, forwarded unchanged to the send path.
    flags: u32,
    /// When this frame's packets should have fully left (the next frame's due time) = the pacing
    /// budget. In the past when the send thread is behind → immediate send (catch up).
    deadline: std::time::Instant,
    /// capture→encoded latency (µs), measured on the encode thread, carried for the perf histogram.
    encode_us: u32,
}
|
||||
|
||||
/// The dedicated send thread: it owns the whole [`Session`] (so no socket clone or shared stats are
|
||||
/// needed) and does FEC+seal + microburst-paced send OFF the capture/encode thread, plus the
|
||||
/// speed-test probe bursts (which also need the Session). Decoupling the paced send from encoding
|
||||
/// lets the encode of frame N+1 overlap the transmit of frame N instead of waiting behind its tail.
|
||||
/// Runs until the encode thread drops the frame channel (end of stream) or `stop` is set.
|
||||
fn send_loop(
|
||||
mut session: Session,
|
||||
frame_rx: std::sync::mpsc::Receiver<FrameMsg>,
|
||||
probe_rx: std::sync::mpsc::Receiver<ProbeRequest>,
|
||||
probe_result_tx: tokio::sync::mpsc::UnboundedSender<ProbeResult>,
|
||||
stop: Arc<AtomicBool>,
|
||||
perf: bool,
|
||||
burst_cap: usize,
|
||||
) {
|
||||
let mut last_perf = std::time::Instant::now();
|
||||
let mut last_bytes = 0u64;
|
||||
let mut last_send_dropped = 0u64;
|
||||
let mut encode_us: Vec<u32> = Vec::new();
|
||||
let mut pace_us: Vec<u32> = Vec::new();
|
||||
let (mut paced_frames, mut immediate_frames) = (0u64, 0u64);
|
||||
loop {
|
||||
if stop.load(Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
// Probes run here (they need the Session); a burst pauses video — the encode thread blocks
|
||||
// on the full frame channel meanwhile, which is exactly the intended pause.
|
||||
service_probes(&mut session, &stop, &probe_rx, &probe_result_tx);
|
||||
// Short timeout so we keep re-checking `stop` + probes when no frames are flowing.
|
||||
match frame_rx.recv_timeout(std::time::Duration::from_millis(50)) {
|
||||
Ok(msg) => match paced_submit(
|
||||
&mut session,
|
||||
&msg.data,
|
||||
msg.capture_ns,
|
||||
msg.flags,
|
||||
msg.deadline,
|
||||
burst_cap,
|
||||
) {
|
||||
Ok(stat) => {
|
||||
if perf {
|
||||
encode_us.push(msg.encode_us);
|
||||
pace_us.push(stat.spread_us);
|
||||
if stat.paced {
|
||||
paced_frames += 1;
|
||||
} else {
|
||||
immediate_frames += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(error = %format!("{e:#}"), "send failed — stopping stream");
|
||||
break;
|
||||
}
|
||||
},
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, // encode thread done
|
||||
}
|
||||
if perf && last_perf.elapsed() >= std::time::Duration::from_secs(2) {
|
||||
let s = session.stats();
|
||||
let secs = last_perf.elapsed().as_secs_f64();
|
||||
// Attempted (sealed) transmit rate; `send_dropped` is what didn't reach the wire.
|
||||
let tx_mbps = (s.bytes_sent - last_bytes) as f64 * 8.0 / secs / 1_000_000.0;
|
||||
tracing::info!(
|
||||
tx_mbps = format!("{tx_mbps:.0}"),
|
||||
send_dropped = s.packets_send_dropped - last_send_dropped,
|
||||
send_dropped_total = s.packets_send_dropped,
|
||||
encode_us_p50 = percentile(&mut encode_us, 0.50),
|
||||
encode_us_p99 = percentile(&mut encode_us, 0.99),
|
||||
pace_us_p50 = percentile(&mut pace_us, 0.50),
|
||||
pace_us_p99 = percentile(&mut pace_us, 0.99),
|
||||
pace_us_max = pace_us.last().copied().unwrap_or(0),
|
||||
immediate_frames,
|
||||
paced_frames,
|
||||
"perf"
|
||||
);
|
||||
last_perf = std::time::Instant::now();
|
||||
last_bytes = s.bytes_sent;
|
||||
last_send_dropped = s.packets_send_dropped;
|
||||
encode_us.clear();
|
||||
pace_us.clear();
|
||||
paced_frames = 0;
|
||||
immediate_frames = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs
|
||||
/// stamped with the capture wall clock (the client derives per-frame pipeline latency).
|
||||
///
|
||||
@@ -1479,15 +1577,15 @@ fn percentile(sorted_or_not: &mut [u32], q: f64) -> u32 {
|
||||
/// carry speed-test bursts (see [`service_probes`]).
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn virtual_stream(
|
||||
session: &mut Session,
|
||||
session: Session,
|
||||
mode: punktfunk_core::Mode,
|
||||
seconds: u32,
|
||||
stop: &AtomicBool,
|
||||
stop: Arc<AtomicBool>,
|
||||
reconfig: &std::sync::mpsc::Receiver<punktfunk_core::Mode>,
|
||||
compositor: crate::vdisplay::Compositor,
|
||||
bitrate_kbps: u32,
|
||||
probe_rx: &std::sync::mpsc::Receiver<ProbeRequest>,
|
||||
probe_result_tx: &tokio::sync::mpsc::UnboundedSender<ProbeResult>,
|
||||
probe_rx: std::sync::mpsc::Receiver<ProbeRequest>,
|
||||
probe_result_tx: tokio::sync::mpsc::UnboundedSender<ProbeResult>,
|
||||
) -> Result<()> {
|
||||
tracing::info!(
|
||||
compositor = compositor.id(),
|
||||
@@ -1499,32 +1597,43 @@ fn virtual_stream(
|
||||
let (mut capturer, mut enc, mut frame, mut interval) =
|
||||
build_pipeline_with_retry(&mut vd, mode, bitrate_kbps)?;
|
||||
|
||||
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64);
|
||||
let mut next = std::time::Instant::now();
|
||||
let mut sent: u64 = 0;
|
||||
// Throughput/drop + per-frame-latency instrumentation (PUNKTFUNK_PERF) — makes a high-bitrate
|
||||
// soak observable: wire goodput, send-buffer drops, and the per-frame encode + pacing-tail
|
||||
// distributions (so the pacing latency the microburst cap is meant to recover is visible).
|
||||
let perf = std::env::var("PUNKTFUNK_PERF").is_ok();
|
||||
let mut last_perf = std::time::Instant::now();
|
||||
let mut last_bytes = 0u64;
|
||||
let mut last_send_dropped = 0u64;
|
||||
let mut encode_us: Vec<u32> = Vec::new();
|
||||
let mut pace_us: Vec<u32> = Vec::new();
|
||||
let (mut paced_frames, mut immediate_frames) = (0u64, 0u64);
|
||||
// Microburst cap: a frame whose sealed size is ≤ this goes out in one immediate burst (no
|
||||
// pacing latency); only the overflow of a bigger frame (IDR / sustained high bitrate) is spread.
|
||||
// 128 KB is a conservative default (well under the ~150 Mbps@60 frame size where bursts started
|
||||
// dropping). Raise it via PUNKTFUNK_PACE_BURST_KB after confirming send_dropped stays 0 on your
|
||||
// link, to recover more pacing tail on higher-bitrate streams.
|
||||
// Microburst cap (applied in send_loop/paced_submit): a frame ≤ this bursts out immediately;
|
||||
// only a bigger frame's overflow is spread. PUNKTFUNK_PACE_BURST_KB overrides the 128 KB default.
|
||||
let burst_cap = std::env::var("PUNKTFUNK_PACE_BURST_KB")
|
||||
.ok()
|
||||
.and_then(|s| s.parse::<usize>().ok())
|
||||
.unwrap_or(128)
|
||||
* 1024;
|
||||
|
||||
// Encode|send split: this thread captures+encodes (the GPU work) + handles reconfig, and hands
|
||||
// each AU to a dedicated send thread that owns the Session and does FEC+seal+paced-send — so the
|
||||
// encode of frame N+1 overlaps the paced transmit of frame N instead of waiting behind its tail.
|
||||
// The bounded channel applies backpressure (the encode thread blocks if the send falls behind,
|
||||
// so frames slow down rather than a dropped frame freezing the infinite-GOP stream).
|
||||
let (frame_tx, frame_rx) = std::sync::mpsc::sync_channel::<FrameMsg>(3);
|
||||
let send_thread = std::thread::Builder::new()
|
||||
.name("punktfunk-send".into())
|
||||
.spawn({
|
||||
let stop = stop.clone();
|
||||
move || {
|
||||
send_loop(
|
||||
session,
|
||||
frame_rx,
|
||||
probe_rx,
|
||||
probe_result_tx,
|
||||
stop,
|
||||
perf,
|
||||
burst_cap,
|
||||
)
|
||||
}
|
||||
})
|
||||
.context("spawn send thread")?;
|
||||
|
||||
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64);
|
||||
let mut next = std::time::Instant::now();
|
||||
let mut sent: u64 = 0;
|
||||
while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
|
||||
// Service speed-test probes between frames (each burst pauses video for its duration).
|
||||
service_probes(session, stop, probe_rx, probe_result_tx);
|
||||
// Drain to the NEWEST requested mode (a resize drag queues many) so we rebuild once,
|
||||
// not once per stale intermediate mode.
|
||||
let mut want = None;
|
||||
@@ -1552,61 +1661,43 @@ fn virtual_stream(
|
||||
}
|
||||
let capture_ns = now_ns();
|
||||
enc.submit(&frame).context("encoder submit")?;
|
||||
// The deadline for this frame's packets: pace the send up to here so a high-bitrate frame
|
||||
// spreads over the interval instead of bursting all at once into the NIC (a real link drops
|
||||
// the burst, freezing the infinite-GOP stream until the next keyframe — the 1 Gbps+ fix).
|
||||
// The deadline for this frame's packets (the next frame's due time); the send thread paces
|
||||
// up to here so a high-bitrate frame spreads over the interval instead of bursting.
|
||||
next += interval;
|
||||
let mut send_gone = false;
|
||||
while let Some(au) = enc.poll().context("encoder poll")? {
|
||||
let flags = if au.keyframe {
|
||||
(FLAG_PIC | FLAG_SOF) as u32
|
||||
} else {
|
||||
FLAG_PIC as u32
|
||||
};
|
||||
let t_encode_done = now_ns();
|
||||
let stat = paced_submit(session, &au.data, capture_ns, flags, next, burst_cap)?;
|
||||
sent += 1;
|
||||
if perf {
|
||||
encode_us.push((t_encode_done.saturating_sub(capture_ns) / 1000) as u32);
|
||||
pace_us.push(stat.spread_us);
|
||||
if stat.paced {
|
||||
paced_frames += 1;
|
||||
} else {
|
||||
immediate_frames += 1;
|
||||
}
|
||||
let encode_us = (now_ns().saturating_sub(capture_ns) / 1000) as u32;
|
||||
let msg = FrameMsg {
|
||||
data: au.data,
|
||||
capture_ns,
|
||||
flags,
|
||||
deadline: next,
|
||||
encode_us,
|
||||
};
|
||||
// Hand to the send thread; this blocks (backpressure) if it's behind. An Err means it
|
||||
// exited (send failure / stop) — end the encode loop too.
|
||||
if frame_tx.send(msg).is_err() {
|
||||
send_gone = true;
|
||||
break;
|
||||
}
|
||||
sent += 1;
|
||||
}
|
||||
if perf && last_perf.elapsed() >= std::time::Duration::from_secs(2) {
|
||||
let s = session.stats();
|
||||
let secs = last_perf.elapsed().as_secs_f64();
|
||||
// Attempted (sealed) transmit rate; `send_dropped` below is what didn't reach the wire.
|
||||
let tx_mbps = (s.bytes_sent - last_bytes) as f64 * 8.0 / secs / 1_000_000.0;
|
||||
tracing::info!(
|
||||
tx_mbps = format!("{tx_mbps:.0}"),
|
||||
frames = sent,
|
||||
send_dropped = s.packets_send_dropped - last_send_dropped,
|
||||
send_dropped_total = s.packets_send_dropped,
|
||||
encode_us_p50 = percentile(&mut encode_us, 0.50),
|
||||
encode_us_p99 = percentile(&mut encode_us, 0.99),
|
||||
pace_us_p50 = percentile(&mut pace_us, 0.50),
|
||||
pace_us_p99 = percentile(&mut pace_us, 0.99),
|
||||
pace_us_max = pace_us.last().copied().unwrap_or(0),
|
||||
immediate_frames,
|
||||
paced_frames,
|
||||
"perf"
|
||||
);
|
||||
last_perf = std::time::Instant::now();
|
||||
last_bytes = s.bytes_sent;
|
||||
last_send_dropped = s.packets_send_dropped;
|
||||
encode_us.clear();
|
||||
pace_us.clear();
|
||||
paced_frames = 0;
|
||||
immediate_frames = 0;
|
||||
if send_gone {
|
||||
break;
|
||||
}
|
||||
match next.checked_duration_since(std::time::Instant::now()) {
|
||||
Some(d) => std::thread::sleep(d),
|
||||
None => next = std::time::Instant::now(),
|
||||
}
|
||||
}
|
||||
// Signal the send thread to drain + exit (drop the channel), then join it.
|
||||
drop(frame_tx);
|
||||
let _ = send_thread.join();
|
||||
tracing::info!(sent, "punktfunk/1 virtual stream complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user