punktfunk/clients/windows/src/session.rs
enricobuehler 0cc36fa130
feat(windows-client): D3D11VA zero-copy hw decode + HDR10 present + GUI polish
The client was pure software HEVC decode + CPU swscale->RGBA + a full-frame
dynamic-texture upload every frame -- the reason performance was poor on a GPU
box (the GPU sat idle while the CPU churned). This adds a hardware path, HDR,
and a GUI pass.

Performance -- D3D11VA zero-copy:
- gpu.rs (new): one D3D11 device (hardware + VIDEO_SUPPORT, WARP fallback,
  multithread-protected) shared by decoder and presenter via a Send/Sync
  OnceLock. Sharing is mandatory -- a decoded texture is only bindable on the
  device that created it. windows-rs COM interfaces are !Send/!Sync, so the
  unsafe impl is sound only under the multithread protection + disjoint
  decode(video ctx)/present(immediate ctx) split.
- video.rs: D3d11vaDecoder (raw FFI mirroring the Linux VAAPI module). The
  COM-typed AVD3D11VA{Device,Frames}Context are declared here (stable FFmpeg
  ABI) to avoid ffmpeg-sys binding the d3d11 headers; get_format builds a frames
  ctx with BindFlags=SHADER_RESOURCE so the NV12/P010 array slices are
  sampleable. av_frame_clone guard keeps each surface out of the reuse pool
  until the presenter drops it. Software decode stays as the fallback
  (DecoderPref Auto/Hardware/Software; auto falls back on init/decode error).
- present.rs: shared device; per-plane SRVs over the array slice
  (NV12->R8/R8G8, P010->R16/R16G16) + three pixel shaders (RGBA passthrough,
  NV12/BT.709, P010/BT.2020-PQ). present() now takes the frame by value so the
  GPU surface survives re-presents.

HDR:
- Detected in-band (transfer == SMPTE2084), same signal as the other clients.
  Swapchain flips to R10G10B10A2 + ST.2084 + HDR10 metadata. New Settings toggle
  gates advertising VIDEO_CAP_10BIT|HDR; host still gates 10-bit behind its own
  PUNKTFUNK_10BIT + actual-HDR-content checks.

GUI (windows-reactor):
- Host cards with accent-monogram avatars + colored status pills, InfoBar for
  errors/pairing hints, ToggleSwitch settings (+ HDR, decoder, bitrate), button
  icons, a richer connecting screen, and a stream HUD with GPU/CPU-decode + HDR
  status chips.

Not yet on-glass validated: the Linux dev box can't compile the cfg(windows)
code (ffmpeg/windows crates unfetched; WARP has no hw decode) -- only
cargo fmt checks it here. API shapes verified against the windows-rs/reactor
source and the YUV->RGB coefficients checked by hand, but D3D11VA + shaders +
the GUI need a real build (Windows CI / build VM) and on-glass test on the RTX
box. The host-side HDR encode path is unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 23:16:07 +00:00
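
The commit message notes that the YUV->RGB coefficients were checked by hand. A CPU reference like the one below is one way to repeat such a check for the NV12/BT.709 case. It is a minimal sketch, not the shader from present.rs: the function name `bt709_limited_to_rgb` is hypothetical, and it assumes 8-bit limited ("video") range input, which the commit message does not state.

```rust
/// Convert one 8-bit BT.709 Y'CbCr sample to 8-bit R'G'B'.
/// Assumption (not from the source): limited range, Y in 16..=235, Cb/Cr in 16..=240.
fn bt709_limited_to_rgb(y: u8, cb: u8, cr: u8) -> [u8; 3] {
    // BT.709 luma weights: Kr = 0.2126, Kb = 0.0722, Kg = 1 - Kr - Kb.
    const KR: f32 = 0.2126;
    const KB: f32 = 0.0722;
    const KG: f32 = 1.0 - KR - KB;

    // Normalize: luma spans 219 code values, chroma spans 224 centered on 128.
    let y = (f32::from(y) - 16.0) / 219.0;
    let cb = (f32::from(cb) - 128.0) / 224.0;
    let cr = (f32::from(cr) - 128.0) / 224.0;

    // Invert Y = Kr*R + Kg*G + Kb*B, Cb = (B - Y) / (2(1 - Kb)), Cr = (R - Y) / (2(1 - Kr)).
    let r = y + 2.0 * (1.0 - KR) * cr;
    let b = y + 2.0 * (1.0 - KB) * cb;
    let g = (y - KR * r - KB * b) / KG;

    // Clamp to the displayable range and quantize to 8 bits.
    let to_u8 = |v: f32| (v.clamp(0.0, 1.0) * 255.0).round() as u8;
    [to_u8(r), to_u8(g), to_u8(b)]
}
```

Video black (16, 128, 128) and video white (235, 128, 128) should map to the RGB extremes; comparing a handful of such samples against the shader output is a cheap sanity check before an on-glass test.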

294 lines
12 KiB
Rust

//! Session controller: one worker thread runs connect → pump (video pull + decode, audio
//! pull + Opus decode, stats), feeding the UI over channels. The UI keeps the
//! `Arc<NativeClient>` from the `Connected` event for direct input sends (no extra hop on
//! the input path) — `NativeClient` is `Sync`, planes stay one-consumer-per-thread:
//! video+audio here, rumble+hidout on the gamepad thread.
//!
//! Ported from the GTK Linux client; the platform-specific pieces are the video decoder
//! (D3D11VA hardware decode with a software fallback here) and the audio backend (WASAPI).
//! The pump body is identical.
use crate::audio;
use crate::video::{DecodedFrame, Decoder, DecoderPref};
use punktfunk_core::client::NativeClient;
use punktfunk_core::config::{CompositorPref, GamepadPref, Mode};
use punktfunk_core::PunktfunkError;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

pub struct SessionParams {
    pub host: String,
    pub port: u16,
    pub mode: Mode,
    pub compositor: CompositorPref,
    pub gamepad: GamepadPref,
    pub bitrate_kbps: u32,
    /// Stream the default microphone to the host's virtual mic source.
    pub mic_enabled: bool,
    /// Advertise 10-bit + HDR10 so the host may upgrade HDR content to a Main10/PQ stream.
    pub hdr_enabled: bool,
    /// Which video decode backend to use (auto/hardware/software).
    pub decoder: DecoderPref,
    /// Pinned host fingerprint; `None` = trust on first use (caller persists the observed one).
    pub pin: Option<[u8; 32]>,
    pub identity: (String, String),
}

#[derive(Clone, Copy, Default, PartialEq)]
pub struct Stats {
    pub fps: f32,
    pub mbps: f32,
    pub decode_ms: f32,
    /// Median capture→decoded latency over the last window (host-clock corrected).
    pub latency_ms: f32,
    /// True when decoding on the GPU (D3D11VA zero-copy) vs. CPU (software).
    pub hardware: bool,
    /// True when the stream is BT.2020 PQ HDR10 (last decoded frame).
    pub hdr: bool,
}

pub enum SessionEvent {
    Connected {
        connector: Arc<NativeClient>,
        mode: Mode,
        fingerprint: [u8; 32],
    },
    /// `trust_rejected` is set when the connect failed the TLS trust check (a `Crypto`
    /// error): for a pinned connect this is the fingerprint-changed signal, so the UI can
    /// offer a re-pair (PIN) path rather than a dead-end error.
    Failed {
        msg: String,
        trust_rejected: bool,
    },
    Ended(Option<String>),
    Stats(Stats),
}

pub struct SessionHandle {
    pub events: async_channel::Receiver<SessionEvent>,
    pub frames: async_channel::Receiver<DecodedFrame>,
    pub stop: Arc<AtomicBool>,
}

pub fn start(params: SessionParams) -> SessionHandle {
    let (ev_tx, ev_rx) = async_channel::unbounded();
    // Tiny frame queue, newest wins: force_send displaces the oldest when the UI lags.
    let (frame_tx, frame_rx) = async_channel::bounded(2);
    let stop = Arc::new(AtomicBool::new(false));
    let stop_w = stop.clone();
    std::thread::Builder::new()
        .name("punktfunk-session".into())
        .spawn(move || pump(params, ev_tx, frame_tx, stop_w))
        .expect("spawn session thread");
    SessionHandle {
        events: ev_rx,
        frames: frame_rx,
        stop,
    }
}

fn now_ns() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or(0)
}

fn pump(
    params: SessionParams,
    ev_tx: async_channel::Sender<SessionEvent>,
    frame_tx: async_channel::Sender<DecodedFrame>,
    stop: Arc<AtomicBool>,
) {
    let connector = match NativeClient::connect(
        &params.host,
        params.port,
        params.mode,
        params.compositor,
        params.gamepad,
        params.bitrate_kbps,
        // Advertise 10-bit + HDR10 (when enabled): the presenter handles BT.2020 PQ frames (P010 on
        // the GPU path, X2BGR10 on software), so the host may upgrade HDR content to a Main10/PQ
        // stream — it still only does so for actual HDR content with its own 10-bit gate. 8-bit SDR
        // is unaffected. A client that turns HDR off advertises `0` and always gets the 8-bit stream.
        if params.hdr_enabled {
            punktfunk_core::quic::VIDEO_CAP_10BIT | punktfunk_core::quic::VIDEO_CAP_HDR
        } else {
            0
        },
        None, // launch: the Windows client has no library picker yet
        params.pin,
        Some(params.identity),
        Duration::from_secs(15),
    ) {
        Ok(c) => Arc::new(c),
        Err(e) => {
            let trust_rejected = matches!(e, PunktfunkError::Crypto);
            let msg = match e {
                PunktfunkError::Crypto => {
                    "Host identity rejected — wrong fingerprint, or the host requires pairing"
                        .to_string()
                }
                PunktfunkError::Timeout => "Connection timed out".to_string(),
                other => format!("Connect failed: {other:?}"),
            };
            let _ = ev_tx.send_blocking(SessionEvent::Failed {
                msg,
                trust_rejected,
            });
            return;
        }
    };
    let _ = ev_tx.send_blocking(SessionEvent::Connected {
        connector: connector.clone(),
        mode: connector.mode(),
        fingerprint: connector.host_fingerprint,
    });
    let mut decoder = match Decoder::new(params.decoder) {
        Ok(d) => d,
        Err(e) => {
            let _ = ev_tx.send_blocking(SessionEvent::Ended(Some(format!("video decoder: {e}"))));
            return;
        }
    };
    let mut hardware = decoder.is_hardware();
    let mut hdr = false;
    // Audio is best-effort: a session without it still streams. Gamepads are the
    // app-lifetime service's job (the UI attaches it on Connected).
    let player = audio::AudioPlayer::spawn()
        .map_err(|e| tracing::warn!(error = %e, "audio disabled"))
        .ok();
    let mut opus_dec = opus::Decoder::new(48_000, opus::Channels::Stereo)
        .map_err(|e| tracing::warn!(error = %e, "opus decoder failed — audio disabled"))
        .ok();
    let _mic = params
        .mic_enabled
        .then(|| {
            audio::MicStreamer::spawn(connector.clone())
                .map_err(|e| tracing::warn!(error = %e, "mic uplink disabled"))
                .ok()
        })
        .flatten();
    let clock_offset = connector.clock_offset_ns;
    let mut total_frames = 0u64;
    let mut window_start = Instant::now();
    let mut frames_n = 0u32;
    let mut bytes_n = 0u64;
    let mut decode_us_sum = 0u64;
    let mut lat_us: Vec<u64> = Vec::with_capacity(256);
    let mut pcm = vec![0f32; 5760 * 2]; // decode scratch: max Opus frame (120 ms stereo)
    // Loss recovery: watch the host→client unrecoverable-drop count and ask for an IDR when it climbs.
    let mut last_dropped = connector.frames_dropped();
    let mut last_kf_req: Option<Instant> = None;
    let end: Option<String> = loop {
        if stop.load(Ordering::SeqCst) {
            break None;
        }
        match connector.next_frame(Duration::from_millis(4)) {
            Ok(frame) => {
                let t0 = Instant::now();
                match decoder.decode(&frame.data) {
                    Ok(Some(decoded)) => {
                        total_frames += 1;
                        hdr = decoded.hdr();
                        // The backend can demote D3D11VA → software mid-session on a hardware error.
                        hardware = decoder.is_hardware();
                        if total_frames == 1 {
                            let (w, h) = decoded.dims();
                            tracing::info!(
                                width = w,
                                height = h,
                                path = if hardware { "d3d11va" } else { "software" },
                                hdr,
                                "first frame decoded"
                            );
                        }
                        // Latency: our wall clock expressed in the host's capture clock,
                        // minus the host-stamped capture pts (same math as client-rs).
                        let lat = (now_ns() as i128 + clock_offset as i128 - frame.pts_ns as i128)
                            .max(0) as u64;
                        if lat > 0 && lat < 10_000_000_000 {
                            lat_us.push(lat / 1000);
                        }
                        decode_us_sum += t0.elapsed().as_micros() as u64;
                        frames_n += 1;
                        bytes_n += frame.data.len() as u64;
                        let _ = frame_tx.force_send(decoded);
                    }
                    Ok(None) => {}
                    // Survivable (loss until the next IDR/RFI recovery) — keep feeding.
                    Err(e) => tracing::debug!(error = %e, "decode error (recovering)"),
                }
            }
            Err(PunktfunkError::NoFrame) => {}
            Err(PunktfunkError::Closed) => break Some("Host ended the session".to_string()),
            Err(e) => break Some(format!("session: {e:?}")),
        }
        // Loss recovery: under infinite GOP the only recovery keyframe is one we request. The
        // reassembler drops unrecoverable AUs (frames_dropped); the decoder conceals the
        // reference-missing delta frames that follow and returns Ok, so keying off a decode error
        // rarely fires. Request an IDR when the drop count climbs, throttled.
        let dropped = connector.frames_dropped();
        if dropped > last_dropped {
            last_dropped = dropped;
            let now = Instant::now();
            if last_kf_req.is_none_or(|t| now.duration_since(t) >= Duration::from_millis(100)) {
                last_kf_req = Some(now);
                let _ = connector.request_keyframe();
                tracing::debug!(dropped, "requested keyframe (loss recovery)");
            }
        }
        // Drain audio between frames (packets land every 5 ms; the queue holds 320 ms).
        while let Ok(pkt) = connector.next_audio(Duration::ZERO) {
            if let (Some(player), Some(dec)) = (&player, opus_dec.as_mut()) {
                match dec.decode_float(&pkt.data, &mut pcm, false) {
                    Ok(samples) => player.push(pcm[..samples * 2].to_vec()),
                    Err(e) => tracing::debug!(error = %e, "opus decode"),
                }
            }
        }
        if window_start.elapsed() >= Duration::from_secs(1) {
            let secs = window_start.elapsed().as_secs_f32();
            lat_us.sort_unstable();
            let p50 = lat_us.get(lat_us.len() / 2).copied().unwrap_or(0);
            tracing::debug!(
                fps = frames_n,
                lat_p50_us = p50,
                total_frames,
                "stream window"
            );
            let _ = ev_tx.try_send(SessionEvent::Stats(Stats {
                fps: frames_n as f32 / secs,
                mbps: bytes_n as f32 * 8.0 / 1e6 / secs,
                decode_ms: if frames_n > 0 {
                    decode_us_sum as f32 / frames_n as f32 / 1000.0
                } else {
                    0.0
                },
                latency_ms: p50 as f32 / 1000.0,
                hardware,
                hdr,
            }));
            window_start = Instant::now();
            frames_n = 0;
            bytes_n = 0;
            decode_us_sum = 0;
            lat_us.clear();
        }
    };
    tracing::info!(
        total_frames,
        reason = end.as_deref().unwrap_or("user"),
        "session ended"
    );
    stop.store(true, Ordering::SeqCst);
    let _ = ev_tx.send_blocking(SessionEvent::Ended(end));
}
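
The pump above does two pieces of bookkeeping inline: the per-frame latency sample with its per-window median, and the throttled keyframe request. The sketch below pulls both out as small pure helpers so the arithmetic can be unit-tested without a host connection. The names `latency_us`, `window_p50`, and `KeyframeThrottle` are hypothetical and do not exist in session.rs, and the `i64` type for the clock offset is an assumption (the source only shows it being cast to `i128`).

```rust
use std::time::{Duration, Instant};

/// Capture-to-decode latency in microseconds, or `None` for an implausible sample.
/// Same math as the pump: local wall clock shifted into the host clock, minus the capture pts.
/// Assumption: the clock offset is a signed nanosecond count (`i64` here).
fn latency_us(now_ns: u64, clock_offset_ns: i64, pts_ns: u64) -> Option<u64> {
    let lat = (now_ns as i128 + clock_offset_ns as i128 - pts_ns as i128).max(0) as u64;
    // Same plausibility window as the pump: strictly positive and under 10 s.
    (lat > 0 && lat < 10_000_000_000).then(|| lat / 1000)
}

/// Upper median of a window (sorts in place); 0 for an empty window, as in the pump.
fn window_p50(samples: &mut [u64]) -> u64 {
    samples.sort_unstable();
    samples.get(samples.len() / 2).copied().unwrap_or(0)
}

/// Hypothetical helper: allows at most one keyframe request per `min_gap`.
struct KeyframeThrottle {
    last: Option<Instant>,
    min_gap: Duration,
}

impl KeyframeThrottle {
    fn new(min_gap: Duration) -> Self {
        Self { last: None, min_gap }
    }

    /// Returns true, and records `now`, when a request may be sent.
    fn allow(&mut self, now: Instant) -> bool {
        let ok = match self.last {
            None => true,
            Some(t) => now.duration_since(t) >= self.min_gap,
        };
        if ok {
            self.last = Some(now);
        }
        ok
    }
}
```

Passing `now` into `allow` rather than calling `Instant::now()` inside it keeps the throttle deterministic under test. A suppressed request leaves the recorded time untouched, which matches the pump: `last_kf_req` only moves when a request is actually sent.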