feat(host/windows): two-process secure-desktop step 5 — DDA mux on Winlogon
`virtual_stream_relay` now muxes the AU source by input desktop. A DesktopWatcher (SYSTEM-only Winlogon-name poll) drives it: the user-session WGC helper relay feeds the normal (Default) desktop; the host's OWN DDA capturer+encoder — opened lazily on the first secure transition, on the same SudoVDA target with a no-op keepalive (the host still holds the real isolation owner) — captures the secure (Winlogon: UAC/lock/login) desktop that WGC can't see. Every switch latches "wait for IDR" and forces the now-active source to emit a keyframe (the two encoders keep independent infinite-GOP state, so the client must resume on an IDR); returning to the helper also drains its stale buffered AUs first. Reconfigure drops the stale-target DDA; keyframe requests route to the live source. Send path (FEC/seal/paced-send) unchanged. Also: wgc_relay gains try_recv (drain on switch-back); open_dda takes dims as args (avoids a closure borrow of the reassigned cur_mode); the forward! macro returns bool with `break 'outer` at the call site (no in-macro label hygiene). cfg-gated windows-only. Live validation (UAC switch over a session) pending. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -97,6 +97,12 @@ impl HelperRelay {
|
||||
self.rx.recv_timeout(dur)
|
||||
}
|
||||
|
||||
/// Non-blocking receive — used to drain stale buffered AUs (encoded while the secure desktop was
|
||||
/// the live source) before resuming the relay. `Ok` while AUs remain, `Err` once empty.
|
||||
pub fn try_recv(&self) -> Result<RelayAu, std::sync::mpsc::TryRecvError> {
|
||||
self.rx.try_recv()
|
||||
}
|
||||
|
||||
/// Ask the helper's encoder for an IDR on the next frame (client decode recovery). Best-effort:
|
||||
/// a write failure means the helper is gone — the caller's recv loop will see the disconnect.
|
||||
pub fn request_keyframe(&self) {
|
||||
|
||||
+195
-50
@@ -2213,12 +2213,14 @@ fn should_use_helper() -> bool {
|
||||
|
||||
/// Windows two-process video stream: the SYSTEM host creates the SudoVDA virtual output (and holds
|
||||
/// its keepalive = the sole topology/isolation owner), spawns the WGC helper in the user session to
|
||||
/// capture+encode it, and relays the helper's AUs onto the QUIC data plane via the same send thread
|
||||
/// as the single-process path. Reconfigure rebuilds the output + re-spawns the helper at the new
|
||||
/// mode; keyframe requests are forwarded to the helper's encoder over its control channel.
|
||||
///
|
||||
/// Step 4 (this function): the normal-desktop relay. Step 5 adds the DesktopWatcher-driven mux that
|
||||
/// switches to the host's own DDA encoder on the secure (Winlogon) desktop.
|
||||
/// capture+encode the NORMAL desktop, and relays the helper's AUs onto the QUIC data plane via the
|
||||
/// same send thread as the single-process path. A [`DesktopWatcher`](crate::capture::desktop_watch)
|
||||
/// muxes the source: while the input desktop is Winlogon (UAC / lock / login — which WGC can't
|
||||
/// capture), the host captures it with its OWN DDA encoder; back on Default it resumes the relay.
|
||||
/// Every source switch latches a "wait for IDR" so the client's decoder resumes on a keyframe (the
|
||||
/// two encoders keep independent infinite-GOP state). Reconfigure rebuilds the output + re-spawns the
|
||||
/// helper at the new mode (and drops the stale-target DDA); keyframe requests forward to the active
|
||||
/// source.
|
||||
#[cfg(target_os = "windows")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn virtual_stream_relay(
|
||||
@@ -2234,7 +2236,9 @@ fn virtual_stream_relay(
|
||||
probe_rx: std::sync::mpsc::Receiver<ProbeRequest>,
|
||||
probe_result_tx: tokio::sync::mpsc::UnboundedSender<ProbeResult>,
|
||||
) -> Result<()> {
|
||||
use crate::capture::dxgi::WinCaptureTarget;
|
||||
use crate::capture::wgc_relay::HelperRelay;
|
||||
use crate::capture::Capturer; // trait methods (set_active/next_frame) on the concrete DuplCapturer
|
||||
tracing::info!(
|
||||
?mode,
|
||||
bitrate_kbps,
|
||||
@@ -2245,8 +2249,9 @@ fn virtual_stream_relay(
|
||||
let mut vd = crate::vdisplay::open(compositor)?;
|
||||
|
||||
// Create the SudoVDA output + spawn a helper capturing it by GDI name. Returns the keepalive
|
||||
// (held for the output's life — the sole isolation owner) and the running relay.
|
||||
type Built = (Box<dyn Send>, HelperRelay);
|
||||
// (held for the output's life — the sole isolation owner), the running relay, the capture target
|
||||
// (so the host can also open DDA on it for the secure desktop), and the achieved refresh.
|
||||
type Built = (Box<dyn Send>, HelperRelay, WinCaptureTarget, u32);
|
||||
let build = |vd: &mut Box<dyn crate::vdisplay::VirtualDisplay>,
|
||||
mode: punktfunk_core::Mode|
|
||||
-> Result<Built> {
|
||||
@@ -2266,10 +2271,49 @@ fn virtual_stream_relay(
|
||||
bit_depth,
|
||||
)
|
||||
.context("spawn WGC helper")?;
|
||||
Ok((vout.keepalive, relay))
|
||||
Ok((vout.keepalive, relay, target, effective_hz))
|
||||
};
|
||||
|
||||
let (mut _keepalive, mut relay) = build(&mut vd, mode)?;
|
||||
let (mut _keepalive, mut relay, mut target, mut effective_hz) = build(&mut vd, mode)?;
|
||||
let mut cur_mode = mode;
|
||||
|
||||
// The host's own DDA capturer+encoder for the SECURE (Winlogon) desktop, which WGC — and thus the
|
||||
// helper — cannot capture. Opened lazily on the first secure transition (so a session that never
|
||||
// hits a UAC/lock screen never pays for a second NVENC session), then kept for fast re-switch.
|
||||
struct DdaPipe {
|
||||
cap: Box<dyn crate::capture::Capturer>,
|
||||
enc: Box<dyn crate::encode::Encoder>,
|
||||
frame: crate::capture::CapturedFrame,
|
||||
}
|
||||
// Note: takes the dimensions as args rather than capturing `cur_mode` — `cur_mode` is reassigned
|
||||
// on reconfig, and a closure holding a shared borrow of it for the whole fn would forbid that.
|
||||
let open_dda = |target: &WinCaptureTarget, w: u32, h: u32, hz: u32| -> Result<DdaPipe> {
|
||||
// The host already holds the real keepalive (sole isolation owner), so DDA gets a no-op one.
|
||||
let mut cap = crate::capture::dxgi::DuplCapturer::open(
|
||||
target.clone(),
|
||||
Some((w, h, hz)),
|
||||
Box::new(()),
|
||||
)
|
||||
.context("open DDA for secure desktop")?;
|
||||
cap.set_active(true);
|
||||
let frame = cap.next_frame().context("DDA first frame")?;
|
||||
let enc = crate::encode::open_video(
|
||||
crate::encode::Codec::H265,
|
||||
frame.format,
|
||||
frame.width,
|
||||
frame.height,
|
||||
hz,
|
||||
bitrate_kbps as u64 * 1000,
|
||||
frame.is_cuda(),
|
||||
bit_depth,
|
||||
)
|
||||
.context("open NVENC for DDA")?;
|
||||
Ok(DdaPipe {
|
||||
cap: Box::new(cap),
|
||||
enc,
|
||||
frame,
|
||||
})
|
||||
};
|
||||
|
||||
let perf = std::env::var("PUNKTFUNK_PERF").is_ok();
|
||||
let burst_cap = std::env::var("PUNKTFUNK_PACE_BURST_KB")
|
||||
@@ -2299,12 +2343,52 @@ fn virtual_stream_relay(
|
||||
})
|
||||
.context("spawn send thread")?;
|
||||
|
||||
let mut interval = std::time::Duration::from_secs_f64(1.0 / mode.refresh_hz.max(1) as f64);
|
||||
// The authoritative Default↔Winlogon signal (requires SYSTEM to read the Winlogon desktop name).
|
||||
let watcher = crate::capture::desktop_watch::DesktopWatcher::start();
|
||||
|
||||
let mut interval = std::time::Duration::from_secs_f64(1.0 / effective_hz.max(1) as f64);
|
||||
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(seconds as u64);
|
||||
let mut sent: u64 = 0;
|
||||
// Mux state: which source is live, the lazily-opened DDA pipe, a DDA pacing clock, and a
|
||||
// "wait for the next IDR before forwarding" latch set on every source switch (the client's
|
||||
// decoder must resume on a keyframe — the two encoders keep independent infinite-GOP state).
|
||||
let mut dda: Option<DdaPipe> = None;
|
||||
let mut on_secure = false;
|
||||
let mut next = std::time::Instant::now();
|
||||
let mut await_idr = false;
|
||||
|
||||
// Build a FrameMsg + hand it to the send thread; returns false if the send thread is gone (caller
|
||||
// breaks the loop). Kept as a macro (not a closure) so each use borrows `frame_tx`/`sent`/`interval`
|
||||
// at its own site without a long-lived capture, and `break 'outer` stays a literal at the call site
|
||||
// (a `break 'outer` inside the macro body risks label-hygiene resolution failures).
|
||||
macro_rules! forward {
|
||||
($data:expr, $capture_ns:expr, $keyframe:expr) => {{
|
||||
let flags = if $keyframe {
|
||||
(FLAG_PIC | FLAG_SOF) as u32
|
||||
} else {
|
||||
FLAG_PIC as u32
|
||||
};
|
||||
let capture_ns = $capture_ns;
|
||||
let encode_us = (now_ns().saturating_sub(capture_ns) / 1000) as u32;
|
||||
let msg = FrameMsg {
|
||||
data: $data,
|
||||
capture_ns,
|
||||
flags,
|
||||
deadline: std::time::Instant::now() + interval,
|
||||
encode_us,
|
||||
};
|
||||
let ok = frame_tx.send(msg).is_ok();
|
||||
if ok {
|
||||
sent += 1;
|
||||
}
|
||||
ok
|
||||
}};
|
||||
}
|
||||
|
||||
'outer: while !stop.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
|
||||
// Mode switch: rebuild the output + re-spawn the helper at the new mode (drop the old relay +
|
||||
// keepalive only after the new pair is up, so a failed rebuild keeps the current stream).
|
||||
// keepalive only after the new pair is up, so a failed rebuild keeps the current stream). The
|
||||
// DDA pipe (on the old target) is dropped — it reopens on the next secure transition.
|
||||
let mut want = None;
|
||||
while let Ok(m) = reconfig.try_recv() {
|
||||
want = Some(m);
|
||||
@@ -2312,11 +2396,14 @@ fn virtual_stream_relay(
|
||||
if let Some(new_mode) = want {
|
||||
tracing::info!(?new_mode, "two-process: rebuilding for mode switch");
|
||||
match build(&mut vd, new_mode) {
|
||||
Ok((ka, rl)) => {
|
||||
Ok((ka, rl, tg, hz)) => {
|
||||
relay = rl; // drops the old relay (kills old helper) ...
|
||||
_keepalive = ka; // ... then releases the old output
|
||||
interval =
|
||||
std::time::Duration::from_secs_f64(1.0 / new_mode.refresh_hz.max(1) as f64);
|
||||
target = tg;
|
||||
effective_hz = hz;
|
||||
cur_mode = new_mode;
|
||||
dda = None; // old-target DDA is stale; reopen on next secure
|
||||
interval = std::time::Duration::from_secs_f64(1.0 / hz.max(1) as f64);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(error = %format!("{e:#}"), ?new_mode,
|
||||
@@ -2324,56 +2411,114 @@ fn virtual_stream_relay(
|
||||
}
|
||||
}
|
||||
}
|
||||
// Forward client decode-recovery keyframe requests to the helper's encoder.
|
||||
// Coalesce client decode-recovery keyframe requests and forward to the active source.
|
||||
let mut want_kf = false;
|
||||
while keyframe.try_recv().is_ok() {
|
||||
want_kf = true;
|
||||
}
|
||||
|
||||
// Source mux: capture the secure (Winlogon) desktop via the host's DDA, the normal desktop via
|
||||
// the helper relay. On a switch, latch await_idr + force the now-active source to emit an IDR
|
||||
// so the client resumes cleanly.
|
||||
let secure = watcher.is_secure();
|
||||
if secure != on_secure {
|
||||
on_secure = secure;
|
||||
await_idr = true;
|
||||
tracing::info!(
|
||||
to = if secure {
|
||||
"secure(DDA)"
|
||||
} else {
|
||||
"normal(WGC relay)"
|
||||
},
|
||||
"two-process: source switch"
|
||||
);
|
||||
if secure {
|
||||
if dda.is_none() {
|
||||
match open_dda(&target, cur_mode.width, cur_mode.height, effective_hz) {
|
||||
Ok(p) => dda = Some(p),
|
||||
Err(e) => {
|
||||
tracing::error!(error = %format!("{e:#}"),
|
||||
"two-process: DDA open failed — secure desktop will freeze on last frame");
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(d) = dda.as_mut() {
|
||||
d.enc.request_keyframe();
|
||||
}
|
||||
next = std::time::Instant::now();
|
||||
} else {
|
||||
// Returning to the helper: drain stale buffered AUs (encoded while we ignored it) and
|
||||
// force a fresh IDR; await_idr then skips the stale deltas until that IDR arrives.
|
||||
while relay.try_recv().is_ok() {}
|
||||
relay.request_keyframe();
|
||||
}
|
||||
}
|
||||
if want_kf {
|
||||
tracing::debug!("two-process: forwarding keyframe request to helper");
|
||||
relay.request_keyframe();
|
||||
if secure {
|
||||
if let Some(d) = dda.as_mut() {
|
||||
d.enc.request_keyframe();
|
||||
}
|
||||
} else {
|
||||
relay.request_keyframe();
|
||||
}
|
||||
await_idr = true;
|
||||
}
|
||||
|
||||
// Pull the next relayed AU. A timeout means the helper stalled (or is mid-respawn); loop so
|
||||
// reconfig/keyframe/stop still get serviced. Disconnected means the helper exited — end the
|
||||
// stream (step 6 adds a relaunch watchdog; for now a dead helper ends the session).
|
||||
let au = match relay.recv_timeout(std::time::Duration::from_millis(500)) {
|
||||
Ok(au) => au,
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
|
||||
if stop.load(Ordering::SeqCst) {
|
||||
if secure {
|
||||
// DDA capture+encode for the secure desktop, paced to the frame interval.
|
||||
let Some(d) = dda.as_mut() else {
|
||||
std::thread::sleep(interval);
|
||||
continue;
|
||||
};
|
||||
if let Some(f) = d.cap.try_latest().context("DDA capture")? {
|
||||
d.frame = f;
|
||||
}
|
||||
let capture_ns = now_ns();
|
||||
d.enc.submit(&d.frame).context("DDA encoder submit")?;
|
||||
next += interval;
|
||||
while let Some(au) = d.enc.poll().context("DDA encoder poll")? {
|
||||
if await_idr && !au.keyframe {
|
||||
continue;
|
||||
}
|
||||
await_idr = false;
|
||||
if !forward!(au.data, capture_ns, au.keyframe) {
|
||||
break 'outer; // send thread gone
|
||||
}
|
||||
}
|
||||
match next.checked_duration_since(std::time::Instant::now()) {
|
||||
Some(dur) => std::thread::sleep(dur),
|
||||
None => next = std::time::Instant::now(),
|
||||
}
|
||||
} else {
|
||||
// Relay the helper's AUs for the normal desktop. Timeout → keep servicing the loop;
|
||||
// Disconnected → the helper exited (step 6 adds the relaunch watchdog).
|
||||
let au = match relay.recv_timeout(std::time::Duration::from_millis(500)) {
|
||||
Ok(au) => au,
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
|
||||
if stop.load(Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
tracing::warn!("two-process: no AU from helper within 500ms");
|
||||
continue;
|
||||
}
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
|
||||
tracing::error!("two-process: WGC helper exited — ending stream");
|
||||
break;
|
||||
}
|
||||
tracing::warn!("two-process: no AU from helper within 500ms");
|
||||
continue;
|
||||
};
|
||||
if await_idr && !au.keyframe {
|
||||
continue; // skip stale deltas until the post-switch IDR
|
||||
}
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
|
||||
tracing::error!("two-process: WGC helper exited — ending stream");
|
||||
break;
|
||||
await_idr = false;
|
||||
// The helper's pts_ns is on this machine's monotonic clock (same `now_ns()` source).
|
||||
if !forward!(au.data, au.pts_ns, au.keyframe) {
|
||||
break 'outer; // send thread gone
|
||||
}
|
||||
};
|
||||
let flags = if au.keyframe {
|
||||
(FLAG_PIC | FLAG_SOF) as u32
|
||||
} else {
|
||||
FLAG_PIC as u32
|
||||
};
|
||||
// The helper's pts_ns is on this machine's monotonic clock (same `now_ns()` source), so it is
|
||||
// directly usable as the capture timestamp. encode_us = pipe-relay latency from capture.
|
||||
let capture_ns = au.pts_ns;
|
||||
let encode_us = (now_ns().saturating_sub(capture_ns) / 1000) as u32;
|
||||
let msg = FrameMsg {
|
||||
data: au.data,
|
||||
capture_ns,
|
||||
flags,
|
||||
deadline: std::time::Instant::now() + interval,
|
||||
encode_us,
|
||||
};
|
||||
if frame_tx.send(msg).is_err() {
|
||||
break 'outer; // send thread gone
|
||||
}
|
||||
sent += 1;
|
||||
}
|
||||
drop(frame_tx);
|
||||
let _ = send_thread.join();
|
||||
drop(watcher);
|
||||
tracing::info!(sent, "punktfunk/1 two-process stream complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user