Files
punktfunk/clients/apple/Sources/PunktfunkKit/StreamPump.swift
T
enricobuehler e99a1aea43
apple / swift (push) Successful in 55s
ci / rust (push) Successful in 1m31s
android / android (push) Successful in 1m48s
ci / web (push) Successful in 27s
ci / docs-site (push) Successful in 33s
docker / build-push (--build-arg FEDORA_VERSION=44, ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora44-rpm) (push) Successful in 5s
ci / bench (push) Successful in 1m35s
decky / build-publish (push) Successful in 11s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 4s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 4s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 3s
deb / build-publish (push) Successful in 2m19s
flatpak / build-publish (push) Successful in 4m2s
rpm / build-publish (bazzite, punktfunk-fedora-rpm) (push) Successful in 5m22s
docker / deploy-docs (push) Successful in 18s
rpm / build-publish (fedora-44, punktfunk-fedora44-rpm) (push) Successful in 4m39s
fix(apple): resolve QoS priority inversions + two Swift concurrency warnings
Priority inversions (Thread Performance Checker): the Apple client drains every
plane on .userInteractive threads (video pump, audio, gamepad feedback) and
connects on a .userInitiated Task, but the connector's producer threads ran at
the default QoS — so a high-QoS consumer parked waiting on a lower-QoS producer.
Pin the connector's producers (outer worker thread, all tokio runtime threads via
on_thread_start, and the data-plane spawn_blocking pump) to .userInteractive on
Apple so they match the consumers. #[cfg(target_vendor = "apple")] helper using
the existing libc dep; no-op off Apple, no Swift-side change (no latency
regression).

GamepadFeedback.swift: the init's MainActor hop captured self implicitly-strong
while the inner $active sink captured it weakly — capture [weak self] in the hop
too (the sink stays weak to avoid the retain cycle).

StreamPump.swift: the @Sendable pump-thread closure captured the non-Sendable
AVSampleBufferDisplayLayer. enqueue/flush are documented thread-safe and only the
pump thread drives it after start(), so assert that with nonisolated(unsafe).

cargo build/test/clippy/fmt green (core + host); xcframework rebuilt; swift build
+ iOS/tvOS targets clean with both warnings gone. Runtime confirmation of the
inversion warnings needs a GUI run under Xcode's Thread Performance Checker.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 22:48:10 +02:00

96 lines
4.0 KiB
Swift

// The platform-independent heart of the presenters: one thread pulling AUs from the
// connection into an AVSampleBufferDisplayLayer, with the format description refreshed
// on every IDR (the host opens with an IDR carrying in-band parameter sets; recovery
// keyframes re-send them there is no out-of-band extradata, ever). Shared by the
// macOS StreamLayerView and the iOS/iPadOS stream view.
import AVFoundation
import Foundation
/// Cancellation handle owned by exactly one pump thread a restart hands the old pump
/// its own token, so it can never be revived by a newer start().
private final class PumpToken: @unchecked Sendable {
private let lock = NSLock()
private var live = true
var isLive: Bool {
lock.lock()
defer { lock.unlock() }
return live
}
func cancel() {
lock.lock()
live = false
lock.unlock()
}
}
/// One pump per instance; create a fresh StreamPump per start (cancel is permanent).
final class StreamPump {
private let token = PumpToken()
/// Pump thread: pull AUs, wrap, enqueue. Non-IDR AUs before the first format
/// description are dropped. `onFrame`/`onSessionEnd` fire on the pump thread.
func start(
connection: PunktfunkConnection,
layer: AVSampleBufferDisplayLayer,
onFrame: (@Sendable (AccessUnit) -> Void)?,
onSessionEnd: (@Sendable () -> Void)?
) {
let token = token
// The layer is non-Sendable but its enqueue/flush are documented thread-safe, and after
// this point only the pump thread drives it assert that so the @Sendable Thread closure
// may capture it.
nonisolated(unsafe) let layer = layer
layer.flush() // drop any frames a previous connection left queued
let thread = Thread {
var format: CMVideoFormatDescription?
var lastKeyframeRequest = Date.distantPast
while token.isLive {
do {
guard let au = try connection.nextAU(timeoutMs: 100) else { continue }
onFrame?(au)
if let f = AnnexB.formatDescription(fromIDR: au.data) {
format = f // refreshed on every IDR (mode changes included)
}
if layer.status == .failed {
// Decode wedged: flush and re-gate on the next in-band parameter sets
// (resuming with a delta frame can't recover), AND ask the host for a
// fresh IDR. With the host's infinite GOP the next keyframe could be
// far off, so without the request the picture stays frozen the
// intermittent first-connect freeze. Throttled: the layer stays .failed
// across several polls until the IDR lands, and one request suffices.
layer.flush()
format = AnnexB.formatDescription(fromIDR: au.data)
let now = Date()
if now.timeIntervalSince(lastKeyframeRequest) > 0.25 {
connection.requestKeyframe()
lastKeyframeRequest = now
}
}
guard let f = format,
let sample = AnnexB.sampleBuffer(au: au, format: f),
token.isLive // don't enqueue a stale frame after a restart
else { continue }
layer.enqueue(sample)
} catch {
if token.isLive {
onSessionEnd?()
}
break // session closed
}
}
}
thread.name = "punktfunk-pump"
thread.qualityOfService = .userInteractive
thread.start()
}
/// Stop pumping ( one poll timeout). Does not close the connection.
func stop() {
token.cancel()
}
deinit { token.cancel() }
}