From 823e0e737a4705f77bedb2aa34f65583bc8f8d5c Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Thu, 2 Jul 2026 15:21:55 +0000 Subject: [PATCH] feat(host/windows): two-thread async NVENC retrieve (PUNKTFUNK_NVENC_ASYNC, opt-in) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The gpu-contention plan's §5.B lever: today submit and the blocking lock_bitstream share one thread, so under a GPU-saturating game the pipeline serializes on the WDDM scheduling wait (1000/17ms ≈ 59fps — the depth-1 collapse; the old 'deeper pipeline just stacks latency' result was a same-thread implementation, not a disproof). Async mode opens the session enableEncodeAsync=1, registers an auto-reset completion event per pool bitstream, and moves the wait+lock+copy+ unlock onto an internal retrieve thread feeding poll() through a channel — the exact split the NVENC guide mandates. Register/map/unmap stay on the encode thread; teardown drops the job channel, joins the thread, THEN destroys the session. In-flight depth is bounded by PUNKTFUNK_NVENC_ASYNC_DEPTH (default 4, hard cap POOL-1) — both for output-buffer reuse and because NVENC encodes the capture ring's textures in place. Idle latency cost ≈ 0 (same-tick pickup); under contention completed frames queue instead of stalling capture. CI-compile validated only — on-glass A/B under game load on the RTX box still pending (box offline). Co-Authored-By: Claude Fable 5 --- .../src/encode/windows/nvenc.rs | 329 +++++++++++++++++- 1 file changed, 316 insertions(+), 13 deletions(-) diff --git a/crates/punktfunk-host/src/encode/windows/nvenc.rs b/crates/punktfunk-host/src/encode/windows/nvenc.rs index b172aee..4179c0a 100644 --- a/crates/punktfunk-host/src/encode/windows/nvenc.rs +++ b/crates/punktfunk-host/src/encode/windows/nvenc.rs @@ -12,6 +12,21 @@ //! //! Needs a real NVIDIA GPU at runtime (session creation fails otherwise) — compiles GPU-less, but //! `open`/`submit` only succeed on a GPU box. 
The software encoder (`super::sw`) is the fallback. +//! +//! **Two-thread async retrieve** (`PUNKTFUNK_NVENC_ASYNC=1`, opt-in until on-glass validated — +//! gpu-contention plan §5.B): the NVENC guide mandates that the main thread only *submit* +//! (`nvEncEncodePicture`) while a **secondary thread** waits on per-buffer completion events and +//! does `nvEncLockBitstream`. Today's sync mode does both on one thread, so under a GPU-saturating +//! game the whole pipeline serializes on the WDDM scheduling wait (`1000/17ms ≈ 59 fps` — the +//! depth-1 collapse). In async mode the session is opened `enableEncodeAsync=1`, each output +//! bitstream gets a registered auto-reset event, `submit` returns immediately, and an internal +//! retrieve thread waits + locks + copies + unlocks, handing finished AUs back through a channel +//! that `poll` drains without blocking. All input-resource calls (register/map/unmap) stay on the +//! encode thread; the retrieve thread touches ONLY the event + lock/unlock — the exact split the +//! guide blesses. Backpressure: `submit` blocks on the oldest completion at the in-flight cap +//! (≤ `POOL - 1`), so an output buffer is never reused mid-encode. Latency cost when idle ≈ 0 (the +//! AU completes within the same tick and `poll` picks it up); under contention completed frames +//! queue instead of stalling capture — throughput recovers up to the scheduler-granted share. // Every `unsafe` block / impl in this file carries a `// SAFETY:` proof; enforce it. 
#![deny(clippy::undocumented_unsafe_blocks)] @@ -22,8 +37,11 @@ use anyhow::{anyhow, bail, Context, Result}; use std::collections::{HashMap, VecDeque}; use std::ffi::c_void; use std::ptr; -use windows::core::Interface; +use std::sync::mpsc; +use windows::core::{Interface, PCWSTR}; +use windows::Win32::Foundation::{CloseHandle, HANDLE, WAIT_OBJECT_0}; use windows::Win32::Graphics::Direct3D11::{ID3D11Device, ID3D11Texture2D}; +use windows::Win32::System::Threading::{CreateEventW, WaitForSingleObject}; use nvidia_video_codec_sdk::sys::nvEncodeAPI as nv; use nvidia_video_codec_sdk::ENCODE_API as API; @@ -46,6 +64,110 @@ fn codec_guid(codec: Codec) -> nv::GUID { } } +/// Whether the operator asked for the two-thread async retrieve (`PUNKTFUNK_NVENC_ASYNC` truthy). +/// Combined with the GPU's `NV_ENC_CAPS_ASYNC_ENCODE_SUPPORT` in `init_session`. Opt-in until +/// on-glass validated; note an async-rejecting config surfaces as a failed session open — unset +/// the env in that case. +fn async_retrieve_requested() -> bool { + std::env::var("PUNKTFUNK_NVENC_ASYNC") + .map(|v| matches!(v.trim(), "1" | "true" | "yes" | "on")) + .unwrap_or(false) +} + +/// Max encodes in flight in async mode (`PUNKTFUNK_NVENC_ASYNC_DEPTH`, default 4, clamped +/// `2..=POOL-1`). Two independent ceilings meet here: the output-bitstream pool (hard, `POOL-1` — +/// a buffer must never be reused mid-encode) and the capturer's texture ring (soft — NVENC encodes +/// the ring textures in place, so in-flight depth beyond the ring lets the capturer overwrite a +/// frame mid-encode: visual corruption, not UB). IDD-push rings are sized around +/// `PUNKTFUNK_IDD_DEPTH`; raise both together if deeper pipelining is needed. 
+fn async_inflight_cap() -> usize { + std::env::var("PUNKTFUNK_NVENC_ASYNC_DEPTH") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(4) + .clamp(2, POOL - 1) +} + +/// One in-flight encode handed to the retrieve thread: the output bitstream to lock once its +/// completion `event` signals. Raw pointers travel as `usize` (the addresses are process-global +/// driver handles; the thread is joined before the session they belong to is destroyed). +struct RetrieveJob { + bs: usize, + event: usize, +} + +/// A finished retrieve: the locked-and-copied AU (or the retrieve-side error) for the oldest +/// in-flight bitstream. `bs` lets the encode thread cross-check FIFO pairing with `pending`. +struct RetrieveDone { + bs: usize, + result: std::result::Result<(Vec, bool), String>, +} + +/// The async-retrieve runtime: the job channel feeding the retrieve thread, the completion channel +/// back, the thread handle (joined in `teardown` BEFORE the session is destroyed), and AUs already +/// absorbed by backpressure that `poll` hands out first. +struct AsyncRetrieve { + work_tx: Option>, + done_rx: mpsc::Receiver, + join: Option>, + ready: VecDeque, +} + +/// The retrieve-thread body (gpu-contention plan §5.B): for each submitted frame, wait on its +/// completion event, lock the bitstream, copy the AU out, unlock, and send it back. Exits when the +/// job channel closes (teardown drops the sender and joins BEFORE destroying the session, so +/// `enc`/`bs`/`event` outlive every use here). Touches ONLY the event wait + lock/unlock — the +/// NVENC threading model's sanctioned secondary-thread surface. 
+fn retrieve_loop( + enc: usize, + work_rx: mpsc::Receiver, + done_tx: mpsc::Sender, +) { + crate::punktfunk1::boost_thread_priority(false); + while let Ok(job) = work_rx.recv() { + // SAFETY: `job.event` is one of the auto-reset events `init_session` created and + // registered for exactly this session, and `job.bs` one of its pool bitstreams; both stay + // valid until `teardown`, which joins this thread first. `WaitForSingleObject` takes the + // handle by value. On WAIT_OBJECT_0 the driver has completed the encode into `job.bs`, so + // `lock_bitstream` (version set, struct a live stack local for the synchronous call) + // yields a CPU-readable `bitstreamBufferPtr`/`bitstreamSizeInBytes` valid until + // `unlock_bitstream`; the slice is copied (`to_vec`) before the unlock on the same buffer. + // Lock/unlock from a secondary thread while the encode thread submits is the NVENC + // guide's documented threading model. + let result = unsafe { + if WaitForSingleObject(HANDLE(job.event as *mut c_void), 5000) != WAIT_OBJECT_0 { + Err("NVENC completion event timeout (5s) — encoder wedged?".to_string()) + } else { + let mut lock = nv::NV_ENC_LOCK_BITSTREAM { + version: nv::NV_ENC_LOCK_BITSTREAM_VER, + outputBitstream: job.bs as *mut c_void, + ..Default::default() + }; + match (API.lock_bitstream)(enc as *mut c_void, &mut lock).result_without_string() { + Ok(()) => { + let data = std::slice::from_raw_parts( + lock.bitstreamBufferPtr as *const u8, + lock.bitstreamSizeInBytes as usize, + ) + .to_vec(); + let keyframe = matches!( + lock.pictureType, + nv::NV_ENC_PIC_TYPE::NV_ENC_PIC_TYPE_IDR + | nv::NV_ENC_PIC_TYPE::NV_ENC_PIC_TYPE_I + ); + let _ = (API.unlock_bitstream)(enc as *mut c_void, job.bs as *mut c_void); + Ok((data, keyframe)) + } + Err(e) => Err(format!("lock_bitstream (async): {e:?}")), + } + } + }; + if done_tx.send(RetrieveDone { bs: job.bs, result }).is_err() { + break; // encoder side gone (teardown drains us via join) + } + } +} + pub struct 
NvencD3d11Encoder { encoder: *mut c_void, codec: Codec, @@ -81,6 +203,13 @@ pub struct NvencD3d11Encoder { regs: HashMap, next: usize, bitstreams: Vec, + /// Async mode: the registered completion event per pool bitstream (raw `HANDLE` as `usize`, + /// parallel to `bitstreams`); empty in sync mode. Unregistered + closed in `teardown`. + events: Vec, + /// Async mode: the retrieve thread + its channels (`None` = classic same-thread sync retrieve). + async_rt: Option, + /// `NV_ENC_CAPS_ASYNC_ENCODE_SUPPORT` from the caps probe — gates the async retrieve mode. + async_supported: bool, /// (bitstream, mapped input resource to unmap after retrieval, pts_ns) per in-flight encode. pending: VecDeque<(nv::NV_ENC_OUTPUT_PTR, nv::NV_ENC_INPUT_PTR, u64)>, frame_idx: i64, @@ -102,13 +231,16 @@ pub struct NvencD3d11Encoder { // SAFETY: the `!Send` fields are the raw NVENC session/device handles (`encoder`, `init_device`), // the raw NVENC bitstream/registered/mapped pointers carried in `bitstreams`/`regs`/`pending`, and -// the `ID3D11Texture2D` COM refs — none of which may be touched concurrently from two threads. This -// encoder is owned by exactly one thread: it is moved onto the host encode thread once at -// construction, and every NVENC call and D3D11 access happens only from that thread thereafter -// (`submit`/`poll`/`invalidate_ref_frames`/`Drop` all run there, like the Linux encoder). Moving the -// handles across that single ownership-transfer boundary is sound because no NVENC/D3D11 call is in -// flight during the move and the session and its D3D11 immediate context are never shared (`&`) or -// used concurrently — so `Send` introduces no data race on the non-`Send` fields. +// the `ID3D11Texture2D` COM refs — none of which may be touched concurrently from two threads +// EXCEPT along the NVENC guide's sanctioned split. 
The encoder object is owned by exactly one +// thread: it is moved onto the host encode thread once at construction, and every method +// (`submit`/`poll`/`invalidate_ref_frames`/`Drop`) runs there. In async mode the internal retrieve +// thread additionally calls `WaitForSingleObject`/`lock_bitstream`/`unlock_bitstream` on the same +// session — the exact two-thread model the NVENC API documents as thread-safe (submit-side vs +// output-side); it never touches registrations, mappings, or D3D11. `teardown` joins that thread +// BEFORE destroying the session, so no retrieve call can outlive the handles. Moving the encoder +// across its single ownership-transfer boundary is sound because no NVENC/D3D11 call is in flight +// during the move — so `Send` introduces no data race on the non-`Send` fields. unsafe impl Send for NvencD3d11Encoder {} impl NvencD3d11Encoder { @@ -141,6 +273,9 @@ impl NvencD3d11Encoder { regs: HashMap::new(), next: 0, bitstreams: Vec::new(), + events: Vec::new(), + async_rt: None, + async_supported: false, pending: VecDeque::new(), frame_idx: 0, force_kf: false, @@ -158,6 +293,19 @@ impl NvencD3d11Encoder { if self.encoder.is_null() { return; } + // Async mode: retire the retrieve thread FIRST — drop the job sender so it finishes every + // queued job (each references the still-live session) and exits, then join. Only after the + // join is it sound to unmap/destroy anything the thread might have been touching. + if let Some(mut rt) = self.async_rt.take() { + drop(rt.work_tx.take()); + if let Some(j) = rt.join.take() { + let _ = j.join(); + } + // Completions the thread produced that poll() never absorbed — their AUs are dropped + // (the session is going away), but the FIFO pairing stands, so nothing extra to do + // beyond the pending unmap below. + while rt.done_rx.try_recv().is_ok() {} + } // Unmap any in-flight inputs, then unregister every cached texture and destroy the bitstreams. 
for (_, map, _) in &self.pending { if !map.is_null() { @@ -167,6 +315,17 @@ impl NvencD3d11Encoder { for (reg, _tex) in self.regs.values() { let _ = (API.unregister_resource)(self.encoder, *reg); } + // Async events: unregister from the session, then close the Win32 handles. + for &ev in &self.events { + let mut ep = nv::NV_ENC_EVENT_PARAMS { + version: nv::NV_ENC_EVENT_PARAMS_VER, + completionEvent: ev as *mut c_void, + ..Default::default() + }; + let _ = (API.unregister_async_event)(self.encoder, &mut ep); + let _ = CloseHandle(HANDLE(ev as *mut c_void)); + } + self.events.clear(); for &bs in &self.bitstreams { let _ = (API.destroy_bitstream_buffer)(self.encoder, bs); } @@ -232,6 +391,7 @@ impl NvencD3d11Encoder { enc, nv::NV_ENC_CAPS::NV_ENC_CAPS_SUPPORT_CUSTOM_VBV_BUF_SIZE, ); + let async_enc = self.get_cap(enc, nv::NV_ENC_CAPS::NV_ENC_CAPS_ASYNC_ENCODE_SUPPORT); let _ = (API.destroy_encoder)(enc); // Reject an over-range mode with a clear message instead of an opaque InvalidParam. @@ -259,9 +419,11 @@ impl NvencD3d11Encoder { } self.rfi_supported = rfi != 0; self.custom_vbv = custom_vbv != 0; + self.async_supported = async_enc != 0; tracing::info!( rfi = self.rfi_supported, custom_vbv = self.custom_vbv, + async_encode = self.async_supported, max = %format!("{wmax}x{hmax}"), ten_bit = ten_bit != 0, "NVENC capabilities probed" @@ -277,6 +439,7 @@ impl NvencD3d11Encoder { device: &ID3D11Device, bitrate: u64, split_mode: u32, + enable_async: bool, ) -> Result<*mut c_void> { let mut params = nv::NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS { version: nv::NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS_VER, @@ -441,6 +604,9 @@ impl NvencD3d11Encoder { frameRateNum: self.fps, frameRateDen: 1, enablePTD: 1, + // Two-thread async retrieve (§5.B): completion events signal the retrieve thread + // instead of `lock_bitstream` blocking the submit thread. 
+ enableEncodeAsync: enable_async as u32, encodeConfig: &mut cfg, ..Default::default() }; @@ -525,7 +691,12 @@ impl NvencD3d11Encoder { // to THAT (don't undershoot — the old ×¾ step-down landed well below the real ceiling). const CLAMP_TOL_BPS: u64 = 20_000_000; // stop bisecting within ~20 Mbps of the ceiling - let mut probe = self.try_open_session(device, requested_bps, split_mode); + // Two-thread async retrieve: operator opt-in AND the GPU reports async-encode support + // (query_caps above). Threaded into every session-open probe so the chosen session is + // built in the right mode from the start. + let use_async = self.async_supported && async_retrieve_requested(); + + let mut probe = self.try_open_session(device, requested_bps, split_mode, use_async); // Disambiguate a forced-split rejection from a bitrate-cap rejection: retry once at the // requested rate with split disabled — if THAT succeeds, split was the problem, not bitrate. let split_forced = split_mode @@ -533,7 +704,7 @@ impl NvencD3d11Encoder { && split_mode != nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32; if probe.is_err() && split_forced { let no_split = nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32; - if let Ok(e) = self.try_open_session(device, requested_bps, no_split) { + if let Ok(e) = self.try_open_session(device, requested_bps, no_split, use_async) { tracing::warn!("NVENC: split-encode rejected by codec/config — disabled"); split_mode = no_split; probe = Ok(e); @@ -555,7 +726,7 @@ impl NvencD3d11Encoder { let mut best_bps = 0u64; while hi > lo + CLAMP_TOL_BPS { let mid = lo + (hi - lo) / 2; - match self.try_open_session(device, mid, split_mode) { + match self.try_open_session(device, mid, split_mode, use_async) { Ok(e) => { if !best.is_null() { let _ = (API.destroy_encoder)(best); @@ -573,8 +744,10 @@ impl NvencD3d11Encoder { let no_split = nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32; best = self - .try_open_session(device, FLOOR_BPS, 
split_mode) - .or_else(|_| self.try_open_session(device, FLOOR_BPS, no_split)) + .try_open_session(device, FLOOR_BPS, split_mode, use_async) + .or_else(|_| { + self.try_open_session(device, FLOOR_BPS, no_split, use_async) + }) .context( "NVENC initialize_encoder rejected even at the floor bitrate", )?; @@ -610,6 +783,42 @@ impl NvencD3d11Encoder { .map_err(|e| anyhow!("create_bitstream_buffer: {e:?}"))?; self.bitstreams.push(cb.bitstreamBuffer); } + // Async retrieve: one auto-reset completion event per pool bitstream, registered with + // the session, plus the retrieve thread the events signal. The thread only ever sees + // raw addresses; `teardown` joins it before any of them die. + if use_async { + for _ in 0..POOL { + let ev = CreateEventW(None, false, false, PCWSTR::null()) + .context("CreateEvent (NVENC completion)")?; + let mut ep = nv::NV_ENC_EVENT_PARAMS { + version: nv::NV_ENC_EVENT_PARAMS_VER, + completionEvent: ev.0, + ..Default::default() + }; + (API.register_async_event)(enc, &mut ep) + .result_without_string() + .map_err(|e| anyhow!("register_async_event: {e:?}"))?; + self.events.push(ev.0 as usize); + } + let (work_tx, work_rx) = mpsc::sync_channel::(POOL); + let (done_tx, done_rx) = mpsc::channel::(); + let enc_addr = enc as usize; + let join = std::thread::Builder::new() + .name("punktfunk-nvenc-out".into()) + .spawn(move || retrieve_loop(enc_addr, work_rx, done_tx)) + .context("spawn NVENC retrieve thread")?; + self.async_rt = Some(AsyncRetrieve { + work_tx: Some(work_tx), + done_rx, + join: Some(join), + ready: VecDeque::new(), + }); + tracing::info!( + pool = POOL, + "NVENC async retrieve active (two-thread encode: submit here, \ + lock_bitstream on the retrieve thread)" + ); + } self.inited = true; tracing::info!( "NVENC D3D11 session: {}x{}@{} {}-bit{} {} Mbps {:?}", @@ -624,6 +833,40 @@ impl NvencD3d11Encoder { Ok(()) } } + + /// Fold one retrieve-thread completion back into encoder state ON THE ENCODE THREAD: pop the + /// oldest `pending` 
entry (completions are FIFO — one retrieve thread, in-order jobs), verify + /// the bitstream pairing, unmap the input resource, and queue the AU for `poll`. A retrieve + /// error surfaces AFTER the unmap (the resource is retired either way) so the session glue's + /// rebuild path starts from clean state. + fn absorb_done(&mut self, done: RetrieveDone) -> Result<()> { + let Some((bs, map, pts_ns)) = self.pending.pop_front() else { + bail!("NVENC async: completion with no in-flight frame (pairing bug)"); + }; + if bs as usize != done.bs { + bail!("NVENC async: completion out of order (pairing bug)"); + } + // SAFETY: `map` is the mapped input `submit` recorded for exactly this now-completed + // encode; the session is live (`async_rt` exists only between `init_session` and + // `teardown`) and this runs on the encode thread — the single unmap here mirrors the sync + // path's poll-side unmap, exactly once per mapping. + unsafe { + if !map.is_null() { + let _ = (API.unmap_input_resource)(self.encoder, map); + } + } + let (data, keyframe) = done.result.map_err(|e| anyhow!("{e}"))?; + self.async_rt + .as_mut() + .expect("absorb_done is only reachable in async mode") + .ready + .push_back(EncodedFrame { + data, + pts_ns, + keyframe, + }); + Ok(()) + } } impl Encoder for NvencD3d11Encoder { @@ -694,6 +937,20 @@ impl Encoder for NvencD3d11Encoder { self.init_session(&device)?; self.init_device = dev_raw; } + // Async backpressure: never hand NVENC an output bitstream that is still in flight, and + // keep in-flight depth within the capturer's texture ring (see `async_inflight_cap`). At + // the cap, block on the OLDEST completion (the retrieve thread is already waiting on its + // event) before submitting more — bounding depth exactly like the sync path's per-tick + // blocking poll, just `cap` deep instead of 1. 
+ while self.async_rt.is_some() && self.pending.len() >= async_inflight_cap() { + let done = { + let rt = self.async_rt.as_mut().expect("checked in loop condition"); + rt.done_rx + .recv_timeout(std::time::Duration::from_secs(5)) + .map_err(|_| anyhow!("NVENC async retrieve stalled (5s) — encoder wedged?"))? + }; + self.absorb_done(done)?; + } let slot = self.next % POOL; self.next += 1; // SAFETY: every NVENC call goes through a function pointer from the loaded `ENCODE_API` table @@ -765,6 +1022,13 @@ impl Encoder for NvencD3d11Encoder { pictureStruct: nv::NV_ENC_PIC_STRUCT::NV_ENC_PIC_STRUCT_FRAME, inputTimeStamp: pts, encodePicFlags: flags as u32, + // Async mode: the event the driver signals when this encode completes (the + // retrieve thread waits on it). Null in sync mode (`events` is empty). + completionEvent: self + .events + .get(slot) + .map(|&e| e as *mut c_void) + .unwrap_or(ptr::null_mut()), ..Default::default() }; @@ -817,6 +1081,18 @@ impl Encoder for NvencD3d11Encoder { .map_err(|e| anyhow!("encode_picture: {e:?}"))?; self.pending .push_back((self.bitstreams[slot], mp.mappedResource, captured.pts_ns)); + // Async: hand the in-flight encode to the retrieve thread (channel capacity = POOL ≥ + // in-flight, so this send never blocks). The pending entry above pairs with its + // completion FIFO in `absorb_done`. + if let Some(rt) = &self.async_rt { + let job = RetrieveJob { + bs: self.bitstreams[slot] as usize, + event: self.events[slot], + }; + if rt.work_tx.as_ref().is_none_or(|tx| tx.send(job).is_err()) { + bail!("NVENC retrieve thread gone — rebuilding the session"); + } + } } Ok(()) } @@ -834,6 +1110,9 @@ impl Encoder for NvencD3d11Encoder { // Reflects what the session actually configured (cleared in `query_caps` if the GPU lacks // YUV444 encode), so the glue can confirm 4:4:4 vs the negotiated request. 
chroma_444: self.chroma_444, + // The direct-NVENC path recovers via real RFI (or a forced IDR), not the Linux + // libavcodec intra-refresh mode. + intra_refresh: false, } } @@ -891,6 +1170,30 @@ impl Encoder for NvencD3d11Encoder { } fn poll(&mut self) -> Result> { + // Async mode: drain whatever the retrieve thread has finished (non-blocking) and hand out + // the oldest ready AU. `None` = nothing completed yet — the session loop keeps the frame + // in flight and re-polls next tick, capture never blocks on the WDDM scheduling wait. + if self.async_rt.is_some() { + loop { + let done = match self + .async_rt + .as_mut() + .expect("checked just above") + .done_rx + .try_recv() + { + Ok(d) => d, + Err(_) => break, + }; + self.absorb_done(done)?; + } + return Ok(self + .async_rt + .as_mut() + .expect("checked just above") + .ready + .pop_front()); + } let Some((bs, map, pts_ns)) = self.pending.pop_front() else { return Ok(None); };