diff --git a/Cargo.lock b/Cargo.lock index 072deda..2d1cf48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1458,6 +1458,7 @@ dependencies = [ "rsa", "rustls", "rustls-pemfile", + "rusty_enet", "sha2", "tokio", "tracing", @@ -2318,6 +2319,15 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "rusty_enet" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1c0b43e27d8d86bb3a15b9eac232b6d78a06afee258a9861deb5a0d15b65c33" +dependencies = [ + "js-sys", +] + [[package]] name = "ryu" version = "1.0.23" diff --git a/crates/lumen-host/Cargo.toml b/crates/lumen-host/Cargo.toml index 5ad6c8e..08d186c 100644 --- a/crates/lumen-host/Cargo.toml +++ b/crates/lumen-host/Cargo.toml @@ -26,6 +26,7 @@ x509-parser = "0.16" axum-server = { version = "0.7", features = ["tls-rustls"] } rustls = "0.23" rustls-pemfile = "2" +rusty_enet = "0.4" [target.'cfg(target_os = "linux")'.dependencies] # `screencast` gates the ScreenCast portal module; `tokio` is the default runtime. diff --git a/crates/lumen-host/src/gamestream/control.rs b/crates/lumen-host/src/gamestream/control.rs new file mode 100644 index 0000000..f8be93b --- /dev/null +++ b/crates/lumen-host/src/gamestream/control.rs @@ -0,0 +1,75 @@ +//! The GameStream control stream: an ENet host on UDP 47999. Moonlight connects this +//! BEFORE the video stream starts (`STAGE_CONTROL_STREAM_START` precedes +//! `STAGE_VIDEO_STREAM_START`), so it must be up or the whole connection aborts. P1.4 here +//! just accepts the connection + services ENet (keepalive/timeouts) so video can flow; +//! decoding control messages into input injection (mouse/keyboard/gamepad) is the next step. +//! +//! Plaintext for now (we negotiate `encryptionEnabled=0` in DESCRIBE); the encrypted +//! SS_ENC_CONTROL_V2 framing is P1.5. Runs on its own native thread for the host's lifetime. + +use super::CONTROL_PORT; +use anyhow::{anyhow, Context, Result}; +use rusty_enet::{Event, Host, HostSettings}; +use std::net::UdpSocket; +use std::time::Duration; + +/// Bind the ENet control host on 47999 and service it forever on a dedicated thread. +pub fn spawn() -> Result<()> { + let socket = UdpSocket::bind(("0.0.0.0", CONTROL_PORT)).context("bind control UDP")?; + socket + .set_nonblocking(true) + .context("control socket nonblocking")?; + let mut host = Host::new( + socket, + HostSettings { + peer_limit: 4, + channel_limit: 8, + ..Default::default() + }, + ) + .map_err(|e| anyhow!("ENet host init: {e:?}"))?; + tracing::info!(port = CONTROL_PORT, "ENet control listening"); + + std::thread::Builder::new() + .name("lumen-control".into()) + .spawn(move || loop { + loop { + match host.service() { + Ok(Some(event)) => match event { + Event::Connect { .. } => { + tracing::info!("control: client connected"); + } + Event::Disconnect { .. } => { + tracing::info!("control: client disconnected"); + } + Event::Receive { + channel_id, packet, .. + } => { + let d = packet.data(); + let opcode = if d.len() >= 2 { + u16::from_le_bytes([d[0], d[1]]) + } else { + 0 + }; + // TODO(P1.4): decode input events (mouse/keyboard/gamepad) → inject.rs. + tracing::debug!( + channel_id, + len = d.len(), + opcode = format!("0x{opcode:04x}"), + "control: message" + ); + } + }, + Ok(None) => break, + Err(e) => { + tracing::warn!(error = %format!("{e:?}"), "control: service error"); + break; + } + } + } + // ENet needs frequent servicing for handshake/keepalive/retransmit. + std::thread::sleep(Duration::from_millis(2)); + }) + .context("spawn control thread")?; + Ok(()) +} diff --git a/crates/lumen-host/src/gamestream/mod.rs b/crates/lumen-host/src/gamestream/mod.rs index 1fcbfcf..06628fa 100644 --- a/crates/lumen-host/src/gamestream/mod.rs +++ b/crates/lumen-host/src/gamestream/mod.rs @@ -7,12 +7,14 @@ //! the media streams follow (see the M2 task list / plan). mod cert; +mod control; mod crypto; mod mdns; mod nvhttp; mod pairing; mod rtsp; mod serverinfo; +mod stream; mod tls; mod video; @@ -79,6 +81,10 @@ pub struct AppState { pub paired: std::sync::Mutex>>, /// The active launch session (set by `/launch`, consumed by RTSP/media). pub launch: std::sync::Mutex>, + /// Negotiated video config from RTSP ANNOUNCE (consumed by the stream on PLAY). + pub stream: std::sync::Mutex>, + /// True while the video stream thread is running (also its keep-running flag). + pub streaming: std::sync::Arc, } /// Run the GameStream control plane (blocks): mDNS advertisement + the nvhttp servers. @@ -91,6 +97,8 @@ pub fn serve() -> Result<()> { pairing: pairing::Pairing::new(), paired: std::sync::Mutex::new(Vec::new()), launch: std::sync::Mutex::new(None), + stream: std::sync::Mutex::new(None), + streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), }); tracing::info!( hostname = %state.host.hostname, @@ -104,6 +112,7 @@ pub fn serve() -> Result<()> { let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); let _advert = mdns::advertise(&state.host).context("mDNS advertise")?; rtsp::spawn(state.clone()).context("start RTSP server")?; + control::spawn().context("start ENet control server")?; nvhttp::run(state).await }) } diff --git a/crates/lumen-host/src/gamestream/rtsp.rs b/crates/lumen-host/src/gamestream/rtsp.rs index 10d6961..bdb14e4 100644 --- a/crates/lumen-host/src/gamestream/rtsp.rs +++ b/crates/lumen-host/src/gamestream/rtsp.rs @@ -7,11 +7,14 @@ //! Runs on its own native thread (control-plane setup, not the per-frame hot path), one //! thread per connection. Plaintext only for now (encryption is negotiated; P1.5). +use super::stream::{self, StreamConfig}; use super::{AppState, AUDIO_PORT, CONTROL_PORT, RTSP_PORT, VIDEO_PORT}; +use crate::encode::Codec; use anyhow::{Context, Result}; use std::collections::HashMap; use std::io::{Read, Write}; use std::net::{TcpListener, TcpStream}; +use std::sync::atomic::Ordering; use std::sync::Arc; /// Opaque per-session payload the client echoes as its first UDP datagram (port-learning). @@ -148,47 +151,32 @@ fn handle_request(req: &Request, state: &AppState) -> String { ) } "ANNOUNCE" => { - let cfg = parse_announce(&req.body); - tracing::info!( - width = cfg - .get("x-nv-video[0].clientViewportWd") - .map(String::as_str) - .unwrap_or("?"), - height = cfg - .get("x-nv-video[0].clientViewportHt") - .map(String::as_str) - .unwrap_or("?"), - fps = cfg - .get("x-nv-video[0].maxFPS") - .map(String::as_str) - .unwrap_or("?"), - bitrate_kbps = cfg - .get("x-nv-vqos[0].bw.maximumBitrateKbps") - .map(String::as_str) - .unwrap_or("?"), - packet_size = cfg - .get("x-nv-video[0].packetSize") - .map(String::as_str) - .unwrap_or("?"), - codec = cfg - .get("x-nv-vqos[0].bitStreamFormat") - .map(String::as_str) - .unwrap_or("?"), - fec_pct = cfg - .get("x-nv-vqos[0].fec.repairPercent") - .map(String::as_str) - .unwrap_or("?"), - "RTSP ANNOUNCE — negotiated stream config" - ); - // TODO(P1.3): map `cfg` → lumen_core::Config and stash it for the media stages. - let _ = state; + let map = parse_announce(&req.body); + match stream_config(&map) { + Some(cfg) => { + tracing::info!(?cfg, "RTSP ANNOUNCE — negotiated stream config"); + *state.stream.lock().unwrap() = Some(cfg); + } + None => tracing::warn!("RTSP ANNOUNCE — missing required video config keys"), + } response(&req.cseq, &[], None) } "PLAY" => { - tracing::info!("RTSP PLAY — client ready; media streams would start here (P1.3)"); + let cfg = *state.stream.lock().unwrap(); + match cfg { + Some(cfg) if !state.streaming.swap(true, Ordering::SeqCst) => { + tracing::info!("RTSP PLAY — starting video stream"); + stream::start(cfg, state.streaming.clone()); + } + Some(_) => tracing::info!("RTSP PLAY — stream already running"), + None => tracing::warn!("RTSP PLAY — no negotiated config (ANNOUNCE missing)"), + } response(&req.cseq, &[("Session", "DEADBEEFCAFE;timeout = 90")], None) } - "TEARDOWN" => response(&req.cseq, &[], None), + "TEARDOWN" => { + state.streaming.store(false, Ordering::SeqCst); // signal the stream thread to stop + response(&req.cseq, &[], None) + } other => { tracing::warn!(method = other, "RTSP unsupported method"); response_status("501 Not Implemented", &req.cseq, &[], None) @@ -224,6 +212,31 @@ fn parse_announce(body: &str) -> HashMap { map } +/// Map the negotiated ANNOUNCE keys to a [`StreamConfig`] (resolution/packetSize required). +fn stream_config(map: &HashMap) -> Option { + let parse_u = |k: &str| map.get(k).and_then(|s| s.trim().parse::().ok()); + let width = parse_u("x-nv-video[0].clientViewportWd")?; + let height = parse_u("x-nv-video[0].clientViewportHt")?; + let packet_size = parse_u("x-nv-video[0].packetSize")? as usize; + let fps = parse_u("x-nv-video[0].maxFPS") + .filter(|&f| f > 0) + .unwrap_or(60); + let bitrate_kbps = parse_u("x-nv-vqos[0].bw.maximumBitrateKbps").unwrap_or(20_000); + let codec = match map.get("x-nv-vqos[0].bitStreamFormat").map(|s| s.trim()) { + Some("1") => Codec::H265, + Some("2") => Codec::Av1, + _ => Codec::H264, + }; + Some(StreamConfig { + width, + height, + fps, + packet_size, + bitrate_kbps, + codec, + }) +} + /// Extract the stream type from a SETUP URI like `…/streamid=video/0/0`. fn stream_type(uri: &str) -> Option<&str> { let after = uri.split("streamid=").nth(1)?; diff --git a/crates/lumen-host/src/gamestream/stream.rs b/crates/lumen-host/src/gamestream/stream.rs new file mode 100644 index 0000000..8f8e3f9 --- /dev/null +++ b/crates/lumen-host/src/gamestream/stream.rs @@ -0,0 +1,133 @@ +//! The video data plane: on RTSP PLAY, learn the client's UDP endpoint (it pings the video +//! port), then run capture → NVENC encode → [`VideoPacketizer`] → UDP send. P1.3 uses a fast +//! synthetic test pattern (proves the wire format with no compositor); swapping in real +//! portal desktop capture is a one-line source change. Runs on its own native thread. + +use super::video::{FrameType, VideoPacketizer}; +use super::VIDEO_PORT; +use crate::capture::{CapturedFrame, PixelFormat}; +use crate::encode::{self, Codec}; +use anyhow::{Context, Result}; +use std::net::UdpSocket; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +/// Negotiated video parameters from the RTSP ANNOUNCE. +#[derive(Clone, Copy, Debug)] +pub struct StreamConfig { + pub width: u32, + pub height: u32, + pub fps: u32, + pub packet_size: usize, + pub bitrate_kbps: u32, + pub codec: Codec, +} + +/// Spawn the video stream thread (idempotent via `running`). Stops when `running` clears. +pub fn start(cfg: StreamConfig, running: Arc) { + let _ = std::thread::Builder::new() + .name("lumen-video".into()) + .spawn(move || { + tracing::info!(?cfg, "video stream starting"); + if let Err(e) = run(cfg, &running) { + tracing::error!(error = %format!("{e:#}"), "video stream failed"); + } + running.store(false, Ordering::SeqCst); + tracing::info!("video stream stopped"); + }); +} + +fn run(cfg: StreamConfig, running: &AtomicBool) -> Result<()> { + let sock = UdpSocket::bind(("0.0.0.0", VIDEO_PORT)).context("bind video UDP")?; + // The client pings the video port so we learn where to send; it re-pings until video + // flows, so a missed early ping is fine. + sock.set_read_timeout(Some(Duration::from_secs(10)))?; + tracing::info!( + port = VIDEO_PORT, + "video: awaiting client ping to learn endpoint" + ); + let mut probe = [0u8; 256]; + let (_, client) = sock + .recv_from(&mut probe) + .context("video: no client ping within 10s")?; + sock.connect(client) + .context("connect client video endpoint")?; + tracing::info!(%client, "video: client endpoint learned"); + + let (w, h, fps) = (cfg.width, cfg.height, cfg.fps); + let mut enc = encode::open_video( + cfg.codec, + PixelFormat::Bgrx, + w, + h, + fps, + cfg.bitrate_kbps as u64 * 1000, + ) + .context("open NVENC for stream")?; + let mut pk = VideoPacketizer::new(cfg.packet_size); + + let bpp = 4usize; + let row = w as usize * bpp; + let mut buf = vec![0u8; row * h as usize]; + let frame_interval = Duration::from_secs_f64(1.0 / fps as f64); + let mut frame_idx: u32 = 0; + let mut sent_pkts: u64 = 0; + + while running.load(Ordering::SeqCst) { + let tick = Instant::now(); + + // Fast synthetic test pattern: a pulsing grey field + a white band sweeping down. + let shade = (frame_idx % 256) as u8; + buf.fill(shade); + let band_h = (h as usize / 20).max(1); + let band_y = (frame_idx as usize * 6) % h as usize; + for y in band_y..(band_y + band_h).min(h as usize) { + buf[y * row..(y + 1) * row].fill(0xff); + } + + let frame = CapturedFrame { + width: w, + height: h, + pts_ns: frame_idx as u64 * 1_000_000_000 / fps as u64, + format: PixelFormat::Bgrx, + cpu_bytes: std::mem::take(&mut buf), + }; + enc.submit(&frame).context("encoder submit")?; + buf = frame.cpu_bytes; // reclaim the buffer (no per-frame realloc) + + let ts = (frame_idx as u64 * 90_000 / fps as u64) as u32; + let mut client_gone = false; + while let Some(au) = enc.poll().context("encoder poll")? { + let ft = if au.keyframe { + FrameType::Idr + } else { + FrameType::P + }; + for pkt in pk.packetize(&au.data, ft, ts) { + if sock.send(&pkt).is_err() { + client_gone = true; + break; + } + sent_pkts += 1; + } + if client_gone { + break; + } + } + if client_gone { + tracing::info!(sent_pkts, "video: client unreachable — stopping stream"); + break; + } + + frame_idx += 1; + if frame_idx % (fps.max(1) * 2) == 0 { + tracing::info!(frame_idx, sent_pkts, "video: streaming"); + } + let elapsed = tick.elapsed(); + if elapsed < frame_interval { + std::thread::sleep(frame_interval - elapsed); + } + } + Ok(()) +}