feat: M3 seed — the lumen/1 native protocol: QUIC control plane + reference client (Phase 5)
The first end-to-end run of lumen's own protocol, past the GameStream compatibility layer. - lumen-core/src/quic.rs (behind the `quic` feature): the lumen/1 handshake — Hello/Welcome/ Start as length-prefixed LE binary on one QUIC bi-stream. Welcome carries the COMPLETE data-plane Config: mode, FEC scheme incl. GF(2^16) Leopard (inexpressible in GameStream), shard sizing, AES-GCM key + per-direction salt, data UDP port. Plus quinn endpoint helpers (self-signed server; accepts-any client — pinning lands with the trust model) and framed async IO. Round-trip unit-tested. - lumen-host m3-host: serves one lumen/1 session — QUIC handshake, then a NATIVE thread (no async on the frame path — design invariant) streams deterministic 64KB test frames through the hardened M1 Session over UdpTransport. - lumen-client-rs: from scaffold to working reference client — connects, negotiates, brings up the client Session over UDP, reassembles + FEC-recovers + byte-verifies every frame. VALIDATED END-TO-END on localhost: 300/300 frames verified, 0 mismatches, through QUIC-negotiated GF(2^16) FEC + AES-GCM over real UDP sockets. M4 (decode+present) builds on this exact client skeleton. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Generated
+6
@@ -1464,7 +1464,10 @@ checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
|
||||
name = "lumen-client-rs"
|
||||
version = "0.0.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"lumen-core",
|
||||
"quinn",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
@@ -1480,7 +1483,9 @@ dependencies = [
|
||||
"proptest",
|
||||
"quinn",
|
||||
"rand 0.9.4",
|
||||
"rcgen",
|
||||
"reed-solomon-simd",
|
||||
"rustls",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tracing",
|
||||
@@ -1509,6 +1514,7 @@ dependencies = [
|
||||
"mdns-sd",
|
||||
"opus",
|
||||
"pipewire",
|
||||
"quinn",
|
||||
"rand 0.8.6",
|
||||
"rcgen",
|
||||
"reis",
|
||||
|
||||
@@ -9,6 +9,9 @@ authors.workspace = true
|
||||
repository.workspace = true
|
||||
|
||||
[dependencies]
|
||||
lumen-core = { path = "../lumen-core" }
|
||||
lumen-core = { path = "../lumen-core", features = ["quic"] }
|
||||
quinn = "0.11"
|
||||
tokio = { version = "1", features = ["rt-multi-thread", "net", "time", "macros"] }
|
||||
anyhow = "1"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
|
||||
@@ -1,9 +1,15 @@
|
||||
//! `lumen-client-rs` — the reference client (plan M4). Exists to exercise the `lumen/1`
|
||||
//! (P2) transport: `lumen_core` pulls reassembled, FEC-recovered access units; decode via
|
||||
//! VAAPI; present via wgpu/Vulkan aligned to client vsync (frame pacing, plan §7).
|
||||
//! `lumen-client-rs` — the reference client. M3 seed mode: speak `lumen/1` (QUIC control
|
||||
//! plane) to a lumen host, bring up the client `lumen_core::Session` over UDP, reassemble +
|
||||
//! FEC-recover the host's deterministic test frames, and verify them byte-exactly. (M4 adds
|
||||
//! VAAPI decode + wgpu present on this same skeleton.)
|
||||
//!
|
||||
//! Status: scaffold. The client side of `lumen_core` ([`lumen_core::Session::poll_frame`])
|
||||
//! is already complete and tested; this binary wires it to a real decoder + presenter.
|
||||
//! Usage: `lumen-client-rs [--connect HOST:PORT]` (default `127.0.0.1:9777`).
|
||||
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use lumen_core::config::Role;
|
||||
use lumen_core::quic::{endpoint, io, Hello, Start, Welcome};
|
||||
use lumen_core::transport::UdpTransport;
|
||||
use lumen_core::{LumenError, Session};
|
||||
|
||||
fn main() {
|
||||
tracing_subscriber::fmt()
|
||||
@@ -11,12 +17,125 @@ fn main() {
|
||||
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
|
||||
)
|
||||
.init();
|
||||
let addr = std::env::args()
|
||||
.skip_while(|a| a != "--connect")
|
||||
.nth(1)
|
||||
.unwrap_or_else(|| "127.0.0.1:9777".into());
|
||||
if let Err(e) = run(&addr) {
|
||||
tracing::error!("{e:#}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
fn run(addr: &str) -> Result<()> {
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(2)
|
||||
.enable_all()
|
||||
.build()?;
|
||||
rt.block_on(session(addr))
|
||||
}
|
||||
|
||||
async fn session(addr: &str) -> Result<()> {
|
||||
let remote: std::net::SocketAddr = addr.parse().context("--connect host:port")?;
|
||||
let ep = endpoint::client_insecure().map_err(|e| anyhow!("QUIC client endpoint: {e}"))?;
|
||||
let conn = ep
|
||||
.connect(remote, "lumen")
|
||||
.context("connect")?
|
||||
.await
|
||||
.context("QUIC handshake")?;
|
||||
tracing::info!(%remote, "lumen/1 connected");
|
||||
let (mut send, mut recv) = conn.open_bi().await.context("open control stream")?;
|
||||
|
||||
io::write_msg(
|
||||
&mut send,
|
||||
&Hello {
|
||||
abi_version: lumen_core::ABI_VERSION,
|
||||
}
|
||||
.encode(),
|
||||
)
|
||||
.await?;
|
||||
let welcome = Welcome::decode(&io::read_msg(&mut recv).await?)
|
||||
.map_err(|e| anyhow!("Welcome decode: {e:?}"))?;
|
||||
tracing::info!(
|
||||
mode = ?welcome.mode,
|
||||
fec = ?welcome.fec,
|
||||
encrypt = welcome.encrypt,
|
||||
frames = welcome.frames,
|
||||
"session offer"
|
||||
);
|
||||
|
||||
// Reserve our data-plane port, then tell the host to start.
|
||||
let probe = std::net::UdpSocket::bind("0.0.0.0:0")?;
|
||||
let udp_port = probe.local_addr()?.port();
|
||||
drop(probe);
|
||||
io::write_msg(
|
||||
&mut send,
|
||||
&Start {
|
||||
client_udp_port: udp_port,
|
||||
}
|
||||
.encode(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let host_udp = std::net::SocketAddr::new(remote.ip(), welcome.udp_port);
|
||||
let cfg = welcome.session_config(Role::Client);
|
||||
let expected = welcome.frames;
|
||||
|
||||
// Data plane on a blocking thread (native threads only on the frame path).
|
||||
let verified = tokio::task::spawn_blocking(move || -> Result<u32> {
|
||||
let transport =
|
||||
UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &host_udp.to_string())
|
||||
.context("bind data plane")?;
|
||||
let mut session =
|
||||
Session::new(cfg, Box::new(transport)).map_err(|e| anyhow!("client session: {e:?}"))?;
|
||||
let mut ok = 0u32;
|
||||
let mut mismatched = 0u32;
|
||||
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
|
||||
let mut last_rx = std::time::Instant::now();
|
||||
while ok + mismatched < expected && std::time::Instant::now() < deadline {
|
||||
match session.poll_frame() {
|
||||
Ok(frame) => {
|
||||
last_rx = std::time::Instant::now();
|
||||
let idx = u32::from_le_bytes(frame.data[0..4].try_into().unwrap());
|
||||
if frame.data == test_frame(idx, frame.data.len()) {
|
||||
ok += 1;
|
||||
} else {
|
||||
mismatched += 1;
|
||||
tracing::warn!(idx, "frame content mismatch");
|
||||
}
|
||||
}
|
||||
Err(LumenError::NoFrame) => {
|
||||
if last_rx.elapsed() > std::time::Duration::from_secs(5) {
|
||||
break; // stream went quiet
|
||||
}
|
||||
std::thread::sleep(std::time::Duration::from_micros(500));
|
||||
}
|
||||
Err(e) => return Err(anyhow!("poll_frame: {e:?}")),
|
||||
}
|
||||
}
|
||||
tracing::info!(ok, mismatched, expected, "verification complete");
|
||||
anyhow::ensure!(mismatched == 0, "{mismatched} corrupted frames");
|
||||
anyhow::ensure!(ok == expected, "received {ok}/{expected} frames");
|
||||
Ok(ok)
|
||||
})
|
||||
.await??;
|
||||
|
||||
tracing::info!(
|
||||
"lumen-client-rs scaffold (lumen_core ABI v{})",
|
||||
lumen_core::ABI_VERSION
|
||||
);
|
||||
tracing::info!(
|
||||
"intended flow: lumen_core::Session(client) over UDP → poll_frame → VAAPI decode → wgpu present"
|
||||
verified,
|
||||
"lumen/1 session PASSED — GF(2^16) FEC + AES-GCM over real UDP, QUIC-negotiated"
|
||||
);
|
||||
conn.close(0u32.into(), b"done");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// The host's deterministic test frame (mirror of `lumen-host::m3::test_frame`).
|
||||
fn test_frame(idx: u32, len: usize) -> Vec<u8> {
|
||||
let mut d = vec![0u8; len];
|
||||
if len >= 4 {
|
||||
d[0..4].copy_from_slice(&idx.to_le_bytes());
|
||||
}
|
||||
for (i, b) in d.iter_mut().enumerate().skip(4) {
|
||||
*b = (idx as u8).wrapping_add(i as u8);
|
||||
}
|
||||
d
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ crate-type = ["lib", "cdylib", "staticlib"]
|
||||
default = []
|
||||
# Control-plane QUIC (pairing, config, reverse audio). tokio is permitted ONLY here,
|
||||
# never on the per-frame hot path. Off by default so the core stays runtime-free.
|
||||
quic = ["dep:quinn", "dep:tokio"]
|
||||
quic = ["dep:quinn", "dep:tokio", "dep:rustls", "dep:rcgen"]
|
||||
|
||||
[dependencies]
|
||||
reed-solomon-simd = "3.1" # GF(2^16) Leopard-RS, SIMD, O(n log n) — the wall-breaker (P2)
|
||||
@@ -37,6 +37,8 @@ rand = "0.9"
|
||||
zeroize = "1"
|
||||
|
||||
quinn = { version = "0.11", optional = true }
|
||||
rustls = { version = "0.23", optional = true, default-features = false, features = ["ring", "std"] }
|
||||
rcgen = { version = "0.13", optional = true, default-features = false, features = ["aws_lc_rs"] }
|
||||
tokio = { version = "1", optional = true, features = ["rt-multi-thread", "net", "sync", "macros"] }
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -60,7 +60,7 @@ pub struct Mode {
|
||||
|
||||
/// Per-block FEC parameters. Recovery count is derived from `fec_percent` exactly as
|
||||
/// GameStream does: `m = ceil(k * fec_percent / 100)`.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub struct FecConfig {
|
||||
pub scheme: FecScheme,
|
||||
/// Recovery overhead as a percentage of data shards (0 disables FEC).
|
||||
|
||||
@@ -31,6 +31,8 @@ pub mod error;
|
||||
pub mod fec;
|
||||
pub mod input;
|
||||
pub mod packet;
|
||||
#[cfg(feature = "quic")]
|
||||
pub mod quic;
|
||||
pub mod session;
|
||||
pub mod stats;
|
||||
pub mod transport;
|
||||
|
||||
@@ -0,0 +1,334 @@
|
||||
//! `lumen/1` — the native control plane (M3), gated behind the `quic` feature.
|
||||
//!
|
||||
//! GameStream is lumen's compatibility layer; this is the start of its own protocol. A QUIC
|
||||
//! connection (quinn, tokio — control plane only, never the per-frame path) carries a
|
||||
//! length-prefixed binary handshake on one bidirectional stream:
|
||||
//!
|
||||
//! ```text
|
||||
//! client → host Hello { abi_version }
|
||||
//! host → client Welcome { abi_version, session: full data-plane Config + mode + UDP port }
|
||||
//! client → host Start { client_udp_port }
|
||||
//! ```
|
||||
//!
|
||||
//! after which both sides bring up a [`crate::session::Session`] over a plain
|
||||
//! [`UdpTransport`](crate::transport::udp) (native threads, no async) and the host streams.
|
||||
//! The Welcome carries everything the M1 core negotiates — FEC scheme (including GF(2¹⁶)
|
||||
//! Leopard, which GameStream can't express), shard sizing, crypto key/salt — so the data
|
||||
//! plane is exactly the hardened M1 `Session`.
|
||||
//!
|
||||
//! Seed-stage transport security: the host presents a self-signed certificate and the client
|
||||
//! accepts any (pairing/pinning lands with the trust model; the data plane's AES-GCM is
|
||||
//! already real). All integers little-endian; every message is `u16 length || payload`.
|
||||
|
||||
use crate::config::{Config, FecConfig, FecScheme, Mode, ProtocolPhase, Role};
|
||||
use crate::error::{LumenError, Result};
|
||||
|
||||
/// Protocol magic + version, first bytes of every message payload.
|
||||
pub const MAGIC: &[u8; 4] = b"LMN1";
|
||||
|
||||
/// `client → host`: open the session.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub struct Hello {
|
||||
pub abi_version: u32,
|
||||
}
|
||||
|
||||
/// `host → client`: the complete session offer.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub struct Welcome {
|
||||
pub abi_version: u32,
|
||||
/// Host UDP port for the data plane.
|
||||
pub udp_port: u16,
|
||||
pub mode: Mode,
|
||||
pub fec: FecConfig,
|
||||
pub shard_payload: u16,
|
||||
pub encrypt: bool,
|
||||
pub key: [u8; 16],
|
||||
pub salt: [u8; 4],
|
||||
/// Seed/testing: how many frames the host will send (0 = unbounded).
|
||||
pub frames: u32,
|
||||
}
|
||||
|
||||
/// `client → host`: data plane is bound, begin streaming.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub struct Start {
|
||||
pub client_udp_port: u16,
|
||||
}
|
||||
|
||||
impl Hello {
|
||||
pub fn encode(&self) -> Vec<u8> {
|
||||
let mut b = Vec::with_capacity(8);
|
||||
b.extend_from_slice(MAGIC);
|
||||
b.extend_from_slice(&self.abi_version.to_le_bytes());
|
||||
b
|
||||
}
|
||||
|
||||
pub fn decode(b: &[u8]) -> Result<Hello> {
|
||||
if b.len() < 8 || &b[0..4] != MAGIC {
|
||||
return Err(LumenError::InvalidArg("bad Hello"));
|
||||
}
|
||||
Ok(Hello {
|
||||
abi_version: u32::from_le_bytes([b[4], b[5], b[6], b[7]]),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Welcome {
|
||||
pub fn encode(&self) -> Vec<u8> {
|
||||
let mut b = Vec::with_capacity(64);
|
||||
b.extend_from_slice(MAGIC);
|
||||
b.extend_from_slice(&self.abi_version.to_le_bytes());
|
||||
b.extend_from_slice(&self.udp_port.to_le_bytes());
|
||||
b.extend_from_slice(&self.mode.width.to_le_bytes());
|
||||
b.extend_from_slice(&self.mode.height.to_le_bytes());
|
||||
b.extend_from_slice(&self.mode.refresh_hz.to_le_bytes());
|
||||
b.push(match self.fec.scheme {
|
||||
FecScheme::Gf8 => 0,
|
||||
FecScheme::Gf16 => 1,
|
||||
});
|
||||
b.push(self.fec.fec_percent);
|
||||
b.extend_from_slice(&self.fec.max_data_per_block.to_le_bytes());
|
||||
b.extend_from_slice(&self.shard_payload.to_le_bytes());
|
||||
b.push(self.encrypt as u8);
|
||||
b.extend_from_slice(&self.key);
|
||||
b.extend_from_slice(&self.salt);
|
||||
b.extend_from_slice(&self.frames.to_le_bytes());
|
||||
b
|
||||
}
|
||||
|
||||
pub fn decode(b: &[u8]) -> Result<Welcome> {
|
||||
// Layout (LE): magic[0..4] abi[4..8] port[8..10] w[10..14] h[14..18] hz[18..22]
|
||||
// scheme[22] pct[23] max_data[24..26] shard[26..28] encrypt[28] key[29..45]
|
||||
// salt[45..49] frames[49..53].
|
||||
if b.len() < 53 || &b[0..4] != MAGIC {
|
||||
return Err(LumenError::InvalidArg("bad Welcome"));
|
||||
}
|
||||
let u32at = |o: usize| u32::from_le_bytes([b[o], b[o + 1], b[o + 2], b[o + 3]]);
|
||||
let u16at = |o: usize| u16::from_le_bytes([b[o], b[o + 1]]);
|
||||
let mut key = [0u8; 16];
|
||||
key.copy_from_slice(&b[29..45]);
|
||||
let mut salt = [0u8; 4];
|
||||
salt.copy_from_slice(&b[45..49]);
|
||||
Ok(Welcome {
|
||||
abi_version: u32at(4),
|
||||
udp_port: u16at(8),
|
||||
mode: Mode {
|
||||
width: u32at(10),
|
||||
height: u32at(14),
|
||||
refresh_hz: u32at(18),
|
||||
},
|
||||
fec: FecConfig {
|
||||
scheme: if b[22] == 1 {
|
||||
FecScheme::Gf16
|
||||
} else {
|
||||
FecScheme::Gf8
|
||||
},
|
||||
fec_percent: b[23],
|
||||
max_data_per_block: u16at(24),
|
||||
},
|
||||
shard_payload: u16at(26),
|
||||
encrypt: b[28] != 0,
|
||||
key,
|
||||
salt,
|
||||
frames: u32at(49),
|
||||
})
|
||||
}
|
||||
|
||||
/// Build the data-plane [`Config`] this offer describes (for `role`).
|
||||
pub fn session_config(&self, role: Role) -> Config {
|
||||
let mut c = Config::p1_defaults(role);
|
||||
c.phase = ProtocolPhase::P1GameStream; // wire phase id pending the P2 packet rev
|
||||
c.fec = self.fec;
|
||||
c.shard_payload = self.shard_payload as usize;
|
||||
c.encrypt = self.encrypt;
|
||||
c.key = self.key;
|
||||
c.salt = self.salt;
|
||||
c
|
||||
}
|
||||
}
|
||||
|
||||
impl Start {
|
||||
pub fn encode(&self) -> Vec<u8> {
|
||||
let mut b = Vec::with_capacity(6);
|
||||
b.extend_from_slice(MAGIC);
|
||||
b.extend_from_slice(&self.client_udp_port.to_le_bytes());
|
||||
b
|
||||
}
|
||||
|
||||
pub fn decode(b: &[u8]) -> Result<Start> {
|
||||
if b.len() < 6 || &b[0..4] != MAGIC {
|
||||
return Err(LumenError::InvalidArg("bad Start"));
|
||||
}
|
||||
Ok(Start {
|
||||
client_udp_port: u16::from_le_bytes([b[4], b[5]]),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Frame a message for the control stream: `u16 LE length || payload`.
|
||||
pub fn frame(payload: &[u8]) -> Vec<u8> {
|
||||
let mut b = Vec::with_capacity(2 + payload.len());
|
||||
b.extend_from_slice(&(payload.len() as u16).to_le_bytes());
|
||||
b.extend_from_slice(payload);
|
||||
b
|
||||
}
|
||||
|
||||
/// Async framed-message IO over a quinn stream (`u16 LE length || payload`).
|
||||
pub mod io {
|
||||
/// Read one framed message (bounded at 64 KiB — control messages are tiny).
|
||||
pub async fn read_msg(recv: &mut quinn::RecvStream) -> std::io::Result<Vec<u8>> {
|
||||
let mut len = [0u8; 2];
|
||||
recv.read_exact(&mut len)
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
let n = u16::from_le_bytes(len) as usize;
|
||||
let mut buf = vec![0u8; n];
|
||||
recv.read_exact(&mut buf)
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
/// Write one framed message.
|
||||
pub async fn write_msg(send: &mut quinn::SendStream, payload: &[u8]) -> std::io::Result<()> {
|
||||
send.write_all(&super::frame(payload))
|
||||
.await
|
||||
.map_err(std::io::Error::other)
|
||||
}
|
||||
}
|
||||
|
||||
/// quinn endpoint constructors (host self-signed; client accepts-any — seed-stage trust).
|
||||
pub mod endpoint {
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Server endpoint with a fresh self-signed certificate.
|
||||
pub fn server(addr: std::net::SocketAddr) -> anyhow_result::Result<quinn::Endpoint> {
|
||||
let cert = rcgen::generate_simple_self_signed(vec!["lumen".into()])
|
||||
.map_err(|e| anyhow_result::Error::msg(format!("self-signed cert: {e}")))?;
|
||||
let cert_der = rustls::pki_types::CertificateDer::from(cert.cert);
|
||||
let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der());
|
||||
let server_config =
|
||||
quinn::ServerConfig::with_single_cert(vec![cert_der], key_der.into())
|
||||
.map_err(|e| anyhow_result::Error::msg(format!("server config: {e}")))?;
|
||||
Ok(quinn::Endpoint::server(server_config, addr)?)
|
||||
}
|
||||
|
||||
/// Client endpoint that skips certificate verification (seed stage; pinning lands with
|
||||
/// the pairing/trust model).
|
||||
pub fn client_insecure() -> anyhow_result::Result<quinn::Endpoint> {
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
let rustls_cfg = rustls::ClientConfig::builder()
|
||||
.dangerous()
|
||||
.with_custom_certificate_verifier(Arc::new(SkipVerify))
|
||||
.with_no_client_auth();
|
||||
let quic_cfg = quinn::crypto::rustls::QuicClientConfig::try_from(rustls_cfg)
|
||||
.map_err(|e| anyhow_result::Error::msg(format!("quic client config: {e}")))?;
|
||||
let mut ep = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())?;
|
||||
ep.set_default_client_config(quinn::ClientConfig::new(Arc::new(quic_cfg)));
|
||||
Ok(ep)
|
||||
}
|
||||
|
||||
/// Minimal error plumbing without pulling anyhow into lumen-core's public API.
|
||||
pub mod anyhow_result {
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
#[derive(Debug)]
|
||||
pub struct Error(String);
|
||||
impl Error {
|
||||
pub fn msg(s: String) -> Self {
|
||||
Error(s)
|
||||
}
|
||||
}
|
||||
impl std::fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(&self.0)
|
||||
}
|
||||
}
|
||||
impl std::error::Error for Error {}
|
||||
impl From<std::io::Error> for Error {
|
||||
fn from(e: std::io::Error) -> Self {
|
||||
Error(e.to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SkipVerify;
|
||||
|
||||
impl rustls::client::danger::ServerCertVerifier for SkipVerify {
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
_end_entity: &rustls::pki_types::CertificateDer<'_>,
|
||||
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
|
||||
_server_name: &rustls::pki_types::ServerName<'_>,
|
||||
_ocsp: &[u8],
|
||||
_now: rustls::pki_types::UnixTime,
|
||||
) -> std::result::Result<rustls::client::danger::ServerCertVerified, rustls::Error>
|
||||
{
|
||||
Ok(rustls::client::danger::ServerCertVerified::assertion())
|
||||
}
|
||||
|
||||
fn verify_tls12_signature(
|
||||
&self,
|
||||
_message: &[u8],
|
||||
_cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
_dss: &rustls::DigitallySignedStruct,
|
||||
) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
|
||||
{
|
||||
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
|
||||
}
|
||||
|
||||
fn verify_tls13_signature(
|
||||
&self,
|
||||
_message: &[u8],
|
||||
_cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
_dss: &rustls::DigitallySignedStruct,
|
||||
) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
|
||||
{
|
||||
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
|
||||
}
|
||||
|
||||
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
|
||||
rustls::crypto::ring::default_provider()
|
||||
.signature_verification_algorithms
|
||||
.supported_schemes()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn welcome_roundtrip() {
|
||||
let w = Welcome {
|
||||
abi_version: 1,
|
||||
udp_port: 9999,
|
||||
mode: Mode {
|
||||
width: 2560,
|
||||
height: 1440,
|
||||
refresh_hz: 240,
|
||||
},
|
||||
fec: FecConfig {
|
||||
scheme: FecScheme::Gf16,
|
||||
fec_percent: 20,
|
||||
max_data_per_block: 4096,
|
||||
},
|
||||
shard_payload: 1200,
|
||||
encrypt: true,
|
||||
key: [7u8; 16],
|
||||
salt: [1, 2, 3, 4],
|
||||
frames: 600,
|
||||
};
|
||||
assert_eq!(Welcome::decode(&w.encode()).unwrap(), w);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn hello_start_roundtrip() {
|
||||
let h = Hello { abi_version: 1 };
|
||||
assert_eq!(Hello::decode(&h.encode()).unwrap(), h);
|
||||
let s = Start {
|
||||
client_udp_port: 1234,
|
||||
};
|
||||
assert_eq!(Start::decode(&s.encode()).unwrap(), s);
|
||||
}
|
||||
}
|
||||
@@ -9,7 +9,9 @@ authors.workspace = true
|
||||
repository.workspace = true
|
||||
|
||||
[dependencies]
|
||||
lumen-core = { path = "../lumen-core" }
|
||||
lumen-core = { path = "../lumen-core", features = ["quic"] }
|
||||
# M3 native control plane (the `lumen/1` QUIC handshake; data plane stays native-thread UDP).
|
||||
quinn = "0.11"
|
||||
anyhow = "1"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
|
||||
@@ -0,0 +1,122 @@
|
||||
//! M3 seed — the `lumen/1` native host: QUIC control plane (lumen-core `quic`) + the hardened
|
||||
//! M1 data plane over real UDP. This is the first end-to-end run of lumen's own protocol,
|
||||
//! past the GameStream compatibility layer: the Welcome negotiates **GF(2¹⁶) Leopard FEC**
|
||||
//! (positively not expressible in GameStream) and AES-GCM with per-direction salts.
|
||||
//!
|
||||
//! `lumen-host m3-host [--port 9777] [--frames 300]` serves one session: handshake on QUIC,
|
||||
//! then a native thread streams deterministic, verifiable test frames through
|
||||
//! `lumen_core::Session` → `UdpTransport`. `lumen-client-rs --connect host:9777` is the
|
||||
//! counterpart (reassembles, FEC-recovers, verifies content).
|
||||
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use lumen_core::config::{FecConfig, FecScheme, Mode, Role};
|
||||
use lumen_core::packet::{FLAG_PIC, FLAG_SOF};
|
||||
use lumen_core::quic::{endpoint, io, Hello, Start, Welcome};
|
||||
use lumen_core::transport::UdpTransport;
|
||||
use lumen_core::Session;
|
||||
use rand::RngCore;
|
||||
|
||||
/// Deterministic test frame: `u32 LE index` then `data[i] = idx + i` (wrapping).
|
||||
pub fn test_frame(idx: u32, len: usize) -> Vec<u8> {
|
||||
let mut d = vec![0u8; len];
|
||||
d[0..4].copy_from_slice(&idx.to_le_bytes());
|
||||
for (i, b) in d.iter_mut().enumerate().skip(4) {
|
||||
*b = (idx as u8).wrapping_add(i as u8);
|
||||
}
|
||||
d
|
||||
}
|
||||
|
||||
pub fn run(port: u16, frames: u32) -> Result<()> {
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(2)
|
||||
.enable_all()
|
||||
.build()
|
||||
.context("tokio runtime")?;
|
||||
rt.block_on(serve_one(port, frames))
|
||||
}
|
||||
|
||||
async fn serve_one(port: u16, frames: u32) -> Result<()> {
|
||||
let ep = endpoint::server(([0, 0, 0, 0], port).into())
|
||||
.map_err(|e| anyhow!("QUIC server endpoint: {e}"))?;
|
||||
tracing::info!(port, "lumen/1 host listening (QUIC)");
|
||||
|
||||
let incoming = ep
|
||||
.accept()
|
||||
.await
|
||||
.ok_or_else(|| anyhow!("endpoint closed"))?;
|
||||
let conn = incoming.await.context("QUIC accept")?;
|
||||
let peer = conn.remote_address();
|
||||
tracing::info!(%peer, "lumen/1 client connected");
|
||||
let (mut send, mut recv) = conn.accept_bi().await.context("accept control stream")?;
|
||||
|
||||
let hello = Hello::decode(&io::read_msg(&mut recv).await?)
|
||||
.map_err(|e| anyhow!("Hello decode: {e:?}"))?;
|
||||
anyhow::ensure!(
|
||||
hello.abi_version == lumen_core::ABI_VERSION,
|
||||
"ABI mismatch: client {} host {}",
|
||||
hello.abi_version,
|
||||
lumen_core::ABI_VERSION
|
||||
);
|
||||
|
||||
// Reserve a UDP port for the data plane (bind, read it back, rebind in UdpTransport).
|
||||
let probe = std::net::UdpSocket::bind("0.0.0.0:0")?;
|
||||
let udp_port = probe.local_addr()?.port();
|
||||
drop(probe);
|
||||
|
||||
let mut key = [0u8; 16];
|
||||
rand::thread_rng().fill_bytes(&mut key);
|
||||
let welcome = Welcome {
|
||||
abi_version: lumen_core::ABI_VERSION,
|
||||
udp_port,
|
||||
mode: Mode {
|
||||
width: 1920,
|
||||
height: 1080,
|
||||
refresh_hz: 60,
|
||||
},
|
||||
// The post-GameStream point of lumen/1: Leopard GF(2¹⁶) FEC + real encryption.
|
||||
fec: FecConfig {
|
||||
scheme: FecScheme::Gf16,
|
||||
fec_percent: 20,
|
||||
max_data_per_block: 4096,
|
||||
},
|
||||
shard_payload: 1200,
|
||||
encrypt: true,
|
||||
key,
|
||||
salt: *b"lmn1",
|
||||
frames,
|
||||
};
|
||||
io::write_msg(&mut send, &welcome.encode()).await?;
|
||||
|
||||
let start = Start::decode(&io::read_msg(&mut recv).await?)
|
||||
.map_err(|e| anyhow!("Start decode: {e:?}"))?;
|
||||
let client_udp = std::net::SocketAddr::new(peer.ip(), start.client_udp_port);
|
||||
tracing::info!(%client_udp, udp_port, "handshake complete — streaming");
|
||||
|
||||
// Data plane on a native thread (no async on the hot path — design invariant).
|
||||
let cfg = welcome.session_config(Role::Host);
|
||||
tokio::task::spawn_blocking(move || -> Result<()> {
|
||||
let transport =
|
||||
UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &client_udp.to_string())
|
||||
.context("bind data plane")?;
|
||||
let mut session =
|
||||
Session::new(cfg, Box::new(transport)).map_err(|e| anyhow!("host session: {e:?}"))?;
|
||||
let interval = std::time::Duration::from_millis(1000 / 60);
|
||||
for idx in 0..frames {
|
||||
let data = test_frame(idx, 64 * 1024);
|
||||
session
|
||||
.submit_frame(&data, idx as u64 * 16_666_667, (FLAG_PIC | FLAG_SOF) as u32)
|
||||
.map_err(|e| anyhow!("submit_frame: {e:?}"))?;
|
||||
std::thread::sleep(interval);
|
||||
}
|
||||
tracing::info!(frames, "all frames sent");
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.context("stream thread")??;
|
||||
|
||||
// Give the client a moment to drain, then close cleanly.
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
conn.close(0u32.into(), b"done");
|
||||
ep.wait_idle().await;
|
||||
Ok(())
|
||||
}
|
||||
@@ -19,6 +19,7 @@ mod encode;
|
||||
mod gamestream;
|
||||
mod inject;
|
||||
mod m0;
|
||||
mod m3;
|
||||
mod pipeline;
|
||||
mod pwinit;
|
||||
mod vdisplay;
|
||||
@@ -59,6 +60,22 @@ fn real_main() -> Result<()> {
|
||||
Some("zerocopy-probe") => zerocopy::probe(),
|
||||
// M0 pipeline spike.
|
||||
Some("m0") => m0::run(parse_m0(&args[1..])?),
|
||||
// M3 seed: native lumen/1 host (QUIC control plane + UDP data plane).
|
||||
Some("m3-host") => {
|
||||
let port = args
|
||||
.iter()
|
||||
.skip_while(|a| *a != "--port")
|
||||
.nth(1)
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(9777);
|
||||
let frames = args
|
||||
.iter()
|
||||
.skip_while(|a| *a != "--frames")
|
||||
.nth(1)
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(300);
|
||||
m3::run(port, frames)
|
||||
}
|
||||
Some("-h") | Some("--help") | Some("help") | None => {
|
||||
print_usage();
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user