fix(capture/mutter): stale-frame flashes + stuck input after disconnect on GNOME
ci / web (push) Failing after 49s
apple / swift (push) Failing after 1m4s
ci / rust (push) Failing after 1m9s
ci / docs-site (push) Failing after 42s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 6s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 6s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 6s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 5s
deb / build-publish (push) Successful in 2m58s
docker / deploy-docs (push) Successful in 17s
rpm / build-publish (push) Successful in 4m17s

Deep dive into the two GNOME-only host bugs (KWin/gamescope clean):

1. Stale-frame flashes (windows at old positions, typed text reverting):
   Mutter renders its virtual monitors DIRECTLY into the PipeWire buffer
   pool, and NVIDIA has no implicit dmabuf fencing — our zero-copy
   import raced the render and encoded each pool buffer's PREVIOUS
   contents. Fix, in order of preference:
   - Consumer-side PipeWire explicit sync (SPA_META_SyncTimeline): new
     drm_sync module (DRM timeline-syncobj wait/signal via raw ioctls,
     unit-tested incl. a live signal->wait round trip); announced
     post-format via update_params (the OBS pattern — at connect time
     the meta makes producers fail allocation, observed on KWin), with
     a blocks=3 Buffers filter so the producer's sync pod wins; acquire
     point awaited before any read (GPU import or CPU mmap), release
     point signaled on every path.
   - Where the producer can't do explicit sync (Mutter on NVIDIA today:
     no cogl sync_fd, "error alloc buffers"), a sticky fallback flips
     the capture to the synchronous CPU/shm path — Mutter's glReadPixels
     download orders against its render, so frames are correct by
     construction. First session pays one ~10 s probe+retry; later
     sessions go straight there. Validated live on home-worker-3
     (GNOME 50 + RTX 4090): clean fallback, 30 MB HEVC streamed.
   - Sync is only announced on Mutter sessions (new VirtualOutput.mutter
     tag): KWin+NVIDIA fails allocation when merely asked, and doesn't
     need it (verified unchanged: zero-copy CUDA import + 1.1 MB/10 s).
   PUNKTFUNK_EXPLICIT_SYNC=0 disables the probe outright.

2. Clicks wedged in the focused app after disconnect+reconnect: a client
   vanishing mid-press left keys/buttons latched in the compositor —
   Mutter keeps the destroyed EIS device's implicit grab and the focused
   app stops taking clicks until restarted. EiState now tracks held
   keys/buttons/touches (wire codes) and synthesizes releases through
   the normal inject path before the EIS connection goes away.

GNOME hosts on NVIDIA temporarily lose zero-copy (correctness over
throughput); the moment Mutter+driver gain working explicit sync, the
sync path engages automatically and zero-copy returns.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-06-13 00:34:42 +00:00
parent 2ebffe3457
commit 8531135bb7
10 changed files with 743 additions and 163 deletions
+434 -163
View File
@@ -79,7 +79,7 @@ impl PortalCapturer {
node_id,
"ScreenCast portal session started; connecting PipeWire"
);
Ok(spawn_pipewire(Some(fd), node_id, None)?.into_capturer(node_id, None))
Ok(spawn_pipewire(Some(fd), node_id, None, false)?.into_capturer(node_id, None))
}
/// Build a capturer from an already-created virtual output ([`crate::vdisplay::VirtualOutput`]):
@@ -93,7 +93,7 @@ impl PortalCapturer {
);
let node_id = vout.node_id;
Ok(
spawn_pipewire(vout.remote_fd, node_id, vout.preferred_mode)?
spawn_pipewire(vout.remote_fd, node_id, vout.preferred_mode, vout.mutter)?
.into_capturer(node_id, Some(vout.keepalive)),
)
}
@@ -133,6 +133,7 @@ fn spawn_pipewire(
fd: Option<OwnedFd>,
node_id: u32,
preferred: Option<(u32, u32, u32)>,
mutter: bool,
) -> Result<PwHandles> {
// Frames flow from the pipewire thread over a small bounded channel.
let (frame_tx, frame_rx) = sync_channel::<CapturedFrame>(8);
@@ -157,6 +158,7 @@ fn spawn_pipewire(
zerocopy,
preferred,
quit_rx,
mutter,
) {
tracing::error!(error = %format!("{e:#}"), "pipewire capture thread failed");
}
@@ -466,6 +468,66 @@ mod pipewire {
negotiated: Arc<AtomicBool>,
/// Present when zero-copy is enabled: imports a dmabuf → CUDA device buffer.
importer: Option<crate::zerocopy::EglImporter>,
/// Explicit-sync (SyncTimeline) syncobj ops; lazily opened on the first sync-carrying
/// buffer. `sync_tried` keeps a failed open from retrying per frame.
sync: Option<crate::drm_sync::DrmSync>,
sync_tried: bool,
/// Announce SyncTimeline in `param_changed` (post-format, the OBS pattern — at
/// connect time the meta acts as a hard filter and producers without support
/// fail buffer allocation outright; observed on KWin).
want_sync: bool,
/// Which Buffers pod shape to re-emit alongside the meta announcement.
want_dmabuf: bool,
/// Mutter virtual monitor: dmabufs MUST carry explicit sync (see MUTTER_SYNC_UNUSABLE).
mutter: bool,
}
/// Mutter renders directly into the pool, so its dmabufs are only safe with explicit
/// sync. Set when sync negotiation failed (unallocated buffers) or Mutter delivered
/// dmabufs without the SyncTimeline meta: the pipeline's first-frame-timeout retry
/// then rebuilds this capture on the synchronous CPU/shm path (Mutter downloads the
/// frame, which orders against its render) — slower, never stale.
static MUTTER_SYNC_UNUSABLE: AtomicBool = AtomicBool::new(false);
/// The explicit-sync points of one buffer: producers that render directly into the
/// pool (Mutter virtual monitors) attach a `SPA_META_SyncTimeline` plus two
/// `SPA_DATA_SyncObj` datas (acquire first, release second — PipeWire convention).
/// Reading before the acquire point fires shows the buffer's PREVIOUS contents on
/// drivers without implicit dmabuf fencing (NVIDIA) — the "stale frame flashes" bug.
struct SyncPoints {
acquire_fd: i32,
release_fd: i32,
acquire_point: u64,
release_point: u64,
}
/// Extract the sync points, if the producer attached them.
unsafe fn sync_points(b: *const spa::sys::spa_buffer) -> Option<SyncPoints> {
unsafe {
if b.is_null() {
return None;
}
let meta = spa::sys::spa_buffer_find_meta_data(
b,
spa::sys::SPA_META_SyncTimeline,
std::mem::size_of::<spa::sys::spa_meta_sync_timeline>(),
) as *const spa::sys::spa_meta_sync_timeline;
if meta.is_null() {
return None;
}
let datas = std::slice::from_raw_parts((*b).datas, (*b).n_datas as usize);
let mut objs = datas
.iter()
.filter(|d| d.type_ == spa::sys::SPA_DATA_SyncObj && d.fd >= 0);
let acquire = objs.next()?;
let release = objs.next()?;
Some(SyncPoints {
acquire_fd: acquire.fd as i32,
release_fd: release.fd as i32,
acquire_point: (*meta).acquire_point,
release_point: (*meta).release_point,
})
}
}
/// Log a frame-drop reason once per process (the process callback runs per frame; a stuck
@@ -511,6 +573,33 @@ mod pipewire {
}
}
/// Announce SyncTimeline (explicit sync) support: producers that render directly
/// into the buffer pool (Mutter virtual monitors) then attach per-buffer acquire /
/// release timeline points instead of relying on implicit dmabuf fencing — which
/// NVIDIA doesn't do. Producers without explicit-sync support simply ignore this.
fn build_sync_timeline_meta() -> Result<Vec<u8>> {
serialize_pod(pw::spa::pod::Object {
type_: pw::spa::utils::SpaTypes::ObjectParamMeta.as_raw(),
id: pw::spa::param::ParamType::Meta.as_raw(),
properties: vec![
pw::spa::pod::Property {
key: pw::spa::sys::SPA_PARAM_META_type,
flags: pw::spa::pod::PropertyFlags::empty(),
value: pw::spa::pod::Value::Id(pw::spa::utils::Id(
pw::spa::sys::SPA_META_SyncTimeline,
)),
},
pw::spa::pod::Property {
key: pw::spa::sys::SPA_PARAM_META_size,
flags: pw::spa::pod::PropertyFlags::empty(),
value: pw::spa::pod::Value::Int(std::mem::size_of::<
pw::spa::sys::spa_meta_sync_timeline,
>() as i32),
},
],
})
}
fn serialize_pod(obj: pw::spa::pod::Object) -> Result<Vec<u8>> {
Ok(pw::spa::pod::serialize::PodSerializer::serialize(
std::io::Cursor::new(Vec::new()),
@@ -650,31 +739,60 @@ mod pipewire {
/// without this bit the buffer-type intersection is empty and the link silently stalls in
/// "negotiating". A LINEAR dmabuf is mmap-able by MAP_BUFFERS, so the CPU de-pad copy works.
fn build_mappable_buffers() -> Result<Vec<u8>> {
let mask = (1i32 << pw::spa::sys::SPA_DATA_MemPtr)
| (1i32 << pw::spa::sys::SPA_DATA_MemFd)
| (1i32 << pw::spa::sys::SPA_DATA_DmaBuf);
serialize_pod(pw::spa::pod::Object {
type_: pw::spa::utils::SpaTypes::ObjectParamBuffers.as_raw(),
id: pw::spa::param::ParamType::Buffers.as_raw(),
properties: vec![pw::spa::pod::Property {
key: pw::spa::sys::SPA_PARAM_BUFFERS_dataType,
flags: pw::spa::pod::PropertyFlags::empty(),
value: pw::spa::pod::Value::Int(
(1i32 << pw::spa::sys::SPA_DATA_MemPtr)
| (1i32 << pw::spa::sys::SPA_DATA_MemFd)
| (1i32 << pw::spa::sys::SPA_DATA_DmaBuf),
),
value: pw::spa::pod::Value::Int(mask),
}],
})
}
/// Build a Buffers param requesting dmabuf-only buffers.
fn build_dmabuf_buffers() -> Result<Vec<u8>> {
/// `sync` (Mutter explicit sync): the dataType must be a CHOICE_FLAGS holding ONLY
/// the DmaBuf bit — Mutter enables explicit sync only when the negotiated
/// buffer_types are exactly DmaBuf, and only its sync Buffers pod reserves the
/// blocks for the two SyncObj datas (the syncobjs are NOT a dataType bit). The
/// plain-Int form keeps the non-sync path byte-identical to what KWin/gamescope
/// already negotiate.
fn build_dmabuf_buffers(sync: bool) -> Result<Vec<u8>> {
let mask = 1i32 << pw::spa::sys::SPA_DATA_DmaBuf;
let value = if sync {
pw::spa::pod::Value::Choice(pw::spa::pod::ChoiceValue::Int(pw::spa::utils::Choice(
pw::spa::utils::ChoiceFlags::empty(),
pw::spa::utils::ChoiceEnum::Flags {
default: mask,
flags: vec![mask],
},
)))
} else {
pw::spa::pod::Value::Int(mask)
};
let mut properties = vec![pw::spa::pod::Property {
key: pw::spa::sys::SPA_PARAM_BUFFERS_dataType,
flags: pw::spa::pod::PropertyFlags::empty(),
value,
}];
if sync {
// Pin blocks to media + 2 SyncObj datas: the producer offers a sync pod
// (blocks=3) AND a non-sync fallback (blocks=1) — without this filter the
// fallback can win the intersection while the SyncTimeline meta still
// negotiates, and the producer then asserts on its own 1-block buffers.
properties.push(pw::spa::pod::Property {
key: pw::spa::sys::SPA_PARAM_BUFFERS_blocks,
flags: pw::spa::pod::PropertyFlags::empty(),
value: pw::spa::pod::Value::Int(3),
});
}
serialize_pod(pw::spa::pod::Object {
type_: pw::spa::utils::SpaTypes::ObjectParamBuffers.as_raw(),
id: pw::spa::param::ParamType::Buffers.as_raw(),
properties: vec![pw::spa::pod::Property {
key: pw::spa::sys::SPA_PARAM_BUFFERS_dataType,
flags: pw::spa::pod::PropertyFlags::empty(),
value: pw::spa::pod::Value::Int(1i32 << pw::spa::sys::SPA_DATA_DmaBuf),
}],
properties,
})
}
@@ -688,6 +806,7 @@ mod pipewire {
zerocopy: bool,
preferred: Option<(u32, u32, u32)>,
quit_rx: pw::channel::Receiver<()>,
mutter: bool,
) -> Result<()> {
crate::pwinit::ensure_init();
@@ -736,9 +855,26 @@ mod pipewire {
if importer.is_some() && !modifiers.contains(&0) {
modifiers.push(0); // DRM_FORMAT_MOD_LINEAR
}
let want_dmabuf = importer.is_some() && !modifiers.is_empty();
// Explicit sync (SyncTimeline meta + SyncObj datas), announced post-format in
// param_changed — only on Mutter: it is the one compositor that renders directly
// into the pool, and announcing the meta to producers whose syncobj path is
// broken makes them fail buffer allocation outright (observed on KWin + NVIDIA).
// PUNKTFUNK_EXPLICIT_SYNC=0 is the escape hatch.
let want_sync = mutter
&& !MUTTER_SYNC_UNUSABLE.load(Ordering::Relaxed)
&& std::env::var("PUNKTFUNK_EXPLICIT_SYNC").as_deref() != Ok("0");
// On Mutter, dmabufs without explicit sync are not safe to consume — fall back to
// the shm/CPU path (Mutter's synchronous download) when sync is unavailable.
let want_dmabuf = importer.is_some() && !modifiers.is_empty() && (!mutter || want_sync);
if zerocopy && !want_dmabuf {
tracing::warn!("zero-copy: no EGL-importable dmabuf modifiers — using CPU path");
if mutter && !want_sync {
tracing::warn!(
"Mutter without working explicit sync — using the synchronous CPU \
path (dmabuf capture would show stale frames on NVIDIA)"
);
} else {
tracing::warn!("zero-copy: no EGL-importable dmabuf modifiers — using CPU path");
}
} else if want_dmabuf {
tracing::info!(
count = modifiers.len(),
@@ -755,6 +891,11 @@ mod pipewire {
active,
negotiated,
importer,
sync: None,
sync_tried: false,
want_sync,
want_dmabuf,
mutter,
};
let stream = pw::stream::StreamBox::new(
@@ -775,10 +916,24 @@ mod pipewire {
let _listener = stream
.add_local_listener_with_user_data(data)
.state_changed(|_stream, _ud, old, new| {
.state_changed(|_stream, ud, old, new| {
tracing::info!(?old, ?new, "pipewire stream state");
// A sync-announced negotiation the producer cannot satisfy fails buffer
// allocation ("error alloc buffers") — no process callback will ever run,
// so flag it HERE and starve the first-frame timeout: the pipeline retry
// then rebuilds this capture on the synchronous CPU path.
if matches!(new, pw::stream::StreamState::Error(_))
&& ud.want_sync
&& !MUTTER_SYNC_UNUSABLE.swap(true, Ordering::Relaxed)
{
tracing::warn!(
"explicit-sync buffer negotiation failed (Mutter without \
DRM_CAP_SYNCOBJ_TIMELINE / cogl sync_fd, e.g. NVIDIA) — retrying \
this capture on the synchronous CPU path"
);
}
})
.param_changed(|_stream, ud, id, param| {
.param_changed(|stream, ud, id, param| {
let Some(param) = param else { return };
if id != pw::spa::param::ParamType::Format.as_raw() {
return;
@@ -812,165 +967,78 @@ mod pipewire {
"negotiated a pixel format the encoder cannot consume — frames will be skipped"
);
}
// Post-format renegotiation (the OBS pattern): announce SyncTimeline
// + a Buffers pod accepting the SyncObj datas. Producers without
// explicit-sync support ignore it here, instead of failing allocation
// as they do when it arrives at connect time.
if ud.want_sync && ud.want_dmabuf {
let update = (|| -> Result<()> {
let buffers = build_dmabuf_buffers(true)?;
let meta = build_sync_timeline_meta()?;
let mut pods = vec![
Pod::from_bytes(&buffers).context("buffers pod")?,
Pod::from_bytes(&meta).context("meta pod")?,
];
stream
.update_params(&mut pods)
.context("update_params(SyncTimeline)")?;
Ok(())
})();
if let Err(e) = update {
tracing::warn!(error = %format!("{e:#}"),
"explicit-sync param update failed — capturing without it");
}
}
}
})
.process(|stream, ud| {
// PipeWire dispatches this from a C trampoline with no catch_unwind; a
// panic crossing that FFI boundary would abort the whole host. Contain it.
let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let Some(mut buffer) = stream.dequeue_buffer() else {
return;
};
// No active stream: release the buffer without the (expensive at 5K) de-pad.
if !ud.active.load(Ordering::Relaxed) {
// Raw dequeue: the safe Buffer wrapper hides the spa_buffer metas that
// carry the explicit-sync timeline points.
let raw = unsafe { stream.dequeue_raw_buffer() };
if raw.is_null() {
return;
}
let datas = buffer.datas_mut();
if datas.is_empty() {
return;
}
let sz = ud.info.size();
let (w, h) = (sz.width as usize, sz.height as usize);
if w == 0 || h == 0 {
return; // format not negotiated yet
}
// Zero-copy path: if the buffer is a dmabuf and we have an importer, import it
// into a CUDA device buffer (no CPU touch) and deliver that. Otherwise fall
// through to the shm de-pad copy below.
let mut gpu_import_broken = false;
if let (Some(importer), Some(fmt)) = (ud.importer.as_mut(), ud.format) {
if datas[0].type_() == pw::spa::buffer::DataType::DmaBuf {
let plane = crate::zerocopy::DmabufPlane {
fd: datas[0].fd(),
offset: datas[0].chunk().offset(),
stride: datas[0].chunk().stride().max(0) as u32,
};
// Tiled modifier → EGL/GL de-tile import; LINEAR (0/unset, e.g.
// gamescope) → direct CUDA external-memory import (NVIDIA EGL can't
// sample LINEAR).
let modifier = (ud.modifier != 0).then_some(ud.modifier);
if let Some(fourcc) = crate::zerocopy::drm_fourcc(fmt) {
let imported = if modifier.is_some() {
importer.import(&plane, w as u32, h as u32, fourcc, modifier)
} else {
importer.import_linear(&plane, w as u32, h as u32)
};
match imported {
Ok(devbuf) => {
static ONCE: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(true);
if ONCE.swap(false, Ordering::Relaxed) {
tracing::info!(w, h, modifier = ud.modifier,
"zero-copy: dmabuf imported to CUDA (no CPU copy)");
}
let pts_ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
let _ = ud.tx.try_send(CapturedFrame {
width: w as u32,
height: h as u32,
pts_ns,
format: fmt,
payload: FramePayload::Cuda(devbuf),
});
return;
}
Err(e) => {
// GPU import unavailable for this buffer kind (e.g. the
// driver rejects LINEAR external-memory import). Disable
// the importer and fall through to the CPU mmap path —
// degraded, not dead.
tracing::warn!(error = %format!("{e:#}"),
"dmabuf GPU import failed — falling back to the CPU copy path");
gpu_import_broken = true;
}
let spa_buf = unsafe { (*raw).buffer };
let sync = unsafe { sync_points(spa_buf) };
if let Some(s) = &sync {
if !ud.sync_tried {
ud.sync_tried = true;
match crate::drm_sync::DrmSync::open() {
Ok(d) => {
tracing::info!(
"pipewire explicit sync active (SyncTimeline — producer \
renders are awaited before encode)"
);
ud.sync = Some(d);
}
} else {
return; // format has no DRM fourcc mapping — skip the frame
Err(e) => tracing::warn!(
error = %format!("{e:#}"),
"explicit sync offered but syncobj ops unavailable — reading \
unsynchronized"
),
}
}
// Wait for the producer's render to land before ANY read (GPU import
// or CPU mmap). A bounded wait: a wedged producer must not stall the
// PipeWire loop.
if let Some(drm) = &ud.sync {
if drm.wait_point(s.acquire_fd, s.acquire_point, 100).is_err() {
warn_once("explicit-sync acquire wait failed — frame may be stale");
}
}
}
if gpu_import_broken {
ud.importer = None;
}
let d = &mut datas[0];
// CPU path may also receive LINEAR dmabufs (gamescope offers only those once its
// modifier-bearing format pod wins); capture the fd before `data()` borrows `d`.
let dmabuf_fd =
(d.type_() == pw::spa::buffer::DataType::DmaBuf).then(|| d.fd());
let (size, offset, stride) = {
let c = d.chunk();
(
c.size() as usize,
c.offset() as usize,
c.stride().max(0) as usize,
)
};
let Some(fmt) = ud.format else { return }; // unsupported/not negotiated
let bpp = fmt.bytes_per_pixel();
let row = w * bpp;
let stride = if stride == 0 { row } else { stride };
if stride < row {
warn_once("chunk stride < row — frames dropped");
return;
}
let needed = stride * (h - 1) + row;
// dmabuf chunks commonly report size 0; fall back to the computed span.
let size = if size == 0 { needed } else { size };
// MAP_BUFFERS only maps buffers flagged mappable; Vulkan-exported dmabufs
// (gamescope) usually aren't, so mmap the fd ourselves for the de-pad read.
let _mapping; // keeps a manual mmap alive for the copy below
let buf: &[u8] = if let Some(data) = d.data() {
data
} else if let Some(fd) = dmabuf_fd.filter(|&fd| fd > 0) {
match DmabufMap::new(fd, offset + needed) {
Some(m) => {
_mapping = m;
unsafe {
std::slice::from_raw_parts(_mapping.ptr as *const u8, _mapping.len)
}
}
None => {
warn_once("mmap(dmabuf) failed — frames dropped");
return;
}
consume_frame(ud, spa_buf, sync.is_some());
// The producer reuses the buffer only after the release point fires —
// signal it on EVERY path, even skipped frames, or the producer stalls.
if let (Some(s), Some(drm)) = (&sync, &ud.sync) {
if drm.signal_point(s.release_fd, s.release_point).is_err() {
warn_once("explicit-sync release signal failed — producer may stall");
}
} else {
warn_once("buffer has no mappable data — frames dropped");
return;
};
// Need stride*(h-1)+row valid bytes within [offset, offset+size).
if offset > buf.len() {
return;
}
let avail = buf.len() - offset;
if needed > avail || needed > size {
warn_once("buffer smaller than frame span — frames dropped");
return;
}
let region = &buf[offset..offset + size.min(avail)];
// De-pad into a tightly-packed buffer (chunk stride may exceed w*bpp).
let mut tight = vec![0u8; row * h];
for y in 0..h {
tight[y * row..y * row + row]
.copy_from_slice(&region[y * stride..y * stride + row]);
}
let pts_ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
let frame = CapturedFrame {
width: w as u32,
height: h as u32,
pts_ns,
format: fmt,
payload: FramePayload::Cpu(tight),
};
// Drop if the encoder is behind — never block the pipewire loop.
let _ = ud.tx.try_send(frame);
unsafe { stream.queue_raw_buffer(raw) };
}));
if outcome.is_err() {
tracing::error!("panic in pipewire process callback — frame dropped");
@@ -1031,10 +1099,18 @@ mod pipewire {
// `param_changed` (the two-step DMA-BUF handshake). Otherwise offer the multi-format shm
// pod and let MAP_BUFFERS map it.
let shm_values = serialize_pod(obj)?;
// The SyncTimeline announcement itself happens post-format in param_changed (the
// OBS pattern); at connect time we only declare the data types we accept.
let (dmabuf_values, buffers_values) = if want_dmabuf {
(
Some(build_dmabuf_format(&modifiers, preferred)?),
Some(build_dmabuf_buffers()?),
// Sync path: NO Buffers pod at connect — buffers must not be allocated
// until the SyncTimeline meta is on the table (param_changed), or the
// producer's syncobj setup meets already-allocated 1-block buffers
// (observed: gnome-shell "n_datas >= SYNCOBJ_MINIMUM_N_DATAS" assertions).
(!want_sync)
.then(|| build_dmabuf_buffers(false))
.transpose()?,
)
} else {
// CPU path still accepts mappable dmabufs (gamescope offers only those once its
@@ -1068,4 +1144,199 @@ mod pipewire {
mainloop.run();
Ok(())
}
/// Consume one dequeued buffer: GPU-import or de-pad-copy `datas[0]` and hand the
/// frame to the encoder. Extracted from the process callback so the explicit-sync
/// release point can be signaled on every early return.
fn consume_frame(ud: &mut UserData, spa_buf: *mut spa::sys::spa_buffer, has_sync: bool) {
// No active stream: release the buffer without the (expensive at 5K) de-pad.
if !ud.active.load(Ordering::Relaxed) {
return;
}
let datas: &mut [pw::spa::buffer::Data] = unsafe {
if spa_buf.is_null() || (*spa_buf).n_datas == 0 || (*spa_buf).datas.is_null() {
&mut []
} else {
// Same transparent cast libspa's Buffer::datas_mut performs.
std::slice::from_raw_parts_mut(
(*spa_buf).datas as *mut pw::spa::buffer::Data,
(*spa_buf).n_datas as usize,
)
}
};
if datas.is_empty() {
return;
}
// A sync-announced negotiation that the producer could not complete leaves the
// buffers unallocated (type SPA_ID_INVALID, fd -1). Disable sync and starve the
// first-frame timeout so the pipeline retries on the safe path.
if datas[0].as_raw().type_ == u32::MAX
/* SPA_ID_INVALID */
{
if ud.want_sync && !MUTTER_SYNC_UNUSABLE.swap(true, Ordering::Relaxed) {
tracing::error!(
"explicit-sync negotiation produced unallocated buffers — retrying \
this capture on the synchronous CPU path"
);
} else {
warn_once("buffer arrived unallocated — frames dropped");
}
return;
}
if ud.mutter && !has_sync && datas[0].type_() == pw::spa::buffer::DataType::DmaBuf {
// Mutter renders straight into the pool; without explicit sync the encode
// races the render on NVIDIA and flashes the buffer's previous contents.
// Don't consume these — starve the first-frame timeout so the pipeline
// retries on the synchronous CPU path.
if !MUTTER_SYNC_UNUSABLE.swap(true, Ordering::Relaxed) {
tracing::error!(
"Mutter delivered dmabufs without explicit sync — refusing them \
(stale-frame corruption); retrying on the synchronous CPU path"
);
}
return;
}
let sz = ud.info.size();
let (w, h) = (sz.width as usize, sz.height as usize);
if w == 0 || h == 0 {
return; // format not negotiated yet
}
// Zero-copy path: if the buffer is a dmabuf and we have an importer, import it
// into a CUDA device buffer (no CPU touch) and deliver that. Otherwise fall
// through to the shm de-pad copy below.
let mut gpu_import_broken = false;
if let (Some(importer), Some(fmt)) = (ud.importer.as_mut(), ud.format) {
if datas[0].type_() == pw::spa::buffer::DataType::DmaBuf {
let plane = crate::zerocopy::DmabufPlane {
fd: datas[0].fd(),
offset: datas[0].chunk().offset(),
stride: datas[0].chunk().stride().max(0) as u32,
};
// Tiled modifier → EGL/GL de-tile import; LINEAR (0/unset, e.g.
// gamescope) → direct CUDA external-memory import (NVIDIA EGL can't
// sample LINEAR).
let modifier = (ud.modifier != 0).then_some(ud.modifier);
if let Some(fourcc) = crate::zerocopy::drm_fourcc(fmt) {
let imported = if modifier.is_some() {
importer.import(&plane, w as u32, h as u32, fourcc, modifier)
} else {
importer.import_linear(&plane, w as u32, h as u32)
};
match imported {
Ok(devbuf) => {
static ONCE: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(true);
if ONCE.swap(false, Ordering::Relaxed) {
tracing::info!(
w,
h,
modifier = ud.modifier,
"zero-copy: dmabuf imported to CUDA (no CPU copy)"
);
}
let pts_ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
let _ = ud.tx.try_send(CapturedFrame {
width: w as u32,
height: h as u32,
pts_ns,
format: fmt,
payload: FramePayload::Cuda(devbuf),
});
return;
}
Err(e) => {
// GPU import unavailable for this buffer kind (e.g. the
// driver rejects LINEAR external-memory import). Disable
// the importer and fall through to the CPU mmap path —
// degraded, not dead.
tracing::warn!(error = %format!("{e:#}"),
"dmabuf GPU import failed — falling back to the CPU copy path");
gpu_import_broken = true;
}
}
} else {
return; // format has no DRM fourcc mapping — skip the frame
}
}
}
if gpu_import_broken {
ud.importer = None;
}
let d = &mut datas[0];
// CPU path may also receive LINEAR dmabufs (gamescope offers only those once its
// modifier-bearing format pod wins); capture the fd before `data()` borrows `d`.
let dmabuf_fd = (d.type_() == pw::spa::buffer::DataType::DmaBuf).then(|| d.fd());
let (size, offset, stride) = {
let c = d.chunk();
(
c.size() as usize,
c.offset() as usize,
c.stride().max(0) as usize,
)
};
let Some(fmt) = ud.format else { return }; // unsupported/not negotiated
let bpp = fmt.bytes_per_pixel();
let row = w * bpp;
let stride = if stride == 0 { row } else { stride };
if stride < row {
warn_once("chunk stride < row — frames dropped");
return;
}
let needed = stride * (h - 1) + row;
// dmabuf chunks commonly report size 0; fall back to the computed span.
let size = if size == 0 { needed } else { size };
// MAP_BUFFERS only maps buffers flagged mappable; Vulkan-exported dmabufs
// (gamescope) usually aren't, so mmap the fd ourselves for the de-pad read.
let _mapping; // keeps a manual mmap alive for the copy below
let buf: &[u8] = if let Some(data) = d.data() {
data
} else if let Some(fd) = dmabuf_fd.filter(|&fd| fd > 0) {
match DmabufMap::new(fd, offset + needed) {
Some(m) => {
_mapping = m;
unsafe { std::slice::from_raw_parts(_mapping.ptr as *const u8, _mapping.len) }
}
None => {
warn_once("mmap(dmabuf) failed — frames dropped");
return;
}
}
} else {
warn_once("buffer has no mappable data — frames dropped");
return;
};
// Need stride*(h-1)+row valid bytes within [offset, offset+size).
if offset > buf.len() {
return;
}
let avail = buf.len() - offset;
if needed > avail || needed > size {
warn_once("buffer smaller than frame span — frames dropped");
return;
}
let region = &buf[offset..offset + size.min(avail)];
// De-pad into a tightly-packed buffer (chunk stride may exceed w*bpp).
let mut tight = vec![0u8; row * h];
for y in 0..h {
tight[y * row..y * row + row].copy_from_slice(&region[y * stride..y * stride + row]);
}
let pts_ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
let frame = CapturedFrame {
width: w as u32,
height: h as u32,
pts_ns,
format: fmt,
payload: FramePayload::Cpu(tight),
};
// Drop if the encoder is behind — never block the pipewire loop.
let _ = ud.tx.try_send(frame);
}
}