From c7630ff5dc22f8dc666e66a3b917e4f55120f6b7 Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Fri, 3 Jul 2026 20:58:06 +0000 Subject: [PATCH] =?UTF-8?q?fix(host/audio):=20mic=20pump=20=E2=80=94=20ope?= =?UTF-8?q?n=20handshake=20on=20Linux=20+=20rapid-death=20backoff?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Found by a live boot-order test (host started before the user session's PipeWire): PwMicSource::open returned Ok before the daemon connection was attempted, so a PipeWire that wasn't running surfaced as an instantly-dead instance instead of an open failure — and the pump churned open→die→reopen at heartbeat rate (1 Hz "virtual mic ready" log spam) instead of backing off. - PwMicSource::open now has a bring-up handshake (mirrors the Windows backend): ready only after connect + stream connect succeed, so a down daemon is an open ERROR and the pump's backoff engages. - The pump triages deaths: an instance that lived >= 5 s (a one-off daemon restart) reopens immediately with the backoff reset; one that died right after opening counts as a failed open and backs off (2 s → 60 s cap). New pump test rapid_death_backs_off. Re-validated live: host started with PipeWire stopped → throttled "unavailable" warns, zero churn; daemon started → mic node up on the next retry; exactly one pump + one loop thread (no leak). Co-Authored-By: Claude Fable 5 --- crates/punktfunk-host/src/audio.rs | 102 ++++- crates/punktfunk-host/src/audio/linux/mod.rs | 455 ++++++++++--------- 2 files changed, 319 insertions(+), 238 deletions(-) diff --git a/crates/punktfunk-host/src/audio.rs b/crates/punktfunk-host/src/audio.rs index 3d8853b..3523442 100644 --- a/crates/punktfunk-host/src/audio.rs +++ b/crates/punktfunk-host/src/audio.rs @@ -128,6 +128,10 @@ struct PumpTuning { /// An uplink gap longer than this discards the backend's buffered audio before pushing the /// next frame (a recorder must never hear a stale burst from before a mute/session end). stale_gap: std::time::Duration, + /// A backend that dies before living this long counts as a FAILED open for backoff purposes + /// (an open that succeeds but dies instantly — e.g. a flapping daemon — must not churn at + /// heartbeat rate); one that lived longer resets the backoff. + stable_after: std::time::Duration, } const PUMP_TUNING: PumpTuning = PumpTuning { @@ -135,6 +139,7 @@ const PUMP_TUNING: PumpTuning = PumpTuning { backoff_cap: std::time::Duration::from_secs(60), heartbeat: std::time::Duration::from_secs(1), stale_gap: std::time::Duration::from_millis(600), + stable_after: std::time::Duration::from_secs(5), }; /// Host-lifetime virtual-microphone pump: one thread owns the [`VirtualMic`] backend + an Opus @@ -188,6 +193,26 @@ impl MicPump { } } +/// Sleep for `dur` while draining (and dropping) queued frames, so a closed/reopening backend +/// never accumulates a stale backlog and senders never see a wedged queue. Returns `false` when +/// every sender is gone (host shutdown). +#[cfg_attr(not(any(target_os = "linux", target_os = "windows")), allow(dead_code))] +fn drain_sleep(rx: &std::sync::mpsc::Receiver>, dur: std::time::Duration) -> bool { + use std::sync::mpsc::RecvTimeoutError; + let deadline = std::time::Instant::now() + dur; + loop { + let left = deadline.saturating_duration_since(std::time::Instant::now()); + if left.is_zero() { + return true; + } + match rx.recv_timeout(left.min(std::time::Duration::from_millis(250))) { + Ok(_) => {} // drop frames while closed + Err(RecvTimeoutError::Timeout) => {} // keep waiting + Err(RecvTimeoutError::Disconnected) => return false, // host shutdown + } + } +} + /// The pump loop. `opener` is injected so the tests can run the REAL loop against a mock /// backend; production passes [`open_virtual_mic`]. #[cfg_attr(not(any(target_os = "linux", target_os = "windows")), allow(dead_code))] @@ -200,9 +225,8 @@ where let mut backoff = tuning.backoff_start; let mut open_fails: u64 = 0; - 'reopen: loop { - // Open phase — eager, from thread start. While closed, keep draining the queue so a - // reopen never replays a backlog of stale frames (and senders never see a wedged queue). + loop { + // Open phase — eager, from thread start. let (mic, mut decoder) = loop { let opened = opener().and_then(|m| { let d = opus::Decoder::new(SAMPLE_RATE, opus::Channels::Stereo) @@ -219,28 +243,20 @@ where tracing::warn!(error = %format!("{e:#}"), attempts = open_fails, "virtual mic unavailable — retrying with backoff"); } - let deadline = Instant::now() + backoff; - loop { - let left = deadline.saturating_duration_since(Instant::now()); - if left.is_zero() { - break; - } - match rx.recv_timeout(left.min(std::time::Duration::from_millis(250))) { - Ok(_) => {} // drop frames while closed - Err(RecvTimeoutError::Timeout) => {} // keep waiting - Err(RecvTimeoutError::Disconnected) => return, // host shutdown - } + if !drain_sleep(&rx, backoff) { + return; } backoff = (backoff * 2).min(tuning.backoff_cap); } } }; - backoff = tuning.backoff_start; - open_fails = 0; tracing::info!("virtual mic ready (host-lifetime)"); - // Drop anything queued while (re)opening — it predates the backend. + // Drop anything queued while (re)opening — it predates the backend. (The backoff does + // NOT reset here: only an instance that proves stable resets it — see the death triage.) while rx.try_recv().is_ok() {} + let opened_at = Instant::now(); + // Pump phase — runs until the backend dies (break) or the host shuts down (return). let mut decode_fails: u64 = 0; let mut pcm = vec![0f32; 5760 * MIC_CHANNELS as usize]; // up to 120 ms scratch let mut last_push = Instant::now(); @@ -258,7 +274,7 @@ where let total = (samples_per_ch * MIC_CHANNELS as usize).min(pcm.len()); if !mic.push(&pcm[..total]) { tracing::warn!("virtual mic backend died — reopening"); - continue 'reopen; + break; } last_push = Instant::now(); decode_fails = 0; @@ -277,7 +293,7 @@ where Err(RecvTimeoutError::Timeout) => { if !mic.alive() { tracing::warn!("virtual mic backend died while idle — reopening"); - continue 'reopen; + break; } } Err(RecvTimeoutError::Disconnected) => { @@ -286,6 +302,21 @@ where } } } + + // Death triage: an instance that lived is a one-off (PipeWire/audio-engine restart) — + // reopen immediately with the backoff reset. One that died right after opening is a + // failed open in disguise (flapping daemon, endpoint racing away): back off like the + // open loop, or the pump would churn open→die→reopen at heartbeat rate. + if opened_at.elapsed() >= tuning.stable_after { + backoff = tuning.backoff_start; + open_fails = 0; + } else { + open_fails += 1; + if !drain_sleep(&rx, backoff) { + return; + } + backoff = (backoff * 2).min(tuning.backoff_cap); + } } } @@ -343,8 +374,10 @@ mod pump_tests { } /// Run the REAL pump loop against mock backends; `fail_first` opens fail before the first - /// success (exercises the eager retry/backoff path). - fn start(fail_first: usize) -> Harness { + /// success (exercises the eager retry/backoff path). `dead_on_arrival` opens every instance + /// pre-killed (exercises the rapid-death churn guard). `stable_after` mirrors the tuning + /// field (ZERO = every death counts as stable → immediate reopen, keeping tests fast). + fn start_tuned(fail_first: usize, dead_on_arrival: bool, stable_after: Duration) -> Harness { let (tx, rx) = std::sync::mpsc::sync_channel::>(MIC_QUEUE_CAP); let opens = Arc::new(AtomicUsize::new(0)); let alive = Arc::new(Mutex::new(None::>)); @@ -361,6 +394,7 @@ mod pump_tests { backoff_cap: Duration::from_millis(40), heartbeat: Duration::from_millis(20), stale_gap: Duration::from_millis(80), + stable_after, }; let join = std::thread::spawn(move || { pump_thread( @@ -370,7 +404,7 @@ mod pump_tests { if n < fail_first { anyhow::bail!("backend not up yet (simulated)"); } - let a = Arc::new(AtomicBool::new(true)); + let a = Arc::new(AtomicBool::new(!dead_on_arrival)); *alive2.lock().unwrap() = Some(a.clone()); Ok(Box::new(MockMic { alive: a, @@ -391,6 +425,10 @@ mod pump_tests { } } + fn start(fail_first: usize) -> Harness { + start_tuned(fail_first, false, Duration::ZERO) + } + fn wait_until(what: &str, mut cond: impl FnMut() -> bool) { for _ in 0..200 { if cond() { @@ -471,6 +509,26 @@ mod pump_tests { h.join.join().unwrap(); } + /// Instances that die immediately after opening must be retried with BACKOFF, not at + /// heartbeat rate — a flapping backend (daemon up but dropping us instantly) would + /// otherwise churn open→die→reopen every heartbeat forever. + #[test] + fn rapid_death_backs_off() { + // Every instance is dead on arrival; stability threshold high so each death counts + // as a failed open. Without the guard: ~1 reopen per heartbeat (20 ms) ≈ 25 opens in + // 500 ms. With backoff 10→20→40 (cap): ≈ 7. + let h = start_tuned(0, true, Duration::from_secs(10)); + std::thread::sleep(Duration::from_millis(500)); + let opens = h.opens.load(Ordering::SeqCst); + assert!(opens >= 2, "must keep retrying (got {opens})"); + assert!( + opens <= 15, + "must back off, not churn per heartbeat (got {opens})" + ); + drop(h.tx); + h.join.join().unwrap(); + } + /// An uplink gap discards buffered-stale audio before the next frame plays. #[test] fn discards_after_gap() { diff --git a/crates/punktfunk-host/src/audio/linux/mod.rs b/crates/punktfunk-host/src/audio/linux/mod.rs index a9fccd7..23a476f 100644 --- a/crates/punktfunk-host/src/audio/linux/mod.rs +++ b/crates/punktfunk-host/src/audio/linux/mod.rs @@ -147,24 +147,32 @@ impl PwMicSource { let (quit_tx, quit_rx) = pipewire::channel::channel::(); let alive = Arc::new(AtomicBool::new(true)); let flush = Arc::new(AtomicBool::new(false)); + // Bring-up handshake (mirrors the Windows backend): a PipeWire that isn't running (host + // service started before the user session) must surface as an open ERROR — engaging the + // pump's backoff — not as an instantly-dead instance the pump would churn on. + let (ready_tx, ready_rx) = sync_channel::>(1); let (alive_t, flush_t) = (alive.clone(), flush.clone()); thread::Builder::new() .name("punktfunk-pw-mic".into()) .spawn(move || { - if let Err(e) = mic_pw_thread(pcm_rx, quit_rx, channels, flush_t) { + if let Err(e) = mic_pw_thread(pcm_rx, quit_rx, channels, flush_t, ready_tx) { tracing::error!(error = %format!("{e:#}"), "pipewire virtual-mic thread failed"); } // Whether a clean quit or a daemon death: this instance is done — the pump reopens. alive_t.store(false, Ordering::Release); }) .context("spawn pipewire virtual-mic thread")?; - Ok(PwMicSource { - pcm: pcm_tx, - channels, - quit: quit_tx, - alive, - flush, - }) + match ready_rx.recv_timeout(Duration::from_secs(5)) { + Ok(Ok(())) => Ok(PwMicSource { + pcm: pcm_tx, + channels, + quit: quit_tx, + alive, + flush, + }), + Ok(Err(e)) => Err(e), + Err(_) => Err(anyhow!("pipewire virtual-mic init timed out")), + } } } @@ -224,237 +232,252 @@ fn mic_pw_thread( quit_rx: pipewire::channel::Receiver, channels: u32, flush: Arc, + ready: std::sync::mpsc::SyncSender>, ) -> Result<()> { use pipewire as pw; use pw::{properties::properties, spa}; use spa::param::audio::{AudioFormat, AudioInfoRaw}; use spa::pod::Pod; - crate::pwinit::ensure_init(); - let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw mic MainLoop")?; - let context = pw::context::ContextRc::new(&mainloop, None).context("pw mic Context")?; - let core = context - .connect_rc(None) - .context("pw mic connect (is PipeWire running in this session?)")?; + // The PipeWire objects are lifetime-chained (guards borrow the mainloop/core), so setup and + // the blocking run share one frame; the IIFE lets every setup `?` funnel through the ready + // handshake below (mirrors the Windows render_thread). + let result = (|| -> Result<()> { + crate::pwinit::ensure_init(); + let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw mic MainLoop")?; + let context = pw::context::ContextRc::new(&mainloop, None).context("pw mic Context")?; + let core = context + .connect_rc(None) + .context("pw mic connect (is PipeWire running in this session?)")?; - let _quit_guard = quit_rx.attach(mainloop.loop_(), { - let mainloop = mainloop.clone(); - move |_| mainloop.quit() - }); - - // Death detection: a core error (the daemon restarted/went away — our remote node no longer - // exists) ends this thread, flipping the owner's `alive` flag so the pump reopens against the - // new daemon. Without this, a PipeWire restart left the loop idling on a dead connection and - // the mic silently broken for the rest of the host's life. - let _core_listener = core - .add_listener_local() - .error({ + let _quit_guard = quit_rx.attach(mainloop.loop_(), { let mainloop = mainloop.clone(); - move |id, _seq, res, message| { - tracing::warn!( - id, - res, - message, - "pipewire core error — virtual mic reopening" - ); - mainloop.quit(); - } - }) - .register(); + move |_| mainloop.quit() + }); - // media.class=Audio/Source advertises us as a microphone (a recordable source), NOT a - // playback stream — without it, Direction::Output + Playback would route to the speakers. - let stream = pw::stream::StreamBox::new( - &core, - "punktfunk-mic", - properties! { - *pw::keys::MEDIA_TYPE => "Audio", - *pw::keys::MEDIA_CLASS => "Audio/Source", - *pw::keys::NODE_NAME => "punktfunk-mic", - *pw::keys::NODE_DESCRIPTION => "Punktfunk Remote Microphone", - // ~5 ms quantum (one Opus frame) so recording apps get smooth low-latency chunks. - *pw::keys::NODE_LATENCY => "240/48000", - // Win WirePlumber's default-source election. This fixes TWO failures (both diagnosed - // live on a Bazzite host, PipeWire 1.4.10): - // 1. Apps that record the *default* input (games, Discord, arecord) get the client's - // mic — the Linux analogue of the Windows host forcing the default recording - // endpoint (audio/windows/audio_control.rs). Without it the source is never the - // default, so default-input recorders hear silence. - // 2. On PipeWire 1.4.x, a *non-default* Audio/Source recorded via `--target` never - // gets a driver assigned — the {source, recorder} group stays orphaned (pw-top: - // QUANT/RATE 0, `driver-node None`), so the RT `process()` callback never fires and - // even an explicitly-selected mic is pure silence. Making it the default source - // keeps WirePlumber driving it, so `process()` runs and audio flows. (PipeWire 1.6 - // drives any recorded source regardless, which is why this only bit the 1.4 host.) - // Reproduced with a faithful standalone copy of this node: no priority.session → silent, - // priority.session set → audio, on the same 1.4.10 daemon. Only overrides WirePlumber's - // *auto* default (a user's explicit default.configured.audio.source still wins); the - // value clears typical real-hardware source priorities (~1000–1900). - "priority.session" => "3000", - }, - ) - .context("pw mic Stream")?; - - let ud = MicUserData { - rx: pcm_rx, - ring: VecDeque::new(), - channels: channels as usize, - primed: false, - flush, - last_run: None, - }; - - let _listener = stream - .add_local_listener_with_user_data(ud) - .state_changed({ - let mainloop = mainloop.clone(); - move |_s, _ud, old, new| { - tracing::info!(?old, ?new, "pipewire virtual-mic stream state"); - // A stream error is unrecoverable for this instance — exit so the pump reopens. - if matches!(new, pw::stream::StreamState::Error(_)) { + // Death detection: a core error (the daemon restarted/went away — our remote node no longer + // exists) ends this thread, flipping the owner's `alive` flag so the pump reopens against the + // new daemon. Without this, a PipeWire restart left the loop idling on a dead connection and + // the mic silently broken for the rest of the host's life. + let _core_listener = core + .add_listener_local() + .error({ + let mainloop = mainloop.clone(); + move |id, _seq, res, message| { + tracing::warn!( + id, + res, + message, + "pipewire core error — virtual mic reopening" + ); mainloop.quit(); } - } - }) - .param_changed(|_s, _ud, id, param| { - let Some(param) = param else { return }; - if id != pw::spa::param::ParamType::Format.as_raw() { - return; - } - let mut info = AudioInfoRaw::default(); - if info.parse(param).is_ok() { - tracing::info!( - format = ?info.format(), - rate = info.rate(), - channels = info.channels(), - "virtual-mic format negotiated" - ); - } - }) - .process(|stream, ud| { - let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - let Some(mut buffer) = stream.dequeue_buffer() else { - return; - }; - // Stale-audio guard, BEFORE pulling new frames: drop the ring when a flush was - // requested (uplink gap — see the pump) or when this callback itself hasn't run - // for a while (the stream idled with no recorder attached; whatever the ring - // holds predates the consumer). A recorder must never hear a burst of old audio. - let now = std::time::Instant::now(); - let idled = ud - .last_run - .is_some_and(|t| now.duration_since(t) > MIC_STALE); - if ud.flush.swap(false, std::sync::atomic::Ordering::AcqRel) || idled { - ud.ring.clear(); - ud.primed = false; - } - ud.last_run = Some(now); - // Pull all newly-decoded PCM into the ring, aging out chunks that sat in the - // channel while nothing consumed them (same staleness rule). - while let Ok((t, frame)) = ud.rx.try_recv() { - if now.duration_since(t) <= MIC_STALE { - ud.ring.extend(frame); + }) + .register(); + + // media.class=Audio/Source advertises us as a microphone (a recordable source), NOT a + // playback stream — without it, Direction::Output + Playback would route to the speakers. + let stream = pw::stream::StreamBox::new( + &core, + "punktfunk-mic", + properties! { + *pw::keys::MEDIA_TYPE => "Audio", + *pw::keys::MEDIA_CLASS => "Audio/Source", + *pw::keys::NODE_NAME => "punktfunk-mic", + *pw::keys::NODE_DESCRIPTION => "Punktfunk Remote Microphone", + // ~5 ms quantum (one Opus frame) so recording apps get smooth low-latency chunks. + *pw::keys::NODE_LATENCY => "240/48000", + // Win WirePlumber's default-source election. This fixes TWO failures (both diagnosed + // live on a Bazzite host, PipeWire 1.4.10): + // 1. Apps that record the *default* input (games, Discord, arecord) get the client's + // mic — the Linux analogue of the Windows host forcing the default recording + // endpoint (audio/windows/audio_control.rs). Without it the source is never the + // default, so default-input recorders hear silence. + // 2. On PipeWire 1.4.x, a *non-default* Audio/Source recorded via `--target` never + // gets a driver assigned — the {source, recorder} group stays orphaned (pw-top: + // QUANT/RATE 0, `driver-node None`), so the RT `process()` callback never fires and + // even an explicitly-selected mic is pure silence. Making it the default source + // keeps WirePlumber driving it, so `process()` runs and audio flows. (PipeWire 1.6 + // drives any recorded source regardless, which is why this only bit the 1.4 host.) + // Reproduced with a faithful standalone copy of this node: no priority.session → silent, + // priority.session set → audio, on the same 1.4.10 daemon. Only overrides WirePlumber's + // *auto* default (a user's explicit default.configured.audio.source still wins); the + // value clears typical real-hardware source priorities (~1000–1900). + "priority.session" => "3000", + }, + ) + .context("pw mic Stream")?; + + let ud = MicUserData { + rx: pcm_rx, + ring: VecDeque::new(), + channels: channels as usize, + primed: false, + flush, + last_run: None, + }; + + let _listener = stream + .add_local_listener_with_user_data(ud) + .state_changed({ + let mainloop = mainloop.clone(); + move |_s, _ud, old, new| { + tracing::info!(?old, ?new, "pipewire virtual-mic stream state"); + // A stream error is unrecoverable for this instance — exit so the pump reopens. + if matches!(new, pw::stream::StreamState::Error(_)) { + mainloop.quit(); } } - let stride = 4 * ud.channels; // F32LE interleaved - let datas = buffer.datas_mut(); - if datas.is_empty() { + }) + .param_changed(|_s, _ud, id, param| { + let Some(param) = param else { return }; + if id != pw::spa::param::ParamType::Format.as_raw() { return; } - let data = &mut datas[0]; - let want_frames = data.data().map(|s| s.len() / stride).unwrap_or(0); - let want = want_frames * ud.channels; // interleaved samples this quantum needs - static FIRST: std::sync::atomic::AtomicBool = - std::sync::atomic::AtomicBool::new(true); - if FIRST.swap(false, std::sync::atomic::Ordering::Relaxed) { + let mut info = AudioInfoRaw::default(); + if info.parse(param).is_ok() { tracing::info!( - quantum_frames = want_frames, - quantum_ms = want_frames as f32 / 48.0, - "virtual-mic consumer connected" + format = ?info.format(), + rate = info.rate(), + channels = info.channels(), + "virtual-mic format negotiated" ); } - - // Adaptive jitter buffer. The client pushes 5 ms frames; the recorder pulls a - // whole *quantum* (often 20–43 ms) from an independent clock. A drain of one - // quantum must not outrun what's buffered, or every call underruns to silence - // (the original ~58% gaps). So prime to ~3 quanta before producing, hold there, - // and re-prime only after a genuine full drain (the client went quiet). The ring - // is capped at a few quanta so latency stays bounded. - let target = (3 * want).clamp(720 * ud.channels, 9600 * ud.channels); - while ud.ring.len() > target.max(want) + want { - ud.ring.pop_front(); // bound latency: drop the oldest beyond ~1 quantum slack - } - if !ud.primed && ud.ring.len() >= target { - ud.primed = true; - } - - let n_frames = if let Some(slice) = data.data() { - for k in 0..want { - let s = if ud.primed { - ud.ring.pop_front().unwrap_or(0.0) // silence on a momentary underrun - } else { - 0.0 // not yet primed — emit silence while the buffer fills - }; - let off = k * 4; - slice[off..off + 4].copy_from_slice(&s.to_le_bytes()); + }) + .process(|stream, ud| { + let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + let Some(mut buffer) = stream.dequeue_buffer() else { + return; + }; + // Stale-audio guard, BEFORE pulling new frames: drop the ring when a flush was + // requested (uplink gap — see the pump) or when this callback itself hasn't run + // for a while (the stream idled with no recorder attached; whatever the ring + // holds predates the consumer). A recorder must never hear a burst of old audio. + let now = std::time::Instant::now(); + let idled = ud + .last_run + .is_some_and(|t| now.duration_since(t) > MIC_STALE); + if ud.flush.swap(false, std::sync::atomic::Ordering::AcqRel) || idled { + ud.ring.clear(); + ud.primed = false; } - want_frames - } else { - 0 - }; - if ud.ring.is_empty() { - ud.primed = false; // fully drained — re-prime before producing again + ud.last_run = Some(now); + // Pull all newly-decoded PCM into the ring, aging out chunks that sat in the + // channel while nothing consumed them (same staleness rule). + while let Ok((t, frame)) = ud.rx.try_recv() { + if now.duration_since(t) <= MIC_STALE { + ud.ring.extend(frame); + } + } + let stride = 4 * ud.channels; // F32LE interleaved + let datas = buffer.datas_mut(); + if datas.is_empty() { + return; + } + let data = &mut datas[0]; + let want_frames = data.data().map(|s| s.len() / stride).unwrap_or(0); + let want = want_frames * ud.channels; // interleaved samples this quantum needs + static FIRST: std::sync::atomic::AtomicBool = + std::sync::atomic::AtomicBool::new(true); + if FIRST.swap(false, std::sync::atomic::Ordering::Relaxed) { + tracing::info!( + quantum_frames = want_frames, + quantum_ms = want_frames as f32 / 48.0, + "virtual-mic consumer connected" + ); + } + + // Adaptive jitter buffer. The client pushes 5 ms frames; the recorder pulls a + // whole *quantum* (often 20–43 ms) from an independent clock. A drain of one + // quantum must not outrun what's buffered, or every call underruns to silence + // (the original ~58% gaps). So prime to ~3 quanta before producing, hold there, + // and re-prime only after a genuine full drain (the client went quiet). The ring + // is capped at a few quanta so latency stays bounded. + let target = (3 * want).clamp(720 * ud.channels, 9600 * ud.channels); + while ud.ring.len() > target.max(want) + want { + ud.ring.pop_front(); // bound latency: drop the oldest beyond ~1 quantum slack + } + if !ud.primed && ud.ring.len() >= target { + ud.primed = true; + } + + let n_frames = if let Some(slice) = data.data() { + for k in 0..want { + let s = if ud.primed { + ud.ring.pop_front().unwrap_or(0.0) // silence on a momentary underrun + } else { + 0.0 // not yet primed — emit silence while the buffer fills + }; + let off = k * 4; + slice[off..off + 4].copy_from_slice(&s.to_le_bytes()); + } + want_frames + } else { + 0 + }; + if ud.ring.is_empty() { + ud.primed = false; // fully drained — re-prime before producing again + } + let chunk = data.chunk_mut(); + *chunk.offset_mut() = 0; + *chunk.stride_mut() = stride as _; + *chunk.size_mut() = (stride * n_frames) as _; + })); + if outcome.is_err() { + tracing::error!("panic in pipewire virtual-mic callback"); } - let chunk = data.chunk_mut(); - *chunk.offset_mut() = 0; - *chunk.stride_mut() = stride as _; - *chunk.size_mut() = (stride * n_frames) as _; - })); - if outcome.is_err() { - tracing::error!("panic in pipewire virtual-mic callback"); - } - }) - .register() - .context("register virtual-mic stream listener")?; + }) + .register() + .context("register virtual-mic stream listener")?; - let mut info = AudioInfoRaw::new(); - info.set_format(AudioFormat::F32LE); - info.set_rate(SAMPLE_RATE); - info.set_channels(channels); - info.set_position(spa_positions(channels)); - let obj = pw::spa::pod::Object { - type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(), - id: pw::spa::param::ParamType::EnumFormat.as_raw(), - properties: info.into(), - }; - let values: Vec = pw::spa::pod::serialize::PodSerializer::serialize( - std::io::Cursor::new(Vec::new()), - &pw::spa::pod::Value::Object(obj), - ) - .context("serialize mic format pod")? - .0 - .into_inner(); - let mut params = [Pod::from_bytes(&values).context("mic pod from bytes")?]; - - // RT_PROCESS: run the producer callback on PipeWire's realtime data loop, so the source is a - // *synchronous* graph node that joins its consumer's driver group and is actually driven. Without - // it the node is async/main-loop and, in the host's busy multi-stream graph (desktop-audio + - // video capture + the session), never acquires a driver — it stays suspended and its process() - // never fires, so every recorder hears pure silence (the long-standing "Linux host mic broken"). - stream - .connect( - spa::utils::Direction::Output, // we PRODUCE samples (a source) - None, - pw::stream::StreamFlags::AUTOCONNECT - | pw::stream::StreamFlags::MAP_BUFFERS - | pw::stream::StreamFlags::RT_PROCESS, - &mut params, + let mut info = AudioInfoRaw::new(); + info.set_format(AudioFormat::F32LE); + info.set_rate(SAMPLE_RATE); + info.set_channels(channels); + info.set_position(spa_positions(channels)); + let obj = pw::spa::pod::Object { + type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(), + id: pw::spa::param::ParamType::EnumFormat.as_raw(), + properties: info.into(), + }; + let values: Vec = pw::spa::pod::serialize::PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &pw::spa::pod::Value::Object(obj), ) - .context("pw mic stream connect")?; + .context("serialize mic format pod")? + .0 + .into_inner(); + let mut params = [Pod::from_bytes(&values).context("mic pod from bytes")?]; - mainloop.run(); - tracing::debug!("pipewire virtual-mic loop exited (source dropped)"); - Ok(()) + // RT_PROCESS: run the producer callback on PipeWire's realtime data loop, so the source is a + // *synchronous* graph node that joins its consumer's driver group and is actually driven. Without + // it the node is async/main-loop and, in the host's busy multi-stream graph (desktop-audio + + // video capture + the session), never acquires a driver — it stays suspended and its process() + // never fires, so every recorder hears pure silence (the long-standing "Linux host mic broken"). + stream + .connect( + spa::utils::Direction::Output, // we PRODUCE samples (a source) + None, + pw::stream::StreamFlags::AUTOCONNECT + | pw::stream::StreamFlags::MAP_BUFFERS + | pw::stream::StreamFlags::RT_PROCESS, + &mut params, + ) + .context("pw mic stream connect")?; + + // Setup complete: the daemon connection and stream connect succeeded — report ready, + // then block until quit/death. (A PipeWire that isn't running never reaches this line; + // its connect error surfaces through the handshake as an OPEN failure, so the pump + // backs off instead of churning on instantly-dead instances.) + let _ = ready.send(Ok(())); + mainloop.run(); + tracing::debug!("pipewire virtual-mic loop exited (source dropped)"); + Ok(()) + })(); + if let Err(e) = &result { + let _ = ready.send(Err(anyhow!("{e:#}"))); + } + result } fn pw_thread(