From d8da12bbbd127315b929da2873b99ccbe8b74da0 Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Sat, 13 Jun 2026 11:15:01 +0000 Subject: [PATCH] fix(capture/mutter): latest-frame-only dequeue (the real GNOME flash fix) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deep research (OBS Studio's linux-pipewire, Mutter bug tracker) found the GNOME stale-frame flash is a buffer-RECYCLING race, not damage (Mutter sends whole frames, no SPA_META_VideoDamage) and not buffer count. OBS's proven fix is latest-frame-only dequeue: each process callback, drain ALL queued PipeWire buffers, requeue the older ones, and consume only the NEWEST — plus skip CORRUPTED buffers. Our code dequeued one buffer per callback (oldest-first) and the bounded channel dropped the NEWEST when full, so during Mutter's bursty delivery the encoder got stale frames → the flash. Switch the process callback to raw dequeue_raw_buffer + drain-to-newest (requeue older), extract the consume logic into consume_frame(spa_buf) sourcing datas via the transparent Data cast, skip SPA_META_HEADER_FLAG_CORRUPTED / CORRUPTED-chunk buffers (size-0 skip kept SHM-only so dmabuf isn't regressed), and remove the earlier content-hash drop heuristic (it couldn't tell stale re-deliveries from legit repeating content). Diagnostic logs drain depth + chunk/header flags. Reverts none of the FORCE_SHM / dmabuf_fence work. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/punktfunk-host/src/capture/linux.rs | 506 +++++++++++---------- 1 file changed, 277 insertions(+), 229 deletions(-) diff --git a/crates/punktfunk-host/src/capture/linux.rs b/crates/punktfunk-host/src/capture/linux.rs index 0f86bd8..3f66186 100644 --- a/crates/punktfunk-host/src/capture/linux.rs +++ b/crates/punktfunk-host/src/capture/linux.rs @@ -466,15 +466,8 @@ mod pipewire { negotiated: Arc, /// Present when zero-copy is enabled: imports a dmabuf → CUDA device buffer. importer: Option, - /// GNOME stale-frame filter state. Mutter re-delivers complete OLD pool buffers on the SHM - /// download path (no GPU sync prevents it on NVIDIA); a captured frame whose content equals - /// an EARLIER distinct frame (not the immediately-previous one — that's a normal duplicate) is - /// such a stale re-delivery and gets DROPPED so the encoder never emits the flash. `stale_seen` - /// counts drops; `keep_stale` (PUNKTFUNK_KEEP_STALE=1) turns the filter off for A/B testing. - stale_recent: std::collections::VecDeque, - stale_last: u64, - stale_seen: u64, - keep_stale: bool, + /// Rate-limit counter for the latest-frame-only diagnostic log (see `.process`). + dbg_log_n: u64, } /// Log a frame-drop reason once per process (the process callback runs per frame; a stuck @@ -710,6 +703,205 @@ mod pipewire { }) } + /// De-pad / import a single PipeWire buffer and push it to the encoder. Called from the + /// `.process` callback with the NEWEST drained buffer (latest-frame-only). `datas` is sourced + /// via the same transparent cast libspa's `Buffer::datas_mut` performs, so the safe `Data` + /// accessors (`.type_()`, `.chunk()`, `.data()`, `.fd()`, `.as_raw()`) keep working. + fn consume_frame(ud: &mut UserData, spa_buf: *mut spa::sys::spa_buffer) { + // No active stream: release the buffer without the (expensive at 5K) de-pad. + if !ud.active.load(Ordering::Relaxed) { + return; + } + let datas: &mut [pw::spa::buffer::Data] = unsafe { + if spa_buf.is_null() || (*spa_buf).n_datas == 0 || (*spa_buf).datas.is_null() { + &mut [] + } else { + std::slice::from_raw_parts_mut( + (*spa_buf).datas as *mut pw::spa::buffer::Data, + (*spa_buf).n_datas as usize, + ) + } + }; + if datas.is_empty() { + return; + } + let sz = ud.info.size(); + let (w, h) = (sz.width as usize, sz.height as usize); + if w == 0 || h == 0 { + return; // format not negotiated yet + } + + // Implicit-fence wait: Mutter renders into the dmabuf and hands it over at + // GPU-submit time; with no producer explicit sync (Mutter+NVIDIA can't) we snapshot + // the buffer's implicit fence and wait the producer's render before sampling — + // closing the stale/old-frame race on NVIDIA. No-op for shm buffers or drivers that + // attach no fence. Covers both the GPU import and the CPU mmap read below. + if datas[0].type_() == pw::spa::buffer::DataType::DmaBuf { + match crate::dmabuf_fence::wait_read_ready(datas[0].fd(), 100) { + Ok(waited) => { + static F1: std::sync::atomic::AtomicBool = + std::sync::atomic::AtomicBool::new(true); + if F1.swap(false, Ordering::Relaxed) { + tracing::info!( + waited, + "dmabuf implicit-fence sync active (waited=true → driver fences \ + the render, race closed; false → no implicit fence, zero-copy \ + may still show stale frames)" + ); + } + } + Err(e) => { + static F2: std::sync::atomic::AtomicBool = + std::sync::atomic::AtomicBool::new(true); + if F2.swap(false, Ordering::Relaxed) { + tracing::warn!( + error = %format!("{e}"), + "dmabuf EXPORT_SYNC_FILE failed — no implicit-fence sync; NVIDIA \ + zero-copy may show stale frames (no producer explicit sync)" + ); + } + } + } + } + + // Zero-copy path: if the buffer is a dmabuf and we have an importer, import it + // into a CUDA device buffer (no CPU touch) and deliver that. Otherwise fall + // through to the shm de-pad copy below. + let mut gpu_import_broken = false; + if let (Some(importer), Some(fmt)) = (ud.importer.as_mut(), ud.format) { + if datas[0].type_() == pw::spa::buffer::DataType::DmaBuf { + let plane = crate::zerocopy::DmabufPlane { + fd: datas[0].fd(), + offset: datas[0].chunk().offset(), + stride: datas[0].chunk().stride().max(0) as u32, + }; + // Tiled modifier → EGL/GL de-tile import; LINEAR (0/unset, e.g. + // gamescope) → direct CUDA external-memory import (NVIDIA EGL can't + // sample LINEAR). + let modifier = (ud.modifier != 0).then_some(ud.modifier); + if let Some(fourcc) = crate::zerocopy::drm_fourcc(fmt) { + let imported = if modifier.is_some() { + importer.import(&plane, w as u32, h as u32, fourcc, modifier) + } else { + importer.import_linear(&plane, w as u32, h as u32) + }; + match imported { + Ok(devbuf) => { + static ONCE: std::sync::atomic::AtomicBool = + std::sync::atomic::AtomicBool::new(true); + if ONCE.swap(false, Ordering::Relaxed) { + tracing::info!( + w, + h, + modifier = ud.modifier, + "zero-copy: dmabuf imported to CUDA (no CPU copy)" + ); + } + let pts_ns = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0); + let _ = ud.tx.try_send(CapturedFrame { + width: w as u32, + height: h as u32, + pts_ns, + format: fmt, + payload: FramePayload::Cuda(devbuf), + }); + return; + } + Err(e) => { + // GPU import unavailable for this buffer kind (e.g. the + // driver rejects LINEAR external-memory import). Disable + // the importer and fall through to the CPU mmap path — + // degraded, not dead. + tracing::warn!(error = %format!("{e:#}"), + "dmabuf GPU import failed — falling back to the CPU copy path"); + gpu_import_broken = true; + } + } + } else { + return; // format has no DRM fourcc mapping — skip the frame + } + } + } + if gpu_import_broken { + ud.importer = None; + } + + let d = &mut datas[0]; + // CPU path may also receive LINEAR dmabufs (gamescope offers only those once its + // modifier-bearing format pod wins); capture the fd before `data()` borrows `d`. + let dmabuf_fd = (d.type_() == pw::spa::buffer::DataType::DmaBuf).then(|| d.fd()); + let (size, offset, stride) = { + let c = d.chunk(); + ( + c.size() as usize, + c.offset() as usize, + c.stride().max(0) as usize, + ) + }; + let Some(fmt) = ud.format else { return }; // unsupported/not negotiated + let bpp = fmt.bytes_per_pixel(); + let row = w * bpp; + let stride = if stride == 0 { row } else { stride }; + if stride < row { + warn_once("chunk stride < row — frames dropped"); + return; + } + let needed = stride * (h - 1) + row; + // dmabuf chunks commonly report size 0; fall back to the computed span. + let size = if size == 0 { needed } else { size }; + // MAP_BUFFERS only maps buffers flagged mappable; Vulkan-exported dmabufs + // (gamescope) usually aren't, so mmap the fd ourselves for the de-pad read. + let _mapping; // keeps a manual mmap alive for the copy below + let buf: &[u8] = if let Some(data) = d.data() { + data + } else if let Some(fd) = dmabuf_fd.filter(|&fd| fd > 0) { + match DmabufMap::new(fd, offset + needed) { + Some(m) => { + _mapping = m; + unsafe { std::slice::from_raw_parts(_mapping.ptr as *const u8, _mapping.len) } + } + None => { + warn_once("mmap(dmabuf) failed — frames dropped"); + return; + } + } + } else { + warn_once("buffer has no mappable data — frames dropped"); + return; + }; + // Need stride*(h-1)+row valid bytes within [offset, offset+size). + if offset > buf.len() { + return; + } + let avail = buf.len() - offset; + if needed > avail || needed > size { + warn_once("buffer smaller than frame span — frames dropped"); + return; + } + let region = &buf[offset..offset + size.min(avail)]; + // De-pad into a tightly-packed buffer (chunk stride may exceed w*bpp). + let mut tight = vec![0u8; row * h]; + for y in 0..h { + tight[y * row..y * row + row].copy_from_slice(®ion[y * stride..y * stride + row]); + } + let pts_ns = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0); + let frame = CapturedFrame { + width: w as u32, + height: h as u32, + pts_ns, + format: fmt, + payload: FramePayload::Cpu(tight), + }; + // Drop if the encoder is behind — never block the pipewire loop. + let _ = ud.tx.try_send(frame); + } + #[allow(clippy::too_many_arguments)] pub fn pipewire_thread( fd: Option, @@ -795,10 +987,7 @@ mod pipewire { active, negotiated, importer, - stale_recent: std::collections::VecDeque::new(), - stale_last: 0, - stale_seen: 0, - keep_stale: std::env::var_os("PUNKTFUNK_KEEP_STALE").is_some(), + dbg_log_n: 0, }; let stream = pw::stream::StreamBox::new( @@ -862,230 +1051,89 @@ mod pipewire { // PipeWire dispatches this from a C trampoline with no catch_unwind; a // panic crossing that FFI boundary would abort the whole host. Contain it. let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - let Some(mut buffer) = stream.dequeue_buffer() else { - return; - }; - // No active stream: release the buffer without the (expensive at 5K) de-pad. - if !ud.active.load(Ordering::Relaxed) { - return; - } - let datas = buffer.datas_mut(); - if datas.is_empty() { - return; - } - let sz = ud.info.size(); - let (w, h) = (sz.width as usize, sz.height as usize); - if w == 0 || h == 0 { - return; // format not negotiated yet - } - - // Implicit-fence wait: Mutter renders into the dmabuf and hands it over at - // GPU-submit time; with no producer explicit sync (Mutter+NVIDIA can't) we snapshot - // the buffer's implicit fence and wait the producer's render before sampling — - // closing the stale/old-frame race on NVIDIA. No-op for shm buffers or drivers that - // attach no fence. Covers both the GPU import and the CPU mmap read below. - if datas[0].type_() == pw::spa::buffer::DataType::DmaBuf { - match crate::dmabuf_fence::wait_read_ready(datas[0].fd(), 100) { - Ok(waited) => { - static F1: std::sync::atomic::AtomicBool = - std::sync::atomic::AtomicBool::new(true); - if F1.swap(false, Ordering::Relaxed) { - tracing::info!( - waited, - "dmabuf implicit-fence sync active (waited=true → driver fences \ - the render, race closed; false → no implicit fence, zero-copy \ - may still show stale frames)" - ); - } - } - Err(e) => { - static F2: std::sync::atomic::AtomicBool = - std::sync::atomic::AtomicBool::new(true); - if F2.swap(false, Ordering::Relaxed) { - tracing::warn!( - error = %format!("{e}"), - "dmabuf EXPORT_SYNC_FILE failed — no implicit-fence sync; NVIDIA \ - zero-copy may show stale frames (no producer explicit sync)" - ); - } - } + // Latest-frame-only (OBS pattern): Mutter delivers buffers in bursts and + // recycles its pool; an older queued buffer carries a STALE frame. Drain all + // queued buffers, requeue the older ones, keep only the newest. + let mut newest = unsafe { stream.dequeue_raw_buffer() }; + if newest.is_null() { + return; } - } + let mut drained = 1u32; + loop { + let next = unsafe { stream.dequeue_raw_buffer() }; + if next.is_null() { + break; + } + unsafe { stream.queue_raw_buffer(newest) }; + newest = next; + drained += 1; + } + let spa_buf = unsafe { (*newest).buffer }; - // Zero-copy path: if the buffer is a dmabuf and we have an importer, import it - // into a CUDA device buffer (no CPU touch) and deliver that. Otherwise fall - // through to the shm de-pad copy below. - let mut gpu_import_broken = false; - if let (Some(importer), Some(fmt)) = (ud.importer.as_mut(), ud.format) { - if datas[0].type_() == pw::spa::buffer::DataType::DmaBuf { - let plane = crate::zerocopy::DmabufPlane { - fd: datas[0].fd(), - offset: datas[0].chunk().offset(), - stride: datas[0].chunk().stride().max(0) as u32, - }; - // Tiled modifier → EGL/GL de-tile import; LINEAR (0/unset, e.g. - // gamescope) → direct CUDA external-memory import (NVIDIA EGL can't - // sample LINEAR). - let modifier = (ud.modifier != 0).then_some(ud.modifier); - if let Some(fourcc) = crate::zerocopy::drm_fourcc(fmt) { - let imported = if modifier.is_some() { - importer.import(&plane, w as u32, h as u32, fourcc, modifier) - } else { - importer.import_linear(&plane, w as u32, h as u32) - }; - match imported { - Ok(devbuf) => { - static ONCE: std::sync::atomic::AtomicBool = - std::sync::atomic::AtomicBool::new(true); - if ONCE.swap(false, Ordering::Relaxed) { - tracing::info!(w, h, modifier = ud.modifier, - "zero-copy: dmabuf imported to CUDA (no CPU copy)"); - } - let pts_ns = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_nanos() as u64) - .unwrap_or(0); - let _ = ud.tx.try_send(CapturedFrame { - width: w as u32, - height: h as u32, - pts_ns, - format: fmt, - payload: FramePayload::Cuda(devbuf), - }); - return; - } - Err(e) => { - // GPU import unavailable for this buffer kind (e.g. the - // driver rejects LINEAR external-memory import). Disable - // the importer and fall through to the CPU mmap path — - // degraded, not dead. - tracing::warn!(error = %format!("{e:#}"), - "dmabuf GPU import failed — falling back to the CPU copy path"); - gpu_import_broken = true; - } - } + // Inspect the newest buffer's header + first chunk for the diagnostic and the + // CORRUPTED skip. SPA_META_Header is optional — `hdr` may be null. + let hdr = unsafe { + spa::sys::spa_buffer_find_meta_data( + spa_buf, + spa::sys::SPA_META_Header, + std::mem::size_of::(), + ) as *const spa::sys::spa_meta_header + }; + let hdr_flags = if hdr.is_null() { + 0u32 + } else { + unsafe { (*hdr).flags } + }; + // First data chunk's size + flags (used for the diagnostic + CORRUPTED check) + // and its data type (a dmabuf legitimately reports chunk size 0, so the size-0 + // stale skip only applies to mappable SHM buffers). + let (chunk_size, chunk_flags, is_dmabuf) = unsafe { + if !spa_buf.is_null() + && (*spa_buf).n_datas > 0 + && !(*spa_buf).datas.is_null() + && !(*(*spa_buf).datas).chunk.is_null() + { + let d0 = (*spa_buf).datas; + let c = (*d0).chunk; + let is_dmabuf = + (*d0).type_ == spa::sys::SPA_DATA_DmaBuf; + ((*c).size, (*c).flags, is_dmabuf) } else { - return; // format has no DRM fourcc mapping — skip the frame + (0u32, 0i32, false) } - } - } - if gpu_import_broken { - ud.importer = None; - } + }; - let d = &mut datas[0]; - // CPU path may also receive LINEAR dmabufs (gamescope offers only those once its - // modifier-bearing format pod wins); capture the fd before `data()` borrows `d`. - let dmabuf_fd = - (d.type_() == pw::spa::buffer::DataType::DmaBuf).then(|| d.fd()); - let (size, offset, stride) = { - let c = d.chunk(); - ( - c.size() as usize, - c.offset() as usize, - c.stride().max(0) as usize, - ) - }; - let Some(fmt) = ud.format else { return }; // unsupported/not negotiated - let bpp = fmt.bytes_per_pixel(); - let row = w * bpp; - let stride = if stride == 0 { row } else { stride }; - if stride < row { - warn_once("chunk stride < row — frames dropped"); - return; - } - let needed = stride * (h - 1) + row; - // dmabuf chunks commonly report size 0; fall back to the computed span. - let size = if size == 0 { needed } else { size }; - // MAP_BUFFERS only maps buffers flagged mappable; Vulkan-exported dmabufs - // (gamescope) usually aren't, so mmap the fd ourselves for the de-pad read. - let _mapping; // keeps a manual mmap alive for the copy below - let buf: &[u8] = if let Some(data) = d.data() { - data - } else if let Some(fd) = dmabuf_fd.filter(|&fd| fd > 0) { - match DmabufMap::new(fd, offset + needed) { - Some(m) => { - _mapping = m; - unsafe { - std::slice::from_raw_parts(_mapping.ptr as *const u8, _mapping.len) - } - } - None => { - warn_once("mmap(dmabuf) failed — frames dropped"); - return; - } - } - } else { - warn_once("buffer has no mappable data — frames dropped"); - return; - }; - // Need stride*(h-1)+row valid bytes within [offset, offset+size). - if offset > buf.len() { - return; - } - let avail = buf.len() - offset; - if needed > avail || needed > size { - warn_once("buffer smaller than frame span — frames dropped"); - return; - } - let region = &buf[offset..offset + size.min(avail)]; - // De-pad into a tightly-packed buffer (chunk stride may exceed w*bpp). - let mut tight = vec![0u8; row * h]; - for y in 0..h { - tight[y * row..y * row + row] - .copy_from_slice(®ion[y * stride..y * stride + row]); - } - // GNOME stale-frame filter: Mutter re-delivers complete OLD pool buffers on the SHM - // download path (no NVIDIA fence prevents it). A frame whose sampled content equals an - // EARLIER distinct frame is a stale re-delivery — DROP it so the encoder never emits - // the flash (try_latest then re-sends the last good frame). Hashes a spatial sample; - // a real return-to-prior-state is rare and dropping it is harmless. KEEP_STALE=1 = off. - { - let s: &[u8] = &tight; - let step = (s.len() / 1024).max(1); - let mut hh: u64 = 0xcbf2_9ce4_8422_2325; - let mut i = 0; - while i < s.len() { - hh = (hh ^ s[i] as u64).wrapping_mul(0x0100_0000_01b3); - i += step; - } - // `stale_last` = the current (newest) frame's hash. A frame equal to it is a normal - // duplicate (deliver). A frame equal to an OLDER distinct frame in `stale_recent` - // is a stale re-delivery (drop). Anything else is new forward content (deliver). - if hh == ud.stale_last { - // duplicate of the current frame — deliver as usual - } else if ud.stale_recent.contains(&hh) { - ud.stale_seen += 1; - if ud.stale_seen.is_power_of_two() { - tracing::warn!( - dropped = ud.stale_seen, - "GNOME stale-frame dropped (capture reverted to an earlier frame)" + let corrupted = (hdr_flags & spa::sys::SPA_META_HEADER_FLAG_CORRUPTED) != 0 + || (chunk_flags & spa::sys::SPA_CHUNK_FLAG_CORRUPTED as i32) != 0; + + // Rate-limited diagnostic (~once/sec): drain depth + chunk size/flags + header + // flags. Tells us whether buffers queue up (latest-frame-only helps) and whether + // stale buffers are flagged CORRUPTED or report size 0. + if drained > 1 || chunk_flags != 0 || hdr_flags != 0 { + ud.dbg_log_n += 1; + if ud.dbg_log_n.is_power_of_two() || ud.dbg_log_n % 60 == 0 { + tracing::info!( + drained, + chunk_size, + chunk_flags, + hdr_flags, + corrupted, + n = ud.dbg_log_n, + "capture: latest-frame-only drain (diagnostic)" ); } - if !ud.keep_stale { - return; // drop the stale re-delivery — don't advance `stale_last` - } - } else { - ud.stale_recent.push_back(hh); - if ud.stale_recent.len() > 24 { - ud.stale_recent.pop_front(); - } - ud.stale_last = hh; } - } - let pts_ns = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_nanos() as u64) - .unwrap_or(0); - let frame = CapturedFrame { - width: w as u32, - height: h as u32, - pts_ns, - format: fmt, - payload: FramePayload::Cpu(tight), - }; - // Drop if the encoder is behind — never block the pipewire loop. - let _ = ud.tx.try_send(frame); + + // Skip a CORRUPTED stale buffer (or a size-0 SHM buffer — a dmabuf legitimately + // reports size 0) — requeue it and wait for the next. + if corrupted || (chunk_size == 0 && !is_dmabuf) { + warn_once("capture: skipped a CORRUPTED/size-0 buffer (stale frame)"); + unsafe { stream.queue_raw_buffer(newest) }; + return; + } + + consume_frame(ud, spa_buf); + unsafe { stream.queue_raw_buffer(newest) }; })); if outcome.is_err() { tracing::error!("panic in pipewire process callback — frame dropped");