diff --git a/clients/apple/Sources/PunktfunkKit/GamepadFeedback.swift b/clients/apple/Sources/PunktfunkKit/GamepadFeedback.swift index dae9e4f..8cea563 100644 --- a/clients/apple/Sources/PunktfunkKit/GamepadFeedback.swift +++ b/clients/apple/Sources/PunktfunkKit/GamepadFeedback.swift @@ -173,7 +173,10 @@ public final class GamepadFeedback { public init(connection: PunktfunkConnection, manager: GamepadManager) { self.connection = connection - Task { @MainActor in + // Capture self weakly in the hop too, so the inner sink's weak capture isn't shadowing + // an implicit strong one — and the subscription (stored on self) never retain-cycles. + Task { @MainActor [weak self] in + guard let self else { return } self.activeSub = manager.$active.sink { [weak self] dc in MainActor.assumeIsolated { self?.retarget(dc?.controller) } } diff --git a/clients/apple/Sources/PunktfunkKit/StreamPump.swift b/clients/apple/Sources/PunktfunkKit/StreamPump.swift index 5b15efd..4204290 100644 --- a/clients/apple/Sources/PunktfunkKit/StreamPump.swift +++ b/clients/apple/Sources/PunktfunkKit/StreamPump.swift @@ -37,6 +37,10 @@ final class StreamPump { onSessionEnd: (@Sendable () -> Void)? ) { let token = token + // The layer is non-Sendable but its enqueue/flush are documented thread-safe, and after + // this point only the pump thread drives it — assert that so the @Sendable Thread closure + // may capture it. + nonisolated(unsafe) let layer = layer layer.flush() // drop any frames a previous connection left queued let thread = Thread { diff --git a/crates/punktfunk-core/src/client.rs b/crates/punktfunk-core/src/client.rs index 7e2c1a6..da9450c 100644 --- a/crates/punktfunk-core/src/client.rs +++ b/crates/punktfunk-core/src/client.rs @@ -153,6 +153,25 @@ pub struct NativeClient { pub clock_offset_ns: i64, } +/// Pin the calling thread to the user-interactive QoS class on Apple targets. +/// +/// The Apple client drains every plane on `.userInteractive` Thread s (video pump, audio, +/// gamepad feedback) and connects on a `.userInitiated` Task. Those consumers block on the +/// std channels these worker threads feed; if the producers run at the default QoS, the +/// kernel sees a high-QoS thread parked waiting on a lower-QoS one and the Thread Performance +/// Checker flags a priority inversion. Matching the producers to the consumers' QoS removes +/// the inversion without slowing the Swift side. No-op off Apple (the Linux client/host don't +/// run a QoS scheduler, and `punktfunk-client-rs` doesn't care). +#[cfg(target_vendor = "apple")] +fn pin_thread_user_interactive() { + // SAFETY: sets only the current thread's QoS class — always valid to call. + unsafe { + libc::pthread_set_qos_class_self_np(libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE, 0); + } +} +#[cfg(not(target_vendor = "apple"))] +fn pin_thread_user_interactive() {} + impl NativeClient { /// Connect to a `punktfunk/1` host and start the session at (up to) `mode`. Blocks until the /// handshake completes or `timeout` elapses. @@ -197,8 +216,12 @@ impl NativeClient { let worker = std::thread::Builder::new() .name("punktfunk-client".into()) .spawn(move || { + pin_thread_user_interactive(); // this thread drives the runtime + handshake let rt = match tokio::runtime::Builder::new_multi_thread() .worker_threads(2) + // Every runtime thread (async workers + the spawn_blocking pool that runs + // the data-plane pump) matches the Apple client's QoS — no priority inversion. + .on_thread_start(pin_thread_user_interactive) .enable_all() .build() { @@ -839,6 +862,7 @@ async fn worker_main(args: WorkerArgs) { let pump_shutdown = shutdown.clone(); let pump_probe = probe.clone(); let _ = tokio::task::spawn_blocking(move || { + pin_thread_user_interactive(); // feeds frame_tx → the client's user-interactive video pump while !pump_shutdown.load(Ordering::SeqCst) { match session.poll_frame() { Ok(frame) => {