feat: M4 groundwork — lumen/1 client connector in the C ABI + SwiftUI client scaffold
ci / rust (push) Has been cancelled

The shared-core architecture pays off: platform clients now link ONE Rust library that
does the entire lumen/1 protocol, and only add decode/present/input on top.

lumen-core:
- client.rs (quic feature): NativeClient — QUIC handshake + UDP data plane + input
  datagrams on internal threads; embedder surface = connect / next_frame / send_input.
- abi.rs: lumen_connect / lumen_connection_next_au (borrow-until-next-call, matching
  lumen_client_poll_frame semantics) / lumen_connection_send_input / lumen_connection_mode /
  lumen_connection_close. Guarded in the generated header by LUMEN_FEATURE_QUIC (cbindgen
  [defines] mapping), so the checked-in header is stable across feature sets.
- error.rs: append-only LumenStatus additions Timeout (-9) and Closed (-10).
- TESTED end-to-end through the C ABI: in-process lumen/1 host, lumen_connect pulls 25
  byte-verified frames, sends input, closes (m3.rs::c_abi_connection_roundtrip).

Apple client (clients/apple — SCAFFOLD, written on Linux, first Xcode build pending):
- scripts/build-xcframework.sh: cargo per Apple target → universal staticlib + header
  (LUMEN_FEATURE_QUIC pre-defined) + modulemap → LumenCore.xcframework.
- Package.swift (LumenKit) + Swift sources: LumenConnection (ABI wrapper), AnnexB
  (in-band VPS/SPS/PPS → CMVideoFormatDescription, Annex-B → AVCC CMSampleBuffers with
  DisplayImmediately), StreamView (SwiftUI over AVSampleBufferDisplayLayer — stage-1
  presenter that hardware-decodes compressed HEVC itself), InputCapture (GCMouse raw
  deltas + GCKeyboard HID→VK).
- README.md is the full handoff for the next (Mac-side) agent: build steps, ABI contract,
  first-light test recipe against the Linux host, stage-2 (VT+Metal pacing) plan, and the
  known host-side gaps (single-session m3-host, no lumen/1 audio yet, gamepad kinds not
  yet routed in m3's injector, seed-stage trust).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-10 07:28:41 +00:00
parent 2b4ffc3518
commit 3ea096ace9
17 changed files with 1147 additions and 26 deletions
+173
View File
@@ -441,3 +441,176 @@ pub unsafe extern "C" fn lumen_get_stats(
LumenStatus::Ok
})
}
// ---------------------------------------------------------------------------------------------
// lumen/1 connection API (`quic` feature) — the embeddable client connector platform clients
// link (SwiftUI/VideoToolbox, Android, …). In the generated header these are guarded by
// `LUMEN_FEATURE_QUIC`; define it when linking a lumen-core built with `--features quic`.
// ---------------------------------------------------------------------------------------------
/// Opaque handle to a live `lumen/1` connection (QUIC control plane + UDP data plane, all
/// pumped on internal threads).
#[cfg(feature = "quic")]
pub struct LumenConnection {
inner: crate::client::NativeClient,
/// Backs the pointer returned by the last `lumen_connection_next_au` (borrow-until-next-call).
last: Option<crate::session::Frame>,
}
/// Connect to a `lumen/1` host and start a session at `width`x`height`@`refresh_hz`.
/// Blocks up to `timeout_ms` for the handshake. Returns NULL on failure.
///
/// # Safety
/// `host` is a NUL-terminated UTF-8 string (IP or hostname resolvable by the platform).
#[cfg(feature = "quic")]
#[no_mangle]
pub unsafe extern "C" fn lumen_connect(
host: *const std::os::raw::c_char,
port: u16,
width: u32,
height: u32,
refresh_hz: u32,
timeout_ms: u32,
) -> *mut LumenConnection {
let r = std::panic::catch_unwind(AssertUnwindSafe(|| {
if host.is_null() {
return std::ptr::null_mut();
}
let host = match unsafe { std::ffi::CStr::from_ptr(host) }.to_str() {
Ok(s) => s,
Err(_) => return std::ptr::null_mut(),
};
let mode = crate::config::Mode {
width,
height,
refresh_hz,
};
match crate::client::NativeClient::connect(
host,
port,
mode,
std::time::Duration::from_millis(timeout_ms as u64),
) {
Ok(c) => Box::into_raw(Box::new(LumenConnection {
inner: c,
last: None,
})),
Err(_) => std::ptr::null_mut(),
}
}));
r.unwrap_or(std::ptr::null_mut())
}
/// Pull the next reassembled access unit, waiting up to `timeout_ms`. Returns
/// [`LumenStatus::NoFrame`] on timeout and [`LumenStatus::Closed`] once the session ended.
/// On `Ok`, `*out` borrows connection memory **until the next call** on this handle.
///
/// # Safety
/// `c` is a valid connection handle used from a single thread; `out` is writable.
#[cfg(feature = "quic")]
#[no_mangle]
pub unsafe extern "C" fn lumen_connection_next_au(
c: *mut LumenConnection,
out: *mut LumenFrame,
timeout_ms: u32,
) -> LumenStatus {
guard(|| {
let c = match unsafe { c.as_mut() } {
Some(c) => c,
None => return LumenStatus::NullPointer,
};
if out.is_null() {
return LumenStatus::NullPointer;
}
match c
.inner
.next_frame(std::time::Duration::from_millis(timeout_ms as u64))
{
Ok(frame) => {
c.last = Some(frame);
let f = c.last.as_ref().unwrap();
unsafe {
*out = LumenFrame {
data: f.data.as_ptr(),
len: f.data.len(),
frame_index: f.frame_index,
pts_ns: f.pts_ns,
flags: f.flags,
};
}
LumenStatus::Ok
}
Err(e) => e.status(),
}
})
}
/// Send one input event to the host as a QUIC datagram (non-blocking enqueue).
///
/// # Safety
/// `c` is a valid connection handle; `ev` points to a valid [`InputEvent`].
#[cfg(feature = "quic")]
#[no_mangle]
pub unsafe extern "C" fn lumen_connection_send_input(
c: *mut LumenConnection,
ev: *const InputEvent,
) -> LumenStatus {
guard(|| {
let c = match unsafe { c.as_ref() } {
Some(c) => c,
None => return LumenStatus::NullPointer,
};
let ev = match unsafe { ev.as_ref() } {
Some(e) => e,
None => return LumenStatus::NullPointer,
};
match c.inner.send_input(ev) {
Ok(()) => LumenStatus::Ok,
Err(e) => e.status(),
}
})
}
/// The host-confirmed session mode (from the Welcome). Safe any time after connect.
///
/// # Safety
/// `c` is a valid connection handle; out pointers are writable (NULLs are skipped).
#[cfg(feature = "quic")]
#[no_mangle]
pub unsafe extern "C" fn lumen_connection_mode(
c: *const LumenConnection,
width: *mut u32,
height: *mut u32,
refresh_hz: *mut u32,
) -> LumenStatus {
guard(|| {
let c = match unsafe { c.as_ref() } {
Some(c) => c,
None => return LumenStatus::NullPointer,
};
unsafe {
if !width.is_null() {
*width = c.inner.mode.width;
}
if !height.is_null() {
*height = c.inner.mode.height;
}
if !refresh_hz.is_null() {
*refresh_hz = c.inner.mode.refresh_hz;
}
}
LumenStatus::Ok
})
}
/// Close the connection and free the handle (joins the internal threads). NULL is a no-op.
///
/// # Safety
/// `c` was returned by [`lumen_connect`] and is not used after this call.
#[cfg(feature = "quic")]
#[no_mangle]
pub unsafe extern "C" fn lumen_connection_close(c: *mut LumenConnection) {
if !c.is_null() {
drop(unsafe { Box::from_raw(c) });
}
}
+215
View File
@@ -0,0 +1,215 @@
//! The embeddable `lumen/1` client connector (M4 groundwork), behind the `quic` feature.
//!
//! [`NativeClient::connect`] runs the full client side of the protocol — QUIC handshake
//! ([`crate::quic`]), UDP data plane ([`crate::session::Session`] on a native thread), input
//! datagrams — and hands the embedder a dead-simple surface: *pull reassembled access units,
//! push input events*. This is what the platform clients (SwiftUI/VideoToolbox, Android, …)
//! link via the C ABI (`lumen_connect` & co. in [`crate::abi`]); `lumen-client-rs` is the
//! Rust-native consumer of the same flow.
//!
//! Threading: one worker thread owns a tokio runtime (QUIC control plane only — design
//! invariant) plus a blocking data-plane pump; frames cross to the embedder over a bounded
//! channel. All methods are safe to call from any single embedder thread.
use crate::config::{Mode, Role};
use crate::error::{LumenError, Result};
use crate::input::InputEvent;
use crate::quic::{endpoint, io, Hello, Start, Welcome};
use crate::session::{Frame, Session};
use crate::transport::UdpTransport;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender};
use std::sync::Arc;
use std::time::Duration;
/// Frames buffered between the data-plane pump and the embedder. Small: the embedder
/// (decoder) should drain at frame rate; when it falls behind, the newest frame is dropped
/// (display freshness over completeness — FEC/keyframes recover).
const FRAME_QUEUE: usize = 16;
pub struct NativeClient {
frames: Receiver<Frame>,
input_tx: tokio::sync::mpsc::UnboundedSender<InputEvent>,
shutdown: Arc<AtomicBool>,
worker: Option<std::thread::JoinHandle<()>>,
/// The host-confirmed session mode (from the Welcome).
pub mode: Mode,
}
impl NativeClient {
/// Connect to a `lumen/1` host and start the session at (up to) `mode`. Blocks until the
/// handshake completes or `timeout` elapses.
pub fn connect(host: &str, port: u16, mode: Mode, timeout: Duration) -> Result<NativeClient> {
let (frame_tx, frame_rx) = std::sync::mpsc::sync_channel::<Frame>(FRAME_QUEUE);
let (input_tx, input_rx) = tokio::sync::mpsc::unbounded_channel::<InputEvent>();
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<Result<Mode>>();
let shutdown = Arc::new(AtomicBool::new(false));
let host = host.to_string();
let shutdown_w = shutdown.clone();
let worker = std::thread::Builder::new()
.name("lumen-client".into())
.spawn(move || {
let rt = match tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
let _ = ready_tx.send(Err(LumenError::Io(e)));
return;
}
};
rt.block_on(worker_main(
host, port, mode, frame_tx, input_rx, ready_tx, shutdown_w,
));
})
.map_err(LumenError::Io)?;
let negotiated = match ready_rx.recv_timeout(timeout) {
Ok(Ok(m)) => m,
Ok(Err(e)) => return Err(e),
Err(_) => {
shutdown.store(true, Ordering::SeqCst);
return Err(LumenError::Timeout);
}
};
Ok(NativeClient {
frames: frame_rx,
input_tx,
shutdown,
worker: Some(worker),
mode: negotiated,
})
}
/// Pull the next reassembled, FEC-recovered access unit; [`LumenError::NoFrame`] on
/// timeout, [`LumenError::Closed`]-class errors once the session ended.
pub fn next_frame(&mut self, timeout: Duration) -> Result<Frame> {
match self.frames.recv_timeout(timeout) {
Ok(f) => Ok(f),
Err(RecvTimeoutError::Timeout) => Err(LumenError::NoFrame),
Err(RecvTimeoutError::Disconnected) => Err(LumenError::Closed),
}
}
/// Queue one input event for delivery as a QUIC datagram.
pub fn send_input(&self, ev: &InputEvent) -> Result<()> {
self.input_tx.send(*ev).map_err(|_| LumenError::Closed)
}
}
impl Drop for NativeClient {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::SeqCst);
if let Some(w) = self.worker.take() {
let _ = w.join();
}
}
}
/// The worker: QUIC handshake, then the input task + the blocking data-plane pump.
async fn worker_main(
host: String,
port: u16,
mode: Mode,
frame_tx: SyncSender<Frame>,
mut input_rx: tokio::sync::mpsc::UnboundedReceiver<InputEvent>,
ready_tx: std::sync::mpsc::Sender<Result<Mode>>,
shutdown: Arc<AtomicBool>,
) {
let setup = async {
let remote: std::net::SocketAddr = format!("{host}:{port}")
.parse()
.map_err(|_| LumenError::InvalidArg("host:port"))?;
let ep = endpoint::client_insecure()
.map_err(|e| LumenError::Io(std::io::Error::other(e.to_string())))?;
let conn = ep
.connect(remote, "lumen")
.map_err(|_| LumenError::InvalidArg("connect"))?
.await
.map_err(|e| LumenError::Io(std::io::Error::other(e.to_string())))?;
let (mut send, mut recv) = conn
.open_bi()
.await
.map_err(|e| LumenError::Io(std::io::Error::other(e.to_string())))?;
io::write_msg(
&mut send,
&Hello {
abi_version: crate::ABI_VERSION,
mode,
}
.encode(),
)
.await?;
let welcome = Welcome::decode(&io::read_msg(&mut recv).await?)?;
// Reserve our data-plane port, then start the host.
let probe = std::net::UdpSocket::bind("0.0.0.0:0")?;
let udp_port = probe.local_addr()?.port();
drop(probe);
io::write_msg(
&mut send,
&Start {
client_udp_port: udp_port,
}
.encode(),
)
.await?;
let host_udp = std::net::SocketAddr::new(remote.ip(), welcome.udp_port);
let transport =
UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &host_udp.to_string())?;
let session = Session::new(welcome.session_config(Role::Client), Box::new(transport))?;
Ok::<_, LumenError>((conn, session, welcome.mode))
};
let (conn, mut session, negotiated) = match setup.await {
Ok(t) => t,
Err(e) => {
let _ = ready_tx.send(Err(e));
return;
}
};
let _ = ready_tx.send(Ok(negotiated));
// Input task: embedder events → QUIC datagrams.
let input_conn = conn.clone();
tokio::spawn(async move {
while let Some(ev) = input_rx.recv().await {
let _ = input_conn.send_datagram(ev.encode().to_vec().into());
}
});
// Watch for connection close → stop the pump.
{
let shutdown = shutdown.clone();
let conn = conn.clone();
tokio::spawn(async move {
conn.closed().await;
shutdown.store(true, Ordering::SeqCst);
});
}
// Data-plane pump on a blocking thread: poll the session, hand frames to the embedder.
// try_send drops the newest frame when the embedder lags (freshness over completeness).
let pump_shutdown = shutdown.clone();
let _ = tokio::task::spawn_blocking(move || {
while !pump_shutdown.load(Ordering::SeqCst) {
match session.poll_frame() {
Ok(frame) => {
let _ = frame_tx.try_send(frame);
}
Err(LumenError::NoFrame) => {
std::thread::sleep(Duration::from_micros(300));
}
Err(_) => break,
}
}
})
.await;
conn.close(0u32.into(), b"client closed");
}
+8
View File
@@ -19,6 +19,10 @@ pub enum LumenError {
Unsupported(&'static str),
#[error("io error: {0}")]
Io(#[from] std::io::Error),
#[error("timed out")]
Timeout,
#[error("session closed")]
Closed,
}
pub type Result<T> = core::result::Result<T, LumenError>;
@@ -37,6 +41,8 @@ pub enum LumenStatus {
Unsupported = -6,
Io = -7,
NullPointer = -8,
Timeout = -9,
Closed = -10,
Panic = -99,
}
@@ -51,6 +57,8 @@ impl LumenError {
LumenError::NoFrame => LumenStatus::NoFrame,
LumenError::Unsupported(_) => LumenStatus::Unsupported,
LumenError::Io(_) => LumenStatus::Io,
LumenError::Timeout => LumenStatus::Timeout,
LumenError::Closed => LumenStatus::Closed,
}
}
}
+2
View File
@@ -25,6 +25,8 @@
#![forbid(unsafe_op_in_unsafe_fn)]
pub mod abi;
#[cfg(feature = "quic")]
pub mod client;
pub mod config;
pub mod crypto;
pub mod error;