fix(host/capture): recover from compositor loss instead of freezing
apple / screenshots (push) Successful in 5m7s
apple / swift (push) Successful in 1m1s
windows-host / package (push) Successful in 7m26s
android / android (push) Successful in 4m50s
ci / web (push) Successful in 50s
ci / docs-site (push) Successful in 54s
decky / build-publish (push) Successful in 11s
ci / rust (push) Successful in 4m51s
deb / build-publish (push) Successful in 2m29s
docker / build-push (--build-arg FEDORA_VERSION=44, ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora44-rpm) (push) Successful in 5s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 5s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 3s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 4s
ci / bench (push) Successful in 4m37s
rpm / build-publish (bazzite, punktfunk-fedora-rpm) (push) Successful in 9m1s
docker / deploy-docs (push) Successful in 18s
rpm / build-publish (fedora-44, punktfunk-fedora44-rpm) (push) Successful in 8m47s
apple / screenshots (push) Successful in 5m7s
apple / swift (push) Successful in 1m1s
windows-host / package (push) Successful in 7m26s
android / android (push) Successful in 4m50s
ci / web (push) Successful in 50s
ci / docs-site (push) Successful in 54s
decky / build-publish (push) Successful in 11s
ci / rust (push) Successful in 4m51s
deb / build-publish (push) Successful in 2m29s
docker / build-push (--build-arg FEDORA_VERSION=44, ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora44-rpm) (push) Successful in 5s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 5s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 3s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 4s
ci / bench (push) Successful in 4m37s
rpm / build-publish (bazzite, punktfunk-fedora-rpm) (push) Successful in 9m1s
docker / deploy-docs (push) Successful in 18s
rpm / build-publish (fedora-44, punktfunk-fedora44-rpm) (push) Successful in 8m47s
When the compositor is torn down mid-stream (a Gaming↔Desktop switch removes
the virtual output), its PipeWire stream leaves Streaming for Paused rather
than disconnecting. try_latest treated that as Ok(None) ("static desktop —
repeat the last frame"), so the stream froze on the last frame forever and
neither recovery path fired: the capture-loss rebuild keys on Err, and the
session watcher keys on a session-KIND change (a desktop→desktop new KWin
instance is the same kind).
Track the PipeWire stream state via state_changed (a `streaming` flag) and,
in try_latest, surface a sustained non-Streaming state (1.5s grace for a
transient renegotiation blip) as a capture-loss Err — which the encode loop
already handles by rebuilding the pipeline in place. A static desktop stays
Streaming, so no false trigger. Complements the now-default session watcher.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -40,6 +40,13 @@ pub struct PortalCapturer {
|
|||||||
/// branch to tell "format never negotiated" (modifier/format mismatch) apart from "negotiated
|
/// branch to tell "format never negotiated" (modifier/format mismatch) apart from "negotiated
|
||||||
/// but no buffers arrived" (compositor idle/unmapped) — the two black-screen root causes.
|
/// but no buffers arrived" (compositor idle/unmapped) — the two black-screen root causes.
|
||||||
negotiated: Arc<AtomicBool>,
|
negotiated: Arc<AtomicBool>,
|
||||||
|
/// True only while the PipeWire stream is `Streaming`. [`try_latest`](Self::try_latest) reads it
|
||||||
|
/// to distinguish a static desktop (alive, no new buffers) from a dead source (left `Streaming`).
|
||||||
|
streaming: Arc<AtomicBool>,
|
||||||
|
/// When the stream first dropped out of `Streaming` with no new frame; used to grace a transient
|
||||||
|
/// renegotiation before declaring the source lost. Cleared whenever a frame arrives or the stream
|
||||||
|
/// is `Streaming`.
|
||||||
|
stall_since: Option<std::time::Instant>,
|
||||||
/// The PipeWire node this capturer consumes — surfaced in error messages for diagnosis.
|
/// The PipeWire node this capturer consumes — surfaced in error messages for diagnosis.
|
||||||
node_id: u32,
|
node_id: u32,
|
||||||
/// Stops the PipeWire loop on teardown (sent in `Drop`). Without it a dropped or failed
|
/// Stops the PipeWire loop on teardown (sent in `Drop`). Without it a dropped or failed
|
||||||
@@ -109,6 +116,7 @@ struct PwHandles {
|
|||||||
frames: Receiver<CapturedFrame>,
|
frames: Receiver<CapturedFrame>,
|
||||||
active: Arc<AtomicBool>,
|
active: Arc<AtomicBool>,
|
||||||
negotiated: Arc<AtomicBool>,
|
negotiated: Arc<AtomicBool>,
|
||||||
|
streaming: Arc<AtomicBool>,
|
||||||
quit: ::pipewire::channel::Sender<()>,
|
quit: ::pipewire::channel::Sender<()>,
|
||||||
join: thread::JoinHandle<()>,
|
join: thread::JoinHandle<()>,
|
||||||
}
|
}
|
||||||
@@ -121,6 +129,8 @@ impl PwHandles {
|
|||||||
frames: self.frames,
|
frames: self.frames,
|
||||||
active: self.active,
|
active: self.active,
|
||||||
negotiated: self.negotiated,
|
negotiated: self.negotiated,
|
||||||
|
streaming: self.streaming,
|
||||||
|
stall_since: None,
|
||||||
node_id,
|
node_id,
|
||||||
quit: Some(self.quit),
|
quit: Some(self.quit),
|
||||||
join: Some(self.join),
|
join: Some(self.join),
|
||||||
@@ -143,6 +153,8 @@ fn spawn_pipewire(
|
|||||||
let active_cb = active.clone();
|
let active_cb = active.clone();
|
||||||
let negotiated = Arc::new(AtomicBool::new(false));
|
let negotiated = Arc::new(AtomicBool::new(false));
|
||||||
let negotiated_cb = negotiated.clone();
|
let negotiated_cb = negotiated.clone();
|
||||||
|
let streaming = Arc::new(AtomicBool::new(false));
|
||||||
|
let streaming_cb = streaming.clone();
|
||||||
// pipewire's own cross-thread channel: the receiver attaches to the loop and quits it; the
|
// pipewire's own cross-thread channel: the receiver attaches to the loop and quits it; the
|
||||||
// sender lives on the capturer and fires in its `Drop`. Absolute `::pipewire` path — the
|
// sender lives on the capturer and fires in its `Drop`. Absolute `::pipewire` path — the
|
||||||
// inner `mod pipewire` shadows the crate name at this scope.
|
// inner `mod pipewire` shadows the crate name at this scope.
|
||||||
@@ -157,6 +169,7 @@ fn spawn_pipewire(
|
|||||||
frame_tx,
|
frame_tx,
|
||||||
active_cb,
|
active_cb,
|
||||||
negotiated_cb,
|
negotiated_cb,
|
||||||
|
streaming_cb,
|
||||||
zerocopy,
|
zerocopy,
|
||||||
preferred,
|
preferred,
|
||||||
quit_rx,
|
quit_rx,
|
||||||
@@ -169,6 +182,7 @@ fn spawn_pipewire(
|
|||||||
frames: frame_rx,
|
frames: frame_rx,
|
||||||
active,
|
active,
|
||||||
negotiated,
|
negotiated,
|
||||||
|
streaming,
|
||||||
quit: quit_tx,
|
quit: quit_tx,
|
||||||
join,
|
join,
|
||||||
})
|
})
|
||||||
@@ -219,6 +233,28 @@ impl Capturer for PortalCapturer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if latest.is_some() || self.streaming.load(Ordering::Relaxed) {
|
||||||
|
// A frame arrived, or the source is alive but idle (static desktop) — normal. Clear any
|
||||||
|
// stall and repeat the last frame on `None`, exactly as before.
|
||||||
|
self.stall_since = None;
|
||||||
|
return Ok(latest);
|
||||||
|
}
|
||||||
|
// No new frame AND the stream has left `Streaming` (Paused/Unconnected/Error). The source
|
||||||
|
// went away — a compositor torn down on a Gaming↔Desktop switch, a removed virtual output.
|
||||||
|
// Grace a brief window (a transient mid-stream renegotiation can blip out of Streaming and
|
||||||
|
// back) before declaring it lost so the encode loop rebuilds in place rather than freezing
|
||||||
|
// on the last frame forever.
|
||||||
|
const STALL_GRACE: Duration = Duration::from_millis(1500);
|
||||||
|
let since = *self.stall_since.get_or_insert_with(std::time::Instant::now);
|
||||||
|
if since.elapsed() >= STALL_GRACE {
|
||||||
|
self.stall_since = None;
|
||||||
|
return Err(anyhow!(
|
||||||
|
"PipeWire source stalled (node {}): stream left Streaming for >{}ms with no frames \
|
||||||
|
— the compositor/virtual output went away (session switch?)",
|
||||||
|
self.node_id,
|
||||||
|
STALL_GRACE.as_millis()
|
||||||
|
));
|
||||||
|
}
|
||||||
Ok(latest)
|
Ok(latest)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -467,6 +503,10 @@ mod pipewire {
|
|||||||
/// Set once a video format is agreed (`param_changed`), so a first-frame timeout can tell
|
/// Set once a video format is agreed (`param_changed`), so a first-frame timeout can tell
|
||||||
/// "format never negotiated" apart from "negotiated but no buffers arrived".
|
/// "format never negotiated" apart from "negotiated but no buffers arrived".
|
||||||
negotiated: Arc<AtomicBool>,
|
negotiated: Arc<AtomicBool>,
|
||||||
|
/// True only while the PipeWire stream is in `Streaming` (the source is alive). Goes false on
|
||||||
|
/// `Paused`/`Unconnected`/`Error` — the source vanished (compositor torn down on a session
|
||||||
|
/// switch). Read by [`PortalCapturer::try_latest`] to surface a sustained drop as a loss.
|
||||||
|
streaming: Arc<AtomicBool>,
|
||||||
/// Present when zero-copy is enabled on NVIDIA: imports a dmabuf → CUDA device buffer.
|
/// Present when zero-copy is enabled on NVIDIA: imports a dmabuf → CUDA device buffer.
|
||||||
importer: Option<crate::zerocopy::EglImporter>,
|
importer: Option<crate::zerocopy::EglImporter>,
|
||||||
/// VAAPI zero-copy: hand the raw dmabuf to the encoder (which imports + GPU-CSCs it) instead
|
/// VAAPI zero-copy: hand the raw dmabuf to the encoder (which imports + GPU-CSCs it) instead
|
||||||
@@ -1056,6 +1096,7 @@ mod pipewire {
|
|||||||
tx: SyncSender<CapturedFrame>,
|
tx: SyncSender<CapturedFrame>,
|
||||||
active: Arc<AtomicBool>,
|
active: Arc<AtomicBool>,
|
||||||
negotiated: Arc<AtomicBool>,
|
negotiated: Arc<AtomicBool>,
|
||||||
|
streaming: Arc<AtomicBool>,
|
||||||
zerocopy: bool,
|
zerocopy: bool,
|
||||||
preferred: Option<(u32, u32, u32)>,
|
preferred: Option<(u32, u32, u32)>,
|
||||||
quit_rx: pw::channel::Receiver<()>,
|
quit_rx: pw::channel::Receiver<()>,
|
||||||
@@ -1150,6 +1191,7 @@ mod pipewire {
|
|||||||
tx,
|
tx,
|
||||||
active,
|
active,
|
||||||
negotiated,
|
negotiated,
|
||||||
|
streaming,
|
||||||
importer,
|
importer,
|
||||||
vaapi_passthrough,
|
vaapi_passthrough,
|
||||||
nv12: crate::zerocopy::nv12_enabled(),
|
nv12: crate::zerocopy::nv12_enabled(),
|
||||||
@@ -1174,8 +1216,17 @@ mod pipewire {
|
|||||||
|
|
||||||
let _listener = stream
|
let _listener = stream
|
||||||
.add_local_listener_with_user_data(data)
|
.add_local_listener_with_user_data(data)
|
||||||
.state_changed(|_stream, _ud, old, new| {
|
.state_changed(|_stream, ud, old, new| {
|
||||||
tracing::info!(?old, ?new, "pipewire stream state");
|
tracing::info!(?old, ?new, "pipewire stream state");
|
||||||
|
// Track whether the node is actively producing. A live source sits in `Streaming`
|
||||||
|
// (a static desktop just sends no buffers); anything else — `Paused`/`Unconnected`/
|
||||||
|
// `Error` — means the source went away (compositor died, virtual output removed on a
|
||||||
|
// Gaming↔Desktop switch). `try_latest` turns a sustained non-Streaming state into a
|
||||||
|
// capture-loss so the encode loop rebuilds instead of freezing on the last frame.
|
||||||
|
ud.streaming.store(
|
||||||
|
matches!(new, pw::stream::StreamState::Streaming),
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
})
|
})
|
||||||
.param_changed(|_stream, ud, id, param| {
|
.param_changed(|_stream, ud, id, param| {
|
||||||
let Some(param) = param else { return };
|
let Some(param) = param else { return };
|
||||||
|
|||||||
Reference in New Issue
Block a user