feat(host): concurrent punktfunk/1 sessions (bounded by --max-concurrent)
ci / web (push) Failing after 32s
ci / docs-site (push) Failing after 34s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 4s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 17s
ci / rust (push) Successful in 5m25s
apple / swift (push) Successful in 1m23s
The accept loop no longer awaits each session inline. It spawns each session onto a JoinSet, bounded by a semaphore (--max-concurrent, default 4: an NVENC session bound; overflow clients wait in QUIC's accept backlog until a slot frees).

The QUIC handshake stays in the accept loop so that a failed handshake (e.g. a pin mismatch where the client aborts) doesn't consume a session slot or block accepting the next client. The slow part (control handshake, pairing, the capture/encode pipeline) runs in the spawned task.

Each session already had its own virtual output + NVENC encoder. The host-lifetime input/audio/mic services stay shared, which is the natural "multiple devices viewing/controlling the same desktop" semantic on kwin/mutter/wlroots. gamescope's independent-desktops (per-session input/audio) isolation is a follow-up.

New M3Options.max_concurrent + the `--max-concurrent` CLI flag.

Validated live (GNOME box): two clients connected at once -> two independent Mutter virtual outputs (720p60 + 1080p60) streaming simultaneously (39 MB + 48 MB). All 61 host tests green (the c_abi/pairing tests exercise the new loop + the failed-handshake-doesn't-count semantics).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@@ -53,6 +53,12 @@ pub struct M3Options {
    pub frames: u32,
    /// Exit after this many sessions (0 = serve forever).
    pub max_sessions: u32,
    /// Maximum sessions streaming **at once** (a NVENC/GPU bound); further clients wait in the
    /// accept queue until a slot frees. Concurrent sessions each get their own virtual output +
    /// encoder but share the host-lifetime input/audio/mic services — i.e. multiple devices viewing
    /// (and controlling) the *same* desktop on the shared-desktop backends (kwin/mutter/wlroots).
    /// `0` = unlimited (bounded only by the GPU). Default a conservative few.
    pub max_concurrent: usize,
    /// Only serve clients whose certificate fingerprint is in the paired set. Implies
    /// `allow_pairing` (a host that requires pairing must accept ceremonies to admit
    /// anyone).
@@ -127,6 +133,11 @@ pub(crate) struct NativeServe {
/// Options for the native host when the unified `serve --native` runs it: real virtual capture,
/// persistent (no session/duration cut), pairing armed on demand via the management API (the
/// shared [`NativePairing`] starts disarmed).
/// Default cap on simultaneously-streaming sessions (each holds an NVENC session; high-res
/// split-encode holds two). Conservative — consumer NVENC historically capped concurrent sessions;
/// overflow clients wait in the accept queue. Override with `--max-concurrent`.
pub(crate) const DEFAULT_MAX_CONCURRENT: usize = 4;

pub(crate) fn native_serve_opts(cfg: &NativeServe) -> M3Options {
    M3Options {
        port: cfg.port,
@@ -134,6 +145,7 @@ pub(crate) fn native_serve_opts(cfg: &NativeServe) -> M3Options {
        seconds: 7 * 24 * 3600, // per-session cap; large enough not to cut a live stream
        frames: 0,
        max_sessions: 0,
        max_concurrent: DEFAULT_MAX_CONCURRENT,
        require_pairing: cfg.require_pairing,
        allow_pairing: false,
        pairing_pin: None,
@@ -202,45 +214,84 @@ pub(crate) async fn serve(opts: M3Options, np: Arc<NativePairing>) -> Result<()>
            "PAIRING ARMED — enter this PIN on the client to pair: {pin}"
        );
    }
    let last_pairing = std::sync::Mutex::new(None::<std::time::Instant>);
    let last_pairing = Arc::new(std::sync::Mutex::new(None::<std::time::Instant>));
    let opts = Arc::new(opts);

    // Concurrency: serve up to `max_concurrent` sessions at once. Each gets its own virtual output +
    // NVENC encoder; they share the host-lifetime input/audio/mic services — i.e. multiple devices
    // viewing (and controlling) the SAME desktop on the shared-desktop backends. A permit is taken
    // before accepting, so overflow clients wait in QUIC's accept backlog until a slot frees;
    // `max_concurrent == 0` means unlimited (GPU-bounded). The heavy handshake + pipeline run inside
    // the spawned task, so a slow client never blocks the accept loop.
    let permits = match opts.max_concurrent {
        0 => tokio::sync::Semaphore::MAX_PERMITS,
        n => n,
    };
    let sem = Arc::new(tokio::sync::Semaphore::new(permits));
    let mut sessions = tokio::task::JoinSet::new();
    let max_sessions = opts.max_sessions;
    let mut accepted = 0u32;
    tracing::info!(
        max_concurrent = opts.max_concurrent,
        "accepting sessions (concurrent)"
    );

    let mut served = 0u32;
    loop {
        let incoming = ep
            .accept()
        let permit = sem
            .clone()
            .acquire_owned()
            .await
            .ok_or_else(|| anyhow!("endpoint closed"))?;
            .expect("session semaphore is never closed");
        let incoming = match ep.accept().await {
            Some(i) => i,
            None => break, // endpoint closed
        };
        // Complete the QUIC handshake in the accept loop (it's ~1 RTT): a failed handshake (e.g. a
        // pin mismatch — the client aborts) must NOT consume a session slot, mirroring the old
        // serial loop. The slow part (control handshake, pairing, the capture/encode pipeline) runs
        // in the spawned task, so a slow client still never blocks accepting the next one.
        let conn = match incoming.await {
            Ok(c) => c,
            Err(e) => {
                tracing::warn!(error = %e, "QUIC accept failed");
                continue;
                continue; // `permit` drops here → slot freed; not counted toward max_sessions
            }
        };
        let peer = conn.remote_address();
        tracing::info!(%peer, "punktfunk/1 client connected");
        if let Err(e) = serve_session(
            conn,
            &opts,
            &audio_cap,
            injector.sender(),
            mic_service.sender(),
            &fingerprint,
            &np,
            &last_pairing,
        )
        .await
        {
            tracing::warn!(%peer, error = %format!("{e:#}"), "session ended with error");
        } else {
            tracing::info!(%peer, "session complete");
        }
        served += 1;
        if opts.max_sessions != 0 && served >= opts.max_sessions {
        let opts = opts.clone();
        let audio_cap = audio_cap.clone();
        let np = np.clone();
        let last_pairing = last_pairing.clone();
        let inj_tx = injector.sender();
        let mic_tx = mic_service.sender();
        sessions.spawn(async move {
            let _permit = permit; // held for the session's lifetime; frees a slot on completion
            match serve_session(
                conn,
                &opts,
                &audio_cap,
                inj_tx,
                mic_tx,
                &fingerprint,
                &np,
                &last_pairing,
            )
            .await
            {
                Ok(()) => tracing::info!(%peer, "session complete"),
                Err(e) => {
                    tracing::warn!(%peer, error = %format!("{e:#}"), "session ended with error")
                }
            }
        });
        accepted += 1;
        if max_sessions != 0 && accepted >= max_sessions {
            break;
        }
        tracing::info!("ready for the next client");
    }
    // Stop accepting; let the in-flight sessions finish (max_sessions reached or endpoint closed).
    while sessions.join_next().await.is_some() {}
    ep.wait_idle().await;
    Ok(())
}
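The permit handling in the hunk above can be illustrated without tokio. The following is a minimal std-only sketch, not the host's implementation: it assumes OS threads and a hand-rolled counting semaphore in place of `tokio::sync::Semaphore` and `JoinSet`, and the names `Slots`, `Permit`, `acquire`, and `run_accept_loop` are hypothetical. It shows the three properties the comments describe: a permit is taken before accepting, a failed handshake drops the permit without counting as a session, and a successful session holds the permit until it ends.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore (std has none); stands in for tokio's `Semaphore`.
struct Slots {
    free: Mutex<usize>,
    freed: Condvar,
}

/// RAII permit: gives its slot back when dropped.
struct Permit(Arc<Slots>);

impl Drop for Permit {
    fn drop(&mut self) {
        *self.0.free.lock().unwrap() += 1;
        self.0.freed.notify_one();
    }
}

/// Blocks until a slot is free, then claims it.
fn acquire(slots: &Arc<Slots>) -> Permit {
    let mut free = slots.free.lock().unwrap();
    while *free == 0 {
        free = slots.freed.wait(free).unwrap();
    }
    *free -= 1;
    Permit(Arc::clone(slots))
}

/// Toy accept loop over `handshakes` (`true` = handshake succeeded).
/// Returns (sessions started, peak number of sessions running at once).
fn run_accept_loop(handshakes: &[bool], max_concurrent: usize) -> (u32, usize) {
    let slots = Arc::new(Slots {
        free: Mutex::new(max_concurrent),
        freed: Condvar::new(),
    });
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let mut workers = Vec::new();
    let mut accepted = 0u32;

    for &handshake_ok in handshakes {
        let permit = acquire(&slots); // waits while every slot is busy
        if !handshake_ok {
            continue; // `permit` drops here: slot freed, session not counted
        }
        let (running, peak) = (Arc::clone(&running), Arc::clone(&peak));
        workers.push(thread::spawn(move || {
            let _permit = permit; // held until the session ends
            let now = running.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(20)); // stand-in for streaming
            running.fetch_sub(1, Ordering::SeqCst);
        }));
        accepted += 1;
    }
    for w in workers {
        w.join().unwrap(); // stop accepting; let in-flight sessions finish
    }
    (accepted, peak.load(Ordering::SeqCst))
}

fn main() {
    let (accepted, peak) = run_accept_loop(&[true, false, true, true, true, true], 2);
    println!("accepted={accepted} peak={peak}");
}
```

Because the worker decrements `running` before its permit is dropped, the observed peak can never exceed `max_concurrent`, which is the bound the semaphore is meant to enforce.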
@@ -2014,6 +2065,7 @@ mod tests {
            seconds: 0,
            frames: 25,
            max_sessions: 3,
            max_concurrent: 1,
            require_pairing: false,
            allow_pairing: false,
            pairing_pin: None,
@@ -2169,6 +2221,7 @@ mod tests {
            seconds: 0,
            frames: 25,
            max_sessions: 4,
            max_concurrent: 1,
            require_pairing: true,
            allow_pairing: false,
            pairing_pin: Some("4321".into()),
@@ -158,6 +158,9 @@ fn real_main() -> Result<()> {
        max_sessions: get("--max-sessions")
            .and_then(|s| s.parse().ok())
            .unwrap_or(0),
        max_concurrent: get("--max-concurrent")
            .and_then(|s| s.parse().ok())
            .unwrap_or(m3::DEFAULT_MAX_CONCURRENT),
        require_pairing: args.iter().any(|a| a == "--require-pairing"),
        allow_pairing: args.iter().any(|a| a == "--allow-pairing"),
        pairing_pin: None,
@@ -417,6 +420,8 @@ M3-HOST OPTIONS:
    --seconds <N>          per-session stream duration, virtual source (default: 30)
    --frames <N>           per-session frame count, synthetic source (default: 300)
    --max-sessions <N>     exit after N sessions; 0 = serve forever (default: 0)
    --max-concurrent <N>   stream at most N sessions at once (NVENC bound); overflow waits
                           in the accept queue; 0 = unlimited (default: 4)
    --allow-pairing        accept PIN pairing ceremonies (arm pairing mode)
    --require-pairing      only serve PIN-paired clients (implies --allow-pairing;
                           the host logs a 4-digit PIN when a client starts pairing)
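The flag semantics above (`0` = unlimited, default 4) combine with the `and_then(parse).unwrap_or(default)` chain in a way that is easy to miss: a value that does not parse as an unsigned integer appears to fall back to the default silently. The sketch below illustrates this under stated assumptions. The `get` closure is not shown in the diff, so `max_concurrent_from_args` is a hypothetical stand-in that assumes the value is the token following the flag; `session_cap` is likewise hypothetical and models "unlimited" as `None`, whereas the host maps `0` to `Semaphore::MAX_PERMITS`.

```rust
/// Default taken from the diff (`DEFAULT_MAX_CONCURRENT`).
const DEFAULT_MAX_CONCURRENT: usize = 4;

/// Hypothetical helper: parses the token after `--max-concurrent` as `usize`,
/// falling back to the default when the flag is absent or its value does not parse.
fn max_concurrent_from_args(args: &[&str]) -> usize {
    args.iter()
        .position(|a| *a == "--max-concurrent")
        .and_then(|i| args.get(i + 1))
        .and_then(|s| s.parse().ok())
        .unwrap_or(DEFAULT_MAX_CONCURRENT)
}

/// `0` means "no cap"; modelled here as `None` for illustration only.
fn session_cap(max_concurrent: usize) -> Option<usize> {
    match max_concurrent {
        0 => None,
        n => Some(n),
    }
}

fn main() {
    for args in [
        vec![],
        vec!["--max-concurrent", "2"],
        vec!["--max-concurrent", "0"],
        vec!["--max-concurrent", "lots"], // unparsable: falls back to the default
    ] {
        let n = max_concurrent_from_args(&args);
        println!("{args:?} -> max_concurrent={n}, cap={:?}", session_cap(n));
    }
}
```

If a silent fallback is not wanted, one option would be to report the parse error instead of calling `.ok()`; whether that fits the host's CLI conventions is a design decision outside this commit.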
@@ -29,6 +29,14 @@ All three appliances advertise over mDNS (`_punktfunk._udp`) and require PIN pai
## Progress log

### 2026-06-12
- **Concurrent sessions** — the host no longer serves one client at a time. The accept loop spawns
  each session (`JoinSet`), bounded by `--max-concurrent` (default 4, a NVENC bound; overflow waits
  in the accept queue). Each session keeps its own virtual output + encoder; they share the
  host-lifetime input/audio/mic services — i.e. **multiple devices viewing/controlling the same
  desktop** on kwin/mutter/wlroots. Validated live on the GNOME box: two clients connected at once
  → **two independent Mutter virtual outputs (1280×720 + 1920×1080) streaming simultaneously**
  (39 MB + 48 MB). gamescope's *independent-desktops* (multi-user) isolation — per-session
  input/audio — is a follow-up.
- **Apple client latency HUD** — `PunktfunkConnection.clockOffsetNs` (from the C-ABI getter) +
  `LatencyMeter` surface a skew-corrected **capture→client-receipt** p50/p95 in the macOS HUD: the
  first cross-machine latency the real Apple client reports. (Stage-1 `AVSampleBufferDisplayLayer`
@@ -59,5 +67,7 @@ All three appliances advertise over mDNS (`_punktfunk._udp`) and require PIN pai
See the [Roadmap](/docs/roadmap) for the ordered list. Near-term:
- **True glass-to-glass**: Apple client present-stamp (decode→present) + host render→capture term.
- **Apple stage-2 presenter** (`VTDecompressionSession` → `CAMetalLayer`).
- **Mandatory PIN pairing + delegated pairing approval**; concurrent sessions.
- **Mandatory PIN pairing + delegated pairing approval** (an already-paired device approves a new one).
- **gamescope multi-user isolation** — per-session input/audio so concurrent sessions are independent
  desktops (the shared-desktop multi-view case landed).
- **bazzite** kept up to date (currently offline; one rebuild behind).