d1d2ca293d
Web-console "Approve" (delegated pairing, roadmap §8b-1) was unreachable: every client routed a fresh pair=required host straight to the SPAKE2 PIN ceremony, so no "knock" was ever recorded; and an unpaired connect was rejected+closed with no way to resume after approval. The backend + console were complete but had no client-side trigger and no post-approval admit path. Host (native_pairing.rs, punktfunk1.rs): an unpaired identified knock is now PARKED instead of rejected — it releases its NVENC session permit, awaits an operator decision (NativePairing::wait_for_decision, woken by a Notify on approve/deny), and on approval re-acquires a slot and admits the SAME connection with no reconnect. QUIC keep-alive (4s/8s) holds the parked connection warm. The pairing gate moves out of the HANDSHAKE_TIMEOUT-bounded handshake future; approve_pending is reordered read-then-add and wait_for_decision double-checks is_paired to close a "neither pending nor paired" race. New PENDING_APPROVAL_WAIT (180s). Tests: delegated_approval_admits_after_knock now approves mid-park (no reconnect) + new wait_for_decision_approve_deny_timeout unit test (108 host tests green). Clients (Linux/Apple/Windows/Android): a fresh pair=required host now offers "Request access" alongside the PIN ceremony — a plain identified connect with a ~185s handshake budget and a cancelable "waiting for approval" UI; on success the host is saved as paired, and cancel returns the UI immediately while a late-resolving connect is torn down silently via a per-attempt flag. Apple reuses the existing C-ABI timeout_ms (no ABI change); Windows adds SessionParams.connect_timeout + a RequestAccess screen; Android adds a timeoutMs arg to the nativeConnect JNI seam (both sides + both callers). Linux built + clippy + fmt clean; Apple/Windows/Android pending their CI/on-device compiles. SPAKE2 ceremony reviewed end-to-end against the spake2 0.4 contract — correct, no changes needed.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
314 lines
12 KiB
Rust
314 lines
12 KiB
Rust
//! Session controller: one worker thread runs connect → pump (video pull + decode, audio
|
||
//! pull + Opus decode, stats), feeding the GTK main loop over channels. The UI keeps the
|
||
//! `Arc<NativeClient>` from the `Connected` event for direct input sends (no extra hop on
|
||
//! the input path) — `NativeClient` is `Sync`, planes stay one-consumer-per-thread:
|
||
//! video+audio here, rumble+hidout on the gamepad thread.
|
||
|
||
use crate::audio;
|
||
use crate::video::{DecodedFrame, Decoder};
|
||
use punktfunk_core::client::NativeClient;
|
||
use punktfunk_core::config::{CompositorPref, GamepadPref, Mode};
|
||
use punktfunk_core::PunktfunkError;
|
||
use std::sync::atomic::{AtomicBool, Ordering};
|
||
use std::sync::Arc;
|
||
use std::time::{Duration, Instant};
|
||
|
||
pub struct SessionParams {
|
||
pub host: String,
|
||
pub port: u16,
|
||
pub mode: Mode,
|
||
pub compositor: CompositorPref,
|
||
pub gamepad: GamepadPref,
|
||
pub bitrate_kbps: u32,
|
||
/// Requested audio channel count (2/6/8); the host echoes the resolved value.
|
||
pub audio_channels: u8,
|
||
/// Stream the default microphone to the host's virtual mic source.
|
||
pub mic_enabled: bool,
|
||
/// Pinned host fingerprint; `None` = trust on first use (caller persists the observed one).
|
||
pub pin: Option<[u8; 32]>,
|
||
pub identity: (String, String),
|
||
/// How long to wait for the handshake. The normal path uses a short budget; the
|
||
/// "request access" (delegated-approval) path uses a long one, because the host PARKS the
|
||
/// connection until the operator clicks Approve in its console (so this must exceed the
|
||
/// host's approval window — see `PENDING_APPROVAL_WAIT`).
|
||
pub connect_timeout: Duration,
|
||
}
|
||
|
||
/// Streaming statistics for one window of roughly a second, delivered as `SessionEvent::Stats`.
#[derive(Clone, Copy, Default)]
pub struct Stats {
    /// Successfully decoded frames per second over the window.
    pub fps: f32,
    /// Encoded video throughput of the decoded frames over the window, in Mbit/s.
    pub mbps: f32,
    /// Mean per-frame decode time in milliseconds (0 when nothing was decoded).
    pub decode_ms: f32,
    /// Median capture→decoded latency over the window in milliseconds, with the host/client
    /// clock offset applied.
    pub latency_ms: f32,
}
|
||
|
||
pub enum SessionEvent {
|
||
Connected {
|
||
connector: Arc<NativeClient>,
|
||
mode: Mode,
|
||
fingerprint: [u8; 32],
|
||
},
|
||
/// `trust_rejected` is set when the connect failed the TLS trust check (a `Crypto`
|
||
/// error): for a pinned connect this is the fingerprint-changed signal, so the UI can
|
||
/// offer a re-pair (PIN) path rather than a dead-end error.
|
||
Failed {
|
||
msg: String,
|
||
trust_rejected: bool,
|
||
},
|
||
Ended(Option<String>),
|
||
Stats(Stats),
|
||
}
|
||
|
||
pub struct SessionHandle {
|
||
pub events: async_channel::Receiver<SessionEvent>,
|
||
pub frames: async_channel::Receiver<DecodedFrame>,
|
||
pub stop: Arc<AtomicBool>,
|
||
}
|
||
|
||
pub fn start(params: SessionParams) -> SessionHandle {
|
||
let (ev_tx, ev_rx) = async_channel::unbounded();
|
||
// Tiny frame queue, newest wins: force_send displaces the oldest when the UI lags.
|
||
let (frame_tx, frame_rx) = async_channel::bounded(2);
|
||
let stop = Arc::new(AtomicBool::new(false));
|
||
let stop_w = stop.clone();
|
||
std::thread::Builder::new()
|
||
.name("punktfunk-session".into())
|
||
.spawn(move || pump(params, ev_tx, frame_tx, stop_w))
|
||
.expect("spawn session thread");
|
||
SessionHandle {
|
||
events: ev_rx,
|
||
frames: frame_rx,
|
||
stop,
|
||
}
|
||
}
|
||
|
||
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch.
fn now_ns() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    since_epoch.map_or(0, |elapsed| elapsed.as_nanos() as u64)
}
|
||
|
||
/// Opus decoder for the audio plane: a plain stereo decoder (the validated path) or a multistream
|
||
/// decoder for 5.1/7.1, both behind one `decode_float`. Built from the host-RESOLVED channel count
|
||
/// via the shared layout table.
|
||
enum AudioDec {
|
||
Stereo(opus::Decoder),
|
||
Surround(opus::MSDecoder),
|
||
}
|
||
|
||
impl AudioDec {
|
||
fn new(channels: u8) -> Result<AudioDec, opus::Error> {
|
||
if channels == 2 {
|
||
Ok(AudioDec::Stereo(opus::Decoder::new(
|
||
48_000,
|
||
opus::Channels::Stereo,
|
||
)?))
|
||
} else {
|
||
let l = punktfunk_core::audio::layout_for(channels, false);
|
||
Ok(AudioDec::Surround(opus::MSDecoder::new(
|
||
48_000, l.streams, l.coupled, l.mapping,
|
||
)?))
|
||
}
|
||
}
|
||
|
||
fn decode_float(
|
||
&mut self,
|
||
input: &[u8],
|
||
out: &mut [f32],
|
||
fec: bool,
|
||
) -> Result<usize, opus::Error> {
|
||
match self {
|
||
AudioDec::Stereo(d) => d.decode_float(input, out, fec),
|
||
AudioDec::Surround(d) => d.decode_float(input, out, fec),
|
||
}
|
||
}
|
||
}
|
||
|
||
fn pump(
|
||
params: SessionParams,
|
||
ev_tx: async_channel::Sender<SessionEvent>,
|
||
frame_tx: async_channel::Sender<DecodedFrame>,
|
||
stop: Arc<AtomicBool>,
|
||
) {
|
||
let connector = match NativeClient::connect(
|
||
¶ms.host,
|
||
params.port,
|
||
params.mode,
|
||
params.compositor,
|
||
params.gamepad,
|
||
params.bitrate_kbps,
|
||
0, // video_caps: the Linux client has no 10-bit/HDR present path yet
|
||
params.audio_channels,
|
||
None, // launch: the Linux client has no library picker yet
|
||
params.pin,
|
||
Some(params.identity),
|
||
params.connect_timeout,
|
||
) {
|
||
Ok(c) => Arc::new(c),
|
||
Err(e) => {
|
||
let trust_rejected = matches!(e, PunktfunkError::Crypto);
|
||
let msg = match e {
|
||
PunktfunkError::Crypto => {
|
||
"Host identity rejected — wrong fingerprint, or the host requires pairing"
|
||
.to_string()
|
||
}
|
||
PunktfunkError::Timeout => "Connection timed out".to_string(),
|
||
other => format!("Connect failed: {other:?}"),
|
||
};
|
||
let _ = ev_tx.send_blocking(SessionEvent::Failed {
|
||
msg,
|
||
trust_rejected,
|
||
});
|
||
return;
|
||
}
|
||
};
|
||
let _ = ev_tx.send_blocking(SessionEvent::Connected {
|
||
connector: connector.clone(),
|
||
mode: connector.mode(),
|
||
fingerprint: connector.host_fingerprint,
|
||
});
|
||
|
||
let mut decoder = match Decoder::new() {
|
||
Ok(d) => d,
|
||
Err(e) => {
|
||
let _ = ev_tx.send_blocking(SessionEvent::Ended(Some(format!("video decoder: {e}"))));
|
||
return;
|
||
}
|
||
};
|
||
// Audio is best-effort: a session without it still streams. Gamepads are the
|
||
// app-lifetime service's job (the UI attaches it on Connected). Build the decoder + playback
|
||
// from the host-RESOLVED channel count (never the request), so an older/clamping host that
|
||
// resolves stereo is decoded as stereo.
|
||
let channels = connector.audio_channels;
|
||
let player = audio::AudioPlayer::spawn(channels as u32)
|
||
.map_err(|e| tracing::warn!(error = %e, "audio disabled"))
|
||
.ok();
|
||
let mut opus_dec = AudioDec::new(channels)
|
||
.map_err(|e| tracing::warn!(error = %e, "opus decoder failed — audio disabled"))
|
||
.ok();
|
||
let _mic = params
|
||
.mic_enabled
|
||
.then(|| {
|
||
audio::MicStreamer::spawn(connector.clone())
|
||
.map_err(|e| tracing::warn!(error = %e, "mic uplink disabled"))
|
||
.ok()
|
||
})
|
||
.flatten();
|
||
|
||
let clock_offset = connector.clock_offset_ns;
|
||
let mut total_frames = 0u64;
|
||
let mut window_start = Instant::now();
|
||
let mut frames_n = 0u32;
|
||
let mut bytes_n = 0u64;
|
||
let mut decode_us_sum = 0u64;
|
||
let mut lat_us: Vec<u64> = Vec::with_capacity(256);
|
||
let mut pcm = vec![0f32; 5760 * channels as usize]; // scratch: max Opus frame (120 ms) × channels
|
||
// Loss recovery: watch the host→client unrecoverable-drop count and ask for an IDR when it climbs.
|
||
let mut last_dropped = connector.frames_dropped();
|
||
let mut last_kf_req: Option<Instant> = None;
|
||
|
||
let end: Option<String> = loop {
|
||
if stop.load(Ordering::SeqCst) {
|
||
break None;
|
||
}
|
||
match connector.next_frame(Duration::from_millis(4)) {
|
||
Ok(frame) => {
|
||
let t0 = Instant::now();
|
||
match decoder.decode(&frame.data) {
|
||
Ok(Some(decoded)) => {
|
||
total_frames += 1;
|
||
if total_frames == 1 {
|
||
let (w, h, path) = match &decoded {
|
||
DecodedFrame::Cpu(c) => (c.width, c.height, "software"),
|
||
DecodedFrame::Dmabuf(d) => (d.width, d.height, "vaapi-dmabuf"),
|
||
};
|
||
tracing::info!(width = w, height = h, path, "first frame decoded");
|
||
}
|
||
// Latency: our wall clock expressed in the host's capture clock,
|
||
// minus the host-stamped capture pts (same math as client-rs).
|
||
let lat = (now_ns() as i128 + clock_offset as i128 - frame.pts_ns as i128)
|
||
.max(0) as u64;
|
||
if lat > 0 && lat < 10_000_000_000 {
|
||
lat_us.push(lat / 1000);
|
||
}
|
||
decode_us_sum += t0.elapsed().as_micros() as u64;
|
||
frames_n += 1;
|
||
bytes_n += frame.data.len() as u64;
|
||
let _ = frame_tx.force_send(decoded);
|
||
}
|
||
Ok(None) => {}
|
||
// Survivable (loss until the next IDR/RFI recovery) — keep feeding.
|
||
Err(e) => tracing::debug!(error = %e, "decode error (recovering)"),
|
||
}
|
||
}
|
||
Err(PunktfunkError::NoFrame) => {}
|
||
Err(PunktfunkError::Closed) => break Some("Host ended the session".to_string()),
|
||
Err(e) => break Some(format!("session: {e:?}")),
|
||
}
|
||
|
||
// Loss recovery: under infinite GOP the only recovery keyframe is one we request. The
|
||
// reassembler drops unrecoverable AUs (frames_dropped); the decoder then conceals the
|
||
// reference-missing delta frames that follow and returns Ok, so keying off a decode error
|
||
// rarely fires. Request an IDR when the drop count climbs, throttled — the decode stays
|
||
// wedged for several frames until the IDR lands, so requesting every frame would flood.
|
||
let dropped = connector.frames_dropped();
|
||
if dropped > last_dropped {
|
||
last_dropped = dropped;
|
||
let now = Instant::now();
|
||
if last_kf_req.is_none_or(|t| now.duration_since(t) >= Duration::from_millis(100)) {
|
||
last_kf_req = Some(now);
|
||
let _ = connector.request_keyframe();
|
||
tracing::debug!(dropped, "requested keyframe (loss recovery)");
|
||
}
|
||
}
|
||
|
||
// Drain audio between frames (packets land every 5 ms; the queue holds 320 ms).
|
||
while let Ok(pkt) = connector.next_audio(Duration::ZERO) {
|
||
if let (Some(player), Some(dec)) = (&player, opus_dec.as_mut()) {
|
||
match dec.decode_float(&pkt.data, &mut pcm, false) {
|
||
// `samples` is per-channel; the interleaved frame is `samples * channels`.
|
||
Ok(samples) => player.push(pcm[..samples * channels as usize].to_vec()),
|
||
Err(e) => tracing::debug!(error = %e, "opus decode"),
|
||
}
|
||
}
|
||
}
|
||
|
||
if window_start.elapsed() >= Duration::from_secs(1) {
|
||
let secs = window_start.elapsed().as_secs_f32();
|
||
lat_us.sort_unstable();
|
||
let p50 = lat_us.get(lat_us.len() / 2).copied().unwrap_or(0);
|
||
tracing::debug!(
|
||
fps = frames_n,
|
||
lat_p50_us = p50,
|
||
total_frames,
|
||
"stream window"
|
||
);
|
||
let _ = ev_tx.try_send(SessionEvent::Stats(Stats {
|
||
fps: frames_n as f32 / secs,
|
||
mbps: bytes_n as f32 * 8.0 / 1e6 / secs,
|
||
decode_ms: if frames_n > 0 {
|
||
decode_us_sum as f32 / frames_n as f32 / 1000.0
|
||
} else {
|
||
0.0
|
||
},
|
||
latency_ms: p50 as f32 / 1000.0,
|
||
}));
|
||
window_start = Instant::now();
|
||
frames_n = 0;
|
||
bytes_n = 0;
|
||
decode_us_sum = 0;
|
||
lat_us.clear();
|
||
}
|
||
};
|
||
|
||
tracing::info!(
|
||
total_frames,
|
||
reason = end.as_deref().unwrap_or("user"),
|
||
"session ended"
|
||
);
|
||
stop.store(true, Ordering::SeqCst);
|
||
let _ = ev_tx.send_blocking(SessionEvent::Ended(end));
|
||
}
|