feat: M3 — lumen/1 native streaming: real video at client mode + input over QUIC datagrams

The native protocol now does the real thing, end to end:

- Hello carries the client's requested mode; the host creates a NATIVE virtual output at
  exactly that size/refresh (same vdisplay backends as the GameStream path) and streams
  NVENC HEVC through the M1 Session (GF(2^16) Leopard FEC + AES-GCM, QUIC-negotiated).
- Input rides QUIC DATAGRAMS — encrypted, congestion-managed, no ENet retransmission
  spikes — decoded into lumen_core InputEvents and fed to the session's input injector.
- Frames are stamped with the capture wall clock; the reference client computes per-frame
  capture→reassembled latency percentiles and writes a playable .h265.
- m3-host gains --source synthetic|virtual + --seconds; the client gains --mode WxHxFPS,
  --out, --input-test (scripted mouse/keyboard datagrams).

VALIDATED live (gamescope session, xev nested): client requested 1280x720@120 → host
created gamescope at that mode → 1680/1680 frames over 14s, zero loss, valid HEVC;
pipeline latency p50 0.83ms / p95 1.2ms / p99 1.3ms (capture→encode→FEC→crypto→UDP→
reassembled, same-host clock); 176 input datagrams sent → injector (GamescopeEi) → 164
X events observed inside the nested session.

Known follow-on: slice-level sub-frame pipelining needs the NVENC SDK directly (libavcodec
emits whole AUs only) — the next big latency lever.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-10 06:56:47 +00:00
parent de3123038f
commit 5b0d84acd0
4 changed files with 429 additions and 84 deletions
+195 -30
View File
@@ -1,20 +1,45 @@
//! M3 seed — the `lumen/1` native host: QUIC control plane (lumen-core `quic`) + the hardened
//! M1 data plane over real UDP. This is the first end-to-end run of lumen's own protocol,
//! past the GameStream compatibility layer: the Welcome negotiates **GF(2¹⁶) Leopard FEC**
//! (positively not expressible in GameStream) and AES-GCM with per-direction salts.
//! M3 — the `lumen/1` native host: QUIC control plane + the hardened M1 data plane over UDP.
//! This is lumen's own protocol, past the GameStream compatibility layer:
//!
//! `lumen-host m3-host [--port 9777] [--frames 300]` serves one session: handshake on QUIC,
//! then a native thread streams deterministic, verifiable test frames through
//! `lumen_core::Session` → `UdpTransport`. `lumen-client-rs --connect host:9777` is the
//! counterpart (reassembles, FEC-recovers, verifies content).
//! * the Welcome negotiates **GF(2¹⁶) Leopard FEC** (inexpressible in GameStream) + AES-GCM;
//! * the client's Hello requests a display mode and the host creates a **native virtual
//! output** at exactly that size/refresh (same vdisplay backends as the GameStream path);
//! * **input arrives as QUIC datagrams** — encrypted, congestion-managed, no ENet
//! retransmission spikes — and feeds the session's input injector;
//! * video frames carry a wall-clock `pts_ns`, so a same-host client measures the full
//! capture→encode→FEC→UDP→reassemble latency per frame.
//!
//! `lumen-host m3-host [--port 9777] [--source synthetic|virtual] [--seconds 30]
//! [--frames 300]` serves one session; `lumen-client-rs --connect host:9777` is the
//! counterpart. The data plane runs on native threads (no async on the frame path).
use anyhow::{anyhow, Context, Result};
use lumen_core::config::{FecConfig, FecScheme, Mode, Role};
use lumen_core::config::{FecConfig, FecScheme, Role};
use lumen_core::input::InputEvent;
use lumen_core::packet::{FLAG_PIC, FLAG_SOF};
use lumen_core::quic::{endpoint, io, Hello, Start, Welcome};
use lumen_core::transport::UdpTransport;
use lumen_core::Session;
use rand::RngCore;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// Where the frames streamed by the `lumen/1` host come from.
///
/// The `m3-host` command line maps `--source virtual` to [`M3Source::Virtual`]; any other
/// value (or no flag at all) selects [`M3Source::Synthetic`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum M3Source {
/// Deterministic test frames (protocol verification; the client byte-checks them).
/// The number of frames is bounded by `M3Options::frames`.
Synthetic,
/// Real capture: virtual display at the client's requested mode → NVENC.
/// The stream length is bounded by `M3Options::seconds`.
Virtual,
}
/// Options for one `m3-host` run (a single served `lumen/1` session).
///
/// Derives `Debug` and `Clone` so the options can be logged and passed around like the
/// sibling [`M3Source`] type.
#[derive(Clone, Debug)]
pub struct M3Options {
    /// UDP port the QUIC control-plane endpoint listens on (bound on all interfaces).
    pub port: u16,
    /// Which frame source feeds the data plane.
    pub source: M3Source,
    /// Virtual-source stream duration, in seconds. Ignored by the synthetic source.
    pub seconds: u32,
    /// Synthetic-source frame count. Ignored by the virtual source, which advertises
    /// `frames: 0` (unbounded) in the Welcome instead.
    pub frames: u32,
}
/// Deterministic test frame: `u32 LE index` then `data[i] = idx + i` (wrapping).
pub fn test_frame(idx: u32, len: usize) -> Vec<u8> {
@@ -26,19 +51,26 @@ pub fn test_frame(idx: u32, len: usize) -> Vec<u8> {
d
}
pub fn run(port: u16, frames: u32) -> Result<()> {
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. The `u128` nanosecond
/// count is narrowed to `u64` with a plain cast.
fn now_ns() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
/// Synchronous entry point for the `m3-host` subcommand.
///
/// Builds a multi-threaded tokio runtime with two worker threads and all drivers enabled,
/// then blocks the calling thread on `serve_one` until it finishes.
///
/// # Errors
/// Returns an error (with the context `"tokio runtime"`) if the runtime cannot be built,
/// or whatever error `serve_one` returns.
pub fn run(opts: M3Options) -> Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.context("tokio runtime")?;
rt.block_on(serve_one(port, frames))
rt.block_on(serve_one(opts))
}
async fn serve_one(port: u16, frames: u32) -> Result<()> {
let ep = endpoint::server(([0, 0, 0, 0], port).into())
async fn serve_one(opts: M3Options) -> Result<()> {
let ep = endpoint::server(([0, 0, 0, 0], opts.port).into())
.map_err(|e| anyhow!("QUIC server endpoint: {e}"))?;
tracing::info!(port, "lumen/1 host listening (QUIC)");
tracing::info!(port = opts.port, source = ?opts.source, "lumen/1 host listening (QUIC)");
let incoming = ep
.accept()
@@ -57,6 +89,12 @@ async fn serve_one(port: u16, frames: u32) -> Result<()> {
hello.abi_version,
lumen_core::ABI_VERSION
);
crate::encode::validate_dimensions(
crate::encode::Codec::H265,
hello.mode.width,
hello.mode.height,
)
.context("client-requested mode")?;
// Reserve a UDP port for the data plane (bind, read it back, rebind in UdpTransport).
let probe = std::net::UdpSocket::bind("0.0.0.0:0")?;
@@ -68,11 +106,7 @@ async fn serve_one(port: u16, frames: u32) -> Result<()> {
let welcome = Welcome {
abi_version: lumen_core::ABI_VERSION,
udp_port,
mode: Mode {
width: 1920,
height: 1080,
refresh_hz: 60,
},
mode: hello.mode,
// The post-GameStream point of lumen/1: Leopard GF(2¹⁶) FEC + real encryption.
fec: FecConfig {
scheme: FecScheme::Gf16,
@@ -83,33 +117,66 @@ async fn serve_one(port: u16, frames: u32) -> Result<()> {
encrypt: true,
key,
salt: *b"lmn1",
frames,
frames: match opts.source {
M3Source::Synthetic => opts.frames,
M3Source::Virtual => 0, // unbounded — client streams until we close
},
};
io::write_msg(&mut send, &welcome.encode()).await?;
let start = Start::decode(&io::read_msg(&mut recv).await?)
.map_err(|e| anyhow!("Start decode: {e:?}"))?;
let client_udp = std::net::SocketAddr::new(peer.ip(), start.client_udp_port);
tracing::info!(%client_udp, udp_port, "handshake complete — streaming");
tracing::info!(%client_udp, udp_port, mode = ?hello.mode, "handshake complete — streaming");
// Input plane: QUIC datagrams → channel → a native injector thread (the injector owns
// non-Send compositor state, so it lives on its own thread).
let (input_tx, input_rx) = std::sync::mpsc::channel::<InputEvent>();
std::thread::Builder::new()
.name("lumen-m3-input".into())
.spawn(move || input_thread(input_rx))
.context("spawn input thread")?;
let input_conn = conn.clone();
tokio::spawn(async move {
let mut count = 0u64;
while let Ok(d) = input_conn.read_datagram().await {
if let Some(ev) = InputEvent::decode(&d) {
count += 1;
if input_tx.send(ev).is_err() {
break;
}
}
}
tracing::info!(count, "input datagram stream ended");
});
// Stop signal: stream duration elapsed or the client went away.
let stop = Arc::new(AtomicBool::new(false));
{
let stop = stop.clone();
let conn = conn.clone();
tokio::spawn(async move {
conn.closed().await;
stop.store(true, Ordering::SeqCst);
});
}
// Data plane on a native thread (no async on the hot path — design invariant).
let cfg = welcome.session_config(Role::Host);
let source = opts.source;
let (seconds, frames) = (opts.seconds, opts.frames);
let mode = hello.mode;
let stop_stream = stop.clone();
tokio::task::spawn_blocking(move || -> Result<()> {
let transport =
UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &client_udp.to_string())
.context("bind data plane")?;
let mut session =
Session::new(cfg, Box::new(transport)).map_err(|e| anyhow!("host session: {e:?}"))?;
let interval = std::time::Duration::from_millis(1000 / 60);
for idx in 0..frames {
let data = test_frame(idx, 64 * 1024);
session
.submit_frame(&data, idx as u64 * 16_666_667, (FLAG_PIC | FLAG_SOF) as u32)
.map_err(|e| anyhow!("submit_frame: {e:?}"))?;
std::thread::sleep(interval);
match source {
M3Source::Synthetic => synthetic_stream(&mut session, frames, &stop_stream),
M3Source::Virtual => virtual_stream(&mut session, mode, seconds, &stop_stream),
}
tracing::info!(frames, "all frames sent");
Ok(())
})
.await
.context("stream thread")??;
@@ -120,3 +187,101 @@ async fn serve_one(port: u16, frames: u32) -> Result<()> {
ep.wait_idle().await;
Ok(())
}
/// Body of the injector thread.
///
/// Blocks until the first [`InputEvent`] arrives, and only then opens the default input
/// backend. If the backend cannot be opened the error is logged and the thread exits,
/// which drops `rx`. Otherwise every event (the first one included) is injected in arrival
/// order until all senders have hung up. A failed injection is logged and does not stop
/// the loop.
fn input_thread(rx: std::sync::mpsc::Receiver<InputEvent>) {
    // Nothing to do (and no backend to open) if the channel closes before any input.
    let Ok(first) = rx.recv() else {
        return;
    };

    let backend = crate::inject::default_backend();
    let mut injector: Box<dyn crate::inject::InputInjector> = match crate::inject::open(backend) {
        Ok(opened) => {
            tracing::info!(?backend, "lumen/1 input injector opened");
            opened
        }
        Err(e) => {
            tracing::error!(error = %format!("{e:#}"), "input injection unavailable");
            return;
        }
    };

    // `rx.iter()` blocks like `recv()` and ends when the channel is disconnected.
    for ev in std::iter::once(first).chain(rx.iter()) {
        if let Err(e) = injector.inject(&ev) {
            tracing::warn!(error = %format!("{e:#}"), "inject failed");
        }
    }
}
/// Streams up to `frames` deterministic test frames through `session`.
///
/// Each frame is a 64 KiB [`test_frame`] payload, stamped with the current wall clock
/// ([`now_ns`]) and flagged `FLAG_PIC | FLAG_SOF`. The loop sleeps a fixed interval after
/// every frame and exits early once `stop` is set.
///
/// The final log line reports how many frames were actually submitted (`sent`) alongside
/// the requested count (`frames`), and distinguishes a full run from one cut short by the
/// stop flag.
///
/// # Errors
/// Returns an error if `Session::submit_frame` fails.
fn synthetic_stream(session: &mut Session, frames: u32, stop: &AtomicBool) -> Result<()> {
    // Integer division: 1000 / 60 == 16 ms between frames.
    let interval = std::time::Duration::from_millis(1000 / 60);
    let mut sent: u32 = 0;
    for idx in 0..frames {
        if stop.load(Ordering::SeqCst) {
            break;
        }
        let data = test_frame(idx, 64 * 1024);
        session
            .submit_frame(&data, now_ns(), (FLAG_PIC | FLAG_SOF) as u32)
            .map_err(|e| anyhow!("submit_frame: {e:?}"))?;
        sent += 1;
        std::thread::sleep(interval);
    }
    if sent == frames {
        tracing::info!(frames, sent, "synthetic stream complete");
    } else {
        // The stop flag fired before every requested frame was sent.
        tracing::info!(frames, sent, "synthetic stream stopped early");
    }
    Ok(())
}
/// Real capture→encode→lumen/1 path.
///
/// Detects the compositor, creates a virtual output at the client's `mode`, captures it and
/// feeds the frames to an H.265 encoder; every access unit the encoder returns is submitted
/// to `session`. Each AU carries the wall-clock timestamp ([`now_ns`]) taken immediately
/// before its source frame was handed to the encoder, which is what the client uses to
/// derive per-frame pipeline latency.
///
/// The loop runs once per refresh interval until `stop` is set or `seconds` have elapsed.
/// When no new captured frame is available the most recent one is encoded again.
///
/// # Errors
/// Fails if any of the display, capture, encoder or session calls return an error.
fn virtual_stream(
    session: &mut Session,
    mode: lumen_core::Mode,
    seconds: u32,
    stop: &AtomicBool,
) -> Result<()> {
    use std::time::{Duration, Instant};

    // --- Display + capture setup -------------------------------------------------------
    let compositor = crate::vdisplay::detect().context("detect compositor")?;
    tracing::info!(?compositor, ?mode, "lumen/1 virtual display");
    let mut vd = crate::vdisplay::open(compositor)?;
    let vout = vd.create(mode).context("create virtual output")?;
    let mut capturer =
        crate::capture::capture_virtual_output(vout).context("capture virtual output")?;
    capturer.set_active(true);

    // The first frame determines the encoder's input format and dimensions.
    let mut current = capturer.next_frame().context("first frame")?;
    // NOTE(review): `refresh_hz` is clamped to >= 1 for pacing below but passed unclamped to
    // the encoder here — confirm whether a zero refresh rate can reach this point.
    let mut encoder = crate::encode::open_video(
        crate::encode::Codec::H265,
        current.format,
        current.width,
        current.height,
        mode.refresh_hz,
        20_000_000,
        current.is_cuda(),
    )
    .context("open NVENC")?;

    // --- Paced encode/send loop --------------------------------------------------------
    let frame_period = Duration::from_secs_f64(1.0 / mode.refresh_hz.max(1) as f64);
    let deadline = Instant::now() + Duration::from_secs(u64::from(seconds));
    let mut next_tick = Instant::now();
    let mut sent: u64 = 0;

    loop {
        if stop.load(Ordering::SeqCst) || Instant::now() >= deadline {
            break;
        }

        // Pick up the newest captured frame if there is one; otherwise reuse the last.
        if let Some(fresh) = capturer.try_latest().context("capture")? {
            current = fresh;
        }

        let capture_ns = now_ns();
        encoder.submit(&current).context("encoder submit")?;

        // Drain everything the encoder has ready.
        while let Some(au) = encoder.poll().context("encoder poll")? {
            // Keyframes additionally carry FLAG_SOF.
            let sof = if au.keyframe { FLAG_SOF } else { 0 };
            let flags = (FLAG_PIC | sof) as u32;
            session
                .submit_frame(&au.data, capture_ns, flags)
                .map_err(|e| anyhow!("submit_frame: {e:?}"))?;
            sent += 1;
        }

        // Deadline-based pacing: sleep until the next tick, or, if we are already late,
        // re-anchor the schedule to now instead of trying to catch up.
        next_tick += frame_period;
        if let Some(wait) = next_tick.checked_duration_since(Instant::now()) {
            std::thread::sleep(wait);
        } else {
            next_tick = Instant::now();
        }
    }

    tracing::info!(sent, "lumen/1 virtual stream complete");
    Ok(())
}
+17 -14
View File
@@ -60,21 +60,24 @@ fn real_main() -> Result<()> {
Some("zerocopy-probe") => zerocopy::probe(),
// M0 pipeline spike.
Some("m0") => m0::run(parse_m0(&args[1..])?),
// M3 seed: native lumen/1 host (QUIC control plane + UDP data plane).
// M3: native lumen/1 host (QUIC control plane + UDP data plane).
Some("m3-host") => {
let port = args
.iter()
.skip_while(|a| *a != "--port")
.nth(1)
.and_then(|s| s.parse().ok())
.unwrap_or(9777);
let frames = args
.iter()
.skip_while(|a| *a != "--frames")
.nth(1)
.and_then(|s| s.parse().ok())
.unwrap_or(300);
m3::run(port, frames)
let get = |flag: &str| {
args.iter()
.skip_while(|a| *a != flag)
.nth(1)
.map(String::as_str)
};
let source = match get("--source") {
Some("virtual") => m3::M3Source::Virtual,
_ => m3::M3Source::Synthetic,
};
m3::run(m3::M3Options {
port: get("--port").and_then(|s| s.parse().ok()).unwrap_or(9777),
source,
seconds: get("--seconds").and_then(|s| s.parse().ok()).unwrap_or(30),
frames: get("--frames").and_then(|s| s.parse().ok()).unwrap_or(300),
})
}
Some("-h") | Some("--help") | Some("help") | None => {
print_usage();