diff --git a/crates/punktfunk-host/src/audio.rs b/crates/punktfunk-host/src/audio.rs index 017ddb8..171c203 100644 --- a/crates/punktfunk-host/src/audio.rs +++ b/crates/punktfunk-host/src/audio.rs @@ -16,7 +16,10 @@ pub const CHANNELS: usize = 2; /// falls behind). pub trait AudioCapturer: Send { /// Block until the next chunk of interleaved samples is available (variable size). The - /// caller reframes into fixed Opus frames. + /// caller reframes into fixed Opus frames. An **empty** chunk means "no samples right now" + /// (e.g. a quiet sink that hit the internal idle timeout) — NOT an error: the caller keeps the + /// capturer. `Err` is reserved for a genuinely dead capture thread, signalling the caller to + /// reopen. fn next_chunk(&mut self) -> Result>; /// The interleaved channel count this capturer delivers (what it was opened with). diff --git a/crates/punktfunk-host/src/audio/linux.rs b/crates/punktfunk-host/src/audio/linux.rs index 6c359c6..52a7f8d 100644 --- a/crates/punktfunk-host/src/audio/linux.rs +++ b/crates/punktfunk-host/src/audio/linux.rs @@ -65,7 +65,9 @@ impl AudioCapturer for PwAudioCapturer { fn next_chunk(&mut self) -> Result> { match self.chunks.recv_timeout(Duration::from_secs(5)) { Ok(c) => Ok(c), - Err(RecvTimeoutError::Timeout) => Err(anyhow!("no PipeWire audio within 5s")), + // A quiet sink (paused game, idle desktop) is NOT a failure — return an empty chunk so the + // caller keeps the capturer alive. Only a dead capture thread is an Err (→ caller reopens). + Err(RecvTimeoutError::Timeout) => Ok(Vec::new()), Err(RecvTimeoutError::Disconnected) => Err(anyhow!("pipewire audio thread ended")), } } diff --git a/crates/punktfunk-host/src/audio/wasapi_cap.rs b/crates/punktfunk-host/src/audio/wasapi_cap.rs index 0bd4d8c..d26cb8d 100644 --- a/crates/punktfunk-host/src/audio/wasapi_cap.rs +++ b/crates/punktfunk-host/src/audio/wasapi_cap.rs @@ -77,7 +77,9 @@ impl AudioCapturer for WasapiLoopbackCapturer { fn next_chunk(&mut self) -> Result> { match self.chunks.recv_timeout(Duration::from_secs(5)) { Ok(c) => Ok(c), - Err(RecvTimeoutError::Timeout) => Err(anyhow!("no WASAPI audio within 5s")), + // A quiet sink is NOT a failure — return an empty chunk so the caller keeps the capturer + // alive. Only a dead capture thread is an Err (→ caller reopens). Matches the Linux path. + Err(RecvTimeoutError::Timeout) => Ok(Vec::new()), Err(RecvTimeoutError::Disconnected) => Err(anyhow!("wasapi audio thread ended")), } } diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index 073ff1a..f0e0306 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -1385,7 +1385,7 @@ fn audio_thread(conn: quinn::Connection, stop: Arc, audio_cap: Audio const FRAME_MS: usize = 5; const SAMPLES_PER_FRAME: usize = SAMPLE_RATE as usize * FRAME_MS / 1000; // 240 - let mut capturer = match audio_cap.lock().unwrap().take() { + let capturer = match audio_cap.lock().unwrap().take() { Some(mut c) => { c.drain(); // discard audio captured between sessions c @@ -1417,15 +1417,43 @@ fn audio_thread(conn: quinn::Connection, stop: Arc, audio_cap: Audio let mut acc: Vec = Vec::with_capacity(frame_len * 4); let mut opus_buf = vec![0u8; 1500]; let mut seq: u32 = 0; - let mut capture_dead = false; + // Reopen-with-backoff: hold the capturer in an Option so a mid-session capture-thread death + // (device unplug, daemon restart) reopens instead of muting the rest of a multi-hour session. + // A quiet sink is NOT a death — `next_chunk` returns an empty chunk on its idle timeout — so only + // a genuine thread-ended Err drops the capturer. Reopens are throttled by INJECTOR_REOPEN_BACKOFF. + // The Opus encoder and the monotonic `seq` are kept across reopens (the client sees a gap, not a + // restart). The first open already happened above; failing THAT still ends the session quietly. + let mut capturer = Some(capturer); + let mut last_failed: Option = None; tracing::info!("punktfunk/1 audio streaming (Opus 48 kHz stereo, 5 ms datagrams)"); 'session: while !stop.load(Ordering::SeqCst) { - let chunk = match capturer.next_chunk() { + if capturer.is_none() { + if last_failed.is_some_and(|t| t.elapsed() < INJECTOR_REOPEN_BACKOFF) { + std::thread::sleep(std::time::Duration::from_millis(200)); + continue; + } + match crate::audio::open_audio_capture(CHANNELS as u32) { + Ok(c) => { + tracing::info!("punktfunk/1 audio capture reopened"); + capturer = Some(c); + last_failed = None; + acc.clear(); // drop the partial frame straddling the gap + } + Err(e) => { + tracing::debug!(error = %format!("{e:#}"), "audio reopen failed — will retry"); + last_failed = Some(std::time::Instant::now()); + std::thread::sleep(std::time::Duration::from_millis(200)); + continue; + } + } + } + let chunk = match capturer.as_mut().unwrap().next_chunk() { Ok(c) => c, Err(e) => { - tracing::warn!(error = %format!("{e:#}"), "audio capture ended"); - capture_dead = true; - break; + tracing::warn!(error = %format!("{e:#}"), "audio capture lost — reopening"); + capturer = None; + last_failed = Some(std::time::Instant::now()); + continue; } }; acc.extend_from_slice(&chunk); @@ -1445,10 +1473,9 @@ fn audio_thread(conn: quinn::Connection, stop: Arc, audio_cap: Audio } } } - // Return the live capturer for the next session; a dead one is dropped so the next - // session reopens fresh. - if !capture_dead { - *audio_cap.lock().unwrap() = Some(capturer); + // Return the live capturer for the next session (None if it died and never reopened). + if let Some(c) = capturer { + *audio_cap.lock().unwrap() = Some(c); } } @@ -2052,6 +2079,12 @@ fn virtual_stream( let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64); let mut next = std::time::Instant::now(); let mut sent: u64 = 0; + // Rebuild-in-place on capture loss: track the live mode (a mode switch updates it) so a rebuild + // targets the CURRENT mode, and cap consecutive rebuilds so a flapping source can't loop the + // client through endless cold restarts. + let mut cur_mode = mode; + const MAX_CAPTURE_REBUILDS: u32 = 5; + let mut capture_rebuilds: u32 = 0; while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline { // Mid-stream session switch (the box flipped Gaming↔Desktop): rebuild the WHOLE backend in // place — a different compositor at the SAME client mode — keeping the Session + send thread @@ -2085,8 +2118,12 @@ fn virtual_stream( let rebuilt = (|| -> Result<(Box, Pipeline)> { let mut new_vd = crate::vdisplay::open(sw.compositor)?; - let pipe = - build_pipeline_with_retry(&mut new_vd, mode, bitrate_kbps, bit_depth)?; + let pipe = build_pipeline_with_retry( + &mut new_vd, + cur_mode, + bitrate_kbps, + bit_depth, + )?; Ok((new_vd, pipe)) })(); match rebuilt { @@ -2132,6 +2169,7 @@ fn virtual_stream( match build_pipeline(&mut vd, new_mode, bitrate_kbps, bit_depth) { Ok(next_pipe) => { (capturer, enc, frame, interval) = next_pipe; + cur_mode = new_mode; next = std::time::Instant::now(); } Err(e) => { @@ -2152,8 +2190,35 @@ fn virtual_stream( tracing::debug!("forcing keyframe (client decode recovery)"); enc.request_keyframe(); } - if let Some(f) = capturer.try_latest().context("capture")? { - frame = f; + match capturer.try_latest() { + Ok(Some(f)) => { + frame = f; + capture_rebuilds = 0; // a delivered frame clears the consecutive-loss counter + } + Ok(None) => {} // no new frame (static desktop / mid-rebuild) — repeat the last frame + // The capture source died (PipeWire/compositor thread ended, virtual output gone). Rather + // than tear the whole session down — the client has no reconnect path and would have to + // cold-restart the handshake — rebuild the pipeline IN PLACE at the current mode, exactly + // like a mode/session switch. A genuinely dead source still ends the session once the + // bounded retry is exhausted; the consecutive cap stops a flapping source from looping the + // client through endless cold IDRs. + Err(e) => { + capture_rebuilds += 1; + if capture_rebuilds > MAX_CAPTURE_REBUILDS { + return Err(e).context("capture lost — rebuild attempts exhausted"); + } + tracing::warn!(error = %format!("{e:#}"), rebuild = capture_rebuilds, + "capture lost — rebuilding pipeline in place"); + let (new_cap, new_enc, new_frame, new_interval) = + build_pipeline_with_retry(&mut vd, cur_mode, bitrate_kbps, bit_depth) + .context("rebuild after capture loss")?; + capturer = new_cap; + enc = new_enc; + frame = new_frame; + interval = new_interval; + enc.request_keyframe(); // belt-and-suspenders; a fresh encoder opens on an IDR anyway + next = std::time::Instant::now(); + } } let capture_ns = now_ns(); enc.submit(&frame).context("encoder submit")?;