feat(host/windows): two-thread async NVENC retrieve (PUNKTFUNK_NVENC_ASYNC, opt-in)
The gpu-contention plan's §5.B lever: today submit and the blocking lock_bitstream share one thread, so under a GPU-saturating game the pipeline serializes on the WDDM scheduling wait (1000/17ms ≈ 59fps — the depth-1 collapse; the old 'deeper pipeline just stacks latency' result was a same-thread implementation, not a disproof). Async mode opens the session enableEncodeAsync=1, registers an auto-reset completion event per pool bitstream, and moves the wait+lock+copy+ unlock onto an internal retrieve thread feeding poll() through a channel — the exact split the NVENC guide mandates. Register/map/unmap stay on the encode thread; teardown drops the job channel, joins the thread, THEN destroys the session. In-flight depth is bounded by PUNKTFUNK_NVENC_ASYNC_DEPTH (default 4, hard cap POOL-1) — both for output-buffer reuse and because NVENC encodes the capture ring's textures in place. Idle latency cost ≈ 0 (same-tick pickup); under contention completed frames queue instead of stalling capture. CI-compile validated only — on-glass A/B under game load on the RTX box still pending (box offline). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -12,6 +12,21 @@
|
|||||||
//!
|
//!
|
||||||
//! Needs a real NVIDIA GPU at runtime (session creation fails otherwise) — compiles GPU-less, but
|
//! Needs a real NVIDIA GPU at runtime (session creation fails otherwise) — compiles GPU-less, but
|
||||||
//! `open`/`submit` only succeed on a GPU box. The software encoder (`super::sw`) is the fallback.
|
//! `open`/`submit` only succeed on a GPU box. The software encoder (`super::sw`) is the fallback.
|
||||||
|
//!
|
||||||
|
//! **Two-thread async retrieve** (`PUNKTFUNK_NVENC_ASYNC=1`, opt-in until on-glass validated —
|
||||||
|
//! gpu-contention plan §5.B): the NVENC guide mandates that the main thread only *submit*
|
||||||
|
//! (`nvEncEncodePicture`) while a **secondary thread** waits on per-buffer completion events and
|
||||||
|
//! does `nvEncLockBitstream`. Today's sync mode does both on one thread, so under a GPU-saturating
|
||||||
|
//! game the whole pipeline serializes on the WDDM scheduling wait (`1000/17ms ≈ 59 fps` — the
|
||||||
|
//! depth-1 collapse). In async mode the session is opened `enableEncodeAsync=1`, each output
|
||||||
|
//! bitstream gets a registered auto-reset event, `submit` returns immediately, and an internal
|
||||||
|
//! retrieve thread waits + locks + copies + unlocks, handing finished AUs back through a channel
|
||||||
|
//! that `poll` drains without blocking. All input-resource calls (register/map/unmap) stay on the
|
||||||
|
//! encode thread; the retrieve thread touches ONLY the event + lock/unlock — the exact split the
|
||||||
|
//! guide blesses. Backpressure: `submit` blocks on the oldest completion when `POOL - 1` encodes
|
||||||
|
//! are in flight, so an output buffer is never reused mid-encode. Latency cost when idle ≈ 0 (the
|
||||||
|
//! AU completes within the same tick and `poll` picks it up); under contention completed frames
|
||||||
|
//! queue instead of stalling capture — throughput recovers up to the scheduler-granted share.
|
||||||
|
|
||||||
// Every `unsafe` block / impl in this file carries a `// SAFETY:` proof; enforce it.
|
// Every `unsafe` block / impl in this file carries a `// SAFETY:` proof; enforce it.
|
||||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||||
@@ -22,8 +37,11 @@ use anyhow::{anyhow, bail, Context, Result};
|
|||||||
use std::collections::{HashMap, VecDeque};
|
use std::collections::{HashMap, VecDeque};
|
||||||
use std::ffi::c_void;
|
use std::ffi::c_void;
|
||||||
use std::ptr;
|
use std::ptr;
|
||||||
use windows::core::Interface;
|
use std::sync::mpsc;
|
||||||
|
use windows::core::{Interface, PCWSTR};
|
||||||
|
use windows::Win32::Foundation::{CloseHandle, HANDLE, WAIT_OBJECT_0};
|
||||||
use windows::Win32::Graphics::Direct3D11::{ID3D11Device, ID3D11Texture2D};
|
use windows::Win32::Graphics::Direct3D11::{ID3D11Device, ID3D11Texture2D};
|
||||||
|
use windows::Win32::System::Threading::{CreateEventW, WaitForSingleObject};
|
||||||
|
|
||||||
use nvidia_video_codec_sdk::sys::nvEncodeAPI as nv;
|
use nvidia_video_codec_sdk::sys::nvEncodeAPI as nv;
|
||||||
use nvidia_video_codec_sdk::ENCODE_API as API;
|
use nvidia_video_codec_sdk::ENCODE_API as API;
|
||||||
@@ -46,6 +64,110 @@ fn codec_guid(codec: Codec) -> nv::GUID {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Whether the operator asked for the two-thread async retrieve (`PUNKTFUNK_NVENC_ASYNC` truthy).
|
||||||
|
/// Combined with the GPU's `NV_ENC_CAPS_ASYNC_ENCODE_SUPPORT` in `init_session`. Opt-in until
|
||||||
|
/// on-glass validated; note an async-rejecting config surfaces as a failed session open — unset
|
||||||
|
/// the env in that case.
|
||||||
|
fn async_retrieve_requested() -> bool {
|
||||||
|
std::env::var("PUNKTFUNK_NVENC_ASYNC")
|
||||||
|
.map(|v| matches!(v.trim(), "1" | "true" | "yes" | "on"))
|
||||||
|
.unwrap_or(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Max encodes in flight in async mode (`PUNKTFUNK_NVENC_ASYNC_DEPTH`, default 4, clamped
|
||||||
|
/// `2..=POOL-1`). Two independent ceilings meet here: the output-bitstream pool (hard, `POOL-1` —
|
||||||
|
/// a buffer must never be reused mid-encode) and the capturer's texture ring (soft — NVENC encodes
|
||||||
|
/// the ring textures in place, so in-flight depth beyond the ring lets the capturer overwrite a
|
||||||
|
/// frame mid-encode: visual corruption, not UB). IDD-push rings are sized around
|
||||||
|
/// `PUNKTFUNK_IDD_DEPTH`; raise both together if deeper pipelining is needed.
|
||||||
|
fn async_inflight_cap() -> usize {
|
||||||
|
std::env::var("PUNKTFUNK_NVENC_ASYNC_DEPTH")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.parse::<usize>().ok())
|
||||||
|
.unwrap_or(4)
|
||||||
|
.clamp(2, POOL - 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One in-flight encode handed to the retrieve thread: the output bitstream to lock once its
|
||||||
|
/// completion `event` signals. Raw pointers travel as `usize` (the addresses are process-global
|
||||||
|
/// driver handles; the thread is joined before the session they belong to is destroyed).
|
||||||
|
struct RetrieveJob {
|
||||||
|
bs: usize,
|
||||||
|
event: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A finished retrieve: the locked-and-copied AU (or the retrieve-side error) for the oldest
|
||||||
|
/// in-flight bitstream. `bs` lets the encode thread cross-check FIFO pairing with `pending`.
|
||||||
|
struct RetrieveDone {
|
||||||
|
bs: usize,
|
||||||
|
result: std::result::Result<(Vec<u8>, bool), String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The async-retrieve runtime: the job channel feeding the retrieve thread, the completion channel
|
||||||
|
/// back, the thread handle (joined in `teardown` BEFORE the session is destroyed), and AUs already
|
||||||
|
/// absorbed by backpressure that `poll` hands out first.
|
||||||
|
struct AsyncRetrieve {
|
||||||
|
work_tx: Option<mpsc::SyncSender<RetrieveJob>>,
|
||||||
|
done_rx: mpsc::Receiver<RetrieveDone>,
|
||||||
|
join: Option<std::thread::JoinHandle<()>>,
|
||||||
|
ready: VecDeque<EncodedFrame>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The retrieve-thread body (gpu-contention plan §5.B): for each submitted frame, wait on its
|
||||||
|
/// completion event, lock the bitstream, copy the AU out, unlock, and send it back. Exits when the
|
||||||
|
/// job channel closes (teardown drops the sender and joins BEFORE destroying the session, so
|
||||||
|
/// `enc`/`bs`/`event` outlive every use here). Touches ONLY the event wait + lock/unlock — the
|
||||||
|
/// NVENC threading model's sanctioned secondary-thread surface.
|
||||||
|
fn retrieve_loop(
|
||||||
|
enc: usize,
|
||||||
|
work_rx: mpsc::Receiver<RetrieveJob>,
|
||||||
|
done_tx: mpsc::Sender<RetrieveDone>,
|
||||||
|
) {
|
||||||
|
crate::punktfunk1::boost_thread_priority(false);
|
||||||
|
while let Ok(job) = work_rx.recv() {
|
||||||
|
// SAFETY: `job.event` is one of the auto-reset events `init_session` created and
|
||||||
|
// registered for exactly this session, and `job.bs` one of its pool bitstreams; both stay
|
||||||
|
// valid until `teardown`, which joins this thread first. `WaitForSingleObject` takes the
|
||||||
|
// handle by value. On WAIT_OBJECT_0 the driver has completed the encode into `job.bs`, so
|
||||||
|
// `lock_bitstream` (version set, struct a live stack local for the synchronous call)
|
||||||
|
// yields a CPU-readable `bitstreamBufferPtr`/`bitstreamSizeInBytes` valid until
|
||||||
|
// `unlock_bitstream`; the slice is copied (`to_vec`) before the unlock on the same buffer.
|
||||||
|
// Lock/unlock from a secondary thread while the encode thread submits is the NVENC
|
||||||
|
// guide's documented threading model.
|
||||||
|
let result = unsafe {
|
||||||
|
if WaitForSingleObject(HANDLE(job.event as *mut c_void), 5000) != WAIT_OBJECT_0 {
|
||||||
|
Err("NVENC completion event timeout (5s) — encoder wedged?".to_string())
|
||||||
|
} else {
|
||||||
|
let mut lock = nv::NV_ENC_LOCK_BITSTREAM {
|
||||||
|
version: nv::NV_ENC_LOCK_BITSTREAM_VER,
|
||||||
|
outputBitstream: job.bs as *mut c_void,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
match (API.lock_bitstream)(enc as *mut c_void, &mut lock).result_without_string() {
|
||||||
|
Ok(()) => {
|
||||||
|
let data = std::slice::from_raw_parts(
|
||||||
|
lock.bitstreamBufferPtr as *const u8,
|
||||||
|
lock.bitstreamSizeInBytes as usize,
|
||||||
|
)
|
||||||
|
.to_vec();
|
||||||
|
let keyframe = matches!(
|
||||||
|
lock.pictureType,
|
||||||
|
nv::NV_ENC_PIC_TYPE::NV_ENC_PIC_TYPE_IDR
|
||||||
|
| nv::NV_ENC_PIC_TYPE::NV_ENC_PIC_TYPE_I
|
||||||
|
);
|
||||||
|
let _ = (API.unlock_bitstream)(enc as *mut c_void, job.bs as *mut c_void);
|
||||||
|
Ok((data, keyframe))
|
||||||
|
}
|
||||||
|
Err(e) => Err(format!("lock_bitstream (async): {e:?}")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if done_tx.send(RetrieveDone { bs: job.bs, result }).is_err() {
|
||||||
|
break; // encoder side gone (teardown drains us via join)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct NvencD3d11Encoder {
|
pub struct NvencD3d11Encoder {
|
||||||
encoder: *mut c_void,
|
encoder: *mut c_void,
|
||||||
codec: Codec,
|
codec: Codec,
|
||||||
@@ -81,6 +203,13 @@ pub struct NvencD3d11Encoder {
|
|||||||
regs: HashMap<isize, (nv::NV_ENC_REGISTERED_PTR, ID3D11Texture2D)>,
|
regs: HashMap<isize, (nv::NV_ENC_REGISTERED_PTR, ID3D11Texture2D)>,
|
||||||
next: usize,
|
next: usize,
|
||||||
bitstreams: Vec<nv::NV_ENC_OUTPUT_PTR>,
|
bitstreams: Vec<nv::NV_ENC_OUTPUT_PTR>,
|
||||||
|
/// Async mode: the registered completion event per pool bitstream (raw `HANDLE` as `usize`,
|
||||||
|
/// parallel to `bitstreams`); empty in sync mode. Unregistered + closed in `teardown`.
|
||||||
|
events: Vec<usize>,
|
||||||
|
/// Async mode: the retrieve thread + its channels (`None` = classic same-thread sync retrieve).
|
||||||
|
async_rt: Option<AsyncRetrieve>,
|
||||||
|
/// `NV_ENC_CAPS_ASYNC_ENCODE_SUPPORT` from the caps probe — gates the async retrieve mode.
|
||||||
|
async_supported: bool,
|
||||||
/// (bitstream, mapped input resource to unmap after retrieval, pts_ns) per in-flight encode.
|
/// (bitstream, mapped input resource to unmap after retrieval, pts_ns) per in-flight encode.
|
||||||
pending: VecDeque<(nv::NV_ENC_OUTPUT_PTR, nv::NV_ENC_INPUT_PTR, u64)>,
|
pending: VecDeque<(nv::NV_ENC_OUTPUT_PTR, nv::NV_ENC_INPUT_PTR, u64)>,
|
||||||
frame_idx: i64,
|
frame_idx: i64,
|
||||||
@@ -102,13 +231,16 @@ pub struct NvencD3d11Encoder {
|
|||||||
|
|
||||||
// SAFETY: the `!Send` fields are the raw NVENC session/device handles (`encoder`, `init_device`),
|
// SAFETY: the `!Send` fields are the raw NVENC session/device handles (`encoder`, `init_device`),
|
||||||
// the raw NVENC bitstream/registered/mapped pointers carried in `bitstreams`/`regs`/`pending`, and
|
// the raw NVENC bitstream/registered/mapped pointers carried in `bitstreams`/`regs`/`pending`, and
|
||||||
// the `ID3D11Texture2D` COM refs — none of which may be touched concurrently from two threads. This
|
// the `ID3D11Texture2D` COM refs — none of which may be touched concurrently from two threads
|
||||||
// encoder is owned by exactly one thread: it is moved onto the host encode thread once at
|
// EXCEPT along the NVENC guide's sanctioned split. The encoder object is owned by exactly one
|
||||||
// construction, and every NVENC call and D3D11 access happens only from that thread thereafter
|
// thread: it is moved onto the host encode thread once at construction, and every method
|
||||||
// (`submit`/`poll`/`invalidate_ref_frames`/`Drop` all run there, like the Linux encoder). Moving the
|
// (`submit`/`poll`/`invalidate_ref_frames`/`Drop`) runs there. In async mode the internal retrieve
|
||||||
// handles across that single ownership-transfer boundary is sound because no NVENC/D3D11 call is in
|
// thread additionally calls `WaitForSingleObject`/`lock_bitstream`/`unlock_bitstream` on the same
|
||||||
// flight during the move and the session and its D3D11 immediate context are never shared (`&`) or
|
// session — the exact two-thread model the NVENC API documents as thread-safe (submit-side vs
|
||||||
// used concurrently — so `Send` introduces no data race on the non-`Send` fields.
|
// output-side); it never touches registrations, mappings, or D3D11. `teardown` joins that thread
|
||||||
|
// BEFORE destroying the session, so no retrieve call can outlive the handles. Moving the encoder
|
||||||
|
// across its single ownership-transfer boundary is sound because no NVENC/D3D11 call is in flight
|
||||||
|
// during the move — so `Send` introduces no data race on the non-`Send` fields.
|
||||||
unsafe impl Send for NvencD3d11Encoder {}
|
unsafe impl Send for NvencD3d11Encoder {}
|
||||||
|
|
||||||
impl NvencD3d11Encoder {
|
impl NvencD3d11Encoder {
|
||||||
@@ -141,6 +273,9 @@ impl NvencD3d11Encoder {
|
|||||||
regs: HashMap::new(),
|
regs: HashMap::new(),
|
||||||
next: 0,
|
next: 0,
|
||||||
bitstreams: Vec::new(),
|
bitstreams: Vec::new(),
|
||||||
|
events: Vec::new(),
|
||||||
|
async_rt: None,
|
||||||
|
async_supported: false,
|
||||||
pending: VecDeque::new(),
|
pending: VecDeque::new(),
|
||||||
frame_idx: 0,
|
frame_idx: 0,
|
||||||
force_kf: false,
|
force_kf: false,
|
||||||
@@ -158,6 +293,19 @@ impl NvencD3d11Encoder {
|
|||||||
if self.encoder.is_null() {
|
if self.encoder.is_null() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// Async mode: retire the retrieve thread FIRST — drop the job sender so it finishes every
|
||||||
|
// queued job (each references the still-live session) and exits, then join. Only after the
|
||||||
|
// join is it sound to unmap/destroy anything the thread might have been touching.
|
||||||
|
if let Some(mut rt) = self.async_rt.take() {
|
||||||
|
drop(rt.work_tx.take());
|
||||||
|
if let Some(j) = rt.join.take() {
|
||||||
|
let _ = j.join();
|
||||||
|
}
|
||||||
|
// Completions the thread produced that poll() never absorbed — their AUs are dropped
|
||||||
|
// (the session is going away), but the FIFO pairing stands, so nothing extra to do
|
||||||
|
// beyond the pending unmap below.
|
||||||
|
while rt.done_rx.try_recv().is_ok() {}
|
||||||
|
}
|
||||||
// Unmap any in-flight inputs, then unregister every cached texture and destroy the bitstreams.
|
// Unmap any in-flight inputs, then unregister every cached texture and destroy the bitstreams.
|
||||||
for (_, map, _) in &self.pending {
|
for (_, map, _) in &self.pending {
|
||||||
if !map.is_null() {
|
if !map.is_null() {
|
||||||
@@ -167,6 +315,17 @@ impl NvencD3d11Encoder {
|
|||||||
for (reg, _tex) in self.regs.values() {
|
for (reg, _tex) in self.regs.values() {
|
||||||
let _ = (API.unregister_resource)(self.encoder, *reg);
|
let _ = (API.unregister_resource)(self.encoder, *reg);
|
||||||
}
|
}
|
||||||
|
// Async events: unregister from the session, then close the Win32 handles.
|
||||||
|
for &ev in &self.events {
|
||||||
|
let mut ep = nv::NV_ENC_EVENT_PARAMS {
|
||||||
|
version: nv::NV_ENC_EVENT_PARAMS_VER,
|
||||||
|
completionEvent: ev as *mut c_void,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let _ = (API.unregister_async_event)(self.encoder, &mut ep);
|
||||||
|
let _ = CloseHandle(HANDLE(ev as *mut c_void));
|
||||||
|
}
|
||||||
|
self.events.clear();
|
||||||
for &bs in &self.bitstreams {
|
for &bs in &self.bitstreams {
|
||||||
let _ = (API.destroy_bitstream_buffer)(self.encoder, bs);
|
let _ = (API.destroy_bitstream_buffer)(self.encoder, bs);
|
||||||
}
|
}
|
||||||
@@ -232,6 +391,7 @@ impl NvencD3d11Encoder {
|
|||||||
enc,
|
enc,
|
||||||
nv::NV_ENC_CAPS::NV_ENC_CAPS_SUPPORT_CUSTOM_VBV_BUF_SIZE,
|
nv::NV_ENC_CAPS::NV_ENC_CAPS_SUPPORT_CUSTOM_VBV_BUF_SIZE,
|
||||||
);
|
);
|
||||||
|
let async_enc = self.get_cap(enc, nv::NV_ENC_CAPS::NV_ENC_CAPS_ASYNC_ENCODE_SUPPORT);
|
||||||
let _ = (API.destroy_encoder)(enc);
|
let _ = (API.destroy_encoder)(enc);
|
||||||
|
|
||||||
// Reject an over-range mode with a clear message instead of an opaque InvalidParam.
|
// Reject an over-range mode with a clear message instead of an opaque InvalidParam.
|
||||||
@@ -259,9 +419,11 @@ impl NvencD3d11Encoder {
|
|||||||
}
|
}
|
||||||
self.rfi_supported = rfi != 0;
|
self.rfi_supported = rfi != 0;
|
||||||
self.custom_vbv = custom_vbv != 0;
|
self.custom_vbv = custom_vbv != 0;
|
||||||
|
self.async_supported = async_enc != 0;
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
rfi = self.rfi_supported,
|
rfi = self.rfi_supported,
|
||||||
custom_vbv = self.custom_vbv,
|
custom_vbv = self.custom_vbv,
|
||||||
|
async_encode = self.async_supported,
|
||||||
max = %format!("{wmax}x{hmax}"),
|
max = %format!("{wmax}x{hmax}"),
|
||||||
ten_bit = ten_bit != 0,
|
ten_bit = ten_bit != 0,
|
||||||
"NVENC capabilities probed"
|
"NVENC capabilities probed"
|
||||||
@@ -277,6 +439,7 @@ impl NvencD3d11Encoder {
|
|||||||
device: &ID3D11Device,
|
device: &ID3D11Device,
|
||||||
bitrate: u64,
|
bitrate: u64,
|
||||||
split_mode: u32,
|
split_mode: u32,
|
||||||
|
enable_async: bool,
|
||||||
) -> Result<*mut c_void> {
|
) -> Result<*mut c_void> {
|
||||||
let mut params = nv::NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS {
|
let mut params = nv::NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS {
|
||||||
version: nv::NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS_VER,
|
version: nv::NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS_VER,
|
||||||
@@ -441,6 +604,9 @@ impl NvencD3d11Encoder {
|
|||||||
frameRateNum: self.fps,
|
frameRateNum: self.fps,
|
||||||
frameRateDen: 1,
|
frameRateDen: 1,
|
||||||
enablePTD: 1,
|
enablePTD: 1,
|
||||||
|
// Two-thread async retrieve (§5.B): completion events signal the retrieve thread
|
||||||
|
// instead of `lock_bitstream` blocking the submit thread.
|
||||||
|
enableEncodeAsync: enable_async as u32,
|
||||||
encodeConfig: &mut cfg,
|
encodeConfig: &mut cfg,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
@@ -525,7 +691,12 @@ impl NvencD3d11Encoder {
|
|||||||
// to THAT (don't undershoot — the old ×¾ step-down landed well below the real ceiling).
|
// to THAT (don't undershoot — the old ×¾ step-down landed well below the real ceiling).
|
||||||
const CLAMP_TOL_BPS: u64 = 20_000_000; // stop bisecting within ~20 Mbps of the ceiling
|
const CLAMP_TOL_BPS: u64 = 20_000_000; // stop bisecting within ~20 Mbps of the ceiling
|
||||||
|
|
||||||
let mut probe = self.try_open_session(device, requested_bps, split_mode);
|
// Two-thread async retrieve: operator opt-in AND the GPU reports async-encode support
|
||||||
|
// (query_caps above). Threaded into every session-open probe so the chosen session is
|
||||||
|
// built in the right mode from the start.
|
||||||
|
let use_async = self.async_supported && async_retrieve_requested();
|
||||||
|
|
||||||
|
let mut probe = self.try_open_session(device, requested_bps, split_mode, use_async);
|
||||||
// Disambiguate a forced-split rejection from a bitrate-cap rejection: retry once at the
|
// Disambiguate a forced-split rejection from a bitrate-cap rejection: retry once at the
|
||||||
// requested rate with split disabled — if THAT succeeds, split was the problem, not bitrate.
|
// requested rate with split disabled — if THAT succeeds, split was the problem, not bitrate.
|
||||||
let split_forced = split_mode
|
let split_forced = split_mode
|
||||||
@@ -533,7 +704,7 @@ impl NvencD3d11Encoder {
|
|||||||
&& split_mode != nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32;
|
&& split_mode != nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32;
|
||||||
if probe.is_err() && split_forced {
|
if probe.is_err() && split_forced {
|
||||||
let no_split = nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32;
|
let no_split = nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32;
|
||||||
if let Ok(e) = self.try_open_session(device, requested_bps, no_split) {
|
if let Ok(e) = self.try_open_session(device, requested_bps, no_split, use_async) {
|
||||||
tracing::warn!("NVENC: split-encode rejected by codec/config — disabled");
|
tracing::warn!("NVENC: split-encode rejected by codec/config — disabled");
|
||||||
split_mode = no_split;
|
split_mode = no_split;
|
||||||
probe = Ok(e);
|
probe = Ok(e);
|
||||||
@@ -555,7 +726,7 @@ impl NvencD3d11Encoder {
|
|||||||
let mut best_bps = 0u64;
|
let mut best_bps = 0u64;
|
||||||
while hi > lo + CLAMP_TOL_BPS {
|
while hi > lo + CLAMP_TOL_BPS {
|
||||||
let mid = lo + (hi - lo) / 2;
|
let mid = lo + (hi - lo) / 2;
|
||||||
match self.try_open_session(device, mid, split_mode) {
|
match self.try_open_session(device, mid, split_mode, use_async) {
|
||||||
Ok(e) => {
|
Ok(e) => {
|
||||||
if !best.is_null() {
|
if !best.is_null() {
|
||||||
let _ = (API.destroy_encoder)(best);
|
let _ = (API.destroy_encoder)(best);
|
||||||
@@ -573,8 +744,10 @@ impl NvencD3d11Encoder {
|
|||||||
let no_split =
|
let no_split =
|
||||||
nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32;
|
nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32;
|
||||||
best = self
|
best = self
|
||||||
.try_open_session(device, FLOOR_BPS, split_mode)
|
.try_open_session(device, FLOOR_BPS, split_mode, use_async)
|
||||||
.or_else(|_| self.try_open_session(device, FLOOR_BPS, no_split))
|
.or_else(|_| {
|
||||||
|
self.try_open_session(device, FLOOR_BPS, no_split, use_async)
|
||||||
|
})
|
||||||
.context(
|
.context(
|
||||||
"NVENC initialize_encoder rejected even at the floor bitrate",
|
"NVENC initialize_encoder rejected even at the floor bitrate",
|
||||||
)?;
|
)?;
|
||||||
@@ -610,6 +783,42 @@ impl NvencD3d11Encoder {
|
|||||||
.map_err(|e| anyhow!("create_bitstream_buffer: {e:?}"))?;
|
.map_err(|e| anyhow!("create_bitstream_buffer: {e:?}"))?;
|
||||||
self.bitstreams.push(cb.bitstreamBuffer);
|
self.bitstreams.push(cb.bitstreamBuffer);
|
||||||
}
|
}
|
||||||
|
// Async retrieve: one auto-reset completion event per pool bitstream, registered with
|
||||||
|
// the session, plus the retrieve thread the events signal. The thread only ever sees
|
||||||
|
// raw addresses; `teardown` joins it before any of them die.
|
||||||
|
if use_async {
|
||||||
|
for _ in 0..POOL {
|
||||||
|
let ev = CreateEventW(None, false, false, PCWSTR::null())
|
||||||
|
.context("CreateEvent (NVENC completion)")?;
|
||||||
|
let mut ep = nv::NV_ENC_EVENT_PARAMS {
|
||||||
|
version: nv::NV_ENC_EVENT_PARAMS_VER,
|
||||||
|
completionEvent: ev.0,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
(API.register_async_event)(enc, &mut ep)
|
||||||
|
.result_without_string()
|
||||||
|
.map_err(|e| anyhow!("register_async_event: {e:?}"))?;
|
||||||
|
self.events.push(ev.0 as usize);
|
||||||
|
}
|
||||||
|
let (work_tx, work_rx) = mpsc::sync_channel::<RetrieveJob>(POOL);
|
||||||
|
let (done_tx, done_rx) = mpsc::channel::<RetrieveDone>();
|
||||||
|
let enc_addr = enc as usize;
|
||||||
|
let join = std::thread::Builder::new()
|
||||||
|
.name("punktfunk-nvenc-out".into())
|
||||||
|
.spawn(move || retrieve_loop(enc_addr, work_rx, done_tx))
|
||||||
|
.context("spawn NVENC retrieve thread")?;
|
||||||
|
self.async_rt = Some(AsyncRetrieve {
|
||||||
|
work_tx: Some(work_tx),
|
||||||
|
done_rx,
|
||||||
|
join: Some(join),
|
||||||
|
ready: VecDeque::new(),
|
||||||
|
});
|
||||||
|
tracing::info!(
|
||||||
|
pool = POOL,
|
||||||
|
"NVENC async retrieve active (two-thread encode: submit here, \
|
||||||
|
lock_bitstream on the retrieve thread)"
|
||||||
|
);
|
||||||
|
}
|
||||||
self.inited = true;
|
self.inited = true;
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"NVENC D3D11 session: {}x{}@{} {}-bit{} {} Mbps {:?}",
|
"NVENC D3D11 session: {}x{}@{} {}-bit{} {} Mbps {:?}",
|
||||||
@@ -624,6 +833,40 @@ impl NvencD3d11Encoder {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fold one retrieve-thread completion back into encoder state ON THE ENCODE THREAD: pop the
|
||||||
|
/// oldest `pending` entry (completions are FIFO — one retrieve thread, in-order jobs), verify
|
||||||
|
/// the bitstream pairing, unmap the input resource, and queue the AU for `poll`. A retrieve
|
||||||
|
/// error surfaces AFTER the unmap (the resource is retired either way) so the session glue's
|
||||||
|
/// rebuild path starts from clean state.
|
||||||
|
fn absorb_done(&mut self, done: RetrieveDone) -> Result<()> {
|
||||||
|
let Some((bs, map, pts_ns)) = self.pending.pop_front() else {
|
||||||
|
bail!("NVENC async: completion with no in-flight frame (pairing bug)");
|
||||||
|
};
|
||||||
|
if bs as usize != done.bs {
|
||||||
|
bail!("NVENC async: completion out of order (pairing bug)");
|
||||||
|
}
|
||||||
|
// SAFETY: `map` is the mapped input `submit` recorded for exactly this now-completed
|
||||||
|
// encode; the session is live (`async_rt` exists only between `init_session` and
|
||||||
|
// `teardown`) and this runs on the encode thread — the single unmap here mirrors the sync
|
||||||
|
// path's poll-side unmap, exactly once per mapping.
|
||||||
|
unsafe {
|
||||||
|
if !map.is_null() {
|
||||||
|
let _ = (API.unmap_input_resource)(self.encoder, map);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let (data, keyframe) = done.result.map_err(|e| anyhow!("{e}"))?;
|
||||||
|
self.async_rt
|
||||||
|
.as_mut()
|
||||||
|
.expect("absorb_done is only reachable in async mode")
|
||||||
|
.ready
|
||||||
|
.push_back(EncodedFrame {
|
||||||
|
data,
|
||||||
|
pts_ns,
|
||||||
|
keyframe,
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Encoder for NvencD3d11Encoder {
|
impl Encoder for NvencD3d11Encoder {
|
||||||
@@ -694,6 +937,20 @@ impl Encoder for NvencD3d11Encoder {
|
|||||||
self.init_session(&device)?;
|
self.init_session(&device)?;
|
||||||
self.init_device = dev_raw;
|
self.init_device = dev_raw;
|
||||||
}
|
}
|
||||||
|
// Async backpressure: never hand NVENC an output bitstream that is still in flight, and
|
||||||
|
// keep in-flight depth within the capturer's texture ring (see `async_inflight_cap`). At
|
||||||
|
// the cap, block on the OLDEST completion (the retrieve thread is already waiting on its
|
||||||
|
// event) before submitting more — bounding depth exactly like the sync path's per-tick
|
||||||
|
// blocking poll, just `cap` deep instead of 1.
|
||||||
|
while self.async_rt.is_some() && self.pending.len() >= async_inflight_cap() {
|
||||||
|
let done = {
|
||||||
|
let rt = self.async_rt.as_mut().expect("checked in loop condition");
|
||||||
|
rt.done_rx
|
||||||
|
.recv_timeout(std::time::Duration::from_secs(5))
|
||||||
|
.map_err(|_| anyhow!("NVENC async retrieve stalled (5s) — encoder wedged?"))?
|
||||||
|
};
|
||||||
|
self.absorb_done(done)?;
|
||||||
|
}
|
||||||
let slot = self.next % POOL;
|
let slot = self.next % POOL;
|
||||||
self.next += 1;
|
self.next += 1;
|
||||||
// SAFETY: every NVENC call goes through a function pointer from the loaded `ENCODE_API` table
|
// SAFETY: every NVENC call goes through a function pointer from the loaded `ENCODE_API` table
|
||||||
@@ -765,6 +1022,13 @@ impl Encoder for NvencD3d11Encoder {
|
|||||||
pictureStruct: nv::NV_ENC_PIC_STRUCT::NV_ENC_PIC_STRUCT_FRAME,
|
pictureStruct: nv::NV_ENC_PIC_STRUCT::NV_ENC_PIC_STRUCT_FRAME,
|
||||||
inputTimeStamp: pts,
|
inputTimeStamp: pts,
|
||||||
encodePicFlags: flags as u32,
|
encodePicFlags: flags as u32,
|
||||||
|
// Async mode: the event the driver signals when this encode completes (the
|
||||||
|
// retrieve thread waits on it). Null in sync mode (`events` is empty).
|
||||||
|
completionEvent: self
|
||||||
|
.events
|
||||||
|
.get(slot)
|
||||||
|
.map(|&e| e as *mut c_void)
|
||||||
|
.unwrap_or(ptr::null_mut()),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -817,6 +1081,18 @@ impl Encoder for NvencD3d11Encoder {
|
|||||||
.map_err(|e| anyhow!("encode_picture: {e:?}"))?;
|
.map_err(|e| anyhow!("encode_picture: {e:?}"))?;
|
||||||
self.pending
|
self.pending
|
||||||
.push_back((self.bitstreams[slot], mp.mappedResource, captured.pts_ns));
|
.push_back((self.bitstreams[slot], mp.mappedResource, captured.pts_ns));
|
||||||
|
// Async: hand the in-flight encode to the retrieve thread (channel capacity = POOL ≥
|
||||||
|
// in-flight, so this send never blocks). The pending entry above pairs with its
|
||||||
|
// completion FIFO in `absorb_done`.
|
||||||
|
if let Some(rt) = &self.async_rt {
|
||||||
|
let job = RetrieveJob {
|
||||||
|
bs: self.bitstreams[slot] as usize,
|
||||||
|
event: self.events[slot],
|
||||||
|
};
|
||||||
|
if rt.work_tx.as_ref().is_none_or(|tx| tx.send(job).is_err()) {
|
||||||
|
bail!("NVENC retrieve thread gone — rebuilding the session");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -834,6 +1110,9 @@ impl Encoder for NvencD3d11Encoder {
|
|||||||
// Reflects what the session actually configured (cleared in `query_caps` if the GPU lacks
|
// Reflects what the session actually configured (cleared in `query_caps` if the GPU lacks
|
||||||
// YUV444 encode), so the glue can confirm 4:4:4 vs the negotiated request.
|
// YUV444 encode), so the glue can confirm 4:4:4 vs the negotiated request.
|
||||||
chroma_444: self.chroma_444,
|
chroma_444: self.chroma_444,
|
||||||
|
// The direct-NVENC path recovers via real RFI (or a forced IDR), not the Linux
|
||||||
|
// libavcodec intra-refresh mode.
|
||||||
|
intra_refresh: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -891,6 +1170,30 @@ impl Encoder for NvencD3d11Encoder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn poll(&mut self) -> Result<Option<EncodedFrame>> {
|
fn poll(&mut self) -> Result<Option<EncodedFrame>> {
|
||||||
|
// Async mode: drain whatever the retrieve thread has finished (non-blocking) and hand out
|
||||||
|
// the oldest ready AU. `None` = nothing completed yet — the session loop keeps the frame
|
||||||
|
// in flight and re-polls next tick, capture never blocks on the WDDM scheduling wait.
|
||||||
|
if self.async_rt.is_some() {
|
||||||
|
loop {
|
||||||
|
let done = match self
|
||||||
|
.async_rt
|
||||||
|
.as_mut()
|
||||||
|
.expect("checked just above")
|
||||||
|
.done_rx
|
||||||
|
.try_recv()
|
||||||
|
{
|
||||||
|
Ok(d) => d,
|
||||||
|
Err(_) => break,
|
||||||
|
};
|
||||||
|
self.absorb_done(done)?;
|
||||||
|
}
|
||||||
|
return Ok(self
|
||||||
|
.async_rt
|
||||||
|
.as_mut()
|
||||||
|
.expect("checked just above")
|
||||||
|
.ready
|
||||||
|
.pop_front());
|
||||||
|
}
|
||||||
let Some((bs, map, pts_ns)) = self.pending.pop_front() else {
|
let Some((bs, map, pts_ns)) = self.pending.pop_front() else {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user