From 5b0d84acd0225b493dd0426622bf4db9eb55dca7 Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Wed, 10 Jun 2026 06:56:47 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20M3=20=E2=80=94=20lumen/1=20native=20str?= =?UTF-8?q?eaming:=20real=20video=20at=20client=20mode=20+=20input=20over?= =?UTF-8?q?=20QUIC=20datagrams?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The native protocol now does the real thing, end to end: - Hello carries the client's requested mode; the host creates a NATIVE virtual output at exactly that size/refresh (same vdisplay backends as the GameStream path) and streams NVENC HEVC through the M1 Session (GF(2^16) Leopard FEC + AES-GCM, QUIC-negotiated). - Input rides QUIC DATAGRAMS — encrypted, congestion-managed, no ENet retransmission spikes — decoded into lumen_core InputEvents and fed to the session's input injector. - Frames are stamped with the capture wall clock; the reference client computes per-frame capture→reassembled latency percentiles and writes a playable .h265. - m3-host gains --source synthetic|virtual + --seconds; the client gains --mode WxHxFPS, --out, --input-test (scripted mouse/keyboard datagrams). VALIDATED live (gamescope session, xev nested): client requested 1280x720@120 → host created gamescope at that mode → 1680/1680 frames over 14s, zero loss, valid HEVC; pipeline latency p50 0.83ms / p95 1.2ms / p99 1.3ms (capture→encode→FEC→crypto→UDP→ reassembled, same-host clock); 176 input datagrams sent → injector (GamescopeEi) → 164 X events observed inside the nested session. Known follow-on: slice-level sub-frame pipelining needs the NVENC SDK directly (libavcodec emits whole AUs only) — the next big latency lever. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/lumen-client-rs/src/main.rs | 229 ++++++++++++++++++++++++----- crates/lumen-core/src/quic.rs | 28 +++- crates/lumen-host/src/m3.rs | 225 ++++++++++++++++++++++++---- crates/lumen-host/src/main.rs | 31 ++-- 4 files changed, 429 insertions(+), 84 deletions(-) diff --git a/crates/lumen-client-rs/src/main.rs b/crates/lumen-client-rs/src/main.rs index 0bdc83b..eaed65a 100644 --- a/crates/lumen-client-rs/src/main.rs +++ b/crates/lumen-client-rs/src/main.rs @@ -1,15 +1,68 @@ -//! `lumen-client-rs` — the reference client. M3 seed mode: speak `lumen/1` (QUIC control -//! plane) to a lumen host, bring up the client `lumen_core::Session` over UDP, reassemble + -//! FEC-recover the host's deterministic test frames, and verify them byte-exactly. (M4 adds -//! VAAPI decode + wgpu present on this same skeleton.) +//! `lumen-client-rs` — the reference client for `lumen/1` (M3): QUIC control plane, UDP data +//! plane, input over QUIC datagrams. Two modes, decided by the host's Welcome: //! -//! Usage: `lumen-client-rs [--connect HOST:PORT]` (default `127.0.0.1:9777`). +//! * **verification** (`frames > 0`, synthetic host): byte-checks deterministic test frames; +//! * **stream** (`frames == 0`, virtual host): receives real NVENC AUs, writes a playable +//! `.h265`, and reports per-frame **capture→…→reassembled latency** percentiles (the host +//! stamps each frame with its capture wall clock; same-host runs share that clock). +//! +//! `--input-test` exercises the input plane: scripted mouse/keyboard datagrams during the +//! stream (watch them land in the host session, e.g. xev inside gamescope). +//! +//! Usage: `lumen-client-rs [--connect HOST:PORT] [--mode WxHxFPS] [--out FILE] [--input-test]` +//! (M4 adds VAAPI decode + wgpu present on this same skeleton.) use anyhow::{anyhow, Context, Result}; use lumen_core::config::Role; +use lumen_core::input::{InputEvent, InputKind}; use lumen_core::quic::{endpoint, io, Hello, Start, Welcome}; use lumen_core::transport::UdpTransport; -use lumen_core::{LumenError, Session}; +use lumen_core::{LumenError, Mode, Session}; +use std::io::Write; + +struct Args { + connect: String, + mode: Mode, + out: Option, + input_test: bool, +} + +fn parse_args() -> Args { + let argv: Vec = std::env::args().collect(); + let get = |flag: &str| { + argv.iter() + .skip_while(|a| *a != flag) + .nth(1) + .map(String::as_str) + }; + let mode = get("--mode") + .and_then(|m| { + let mut it = m.split('x'); + Some(Mode { + width: it.next()?.parse().ok()?, + height: it.next()?.parse().ok()?, + refresh_hz: it.next()?.parse().ok()?, + }) + }) + .unwrap_or(Mode { + width: 1280, + height: 720, + refresh_hz: 60, + }); + Args { + connect: get("--connect").unwrap_or("127.0.0.1:9777").to_string(), + mode, + out: get("--out").map(String::from), + input_test: argv.iter().any(|a| a == "--input-test"), + } +} + +fn now_ns() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0) +} fn main() { tracing_subscriber::fmt() @@ -17,26 +70,23 @@ fn main() { tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()), ) .init(); - let addr = std::env::args() - .skip_while(|a| a != "--connect") - .nth(1) - .unwrap_or_else(|| "127.0.0.1:9777".into()); - if let Err(e) = run(&addr) { + let args = parse_args(); + if let Err(e) = run(args) { tracing::error!("{e:#}"); std::process::exit(1); } } -fn run(addr: &str) -> Result<()> { +fn run(args: Args) -> Result<()> { let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(2) .enable_all() .build()?; - rt.block_on(session(addr)) + rt.block_on(session(args)) } -async fn session(addr: &str) -> Result<()> { - let remote: std::net::SocketAddr = addr.parse().context("--connect host:port")?; +async fn session(args: Args) -> Result<()> { + let remote: std::net::SocketAddr = args.connect.parse().context("--connect host:port")?; let ep = endpoint::client_insecure().map_err(|e| anyhow!("QUIC client endpoint: {e}"))?; let conn = ep .connect(remote, "lumen") @@ -50,6 +100,7 @@ async fn session(addr: &str) -> Result<()> { &mut send, &Hello { abi_version: lumen_core::ABI_VERSION, + mode: args.mode, } .encode(), ) @@ -77,53 +128,161 @@ async fn session(addr: &str) -> Result<()> { ) .await?; + // Input plane: scripted events as QUIC datagrams (mouse square + 'A' taps), proving the + // low-latency input path without a real input device. + if args.input_test { + let conn2 = conn.clone(); + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + tracing::info!("input-test: sending scripted datagrams for ~6s"); + for i in 0..160u32 { + let (dx, dy) = match (i / 10) % 4 { + 0 => (12, 0), + 1 => (0, 12), + 2 => (-12, 0), + _ => (0, -12), + }; + let mv = InputEvent { + kind: InputKind::MouseMove, + _pad: [0; 3], + code: 0, + x: dx, + y: dy, + flags: 0, + }; + let _ = conn2.send_datagram(mv.encode().to_vec().into()); + if i % 20 == 0 { + for kind in [InputKind::KeyDown, InputKind::KeyUp] { + let key = InputEvent { + kind, + _pad: [0; 3], + code: 0x41, // VK 'A' + x: 0, + y: 0, + flags: 0, + }; + let _ = conn2.send_datagram(key.encode().to_vec().into()); + } + } + tokio::time::sleep(std::time::Duration::from_millis(40)).await; + } + tracing::info!("input-test: done"); + }); + } + + // Closed-flag for the blocking receive loop. + let closed = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + { + let closed = closed.clone(); + let conn2 = conn.clone(); + tokio::spawn(async move { + conn2.closed().await; + closed.store(true, std::sync::atomic::Ordering::SeqCst); + }); + } + let host_udp = std::net::SocketAddr::new(remote.ip(), welcome.udp_port); let cfg = welcome.session_config(Role::Client); let expected = welcome.frames; + let out_path = args.out.clone(); // Data plane on a blocking thread (native threads only on the frame path). - let verified = tokio::task::spawn_blocking(move || -> Result { + tokio::task::spawn_blocking(move || -> Result<()> { let transport = UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &host_udp.to_string()) .context("bind data plane")?; let mut session = Session::new(cfg, Box::new(transport)).map_err(|e| anyhow!("client session: {e:?}"))?; + let mut sink = match &out_path { + Some(p) => Some(std::io::BufWriter::new( + std::fs::File::create(p).with_context(|| format!("create {p}"))?, + )), + None => None, + }; + let mut ok = 0u32; let mut mismatched = 0u32; - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30); + let mut bytes = 0u64; + let mut latencies_us: Vec = Vec::new(); let mut last_rx = std::time::Instant::now(); - while ok + mismatched < expected && std::time::Instant::now() < deadline { + let started = std::time::Instant::now(); + loop { + if expected > 0 && ok + mismatched >= expected { + break; + } + if closed.load(std::sync::atomic::Ordering::SeqCst) + && last_rx.elapsed() > std::time::Duration::from_millis(300) + { + break; + } + if started.elapsed() > std::time::Duration::from_secs(120) + || last_rx.elapsed() > std::time::Duration::from_secs(8) + { + break; + } match session.poll_frame() { Ok(frame) => { last_rx = std::time::Instant::now(); - let idx = u32::from_le_bytes(frame.data[0..4].try_into().unwrap()); - if frame.data == test_frame(idx, frame.data.len()) { - ok += 1; + bytes += frame.data.len() as u64; + // The host stamps pts with its capture wall clock; same-host runs share it. + let lat = now_ns().saturating_sub(frame.pts_ns); + if lat > 0 && lat < 10_000_000_000 { + latencies_us.push(lat / 1000); + } + if expected > 0 { + // Verification mode: deterministic content. + let idx = u32::from_le_bytes(frame.data[0..4].try_into().unwrap()); + if frame.data == test_frame(idx, frame.data.len()) { + ok += 1; + } else { + mismatched += 1; + } } else { - mismatched += 1; - tracing::warn!(idx, "frame content mismatch"); + ok += 1; + if let Some(s) = sink.as_mut() { + s.write_all(&frame.data).context("write AU")?; + } } } Err(LumenError::NoFrame) => { - if last_rx.elapsed() > std::time::Duration::from_secs(5) { - break; // stream went quiet - } - std::thread::sleep(std::time::Duration::from_micros(500)); + std::thread::sleep(std::time::Duration::from_micros(300)); } Err(e) => return Err(anyhow!("poll_frame: {e:?}")), } } - tracing::info!(ok, mismatched, expected, "verification complete"); - anyhow::ensure!(mismatched == 0, "{mismatched} corrupted frames"); - anyhow::ensure!(ok == expected, "received {ok}/{expected} frames"); - Ok(ok) + if let Some(mut s) = sink { + s.flush().ok(); + } + + latencies_us.sort_unstable(); + let pct = |p: f64| -> u64 { + if latencies_us.is_empty() { + return 0; + } + let i = ((latencies_us.len() as f64 * p) as usize).min(latencies_us.len() - 1); + latencies_us[i] + }; + tracing::info!( + frames = ok, + mismatched, + mb = bytes / 1_000_000, + lat_p50_us = pct(0.50), + lat_p95_us = pct(0.95), + lat_p99_us = pct(0.99), + lat_max_us = latencies_us.last().copied().unwrap_or(0), + "lumen/1 stream complete (capture→reassembled latency, same-host clock)" + ); + if expected > 0 { + anyhow::ensure!(mismatched == 0, "{mismatched} corrupted frames"); + anyhow::ensure!(ok == expected, "received {ok}/{expected} frames"); + tracing::info!("verification PASSED"); + } else { + anyhow::ensure!(ok > 0, "no frames received"); + } + Ok(()) }) .await??; - tracing::info!( - verified, - "lumen/1 session PASSED — GF(2^16) FEC + AES-GCM over real UDP, QUIC-negotiated" - ); conn.close(0u32.into(), b"done"); Ok(()) } diff --git a/crates/lumen-core/src/quic.rs b/crates/lumen-core/src/quic.rs index 53d10a3..dde35f1 100644 --- a/crates/lumen-core/src/quic.rs +++ b/crates/lumen-core/src/quic.rs @@ -26,10 +26,12 @@ use crate::error::{LumenError, Result}; /// Protocol magic + version, first bytes of every message payload. pub const MAGIC: &[u8; 4] = b"LMN1"; -/// `client → host`: open the session. +/// `client → host`: open the session, requesting a display mode (the host creates its +/// virtual output at exactly this size/refresh — native resolution end to end). #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct Hello { pub abi_version: u32, + pub mode: Mode, } /// `host → client`: the complete session offer. @@ -56,18 +58,27 @@ pub struct Start { impl Hello { pub fn encode(&self) -> Vec { - let mut b = Vec::with_capacity(8); + let mut b = Vec::with_capacity(20); b.extend_from_slice(MAGIC); b.extend_from_slice(&self.abi_version.to_le_bytes()); + b.extend_from_slice(&self.mode.width.to_le_bytes()); + b.extend_from_slice(&self.mode.height.to_le_bytes()); + b.extend_from_slice(&self.mode.refresh_hz.to_le_bytes()); b } pub fn decode(b: &[u8]) -> Result { - if b.len() < 8 || &b[0..4] != MAGIC { + if b.len() < 20 || &b[0..4] != MAGIC { return Err(LumenError::InvalidArg("bad Hello")); } + let u32at = |o: usize| u32::from_le_bytes([b[o], b[o + 1], b[o + 2], b[o + 3]]); Ok(Hello { - abi_version: u32::from_le_bytes([b[4], b[5], b[6], b[7]]), + abi_version: u32at(4), + mode: Mode { + width: u32at(8), + height: u32at(12), + refresh_hz: u32at(16), + }, }) } } @@ -324,7 +335,14 @@ mod tests { #[test] fn hello_start_roundtrip() { - let h = Hello { abi_version: 1 }; + let h = Hello { + abi_version: 1, + mode: Mode { + width: 1280, + height: 720, + refresh_hz: 120, + }, + }; assert_eq!(Hello::decode(&h.encode()).unwrap(), h); let s = Start { client_udp_port: 1234, diff --git a/crates/lumen-host/src/m3.rs b/crates/lumen-host/src/m3.rs index 858bc4a..cc8d567 100644 --- a/crates/lumen-host/src/m3.rs +++ b/crates/lumen-host/src/m3.rs @@ -1,20 +1,45 @@ -//! M3 seed — the `lumen/1` native host: QUIC control plane (lumen-core `quic`) + the hardened -//! M1 data plane over real UDP. This is the first end-to-end run of lumen's own protocol, -//! past the GameStream compatibility layer: the Welcome negotiates **GF(2¹⁶) Leopard FEC** -//! (positively not expressible in GameStream) and AES-GCM with per-direction salts. +//! M3 — the `lumen/1` native host: QUIC control plane + the hardened M1 data plane over UDP. +//! This is lumen's own protocol, past the GameStream compatibility layer: //! -//! `lumen-host m3-host [--port 9777] [--frames 300]` serves one session: handshake on QUIC, -//! then a native thread streams deterministic, verifiable test frames through -//! `lumen_core::Session` → `UdpTransport`. `lumen-client-rs --connect host:9777` is the -//! counterpart (reassembles, FEC-recovers, verifies content). +//! * the Welcome negotiates **GF(2¹⁶) Leopard FEC** (inexpressible in GameStream) + AES-GCM; +//! * the client's Hello requests a display mode and the host creates a **native virtual +//! output** at exactly that size/refresh (same vdisplay backends as the GameStream path); +//! * **input arrives as QUIC datagrams** — encrypted, congestion-managed, no ENet +//! retransmission spikes — and feeds the session's input injector; +//! * video frames carry a wall-clock `pts_ns`, so a same-host client measures the full +//! capture→encode→FEC→UDP→reassemble latency per frame. +//! +//! `lumen-host m3-host [--port 9777] [--source synthetic|virtual] [--seconds 30] +//! [--frames 300]` serves one session; `lumen-client-rs --connect host:9777` is the +//! counterpart. The data plane runs on native threads (no async on the frame path). use anyhow::{anyhow, Context, Result}; -use lumen_core::config::{FecConfig, FecScheme, Mode, Role}; +use lumen_core::config::{FecConfig, FecScheme, Role}; +use lumen_core::input::InputEvent; use lumen_core::packet::{FLAG_PIC, FLAG_SOF}; use lumen_core::quic::{endpoint, io, Hello, Start, Welcome}; use lumen_core::transport::UdpTransport; use lumen_core::Session; use rand::RngCore; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum M3Source { + /// Deterministic test frames (protocol verification; the client byte-checks them). + Synthetic, + /// Real capture: virtual display at the client's requested mode → NVENC. + Virtual, +} + +pub struct M3Options { + pub port: u16, + pub source: M3Source, + /// Virtual-source stream duration. + pub seconds: u32, + /// Synthetic-source frame count. + pub frames: u32, +} /// Deterministic test frame: `u32 LE index` then `data[i] = idx + i` (wrapping). pub fn test_frame(idx: u32, len: usize) -> Vec { @@ -26,19 +51,26 @@ pub fn test_frame(idx: u32, len: usize) -> Vec { d } -pub fn run(port: u16, frames: u32) -> Result<()> { +fn now_ns() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0) +} + +pub fn run(opts: M3Options) -> Result<()> { let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(2) .enable_all() .build() .context("tokio runtime")?; - rt.block_on(serve_one(port, frames)) + rt.block_on(serve_one(opts)) } -async fn serve_one(port: u16, frames: u32) -> Result<()> { - let ep = endpoint::server(([0, 0, 0, 0], port).into()) +async fn serve_one(opts: M3Options) -> Result<()> { + let ep = endpoint::server(([0, 0, 0, 0], opts.port).into()) .map_err(|e| anyhow!("QUIC server endpoint: {e}"))?; - tracing::info!(port, "lumen/1 host listening (QUIC)"); + tracing::info!(port = opts.port, source = ?opts.source, "lumen/1 host listening (QUIC)"); let incoming = ep .accept() @@ -57,6 +89,12 @@ async fn serve_one(port: u16, frames: u32) -> Result<()> { hello.abi_version, lumen_core::ABI_VERSION ); + crate::encode::validate_dimensions( + crate::encode::Codec::H265, + hello.mode.width, + hello.mode.height, + ) + .context("client-requested mode")?; // Reserve a UDP port for the data plane (bind, read it back, rebind in UdpTransport). let probe = std::net::UdpSocket::bind("0.0.0.0:0")?; @@ -68,11 +106,7 @@ async fn serve_one(port: u16, frames: u32) -> Result<()> { let welcome = Welcome { abi_version: lumen_core::ABI_VERSION, udp_port, - mode: Mode { - width: 1920, - height: 1080, - refresh_hz: 60, - }, + mode: hello.mode, // The post-GameStream point of lumen/1: Leopard GF(2¹⁶) FEC + real encryption. fec: FecConfig { scheme: FecScheme::Gf16, @@ -83,33 +117,66 @@ async fn serve_one(port: u16, frames: u32) -> Result<()> { encrypt: true, key, salt: *b"lmn1", - frames, + frames: match opts.source { + M3Source::Synthetic => opts.frames, + M3Source::Virtual => 0, // unbounded — client streams until we close + }, }; io::write_msg(&mut send, &welcome.encode()).await?; let start = Start::decode(&io::read_msg(&mut recv).await?) .map_err(|e| anyhow!("Start decode: {e:?}"))?; let client_udp = std::net::SocketAddr::new(peer.ip(), start.client_udp_port); - tracing::info!(%client_udp, udp_port, "handshake complete — streaming"); + tracing::info!(%client_udp, udp_port, mode = ?hello.mode, "handshake complete — streaming"); + + // Input plane: QUIC datagrams → channel → a native injector thread (the injector owns + // non-Send compositor state, so it lives on its own thread). + let (input_tx, input_rx) = std::sync::mpsc::channel::(); + std::thread::Builder::new() + .name("lumen-m3-input".into()) + .spawn(move || input_thread(input_rx)) + .context("spawn input thread")?; + let input_conn = conn.clone(); + tokio::spawn(async move { + let mut count = 0u64; + while let Ok(d) = input_conn.read_datagram().await { + if let Some(ev) = InputEvent::decode(&d) { + count += 1; + if input_tx.send(ev).is_err() { + break; + } + } + } + tracing::info!(count, "input datagram stream ended"); + }); + + // Stop signal: stream duration elapsed or the client went away. + let stop = Arc::new(AtomicBool::new(false)); + { + let stop = stop.clone(); + let conn = conn.clone(); + tokio::spawn(async move { + conn.closed().await; + stop.store(true, Ordering::SeqCst); + }); + } // Data plane on a native thread (no async on the hot path — design invariant). let cfg = welcome.session_config(Role::Host); + let source = opts.source; + let (seconds, frames) = (opts.seconds, opts.frames); + let mode = hello.mode; + let stop_stream = stop.clone(); tokio::task::spawn_blocking(move || -> Result<()> { let transport = UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &client_udp.to_string()) .context("bind data plane")?; let mut session = Session::new(cfg, Box::new(transport)).map_err(|e| anyhow!("host session: {e:?}"))?; - let interval = std::time::Duration::from_millis(1000 / 60); - for idx in 0..frames { - let data = test_frame(idx, 64 * 1024); - session - .submit_frame(&data, idx as u64 * 16_666_667, (FLAG_PIC | FLAG_SOF) as u32) - .map_err(|e| anyhow!("submit_frame: {e:?}"))?; - std::thread::sleep(interval); + match source { + M3Source::Synthetic => synthetic_stream(&mut session, frames, &stop_stream), + M3Source::Virtual => virtual_stream(&mut session, mode, seconds, &stop_stream), } - tracing::info!(frames, "all frames sent"); - Ok(()) }) .await .context("stream thread")??; @@ -120,3 +187,101 @@ async fn serve_one(port: u16, frames: u32) -> Result<()> { ep.wait_idle().await; Ok(()) } + +/// The injector thread: open the session's input backend on first event, then inject. +fn input_thread(rx: std::sync::mpsc::Receiver) { + let mut injector: Option> = None; + while let Ok(ev) = rx.recv() { + if injector.is_none() { + let backend = crate::inject::default_backend(); + match crate::inject::open(backend) { + Ok(i) => { + tracing::info!(?backend, "lumen/1 input injector opened"); + injector = Some(i); + } + Err(e) => { + tracing::error!(error = %format!("{e:#}"), "input injection unavailable"); + return; + } + } + } + if let Err(e) = injector.as_mut().unwrap().inject(&ev) { + tracing::warn!(error = %format!("{e:#}"), "inject failed"); + } + } +} + +fn synthetic_stream(session: &mut Session, frames: u32, stop: &AtomicBool) -> Result<()> { + let interval = std::time::Duration::from_millis(1000 / 60); + for idx in 0..frames { + if stop.load(Ordering::SeqCst) { + break; + } + let data = test_frame(idx, 64 * 1024); + session + .submit_frame(&data, now_ns(), (FLAG_PIC | FLAG_SOF) as u32) + .map_err(|e| anyhow!("submit_frame: {e:?}"))?; + std::thread::sleep(interval); + } + tracing::info!(frames, "synthetic stream complete"); + Ok(()) +} + +/// Real capture→encode→lumen/1: a native virtual output at the client's mode, NVENC AUs +/// stamped with the capture wall clock (the client derives per-frame pipeline latency). +fn virtual_stream( + session: &mut Session, + mode: lumen_core::Mode, + seconds: u32, + stop: &AtomicBool, +) -> Result<()> { + let compositor = crate::vdisplay::detect().context("detect compositor")?; + tracing::info!(?compositor, ?mode, "lumen/1 virtual display"); + let mut vd = crate::vdisplay::open(compositor)?; + let vout = vd.create(mode).context("create virtual output")?; + let mut capturer = + crate::capture::capture_virtual_output(vout).context("capture virtual output")?; + capturer.set_active(true); + + let mut frame = capturer.next_frame().context("first frame")?; + let mut enc = crate::encode::open_video( + crate::encode::Codec::H265, + frame.format, + frame.width, + frame.height, + mode.refresh_hz, + 20_000_000, + frame.is_cuda(), + ) + .context("open NVENC")?; + + let interval = std::time::Duration::from_secs_f64(1.0 / mode.refresh_hz.max(1) as f64); + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64); + let mut next = std::time::Instant::now(); + let mut sent: u64 = 0; + while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline { + if let Some(f) = capturer.try_latest().context("capture")? { + frame = f; + } + let capture_ns = now_ns(); + enc.submit(&frame).context("encoder submit")?; + while let Some(au) = enc.poll().context("encoder poll")? { + let flags = if au.keyframe { + (FLAG_PIC | FLAG_SOF) as u32 + } else { + FLAG_PIC as u32 + }; + session + .submit_frame(&au.data, capture_ns, flags) + .map_err(|e| anyhow!("submit_frame: {e:?}"))?; + sent += 1; + } + next += interval; + match next.checked_duration_since(std::time::Instant::now()) { + Some(d) => std::thread::sleep(d), + None => next = std::time::Instant::now(), + } + } + tracing::info!(sent, "lumen/1 virtual stream complete"); + Ok(()) +} diff --git a/crates/lumen-host/src/main.rs b/crates/lumen-host/src/main.rs index 1e09086..7196fc8 100644 --- a/crates/lumen-host/src/main.rs +++ b/crates/lumen-host/src/main.rs @@ -60,21 +60,24 @@ fn real_main() -> Result<()> { Some("zerocopy-probe") => zerocopy::probe(), // M0 pipeline spike. Some("m0") => m0::run(parse_m0(&args[1..])?), - // M3 seed: native lumen/1 host (QUIC control plane + UDP data plane). + // M3: native lumen/1 host (QUIC control plane + UDP data plane). Some("m3-host") => { - let port = args - .iter() - .skip_while(|a| *a != "--port") - .nth(1) - .and_then(|s| s.parse().ok()) - .unwrap_or(9777); - let frames = args - .iter() - .skip_while(|a| *a != "--frames") - .nth(1) - .and_then(|s| s.parse().ok()) - .unwrap_or(300); - m3::run(port, frames) + let get = |flag: &str| { + args.iter() + .skip_while(|a| *a != flag) + .nth(1) + .map(String::as_str) + }; + let source = match get("--source") { + Some("virtual") => m3::M3Source::Virtual, + _ => m3::M3Source::Synthetic, + }; + m3::run(m3::M3Options { + port: get("--port").and_then(|s| s.parse().ok()).unwrap_or(9777), + source, + seconds: get("--seconds").and_then(|s| s.parse().ok()).unwrap_or(30), + frames: get("--frames").and_then(|s| s.parse().ok()).unwrap_or(300), + }) } Some("-h") | Some("--help") | Some("help") | None => { print_usage();