diff --git a/crates/lumen-host/src/encode.rs b/crates/lumen-host/src/encode.rs index a1208a1..8b402e7 100644 --- a/crates/lumen-host/src/encode.rs +++ b/crates/lumen-host/src/encode.rs @@ -40,6 +40,9 @@ impl Codec { /// A hardware encoder. One per session; runs on the encode thread. pub trait Encoder: Send { fn submit(&mut self, frame: &CapturedFrame) -> Result<()>; + /// Force the next submitted frame to be an IDR keyframe (e.g. after a client + /// reference-frame-invalidation request). Default: no-op. + fn request_keyframe(&mut self) {} /// Pull the next encoded AU if one is ready. fn poll(&mut self) -> Result>; /// Signal end-of-stream. After this, drain the remaining AUs with [`poll`](Self::poll) diff --git a/crates/lumen-host/src/encode/linux.rs b/crates/lumen-host/src/encode/linux.rs index 69fc77f..c4b3568 100644 --- a/crates/lumen-host/src/encode/linux.rs +++ b/crates/lumen-host/src/encode/linux.rs @@ -42,6 +42,8 @@ pub struct NvencEncoder { fps: u32, /// Monotonic presentation index, in `1/fps` time-base units. frame_idx: i64, + /// Force the next submitted frame to be an IDR (set by [`Encoder::request_keyframe`]). + force_kf: bool, } impl NvencEncoder { @@ -95,6 +97,7 @@ impl NvencEncoder { height, fps, frame_idx: 0, + force_kf: false, }) } } @@ -147,10 +150,21 @@ impl Encoder for NvencEncoder { } self.frame.set_pts(Some(self.frame_idx)); self.frame_idx += 1; + // Force an IDR when requested (client RFI); otherwise let NVENC pick (GOP/P-frame). 
+ if self.force_kf { + self.frame.set_kind(ffmpeg::picture::Type::I); + self.force_kf = false; + } else { + self.frame.set_kind(ffmpeg::picture::Type::None); + } self.enc.send_frame(&self.frame).context("send_frame")?; Ok(()) } + fn request_keyframe(&mut self) { + self.force_kf = true; + } + fn poll(&mut self) -> Result> { let mut pkt = Packet::empty(); match self.enc.receive_packet(&mut pkt) { diff --git a/crates/lumen-host/src/gamestream/control.rs b/crates/lumen-host/src/gamestream/control.rs index 3451950..9688280 100644 --- a/crates/lumen-host/src/gamestream/control.rs +++ b/crates/lumen-host/src/gamestream/control.rs @@ -125,6 +125,22 @@ fn on_receive( } }; + // Recovery requests after loss: 0x0301 (Gen7 invalidate-reference-frames), 0x0302 (Gen7Enc + // request-IDR), or 0x0305. Force a keyframe so the client resyncs without a multi-second stall. + if pt.len() >= 2 { + let inner = u16::from_le_bytes([pt[0], pt[1]]); + if matches!(inner, 0x0301 | 0x0302 | 0x0305) { + state + .force_idr + .store(true, std::sync::atomic::Ordering::SeqCst); + tracing::info!( + ty = format!("{inner:#06x}"), + "control: IDR/RFI request → keyframe" + ); + return; + } + } + let events = super::input::decode(&pt); if events.is_empty() { return; // keepalive / QoS / unhandled input kind diff --git a/crates/lumen-host/src/gamestream/mod.rs b/crates/lumen-host/src/gamestream/mod.rs index 7922801..ed8a814 100644 --- a/crates/lumen-host/src/gamestream/mod.rs +++ b/crates/lumen-host/src/gamestream/mod.rs @@ -89,6 +89,9 @@ pub struct AppState { pub streaming: std::sync::Arc, /// True while the audio stream thread is running (also its keep-running flag). pub audio_streaming: std::sync::Arc, + /// Set by the control stream when the client requests an IDR / invalidates reference + /// frames (recovery after loss); the video thread forces a keyframe and clears it. + pub force_idr: std::sync::Arc, } /// Run the GameStream control plane (blocks): mDNS advertisement + the nvhttp servers. 
@@ -104,6 +107,7 @@ pub fn serve() -> Result<()> { stream: std::sync::Mutex::new(None), streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), audio_streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), + force_idr: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), }); tracing::info!( hostname = %state.host.hostname, diff --git a/crates/lumen-host/src/gamestream/rtsp.rs b/crates/lumen-host/src/gamestream/rtsp.rs index f5898dd..e5ea433 100644 --- a/crates/lumen-host/src/gamestream/rtsp.rs +++ b/crates/lumen-host/src/gamestream/rtsp.rs @@ -167,7 +167,7 @@ fn handle_request(req: &Request, state: &AppState) -> String { match cfg { Some(cfg) if !state.streaming.swap(true, Ordering::SeqCst) => { tracing::info!("RTSP PLAY — starting video stream"); - stream::start(cfg, state.streaming.clone()); + stream::start(cfg, state.streaming.clone(), state.force_idr.clone()); } Some(_) => tracing::info!("RTSP PLAY — stream already running"), None => tracing::warn!("RTSP PLAY — no negotiated config (ANNOUNCE missing)"), @@ -243,6 +243,10 @@ fn stream_config(map: &HashMap) -> Option { Some("2") => Codec::Av1, _ => Codec::H264, }; + // Parity floor the client asks for (protects small frames); clamp to a sane max. + let min_fec = parse_u("x-nv-vqos[0].fec.minRequiredFecPackets") + .unwrap_or(2) + .min(16) as u8; Some(StreamConfig { width, height, @@ -250,6 +254,7 @@ fn stream_config(map: &HashMap) -> Option { packet_size, bitrate_kbps, codec, + min_fec, }) } diff --git a/crates/lumen-host/src/gamestream/stream.rs b/crates/lumen-host/src/gamestream/stream.rs index f55f7f9..4f0aca9 100644 --- a/crates/lumen-host/src/gamestream/stream.rs +++ b/crates/lumen-host/src/gamestream/stream.rs @@ -23,15 +23,18 @@ pub struct StreamConfig { pub packet_size: usize, pub bitrate_kbps: u32, pub codec: Codec, + /// Client's `x-nv-vqos[0].fec.minRequiredFecPackets` — parity floor per FEC block. 
+ pub min_fec: u8, } /// Spawn the video stream thread (idempotent via `running`). Stops when `running` clears. -pub fn start(cfg: StreamConfig, running: Arc) { +/// `force_idr` is set by the control stream on a client recovery request. +pub fn start(cfg: StreamConfig, running: Arc, force_idr: Arc) { let _ = std::thread::Builder::new() .name("lumen-video".into()) .spawn(move || { tracing::info!(?cfg, "video stream starting"); - if let Err(e) = run(cfg, &running) { + if let Err(e) = run(cfg, &running, &force_idr) { tracing::error!(error = %format!("{e:#}"), "video stream failed"); } running.store(false, Ordering::SeqCst); @@ -39,7 +42,7 @@ pub fn start(cfg: StreamConfig, running: Arc) { }); } -fn run(cfg: StreamConfig, running: &AtomicBool) -> Result<()> { +fn run(cfg: StreamConfig, running: &AtomicBool, force_idr: &AtomicBool) -> Result<()> { let sock = UdpSocket::bind(("0.0.0.0", VIDEO_PORT)).context("bind video UDP")?; // The client pings the video port so we learn where to send; it re-pings until video // flows, so a missed early ping is fine. @@ -88,7 +91,7 @@ fn run(cfg: StreamConfig, running: &AtomicBool) -> Result<()> { .ok() .and_then(|v| v.parse().ok()) .unwrap_or(20); - let mut pk = VideoPacketizer::new(cfg.packet_size, fec_pct); + let mut pk = VideoPacketizer::new(cfg.packet_size, fec_pct, cfg.min_fec); // Pace at a steady rate (capped at 60fps), re-encoding the last captured frame when the // compositor produced no new one. wlroots only emits frames on damage, so a static or @@ -114,31 +117,47 @@ fn run(cfg: StreamConfig, running: &AtomicBool) -> Result<()> { if let Some(f) = capturer.try_latest().context("capture frame")? { frame = f; } + // Honor a client recovery request (RFI / request-IDR): force a keyframe so the client + // resyncs immediately instead of waiting for the next GOP boundary. 
+ if force_idr.swap(false, Ordering::SeqCst) { + enc.request_keyframe(); + } enc.submit(&frame).context("encoder submit")?; // 90 kHz RTP timestamp from wall-clock, so a variable capture rate stays correct. let ts = (stream_start.elapsed().as_secs_f64() * 90_000.0) as u32; - let mut client_gone = false; + let mut batch: Vec> = Vec::new(); while let Some(au) = enc.poll().context("encoder poll")? { let ft = if au.keyframe { FrameType::Idr } else { FrameType::P }; - for pkt in pk.packetize(&au.data, ft, ts) { - // Simulated network loss: build the packet (advances seq) but skip the send. + batch.extend(pk.packetize(&au.data, ft, ts)); + } + + // Pace the frame's packets evenly across the frame interval rather than blasting them + // at line rate (a real link drops microbursts). The per-packet schedule also paces the + // frame itself; sub-500µs sleeps are skipped (unreliable), batching the spread. + let mut client_gone = false; + let n = batch.len(); + if n > 0 { + let per_ns = frame_interval.as_nanos() as u64 / n as u64; + for (i, pkt) in batch.iter().enumerate() { if drop_pct > 0 && rng.gen_range(0..100) < drop_pct { - dropped += 1; - continue; - } - if sock.send(&pkt).is_err() { + dropped += 1; // simulated loss: built the packet, skip the send + } else if sock.send(pkt).is_err() { client_gone = true; break; + } else { + sent_pkts += 1; + } + let target = tick + Duration::from_nanos(per_ns * (i as u64 + 1)); + if let Some(ahead) = target.checked_duration_since(Instant::now()) { + if ahead >= Duration::from_micros(500) { + std::thread::sleep(ahead); + } } - sent_pkts += 1; - } - if client_gone { - break; } } if client_gone { @@ -152,6 +171,8 @@ fn run(cfg: StreamConfig, running: &AtomicBool) -> Result<()> { fps_count = 0; fps_t = Instant::now(); } + // Backstop the frame rate when few/no packets were produced (the packet pacing above + // otherwise consumes ~one frame interval on its own). 
let elapsed = tick.elapsed(); if elapsed < frame_interval { std::thread::sleep(frame_interval - elapsed); diff --git a/crates/lumen-host/src/gamestream/video.rs b/crates/lumen-host/src/gamestream/video.rs index 4441ebc..a1d4488 100644 --- a/crates/lumen-host/src/gamestream/video.rs +++ b/crates/lumen-host/src/gamestream/video.rs @@ -43,17 +43,21 @@ pub struct VideoPacketizer { /// Requested FEC overhead percent (0 = data shards only). The wire carries the recomputed /// per-block `(100·m)/k` so Moonlight derives the same parity count. fec_percentage: usize, + /// Minimum parity shards per block (the client's `fec.minRequiredFecPackets`) — protects + /// small frames whose `⌈k·pct/100⌉` would otherwise be just 1. + min_fec: usize, frame_index: u32, /// Monotonic per-stream packet counter (the RTP sequence / streamPacketIndex source). seq: u32, } impl VideoPacketizer { - pub fn new(packet_size: usize, fec_percentage: u8) -> Self { + pub fn new(packet_size: usize, fec_percentage: u8, min_fec: u8) -> Self { VideoPacketizer { packet_size, payload_per_shard: packet_size + 16 - SHARD_HEADER, fec_percentage: fec_percentage as usize, + min_fec: min_fec as usize, frame_index: 0, seq: 0, } @@ -130,9 +134,14 @@ impl VideoPacketizer { shards.push(buf); } - // 2. m = ⌈k·pct/100⌉ parity shards over the full datagrams. The wire percentage is - // recomputed from m so the client derives the same parity count. - let m = if pct > 0 { (k * pct).div_ceil(100) } else { 0 }; + // 2. m = ⌈k·pct/100⌉ parity shards (floored at the client's min, capped so k+m≤255) + // over the full datagrams. The wire percentage is recomputed from m so the client + // derives the same count. 
+ let m = if pct > 0 { + (k * pct).div_ceil(100).max(self.min_fec).min(255 - k) + } else { + 0 + }; let wire_pct = if m > 0 { (100 * m) / k } else { 0 }; let parity = if m > 0 { Gf8Coder.encode(&shards, m).unwrap_or_default() @@ -217,7 +226,7 @@ mod tests { #[test] fn single_block_layout() { - let mut pk = VideoPacketizer::new(1392, 0); // data-only; pps = 1392+16-32 = 1376 + let mut pk = VideoPacketizer::new(1392, 0, 0); // data-only; pps = 1392+16-32 = 1376 assert_eq!(pk.payload_per_shard, 1376); let au = vec![0xABu8; 4000]; // 8+4000 = 4008 → ceil(4008/1376) = 3 data shards let pkts = pk.packetize(&au, FrameType::Idr, 90_000); @@ -245,7 +254,7 @@ mod tests { #[test] fn multi_block_split() { - let mut pk = VideoPacketizer::new(1392, 0); // data-only + let mut pk = VideoPacketizer::new(1392, 0, 0); // data-only let au = vec![0u8; 600_000]; let pkts = pk.packetize(&au, FrameType::P, 0); let total = (8 + au.len()).div_ceil(1376); @@ -257,7 +266,7 @@ mod tests { #[test] fn emits_parity_shards() { - let mut pk = VideoPacketizer::new(1392, 20); // pps = 1376, 20% FEC + let mut pk = VideoPacketizer::new(1392, 20, 0); // pps = 1376, 20% FEC let au = vec![0xABu8; 4000]; // 8+4000 = 4008 → 3 data shards (k=3) let pkts = pk.packetize(&au, FrameType::Idr, 0); // m = ceil(3*20/100) = 1 parity shard → 4 packets; wire_pct = 100*1/3 = 33. @@ -284,7 +293,7 @@ mod tests { /// payload AND its NV `flags` byte (the byte Moonlight validates), proving the layout. #[test] fn parity_recovers_full_datagram_incl_flags() { - let mut pk = VideoPacketizer::new(1392, 50); // high pct → plenty of parity + let mut pk = VideoPacketizer::new(1392, 50, 0); // high pct → plenty of parity let au = vec![0x5Au8; 4000]; // k = 3 let pkts = pk.packetize(&au, FrameType::Idr, 0); let k = 3usize;