diff --git a/Cargo.lock b/Cargo.lock index 8c69330..b50ce55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2585,6 +2585,23 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "punktfunk-client-windows" +version = "0.0.1" +dependencies = [ + "anyhow", + "async-channel", + "ffmpeg-next", + "mdns-sd", + "opus", + "punktfunk-core", + "serde", + "serde_json", + "tracing", + "tracing-subscriber", + "wasapi", +] + [[package]] name = "punktfunk-core" version = "0.0.1" diff --git a/Cargo.toml b/Cargo.toml index 112a196..a13c62d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "crates/punktfunk-host", "crates/punktfunk-client-rs", "crates/punktfunk-client-linux", + "crates/punktfunk-client-windows", "crates/punktfunk-android", "tools/latency-probe", "tools/loss-harness", diff --git a/crates/punktfunk-client-windows/Cargo.toml b/crates/punktfunk-client-windows/Cargo.toml new file mode 100644 index 0000000..870b5e8 --- /dev/null +++ b/crates/punktfunk-client-windows/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "punktfunk-client-windows" +description = "Native Windows punktfunk/1 client — winit/D3D11 shell, FFmpeg decode, WASAPI audio, SDL3 gamepads" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +authors.workspace = true +repository.workspace = true + +[[bin]] +name = "punktfunk-client" +path = "src/main.rs" + +# Everything is Windows-gated so `cargo build --workspace` stays green on Linux/macOS (the +# other native clients live in crates/punktfunk-client-linux and clients/apple); on other +# platforms this builds as a stub binary. Mirrors the Linux client's cfg(target_os="linux") +# gating exactly. +[target.'cfg(windows)'.dependencies] +# The protocol core, linked directly (no C ABI) — same as the GTK Linux client. NativeClient +# is Sync (mutexed plane receivers), so it drops into a UI app cleanly. 
+punktfunk-core = { path = "../punktfunk-core", features = ["quic"] } + +# Video decode (same FFmpeg pin as the host/Linux client) — software HEVC on the GPU-less dev +# box; D3D11VA hardware decode is a follow-up for the real-GPU box. +ffmpeg-next = "8" +opus = "0.3" + +# Audio render + mic capture (the WASAPI analogue of the Linux client's PipeWire backend). +wasapi = "0.23" + +mdns-sd = "0.20" +async-channel = "2" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +anyhow = "1" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/crates/punktfunk-client-windows/src/audio.rs b/crates/punktfunk-client-windows/src/audio.rs new file mode 100644 index 0000000..850faa1 --- /dev/null +++ b/crates/punktfunk-client-windows/src/audio.rs @@ -0,0 +1,285 @@ +//! Audio: playback (decoded PCM → a WASAPI shared-mode render stream) and the microphone +//! uplink (WASAPI capture → Opus → 0xCB datagrams, the inverse of the host's virtual mic). +//! +//! The WASAPI analogue of the Linux client's PipeWire backend. Playback mirrors the host's +//! virtual-mic producer's adaptive jitter buffer: the session pump pushes 5 ms Opus-decoded +//! chunks on the network clock; the WASAPI render thread pulls whole event-driven quanta on +//! the device clock. Prime to ~3 quanta before producing, cap the ring so latency stays +//! bounded, re-prime after a real drain. +//! +//! WASAPI objects are COM-apartment-bound and not `Send`, so they live on a dedicated thread +//! (the same discipline as the host's `wasapi_cap`); only the channel + stop flag + join +//! handle cross the boundary. 
+ +use anyhow::{anyhow, Context, Result}; +use punktfunk_core::client::NativeClient; +use std::collections::VecDeque; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{Receiver, SyncSender, TrySendError}; +use std::sync::Arc; +use std::time::Duration; +use wasapi::{Direction, SampleType, StreamMode, WaveFormat}; + +const SAMPLE_RATE: usize = 48_000; +const CHANNELS: usize = 2; +/// 48 kHz stereo f32: 2 channels * 4 bytes = 8 bytes per frame. +const BLOCK_ALIGN: usize = CHANNELS * 4; +/// Mic frames are 20 ms (960 samples/channel) — any size ≤ 120 ms is fine host-side. +const MIC_FRAME: usize = 960; + +pub struct AudioPlayer { + pcm_tx: SyncSender>, + stop: Arc, + thread: Option>, +} + +impl AudioPlayer { + /// Spawn the WASAPI render thread. Failure (no render endpoint on this box) is + /// survivable — the caller streams video-only. + pub fn spawn() -> Result { + // 64 × 5 ms = 320 ms of slack between the pump and the WASAPI loop. + let (pcm_tx, pcm_rx) = std::sync::mpsc::sync_channel::>(64); + let stop = Arc::new(AtomicBool::new(false)); + let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel::>(1); + let stop_t = stop.clone(); + let thread = std::thread::Builder::new() + .name("punktfunk-audio".into()) + .spawn(move || { + if let Err(e) = render_thread(pcm_rx, stop_t, ready_tx) { + tracing::warn!(error = format!("{e:#}"), "audio playback thread ended"); + } + }) + .context("spawn audio thread")?; + match ready_rx.recv_timeout(Duration::from_secs(3)) { + Ok(Ok(())) => { + tracing::info!("WASAPI render: 48 kHz stereo f32 (default endpoint)"); + Ok(AudioPlayer { + pcm_tx, + stop, + thread: Some(thread), + }) + } + Ok(Err(e)) => Err(e), + Err(_) => Err(anyhow!( + "wasapi render init timed out (no render endpoint?)" + )), + } + } + + /// Queue one interleaved-stereo f32 chunk. Drops the chunk if the WASAPI side is wedged + /// (the renderer conceals the gap; never block the session pump). 
+ pub fn push(&self, pcm: Vec) { + if let Err(TrySendError::Disconnected(_)) = self.pcm_tx.try_send(pcm) { + // Thread already dead — Drop will reap it; nothing to do per-chunk. + } + } +} + +impl Drop for AudioPlayer { + fn drop(&mut self) { + self.stop.store(true, Ordering::SeqCst); + if let Some(t) = self.thread.take() { + let _ = t.join(); + } + } +} + +fn render_thread( + pcm_rx: Receiver>, + stop: Arc, + ready: SyncSender>, +) -> Result<()> { + if let Err(e) = wasapi::initialize_mta() + .ok() + .context("CoInitializeEx (MTA)") + { + let _ = ready.send(Err(e)); + return Ok(()); + } + let res = (|| -> Result<()> { + let device = + wasapi::get_default_device(&Direction::Render).context("default render endpoint")?; + let mut audio_client = device.get_iaudioclient().context("IAudioClient")?; + let desired = WaveFormat::new(32, 32, &SampleType::Float, SAMPLE_RATE, CHANNELS, None); + let (default_period, _min_period) = + audio_client.get_device_period().context("device period")?; + let mode = StreamMode::EventsShared { + autoconvert: true, + buffer_duration_hns: default_period, + }; + audio_client + .initialize_client(&desired, &Direction::Render, &mode) + .context("initialize render client")?; + let h_event = audio_client.set_get_eventhandle().context("event handle")?; + let render_client = audio_client + .get_audiorenderclient() + .context("IAudioRenderClient")?; + audio_client.start_stream().context("start render stream")?; + let _ = ready.send(Ok(())); + + // Adaptive jitter buffer, in f32-byte units (same shape as the host's virtual mic). + let mut ring: VecDeque = VecDeque::new(); + let mut primed = false; + + while !stop.load(Ordering::Relaxed) { + if h_event.wait_for_event(100).is_err() { + continue; + } + // Drain everything the pump has queued into the ring. 
+ while let Ok(chunk) = pcm_rx.try_recv() { + for s in chunk { + ring.extend(s.to_le_bytes()); + } + } + let avail_frames = audio_client + .get_available_space_in_frames() + .context("available space")? as usize; + if avail_frames == 0 { + continue; + } + let want_bytes = avail_frames * BLOCK_ALIGN; + + // Prime to ~3 quanta; cap at ~1 quantum of slack beyond that; re-prime on drain. + let target = (3 * want_bytes).clamp(720 * BLOCK_ALIGN, 9600 * BLOCK_ALIGN); + while ring.len() > target.max(want_bytes) + want_bytes { + ring.pop_front(); + } + if !primed && ring.len() >= target { + primed = true; + } + + let mut out = vec![0u8; want_bytes]; + if primed { + let n = ring.len().min(want_bytes); + for (dst, b) in out.iter_mut().zip(ring.drain(..n)) { + *dst = b; + } + } + if ring.is_empty() { + primed = false; + } + render_client + .write_to_device(avail_frames, BLOCK_ALIGN, &out, None) + .context("write_to_device")?; + } + audio_client.stop_stream().ok(); + Ok(()) + })(); + if let Err(ref e) = res { + let _ = ready.send(Err(anyhow!("{e:#}"))); + } + res +} + +/// The microphone uplink: capture the default input device, Opus-encode 20 ms chunks, ship +/// them as 0xCB datagrams into the host's virtual mic source. 
+pub struct MicStreamer { + stop: Arc, + thread: Option>, +} + +impl MicStreamer { + pub fn spawn(connector: Arc) -> Result { + let stop = Arc::new(AtomicBool::new(false)); + let stop_t = stop.clone(); + let thread = std::thread::Builder::new() + .name("punktfunk-mic".into()) + .spawn(move || { + if let Err(e) = mic_thread(&connector, stop_t) { + tracing::warn!(error = format!("{e:#}"), "mic uplink thread ended"); + } + }) + .context("spawn mic thread")?; + Ok(MicStreamer { + stop, + thread: Some(thread), + }) + } +} + +impl Drop for MicStreamer { + fn drop(&mut self) { + self.stop.store(true, Ordering::SeqCst); + if let Some(t) = self.thread.take() { + let _ = t.join(); + } + } +} + +fn mic_thread(connector: &Arc, stop: Arc) -> Result<()> { + wasapi::initialize_mta() + .ok() + .context("CoInitializeEx (MTA)")?; + + let mut encoder = opus::Encoder::new( + SAMPLE_RATE as u32, + opus::Channels::Stereo, + opus::Application::Voip, + ) + .map_err(|e| anyhow!("opus encoder: {e}"))?; + let _ = encoder.set_bitrate(opus::Bitrate::Bits(64_000)); + + let device = wasapi::get_default_device(&Direction::Capture) + .context("default capture endpoint (no microphone?)")?; + let mut audio_client = device.get_iaudioclient().context("IAudioClient")?; + let desired = WaveFormat::new(32, 32, &SampleType::Float, SAMPLE_RATE, CHANNELS, None); + let (default_period, _min_period) = + audio_client.get_device_period().context("device period")?; + let mode = StreamMode::EventsShared { + autoconvert: true, + buffer_duration_hns: default_period, + }; + audio_client + .initialize_client(&desired, &Direction::Capture, &mode) + .context("initialize capture client")?; + let h_event = audio_client.set_get_eventhandle().context("event handle")?; + let capture_client = audio_client + .get_audiocaptureclient() + .context("IAudioCaptureClient")?; + audio_client + .start_stream() + .context("start capture stream")?; + + let mut bytes: VecDeque = VecDeque::new(); + let mut ring: VecDeque = 
VecDeque::new(); + let mut out = vec![0u8; 4000]; + let mut seq = 0u32; + + while !stop.load(Ordering::Relaxed) { + if h_event.wait_for_event(100).is_err() { + continue; + } + loop { + match capture_client.get_next_packet_size() { + Ok(Some(0)) | Ok(None) => break, + Ok(Some(_n)) => { + capture_client + .read_from_device_to_deque(&mut bytes) + .context("read capture")?; + } + Err(e) => return Err(anyhow!("get_next_packet_size: {e}")), + } + } + let whole = (bytes.len() / 4) * 4; + for c in bytes.drain(..whole).collect::>().chunks_exact(4) { + ring.push_back(f32::from_le_bytes([c[0], c[1], c[2], c[3]])); + } + // Ship every complete 20 ms stereo frame. + while ring.len() >= MIC_FRAME * CHANNELS { + let pcm: Vec = ring.drain(..MIC_FRAME * CHANNELS).collect(); + match encoder.encode_float(&pcm, &mut out) { + Ok(len) => { + let pts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0); + let _ = connector.send_mic(seq, pts, out[..len].to_vec()); + seq = seq.wrapping_add(1); + } + Err(e) => tracing::debug!(error = %e, "opus mic encode"), + } + } + } + audio_client.stop_stream().ok(); + Ok(()) +} diff --git a/crates/punktfunk-client-windows/src/discovery.rs b/crates/punktfunk-client-windows/src/discovery.rs new file mode 100644 index 0000000..e825f80 --- /dev/null +++ b/crates/punktfunk-client-windows/src/discovery.rs @@ -0,0 +1,76 @@ +//! LAN host discovery: browse the host's mDNS advert (`_punktfunk._udp`, TXT keys +//! `fp`/`pair`/`id` — see the host crate's `discovery.rs`) on a worker thread and stream +//! results to the UI. Ported verbatim from the GTK client (`mdns-sd` is cross-platform). + +use mdns_sd::{ServiceDaemon, ServiceEvent}; + +#[derive(Clone, Debug)] +pub struct DiscoveredHost { + /// Stable row key: the advertised host id, falling back to the mDNS fullname. 
+ pub key: String, + pub name: String, + pub addr: String, + pub port: u16, + /// Host certificate fingerprint to pin (lowercase hex), empty if not advertised. + pub fp_hex: String, + /// Pairing requirement: `"required"` or `"optional"`. + pub pair: String, +} + +/// Browse continuously for the app's lifetime. The thread exits when the receiver is +/// dropped (the send fails) or the daemon dies. +pub fn browse() -> async_channel::Receiver { + let (tx, rx) = async_channel::unbounded(); + std::thread::Builder::new() + .name("punktfunk-mdns".into()) + .spawn(move || { + let daemon = match ServiceDaemon::new() { + Ok(d) => d, + Err(e) => { + tracing::warn!(error = %e, "mDNS daemon failed — discovery disabled"); + return; + } + }; + let receiver = match daemon.browse("_punktfunk._udp.local.") { + Ok(r) => r, + Err(e) => { + tracing::warn!(error = %e, "mDNS browse failed — discovery disabled"); + return; + } + }; + while let Ok(event) = receiver.recv() { + if let ServiceEvent::ServiceResolved(info) = event { + let props = info.get_properties(); + let val = |k: &str| props.get_property_val_str(k).unwrap_or("").to_string(); + let Some(addr) = info.get_addresses().iter().next().map(|a| a.to_string()) + else { + continue; + }; + let id = val("id"); + let host = DiscoveredHost { + key: if id.is_empty() { + info.get_fullname().to_string() + } else { + id + }, + name: info + .get_fullname() + .split('.') + .next() + .unwrap_or("?") + .to_string(), + addr, + port: info.get_port(), + fp_hex: val("fp"), + pair: val("pair"), + }; + if tx.send_blocking(host).is_err() { + break; // UI gone — stop browsing + } + } + } + let _ = daemon.shutdown(); + }) + .expect("spawn mdns thread"); + rx +} diff --git a/crates/punktfunk-client-windows/src/main.rs b/crates/punktfunk-client-windows/src/main.rs new file mode 100644 index 0000000..ec610ed --- /dev/null +++ b/crates/punktfunk-client-windows/src/main.rs @@ -0,0 +1,150 @@ +//! `punktfunk-client` — the native Windows punktfunk/1 client. 
+//! +//! Pure Rust: `NativeClient` linked as a crate (no C ABI, like the GTK Linux client) · +//! FFmpeg decode · WASAPI audio · SDL3 gamepads · a winit + Direct3D11 present surface. The +//! trust surface mirrors the other native clients: persistent identity, TOFU prompt with the +//! host fingerprint, SPAKE2 PIN pairing. +//! +//! Until the UI shell lands, the binary runs **headless** (`--connect host[:port]`): connect, +//! decode, play audio, and print per-second stats — the Windows analogue of +//! `punktfunk-client-rs`, for validating the protocol/decode path against a live host. + +#[cfg(windows)] +mod audio; +#[cfg(windows)] +mod discovery; +#[cfg(windows)] +mod session; +#[cfg(windows)] +mod trust; +#[cfg(windows)] +mod video; + +#[cfg(windows)] +fn main() { + use punktfunk_core::config::{CompositorPref, GamepadPref, Mode}; + use std::time::{Duration, Instant}; + + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()), + ) + .init(); + + let args: Vec = std::env::args().collect(); + let arg = |name: &str| -> Option { + args.iter() + .position(|a| a == name) + .and_then(|i| args.get(i + 1)) + .cloned() + }; + + let Some(target) = arg("--connect") else { + eprintln!( + "punktfunk-client (headless): --connect host[:port] [--pin HEX] [--mode WxHxHz] \ + [--bitrate MBPS] [--mic]\n\ + The windowed UI is not wired yet; this runs the protocol/decode path headless." 
+ ); + std::process::exit(2); + }; + let (host, port) = match target.rsplit_once(':') { + Some((a, p)) => (a.to_string(), p.parse().unwrap_or(9777)), + None => (target.clone(), 9777u16), + }; + let mode = arg("--mode") + .and_then(|m| { + let mut it = m.split(['x', 'X']); + Some(Mode { + width: it.next()?.parse().ok()?, + height: it.next()?.parse().ok()?, + refresh_hz: it.next()?.parse().ok()?, + }) + }) + .unwrap_or(Mode { + width: 1280, + height: 720, + refresh_hz: 60, + }); + let pin = arg("--pin").and_then(|h| trust::parse_hex32(&h)); + let bitrate_kbps = arg("--bitrate") + .and_then(|b| b.parse::().ok()) + .map(|m| m * 1000) + .unwrap_or(0); + let mic_enabled = args.iter().any(|a| a == "--mic"); + + let identity = match trust::load_or_create_identity() { + Ok(i) => i, + Err(e) => { + eprintln!("client identity: {e:#}"); + std::process::exit(1); + } + }; + + tracing::info!(%host, port, ?mode, tofu = pin.is_none(), "connecting (headless)"); + let handle = session::start(session::SessionParams { + host, + port, + mode, + compositor: CompositorPref::Auto, + gamepad: GamepadPref::Auto, + bitrate_kbps, + mic_enabled, + pin, + identity, + }); + + // Headless consumer: drain events + frames, print stats, run until the host ends or + // ~60 s elapse (the harness bound). Frames are counted and dropped (no present yet). + let deadline = Instant::now() + Duration::from_secs(60); + let mut frames_seen = 0u64; + loop { + while let Ok(ev) = handle.events.try_recv() { + match ev { + session::SessionEvent::Connected { + mode, fingerprint, .. + } => tracing::info!( + ?mode, + fp = %trust::hex(&fingerprint), + "connected" + ), + session::SessionEvent::Stats(s) => tracing::info!( + fps = format!("{:.0}", s.fps), + mbps = format!("{:.1}", s.mbps), + decode_ms = format!("{:.2}", s.decode_ms), + lat_ms = format!("{:.2}", s.latency_ms), + frames_seen, + "stats" + ), + session::SessionEvent::Failed { msg, .. 
} => { + tracing::error!(%msg, "connect failed"); + return; + } + session::SessionEvent::Ended(err) => { + tracing::info!(reason = err.as_deref().unwrap_or("done"), "session ended"); + return; + } + } + } + while handle.frames.try_recv().is_ok() { + frames_seen += 1; + } + if Instant::now() > deadline { + tracing::info!(frames_seen, "harness deadline — stopping"); + handle.stop.store(true, std::sync::atomic::Ordering::SeqCst); + return; + } + std::thread::sleep(Duration::from_millis(2)); + } +} + +/// Win32/Direct3D11/WASAPI/SDL3 are Windows turf; this stub keeps `cargo build --workspace` +/// green on Linux/macOS (the other native clients live in crates/punktfunk-client-linux and +/// clients/apple). +#[cfg(not(windows))] +fn main() { + eprintln!( + "punktfunk-client-windows is Windows-only — the Linux client lives in \ + crates/punktfunk-client-linux, the macOS client in clients/apple" + ); + std::process::exit(2); +} diff --git a/crates/punktfunk-client-windows/src/session.rs b/crates/punktfunk-client-windows/src/session.rs new file mode 100644 index 0000000..c5c7b39 --- /dev/null +++ b/crates/punktfunk-client-windows/src/session.rs @@ -0,0 +1,250 @@ +//! Session controller: one worker thread runs connect → pump (video pull + decode, audio +//! pull + Opus decode, stats), feeding the UI over channels. The UI keeps the +//! `Arc` from the `Connected` event for direct input sends (no extra hop on +//! the input path) — `NativeClient` is `Sync`, planes stay one-consumer-per-thread: +//! video+audio here, rumble+hidout on the gamepad thread. +//! +//! Ported from the GTK Linux client; the platform-specific pieces are the video decoder +//! (software-only here) and the audio backend (WASAPI). The pump body is identical. 
+ +use crate::audio; +use crate::video::{DecodedFrame, Decoder}; +use punktfunk_core::client::NativeClient; +use punktfunk_core::config::{CompositorPref, GamepadPref, Mode}; +use punktfunk_core::PunktfunkError; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +pub struct SessionParams { + pub host: String, + pub port: u16, + pub mode: Mode, + pub compositor: CompositorPref, + pub gamepad: GamepadPref, + pub bitrate_kbps: u32, + /// Stream the default microphone to the host's virtual mic source. + pub mic_enabled: bool, + /// Pinned host fingerprint; `None` = trust on first use (caller persists the observed one). + pub pin: Option<[u8; 32]>, + pub identity: (String, String), +} + +#[derive(Clone, Copy, Default)] +pub struct Stats { + pub fps: f32, + pub mbps: f32, + pub decode_ms: f32, + /// Median capture→decoded latency over the last window (host-clock corrected). + pub latency_ms: f32, +} + +pub enum SessionEvent { + Connected { + connector: Arc, + mode: Mode, + fingerprint: [u8; 32], + }, + /// `trust_rejected` is set when the connect failed the TLS trust check (a `Crypto` + /// error): for a pinned connect this is the fingerprint-changed signal, so the UI can + /// offer a re-pair (PIN) path rather than a dead-end error. + Failed { + msg: String, + trust_rejected: bool, + }, + Ended(Option), + Stats(Stats), +} + +pub struct SessionHandle { + pub events: async_channel::Receiver, + pub frames: async_channel::Receiver, + pub stop: Arc, +} + +pub fn start(params: SessionParams) -> SessionHandle { + let (ev_tx, ev_rx) = async_channel::unbounded(); + // Tiny frame queue, newest wins: force_send displaces the oldest when the UI lags. 
+ let (frame_tx, frame_rx) = async_channel::bounded(2); + let stop = Arc::new(AtomicBool::new(false)); + let stop_w = stop.clone(); + std::thread::Builder::new() + .name("punktfunk-session".into()) + .spawn(move || pump(params, ev_tx, frame_tx, stop_w)) + .expect("spawn session thread"); + SessionHandle { + events: ev_rx, + frames: frame_rx, + stop, + } +} + +fn now_ns() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0) +} + +fn pump( + params: SessionParams, + ev_tx: async_channel::Sender, + frame_tx: async_channel::Sender, + stop: Arc, +) { + let connector = match NativeClient::connect( + &params.host, + params.port, + params.mode, + params.compositor, + params.gamepad, + params.bitrate_kbps, + None, // launch: the Windows client has no library picker yet + params.pin, + Some(params.identity), + Duration::from_secs(15), + ) { + Ok(c) => Arc::new(c), + Err(e) => { + let trust_rejected = matches!(e, PunktfunkError::Crypto); + let msg = match e { + PunktfunkError::Crypto => { + "Host identity rejected — wrong fingerprint, or the host requires pairing" + .to_string() + } + PunktfunkError::Timeout => "Connection timed out".to_string(), + other => format!("Connect failed: {other:?}"), + }; + let _ = ev_tx.send_blocking(SessionEvent::Failed { + msg, + trust_rejected, + }); + return; + } + }; + let _ = ev_tx.send_blocking(SessionEvent::Connected { + connector: connector.clone(), + mode: connector.mode(), + fingerprint: connector.host_fingerprint, + }); + + let mut decoder = match Decoder::new() { + Ok(d) => d, + Err(e) => { + let _ = ev_tx.send_blocking(SessionEvent::Ended(Some(format!("video decoder: {e}")))); + return; + } + }; + // Audio is best-effort: a session without it still streams. Gamepads are the + // app-lifetime service's job (the UI attaches it on Connected). 
+ let player = audio::AudioPlayer::spawn() + .map_err(|e| tracing::warn!(error = %e, "audio disabled")) + .ok(); + let mut opus_dec = opus::Decoder::new(48_000, opus::Channels::Stereo) + .map_err(|e| tracing::warn!(error = %e, "opus decoder failed — audio disabled")) + .ok(); + let _mic = params + .mic_enabled + .then(|| { + audio::MicStreamer::spawn(connector.clone()) + .map_err(|e| tracing::warn!(error = %e, "mic uplink disabled")) + .ok() + }) + .flatten(); + + let clock_offset = connector.clock_offset_ns; + let mut total_frames = 0u64; + let mut window_start = Instant::now(); + let mut frames_n = 0u32; + let mut bytes_n = 0u64; + let mut decode_us_sum = 0u64; + let mut lat_us: Vec = Vec::with_capacity(256); + let mut pcm = vec![0f32; 5760 * 2]; // decode scratch: max Opus frame (120 ms stereo) + + let end: Option = loop { + if stop.load(Ordering::SeqCst) { + break None; + } + match connector.next_frame(Duration::from_millis(4)) { + Ok(frame) => { + let t0 = Instant::now(); + match decoder.decode(&frame.data) { + Ok(Some(decoded)) => { + total_frames += 1; + if total_frames == 1 { + let DecodedFrame::Cpu(c) = &decoded; + tracing::info!( + width = c.width, + height = c.height, + path = "software", + "first frame decoded" + ); + } + // Latency: our wall clock expressed in the host's capture clock, + // minus the host-stamped capture pts (same math as client-rs). + let lat = (now_ns() as i128 + clock_offset as i128 - frame.pts_ns as i128) + .max(0) as u64; + if lat > 0 && lat < 10_000_000_000 { + lat_us.push(lat / 1000); + } + decode_us_sum += t0.elapsed().as_micros() as u64; + frames_n += 1; + bytes_n += frame.data.len() as u64; + let _ = frame_tx.force_send(decoded); + } + Ok(None) => {} + // Survivable (loss until the next IDR/RFI recovery) — keep feeding. 
+ Err(e) => tracing::debug!(error = %e, "decode error (recovering)"), + } + } + Err(PunktfunkError::NoFrame) => {} + Err(PunktfunkError::Closed) => break Some("Host ended the session".to_string()), + Err(e) => break Some(format!("session: {e:?}")), + } + + // Drain audio between frames (packets land every 5 ms; the queue holds 320 ms). + while let Ok(pkt) = connector.next_audio(Duration::ZERO) { + if let (Some(player), Some(dec)) = (&player, opus_dec.as_mut()) { + match dec.decode_float(&pkt.data, &mut pcm, false) { + Ok(samples) => player.push(pcm[..samples * 2].to_vec()), + Err(e) => tracing::debug!(error = %e, "opus decode"), + } + } + } + + if window_start.elapsed() >= Duration::from_secs(1) { + let secs = window_start.elapsed().as_secs_f32(); + lat_us.sort_unstable(); + let p50 = lat_us.get(lat_us.len() / 2).copied().unwrap_or(0); + tracing::debug!( + fps = frames_n, + lat_p50_us = p50, + total_frames, + "stream window" + ); + let _ = ev_tx.try_send(SessionEvent::Stats(Stats { + fps: frames_n as f32 / secs, + mbps: bytes_n as f32 * 8.0 / 1e6 / secs, + decode_ms: if frames_n > 0 { + decode_us_sum as f32 / frames_n as f32 / 1000.0 + } else { + 0.0 + }, + latency_ms: p50 as f32 / 1000.0, + })); + window_start = Instant::now(); + frames_n = 0; + bytes_n = 0; + decode_us_sum = 0; + lat_us.clear(); + } + }; + + tracing::info!( + total_frames, + reason = end.as_deref().unwrap_or("user"), + "session ended" + ); + stop.store(true, Ordering::SeqCst); + let _ = ev_tx.send_blocking(SessionEvent::Ended(end)); +} diff --git a/crates/punktfunk-client-windows/src/trust.rs b/crates/punktfunk-client-windows/src/trust.rs new file mode 100644 index 0000000..f863723 --- /dev/null +++ b/crates/punktfunk-client-windows/src/trust.rs @@ -0,0 +1,166 @@ +//! Client identity, the known-hosts (pinned fingerprint) store, and app settings. +//! +//! Ported near-verbatim from the GTK Linux client; the only platform change is the config +//! 
directory — `%APPDATA%\punktfunk` (the Windows analogue of `~/.config/punktfunk`), shared +//! with the Windows host's identity location. The identity files (`client-{cert,key}.pem`) +//! keep the same names so the trust model is identical across the native clients. + +use anyhow::{anyhow, Context, Result}; +use punktfunk_core::quic::endpoint; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +pub fn config_dir() -> Result { + let appdata = std::env::var("APPDATA").context("APPDATA unset")?; + Ok(PathBuf::from(appdata).join("punktfunk")) +} + +/// This client's persistent identity, generated on first use — presented on every connect +/// so hosts can recognize it once paired. +pub fn load_or_create_identity() -> Result<(String, String)> { + let dir = config_dir()?; + let (cp, kp) = (dir.join("client-cert.pem"), dir.join("client-key.pem")); + if let (Ok(c), Ok(k)) = (std::fs::read_to_string(&cp), std::fs::read_to_string(&kp)) { + return Ok((c, k)); + } + let (c, k) = endpoint::generate_identity().map_err(|e| anyhow!("generate identity: {e}"))?; + std::fs::create_dir_all(&dir)?; + std::fs::write(&cp, &c)?; + std::fs::write(&kp, &k)?; + tracing::info!(cert = %cp.display(), "generated client identity"); + Ok((c, k)) +} + +pub fn hex(fp: &[u8; 32]) -> String { + fp.iter().map(|b| format!("{b:02x}")).collect() +} + +pub fn parse_hex32(s: &str) -> Option<[u8; 32]> { + if s.len() != 64 { + return None; + } + let mut out = [0u8; 32]; + for (i, b) in out.iter_mut().enumerate() { + *b = u8::from_str_radix(&s[2 * i..2 * i + 2], 16).ok()?; + } + Some(out) +} + +/// One trusted host: its pinned certificate fingerprint plus how we got there (TOFU or a +/// PIN ceremony) and where we last reached it. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct KnownHost { + pub name: String, + pub addr: String, + pub port: u16, + /// SHA-256 of the host certificate, lowercase hex — the pin for every later connect. 
+ pub fp_hex: String, + /// True if trust came from the SPAKE2 PIN ceremony (vs. trust-on-first-use). + pub paired: bool, +} + +#[derive(Default, Serialize, Deserialize)] +pub struct KnownHosts { + pub hosts: Vec, +} + +impl KnownHosts { + fn path() -> Result { + Ok(config_dir()?.join("client-known-hosts.json")) + } + + pub fn load() -> KnownHosts { + Self::path() + .and_then(|p| Ok(std::fs::read_to_string(p)?)) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or_default() + } + + pub fn save(&self) -> Result<()> { + let p = Self::path()?; + std::fs::create_dir_all(p.parent().unwrap())?; + std::fs::write(&p, serde_json::to_string_pretty(self)?)?; + Ok(()) + } + + pub fn find_by_fp(&self, fp_hex: &str) -> Option<&KnownHost> { + self.hosts.iter().find(|h| h.fp_hex == fp_hex) + } + + pub fn find_by_addr(&self, addr: &str, port: u16) -> Option<&KnownHost> { + self.hosts.iter().find(|h| h.addr == addr && h.port == port) + } + + /// Insert or refresh an entry, keyed by fingerprint. `paired` only ever upgrades + /// (a later TOFU connect must not demote a PIN-paired host). + pub fn upsert(&mut self, entry: KnownHost) { + if let Some(h) = self.hosts.iter_mut().find(|h| h.fp_hex == entry.fp_hex) { + h.name = entry.name; + h.addr = entry.addr; + h.port = entry.port; + h.paired |= entry.paired; + } else { + self.hosts.push(entry); + } + } +} + +/// App settings, persisted as JSON. Stringly-typed gamepad/compositor prefs so the file +/// stays readable; parsed with `*Pref::from_name` at connect time. +#[derive(Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct Settings { + /// Stream mode; `0` = the native size/refresh of the monitor the window is on, + /// resolved at connect time. + pub width: u32, + pub height: u32, + pub refresh_hz: u32, + /// Requested encoder bitrate (kbps); 0 = host default. 
+ pub bitrate_kbps: u32, + pub gamepad: String, + /// Which host compositor backend to request (advisory; the host falls back to + /// auto-detect when unavailable). + pub compositor: String, + /// Grab system shortcuts (Alt+Tab, Win…) while input is captured. + pub inhibit_shortcuts: bool, + /// Stream the default microphone to the host's virtual mic source. + pub mic_enabled: bool, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + width: 0, + height: 0, + refresh_hz: 0, + bitrate_kbps: 0, + gamepad: "auto".into(), + compositor: "auto".into(), + inhibit_shortcuts: true, + mic_enabled: false, + } + } +} + +impl Settings { + fn path() -> Result { + Ok(config_dir()?.join("client-windows-settings.json")) + } + + pub fn load() -> Settings { + Self::path() + .and_then(|p| Ok(std::fs::read_to_string(p)?)) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or_default() + } + + pub fn save(&self) { + let Ok(p) = Self::path() else { return }; + let _ = std::fs::create_dir_all(p.parent().unwrap()); + if let Ok(s) = serde_json::to_string_pretty(self) { + let _ = std::fs::write(&p, s); + } + } +} diff --git a/crates/punktfunk-client-windows/src/video.rs b/crates/punktfunk-client-windows/src/video.rs new file mode 100644 index 0000000..afcf266 --- /dev/null +++ b/crates/punktfunk-client-windows/src/video.rs @@ -0,0 +1,106 @@ +//! Video decode: reassembled HEVC access units → frames for the D3D11 presenter. +//! +//! The dev box has no working GPU, so this ships the **software** backend first: libavcodec +//! on the CPU + swscale to RGBA, uploaded into a D3D11 texture by the presenter. It runs +//! `AV_CODEC_FLAG_LOW_DELAY` with slice threading only — the host encodes zero-reorder +//! streams (no B-frames, in-band parameter sets on every IDR), so decode is strictly +//! one-in/one-out and frame threading would only add latency. +//! +//! `DecodedFrame` is an enum so the real-GPU **D3D11VA** path (decode → `NV12`/`P010` +//! 
`ID3D11Texture2D`, zero-copy into the swapchain) can be added as a second variant without +//! touching the session pump or the presenter's frame contract. + +use anyhow::{anyhow, Context as _, Result}; +use ffmpeg::format::Pixel; +use ffmpeg::software::scaling; +use ffmpeg::util::frame::Video as AvFrame; +use ffmpeg_next as ffmpeg; + +pub enum DecodedFrame { + Cpu(CpuFrame), +} + +/// RGBA pixels for a D3D11 `R8G8B8A8_UNORM` texture upload (which takes a row pitch). +pub struct CpuFrame { + pub width: u32, + pub height: u32, + /// RGBA row stride in bytes (≥ width*4 — swscale pads rows for SIMD). + pub stride: usize, + pub rgba: Vec, +} + +pub struct Decoder { + inner: SoftwareDecoder, +} + +impl Decoder { + pub fn new() -> Result { + ffmpeg::init().context("ffmpeg init")?; + Ok(Decoder { + inner: SoftwareDecoder::new()?, + }) + } + + /// Feed one access unit; returns the decoded frame (the host's streams are + /// one-in/one-out). A decode error after packet loss is survivable — log upstream and + /// keep feeding; the host's IDR/RFI recovery resynchronizes on the next keyframe. + pub fn decode(&mut self, au: &[u8]) -> Result> { + Ok(self.inner.decode(au)?.map(DecodedFrame::Cpu)) + } +} + +struct SoftwareDecoder { + decoder: ffmpeg::decoder::Video, + /// Rebuilt whenever the decoded format/size changes (mid-stream `Reconfigure`). + sws: Option<(scaling::Context, Pixel, u32, u32)>, +} + +impl SoftwareDecoder { + fn new() -> Result { + let codec = + ffmpeg::decoder::find(ffmpeg::codec::Id::HEVC).ok_or(anyhow!("no HEVC decoder"))?; + let mut ctx = ffmpeg::codec::Context::new_with_codec(codec); + unsafe { + let raw = ctx.as_mut_ptr(); + (*raw).flags |= ffmpeg::ffi::AV_CODEC_FLAG_LOW_DELAY as i32; + // Slice threading adds no frame delay (frame threading adds thread_count-1). 
+ (*raw).thread_type = ffmpeg::ffi::FF_THREAD_SLICE; + (*raw).thread_count = 0; // auto + } + let decoder = ctx.decoder().video().context("open HEVC decoder")?; + Ok(SoftwareDecoder { decoder, sws: None }) + } + + fn decode(&mut self, au: &[u8]) -> Result> { + let packet = ffmpeg::Packet::copy(au); + self.decoder + .send_packet(&packet) + .map_err(|e| anyhow!("send_packet: {e}"))?; + let mut frame = AvFrame::empty(); + let mut out = None; + while self.decoder.receive_frame(&mut frame).is_ok() { + out = Some(self.convert_rgba(&frame)?); + } + Ok(out) + } + + fn convert_rgba(&mut self, frame: &AvFrame) -> Result { + let (fmt, w, h) = (frame.format(), frame.width(), frame.height()); + let rebuild = + !matches!(&self.sws, Some((_, f, sw, sh)) if *f == fmt && *sw == w && *sh == h); + if rebuild { + let ctx = scaling::Context::get(fmt, w, h, Pixel::RGBA, w, h, scaling::Flags::POINT) + .context("swscale context")?; + self.sws = Some((ctx, fmt, w, h)); + } + let (sws, ..) = self.sws.as_mut().unwrap(); + let mut rgba = AvFrame::empty(); + sws.run(frame, &mut rgba).map_err(|e| anyhow!("sws: {e}"))?; + Ok(CpuFrame { + width: w, + height: h, + stride: rgba.stride(0), + rgba: rgba.data(0).to_vec(), + }) + } +} diff --git a/crates/punktfunk-host/src/encode/nvenc.rs b/crates/punktfunk-host/src/encode/nvenc.rs index 66e987a..f7f01d4 100644 --- a/crates/punktfunk-host/src/encode/nvenc.rs +++ b/crates/punktfunk-host/src/encode/nvenc.rs @@ -158,11 +158,25 @@ impl NvencD3d11Encoder { } Some("3") => nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_THREE_FORCED_MODE as u32, Some("2") => nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_TWO_FORCED_MODE as u32, + // Main10 (10-bit / HDR): 2-way split is measurably SLOWER on Ada — at 5120x1440@240 + // Main10, forced-2 took 7.6 ms/frame (~131 fps) vs 2.8 ms (~357 fps) single-engine + // (the split/merge overhead dominates for 10-bit). 
A single Ada NVENC engine already + // handles 5K@240 Main10 well under the 4.17 ms budget, so DON'T split — splitting was + // the "broken animations in HDR" (the stream capped at ~131 fps). Env still overrides. + _ if self.bit_depth >= 10 => { + nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_DISABLE_MODE as u32 + } _ if pixel_rate > 1_000_000_000 => { nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_TWO_FORCED_MODE as u32 } _ => nv::NV_ENC_SPLIT_ENCODE_MODE::NV_ENC_SPLIT_AUTO_MODE as u32, }; + tracing::info!( + split_mode, + bit_depth = self.bit_depth, + pixel_rate, + "NVENC split-encode mode (0=disable 1=auto-forced 2=two 3=three 4=auto)" + ); let enc = loop { // 1. open the session bound to the D3D11 device. let mut params = nv::NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS {