refactor(android): split session JNI into modules, HUD-gated stats, AAudio open retry
- native: the 756-line session.rs becomes session/{mod,connect,input,planes}.rs
around a SessionHandle (connect lifecycle + trust, input plane shims, plane
start/stop + stats drain).
- Decode-stats sampling is HUD-gated (nativeSetVideoStatsEnabled): with the
overlay hidden the decode thread skips the per-AU clock read + lock; enabling
resets the measurement window.
- audio: the AAudio open path is a per-sharing-mode try_open closure — the
realtime callback state (ring, prime, free-list) is rebuilt per attempt, so a
failed exclusive-mode try can't leak state into the shared-mode retry.
- Kotlin: ConnectScreen/StreamScreen slimmed by extracting ConnectDialogs,
StatsOverlay and TouchInput.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,9 +1,12 @@
|
||||
//! Android microphone uplink (android-only): capture mic PCM via AAudio (LowLatency **input**),
|
||||
//! Opus-encode 20 ms stereo frames, and push them to the host over the connector's mic plane
|
||||
//! (`send_mic` → 0xCB datagram). The mirror of [`crate::audio`] in reverse: AAudio's realtime input
|
||||
//! callback hands captured interleaved f32 to a channel; a worker thread we own does the Opus encode
|
||||
//! + send (encoding is too heavy for the realtime callback, exactly as decode is on the playback
|
||||
//! side). Format matches the host decoder + the Linux client: 48 kHz **stereo**, 20 ms, Opus VOIP.
|
||||
//! callback hands captured interleaved f32 to a channel; a worker thread we own does the Opus
|
||||
//! encode + send (encoding is too heavy for the realtime callback, exactly as decode is on the
|
||||
//! playback side). Like the playback path, the realtime callback is allocation-free: captured
|
||||
//! bursts are copied into pre-allocated buffers from a recycle free-list (pool empty = drop the
|
||||
//! chunk, never allocate on the capture thread). Format matches the host decoder + the Linux
|
||||
//! client: 48 kHz **stereo**, 20 ms, Opus VOIP.
|
||||
|
||||
use ndk::audio::{
|
||||
AudioCallbackResult, AudioDirection, AudioFormat, AudioPerformanceMode, AudioSharingMode,
|
||||
@@ -13,7 +16,7 @@ use punktfunk_core::client::NativeClient;
|
||||
use std::collections::VecDeque;
|
||||
use std::ffi::c_void;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, TrySendError};
|
||||
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender, TrySendError};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
@@ -23,6 +26,10 @@ const SAMPLE_RATE: i32 = 48_000;
|
||||
const FRAME_SAMPLES: usize = 960;
|
||||
/// Captured-chunk hand-off depth (each ~ one burst); drops on overflow (best-effort uplink).
|
||||
const RING_CHUNKS: usize = 64;
|
||||
/// Free-list buffer capacity, in interleaved f32 samples: comfortably above a LowLatency input
|
||||
/// burst (typically ≤ ~480 frames). A device with larger bursts costs each buffer a one-time grow
|
||||
/// on the capture thread, after which the steady state is allocation-free again.
|
||||
const CHUNK_CAP_SAMPLES: usize = 1920; // 20 ms stereo
|
||||
/// Opus VOIP target bitrate (speech; tunable).
|
||||
const MIC_BITRATE: i32 = 64_000;
|
||||
|
||||
@@ -38,56 +45,109 @@ impl MicCapture {
|
||||
/// forwards captured PCM to a channel, then spawn the Opus encode + uplink thread. `None` on
|
||||
/// failure (the caller leaves the rest of the session streaming).
|
||||
pub fn start(client: Arc<NativeClient>) -> Option<MicCapture> {
|
||||
let (tx, rx) = sync_channel::<Vec<f32>>(RING_CHUNKS);
|
||||
let captured = Arc::new(AtomicU64::new(0));
|
||||
let cb_captured = captured.clone();
|
||||
// Chunks discarded on the capture thread (free-list empty / encoder lagging); logged
|
||||
// throttled from the encode worker.
|
||||
let dropped = Arc::new(AtomicU64::new(0));
|
||||
|
||||
let callback = move |_s: &AudioStream, data: *mut c_void, num_frames: i32| {
|
||||
let n = num_frames as usize * CHANNELS;
|
||||
// SAFETY: for an input stream AAudio provides `num_frames * channel_count` captured F32
|
||||
// samples at `data` (read-only for us).
|
||||
let inp = unsafe { std::slice::from_raw_parts(data as *const f32, n) };
|
||||
match tx.try_send(inp.to_vec()) {
|
||||
Ok(()) | Err(TrySendError::Full(_)) => {} // drop-newest if the encoder lags
|
||||
Err(TrySendError::Disconnected(_)) => return AudioCallbackResult::Stop,
|
||||
// One open attempt at a given sharing mode (same pattern as [`crate::audio`]: `open_stream`
|
||||
// consumes the builder AND the callback, so each try rebuilds the channels it captures).
|
||||
let try_open = |sharing: AudioSharingMode| -> ndk::audio::Result<(
|
||||
AudioStream,
|
||||
Receiver<Vec<f32>>,
|
||||
SyncSender<Vec<f32>>,
|
||||
)> {
|
||||
let (tx, rx) = sync_channel::<Vec<f32>>(RING_CHUNKS);
|
||||
// Recycle free-list, mirroring the playback path: the realtime capture callback must
|
||||
// not touch the allocator (Android's Scudo has unbounded malloc/free tail latency — an
|
||||
// allocation here is a missed burst), so it pops a pre-allocated buffer, copies the
|
||||
// burst in and sends it; the encode worker returns drained buffers. Pool empty = DROP
|
||||
// the chunk (counted) rather than allocate.
|
||||
let (free_tx, free_rx) = sync_channel::<Vec<f32>>(RING_CHUNKS);
|
||||
for _ in 0..RING_CHUNKS {
|
||||
let _ = free_tx.try_send(Vec::with_capacity(CHUNK_CAP_SAMPLES));
|
||||
}
|
||||
cb_captured.fetch_add(num_frames as u64, Ordering::Relaxed);
|
||||
AudioCallbackResult::Continue
|
||||
let cb_captured = captured.clone();
|
||||
let cb_dropped = dropped.clone();
|
||||
let cb_free_tx = free_tx.clone(); // returns the buffer when the data channel is full
|
||||
|
||||
let callback = move |_s: &AudioStream, data: *mut c_void, num_frames: i32| {
|
||||
let n = num_frames as usize * CHANNELS;
|
||||
// SAFETY: for an input stream AAudio provides `num_frames * channel_count` captured
|
||||
// F32 samples at `data` (read-only for us).
|
||||
let inp = unsafe { std::slice::from_raw_parts(data as *const f32, n) };
|
||||
cb_captured.fetch_add(num_frames as u64, Ordering::Relaxed);
|
||||
match free_rx.try_recv() {
|
||||
Ok(mut buf) => {
|
||||
buf.clear();
|
||||
buf.extend_from_slice(inp); // retained capacity — no realloc past the first
|
||||
match tx.try_send(buf) {
|
||||
Ok(()) => {}
|
||||
Err(TrySendError::Full(buf)) => {
|
||||
// Encoder lagging: drop the chunk, hand the buffer straight back.
|
||||
let _ = cb_free_tx.try_send(buf);
|
||||
cb_dropped.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(TrySendError::Disconnected(_)) => return AudioCallbackResult::Stop,
|
||||
}
|
||||
}
|
||||
// Pool empty (every buffer in flight): drop, never allocate on this thread.
|
||||
Err(_) => {
|
||||
cb_dropped.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
AudioCallbackResult::Continue
|
||||
};
|
||||
|
||||
let stream = AudioStreamBuilder::new()?
|
||||
.direction(AudioDirection::Input)
|
||||
.sample_rate(SAMPLE_RATE)
|
||||
.channel_count(CHANNELS as i32)
|
||||
.format(AudioFormat::PCM_Float)
|
||||
.performance_mode(AudioPerformanceMode::LowLatency)
|
||||
.sharing_mode(sharing)
|
||||
.data_callback(Box::new(callback))
|
||||
.error_callback(Box::new(|_s, e| {
|
||||
log::warn!("mic: AAudio error (device reroute/disconnect?): {e:?}");
|
||||
}))
|
||||
.open_stream()?;
|
||||
Ok((stream, rx, free_tx))
|
||||
};
|
||||
|
||||
let stream = AudioStreamBuilder::new()
|
||||
.map_err(|e| log::error!("mic: AudioStreamBuilder::new: {e}"))
|
||||
.ok()?
|
||||
.direction(AudioDirection::Input)
|
||||
.sample_rate(SAMPLE_RATE)
|
||||
.channel_count(CHANNELS as i32)
|
||||
.format(AudioFormat::PCM_Float)
|
||||
.performance_mode(AudioPerformanceMode::LowLatency)
|
||||
.sharing_mode(AudioSharingMode::Shared)
|
||||
.data_callback(Box::new(callback))
|
||||
.error_callback(Box::new(|_s, e| {
|
||||
log::warn!("mic: AAudio error (device reroute/disconnect?): {e:?}");
|
||||
}))
|
||||
.open_stream()
|
||||
.map_err(|e| log::error!("mic: open_stream (RECORD_AUDIO granted?): {e}"))
|
||||
.ok()?;
|
||||
// Exclusive first — MMAP-exclusive is AAudio's lowest-latency path — falling back to Shared
|
||||
// when the device refuses (no MMAP, mic claimed, …). The started-log below prints the mode
|
||||
// the device actually GRANTED (`share=`).
|
||||
let (stream, rx, free_tx) = match try_open(AudioSharingMode::Exclusive) {
|
||||
Ok(opened) => opened,
|
||||
Err(e) => {
|
||||
log::info!("mic: Exclusive open failed ({e}) — retrying Shared");
|
||||
match try_open(AudioSharingMode::Shared) {
|
||||
Ok(opened) => opened,
|
||||
Err(e) => {
|
||||
log::error!("mic: open_stream (RECORD_AUDIO granted?): {e}");
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = stream.request_start() {
|
||||
log::error!("mic: request_start: {e}");
|
||||
return None;
|
||||
}
|
||||
log::info!(
|
||||
"mic: AAudio input started rate={} ch={} fmt={:?}",
|
||||
"mic: AAudio input started rate={} ch={} fmt={:?} share={:?}",
|
||||
stream.sample_rate(),
|
||||
stream.channel_count(),
|
||||
stream.format(),
|
||||
stream.sharing_mode(),
|
||||
);
|
||||
|
||||
let shutdown = Arc::new(AtomicBool::new(false));
|
||||
let sd = shutdown.clone();
|
||||
let join = std::thread::Builder::new()
|
||||
.name("pf-mic".into())
|
||||
.spawn(move || encode_loop(client, rx, sd, captured))
|
||||
.spawn(move || encode_loop(client, rx, free_tx, sd, captured, dropped))
|
||||
.ok();
|
||||
|
||||
Some(MicCapture {
|
||||
@@ -109,11 +169,15 @@ impl Drop for MicCapture {
|
||||
}
|
||||
|
||||
/// Consumer: drain captured f32 → accumulate → Opus `encode_float` 20 ms stereo frames → `send_mic`.
|
||||
/// Drained chunk buffers go back to the callback's free-list; the encode scratch is reused across
|
||||
/// frames (only the packet Vec handed to `send_mic` is allocated per frame — it's sent away owned).
|
||||
fn encode_loop(
|
||||
client: Arc<NativeClient>,
|
||||
rx: Receiver<Vec<f32>>,
|
||||
free_tx: SyncSender<Vec<f32>>,
|
||||
shutdown: Arc<AtomicBool>,
|
||||
captured: Arc<AtomicU64>,
|
||||
dropped: Arc<AtomicU64>,
|
||||
) {
|
||||
let mut enc = match opus::Encoder::new(
|
||||
SAMPLE_RATE as u32,
|
||||
@@ -130,6 +194,7 @@ fn encode_loop(
|
||||
|
||||
let frame = FRAME_SAMPLES * CHANNELS;
|
||||
let mut ring: VecDeque<f32> = VecDeque::with_capacity(frame * 4);
|
||||
let mut pcm = vec![0f32; frame]; // reusable encode scratch (one 20 ms frame)
|
||||
let mut out = vec![0u8; 4000]; // max Opus packet for a 20 ms frame fits easily
|
||||
let mut seq: u32 = 0;
|
||||
let mut sent: u64 = 0;
|
||||
@@ -137,12 +202,19 @@ fn encode_loop(
|
||||
|
||||
while !shutdown.load(Ordering::Relaxed) {
|
||||
match rx.recv_timeout(Duration::from_millis(100)) {
|
||||
Ok(chunk) => ring.extend(chunk),
|
||||
Ok(mut chunk) => {
|
||||
// `drain(..)` keeps the Vec's capacity; hand the emptied buffer back to the
|
||||
// callback's free-list (dropped only if the pool is momentarily full).
|
||||
ring.extend(chunk.drain(..));
|
||||
let _ = free_tx.try_send(chunk);
|
||||
}
|
||||
Err(RecvTimeoutError::Timeout) => continue, // wake to re-check shutdown
|
||||
Err(RecvTimeoutError::Disconnected) => break,
|
||||
}
|
||||
while ring.len() >= frame {
|
||||
let pcm: Vec<f32> = ring.drain(..frame).collect();
|
||||
for (dst, src) in pcm.iter_mut().zip(ring.drain(..frame)) {
|
||||
*dst = src;
|
||||
}
|
||||
for &s in &pcm {
|
||||
peak = peak.max(s.abs());
|
||||
}
|
||||
@@ -157,8 +229,9 @@ fn encode_loop(
|
||||
sent += 1;
|
||||
if sent % 250 == 0 {
|
||||
log::info!(
|
||||
"mic: sent={sent} captured_frames={} peak={peak:.3}",
|
||||
"mic: sent={sent} captured_frames={} dropped_chunks={} peak={peak:.3}",
|
||||
captured.load(Ordering::Relaxed),
|
||||
dropped.load(Ordering::Relaxed),
|
||||
);
|
||||
peak = 0.0;
|
||||
}
|
||||
@@ -168,7 +241,8 @@ fn encode_loop(
|
||||
}
|
||||
}
|
||||
log::info!(
|
||||
"mic: stopped (sent={sent} captured_frames={})",
|
||||
"mic: stopped (sent={sent} captured_frames={} dropped_chunks={})",
|
||||
captured.load(Ordering::Relaxed),
|
||||
dropped.load(Ordering::Relaxed),
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user