feat(host): KDE-reliability phase 2 — pipeline retry, graceful capture teardown, refresh reconcile
Hardens the virtual-display → capture → encode bring-up against the transient failures that surfaced as black screens / wrong refresh on cold KDE sessions.

- m3: build_pipeline_with_retry wraps the initial vd.create() + first-frame with bounded exponential backoff (4 attempts, 500ms→2s). is_permanent_build_error classifies config/version/missing-tool failures so they fail fast instead of burning the retry budget. Encoder + frame clock now pace to the *achieved* refresh reported in VirtualOutput::preferred_mode, not the requested rate.
- capture/linux: PortalCapturer::Drop sends a pipewire channel quit and joins the thread, so a dropped/failed/retried capturer releases its PipeWire thread + EGL/CUDA context promptly instead of leaking it to process exit. First-frame timeout now reports the node id and distinguishes "format never negotiated" from "negotiated but no buffers arrived" via a negotiated flag set in param_changed.
- vdisplay/kwin: set_custom_refresh reads back the active mode from kscreen-doctor and returns the refresh KWin actually gave us (a rejected custom mode silently leaves the output at 60Hz); create() carries it into preferred_mode.
- vdisplay/gamescope: find_gamescope_node requires the Video/Source object (the node.name=gamescope tag is on two objects; the other wedges the link); a version check warns on <3.16.22 (the PipeWire-1.6 capture-deadlock signature).

Live-validated against headless KWin: 720p120 build with requested=120 achieved=120, zero-copy CUDA frames, and no per-session thread accumulation across back-to-back sessions.

Tests: +3 unit (retry classifier, gamescope version parse); 49 host tests green, clippy/fmt clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -11,9 +11,11 @@
|
||||
//! frames leave the pipewire thread over a bounded channel. The authoritative frame size
|
||||
//! comes from the negotiated PipeWire format, not the portal's size hint.
|
||||
//!
|
||||
//! Cleanup note (M0): the two threads are detached and torn down at process exit. A
|
||||
//! graceful stop (pipewire `channel` quit + Session close) belongs with the M2 session
|
||||
//! lifecycle.
|
||||
//! Cleanup: the pipewire thread is stopped deterministically — [`PortalCapturer`]'s `Drop`
|
||||
//! sends a pipewire `channel` quit and joins the thread, so dropping a capturer (session end,
|
||||
//! or a retried/failed pipeline build) releases its EGL importer / CUDA context promptly
|
||||
//! instead of leaking it to process exit. The portal thread (when used) still parks on its zbus
|
||||
//! connection until process exit.
|
||||
|
||||
use super::{CapturedFrame, Capturer, FramePayload, PixelFormat};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
@@ -31,6 +33,19 @@ use std::time::Duration;
|
||||
pub struct PortalCapturer {
|
||||
frames: Receiver<CapturedFrame>,
|
||||
active: Arc<AtomicBool>,
|
||||
/// Set true once the PipeWire stream agrees a video format. Read in [`next_frame`]'s timeout
|
||||
/// branch to tell "format never negotiated" (modifier/format mismatch) apart from "negotiated
|
||||
/// but no buffers arrived" (compositor idle/unmapped) — the two black-screen root causes.
|
||||
negotiated: Arc<AtomicBool>,
|
||||
/// The PipeWire node this capturer consumes — surfaced in error messages for diagnosis.
|
||||
node_id: u32,
|
||||
/// Stops the PipeWire loop on teardown (sent in `Drop`). Without it a dropped or failed
|
||||
/// capturer leaks its PipeWire thread — and its EGL importer / CUDA context — because
|
||||
/// `mainloop.run()` otherwise blocks until process exit. `Option` so `Drop` can take it.
|
||||
quit: Option<::pipewire::channel::Sender<()>>,
|
||||
/// Joined in `Drop` (after `quit`) so teardown is synchronous: the importer/CUDA context is
|
||||
/// released before the next pipeline builds, not left racing it.
|
||||
join: Option<thread::JoinHandle<()>>,
|
||||
/// Owns the virtual output (if this capturer was built from one) — dropped when the capturer
|
||||
/// is, releasing the compositor-side output via the keepalive's own `Drop`. `None` for the
|
||||
/// portal source (its session ends with the portal thread's zbus connection).
|
||||
@@ -64,12 +79,7 @@ impl PortalCapturer {
|
||||
node_id,
|
||||
"ScreenCast portal session started; connecting PipeWire"
|
||||
);
|
||||
let (frames, active) = spawn_pipewire(Some(fd), node_id, None)?;
|
||||
Ok(PortalCapturer {
|
||||
frames,
|
||||
active,
|
||||
_keepalive: None,
|
||||
})
|
||||
Ok(spawn_pipewire(Some(fd), node_id, None)?.into_capturer(node_id, None))
|
||||
}
|
||||
|
||||
/// Build a capturer from an already-created virtual output ([`crate::vdisplay::VirtualOutput`]):
|
||||
@@ -81,40 +91,84 @@ impl PortalCapturer {
|
||||
node_id = vout.node_id,
|
||||
"connecting PipeWire to virtual output"
|
||||
);
|
||||
let (frames, active) = spawn_pipewire(vout.remote_fd, vout.node_id, vout.preferred_mode)?;
|
||||
Ok(PortalCapturer {
|
||||
frames,
|
||||
active,
|
||||
_keepalive: Some(vout.keepalive),
|
||||
})
|
||||
let node_id = vout.node_id;
|
||||
Ok(
|
||||
spawn_pipewire(vout.remote_fd, node_id, vout.preferred_mode)?
|
||||
.into_capturer(node_id, Some(vout.keepalive)),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Live PipeWire-thread handles returned by [`spawn_pipewire`]: the frame channel, the
|
||||
/// activation flag the per-frame copy gates on, a "format negotiated" flag (timeout diagnostics),
|
||||
/// a quit sender that stops the loop, and the thread's join handle (synchronous teardown).
|
||||
struct PwHandles {
|
||||
frames: Receiver<CapturedFrame>,
|
||||
active: Arc<AtomicBool>,
|
||||
negotiated: Arc<AtomicBool>,
|
||||
quit: ::pipewire::channel::Sender<()>,
|
||||
join: thread::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl PwHandles {
|
||||
/// Assemble a [`PortalCapturer`] around these handles. `node_id` is carried for diagnostics;
|
||||
/// `keepalive` owns the virtual output (drops after the PipeWire thread is joined).
|
||||
fn into_capturer(self, node_id: u32, keepalive: Option<Box<dyn Send>>) -> PortalCapturer {
|
||||
PortalCapturer {
|
||||
frames: self.frames,
|
||||
active: self.active,
|
||||
negotiated: self.negotiated,
|
||||
node_id,
|
||||
quit: Some(self.quit),
|
||||
join: Some(self.join),
|
||||
_keepalive: keepalive,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn the PipeWire consumer thread for `node_id` (fd `Some` = portal remote, `None` =
|
||||
/// default daemon) and return the frame channel + the activation flag it gates on.
|
||||
/// `preferred` seeds the format negotiation's default size/framerate — for Mutter virtual
|
||||
/// monitors this is what actually sizes the monitor.
|
||||
/// default daemon) and return its [`PwHandles`]. `preferred` seeds the format negotiation's
|
||||
/// default size/framerate — for Mutter virtual monitors this is what actually sizes the monitor.
|
||||
fn spawn_pipewire(
|
||||
fd: Option<OwnedFd>,
|
||||
node_id: u32,
|
||||
preferred: Option<(u32, u32, u32)>,
|
||||
) -> Result<(Receiver<CapturedFrame>, Arc<AtomicBool>)> {
|
||||
) -> Result<PwHandles> {
|
||||
// Frames flow from the pipewire thread over a small bounded channel.
|
||||
let (frame_tx, frame_rx) = sync_channel::<CapturedFrame>(8);
|
||||
let active = Arc::new(AtomicBool::new(false));
|
||||
let active_cb = active.clone();
|
||||
let negotiated = Arc::new(AtomicBool::new(false));
|
||||
let negotiated_cb = negotiated.clone();
|
||||
// pipewire's own cross-thread channel: the receiver attaches to the loop and quits it; the
|
||||
// sender lives on the capturer and fires in its `Drop`. Absolute `::pipewire` path — the
|
||||
// inner `mod pipewire` shadows the crate name at this scope.
|
||||
let (quit_tx, quit_rx) = ::pipewire::channel::channel::<()>();
|
||||
let zerocopy = crate::zerocopy::enabled();
|
||||
thread::Builder::new()
|
||||
let join = thread::Builder::new()
|
||||
.name("punktfunk-pipewire".into())
|
||||
.spawn(move || {
|
||||
if let Err(e) =
|
||||
pipewire::pipewire_thread(fd, node_id, frame_tx, active_cb, zerocopy, preferred)
|
||||
{
|
||||
if let Err(e) = pipewire::pipewire_thread(
|
||||
fd,
|
||||
node_id,
|
||||
frame_tx,
|
||||
active_cb,
|
||||
negotiated_cb,
|
||||
zerocopy,
|
||||
preferred,
|
||||
quit_rx,
|
||||
) {
|
||||
tracing::error!(error = %format!("{e:#}"), "pipewire capture thread failed");
|
||||
}
|
||||
})
|
||||
.context("spawn pipewire thread")?;
|
||||
Ok((frame_rx, active))
|
||||
Ok(PwHandles {
|
||||
frames: frame_rx,
|
||||
active,
|
||||
negotiated,
|
||||
quit: quit_tx,
|
||||
join,
|
||||
})
|
||||
}
|
||||
|
||||
impl Capturer for PortalCapturer {
|
||||
@@ -122,8 +176,30 @@ impl Capturer for PortalCapturer {
|
||||
// First frame can lag behind format negotiation; later frames arrive at ~fps.
|
||||
match self.frames.recv_timeout(Duration::from_secs(10)) {
|
||||
Ok(frame) => Ok(frame),
|
||||
Err(RecvTimeoutError::Timeout) => Err(anyhow!("no PipeWire frame within 10s")),
|
||||
Err(RecvTimeoutError::Disconnected) => Err(anyhow!("PipeWire capture thread ended")),
|
||||
Err(RecvTimeoutError::Timeout) => {
|
||||
// Split the two black-screen root causes apart so the operator gets a cause, not
|
||||
// just a symptom: did the format negotiate (compositor produced no buffers) or
|
||||
// not (no acceptable format / node never emitted a param)?
|
||||
if self.negotiated.load(Ordering::Relaxed) {
|
||||
Err(anyhow!(
|
||||
"no PipeWire frame within 10s (node {}): format negotiated but no buffers \
|
||||
arrived — the compositor produced no frames (virtual output idle/unmapped, \
|
||||
or capture never started)",
|
||||
self.node_id
|
||||
))
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"no PipeWire frame within 10s (node {}): format negotiation never \
|
||||
completed — the compositor offered no format this consumer accepts \
|
||||
(pixel-format/modifier mismatch) or the node never emitted a Format param",
|
||||
self.node_id
|
||||
))
|
||||
}
|
||||
}
|
||||
Err(RecvTimeoutError::Disconnected) => Err(anyhow!(
|
||||
"PipeWire capture thread ended before a frame (node {})",
|
||||
self.node_id
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,6 +224,22 @@ impl Capturer for PortalCapturer {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PortalCapturer {
|
||||
fn drop(&mut self) {
|
||||
// Stop the PipeWire loop and wait for the thread to unwind BEFORE the keepalive (virtual
|
||||
// output) drops: quit → join releases the EGL importer / CUDA context, then field-drop
|
||||
// order releases the output. Without this, `mainloop.run()` blocks until process exit, so
|
||||
// every dropped/failed capturer (e.g. a retried first-frame attempt) leaks a thread + GPU
|
||||
// context. `send` errors only if the thread already exited — then `join` returns at once.
|
||||
if let Some(quit) = self.quit.take() {
|
||||
let _ = quit.send(());
|
||||
}
|
||||
if let Some(join) = self.join.take() {
|
||||
let _ = join.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The portal handshake: connect ScreenCast, select a single monitor, start, open the
|
||||
/// PipeWire remote, hand the fd + node id back, then keep the session alive.
|
||||
fn portal_thread(setup_tx: std::sync::mpsc::Sender<Result<(OwnedFd, u32), String>>) {
|
||||
@@ -369,6 +461,9 @@ mod pipewire {
|
||||
tx: SyncSender<CapturedFrame>,
|
||||
/// When false (no active stream), skip the de-pad copy — the buffer is just released.
|
||||
active: Arc<AtomicBool>,
|
||||
/// Set once a video format is agreed (`param_changed`), so a first-frame timeout can tell
|
||||
/// "format never negotiated" apart from "negotiated but no buffers arrived".
|
||||
negotiated: Arc<AtomicBool>,
|
||||
/// Present when zero-copy is enabled: imports a dmabuf → CUDA device buffer.
|
||||
importer: Option<crate::zerocopy::EglImporter>,
|
||||
}
|
||||
@@ -583,17 +678,28 @@ mod pipewire {
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn pipewire_thread(
|
||||
fd: Option<OwnedFd>,
|
||||
node_id: u32,
|
||||
tx: SyncSender<CapturedFrame>,
|
||||
active: Arc<AtomicBool>,
|
||||
negotiated: Arc<AtomicBool>,
|
||||
zerocopy: bool,
|
||||
preferred: Option<(u32, u32, u32)>,
|
||||
quit_rx: pw::channel::Receiver<()>,
|
||||
) -> Result<()> {
|
||||
crate::pwinit::ensure_init();
|
||||
|
||||
let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw MainLoop")?;
|
||||
// A quit signal (capturer `Drop`) lands here on the loop thread and stops `run()` so the
|
||||
// thread unwinds instead of blocking to process exit. Hold the attachment for the loop's
|
||||
// life; the cloned loop handle is the one the callback quits.
|
||||
let quit_loop = mainloop.clone();
|
||||
let _quit_attach = quit_rx.attach(mainloop.loop_(), move |()| {
|
||||
tracing::debug!("pipewire: quit signal received — stopping capture loop");
|
||||
quit_loop.quit();
|
||||
});
|
||||
let context = pw::context::ContextRc::new(&mainloop, None).context("pw Context")?;
|
||||
// A portal source hands us an fd to a (sandboxed) PipeWire remote; the KWin
|
||||
// virtual-output source has no fd — its node lives on the user's default daemon.
|
||||
@@ -647,6 +753,7 @@ mod pipewire {
|
||||
modifier: 0,
|
||||
tx,
|
||||
active,
|
||||
negotiated,
|
||||
importer,
|
||||
};
|
||||
|
||||
@@ -687,6 +794,7 @@ mod pipewire {
|
||||
return;
|
||||
}
|
||||
if ud.info.parse(param).is_ok() {
|
||||
ud.negotiated.store(true, Ordering::Relaxed);
|
||||
let sz = ud.info.size();
|
||||
ud.format = map_format(ud.info.format());
|
||||
ud.modifier = ud.info.modifier();
|
||||
|
||||
Reference in New Issue
Block a user