feat: mic passthrough — client microphone → host virtual PipeWire source
ci / rust (push) Has been cancelled

The inverse of the host→client audio path: the client's mic, Opus-encoded, rides a
new 0xCB QUIC datagram to the host, which decodes it into a virtual PipeWire
Audio/Source its apps can record from (voice chat, etc.).

Protocol (punktfunk-core):
- MIC_MAGIC 0xCB + encode/decode_mic_datagram (mirror of the 0xC9 audio datagram).
- NativeClient::send_mic(seq, pts_ns, opus) over a new outbound channel + worker task
  (mirror of send_input); C ABI punktfunk_connection_send_mic for native clients.

Host:
- audio::VirtualMic + PwMicSource: a PipeWire output stream tagged media.class=
  Audio/Source (Direction::Output) — a recordable microphone node, fed decoded PCM.
- MicService: host-lifetime owner of the source + Opus decoder (mirror of
  InjectorService / the audio capturer slot); lazily opened, persists across sessions,
  self-heals. The per-session datagram reader now demuxes 0xCB→mic / 0xC8→input over a
  single read_datagram loop (two loops would race).
- Adaptive jitter buffer in the producer: primes to ~3 consumer quanta before emitting,
  so the 5 ms push / N ms pull clock skew never underruns — without it ~58% of output
  was silence; with it, glitch-free across consumer quanta.

Client: punktfunk-client-rs --mic-test streams a synthetic 440 Hz Opus tone as the mic
uplink (opus dep added) for end-to-end validation without a real microphone.

Validated live on headless KWin: client tone → host source → pw-record shows the
punktfunk-mic Audio/Source node, 440 Hz dominant (Goertzel power 20.7 vs <0.001
elsewhere), RMS 0.179 ≈ the ideal 0.177, 0.3–0.4% silence at both 256 ms and 10 ms
consumer quanta. Tests +1 (mic datagram roundtrip); workspace green, clippy/fmt clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-10 22:15:07 +00:00
parent f3ff5f648a
commit 0755c823a5
10 changed files with 545 additions and 10 deletions
+27
View File
@@ -42,5 +42,32 @@ pub fn open_audio_capture(_channels: u32) -> Result<Box<dyn AudioCapturer>> {
anyhow::bail!("audio capture requires Linux + PipeWire")
}
/// The inverse of [`AudioCapturer`]: a virtual microphone the host *produces*. It registers a
/// PipeWire `Audio/Source` node that host apps can record from; the host [`push`](Self::push)es
/// decoded client-mic PCM (interleaved `f32` at [`SAMPLE_RATE`]) into it, and PipeWire delivers
/// it to whichever app records the source — silence when no input is flowing. This is how the
/// client's microphone reaches host applications (mic passthrough).
pub trait VirtualMic: Send {
/// Push one chunk of interleaved `f32` PCM. Non-blocking — drops if PipeWire is behind
/// (mic audio is lossy/real-time; a stale chunk is worse than a dropped one).
fn push(&self, pcm: &[f32]);
/// The interleaved channel count the source was opened with.
fn channels(&self) -> u32 {
CHANNELS as u32
}
}
/// Open a virtual microphone PipeWire source with `channels` interleaved channels (1 or 2).
#[cfg(target_os = "linux")]
pub fn open_virtual_mic(channels: u32) -> Result<Box<dyn VirtualMic>> {
linux::PwMicSource::open(channels).map(|m| Box::new(m) as Box<dyn VirtualMic>)
}
#[cfg(not(target_os = "linux"))]
pub fn open_virtual_mic(_channels: u32) -> Result<Box<dyn VirtualMic>> {
anyhow::bail!("virtual mic requires Linux + PipeWire")
}
#[cfg(target_os = "linux")]
mod linux;
+228 -1
View File
@@ -13,8 +13,9 @@
//! stream down promptly — required so a surround session can replace a stereo capturer
//! without leaking a PipeWire consumer (see CLAUDE.md: a wedged link head-blocks the daemon).
use super::{AudioCapturer, SAMPLE_RATE};
use super::{AudioCapturer, VirtualMic, SAMPLE_RATE};
use anyhow::{anyhow, Context, Result};
use std::collections::VecDeque;
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;
@@ -105,6 +106,232 @@ fn spa_positions(channels: u32) -> [u32; 64] {
pos
}
/// Virtual microphone: a PipeWire `Audio/Source` node host apps can record from. The host pushes
/// decoded client-mic PCM in; the loop thread's producer callback drains it (silence on
/// underrun) into PipeWire buffers. Mirrors [`PwAudioCapturer`] but inverted (Direction::Output).
pub struct PwMicSource {
pcm: std::sync::mpsc::SyncSender<Vec<f32>>,
channels: u32,
quit: pipewire::channel::Sender<Terminate>,
}
impl PwMicSource {
pub fn open(channels: u32) -> Result<PwMicSource> {
anyhow::ensure!(
matches!(channels, 1 | 2),
"virtual mic supports 1 or 2 channels, got {channels}"
);
let (pcm_tx, pcm_rx) = sync_channel::<Vec<f32>>(64);
let (quit_tx, quit_rx) = pipewire::channel::channel::<Terminate>();
thread::Builder::new()
.name("punktfunk-pw-mic".into())
.spawn(move || {
if let Err(e) = mic_pw_thread(pcm_rx, quit_rx, channels) {
tracing::error!(error = %format!("{e:#}"), "pipewire virtual-mic thread failed");
}
})
.context("spawn pipewire virtual-mic thread")?;
Ok(PwMicSource {
pcm: pcm_tx,
channels,
quit: quit_tx,
})
}
}
impl Drop for PwMicSource {
fn drop(&mut self) {
let _ = self.quit.send(Terminate);
}
}
impl VirtualMic for PwMicSource {
fn push(&self, pcm: &[f32]) {
let _ = self.pcm.try_send(pcm.to_vec()); // drop if the PipeWire side is behind
}
fn channels(&self) -> u32 {
self.channels
}
}
/// Producer-side state for the virtual-mic loop: incoming decoded PCM and a small ring buffer
/// the process callback drains into PipeWire buffers (capped, so latency stays bounded).
/// `primed` is a jitter buffer gate — see the process callback.
struct MicUserData {
rx: Receiver<Vec<f32>>,
ring: VecDeque<f32>,
channels: usize,
primed: bool,
}
fn mic_pw_thread(
pcm_rx: Receiver<Vec<f32>>,
quit_rx: pipewire::channel::Receiver<Terminate>,
channels: u32,
) -> Result<()> {
use pipewire as pw;
use pw::{properties::properties, spa};
use spa::param::audio::{AudioFormat, AudioInfoRaw};
use spa::pod::Pod;
crate::pwinit::ensure_init();
let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw mic MainLoop")?;
let context = pw::context::ContextRc::new(&mainloop, None).context("pw mic Context")?;
let core = context
.connect_rc(None)
.context("pw mic connect (is PipeWire running in this session?)")?;
let _quit_guard = quit_rx.attach(mainloop.loop_(), {
let mainloop = mainloop.clone();
move |_| mainloop.quit()
});
// media.class=Audio/Source advertises us as a microphone (a recordable source), NOT a
// playback stream — without it, Direction::Output + Playback would route to the speakers.
let stream = pw::stream::StreamBox::new(
&core,
"punktfunk-mic",
properties! {
*pw::keys::MEDIA_TYPE => "Audio",
*pw::keys::MEDIA_CLASS => "Audio/Source",
*pw::keys::NODE_NAME => "punktfunk-mic",
*pw::keys::NODE_DESCRIPTION => "Punktfunk Remote Microphone",
// ~5 ms quantum (one Opus frame) so recording apps get smooth low-latency chunks.
*pw::keys::NODE_LATENCY => "240/48000",
},
)
.context("pw mic Stream")?;
let ud = MicUserData {
rx: pcm_rx,
ring: VecDeque::new(),
channels: channels as usize,
primed: false,
};
let _listener = stream
.add_local_listener_with_user_data(ud)
.state_changed(|_s, _ud, old, new| {
tracing::info!(?old, ?new, "pipewire virtual-mic stream state");
})
.param_changed(|_s, _ud, id, param| {
let Some(param) = param else { return };
if id != pw::spa::param::ParamType::Format.as_raw() {
return;
}
let mut info = AudioInfoRaw::default();
if info.parse(param).is_ok() {
tracing::info!(
format = ?info.format(),
rate = info.rate(),
channels = info.channels(),
"virtual-mic format negotiated"
);
}
})
.process(|stream, ud| {
let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let Some(mut buffer) = stream.dequeue_buffer() else {
return;
};
// Pull all newly-decoded PCM into the ring.
while let Ok(frame) = ud.rx.try_recv() {
ud.ring.extend(frame);
}
let stride = 4 * ud.channels; // F32LE interleaved
let datas = buffer.datas_mut();
if datas.is_empty() {
return;
}
let data = &mut datas[0];
let want_frames = data.data().map(|s| s.len() / stride).unwrap_or(0);
let want = want_frames * ud.channels; // interleaved samples this quantum needs
static FIRST: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(true);
if FIRST.swap(false, std::sync::atomic::Ordering::Relaxed) {
tracing::info!(
quantum_frames = want_frames,
quantum_ms = want_frames as f32 / 48.0,
"virtual-mic consumer connected"
);
}
// Adaptive jitter buffer. The client pushes 5 ms frames; the recorder pulls a
// whole *quantum* (often 2043 ms) from an independent clock. A drain of one
// quantum must not outrun what's buffered, or every call underruns to silence
// (the original ~58% gaps). So prime to ~3 quanta before producing, hold there,
// and re-prime only after a genuine full drain (the client went quiet). The ring
// is capped at a few quanta so latency stays bounded.
let target = (3 * want).clamp(720 * ud.channels, 9600 * ud.channels);
while ud.ring.len() > target.max(want) + want {
ud.ring.pop_front(); // bound latency: drop the oldest beyond ~1 quantum slack
}
if !ud.primed && ud.ring.len() >= target {
ud.primed = true;
}
let n_frames = if let Some(slice) = data.data() {
for k in 0..want {
let s = if ud.primed {
ud.ring.pop_front().unwrap_or(0.0) // silence on a momentary underrun
} else {
0.0 // not yet primed — emit silence while the buffer fills
};
let off = k * 4;
slice[off..off + 4].copy_from_slice(&s.to_le_bytes());
}
want_frames
} else {
0
};
if ud.ring.is_empty() {
ud.primed = false; // fully drained — re-prime before producing again
}
let chunk = data.chunk_mut();
*chunk.offset_mut() = 0;
*chunk.stride_mut() = stride as _;
*chunk.size_mut() = (stride * n_frames) as _;
}));
if outcome.is_err() {
tracing::error!("panic in pipewire virtual-mic callback");
}
})
.register()
.context("register virtual-mic stream listener")?;
let mut info = AudioInfoRaw::new();
info.set_format(AudioFormat::F32LE);
info.set_rate(SAMPLE_RATE);
info.set_channels(channels);
info.set_position(spa_positions(channels));
let obj = pw::spa::pod::Object {
type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(),
id: pw::spa::param::ParamType::EnumFormat.as_raw(),
properties: info.into(),
};
let values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
std::io::Cursor::new(Vec::new()),
&pw::spa::pod::Value::Object(obj),
)
.context("serialize mic format pod")?
.0
.into_inner();
let mut params = [Pod::from_bytes(&values).context("mic pod from bytes")?];
stream
.connect(
spa::utils::Direction::Output, // we PRODUCE samples (a source)
None,
pw::stream::StreamFlags::AUTOCONNECT | pw::stream::StreamFlags::MAP_BUFFERS,
&mut params,
)
.context("pw mic stream connect")?;
mainloop.run();
tracing::debug!("pipewire virtual-mic loop exited (source dropped)");
Ok(())
}
fn pw_thread(
tx: std::sync::mpsc::SyncSender<Vec<f32>>,
quit_rx: pipewire::channel::Receiver<Terminate>,
+107 -4
View File
@@ -185,6 +185,9 @@ async fn serve(opts: M3Options) -> Result<()> {
// session — which, under rapid client reconnects, raced a prior session's portal teardown and
// wedged KWin's EIS setup ("EIS setup timed out"). Gamepads stay per-session (uinput).
let injector = InjectorService::start();
// One virtual microphone for the whole host lifetime (see MicService): the client's mic uplink
// (0xCB) is Opus-decoded and fed into a persistent PipeWire Audio/Source host apps record from.
let mic_service = MicService::start();
let paired_at = match &opts.paired_store {
Some(p) => p.clone(),
None => paired_path()?,
@@ -233,6 +236,7 @@ async fn serve(opts: M3Options) -> Result<()> {
&opts,
&audio_cap,
injector.sender(),
mic_service.sender(),
&fingerprint,
&paired,
&last_pairing,
@@ -350,6 +354,7 @@ async fn serve_session(
opts: &M3Options,
audio_cap: &AudioCapSlot,
inj_tx: std::sync::mpsc::Sender<InputEvent>,
mic_tx: std::sync::mpsc::Sender<Vec<u8>>,
host_fp: &[u8; 32],
paired: &PairedStore,
last_pairing: &std::sync::Mutex<Option<std::time::Instant>>,
@@ -521,18 +526,30 @@ async fn serve_session(
.spawn(move || input_thread(input_rx, conn, inj_tx))
.context("spawn input thread")?
};
// One reader for ALL client→host datagrams, demuxed by magic byte (two read_datagram loops
// would race for datagrams): 0xCB → mic uplink (Opus, forwarded to the host-lifetime mic
// service), 0xC8 → input (forwarded to the per-session input thread). The magics are disjoint,
// so decode order doesn't matter. Unknown tags are ignored.
let input_conn = conn.clone();
tokio::spawn(async move {
let mut count = 0u64;
let (mut input_count, mut mic_count) = (0u64, 0u64);
while let Ok(d) = input_conn.read_datagram().await {
if let Some(ev) = InputEvent::decode(&d) {
count += 1;
if let Some((_seq, _pts, opus)) = punktfunk_core::quic::decode_mic_datagram(&d) {
mic_count += 1;
// Host-lifetime mic service; a send error just means the host is shutting down.
let _ = mic_tx.send(opus.to_vec());
} else if let Some(ev) = InputEvent::decode(&d) {
input_count += 1;
if input_tx.send(ev).is_err() {
break;
}
}
}
tracing::info!(count, "input datagram stream ended");
tracing::info!(
input = input_count,
mic = mic_count,
"client datagram stream ended"
);
});
// Stop signal: stream duration elapsed or the client went away.
@@ -758,6 +775,92 @@ fn injector_service_thread(rx: std::sync::mpsc::Receiver<InputEvent>) {
tracing::debug!("injector service stopped (host shutting down)");
}
/// Mic is 48 kHz stereo — matches the Opus stereo decoder and the host→client audio layout.
const MIC_CHANNELS: u32 = 2;
/// Host-lifetime virtual microphone, shared across punktfunk/1 sessions (mirror of
/// [`InjectorService`]). One thread owns the PipeWire `Audio/Source` + an Opus decoder; sessions
/// forward the client's Opus mic frames over a clonable `Send` channel, the thread decodes and
/// feeds the source. Opened lazily on the first frame, the source node persists across sessions
/// (no per-session registration churn), and reopens after a backoff if the source/decoder fails.
struct MicService {
tx: std::sync::mpsc::Sender<Vec<u8>>,
}
impl MicService {
fn start() -> MicService {
let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
if let Err(e) = std::thread::Builder::new()
.name("punktfunk-m3-mic".into())
.spawn(move || mic_service_thread(rx))
{
tracing::error!(error = %e, "mic service thread spawn failed — mic passthrough disabled");
}
MicService { tx }
}
/// A sender a session forwards the client's Opus mic frames to. Cloned per session; dropping a
/// clone does NOT stop the service (it holds the original sender for the host life).
fn sender(&self) -> std::sync::mpsc::Sender<Vec<u8>> {
self.tx.clone()
}
}
/// The host-lifetime mic worker: lazily open the virtual mic + decoder, then Opus-decode each
/// forwarded frame and push the PCM into the source. Reopen (after [`INJECTOR_REOPEN_BACKOFF`])
/// on open failure or a decode error. Exits when every session sender and the service's own
/// sender drop (host shutdown), tearing the PipeWire source down.
fn mic_service_thread(rx: std::sync::mpsc::Receiver<Vec<u8>>) {
let mut mic: Option<Box<dyn crate::audio::VirtualMic>> = None;
let mut decoder: Option<opus::Decoder> = None;
let mut last_failed: Option<std::time::Instant> = None;
let mut pcm = vec![0f32; 5760 * MIC_CHANNELS as usize]; // up to 120 ms scratch
for opus_frame in rx {
if opus_frame.is_empty() {
continue; // DTX silence — the source underruns to silence on its own
}
if mic.is_none() || decoder.is_none() {
if last_failed.is_some_and(|t| t.elapsed() < INJECTOR_REOPEN_BACKOFF) {
continue; // still within the reopen backoff window
}
let opened = crate::audio::open_virtual_mic(MIC_CHANNELS).and_then(|m| {
let d = opus::Decoder::new(48_000, opus::Channels::Stereo)
.map_err(|e| anyhow!("opus decoder: {e}"))?;
Ok((m, d))
});
match opened {
Ok((m, d)) => {
tracing::info!("punktfunk/1 virtual mic ready (host-lifetime)");
mic = Some(m);
decoder = Some(d);
last_failed = None;
}
Err(e) => {
tracing::error!(error = %format!("{e:#}"), "virtual mic unavailable — will retry");
last_failed = Some(std::time::Instant::now());
continue;
}
}
}
let (Some(m), Some(dec)) = (mic.as_ref(), decoder.as_mut()) else {
continue;
};
match dec.decode_float(&opus_frame, &mut pcm, false) {
Ok(samples_per_ch) => {
let total = (samples_per_ch * MIC_CHANNELS as usize).min(pcm.len());
m.push(&pcm[..total]);
}
Err(e) => {
tracing::warn!(error = %e, "mic opus decode failed — reopening");
mic = None;
decoder = None;
last_failed = Some(std::time::Instant::now());
}
}
}
tracing::debug!("mic service stopped (host shutting down)");
}
/// The per-session input thread: route pointer/keyboard events to the host-lifetime injector
/// service (`inj_tx`) and gamepad events to this session's own [`GamepadManager`]
/// (crate::inject::gamepad), with force feedback pumped between events and sent back as rumble