From 9f50b3930d7dae920641be34da35dc0e1da6e860 Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Tue, 16 Jun 2026 07:43:20 +0000 Subject: [PATCH] =?UTF-8?q?feat(host/windows):=20two-process=20secure-desk?= =?UTF-8?q?top=20step=204=20=E2=80=94=20spawn=20helper=20+=20relay=20AUs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The SYSTEM host now sources the normal-desktop video from a user-session WGC helper instead of capturing in-process (WGC won't activate as SYSTEM). New `capture/wgc_relay.rs`: `HelperRelay::spawn` launches `m3-host wgc-helper` in the interactive user session via CreateProcessAsUserW (WTSQueryUserToken → DuplicateTokenEx(TokenPrimary) → lpDesktop="winsta0\\default", CREATE_NO_WINDOW) with three anonymous pipes — stdout (framed Annex-B AUs → parsed back to RelayAu), stdin (control: force-keyframe), stderr (helper logs → host tracing). The host holds the SudoVDA keepalive (sole isolation/topology owner); the helper captures by GDI name only. m3.rs: `virtual_stream` dispatches to the new `virtual_stream_relay` when `should_use_helper()` (running as SYSTEM, or PUNKTFUNK_FORCE_HELPER; disable with PUNKTFUNK_NO_HELPER). The relay loop feeds the existing send thread — same FEC/seal/paced-send path. Reconfigure rebuilds the output + re-spawns the helper; keyframe requests forward over the control pipe; helper pts_ns (same-machine monotonic clock) is used directly as capture_ns. Disconnect ends the stream (step 6 adds the relaunch watchdog). wgc_helper.rs: reads the stdin control byte to request an IDR; --bit-depth flag threaded through so SDR 10-bit (Main10) negotiation reaches the helper's encoder. cfg-gated windows-only; Linux/macOS build unaffected. Step 5 (DesktopWatcher mux to host DDA on the Winlogon secure desktop) is next. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/punktfunk-host/Cargo.toml | 5 + crates/punktfunk-host/src/capture.rs | 2 + .../punktfunk-host/src/capture/wgc_relay.rs | 389 ++++++++++++++++++ crates/punktfunk-host/src/m3.rs | 200 +++++++++ crates/punktfunk-host/src/main.rs | 1 + crates/punktfunk-host/src/wgc_helper.rs | 41 +- 6 files changed, 635 insertions(+), 3 deletions(-) create mode 100644 crates/punktfunk-host/src/capture/wgc_relay.rs diff --git a/crates/punktfunk-host/Cargo.toml b/crates/punktfunk-host/Cargo.toml index 0bef6f2..7a26611 100644 --- a/crates/punktfunk-host/Cargo.toml +++ b/crates/punktfunk-host/Cargo.toml @@ -136,6 +136,11 @@ windows = { version = "0.62", features = [ "Win32_System_WinRT_Graphics_Capture", # WGC runs under SYSTEM via interactive-user impersonation (WGC won't activate as SYSTEM). "Win32_System_RemoteDesktop", + # Two-process secure-desktop design: the SYSTEM host spawns the WGC helper in the interactive + # user session (CreateProcessAsUserW) with stdout/stdin redirected to anonymous pipes. + "Win32_System_Threading", + "Win32_System_Pipes", + "Win32_System_Environment", ] } # Software H.264 encoder (GPU-less path + NVENC fallback). The default `source` feature statically # compiles OpenH264 (BSD-2) — no system lib, builds on MSVC; nasm on PATH adds the SIMD fast path. diff --git a/crates/punktfunk-host/src/capture.rs b/crates/punktfunk-host/src/capture.rs index 3e91a67..5532bb8 100644 --- a/crates/punktfunk-host/src/capture.rs +++ b/crates/punktfunk-host/src/capture.rs @@ -328,3 +328,5 @@ pub mod dxgi; mod linux; #[cfg(target_os = "windows")] pub mod wgc; +#[cfg(target_os = "windows")] +pub mod wgc_relay; diff --git a/crates/punktfunk-host/src/capture/wgc_relay.rs b/crates/punktfunk-host/src/capture/wgc_relay.rs new file mode 100644 index 0000000..a347375 --- /dev/null +++ b/crates/punktfunk-host/src/capture/wgc_relay.rs @@ -0,0 +1,389 @@ +//! Host-side WGC helper relay (Windows two-process secure-desktop design, +//! 
docs/windows-secure-desktop.md — step 4). +//! +//! WGC won't activate under the SYSTEM account, so the SYSTEM host can't capture the normal desktop +//! itself. Instead it spawns `m3-host wgc-helper` in the **interactive user session** (so WGC works) +//! via `CreateProcessAsUserW`, with the helper's **stdout** redirected to an anonymous pipe the host +//! reads. The helper ships framed Annex-B access units; this module parses them back into AUs the +//! host relays onto the live QUIC session (same `EncodedFrame` flow, just sourced over a pipe instead +//! of a local encoder). A second pipe carries a tiny **control** channel to the helper (stdin: force +//! keyframe), and the helper's **stderr** is forwarded line-by-line into host tracing so its logs are +//! visible from the SYSTEM host's console. +//! +//! Wire framing (must match `wgc_helper::write_au`): per AU +//! `[u32 magic "PFAU" LE][u32 len LE][u64 pts_ns LE][u8 keyframe][len bytes data]`. + +use crate::capture::dxgi::WinCaptureTarget; +use anyhow::{bail, Context, Result}; +use std::io::{BufRead, BufReader, Read, Write}; +use std::sync::mpsc::{Receiver, SyncSender}; +use std::sync::Mutex; +use windows::core::PWSTR; +use windows::Win32::Foundation::SetHandleInformation; +use windows::Win32::Foundation::{CloseHandle, HANDLE}; +use windows::Win32::Foundation::{HANDLE_FLAGS, HANDLE_FLAG_INHERIT}; +use windows::Win32::Security::{ + DuplicateTokenEx, SecurityImpersonation, TokenPrimary, SECURITY_ATTRIBUTES, TOKEN_ALL_ACCESS, +}; +use windows::Win32::System::Environment::{CreateEnvironmentBlock, DestroyEnvironmentBlock}; +use windows::Win32::System::Pipes::CreatePipe; +use windows::Win32::System::RemoteDesktop::{WTSGetActiveConsoleSessionId, WTSQueryUserToken}; +use windows::Win32::System::Threading::{ + CreateProcessAsUserW, TerminateProcess, CREATE_NO_WINDOW, CREATE_UNICODE_ENVIRONMENT, + PROCESS_INFORMATION, STARTF_USESTDHANDLES, STARTUPINFOW, +}; + +/// Must match [`crate::wgc_helper`]'s `AU_MAGIC` ("PFAU"). 
+const AU_MAGIC: u32 = 0x5046_4155;
+
+/// One access unit relayed from the helper, in the helper's (= the host's, same machine) monotonic
+/// clock — `pts_ns` is directly comparable to the host's `now_ns()`.
+pub struct RelayAu {
+    /// AU payload: the `len` bytes that follow the frame header on the helper's stdout.
+    pub data: Vec<u8>,
+    /// Timestamp from the frame header (`u64` little-endian, nanoseconds).
+    pub pts_ns: u64,
+    /// `true` when the header's keyframe byte is non-zero.
+    pub keyframe: bool,
+}
+
+/// A running USER-session WGC helper whose AUs the SYSTEM host relays. Drop kills the child + closes
+/// the pipes; the reader threads then end on the broken pipe.
+pub struct HelperRelay {
+    /// Process handle of the helper (`PROCESS_INFORMATION::hProcess`).
+    proc: HANDLE,
+    /// Handle of the helper's primary thread (`PROCESS_INFORMATION::hThread`).
+    thread: HANDLE,
+    /// Host write end of the helper's stdin — control commands (force keyframe). Mutex so the relay
+    /// can be shared while the encode thread requests keyframes.
+    stdin_w: Mutex<HANDLE>,
+    /// Parsed AUs from the helper's stdout reader thread.
+    rx: Receiver<RelayAu>,
+}
+
+// SAFETY: HANDLEs are just kernel handle values; we own them for the relay's lifetime and close them
+// on Drop, so moving the relay to another thread is fine.
+unsafe impl Send for HelperRelay {}
+// NOTE(review): `std::sync::mpsc::Receiver` is `!Sync`, so this impl is only sound while
+// `recv_timeout` is called from one thread at a time (the stream loop is the only caller visible
+// here). If the relay is ever polled from several threads, wrap `rx` in a `Mutex` — confirm.
+unsafe impl Sync for HelperRelay {}
+
+/// Control byte on the helper's stdin: force the next encoded frame to be an IDR (client decode
+/// recovery). Mirrors `enc.request_keyframe()` in the single-process path.
+const CTL_KEYFRAME: u8 = 0x01;
+
+impl HelperRelay {
+    /// Spawn the helper in the interactive user session and start relaying its AUs. `target` is the
+    /// SudoVDA output the host already created (captured by GDI name only — the helper never touches
+    /// display topology). `(w, h, hz)` is the negotiated mode; `bitrate_kbps` the negotiated bitrate;
+    /// `bit_depth` is forwarded verbatim as the helper's `--bit-depth` argument.
+    ///
+    /// # Errors
+    /// Fails when the current executable path cannot be determined or when any step of the
+    /// user-session process launch fails.
+    pub fn spawn(
+        target: &WinCaptureTarget,
+        mode: (u32, u32, u32),
+        bitrate_kbps: u32,
+        bit_depth: u8,
+    ) -> Result<Self> {
+        let exe = std::env::current_exe().context("current_exe for helper spawn")?;
+        let exe = exe.to_string_lossy().into_owned();
+        let (w, h, hz) = mode;
+        // CreateProcessAsUserW takes a single mutable command line (argv[0] = exe).
+        let cmdline = format!(
+            "\"{exe}\" wgc-helper --gdi \"{}\" --target-id {} --mode {w}x{h}x{hz} --bitrate {bitrate_kbps} --bit-depth {bit_depth}",
+            target.gdi_name, target.target_id
+        );
+        tracing::info!(cmd = %cmdline, "spawning WGC helper in user session");
+
+        unsafe { spawn_inner(&cmdline, w, h, hz) }
+    }
+
+    /// Receive the next relayed AU. Distinguishes a `Timeout` (helper slow/stalled — keep waiting)
+    /// from `Disconnected` (helper exited → its stdout closed → reader thread ended → channel
+    /// dropped), which returns *immediately* and means the relay must stop, not spin.
+    pub fn recv_timeout(
+        &self,
+        dur: std::time::Duration,
+    ) -> Result<RelayAu, std::sync::mpsc::RecvTimeoutError> {
+        self.rx.recv_timeout(dur)
+    }
+
+    /// Ask the helper's encoder for an IDR on the next frame (client decode recovery). Best-effort:
+    /// a write failure means the helper is gone — the caller's recv loop will see the disconnect.
+    pub fn request_keyframe(&self) {
+        // The mutex only guards a plain handle value, so a poisoned lock is still usable; never
+        // panic on the streaming thread because some other holder panicked.
+        let h = self
+            .stdin_w
+            .lock()
+            .unwrap_or_else(std::sync::PoisonError::into_inner);
+        let mut written = 0u32;
+        let res = unsafe {
+            windows::Win32::Storage::FileSystem::WriteFile(
+                *h,
+                Some(&[CTL_KEYFRAME]),
+                Some(&mut written),
+                None,
+            )
+        };
+        if let Err(e) = res {
+            // Still best-effort, but leave a trace instead of swallowing the failure silently.
+            tracing::debug!(error = %e, "keyframe request to WGC helper failed");
+        }
+    }
+}
+
+impl Drop for HelperRelay {
+    fn drop(&mut self) {
+        // `&mut self` is exclusive access: read the handle without locking, and tolerate a poisoned
+        // mutex — a panic inside Drop during unwinding would abort the process.
+        let stdin_w = *self
+            .stdin_w
+            .get_mut()
+            .unwrap_or_else(std::sync::PoisonError::into_inner);
+        unsafe {
+            // Terminate the child first so its WGC capture + NVENC session tear down, then close our
+            // handles (the reader threads end on the resulting broken pipe).
+            let _ = TerminateProcess(self.proc, 1);
+            // TerminateProcess only *initiates* termination. Wait (bounded) for the process to be
+            // gone so callers that release the virtual output right after dropping the relay do so
+            // once the helper has really stopped capturing it.
+            let _ = windows::Win32::System::Threading::WaitForSingleObject(self.proc, 1000);
+            let _ = CloseHandle(stdin_w);
+            let _ = CloseHandle(self.proc);
+            let _ = CloseHandle(self.thread);
+        }
+        tracing::info!("WGC helper relay torn down");
+    }
+}
+
+/// Inheritable anonymous pipe (read, write). The caller marks whichever end the host keeps as
+/// non-inheritable so the child only inherits its own end.
+unsafe fn make_pipe() -> Result<(HANDLE, HANDLE)> {
+    let mut read = HANDLE::default();
+    let mut write = HANDLE::default();
+    let sa = SECURITY_ATTRIBUTES {
+        nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32,
+        lpSecurityDescriptor: std::ptr::null_mut(),
+        bInheritHandle: true.into(),
+    };
+    CreatePipe(&mut read, &mut write, Some(&sa), 0).context("CreatePipe")?;
+    Ok((read, write))
+}
+
+/// Mark a handle non-inheritable (the host keeps it; the child must not get a copy). A failure is
+/// logged and otherwise ignored: the spawn still proceeds, the child merely inherits an extra end.
+unsafe fn no_inherit(h: HANDLE) {
+    if let Err(e) = SetHandleInformation(h, HANDLE_FLAG_INHERIT.0, HANDLE_FLAGS(0)) {
+        tracing::warn!(error = %e, "could not clear HANDLE_FLAG_INHERIT on a host pipe end");
+    }
+}
+
+/// Owns a kernel handle during `spawn_inner` and closes it on drop, so every early return releases
+/// what was acquired so far. `release` hands ownership on without closing.
+struct HandleGuard(HANDLE);
+
+impl HandleGuard {
+    /// Give up ownership and return the raw handle (the new owner closes it).
+    fn release(self) -> HANDLE {
+        let h = self.0;
+        std::mem::forget(self);
+        h
+    }
+}
+
+impl Drop for HandleGuard {
+    fn drop(&mut self) {
+        unsafe {
+            let _ = CloseHandle(self.0);
+        }
+    }
+}
+
+/// Owns the block returned by `CreateEnvironmentBlock` (null = none) and destroys it on drop.
+struct EnvBlockGuard(*mut core::ffi::c_void);
+
+impl Drop for EnvBlockGuard {
+    fn drop(&mut self) {
+        if !self.0.is_null() {
+            unsafe {
+                let _ = DestroyEnvironmentBlock(self.0);
+            }
+        }
+    }
+}
+
+/// Launch `cmdline` in the active console session's user context with stdin/stdout/stderr wired to
+/// anonymous pipes, start the two reader threads, and return the relay that owns the child.
+/// `w`/`h`/`hz` are only used for the "spawned" log line.
+///
+/// # Errors
+/// Fails when there is no active console session, when the user token cannot be obtained or
+/// duplicated, when a pipe cannot be created, when `CreateProcessAsUserW` fails, or when the AU
+/// reader thread cannot be started (the freshly spawned child is then terminated again). Every
+/// handle acquired before the failure is closed.
+///
+/// # Safety
+/// Calls raw Win32 APIs; the caller must pass a well-formed command line.
+unsafe fn spawn_inner(cmdline: &str, w: u32, h: u32, hz: u32) -> Result<HelperRelay> {
+    // The user token of the active console session (requires the host to be SYSTEM).
+    let session = WTSGetActiveConsoleSessionId();
+    if session == 0xFFFF_FFFF {
+        bail!("no active console session (WTSGetActiveConsoleSessionId)");
+    }
+    let mut user_token = HANDLE::default();
+    WTSQueryUserToken(session, &mut user_token)
+        .context("WTSQueryUserToken (host must run as SYSTEM)")?;
+    let user_token = HandleGuard(user_token);
+
+    // A primary token for CreateProcessAsUserW.
+    let mut primary = HANDLE::default();
+    let dup = DuplicateTokenEx(
+        user_token.0,
+        TOKEN_ALL_ACCESS,
+        None,
+        SecurityImpersonation,
+        TokenPrimary,
+        &mut primary,
+    );
+    drop(user_token);
+    dup.context("DuplicateTokenEx(TokenPrimary)")?;
+    let primary = HandleGuard(primary);
+
+    // The user's environment block (PATH, USERPROFILE, …) so the helper resolves config + DLLs.
+    // Without one the child is started with a null environment pointer; say so instead of hiding it.
+    let mut env_block: *mut core::ffi::c_void = std::ptr::null_mut();
+    if let Err(e) = CreateEnvironmentBlock(&mut env_block, Some(primary.0), false) {
+        tracing::warn!(error = %e, "CreateEnvironmentBlock failed — spawning helper without a user environment block");
+        env_block = std::ptr::null_mut();
+    }
+    let env_block = EnvBlockGuard(env_block);
+
+    // Three pipes: stdout (helper→host AUs), stdin (host→helper control), stderr (helper→host logs).
+    // Each end is wrapped as soon as it exists so a later failure cannot leak the earlier ones.
+    let (out_r, out_w) = make_pipe().context("stdout pipe")?;
+    let (out_r, out_w) = (HandleGuard(out_r), HandleGuard(out_w));
+    let (in_r, in_w) = make_pipe().context("stdin pipe")?;
+    let (in_r, in_w) = (HandleGuard(in_r), HandleGuard(in_w));
+    let (err_r, err_w) = make_pipe().context("stderr pipe")?;
+    let (err_r, err_w) = (HandleGuard(err_r), HandleGuard(err_w));
+    // The host keeps out_r / in_w / err_r — none inheritable; the child inherits out_w/in_r/err_w.
+    no_inherit(out_r.0);
+    no_inherit(in_w.0);
+    no_inherit(err_r.0);
+
+    let mut si = STARTUPINFOW {
+        cb: std::mem::size_of::<STARTUPINFOW>() as u32,
+        dwFlags: STARTF_USESTDHANDLES,
+        hStdInput: in_r.0,
+        hStdOutput: out_w.0,
+        hStdError: err_w.0,
+        ..Default::default()
+    };
+    // WGC needs the interactive desktop.
+    let mut desktop: Vec<u16> = "winsta0\\default\0".encode_utf16().collect();
+    si.lpDesktop = PWSTR(desktop.as_mut_ptr());
+
+    let mut cmd: Vec<u16> = cmdline.encode_utf16().chain(std::iter::once(0)).collect();
+    let mut pi = PROCESS_INFORMATION::default();
+
+    let created = CreateProcessAsUserW(
+        Some(primary.0),
+        None,
+        Some(PWSTR(cmd.as_mut_ptr())),
+        None,
+        None,
+        true, // inherit handles (the child's std ends)
+        CREATE_UNICODE_ENVIRONMENT | CREATE_NO_WINDOW,
+        Some(env_block.0),
+        None,
+        &si,
+        &mut pi,
+    );
+
+    // Clean up regardless of outcome: the child now owns its inherited ends; close our copies.
+    drop(out_w);
+    drop(in_r);
+    drop(err_w);
+    drop(env_block);
+    drop(primary);
+
+    // On failure the remaining guards (out_r / in_w / err_r) close the host ends.
+    created.context("CreateProcessAsUserW(wgc-helper)")?;
+    tracing::info!(pid = pi.dwProcessId, mode = %format!("{w}x{h}@{hz}"), "WGC helper spawned");
+
+    // stdout → parsed AUs. Bounded so a stalled relay applies backpressure (the pipe then fills and
+    // the helper blocks on write — the same backpressure the single-process channel gives).
+    let (tx, rx) = std::sync::mpsc::sync_channel::<RelayAu>(3);
+
+    // From here the relay owns the child: any early return drops it, which terminates the helper
+    // and closes the process/thread/stdin handles.
+    let relay = HelperRelay {
+        proc: pi.hProcess,
+        thread: pi.hThread,
+        stdin_w: Mutex::new(in_w.release()),
+        rx,
+    };
+
+    // stderr → host tracing, line by line. Losing the log thread is not fatal: the unspawned closure
+    // is dropped, which closes our read end.
+    let err_handle = HandleReader(err_r.release());
+    let log_thread = std::thread::Builder::new()
+        .name("wgc-helper-log".into())
+        .spawn(move || {
+            let r = BufReader::new(err_handle);
+            for line in r.lines() {
+                match line {
+                    Ok(l) if !l.trim().is_empty() => tracing::info!(target: "wgc_helper", "{l}"),
+                    Ok(_) => {}
+                    Err(_) => break,
+                }
+            }
+        });
+    if let Err(e) = log_thread {
+        tracing::warn!(error = %e, "could not start WGC helper log thread — helper logs are dropped");
+    }
+
+    // Without the AU reader the relay could never deliver a frame, so fail the spawn instead of
+    // handing back a relay that immediately reports a disconnect.
+    let out_handle = HandleReader(out_r.release());
+    std::thread::Builder::new()
+        .name("wgc-helper-au".into())
+        .spawn(move || au_reader(out_handle, tx))
+        .context("spawn WGC helper AU reader thread")?;
+
+    Ok(relay)
+}
+
+/// Parse the AU framing off the helper's stdout and forward each AU. Ends (returns) when the pipe
+/// breaks (helper exit) or the channel's receiver is dropped (relay torn down). A bad magic or an
+/// implausible length also ends it, since the stream cannot be resynchronised.
+fn au_reader(mut r: HandleReader, tx: SyncSender<RelayAu>) {
+    loop {
+        // Fixed header: magic (4) + len (4) + pts_ns (8) + keyframe (1).
+        let mut hdr = [0u8; 4 + 4 + 8 + 1];
+        if r.read_exact(&mut hdr).is_err() {
+            break;
+        }
+        let magic = u32::from_le_bytes([hdr[0], hdr[1], hdr[2], hdr[3]]);
+        if magic != AU_MAGIC {
+            tracing::error!(
+                magic = format!("{magic:#x}"),
+                "WGC helper AU stream desync — aborting relay"
+            );
+            break;
+        }
+        let len = u32::from_le_bytes([hdr[4], hdr[5], hdr[6], hdr[7]]) as usize;
+        let pts_ns = u64::from_le_bytes([
+            hdr[8], hdr[9], hdr[10], hdr[11], hdr[12], hdr[13], hdr[14], hdr[15],
+        ]);
+        let keyframe = hdr[16] != 0;
+        // Bound the allocation — a corrupt length must not OOM the host. 64 MiB is far above any real
+        // AU (a 5K keyframe is a few MB).
+        if len > 64 * 1024 * 1024 {
+            tracing::error!(len, "WGC helper AU length implausible — aborting relay");
+            break;
+        }
+        let mut data = vec![0u8; len];
+        if r.read_exact(&mut data).is_err() {
+            break;
+        }
+        if tx
+            .send(RelayAu {
+                data,
+                pts_ns,
+                keyframe,
+            })
+            .is_err()
+        {
+            break; // relay dropped
+        }
+    }
+}
+
+/// Minimal `Read` over a Win32 pipe HANDLE (the windows crate doesn't impl `Read` on HANDLE).
+struct HandleReader(HANDLE);
+// SAFETY: the wrapped HANDLE is a plain kernel handle value; the reader is moved into exactly one
+// thread, which then owns the handle and closes it on drop.
+unsafe impl Send for HandleReader {}
+impl Read for HandleReader {
+    /// Blocking `ReadFile` on the pipe. Returns `Ok(0)` (EOF) once the write side is gone; any other
+    /// failure is reported as an error instead of being mistaken for a clean end of stream.
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        let mut read = 0u32;
+        let res = unsafe {
+            windows::Win32::Storage::FileSystem::ReadFile(self.0, Some(buf), Some(&mut read), None)
+        };
+        match res {
+            Ok(()) => Ok(read as usize),
+            // A broken pipe (helper exited) reads as ERROR_BROKEN_PIPE → report EOF (0).
+            Err(e) if e.code() == windows::Win32::Foundation::ERROR_BROKEN_PIPE.to_hresult() => {
+                Ok(0)
+            }
+            Err(e) => Err(std::io::Error::from_raw_os_error(e.code().0)),
+        }
+    }
+}
+impl Drop for HandleReader {
+    fn drop(&mut self) {
+        unsafe {
+            let _ = CloseHandle(self.0);
+        }
+    }
+}
+
+/// Is this process running as the LOCAL SYSTEM account? Used to decide whether the two-process
+/// secure-desktop path applies (only SYSTEM can `WTSQueryUserToken` + capture the Winlogon desktop).
+///
+/// Returns `false` when the process token cannot be opened or queried.
+pub fn running_as_system() -> bool {
+    use windows::Win32::Security::{GetTokenInformation, TokenUser, TOKEN_QUERY, TOKEN_USER};
+    use windows::Win32::System::Threading::{GetCurrentProcess, OpenProcessToken};
+    unsafe {
+        let mut token = HANDLE::default();
+        if OpenProcessToken(GetCurrentProcess(), TOKEN_QUERY, &mut token).is_err() {
+            return false;
+        }
+        // First call with no buffer only reports the required size in `len`.
+        let mut len = 0u32;
+        let _ = GetTokenInformation(token, TokenUser, None, 0, &mut len);
+        if len == 0 {
+            let _ = CloseHandle(token);
+            return false;
+        }
+        // `TOKEN_USER` starts with a pointer, so it needs pointer alignment; a `Vec<u8>` only
+        // guarantees alignment 1. Back the buffer with `u64` words (rounded up to cover `len`).
+        let mut buf = vec![0u64; (len as usize + 7) / 8];
+        let ok = GetTokenInformation(
+            token,
+            TokenUser,
+            Some(buf.as_mut_ptr() as *mut _),
+            len,
+            &mut len,
+        )
+        .is_ok();
+        let _ = CloseHandle(token);
+        if !ok {
+            return false;
+        }
+        // The SID pointer inside `TOKEN_USER` points into `buf`, which stays alive until return.
+        let tu = &*(buf.as_ptr() as *const TOKEN_USER);
+        // The well-known LocalSystem SID is S-1-5-18.
+        is_local_system_sid(tu.User.Sid)
+    }
+}
+
+/// True iff `sid` is S-1-5-18 (LocalSystem).
+unsafe fn is_local_system_sid(sid: windows::Win32::Security::PSID) -> bool { + use windows::Win32::Security::{ + GetSidIdentifierAuthority, GetSidSubAuthority, GetSidSubAuthorityCount, IsValidSid, + }; + if !IsValidSid(sid).as_bool() { + return false; + } + let auth = GetSidIdentifierAuthority(sid); + if auth.is_null() { + return false; + } + // NT Authority = {0,0,0,0,0,5}. + let a = (*auth).Value; + if a != [0, 0, 0, 0, 0, 5] { + return false; + } + let count = *GetSidSubAuthorityCount(sid); + if count != 1 { + return false; + } + *GetSidSubAuthority(sid, 0) == 18 // SECURITY_LOCAL_SYSTEM_RID +} diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index 95e0420..48e7748 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -1968,6 +1968,26 @@ fn virtual_stream( probe_rx: std::sync::mpsc::Receiver, probe_result_tx: tokio::sync::mpsc::UnboundedSender, ) -> Result<()> { + // Windows two-process secure-desktop path: when the host runs as SYSTEM (required for the secure + // desktop + SendInput), WGC can't activate in-process, so we capture the normal desktop via a + // helper spawned in the user session and relay its AUs. (Single-process WGC/DDA is used as the + // user, and stays the path on Linux.) See docs/windows-secure-desktop.md. + #[cfg(target_os = "windows")] + if should_use_helper() { + return virtual_stream_relay( + session, + mode, + seconds, + stop, + reconfig, + keyframe, + compositor, + bitrate_kbps, + bit_depth, + probe_rx, + probe_result_tx, + ); + } tracing::info!( compositor = compositor.id(), ?mode, @@ -2178,6 +2198,186 @@ fn virtual_stream( Ok(()) } +/// Should this host take the two-process (SYSTEM host + user-session WGC helper) path? Yes when it's +/// running as SYSTEM — the only account that can capture the secure desktop + drive SendInput on it, +/// and the account under which in-process WGC won't activate. 
`PUNKTFUNK_FORCE_HELPER` forces it on +/// (for testing the relay as a normal user); `PUNKTFUNK_NO_HELPER` forces it off. +#[cfg(target_os = "windows")] +fn should_use_helper() -> bool { + if std::env::var_os("PUNKTFUNK_NO_HELPER").is_some() { + return false; + } + std::env::var_os("PUNKTFUNK_FORCE_HELPER").is_some() + || crate::capture::wgc_relay::running_as_system() +} + +/// Windows two-process video stream: the SYSTEM host creates the SudoVDA virtual output (and holds +/// its keepalive = the sole topology/isolation owner), spawns the WGC helper in the user session to +/// capture+encode it, and relays the helper's AUs onto the QUIC data plane via the same send thread +/// as the single-process path. Reconfigure rebuilds the output + re-spawns the helper at the new +/// mode; keyframe requests are forwarded to the helper's encoder over its control channel. +/// +/// Step 4 (this function): the normal-desktop relay. Step 5 adds the DesktopWatcher-driven mux that +/// switches to the host's own DDA encoder on the secure (Winlogon) desktop. +#[cfg(target_os = "windows")] +#[allow(clippy::too_many_arguments)] +fn virtual_stream_relay( + session: Session, + mode: punktfunk_core::Mode, + seconds: u32, + stop: Arc, + reconfig: &std::sync::mpsc::Receiver, + keyframe: &std::sync::mpsc::Receiver<()>, + compositor: crate::vdisplay::Compositor, + bitrate_kbps: u32, + bit_depth: u8, + probe_rx: std::sync::mpsc::Receiver, + probe_result_tx: tokio::sync::mpsc::UnboundedSender, +) -> Result<()> { + use crate::capture::wgc_relay::HelperRelay; + tracing::info!( + ?mode, + bitrate_kbps, + bit_depth, + "punktfunk/1 two-process stream (SYSTEM host + user-session WGC helper)" + ); + + let mut vd = crate::vdisplay::open(compositor)?; + + // Create the SudoVDA output + spawn a helper capturing it by GDI name. Returns the keepalive + // (held for the output's life — the sole isolation owner) and the running relay. 
+ type Built = (Box, HelperRelay); + let build = |vd: &mut Box, + mode: punktfunk_core::Mode| + -> Result { + let vout = vd.create(mode).context("create virtual output")?; + let effective_hz = vout + .preferred_mode + .map(|(_, _, hz)| hz) + .filter(|&hz| hz > 0) + .unwrap_or(mode.refresh_hz); + let target = vout.win_capture.clone().ok_or_else(|| { + anyhow!("SudoVDA target not yet an active display (needs a WDDM GPU to activate it)") + })?; + let relay = HelperRelay::spawn( + &target, + (mode.width, mode.height, effective_hz), + bitrate_kbps, + bit_depth, + ) + .context("spawn WGC helper")?; + Ok((vout.keepalive, relay)) + }; + + let (mut _keepalive, mut relay) = build(&mut vd, mode)?; + + let perf = std::env::var("PUNKTFUNK_PERF").is_ok(); + let burst_cap = std::env::var("PUNKTFUNK_PACE_BURST_KB") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(128) + * 1024; + + // Same encode|send split as the single-process path: this thread relays AUs, a dedicated send + // thread owns the Session and does FEC+seal+paced-send. + let (frame_tx, frame_rx) = std::sync::mpsc::sync_channel::(3); + let send_thread = std::thread::Builder::new() + .name("punktfunk-send".into()) + .spawn({ + let stop = stop.clone(); + move || { + send_loop( + session, + frame_rx, + probe_rx, + probe_result_tx, + stop, + perf, + burst_cap, + ) + } + }) + .context("spawn send thread")?; + + let mut interval = std::time::Duration::from_secs_f64(1.0 / mode.refresh_hz.max(1) as f64); + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64); + let mut sent: u64 = 0; + 'outer: while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline { + // Mode switch: rebuild the output + re-spawn the helper at the new mode (drop the old relay + + // keepalive only after the new pair is up, so a failed rebuild keeps the current stream). 
+ let mut want = None; + while let Ok(m) = reconfig.try_recv() { + want = Some(m); + } + if let Some(new_mode) = want { + tracing::info!(?new_mode, "two-process: rebuilding for mode switch"); + match build(&mut vd, new_mode) { + Ok((ka, rl)) => { + relay = rl; // drops the old relay (kills old helper) ... + _keepalive = ka; // ... then releases the old output + interval = + std::time::Duration::from_secs_f64(1.0 / new_mode.refresh_hz.max(1) as f64); + } + Err(e) => { + tracing::error!(error = %format!("{e:#}"), ?new_mode, + "two-process mode-switch rebuild failed — staying on the current mode"); + } + } + } + // Forward client decode-recovery keyframe requests to the helper's encoder. + let mut want_kf = false; + while keyframe.try_recv().is_ok() { + want_kf = true; + } + if want_kf { + tracing::debug!("two-process: forwarding keyframe request to helper"); + relay.request_keyframe(); + } + + // Pull the next relayed AU. A timeout means the helper stalled (or is mid-respawn); loop so + // reconfig/keyframe/stop still get serviced. Disconnected means the helper exited — end the + // stream (step 6 adds a relaunch watchdog; for now a dead helper ends the session). + let au = match relay.recv_timeout(std::time::Duration::from_millis(500)) { + Ok(au) => au, + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + if stop.load(Ordering::SeqCst) { + break; + } + tracing::warn!("two-process: no AU from helper within 500ms"); + continue; + } + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => { + tracing::error!("two-process: WGC helper exited — ending stream"); + break; + } + }; + let flags = if au.keyframe { + (FLAG_PIC | FLAG_SOF) as u32 + } else { + FLAG_PIC as u32 + }; + // The helper's pts_ns is on this machine's monotonic clock (same `now_ns()` source), so it is + // directly usable as the capture timestamp. encode_us = pipe-relay latency from capture. 
+ let capture_ns = au.pts_ns; + let encode_us = (now_ns().saturating_sub(capture_ns) / 1000) as u32; + let msg = FrameMsg { + data: au.data, + capture_ns, + flags, + deadline: std::time::Instant::now() + interval, + encode_us, + }; + if frame_tx.send(msg).is_err() { + break 'outer; // send thread gone + } + sent += 1; + } + drop(frame_tx); + let _ = send_thread.join(); + tracing::info!(sent, "punktfunk/1 two-process stream complete"); + Ok(()) +} + /// One mode's capture/encode pipeline: (capturer, encoder, first frame, frame interval). /// Dropping the capturer tears down the PipeWire stream and the virtual output with it. type Pipeline = ( diff --git a/crates/punktfunk-host/src/main.rs b/crates/punktfunk-host/src/main.rs index 1a3d847..883efa3 100644 --- a/crates/punktfunk-host/src/main.rs +++ b/crates/punktfunk-host/src/main.rs @@ -223,6 +223,7 @@ fn real_main() -> Result<()> { bitrate_kbps: get("--bitrate") .and_then(|s| s.parse().ok()) .unwrap_or(20000), + bit_depth: get("--bit-depth").and_then(|s| s.parse().ok()).unwrap_or(8), }) } Some("-h") | Some("--help") | Some("help") | None => { diff --git a/crates/punktfunk-host/src/wgc_helper.rs b/crates/punktfunk-host/src/wgc_helper.rs index cd0ccdd..d580e89 100644 --- a/crates/punktfunk-host/src/wgc_helper.rs +++ b/crates/punktfunk-host/src/wgc_helper.rs @@ -15,7 +15,9 @@ use crate::capture::{dxgi::WinCaptureTarget, wgc::WgcCapturer, Capturer}; use crate::encode::{self, Codec}; use anyhow::{Context, Result}; -use std::io::Write; +use std::io::{Read, Write}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; pub struct HelperOptions { pub target_id: u32, @@ -24,11 +26,18 @@ pub struct HelperOptions { pub height: u32, pub fps: u32, pub bitrate_kbps: u32, + /// Negotiated encode bit depth (8, or 10 = HEVC Main10). HDR auto-upgrades to 10 from the + /// captured frame's `Rgb10a2` format regardless. 
+ pub bit_depth: u8, } /// AU framing magic + version, so the host can resync / detect a helper crash on its stdout stream. const AU_MAGIC: u32 = 0x5046_4155; // "PFAU" +/// Control byte the host writes on our stdin to force the next frame to be an IDR. Must match +/// `wgc_relay::CTL_KEYFRAME`. +const CTL_KEYFRAME: u8 = 0x01; + pub fn run(opts: HelperOptions) -> Result<()> { tracing::info!( target_id = opts.target_id, @@ -59,11 +68,34 @@ pub fn run(opts: HelperOptions) -> Result<()> { h, opts.fps, opts.bitrate_kbps as u64 * 1000, - false, // not cuda - 8, // bit depth: HDR auto-upgrades to Main10 from the Rgb10a2 frame + false, // not cuda + opts.bit_depth, // 8, or 10 = Main10 (HDR auto-upgrades from the Rgb10a2 frame regardless) ) .context("open NVENC")?; + // Control channel: the host writes a single byte on our stdin to force an IDR (client decode + // recovery), mirroring `enc.request_keyframe()` in the single-process path. A reader thread sets + // a flag the encode loop checks; stdin EOF (host gone) just stops the thread. + let kf = Arc::new(AtomicBool::new(false)); + { + let kf = kf.clone(); + std::thread::Builder::new() + .name("wgc-helper-ctl".into()) + .spawn(move || { + let mut stdin = std::io::stdin(); + let mut byte = [0u8; 1]; + while let Ok(n) = stdin.read(&mut byte) { + if n == 0 { + break; // host closed our stdin + } + if byte[0] == CTL_KEYFRAME { + kf.store(true, Ordering::Relaxed); + } + } + }) + .ok(); + } + // Binary stdout — lock it once + write framed AUs. A short write / broken pipe means the host // (parent) went away → exit cleanly so the host's relaunch watchdog can respawn us. let stdout = std::io::stdout(); @@ -71,6 +103,9 @@ pub fn run(opts: HelperOptions) -> Result<()> { let mut frame = first; loop { + if kf.swap(false, Ordering::Relaxed) { + enc.request_keyframe(); + } enc.submit(&frame).context("encoder submit")?; while let Some(au) = enc.poll().context("encoder poll")? { if write_au(&mut out, &au).is_err() {