From 0755c823a5331c2f9abcb3f3168d225f963ea8ab Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Wed, 10 Jun 2026 22:15:07 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20mic=20passthrough=20=E2=80=94=20client?= =?UTF-8?q?=20microphone=20=E2=86=92=20host=20virtual=20PipeWire=20source?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The inverse of the host→client audio path: the client's mic, Opus-encoded, rides a new 0xCB QUIC datagram to the host, which decodes it into a virtual PipeWire Audio/Source its apps can record from (voice chat, etc.). Protocol (punktfunk-core): - MIC_MAGIC 0xCB + encode/decode_mic_datagram (mirror of the 0xC9 audio datagram). - NativeClient::send_mic(seq, pts_ns, opus) over a new outbound channel + worker task (mirror of send_input); C ABI punktfunk_connection_send_mic for native clients. Host: - audio::VirtualMic + PwMicSource: a PipeWire output stream tagged media.class= Audio/Source (Direction::Output) — a recordable microphone node, fed decoded PCM. - MicService: host-lifetime owner of the source + Opus decoder (mirror of InjectorService / the audio capturer slot); lazily opened, persists across sessions, self-heals. The per-session datagram reader now demuxes 0xCB→mic / 0xC8→input over a single read_datagram loop (two loops would race). - Adaptive jitter buffer in the producer: primes to ~3 consumer quanta before emitting, so the 5 ms push / N ms pull clock skew never underruns — without it ~58% of output was silence; with it, glitch-free across consumer quanta. Client: punktfunk-client-rs --mic-test streams a synthetic 440 Hz Opus tone as the mic uplink (opus dep added) for end-to-end validation without a real microphone. Validated live on headless KWin: client tone → host source → pw-record shows the punktfunk-mic Audio/Source node, 440 Hz dominant (Goertzel power 20.7 vs <0.001 elsewhere), RMS 0.179 ≈ the ideal 0.177, 0.3–0.4% silence at both 256 ms and 10 ms consumer quanta. Tests +1 (mic datagram roundtrip); workspace green, clippy/fmt clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 1 + crates/punktfunk-client-rs/Cargo.toml | 1 + crates/punktfunk-client-rs/src/main.rs | 50 ++++- crates/punktfunk-core/src/abi.rs | 36 ++++ crates/punktfunk-core/src/client.rs | 26 +++ crates/punktfunk-core/src/quic.rs | 48 ++++- crates/punktfunk-host/src/audio.rs | 27 +++ crates/punktfunk-host/src/audio/linux.rs | 229 ++++++++++++++++++++++- crates/punktfunk-host/src/m3.rs | 111 ++++++++++- include/punktfunk_core.h | 26 ++- 10 files changed, 545 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 652556d..783f07f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1953,6 +1953,7 @@ name = "punktfunk-client-rs" version = "0.0.1" dependencies = [ "anyhow", + "opus", "punktfunk-core", "quinn", "tokio", diff --git a/crates/punktfunk-client-rs/Cargo.toml b/crates/punktfunk-client-rs/Cargo.toml index a7f8053..3d7c413 100644 --- a/crates/punktfunk-client-rs/Cargo.toml +++ b/crates/punktfunk-client-rs/Cargo.toml @@ -13,5 +13,6 @@ punktfunk-core = { path = "../punktfunk-core", features = ["quic"] } quinn = "0.11" tokio = { version = "1", features = ["rt-multi-thread", "net", "time", "macros"] } anyhow = "1" +opus = "0.3" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/crates/punktfunk-client-rs/src/main.rs b/crates/punktfunk-client-rs/src/main.rs index f351264..cbc2822 100644 --- a/crates/punktfunk-client-rs/src/main.rs +++ b/crates/punktfunk-client-rs/src/main.rs @@ -7,7 +7,9 @@ //! stamps each frame with its capture wall clock; same-host runs share that clock). //! //! `--input-test` exercises the input plane: scripted mouse/keyboard datagrams during the -//! stream (watch them land in the host session, e.g. xev inside gamescope). +//! stream (watch them land in the host session, e.g. xev inside gamescope). `--mic-test` +//! exercises the mic uplink: a synthetic 440 Hz tone streamed as Opus (0xCB) → the host's +//! virtual microphone source (record it host-side to hear the tone). //! //! `--pin <64-hex>` pins the host's certificate fingerprint (the host logs it at startup); //! without it the client trusts on first use and prints the observed fingerprint to pin. @@ -37,6 +39,8 @@ struct Args { mode: Mode, out: Option, input_test: bool, + /// `--mic-test` — stream a synthetic 440 Hz tone as the mic uplink (proves the mic path). + mic_test: bool, pin: Option<[u8; 32]>, /// `--remode WxHxFPS:SECS` — request this mode SECS seconds into the stream. remode: Option<(Mode, u32)>, @@ -137,6 +141,7 @@ fn parse_args() -> Args { mode, out: get("--out").map(String::from), input_test: argv.iter().any(|a| a == "--input-test"), + mic_test: argv.iter().any(|a| a == "--mic-test"), pin, remode, pair: get("--pair").map(String::from), @@ -348,6 +353,49 @@ async fn session(args: Args) -> Result<()> { }); } + // Mic plane: stream a synthetic 440 Hz tone as the mic uplink (0xCB), Opus-encoded 5 ms + // stereo frames — proves client→host mic passthrough end to end without a real microphone + // (the host decodes it into its virtual PipeWire source; record that source to hear the tone). + if args.mic_test { + let conn2 = conn.clone(); + tokio::spawn(async move { + let mut enc = + match opus::Encoder::new(48_000, opus::Channels::Stereo, opus::Application::Voip) { + Ok(e) => e, + Err(e) => { + tracing::error!(error = %e, "mic-test: opus encoder init failed"); + return; + } + }; + let _ = enc.set_bitrate(opus::Bitrate::Bits(64_000)); + tracing::info!("mic-test: streaming a 440 Hz tone as the mic uplink"); + let mut phase = 0.0f32; + let step = 2.0 * std::f32::consts::PI * 440.0 / 48_000.0; + let mut pcm = [0f32; 240 * 2]; // 5 ms stereo + let mut out = [0u8; 4000]; + let mut interval = tokio::time::interval(std::time::Duration::from_millis(5)); + for seq in 0u32.. { + interval.tick().await; + for f in 0..240 { + let s = (phase.sin()) * 0.25; + phase += step; + if phase > std::f32::consts::PI * 2.0 { + phase -= std::f32::consts::PI * 2.0; + } + pcm[f * 2] = s; + pcm[f * 2 + 1] = s; + } + if let Ok(n) = enc.encode_float(&pcm, &mut out) { + let d = punktfunk_core::quic::encode_mic_datagram(seq, now_ns(), &out[..n]); + if conn2.send_datagram(d.into()).is_err() { + break; + } + } + } + tracing::info!("mic-test: done"); + }); + } + // Closed-flag for the blocking receive loop. let closed = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); { diff --git a/crates/punktfunk-core/src/abi.rs b/crates/punktfunk-core/src/abi.rs index 1190ed4..bcf7d05 100644 --- a/crates/punktfunk-core/src/abi.rs +++ b/crates/punktfunk-core/src/abi.rs @@ -885,6 +885,42 @@ pub unsafe extern "C" fn punktfunk_connection_send_input( }) } +/// Send one Opus mic frame to the host as a QUIC datagram (48 kHz; the host decodes it into a +/// virtual microphone source its apps can record). Non-blocking enqueue; the host uses `seq`/ +/// `pts_ns` (the caller's own counters) only for diagnostics. `opus_data`/`len` may be empty +/// (a DTX silence frame). The data is copied; the caller may reuse the buffer after this returns. +/// +/// # Safety +/// `c` is a valid connection handle; `opus_data` is valid for `len` bytes (or `len == 0`). +#[cfg(feature = "quic")] +#[no_mangle] +pub unsafe extern "C" fn punktfunk_connection_send_mic( + c: *mut PunktfunkConnection, + opus_data: *const u8, + len: usize, + seq: u32, + pts_ns: u64, +) -> PunktfunkStatus { + guard(|| { + let c = match unsafe { c.as_ref() } { + Some(c) => c, + None => return PunktfunkStatus::NullPointer, + }; + if opus_data.is_null() && len != 0 { + return PunktfunkStatus::NullPointer; + } + let opus = if len == 0 { + Vec::new() + } else { + unsafe { std::slice::from_raw_parts(opus_data, len) }.to_vec() + }; + match c.inner.send_mic(seq, pts_ns, opus) { + Ok(()) => PunktfunkStatus::Ok, + Err(e) => e.status(), + } + }) +} + /// The currently active session mode — the Welcome's, until an accepted /// [`punktfunk_connection_request_mode`] switches it. Safe any time after connect. /// diff --git a/crates/punktfunk-core/src/client.rs b/crates/punktfunk-core/src/client.rs index bf12ce6..640034e 100644 --- a/crates/punktfunk-core/src/client.rs +++ b/crates/punktfunk-core/src/client.rs @@ -50,6 +50,8 @@ pub struct NativeClient { audio: Receiver, rumble: Receiver<(u16, u16, u16)>, input_tx: tokio::sync::mpsc::UnboundedSender, + /// Outbound mic frames `(seq, pts_ns, opus)` → encoded as 0xCB datagrams by the worker. + mic_tx: tokio::sync::mpsc::UnboundedSender<(u32, u64, Vec)>, reconfig_tx: tokio::sync::mpsc::UnboundedSender, shutdown: Arc, worker: Option>, @@ -85,6 +87,7 @@ impl NativeClient { let (audio_tx, audio_rx) = std::sync::mpsc::sync_channel::(AUDIO_QUEUE); let (rumble_tx, rumble_rx) = std::sync::mpsc::sync_channel::<(u16, u16, u16)>(RUMBLE_QUEUE); let (input_tx, input_rx) = tokio::sync::mpsc::unbounded_channel::(); + let (mic_tx, mic_rx) = tokio::sync::mpsc::unbounded_channel::<(u32, u64, Vec)>(); let (reconfig_tx, reconfig_rx) = tokio::sync::mpsc::unbounded_channel::(); let (ready_tx, ready_rx) = std::sync::mpsc::channel::>(); let shutdown = Arc::new(AtomicBool::new(false)); @@ -118,6 +121,7 @@ impl NativeClient { audio_tx, rumble_tx, input_rx, + mic_rx, reconfig_rx, ready_tx, shutdown: shutdown_w, @@ -140,6 +144,7 @@ impl NativeClient { audio: audio_rx, rumble: rumble_rx, input_tx, + mic_tx, reconfig_tx, shutdown, worker: Some(worker), @@ -296,6 +301,16 @@ impl NativeClient { pub fn send_input(&self, ev: &InputEvent) -> Result<()> { self.input_tx.send(*ev).map_err(|_| PunktfunkError::Closed) } + + /// Queue one Opus mic frame for delivery as a 0xCB uplink datagram (the inverse of + /// [`next_audio`](Self::next_audio)). `seq`/`pts_ns` are the caller's own counters (the host + /// uses them only for diagnostics). The host decodes it into a virtual microphone source. + /// Best-effort — like every datagram, it's dropped under loss; no retransmit. + pub fn send_mic(&self, seq: u32, pts_ns: u64, opus: Vec) -> Result<()> { + self.mic_tx + .send((seq, pts_ns, opus)) + .map_err(|_| PunktfunkError::Closed) + } } impl Drop for NativeClient { @@ -318,6 +333,7 @@ struct WorkerArgs { audio_tx: SyncSender, rumble_tx: SyncSender<(u16, u16, u16)>, input_rx: tokio::sync::mpsc::UnboundedReceiver, + mic_rx: tokio::sync::mpsc::UnboundedReceiver<(u32, u64, Vec)>, reconfig_rx: tokio::sync::mpsc::UnboundedReceiver, ready_tx: std::sync::mpsc::Sender>, shutdown: Arc, @@ -338,6 +354,7 @@ async fn worker_main(args: WorkerArgs) { audio_tx, rumble_tx, mut input_rx, + mut mic_rx, mut reconfig_rx, ready_tx, shutdown, @@ -429,6 +446,15 @@ async fn worker_main(args: WorkerArgs) { } }); + // Mic task: embedder Opus mic frames → 0xCB uplink datagrams (best-effort, dropped on loss). + let mic_conn = conn.clone(); + tokio::spawn(async move { + while let Some((seq, pts_ns, opus)) = mic_rx.recv().await { + let d = crate::quic::encode_mic_datagram(seq, pts_ns, &opus); + let _ = mic_conn.send_datagram(d.into()); + } + }); + // Control task: the handshake stream stays open for mid-stream renegotiation. One // request at a time — write Reconfigure, await Reconfigured, publish the active mode. { diff --git a/crates/punktfunk-core/src/quic.rs b/crates/punktfunk-core/src/quic.rs index df90644..7370271 100644 --- a/crates/punktfunk-core/src/quic.rs +++ b/crates/punktfunk-core/src/quic.rs @@ -554,10 +554,14 @@ pub fn frame(payload: &[u8]) -> Vec { } /// Datagram wire tags. Video rides UDP; everything low-rate rides QUIC datagrams, -/// demultiplexed by the first byte: input = [`crate::input::INPUT_MAGIC`] (0xC8), -/// audio = [`AUDIO_MAGIC`], rumble = [`RUMBLE_MAGIC`]. +/// demultiplexed by the first byte: input = [`crate::input::INPUT_MAGIC`] (0xC8, client→host), +/// audio = [`AUDIO_MAGIC`] (0xC9, host→client), rumble = [`RUMBLE_MAGIC`] (0xCA, host→client), +/// mic = [`MIC_MAGIC`] (0xCB, client→host). pub const AUDIO_MAGIC: u8 = 0xC9; pub const RUMBLE_MAGIC: u8 = 0xCA; +/// Microphone uplink: the client's mic, Opus-encoded, client → host (the inverse of +/// [`AUDIO_MAGIC`]). The host feeds it into a virtual PipeWire source so its apps can record it. +pub const MIC_MAGIC: u8 = 0xCB; /// Audio datagram, host → client: `[0xC9][u32 seq LE][u64 pts_ns LE][opus payload]`. /// One Opus frame per datagram (5 ms — well under any MTU); QUIC already encrypts. @@ -600,6 +604,27 @@ pub fn decode_rumble_datagram(b: &[u8]) -> Option<(u16, u16, u16)> { Some((u16at(1), u16at(3), u16at(5))) } +/// Mic datagram, client → host: `[0xCB][u32 seq LE][u64 pts_ns LE][opus payload]` — the same +/// layout as [`encode_audio_datagram`] with [`MIC_MAGIC`], one Opus frame per datagram. +pub fn encode_mic_datagram(seq: u32, pts_ns: u64, opus: &[u8]) -> Vec { + let mut b = Vec::with_capacity(13 + opus.len()); + b.push(MIC_MAGIC); + b.extend_from_slice(&seq.to_le_bytes()); + b.extend_from_slice(&pts_ns.to_le_bytes()); + b.extend_from_slice(opus); + b +} + +/// Parse a mic datagram → `(seq, pts_ns, opus payload)`. `None` on bad tag/length. +pub fn decode_mic_datagram(b: &[u8]) -> Option<(u32, u64, &[u8])> { + if b.len() < 13 || b[0] != MIC_MAGIC { + return None; + } + let seq = u32::from_le_bytes(b[1..5].try_into().unwrap()); + let pts_ns = u64::from_le_bytes(b[5..13].try_into().unwrap()); + Some((seq, pts_ns, &b[13..])) +} + /// Async framed-message IO over a quinn stream (`u16 LE length || payload`). pub mod io { /// Read one framed message (bounded at 64 KiB — control messages are tiny). @@ -1178,6 +1203,25 @@ mod tests { assert!(decode_rumble_datagram(&d[..6]).is_none()); } + #[test] + fn mic_datagram_roundtrip_and_disjoint_from_audio() { + let opus = [0x5Au8; 80]; + let d = encode_mic_datagram(42, 9_999, &opus); + assert_eq!(d[0], MIC_MAGIC); + let (seq, pts, payload) = decode_mic_datagram(&d).unwrap(); + assert_eq!((seq, pts), (42, 9_999)); + assert_eq!(payload, opus); + assert!(decode_mic_datagram(&d[..12]).is_none()); // truncated + // Tag separation: a mic datagram is not an audio datagram and vice-versa. + assert!(decode_audio_datagram(&d).is_none()); + assert!(decode_mic_datagram(&encode_audio_datagram(1, 2, &opus)).is_none()); + // Empty payload (DTX) is legal. + assert!(decode_mic_datagram(&encode_mic_datagram(0, 0, &[])) + .unwrap() + .2 + .is_empty()); + } + #[test] fn fingerprint_is_sha256_of_der() { // Stable across calls, distinct for distinct certs. diff --git a/crates/punktfunk-host/src/audio.rs b/crates/punktfunk-host/src/audio.rs index a7a69b3..aa9ef87 100644 --- a/crates/punktfunk-host/src/audio.rs +++ b/crates/punktfunk-host/src/audio.rs @@ -42,5 +42,32 @@ pub fn open_audio_capture(_channels: u32) -> Result> { anyhow::bail!("audio capture requires Linux + PipeWire") } +/// The inverse of [`AudioCapturer`]: a virtual microphone the host *produces*. It registers a +/// PipeWire `Audio/Source` node that host apps can record from; the host [`push`](Self::push)es +/// decoded client-mic PCM (interleaved `f32` at [`SAMPLE_RATE`]) into it, and PipeWire delivers +/// it to whichever app records the source — silence when no input is flowing. This is how the +/// client's microphone reaches host applications (mic passthrough). +pub trait VirtualMic: Send { + /// Push one chunk of interleaved `f32` PCM. Non-blocking — drops if PipeWire is behind + /// (mic audio is lossy/real-time; a stale chunk is worse than a dropped one). + fn push(&self, pcm: &[f32]); + + /// The interleaved channel count the source was opened with. + fn channels(&self) -> u32 { + CHANNELS as u32 + } +} + +/// Open a virtual microphone PipeWire source with `channels` interleaved channels (1 or 2). +#[cfg(target_os = "linux")] +pub fn open_virtual_mic(channels: u32) -> Result> { + linux::PwMicSource::open(channels).map(|m| Box::new(m) as Box) +} + +#[cfg(not(target_os = "linux"))] +pub fn open_virtual_mic(_channels: u32) -> Result> { + anyhow::bail!("virtual mic requires Linux + PipeWire") +} + #[cfg(target_os = "linux")] mod linux; diff --git a/crates/punktfunk-host/src/audio/linux.rs b/crates/punktfunk-host/src/audio/linux.rs index 684cf83..6c359c6 100644 --- a/crates/punktfunk-host/src/audio/linux.rs +++ b/crates/punktfunk-host/src/audio/linux.rs @@ -13,8 +13,9 @@ //! stream down promptly — required so a surround session can replace a stereo capturer //! without leaking a PipeWire consumer (see CLAUDE.md: a wedged link head-blocks the daemon). -use super::{AudioCapturer, SAMPLE_RATE}; +use super::{AudioCapturer, VirtualMic, SAMPLE_RATE}; use anyhow::{anyhow, Context, Result}; +use std::collections::VecDeque; use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError}; use std::thread; use std::time::Duration; @@ -105,6 +106,232 @@ fn spa_positions(channels: u32) -> [u32; 64] { pos } +/// Virtual microphone: a PipeWire `Audio/Source` node host apps can record from. The host pushes +/// decoded client-mic PCM in; the loop thread's producer callback drains it (silence on +/// underrun) into PipeWire buffers. Mirrors [`PwAudioCapturer`] but inverted (Direction::Output). +pub struct PwMicSource { + pcm: std::sync::mpsc::SyncSender>, + channels: u32, + quit: pipewire::channel::Sender, +} + +impl PwMicSource { + pub fn open(channels: u32) -> Result { + anyhow::ensure!( + matches!(channels, 1 | 2), + "virtual mic supports 1 or 2 channels, got {channels}" + ); + let (pcm_tx, pcm_rx) = sync_channel::>(64); + let (quit_tx, quit_rx) = pipewire::channel::channel::(); + thread::Builder::new() + .name("punktfunk-pw-mic".into()) + .spawn(move || { + if let Err(e) = mic_pw_thread(pcm_rx, quit_rx, channels) { + tracing::error!(error = %format!("{e:#}"), "pipewire virtual-mic thread failed"); + } + }) + .context("spawn pipewire virtual-mic thread")?; + Ok(PwMicSource { + pcm: pcm_tx, + channels, + quit: quit_tx, + }) + } +} + +impl Drop for PwMicSource { + fn drop(&mut self) { + let _ = self.quit.send(Terminate); + } +} + +impl VirtualMic for PwMicSource { + fn push(&self, pcm: &[f32]) { + let _ = self.pcm.try_send(pcm.to_vec()); // drop if the PipeWire side is behind + } + fn channels(&self) -> u32 { + self.channels + } +} + +/// Producer-side state for the virtual-mic loop: incoming decoded PCM and a small ring buffer +/// the process callback drains into PipeWire buffers (capped, so latency stays bounded). +/// `primed` is a jitter buffer gate — see the process callback. +struct MicUserData { + rx: Receiver>, + ring: VecDeque, + channels: usize, + primed: bool, +} + +fn mic_pw_thread( + pcm_rx: Receiver>, + quit_rx: pipewire::channel::Receiver, + channels: u32, +) -> Result<()> { + use pipewire as pw; + use pw::{properties::properties, spa}; + use spa::param::audio::{AudioFormat, AudioInfoRaw}; + use spa::pod::Pod; + + crate::pwinit::ensure_init(); + let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw mic MainLoop")?; + let context = pw::context::ContextRc::new(&mainloop, None).context("pw mic Context")?; + let core = context + .connect_rc(None) + .context("pw mic connect (is PipeWire running in this session?)")?; + + let _quit_guard = quit_rx.attach(mainloop.loop_(), { + let mainloop = mainloop.clone(); + move |_| mainloop.quit() + }); + + // media.class=Audio/Source advertises us as a microphone (a recordable source), NOT a + // playback stream — without it, Direction::Output + Playback would route to the speakers. + let stream = pw::stream::StreamBox::new( + &core, + "punktfunk-mic", + properties! { + *pw::keys::MEDIA_TYPE => "Audio", + *pw::keys::MEDIA_CLASS => "Audio/Source", + *pw::keys::NODE_NAME => "punktfunk-mic", + *pw::keys::NODE_DESCRIPTION => "Punktfunk Remote Microphone", + // ~5 ms quantum (one Opus frame) so recording apps get smooth low-latency chunks. + *pw::keys::NODE_LATENCY => "240/48000", + }, + ) + .context("pw mic Stream")?; + + let ud = MicUserData { + rx: pcm_rx, + ring: VecDeque::new(), + channels: channels as usize, + primed: false, + }; + + let _listener = stream + .add_local_listener_with_user_data(ud) + .state_changed(|_s, _ud, old, new| { + tracing::info!(?old, ?new, "pipewire virtual-mic stream state"); + }) + .param_changed(|_s, _ud, id, param| { + let Some(param) = param else { return }; + if id != pw::spa::param::ParamType::Format.as_raw() { + return; + } + let mut info = AudioInfoRaw::default(); + if info.parse(param).is_ok() { + tracing::info!( + format = ?info.format(), + rate = info.rate(), + channels = info.channels(), + "virtual-mic format negotiated" + ); + } + }) + .process(|stream, ud| { + let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + let Some(mut buffer) = stream.dequeue_buffer() else { + return; + }; + // Pull all newly-decoded PCM into the ring. + while let Ok(frame) = ud.rx.try_recv() { + ud.ring.extend(frame); + } + let stride = 4 * ud.channels; // F32LE interleaved + let datas = buffer.datas_mut(); + if datas.is_empty() { + return; + } + let data = &mut datas[0]; + let want_frames = data.data().map(|s| s.len() / stride).unwrap_or(0); + let want = want_frames * ud.channels; // interleaved samples this quantum needs + static FIRST: std::sync::atomic::AtomicBool = + std::sync::atomic::AtomicBool::new(true); + if FIRST.swap(false, std::sync::atomic::Ordering::Relaxed) { + tracing::info!( + quantum_frames = want_frames, + quantum_ms = want_frames as f32 / 48.0, + "virtual-mic consumer connected" + ); + } + + // Adaptive jitter buffer. The client pushes 5 ms frames; the recorder pulls a + // whole *quantum* (often 20–43 ms) from an independent clock. A drain of one + // quantum must not outrun what's buffered, or every call underruns to silence + // (the original ~58% gaps). So prime to ~3 quanta before producing, hold there, + // and re-prime only after a genuine full drain (the client went quiet). The ring + // is capped at a few quanta so latency stays bounded. + let target = (3 * want).clamp(720 * ud.channels, 9600 * ud.channels); + while ud.ring.len() > target.max(want) + want { + ud.ring.pop_front(); // bound latency: drop the oldest beyond ~1 quantum slack + } + if !ud.primed && ud.ring.len() >= target { + ud.primed = true; + } + + let n_frames = if let Some(slice) = data.data() { + for k in 0..want { + let s = if ud.primed { + ud.ring.pop_front().unwrap_or(0.0) // silence on a momentary underrun + } else { + 0.0 // not yet primed — emit silence while the buffer fills + }; + let off = k * 4; + slice[off..off + 4].copy_from_slice(&s.to_le_bytes()); + } + want_frames + } else { + 0 + }; + if ud.ring.is_empty() { + ud.primed = false; // fully drained — re-prime before producing again + } + let chunk = data.chunk_mut(); + *chunk.offset_mut() = 0; + *chunk.stride_mut() = stride as _; + *chunk.size_mut() = (stride * n_frames) as _; + })); + if outcome.is_err() { + tracing::error!("panic in pipewire virtual-mic callback"); + } + }) + .register() + .context("register virtual-mic stream listener")?; + + let mut info = AudioInfoRaw::new(); + info.set_format(AudioFormat::F32LE); + info.set_rate(SAMPLE_RATE); + info.set_channels(channels); + info.set_position(spa_positions(channels)); + let obj = pw::spa::pod::Object { + type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(), + id: pw::spa::param::ParamType::EnumFormat.as_raw(), + properties: info.into(), + }; + let values: Vec = pw::spa::pod::serialize::PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &pw::spa::pod::Value::Object(obj), + ) + .context("serialize mic format pod")? + .0 + .into_inner(); + let mut params = [Pod::from_bytes(&values).context("mic pod from bytes")?]; + + stream + .connect( + spa::utils::Direction::Output, // we PRODUCE samples (a source) + None, + pw::stream::StreamFlags::AUTOCONNECT | pw::stream::StreamFlags::MAP_BUFFERS, + &mut params, + ) + .context("pw mic stream connect")?; + + mainloop.run(); + tracing::debug!("pipewire virtual-mic loop exited (source dropped)"); + Ok(()) +} + fn pw_thread( tx: std::sync::mpsc::SyncSender>, quit_rx: pipewire::channel::Receiver, diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index a128e40..40c0a78 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -185,6 +185,9 @@ async fn serve(opts: M3Options) -> Result<()> { // session — which, under rapid client reconnects, raced a prior session's portal teardown and // wedged KWin's EIS setup ("EIS setup timed out"). Gamepads stay per-session (uinput). let injector = InjectorService::start(); + // One virtual microphone for the whole host lifetime (see MicService): the client's mic uplink + // (0xCB) is Opus-decoded and fed into a persistent PipeWire Audio/Source host apps record from. + let mic_service = MicService::start(); let paired_at = match &opts.paired_store { Some(p) => p.clone(), None => paired_path()?, @@ -233,6 +236,7 @@ async fn serve(opts: M3Options) -> Result<()> { &opts, &audio_cap, injector.sender(), + mic_service.sender(), &fingerprint, &paired, &last_pairing, @@ -350,6 +354,7 @@ async fn serve_session( opts: &M3Options, audio_cap: &AudioCapSlot, inj_tx: std::sync::mpsc::Sender, + mic_tx: std::sync::mpsc::Sender>, host_fp: &[u8; 32], paired: &PairedStore, last_pairing: &std::sync::Mutex>, @@ -521,18 +526,30 @@ async fn serve_session( .spawn(move || input_thread(input_rx, conn, inj_tx)) .context("spawn input thread")? }; + // One reader for ALL client→host datagrams, demuxed by magic byte (two read_datagram loops + // would race for datagrams): 0xCB → mic uplink (Opus, forwarded to the host-lifetime mic + // service), 0xC8 → input (forwarded to the per-session input thread). The magics are disjoint, + // so decode order doesn't matter. Unknown tags are ignored. let input_conn = conn.clone(); tokio::spawn(async move { - let mut count = 0u64; + let (mut input_count, mut mic_count) = (0u64, 0u64); while let Ok(d) = input_conn.read_datagram().await { - if let Some(ev) = InputEvent::decode(&d) { - count += 1; + if let Some((_seq, _pts, opus)) = punktfunk_core::quic::decode_mic_datagram(&d) { + mic_count += 1; + // Host-lifetime mic service; a send error just means the host is shutting down. + let _ = mic_tx.send(opus.to_vec()); + } else if let Some(ev) = InputEvent::decode(&d) { + input_count += 1; if input_tx.send(ev).is_err() { break; } } } - tracing::info!(count, "input datagram stream ended"); + tracing::info!( + input = input_count, + mic = mic_count, + "client datagram stream ended" + ); }); // Stop signal: stream duration elapsed or the client went away. @@ -758,6 +775,92 @@ fn injector_service_thread(rx: std::sync::mpsc::Receiver) { tracing::debug!("injector service stopped (host shutting down)"); } +/// Mic is 48 kHz stereo — matches the Opus stereo decoder and the host→client audio layout. +const MIC_CHANNELS: u32 = 2; + +/// Host-lifetime virtual microphone, shared across punktfunk/1 sessions (mirror of +/// [`InjectorService`]). One thread owns the PipeWire `Audio/Source` + an Opus decoder; sessions +/// forward the client's Opus mic frames over a clonable `Send` channel, the thread decodes and +/// feeds the source. Opened lazily on the first frame, the source node persists across sessions +/// (no per-session registration churn), and reopens after a backoff if the source/decoder fails. +struct MicService { + tx: std::sync::mpsc::Sender>, +} + +impl MicService { + fn start() -> MicService { + let (tx, rx) = std::sync::mpsc::channel::>(); + if let Err(e) = std::thread::Builder::new() + .name("punktfunk-m3-mic".into()) + .spawn(move || mic_service_thread(rx)) + { + tracing::error!(error = %e, "mic service thread spawn failed — mic passthrough disabled"); + } + MicService { tx } + } + + /// A sender a session forwards the client's Opus mic frames to. Cloned per session; dropping a + /// clone does NOT stop the service (it holds the original sender for the host life). + fn sender(&self) -> std::sync::mpsc::Sender> { + self.tx.clone() + } +} + +/// The host-lifetime mic worker: lazily open the virtual mic + decoder, then Opus-decode each +/// forwarded frame and push the PCM into the source. Reopen (after [`INJECTOR_REOPEN_BACKOFF`]) +/// on open failure or a decode error. Exits when every session sender and the service's own +/// sender drop (host shutdown), tearing the PipeWire source down. +fn mic_service_thread(rx: std::sync::mpsc::Receiver>) { + let mut mic: Option> = None; + let mut decoder: Option = None; + let mut last_failed: Option = None; + let mut pcm = vec![0f32; 5760 * MIC_CHANNELS as usize]; // up to 120 ms scratch + for opus_frame in rx { + if opus_frame.is_empty() { + continue; // DTX silence — the source underruns to silence on its own + } + if mic.is_none() || decoder.is_none() { + if last_failed.is_some_and(|t| t.elapsed() < INJECTOR_REOPEN_BACKOFF) { + continue; // still within the reopen backoff window + } + let opened = crate::audio::open_virtual_mic(MIC_CHANNELS).and_then(|m| { + let d = opus::Decoder::new(48_000, opus::Channels::Stereo) + .map_err(|e| anyhow!("opus decoder: {e}"))?; + Ok((m, d)) + }); + match opened { + Ok((m, d)) => { + tracing::info!("punktfunk/1 virtual mic ready (host-lifetime)"); + mic = Some(m); + decoder = Some(d); + last_failed = None; + } + Err(e) => { + tracing::error!(error = %format!("{e:#}"), "virtual mic unavailable — will retry"); + last_failed = Some(std::time::Instant::now()); + continue; + } + } + } + let (Some(m), Some(dec)) = (mic.as_ref(), decoder.as_mut()) else { + continue; + }; + match dec.decode_float(&opus_frame, &mut pcm, false) { + Ok(samples_per_ch) => { + let total = (samples_per_ch * MIC_CHANNELS as usize).min(pcm.len()); + m.push(&pcm[..total]); + } + Err(e) => { + tracing::warn!(error = %e, "mic opus decode failed — reopening"); + mic = None; + decoder = None; + last_failed = Some(std::time::Instant::now()); + } + } + } + tracing::debug!("mic service stopped (host shutting down)"); +} + /// The per-session input thread: route pointer/keyboard events to the host-lifetime injector /// service (`inj_tx`) and gamepad events to this session's own [`GamepadManager`] /// (crate::inject::gamepad), with force feedback pumped between events and sent back as rumble diff --git a/include/punktfunk_core.h b/include/punktfunk_core.h index e6ad7c3..88a4cf8 100644 --- a/include/punktfunk_core.h +++ b/include/punktfunk_core.h @@ -135,8 +135,9 @@ #if defined(PUNKTFUNK_FEATURE_QUIC) // Datagram wire tags. Video rides UDP; everything low-rate rides QUIC datagrams, -// demultiplexed by the first byte: input = [`crate::input::INPUT_MAGIC`] (0xC8), -// audio = [`AUDIO_MAGIC`], rumble = [`RUMBLE_MAGIC`]. +// demultiplexed by the first byte: input = [`crate::input::INPUT_MAGIC`] (0xC8, client→host), +// audio = [`AUDIO_MAGIC`] (0xC9, host→client), rumble = [`RUMBLE_MAGIC`] (0xCA, host→client), +// mic = [`MIC_MAGIC`] (0xCB, client→host). #define PUNKTFUNK_AUDIO_MAGIC 201 #endif @@ -144,6 +145,12 @@ #define PUNKTFUNK_RUMBLE_MAGIC 202 #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Microphone uplink: the client's mic, Opus-encoded, client → host (the inverse of +// [`AUDIO_MAGIC`]). The host feeds it into a virtual PipeWire source so its apps can record it. +#define MIC_MAGIC 203 +#endif + // Stable C ABI status codes. `Ok` is 0; all errors are negative so callers can // test `rc < 0`. Do not renumber existing variants — only append. enum PunktfunkStatus @@ -507,6 +514,21 @@ PunktfunkStatus punktfunk_connection_send_input(PunktfunkConnection *c, const PunktfunkInputEvent *ev); #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Send one Opus mic frame to the host as a QUIC datagram (48 kHz; the host decodes it into a +// virtual microphone source its apps can record). Non-blocking enqueue; the host uses `seq`/ +// `pts_ns` (the caller's own counters) only for diagnostics. `opus_data`/`len` may be empty +// (a DTX silence frame). The data is copied; the caller may reuse the buffer after this returns. +// +// # Safety +// `c` is a valid connection handle; `opus_data` is valid for `len` bytes (or `len == 0`). +PunktfunkStatus punktfunk_connection_send_mic(PunktfunkConnection *c, + const uint8_t *opus_data, + uintptr_t len, + uint32_t seq, + uint64_t pts_ns); +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // The currently active session mode — the Welcome's, until an accepted // [`punktfunk_connection_request_mode`] switches it. Safe any time after connect.