From b53710da1ac514cfceea2eb44acd4f69c19a0375 Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Sun, 5 Jul 2026 16:41:06 +0000 Subject: [PATCH] =?UTF-8?q?feat(vdisplay):=20harden=20keep-alive=20reconne?= =?UTF-8?q?ct=20=E2=80=94=20same-client=20preempt,=20quit-skips-linger,=20?= =?UTF-8?q?configurable=20idle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On-glass testing (Test 2, KWin .116) surfaced that a reconnect within the QUIC idle-timeout window (~8s) lands on a fresh SECOND display instead of reusing the kept one: the old session was still Active (not yet Lingering), so the registry's keep-alive reuse (which only matches Lingering) skipped it and the old session kept streaming to nobody. Three fixes: #3 Same-client reconnect preempt (the real fix): admission::preempt_same_identity() lists a reconnecting client's OWN still-live session(s) (same cert fingerprint); serve_session signals their stop + waits the release grace BEFORE acquiring, so the zombie tears down → its display lingers → the reconnect REUSES it instead of making a second. Implements the "preempts downstream" the admission docs already promised. Independent of the mode_conflict policy; the pure core (same_identity_stops) is unit-tested. #2 Deliberate quit skips linger: a client that deliberately disconnects closes the QUIC connection with QUIT_CLOSE_CODE (0x51, shared in core::quic); the host reads the ApplicationClosed reason and tears the display down immediately (registry release() gained force_immediate → Linger::Immediate; multi-session-safe via the pure lifecycle machine), while a bare disconnect still lingers for reconnect. Threaded via a session quit flag → the DisplayLease. NativeClient::disconnect_quit() + punktfunk-probe --quit drive it; GameStream (Quit App / h_cancel) is a documented follow-up. #1 Configurable disconnect-detection latency: the QUIC control-connection idle timeout (stream_transport, 8s default) is host-tunable via --idle-timeout-ms / PUNKTFUNK_IDLE_TIMEOUT_MS, clamped >=1s with a keep-alive that scales to it so a live session never false-closes. Default unchanged (8s stays load-bearing for the Windows IDD-push reconnect flow). Workspace check + 63 core / 215 host / 47 vdisplay tests green; clippy clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- clients/probe/src/main.rs | 14 ++- crates/punktfunk-core/src/client.rs | 29 +++++- crates/punktfunk-core/src/quic.rs | 58 +++++++++--- .../punktfunk-host/src/gamestream/stream.rs | 4 + crates/punktfunk-host/src/main.rs | 7 ++ crates/punktfunk-host/src/punktfunk1.rs | 93 +++++++++++++++++-- .../punktfunk-host/src/vdisplay/admission.rs | 39 ++++++++ .../punktfunk-host/src/vdisplay/registry.rs | 57 +++++++++--- include/punktfunk_core.h | 9 ++ 9 files changed, 272 insertions(+), 38 deletions(-) diff --git a/clients/probe/src/main.rs b/clients/probe/src/main.rs index 99c47cf..7da9149 100644 --- a/clients/probe/src/main.rs +++ b/clients/probe/src/main.rs @@ -73,6 +73,10 @@ struct Args { /// `--rich-input-test` — drive the DualSense touchpad + motion over 0xCC (host needs /// `PUNKTFUNK_GAMEPAD=dualsense`); also logs the 0xCD HID-output feedback that comes back. rich_input_test: bool, + /// `--quit` — close the connection with the deliberate-quit code (`QUIT_CLOSE_CODE`) at end of + /// stream, so the host tears its virtual display down immediately (skips keep-alive linger). A + /// bare exit closes with code 0 → the host lingers for a reconnect. Tests the #2 quit path. + quit: bool, pin: Option<[u8; 32]>, /// `--remode WxHxFPS:SECS` — request this mode SECS seconds into the stream. remode: Option<(Mode, u32)>, @@ -211,6 +215,7 @@ fn parse_args() -> Args { mic_burst: argv.iter().any(|a| a == "--mic-burst"), touch_test: argv.iter().any(|a| a == "--touch-test"), rich_input_test: argv.iter().any(|a| a == "--rich-input-test"), + quit: argv.iter().any(|a| a == "--quit"), pin, remode, pair: get("--pair").map(String::from), @@ -1208,7 +1213,14 @@ async fn session(args: Args) -> Result<()> { } } - conn.close(0u32.into(), b"done"); + // `--quit` closes with the deliberate-quit code so the host skips the keep-alive linger; a normal + // exit uses code 0 (an unwanted-disconnect close → the host lingers for a reconnect). + let close_code = if args.quit { + punktfunk_core::quic::QUIT_CLOSE_CODE + } else { + 0 + }; + conn.close(close_code.into(), b"done"); result } diff --git a/crates/punktfunk-core/src/client.rs b/crates/punktfunk-core/src/client.rs index 91bf531..c18bf4b 100644 --- a/crates/punktfunk-core/src/client.rs +++ b/crates/punktfunk-core/src/client.rs @@ -179,6 +179,10 @@ pub struct NativeClient { /// Speed-test accumulator, shared with the data-plane pump + control task. probe: Arc>, shutdown: Arc, + /// Deliberate-quit flag: [`NativeClient::disconnect_quit`] sets it, so the worker closes the QUIC + /// connection with [`crate::quic::QUIT_CLOSE_CODE`] (a user "stop") instead of code 0 — telling the + /// host to skip the keep-alive linger. A plain drop leaves it false → an unwanted-disconnect close. + quit: Arc, /// Cumulative count of access units the reassembler gave up on (FEC couldn't recover), mirrored /// from the data-plane pump's `Session`. A client video loop watches this for increases to request /// a recovery keyframe under infinite GOP — the correct loss trigger, since unrecoverable loss @@ -331,6 +335,7 @@ impl NativeClient { let (ctrl_tx, ctrl_rx) = tokio::sync::mpsc::unbounded_channel::(); let (ready_tx, ready_rx) = std::sync::mpsc::channel::>(); let shutdown = Arc::new(AtomicBool::new(false)); + let quit = Arc::new(AtomicBool::new(false)); let mode_slot = Arc::new(std::sync::Mutex::new(mode)); let probe = Arc::new(Mutex::new(ProbeState::default())); let frames_dropped = Arc::new(AtomicU64::new(0)); @@ -338,6 +343,7 @@ impl NativeClient { let host = host.to_string(); let shutdown_w = shutdown.clone(); + let quit_w = quit.clone(); let mode_slot_w = mode_slot.clone(); let probe_w = probe.clone(); let frames_dropped_w = frames_dropped.clone(); @@ -388,6 +394,7 @@ impl NativeClient { ctrl_tx: ctrl_tx_pump, ready_tx, shutdown: shutdown_w, + quit: quit_w, mode_slot: mode_slot_w, probe: probe_w, frames_dropped: frames_dropped_w, @@ -430,6 +437,7 @@ impl NativeClient { ctrl_tx, probe, shutdown, + quit, worker: Some(worker), frames_dropped, hot_tids, @@ -764,6 +772,15 @@ impl NativeClient { .send(rich) .map_err(|_| PunktfunkError::Closed) } + + /// Signal a **deliberate quit** (a user "stop", not a network drop): the worker closes the QUIC + /// connection with [`crate::quic::QUIT_CLOSE_CODE`] instead of code 0, so the host tears the + /// session's virtual display down immediately and skips the keep-alive linger. Then requests + /// shutdown. A plain `drop` (without this) closes with code 0 → the host lingers for a reconnect. + pub fn disconnect_quit(&self) { + self.quit.store(true, Ordering::SeqCst); + self.shutdown.store(true, Ordering::SeqCst); + } } impl Drop for NativeClient { @@ -802,6 +819,8 @@ struct WorkerArgs { ctrl_tx: tokio::sync::mpsc::UnboundedSender, ready_tx: std::sync::mpsc::Sender>, shutdown: Arc, + /// Deliberate-quit flag (see [`NativeClient::quit`]): the worker closes with the quit code if set. + quit: Arc, mode_slot: Arc>, probe: Arc>, frames_dropped: Arc, @@ -838,6 +857,7 @@ async fn worker_main(args: WorkerArgs) { ctrl_tx, ready_tx, shutdown, + quit, mode_slot, probe, frames_dropped, @@ -1210,5 +1230,12 @@ async fn worker_main(args: WorkerArgs) { }) .await; - conn.close(0u32.into(), b"client closed"); + // Deliberate quit (a user "stop") closes with the quit code → the host skips the keep-alive + // linger; a plain drop / disconnect closes with 0 → the host lingers so a reconnect can resume. + let close_code = if quit.load(Ordering::SeqCst) { + crate::quic::QUIT_CLOSE_CODE + } else { + 0 + }; + conn.close(close_code.into(), b"client closed"); } diff --git a/crates/punktfunk-core/src/quic.rs b/crates/punktfunk-core/src/quic.rs index 8126a2d..ff89707 100644 --- a/crates/punktfunk-core/src/quic.rs +++ b/crates/punktfunk-core/src/quic.rs @@ -122,6 +122,13 @@ pub const VIDEO_CAP_444: u8 = 0x04; /// stage. Purely observability — never changes what the host encodes. pub const VIDEO_CAP_HOST_TIMING: u8 = 0x08; +/// QUIC application error code a punktfunk/1 client closes the control connection with on a +/// **deliberate quit** (a user "stop", not a network drop). The host reads it off the connection's +/// `ApplicationClosed` reason and tears the session's virtual display down immediately, skipping the +/// keep-alive linger; any other close reason (idle timeout, reset, a bare code 0) still lingers so a +/// reconnect can resume. Shared so host + every client agree on the code. +pub const QUIT_CLOSE_CODE: u32 = 0x51; + /// [`Hello::video_codecs`] bit: the client can decode H.264 / AVC. The GPU-less **software** /// encode path (openh264) emits H.264, so a client that wants to stream from a software host MUST /// advertise this. @@ -1743,20 +1750,31 @@ pub mod endpoint { /// every `KEEP_ALIVE` keeps the path warm. The interval sits well under `MAX_IDLE` so /// several keepalives can be lost back-to-back (a wifi roam, a brief blip) without a false /// close, while a genuinely dead peer is still detected within `MAX_IDLE`. + /// The default control-connection idle timeout (disconnect-detection latency). A vanished client + /// is declared dead within this window — the Windows IDD-push path needs it short so a RECONNECT + /// recreates a fresh virtual monitor instead of joining the still-lingering old session; the Linux + /// path pairs it with the same-client reconnect preempt. Host-tunable via `server_with_identity_idle`. + pub const DEFAULT_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(8); + fn stream_transport() -> Arc { + stream_transport_idle(DEFAULT_IDLE_TIMEOUT) + } + + /// Transport config with a caller-chosen idle timeout (disconnect-detection latency). The + /// keep-alive interval tracks it at half the idle window (capped at the default 4s), so a live + /// path is PINGed at least twice per window and a single lost PING (wifi roam / brief blip) won't + /// false-close. `idle` is clamped to a ≥1s floor so a misconfigured tiny value can't tear live + /// sessions down. Active sessions are unaffected either way: video keeps the connection live and + /// the keep-alive holds it open through quiet control periods. + fn stream_transport_idle(idle: std::time::Duration) -> Arc { use std::time::Duration; - // 8s idle (was 20s): a vanished client is declared dead within 8s instead of 20, so its - // session tears down promptly — which the Windows IDD-push path needs so a RECONNECT recreates - // a fresh virtual monitor (a reused monitor's IddCx swap-chain dies) instead of joining the - // still-lingering old session. Active sessions are unaffected: video keeps the connection live, - // and the 4s keep-alive holds it open through quiet control periods. - const MAX_IDLE: Duration = Duration::from_secs(8); - const KEEP_ALIVE: Duration = Duration::from_secs(4); + let idle = idle.max(Duration::from_secs(1)); + let keep_alive = (idle / 2).min(Duration::from_secs(4)); let mut t = quinn::TransportConfig::default(); t.max_idle_timeout(Some( - quinn::IdleTimeout::try_from(MAX_IDLE).expect("8s is a valid QUIC idle timeout"), + quinn::IdleTimeout::try_from(idle).expect("clamped idle timeout is a valid QUIC value"), )); - t.keep_alive_interval(Some(KEEP_ALIVE)); + t.keep_alive_interval(Some(keep_alive)); Arc::new(t) } @@ -1767,23 +1785,36 @@ pub mod endpoint { .map_err(|e| anyhow_result::Error::msg(format!("self-signed cert: {e}")))?; let cert_der = rustls::pki_types::CertificateDer::from(cert.cert); let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()); - server_from_der(cert_der, key_der.into(), addr) + server_from_der(cert_der, key_der.into(), addr, DEFAULT_IDLE_TIMEOUT) } /// Server endpoint from a persisted PEM identity (certificate + PKCS#8 private key) — /// the host's long-lived self-signed cert, so the fingerprint clients pin is stable - /// across restarts. + /// across restarts. Uses the [`DEFAULT_IDLE_TIMEOUT`]; see [`server_with_identity_idle`] to tune it. pub fn server_with_identity( addr: std::net::SocketAddr, cert_pem: &str, key_pem: &str, + ) -> anyhow_result::Result { + server_with_identity_idle(addr, cert_pem, key_pem, DEFAULT_IDLE_TIMEOUT) + } + + /// Like [`server_with_identity`] but with a host-chosen control-connection idle timeout — the + /// disconnect-detection latency (how long a vanished client takes to be declared dead). Shorter = + /// faster teardown/linger of a dropped session; the value is clamped to a ≥1s floor and its + /// keep-alive scales with it so a live session never false-closes. + pub fn server_with_identity_idle( + addr: std::net::SocketAddr, + cert_pem: &str, + key_pem: &str, + idle: std::time::Duration, ) -> anyhow_result::Result { use rustls::pki_types::pem::PemObject; let cert_der = rustls::pki_types::CertificateDer::from_pem_slice(cert_pem.as_bytes()) .map_err(|e| anyhow_result::Error::msg(format!("cert pem: {e}")))?; let key_der = rustls::pki_types::PrivateKeyDer::from_pem_slice(key_pem.as_bytes()) .map_err(|e| anyhow_result::Error::msg(format!("key pem: {e}")))?; - server_from_der(cert_der, key_der, addr) + server_from_der(cert_der, key_der, addr, idle) } /// Fixed ALPN for the punktfunk/1 QUIC handshake. Pinning it rejects a cross-protocol peer at the @@ -1796,6 +1827,7 @@ pub mod endpoint { cert_der: rustls::pki_types::CertificateDer<'static>, key_der: rustls::pki_types::PrivateKeyDer<'static>, addr: std::net::SocketAddr, + idle: std::time::Duration, ) -> anyhow_result::Result { let _ = rustls::crypto::ring::default_provider().install_default(); // Client auth is OFFERED but optional: a client that presents its self-signed @@ -1810,7 +1842,7 @@ pub mod endpoint { let quic_cfg = quinn::crypto::rustls::QuicServerConfig::try_from(rustls_cfg) .map_err(|e| anyhow_result::Error::msg(format!("quic server config: {e}")))?; let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_cfg)); - server_config.transport_config(stream_transport()); // keep-alive — see stream_transport + server_config.transport_config(stream_transport_idle(idle)); // keep-alive — see stream_transport_idle Ok(quinn::Endpoint::server(server_config, addr)?) } diff --git a/crates/punktfunk-host/src/gamestream/stream.rs b/crates/punktfunk-host/src/gamestream/stream.rs index 2380a0f..f786850 100644 --- a/crates/punktfunk-host/src/gamestream/stream.rs +++ b/crates/punktfunk-host/src/gamestream/stream.rs @@ -293,6 +293,10 @@ fn open_gs_virtual_source( height: cfg.height, refresh_hz: cfg.fps, }, + // GameStream's deliberate quit is the Moonlight "Quit App" (nvhttp `h_cancel`), not a QUIC + // close code — wiring it to skip-linger is a follow-up, so this path keeps normal keep-alive + // (a fresh, never-set flag). + std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), ) .context("create virtual output at client resolution")?; // HDR: pass the negotiated `cfg.hdr` (client asked for HDR AND the host can deliver it). On the diff --git a/crates/punktfunk-host/src/main.rs b/crates/punktfunk-host/src/main.rs index dc45d41..725e26d 100644 --- a/crates/punktfunk-host/src/main.rs +++ b/crates/punktfunk-host/src/main.rs @@ -425,6 +425,13 @@ fn real_main() -> Result<()> { .map(str::to_string) .or_else(|| std::env::var("PUNKTFUNK_DATA_PORT").ok()) .and_then(|s| s.parse().ok()), + // Disconnect-detection latency (QUIC control-connection idle timeout): --idle-timeout-ms + // overrides PUNKTFUNK_IDLE_TIMEOUT_MS; absent = the core default (8s). + idle_timeout: get("--idle-timeout-ms") + .and_then(|s| s.trim().parse::().ok()) + .filter(|&ms| ms > 0) + .map(std::time::Duration::from_millis) + .or_else(punktfunk1::idle_timeout_from_env), }) } // Windows service control: install/uninstall/start/stop/status + the SCM `run` entry point. diff --git a/crates/punktfunk-host/src/punktfunk1.rs b/crates/punktfunk-host/src/punktfunk1.rs index a8485f9..88a3460 100644 --- a/crates/punktfunk-host/src/punktfunk1.rs +++ b/crates/punktfunk-host/src/punktfunk1.rs @@ -84,6 +84,12 @@ pub struct Punktfunk1Options { /// fixed port only fits one data plane at a time, so a concurrent session finding it busy /// falls back to random + hole-punch (see [`bind_data_socket`]). pub data_port: Option, + /// Control-connection idle timeout — the **disconnect-detection latency** (how long a vanished + /// client takes to be declared dead, which bounds how fast a dropped session tears down / lingers + /// and thus the reconnect-overlap window). `None` = the core default (8s). Set from + /// `PUNKTFUNK_IDLE_TIMEOUT_MS`; clamped to a ≥1s floor with a keep-alive that scales to it so a + /// live session never false-closes. + pub idle_timeout: Option, } /// Bind the per-session data-plane UDP socket, honoring [`Punktfunk1Options::data_port`]. Returns @@ -185,6 +191,17 @@ pub(crate) struct NativeServe { /// overflow clients wait in the accept queue. Override with `--max-concurrent`. pub(crate) const DEFAULT_MAX_CONCURRENT: usize = 4; +/// The control-connection idle timeout (disconnect-detection latency) from +/// `PUNKTFUNK_IDLE_TIMEOUT_MS`; `None` (unset/invalid/zero) = the core default (8s). Clamped +/// downstream to a ≥1s floor with a keep-alive that scales to it, so a live session never false-closes. +pub(crate) fn idle_timeout_from_env() -> Option { + std::env::var("PUNKTFUNK_IDLE_TIMEOUT_MS") + .ok() + .and_then(|s| s.trim().parse::().ok()) + .filter(|&ms| ms > 0) + .map(std::time::Duration::from_millis) +} + pub(crate) fn native_serve_opts(cfg: &NativeServe) -> Punktfunk1Options { Punktfunk1Options { port: cfg.port, @@ -198,6 +215,7 @@ pub(crate) fn native_serve_opts(cfg: &NativeServe) -> Punktfunk1Options { pairing_pin: None, paired_store: None, data_port: cfg.data_port, + idle_timeout: idle_timeout_from_env(), } } @@ -211,10 +229,11 @@ pub(crate) async fn serve( .context("load host identity (~/.config/punktfunk)")?; let fingerprint = endpoint::fingerprint_of_pem(&identity.cert_pem) .map_err(|e| anyhow!("cert fingerprint: {e}"))?; - let ep = endpoint::server_with_identity( + let ep = endpoint::server_with_identity_idle( ([0, 0, 0, 0], opts.port).into(), &identity.cert_pem, &identity.key_pem, + opts.idle_timeout.unwrap_or(endpoint::DEFAULT_IDLE_TIMEOUT), ) .map_err(|e| anyhow!("QUIC server endpoint: {e}"))?; tracing::info!( @@ -379,6 +398,13 @@ const HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10 /// code lets a client tell "host busy" apart from a transport failure. const REJECT_BUSY_CODE: u32 = 0x42; +/// QUIC application error code a client closes with on a **deliberate quit** (a user "stop", not a +/// network drop). The host reads it off the connection's `ApplicationClosed` reason and tears the +/// session's virtual display down IMMEDIATELY, skipping the keep-alive linger — an unwanted disconnect +/// (idle timeout / reset / any other code) still lingers so a reconnect can resume. Shared with the +/// clients via `punktfunk_core::quic::QUIT_CLOSE_CODE`. +const QUIT_CODE: u32 = punktfunk_core::quic::QUIT_CLOSE_CODE; + /// Encoder bitrate (kbps) the host falls back to when the client expresses no preference /// (`Hello::bitrate_kbps == 0`) — the long-standing 20 Mbps default. A client that knows its /// link (e.g. after a speed test) requests an explicit rate instead. @@ -730,8 +756,32 @@ async fn serve_session( // A same-client reconnect never conflicts. THIS session registers in the live set once its // data plane is up (below the handshake), so a later client can see + steal it. { - use crate::vdisplay::admission::{admit, Admission}; - match admit(endpoint::peer_fingerprint(&conn)) { + use crate::vdisplay::admission::{admit, preempt_same_identity, Admission}; + let peer_fp = endpoint::peer_fingerprint(&conn); + + // Same-client RECONNECT preempt (design §5.3 "preempts downstream"): if THIS client + // already has a live session, it's the zombie of an unwanted disconnect whose QUIC idle + // timer hasn't fired yet (detection lags a drop by up to `max_idle_timeout`). Signal it to + // stop and give it the release grace so it tears its display down — which, keep-alive on, + // lingers — and THIS reconnect REUSES that kept display below instead of landing on a + // fresh SECOND one. Independent of the mode_conflict arm (it's our OWN prior session, not + // a conflict with a different client), and it runs before we register ourselves so we + // never signal our own stop flag. + let own_zombies = preempt_same_identity(peer_fp); + if !own_zombies.is_empty() { + tracing::info!( + count = own_zombies.len(), + "reconnect: preempting this client's own zombie session(s) so the kept display is reused" + ); + for z in &own_zombies { + z.store(true, Ordering::SeqCst); + } + // Same blind release grace the steal path uses — lets the zombie's loops notice the + // stop flag and drop its display (→ Lingering) before we acquire below. + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + } + + match admit(peer_fp) { Admission::Separate => {} Admission::Join(m) => { tracing::info!( @@ -1131,11 +1181,21 @@ async fn serve_session( // Stop signal: stream duration elapsed or the client went away. let stop = Arc::new(AtomicBool::new(false)); + // Deliberate-quit signal: set (before `stop`, so the display lease reads it on teardown) when the + // client closed the connection with `QUIT_CODE` — a user "stop", which skips the keep-alive linger. + // A bare disconnect / idle timeout leaves it false → the display lingers for a reconnect. + let quit = Arc::new(AtomicBool::new(false)); { let stop = stop.clone(); + let quit = quit.clone(); let conn = conn.clone(); tokio::spawn(async move { - conn.closed().await; + let reason = conn.closed().await; + if matches!(&reason, quinn::ConnectionError::ApplicationClosed(ac) + if ac.error_code == quinn::VarInt::from_u32(QUIT_CODE)) + { + quit.store(true, Ordering::SeqCst); + } stop.store(true, Ordering::SeqCst); }); } @@ -1254,6 +1314,7 @@ async fn serve_session( crate::encode::ChromaFormat::Yuv420 }; let stop_stream = stop.clone(); + let quit_stream = quit.clone(); let fec_target_dp = fec_target.clone(); // data-plane handle to the adaptive-FEC target let conn_stream = conn.clone(); // for sending the source's real HDR metadata (0xCE) mid-stream // Per-AU host-timing emission (0xCF): only when the client advertised the cap bit. All @@ -1325,6 +1386,7 @@ async fn serve_session( mode, seconds, stop: stop_stream, + quit: quit_stream, reconfig: reconfig_rx, keyframe: keyframe_rx, compositor, @@ -2864,6 +2926,9 @@ struct SessionContext { seconds: u32, /// Session stop flag (set on disconnect / reconnect-preempt). stop: Arc, + /// Deliberate-quit flag (set when the client closed with `QUIT_CODE`): the display lease reads it + /// on teardown to skip the keep-alive linger for a user "stop" (vs. an unwanted disconnect). + quit: Arc, /// Accepted mid-stream mode switches — the pipeline is rebuilt at the new mode. reconfig: std::sync::mpsc::Receiver, /// Client decode-recovery keyframe requests. @@ -2923,6 +2988,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { mode, seconds, stop, + quit, reconfig, keyframe, compositor, @@ -2973,7 +3039,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { let _idd_setup_guard = (plan.capture == crate::session_plan::CaptureBackend::IddPush) .then(|| crate::vdisplay::manager::vdm().begin_idd_setup(stop.clone())); let (mut capturer, mut enc, mut frame, mut interval) = - build_pipeline_with_retry(&mut vd, mode, bitrate_kbps, bit_depth, plan)?; + build_pipeline_with_retry(&mut vd, mode, bitrate_kbps, bit_depth, plan, &quit)?; // Setup done — release the IDD-push setup lock so the next reconnect can begin (and preempt us). #[cfg(target_os = "windows")] drop(_idd_setup_guard); @@ -3141,6 +3207,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { bitrate_kbps, bit_depth, plan, + &quit, )?; Ok((new_vd, pipe)) })(); @@ -3184,7 +3251,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { // Build the new pipeline BEFORE dropping the old one: the host already acked // the switch as accepted, so a rebuild failure must not kill an otherwise // healthy session — keep streaming the current mode and log instead. - match build_pipeline(&mut vd, new_mode, bitrate_kbps, bit_depth, plan) { + match build_pipeline(&mut vd, new_mode, bitrate_kbps, bit_depth, plan, &quit) { Ok(next_pipe) => { (capturer, enc, frame, interval) = next_pipe; cur_mode = new_mode; @@ -3305,6 +3372,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { bitrate_kbps, bit_depth, plan, + &quit, ) { Ok(p) => break p, Err(e2) => { @@ -3531,6 +3599,7 @@ fn build_pipeline_with_retry( bitrate_kbps: u32, bit_depth: u8, plan: crate::session_plan::SessionPlan, + quit: &Arc, ) -> Result { // ~10s first-frame wait per attempt. 8 gives a ~90s budget for the SLOW case: a host-managed // gamescope session cold-starting Steam Big Picture (the SteamOS/Bazzite takeover) can take @@ -3557,7 +3626,7 @@ fn build_pipeline_with_retry( const MAX_ATTEMPTS: u32 = 8; let mut backoff = std::time::Duration::from_millis(500); for attempt in 1..=MAX_ATTEMPTS { - match build_pipeline(vd, mode, bitrate_kbps, bit_depth, plan) { + match build_pipeline(vd, mode, bitrate_kbps, bit_depth, plan, quit) { Ok(pipe) => { if attempt > 1 { tracing::info!(attempt, "pipeline up after retry"); @@ -3620,12 +3689,15 @@ fn build_pipeline( bitrate_kbps: u32, bit_depth: u8, plan: crate::session_plan::SessionPlan, + quit: &Arc, ) -> Result { // Acquire through the registry (design/display-management.md): on Linux this pools the display // for keep-alive (reuse a kept one, or create + keep the backend's keepalive so it outlives the // session per policy); on Windows it delegates to `vd.create` (the manager already leases). The - // returned `VirtualOutput`'s keepalive is a registry lease — the capturer holds it as before. - let vout = crate::vdisplay::registry::acquire(vd, mode).context("create virtual output")?; + // returned `VirtualOutput`'s keepalive is a registry lease — the capturer holds it as before. The + // `quit` flag rides into the lease so a deliberate-quit teardown skips the keep-alive linger. + let vout = crate::vdisplay::registry::acquire(vd, mode, quit.clone()) + .context("create virtual output")?; // The backend reports the refresh it actually achieved in `preferred_mode.2` (KWin may cap a // virtual output at 60 Hz if the custom-mode install was rejected). Pace the encoder + frame // clock to that, not the requested rate, so we don't emit phantom duplicate frames over a @@ -3933,6 +4005,7 @@ mod tests { pairing_pin: None, paired_store: None, data_port: None, + idle_timeout: None, }) }); std::thread::sleep(std::time::Duration::from_millis(500)); @@ -4128,6 +4201,7 @@ mod tests { pairing_pin: None, paired_store: None, // unused: the shared `np` IS the store handle data_port: None, + idle_timeout: None, }, 0, // no mgmt API in this test → advertise no `mgmt` mDNS port np_host, @@ -4227,6 +4301,7 @@ mod tests { pairing_pin: Some("4321".into()), paired_store: Some(test_paired_path()), data_port: None, + idle_timeout: None, }) }); std::thread::sleep(std::time::Duration::from_millis(500)); diff --git a/crates/punktfunk-host/src/vdisplay/admission.rs b/crates/punktfunk-host/src/vdisplay/admission.rs index 1763793..bf318b3 100644 --- a/crates/punktfunk-host/src/vdisplay/admission.rs +++ b/crates/punktfunk-host/src/vdisplay/admission.rs @@ -115,6 +115,31 @@ pub fn admit(req_identity: Option<[u8; 32]>) -> Admission { decide(effective_conflict(), req_identity, &table().lock().unwrap()) } +/// Pure core of [`preempt_same_identity`]: the stop flags of live sessions owned by the SAME client +/// as `req_identity` (its own zombies). Testable over a slice (the public fn locks the global table). +fn same_identity_stops( + req_identity: Option<[u8; 32]>, + live: &[LiveSession], +) -> Vec> { + live.iter() + .filter(|s| same_client(s.identity, req_identity)) + .map(|s| Arc::clone(&s.stop)) + .collect() +} + +/// Preempt this reconnecting client's OWN still-live session(s). A client has at most one live +/// session, so a new connection from an already-registered identity is a **reconnect** — the old +/// session is a zombie whose QUIC idle timer hasn't fired yet (an unwanted disconnect is only +/// declared dead after `max_idle_timeout`, ~seconds later). Return its stop flag(s) so the caller +/// signals them and waits the release grace: the zombie tears its display down, which (keep-alive on) +/// lingers, and THIS reconnect **reuses** that kept display instead of landing on a fresh SECOND one +/// (the "thrown onto a second display while the old one keeps streaming" bug). Anonymous (`None`) +/// never matches — same limitation as `steal`/`reject`. Call this BEFORE [`admit`] and before this +/// session registers itself, so it only ever signals a *prior* session's flag, never its own. +pub fn preempt_same_identity(req_identity: Option<[u8; 32]>) -> Vec> { + same_identity_stops(req_identity, &table().lock().unwrap()) +} + /// Register a now-admitted, live session; the returned guard removes it on drop (session end). Call /// AFTER [`admit`] (so a session never conflicts with itself) and once the mode + stop flag are known. pub fn register( @@ -225,6 +250,20 @@ mod tests { )); } + #[test] + fn same_identity_stops_targets_own_zombie_only() { + let live = [ + sess(Some(1), (2560, 1440, 60)), // this client's prior (zombie) session + sess(Some(2), (1920, 1080, 60)), // a different client + ]; + // Reconnecting as client 1 → its own zombie's stop is returned (to preempt), not client 2's. + assert_eq!(same_identity_stops(fp(1), &live).len(), 1); + // A client with no prior session (fp 3) has nothing of its own to preempt. + assert_eq!(same_identity_stops(fp(3), &live).len(), 0); + // Anonymous never matches — we can't prove it's the same client. + assert_eq!(same_identity_stops(None, &live).len(), 0); + } + #[test] fn join_targets_the_oldest_other_session() { let live = [ diff --git a/crates/punktfunk-host/src/vdisplay/registry.rs b/crates/punktfunk-host/src/vdisplay/registry.rs index 0985290..ed602be 100644 --- a/crates/punktfunk-host/src/vdisplay/registry.rs +++ b/crates/punktfunk-host/src/vdisplay/registry.rs @@ -81,16 +81,23 @@ fn topology_str() -> String { /// /// Windows delegates to the [`manager`](super::manager) via `vd.create` (unchanged); Linux uses the /// pool below; other platforms pass through. +/// `quit` is the session's deliberate-quit flag: when the session ends with it set (the client closed +/// with the quit application code — a user "stop", not a network drop), the display is torn down +/// **immediately**, skipping the keep-alive linger. A bare disconnect leaves it `false` → normal linger. pub fn acquire( vd: &mut Box, mode: super::Mode, + quit: std::sync::Arc, ) -> Result { #[cfg(target_os = "linux")] { - linux::acquire(vd, mode) + linux::acquire(vd, mode, quit) } #[cfg(not(target_os = "linux"))] { + // Windows leases in the manager (its own linger); the deliberate-quit skip is not wired + // through there yet, so the flag is accepted but unused off Linux. + let _ = quit; vd.create(mode) } } @@ -163,8 +170,8 @@ pub fn release(slot: Option) -> usize { #[cfg(target_os = "linux")] mod linux { - use std::sync::atomic::{AtomicU64, Ordering}; - use std::sync::{Mutex, Once, OnceLock}; + use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; + use std::sync::{Arc, Mutex, Once, OnceLock}; use std::time::{Duration, Instant}; use anyhow::Result; @@ -304,16 +311,21 @@ mod linux { node_id: u32, preferred_mode: Option<(u32, u32, u32)>, gen: u64, + quit: Arc, ) -> VirtualOutput { VirtualOutput { node_id, remote_fd: None, preferred_mode, - keepalive: Box::new(DisplayLease { gen }), + keepalive: Box::new(DisplayLease { gen, quit }), } } - pub(super) fn acquire(vd: &mut Box, mode: Mode) -> Result { + pub(super) fn acquire( + vd: &mut Box, + mode: Mode, + quit: Arc, + ) -> Result { ensure_timer(); let backend = vd.name(); let r = reg(); @@ -343,7 +355,7 @@ mod linux { e.life.acquire(); let gen = r.gen.fetch_add(1, Ordering::Relaxed); e.gen = gen; - let out = output_for(e.node_id, e.preferred_mode, gen); + let out = output_for(e.node_id, e.preferred_mode, gen, quit); tracing::info!( backend, node_id = e.node_id, @@ -443,15 +455,21 @@ mod linux { if (position.x, position.y) != (0, 0) { vd.apply_position(position.x, position.y); } - Ok(output_for(node_id, preferred_mode, gen)) + Ok(output_for(node_id, preferred_mode, gen, quit)) } /// The [`DisplayLease`] `Drop` path: release the session's hold on the pooled display. The /// lifecycle machine decides linger / pin / teardown; a torn-down entry's keepalive drops *after* /// the lock is released. - fn release(gen: u64) { + fn release(gen: u64, force_immediate: bool) { let Some(r) = REG.get() else { return }; - let linger = linger(); + // A deliberate quit (the client closed with the quit code — a user "stop") tears the display + // down NOW, overriding the keep-alive linger; a bare disconnect honors the policy. + let linger = if force_immediate { + Linger::Immediate + } else { + linger() + }; let (torn_down, restore) = { let mut es = r.entries.lock().unwrap(); let Some(idx) = es.iter().position(|e| e.gen == gen) else { @@ -489,10 +507,17 @@ mod linux { restore(); } if let Some(e) = torn_down { - tracing::info!( - backend = e.backend, - "virtual display torn down (keep-alive off / released)" - ); + if force_immediate { + tracing::info!( + backend = e.backend, + "virtual display torn down (deliberate quit — keep-alive skipped)" + ); + } else { + tracing::info!( + backend = e.backend, + "virtual display torn down (keep-alive off / released)" + ); + } drop(e); // outside the lock — the keepalive Drop may block } } @@ -683,11 +708,15 @@ mod linux { /// registry hold; a stale lease (its entry was reused + re-stamped, or torn down) is a no-op. struct DisplayLease { gen: u64, + /// The session's deliberate-quit flag: set when the client closes with the quit application + /// code (a user "stop", not a network drop), so this lease's `Drop` tears the display down + /// immediately instead of lingering. `false` on a bare disconnect → normal keep-alive. + quit: Arc, } impl Drop for DisplayLease { fn drop(&mut self) { - release(self.gen); + release(self.gen, self.quit.load(Ordering::SeqCst)); } } diff --git a/include/punktfunk_core.h b/include/punktfunk_core.h index 4cbfc90..8773bb6 100644 --- a/include/punktfunk_core.h +++ b/include/punktfunk_core.h @@ -274,6 +274,15 @@ #define VIDEO_CAP_HOST_TIMING 8 #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// QUIC application error code a punktfunk/1 client closes the control connection with on a +// **deliberate quit** (a user "stop", not a network drop). The host reads it off the connection's +// `ApplicationClosed` reason and tears the session's virtual display down immediately, skipping the +// keep-alive linger; any other close reason (idle timeout, reset, a bare code 0) still lingers so a +// reconnect can resume. Shared so host + every client agree on the code. +#define QUIT_CLOSE_CODE 81 +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // [`Hello::video_codecs`] bit: the client can decode H.264 / AVC. The GPU-less **software** // encode path (openh264) emits H.264, so a client that wants to stream from a software host MUST