fix(capture/mutter): latest-frame-only dequeue (the real GNOME flash fix)
ci / web (push) Failing after 39s
apple / swift (push) Successful in 1m18s
ci / rust (push) Successful in 1m22s
ci / docs-site (push) Failing after 44s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 5s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 4s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 5s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 4s
deb / build-publish (push) Successful in 3m17s
docker / deploy-docs (push) Successful in 19s
rpm / build-publish (push) Successful in 4m42s
ci / web (push) Failing after 39s
apple / swift (push) Successful in 1m18s
ci / rust (push) Successful in 1m22s
ci / docs-site (push) Failing after 44s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 5s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 4s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 5s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 4s
deb / build-publish (push) Successful in 3m17s
docker / deploy-docs (push) Successful in 19s
rpm / build-publish (push) Successful in 4m42s
Deep research (OBS Studio's linux-pipewire, Mutter bug tracker) found the GNOME stale-frame flash is a buffer-RECYCLING race, not damage (Mutter sends whole frames, no SPA_META_VideoDamage) and not buffer count. OBS's proven fix is latest-frame-only dequeue: each process callback, drain ALL queued PipeWire buffers, requeue the older ones, and consume only the NEWEST — plus skip CORRUPTED buffers. Our code dequeued one buffer per callback (oldest-first) and the bounded channel dropped the NEWEST when full, so during Mutter's bursty delivery the encoder got stale frames → the flash. Switch the process callback to raw dequeue_raw_buffer + drain-to-newest (requeue older), extract the consume logic into consume_frame(spa_buf) sourcing datas via the transparent Data cast, skip SPA_META_HEADER_FLAG_CORRUPTED / CORRUPTED-chunk buffers (size-0 skip kept SHM-only so dmabuf isn't regressed), and remove the earlier content-hash drop heuristic (it couldn't tell stale re-deliveries from legit repeating content). Diagnostic logs drain depth + chunk/header flags. Reverts none of the FORCE_SHM / dmabuf_fence work. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -466,15 +466,8 @@ mod pipewire {
|
||||
negotiated: Arc<AtomicBool>,
|
||||
/// Present when zero-copy is enabled: imports a dmabuf → CUDA device buffer.
|
||||
importer: Option<crate::zerocopy::EglImporter>,
|
||||
/// GNOME stale-frame filter state. Mutter re-delivers complete OLD pool buffers on the SHM
|
||||
/// download path (no GPU sync prevents it on NVIDIA); a captured frame whose content equals
|
||||
/// an EARLIER distinct frame (not the immediately-previous one — that's a normal duplicate) is
|
||||
/// such a stale re-delivery and gets DROPPED so the encoder never emits the flash. `stale_seen`
|
||||
/// counts drops; `keep_stale` (PUNKTFUNK_KEEP_STALE=1) turns the filter off for A/B testing.
|
||||
stale_recent: std::collections::VecDeque<u64>,
|
||||
stale_last: u64,
|
||||
stale_seen: u64,
|
||||
keep_stale: bool,
|
||||
/// Rate-limit counter for the latest-frame-only diagnostic log (see `.process`).
|
||||
dbg_log_n: u64,
|
||||
}
|
||||
|
||||
/// Log a frame-drop reason once per process (the process callback runs per frame; a stuck
|
||||
@@ -710,6 +703,205 @@ mod pipewire {
|
||||
})
|
||||
}
|
||||
|
||||
/// De-pad / import a single PipeWire buffer and push it to the encoder. Called from the
|
||||
/// `.process` callback with the NEWEST drained buffer (latest-frame-only). `datas` is sourced
|
||||
/// via the same transparent cast libspa's `Buffer::datas_mut` performs, so the safe `Data`
|
||||
/// accessors (`.type_()`, `.chunk()`, `.data()`, `.fd()`, `.as_raw()`) keep working.
|
||||
fn consume_frame(ud: &mut UserData, spa_buf: *mut spa::sys::spa_buffer) {
|
||||
// No active stream: release the buffer without the (expensive at 5K) de-pad.
|
||||
if !ud.active.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
let datas: &mut [pw::spa::buffer::Data] = unsafe {
|
||||
if spa_buf.is_null() || (*spa_buf).n_datas == 0 || (*spa_buf).datas.is_null() {
|
||||
&mut []
|
||||
} else {
|
||||
std::slice::from_raw_parts_mut(
|
||||
(*spa_buf).datas as *mut pw::spa::buffer::Data,
|
||||
(*spa_buf).n_datas as usize,
|
||||
)
|
||||
}
|
||||
};
|
||||
if datas.is_empty() {
|
||||
return;
|
||||
}
|
||||
let sz = ud.info.size();
|
||||
let (w, h) = (sz.width as usize, sz.height as usize);
|
||||
if w == 0 || h == 0 {
|
||||
return; // format not negotiated yet
|
||||
}
|
||||
|
||||
// Implicit-fence wait: Mutter renders into the dmabuf and hands it over at
|
||||
// GPU-submit time; with no producer explicit sync (Mutter+NVIDIA can't) we snapshot
|
||||
// the buffer's implicit fence and wait the producer's render before sampling —
|
||||
// closing the stale/old-frame race on NVIDIA. No-op for shm buffers or drivers that
|
||||
// attach no fence. Covers both the GPU import and the CPU mmap read below.
|
||||
if datas[0].type_() == pw::spa::buffer::DataType::DmaBuf {
|
||||
match crate::dmabuf_fence::wait_read_ready(datas[0].fd(), 100) {
|
||||
Ok(waited) => {
|
||||
static F1: std::sync::atomic::AtomicBool =
|
||||
std::sync::atomic::AtomicBool::new(true);
|
||||
if F1.swap(false, Ordering::Relaxed) {
|
||||
tracing::info!(
|
||||
waited,
|
||||
"dmabuf implicit-fence sync active (waited=true → driver fences \
|
||||
the render, race closed; false → no implicit fence, zero-copy \
|
||||
may still show stale frames)"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
static F2: std::sync::atomic::AtomicBool =
|
||||
std::sync::atomic::AtomicBool::new(true);
|
||||
if F2.swap(false, Ordering::Relaxed) {
|
||||
tracing::warn!(
|
||||
error = %format!("{e}"),
|
||||
"dmabuf EXPORT_SYNC_FILE failed — no implicit-fence sync; NVIDIA \
|
||||
zero-copy may show stale frames (no producer explicit sync)"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Zero-copy path: if the buffer is a dmabuf and we have an importer, import it
|
||||
// into a CUDA device buffer (no CPU touch) and deliver that. Otherwise fall
|
||||
// through to the shm de-pad copy below.
|
||||
let mut gpu_import_broken = false;
|
||||
if let (Some(importer), Some(fmt)) = (ud.importer.as_mut(), ud.format) {
|
||||
if datas[0].type_() == pw::spa::buffer::DataType::DmaBuf {
|
||||
let plane = crate::zerocopy::DmabufPlane {
|
||||
fd: datas[0].fd(),
|
||||
offset: datas[0].chunk().offset(),
|
||||
stride: datas[0].chunk().stride().max(0) as u32,
|
||||
};
|
||||
// Tiled modifier → EGL/GL de-tile import; LINEAR (0/unset, e.g.
|
||||
// gamescope) → direct CUDA external-memory import (NVIDIA EGL can't
|
||||
// sample LINEAR).
|
||||
let modifier = (ud.modifier != 0).then_some(ud.modifier);
|
||||
if let Some(fourcc) = crate::zerocopy::drm_fourcc(fmt) {
|
||||
let imported = if modifier.is_some() {
|
||||
importer.import(&plane, w as u32, h as u32, fourcc, modifier)
|
||||
} else {
|
||||
importer.import_linear(&plane, w as u32, h as u32)
|
||||
};
|
||||
match imported {
|
||||
Ok(devbuf) => {
|
||||
static ONCE: std::sync::atomic::AtomicBool =
|
||||
std::sync::atomic::AtomicBool::new(true);
|
||||
if ONCE.swap(false, Ordering::Relaxed) {
|
||||
tracing::info!(
|
||||
w,
|
||||
h,
|
||||
modifier = ud.modifier,
|
||||
"zero-copy: dmabuf imported to CUDA (no CPU copy)"
|
||||
);
|
||||
}
|
||||
let pts_ns = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|d| d.as_nanos() as u64)
|
||||
.unwrap_or(0);
|
||||
let _ = ud.tx.try_send(CapturedFrame {
|
||||
width: w as u32,
|
||||
height: h as u32,
|
||||
pts_ns,
|
||||
format: fmt,
|
||||
payload: FramePayload::Cuda(devbuf),
|
||||
});
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
// GPU import unavailable for this buffer kind (e.g. the
|
||||
// driver rejects LINEAR external-memory import). Disable
|
||||
// the importer and fall through to the CPU mmap path —
|
||||
// degraded, not dead.
|
||||
tracing::warn!(error = %format!("{e:#}"),
|
||||
"dmabuf GPU import failed — falling back to the CPU copy path");
|
||||
gpu_import_broken = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return; // format has no DRM fourcc mapping — skip the frame
|
||||
}
|
||||
}
|
||||
}
|
||||
if gpu_import_broken {
|
||||
ud.importer = None;
|
||||
}
|
||||
|
||||
let d = &mut datas[0];
|
||||
// CPU path may also receive LINEAR dmabufs (gamescope offers only those once its
|
||||
// modifier-bearing format pod wins); capture the fd before `data()` borrows `d`.
|
||||
let dmabuf_fd = (d.type_() == pw::spa::buffer::DataType::DmaBuf).then(|| d.fd());
|
||||
let (size, offset, stride) = {
|
||||
let c = d.chunk();
|
||||
(
|
||||
c.size() as usize,
|
||||
c.offset() as usize,
|
||||
c.stride().max(0) as usize,
|
||||
)
|
||||
};
|
||||
let Some(fmt) = ud.format else { return }; // unsupported/not negotiated
|
||||
let bpp = fmt.bytes_per_pixel();
|
||||
let row = w * bpp;
|
||||
let stride = if stride == 0 { row } else { stride };
|
||||
if stride < row {
|
||||
warn_once("chunk stride < row — frames dropped");
|
||||
return;
|
||||
}
|
||||
let needed = stride * (h - 1) + row;
|
||||
// dmabuf chunks commonly report size 0; fall back to the computed span.
|
||||
let size = if size == 0 { needed } else { size };
|
||||
// MAP_BUFFERS only maps buffers flagged mappable; Vulkan-exported dmabufs
|
||||
// (gamescope) usually aren't, so mmap the fd ourselves for the de-pad read.
|
||||
let _mapping; // keeps a manual mmap alive for the copy below
|
||||
let buf: &[u8] = if let Some(data) = d.data() {
|
||||
data
|
||||
} else if let Some(fd) = dmabuf_fd.filter(|&fd| fd > 0) {
|
||||
match DmabufMap::new(fd, offset + needed) {
|
||||
Some(m) => {
|
||||
_mapping = m;
|
||||
unsafe { std::slice::from_raw_parts(_mapping.ptr as *const u8, _mapping.len) }
|
||||
}
|
||||
None => {
|
||||
warn_once("mmap(dmabuf) failed — frames dropped");
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn_once("buffer has no mappable data — frames dropped");
|
||||
return;
|
||||
};
|
||||
// Need stride*(h-1)+row valid bytes within [offset, offset+size).
|
||||
if offset > buf.len() {
|
||||
return;
|
||||
}
|
||||
let avail = buf.len() - offset;
|
||||
if needed > avail || needed > size {
|
||||
warn_once("buffer smaller than frame span — frames dropped");
|
||||
return;
|
||||
}
|
||||
let region = &buf[offset..offset + size.min(avail)];
|
||||
// De-pad into a tightly-packed buffer (chunk stride may exceed w*bpp).
|
||||
let mut tight = vec![0u8; row * h];
|
||||
for y in 0..h {
|
||||
tight[y * row..y * row + row].copy_from_slice(®ion[y * stride..y * stride + row]);
|
||||
}
|
||||
let pts_ns = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|d| d.as_nanos() as u64)
|
||||
.unwrap_or(0);
|
||||
let frame = CapturedFrame {
|
||||
width: w as u32,
|
||||
height: h as u32,
|
||||
pts_ns,
|
||||
format: fmt,
|
||||
payload: FramePayload::Cpu(tight),
|
||||
};
|
||||
// Drop if the encoder is behind — never block the pipewire loop.
|
||||
let _ = ud.tx.try_send(frame);
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn pipewire_thread(
|
||||
fd: Option<OwnedFd>,
|
||||
@@ -795,10 +987,7 @@ mod pipewire {
|
||||
active,
|
||||
negotiated,
|
||||
importer,
|
||||
stale_recent: std::collections::VecDeque::new(),
|
||||
stale_last: 0,
|
||||
stale_seen: 0,
|
||||
keep_stale: std::env::var_os("PUNKTFUNK_KEEP_STALE").is_some(),
|
||||
dbg_log_n: 0,
|
||||
};
|
||||
|
||||
let stream = pw::stream::StreamBox::new(
|
||||
@@ -862,230 +1051,89 @@ mod pipewire {
|
||||
// PipeWire dispatches this from a C trampoline with no catch_unwind; a
|
||||
// panic crossing that FFI boundary would abort the whole host. Contain it.
|
||||
let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
|
||||
let Some(mut buffer) = stream.dequeue_buffer() else {
|
||||
return;
|
||||
};
|
||||
// No active stream: release the buffer without the (expensive at 5K) de-pad.
|
||||
if !ud.active.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
let datas = buffer.datas_mut();
|
||||
if datas.is_empty() {
|
||||
return;
|
||||
}
|
||||
let sz = ud.info.size();
|
||||
let (w, h) = (sz.width as usize, sz.height as usize);
|
||||
if w == 0 || h == 0 {
|
||||
return; // format not negotiated yet
|
||||
}
|
||||
|
||||
// Implicit-fence wait: Mutter renders into the dmabuf and hands it over at
|
||||
// GPU-submit time; with no producer explicit sync (Mutter+NVIDIA can't) we snapshot
|
||||
// the buffer's implicit fence and wait the producer's render before sampling —
|
||||
// closing the stale/old-frame race on NVIDIA. No-op for shm buffers or drivers that
|
||||
// attach no fence. Covers both the GPU import and the CPU mmap read below.
|
||||
if datas[0].type_() == pw::spa::buffer::DataType::DmaBuf {
|
||||
match crate::dmabuf_fence::wait_read_ready(datas[0].fd(), 100) {
|
||||
Ok(waited) => {
|
||||
static F1: std::sync::atomic::AtomicBool =
|
||||
std::sync::atomic::AtomicBool::new(true);
|
||||
if F1.swap(false, Ordering::Relaxed) {
|
||||
tracing::info!(
|
||||
waited,
|
||||
"dmabuf implicit-fence sync active (waited=true → driver fences \
|
||||
the render, race closed; false → no implicit fence, zero-copy \
|
||||
may still show stale frames)"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
static F2: std::sync::atomic::AtomicBool =
|
||||
std::sync::atomic::AtomicBool::new(true);
|
||||
if F2.swap(false, Ordering::Relaxed) {
|
||||
tracing::warn!(
|
||||
error = %format!("{e}"),
|
||||
"dmabuf EXPORT_SYNC_FILE failed — no implicit-fence sync; NVIDIA \
|
||||
zero-copy may show stale frames (no producer explicit sync)"
|
||||
);
|
||||
}
|
||||
}
|
||||
// Latest-frame-only (OBS pattern): Mutter delivers buffers in bursts and
|
||||
// recycles its pool; an older queued buffer carries a STALE frame. Drain all
|
||||
// queued buffers, requeue the older ones, keep only the newest.
|
||||
let mut newest = unsafe { stream.dequeue_raw_buffer() };
|
||||
if newest.is_null() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
let mut drained = 1u32;
|
||||
loop {
|
||||
let next = unsafe { stream.dequeue_raw_buffer() };
|
||||
if next.is_null() {
|
||||
break;
|
||||
}
|
||||
unsafe { stream.queue_raw_buffer(newest) };
|
||||
newest = next;
|
||||
drained += 1;
|
||||
}
|
||||
let spa_buf = unsafe { (*newest).buffer };
|
||||
|
||||
// Zero-copy path: if the buffer is a dmabuf and we have an importer, import it
|
||||
// into a CUDA device buffer (no CPU touch) and deliver that. Otherwise fall
|
||||
// through to the shm de-pad copy below.
|
||||
let mut gpu_import_broken = false;
|
||||
if let (Some(importer), Some(fmt)) = (ud.importer.as_mut(), ud.format) {
|
||||
if datas[0].type_() == pw::spa::buffer::DataType::DmaBuf {
|
||||
let plane = crate::zerocopy::DmabufPlane {
|
||||
fd: datas[0].fd(),
|
||||
offset: datas[0].chunk().offset(),
|
||||
stride: datas[0].chunk().stride().max(0) as u32,
|
||||
};
|
||||
// Tiled modifier → EGL/GL de-tile import; LINEAR (0/unset, e.g.
|
||||
// gamescope) → direct CUDA external-memory import (NVIDIA EGL can't
|
||||
// sample LINEAR).
|
||||
let modifier = (ud.modifier != 0).then_some(ud.modifier);
|
||||
if let Some(fourcc) = crate::zerocopy::drm_fourcc(fmt) {
|
||||
let imported = if modifier.is_some() {
|
||||
importer.import(&plane, w as u32, h as u32, fourcc, modifier)
|
||||
} else {
|
||||
importer.import_linear(&plane, w as u32, h as u32)
|
||||
};
|
||||
match imported {
|
||||
Ok(devbuf) => {
|
||||
static ONCE: std::sync::atomic::AtomicBool =
|
||||
std::sync::atomic::AtomicBool::new(true);
|
||||
if ONCE.swap(false, Ordering::Relaxed) {
|
||||
tracing::info!(w, h, modifier = ud.modifier,
|
||||
"zero-copy: dmabuf imported to CUDA (no CPU copy)");
|
||||
}
|
||||
let pts_ns = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|d| d.as_nanos() as u64)
|
||||
.unwrap_or(0);
|
||||
let _ = ud.tx.try_send(CapturedFrame {
|
||||
width: w as u32,
|
||||
height: h as u32,
|
||||
pts_ns,
|
||||
format: fmt,
|
||||
payload: FramePayload::Cuda(devbuf),
|
||||
});
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
// GPU import unavailable for this buffer kind (e.g. the
|
||||
// driver rejects LINEAR external-memory import). Disable
|
||||
// the importer and fall through to the CPU mmap path —
|
||||
// degraded, not dead.
|
||||
tracing::warn!(error = %format!("{e:#}"),
|
||||
"dmabuf GPU import failed — falling back to the CPU copy path");
|
||||
gpu_import_broken = true;
|
||||
}
|
||||
}
|
||||
// Inspect the newest buffer's header + first chunk for the diagnostic and the
|
||||
// CORRUPTED skip. SPA_META_Header is optional — `hdr` may be null.
|
||||
let hdr = unsafe {
|
||||
spa::sys::spa_buffer_find_meta_data(
|
||||
spa_buf,
|
||||
spa::sys::SPA_META_Header,
|
||||
std::mem::size_of::<spa::sys::spa_meta_header>(),
|
||||
) as *const spa::sys::spa_meta_header
|
||||
};
|
||||
let hdr_flags = if hdr.is_null() {
|
||||
0u32
|
||||
} else {
|
||||
unsafe { (*hdr).flags }
|
||||
};
|
||||
// First data chunk's size + flags (used for the diagnostic + CORRUPTED check)
|
||||
// and its data type (a dmabuf legitimately reports chunk size 0, so the size-0
|
||||
// stale skip only applies to mappable SHM buffers).
|
||||
let (chunk_size, chunk_flags, is_dmabuf) = unsafe {
|
||||
if !spa_buf.is_null()
|
||||
&& (*spa_buf).n_datas > 0
|
||||
&& !(*spa_buf).datas.is_null()
|
||||
&& !(*(*spa_buf).datas).chunk.is_null()
|
||||
{
|
||||
let d0 = (*spa_buf).datas;
|
||||
let c = (*d0).chunk;
|
||||
let is_dmabuf =
|
||||
(*d0).type_ == spa::sys::SPA_DATA_DmaBuf;
|
||||
((*c).size, (*c).flags, is_dmabuf)
|
||||
} else {
|
||||
return; // format has no DRM fourcc mapping — skip the frame
|
||||
(0u32, 0i32, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
if gpu_import_broken {
|
||||
ud.importer = None;
|
||||
}
|
||||
};
|
||||
|
||||
let d = &mut datas[0];
|
||||
// CPU path may also receive LINEAR dmabufs (gamescope offers only those once its
|
||||
// modifier-bearing format pod wins); capture the fd before `data()` borrows `d`.
|
||||
let dmabuf_fd =
|
||||
(d.type_() == pw::spa::buffer::DataType::DmaBuf).then(|| d.fd());
|
||||
let (size, offset, stride) = {
|
||||
let c = d.chunk();
|
||||
(
|
||||
c.size() as usize,
|
||||
c.offset() as usize,
|
||||
c.stride().max(0) as usize,
|
||||
)
|
||||
};
|
||||
let Some(fmt) = ud.format else { return }; // unsupported/not negotiated
|
||||
let bpp = fmt.bytes_per_pixel();
|
||||
let row = w * bpp;
|
||||
let stride = if stride == 0 { row } else { stride };
|
||||
if stride < row {
|
||||
warn_once("chunk stride < row — frames dropped");
|
||||
return;
|
||||
}
|
||||
let needed = stride * (h - 1) + row;
|
||||
// dmabuf chunks commonly report size 0; fall back to the computed span.
|
||||
let size = if size == 0 { needed } else { size };
|
||||
// MAP_BUFFERS only maps buffers flagged mappable; Vulkan-exported dmabufs
|
||||
// (gamescope) usually aren't, so mmap the fd ourselves for the de-pad read.
|
||||
let _mapping; // keeps a manual mmap alive for the copy below
|
||||
let buf: &[u8] = if let Some(data) = d.data() {
|
||||
data
|
||||
} else if let Some(fd) = dmabuf_fd.filter(|&fd| fd > 0) {
|
||||
match DmabufMap::new(fd, offset + needed) {
|
||||
Some(m) => {
|
||||
_mapping = m;
|
||||
unsafe {
|
||||
std::slice::from_raw_parts(_mapping.ptr as *const u8, _mapping.len)
|
||||
}
|
||||
}
|
||||
None => {
|
||||
warn_once("mmap(dmabuf) failed — frames dropped");
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn_once("buffer has no mappable data — frames dropped");
|
||||
return;
|
||||
};
|
||||
// Need stride*(h-1)+row valid bytes within [offset, offset+size).
|
||||
if offset > buf.len() {
|
||||
return;
|
||||
}
|
||||
let avail = buf.len() - offset;
|
||||
if needed > avail || needed > size {
|
||||
warn_once("buffer smaller than frame span — frames dropped");
|
||||
return;
|
||||
}
|
||||
let region = &buf[offset..offset + size.min(avail)];
|
||||
// De-pad into a tightly-packed buffer (chunk stride may exceed w*bpp).
|
||||
let mut tight = vec![0u8; row * h];
|
||||
for y in 0..h {
|
||||
tight[y * row..y * row + row]
|
||||
.copy_from_slice(®ion[y * stride..y * stride + row]);
|
||||
}
|
||||
// GNOME stale-frame filter: Mutter re-delivers complete OLD pool buffers on the SHM
|
||||
// download path (no NVIDIA fence prevents it). A frame whose sampled content equals an
|
||||
// EARLIER distinct frame is a stale re-delivery — DROP it so the encoder never emits
|
||||
// the flash (try_latest then re-sends the last good frame). Hashes a spatial sample;
|
||||
// a real return-to-prior-state is rare and dropping it is harmless. KEEP_STALE=1 = off.
|
||||
{
|
||||
let s: &[u8] = &tight;
|
||||
let step = (s.len() / 1024).max(1);
|
||||
let mut hh: u64 = 0xcbf2_9ce4_8422_2325;
|
||||
let mut i = 0;
|
||||
while i < s.len() {
|
||||
hh = (hh ^ s[i] as u64).wrapping_mul(0x0100_0000_01b3);
|
||||
i += step;
|
||||
}
|
||||
// `stale_last` = the current (newest) frame's hash. A frame equal to it is a normal
|
||||
// duplicate (deliver). A frame equal to an OLDER distinct frame in `stale_recent`
|
||||
// is a stale re-delivery (drop). Anything else is new forward content (deliver).
|
||||
if hh == ud.stale_last {
|
||||
// duplicate of the current frame — deliver as usual
|
||||
} else if ud.stale_recent.contains(&hh) {
|
||||
ud.stale_seen += 1;
|
||||
if ud.stale_seen.is_power_of_two() {
|
||||
tracing::warn!(
|
||||
dropped = ud.stale_seen,
|
||||
"GNOME stale-frame dropped (capture reverted to an earlier frame)"
|
||||
let corrupted = (hdr_flags & spa::sys::SPA_META_HEADER_FLAG_CORRUPTED) != 0
|
||||
|| (chunk_flags & spa::sys::SPA_CHUNK_FLAG_CORRUPTED as i32) != 0;
|
||||
|
||||
// Rate-limited diagnostic (~once/sec): drain depth + chunk size/flags + header
|
||||
// flags. Tells us whether buffers queue up (latest-frame-only helps) and whether
|
||||
// stale buffers are flagged CORRUPTED or report size 0.
|
||||
if drained > 1 || chunk_flags != 0 || hdr_flags != 0 {
|
||||
ud.dbg_log_n += 1;
|
||||
if ud.dbg_log_n.is_power_of_two() || ud.dbg_log_n % 60 == 0 {
|
||||
tracing::info!(
|
||||
drained,
|
||||
chunk_size,
|
||||
chunk_flags,
|
||||
hdr_flags,
|
||||
corrupted,
|
||||
n = ud.dbg_log_n,
|
||||
"capture: latest-frame-only drain (diagnostic)"
|
||||
);
|
||||
}
|
||||
if !ud.keep_stale {
|
||||
return; // drop the stale re-delivery — don't advance `stale_last`
|
||||
}
|
||||
} else {
|
||||
ud.stale_recent.push_back(hh);
|
||||
if ud.stale_recent.len() > 24 {
|
||||
ud.stale_recent.pop_front();
|
||||
}
|
||||
ud.stale_last = hh;
|
||||
}
|
||||
}
|
||||
let pts_ns = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|d| d.as_nanos() as u64)
|
||||
.unwrap_or(0);
|
||||
let frame = CapturedFrame {
|
||||
width: w as u32,
|
||||
height: h as u32,
|
||||
pts_ns,
|
||||
format: fmt,
|
||||
payload: FramePayload::Cpu(tight),
|
||||
};
|
||||
// Drop if the encoder is behind — never block the pipewire loop.
|
||||
let _ = ud.tx.try_send(frame);
|
||||
|
||||
// Skip a CORRUPTED stale buffer (or a size-0 SHM buffer — a dmabuf legitimately
|
||||
// reports size 0) — requeue it and wait for the next.
|
||||
if corrupted || (chunk_size == 0 && !is_dmabuf) {
|
||||
warn_once("capture: skipped a CORRUPTED/size-0 buffer (stale frame)");
|
||||
unsafe { stream.queue_raw_buffer(newest) };
|
||||
return;
|
||||
}
|
||||
|
||||
consume_frame(ud, spa_buf);
|
||||
unsafe { stream.queue_raw_buffer(newest) };
|
||||
}));
|
||||
if outcome.is_err() {
|
||||
tracing::error!("panic in pipewire process callback — frame dropped");
|
||||
|
||||
Reference in New Issue
Block a user