From 94552331efacee907f015fb8870fa4392a60e38e Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Fri, 12 Jun 2026 12:42:09 +0000 Subject: [PATCH] feat(host): concurrent punktfunk/1 sessions (bounded by --max-concurrent) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The accept loop no longer awaits each session inline — it spawns each onto a JoinSet, bounded by a semaphore (--max-concurrent, default 4: a NVENC session bound; overflow clients wait in QUIC's accept backlog until a slot frees). The QUIC handshake stays in the accept loop so a failed handshake (e.g. a pin mismatch where the client aborts) doesn't consume a session slot or block accepting the next client; the slow part (control handshake, pairing, the capture/encode pipeline) runs in the spawned task. Each session already had its own virtual output + NVENC encoder; the host-lifetime input/audio/mic services stay shared — the natural "multiple devices viewing/controlling the same desktop" semantic on kwin/mutter/wlroots. gamescope's independent-desktops (per-session input/audio) isolation is a follow-up. New M3Options.max_concurrent + the `--max-concurrent` CLI flag. Validated live (GNOME box): two clients connected at once -> two independent Mutter virtual outputs (720p60 + 1080p60) streaming simultaneously (39 MB + 48 MB). All 61 host tests green (the c_abi/pairing tests exercise the new loop + the failed-handshake-doesn't-count semantics). Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/punktfunk-host/src/m3.rs | 103 ++++++++++++++++++++++-------- crates/punktfunk-host/src/main.rs | 5 ++ docs-site/content/docs/status.md | 12 +++- 3 files changed, 94 insertions(+), 26 deletions(-) diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index 46911eb..45dc987 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -53,6 +53,12 @@ pub struct M3Options { pub frames: u32, /// Exit after this many sessions (0 = serve forever). pub max_sessions: u32, + /// Maximum sessions streaming **at once** (a NVENC/GPU bound); further clients wait in the + /// accept queue until a slot frees. Concurrent sessions each get their own virtual output + + /// encoder but share the host-lifetime input/audio/mic services — i.e. multiple devices viewing + /// (and controlling) the *same* desktop on the shared-desktop backends (kwin/mutter/wlroots). + /// `0` = unlimited (bounded only by the GPU). Default a conservative few. + pub max_concurrent: usize, /// Only serve clients whose certificate fingerprint is in the paired set. Implies /// `allow_pairing` (a host that requires pairing must accept ceremonies to admit /// anyone). @@ -127,6 +133,11 @@ pub(crate) struct NativeServe { /// Options for the native host when the unified `serve --native` runs it: real virtual capture, /// persistent (no session/duration cut), pairing armed on demand via the management API (the /// shared [`NativePairing`] starts disarmed). +/// Default cap on simultaneously-streaming sessions (each holds an NVENC session; high-res +/// split-encode holds two). Conservative — consumer NVENC historically capped concurrent sessions; +/// overflow clients wait in the accept queue. Override with `--max-concurrent`. +pub(crate) const DEFAULT_MAX_CONCURRENT: usize = 4; + pub(crate) fn native_serve_opts(cfg: &NativeServe) -> M3Options { M3Options { port: cfg.port, @@ -134,6 +145,7 @@ pub(crate) fn native_serve_opts(cfg: &NativeServe) -> M3Options { seconds: 7 * 24 * 3600, // per-session cap; large enough not to cut a live stream frames: 0, max_sessions: 0, + max_concurrent: DEFAULT_MAX_CONCURRENT, require_pairing: cfg.require_pairing, allow_pairing: false, pairing_pin: None, @@ -202,45 +214,84 @@ pub(crate) async fn serve(opts: M3Options, np: Arc) -> Result<()> "PAIRING ARMED — enter this PIN on the client to pair: {pin}" ); } - let last_pairing = std::sync::Mutex::new(None::); + let last_pairing = Arc::new(std::sync::Mutex::new(None::)); + let opts = Arc::new(opts); + + // Concurrency: serve up to `max_concurrent` sessions at once. Each gets its own virtual output + + // NVENC encoder; they share the host-lifetime input/audio/mic services — i.e. multiple devices + // viewing (and controlling) the SAME desktop on the shared-desktop backends. A permit is taken + // before accepting, so overflow clients wait in QUIC's accept backlog until a slot frees; + // `max_concurrent == 0` means unlimited (GPU-bounded). The heavy handshake + pipeline run inside + // the spawned task, so a slow client never blocks the accept loop. + let permits = match opts.max_concurrent { + 0 => tokio::sync::Semaphore::MAX_PERMITS, + n => n, + }; + let sem = Arc::new(tokio::sync::Semaphore::new(permits)); + let mut sessions = tokio::task::JoinSet::new(); + let max_sessions = opts.max_sessions; + let mut accepted = 0u32; + tracing::info!( + max_concurrent = opts.max_concurrent, + "accepting sessions (concurrent)" + ); - let mut served = 0u32; loop { - let incoming = ep - .accept() + let permit = sem + .clone() + .acquire_owned() .await - .ok_or_else(|| anyhow!("endpoint closed"))?; + .expect("session semaphore is never closed"); + let incoming = match ep.accept().await { + Some(i) => i, + None => break, // endpoint closed + }; + // Complete the QUIC handshake in the accept loop (it's ~1 RTT): a failed handshake (e.g. a + // pin mismatch — the client aborts) must NOT consume a session slot, mirroring the old + // serial loop. The slow part (control handshake, pairing, the capture/encode pipeline) runs + // in the spawned task, so a slow client still never blocks accepting the next one. let conn = match incoming.await { Ok(c) => c, Err(e) => { tracing::warn!(error = %e, "QUIC accept failed"); - continue; + continue; // `permit` drops here → slot freed; not counted toward max_sessions } }; let peer = conn.remote_address(); tracing::info!(%peer, "punktfunk/1 client connected"); - if let Err(e) = serve_session( - conn, - &opts, - &audio_cap, - injector.sender(), - mic_service.sender(), - &fingerprint, - &np, - &last_pairing, - ) - .await - { - tracing::warn!(%peer, error = %format!("{e:#}"), "session ended with error"); - } else { - tracing::info!(%peer, "session complete"); - } - served += 1; - if opts.max_sessions != 0 && served >= opts.max_sessions { + let opts = opts.clone(); + let audio_cap = audio_cap.clone(); + let np = np.clone(); + let last_pairing = last_pairing.clone(); + let inj_tx = injector.sender(); + let mic_tx = mic_service.sender(); + sessions.spawn(async move { + let _permit = permit; // held for the session's lifetime; frees a slot on completion + match serve_session( + conn, + &opts, + &audio_cap, + inj_tx, + mic_tx, + &fingerprint, + &np, + &last_pairing, + ) + .await + { + Ok(()) => tracing::info!(%peer, "session complete"), + Err(e) => { + tracing::warn!(%peer, error = %format!("{e:#}"), "session ended with error") + } + } + }); + accepted += 1; + if max_sessions != 0 && accepted >= max_sessions { break; } - tracing::info!("ready for the next client"); } + // Stop accepting; let the in-flight sessions finish (max_sessions reached or endpoint closed). + while sessions.join_next().await.is_some() {} ep.wait_idle().await; Ok(()) } @@ -2014,6 +2065,7 @@ mod tests { seconds: 0, frames: 25, max_sessions: 3, + max_concurrent: 1, require_pairing: false, allow_pairing: false, pairing_pin: None, @@ -2169,6 +2221,7 @@ mod tests { seconds: 0, frames: 25, max_sessions: 4, + max_concurrent: 1, require_pairing: true, allow_pairing: false, pairing_pin: Some("4321".into()), diff --git a/crates/punktfunk-host/src/main.rs b/crates/punktfunk-host/src/main.rs index 77f33e2..2a6ae05 100644 --- a/crates/punktfunk-host/src/main.rs +++ b/crates/punktfunk-host/src/main.rs @@ -158,6 +158,9 @@ fn real_main() -> Result<()> { max_sessions: get("--max-sessions") .and_then(|s| s.parse().ok()) .unwrap_or(0), + max_concurrent: get("--max-concurrent") + .and_then(|s| s.parse().ok()) + .unwrap_or(m3::DEFAULT_MAX_CONCURRENT), require_pairing: args.iter().any(|a| a == "--require-pairing"), allow_pairing: args.iter().any(|a| a == "--allow-pairing"), pairing_pin: None, @@ -417,6 +420,8 @@ M3-HOST OPTIONS: --seconds per-session stream duration, virtual source (default: 30) --frames per-session frame count, synthetic source (default: 300) --max-sessions exit after N sessions; 0 = serve forever (default: 0) + --max-concurrent stream at most N sessions at once (NVENC bound); overflow waits + in the accept queue; 0 = unlimited (default: 4) --allow-pairing accept PIN pairing ceremonies (arm pairing mode) --require-pairing only serve PIN-paired clients (implies --allow-pairing; the host logs a 4-digit PIN when a client starts pairing) diff --git a/docs-site/content/docs/status.md b/docs-site/content/docs/status.md index 7021aa4..4b8be3f 100644 --- a/docs-site/content/docs/status.md +++ b/docs-site/content/docs/status.md @@ -29,6 +29,14 @@ All three appliances advertise over mDNS (`_punktfunk._udp`) and require PIN pai ## Progress log ### 2026-06-12 +- **Concurrent sessions** — the host no longer serves one client at a time. The accept loop spawns + each session (`JoinSet`), bounded by `--max-concurrent` (default 4, a NVENC bound; overflow waits + in the accept queue). Each session keeps its own virtual output + encoder; they share the + host-lifetime input/audio/mic services — i.e. **multiple devices viewing/controlling the same + desktop** on kwin/mutter/wlroots. Validated live on the GNOME box: two clients connected at once + → **two independent Mutter virtual outputs (1280×720 + 1920×1080) streaming simultaneously** + (39 MB + 48 MB). gamescope's *independent-desktops* (multi-user) isolation — per-session + input/audio — is a follow-up. - **Apple client latency HUD** — `PunktfunkConnection.clockOffsetNs` (from the C-ABI getter) + `LatencyMeter` surface a skew-corrected **capture→client-receipt** p50/p95 in the macOS HUD: the first cross-machine latency the real Apple client reports. (Stage-1 `AVSampleBufferDisplayLayer` @@ -59,5 +67,7 @@ All three appliances advertise over mDNS (`_punktfunk._udp`) and require PIN pai See the [Roadmap](/docs/roadmap) for the ordered list. Near-term: - **True glass-to-glass**: Apple client present-stamp (decode→present) + host render→capture term. - **Apple stage-2 presenter** (`VTDecompressionSession` → `CAMetalLayer`). -- **Mandatory PIN pairing + delegated pairing approval**; concurrent sessions. +- **Mandatory PIN pairing + delegated pairing approval** (an already-paired device approves a new one). +- **gamescope multi-user isolation** — per-session input/audio so concurrent sessions are independent + desktops (the shared-desktop multi-view case landed). - **bazzite** kept up to date (currently offline; one rebuild behind).