diff --git a/Cargo.lock b/Cargo.lock index 1cbb471..0b36d59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1464,7 +1464,10 @@ checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" name = "lumen-client-rs" version = "0.0.1" dependencies = [ + "anyhow", "lumen-core", + "quinn", + "tokio", "tracing", "tracing-subscriber", ] @@ -1480,7 +1483,9 @@ dependencies = [ "proptest", "quinn", "rand 0.9.4", + "rcgen", "reed-solomon-simd", + "rustls", "thiserror 2.0.18", "tokio", "tracing", @@ -1509,6 +1514,7 @@ dependencies = [ "mdns-sd", "opus", "pipewire", + "quinn", "rand 0.8.6", "rcgen", "reis", diff --git a/crates/lumen-client-rs/Cargo.toml b/crates/lumen-client-rs/Cargo.toml index 1328b18..c782de1 100644 --- a/crates/lumen-client-rs/Cargo.toml +++ b/crates/lumen-client-rs/Cargo.toml @@ -9,6 +9,9 @@ authors.workspace = true repository.workspace = true [dependencies] -lumen-core = { path = "../lumen-core" } +lumen-core = { path = "../lumen-core", features = ["quic"] } +quinn = "0.11" +tokio = { version = "1", features = ["rt-multi-thread", "net", "time", "macros"] } +anyhow = "1" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/crates/lumen-client-rs/src/main.rs b/crates/lumen-client-rs/src/main.rs index de6fb64..0bdc83b 100644 --- a/crates/lumen-client-rs/src/main.rs +++ b/crates/lumen-client-rs/src/main.rs @@ -1,9 +1,15 @@ -//! `lumen-client-rs` — the reference client (plan M4). Exists to exercise the `lumen/1` -//! (P2) transport: `lumen_core` pulls reassembled, FEC-recovered access units; decode via -//! VAAPI; present via wgpu/Vulkan aligned to client vsync (frame pacing, plan §7). +//! `lumen-client-rs` — the reference client. M3 seed mode: speak `lumen/1` (QUIC control +//! plane) to a lumen host, bring up the client `lumen_core::Session` over UDP, reassemble + +//! FEC-recover the host's deterministic test frames, and verify them byte-exactly. (M4 adds +//! VAAPI decode + wgpu present on this same skeleton.) //! -//! Status: scaffold. The client side of `lumen_core` ([`lumen_core::Session::poll_frame`]) -//! is already complete and tested; this binary wires it to a real decoder + presenter. +//! Usage: `lumen-client-rs [--connect HOST:PORT]` (default `127.0.0.1:9777`). + +use anyhow::{anyhow, Context, Result}; +use lumen_core::config::Role; +use lumen_core::quic::{endpoint, io, Hello, Start, Welcome}; +use lumen_core::transport::UdpTransport; +use lumen_core::{LumenError, Session}; fn main() { tracing_subscriber::fmt() @@ -11,12 +17,125 @@ fn main() { tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()), ) .init(); + let addr = std::env::args() + .skip_while(|a| a != "--connect") + .nth(1) + .unwrap_or_else(|| "127.0.0.1:9777".into()); + if let Err(e) = run(&addr) { + tracing::error!("{e:#}"); + std::process::exit(1); + } +} + +fn run(addr: &str) -> Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build()?; + rt.block_on(session(addr)) +} + +async fn session(addr: &str) -> Result<()> { + let remote: std::net::SocketAddr = addr.parse().context("--connect host:port")?; + let ep = endpoint::client_insecure().map_err(|e| anyhow!("QUIC client endpoint: {e}"))?; + let conn = ep + .connect(remote, "lumen") + .context("connect")? + .await + .context("QUIC handshake")?; + tracing::info!(%remote, "lumen/1 connected"); + let (mut send, mut recv) = conn.open_bi().await.context("open control stream")?; + + io::write_msg( + &mut send, + &Hello { + abi_version: lumen_core::ABI_VERSION, + } + .encode(), + ) + .await?; + let welcome = Welcome::decode(&io::read_msg(&mut recv).await?) + .map_err(|e| anyhow!("Welcome decode: {e:?}"))?; + tracing::info!( + mode = ?welcome.mode, + fec = ?welcome.fec, + encrypt = welcome.encrypt, + frames = welcome.frames, + "session offer" + ); + + // Reserve our data-plane port, then tell the host to start. + let probe = std::net::UdpSocket::bind("0.0.0.0:0")?; + let udp_port = probe.local_addr()?.port(); + drop(probe); + io::write_msg( + &mut send, + &Start { + client_udp_port: udp_port, + } + .encode(), + ) + .await?; + + let host_udp = std::net::SocketAddr::new(remote.ip(), welcome.udp_port); + let cfg = welcome.session_config(Role::Client); + let expected = welcome.frames; + + // Data plane on a blocking thread (native threads only on the frame path). + let verified = tokio::task::spawn_blocking(move || -> Result { + let transport = + UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &host_udp.to_string()) + .context("bind data plane")?; + let mut session = + Session::new(cfg, Box::new(transport)).map_err(|e| anyhow!("client session: {e:?}"))?; + let mut ok = 0u32; + let mut mismatched = 0u32; + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30); + let mut last_rx = std::time::Instant::now(); + while ok + mismatched < expected && std::time::Instant::now() < deadline { + match session.poll_frame() { + Ok(frame) => { + last_rx = std::time::Instant::now(); + let idx = u32::from_le_bytes(frame.data[0..4].try_into().unwrap()); + if frame.data == test_frame(idx, frame.data.len()) { + ok += 1; + } else { + mismatched += 1; + tracing::warn!(idx, "frame content mismatch"); + } + } + Err(LumenError::NoFrame) => { + if last_rx.elapsed() > std::time::Duration::from_secs(5) { + break; // stream went quiet + } + std::thread::sleep(std::time::Duration::from_micros(500)); + } + Err(e) => return Err(anyhow!("poll_frame: {e:?}")), + } + } + tracing::info!(ok, mismatched, expected, "verification complete"); + anyhow::ensure!(mismatched == 0, "{mismatched} corrupted frames"); + anyhow::ensure!(ok == expected, "received {ok}/{expected} frames"); + Ok(ok) + }) + .await??; tracing::info!( - "lumen-client-rs scaffold (lumen_core ABI v{})", - lumen_core::ABI_VERSION - ); - tracing::info!( - "intended flow: lumen_core::Session(client) over UDP → poll_frame → VAAPI decode → wgpu present" + verified, + "lumen/1 session PASSED — GF(2^16) FEC + AES-GCM over real UDP, QUIC-negotiated" ); + conn.close(0u32.into(), b"done"); + Ok(()) +} + +/// The host's deterministic test frame (mirror of `lumen-host::m3::test_frame`). +fn test_frame(idx: u32, len: usize) -> Vec { + let mut d = vec![0u8; len]; + if len >= 4 { + d[0..4].copy_from_slice(&idx.to_le_bytes()); + } + for (i, b) in d.iter_mut().enumerate().skip(4) { + *b = (idx as u8).wrapping_add(i as u8); + } + d } diff --git a/crates/lumen-core/Cargo.toml b/crates/lumen-core/Cargo.toml index 0d1bf85..fe1e8ca 100644 --- a/crates/lumen-core/Cargo.toml +++ b/crates/lumen-core/Cargo.toml @@ -19,7 +19,7 @@ crate-type = ["lib", "cdylib", "staticlib"] default = [] # Control-plane QUIC (pairing, config, reverse audio). tokio is permitted ONLY here, # never on the per-frame hot path. Off by default so the core stays runtime-free. -quic = ["dep:quinn", "dep:tokio"] +quic = ["dep:quinn", "dep:tokio", "dep:rustls", "dep:rcgen"] [dependencies] reed-solomon-simd = "3.1" # GF(2^16) Leopard-RS, SIMD, O(n log n) — the wall-breaker (P2) @@ -37,6 +37,8 @@ rand = "0.9" zeroize = "1" quinn = { version = "0.11", optional = true } +rustls = { version = "0.23", optional = true, default-features = false, features = ["ring", "std"] } +rcgen = { version = "0.13", optional = true, default-features = false, features = ["aws_lc_rs"] } tokio = { version = "1", optional = true, features = ["rt-multi-thread", "net", "sync", "macros"] } [dev-dependencies] diff --git a/crates/lumen-core/src/config.rs b/crates/lumen-core/src/config.rs index b0e781a..ed303a5 100644 --- a/crates/lumen-core/src/config.rs +++ b/crates/lumen-core/src/config.rs @@ -60,7 +60,7 @@ pub struct Mode { /// Per-block FEC parameters. Recovery count is derived from `fec_percent` exactly as /// GameStream does: `m = ceil(k * fec_percent / 100)`. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct FecConfig { pub scheme: FecScheme, /// Recovery overhead as a percentage of data shards (0 disables FEC). diff --git a/crates/lumen-core/src/lib.rs b/crates/lumen-core/src/lib.rs index 96660c2..b092953 100644 --- a/crates/lumen-core/src/lib.rs +++ b/crates/lumen-core/src/lib.rs @@ -31,6 +31,8 @@ pub mod error; pub mod fec; pub mod input; pub mod packet; +#[cfg(feature = "quic")] +pub mod quic; pub mod session; pub mod stats; pub mod transport; diff --git a/crates/lumen-core/src/quic.rs b/crates/lumen-core/src/quic.rs new file mode 100644 index 0000000..53d10a3 --- /dev/null +++ b/crates/lumen-core/src/quic.rs @@ -0,0 +1,334 @@ +//! `lumen/1` — the native control plane (M3), gated behind the `quic` feature. +//! +//! GameStream is lumen's compatibility layer; this is the start of its own protocol. A QUIC +//! connection (quinn, tokio — control plane only, never the per-frame path) carries a +//! length-prefixed binary handshake on one bidirectional stream: +//! +//! ```text +//! client → host Hello { abi_version } +//! host → client Welcome { abi_version, session: full data-plane Config + mode + UDP port } +//! client → host Start { client_udp_port } +//! ``` +//! +//! after which both sides bring up a [`crate::session::Session`] over a plain +//! [`UdpTransport`](crate::transport::udp) (native threads, no async) and the host streams. +//! The Welcome carries everything the M1 core negotiates — FEC scheme (including GF(2¹⁶) +//! Leopard, which GameStream can't express), shard sizing, crypto key/salt — so the data +//! plane is exactly the hardened M1 `Session`. +//! +//! Seed-stage transport security: the host presents a self-signed certificate and the client +//! accepts any (pairing/pinning lands with the trust model; the data plane's AES-GCM is +//! already real). All integers little-endian; every message is `u16 length || payload`. + +use crate::config::{Config, FecConfig, FecScheme, Mode, ProtocolPhase, Role}; +use crate::error::{LumenError, Result}; + +/// Protocol magic + version, first bytes of every message payload. +pub const MAGIC: &[u8; 4] = b"LMN1"; + +/// `client → host`: open the session. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct Hello { + pub abi_version: u32, +} + +/// `host → client`: the complete session offer. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct Welcome { + pub abi_version: u32, + /// Host UDP port for the data plane. + pub udp_port: u16, + pub mode: Mode, + pub fec: FecConfig, + pub shard_payload: u16, + pub encrypt: bool, + pub key: [u8; 16], + pub salt: [u8; 4], + /// Seed/testing: how many frames the host will send (0 = unbounded). + pub frames: u32, +} + +/// `client → host`: data plane is bound, begin streaming. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct Start { + pub client_udp_port: u16, +} + +impl Hello { + pub fn encode(&self) -> Vec { + let mut b = Vec::with_capacity(8); + b.extend_from_slice(MAGIC); + b.extend_from_slice(&self.abi_version.to_le_bytes()); + b + } + + pub fn decode(b: &[u8]) -> Result { + if b.len() < 8 || &b[0..4] != MAGIC { + return Err(LumenError::InvalidArg("bad Hello")); + } + Ok(Hello { + abi_version: u32::from_le_bytes([b[4], b[5], b[6], b[7]]), + }) + } +} + +impl Welcome { + pub fn encode(&self) -> Vec { + let mut b = Vec::with_capacity(64); + b.extend_from_slice(MAGIC); + b.extend_from_slice(&self.abi_version.to_le_bytes()); + b.extend_from_slice(&self.udp_port.to_le_bytes()); + b.extend_from_slice(&self.mode.width.to_le_bytes()); + b.extend_from_slice(&self.mode.height.to_le_bytes()); + b.extend_from_slice(&self.mode.refresh_hz.to_le_bytes()); + b.push(match self.fec.scheme { + FecScheme::Gf8 => 0, + FecScheme::Gf16 => 1, + }); + b.push(self.fec.fec_percent); + b.extend_from_slice(&self.fec.max_data_per_block.to_le_bytes()); + b.extend_from_slice(&self.shard_payload.to_le_bytes()); + b.push(self.encrypt as u8); + b.extend_from_slice(&self.key); + b.extend_from_slice(&self.salt); + b.extend_from_slice(&self.frames.to_le_bytes()); + b + } + + pub fn decode(b: &[u8]) -> Result { + // Layout (LE): magic[0..4] abi[4..8] port[8..10] w[10..14] h[14..18] hz[18..22] + // scheme[22] pct[23] max_data[24..26] shard[26..28] encrypt[28] key[29..45] + // salt[45..49] frames[49..53]. + if b.len() < 53 || &b[0..4] != MAGIC { + return Err(LumenError::InvalidArg("bad Welcome")); + } + let u32at = |o: usize| u32::from_le_bytes([b[o], b[o + 1], b[o + 2], b[o + 3]]); + let u16at = |o: usize| u16::from_le_bytes([b[o], b[o + 1]]); + let mut key = [0u8; 16]; + key.copy_from_slice(&b[29..45]); + let mut salt = [0u8; 4]; + salt.copy_from_slice(&b[45..49]); + Ok(Welcome { + abi_version: u32at(4), + udp_port: u16at(8), + mode: Mode { + width: u32at(10), + height: u32at(14), + refresh_hz: u32at(18), + }, + fec: FecConfig { + scheme: if b[22] == 1 { + FecScheme::Gf16 + } else { + FecScheme::Gf8 + }, + fec_percent: b[23], + max_data_per_block: u16at(24), + }, + shard_payload: u16at(26), + encrypt: b[28] != 0, + key, + salt, + frames: u32at(49), + }) + } + + /// Build the data-plane [`Config`] this offer describes (for `role`). + pub fn session_config(&self, role: Role) -> Config { + let mut c = Config::p1_defaults(role); + c.phase = ProtocolPhase::P1GameStream; // wire phase id pending the P2 packet rev + c.fec = self.fec; + c.shard_payload = self.shard_payload as usize; + c.encrypt = self.encrypt; + c.key = self.key; + c.salt = self.salt; + c + } +} + +impl Start { + pub fn encode(&self) -> Vec { + let mut b = Vec::with_capacity(6); + b.extend_from_slice(MAGIC); + b.extend_from_slice(&self.client_udp_port.to_le_bytes()); + b + } + + pub fn decode(b: &[u8]) -> Result { + if b.len() < 6 || &b[0..4] != MAGIC { + return Err(LumenError::InvalidArg("bad Start")); + } + Ok(Start { + client_udp_port: u16::from_le_bytes([b[4], b[5]]), + }) + } +} + +/// Frame a message for the control stream: `u16 LE length || payload`. +pub fn frame(payload: &[u8]) -> Vec { + let mut b = Vec::with_capacity(2 + payload.len()); + b.extend_from_slice(&(payload.len() as u16).to_le_bytes()); + b.extend_from_slice(payload); + b +} + +/// Async framed-message IO over a quinn stream (`u16 LE length || payload`). +pub mod io { + /// Read one framed message (bounded at 64 KiB — control messages are tiny). + pub async fn read_msg(recv: &mut quinn::RecvStream) -> std::io::Result> { + let mut len = [0u8; 2]; + recv.read_exact(&mut len) + .await + .map_err(std::io::Error::other)?; + let n = u16::from_le_bytes(len) as usize; + let mut buf = vec![0u8; n]; + recv.read_exact(&mut buf) + .await + .map_err(std::io::Error::other)?; + Ok(buf) + } + + /// Write one framed message. + pub async fn write_msg(send: &mut quinn::SendStream, payload: &[u8]) -> std::io::Result<()> { + send.write_all(&super::frame(payload)) + .await + .map_err(std::io::Error::other) + } +} + +/// quinn endpoint constructors (host self-signed; client accepts-any — seed-stage trust). +pub mod endpoint { + use std::sync::Arc; + + /// Server endpoint with a fresh self-signed certificate. + pub fn server(addr: std::net::SocketAddr) -> anyhow_result::Result { + let cert = rcgen::generate_simple_self_signed(vec!["lumen".into()]) + .map_err(|e| anyhow_result::Error::msg(format!("self-signed cert: {e}")))?; + let cert_der = rustls::pki_types::CertificateDer::from(cert.cert); + let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()); + let server_config = + quinn::ServerConfig::with_single_cert(vec![cert_der], key_der.into()) + .map_err(|e| anyhow_result::Error::msg(format!("server config: {e}")))?; + Ok(quinn::Endpoint::server(server_config, addr)?) + } + + /// Client endpoint that skips certificate verification (seed stage; pinning lands with + /// the pairing/trust model). + pub fn client_insecure() -> anyhow_result::Result { + let _ = rustls::crypto::ring::default_provider().install_default(); + let rustls_cfg = rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(SkipVerify)) + .with_no_client_auth(); + let quic_cfg = quinn::crypto::rustls::QuicClientConfig::try_from(rustls_cfg) + .map_err(|e| anyhow_result::Error::msg(format!("quic client config: {e}")))?; + let mut ep = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())?; + ep.set_default_client_config(quinn::ClientConfig::new(Arc::new(quic_cfg))); + Ok(ep) + } + + /// Minimal error plumbing without pulling anyhow into lumen-core's public API. + pub mod anyhow_result { + pub type Result = std::result::Result; + #[derive(Debug)] + pub struct Error(String); + impl Error { + pub fn msg(s: String) -> Self { + Error(s) + } + } + impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } + } + impl std::error::Error for Error {} + impl From for Error { + fn from(e: std::io::Error) -> Self { + Error(e.to_string()) + } + } + } + + #[derive(Debug)] + struct SkipVerify; + + impl rustls::client::danger::ServerCertVerifier for SkipVerify { + fn verify_server_cert( + &self, + _end_entity: &rustls::pki_types::CertificateDer<'_>, + _intermediates: &[rustls::pki_types::CertificateDer<'_>], + _server_name: &rustls::pki_types::ServerName<'_>, + _ocsp: &[u8], + _now: rustls::pki_types::UnixTime, + ) -> std::result::Result + { + Ok(rustls::client::danger::ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &rustls::pki_types::CertificateDer<'_>, + _dss: &rustls::DigitallySignedStruct, + ) -> std::result::Result + { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &rustls::pki_types::CertificateDer<'_>, + _dss: &rustls::DigitallySignedStruct, + ) -> std::result::Result + { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec { + rustls::crypto::ring::default_provider() + .signature_verification_algorithms + .supported_schemes() + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn welcome_roundtrip() { + let w = Welcome { + abi_version: 1, + udp_port: 9999, + mode: Mode { + width: 2560, + height: 1440, + refresh_hz: 240, + }, + fec: FecConfig { + scheme: FecScheme::Gf16, + fec_percent: 20, + max_data_per_block: 4096, + }, + shard_payload: 1200, + encrypt: true, + key: [7u8; 16], + salt: [1, 2, 3, 4], + frames: 600, + }; + assert_eq!(Welcome::decode(&w.encode()).unwrap(), w); + } + + #[test] + fn hello_start_roundtrip() { + let h = Hello { abi_version: 1 }; + assert_eq!(Hello::decode(&h.encode()).unwrap(), h); + let s = Start { + client_udp_port: 1234, + }; + assert_eq!(Start::decode(&s.encode()).unwrap(), s); + } +} diff --git a/crates/lumen-host/Cargo.toml b/crates/lumen-host/Cargo.toml index 2dc6002..82f45e3 100644 --- a/crates/lumen-host/Cargo.toml +++ b/crates/lumen-host/Cargo.toml @@ -9,7 +9,9 @@ authors.workspace = true repository.workspace = true [dependencies] -lumen-core = { path = "../lumen-core" } +lumen-core = { path = "../lumen-core", features = ["quic"] } +# M3 native control plane (the `lumen/1` QUIC handshake; data plane stays native-thread UDP). +quinn = "0.11" anyhow = "1" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/crates/lumen-host/src/m3.rs b/crates/lumen-host/src/m3.rs new file mode 100644 index 0000000..858bc4a --- /dev/null +++ b/crates/lumen-host/src/m3.rs @@ -0,0 +1,122 @@ +//! M3 seed — the `lumen/1` native host: QUIC control plane (lumen-core `quic`) + the hardened +//! M1 data plane over real UDP. This is the first end-to-end run of lumen's own protocol, +//! past the GameStream compatibility layer: the Welcome negotiates **GF(2¹⁶) Leopard FEC** +//! (positively not expressible in GameStream) and AES-GCM with per-direction salts. +//! +//! `lumen-host m3-host [--port 9777] [--frames 300]` serves one session: handshake on QUIC, +//! then a native thread streams deterministic, verifiable test frames through +//! `lumen_core::Session` → `UdpTransport`. `lumen-client-rs --connect host:9777` is the +//! counterpart (reassembles, FEC-recovers, verifies content). + +use anyhow::{anyhow, Context, Result}; +use lumen_core::config::{FecConfig, FecScheme, Mode, Role}; +use lumen_core::packet::{FLAG_PIC, FLAG_SOF}; +use lumen_core::quic::{endpoint, io, Hello, Start, Welcome}; +use lumen_core::transport::UdpTransport; +use lumen_core::Session; +use rand::RngCore; + +/// Deterministic test frame: `u32 LE index` then `data[i] = idx + i` (wrapping). +pub fn test_frame(idx: u32, len: usize) -> Vec { + let mut d = vec![0u8; len]; + d[0..4].copy_from_slice(&idx.to_le_bytes()); + for (i, b) in d.iter_mut().enumerate().skip(4) { + *b = (idx as u8).wrapping_add(i as u8); + } + d +} + +pub fn run(port: u16, frames: u32) -> Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .context("tokio runtime")?; + rt.block_on(serve_one(port, frames)) +} + +async fn serve_one(port: u16, frames: u32) -> Result<()> { + let ep = endpoint::server(([0, 0, 0, 0], port).into()) + .map_err(|e| anyhow!("QUIC server endpoint: {e}"))?; + tracing::info!(port, "lumen/1 host listening (QUIC)"); + + let incoming = ep + .accept() + .await + .ok_or_else(|| anyhow!("endpoint closed"))?; + let conn = incoming.await.context("QUIC accept")?; + let peer = conn.remote_address(); + tracing::info!(%peer, "lumen/1 client connected"); + let (mut send, mut recv) = conn.accept_bi().await.context("accept control stream")?; + + let hello = Hello::decode(&io::read_msg(&mut recv).await?) + .map_err(|e| anyhow!("Hello decode: {e:?}"))?; + anyhow::ensure!( + hello.abi_version == lumen_core::ABI_VERSION, + "ABI mismatch: client {} host {}", + hello.abi_version, + lumen_core::ABI_VERSION + ); + + // Reserve a UDP port for the data plane (bind, read it back, rebind in UdpTransport). + let probe = std::net::UdpSocket::bind("0.0.0.0:0")?; + let udp_port = probe.local_addr()?.port(); + drop(probe); + + let mut key = [0u8; 16]; + rand::thread_rng().fill_bytes(&mut key); + let welcome = Welcome { + abi_version: lumen_core::ABI_VERSION, + udp_port, + mode: Mode { + width: 1920, + height: 1080, + refresh_hz: 60, + }, + // The post-GameStream point of lumen/1: Leopard GF(2¹⁶) FEC + real encryption. + fec: FecConfig { + scheme: FecScheme::Gf16, + fec_percent: 20, + max_data_per_block: 4096, + }, + shard_payload: 1200, + encrypt: true, + key, + salt: *b"lmn1", + frames, + }; + io::write_msg(&mut send, &welcome.encode()).await?; + + let start = Start::decode(&io::read_msg(&mut recv).await?) + .map_err(|e| anyhow!("Start decode: {e:?}"))?; + let client_udp = std::net::SocketAddr::new(peer.ip(), start.client_udp_port); + tracing::info!(%client_udp, udp_port, "handshake complete — streaming"); + + // Data plane on a native thread (no async on the hot path — design invariant). + let cfg = welcome.session_config(Role::Host); + tokio::task::spawn_blocking(move || -> Result<()> { + let transport = + UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &client_udp.to_string()) + .context("bind data plane")?; + let mut session = + Session::new(cfg, Box::new(transport)).map_err(|e| anyhow!("host session: {e:?}"))?; + let interval = std::time::Duration::from_millis(1000 / 60); + for idx in 0..frames { + let data = test_frame(idx, 64 * 1024); + session + .submit_frame(&data, idx as u64 * 16_666_667, (FLAG_PIC | FLAG_SOF) as u32) + .map_err(|e| anyhow!("submit_frame: {e:?}"))?; + std::thread::sleep(interval); + } + tracing::info!(frames, "all frames sent"); + Ok(()) + }) + .await + .context("stream thread")??; + + // Give the client a moment to drain, then close cleanly. + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + conn.close(0u32.into(), b"done"); + ep.wait_idle().await; + Ok(()) +} diff --git a/crates/lumen-host/src/main.rs b/crates/lumen-host/src/main.rs index c13b874..1e09086 100644 --- a/crates/lumen-host/src/main.rs +++ b/crates/lumen-host/src/main.rs @@ -19,6 +19,7 @@ mod encode; mod gamestream; mod inject; mod m0; +mod m3; mod pipeline; mod pwinit; mod vdisplay; @@ -59,6 +60,22 @@ fn real_main() -> Result<()> { Some("zerocopy-probe") => zerocopy::probe(), // M0 pipeline spike. Some("m0") => m0::run(parse_m0(&args[1..])?), + // M3 seed: native lumen/1 host (QUIC control plane + UDP data plane). + Some("m3-host") => { + let port = args + .iter() + .skip_while(|a| *a != "--port") + .nth(1) + .and_then(|s| s.parse().ok()) + .unwrap_or(9777); + let frames = args + .iter() + .skip_while(|a| *a != "--frames") + .nth(1) + .and_then(|s| s.parse().ok()) + .unwrap_or(300); + m3::run(port, frames) + } Some("-h") | Some("--help") | Some("help") | None => { print_usage(); Ok(())