// The platform-independent heart of the presenters: one thread pulling AUs from the // connection into an AVSampleBufferDisplayLayer, with the format description refreshed // on every IDR (the host opens with an IDR carrying in-band parameter sets; recovery // keyframes re-send them — there is no out-of-band extradata, ever). Shared by the // macOS StreamLayerView and the iOS/iPadOS stream view. import AVFoundation import Foundation import os private let pumpLog = Logger(subsystem: "io.unom.punktfunk", category: "video") /// Cancellation handle owned by exactly one pump thread — a restart hands the old pump /// its own token, so it can never be revived by a newer start(). private final class PumpToken: @unchecked Sendable { private let lock = NSLock() private var live = true var isLive: Bool { lock.lock() defer { lock.unlock() } return live } func cancel() { lock.lock() live = false lock.unlock() } } /// One pump per instance; create a fresh StreamPump per start (cancel is permanent). final class StreamPump { private let token = PumpToken() /// Pump thread: pull AUs, wrap, enqueue. Non-IDR AUs before the first format /// description are dropped. `onFrame`/`onSessionEnd` fire on the pump thread. func start( connection: PunktfunkConnection, layer: AVSampleBufferDisplayLayer, onFrame: (@Sendable (AccessUnit) -> Void)?, onSessionEnd: (@Sendable () -> Void)? ) { let token = token // The layer is non-Sendable but its enqueue/flush are documented thread-safe, and after // this point only the pump thread drives it — assert that so the @Sendable Thread closure // may capture it. nonisolated(unsafe) let layer = layer layer.flush() // drop any frames a previous connection left queued let thread = Thread { var format: CMVideoFormatDescription? var lastKeyframeRequest = Date.distantPast var lastFramesDropped = connection.framesDropped() // Recovery is a persistent WANT, not a one-shot edge: set it on detected loss (or a // decoder reset), retry the throttled request EVERY iteration, and clear it only when a // fresh IDR actually re-anchors decode. The old code advanced `lastFramesDropped` on the // same edge it fired the throttled request — so a request swallowed by the throttle (a // second drop within the window, e.g. the lost recovery IDR itself being pruned) was // never re-sent: the counter went flat, the climb never re-fired, and the picture stayed // frozen for good while audio kept playing. The iPhone's lossy Wi-Fi hits this where the // Mac's Ethernet never does. var awaitingIDR = false var awaitingSince = Date.distantPast // when the current recovery began (for the resume log) var wasFailed = false // Coalesced host keyframe request. 100 ms throttle (matches the working Android path): // fast enough that a lost recovery IDR is re-requested promptly, bounded so a sustained // freeze can't flood the control stream. func requestKeyframeThrottled() { let now = Date() if now.timeIntervalSince(lastKeyframeRequest) > 0.1 { connection.requestKeyframe() lastKeyframeRequest = now } } while token.isLive { do { // Loss recovery (the primary path). Under the host's infinite GOP the only // recovery keyframe is one we request. The reassembler drops unrecoverable AUs // (framesDropped); the decoder then *conceals* the reference-missing deltas — a // frozen / garbage picture that never flips the layer to .failed — so key off the // drop count climbing, then keep asking (awaitingIDR) until an IDR lands. Polled // every iteration so a total-loss drought still recovers when packets resume. let dropped = connection.framesDropped() if dropped > lastFramesDropped { // Log only on the false→true transition (once per recovery cycle), not per // dropped AU, so heavy loss doesn't spam the log. if !awaitingIDR { awaitingSince = Date() pumpLog.notice( "video: unrecoverable drop (framesDropped=\(dropped, privacy: .public)) — requesting recovery IDR") } lastFramesDropped = dropped awaitingIDR = true } if awaitingIDR { requestKeyframeThrottled() } guard let au = try connection.nextAU(timeoutMs: 100) else { continue } onFrame?(au) let idrFormat = AnnexB.formatDescription(fromIDR: au.data) if let f = idrFormat { format = f // refreshed on every IDR (mode changes included) if awaitingIDR { let ms = Int(Date().timeIntervalSince(awaitingSince) * 1000) pumpLog.notice("video: recovery IDR received — resumed after \(ms, privacy: .public) ms") } awaitingIDR = false // a fresh IDR re-anchored decode — recovery complete } let failed = layer.status == .failed if failed { // Decode wedged hard (the cold-first-connect case — a lost/corrupt opening // IDR): flush and, unless THIS AU is the recovering IDR (re-anchored above), // re-gate on the next in-band parameter sets and keep asking — enqueuing a // delta into a failed layer can't recover it. if !wasFailed { pumpLog.warning("video: display layer .failed — flushing + re-anchoring") } layer.flush() if idrFormat == nil { format = nil awaitingIDR = true } } wasFailed = failed guard let f = format, let sample = AnnexB.sampleBuffer(au: au, format: f), token.isLive // don't enqueue a stale frame after a restart else { continue } layer.enqueue(sample) } catch { if token.isLive { onSessionEnd?() } break // session closed } } } thread.name = "punktfunk-pump" thread.qualityOfService = .userInteractive thread.start() } /// Stop pumping (≤ one poll timeout). Does not close the connection. func stop() { token.cancel() } deinit { token.cancel() } }