feat: M1 lumen-core (FEC/crypto/packet/session + C ABI) and workspace scaffold

Ground-up low-latency streaming stack per docs/implementation-plan.md. M1 is
complete and tested; Linux host backends are cfg-gated stubs to be filled in on
real hardware (M0/M2).

lumen-core (built + tested on macOS/aarch64 — 21 tests):
- fec: ErasureCoder over GF(2^8) (reed-solomon-erasure, Moonlight-compatible)
  and GF(2^16) Leopard-RS (reed-solomon-simd, the >1 Gbps wall-breaker); proptested
- packet: zero-copy #[repr(C)] framing, multi-block, FEC-aware reassembly
- crypto: AES-128-GCM with per-direction nonce salts + sequence-as-AAD
- session: host submit / client poll hot paths + input; loopback & UDP transports
- abi: opaque handles, versioned LumenConfig, panic guards; cbindgen-generated header
- acceptance: Rust loopback+proptest and a C harness that links the staticlib

Scaffold (compiles green on all platforms): lumen-host (vdisplay/capture/encode/
inject/web/pipeline seams under cfg(linux)), lumen-client-rs, tools/{loss-harness,
latency-probe}, Apple/Android client stubs, Gitea CI, docs.

Hardened against a multi-agent adversarial review (13 verified findings fixed,
regression-tested): reassembler memory-DoS bounds + block-consistency validation,
GCM nonce-reuse direction separation, ABI struct_size guard + range checks, FEC
shard-length guards, shard_payload datagram bound, key zeroization + Debug redaction.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-09 00:02:52 +02:00
parent 4a1e3cd2fd
commit a913042367
47 changed files with 6015 additions and 0 deletions
+43
View File
@@ -0,0 +1,43 @@
# CI for lumen (Gitea Actions, GitHub-Actions-compatible syntax).
# Adjust `runs-on` to match your runner labels if not using the default ubuntu image.
name: ci
on:
push:
branches: [main]
pull_request:
jobs:
rust:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust toolchain
run: |
if ! command -v cargo >/dev/null; then
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
echo "$HOME/.cargo/bin" >> "$GITHUB_PATH"
fi
rustup component add rustfmt clippy
- name: Format
run: cargo fmt --all --check
- name: Clippy (deny warnings)
run: cargo clippy --workspace --all-targets -- -D warnings
- name: Build
run: cargo build --workspace --locked
- name: Test (unit + loopback + proptest + C ABI harness)
run: cargo test --workspace --locked
- name: C ABI harness (standalone link proof)
run: bash crates/lumen-core/tests/c/run.sh
- name: Verify generated header is committed & up to date
run: |
cargo build -p lumen-core
git diff --exit-code include/lumen_core.h \
|| (echo "include/lumen_core.h is stale — commit the regenerated header" && exit 1)
+9
View File
@@ -0,0 +1,9 @@
/target
**/*.rs.bk
*.pdb
.DS_Store
# generated C header is checked in (see include/), build artifacts are not
/crates/*/target
/tools/*/target
node_modules/
dist/
Generated
+1934
View File
File diff suppressed because it is too large Load Diff
+31
View File
@@ -0,0 +1,31 @@
[workspace]
resolver = "2"
members = [
"crates/lumen-core",
"crates/lumen-host",
"crates/lumen-client-rs",
"tools/latency-probe",
"tools/loss-harness",
]
[workspace.package]
version = "0.0.1"
edition = "2021"
rust-version = "1.82"
license = "MIT OR Apache-2.0"
authors = ["unom"]
repository = "https://git.unom.io/unom/lumen"
[profile.release]
opt-level = 3
lto = "thin"
codegen-units = 1
# NOTE: deliberately NOT `panic = "abort"`. lumen-core ships as a cdylib/staticlib into
# third-party apps (Swift/Kotlin/C) and its C ABI catches panics at the boundary
# (`catch_unwind` → `LumenStatus::Panic`). `panic = "abort"` would make that guard a
# no-op and let a stray panic abort the embedding application. Unwinding keeps the
# documented isolation guarantee real.
# The per-frame hot path must stay fast even in dev builds.
[profile.dev.package."*"]
opt-level = 2
+64
View File
@@ -1,2 +1,66 @@
# lumen
*A ground-up low-latency desktop streaming stack, built Linux-first, with a shared Rust
protocol core and native clients per platform.*
`lumen` is a placeholder codename. The bet: ship a **Linux virtual-display streaming
host** that speaks the existing Moonlight protocol (every Moonlight/Artemis client works
day one), then break the ~1 Gbps FEC wall with a **GF(2¹⁶) Leopard-RS** transport as a
negotiated extension. See [`docs/implementation-plan.md`](docs/implementation-plan.md).
## Status
| Milestone | State |
|-----------|-------|
| **M1 — `lumen-core` + C ABI** | ✅ done & tested (FEC, packetization, crypto, session, `lumen_core.h`) |
| M0 — pipeline spike (wlroots→PipeWire→encode→file) | ⬜ needs Linux GPU |
| M2 — P1 host → stock Moonlight | ⬜ scaffolded (`lumen-host`) |
| M3 — measurement harness | 🟡 `tools/loss-harness` runs; `latency-probe` scaffolded |
| M4 — P2 transport + Rust client | 🟡 GF(2¹⁶) core done; `lumen-client-rs` scaffolded |
| M5 — Apple client | ⬜ scaffolded (`clients/apple`) |
`lumen-core` is complete and verified: it builds and its full test suite (FEC recovery,
loopback round-trip under loss, property tests, and a **C ABI harness**) passes on
macOS/aarch64. The Linux host backends (PipeWire, VAAPI/NVENC, KWin, libei) are
`#[cfg(target_os = "linux")]` seams — defined and compiling, implementations pending.
## Layout
```
crates/
lumen-core/ protocol · FEC · pacing · crypto — the C ABI (lib + cdylib + staticlib)
lumen-host/ Linux host: vdisplay · capture · encode · inject · web (cfg-gated)
lumen-client-rs/ reference client (M4): VAAPI decode + wgpu present
clients/{apple,android}/ native client scaffolds (import lumen_core.h)
include/lumen_core.h cbindgen-generated C header (checked in)
tools/{latency-probe,loss-harness}/ measurement (plan §10)
docs/implementation-plan.md
```
## Build & test
```sh
cargo build --workspace # green on Linux and macOS
cargo test --workspace # unit + loopback + proptest + C ABI harness
cargo clippy --workspace --all-targets
cargo run -p loss-harness # FEC loss-resilience sweep (no network needed)
bash crates/lumen-core/tests/c/run.sh # standalone C-ABI link+round-trip proof
```
The C header regenerates from `crates/lumen-core/src/abi.rs` on every build (cbindgen via
`build.rs`) into `include/lumen_core.h`.
## Design invariants
- **One core, linked everywhere.** Protocol/FEC/crypto/pacing live in `lumen-core` exactly
once, exposed over a stable, versioned C ABI (`lumen_abi_version()`, `LumenConfig`
carries its own `struct_size`).
- **No async on the hot path.** The per-frame pipeline uses native threads only;
`tokio`/`quinn` are gated behind the off-by-default `quic` feature (control plane only).
- **FEC is the wall-breaker.** GF(2⁸) (≤255 shards/block) for Moonlight compat;
GF(2¹⁶) (≤65535 shards/block, SIMD, O(n log n)) to push past ~1 Gbps.
## License
MIT OR Apache-2.0.
+20
View File
@@ -0,0 +1,20 @@
# lumen Android client (later)
Kotlin UI + MediaCodec (decode) + a thin JNI layer over the `lumen-core` C ABI.
## Wiring
1. Build the core as a shared library per Android ABI:
```sh
rustup target add aarch64-linux-android armv7-linux-androideabi x86_64-linux-android
cargo build -p lumen-core --release --target aarch64-linux-android # liblumen_core.so
```
(Use `cargo-ndk` to handle the NDK toolchain/linker.)
2. JNI shim: small C/Rust glue mapping `lumen_*` to Kotlin `external fun`s, bundling
`liblumen_core.so` into the APK's `jniLibs/`.
3. Kotlin: client `LumenSession` → `lumen_client_poll_frame` on a decode thread → feed
`MediaCodec` → render to a `SurfaceView` aligned to the display refresh.
## Status
Placeholder — scheduled after the Apple client (M5).
+22
View File
@@ -0,0 +1,22 @@
# lumen Apple client (M5)
Swift + VideoToolbox (decode) + Metal (present) + SwiftUI, linking `lumen-core` through
the generated C ABI — **no glue layer**. Imports `include/lumen_core.h` via a module map.
## Wiring
1. Build the core as a static or dynamic library for Apple targets:
```sh
rustup target add aarch64-apple-ios aarch64-apple-darwin
cargo build -p lumen-core --release --target aarch64-apple-darwin # liblumen_core.a / .dylib
```
2. Expose the C ABI to Swift with a module map (`module.modulemap` here) that points at
the checked-in header `../../include/lumen_core.h`.
3. In Swift: create a client `LumenSession`, `lumen_client_poll_frame` on a display-link
thread, feed the access unit to a `VTDecompressionSession`, present the `CVImageBuffer`
with Metal aligned to the screen's refresh (frame pacing, plan §7).
## Status
Scaffold. The client half of `lumen_core` (`poll_frame`, FEC recovery, reassembly) is
complete and tested; this target adds the platform decode + present.
+7
View File
@@ -0,0 +1,7 @@
// Exposes the lumen-core C ABI to Swift as `import LumenCore`.
// Point Xcode's "Import Paths" (SWIFT_INCLUDE_PATHS) at this directory, and link
// liblumen_core.a (or .dylib) built via `cargo build -p lumen-core --target <apple>`.
module LumenCore {
header "../../include/lumen_core.h"
export *
}
+14
View File
@@ -0,0 +1,14 @@
[package]
name = "lumen-client-rs"
description = "lumen reference client (M4): VAAPI decode + wgpu/Vulkan present"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
authors.workspace = true
repository.workspace = true
[dependencies]
lumen-core = { path = "../lumen-core" }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+22
View File
@@ -0,0 +1,22 @@
//! `lumen-client-rs` — the reference client (plan M4). Exists to exercise the `lumen/1`
//! (P2) transport: `lumen_core` pulls reassembled, FEC-recovered access units; decode via
//! VAAPI; present via wgpu/Vulkan aligned to client vsync (frame pacing, plan §7).
//!
//! Status: scaffold. The client side of `lumen_core` ([`lumen_core::Session::poll_frame`])
//! is already complete and tested; this binary wires it to a real decoder + presenter.
fn main() {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
)
.init();
tracing::info!(
"lumen-client-rs scaffold (lumen_core ABI v{})",
lumen_core::ABI_VERSION
);
tracing::info!(
"intended flow: lumen_core::Session(client) over UDP → poll_frame → VAAPI decode → wgpu present"
);
}
+42
View File
@@ -0,0 +1,42 @@
[package]
name = "lumen-core"
description = "lumen shared protocol/transport/FEC core, exposed over a stable C ABI"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
authors.workspace = true
repository.workspace = true
[lib]
name = "lumen_core"
# `lib` — so lumen-host / lumen-client-rs / tools link it as a normal Rust crate.
# `staticlib` — `liblumen_core.a` for the C test harness and static embedding.
# `cdylib` — `liblumen_core.{so,dylib}` for Swift/Kotlin clients via the C ABI.
crate-type = ["lib", "cdylib", "staticlib"]
[features]
default = []
# Control-plane QUIC (pairing, config, reverse audio). tokio is permitted ONLY here,
# never on the per-frame hot path. Off by default so the core stays runtime-free.
quic = ["dep:quinn", "dep:tokio"]
[dependencies]
reed-solomon-simd = "3.1" # GF(2^16) Leopard-RS, SIMD, O(n log n) — the wall-breaker (P2)
reed-solomon-erasure = "6.0" # GF(2^8) classic RS — GameStream/Moonlight compat (P1)
aes-gcm = "0.10" # AES-128-GCM session crypto, matches GameStream
zerocopy = { version = "0.8", features = ["derive"] }
bytes = "1"
thiserror = "2"
tracing = { version = "0.1", default-features = false, features = ["std"] }
rand = "0.9"
zeroize = "1"
quinn = { version = "0.11", optional = true }
tokio = { version = "1", optional = true, features = ["rt-multi-thread", "net", "sync", "macros"] }
[dev-dependencies]
proptest = "1"
[build-dependencies]
cbindgen = "0.29"
+32
View File
@@ -0,0 +1,32 @@
//! Generate the C header (`include/lumen_core.h`) from the `extern "C"` surface.
//!
//! cbindgen failure is a warning, not a hard error, so the crate still builds in minimal
//! environments (e.g. a CI image without the full toolchain); the header is checked in.
use std::env;
use std::path::PathBuf;
fn main() {
println!("cargo:rerun-if-changed=src/abi.rs");
println!("cargo:rerun-if-changed=src/config.rs");
println!("cargo:rerun-if-changed=src/input.rs");
println!("cargo:rerun-if-changed=cbindgen.toml");
let crate_dir = env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR");
// Workspace-level include/ dir: crates/lumen-core/ -> ../../include/
let out = PathBuf::from(&crate_dir)
.join("..")
.join("..")
.join("include")
.join("lumen_core.h");
match cbindgen::generate(&crate_dir) {
Ok(bindings) => {
bindings.write_to_file(&out);
println!("cargo:warning=lumen-core: wrote {}", out.display());
}
Err(e) => {
println!("cargo:warning=lumen-core: cbindgen failed ({e}); header not regenerated");
}
}
}
+28
View File
@@ -0,0 +1,28 @@
language = "C"
pragma_once = true
include_guard = "LUMEN_CORE_H"
autogen_warning = "/* Generated by cbindgen from lumen-core. Do not edit by hand. */"
header = "/* lumen-core C ABI — see crates/lumen-core/src/abi.rs */"
style = "type"
cpp_compat = true
tab_width = 4
documentation = true
documentation_style = "c99"
[parse]
parse_deps = false
[export.rename]
"InputEvent" = "LumenInputEvent"
"InputKind" = "LumenInputKind"
# QualifiedScreamingSnakeCase already qualifies each variant with the enum name
# (LumenStatus::Ok -> LUMEN_STATUS_OK); do NOT also set prefix_with_name or it doubles.
[enum]
rename_variants = "QualifiedScreamingSnakeCase"
[fn]
sort_by = "None"
[struct]
derive_eq = false
+443
View File
@@ -0,0 +1,443 @@
//! The stable `extern "C"` surface. `cbindgen` turns this module into
//! `include/lumen_core.h` (see `build.rs`).
//!
//! ## Principles (plan §5)
//! - Opaque handles only: C sees `LumenSession*`, never a Rust type's fields.
//! - All cross-boundary structs are `#[repr(C)]`; buffers are pointer + length.
//! - Explicit ownership: every handle from `*_new` / `*_pair` must be passed to
//! [`lumen_session_free`]. A [`LumenFrame`]'s `data` is borrowed until the next
//! `poll`/`free` on that session — copy it out before then.
//! - Versioned: [`lumen_abi_version`] + `LumenConfig::struct_size` for forward-compat.
//! - Panics never cross the boundary: every entry point is wrapped in `catch_unwind`.
use crate::config::{Config, FecConfig, FecScheme, ProtocolPhase, Role};
use crate::error::LumenStatus;
use crate::input::InputEvent;
use crate::session::Session;
use crate::stats::Stats;
use crate::transport::{loopback_pair, Transport, UdpTransport};
use std::ffi::{c_void, CStr};
use std::os::raw::c_char;
use std::panic::AssertUnwindSafe;
use std::ptr;
/// Opaque session handle. Pointer-only from C.
pub struct LumenSession {
inner: Session,
/// Keeps the most recently polled frame alive so [`LumenFrame::data`] stays valid
/// until the next poll or free.
last_frame: Option<crate::session::Frame>,
input_cb: Option<(LumenInputCb, *mut c_void)>,
}
/// Forward-compatible session configuration. The caller MUST set `struct_size` to
/// `sizeof(LumenConfig)`; the core uses it to detect ABI skew.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct LumenConfig {
pub struct_size: u32,
/// 0 = host, 1 = client.
pub role: u32,
/// 1 = P1 (GameStream-compatible), 2 = P2 (`lumen/1`).
pub phase: u32,
/// 0 = GF(2⁸), 1 = GF(2¹⁶).
pub fec_scheme: u32,
pub fec_percent: u32,
pub max_data_per_block: u32,
pub shard_payload: u32,
/// Non-zero enables AES-128-GCM.
pub encrypt: u32,
pub key: [u8; 16],
pub salt: [u8; 4],
/// Test hook for the loopback transport; 0 in production.
pub loopback_drop_period: u32,
/// Largest encoded access unit the receiver will accept (bounds reassembler memory).
pub max_frame_bytes: u64,
}
impl LumenConfig {
fn to_config(self) -> Result<Config, LumenStatus> {
let role = match self.role {
0 => Role::Host,
1 => Role::Client,
_ => return Err(LumenStatus::InvalidArg),
};
let phase = match self.phase {
1 => ProtocolPhase::P1GameStream,
2 => ProtocolPhase::P2Lumen,
_ => return Err(LumenStatus::InvalidArg),
};
// Range-check before narrowing: a `300` fec_percent or `65600` block size must be
// rejected, not silently truncated to a valid-looking value.
let scheme = u8::try_from(self.fec_scheme)
.ok()
.and_then(FecScheme::from_u8)
.ok_or(LumenStatus::InvalidArg)?;
let fec_percent = u8::try_from(self.fec_percent).map_err(|_| LumenStatus::InvalidArg)?;
let max_data_per_block =
u16::try_from(self.max_data_per_block).map_err(|_| LumenStatus::InvalidArg)?;
let cfg = Config {
role,
phase,
fec: FecConfig {
scheme,
fec_percent,
max_data_per_block,
},
shard_payload: self.shard_payload as usize,
max_frame_bytes: self.max_frame_bytes as usize,
encrypt: self.encrypt != 0,
key: self.key,
salt: self.salt,
loopback_drop_period: self.loopback_drop_period,
};
cfg.validate().map_err(|e| e.status())?;
Ok(cfg)
}
}
/// Read a `LumenConfig` from a caller pointer, enforcing the `struct_size` ABI-skew
/// guard *before* reading the whole struct: a caller compiled against a smaller (older)
/// layout is rejected rather than causing an out-of-bounds read.
///
/// # Safety
/// `cfg` must either be null or point to at least its own declared `struct_size` bytes.
unsafe fn config_from_ptr(cfg: *const LumenConfig) -> Result<Config, LumenStatus> {
if cfg.is_null() {
return Err(LumenStatus::NullPointer);
}
// Read only the 4-byte size prefix first to bound the subsequent full read.
let declared = unsafe { std::ptr::addr_of!((*cfg).struct_size).read_unaligned() } as usize;
if declared < std::mem::size_of::<LumenConfig>() {
return Err(LumenStatus::InvalidArg);
}
unsafe { *cfg }.to_config()
}
/// A reassembled access unit. `data`/`len` borrow session-owned memory valid until the
/// next `lumen_client_poll_frame`/`lumen_session_free` on the same session.
#[repr(C)]
pub struct LumenFrame {
pub data: *const u8,
pub len: usize,
pub frame_index: u32,
pub pts_ns: u64,
pub flags: u32,
}
/// Snapshot of session counters.
#[repr(C)]
#[derive(Clone, Copy, Default)]
pub struct LumenStats {
pub frames_submitted: u64,
pub frames_completed: u64,
pub frames_dropped: u64,
pub packets_sent: u64,
pub packets_received: u64,
pub packets_dropped: u64,
pub fec_recovered_shards: u64,
pub bytes_sent: u64,
pub bytes_received: u64,
}
impl From<Stats> for LumenStats {
fn from(s: Stats) -> Self {
LumenStats {
frames_submitted: s.frames_submitted,
frames_completed: s.frames_completed,
frames_dropped: s.frames_dropped,
packets_sent: s.packets_sent,
packets_received: s.packets_received,
packets_dropped: s.packets_dropped,
fec_recovered_shards: s.fec_recovered_shards,
bytes_sent: s.bytes_sent,
bytes_received: s.bytes_received,
}
}
}
/// Host-side callback invoked for each input event drained by `lumen_host_poll_input`.
pub type LumenInputCb = extern "C" fn(event: *const InputEvent, user: *mut c_void);
#[inline]
fn guard<F: FnOnce() -> LumenStatus>(f: F) -> LumenStatus {
std::panic::catch_unwind(AssertUnwindSafe(f)).unwrap_or(LumenStatus::Panic)
}
fn new_handle(session: Session) -> *mut LumenSession {
Box::into_raw(Box::new(LumenSession {
inner: session,
last_frame: None,
input_cb: None,
}))
}
/// Current ABI version. Mismatch with [`crate::ABI_VERSION`] means incompatible core.
#[no_mangle]
pub extern "C" fn lumen_abi_version() -> u32 {
crate::ABI_VERSION
}
/// Create a session over a real UDP transport (`local`/`peer` are `host:port` strings).
/// Returns NULL on error.
///
/// # Safety
/// `cfg`, `local`, `peer` must be valid pointers; the strings must be NUL-terminated.
#[no_mangle]
pub unsafe extern "C" fn lumen_session_new(
cfg: *const LumenConfig,
local: *const c_char,
peer: *const c_char,
) -> *mut LumenSession {
let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
if cfg.is_null() || local.is_null() || peer.is_null() {
return ptr::null_mut();
}
let config = match unsafe { config_from_ptr(cfg) } {
Ok(c) => c,
Err(_) => return ptr::null_mut(),
};
let local = match unsafe { CStr::from_ptr(local) }.to_str() {
Ok(s) => s,
Err(_) => return ptr::null_mut(),
};
let peer = match unsafe { CStr::from_ptr(peer) }.to_str() {
Ok(s) => s,
Err(_) => return ptr::null_mut(),
};
let transport: Box<dyn Transport> = match UdpTransport::connect(local, peer) {
Ok(t) => Box::new(t),
Err(_) => return ptr::null_mut(),
};
match Session::new(config, transport) {
Ok(s) => new_handle(s),
Err(_) => ptr::null_mut(),
}
}));
result.unwrap_or(ptr::null_mut())
}
/// Create a connected host+client session pair sharing an in-process loopback
/// transport. Test/dev only — exercises the full FEC + framing path without a network.
///
/// # Safety
/// All four pointers must be valid; the two out-params receive owned handles.
#[no_mangle]
pub unsafe extern "C" fn lumen_test_loopback_pair(
host_cfg: *const LumenConfig,
client_cfg: *const LumenConfig,
out_host: *mut *mut LumenSession,
out_client: *mut *mut LumenSession,
) -> LumenStatus {
guard(|| {
if host_cfg.is_null() || client_cfg.is_null() || out_host.is_null() || out_client.is_null()
{
return LumenStatus::NullPointer;
}
let hconf = match unsafe { config_from_ptr(host_cfg) } {
Ok(c) => c,
Err(s) => return s,
};
let cconf = match unsafe { config_from_ptr(client_cfg) } {
Ok(c) => c,
Err(s) => return s,
};
let (ht, ct) = loopback_pair(hconf.loopback_drop_period, cconf.loopback_drop_period);
let hs = match Session::new(hconf, Box::new(ht)) {
Ok(s) => s,
Err(e) => return e.status(),
};
let cs = match Session::new(cconf, Box::new(ct)) {
Ok(s) => s,
Err(e) => return e.status(),
};
unsafe {
*out_host = new_handle(hs);
*out_client = new_handle(cs);
}
LumenStatus::Ok
})
}
/// Free a session handle. Safe to call with NULL.
///
/// # Safety
/// `s` must be a handle from `lumen_session_new`/`lumen_test_loopback_pair`, freed once.
#[no_mangle]
pub unsafe extern "C" fn lumen_session_free(s: *mut LumenSession) {
if !s.is_null() {
drop(unsafe { Box::from_raw(s) });
}
}
/// Host: FEC-protect, packetize, seal and send one encoded access unit.
///
/// # Safety
/// `s` is a valid host handle; `data` points to `len` readable bytes (or `len == 0`).
#[no_mangle]
pub unsafe extern "C" fn lumen_host_submit_frame(
s: *mut LumenSession,
data: *const u8,
len: usize,
pts_ns: u64,
flags: u32,
) -> LumenStatus {
guard(|| {
let s = match unsafe { s.as_mut() } {
Some(s) => s,
None => return LumenStatus::NullPointer,
};
if data.is_null() && len != 0 {
return LumenStatus::NullPointer;
}
let slice = if len == 0 {
&[][..]
} else {
unsafe { std::slice::from_raw_parts(data, len) }
};
match s.inner.submit_frame(slice, pts_ns, flags) {
Ok(()) => LumenStatus::Ok,
Err(e) => e.status(),
}
})
}
/// Client: poll for the next reassembled access unit. Returns [`LumenStatus::NoFrame`]
/// when nothing is ready yet. On `Ok`, `*out` borrows session memory until the next poll.
///
/// # Safety
/// `s` is a valid client handle; `out` points to a writable `LumenFrame`.
#[no_mangle]
pub unsafe extern "C" fn lumen_client_poll_frame(
s: *mut LumenSession,
out: *mut LumenFrame,
) -> LumenStatus {
guard(|| {
let s = match unsafe { s.as_mut() } {
Some(s) => s,
None => return LumenStatus::NullPointer,
};
if out.is_null() {
return LumenStatus::NullPointer;
}
match s.inner.poll_frame() {
Ok(frame) => {
s.last_frame = Some(frame);
let f = s.last_frame.as_ref().unwrap();
unsafe {
*out = LumenFrame {
data: f.data.as_ptr(),
len: f.data.len(),
frame_index: f.frame_index,
pts_ns: f.pts_ns,
flags: f.flags,
};
}
LumenStatus::Ok
}
Err(e) => e.status(),
}
})
}
/// Client: serialize and send one input event to the host.
///
/// # Safety
/// `s` is a valid client handle; `ev` points to a valid [`InputEvent`].
#[no_mangle]
pub unsafe extern "C" fn lumen_send_input(
s: *mut LumenSession,
ev: *const InputEvent,
) -> LumenStatus {
guard(|| {
let s = match unsafe { s.as_mut() } {
Some(s) => s,
None => return LumenStatus::NullPointer,
};
let ev = match unsafe { ev.as_ref() } {
Some(e) => e,
None => return LumenStatus::NullPointer,
};
match s.inner.send_input(ev) {
Ok(()) => LumenStatus::Ok,
Err(e) => e.status(),
}
})
}
/// Register the host-side input callback (pass a NULL fn pointer to clear). The callback
/// fires from within [`lumen_host_poll_input`], on the calling thread.
///
/// # Safety
/// `s` is a valid host handle; `user` is passed back verbatim to `cb`.
#[no_mangle]
pub unsafe extern "C" fn lumen_set_input_callback(
s: *mut LumenSession,
// Written as an explicit `Option<fn>` (not the `LumenInputCb` alias) so cbindgen
// emits a nullable C function pointer rather than an opaque wrapper struct.
cb: Option<extern "C" fn(event: *const InputEvent, user: *mut c_void)>,
user: *mut c_void,
) -> LumenStatus {
guard(|| {
let s = match unsafe { s.as_mut() } {
Some(s) => s,
None => return LumenStatus::NullPointer,
};
s.input_cb = cb.map(|c| (c, user));
LumenStatus::Ok
})
}
/// Host: drain all pending input events, invoking the registered callback for each.
/// Returns the count dispatched (≥ 0), or a negative [`LumenStatus`] on error.
///
/// # Safety
/// `s` is a valid host handle.
#[no_mangle]
pub unsafe extern "C" fn lumen_host_poll_input(s: *mut LumenSession) -> i32 {
let r = std::panic::catch_unwind(AssertUnwindSafe(|| {
let s = match unsafe { s.as_mut() } {
Some(s) => s,
None => return LumenStatus::NullPointer as i32,
};
let cb = s.input_cb;
let mut count = 0i32;
loop {
match s.inner.poll_input() {
Ok(Some(ev)) => {
if let Some((cb, user)) = cb {
cb(&ev as *const InputEvent, user);
}
count += 1;
}
Ok(None) => break,
Err(e) => return e.status() as i32,
}
}
count
}));
r.unwrap_or(LumenStatus::Panic as i32)
}
/// Copy session counters into `*out`.
///
/// # Safety
/// `s` is a valid handle; `out` points to a writable `LumenStats`.
#[no_mangle]
pub unsafe extern "C" fn lumen_get_stats(
s: *mut LumenSession,
out: *mut LumenStats,
) -> LumenStatus {
guard(|| {
let s = match unsafe { s.as_ref() } {
Some(s) => s,
None => return LumenStatus::NullPointer,
};
if out.is_null() {
return LumenStatus::NullPointer;
}
let stats = s.inner.stats();
unsafe { *out = LumenStats::from(stats) };
LumenStatus::Ok
})
}
+231
View File
@@ -0,0 +1,231 @@
//! Session configuration and protocol/FEC parameters.
use crate::error::{LumenError, Result};
use crate::packet::{CRYPTO_OVERHEAD, HEADER_LEN, MAX_DATAGRAM_BYTES};
use zeroize::Zeroize;
/// Which side of the stream this session drives.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Role {
Host = 0,
Client = 1,
}
/// Negotiated protocol generation. P1 is GameStream-compatible (GF(2⁸)); P2 is the
/// `lumen/1` extension (GF(2¹⁶), multi-block framing, optional QUIC control).
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ProtocolPhase {
P1GameStream = 1,
P2Lumen = 2,
}
/// Erasure-coding field. Mirrors the on-wire `fec_scheme` tag.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FecScheme {
/// GF(2⁸) classic RS — Moonlight/GameStream compatible, ≤ 255 shards/block.
Gf8 = 0,
/// GF(2¹⁶) Leopard-RS — SIMD, O(n log n), up to 65535 shards/block.
Gf16 = 1,
}
impl FecScheme {
pub fn from_u8(v: u8) -> Option<FecScheme> {
match v {
0 => Some(FecScheme::Gf8),
1 => Some(FecScheme::Gf16),
_ => None,
}
}
/// Hard per-block total-shard ceiling for the field (data + recovery).
pub fn max_total_shards(self) -> usize {
match self {
FecScheme::Gf8 => 255,
FecScheme::Gf16 => u16::MAX as usize, // wire fields are u16
}
}
}
/// A client-sized display mode the host should produce on the virtual output.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Mode {
pub width: u32,
pub height: u32,
pub refresh_hz: u32,
}
/// Per-block FEC parameters. Recovery count is derived from `fec_percent` exactly as
/// GameStream does: `m = ceil(k * fec_percent / 100)`.
#[derive(Clone, Copy, Debug)]
pub struct FecConfig {
pub scheme: FecScheme,
/// Recovery overhead as a percentage of data shards (0 disables FEC).
pub fec_percent: u8,
/// Maximum data shards per FEC block; larger frames split into multiple blocks.
/// GF(2⁸) is bounded at 255 total shards, so keep this ≤ ~200 for `Gf8`.
pub max_data_per_block: u16,
}
impl FecConfig {
/// Recovery (parity) shard count for a block of `data_shards` shards.
pub fn recovery_for(&self, data_shards: usize) -> usize {
if self.fec_percent == 0 || data_shards == 0 {
return 0;
}
// ceil(k * pct / 100)
(data_shards * self.fec_percent as usize).div_ceil(100)
}
}
/// Largest shard payload that still fits a datagram once header + crypto overhead are
/// added. Bounds `shard_payload` so packets never exceed [`MAX_DATAGRAM_BYTES`].
pub const fn max_shard_payload() -> usize {
MAX_DATAGRAM_BYTES - HEADER_LEN - CRYPTO_OVERHEAD
}
/// Everything needed to construct a [`Session`](crate::session::Session).
///
/// `Debug` is implemented by hand to redact `key`/`salt`, and `key`/`salt` are zeroized
/// on drop, so secrets neither leak into logs nor linger in freed memory.
#[derive(Clone)]
pub struct Config {
pub role: Role,
pub phase: ProtocolPhase,
pub fec: FecConfig,
/// Shard payload bytes per packet. Must be even and ≤ [`max_shard_payload`].
pub shard_payload: usize,
/// Largest encoded access unit the reassembler will accept (bounds memory against
/// hostile/corrupt headers; see [`Session`](crate::session::Session)).
pub max_frame_bytes: usize,
pub encrypt: bool,
/// AES-128 session key established during pairing. MUST be unique per session when
/// `encrypt` is set (see the nonce-uniqueness contract in [`crate::crypto`]).
pub key: [u8; 16],
/// Per-session nonce salt, established alongside `key` during pairing. MUST be
/// unique per (key, session).
pub salt: [u8; 4],
/// Test hook: when non-zero, the loopback transport deterministically drops one of
/// every `loopback_drop_period` packets it sends. 0 = lossless.
pub loopback_drop_period: u32,
}
impl Drop for Config {
fn drop(&mut self) {
self.key.zeroize();
self.salt.zeroize();
}
}
impl std::fmt::Debug for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Config")
.field("role", &self.role)
.field("phase", &self.phase)
.field("fec", &self.fec)
.field("shard_payload", &self.shard_payload)
.field("max_frame_bytes", &self.max_frame_bytes)
.field("encrypt", &self.encrypt)
.field("key", &"<redacted>")
.field("salt", &"<redacted>")
.field("loopback_drop_period", &self.loopback_drop_period)
.finish()
}
}
impl Config {
/// Validate every invariant the hot path and the reassembler rely on. Rejecting here
/// is what keeps the receive-side parser's allocations bounded.
pub fn validate(&self) -> Result<()> {
if self.shard_payload == 0 || self.shard_payload % 2 != 0 {
return Err(LumenError::InvalidArg("shard_payload must be even and > 0"));
}
if self.shard_payload > max_shard_payload() {
return Err(LumenError::InvalidArg(
"shard_payload too large to fit a datagram (header + crypto overhead)",
));
}
if self.fec.max_data_per_block == 0 {
return Err(LumenError::InvalidArg("max_data_per_block must be > 0"));
}
// The per-block total (data + recovery) must fit both the field ceiling and the
// u16 wire fields.
let k = self.fec.max_data_per_block as usize;
let total = k + self.fec.recovery_for(k);
if total > self.fec.scheme.max_total_shards() {
return Err(LumenError::InvalidArg(
"max_data_per_block + recovery exceeds the FEC scheme's shard ceiling",
));
}
if self.max_frame_bytes == 0 {
return Err(LumenError::InvalidArg("max_frame_bytes must be > 0"));
}
// The frame must not need more FEC blocks than the u16 block-count field allows.
let total_data = self.max_frame_bytes.div_ceil(self.shard_payload).max(1);
let max_blocks = total_data.div_ceil(k).max(1);
if max_blocks > u16::MAX as usize {
return Err(LumenError::InvalidArg(
"max_frame_bytes too large for this shard/block configuration (block count overflows u16)",
));
}
if self.encrypt && self.key == [0u8; 16] {
return Err(LumenError::InvalidArg(
"encrypt requires a non-zero session key (see crypto nonce-uniqueness contract)",
));
}
Ok(())
}
/// Sensible P1 defaults: GF(2⁸), 15% FEC, ~1 KiB shards, no encryption, 64 MiB frame
/// cap. When enabling encryption, replace `key`/`salt` with per-session values from
/// pairing — the all-zero defaults are rejected by [`validate`](Self::validate).
pub fn p1_defaults(role: Role) -> Self {
Config {
role,
phase: ProtocolPhase::P1GameStream,
fec: FecConfig {
scheme: FecScheme::Gf8,
fec_percent: 15,
max_data_per_block: 200,
},
shard_payload: 1024,
max_frame_bytes: 64 * 1024 * 1024,
encrypt: false,
key: [0u8; 16],
salt: [0u8; 4],
loopback_drop_period: 0,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn rejects_encrypt_with_zero_key() {
let mut c = Config::p1_defaults(Role::Host);
c.encrypt = true; // key is still all-zero
assert!(c.validate().is_err());
c.key = [1u8; 16];
assert!(c.validate().is_ok());
}
#[test]
fn rejects_oversized_shard_payload() {
let mut c = Config::p1_defaults(Role::Host);
c.shard_payload = max_shard_payload() + 2; // still even, but won't fit a datagram
assert!(c.validate().is_err());
}
#[test]
fn rejects_block_exceeding_scheme_ceiling() {
let mut c = Config::p1_defaults(Role::Host); // Gf8, ceiling 255
c.fec.max_data_per_block = 250;
c.fec.fec_percent = 15; // 250 + ceil(250*15/100)=288 > 255
assert!(c.validate().is_err());
}
}
+149
View File
@@ -0,0 +1,149 @@
//! AES-128-GCM session sealing, matching GameStream's video crypto in P1.
//!
//! ## Nonce uniqueness (the GCM safety requirement)
//!
//! The 96-bit nonce is `salt (4 bytes) || sequence (8 bytes, big-endian)`. Reusing a
//! `(key, nonce)` pair under AES-GCM is catastrophic, so two precautions apply:
//!
//! 1. **Per-direction salts.** Host and client share one `key` and `salt`, and each
//! counts its sequence from 0. To stop the host's video stream and the client's input
//! stream from colliding on `(key, nonce)`, the top bit of `salt[0]` is set to the
//! sender's direction — so the two directions occupy disjoint nonce spaces.
//! 2. **Per-session key+salt.** The pairing layer MUST hand each session a fresh
//! `(key, salt)`; reusing them across sessions reintroduces nonce reuse. `Config`'s
//! all-zero key with `encrypt = true` is rejected by `Config::validate` to catch the
//! obvious footgun.
//!
//! The sequence number is also passed as AEAD associated data, so tampering with the
//! on-wire sequence is detected (the tag check fails) rather than silently shifting the
//! nonce. Note: this layer does not provide anti-replay — see `Session`.
use crate::config::Role;
use crate::error::{LumenError, Result};
use aes_gcm::aead::{Aead, KeyInit, Payload};
use aes_gcm::{Aes128Gcm, Key, Nonce};
/// 16-byte AEAD authentication tag appended by GCM.
pub const TAG_LEN: usize = 16;
pub struct SessionCrypto {
cipher: Aes128Gcm,
/// Salt for nonces we seal with (our direction).
send_salt: [u8; 4],
/// Salt for nonces we open with (the peer's direction).
recv_salt: [u8; 4],
}
impl SessionCrypto {
pub fn new(key: &[u8; 16], salt: [u8; 4], role: Role) -> Self {
let key = Key::<Aes128Gcm>::from_slice(key);
let own = direction(role);
SessionCrypto {
cipher: Aes128Gcm::new(key),
send_salt: dir_salt(salt, own),
recv_salt: dir_salt(salt, own ^ 1),
}
}
/// Seal `plaintext` for sequence `seq`, returning `ciphertext || tag`. `seq` is
/// authenticated as associated data.
pub fn seal(&self, seq: u64, plaintext: &[u8]) -> Result<Vec<u8>> {
let nonce = nonce(self.send_salt, seq);
self.cipher
.encrypt(
Nonce::from_slice(&nonce),
Payload {
msg: plaintext,
aad: &seq.to_be_bytes(),
},
)
.map_err(|_| LumenError::Crypto)
}
/// Open `ciphertext || tag` for sequence `seq` (also bound as associated data).
pub fn open(&self, seq: u64, ciphertext: &[u8]) -> Result<Vec<u8>> {
let nonce = nonce(self.recv_salt, seq);
self.cipher
.decrypt(
Nonce::from_slice(&nonce),
Payload {
msg: ciphertext,
aad: &seq.to_be_bytes(),
},
)
.map_err(|_| LumenError::Crypto)
}
}
fn direction(role: Role) -> u8 {
match role {
Role::Host => 0,
Role::Client => 1,
}
}
/// Fold a 1-bit direction into the salt (top bit of `salt[0]`) so the two directions of
/// a session never share a nonce under the same key.
fn dir_salt(mut salt: [u8; 4], dir: u8) -> [u8; 4] {
salt[0] = (salt[0] & 0x7f) | (dir << 7);
salt
}
fn nonce(salt: [u8; 4], seq: u64) -> [u8; 12] {
let mut n = [0u8; 12];
n[..4].copy_from_slice(&salt);
n[4..].copy_from_slice(&seq.to_be_bytes());
n
}
/// Generate a fresh random AES-128 session key (control-plane / pairing use).
pub fn random_key() -> [u8; 16] {
let mut k = [0u8; 16];
rand::RngCore::fill_bytes(&mut rand::rng(), &mut k);
k
}
/// Generate a fresh random per-session nonce salt.
pub fn random_salt() -> [u8; 4] {
let mut s = [0u8; 4];
rand::RngCore::fill_bytes(&mut rand::rng(), &mut s);
s
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn seal_open_roundtrip_cross_direction() {
let key = random_key();
let salt = random_salt();
let host = SessionCrypto::new(&key, salt, Role::Host);
let client = SessionCrypto::new(&key, salt, Role::Client);
let msg = b"the quick brown fox";
let sealed = host.seal(42, msg).unwrap(); // host -> client (video direction)
assert_ne!(&sealed[..msg.len()], &msg[..]); // actually encrypted
assert_eq!(sealed.len(), msg.len() + TAG_LEN);
assert_eq!(client.open(42, &sealed).unwrap(), msg);
// Wrong sequence (nonce + AAD) → authentication failure.
assert!(client.open(43, &sealed).is_err());
// Direction separation: the host opens with the peer (client) salt, so it cannot
// open its own outbound packet → distinct nonce spaces per direction.
assert!(host.open(42, &sealed).is_err());
}
#[test]
fn directions_use_distinct_nonce_spaces() {
let key = random_key();
let salt = [0u8; 4]; // even an all-zero base salt must separate the directions
let host = SessionCrypto::new(&key, salt, Role::Host);
let client = SessionCrypto::new(&key, salt, Role::Client);
// Same seq, same key, opposite directions → different ciphertext (no reuse).
assert_ne!(
host.seal(0, b"abc").unwrap(),
client.seal(0, b"abc").unwrap()
);
}
}
+56
View File
@@ -0,0 +1,56 @@
//! Error type and the stable C ABI status codes it maps to.
use thiserror::Error;
/// The core's internal error type. Crosses the C ABI as a [`LumenStatus`] code.
#[derive(Debug, Error)]
pub enum LumenError {
#[error("invalid argument: {0}")]
InvalidArg(&'static str),
#[error("fec error: {0}")]
Fec(#[from] crate::fec::FecError),
#[error("crypto seal/open failed")]
Crypto,
#[error("malformed packet")]
BadPacket,
#[error("no complete frame available yet")]
NoFrame,
#[error("unsupported: {0}")]
Unsupported(&'static str),
#[error("io error: {0}")]
Io(#[from] std::io::Error),
}
pub type Result<T> = core::result::Result<T, LumenError>;
/// Stable C ABI status codes. `Ok` is 0; all errors are negative so callers can
/// test `rc < 0`. Do not renumber existing variants — only append.
#[repr(i32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LumenStatus {
Ok = 0,
InvalidArg = -1,
Fec = -2,
Crypto = -3,
BadPacket = -4,
NoFrame = -5,
Unsupported = -6,
Io = -7,
NullPointer = -8,
Panic = -99,
}
impl LumenError {
/// Map to the C ABI status code.
pub fn status(&self) -> LumenStatus {
match self {
LumenError::InvalidArg(_) => LumenStatus::InvalidArg,
LumenError::Fec(_) => LumenStatus::Fec,
LumenError::Crypto => LumenStatus::Crypto,
LumenError::BadPacket => LumenStatus::BadPacket,
LumenError::NoFrame => LumenStatus::NoFrame,
LumenError::Unsupported(_) => LumenStatus::Unsupported,
LumenError::Io(_) => LumenStatus::Io,
}
}
}
+84
View File
@@ -0,0 +1,84 @@
//! GF(2¹⁶) Leopard-RS backend (`reed-solomon-simd`). SIMD, O(n log n), up to 65535
//! shards/block — this is what removes the GameStream 255-shard / ~1 Gbps wall.
//! Shard length must be even.
use super::{validate_block_shape, validate_encode_shape, ErasureCoder, FecError};
use crate::config::FecScheme;
pub struct Gf16Coder;
impl ErasureCoder for Gf16Coder {
fn scheme(&self) -> FecScheme {
FecScheme::Gf16
}
fn encode(&self, data: &[Vec<u8>], recovery_count: usize) -> Result<Vec<Vec<u8>>, FecError> {
if recovery_count == 0 {
return Ok(Vec::new());
}
validate_encode_shape(data)?;
let k = data.len();
if data[0].len() % 2 != 0 {
return Err(FecError::Config("GF(2^16) shard length must be even"));
}
reed_solomon_simd::encode(k, recovery_count, data)
.map_err(|_| FecError::Backend("gf16 encode"))
}
fn reconstruct(
&self,
data_count: usize,
recovery_count: usize,
received: &mut [Option<Vec<u8>>],
) -> Result<Vec<Vec<u8>>, FecError> {
validate_block_shape(received, data_count, recovery_count)?;
let present = received.iter().filter(|s| s.is_some()).count();
if present < data_count {
return Err(FecError::TooFewShards {
have: present,
need: data_count,
});
}
// Fast path: all originals already present, or FEC disabled.
let originals_complete = received[..data_count].iter().all(|s| s.is_some());
if recovery_count == 0 || originals_complete {
let mut out = Vec::with_capacity(data_count);
for slot in received.iter().take(data_count) {
out.push(slot.clone().ok_or(FecError::TooFewShards {
have: present,
need: data_count,
})?);
}
return Ok(out);
}
// Hand the decoder the surviving originals and recovery shards, indexed.
let original_in: Vec<(usize, &[u8])> = received[..data_count]
.iter()
.enumerate()
.filter_map(|(i, s)| s.as_deref().map(|b| (i, b)))
.collect();
let recovery_in: Vec<(usize, &[u8])> = received[data_count..data_count + recovery_count]
.iter()
.enumerate()
.filter_map(|(j, s)| s.as_deref().map(|b| (j, b)))
.collect();
let restored =
reed_solomon_simd::decode(data_count, recovery_count, original_in, recovery_in)
.map_err(|_| FecError::Backend("gf16 decode"))?;
// Merge surviving originals with the recovered ones.
let mut out: Vec<Vec<u8>> = Vec::with_capacity(data_count);
for (i, slot) in received[..data_count].iter().enumerate() {
if let Some(s) = slot {
out.push(s.clone());
} else if let Some(s) = restored.get(&i) {
out.push(s.clone());
} else {
return Err(FecError::Backend("gf16 decode left an original missing"));
}
}
Ok(out)
}
}
+71
View File
@@ -0,0 +1,71 @@
//! GF(2⁸) classic ReedSolomon backend (`reed-solomon-erasure`), equivalent to the
//! `nanors` library Moonlight uses. Hard ceiling: data + recovery ≤ 255 shards/block.
use super::{validate_block_shape, validate_encode_shape, ErasureCoder, FecError};
use crate::config::FecScheme;
use reed_solomon_erasure::galois_8::ReedSolomon;
pub struct Gf8Coder;
impl ErasureCoder for Gf8Coder {
fn scheme(&self) -> FecScheme {
FecScheme::Gf8
}
fn encode(&self, data: &[Vec<u8>], recovery_count: usize) -> Result<Vec<Vec<u8>>, FecError> {
if recovery_count == 0 {
return Ok(Vec::new());
}
validate_encode_shape(data)?;
let k = data.len();
let shard_len = data[0].len();
let rs = ReedSolomon::new(k, recovery_count)
.map_err(|_| FecError::Config("invalid GF(2^8) shard counts"))?;
// reed-solomon-erasure fills parity in place: shards = data || zeroed parity.
let mut shards: Vec<Vec<u8>> = Vec::with_capacity(k + recovery_count);
shards.extend_from_slice(data);
shards.resize_with(k + recovery_count, || vec![0u8; shard_len]);
rs.encode(&mut shards)
.map_err(|_| FecError::Backend("gf8 encode"))?;
Ok(shards.split_off(k))
}
fn reconstruct(
&self,
data_count: usize,
recovery_count: usize,
received: &mut [Option<Vec<u8>>],
) -> Result<Vec<Vec<u8>>, FecError> {
validate_block_shape(received, data_count, recovery_count)?;
let present = received.iter().filter(|s| s.is_some()).count();
if present < data_count {
return Err(FecError::TooFewShards {
have: present,
need: data_count,
});
}
if recovery_count == 0 {
// No FEC: every original must already be present.
return collect_originals(received, data_count);
}
let rs = ReedSolomon::new(data_count, recovery_count)
.map_err(|_| FecError::Config("invalid GF(2^8) shard counts"))?;
rs.reconstruct_data(received)
.map_err(|_| FecError::Backend("gf8 reconstruct"))?;
collect_originals(received, data_count)
}
}
fn collect_originals(
received: &[Option<Vec<u8>>],
data_count: usize,
) -> Result<Vec<Vec<u8>>, FecError> {
let mut out = Vec::with_capacity(data_count);
for slot in received.iter().take(data_count) {
out.push(
slot.clone()
.ok_or(FecError::Backend("reconstruction left an original missing"))?,
);
}
Ok(out)
}
+167
View File
@@ -0,0 +1,167 @@
//! Erasure coding. Two backends behind one [`ErasureCoder`] trait: GF(2⁸) (classic
//! ReedSolomon, Moonlight-compatible, P1) and GF(2¹⁶) Leopard-RS (the wall-breaker, P2).
//!
//! The wall this breaks: GameStream's GF(2⁸) RS caps a block at 255 shards, which at
//! 5120×1440@240 is hit around 1 Gbps. GF(2¹⁶) raises that ceiling to 65535 shards and
//! runs in O(n log n) with SIMD, so the per-frame shard count stops being the limiter.
mod gf16;
mod gf8;
pub use gf16::Gf16Coder;
pub use gf8::Gf8Coder;
use crate::config::FecScheme;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum FecError {
#[error("invalid shard configuration: {0}")]
Config(&'static str),
#[error("too few shards to reconstruct (have {have}, need {need})")]
TooFewShards { have: usize, need: usize },
#[error("backend error: {0}")]
Backend(&'static str),
}
/// Backend-agnostic erasure coder. All shards in a block are equal length.
pub trait ErasureCoder: Send + Sync {
fn scheme(&self) -> FecScheme;
/// Encode `data` (K original shards) into `recovery_count` (M) parity shards.
/// Returns the M recovery shards. `recovery_count == 0` returns an empty `Vec`.
fn encode(&self, data: &[Vec<u8>], recovery_count: usize) -> Result<Vec<Vec<u8>>, FecError>;
/// Reconstruct the K original shards. `received` has length K+M: indices `0..K` are
/// originals, `K..K+M` are recovery shards; `Some` = present, `None` = lost.
/// Returns the K original shards in order.
fn reconstruct(
&self,
data_count: usize,
recovery_count: usize,
received: &mut [Option<Vec<u8>>],
) -> Result<Vec<Vec<u8>>, FecError>;
}
/// Construct the coder for a scheme.
pub fn coder_for(scheme: FecScheme) -> Box<dyn ErasureCoder> {
match scheme {
FecScheme::Gf8 => Box::new(Gf8Coder),
FecScheme::Gf16 => Box::new(Gf16Coder),
}
}
/// Validate the shape `reconstruct` promises: `received.len() == data + recovery`, and
/// every present shard shares one length. Both backends call this first so neither the
/// fast path nor a malformed caller can slip mismatched-length or wrong-count shards
/// through (the fast paths bypass the backend's own length checks otherwise).
pub(crate) fn validate_block_shape(
received: &[Option<Vec<u8>>],
data_count: usize,
recovery_count: usize,
) -> Result<(), FecError> {
if received.len() != data_count + recovery_count {
return Err(FecError::Config(
"received length must equal data + recovery",
));
}
let mut len = None;
for s in received.iter().flatten() {
match len {
None => len = Some(s.len()),
Some(l) if l != s.len() => {
return Err(FecError::Config("shards in a block must be equal length"));
}
_ => {}
}
}
Ok(())
}
/// Validate `encode` inputs: at least one data shard, all of equal length.
pub(crate) fn validate_encode_shape(data: &[Vec<u8>]) -> Result<(), FecError> {
let first = data
.first()
.ok_or(FecError::Config("no data shards"))?
.len();
if data.iter().any(|s| s.len() != first) {
return Err(FecError::Config("data shards must be equal length"));
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
/// Round-trip a block through a coder, losing exactly `lose` shards (some data,
/// some recovery), and assert the originals come back byte-identical.
fn roundtrip(coder: &dyn ErasureCoder, k: usize, m: usize, shard_len: usize, lose: &[usize]) {
let data: Vec<Vec<u8>> = (0..k)
.map(|i| (0..shard_len).map(|b| (i * 31 + b * 7) as u8).collect())
.collect();
let recovery = coder.encode(&data, m).unwrap();
assert_eq!(recovery.len(), m);
let mut received: Vec<Option<Vec<u8>>> = Vec::with_capacity(k + m);
received.extend(data.iter().cloned().map(Some));
received.extend(recovery.iter().cloned().map(Some));
for &idx in lose {
received[idx] = None;
}
let restored = coder.reconstruct(k, m, &mut received).unwrap();
assert_eq!(restored, data);
}
#[test]
fn gf8_recovers_within_budget() {
// 16 data + 4 recovery; lose 2 data + 2 recovery (== budget).
roundtrip(&Gf8Coder, 16, 4, 256, &[0, 7, 16, 19]);
}
#[test]
fn gf16_recovers_within_budget() {
roundtrip(&Gf16Coder, 16, 4, 256, &[1, 9, 17, 18]);
}
#[test]
fn gf8_too_much_loss_errors() {
let data: Vec<Vec<u8>> = (0..8).map(|_| vec![0u8; 64]).collect();
let recovery = Gf8Coder.encode(&data, 2).unwrap();
let mut received: Vec<Option<Vec<u8>>> = data
.iter()
.cloned()
.map(Some)
.chain(recovery.into_iter().map(Some))
.collect();
// Lose 3 with only 2 recovery shards → unrecoverable.
received[0] = None;
received[1] = None;
received[2] = None;
assert!(Gf16Coder.scheme() == FecScheme::Gf16);
let err = Gf8Coder.reconstruct(8, 2, &mut received);
assert!(err.is_err());
}
#[test]
fn reconstruct_rejects_wrong_received_length() {
// data=2, recovery=2 expects a 4-element slice; a 3-element one must error, not
// panic on the recovery-slice index (both backends).
let mut recv: Vec<Option<Vec<u8>>> = vec![Some(vec![0u8; 8]), None, Some(vec![0u8; 8])];
assert!(Gf16Coder.reconstruct(2, 2, &mut recv).is_err());
let mut recv: Vec<Option<Vec<u8>>> = vec![Some(vec![0u8; 8]), None, Some(vec![0u8; 8])];
assert!(Gf8Coder.reconstruct(2, 2, &mut recv).is_err());
}
#[test]
fn reconstruct_rejects_mismatched_shard_lengths() {
// The GF16 fast path used to clone shards verbatim without a length check.
let mut recv: Vec<Option<Vec<u8>>> =
vec![Some(vec![0u8; 8]), Some(vec![0u8; 6]), None, None];
assert!(Gf16Coder.reconstruct(2, 2, &mut recv).is_err());
let mut recv: Vec<Option<Vec<u8>>> =
vec![Some(vec![0u8; 8]), Some(vec![0u8; 6]), None, None];
assert!(Gf8Coder.reconstruct(2, 2, &mut recv).is_err());
}
}
+113
View File
@@ -0,0 +1,113 @@
//! Input events flowing client → host (and the host-side receive callback).
//!
//! Input rides the same transport as video but on its own wire tag
//! ([`INPUT_MAGIC`]), so a session can demultiplex video from input by the first byte.
/// Wire tag distinguishing an input datagram from a video packet.
pub const INPUT_MAGIC: u8 = 0xC8;
/// Fixed serialized size of an [`InputEvent`] on the wire (tag + fields).
pub const INPUT_WIRE_LEN: usize = 1 + 1 + 4 + 4 + 4 + 4; // = 18
/// Kinds of input event. `#[repr(u8)]` so it crosses the C ABI as a byte tag.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum InputKind {
KeyDown = 0,
KeyUp = 1,
/// Relative motion: `x`/`y` carry `dx`/`dy`.
MouseMove = 2,
/// Absolute motion: `x`/`y` carry pixel coordinates.
MouseMoveAbs = 3,
MouseButtonDown = 4,
MouseButtonUp = 5,
/// `x` carries the (signed) scroll delta.
MouseScroll = 6,
GamepadButton = 7,
/// `code` = axis id, `x` = axis value.
GamepadAxis = 8,
}
impl InputKind {
pub fn from_u8(v: u8) -> Option<InputKind> {
use InputKind::*;
Some(match v {
0 => KeyDown,
1 => KeyUp,
2 => MouseMove,
3 => MouseMoveAbs,
4 => MouseButtonDown,
5 => MouseButtonUp,
6 => MouseScroll,
7 => GamepadButton,
8 => GamepadAxis,
_ => return None,
})
}
}
/// A single input event. `#[repr(C)]` — shared verbatim with the C ABI as
/// `LumenInputEvent`.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct InputEvent {
pub kind: InputKind,
pub _pad: [u8; 3],
/// keycode / button id / axis id, depending on `kind`.
pub code: u32,
/// x / dx / abs-x / axis-value / scroll-delta, depending on `kind`.
pub x: i32,
/// y / dy / abs-y, depending on `kind`.
pub y: i32,
/// modifier bitmask or gamepad index.
pub flags: u32,
}
impl InputEvent {
/// Serialize to the fixed wire layout (`INPUT_MAGIC` + little-endian fields).
pub fn encode(&self) -> [u8; INPUT_WIRE_LEN] {
let mut b = [0u8; INPUT_WIRE_LEN];
b[0] = INPUT_MAGIC;
b[1] = self.kind as u8;
b[2..6].copy_from_slice(&self.code.to_le_bytes());
b[6..10].copy_from_slice(&self.x.to_le_bytes());
b[10..14].copy_from_slice(&self.y.to_le_bytes());
b[14..18].copy_from_slice(&self.flags.to_le_bytes());
b
}
/// Parse from the wire layout. Returns `None` on bad tag/length/kind.
pub fn decode(buf: &[u8]) -> Option<InputEvent> {
if buf.len() < INPUT_WIRE_LEN || buf[0] != INPUT_MAGIC {
return None;
}
let kind = InputKind::from_u8(buf[1])?;
Some(InputEvent {
kind,
_pad: [0; 3],
code: u32::from_le_bytes(buf[2..6].try_into().unwrap()),
x: i32::from_le_bytes(buf[6..10].try_into().unwrap()),
y: i32::from_le_bytes(buf[10..14].try_into().unwrap()),
flags: u32::from_le_bytes(buf[14..18].try_into().unwrap()),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn input_wire_roundtrip() {
let e = InputEvent {
kind: InputKind::MouseMove,
_pad: [0; 3],
code: 0,
x: -12,
y: 34,
flags: 0xABCD,
};
assert_eq!(InputEvent::decode(&e.encode()), Some(e));
assert!(InputEvent::decode(&[0u8; INPUT_WIRE_LEN]).is_none()); // bad magic
}
}
+45
View File
@@ -0,0 +1,45 @@
//! # lumen-core
//!
//! The shared protocol / transport / FEC core for the lumen low-latency streaming
//! stack. It is compiled exactly once and linked by every host and client — directly
//! as a Rust `lib`, or across the [C ABI](crate::abi) by Swift / Kotlin / C clients.
//!
//! Everything platform-specific (capture, encode, decode, present, input injection)
//! lives *outside* this crate. What lives *here*:
//!
//! - [`fec`] — erasure coding. GF(2⁸) for GameStream/Moonlight compatibility (P1) and
//! GF(2¹⁶) Leopard-RS (P2) which removes the ~1 Gbps per-frame shard-count ceiling.
//! - [`packet`] — `#[repr(C)]` zero-copy wire framing: splitting an access unit into
//! FEC blocks of MTU-sized shards and reassembling them on the far side.
//! - [`crypto`] — AES-128-GCM session sealing, matching GameStream in P1.
//! - [`session`] — the host (submit frame → FEC → packetize → seal → send) and client
//! (recv → open → reorder → FEC recover → reassemble) state machines.
//! - [`transport`] — pluggable packet I/O (in-process loopback for tests; UDP for real).
//! - [`abi`] — the `extern "C"` surface and `cbindgen`-generated `lumen_core.h`.
//!
//! ## Threading contract
//!
//! Nothing in the per-frame path touches an async runtime. `tokio`/`quinn` are gated
//! behind the off-by-default `quic` feature and used only for the control plane.
#![forbid(unsafe_op_in_unsafe_fn)]
pub mod abi;
pub mod config;
pub mod crypto;
pub mod error;
pub mod fec;
pub mod input;
pub mod packet;
pub mod session;
pub mod stats;
pub mod transport;
pub use config::{Config, FecConfig, FecScheme, Mode, ProtocolPhase, Role};
pub use error::{LumenError, LumenStatus, Result};
pub use session::{Frame, Session};
pub use stats::Stats;
/// Bump on any breaking change to the [C ABI](crate::abi). Mirrors
/// `lumen_abi_version()` and is checked by clients before use.
pub const ABI_VERSION: u32 = 1;
+581
View File
@@ -0,0 +1,581 @@
//! Zero-copy wire framing: split an access unit into FEC blocks of MTU-sized shards,
//! and reassemble + FEC-recover them on the far side.
//!
//! ## Wire layout
//!
//! Each packet is a fixed [`PacketHeader`] followed by one FEC shard's payload. Fields
//! are host-endian for now (every target platform is little-endian); the `lumen/1` (P2)
//! spec will pin byte order explicitly when we talk to non-LE peers.
//!
//! ## GameStream mapping (P1)
//!
//! `frame_index`↔`frameIndex`, `stream_seq`↔`streamPacketIndex`,
//! (`block_index`,`block_count`)↔the `multiFecBlocks` nibbles, and
//! (`data_shards`,`recovery_shards`,`shard_index`)↔the `fecInfo` bitfield. We carry them
//! as explicit fields rather than bit-packing; full GameStream wire-exactness is an M2
//! concern (it also needs RTP framing + RTSP), this is the coherent internal format.
use crate::config::Config;
use crate::error::{LumenError, Result};
use crate::fec::ErasureCoder;
use crate::session::Frame;
use crate::stats::StatsCounters;
use std::collections::{BTreeMap, HashMap, HashSet};
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
/// Identifies a lumen video packet (vs. an input datagram, see [`crate::input`]).
pub const LUMEN_MAGIC: u8 = 0xC9;
// Frame flags (mirroring GameStream's FLAG_*).
pub const FLAG_PIC: u8 = 0x1;
pub const FLAG_EOF: u8 = 0x2;
pub const FLAG_SOF: u8 = 0x4;
/// Crypto framing overhead [`Session`](crate::session::Session) adds when encrypting:
/// an 8-byte sequence prefix plus the GCM tag.
pub const CRYPTO_OVERHEAD: usize = 8 + crate::crypto::TAG_LEN;
/// Largest UDP datagram the core will send or accept. `Config::validate` bounds
/// `shard_payload` so `HEADER_LEN + shard_payload + CRYPTO_OVERHEAD ≤ MAX_DATAGRAM_BYTES`.
pub const MAX_DATAGRAM_BYTES: usize = 2048;
/// How many frames behind the newest the reassembler keeps before pruning stragglers.
const REORDER_WINDOW: u32 = 16;
/// Fixed per-packet header. `#[repr(C)]`, no padding, zero-copy (de)serializable.
#[repr(C)]
#[derive(Clone, Copy, Debug, FromBytes, IntoBytes, KnownLayout, Immutable)]
pub struct PacketHeader {
pub pts_ns: u64,
pub frame_index: u32,
pub stream_seq: u32,
pub frame_bytes: u32,
pub user_flags: u32,
pub block_index: u16,
pub block_count: u16,
pub data_shards: u16,
pub recovery_shards: u16,
pub shard_index: u16,
pub shard_bytes: u16,
pub magic: u8,
pub version: u8,
pub fec_scheme: u8,
pub flags: u8,
}
/// Size of [`PacketHeader`] on the wire (40 bytes).
pub const HEADER_LEN: usize = std::mem::size_of::<PacketHeader>();
const _: () = assert!(HEADER_LEN == 40, "PacketHeader must be 40 bytes / unpadded");
// ---------------------------------------------------------------------------
// Host side: packetization
// ---------------------------------------------------------------------------
/// Splits encoded access units into FEC-protected shard packets. Host-side only.
pub struct Packetizer {
next_frame_index: u32,
next_seq: u32,
shard_payload: usize,
fec: crate::config::FecConfig,
version: u8,
}
impl Packetizer {
pub fn new(config: &Config) -> Self {
Packetizer {
next_frame_index: 0,
next_seq: 0,
shard_payload: config.shard_payload,
fec: config.fec,
version: config.phase as u8,
}
}
/// Packetize one access unit into wire packets (header + shard payload each).
pub fn packetize(
&mut self,
frame: &[u8],
pts_ns: u64,
user_flags: u32,
coder: &dyn ErasureCoder,
) -> Result<Vec<Vec<u8>>> {
let payload = self.shard_payload;
let frame_index = self.next_frame_index;
self.next_frame_index = self.next_frame_index.wrapping_add(1);
// At least one (zero-padded) data shard even for an empty frame.
let total_data = frame.len().div_ceil(payload).max(1);
let max_block = self.fec.max_data_per_block as usize;
let block_count = total_data.div_ceil(max_block).max(1);
let frame_bytes = frame.len() as u32;
// Defend the u16 wire fields against silent truncation. `Config::validate`
// already rejects configs that could reach these for valid frame sizes; this is
// the belt-and-suspenders for a frame larger than the negotiated maximum.
if payload > u16::MAX as usize {
return Err(LumenError::InvalidArg("shard_payload exceeds u16"));
}
if block_count > u16::MAX as usize {
return Err(LumenError::Unsupported(
"frame too large: block count exceeds u16",
));
}
let mut packets = Vec::new();
for b in 0..block_count {
let first = b * max_block;
let last = ((b + 1) * max_block).min(total_data);
let block_data_count = last - first;
// Build this block's data shards (each `payload` bytes, last zero-padded).
let mut data_shards: Vec<Vec<u8>> = Vec::with_capacity(block_data_count);
for s in first..last {
let start = s * payload;
let end = (start + payload).min(frame.len());
let mut shard = vec![0u8; payload];
if start < frame.len() {
shard[..end - start].copy_from_slice(&frame[start..end]);
}
data_shards.push(shard);
}
let recovery_count = self.fec.recovery_for(block_data_count);
let recovery = coder.encode(&data_shards, recovery_count)?;
let total_shards = block_data_count + recovery_count;
if total_shards > u16::MAX as usize {
return Err(LumenError::Unsupported("block shard count exceeds u16"));
}
for shard_index in 0..total_shards {
let body: &[u8] = if shard_index < block_data_count {
&data_shards[shard_index]
} else {
&recovery[shard_index - block_data_count]
};
let seq = self.next_seq;
self.next_seq = self.next_seq.wrapping_add(1);
let mut flags = FLAG_PIC;
if b == 0 && shard_index == 0 {
flags |= FLAG_SOF;
}
if b + 1 == block_count && shard_index + 1 == total_shards {
flags |= FLAG_EOF;
}
let hdr = PacketHeader {
pts_ns,
frame_index,
stream_seq: seq,
frame_bytes,
user_flags,
block_index: b as u16,
block_count: block_count as u16,
data_shards: block_data_count as u16,
recovery_shards: recovery_count as u16,
shard_index: shard_index as u16,
shard_bytes: payload as u16,
magic: LUMEN_MAGIC,
version: self.version,
fec_scheme: coder.scheme() as u8,
flags,
};
let mut pkt = Vec::with_capacity(HEADER_LEN + body.len());
pkt.extend_from_slice(hdr.as_bytes());
pkt.extend_from_slice(body);
packets.push(pkt);
}
}
Ok(packets)
}
}
// ---------------------------------------------------------------------------
// Client side: reassembly + FEC recovery
// ---------------------------------------------------------------------------
struct BlockBuf {
data_shards: usize,
recovery_shards: usize,
shard_bytes: usize,
/// Length `data_shards + recovery_shards`; `Some` = received.
shards: Vec<Option<Vec<u8>>>,
received: usize,
done: bool,
}
struct FrameBuf {
frame_bytes: usize,
block_count: usize,
pts_ns: u64,
user_flags: u32,
blocks: HashMap<u16, BlockBuf>,
/// Reconstructed payload per completed block, ordered by block index.
block_data: BTreeMap<u16, Vec<u8>>,
}
/// Per-session bounds the reassembler enforces on every packet header *before*
/// allocating, so a hostile or corrupt header cannot drive unbounded memory use. All
/// derived from the negotiated [`Config`].
#[derive(Clone, Copy, Debug)]
pub struct ReassemblerLimits {
/// Expected shard payload length; every shard in the stream must match exactly.
pub shard_bytes: usize,
/// Max data shards per block (the negotiated `max_data_per_block`).
pub max_data_shards: usize,
/// Max total shards per block (data + recovery), capped by the FEC scheme ceiling.
pub max_total_shards: usize,
/// Max FEC blocks per frame.
pub max_blocks: usize,
/// Max accepted access-unit size.
pub max_frame_bytes: usize,
}
impl ReassemblerLimits {
pub fn from_config(c: &Config) -> Self {
let max_data = c.fec.max_data_per_block as usize;
let max_total =
(max_data + c.fec.recovery_for(max_data)).min(c.fec.scheme.max_total_shards());
let total_data = c.max_frame_bytes.div_ceil(c.shard_payload.max(1)).max(1);
ReassemblerLimits {
shard_bytes: c.shard_payload,
max_data_shards: max_data,
max_total_shards: max_total,
max_blocks: total_data.div_ceil(max_data).max(1),
max_frame_bytes: c.max_frame_bytes,
}
}
}
/// Buffers incoming shards, recovers lost ones via FEC, and emits whole access units.
/// Client-side only.
pub struct Reassembler {
limits: ReassemblerLimits,
frames: HashMap<u32, FrameBuf>,
/// Recently-emitted frames, so stray/late shards can't resurrect them. Pruned to
/// the reorder window alongside `frames`.
completed: HashSet<u32>,
newest_frame: Option<u32>,
}
impl Reassembler {
pub fn new(limits: ReassemblerLimits) -> Self {
Reassembler {
limits,
frames: HashMap::new(),
completed: HashSet::new(),
newest_frame: None,
}
}
/// Ingest one (already-decrypted) packet. Returns the access unit when its last
/// block completes, otherwise `None`.
pub fn push(
&mut self,
pkt: &[u8],
coder: &dyn ErasureCoder,
stats: &StatsCounters,
) -> Result<Option<Frame>> {
// On a lossy datagram link a malformed or non-video packet is dropped, never
// fatal: it must not abort `poll_frame`. Only a genuine FEC reconstruction
// failure propagates as an error.
if pkt.len() < HEADER_LEN {
StatsCounters::add(&stats.packets_dropped, 1);
return Ok(None);
}
let hdr = match PacketHeader::read_from_bytes(&pkt[..HEADER_LEN]) {
Ok(h) => h,
Err(_) => {
StatsCounters::add(&stats.packets_dropped, 1);
return Ok(None);
}
};
let lim = self.limits;
let shard_bytes = hdr.shard_bytes as usize;
let data_shards = hdr.data_shards as usize;
let recovery_shards = hdr.recovery_shards as usize;
let total = data_shards + recovery_shards;
let shard_index = hdr.shard_index as usize;
let block_count = hdr.block_count as usize;
let frame_bytes = hdr.frame_bytes as usize;
// Bound every attacker-controllable header field against the negotiated limits
// BEFORE allocating anything keyed on it — this is the firewall against a tiny
// datagram triggering a huge `vec![None; total]` / `Vec::with_capacity`.
let drop = |stats: &StatsCounters| {
StatsCounters::add(&stats.packets_dropped, 1);
};
if hdr.magic != LUMEN_MAGIC
|| shard_bytes != lim.shard_bytes
|| pkt.len() < HEADER_LEN + shard_bytes
|| data_shards == 0
|| data_shards > lim.max_data_shards
|| total == 0
|| total > lim.max_total_shards
|| shard_index >= total
|| block_count == 0
|| block_count > lim.max_blocks
|| hdr.block_index as usize >= block_count
|| frame_bytes > lim.max_frame_bytes
{
drop(stats);
return Ok(None);
}
let payload = pkt[HEADER_LEN..HEADER_LEN + shard_bytes].to_vec();
self.advance_window(hdr.frame_index, stats);
// Drop shards for frames we've already emitted (e.g. the recovery shards of a
// frame that completed early via the all-originals-present fast path) or that
// have fallen out of the reorder window.
if self.completed.contains(&hdr.frame_index) || self.is_stale(hdr.frame_index) {
drop(stats);
return Ok(None);
}
// First packet of a frame establishes its geometry; later packets must agree.
let frame = self
.frames
.entry(hdr.frame_index)
.or_insert_with(|| FrameBuf {
frame_bytes,
block_count,
pts_ns: hdr.pts_ns,
user_flags: hdr.user_flags,
blocks: HashMap::new(),
block_data: BTreeMap::new(),
});
if frame.block_count != block_count || frame.frame_bytes != frame_bytes {
drop(stats);
return Ok(None);
}
if frame.block_data.contains_key(&hdr.block_index) {
return Ok(None); // block already reconstructed; late/duplicate shard
}
// First packet of a block sizes its shard vector; later packets must match its
// (data, recovery, shard_bytes) geometry, so `shard_index` is always in bounds.
frame
.blocks
.entry(hdr.block_index)
.or_insert_with(|| BlockBuf {
data_shards,
recovery_shards,
shard_bytes,
shards: vec![None; total],
received: 0,
done: false,
});
let block = frame.blocks.get_mut(&hdr.block_index).unwrap();
if block.data_shards != data_shards
|| block.recovery_shards != recovery_shards
|| block.shard_bytes != shard_bytes
{
drop(stats);
return Ok(None);
}
if block.shards[shard_index].is_none() {
block.shards[shard_index] = Some(payload);
block.received += 1;
}
// Reconstruct as soon as we hold enough shards.
if !block.done && block.received >= block.data_shards {
let present_data = block.shards[..block.data_shards]
.iter()
.filter(|s| s.is_some())
.count();
let recovered =
coder.reconstruct(block.data_shards, block.recovery_shards, &mut block.shards)?;
block.done = true;
StatsCounters::add(
&stats.fec_recovered_shards,
(block.data_shards - present_data) as u64,
);
// Concatenate the block's data shards into its contiguous payload.
let mut block_payload = Vec::with_capacity(block.data_shards * block.shard_bytes);
for shard in &recovered {
block_payload.extend_from_slice(shard);
}
frame.block_data.insert(hdr.block_index, block_payload);
frame.blocks.remove(&hdr.block_index);
}
// Whole frame ready?
if frame.block_data.len() == frame.block_count {
let frame = self.frames.remove(&hdr.frame_index).unwrap();
self.completed.insert(hdr.frame_index);
// Reserve based on the bytes we actually hold, not the (already-bounded but
// still caller-supplied) frame_bytes, so a small frame can't over-reserve.
let actual: usize = frame.block_data.values().map(|b| b.len()).sum();
let mut data = Vec::with_capacity(actual);
for (_, block_payload) in frame.block_data.into_iter() {
data.extend_from_slice(&block_payload);
}
data.truncate(frame.frame_bytes); // trim trailing-shard zero padding
return Ok(Some(Frame {
data,
frame_index: hdr.frame_index,
pts_ns: frame.pts_ns,
flags: frame.user_flags,
}));
}
Ok(None)
}
/// Track the newest frame and prune stragglers that fell out of the reorder window
/// (counting them as dropped).
fn advance_window(&mut self, frame_index: u32, stats: &StatsCounters) {
let newest = match self.newest_frame {
// `frame_index` is newer iff it's within the forward half of the index space.
Some(n) if frame_index.wrapping_sub(n) > u32::MAX / 2 => n,
_ => frame_index,
};
self.newest_frame = Some(newest);
let before = self.frames.len();
self.frames
.retain(|&idx, _| newest.wrapping_sub(idx) <= REORDER_WINDOW);
let pruned = before - self.frames.len();
if pruned > 0 {
StatsCounters::add(&stats.frames_dropped, pruned as u64);
}
self.completed
.retain(|&idx| newest.wrapping_sub(idx) <= REORDER_WINDOW);
}
/// True if `frame_index` lies behind the newest frame by more than the reorder
/// window (so its shards arrive too late to be useful).
fn is_stale(&self, frame_index: u32) -> bool {
match self.newest_frame {
Some(n) => {
let behind = n.wrapping_sub(frame_index);
behind > REORDER_WINDOW && behind <= u32::MAX / 2
}
None => false,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::FecScheme;
use crate::fec::coder_for;
fn limits() -> ReassemblerLimits {
ReassemblerLimits {
shard_bytes: 16,
max_data_shards: 8,
max_total_shards: 12,
max_blocks: 4,
max_frame_bytes: 4096,
}
}
fn base_header() -> PacketHeader {
PacketHeader {
pts_ns: 0,
frame_index: 0,
stream_seq: 0,
frame_bytes: 16,
user_flags: 0,
block_index: 0,
block_count: 1,
data_shards: 1,
recovery_shards: 0,
shard_index: 0,
shard_bytes: 16,
magic: LUMEN_MAGIC,
version: 1,
fec_scheme: 0,
flags: FLAG_PIC,
}
}
fn packet(h: PacketHeader) -> Vec<u8> {
let mut p = Vec::new();
p.extend_from_slice(h.as_bytes());
p.extend_from_slice(&vec![0xAB; h.shard_bytes as usize]);
p
}
/// A header advertising 65535+65535 shards must be dropped, not allocate gigabytes.
#[test]
fn rejects_oversized_shard_counts() {
let mut r = Reassembler::new(limits());
let coder = coder_for(FecScheme::Gf8);
let stats = StatsCounters::default();
let mut h = base_header();
h.data_shards = 65535;
h.recovery_shards = 65535;
assert!(r
.push(&packet(h), coder.as_ref(), &stats)
.unwrap()
.is_none());
assert_eq!(stats.snapshot().packets_dropped, 1);
}
/// A second packet for a block whose geometry differs from the first must be dropped
/// — never index past the block's allocated shard vector (the old OOB panic).
#[test]
fn rejects_inconsistent_block_geometry_without_panicking() {
let mut r = Reassembler::new(limits());
let coder = coder_for(FecScheme::Gf8);
let stats = StatsCounters::default();
let mut h1 = base_header();
h1.data_shards = 4;
h1.recovery_shards = 2; // block sized to 6 slots
h1.frame_bytes = 64;
assert!(r
.push(&packet(h1), coder.as_ref(), &stats)
.unwrap()
.is_none());
// Same block, different geometry, shard_index valid for ITS total (8) but past
// the established block's 6 slots.
let mut h2 = base_header();
h2.data_shards = 6;
h2.recovery_shards = 2;
h2.shard_index = 7;
h2.frame_bytes = 64;
assert!(r
.push(&packet(h2), coder.as_ref(), &stats)
.unwrap()
.is_none());
assert_eq!(stats.snapshot().packets_dropped, 1);
}
#[test]
fn rejects_wrong_shard_bytes_and_oversized_frame() {
let coder = coder_for(FecScheme::Gf8);
let mut r = Reassembler::new(limits());
let stats = StatsCounters::default();
let mut h = base_header();
h.shard_bytes = 8; // != negotiated 16
assert!(r
.push(&packet(h), coder.as_ref(), &stats)
.unwrap()
.is_none());
assert_eq!(stats.snapshot().packets_dropped, 1);
let mut r = Reassembler::new(limits());
let stats = StatsCounters::default();
let mut h = base_header();
h.frame_bytes = 1_000_000; // > max_frame_bytes
assert!(r
.push(&packet(h), coder.as_ref(), &stats)
.unwrap()
.is_none());
assert_eq!(stats.snapshot().packets_dropped, 1);
}
}
+198
View File
@@ -0,0 +1,198 @@
//! Session lifecycle and the two hot-path state machines.
//!
//! - **Host** ([`Session::submit_frame`]): encoded access unit → FEC + packetize →
//! optional AES-GCM seal → transport send.
//! - **Client** ([`Session::poll_frame`]): transport recv → optional open → reorder +
//! FEC recover + reassemble → whole access unit.
//!
//! Both directions also carry input: a client [`Session::send_input`]s events; the host
//! drains them with [`Session::poll_input`].
use crate::config::{Config, Role};
use crate::crypto::SessionCrypto;
use crate::error::{LumenError, Result};
use crate::fec::{coder_for, ErasureCoder};
use crate::input::InputEvent;
use crate::packet::{Packetizer, Reassembler, ReassemblerLimits};
use crate::stats::{Stats, StatsCounters};
use crate::transport::Transport;
/// A reassembled, FEC-recovered access unit, ready to hand to the platform decoder.
pub struct Frame {
pub data: Vec<u8>,
pub frame_index: u32,
pub pts_ns: u64,
pub flags: u32,
}
/// One end of a stream. Constructed for a single [`Role`]; calling the other role's
/// methods returns [`LumenError::InvalidArg`].
///
/// Note: the AEAD layer authenticates each datagram but does **not** provide anti-replay.
/// Video replays are largely absorbed by the reassembler's per-frame dedup, but replayed
/// input events are not yet filtered. A sliding-window replay filter keyed on the
/// authenticated sequence belongs with the pairing/handshake layer (M2); until then,
/// rely on the LAN/VPN transport assumption (plan §1).
pub struct Session {
config: Config,
coder: Box<dyn ErasureCoder>,
crypto: Option<SessionCrypto>,
transport: Box<dyn Transport>,
packetizer: Packetizer,
reassembler: Reassembler,
stats: StatsCounters,
/// Monotonic wire sequence, also the AES-GCM nonce counter.
next_seq: u64,
}
impl Session {
pub fn new(config: Config, transport: Box<dyn Transport>) -> Result<Session> {
config.validate()?;
let coder = coder_for(config.fec.scheme);
let crypto = config
.encrypt
.then(|| SessionCrypto::new(&config.key, config.salt, config.role));
let packetizer = Packetizer::new(&config);
let reassembler = Reassembler::new(ReassemblerLimits::from_config(&config));
Ok(Session {
coder,
crypto,
transport,
packetizer,
reassembler,
stats: StatsCounters::default(),
next_seq: 0,
config,
})
}
pub fn role(&self) -> Role {
self.config.role
}
pub fn stats(&self) -> Stats {
self.stats.snapshot()
}
/// Wrap a packet for the wire: when encrypting, prepend the 8-byte big-endian
/// sequence (the receiver derives the GCM nonce from it) then the ciphertext.
fn seal_for_wire(&mut self, packet: &[u8]) -> Result<Vec<u8>> {
let seq = self.next_seq;
self.next_seq = self.next_seq.wrapping_add(1);
match &self.crypto {
Some(c) => {
let ct = c.seal(seq, packet)?;
let mut wire = Vec::with_capacity(8 + ct.len());
wire.extend_from_slice(&seq.to_be_bytes());
wire.extend_from_slice(&ct);
Ok(wire)
}
None => Ok(packet.to_vec()),
}
}
/// Unwrap a wire datagram back into a plaintext packet.
fn open_from_wire(&self, wire: &[u8]) -> Result<Vec<u8>> {
match &self.crypto {
Some(c) => {
if wire.len() < 8 {
return Err(LumenError::BadPacket);
}
let seq = u64::from_be_bytes(wire[..8].try_into().unwrap());
c.open(seq, &wire[8..])
}
None => Ok(wire.to_vec()),
}
}
// -- Host path --------------------------------------------------------
/// Host: FEC-protect, packetize, seal, and send one encoded access unit.
pub fn submit_frame(&mut self, data: &[u8], pts_ns: u64, user_flags: u32) -> Result<()> {
if self.config.role != Role::Host {
return Err(LumenError::InvalidArg(
"submit_frame called on a client session",
));
}
let packets = self
.packetizer
.packetize(data, pts_ns, user_flags, self.coder.as_ref())?;
StatsCounters::add(&self.stats.frames_submitted, 1);
for pkt in packets {
let wire = self.seal_for_wire(&pkt)?;
StatsCounters::add(&self.stats.packets_sent, 1);
StatsCounters::add(&self.stats.bytes_sent, wire.len() as u64);
self.transport.send(&wire)?;
}
Ok(())
}
/// Host: drain one pending input event from the client, if any.
pub fn poll_input(&mut self) -> Result<Option<InputEvent>> {
if self.config.role != Role::Host {
return Err(LumenError::InvalidArg(
"poll_input called on a client session",
));
}
while let Some(wire) = self.transport.recv()? {
let pkt = match self.open_from_wire(&wire) {
Ok(p) => p,
Err(_) => continue, // drop undecryptable noise
};
StatsCounters::add(&self.stats.packets_received, 1);
if let Some(ev) = InputEvent::decode(&pkt) {
return Ok(Some(ev));
}
// Not an input datagram (e.g. stray video) — ignore and keep draining.
}
Ok(None)
}
// -- Client path ------------------------------------------------------
/// Client: drain the transport until a whole access unit is recovered, or no more
/// packets are pending ([`LumenError::NoFrame`]).
pub fn poll_frame(&mut self) -> Result<Frame> {
if self.config.role != Role::Client {
return Err(LumenError::InvalidArg(
"poll_frame called on a host session",
));
}
loop {
let wire = match self.transport.recv()? {
Some(w) => w,
None => return Err(LumenError::NoFrame),
};
let pkt = match self.open_from_wire(&wire) {
Ok(p) => p,
Err(_) => continue,
};
StatsCounters::add(&self.stats.packets_received, 1);
StatsCounters::add(&self.stats.bytes_received, pkt.len() as u64);
// The reassembler validates the packet via its parsed header (`magic`),
// ignoring anything that isn't a well-formed video packet.
if let Some(frame) = self
.reassembler
.push(&pkt, self.coder.as_ref(), &self.stats)?
{
StatsCounters::add(&self.stats.frames_completed, 1);
return Ok(frame);
}
}
}
/// Client: serialize and send one input event to the host.
pub fn send_input(&mut self, event: &InputEvent) -> Result<()> {
if self.config.role != Role::Client {
return Err(LumenError::InvalidArg(
"send_input called on a host session",
));
}
let pkt = event.encode();
let wire = self.seal_for_wire(&pkt)?;
StatsCounters::add(&self.stats.packets_sent, 1);
StatsCounters::add(&self.stats.bytes_sent, wire.len() as u64);
self.transport.send(&wire)?;
Ok(())
}
}
+55
View File
@@ -0,0 +1,55 @@
//! Live counters for the frame-pacing / quality logic and the web UI.
use std::sync::atomic::{AtomicU64, Ordering};
/// Immutable snapshot, copied across the C ABI as `LumenStats`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Stats {
pub frames_submitted: u64,
pub frames_completed: u64,
pub frames_dropped: u64,
pub packets_sent: u64,
pub packets_received: u64,
pub packets_dropped: u64,
pub fec_recovered_shards: u64,
pub bytes_sent: u64,
pub bytes_received: u64,
}
/// Atomic accumulators owned by a [`Session`](crate::session::Session). Snapshot to
/// [`Stats`] for readers. `Relaxed` ordering is fine: these are monotonic counters
/// read for display, never used to synchronize other memory.
#[derive(Default)]
pub struct StatsCounters {
pub frames_submitted: AtomicU64,
pub frames_completed: AtomicU64,
pub frames_dropped: AtomicU64,
pub packets_sent: AtomicU64,
pub packets_received: AtomicU64,
pub packets_dropped: AtomicU64,
pub fec_recovered_shards: AtomicU64,
pub bytes_sent: AtomicU64,
pub bytes_received: AtomicU64,
}
impl StatsCounters {
#[inline]
pub fn add(counter: &AtomicU64, n: u64) {
counter.fetch_add(n, Ordering::Relaxed);
}
pub fn snapshot(&self) -> Stats {
let l = Ordering::Relaxed;
Stats {
frames_submitted: self.frames_submitted.load(l),
frames_completed: self.frames_completed.load(l),
frames_dropped: self.frames_dropped.load(l),
packets_sent: self.packets_sent.load(l),
packets_received: self.packets_received.load(l),
packets_dropped: self.packets_dropped.load(l),
fec_recovered_shards: self.fec_recovered_shards.load(l),
bytes_sent: self.bytes_sent.load(l),
bytes_received: self.bytes_received.load(l),
}
}
}
@@ -0,0 +1,74 @@
//! In-process transport for unit tests and the C ABI harness. Two cross-wired
//! [`LoopbackTransport`]s form a host↔client link, with optional deterministic loss so
//! tests can exercise FEC recovery without a real network.
use super::Transport;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
/// One direction of the link.
struct Channel {
queue: Mutex<VecDeque<Vec<u8>>>,
/// Drop one of every `drop_period` packets (0 = lossless).
drop_period: u32,
sent: AtomicU64,
dropped: AtomicU64,
}
impl Channel {
fn new(drop_period: u32) -> Arc<Channel> {
Arc::new(Channel {
queue: Mutex::new(VecDeque::new()),
drop_period,
sent: AtomicU64::new(0),
dropped: AtomicU64::new(0),
})
}
}
/// Sends on `tx`, receives on `rx`. Created in cross-wired pairs by [`loopback_pair`].
pub struct LoopbackTransport {
tx: Arc<Channel>,
rx: Arc<Channel>,
}
impl LoopbackTransport {
/// Number of packets this transport's send side has deliberately dropped.
pub fn dropped(&self) -> u64 {
self.tx.dropped.load(Ordering::Relaxed)
}
}
/// Create a connected `(host, client)` pair. `host_drop_period` injects loss on the
/// host→client (video) path; `client_drop_period` on the reverse (input) path.
pub fn loopback_pair(
host_drop_period: u32,
client_drop_period: u32,
) -> (LoopbackTransport, LoopbackTransport) {
let h2c = Channel::new(host_drop_period);
let c2h = Channel::new(client_drop_period);
let host = LoopbackTransport {
tx: h2c.clone(),
rx: c2h.clone(),
};
let client = LoopbackTransport { tx: c2h, rx: h2c };
(host, client)
}
impl Transport for LoopbackTransport {
fn send(&self, packet: &[u8]) -> std::io::Result<()> {
let n = self.tx.sent.fetch_add(1, Ordering::Relaxed);
if self.tx.drop_period != 0 && (n % self.tx.drop_period as u64) == 0 {
// Deterministically drop in flight (the 1st of each `drop_period` group).
self.tx.dropped.fetch_add(1, Ordering::Relaxed);
return Ok(());
}
self.tx.queue.lock().unwrap().push_back(packet.to_vec());
Ok(())
}
fn recv(&self) -> std::io::Result<Option<Vec<u8>>> {
Ok(self.rx.queue.lock().unwrap().pop_front())
}
}
+15
View File
@@ -0,0 +1,15 @@
//! Pluggable packet I/O. The hot path calls [`Transport::send`] / [`Transport::recv`]
//! directly — no async runtime is involved.
mod loopback;
mod udp;
pub use loopback::{loopback_pair, LoopbackTransport};
pub use udp::UdpTransport;
/// A datagram transport. `recv` is non-blocking: it returns `Ok(None)` when no packet
/// is currently available, so the caller (decode/present thread) never blocks here.
pub trait Transport: Send + Sync {
fn send(&self, packet: &[u8]) -> std::io::Result<()>;
fn recv(&self) -> std::io::Result<Option<Vec<u8>>>;
}
+52
View File
@@ -0,0 +1,52 @@
//! Real UDP datagram transport — native sockets, no async runtime.
//!
//! M1 uses one `recv` syscall per packet; the latency budget (§7) calls for
//! `sendmmsg`/UDP-GSO batching to cut syscalls, which is a P2 optimization layered on
//! this same [`Transport`] seam.
use super::Transport;
use crate::packet::MAX_DATAGRAM_BYTES;
use std::net::UdpSocket;
/// Receive buffer size. `Config::validate` bounds `shard_payload` so a well-formed
/// datagram (header + shard + crypto overhead) always fits in [`MAX_DATAGRAM_BYTES`];
/// the `+ 1` byte lets us detect an oversized datagram (a full read) instead of
/// silently truncating it.
const RECV_BUF: usize = MAX_DATAGRAM_BYTES + 1;
pub struct UdpTransport {
socket: UdpSocket,
}
impl UdpTransport {
/// Bind `local` and `connect` to `peer`, so `send`/`recv` need no address and the
/// kernel filters to this peer. Non-blocking, matching the [`Transport`] contract.
pub fn connect(local: &str, peer: &str) -> std::io::Result<Self> {
let socket = UdpSocket::bind(local)?;
socket.connect(peer)?;
socket.set_nonblocking(true)?;
Ok(UdpTransport { socket })
}
}
impl Transport for UdpTransport {
fn send(&self, packet: &[u8]) -> std::io::Result<()> {
self.socket.send(packet)?;
Ok(())
}
fn recv(&self) -> std::io::Result<Option<Vec<u8>>> {
let mut buf = vec![0u8; RECV_BUF];
match self.socket.recv(&mut buf) {
// A read that fills the whole buffer means the datagram was larger than any
// valid packet — drop it rather than hand a truncated, corrupt packet up.
Ok(n) if n >= RECV_BUF => Ok(None),
Ok(n) => {
buf.truncate(n);
Ok(Some(buf))
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(None),
Err(e) => Err(e),
}
}
}
+109
View File
@@ -0,0 +1,109 @@
/*
* lumen-core C ABI harness — M1 acceptance.
*
* Proves the core links from C and round-trips encoded access units through the full
* packetize -> FEC -> in-process loopback (with deterministic packet loss) -> FEC
* recover -> reassemble path, recovering every byte exactly.
*
* Build/run: see tests/c/run.sh (also driven by `cargo test --test c_abi`).
*/
#include "lumen_core.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
static LumenConfig make_config(uint32_t role, uint32_t drop_period) {
LumenConfig c;
memset(&c, 0, sizeof(c));
c.struct_size = (uint32_t)sizeof(LumenConfig);
c.role = role; /* 0 = host, 1 = client */
c.phase = 1; /* P1, GameStream-compatible */
c.fec_scheme = 0; /* GF(2^8) */
c.fec_percent = 25;
c.max_data_per_block = 64;
c.shard_payload = 1024;
c.max_frame_bytes = 8 * 1024 * 1024;
c.encrypt = 0;
c.loopback_drop_period = drop_period;
return c;
}
int main(void) {
printf("lumen-core C ABI harness (abi_version=%u)\n", lumen_abi_version());
const uint32_t DROP_PERIOD = 8; /* drop 1 of every 8 packets */
LumenConfig host_cfg = make_config(0, DROP_PERIOD);
LumenConfig client_cfg = make_config(1, DROP_PERIOD);
LumenSession *host = NULL;
LumenSession *client = NULL;
LumenStatus rc = lumen_test_loopback_pair(&host_cfg, &client_cfg, &host, &client);
if (rc != LUMEN_STATUS_OK || !host || !client) {
fprintf(stderr, "FAIL: loopback_pair rc=%d\n", (int)rc);
return 1;
}
const size_t FRAME_LEN = 200000; /* ~196 shards across 4 FEC blocks */
const int FRAMES = 4;
uint8_t *buf = (uint8_t *)malloc(FRAME_LEN);
if (!buf) { fprintf(stderr, "FAIL: oom\n"); return 1; }
int failures = 0;
for (int f = 0; f < FRAMES; f++) {
for (size_t i = 0; i < FRAME_LEN; i++) {
buf[i] = (uint8_t)((i * 131u) + (unsigned)f * 17u);
}
rc = lumen_host_submit_frame(host, buf, FRAME_LEN, (uint64_t)f * 1000000u, 0);
if (rc != LUMEN_STATUS_OK) {
fprintf(stderr, "FAIL: submit frame %d rc=%d\n", f, (int)rc);
failures++;
continue;
}
LumenFrame out;
memset(&out, 0, sizeof(out));
rc = lumen_client_poll_frame(client, &out);
if (rc != LUMEN_STATUS_OK) {
fprintf(stderr, "FAIL: poll frame %d rc=%d (expected recovery)\n", f, (int)rc);
failures++;
continue;
}
if (out.len != FRAME_LEN || memcmp(out.data, buf, FRAME_LEN) != 0) {
fprintf(stderr, "FAIL: frame %d mismatch (len=%zu want=%zu)\n",
f, (size_t)out.len, FRAME_LEN);
failures++;
continue;
}
if (out.frame_index != (uint32_t)f) {
fprintf(stderr, "FAIL: frame %d wrong index %u\n", f, out.frame_index);
failures++;
}
}
LumenStats st;
memset(&st, 0, sizeof(st));
lumen_get_stats(client, &st);
printf("client stats: completed=%llu recovered_shards=%llu dropped_pkts=%llu rx_pkts=%llu\n",
(unsigned long long)st.frames_completed,
(unsigned long long)st.fec_recovered_shards,
(unsigned long long)st.packets_dropped,
(unsigned long long)st.packets_received);
if (st.fec_recovered_shards == 0) {
fprintf(stderr, "FAIL: expected FEC to recover lost shards, but recovered 0\n");
failures++;
}
free(buf);
lumen_session_free(host);
lumen_session_free(client);
if (failures == 0) {
printf("PASS: %d frames round-tripped byte-exact through lossy loopback\n", FRAMES);
return 0;
}
fprintf(stderr, "FAILED with %d errors\n", failures);
return 1;
}
+34
View File
@@ -0,0 +1,34 @@
#!/usr/bin/env bash
# Build lumen-core's staticlib, then compile + link + run the C ABI harness against it.
# Proves the core links from C. Works on Linux and macOS (link flags come from rustc).
set -euo pipefail
here="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
ws="$(cd "$here/../../../.." && pwd)" # tests/c -> crates/lumen-core -> crates -> ws
cd "$ws"
profile="${1:-debug}"
build_flag=""
[ "$profile" = "release" ] && build_flag="--release"
echo ">> building lumen-core staticlib ($profile)"
cargo build -p lumen-core $build_flag >/dev/null
staticlib="$ws/target/$profile/liblumen_core.a"
header_dir="$ws/include"
[ -f "$staticlib" ] || { echo "missing $staticlib"; exit 1; }
[ -f "$header_dir/lumen_core.h" ] || { echo "missing generated header"; exit 1; }
# Ask rustc what native libs the staticlib needs to link into a C program.
native_libs="$(cargo rustc -p lumen-core --lib --crate-type staticlib $build_flag -- \
--print native-static-libs 2>&1 | sed -n 's/.*native-static-libs: //p' | tail -1)"
echo ">> native libs: ${native_libs:-<none>}"
out="$(mktemp -d)/lumen_harness"
cc="${CC:-cc}"
echo ">> compiling + linking harness"
$cc -std=c11 -Wall -Wextra -O2 -I "$header_dir" \
"$here/harness.c" "$staticlib" $native_libs -o "$out"
echo ">> running"
"$out"
+85
View File
@@ -0,0 +1,85 @@
//! Runs the C ABI harness under `cargo test`: compiles `tests/c/harness.c`, links it
//! against the freshly built `liblumen_core.a`, and asserts it round-trips frames
//! through the lossy loopback. The cross-platform canonical path (querying rustc for
//! link flags) is `tests/c/run.sh`; this mirrors it so `cargo test` alone covers the
//! C boundary.
use std::path::{Path, PathBuf};
use std::process::Command;
/// Native libs the Rust staticlib needs, minus the ones `cc` already links by default
/// (`-lSystem`/`-lc`), to avoid duplicate-library linker warnings. See
/// `rustc --print native-static-libs`.
fn native_libs() -> &'static [&'static str] {
if cfg!(target_os = "macos") {
&["-liconv", "-lm"]
} else if cfg!(target_os = "linux") {
&["-lgcc_s", "-lutil", "-lrt", "-lpthread", "-lm", "-ldl"]
} else {
&[]
}
}
fn ensure_staticlib(profile_dir: &Path) -> PathBuf {
let staticlib = profile_dir.join("liblumen_core.a");
if !staticlib.exists() {
// `cargo test` doesn't always emit the standalone staticlib; build it. The
// outer cargo's build lock is released during test execution, so this is safe.
let cargo = std::env::var("CARGO").unwrap_or_else(|_| "cargo".into());
let _ = Command::new(cargo)
.args(["build", "-p", "lumen-core"])
.status();
}
staticlib
}
#[test]
fn c_abi_harness_round_trips() {
let manifest = PathBuf::from(env!("CARGO_MANIFEST_DIR")); // crates/lumen-core
let harness = manifest.join("tests/c/harness.c");
let include = manifest.join("../../include");
let exe = std::env::current_exe().expect("current_exe");
// .../target/<profile>/deps/c_abi-<hash> -> target/<profile>
let profile_dir = exe
.parent()
.and_then(Path::parent)
.expect("profile dir")
.to_path_buf();
let staticlib = ensure_staticlib(&profile_dir);
assert!(
staticlib.exists(),
"staticlib not found at {} (run `cargo build -p lumen-core`)",
staticlib.display()
);
assert!(
include.join("lumen_core.h").exists(),
"generated header missing; build lumen-core to regenerate it"
);
let cc = std::env::var("CC").unwrap_or_else(|_| "cc".into());
let out = profile_dir.join("lumen_c_harness");
let mut compile = Command::new(&cc);
compile
.args(["-std=c11", "-Wall", "-Wextra", "-O2", "-I"])
.arg(&include)
.arg(&harness)
.arg(&staticlib)
.args(native_libs())
.arg("-o")
.arg(&out);
match compile.status() {
Ok(s) => assert!(s.success(), "C harness failed to compile/link"),
Err(e) => {
// No C toolchain (unusual) — don't fail the whole suite; run.sh covers CI.
eprintln!("skipping C ABI test: cannot invoke `{cc}`: {e}");
return;
}
}
let run = Command::new(&out).status().expect("run C harness");
assert!(run.success(), "C harness reported a round-trip failure");
}
+185
View File
@@ -0,0 +1,185 @@
//! M1 acceptance: round-trip access units through the full host→client path
//! (packetize → FEC → loopback with simulated loss → recover → reassemble) and assert
//! byte-exact recovery, for both FEC schemes, with and without encryption. Plus
//! property tests over the FEC layer's loss patterns.
use lumen_core::config::{Config, FecConfig, FecScheme, ProtocolPhase, Role};
use lumen_core::fec::coder_for;
use lumen_core::input::{InputEvent, InputKind};
use lumen_core::session::Session;
use lumen_core::transport::loopback_pair;
use proptest::prelude::*;
fn config(role: Role, scheme: FecScheme, encrypt: bool, drop_period: u32) -> Config {
Config {
role,
phase: match scheme {
FecScheme::Gf8 => ProtocolPhase::P1GameStream,
FecScheme::Gf16 => ProtocolPhase::P2Lumen,
},
fec: FecConfig {
scheme,
fec_percent: 25,
max_data_per_block: 32,
},
shard_payload: 1024,
max_frame_bytes: 8 * 1024 * 1024,
encrypt,
key: [7u8; 16],
salt: [1, 2, 3, 4],
loopback_drop_period: drop_period,
}
}
/// Drive `frames` access units host→client over a lossy loopback and assert each one
/// comes back byte-identical. Returns the client's final stats.
fn run_stream(
scheme: FecScheme,
encrypt: bool,
drop_period: u32,
frames: &[Vec<u8>],
) -> lumen_core::Stats {
let (host_tp, client_tp) = loopback_pair(drop_period, 0);
let mut host = Session::new(
config(Role::Host, scheme, encrypt, drop_period),
Box::new(host_tp),
)
.unwrap();
let mut client = Session::new(
config(Role::Client, scheme, encrypt, drop_period),
Box::new(client_tp),
)
.unwrap();
for (i, frame) in frames.iter().enumerate() {
host.submit_frame(frame, i as u64 * 1_000_000, 0).unwrap();
let got = client
.poll_frame()
.expect("frame should recover despite loss");
assert_eq!(&got.data, frame, "frame {i} mismatched after recovery");
assert_eq!(got.frame_index, i as u32);
assert_eq!(got.pts_ns, i as u64 * 1_000_000);
}
client.stats()
}
fn sample_frames() -> Vec<Vec<u8>> {
(0..5usize)
.map(|f| {
let len = 1 + f * 40_000; // 1, 40k, 80k, 120k, 160k → single- and multi-block
(0..len)
.map(|b| (b.wrapping_mul(31).wrapping_add(f * 7)) as u8)
.collect()
})
.collect()
}
#[test]
fn gf8_stream_recovers_under_loss() {
let frames = sample_frames();
// drop_period 8 deletes the 1st of every 8 packets → real data-shard loss.
let stats = run_stream(FecScheme::Gf8, false, 8, &frames);
assert_eq!(stats.frames_completed, frames.len() as u64);
assert!(
stats.fec_recovered_shards > 0,
"loss should have forced FEC recovery"
);
}
#[test]
fn gf16_stream_recovers_under_loss() {
let frames = sample_frames();
let stats = run_stream(FecScheme::Gf16, false, 8, &frames);
assert_eq!(stats.frames_completed, frames.len() as u64);
assert!(stats.fec_recovered_shards > 0);
}
#[test]
fn encrypted_stream_recovers_under_loss() {
let frames = sample_frames();
let stats = run_stream(FecScheme::Gf8, true, 8, &frames);
assert_eq!(stats.frames_completed, frames.len() as u64);
}
#[test]
fn lossless_stream_is_exact() {
let frames = sample_frames();
let stats = run_stream(FecScheme::Gf16, false, 0, &frames);
assert_eq!(stats.frames_completed, frames.len() as u64);
assert_eq!(
stats.fec_recovered_shards, 0,
"no loss → nothing to recover"
);
}
#[test]
fn input_round_trips_client_to_host() {
let (host_tp, client_tp) = loopback_pair(0, 0);
let mut host = Session::new(
config(Role::Host, FecScheme::Gf8, false, 0),
Box::new(host_tp),
)
.unwrap();
let mut client = Session::new(
config(Role::Client, FecScheme::Gf8, false, 0),
Box::new(client_tp),
)
.unwrap();
let sent = InputEvent {
kind: InputKind::MouseMove,
_pad: [0; 3],
code: 0,
x: -7,
y: 13,
flags: 0,
};
client.send_input(&sent).unwrap();
let got = host
.poll_input()
.unwrap()
.expect("host should receive the input event");
assert_eq!(got, sent);
}
// ---- property tests over the FEC layer --------------------------------------
proptest! {
/// For random shard counts and an erasure set within the recovery budget, every
/// original shard is reconstructed byte-identically — for both backends.
#[test]
fn fec_recovers_any_loss_within_budget(
k in 1usize..40,
extra in 0usize..16, // recovery beyond the bare minimum
shard_half in 1usize..64, // shard_len = 2*shard_half (even)
seed in any::<u64>(),
) {
let m = (extra + 1).min(40);
let shard_len = shard_half * 2;
for coder in [coder_for(FecScheme::Gf8), coder_for(FecScheme::Gf16)] {
// Gf8 ceiling: data + recovery <= 255.
if matches!(coder.scheme(), FecScheme::Gf8) && k + m > 255 { continue; }
let data: Vec<Vec<u8>> = (0..k)
.map(|i| (0..shard_len).map(|b| (i ^ b).wrapping_add(seed as usize) as u8).collect())
.collect();
let recovery = coder.encode(&data, m).unwrap();
let mut received: Vec<Option<Vec<u8>>> =
data.iter().cloned().map(Some).chain(recovery.into_iter().map(Some)).collect();
// Erase up to `m` shards chosen by a cheap PRNG over the seed.
let total = k + m;
let lose = (seed as usize % (m + 1)).min(m);
let mut s = seed | 1;
for _ in 0..lose {
s = s.wrapping_mul(6364136223846793005).wrapping_add(1);
let idx = (s >> 33) as usize % total;
received[idx] = None;
}
let restored = coder.reconstruct(k, m, &mut received).unwrap();
prop_assert_eq!(restored, data);
}
}
}
+27
View File
@@ -0,0 +1,27 @@
[package]
name = "lumen-host"
description = "lumen Linux streaming host: virtual display, capture, encode, input injection"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
authors.workspace = true
repository.workspace = true
[dependencies]
lumen-core = { path = "../lumen-core" }
anyhow = "1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# Linux backends are wired in M0/M2. They live behind `#[cfg(target_os = "linux")]`
# so the workspace stays green on macOS; the dep list (per plan §4) is:
#
# [target.'cfg(target_os = "linux")'.dependencies]
# pipewire = "..." # ScreenCast portal stream -> dmabuf (capture)
# ashpd = "..." # xdg-desktop-portal: ScreenCast, RemoteDesktop
# zbus = "..." # DBus: KWin/Mutter virtual-output creation
# ffmpeg-next / rsmpeg # VAAPI / NVENC encode, dmabuf import
# reis = "..." # libei input injection (Wayland-native)
# input-linux = "..." # uinput fallback
# axum + tokio # web config / pairing API
+32
View File
@@ -0,0 +1,32 @@
//! Frame capture (plan §7). On Linux: a PipeWire ScreenCast portal stream delivering
//! dmabuf frames with no copy to the CPU. The encoder imports the dmabuf directly.
use anyhow::Result;
/// A captured frame. For zero-copy the real type wraps a dmabuf fd + modifier; the CPU
/// buffer is only a fallback path (plan §9 risk: per-GPU dmabuf import quirks).
pub struct CapturedFrame {
pub width: u32,
pub height: u32,
pub pts_ns: u64,
/// Fallback CPU pixels (empty when a dmabuf is used).
pub cpu_bytes: Vec<u8>,
}
/// Produces frames from a captured output. Lives on its own thread, feeding the encoder
/// over a bounded drop-oldest channel (never block the compositor).
pub trait Capturer: Send {
fn next_frame(&mut self) -> Result<CapturedFrame>;
}
/// Open a capturer for a PipeWire node id (from the ScreenCast portal).
pub fn open_pipewire(_node_id: u32) -> Result<Box<dyn Capturer>> {
#[cfg(target_os = "linux")]
{
anyhow::bail!("pipewire capture not yet implemented (M0)")
}
#[cfg(not(target_os = "linux"))]
{
anyhow::bail!("capture requires Linux + PipeWire")
}
}
+40
View File
@@ -0,0 +1,40 @@
//! Hardware video encode (plan §7). Binds FFmpeg (VAAPI / NVENC); never rewrites codecs.
//! Low-latency preset, lookahead off, dmabuf import for zero-copy from [`crate::capture`].
use crate::capture::CapturedFrame;
use anyhow::Result;
/// An encoded access unit (one NAL/AU) to hand to `lumen_core` for FEC + packetization.
pub struct EncodedFrame {
pub data: Vec<u8>,
pub pts_ns: u64,
/// True for IDR/keyframes (sets the SOF/keyframe wire flags).
pub keyframe: bool,
}
/// Codec selection negotiated with the client.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Codec {
H264,
H265,
Av1,
}
/// A hardware encoder. One per session; runs on the encode thread.
pub trait Encoder: Send {
fn submit(&mut self, frame: &CapturedFrame) -> Result<()>;
/// Pull the next encoded AU if one is ready.
fn poll(&mut self) -> Result<Option<EncodedFrame>>;
}
/// Open an encoder. `bitrate_bps` and `codec` come from session negotiation.
pub fn open(_codec: Codec, _bitrate_bps: u64) -> Result<Box<dyn Encoder>> {
#[cfg(target_os = "linux")]
{
anyhow::bail!("VAAPI/NVENC encode not yet implemented (M0)")
}
#[cfg(not(target_os = "linux"))]
{
anyhow::bail!("encode requires Linux (VAAPI/NVENC via FFmpeg)")
}
}
+30
View File
@@ -0,0 +1,30 @@
//! Input injection (plan §4): turn client [`lumen_core::input::InputEvent`]s into host
//! input. Wayland-native via libei (`reis`) first; uinput as the universal fallback.
use anyhow::Result;
use lumen_core::input::InputEvent;
/// Injects input events into the host session.
pub trait InputInjector: Send {
fn inject(&mut self, event: &InputEvent) -> Result<()>;
}
/// Preferred injection backend.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Backend {
/// libei via `reis` — Wayland-native (RemoteDesktop portal).
Libei,
/// `/dev/uinput` — universal fallback, always available.
Uinput,
}
pub fn open(_backend: Backend) -> Result<Box<dyn InputInjector>> {
#[cfg(target_os = "linux")]
{
anyhow::bail!("libei/uinput injection not yet implemented (M2)")
}
#[cfg(not(target_os = "linux"))]
{
anyhow::bail!("input injection requires Linux (libei/uinput)")
}
}
+61
View File
@@ -0,0 +1,61 @@
//! `lumen-host` — the Linux streaming host (plan §2, §6, §7).
//!
//! Creates a client-sized virtual display, captures it via PipeWire, encodes with
//! VAAPI/NVENC, and hands encoded access units to `lumen_core` for FEC + packetization +
//! pacing + send. Input flows back via libei/uinput. The platform backends are
//! `#[cfg(target_os = "linux")]`; the crate compiles everywhere so the workspace builds
//! on non-Linux dev machines — it just can't run the pipeline there.
//!
//! Status: scaffold. M0 wires capture→encode→file; M2 wires the full P1 host that a
//! stock Moonlight client connects to.
// Scaffold: trait methods and config paths are defined ahead of their backends.
#![allow(dead_code)]
mod capture;
mod encode;
mod inject;
mod pipeline;
mod vdisplay;
mod web;
use vdisplay::{Compositor, Mode};
fn main() {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
)
.init();
tracing::info!(
"lumen-host scaffold (lumen_core ABI v{})",
lumen_core::ABI_VERSION
);
// The intended startup sequence (each step is a separate, pluggable subsystem):
// 1. negotiate mode + codec + FEC scheme over the control plane (web::WebConfig)
// 2. vdisplay::open(compositor).create(mode) -> client-sized virtual output
// 3. capture::open_pipewire(node) ; encode::open(codec, bitrate)
// 4. build a lumen_core::Session (host role) over a UDP transport to the client
// 5. loop pipeline::pump_once(..) until disconnect, then destroy the output
let target_mode = Mode {
width: 2560,
height: 1440,
refresh_hz: 240,
};
let compositor = Compositor::Kwin; // MVP target
if cfg!(target_os = "linux") {
tracing::info!(
?compositor,
?target_mode,
"would create a virtual output and start streaming (backends pending M0/M2)"
);
} else {
tracing::warn!(
"this is a Linux host; on {} only the shared lumen_core builds and is testable",
std::env::consts::OS
);
}
}
+39
View File
@@ -0,0 +1,39 @@
//! The host hot path (plan §7), wiring the platform stages to `lumen_core`:
//!
//! ```text
//! capture(dmabuf) → encode(NVENC/VAAPI) → core[FEC+packetize+pace+send]
//! ```
//!
//! Each stage runs on its own native OS thread, connected by bounded SPSC channels with
//! drop-oldest on overflow so the encoder is never blocked. No async runtime here.
use crate::capture::Capturer;
use crate::encode::{EncodedFrame, Encoder};
use anyhow::Result;
use lumen_core::packet::{FLAG_PIC, FLAG_SOF};
use lumen_core::Session;
/// Drive one capture→encode→submit step. The real pipeline spawns threads and uses
/// bounded channels; this documents the data flow and the `lumen_core` submit contract.
pub fn pump_once(
capturer: &mut dyn Capturer,
encoder: &mut dyn Encoder,
session: &mut Session,
) -> Result<()> {
let frame = capturer.next_frame()?;
encoder.submit(&frame)?;
while let Some(EncodedFrame {
data,
pts_ns,
keyframe,
}) = encoder.poll()?
{
let mut flags = FLAG_PIC as u32;
if keyframe {
flags |= FLAG_SOF as u32;
}
// core does FEC + packetize + pace + send.
session.submit_frame(&data, pts_ns, flags)?;
}
Ok(())
}
+96
View File
@@ -0,0 +1,96 @@
//! Virtual display orchestration (plan §6) — the project's differentiator.
//!
//! A [`VirtualDisplay`] creates a client-sized output on demand, to be captured and
//! streamed, then torn down on disconnect. Two deployment models exist (Model A: attach
//! to the running session; Model B: dedicated headless session); both sit behind this
//! trait so compositors are pluggable and a stuck one never blocks the project.
//!
//! Backends are `#[cfg(target_os = "linux")]` and currently stubs (see the per-backend
//! modules). The MVP target is KWin; a wlroots spike validates the pipeline first.
use anyhow::Result;
pub use lumen_core::Mode;
/// Opaque handle to a created virtual output, returned by [`VirtualDisplay::create`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct OutputHandle(pub u64);
/// Pluggable virtual-output creation, per compositor.
pub trait VirtualDisplay: Send {
/// Human-readable backend name (e.g. `"kwin"`, `"wlroots"`, `"mutter"`).
fn name(&self) -> &'static str;
/// Create a virtual output of the given mode.
fn create(&mut self, mode: Mode) -> Result<OutputHandle>;
/// Destroy a previously created output.
fn destroy(&mut self, handle: OutputHandle) -> Result<()>;
}
/// Compositors lumen knows how to drive (plan §6).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Compositor {
/// KWin / Plasma 6 — MVP target (matches the CachyOS/KDE daily driver).
Kwin,
/// wlroots (Sway/Hyprland) — fastest to prototype the pipeline.
Wlroots,
/// Mutter / GNOME — headless backend + Mutter DBus.
Mutter,
}
/// Detect or select a backend and return its driver.
pub fn open(compositor: Compositor) -> Result<Box<dyn VirtualDisplay>> {
#[cfg(target_os = "linux")]
{
match compositor {
Compositor::Kwin => Ok(Box::new(linux::kwin::KwinDisplay::new()?)),
Compositor::Wlroots => Ok(Box::new(linux::wlroots::WlrootsDisplay::new()?)),
Compositor::Mutter => Ok(Box::new(linux::mutter::MutterDisplay::new()?)),
}
}
#[cfg(not(target_os = "linux"))]
{
let _ = compositor;
anyhow::bail!("virtual displays require Linux (Wayland compositor)")
}
}
#[cfg(target_os = "linux")]
mod linux {
//! Linux backends. TODO(M2): drive KWin via DBus (study KRdp's source for the
//! virtual-output path); wlroots via `create_output` on the headless backend;
//! Mutter via `org.gnome.Mutter.*`.
macro_rules! stub_backend {
($modname:ident, $ty:ident, $name:literal) => {
pub mod $modname {
use super::super::{Mode, OutputHandle, VirtualDisplay};
use anyhow::Result;
pub struct $ty;
impl $ty {
pub fn new() -> Result<Self> {
Ok($ty)
}
}
impl VirtualDisplay for $ty {
fn name(&self) -> &'static str {
$name
}
fn create(&mut self, _mode: Mode) -> Result<OutputHandle> {
anyhow::bail!(concat!(
$name,
" virtual-output creation not yet implemented"
))
}
fn destroy(&mut self, _handle: OutputHandle) -> Result<()> {
anyhow::bail!(concat!(
$name,
" virtual-output destroy not yet implemented"
))
}
}
}
};
}
stub_backend!(kwin, KwinDisplay, "kwin");
stub_backend!(wlroots, WlrootsDisplay, "wlroots");
stub_backend!(mutter, MutterDisplay, "mutter");
}
+24
View File
@@ -0,0 +1,24 @@
//! Web config / pairing API (plan §4) — control plane only. This is where `tokio`/`axum`
//! are permitted; the per-frame pipeline never touches them. Serves pairing, client
//! identity/permissions, and surfaces [`lumen_core::Stats`] for the measurement UI (M3).
use anyhow::Result;
/// Control-plane configuration server. Stub until the pairing/RTSP surface is scoped
/// (plan §12 action 4: confirm exactly which serverinfo/RTSP/pairing messages a current
/// Moonlight client needs for P1).
pub struct WebConfig {
pub bind: String,
}
impl WebConfig {
pub fn new(bind: impl Into<String>) -> Self {
WebConfig { bind: bind.into() }
}
/// Run the control-plane server. TODO(M2): axum + tokio; GameStream `serverinfo`,
/// pairing handshake, RTSP SETUP with the `lumen/1` capability flag for negotiation.
pub fn run(&self) -> Result<()> {
anyhow::bail!("web/pairing control plane not yet implemented (M2)")
}
}
+296
View File
@@ -0,0 +1,296 @@
# lumen — Implementation Plan
*A ground-up low-latency desktop streaming stack, built Linux-first, with a shared Rust protocol core and native clients per platform.*
> `lumen` is a placeholder codename — rename freely. It fits the lowercase house style (`unom`, `played`, `remplir`) and reads as "glass-to-glass light," which is the whole point.
---
## 0. The thesis (why this is worth building)
Two concrete gaps justify a new project rather than another fork:
1. **The 1 Gbps wall is a FEC design limit, not a bandwidth limit.** Moonlight/Sunshine protect each frame with ReedSolomon over GF(2⁸), which caps a block at 255 shards. At 5120×1440@240 that ceiling is hit around 1 Gbps. Switching the erasure code to **Leopard-RS over GF(2¹⁶)** (via the `reed-solomon-simd` crate) raises the per-block shard limit to 65,536 and runs in O(n log n) with SIMD. The wall disappears as a *consequence* of a better core, not as a hack.
2. **Linux software virtual displays are a real, unfilled gap.** The compositor-side capability now exists (Mutter headless virtual monitors since GNOME 40; wlroots headless outputs; KWin virtual outputs in Plasma 6), but no streaming host *drives* those APIs to create a client-sized output on demand, capture it via PipeWire, and route input back via libei. Apollo's virtual display is Windows-only. This is the immediate, shippable win.
**Strategic ordering:** ship the Linux virtual-display host speaking the *existing* Moonlight protocol first (every Moonlight/Artemis client works on day one, no client to write). Only then introduce the new GF(2¹⁶) transport as a negotiated protocol extension with our own clients. Value early, hard parts deferred until de-risked.
---
## 1. Scope & non-goals
**In scope (eventually):**
- Linux streaming host with on-demand software virtual displays (KWin first, then wlroots, then Mutter).
- A shared Rust protocol/transport/FEC core exposed over a stable C ABI.
- A modern transport that removes the 1 Gbps ceiling.
- Native clients: Rust (Linux), Swift (macOS/iOS), Kotlin (Android) — all linking the same core.
**Explicit non-goals (at least at first):**
- Windows *host* support (Sunshine/Apollo already do this well; no gap to fill).
- Internet/NAT-traversal relay infrastructure (LAN/VPN first; you already run Headscale/NetBird — lean on that).
- Reinventing encoders/decoders (bind to FFmpeg + vendor SDKs; never rewrite codecs).
- A bespoke compositor (drive existing ones; only consider a dedicated headless compositor as a *deployment mode*, see §6).
---
## 2. Architecture overview
```mermaid
flowchart TD
subgraph Host["Linux Host (Rust)"]
VD["Virtual display orchestrator<br/>(KWin / wlroots / Mutter)"]
CAP["Capture<br/>(PipeWire / dmabuf)"]
ENC["Encoder<br/>(VAAPI / NVENC via FFmpeg)"]
VD --> CAP --> ENC
ENC --> COREH
IN_H["Input injector<br/>(libei / uinput)"]
COREH["lumen-core (C ABI)<br/>protocol · FEC · pacing · crypto"]
COREH --> IN_H
end
COREH <-->|"UDP+FEC video / QUIC control+audio"| COREC
subgraph Client["Client (Rust / Swift / Kotlin)"]
COREC["lumen-core (same crate, C ABI)"]
DEC["Decoder<br/>(VideoToolbox / NVDEC / VAAPI)"]
PRES["Present + frame pacing"]
INP["Input capture"]
COREC --> DEC --> PRES
INP --> COREC
end
```
**The load-bearing decision:** `lumen-core` is one crate, compiled once, linked by every host and client through a C ABI. Protocol logic, FEC, packet pacing, jitter buffering, pairing, and crypto live there and exist exactly once. Platform code (capture, encode, decode, present, input, UI) lives outside the core and is written in whatever language suits the platform.
---
## 3. Protocol strategy (three phases)
| Phase | Protocol | Clients that work | Bitrate ceiling | Purpose |
|------|----------|-------------------|-----------------|---------|
| **P1** | GameStream-compatible (existing Moonlight wire format) | All existing Moonlight/Artemis clients | ~1 Gbps (legacy GF(2⁸) FEC) | Ship the Linux virtual-display win with zero client work |
| **P2** | `lumen/1` negotiated extension: GF(2¹⁶) FEC, multi-block framing, optional QUIC control | lumen clients only; falls back to P1 for others | Multi-Gbps | Break the wall; introduce native clients |
| **P3** | `lumen/1` as primary; GameStream kept as compat shim | lumen everywhere, Moonlight as fallback | Multi-Gbps | Full control of features (mic passthrough, per-client identity, HDR signalling) |
Negotiation: extend the `serverinfo`/RTSP `SETUP` handshake with a capability flag. Old clients never see the flag and get P1 behavior. This is how Apollo/Artemis diverge cleanly, and it keeps you compatible while you build.
---
## 4. Tech stack (settled)
**Language split:** Rust for the core and all non-Apple platform code; Swift only for the macOS/iOS client UI + VideoToolbox/Metal; Kotlin for Android UI + MediaCodec. The C ABI is the seam.
**Threading:** native OS threads for the video hot path. `tokio` is allowed *only* for the control plane (pairing, web config, QUIC control stream). The per-frame pipeline must never touch an async runtime.
### Core crate dependencies
| Concern | Crate | Notes |
|--------|-------|-------|
| FEC | `reed-solomon-simd` (v3+) | Leopard/GF(2¹⁶), SIMD, O(n log n) — the wall-breaker |
| QUIC (control/audio) | `quinn` | Datagram ext for audio; reliable streams for control |
| TLS / crypto | `rustls` + `ring` (or `aws-lc-rs`) | Pairing, session keys (AES-GCM to match GameStream in P1) |
| Serialization | `zerocopy` / `bytes` | Wire structs `#[repr(C)]`, zero-copy parse |
| C header gen | `cbindgen` | Generates `lumen_core.h` from the ABI module |
| Error/log | `tracing` | Structured; feature-gate off the hot path |
### Linux host dependencies
| Concern | Crate / API | Notes |
|--------|-------------|-------|
| Capture | `pipewire` (pipewire-rs) | ScreenCast portal stream → dmabuf |
| Portal / DBus | `ashpd` + `zbus` | xdg-desktop-portal: ScreenCast, RemoteDesktop |
| Encode | `ffmpeg-next` or `rsmpeg` | VAAPI / NVENC, dmabuf import (zero-copy) |
| Input inject | `reis` (libei) + `input-linux` (uinput fallback) | Wayland-native first, uinput as universal fallback |
| Virtual output | per-compositor (see §6) | KWin DBus / Sway `create_output` / Mutter DBus |
| Web config | `axum` + `tokio` + small Vite/React UI | You own this stack already |
### Apple client (P2+)
Swift + VideoToolbox (decode) + Metal (present) + SwiftUI. Imports `lumen_core.h` directly via a module map — no glue layer.
### Ruled out
- **Swift for the host/core:** no Linux Wayland/PipeWire/DRM/VAAPI ecosystem; ARC in hot loops. (Excellent *Apple-client* language, wrong for systems/Linux.)
- **Go:** GC disqualifies the hot path.
- **C++:** throws away the safety/concurrency wins that justified greenfield over forking.
- **Zig:** best-in-class C interop, but pre-1.0 with no Wayland/QUIC ecosystem — too much risk for a multi-month build. Revisit later if desired.
---
## 5. The C ABI boundary
Design it on day one; retrofitting an ABI is painful.
**Principles**
- Opaque handles only across the boundary: `LumenSession*`, never Rust types.
- All cross-boundary structs are `#[repr(C)]`; primitives + pointer/len pairs for buffers.
- Async events via registered C callbacks (`fn ptr` + `void* userdata`).
- Explicit, documented ownership: who frees what, when. Provide `lumen_*_free` for every allocation that crosses out.
- Versioned ABI: `uint32_t lumen_abi_version(void)` + a `LumenConfig` struct whose first field is its own size for forward-compat.
**Minimal surface (sketch)**
```c
// lifecycle
LumenSession* lumen_session_new(const LumenConfig* cfg);
void lumen_session_free(LumenSession*);
// host: feed an encoded access unit (the core does FEC + packetize + pace + send)
int lumen_host_submit_frame(LumenSession*, const uint8_t* data, size_t len,
uint64_t pts_ns, LumenFrameFlags flags);
// client: pull a reassembled, FEC-recovered access unit ready to decode
int lumen_client_poll_frame(LumenSession*, LumenFrame* out /*borrowed until next poll*/);
// input (both directions): client captures, host receives via callback
int lumen_send_input(LumenSession*, const LumenInputEvent*);
void lumen_set_input_callback(LumenSession*, LumenInputCb, void* user);
// stats for the frame-pacing/quality logic and the web UI
void lumen_get_stats(LumenSession*, LumenStats* out);
```
Keep it this small. Everything platform-specific (how you got the encoded bytes, how you decode them) stays on the platform side.
---
## 6. Virtual display orchestration
This is the differentiator and the most fragmented part. Two deployment models — support both eventually, pick one for the MVP.
**Model A — Attach to the running session.** Create a client-sized virtual output *inside the user's live desktop*, stream it, tear it down on disconnect. This is "add a monitor to my actual PC." Best UX, hardest because it depends on per-compositor runtime APIs.
**Model B — Dedicated headless session.** Spawn a separate headless compositor purely for the stream (e.g. `gnome-shell --headless --virtual-monitor WxH`, or a headless wlroots compositor). Cleaner isolation, sidesteps runtime-output APIs, ideal for "remote second PC." Worse for "mirror/extend my real desktop."
**Per-compositor (Model A) runtime virtual-output creation:**
- **KWin / Plasma 6 (recommended MVP target — matches your CachyOS/KDE daily driver and where the gap is loudest):** KWin can create virtual outputs; KRdp already does this internally for remote sessions. Drive it via the KWin DBus interface; capture via `xdg-desktop-portal-kde` ScreenCast (PipeWire); inject input via the RemoteDesktop portal or `reis`.
- **wlroots (Sway/Hyprland — fastest to *prototype* the pipeline):** enable the headless backend (`WLR_BACKENDS=…,headless`), then `swaymsg create_output` / `hyprctl output create headless`. Capture via `wlr-screencopy` or the portal. Simplest API; good for validating capture→encode→send before fighting KWin/Mutter.
- **Mutter / GNOME:** virtual monitors via the headless backend; runtime creation via Mutter DBus (`org.gnome.Mutter.*` — partly experimental). Capture via `xdg-desktop-portal-gnome` ScreenCast.
**Recommendation:** do a 12 day wlroots spike to prove the *pipeline*, then build the real MVP on KWin because that's your deployment target. Abstract virtual-output creation behind a trait so compositors are pluggable:
```rust
trait VirtualDisplay {
fn create(&self, mode: Mode) -> Result<OutputHandle>;
fn destroy(&self, h: OutputHandle) -> Result<()>;
}
```
---
## 7. The hot path: pipeline & latency budget
Per-frame pipeline, each stage on its own thread, connected by bounded SPSC channels (drop-oldest on overflow, never block the encoder):
```
capture(dmabuf) → encode(NVENC/VAAPI) → core[FEC+packetize+pace+send]
│ network
client: recv → core[reorder+FEC recover+jitter] → decode → present
```
**Glass-to-glass budget (LAN, 240 Hz = 4.17 ms/frame):**
| Stage | Target | Notes |
|------|--------|-------|
| Capture latency | ≤ 1 frame | dmabuf, no copy to CPU |
| Encode | 14 ms | NVENC low-latency preset; tune lookahead off |
| FEC + packetize | < 1 ms | SIMD RS; pre-allocated shard buffers |
| Network (LAN) | < 1 ms | `sendmmsg` / UDP GSO to cut syscalls |
| Jitter buffer | 01 frame | adaptive; minimum that hides observed jitter |
| FEC recover + reassemble | < 1 ms | only when loss occurs |
| Decode | 14 ms | hardware decoder |
| Present | ≤ 1 frame | align to client vsync |
**Target: 1535 ms glass-to-glass on LAN.** The art is *frame pacing* — matching capture/encode cadence to the client's actual refresh and keeping the jitter buffer as small as the link allows. This, not the codec, is what separates good from bad streaming. Budget real time for it.
**Throughput math to keep honest:** 5120×1440@240 ≈ 1.77 Gpx/s. At 0.5 bpp that's ~885 Mbps; 0.6 bpp ≈ 1.06 Gbps; 0.8 bpp (4:4:4 headroom) ≈ 1.4 Gbps. The GF(2¹⁶) FEC + multi-block framing must sustain these without the per-frame shard count being the limiter — which it no longer is once you leave GF(2⁸).
---
## 8. Milestones
Sizing is rough and relative (Spike / S / M / L) for a focused solo dev; treat as ordering, not deadlines.
**M0 — Pipeline spike (S).** wlroots headless output → PipeWire capture → VAAPI/NVENC encode → dump H.265 to a file that plays. *Acceptance:* a valid encoded file from a virtual output, no streaming yet. Proves the Linux capture+encode chain end-to-end.
**M1 — `lumen-core` skeleton + C ABI (M).** Session lifecycle, GameStream-compatible packetization and GF(2⁸) FEC (P1), AES-GCM, `cbindgen` header, a tiny C test harness. *Acceptance:* core links from C; round-trips packets in a loopback test with simulated loss.
**M2 — P1 host: stream to stock Moonlight (L).** Wire M0's pipeline into the core; implement `serverinfo`/pairing/RTSP enough for a real Moonlight client to connect, with a KWin virtual output created on connect and destroyed on disconnect. Input via `reis`/uinput. *Acceptance:* **you play a game on your KDE box streamed to a stock Moonlight client on a virtual display, no dummy plug, no kernel args.** This is the shippable milestone and the project's reason to exist.
**M3 — Measurement harness (S).** Glass-to-glass latency measurement (on-screen QR/timestamp or photodiode), packet-loss injection, frame-pacing and stall metrics surfaced in the web UI. *Acceptance:* you can quantify a regression. Build this before optimizing anything.
**M4 — P2 transport: break the wall (L).** Add `lumen/1` negotiation; swap to `reed-solomon-simd` GF(2¹⁶) with multi-block per-frame framing; optional QUIC control/audio. Write a minimal **Rust** reference client (decode via VAAPI, present via wgpu/Vulkan) to exercise it. *Acceptance:* a stable stream above 1.4 Gbps at 5120×1440@240 with loss recovery working; latency unchanged vs. M2.
**M5 — Apple client (L).** Swift + VideoToolbox + Metal + SwiftUI, linking `lumen-core` via the C header. *Acceptance:* the Mac Studio plays a stream at native resolution/refresh.
**M6 — Feature surface (M, ongoing).** Mic passthrough as a proper encrypted, per-client reverse audio stream (the thing the upstream PR got wrong); HDR signalling; per-client identity/permissions; pause/resume. *Acceptance:* feature parity with Apollo on the items you care about, plus mic done right.
---
## 9. Risk register
| Risk | Likelihood | Impact | Mitigation |
|------|-----------|--------|------------|
| KWin runtime virtual-output API is undocumented/unstable | High | High | Spike on wlroots first to de-risk the pipeline; study KRdp's source for the KWin path; keep `VirtualDisplay` pluggable so a stuck compositor doesn't block the project |
| Wayland input injection gaps (libei still evolving) | Med | Med | uinput fallback always available; `reis` for the Wayland-native path |
| dmabuf → encoder zero-copy import quirks per GPU/driver | High | Med | Validate on your actual NVIDIA + AMD hardware early (M0); have a CPU-copy fallback path |
| Encoder/decoder can't sustain 1.77 Gpx/s @ 240 | Med | High | Measure in M0/M4 on real silicon; this is a hardware ceiling no rewrite fixes — discover it before P2, not after |
| Frame pacing eats more time than expected | High | Med | M3 measurement harness first; treat pacing as a first-class subsystem, not a polish step |
| Scope creep into a full Moonlight replacement | High | High | P1 (stock-client compat) is the firewall: it forces you to ship value before writing a client |
| Solo bandwidth vs. your other projects (ENRW thesis, played) | High | Med | M2 is a complete, useful artifact on its own; the plan is safe to pause after any milestone |
---
## 10. Testing & measurement
- **Loopback correctness:** core encodes→FEC→loss-inject→recover→decode in-process; property tests over loss patterns and shard counts (proptest).
- **Glass-to-glass latency:** rendered timestamp/QR on host, read back on client capture; or a photodiode for true photons. Track p50/p99.
- **Loss resilience:** `tc netem` to inject loss/jitter/reorder; verify FEC recovery and graceful degradation.
- **Pacing:** log present timestamps vs. client vsync; alert on stalls and duplicate/dropped frames.
- **Soak:** multi-hour streams; watch for buffer growth, fd leaks, encoder session exhaustion.
- **Hardware matrix:** your NVIDIA box (NVENC), an AMD/Intel box (VAAPI), Mac Studio (VideoToolbox decode). Catch driver quirks early.
---
## 11. Repo / workspace structure
```
lumen/
├── Cargo.toml # workspace
├── crates/
│ ├── lumen-core/ # protocol, FEC, pacing, crypto — C ABI (cdylib + staticlib)
│ │ ├── src/abi.rs # #[no_mangle] extern "C" surface
│ │ ├── src/fec.rs # GF(2^16) blocking over reed-solomon-simd
│ │ ├── src/transport/ # udp+fec video, quinn control/audio
│ │ ├── src/protocol/ # gamestream-compat (P1) + lumen/1 (P2)
│ │ └── cbindgen.toml
│ ├── lumen-host/ # Linux host binary
│ │ ├── src/capture/ # pipewire / portal
│ │ ├── src/encode/ # ffmpeg vaapi/nvenc
│ │ ├── src/vdisplay/ # trait + kwin/wlroots/mutter impls
│ │ ├── src/input/ # reis + uinput
│ │ └── src/web/ # axum config/pairing API
│ └── lumen-client-rs/ # reference Rust client (M4)
├── clients/
│ ├── apple/ # Swift package, imports lumen_core.h (M5)
│ └── android/ # Kotlin + JNI (later)
├── include/ # generated lumen_core.h
└── tools/
├── latency-probe/
└── loss-harness/
```
---
## 12. Immediate next actions (first week)
1. **Stand up the workspace** with `lumen-core` (empty ABI + `cbindgen`) and `lumen-host` skeletons; CI on your Gitea (you already have BuildKit pipelines).
2. **M0 spike on wlroots:** headless output → PipeWire capture → NVENC/VAAPI encode → playable file. This validates the riskiest *pipeline* assumptions in days, on your real GPU.
3. **Read KRdp's source** for how KDE creates virtual outputs and casts them — it's the closest existing reference for the KWin path you'll need in M2.
4. **Decide P1 protocol depth:** confirm exactly which `serverinfo`/RTSP/pairing messages a current Moonlight client requires for a successful connect, so M2's compat surface is scoped precisely (this is also the question to take back to the dev who mentioned the 1G limit).
---
*The shape of the bet: M2 alone — virtual-display streaming to stock Moonlight clients on Linux — is a complete, useful, gap-filling release. Everything after it (the wall-breaking transport, native clients, mic-done-right) is upside you unlock from a position of having already shipped, with the hard transport work resting on a FEC core that makes the 1 Gbps ceiling a thing of the past rather than a thing to hack around.*
+237
View File
@@ -0,0 +1,237 @@
/* lumen-core C ABI — see crates/lumen-core/src/abi.rs */
#ifndef LUMEN_CORE_H
#define LUMEN_CORE_H
#pragma once
/* Generated by cbindgen from lumen-core. Do not edit by hand. */
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
// Bump on any breaking change to the [C ABI](crate::abi). Mirrors
// `lumen_abi_version()` and is checked by clients before use.
#define ABI_VERSION 1
// 16-byte AEAD authentication tag appended by GCM.
#define TAG_LEN 16
// Wire tag distinguishing an input datagram from a video packet.
#define INPUT_MAGIC 200
// Fixed serialized size of an [`InputEvent`] on the wire (tag + fields).
#define INPUT_WIRE_LEN (((((1 + 1) + 4) + 4) + 4) + 4)
// Identifies a lumen video packet (vs. an input datagram, see [`crate::input`]).
#define LUMEN_MAGIC 201
#define FLAG_PIC 1
#define FLAG_EOF 2
#define FLAG_SOF 4
// Largest UDP datagram the core will send or accept. `Config::validate` bounds
// `shard_payload` so `HEADER_LEN + shard_payload + CRYPTO_OVERHEAD ≤ MAX_DATAGRAM_BYTES`.
#define MAX_DATAGRAM_BYTES 2048
// Stable C ABI status codes. `Ok` is 0; all errors are negative so callers can
// test `rc < 0`. Do not renumber existing variants — only append.
enum LumenStatus
#if defined(__cplusplus) || __STDC_VERSION__ >= 202311L
: int32_t
#endif // defined(__cplusplus) || __STDC_VERSION__ >= 202311L
{
LUMEN_STATUS_OK = 0,
LUMEN_STATUS_INVALID_ARG = -1,
LUMEN_STATUS_FEC = -2,
LUMEN_STATUS_CRYPTO = -3,
LUMEN_STATUS_BAD_PACKET = -4,
LUMEN_STATUS_NO_FRAME = -5,
LUMEN_STATUS_UNSUPPORTED = -6,
LUMEN_STATUS_IO = -7,
LUMEN_STATUS_NULL_POINTER = -8,
LUMEN_STATUS_PANIC = -99,
};
#ifndef __cplusplus
#if __STDC_VERSION__ >= 202311L
typedef enum LumenStatus LumenStatus;
#else
typedef int32_t LumenStatus;
#endif // __STDC_VERSION__ >= 202311L
#endif // __cplusplus
// Kinds of input event. `#[repr(u8)]` so it crosses the C ABI as a byte tag.
enum LumenInputKind
#if defined(__cplusplus) || __STDC_VERSION__ >= 202311L
: uint8_t
#endif // defined(__cplusplus) || __STDC_VERSION__ >= 202311L
{
LUMEN_INPUT_KIND_KEY_DOWN = 0,
LUMEN_INPUT_KIND_KEY_UP = 1,
// Relative motion: `x`/`y` carry `dx`/`dy`.
LUMEN_INPUT_KIND_MOUSE_MOVE = 2,
// Absolute motion: `x`/`y` carry pixel coordinates.
LUMEN_INPUT_KIND_MOUSE_MOVE_ABS = 3,
LUMEN_INPUT_KIND_MOUSE_BUTTON_DOWN = 4,
LUMEN_INPUT_KIND_MOUSE_BUTTON_UP = 5,
// `x` carries the (signed) scroll delta.
LUMEN_INPUT_KIND_MOUSE_SCROLL = 6,
LUMEN_INPUT_KIND_GAMEPAD_BUTTON = 7,
// `code` = axis id, `x` = axis value.
LUMEN_INPUT_KIND_GAMEPAD_AXIS = 8,
};
#ifndef __cplusplus
#if __STDC_VERSION__ >= 202311L
typedef enum LumenInputKind LumenInputKind;
#else
typedef uint8_t LumenInputKind;
#endif // __STDC_VERSION__ >= 202311L
#endif // __cplusplus
// Opaque session handle. Pointer-only from C.
typedef struct LumenSession LumenSession;
// Forward-compatible session configuration. The caller MUST set `struct_size` to
// `sizeof(LumenConfig)`; the core uses it to detect ABI skew.
typedef struct {
uint32_t struct_size;
// 0 = host, 1 = client.
uint32_t role;
// 1 = P1 (GameStream-compatible), 2 = P2 (`lumen/1`).
uint32_t phase;
// 0 = GF(2⁸), 1 = GF(2¹⁶).
uint32_t fec_scheme;
uint32_t fec_percent;
uint32_t max_data_per_block;
uint32_t shard_payload;
// Non-zero enables AES-128-GCM.
uint32_t encrypt;
uint8_t key[16];
uint8_t salt[4];
// Test hook for the loopback transport; 0 in production.
uint32_t loopback_drop_period;
// Largest encoded access unit the receiver will accept (bounds reassembler memory).
uint64_t max_frame_bytes;
} LumenConfig;
// A reassembled access unit. `data`/`len` borrow session-owned memory valid until the
// next `lumen_client_poll_frame`/`lumen_session_free` on the same session.
typedef struct {
const uint8_t *data;
uintptr_t len;
uint32_t frame_index;
uint64_t pts_ns;
uint32_t flags;
} LumenFrame;
// A single input event. `#[repr(C)]` — shared verbatim with the C ABI as
// `LumenInputEvent`.
typedef struct {
LumenInputKind kind;
uint8_t _pad[3];
// keycode / button id / axis id, depending on `kind`.
uint32_t code;
// x / dx / abs-x / axis-value / scroll-delta, depending on `kind`.
int32_t x;
// y / dy / abs-y, depending on `kind`.
int32_t y;
// modifier bitmask or gamepad index.
uint32_t flags;
} LumenInputEvent;
// Snapshot of session counters.
typedef struct {
uint64_t frames_submitted;
uint64_t frames_completed;
uint64_t frames_dropped;
uint64_t packets_sent;
uint64_t packets_received;
uint64_t packets_dropped;
uint64_t fec_recovered_shards;
uint64_t bytes_sent;
uint64_t bytes_received;
} LumenStats;
#ifdef __cplusplus
extern "C" {
#endif // __cplusplus
// Current ABI version. Mismatch with [`crate::ABI_VERSION`] means incompatible core.
uint32_t lumen_abi_version(void);
// Create a session over a real UDP transport (`local`/`peer` are `host:port` strings).
// Returns NULL on error.
//
// # Safety
// `cfg`, `local`, `peer` must be valid pointers; the strings must be NUL-terminated.
LumenSession *lumen_session_new(const LumenConfig *cfg, const char *local, const char *peer);
// Create a connected host+client session pair sharing an in-process loopback
// transport. Test/dev only — exercises the full FEC + framing path without a network.
//
// # Safety
// All four pointers must be valid; the two out-params receive owned handles.
LumenStatus lumen_test_loopback_pair(const LumenConfig *host_cfg,
const LumenConfig *client_cfg,
LumenSession **out_host,
LumenSession **out_client);
// Free a session handle. Safe to call with NULL.
//
// # Safety
// `s` must be a handle from `lumen_session_new`/`lumen_test_loopback_pair`, freed once.
void lumen_session_free(LumenSession *s);
// Host: FEC-protect, packetize, seal and send one encoded access unit.
//
// # Safety
// `s` is a valid host handle; `data` points to `len` readable bytes (or `len == 0`).
LumenStatus lumen_host_submit_frame(LumenSession *s,
const uint8_t *data,
uintptr_t len,
uint64_t pts_ns,
uint32_t flags);
// Client: poll for the next reassembled access unit. Returns [`LumenStatus::NoFrame`]
// when nothing is ready yet. On `Ok`, `*out` borrows session memory until the next poll.
//
// # Safety
// `s` is a valid client handle; `out` points to a writable `LumenFrame`.
LumenStatus lumen_client_poll_frame(LumenSession *s, LumenFrame *out);
// Client: serialize and send one input event to the host.
//
// # Safety
// `s` is a valid client handle; `ev` points to a valid [`InputEvent`].
LumenStatus lumen_send_input(LumenSession *s, const LumenInputEvent *ev);
// Register the host-side input callback (pass a NULL fn pointer to clear). The callback
// fires from within [`lumen_host_poll_input`], on the calling thread.
//
// # Safety
// `s` is a valid host handle; `user` is passed back verbatim to `cb`.
LumenStatus lumen_set_input_callback(LumenSession *s, void (*cb)(const LumenInputEvent *event,
void *user), void *user);
// Host: drain all pending input events, invoking the registered callback for each.
// Returns the count dispatched (≥ 0), or a negative [`LumenStatus`] on error.
//
// # Safety
// `s` is a valid host handle.
int32_t lumen_host_poll_input(LumenSession *s);
// Copy session counters into `*out`.
//
// # Safety
// `s` is a valid handle; `out` points to a writable `LumenStats`.
LumenStatus lumen_get_stats(LumenSession *s, LumenStats *out);
#ifdef __cplusplus
} // extern "C"
#endif // __cplusplus
#endif /* LUMEN_CORE_H */
+3
View File
@@ -0,0 +1,3 @@
[toolchain]
channel = "stable"
components = ["rustfmt", "clippy"]
+11
View File
@@ -0,0 +1,11 @@
[package]
name = "latency-probe"
description = "Glass-to-glass latency measurement (plan §10, M3)"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
authors.workspace = true
repository.workspace = true
[dependencies]
+11
View File
@@ -0,0 +1,11 @@
//! `latency-probe` — glass-to-glass latency measurement (plan §10, M3).
//!
//! Renders a timestamp/QR on the host, reads it back on the client capture (or a
//! photodiode for true photons), and tracks p50/p99. Build this before optimizing
//! anything, so regressions are quantifiable.
//!
//! Status: scaffold.
fn main() {
println!("latency-probe: scaffold (M3 — render timestamp/QR on host, read back on client; track p50/p99)");
}
+12
View File
@@ -0,0 +1,12 @@
[package]
name = "loss-harness"
description = "FEC loss-resilience sweep over the lumen-core loopback (plan §10)"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
authors.workspace = true
repository.workspace = true
[dependencies]
lumen-core = { path = "../../crates/lumen-core" }
+81
View File
@@ -0,0 +1,81 @@
//! `loss-harness` — sweep packet loss against the FEC and report recovery (plan §10).
//!
//! Drives access units through the in-process loopback at increasing loss rates, for
//! both FEC schemes, and prints how many frames survive. A pure-software stand-in for
//! `tc netem` that needs no network and runs anywhere `lumen_core` builds. The real M3
//! harness adds `tc netem` jitter/reorder on the UDP path.
use lumen_core::config::{Config, FecConfig, FecScheme, ProtocolPhase, Role};
use lumen_core::error::LumenError;
use lumen_core::session::Session;
use lumen_core::transport::loopback_pair;
fn config(role: Role, scheme: FecScheme, drop_period: u32) -> Config {
Config {
role,
phase: match scheme {
FecScheme::Gf8 => ProtocolPhase::P1GameStream,
FecScheme::Gf16 => ProtocolPhase::P2Lumen,
},
fec: FecConfig {
scheme,
fec_percent: 25,
max_data_per_block: 64,
},
shard_payload: 1024,
max_frame_bytes: 8 * 1024 * 1024,
encrypt: false,
key: [0u8; 16],
salt: [0u8; 4],
loopback_drop_period: drop_period,
}
}
/// Returns (frames_completed, frames_attempted) for a loss setting.
fn run(scheme: FecScheme, drop_period: u32, frames: usize, frame_len: usize) -> (usize, usize) {
let (h, c) = loopback_pair(drop_period, 0);
let mut host = Session::new(config(Role::Host, scheme, drop_period), Box::new(h)).unwrap();
let mut client = Session::new(config(Role::Client, scheme, drop_period), Box::new(c)).unwrap();
let mut completed = 0;
for f in 0..frames {
let frame: Vec<u8> = (0..frame_len).map(|b| (b ^ f) as u8).collect();
host.submit_frame(&frame, f as u64, 0).unwrap();
match client.poll_frame() {
Ok(got) => {
if got.data == frame {
completed += 1;
}
}
Err(LumenError::NoFrame) => {} // unrecoverable at this loss rate
Err(e) => panic!("unexpected error: {e}"),
}
}
(completed, frames)
}
fn main() {
let frames = 50;
let frame_len = 100_000; // ~98 shards across 2 FEC blocks
let periods = [0u32, 32, 16, 8, 6, 4, 3, 2];
println!("lumen loss-harness — 25% FEC, {frames} frames of {frame_len} bytes");
println!("(GF8 = P1/GameStream-compat, GF16 = P2/wall-breaker)\n");
println!(
"{:>10} {:>9} {:>14} {:>14}",
"drop 1/N", "~loss %", "GF8 recovered", "GF16 recovered"
);
println!("{}", "-".repeat(56));
for &p in &periods {
let loss = if p == 0 { 0.0 } else { 100.0 / p as f64 };
let (g8, n) = run(FecScheme::Gf8, p, frames, frame_len);
let (g16, _) = run(FecScheme::Gf16, p, frames, frame_len);
let label = if p == 0 {
"none".to_string()
} else {
format!("1/{p}")
};
println!("{label:>10} {loss:>8.1}% {:>11}/{n} {:>11}/{n}", g8, g16);
}
println!("\nNote: recovery drops off once per-block loss exceeds the 25% recovery budget.");
}