diff --git a/Cargo.lock b/Cargo.lock index 114e2f9..98e3cd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2547,6 +2547,7 @@ dependencies = [ "jni", "log", "ndk", + "opus", "punktfunk-core", ] diff --git a/clients/android/app/src/main/kotlin/io/unom/punktfunk/MainActivity.kt b/clients/android/app/src/main/kotlin/io/unom/punktfunk/MainActivity.kt index a988940..7843e98 100644 --- a/clients/android/app/src/main/kotlin/io/unom/punktfunk/MainActivity.kt +++ b/clients/android/app/src/main/kotlin/io/unom/punktfunk/MainActivity.kt @@ -137,7 +137,8 @@ private fun StreamScreen(handle: Long, onDisconnect: () -> Unit) { window?.addFlags(WindowManager.LayoutParams.FLAG_KEEP_SCREEN_ON) onDispose { window?.clearFlags(WindowManager.LayoutParams.FLAG_KEEP_SCREEN_ON) - // Leaving the stream: stop the decode thread and tear down the session. + // Leaving the stream: stop the audio + decode threads and tear down the session. + NativeBridge.nativeStopAudio(handle) NativeBridge.nativeStopVideo(handle) NativeBridge.nativeClose(handle) } @@ -152,11 +153,13 @@ private fun StreamScreen(handle: Long, onDisconnect: () -> Unit) { holder.addCallback(object : SurfaceHolder.Callback { override fun surfaceCreated(holder: SurfaceHolder) { NativeBridge.nativeStartVideo(handle, holder.surface) + NativeBridge.nativeStartAudio(handle) } override fun surfaceChanged(holder: SurfaceHolder, format: Int, width: Int, height: Int) {} override fun surfaceDestroyed(holder: SurfaceHolder) { + NativeBridge.nativeStopAudio(handle) NativeBridge.nativeStopVideo(handle) } }) diff --git a/clients/android/kit/build.gradle.kts b/clients/android/kit/build.gradle.kts index a4f6f4c..4ab22e6 100644 --- a/clients/android/kit/build.gradle.kts +++ b/clients/android/kit/build.gradle.kts @@ -56,14 +56,29 @@ fun registerCargoNdk(taskName: String, release: Boolean) = description = "cargo-ndk build of punktfunk-android (${if (release) "release" else "debug"})" workingDir = repoRoot val sdk = androidSdkDir() - // A GUI Android Studio launch does not source the login shell, so make cargo + the NDK - // discoverable explicitly (works the same from a bare CLI). - environment("PATH", cargoBin + File.pathSeparator + System.getenv("PATH")) + // A GUI Android Studio launch does not source the login shell, so make cargo, the NDK, and + // cmake (libopus builds via the cmake crate) discoverable explicitly — same as a bare CLI. + val cmakeBin = "$sdk/cmake/3.22.1/bin" + environment( + "PATH", + cargoBin + File.pathSeparator + cmakeBin + File.pathSeparator + System.getenv("PATH"), + ) environment("ANDROID_HOME", sdk) environment("ANDROID_NDK_HOME", "$sdk/ndk/$ndkVer") + // CMake's built-in Android support (used by the cmake crate for libopus) finds the NDK via + // these, and uses Ninja (bundled next to the SDK cmake) since there's no `make`. + environment("ANDROID_NDK_ROOT", "$sdk/ndk/$ndkVer") + environment("ANDROID_NDK", "$sdk/ndk/$ndkVer") + environment("CMAKE_GENERATOR", "Ninja") + // audiopus_sys picks static-vs-dynamic by HOST not target — force the bundled static libopus + // (pure C) so the android .so links it instead of looking for the host's libopus.so. + environment("LIBOPUS_STATIC", "1") + environment("LIBOPUS_NO_PKG", "1") val cmd = mutableListOf( "cargo", "ndk", "-t", "arm64-v8a", "-t", "x86_64", + // Link against the minSdk-31 sysroot so libaaudio (API 26+) is found. + "--platform", "31", "-o", file("src/main/jniLibs").absolutePath, "build", "-p", "punktfunk-android", ) diff --git a/clients/android/kit/src/main/kotlin/io/unom/punktfunk/kit/NativeBridge.kt b/clients/android/kit/src/main/kotlin/io/unom/punktfunk/kit/NativeBridge.kt index 72a0a43..2138f2b 100644 --- a/clients/android/kit/src/main/kotlin/io/unom/punktfunk/kit/NativeBridge.kt +++ b/clients/android/kit/src/main/kotlin/io/unom/punktfunk/kit/NativeBridge.kt @@ -37,4 +37,13 @@ object NativeBridge { /** Stop + join the decode thread without closing the session. No-op on `0`. */ external fun nativeStopVideo(handle: Long) + + /** + * Start host→client audio: Opus decode → jitter ring → AAudio (LowLatency), all in Rust. No-op + * if already started. Best-effort — a failure leaves video streaming. + */ + external fun nativeStartAudio(handle: Long) + + /** Stop + join the audio thread and close AAudio, without closing the session. No-op on `0`. */ + external fun nativeStopAudio(handle: Long) } diff --git a/crates/punktfunk-android/Cargo.toml b/crates/punktfunk-android/Cargo.toml index 880f170..8685c9f 100644 --- a/crates/punktfunk-android/Cargo.toml +++ b/crates/punktfunk-android/Cargo.toml @@ -25,7 +25,11 @@ log = "0.4" # `ndk` and Oboe/Opus audio later) is only pulled in for the real `*-linux-android` targets. [target.'cfg(target_os = "android")'.dependencies] android_logger = "0.14" -# NDK bindings for the per-frame video path: AMediaCodec (HEVC hardware decode) + ANativeWindow -# (the SurfaceView surface). Links libmediandk/libnativewindow. Decode runs entirely in Rust — no -# per-frame JNI crossing (the "no async / native threads on the hot path" invariant). -ndk = { version = "0.9", features = ["media"] } +# NDK bindings. "media" = AMediaCodec/ANativeWindow (video); "audio" = AAudio (audio playback). +# Pure-Rust FFI to libmediandk/libnativewindow/libaaudio — no C++/libc++_shared to bundle. Decode + +# audio run entirely in Rust on native threads (the "no async on the hot path" invariant). +ndk = { version = "0.9", features = ["media", "audio"] } +# Opus decode for the host→client audio plane (0xC9: 48 kHz stereo, 5 ms frames). Same crate the +# host + Linux client use. audiopus_sys vendors libopus (pure C) and builds it static via cmake — +# the cargo-ndk build sets LIBOPUS_STATIC=1/LIBOPUS_NO_PKG=1 so it links the bundled lib, not the host's. +opus = "0.3" diff --git a/crates/punktfunk-android/src/audio.rs b/crates/punktfunk-android/src/audio.rs new file mode 100644 index 0000000..2df785c --- /dev/null +++ b/crates/punktfunk-android/src/audio.rs @@ -0,0 +1,196 @@ +//! Android audio playback (android-only): pull Opus packets from the connector, decode to +//! interleaved f32 stereo, and feed AAudio (LowLatency) via its realtime data callback through a +//! jitter ring. Mirrors [`crate::decode`]: one thread we own (the Opus decode producer) plus a +//! shutdown flag; the realtime callback thread is owned by AAudio. Ring logic ported from +//! `punktfunk-client-linux/src/audio.rs` (prime ~3 quanta, drop-oldest cap, re-prime on drain). + +use ndk::audio::{ + AudioCallbackResult, AudioDirection, AudioFormat, AudioPerformanceMode, AudioSharingMode, + AudioStream, AudioStreamBuilder, +}; +use punktfunk_core::client::NativeClient; +use punktfunk_core::error::PunktfunkError; +use std::collections::VecDeque; +use std::ffi::c_void; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::mpsc::{sync_channel, SyncSender, TrySendError}; +use std::sync::Arc; +use std::time::Duration; + +const CHANNELS: usize = 2; +const SAMPLE_RATE: i32 = 48_000; +/// Decoded-chunk hand-off depth: 64 × 5 ms = 320 ms slack (matches the core's AUDIO_QUEUE). +const RING_CHUNKS: usize = 64; +/// Opus decode scratch: worst-case 120 ms stereo frame (5760 samples/ch × 2 ch). +const PCM_SCRATCH: usize = 5760 * CHANNELS; + +/// Diagnostics — written by the decode thread + the realtime callback, logged periodically. The +/// audio analogue of the video `fed`/`rendered` counters (we can't "screenshot" sound). +#[derive(Default)] +struct Counters { + opus_decoded: AtomicU64, // Opus packets decoded OK (~200/s at 5 ms frames) + pcm_written: AtomicU64, // PCM frames copied out to AAudio (device clock is pulling) + underruns: AtomicU64, // callbacks that emitted silence (ring not primed / drained) + ring_depth: AtomicU64, // ring sample count at the last callback +} + +/// Owned by [`crate::session::SessionHandle`]: the live AAudio stream + the decode thread. +pub struct AudioPlayback { + _stream: AudioStream, // dropping it stops + closes the AAudio stream + shutdown: Arc, + join: Option>, +} + +impl AudioPlayback { + /// Open AAudio (LowLatency, 48 kHz/stereo/f32) with a realtime callback draining a jitter ring, + /// then spawn the Opus decode thread. `None` on failure (the caller leaves video streaming). + pub fn start(client: Arc) -> Option { + let counters = Arc::new(Counters::default()); + let (tx, rx) = sync_channel::>(RING_CHUNKS); + + // Realtime consumer state, owned by the callback (FnMut) — no lock: AAudio calls it from a + // single high-priority thread, and the decode thread only touches `tx`. + let cb_counters = counters.clone(); + let mut ring: VecDeque = VecDeque::with_capacity(PCM_SCRATCH); + let mut primed = false; + let callback = move |_s: &AudioStream, data: *mut c_void, num_frames: i32| { + let want = num_frames as usize * CHANNELS; + // SAFETY: AAudio provides `num_frames * channel_count` F32 slots at `data`. + let out = unsafe { std::slice::from_raw_parts_mut(data as *mut f32, want) }; + while let Ok(chunk) = rx.try_recv() { + ring.extend(chunk); + } + // Prime to ~3 quanta (15 ms; floor 15 ms / ceiling 200 ms); drop OLDEST above the cap. + let target = (3 * want).clamp(720 * CHANNELS, 9600 * CHANNELS); + while ring.len() > target.max(want) + want { + ring.pop_front(); + } + if !primed && ring.len() >= target { + primed = true; + } + if primed { + for slot in out.iter_mut() { + *slot = ring.pop_front().unwrap_or(0.0); + } + cb_counters + .pcm_written + .fetch_add(num_frames as u64, Ordering::Relaxed); + } else { + out.fill(0.0); + cb_counters.underruns.fetch_add(1, Ordering::Relaxed); + } + if ring.is_empty() { + primed = false; // re-prime after a genuine drain (avoids sustained crackle on loss) + } + cb_counters + .ring_depth + .store(ring.len() as u64, Ordering::Relaxed); + AudioCallbackResult::Continue + }; + + let stream = AudioStreamBuilder::new() + .map_err(|e| log::error!("audio: AudioStreamBuilder::new: {e}")) + .ok()? + .direction(AudioDirection::Output) + .sample_rate(SAMPLE_RATE) + .channel_count(CHANNELS as i32) + .format(AudioFormat::PCM_Float) + .performance_mode(AudioPerformanceMode::LowLatency) + .sharing_mode(AudioSharingMode::Shared) + .data_callback(Box::new(callback)) + .error_callback(Box::new(|_s, e| { + log::warn!("audio: AAudio error (device reroute/disconnect?): {e:?}"); + })) + .open_stream() + .map_err(|e| log::error!("audio: open_stream: {e}")) + .ok()?; + + if let Err(e) = stream.request_start() { + log::error!("audio: request_start: {e}"); + return None; + } + log::info!( + "audio: AAudio started rate={} ch={} fmt={:?} burst={}", + stream.sample_rate(), + stream.channel_count(), + stream.format(), + stream.frames_per_burst(), + ); + + let shutdown = Arc::new(AtomicBool::new(false)); + let sd = shutdown.clone(); + let join = std::thread::Builder::new() + .name("pf-audio".into()) + .spawn(move || decode_loop(client, tx, sd, counters)) + .ok(); + + Some(AudioPlayback { + _stream: stream, + shutdown, + join, + }) + } +} + +impl Drop for AudioPlayback { + fn drop(&mut self) { + self.shutdown.store(true, Ordering::SeqCst); + if let Some(j) = self.join.take() { + let _ = j.join(); + } + // `_stream` drops here → AAudio request_stop + close. + } +} + +/// Producer: `next_audio` → Opus `decode_float` → push interleaved f32 into the ring channel. +fn decode_loop( + client: Arc, + tx: SyncSender>, + shutdown: Arc, + counters: Arc, +) { + let mut dec = match opus::Decoder::new(SAMPLE_RATE as u32, opus::Channels::Stereo) { + Ok(d) => d, + Err(e) => { + log::error!("audio: opus decoder init: {e} — audio disabled"); + return; + } + }; + let mut pcm = vec![0f32; PCM_SCRATCH]; + let mut window_peak = 0f32; // loudest |sample| since the last log — tells a tone from silence + while !shutdown.load(Ordering::Relaxed) { + match client.next_audio(Duration::from_millis(5)) { + Ok(pkt) => match dec.decode_float(&pkt.data, &mut pcm, false) { + Ok(samples) => { + let n = samples * CHANNELS; + for &s in &pcm[..n] { + window_peak = window_peak.max(s.abs()); + } + let count = counters.opus_decoded.fetch_add(1, Ordering::Relaxed) + 1; + match tx.try_send(pcm[..n].to_vec()) { + Ok(()) | Err(TrySendError::Full(_)) => {} // drop-newest under backpressure + Err(TrySendError::Disconnected(_)) => break, + } + if count % 600 == 0 { + log::info!( + "audio: opus={count} pcm_frames={} underruns={} ring={} peak={window_peak:.3}", + counters.pcm_written.load(Ordering::Relaxed), + counters.underruns.load(Ordering::Relaxed), + counters.ring_depth.load(Ordering::Relaxed), + ); + window_peak = 0.0; + } + } + Err(e) => log::debug!("audio: opus decode: {e}"), + }, + Err(PunktfunkError::NoFrame) => {} // timeout + Err(_) => break, // session closed + } + } + log::info!( + "audio: stopped (opus={} pcm_frames={} underruns={})", + counters.opus_decoded.load(Ordering::Relaxed), + counters.pcm_written.load(Ordering::Relaxed), + counters.underruns.load(Ordering::Relaxed), + ); +} diff --git a/crates/punktfunk-android/src/lib.rs b/crates/punktfunk-android/src/lib.rs index 0dc3162..5093617 100644 --- a/crates/punktfunk-android/src/lib.rs +++ b/crates/punktfunk-android/src/lib.rs @@ -21,6 +21,8 @@ use jni::objects::JObject; use jni::sys::jint; use jni::JNIEnv; +#[cfg(target_os = "android")] +mod audio; #[cfg(target_os = "android")] mod decode; mod session; diff --git a/crates/punktfunk-android/src/session.rs b/crates/punktfunk-android/src/session.rs index 8e66e96..f8f09f0 100644 --- a/crates/punktfunk-android/src/session.rs +++ b/crates/punktfunk-android/src/session.rs @@ -28,6 +28,8 @@ pub(crate) struct SessionHandle { #[cfg_attr(not(target_os = "android"), allow(dead_code))] pub client: Arc, video: Mutex>, + #[cfg(target_os = "android")] + audio: Mutex>, } struct VideoThread { @@ -45,11 +47,20 @@ impl SessionHandle { } } } + + /// Stop + close audio playback. Dropping the [`crate::audio::AudioPlayback`] joins its decode + /// thread and closes the AAudio stream. Idempotent. + #[cfg(target_os = "android")] + fn stop_audio(&self) { + let _ = self.audio.lock().unwrap().take(); + } } impl Drop for SessionHandle { fn drop(&mut self) { self.stop_video(); + #[cfg(target_os = "android")] + self.stop_audio(); } } @@ -90,6 +101,8 @@ pub extern "system" fn Java_io_unom_punktfunk_kit_NativeBridge_nativeConnect<'lo let handle = SessionHandle { client: Arc::new(client), video: Mutex::new(None), + #[cfg(target_os = "android")] + audio: Mutex::new(None), }; Box::into_raw(Box::new(handle)) as jlong } @@ -175,3 +188,43 @@ pub extern "system" fn Java_io_unom_punktfunk_kit_NativeBridge_nativeStopVideo( h.stop_video(); } } + +/// `NativeBridge.nativeStartAudio(handle)` — start the Opus→AAudio playback thread. No-op if already +/// started or on a `0` handle. Best-effort: a failure leaves video streaming. +#[cfg(target_os = "android")] +#[no_mangle] +pub extern "system" fn Java_io_unom_punktfunk_kit_NativeBridge_nativeStartAudio( + _env: JNIEnv, + _this: JObject, + handle: jlong, +) { + if handle == 0 { + return; + } + // SAFETY: live handle per the nativeConnect/nativeClose contract. + let h = unsafe { &*(handle as *const SessionHandle) }; + let mut guard = h.audio.lock().unwrap(); + if guard.is_some() { + return; // already playing + } + match crate::audio::AudioPlayback::start(h.client.clone()) { + Some(p) => *guard = Some(p), + None => log::error!("nativeStartAudio: playback init failed (video unaffected)"), + } +} + +/// `NativeBridge.nativeStopAudio(handle)` — stop + join the audio thread and close AAudio (without +/// closing the session). No-op on `0`. +#[cfg(target_os = "android")] +#[no_mangle] +pub extern "system" fn Java_io_unom_punktfunk_kit_NativeBridge_nativeStopAudio( + _env: JNIEnv, + _this: JObject, + handle: jlong, +) { + if handle != 0 { + // SAFETY: live handle per the contract. + let h = unsafe { &*(handle as *const SessionHandle) }; + h.stop_audio(); + } +}