feat: M2 teardown — persistent capturers for clean reconnects

Disconnect/reconnect now works reliably. Previously each stream spawned its own
portal+PipeWire (and PipeWire audio) capture threads and never stopped them, so a
reconnect opened a SECOND screencast session that conflicted with the leaked
first one ("no PipeWire frame within 10s" → black screen on reconnect).

- The screen capturer and audio capturer are now persistent, held in AppState and
  reused across streams (created on the first stream). One screencast session for
  the host's lifetime → no conflict, and instant reconnect (no re-handshake).
  Verified live: 3 stream cycles, 1 create + 2 "reusing capturer", clean every time.
- Capturer::set_active gates the (5K, ~1.3 GB/s) de-pad copy to active streams, so
  the persistent video capturer is nearly free while idle between streams.
- AudioCapturer::drain discards buffered chunks on reuse so the client never hears
  stale audio captured while idle.
- stream.rs / gamestream/audio.rs split into a borrow-the-capturer wrapper + the
  encode/send body, so the capturer is always returned to its slot on exit.

This holds whether the client reconnects via /resume (Moonlight's "running →
play/continue") or a fresh /launch — both re-run RTSP PLAY → a new stream cycle.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-09 12:35:10 +00:00
parent af4360c930
commit 6de09fd822
8 changed files with 148 additions and 22 deletions
+4
View File
@@ -15,6 +15,10 @@ pub trait AudioCapturer: Send {
/// Block until the next chunk of interleaved samples is available (variable size). The /// Block until the next chunk of interleaved samples is available (variable size). The
/// caller reframes into fixed Opus frames. /// caller reframes into fixed Opus frames.
fn next_chunk(&mut self) -> Result<Vec<f32>>; fn next_chunk(&mut self) -> Result<Vec<f32>>;
/// Discard any buffered chunks (called when a persistent capturer is reused for a new
/// stream, so the client doesn't hear stale audio captured while idle). Default: no-op.
fn drain(&mut self) {}
} }
/// Open a live capturer for the default sink monitor (system output) via PipeWire. /// Open a live capturer for the default sink monitor (system output) via PipeWire.
+4
View File
@@ -39,6 +39,10 @@ impl AudioCapturer for PwAudioCapturer {
Err(RecvTimeoutError::Disconnected) => Err(anyhow!("pipewire audio thread ended")), Err(RecvTimeoutError::Disconnected) => Err(anyhow!("pipewire audio thread ended")),
} }
} }
fn drain(&mut self) {
while self.chunks.try_recv().is_ok() {}
}
} }
fn pw_thread(tx: std::sync::mpsc::SyncSender<Vec<f32>>) -> Result<()> { fn pw_thread(tx: std::sync::mpsc::SyncSender<Vec<f32>>) -> Result<()> {
+6
View File
@@ -58,6 +58,12 @@ pub trait Capturer: Send {
fn try_latest(&mut self) -> Result<Option<CapturedFrame>> { fn try_latest(&mut self) -> Result<Option<CapturedFrame>> {
self.next_frame().map(Some) self.next_frame().map(Some)
} }
/// Gate expensive per-frame work so the capturer can be kept alive (reused) between
/// streams without burning CPU. The portal capturer skips the de-pad copy while inactive;
/// the default is a no-op (synthetic sources are produced on demand). Set `true` for the
/// duration of a stream, `false` when it ends.
fn set_active(&self, _active: bool) {}
} }
/// A deterministic moving test pattern (BGRx). Lets M0 exercise the encode → file → /// A deterministic moving test pattern (BGRx). Lets M0 exercise the encode → file →
+33 -4
View File
@@ -18,13 +18,19 @@
use super::{CapturedFrame, Capturer, PixelFormat}; use super::{CapturedFrame, Capturer, PixelFormat};
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use std::os::fd::OwnedFd; use std::os::fd::OwnedFd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, TryRecvError}; use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, TryRecvError};
use std::sync::Arc;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
/// Live monitor capturer backed by the portal + PipeWire threads. /// Live monitor capturer backed by the portal + PipeWire threads. Kept alive (reused) across
/// streams — [`set_active`](Capturer::set_active) gates the per-frame de-pad copy so it costs
/// almost nothing between streams while the screencast session stays up (instant reconnect,
/// and no second session to conflict with).
pub struct PortalCapturer { pub struct PortalCapturer {
frames: Receiver<CapturedFrame>, frames: Receiver<CapturedFrame>,
active: Arc<AtomicBool>,
} }
impl PortalCapturer { impl PortalCapturer {
@@ -48,16 +54,21 @@ impl PortalCapturer {
// Frames flow from the pipewire thread over a small bounded channel. // Frames flow from the pipewire thread over a small bounded channel.
let (frame_tx, frame_rx) = sync_channel::<CapturedFrame>(8); let (frame_tx, frame_rx) = sync_channel::<CapturedFrame>(8);
let active = Arc::new(AtomicBool::new(false));
let active_cb = active.clone();
thread::Builder::new() thread::Builder::new()
.name("lumen-pipewire".into()) .name("lumen-pipewire".into())
.spawn(move || { .spawn(move || {
if let Err(e) = pipewire::pipewire_thread(fd, node_id, frame_tx) { if let Err(e) = pipewire::pipewire_thread(fd, node_id, frame_tx, active_cb) {
tracing::error!(error = %format!("{e:#}"), "pipewire capture thread failed"); tracing::error!(error = %format!("{e:#}"), "pipewire capture thread failed");
} }
}) })
.context("spawn pipewire thread")?; .context("spawn pipewire thread")?;
Ok(PortalCapturer { frames: frame_rx }) Ok(PortalCapturer {
frames: frame_rx,
active,
})
} }
} }
@@ -86,6 +97,10 @@ impl Capturer for PortalCapturer {
} }
Ok(latest) Ok(latest)
} }
fn set_active(&self, active: bool) {
self.active.store(active, Ordering::Relaxed);
}
} }
/// The portal handshake: connect ScreenCast, select a single monitor, start, open the /// The portal handshake: connect ScreenCast, select a single monitor, start, open the
@@ -180,7 +195,9 @@ mod pipewire {
use pipewire as pw; use pipewire as pw;
use pw::{properties::properties, spa}; use pw::{properties::properties, spa};
use std::os::fd::OwnedFd; use std::os::fd::OwnedFd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::SyncSender; use std::sync::mpsc::SyncSender;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use spa::param::video::{VideoFormat, VideoInfoRaw}; use spa::param::video::{VideoFormat, VideoInfoRaw};
@@ -205,9 +222,16 @@ mod pipewire {
/// Negotiated layout (`None` until param_changed, or if unsupported). /// Negotiated layout (`None` until param_changed, or if unsupported).
format: Option<PixelFormat>, format: Option<PixelFormat>,
tx: SyncSender<CapturedFrame>, tx: SyncSender<CapturedFrame>,
/// When false (no active stream), skip the de-pad copy — the buffer is just released.
active: Arc<AtomicBool>,
} }
pub fn pipewire_thread(fd: OwnedFd, node_id: u32, tx: SyncSender<CapturedFrame>) -> Result<()> { pub fn pipewire_thread(
fd: OwnedFd,
node_id: u32,
tx: SyncSender<CapturedFrame>,
active: Arc<AtomicBool>,
) -> Result<()> {
crate::pwinit::ensure_init(); crate::pwinit::ensure_init();
let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw MainLoop")?; let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw MainLoop")?;
@@ -220,6 +244,7 @@ mod pipewire {
info: VideoInfoRaw::default(), info: VideoInfoRaw::default(),
format: None, format: None,
tx, tx,
active,
}; };
let stream = pw::stream::StreamBox::new( let stream = pw::stream::StreamBox::new(
@@ -278,6 +303,10 @@ mod pipewire {
let Some(mut buffer) = stream.dequeue_buffer() else { let Some(mut buffer) = stream.dequeue_buffer() else {
return; return;
}; };
// No active stream: release the buffer without the (expensive at 5K) de-pad.
if !ud.active.load(Ordering::Relaxed) {
return;
}
let datas = buffer.datas_mut(); let datas = buffer.datas_mut();
if datas.is_empty() { if datas.is_empty() {
return; return;
+32 -5
View File
@@ -12,7 +12,7 @@
//! left in the clear). Reed-Solomon audio FEC is layered on top in P1.5. //! left in the clear). Reed-Solomon audio FEC is layered on top in P1.5.
use super::AUDIO_PORT; use super::AUDIO_PORT;
use crate::audio::{self, CHANNELS, SAMPLE_RATE}; use crate::audio::{self, AudioCapturer, CHANNELS, SAMPLE_RATE};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use cbc::cipher::{block_padding::Pkcs7, BlockEncryptMut, KeyIvInit}; use cbc::cipher::{block_padding::Pkcs7, BlockEncryptMut, KeyIvInit};
use opus::{Application, Bitrate, Channels, Encoder}; use opus::{Application, Bitrate, Channels, Encoder};
@@ -31,14 +31,17 @@ const SAMPLES_PER_FRAME: usize = SAMPLE_RATE as usize * FRAME_MS / 1000;
const AUDIO_PACKET_TYPE: u8 = 97; const AUDIO_PACKET_TYPE: u8 = 97;
const OPUS_BITRATE: i32 = 128_000; const OPUS_BITRATE: i32 = 128_000;
/// Slot for the persistent audio capturer, reused across streams (no leaked PipeWire thread).
pub type AudioCapSlot = Arc<std::sync::Mutex<Option<Box<dyn AudioCapturer>>>>;
/// Spawn the audio stream thread (idempotent via `running`). Stops when `running` clears. /// Spawn the audio stream thread (idempotent via `running`). Stops when `running` clears.
/// `gcm_key`/`rikeyid` come from `/launch` and key the AES-CBC payload encryption. /// `gcm_key`/`rikeyid` come from `/launch` and key the AES-CBC payload encryption.
pub fn start(running: Arc<AtomicBool>, gcm_key: [u8; 16], rikeyid: i32) { pub fn start(running: Arc<AtomicBool>, gcm_key: [u8; 16], rikeyid: i32, audio_cap: AudioCapSlot) {
let _ = std::thread::Builder::new() let _ = std::thread::Builder::new()
.name("lumen-audio".into()) .name("lumen-audio".into())
.spawn(move || { .spawn(move || {
tracing::info!("audio stream starting"); tracing::info!("audio stream starting");
if let Err(e) = run(&running, &gcm_key, rikeyid) { if let Err(e) = run(&running, &gcm_key, rikeyid, &audio_cap) {
tracing::error!(error = %format!("{e:#}"), "audio stream failed"); tracing::error!(error = %format!("{e:#}"), "audio stream failed");
} }
running.store(false, Ordering::SeqCst); running.store(false, Ordering::SeqCst);
@@ -46,7 +49,12 @@ pub fn start(running: Arc<AtomicBool>, gcm_key: [u8; 16], rikeyid: i32) {
}); });
} }
fn run(running: &AtomicBool, gcm_key: &[u8; 16], rikeyid: i32) -> Result<()> { fn run(
running: &AtomicBool,
gcm_key: &[u8; 16],
rikeyid: i32,
audio_cap: &std::sync::Mutex<Option<Box<dyn AudioCapturer>>>,
) -> Result<()> {
let sock = UdpSocket::bind(("0.0.0.0", AUDIO_PORT)).context("bind audio UDP")?; let sock = UdpSocket::bind(("0.0.0.0", AUDIO_PORT)).context("bind audio UDP")?;
// The client pings the audio port (~every 500ms) so we learn where to send. // The client pings the audio port (~every 500ms) so we learn where to send.
sock.set_read_timeout(Some(Duration::from_secs(10)))?; sock.set_read_timeout(Some(Duration::from_secs(10)))?;
@@ -59,7 +67,26 @@ fn run(running: &AtomicBool, gcm_key: &[u8; 16], rikeyid: i32) -> Result<()> {
.context("connect client audio endpoint")?; .context("connect client audio endpoint")?;
tracing::info!(%client, "audio: client endpoint learned"); tracing::info!(%client, "audio: client endpoint learned");
let mut cap = audio::open_audio_capture().context("open audio capture")?; // Reuse the persistent capturer (create on first stream); drain stale buffered audio.
let mut cap = match audio_cap.lock().unwrap().take() {
Some(mut c) => {
c.drain();
c
}
None => audio::open_audio_capture().context("open audio capture")?,
};
let result = audio_body(&mut *cap, &sock, gcm_key, rikeyid, running);
*audio_cap.lock().unwrap() = Some(cap);
result
}
fn audio_body(
cap: &mut dyn AudioCapturer,
sock: &UdpSocket,
gcm_key: &[u8; 16],
rikeyid: i32,
running: &AtomicBool,
) -> Result<()> {
// RESTRICTED_LOWDELAY + CBR, matching Sunshine — CBR keeps the Opus TOC byte constant, // RESTRICTED_LOWDELAY + CBR, matching Sunshine — CBR keeps the Opus TOC byte constant,
// which the client asserts per stream. // which the client asserts per stream.
let mut enc = Encoder::new(SAMPLE_RATE, Channels::Stereo, Application::LowDelay) let mut enc = Encoder::new(SAMPLE_RATE, Channels::Stereo, Application::LowDelay)
+9
View File
@@ -92,6 +92,13 @@ pub struct AppState {
/// Set by the control stream when the client requests an IDR / invalidates reference /// Set by the control stream when the client requests an IDR / invalidates reference
/// frames (recovery after loss); the video thread forces a keyframe and clears it. /// frames (recovery after loss); the video thread forces a keyframe and clears it.
pub force_idr: std::sync::Arc<std::sync::atomic::AtomicBool>, pub force_idr: std::sync::Arc<std::sync::atomic::AtomicBool>,
/// Persistent screen capturer, reused across streams so reconnects don't spawn a second
/// (conflicting) screencast session. The video thread borrows it for the stream's duration
/// and returns it; `set_active` gates its cost while idle.
pub video_cap: std::sync::Arc<std::sync::Mutex<Option<Box<dyn crate::capture::Capturer>>>>,
/// Persistent audio capturer, reused across streams (avoids leaking a PipeWire capture
/// thread per reconnect); drained on reuse so no stale audio is sent.
pub audio_cap: std::sync::Arc<std::sync::Mutex<Option<Box<dyn crate::audio::AudioCapturer>>>>,
} }
/// Run the GameStream control plane (blocks): mDNS advertisement + the nvhttp servers. /// Run the GameStream control plane (blocks): mDNS advertisement + the nvhttp servers.
@@ -108,6 +115,8 @@ pub fn serve() -> Result<()> {
streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
audio_streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), audio_streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
force_idr: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), force_idr: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
video_cap: std::sync::Arc::new(std::sync::Mutex::new(None)),
audio_cap: std::sync::Arc::new(std::sync::Mutex::new(None)),
}); });
tracing::info!( tracing::info!(
hostname = %state.host.hostname, hostname = %state.host.hostname,
+12 -2
View File
@@ -167,7 +167,12 @@ fn handle_request(req: &Request, state: &AppState) -> String {
match cfg { match cfg {
Some(cfg) if !state.streaming.swap(true, Ordering::SeqCst) => { Some(cfg) if !state.streaming.swap(true, Ordering::SeqCst) => {
tracing::info!("RTSP PLAY — starting video stream"); tracing::info!("RTSP PLAY — starting video stream");
stream::start(cfg, state.streaming.clone(), state.force_idr.clone()); stream::start(
cfg,
state.streaming.clone(),
state.force_idr.clone(),
state.video_cap.clone(),
);
} }
Some(_) => tracing::info!("RTSP PLAY — stream already running"), Some(_) => tracing::info!("RTSP PLAY — stream already running"),
None => tracing::warn!("RTSP PLAY — no negotiated config (ANNOUNCE missing)"), None => tracing::warn!("RTSP PLAY — no negotiated config (ANNOUNCE missing)"),
@@ -178,7 +183,12 @@ fn handle_request(req: &Request, state: &AppState) -> String {
if let Some(ls) = launch { if let Some(ls) = launch {
if !state.audio_streaming.swap(true, Ordering::SeqCst) { if !state.audio_streaming.swap(true, Ordering::SeqCst) {
tracing::info!("RTSP PLAY — starting audio stream"); tracing::info!("RTSP PLAY — starting audio stream");
audio::start(state.audio_streaming.clone(), ls.gcm_key, ls.rikeyid); audio::start(
state.audio_streaming.clone(),
ls.gcm_key,
ls.rikeyid,
state.audio_cap.clone(),
);
} }
} }
response(&req.cseq, &[("Session", "DEADBEEFCAFE;timeout = 90")], None) response(&req.cseq, &[("Session", "DEADBEEFCAFE;timeout = 90")], None)
+48 -11
View File
@@ -27,14 +27,24 @@ pub struct StreamConfig {
pub min_fec: u8, pub min_fec: u8,
} }
/// Slot for the persistent screen capturer, shared with the control plane and reused across
/// streams so a reconnect doesn't open a second (conflicting) screencast session.
pub type CapturerSlot = Arc<std::sync::Mutex<Option<Box<dyn Capturer>>>>;
/// Spawn the video stream thread (idempotent via `running`). Stops when `running` clears. /// Spawn the video stream thread (idempotent via `running`). Stops when `running` clears.
/// `force_idr` is set by the control stream on a client recovery request. /// `force_idr` is set by the control stream on a client recovery request; `video_cap` holds
pub fn start(cfg: StreamConfig, running: Arc<AtomicBool>, force_idr: Arc<AtomicBool>) { /// the persistent capturer the thread borrows for the stream's duration.
pub fn start(
cfg: StreamConfig,
running: Arc<AtomicBool>,
force_idr: Arc<AtomicBool>,
video_cap: CapturerSlot,
) {
let _ = std::thread::Builder::new() let _ = std::thread::Builder::new()
.name("lumen-video".into()) .name("lumen-video".into())
.spawn(move || { .spawn(move || {
tracing::info!(?cfg, "video stream starting"); tracing::info!(?cfg, "video stream starting");
if let Err(e) = run(cfg, &running, &force_idr) { if let Err(e) = run(cfg, &running, &force_idr, &video_cap) {
tracing::error!(error = %format!("{e:#}"), "video stream failed"); tracing::error!(error = %format!("{e:#}"), "video stream failed");
} }
running.store(false, Ordering::SeqCst); running.store(false, Ordering::SeqCst);
@@ -42,7 +52,12 @@ pub fn start(cfg: StreamConfig, running: Arc<AtomicBool>, force_idr: Arc<AtomicB
}); });
} }
fn run(cfg: StreamConfig, running: &AtomicBool, force_idr: &AtomicBool) -> Result<()> { fn run(
cfg: StreamConfig,
running: &AtomicBool,
force_idr: &AtomicBool,
video_cap: &std::sync::Mutex<Option<Box<dyn Capturer>>>,
) -> Result<()> {
let sock = UdpSocket::bind(("0.0.0.0", VIDEO_PORT)).context("bind video UDP")?; let sock = UdpSocket::bind(("0.0.0.0", VIDEO_PORT)).context("bind video UDP")?;
// The client pings the video port so we learn where to send; it re-pings until video // The client pings the video port so we learn where to send; it re-pings until video
// flows, so a missed early ping is fine. // flows, so a missed early ping is fine.
@@ -59,15 +74,37 @@ fn run(cfg: StreamConfig, running: &AtomicBool, force_idr: &AtomicBool) -> Resul
.context("connect client video endpoint")?; .context("connect client video endpoint")?;
tracing::info!(%client, "video: client endpoint learned"); tracing::info!(%client, "video: client endpoint learned");
let use_portal = std::env::var("LUMEN_VIDEO_SOURCE").is_ok_and(|v| v == "portal"); // Reuse the persistent capturer (one screencast session → clean reconnect); create it on
let mut capturer: Box<dyn Capturer> = if use_portal { // the first stream. Borrow it for this stream and return it on exit.
tracing::info!("video source: portal desktop capture"); let mut capturer: Box<dyn Capturer> = match video_cap.lock().unwrap().take() {
capture::open_portal_monitor().context("open portal capturer")? Some(c) => {
} else { tracing::info!("video source: reusing capturer");
tracing::info!("video source: synthetic test pattern"); c
Box::new(FastSyntheticCapturer::new(cfg.width, cfg.height)) }
None if std::env::var("LUMEN_VIDEO_SOURCE").is_ok_and(|v| v == "portal") => {
tracing::info!("video source: portal desktop capture");
capture::open_portal_monitor().context("open portal capturer")?
}
None => {
tracing::info!("video source: synthetic test pattern");
Box::new(FastSyntheticCapturer::new(cfg.width, cfg.height))
}
}; };
capturer.set_active(true);
let result = stream_body(&mut *capturer, &sock, cfg, running, force_idr);
capturer.set_active(false);
*video_cap.lock().unwrap() = Some(capturer);
result
}
/// The encode → packetize → paced-send loop, over a borrowed capturer.
fn stream_body(
capturer: &mut dyn Capturer,
sock: &UdpSocket,
cfg: StreamConfig,
running: &AtomicBool,
force_idr: &AtomicBool,
) -> Result<()> {
// The first frame establishes the authoritative size/format for the encoder. // The first frame establishes the authoritative size/format for the encoder.
let mut frame = capturer.next_frame().context("capture first frame")?; let mut frame = capturer.next_frame().context("capture first frame")?;
if frame.width != cfg.width || frame.height != cfg.height { if frame.width != cfg.width || frame.height != cfg.height {