feat(client/android): Snapdragon latency tuning — ADPF pipeline hints, game mode, max-clock decode
Three levers to lower and steady decode latency on Snapdragon (Adreno) devices:
- ADPF (Adaptive Performance Framework): a new dlsym-resolved hint session
(native/src/adpf.rs; API-33+, resolved at runtime so there's no build-time
link dependency and libpunktfunk_android.so still loads on API 31/32) tells
the CPU governor the video pipeline runs a per-frame real-time workload, so it
keeps those threads on fast cores at high clocks. It now covers all three
latency-critical threads — the pf-decode feed/drain/present loop, the core
data-plane pump (UDP receive + FEC reassembly), and the audio thread — via a
new generic hot-thread registry on NativeClient (register_hot_thread /
hot_thread_ids; the pump self-registers). The session is built lazily on the
first presented frame, since ADPF createSession rejects a set containing any
not-yet-live tid.
- operating-rate -> Short.MAX ("as fast as possible"): pushes the Qualcomm
decoder to run each frame at max clocks instead of merely sustaining the
display rate at a power-saving clock that adds per-frame decode latency.
- appCategory="game": makes the app eligible for OEM Game Mode / Game Dashboard
performance profiles.
The core registry is cross-platform (gettid on Linux/Android, a no-op
elsewhere) — no Android-specific pollution of the shared core. Host workspace +
64 core tests green; Android arm64-v8a + x86_64 (platform 31) build + clippy
clean. On-device Snapdragon validation pending.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -27,8 +27,15 @@
|
|||||||
<uses-feature android:name="android.software.leanback" android:required="false" />
|
<uses-feature android:name="android.software.leanback" android:required="false" />
|
||||||
<uses-feature android:name="android.hardware.gamepad" android:required="false" />
|
<uses-feature android:name="android.hardware.gamepad" android:required="false" />
|
||||||
|
|
||||||
|
<!-- appCategory="game": a game-streaming client IS a game as far as the SoC is concerned.
|
||||||
|
On Snapdragon devices (and other OEMs with a Game Mode / Game Dashboard) this makes the app
|
||||||
|
eligible for the vendor's game performance profile — the aggressive CPU/GPU governor and
|
||||||
|
scheduler treatment games get — which, together with the ADPF hints in the native decode
|
||||||
|
path, is what keeps clocks up for low, consistent decode latency. Also groups it correctly
|
||||||
|
under Games in battery/data usage. Advisory: devices without Game Mode ignore it. -->
|
||||||
<application
|
<application
|
||||||
android:allowBackup="false"
|
android:allowBackup="false"
|
||||||
|
android:appCategory="game"
|
||||||
android:icon="@mipmap/ic_launcher"
|
android:icon="@mipmap/ic_launcher"
|
||||||
android:roundIcon="@mipmap/ic_launcher_round"
|
android:roundIcon="@mipmap/ic_launcher_round"
|
||||||
android:label="@string/app_name"
|
android:label="@string/app_name"
|
||||||
|
|||||||
@@ -0,0 +1,137 @@
|
|||||||
|
//! Android Adaptive Performance Framework (ADPF) — CPU performance hints for the decode thread.
|
||||||
|
//!
|
||||||
|
//! ADPF lets a latency-critical app tell the platform "these threads run a repeating workload with
|
||||||
|
//! this per-cycle deadline, and here's how long they *actually* took." The kernel's CPU governor
|
||||||
|
//! (on Qualcomm Snapdragon in particular — its ADPF backend is among the most responsive) then keeps
|
||||||
|
//! those threads on the fast cores at high clocks instead of migrating them to a little core or
|
||||||
|
//! down-clocking between frames. For a stream client the win is on the in-process hot path we
|
||||||
|
//! control — the `pf-decode` feed/drain/present loop — *not* the hardware codec itself (that decodes
|
||||||
|
//! in the mediacodec service, a separate process we can't hint); keeping our loop from being
|
||||||
|
//! scheduled late directly trims the jitter between "AU received" and "buffer released to the
|
||||||
|
//! Surface." It complements the codec-side `operating-rate`/`priority` hints, which push the codec's
|
||||||
|
//! own clocks.
|
||||||
|
//!
|
||||||
|
//! The `APerformanceHint_*` API arrived in NDK **API level 33**. minSdk is 31, so we CANNOT link the
|
||||||
|
//! symbols directly: a `libpunktfunk_android.so` carrying an unresolved
|
||||||
|
//! `APerformanceHint_createSession` import fails to load on API 31/32 devices
|
||||||
|
//! (`System.loadLibrary` throws) even if the code path is never taken. Instead we resolve the
|
||||||
|
//! entry points from `libandroid.so` with `dlsym` at runtime — absent on < 33 ⇒
|
||||||
|
//! [`HintSession::create`] returns `None` and the decode loop simply runs without hints.
|
||||||
|
|
||||||
|
use std::ffi::c_void;
|
||||||
|
use std::os::raw::c_int;
|
||||||
|
|
||||||
|
// `APerformanceHint_*` function-pointer types. The manager/session handles are opaque, so we treat
|
||||||
|
// them as `*mut c_void`.
|
||||||
|
type GetManagerFn = unsafe extern "C" fn() -> *mut c_void;
|
||||||
|
type CreateSessionFn = unsafe extern "C" fn(*mut c_void, *const i32, usize, i64) -> *mut c_void;
|
||||||
|
type ReportFn = unsafe extern "C" fn(*mut c_void, i64) -> c_int;
|
||||||
|
type UpdateTargetFn = unsafe extern "C" fn(*mut c_void, i64) -> c_int;
|
||||||
|
type CloseFn = unsafe extern "C" fn(*mut c_void);
|
||||||
|
|
||||||
|
/// The entry points we use, resolved once from `libandroid.so`, plus the process-wide manager.
|
||||||
|
struct Api {
|
||||||
|
create_session: CreateSessionFn,
|
||||||
|
report: ReportFn,
|
||||||
|
update_target: UpdateTargetFn,
|
||||||
|
close: CloseFn,
|
||||||
|
manager: *mut c_void,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Resolve the ADPF entry points + the process manager, or `None` on API < 33 (symbols absent) or if
|
||||||
|
/// the manager is unavailable.
|
||||||
|
fn resolve_api() -> Option<Api> {
|
||||||
|
// SAFETY: `dlopen` of an always-present system library with a NUL-terminated name; it returns
|
||||||
|
// null on failure (checked below). `libandroid.so` is already mapped into every app process, so
|
||||||
|
// this only bumps its refcount — we intentionally never `dlclose` (process-lifetime handle).
|
||||||
|
let lib = unsafe { libc::dlopen(c"libandroid.so".as_ptr(), libc::RTLD_NOW) };
|
||||||
|
if lib.is_null() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
// SAFETY: `dlsym` on the valid handle above with NUL-terminated symbol names; each returns null
|
||||||
|
// when the symbol is absent (device API < 33), which we check before transmuting the non-null
|
||||||
|
// pointer to its fn-pointer type (layout-compatible; a resolved symbol is a valid code address).
|
||||||
|
unsafe {
|
||||||
|
let get_manager = libc::dlsym(lib, c"APerformanceHint_getManager".as_ptr());
|
||||||
|
let create_session = libc::dlsym(lib, c"APerformanceHint_createSession".as_ptr());
|
||||||
|
let report = libc::dlsym(lib, c"APerformanceHint_reportActualWorkDuration".as_ptr());
|
||||||
|
let update_target = libc::dlsym(lib, c"APerformanceHint_updateTargetWorkDuration".as_ptr());
|
||||||
|
let close = libc::dlsym(lib, c"APerformanceHint_closeSession".as_ptr());
|
||||||
|
if get_manager.is_null()
|
||||||
|
|| create_session.is_null()
|
||||||
|
|| report.is_null()
|
||||||
|
|| update_target.is_null()
|
||||||
|
|| close.is_null()
|
||||||
|
{
|
||||||
|
return None; // device API < 33 — no ADPF
|
||||||
|
}
|
||||||
|
let get_manager = std::mem::transmute::<*mut c_void, GetManagerFn>(get_manager);
|
||||||
|
let manager = get_manager();
|
||||||
|
if manager.is_null() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(Api {
|
||||||
|
create_session: std::mem::transmute::<*mut c_void, CreateSessionFn>(create_session),
|
||||||
|
report: std::mem::transmute::<*mut c_void, ReportFn>(report),
|
||||||
|
update_target: std::mem::transmute::<*mut c_void, UpdateTargetFn>(update_target),
|
||||||
|
close: std::mem::transmute::<*mut c_void, CloseFn>(close),
|
||||||
|
manager,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A live ADPF hint session bound to a set of thread ids. Dropping it closes the session. Holds raw
|
||||||
|
/// handles, so it is `!Send`/`!Sync` — created and used only on the `pf-decode` thread.
|
||||||
|
pub struct HintSession {
|
||||||
|
api: Api,
|
||||||
|
session: *mut c_void,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HintSession {
|
||||||
|
/// Open a session hinting `tids` with an initial per-frame target of `target_ns` nanoseconds.
|
||||||
|
/// `None` when ADPF is unavailable (device API < 33) or the platform declines — the caller then
|
||||||
|
/// runs unhinted (a no-op, not an error).
|
||||||
|
pub fn create(target_ns: i64, tids: &[i32]) -> Option<Self> {
|
||||||
|
if target_ns <= 0 || tids.is_empty() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let api = resolve_api()?;
|
||||||
|
// SAFETY: `api.manager` is the live process manager returned above; `tids` is a valid slice
|
||||||
|
// of `len` i32s that `createSession` copies; it returns null on failure (checked).
|
||||||
|
let session =
|
||||||
|
unsafe { (api.create_session)(api.manager, tids.as_ptr(), tids.len(), target_ns) };
|
||||||
|
if session.is_null() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(Self { api, session })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Report the wall-clock time the hinted thread spent producing the last displayed frame. When
|
||||||
|
/// it exceeds the session target the governor boosts the cores running the thread; when it
|
||||||
|
/// stays under, clocks may relax. No-op on a non-positive duration (the API rejects it).
|
||||||
|
pub fn report_actual(&self, actual_ns: i64) {
|
||||||
|
if actual_ns <= 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// SAFETY: `self.session` is a live session for `self`'s lifetime.
|
||||||
|
unsafe { (self.api.report)(self.session, actual_ns) };
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update the per-frame target (e.g. after a mid-session refresh-rate change). Unused today —
|
||||||
|
/// the decode thread restarts on renegotiation — but kept for that path.
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub fn update_target(&self, target_ns: i64) {
|
||||||
|
if target_ns <= 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// SAFETY: `self.session` is a live session for `self`'s lifetime.
|
||||||
|
unsafe { (self.api.update_target)(self.session, target_ns) };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for HintSession {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// SAFETY: `self.session` was created by `createSession` and is closed exactly once, here.
|
||||||
|
unsafe { (self.api.close)(self.session) };
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -324,6 +324,10 @@ fn decode_loop(
|
|||||||
counters: Arc<Counters>,
|
counters: Arc<Counters>,
|
||||||
channels: usize,
|
channels: usize,
|
||||||
) {
|
) {
|
||||||
|
// Fold this Opus→AAudio thread into the client's hot-thread set so the ADPF session the decode
|
||||||
|
// thread opens also keeps audio decode on a fast core (registered before the video pump's first
|
||||||
|
// frame arrives, so it's captured when that session is created). No-op below API 33.
|
||||||
|
client.register_hot_thread();
|
||||||
// Interleaved f32 samples per millisecond at this layout — the ring's 5 ms reserve check below.
|
// Interleaved f32 samples per millisecond at this layout — the ring's 5 ms reserve check below.
|
||||||
let ms = (SAMPLE_RATE as usize / 1000) * channels;
|
let ms = (SAMPLE_RATE as usize / 1000) * channels;
|
||||||
// Opus decode scratch: worst-case 120 ms frame (5760 samples/ch) × channels.
|
// Opus decode scratch: worst-case 120 ms frame (5760 samples/ch) × channels.
|
||||||
|
|||||||
@@ -61,7 +61,14 @@ pub fn run(
|
|||||||
// realtime priority + the target frame rate, so vendor decoders (e.g. Qualcomm) run at full
|
// realtime priority + the target frame rate, so vendor decoders (e.g. Qualcomm) run at full
|
||||||
// clocks instead of a power-saving cadence that adds dequeue latency.
|
// clocks instead of a power-saving cadence that adds dequeue latency.
|
||||||
format.set_i32("priority", 0); // 0 = realtime
|
format.set_i32("priority", 0); // 0 = realtime
|
||||||
format.set_i32("operating-rate", mode.refresh_hz as i32);
|
// Operating rate = the codec's clock hint. Setting it to the display rate merely asks the
|
||||||
|
// decoder to *sustain* that cadence — a Qualcomm decoder can meet 60/120 fps at a power-saving
|
||||||
|
// clock that adds a millisecond-plus of decode latency per frame. Setting it to the AOSP
|
||||||
|
// "unbounded" sentinel (Short.MAX) instead asks the decoder to run each frame at max clocks and
|
||||||
|
// finish ASAP, minimising per-frame decode latency — the right trade for a real-time stream
|
||||||
|
// (costs power/heat; the dial to lower if a device thermally throttles over a long session).
|
||||||
|
// Ignored where unsupported.
|
||||||
|
format.set_i32("operating-rate", i16::MAX as i32); // 32767 = "as fast as possible"
|
||||||
|
|
||||||
// HDR static metadata (ST.2086 mastering + content light level): when an HDR session was
|
// HDR static metadata (ST.2086 mastering + content light level): when an HDR session was
|
||||||
// negotiated, set KEY_HDR_STATIC_INFO so the display tone-maps from the source's real grade.
|
// negotiated, set KEY_HDR_STATIC_INFO so the display tone-maps from the source's real grade.
|
||||||
@@ -104,6 +111,25 @@ pub fn run(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ADPF: hint the platform that the whole video pipeline — this pf-decode feed/drain/present
|
||||||
|
// loop, the core's data-plane pump (UDP receive + FEC reassembly), and the audio thread — runs a
|
||||||
|
// per-frame real-time workload, so the CPU governor keeps those threads on fast cores at high
|
||||||
|
// clocks instead of down-clocking between frames or parking them on a little core. Snapdragon's
|
||||||
|
// ADPF backend responds well to this. We register this thread now but create the session lazily
|
||||||
|
// on the first presented frame: by then the pump + audio threads have registered their ids too,
|
||||||
|
// and ADPF `createSession` rejects a set with any not-yet-live/dead tid. No-op below API 33.
|
||||||
|
let frame_period_ns = if mode.refresh_hz > 0 {
|
||||||
|
1_000_000_000i64 / mode.refresh_hz as i64
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
client.register_hot_thread(); // this decode thread → the pipeline's hot-thread set
|
||||||
|
let mut hint: Option<crate::adpf::HintSession> = None;
|
||||||
|
let mut hint_tried = false;
|
||||||
|
// Accumulates the loop's productive (feed+drain) time between displayed frames; reported to ADPF
|
||||||
|
// once per rendered frame against the frame-period target.
|
||||||
|
let mut work_accum_ns: i64 = 0;
|
||||||
|
|
||||||
let mut fed: u64 = 0;
|
let mut fed: u64 = 0;
|
||||||
let mut rendered: u64 = 0;
|
let mut rendered: u64 = 0;
|
||||||
let mut discarded: u64 = 0;
|
let mut discarded: u64 = 0;
|
||||||
@@ -154,6 +180,9 @@ pub fn run(
|
|||||||
Err(_) => break, // session closed
|
Err(_) => break, // session closed
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Time the productive work (feed + drain) only — the `next_frame` poll wait above is idle
|
||||||
|
// and excluded, so ADPF sees this thread's real per-frame CPU cost, not the poll timeout.
|
||||||
|
let work_t0 = Instant::now();
|
||||||
if let Some(frame) = pending.take() {
|
if let Some(frame) = pending.take() {
|
||||||
if feed(&codec, &frame.data, frame.pts_ns / 1000) {
|
if feed(&codec, &frame.data, frame.pts_ns / 1000) {
|
||||||
fed += 1;
|
fed += 1;
|
||||||
@@ -177,6 +206,36 @@ pub fn run(
|
|||||||
rendered += r;
|
rendered += r;
|
||||||
discarded += d;
|
discarded += d;
|
||||||
|
|
||||||
|
// ADPF: attribute this iteration's feed+drain time to the frame being produced, and report
|
||||||
|
// the accumulated per-frame work once one is actually presented (r > 0). Under back-pressure
|
||||||
|
// the short output-dequeue wait is included in the tally — for a latency-first client,
|
||||||
|
// biasing the governor toward "boost" is the desired behaviour. Cheap when `hint` is None
|
||||||
|
// (one `Instant` diff, no report).
|
||||||
|
work_accum_ns += work_t0.elapsed().as_nanos() as i64;
|
||||||
|
if r > 0 {
|
||||||
|
if !hint_tried {
|
||||||
|
// First presented frame: the pump + audio threads have registered their ids by now.
|
||||||
|
// Build one ADPF session over the whole pipeline's thread set (empty below API 33,
|
||||||
|
// or where the platform declines → `None`, and the loop runs unhinted).
|
||||||
|
hint_tried = true;
|
||||||
|
let tids = client.hot_thread_ids();
|
||||||
|
hint = crate::adpf::HintSession::create(frame_period_ns, &tids);
|
||||||
|
log::info!(
|
||||||
|
"decode: ADPF hint session {} — {} hot thread(s), target {frame_period_ns} ns",
|
||||||
|
if hint.is_some() {
|
||||||
|
"active"
|
||||||
|
} else {
|
||||||
|
"unavailable"
|
||||||
|
},
|
||||||
|
tids.len(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if let Some(h) = &hint {
|
||||||
|
h.report_actual(work_accum_ns);
|
||||||
|
}
|
||||||
|
work_accum_ns = 0;
|
||||||
|
}
|
||||||
|
|
||||||
// Loss recovery: under infinite GOP the only recovery keyframe is one we request. The
|
// Loss recovery: under infinite GOP the only recovery keyframe is one we request. The
|
||||||
// reassembler drops unrecoverable AUs (frames_dropped); the decoder then conceals the
|
// reassembler drops unrecoverable AUs (frames_dropped); the decoder then conceals the
|
||||||
// reference-missing delta frames that follow and renders them without error, so keying off
|
// reference-missing delta frames that follow and renders them without error, so keying off
|
||||||
|
|||||||
@@ -25,6 +25,8 @@ use jni::objects::JObject;
|
|||||||
use jni::sys::jint;
|
use jni::sys::jint;
|
||||||
use jni::JNIEnv;
|
use jni::JNIEnv;
|
||||||
|
|
||||||
|
#[cfg(target_os = "android")]
|
||||||
|
mod adpf;
|
||||||
#[cfg(target_os = "android")]
|
#[cfg(target_os = "android")]
|
||||||
mod audio;
|
mod audio;
|
||||||
#[cfg(target_os = "android")]
|
#[cfg(target_os = "android")]
|
||||||
|
|||||||
@@ -176,6 +176,12 @@ pub struct NativeClient {
|
|||||||
/// a recovery keyframe under infinite GOP — the correct loss trigger, since unrecoverable loss
|
/// a recovery keyframe under infinite GOP — the correct loss trigger, since unrecoverable loss
|
||||||
/// yields reference-missing frames the decoder silently conceals (a decode-error trigger misses them).
|
/// yields reference-missing frames the decoder silently conceals (a decode-error trigger misses them).
|
||||||
frames_dropped: Arc<AtomicU64>,
|
frames_dropped: Arc<AtomicU64>,
|
||||||
|
/// Kernel ids of the client's latency-critical native threads: the internal data-plane pump
|
||||||
|
/// (UDP receive + FEC reassembly) plus any embedder plane threads registered via
|
||||||
|
/// [`NativeClient::register_hot_thread`]. The Android client feeds these to an ADPF hint session
|
||||||
|
/// so the CPU governor keeps the whole video pipeline on fast cores. Empty on platforms without
|
||||||
|
/// `gettid` (see [`current_hot_tid`]).
|
||||||
|
hot_tids: Arc<Mutex<Vec<i32>>>,
|
||||||
worker: Option<std::thread::JoinHandle<()>>,
|
worker: Option<std::thread::JoinHandle<()>>,
|
||||||
/// The currently active session mode (the Welcome's, then updated by every accepted
|
/// The currently active session mode (the Welcome's, then updated by every accepted
|
||||||
/// [`NativeClient::request_mode`]).
|
/// [`NativeClient::request_mode`]).
|
||||||
@@ -242,6 +248,32 @@ fn pin_thread_user_interactive() {
|
|||||||
#[cfg(not(target_vendor = "apple"))]
|
#[cfg(not(target_vendor = "apple"))]
|
||||||
fn pin_thread_user_interactive() {}
|
fn pin_thread_user_interactive() {}
|
||||||
|
|
||||||
|
/// The calling thread's kernel id, for hot-thread performance hints (the Android client's ADPF
|
||||||
|
/// session today; the consumer is platform-specific). Linux/Android expose `gettid`; elsewhere
|
||||||
|
/// there's nothing to hint with, so registration is a no-op.
|
||||||
|
#[cfg(any(target_os = "android", target_os = "linux"))]
|
||||||
|
fn current_hot_tid() -> Option<i32> {
|
||||||
|
// SAFETY: `gettid` reads the calling thread's kernel id — an always-safe syscall, no args.
|
||||||
|
Some(unsafe { libc::gettid() })
|
||||||
|
}
|
||||||
|
#[cfg(not(any(target_os = "android", target_os = "linux")))]
|
||||||
|
fn current_hot_tid() -> Option<i32> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record the calling thread's id in the shared hot-thread registry (deduped). Best-effort: a
|
||||||
|
/// platform without `gettid` or a poisoned lock just skips it — a missed performance hint, not an
|
||||||
|
/// error on the data path.
|
||||||
|
fn register_hot_tid(reg: &Mutex<Vec<i32>>) {
|
||||||
|
if let Some(t) = current_hot_tid() {
|
||||||
|
if let Ok(mut v) = reg.lock() {
|
||||||
|
if !v.contains(&t) {
|
||||||
|
v.push(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl NativeClient {
|
impl NativeClient {
|
||||||
/// Connect to a `punktfunk/1` host and start the session at (up to) `mode`. Blocks until the
|
/// Connect to a `punktfunk/1` host and start the session at (up to) `mode`. Blocks until the
|
||||||
/// handshake completes or `timeout` elapses.
|
/// handshake completes or `timeout` elapses.
|
||||||
@@ -292,12 +324,14 @@ impl NativeClient {
|
|||||||
let mode_slot = Arc::new(std::sync::Mutex::new(mode));
|
let mode_slot = Arc::new(std::sync::Mutex::new(mode));
|
||||||
let probe = Arc::new(Mutex::new(ProbeState::default()));
|
let probe = Arc::new(Mutex::new(ProbeState::default()));
|
||||||
let frames_dropped = Arc::new(AtomicU64::new(0));
|
let frames_dropped = Arc::new(AtomicU64::new(0));
|
||||||
|
let hot_tids = Arc::new(Mutex::new(Vec::new()));
|
||||||
|
|
||||||
let host = host.to_string();
|
let host = host.to_string();
|
||||||
let shutdown_w = shutdown.clone();
|
let shutdown_w = shutdown.clone();
|
||||||
let mode_slot_w = mode_slot.clone();
|
let mode_slot_w = mode_slot.clone();
|
||||||
let probe_w = probe.clone();
|
let probe_w = probe.clone();
|
||||||
let frames_dropped_w = frames_dropped.clone();
|
let frames_dropped_w = frames_dropped.clone();
|
||||||
|
let hot_tids_w = hot_tids.clone();
|
||||||
let ctrl_tx_pump = ctrl_tx.clone(); // the data-plane pump sends adaptive-FEC LossReports
|
let ctrl_tx_pump = ctrl_tx.clone(); // the data-plane pump sends adaptive-FEC LossReports
|
||||||
let worker = std::thread::Builder::new()
|
let worker = std::thread::Builder::new()
|
||||||
.name("punktfunk-client".into())
|
.name("punktfunk-client".into())
|
||||||
@@ -346,6 +380,7 @@ impl NativeClient {
|
|||||||
mode_slot: mode_slot_w,
|
mode_slot: mode_slot_w,
|
||||||
probe: probe_w,
|
probe: probe_w,
|
||||||
frames_dropped: frames_dropped_w,
|
frames_dropped: frames_dropped_w,
|
||||||
|
hot_tids: hot_tids_w,
|
||||||
}));
|
}));
|
||||||
})
|
})
|
||||||
.map_err(PunktfunkError::Io)?;
|
.map_err(PunktfunkError::Io)?;
|
||||||
@@ -385,6 +420,7 @@ impl NativeClient {
|
|||||||
shutdown,
|
shutdown,
|
||||||
worker: Some(worker),
|
worker: Some(worker),
|
||||||
frames_dropped,
|
frames_dropped,
|
||||||
|
hot_tids,
|
||||||
mode: mode_slot,
|
mode: mode_slot,
|
||||||
host_fingerprint: fingerprint,
|
host_fingerprint: fingerprint,
|
||||||
resolved_compositor,
|
resolved_compositor,
|
||||||
@@ -526,6 +562,25 @@ impl NativeClient {
|
|||||||
self.frames_dropped.load(Ordering::Relaxed)
|
self.frames_dropped.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Register the calling thread as latency-critical so a later
|
||||||
|
/// [`hot_thread_ids`](Self::hot_thread_ids) includes it. An embedder calls this from its own
|
||||||
|
/// plane threads (e.g. the Android client's decode + audio threads) to fold them into the same
|
||||||
|
/// performance-hint session as the internal data-plane pump. Idempotent per thread; a no-op on
|
||||||
|
/// platforms without `gettid`.
|
||||||
|
pub fn register_hot_thread(&self) {
|
||||||
|
register_hot_tid(&self.hot_tids);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Kernel ids of the client's latency-critical threads: the internal data-plane pump (UDP
|
||||||
|
/// receive + FEC reassembly) plus any registered via
|
||||||
|
/// [`register_hot_thread`](Self::register_hot_thread). The Android client feeds these to an ADPF
|
||||||
|
/// hint session so the CPU governor keeps the whole video pipeline on fast cores. Empty where
|
||||||
|
/// thread ids aren't available (platforms without `gettid`); call after the first frame so the
|
||||||
|
/// pump has registered.
|
||||||
|
pub fn hot_thread_ids(&self) -> Vec<i32> {
|
||||||
|
self.hot_tids.lock().map(|v| v.clone()).unwrap_or_default()
|
||||||
|
}
|
||||||
|
|
||||||
/// Start a bandwidth speed test: ask the host to burst filler over the data plane at
|
/// Start a bandwidth speed test: ask the host to burst filler over the data plane at
|
||||||
/// `target_kbps` of goodput for `duration_ms`, *briefly pausing video*. Non-blocking — the
|
/// `target_kbps` of goodput for `duration_ms`, *briefly pausing video*. Non-blocking — the
|
||||||
/// measurement accumulates in the background; poll [`NativeClient::probe_result`] until its
|
/// measurement accumulates in the background; poll [`NativeClient::probe_result`] until its
|
||||||
@@ -723,6 +778,7 @@ struct WorkerArgs {
|
|||||||
mode_slot: Arc<std::sync::Mutex<Mode>>,
|
mode_slot: Arc<std::sync::Mutex<Mode>>,
|
||||||
probe: Arc<Mutex<ProbeState>>,
|
probe: Arc<Mutex<ProbeState>>,
|
||||||
frames_dropped: Arc<AtomicU64>,
|
frames_dropped: Arc<AtomicU64>,
|
||||||
|
hot_tids: Arc<Mutex<Vec<i32>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The worker: QUIC handshake, then the input/datagram/control tasks + the blocking
|
/// The worker: QUIC handshake, then the input/datagram/control tasks + the blocking
|
||||||
@@ -757,6 +813,7 @@ async fn worker_main(args: WorkerArgs) {
|
|||||||
mode_slot,
|
mode_slot,
|
||||||
probe,
|
probe,
|
||||||
frames_dropped,
|
frames_dropped,
|
||||||
|
hot_tids,
|
||||||
} = args;
|
} = args;
|
||||||
let setup = async {
|
let setup = async {
|
||||||
let remote: std::net::SocketAddr = format!("{host}:{port}")
|
let remote: std::net::SocketAddr = format!("{host}:{port}")
|
||||||
@@ -1063,8 +1120,10 @@ async fn worker_main(args: WorkerArgs) {
|
|||||||
// decoder queue — it isn't video.
|
// decoder queue — it isn't video.
|
||||||
let pump_shutdown = shutdown.clone();
|
let pump_shutdown = shutdown.clone();
|
||||||
let pump_probe = probe.clone();
|
let pump_probe = probe.clone();
|
||||||
|
let pump_hot_tids = hot_tids.clone();
|
||||||
let _ = tokio::task::spawn_blocking(move || {
|
let _ = tokio::task::spawn_blocking(move || {
|
||||||
pin_thread_user_interactive(); // feeds frame_tx → the client's user-interactive video pump
|
pin_thread_user_interactive(); // feeds frame_tx → the client's user-interactive video pump
|
||||||
|
register_hot_tid(&pump_hot_tids); // this thread does UDP receive + FEC reassembly — hint it
|
||||||
// Adaptive-FEC loss reporting: every ADAPT_REPORT_INTERVAL, report the loss observed over the
|
// Adaptive-FEC loss reporting: every ADAPT_REPORT_INTERVAL, report the loss observed over the
|
||||||
// window (shards FEC recovered, plus a bump if any frame went unrecoverable) so the host can
|
// window (shards FEC recovered, plus a bump if any frame went unrecoverable) so the host can
|
||||||
// size FEC to the link. Suppressed during a speed test (its FLAG_PROBE filler would skew it).
|
// size FEC to the link. Suppressed during a speed test (its FLAG_PROBE filler would skew it).
|
||||||
|
|||||||
Reference in New Issue
Block a user