diff --git a/crates/lumen-host/src/audio.rs b/crates/lumen-host/src/audio.rs index 882d1b4..81fca09 100644 --- a/crates/lumen-host/src/audio.rs +++ b/crates/lumen-host/src/audio.rs @@ -15,6 +15,10 @@ pub trait AudioCapturer: Send { /// Block until the next chunk of interleaved samples is available (variable size). The /// caller reframes into fixed Opus frames. fn next_chunk(&mut self) -> Result>; + + /// Discard any buffered chunks (called when a persistent capturer is reused for a new + /// stream, so the client doesn't hear stale audio captured while idle). Default: no-op. + fn drain(&mut self) {} } /// Open a live capturer for the default sink monitor (system output) via PipeWire. diff --git a/crates/lumen-host/src/audio/linux.rs b/crates/lumen-host/src/audio/linux.rs index 9f0067a..6de725f 100644 --- a/crates/lumen-host/src/audio/linux.rs +++ b/crates/lumen-host/src/audio/linux.rs @@ -39,6 +39,10 @@ impl AudioCapturer for PwAudioCapturer { Err(RecvTimeoutError::Disconnected) => Err(anyhow!("pipewire audio thread ended")), } } + + fn drain(&mut self) { + while self.chunks.try_recv().is_ok() {} + } } fn pw_thread(tx: std::sync::mpsc::SyncSender>) -> Result<()> { diff --git a/crates/lumen-host/src/capture.rs b/crates/lumen-host/src/capture.rs index 012a746..704a61f 100644 --- a/crates/lumen-host/src/capture.rs +++ b/crates/lumen-host/src/capture.rs @@ -58,6 +58,12 @@ pub trait Capturer: Send { fn try_latest(&mut self) -> Result> { self.next_frame().map(Some) } + + /// Gate expensive per-frame work so the capturer can be kept alive (reused) between + /// streams without burning CPU. The portal capturer skips the de-pad copy while inactive; + /// the default is a no-op (synthetic sources are produced on demand). Set `true` for the + /// duration of a stream, `false` when it ends. + fn set_active(&self, _active: bool) {} } /// A deterministic moving test pattern (BGRx). Lets M0 exercise the encode → file → diff --git a/crates/lumen-host/src/capture/linux.rs b/crates/lumen-host/src/capture/linux.rs index d3575d1..8c7f18f 100644 --- a/crates/lumen-host/src/capture/linux.rs +++ b/crates/lumen-host/src/capture/linux.rs @@ -18,13 +18,19 @@ use super::{CapturedFrame, Capturer, PixelFormat}; use anyhow::{anyhow, Context, Result}; use std::os::fd::OwnedFd; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, TryRecvError}; +use std::sync::Arc; use std::thread; use std::time::Duration; -/// Live monitor capturer backed by the portal + PipeWire threads. +/// Live monitor capturer backed by the portal + PipeWire threads. Kept alive (reused) across +/// streams — [`set_active`](Capturer::set_active) gates the per-frame de-pad copy so it costs +/// almost nothing between streams while the screencast session stays up (instant reconnect, +/// and no second session to conflict with). pub struct PortalCapturer { frames: Receiver, + active: Arc, } impl PortalCapturer { @@ -48,16 +54,21 @@ impl PortalCapturer { // Frames flow from the pipewire thread over a small bounded channel. let (frame_tx, frame_rx) = sync_channel::(8); + let active = Arc::new(AtomicBool::new(false)); + let active_cb = active.clone(); thread::Builder::new() .name("lumen-pipewire".into()) .spawn(move || { - if let Err(e) = pipewire::pipewire_thread(fd, node_id, frame_tx) { + if let Err(e) = pipewire::pipewire_thread(fd, node_id, frame_tx, active_cb) { tracing::error!(error = %format!("{e:#}"), "pipewire capture thread failed"); } }) .context("spawn pipewire thread")?; - Ok(PortalCapturer { frames: frame_rx }) + Ok(PortalCapturer { + frames: frame_rx, + active, + }) } } @@ -86,6 +97,10 @@ impl Capturer for PortalCapturer { } Ok(latest) } + + fn set_active(&self, active: bool) { + self.active.store(active, Ordering::Relaxed); + } } /// The portal handshake: connect ScreenCast, select a single monitor, start, open the @@ -180,7 +195,9 @@ mod pipewire { use pipewire as pw; use pw::{properties::properties, spa}; use std::os::fd::OwnedFd; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::SyncSender; + use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use spa::param::video::{VideoFormat, VideoInfoRaw}; @@ -205,9 +222,16 @@ mod pipewire { /// Negotiated layout (`None` until param_changed, or if unsupported). format: Option, tx: SyncSender, + /// When false (no active stream), skip the de-pad copy — the buffer is just released. + active: Arc, } - pub fn pipewire_thread(fd: OwnedFd, node_id: u32, tx: SyncSender) -> Result<()> { + pub fn pipewire_thread( + fd: OwnedFd, + node_id: u32, + tx: SyncSender, + active: Arc, + ) -> Result<()> { crate::pwinit::ensure_init(); let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw MainLoop")?; @@ -220,6 +244,7 @@ mod pipewire { info: VideoInfoRaw::default(), format: None, tx, + active, }; let stream = pw::stream::StreamBox::new( @@ -278,6 +303,10 @@ mod pipewire { let Some(mut buffer) = stream.dequeue_buffer() else { return; }; + // No active stream: release the buffer without the (expensive at 5K) de-pad. + if !ud.active.load(Ordering::Relaxed) { + return; + } let datas = buffer.datas_mut(); if datas.is_empty() { return; diff --git a/crates/lumen-host/src/gamestream/audio.rs b/crates/lumen-host/src/gamestream/audio.rs index 22300e0..27fac9c 100644 --- a/crates/lumen-host/src/gamestream/audio.rs +++ b/crates/lumen-host/src/gamestream/audio.rs @@ -12,7 +12,7 @@ //! left in the clear). Reed-Solomon audio FEC is layered on top in P1.5. use super::AUDIO_PORT; -use crate::audio::{self, CHANNELS, SAMPLE_RATE}; +use crate::audio::{self, AudioCapturer, CHANNELS, SAMPLE_RATE}; use anyhow::{Context, Result}; use cbc::cipher::{block_padding::Pkcs7, BlockEncryptMut, KeyIvInit}; use opus::{Application, Bitrate, Channels, Encoder}; @@ -31,14 +31,17 @@ const SAMPLES_PER_FRAME: usize = SAMPLE_RATE as usize * FRAME_MS / 1000; const AUDIO_PACKET_TYPE: u8 = 97; const OPUS_BITRATE: i32 = 128_000; +/// Slot for the persistent audio capturer, reused across streams (no leaked PipeWire thread). +pub type AudioCapSlot = Arc>>>; + /// Spawn the audio stream thread (idempotent via `running`). Stops when `running` clears. /// `gcm_key`/`rikeyid` come from `/launch` and key the AES-CBC payload encryption. -pub fn start(running: Arc, gcm_key: [u8; 16], rikeyid: i32) { +pub fn start(running: Arc, gcm_key: [u8; 16], rikeyid: i32, audio_cap: AudioCapSlot) { let _ = std::thread::Builder::new() .name("lumen-audio".into()) .spawn(move || { tracing::info!("audio stream starting"); - if let Err(e) = run(&running, &gcm_key, rikeyid) { + if let Err(e) = run(&running, &gcm_key, rikeyid, &audio_cap) { tracing::error!(error = %format!("{e:#}"), "audio stream failed"); } running.store(false, Ordering::SeqCst); @@ -46,7 +49,12 @@ pub fn start(running: Arc, gcm_key: [u8; 16], rikeyid: i32) { }); } -fn run(running: &AtomicBool, gcm_key: &[u8; 16], rikeyid: i32) -> Result<()> { +fn run( + running: &AtomicBool, + gcm_key: &[u8; 16], + rikeyid: i32, + audio_cap: &std::sync::Mutex>>, +) -> Result<()> { let sock = UdpSocket::bind(("0.0.0.0", AUDIO_PORT)).context("bind audio UDP")?; // The client pings the audio port (~every 500ms) so we learn where to send. sock.set_read_timeout(Some(Duration::from_secs(10)))?; @@ -59,7 +67,26 @@ fn run(running: &AtomicBool, gcm_key: &[u8; 16], rikeyid: i32) -> Result<()> { .context("connect client audio endpoint")?; tracing::info!(%client, "audio: client endpoint learned"); - let mut cap = audio::open_audio_capture().context("open audio capture")?; + // Reuse the persistent capturer (create on first stream); drain stale buffered audio. + let mut cap = match audio_cap.lock().unwrap().take() { + Some(mut c) => { + c.drain(); + c + } + None => audio::open_audio_capture().context("open audio capture")?, + }; + let result = audio_body(&mut *cap, &sock, gcm_key, rikeyid, running); + *audio_cap.lock().unwrap() = Some(cap); + result +} + +fn audio_body( + cap: &mut dyn AudioCapturer, + sock: &UdpSocket, + gcm_key: &[u8; 16], + rikeyid: i32, + running: &AtomicBool, +) -> Result<()> { // RESTRICTED_LOWDELAY + CBR, matching Sunshine — CBR keeps the Opus TOC byte constant, // which the client asserts per stream. let mut enc = Encoder::new(SAMPLE_RATE, Channels::Stereo, Application::LowDelay) diff --git a/crates/lumen-host/src/gamestream/mod.rs b/crates/lumen-host/src/gamestream/mod.rs index ed8a814..da1948a 100644 --- a/crates/lumen-host/src/gamestream/mod.rs +++ b/crates/lumen-host/src/gamestream/mod.rs @@ -92,6 +92,13 @@ pub struct AppState { /// Set by the control stream when the client requests an IDR / invalidates reference /// frames (recovery after loss); the video thread forces a keyframe and clears it. pub force_idr: std::sync::Arc, + /// Persistent screen capturer, reused across streams so reconnects don't spawn a second + /// (conflicting) screencast session. The video thread borrows it for the stream's duration + /// and returns it; `set_active` gates its cost while idle. + pub video_cap: std::sync::Arc>>>, + /// Persistent audio capturer, reused across streams (avoids leaking a PipeWire capture + /// thread per reconnect); drained on reuse so no stale audio is sent. + pub audio_cap: std::sync::Arc>>>, } /// Run the GameStream control plane (blocks): mDNS advertisement + the nvhttp servers. @@ -108,6 +115,8 @@ pub fn serve() -> Result<()> { streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), audio_streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), force_idr: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), + video_cap: std::sync::Arc::new(std::sync::Mutex::new(None)), + audio_cap: std::sync::Arc::new(std::sync::Mutex::new(None)), }); tracing::info!( hostname = %state.host.hostname, diff --git a/crates/lumen-host/src/gamestream/rtsp.rs b/crates/lumen-host/src/gamestream/rtsp.rs index e5ea433..afce083 100644 --- a/crates/lumen-host/src/gamestream/rtsp.rs +++ b/crates/lumen-host/src/gamestream/rtsp.rs @@ -167,7 +167,12 @@ fn handle_request(req: &Request, state: &AppState) -> String { match cfg { Some(cfg) if !state.streaming.swap(true, Ordering::SeqCst) => { tracing::info!("RTSP PLAY — starting video stream"); - stream::start(cfg, state.streaming.clone(), state.force_idr.clone()); + stream::start( + cfg, + state.streaming.clone(), + state.force_idr.clone(), + state.video_cap.clone(), + ); } Some(_) => tracing::info!("RTSP PLAY — stream already running"), None => tracing::warn!("RTSP PLAY — no negotiated config (ANNOUNCE missing)"), @@ -178,7 +183,12 @@ fn handle_request(req: &Request, state: &AppState) -> String { if let Some(ls) = launch { if !state.audio_streaming.swap(true, Ordering::SeqCst) { tracing::info!("RTSP PLAY — starting audio stream"); - audio::start(state.audio_streaming.clone(), ls.gcm_key, ls.rikeyid); + audio::start( + state.audio_streaming.clone(), + ls.gcm_key, + ls.rikeyid, + state.audio_cap.clone(), + ); } } response(&req.cseq, &[("Session", "DEADBEEFCAFE;timeout = 90")], None) diff --git a/crates/lumen-host/src/gamestream/stream.rs b/crates/lumen-host/src/gamestream/stream.rs index 4f0aca9..e0bf092 100644 --- a/crates/lumen-host/src/gamestream/stream.rs +++ b/crates/lumen-host/src/gamestream/stream.rs @@ -27,14 +27,24 @@ pub struct StreamConfig { pub min_fec: u8, } +/// Slot for the persistent screen capturer, shared with the control plane and reused across +/// streams so a reconnect doesn't open a second (conflicting) screencast session. +pub type CapturerSlot = Arc>>>; + /// Spawn the video stream thread (idempotent via `running`). Stops when `running` clears. -/// `force_idr` is set by the control stream on a client recovery request. -pub fn start(cfg: StreamConfig, running: Arc, force_idr: Arc) { +/// `force_idr` is set by the control stream on a client recovery request; `video_cap` holds +/// the persistent capturer the thread borrows for the stream's duration. +pub fn start( + cfg: StreamConfig, + running: Arc, + force_idr: Arc, + video_cap: CapturerSlot, +) { let _ = std::thread::Builder::new() .name("lumen-video".into()) .spawn(move || { tracing::info!(?cfg, "video stream starting"); - if let Err(e) = run(cfg, &running, &force_idr) { + if let Err(e) = run(cfg, &running, &force_idr, &video_cap) { tracing::error!(error = %format!("{e:#}"), "video stream failed"); } running.store(false, Ordering::SeqCst); @@ -42,7 +52,12 @@ pub fn start(cfg: StreamConfig, running: Arc, force_idr: Arc Result<()> { +fn run( + cfg: StreamConfig, + running: &AtomicBool, + force_idr: &AtomicBool, + video_cap: &std::sync::Mutex>>, +) -> Result<()> { let sock = UdpSocket::bind(("0.0.0.0", VIDEO_PORT)).context("bind video UDP")?; // The client pings the video port so we learn where to send; it re-pings until video // flows, so a missed early ping is fine. @@ -59,15 +74,37 @@ fn run(cfg: StreamConfig, running: &AtomicBool, force_idr: &AtomicBool) -> Resul .context("connect client video endpoint")?; tracing::info!(%client, "video: client endpoint learned"); - let use_portal = std::env::var("LUMEN_VIDEO_SOURCE").is_ok_and(|v| v == "portal"); - let mut capturer: Box = if use_portal { - tracing::info!("video source: portal desktop capture"); - capture::open_portal_monitor().context("open portal capturer")? - } else { - tracing::info!("video source: synthetic test pattern"); - Box::new(FastSyntheticCapturer::new(cfg.width, cfg.height)) + // Reuse the persistent capturer (one screencast session → clean reconnect); create it on + // the first stream. Borrow it for this stream and return it on exit. + let mut capturer: Box = match video_cap.lock().unwrap().take() { + Some(c) => { + tracing::info!("video source: reusing capturer"); + c + } + None if std::env::var("LUMEN_VIDEO_SOURCE").is_ok_and(|v| v == "portal") => { + tracing::info!("video source: portal desktop capture"); + capture::open_portal_monitor().context("open portal capturer")? + } + None => { + tracing::info!("video source: synthetic test pattern"); + Box::new(FastSyntheticCapturer::new(cfg.width, cfg.height)) + } }; + capturer.set_active(true); + let result = stream_body(&mut *capturer, &sock, cfg, running, force_idr); + capturer.set_active(false); + *video_cap.lock().unwrap() = Some(capturer); + result +} +/// The encode → packetize → paced-send loop, over a borrowed capturer. +fn stream_body( + capturer: &mut dyn Capturer, + sock: &UdpSocket, + cfg: StreamConfig, + running: &AtomicBool, + force_idr: &AtomicBool, +) -> Result<()> { // The first frame establishes the authoritative size/format for the encoder. let mut frame = capturer.next_frame().context("capture first frame")?; if frame.width != cfg.width || frame.height != cfg.height {