From 278a6330dee13809c95ada6126fccb39c796ed25 Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Tue, 9 Jun 2026 10:39:22 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20M2=20P1.6=20=E2=80=94=20audio=20(Opus?= =?UTF-8?q?=20+=20AES-CBC)=20and=20steady-rate=20video=20pacing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A stock Moonlight client now gets video + full input + AUDIO from the from-scratch GameStream host (verified live end-to-end on a macOS client). Audio (audio.rs, audio/linux.rs, gamestream/audio.rs): - Capture the default PipeWire sink's monitor (system output) as interleaved f32 stereo @ 48kHz via stream.capture.sink, on its own thread. - Opus-encode 5ms/240-sample stereo frames (RESTRICTED_LOWDELAY, CBR) and send as GameStream RTP audio: 12-byte BE RTP_PACKET (packetType 97, seq+1/pkt, timestamp += packetDuration, ssrc 0) on UDP 48000, after learning the client endpoint from its port-learning ping. - Encrypt the Opus payload with AES-128-CBC (PKCS7), key = launch rikey, IV = BE32(rikeyid + seq) in [0..4]. Like the control stream, modern Moonlight always decrypts audio regardless of the negotiated flags — plaintext makes it log "Failed to decrypt audio packet" and play silence (diagnosed from the client log). RTP header stays in the clear. Scheme cross-checked against Sunshine stream.cpp/crypto.cpp + moonlight AudioStream.c. - Pace each frame to its 5ms slot (PipeWire delivers ~1024-frame buffers) to avoid bursts the client's jitter buffer hears as glitches. LUMEN_AUDIO_GAIN applies optional linear gain for quiet sources. - DESCRIBE SDP advertises the stereo Opus config (a=fmtp:97 surround-params). Video (stream.rs): pace at a steady ≤60fps, re-encoding the last captured frame when the compositor produces none. wlroots only emits on damage, so a static or slow-updating desktop previously starved the client into a "network too slow" abort; an unchanged frame costs a near-empty P-frame. Adds a non-blocking Capturer::try_latest (portal drains to the freshest queued frame). Misc: serialize pipewire init across the video + audio capture threads (pwinit.rs, std::sync::Once) to avoid a concurrent pw_init race. Deps: opus, cbc; libopus-dev in bootstrap-ubuntu.sh. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 41 +++++ crates/lumen-host/Cargo.toml | 3 + crates/lumen-host/src/audio.rs | 32 ++++ crates/lumen-host/src/audio/linux.rs | 168 +++++++++++++++++++++ crates/lumen-host/src/capture.rs | 8 + crates/lumen-host/src/capture/linux.rs | 20 ++- crates/lumen-host/src/gamestream/audio.rs | 161 ++++++++++++++++++++ crates/lumen-host/src/gamestream/mod.rs | 4 + crates/lumen-host/src/gamestream/rtsp.rs | 18 ++- crates/lumen-host/src/gamestream/stream.rs | 33 ++-- crates/lumen-host/src/main.rs | 2 + crates/lumen-host/src/pwinit.rs | 11 ++ scripts/bootstrap-ubuntu.sh | 1 + 13 files changed, 486 insertions(+), 16 deletions(-) create mode 100644 crates/lumen-host/src/audio.rs create mode 100644 crates/lumen-host/src/audio/linux.rs create mode 100644 crates/lumen-host/src/gamestream/audio.rs create mode 100644 crates/lumen-host/src/pwinit.rs diff --git a/Cargo.lock b/Cargo.lock index a26f7dd..6e3fed7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -226,6 +226,17 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "audiopus_sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62314a1546a2064e033665d658e88c620a62904be945f8147e6b16c3db9f8651" +dependencies = [ + "cmake", + "log", + "pkg-config", +] + [[package]] name = "autocfg" version = "1.5.1" @@ -413,6 +424,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.20.3" @@ -425,6 +445,15 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cbindgen" version = "0.29.3" @@ -1206,6 +1235,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" dependencies = [ + "block-padding", "generic-array", ] @@ -1454,11 +1484,13 @@ dependencies = [ "ashpd", "axum", "axum-server", + "cbc", "ffmpeg-next", "hex", "libc", "lumen-core", "mdns-sd", + "opus", "pipewire", "rand 0.8.6", "rcgen", @@ -1700,6 +1732,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "opus" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d3809943dff6fbad5f0484449ea26bdb9cb7d8efdf26ed50d3c7f227f69eb5c" +dependencies = [ + "audiopus_sys", +] + [[package]] name = "ordered-stream" version = "0.2.0" diff --git a/crates/lumen-host/Cargo.toml b/crates/lumen-host/Cargo.toml index 1f593a2..6ebbdd1 100644 --- a/crates/lumen-host/Cargo.toml +++ b/crates/lumen-host/Cargo.toml @@ -20,6 +20,7 @@ rsa = "0.9" sha2 = { version = "0.10", features = ["oid"] } aes = "0.8" aes-gcm = "0.10" +cbc = { version = "0.1", features = ["alloc"] } rand = "0.8" hex = "0.4" rcgen = { version = "0.13", default-features = false, features = ["aws_lc_rs", "pem"] } @@ -49,3 +50,5 @@ wayland-protocols-wlr = { version = "0.3", features = ["client"] } wayland-protocols-misc = { version = "0.3", features = ["client"] } # Builds/validates the xkb keymap uploaded to the virtual keyboard + tracks modifier state. xkbcommon = "0.8" +# Opus encode for the GameStream audio stream (links system libopus). +opus = "0.3" diff --git a/crates/lumen-host/src/audio.rs b/crates/lumen-host/src/audio.rs new file mode 100644 index 0000000..882d1b4 --- /dev/null +++ b/crates/lumen-host/src/audio.rs @@ -0,0 +1,32 @@ +//! Desktop audio capture for the GameStream audio stream. On Linux: a PipeWire stream that +//! records the default sink's monitor (i.e. everything playing out of the system), delivered +//! as interleaved `f32` stereo PCM at 48 kHz. The audio data plane (`gamestream::audio`) +//! reframes this into fixed Opus frames, encodes, and sends it. + +use anyhow::Result; + +/// Opus/GameStream audio is 48 kHz stereo. +pub const SAMPLE_RATE: u32 = 48_000; +pub const CHANNELS: usize = 2; + +/// Produces interleaved `f32` stereo PCM (L,R,L,R,…) at [`SAMPLE_RATE`]. Lives on its own +/// thread; never blocks the capture loop (drops if the consumer falls behind). +pub trait AudioCapturer: Send { + /// Block until the next chunk of interleaved samples is available (variable size). The + /// caller reframes into fixed Opus frames. + fn next_chunk(&mut self) -> Result>; +} + +/// Open a live capturer for the default sink monitor (system output) via PipeWire. +#[cfg(target_os = "linux")] +pub fn open_audio_capture() -> Result> { + linux::PwAudioCapturer::open().map(|c| Box::new(c) as Box) +} + +#[cfg(not(target_os = "linux"))] +pub fn open_audio_capture() -> Result> { + anyhow::bail!("audio capture requires Linux + PipeWire") +} + +#[cfg(target_os = "linux")] +mod linux; diff --git a/crates/lumen-host/src/audio/linux.rs b/crates/lumen-host/src/audio/linux.rs new file mode 100644 index 0000000..9f0067a --- /dev/null +++ b/crates/lumen-host/src/audio/linux.rs @@ -0,0 +1,168 @@ +//! PipeWire audio capture of the default sink's monitor (system output). +//! +//! Connects to the user's PipeWire daemon (via `XDG_RUNTIME_DIR`, inherited from the Sway +//! session) and opens an input stream with `stream.capture.sink=true`, which routes the +//! default sink's monitor into us — no portal needed (unlike screen capture). The (`!Send`) +//! MainLoop/Stream live on a dedicated thread; interleaved `f32` chunks leave over a bounded +//! channel (dropped if the encoder falls behind, never blocking the PipeWire loop). + +use super::{AudioCapturer, CHANNELS, SAMPLE_RATE}; +use anyhow::{anyhow, Context, Result}; +use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError}; +use std::thread; +use std::time::Duration; + +pub struct PwAudioCapturer { + chunks: Receiver>, +} + +impl PwAudioCapturer { + pub fn open() -> Result { + let (tx, rx) = sync_channel::>(64); + thread::Builder::new() + .name("lumen-pw-audio".into()) + .spawn(move || { + if let Err(e) = pw_thread(tx) { + tracing::error!(error = %format!("{e:#}"), "pipewire audio thread failed"); + } + }) + .context("spawn pipewire audio thread")?; + Ok(PwAudioCapturer { chunks: rx }) + } +} + +impl AudioCapturer for PwAudioCapturer { + fn next_chunk(&mut self) -> Result> { + match self.chunks.recv_timeout(Duration::from_secs(5)) { + Ok(c) => Ok(c), + Err(RecvTimeoutError::Timeout) => Err(anyhow!("no PipeWire audio within 5s")), + Err(RecvTimeoutError::Disconnected) => Err(anyhow!("pipewire audio thread ended")), + } + } +} + +fn pw_thread(tx: std::sync::mpsc::SyncSender>) -> Result<()> { + use pipewire as pw; + use pw::{properties::properties, spa}; + use spa::param::audio::{AudioFormat, AudioInfoRaw}; + use spa::pod::Pod; + + crate::pwinit::ensure_init(); + let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw audio MainLoop")?; + let context = pw::context::ContextRc::new(&mainloop, None).context("pw audio Context")?; + let core = context + .connect_rc(None) + .context("pw audio connect (is PipeWire running in this session?)")?; + + let stream = pw::stream::StreamBox::new( + &core, + "lumen-audio", + properties! { + *pw::keys::MEDIA_TYPE => "Audio", + *pw::keys::MEDIA_CATEGORY => "Capture", + *pw::keys::MEDIA_ROLE => "Music", + // Capture the default sink's monitor (system output), not a microphone. + *pw::keys::STREAM_CAPTURE_SINK => "true", + // Ask for a ~5ms quantum (= one Opus frame) so buffers arrive smoothly rather than + // in large bursts the client's low-latency jitter buffer would hear as glitching. + *pw::keys::NODE_LATENCY => "240/48000", + }, + ) + .context("pw audio Stream")?; + + let _listener = stream + .add_local_listener_with_user_data(tx) + .state_changed(|_s, _ud, old, new| { + tracing::info!(?old, ?new, "pipewire audio stream state"); + }) + .param_changed(|_stream, _tx, id, param| { + let Some(param) = param else { return }; + if id != pw::spa::param::ParamType::Format.as_raw() { + return; + } + let mut info = AudioInfoRaw::default(); + if info.parse(param).is_ok() { + tracing::info!( + format = ?info.format(), + rate = info.rate(), + channels = info.channels(), + "audio format negotiated" + ); + } + }) + .process(|stream, tx| { + let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + let Some(mut buffer) = stream.dequeue_buffer() else { + return; + }; + let datas = buffer.datas_mut(); + if datas.is_empty() { + return; + } + let d = &mut datas[0]; + let (offset, size) = { + let c = d.chunk(); + (c.offset() as usize, c.size() as usize) + }; + let Some(buf) = d.data() else { return }; + if offset > buf.len() { + return; + } + let region = &buf[offset..(offset + size).min(buf.len())]; + // Negotiated as F32LE; reinterpret the byte region as interleaved f32. + let n = region.len() / 4; + static FIRST: std::sync::atomic::AtomicBool = + std::sync::atomic::AtomicBool::new(true); + if FIRST.swap(false, std::sync::atomic::Ordering::Relaxed) { + tracing::info!(samples = n, frames = n / 2, "audio first capture buffer"); + } + let mut samples = Vec::with_capacity(n); + for i in 0..n { + let b = [ + region[i * 4], + region[i * 4 + 1], + region[i * 4 + 2], + region[i * 4 + 3], + ]; + samples.push(f32::from_le_bytes(b)); + } + let _ = tx.try_send(samples); // drop if the encoder is behind + })); + if outcome.is_err() { + tracing::error!("panic in pipewire audio callback — chunk dropped"); + } + }) + .register() + .context("register audio stream listener")?; + + // Request F32LE, 48 kHz, stereo. + let mut info = AudioInfoRaw::new(); + info.set_format(AudioFormat::F32LE); + info.set_rate(SAMPLE_RATE); + info.set_channels(CHANNELS as u32); + let obj = pw::spa::pod::Object { + type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(), + id: pw::spa::param::ParamType::EnumFormat.as_raw(), + properties: info.into(), + }; + let values: Vec = pw::spa::pod::serialize::PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &pw::spa::pod::Value::Object(obj), + ) + .context("serialize audio format pod")? + .0 + .into_inner(); + let mut params = [Pod::from_bytes(&values).context("audio pod from bytes")?]; + + stream + .connect( + spa::utils::Direction::Input, + None, // PW_ID_ANY — autoconnect to the default sink monitor + pw::stream::StreamFlags::AUTOCONNECT | pw::stream::StreamFlags::MAP_BUFFERS, + &mut params, + ) + .context("pw audio stream connect")?; + + mainloop.run(); + Ok(()) +} diff --git a/crates/lumen-host/src/capture.rs b/crates/lumen-host/src/capture.rs index 6bc26a6..012a746 100644 --- a/crates/lumen-host/src/capture.rs +++ b/crates/lumen-host/src/capture.rs @@ -50,6 +50,14 @@ pub struct CapturedFrame { /// over a bounded drop-oldest channel (never block the compositor). pub trait Capturer: Send { fn next_frame(&mut self) -> Result; + + /// Non-blocking: the freshest frame available since the last call, or `None` if none has + /// arrived (the caller reuses its last frame to hold a steady output rate). The default + /// just produces a frame each call — fine for instant synthetic sources; the portal + /// overrides it to drain its channel without blocking. + fn try_latest(&mut self) -> Result> { + self.next_frame().map(Some) + } } /// A deterministic moving test pattern (BGRx). Lets M0 exercise the encode → file → diff --git a/crates/lumen-host/src/capture/linux.rs b/crates/lumen-host/src/capture/linux.rs index 5c12297..d3575d1 100644 --- a/crates/lumen-host/src/capture/linux.rs +++ b/crates/lumen-host/src/capture/linux.rs @@ -18,7 +18,7 @@ use super::{CapturedFrame, Capturer, PixelFormat}; use anyhow::{anyhow, Context, Result}; use std::os::fd::OwnedFd; -use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError}; +use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, TryRecvError}; use std::thread; use std::time::Duration; @@ -70,6 +70,22 @@ impl Capturer for PortalCapturer { Err(RecvTimeoutError::Disconnected) => Err(anyhow!("PipeWire capture thread ended")), } } + + fn try_latest(&mut self) -> Result> { + // Drain to the newest queued frame without blocking; `None` means the compositor + // hasn't produced a new frame since last call (static/idle desktop). + let mut latest = None; + loop { + match self.frames.try_recv() { + Ok(frame) => latest = Some(frame), + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + return Err(anyhow!("PipeWire capture thread ended")) + } + } + } + Ok(latest) + } } /// The portal handshake: connect ScreenCast, select a single monitor, start, open the @@ -192,7 +208,7 @@ mod pipewire { } pub fn pipewire_thread(fd: OwnedFd, node_id: u32, tx: SyncSender) -> Result<()> { - pw::init(); + crate::pwinit::ensure_init(); let mainloop = pw::main_loop::MainLoopRc::new(None).context("pw MainLoop")?; let context = pw::context::ContextRc::new(&mainloop, None).context("pw Context")?; diff --git a/crates/lumen-host/src/gamestream/audio.rs b/crates/lumen-host/src/gamestream/audio.rs new file mode 100644 index 0000000..22300e0 --- /dev/null +++ b/crates/lumen-host/src/gamestream/audio.rs @@ -0,0 +1,161 @@ +//! The audio data plane (UDP 48000). On RTSP PLAY we learn the client's audio endpoint from +//! its port-learning ping, capture the default-sink monitor, Opus-encode 5 ms stereo frames, +//! and send each as a GameStream RTP audio packet. +//! +//! Wire format (moonlight-common-c `AudioStream.c`): a 12-byte big-endian `RTP_PACKET` +//! (`packetType = 97`, `sequenceNumber++`, `timestamp += packetDuration`, `ssrc = 0`) +//! followed by the AES-128-CBC-encrypted Opus payload. Stereo Opus is a single coupled +//! multistream, so a plain `opus_encode` bitstream is what the client's multistream decoder +//! expects. Like the control stream, modern Moonlight always AES-CBC-decrypts audio (it +//! reports "Failed to decrypt audio packet" on plaintext), so we encrypt the payload under the +//! `/launch` `rikey` with a per-packet IV `BE32(rikeyid + seq)` (PKCS7 padding, RTP header +//! left in the clear). Reed-Solomon audio FEC is layered on top in P1.5. + +use super::AUDIO_PORT; +use crate::audio::{self, CHANNELS, SAMPLE_RATE}; +use anyhow::{Context, Result}; +use cbc::cipher::{block_padding::Pkcs7, BlockEncryptMut, KeyIvInit}; +use opus::{Application, Bitrate, Channels, Encoder}; +use std::net::UdpSocket; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +type Aes128CbcEnc = cbc::Encryptor; + +/// Opus frame duration; 5 ms is moonlight's default (`x-nv-aqos.packetDuration`). +const FRAME_MS: usize = 5; +/// Samples per channel per Opus frame (48 kHz · 5 ms = 240). +const SAMPLES_PER_FRAME: usize = SAMPLE_RATE as usize * FRAME_MS / 1000; +/// RTP payload type for audio (moonlight `AudioStream.c` checks `packetType == 97`). +const AUDIO_PACKET_TYPE: u8 = 97; +const OPUS_BITRATE: i32 = 128_000; + +/// Spawn the audio stream thread (idempotent via `running`). Stops when `running` clears. +/// `gcm_key`/`rikeyid` come from `/launch` and key the AES-CBC payload encryption. +pub fn start(running: Arc, gcm_key: [u8; 16], rikeyid: i32) { + let _ = std::thread::Builder::new() + .name("lumen-audio".into()) + .spawn(move || { + tracing::info!("audio stream starting"); + if let Err(e) = run(&running, &gcm_key, rikeyid) { + tracing::error!(error = %format!("{e:#}"), "audio stream failed"); + } + running.store(false, Ordering::SeqCst); + tracing::info!("audio stream stopped"); + }); +} + +fn run(running: &AtomicBool, gcm_key: &[u8; 16], rikeyid: i32) -> Result<()> { + let sock = UdpSocket::bind(("0.0.0.0", AUDIO_PORT)).context("bind audio UDP")?; + // The client pings the audio port (~every 500ms) so we learn where to send. + sock.set_read_timeout(Some(Duration::from_secs(10)))?; + tracing::info!(port = AUDIO_PORT, "audio: awaiting client ping"); + let mut probe = [0u8; 256]; + let (_, client) = sock + .recv_from(&mut probe) + .context("audio: no client ping within 10s")?; + sock.connect(client) + .context("connect client audio endpoint")?; + tracing::info!(%client, "audio: client endpoint learned"); + + let mut cap = audio::open_audio_capture().context("open audio capture")?; + // RESTRICTED_LOWDELAY + CBR, matching Sunshine — CBR keeps the Opus TOC byte constant, + // which the client asserts per stream. + let mut enc = Encoder::new(SAMPLE_RATE, Channels::Stereo, Application::LowDelay) + .context("create Opus encoder")?; + enc.set_bitrate(Bitrate::Bits(OPUS_BITRATE)).ok(); + enc.set_vbr(false).ok(); + + let frame_len = SAMPLES_PER_FRAME * CHANNELS; // interleaved samples per Opus frame + let mut acc: Vec = Vec::with_capacity(frame_len * 4); + let mut out = vec![0u8; 1400]; + let mut seq: u16 = 0; + let mut timestamp: u32 = 0; + let mut sent: u64 = 0; + // Pacing anchor: PipeWire hands us large capture buffers (~1024 frames), so we'd otherwise + // emit packets in bursts the client's low-latency jitter buffer hears as glitching. Emit + // each frame at its 5 ms slot instead. Production is real-time, so the backlog stays small. + let start = Instant::now(); + let mut frame_no: u64 = 0; + // Optional linear gain for quiet capture sources (LUMEN_AUDIO_GAIN, default 1.0). + let gain: f32 = std::env::var("LUMEN_AUDIO_GAIN") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(1.0); + + while running.load(Ordering::SeqCst) { + let chunk = cap.next_chunk().context("capture audio chunk")?; + acc.extend_from_slice(&chunk); + while acc.len() >= frame_len { + let mut frame: Vec = acc.drain(..frame_len).collect(); + if gain != 1.0 { + for s in &mut frame { + *s = (*s * gain).clamp(-1.0, 1.0); + } + } + let n = enc.encode_float(&frame, &mut out).context("opus encode")?; + // AES-128-CBC the Opus payload (RTP header stays plaintext). Per-packet IV = + // BE32(rikeyid + seq) in [0..4], zero elsewhere; PKCS7 padding. + let iv_seq = (rikeyid as u32).wrapping_add(seq as u32); + let mut iv = [0u8; 16]; + iv[0..4].copy_from_slice(&iv_seq.to_be_bytes()); + let ct = Aes128CbcEnc::new(gcm_key.into(), (&iv).into()) + .encrypt_padded_vec_mut::(&out[..n]); + let pkt = build_rtp(seq, timestamp, &ct); + if sock.send(&pkt).is_err() { + tracing::info!(sent, "audio: client unreachable — stopping"); + return Ok(()); + } + seq = seq.wrapping_add(1); + // GameStream's audio RTP timestamp ticks by packetDuration (ms), not by samples. + timestamp = timestamp.wrapping_add(FRAME_MS as u32); + sent += 1; + if sent % 400 == 0 { + tracing::info!(sent, "audio: streaming"); + } + + // Hold each frame to its 5 ms slot (skip if we've fallen behind a burst). + frame_no += 1; + let scheduled = start + Duration::from_millis(5 * frame_no); + let now = Instant::now(); + if scheduled > now { + std::thread::sleep((scheduled - now).min(Duration::from_millis(20))); + } + } + } + Ok(()) +} + +/// Build a GameStream RTP audio packet: 12-byte BE `RTP_PACKET` header + Opus payload. +fn build_rtp(seq: u16, timestamp: u32, opus: &[u8]) -> Vec { + let mut p = Vec::with_capacity(12 + opus.len()); + p.push(0x80); // RTP version 2, no padding/extension/CSRC + p.push(AUDIO_PACKET_TYPE); + p.extend_from_slice(&seq.to_be_bytes()); + p.extend_from_slice(×tamp.to_be_bytes()); + p.extend_from_slice(&0u32.to_be_bytes()); // ssrc + p.extend_from_slice(opus); + p +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn rtp_header_layout() { + let p = build_rtp(0x0102, 0x03040506, &[0xaa, 0xbb]); + assert_eq!(p[0], 0x80); + assert_eq!(p[1], 97); + assert_eq!(&p[2..4], &[0x01, 0x02]); // seq BE + assert_eq!(&p[4..8], &[0x03, 0x04, 0x05, 0x06]); // timestamp BE + assert_eq!(&p[8..12], &[0, 0, 0, 0]); // ssrc + assert_eq!(&p[12..], &[0xaa, 0xbb]); // opus payload + } + + #[test] + fn frame_sizing() { + assert_eq!(SAMPLES_PER_FRAME, 240); + } +} diff --git a/crates/lumen-host/src/gamestream/mod.rs b/crates/lumen-host/src/gamestream/mod.rs index f85c570..7922801 100644 --- a/crates/lumen-host/src/gamestream/mod.rs +++ b/crates/lumen-host/src/gamestream/mod.rs @@ -6,6 +6,7 @@ //! Status: P1.1 — mDNS `_nvstream._tcp` advertisement + `/serverinfo`. Pairing, RTSP, and //! the media streams follow (see the M2 task list / plan). +mod audio; mod cert; mod control; mod crypto; @@ -86,6 +87,8 @@ pub struct AppState { pub stream: std::sync::Mutex>, /// True while the video stream thread is running (also its keep-running flag). pub streaming: std::sync::Arc, + /// True while the audio stream thread is running (also its keep-running flag). + pub audio_streaming: std::sync::Arc, } /// Run the GameStream control plane (blocks): mDNS advertisement + the nvhttp servers. @@ -100,6 +103,7 @@ pub fn serve() -> Result<()> { launch: std::sync::Mutex::new(None), stream: std::sync::Mutex::new(None), streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), + audio_streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), }); tracing::info!( hostname = %state.host.hostname, diff --git a/crates/lumen-host/src/gamestream/rtsp.rs b/crates/lumen-host/src/gamestream/rtsp.rs index bdb14e4..f5898dd 100644 --- a/crates/lumen-host/src/gamestream/rtsp.rs +++ b/crates/lumen-host/src/gamestream/rtsp.rs @@ -7,6 +7,7 @@ //! Runs on its own native thread (control-plane setup, not the per-frame hot path), one //! thread per connection. Plaintext only for now (encryption is negotiated; P1.5). +use super::audio; use super::stream::{self, StreamConfig}; use super::{AppState, AUDIO_PORT, CONTROL_PORT, RTSP_PORT, VIDEO_PORT}; use crate::encode::Codec; @@ -171,10 +172,21 @@ fn handle_request(req: &Request, state: &AppState) -> String { Some(_) => tracing::info!("RTSP PLAY — stream already running"), None => tracing::warn!("RTSP PLAY — no negotiated config (ANNOUNCE missing)"), } + // Audio runs independently (stereo Opus on UDP 48000); it needs the launch key for + // the AES-CBC payload encryption the client expects. + let launch = *state.launch.lock().unwrap(); + if let Some(ls) = launch { + if !state.audio_streaming.swap(true, Ordering::SeqCst) { + tracing::info!("RTSP PLAY — starting audio stream"); + audio::start(state.audio_streaming.clone(), ls.gcm_key, ls.rikeyid); + } + } response(&req.cseq, &[("Session", "DEADBEEFCAFE;timeout = 90")], None) } "TEARDOWN" => { - state.streaming.store(false, Ordering::SeqCst); // signal the stream thread to stop + // Signal both stream threads to stop. + state.streaming.store(false, Ordering::SeqCst); + state.audio_streaming.store(false, Ordering::SeqCst); response(&req.cseq, &[], None) } other => { @@ -194,6 +206,10 @@ fn describe_sdp() -> String { "a=x-ss-general.encryptionRequested:0", "sprop-parameter-sets=AAAAAU", // HEVC capability indicator "a=rtpmap:98 AV1/90000", // AV1 capability indicator + // Opus config the client matches by channel count (Sunshine emits one per config): + // surround-params = channelCount, streams, coupledStreams, then the channel mapping. + // The client negotiated stereo, so advertise just that. + "a=fmtp:97 surround-params=21101", // stereo: 2ch, 1 stream, 1 coupled, mapping [0,1] "", ] .join("\r\n") diff --git a/crates/lumen-host/src/gamestream/stream.rs b/crates/lumen-host/src/gamestream/stream.rs index 95ed81e..bbd1faf 100644 --- a/crates/lumen-host/src/gamestream/stream.rs +++ b/crates/lumen-host/src/gamestream/stream.rs @@ -84,13 +84,23 @@ fn run(cfg: StreamConfig, running: &AtomicBool) -> Result<()> { .context("open NVENC for stream")?; let mut pk = VideoPacketizer::new(cfg.packet_size); - let frame_interval = Duration::from_secs_f64(1.0 / cfg.fps as f64); - let mut frame_idx: u32 = 0; + // Pace at a steady rate (capped at 60fps), re-encoding the last captured frame when the + // compositor produced no new one. wlroots only emits frames on damage, so a static or + // slow-updating desktop would otherwise starve the client into a "network too slow" abort. + // Re-encoding an unchanged frame is cheap — NVENC emits a near-empty P-frame. + let target_fps = cfg.fps.clamp(1, 60); + let frame_interval = Duration::from_secs_f64(1.0 / target_fps as f64); let mut sent_pkts: u64 = 0; + let mut fps_count: u32 = 0; + let mut fps_t = Instant::now(); let stream_start = Instant::now(); while running.load(Ordering::SeqCst) { let tick = Instant::now(); + // Advance to the freshest captured frame if one arrived; otherwise reuse the last. + if let Some(f) = capturer.try_latest().context("capture frame")? { + frame = f; + } enc.submit(&frame).context("encoder submit")?; // 90 kHz RTP timestamp from wall-clock, so a variable capture rate stays correct. @@ -118,19 +128,16 @@ fn run(cfg: StreamConfig, running: &AtomicBool) -> Result<()> { break; } - frame_idx += 1; - if frame_idx % (cfg.fps.max(1) * 2) == 0 { - tracing::info!(frame_idx, sent_pkts, "video: streaming"); + fps_count += 1; + if fps_t.elapsed() >= Duration::from_secs(1) { + tracing::info!(fps = fps_count, sent_pkts, "video: streaming"); + fps_count = 0; + fps_t = Instant::now(); } - // Synthetic produces instantly, so pace it; the portal's next_frame() blocks at the - // capture rate and paces itself. - if !use_portal { - let elapsed = tick.elapsed(); - if elapsed < frame_interval { - std::thread::sleep(frame_interval - elapsed); - } + let elapsed = tick.elapsed(); + if elapsed < frame_interval { + std::thread::sleep(frame_interval - elapsed); } - frame = capturer.next_frame().context("capture frame")?; } Ok(()) } diff --git a/crates/lumen-host/src/main.rs b/crates/lumen-host/src/main.rs index 5d379f2..44cf864 100644 --- a/crates/lumen-host/src/main.rs +++ b/crates/lumen-host/src/main.rs @@ -13,12 +13,14 @@ // Scaffold: trait methods and config paths are defined ahead of their backends. #![allow(dead_code)] +mod audio; mod capture; mod encode; mod gamestream; mod inject; mod m0; mod pipeline; +mod pwinit; mod vdisplay; mod web; diff --git a/crates/lumen-host/src/pwinit.rs b/crates/lumen-host/src/pwinit.rs new file mode 100644 index 0000000..a0b8323 --- /dev/null +++ b/crates/lumen-host/src/pwinit.rs @@ -0,0 +1,11 @@ +//! One-time PipeWire library initialization, shared by the video (portal) and audio capture +//! threads. `pw_init` must not be called concurrently from multiple threads on first use; both +//! capture paths connect to PipeWire at nearly the same moment (RTSP PLAY starts video + audio +//! together), so we serialize the init through a `Once`. + +#[cfg(target_os = "linux")] +pub fn ensure_init() { + use std::sync::Once; + static ONCE: Once = Once::new(); + ONCE.call_once(pipewire::init); +} diff --git a/scripts/bootstrap-ubuntu.sh b/scripts/bootstrap-ubuntu.sh index 61d1b68..3111600 100755 --- a/scripts/bootstrap-ubuntu.sh +++ b/scripts/bootstrap-ubuntu.sh @@ -90,6 +90,7 @@ apt_install "PipeWire + dev" pipewire pipewire-pulse wireplumber libpipewire- apt_install "desktop portals" xdg-desktop-portal xdg-desktop-portal-wlr xdg-desktop-portal-gtk apt_install "Sway + wlroots" sway swaybg xwayland wlr-randr foot seatd apt_install "Wayland dev" libwayland-dev wayland-protocols wayland-utils libxkbcommon-dev +apt_install "Opus (audio enc)" libopus-dev apt_install "DRM/EGL/GBM/VA" libdrm-dev libgbm-dev libgbm1 libegl-dev libegl1 libgles-dev mesa-common-dev libva-dev apt_install "capture + dbus" wf-recorder grim dbus-user-session drm-info mesa-utils apt_try "NVIDIA EGL platform (multiverse)" libnvidia-egl-wayland1 libnvidia-egl-gbm1