fix(host/audio): mic pump — open handshake on Linux + rapid-death backoff
apple / swift (push) Successful in 1m8s
apple / screenshots (push) Successful in 5m18s
android / android (push) Successful in 3m21s
windows-host / package (push) Successful in 6m58s
ci / rust (push) Successful in 1m58s
ci / web (push) Successful in 50s
ci / docs-site (push) Successful in 1m1s
ci / bench (push) Successful in 4m49s
deb / build-publish (push) Successful in 4m37s
decky / build-publish (push) Successful in 14s
docker / build-push (--build-arg FEDORA_VERSION=44, ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora44-rpm) (push) Successful in 6s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 5s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 5s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 2m17s
rpm / build-publish (bazzite, punktfunk-fedora-rpm) (push) Successful in 9m59s
docker / deploy-docs (push) Successful in 18s
rpm / build-publish (fedora-44, punktfunk-fedora44-rpm) (push) Successful in 9m34s

Found by a live boot-order test (host started before the user session's
PipeWire): PwMicSource::open returned Ok before the daemon connection was
attempted, so a PipeWire that wasn't running surfaced as an instantly-dead
instance instead of an open failure — and the pump churned
open→die→reopen at heartbeat rate (1 Hz "virtual mic ready" log spam)
instead of backing off.

- PwMicSource::open now has a bring-up handshake (mirrors the Windows
  backend): ready only after connect + stream connect succeed, so a
  down daemon is an open ERROR and the pump's backoff engages.
- The pump triages deaths: an instance that lived >= 5 s (a one-off
  daemon restart) reopens immediately with the backoff reset; one that
  died right after opening counts as a failed open and backs off
  (2 s → 60 s cap). New pump test rapid_death_backs_off.

Re-validated live: host started with PipeWire stopped → throttled
"unavailable" warns, zero churn; daemon started → mic node up on the
next retry; exactly one pump + one loop thread (no leak).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-07-03 20:58:06 +00:00
parent 2c7ded0f3c
commit c7630ff5dc
2 changed files with 319 additions and 238 deletions
+80 -22
View File
@@ -128,6 +128,10 @@ struct PumpTuning {
/// An uplink gap longer than this discards the backend's buffered audio before pushing the /// An uplink gap longer than this discards the backend's buffered audio before pushing the
/// next frame (a recorder must never hear a stale burst from before a mute/session end). /// next frame (a recorder must never hear a stale burst from before a mute/session end).
stale_gap: std::time::Duration, stale_gap: std::time::Duration,
/// A backend that dies before living this long counts as a FAILED open for backoff purposes
/// (an open that succeeds but dies instantly — e.g. a flapping daemon — must not churn at
/// heartbeat rate); one that lived longer resets the backoff.
stable_after: std::time::Duration,
} }
const PUMP_TUNING: PumpTuning = PumpTuning { const PUMP_TUNING: PumpTuning = PumpTuning {
@@ -135,6 +139,7 @@ const PUMP_TUNING: PumpTuning = PumpTuning {
backoff_cap: std::time::Duration::from_secs(60), backoff_cap: std::time::Duration::from_secs(60),
heartbeat: std::time::Duration::from_secs(1), heartbeat: std::time::Duration::from_secs(1),
stale_gap: std::time::Duration::from_millis(600), stale_gap: std::time::Duration::from_millis(600),
stable_after: std::time::Duration::from_secs(5),
}; };
/// Host-lifetime virtual-microphone pump: one thread owns the [`VirtualMic`] backend + an Opus /// Host-lifetime virtual-microphone pump: one thread owns the [`VirtualMic`] backend + an Opus
@@ -188,6 +193,26 @@ impl MicPump {
} }
} }
/// Sleep for `dur` while draining (and dropping) queued frames, so a closed/reopening backend
/// never accumulates a stale backlog and senders never see a wedged queue. Returns `false` when
/// every sender is gone (host shutdown).
#[cfg_attr(not(any(target_os = "linux", target_os = "windows")), allow(dead_code))]
fn drain_sleep(rx: &std::sync::mpsc::Receiver<Vec<u8>>, dur: std::time::Duration) -> bool {
use std::sync::mpsc::RecvTimeoutError;
let deadline = std::time::Instant::now() + dur;
loop {
let left = deadline.saturating_duration_since(std::time::Instant::now());
if left.is_zero() {
return true;
}
match rx.recv_timeout(left.min(std::time::Duration::from_millis(250))) {
Ok(_) => {} // drop frames while closed
Err(RecvTimeoutError::Timeout) => {} // keep waiting
Err(RecvTimeoutError::Disconnected) => return false, // host shutdown
}
}
}
/// The pump loop. `opener` is injected so the tests can run the REAL loop against a mock /// The pump loop. `opener` is injected so the tests can run the REAL loop against a mock
/// backend; production passes [`open_virtual_mic`]. /// backend; production passes [`open_virtual_mic`].
#[cfg_attr(not(any(target_os = "linux", target_os = "windows")), allow(dead_code))] #[cfg_attr(not(any(target_os = "linux", target_os = "windows")), allow(dead_code))]
@@ -200,9 +225,8 @@ where
let mut backoff = tuning.backoff_start; let mut backoff = tuning.backoff_start;
let mut open_fails: u64 = 0; let mut open_fails: u64 = 0;
'reopen: loop { loop {
// Open phase — eager, from thread start. While closed, keep draining the queue so a // Open phase — eager, from thread start.
// reopen never replays a backlog of stale frames (and senders never see a wedged queue).
let (mic, mut decoder) = loop { let (mic, mut decoder) = loop {
let opened = opener().and_then(|m| { let opened = opener().and_then(|m| {
let d = opus::Decoder::new(SAMPLE_RATE, opus::Channels::Stereo) let d = opus::Decoder::new(SAMPLE_RATE, opus::Channels::Stereo)
@@ -219,28 +243,20 @@ where
tracing::warn!(error = %format!("{e:#}"), attempts = open_fails, tracing::warn!(error = %format!("{e:#}"), attempts = open_fails,
"virtual mic unavailable — retrying with backoff"); "virtual mic unavailable — retrying with backoff");
} }
let deadline = Instant::now() + backoff; if !drain_sleep(&rx, backoff) {
loop { return;
let left = deadline.saturating_duration_since(Instant::now());
if left.is_zero() {
break;
}
match rx.recv_timeout(left.min(std::time::Duration::from_millis(250))) {
Ok(_) => {} // drop frames while closed
Err(RecvTimeoutError::Timeout) => {} // keep waiting
Err(RecvTimeoutError::Disconnected) => return, // host shutdown
}
} }
backoff = (backoff * 2).min(tuning.backoff_cap); backoff = (backoff * 2).min(tuning.backoff_cap);
} }
} }
}; };
backoff = tuning.backoff_start;
open_fails = 0;
tracing::info!("virtual mic ready (host-lifetime)"); tracing::info!("virtual mic ready (host-lifetime)");
// Drop anything queued while (re)opening — it predates the backend. // Drop anything queued while (re)opening — it predates the backend. (The backoff does
// NOT reset here: only an instance that proves stable resets it — see the death triage.)
while rx.try_recv().is_ok() {} while rx.try_recv().is_ok() {}
let opened_at = Instant::now();
// Pump phase — runs until the backend dies (break) or the host shuts down (return).
let mut decode_fails: u64 = 0; let mut decode_fails: u64 = 0;
let mut pcm = vec![0f32; 5760 * MIC_CHANNELS as usize]; // up to 120 ms scratch let mut pcm = vec![0f32; 5760 * MIC_CHANNELS as usize]; // up to 120 ms scratch
let mut last_push = Instant::now(); let mut last_push = Instant::now();
@@ -258,7 +274,7 @@ where
let total = (samples_per_ch * MIC_CHANNELS as usize).min(pcm.len()); let total = (samples_per_ch * MIC_CHANNELS as usize).min(pcm.len());
if !mic.push(&pcm[..total]) { if !mic.push(&pcm[..total]) {
tracing::warn!("virtual mic backend died — reopening"); tracing::warn!("virtual mic backend died — reopening");
continue 'reopen; break;
} }
last_push = Instant::now(); last_push = Instant::now();
decode_fails = 0; decode_fails = 0;
@@ -277,7 +293,7 @@ where
Err(RecvTimeoutError::Timeout) => { Err(RecvTimeoutError::Timeout) => {
if !mic.alive() { if !mic.alive() {
tracing::warn!("virtual mic backend died while idle — reopening"); tracing::warn!("virtual mic backend died while idle — reopening");
continue 'reopen; break;
} }
} }
Err(RecvTimeoutError::Disconnected) => { Err(RecvTimeoutError::Disconnected) => {
@@ -286,6 +302,21 @@ where
} }
} }
} }
// Death triage: an instance that lived is a one-off (PipeWire/audio-engine restart) —
// reopen immediately with the backoff reset. One that died right after opening is a
// failed open in disguise (flapping daemon, endpoint racing away): back off like the
// open loop, or the pump would churn open→die→reopen at heartbeat rate.
if opened_at.elapsed() >= tuning.stable_after {
backoff = tuning.backoff_start;
open_fails = 0;
} else {
open_fails += 1;
if !drain_sleep(&rx, backoff) {
return;
}
backoff = (backoff * 2).min(tuning.backoff_cap);
}
} }
} }
@@ -343,8 +374,10 @@ mod pump_tests {
} }
/// Run the REAL pump loop against mock backends; `fail_first` opens fail before the first /// Run the REAL pump loop against mock backends; `fail_first` opens fail before the first
/// success (exercises the eager retry/backoff path). /// success (exercises the eager retry/backoff path). `dead_on_arrival` opens every instance
fn start(fail_first: usize) -> Harness { /// pre-killed (exercises the rapid-death churn guard). `stable_after` mirrors the tuning
/// field (ZERO = every death counts as stable → immediate reopen, keeping tests fast).
fn start_tuned(fail_first: usize, dead_on_arrival: bool, stable_after: Duration) -> Harness {
let (tx, rx) = std::sync::mpsc::sync_channel::<Vec<u8>>(MIC_QUEUE_CAP); let (tx, rx) = std::sync::mpsc::sync_channel::<Vec<u8>>(MIC_QUEUE_CAP);
let opens = Arc::new(AtomicUsize::new(0)); let opens = Arc::new(AtomicUsize::new(0));
let alive = Arc::new(Mutex::new(None::<Arc<AtomicBool>>)); let alive = Arc::new(Mutex::new(None::<Arc<AtomicBool>>));
@@ -361,6 +394,7 @@ mod pump_tests {
backoff_cap: Duration::from_millis(40), backoff_cap: Duration::from_millis(40),
heartbeat: Duration::from_millis(20), heartbeat: Duration::from_millis(20),
stale_gap: Duration::from_millis(80), stale_gap: Duration::from_millis(80),
stable_after,
}; };
let join = std::thread::spawn(move || { let join = std::thread::spawn(move || {
pump_thread( pump_thread(
@@ -370,7 +404,7 @@ mod pump_tests {
if n < fail_first { if n < fail_first {
anyhow::bail!("backend not up yet (simulated)"); anyhow::bail!("backend not up yet (simulated)");
} }
let a = Arc::new(AtomicBool::new(true)); let a = Arc::new(AtomicBool::new(!dead_on_arrival));
*alive2.lock().unwrap() = Some(a.clone()); *alive2.lock().unwrap() = Some(a.clone());
Ok(Box::new(MockMic { Ok(Box::new(MockMic {
alive: a, alive: a,
@@ -391,6 +425,10 @@ mod pump_tests {
} }
} }
fn start(fail_first: usize) -> Harness {
start_tuned(fail_first, false, Duration::ZERO)
}
fn wait_until(what: &str, mut cond: impl FnMut() -> bool) { fn wait_until(what: &str, mut cond: impl FnMut() -> bool) {
for _ in 0..200 { for _ in 0..200 {
if cond() { if cond() {
@@ -471,6 +509,26 @@ mod pump_tests {
h.join.join().unwrap(); h.join.join().unwrap();
} }
/// Instances that die immediately after opening must be retried with BACKOFF, not at
/// heartbeat rate — a flapping backend (daemon up but dropping us instantly) would
/// otherwise churn open→die→reopen every heartbeat forever.
#[test]
fn rapid_death_backs_off() {
// Every instance is dead on arrival; stability threshold high so each death counts
// as a failed open. Without the guard: ~1 reopen per heartbeat (20 ms) ≈ 25 opens in
// 500 ms. With backoff 10→20→40 (cap): ≈ 7.
let h = start_tuned(0, true, Duration::from_secs(10));
std::thread::sleep(Duration::from_millis(500));
let opens = h.opens.load(Ordering::SeqCst);
assert!(opens >= 2, "must keep retrying (got {opens})");
assert!(
opens <= 15,
"must back off, not churn per heartbeat (got {opens})"
);
drop(h.tx);
h.join.join().unwrap();
}
/// An uplink gap discards buffered-stale audio before the next frame plays. /// An uplink gap discards buffered-stale audio before the next frame plays.
#[test] #[test]
fn discards_after_gap() { fn discards_after_gap() {
+239 -216
View File
@@ -147,24 +147,32 @@ impl PwMicSource {
let (quit_tx, quit_rx) = pipewire::channel::channel::<Terminate>(); let (quit_tx, quit_rx) = pipewire::channel::channel::<Terminate>();
let alive = Arc::new(AtomicBool::new(true)); let alive = Arc::new(AtomicBool::new(true));
let flush = Arc::new(AtomicBool::new(false)); let flush = Arc::new(AtomicBool::new(false));
// Bring-up handshake (mirrors the Windows backend): a PipeWire that isn't running (host
// service started before the user session) must surface as an open ERROR — engaging the
// pump's backoff — not as an instantly-dead instance the pump would churn on.
let (ready_tx, ready_rx) = sync_channel::<Result<()>>(1);
let (alive_t, flush_t) = (alive.clone(), flush.clone()); let (alive_t, flush_t) = (alive.clone(), flush.clone());
thread::Builder::new() thread::Builder::new()
.name("punktfunk-pw-mic".into()) .name("punktfunk-pw-mic".into())
.spawn(move || { .spawn(move || {
if let Err(e) = mic_pw_thread(pcm_rx, quit_rx, channels, flush_t) { if let Err(e) = mic_pw_thread(pcm_rx, quit_rx, channels, flush_t, ready_tx) {
tracing::error!(error = %format!("{e:#}"), "pipewire virtual-mic thread failed"); tracing::error!(error = %format!("{e:#}"), "pipewire virtual-mic thread failed");
} }
// Whether a clean quit or a daemon death: this instance is done — the pump reopens. // Whether a clean quit or a daemon death: this instance is done — the pump reopens.
alive_t.store(false, Ordering::Release); alive_t.store(false, Ordering::Release);
}) })
.context("spawn pipewire virtual-mic thread")?; .context("spawn pipewire virtual-mic thread")?;
Ok(PwMicSource { match ready_rx.recv_timeout(Duration::from_secs(5)) {
pcm: pcm_tx, Ok(Ok(())) => Ok(PwMicSource {
channels, pcm: pcm_tx,
quit: quit_tx, channels,
alive, quit: quit_tx,
flush, alive,
}) flush,
}),
Ok(Err(e)) => Err(e),
Err(_) => Err(anyhow!("pipewire virtual-mic init timed out")),
}
} }
} }
@@ -224,237 +232,252 @@ fn mic_pw_thread(
quit_rx: pipewire::channel::Receiver<Terminate>, quit_rx: pipewire::channel::Receiver<Terminate>,
channels: u32, channels: u32,
flush: Arc<AtomicBool>, flush: Arc<AtomicBool>,
ready: std::sync::mpsc::SyncSender<Result<()>>,
) -> Result<()> { ) -> Result<()> {
use pipewire as pw; use pipewire as pw;
use pw::{properties::properties, spa}; use pw::{properties::properties, spa};
use spa::param::audio::{AudioFormat, AudioInfoRaw}; use spa::param::audio::{AudioFormat, AudioInfoRaw};
use spa::pod::Pod; use spa::pod::Pod;
crate::pwinit::ensure_init(); // The PipeWire objects are lifetime-chained (guards borrow the mainloop/core), so setup and
let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw mic MainLoop")?; // the blocking run share one frame; the IIFE lets every setup `?` funnel through the ready
let context = pw::context::ContextRc::new(&mainloop, None).context("pw mic Context")?; // handshake below (mirrors the Windows render_thread).
let core = context let result = (|| -> Result<()> {
.connect_rc(None) crate::pwinit::ensure_init();
.context("pw mic connect (is PipeWire running in this session?)")?; let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw mic MainLoop")?;
let context = pw::context::ContextRc::new(&mainloop, None).context("pw mic Context")?;
let core = context
.connect_rc(None)
.context("pw mic connect (is PipeWire running in this session?)")?;
let _quit_guard = quit_rx.attach(mainloop.loop_(), { let _quit_guard = quit_rx.attach(mainloop.loop_(), {
let mainloop = mainloop.clone();
move |_| mainloop.quit()
});
// Death detection: a core error (the daemon restarted/went away — our remote node no longer
// exists) ends this thread, flipping the owner's `alive` flag so the pump reopens against the
// new daemon. Without this, a PipeWire restart left the loop idling on a dead connection and
// the mic silently broken for the rest of the host's life.
let _core_listener = core
.add_listener_local()
.error({
let mainloop = mainloop.clone(); let mainloop = mainloop.clone();
move |id, _seq, res, message| { move |_| mainloop.quit()
tracing::warn!( });
id,
res,
message,
"pipewire core error — virtual mic reopening"
);
mainloop.quit();
}
})
.register();
// media.class=Audio/Source advertises us as a microphone (a recordable source), NOT a // Death detection: a core error (the daemon restarted/went away — our remote node no longer
// playback stream — without it, Direction::Output + Playback would route to the speakers. // exists) ends this thread, flipping the owner's `alive` flag so the pump reopens against the
let stream = pw::stream::StreamBox::new( // new daemon. Without this, a PipeWire restart left the loop idling on a dead connection and
&core, // the mic silently broken for the rest of the host's life.
"punktfunk-mic", let _core_listener = core
properties! { .add_listener_local()
*pw::keys::MEDIA_TYPE => "Audio", .error({
*pw::keys::MEDIA_CLASS => "Audio/Source", let mainloop = mainloop.clone();
*pw::keys::NODE_NAME => "punktfunk-mic", move |id, _seq, res, message| {
*pw::keys::NODE_DESCRIPTION => "Punktfunk Remote Microphone", tracing::warn!(
// ~5 ms quantum (one Opus frame) so recording apps get smooth low-latency chunks. id,
*pw::keys::NODE_LATENCY => "240/48000", res,
// Win WirePlumber's default-source election. This fixes TWO failures (both diagnosed message,
// live on a Bazzite host, PipeWire 1.4.10): "pipewire core error — virtual mic reopening"
// 1. Apps that record the *default* input (games, Discord, arecord) get the client's );
// mic — the Linux analogue of the Windows host forcing the default recording
// endpoint (audio/windows/audio_control.rs). Without it the source is never the
// default, so default-input recorders hear silence.
// 2. On PipeWire 1.4.x, a *non-default* Audio/Source recorded via `--target` never
// gets a driver assigned — the {source, recorder} group stays orphaned (pw-top:
// QUANT/RATE 0, `driver-node None`), so the RT `process()` callback never fires and
// even an explicitly-selected mic is pure silence. Making it the default source
// keeps WirePlumber driving it, so `process()` runs and audio flows. (PipeWire 1.6
// drives any recorded source regardless, which is why this only bit the 1.4 host.)
// Reproduced with a faithful standalone copy of this node: no priority.session → silent,
// priority.session set → audio, on the same 1.4.10 daemon. Only overrides WirePlumber's
// *auto* default (a user's explicit default.configured.audio.source still wins); the
// value clears typical real-hardware source priorities (~10001900).
"priority.session" => "3000",
},
)
.context("pw mic Stream")?;
let ud = MicUserData {
rx: pcm_rx,
ring: VecDeque::new(),
channels: channels as usize,
primed: false,
flush,
last_run: None,
};
let _listener = stream
.add_local_listener_with_user_data(ud)
.state_changed({
let mainloop = mainloop.clone();
move |_s, _ud, old, new| {
tracing::info!(?old, ?new, "pipewire virtual-mic stream state");
// A stream error is unrecoverable for this instance — exit so the pump reopens.
if matches!(new, pw::stream::StreamState::Error(_)) {
mainloop.quit(); mainloop.quit();
} }
} })
}) .register();
.param_changed(|_s, _ud, id, param| {
let Some(param) = param else { return }; // media.class=Audio/Source advertises us as a microphone (a recordable source), NOT a
if id != pw::spa::param::ParamType::Format.as_raw() { // playback stream — without it, Direction::Output + Playback would route to the speakers.
return; let stream = pw::stream::StreamBox::new(
} &core,
let mut info = AudioInfoRaw::default(); "punktfunk-mic",
if info.parse(param).is_ok() { properties! {
tracing::info!( *pw::keys::MEDIA_TYPE => "Audio",
format = ?info.format(), *pw::keys::MEDIA_CLASS => "Audio/Source",
rate = info.rate(), *pw::keys::NODE_NAME => "punktfunk-mic",
channels = info.channels(), *pw::keys::NODE_DESCRIPTION => "Punktfunk Remote Microphone",
"virtual-mic format negotiated" // ~5 ms quantum (one Opus frame) so recording apps get smooth low-latency chunks.
); *pw::keys::NODE_LATENCY => "240/48000",
} // Win WirePlumber's default-source election. This fixes TWO failures (both diagnosed
}) // live on a Bazzite host, PipeWire 1.4.10):
.process(|stream, ud| { // 1. Apps that record the *default* input (games, Discord, arecord) get the client's
let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { // mic — the Linux analogue of the Windows host forcing the default recording
let Some(mut buffer) = stream.dequeue_buffer() else { // endpoint (audio/windows/audio_control.rs). Without it the source is never the
return; // default, so default-input recorders hear silence.
}; // 2. On PipeWire 1.4.x, a *non-default* Audio/Source recorded via `--target` never
// Stale-audio guard, BEFORE pulling new frames: drop the ring when a flush was // gets a driver assigned — the {source, recorder} group stays orphaned (pw-top:
// requested (uplink gap — see the pump) or when this callback itself hasn't run // QUANT/RATE 0, `driver-node None`), so the RT `process()` callback never fires and
// for a while (the stream idled with no recorder attached; whatever the ring // even an explicitly-selected mic is pure silence. Making it the default source
// holds predates the consumer). A recorder must never hear a burst of old audio. // keeps WirePlumber driving it, so `process()` runs and audio flows. (PipeWire 1.6
let now = std::time::Instant::now(); // drives any recorded source regardless, which is why this only bit the 1.4 host.)
let idled = ud // Reproduced with a faithful standalone copy of this node: no priority.session → silent,
.last_run // priority.session set → audio, on the same 1.4.10 daemon. Only overrides WirePlumber's
.is_some_and(|t| now.duration_since(t) > MIC_STALE); // *auto* default (a user's explicit default.configured.audio.source still wins); the
if ud.flush.swap(false, std::sync::atomic::Ordering::AcqRel) || idled { // value clears typical real-hardware source priorities (~10001900).
ud.ring.clear(); "priority.session" => "3000",
ud.primed = false; },
} )
ud.last_run = Some(now); .context("pw mic Stream")?;
// Pull all newly-decoded PCM into the ring, aging out chunks that sat in the
// channel while nothing consumed them (same staleness rule). let ud = MicUserData {
while let Ok((t, frame)) = ud.rx.try_recv() { rx: pcm_rx,
if now.duration_since(t) <= MIC_STALE { ring: VecDeque::new(),
ud.ring.extend(frame); channels: channels as usize,
primed: false,
flush,
last_run: None,
};
let _listener = stream
.add_local_listener_with_user_data(ud)
.state_changed({
let mainloop = mainloop.clone();
move |_s, _ud, old, new| {
tracing::info!(?old, ?new, "pipewire virtual-mic stream state");
// A stream error is unrecoverable for this instance — exit so the pump reopens.
if matches!(new, pw::stream::StreamState::Error(_)) {
mainloop.quit();
} }
} }
let stride = 4 * ud.channels; // F32LE interleaved })
let datas = buffer.datas_mut(); .param_changed(|_s, _ud, id, param| {
if datas.is_empty() { let Some(param) = param else { return };
if id != pw::spa::param::ParamType::Format.as_raw() {
return; return;
} }
let data = &mut datas[0]; let mut info = AudioInfoRaw::default();
let want_frames = data.data().map(|s| s.len() / stride).unwrap_or(0); if info.parse(param).is_ok() {
let want = want_frames * ud.channels; // interleaved samples this quantum needs
static FIRST: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(true);
if FIRST.swap(false, std::sync::atomic::Ordering::Relaxed) {
tracing::info!( tracing::info!(
quantum_frames = want_frames, format = ?info.format(),
quantum_ms = want_frames as f32 / 48.0, rate = info.rate(),
"virtual-mic consumer connected" channels = info.channels(),
"virtual-mic format negotiated"
); );
} }
})
// Adaptive jitter buffer. The client pushes 5 ms frames; the recorder pulls a .process(|stream, ud| {
// whole *quantum* (often 2043 ms) from an independent clock. A drain of one let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
// quantum must not outrun what's buffered, or every call underruns to silence let Some(mut buffer) = stream.dequeue_buffer() else {
// (the original ~58% gaps). So prime to ~3 quanta before producing, hold there, return;
// and re-prime only after a genuine full drain (the client went quiet). The ring };
// is capped at a few quanta so latency stays bounded. // Stale-audio guard, BEFORE pulling new frames: drop the ring when a flush was
let target = (3 * want).clamp(720 * ud.channels, 9600 * ud.channels); // requested (uplink gap — see the pump) or when this callback itself hasn't run
while ud.ring.len() > target.max(want) + want { // for a while (the stream idled with no recorder attached; whatever the ring
ud.ring.pop_front(); // bound latency: drop the oldest beyond ~1 quantum slack // holds predates the consumer). A recorder must never hear a burst of old audio.
} let now = std::time::Instant::now();
if !ud.primed && ud.ring.len() >= target { let idled = ud
ud.primed = true; .last_run
} .is_some_and(|t| now.duration_since(t) > MIC_STALE);
if ud.flush.swap(false, std::sync::atomic::Ordering::AcqRel) || idled {
let n_frames = if let Some(slice) = data.data() { ud.ring.clear();
for k in 0..want { ud.primed = false;
let s = if ud.primed {
ud.ring.pop_front().unwrap_or(0.0) // silence on a momentary underrun
} else {
0.0 // not yet primed — emit silence while the buffer fills
};
let off = k * 4;
slice[off..off + 4].copy_from_slice(&s.to_le_bytes());
} }
want_frames ud.last_run = Some(now);
} else { // Pull all newly-decoded PCM into the ring, aging out chunks that sat in the
0 // channel while nothing consumed them (same staleness rule).
}; while let Ok((t, frame)) = ud.rx.try_recv() {
if ud.ring.is_empty() { if now.duration_since(t) <= MIC_STALE {
ud.primed = false; // fully drained — re-prime before producing again ud.ring.extend(frame);
}
}
let stride = 4 * ud.channels; // F32LE interleaved
let datas = buffer.datas_mut();
if datas.is_empty() {
return;
}
let data = &mut datas[0];
let want_frames = data.data().map(|s| s.len() / stride).unwrap_or(0);
let want = want_frames * ud.channels; // interleaved samples this quantum needs
static FIRST: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(true);
if FIRST.swap(false, std::sync::atomic::Ordering::Relaxed) {
tracing::info!(
quantum_frames = want_frames,
quantum_ms = want_frames as f32 / 48.0,
"virtual-mic consumer connected"
);
}
// Adaptive jitter buffer. The client pushes 5 ms frames; the recorder pulls a
// whole *quantum* (often 2043 ms) from an independent clock. A drain of one
// quantum must not outrun what's buffered, or every call underruns to silence
// (the original ~58% gaps). So prime to ~3 quanta before producing, hold there,
// and re-prime only after a genuine full drain (the client went quiet). The ring
// is capped at a few quanta so latency stays bounded.
let target = (3 * want).clamp(720 * ud.channels, 9600 * ud.channels);
while ud.ring.len() > target.max(want) + want {
ud.ring.pop_front(); // bound latency: drop the oldest beyond ~1 quantum slack
}
if !ud.primed && ud.ring.len() >= target {
ud.primed = true;
}
let n_frames = if let Some(slice) = data.data() {
for k in 0..want {
let s = if ud.primed {
ud.ring.pop_front().unwrap_or(0.0) // silence on a momentary underrun
} else {
0.0 // not yet primed — emit silence while the buffer fills
};
let off = k * 4;
slice[off..off + 4].copy_from_slice(&s.to_le_bytes());
}
want_frames
} else {
0
};
if ud.ring.is_empty() {
ud.primed = false; // fully drained — re-prime before producing again
}
let chunk = data.chunk_mut();
*chunk.offset_mut() = 0;
*chunk.stride_mut() = stride as _;
*chunk.size_mut() = (stride * n_frames) as _;
}));
if outcome.is_err() {
tracing::error!("panic in pipewire virtual-mic callback");
} }
let chunk = data.chunk_mut(); })
*chunk.offset_mut() = 0; .register()
*chunk.stride_mut() = stride as _; .context("register virtual-mic stream listener")?;
*chunk.size_mut() = (stride * n_frames) as _;
}));
if outcome.is_err() {
tracing::error!("panic in pipewire virtual-mic callback");
}
})
.register()
.context("register virtual-mic stream listener")?;
let mut info = AudioInfoRaw::new(); let mut info = AudioInfoRaw::new();
info.set_format(AudioFormat::F32LE); info.set_format(AudioFormat::F32LE);
info.set_rate(SAMPLE_RATE); info.set_rate(SAMPLE_RATE);
info.set_channels(channels); info.set_channels(channels);
info.set_position(spa_positions(channels)); info.set_position(spa_positions(channels));
let obj = pw::spa::pod::Object { let obj = pw::spa::pod::Object {
type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(), type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(),
id: pw::spa::param::ParamType::EnumFormat.as_raw(), id: pw::spa::param::ParamType::EnumFormat.as_raw(),
properties: info.into(), properties: info.into(),
}; };
let values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize( let values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
std::io::Cursor::new(Vec::new()), std::io::Cursor::new(Vec::new()),
&pw::spa::pod::Value::Object(obj), &pw::spa::pod::Value::Object(obj),
)
.context("serialize mic format pod")?
.0
.into_inner();
let mut params = [Pod::from_bytes(&values).context("mic pod from bytes")?];
// RT_PROCESS: run the producer callback on PipeWire's realtime data loop, so the source is a
// *synchronous* graph node that joins its consumer's driver group and is actually driven. Without
// it the node is async/main-loop and, in the host's busy multi-stream graph (desktop-audio +
// video capture + the session), never acquires a driver — it stays suspended and its process()
// never fires, so every recorder hears pure silence (the long-standing "Linux host mic broken").
stream
.connect(
spa::utils::Direction::Output, // we PRODUCE samples (a source)
None,
pw::stream::StreamFlags::AUTOCONNECT
| pw::stream::StreamFlags::MAP_BUFFERS
| pw::stream::StreamFlags::RT_PROCESS,
&mut params,
) )
.context("pw mic stream connect")?; .context("serialize mic format pod")?
.0
.into_inner();
let mut params = [Pod::from_bytes(&values).context("mic pod from bytes")?];
mainloop.run(); // RT_PROCESS: run the producer callback on PipeWire's realtime data loop, so the source is a
tracing::debug!("pipewire virtual-mic loop exited (source dropped)"); // *synchronous* graph node that joins its consumer's driver group and is actually driven. Without
Ok(()) // it the node is async/main-loop and, in the host's busy multi-stream graph (desktop-audio +
// video capture + the session), never acquires a driver — it stays suspended and its process()
// never fires, so every recorder hears pure silence (the long-standing "Linux host mic broken").
stream
.connect(
spa::utils::Direction::Output, // we PRODUCE samples (a source)
None,
pw::stream::StreamFlags::AUTOCONNECT
| pw::stream::StreamFlags::MAP_BUFFERS
| pw::stream::StreamFlags::RT_PROCESS,
&mut params,
)
.context("pw mic stream connect")?;
// Setup complete: the daemon connection and stream connect succeeded — report ready,
// then block until quit/death. (A PipeWire that isn't running never reaches this line;
// its connect error surfaces through the handshake as an OPEN failure, so the pump
// backs off instead of churning on instantly-dead instances.)
let _ = ready.send(Ok(()));
mainloop.run();
tracing::debug!("pipewire virtual-mic loop exited (source dropped)");
Ok(())
})();
if let Err(e) = &result {
let _ = ready.send(Err(anyhow!("{e:#}")));
}
result
} }
fn pw_thread( fn pw_thread(