fix(apple): resolve QoS priority inversions + two Swift concurrency warnings
apple / swift (push) Successful in 55s
ci / rust (push) Successful in 1m31s
android / android (push) Successful in 1m48s
ci / web (push) Successful in 27s
ci / docs-site (push) Successful in 33s
docker / build-push (--build-arg FEDORA_VERSION=44, ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora44-rpm) (push) Successful in 5s
ci / bench (push) Successful in 1m35s
decky / build-publish (push) Successful in 11s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 4s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 4s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 3s
deb / build-publish (push) Successful in 2m19s
flatpak / build-publish (push) Successful in 4m2s
rpm / build-publish (bazzite, punktfunk-fedora-rpm) (push) Successful in 5m22s
docker / deploy-docs (push) Successful in 18s
rpm / build-publish (fedora-44, punktfunk-fedora44-rpm) (push) Successful in 4m39s
apple / swift (push) Successful in 55s
ci / rust (push) Successful in 1m31s
android / android (push) Successful in 1m48s
ci / web (push) Successful in 27s
ci / docs-site (push) Successful in 33s
docker / build-push (--build-arg FEDORA_VERSION=44, ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora44-rpm) (push) Successful in 5s
ci / bench (push) Successful in 1m35s
decky / build-publish (push) Successful in 11s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 4s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 4s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 3s
deb / build-publish (push) Successful in 2m19s
flatpak / build-publish (push) Successful in 4m2s
rpm / build-publish (bazzite, punktfunk-fedora-rpm) (push) Successful in 5m22s
docker / deploy-docs (push) Successful in 18s
rpm / build-publish (fedora-44, punktfunk-fedora44-rpm) (push) Successful in 4m39s
Priority inversions (Thread Performance Checker): the Apple client drains every plane on .userInteractive threads (video pump, audio, gamepad feedback) and connects on a .userInitiated Task, but the connector's producer threads ran at the default QoS — so a high-QoS consumer parked waiting on a lower-QoS producer. Pin the connector's producers (outer worker thread, all tokio runtime threads via on_thread_start, and the data-plane spawn_blocking pump) to .userInteractive on Apple so they match the consumers. #[cfg(target_vendor = "apple")] helper using the existing libc dep; no-op off Apple, no Swift-side change (no latency regression). GamepadFeedback.swift: the init's MainActor hop captured self implicitly-strong while the inner $active sink captured it weakly — capture [weak self] in the hop too (the sink stays weak to avoid the retain cycle). StreamPump.swift: the @Sendable pump-thread closure captured the non-Sendable AVSampleBufferDisplayLayer. enqueue/flush are documented thread-safe and only the pump thread drives it after start(), so assert that with nonisolated(unsafe). cargo build/test/clippy/fmt green (core + host); xcframework rebuilt; swift build + iOS/tvOS targets clean with both warnings gone. Runtime confirmation of the inversion warnings needs a GUI run under Xcode's Thread Performance Checker. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -173,7 +173,10 @@ public final class GamepadFeedback {
|
|||||||
|
|
||||||
public init(connection: PunktfunkConnection, manager: GamepadManager) {
|
public init(connection: PunktfunkConnection, manager: GamepadManager) {
|
||||||
self.connection = connection
|
self.connection = connection
|
||||||
Task { @MainActor in
|
// Capture self weakly in the hop too, so the inner sink's weak capture isn't shadowing
|
||||||
|
// an implicit strong one — and the subscription (stored on self) never retain-cycles.
|
||||||
|
Task { @MainActor [weak self] in
|
||||||
|
guard let self else { return }
|
||||||
self.activeSub = manager.$active.sink { [weak self] dc in
|
self.activeSub = manager.$active.sink { [weak self] dc in
|
||||||
MainActor.assumeIsolated { self?.retarget(dc?.controller) }
|
MainActor.assumeIsolated { self?.retarget(dc?.controller) }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,6 +37,10 @@ final class StreamPump {
|
|||||||
onSessionEnd: (@Sendable () -> Void)?
|
onSessionEnd: (@Sendable () -> Void)?
|
||||||
) {
|
) {
|
||||||
let token = token
|
let token = token
|
||||||
|
// The layer is non-Sendable but its enqueue/flush are documented thread-safe, and after
|
||||||
|
// this point only the pump thread drives it — assert that so the @Sendable Thread closure
|
||||||
|
// may capture it.
|
||||||
|
nonisolated(unsafe) let layer = layer
|
||||||
layer.flush() // drop any frames a previous connection left queued
|
layer.flush() // drop any frames a previous connection left queued
|
||||||
|
|
||||||
let thread = Thread {
|
let thread = Thread {
|
||||||
|
|||||||
@@ -153,6 +153,25 @@ pub struct NativeClient {
|
|||||||
pub clock_offset_ns: i64,
|
pub clock_offset_ns: i64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Pin the calling thread to the user-interactive QoS class on Apple targets.
|
||||||
|
///
|
||||||
|
/// The Apple client drains every plane on `.userInteractive` Thread s (video pump, audio,
|
||||||
|
/// gamepad feedback) and connects on a `.userInitiated` Task. Those consumers block on the
|
||||||
|
/// std channels these worker threads feed; if the producers run at the default QoS, the
|
||||||
|
/// kernel sees a high-QoS thread parked waiting on a lower-QoS one and the Thread Performance
|
||||||
|
/// Checker flags a priority inversion. Matching the producers to the consumers' QoS removes
|
||||||
|
/// the inversion without slowing the Swift side. No-op off Apple (the Linux client/host don't
|
||||||
|
/// run a QoS scheduler, and `punktfunk-client-rs` doesn't care).
|
||||||
|
#[cfg(target_vendor = "apple")]
|
||||||
|
fn pin_thread_user_interactive() {
|
||||||
|
// SAFETY: sets only the current thread's QoS class — always valid to call.
|
||||||
|
unsafe {
|
||||||
|
libc::pthread_set_qos_class_self_np(libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[cfg(not(target_vendor = "apple"))]
|
||||||
|
fn pin_thread_user_interactive() {}
|
||||||
|
|
||||||
impl NativeClient {
|
impl NativeClient {
|
||||||
/// Connect to a `punktfunk/1` host and start the session at (up to) `mode`. Blocks until the
|
/// Connect to a `punktfunk/1` host and start the session at (up to) `mode`. Blocks until the
|
||||||
/// handshake completes or `timeout` elapses.
|
/// handshake completes or `timeout` elapses.
|
||||||
@@ -197,8 +216,12 @@ impl NativeClient {
|
|||||||
let worker = std::thread::Builder::new()
|
let worker = std::thread::Builder::new()
|
||||||
.name("punktfunk-client".into())
|
.name("punktfunk-client".into())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
|
pin_thread_user_interactive(); // this thread drives the runtime + handshake
|
||||||
let rt = match tokio::runtime::Builder::new_multi_thread()
|
let rt = match tokio::runtime::Builder::new_multi_thread()
|
||||||
.worker_threads(2)
|
.worker_threads(2)
|
||||||
|
// Every runtime thread (async workers + the spawn_blocking pool that runs
|
||||||
|
// the data-plane pump) matches the Apple client's QoS — no priority inversion.
|
||||||
|
.on_thread_start(pin_thread_user_interactive)
|
||||||
.enable_all()
|
.enable_all()
|
||||||
.build()
|
.build()
|
||||||
{
|
{
|
||||||
@@ -839,6 +862,7 @@ async fn worker_main(args: WorkerArgs) {
|
|||||||
let pump_shutdown = shutdown.clone();
|
let pump_shutdown = shutdown.clone();
|
||||||
let pump_probe = probe.clone();
|
let pump_probe = probe.clone();
|
||||||
let _ = tokio::task::spawn_blocking(move || {
|
let _ = tokio::task::spawn_blocking(move || {
|
||||||
|
pin_thread_user_interactive(); // feeds frame_tx → the client's user-interactive video pump
|
||||||
while !pump_shutdown.load(Ordering::SeqCst) {
|
while !pump_shutdown.load(Ordering::SeqCst) {
|
||||||
match session.poll_frame() {
|
match session.poll_frame() {
|
||||||
Ok(frame) => {
|
Ok(frame) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user