fix(android/audio): kill the AAudio crackle (RT-safe ring + deeper buffer + XRun sizing)
The jitter ring was a port of the Linux client's, but Linux runs on PipeWire (adaptive resampling masks host↔DAC drift + a shallow buffer); AAudio hands us a raw realtime callback and we own the buffer, so the same code crackled only on Android.

Three converging causes, all fixed:

- Heap free on the realtime audio thread every quantum (Android's Scudo free() has unbounded tail latency → XRun → click). Decoded buffers are now recycled back to the producer via a free-list instead of being freed on the audio thread; the ring is pre-reserved so extend() never reallocates there.
- The ring collapsed to ~15 ms on the tiny LowLatency burst and re-primed (a fresh silence) on every single empty callback. Now ~40 ms prime / ~150 ms hard cap, decoupled from the burst size, with de-prime hysteresis (re-prime only after a sustained drain).
- AAudio's anti-glitch knobs were unused: prime the HW buffer above its 2-burst default and grow it on getXRunCount(). The post-open log now reports perf/sharing/buffer so a fall to a resampled legacy path is visible.

Steady-state audio latency ~15 → ~40 ms (within lip-sync tolerance; matches the Moonlight/Sunshine operating point).

cargo-ndk build both ABIs + fmt + clippy green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,8 +1,17 @@
|
|||||||
//! Android audio playback (android-only): pull Opus packets from the connector, decode to
|
//! Android audio playback (android-only): pull Opus packets from the connector, decode to
|
||||||
//! interleaved f32 stereo, and feed AAudio (LowLatency) via its realtime data callback through a
|
//! interleaved f32 stereo, and feed AAudio (LowLatency) via its realtime data callback through a
|
||||||
//! jitter ring. Mirrors [`crate::decode`]: one thread we own (the Opus decode producer) plus a
|
//! jitter ring. Mirrors [`crate::decode`]: one thread we own (the Opus decode producer) plus a
|
||||||
//! shutdown flag; the realtime callback thread is owned by AAudio. Ring logic ported from
|
//! shutdown flag; the realtime callback thread is owned by AAudio.
|
||||||
//! `punktfunk-client-linux/src/audio.rs` (prime ~3 quanta, drop-oldest cap, re-prime on drain).
|
//!
|
||||||
|
//! The ring started as a port of `punktfunk-client-linux/src/audio.rs`, but AAudio — unlike
|
||||||
|
//! PipeWire, which adaptively rate-matches the stream and absorbs a shallow buffer — hands us a raw
|
||||||
|
//! realtime callback and makes us own the buffer. So this client diverges deliberately to stop the
|
||||||
|
//! Android-only crackle: (1) the callback is allocation/free-free — decoded buffers are recycled to
|
||||||
|
//! the producer via a free-list instead of being freed on the audio thread (Android's Scudo `free`
|
||||||
|
//! has unbounded tail latency); (2) the jitter ring is deeper (~40 ms prime / ~150 ms hard cap) and
|
||||||
|
//! decoupled from the tiny LowLatency burst size, with de-prime hysteresis so a transient drain
|
||||||
|
//! doesn't manufacture a silence; (3) the AAudio HW buffer is primed above its 2-burst default and
|
||||||
|
//! grown on XRuns (Google's anti-glitch technique).
|
||||||
|
|
||||||
use ndk::audio::{
|
use ndk::audio::{
|
||||||
AudioCallbackResult, AudioDirection, AudioFormat, AudioPerformanceMode, AudioSharingMode,
|
AudioCallbackResult, AudioDirection, AudioFormat, AudioPerformanceMode, AudioSharingMode,
|
||||||
@@ -13,7 +22,7 @@ use punktfunk_core::error::PunktfunkError;
|
|||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::ffi::c_void;
|
use std::ffi::c_void;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||||
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
|
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
@@ -24,6 +33,29 @@ const RING_CHUNKS: usize = 64;
|
|||||||
/// Opus decode scratch: worst-case 120 ms stereo frame (5760 samples/ch × 2 ch).
|
/// Opus decode scratch: worst-case 120 ms stereo frame (5760 samples/ch × 2 ch).
|
||||||
const PCM_SCRATCH: usize = 5760 * CHANNELS;
|
const PCM_SCRATCH: usize = 5760 * CHANNELS;
|
||||||
|
|
||||||
|
// --- Jitter-ring depths, in interleaved-f32 samples (all expressed in ms via `MS`). -----------
|
||||||
|
// Unlike the Linux client (PipeWire adaptively rate-matches the stream to the graph clock, masking
|
||||||
|
// host↔DAC drift + a shallow ring), AAudio hands us a raw callback and we own the buffer: drift and
|
||||||
|
// WiFi power-save bunching land as underruns/overflows = crackle. So Android runs a deliberately
|
||||||
|
// deeper, smoothly-managed ring than Linux — keep the two clients' depths intentionally divergent.
|
||||||
|
/// Interleaved f32 samples per millisecond (48 kHz × 2 ch).
|
||||||
|
const MS: usize = (SAMPLE_RATE as usize / 1000) * CHANNELS; // 96
|
||||||
|
/// Prime/target floor: fill to ~40 ms before playing (and after a sustained drain). Deep enough to
|
||||||
|
/// ride out WiFi arrival jitter + clock drift; the dominant Android-only anti-crackle lever.
|
||||||
|
const PRIME_FLOOR: usize = 40 * MS;
|
||||||
|
/// Ceiling for the burst-scaled target (so a large quantum can't push the prime depth too high).
|
||||||
|
const PRIME_CEIL: usize = 80 * MS;
|
||||||
|
/// Drop-oldest headroom above the target before trimming — a ~80 ms band swallows an arrival burst
|
||||||
|
/// without overflowing.
|
||||||
|
const JITTER_HEADROOM: usize = 80 * MS;
|
||||||
|
/// Hard latency bound: never let the ring exceed ~150 ms (the only thing that caps added latency).
|
||||||
|
const HARD_CAP: usize = 150 * MS;
|
||||||
|
/// Re-prime (go silent to refill) only after this many CONSECUTIVE empty callbacks, so one transient
|
||||||
|
/// drain doesn't manufacture a fresh 40 ms silence (the old `if ring.is_empty()` re-primed instantly).
|
||||||
|
const DEPRIME_AFTER_CALLBACKS: u32 = 5;
|
||||||
|
/// Throttle the AAudio XRun-driven HW-buffer grow check (cheap, but no need to poll every quantum).
|
||||||
|
const XRUN_CHECK_EVERY: u32 = 128;
|
||||||
|
|
||||||
/// Diagnostics — written by the decode thread + the realtime callback, logged periodically. The
|
/// Diagnostics — written by the decode thread + the realtime callback, logged periodically. The
|
||||||
/// audio analogue of the video `fed`/`rendered` counters (we can't "screenshot" sound).
|
/// audio analogue of the video `fed`/`rendered` counters (we can't "screenshot" sound).
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
@@ -47,22 +79,41 @@ impl AudioPlayback {
|
|||||||
pub fn start(client: Arc<NativeClient>) -> Option<AudioPlayback> {
|
pub fn start(client: Arc<NativeClient>) -> Option<AudioPlayback> {
|
||||||
let counters = Arc::new(Counters::default());
|
let counters = Arc::new(Counters::default());
|
||||||
let (tx, rx) = sync_channel::<Vec<f32>>(RING_CHUNKS);
|
let (tx, rx) = sync_channel::<Vec<f32>>(RING_CHUNKS);
|
||||||
|
// Recycle free-list: drained PCM buffers go BACK to the decode thread to be refilled, so the
|
||||||
|
// realtime callback never frees heap (Android's Scudo allocator has unbounded free() tail
|
||||||
|
// latency — a free on the audio thread is an XRun = a click) and the decode thread rarely
|
||||||
|
// allocates. Same depth as the data channel.
|
||||||
|
let (free_tx, free_rx) = sync_channel::<Vec<f32>>(RING_CHUNKS);
|
||||||
|
|
||||||
// Realtime consumer state, owned by the callback (FnMut) — no lock: AAudio calls it from a
|
// Realtime consumer state, owned by the callback (FnMut) — no lock: AAudio calls it from a
|
||||||
// single high-priority thread, and the decode thread only touches `tx`.
|
// single high-priority thread, and the decode thread only touches `tx`/`free_rx`.
|
||||||
let cb_counters = counters.clone();
|
let cb_counters = counters.clone();
|
||||||
let mut ring: VecDeque<f32> = VecDeque::with_capacity(PCM_SCRATCH);
|
// Pre-reserve the ring so `extend` never reallocates on the realtime thread. Worst transient
|
||||||
|
// before the trim below = the hard cap plus a full `rx` queue of 5 ms (480-f32) frames — the
|
||||||
|
// punktfunk protocol always sends 5 ms Opus frames (host `audio_thread`); a larger frame
|
||||||
|
// would force a one-time realloc, asserted (not silently corrupted) in `decode_loop`.
|
||||||
|
let mut ring: VecDeque<f32> = VecDeque::with_capacity(HARD_CAP + RING_CHUNKS * 5 * MS);
|
||||||
let mut primed = false;
|
let mut primed = false;
|
||||||
let callback = move |_s: &AudioStream, data: *mut c_void, num_frames: i32| {
|
let mut empties: u32 = 0; // consecutive empty callbacks (de-prime hysteresis)
|
||||||
|
let mut cb_count: u32 = 0; // callbacks since open (throttles the XRun grow check)
|
||||||
|
let mut last_xrun: i32 = 0; // last AAudio XRun count we grew the buffer for
|
||||||
|
let callback = move |s: &AudioStream, data: *mut c_void, num_frames: i32| {
|
||||||
let want = num_frames as usize * CHANNELS;
|
let want = num_frames as usize * CHANNELS;
|
||||||
// SAFETY: AAudio provides `num_frames * channel_count` F32 slots at `data`.
|
// SAFETY: AAudio provides `num_frames * channel_count` F32 slots at `data`.
|
||||||
let out = unsafe { std::slice::from_raw_parts_mut(data as *mut f32, want) };
|
let out = unsafe { std::slice::from_raw_parts_mut(data as *mut f32, want) };
|
||||||
while let Ok(chunk) = rx.try_recv() {
|
// Drain decoded chunks into the ring WITHOUT freeing on the RT thread: `drain(..)` empties
|
||||||
ring.extend(chunk);
|
// each Vec but keeps its capacity, then the empty buffer is handed back for reuse. The
|
||||||
|
// only RT-thread free is the rare case where the recycle channel is momentarily full.
|
||||||
|
while let Ok(mut chunk) = rx.try_recv() {
|
||||||
|
ring.extend(chunk.drain(..));
|
||||||
|
let _ = free_tx.try_send(chunk);
|
||||||
}
|
}
|
||||||
// Prime to ~3 quanta (15 ms; floor 15 ms / ceiling 200 ms); drop OLDEST above the cap.
|
// Jitter buffer: prime to ~40 ms (PRIME_FLOOR) before playing and after a sustained drain;
|
||||||
let target = (3 * want).clamp(720 * CHANNELS, 9600 * CHANNELS);
|
// drop-oldest only above a wide ~120 ms band. Decoupled from the AAudio burst `want` (tiny
|
||||||
while ring.len() > target.max(want) + want {
|
// on the LowLatency MMAP path) so the depth doesn't collapse to a single quantum.
|
||||||
|
let target = (3 * want).clamp(PRIME_FLOOR, PRIME_CEIL);
|
||||||
|
let hard_cap = (target + JITTER_HEADROOM).min(HARD_CAP);
|
||||||
|
while ring.len() > hard_cap {
|
||||||
ring.pop_front();
|
ring.pop_front();
|
||||||
}
|
}
|
||||||
if !primed && ring.len() >= target {
|
if !primed && ring.len() >= target {
|
||||||
@@ -79,12 +130,34 @@ impl AudioPlayback {
|
|||||||
out.fill(0.0);
|
out.fill(0.0);
|
||||||
cb_counters.underruns.fetch_add(1, Ordering::Relaxed);
|
cb_counters.underruns.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
// Re-prime only after a RUN of empty callbacks, not a single transient one — otherwise
|
||||||
|
// every momentary drain costs a fresh 40 ms silence (the old behaviour, self-inflicted
|
||||||
|
// crackle on any jitter spike).
|
||||||
if ring.is_empty() {
|
if ring.is_empty() {
|
||||||
primed = false; // re-prime after a genuine drain (avoids sustained crackle on loss)
|
empties += 1;
|
||||||
|
if empties >= DEPRIME_AFTER_CALLBACKS {
|
||||||
|
primed = false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
empties = 0;
|
||||||
}
|
}
|
||||||
cb_counters
|
cb_counters
|
||||||
.ring_depth
|
.ring_depth
|
||||||
.store(ring.len() as u64, Ordering::Relaxed);
|
.store(ring.len() as u64, Ordering::Relaxed);
|
||||||
|
// Google's AAudio anti-glitch technique: when the device reports new XRuns, grow the HW
|
||||||
|
// buffer by one burst (up to capacity). getXRunCount + setBufferSizeInFrames are both
|
||||||
|
// callback-safe / non-blocking, and set clamps to capacity so it self-limits. Throttled.
|
||||||
|
cb_count = cb_count.wrapping_add(1);
|
||||||
|
if cb_count % XRUN_CHECK_EVERY == 0 {
|
||||||
|
let xr = s.x_run_count();
|
||||||
|
if xr > last_xrun {
|
||||||
|
last_xrun = xr;
|
||||||
|
let burst = s.frames_per_burst().max(1);
|
||||||
|
let grown =
|
||||||
|
(s.buffer_size_in_frames() + burst).min(s.buffer_capacity_in_frames());
|
||||||
|
let _ = s.set_buffer_size_in_frames(grown);
|
||||||
|
}
|
||||||
|
}
|
||||||
AudioCallbackResult::Continue
|
AudioCallbackResult::Continue
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -109,19 +182,31 @@ impl AudioPlayback {
|
|||||||
log::error!("audio: request_start: {e}");
|
log::error!("audio: request_start: {e}");
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
// Lift the AAudio HW buffer off its brittle ~2-burst LowLatency default so a single late
|
||||||
|
// callback doesn't immediately underrun; the in-callback XRun loop grows it further if the
|
||||||
|
// device still glitches. set_buffer_size_in_frames clamps to capacity.
|
||||||
|
let burst = stream.frames_per_burst().max(1);
|
||||||
|
let _ =
|
||||||
|
stream.set_buffer_size_in_frames((burst * 3).min(stream.buffer_capacity_in_frames()));
|
||||||
|
// perf != LowLatency or rate != 48000 means AAudio silently fell to a resampled legacy path
|
||||||
|
// (different burst behaviour) — surface it so the field can tell that apart from plain jitter.
|
||||||
log::info!(
|
log::info!(
|
||||||
"audio: AAudio started rate={} ch={} fmt={:?} burst={}",
|
"audio: AAudio started rate={} ch={} fmt={:?} perf={:?} share={:?} burst={} buf={}/{}",
|
||||||
stream.sample_rate(),
|
stream.sample_rate(),
|
||||||
stream.channel_count(),
|
stream.channel_count(),
|
||||||
stream.format(),
|
stream.format(),
|
||||||
|
stream.performance_mode(),
|
||||||
|
stream.sharing_mode(),
|
||||||
stream.frames_per_burst(),
|
stream.frames_per_burst(),
|
||||||
|
stream.buffer_size_in_frames(),
|
||||||
|
stream.buffer_capacity_in_frames(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let shutdown = Arc::new(AtomicBool::new(false));
|
let shutdown = Arc::new(AtomicBool::new(false));
|
||||||
let sd = shutdown.clone();
|
let sd = shutdown.clone();
|
||||||
let join = std::thread::Builder::new()
|
let join = std::thread::Builder::new()
|
||||||
.name("pf-audio".into())
|
.name("pf-audio".into())
|
||||||
.spawn(move || decode_loop(client, tx, sd, counters))
|
.spawn(move || decode_loop(client, tx, free_rx, sd, counters))
|
||||||
.ok();
|
.ok();
|
||||||
|
|
||||||
Some(AudioPlayback {
|
Some(AudioPlayback {
|
||||||
@@ -143,9 +228,12 @@ impl Drop for AudioPlayback {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Producer: `next_audio` → Opus `decode_float` → push interleaved f32 into the ring channel.
|
/// Producer: `next_audio` → Opus `decode_float` → push interleaved f32 into the ring channel.
|
||||||
|
/// Buffers come from (and return to) the realtime callback's recycle free-list so the steady state
|
||||||
|
/// is allocation-free on both threads.
|
||||||
fn decode_loop(
|
fn decode_loop(
|
||||||
client: Arc<NativeClient>,
|
client: Arc<NativeClient>,
|
||||||
tx: SyncSender<Vec<f32>>,
|
tx: SyncSender<Vec<f32>>,
|
||||||
|
free_rx: Receiver<Vec<f32>>,
|
||||||
shutdown: Arc<AtomicBool>,
|
shutdown: Arc<AtomicBool>,
|
||||||
counters: Arc<Counters>,
|
counters: Arc<Counters>,
|
||||||
) {
|
) {
|
||||||
@@ -166,8 +254,22 @@ fn decode_loop(
|
|||||||
for &s in &pcm[..n] {
|
for &s in &pcm[..n] {
|
||||||
window_peak = window_peak.max(s.abs());
|
window_peak = window_peak.max(s.abs());
|
||||||
}
|
}
|
||||||
|
// The ring's pre-reservation in `start` assumes the protocol's 5 ms (≤480-f32)
|
||||||
|
// frames; a larger frame would force a one-time realloc on the RT thread. Catch a
|
||||||
|
// future host frame-size change here in debug, not as a silent audio glitch.
|
||||||
|
debug_assert!(
|
||||||
|
n <= 5 * MS,
|
||||||
|
"audio frame {n} f32 exceeds the 5 ms ring reserve"
|
||||||
|
);
|
||||||
let count = counters.opus_decoded.fetch_add(1, Ordering::Relaxed) + 1;
|
let count = counters.opus_decoded.fetch_add(1, Ordering::Relaxed) + 1;
|
||||||
match tx.try_send(pcm[..n].to_vec()) {
|
// Reuse a recycled buffer if the callback handed one back; only allocate when the
|
||||||
|
// free-list is momentarily empty (startup / after a backpressure drop).
|
||||||
|
let mut buf = free_rx
|
||||||
|
.try_recv()
|
||||||
|
.unwrap_or_else(|_| Vec::with_capacity(PCM_SCRATCH));
|
||||||
|
buf.clear();
|
||||||
|
buf.extend_from_slice(&pcm[..n]);
|
||||||
|
match tx.try_send(buf) {
|
||||||
Ok(()) | Err(TrySendError::Full(_)) => {} // drop-newest under backpressure
|
Ok(()) | Err(TrySendError::Full(_)) => {} // drop-newest under backpressure
|
||||||
Err(TrySendError::Disconnected(_)) => break,
|
Err(TrySendError::Disconnected(_)) => break,
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user