diff --git a/CLAUDE.md b/CLAUDE.md index 55732dc..800e3b3 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -26,13 +26,21 @@ Low-latency desktop/game streaming stack, Linux-first, with a shared Rust protoc socket, wlr protocols on Sway) and **gamepads** (uinput X-Box-360 pads + rumble back-channel; live validation pending the udev rule below). Management REST API + checked-in OpenAPI doc (`mgmt.rs`). -- **M3 (`lumen/1`, the native protocol): seeded and validated.** QUIC control plane - (`lumen-core` `quic` feature: Hello{mode}/Welcome{full Config}/Start), data plane = the - hardened M1 `Session` over raw UDP with **GF(2¹⁶) Leopard FEC + AES-GCM** (inexpressible - in GameStream), input over **QUIC datagrams**, host creates the native virtual output at - the client's requested mode. Measured on-box at 720p120: 1680/1680 frames, **p50 0.83 ms** - capture→encode→FEC→crypto→UDP→reassembled. `lumen-client-rs` is a working (headless) - reference client. Trust is seed-stage (self-signed / accept-any). +- **M3 (`lumen/1`, the native protocol): full session planes, validated live.** QUIC + control plane (`lumen-core` `quic` feature: Hello{mode}/Welcome{full Config}/Start), data + plane = the hardened M1 `Session` over raw UDP with **GF(2¹⁶) Leopard FEC + AES-GCM** + (inexpressible in GameStream), host creates the native virtual output at the client's + requested mode. `m3-host` is a **persistent listener** (sessions back to back; + `--max-sessions`). QUIC datagrams carry the side planes, demuxed by first byte: input + 0xC8 (incl. **gamepads** — incremental events accumulated into the uinput xpad), **Opus + audio** 0xC9 (48 kHz stereo, 5 ms, host→client), **rumble** 0xCA (host→client). **Trust:** + host serves its persistent identity (`~/.config/lumen/cert.pem`, shared with GameStream + pairing) and logs the SHA-256 fingerprint; clients pin it (TOFU on first connect — + `endpoint::client_pinned`). Measured on-box at 720p120: 1680/1680 frames, **p50 0.83 ms** + capture→…→reassembled; audio measured live (~200 pkts/s). `lumen-client-rs` is the + working reference client (`--pin`, datagram counters, `--input-test` incl. gamepad). + The embeddable connector (`NativeClient`) exposes it all over the C ABI: `lumen_connect` + (pin/TOFU) + `next_au`/`next_audio`/`next_rumble`/`send_input`. ## What's left @@ -45,10 +53,12 @@ Low-latency desktop/game streaming stack, Linux-first, with a shared Rust protoc 2. **Sub-frame pipelining**: overlap encode and transmit within a frame. Requires a direct NVENC SDK wrapper (libavcodec only emits whole AUs) — the next big latency lever (~2–4 ms at high res). -3. **lumen/1 trust model**: pairing + certificate pinning to replace accept-any. +3. **lumen/1 protocol growth**: a PIN-style pairing ceremony on top of fingerprint pinning, + mid-stream mode renegotiation (the Welcome is one-shot today), concurrent sessions + (today: one at a time, extras wait in the accept queue). 4. **M2 polish**: wlroots/Sway `VirtualDisplay` backend (deferred; swaymsg `create_output`), - GNOME live validation, gamepad live validation, HDR/10-bit/AV1 negotiation, surround - audio, reconnect-at-new-mode robustness. + GNOME live validation, gamepad live validation (blocked on the udev rule below), + HDR/10-bit/AV1 negotiation, surround audio, reconnect-at-new-mode robustness. 5. **Native clients** (`clients/{apple,android}` scaffolds) consuming `lumen_core.h`. 6. **This box, one-time setup still pending**: `sudo cp scripts/60-lumen.rules /etc/udev/rules.d/` + user into `input` group (gamepads); `sudo ninja -C @@ -59,7 +69,7 @@ Low-latency desktop/game streaming stack, Linux-first, with a shared Rust protoc ```sh cargo build --workspace # green on Linux and macOS -cargo test --workspace # unit + loopback + proptest + C ABI harness (~92 tests) +cargo test --workspace # unit + loopback + proptest + C ABI harness (~97 tests) cargo clippy --workspace --all-targets -- -D warnings cargo fmt --all --check @@ -123,9 +133,10 @@ kwin_wayland --virtual --width 1920 --height 1080 --no-lockscreen --socket wayla WAYLAND_DISPLAY=wayland-kde XDG_CURRENT_DESKTOP=KDE LUMEN_VIDEO_SOURCE=virtual \ LUMEN_ZEROCOPY=1 PATH=/tmp/gamescope-src/build/src:$PATH cargo run -rp lumen-host -- serve -# lumen/1 native loopback test (no Moonlight needed): -cargo run -rp lumen-host -- m3-host --source virtual --seconds 10 # + LUMEN_COMPOSITOR=gamescope etc. -cargo run -rp lumen-client-rs -- --mode 1280x720x120 --out /tmp/a.h265 --input-test +# lumen/1 native loopback test (no Moonlight needed; same env as serve, listener persists +# across sessions — bound it with --max-sessions): +cargo run -rp lumen-host -- m3-host --source virtual --seconds 10 --max-sessions 1 +cargo run -rp lumen-client-rs -- --mode 1280x720x120 --out /tmp/a.h265 --input-test # + --pin HEX ``` Pinned crate facts: `ashpd` 0.13 + `pipewire` 0.9 (must match ashpd's) + `ffmpeg-next` 8.x diff --git a/Cargo.lock b/Cargo.lock index 0c0c242..fde1b66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1486,6 +1486,8 @@ dependencies = [ "rcgen", "reed-solomon-simd", "rustls", + "rustls-pki-types", + "sha2", "thiserror 2.0.18", "tokio", "tracing", diff --git a/clients/apple/README.md b/clients/apple/README.md index a60a41e..77eb995 100644 --- a/clients/apple/README.md +++ b/clients/apple/README.md @@ -8,15 +8,20 @@ is the Swift shell: decode (VideoToolbox), present (SwiftUI), input capture. ## What exists (built + tested on the Linux host) - **The connector**: `lumen_core::client::NativeClient` (Rust) exposed over the C ABI as - `lumen_connect` / `lumen_connection_next_au` / `lumen_connection_send_input` / - `lumen_connection_mode` / `lumen_connection_close` (see `include/lumen_core.h`, guarded - by `LUMEN_FEATURE_QUIC`). **End-to-end tested through the C ABI** against an in-process - host (`crates/lumen-host/src/m3.rs::tests::c_abi_connection_roundtrip`). + `lumen_connect` / `lumen_connection_next_au` / `lumen_connection_next_audio` / + `lumen_connection_next_rumble` / `lumen_connection_send_input` / `lumen_connection_mode` + / `lumen_connection_close` (see `include/lumen_core.h`, guarded by `LUMEN_FEATURE_QUIC`). + **End-to-end tested through the C ABI** against an in-process host + (`crates/lumen-host/src/m3.rs::tests::c_abi_connection_roundtrip` — three sequential + sessions: TOFU, pinned reconnect, wrong-pin rejection). - **The host to test against**: `lumen-host m3-host --source virtual --seconds 60` on the - Linux box (it creates a native virtual output at whatever mode the client requests and - streams HEVC; `LUMEN_COMPOSITOR=gamescope LUMEN_GAMESCOPE_APP=vkcube` for moving content). + Linux box — a **persistent listener** (sessions back to back, reconnect at will during + development; `--max-sessions N` to bound it). It creates a native virtual output at + whatever mode the client requests and streams HEVC + desktop **Opus audio**; + `LUMEN_COMPOSITOR=gamescope LUMEN_GAMESCOPE_APP=vkcube` for moving content. - **This package (SCAFFOLD — written on Linux, never compiled in Xcode)**: - - `LumenConnection.swift` — Swift wrapper over the C ABI (AUs copied into `Data`). + - `LumenConnection.swift` — Swift wrapper over the C ABI (AUs/audio copied into `Data`; + certificate pinning + TOFU fingerprint via `pinSHA256:`/`hostFingerprint`). - `AnnexB.swift` — in-band VPS/SPS/PPS → `CMVideoFormatDescription`; Annex-B → AVCC `CMSampleBuffer` with `DisplayImmediately` set. - `StreamView.swift` — SwiftUI `NSViewRepresentable` over `AVSampleBufferDisplayLayer` @@ -62,8 +67,9 @@ struct ContentView: View { zero-init, `_pad` tuple shape on `LumenInputEvent`. 2. **ABI contract** (matches `lumen_core.h` docs): `next_au`'s pointer is valid only until the *next* call on that handle (we copy to `Data` immediately); one pump thread per - connection; `send_input` is enqueue-only and thread-safe alongside it; `close` joins the - Rust threads — never call it with a `next_au` call in flight. + connection, plus optionally one *separate* audio thread for `next_audio` (independent + borrow slots); `send_input` is enqueue-only and thread-safe alongside both; `close` + joins the Rust threads — never call it with a `next_au`/`next_audio` call in flight. 3. **Decode flow**: the host opens every stream with an IDR carrying VPS/SPS/PPS in-band, and recovery keyframes re-send them — so "wait for the first format description, refresh it on every IDR" (already what `StreamView` does) is sufficient; there is no out-of-band @@ -79,21 +85,27 @@ struct ContentView: View { and add glass-to-glass measurement (`tools/latency-probe` is the scaffold; the host already stamps `pts_ns` with its capture wall clock — across machines you'll need a clock-offset estimate from the QUIC RTT, or the probe's visual timestamp loop). -6. **Gamepads**: `GCController` → `GamepadButton`/`GamepadAxis` `LumenInputEvent`s. The - host does NOT yet route those kinds in `m3.rs`'s injector path (mouse/keys work; the - gamepad kinds need a `GamepadManager` hookup like the GameStream control stream has — - small host-side task). -7. **Trust model is seed-stage**: the client accepts any host certificate - (`endpoint::client_insecure`). Pairing + pinning is a planned lumen-core task; design it - alongside this client's "add host" UX. -8. **iOS**: same package (`BUILD_IOS=1` for the xcframework slice); `StreamView` needs the +6. **Audio**: `nextAudio()` yields raw Opus packets (48 kHz stereo, one 5 ms frame each, + sequence-numbered). Decode with libopus or `AVAudioConverter`/`kAudioFormatOpus` into an + `AVAudioEngine` source node; conceal gaps (drop/dup) rather than blocking — the Rust + side buffers 320 ms and drops the newest packet when the puller lags. Wall-clock + `ptsNs` shares the host clock with video AUs for A/V sync. +7. **Gamepads**: `GCController` → `.gamepadButton(...)`/`.gamepadAxis(...)` events (wire + contract documented on the constructors; the host accumulates them into a virtual + Xbox 360 pad). Poll `nextRumble()` and feed `GCDeviceHaptics` for force feedback. +8. **Trust**: connect once with `pinSHA256: nil` (TOFU), persist `hostFingerprint` keyed + by host, pass it on every later connect — a mismatch throws `.connectFailed`. The host + logs its fingerprint at startup ("clients pin this fingerprint") for out-of-band + verification UX; a PIN-style pairing ceremony is a later lumen-core task. +9. **iOS**: same package (`BUILD_IOS=1` for the xcframework slice); `StreamView` needs the `UIViewRepresentable` twin and touch→input mapping. ## Known limitations of the current host (relevant to client UX) -- `m3-host` serves **one session and exits** — fine for development; the persistent - lumen/1 listener (serve-style) is a small host-side task. -- No audio on lumen/1 yet (the GameStream path has it; porting the Opus stream onto a - second datagram flow is straightforward). +- One session **at a time** (the listener is persistent, but a second concurrent client + waits in the accept queue until the current session ends — the virtual output and + encoder are single-tenant). - Mid-stream renegotiation (resolution change without reconnect) is designed-for but not implemented (the Welcome is one-shot today). +- Host-side gamepad injection needs `/dev/uinput` access on the box (udev rule from + `docs/linux-setup.md`). diff --git a/clients/apple/Sources/LumenKit/LumenConnection.swift b/clients/apple/Sources/LumenKit/LumenConnection.swift index ea597b5..0f5cea4 100644 --- a/clients/apple/Sources/LumenKit/LumenConnection.swift +++ b/clients/apple/Sources/LumenKit/LumenConnection.swift @@ -1,9 +1,14 @@ // Swift wrapper around the lumen-core C ABI's lumen/1 connection API. // // Threading contract (mirrors the C header): one LumenConnection is used from a single -// pump thread for nextAU(); sendInput() is enqueue-only and safe alongside it. The pointer -// inside an AU is only valid until the next nextAU() call, so we copy into Data here — -// the copy is small (an encoded AU, tens of KB) and keeps the Swift side memory-safe. +// pump thread for nextAU(); nextAudio() may run on its own (single) audio thread; +// sendInput() is enqueue-only and safe alongside both. The pointers inside an AU/audio +// packet are only valid until the next call of the same kind, so we copy into Data here — +// the copies are small and keep the Swift side memory-safe. +// +// Trust: pass the host's pinned certificate fingerprint (the host logs it at startup, and +// `hostFingerprint` reports what a trust-on-first-use connect observed — persist it, e.g. +// in UserDefaults keyed by host, and pin it from then on). // // SCAFFOLD: written on the Linux host, not yet compiled against Xcode — expect to fix // trivial issues on first build (see README.md "Handoff"). @@ -19,8 +24,20 @@ public struct AccessUnit: Sendable { public let flags: UInt32 } +/// One Opus audio packet (48 kHz stereo, 5 ms frames) — decode with AVAudioConverter +/// (`kAudioFormatOpus`) or libopus into an AVAudioEngine source node. +public struct AudioPacket: Sendable { + public let data: Data + public let ptsNs: UInt64 + public let seq: UInt32 +} + public enum LumenClientError: Error { + /// Connect failed — wrong host/port, timeout, or a certificate-pin mismatch. case connectFailed + /// `pinSHA256` was non-nil but not exactly 32 bytes. Failing closed: connecting + /// unpinned when the caller asked for verification would be a silent trust downgrade. + case invalidPin case closed } @@ -32,17 +49,36 @@ public final class LumenConnection { public private(set) var height: UInt32 = 0 public private(set) var refreshHz: UInt32 = 0 + /// SHA-256 fingerprint of the certificate the host presented (32 bytes). After a + /// trust-on-first-use connect, persist this and pass it as `pinSHA256` next time. + public private(set) var hostFingerprint: Data = Data() + /// Connect and start a session at the requested mode (the host creates a native virtual /// output at exactly this size/refresh). Blocks up to `timeoutMs`. + /// + /// `pinSHA256`: the host's expected certificate fingerprint (exactly 32 bytes, else + /// `invalidPin` is thrown — never silently downgraded); nil = trust on first use + /// (check `hostFingerprint` afterwards). A pinned mismatch throws. public init( host: String, port: UInt16 = 9777, width: UInt32, height: UInt32, refreshHz: UInt32, + pinSHA256: Data? = nil, timeoutMs: UInt32 = 10_000 ) throws { + if let pin = pinSHA256, pin.count != 32 { throw LumenClientError.invalidPin } + var observed = [UInt8](repeating: 0, count: 32) handle = host.withCString { cs in - lumen_connect(cs, port, width, height, refreshHz, timeoutMs) + if let pin = pinSHA256 { + return pin.withUnsafeBytes { p in + lumen_connect( + cs, port, width, height, refreshHz, + p.bindMemory(to: UInt8.self).baseAddress, &observed, timeoutMs) + } + } + return lumen_connect(cs, port, width, height, refreshHz, nil, &observed, timeoutMs) } guard handle != nil else { throw LumenClientError.connectFailed } + hostFingerprint = Data(observed) var w: UInt32 = 0, h: UInt32 = 0, hz: UInt32 = 0 _ = lumen_connection_mode(handle, &w, &h, &hz) self.width = w @@ -68,6 +104,35 @@ public final class LumenConnection { } } + /// Pull the next Opus audio packet; nil on timeout, throws once the session is closed. + /// Drain from a dedicated audio thread — packets arrive every 5 ms (320 ms buffered). + public func nextAudio(timeoutMs: UInt32 = 100) throws -> AudioPacket? { + var pkt = LumenAudioPacket() + switch lumen_connection_next_audio(handle, &pkt, timeoutMs) { + case LUMEN_STATUS_OK: + let data = Data(bytes: pkt.data, count: pkt.len) // copy: ptr valid only until next call + return AudioPacket(data: data, ptsNs: pkt.pts_ns, seq: pkt.seq) + case LUMEN_STATUS_NO_FRAME: + return nil + default: + throw LumenClientError.closed + } + } + + /// Pull the next force-feedback update for the GCController haptics engine: + /// `(pad, lowFrequency, highFrequency)` with 0...0xFFFF amplitudes, (0, 0) = stop. + public func nextRumble(timeoutMs: UInt32 = 100) throws -> (pad: UInt16, low: UInt16, high: UInt16)? { + var pad: UInt16 = 0, low: UInt16 = 0, high: UInt16 = 0 + switch lumen_connection_next_rumble(handle, &pad, &low, &high, timeoutMs) { + case LUMEN_STATUS_OK: + return (pad, low, high) + case LUMEN_STATUS_NO_FRAME: + return nil + default: + throw LumenClientError.closed + } + } + /// Send one input event (delivered to the host as a QUIC datagram). public func send(_ event: LumenInputEvent) { var ev = event @@ -103,4 +168,23 @@ public extension LumenInputEvent { static func scroll(_ delta: Int32) -> LumenInputEvent { LumenInputEvent(kind: LUMEN_INPUT_KIND_MOUSE_SCROLL, _pad: (0, 0, 0), code: 0, x: delta, y: 0, flags: 0) } + + // Gamepad (wire contract in lumen_core::input::gamepad): one transition per event, + // `pad` = controller index, accumulated host-side into a virtual Xbox 360 pad. + + /// `button` is a GameStream buttonFlags bit (A=0x1000 B=0x2000 X=0x4000 Y=0x8000, + /// dpad=0x1/2/4/8, start=0x10 back=0x20 LS=0x40 RS=0x80 LB=0x100 RB=0x200 guide=0x400). + static func gamepadButton(_ button: UInt32, down: Bool, pad: UInt32 = 0) -> LumenInputEvent { + LumenInputEvent( + kind: LUMEN_INPUT_KIND_GAMEPAD_BUTTON, + _pad: (0, 0, 0), code: button, x: down ? 1 : 0, y: 0, flags: pad) + } + + /// Axis ids: 0=LSX 1=LSY 2=RSX 3=RSY (−32768...32767, XInput convention: +y = UP — + /// `GCControllerDirectionPad.yAxis` already matches, no flip), 4=LT 5=RT (0...255). + static func gamepadAxis(_ axis: UInt32, value: Int32, pad: UInt32 = 0) -> LumenInputEvent { + LumenInputEvent( + kind: LUMEN_INPUT_KIND_GAMEPAD_AXIS, + _pad: (0, 0, 0), code: axis, x: value, y: 0, flags: pad) + } } diff --git a/crates/lumen-client-rs/src/main.rs b/crates/lumen-client-rs/src/main.rs index eaed65a..90ea18f 100644 --- a/crates/lumen-client-rs/src/main.rs +++ b/crates/lumen-client-rs/src/main.rs @@ -9,8 +9,13 @@ //! `--input-test` exercises the input plane: scripted mouse/keyboard datagrams during the //! stream (watch them land in the host session, e.g. xev inside gamescope). //! -//! Usage: `lumen-client-rs [--connect HOST:PORT] [--mode WxHxFPS] [--out FILE] [--input-test]` -//! (M4 adds VAAPI decode + wgpu present on this same skeleton.) +//! `--pin <64-hex>` pins the host's certificate fingerprint (the host logs it at startup); +//! without it the client trusts on first use and prints the observed fingerprint to pin. +//! Host→client datagrams (Opus audio, rumble) are counted and reported with the stream +//! stats — decode/playback is the platform clients' job. +//! +//! Usage: `lumen-client-rs [--connect HOST:PORT] [--mode WxHxFPS] [--out FILE] [--input-test] +//! [--pin HEX]` (M4 adds VAAPI decode + wgpu present on this same skeleton.) use anyhow::{anyhow, Context, Result}; use lumen_core::config::Role; @@ -25,6 +30,22 @@ struct Args { mode: Mode, out: Option, input_test: bool, + pin: Option<[u8; 32]>, +} + +fn parse_hex32(s: &str) -> Option<[u8; 32]> { + if s.len() != 64 { + return None; + } + let mut out = [0u8; 32]; + for (i, b) in out.iter_mut().enumerate() { + *b = u8::from_str_radix(&s[2 * i..2 * i + 2], 16).ok()?; + } + Some(out) +} + +fn hex(fp: &[u8; 32]) -> String { + fp.iter().map(|b| format!("{b:02x}")).collect() } fn parse_args() -> Args { @@ -49,11 +70,26 @@ fn parse_args() -> Args { height: 720, refresh_hz: 60, }); + // A present-but-malformed --pin must abort, not silently downgrade to trust-on-first-use + // (the user asked for verification; fail closed). + let pin = match get("--pin") { + None => None, + Some(s) => { + match parse_hex32(s) { + Some(p) => Some(p), + None => { + eprintln!("--pin must be exactly 64 hex chars (the host logs its fingerprint at startup)"); + std::process::exit(2); + } + } + } + }; Args { connect: get("--connect").unwrap_or("127.0.0.1:9777").to_string(), mode, out: get("--out").map(String::from), input_test: argv.iter().any(|a| a == "--input-test"), + pin, } } @@ -87,13 +123,22 @@ fn run(args: Args) -> Result<()> { async fn session(args: Args) -> Result<()> { let remote: std::net::SocketAddr = args.connect.parse().context("--connect host:port")?; - let ep = endpoint::client_insecure().map_err(|e| anyhow!("QUIC client endpoint: {e}"))?; + let (ep, observed) = endpoint::client_pinned(args.pin); + let ep = ep.map_err(|e| anyhow!("QUIC client endpoint: {e}"))?; let conn = ep .connect(remote, "lumen") .context("connect")? .await - .context("QUIC handshake")?; - tracing::info!(%remote, "lumen/1 connected"); + .context("QUIC handshake (a pin mismatch fails here)")?; + match (args.pin, *observed.lock().unwrap()) { + (Some(_), _) => tracing::info!(%remote, "lumen/1 connected — host fingerprint pinned"), + (None, Some(fp)) => tracing::info!( + %remote, + fingerprint = %hex(&fp), + "lumen/1 connected (trust-on-first-use) — pass --pin to verify this host" + ), + (None, None) => tracing::info!(%remote, "lumen/1 connected"), + } let (mut send, mut recv) = conn.open_bi().await.context("open control stream")?; io::write_msg( @@ -163,6 +208,29 @@ async fn session(args: Args) -> Result<()> { }; let _ = conn2.send_datagram(key.encode().to_vec().into()); } + // Gamepad plane: tap A + sweep the left stick on pad 0 (the host + // accumulates these into its virtual xpad; needs /dev/uinput access). + use lumen_core::input::gamepad::{AXIS_LS_X, BTN_A}; + let pad_events = [ + (InputKind::GamepadButton, BTN_A, 1), + (InputKind::GamepadButton, BTN_A, 0), + ( + InputKind::GamepadAxis, + AXIS_LS_X, + ((i as i32) % 64 - 32) * 1024, + ), + ]; + for (kind, code, x) in pad_events { + let ev = InputEvent { + kind, + _pad: [0; 3], + code, + x, + y: 0, + flags: 0, // pad index 0 + }; + let _ = conn2.send_datagram(ev.encode().to_vec().into()); + } } tokio::time::sleep(std::time::Duration::from_millis(40)).await; } @@ -181,13 +249,34 @@ async fn session(args: Args) -> Result<()> { }); } + // Host→client datagrams: count Opus audio + rumble (playback is the platform clients' + // job; here we verify the planes flow). + let audio_pkts = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); + let audio_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); + let rumble_pkts = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); + { + let (a, ab, r) = (audio_pkts.clone(), audio_bytes.clone(), rumble_pkts.clone()); + let conn2 = conn.clone(); + tokio::spawn(async move { + use std::sync::atomic::Ordering::Relaxed; + while let Ok(d) = conn2.read_datagram().await { + if let Some((_, _, opus)) = lumen_core::quic::decode_audio_datagram(&d) { + a.fetch_add(1, Relaxed); + ab.fetch_add(opus.len() as u64, Relaxed); + } else if lumen_core::quic::decode_rumble_datagram(&d).is_some() { + r.fetch_add(1, Relaxed); + } + } + }); + } + let host_udp = std::net::SocketAddr::new(remote.ip(), welcome.udp_port); let cfg = welcome.session_config(Role::Client); let expected = welcome.frames; let out_path = args.out.clone(); // Data plane on a blocking thread (native threads only on the frame path). - tokio::task::spawn_blocking(move || -> Result<()> { + let result = tokio::task::spawn_blocking(move || -> Result<()> { let transport = UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &host_udp.to_string()) .context("bind data plane")?; @@ -281,10 +370,28 @@ async fn session(args: Args) -> Result<()> { } Ok(()) }) - .await??; + .await?; + + // Report the side planes whether or not the video plane succeeded. + { + use std::sync::atomic::Ordering::Relaxed; + let (a, ab, r) = ( + audio_pkts.load(Relaxed), + audio_bytes.load(Relaxed), + rumble_pkts.load(Relaxed), + ); + if a > 0 || r > 0 { + tracing::info!( + audio_pkts = a, + audio_kb = ab / 1000, + rumble_pkts = r, + "host→client datagrams (Opus 48 kHz stereo, 5 ms frames)" + ); + } + } conn.close(0u32.into(), b"done"); - Ok(()) + result } /// The host's deterministic test frame (mirror of `lumen-host::m3::test_frame`). diff --git a/crates/lumen-core/Cargo.toml b/crates/lumen-core/Cargo.toml index fe1e8ca..dc2bb39 100644 --- a/crates/lumen-core/Cargo.toml +++ b/crates/lumen-core/Cargo.toml @@ -19,7 +19,7 @@ crate-type = ["lib", "cdylib", "staticlib"] default = [] # Control-plane QUIC (pairing, config, reverse audio). tokio is permitted ONLY here, # never on the per-frame hot path. Off by default so the core stays runtime-free. -quic = ["dep:quinn", "dep:tokio", "dep:rustls", "dep:rcgen"] +quic = ["dep:quinn", "dep:tokio", "dep:rustls", "dep:rcgen", "dep:rustls-pki-types", "dep:sha2"] [dependencies] reed-solomon-simd = "3.1" # GF(2^16) Leopard-RS, SIMD, O(n log n) — the wall-breaker (P2) @@ -39,6 +39,8 @@ zeroize = "1" quinn = { version = "0.11", optional = true } rustls = { version = "0.23", optional = true, default-features = false, features = ["ring", "std"] } rcgen = { version = "0.13", optional = true, default-features = false, features = ["aws_lc_rs"] } +rustls-pki-types = { version = "1", optional = true } +sha2 = { version = "0.10", optional = true } tokio = { version = "1", optional = true, features = ["rt-multi-thread", "net", "sync", "macros"] } [dev-dependencies] diff --git a/crates/lumen-core/cbindgen.toml b/crates/lumen-core/cbindgen.toml index 25a9c53..ea52896 100644 --- a/crates/lumen-core/cbindgen.toml +++ b/crates/lumen-core/cbindgen.toml @@ -15,6 +15,31 @@ parse_deps = false [export.rename] "InputEvent" = "LumenInputEvent" "InputKind" = "LumenInputKind" +# Gamepad wire constants: bare BTN_* names collide with (at +# DIFFERENT values — last definition silently wins); prefix everything we export. +"BTN_DPAD_UP" = "LUMEN_BTN_DPAD_UP" +"BTN_DPAD_DOWN" = "LUMEN_BTN_DPAD_DOWN" +"BTN_DPAD_LEFT" = "LUMEN_BTN_DPAD_LEFT" +"BTN_DPAD_RIGHT" = "LUMEN_BTN_DPAD_RIGHT" +"BTN_START" = "LUMEN_BTN_START" +"BTN_BACK" = "LUMEN_BTN_BACK" +"BTN_LS_CLICK" = "LUMEN_BTN_LS_CLICK" +"BTN_RS_CLICK" = "LUMEN_BTN_RS_CLICK" +"BTN_LB" = "LUMEN_BTN_LB" +"BTN_RB" = "LUMEN_BTN_RB" +"BTN_GUIDE" = "LUMEN_BTN_GUIDE" +"BTN_A" = "LUMEN_BTN_A" +"BTN_B" = "LUMEN_BTN_B" +"BTN_X" = "LUMEN_BTN_X" +"BTN_Y" = "LUMEN_BTN_Y" +"AXIS_LS_X" = "LUMEN_AXIS_LS_X" +"AXIS_LS_Y" = "LUMEN_AXIS_LS_Y" +"AXIS_RS_X" = "LUMEN_AXIS_RS_X" +"AXIS_RS_Y" = "LUMEN_AXIS_RS_Y" +"AXIS_LT" = "LUMEN_AXIS_LT" +"AXIS_RT" = "LUMEN_AXIS_RT" +"AUDIO_MAGIC" = "LUMEN_AUDIO_MAGIC" +"RUMBLE_MAGIC" = "LUMEN_RUMBLE_MAGIC" # QualifiedScreamingSnakeCase already qualifies each variant with the enum name # (LumenStatus::Ok -> LUMEN_STATUS_OK); do NOT also set prefix_with_name or it doubles. diff --git a/crates/lumen-core/src/abi.rs b/crates/lumen-core/src/abi.rs index 6d81dac..94fe555 100644 --- a/crates/lumen-core/src/abi.rs +++ b/crates/lumen-core/src/abi.rs @@ -450,18 +450,31 @@ pub unsafe extern "C" fn lumen_get_stats( /// Opaque handle to a live `lumen/1` connection (QUIC control plane + UDP data plane, all /// pumped on internal threads). +/// +/// Thread contract: each plane (video `next_au`, audio `next_audio`, rumble `next_rumble`) +/// may be pulled from its own thread, at most one thread per plane. The accessors only +/// take shared references internally (per-plane mutexed borrow slots), so cross-plane +/// concurrency is sound — never two threads on the *same* plane. #[cfg(feature = "quic")] pub struct LumenConnection { inner: crate::client::NativeClient, /// Backs the pointer returned by the last `lumen_connection_next_au` (borrow-until-next-call). - last: Option, + last: std::sync::Mutex>, + /// Same, for `lumen_connection_next_audio` (independent of the video slot). + last_audio: std::sync::Mutex>, } /// Connect to a `lumen/1` host and start a session at `width`x`height`@`refresh_hz`. /// Blocks up to `timeout_ms` for the handshake. Returns NULL on failure. /// +/// Trust: `pin_sha256` (NULL or 32 bytes) is the expected SHA-256 fingerprint of the host's +/// certificate — a mismatching host is rejected. NULL = trust on first use; persist the +/// fingerprint written to `observed_sha256_out` (NULL or 32 bytes, filled on success) and +/// pass it as the pin on every later connect. +/// /// # Safety -/// `host` is a NUL-terminated UTF-8 string (IP or hostname resolvable by the platform). +/// `host` is a NUL-terminated UTF-8 string (IP or hostname resolvable by the platform); +/// `pin_sha256`/`observed_sha256_out` are each NULL or valid for 32 bytes. #[cfg(feature = "quic")] #[no_mangle] pub unsafe extern "C" fn lumen_connect( @@ -470,6 +483,8 @@ pub unsafe extern "C" fn lumen_connect( width: u32, height: u32, refresh_hz: u32, + pin_sha256: *const u8, + observed_sha256_out: *mut u8, timeout_ms: u32, ) -> *mut LumenConnection { let r = std::panic::catch_unwind(AssertUnwindSafe(|| { @@ -485,16 +500,33 @@ pub unsafe extern "C" fn lumen_connect( height, refresh_hz, }; + let pin = if pin_sha256.is_null() { + None + } else { + let mut p = [0u8; 32]; + p.copy_from_slice(unsafe { std::slice::from_raw_parts(pin_sha256, 32) }); + Some(p) + }; match crate::client::NativeClient::connect( host, port, mode, + pin, std::time::Duration::from_millis(timeout_ms as u64), ) { - Ok(c) => Box::into_raw(Box::new(LumenConnection { - inner: c, - last: None, - })), + Ok(c) => { + if !observed_sha256_out.is_null() { + unsafe { + std::slice::from_raw_parts_mut(observed_sha256_out, 32) + .copy_from_slice(&c.host_fingerprint); + } + } + Box::into_raw(Box::new(LumenConnection { + inner: c, + last: std::sync::Mutex::new(None), + last_audio: std::sync::Mutex::new(None), + })) + } Err(_) => std::ptr::null_mut(), } })); @@ -503,10 +535,12 @@ pub unsafe extern "C" fn lumen_connect( /// Pull the next reassembled access unit, waiting up to `timeout_ms`. Returns /// [`LumenStatus::NoFrame`] on timeout and [`LumenStatus::Closed`] once the session ended. -/// On `Ok`, `*out` borrows connection memory **until the next call** on this handle. +/// On `Ok`, `*out` borrows connection memory **until the next `next_au` call** on this +/// handle (the audio/rumble planes do not invalidate it). /// /// # Safety -/// `c` is a valid connection handle used from a single thread; `out` is writable. +/// `c` is a valid connection handle; `out` is writable. At most one thread pulls video — +/// it may run concurrently with one audio-pulling and one rumble-pulling thread. #[cfg(feature = "quic")] #[no_mangle] pub unsafe extern "C" fn lumen_connection_next_au( @@ -515,7 +549,8 @@ pub unsafe extern "C" fn lumen_connection_next_au( timeout_ms: u32, ) -> LumenStatus { guard(|| { - let c = match unsafe { c.as_mut() } { + // Shared reference only: video and audio threads must never alias a `&mut`. + let c = match unsafe { c.as_ref() } { Some(c) => c, None => return LumenStatus::NullPointer, }; @@ -527,8 +562,9 @@ pub unsafe extern "C" fn lumen_connection_next_au( .next_frame(std::time::Duration::from_millis(timeout_ms as u64)) { Ok(frame) => { - c.last = Some(frame); - let f = c.last.as_ref().unwrap(); + let mut slot = c.last.lock().unwrap(); + *slot = Some(frame); + let f = slot.as_ref().unwrap(); unsafe { *out = LumenFrame { data: f.data.as_ptr(), @@ -545,6 +581,108 @@ pub unsafe extern "C" fn lumen_connection_next_au( }) } +/// One Opus audio packet pulled off a `lumen/1` connection (48 kHz stereo, 5 ms frames). +/// `data` borrows connection memory until the next `lumen_connection_next_audio` call. +#[cfg(feature = "quic")] +#[repr(C)] +pub struct LumenAudioPacket { + pub data: *const u8, + pub len: usize, + pub seq: u32, + pub pts_ns: u64, +} + +/// Pull the next Opus audio packet, waiting up to `timeout_ms`. Returns +/// [`LumenStatus::NoFrame`] on timeout and [`LumenStatus::Closed`] once the session ended. +/// On `Ok`, `out->data` borrows connection memory **until the next audio call** on this +/// handle (independent of the video slot). Drain from a dedicated audio thread — packets +/// arrive every 5 ms and the internal queue holds 320 ms. +/// +/// # Safety +/// `c` is a valid connection handle; `out` is writable. At most one thread pulls audio — +/// it may run concurrently with the video/rumble pullers. +#[cfg(feature = "quic")] +#[no_mangle] +pub unsafe extern "C" fn lumen_connection_next_audio( + c: *mut LumenConnection, + out: *mut LumenAudioPacket, + timeout_ms: u32, +) -> LumenStatus { + guard(|| { + let c = match unsafe { c.as_ref() } { + Some(c) => c, + None => return LumenStatus::NullPointer, + }; + if out.is_null() { + return LumenStatus::NullPointer; + } + match c + .inner + .next_audio(std::time::Duration::from_millis(timeout_ms as u64)) + { + Ok(pkt) => { + let mut slot = c.last_audio.lock().unwrap(); + *slot = Some(pkt); + let p = slot.as_ref().unwrap(); + unsafe { + *out = LumenAudioPacket { + data: p.data.as_ptr(), + len: p.data.len(), + seq: p.seq, + pts_ns: p.pts_ns, + }; + } + LumenStatus::Ok + } + Err(e) => e.status(), + } + }) +} + +/// Pull the next rumble (force-feedback) update, waiting up to `timeout_ms`. Amplitudes +/// are 0..0xFFFF (`low` = low-frequency motor, `high` = high-frequency), `(0, 0)` = stop. +/// Same timeout/closed semantics as [`lumen_connection_next_audio`]. +/// +/// # Safety +/// `c` is a valid connection handle; out pointers are writable (NULLs are skipped). At +/// most one thread pulls rumble — it may run concurrently with the video/audio pullers. +#[cfg(feature = "quic")] +#[no_mangle] +pub unsafe extern "C" fn lumen_connection_next_rumble( + c: *mut LumenConnection, + pad: *mut u16, + low: *mut u16, + high: *mut u16, + timeout_ms: u32, +) -> LumenStatus { + guard(|| { + let c = match unsafe { c.as_ref() } { + Some(c) => c, + None => return LumenStatus::NullPointer, + }; + match c + .inner + .next_rumble(std::time::Duration::from_millis(timeout_ms as u64)) + { + Ok((p, l, h)) => { + unsafe { + if !pad.is_null() { + *pad = p; + } + if !low.is_null() { + *low = l; + } + if !high.is_null() { + *high = h; + } + } + LumenStatus::Ok + } + Err(e) => e.status(), + } + }) +} + /// Send one input event to the host as a QUIC datagram (non-blocking enqueue). /// /// # Safety diff --git a/crates/lumen-core/src/client.rs b/crates/lumen-core/src/client.rs index 66fb9f5..3997986 100644 --- a/crates/lumen-core/src/client.rs +++ b/crates/lumen-core/src/client.rs @@ -27,22 +27,57 @@ use std::time::Duration; /// (display freshness over completeness — FEC/keyframes recover). const FRAME_QUEUE: usize = 16; +/// Audio packets buffered for the embedder: 64 × 5 ms = 320 ms of slack. A lagging +/// embedder drops the newest packet (the audio renderer conceals the gap). +const AUDIO_QUEUE: usize = 64; + +/// Rumble updates buffered for the embedder. Overflow drops the NEWEST update (same +/// `try_send` discipline as the other planes) — the host re-sends rumble state +/// periodically, so a dropped transition (including a stop) heals within ~500 ms. +const RUMBLE_QUEUE: usize = 16; + +/// One Opus packet from the host's audio datagram stream (48 kHz stereo, 5 ms frames). +#[derive(Clone, Debug)] +pub struct AudioPacket { + pub seq: u32, + pub pts_ns: u64, + /// The raw Opus payload — feed it to an Opus decoder as one frame. + pub data: Vec, +} + pub struct NativeClient { frames: Receiver, + audio: Receiver, + rumble: Receiver<(u16, u16, u16)>, input_tx: tokio::sync::mpsc::UnboundedSender, shutdown: Arc, worker: Option>, /// The host-confirmed session mode (from the Welcome). pub mode: Mode, + /// SHA-256 fingerprint of the certificate the host actually presented. A TOFU caller + /// (`pin = None`) persists this and passes it as the pin from then on. + pub host_fingerprint: [u8; 32], } impl NativeClient { /// Connect to a `lumen/1` host and start the session at (up to) `mode`. Blocks until the /// handshake completes or `timeout` elapses. - pub fn connect(host: &str, port: u16, mode: Mode, timeout: Duration) -> Result { + /// + /// `pin`: expected SHA-256 of the host's certificate. `Some` and the host presents + /// anything else → the handshake is rejected ([`LumenError::Crypto`]). `None` = trust on + /// first use; check [`NativeClient::host_fingerprint`] after connecting. + pub fn connect( + host: &str, + port: u16, + mode: Mode, + pin: Option<[u8; 32]>, + timeout: Duration, + ) -> Result { let (frame_tx, frame_rx) = std::sync::mpsc::sync_channel::(FRAME_QUEUE); + let (audio_tx, audio_rx) = std::sync::mpsc::sync_channel::(AUDIO_QUEUE); + let (rumble_tx, rumble_rx) = std::sync::mpsc::sync_channel::<(u16, u16, u16)>(RUMBLE_QUEUE); let (input_tx, input_rx) = tokio::sync::mpsc::unbounded_channel::(); - let (ready_tx, ready_rx) = std::sync::mpsc::channel::>(); + let (ready_tx, ready_rx) = std::sync::mpsc::channel::>(); let shutdown = Arc::new(AtomicBool::new(false)); let host = host.to_string(); @@ -61,14 +96,23 @@ impl NativeClient { return; } }; - rt.block_on(worker_main( - host, port, mode, frame_tx, input_rx, ready_tx, shutdown_w, - )); + rt.block_on(worker_main(WorkerArgs { + host, + port, + mode, + pin, + frame_tx, + audio_tx, + rumble_tx, + input_rx, + ready_tx, + shutdown: shutdown_w, + })); }) .map_err(LumenError::Io)?; - let negotiated = match ready_rx.recv_timeout(timeout) { - Ok(Ok(m)) => m, + let (negotiated, fingerprint) = match ready_rx.recv_timeout(timeout) { + Ok(Ok(t)) => t, Ok(Err(e)) => return Err(e), Err(_) => { shutdown.store(true, Ordering::SeqCst); @@ -77,16 +121,24 @@ impl NativeClient { }; Ok(NativeClient { frames: frame_rx, + audio: audio_rx, + rumble: rumble_rx, input_tx, shutdown, worker: Some(worker), mode: negotiated, + host_fingerprint: fingerprint, }) } /// Pull the next reassembled, FEC-recovered access unit; [`LumenError::NoFrame`] on /// timeout, [`LumenError::Closed`]-class errors once the session ended. - pub fn next_frame(&mut self, timeout: Duration) -> Result { + /// + /// Plane concurrency: each pull method drains its own queue, so video, audio and + /// rumble may each be pulled from their own thread — but at most one thread per plane + /// (`&self` here supports the cross-plane sharing; a plane's queue is still + /// single-consumer by contract). + pub fn next_frame(&self, timeout: Duration) -> Result { match self.frames.recv_timeout(timeout) { Ok(f) => Ok(f), Err(RecvTimeoutError::Timeout) => Err(LumenError::NoFrame), @@ -94,6 +146,27 @@ impl NativeClient { } } + /// Pull the next Opus audio packet; [`LumenError::NoFrame`] on timeout, + /// [`LumenError::Closed`] once the session ended. Drain on a dedicated audio thread — + /// packets arrive every 5 ms. + pub fn next_audio(&self, timeout: Duration) -> Result { + match self.audio.recv_timeout(timeout) { + Ok(p) => Ok(p), + Err(RecvTimeoutError::Timeout) => Err(LumenError::NoFrame), + Err(RecvTimeoutError::Disconnected) => Err(LumenError::Closed), + } + } + + /// Pull the next rumble update `(pad, low, high)`; same semantics as + /// [`NativeClient::next_audio`]. Amplitudes are 0..0xFFFF, `(0, 0)` = stop. + pub fn next_rumble(&self, timeout: Duration) -> Result<(u16, u16, u16)> { + match self.rumble.recv_timeout(timeout) { + Ok(r) => Ok(r), + Err(RecvTimeoutError::Timeout) => Err(LumenError::NoFrame), + Err(RecvTimeoutError::Disconnected) => Err(LumenError::Closed), + } + } + /// Queue one input event for delivery as a QUIC datagram. pub fn send_input(&self, ev: &InputEvent) -> Result<()> { self.input_tx.send(*ev).map_err(|_| LumenError::Closed) @@ -109,27 +182,55 @@ impl Drop for NativeClient { } } -/// The worker: QUIC handshake, then the input task + the blocking data-plane pump. -async fn worker_main( +struct WorkerArgs { host: String, port: u16, mode: Mode, + pin: Option<[u8; 32]>, frame_tx: SyncSender, - mut input_rx: tokio::sync::mpsc::UnboundedReceiver, - ready_tx: std::sync::mpsc::Sender>, + audio_tx: SyncSender, + rumble_tx: SyncSender<(u16, u16, u16)>, + input_rx: tokio::sync::mpsc::UnboundedReceiver, + ready_tx: std::sync::mpsc::Sender>, shutdown: Arc, -) { +} + +/// The worker: QUIC handshake, then the input/datagram tasks + the blocking data-plane pump. +async fn worker_main(args: WorkerArgs) { + let WorkerArgs { + host, + port, + mode, + pin, + frame_tx, + audio_tx, + rumble_tx, + mut input_rx, + ready_tx, + shutdown, + } = args; let setup = async { let remote: std::net::SocketAddr = format!("{host}:{port}") .parse() .map_err(|_| LumenError::InvalidArg("host:port"))?; - let ep = endpoint::client_insecure() - .map_err(|e| LumenError::Io(std::io::Error::other(e.to_string())))?; + let (ep, observed) = endpoint::client_pinned(pin); + let ep = ep.map_err(|e| LumenError::Io(std::io::Error::other(e.to_string())))?; let conn = ep .connect(remote, "lumen") .map_err(|_| LumenError::InvalidArg("connect"))? .await - .map_err(|e| LumenError::Io(std::io::Error::other(e.to_string())))?; + .map_err(|e| { + // A pin mismatch surfaces as a TLS failure; report it as a crypto error so + // the embedder can distinguish "wrong host identity" from plain IO trouble. + let fp_mismatch = pin.is_some() + && observed.lock().unwrap().map(|fp| Some(fp) != pin) == Some(true); + if fp_mismatch { + LumenError::Crypto + } else { + LumenError::Io(std::io::Error::other(e.to_string())) + } + })?; + let fingerprint = observed.lock().unwrap().unwrap_or([0u8; 32]); let (mut send, mut recv) = conn .open_bi() .await @@ -163,17 +264,17 @@ async fn worker_main( let transport = UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &host_udp.to_string())?; let session = Session::new(welcome.session_config(Role::Client), Box::new(transport))?; - Ok::<_, LumenError>((conn, session, welcome.mode)) + Ok::<_, LumenError>((conn, session, welcome.mode, fingerprint)) }; - let (conn, mut session, negotiated) = match setup.await { + let (conn, mut session, negotiated, fingerprint) = match setup.await { Ok(t) => t, Err(e) => { let _ = ready_tx.send(Err(e)); return; } }; - let _ = ready_tx.send(Ok(negotiated)); + let _ = ready_tx.send(Ok((negotiated, fingerprint))); // Input task: embedder events → QUIC datagrams. let input_conn = conn.clone(); @@ -183,6 +284,31 @@ async fn worker_main( } }); + // Datagram demux: host → client audio/rumble (try_send: a lagging embedder drops the + // newest packet rather than backing up the QUIC receive path). + let dgram_conn = conn.clone(); + tokio::spawn(async move { + while let Ok(d) = dgram_conn.read_datagram().await { + match d.first() { + Some(&crate::quic::AUDIO_MAGIC) => { + if let Some((seq, pts_ns, opus)) = crate::quic::decode_audio_datagram(&d) { + let _ = audio_tx.try_send(AudioPacket { + seq, + pts_ns, + data: opus.to_vec(), + }); + } + } + Some(&crate::quic::RUMBLE_MAGIC) => { + if let Some(r) = crate::quic::decode_rumble_datagram(&d) { + let _ = rumble_tx.try_send(r); + } + } + _ => {} // unknown tag — a newer host; ignore + } + } + }); + // Watch for connection close → stop the pump. { let shutdown = shutdown.clone(); diff --git a/crates/lumen-core/src/input.rs b/crates/lumen-core/src/input.rs index 186e0f5..d1e7434 100644 --- a/crates/lumen-core/src/input.rs +++ b/crates/lumen-core/src/input.rs @@ -23,11 +23,49 @@ pub enum InputKind { MouseButtonUp = 5, /// `x` carries the (signed) scroll delta. MouseScroll = 6, + /// `code` = button bit ([`gamepad`] `BTN_*`), `x` ≠ 0 = pressed, `flags` = pad index. GamepadButton = 7, - /// `code` = axis id, `x` = axis value. + /// `code` = axis id ([`gamepad`] `AXIS_*`), `x` = axis value, `flags` = pad index. + /// Sticks are i16 range (−32768..32767) in the XInput/Moonlight convention — **+y = + /// up** (unlike mouse coordinates); triggers 0..255. GamepadAxis = 8, } +/// The gamepad wire contract for [`InputKind::GamepadButton`]/[`InputKind::GamepadAxis`]. +/// +/// Everything follows the GameStream/XInput conventions end to end: buttons reuse +/// GameStream's `buttonFlags` bit positions, sticks are −32768..32767 with **+y = up**, +/// triggers 0..255 (what Moonlight sends and what the host's virtual xpad already +/// consumes). One event carries one transition: `code` = the bit below, `x` = 1 pressed / +/// 0 released. Axes are sent individually; the host accumulates per-pad state and emits +/// one evdev SYN per event. +pub mod gamepad { + pub const BTN_DPAD_UP: u32 = 0x0001; + pub const BTN_DPAD_DOWN: u32 = 0x0002; + pub const BTN_DPAD_LEFT: u32 = 0x0004; + pub const BTN_DPAD_RIGHT: u32 = 0x0008; + pub const BTN_START: u32 = 0x0010; + pub const BTN_BACK: u32 = 0x0020; + pub const BTN_LS_CLICK: u32 = 0x0040; + pub const BTN_RS_CLICK: u32 = 0x0080; + pub const BTN_LB: u32 = 0x0100; + pub const BTN_RB: u32 = 0x0200; + pub const BTN_GUIDE: u32 = 0x0400; + pub const BTN_A: u32 = 0x1000; + pub const BTN_B: u32 = 0x2000; + pub const BTN_X: u32 = 0x4000; + pub const BTN_Y: u32 = 0x8000; + + /// Axis ids for `InputKind::GamepadAxis`. + pub const AXIS_LS_X: u32 = 0; + pub const AXIS_LS_Y: u32 = 1; + pub const AXIS_RS_X: u32 = 2; + pub const AXIS_RS_Y: u32 = 3; + /// Triggers: value range 0..255. + pub const AXIS_LT: u32 = 4; + pub const AXIS_RT: u32 = 5; +} + impl InputKind { pub fn from_u8(v: u8) -> Option { use InputKind::*; diff --git a/crates/lumen-core/src/quic.rs b/crates/lumen-core/src/quic.rs index dde35f1..e3df367 100644 --- a/crates/lumen-core/src/quic.rs +++ b/crates/lumen-core/src/quic.rs @@ -16,9 +16,11 @@ //! Leopard, which GameStream can't express), shard sizing, crypto key/salt — so the data //! plane is exactly the hardened M1 `Session`. //! -//! Seed-stage transport security: the host presents a self-signed certificate and the client -//! accepts any (pairing/pinning lands with the trust model; the data plane's AES-GCM is -//! already real). All integers little-endian; every message is `u16 length || payload`. +//! Transport security: the host presents a long-lived self-signed certificate +//! ([`endpoint::server_with_identity`]) and the client pins its SHA-256 fingerprint +//! ([`endpoint::client_pinned`]; no pin = trust-on-first-use, with the observed fingerprint +//! reported back for persisting). The data plane adds AES-GCM on top. +//! All integers little-endian; every message is `u16 length || payload`. use crate::config::{Config, FecConfig, FecScheme, Mode, ProtocolPhase, Role}; use crate::error::{LumenError, Result}; @@ -183,6 +185,53 @@ pub fn frame(payload: &[u8]) -> Vec { b } +/// Datagram wire tags. Video rides UDP; everything low-rate rides QUIC datagrams, +/// demultiplexed by the first byte: input = [`crate::input::INPUT_MAGIC`] (0xC8), +/// audio = [`AUDIO_MAGIC`], rumble = [`RUMBLE_MAGIC`]. +pub const AUDIO_MAGIC: u8 = 0xC9; +pub const RUMBLE_MAGIC: u8 = 0xCA; + +/// Audio datagram, host → client: `[0xC9][u32 seq LE][u64 pts_ns LE][opus payload]`. +/// One Opus frame per datagram (5 ms — well under any MTU); QUIC already encrypts. +pub fn encode_audio_datagram(seq: u32, pts_ns: u64, opus: &[u8]) -> Vec { + let mut b = Vec::with_capacity(13 + opus.len()); + b.push(AUDIO_MAGIC); + b.extend_from_slice(&seq.to_le_bytes()); + b.extend_from_slice(&pts_ns.to_le_bytes()); + b.extend_from_slice(opus); + b +} + +/// Parse an audio datagram → `(seq, pts_ns, opus payload)`. `None` on bad tag/length. +pub fn decode_audio_datagram(b: &[u8]) -> Option<(u32, u64, &[u8])> { + if b.len() < 13 || b[0] != AUDIO_MAGIC { + return None; + } + let seq = u32::from_le_bytes(b[1..5].try_into().unwrap()); + let pts_ns = u64::from_le_bytes(b[5..13].try_into().unwrap()); + Some((seq, pts_ns, &b[13..])) +} + +/// Rumble datagram, host → client: `[0xCA][u16 pad LE][u16 low LE][u16 high LE]`. +/// Force-feedback state for pad `pad` (0xFFFF amplitudes, 0/0 = stop). +pub fn encode_rumble_datagram(pad: u16, low: u16, high: u16) -> [u8; 7] { + let mut b = [0u8; 7]; + b[0] = RUMBLE_MAGIC; + b[1..3].copy_from_slice(&pad.to_le_bytes()); + b[3..5].copy_from_slice(&low.to_le_bytes()); + b[5..7].copy_from_slice(&high.to_le_bytes()); + b +} + +/// Parse a rumble datagram → `(pad, low, high)`. `None` on bad tag/length. +pub fn decode_rumble_datagram(b: &[u8]) -> Option<(u16, u16, u16)> { + if b.len() < 7 || b[0] != RUMBLE_MAGIC { + return None; + } + let u16at = |o: usize| u16::from_le_bytes([b[o], b[o + 1]]); + Some((u16at(1), u16at(3), u16at(5))) +} + /// Async framed-message IO over a quinn stream (`u16 LE length || payload`). pub mod io { /// Read one framed message (bounded at 64 KiB — control messages are tiny). @@ -207,35 +256,99 @@ pub mod io { } } -/// quinn endpoint constructors (host self-signed; client accepts-any — seed-stage trust). +/// quinn endpoint constructors. Host: self-signed identity (fresh, or persisted PEMs via +/// [`endpoint::server_with_identity`]). Client: fingerprint pinning / TOFU via +/// [`endpoint::client_pinned`] ([`endpoint::client_insecure`] is the no-pin special case). pub mod endpoint { - use std::sync::Arc; + use std::sync::{Arc, Mutex}; - /// Server endpoint with a fresh self-signed certificate. + /// Server endpoint with a fresh self-signed certificate (tests/dev — production hosts + /// persist an identity and use [`server_with_identity`] so clients can pin it). pub fn server(addr: std::net::SocketAddr) -> anyhow_result::Result { let cert = rcgen::generate_simple_self_signed(vec!["lumen".into()]) .map_err(|e| anyhow_result::Error::msg(format!("self-signed cert: {e}")))?; let cert_der = rustls::pki_types::CertificateDer::from(cert.cert); let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()); - let server_config = - quinn::ServerConfig::with_single_cert(vec![cert_der], key_der.into()) - .map_err(|e| anyhow_result::Error::msg(format!("server config: {e}")))?; + server_from_der(cert_der, key_der.into(), addr) + } + + /// Server endpoint from a persisted PEM identity (certificate + PKCS#8 private key) — + /// the host's long-lived self-signed cert, so the fingerprint clients pin is stable + /// across restarts. + pub fn server_with_identity( + addr: std::net::SocketAddr, + cert_pem: &str, + key_pem: &str, + ) -> anyhow_result::Result { + use rustls::pki_types::pem::PemObject; + let cert_der = rustls::pki_types::CertificateDer::from_pem_slice(cert_pem.as_bytes()) + .map_err(|e| anyhow_result::Error::msg(format!("cert pem: {e}")))?; + let key_der = rustls::pki_types::PrivateKeyDer::from_pem_slice(key_pem.as_bytes()) + .map_err(|e| anyhow_result::Error::msg(format!("key pem: {e}")))?; + server_from_der(cert_der, key_der, addr) + } + + fn server_from_der( + cert_der: rustls::pki_types::CertificateDer<'static>, + key_der: rustls::pki_types::PrivateKeyDer<'static>, + addr: std::net::SocketAddr, + ) -> anyhow_result::Result { + let server_config = quinn::ServerConfig::with_single_cert(vec![cert_der], key_der) + .map_err(|e| anyhow_result::Error::msg(format!("server config: {e}")))?; Ok(quinn::Endpoint::server(server_config, addr)?) } - /// Client endpoint that skips certificate verification (seed stage; pinning lands with - /// the pairing/trust model). + /// SHA-256 of a certificate's DER encoding — the fingerprint clients pin. + pub fn cert_fingerprint(cert_der: &[u8]) -> [u8; 32] { + use sha2::Digest; + sha2::Sha256::digest(cert_der).into() + } + + /// Fingerprint of a PEM-encoded certificate (what a host logs/shows for pairing UX — + /// must match what the client's verifier computes from the DER on the wire). + pub fn fingerprint_of_pem(cert_pem: &str) -> anyhow_result::Result<[u8; 32]> { + use rustls::pki_types::pem::PemObject; + let der = rustls::pki_types::CertificateDer::from_pem_slice(cert_pem.as_bytes()) + .map_err(|e| anyhow_result::Error::msg(format!("cert pem: {e}")))?; + Ok(cert_fingerprint(der.as_ref())) + } + + /// Client endpoint that skips certificate verification (TOFU bootstrap — read the + /// observed fingerprint off the slot and pin it on the next connect). pub fn client_insecure() -> anyhow_result::Result { - let _ = rustls::crypto::ring::default_provider().install_default(); - let rustls_cfg = rustls::ClientConfig::builder() - .dangerous() - .with_custom_certificate_verifier(Arc::new(SkipVerify)) - .with_no_client_auth(); - let quic_cfg = quinn::crypto::rustls::QuicClientConfig::try_from(rustls_cfg) - .map_err(|e| anyhow_result::Error::msg(format!("quic client config: {e}")))?; - let mut ep = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())?; - ep.set_default_client_config(quinn::ClientConfig::new(Arc::new(quic_cfg))); - Ok(ep) + client_pinned(None).0 + } + + /// What [`client_pinned`] returns: the endpoint plus the slot the verifier writes the + /// observed host fingerprint into during the handshake. + pub type PinnedClient = ( + anyhow_result::Result, + Arc>>, + ); + + /// Client endpoint that verifies the host by certificate fingerprint. + /// + /// `pin = Some(sha256)` rejects any host whose leaf cert doesn't hash to `sha256`; + /// `None` accepts any (trust-on-first-use). Either way the observed fingerprint is + /// written to the returned slot during the handshake, so a TOFU caller can persist it. + pub fn client_pinned(pin: Option<[u8; 32]>) -> PinnedClient { + let observed = Arc::new(Mutex::new(None)); + let ep = (|| { + let _ = rustls::crypto::ring::default_provider().install_default(); + let rustls_cfg = rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(PinVerify { + pin, + observed: observed.clone(), + })) + .with_no_client_auth(); + let quic_cfg = quinn::crypto::rustls::QuicClientConfig::try_from(rustls_cfg) + .map_err(|e| anyhow_result::Error::msg(format!("quic client config: {e}")))?; + let mut ep = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())?; + ep.set_default_client_config(quinn::ClientConfig::new(Arc::new(quic_cfg))); + Ok(ep) + })(); + (ep, observed) } /// Minimal error plumbing without pulling anyhow into lumen-core's public API. @@ -261,40 +374,69 @@ pub mod endpoint { } } + /// Fingerprint-pinning verifier: trust is the SHA-256 of the host's (self-signed) leaf + /// cert, not a CA chain. With no pin it accepts any cert (TOFU) but still records what + /// it saw, so the embedder can persist the fingerprint and pin it from then on. #[derive(Debug)] - struct SkipVerify; + struct PinVerify { + pin: Option<[u8; 32]>, + observed: Arc>>, + } - impl rustls::client::danger::ServerCertVerifier for SkipVerify { + impl rustls::client::danger::ServerCertVerifier for PinVerify { fn verify_server_cert( &self, - _end_entity: &rustls::pki_types::CertificateDer<'_>, + end_entity: &rustls::pki_types::CertificateDer<'_>, _intermediates: &[rustls::pki_types::CertificateDer<'_>], _server_name: &rustls::pki_types::ServerName<'_>, _ocsp: &[u8], _now: rustls::pki_types::UnixTime, ) -> std::result::Result { + let fp = cert_fingerprint(end_entity.as_ref()); + *self.observed.lock().unwrap() = Some(fp); + if let Some(expected) = self.pin { + if fp != expected { + return Err(rustls::Error::InvalidCertificate( + rustls::CertificateError::ApplicationVerificationFailure, + )); + } + } Ok(rustls::client::danger::ServerCertVerified::assertion()) } + // The handshake signatures MUST be verified for real even though we pin the cert: + // CertificateVerify is what proves the peer *holds the pinned cert's private key* — + // skip it and an active MITM can replay the host's (public) certificate, match the + // pin, and complete the handshake with its own key. fn verify_tls12_signature( &self, - _message: &[u8], - _cert: &rustls::pki_types::CertificateDer<'_>, - _dss: &rustls::DigitallySignedStruct, + message: &[u8], + cert: &rustls::pki_types::CertificateDer<'_>, + dss: &rustls::DigitallySignedStruct, ) -> std::result::Result { - Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + rustls::crypto::verify_tls12_signature( + message, + cert, + dss, + &rustls::crypto::ring::default_provider().signature_verification_algorithms, + ) } fn verify_tls13_signature( &self, - _message: &[u8], - _cert: &rustls::pki_types::CertificateDer<'_>, - _dss: &rustls::DigitallySignedStruct, + message: &[u8], + cert: &rustls::pki_types::CertificateDer<'_>, + dss: &rustls::DigitallySignedStruct, ) -> std::result::Result { - Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + rustls::crypto::verify_tls13_signature( + message, + cert, + dss, + &rustls::crypto::ring::default_provider().signature_verification_algorithms, + ) } fn supported_verify_schemes(&self) -> Vec { @@ -349,4 +491,37 @@ mod tests { }; assert_eq!(Start::decode(&s.encode()).unwrap(), s); } + + #[test] + fn audio_datagram_roundtrip() { + let opus = [0x42u8; 97]; + let d = encode_audio_datagram(7, 1_000_000_123, &opus); + assert_eq!(d[0], AUDIO_MAGIC); + let (seq, pts, payload) = decode_audio_datagram(&d).unwrap(); + assert_eq!((seq, pts), (7, 1_000_000_123)); + assert_eq!(payload, opus); + assert!(decode_audio_datagram(&d[..12]).is_none()); // truncated header + assert!(decode_audio_datagram(&[0u8; 13]).is_none()); // bad magic + + // Empty payload is legal (DTX) — header-only datagram. + let header_only = encode_audio_datagram(0, 0, &[]); + let (_, _, empty) = decode_audio_datagram(&header_only).unwrap(); + assert!(empty.is_empty()); + } + + #[test] + fn rumble_datagram_roundtrip() { + let d = encode_rumble_datagram(1, 0x1234, 0xFFFF); + assert_eq!(d[0], RUMBLE_MAGIC); + assert_eq!(decode_rumble_datagram(&d), Some((1, 0x1234, 0xFFFF))); + assert!(decode_rumble_datagram(&d[..6]).is_none()); + } + + #[test] + fn fingerprint_is_sha256_of_der() { + // Stable across calls, distinct for distinct certs. + let a = endpoint::cert_fingerprint(b"cert-a"); + assert_eq!(a, endpoint::cert_fingerprint(b"cert-a")); + assert_ne!(a, endpoint::cert_fingerprint(b"cert-b")); + } } diff --git a/crates/lumen-host/src/m3.rs b/crates/lumen-host/src/m3.rs index f6f34b1..252e1a4 100644 --- a/crates/lumen-host/src/m3.rs +++ b/crates/lumen-host/src/m3.rs @@ -10,12 +10,21 @@ //! capture→encode→FEC→UDP→reassemble latency per frame. //! //! `lumen-host m3-host [--port 9777] [--source synthetic|virtual] [--seconds 30] -//! [--frames 300]` serves one session; `lumen-client-rs --connect host:9777` is the -//! counterpart. The data plane runs on native threads (no async on the frame path). +//! [--frames 300]` serves sessions back to back (one at a time — the virtual output and +//! encoder are single-tenant); `lumen-client-rs --connect host:9777` is the counterpart. +//! The data plane runs on native threads (no async on the frame path). +//! +//! Alongside video + input, a session carries **audio** (desktop Opus, 5 ms frames, host → +//! client QUIC datagrams tagged [`lumen_core::quic::AUDIO_MAGIC`]) and **gamepads** (client +//! GamepadButton/GamepadAxis datagrams accumulated into per-pad state for the virtual xpad; +//! force feedback flows back as [`lumen_core::quic::RUMBLE_MAGIC`] datagrams). +//! +//! Trust: the host serves with its persistent identity (`~/.config/lumen/cert.pem`, shared +//! with GameStream pairing) and logs the SHA-256 fingerprint clients pin. use anyhow::{anyhow, Context, Result}; use lumen_core::config::{FecConfig, FecScheme, Role}; -use lumen_core::input::InputEvent; +use lumen_core::input::{InputEvent, InputKind}; use lumen_core::packet::{FLAG_PIC, FLAG_SOF}; use lumen_core::quic::{endpoint, io, Hello, Start, Welcome}; use lumen_core::transport::UdpTransport; @@ -39,6 +48,8 @@ pub struct M3Options { pub seconds: u32, /// Synthetic-source frame count. pub frames: u32, + /// Exit after this many sessions (0 = serve forever). + pub max_sessions: u32, } /// Deterministic test frame: `u32 LE index` then `data[i] = idx + i` (wrapping). @@ -64,78 +75,157 @@ pub fn run(opts: M3Options) -> Result<()> { .enable_all() .build() .context("tokio runtime")?; - rt.block_on(serve_one(opts)) + rt.block_on(serve(opts)) } -async fn serve_one(opts: M3Options) -> Result<()> { - let ep = endpoint::server(([0, 0, 0, 0], opts.port).into()) - .map_err(|e| anyhow!("QUIC server endpoint: {e}"))?; - tracing::info!(port = opts.port, source = ?opts.source, "lumen/1 host listening (QUIC)"); +fn fingerprint_hex(fp: &[u8; 32]) -> String { + fp.iter().map(|b| format!("{b:02x}")).collect() +} - let incoming = ep - .accept() - .await - .ok_or_else(|| anyhow!("endpoint closed"))?; - let conn = incoming.await.context("QUIC accept")?; - let peer = conn.remote_address(); - tracing::info!(%peer, "lumen/1 client connected"); - let (mut send, mut recv) = conn.accept_bi().await.context("accept control stream")?; - - let hello = Hello::decode(&io::read_msg(&mut recv).await?) - .map_err(|e| anyhow!("Hello decode: {e:?}"))?; - anyhow::ensure!( - hello.abi_version == lumen_core::ABI_VERSION, - "ABI mismatch: client {} host {}", - hello.abi_version, - lumen_core::ABI_VERSION - ); - crate::encode::validate_dimensions( - crate::encode::Codec::H265, - hello.mode.width, - hello.mode.height, +/// The persistent listener: accept clients back to back on one endpoint. Sessions are +/// served one at a time (the virtual output + NVENC are single-tenant); a client that +/// connects mid-session waits in the accept queue. A failed session logs and the loop +/// keeps serving — only endpoint-level failures are fatal. +async fn serve(opts: M3Options) -> Result<()> { + let identity = crate::gamestream::cert::ServerIdentity::load_or_create() + .context("load host identity (~/.config/lumen)")?; + let fingerprint = endpoint::fingerprint_of_pem(&identity.cert_pem) + .map_err(|e| anyhow!("cert fingerprint: {e}"))?; + let ep = endpoint::server_with_identity( + ([0, 0, 0, 0], opts.port).into(), + &identity.cert_pem, + &identity.key_pem, ) - .context("client-requested mode")?; + .map_err(|e| anyhow!("QUIC server endpoint: {e}"))?; + tracing::info!( + port = opts.port, + source = ?opts.source, + fingerprint = %fingerprint_hex(&fingerprint), + "lumen/1 host listening (QUIC) — clients pin this fingerprint" + ); - // Reserve a UDP port for the data plane (bind, read it back, rebind in UdpTransport). - let probe = std::net::UdpSocket::bind("0.0.0.0:0")?; - let udp_port = probe.local_addr()?.port(); - drop(probe); + // One audio capturer for the whole host lifetime, handed from session to session + // (PipeWire streams have no cheap teardown — see AudioCapSlot). + let audio_cap: AudioCapSlot = Arc::new(std::sync::Mutex::new(None)); - let mut key = [0u8; 16]; - rand::thread_rng().fill_bytes(&mut key); - let welcome = Welcome { - abi_version: lumen_core::ABI_VERSION, - udp_port, - mode: hello.mode, - // The post-GameStream point of lumen/1: Leopard GF(2¹⁶) FEC + real encryption. - fec: FecConfig { - scheme: FecScheme::Gf16, - fec_percent: 20, - max_data_per_block: 4096, - }, - shard_payload: 1200, - encrypt: true, - key, - salt: *b"lmn1", - frames: match opts.source { - M3Source::Synthetic => opts.frames, - M3Source::Virtual => 0, // unbounded — client streams until we close - }, + let mut served = 0u32; + loop { + let incoming = ep + .accept() + .await + .ok_or_else(|| anyhow!("endpoint closed"))?; + let conn = match incoming.await { + Ok(c) => c, + Err(e) => { + tracing::warn!(error = %e, "QUIC accept failed"); + continue; + } + }; + let peer = conn.remote_address(); + tracing::info!(%peer, "lumen/1 client connected"); + if let Err(e) = serve_session(conn, &opts, &audio_cap).await { + tracing::warn!(%peer, error = %format!("{e:#}"), "session ended with error"); + } else { + tracing::info!(%peer, "session complete"); + } + served += 1; + if opts.max_sessions != 0 && served >= opts.max_sessions { + break; + } + tracing::info!("ready for the next client"); + } + ep.wait_idle().await; + Ok(()) +} + +/// The accept loop is sequential, so the control phase must be bounded — a client that +/// connects and never finishes the handshake would otherwise wedge the host for everyone. +const HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +/// Persistent audio-capturer slot, reused across sessions (same pattern as the GameStream +/// path): `PwAudioCapturer` has no teardown — dropping one per session would leak its +/// PipeWire thread + core connection + live capture node on the daemon every session. +type AudioCapSlot = Arc>>>; + +/// One client session: handshake → input/audio planes → data plane until done/disconnect. +/// Everything torn down on return (RAII: virtual output, encoder, threads via channel close). +async fn serve_session( + conn: quinn::Connection, + opts: &M3Options, + audio_cap: &AudioCapSlot, +) -> Result<()> { + let peer = conn.remote_address(); + + let source = opts.source; + let frames = opts.frames; + let handshake = async { + let (mut send, mut recv) = conn.accept_bi().await.context("accept control stream")?; + + let hello = Hello::decode(&io::read_msg(&mut recv).await?) + .map_err(|e| anyhow!("Hello decode: {e:?}"))?; + anyhow::ensure!( + hello.abi_version == lumen_core::ABI_VERSION, + "ABI mismatch: client {} host {}", + hello.abi_version, + lumen_core::ABI_VERSION + ); + crate::encode::validate_dimensions( + crate::encode::Codec::H265, + hello.mode.width, + hello.mode.height, + ) + .context("client-requested mode")?; + + // Reserve a UDP port for the data plane (bind, read it back, rebind in UdpTransport). + let probe = std::net::UdpSocket::bind("0.0.0.0:0")?; + let udp_port = probe.local_addr()?.port(); + drop(probe); + + let mut key = [0u8; 16]; + rand::thread_rng().fill_bytes(&mut key); + let welcome = Welcome { + abi_version: lumen_core::ABI_VERSION, + udp_port, + mode: hello.mode, + // The post-GameStream point of lumen/1: Leopard GF(2¹⁶) FEC + real encryption. + fec: FecConfig { + scheme: FecScheme::Gf16, + fec_percent: 20, + max_data_per_block: 4096, + }, + shard_payload: 1200, + encrypt: true, + key, + salt: *b"lmn1", + frames: match source { + M3Source::Synthetic => frames, + M3Source::Virtual => 0, // unbounded — client streams until we close + }, + }; + io::write_msg(&mut send, &welcome.encode()).await?; + + let start = Start::decode(&io::read_msg(&mut recv).await?) + .map_err(|e| anyhow!("Start decode: {e:?}"))?; + Ok::<_, anyhow::Error>((hello, welcome, udp_port, start)) }; - io::write_msg(&mut send, &welcome.encode()).await?; - - let start = Start::decode(&io::read_msg(&mut recv).await?) - .map_err(|e| anyhow!("Start decode: {e:?}"))?; + let (hello, welcome, udp_port, start) = tokio::time::timeout(HANDSHAKE_TIMEOUT, handshake) + .await + .map_err(|_| anyhow!("handshake timed out after {HANDSHAKE_TIMEOUT:?}"))??; let client_udp = std::net::SocketAddr::new(peer.ip(), start.client_udp_port); tracing::info!(%client_udp, udp_port, mode = ?hello.mode, "handshake complete — streaming"); // Input plane: QUIC datagrams → channel → a native injector thread (the injector owns - // non-Send compositor state, so it lives on its own thread). + // non-Send compositor state, so it lives on its own thread). The thread also owns the + // session's virtual gamepads and sends force feedback back over `conn`. It exits when + // the channel closes (datagram task ends on disconnect) — fresh state per session. let (input_tx, input_rx) = std::sync::mpsc::channel::(); - std::thread::Builder::new() - .name("lumen-m3-input".into()) - .spawn(move || input_thread(input_rx)) - .context("spawn input thread")?; + let input_handle = { + let conn = conn.clone(); + std::thread::Builder::new() + .name("lumen-m3-input".into()) + .spawn(move || input_thread(input_rx, conn)) + .context("spawn input thread")? + }; let input_conn = conn.clone(); tokio::spawn(async move { let mut count = 0u64; @@ -161,53 +251,271 @@ async fn serve_one(opts: M3Options) -> Result<()> { }); } + // Audio plane (virtual source only — synthetic runs are protocol tests): desktop Opus + // → host→client QUIC datagrams, on its own native thread. Best-effort on every failure + // (no PipeWire audio, spawn error): the session continues without audio — and a spawn + // error must NOT early-return here, the threads above are already running. + let audio_handle = if opts.source == M3Source::Virtual { + let conn = conn.clone(); + let stop = stop.clone(); + let cap = audio_cap.clone(); + std::thread::Builder::new() + .name("lumen-m3-audio".into()) + .spawn(move || audio_thread(conn, stop, cap)) + .map_err(|e| tracing::error!(error = %e, "audio thread spawn failed — session continues without audio")) + .ok() + } else { + None + }; + // Data plane on a native thread (no async on the hot path — design invariant). let cfg = welcome.session_config(Role::Host); let source = opts.source; let (seconds, frames) = (opts.seconds, opts.frames); let mode = hello.mode; let stop_stream = stop.clone(); - tokio::task::spawn_blocking(move || -> Result<()> { - let transport = - UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &client_udp.to_string()) - .context("bind data plane")?; - let mut session = - Session::new(cfg, Box::new(transport)).map_err(|e| anyhow!("host session: {e:?}"))?; - match source { - M3Source::Synthetic => synthetic_stream(&mut session, frames, &stop_stream), - M3Source::Virtual => virtual_stream(&mut session, mode, seconds, &stop_stream), - } - }) - .await - .context("stream thread")??; + let result: Result<()> = async { + tokio::task::spawn_blocking(move || -> Result<()> { + let transport = + UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &client_udp.to_string()) + .context("bind data plane")?; + let mut session = Session::new(cfg, Box::new(transport)) + .map_err(|e| anyhow!("host session: {e:?}"))?; + match source { + M3Source::Synthetic => synthetic_stream(&mut session, frames, &stop_stream), + M3Source::Virtual => virtual_stream(&mut session, mode, seconds, &stop_stream), + } + }) + .await + .context("stream thread")??; + // Give the client a moment to drain before the close. + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Ok(()) + } + .await; - // Give the client a moment to drain, then close cleanly. - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - conn.close(0u32.into(), b"done"); - ep.wait_idle().await; - Ok(()) + // Teardown on EVERY path (a failed data plane must not leave the connection open with + // audio still streaming): stop the audio thread, close, then join both side-plane + // threads so the next session starts fresh (closing the connection ends the datagram + // task, which drops the input channel, which exits the input thread + its gamepads). + stop.store(true, Ordering::SeqCst); + conn.close( + if result.is_ok() { 0u32 } else { 1u32 }.into(), + if result.is_ok() { b"done" } else { b"error" }, + ); + let _ = tokio::task::spawn_blocking(move || { + if let Some(h) = audio_handle { + let _ = h.join(); + } + let _ = input_handle.join(); + }) + .await; + result } +/// Per-pad accumulated state: lumen/1 gamepad events are incremental (one button or axis +/// per datagram, see `lumen_core::input::gamepad`), the virtual xpad applies full frames. +#[derive(Clone, Copy, Default)] +struct PadState { + buttons: u32, + left_trigger: u8, + right_trigger: u8, + ls_x: i16, + ls_y: i16, + rs_x: i16, + rs_y: i16, +} + +impl PadState { + /// Fold one wire event into the state. `false` = unknown axis id (event dropped). + fn apply(&mut self, ev: &InputEvent) -> bool { + if ev.kind == InputKind::GamepadButton { + if ev.x != 0 { + self.buttons |= ev.code; + } else { + self.buttons &= !ev.code; + } + return true; + } + use lumen_core::input::gamepad::*; + let stick = ev.x.clamp(i16::MIN as i32, i16::MAX as i32) as i16; + let trigger = ev.x.clamp(0, 255) as u8; + match ev.code { + AXIS_LS_X => self.ls_x = stick, + AXIS_LS_Y => self.ls_y = stick, + AXIS_RS_X => self.rs_x = stick, + AXIS_RS_Y => self.rs_y = stick, + AXIS_LT => self.left_trigger = trigger, + AXIS_RT => self.right_trigger = trigger, + _ => return false, + } + true + } + + fn frame(&self, index: usize, active_mask: u16) -> crate::gamestream::gamepad::GamepadFrame { + crate::gamestream::gamepad::GamepadFrame { + index: index as i16, + active_mask, + buttons: self.buttons, + left_trigger: self.left_trigger, + right_trigger: self.right_trigger, + ls_x: self.ls_x, + ls_y: self.ls_y, + rs_x: self.rs_x, + rs_y: self.rs_y, + } + } +} + +/// Highest pad index addressable on the wire (`flags` field); the uinput manager caps +/// actual pad creation at its own MAX_PADS. +const MAX_WIRE_PADS: usize = 16; + /// The injector thread: open the session's input backend on first event, then inject. -fn input_thread(rx: std::sync::mpsc::Receiver) { +/// Gamepad kinds route to the session's [`GamepadManager`](crate::inject::gamepad), with +/// force feedback pumped between events and sent back as rumble datagrams. +fn input_thread(rx: std::sync::mpsc::Receiver, conn: quinn::Connection) { let mut injector: Option> = None; - while let Ok(ev) = rx.recv() { - if injector.is_none() { - let backend = crate::inject::default_backend(); - match crate::inject::open(backend) { - Ok(i) => { - tracing::info!(?backend, "lumen/1 input injector opened"); - injector = Some(i); + let mut injector_broken = false; + let mut pads = crate::inject::gamepad::GamepadManager::new(); + let mut pad_state = [PadState::default(); MAX_WIRE_PADS]; + let mut pad_mask = 0u16; + // Rumble is idempotent state on a lossy channel (client-side overflow drops datagrams), + // so re-send the current state of every rumbling-capable pad every 500 ms — a dropped + // transition (including a stop) heals on the next refresh. + let mut rumble_state = [(0u16, 0u16); MAX_WIRE_PADS]; + let mut rumble_seen = [false; MAX_WIRE_PADS]; + let mut last_refresh = std::time::Instant::now(); + loop { + match rx.recv_timeout(std::time::Duration::from_millis(4)) { + Ok(ev) => match ev.kind { + InputKind::GamepadButton | InputKind::GamepadAxis => { + let idx = ev.flags as usize; + if idx >= MAX_WIRE_PADS || !pad_state[idx].apply(&ev) { + continue; + } + pad_mask |= 1 << idx; + let frame = pad_state[idx].frame(idx, pad_mask); + pads.handle(&crate::gamestream::gamepad::GamepadEvent::State(frame)); } - Err(e) => { - tracing::error!(error = %format!("{e:#}"), "input injection unavailable"); - return; + _ => { + if injector.is_none() && !injector_broken { + let backend = crate::inject::default_backend(); + match crate::inject::open(backend) { + Ok(i) => { + tracing::info!(?backend, "lumen/1 input injector opened"); + injector = Some(i); + } + Err(e) => { + // Keep running for gamepads — uinput pads work even when + // the pointer/keyboard backend doesn't. + tracing::error!(error = %format!("{e:#}"), "pointer/keyboard injection unavailable"); + injector_broken = true; + } + } + } + if let Some(inj) = injector.as_mut() { + if let Err(e) = inj.inject(&ev) { + tracing::warn!(error = %format!("{e:#}"), "inject failed"); + } + } + } + }, + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {} + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, + } + // Service force feedback every iteration (≤4 ms latency; games block on EVIOCSFF). + pads.pump_rumble(|pad, low, high| { + if let Some(s) = rumble_state.get_mut(pad as usize) { + *s = (low, high); + rumble_seen[pad as usize] = true; + } + let d = lumen_core::quic::encode_rumble_datagram(pad, low, high); + let _ = conn.send_datagram(d.to_vec().into()); + }); + if last_refresh.elapsed() >= std::time::Duration::from_millis(500) { + last_refresh = std::time::Instant::now(); + for (i, &(low, high)) in rumble_state.iter().enumerate() { + if rumble_seen[i] { + let d = lumen_core::quic::encode_rumble_datagram(i as u16, low, high); + let _ = conn.send_datagram(d.to_vec().into()); } } } - if let Err(e) = injector.as_mut().unwrap().inject(&ev) { - tracing::warn!(error = %format!("{e:#}"), "inject failed"); + } +} + +/// The audio thread: desktop capture → Opus (48 kHz stereo, 5 ms, CBR — same tuning as the +/// GameStream path) → `AUDIO_MAGIC` datagrams. QUIC already encrypts; no extra layer. +/// The capturer comes from (and returns to) the persistent slot — see [`AudioCapSlot`]. +fn audio_thread(conn: quinn::Connection, stop: Arc, audio_cap: AudioCapSlot) { + use crate::audio::{CHANNELS, SAMPLE_RATE}; + const FRAME_MS: usize = 5; + const SAMPLES_PER_FRAME: usize = SAMPLE_RATE as usize * FRAME_MS / 1000; // 240 + + let mut capturer = match audio_cap.lock().unwrap().take() { + Some(mut c) => { + c.drain(); // discard audio captured between sessions + c } + None => match crate::audio::open_audio_capture() { + Ok(c) => c, + Err(e) => { + tracing::warn!(error = %format!("{e:#}"), "lumen/1 audio unavailable — session continues without it"); + return; + } + }, + }; + let mut enc = match opus::Encoder::new( + SAMPLE_RATE, + opus::Channels::Stereo, + opus::Application::LowDelay, + ) { + Ok(e) => e, + Err(e) => { + tracing::error!(error = %e, "opus encoder"); + *audio_cap.lock().unwrap() = Some(capturer); + return; + } + }; + enc.set_bitrate(opus::Bitrate::Bits(128_000)).ok(); + enc.set_vbr(false).ok(); + + let frame_len = SAMPLES_PER_FRAME * CHANNELS; + let mut acc: Vec = Vec::with_capacity(frame_len * 4); + let mut opus_buf = vec![0u8; 1500]; + let mut seq: u32 = 0; + let mut capture_dead = false; + tracing::info!("lumen/1 audio streaming (Opus 48 kHz stereo, 5 ms datagrams)"); + 'session: while !stop.load(Ordering::SeqCst) { + let chunk = match capturer.next_chunk() { + Ok(c) => c, + Err(e) => { + tracing::warn!(error = %format!("{e:#}"), "audio capture ended"); + capture_dead = true; + break; + } + }; + acc.extend_from_slice(&chunk); + while acc.len() >= frame_len { + let frame: Vec = acc.drain(..frame_len).collect(); + let pts_ns = now_ns(); + match enc.encode_float(&frame, &mut opus_buf) { + Ok(n) => { + let d = lumen_core::quic::encode_audio_datagram(seq, pts_ns, &opus_buf[..n]); + if conn.send_datagram(d.into()).is_err() { + break 'session; // connection gone + } + seq = seq.wrapping_add(1); + } + Err(e) => tracing::warn!(error = %e, "opus encode"), + } + } + } + // Return the live capturer for the next session; a dead one is dropped so the next + // session reopens fresh. + if !capture_dead { + *audio_cap.lock().unwrap() = Some(capturer); } } @@ -290,42 +598,51 @@ fn virtual_stream( mod tests { use super::*; - /// End-to-end through the C ABI — the exact contract platform clients (Swift) link: - /// in-process lumen/1 host, `lumen_connect` → `lumen_connection_next_au` pulls verified - /// frames → `lumen_connection_send_input` enqueues → `lumen_connection_close`. + fn gp(kind: InputKind, code: u32, x: i32, pad: u32) -> InputEvent { + InputEvent { + kind, + _pad: [0; 3], + code, + x, + y: 0, + flags: pad, + } + } + + /// Incremental wire events accumulate into the full pad frame the virtual xpad applies. #[test] - fn c_abi_connection_roundtrip() { - use lumen_core::abi::{ - lumen_connect, lumen_connection_close, lumen_connection_mode, lumen_connection_next_au, - lumen_connection_send_input, - }; + fn gamepad_accumulator() { + use lumen_core::input::gamepad::*; + let mut s = PadState::default(); + assert!(s.apply(&gp(InputKind::GamepadButton, BTN_A, 1, 0))); + assert!(s.apply(&gp(InputKind::GamepadButton, BTN_LB, 1, 0))); + assert!(s.apply(&gp(InputKind::GamepadAxis, AXIS_LS_X, -32768, 0))); + assert!(s.apply(&gp(InputKind::GamepadAxis, AXIS_RT, 255, 0))); + let f = s.frame(2, 0b0100); + assert_eq!(f.buttons, BTN_A | BTN_LB); + assert_eq!((f.ls_x, f.right_trigger), (-32768, 255)); + assert_eq!((f.index, f.active_mask), (2, 0b0100)); + + // Release folds out; axis values clamp; unknown axis ids are rejected. + assert!(s.apply(&gp(InputKind::GamepadButton, BTN_A, 0, 0))); + assert_eq!(s.frame(0, 1).buttons, BTN_LB); + assert!(s.apply(&gp(InputKind::GamepadAxis, AXIS_LT, 9_999, 0))); + assert_eq!(s.left_trigger, 255); + assert!(!s.apply(&gp(InputKind::GamepadAxis, 42, 1, 0))); + + // The lumen/1 button bits are the GameStream bits — one wire contract end to end. + assert_eq!(BTN_A, crate::gamestream::gamepad::BTN_A); + assert_eq!(BTN_GUIDE, crate::gamestream::gamepad::BTN_GUIDE); + assert_eq!(BTN_DPAD_UP, crate::gamestream::gamepad::BTN_DPAD_UP); + } + + /// Pull and byte-verify `count` synthetic frames through the C ABI connection. + unsafe fn pull_verified(conn: *mut lumen_core::abi::LumenConnection, count: u32) { use lumen_core::error::LumenStatus; - - let host = std::thread::spawn(|| { - run(M3Options { - port: 19777, - source: M3Source::Synthetic, - seconds: 0, - frames: 25, - }) - }); - std::thread::sleep(std::time::Duration::from_millis(500)); - - let addr = std::ffi::CString::new("127.0.0.1").unwrap(); - let conn = unsafe { lumen_connect(addr.as_ptr(), 19777, 1280, 720, 60, 10_000) }; - assert!(!conn.is_null(), "lumen_connect failed"); - - let (mut w, mut h, mut hz) = (0u32, 0u32, 0u32); - assert_eq!( - unsafe { lumen_connection_mode(conn, &mut w, &mut h, &mut hz) }, - LumenStatus::Ok - ); - assert_eq!((w, h, hz), (1280, 720, 60)); - let mut got = 0u32; let mut frame = unsafe { std::mem::zeroed() }; - while got < 25 { - match unsafe { lumen_connection_next_au(conn, &mut frame, 2000) } { + while got < count { + match unsafe { lumen_core::abi::lumen_connection_next_au(conn, &mut frame, 2000) } { LumenStatus::Ok => { let data = unsafe { std::slice::from_raw_parts(frame.data, frame.len) }; let idx = u32::from_le_bytes(data[0..4].try_into().unwrap()); @@ -340,6 +657,58 @@ mod tests { other => panic!("next_au: {other:?}"), } } + } + + /// End-to-end through the C ABI — the exact contract platform clients (Swift) link: + /// in-process lumen/1 host, `lumen_connect` (TOFU → pinned reconnect) → + /// `lumen_connection_next_au` pulls verified frames → `lumen_connection_send_input` + /// enqueues → `lumen_connection_close`. Three sequential sessions against ONE host + /// process prove the persistent listener, and a wrong pin is rejected. + #[test] + fn c_abi_connection_roundtrip() { + use lumen_core::abi::{ + lumen_connect, lumen_connection_close, lumen_connection_mode, + lumen_connection_send_input, + }; + use lumen_core::error::LumenStatus; + + let host = std::thread::spawn(|| { + run(M3Options { + port: 19777, + source: M3Source::Synthetic, + seconds: 0, + frames: 25, + max_sessions: 3, + }) + }); + std::thread::sleep(std::time::Duration::from_millis(500)); + + // Session 1: TOFU (no pin) — observe the host fingerprint. + let addr = std::ffi::CString::new("127.0.0.1").unwrap(); + let mut observed = [0u8; 32]; + let conn = unsafe { + lumen_connect( + addr.as_ptr(), + 19777, + 1280, + 720, + 60, + std::ptr::null(), + observed.as_mut_ptr(), + 10_000, + ) + }; + assert!(!conn.is_null(), "lumen_connect failed"); + assert_ne!(observed, [0u8; 32], "fingerprint not reported"); + + let (mut w, mut h, mut hz) = (0u32, 0u32, 0u32); + assert_eq!( + unsafe { lumen_connection_mode(conn, &mut w, &mut h, &mut hz) }, + LumenStatus::Ok + ); + assert_eq!((w, h, hz), (1280, 720, 60)); + + unsafe { pull_verified(conn, 25) }; let ev = lumen_core::input::InputEvent { kind: lumen_core::input::InputKind::MouseMove, @@ -353,8 +722,60 @@ mod tests { unsafe { lumen_connection_send_input(conn, &ev) }, LumenStatus::Ok ); - unsafe { lumen_connection_close(conn) }; + + // Session 2 (same host process — the listener survived): pin the fingerprint. + let conn2 = unsafe { + lumen_connect( + addr.as_ptr(), + 19777, + 1280, + 720, + 60, + observed.as_ptr(), + std::ptr::null_mut(), + 10_000, + ) + }; + assert!(!conn2.is_null(), "pinned reconnect failed"); + unsafe { pull_verified(conn2, 25) }; + unsafe { lumen_connection_close(conn2) }; + + // Session 3: a wrong pin must be rejected by the handshake. + let bad = [0xAAu8; 32]; + let conn3 = unsafe { + lumen_connect( + addr.as_ptr(), + 19777, + 1280, + 720, + 60, + bad.as_ptr(), + std::ptr::null_mut(), + 10_000, + ) + }; + assert!(conn3.is_null(), "wrong pin must fail the handshake"); + + // The host saw the rejected handshake attempt as session 3? No — a TLS-failed + // handshake never yields a connection, so accept() is still waiting. Connect once + // more (TOFU) to complete the host's third session and let it exit. + let conn4 = unsafe { + lumen_connect( + addr.as_ptr(), + 19777, + 1280, + 720, + 60, + std::ptr::null(), + std::ptr::null_mut(), + 10_000, + ) + }; + assert!(!conn4.is_null()); + unsafe { pull_verified(conn4, 25) }; + unsafe { lumen_connection_close(conn4) }; + host.join().unwrap().unwrap(); } } diff --git a/crates/lumen-host/src/main.rs b/crates/lumen-host/src/main.rs index b9bbeb8..7d14f13 100644 --- a/crates/lumen-host/src/main.rs +++ b/crates/lumen-host/src/main.rs @@ -84,6 +84,9 @@ fn real_main() -> Result<()> { source, seconds: get("--seconds").and_then(|s| s.parse().ok()).unwrap_or(30), frames: get("--frames").and_then(|s| s.parse().ok()).unwrap_or(300), + max_sessions: get("--max-sessions") + .and_then(|s| s.parse().ok()) + .unwrap_or(0), }) } Some("-h") | Some("--help") | Some("help") | None => { @@ -297,6 +300,7 @@ USAGE: lumen-host serve [OPTIONS] GameStream host control plane (M2: mDNS + serverinfo …) + the management REST API lumen-host openapi print the management API's OpenAPI document (codegen) + lumen-host m3-host [OPTIONS] native lumen/1 host (QUIC control plane + UDP data plane) lumen-host m0 [OPTIONS] M0 capture→encode→file pipeline spike SERVE OPTIONS: @@ -304,6 +308,13 @@ SERVE OPTIONS: --mgmt-token bearer token for the management API (or LUMEN_MGMT_TOKEN); required when --mgmt-bind is not loopback +M3-HOST OPTIONS: + --port QUIC listen port (default: 9777) + --source test frames, or virtual display + NVENC (default: synthetic) + --seconds per-session stream duration, virtual source (default: 30) + --frames per-session frame count, synthetic source (default: 300) + --max-sessions exit after N sessions; 0 = serve forever (default: 0) + M0 OPTIONS: --source frame source (default: portal). 'kwin-virtual' creates a diff --git a/include/lumen_core.h b/include/lumen_core.h index 75628fc..b8a5f4c 100644 --- a/include/lumen_core.h +++ b/include/lumen_core.h @@ -25,6 +25,50 @@ // Fixed serialized size of an [`InputEvent`] on the wire (tag + fields). #define INPUT_WIRE_LEN (((((1 + 1) + 4) + 4) + 4) + 4) +#define LUMEN_BTN_DPAD_UP 1 + +#define LUMEN_BTN_DPAD_DOWN 2 + +#define LUMEN_BTN_DPAD_LEFT 4 + +#define LUMEN_BTN_DPAD_RIGHT 8 + +#define LUMEN_BTN_START 16 + +#define LUMEN_BTN_BACK 32 + +#define LUMEN_BTN_LS_CLICK 64 + +#define LUMEN_BTN_RS_CLICK 128 + +#define LUMEN_BTN_LB 256 + +#define LUMEN_BTN_RB 512 + +#define LUMEN_BTN_GUIDE 1024 + +#define LUMEN_BTN_A 4096 + +#define LUMEN_BTN_B 8192 + +#define LUMEN_BTN_X 16384 + +#define LUMEN_BTN_Y 32768 + +// Axis ids for `InputKind::GamepadAxis`. +#define LUMEN_AXIS_LS_X 0 + +#define LUMEN_AXIS_LS_Y 1 + +#define LUMEN_AXIS_RS_X 2 + +#define LUMEN_AXIS_RS_Y 3 + +// Triggers: value range 0..255. +#define LUMEN_AXIS_LT 4 + +#define LUMEN_AXIS_RT 5 + // Identifies a lumen video packet (vs. an input datagram, see [`crate::input`]). #define LUMEN_MAGIC 201 @@ -38,6 +82,17 @@ // `shard_payload` so `HEADER_LEN + shard_payload + CRYPTO_OVERHEAD ≤ MAX_DATAGRAM_BYTES`. #define MAX_DATAGRAM_BYTES 2048 +#if defined(LUMEN_FEATURE_QUIC) +// Datagram wire tags. Video rides UDP; everything low-rate rides QUIC datagrams, +// demultiplexed by the first byte: input = [`crate::input::INPUT_MAGIC`] (0xC8), +// audio = [`AUDIO_MAGIC`], rumble = [`RUMBLE_MAGIC`]. +#define LUMEN_AUDIO_MAGIC 201 +#endif + +#if defined(LUMEN_FEATURE_QUIC) +#define LUMEN_RUMBLE_MAGIC 202 +#endif + // Stable C ABI status codes. `Ok` is 0; all errors are negative so callers can // test `rc < 0`. Do not renumber existing variants — only append. enum LumenStatus @@ -82,8 +137,11 @@ enum LumenInputKind LUMEN_INPUT_KIND_MOUSE_BUTTON_UP = 5, // `x` carries the (signed) scroll delta. LUMEN_INPUT_KIND_MOUSE_SCROLL = 6, + // `code` = button bit ([`gamepad`] `BTN_*`), `x` ≠ 0 = pressed, `flags` = pad index. LUMEN_INPUT_KIND_GAMEPAD_BUTTON = 7, - // `code` = axis id, `x` = axis value. + // `code` = axis id ([`gamepad`] `AXIS_*`), `x` = axis value, `flags` = pad index. + // Sticks are i16 range (−32768..32767) in the XInput/Moonlight convention — **+y = + // up** (unlike mouse coordinates); triggers 0..255. LUMEN_INPUT_KIND_GAMEPAD_AXIS = 8, }; #ifndef __cplusplus @@ -97,6 +155,11 @@ typedef uint8_t LumenInputKind; #if defined(LUMEN_FEATURE_QUIC) // Opaque handle to a live `lumen/1` connection (QUIC control plane + UDP data plane, all // pumped on internal threads). +// +// Thread contract: each plane (video `next_au`, audio `next_audio`, rumble `next_rumble`) +// may be pulled from its own thread, at most one thread per plane. The accessors only +// take shared references internally (per-plane mutexed borrow slots), so cross-plane +// concurrency is sound — never two threads on the *same* plane. typedef struct LumenConnection LumenConnection; #endif @@ -164,6 +227,17 @@ typedef struct { uint64_t bytes_received; } LumenStats; +#if defined(LUMEN_FEATURE_QUIC) +// One Opus audio packet pulled off a `lumen/1` connection (48 kHz stereo, 5 ms frames). +// `data` borrows connection memory until the next `lumen_connection_next_audio` call. +typedef struct { + const uint8_t *data; + uintptr_t len; + uint32_t seq; + uint64_t pts_ns; +} LumenAudioPacket; +#endif + #ifdef __cplusplus extern "C" { #endif // __cplusplus @@ -242,26 +316,66 @@ LumenStatus lumen_get_stats(LumenSession *s, LumenStats *out); // Connect to a `lumen/1` host and start a session at `width`x`height`@`refresh_hz`. // Blocks up to `timeout_ms` for the handshake. Returns NULL on failure. // +// Trust: `pin_sha256` (NULL or 32 bytes) is the expected SHA-256 fingerprint of the host's +// certificate — a mismatching host is rejected. NULL = trust on first use; persist the +// fingerprint written to `observed_sha256_out` (NULL or 32 bytes, filled on success) and +// pass it as the pin on every later connect. +// // # Safety -// `host` is a NUL-terminated UTF-8 string (IP or hostname resolvable by the platform). +// `host` is a NUL-terminated UTF-8 string (IP or hostname resolvable by the platform); +// `pin_sha256`/`observed_sha256_out` are each NULL or valid for 32 bytes. LumenConnection *lumen_connect(const char *host, uint16_t port, uint32_t width, uint32_t height, uint32_t refresh_hz, + const uint8_t *pin_sha256, + uint8_t *observed_sha256_out, uint32_t timeout_ms); #endif #if defined(LUMEN_FEATURE_QUIC) // Pull the next reassembled access unit, waiting up to `timeout_ms`. Returns // [`LumenStatus::NoFrame`] on timeout and [`LumenStatus::Closed`] once the session ended. -// On `Ok`, `*out` borrows connection memory **until the next call** on this handle. +// On `Ok`, `*out` borrows connection memory **until the next `next_au` call** on this +// handle (the audio/rumble planes do not invalidate it). // // # Safety -// `c` is a valid connection handle used from a single thread; `out` is writable. +// `c` is a valid connection handle; `out` is writable. At most one thread pulls video — +// it may run concurrently with one audio-pulling and one rumble-pulling thread. LumenStatus lumen_connection_next_au(LumenConnection *c, LumenFrame *out, uint32_t timeout_ms); #endif +#if defined(LUMEN_FEATURE_QUIC) +// Pull the next Opus audio packet, waiting up to `timeout_ms`. Returns +// [`LumenStatus::NoFrame`] on timeout and [`LumenStatus::Closed`] once the session ended. +// On `Ok`, `out->data` borrows connection memory **until the next audio call** on this +// handle (independent of the video slot). Drain from a dedicated audio thread — packets +// arrive every 5 ms and the internal queue holds 320 ms. +// +// # Safety +// `c` is a valid connection handle; `out` is writable. At most one thread pulls audio — +// it may run concurrently with the video/rumble pullers. +LumenStatus lumen_connection_next_audio(LumenConnection *c, + LumenAudioPacket *out, + uint32_t timeout_ms); +#endif + +#if defined(LUMEN_FEATURE_QUIC) +// Pull the next rumble (force-feedback) update, waiting up to `timeout_ms`. Amplitudes +// are 0..0xFFFF (`low` = low-frequency motor, `high` = high-frequency), `(0, 0)` = stop. +// Same timeout/closed semantics as [`lumen_connection_next_audio`]. +// +// # Safety +// `c` is a valid connection handle; out pointers are writable (NULLs are skipped). At +// most one thread pulls rumble — it may run concurrently with the video/audio pullers. +LumenStatus lumen_connection_next_rumble(LumenConnection *c, + uint16_t *pad, + uint16_t *low, + uint16_t *high, + uint32_t timeout_ms); +#endif + #if defined(LUMEN_FEATURE_QUIC) // Send one input event to the host as a QUIC datagram (non-blocking enqueue). //