feat(host/windows): two-process secure-desktop step 4 — spawn helper + relay AUs
The SYSTEM host now sources the normal-desktop video from a user-session WGC helper instead of capturing in-process (WGC won't activate as SYSTEM). New `capture/wgc_relay.rs`: `HelperRelay::spawn` launches `m3-host wgc-helper` in the interactive user session via CreateProcessAsUserW (WTSQueryUserToken → DuplicateTokenEx(TokenPrimary) → lpDesktop="winsta0\\default", CREATE_NO_WINDOW) with three anonymous pipes — stdout (framed Annex-B AUs → parsed back to RelayAu), stdin (control: force-keyframe), stderr (helper logs → host tracing). The host holds the SudoVDA keepalive (sole isolation/topology owner); the helper captures by GDI name only. m3.rs: `virtual_stream` dispatches to the new `virtual_stream_relay` when `should_use_helper()` (running as SYSTEM, or PUNKTFUNK_FORCE_HELPER; disable with PUNKTFUNK_NO_HELPER). The relay loop feeds the existing send thread — same FEC/seal/paced-send path. Reconfigure rebuilds the output + re-spawns the helper; keyframe requests forward over the control pipe; helper pts_ns (same-machine monotonic clock) is used directly as capture_ns. Disconnect ends the stream (step 6 adds the relaunch watchdog). wgc_helper.rs: reads the stdin control byte to request an IDR; --bit-depth flag threaded through so SDR 10-bit (Main10) negotiation reaches the helper's encoder. cfg-gated windows-only; Linux/macOS build unaffected. Step 5 (DesktopWatcher mux to host DDA on the Winlogon secure desktop) is next. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -136,6 +136,11 @@ windows = { version = "0.62", features = [
|
||||
"Win32_System_WinRT_Graphics_Capture",
|
||||
# WGC runs under SYSTEM via interactive-user impersonation (WGC won't activate as SYSTEM).
|
||||
"Win32_System_RemoteDesktop",
|
||||
# Two-process secure-desktop design: the SYSTEM host spawns the WGC helper in the interactive
|
||||
# user session (CreateProcessAsUserW) with stdout/stdin redirected to anonymous pipes.
|
||||
"Win32_System_Threading",
|
||||
"Win32_System_Pipes",
|
||||
"Win32_System_Environment",
|
||||
] }
|
||||
# Software H.264 encoder (GPU-less path + NVENC fallback). The default `source` feature statically
|
||||
# compiles OpenH264 (BSD-2) — no system lib, builds on MSVC; nasm on PATH adds the SIMD fast path.
|
||||
|
||||
@@ -328,3 +328,5 @@ pub mod dxgi;
|
||||
mod linux;
|
||||
#[cfg(target_os = "windows")]
|
||||
pub mod wgc;
|
||||
#[cfg(target_os = "windows")]
|
||||
pub mod wgc_relay;
|
||||
|
||||
@@ -0,0 +1,389 @@
|
||||
//! Host-side WGC helper relay (Windows two-process secure-desktop design,
|
||||
//! docs/windows-secure-desktop.md — step 4).
|
||||
//!
|
||||
//! WGC won't activate under the SYSTEM account, so the SYSTEM host can't capture the normal desktop
|
||||
//! itself. Instead it spawns `m3-host wgc-helper` in the **interactive user session** (so WGC works)
|
||||
//! via `CreateProcessAsUserW`, with the helper's **stdout** redirected to an anonymous pipe the host
|
||||
//! reads. The helper ships framed Annex-B access units; this module parses them back into AUs the
|
||||
//! host relays onto the live QUIC session (same `EncodedFrame` flow, just sourced over a pipe instead
|
||||
//! of a local encoder). A second pipe carries a tiny **control** channel to the helper (stdin: force
|
||||
//! keyframe), and the helper's **stderr** is forwarded line-by-line into host tracing so its logs are
|
||||
//! visible from the SYSTEM host's console.
|
||||
//!
|
||||
//! Wire framing (must match `wgc_helper::write_au`): per AU
|
||||
//! `[u32 magic "PFAU" LE][u32 len LE][u64 pts_ns LE][u8 keyframe][len bytes data]`.
|
||||
|
||||
use crate::capture::dxgi::WinCaptureTarget;
|
||||
use anyhow::{bail, Context, Result};
|
||||
use std::io::{BufRead, BufReader, Read, Write};
|
||||
use std::sync::mpsc::{Receiver, SyncSender};
|
||||
use std::sync::Mutex;
|
||||
use windows::core::PWSTR;
|
||||
use windows::Win32::Foundation::SetHandleInformation;
|
||||
use windows::Win32::Foundation::{CloseHandle, HANDLE};
|
||||
use windows::Win32::Foundation::{HANDLE_FLAGS, HANDLE_FLAG_INHERIT};
|
||||
use windows::Win32::Security::{
|
||||
DuplicateTokenEx, SecurityImpersonation, TokenPrimary, SECURITY_ATTRIBUTES, TOKEN_ALL_ACCESS,
|
||||
};
|
||||
use windows::Win32::System::Environment::{CreateEnvironmentBlock, DestroyEnvironmentBlock};
|
||||
use windows::Win32::System::Pipes::CreatePipe;
|
||||
use windows::Win32::System::RemoteDesktop::{WTSGetActiveConsoleSessionId, WTSQueryUserToken};
|
||||
use windows::Win32::System::Threading::{
|
||||
CreateProcessAsUserW, TerminateProcess, CREATE_NO_WINDOW, CREATE_UNICODE_ENVIRONMENT,
|
||||
PROCESS_INFORMATION, STARTF_USESTDHANDLES, STARTUPINFOW,
|
||||
};
|
||||
|
||||
/// Must match [`crate::wgc_helper`]'s `AU_MAGIC` ("PFAU").
|
||||
const AU_MAGIC: u32 = 0x5046_4155;
|
||||
|
||||
/// One access unit relayed from the helper, in the helper's (= the host's, same machine) monotonic
|
||||
/// clock — `pts_ns` is directly comparable to the host's `now_ns()`.
|
||||
pub struct RelayAu {
|
||||
pub data: Vec<u8>,
|
||||
pub pts_ns: u64,
|
||||
pub keyframe: bool,
|
||||
}
|
||||
|
||||
/// A running USER-session WGC helper whose AUs the SYSTEM host relays. Drop kills the child + closes
|
||||
/// the pipes; the reader threads then end on the broken pipe.
|
||||
pub struct HelperRelay {
|
||||
proc: HANDLE,
|
||||
thread: HANDLE,
|
||||
/// Host write end of the helper's stdin — control commands (force keyframe). Mutex so the relay
|
||||
/// can be shared while the encode thread requests keyframes.
|
||||
stdin_w: Mutex<HANDLE>,
|
||||
/// Parsed AUs from the helper's stdout reader thread.
|
||||
rx: Receiver<RelayAu>,
|
||||
}
|
||||
|
||||
// HANDLEs are just kernel handle values; we own them for the relay's lifetime and close them on Drop.
|
||||
unsafe impl Send for HelperRelay {}
|
||||
unsafe impl Sync for HelperRelay {}
|
||||
|
||||
/// Control byte on the helper's stdin: force the next encoded frame to be an IDR (client decode
|
||||
/// recovery). Mirrors `enc.request_keyframe()` in the single-process path.
|
||||
const CTL_KEYFRAME: u8 = 0x01;
|
||||
|
||||
impl HelperRelay {
|
||||
/// Spawn the helper in the interactive user session and start relaying its AUs. `target` is the
|
||||
/// SudoVDA output the host already created (captured by GDI name only — the helper never touches
|
||||
/// display topology). `(w, h, hz)` is the negotiated mode; `bitrate_kbps` the negotiated bitrate.
|
||||
pub fn spawn(
|
||||
target: &WinCaptureTarget,
|
||||
mode: (u32, u32, u32),
|
||||
bitrate_kbps: u32,
|
||||
bit_depth: u8,
|
||||
) -> Result<HelperRelay> {
|
||||
let exe = std::env::current_exe().context("current_exe for helper spawn")?;
|
||||
let exe = exe.to_string_lossy().into_owned();
|
||||
let (w, h, hz) = mode;
|
||||
// CreateProcessAsUserW takes a single mutable command line (argv[0] = exe).
|
||||
let cmdline = format!(
|
||||
"\"{exe}\" wgc-helper --gdi \"{}\" --target-id {} --mode {w}x{h}x{hz} --bitrate {bitrate_kbps} --bit-depth {bit_depth}",
|
||||
target.gdi_name, target.target_id
|
||||
);
|
||||
tracing::info!(cmd = %cmdline, "spawning WGC helper in user session");
|
||||
|
||||
unsafe { spawn_inner(&cmdline, w, h, hz) }
|
||||
}
|
||||
|
||||
/// Receive the next relayed AU. Distinguishes a `Timeout` (helper slow/stalled — keep waiting)
|
||||
/// from `Disconnected` (helper exited → its stdout closed → reader thread ended → channel
|
||||
/// dropped), which returns *immediately* and means the relay must stop, not spin.
|
||||
pub fn recv_timeout(
|
||||
&self,
|
||||
dur: std::time::Duration,
|
||||
) -> Result<RelayAu, std::sync::mpsc::RecvTimeoutError> {
|
||||
self.rx.recv_timeout(dur)
|
||||
}
|
||||
|
||||
/// Ask the helper's encoder for an IDR on the next frame (client decode recovery). Best-effort:
|
||||
/// a write failure means the helper is gone — the caller's recv loop will see the disconnect.
|
||||
pub fn request_keyframe(&self) {
|
||||
let h = self.stdin_w.lock().unwrap();
|
||||
let mut written = 0u32;
|
||||
unsafe {
|
||||
let _ = windows::Win32::Storage::FileSystem::WriteFile(
|
||||
*h,
|
||||
Some(&[CTL_KEYFRAME]),
|
||||
Some(&mut written),
|
||||
None,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for HelperRelay {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
// Terminate the child first so its WGC capture + NVENC session tear down, then close our
|
||||
// handles (the reader threads end on the resulting broken pipe).
|
||||
let _ = TerminateProcess(self.proc, 1);
|
||||
let _ = CloseHandle(*self.stdin_w.lock().unwrap());
|
||||
let _ = CloseHandle(self.proc);
|
||||
let _ = CloseHandle(self.thread);
|
||||
}
|
||||
tracing::info!("WGC helper relay torn down");
|
||||
}
|
||||
}
|
||||
|
||||
/// Inheritable anonymous pipe (read, write). The caller marks whichever end the host keeps as
|
||||
/// non-inheritable so the child only inherits its own end.
|
||||
unsafe fn make_pipe() -> Result<(HANDLE, HANDLE)> {
|
||||
let mut read = HANDLE::default();
|
||||
let mut write = HANDLE::default();
|
||||
let sa = SECURITY_ATTRIBUTES {
|
||||
nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32,
|
||||
lpSecurityDescriptor: std::ptr::null_mut(),
|
||||
bInheritHandle: true.into(),
|
||||
};
|
||||
CreatePipe(&mut read, &mut write, Some(&sa), 0).context("CreatePipe")?;
|
||||
Ok((read, write))
|
||||
}
|
||||
|
||||
/// Mark a handle non-inheritable (the host keeps it; the child must not get a copy).
|
||||
unsafe fn no_inherit(h: HANDLE) {
|
||||
let _ = SetHandleInformation(h, HANDLE_FLAG_INHERIT.0, HANDLE_FLAGS(0));
|
||||
}
|
||||
|
||||
unsafe fn spawn_inner(cmdline: &str, w: u32, h: u32, hz: u32) -> Result<HelperRelay> {
|
||||
// The user token of the active console session (requires the host to be SYSTEM).
|
||||
let session = WTSGetActiveConsoleSessionId();
|
||||
if session == 0xFFFF_FFFF {
|
||||
bail!("no active console session (WTSGetActiveConsoleSessionId)");
|
||||
}
|
||||
let mut user_token = HANDLE::default();
|
||||
WTSQueryUserToken(session, &mut user_token)
|
||||
.context("WTSQueryUserToken (host must run as SYSTEM)")?;
|
||||
|
||||
// A primary token for CreateProcessAsUserW.
|
||||
let mut primary = HANDLE::default();
|
||||
let dup = DuplicateTokenEx(
|
||||
user_token,
|
||||
TOKEN_ALL_ACCESS,
|
||||
None,
|
||||
SecurityImpersonation,
|
||||
TokenPrimary,
|
||||
&mut primary,
|
||||
);
|
||||
let _ = CloseHandle(user_token);
|
||||
dup.context("DuplicateTokenEx(TokenPrimary)")?;
|
||||
|
||||
// The user's environment block (PATH, USERPROFILE, …) so the helper resolves config + DLLs.
|
||||
let mut env_block: *mut core::ffi::c_void = std::ptr::null_mut();
|
||||
let _ = CreateEnvironmentBlock(&mut env_block, Some(primary), false);
|
||||
|
||||
// Three pipes: stdout (helper→host AUs), stdin (host→helper control), stderr (helper→host logs).
|
||||
let (out_r, out_w) = make_pipe().context("stdout pipe")?;
|
||||
let (in_r, in_w) = make_pipe().context("stdin pipe")?;
|
||||
let (err_r, err_w) = make_pipe().context("stderr pipe")?;
|
||||
// The host keeps out_r / in_w / err_r — none inheritable; the child inherits out_w/in_r/err_w.
|
||||
no_inherit(out_r);
|
||||
no_inherit(in_w);
|
||||
no_inherit(err_r);
|
||||
|
||||
let mut si = STARTUPINFOW {
|
||||
cb: std::mem::size_of::<STARTUPINFOW>() as u32,
|
||||
dwFlags: STARTF_USESTDHANDLES,
|
||||
hStdInput: in_r,
|
||||
hStdOutput: out_w,
|
||||
hStdError: err_w,
|
||||
..Default::default()
|
||||
};
|
||||
// WGC needs the interactive desktop.
|
||||
let mut desktop: Vec<u16> = "winsta0\\default\0".encode_utf16().collect();
|
||||
si.lpDesktop = PWSTR(desktop.as_mut_ptr());
|
||||
|
||||
let mut cmd: Vec<u16> = cmdline.encode_utf16().chain(std::iter::once(0)).collect();
|
||||
let mut pi = PROCESS_INFORMATION::default();
|
||||
|
||||
let created = CreateProcessAsUserW(
|
||||
Some(primary),
|
||||
None,
|
||||
Some(PWSTR(cmd.as_mut_ptr())),
|
||||
None,
|
||||
None,
|
||||
true, // inherit handles (the child's std ends)
|
||||
CREATE_UNICODE_ENVIRONMENT | CREATE_NO_WINDOW,
|
||||
Some(env_block),
|
||||
None,
|
||||
&si,
|
||||
&mut pi,
|
||||
);
|
||||
|
||||
// Clean up regardless of outcome: the child now owns its inherited ends; close our copies.
|
||||
let _ = CloseHandle(out_w);
|
||||
let _ = CloseHandle(in_r);
|
||||
let _ = CloseHandle(err_w);
|
||||
if !env_block.is_null() {
|
||||
let _ = DestroyEnvironmentBlock(env_block);
|
||||
}
|
||||
let _ = CloseHandle(primary);
|
||||
|
||||
if let Err(e) = created {
|
||||
let _ = CloseHandle(out_r);
|
||||
let _ = CloseHandle(in_w);
|
||||
let _ = CloseHandle(err_r);
|
||||
return Err(e).context("CreateProcessAsUserW(wgc-helper)");
|
||||
}
|
||||
tracing::info!(pid = pi.dwProcessId, mode = %format!("{w}x{h}@{hz}"), "WGC helper spawned");
|
||||
|
||||
// stderr → host tracing, line by line.
|
||||
let err_handle = HandleReader(err_r);
|
||||
std::thread::Builder::new()
|
||||
.name("wgc-helper-log".into())
|
||||
.spawn(move || {
|
||||
let r = BufReader::new(err_handle);
|
||||
for line in r.lines() {
|
||||
match line {
|
||||
Ok(l) if !l.trim().is_empty() => tracing::info!(target: "wgc_helper", "{l}"),
|
||||
Ok(_) => {}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
})
|
||||
.ok();
|
||||
|
||||
// stdout → parsed AUs. Bounded so a stalled relay applies backpressure (the pipe then fills and
|
||||
// the helper blocks on write — the same backpressure the single-process channel gives).
|
||||
let (tx, rx) = std::sync::mpsc::sync_channel::<RelayAu>(3);
|
||||
let out_handle = HandleReader(out_r);
|
||||
std::thread::Builder::new()
|
||||
.name("wgc-helper-au".into())
|
||||
.spawn(move || au_reader(out_handle, tx))
|
||||
.ok();
|
||||
|
||||
Ok(HelperRelay {
|
||||
proc: pi.hProcess,
|
||||
thread: pi.hThread,
|
||||
stdin_w: Mutex::new(in_w),
|
||||
rx,
|
||||
})
|
||||
}
|
||||
|
||||
/// Parse the AU framing off the helper's stdout and forward each AU. Ends (returns) when the pipe
|
||||
/// breaks (helper exit) or the channel's receiver is dropped (relay torn down).
|
||||
fn au_reader(mut r: HandleReader, tx: SyncSender<RelayAu>) {
|
||||
loop {
|
||||
let mut hdr = [0u8; 4 + 4 + 8 + 1];
|
||||
if r.read_exact(&mut hdr).is_err() {
|
||||
break;
|
||||
}
|
||||
let magic = u32::from_le_bytes([hdr[0], hdr[1], hdr[2], hdr[3]]);
|
||||
if magic != AU_MAGIC {
|
||||
tracing::error!(
|
||||
magic = format!("{magic:#x}"),
|
||||
"WGC helper AU stream desync — aborting relay"
|
||||
);
|
||||
break;
|
||||
}
|
||||
let len = u32::from_le_bytes([hdr[4], hdr[5], hdr[6], hdr[7]]) as usize;
|
||||
let pts_ns = u64::from_le_bytes([
|
||||
hdr[8], hdr[9], hdr[10], hdr[11], hdr[12], hdr[13], hdr[14], hdr[15],
|
||||
]);
|
||||
let keyframe = hdr[16] != 0;
|
||||
// Bound the allocation — a corrupt length must not OOM the host. 64 MiB is far above any real
|
||||
// AU (a 5K keyframe is a few MB).
|
||||
if len > 64 * 1024 * 1024 {
|
||||
tracing::error!(len, "WGC helper AU length implausible — aborting relay");
|
||||
break;
|
||||
}
|
||||
let mut data = vec![0u8; len];
|
||||
if r.read_exact(&mut data).is_err() {
|
||||
break;
|
||||
}
|
||||
if tx
|
||||
.send(RelayAu {
|
||||
data,
|
||||
pts_ns,
|
||||
keyframe,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
break; // relay dropped
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Minimal `Read` over a Win32 pipe HANDLE (the windows crate doesn't impl `Read` on HANDLE).
|
||||
struct HandleReader(HANDLE);
|
||||
unsafe impl Send for HandleReader {}
|
||||
impl Read for HandleReader {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
let mut read = 0u32;
|
||||
let ok = unsafe {
|
||||
windows::Win32::Storage::FileSystem::ReadFile(self.0, Some(buf), Some(&mut read), None)
|
||||
};
|
||||
match ok {
|
||||
Ok(()) => Ok(read as usize),
|
||||
// A broken pipe (helper exited) reads as ERROR_BROKEN_PIPE → report EOF (0).
|
||||
Err(_) => Ok(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Drop for HandleReader {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
let _ = CloseHandle(self.0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Is this process running as the LOCAL SYSTEM account? Used to decide whether the two-process
|
||||
/// secure-desktop path applies (only SYSTEM can `WTSQueryUserToken` + capture the Winlogon desktop).
|
||||
pub fn running_as_system() -> bool {
|
||||
use windows::Win32::Security::{GetTokenInformation, TokenUser, TOKEN_QUERY, TOKEN_USER};
|
||||
use windows::Win32::System::Threading::{GetCurrentProcess, OpenProcessToken};
|
||||
unsafe {
|
||||
let mut token = HANDLE::default();
|
||||
if OpenProcessToken(GetCurrentProcess(), TOKEN_QUERY, &mut token).is_err() {
|
||||
return false;
|
||||
}
|
||||
let mut len = 0u32;
|
||||
let _ = GetTokenInformation(token, TokenUser, None, 0, &mut len);
|
||||
if len == 0 {
|
||||
let _ = CloseHandle(token);
|
||||
return false;
|
||||
}
|
||||
let mut buf = vec![0u8; len as usize];
|
||||
let ok = GetTokenInformation(
|
||||
token,
|
||||
TokenUser,
|
||||
Some(buf.as_mut_ptr() as *mut _),
|
||||
len,
|
||||
&mut len,
|
||||
)
|
||||
.is_ok();
|
||||
let _ = CloseHandle(token);
|
||||
if !ok {
|
||||
return false;
|
||||
}
|
||||
let tu = &*(buf.as_ptr() as *const TOKEN_USER);
|
||||
// The well-known LocalSystem SID is S-1-5-18.
|
||||
is_local_system_sid(tu.User.Sid)
|
||||
}
|
||||
}
|
||||
|
||||
/// True iff `sid` is S-1-5-18 (LocalSystem).
|
||||
unsafe fn is_local_system_sid(sid: windows::Win32::Security::PSID) -> bool {
|
||||
use windows::Win32::Security::{
|
||||
GetSidIdentifierAuthority, GetSidSubAuthority, GetSidSubAuthorityCount, IsValidSid,
|
||||
};
|
||||
if !IsValidSid(sid).as_bool() {
|
||||
return false;
|
||||
}
|
||||
let auth = GetSidIdentifierAuthority(sid);
|
||||
if auth.is_null() {
|
||||
return false;
|
||||
}
|
||||
// NT Authority = {0,0,0,0,0,5}.
|
||||
let a = (*auth).Value;
|
||||
if a != [0, 0, 0, 0, 0, 5] {
|
||||
return false;
|
||||
}
|
||||
let count = *GetSidSubAuthorityCount(sid);
|
||||
if count != 1 {
|
||||
return false;
|
||||
}
|
||||
*GetSidSubAuthority(sid, 0) == 18 // SECURITY_LOCAL_SYSTEM_RID
|
||||
}
|
||||
@@ -1968,6 +1968,26 @@ fn virtual_stream(
|
||||
probe_rx: std::sync::mpsc::Receiver<ProbeRequest>,
|
||||
probe_result_tx: tokio::sync::mpsc::UnboundedSender<ProbeResult>,
|
||||
) -> Result<()> {
|
||||
// Windows two-process secure-desktop path: when the host runs as SYSTEM (required for the secure
|
||||
// desktop + SendInput), WGC can't activate in-process, so we capture the normal desktop via a
|
||||
// helper spawned in the user session and relay its AUs. (Single-process WGC/DDA is used as the
|
||||
// user, and stays the path on Linux.) See docs/windows-secure-desktop.md.
|
||||
#[cfg(target_os = "windows")]
|
||||
if should_use_helper() {
|
||||
return virtual_stream_relay(
|
||||
session,
|
||||
mode,
|
||||
seconds,
|
||||
stop,
|
||||
reconfig,
|
||||
keyframe,
|
||||
compositor,
|
||||
bitrate_kbps,
|
||||
bit_depth,
|
||||
probe_rx,
|
||||
probe_result_tx,
|
||||
);
|
||||
}
|
||||
tracing::info!(
|
||||
compositor = compositor.id(),
|
||||
?mode,
|
||||
@@ -2178,6 +2198,186 @@ fn virtual_stream(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Should this host take the two-process (SYSTEM host + user-session WGC helper) path? Yes when it's
|
||||
/// running as SYSTEM — the only account that can capture the secure desktop + drive SendInput on it,
|
||||
/// and the account under which in-process WGC won't activate. `PUNKTFUNK_FORCE_HELPER` forces it on
|
||||
/// (for testing the relay as a normal user); `PUNKTFUNK_NO_HELPER` forces it off.
|
||||
#[cfg(target_os = "windows")]
|
||||
fn should_use_helper() -> bool {
|
||||
if std::env::var_os("PUNKTFUNK_NO_HELPER").is_some() {
|
||||
return false;
|
||||
}
|
||||
std::env::var_os("PUNKTFUNK_FORCE_HELPER").is_some()
|
||||
|| crate::capture::wgc_relay::running_as_system()
|
||||
}
|
||||
|
||||
/// Windows two-process video stream: the SYSTEM host creates the SudoVDA virtual output (and holds
|
||||
/// its keepalive = the sole topology/isolation owner), spawns the WGC helper in the user session to
|
||||
/// capture+encode it, and relays the helper's AUs onto the QUIC data plane via the same send thread
|
||||
/// as the single-process path. Reconfigure rebuilds the output + re-spawns the helper at the new
|
||||
/// mode; keyframe requests are forwarded to the helper's encoder over its control channel.
|
||||
///
|
||||
/// Step 4 (this function): the normal-desktop relay. Step 5 adds the DesktopWatcher-driven mux that
|
||||
/// switches to the host's own DDA encoder on the secure (Winlogon) desktop.
|
||||
#[cfg(target_os = "windows")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn virtual_stream_relay(
|
||||
session: Session,
|
||||
mode: punktfunk_core::Mode,
|
||||
seconds: u32,
|
||||
stop: Arc<AtomicBool>,
|
||||
reconfig: &std::sync::mpsc::Receiver<punktfunk_core::Mode>,
|
||||
keyframe: &std::sync::mpsc::Receiver<()>,
|
||||
compositor: crate::vdisplay::Compositor,
|
||||
bitrate_kbps: u32,
|
||||
bit_depth: u8,
|
||||
probe_rx: std::sync::mpsc::Receiver<ProbeRequest>,
|
||||
probe_result_tx: tokio::sync::mpsc::UnboundedSender<ProbeResult>,
|
||||
) -> Result<()> {
|
||||
use crate::capture::wgc_relay::HelperRelay;
|
||||
tracing::info!(
|
||||
?mode,
|
||||
bitrate_kbps,
|
||||
bit_depth,
|
||||
"punktfunk/1 two-process stream (SYSTEM host + user-session WGC helper)"
|
||||
);
|
||||
|
||||
let mut vd = crate::vdisplay::open(compositor)?;
|
||||
|
||||
// Create the SudoVDA output + spawn a helper capturing it by GDI name. Returns the keepalive
|
||||
// (held for the output's life — the sole isolation owner) and the running relay.
|
||||
type Built = (Box<dyn Send>, HelperRelay);
|
||||
let build = |vd: &mut Box<dyn crate::vdisplay::VirtualDisplay>,
|
||||
mode: punktfunk_core::Mode|
|
||||
-> Result<Built> {
|
||||
let vout = vd.create(mode).context("create virtual output")?;
|
||||
let effective_hz = vout
|
||||
.preferred_mode
|
||||
.map(|(_, _, hz)| hz)
|
||||
.filter(|&hz| hz > 0)
|
||||
.unwrap_or(mode.refresh_hz);
|
||||
let target = vout.win_capture.clone().ok_or_else(|| {
|
||||
anyhow!("SudoVDA target not yet an active display (needs a WDDM GPU to activate it)")
|
||||
})?;
|
||||
let relay = HelperRelay::spawn(
|
||||
&target,
|
||||
(mode.width, mode.height, effective_hz),
|
||||
bitrate_kbps,
|
||||
bit_depth,
|
||||
)
|
||||
.context("spawn WGC helper")?;
|
||||
Ok((vout.keepalive, relay))
|
||||
};
|
||||
|
||||
let (mut _keepalive, mut relay) = build(&mut vd, mode)?;
|
||||
|
||||
let perf = std::env::var("PUNKTFUNK_PERF").is_ok();
|
||||
let burst_cap = std::env::var("PUNKTFUNK_PACE_BURST_KB")
|
||||
.ok()
|
||||
.and_then(|s| s.parse::<usize>().ok())
|
||||
.unwrap_or(128)
|
||||
* 1024;
|
||||
|
||||
// Same encode|send split as the single-process path: this thread relays AUs, a dedicated send
|
||||
// thread owns the Session and does FEC+seal+paced-send.
|
||||
let (frame_tx, frame_rx) = std::sync::mpsc::sync_channel::<FrameMsg>(3);
|
||||
let send_thread = std::thread::Builder::new()
|
||||
.name("punktfunk-send".into())
|
||||
.spawn({
|
||||
let stop = stop.clone();
|
||||
move || {
|
||||
send_loop(
|
||||
session,
|
||||
frame_rx,
|
||||
probe_rx,
|
||||
probe_result_tx,
|
||||
stop,
|
||||
perf,
|
||||
burst_cap,
|
||||
)
|
||||
}
|
||||
})
|
||||
.context("spawn send thread")?;
|
||||
|
||||
let mut interval = std::time::Duration::from_secs_f64(1.0 / mode.refresh_hz.max(1) as f64);
|
||||
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64);
|
||||
let mut sent: u64 = 0;
|
||||
'outer: while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
|
||||
// Mode switch: rebuild the output + re-spawn the helper at the new mode (drop the old relay +
|
||||
// keepalive only after the new pair is up, so a failed rebuild keeps the current stream).
|
||||
let mut want = None;
|
||||
while let Ok(m) = reconfig.try_recv() {
|
||||
want = Some(m);
|
||||
}
|
||||
if let Some(new_mode) = want {
|
||||
tracing::info!(?new_mode, "two-process: rebuilding for mode switch");
|
||||
match build(&mut vd, new_mode) {
|
||||
Ok((ka, rl)) => {
|
||||
relay = rl; // drops the old relay (kills old helper) ...
|
||||
_keepalive = ka; // ... then releases the old output
|
||||
interval =
|
||||
std::time::Duration::from_secs_f64(1.0 / new_mode.refresh_hz.max(1) as f64);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(error = %format!("{e:#}"), ?new_mode,
|
||||
"two-process mode-switch rebuild failed — staying on the current mode");
|
||||
}
|
||||
}
|
||||
}
|
||||
// Forward client decode-recovery keyframe requests to the helper's encoder.
|
||||
let mut want_kf = false;
|
||||
while keyframe.try_recv().is_ok() {
|
||||
want_kf = true;
|
||||
}
|
||||
if want_kf {
|
||||
tracing::debug!("two-process: forwarding keyframe request to helper");
|
||||
relay.request_keyframe();
|
||||
}
|
||||
|
||||
// Pull the next relayed AU. A timeout means the helper stalled (or is mid-respawn); loop so
|
||||
// reconfig/keyframe/stop still get serviced. Disconnected means the helper exited — end the
|
||||
// stream (step 6 adds a relaunch watchdog; for now a dead helper ends the session).
|
||||
let au = match relay.recv_timeout(std::time::Duration::from_millis(500)) {
|
||||
Ok(au) => au,
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
|
||||
if stop.load(Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
tracing::warn!("two-process: no AU from helper within 500ms");
|
||||
continue;
|
||||
}
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
|
||||
tracing::error!("two-process: WGC helper exited — ending stream");
|
||||
break;
|
||||
}
|
||||
};
|
||||
let flags = if au.keyframe {
|
||||
(FLAG_PIC | FLAG_SOF) as u32
|
||||
} else {
|
||||
FLAG_PIC as u32
|
||||
};
|
||||
// The helper's pts_ns is on this machine's monotonic clock (same `now_ns()` source), so it is
|
||||
// directly usable as the capture timestamp. encode_us = pipe-relay latency from capture.
|
||||
let capture_ns = au.pts_ns;
|
||||
let encode_us = (now_ns().saturating_sub(capture_ns) / 1000) as u32;
|
||||
let msg = FrameMsg {
|
||||
data: au.data,
|
||||
capture_ns,
|
||||
flags,
|
||||
deadline: std::time::Instant::now() + interval,
|
||||
encode_us,
|
||||
};
|
||||
if frame_tx.send(msg).is_err() {
|
||||
break 'outer; // send thread gone
|
||||
}
|
||||
sent += 1;
|
||||
}
|
||||
drop(frame_tx);
|
||||
let _ = send_thread.join();
|
||||
tracing::info!(sent, "punktfunk/1 two-process stream complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// One mode's capture/encode pipeline: (capturer, encoder, first frame, frame interval).
|
||||
/// Dropping the capturer tears down the PipeWire stream and the virtual output with it.
|
||||
type Pipeline = (
|
||||
|
||||
@@ -223,6 +223,7 @@ fn real_main() -> Result<()> {
|
||||
bitrate_kbps: get("--bitrate")
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(20000),
|
||||
bit_depth: get("--bit-depth").and_then(|s| s.parse().ok()).unwrap_or(8),
|
||||
})
|
||||
}
|
||||
Some("-h") | Some("--help") | Some("help") | None => {
|
||||
|
||||
@@ -15,7 +15,9 @@
|
||||
use crate::capture::{dxgi::WinCaptureTarget, wgc::WgcCapturer, Capturer};
|
||||
use crate::encode::{self, Codec};
|
||||
use anyhow::{Context, Result};
|
||||
use std::io::Write;
|
||||
use std::io::{Read, Write};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct HelperOptions {
|
||||
pub target_id: u32,
|
||||
@@ -24,11 +26,18 @@ pub struct HelperOptions {
|
||||
pub height: u32,
|
||||
pub fps: u32,
|
||||
pub bitrate_kbps: u32,
|
||||
/// Negotiated encode bit depth (8, or 10 = HEVC Main10). HDR auto-upgrades to 10 from the
|
||||
/// captured frame's `Rgb10a2` format regardless.
|
||||
pub bit_depth: u8,
|
||||
}
|
||||
|
||||
/// AU framing magic + version, so the host can resync / detect a helper crash on its stdout stream.
|
||||
const AU_MAGIC: u32 = 0x5046_4155; // "PFAU"
|
||||
|
||||
/// Control byte the host writes on our stdin to force the next frame to be an IDR. Must match
|
||||
/// `wgc_relay::CTL_KEYFRAME`.
|
||||
const CTL_KEYFRAME: u8 = 0x01;
|
||||
|
||||
pub fn run(opts: HelperOptions) -> Result<()> {
|
||||
tracing::info!(
|
||||
target_id = opts.target_id,
|
||||
@@ -59,11 +68,34 @@ pub fn run(opts: HelperOptions) -> Result<()> {
|
||||
h,
|
||||
opts.fps,
|
||||
opts.bitrate_kbps as u64 * 1000,
|
||||
false, // not cuda
|
||||
8, // bit depth: HDR auto-upgrades to Main10 from the Rgb10a2 frame
|
||||
false, // not cuda
|
||||
opts.bit_depth, // 8, or 10 = Main10 (HDR auto-upgrades from the Rgb10a2 frame regardless)
|
||||
)
|
||||
.context("open NVENC")?;
|
||||
|
||||
// Control channel: the host writes a single byte on our stdin to force an IDR (client decode
|
||||
// recovery), mirroring `enc.request_keyframe()` in the single-process path. A reader thread sets
|
||||
// a flag the encode loop checks; stdin EOF (host gone) just stops the thread.
|
||||
let kf = Arc::new(AtomicBool::new(false));
|
||||
{
|
||||
let kf = kf.clone();
|
||||
std::thread::Builder::new()
|
||||
.name("wgc-helper-ctl".into())
|
||||
.spawn(move || {
|
||||
let mut stdin = std::io::stdin();
|
||||
let mut byte = [0u8; 1];
|
||||
while let Ok(n) = stdin.read(&mut byte) {
|
||||
if n == 0 {
|
||||
break; // host closed our stdin
|
||||
}
|
||||
if byte[0] == CTL_KEYFRAME {
|
||||
kf.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
|
||||
// Binary stdout — lock it once + write framed AUs. A short write / broken pipe means the host
|
||||
// (parent) went away → exit cleanly so the host's relaunch watchdog can respawn us.
|
||||
let stdout = std::io::stdout();
|
||||
@@ -71,6 +103,9 @@ pub fn run(opts: HelperOptions) -> Result<()> {
|
||||
|
||||
let mut frame = first;
|
||||
loop {
|
||||
if kf.swap(false, Ordering::Relaxed) {
|
||||
enc.request_keyframe();
|
||||
}
|
||||
enc.submit(&frame).context("encoder submit")?;
|
||||
while let Some(au) = enc.poll().context("encoder poll")? {
|
||||
if write_au(&mut out, &au).is_err() {
|
||||
|
||||
Reference in New Issue
Block a user