75627c8afe
apple / swift (push) Failing after 10s
release / apple (push) Failing after 7s
apple / screenshots (push) Has been skipped
audit / cargo-audit (push) Failing after 1m19s
windows-host / package (push) Failing after 2m44s
windows-msix / package (arm64, C:\Users\Public\ffmpeg-arm64, aarch64-pc-windows-msvc, C:\t-a64) (push) Failing after 39s
windows-msix / package (x64, C:\Users\Public\ffmpeg, x86_64-pc-windows-msvc, C:\t) (push) Failing after 39s
windows / build (aarch64-pc-windows-msvc) (push) Failing after 45s
android / android (push) Successful in 5m17s
windows / build (x86_64-pc-windows-msvc) (push) Failing after 45s
ci / web (push) Successful in 57s
ci / docs-site (push) Successful in 56s
ci / rust (push) Successful in 9m19s
ci / bench (push) Successful in 4m40s
decky / build-publish (push) Successful in 26s
deb / build-publish (push) Successful in 2m57s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 33s
docker / build-push (--build-arg FEDORA_VERSION=44, ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora44-rpm) (push) Successful in 2m56s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 2m35s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 2m20s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 53s
flatpak / build-publish (push) Successful in 4m22s
rpm / build-publish (bazzite, punktfunk-fedora-rpm) (push) Successful in 8m51s
docker / deploy-docs (push) Successful in 21s
rpm / build-publish (fedora-44, punktfunk-fedora44-rpm) (push) Successful in 8m50s
Adds negotiated 5.1/7.1 surround to the punktfunk/1 protocol and every client (previously stereo-only): - core: new shared `audio` layout table (LAYOUT_51/71 + identity multistream mapping, canonical wire order FL FR FC LFE RL RR SL SR); Hello/Welcome `audio_channels` negotiation via the trailing-byte back-compat pattern (old peers fall back to stereo); C-ABI `punktfunk_connect_ex6`, `punktfunk_connection_audio_channels`, and in-core multistream decode `punktfunk_connection_next_audio_pcm` for embedders without a multistream Opus decoder. Real-libopus channel-identity round-trip test. - host: native audio thread captures + Opus-(multi)stream-encodes at the negotiated count (with a cross-session cached-capturer channel-mismatch fix); GameStream surround unified onto the safe `opus::MSEncoder`, dropping `audiopus_sys` (~4 unsafe blocks) and un-gating Windows GameStream surround; WASAPI loopback capture relaxed to 2/6/8 with the correct dwChannelMask. - clients: Linux (PipeWire), Windows (WASAPI), Android (AAudio) decode via `opus::MSDecoder` + render multichannel; Apple decodes in-core to PCM → AVAudioEngine with an explicit wire-order channel layout; each gains a Stereo/5.1/7.1 setting. `punktfunk-probe --audio-channels N` is the headless validator. Verified on Linux: core/host/linux/probe test suites + the Android Rust (cargo-ndk) build, clippy -D warnings, and rustfmt all green. Windows/Apple builds, all on-glass checks, and the live native loopback are pending (CI / a free box). Also lands the concurrent in-tree HEVC 4:4:4 host work (PUNKTFUNK_444): it shares the same touched files (quic.rs, punktfunk1.rs, encode/*, ...) and so cannot be committed separately from the surround changes. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
394 lines
14 KiB
Rust
394 lines
14 KiB
Rust
//! Audio: playback (decoded PCM → a PipeWire playback stream) and the microphone uplink
|
||
//! (PipeWire capture → Opus → 0xCB datagrams, the inverse of the host's virtual mic).
|
||
//!
|
||
//! Playback mirrors the host's virtual-mic producer (`punktfunk-host::audio::linux`) with
|
||
//! the same adaptive jitter buffer: the session pump pushes 5 ms Opus-decoded chunks on
|
||
//! the network clock; PipeWire pulls whole quanta on the device clock. Prime to ~3
|
||
//! quanta before producing, cap the ring so latency stays bounded, re-prime after a real
|
||
//! drain.
|
||
|
||
use anyhow::{Context, Result};
|
||
use punktfunk_core::client::NativeClient;
|
||
use std::collections::VecDeque;
|
||
use std::sync::mpsc::{Receiver, SyncSender, TrySendError};
|
||
use std::sync::Arc;
|
||
|
||
const SAMPLE_RATE: u32 = 48_000;
|
||
const CHANNELS: usize = 2;
|
||
/// Mic frames are 20 ms (960 samples/channel) — any size ≤ 120 ms is fine host-side.
|
||
const MIC_FRAME: usize = 960;
|
||
|
||
struct Terminate;
|
||
|
||
pub struct AudioPlayer {
|
||
pcm_tx: SyncSender<Vec<f32>>,
|
||
quit_tx: pipewire::channel::Sender<Terminate>,
|
||
thread: Option<std::thread::JoinHandle<()>>,
|
||
}
|
||
|
||
impl AudioPlayer {
|
||
/// Spawn the PipeWire playback thread for `channels` (2/6/8, canonical wire order
|
||
/// FL FR FC LFE RL RR SL SR). Failure (no PipeWire in the session) is survivable — the
|
||
/// caller streams video-only.
|
||
pub fn spawn(channels: u32) -> Result<AudioPlayer> {
|
||
// 64 × 5 ms = 320 ms of slack between the pump and the PipeWire loop.
|
||
let (pcm_tx, pcm_rx) = std::sync::mpsc::sync_channel::<Vec<f32>>(64);
|
||
let (quit_tx, quit_rx) = pipewire::channel::channel::<Terminate>();
|
||
let thread = std::thread::Builder::new()
|
||
.name("punktfunk-audio".into())
|
||
.spawn(move || {
|
||
if let Err(e) = pw_thread(pcm_rx, quit_rx, channels as usize) {
|
||
tracing::warn!(error = %e, "audio playback thread ended");
|
||
}
|
||
})
|
||
.context("spawn audio thread")?;
|
||
Ok(AudioPlayer {
|
||
pcm_tx,
|
||
quit_tx,
|
||
thread: Some(thread),
|
||
})
|
||
}
|
||
|
||
/// Queue one interleaved f32 chunk (in the session's channel layout). Drops the chunk if the
|
||
/// PipeWire side is wedged (the renderer conceals the gap; never block the session pump).
|
||
pub fn push(&self, pcm: Vec<f32>) {
|
||
if let Err(TrySendError::Disconnected(_)) = self.pcm_tx.try_send(pcm) {
|
||
// Thread already dead — Drop will reap it; nothing to do per-chunk.
|
||
}
|
||
}
|
||
}
|
||
|
||
impl Drop for AudioPlayer {
|
||
fn drop(&mut self) {
|
||
let _ = self.quit_tx.send(Terminate);
|
||
if let Some(t) = self.thread.take() {
|
||
let _ = t.join();
|
||
}
|
||
}
|
||
}
|
||
|
||
/// Producer-side state: incoming decoded PCM and the ring the process callback drains.
|
||
struct PlayerData {
|
||
rx: Receiver<Vec<f32>>,
|
||
ring: VecDeque<f32>,
|
||
primed: bool,
|
||
/// Interleaved channel count this stream was opened with (2/6/8).
|
||
channels: usize,
|
||
}
|
||
|
||
fn pw_thread(
|
||
pcm_rx: Receiver<Vec<f32>>,
|
||
quit_rx: pipewire::channel::Receiver<Terminate>,
|
||
channels: usize,
|
||
) -> Result<()> {
|
||
use pipewire as pw;
|
||
use pw::{properties::properties, spa};
|
||
use spa::param::audio::{AudioFormat, AudioInfoRaw};
|
||
use spa::pod::Pod;
|
||
|
||
static PW_INIT: std::sync::Once = std::sync::Once::new();
|
||
PW_INIT.call_once(pw::init);
|
||
|
||
let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw MainLoop")?;
|
||
let context = pw::context::ContextRc::new(&mainloop, None).context("pw Context")?;
|
||
let core = context
|
||
.connect_rc(None)
|
||
.context("pw connect (is PipeWire running in this session?)")?;
|
||
|
||
let _quit_guard = quit_rx.attach(mainloop.loop_(), {
|
||
let mainloop = mainloop.clone();
|
||
move |_| mainloop.quit()
|
||
});
|
||
|
||
let stream = pw::stream::StreamBox::new(
|
||
&core,
|
||
"punktfunk-client",
|
||
properties! {
|
||
*pw::keys::MEDIA_TYPE => "Audio",
|
||
*pw::keys::MEDIA_CATEGORY => "Playback",
|
||
*pw::keys::MEDIA_ROLE => "Game",
|
||
*pw::keys::NODE_NAME => "punktfunk-client",
|
||
*pw::keys::NODE_DESCRIPTION => "Punktfunk Stream",
|
||
// ~5 ms quantum (one Opus frame) keeps the ring — and so the latency — small.
|
||
*pw::keys::NODE_LATENCY => "240/48000",
|
||
},
|
||
)
|
||
.context("pw Stream")?;
|
||
|
||
let ud = PlayerData {
|
||
rx: pcm_rx,
|
||
ring: VecDeque::new(),
|
||
primed: false,
|
||
channels,
|
||
};
|
||
|
||
let _listener = stream
|
||
.add_local_listener_with_user_data(ud)
|
||
.state_changed(|_s, _ud, old, new| {
|
||
tracing::debug!(?old, ?new, "pipewire playback stream state");
|
||
})
|
||
.process(|stream, ud| {
|
||
let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
|
||
let Some(mut buffer) = stream.dequeue_buffer() else {
|
||
return;
|
||
};
|
||
while let Ok(chunk) = ud.rx.try_recv() {
|
||
ud.ring.extend(chunk);
|
||
}
|
||
let stride = 4 * ud.channels; // F32LE interleaved
|
||
let datas = buffer.datas_mut();
|
||
if datas.is_empty() {
|
||
return;
|
||
}
|
||
let data = &mut datas[0];
|
||
let want_frames = data.data().map(|s| s.len() / stride).unwrap_or(0);
|
||
let want = want_frames * ud.channels;
|
||
|
||
// Adaptive jitter buffer (same shape as the host's virtual mic): prime to
|
||
// ~3 quanta, cap at ~1 quantum of slack beyond that, re-prime after a
|
||
// genuine drain.
|
||
let target = (3 * want).clamp(720 * ud.channels, 9600 * ud.channels);
|
||
while ud.ring.len() > target.max(want) + want {
|
||
ud.ring.pop_front();
|
||
}
|
||
if !ud.primed && ud.ring.len() >= target {
|
||
ud.primed = true;
|
||
}
|
||
|
||
let n_frames = if let Some(slice) = data.data() {
|
||
for k in 0..want {
|
||
let s = if ud.primed {
|
||
ud.ring.pop_front().unwrap_or(0.0)
|
||
} else {
|
||
0.0
|
||
};
|
||
let off = k * 4;
|
||
slice[off..off + 4].copy_from_slice(&s.to_le_bytes());
|
||
}
|
||
want_frames
|
||
} else {
|
||
0
|
||
};
|
||
if ud.ring.is_empty() {
|
||
ud.primed = false;
|
||
}
|
||
let chunk = data.chunk_mut();
|
||
*chunk.offset_mut() = 0;
|
||
*chunk.stride_mut() = stride as _;
|
||
*chunk.size_mut() = (stride * n_frames) as _;
|
||
}));
|
||
if outcome.is_err() {
|
||
tracing::error!("panic in pipewire playback callback");
|
||
}
|
||
})
|
||
.register()
|
||
.context("register playback listener")?;
|
||
|
||
let mut info = AudioInfoRaw::new();
|
||
info.set_format(AudioFormat::F32LE);
|
||
info.set_rate(SAMPLE_RATE);
|
||
info.set_channels(channels as u32);
|
||
// Channel positions in canonical wire order (FL FR FC LFE RL RR SL SR) so PipeWire routes each
|
||
// slot to the matching speaker (and downmixes when the sink has fewer). Identity, no permute.
|
||
let order = punktfunk_core::audio::spa_positions(channels as u8);
|
||
let mut positions = [0u32; 64];
|
||
positions[..order.len()].copy_from_slice(order);
|
||
info.set_position(positions);
|
||
let obj = pw::spa::pod::Object {
|
||
type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(),
|
||
id: pw::spa::param::ParamType::EnumFormat.as_raw(),
|
||
properties: info.into(),
|
||
};
|
||
let values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
|
||
std::io::Cursor::new(Vec::new()),
|
||
&pw::spa::pod::Value::Object(obj),
|
||
)
|
||
.context("serialize format pod")?
|
||
.0
|
||
.into_inner();
|
||
let mut params = [Pod::from_bytes(&values).context("pod from bytes")?];
|
||
|
||
stream
|
||
.connect(
|
||
spa::utils::Direction::Output,
|
||
None,
|
||
pw::stream::StreamFlags::AUTOCONNECT | pw::stream::StreamFlags::MAP_BUFFERS,
|
||
&mut params,
|
||
)
|
||
.context("pw stream connect")?;
|
||
|
||
mainloop.run();
|
||
tracing::debug!("pipewire playback loop exited");
|
||
Ok(())
|
||
}
|
||
|
||
/// The microphone uplink: capture the default input device, Opus-encode 20 ms chunks,
|
||
/// ship them as 0xCB datagrams into the host's virtual PipeWire source.
|
||
pub struct MicStreamer {
|
||
quit_tx: pipewire::channel::Sender<Terminate>,
|
||
thread: Option<std::thread::JoinHandle<()>>,
|
||
}
|
||
|
||
impl MicStreamer {
|
||
pub fn spawn(connector: Arc<NativeClient>) -> Result<MicStreamer> {
|
||
let (quit_tx, quit_rx) = pipewire::channel::channel::<Terminate>();
|
||
let thread = std::thread::Builder::new()
|
||
.name("punktfunk-mic".into())
|
||
.spawn(move || {
|
||
if let Err(e) = mic_thread(&connector, quit_rx) {
|
||
tracing::warn!(error = %e, "mic uplink thread ended");
|
||
}
|
||
})
|
||
.context("spawn mic thread")?;
|
||
Ok(MicStreamer {
|
||
quit_tx,
|
||
thread: Some(thread),
|
||
})
|
||
}
|
||
}
|
||
|
||
impl Drop for MicStreamer {
|
||
fn drop(&mut self) {
|
||
let _ = self.quit_tx.send(Terminate);
|
||
if let Some(t) = self.thread.take() {
|
||
let _ = t.join();
|
||
}
|
||
}
|
||
}
|
||
|
||
/// Capture-side state: accumulated PCM and the Opus encoder (encoding a 20 ms frame is
|
||
/// ~100 µs — fine inside the process callback).
|
||
struct MicData {
|
||
connector: Arc<NativeClient>,
|
||
ring: VecDeque<f32>,
|
||
encoder: opus::Encoder,
|
||
seq: u32,
|
||
out: Vec<u8>,
|
||
}
|
||
|
||
fn mic_thread(
|
||
connector: &Arc<NativeClient>,
|
||
quit_rx: pipewire::channel::Receiver<Terminate>,
|
||
) -> Result<()> {
|
||
use pipewire as pw;
|
||
use pw::{properties::properties, spa};
|
||
use spa::param::audio::{AudioFormat, AudioInfoRaw};
|
||
use spa::pod::Pod;
|
||
|
||
static PW_INIT: std::sync::Once = std::sync::Once::new();
|
||
PW_INIT.call_once(pw::init);
|
||
|
||
let mut encoder =
|
||
opus::Encoder::new(SAMPLE_RATE, opus::Channels::Stereo, opus::Application::Voip)
|
||
.map_err(|e| anyhow::anyhow!("opus encoder: {e}"))?;
|
||
let _ = encoder.set_bitrate(opus::Bitrate::Bits(64_000));
|
||
|
||
let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw mic MainLoop")?;
|
||
let context = pw::context::ContextRc::new(&mainloop, None).context("pw mic Context")?;
|
||
let core = context
|
||
.connect_rc(None)
|
||
.context("pw mic connect (is PipeWire running in this session?)")?;
|
||
|
||
let _quit_guard = quit_rx.attach(mainloop.loop_(), {
|
||
let mainloop = mainloop.clone();
|
||
move |_| mainloop.quit()
|
||
});
|
||
|
||
let stream = pw::stream::StreamBox::new(
|
||
&core,
|
||
"punktfunk-mic-capture",
|
||
properties! {
|
||
*pw::keys::MEDIA_TYPE => "Audio",
|
||
*pw::keys::MEDIA_CATEGORY => "Capture",
|
||
*pw::keys::MEDIA_ROLE => "Communication",
|
||
*pw::keys::NODE_NAME => "punktfunk-mic-capture",
|
||
*pw::keys::NODE_DESCRIPTION => "Punktfunk Microphone",
|
||
},
|
||
)
|
||
.context("pw mic Stream")?;
|
||
|
||
let ud = MicData {
|
||
connector: connector.clone(),
|
||
ring: VecDeque::new(),
|
||
encoder,
|
||
seq: 0,
|
||
out: vec![0u8; 4000],
|
||
};
|
||
|
||
let _listener = stream
|
||
.add_local_listener_with_user_data(ud)
|
||
.state_changed(|_s, _ud, old, new| {
|
||
tracing::debug!(?old, ?new, "pipewire mic capture stream state");
|
||
})
|
||
.process(|stream, ud| {
|
||
let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
|
||
let Some(mut buffer) = stream.dequeue_buffer() else {
|
||
return;
|
||
};
|
||
let datas = buffer.datas_mut();
|
||
if datas.is_empty() {
|
||
return;
|
||
}
|
||
let data = &mut datas[0];
|
||
let n = data.chunk().size() as usize;
|
||
if let Some(slice) = data.data() {
|
||
for s in slice[..n.min(slice.len())].chunks_exact(4) {
|
||
ud.ring
|
||
.push_back(f32::from_le_bytes([s[0], s[1], s[2], s[3]]));
|
||
}
|
||
}
|
||
// Ship every complete 20 ms stereo frame.
|
||
while ud.ring.len() >= MIC_FRAME * CHANNELS {
|
||
let pcm: Vec<f32> = ud.ring.drain(..MIC_FRAME * CHANNELS).collect();
|
||
match ud.encoder.encode_float(&pcm, &mut ud.out) {
|
||
Ok(len) => {
|
||
let pts = std::time::SystemTime::now()
|
||
.duration_since(std::time::UNIX_EPOCH)
|
||
.map(|d| d.as_nanos() as u64)
|
||
.unwrap_or(0);
|
||
let _ = ud.connector.send_mic(ud.seq, pts, ud.out[..len].to_vec());
|
||
ud.seq = ud.seq.wrapping_add(1);
|
||
}
|
||
Err(e) => tracing::debug!(error = %e, "opus mic encode"),
|
||
}
|
||
}
|
||
}));
|
||
if outcome.is_err() {
|
||
tracing::error!("panic in pipewire mic callback");
|
||
}
|
||
})
|
||
.register()
|
||
.context("register mic listener")?;
|
||
|
||
let mut info = AudioInfoRaw::new();
|
||
info.set_format(AudioFormat::F32LE);
|
||
info.set_rate(SAMPLE_RATE);
|
||
info.set_channels(CHANNELS as u32);
|
||
let obj = pw::spa::pod::Object {
|
||
type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(),
|
||
id: pw::spa::param::ParamType::EnumFormat.as_raw(),
|
||
properties: info.into(),
|
||
};
|
||
let values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
|
||
std::io::Cursor::new(Vec::new()),
|
||
&pw::spa::pod::Value::Object(obj),
|
||
)
|
||
.context("serialize mic format pod")?
|
||
.0
|
||
.into_inner();
|
||
let mut params = [Pod::from_bytes(&values).context("mic pod from bytes")?];
|
||
|
||
stream
|
||
.connect(
|
||
spa::utils::Direction::Input,
|
||
None,
|
||
pw::stream::StreamFlags::AUTOCONNECT | pw::stream::StreamFlags::MAP_BUFFERS,
|
||
&mut params,
|
||
)
|
||
.context("pw mic stream connect")?;
|
||
|
||
mainloop.run();
|
||
tracing::debug!("pipewire mic capture loop exited");
|
||
Ok(())
|
||
}
|