From 94c556f0e300e1dd15e2c3866f2323119a14288c Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Sun, 28 Jun 2026 09:00:35 +0000 Subject: [PATCH] fix(host/capture): recover from compositor loss instead of freezing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the compositor is torn down mid-stream (a Gaming↔Desktop switch removes the virtual output), its PipeWire stream leaves Streaming for Paused rather than disconnecting. try_latest treated that as Ok(None) ("static desktop — repeat the last frame"), so the stream froze on the last frame forever and neither recovery path fired: the capture-loss rebuild keys on Err, and the session watcher keys on a session-KIND change (a desktop→desktop new KWin instance is the same kind). Track the PipeWire stream state via state_changed (a `streaming` flag) and, in try_latest, surface a sustained non-Streaming state (1.5s grace for a transient renegotiation blip) as a capture-loss Err — which the encode loop already handles by rebuilding the pipeline in place. A static desktop stays Streaming, so no false trigger. Complements the now-default session watcher. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../punktfunk-host/src/capture/linux/mod.rs | 53 ++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/crates/punktfunk-host/src/capture/linux/mod.rs b/crates/punktfunk-host/src/capture/linux/mod.rs index b420119..03c3ee8 100644 --- a/crates/punktfunk-host/src/capture/linux/mod.rs +++ b/crates/punktfunk-host/src/capture/linux/mod.rs @@ -40,6 +40,13 @@ pub struct PortalCapturer { /// branch to tell "format never negotiated" (modifier/format mismatch) apart from "negotiated /// but no buffers arrived" (compositor idle/unmapped) — the two black-screen root causes. negotiated: Arc, + /// True only while the PipeWire stream is `Streaming`. [`try_latest`](Self::try_latest) reads it + /// to distinguish a static desktop (alive, no new buffers) from a dead source (left `Streaming`). + streaming: Arc, + /// When the stream first dropped out of `Streaming` with no new frame; used to grace a transient + /// renegotiation before declaring the source lost. Cleared whenever a frame arrives or the stream + /// is `Streaming`. + stall_since: Option, /// The PipeWire node this capturer consumes — surfaced in error messages for diagnosis. node_id: u32, /// Stops the PipeWire loop on teardown (sent in `Drop`). Without it a dropped or failed @@ -109,6 +116,7 @@ struct PwHandles { frames: Receiver, active: Arc, negotiated: Arc, + streaming: Arc, quit: ::pipewire::channel::Sender<()>, join: thread::JoinHandle<()>, } @@ -121,6 +129,8 @@ impl PwHandles { frames: self.frames, active: self.active, negotiated: self.negotiated, + streaming: self.streaming, + stall_since: None, node_id, quit: Some(self.quit), join: Some(self.join), @@ -143,6 +153,8 @@ fn spawn_pipewire( let active_cb = active.clone(); let negotiated = Arc::new(AtomicBool::new(false)); let negotiated_cb = negotiated.clone(); + let streaming = Arc::new(AtomicBool::new(false)); + let streaming_cb = streaming.clone(); // pipewire's own cross-thread channel: the receiver attaches to the loop and quits it; the // sender lives on the capturer and fires in its `Drop`. Absolute `::pipewire` path — the // inner `mod pipewire` shadows the crate name at this scope. @@ -157,6 +169,7 @@ fn spawn_pipewire( frame_tx, active_cb, negotiated_cb, + streaming_cb, zerocopy, preferred, quit_rx, @@ -169,6 +182,7 @@ fn spawn_pipewire( frames: frame_rx, active, negotiated, + streaming, quit: quit_tx, join, }) @@ -219,6 +233,28 @@ impl Capturer for PortalCapturer { } } } + if latest.is_some() || self.streaming.load(Ordering::Relaxed) { + // A frame arrived, or the source is alive but idle (static desktop) — normal. Clear any + // stall and repeat the last frame on `None`, exactly as before. + self.stall_since = None; + return Ok(latest); + } + // No new frame AND the stream has left `Streaming` (Paused/Unconnected/Error). The source + // went away — a compositor torn down on a Gaming↔Desktop switch, a removed virtual output. + // Grace a brief window (a transient mid-stream renegotiation can blip out of Streaming and + // back) before declaring it lost so the encode loop rebuilds in place rather than freezing + // on the last frame forever. + const STALL_GRACE: Duration = Duration::from_millis(1500); + let since = *self.stall_since.get_or_insert_with(std::time::Instant::now); + if since.elapsed() >= STALL_GRACE { + self.stall_since = None; + return Err(anyhow!( + "PipeWire source stalled (node {}): stream left Streaming for >{}ms with no frames \ + — the compositor/virtual output went away (session switch?)", + self.node_id, + STALL_GRACE.as_millis() + )); + } Ok(latest) } @@ -467,6 +503,10 @@ mod pipewire { /// Set once a video format is agreed (`param_changed`), so a first-frame timeout can tell /// "format never negotiated" apart from "negotiated but no buffers arrived". negotiated: Arc, + /// True only while the PipeWire stream is in `Streaming` (the source is alive). Goes false on + /// `Paused`/`Unconnected`/`Error` — the source vanished (compositor torn down on a session + /// switch). Read by [`PortalCapturer::try_latest`] to surface a sustained drop as a loss. + streaming: Arc, /// Present when zero-copy is enabled on NVIDIA: imports a dmabuf → CUDA device buffer. importer: Option, /// VAAPI zero-copy: hand the raw dmabuf to the encoder (which imports + GPU-CSCs it) instead @@ -1056,6 +1096,7 @@ mod pipewire { tx: SyncSender, active: Arc, negotiated: Arc, + streaming: Arc, zerocopy: bool, preferred: Option<(u32, u32, u32)>, quit_rx: pw::channel::Receiver<()>, @@ -1150,6 +1191,7 @@ mod pipewire { tx, active, negotiated, + streaming, importer, vaapi_passthrough, nv12: crate::zerocopy::nv12_enabled(), @@ -1174,8 +1216,17 @@ mod pipewire { let _listener = stream .add_local_listener_with_user_data(data) - .state_changed(|_stream, _ud, old, new| { + .state_changed(|_stream, ud, old, new| { tracing::info!(?old, ?new, "pipewire stream state"); + // Track whether the node is actively producing. A live source sits in `Streaming` + // (a static desktop just sends no buffers); anything else — `Paused`/`Unconnected`/ + // `Error` — means the source went away (compositor died, virtual output removed on a + // Gaming↔Desktop switch). `try_latest` turns a sustained non-Streaming state into a + // capture-loss so the encode loop rebuilds instead of freezing on the last frame. + ud.streaming.store( + matches!(new, pw::stream::StreamState::Streaming), + Ordering::Relaxed, + ); }) .param_changed(|_stream, ud, id, param| { let Some(param) = param else { return };