diff --git a/clients/android/app/src/main/AndroidManifest.xml b/clients/android/app/src/main/AndroidManifest.xml index 4cadcfb..6edf87d 100644 --- a/clients/android/app/src/main/AndroidManifest.xml +++ b/clients/android/app/src/main/AndroidManifest.xml @@ -27,8 +27,15 @@ + *mut c_void; +type CreateSessionFn = unsafe extern "C" fn(*mut c_void, *const i32, usize, i64) -> *mut c_void; +type ReportFn = unsafe extern "C" fn(*mut c_void, i64) -> c_int; +type UpdateTargetFn = unsafe extern "C" fn(*mut c_void, i64) -> c_int; +type CloseFn = unsafe extern "C" fn(*mut c_void); + +/// The entry points we use, resolved once from `libandroid.so`, plus the process-wide manager. +struct Api { + create_session: CreateSessionFn, + report: ReportFn, + update_target: UpdateTargetFn, + close: CloseFn, + manager: *mut c_void, +} + +/// Resolve the ADPF entry points + the process manager, or `None` on API < 33 (symbols absent) or if +/// the manager is unavailable. +fn resolve_api() -> Option { + // SAFETY: `dlopen` of an always-present system library with a NUL-terminated name; it returns + // null on failure (checked below). `libandroid.so` is already mapped into every app process, so + // this only bumps its refcount — we intentionally never `dlclose` (process-lifetime handle). + let lib = unsafe { libc::dlopen(c"libandroid.so".as_ptr(), libc::RTLD_NOW) }; + if lib.is_null() { + return None; + } + // SAFETY: `dlsym` on the valid handle above with NUL-terminated symbol names; each returns null + // when the symbol is absent (device API < 33), which we check before transmuting the non-null + // pointer to its fn-pointer type (layout-compatible; a resolved symbol is a valid code address). + unsafe { + let get_manager = libc::dlsym(lib, c"APerformanceHint_getManager".as_ptr()); + let create_session = libc::dlsym(lib, c"APerformanceHint_createSession".as_ptr()); + let report = libc::dlsym(lib, c"APerformanceHint_reportActualWorkDuration".as_ptr()); + let update_target = libc::dlsym(lib, c"APerformanceHint_updateTargetWorkDuration".as_ptr()); + let close = libc::dlsym(lib, c"APerformanceHint_closeSession".as_ptr()); + if get_manager.is_null() + || create_session.is_null() + || report.is_null() + || update_target.is_null() + || close.is_null() + { + return None; // device API < 33 — no ADPF + } + let get_manager = std::mem::transmute::<*mut c_void, GetManagerFn>(get_manager); + let manager = get_manager(); + if manager.is_null() { + return None; + } + Some(Api { + create_session: std::mem::transmute::<*mut c_void, CreateSessionFn>(create_session), + report: std::mem::transmute::<*mut c_void, ReportFn>(report), + update_target: std::mem::transmute::<*mut c_void, UpdateTargetFn>(update_target), + close: std::mem::transmute::<*mut c_void, CloseFn>(close), + manager, + }) + } +} + +/// A live ADPF hint session bound to a set of thread ids. Dropping it closes the session. Holds raw +/// handles, so it is `!Send`/`!Sync` — created and used only on the `pf-decode` thread. +pub struct HintSession { + api: Api, + session: *mut c_void, +} + +impl HintSession { + /// Open a session hinting `tids` with an initial per-frame target of `target_ns` nanoseconds. + /// `None` when ADPF is unavailable (device API < 33) or the platform declines — the caller then + /// runs unhinted (a no-op, not an error). + pub fn create(target_ns: i64, tids: &[i32]) -> Option { + if target_ns <= 0 || tids.is_empty() { + return None; + } + let api = resolve_api()?; + // SAFETY: `api.manager` is the live process manager returned above; `tids` is a valid slice + // of `len` i32s that `createSession` copies; it returns null on failure (checked). + let session = + unsafe { (api.create_session)(api.manager, tids.as_ptr(), tids.len(), target_ns) }; + if session.is_null() { + return None; + } + Some(Self { api, session }) + } + + /// Report the wall-clock time the hinted thread spent producing the last displayed frame. When + /// it exceeds the session target the governor boosts the cores running the thread; when it + /// stays under, clocks may relax. No-op on a non-positive duration (the API rejects it). + pub fn report_actual(&self, actual_ns: i64) { + if actual_ns <= 0 { + return; + } + // SAFETY: `self.session` is a live session for `self`'s lifetime. + unsafe { (self.api.report)(self.session, actual_ns) }; + } + + /// Update the per-frame target (e.g. after a mid-session refresh-rate change). Unused today — + /// the decode thread restarts on renegotiation — but kept for that path. + #[allow(dead_code)] + pub fn update_target(&self, target_ns: i64) { + if target_ns <= 0 { + return; + } + // SAFETY: `self.session` is a live session for `self`'s lifetime. + unsafe { (self.api.update_target)(self.session, target_ns) }; + } +} + +impl Drop for HintSession { + fn drop(&mut self) { + // SAFETY: `self.session` was created by `createSession` and is closed exactly once, here. + unsafe { (self.api.close)(self.session) }; + } +} diff --git a/clients/android/native/src/audio.rs b/clients/android/native/src/audio.rs index e700adf..0c6b3e3 100644 --- a/clients/android/native/src/audio.rs +++ b/clients/android/native/src/audio.rs @@ -324,6 +324,10 @@ fn decode_loop( counters: Arc, channels: usize, ) { + // Fold this Opus→AAudio thread into the client's hot-thread set so the ADPF session the decode + // thread opens also keeps audio decode on a fast core (registered before the video pump's first + // frame arrives, so it's captured when that session is created). No-op below API 33. + client.register_hot_thread(); // Interleaved f32 samples per millisecond at this layout — the ring's 5 ms reserve check below. let ms = (SAMPLE_RATE as usize / 1000) * channels; // Opus decode scratch: worst-case 120 ms frame (5760 samples/ch) × channels. diff --git a/clients/android/native/src/decode.rs b/clients/android/native/src/decode.rs index 21e030c..81bc009 100644 --- a/clients/android/native/src/decode.rs +++ b/clients/android/native/src/decode.rs @@ -61,7 +61,14 @@ pub fn run( // realtime priority + the target frame rate, so vendor decoders (e.g. Qualcomm) run at full // clocks instead of a power-saving cadence that adds dequeue latency. format.set_i32("priority", 0); // 0 = realtime - format.set_i32("operating-rate", mode.refresh_hz as i32); + // Operating rate = the codec's clock hint. Setting it to the display rate merely asks the + // decoder to *sustain* that cadence — a Qualcomm decoder can meet 60/120 fps at a power-saving + // clock that adds a millisecond-plus of decode latency per frame. Setting it to the AOSP + // "unbounded" sentinel (Short.MAX) instead asks the decoder to run each frame at max clocks and + // finish ASAP, minimising per-frame decode latency — the right trade for a real-time stream + // (costs power/heat; the dial to lower if a device thermally throttles over a long session). + // Ignored where unsupported. + format.set_i32("operating-rate", i16::MAX as i32); // 32767 = "as fast as possible" // HDR static metadata (ST.2086 mastering + content light level): when an HDR session was // negotiated, set KEY_HDR_STATIC_INFO so the display tone-maps from the source's real grade. @@ -104,6 +111,25 @@ pub fn run( ); } + // ADPF: hint the platform that the whole video pipeline — this pf-decode feed/drain/present + // loop, the core's data-plane pump (UDP receive + FEC reassembly), and the audio thread — runs a + // per-frame real-time workload, so the CPU governor keeps those threads on fast cores at high + // clocks instead of down-clocking between frames or parking them on a little core. Snapdragon's + // ADPF backend responds well to this. We register this thread now but create the session lazily + // on the first presented frame: by then the pump + audio threads have registered their ids too, + // and ADPF `createSession` rejects a set with any not-yet-live/dead tid. No-op below API 33. + let frame_period_ns = if mode.refresh_hz > 0 { + 1_000_000_000i64 / mode.refresh_hz as i64 + } else { + 0 + }; + client.register_hot_thread(); // this decode thread → the pipeline's hot-thread set + let mut hint: Option = None; + let mut hint_tried = false; + // Accumulates the loop's productive (feed+drain) time between displayed frames; reported to ADPF + // once per rendered frame against the frame-period target. + let mut work_accum_ns: i64 = 0; + let mut fed: u64 = 0; let mut rendered: u64 = 0; let mut discarded: u64 = 0; @@ -154,6 +180,9 @@ pub fn run( Err(_) => break, // session closed } } + // Time the productive work (feed + drain) only — the `next_frame` poll wait above is idle + // and excluded, so ADPF sees this thread's real per-frame CPU cost, not the poll timeout. + let work_t0 = Instant::now(); if let Some(frame) = pending.take() { if feed(&codec, &frame.data, frame.pts_ns / 1000) { fed += 1; @@ -177,6 +206,36 @@ pub fn run( rendered += r; discarded += d; + // ADPF: attribute this iteration's feed+drain time to the frame being produced, and report + // the accumulated per-frame work once one is actually presented (r > 0). Under back-pressure + // the short output-dequeue wait is included in the tally — for a latency-first client, + // biasing the governor toward "boost" is the desired behaviour. Cheap when `hint` is None + // (one `Instant` diff, no report). + work_accum_ns += work_t0.elapsed().as_nanos() as i64; + if r > 0 { + if !hint_tried { + // First presented frame: the pump + audio threads have registered their ids by now. + // Build one ADPF session over the whole pipeline's thread set (empty below API 33, + // or where the platform declines → `None`, and the loop runs unhinted). + hint_tried = true; + let tids = client.hot_thread_ids(); + hint = crate::adpf::HintSession::create(frame_period_ns, &tids); + log::info!( + "decode: ADPF hint session {} — {} hot thread(s), target {frame_period_ns} ns", + if hint.is_some() { + "active" + } else { + "unavailable" + }, + tids.len(), + ); + } + if let Some(h) = &hint { + h.report_actual(work_accum_ns); + } + work_accum_ns = 0; + } + // Loss recovery: under infinite GOP the only recovery keyframe is one we request. The // reassembler drops unrecoverable AUs (frames_dropped); the decoder then conceals the // reference-missing delta frames that follow and renders them without error, so keying off diff --git a/clients/android/native/src/lib.rs b/clients/android/native/src/lib.rs index a3594b6..7efba50 100644 --- a/clients/android/native/src/lib.rs +++ b/clients/android/native/src/lib.rs @@ -25,6 +25,8 @@ use jni::objects::JObject; use jni::sys::jint; use jni::JNIEnv; +#[cfg(target_os = "android")] +mod adpf; #[cfg(target_os = "android")] mod audio; #[cfg(target_os = "android")] diff --git a/crates/punktfunk-core/src/client.rs b/crates/punktfunk-core/src/client.rs index 396118a..cc5a123 100644 --- a/crates/punktfunk-core/src/client.rs +++ b/crates/punktfunk-core/src/client.rs @@ -176,6 +176,12 @@ pub struct NativeClient { /// a recovery keyframe under infinite GOP — the correct loss trigger, since unrecoverable loss /// yields reference-missing frames the decoder silently conceals (a decode-error trigger misses them). frames_dropped: Arc, + /// Kernel ids of the client's latency-critical native threads: the internal data-plane pump + /// (UDP receive + FEC reassembly) plus any embedder plane threads registered via + /// [`NativeClient::register_hot_thread`]. The Android client feeds these to an ADPF hint session + /// so the CPU governor keeps the whole video pipeline on fast cores. Empty on platforms without + /// `gettid` (see [`current_hot_tid`]). + hot_tids: Arc>>, worker: Option>, /// The currently active session mode (the Welcome's, then updated by every accepted /// [`NativeClient::request_mode`]). @@ -242,6 +248,32 @@ fn pin_thread_user_interactive() { #[cfg(not(target_vendor = "apple"))] fn pin_thread_user_interactive() {} +/// The calling thread's kernel id, for hot-thread performance hints (the Android client's ADPF +/// session today; the consumer is platform-specific). Linux/Android expose `gettid`; elsewhere +/// there's nothing to hint with, so registration is a no-op. +#[cfg(any(target_os = "android", target_os = "linux"))] +fn current_hot_tid() -> Option { + // SAFETY: `gettid` reads the calling thread's kernel id — an always-safe syscall, no args. + Some(unsafe { libc::gettid() }) +} +#[cfg(not(any(target_os = "android", target_os = "linux")))] +fn current_hot_tid() -> Option { + None +} + +/// Record the calling thread's id in the shared hot-thread registry (deduped). Best-effort: a +/// platform without `gettid` or a poisoned lock just skips it — a missed performance hint, not an +/// error on the data path. +fn register_hot_tid(reg: &Mutex>) { + if let Some(t) = current_hot_tid() { + if let Ok(mut v) = reg.lock() { + if !v.contains(&t) { + v.push(t); + } + } + } +} + impl NativeClient { /// Connect to a `punktfunk/1` host and start the session at (up to) `mode`. Blocks until the /// handshake completes or `timeout` elapses. @@ -292,12 +324,14 @@ impl NativeClient { let mode_slot = Arc::new(std::sync::Mutex::new(mode)); let probe = Arc::new(Mutex::new(ProbeState::default())); let frames_dropped = Arc::new(AtomicU64::new(0)); + let hot_tids = Arc::new(Mutex::new(Vec::new())); let host = host.to_string(); let shutdown_w = shutdown.clone(); let mode_slot_w = mode_slot.clone(); let probe_w = probe.clone(); let frames_dropped_w = frames_dropped.clone(); + let hot_tids_w = hot_tids.clone(); let ctrl_tx_pump = ctrl_tx.clone(); // the data-plane pump sends adaptive-FEC LossReports let worker = std::thread::Builder::new() .name("punktfunk-client".into()) @@ -346,6 +380,7 @@ impl NativeClient { mode_slot: mode_slot_w, probe: probe_w, frames_dropped: frames_dropped_w, + hot_tids: hot_tids_w, })); }) .map_err(PunktfunkError::Io)?; @@ -385,6 +420,7 @@ impl NativeClient { shutdown, worker: Some(worker), frames_dropped, + hot_tids, mode: mode_slot, host_fingerprint: fingerprint, resolved_compositor, @@ -526,6 +562,25 @@ impl NativeClient { self.frames_dropped.load(Ordering::Relaxed) } + /// Register the calling thread as latency-critical so a later + /// [`hot_thread_ids`](Self::hot_thread_ids) includes it. An embedder calls this from its own + /// plane threads (e.g. the Android client's decode + audio threads) to fold them into the same + /// performance-hint session as the internal data-plane pump. Idempotent per thread; a no-op on + /// platforms without `gettid`. + pub fn register_hot_thread(&self) { + register_hot_tid(&self.hot_tids); + } + + /// Kernel ids of the client's latency-critical threads: the internal data-plane pump (UDP + /// receive + FEC reassembly) plus any registered via + /// [`register_hot_thread`](Self::register_hot_thread). The Android client feeds these to an ADPF + /// hint session so the CPU governor keeps the whole video pipeline on fast cores. Empty where + /// thread ids aren't available (platforms without `gettid`); call after the first frame so the + /// pump has registered. + pub fn hot_thread_ids(&self) -> Vec { + self.hot_tids.lock().map(|v| v.clone()).unwrap_or_default() + } + /// Start a bandwidth speed test: ask the host to burst filler over the data plane at /// `target_kbps` of goodput for `duration_ms`, *briefly pausing video*. Non-blocking — the /// measurement accumulates in the background; poll [`NativeClient::probe_result`] until its @@ -723,6 +778,7 @@ struct WorkerArgs { mode_slot: Arc>, probe: Arc>, frames_dropped: Arc, + hot_tids: Arc>>, } /// The worker: QUIC handshake, then the input/datagram/control tasks + the blocking @@ -757,6 +813,7 @@ async fn worker_main(args: WorkerArgs) { mode_slot, probe, frames_dropped, + hot_tids, } = args; let setup = async { let remote: std::net::SocketAddr = format!("{host}:{port}") @@ -1063,11 +1120,13 @@ async fn worker_main(args: WorkerArgs) { // decoder queue — it isn't video. let pump_shutdown = shutdown.clone(); let pump_probe = probe.clone(); + let pump_hot_tids = hot_tids.clone(); let _ = tokio::task::spawn_blocking(move || { pin_thread_user_interactive(); // feeds frame_tx → the client's user-interactive video pump - // Adaptive-FEC loss reporting: every ADAPT_REPORT_INTERVAL, report the loss observed over the - // window (shards FEC recovered, plus a bump if any frame went unrecoverable) so the host can - // size FEC to the link. Suppressed during a speed test (its FLAG_PROBE filler would skew it). + register_hot_tid(&pump_hot_tids); // this thread does UDP receive + FEC reassembly — hint it + // Adaptive-FEC loss reporting: every ADAPT_REPORT_INTERVAL, report the loss observed over the + // window (shards FEC recovered, plus a bump if any frame went unrecoverable) so the host can + // size FEC to the link. Suppressed during a speed test (its FLAG_PROBE filler would skew it). const ADAPT_REPORT_INTERVAL: Duration = Duration::from_millis(750); let mut last_report = Instant::now(); let (mut last_recovered, mut last_received, mut last_dropped) = (0u64, 0u64, 0u64);