fix(host/audio): rebuild mic passthrough — eager, self-healing virtual mic on both hosts

Mic passthrough silently died on real hosts. Root causes, all fixed:

- No liveness anywhere: a PipeWire restart (Linux) or any WASAPI device
  error (Windows) killed the backend worker; push() fed the dead queue
  for the rest of the host's life. VirtualMic now has a liveness
  contract (push -> bool, alive(), discard()) and the new shared
  audio::MicPump reopens with backoff, probing on an idle heartbeat so
  the mic heals BETWEEN sessions too. Validated live: systemctl restart
  pipewire -> node back in ~0.5 s, tone flows through the reopened
  backend.

- Lazy creation: the mic device didn't exist until the first 0xCB
  frame, but games bind their capture device at launch and never
  re-follow. The pump opens eagerly at host start (node exists with
  zero clients, elected default source).

- Windows headless dead-end: with VB-CABLE as the ONLY render endpoint
  (exactly what the installer ships), the anti-echo guard rejected the
  cable as the default render endpoint -> mic permanently dead. The new
  wiring_plan (pure, unit-tested on every platform) assigns the mic its
  endpoint FIRST (cable reserved for the mic), points the loopback at a
  DIFFERENT endpoint, and the capture side now yields (explicit
  endpoint or honest error) instead of the mic dying. Plan recomputed
  per (re)open — endpoints churn at boot/logon/driver installs.

- Stale bursts: buffered audio from a previous session played into a
  newly-attached recorder (observed live). Timestamped chunks + a
  consumer-gap check in the process callback age everything past 1 s.
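
A minimal sketch of the timestamped-chunk idea from the last bullet, assuming a simple queue of `(arrival time, PCM)` pairs. `ChunkQueue`, `pop_fresh` and the explicit `now` parameter are hypothetical names for illustration; they are not taken from the backends in this commit. The sketch covers only the age cut-off, not the consumer-gap check.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical queue of timestamped PCM chunks (illustrative, not the real backend type).
struct ChunkQueue {
    chunks: VecDeque<(Instant, Vec<f32>)>,
    max_age: Duration,
}

impl ChunkQueue {
    fn new(max_age: Duration) -> Self {
        Self { chunks: VecDeque::new(), max_age }
    }

    /// Producer side: stamp each chunk with its arrival time.
    fn push(&mut self, now: Instant, pcm: Vec<f32>) {
        self.chunks.push_back((now, pcm));
    }

    /// Consumer side: drop every chunk older than `max_age`, then hand out the oldest
    /// survivor. `None` means nothing fresh is buffered (a caller would emit silence).
    fn pop_fresh(&mut self, now: Instant) -> Option<Vec<f32>> {
        while let Some((stamp, _)) = self.chunks.front() {
            if now.saturating_duration_since(*stamp) > self.max_age {
                self.chunks.pop_front();
            } else {
                break;
            }
        }
        self.chunks.pop_front().map(|(_, pcm)| pcm)
    }
}
```

Passing `now` in explicitly keeps the ageing logic testable without sleeping; a real process callback would presumably read the clock itself.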

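The ordering in the Windows headless bullet above (mic endpoint first, loopback on a different endpoint, honest error otherwise) could be sketched as a pure function like the one below. This is an assumption-laden illustration: `Endpoint`, `Plan` and `sketch_wiring_plan` are hypothetical and do not reflect the real `wiring_plan` module, whose source is not shown here.

```rust
/// Hypothetical, simplified view of a render endpoint.
#[derive(Debug, Clone, PartialEq)]
struct Endpoint {
    id: String,
    /// True for a virtual cable device (for example VB-CABLE).
    is_cable: bool,
}

/// Hypothetical plan: where the mic renders, and where loopback capture reads from.
#[derive(Debug, PartialEq)]
struct Plan {
    mic_target: String,
    /// `Err` carries an explanation when no second endpoint exists; the mic still works.
    loopback_source: Result<String, String>,
}

/// Pure function: no OS calls, so it can be unit-tested on any platform.
fn sketch_wiring_plan(endpoints: &[Endpoint]) -> Option<Plan> {
    // 1. The mic gets its endpoint FIRST: the cable is reserved for it.
    let mic = endpoints.iter().find(|e| e.is_cable)?;
    // 2. The loopback must use a DIFFERENT endpoint, or report an error instead of
    //    taking the cable away from the mic.
    let loopback_source = endpoints
        .iter()
        .find(|e| e.id != mic.id)
        .map(|e| e.id.clone())
        .ok_or_else(|| "no render endpoint besides the mic's cable".to_string());
    Some(Plan { mic_target: mic.id.clone(), loopback_source })
}
```

With the cable as the only endpoint, this sketch still assigns the mic and reports the loopback as unavailable, which is the behaviour the bullet describes for headless hosts.
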
The Linux node mechanism stays the stream-based Audio/Source with
RT_PROCESS + priority.session: the canonical null-audio-sink adapter
recipe was tested on this box (PipeWire 1.6.2) and never gets a clock
(QUANT 0 -> pure silence), and WirePlumber reroutes a feeder targeting
it to the default sink (echo). Decision documented in the module docs.

Live-validated on this box (synthetic host + probe --mic-test,
pw-record): eager node, both attach orderings, PipeWire-restart
self-heal, post-session silence. Windows side compile/CI + on-glass
validation pending.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 20:41:19 +00:00
parent b7048446c4
commit 2c7ded0f3c
7 changed files with 969 additions and 371 deletions
+392 -5
@@ -42,7 +42,8 @@ pub fn open_audio_capture(channels: u32) -> Result<Box<dyn AudioCapturer>> {
 #[cfg(target_os = "windows")]
 pub fn open_audio_capture(channels: u32) -> Result<Box<dyn AudioCapturer>> {
-    audio_control::ensure_wired_once();
+    // The capture thread runs the audio wiring plan itself (audio_control::wire_now) before
+    // resolving its endpoint — a fresh plan per open, because Windows endpoints churn.
     wasapi_cap::WasapiLoopbackCapturer::open(channels)
         .map(|c| Box::new(c) as Box<dyn AudioCapturer>)
 }
@@ -57,10 +58,27 @@ pub fn open_audio_capture(_channels: u32) -> Result<Box<dyn AudioCapturer>> {
 /// decoded client-mic PCM (interleaved `f32` at [`SAMPLE_RATE`]) into it, and PipeWire delivers
 /// it to whichever app records the source — silence when no input is flowing. This is how the
 /// client's microphone reaches host applications (mic passthrough).
+///
+/// **Liveness contract.** Both backends run a worker thread that CAN die under the host's feet
+/// (Linux: the PipeWire daemon restarts with the session; Windows: the audio endpoint is
+/// invalidated/removed). A dead backend must be observable — [`push`](Self::push) returns `false`
+/// and [`alive`](Self::alive) turns false — so the owning [`MicPump`] drops the instance and
+/// reopens. Before this contract existed, a single backend death left `push` feeding a dead
+/// queue for the rest of the host's life: the historical "mic passthrough works on no host" bug.
 pub trait VirtualMic: Send {
-    /// Push one chunk of interleaved `f32` PCM. Non-blocking — drops if PipeWire is behind
-    /// (mic audio is lossy/real-time; a stale chunk is worse than a dropped one).
-    fn push(&self, pcm: &[f32]);
+    /// Push one chunk of interleaved `f32` PCM. Non-blocking — drops if the backend is behind
+    /// (mic audio is lossy/real-time; a stale chunk is worse than a dropped one). Returns
+    /// `false` iff the backend is DEAD (worker thread gone) — the caller must reopen; a merely
+    /// congested backend drops the chunk and returns `true`.
+    fn push(&self, pcm: &[f32]) -> bool;
+
+    /// Backend liveness without pushing data — lets an idle pump notice a death between
+    /// sessions, so the mic is already healthy again when the next client connects.
+    fn alive(&self) -> bool;
+
+    /// Drop any buffered-but-unplayed audio. Called after an uplink gap (client muted,
+    /// session ended) so a recorder never hears a stale burst when audio resumes.
+    fn discard(&self);

     /// The interleaved channel count the source was opened with.
     fn channels(&self) -> u32 {
@@ -78,7 +96,8 @@ pub fn open_virtual_mic(channels: u32) -> Result<Box<dyn VirtualMic>> {
 #[cfg(target_os = "windows")]
 pub fn open_virtual_mic(channels: u32) -> Result<Box<dyn VirtualMic>> {
-    audio_control::ensure_wired_once();
+    // The render thread runs the wiring plan itself (audio_control::wire_now) to resolve — and,
+    // via the plan's default-device changes, to RESERVE — its target endpoint.
     wasapi_mic::WasapiVirtualMic::open(channels).map(|m| Box::new(m) as Box<dyn VirtualMic>)
 }
@@ -87,6 +106,189 @@ pub fn open_virtual_mic(_channels: u32) -> Result<Box<dyn VirtualMic>> {
    anyhow::bail!("virtual mic requires Linux + PipeWire or Windows + a virtual audio device")
}
/// Mic is 48 kHz stereo — matches the Opus stereo decoder and the host→client audio layout.
pub const MIC_CHANNELS: u32 = 2;
/// Bound for the shared mic frame queue (drop-newest when full): the host-lifetime queue is
/// shared across all concurrent sessions and must not grow without limit under a near-line-rate
/// flood (security-review 2026-06-28 S6). 64 × 5-20 ms frames ≈ 0.3-1.3 s of slack.
const MIC_QUEUE_CAP: usize = 64;
/// Tuning for [`MicPump`]'s open/reopen/flush behaviour — parameterized so the tests can run the
/// real pump loop in milliseconds instead of seconds.
#[derive(Clone, Copy)]
struct PumpTuning {
    /// First-retry delay after a failed backend open; doubles per failure up to `backoff_cap`
    /// (a persistently-absent PipeWire session / audio endpoint isn't hammered), resets on
    /// success.
    backoff_start: std::time::Duration,
    backoff_cap: std::time::Duration,
    /// Idle liveness-probe interval: with no frames flowing, the pump still notices a dead
    /// backend this often and reopens — so the mic is healthy BEFORE the next session starts.
    heartbeat: std::time::Duration,
    /// An uplink gap longer than this discards the backend's buffered audio before pushing the
    /// next frame (a recorder must never hear a stale burst from before a mute/session end).
    stale_gap: std::time::Duration,
}

const PUMP_TUNING: PumpTuning = PumpTuning {
    backoff_start: std::time::Duration::from_secs(2),
    backoff_cap: std::time::Duration::from_secs(60),
    heartbeat: std::time::Duration::from_secs(1),
    stale_gap: std::time::Duration::from_millis(600),
};
/// Host-lifetime virtual-microphone pump: one thread owns the [`VirtualMic`] backend + an Opus
/// decoder; sessions forward the client's Opus mic frames (0xCB) over a clonable `Send` sender,
/// the thread decodes and feeds the backend.
///
/// The rock-solid properties live HERE, not in the backends:
/// - **Eager**: the backend opens at host start (retrying with backoff), NOT on the first mic
/// frame — so the virtual mic device already exists when host apps/games launch and bind
/// their capture device (most games never re-follow a default-device change mid-run).
/// - **Self-healing**: a dead backend (PipeWire restart, Windows endpoint churn) is detected on
/// every push and on an idle heartbeat, and reopened with backoff. Sessions keep their
/// senders; nothing upstream notices.
/// - **Stale-flush**: buffered audio is discarded after an uplink gap (see [`PumpTuning`]).
///
/// Per-frame Opus DECODE errors stay non-fatal (dropped frame): the mic is shared across every
/// concurrent session, so one paired client's junk frames must not deny everyone's mic
/// (security-review 2026-06-28 S2). The thread exits when every sender is dropped (host
/// shutdown), tearing the backend down.
pub struct MicPump {
    tx: std::sync::mpsc::SyncSender<Vec<u8>>,
}

impl MicPump {
    /// Start the host-lifetime pump (Linux/Windows). On platforms without a virtual-mic backend
    /// the thread just drains and drops frames (sessions still count the datagrams).
    pub fn start() -> MicPump {
        let (tx, rx) = std::sync::mpsc::sync_channel::<Vec<u8>>(MIC_QUEUE_CAP);
        let spawned = std::thread::Builder::new()
            .name("punktfunk-mic-pump".into())
            .spawn(move || {
                #[cfg(any(target_os = "linux", target_os = "windows"))]
                pump_thread(rx, || open_virtual_mic(MIC_CHANNELS), PUMP_TUNING);
                #[cfg(not(any(target_os = "linux", target_os = "windows")))]
                {
                    tracing::warn!("mic passthrough unsupported on this platform — frames dropped");
                    for _ in rx {}
                }
            });
        if let Err(e) = spawned {
            tracing::error!(error = %e, "mic pump thread spawn failed — mic passthrough disabled");
        }
        MicPump { tx }
    }

    /// A sender a session forwards the client's Opus mic frames to (`try_send` — never block a
    /// datagram loop). Cloned per session; dropping a clone does NOT stop the pump (it holds
    /// the original sender for the host life).
    pub fn sender(&self) -> std::sync::mpsc::SyncSender<Vec<u8>> {
        self.tx.clone()
    }
}
/// The pump loop. `opener` is injected so the tests can run the REAL loop against a mock
/// backend; production passes [`open_virtual_mic`].
#[cfg_attr(not(any(target_os = "linux", target_os = "windows")), allow(dead_code))]
fn pump_thread<O>(rx: std::sync::mpsc::Receiver<Vec<u8>>, opener: O, tuning: PumpTuning)
where
    O: Fn() -> Result<Box<dyn VirtualMic>>,
{
    use std::sync::mpsc::RecvTimeoutError;
    use std::time::Instant;

    let mut backoff = tuning.backoff_start;
    let mut open_fails: u64 = 0;
    'reopen: loop {
        // Open phase — eager, from thread start. While closed, keep draining the queue so a
        // reopen never replays a backlog of stale frames (and senders never see a wedged queue).
        let (mic, mut decoder) = loop {
            let opened = opener().and_then(|m| {
                let d = opus::Decoder::new(SAMPLE_RATE, opus::Channels::Stereo)
                    .map_err(|e| anyhow::anyhow!("opus decoder: {e}"))?;
                Ok((m, d))
            });
            match opened {
                Ok(pair) => break pair,
                Err(e) => {
                    // Throttle (1st, 2nd, 4th, 8th … failure): a box without a PipeWire session
                    // or virtual audio device would otherwise log every backoff forever.
                    open_fails += 1;
                    if open_fails.is_power_of_two() {
                        tracing::warn!(error = %format!("{e:#}"), attempts = open_fails,
                            "virtual mic unavailable — retrying with backoff");
                    }
                    let deadline = Instant::now() + backoff;
                    loop {
                        let left = deadline.saturating_duration_since(Instant::now());
                        if left.is_zero() {
                            break;
                        }
                        match rx.recv_timeout(left.min(std::time::Duration::from_millis(250))) {
                            Ok(_) => {} // drop frames while closed
                            Err(RecvTimeoutError::Timeout) => {} // keep waiting
                            Err(RecvTimeoutError::Disconnected) => return, // host shutdown
                        }
                    }
                    backoff = (backoff * 2).min(tuning.backoff_cap);
                }
            }
        };
        backoff = tuning.backoff_start;
        open_fails = 0;
        tracing::info!("virtual mic ready (host-lifetime)");
        // Drop anything queued while (re)opening — it predates the backend.
        while rx.try_recv().is_ok() {}

        let mut decode_fails: u64 = 0;
        let mut pcm = vec![0f32; 5760 * MIC_CHANNELS as usize]; // up to 120 ms scratch
        let mut last_push = Instant::now();
        loop {
            match rx.recv_timeout(tuning.heartbeat) {
                Ok(frame) => {
                    if frame.is_empty() {
                        continue; // DTX silence — the source underruns to silence on its own
                    }
                    if last_push.elapsed() > tuning.stale_gap {
                        mic.discard();
                    }
                    match decoder.decode_float(&frame, &mut pcm, false) {
                        Ok(samples_per_ch) => {
                            let total = (samples_per_ch * MIC_CHANNELS as usize).min(pcm.len());
                            if !mic.push(&pcm[..total]) {
                                tracing::warn!("virtual mic backend died — reopening");
                                continue 'reopen;
                            }
                            last_push = Instant::now();
                            decode_fails = 0;
                        }
                        Err(e) => {
                            // Malformed/garbage frame: drop it, keep the shared mic + decoder
                            // (see the struct docs). Throttled log (1, 2, 4, … fails).
                            decode_fails += 1;
                            if decode_fails.is_power_of_two() {
                                tracing::warn!(error = %e, fails = decode_fails,
                                    "mic opus decode failed — dropping frame");
                            }
                        }
                    }
                }
                Err(RecvTimeoutError::Timeout) => {
                    if !mic.alive() {
                        tracing::warn!("virtual mic backend died while idle — reopening");
                        continue 'reopen;
                    }
                }
                Err(RecvTimeoutError::Disconnected) => {
                    tracing::debug!("mic pump stopped (host shutting down)");
                    return;
                }
            }
        }
    }
}
#[cfg(target_os = "windows")]
#[path = "audio/windows/audio_control.rs"]
mod audio_control;
@@ -98,3 +300,188 @@ mod wasapi_cap;
#[cfg(target_os = "windows")]
#[path = "audio/windows/wasapi_mic.rs"]
mod wasapi_mic;
#[cfg_attr(not(target_os = "windows"), allow(dead_code))]
#[path = "audio/wiring_plan.rs"]
pub(crate) mod wiring_plan;
#[cfg(test)]
mod pump_tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Mock backend: records pushes/discards, dies on command.
    struct MockMic {
        alive: Arc<AtomicBool>,
        pushed: Arc<AtomicUsize>,
        discards: Arc<AtomicUsize>,
    }

    impl VirtualMic for MockMic {
        fn push(&self, pcm: &[f32]) -> bool {
            if !self.alive.load(Ordering::Acquire) {
                return false;
            }
            self.pushed.fetch_add(pcm.len(), Ordering::Relaxed);
            true
        }
        fn alive(&self) -> bool {
            self.alive.load(Ordering::Acquire)
        }
        fn discard(&self) {
            self.discards.fetch_add(1, Ordering::Relaxed);
        }
    }

    struct Harness {
        tx: std::sync::mpsc::SyncSender<Vec<u8>>,
        opens: Arc<AtomicUsize>,
        alive: Arc<Mutex<Option<Arc<AtomicBool>>>>, // latest instance's kill switch
        pushed: Arc<AtomicUsize>,
        discards: Arc<AtomicUsize>,
        join: std::thread::JoinHandle<()>,
    }

    /// Run the REAL pump loop against mock backends; `fail_first` opens fail before the first
    /// success (exercises the eager retry/backoff path).
    fn start(fail_first: usize) -> Harness {
        let (tx, rx) = std::sync::mpsc::sync_channel::<Vec<u8>>(MIC_QUEUE_CAP);
        let opens = Arc::new(AtomicUsize::new(0));
        let alive = Arc::new(Mutex::new(None::<Arc<AtomicBool>>));
        let pushed = Arc::new(AtomicUsize::new(0));
        let discards = Arc::new(AtomicUsize::new(0));
        let (opens2, alive2, pushed2, discards2) = (
            opens.clone(),
            alive.clone(),
            pushed.clone(),
            discards.clone(),
        );
        let tuning = PumpTuning {
            backoff_start: Duration::from_millis(10),
            backoff_cap: Duration::from_millis(40),
            heartbeat: Duration::from_millis(20),
            stale_gap: Duration::from_millis(80),
        };
        let join = std::thread::spawn(move || {
            pump_thread(
                rx,
                move || {
                    let n = opens2.fetch_add(1, Ordering::SeqCst);
                    if n < fail_first {
                        anyhow::bail!("backend not up yet (simulated)");
                    }
                    let a = Arc::new(AtomicBool::new(true));
                    *alive2.lock().unwrap() = Some(a.clone());
                    Ok(Box::new(MockMic {
                        alive: a,
                        pushed: pushed2.clone(),
                        discards: discards2.clone(),
                    }) as Box<dyn VirtualMic>)
                },
                tuning,
            )
        });
        Harness {
            tx,
            opens,
            alive,
            pushed,
            discards,
            join,
        }
    }

    fn wait_until(what: &str, mut cond: impl FnMut() -> bool) {
        for _ in 0..200 {
            if cond() {
                return;
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        panic!("timed out waiting for: {what}");
    }

    fn opus_frame() -> Vec<u8> {
        let mut enc = opus::Encoder::new(48_000, opus::Channels::Stereo, opus::Application::Voip)
            .expect("opus encoder");
        let pcm = [0.1f32; 960 * 2]; // 20 ms stereo
        let mut out = vec![0u8; 4000];
        let n = enc.encode_float(&pcm, &mut out).expect("encode");
        out.truncate(n);
        out
    }

    /// Eager: the backend opens (after transient failures) with NO frame ever sent.
    #[test]
    fn opens_eagerly_with_backoff() {
        let h = start(3);
        wait_until("eager open after 3 failures", || {
            h.opens.load(Ordering::SeqCst) >= 4 && h.alive.lock().unwrap().is_some()
        });
        drop(h.tx);
        h.join.join().unwrap();
    }

    /// Frames flow: opus in → PCM pushed to the backend.
    #[test]
    fn decodes_and_pushes() {
        let h = start(0);
        wait_until("open", || h.alive.lock().unwrap().is_some());
        h.tx.send(opus_frame()).unwrap();
        wait_until("pcm pushed", || h.pushed.load(Ordering::SeqCst) > 0);
        drop(h.tx);
        h.join.join().unwrap();
    }

    /// A dead backend is noticed WHILE IDLE (heartbeat) and reopened without any traffic.
    #[test]
    fn reopens_after_idle_death() {
        let h = start(0);
        wait_until("first open", || h.opens.load(Ordering::SeqCst) >= 1);
        wait_until("instance", || h.alive.lock().unwrap().is_some());
        h.alive
            .lock()
            .unwrap()
            .as_ref()
            .unwrap()
            .store(false, Ordering::Release); // kill it
        wait_until("reopen after idle death", || {
            h.opens.load(Ordering::SeqCst) >= 2
        });
        drop(h.tx);
        h.join.join().unwrap();
    }

    /// A death detected on push (frame flowing) also reopens, and the frame after reopen flows.
    #[test]
    fn reopens_after_push_death() {
        let h = start(0);
        wait_until("instance", || h.alive.lock().unwrap().is_some());
        h.alive
            .lock()
            .unwrap()
            .as_ref()
            .unwrap()
            .store(false, Ordering::Release);
        h.tx.send(opus_frame()).unwrap(); // push sees death → reopen
        wait_until("reopen", || h.opens.load(Ordering::SeqCst) >= 2);
        h.tx.send(opus_frame()).unwrap();
        wait_until("pcm after reopen", || h.pushed.load(Ordering::SeqCst) > 0);
        drop(h.tx);
        h.join.join().unwrap();
    }

    /// An uplink gap discards buffered-stale audio before the next frame plays.
    #[test]
    fn discards_after_gap() {
        let h = start(0);
        wait_until("instance", || h.alive.lock().unwrap().is_some());
        h.tx.send(opus_frame()).unwrap();
        wait_until("first push", || h.pushed.load(Ordering::SeqCst) > 0);
        std::thread::sleep(Duration::from_millis(150)); // > stale_gap
        h.tx.send(opus_frame()).unwrap();
        wait_until("discard on gap", || h.discards.load(Ordering::SeqCst) >= 1);
        drop(h.tx);
        h.join.join().unwrap();
    }
}