feat(punktfunk/1): negotiable encoder bitrate + bandwidth speed-test probe
ci / rust (push) Has been cancelled

Two related additions to the native protocol, host-side (the client side of
each is exposed over the C ABI so the platform clients can wire it up).

Bitrate negotiation
- Hello/Welcome carry `bitrate_kbps` (appended trailing-byte field, back-compat:
  old peers decode 0 = host default). The client requests a rate; the host
  clamps it to [500 kbps, 500 Mbps] (or its 20 Mbps default when 0) and echoes
  the resolved value in Welcome. Replaces the hardcoded 20 Mbps NVENC bitrate in
  m3.rs — threaded through virtual_stream → build_pipeline → open_video, applied
  on the initial mode and every reconfigure rebuild.
- C ABI: punktfunk_connect_ex3(..., bitrate_kbps, ...) (ex2 delegates with 0);
  punktfunk_connection_bitrate() reads the resolved value.

Speed test (bandwidth probe)
- New typed control messages ProbeRequest{target_kbps,duration_ms} (0x20) /
  ProbeResult{bytes_sent,packets_sent,duration_ms} (0x21), plus a FLAG_PROBE
  packet flag. The client asks the host to burst zero-filled, FLAG_PROBE-tagged
  access units over the data plane at a target goodput for a duration (clamped
  ≤ 1 Gbps / ≤ 5 s), pacing by a bytes-allowed budget; video pauses for the
  burst. The host reports what it actually sent; the client measures received
  bytes + window → goodput and loss. Probe filler is never fed to the decoder
  (diverted in the connector pump and the reference client's poll loop).
- The host control task now multiplexes Reconfigure + ProbeRequest (inbound)
  and ProbeResult (outbound) over select!; a probe channel reaches the
  data-plane thread (both virtual and synthetic sources).
- Connector: NativeClient::request_probe()/probe_result() with an internal
  accumulator; C ABI punktfunk_connection_speed_test() +
  punktfunk_connection_probe_result() → PunktfunkProbeResult.
- punktfunk-client-rs gains `--bitrate KBPS` and `--speed-test KBPS:MS` (its own
  loop measures + logs goodput/loss) for loopback verification.

Validated on loopback (synthetic source): a 20 Mbps / 2 s probe measured
20050 kbps at 0% loss, bitrate negotiated (0→20000 and 50000→50000), and the
interleaved probe AUs were correctly excluded from frame verification
(mismatched=0). Wire codecs + trailing-byte back-compat have unit tests. C
header regenerated.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-11 18:44:47 +00:00
parent dcb2850c7c
commit 74819b1be8
7 changed files with 906 additions and 89 deletions
+106 -2
View File
@@ -39,7 +39,10 @@ use anyhow::{anyhow, Context, Result};
use punktfunk_core::config::GamepadPref;
use punktfunk_core::config::Role;
use punktfunk_core::input::{InputEvent, InputKind};
use punktfunk_core::quic::{endpoint, io, Hello, Reconfigure, Reconfigured, Start, Welcome};
use punktfunk_core::packet::FLAG_PROBE;
use punktfunk_core::quic::{
endpoint, io, Hello, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, Start, Welcome,
};
use punktfunk_core::transport::UdpTransport;
use punktfunk_core::{CompositorPref, Mode, PunktfunkError, Session};
use std::io::Write;
@@ -67,6 +70,11 @@ struct Args {
compositor: CompositorPref,
/// `--gamepad NAME` — request a host virtual-pad backend (auto|xbox360|dualsense).
gamepad: GamepadPref,
/// `--bitrate KBPS` — request this encoder bitrate (kilobits/s); 0 = host default.
bitrate_kbps: u32,
/// `--speed-test KBPS:MS` — after the stream starts, ask the host for a `MS`-millisecond
/// bandwidth probe burst at `KBPS`, then report measured throughput + loss.
speed_test: Option<(u32, u32)>,
}
fn parse_mode(m: &str) -> Option<Mode> {
@@ -178,6 +186,11 @@ fn parse_args() -> Args {
name: get("--name").unwrap_or("punktfunk-client-rs").to_string(),
compositor,
gamepad,
bitrate_kbps: get("--bitrate").and_then(|s| s.parse().ok()).unwrap_or(0),
speed_test: get("--speed-test").and_then(|s| {
let (kbps, ms) = s.split_once(':')?;
Some((kbps.parse().ok()?, ms.parse().ok()?))
}),
}
}
@@ -263,6 +276,7 @@ async fn session(args: Args) -> Result<()> {
mode: args.mode,
compositor: args.compositor,
gamepad: args.gamepad,
bitrate_kbps: args.bitrate_kbps,
}
.encode(),
)
@@ -292,9 +306,18 @@ async fn session(args: Args) -> Result<()> {
)
.await?;
// Speed-test accumulators: the data-plane loop folds each FLAG_PROBE filler AU in here; the
// --speed-test reporter below reads them once the host's ProbeResult lands. first/last hold
// now_ns timestamps of the receive window (0 = unset).
let probe_recv_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let probe_recv_packets = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let probe_first_ns = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let probe_last_ns = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
// Mid-stream renegotiation test: after a delay, ask the host to switch modes on the
// still-open control stream. The stream then carries new-mode AUs (IDR + in-band
// parameter sets) — ffprobe the --out file to see both resolutions.
// parameter sets) — ffprobe the --out file to see both resolutions. Mutually exclusive with
// --speed-test (both own the control stream).
if let Some((new_mode, after_secs)) = args.remode {
let mut rs = send;
let mut rr = recv;
@@ -319,6 +342,70 @@ async fn session(args: Args) -> Result<()> {
other => tracing::error!(?other, "bad Reconfigured"),
}
});
} else if let Some((target_kbps, duration_ms)) = args.speed_test {
// Bandwidth probe: after the stream warms up, ask the host to burst FLAG_PROBE filler;
// measure what arrives vs. what it reports sending.
let mut ss = send;
let mut sr = recv;
let (pb, pp, pf, pl) = (
probe_recv_bytes.clone(),
probe_recv_packets.clone(),
probe_first_ns.clone(),
probe_last_ns.clone(),
);
tokio::spawn(async move {
use std::sync::atomic::Ordering::Relaxed;
tokio::time::sleep(std::time::Duration::from_secs(2)).await; // let the stream warm up
tracing::info!(target_kbps, duration_ms, "requesting speed-test probe");
if io::write_msg(
&mut ss,
&ProbeRequest {
target_kbps,
duration_ms,
}
.encode(),
)
.await
.is_err()
{
tracing::error!("ProbeRequest write failed");
return;
}
let res = match io::read_msg(&mut sr).await.map(|b| ProbeResult::decode(&b)) {
Ok(Ok(r)) => r,
other => {
tracing::error!(?other, "bad ProbeResult");
return;
}
};
// The reliable result can beat the last UDP shards — let them reassemble.
tokio::time::sleep(std::time::Duration::from_millis(400)).await;
let recv_bytes = pb.load(Relaxed);
let recv_packets = pp.load(Relaxed);
let (first, last) = (pf.load(Relaxed), pl.load(Relaxed));
let window_ms = if first > 0 && last > first {
(last - first) / 1_000_000
} else {
0
};
let throughput_kbps = recv_bytes.saturating_mul(8).checked_div(window_ms).unwrap_or(0);
let loss_pct = if res.bytes_sent > 0 {
res.bytes_sent.saturating_sub(recv_bytes) as f64 / res.bytes_sent as f64 * 100.0
} else {
0.0
};
tracing::info!(
target_kbps,
host_sent_bytes = res.bytes_sent,
host_sent_packets = res.packets_sent,
recv_bytes,
recv_packets,
window_ms,
throughput_kbps,
loss_pct = format!("{loss_pct:.1}%"),
"SPEED TEST complete",
);
});
}
// Input plane: scripted events as QUIC datagrams (mouse square + 'A' taps), proving the
@@ -581,6 +668,12 @@ async fn session(args: Args) -> Result<()> {
let cfg = welcome.session_config(Role::Client);
let expected = welcome.frames;
let out_path = args.out.clone();
let (pb, pp, pf, pl) = (
probe_recv_bytes.clone(),
probe_recv_packets.clone(),
probe_first_ns.clone(),
probe_last_ns.clone(),
);
// Data plane on a blocking thread (native threads only on the frame path).
let result = tokio::task::spawn_blocking(move || -> Result<()> {
@@ -619,6 +712,17 @@ async fn session(args: Args) -> Result<()> {
match session.poll_frame() {
Ok(frame) => {
last_rx = std::time::Instant::now();
// Speed-test filler isn't video: fold it into the probe accumulators and skip
// verification / the --out sink.
if frame.flags & FLAG_PROBE as u32 != 0 {
use std::sync::atomic::Ordering::Relaxed;
let n = now_ns();
let _ = pf.compare_exchange(0, n, Relaxed, Relaxed);
pl.store(n, Relaxed);
pb.fetch_add(frame.data.len() as u64, Relaxed);
pp.fetch_add(1, Relaxed);
continue;
}
bytes += frame.data.len() as u64;
// The host stamps pts with its capture wall clock; same-host runs share it.
let lat = now_ns().saturating_sub(frame.pts_ns);
+156
View File
@@ -748,6 +748,50 @@ pub unsafe extern "C" fn punktfunk_connect_ex2(
client_cert_pem: *const std::os::raw::c_char,
client_key_pem: *const std::os::raw::c_char,
timeout_ms: u32,
) -> *mut PunktfunkConnection {
unsafe {
punktfunk_connect_ex3(
host,
port,
width,
height,
refresh_hz,
compositor,
gamepad,
0, // bitrate_kbps = 0: let the host pick its default
pin_sha256,
observed_sha256_out,
client_cert_pem,
client_key_pem,
timeout_ms,
)
}
}
/// Like [`punktfunk_connect_ex2`], but additionally requests the video encoder `bitrate_kbps`
/// (kilobits per second). `0` lets the host pick its default; any other value is clamped to the
/// host's supported range. After a speed test ([`punktfunk_connection_speed_test`]) a client can
/// reconnect (or pick at connect time) with the measured rate. The value the host actually
/// configured is readable via [`punktfunk_connection_bitrate`].
///
/// # Safety
/// Same as [`punktfunk_connect`].
#[cfg(feature = "quic")]
#[no_mangle]
pub unsafe extern "C" fn punktfunk_connect_ex3(
host: *const std::os::raw::c_char,
port: u16,
width: u32,
height: u32,
refresh_hz: u32,
compositor: u32,
gamepad: u32,
bitrate_kbps: u32,
pin_sha256: *const u8,
observed_sha256_out: *mut u8,
client_cert_pem: *const std::os::raw::c_char,
client_key_pem: *const std::os::raw::c_char,
timeout_ms: u32,
) -> *mut PunktfunkConnection {
let r = std::panic::catch_unwind(AssertUnwindSafe(|| {
if host.is_null() {
@@ -790,6 +834,7 @@ pub unsafe extern "C" fn punktfunk_connect_ex2(
mode,
pref,
gamepad,
bitrate_kbps,
pin,
identity,
std::time::Duration::from_millis(timeout_ms as u64),
@@ -1245,6 +1290,32 @@ pub unsafe extern "C" fn punktfunk_connection_gamepad(
})
}
/// The video encoder bitrate (kilobits per second) the host actually configured for this session
/// — the [`punktfunk_connect_ex3`] request clamped to the host's range, or its default when `0`
/// was requested. `0` = an older host that didn't report it. Safe any time after connect.
///
/// # Safety
/// `c` is a valid connection handle; `bitrate_kbps` is writable (NULL is skipped).
#[cfg(feature = "quic")]
#[no_mangle]
pub unsafe extern "C" fn punktfunk_connection_bitrate(
c: *const PunktfunkConnection,
bitrate_kbps: *mut u32,
) -> PunktfunkStatus {
guard(|| {
let c = match unsafe { c.as_ref() } {
Some(c) => c,
None => return PunktfunkStatus::NullPointer,
};
unsafe {
if !bitrate_kbps.is_null() {
*bitrate_kbps = c.inner.resolved_bitrate_kbps;
}
}
PunktfunkStatus::Ok
})
}
/// Ask the host to switch the live session to `width`x`height`@`refresh_hz` without
/// reconnecting (window resized, refresh changed). Non-blocking enqueue: on acceptance the
/// stream continues at the new mode — the first new-mode access unit is an IDR with
@@ -1278,6 +1349,91 @@ pub unsafe extern "C" fn punktfunk_connection_request_mode(
})
}
/// A speed-test measurement, filled by [`punktfunk_connection_probe_result`]. `done` is 0 until
/// the host's end-of-burst report lands, then 1 (the numbers are final). `throughput_kbps` is the
/// measured goodput to drive a bitrate choice from; `loss_pct` is the delivery loss at that rate.
#[repr(C)]
#[derive(Clone, Copy, Debug, Default)]
pub struct PunktfunkProbeResult {
/// 1 once the host's end-of-burst report arrived (measurement final); else 0 (partial).
pub done: u8,
/// Probe payload bytes / packets the client received.
pub recv_bytes: u64,
pub recv_packets: u32,
/// Probe payload bytes / packets the host reported sending.
pub host_bytes: u64,
pub host_packets: u32,
/// Client-measured receive window (first→last probe AU), milliseconds.
pub elapsed_ms: u32,
/// Measured goodput = `recv_bytes * 8 / elapsed_ms` (kilobits/second).
pub throughput_kbps: u32,
/// Delivery loss `(host_bytes - recv_bytes) / host_bytes` as a percentage (0 if unknown).
pub loss_pct: f32,
}
/// Start a bandwidth speed test: ask the host to burst filler over the data plane at
/// `target_kbps` of goodput for `duration_ms` (each clamped host-side to ≤ 1 Gbps / ≤ 5 s),
/// *briefly pausing video*. Non-blocking — poll [`punktfunk_connection_probe_result`] until its
/// `done` field is 1. Starting a probe resets any prior measurement.
///
/// # Safety
/// `c` is a valid connection handle.
#[cfg(feature = "quic")]
#[no_mangle]
pub unsafe extern "C" fn punktfunk_connection_speed_test(
c: *const PunktfunkConnection,
target_kbps: u32,
duration_ms: u32,
) -> PunktfunkStatus {
guard(|| {
let c = match unsafe { c.as_ref() } {
Some(c) => c,
None => return PunktfunkStatus::NullPointer,
};
match c.inner.request_probe(target_kbps, duration_ms) {
Ok(()) => PunktfunkStatus::Ok,
Err(e) => e.status(),
}
})
}
/// Read the current speed-test measurement into `*out` (partial until `out->done == 1`). Safe to
/// poll repeatedly after [`punktfunk_connection_speed_test`]; before any probe it reports zeros.
///
/// # Safety
/// `c` is a valid connection handle; `out` is writable for one `PunktfunkProbeResult` (NULL is an
/// error).
#[cfg(feature = "quic")]
#[no_mangle]
pub unsafe extern "C" fn punktfunk_connection_probe_result(
c: *const PunktfunkConnection,
out: *mut PunktfunkProbeResult,
) -> PunktfunkStatus {
guard(|| {
let c = match unsafe { c.as_ref() } {
Some(c) => c,
None => return PunktfunkStatus::NullPointer,
};
if out.is_null() {
return PunktfunkStatus::NullPointer;
}
let o = c.inner.probe_result();
unsafe {
*out = PunktfunkProbeResult {
done: o.done as u8,
recv_bytes: o.recv_bytes,
recv_packets: o.recv_packets,
host_bytes: o.host_bytes,
host_packets: o.host_packets,
elapsed_ms: o.elapsed_ms,
throughput_kbps: o.throughput_kbps,
loss_pct: o.loss_pct,
};
}
PunktfunkStatus::Ok
})
}
/// Close the connection and free the handle (joins the internal threads). NULL is a no-op.
///
/// # Safety
+209 -43
View File
@@ -14,15 +14,70 @@
use crate::config::{CompositorPref, GamepadPref, Mode, Role};
use crate::error::{PunktfunkError, Result};
use crate::input::InputEvent;
use crate::packet::FLAG_PROBE;
use crate::quic::{
endpoint, io, Hello, HidOutput, Reconfigure, Reconfigured, RichInput, Start, Welcome,
endpoint, io, Hello, HidOutput, ProbeRequest, ProbeResult, Reconfigure, Reconfigured,
RichInput, Start, Welcome,
};
use crate::session::{Frame, Session};
use crate::transport::UdpTransport;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender};
use std::sync::Arc;
use std::time::Duration;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// A control-stream request the embedder makes on the open handshake stream: a mode switch or a
/// speed test. One outbound channel carries both so the worker's `select!` has a single writer
/// (two `&mut ctrl_send` borrows across select branches don't compile).
enum CtrlRequest {
Mode(Mode),
Probe(ProbeRequest),
}
/// What the worker reports to [`NativeClient::connect`] once the handshake lands: the negotiated
/// mode, the host-resolved gamepad backend, the host's certificate fingerprint, and the resolved
/// encoder bitrate (kbps).
type Negotiated = (Mode, GamepadPref, [u8; 32], u32);
/// Accumulated state of an in-flight / finished speed test. The data-plane pump folds each
/// received [`FLAG_PROBE`] access unit in; the control task records the host's [`ProbeResult`]
/// when it lands. Read (and finalized into numbers) by [`NativeClient::probe_result`].
#[derive(Default)]
struct ProbeState {
/// A probe is in progress (set by `request_probe`, cleared by nothing — the latest one wins).
active: bool,
/// Probe access-unit payload bytes the client received, and their count.
recv_bytes: u64,
recv_packets: u32,
/// First/last probe AU arrival — the measured receive window.
start: Option<Instant>,
last: Option<Instant>,
/// The host's report ([`ProbeResult`]); present once the burst finished.
host_bytes: u64,
host_packets: u32,
/// The host's `ProbeResult` arrived → the measurement is final.
done: bool,
}
/// A finished/partial speed-test measurement, returned by [`NativeClient::probe_result`].
#[derive(Clone, Copy, Debug, Default)]
pub struct ProbeOutcome {
/// The host's end-of-burst report has arrived — the numbers below are final.
pub done: bool,
/// Probe payload bytes / packets the client received.
pub recv_bytes: u64,
pub recv_packets: u32,
/// Probe payload bytes / packets the host reported sending.
pub host_bytes: u64,
pub host_packets: u32,
/// The client-measured receive window (first→last probe AU), in milliseconds.
pub elapsed_ms: u32,
/// Measured goodput = `recv_bytes * 8 / elapsed_ms` (kilobits/second). This is the figure to
/// drive a [`Hello::bitrate_kbps`] choice from.
pub throughput_kbps: u32,
/// Delivery loss = `(host_bytes - recv_bytes) / host_bytes`, as a percentage (0 if unknown).
pub loss_pct: f32,
}
/// Frames buffered between the data-plane pump and the embedder. Small: the embedder
/// (decoder) should drain at frame rate; when it falls behind, the newest frame is dropped
@@ -62,7 +117,10 @@ pub struct NativeClient {
mic_tx: tokio::sync::mpsc::UnboundedSender<(u32, u64, Vec<u8>)>,
/// Outbound rich input (DualSense touchpad / motion) → 0xCC datagrams by the worker.
rich_input_tx: tokio::sync::mpsc::UnboundedSender<RichInput>,
reconfig_tx: tokio::sync::mpsc::UnboundedSender<Mode>,
/// Outbound control-stream requests (mode switch, speed test) → the worker's control task.
ctrl_tx: tokio::sync::mpsc::UnboundedSender<CtrlRequest>,
/// Speed-test accumulator, shared with the data-plane pump + control task.
probe: Arc<Mutex<ProbeState>>,
shutdown: Arc<AtomicBool>,
worker: Option<std::thread::JoinHandle<()>>,
/// The currently active session mode (the Welcome's, then updated by every accepted
@@ -74,6 +132,10 @@ pub struct NativeClient {
/// The virtual gamepad backend the host actually resolved ([`Welcome::gamepad`]).
/// `Auto` = an older host that didn't say (assume X-Box 360, no DualSense feedback).
pub resolved_gamepad: GamepadPref,
/// The encoder bitrate the host actually configured ([`Welcome::bitrate_kbps`], kbps): our
/// requested rate clamped to the host's range, or its default if we requested `0`. `0` = an
/// older host that didn't report it.
pub resolved_bitrate_kbps: u32,
}
impl NativeClient {
@@ -94,6 +156,7 @@ impl NativeClient {
mode: Mode,
compositor: CompositorPref,
gamepad: GamepadPref,
bitrate_kbps: u32,
pin: Option<[u8; 32]>,
identity: Option<(String, String)>,
timeout: Duration,
@@ -105,15 +168,17 @@ impl NativeClient {
let (input_tx, input_rx) = tokio::sync::mpsc::unbounded_channel::<InputEvent>();
let (mic_tx, mic_rx) = tokio::sync::mpsc::unbounded_channel::<(u32, u64, Vec<u8>)>();
let (rich_input_tx, rich_input_rx) = tokio::sync::mpsc::unbounded_channel::<RichInput>();
let (reconfig_tx, reconfig_rx) = tokio::sync::mpsc::unbounded_channel::<Mode>();
let (ctrl_tx, ctrl_rx) = tokio::sync::mpsc::unbounded_channel::<CtrlRequest>();
let (ready_tx, ready_rx) =
std::sync::mpsc::channel::<Result<(Mode, GamepadPref, [u8; 32])>>();
std::sync::mpsc::channel::<Result<Negotiated>>();
let shutdown = Arc::new(AtomicBool::new(false));
let mode_slot = Arc::new(std::sync::Mutex::new(mode));
let probe = Arc::new(Mutex::new(ProbeState::default()));
let host = host.to_string();
let shutdown_w = shutdown.clone();
let mode_slot_w = mode_slot.clone();
let probe_w = probe.clone();
let worker = std::thread::Builder::new()
.name("punktfunk-client".into())
.spawn(move || {
@@ -134,6 +199,7 @@ impl NativeClient {
mode,
compositor,
gamepad,
bitrate_kbps,
pin,
identity,
frame_tx,
@@ -143,22 +209,24 @@ impl NativeClient {
input_rx,
mic_rx,
rich_input_rx,
reconfig_rx,
ctrl_rx,
ready_tx,
shutdown: shutdown_w,
mode_slot: mode_slot_w,
probe: probe_w,
}));
})
.map_err(PunktfunkError::Io)?;
let (negotiated, resolved_gamepad, fingerprint) = match ready_rx.recv_timeout(timeout) {
Ok(Ok(t)) => t,
Ok(Err(e)) => return Err(e),
Err(_) => {
shutdown.store(true, Ordering::SeqCst);
return Err(PunktfunkError::Timeout);
}
};
let (negotiated, resolved_gamepad, fingerprint, resolved_bitrate_kbps) =
match ready_rx.recv_timeout(timeout) {
Ok(Ok(t)) => t,
Ok(Err(e)) => return Err(e),
Err(_) => {
shutdown.store(true, Ordering::SeqCst);
return Err(PunktfunkError::Timeout);
}
};
*mode_slot.lock().unwrap() = negotiated;
Ok(NativeClient {
frames: frame_rx,
@@ -168,12 +236,14 @@ impl NativeClient {
input_tx,
mic_tx,
rich_input_tx,
reconfig_tx,
ctrl_tx,
probe,
shutdown,
worker: Some(worker),
mode: mode_slot,
host_fingerprint: fingerprint,
resolved_gamepad,
resolved_bitrate_kbps,
})
}
@@ -280,11 +350,61 @@ impl NativeClient {
/// frames open with an IDR carrying new parameter sets) and [`NativeClient::mode`]
/// reflects it. A rejected request leaves the session unchanged.
pub fn request_mode(&self, mode: Mode) -> Result<()> {
self.reconfig_tx
.send(mode)
self.ctrl_tx
.send(CtrlRequest::Mode(mode))
.map_err(|_| PunktfunkError::Closed)
}
/// Start a bandwidth speed test: ask the host to burst filler over the data plane at
/// `target_kbps` of goodput for `duration_ms`, *briefly pausing video*. Non-blocking — the
/// measurement accumulates in the background; poll [`NativeClient::probe_result`] until its
/// `done` flag is set. Starting a probe resets any prior measurement. The host clamps both
/// fields (≤ 1 Gbps, ≤ 5 s).
pub fn request_probe(&self, target_kbps: u32, duration_ms: u32) -> Result<()> {
// Reset the accumulator so a fresh run doesn't blend into the previous one.
*self.probe.lock().unwrap() = ProbeState {
active: true,
..Default::default()
};
self.ctrl_tx
.send(CtrlRequest::Probe(ProbeRequest {
target_kbps,
duration_ms,
}))
.map_err(|_| PunktfunkError::Closed)
}
/// Read the current speed-test measurement (partial until `done`, final once the host's
/// end-of-burst report lands). Derives goodput + loss from the accumulated probe bytes.
pub fn probe_result(&self) -> ProbeOutcome {
let p = self.probe.lock().unwrap();
let elapsed_ms = match (p.start, p.last) {
(Some(s), Some(l)) => l.duration_since(s).as_millis() as u32,
_ => 0,
};
// bytes × 8 / ms = kilobits/second.
let throughput_kbps = if elapsed_ms > 0 {
(p.recv_bytes.saturating_mul(8) / elapsed_ms as u64) as u32
} else {
0
};
let loss_pct = if p.host_bytes > 0 {
p.host_bytes.saturating_sub(p.recv_bytes) as f64 / p.host_bytes as f64 * 100.0
} else {
0.0
} as f32;
ProbeOutcome {
done: p.done,
recv_bytes: p.recv_bytes,
recv_packets: p.recv_packets,
host_bytes: p.host_bytes,
host_packets: p.host_packets,
elapsed_ms,
throughput_kbps,
loss_pct,
}
}
/// Pull the next reassembled, FEC-recovered access unit; [`PunktfunkError::NoFrame`] on
/// timeout, [`PunktfunkError::Closed`]-class errors once the session ended.
///
@@ -373,6 +493,7 @@ struct WorkerArgs {
mode: Mode,
compositor: CompositorPref,
gamepad: GamepadPref,
bitrate_kbps: u32,
pin: Option<[u8; 32]>,
identity: Option<(String, String)>,
frame_tx: SyncSender<Frame>,
@@ -382,10 +503,11 @@ struct WorkerArgs {
input_rx: tokio::sync::mpsc::UnboundedReceiver<InputEvent>,
mic_rx: tokio::sync::mpsc::UnboundedReceiver<(u32, u64, Vec<u8>)>,
rich_input_rx: tokio::sync::mpsc::UnboundedReceiver<RichInput>,
reconfig_rx: tokio::sync::mpsc::UnboundedReceiver<Mode>,
ready_tx: std::sync::mpsc::Sender<Result<(Mode, GamepadPref, [u8; 32])>>,
ctrl_rx: tokio::sync::mpsc::UnboundedReceiver<CtrlRequest>,
ready_tx: std::sync::mpsc::Sender<Result<Negotiated>>,
shutdown: Arc<AtomicBool>,
mode_slot: Arc<std::sync::Mutex<Mode>>,
probe: Arc<Mutex<ProbeState>>,
}
/// The worker: QUIC handshake, then the input/datagram/control tasks + the blocking
@@ -397,6 +519,7 @@ async fn worker_main(args: WorkerArgs) {
mode,
compositor,
gamepad,
bitrate_kbps,
pin,
identity,
frame_tx,
@@ -406,10 +529,11 @@ async fn worker_main(args: WorkerArgs) {
mut input_rx,
mut mic_rx,
mut rich_input_rx,
mut reconfig_rx,
mut ctrl_rx,
ready_tx,
shutdown,
mode_slot,
probe,
} = args;
let setup = async {
let remote: std::net::SocketAddr = format!("{host}:{port}")
@@ -448,6 +572,7 @@ async fn worker_main(args: WorkerArgs) {
mode,
compositor,
gamepad,
bitrate_kbps,
}
.encode(),
)
@@ -491,6 +616,7 @@ async fn worker_main(args: WorkerArgs) {
welcome.mode,
welcome.gamepad,
fingerprint,
welcome.bitrate_kbps,
))
};
@@ -502,6 +628,7 @@ async fn worker_main(args: WorkerArgs) {
negotiated,
resolved_gamepad,
fingerprint,
resolved_bitrate_kbps,
) = match setup.await {
Ok(t) => t,
Err(e) => {
@@ -509,7 +636,12 @@ async fn worker_main(args: WorkerArgs) {
return;
}
};
let _ = ready_tx.send(Ok((negotiated, resolved_gamepad, fingerprint)));
let _ = ready_tx.send(Ok((
negotiated,
resolved_gamepad,
fingerprint,
resolved_bitrate_kbps,
)));
// Input task: embedder events → QUIC datagrams.
let input_conn = conn.clone();
@@ -536,30 +668,50 @@ async fn worker_main(args: WorkerArgs) {
}
});
// Control task: the handshake stream stays open for mid-stream renegotiation. One
// request at a time — write Reconfigure, await Reconfigured, publish the active mode.
// Control task: the handshake stream stays open for mid-stream renegotiation + speed tests.
// Outbound requests (mode switch, probe) and inbound replies (Reconfigured, ProbeResult) are
// multiplexed with `select!`; a single outbound channel (`ctrl_rx`) keeps one writer so the
// two `&mut ctrl_send` borrows don't collide across branches.
{
let mode_slot = mode_slot.clone();
let probe = probe.clone();
tokio::spawn(async move {
while let Some(want) = reconfig_rx.recv().await {
if io::write_msg(&mut ctrl_send, &Reconfigure { mode: want }.encode())
.await
.is_err()
{
break;
}
let ack = match io::read_msg(&mut ctrl_recv).await {
Ok(b) => match Reconfigured::decode(&b) {
Ok(a) => a,
Err(_) => break, // protocol error — stop renegotiating
},
Err(_) => break, // stream closed
};
if ack.accepted {
*mode_slot.lock().unwrap() = ack.mode;
tracing::info!(mode = ?ack.mode, "host accepted mode switch");
} else {
tracing::warn!(requested = ?want, active = ?ack.mode, "host rejected mode switch");
loop {
tokio::select! {
req = ctrl_rx.recv() => {
let Some(req) = req else { break }; // client dropped
let bytes = match req {
CtrlRequest::Mode(m) => Reconfigure { mode: m }.encode(),
CtrlRequest::Probe(p) => p.encode(),
};
if io::write_msg(&mut ctrl_send, &bytes).await.is_err() {
break;
}
}
msg = io::read_msg(&mut ctrl_recv) => {
let Ok(msg) = msg else { break }; // stream closed
if let Ok(ack) = Reconfigured::decode(&msg) {
if ack.accepted {
*mode_slot.lock().unwrap() = ack.mode;
tracing::info!(mode = ?ack.mode, "host accepted mode switch");
} else {
tracing::warn!(active = ?ack.mode, "host rejected mode switch");
}
} else if let Ok(result) = ProbeResult::decode(&msg) {
let mut p = probe.lock().unwrap();
p.host_bytes = result.bytes_sent;
p.host_packets = result.packets_sent;
p.done = true;
tracing::info!(
bytes_sent = result.bytes_sent,
packets_sent = result.packets_sent,
duration_ms = result.duration_ms,
"speed-test probe result"
);
} else {
tracing::warn!("unknown control message — ignoring");
}
}
}
}
});
@@ -607,11 +759,25 @@ async fn worker_main(args: WorkerArgs) {
// Data-plane pump on a blocking thread: poll the session, hand frames to the embedder.
// try_send drops the newest frame when the embedder lags (freshness over completeness).
// Speed-test filler ([`FLAG_PROBE`]) is folded into the probe accumulator instead of the
// decoder queue — it isn't video.
let pump_shutdown = shutdown.clone();
let pump_probe = probe.clone();
let _ = tokio::task::spawn_blocking(move || {
while !pump_shutdown.load(Ordering::SeqCst) {
match session.poll_frame() {
Ok(frame) => {
if frame.flags & FLAG_PROBE as u32 != 0 {
let mut p = pump_probe.lock().unwrap();
if p.active {
let now = Instant::now();
p.start.get_or_insert(now);
p.last = Some(now);
p.recv_bytes += frame.data.len() as u64;
p.recv_packets += 1;
}
continue; // not video — never enqueue for the decoder
}
let _ = frame_tx.try_send(frame);
}
Err(PunktfunkError::NoFrame) => {
+4
View File
@@ -30,6 +30,10 @@ pub const PUNKTFUNK_MAGIC: u8 = 0xC9;
pub const FLAG_PIC: u8 = 0x1;
pub const FLAG_EOF: u8 = 0x2;
pub const FLAG_SOF: u8 = 0x4;
/// Bandwidth-probe filler, not decodable video: a [`crate::quic::ProbeRequest`] speed test makes
/// the host burst access units carrying this flag so the client measures throughput/loss without
/// feeding them to the decoder. Punktfunk/1 only (GameStream never sets it).
pub const FLAG_PROBE: u8 = 0x8;
/// Crypto framing overhead [`Session`](crate::session::Session) adds when encrypting:
/// an 8-byte sequence prefix plus the GCM tag.
+142 -4
View File
@@ -52,6 +52,11 @@ pub struct Hello {
/// [`Welcome::gamepad`]. Appended to the wire form — omitted by older clients (decodes
/// to `Auto`).
pub gamepad: GamepadPref,
/// The client's desired video encoder bitrate, in kilobits per second. `0` = no preference
/// (the host uses its default). The host clamps the request to a supported range and reports
/// the value it actually configured in [`Welcome::bitrate_kbps`]. Appended to the wire form —
/// omitted by older clients (decodes to `0`, i.e. host default).
pub bitrate_kbps: u32,
}
/// `host → client`: the complete session offer.
@@ -77,6 +82,11 @@ pub struct Welcome {
/// DualSense feedback (0xCD) can arrive at all. Appended to the wire form — `Auto` when an
/// older host omitted it (i.e. "unknown, assume X-Box 360").
pub gamepad: GamepadPref,
/// The encoder bitrate the host actually configured for this session, in kilobits per second
/// (the client's [`Hello::bitrate_kbps`] clamped to the host's supported range, or the host
/// default when the client requested `0`). Appended to the wire form — `0` when an older host
/// omitted it (i.e. "unknown").
pub bitrate_kbps: u32,
}
/// `client → host`: data plane is bound, begin streaming.
@@ -107,10 +117,41 @@ pub struct Reconfigured {
pub mode: Mode,
}
/// `client → host`, any time after [`Start`]: run a bandwidth speed test. The host bursts
/// filler access units (flagged [`crate::packet::FLAG_PROBE`]) over the data plane at
/// `target_kbps` of application goodput for `duration_ms`, *pausing video for the duration*, then
/// replies with [`ProbeResult`]. The client measures the received probe bytes + time to estimate
/// the link's sustainable rate (and the loss vs. the host's reported send count) so it can pick a
/// [`Hello::bitrate_kbps`]. The host clamps both fields to sane bounds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ProbeRequest {
/// Goodput rate the host should send the probe at, in kilobits per second.
pub target_kbps: u32,
/// How long to burst, in milliseconds.
pub duration_ms: u32,
}
/// `host → client`: the probe burst is finished. Reports what the host actually sent so the
/// client can compute delivery ratio (loss) = `received / bytes_sent` and throughput =
/// `received_bytes * 8 / elapsed`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ProbeResult {
/// Total access-unit payload bytes the host emitted for the probe.
pub bytes_sent: u64,
/// Number of probe access units the host emitted.
pub packets_sent: u32,
/// The burst's actual duration in milliseconds (the host clamps/measures the request).
pub duration_ms: u32,
}
/// Type byte of [`Reconfigure`] (first byte after the magic).
pub const MSG_RECONFIGURE: u8 = 0x01;
/// Type byte of [`Reconfigured`].
pub const MSG_RECONFIGURED: u8 = 0x02;
/// Type byte of [`ProbeRequest`].
pub const MSG_PROBE_REQUEST: u8 = 0x20;
/// Type byte of [`ProbeResult`].
pub const MSG_PROBE_RESULT: u8 = 0x21;
// ---------------------------------------------------------------------------------------------
// Pairing ceremony (typed control messages): instead of a session Hello, a client may open
@@ -379,6 +420,7 @@ impl Hello {
b.extend_from_slice(&self.mode.refresh_hz.to_le_bytes());
b.push(self.compositor.to_u8()); // appended at offset 20 — older hosts read [0..20] and skip it
b.push(self.gamepad.to_u8()); // appended at offset 21 — same back-compat discipline
b.extend_from_slice(&self.bitrate_kbps.to_le_bytes()); // appended at offset 22..26
b
}
@@ -403,6 +445,11 @@ impl Hello {
.get(21)
.map(|&v| GamepadPref::from_u8(v))
.unwrap_or_default(),
// Optional trailing 4 bytes (LE) — absent on an older client → `0` (host default).
bitrate_kbps: b
.get(22..26)
.map(|s| u32::from_le_bytes(s.try_into().unwrap()))
.unwrap_or(0),
})
}
}
@@ -429,13 +476,15 @@ impl Welcome {
b.extend_from_slice(&self.frames.to_le_bytes());
b.push(self.compositor.to_u8()); // appended at offset 53 — older clients read [0..53] and skip it
b.push(self.gamepad.to_u8()); // appended at offset 54 — same back-compat discipline
b.extend_from_slice(&self.bitrate_kbps.to_le_bytes()); // appended at offset 55..59
b
}
pub fn decode(b: &[u8]) -> Result<Welcome> {
// Layout (LE): magic[0..4] abi[4..8] port[8..10] w[10..14] h[14..18] hz[18..22]
// scheme[22] pct[23] max_data[24..26] shard[26..28] encrypt[28] key[29..45]
// salt[45..49] frames[49..53] compositor[53] gamepad[54] (optional trailing bytes).
// salt[45..49] frames[49..53] compositor[53] gamepad[54] bitrate_kbps[55..59]
// (compositor/gamepad/bitrate are optional trailing bytes).
if b.len() < 53 || &b[0..4] != MAGIC {
return Err(PunktfunkError::InvalidArg("bad Welcome"));
}
@@ -477,6 +526,11 @@ impl Welcome {
.get(54)
.map(|&v| GamepadPref::from_u8(v))
.unwrap_or_default(),
// Optional trailing 4 bytes (LE) — absent on an older host → `0` (unknown).
bitrate_kbps: b
.get(55..59)
.map(|s| u32::from_le_bytes(s.try_into().unwrap()))
.unwrap_or(0),
})
}
@@ -567,6 +621,53 @@ impl Reconfigured {
}
}
impl ProbeRequest {
pub fn encode(&self) -> Vec<u8> {
// magic[0..4] type[4] target_kbps[5..9] duration_ms[9..13]
let mut b = Vec::with_capacity(13);
b.extend_from_slice(CTL_MAGIC);
b.push(MSG_PROBE_REQUEST);
b.extend_from_slice(&self.target_kbps.to_le_bytes());
b.extend_from_slice(&self.duration_ms.to_le_bytes());
b
}
pub fn decode(b: &[u8]) -> Result<ProbeRequest> {
if b.len() != 13 || &b[0..4] != CTL_MAGIC || b[4] != MSG_PROBE_REQUEST {
return Err(PunktfunkError::InvalidArg("bad ProbeRequest"));
}
let u32at = |o: usize| u32::from_le_bytes([b[o], b[o + 1], b[o + 2], b[o + 3]]);
Ok(ProbeRequest {
target_kbps: u32at(5),
duration_ms: u32at(9),
})
}
}
impl ProbeResult {
pub fn encode(&self) -> Vec<u8> {
// magic[0..4] type[4] bytes_sent[5..13] packets_sent[13..17] duration_ms[17..21]
let mut b = Vec::with_capacity(21);
b.extend_from_slice(CTL_MAGIC);
b.push(MSG_PROBE_RESULT);
b.extend_from_slice(&self.bytes_sent.to_le_bytes());
b.extend_from_slice(&self.packets_sent.to_le_bytes());
b.extend_from_slice(&self.duration_ms.to_le_bytes());
b
}
pub fn decode(b: &[u8]) -> Result<ProbeResult> {
if b.len() != 21 || &b[0..4] != CTL_MAGIC || b[4] != MSG_PROBE_RESULT {
return Err(PunktfunkError::InvalidArg("bad ProbeResult"));
}
Ok(ProbeResult {
bytes_sent: u64::from_le_bytes(b[5..13].try_into().unwrap()),
packets_sent: u32::from_le_bytes(b[13..17].try_into().unwrap()),
duration_ms: u32::from_le_bytes(b[17..21].try_into().unwrap()),
})
}
}
/// Frame a message for the control stream: `u16 LE length || payload`.
pub fn frame(payload: &[u8]) -> Vec<u8> {
let mut b = Vec::with_capacity(2 + payload.len());
@@ -1149,6 +1250,7 @@ mod tests {
frames: 600,
compositor: CompositorPref::Gamescope,
gamepad: GamepadPref::DualSense,
bitrate_kbps: 50_000,
};
assert_eq!(Welcome::decode(&w.encode()).unwrap(), w);
}
@@ -1164,6 +1266,7 @@ mod tests {
},
compositor: CompositorPref::Kwin,
gamepad: GamepadPref::DualSense,
bitrate_kbps: 25_000,
};
assert_eq!(Hello::decode(&h.encode()).unwrap(), h);
let s = Start {
@@ -1227,18 +1330,26 @@ mod tests {
},
compositor: CompositorPref::Mutter,
gamepad: GamepadPref::DualSense,
bitrate_kbps: 80_000,
};
let enc = h.encode();
assert_eq!(enc.len(), 22);
// Legacy (20-byte) Hello → both Auto, mode intact.
assert_eq!(enc.len(), 26);
// Legacy (20-byte) Hello → both Auto, no bitrate, mode intact.
let legacy = Hello::decode(&enc[..20]).unwrap();
assert_eq!(legacy.compositor, CompositorPref::Auto);
assert_eq!(legacy.gamepad, GamepadPref::Auto);
assert_eq!(legacy.bitrate_kbps, 0);
assert_eq!(legacy.mode, h.mode);
// Compositor-era (21-byte) Hello → compositor intact, gamepad Auto.
let mid = Hello::decode(&enc[..21]).unwrap();
assert_eq!(mid.compositor, CompositorPref::Mutter);
assert_eq!(mid.gamepad, GamepadPref::Auto);
// Gamepad-era (22-byte) Hello → compositor + gamepad intact, bitrate 0 (host default).
let pre_bitrate = Hello::decode(&enc[..22]).unwrap();
assert_eq!(pre_bitrate.gamepad, GamepadPref::DualSense);
assert_eq!(pre_bitrate.bitrate_kbps, 0);
// Full message → bitrate intact.
assert_eq!(Hello::decode(&enc).unwrap().bitrate_kbps, 80_000);
let w = Welcome {
abi_version: 2,
@@ -1256,17 +1367,24 @@ mod tests {
frames: 0,
compositor: CompositorPref::Kwin,
gamepad: GamepadPref::Xbox360,
bitrate_kbps: 120_000,
};
let wenc = w.encode();
assert_eq!(wenc.len(), 55);
assert_eq!(wenc.len(), 59);
let legacy_w = Welcome::decode(&wenc[..53]).unwrap();
assert_eq!(legacy_w.compositor, CompositorPref::Auto);
assert_eq!(legacy_w.gamepad, GamepadPref::Auto);
assert_eq!(legacy_w.bitrate_kbps, 0);
assert_eq!(legacy_w.frames, 0);
assert_eq!(legacy_w.key, w.key);
let mid_w = Welcome::decode(&wenc[..54]).unwrap();
assert_eq!(mid_w.compositor, CompositorPref::Kwin);
assert_eq!(mid_w.gamepad, GamepadPref::Auto);
// Gamepad-era (55-byte) Welcome → gamepad intact, bitrate 0 (unknown).
let pre_bitrate_w = Welcome::decode(&wenc[..55]).unwrap();
assert_eq!(pre_bitrate_w.gamepad, GamepadPref::Xbox360);
assert_eq!(pre_bitrate_w.bitrate_kbps, 0);
assert_eq!(Welcome::decode(&wenc).unwrap().bitrate_kbps, 120_000);
}
#[test]
@@ -1297,6 +1415,25 @@ mod tests {
.is_err());
}
#[test]
fn probe_messages_roundtrip() {
let req = ProbeRequest {
target_kbps: 250_000,
duration_ms: 2000,
};
assert_eq!(ProbeRequest::decode(&req.encode()).unwrap(), req);
let res = ProbeResult {
bytes_sent: 62_500_000,
packets_sent: 480,
duration_ms: 2003,
};
assert_eq!(ProbeResult::decode(&res.encode()).unwrap(), res);
// Type bytes keep the control messages disjoint from each other.
assert!(ProbeRequest::decode(&res.encode()).is_err());
assert!(Reconfigure::decode(&req.encode()).is_err());
assert!(ProbeResult::decode(&req.encode()).is_err());
}
#[test]
fn control_messages_disjoint_from_hello() {
// A Hello uses MAGIC (PKF1); control messages use CTL_MAGIC (PKFc). No Hello — at
@@ -1311,6 +1448,7 @@ mod tests {
},
compositor: CompositorPref::Auto,
gamepad: GamepadPref::Auto,
bitrate_kbps: 0,
}
.encode();
assert!(PairRequest::decode(&h).is_err(), "abi {abi} parsed as pair");
+196 -40
View File
@@ -25,10 +25,10 @@
use anyhow::{anyhow, Context, Result};
use punktfunk_core::config::{CompositorPref, FecConfig, FecScheme, GamepadPref, Role};
use punktfunk_core::input::{InputEvent, InputKind};
use punktfunk_core::packet::{FLAG_PIC, FLAG_SOF};
use punktfunk_core::packet::{FLAG_PIC, FLAG_PROBE, FLAG_SOF};
use punktfunk_core::quic::{
endpoint, io, Hello, PairChallenge, PairProof, PairRequest, PairResult, Reconfigure,
Reconfigured, Start, Welcome,
endpoint, io, Hello, PairChallenge, PairProof, PairRequest, PairResult, ProbeRequest,
ProbeResult, Reconfigure, Reconfigured, Start, Welcome,
};
use punktfunk_core::transport::UdpTransport;
use punktfunk_core::Session;
@@ -228,6 +228,27 @@ pub(crate) async fn serve(opts: M3Options, np: Arc<NativePairing>) -> Result<()>
/// connects and never finishes the handshake would otherwise wedge the host for everyone.
const HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Encoder bitrate (kbps) the host falls back to when the client expresses no preference
/// (`Hello::bitrate_kbps == 0`) — the long-standing 20 Mbps default. A client that knows its
/// link (e.g. after a speed test) requests an explicit rate instead.
const DEFAULT_BITRATE_KBPS: u32 = 20_000;
/// Bounds a client's requested bitrate to a sane range before configuring NVENC: a 500 kbps floor
/// keeps the stream above unusable, and a 500 Mbps ceiling guards against an absurd request
/// exhausting the encoder / link (GF(2¹⁶) FEC lifts the old ~1 Gbps wall, but 500 Mbps already
/// covers 5K@240). Resolved value is echoed in `Welcome::bitrate_kbps`.
const MIN_BITRATE_KBPS: u32 = 500;
const MAX_BITRATE_KBPS: u32 = 500_000;
/// Resolve a client's [`Hello::bitrate_kbps`] request to the rate the host will configure:
/// `0` → host default; anything else clamped into `[MIN, MAX]`.
fn resolve_bitrate_kbps(requested: u32) -> u32 {
if requested == 0 {
DEFAULT_BITRATE_KBPS
} else {
requested.clamp(MIN_BITRATE_KBPS, MAX_BITRATE_KBPS)
}
}
/// Persistent audio-capturer slot, reused across sessions (same pattern as the GameStream
/// path): keeps one warm PipeWire capture stream instead of a connect/negotiate cycle —
/// and a daemon-side node churn — per session. (Drop now tears a capturer down cleanly.)
@@ -391,6 +412,14 @@ async fn serve_session(
// needed; the actual pads are created lazily by the input thread).
let gamepad = resolve_gamepad(hello.gamepad);
// Resolve the encoder bitrate (client request clamped to a sane range, or host default).
let bitrate_kbps = resolve_bitrate_kbps(hello.bitrate_kbps);
tracing::info!(
requested_kbps = hello.bitrate_kbps,
resolved_kbps = bitrate_kbps,
"encoder bitrate"
);
// Reserve a UDP port for the data plane (bind, read it back, rebind in UdpTransport).
let probe = std::net::UdpSocket::bind("0.0.0.0:0")?;
let udp_port = probe.local_addr()?.port();
@@ -422,6 +451,7 @@ async fn serve_session(
.map(|c| c.as_pref())
.unwrap_or(CompositorPref::Auto),
gamepad,
bitrate_kbps,
};
io::write_msg(&mut send, &welcome.encode()).await?;
@@ -444,39 +474,62 @@ async fn serve_session(
"handshake complete — streaming"
);
// Control task: the handshake stream stays open for mid-stream renegotiation. A
// validated Reconfigure is acked, then handed to the data-plane thread, which rebuilds
// capture/encoder/virtual output at the new mode (the data plane itself is untouched).
// Control task: the handshake stream stays open for mid-stream renegotiation and speed
// tests. A validated Reconfigure is acked, then handed to the data-plane thread, which
// rebuilds capture/encoder/virtual output at the new mode (the data plane itself is
// untouched). A ProbeRequest is handed to the data plane, which bursts FLAG_PROBE filler and
// hands back a ProbeResult that this task writes to the client. The two control directions
// (inbound requests, outbound probe results) are multiplexed with `select!`.
let (reconfig_tx, reconfig_rx) = std::sync::mpsc::channel::<punktfunk_core::Mode>();
let (probe_tx, probe_rx) = std::sync::mpsc::channel::<ProbeRequest>();
let (probe_result_tx, mut probe_result_rx) =
tokio::sync::mpsc::unbounded_channel::<ProbeResult>();
tokio::spawn(async move {
let mut active = hello.mode;
while let Ok(msg) = io::read_msg(&mut ctrl_recv).await {
let Ok(req) = Reconfigure::decode(&msg) else {
tracing::warn!("unknown control message — ignoring");
continue;
};
let ok = req.mode.refresh_hz > 0
&& crate::encode::validate_dimensions(
crate::encode::Codec::H265,
req.mode.width,
req.mode.height,
)
.is_ok();
if ok {
active = req.mode;
tracing::info!(mode = ?req.mode, "mode switch accepted");
} else {
tracing::warn!(mode = ?req.mode, "mode switch rejected (invalid dimensions)");
}
let ack = Reconfigured {
accepted: ok,
mode: active,
};
if io::write_msg(&mut ctrl_send, &ack.encode()).await.is_err() {
break;
}
if ok && reconfig_tx.send(req.mode).is_err() {
break; // data plane gone
loop {
tokio::select! {
msg = io::read_msg(&mut ctrl_recv) => {
let Ok(msg) = msg else { break }; // stream closed
if let Ok(req) = Reconfigure::decode(&msg) {
let ok = req.mode.refresh_hz > 0
&& crate::encode::validate_dimensions(
crate::encode::Codec::H265,
req.mode.width,
req.mode.height,
)
.is_ok();
if ok {
active = req.mode;
tracing::info!(mode = ?req.mode, "mode switch accepted");
} else {
tracing::warn!(mode = ?req.mode, "mode switch rejected (invalid dimensions)");
}
let ack = Reconfigured { accepted: ok, mode: active };
if io::write_msg(&mut ctrl_send, &ack.encode()).await.is_err() {
break;
}
if ok && reconfig_tx.send(req.mode).is_err() {
break; // data plane gone
}
} else if let Ok(req) = ProbeRequest::decode(&msg) {
tracing::info!(
target_kbps = req.target_kbps,
duration_ms = req.duration_ms,
"speed-test probe requested"
);
if probe_tx.send(req).is_err() {
break; // data plane gone
}
} else {
tracing::warn!("unknown control message — ignoring");
}
}
result = probe_result_rx.recv() => {
let Some(result) = result else { break }; // data plane gone
if io::write_msg(&mut ctrl_send, &result.encode()).await.is_err() {
break;
}
}
}
}
});
@@ -593,6 +646,7 @@ async fn serve_session(
let source = opts.source;
let (seconds, frames) = (opts.seconds, opts.frames);
let mode = hello.mode;
let bitrate_kbps = welcome.bitrate_kbps; // resolved encoder bitrate (Hello clamped, or default)
let stop_stream = stop.clone();
let result: Result<()> = async {
tokio::task::spawn_blocking(move || -> Result<()> {
@@ -602,7 +656,13 @@ async fn serve_session(
let mut session = Session::new(cfg, Box::new(transport))
.map_err(|e| anyhow!("host session: {e:?}"))?;
match source {
M3Source::Synthetic => synthetic_stream(&mut session, frames, &stop_stream),
M3Source::Synthetic => synthetic_stream(
&mut session,
frames,
&stop_stream,
&probe_rx,
&probe_result_tx,
),
M3Source::Virtual => {
let compositor = compositor
.expect("the Virtual source resolves a compositor during the handshake");
@@ -613,6 +673,9 @@ async fn serve_session(
&stop_stream,
&reconfig_rx,
compositor,
bitrate_kbps,
&probe_rx,
&probe_result_tx,
)
}
}
@@ -1107,12 +1170,20 @@ fn audio_thread(_conn: quinn::Connection, _stop: Arc<AtomicBool>, _audio_cap: Au
);
}
fn synthetic_stream(session: &mut Session, frames: u32, stop: &AtomicBool) -> Result<()> {
fn synthetic_stream(
session: &mut Session,
frames: u32,
stop: &AtomicBool,
probe_rx: &std::sync::mpsc::Receiver<ProbeRequest>,
probe_result_tx: &tokio::sync::mpsc::UnboundedSender<ProbeResult>,
) -> Result<()> {
let interval = std::time::Duration::from_millis(1000 / 60);
for idx in 0..frames {
if stop.load(Ordering::SeqCst) {
break;
}
// Service speed-test probes between synthetic frames (loopback bandwidth tests).
service_probes(session, stop, probe_rx, probe_result_tx);
let data = test_frame(idx, 64 * 1024);
session
.submit_frame(&data, now_ns(), (FLAG_PIC | FLAG_SOF) as u32)
@@ -1223,13 +1294,88 @@ fn resolve_compositor(pref: CompositorPref) -> Result<crate::vdisplay::Composito
Ok(chosen)
}
/// Bounds a speed-test [`ProbeRequest`] before bursting: a 1 Gbps / 5 s ceiling keeps a probe from
/// monopolizing the link or stalling the stream for too long. GF(2¹⁶) FEC makes ~1 Gbps reachable
/// on a LAN — ample headroom to find a session's sustainable bitrate.
const MAX_PROBE_KBPS: u32 = 1_000_000;
const MAX_PROBE_MS: u32 = 5_000;
/// Run a bandwidth probe over `session`: burst zero-filled access units flagged [`FLAG_PROBE`] at
/// `req.target_kbps` of goodput for `req.duration_ms` (both clamped to `MAX_PROBE_*`), pacing by a
/// "bytes allowed so far" budget so scheduling jitter doesn't overshoot the target. Returns what
/// was actually offered so the client can compute delivery ratio (`received / bytes_sent`) and
/// throughput. Video is paused for the duration (the caller's loop is blocked here) — a speed test
/// is a deliberate, short interruption the client initiates.
fn run_probe_burst(session: &mut Session, req: ProbeRequest, stop: &AtomicBool) -> ProbeResult {
let target_kbps = req.target_kbps.min(MAX_PROBE_KBPS);
let duration_ms = req.duration_ms.min(MAX_PROBE_MS);
if target_kbps == 0 || duration_ms == 0 {
return ProbeResult {
bytes_sent: 0,
packets_sent: 0,
duration_ms: 0,
};
}
// kbps -> bytes/s (x1000/8).
let bytes_per_sec = target_kbps as u64 * 125;
// ~240 AUs/s for smooth pacing, each capped so one submit_frame stays a bounded burst (a large
// AU fragments into many UDP shards via sendmmsg).
let chunk = (bytes_per_sec / 240).clamp(1200, 256 * 1024) as usize;
let filler = vec![0u8; chunk];
let start = std::time::Instant::now();
let deadline = start + std::time::Duration::from_millis(duration_ms as u64);
let mut bytes_sent = 0u64;
let mut packets_sent = 0u32;
while std::time::Instant::now() < deadline && !stop.load(Ordering::SeqCst) {
let allowed = (start.elapsed().as_secs_f64() * bytes_per_sec as f64) as u64;
if bytes_sent < allowed {
// A full send buffer drops on WouldBlock (UdpTransport returns Ok) — that loss is part
// of what the probe measures, so count what we offered and keep going.
let _ = session.submit_frame(&filler, now_ns(), FLAG_PROBE as u32);
bytes_sent += chunk as u64;
packets_sent += 1;
} else {
std::thread::sleep(std::time::Duration::from_micros(200));
}
}
let actual_ms = start.elapsed().as_millis() as u32;
tracing::info!(
target_kbps,
duration_ms = actual_ms,
bytes_sent,
packets_sent,
"speed-test probe burst complete"
);
ProbeResult {
bytes_sent,
packets_sent,
duration_ms: actual_ms,
}
}
/// Drain any pending speed-test requests and run each burst, replying with its [`ProbeResult`].
/// Called once per data-plane loop iteration so a probe runs between frames.
fn service_probes(
session: &mut Session,
stop: &AtomicBool,
probe_rx: &std::sync::mpsc::Receiver<ProbeRequest>,
probe_result_tx: &tokio::sync::mpsc::UnboundedSender<ProbeResult>,
) {
while let Ok(req) = probe_rx.try_recv() {
let result = run_probe_burst(session, req, stop);
let _ = probe_result_tx.send(result);
}
}
/// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs
/// stamped with the capture wall clock (the client derives per-frame pipeline latency).
///
/// `reconfig` delivers accepted mid-stream mode switches: the capture/encode pipeline is
/// rebuilt at the new mode (capturer drop tears down the PipeWire stream and, via its
/// keepalive, the virtual output) while the data-plane `session` continues untouched —
/// the rebuilt encoder opens with an IDR + in-band parameter sets.
/// the rebuilt encoder opens with an IDR + in-band parameter sets. `probe_rx`/`probe_result_tx`
/// carry speed-test bursts (see [`service_probes`]).
#[allow(clippy::too_many_arguments)]
fn virtual_stream(
session: &mut Session,
mode: punktfunk_core::Mode,
@@ -1237,20 +1383,26 @@ fn virtual_stream(
stop: &AtomicBool,
reconfig: &std::sync::mpsc::Receiver<punktfunk_core::Mode>,
compositor: crate::vdisplay::Compositor,
bitrate_kbps: u32,
probe_rx: &std::sync::mpsc::Receiver<ProbeRequest>,
probe_result_tx: &tokio::sync::mpsc::UnboundedSender<ProbeResult>,
) -> Result<()> {
tracing::info!(
compositor = compositor.id(),
?mode,
bitrate_kbps,
"punktfunk/1 virtual display"
);
let mut vd = crate::vdisplay::open(compositor)?;
let (mut capturer, mut enc, mut frame, mut interval) =
build_pipeline_with_retry(&mut vd, mode)?;
build_pipeline_with_retry(&mut vd, mode, bitrate_kbps)?;
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64);
let mut next = std::time::Instant::now();
let mut sent: u64 = 0;
while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
// Service speed-test probes between frames (each burst pauses video for its duration).
service_probes(session, stop, probe_rx, probe_result_tx);
// Drain to the NEWEST requested mode (a resize drag queues many) so we rebuild once,
// not once per stale intermediate mode.
let mut want = None;
@@ -1262,7 +1414,7 @@ fn virtual_stream(
// Build the new pipeline BEFORE dropping the old one: the host already acked
// the switch as accepted, so a rebuild failure must not kill an otherwise
// healthy session — keep streaming the current mode and log instead.
match build_pipeline(&mut vd, new_mode) {
match build_pipeline(&mut vd, new_mode, bitrate_kbps) {
Ok(next_pipe) => {
(capturer, enc, frame, interval) = next_pipe;
next = std::time::Instant::now();
@@ -1321,11 +1473,12 @@ type Pipeline = (
fn build_pipeline_with_retry(
vd: &mut Box<dyn crate::vdisplay::VirtualDisplay>,
mode: punktfunk_core::Mode,
bitrate_kbps: u32,
) -> Result<Pipeline> {
const MAX_ATTEMPTS: u32 = 4;
let mut backoff = std::time::Duration::from_millis(500);
for attempt in 1..=MAX_ATTEMPTS {
match build_pipeline(vd, mode) {
match build_pipeline(vd, mode, bitrate_kbps) {
Ok(pipe) => {
if attempt > 1 {
tracing::info!(attempt, "pipeline up after retry");
@@ -1382,6 +1535,7 @@ fn is_permanent_build_error(chain: &str) -> bool {
fn build_pipeline(
vd: &mut Box<dyn crate::vdisplay::VirtualDisplay>,
mode: punktfunk_core::Mode,
bitrate_kbps: u32,
) -> Result<Pipeline> {
let vout = vd.create(mode).context("create virtual output")?;
// The backend reports the refresh it actually achieved in `preferred_mode.2` (KWin may cap a
@@ -1410,7 +1564,7 @@ fn build_pipeline(
frame.width,
frame.height,
effective_hz,
20_000_000,
bitrate_kbps as u64 * 1000,
frame.is_cuda(),
)
.context("open NVENC")?;
@@ -1762,6 +1916,7 @@ mod tests {
mode,
CompositorPref::Auto,
GamepadPref::Auto,
0,
None,
None,
timeout
@@ -1786,6 +1941,7 @@ mod tests {
mode,
CompositorPref::Auto,
GamepadPref::Auto,
0,
Some(host_fp),
Some((cert.clone(), key.clone())),
timeout,