From 8531135bb749815c2d2b0749bb2f8b75d0619ba2 Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Sat, 13 Jun 2026 00:34:42 +0000 Subject: [PATCH] fix(capture/mutter): stale-frame flashes + stuck input after disconnect on GNOME MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deep dive into the two GNOME-only host bugs (KWin/gamescope clean): 1. Stale-frame flashes (windows at old positions, typed text reverting): Mutter renders its virtual monitors DIRECTLY into the PipeWire buffer pool, and NVIDIA has no implicit dmabuf fencing — our zero-copy import raced the render and encoded each pool buffer's PREVIOUS contents. Fix, in order of preference: - Consumer-side PipeWire explicit sync (SPA_META_SyncTimeline): new drm_sync module (DRM timeline-syncobj wait/signal via raw ioctls, unit-tested incl. a live signal->wait round trip); announced post-format via update_params (the OBS pattern — at connect time the meta makes producers fail allocation, observed on KWin), with a blocks=3 Buffers filter so the producer's sync pod wins; acquire point awaited before any read (GPU import or CPU mmap), release point signaled on every path. - Where the producer can't do explicit sync (Mutter on NVIDIA today: no cogl sync_fd, "error alloc buffers"), a sticky fallback flips the capture to the synchronous CPU/shm path — Mutter's glReadPixels download orders against its render, so frames are correct by construction. First session pays one ~10 s probe+retry; later sessions go straight there. Validated live on home-worker-3 (GNOME 50 + RTX 4090): clean fallback, 30 MB HEVC streamed. - Sync is only announced on Mutter sessions (new VirtualOutput.mutter tag): KWin+NVIDIA fails allocation when merely asked, and doesn't need it (verified unchanged: zero-copy CUDA import + 1.1 MB/10 s). PUNKTFUNK_EXPLICIT_SYNC=0 disables the probe outright. 2. 
Clicks wedged in the focused app after disconnect+reconnect: a client vanishing mid-press left keys/buttons latched in the compositor — Mutter keeps the destroyed EIS device's implicit grab and the focused app stops taking clicks until restarted. EiState now tracks held keys/buttons/touches (wire codes) and synthesizes releases through the normal inject path before the EIS connection goes away. GNOME hosts on NVIDIA temporarily lose zero-copy (correctness over throughput); the moment Mutter+driver gain working explicit sync, the sync path engages automatically and zero-copy returns. Co-Authored-By: Claude Fable 5 --- crates/punktfunk-host/src/capture/linux.rs | 597 +++++++++++++----- crates/punktfunk-host/src/drm_sync.rs | 209 ++++++ crates/punktfunk-host/src/inject/libei.rs | 71 +++ crates/punktfunk-host/src/main.rs | 1 + crates/punktfunk-host/src/vdisplay.rs | 5 + .../punktfunk-host/src/vdisplay/gamescope.rs | 4 + crates/punktfunk-host/src/vdisplay/kwin.rs | 1 + crates/punktfunk-host/src/vdisplay/mutter.rs | 1 + crates/punktfunk-host/src/vdisplay/wlroots.rs | 1 + include/punktfunk_core.h | 16 + 10 files changed, 743 insertions(+), 163 deletions(-) create mode 100644 crates/punktfunk-host/src/drm_sync.rs diff --git a/crates/punktfunk-host/src/capture/linux.rs b/crates/punktfunk-host/src/capture/linux.rs index eddd053..c3068f8 100644 --- a/crates/punktfunk-host/src/capture/linux.rs +++ b/crates/punktfunk-host/src/capture/linux.rs @@ -79,7 +79,7 @@ impl PortalCapturer { node_id, "ScreenCast portal session started; connecting PipeWire" ); - Ok(spawn_pipewire(Some(fd), node_id, None)?.into_capturer(node_id, None)) + Ok(spawn_pipewire(Some(fd), node_id, None, false)?.into_capturer(node_id, None)) } /// Build a capturer from an already-created virtual output ([`crate::vdisplay::VirtualOutput`]): @@ -93,7 +93,7 @@ impl PortalCapturer { ); let node_id = vout.node_id; Ok( - spawn_pipewire(vout.remote_fd, node_id, vout.preferred_mode)? 
+ spawn_pipewire(vout.remote_fd, node_id, vout.preferred_mode, vout.mutter)? .into_capturer(node_id, Some(vout.keepalive)), ) } @@ -133,6 +133,7 @@ fn spawn_pipewire( fd: Option, node_id: u32, preferred: Option<(u32, u32, u32)>, + mutter: bool, ) -> Result { // Frames flow from the pipewire thread over a small bounded channel. let (frame_tx, frame_rx) = sync_channel::(8); @@ -157,6 +158,7 @@ fn spawn_pipewire( zerocopy, preferred, quit_rx, + mutter, ) { tracing::error!(error = %format!("{e:#}"), "pipewire capture thread failed"); } @@ -466,6 +468,66 @@ mod pipewire { negotiated: Arc, /// Present when zero-copy is enabled: imports a dmabuf → CUDA device buffer. importer: Option, + /// Explicit-sync (SyncTimeline) syncobj ops; lazily opened on the first sync-carrying + /// buffer. `sync_tried` keeps a failed open from retrying per frame. + sync: Option, + sync_tried: bool, + /// Announce SyncTimeline in `param_changed` (post-format, the OBS pattern — at + /// connect time the meta acts as a hard filter and producers without support + /// fail buffer allocation outright; observed on KWin). + want_sync: bool, + /// Which Buffers pod shape to re-emit alongside the meta announcement. + want_dmabuf: bool, + /// Mutter virtual monitor: dmabufs MUST carry explicit sync (see MUTTER_SYNC_UNUSABLE). + mutter: bool, + } + + /// Mutter renders directly into the pool, so its dmabufs are only safe with explicit + /// sync. Set when sync negotiation failed (unallocated buffers) or Mutter delivered + /// dmabufs without the SyncTimeline meta: the pipeline's first-frame-timeout retry + /// then rebuilds this capture on the synchronous CPU/shm path (Mutter downloads the + /// frame, which orders against its render) — slower, never stale. 
+ static MUTTER_SYNC_UNUSABLE: AtomicBool = AtomicBool::new(false); + + /// The explicit-sync points of one buffer: producers that render directly into the + /// pool (Mutter virtual monitors) attach a `SPA_META_SyncTimeline` plus two + /// `SPA_DATA_SyncObj` datas (acquire first, release second — PipeWire convention). + /// Reading before the acquire point fires shows the buffer's PREVIOUS contents on + /// drivers without implicit dmabuf fencing (NVIDIA) — the "stale frame flashes" bug. + struct SyncPoints { + acquire_fd: i32, + release_fd: i32, + acquire_point: u64, + release_point: u64, + } + + /// Extract the sync points, if the producer attached them. + unsafe fn sync_points(b: *const spa::sys::spa_buffer) -> Option { + unsafe { + if b.is_null() { + return None; + } + let meta = spa::sys::spa_buffer_find_meta_data( + b, + spa::sys::SPA_META_SyncTimeline, + std::mem::size_of::(), + ) as *const spa::sys::spa_meta_sync_timeline; + if meta.is_null() { + return None; + } + let datas = std::slice::from_raw_parts((*b).datas, (*b).n_datas as usize); + let mut objs = datas + .iter() + .filter(|d| d.type_ == spa::sys::SPA_DATA_SyncObj && d.fd >= 0); + let acquire = objs.next()?; + let release = objs.next()?; + Some(SyncPoints { + acquire_fd: acquire.fd as i32, + release_fd: release.fd as i32, + acquire_point: (*meta).acquire_point, + release_point: (*meta).release_point, + }) + } } /// Log a frame-drop reason once per process (the process callback runs per frame; a stuck @@ -511,6 +573,33 @@ mod pipewire { } } + /// Announce SyncTimeline (explicit sync) support: producers that render directly + /// into the buffer pool (Mutter virtual monitors) then attach per-buffer acquire / + /// release timeline points instead of relying on implicit dmabuf fencing — which + /// NVIDIA doesn't do. Producers without explicit-sync support simply ignore this. 
+ fn build_sync_timeline_meta() -> Result> { + serialize_pod(pw::spa::pod::Object { + type_: pw::spa::utils::SpaTypes::ObjectParamMeta.as_raw(), + id: pw::spa::param::ParamType::Meta.as_raw(), + properties: vec![ + pw::spa::pod::Property { + key: pw::spa::sys::SPA_PARAM_META_type, + flags: pw::spa::pod::PropertyFlags::empty(), + value: pw::spa::pod::Value::Id(pw::spa::utils::Id( + pw::spa::sys::SPA_META_SyncTimeline, + )), + }, + pw::spa::pod::Property { + key: pw::spa::sys::SPA_PARAM_META_size, + flags: pw::spa::pod::PropertyFlags::empty(), + value: pw::spa::pod::Value::Int(std::mem::size_of::< + pw::spa::sys::spa_meta_sync_timeline, + >() as i32), + }, + ], + }) + } + fn serialize_pod(obj: pw::spa::pod::Object) -> Result> { Ok(pw::spa::pod::serialize::PodSerializer::serialize( std::io::Cursor::new(Vec::new()), @@ -650,31 +739,60 @@ mod pipewire { /// without this bit the buffer-type intersection is empty and the link silently stalls in /// "negotiating". A LINEAR dmabuf is mmap-able by MAP_BUFFERS, so the CPU de-pad copy works. fn build_mappable_buffers() -> Result> { + let mask = (1i32 << pw::spa::sys::SPA_DATA_MemPtr) + | (1i32 << pw::spa::sys::SPA_DATA_MemFd) + | (1i32 << pw::spa::sys::SPA_DATA_DmaBuf); serialize_pod(pw::spa::pod::Object { type_: pw::spa::utils::SpaTypes::ObjectParamBuffers.as_raw(), id: pw::spa::param::ParamType::Buffers.as_raw(), properties: vec![pw::spa::pod::Property { key: pw::spa::sys::SPA_PARAM_BUFFERS_dataType, flags: pw::spa::pod::PropertyFlags::empty(), - value: pw::spa::pod::Value::Int( - (1i32 << pw::spa::sys::SPA_DATA_MemPtr) - | (1i32 << pw::spa::sys::SPA_DATA_MemFd) - | (1i32 << pw::spa::sys::SPA_DATA_DmaBuf), - ), + value: pw::spa::pod::Value::Int(mask), }], }) } /// Build a Buffers param requesting dmabuf-only buffers. 
- fn build_dmabuf_buffers() -> Result> { + /// `sync` (Mutter explicit sync): the dataType must be a CHOICE_FLAGS holding ONLY + /// the DmaBuf bit — Mutter enables explicit sync only when the negotiated + /// buffer_types are exactly DmaBuf, and only its sync Buffers pod reserves the + /// blocks for the two SyncObj datas (the syncobjs are NOT a dataType bit). The + /// plain-Int form keeps the non-sync path byte-identical to what KWin/gamescope + /// already negotiate. + fn build_dmabuf_buffers(sync: bool) -> Result> { + let mask = 1i32 << pw::spa::sys::SPA_DATA_DmaBuf; + let value = if sync { + pw::spa::pod::Value::Choice(pw::spa::pod::ChoiceValue::Int(pw::spa::utils::Choice( + pw::spa::utils::ChoiceFlags::empty(), + pw::spa::utils::ChoiceEnum::Flags { + default: mask, + flags: vec![mask], + }, + ))) + } else { + pw::spa::pod::Value::Int(mask) + }; + let mut properties = vec![pw::spa::pod::Property { + key: pw::spa::sys::SPA_PARAM_BUFFERS_dataType, + flags: pw::spa::pod::PropertyFlags::empty(), + value, + }]; + if sync { + // Pin blocks to media + 2 SyncObj datas: the producer offers a sync pod + // (blocks=3) AND a non-sync fallback (blocks=1) — without this filter the + // fallback can win the intersection while the SyncTimeline meta still + // negotiates, and the producer then asserts on its own 1-block buffers. 
+ properties.push(pw::spa::pod::Property { + key: pw::spa::sys::SPA_PARAM_BUFFERS_blocks, + flags: pw::spa::pod::PropertyFlags::empty(), + value: pw::spa::pod::Value::Int(3), + }); + } serialize_pod(pw::spa::pod::Object { type_: pw::spa::utils::SpaTypes::ObjectParamBuffers.as_raw(), id: pw::spa::param::ParamType::Buffers.as_raw(), - properties: vec![pw::spa::pod::Property { - key: pw::spa::sys::SPA_PARAM_BUFFERS_dataType, - flags: pw::spa::pod::PropertyFlags::empty(), - value: pw::spa::pod::Value::Int(1i32 << pw::spa::sys::SPA_DATA_DmaBuf), - }], + properties, }) } @@ -688,6 +806,7 @@ mod pipewire { zerocopy: bool, preferred: Option<(u32, u32, u32)>, quit_rx: pw::channel::Receiver<()>, + mutter: bool, ) -> Result<()> { crate::pwinit::ensure_init(); @@ -736,9 +855,26 @@ mod pipewire { if importer.is_some() && !modifiers.contains(&0) { modifiers.push(0); // DRM_FORMAT_MOD_LINEAR } - let want_dmabuf = importer.is_some() && !modifiers.is_empty(); + // Explicit sync (SyncTimeline meta + SyncObj datas), announced post-format in + // param_changed — only on Mutter: it is the one compositor that renders directly + // into the pool, and announcing the meta to producers whose syncobj path is + // broken makes them fail buffer allocation outright (observed on KWin + NVIDIA). + // PUNKTFUNK_EXPLICIT_SYNC=0 is the escape hatch. + let want_sync = mutter + && !MUTTER_SYNC_UNUSABLE.load(Ordering::Relaxed) + && std::env::var("PUNKTFUNK_EXPLICIT_SYNC").as_deref() != Ok("0"); + // On Mutter, dmabufs without explicit sync are not safe to consume — fall back to + // the shm/CPU path (Mutter's synchronous download) when sync is unavailable. 
+ let want_dmabuf = importer.is_some() && !modifiers.is_empty() && (!mutter || want_sync); if zerocopy && !want_dmabuf { - tracing::warn!("zero-copy: no EGL-importable dmabuf modifiers — using CPU path"); + if mutter && !want_sync { + tracing::warn!( + "Mutter without working explicit sync — using the synchronous CPU \ + path (dmabuf capture would show stale frames on NVIDIA)" + ); + } else { + tracing::warn!("zero-copy: no EGL-importable dmabuf modifiers — using CPU path"); + } } else if want_dmabuf { tracing::info!( count = modifiers.len(), @@ -755,6 +891,11 @@ mod pipewire { active, negotiated, importer, + sync: None, + sync_tried: false, + want_sync, + want_dmabuf, + mutter, }; let stream = pw::stream::StreamBox::new( @@ -775,10 +916,24 @@ mod pipewire { let _listener = stream .add_local_listener_with_user_data(data) - .state_changed(|_stream, _ud, old, new| { + .state_changed(|_stream, ud, old, new| { tracing::info!(?old, ?new, "pipewire stream state"); + // A sync-announced negotiation the producer cannot satisfy fails buffer + // allocation ("error alloc buffers") — no process callback will ever run, + // so flag it HERE and starve the first-frame timeout: the pipeline retry + // then rebuilds this capture on the synchronous CPU path. + if matches!(new, pw::stream::StreamState::Error(_)) + && ud.want_sync + && !MUTTER_SYNC_UNUSABLE.swap(true, Ordering::Relaxed) + { + tracing::warn!( + "explicit-sync buffer negotiation failed (Mutter without \ + DRM_CAP_SYNCOBJ_TIMELINE / cogl sync_fd, e.g. 
NVIDIA) — retrying \ + this capture on the synchronous CPU path" + ); + } }) - .param_changed(|_stream, ud, id, param| { + .param_changed(|stream, ud, id, param| { let Some(param) = param else { return }; if id != pw::spa::param::ParamType::Format.as_raw() { return; @@ -812,165 +967,78 @@ mod pipewire { "negotiated a pixel format the encoder cannot consume — frames will be skipped" ); } + // Post-format renegotiation (the OBS pattern): announce SyncTimeline + // + a Buffers pod accepting the SyncObj datas. Producers without + // explicit-sync support ignore it here, instead of failing allocation + // as they do when it arrives at connect time. + if ud.want_sync && ud.want_dmabuf { + let update = (|| -> Result<()> { + let buffers = build_dmabuf_buffers(true)?; + let meta = build_sync_timeline_meta()?; + let mut pods = vec![ + Pod::from_bytes(&buffers).context("buffers pod")?, + Pod::from_bytes(&meta).context("meta pod")?, + ]; + stream + .update_params(&mut pods) + .context("update_params(SyncTimeline)")?; + Ok(()) + })(); + if let Err(e) = update { + tracing::warn!(error = %format!("{e:#}"), + "explicit-sync param update failed — capturing without it"); + } + } } }) .process(|stream, ud| { // PipeWire dispatches this from a C trampoline with no catch_unwind; a // panic crossing that FFI boundary would abort the whole host. Contain it. let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - let Some(mut buffer) = stream.dequeue_buffer() else { - return; - }; - // No active stream: release the buffer without the (expensive at 5K) de-pad. - if !ud.active.load(Ordering::Relaxed) { + // Raw dequeue: the safe Buffer wrapper hides the spa_buffer metas that + // carry the explicit-sync timeline points. 
+ let raw = unsafe { stream.dequeue_raw_buffer() }; + if raw.is_null() { return; } - let datas = buffer.datas_mut(); - if datas.is_empty() { - return; - } - let sz = ud.info.size(); - let (w, h) = (sz.width as usize, sz.height as usize); - if w == 0 || h == 0 { - return; // format not negotiated yet - } - - // Zero-copy path: if the buffer is a dmabuf and we have an importer, import it - // into a CUDA device buffer (no CPU touch) and deliver that. Otherwise fall - // through to the shm de-pad copy below. - let mut gpu_import_broken = false; - if let (Some(importer), Some(fmt)) = (ud.importer.as_mut(), ud.format) { - if datas[0].type_() == pw::spa::buffer::DataType::DmaBuf { - let plane = crate::zerocopy::DmabufPlane { - fd: datas[0].fd(), - offset: datas[0].chunk().offset(), - stride: datas[0].chunk().stride().max(0) as u32, - }; - // Tiled modifier → EGL/GL de-tile import; LINEAR (0/unset, e.g. - // gamescope) → direct CUDA external-memory import (NVIDIA EGL can't - // sample LINEAR). - let modifier = (ud.modifier != 0).then_some(ud.modifier); - if let Some(fourcc) = crate::zerocopy::drm_fourcc(fmt) { - let imported = if modifier.is_some() { - importer.import(&plane, w as u32, h as u32, fourcc, modifier) - } else { - importer.import_linear(&plane, w as u32, h as u32) - }; - match imported { - Ok(devbuf) => { - static ONCE: std::sync::atomic::AtomicBool = - std::sync::atomic::AtomicBool::new(true); - if ONCE.swap(false, Ordering::Relaxed) { - tracing::info!(w, h, modifier = ud.modifier, - "zero-copy: dmabuf imported to CUDA (no CPU copy)"); - } - let pts_ns = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_nanos() as u64) - .unwrap_or(0); - let _ = ud.tx.try_send(CapturedFrame { - width: w as u32, - height: h as u32, - pts_ns, - format: fmt, - payload: FramePayload::Cuda(devbuf), - }); - return; - } - Err(e) => { - // GPU import unavailable for this buffer kind (e.g. the - // driver rejects LINEAR external-memory import). 
Disable - // the importer and fall through to the CPU mmap path — - // degraded, not dead. - tracing::warn!(error = %format!("{e:#}"), - "dmabuf GPU import failed — falling back to the CPU copy path"); - gpu_import_broken = true; - } + let spa_buf = unsafe { (*raw).buffer }; + let sync = unsafe { sync_points(spa_buf) }; + if let Some(s) = &sync { + if !ud.sync_tried { + ud.sync_tried = true; + match crate::drm_sync::DrmSync::open() { + Ok(d) => { + tracing::info!( + "pipewire explicit sync active (SyncTimeline — producer \ + renders are awaited before encode)" + ); + ud.sync = Some(d); } - } else { - return; // format has no DRM fourcc mapping — skip the frame + Err(e) => tracing::warn!( + error = %format!("{e:#}"), + "explicit sync offered but syncobj ops unavailable — reading \ + unsynchronized" + ), + } + } + // Wait for the producer's render to land before ANY read (GPU import + // or CPU mmap). A bounded wait: a wedged producer must not stall the + // PipeWire loop. + if let Some(drm) = &ud.sync { + if drm.wait_point(s.acquire_fd, s.acquire_point, 100).is_err() { + warn_once("explicit-sync acquire wait failed — frame may be stale"); } } } - if gpu_import_broken { - ud.importer = None; - } - - let d = &mut datas[0]; - // CPU path may also receive LINEAR dmabufs (gamescope offers only those once its - // modifier-bearing format pod wins); capture the fd before `data()` borrows `d`. 
- let dmabuf_fd = - (d.type_() == pw::spa::buffer::DataType::DmaBuf).then(|| d.fd()); - let (size, offset, stride) = { - let c = d.chunk(); - ( - c.size() as usize, - c.offset() as usize, - c.stride().max(0) as usize, - ) - }; - let Some(fmt) = ud.format else { return }; // unsupported/not negotiated - let bpp = fmt.bytes_per_pixel(); - let row = w * bpp; - let stride = if stride == 0 { row } else { stride }; - if stride < row { - warn_once("chunk stride < row — frames dropped"); - return; - } - let needed = stride * (h - 1) + row; - // dmabuf chunks commonly report size 0; fall back to the computed span. - let size = if size == 0 { needed } else { size }; - // MAP_BUFFERS only maps buffers flagged mappable; Vulkan-exported dmabufs - // (gamescope) usually aren't, so mmap the fd ourselves for the de-pad read. - let _mapping; // keeps a manual mmap alive for the copy below - let buf: &[u8] = if let Some(data) = d.data() { - data - } else if let Some(fd) = dmabuf_fd.filter(|&fd| fd > 0) { - match DmabufMap::new(fd, offset + needed) { - Some(m) => { - _mapping = m; - unsafe { - std::slice::from_raw_parts(_mapping.ptr as *const u8, _mapping.len) - } - } - None => { - warn_once("mmap(dmabuf) failed — frames dropped"); - return; - } + consume_frame(ud, spa_buf, sync.is_some()); + // The producer reuses the buffer only after the release point fires — + // signal it on EVERY path, even skipped frames, or the producer stalls. + if let (Some(s), Some(drm)) = (&sync, &ud.sync) { + if drm.signal_point(s.release_fd, s.release_point).is_err() { + warn_once("explicit-sync release signal failed — producer may stall"); } - } else { - warn_once("buffer has no mappable data — frames dropped"); - return; - }; - // Need stride*(h-1)+row valid bytes within [offset, offset+size). 
- if offset > buf.len() { - return; } - let avail = buf.len() - offset; - if needed > avail || needed > size { - warn_once("buffer smaller than frame span — frames dropped"); - return; - } - let region = &buf[offset..offset + size.min(avail)]; - // De-pad into a tightly-packed buffer (chunk stride may exceed w*bpp). - let mut tight = vec![0u8; row * h]; - for y in 0..h { - tight[y * row..y * row + row] - .copy_from_slice(®ion[y * stride..y * stride + row]); - } - let pts_ns = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_nanos() as u64) - .unwrap_or(0); - let frame = CapturedFrame { - width: w as u32, - height: h as u32, - pts_ns, - format: fmt, - payload: FramePayload::Cpu(tight), - }; - // Drop if the encoder is behind — never block the pipewire loop. - let _ = ud.tx.try_send(frame); + unsafe { stream.queue_raw_buffer(raw) }; })); if outcome.is_err() { tracing::error!("panic in pipewire process callback — frame dropped"); @@ -1031,10 +1099,18 @@ mod pipewire { // `param_changed` (the two-step DMA-BUF handshake). Otherwise offer the multi-format shm // pod and let MAP_BUFFERS map it. let shm_values = serialize_pod(obj)?; + // The SyncTimeline announcement itself happens post-format in param_changed (the + // OBS pattern); at connect time we only declare the data types we accept. let (dmabuf_values, buffers_values) = if want_dmabuf { ( Some(build_dmabuf_format(&modifiers, preferred)?), - Some(build_dmabuf_buffers()?), + // Sync path: NO Buffers pod at connect — buffers must not be allocated + // until the SyncTimeline meta is on the table (param_changed), or the + // producer's syncobj setup meets already-allocated 1-block buffers + // (observed: gnome-shell "n_datas >= SYNCOBJ_MINIMUM_N_DATAS" assertions). 
+ (!want_sync) + .then(|| build_dmabuf_buffers(false)) + .transpose()?, ) } else { // CPU path still accepts mappable dmabufs (gamescope offers only those once its @@ -1068,4 +1144,199 @@ mod pipewire { mainloop.run(); Ok(()) } + + /// Consume one dequeued buffer: GPU-import or de-pad-copy `datas[0]` and hand the + /// frame to the encoder. Extracted from the process callback so the explicit-sync + /// release point can be signaled on every early return. + fn consume_frame(ud: &mut UserData, spa_buf: *mut spa::sys::spa_buffer, has_sync: bool) { + // No active stream: release the buffer without the (expensive at 5K) de-pad. + if !ud.active.load(Ordering::Relaxed) { + return; + } + let datas: &mut [pw::spa::buffer::Data] = unsafe { + if spa_buf.is_null() || (*spa_buf).n_datas == 0 || (*spa_buf).datas.is_null() { + &mut [] + } else { + // Same transparent cast libspa's Buffer::datas_mut performs. + std::slice::from_raw_parts_mut( + (*spa_buf).datas as *mut pw::spa::buffer::Data, + (*spa_buf).n_datas as usize, + ) + } + }; + if datas.is_empty() { + return; + } + // A sync-announced negotiation that the producer could not complete leaves the + // buffers unallocated (type SPA_ID_INVALID, fd -1). Disable sync and starve the + // first-frame timeout so the pipeline retries on the safe path. + if datas[0].as_raw().type_ == u32::MAX + /* SPA_ID_INVALID */ + { + if ud.want_sync && !MUTTER_SYNC_UNUSABLE.swap(true, Ordering::Relaxed) { + tracing::error!( + "explicit-sync negotiation produced unallocated buffers — retrying \ + this capture on the synchronous CPU path" + ); + } else { + warn_once("buffer arrived unallocated — frames dropped"); + } + return; + } + if ud.mutter && !has_sync && datas[0].type_() == pw::spa::buffer::DataType::DmaBuf { + // Mutter renders straight into the pool; without explicit sync the encode + // races the render on NVIDIA and flashes the buffer's previous contents. 
+ // Don't consume these — starve the first-frame timeout so the pipeline + // retries on the synchronous CPU path. + if !MUTTER_SYNC_UNUSABLE.swap(true, Ordering::Relaxed) { + tracing::error!( + "Mutter delivered dmabufs without explicit sync — refusing them \ + (stale-frame corruption); retrying on the synchronous CPU path" + ); + } + return; + } + let sz = ud.info.size(); + let (w, h) = (sz.width as usize, sz.height as usize); + if w == 0 || h == 0 { + return; // format not negotiated yet + } + + // Zero-copy path: if the buffer is a dmabuf and we have an importer, import it + // into a CUDA device buffer (no CPU touch) and deliver that. Otherwise fall + // through to the shm de-pad copy below. + let mut gpu_import_broken = false; + if let (Some(importer), Some(fmt)) = (ud.importer.as_mut(), ud.format) { + if datas[0].type_() == pw::spa::buffer::DataType::DmaBuf { + let plane = crate::zerocopy::DmabufPlane { + fd: datas[0].fd(), + offset: datas[0].chunk().offset(), + stride: datas[0].chunk().stride().max(0) as u32, + }; + // Tiled modifier → EGL/GL de-tile import; LINEAR (0/unset, e.g. + // gamescope) → direct CUDA external-memory import (NVIDIA EGL can't + // sample LINEAR). 
+ let modifier = (ud.modifier != 0).then_some(ud.modifier); + if let Some(fourcc) = crate::zerocopy::drm_fourcc(fmt) { + let imported = if modifier.is_some() { + importer.import(&plane, w as u32, h as u32, fourcc, modifier) + } else { + importer.import_linear(&plane, w as u32, h as u32) + }; + match imported { + Ok(devbuf) => { + static ONCE: std::sync::atomic::AtomicBool = + std::sync::atomic::AtomicBool::new(true); + if ONCE.swap(false, Ordering::Relaxed) { + tracing::info!( + w, + h, + modifier = ud.modifier, + "zero-copy: dmabuf imported to CUDA (no CPU copy)" + ); + } + let pts_ns = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0); + let _ = ud.tx.try_send(CapturedFrame { + width: w as u32, + height: h as u32, + pts_ns, + format: fmt, + payload: FramePayload::Cuda(devbuf), + }); + return; + } + Err(e) => { + // GPU import unavailable for this buffer kind (e.g. the + // driver rejects LINEAR external-memory import). Disable + // the importer and fall through to the CPU mmap path — + // degraded, not dead. + tracing::warn!(error = %format!("{e:#}"), + "dmabuf GPU import failed — falling back to the CPU copy path"); + gpu_import_broken = true; + } + } + } else { + return; // format has no DRM fourcc mapping — skip the frame + } + } + } + if gpu_import_broken { + ud.importer = None; + } + + let d = &mut datas[0]; + // CPU path may also receive LINEAR dmabufs (gamescope offers only those once its + // modifier-bearing format pod wins); capture the fd before `data()` borrows `d`. 
+ let dmabuf_fd = (d.type_() == pw::spa::buffer::DataType::DmaBuf).then(|| d.fd()); + let (size, offset, stride) = { + let c = d.chunk(); + ( + c.size() as usize, + c.offset() as usize, + c.stride().max(0) as usize, + ) + }; + let Some(fmt) = ud.format else { return }; // unsupported/not negotiated + let bpp = fmt.bytes_per_pixel(); + let row = w * bpp; + let stride = if stride == 0 { row } else { stride }; + if stride < row { + warn_once("chunk stride < row — frames dropped"); + return; + } + let needed = stride * (h - 1) + row; + // dmabuf chunks commonly report size 0; fall back to the computed span. + let size = if size == 0 { needed } else { size }; + // MAP_BUFFERS only maps buffers flagged mappable; Vulkan-exported dmabufs + // (gamescope) usually aren't, so mmap the fd ourselves for the de-pad read. + let _mapping; // keeps a manual mmap alive for the copy below + let buf: &[u8] = if let Some(data) = d.data() { + data + } else if let Some(fd) = dmabuf_fd.filter(|&fd| fd > 0) { + match DmabufMap::new(fd, offset + needed) { + Some(m) => { + _mapping = m; + unsafe { std::slice::from_raw_parts(_mapping.ptr as *const u8, _mapping.len) } + } + None => { + warn_once("mmap(dmabuf) failed — frames dropped"); + return; + } + } + } else { + warn_once("buffer has no mappable data — frames dropped"); + return; + }; + // Need stride*(h-1)+row valid bytes within [offset, offset+size). + if offset > buf.len() { + return; + } + let avail = buf.len() - offset; + if needed > avail || needed > size { + warn_once("buffer smaller than frame span — frames dropped"); + return; + } + let region = &buf[offset..offset + size.min(avail)]; + // De-pad into a tightly-packed buffer (chunk stride may exceed w*bpp). 
+ let mut tight = vec![0u8; row * h]; + for y in 0..h { + tight[y * row..y * row + row].copy_from_slice(®ion[y * stride..y * stride + row]); + } + let pts_ns = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0); + let frame = CapturedFrame { + width: w as u32, + height: h as u32, + pts_ns, + format: fmt, + payload: FramePayload::Cpu(tight), + }; + // Drop if the encoder is behind — never block the pipewire loop. + let _ = ud.tx.try_send(frame); + } } diff --git a/crates/punktfunk-host/src/drm_sync.rs b/crates/punktfunk-host/src/drm_sync.rs new file mode 100644 index 0000000..9ca7e35 --- /dev/null +++ b/crates/punktfunk-host/src/drm_sync.rs @@ -0,0 +1,209 @@ +//! Minimal DRM timeline-syncobj operations — the consumer side of PipeWire explicit sync +//! (`SPA_META_SyncTimeline`). +//! +//! Compositors that render directly into the PipeWire buffer pool (Mutter's virtual +//! monitors) hand buffers over at GPU-submit time; on drivers without implicit dmabuf +//! fencing (NVIDIA) reading immediately races the render and shows the buffer's +//! *previous* contents. With explicit sync the producer attaches a timeline syncobj: +//! wait the acquire point before touching the buffer, signal the release point when done. +//! +//! Syncobjs are DRM-core objects: any render node can import and wait them, so this +//! opens its own fd independent of the capture GPU path. + +use anyhow::{bail, Result}; +use std::os::fd::RawFd; + +// drm.h ioctls on the 'd' (0x64) magic. _IOWR = dir(3)<<30 | size<<16 | 0x64<<8 | nr. 
+const fn iowr(nr: u32, size: usize) -> u64 {
+    (3u64 << 30) | ((size as u64) << 16) | (0x64u64 << 8) | nr as u64 // _IOWR, type 'd' (DRM)
+}
+
+#[repr(C)]
+#[derive(Default)]
+struct DrmSyncobjHandle {
+    handle: u32,
+    flags: u32,
+    fd: i32,
+    pad: u32,
+}
+
+#[repr(C)]
+#[derive(Default)]
+struct DrmSyncobjDestroy {
+    handle: u32,
+    pad: u32,
+}
+
+#[repr(C)]
+#[derive(Default)]
+struct DrmSyncobjTimelineWait {
+    handles: u64,
+    points: u64,
+    /// Absolute CLOCK_MONOTONIC deadline, nanoseconds.
+    timeout_nsec: i64,
+    count_handles: u32,
+    flags: u32,
+    first_signaled: u32,
+    pad: u32,
+}
+
+#[repr(C)]
+#[derive(Default)]
+struct DrmSyncobjTimelineArray {
+    handles: u64,
+    points: u64,
+    count_handles: u32,
+    flags: u32,
+}
+
+const DRM_IOCTL_SYNCOBJ_DESTROY: u64 = iowr(0xC0, std::mem::size_of::<DrmSyncobjDestroy>());
+const DRM_IOCTL_SYNCOBJ_FD_TO_HANDLE: u64 = iowr(0xC2, std::mem::size_of::<DrmSyncobjHandle>());
+const DRM_IOCTL_SYNCOBJ_TIMELINE_WAIT: u64 =
+    iowr(0xCA, std::mem::size_of::<DrmSyncobjTimelineWait>());
+const DRM_IOCTL_SYNCOBJ_TIMELINE_SIGNAL: u64 =
+    iowr(0xCD, std::mem::size_of::<DrmSyncobjTimelineArray>());
+
+/// The producer's point may not be attached yet when the buffer reaches us.
+const DRM_SYNCOBJ_WAIT_FLAGS_WAIT_FOR_SUBMIT: u32 = 1 << 1;
+
+pub struct DrmSync {
+    fd: RawFd, // opened in `open`, closed in `Drop`
+}
+
+impl DrmSync {
+    pub fn open() -> Result<Self> {
+        let path = c"/dev/dri/renderD128";
+        let fd = unsafe { libc::open(path.as_ptr(), libc::O_RDWR | libc::O_CLOEXEC) };
+        if fd < 0 {
+            bail!("open /dev/dri/renderD128 for syncobj ops: {}", errno());
+        }
+        Ok(DrmSync { fd })
+    }
+
+    /// Import a syncobj fd into a (temporary) handle on our device.
+    fn import(&self, syncobj_fd: RawFd) -> Result<u32> {
+        let mut req = DrmSyncobjHandle {
+            fd: syncobj_fd,
+            ..Default::default()
+        };
+        let r = unsafe { libc::ioctl(self.fd, DRM_IOCTL_SYNCOBJ_FD_TO_HANDLE, &mut req) };
+        if r < 0 {
+            bail!("SYNCOBJ_FD_TO_HANDLE: {}", errno());
+        }
+        Ok(req.handle)
+    }
+
+    fn destroy(&self, handle: u32) {
+        let mut req = DrmSyncobjDestroy {
+            handle,
+            ..Default::default()
+        };
+        unsafe { libc::ioctl(self.fd, DRM_IOCTL_SYNCOBJ_DESTROY, &mut req) };
+    }
+
+    /// Block until `point` on the producer's timeline is signaled (the buffer's contents
+    /// are ready), or `timeout_ms` passes.
+    pub fn wait_point(&self, syncobj_fd: RawFd, point: u64, timeout_ms: u64) -> Result<()> {
+        let handle = self.import(syncobj_fd)?;
+        let mut now = libc::timespec {
+            tv_sec: 0,
+            tv_nsec: 0,
+        };
+        unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut now) };
+        let deadline = now.tv_sec * 1_000_000_000 + now.tv_nsec + timeout_ms as i64 * 1_000_000;
+        let handles = [handle];
+        let points = [point];
+        let mut req = DrmSyncobjTimelineWait {
+            handles: handles.as_ptr() as u64,
+            points: points.as_ptr() as u64,
+            timeout_nsec: deadline,
+            count_handles: 1,
+            flags: DRM_SYNCOBJ_WAIT_FLAGS_WAIT_FOR_SUBMIT,
+            ..Default::default()
+        };
+        let r = unsafe { libc::ioctl(self.fd, DRM_IOCTL_SYNCOBJ_TIMELINE_WAIT, &mut req) };
+        let saved = errno();
+        self.destroy(handle); // `saved` was read first: this ioctl may overwrite errno
+        if r < 0 {
+            bail!("SYNCOBJ_TIMELINE_WAIT(point {point}): {saved}");
+        }
+        Ok(())
+    }
+
+    /// Signal `point` on the consumer release timeline — the producer may reuse the
+    /// buffer. Must be called for every buffer that carried sync metadata, even when the
+    /// frame was skipped, or the producer stalls waiting for it.
+    pub fn signal_point(&self, syncobj_fd: RawFd, point: u64) -> Result<()> {
+        let handle = self.import(syncobj_fd)?;
+        let handles = [handle];
+        let points = [point];
+        let mut req = DrmSyncobjTimelineArray {
+            handles: handles.as_ptr() as u64,
+            points: points.as_ptr() as u64,
+            count_handles: 1,
+            flags: 0,
+        };
+        let r = unsafe { libc::ioctl(self.fd, DRM_IOCTL_SYNCOBJ_TIMELINE_SIGNAL, &mut req) };
+        let saved = errno();
+        self.destroy(handle); // `saved` was read first: this ioctl may overwrite errno
+        if r < 0 {
+            bail!("SYNCOBJ_TIMELINE_SIGNAL(point {point}): {saved}");
+        }
+        Ok(())
+    }
+}
+
+impl Drop for DrmSync {
+    fn drop(&mut self) {
+        unsafe { libc::close(self.fd) };
+    }
+}
+
+fn errno() -> std::io::Error {
+    std::io::Error::last_os_error()
+}
+
+// `DrmSync::open` must not panic the PipeWire thread; everything is Result-based and the
+// caller degrades to unsynchronized capture (with a loud warning) when it fails.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// The ioctl numbers must match drm.h exactly — computed, so lock them down.
+    #[test]
+    fn ioctl_numbers_match_drm_h() {
+        assert_eq!(DRM_IOCTL_SYNCOBJ_FD_TO_HANDLE, 0xC010_64C2);
+        assert_eq!(DRM_IOCTL_SYNCOBJ_DESTROY, 0xC008_64C0);
+        assert_eq!(DRM_IOCTL_SYNCOBJ_TIMELINE_WAIT, 0xC028_64CA);
+        assert_eq!(DRM_IOCTL_SYNCOBJ_TIMELINE_SIGNAL, 0xC018_64CD);
+    }
+
+    /// Round-trip against the real DRM device when one exists (CI containers skip).
+    #[test]
+    fn signal_then_wait_roundtrip() {
+        let Ok(sync) = DrmSync::open() else {
+            eprintln!("no render node — skipping");
+            return;
+        };
+        // Create a fresh syncobj (CREATE = 0xBF), export it, signal point 1, wait point 1.
+        #[repr(C)]
+        #[derive(Default)]
+        struct Create {
+            handle: u32,
+            flags: u32,
+        }
+        const CREATE: u64 = iowr(0xBF, std::mem::size_of::<Create>());
+        const HANDLE_TO_FD: u64 = iowr(0xC1, std::mem::size_of::<DrmSyncobjHandle>());
+        let mut c = Create::default();
+        assert!(unsafe { libc::ioctl(sync.fd, CREATE, &mut c) } >= 0);
+        let mut h = DrmSyncobjHandle {
+            handle: c.handle,
+            ..Default::default()
+        };
+        assert!(unsafe { libc::ioctl(sync.fd, HANDLE_TO_FD, &mut h) } >= 0);
+        sync.signal_point(h.fd, 1).expect("signal");
+        sync.wait_point(h.fd, 1, 100).expect("wait after signal");
+        unsafe { libc::close(h.fd) };
+        sync.destroy(c.handle);
+    }
+}
diff --git a/crates/punktfunk-host/src/inject/libei.rs b/crates/punktfunk-host/src/inject/libei.rs
index aedd59e..aa4b18b 100644
--- a/crates/punktfunk-host/src/inject/libei.rs
+++ b/crates/punktfunk-host/src/inject/libei.rs
@@ -163,6 +163,11 @@ async fn session_main(mut rx: UnboundedReceiver, source: EiSource) {
             }
         }
     }
+    // A client that vanished mid-press must not leave keys/buttons latched in the
+    // compositor — Mutter keeps the implicit grab of a destroyed device's button and the
+    // focused app stops taking clicks until it is restarted. Release everything still
+    // held before the EIS connection (and its devices) go away.
+    state.release_all(&context);
 }
 
 /// Tie down the verbose tuple the connect step returns. The keep-alive must stay alive for the
@@ -360,6 +365,14 @@ struct EiState {
     /// kind a client sends + whether it emitted, so an unexpected client — e.g. a touch-only
     /// tablet hitting a compositor without ei_touchscreen — is immediately diagnosable).
     seen_kinds: u32,
+    /// Wire codes currently held down (keys = VK, buttons = GameStream ids, touches = ids)
+    /// — synthesized back up at session end ([`EiState::release_all`]). A client that
+    /// vanishes mid-press must not leave the compositor with a latched key or an implicit
+    /// pointer grab: observed on Mutter, a button held by a destroyed EIS device wedges
+    /// click delivery to the focused app until that app is restarted.
+    held_keys: Vec<u32>,
+    held_buttons: Vec<u32>,
+    held_touches: Vec<u32>,
 }
 
 /// Stable small index per [`InputKind`] for the `seen_kinds` bitmask.
@@ -390,6 +403,47 @@ impl EiState {
             start: Instant::now(),
             injected: 0,
             seen_kinds: 0,
+            held_keys: Vec::new(),
+            held_buttons: Vec::new(),
+            held_touches: Vec::new(),
+        }
+    }
+
+    /// Release everything the remote client still holds — called when the session ends
+    /// (client gone, EIS closing). Synthesizes wire-level release events through the
+    /// normal [`EiState::inject`] path so the compositor sees proper key-up / button-up /
+    /// touch-up frames before the devices disappear.
+    fn release_all(&mut self, ctx: &ei::Context) {
+        let (keys, buttons, touches) = (
+            std::mem::take(&mut self.held_keys),
+            std::mem::take(&mut self.held_buttons),
+            std::mem::take(&mut self.held_touches),
+        );
+        if keys.is_empty() && buttons.is_empty() && touches.is_empty() {
+            return;
+        }
+        tracing::info!(
+            keys = keys.len(),
+            buttons = buttons.len(),
+            touches = touches.len(),
+            "libei: releasing input still held at session end"
+        );
+        let release = |kind: InputKind, code: u32| InputEvent {
+            kind,
+            _pad: [0; 3],
+            code,
+            x: 0,
+            y: 0,
+            flags: 0,
+        };
+        for code in buttons {
+            self.inject(&release(InputKind::MouseButtonUp, code), ctx);
+        }
+        for code in keys {
+            self.inject(&release(InputKind::KeyUp, code), ctx);
+        }
+        for id in touches {
+            self.inject(&release(InputKind::TouchUp, id), ctx);
         }
     }
 
@@ -620,6 +674,23 @@ impl EiState {
         }
 
         if emitted {
+            // Track held state on the wire codes so `release_all` can undo it at
+            // session end (vanished clients must not leave anything latched).
+            match ev.kind {
+                InputKind::KeyDown if !self.held_keys.contains(&ev.code) => {
+                    self.held_keys.push(ev.code);
+                }
+                InputKind::KeyUp => self.held_keys.retain(|&c| c != ev.code),
+                InputKind::MouseButtonDown if !self.held_buttons.contains(&ev.code) => {
+                    self.held_buttons.push(ev.code);
+                }
+                InputKind::MouseButtonUp => self.held_buttons.retain(|&c| c != ev.code),
+                InputKind::TouchDown if !self.held_touches.contains(&ev.code) => {
+                    self.held_touches.push(ev.code);
+                }
+                InputKind::TouchUp => self.held_touches.retain(|&c| c != ev.code),
+                _ => {}
+            }
             dev.frame(self.last_serial, self.now_us());
         }
         if let Err(e) = ctx.flush() {
diff --git a/crates/punktfunk-host/src/main.rs b/crates/punktfunk-host/src/main.rs
index 2a6ae05..d7681ba 100644
--- a/crates/punktfunk-host/src/main.rs
+++ b/crates/punktfunk-host/src/main.rs
@@ -16,6 +16,7 @@
 mod audio;
 mod capture;
 mod discovery;
+mod drm_sync;
 mod encode;
 mod gamestream;
 mod inject;
diff --git a/crates/punktfunk-host/src/vdisplay.rs b/crates/punktfunk-host/src/vdisplay.rs
index c584e40..56e4551 100644
--- a/crates/punktfunk-host/src/vdisplay.rs
+++ b/crates/punktfunk-host/src/vdisplay.rs
@@ -35,6 +35,11 @@ pub struct VirtualOutput {
     pub preferred_mode: Option<(u32, u32, u32)>,
     /// Keeps the output — and whatever connection/thread backs it — alive; dropped on teardown.
     pub keepalive: Box,
+    /// This is a Mutter virtual monitor: the compositor renders DIRECTLY into the
+    /// PipeWire buffer pool, so consuming its dmabufs needs explicit sync (SyncTimeline)
+    /// — without it, NVIDIA's missing implicit fencing shows stale frames. The capture
+    /// layer keys its sync negotiation / shm fallback on this.
+    pub mutter: bool,
 }
 
 /// Pluggable virtual-output creation, per compositor.
diff --git a/crates/punktfunk-host/src/vdisplay/gamescope.rs b/crates/punktfunk-host/src/vdisplay/gamescope.rs
index 0c22039..2364961 100644
--- a/crates/punktfunk-host/src/vdisplay/gamescope.rs
+++ b/crates/punktfunk-host/src/vdisplay/gamescope.rs
@@ -83,6 +83,7 @@ impl VirtualDisplay for GamescopeDisplay {
             point_injector_at_eis();
             tracing::info!(node_id, "gamescope: attaching to existing PipeWire node");
             return Ok(VirtualOutput {
+                mutter: false,
                 node_id,
                 remote_fd: None,
                 preferred_mode: Some((mode.width, mode.height, mode.refresh_hz)),
@@ -107,6 +108,7 @@ impl VirtualDisplay for GamescopeDisplay {
             "gamescope virtual output ready"
         );
         Ok(VirtualOutput {
+            mutter: false,
             node_id,
             remote_fd: None,
             preferred_mode: Some((mode.width, mode.height, mode.refresh_hz)),
@@ -136,6 +138,7 @@ fn create_managed_session(client: &str, mode: Mode) -> Result<VirtualOutput> {
             "gamescope session: reusing the running session (same mode — no Steam restart)"
         );
         return Ok(VirtualOutput {
+            mutter: false,
             node_id,
             remote_fd: None,
             preferred_mode: Some((mode.width, mode.height, mode.refresh_hz)),
@@ -162,6 +165,7 @@ fn create_managed_session(client: &str, mode: Mode) -> Result<VirtualOutput> {
         "gamescope session: launched gamescope-session-plus at the client's mode"
     );
     Ok(VirtualOutput {
+        mutter: false,
         node_id,
         remote_fd: None,
         preferred_mode: Some((mode.width, mode.height, mode.refresh_hz)),
diff --git a/crates/punktfunk-host/src/vdisplay/kwin.rs b/crates/punktfunk-host/src/vdisplay/kwin.rs
index b3ae402..2ed92ca 100644
--- a/crates/punktfunk-host/src/vdisplay/kwin.rs
+++ b/crates/punktfunk-host/src/vdisplay/kwin.rs
@@ -104,6 +104,7 @@ impl VirtualDisplay for KwinDisplay {
             mode.refresh_hz
         };
         Ok(VirtualOutput {
+            mutter: false,
             node_id,
             remote_fd: None,
             preferred_mode: Some((mode.width, mode.height, achieved_hz)),
diff --git a/crates/punktfunk-host/src/vdisplay/mutter.rs b/crates/punktfunk-host/src/vdisplay/mutter.rs
index 7ec3560..44ee28d 100644
--- a/crates/punktfunk-host/src/vdisplay/mutter.rs
+++ b/crates/punktfunk-host/src/vdisplay/mutter.rs
@@ -85,6 +85,7 @@ impl VirtualDisplay for MutterDisplay {
             "Mutter virtual monitor ready"
         );
         Ok(VirtualOutput {
+            mutter: true,
             node_id,
             remote_fd: None,
             preferred_mode: Some((mode.width, mode.height, mode.refresh_hz)),
diff --git a/crates/punktfunk-host/src/vdisplay/wlroots.rs b/crates/punktfunk-host/src/vdisplay/wlroots.rs
index 33d7488..2fa0d91 100644
--- a/crates/punktfunk-host/src/vdisplay/wlroots.rs
+++ b/crates/punktfunk-host/src/vdisplay/wlroots.rs
@@ -123,6 +123,7 @@ impl VirtualDisplay for WlrootsDisplay {
             "sway headless output ready"
         );
         Ok(VirtualOutput {
+            mutter: false,
             node_id,
             remote_fd: Some(fd),
             preferred_mode: Some((mode.width, mode.height, mode.refresh_hz)),
diff --git a/include/punktfunk_core.h b/include/punktfunk_core.h
index cef501e..82d82db 100644
--- a/include/punktfunk_core.h
+++ b/include/punktfunk_core.h
@@ -470,6 +470,18 @@ typedef struct {
   float loss_pct;
 } PunktfunkProbeResult;
 
+// `struct msghdr_x` from Darwin `<sys/socket.h>` (the batched-I/O variant — not in the `libc` crate).
+typedef struct {
+  void *msg_name;
+  socklen_t msg_namelen;
+  struct iovec *msg_iov;
+  int msg_iovlen;
+  void *msg_control;
+  socklen_t msg_controllen;
+  int msg_flags;
+  size_t msg_datalen;
+} MsghdrX;
+
 #ifdef __cplusplus
 extern "C" {
 #endif // __cplusplus
@@ -887,6 +899,10 @@ PunktfunkStatus punktfunk_connection_probe_result(const PunktfunkConnection *c,
 void punktfunk_connection_close(PunktfunkConnection *c);
 #endif
 
+// Darwin batched receive: up to `cnt` datagrams in one syscall; returns the count received and
+// sets each `msg_datalen` to its byte length. Present in libSystem on all macOS/iOS.
+extern ssize_t recvmsg_x(int s, MsghdrX *msgp, unsigned int cnt, int flags);
+
 #ifdef __cplusplus
 } // extern "C"
 #endif // __cplusplus