feat: M2 zero-copy — PipeWire dmabuf negotiation + EGL device-platform import (WIP)
Wire the capture side of zero-copy (LUMEN_ZEROCOPY=1): - EGL importer now opens the headless EGLDisplay on the NVIDIA EGL device (EGL_PLATFORM_DEVICE_EXT) and queries its importable DRM modifiers (eglQueryDmaBufModifiersEXT). - The PipeWire stream advertises a BGRx dmabuf format with those modifiers as a mandatory enum Choice + a dmabuf-only Buffers param; the compositor fixates an importable tiled modifier. param_changed reads the negotiated modifier; the process callback imports the dmabuf (eglCreateImage with explicit LO/HI modifier) and would copy it into a CUDA buffer for the encoder. Validated against headless KWin (Plasma 6.4): negotiation succeeds (13 NVIDIA modifiers advertised, KWin fixates one, stream reaches Streaming with a real tiled dmabuf) and `eglCreateImage` succeeds. The remaining blocker is `cuGraphicsEGLRegisterImage` returning CUDA_ERROR_INVALID_VALUE on the dmabuf-imported EGLImage — the likely fix is to bind the EGLImage to a GL texture (glEGLImageTargetTexture2DOES) and register that via cuGraphicsGLRegisterImage (OBS/Sunshine's path), which needs a GL context. The CPU-copy path stays the default and is unaffected (regression-checked: real KWin capture → HEVC). LUMEN_ZEROCOPY is opt-in/experimental until the CUDA registration lands. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -65,10 +65,13 @@ impl PortalCapturer {
|
||||
let (frame_tx, frame_rx) = sync_channel::<CapturedFrame>(8);
|
||||
let active = Arc::new(AtomicBool::new(false));
|
||||
let active_cb = active.clone();
|
||||
let zerocopy = crate::zerocopy::enabled();
|
||||
thread::Builder::new()
|
||||
.name("lumen-pipewire".into())
|
||||
.spawn(move || {
|
||||
if let Err(e) = pipewire::pipewire_thread(fd, node_id, frame_tx, active_cb) {
|
||||
if let Err(e) =
|
||||
pipewire::pipewire_thread(fd, node_id, frame_tx, active_cb, zerocopy)
|
||||
{
|
||||
tracing::error!(error = %format!("{e:#}"), "pipewire capture thread failed");
|
||||
}
|
||||
})
|
||||
@@ -328,9 +331,91 @@ mod pipewire {
|
||||
info: VideoInfoRaw,
|
||||
/// Negotiated layout (`None` until param_changed, or if unsupported).
|
||||
format: Option<PixelFormat>,
|
||||
/// Negotiated DRM format modifier (for dmabuf import); 0 = LINEAR.
|
||||
modifier: u64,
|
||||
tx: SyncSender<CapturedFrame>,
|
||||
/// When false (no active stream), skip the de-pad copy — the buffer is just released.
|
||||
active: Arc<AtomicBool>,
|
||||
/// Present when zero-copy is enabled: imports a dmabuf → CUDA device buffer.
|
||||
importer: Option<crate::zerocopy::EglImporter>,
|
||||
}
|
||||
|
||||
fn serialize_pod(obj: pw::spa::pod::Object) -> Result<Vec<u8>> {
|
||||
Ok(pw::spa::pod::serialize::PodSerializer::serialize(
|
||||
std::io::Cursor::new(Vec::new()),
|
||||
&pw::spa::pod::Value::Object(obj),
|
||||
)
|
||||
.context("serialize pod")?
|
||||
.0
|
||||
.into_inner())
|
||||
}
|
||||
|
||||
/// Build a BGRx dmabuf `EnumFormat` pod advertising the EGL-importable `modifiers` as a
|
||||
/// mandatory enum Choice; the compositor fixates to one of them that it can allocate, which
|
||||
/// we read back in `param_changed`.
|
||||
fn build_dmabuf_format(modifiers: &[u64]) -> Result<Vec<u8>> {
|
||||
use pw::spa::param::format::{FormatProperties, MediaSubtype, MediaType};
|
||||
let mut obj = pw::spa::pod::object!(
|
||||
pw::spa::utils::SpaTypes::ObjectParamFormat,
|
||||
pw::spa::param::ParamType::EnumFormat,
|
||||
pw::spa::pod::property!(FormatProperties::MediaType, Id, MediaType::Video),
|
||||
pw::spa::pod::property!(FormatProperties::MediaSubtype, Id, MediaSubtype::Raw),
|
||||
pw::spa::pod::property!(FormatProperties::VideoFormat, Id, VideoFormat::BGRx),
|
||||
pw::spa::pod::property!(
|
||||
FormatProperties::VideoSize,
|
||||
Choice,
|
||||
Range,
|
||||
Rectangle,
|
||||
pw::spa::utils::Rectangle {
|
||||
width: 1920,
|
||||
height: 1080
|
||||
},
|
||||
pw::spa::utils::Rectangle {
|
||||
width: 1,
|
||||
height: 1
|
||||
},
|
||||
pw::spa::utils::Rectangle {
|
||||
width: 8192,
|
||||
height: 8192
|
||||
}
|
||||
),
|
||||
pw::spa::pod::property!(
|
||||
FormatProperties::VideoFramerate,
|
||||
Choice,
|
||||
Range,
|
||||
Fraction,
|
||||
pw::spa::utils::Fraction { num: 60, denom: 1 },
|
||||
pw::spa::utils::Fraction { num: 0, denom: 1 },
|
||||
pw::spa::utils::Fraction { num: 240, denom: 1 }
|
||||
),
|
||||
);
|
||||
obj.properties.push(pw::spa::pod::Property {
|
||||
key: pw::spa::sys::SPA_FORMAT_VIDEO_modifier,
|
||||
flags: pw::spa::pod::PropertyFlags::MANDATORY,
|
||||
value: pw::spa::pod::Value::Choice(pw::spa::pod::ChoiceValue::Long(
|
||||
pw::spa::utils::Choice(
|
||||
pw::spa::utils::ChoiceFlags::empty(),
|
||||
pw::spa::utils::ChoiceEnum::Enum {
|
||||
default: modifiers[0] as i64,
|
||||
alternatives: modifiers.iter().map(|&m| m as i64).collect(),
|
||||
},
|
||||
),
|
||||
)),
|
||||
});
|
||||
serialize_pod(obj)
|
||||
}
|
||||
|
||||
/// Build a Buffers param requesting dmabuf-only buffers.
|
||||
fn build_dmabuf_buffers() -> Result<Vec<u8>> {
|
||||
serialize_pod(pw::spa::pod::Object {
|
||||
type_: pw::spa::utils::SpaTypes::ObjectParamBuffers.as_raw(),
|
||||
id: pw::spa::param::ParamType::Buffers.as_raw(),
|
||||
properties: vec![pw::spa::pod::Property {
|
||||
key: pw::spa::sys::SPA_PARAM_BUFFERS_dataType,
|
||||
flags: pw::spa::pod::PropertyFlags::empty(),
|
||||
value: pw::spa::pod::Value::Int(1i32 << pw::spa::sys::SPA_DATA_DmaBuf),
|
||||
}],
|
||||
})
|
||||
}
|
||||
|
||||
pub fn pipewire_thread(
|
||||
@@ -338,6 +423,7 @@ mod pipewire {
|
||||
node_id: u32,
|
||||
tx: SyncSender<CapturedFrame>,
|
||||
active: Arc<AtomicBool>,
|
||||
zerocopy: bool,
|
||||
) -> Result<()> {
|
||||
crate::pwinit::ensure_init();
|
||||
|
||||
@@ -347,11 +433,43 @@ mod pipewire {
|
||||
.connect_fd_rc(fd, None)
|
||||
.context("pw connect_fd (portal remote)")?;
|
||||
|
||||
// Build the EGL→CUDA importer up front; if it fails, log and fall back to the CPU path
|
||||
// (we simply won't request dmabuf below).
|
||||
let importer = if zerocopy {
|
||||
match crate::zerocopy::EglImporter::new() {
|
||||
Ok(i) => Some(i),
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %format!("{e:#}"), "zero-copy import unavailable — using CPU path");
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Modifiers our EGL stack can import for BGRx (the layout KWin gives); if none, we can't
|
||||
// negotiate dmabuf and fall back to the shm path.
|
||||
let modifiers = importer
|
||||
.as_ref()
|
||||
.map(|i| i.supported_modifiers(crate::zerocopy::drm_fourcc(PixelFormat::Bgrx).unwrap()))
|
||||
.unwrap_or_default();
|
||||
let want_dmabuf = importer.is_some() && !modifiers.is_empty();
|
||||
if zerocopy && !want_dmabuf {
|
||||
tracing::warn!("zero-copy: no EGL-importable dmabuf modifiers — using CPU path");
|
||||
} else if want_dmabuf {
|
||||
tracing::info!(
|
||||
count = modifiers.len(),
|
||||
sample = ?&modifiers[..modifiers.len().min(6)],
|
||||
"zero-copy: advertising EGL-importable dmabuf modifiers"
|
||||
);
|
||||
}
|
||||
|
||||
let data = UserData {
|
||||
info: VideoInfoRaw::default(),
|
||||
format: None,
|
||||
modifier: 0,
|
||||
tx,
|
||||
active,
|
||||
importer,
|
||||
};
|
||||
|
||||
let stream = pw::stream::StreamBox::new(
|
||||
@@ -388,11 +506,13 @@ mod pipewire {
|
||||
if ud.info.parse(param).is_ok() {
|
||||
let sz = ud.info.size();
|
||||
ud.format = map_format(ud.info.format());
|
||||
ud.modifier = ud.info.modifier();
|
||||
tracing::info!(
|
||||
width = sz.width,
|
||||
height = sz.height,
|
||||
spa_format = ?ud.info.format(),
|
||||
mapped = ?ud.format,
|
||||
modifier = ud.modifier,
|
||||
"pipewire format negotiated"
|
||||
);
|
||||
if ud.format.is_none() {
|
||||
@@ -423,6 +543,55 @@ mod pipewire {
|
||||
if w == 0 || h == 0 {
|
||||
return; // format not negotiated yet
|
||||
}
|
||||
|
||||
// Zero-copy path: if the buffer is a dmabuf and we have an importer, import it
|
||||
// into a CUDA device buffer (no CPU touch) and deliver that. Otherwise fall
|
||||
// through to the shm de-pad copy below.
|
||||
if let (Some(importer), Some(fmt)) = (ud.importer.as_ref(), ud.format) {
|
||||
if datas[0].type_() == pw::spa::buffer::DataType::DmaBuf {
|
||||
let plane = crate::zerocopy::DmabufPlane {
|
||||
fd: datas[0].fd(),
|
||||
offset: datas[0].chunk().offset(),
|
||||
stride: datas[0].chunk().stride().max(0) as u32,
|
||||
};
|
||||
// 0 (unset/LINEAR) → import with the implicit modifier; a real tiled
|
||||
// modifier (if the producer reported one) → import it explicitly.
|
||||
let modifier = (ud.modifier != 0).then_some(ud.modifier);
|
||||
if let Some(fourcc) = crate::zerocopy::drm_fourcc(fmt) {
|
||||
match importer.import(&plane, w as u32, h as u32, fourcc, modifier) {
|
||||
Ok(devbuf) => {
|
||||
static ONCE: std::sync::atomic::AtomicBool =
|
||||
std::sync::atomic::AtomicBool::new(true);
|
||||
if ONCE.swap(false, Ordering::Relaxed) {
|
||||
tracing::info!(w, h, modifier = ud.modifier,
|
||||
"zero-copy: dmabuf imported to CUDA (no CPU copy)");
|
||||
}
|
||||
let pts_ns = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|d| d.as_nanos() as u64)
|
||||
.unwrap_or(0);
|
||||
let _ = ud.tx.try_send(CapturedFrame {
|
||||
width: w as u32,
|
||||
height: h as u32,
|
||||
pts_ns,
|
||||
format: fmt,
|
||||
payload: FramePayload::Cuda(devbuf),
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
static ONCE: std::sync::atomic::AtomicBool =
|
||||
std::sync::atomic::AtomicBool::new(true);
|
||||
if ONCE.swap(false, Ordering::Relaxed) {
|
||||
tracing::warn!(error = %format!("{e:#}"),
|
||||
"dmabuf import failed — frames dropped (consider unsetting LUMEN_ZEROCOPY)");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let d = &mut datas[0];
|
||||
let (size, offset, stride) = {
|
||||
let c = d.chunk();
|
||||
@@ -534,14 +703,33 @@ mod pipewire {
|
||||
),
|
||||
);
|
||||
|
||||
let values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
|
||||
std::io::Cursor::new(Vec::new()),
|
||||
&pw::spa::pod::Value::Object(obj),
|
||||
)
|
||||
.context("serialize format pod")?
|
||||
.0
|
||||
.into_inner();
|
||||
let mut params = [Pod::from_bytes(&values).context("pod from bytes")?];
|
||||
// When zero-copy is on, offer ONLY a BGRx dmabuf format with our EGL-importable modifiers
|
||||
// (offering shm too makes the compositor pick shm). The modifier list is advertised with
|
||||
// DONT_FIXATE so the compositor's allocator chooses one; we re-emit the fixated format in
|
||||
// `param_changed` (the two-step DMA-BUF handshake). Otherwise offer the multi-format shm
|
||||
// pod and let MAP_BUFFERS map it.
|
||||
let shm_values = serialize_pod(obj)?;
|
||||
let (dmabuf_values, buffers_values) = if want_dmabuf {
|
||||
(
|
||||
Some(build_dmabuf_format(&modifiers)?),
|
||||
Some(build_dmabuf_buffers()?),
|
||||
)
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
let mut byte_slices: Vec<&[u8]> = Vec::new();
|
||||
match &dmabuf_values {
|
||||
Some(d) => byte_slices.push(d),
|
||||
None => byte_slices.push(&shm_values),
|
||||
}
|
||||
if let Some(b) = &buffers_values {
|
||||
byte_slices.push(b);
|
||||
}
|
||||
let mut params: Vec<&Pod> = byte_slices
|
||||
.iter()
|
||||
.map(|&b| Pod::from_bytes(b).context("pod from bytes"))
|
||||
.collect::<Result<_>>()?;
|
||||
|
||||
stream
|
||||
.connect(
|
||||
|
||||
Reference in New Issue
Block a user