diff --git a/crates/punktfunk-client-rs/src/main.rs b/crates/punktfunk-client-rs/src/main.rs index 4245175..c3dc4c6 100644 --- a/crates/punktfunk-client-rs/src/main.rs +++ b/crates/punktfunk-client-rs/src/main.rs @@ -39,7 +39,10 @@ use anyhow::{anyhow, Context, Result}; use punktfunk_core::config::GamepadPref; use punktfunk_core::config::Role; use punktfunk_core::input::{InputEvent, InputKind}; -use punktfunk_core::quic::{endpoint, io, Hello, Reconfigure, Reconfigured, Start, Welcome}; +use punktfunk_core::packet::FLAG_PROBE; +use punktfunk_core::quic::{ + endpoint, io, Hello, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, Start, Welcome, +}; use punktfunk_core::transport::UdpTransport; use punktfunk_core::{CompositorPref, Mode, PunktfunkError, Session}; use std::io::Write; @@ -67,6 +70,11 @@ struct Args { compositor: CompositorPref, /// `--gamepad NAME` — request a host virtual-pad backend (auto|xbox360|dualsense). gamepad: GamepadPref, + /// `--bitrate KBPS` — request this encoder bitrate (kilobits/s); 0 = host default. + bitrate_kbps: u32, + /// `--speed-test KBPS:MS` — after the stream starts, ask the host for a `MS`-millisecond + /// bandwidth probe burst at `KBPS`, then report measured throughput + loss. 
+ speed_test: Option<(u32, u32)>, } fn parse_mode(m: &str) -> Option { @@ -178,6 +186,11 @@ fn parse_args() -> Args { name: get("--name").unwrap_or("punktfunk-client-rs").to_string(), compositor, gamepad, + bitrate_kbps: get("--bitrate").and_then(|s| s.parse().ok()).unwrap_or(0), + speed_test: get("--speed-test").and_then(|s| { + let (kbps, ms) = s.split_once(':')?; + Some((kbps.parse().ok()?, ms.parse().ok()?)) + }), } } @@ -263,6 +276,7 @@ async fn session(args: Args) -> Result<()> { mode: args.mode, compositor: args.compositor, gamepad: args.gamepad, + bitrate_kbps: args.bitrate_kbps, } .encode(), ) @@ -292,9 +306,18 @@ async fn session(args: Args) -> Result<()> { ) .await?; + // Speed-test accumulators: the data-plane loop folds each FLAG_PROBE filler AU in here; the + // --speed-test reporter below reads them once the host's ProbeResult lands. first/last hold + // now_ns timestamps of the receive window (0 = unset). + let probe_recv_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); + let probe_recv_packets = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); + let probe_first_ns = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); + let probe_last_ns = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); + // Mid-stream renegotiation test: after a delay, ask the host to switch modes on the // still-open control stream. The stream then carries new-mode AUs (IDR + in-band - // parameter sets) — ffprobe the --out file to see both resolutions. + // parameter sets) — ffprobe the --out file to see both resolutions. Mutually exclusive with + // --speed-test (both own the control stream). 
if let Some((new_mode, after_secs)) = args.remode { let mut rs = send; let mut rr = recv; @@ -319,6 +342,70 @@ async fn session(args: Args) -> Result<()> { other => tracing::error!(?other, "bad Reconfigured"), } }); + } else if let Some((target_kbps, duration_ms)) = args.speed_test { + // Bandwidth probe: after the stream warms up, ask the host to burst FLAG_PROBE filler; + // measure what arrives vs. what it reports sending. + let mut ss = send; + let mut sr = recv; + let (pb, pp, pf, pl) = ( + probe_recv_bytes.clone(), + probe_recv_packets.clone(), + probe_first_ns.clone(), + probe_last_ns.clone(), + ); + tokio::spawn(async move { + use std::sync::atomic::Ordering::Relaxed; + tokio::time::sleep(std::time::Duration::from_secs(2)).await; // let the stream warm up + tracing::info!(target_kbps, duration_ms, "requesting speed-test probe"); + if io::write_msg( + &mut ss, + &ProbeRequest { + target_kbps, + duration_ms, + } + .encode(), + ) + .await + .is_err() + { + tracing::error!("ProbeRequest write failed"); + return; + } + let res = match io::read_msg(&mut sr).await.map(|b| ProbeResult::decode(&b)) { + Ok(Ok(r)) => r, + other => { + tracing::error!(?other, "bad ProbeResult"); + return; + } + }; + // The reliable result can beat the last UDP shards — let them reassemble. 
+ tokio::time::sleep(std::time::Duration::from_millis(400)).await; + let recv_bytes = pb.load(Relaxed); + let recv_packets = pp.load(Relaxed); + let (first, last) = (pf.load(Relaxed), pl.load(Relaxed)); + let window_ms = if first > 0 && last > first { + (last - first) / 1_000_000 + } else { + 0 + }; + let throughput_kbps = recv_bytes.saturating_mul(8).checked_div(window_ms).unwrap_or(0); + let loss_pct = if res.bytes_sent > 0 { + res.bytes_sent.saturating_sub(recv_bytes) as f64 / res.bytes_sent as f64 * 100.0 + } else { + 0.0 + }; + tracing::info!( + target_kbps, + host_sent_bytes = res.bytes_sent, + host_sent_packets = res.packets_sent, + recv_bytes, + recv_packets, + window_ms, + throughput_kbps, + loss_pct = format!("{loss_pct:.1}%"), + "SPEED TEST complete", + ); + }); } // Input plane: scripted events as QUIC datagrams (mouse square + 'A' taps), proving the @@ -581,6 +668,12 @@ async fn session(args: Args) -> Result<()> { let cfg = welcome.session_config(Role::Client); let expected = welcome.frames; let out_path = args.out.clone(); + let (pb, pp, pf, pl) = ( + probe_recv_bytes.clone(), + probe_recv_packets.clone(), + probe_first_ns.clone(), + probe_last_ns.clone(), + ); // Data plane on a blocking thread (native threads only on the frame path). let result = tokio::task::spawn_blocking(move || -> Result<()> { @@ -619,6 +712,17 @@ async fn session(args: Args) -> Result<()> { match session.poll_frame() { Ok(frame) => { last_rx = std::time::Instant::now(); + // Speed-test filler isn't video: fold it into the probe accumulators and skip + // verification / the --out sink. + if frame.flags & FLAG_PROBE as u32 != 0 { + use std::sync::atomic::Ordering::Relaxed; + let n = now_ns(); + let _ = pf.compare_exchange(0, n, Relaxed, Relaxed); + pl.store(n, Relaxed); + pb.fetch_add(frame.data.len() as u64, Relaxed); + pp.fetch_add(1, Relaxed); + continue; + } bytes += frame.data.len() as u64; // The host stamps pts with its capture wall clock; same-host runs share it. 
let lat = now_ns().saturating_sub(frame.pts_ns); diff --git a/crates/punktfunk-core/src/abi.rs b/crates/punktfunk-core/src/abi.rs index ff74d2a..42a2585 100644 --- a/crates/punktfunk-core/src/abi.rs +++ b/crates/punktfunk-core/src/abi.rs @@ -748,6 +748,50 @@ pub unsafe extern "C" fn punktfunk_connect_ex2( client_cert_pem: *const std::os::raw::c_char, client_key_pem: *const std::os::raw::c_char, timeout_ms: u32, +) -> *mut PunktfunkConnection { + unsafe { + punktfunk_connect_ex3( + host, + port, + width, + height, + refresh_hz, + compositor, + gamepad, + 0, // bitrate_kbps = 0: let the host pick its default + pin_sha256, + observed_sha256_out, + client_cert_pem, + client_key_pem, + timeout_ms, + ) + } +} + +/// Like [`punktfunk_connect_ex2`], but additionally requests the video encoder `bitrate_kbps` +/// (kilobits per second). `0` lets the host pick its default; any other value is clamped to the +/// host's supported range. After a speed test ([`punktfunk_connection_speed_test`]) a client can +/// reconnect (or pick at connect time) with the measured rate. The value the host actually +/// configured is readable via [`punktfunk_connection_bitrate`]. +/// +/// # Safety +/// Same as [`punktfunk_connect`]. 
+#[cfg(feature = "quic")] +#[no_mangle] +pub unsafe extern "C" fn punktfunk_connect_ex3( + host: *const std::os::raw::c_char, + port: u16, + width: u32, + height: u32, + refresh_hz: u32, + compositor: u32, + gamepad: u32, + bitrate_kbps: u32, + pin_sha256: *const u8, + observed_sha256_out: *mut u8, + client_cert_pem: *const std::os::raw::c_char, + client_key_pem: *const std::os::raw::c_char, + timeout_ms: u32, ) -> *mut PunktfunkConnection { let r = std::panic::catch_unwind(AssertUnwindSafe(|| { if host.is_null() { @@ -790,6 +834,7 @@ pub unsafe extern "C" fn punktfunk_connect_ex2( mode, pref, gamepad, + bitrate_kbps, pin, identity, std::time::Duration::from_millis(timeout_ms as u64), @@ -1245,6 +1290,32 @@ pub unsafe extern "C" fn punktfunk_connection_gamepad( }) } +/// The video encoder bitrate (kilobits per second) the host actually configured for this session +/// — the [`punktfunk_connect_ex3`] request clamped to the host's range, or its default when `0` +/// was requested. `0` = an older host that didn't report it. Safe any time after connect. +/// +/// # Safety +/// `c` is a valid connection handle; `bitrate_kbps` is writable (NULL is skipped). +#[cfg(feature = "quic")] +#[no_mangle] +pub unsafe extern "C" fn punktfunk_connection_bitrate( + c: *const PunktfunkConnection, + bitrate_kbps: *mut u32, +) -> PunktfunkStatus { + guard(|| { + let c = match unsafe { c.as_ref() } { + Some(c) => c, + None => return PunktfunkStatus::NullPointer, + }; + unsafe { + if !bitrate_kbps.is_null() { + *bitrate_kbps = c.inner.resolved_bitrate_kbps; + } + } + PunktfunkStatus::Ok + }) +} + /// Ask the host to switch the live session to `width`x`height`@`refresh_hz` without /// reconnecting (window resized, refresh changed). 
Non-blocking enqueue: on acceptance the /// stream continues at the new mode — the first new-mode access unit is an IDR with @@ -1278,6 +1349,91 @@ pub unsafe extern "C" fn punktfunk_connection_request_mode( }) } +/// A speed-test measurement, filled by [`punktfunk_connection_probe_result`]. `done` is 0 until +/// the host's end-of-burst report lands, then 1 (the numbers are final). `throughput_kbps` is the +/// measured goodput to drive a bitrate choice from; `loss_pct` is the delivery loss at that rate. +#[repr(C)] +#[derive(Clone, Copy, Debug, Default)] +pub struct PunktfunkProbeResult { + /// 1 once the host's end-of-burst report arrived (measurement final); else 0 (partial). + pub done: u8, + /// Probe payload bytes / packets the client received. + pub recv_bytes: u64, + pub recv_packets: u32, + /// Probe payload bytes / packets the host reported sending. + pub host_bytes: u64, + pub host_packets: u32, + /// Client-measured receive window (first→last probe AU), milliseconds. + pub elapsed_ms: u32, + /// Measured goodput = `recv_bytes * 8 / elapsed_ms` (kilobits/second). + pub throughput_kbps: u32, + /// Delivery loss `(host_bytes - recv_bytes) / host_bytes` as a percentage (0 if unknown). + pub loss_pct: f32, +} + +/// Start a bandwidth speed test: ask the host to burst filler over the data plane at +/// `target_kbps` of goodput for `duration_ms` (each clamped host-side to ≤ 1 Gbps / ≤ 5 s), +/// *briefly pausing video*. Non-blocking — poll [`punktfunk_connection_probe_result`] until its +/// `done` field is 1. Starting a probe resets any prior measurement. +/// +/// # Safety +/// `c` is a valid connection handle. 
+#[cfg(feature = "quic")] +#[no_mangle] +pub unsafe extern "C" fn punktfunk_connection_speed_test( + c: *const PunktfunkConnection, + target_kbps: u32, + duration_ms: u32, +) -> PunktfunkStatus { + guard(|| { + let c = match unsafe { c.as_ref() } { + Some(c) => c, + None => return PunktfunkStatus::NullPointer, + }; + match c.inner.request_probe(target_kbps, duration_ms) { + Ok(()) => PunktfunkStatus::Ok, + Err(e) => e.status(), + } + }) +} + +/// Read the current speed-test measurement into `*out` (partial until `out->done == 1`). Safe to +/// poll repeatedly after [`punktfunk_connection_speed_test`]; before any probe it reports zeros. +/// +/// # Safety +/// `c` is a valid connection handle; `out` is writable for one `PunktfunkProbeResult` (NULL is an +/// error). +#[cfg(feature = "quic")] +#[no_mangle] +pub unsafe extern "C" fn punktfunk_connection_probe_result( + c: *const PunktfunkConnection, + out: *mut PunktfunkProbeResult, +) -> PunktfunkStatus { + guard(|| { + let c = match unsafe { c.as_ref() } { + Some(c) => c, + None => return PunktfunkStatus::NullPointer, + }; + if out.is_null() { + return PunktfunkStatus::NullPointer; + } + let o = c.inner.probe_result(); + unsafe { + *out = PunktfunkProbeResult { + done: o.done as u8, + recv_bytes: o.recv_bytes, + recv_packets: o.recv_packets, + host_bytes: o.host_bytes, + host_packets: o.host_packets, + elapsed_ms: o.elapsed_ms, + throughput_kbps: o.throughput_kbps, + loss_pct: o.loss_pct, + }; + } + PunktfunkStatus::Ok + }) +} + /// Close the connection and free the handle (joins the internal threads). NULL is a no-op. 
/// /// # Safety diff --git a/crates/punktfunk-core/src/client.rs b/crates/punktfunk-core/src/client.rs index 8a1764f..9eb6dd5 100644 --- a/crates/punktfunk-core/src/client.rs +++ b/crates/punktfunk-core/src/client.rs @@ -14,15 +14,70 @@ use crate::config::{CompositorPref, GamepadPref, Mode, Role}; use crate::error::{PunktfunkError, Result}; use crate::input::InputEvent; +use crate::packet::FLAG_PROBE; use crate::quic::{ - endpoint, io, Hello, HidOutput, Reconfigure, Reconfigured, RichInput, Start, Welcome, + endpoint, io, Hello, HidOutput, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, + RichInput, Start, Welcome, }; use crate::session::{Frame, Session}; use crate::transport::UdpTransport; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender}; -use std::sync::Arc; -use std::time::Duration; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +/// A control-stream request the embedder makes on the open handshake stream: a mode switch or a +/// speed test. One outbound channel carries both so the worker's `select!` has a single writer +/// (two `&mut ctrl_send` borrows across select branches don't compile). +enum CtrlRequest { + Mode(Mode), + Probe(ProbeRequest), +} + +/// What the worker reports to [`NativeClient::connect`] once the handshake lands: the negotiated +/// mode, the host-resolved gamepad backend, the host's certificate fingerprint, and the resolved +/// encoder bitrate (kbps). +type Negotiated = (Mode, GamepadPref, [u8; 32], u32); + +/// Accumulated state of an in-flight / finished speed test. The data-plane pump folds each +/// received [`FLAG_PROBE`] access unit in; the control task records the host's [`ProbeResult`] +/// when it lands. Read (and finalized into numbers) by [`NativeClient::probe_result`]. +#[derive(Default)] +struct ProbeState { + /// A probe is in progress (set by `request_probe`, cleared by nothing — the latest one wins). 
+ active: bool, + /// Probe access-unit payload bytes the client received, and their count. + recv_bytes: u64, + recv_packets: u32, + /// First/last probe AU arrival — the measured receive window. + start: Option, + last: Option, + /// The host's report ([`ProbeResult`]); present once the burst finished. + host_bytes: u64, + host_packets: u32, + /// The host's `ProbeResult` arrived → the measurement is final. + done: bool, +} + +/// A finished/partial speed-test measurement, returned by [`NativeClient::probe_result`]. +#[derive(Clone, Copy, Debug, Default)] +pub struct ProbeOutcome { + /// The host's end-of-burst report has arrived — the numbers below are final. + pub done: bool, + /// Probe payload bytes / packets the client received. + pub recv_bytes: u64, + pub recv_packets: u32, + /// Probe payload bytes / packets the host reported sending. + pub host_bytes: u64, + pub host_packets: u32, + /// The client-measured receive window (first→last probe AU), in milliseconds. + pub elapsed_ms: u32, + /// Measured goodput = `recv_bytes * 8 / elapsed_ms` (kilobits/second). This is the figure to + /// drive a [`Hello::bitrate_kbps`] choice from. + pub throughput_kbps: u32, + /// Delivery loss = `(host_bytes - recv_bytes) / host_bytes`, as a percentage (0 if unknown). + pub loss_pct: f32, +} /// Frames buffered between the data-plane pump and the embedder. Small: the embedder /// (decoder) should drain at frame rate; when it falls behind, the newest frame is dropped @@ -62,7 +117,10 @@ pub struct NativeClient { mic_tx: tokio::sync::mpsc::UnboundedSender<(u32, u64, Vec)>, /// Outbound rich input (DualSense touchpad / motion) → 0xCC datagrams by the worker. rich_input_tx: tokio::sync::mpsc::UnboundedSender, - reconfig_tx: tokio::sync::mpsc::UnboundedSender, + /// Outbound control-stream requests (mode switch, speed test) → the worker's control task. + ctrl_tx: tokio::sync::mpsc::UnboundedSender, + /// Speed-test accumulator, shared with the data-plane pump + control task. 
+ probe: Arc>, shutdown: Arc, worker: Option>, /// The currently active session mode (the Welcome's, then updated by every accepted @@ -74,6 +132,10 @@ pub struct NativeClient { /// The virtual gamepad backend the host actually resolved ([`Welcome::gamepad`]). /// `Auto` = an older host that didn't say (assume X-Box 360, no DualSense feedback). pub resolved_gamepad: GamepadPref, + /// The encoder bitrate the host actually configured ([`Welcome::bitrate_kbps`], kbps): our + /// requested rate clamped to the host's range, or its default if we requested `0`. `0` = an + /// older host that didn't report it. + pub resolved_bitrate_kbps: u32, } impl NativeClient { @@ -94,6 +156,7 @@ impl NativeClient { mode: Mode, compositor: CompositorPref, gamepad: GamepadPref, + bitrate_kbps: u32, pin: Option<[u8; 32]>, identity: Option<(String, String)>, timeout: Duration, @@ -105,15 +168,17 @@ impl NativeClient { let (input_tx, input_rx) = tokio::sync::mpsc::unbounded_channel::(); let (mic_tx, mic_rx) = tokio::sync::mpsc::unbounded_channel::<(u32, u64, Vec)>(); let (rich_input_tx, rich_input_rx) = tokio::sync::mpsc::unbounded_channel::(); - let (reconfig_tx, reconfig_rx) = tokio::sync::mpsc::unbounded_channel::(); + let (ctrl_tx, ctrl_rx) = tokio::sync::mpsc::unbounded_channel::(); let (ready_tx, ready_rx) = - std::sync::mpsc::channel::>(); + std::sync::mpsc::channel::>(); let shutdown = Arc::new(AtomicBool::new(false)); let mode_slot = Arc::new(std::sync::Mutex::new(mode)); + let probe = Arc::new(Mutex::new(ProbeState::default())); let host = host.to_string(); let shutdown_w = shutdown.clone(); let mode_slot_w = mode_slot.clone(); + let probe_w = probe.clone(); let worker = std::thread::Builder::new() .name("punktfunk-client".into()) .spawn(move || { @@ -134,6 +199,7 @@ impl NativeClient { mode, compositor, gamepad, + bitrate_kbps, pin, identity, frame_tx, @@ -143,22 +209,24 @@ impl NativeClient { input_rx, mic_rx, rich_input_rx, - reconfig_rx, + ctrl_rx, ready_tx, shutdown: 
shutdown_w, mode_slot: mode_slot_w, + probe: probe_w, })); }) .map_err(PunktfunkError::Io)?; - let (negotiated, resolved_gamepad, fingerprint) = match ready_rx.recv_timeout(timeout) { - Ok(Ok(t)) => t, - Ok(Err(e)) => return Err(e), - Err(_) => { - shutdown.store(true, Ordering::SeqCst); - return Err(PunktfunkError::Timeout); - } - }; + let (negotiated, resolved_gamepad, fingerprint, resolved_bitrate_kbps) = + match ready_rx.recv_timeout(timeout) { + Ok(Ok(t)) => t, + Ok(Err(e)) => return Err(e), + Err(_) => { + shutdown.store(true, Ordering::SeqCst); + return Err(PunktfunkError::Timeout); + } + }; *mode_slot.lock().unwrap() = negotiated; Ok(NativeClient { frames: frame_rx, @@ -168,12 +236,14 @@ impl NativeClient { input_tx, mic_tx, rich_input_tx, - reconfig_tx, + ctrl_tx, + probe, shutdown, worker: Some(worker), mode: mode_slot, host_fingerprint: fingerprint, resolved_gamepad, + resolved_bitrate_kbps, }) } @@ -280,11 +350,61 @@ impl NativeClient { /// frames open with an IDR carrying new parameter sets) and [`NativeClient::mode`] /// reflects it. A rejected request leaves the session unchanged. pub fn request_mode(&self, mode: Mode) -> Result<()> { - self.reconfig_tx - .send(mode) + self.ctrl_tx + .send(CtrlRequest::Mode(mode)) .map_err(|_| PunktfunkError::Closed) } + /// Start a bandwidth speed test: ask the host to burst filler over the data plane at + /// `target_kbps` of goodput for `duration_ms`, *briefly pausing video*. Non-blocking — the + /// measurement accumulates in the background; poll [`NativeClient::probe_result`] until its + /// `done` flag is set. Starting a probe resets any prior measurement. The host clamps both + /// fields (≤ 1 Gbps, ≤ 5 s). + pub fn request_probe(&self, target_kbps: u32, duration_ms: u32) -> Result<()> { + // Reset the accumulator so a fresh run doesn't blend into the previous one. 
+        *self.probe.lock().unwrap() = ProbeState {
+            active: true,
+            ..Default::default()
+        };
+        self.ctrl_tx
+            .send(CtrlRequest::Probe(ProbeRequest {
+                target_kbps,
+                duration_ms,
+            }))
+            .map_err(|_| PunktfunkError::Closed)
+    }
+
+    /// Read the current speed-test measurement (partial until `done`, final once the host's
+    /// end-of-burst report lands). Derives goodput + loss from the accumulated probe bytes.
+    /// `elapsed_ms` and `throughput_kbps` saturate at `u32::MAX` instead of wrapping, and
+    /// `throughput_kbps` stays 0 until the receive window spans at least one millisecond;
+    /// `loss_pct` stays 0 while no host byte count is recorded.
+    pub fn probe_result(&self) -> ProbeOutcome {
+        let p = self.probe.lock().unwrap();
+        // Receive window: first → last probe AU; clamped before the narrowing cast.
+        let window = p.start.zip(p.last).map(|(s, l)| l.duration_since(s).as_millis());
+        let elapsed_ms = window.map_or(0, |ms| ms.min(u128::from(u32::MAX)) as u32);
+        // bytes × 8 / ms = kilobits/second; `checked_div` maps an empty window to `None`.
+        let kbps = p.recv_bytes.saturating_mul(8).checked_div(u64::from(elapsed_ms));
+        let throughput_kbps = kbps.map_or(0, |k| k.min(u64::from(u32::MAX)) as u32);
+        // Share of the host-reported bytes that never arrived, as a percentage.
+        let loss_pct = if p.host_bytes > 0 {
+            p.host_bytes.saturating_sub(p.recv_bytes) as f64 / p.host_bytes as f64 * 100.0
+        } else {
+            0.0
+        } as f32;
+        ProbeOutcome {
+            done: p.done,
+            recv_bytes: p.recv_bytes,
+            recv_packets: p.recv_packets,
+            host_bytes: p.host_bytes,
+            host_packets: p.host_packets,
+            elapsed_ms,
+            throughput_kbps,
+            loss_pct,
+        }
+    }
+
     /// Pull the next reassembled, FEC-recovered access unit; [`PunktfunkError::NoFrame`] on
     /// timeout, [`PunktfunkError::Closed`]-class errors once the session ended.
/// @@ -373,6 +493,7 @@ struct WorkerArgs { mode: Mode, compositor: CompositorPref, gamepad: GamepadPref, + bitrate_kbps: u32, pin: Option<[u8; 32]>, identity: Option<(String, String)>, frame_tx: SyncSender, @@ -382,10 +503,11 @@ struct WorkerArgs { input_rx: tokio::sync::mpsc::UnboundedReceiver, mic_rx: tokio::sync::mpsc::UnboundedReceiver<(u32, u64, Vec)>, rich_input_rx: tokio::sync::mpsc::UnboundedReceiver, - reconfig_rx: tokio::sync::mpsc::UnboundedReceiver, - ready_tx: std::sync::mpsc::Sender>, + ctrl_rx: tokio::sync::mpsc::UnboundedReceiver, + ready_tx: std::sync::mpsc::Sender>, shutdown: Arc, mode_slot: Arc>, + probe: Arc>, } /// The worker: QUIC handshake, then the input/datagram/control tasks + the blocking @@ -397,6 +519,7 @@ async fn worker_main(args: WorkerArgs) { mode, compositor, gamepad, + bitrate_kbps, pin, identity, frame_tx, @@ -406,10 +529,11 @@ async fn worker_main(args: WorkerArgs) { mut input_rx, mut mic_rx, mut rich_input_rx, - mut reconfig_rx, + mut ctrl_rx, ready_tx, shutdown, mode_slot, + probe, } = args; let setup = async { let remote: std::net::SocketAddr = format!("{host}:{port}") @@ -448,6 +572,7 @@ async fn worker_main(args: WorkerArgs) { mode, compositor, gamepad, + bitrate_kbps, } .encode(), ) @@ -491,6 +616,7 @@ async fn worker_main(args: WorkerArgs) { welcome.mode, welcome.gamepad, fingerprint, + welcome.bitrate_kbps, )) }; @@ -502,6 +628,7 @@ async fn worker_main(args: WorkerArgs) { negotiated, resolved_gamepad, fingerprint, + resolved_bitrate_kbps, ) = match setup.await { Ok(t) => t, Err(e) => { @@ -509,7 +636,12 @@ async fn worker_main(args: WorkerArgs) { return; } }; - let _ = ready_tx.send(Ok((negotiated, resolved_gamepad, fingerprint))); + let _ = ready_tx.send(Ok(( + negotiated, + resolved_gamepad, + fingerprint, + resolved_bitrate_kbps, + ))); // Input task: embedder events → QUIC datagrams. 
let input_conn = conn.clone();
@@ -536,30 +668,50 @@ async fn worker_main(args: WorkerArgs) {
         }
     });
 
-    // Control task: the handshake stream stays open for mid-stream renegotiation. One
-    // request at a time — write Reconfigure, await Reconfigured, publish the active mode.
+    // Control tasks: the handshake stream stays open for mid-stream renegotiation + speed tests.
+    // The stream halves get one task each instead of one `select!` loop: `select!` drops the
+    // pending `io::read_msg` future whenever a request wins the race, and a read abandoned
+    // part-way through a `u16 length || payload` frame would leave the stream mid-message.
+    // Outbound requests (mode switch, probe) still share the single `ctrl_rx` queue.
     {
         let mode_slot = mode_slot.clone();
+        let probe = probe.clone();
+        let writer = tokio::spawn(async move {
+            while let Some(req) = ctrl_rx.recv().await {
+                let bytes = match req {
+                    CtrlRequest::Mode(m) => Reconfigure { mode: m }.encode(),
+                    CtrlRequest::Probe(p) => p.encode(),
+                };
+                if io::write_msg(&mut ctrl_send, &bytes).await.is_err() {
+                    break; // write failed — stop taking requests
+                }
+            }
+        });
+        // Reader: handles each reply according to which decoder accepts it.
         tokio::spawn(async move {
-            while let Some(want) = reconfig_rx.recv().await {
-                if io::write_msg(&mut ctrl_send, &Reconfigure { mode: want }.encode())
-                    .await
-                    .is_err()
-                {
-                    break;
-                }
-                let ack = match io::read_msg(&mut ctrl_recv).await {
-                    Ok(b) => match Reconfigured::decode(&b) {
-                        Ok(a) => a,
-                        Err(_) => break, // protocol error — stop renegotiating
-                    },
-                    Err(_) => break, // stream closed
-                };
-                if ack.accepted {
-                    *mode_slot.lock().unwrap() = ack.mode;
-                    tracing::info!(mode = ?ack.mode, "host accepted mode switch");
-                } else {
-                    tracing::warn!(requested = ?want, active = ?ack.mode, "host rejected mode switch");
+            while let Ok(msg) = io::read_msg(&mut ctrl_recv).await {
+                if let Ok(ack) = Reconfigured::decode(&msg) {
+                    if ack.accepted {
+                        *mode_slot.lock().unwrap() = ack.mode;
+                        tracing::info!(mode = ?ack.mode, "host accepted mode switch");
+                    } else {
+                        tracing::warn!(active = ?ack.mode, "host rejected mode switch");
+                    }
+                } else if let Ok(result) = ProbeResult::decode(&msg) {
+                    let mut p = probe.lock().unwrap();
+                    p.host_bytes = result.bytes_sent;
+                    p.host_packets = result.packets_sent;
+                    p.done = true;
+                    tracing::info!(
+                        bytes_sent = result.bytes_sent,
+                        packets_sent = result.packets_sent,
+                        duration_ms = result.duration_ms,
+                        "speed-test probe result"
+                    );
+                } else {
+                    tracing::warn!("unknown control message — ignoring");
                 }
             }
+            // Read failed (stream closed): stop the writer too, dropping `ctrl_rx`.
+            writer.abort();
         });
@@ -607,11 +759,25 @@ async fn worker_main(args: WorkerArgs) {
 
     // Data-plane pump on a blocking thread: poll the session, hand frames to the embedder.
     // try_send drops the newest frame when the embedder lags (freshness over completeness).
+    // Speed-test filler ([`FLAG_PROBE`]) is folded into the probe accumulator instead of the
+    // decoder queue — it isn't video.
let pump_shutdown = shutdown.clone(); + let pump_probe = probe.clone(); let _ = tokio::task::spawn_blocking(move || { while !pump_shutdown.load(Ordering::SeqCst) { match session.poll_frame() { Ok(frame) => { + if frame.flags & FLAG_PROBE as u32 != 0 { + let mut p = pump_probe.lock().unwrap(); + if p.active { + let now = Instant::now(); + p.start.get_or_insert(now); + p.last = Some(now); + p.recv_bytes += frame.data.len() as u64; + p.recv_packets += 1; + } + continue; // not video — never enqueue for the decoder + } let _ = frame_tx.try_send(frame); } Err(PunktfunkError::NoFrame) => { diff --git a/crates/punktfunk-core/src/packet.rs b/crates/punktfunk-core/src/packet.rs index 8d5b5b8..6cab929 100644 --- a/crates/punktfunk-core/src/packet.rs +++ b/crates/punktfunk-core/src/packet.rs @@ -30,6 +30,10 @@ pub const PUNKTFUNK_MAGIC: u8 = 0xC9; pub const FLAG_PIC: u8 = 0x1; pub const FLAG_EOF: u8 = 0x2; pub const FLAG_SOF: u8 = 0x4; +/// Bandwidth-probe filler, not decodable video: a [`crate::quic::ProbeRequest`] speed test makes +/// the host burst access units carrying this flag so the client measures throughput/loss without +/// feeding them to the decoder. Punktfunk/1 only (GameStream never sets it). +pub const FLAG_PROBE: u8 = 0x8; /// Crypto framing overhead [`Session`](crate::session::Session) adds when encrypting: /// an 8-byte sequence prefix plus the GCM tag. diff --git a/crates/punktfunk-core/src/quic.rs b/crates/punktfunk-core/src/quic.rs index f2a992e..476c762 100644 --- a/crates/punktfunk-core/src/quic.rs +++ b/crates/punktfunk-core/src/quic.rs @@ -52,6 +52,11 @@ pub struct Hello { /// [`Welcome::gamepad`]. Appended to the wire form — omitted by older clients (decodes /// to `Auto`). pub gamepad: GamepadPref, + /// The client's desired video encoder bitrate, in kilobits per second. `0` = no preference + /// (the host uses its default). 
The host clamps the request to a supported range and reports + /// the value it actually configured in [`Welcome::bitrate_kbps`]. Appended to the wire form — + /// omitted by older clients (decodes to `0`, i.e. host default). + pub bitrate_kbps: u32, } /// `host → client`: the complete session offer. @@ -77,6 +82,11 @@ pub struct Welcome { /// DualSense feedback (0xCD) can arrive at all. Appended to the wire form — `Auto` when an /// older host omitted it (i.e. "unknown, assume X-Box 360"). pub gamepad: GamepadPref, + /// The encoder bitrate the host actually configured for this session, in kilobits per second + /// (the client's [`Hello::bitrate_kbps`] clamped to the host's supported range, or the host + /// default when the client requested `0`). Appended to the wire form — `0` when an older host + /// omitted it (i.e. "unknown"). + pub bitrate_kbps: u32, } /// `client → host`: data plane is bound, begin streaming. @@ -107,10 +117,41 @@ pub struct Reconfigured { pub mode: Mode, } +/// `client → host`, any time after [`Start`]: run a bandwidth speed test. The host bursts +/// filler access units (flagged [`crate::packet::FLAG_PROBE`]) over the data plane at +/// `target_kbps` of application goodput for `duration_ms`, *pausing video for the duration*, then +/// replies with [`ProbeResult`]. The client measures the received probe bytes + time to estimate +/// the link's sustainable rate (and the loss vs. the host's reported send count) so it can pick a +/// [`Hello::bitrate_kbps`]. The host clamps both fields to sane bounds. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct ProbeRequest { + /// Goodput rate the host should send the probe at, in kilobits per second. + pub target_kbps: u32, + /// How long to burst, in milliseconds. + pub duration_ms: u32, +} + +/// `host → client`: the probe burst is finished. 
Reports what the host actually sent so the
+/// client can compute the delivery ratio = `received_bytes / bytes_sent` (loss is
+/// `1 - ratio`) and throughput = `received_bytes * 8 / elapsed`.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub struct ProbeResult {
+    /// Total access-unit payload bytes the host emitted for the probe.
+    pub bytes_sent: u64,
+    /// Number of probe access units the host emitted.
+    pub packets_sent: u32,
+    /// The burst's actual duration in milliseconds (the host clamps/measures the request).
+    pub duration_ms: u32,
+}
+
 /// Type byte of [`Reconfigure`] (first byte after the magic).
 pub const MSG_RECONFIGURE: u8 = 0x01;
 /// Type byte of [`Reconfigured`].
 pub const MSG_RECONFIGURED: u8 = 0x02;
+/// Type byte of [`ProbeRequest`].
+pub const MSG_PROBE_REQUEST: u8 = 0x20;
+/// Type byte of [`ProbeResult`].
+pub const MSG_PROBE_RESULT: u8 = 0x21;
 
 // ---------------------------------------------------------------------------------------------
 // Pairing ceremony (typed control messages): instead of a session Hello, a client may open
@@ -379,6 +420,7 @@ impl Hello {
         b.extend_from_slice(&self.mode.refresh_hz.to_le_bytes());
         b.push(self.compositor.to_u8()); // appended at offset 20 — older hosts read [0..20] and skip it
         b.push(self.gamepad.to_u8()); // appended at offset 21 — same back-compat discipline
+        b.extend_from_slice(&self.bitrate_kbps.to_le_bytes()); // appended at offset 22..26
         b
     }
 
@@ -403,6 +445,11 @@ impl Hello {
                 .get(21)
                 .map(|&v| GamepadPref::from_u8(v))
                 .unwrap_or_default(),
+            // Optional trailing 4 bytes (LE) — absent on an older client → `0` (host default).
+ bitrate_kbps: b + .get(22..26) + .map(|s| u32::from_le_bytes(s.try_into().unwrap())) + .unwrap_or(0), }) } } @@ -429,13 +476,15 @@ impl Welcome { b.extend_from_slice(&self.frames.to_le_bytes()); b.push(self.compositor.to_u8()); // appended at offset 53 — older clients read [0..53] and skip it b.push(self.gamepad.to_u8()); // appended at offset 54 — same back-compat discipline + b.extend_from_slice(&self.bitrate_kbps.to_le_bytes()); // appended at offset 55..59 b } pub fn decode(b: &[u8]) -> Result { // Layout (LE): magic[0..4] abi[4..8] port[8..10] w[10..14] h[14..18] hz[18..22] // scheme[22] pct[23] max_data[24..26] shard[26..28] encrypt[28] key[29..45] - // salt[45..49] frames[49..53] compositor[53] gamepad[54] (optional trailing bytes). + // salt[45..49] frames[49..53] compositor[53] gamepad[54] bitrate_kbps[55..59] + // (compositor/gamepad/bitrate are optional trailing bytes). if b.len() < 53 || &b[0..4] != MAGIC { return Err(PunktfunkError::InvalidArg("bad Welcome")); } @@ -477,6 +526,11 @@ impl Welcome { .get(54) .map(|&v| GamepadPref::from_u8(v)) .unwrap_or_default(), + // Optional trailing 4 bytes (LE) — absent on an older host → `0` (unknown). 
+ bitrate_kbps: b + .get(55..59) + .map(|s| u32::from_le_bytes(s.try_into().unwrap())) + .unwrap_or(0), }) } @@ -567,6 +621,53 @@ impl Reconfigured { } } +impl ProbeRequest { + pub fn encode(&self) -> Vec { + // magic[0..4] type[4] target_kbps[5..9] duration_ms[9..13] + let mut b = Vec::with_capacity(13); + b.extend_from_slice(CTL_MAGIC); + b.push(MSG_PROBE_REQUEST); + b.extend_from_slice(&self.target_kbps.to_le_bytes()); + b.extend_from_slice(&self.duration_ms.to_le_bytes()); + b + } + + pub fn decode(b: &[u8]) -> Result { + if b.len() != 13 || &b[0..4] != CTL_MAGIC || b[4] != MSG_PROBE_REQUEST { + return Err(PunktfunkError::InvalidArg("bad ProbeRequest")); + } + let u32at = |o: usize| u32::from_le_bytes([b[o], b[o + 1], b[o + 2], b[o + 3]]); + Ok(ProbeRequest { + target_kbps: u32at(5), + duration_ms: u32at(9), + }) + } +} + +impl ProbeResult { + pub fn encode(&self) -> Vec { + // magic[0..4] type[4] bytes_sent[5..13] packets_sent[13..17] duration_ms[17..21] + let mut b = Vec::with_capacity(21); + b.extend_from_slice(CTL_MAGIC); + b.push(MSG_PROBE_RESULT); + b.extend_from_slice(&self.bytes_sent.to_le_bytes()); + b.extend_from_slice(&self.packets_sent.to_le_bytes()); + b.extend_from_slice(&self.duration_ms.to_le_bytes()); + b + } + + pub fn decode(b: &[u8]) -> Result { + if b.len() != 21 || &b[0..4] != CTL_MAGIC || b[4] != MSG_PROBE_RESULT { + return Err(PunktfunkError::InvalidArg("bad ProbeResult")); + } + Ok(ProbeResult { + bytes_sent: u64::from_le_bytes(b[5..13].try_into().unwrap()), + packets_sent: u32::from_le_bytes(b[13..17].try_into().unwrap()), + duration_ms: u32::from_le_bytes(b[17..21].try_into().unwrap()), + }) + } +} + /// Frame a message for the control stream: `u16 LE length || payload`. 
pub fn frame(payload: &[u8]) -> Vec { let mut b = Vec::with_capacity(2 + payload.len()); @@ -1149,6 +1250,7 @@ mod tests { frames: 600, compositor: CompositorPref::Gamescope, gamepad: GamepadPref::DualSense, + bitrate_kbps: 50_000, }; assert_eq!(Welcome::decode(&w.encode()).unwrap(), w); } @@ -1164,6 +1266,7 @@ mod tests { }, compositor: CompositorPref::Kwin, gamepad: GamepadPref::DualSense, + bitrate_kbps: 25_000, }; assert_eq!(Hello::decode(&h.encode()).unwrap(), h); let s = Start { @@ -1227,18 +1330,26 @@ mod tests { }, compositor: CompositorPref::Mutter, gamepad: GamepadPref::DualSense, + bitrate_kbps: 80_000, }; let enc = h.encode(); - assert_eq!(enc.len(), 22); - // Legacy (20-byte) Hello → both Auto, mode intact. + assert_eq!(enc.len(), 26); + // Legacy (20-byte) Hello → both Auto, no bitrate, mode intact. let legacy = Hello::decode(&enc[..20]).unwrap(); assert_eq!(legacy.compositor, CompositorPref::Auto); assert_eq!(legacy.gamepad, GamepadPref::Auto); + assert_eq!(legacy.bitrate_kbps, 0); assert_eq!(legacy.mode, h.mode); // Compositor-era (21-byte) Hello → compositor intact, gamepad Auto. let mid = Hello::decode(&enc[..21]).unwrap(); assert_eq!(mid.compositor, CompositorPref::Mutter); assert_eq!(mid.gamepad, GamepadPref::Auto); + // Gamepad-era (22-byte) Hello → compositor + gamepad intact, bitrate 0 (host default). + let pre_bitrate = Hello::decode(&enc[..22]).unwrap(); + assert_eq!(pre_bitrate.gamepad, GamepadPref::DualSense); + assert_eq!(pre_bitrate.bitrate_kbps, 0); + // Full message → bitrate intact. 
+ assert_eq!(Hello::decode(&enc).unwrap().bitrate_kbps, 80_000); let w = Welcome { abi_version: 2, @@ -1256,17 +1367,24 @@ mod tests { frames: 0, compositor: CompositorPref::Kwin, gamepad: GamepadPref::Xbox360, + bitrate_kbps: 120_000, }; let wenc = w.encode(); - assert_eq!(wenc.len(), 55); + assert_eq!(wenc.len(), 59); let legacy_w = Welcome::decode(&wenc[..53]).unwrap(); assert_eq!(legacy_w.compositor, CompositorPref::Auto); assert_eq!(legacy_w.gamepad, GamepadPref::Auto); + assert_eq!(legacy_w.bitrate_kbps, 0); assert_eq!(legacy_w.frames, 0); assert_eq!(legacy_w.key, w.key); let mid_w = Welcome::decode(&wenc[..54]).unwrap(); assert_eq!(mid_w.compositor, CompositorPref::Kwin); assert_eq!(mid_w.gamepad, GamepadPref::Auto); + // Gamepad-era (55-byte) Welcome → gamepad intact, bitrate 0 (unknown). + let pre_bitrate_w = Welcome::decode(&wenc[..55]).unwrap(); + assert_eq!(pre_bitrate_w.gamepad, GamepadPref::Xbox360); + assert_eq!(pre_bitrate_w.bitrate_kbps, 0); + assert_eq!(Welcome::decode(&wenc).unwrap().bitrate_kbps, 120_000); } #[test] @@ -1297,6 +1415,25 @@ mod tests { .is_err()); } + #[test] + fn probe_messages_roundtrip() { + let req = ProbeRequest { + target_kbps: 250_000, + duration_ms: 2000, + }; + assert_eq!(ProbeRequest::decode(&req.encode()).unwrap(), req); + let res = ProbeResult { + bytes_sent: 62_500_000, + packets_sent: 480, + duration_ms: 2003, + }; + assert_eq!(ProbeResult::decode(&res.encode()).unwrap(), res); + // Type bytes keep the control messages disjoint from each other. + assert!(ProbeRequest::decode(&res.encode()).is_err()); + assert!(Reconfigure::decode(&req.encode()).is_err()); + assert!(ProbeResult::decode(&req.encode()).is_err()); + } + #[test] fn control_messages_disjoint_from_hello() { // A Hello uses MAGIC (PKF1); control messages use CTL_MAGIC (PKFc). 
No Hello — at @@ -1311,6 +1448,7 @@ mod tests { }, compositor: CompositorPref::Auto, gamepad: GamepadPref::Auto, + bitrate_kbps: 0, } .encode(); assert!(PairRequest::decode(&h).is_err(), "abi {abi} parsed as pair"); diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index d7391d3..3c9cd21 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -25,10 +25,10 @@ use anyhow::{anyhow, Context, Result}; use punktfunk_core::config::{CompositorPref, FecConfig, FecScheme, GamepadPref, Role}; use punktfunk_core::input::{InputEvent, InputKind}; -use punktfunk_core::packet::{FLAG_PIC, FLAG_SOF}; +use punktfunk_core::packet::{FLAG_PIC, FLAG_PROBE, FLAG_SOF}; use punktfunk_core::quic::{ - endpoint, io, Hello, PairChallenge, PairProof, PairRequest, PairResult, Reconfigure, - Reconfigured, Start, Welcome, + endpoint, io, Hello, PairChallenge, PairProof, PairRequest, PairResult, ProbeRequest, + ProbeResult, Reconfigure, Reconfigured, Start, Welcome, }; use punktfunk_core::transport::UdpTransport; use punktfunk_core::Session; @@ -228,6 +228,27 @@ pub(crate) async fn serve(opts: M3Options, np: Arc) -> Result<()> /// connects and never finishes the handshake would otherwise wedge the host for everyone. const HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); +/// Encoder bitrate (kbps) the host falls back to when the client expresses no preference +/// (`Hello::bitrate_kbps == 0`) — the long-standing 20 Mbps default. A client that knows its +/// link (e.g. after a speed test) requests an explicit rate instead. +const DEFAULT_BITRATE_KBPS: u32 = 20_000; +/// Bounds a client's requested bitrate to a sane range before configuring NVENC: a 500 kbps floor +/// keeps the stream usable, and a 500 Mbps ceiling guards against an absurd request +/// exhausting the encoder / link (GF(2¹⁶) FEC lifts the old ~1 Gbps wall, but 500 Mbps already +/// covers 5K@240).
Resolved value is echoed in `Welcome::bitrate_kbps`. +const MIN_BITRATE_KBPS: u32 = 500; +const MAX_BITRATE_KBPS: u32 = 500_000; + +/// Resolve a client's [`Hello::bitrate_kbps`] request to the rate the host will configure: +/// `0` → host default; anything else clamped into `[MIN, MAX]`. +fn resolve_bitrate_kbps(requested: u32) -> u32 { + if requested == 0 { + DEFAULT_BITRATE_KBPS + } else { + requested.clamp(MIN_BITRATE_KBPS, MAX_BITRATE_KBPS) + } +} + /// Persistent audio-capturer slot, reused across sessions (same pattern as the GameStream /// path): keeps one warm PipeWire capture stream instead of a connect/negotiate cycle — /// and a daemon-side node churn — per session. (Drop now tears a capturer down cleanly.) @@ -391,6 +412,14 @@ async fn serve_session( // needed; the actual pads are created lazily by the input thread). let gamepad = resolve_gamepad(hello.gamepad); + // Resolve the encoder bitrate (client request clamped to a sane range, or host default). + let bitrate_kbps = resolve_bitrate_kbps(hello.bitrate_kbps); + tracing::info!( + requested_kbps = hello.bitrate_kbps, + resolved_kbps = bitrate_kbps, + "encoder bitrate" + ); + // Reserve a UDP port for the data plane (bind, read it back, rebind in UdpTransport). let probe = std::net::UdpSocket::bind("0.0.0.0:0")?; let udp_port = probe.local_addr()?.port(); @@ -422,6 +451,7 @@ async fn serve_session( .map(|c| c.as_pref()) .unwrap_or(CompositorPref::Auto), gamepad, + bitrate_kbps, }; io::write_msg(&mut send, &welcome.encode()).await?; @@ -444,39 +474,62 @@ async fn serve_session( "handshake complete — streaming" ); - // Control task: the handshake stream stays open for mid-stream renegotiation. A - // validated Reconfigure is acked, then handed to the data-plane thread, which rebuilds - // capture/encoder/virtual output at the new mode (the data plane itself is untouched). + // Control task: the handshake stream stays open for mid-stream renegotiation and speed + // tests. 
A validated Reconfigure is acked, then handed to the data-plane thread, which + // rebuilds capture/encoder/virtual output at the new mode (the data plane itself is + // untouched). A ProbeRequest is handed to the data plane, which bursts FLAG_PROBE filler and + // hands back a ProbeResult that this task writes to the client. The two control directions + // (inbound requests, outbound probe results) are multiplexed with `select!`. NOTE(review): this relies on `io::read_msg` being cancel-safe — if the probe-result branch wins while a frame is only partially read, the dropped read future would desynchronize the control stream; confirm. let (reconfig_tx, reconfig_rx) = std::sync::mpsc::channel::(); + let (probe_tx, probe_rx) = std::sync::mpsc::channel::(); + let (probe_result_tx, mut probe_result_rx) = + tokio::sync::mpsc::unbounded_channel::(); tokio::spawn(async move { let mut active = hello.mode; - while let Ok(msg) = io::read_msg(&mut ctrl_recv).await { - let Ok(req) = Reconfigure::decode(&msg) else { - tracing::warn!("unknown control message — ignoring"); - continue; - }; - let ok = req.mode.refresh_hz > 0 - && crate::encode::validate_dimensions( - crate::encode::Codec::H265, - req.mode.width, - req.mode.height, - ) - .is_ok(); - if ok { - active = req.mode; - tracing::info!(mode = ?req.mode, "mode switch accepted"); - } else { - tracing::warn!(mode = ?req.mode, "mode switch rejected (invalid dimensions)"); - } - let ack = Reconfigured { - accepted: ok, - mode: active, - }; - if io::write_msg(&mut ctrl_send, &ack.encode()).await.is_err() { - break; - } - if ok && reconfig_tx.send(req.mode).is_err() { - break; // data plane gone + loop { + tokio::select!
{ + msg = io::read_msg(&mut ctrl_recv) => { + let Ok(msg) = msg else { break }; // stream closed + if let Ok(req) = Reconfigure::decode(&msg) { + let ok = req.mode.refresh_hz > 0 + && crate::encode::validate_dimensions( + crate::encode::Codec::H265, + req.mode.width, + req.mode.height, + ) + .is_ok(); + if ok { + active = req.mode; + tracing::info!(mode = ?req.mode, "mode switch accepted"); + } else { + tracing::warn!(mode = ?req.mode, "mode switch rejected (invalid dimensions)"); + } + let ack = Reconfigured { accepted: ok, mode: active }; + if io::write_msg(&mut ctrl_send, &ack.encode()).await.is_err() { + break; + } + if ok && reconfig_tx.send(req.mode).is_err() { + break; // data plane gone + } + } else if let Ok(req) = ProbeRequest::decode(&msg) { + tracing::info!( + target_kbps = req.target_kbps, + duration_ms = req.duration_ms, + "speed-test probe requested" + ); + if probe_tx.send(req).is_err() { + break; // data plane gone + } + } else { + tracing::warn!("unknown control message — ignoring"); + } + } + result = probe_result_rx.recv() => { + let Some(result) = result else { break }; // data plane gone + if io::write_msg(&mut ctrl_send, &result.encode()).await.is_err() { + break; + } + } } } }); @@ -593,6 +646,7 @@ async fn serve_session( let source = opts.source; let (seconds, frames) = (opts.seconds, opts.frames); let mode = hello.mode; + let bitrate_kbps = welcome.bitrate_kbps; // resolved encoder bitrate (Hello clamped, or default) let stop_stream = stop.clone(); let result: Result<()> = async { tokio::task::spawn_blocking(move || -> Result<()> { @@ -602,7 +656,13 @@ async fn serve_session( let mut session = Session::new(cfg, Box::new(transport)) .map_err(|e| anyhow!("host session: {e:?}"))?; match source { - M3Source::Synthetic => synthetic_stream(&mut session, frames, &stop_stream), + M3Source::Synthetic => synthetic_stream( + &mut session, + frames, + &stop_stream, + &probe_rx, + &probe_result_tx, + ), M3Source::Virtual => { let compositor = 
compositor .expect("the Virtual source resolves a compositor during the handshake"); @@ -613,6 +673,9 @@ async fn serve_session( &stop_stream, &reconfig_rx, compositor, + bitrate_kbps, + &probe_rx, + &probe_result_tx, ) } } @@ -1107,12 +1170,20 @@ fn audio_thread(_conn: quinn::Connection, _stop: Arc, _audio_cap: Au ); } -fn synthetic_stream(session: &mut Session, frames: u32, stop: &AtomicBool) -> Result<()> { +fn synthetic_stream( + session: &mut Session, + frames: u32, + stop: &AtomicBool, + probe_rx: &std::sync::mpsc::Receiver, + probe_result_tx: &tokio::sync::mpsc::UnboundedSender, +) -> Result<()> { let interval = std::time::Duration::from_millis(1000 / 60); for idx in 0..frames { if stop.load(Ordering::SeqCst) { break; } + // Service speed-test probes between synthetic frames (loopback bandwidth tests). + service_probes(session, stop, probe_rx, probe_result_tx); let data = test_frame(idx, 64 * 1024); session .submit_frame(&data, now_ns(), (FLAG_PIC | FLAG_SOF) as u32) @@ -1223,13 +1294,88 @@ fn resolve_compositor(pref: CompositorPref) -> Result ProbeResult { + let target_kbps = req.target_kbps.min(MAX_PROBE_KBPS); + let duration_ms = req.duration_ms.min(MAX_PROBE_MS); + if target_kbps == 0 || duration_ms == 0 { + return ProbeResult { + bytes_sent: 0, + packets_sent: 0, + duration_ms: 0, + }; + } + // kbps -> bytes/s (x1000/8). + let bytes_per_sec = target_kbps as u64 * 125; + // ~240 AUs/s for smooth pacing, each capped so one submit_frame stays a bounded burst (a large + // AU fragments into many UDP shards via sendmmsg). 
+ let chunk = (bytes_per_sec / 240).clamp(1200, 256 * 1024) as usize; + let filler = vec![0u8; chunk]; + let start = std::time::Instant::now(); + let deadline = start + std::time::Duration::from_millis(duration_ms as u64); + let mut bytes_sent = 0u64; + let mut packets_sent = 0u32; + while std::time::Instant::now() < deadline && !stop.load(Ordering::SeqCst) { + let allowed = (start.elapsed().as_secs_f64() * bytes_per_sec as f64) as u64; + if bytes_sent < allowed { + // A full send buffer drops on WouldBlock (UdpTransport returns Ok) — that loss is part + // of what the probe measures, so count what we offered and keep going. + let _ = session.submit_frame(&filler, now_ns(), FLAG_PROBE as u32); + bytes_sent += chunk as u64; + packets_sent += 1; + } else { + std::thread::sleep(std::time::Duration::from_micros(200)); + } + } + let actual_ms = start.elapsed().as_millis() as u32; + tracing::info!( + target_kbps, + duration_ms = actual_ms, + bytes_sent, + packets_sent, + "speed-test probe burst complete" + ); + ProbeResult { + bytes_sent, + packets_sent, + duration_ms: actual_ms, + } +} + +/// Drain any pending speed-test requests and run each burst, replying with its [`ProbeResult`]. +/// Called once per data-plane loop iteration so a probe runs between frames. +fn service_probes( + session: &mut Session, + stop: &AtomicBool, + probe_rx: &std::sync::mpsc::Receiver, + probe_result_tx: &tokio::sync::mpsc::UnboundedSender, +) { + while let Ok(req) = probe_rx.try_recv() { + let result = run_probe_burst(session, req, stop); + let _ = probe_result_tx.send(result); + } +} + /// Real capture→encode→punktfunk/1: a native virtual output at the client's mode, NVENC AUs /// stamped with the capture wall clock (the client derives per-frame pipeline latency). 
/// /// `reconfig` delivers accepted mid-stream mode switches: the capture/encode pipeline is /// rebuilt at the new mode (capturer drop tears down the PipeWire stream and, via its /// keepalive, the virtual output) while the data-plane `session` continues untouched — -/// the rebuilt encoder opens with an IDR + in-band parameter sets. +/// the rebuilt encoder opens with an IDR + in-band parameter sets. `probe_rx`/`probe_result_tx` +/// carry speed-test bursts (see [`service_probes`]). +#[allow(clippy::too_many_arguments)] fn virtual_stream( session: &mut Session, mode: punktfunk_core::Mode, @@ -1237,20 +1383,26 @@ fn virtual_stream( stop: &AtomicBool, reconfig: &std::sync::mpsc::Receiver, compositor: crate::vdisplay::Compositor, + bitrate_kbps: u32, + probe_rx: &std::sync::mpsc::Receiver, + probe_result_tx: &tokio::sync::mpsc::UnboundedSender, ) -> Result<()> { tracing::info!( compositor = compositor.id(), ?mode, + bitrate_kbps, "punktfunk/1 virtual display" ); let mut vd = crate::vdisplay::open(compositor)?; let (mut capturer, mut enc, mut frame, mut interval) = - build_pipeline_with_retry(&mut vd, mode)?; + build_pipeline_with_retry(&mut vd, mode, bitrate_kbps)?; let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64); let mut next = std::time::Instant::now(); let mut sent: u64 = 0; while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline { + // Service speed-test probes between frames (each burst pauses video for its duration). + service_probes(session, stop, probe_rx, probe_result_tx); // Drain to the NEWEST requested mode (a resize drag queues many) so we rebuild once, // not once per stale intermediate mode. let mut want = None; @@ -1262,7 +1414,7 @@ fn virtual_stream( // Build the new pipeline BEFORE dropping the old one: the host already acked // the switch as accepted, so a rebuild failure must not kill an otherwise // healthy session — keep streaming the current mode and log instead. 
- match build_pipeline(&mut vd, new_mode) { + match build_pipeline(&mut vd, new_mode, bitrate_kbps) { Ok(next_pipe) => { (capturer, enc, frame, interval) = next_pipe; next = std::time::Instant::now(); @@ -1321,11 +1473,12 @@ type Pipeline = ( fn build_pipeline_with_retry( vd: &mut Box, mode: punktfunk_core::Mode, + bitrate_kbps: u32, ) -> Result { const MAX_ATTEMPTS: u32 = 4; let mut backoff = std::time::Duration::from_millis(500); for attempt in 1..=MAX_ATTEMPTS { - match build_pipeline(vd, mode) { + match build_pipeline(vd, mode, bitrate_kbps) { Ok(pipe) => { if attempt > 1 { tracing::info!(attempt, "pipeline up after retry"); @@ -1382,6 +1535,7 @@ fn is_permanent_build_error(chain: &str) -> bool { fn build_pipeline( vd: &mut Box, mode: punktfunk_core::Mode, + bitrate_kbps: u32, ) -> Result { let vout = vd.create(mode).context("create virtual output")?; // The backend reports the refresh it actually achieved in `preferred_mode.2` (KWin may cap a @@ -1410,7 +1564,7 @@ fn build_pipeline( frame.width, frame.height, effective_hz, - 20_000_000, + bitrate_kbps as u64 * 1000, frame.is_cuda(), ) .context("open NVENC")?; @@ -1762,6 +1916,7 @@ mod tests { mode, CompositorPref::Auto, GamepadPref::Auto, + 0, None, None, timeout @@ -1786,6 +1941,7 @@ mod tests { mode, CompositorPref::Auto, GamepadPref::Auto, + 0, Some(host_fp), Some((cert.clone(), key.clone())), timeout, diff --git a/include/punktfunk_core.h b/include/punktfunk_core.h index bb8acbb..0841205 100644 --- a/include/punktfunk_core.h +++ b/include/punktfunk_core.h @@ -137,6 +137,11 @@ #define FLAG_SOF 4 +// Bandwidth-probe filler, not decodable video: a [`crate::quic::ProbeRequest`] speed test makes +// the host burst access units carrying this flag so the client measures throughput/loss without +// feeding them to the decoder. Punktfunk/1 only (GameStream never sets it). +#define FLAG_PROBE 8 + // Largest UDP datagram the core will send or accept. 
`Config::validate` bounds // `shard_payload` so `HEADER_LEN + shard_payload + CRYPTO_OVERHEAD ≤ MAX_DATAGRAM_BYTES`. #define MAX_DATAGRAM_BYTES 2048 @@ -151,6 +156,16 @@ #define MSG_RECONFIGURED 2 #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Type byte of [`ProbeRequest`]. +#define MSG_PROBE_REQUEST 32 +#endif + +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Type byte of [`ProbeResult`]. +#define MSG_PROBE_RESULT 33 +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // Type byte of [`PairRequest`]. #define MSG_PAIR_REQUEST 16 @@ -408,6 +423,26 @@ typedef struct { } PunktfunkRichInput; #endif +// A speed-test measurement, filled by [`punktfunk_connection_probe_result`]. `done` is 0 until +// the host's end-of-burst report lands, then 1 (the numbers are final). `throughput_kbps` is the +// measured goodput to drive a bitrate choice from; `loss_pct` is the delivery loss at that rate. +typedef struct { + // 1 once the host's end-of-burst report arrived (measurement final); else 0 (partial). + uint8_t done; + // Probe payload bytes / packets the client received. + uint64_t recv_bytes; + uint32_t recv_packets; + // Probe payload bytes / packets the host reported sending. + uint64_t host_bytes; + uint32_t host_packets; + // Client-measured receive window (first→last probe AU), milliseconds. + uint32_t elapsed_ms; + // Measured goodput = `recv_bytes * 8 / elapsed_ms` (kilobits/second). + uint32_t throughput_kbps; + // Delivery loss `(host_bytes - recv_bytes) / host_bytes` as a percentage (0 if unknown). + float loss_pct; +} PunktfunkProbeResult; + #ifdef __cplusplus extern "C" { #endif // __cplusplus @@ -562,6 +597,30 @@ PunktfunkConnection *punktfunk_connect_ex2(const char *host, uint32_t timeout_ms); #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Like [`punktfunk_connect_ex2`], but additionally requests the video encoder `bitrate_kbps` +// (kilobits per second). `0` lets the host pick its default; any other value is clamped to the +// host's supported range. 
After a speed test ([`punktfunk_connection_speed_test`]) a client can +// reconnect (or pick at connect time) with the measured rate. The value the host actually +// configured is readable via [`punktfunk_connection_bitrate`]. +// +// # Safety +// Same as [`punktfunk_connect`]. +PunktfunkConnection *punktfunk_connect_ex3(const char *host, + uint16_t port, + uint32_t width, + uint32_t height, + uint32_t refresh_hz, + uint32_t compositor, + uint32_t gamepad, + uint32_t bitrate_kbps, + const uint8_t *pin_sha256, + uint8_t *observed_sha256_out, + const char *client_cert_pem, + const char *client_key_pem, + uint32_t timeout_ms); +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // Generate a persistent client identity: a self-signed certificate + private key, both // PEM, NUL-terminated, written into the caller's buffers. Generate ONCE, store both @@ -715,6 +774,16 @@ PunktfunkStatus punktfunk_connection_mode(const PunktfunkConnection *c, PunktfunkStatus punktfunk_connection_gamepad(const PunktfunkConnection *c, uint32_t *gamepad); #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// The video encoder bitrate (kilobits per second) the host actually configured for this session +// — the [`punktfunk_connect_ex3`] request clamped to the host's range, or its default when `0` +// was requested. `0` = an older host that didn't report it. Safe any time after connect. +// +// # Safety +// `c` is a valid connection handle; `bitrate_kbps` is writable (NULL is skipped). +PunktfunkStatus punktfunk_connection_bitrate(const PunktfunkConnection *c, uint32_t *bitrate_kbps); +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // Ask the host to switch the live session to `width`x`height`@`refresh_hz` without // reconnecting (window resized, refresh changed). 
Non-blocking enqueue: on acceptance the @@ -731,6 +800,30 @@ PunktfunkStatus punktfunk_connection_request_mode(const PunktfunkConnection *c, uint32_t refresh_hz); #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Start a bandwidth speed test: ask the host to burst filler over the data plane at +// `target_kbps` of goodput for `duration_ms` (each clamped host-side to ≤ 1 Gbps / ≤ 5 s), +// *briefly pausing video*. Non-blocking — poll [`punktfunk_connection_probe_result`] until its +// `done` field is 1. Starting a probe resets any prior measurement. +// +// # Safety +// `c` is a valid connection handle. +PunktfunkStatus punktfunk_connection_speed_test(const PunktfunkConnection *c, + uint32_t target_kbps, + uint32_t duration_ms); +#endif + +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Read the current speed-test measurement into `*out` (partial until `out->done == 1`). Safe to +// poll repeatedly after [`punktfunk_connection_speed_test`]; before any probe it reports zeros. +// +// # Safety +// `c` is a valid connection handle; `out` is writable for one `PunktfunkProbeResult` (NULL is an +// error). +PunktfunkStatus punktfunk_connection_probe_result(const PunktfunkConnection *c, + PunktfunkProbeResult *out); +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // Close the connection and free the handle (joins the internal threads). NULL is a no-op. //