fix(host): self-heal capture loss + audio-thread death mid-session

Two steady-state faults previously bubbled a bare `?` to conn.close / silently
muted the rest of a session. Recover in place instead.

#4 — capture loss (virtual_stream): a mid-session capture stall/disconnect
(`try_latest` Err: PipeWire/compositor thread ended, virtual output gone) ended
the whole session — and the native client has no reconnect path, so it had to
cold-restart the handshake. Now rebuild the pipeline IN PLACE at the current
mode via build_pipeline_with_retry (same primitive the mode/session switch uses),
force a keyframe, and only propagate when the bounded retry is exhausted. A
consecutive-rebuild cap stops a flapping source from looping the client through
endless cold IDRs. Track the live mode so a rebuild after a mode switch targets
the right mode (also fixes the session-switch rebuild using the stale mode).

#3 — native audio thread (audio_thread): broke the loop on ANY next_chunk Err,
spawned once per session and never restarted, so a transient 5 s quiet-sink
timeout permanently muted a multi-hour session. Make a quiet sink return an empty
chunk (not an Err) in both backends so only a genuinely dead capture thread is an
Err, and reopen-with-backoff (INJECTOR_REOPEN_BACKOFF) on death, keeping the Opus
encoder + monotonic seq. Documents the next_chunk contract; also makes the
GameStream audio sender survive quiet sinks for free.

Resolves reliability backlog #3 and #4.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-16 11:04:42 +00:00
parent e8619c2362
commit 55d5a4278f
4 changed files with 89 additions and 17 deletions
+4 -1
View File
@@ -16,7 +16,10 @@ pub const CHANNELS: usize = 2;
/// falls behind). /// falls behind).
pub trait AudioCapturer: Send { pub trait AudioCapturer: Send {
/// Block until the next chunk of interleaved samples is available (variable size). The /// Block until the next chunk of interleaved samples is available (variable size). The
/// caller reframes into fixed Opus frames. /// caller reframes into fixed Opus frames. An **empty** chunk means "no samples right now"
/// (e.g. a quiet sink that hit the internal idle timeout) — NOT an error: the caller keeps the
/// capturer. `Err` is reserved for a genuinely dead capture thread, signalling the caller to
/// reopen.
fn next_chunk(&mut self) -> Result<Vec<f32>>; fn next_chunk(&mut self) -> Result<Vec<f32>>;
/// The interleaved channel count this capturer delivers (what it was opened with). /// The interleaved channel count this capturer delivers (what it was opened with).
+3 -1
View File
@@ -65,7 +65,9 @@ impl AudioCapturer for PwAudioCapturer {
fn next_chunk(&mut self) -> Result<Vec<f32>> { fn next_chunk(&mut self) -> Result<Vec<f32>> {
match self.chunks.recv_timeout(Duration::from_secs(5)) { match self.chunks.recv_timeout(Duration::from_secs(5)) {
Ok(c) => Ok(c), Ok(c) => Ok(c),
Err(RecvTimeoutError::Timeout) => Err(anyhow!("no PipeWire audio within 5s")), // A quiet sink (paused game, idle desktop) is NOT a failure — return an empty chunk so the
// caller keeps the capturer alive. Only a dead capture thread is an Err (→ caller reopens).
Err(RecvTimeoutError::Timeout) => Ok(Vec::new()),
Err(RecvTimeoutError::Disconnected) => Err(anyhow!("pipewire audio thread ended")), Err(RecvTimeoutError::Disconnected) => Err(anyhow!("pipewire audio thread ended")),
} }
} }
@@ -77,7 +77,9 @@ impl AudioCapturer for WasapiLoopbackCapturer {
fn next_chunk(&mut self) -> Result<Vec<f32>> { fn next_chunk(&mut self) -> Result<Vec<f32>> {
match self.chunks.recv_timeout(Duration::from_secs(5)) { match self.chunks.recv_timeout(Duration::from_secs(5)) {
Ok(c) => Ok(c), Ok(c) => Ok(c),
Err(RecvTimeoutError::Timeout) => Err(anyhow!("no WASAPI audio within 5s")), // A quiet sink is NOT a failure — return an empty chunk so the caller keeps the capturer
// alive. Only a dead capture thread is an Err (→ caller reopens). Matches the Linux path.
Err(RecvTimeoutError::Timeout) => Ok(Vec::new()),
Err(RecvTimeoutError::Disconnected) => Err(anyhow!("wasapi audio thread ended")), Err(RecvTimeoutError::Disconnected) => Err(anyhow!("wasapi audio thread ended")),
} }
} }
+79 -14
View File
@@ -1385,7 +1385,7 @@ fn audio_thread(conn: quinn::Connection, stop: Arc<AtomicBool>, audio_cap: Audio
const FRAME_MS: usize = 5; const FRAME_MS: usize = 5;
const SAMPLES_PER_FRAME: usize = SAMPLE_RATE as usize * FRAME_MS / 1000; // 240 const SAMPLES_PER_FRAME: usize = SAMPLE_RATE as usize * FRAME_MS / 1000; // 240
let mut capturer = match audio_cap.lock().unwrap().take() { let capturer = match audio_cap.lock().unwrap().take() {
Some(mut c) => { Some(mut c) => {
c.drain(); // discard audio captured between sessions c.drain(); // discard audio captured between sessions
c c
@@ -1417,15 +1417,43 @@ fn audio_thread(conn: quinn::Connection, stop: Arc<AtomicBool>, audio_cap: Audio
let mut acc: Vec<f32> = Vec::with_capacity(frame_len * 4); let mut acc: Vec<f32> = Vec::with_capacity(frame_len * 4);
let mut opus_buf = vec![0u8; 1500]; let mut opus_buf = vec![0u8; 1500];
let mut seq: u32 = 0; let mut seq: u32 = 0;
let mut capture_dead = false; // Reopen-with-backoff: hold the capturer in an Option so a mid-session capture-thread death
// (device unplug, daemon restart) reopens instead of muting the rest of a multi-hour session.
// A quiet sink is NOT a death — `next_chunk` returns an empty chunk on its idle timeout — so only
// a genuine thread-ended Err drops the capturer. Reopens are throttled by INJECTOR_REOPEN_BACKOFF.
// The Opus encoder and the monotonic `seq` are kept across reopens (the client sees a gap, not a
// restart). The first open already happened above; failing THAT still ends the session quietly.
let mut capturer = Some(capturer);
let mut last_failed: Option<std::time::Instant> = None;
tracing::info!("punktfunk/1 audio streaming (Opus 48 kHz stereo, 5 ms datagrams)"); tracing::info!("punktfunk/1 audio streaming (Opus 48 kHz stereo, 5 ms datagrams)");
'session: while !stop.load(Ordering::SeqCst) { 'session: while !stop.load(Ordering::SeqCst) {
let chunk = match capturer.next_chunk() { if capturer.is_none() {
if last_failed.is_some_and(|t| t.elapsed() < INJECTOR_REOPEN_BACKOFF) {
std::thread::sleep(std::time::Duration::from_millis(200));
continue;
}
match crate::audio::open_audio_capture(CHANNELS as u32) {
Ok(c) => {
tracing::info!("punktfunk/1 audio capture reopened");
capturer = Some(c);
last_failed = None;
acc.clear(); // drop the partial frame straddling the gap
}
Err(e) => {
tracing::debug!(error = %format!("{e:#}"), "audio reopen failed — will retry");
last_failed = Some(std::time::Instant::now());
std::thread::sleep(std::time::Duration::from_millis(200));
continue;
}
}
}
let chunk = match capturer.as_mut().unwrap().next_chunk() {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
tracing::warn!(error = %format!("{e:#}"), "audio capture ended"); tracing::warn!(error = %format!("{e:#}"), "audio capture lost — reopening");
capture_dead = true; capturer = None;
break; last_failed = Some(std::time::Instant::now());
continue;
} }
}; };
acc.extend_from_slice(&chunk); acc.extend_from_slice(&chunk);
@@ -1445,10 +1473,9 @@ fn audio_thread(conn: quinn::Connection, stop: Arc<AtomicBool>, audio_cap: Audio
} }
} }
} }
// Return the live capturer for the next session; a dead one is dropped so the next // Return the live capturer for the next session (None if it died and never reopened).
// session reopens fresh. if let Some(c) = capturer {
if !capture_dead { *audio_cap.lock().unwrap() = Some(c);
*audio_cap.lock().unwrap() = Some(capturer);
} }
} }
@@ -2052,6 +2079,12 @@ fn virtual_stream(
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64); let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64);
let mut next = std::time::Instant::now(); let mut next = std::time::Instant::now();
let mut sent: u64 = 0; let mut sent: u64 = 0;
// Rebuild-in-place on capture loss: track the live mode (a mode switch updates it) so a rebuild
// targets the CURRENT mode, and cap consecutive rebuilds so a flapping source can't loop the
// client through endless cold restarts.
let mut cur_mode = mode;
const MAX_CAPTURE_REBUILDS: u32 = 5;
let mut capture_rebuilds: u32 = 0;
while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline { while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
// Mid-stream session switch (the box flipped Gaming↔Desktop): rebuild the WHOLE backend in // Mid-stream session switch (the box flipped Gaming↔Desktop): rebuild the WHOLE backend in
// place — a different compositor at the SAME client mode — keeping the Session + send thread // place — a different compositor at the SAME client mode — keeping the Session + send thread
@@ -2085,8 +2118,12 @@ fn virtual_stream(
let rebuilt = let rebuilt =
(|| -> Result<(Box<dyn crate::vdisplay::VirtualDisplay>, Pipeline)> { (|| -> Result<(Box<dyn crate::vdisplay::VirtualDisplay>, Pipeline)> {
let mut new_vd = crate::vdisplay::open(sw.compositor)?; let mut new_vd = crate::vdisplay::open(sw.compositor)?;
let pipe = let pipe = build_pipeline_with_retry(
build_pipeline_with_retry(&mut new_vd, mode, bitrate_kbps, bit_depth)?; &mut new_vd,
cur_mode,
bitrate_kbps,
bit_depth,
)?;
Ok((new_vd, pipe)) Ok((new_vd, pipe))
})(); })();
match rebuilt { match rebuilt {
@@ -2132,6 +2169,7 @@ fn virtual_stream(
match build_pipeline(&mut vd, new_mode, bitrate_kbps, bit_depth) { match build_pipeline(&mut vd, new_mode, bitrate_kbps, bit_depth) {
Ok(next_pipe) => { Ok(next_pipe) => {
(capturer, enc, frame, interval) = next_pipe; (capturer, enc, frame, interval) = next_pipe;
cur_mode = new_mode;
next = std::time::Instant::now(); next = std::time::Instant::now();
} }
Err(e) => { Err(e) => {
@@ -2152,8 +2190,35 @@ fn virtual_stream(
tracing::debug!("forcing keyframe (client decode recovery)"); tracing::debug!("forcing keyframe (client decode recovery)");
enc.request_keyframe(); enc.request_keyframe();
} }
if let Some(f) = capturer.try_latest().context("capture")? { match capturer.try_latest() {
frame = f; Ok(Some(f)) => {
frame = f;
capture_rebuilds = 0; // a delivered frame clears the consecutive-loss counter
}
Ok(None) => {} // no new frame (static desktop / mid-rebuild) — repeat the last frame
// The capture source died (PipeWire/compositor thread ended, virtual output gone). Rather
// than tear the whole session down — the client has no reconnect path and would have to
// cold-restart the handshake — rebuild the pipeline IN PLACE at the current mode, exactly
// like a mode/session switch. A genuinely dead source still ends the session once the
// bounded retry is exhausted; the consecutive cap stops a flapping source from looping the
// client through endless cold IDRs.
Err(e) => {
capture_rebuilds += 1;
if capture_rebuilds > MAX_CAPTURE_REBUILDS {
return Err(e).context("capture lost — rebuild attempts exhausted");
}
tracing::warn!(error = %format!("{e:#}"), rebuild = capture_rebuilds,
"capture lost — rebuilding pipeline in place");
let (new_cap, new_enc, new_frame, new_interval) =
build_pipeline_with_retry(&mut vd, cur_mode, bitrate_kbps, bit_depth)
.context("rebuild after capture loss")?;
capturer = new_cap;
enc = new_enc;
frame = new_frame;
interval = new_interval;
enc.request_keyframe(); // belt-and-suspenders; a fresh encoder opens on an IDR anyway
next = std::time::Instant::now();
}
} }
let capture_ns = now_ns(); let capture_ns = now_ns();
enc.submit(&frame).context("encoder submit")?; enc.submit(&frame).context("encoder submit")?;