diff --git a/CLAUDE.md b/CLAUDE.md index f2697da..8ee1b03 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -68,7 +68,11 @@ Low-latency desktop/game streaming stack, Linux-first, with a shared Rust protoc at high res). 3. **punktfunk/1 protocol growth**: concurrent sessions (today: one at a time, extras wait in the accept queue); mgmt REST endpoints for the punktfunk/1 paired-client list. -4. **M2 polish**: HDR negotiation, reconnect-at-new-mode robustness. +4. **M2 polish**: HDR/10-bit (needs HDR capture + metadata plumbing; `av1_nvenc + -highbitdepth 1` already encodes Main10 from 8-bit input on this box), + reconnect-at-new-mode robustness. AV1 negotiation and surround audio are implemented + and unit/live-capture tested — both still need a live Moonlight confirmation (select + AV1 in a stock client; a real 5.1/7.1 listen incl. FEC under loss). 5. **Native clients** (`clients/{apple,android}` scaffolds) consuming `punktfunk_core.h`. Box one-time setup is complete: udev rule + `input` group (gamepads validated live), diff --git a/Cargo.lock b/Cargo.lock index ae77d4e..293e09c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1952,6 +1952,7 @@ dependencies = [ "anyhow", "ash", "ashpd", + "audiopus_sys", "axum", "axum-server", "cbc", diff --git a/crates/punktfunk-host/Cargo.toml b/crates/punktfunk-host/Cargo.toml index 84bb485..4f82283 100644 --- a/crates/punktfunk-host/Cargo.toml +++ b/crates/punktfunk-host/Cargo.toml @@ -76,6 +76,10 @@ serde_json = "1" xkbcommon = "0.8" # Opus encode for the GameStream audio stream (links system libopus). opus = "0.3" +# The safe `opus` crate is stereo-only; surround (5.1/7.1) needs the libopus *multistream* +# encoder (`opus_multistream_encoder_*`). `audiopus_sys` is the sys layer `opus` already +# vendors (same libopus link), so this adds bindings, not a second copy of the library. +audiopus_sys = "0.2" # libei (EI sender) for the portable input path on KWin/GNOME (RemoteDesktop portal). # The `tokio` feature wires reis's event stream into tokio's reactor. reis = { version = "0.6.1", features = ["tokio"] } diff --git a/crates/punktfunk-host/src/audio.rs b/crates/punktfunk-host/src/audio.rs index 81fca09..a7a69b3 100644 --- a/crates/punktfunk-host/src/audio.rs +++ b/crates/punktfunk-host/src/audio.rs @@ -1,34 +1,44 @@ //! Desktop audio capture for the GameStream audio stream. On Linux: a PipeWire stream that //! records the default sink's monitor (i.e. everything playing out of the system), delivered -//! as interleaved `f32` stereo PCM at 48 kHz. The audio data plane (`gamestream::audio`) -//! reframes this into fixed Opus frames, encodes, and sends it. +//! as interleaved `f32` PCM at 48 kHz in the requested channel count (stereo, 5.1 or 7.1 — +//! GameStream surround order FL FR FC LFE RL RR [SL SR]). The audio data plane +//! (`gamestream::audio`) reframes this into fixed Opus frames, encodes, and sends it. use anyhow::Result; -/// Opus/GameStream audio is 48 kHz stereo. +/// Opus/GameStream audio is 48 kHz. pub const SAMPLE_RATE: u32 = 48_000; +/// Stereo channel count — the default and the punktfunk/1 (M3) audio plane's fixed layout. pub const CHANNELS: usize = 2; -/// Produces interleaved `f32` stereo PCM (L,R,L,R,…) at [`SAMPLE_RATE`]. Lives on its own -/// thread; never blocks the capture loop (drops if the consumer falls behind). +/// Produces interleaved `f32` PCM at [`SAMPLE_RATE`] in the channel count it was opened +/// with. Lives on its own thread; never blocks the capture loop (drops if the consumer +/// falls behind). pub trait AudioCapturer: Send { /// Block until the next chunk of interleaved samples is available (variable size). The /// caller reframes into fixed Opus frames. fn next_chunk(&mut self) -> Result>; + /// The interleaved channel count this capturer delivers (what it was opened with). + fn channels(&self) -> u32 { + CHANNELS as u32 + } + /// Discard any buffered chunks (called when a persistent capturer is reused for a new /// stream, so the client doesn't hear stale audio captured while idle). Default: no-op. fn drain(&mut self) {} } -/// Open a live capturer for the default sink monitor (system output) via PipeWire. +/// Open a live capturer for the default sink monitor (system output) via PipeWire, asking +/// for `channels` interleaved channels. If the sink has fewer channels than requested, +/// PipeWire's channel-mixer fills the missing positions with silence (zero upmix). #[cfg(target_os = "linux")] -pub fn open_audio_capture() -> Result> { - linux::PwAudioCapturer::open().map(|c| Box::new(c) as Box) +pub fn open_audio_capture(channels: u32) -> Result> { + linux::PwAudioCapturer::open(channels).map(|c| Box::new(c) as Box) } #[cfg(not(target_os = "linux"))] -pub fn open_audio_capture() -> Result> { +pub fn open_audio_capture(_channels: u32) -> Result> { anyhow::bail!("audio capture requires Linux + PipeWire") } diff --git a/crates/punktfunk-host/src/audio/linux.rs b/crates/punktfunk-host/src/audio/linux.rs index e908e74..684cf83 100644 --- a/crates/punktfunk-host/src/audio/linux.rs +++ b/crates/punktfunk-host/src/audio/linux.rs @@ -5,29 +5,58 @@ //! default sink's monitor into us — no portal needed (unlike screen capture). The (`!Send`) //! MainLoop/Stream live on a dedicated thread; interleaved `f32` chunks leave over a bounded //! channel (dropped if the encoder falls behind, never blocking the PipeWire loop). +//! +//! The stream is opened at the *session's* channel count (2/6/8). If the sink has fewer +//! channels than requested, PipeWire's channel-mixer fills the extra positions with silence +//! (zero upmix), so a stereo desktop still produces a valid 5.1/7.1 capture. Dropping the +//! capturer quits the loop thread (via a `pipewire::channel` Terminate message), tearing the +//! stream down promptly — required so a surround session can replace a stereo capturer +//! without leaking a PipeWire consumer (see CLAUDE.md: a wedged link head-blocks the daemon). -use super::{AudioCapturer, CHANNELS, SAMPLE_RATE}; +use super::{AudioCapturer, SAMPLE_RATE}; use anyhow::{anyhow, Context, Result}; use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError}; use std::thread; use std::time::Duration; +/// Message asking the PipeWire loop thread to quit (sent from `Drop`). +struct Terminate; + pub struct PwAudioCapturer { chunks: Receiver>, + channels: u32, + quit: pipewire::channel::Sender, } impl PwAudioCapturer { - pub fn open() -> Result { + pub fn open(channels: u32) -> Result { + anyhow::ensure!( + matches!(channels, 1 | 2 | 6 | 8), + "unsupported audio channel count {channels} (want 2, 6 or 8)" + ); let (tx, rx) = sync_channel::>(64); + let (quit_tx, quit_rx) = pipewire::channel::channel::(); thread::Builder::new() .name("punktfunk-pw-audio".into()) .spawn(move || { - if let Err(e) = pw_thread(tx) { + if let Err(e) = pw_thread(tx, quit_rx, channels) { tracing::error!(error = %format!("{e:#}"), "pipewire audio thread failed"); } }) .context("spawn pipewire audio thread")?; - Ok(PwAudioCapturer { chunks: rx }) + Ok(PwAudioCapturer { + chunks: rx, + channels, + quit: quit_tx, + }) + } +} + +impl Drop for PwAudioCapturer { + fn drop(&mut self) { + // Ask the loop thread to quit; the stream/core/loop unwind there (RAII). A failed + // send means the thread already exited — nothing to tear down. + let _ = self.quit.send(Terminate); } } @@ -40,12 +69,47 @@ impl AudioCapturer for PwAudioCapturer { } } + fn channels(&self) -> u32 { + self.channels + } + fn drain(&mut self) { while self.chunks.try_recv().is_ok() {} } } -fn pw_thread(tx: std::sync::mpsc::SyncSender>) -> Result<()> { +/// SPA channel position array for the GameStream surround order FL FR FC LFE RL RR [SL SR] +/// (= the PipeWire/PulseAudio default map for 6/8 channels, and the order Moonlight's +/// renderers expect — moonlight-common-c: "we use FL FR C LFE RL RR SL SR"). Values are +/// `enum spa_audio_channel` (spa/param/audio/raw.h): FL=3 FR=4 FC=5 LFE=6 SL=7 SR=8 RL=12 +/// RR=13. +fn spa_positions(channels: u32) -> [u32; 64] { + const FL: u32 = 3; + const FR: u32 = 4; + const FC: u32 = 5; + const LFE: u32 = 6; + const SL: u32 = 7; + const SR: u32 = 8; + const RL: u32 = 12; + const RR: u32 = 13; + const MONO: u32 = 2; + let mut pos = [0u32; 64]; + let order: &[u32] = match channels { + 1 => &[MONO], + 2 => &[FL, FR], + 6 => &[FL, FR, FC, LFE, RL, RR], + 8 => &[FL, FR, FC, LFE, RL, RR, SL, SR], + _ => unreachable!("validated in open()"), + }; + pos[..order.len()].copy_from_slice(order); + pos +} + +fn pw_thread( + tx: std::sync::mpsc::SyncSender>, + quit_rx: pipewire::channel::Receiver, + channels: u32, +) -> Result<()> { use pipewire as pw; use pw::{properties::properties, spa}; use spa::param::audio::{AudioFormat, AudioInfoRaw}; @@ -58,6 +122,12 @@ fn pw_thread(tx: std::sync::mpsc::SyncSender>) -> Result<()> { .connect_rc(None) .context("pw audio connect (is PipeWire running in this session?)")?; + // Cross-thread teardown: the capturer's Drop sends Terminate; quit the loop here. + let _quit_guard = quit_rx.attach(mainloop.loop_(), { + let mainloop = mainloop.clone(); + move |_| mainloop.quit() + }); + let stream = pw::stream::StreamBox::new( &core, "punktfunk-audio", @@ -118,7 +188,7 @@ fn pw_thread(tx: std::sync::mpsc::SyncSender>) -> Result<()> { static FIRST: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(true); if FIRST.swap(false, std::sync::atomic::Ordering::Relaxed) { - tracing::info!(samples = n, frames = n / 2, "audio first capture buffer"); + tracing::info!(samples = n, "audio first capture buffer"); } let mut samples = Vec::with_capacity(n); for i in 0..n { @@ -139,11 +209,13 @@ fn pw_thread(tx: std::sync::mpsc::SyncSender>) -> Result<()> { .register() .context("register audio stream listener")?; - // Request F32LE, 48 kHz, stereo. + // Request F32LE, 48 kHz, at the session's channel count with explicit positions — + // PipeWire's channel-mixer up/downmixes the sink monitor to this layout. let mut info = AudioInfoRaw::new(); info.set_format(AudioFormat::F32LE); info.set_rate(SAMPLE_RATE); - info.set_channels(CHANNELS as u32); + info.set_channels(channels); + info.set_position(spa_positions(channels)); let obj = pw::spa::pod::Object { type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(), id: pw::spa::param::ParamType::EnumFormat.as_raw(), @@ -168,5 +240,6 @@ fn pw_thread(tx: std::sync::mpsc::SyncSender>) -> Result<()> { .context("pw audio stream connect")?; mainloop.run(); + tracing::debug!("pipewire audio loop exited (capturer dropped)"); Ok(()) } diff --git a/crates/punktfunk-host/src/gamestream/audio.rs b/crates/punktfunk-host/src/gamestream/audio.rs index b408562..09e2d7f 100644 --- a/crates/punktfunk-host/src/gamestream/audio.rs +++ b/crates/punktfunk-host/src/gamestream/audio.rs @@ -1,47 +1,276 @@ //! The audio data plane (UDP 48000). On RTSP PLAY we learn the client's audio endpoint from -//! its port-learning ping, capture the default-sink monitor, Opus-encode 5 ms stereo frames, -//! and send each as a GameStream RTP audio packet. +//! its port-learning ping, capture the default-sink monitor at the negotiated channel count, +//! Opus-encode fixed frames (stereo: plain Opus; 5.1/7.1: libopus *multistream*), and send +//! each as a GameStream RTP audio packet. //! -//! Wire format (moonlight-common-c `AudioStream.c`): a 12-byte big-endian `RTP_PACKET` -//! (`packetType = 97`, `sequenceNumber++`, `timestamp += packetDuration`, `ssrc = 0`) -//! followed by the AES-128-CBC-encrypted Opus payload. Stereo Opus is a single coupled -//! multistream, so a plain `opus_encode` bitstream is what the client's multistream decoder -//! expects. Like the control stream, modern Moonlight always AES-CBC-decrypts audio (it -//! reports "Failed to decrypt audio packet" on plaintext), so we encrypt the payload under the -//! `/launch` `rikey` with a per-packet IV `BE32(rikeyid + seq)` (PKCS7 padding, RTP header -//! left in the clear). Reed-Solomon audio FEC is layered on top in P1.5. +//! Wire format (moonlight-common-c `AudioStream.c`/`RtpAudioQueue.c`, verified verbatim +//! 2026-06-10): a 12-byte big-endian `RTP_PACKET` (`packetType = 97`, `sequenceNumber++`, +//! `timestamp += packetDuration`, `ssrc = 0`) followed by the AES-128-CBC-encrypted Opus +//! payload. Like the control stream, modern Moonlight always AES-CBC-decrypts audio (it +//! reports "Failed to decrypt audio packet" on plaintext), so we encrypt the payload under +//! the `/launch` `rikey` with a per-packet IV `BE32(rikeyid + seq)` (PKCS7 padding, RTP +//! header left in the clear). +//! +//! Surround sessions additionally carry Sunshine-style audio FEC: every aligned block of 4 +//! data packets is followed by 2 Reed–Solomon parity packets (`packetType = 127`, an +//! `AUDIO_FEC_HEADER` after the RTP header). FEC is opportunistic on the client — in-order +//! data packets are consumed immediately and missing parity only costs loss recovery — so +//! the validated stereo path stays byte-identical (data packets only, exactly as before). -use super::AUDIO_PORT; -use crate::audio::{self, AudioCapturer, CHANNELS, SAMPLE_RATE}; -use anyhow::{Context, Result}; -use cbc::cipher::{block_padding::Pkcs7, BlockEncryptMut, KeyIvInit}; -use opus::{Application, Bitrate, Channels, Encoder}; -use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::time::{Duration, Instant}; +use crate::audio::SAMPLE_RATE; +#[cfg(target_os = "linux")] +use { + super::AUDIO_PORT, + crate::audio::{self, AudioCapturer}, + anyhow::{Context, Result}, + cbc::cipher::{block_padding::Pkcs7, BlockEncryptMut, KeyIvInit}, + std::net::UdpSocket, + std::sync::atomic::{AtomicBool, Ordering}, + std::sync::Arc, + std::time::{Duration, Instant}, +}; +#[cfg(target_os = "linux")] type Aes128CbcEnc = cbc::Encryptor; -/// Opus frame duration; 5 ms is moonlight's default (`x-nv-aqos.packetDuration`). -const FRAME_MS: usize = 5; -/// Samples per channel per Opus frame (48 kHz · 5 ms = 240). -const SAMPLES_PER_FRAME: usize = SAMPLE_RATE as usize * FRAME_MS / 1000; -/// RTP payload type for audio (moonlight `AudioStream.c` checks `packetType == 97`). +/// RTP payload types (moonlight-common-c `RtpAudioQueue.c`: `RTP_PAYLOAD_TYPE_AUDIO 97`, +/// `RTP_PAYLOAD_TYPE_FEC 127`). const AUDIO_PACKET_TYPE: u8 = 97; +const AUDIO_FEC_PACKET_TYPE: u8 = 127; +/// Stereo Opus bitrate (unchanged from the live-validated stereo path). const OPUS_BITRATE: i32 = 128_000; -/// Slot for the persistent audio capturer, reused across streams (no leaked PipeWire thread). +/// Audio FEC geometry (moonlight-common-c `RtpAudioQueue.h`: `RTPA_DATA_SHARDS 4`, +/// `RTPA_FEC_SHARDS 2`). Blocks are aligned: the client synthesizes the block base as +/// `(seq / 4) * 4`, so parity always covers data seqs `[base, base+4)`. +const FEC_DATA_SHARDS: usize = 4; +const FEC_PARITY_SHARDS: usize = 2; +/// The RS(4,2) parity rows both ends hardcode (Sunshine `stream.cpp` and moonlight-common-c +/// `RtpAudioQueue.c` patch their nanors context with these same bytes — the OpenFEC matrix +/// NVIDIA used, NOT nanors' own Cauchy matrix). nanors `reed_solomon_encode` (gemm over the +/// row-major `m×k` matrix, GF(2⁸) poly 0x11d) computes +/// `parity[j] = XOR_i gfmul(M[j][i], data[i])` — replicated in [`audio_parity`]. +const FEC_MATRIX: [[u8; FEC_DATA_SHARDS]; FEC_PARITY_SHARDS] = + [[0x77, 0x40, 0x38, 0x0e], [0xc7, 0xa7, 0x0d, 0x6c]]; + +/// Per-session audio parameters from the RTSP ANNOUNCE (`x-nv-audio.surround.numChannels`, +/// `x-nv-audio.surround.AudioQuality`, `x-nv-aqos.packetDuration` — the attribute set +/// moonlight-common-c `SdpGenerator.c` emits). Defaults match Moonlight's: stereo, normal +/// quality, 5 ms. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct AudioParams { + /// Negotiated channel count: 2, 6 (5.1) or 8 (7.1). + pub channels: u8, + /// `AudioQuality == 1` — uncoupled high-bitrate multistream (client opted in, only + /// offered when our DESCRIBE advertises a second surround-params line). + pub high_quality: bool, + /// Opus frame duration in ms; Moonlight sends 5 (default) or 10 (slow decoder/link). + pub packet_duration_ms: u8, +} + +impl Default for AudioParams { + fn default() -> Self { + AudioParams { + channels: 2, + high_quality: false, + packet_duration_ms: 5, + } + } +} + +/// One Opus (multi)stream layout. Channel order is the GameStream/Moonlight order +/// FL FR FC LFE RL RR [SL SR]; `mapping` is the libopus multistream mapping we *encode* +/// with — identical to Sunshine's `audio.cpp stream_configs` (verified verbatim 2026-06-10): +/// identity mapping, so normal quality couples (FL,FR) and (FC,LFE) [+ (RL,RR) on 7.1] with +/// the remaining channels as mono streams; high quality is one mono stream per channel. +/// Bitrates are Sunshine's per-config values (stereo keeps punktfunk's existing 128 kbps). +pub struct OpusLayout { + pub channels: u8, + pub streams: u8, + pub coupled: u8, + pub mapping: &'static [u8], + pub bitrate: i32, +} + +pub const LAYOUT_STEREO: OpusLayout = OpusLayout { + channels: 2, + streams: 1, + coupled: 1, + mapping: &[0, 1], + bitrate: OPUS_BITRATE, +}; +pub const LAYOUT_51: OpusLayout = OpusLayout { + channels: 6, + streams: 4, + coupled: 2, + mapping: &[0, 1, 2, 3, 4, 5], + bitrate: 256_000, +}; +pub const LAYOUT_51_HQ: OpusLayout = OpusLayout { + channels: 6, + streams: 6, + coupled: 0, + mapping: &[0, 1, 2, 3, 4, 5], + bitrate: 1_536_000, +}; +pub const LAYOUT_71: OpusLayout = OpusLayout { + channels: 8, + streams: 5, + coupled: 3, + mapping: &[0, 1, 2, 3, 4, 5, 6, 7], + bitrate: 450_000, +}; +pub const LAYOUT_71_HQ: OpusLayout = OpusLayout { + channels: 8, + streams: 8, + coupled: 0, + mapping: &[0, 1, 2, 3, 4, 5, 6, 7], + bitrate: 2_048_000, +}; + +/// Pick the encoder layout for the negotiated session parameters. Unknown channel counts +/// fall back to stereo (the client can only request 2/6/8 — `AUDIO_CONFIGURATION_*` in +/// Limelight.h). +pub fn layout_for(params: &AudioParams) -> &'static OpusLayout { + match (params.channels, params.high_quality) { + (6, false) => &LAYOUT_51, + (6, true) => &LAYOUT_51_HQ, + (8, false) => &LAYOUT_71, + (8, true) => &LAYOUT_71_HQ, + _ => &LAYOUT_STEREO, + } +} + +/// The `a=fmtp:97 surround-params=` digit string for a layout: channelCount, streams, +/// coupledStreams, then one mapping digit per channel. +/// +/// moonlight-common-c (`RtspConnection.c parseOpusConfigurations`, verified verbatim +/// 2026-06-10) post-processes the NORMAL-quality mapping it parses — GFE advertised the +/// FL FR C RL RR SL SR LFE channel order, the client wants FL FR C LFE RL RR SL SR — by +/// moving the *last* digit to index 3 and sliding the rest up. We therefore pre-rotate the +/// encoder mapping (`adv[3..ch-1] = enc[4..ch]`, `adv[ch-1] = enc[3]`) so the client's +/// post-swap decoder mapping equals our encoder mapping exactly. The HIGH-quality string +/// (the second surround-params match for a channel count) is used verbatim — no swap. +/// +/// NB: Sunshine pre-rotates only indices `[3, 6)` (`audio::MAX_STREAM_CONFIG`, a config +/// count, not a channel index), which leaves 7.1 LFE/SL/SR scrambled after the client's +/// swap; we rotate over `[3, channels)` so 7.1 round-trips correctly. +pub fn surround_params(layout: &OpusLayout, high_quality: bool) -> String { + let ch = layout.channels as usize; + let mut mapping = layout.mapping.to_vec(); + if !high_quality && ch > 2 { + mapping[3..ch - 1].copy_from_slice(&layout.mapping[4..ch]); + mapping[ch - 1] = layout.mapping[3]; + } + let mut s = format!("{}{}{}", layout.channels, layout.streams, layout.coupled); + for m in mapping { + s.push((b'0' + m) as char); + } + s +} + +/// GF(2⁸) multiply, reduction poly 0x11d (the nanors/oblas field both wire ends use). +fn gf_mul(mut a: u8, mut b: u8) -> u8 { + let mut p = 0u8; + for _ in 0..8 { + if b & 1 != 0 { + p ^= a; + } + let hi = a & 0x80; + a <<= 1; + if hi != 0 { + a ^= 0x1d; + } + b >>= 1; + } + p +} + +/// RS(4,2) parity over one aligned block of 4 encrypted payloads, exactly as nanors +/// `reed_solomon_encode` with the patched [`FEC_MATRIX`] computes it. Returns `None` if the +/// shard lengths differ — impossible under hard-CBR Opus + PKCS7 of equal input, but FEC is +/// opportunistic so skipping a block is always safe (the client just loses recovery for it). +fn audio_parity(data: &[Vec]) -> Option>> { + debug_assert_eq!(data.len(), FEC_DATA_SHARDS); + let len = data[0].len(); + if data.iter().any(|d| d.len() != len) { + return None; + } + let mut parity = vec![vec![0u8; len]; FEC_PARITY_SHARDS]; + for (j, row) in FEC_MATRIX.iter().enumerate() { + for (i, shard) in data.iter().enumerate() { + let coef = row[i]; + for (p, &d) in parity[j].iter_mut().zip(shard.iter()) { + *p ^= gf_mul(coef, d); + } + } + } + Some(parity) +} + +/// Build a GameStream RTP audio data packet: 12-byte BE `RTP_PACKET` header + Opus payload. +fn build_rtp(seq: u16, timestamp: u32, opus: &[u8]) -> Vec { + let mut p = Vec::with_capacity(12 + opus.len()); + p.push(0x80); // RTP version 2, no padding/extension/CSRC + p.push(AUDIO_PACKET_TYPE); + p.extend_from_slice(&seq.to_be_bytes()); + p.extend_from_slice(×tamp.to_be_bytes()); + p.extend_from_slice(&0u32.to_be_bytes()); // ssrc + p.extend_from_slice(opus); + p +} + +/// Build a GameStream audio FEC packet: 12-byte RTP header (`packetType = 127`, +/// `timestamp = 0`, like Sunshine) + 12-byte `AUDIO_FEC_HEADER { fecShardIndex u8, +/// payloadType = 97, baseSequenceNumber BE u16, baseTimestamp BE u32, ssrc = 0 }` +/// (moonlight-common-c `RtpAudioQueue.h`) + the parity bytes. +fn build_fec_rtp( + rtp_seq: u16, + shard_index: u8, + base_seq: u16, + base_ts: u32, + parity: &[u8], +) -> Vec { + let mut p = Vec::with_capacity(24 + parity.len()); + p.push(0x80); + p.push(AUDIO_FEC_PACKET_TYPE); + p.extend_from_slice(&rtp_seq.to_be_bytes()); + p.extend_from_slice(&0u32.to_be_bytes()); // timestamp (Sunshine leaves it 0) + p.extend_from_slice(&0u32.to_be_bytes()); // ssrc + p.push(shard_index); + p.push(AUDIO_PACKET_TYPE); // fecHeader.payloadType — stamped onto recovered packets + p.extend_from_slice(&base_seq.to_be_bytes()); + p.extend_from_slice(&base_ts.to_be_bytes()); + p.extend_from_slice(&0u32.to_be_bytes()); // fecHeader.ssrc + p.extend_from_slice(parity); + p +} + +/// Slot for the persistent audio capturer, reused across streams (no leaked PipeWire +/// thread). A surround session that needs a different channel count drops the cached +/// capturer (clean RAII teardown) and opens a fresh one. +#[cfg(target_os = "linux")] pub type AudioCapSlot = Arc>>>; +#[cfg(not(target_os = "linux"))] +pub type AudioCapSlot = + std::sync::Arc>>>; /// Spawn the audio stream thread (idempotent via `running`). Stops when `running` clears. -/// `gcm_key`/`rikeyid` come from `/launch` and key the AES-CBC payload encryption. -pub fn start(running: Arc, gcm_key: [u8; 16], rikeyid: i32, audio_cap: AudioCapSlot) { +/// `gcm_key`/`rikeyid` come from `/launch` and key the AES-CBC payload encryption; +/// `params` is the negotiated [`AudioParams`] from the RTSP ANNOUNCE. +#[cfg(target_os = "linux")] +pub fn start( + running: Arc, + gcm_key: [u8; 16], + rikeyid: i32, + params: AudioParams, + audio_cap: AudioCapSlot, +) { let _ = std::thread::Builder::new() .name("punktfunk-audio".into()) .spawn(move || { - tracing::info!("audio stream starting"); - if let Err(e) = run(&running, &gcm_key, rikeyid, &audio_cap) { + tracing::info!(?params, "audio stream starting"); + if let Err(e) = run(&running, &gcm_key, rikeyid, params, &audio_cap) { tracing::error!(error = %format!("{e:#}"), "audio stream failed"); } running.store(false, Ordering::SeqCst); @@ -49,10 +278,27 @@ pub fn start(running: Arc, gcm_key: [u8; 16], rikeyid: i32, audio_ca }); } +/// Stub — the audio plane needs Linux (PipeWire capture + libopus); this keeps non-Linux +/// dev builds compiling (crate doc: "the crate compiles everywhere"). Reports failure the +/// same way the real stream thread does: by clearing `running`. +#[cfg(not(target_os = "linux"))] +pub fn start( + running: std::sync::Arc, + _gcm_key: [u8; 16], + _rikeyid: i32, + _params: AudioParams, + _audio_cap: AudioCapSlot, +) { + tracing::error!("GameStream audio requires Linux (PipeWire + libopus)"); + running.store(false, std::sync::atomic::Ordering::SeqCst); +} + +#[cfg(target_os = "linux")] fn run( running: &AtomicBool, gcm_key: &[u8; 16], rikeyid: i32, + params: AudioParams, audio_cap: &std::sync::Mutex>>, ) -> Result<()> { let sock = UdpSocket::bind(("0.0.0.0", AUDIO_PORT)).context("bind audio UDP")?; @@ -67,42 +313,176 @@ fn run( .context("connect client audio endpoint")?; tracing::info!(%client, "audio: client endpoint learned"); - // Reuse the persistent capturer (create on first stream); drain stale buffered audio. + // Reuse the persistent capturer when its channel count still matches (drain stale + // buffered audio); otherwise drop it (clean PipeWire teardown) and open at the new count. + let want = layout_for(¶ms).channels as u32; let mut cap = match audio_cap.lock().unwrap().take() { - Some(mut c) => { + Some(mut c) if c.channels() == want => { c.drain(); c } - None => audio::open_audio_capture().context("open audio capture")?, + Some(c) => { + tracing::info!( + have = c.channels(), + want, + "audio capturer channel count changed — reopening" + ); + drop(c); + audio::open_audio_capture(want).context("open audio capture")? + } + None => audio::open_audio_capture(want).context("open audio capture")?, }; - let result = audio_body(&mut *cap, &sock, gcm_key, rikeyid, running); + let result = audio_body(&mut *cap, &sock, gcm_key, rikeyid, params, running); *audio_cap.lock().unwrap() = Some(cap); result } +/// Opus encoder for one session: the plain stereo encoder (the live-validated path, byte +/// identical) or a libopus multistream encoder for 5.1/7.1. +#[cfg(target_os = "linux")] +enum SessionEncoder { + Stereo(opus::Encoder), + Surround(MsEncoder), +} + +#[cfg(target_os = "linux")] +impl SessionEncoder { + fn new(layout: &'static OpusLayout) -> Result { + if layout.channels == 2 { + // RESTRICTED_LOWDELAY + CBR, matching Sunshine — CBR keeps the Opus TOC byte + // constant, which the client asserts per stream. + let mut enc = opus::Encoder::new( + SAMPLE_RATE, + opus::Channels::Stereo, + opus::Application::LowDelay, + ) + .context("create Opus encoder")?; + enc.set_bitrate(opus::Bitrate::Bits(layout.bitrate)).ok(); + enc.set_vbr(false).ok(); + Ok(SessionEncoder::Stereo(enc)) + } else { + Ok(SessionEncoder::Surround(MsEncoder::new(layout)?)) + } + } + + /// Encode one interleaved frame (`samples_per_channel * channels` f32s) into `out`, + /// returning the packet length. + fn encode_float( + &mut self, + frame: &[f32], + samples_per_channel: usize, + out: &mut [u8], + ) -> Result { + match self { + SessionEncoder::Stereo(enc) => enc.encode_float(frame, out).context("opus encode"), + SessionEncoder::Surround(enc) => enc.encode_float(frame, samples_per_channel, out), + } + } +} + +/// RAII wrapper for `OpusMSEncoder` (the safe `opus` crate is stereo-only; the multistream +/// API comes from `audiopus_sys`, the same libopus the crate already links). Configured like +/// the stereo path: RESTRICTED_LOWDELAY, hard CBR, per-layout bitrate. +#[cfg(target_os = "linux")] +struct MsEncoder { + st: std::ptr::NonNull, +} + +// The raw encoder state has no thread affinity; the session owns it on one thread at a time. +#[cfg(target_os = "linux")] +unsafe impl Send for MsEncoder {} + +#[cfg(target_os = "linux")] +impl MsEncoder { + fn new(layout: &OpusLayout) -> Result { + use std::os::raw::c_int; + let mut err: c_int = 0; + let st = unsafe { + audiopus_sys::opus_multistream_encoder_create( + SAMPLE_RATE as i32, + layout.channels as c_int, + layout.streams as c_int, + layout.coupled as c_int, + layout.mapping.as_ptr(), + audiopus_sys::OPUS_APPLICATION_RESTRICTED_LOWDELAY, + &mut err, + ) + }; + let st = std::ptr::NonNull::new(st) + .filter(|_| err == audiopus_sys::OPUS_OK) + .ok_or_else(|| anyhow::anyhow!("opus_multistream_encoder_create failed ({err})"))?; + unsafe { + audiopus_sys::opus_multistream_encoder_ctl( + st.as_ptr(), + audiopus_sys::OPUS_SET_BITRATE_REQUEST, + layout.bitrate as c_int, + ); + audiopus_sys::opus_multistream_encoder_ctl( + st.as_ptr(), + audiopus_sys::OPUS_SET_VBR_REQUEST, + 0 as c_int, // hard CBR (constant packet size — also what audio FEC relies on) + ); + } + Ok(MsEncoder { st }) + } + + fn encode_float( + &mut self, + frame: &[f32], + samples_per_channel: usize, + out: &mut [u8], + ) -> Result { + let n = unsafe { + audiopus_sys::opus_multistream_encode_float( + self.st.as_ptr(), + frame.as_ptr(), + samples_per_channel as std::os::raw::c_int, + out.as_mut_ptr(), + out.len() as i32, + ) + }; + anyhow::ensure!(n > 0, "opus_multistream_encode_float failed ({n})"); + Ok(n as usize) + } +} + +#[cfg(target_os = "linux")] +impl Drop for MsEncoder { + fn drop(&mut self) { + unsafe { audiopus_sys::opus_multistream_encoder_destroy(self.st.as_ptr()) } + } +} + +#[cfg(target_os = "linux")] fn audio_body( cap: &mut dyn AudioCapturer, sock: &UdpSocket, gcm_key: &[u8; 16], rikeyid: i32, + params: AudioParams, running: &AtomicBool, ) -> Result<()> { - // RESTRICTED_LOWDELAY + CBR, matching Sunshine — CBR keeps the Opus TOC byte constant, - // which the client asserts per stream. - let mut enc = Encoder::new(SAMPLE_RATE, Channels::Stereo, Application::LowDelay) - .context("create Opus encoder")?; - enc.set_bitrate(Bitrate::Bits(OPUS_BITRATE)).ok(); - enc.set_vbr(false).ok(); - - let frame_len = SAMPLES_PER_FRAME * CHANNELS; // interleaved samples per Opus frame + let layout = layout_for(¶ms); + let mut enc = SessionEncoder::new(layout)?; + // Opus frame duration; Moonlight negotiates 5 ms (default) or 10 ms via + // `x-nv-aqos.packetDuration` and sizes its decoder at `48 * duration` samples. + let frame_ms = params.packet_duration_ms.clamp(5, 10) as usize; + let samples_per_channel = SAMPLE_RATE as usize * frame_ms / 1000; + let frame_len = samples_per_channel * layout.channels as usize; // interleaved samples let mut acc: Vec = Vec::with_capacity(frame_len * 4); let mut out = vec![0u8; 1400]; let mut seq: u16 = 0; let mut timestamp: u32 = 0; let mut sent: u64 = 0; + // Surround sessions carry RS(4,2) FEC; the stereo wire stays exactly as validated. + let fec = layout.channels > 2; + let mut fec_block: Vec> = Vec::with_capacity(FEC_DATA_SHARDS); + let (mut fec_base_seq, mut fec_base_ts) = (0u16, 0u32); + let mut fec_skipped = false; // Pacing anchor: PipeWire hands us large capture buffers (~1024 frames), so we'd otherwise // emit packets in bursts the client's low-latency jitter buffer hears as glitching. Emit - // each frame at its 5 ms slot instead. Production is real-time, so the backlog stays small. + // each frame at its packet-duration slot instead. Production is real-time, so the backlog + // stays small. let start = Instant::now(); let mut frame_no: u64 = 0; // Optional linear gain for quiet capture sources (PUNKTFUNK_AUDIO_GAIN, default 1.0). @@ -110,6 +490,15 @@ fn audio_body( .ok() .and_then(|v| v.parse().ok()) .unwrap_or(1.0); + tracing::info!( + channels = layout.channels, + streams = layout.streams, + coupled = layout.coupled, + bitrate = layout.bitrate, + frame_ms, + fec, + "audio: encoder configured" + ); while running.load(Ordering::SeqCst) { let chunk = cap.next_chunk().context("capture audio chunk")?; @@ -121,7 +510,7 @@ fn audio_body( *s = (*s * gain).clamp(-1.0, 1.0); } } - let n = enc.encode_float(&frame, &mut out).context("opus encode")?; + let n = enc.encode_float(&frame, samples_per_channel, &mut out)?; // AES-128-CBC the Opus payload (RTP header stays plaintext). Per-packet IV = // BE32(rikeyid + seq) in [0..4], zero elsewhere; PKCS7 padding. let iv_seq = (rikeyid as u32).wrapping_add(seq as u32); @@ -134,17 +523,51 @@ fn audio_body( tracing::info!(sent, "audio: client unreachable — stopping"); return Ok(()); } + // Surround FEC: accumulate the encrypted payloads of the aligned 4-packet block; + // close the block with 2 parity packets (RTP seqs continue past the block, like + // Sunshine — the client places parity by the FEC header, not the RTP seq). + if fec { + if seq % FEC_DATA_SHARDS as u16 == 0 { + fec_block.clear(); + fec_base_seq = seq; + fec_base_ts = timestamp; + } + fec_block.push(ct); + if fec_block.len() == FEC_DATA_SHARDS { + match audio_parity(&fec_block) { + Some(parity) => { + for (x, par) in parity.iter().enumerate() { + let rtp_seq = + fec_base_seq.wrapping_add((FEC_DATA_SHARDS + x) as u16); + let fp = + build_fec_rtp(rtp_seq, x as u8, fec_base_seq, fec_base_ts, par); + if sock.send(&fp).is_err() { + tracing::info!(sent, "audio: client unreachable — stopping"); + return Ok(()); + } + } + } + None if !fec_skipped => { + // Shouldn't happen under hard CBR; log once and keep streaming. + tracing::warn!("audio: unequal packet sizes — FEC block skipped"); + fec_skipped = true; + } + None => {} + } + fec_block.clear(); + } + } seq = seq.wrapping_add(1); // GameStream's audio RTP timestamp ticks by packetDuration (ms), not by samples. - timestamp = timestamp.wrapping_add(FRAME_MS as u32); + timestamp = timestamp.wrapping_add(frame_ms as u32); sent += 1; if sent % 400 == 0 { tracing::info!(sent, "audio: streaming"); } - // Hold each frame to its 5 ms slot (skip if we've fallen behind a burst). + // Hold each frame to its packet-duration slot (skip if we've fallen behind a burst). frame_no += 1; - let scheduled = start + Duration::from_millis(5 * frame_no); + let scheduled = start + Duration::from_millis(frame_ms as u64 * frame_no); let now = Instant::now(); if scheduled > now { std::thread::sleep((scheduled - now).min(Duration::from_millis(20))); @@ -154,18 +577,6 @@ fn audio_body( Ok(()) } -/// Build a GameStream RTP audio packet: 12-byte BE `RTP_PACKET` header + Opus payload. -fn build_rtp(seq: u16, timestamp: u32, opus: &[u8]) -> Vec { - let mut p = Vec::with_capacity(12 + opus.len()); - p.push(0x80); // RTP version 2, no padding/extension/CSRC - p.push(AUDIO_PACKET_TYPE); - p.extend_from_slice(&seq.to_be_bytes()); - p.extend_from_slice(×tamp.to_be_bytes()); - p.extend_from_slice(&0u32.to_be_bytes()); // ssrc - p.extend_from_slice(opus); - p -} - #[cfg(test)] mod tests { use super::*; @@ -183,6 +594,261 @@ mod tests { #[test] fn frame_sizing() { - assert_eq!(SAMPLES_PER_FRAME, 240); + // 48 kHz · 5 ms = 240 samples/channel (the validated stereo default). + assert_eq!(SAMPLE_RATE as usize * 5 / 1000, 240); + assert_eq!(SAMPLE_RATE as usize * 10 / 1000, 480); + } + + /// FEC datagram layout: RTP(12, packetType 127) + AUDIO_FEC_HEADER(12) + parity. + #[test] + fn fec_packet_layout() { + let p = build_fec_rtp(0x1234, 1, 0x1230, 0xAABBCCDD, &[0xEE, 0xFF]); + assert_eq!(p[0], 0x80); + assert_eq!(p[1], 127); // RTP_PAYLOAD_TYPE_FEC + assert_eq!(&p[2..4], &[0x12, 0x34]); // RTP seq BE + assert_eq!(&p[4..8], &[0, 0, 0, 0]); // RTP timestamp (Sunshine: 0) + assert_eq!(&p[8..12], &[0, 0, 0, 0]); // RTP ssrc + assert_eq!(p[12], 1); // fecShardIndex + assert_eq!(p[13], 97); // fecHeader.payloadType (stamped on recovered packets) + assert_eq!(&p[14..16], &[0x12, 0x30]); // baseSequenceNumber BE + assert_eq!(&p[16..20], &[0xAA, 0xBB, 0xCC, 0xDD]); // baseTimestamp BE + assert_eq!(&p[20..24], &[0, 0, 0, 0]); // fecHeader.ssrc + assert_eq!(&p[24..], &[0xEE, 0xFF]); // parity payload + } + + /// The advertised surround-params strings, locked: any change breaks what stock + /// Moonlight clients derive their decoder mapping from. + #[test] + fn surround_params_strings() { + assert_eq!(surround_params(&LAYOUT_STEREO, false), "21101"); + assert_eq!(surround_params(&LAYOUT_51, false), "642012453"); + assert_eq!(surround_params(&LAYOUT_51_HQ, true), "660012345"); + assert_eq!(surround_params(&LAYOUT_71, false), "85301245673"); + assert_eq!(surround_params(&LAYOUT_71_HQ, true), "88001234567"); + } + + /// moonlight-common-c's normal-quality mapping swap (RtspConnection.c: `mapping[3] = + /// old[ch-1]; mapping[4..] = old[3..ch-1]`), replicated byte-for-byte. + fn client_swap(adv: &[u8]) -> Vec { + let ch = adv.len(); + let mut m = adv.to_vec(); + m[3] = adv[ch - 1]; + m[4..ch].copy_from_slice(&adv[3..ch - 1]); + m + } + + /// Protocol math, end to end: the mapping a stock client computes from our advertised + /// surround-params must equal the mapping we encode with — for the normal-quality + /// layouts after the client's GFE-order swap, for HQ layouts verbatim. + #[test] + fn client_derived_mapping_matches_encoder() { + for (layout, hq) in [ + (&LAYOUT_51, false), + (&LAYOUT_51_HQ, true), + (&LAYOUT_71, false), + (&LAYOUT_71_HQ, true), + ] { + let s = surround_params(layout, hq); + let digits: Vec = s.bytes().map(|b| b - b'0').collect(); + assert_eq!(digits[0], layout.channels); + assert_eq!(digits[1], layout.streams); + assert_eq!(digits[2], layout.coupled); + let adv = &digits[3..]; + let client = if hq { adv.to_vec() } else { client_swap(adv) }; + assert_eq!( + client, layout.mapping, + "layout {}ch hq={hq}", + layout.channels + ); + } + } + + /// GF(2⁸) inverse by brute force (test-only). + fn gf_inv(a: u8) -> u8 { + (1..=255u8).find(|&b| gf_mul(a, b) == 1).unwrap() + } + + /// Self-consistency of the RS(4,2) code: erase any 2 data shards, solve the remaining + /// 2×2 system over GF(2⁸) with the same matrix, and recover the originals. (Matrix bytes + /// and gemm semantics are from moonlight-common-c `RtpAudioQueue.c` and `nanors/rs.c`; + /// this proves our parity is consistent with that generator matrix.) + #[test] + fn fec_parity_recovers_two_losses() { + let data: Vec> = vec![ + vec![0x11, 0x22, 0x33], + vec![0x44, 0x55, 0x66], + vec![0x77, 0x88, 0x99], + vec![0xaa, 0xbb, 0xcc], + ]; + let parity = audio_parity(&data).unwrap(); + for (e0, e1) in [(0usize, 1usize), (1, 3), (0, 3), (2, 3)] { + // parity[j] = sum_i M[j][i] d[i] → with d[e0], d[e1] unknown: + // M[j][e0]·x + M[j][e1]·y = parity[j] ^ sum_{i∉{e0,e1}} M[j][i]·d[i] + let mut rhs = [parity[0].clone(), parity[1].clone()]; + for j in 0..2 { + for (i, d) in data.iter().enumerate() { + if i != e0 && i != e1 { + for (r, &b) in rhs[j].iter_mut().zip(d.iter()) { + *r ^= gf_mul(FEC_MATRIX[j][i], b); + } + } + } + } + // Cramer over GF(2⁸): det = a·d ^ b·c (addition = XOR). + let (a, b) = (FEC_MATRIX[0][e0], FEC_MATRIX[0][e1]); + let (c, d) = (FEC_MATRIX[1][e0], FEC_MATRIX[1][e1]); + let det = gf_mul(a, d) ^ gf_mul(b, c); + assert_ne!(det, 0, "erasures {e0},{e1} must be solvable"); + let det_inv = gf_inv(det); + for k in 0..data[0].len() { + let (r0, r1) = (rhs[0][k], rhs[1][k]); + let x = gf_mul(det_inv, gf_mul(d, r0) ^ gf_mul(b, r1)); + let y = gf_mul(det_inv, gf_mul(c, r0) ^ gf_mul(a, r1)); + assert_eq!(x, data[e0][k], "shard {e0} byte {k}"); + assert_eq!(y, data[e1][k], "shard {e1} byte {k}"); + } + } + } + + /// Unequal shard sizes must skip the block, never panic or emit bogus parity. + #[test] + fn fec_parity_rejects_unequal_shards() { + let data = vec![vec![0u8; 10], vec![0u8; 10], vec![0u8; 9], vec![0u8; 10]]; + assert!(audio_parity(&data).is_none()); + } + + /// Real-codec proof of the 5.1 mapping math: encode with our encoder layout, decode with + /// the mapping a stock Moonlight client derives from our advertised surround-params + /// (parse → GFE swap), and verify a tone fed into each input channel comes out on the + /// same output channel. + #[cfg(target_os = "linux")] + #[test] + fn multistream_51_roundtrip_channel_identity() { + let layout = &LAYOUT_51; + let samples = 240; // 5 ms + let ch = layout.channels as usize; + + // Client-side decoder mapping derived exactly as moonlight-common-c does. + let s = surround_params(layout, false); + let digits: Vec = s.bytes().map(|b| b - b'0').collect(); + let client_mapping = client_swap(&digits[3..]); + + let mut err = 0i32; + let dec = unsafe { + audiopus_sys::opus_multistream_decoder_create( + SAMPLE_RATE as i32, + ch as i32, + layout.streams as i32, + layout.coupled as i32, + client_mapping.as_ptr(), + &mut err, + ) + }; + assert_eq!(err, audiopus_sys::OPUS_OK); + assert!(!dec.is_null()); + + for tone_ch in 0..ch { + let mut enc = MsEncoder::new(layout).unwrap(); + let mut out = vec![0u8; 1400]; + let mut decoded = vec![0f32; samples * ch]; + let mut energy = vec![0f64; ch]; + // A few frames so the codec converges past its startup transient. + for f in 0..8 { + let mut frame = vec![0f32; samples * ch]; + for t in 0..samples { + let phase = (f * samples + t) as f32 * 440.0 * 2.0 * std::f32::consts::PI + / SAMPLE_RATE as f32; + frame[t * ch + tone_ch] = 0.5 * phase.sin(); + } + let n = enc.encode_float(&frame, samples, &mut out).unwrap(); + assert!(n > 0); + let got = unsafe { + audiopus_sys::opus_multistream_decode_float( + dec, + out.as_ptr(), + n as i32, + decoded.as_mut_ptr(), + samples as i32, + 0, + ) + }; + assert_eq!(got as usize, samples); + if f >= 4 { + for t in 0..samples { + for c in 0..ch { + energy[c] += (decoded[t * ch + c] as f64).powi(2); + } + } + } + } + let loudest = (0..ch) + .max_by(|&a, &b| energy[a].total_cmp(&energy[b])) + .unwrap(); + assert_eq!( + loudest, tone_ch, + "tone in input channel {tone_ch} must come out on output channel {tone_ch} \ + (energies: {energy:?})" + ); + } + unsafe { audiopus_sys::opus_multistream_decoder_destroy(dec) }; + } + + /// Live 5.1 capture → multistream encode → decode, against a real PipeWire session. + /// Manual validation (needs `pactl load-module module-null-sink sink_name=pf51 + /// channels=6 rate=48000` as the default sink): + /// cargo test -p punktfunk-host --lib -- --ignored surround_capture + #[cfg(target_os = "linux")] + #[test] + #[ignore] + fn surround_capture_live() { + let mut cap = crate::audio::open_audio_capture(6).expect("open 6ch capture"); + let layout = &LAYOUT_51; + let mut enc = MsEncoder::new(layout).unwrap(); + let mut out = vec![0u8; 1400]; + let mut acc: Vec = Vec::new(); + let frame_len = 240 * 6; + let mut packets = 0; + let mut sizes = std::collections::BTreeSet::new(); + while packets < 100 { + let chunk = cap.next_chunk().expect("capture chunk"); + acc.extend_from_slice(&chunk); + while acc.len() >= frame_len && packets < 100 { + let frame: Vec = acc.drain(..frame_len).collect(); + let n = enc.encode_float(&frame, 240, &mut out).unwrap(); + sizes.insert(n); + packets += 1; + } + } + // Hard CBR: every multistream packet must be the same size (audio FEC relies on it). + assert_eq!(sizes.len(), 1, "CBR sizes: {sizes:?}"); + // And a stock client's decoder must accept them. + let s = surround_params(layout, false); + let digits: Vec = s.bytes().map(|b| b - b'0').collect(); + let client_mapping = client_swap(&digits[3..]); + let mut err = 0i32; + let dec = unsafe { + audiopus_sys::opus_multistream_decoder_create( + 48000, + 6, + layout.streams as i32, + layout.coupled as i32, + client_mapping.as_ptr(), + &mut err, + ) + }; + assert_eq!(err, audiopus_sys::OPUS_OK); + let mut pcm = vec![0f32; 240 * 6]; + let got = unsafe { + audiopus_sys::opus_multistream_decode_float( + dec, + out.as_ptr(), + *sizes.first().unwrap() as i32, + pcm.as_mut_ptr(), + 240, + 0, + ) + }; + unsafe { audiopus_sys::opus_multistream_decoder_destroy(dec) }; + assert_eq!(got, 240); } } diff --git a/crates/punktfunk-host/src/gamestream/mod.rs b/crates/punktfunk-host/src/gamestream/mod.rs index 3094b59..6db65e9 100644 --- a/crates/punktfunk-host/src/gamestream/mod.rs +++ b/crates/punktfunk-host/src/gamestream/mod.rs @@ -7,26 +7,9 @@ //! the media streams follow (see the M2 task list / plan). pub mod apps; -#[cfg(target_os = "linux")] +// Platform-neutral wire/negotiation logic + the Linux capture/encode pipeline (non-Linux +// builds get a stub `start` inside the module). mod audio; -/// Stub — the audio plane needs Linux (PipeWire capture + libopus); this keeps non-Linux -/// dev builds compiling (crate doc: "the crate compiles everywhere"). Reports failure the -/// same way the real stream thread does: by clearing `running`. -#[cfg(not(target_os = "linux"))] -mod audio { - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::{Arc, Mutex}; - - pub fn start( - running: Arc, - _gcm_key: [u8; 16], - _rikeyid: i32, - _audio_cap: Arc>>>, - ) { - tracing::error!("GameStream audio requires Linux (PipeWire + libopus)"); - running.store(false, Ordering::SeqCst); - } -} pub(crate) mod cert; mod control; mod crypto; @@ -57,8 +40,20 @@ pub const AUDIO_PORT: u16 = 48000; /// Advertised host version. Major ≥ 7 tells Moonlight to use SHA-256 for pairing. pub const APP_VERSION: &str = "7.1.431.-1"; pub const GFE_VERSION: &str = "3.23.0.74"; -/// Codec support bitmask: 3=H264, 259=+HEVC, 3843=+AV1 (we encode HEVC/H264/AV1 via NVENC). -pub const SERVER_CODEC_MODE_SUPPORT: u32 = 3843; +/// `ServerCodecModeSupport` flags, from moonlight-common-c `src/Limelight.h` (verified +/// against master, 2026-06-10): SCM_H264 0x1, SCM_HEVC 0x100, SCM_HEVC_MAIN10 0x200, +/// SCM_AV1_MAIN8 0x10000, SCM_AV1_MAIN10 0x20000 (+ 4:4:4 Sunshine extensions we don't do). +pub const SCM_H264: u32 = 0x0000_0001; +pub const SCM_HEVC: u32 = 0x0000_0100; +pub const SCM_HEVC_MAIN10: u32 = 0x0000_0200; +pub const SCM_AV1_MAIN8: u32 = 0x0001_0000; +pub const SCM_AV1_MAIN10: u32 = 0x0002_0000; +/// What we actually encode via NVENC: H.264, HEVC Main, AV1 Main 8-bit (= 65793). The +/// 10-bit flags are deliberately NOT advertised: Moonlight only selects Main10 profiles for +/// HDR streaming, and our capture path is 8-bit SDR BGRx with no HDR metadata plumbing — +/// advertising them would let clients enable an HDR mode we can't deliver. (The previous +/// placeholder 3843 = 0xF03 wrongly claimed HEVC Main10 + 4:4:4 and *no* AV1.) +pub const SERVER_CODEC_MODE_SUPPORT: u32 = SCM_H264 | SCM_HEVC | SCM_AV1_MAIN8; /// Stable host identity + advertised capabilities, shared across control-plane handlers. pub struct Host { @@ -108,6 +103,9 @@ pub struct AppState { pub launch: std::sync::Mutex>, /// Negotiated video config from RTSP ANNOUNCE (consumed by the stream on PLAY). pub stream: std::sync::Mutex>, + /// Negotiated audio parameters from RTSP ANNOUNCE (channels/quality/packet duration); + /// defaults to stereo when a client never ANNOUNCEs them. + pub audio_params: std::sync::Mutex, /// True while the video stream thread is running (also its keep-running flag). pub streaming: std::sync::Arc, /// True while the audio stream thread is running (also its keep-running flag). @@ -119,8 +117,9 @@ pub struct AppState { /// (conflicting) screencast session. The video thread borrows it for the stream's duration /// and returns it; `set_active` gates its cost while idle. pub video_cap: std::sync::Arc>>>, - /// Persistent audio capturer, reused across streams (avoids leaking a PipeWire capture - /// thread per reconnect); drained on reuse so no stale audio is sent. + /// Persistent audio capturer, reused across streams when the channel count still matches + /// (avoids a PipeWire stream setup per reconnect); drained on reuse so no stale audio is + /// sent, dropped + reopened when a session negotiates a different channel count. pub audio_cap: std::sync::Arc>>>, } @@ -135,6 +134,7 @@ impl AppState { paired: std::sync::Mutex::new(load_paired()), launch: std::sync::Mutex::new(None), stream: std::sync::Mutex::new(None), + audio_params: std::sync::Mutex::new(audio::AudioParams::default()), streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), audio_streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), force_idr: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), diff --git a/crates/punktfunk-host/src/gamestream/rtsp.rs b/crates/punktfunk-host/src/gamestream/rtsp.rs index 8808c4f..6322317 100644 --- a/crates/punktfunk-host/src/gamestream/rtsp.rs +++ b/crates/punktfunk-host/src/gamestream/rtsp.rs @@ -160,6 +160,9 @@ fn handle_request(req: &Request, state: &AppState) -> String { } None => tracing::warn!("RTSP ANNOUNCE — missing required video config keys"), } + let ap = audio_params(&map); + tracing::info!(?ap, "RTSP ANNOUNCE — negotiated audio params"); + *state.audio_params.lock().unwrap() = ap; response(&req.cseq, &[], None) } "PLAY" => { @@ -185,8 +188,9 @@ fn handle_request(req: &Request, state: &AppState) -> String { Some(_) => tracing::info!("RTSP PLAY — stream already running"), None => tracing::warn!("RTSP PLAY — no negotiated config (ANNOUNCE missing)"), } - // Audio runs independently (stereo Opus on UDP 48000); it needs the launch key for - // the AES-CBC payload encryption the client expects. + // Audio runs independently (Opus on UDP 48000, stereo or 5.1/7.1 multistream per + // the ANNOUNCE); it needs the launch key for the AES-CBC payload encryption the + // client expects. let launch = *state.launch.lock().unwrap(); if let Some(ls) = launch { if !state.audio_streaming.swap(true, Ordering::SeqCst) { @@ -195,6 +199,7 @@ fn handle_request(req: &Request, state: &AppState) -> String { state.audio_streaming.clone(), ls.gcm_key, ls.rikeyid, + *state.audio_params.lock().unwrap(), state.audio_cap.clone(), ); } @@ -218,19 +223,34 @@ fn handle_request(req: &Request, state: &AppState) -> String { /// (plaintext streams for now; P1.5 adds the negotiated AES paths). fn describe_sdp() -> String { // Line-oriented a=key:value, matching what moonlight-common-c scans for. - [ - "a=x-ss-general.featureFlags:0", - "a=x-ss-general.encryptionSupported:0", - "a=x-ss-general.encryptionRequested:0", - "sprop-parameter-sets=AAAAAU", // HEVC capability indicator - "a=rtpmap:98 AV1/90000", // AV1 capability indicator - // Opus config the client matches by channel count (Sunshine emits one per config): - // surround-params = channelCount, streams, coupledStreams, then the channel mapping. - // The client negotiated stereo, so advertise just that. - "a=fmtp:97 surround-params=21101", // stereo: 2ch, 1 stream, 1 coupled, mapping [0,1] - "", - ] - .join("\r\n") + let mut lines: Vec = vec![ + "a=x-ss-general.featureFlags:0".into(), + "a=x-ss-general.encryptionSupported:0".into(), + "a=x-ss-general.encryptionRequested:0".into(), + "sprop-parameter-sets=AAAAAU".into(), // HEVC capability indicator + "a=rtpmap:98 AV1/90000".into(), // AV1 capability indicator + ]; + // Opus configs, one line per layout (Sunshine's order): the client scans for the FIRST + // `surround-params=` match as its normal-quality decoder config and a + // SECOND match as the high-quality config (which is also what makes it offer HQ at all), + // so normal must precede HQ per channel count. Stereo lines are emitted for parity with + // Sunshine but ignored by 2-channel clients (they hardcode 21101). See + // `audio::surround_params` for the mapping pre-rotation the normal-quality lines carry. + for (layout, hq) in [ + (&audio::LAYOUT_STEREO, false), + (&audio::LAYOUT_STEREO, true), + (&audio::LAYOUT_51, false), + (&audio::LAYOUT_51_HQ, true), + (&audio::LAYOUT_71, false), + (&audio::LAYOUT_71_HQ, true), + ] { + lines.push(format!( + "a=fmtp:97 surround-params={}", + audio::surround_params(layout, hq) + )); + } + lines.push(String::new()); + lines.join("\r\n") } /// Parse an ANNOUNCE SDP body's `a=key:value` lines into a map. @@ -256,11 +276,20 @@ fn stream_config(map: &HashMap) -> Option { .filter(|&f| f > 0) .unwrap_or(60); let bitrate_kbps = parse_u("x-nv-vqos[0].bw.maximumBitrateKbps").unwrap_or(20_000); + // Client codec choice (moonlight-common-c SdpGenerator.c): 0=H264, 1=HEVC, 2=AV1. let codec = match map.get("x-nv-vqos[0].bitStreamFormat").map(|s| s.trim()) { Some("1") => Codec::H265, Some("2") => Codec::Av1, _ => Codec::H264, }; + // 10-bit/HDR request flag. We never advertise the Main10 SCM bits, so a compliant + // client can't ask — if one does anyway, stream 8-bit SDR rather than failing. + if parse_u("x-nv-video[0].dynamicRangeMode").unwrap_or(0) != 0 { + tracing::warn!( + "client requested HDR/10-bit (dynamicRangeMode != 0) — not advertised/supported, \ + streaming 8-bit SDR" + ); + } // Parity floor the client asks for (protects small frames); clamp to a sane max. let min_fec = parse_u("x-nv-vqos[0].fec.minRequiredFecPackets") .unwrap_or(2) @@ -276,6 +305,33 @@ fn stream_config(map: &HashMap) -> Option { }) } +/// Map the negotiated ANNOUNCE keys to the session [`audio::AudioParams`]. Attribute names +/// per moonlight-common-c `SdpGenerator.c` (verified 2026-06-10): the client always emits +/// `x-nv-audio.surround.numChannels`/`channelMask` and `x-nv-aqos.packetDuration`; +/// `x-nv-audio.surround.AudioQuality` is 1 only when it saw our second surround-params line +/// and opted into high-quality surround. Unknown channel counts fall back to stereo. +fn audio_params(map: &HashMap) -> audio::AudioParams { + let parse_u = |k: &str| map.get(k).and_then(|s| s.trim().parse::().ok()); + let requested = parse_u("x-nv-audio.surround.numChannels").unwrap_or(2); + let channels = match requested { + 2 | 6 | 8 => requested as u8, + other => { + tracing::warn!(channels = other, "unsupported channel count — using stereo"); + 2 + } + }; + let high_quality = parse_u("x-nv-audio.surround.AudioQuality") == Some(1); + // Moonlight uses 5 ms (default) or 10 ms (slow decoder / low-bitrate links). + let packet_duration_ms = parse_u("x-nv-aqos.packetDuration") + .map(|d| d.clamp(5, 10) as u8) + .unwrap_or(5); + audio::AudioParams { + channels, + high_quality, + packet_duration_ms, + } +} + /// Extract the stream type from a SETUP URI like `…/streamid=video/0/0`. fn stream_type(uri: &str) -> Option<&str> { let after = uri.split("streamid=").nth(1)?; @@ -316,3 +372,113 @@ fn header_value<'a>(head: &'a str, key_lower: &str) -> Option<&'a str> { (k.trim().eq_ignore_ascii_case(key_lower)).then(|| v.trim_start()) }) } + +#[cfg(test)] +mod tests { + use super::*; + + fn announce(extra: &[(&str, &str)]) -> HashMap { + let mut body = String::from( + "v=0\r\n\ + a=x-nv-video[0].clientViewportWd:1920\r\n\ + a=x-nv-video[0].clientViewportHt:1080\r\n\ + a=x-nv-video[0].packetSize:1392\r\n\ + a=x-nv-video[0].maxFPS:120\r\n\ + a=x-nv-vqos[0].bw.maximumBitrateKbps:40000\r\n", + ); + for (k, v) in extra { + body.push_str(&format!("a={k}:{v}\r\n")); + } + parse_announce(&body) + } + + /// `x-nv-vqos[0].bitStreamFormat` → codec (0=H264, 1=HEVC, 2=AV1; missing = H264). + #[test] + fn announce_codec_selection() { + for (fmt, codec) in [ + (Some("0"), Codec::H264), + (Some("1"), Codec::H265), + (Some("2"), Codec::Av1), + (None, Codec::H264), + ] { + let map = match fmt { + Some(f) => announce(&[("x-nv-vqos[0].bitStreamFormat", f)]), + None => announce(&[]), + }; + let cfg = stream_config(&map).expect("required keys present"); + assert_eq!(cfg.codec, codec, "bitStreamFormat {fmt:?}"); + assert_eq!((cfg.width, cfg.height, cfg.fps), (1920, 1080, 120)); + assert_eq!(cfg.bitrate_kbps, 40_000); + } + } + + /// Missing required video keys → no config (the PLAY handler then refuses to stream). + #[test] + fn announce_missing_required_keys() { + let mut map = announce(&[]); + map.remove("x-nv-video[0].packetSize"); + assert!(stream_config(&map).is_none()); + } + + /// Audio negotiation: numChannels/AudioQuality/packetDuration, with Moonlight defaults. + #[test] + fn announce_audio_params() { + // Stereo defaults when the attributes are absent (and the legacy path). + assert_eq!(audio_params(&announce(&[])), audio::AudioParams::default()); + // 5.1 normal quality at 5 ms. + let ap = audio_params(&announce(&[ + ("x-nv-audio.surround.numChannels", "6"), + ("x-nv-audio.surround.channelMask", "63"), + ("x-nv-audio.surround.AudioQuality", "0"), + ("x-nv-aqos.packetDuration", "5"), + ])); + assert_eq!( + (ap.channels, ap.high_quality, ap.packet_duration_ms), + (6, false, 5) + ); + // 7.1 high quality; 10 ms duration honored. + let ap = audio_params(&announce(&[ + ("x-nv-audio.surround.numChannels", "8"), + ("x-nv-audio.surround.AudioQuality", "1"), + ("x-nv-aqos.packetDuration", "10"), + ])); + assert_eq!( + (ap.channels, ap.high_quality, ap.packet_duration_ms), + (8, true, 10) + ); + // Bogus channel count falls back to stereo. + let ap = audio_params(&announce(&[("x-nv-audio.surround.numChannels", "4")])); + assert_eq!(ap.channels, 2); + } + + /// The DESCRIBE SDP carries the codec indicators and all six Opus configs, normal + /// quality before high quality per channel count (the client takes the first match as + /// its normal config and a second match as HQ). + #[test] + fn describe_advertises_codecs_and_surround() { + let sdp = describe_sdp(); + assert!( + sdp.contains("sprop-parameter-sets=AAAAAU"), + "HEVC indicator" + ); + assert!(sdp.contains("a=rtpmap:98 AV1/90000"), "AV1 indicator"); + for params in [ + "21101", // stereo (clients hardcode this; emitted for Sunshine parity) + "642012453", // 5.1 normal — pre-rotated for the client's GFE-order swap + "660012345", // 5.1 high quality — verbatim + "85301245673", // 7.1 normal — pre-rotated over [3, 8) + "88001234567", // 7.1 high quality — verbatim + ] { + assert!( + sdp.contains(&format!("a=fmtp:97 surround-params={params}")), + "missing surround-params={params} in:\n{sdp}" + ); + } + // Normal precedes HQ for each surround channel count. + let n51 = sdp.find("surround-params=642").unwrap(); + let h51 = sdp.find("surround-params=660").unwrap(); + let n71 = sdp.find("surround-params=853").unwrap(); + let h71 = sdp.find("surround-params=880").unwrap(); + assert!(n51 < h51 && n71 < h71); + } +} diff --git a/crates/punktfunk-host/src/gamestream/serverinfo.rs b/crates/punktfunk-host/src/gamestream/serverinfo.rs index eda813e..cd0daee 100644 --- a/crates/punktfunk-host/src/gamestream/serverinfo.rs +++ b/crates/punktfunk-host/src/gamestream/serverinfo.rs @@ -40,3 +40,40 @@ pub fn serverinfo_xml(host: &Host, https: bool) -> String { local_ip = host.local_ip, ) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::gamestream::{SCM_AV1_MAIN8, SCM_H264, SCM_HEVC, SCM_HEVC_MAIN10}; + + /// The advertised codec mask: H.264 + HEVC + AV1 Main8 (= 65793), and explicitly *no* + /// 10-bit bits — Moonlight gates its HDR mode on those, which we can't deliver (8-bit + /// SDR capture). Flag values are moonlight-common-c `Limelight.h`. + #[test] + fn codec_mode_support_mask() { + assert_eq!(SERVER_CODEC_MODE_SUPPORT, 0x1 | 0x100 | 0x10000); + assert_eq!(SERVER_CODEC_MODE_SUPPORT, 65793); + assert_eq!( + SERVER_CODEC_MODE_SUPPORT & SCM_HEVC_MAIN10, + 0, + "no 10-bit/HDR claim" + ); + assert_eq!( + SERVER_CODEC_MODE_SUPPORT, + SCM_H264 | SCM_HEVC | SCM_AV1_MAIN8 + ); + } + + #[test] + fn serverinfo_xml_carries_codec_mask() { + let host = Host { + hostname: "test".into(), + uniqueid: "uid".into(), + local_ip: std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), + http_port: 47989, + https_port: 47984, + }; + let xml = serverinfo_xml(&host, false); + assert!(xml.contains("65793")); + } +} diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index b9dfdcd..957642e 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -164,7 +164,7 @@ async fn serve(opts: M3Options) -> Result<()> { ); // One audio capturer for the whole host lifetime, handed from session to session - // (PipeWire streams have no cheap teardown — see AudioCapSlot). + // (avoids a PipeWire stream setup per session — see AudioCapSlot). let audio_cap: AudioCapSlot = Arc::new(std::sync::Mutex::new(None)); let paired_at = match &opts.paired_store { Some(p) => p.clone(), @@ -214,8 +214,8 @@ async fn serve(opts: M3Options) -> Result<()> { const HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); /// Persistent audio-capturer slot, reused across sessions (same pattern as the GameStream -/// path): `PwAudioCapturer` has no teardown — dropping one per session would leak its -/// PipeWire thread + core connection + live capture node on the daemon every session. +/// path): keeps one warm PipeWire capture stream instead of a connect/negotiate cycle — +/// and a daemon-side node churn — per session. (Drop now tears a capturer down cleanly.) type AudioCapSlot = Arc>>>; /// Pairing needs a human in the loop (reading the PIN off the host, typing it into the @@ -660,7 +660,7 @@ fn audio_thread(conn: quinn::Connection, stop: Arc, audio_cap: Audio c.drain(); // discard audio captured between sessions c } - None => match crate::audio::open_audio_capture() { + None => match crate::audio::open_audio_capture(CHANNELS as u32) { Ok(c) => c, Err(e) => { tracing::warn!(error = %format!("{e:#}"), "punktfunk/1 audio unavailable — session continues without it");