feat(vdisplay): harden keep-alive reconnect — same-client preempt, quit-skips-linger, configurable idle
On-glass testing (Test 2, KWin .116) surfaced that a reconnect within the QUIC idle-timeout window (~8s) lands on a fresh SECOND display instead of reusing the kept one: the old session was still Active (not yet Lingering), so the registry's keep-alive reuse (which only matches Lingering) skipped it and the old session kept streaming to nobody. Three fixes: #3 Same-client reconnect preempt (the real fix): admission::preempt_same_identity() lists a reconnecting client's OWN still-live session(s) (same cert fingerprint); serve_session signals their stop + waits the release grace BEFORE acquiring, so the zombie tears down → its display lingers → the reconnect REUSES it instead of making a second. Implements the "preempts downstream" the admission docs already promised. Independent of the mode_conflict policy; the pure core (same_identity_stops) is unit-tested. #2 Deliberate quit skips linger: a client that deliberately disconnects closes the QUIC connection with QUIT_CLOSE_CODE (0x51, shared in core::quic); the host reads the ApplicationClosed reason and tears the display down immediately (registry release() gained force_immediate → Linger::Immediate; multi-session-safe via the pure lifecycle machine), while a bare disconnect still lingers for reconnect. Threaded via a session quit flag → the DisplayLease. NativeClient::disconnect_quit() + punktfunk-probe --quit drive it; GameStream (Quit App / h_cancel) is a documented follow-up. #1 Configurable disconnect-detection latency: the QUIC control-connection idle timeout (stream_transport, 8s default) is host-tunable via --idle-timeout-ms / PUNKTFUNK_IDLE_TIMEOUT_MS, clamped >=1s with a keep-alive that scales to it so a live session never false-closes. Default unchanged (8s stays load-bearing for the Windows IDD-push reconnect flow). Workspace check + 63 core / 215 host / 47 vdisplay tests green; clippy clean. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -73,6 +73,10 @@ struct Args {
|
|||||||
/// `--rich-input-test` — drive the DualSense touchpad + motion over 0xCC (host needs
|
/// `--rich-input-test` — drive the DualSense touchpad + motion over 0xCC (host needs
|
||||||
/// `PUNKTFUNK_GAMEPAD=dualsense`); also logs the 0xCD HID-output feedback that comes back.
|
/// `PUNKTFUNK_GAMEPAD=dualsense`); also logs the 0xCD HID-output feedback that comes back.
|
||||||
rich_input_test: bool,
|
rich_input_test: bool,
|
||||||
|
/// `--quit` — close the connection with the deliberate-quit code (`QUIT_CLOSE_CODE`) at end of
|
||||||
|
/// stream, so the host tears its virtual display down immediately (skips keep-alive linger). A
|
||||||
|
/// bare exit closes with code 0 → the host lingers for a reconnect. Tests the #2 quit path.
|
||||||
|
quit: bool,
|
||||||
pin: Option<[u8; 32]>,
|
pin: Option<[u8; 32]>,
|
||||||
/// `--remode WxHxFPS:SECS` — request this mode SECS seconds into the stream.
|
/// `--remode WxHxFPS:SECS` — request this mode SECS seconds into the stream.
|
||||||
remode: Option<(Mode, u32)>,
|
remode: Option<(Mode, u32)>,
|
||||||
@@ -211,6 +215,7 @@ fn parse_args() -> Args {
|
|||||||
mic_burst: argv.iter().any(|a| a == "--mic-burst"),
|
mic_burst: argv.iter().any(|a| a == "--mic-burst"),
|
||||||
touch_test: argv.iter().any(|a| a == "--touch-test"),
|
touch_test: argv.iter().any(|a| a == "--touch-test"),
|
||||||
rich_input_test: argv.iter().any(|a| a == "--rich-input-test"),
|
rich_input_test: argv.iter().any(|a| a == "--rich-input-test"),
|
||||||
|
quit: argv.iter().any(|a| a == "--quit"),
|
||||||
pin,
|
pin,
|
||||||
remode,
|
remode,
|
||||||
pair: get("--pair").map(String::from),
|
pair: get("--pair").map(String::from),
|
||||||
@@ -1208,7 +1213,14 @@ async fn session(args: Args) -> Result<()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.close(0u32.into(), b"done");
|
// `--quit` closes with the deliberate-quit code so the host skips the keep-alive linger; a normal
|
||||||
|
// exit uses code 0 (an unwanted-disconnect close → the host lingers for a reconnect).
|
||||||
|
let close_code = if args.quit {
|
||||||
|
punktfunk_core::quic::QUIT_CLOSE_CODE
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
conn.close(close_code.into(), b"done");
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -179,6 +179,10 @@ pub struct NativeClient {
|
|||||||
/// Speed-test accumulator, shared with the data-plane pump + control task.
|
/// Speed-test accumulator, shared with the data-plane pump + control task.
|
||||||
probe: Arc<Mutex<ProbeState>>,
|
probe: Arc<Mutex<ProbeState>>,
|
||||||
shutdown: Arc<AtomicBool>,
|
shutdown: Arc<AtomicBool>,
|
||||||
|
/// Deliberate-quit flag: [`NativeClient::disconnect_quit`] sets it, so the worker closes the QUIC
|
||||||
|
/// connection with [`crate::quic::QUIT_CLOSE_CODE`] (a user "stop") instead of code 0 — telling the
|
||||||
|
/// host to skip the keep-alive linger. A plain drop leaves it false → an unwanted-disconnect close.
|
||||||
|
quit: Arc<AtomicBool>,
|
||||||
/// Cumulative count of access units the reassembler gave up on (FEC couldn't recover), mirrored
|
/// Cumulative count of access units the reassembler gave up on (FEC couldn't recover), mirrored
|
||||||
/// from the data-plane pump's `Session`. A client video loop watches this for increases to request
|
/// from the data-plane pump's `Session`. A client video loop watches this for increases to request
|
||||||
/// a recovery keyframe under infinite GOP — the correct loss trigger, since unrecoverable loss
|
/// a recovery keyframe under infinite GOP — the correct loss trigger, since unrecoverable loss
|
||||||
@@ -331,6 +335,7 @@ impl NativeClient {
|
|||||||
let (ctrl_tx, ctrl_rx) = tokio::sync::mpsc::unbounded_channel::<CtrlRequest>();
|
let (ctrl_tx, ctrl_rx) = tokio::sync::mpsc::unbounded_channel::<CtrlRequest>();
|
||||||
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<Result<Negotiated>>();
|
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<Result<Negotiated>>();
|
||||||
let shutdown = Arc::new(AtomicBool::new(false));
|
let shutdown = Arc::new(AtomicBool::new(false));
|
||||||
|
let quit = Arc::new(AtomicBool::new(false));
|
||||||
let mode_slot = Arc::new(std::sync::Mutex::new(mode));
|
let mode_slot = Arc::new(std::sync::Mutex::new(mode));
|
||||||
let probe = Arc::new(Mutex::new(ProbeState::default()));
|
let probe = Arc::new(Mutex::new(ProbeState::default()));
|
||||||
let frames_dropped = Arc::new(AtomicU64::new(0));
|
let frames_dropped = Arc::new(AtomicU64::new(0));
|
||||||
@@ -338,6 +343,7 @@ impl NativeClient {
|
|||||||
|
|
||||||
let host = host.to_string();
|
let host = host.to_string();
|
||||||
let shutdown_w = shutdown.clone();
|
let shutdown_w = shutdown.clone();
|
||||||
|
let quit_w = quit.clone();
|
||||||
let mode_slot_w = mode_slot.clone();
|
let mode_slot_w = mode_slot.clone();
|
||||||
let probe_w = probe.clone();
|
let probe_w = probe.clone();
|
||||||
let frames_dropped_w = frames_dropped.clone();
|
let frames_dropped_w = frames_dropped.clone();
|
||||||
@@ -388,6 +394,7 @@ impl NativeClient {
|
|||||||
ctrl_tx: ctrl_tx_pump,
|
ctrl_tx: ctrl_tx_pump,
|
||||||
ready_tx,
|
ready_tx,
|
||||||
shutdown: shutdown_w,
|
shutdown: shutdown_w,
|
||||||
|
quit: quit_w,
|
||||||
mode_slot: mode_slot_w,
|
mode_slot: mode_slot_w,
|
||||||
probe: probe_w,
|
probe: probe_w,
|
||||||
frames_dropped: frames_dropped_w,
|
frames_dropped: frames_dropped_w,
|
||||||
@@ -430,6 +437,7 @@ impl NativeClient {
|
|||||||
ctrl_tx,
|
ctrl_tx,
|
||||||
probe,
|
probe,
|
||||||
shutdown,
|
shutdown,
|
||||||
|
quit,
|
||||||
worker: Some(worker),
|
worker: Some(worker),
|
||||||
frames_dropped,
|
frames_dropped,
|
||||||
hot_tids,
|
hot_tids,
|
||||||
@@ -764,6 +772,15 @@ impl NativeClient {
|
|||||||
.send(rich)
|
.send(rich)
|
||||||
.map_err(|_| PunktfunkError::Closed)
|
.map_err(|_| PunktfunkError::Closed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Signal a **deliberate quit** (a user "stop", not a network drop): the worker closes the QUIC
|
||||||
|
/// connection with [`crate::quic::QUIT_CLOSE_CODE`] instead of code 0, so the host tears the
|
||||||
|
/// session's virtual display down immediately and skips the keep-alive linger. Then requests
|
||||||
|
/// shutdown. A plain `drop` (without this) closes with code 0 → the host lingers for a reconnect.
|
||||||
|
pub fn disconnect_quit(&self) {
|
||||||
|
self.quit.store(true, Ordering::SeqCst);
|
||||||
|
self.shutdown.store(true, Ordering::SeqCst);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for NativeClient {
|
impl Drop for NativeClient {
|
||||||
@@ -802,6 +819,8 @@ struct WorkerArgs {
|
|||||||
ctrl_tx: tokio::sync::mpsc::UnboundedSender<CtrlRequest>,
|
ctrl_tx: tokio::sync::mpsc::UnboundedSender<CtrlRequest>,
|
||||||
ready_tx: std::sync::mpsc::Sender<Result<Negotiated>>,
|
ready_tx: std::sync::mpsc::Sender<Result<Negotiated>>,
|
||||||
shutdown: Arc<AtomicBool>,
|
shutdown: Arc<AtomicBool>,
|
||||||
|
/// Deliberate-quit flag (see [`NativeClient::quit`]): the worker closes with the quit code if set.
|
||||||
|
quit: Arc<AtomicBool>,
|
||||||
mode_slot: Arc<std::sync::Mutex<Mode>>,
|
mode_slot: Arc<std::sync::Mutex<Mode>>,
|
||||||
probe: Arc<Mutex<ProbeState>>,
|
probe: Arc<Mutex<ProbeState>>,
|
||||||
frames_dropped: Arc<AtomicU64>,
|
frames_dropped: Arc<AtomicU64>,
|
||||||
@@ -838,6 +857,7 @@ async fn worker_main(args: WorkerArgs) {
|
|||||||
ctrl_tx,
|
ctrl_tx,
|
||||||
ready_tx,
|
ready_tx,
|
||||||
shutdown,
|
shutdown,
|
||||||
|
quit,
|
||||||
mode_slot,
|
mode_slot,
|
||||||
probe,
|
probe,
|
||||||
frames_dropped,
|
frames_dropped,
|
||||||
@@ -1210,5 +1230,12 @@ async fn worker_main(args: WorkerArgs) {
|
|||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
conn.close(0u32.into(), b"client closed");
|
// Deliberate quit (a user "stop") closes with the quit code → the host skips the keep-alive
|
||||||
|
// linger; a plain drop / disconnect closes with 0 → the host lingers so a reconnect can resume.
|
||||||
|
let close_code = if quit.load(Ordering::SeqCst) {
|
||||||
|
crate::quic::QUIT_CLOSE_CODE
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
conn.close(close_code.into(), b"client closed");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -122,6 +122,13 @@ pub const VIDEO_CAP_444: u8 = 0x04;
|
|||||||
/// stage. Purely observability — never changes what the host encodes.
|
/// stage. Purely observability — never changes what the host encodes.
|
||||||
pub const VIDEO_CAP_HOST_TIMING: u8 = 0x08;
|
pub const VIDEO_CAP_HOST_TIMING: u8 = 0x08;
|
||||||
|
|
||||||
|
/// QUIC application error code a punktfunk/1 client closes the control connection with on a
|
||||||
|
/// **deliberate quit** (a user "stop", not a network drop). The host reads it off the connection's
|
||||||
|
/// `ApplicationClosed` reason and tears the session's virtual display down immediately, skipping the
|
||||||
|
/// keep-alive linger; any other close reason (idle timeout, reset, a bare code 0) still lingers so a
|
||||||
|
/// reconnect can resume. Shared so host + every client agree on the code.
|
||||||
|
pub const QUIT_CLOSE_CODE: u32 = 0x51;
|
||||||
|
|
||||||
/// [`Hello::video_codecs`] bit: the client can decode H.264 / AVC. The GPU-less **software**
|
/// [`Hello::video_codecs`] bit: the client can decode H.264 / AVC. The GPU-less **software**
|
||||||
/// encode path (openh264) emits H.264, so a client that wants to stream from a software host MUST
|
/// encode path (openh264) emits H.264, so a client that wants to stream from a software host MUST
|
||||||
/// advertise this.
|
/// advertise this.
|
||||||
@@ -1743,20 +1750,31 @@ pub mod endpoint {
|
|||||||
/// every `KEEP_ALIVE` keeps the path warm. The interval sits well under `MAX_IDLE` so
|
/// every `KEEP_ALIVE` keeps the path warm. The interval sits well under `MAX_IDLE` so
|
||||||
/// several keepalives can be lost back-to-back (a wifi roam, a brief blip) without a false
|
/// several keepalives can be lost back-to-back (a wifi roam, a brief blip) without a false
|
||||||
/// close, while a genuinely dead peer is still detected within `MAX_IDLE`.
|
/// close, while a genuinely dead peer is still detected within `MAX_IDLE`.
|
||||||
|
/// The default control-connection idle timeout (disconnect-detection latency). A vanished client
|
||||||
|
/// is declared dead within this window — the Windows IDD-push path needs it short so a RECONNECT
|
||||||
|
/// recreates a fresh virtual monitor instead of joining the still-lingering old session; the Linux
|
||||||
|
/// path pairs it with the same-client reconnect preempt. Host-tunable via `server_with_identity_idle`.
|
||||||
|
pub const DEFAULT_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(8);
|
||||||
|
|
||||||
fn stream_transport() -> Arc<quinn::TransportConfig> {
|
fn stream_transport() -> Arc<quinn::TransportConfig> {
|
||||||
|
stream_transport_idle(DEFAULT_IDLE_TIMEOUT)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Transport config with a caller-chosen idle timeout (disconnect-detection latency). The
|
||||||
|
/// keep-alive interval tracks it at half the idle window (capped at the default 4s), so a live
|
||||||
|
/// path is PINGed at least twice per window and a single lost PING (wifi roam / brief blip) won't
|
||||||
|
/// false-close. `idle` is clamped to a ≥1s floor so a misconfigured tiny value can't tear live
|
||||||
|
/// sessions down. Active sessions are unaffected either way: video keeps the connection live and
|
||||||
|
/// the keep-alive holds it open through quiet control periods.
|
||||||
|
fn stream_transport_idle(idle: std::time::Duration) -> Arc<quinn::TransportConfig> {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
// 8s idle (was 20s): a vanished client is declared dead within 8s instead of 20, so its
|
let idle = idle.max(Duration::from_secs(1));
|
||||||
// session tears down promptly — which the Windows IDD-push path needs so a RECONNECT recreates
|
let keep_alive = (idle / 2).min(Duration::from_secs(4));
|
||||||
// a fresh virtual monitor (a reused monitor's IddCx swap-chain dies) instead of joining the
|
|
||||||
// still-lingering old session. Active sessions are unaffected: video keeps the connection live,
|
|
||||||
// and the 4s keep-alive holds it open through quiet control periods.
|
|
||||||
const MAX_IDLE: Duration = Duration::from_secs(8);
|
|
||||||
const KEEP_ALIVE: Duration = Duration::from_secs(4);
|
|
||||||
let mut t = quinn::TransportConfig::default();
|
let mut t = quinn::TransportConfig::default();
|
||||||
t.max_idle_timeout(Some(
|
t.max_idle_timeout(Some(
|
||||||
quinn::IdleTimeout::try_from(MAX_IDLE).expect("8s is a valid QUIC idle timeout"),
|
quinn::IdleTimeout::try_from(idle).expect("clamped idle timeout is a valid QUIC value"),
|
||||||
));
|
));
|
||||||
t.keep_alive_interval(Some(KEEP_ALIVE));
|
t.keep_alive_interval(Some(keep_alive));
|
||||||
Arc::new(t)
|
Arc::new(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1767,23 +1785,36 @@ pub mod endpoint {
|
|||||||
.map_err(|e| anyhow_result::Error::msg(format!("self-signed cert: {e}")))?;
|
.map_err(|e| anyhow_result::Error::msg(format!("self-signed cert: {e}")))?;
|
||||||
let cert_der = rustls::pki_types::CertificateDer::from(cert.cert);
|
let cert_der = rustls::pki_types::CertificateDer::from(cert.cert);
|
||||||
let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der());
|
let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der());
|
||||||
server_from_der(cert_der, key_der.into(), addr)
|
server_from_der(cert_der, key_der.into(), addr, DEFAULT_IDLE_TIMEOUT)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Server endpoint from a persisted PEM identity (certificate + PKCS#8 private key) —
|
/// Server endpoint from a persisted PEM identity (certificate + PKCS#8 private key) —
|
||||||
/// the host's long-lived self-signed cert, so the fingerprint clients pin is stable
|
/// the host's long-lived self-signed cert, so the fingerprint clients pin is stable
|
||||||
/// across restarts.
|
/// across restarts. Uses the [`DEFAULT_IDLE_TIMEOUT`]; see [`server_with_identity_idle`] to tune it.
|
||||||
pub fn server_with_identity(
|
pub fn server_with_identity(
|
||||||
addr: std::net::SocketAddr,
|
addr: std::net::SocketAddr,
|
||||||
cert_pem: &str,
|
cert_pem: &str,
|
||||||
key_pem: &str,
|
key_pem: &str,
|
||||||
|
) -> anyhow_result::Result<quinn::Endpoint> {
|
||||||
|
server_with_identity_idle(addr, cert_pem, key_pem, DEFAULT_IDLE_TIMEOUT)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Like [`server_with_identity`] but with a host-chosen control-connection idle timeout — the
|
||||||
|
/// disconnect-detection latency (how long a vanished client takes to be declared dead). Shorter =
|
||||||
|
/// faster teardown/linger of a dropped session; the value is clamped to a ≥1s floor and its
|
||||||
|
/// keep-alive scales with it so a live session never false-closes.
|
||||||
|
pub fn server_with_identity_idle(
|
||||||
|
addr: std::net::SocketAddr,
|
||||||
|
cert_pem: &str,
|
||||||
|
key_pem: &str,
|
||||||
|
idle: std::time::Duration,
|
||||||
) -> anyhow_result::Result<quinn::Endpoint> {
|
) -> anyhow_result::Result<quinn::Endpoint> {
|
||||||
use rustls::pki_types::pem::PemObject;
|
use rustls::pki_types::pem::PemObject;
|
||||||
let cert_der = rustls::pki_types::CertificateDer::from_pem_slice(cert_pem.as_bytes())
|
let cert_der = rustls::pki_types::CertificateDer::from_pem_slice(cert_pem.as_bytes())
|
||||||
.map_err(|e| anyhow_result::Error::msg(format!("cert pem: {e}")))?;
|
.map_err(|e| anyhow_result::Error::msg(format!("cert pem: {e}")))?;
|
||||||
let key_der = rustls::pki_types::PrivateKeyDer::from_pem_slice(key_pem.as_bytes())
|
let key_der = rustls::pki_types::PrivateKeyDer::from_pem_slice(key_pem.as_bytes())
|
||||||
.map_err(|e| anyhow_result::Error::msg(format!("key pem: {e}")))?;
|
.map_err(|e| anyhow_result::Error::msg(format!("key pem: {e}")))?;
|
||||||
server_from_der(cert_der, key_der, addr)
|
server_from_der(cert_der, key_der, addr, idle)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fixed ALPN for the punktfunk/1 QUIC handshake. Pinning it rejects a cross-protocol peer at the
|
/// Fixed ALPN for the punktfunk/1 QUIC handshake. Pinning it rejects a cross-protocol peer at the
|
||||||
@@ -1796,6 +1827,7 @@ pub mod endpoint {
|
|||||||
cert_der: rustls::pki_types::CertificateDer<'static>,
|
cert_der: rustls::pki_types::CertificateDer<'static>,
|
||||||
key_der: rustls::pki_types::PrivateKeyDer<'static>,
|
key_der: rustls::pki_types::PrivateKeyDer<'static>,
|
||||||
addr: std::net::SocketAddr,
|
addr: std::net::SocketAddr,
|
||||||
|
idle: std::time::Duration,
|
||||||
) -> anyhow_result::Result<quinn::Endpoint> {
|
) -> anyhow_result::Result<quinn::Endpoint> {
|
||||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||||
// Client auth is OFFERED but optional: a client that presents its self-signed
|
// Client auth is OFFERED but optional: a client that presents its self-signed
|
||||||
@@ -1810,7 +1842,7 @@ pub mod endpoint {
|
|||||||
let quic_cfg = quinn::crypto::rustls::QuicServerConfig::try_from(rustls_cfg)
|
let quic_cfg = quinn::crypto::rustls::QuicServerConfig::try_from(rustls_cfg)
|
||||||
.map_err(|e| anyhow_result::Error::msg(format!("quic server config: {e}")))?;
|
.map_err(|e| anyhow_result::Error::msg(format!("quic server config: {e}")))?;
|
||||||
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_cfg));
|
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_cfg));
|
||||||
server_config.transport_config(stream_transport()); // keep-alive — see stream_transport
|
server_config.transport_config(stream_transport_idle(idle)); // keep-alive — see stream_transport_idle
|
||||||
Ok(quinn::Endpoint::server(server_config, addr)?)
|
Ok(quinn::Endpoint::server(server_config, addr)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -293,6 +293,10 @@ fn open_gs_virtual_source(
|
|||||||
height: cfg.height,
|
height: cfg.height,
|
||||||
refresh_hz: cfg.fps,
|
refresh_hz: cfg.fps,
|
||||||
},
|
},
|
||||||
|
// GameStream's deliberate quit is the Moonlight "Quit App" (nvhttp `h_cancel`), not a QUIC
|
||||||
|
// close code — wiring it to skip-linger is a follow-up, so this path keeps normal keep-alive
|
||||||
|
// (a fresh, never-set flag).
|
||||||
|
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||||
)
|
)
|
||||||
.context("create virtual output at client resolution")?;
|
.context("create virtual output at client resolution")?;
|
||||||
// HDR: pass the negotiated `cfg.hdr` (client asked for HDR AND the host can deliver it). On the
|
// HDR: pass the negotiated `cfg.hdr` (client asked for HDR AND the host can deliver it). On the
|
||||||
|
|||||||
@@ -425,6 +425,13 @@ fn real_main() -> Result<()> {
|
|||||||
.map(str::to_string)
|
.map(str::to_string)
|
||||||
.or_else(|| std::env::var("PUNKTFUNK_DATA_PORT").ok())
|
.or_else(|| std::env::var("PUNKTFUNK_DATA_PORT").ok())
|
||||||
.and_then(|s| s.parse().ok()),
|
.and_then(|s| s.parse().ok()),
|
||||||
|
// Disconnect-detection latency (QUIC control-connection idle timeout): --idle-timeout-ms
|
||||||
|
// overrides PUNKTFUNK_IDLE_TIMEOUT_MS; absent = the core default (8s).
|
||||||
|
idle_timeout: get("--idle-timeout-ms")
|
||||||
|
.and_then(|s| s.trim().parse::<u64>().ok())
|
||||||
|
.filter(|&ms| ms > 0)
|
||||||
|
.map(std::time::Duration::from_millis)
|
||||||
|
.or_else(punktfunk1::idle_timeout_from_env),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
// Windows service control: install/uninstall/start/stop/status + the SCM `run` entry point.
|
// Windows service control: install/uninstall/start/stop/status + the SCM `run` entry point.
|
||||||
|
|||||||
@@ -84,6 +84,12 @@ pub struct Punktfunk1Options {
|
|||||||
/// fixed port only fits one data plane at a time, so a concurrent session finding it busy
|
/// fixed port only fits one data plane at a time, so a concurrent session finding it busy
|
||||||
/// falls back to random + hole-punch (see [`bind_data_socket`]).
|
/// falls back to random + hole-punch (see [`bind_data_socket`]).
|
||||||
pub data_port: Option<u16>,
|
pub data_port: Option<u16>,
|
||||||
|
/// Control-connection idle timeout — the **disconnect-detection latency** (how long a vanished
|
||||||
|
/// client takes to be declared dead, which bounds how fast a dropped session tears down / lingers
|
||||||
|
/// and thus the reconnect-overlap window). `None` = the core default (8s). Set from
|
||||||
|
/// `PUNKTFUNK_IDLE_TIMEOUT_MS`; clamped to a ≥1s floor with a keep-alive that scales to it so a
|
||||||
|
/// live session never false-closes.
|
||||||
|
pub idle_timeout: Option<std::time::Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Bind the per-session data-plane UDP socket, honoring [`Punktfunk1Options::data_port`]. Returns
|
/// Bind the per-session data-plane UDP socket, honoring [`Punktfunk1Options::data_port`]. Returns
|
||||||
@@ -185,6 +191,17 @@ pub(crate) struct NativeServe {
|
|||||||
/// overflow clients wait in the accept queue. Override with `--max-concurrent`.
|
/// overflow clients wait in the accept queue. Override with `--max-concurrent`.
|
||||||
pub(crate) const DEFAULT_MAX_CONCURRENT: usize = 4;
|
pub(crate) const DEFAULT_MAX_CONCURRENT: usize = 4;
|
||||||
|
|
||||||
|
/// The control-connection idle timeout (disconnect-detection latency) from
|
||||||
|
/// `PUNKTFUNK_IDLE_TIMEOUT_MS`; `None` (unset/invalid/zero) = the core default (8s). Clamped
|
||||||
|
/// downstream to a ≥1s floor with a keep-alive that scales to it, so a live session never false-closes.
|
||||||
|
pub(crate) fn idle_timeout_from_env() -> Option<std::time::Duration> {
|
||||||
|
std::env::var("PUNKTFUNK_IDLE_TIMEOUT_MS")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.trim().parse::<u64>().ok())
|
||||||
|
.filter(|&ms| ms > 0)
|
||||||
|
.map(std::time::Duration::from_millis)
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn native_serve_opts(cfg: &NativeServe) -> Punktfunk1Options {
|
pub(crate) fn native_serve_opts(cfg: &NativeServe) -> Punktfunk1Options {
|
||||||
Punktfunk1Options {
|
Punktfunk1Options {
|
||||||
port: cfg.port,
|
port: cfg.port,
|
||||||
@@ -198,6 +215,7 @@ pub(crate) fn native_serve_opts(cfg: &NativeServe) -> Punktfunk1Options {
|
|||||||
pairing_pin: None,
|
pairing_pin: None,
|
||||||
paired_store: None,
|
paired_store: None,
|
||||||
data_port: cfg.data_port,
|
data_port: cfg.data_port,
|
||||||
|
idle_timeout: idle_timeout_from_env(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -211,10 +229,11 @@ pub(crate) async fn serve(
|
|||||||
.context("load host identity (~/.config/punktfunk)")?;
|
.context("load host identity (~/.config/punktfunk)")?;
|
||||||
let fingerprint = endpoint::fingerprint_of_pem(&identity.cert_pem)
|
let fingerprint = endpoint::fingerprint_of_pem(&identity.cert_pem)
|
||||||
.map_err(|e| anyhow!("cert fingerprint: {e}"))?;
|
.map_err(|e| anyhow!("cert fingerprint: {e}"))?;
|
||||||
let ep = endpoint::server_with_identity(
|
let ep = endpoint::server_with_identity_idle(
|
||||||
([0, 0, 0, 0], opts.port).into(),
|
([0, 0, 0, 0], opts.port).into(),
|
||||||
&identity.cert_pem,
|
&identity.cert_pem,
|
||||||
&identity.key_pem,
|
&identity.key_pem,
|
||||||
|
opts.idle_timeout.unwrap_or(endpoint::DEFAULT_IDLE_TIMEOUT),
|
||||||
)
|
)
|
||||||
.map_err(|e| anyhow!("QUIC server endpoint: {e}"))?;
|
.map_err(|e| anyhow!("QUIC server endpoint: {e}"))?;
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
@@ -379,6 +398,13 @@ const HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10
|
|||||||
/// code lets a client tell "host busy" apart from a transport failure.
|
/// code lets a client tell "host busy" apart from a transport failure.
|
||||||
const REJECT_BUSY_CODE: u32 = 0x42;
|
const REJECT_BUSY_CODE: u32 = 0x42;
|
||||||
|
|
||||||
|
/// QUIC application error code a client closes with on a **deliberate quit** (a user "stop", not a
|
||||||
|
/// network drop). The host reads it off the connection's `ApplicationClosed` reason and tears the
|
||||||
|
/// session's virtual display down IMMEDIATELY, skipping the keep-alive linger — an unwanted disconnect
|
||||||
|
/// (idle timeout / reset / any other code) still lingers so a reconnect can resume. Shared with the
|
||||||
|
/// clients via `punktfunk_core::quic::QUIT_CLOSE_CODE`.
|
||||||
|
const QUIT_CODE: u32 = punktfunk_core::quic::QUIT_CLOSE_CODE;
|
||||||
|
|
||||||
/// Encoder bitrate (kbps) the host falls back to when the client expresses no preference
|
/// Encoder bitrate (kbps) the host falls back to when the client expresses no preference
|
||||||
/// (`Hello::bitrate_kbps == 0`) — the long-standing 20 Mbps default. A client that knows its
|
/// (`Hello::bitrate_kbps == 0`) — the long-standing 20 Mbps default. A client that knows its
|
||||||
/// link (e.g. after a speed test) requests an explicit rate instead.
|
/// link (e.g. after a speed test) requests an explicit rate instead.
|
||||||
@@ -730,8 +756,32 @@ async fn serve_session(
|
|||||||
// A same-client reconnect never conflicts. THIS session registers in the live set once its
|
// A same-client reconnect never conflicts. THIS session registers in the live set once its
|
||||||
// data plane is up (below the handshake), so a later client can see + steal it.
|
// data plane is up (below the handshake), so a later client can see + steal it.
|
||||||
{
|
{
|
||||||
use crate::vdisplay::admission::{admit, Admission};
|
use crate::vdisplay::admission::{admit, preempt_same_identity, Admission};
|
||||||
match admit(endpoint::peer_fingerprint(&conn)) {
|
let peer_fp = endpoint::peer_fingerprint(&conn);
|
||||||
|
|
||||||
|
// Same-client RECONNECT preempt (design §5.3 "preempts downstream"): if THIS client
|
||||||
|
// already has a live session, it's the zombie of an unwanted disconnect whose QUIC idle
|
||||||
|
// timer hasn't fired yet (detection lags a drop by up to `max_idle_timeout`). Signal it to
|
||||||
|
// stop and give it the release grace so it tears its display down — which, keep-alive on,
|
||||||
|
// lingers — and THIS reconnect REUSES that kept display below instead of landing on a
|
||||||
|
// fresh SECOND one. Independent of the mode_conflict arm (it's our OWN prior session, not
|
||||||
|
// a conflict with a different client), and it runs before we register ourselves so we
|
||||||
|
// never signal our own stop flag.
|
||||||
|
let own_zombies = preempt_same_identity(peer_fp);
|
||||||
|
if !own_zombies.is_empty() {
|
||||||
|
tracing::info!(
|
||||||
|
count = own_zombies.len(),
|
||||||
|
"reconnect: preempting this client's own zombie session(s) so the kept display is reused"
|
||||||
|
);
|
||||||
|
for z in &own_zombies {
|
||||||
|
z.store(true, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
// Same blind release grace the steal path uses — lets the zombie's loops notice the
|
||||||
|
// stop flag and drop its display (→ Lingering) before we acquire below.
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
match admit(peer_fp) {
|
||||||
Admission::Separate => {}
|
Admission::Separate => {}
|
||||||
Admission::Join(m) => {
|
Admission::Join(m) => {
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
@@ -1131,11 +1181,21 @@ async fn serve_session(
|
|||||||
|
|
||||||
// Stop signal: stream duration elapsed or the client went away.
|
// Stop signal: stream duration elapsed or the client went away.
|
||||||
let stop = Arc::new(AtomicBool::new(false));
|
let stop = Arc::new(AtomicBool::new(false));
|
||||||
|
// Deliberate-quit signal: set (before `stop`, so the display lease reads it on teardown) when the
|
||||||
|
// client closed the connection with `QUIT_CODE` — a user "stop", which skips the keep-alive linger.
|
||||||
|
// A bare disconnect / idle timeout leaves it false → the display lingers for a reconnect.
|
||||||
|
let quit = Arc::new(AtomicBool::new(false));
|
||||||
{
|
{
|
||||||
let stop = stop.clone();
|
let stop = stop.clone();
|
||||||
|
let quit = quit.clone();
|
||||||
let conn = conn.clone();
|
let conn = conn.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
conn.closed().await;
|
let reason = conn.closed().await;
|
||||||
|
if matches!(&reason, quinn::ConnectionError::ApplicationClosed(ac)
|
||||||
|
if ac.error_code == quinn::VarInt::from_u32(QUIT_CODE))
|
||||||
|
{
|
||||||
|
quit.store(true, Ordering::SeqCst);
|
||||||
|
}
|
||||||
stop.store(true, Ordering::SeqCst);
|
stop.store(true, Ordering::SeqCst);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -1254,6 +1314,7 @@ async fn serve_session(
|
|||||||
crate::encode::ChromaFormat::Yuv420
|
crate::encode::ChromaFormat::Yuv420
|
||||||
};
|
};
|
||||||
let stop_stream = stop.clone();
|
let stop_stream = stop.clone();
|
||||||
|
let quit_stream = quit.clone();
|
||||||
let fec_target_dp = fec_target.clone(); // data-plane handle to the adaptive-FEC target
|
let fec_target_dp = fec_target.clone(); // data-plane handle to the adaptive-FEC target
|
||||||
let conn_stream = conn.clone(); // for sending the source's real HDR metadata (0xCE) mid-stream
|
let conn_stream = conn.clone(); // for sending the source's real HDR metadata (0xCE) mid-stream
|
||||||
// Per-AU host-timing emission (0xCF): only when the client advertised the cap bit. All
|
// Per-AU host-timing emission (0xCF): only when the client advertised the cap bit. All
|
||||||
@@ -1325,6 +1386,7 @@ async fn serve_session(
|
|||||||
mode,
|
mode,
|
||||||
seconds,
|
seconds,
|
||||||
stop: stop_stream,
|
stop: stop_stream,
|
||||||
|
quit: quit_stream,
|
||||||
reconfig: reconfig_rx,
|
reconfig: reconfig_rx,
|
||||||
keyframe: keyframe_rx,
|
keyframe: keyframe_rx,
|
||||||
compositor,
|
compositor,
|
||||||
@@ -2864,6 +2926,9 @@ struct SessionContext {
|
|||||||
seconds: u32,
|
seconds: u32,
|
||||||
/// Session stop flag (set on disconnect / reconnect-preempt).
|
/// Session stop flag (set on disconnect / reconnect-preempt).
|
||||||
stop: Arc<AtomicBool>,
|
stop: Arc<AtomicBool>,
|
||||||
|
/// Deliberate-quit flag (set when the client closed with `QUIT_CODE`): the display lease reads it
|
||||||
|
/// on teardown to skip the keep-alive linger for a user "stop" (vs. an unwanted disconnect).
|
||||||
|
quit: Arc<AtomicBool>,
|
||||||
/// Accepted mid-stream mode switches — the pipeline is rebuilt at the new mode.
|
/// Accepted mid-stream mode switches — the pipeline is rebuilt at the new mode.
|
||||||
reconfig: std::sync::mpsc::Receiver<punktfunk_core::Mode>,
|
reconfig: std::sync::mpsc::Receiver<punktfunk_core::Mode>,
|
||||||
/// Client decode-recovery keyframe requests.
|
/// Client decode-recovery keyframe requests.
|
||||||
@@ -2923,6 +2988,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
|||||||
mode,
|
mode,
|
||||||
seconds,
|
seconds,
|
||||||
stop,
|
stop,
|
||||||
|
quit,
|
||||||
reconfig,
|
reconfig,
|
||||||
keyframe,
|
keyframe,
|
||||||
compositor,
|
compositor,
|
||||||
@@ -2973,7 +3039,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
|||||||
let _idd_setup_guard = (plan.capture == crate::session_plan::CaptureBackend::IddPush)
|
let _idd_setup_guard = (plan.capture == crate::session_plan::CaptureBackend::IddPush)
|
||||||
.then(|| crate::vdisplay::manager::vdm().begin_idd_setup(stop.clone()));
|
.then(|| crate::vdisplay::manager::vdm().begin_idd_setup(stop.clone()));
|
||||||
let (mut capturer, mut enc, mut frame, mut interval) =
|
let (mut capturer, mut enc, mut frame, mut interval) =
|
||||||
build_pipeline_with_retry(&mut vd, mode, bitrate_kbps, bit_depth, plan)?;
|
build_pipeline_with_retry(&mut vd, mode, bitrate_kbps, bit_depth, plan, &quit)?;
|
||||||
// Setup done — release the IDD-push setup lock so the next reconnect can begin (and preempt us).
|
// Setup done — release the IDD-push setup lock so the next reconnect can begin (and preempt us).
|
||||||
#[cfg(target_os = "windows")]
|
#[cfg(target_os = "windows")]
|
||||||
drop(_idd_setup_guard);
|
drop(_idd_setup_guard);
|
||||||
@@ -3141,6 +3207,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
|||||||
bitrate_kbps,
|
bitrate_kbps,
|
||||||
bit_depth,
|
bit_depth,
|
||||||
plan,
|
plan,
|
||||||
|
&quit,
|
||||||
)?;
|
)?;
|
||||||
Ok((new_vd, pipe))
|
Ok((new_vd, pipe))
|
||||||
})();
|
})();
|
||||||
@@ -3184,7 +3251,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
|||||||
// Build the new pipeline BEFORE dropping the old one: the host already acked
|
// Build the new pipeline BEFORE dropping the old one: the host already acked
|
||||||
// the switch as accepted, so a rebuild failure must not kill an otherwise
|
// the switch as accepted, so a rebuild failure must not kill an otherwise
|
||||||
// healthy session — keep streaming the current mode and log instead.
|
// healthy session — keep streaming the current mode and log instead.
|
||||||
match build_pipeline(&mut vd, new_mode, bitrate_kbps, bit_depth, plan) {
|
match build_pipeline(&mut vd, new_mode, bitrate_kbps, bit_depth, plan, &quit) {
|
||||||
Ok(next_pipe) => {
|
Ok(next_pipe) => {
|
||||||
(capturer, enc, frame, interval) = next_pipe;
|
(capturer, enc, frame, interval) = next_pipe;
|
||||||
cur_mode = new_mode;
|
cur_mode = new_mode;
|
||||||
@@ -3305,6 +3372,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
|||||||
bitrate_kbps,
|
bitrate_kbps,
|
||||||
bit_depth,
|
bit_depth,
|
||||||
plan,
|
plan,
|
||||||
|
&quit,
|
||||||
) {
|
) {
|
||||||
Ok(p) => break p,
|
Ok(p) => break p,
|
||||||
Err(e2) => {
|
Err(e2) => {
|
||||||
@@ -3531,6 +3599,7 @@ fn build_pipeline_with_retry(
|
|||||||
bitrate_kbps: u32,
|
bitrate_kbps: u32,
|
||||||
bit_depth: u8,
|
bit_depth: u8,
|
||||||
plan: crate::session_plan::SessionPlan,
|
plan: crate::session_plan::SessionPlan,
|
||||||
|
quit: &Arc<AtomicBool>,
|
||||||
) -> Result<Pipeline> {
|
) -> Result<Pipeline> {
|
||||||
// ~10s first-frame wait per attempt. 8 gives a ~90s budget for the SLOW case: a host-managed
|
// ~10s first-frame wait per attempt. 8 gives a ~90s budget for the SLOW case: a host-managed
|
||||||
// gamescope session cold-starting Steam Big Picture (the SteamOS/Bazzite takeover) can take
|
// gamescope session cold-starting Steam Big Picture (the SteamOS/Bazzite takeover) can take
|
||||||
@@ -3557,7 +3626,7 @@ fn build_pipeline_with_retry(
|
|||||||
const MAX_ATTEMPTS: u32 = 8;
|
const MAX_ATTEMPTS: u32 = 8;
|
||||||
let mut backoff = std::time::Duration::from_millis(500);
|
let mut backoff = std::time::Duration::from_millis(500);
|
||||||
for attempt in 1..=MAX_ATTEMPTS {
|
for attempt in 1..=MAX_ATTEMPTS {
|
||||||
match build_pipeline(vd, mode, bitrate_kbps, bit_depth, plan) {
|
match build_pipeline(vd, mode, bitrate_kbps, bit_depth, plan, quit) {
|
||||||
Ok(pipe) => {
|
Ok(pipe) => {
|
||||||
if attempt > 1 {
|
if attempt > 1 {
|
||||||
tracing::info!(attempt, "pipeline up after retry");
|
tracing::info!(attempt, "pipeline up after retry");
|
||||||
@@ -3620,12 +3689,15 @@ fn build_pipeline(
|
|||||||
bitrate_kbps: u32,
|
bitrate_kbps: u32,
|
||||||
bit_depth: u8,
|
bit_depth: u8,
|
||||||
plan: crate::session_plan::SessionPlan,
|
plan: crate::session_plan::SessionPlan,
|
||||||
|
quit: &Arc<AtomicBool>,
|
||||||
) -> Result<Pipeline> {
|
) -> Result<Pipeline> {
|
||||||
// Acquire through the registry (design/display-management.md): on Linux this pools the display
|
// Acquire through the registry (design/display-management.md): on Linux this pools the display
|
||||||
// for keep-alive (reuse a kept one, or create + keep the backend's keepalive so it outlives the
|
// for keep-alive (reuse a kept one, or create + keep the backend's keepalive so it outlives the
|
||||||
// session per policy); on Windows it delegates to `vd.create` (the manager already leases). The
|
// session per policy); on Windows it delegates to `vd.create` (the manager already leases). The
|
||||||
// returned `VirtualOutput`'s keepalive is a registry lease — the capturer holds it as before.
|
// returned `VirtualOutput`'s keepalive is a registry lease — the capturer holds it as before. The
|
||||||
let vout = crate::vdisplay::registry::acquire(vd, mode).context("create virtual output")?;
|
// `quit` flag rides into the lease so a deliberate-quit teardown skips the keep-alive linger.
|
||||||
|
let vout = crate::vdisplay::registry::acquire(vd, mode, quit.clone())
|
||||||
|
.context("create virtual output")?;
|
||||||
// The backend reports the refresh it actually achieved in `preferred_mode.2` (KWin may cap a
|
// The backend reports the refresh it actually achieved in `preferred_mode.2` (KWin may cap a
|
||||||
// virtual output at 60 Hz if the custom-mode install was rejected). Pace the encoder + frame
|
// virtual output at 60 Hz if the custom-mode install was rejected). Pace the encoder + frame
|
||||||
// clock to that, not the requested rate, so we don't emit phantom duplicate frames over a
|
// clock to that, not the requested rate, so we don't emit phantom duplicate frames over a
|
||||||
@@ -3933,6 +4005,7 @@ mod tests {
|
|||||||
pairing_pin: None,
|
pairing_pin: None,
|
||||||
paired_store: None,
|
paired_store: None,
|
||||||
data_port: None,
|
data_port: None,
|
||||||
|
idle_timeout: None,
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
std::thread::sleep(std::time::Duration::from_millis(500));
|
std::thread::sleep(std::time::Duration::from_millis(500));
|
||||||
@@ -4128,6 +4201,7 @@ mod tests {
|
|||||||
pairing_pin: None,
|
pairing_pin: None,
|
||||||
paired_store: None, // unused: the shared `np` IS the store handle
|
paired_store: None, // unused: the shared `np` IS the store handle
|
||||||
data_port: None,
|
data_port: None,
|
||||||
|
idle_timeout: None,
|
||||||
},
|
},
|
||||||
0, // no mgmt API in this test → advertise no `mgmt` mDNS port
|
0, // no mgmt API in this test → advertise no `mgmt` mDNS port
|
||||||
np_host,
|
np_host,
|
||||||
@@ -4227,6 +4301,7 @@ mod tests {
|
|||||||
pairing_pin: Some("4321".into()),
|
pairing_pin: Some("4321".into()),
|
||||||
paired_store: Some(test_paired_path()),
|
paired_store: Some(test_paired_path()),
|
||||||
data_port: None,
|
data_port: None,
|
||||||
|
idle_timeout: None,
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
std::thread::sleep(std::time::Duration::from_millis(500));
|
std::thread::sleep(std::time::Duration::from_millis(500));
|
||||||
|
|||||||
@@ -115,6 +115,31 @@ pub fn admit(req_identity: Option<[u8; 32]>) -> Admission {
|
|||||||
decide(effective_conflict(), req_identity, &table().lock().unwrap())
|
decide(effective_conflict(), req_identity, &table().lock().unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Pure core of [`preempt_same_identity`]: the stop flags of live sessions owned by the SAME client
|
||||||
|
/// as `req_identity` (its own zombies). Testable over a slice (the public fn locks the global table).
|
||||||
|
fn same_identity_stops(
|
||||||
|
req_identity: Option<[u8; 32]>,
|
||||||
|
live: &[LiveSession],
|
||||||
|
) -> Vec<Arc<AtomicBool>> {
|
||||||
|
live.iter()
|
||||||
|
.filter(|s| same_client(s.identity, req_identity))
|
||||||
|
.map(|s| Arc::clone(&s.stop))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Preempt this reconnecting client's OWN still-live session(s). A client has at most one live
|
||||||
|
/// session, so a new connection from an already-registered identity is a **reconnect** — the old
|
||||||
|
/// session is a zombie whose QUIC idle timer hasn't fired yet (an unwanted disconnect is only
|
||||||
|
/// declared dead after `max_idle_timeout`, ~seconds later). Return its stop flag(s) so the caller
|
||||||
|
/// signals them and waits the release grace: the zombie tears its display down, which (keep-alive on)
|
||||||
|
/// lingers, and THIS reconnect **reuses** that kept display instead of landing on a fresh SECOND one
|
||||||
|
/// (the "thrown onto a second display while the old one keeps streaming" bug). Anonymous (`None`)
|
||||||
|
/// never matches — same limitation as `steal`/`reject`. Call this BEFORE [`admit`] and before this
|
||||||
|
/// session registers itself, so it only ever signals a *prior* session's flag, never its own.
|
||||||
|
pub fn preempt_same_identity(req_identity: Option<[u8; 32]>) -> Vec<Arc<AtomicBool>> {
|
||||||
|
same_identity_stops(req_identity, &table().lock().unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
/// Register a now-admitted, live session; the returned guard removes it on drop (session end). Call
|
/// Register a now-admitted, live session; the returned guard removes it on drop (session end). Call
|
||||||
/// AFTER [`admit`] (so a session never conflicts with itself) and once the mode + stop flag are known.
|
/// AFTER [`admit`] (so a session never conflicts with itself) and once the mode + stop flag are known.
|
||||||
pub fn register(
|
pub fn register(
|
||||||
@@ -225,6 +250,20 @@ mod tests {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn same_identity_stops_targets_own_zombie_only() {
|
||||||
|
let live = [
|
||||||
|
sess(Some(1), (2560, 1440, 60)), // this client's prior (zombie) session
|
||||||
|
sess(Some(2), (1920, 1080, 60)), // a different client
|
||||||
|
];
|
||||||
|
// Reconnecting as client 1 → its own zombie's stop is returned (to preempt), not client 2's.
|
||||||
|
assert_eq!(same_identity_stops(fp(1), &live).len(), 1);
|
||||||
|
// A client with no prior session (fp 3) has nothing of its own to preempt.
|
||||||
|
assert_eq!(same_identity_stops(fp(3), &live).len(), 0);
|
||||||
|
// Anonymous never matches — we can't prove it's the same client.
|
||||||
|
assert_eq!(same_identity_stops(None, &live).len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn join_targets_the_oldest_other_session() {
|
fn join_targets_the_oldest_other_session() {
|
||||||
let live = [
|
let live = [
|
||||||
|
|||||||
@@ -81,16 +81,23 @@ fn topology_str() -> String {
|
|||||||
///
|
///
|
||||||
/// Windows delegates to the [`manager`](super::manager) via `vd.create` (unchanged); Linux uses the
|
/// Windows delegates to the [`manager`](super::manager) via `vd.create` (unchanged); Linux uses the
|
||||||
/// pool below; other platforms pass through.
|
/// pool below; other platforms pass through.
|
||||||
|
/// `quit` is the session's deliberate-quit flag: when the session ends with it set (the client closed
|
||||||
|
/// with the quit application code — a user "stop", not a network drop), the display is torn down
|
||||||
|
/// **immediately**, skipping the keep-alive linger. A bare disconnect leaves it `false` → normal linger.
|
||||||
pub fn acquire(
|
pub fn acquire(
|
||||||
vd: &mut Box<dyn super::VirtualDisplay>,
|
vd: &mut Box<dyn super::VirtualDisplay>,
|
||||||
mode: super::Mode,
|
mode: super::Mode,
|
||||||
|
quit: std::sync::Arc<std::sync::atomic::AtomicBool>,
|
||||||
) -> Result<super::VirtualOutput> {
|
) -> Result<super::VirtualOutput> {
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
{
|
{
|
||||||
linux::acquire(vd, mode)
|
linux::acquire(vd, mode, quit)
|
||||||
}
|
}
|
||||||
#[cfg(not(target_os = "linux"))]
|
#[cfg(not(target_os = "linux"))]
|
||||||
{
|
{
|
||||||
|
// Windows leases in the manager (its own linger); the deliberate-quit skip is not wired
|
||||||
|
// through there yet, so the flag is accepted but unused off Linux.
|
||||||
|
let _ = quit;
|
||||||
vd.create(mode)
|
vd.create(mode)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -163,8 +170,8 @@ pub fn release(slot: Option<u64>) -> usize {
|
|||||||
|
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
mod linux {
|
mod linux {
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||||
use std::sync::{Mutex, Once, OnceLock};
|
use std::sync::{Arc, Mutex, Once, OnceLock};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
@@ -304,16 +311,21 @@ mod linux {
|
|||||||
node_id: u32,
|
node_id: u32,
|
||||||
preferred_mode: Option<(u32, u32, u32)>,
|
preferred_mode: Option<(u32, u32, u32)>,
|
||||||
gen: u64,
|
gen: u64,
|
||||||
|
quit: Arc<AtomicBool>,
|
||||||
) -> VirtualOutput {
|
) -> VirtualOutput {
|
||||||
VirtualOutput {
|
VirtualOutput {
|
||||||
node_id,
|
node_id,
|
||||||
remote_fd: None,
|
remote_fd: None,
|
||||||
preferred_mode,
|
preferred_mode,
|
||||||
keepalive: Box::new(DisplayLease { gen }),
|
keepalive: Box::new(DisplayLease { gen, quit }),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn acquire(vd: &mut Box<dyn VirtualDisplay>, mode: Mode) -> Result<VirtualOutput> {
|
pub(super) fn acquire(
|
||||||
|
vd: &mut Box<dyn VirtualDisplay>,
|
||||||
|
mode: Mode,
|
||||||
|
quit: Arc<AtomicBool>,
|
||||||
|
) -> Result<VirtualOutput> {
|
||||||
ensure_timer();
|
ensure_timer();
|
||||||
let backend = vd.name();
|
let backend = vd.name();
|
||||||
let r = reg();
|
let r = reg();
|
||||||
@@ -343,7 +355,7 @@ mod linux {
|
|||||||
e.life.acquire();
|
e.life.acquire();
|
||||||
let gen = r.gen.fetch_add(1, Ordering::Relaxed);
|
let gen = r.gen.fetch_add(1, Ordering::Relaxed);
|
||||||
e.gen = gen;
|
e.gen = gen;
|
||||||
let out = output_for(e.node_id, e.preferred_mode, gen);
|
let out = output_for(e.node_id, e.preferred_mode, gen, quit);
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
backend,
|
backend,
|
||||||
node_id = e.node_id,
|
node_id = e.node_id,
|
||||||
@@ -443,15 +455,21 @@ mod linux {
|
|||||||
if (position.x, position.y) != (0, 0) {
|
if (position.x, position.y) != (0, 0) {
|
||||||
vd.apply_position(position.x, position.y);
|
vd.apply_position(position.x, position.y);
|
||||||
}
|
}
|
||||||
Ok(output_for(node_id, preferred_mode, gen))
|
Ok(output_for(node_id, preferred_mode, gen, quit))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The [`DisplayLease`] `Drop` path: release the session's hold on the pooled display. The
|
/// The [`DisplayLease`] `Drop` path: release the session's hold on the pooled display. The
|
||||||
/// lifecycle machine decides linger / pin / teardown; a torn-down entry's keepalive drops *after*
|
/// lifecycle machine decides linger / pin / teardown; a torn-down entry's keepalive drops *after*
|
||||||
/// the lock is released.
|
/// the lock is released.
|
||||||
fn release(gen: u64) {
|
fn release(gen: u64, force_immediate: bool) {
|
||||||
let Some(r) = REG.get() else { return };
|
let Some(r) = REG.get() else { return };
|
||||||
let linger = linger();
|
// A deliberate quit (the client closed with the quit code — a user "stop") tears the display
|
||||||
|
// down NOW, overriding the keep-alive linger; a bare disconnect honors the policy.
|
||||||
|
let linger = if force_immediate {
|
||||||
|
Linger::Immediate
|
||||||
|
} else {
|
||||||
|
linger()
|
||||||
|
};
|
||||||
let (torn_down, restore) = {
|
let (torn_down, restore) = {
|
||||||
let mut es = r.entries.lock().unwrap();
|
let mut es = r.entries.lock().unwrap();
|
||||||
let Some(idx) = es.iter().position(|e| e.gen == gen) else {
|
let Some(idx) = es.iter().position(|e| e.gen == gen) else {
|
||||||
@@ -489,10 +507,17 @@ mod linux {
|
|||||||
restore();
|
restore();
|
||||||
}
|
}
|
||||||
if let Some(e) = torn_down {
|
if let Some(e) = torn_down {
|
||||||
tracing::info!(
|
if force_immediate {
|
||||||
backend = e.backend,
|
tracing::info!(
|
||||||
"virtual display torn down (keep-alive off / released)"
|
backend = e.backend,
|
||||||
);
|
"virtual display torn down (deliberate quit — keep-alive skipped)"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
tracing::info!(
|
||||||
|
backend = e.backend,
|
||||||
|
"virtual display torn down (keep-alive off / released)"
|
||||||
|
);
|
||||||
|
}
|
||||||
drop(e); // outside the lock — the keepalive Drop may block
|
drop(e); // outside the lock — the keepalive Drop may block
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -683,11 +708,15 @@ mod linux {
|
|||||||
/// registry hold; a stale lease (its entry was reused + re-stamped, or torn down) is a no-op.
|
/// registry hold; a stale lease (its entry was reused + re-stamped, or torn down) is a no-op.
|
||||||
struct DisplayLease {
|
struct DisplayLease {
|
||||||
gen: u64,
|
gen: u64,
|
||||||
|
/// The session's deliberate-quit flag: set when the client closes with the quit application
|
||||||
|
/// code (a user "stop", not a network drop), so this lease's `Drop` tears the display down
|
||||||
|
/// immediately instead of lingering. `false` on a bare disconnect → normal keep-alive.
|
||||||
|
quit: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for DisplayLease {
|
impl Drop for DisplayLease {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
release(self.gen);
|
release(self.gen, self.quit.load(Ordering::SeqCst));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -274,6 +274,15 @@
|
|||||||
#define VIDEO_CAP_HOST_TIMING 8
|
#define VIDEO_CAP_HOST_TIMING 8
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#if defined(PUNKTFUNK_FEATURE_QUIC)
|
||||||
|
// QUIC application error code a punktfunk/1 client closes the control connection with on a
|
||||||
|
// **deliberate quit** (a user "stop", not a network drop). The host reads it off the connection's
|
||||||
|
// `ApplicationClosed` reason and tears the session's virtual display down immediately, skipping the
|
||||||
|
// keep-alive linger; any other close reason (idle timeout, reset, a bare code 0) still lingers so a
|
||||||
|
// reconnect can resume. Shared so host + every client agree on the code.
|
||||||
|
#define QUIT_CLOSE_CODE 81
|
||||||
|
#endif
|
||||||
|
|
||||||
#if defined(PUNKTFUNK_FEATURE_QUIC)
|
#if defined(PUNKTFUNK_FEATURE_QUIC)
|
||||||
// [`Hello::video_codecs`] bit: the client can decode H.264 / AVC. The GPU-less **software**
|
// [`Hello::video_codecs`] bit: the client can decode H.264 / AVC. The GPU-less **software**
|
||||||
// encode path (openh264) emits H.264, so a client that wants to stream from a software host MUST
|
// encode path (openh264) emits H.264, so a client that wants to stream from a software host MUST
|
||||||
|
|||||||
Reference in New Issue
Block a user