feat(host/windows): two-thread async NVENC retrieve (PUNKTFUNK_NVENC_ASYNC, opt-in)

The gpu-contention plan's §5.B lever: today submit and the blocking
lock_bitstream share one thread, so under a GPU-saturating game the
pipeline serializes on the WDDM scheduling wait (1000/17ms ≈ 59fps —
the depth-1 collapse; the old 'deeper pipeline just stacks latency'
result was a same-thread implementation, not a disproof). Async mode
opens the session enableEncodeAsync=1, registers an auto-reset
completion event per pool bitstream, and moves the wait+lock+copy+
unlock onto an internal retrieve thread feeding poll() through a
channel — the exact split the NVENC guide mandates. Register/map/unmap
stay on the encode thread; teardown drops the job channel, joins the
thread, THEN destroys the session. In-flight depth is bounded by
PUNKTFUNK_NVENC_ASYNC_DEPTH (default 4, hard cap POOL-1) — both for
output-buffer reuse and because NVENC encodes the capture ring's
textures in place. Idle latency cost ≈ 0 (same-tick pickup); under
contention completed frames queue instead of stalling capture.

CI-compile validated only — on-glass A/B under game load on the RTX
box still pending (box offline).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-07-02 15:21:55 +00:00
parent 49e6021ece
commit 1a483aae06
+316 -13
View File
@@ -12,6 +12,21 @@
//!
//! Needs a real NVIDIA GPU at runtime (session creation fails otherwise) — compiles GPU-less, but
//! `open`/`submit` only succeed on a GPU box. The software encoder (`super::sw`) is the fallback.
//!
//! **Two-thread async retrieve** (`PUNKTFUNK_NVENC_ASYNC=1`, opt-in until on-glass validated —
//! gpu-contention plan §5.B): the NVENC guide mandates that the main thread only *submit*
//! (`nvEncEncodePicture`) while a **secondary thread** waits on per-buffer completion events and
//! does `nvEncLockBitstream`. Today's sync mode does both on one thread, so under a GPU-saturating
//! game the whole pipeline serializes on the WDDM scheduling wait (`1000/17ms ≈ 59 fps` — the
//! depth-1 collapse). In async mode the session is opened `enableEncodeAsync=1`, each output
//! bitstream gets a registered auto-reset event, `submit` returns immediately, and an internal
//! retrieve thread waits + locks + copies + unlocks, handing finished AUs back through a channel
//! that `poll` drains without blocking. All input-resource calls (register/map/unmap) stay on the
//! encode thread; the retrieve thread touches ONLY the event + lock/unlock — the exact split the
//! guide blesses. Backpressure: `submit` blocks on the oldest completion when `POOL - 1` encodes
//! are in flight, so an output buffer is never reused mid-encode. Latency cost when idle ≈ 0 (the
//! AU completes within the same tick and `poll` picks it up); under contention completed frames
//! queue instead of stalling capture — throughput recovers up to the scheduler-granted share.
// Every `unsafe` block / impl in this file carries a `// SAFETY:` proof; enforce it.
#![deny(clippy::undocumented_unsafe_blocks)]
@@ -22,8 +37,11 @@ use anyhow::{anyhow, bail, Context, Result};
use std::collections::{HashMap, VecDeque};
use std::ffi::c_void;
use std::ptr;
use windows::core::Interface;
use std::sync::mpsc;
use windows::core::{Interface, PCWSTR};
use windows::Win32::Foundation::{CloseHandle, HANDLE, WAIT_OBJECT_0};
use windows::Win32::Graphics::Direct3D11::{ID3D11Device, ID3D11Texture2D};
use windows::Win32::System::Threading::{CreateEventW, WaitForSingleObject};
use nvidia_video_codec_sdk::sys::nvEncodeAPI as nv;
use nvidia_video_codec_sdk::ENCODE_API as API;
@@ -46,6 +64,110 @@ fn codec_guid(codec: Codec) -> nv::GUID {
}
}
/// Whether the operator asked for the two-thread async retrieve (`PUNKTFUNK_NVENC_ASYNC` truthy).
/// Combined with the GPU's `NV_ENC_CAPS_ASYNC_ENCODE_SUPPORT` in `init_session`. Opt-in until
/// on-glass validated; note an async-rejecting config surfaces as a failed session open — unset
/// the env in that case.
fn async_retrieve_requested() -> bool {
std::env::var("PUNKTFUNK_NVENC_ASYNC")
.map(|v| matches!(v.trim(), "1" | "true" | "yes" | "on"))
.unwrap_or(false)
}
/// Max encodes in flight in async mode (`PUNKTFUNK_NVENC_ASYNC_DEPTH`, default 4, clamped
/// `2..=POOL-1`). Two independent ceilings meet here: the output-bitstream pool (hard, `POOL-1` —
/// a buffer must never be reused mid-encode) and the capturer's texture ring (soft — NVENC encodes
/// the ring textures in place, so in-flight depth beyond the ring lets the capturer overwrite a
/// frame mid-encode: visual corruption, not UB). IDD-push rings are sized around
/// `PUNKTFUNK_IDD_DEPTH`; raise both together if deeper pipelining is needed.
fn async_inflight_cap() -> usize {
std::env::var("PUNKTFUNK_NVENC_ASYNC_DEPTH")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(4)
.clamp(2, POOL - 1)
}
/// One in-flight encode handed to the retrieve thread: the output bitstream to lock once its
/// completion `event` signals. Raw pointers travel as `usize` (the addresses are process-global
/// driver handles; the thread is joined before the session they belong to is destroyed).
struct RetrieveJob {
bs: usize,
event: usize,
}
/// A finished retrieve: the locked-and-copied AU (or the retrieve-side error) for the oldest
/// in-flight bitstream. `bs` lets the encode thread cross-check FIFO pairing with `pending`.
struct RetrieveDone {
bs: usize,
result: std::result::Result<(Vec<u8>, bool), String>,
}
/// The async-retrieve runtime: the job channel feeding the retrieve thread, the completion channel
/// back, the thread handle (joined in `teardown` BEFORE the session is destroyed), and AUs already
/// absorbed by backpressure that `poll` hands out first.
struct AsyncRetrieve {
work_tx: Option<mpsc::SyncSender<RetrieveJob>>,
done_rx: mpsc::Receiver<RetrieveDone>,
join: Option<std::thread::JoinHandle<()>>,
ready: VecDeque<EncodedFrame>,
}
/// The retrieve-thread body (gpu-contention plan §5.B): for each submitted frame, wait on its
/// completion event, lock the bitstream, copy the AU out, unlock, and send it back. Exits when the
/// job channel closes (teardown drops the sender and joins BEFORE destroying the session, so
/// `enc`/`bs`/`event` outlive every use here). Touches ONLY the event wait + lock/unlock — the
/// NVENC threading model's sanctioned secondary-thread surface.
fn retrieve_loop(
enc: usize,
work_rx: mpsc::Receiver<RetrieveJob>,
done_tx: mpsc::Sender<RetrieveDone>,
) {
crate::punktfunk1::boost_thread_priority(false);
while let Ok(job) = work_rx.recv() {
// SAFETY: `job.event` is one of the auto-reset events `init_session` created and
// registered for exactly this session, and `job.bs` one of its pool bitstreams; both stay
// valid until `teardown`, which joins this thread first. `WaitForSingleObject` takes the
// handle by value. On WAIT_OBJECT_0 the driver has completed the encode into `job.bs`, so
// `lock_bitstream` (version set, struct a live stack local for the synchronous call)
// yields a CPU-readable `bitstreamBufferPtr`/`bitstreamSizeInBytes` valid until
// `unlock_bitstream`; the slice is copied (`to_vec`) before the unlock on the same buffer.
// Lock/unlock from a secondary thread while the encode thread submits is the NVENC
// guide's documented threading model.
let result = unsafe {
if WaitForSingleObject(HANDLE(job.event as *mut c_void), 5000) != WAIT_OBJECT_0 {
Err("NVENC completion event timeout (5s) — encoder wedged?".to_string())
} else {
let mut lock = nv::NV_ENC_LOCK_BITSTREAM {
version: nv::NV_ENC_LOCK_BITSTREAM_VER,
outputBitstream: job.bs as *mut c_void,
..Default::default()
};
match (API.lock_bitstream)(enc as *mut c_void, &mut lock).result_without_string() {
Ok(()) => {
let data = std::slice::from_raw_parts(
lock.bitstreamBufferPtr as *const u8,
lock.bitstreamSizeInBytes as usize,
)
.to_vec();
let keyframe = matches!(
lock.pictureType,
nv::NV_ENC_PIC_TYPE::NV_ENC_PIC_TYPE_IDR
| nv::NV_ENC_PIC_TYPE::NV_ENC_PIC_TYPE_I
);
let _ = (API.unlock_bitstream)(enc as *mut c_void, job.bs as *mut c_void);
Ok((data, keyframe))
}
Err(e) => Err(format!("lock_bitstream (async): {e:?}")),
}
}
};
if done_tx.send(RetrieveDone { bs: job.bs, result }).is_err() {
break; // encoder side gone (teardown drains us via join)
}
}
}
pub struct NvencD3d11Encoder {
encoder: *mut c_void,
codec: Codec,
@@ -81,6 +203,13 @@ pub struct NvencD3d11Encoder {
regs: HashMap<isize, (nv::NV_ENC_REGISTERED_PTR, ID3D11Texture2D)>,
next: usize,
bitstreams: Vec<nv::NV_ENC_OUTPUT_PTR>,
/// Async mode: the registered completion event per pool bitstream (raw `HANDLE` as `usize`,
/// parallel to `bitstreams`); empty in sync mode. Unregistered + closed in `teardown`.
events: Vec<usize>,
/// Async mode: the retrieve thread + its channels (`None` = classic same-thread sync retrieve).
async_rt: Option<AsyncRetrieve>,
/// `NV_ENC_CAPS_ASYNC_ENCODE_SUPPORT` from the caps probe — gates the async retrieve mode.
async_supported: bool,
/// (bitstream, mapped input resource to unmap after retrieval, pts_ns) per in-flight encode.
pending: VecDeque<(nv::NV_ENC_OUTPUT_PTR, nv::NV_ENC_INPUT_PTR, u64)>,
frame_idx: i64,
@@ -102,13 +231,16 @@ pub struct NvencD3d11Encoder {
// SAFETY: the `!Send` fields are the raw NVENC session/device handles (`encoder`, `init_device`),
// the raw NVENC bitstream/registered/mapped pointers carried in `bitstreams`/`regs`/`pending`, and
// the `ID3D11Texture2D` COM refs — none of which may be touched concurrently from two threads. This
// encoder is owned by exactly one thread: it is moved onto the host encode thread once at
// construction, and every NVENC call and D3D11 access happens only from that thread thereafter
// (`submit`/`poll`/`invalidate_ref_frames`/`Drop` all run there, like the Linux encoder). Moving the
// handles across that single ownership-transfer boundary is sound because no NVENC/D3D11 call is in
// flight during the move and the session and its D3D11 immediate context are never shared (`&`) or
// used concurrently — so `Send` introduces no data race on the non-`Send` fields.
// the `ID3D11Texture2D` COM refs — none of which may be touched concurrently from two threads
// EXCEPT along the NVENC guide's sanctioned split. The encoder object is owned by exactly one
// thread: it is moved onto the host encode thread once at construction, and every method
// (`submit`/`poll`/`invalidate_ref_frames`/`Drop`) runs there. In async mode the internal retrieve
// thread additionally calls `WaitForSingleObject`/`lock_bitstream`/`unlock_bitstream` on the same
// session — the exact two-thread model the NVENC API documents as thread-safe (submit-side vs
// output-side); it never touches registrations, mappings, or D3D11. `teardown` joins that thread
// BEFORE destroying the session, so no retrieve call can outlive the handles. Moving the encoder
// across its single ownership-transfer boundary is sound because no NVENC/D3D11 call is in flight
// during the move — so `Send` introduces no data race on the non-`Send` fields.
unsafe impl Send for NvencD3d11Encoder {}
impl NvencD3d11Encoder {
@@ -141,6 +273,9 @@ impl NvencD3d11Encoder {
regs: HashMap::new(),
next: 0,
bitstreams: Vec::new(),
events: Vec::new(),
async_rt: None,
async_supported: false,
pending: VecDeque::new(),
frame_idx: 0,
force_kf: false,
@@ -158,6 +293,19 @@ impl NvencD3d11Encoder {
if self.encoder.is_null() {
return;
}
// Async mode: retire the retrieve thread FIRST — drop the job sender so it finishes every
// queued job (each references the still-live session) and exits, then join. Only after the
// join is it sound to unmap/destroy anything the thread might have been touching.
if let Some(mut rt) = self.async_rt.take() {
drop(rt.work_tx.take());
if let Some(j) = rt.join.take() {
let _ = j.join();
}
// Completions the thread produced that poll() never absorbed — their AUs are dropped
// (the session is going away), but the FIFO pairing stands, so nothing extra to do
// beyond the pending unmap below.
while rt.done_rx.try_recv().is_ok() {}
}
// Unmap any in-flight inputs, then unregister every cached texture and destroy the bitstreams.
for (_, map, _) in &self.pending {
if !map.is_null() {
@@ -167,6 +315,17 @@ impl NvencD3d11Encoder {
for (reg, _tex) in self.regs.values() {
let _ = (API.unregister_resource)(self.encoder, *reg);
}
// Async events: unregister from the session, then close the Win32 handles.
for &ev in &self.events {
let mut ep = nv::NV_ENC_EVENT_PARAMS {
version: nv::NV_ENC_EVENT_PARAMS_VER,
completionEvent: ev as *mut c_void,
..Default::default()
};
let _ = (API.unregister_async_event)(self.encoder, &mut ep);
let _ = CloseHandle(HANDLE(ev as *mut c_void));
}
self.events.clear();
for &bs in &self.bitstreams {
let _ = (API.destroy_bitstream_buffer)(self.encoder, bs);
}
@@ -232,6 +391,7 @@ impl NvencD3d11Encoder {
enc,
nv::NV_ENC_CAPS::NV_ENC_CAPS_SUPPORT_CUSTOM_VBV_BUF_SIZE,
);
let async_enc = self.get_cap(enc, nv::NV_ENC_CAPS::NV_ENC_CAPS_ASYNC_ENCODE_SUPPORT);
let _ = (API.destroy_encoder)(enc);
// Reject an over-range mode with a clear message instead of an opaque InvalidParam.
@@ -259,9 +419,11 @@ impl NvencD3d11Encoder {
}
self.rfi_supported = rfi != 0;
self.custom_vbv = custom_vbv != 0;
self.async_supported = async_enc != 0;
tracing::info!(
rfi = self.rfi_supported,
custom_vbv = self.custom_vbv,
async_encode = self.async_supported,
max = %format!("{wmax}x{hmax}"),
ten_bit = ten_bit != 0,
"NVENC capabilities probed"
@@ -277,6 +439,7 @@ impl NvencD3d11Encoder {
device: &ID3D11Device,
bitrate: u64,
split_mode: u32,
enable_async: bool,
) -> Result<*mut c_void> {
let mut params = nv::NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS {
version: nv::NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS_VER,
@@ -441,6 +604,9 @@ impl NvencD3d11Encoder {
frameRateNum: self.fps,
frameRateDen: 1,
enablePTD: 1,
// Two-thread async retrieve (§5.B): completion events signal the retrieve thread
// instead of `lock_bitstream` blocking the submit thread.
enableEncodeAsync: enable_async as u32,
encodeConfig: &mut cfg,
..Default::default()
};
@@ -525,7 +691,12 @@ impl NvencD3d11Encoder {
// to THAT (don't undershoot — the old ×¾ step-down landed well below the real ceiling).
const CLAMP_TOL_BPS: u64 = 20_000_000; // stop bisecting within ~20 Mbps of the ceiling
let mut probe = self.try_open_session(device, requested_bps, split_mode);
// Two-thread async retrieve: operator opt-in AND the GPU reports async-encode support
// (query_caps above). Threaded into every session-open probe so the chosen session is
// built in the right mode from the start.
let use_async = self.async_supported && async_retrieve_requested();
let mut probe = self.try_open_session(device, requested_bps, split_mode, use_async);
// Disambiguate a forced-split rejection from a bitrate-cap rejection: retry once at the
// requested rate with split disabled — if THAT succeeds, split was the problem, not bitrate.
let split_forced = split_mode
@@ -533,7 +704,7 @@ impl NvencD3d11Encoder {
&& split_mode != nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32;
if probe.is_err() && split_forced {
let no_split = nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32;
if let Ok(e) = self.try_open_session(device, requested_bps, no_split) {
if let Ok(e) = self.try_open_session(device, requested_bps, no_split, use_async) {
tracing::warn!("NVENC: split-encode rejected by codec/config — disabled");
split_mode = no_split;
probe = Ok(e);
@@ -555,7 +726,7 @@ impl NvencD3d11Encoder {
let mut best_bps = 0u64;
while hi > lo + CLAMP_TOL_BPS {
let mid = lo + (hi - lo) / 2;
match self.try_open_session(device, mid, split_mode) {
match self.try_open_session(device, mid, split_mode, use_async) {
Ok(e) => {
if !best.is_null() {
let _ = (API.destroy_encoder)(best);
@@ -573,8 +744,10 @@ impl NvencD3d11Encoder {
let no_split =
nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32;
best = self
.try_open_session(device, FLOOR_BPS, split_mode)
.or_else(|_| self.try_open_session(device, FLOOR_BPS, no_split))
.try_open_session(device, FLOOR_BPS, split_mode, use_async)
.or_else(|_| {
self.try_open_session(device, FLOOR_BPS, no_split, use_async)
})
.context(
"NVENC initialize_encoder rejected even at the floor bitrate",
)?;
@@ -610,6 +783,42 @@ impl NvencD3d11Encoder {
.map_err(|e| anyhow!("create_bitstream_buffer: {e:?}"))?;
self.bitstreams.push(cb.bitstreamBuffer);
}
// Async retrieve: one auto-reset completion event per pool bitstream, registered with
// the session, plus the retrieve thread the events signal. The thread only ever sees
// raw addresses; `teardown` joins it before any of them die.
if use_async {
for _ in 0..POOL {
let ev = CreateEventW(None, false, false, PCWSTR::null())
.context("CreateEvent (NVENC completion)")?;
let mut ep = nv::NV_ENC_EVENT_PARAMS {
version: nv::NV_ENC_EVENT_PARAMS_VER,
completionEvent: ev.0,
..Default::default()
};
(API.register_async_event)(enc, &mut ep)
.result_without_string()
.map_err(|e| anyhow!("register_async_event: {e:?}"))?;
self.events.push(ev.0 as usize);
}
let (work_tx, work_rx) = mpsc::sync_channel::<RetrieveJob>(POOL);
let (done_tx, done_rx) = mpsc::channel::<RetrieveDone>();
let enc_addr = enc as usize;
let join = std::thread::Builder::new()
.name("punktfunk-nvenc-out".into())
.spawn(move || retrieve_loop(enc_addr, work_rx, done_tx))
.context("spawn NVENC retrieve thread")?;
self.async_rt = Some(AsyncRetrieve {
work_tx: Some(work_tx),
done_rx,
join: Some(join),
ready: VecDeque::new(),
});
tracing::info!(
pool = POOL,
"NVENC async retrieve active (two-thread encode: submit here, \
lock_bitstream on the retrieve thread)"
);
}
self.inited = true;
tracing::info!(
"NVENC D3D11 session: {}x{}@{} {}-bit{} {} Mbps {:?}",
@@ -624,6 +833,40 @@ impl NvencD3d11Encoder {
Ok(())
}
}
/// Fold one retrieve-thread completion back into encoder state ON THE ENCODE THREAD: pop the
/// oldest `pending` entry (completions are FIFO — one retrieve thread, in-order jobs), verify
/// the bitstream pairing, unmap the input resource, and queue the AU for `poll`. A retrieve
/// error surfaces AFTER the unmap (the resource is retired either way) so the session glue's
/// rebuild path starts from clean state.
fn absorb_done(&mut self, done: RetrieveDone) -> Result<()> {
let Some((bs, map, pts_ns)) = self.pending.pop_front() else {
bail!("NVENC async: completion with no in-flight frame (pairing bug)");
};
if bs as usize != done.bs {
bail!("NVENC async: completion out of order (pairing bug)");
}
// SAFETY: `map` is the mapped input `submit` recorded for exactly this now-completed
// encode; the session is live (`async_rt` exists only between `init_session` and
// `teardown`) and this runs on the encode thread — the single unmap here mirrors the sync
// path's poll-side unmap, exactly once per mapping.
unsafe {
if !map.is_null() {
let _ = (API.unmap_input_resource)(self.encoder, map);
}
}
let (data, keyframe) = done.result.map_err(|e| anyhow!("{e}"))?;
self.async_rt
.as_mut()
.expect("absorb_done is only reachable in async mode")
.ready
.push_back(EncodedFrame {
data,
pts_ns,
keyframe,
});
Ok(())
}
}
impl Encoder for NvencD3d11Encoder {
@@ -694,6 +937,20 @@ impl Encoder for NvencD3d11Encoder {
self.init_session(&device)?;
self.init_device = dev_raw;
}
// Async backpressure: never hand NVENC an output bitstream that is still in flight, and
// keep in-flight depth within the capturer's texture ring (see `async_inflight_cap`). At
// the cap, block on the OLDEST completion (the retrieve thread is already waiting on its
// event) before submitting more — bounding depth exactly like the sync path's per-tick
// blocking poll, just `cap` deep instead of 1.
while self.async_rt.is_some() && self.pending.len() >= async_inflight_cap() {
let done = {
let rt = self.async_rt.as_mut().expect("checked in loop condition");
rt.done_rx
.recv_timeout(std::time::Duration::from_secs(5))
.map_err(|_| anyhow!("NVENC async retrieve stalled (5s) — encoder wedged?"))?
};
self.absorb_done(done)?;
}
let slot = self.next % POOL;
self.next += 1;
// SAFETY: every NVENC call goes through a function pointer from the loaded `ENCODE_API` table
@@ -765,6 +1022,13 @@ impl Encoder for NvencD3d11Encoder {
pictureStruct: nv::NV_ENC_PIC_STRUCT::NV_ENC_PIC_STRUCT_FRAME,
inputTimeStamp: pts,
encodePicFlags: flags as u32,
// Async mode: the event the driver signals when this encode completes (the
// retrieve thread waits on it). Null in sync mode (`events` is empty).
completionEvent: self
.events
.get(slot)
.map(|&e| e as *mut c_void)
.unwrap_or(ptr::null_mut()),
..Default::default()
};
@@ -817,6 +1081,18 @@ impl Encoder for NvencD3d11Encoder {
.map_err(|e| anyhow!("encode_picture: {e:?}"))?;
self.pending
.push_back((self.bitstreams[slot], mp.mappedResource, captured.pts_ns));
// Async: hand the in-flight encode to the retrieve thread (channel capacity = POOL ≥
// in-flight, so this send never blocks). The pending entry above pairs with its
// completion FIFO in `absorb_done`.
if let Some(rt) = &self.async_rt {
let job = RetrieveJob {
bs: self.bitstreams[slot] as usize,
event: self.events[slot],
};
if rt.work_tx.as_ref().is_none_or(|tx| tx.send(job).is_err()) {
bail!("NVENC retrieve thread gone — rebuilding the session");
}
}
}
Ok(())
}
@@ -834,6 +1110,9 @@ impl Encoder for NvencD3d11Encoder {
// Reflects what the session actually configured (cleared in `query_caps` if the GPU lacks
// YUV444 encode), so the glue can confirm 4:4:4 vs the negotiated request.
chroma_444: self.chroma_444,
// The direct-NVENC path recovers via real RFI (or a forced IDR), not the Linux
// libavcodec intra-refresh mode.
intra_refresh: false,
}
}
@@ -891,6 +1170,30 @@ impl Encoder for NvencD3d11Encoder {
}
fn poll(&mut self) -> Result<Option<EncodedFrame>> {
// Async mode: drain whatever the retrieve thread has finished (non-blocking) and hand out
// the oldest ready AU. `None` = nothing completed yet — the session loop keeps the frame
// in flight and re-polls next tick, capture never blocks on the WDDM scheduling wait.
if self.async_rt.is_some() {
loop {
let done = match self
.async_rt
.as_mut()
.expect("checked just above")
.done_rx
.try_recv()
{
Ok(d) => d,
Err(_) => break,
};
self.absorb_done(done)?;
}
return Ok(self
.async_rt
.as_mut()
.expect("checked just above")
.ready
.pop_front());
}
let Some((bs, map, pts_ns)) = self.pending.pop_front() else {
return Ok(None);
};