From e99a1aea43267384e5e6c6b68180fa17adceb507 Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Mon, 15 Jun 2026 22:48:10 +0200 Subject: [PATCH] fix(apple): resolve QoS priority inversions + two Swift concurrency warnings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Priority inversions (Thread Performance Checker): the Apple client drains every plane on .userInteractive threads (video pump, audio, gamepad feedback) and connects on a .userInitiated Task, but the connector's producer threads ran at the default QoS — so a high-QoS consumer parked waiting on a lower-QoS producer. Pin the connector's producers (outer worker thread, all tokio runtime threads via on_thread_start, and the data-plane spawn_blocking pump) to .userInteractive on Apple so they match the consumers. #[cfg(target_vendor = "apple")] helper using the existing libc dep; no-op off Apple, no Swift-side change (no latency regression). GamepadFeedback.swift: the init's MainActor hop captured self implicitly-strong while the inner $active sink captured it weakly — capture [weak self] in the hop too (the sink stays weak to avoid the retain cycle). StreamPump.swift: the @Sendable pump-thread closure captured the non-Sendable AVSampleBufferDisplayLayer. enqueue/flush are documented thread-safe and only the pump thread drives it after start(), so assert that with nonisolated(unsafe). cargo build/test/clippy/fmt green (core + host); xcframework rebuilt; swift build + iOS/tvOS targets clean with both warnings gone. Runtime confirmation of the inversion warnings needs a GUI run under Xcode's Thread Performance Checker. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../PunktfunkKit/GamepadFeedback.swift | 5 +++- .../Sources/PunktfunkKit/StreamPump.swift | 4 ++++ crates/punktfunk-core/src/client.rs | 24 +++++++++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/clients/apple/Sources/PunktfunkKit/GamepadFeedback.swift b/clients/apple/Sources/PunktfunkKit/GamepadFeedback.swift index dae9e4f..8cea563 100644 --- a/clients/apple/Sources/PunktfunkKit/GamepadFeedback.swift +++ b/clients/apple/Sources/PunktfunkKit/GamepadFeedback.swift @@ -173,7 +173,10 @@ public final class GamepadFeedback { public init(connection: PunktfunkConnection, manager: GamepadManager) { self.connection = connection - Task { @MainActor in + // Capture self weakly in the hop too, so the inner sink's weak capture isn't shadowing + // an implicit strong one — and the subscription (stored on self) never retain-cycles. + Task { @MainActor [weak self] in + guard let self else { return } self.activeSub = manager.$active.sink { [weak self] dc in MainActor.assumeIsolated { self?.retarget(dc?.controller) } } diff --git a/clients/apple/Sources/PunktfunkKit/StreamPump.swift b/clients/apple/Sources/PunktfunkKit/StreamPump.swift index 5b15efd..4204290 100644 --- a/clients/apple/Sources/PunktfunkKit/StreamPump.swift +++ b/clients/apple/Sources/PunktfunkKit/StreamPump.swift @@ -37,6 +37,10 @@ final class StreamPump { onSessionEnd: (@Sendable () -> Void)? ) { let token = token + // The layer is non-Sendable but its enqueue/flush are documented thread-safe, and after + // this point only the pump thread drives it — assert that so the @Sendable Thread closure + // may capture it. + nonisolated(unsafe) let layer = layer layer.flush() // drop any frames a previous connection left queued let thread = Thread { diff --git a/crates/punktfunk-core/src/client.rs b/crates/punktfunk-core/src/client.rs index 7e2c1a6..da9450c 100644 --- a/crates/punktfunk-core/src/client.rs +++ b/crates/punktfunk-core/src/client.rs @@ -153,6 +153,25 @@ pub struct NativeClient { pub clock_offset_ns: i64, } +/// Pin the calling thread to the user-interactive QoS class on Apple targets. +/// +/// The Apple client drains every plane on `.userInteractive` Thread s (video pump, audio, +/// gamepad feedback) and connects on a `.userInitiated` Task. Those consumers block on the +/// std channels these worker threads feed; if the producers run at the default QoS, the +/// kernel sees a high-QoS thread parked waiting on a lower-QoS one and the Thread Performance +/// Checker flags a priority inversion. Matching the producers to the consumers' QoS removes +/// the inversion without slowing the Swift side. No-op off Apple (the Linux client/host don't +/// run a QoS scheduler, and `punktfunk-client-rs` doesn't care). +#[cfg(target_vendor = "apple")] +fn pin_thread_user_interactive() { + // SAFETY: sets only the current thread's QoS class — always valid to call. + unsafe { + libc::pthread_set_qos_class_self_np(libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE, 0); + } +} +#[cfg(not(target_vendor = "apple"))] +fn pin_thread_user_interactive() {} + impl NativeClient { /// Connect to a `punktfunk/1` host and start the session at (up to) `mode`. Blocks until the /// handshake completes or `timeout` elapses. @@ -197,8 +216,12 @@ impl NativeClient { let worker = std::thread::Builder::new() .name("punktfunk-client".into()) .spawn(move || { + pin_thread_user_interactive(); // this thread drives the runtime + handshake let rt = match tokio::runtime::Builder::new_multi_thread() .worker_threads(2) + // Every runtime thread (async workers + the spawn_blocking pool that runs + // the data-plane pump) matches the Apple client's QoS — no priority inversion. + .on_thread_start(pin_thread_user_interactive) .enable_all() .build() { @@ -839,6 +862,7 @@ async fn worker_main(args: WorkerArgs) { let pump_shutdown = shutdown.clone(); let pump_probe = probe.clone(); let _ = tokio::task::spawn_blocking(move || { + pin_thread_user_interactive(); // feeds frame_tx → the client's user-interactive video pump while !pump_shutdown.load(Ordering::SeqCst) { match session.poll_frame() { Ok(frame) => {