feat(linux): game library browser; split app.rs into cli/launch/ui_trust
- library.rs + ui_library.rs: the host's unified game library over the management API (the Apple LibraryClient/LibraryView ported) — mTLS with the paired identity, host verified by its pinned cert fingerprint (ureq + rustls, unified with the workspace rustls 0.23); posters load async with monogram placeholders, and picking a title starts a session that asks the host to launch it (the library id rides the Hello). - app.rs (~800 lines lighter) splits into cli.rs (argv/headless pairing/--connect/screenshot scenes), launch.rs (mode resolve + session worker + event stream into the UI) and ui_trust.rs (TOFU / SPAKE2 PIN / delegated-approval dialogs); ui_hosts/ui_stream reworked around the split. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
//! Session controller: one worker thread runs connect → pump (video pull + decode, audio
|
||||
//! pull + Opus decode, stats), feeding the GTK main loop over channels. The UI keeps the
|
||||
//! `Arc<NativeClient>` from the `Connected` event for direct input sends (no extra hop on
|
||||
//! the input path) — `NativeClient` is `Sync`, planes stay one-consumer-per-thread:
|
||||
//! video+audio here, rumble+hidout on the gamepad thread.
|
||||
//! Session controller: the worker thread runs connect → pump (video pull + decode +
|
||||
//! stats), a dedicated audio thread pulls + Opus-decodes the audio plane (Apple
|
||||
//! `SessionAudio` parity — audio never waits behind a video decode), both feeding the GTK
|
||||
//! main loop / PipeWire over channels. The UI keeps the `Arc<NativeClient>` from the
|
||||
//! `Connected` event for direct input sends (no extra hop on the input path) —
|
||||
//! `NativeClient` is `Sync`, planes stay one-consumer-per-thread: video here, audio on
|
||||
//! its own thread, rumble+hidout on the gamepad thread.
|
||||
|
||||
use crate::audio;
|
||||
use crate::video::{DecodedFrame, Decoder};
|
||||
use crate::video::{DecodedFrame, DecodedImage, Decoder};
|
||||
use punktfunk_core::client::NativeClient;
|
||||
use punktfunk_core::config::{CompositorPref, GamepadPref, Mode};
|
||||
use punktfunk_core::PunktfunkError;
|
||||
@@ -27,6 +29,12 @@ pub struct SessionParams {
|
||||
pub preferred_codec: u8,
|
||||
/// Stream the default microphone to the host's virtual mic source.
|
||||
pub mic_enabled: bool,
|
||||
/// Video decoder preference (Settings; `PUNKTFUNK_DECODER` overrides — see
|
||||
/// `video::Decoder::new`).
|
||||
pub decoder: String,
|
||||
/// Library id for the host to launch this session (`"steam:570"`, from the library
|
||||
/// page); `None` = plain desktop session.
|
||||
pub launch: Option<String>,
|
||||
/// Pinned host fingerprint; `None` = trust on first use (caller persists the observed one).
|
||||
pub pin: Option<[u8; 32]>,
|
||||
pub identity: (String, String),
|
||||
@@ -44,6 +52,9 @@ pub struct Stats {
|
||||
pub decode_ms: f32,
|
||||
/// Median capture→decoded latency over the last window (host-clock corrected).
|
||||
pub latency_ms: f32,
|
||||
/// The decode path frames actually took this window (`"vaapi"`/`"software"`, empty
|
||||
/// until the first frame) — the OSD's trailing tag; tracks a mid-session fallback.
|
||||
pub decoder: &'static str,
|
||||
}
|
||||
|
||||
pub enum SessionEvent {
|
||||
@@ -86,7 +97,7 @@ pub fn start(params: SessionParams) -> SessionHandle {
|
||||
}
|
||||
}
|
||||
|
||||
fn now_ns() -> u64 {
|
||||
pub fn now_ns() -> u64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.map(|d| d.as_nanos() as u64)
|
||||
@@ -146,7 +157,7 @@ fn pump(
|
||||
params.audio_channels,
|
||||
crate::video::decodable_codecs(), // codecs FFmpeg can decode (HEVC/H.264/AV1)
|
||||
params.preferred_codec, // the user's soft codec preference (0 = auto)
|
||||
None, // launch: the Linux client has no library picker yet
|
||||
params.launch.clone(),
|
||||
params.pin,
|
||||
Some(params.identity),
|
||||
params.connect_timeout,
|
||||
@@ -175,14 +186,15 @@ fn pump(
|
||||
fingerprint: connector.host_fingerprint,
|
||||
});
|
||||
|
||||
// Build the decoder for the codec the host resolved (never assume HEVC).
|
||||
// Build the decoder for the codec the host resolved (never assume HEVC), honoring the
|
||||
// Settings backend preference (auto/vaapi/software).
|
||||
let codec_id = crate::video::ffmpeg_codec_id(connector.codec);
|
||||
tracing::info!(
|
||||
?codec_id,
|
||||
welcome_codec = connector.codec,
|
||||
"negotiated video codec"
|
||||
);
|
||||
let mut decoder = match Decoder::new(codec_id) {
|
||||
let mut decoder = match Decoder::new(codec_id, ¶ms.decoder) {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
let _ = ev_tx.send_blocking(SessionEvent::Ended(Some(format!("video decoder: {e}"))));
|
||||
@@ -190,16 +202,9 @@ fn pump(
|
||||
}
|
||||
};
|
||||
// Audio is best-effort: a session without it still streams. Gamepads are the
|
||||
// app-lifetime service's job (the UI attaches it on Connected). Build the decoder + playback
|
||||
// from the host-RESOLVED channel count (never the request), so an older/clamping host that
|
||||
// resolves stereo is decoded as stereo.
|
||||
let channels = connector.audio_channels;
|
||||
let player = audio::AudioPlayer::spawn(channels as u32)
|
||||
.map_err(|e| tracing::warn!(error = %e, "audio disabled"))
|
||||
.ok();
|
||||
let mut opus_dec = AudioDec::new(channels)
|
||||
.map_err(|e| tracing::warn!(error = %e, "opus decoder failed — audio disabled"))
|
||||
.ok();
|
||||
// app-lifetime service's job (the UI attaches it on Connected). Audio runs on its own
|
||||
// thread (one puller per plane), blocking on the audio queue like the Apple client.
|
||||
let audio_thread = spawn_audio(connector.clone(), stop.clone());
|
||||
let _mic = params
|
||||
.mic_enabled
|
||||
.then(|| {
|
||||
@@ -216,8 +221,10 @@ fn pump(
|
||||
let mut bytes_n = 0u64;
|
||||
let mut decode_us_sum = 0u64;
|
||||
let mut lat_us: Vec<u64> = Vec::with_capacity(256);
|
||||
let mut pcm = vec![0f32; 5760 * channels as usize]; // scratch: max Opus frame (120 ms) × channels
|
||||
// Loss recovery: watch the host→client unrecoverable-drop count and ask for an IDR when it climbs.
|
||||
// What actually decoded the last frame — a VAAPI failure demotes mid-session, so
|
||||
// this is read off each frame's image variant rather than fixed at startup.
|
||||
let mut dec_path: &'static str = "";
|
||||
// Loss recovery: watch the host→client unrecoverable-drop count and ask for an IDR when it climbs.
|
||||
let mut last_dropped = connector.frames_dropped();
|
||||
let mut last_kf_req: Option<Instant> = None;
|
||||
|
||||
@@ -225,16 +232,23 @@ fn pump(
|
||||
if stop.load(Ordering::SeqCst) {
|
||||
break None;
|
||||
}
|
||||
match connector.next_frame(Duration::from_millis(4)) {
|
||||
// 20 ms wait: audio has its own thread now, so this only bounds stop-flag
|
||||
// responsiveness and the per-iteration keyframe-recovery check (a frame arrives
|
||||
// every ~8–16 ms at 60–120 Hz anyway, so this rarely times out mid-stream).
|
||||
match connector.next_frame(Duration::from_millis(20)) {
|
||||
Ok(frame) => {
|
||||
let t0 = Instant::now();
|
||||
match decoder.decode(&frame.data) {
|
||||
Ok(Some(decoded)) => {
|
||||
Ok(Some(image)) => {
|
||||
total_frames += 1;
|
||||
dec_path = match &image {
|
||||
DecodedImage::Cpu(_) => "software",
|
||||
DecodedImage::Dmabuf(_) => "vaapi",
|
||||
};
|
||||
if total_frames == 1 {
|
||||
let (w, h, path) = match &decoded {
|
||||
DecodedFrame::Cpu(c) => (c.width, c.height, "software"),
|
||||
DecodedFrame::Dmabuf(d) => (d.width, d.height, "vaapi-dmabuf"),
|
||||
let (w, h, path) = match &image {
|
||||
DecodedImage::Cpu(c) => (c.width, c.height, "software"),
|
||||
DecodedImage::Dmabuf(d) => (d.width, d.height, "vaapi-dmabuf"),
|
||||
};
|
||||
tracing::info!(width = w, height = h, path, "first frame decoded");
|
||||
}
|
||||
@@ -248,7 +262,10 @@ fn pump(
|
||||
decode_us_sum += t0.elapsed().as_micros() as u64;
|
||||
frames_n += 1;
|
||||
bytes_n += frame.data.len() as u64;
|
||||
let _ = frame_tx.force_send(decoded);
|
||||
let _ = frame_tx.force_send(DecodedFrame {
|
||||
pts_ns: frame.pts_ns,
|
||||
image,
|
||||
});
|
||||
}
|
||||
Ok(None) => {}
|
||||
// Survivable (loss until the next IDR/RFI recovery) — keep feeding.
|
||||
@@ -276,17 +293,6 @@ fn pump(
|
||||
}
|
||||
}
|
||||
|
||||
// Drain audio between frames (packets land every 5 ms; the queue holds 320 ms).
|
||||
while let Ok(pkt) = connector.next_audio(Duration::ZERO) {
|
||||
if let (Some(player), Some(dec)) = (&player, opus_dec.as_mut()) {
|
||||
match dec.decode_float(&pkt.data, &mut pcm, false) {
|
||||
// `samples` is per-channel; the interleaved frame is `samples * channels`.
|
||||
Ok(samples) => player.push(pcm[..samples * channels as usize].to_vec()),
|
||||
Err(e) => tracing::debug!(error = %e, "opus decode"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if window_start.elapsed() >= Duration::from_secs(1) {
|
||||
let secs = window_start.elapsed().as_secs_f32();
|
||||
lat_us.sort_unstable();
|
||||
@@ -306,6 +312,7 @@ fn pump(
|
||||
0.0
|
||||
},
|
||||
latency_ms: p50 as f32 / 1000.0,
|
||||
decoder: dec_path,
|
||||
}));
|
||||
window_start = Instant::now();
|
||||
frames_n = 0;
|
||||
@@ -321,5 +328,52 @@ fn pump(
|
||||
"session ended"
|
||||
);
|
||||
stop.store(true, Ordering::SeqCst);
|
||||
if let Some(t) = audio_thread {
|
||||
let _ = t.join(); // exits within its 100 ms pull timeout once `stop` is set
|
||||
}
|
||||
let _ = ev_tx.send_blocking(SessionEvent::Ended(end));
|
||||
}
|
||||
|
||||
/// The dedicated audio thread: owns the Opus decoder, the PCM scratch, and the PipeWire
|
||||
/// player, and blocks on `next_audio` (the plane's single consumer — packets land every
|
||||
/// 5 ms). Decoded chunks are pushed in Vecs recycled from the player's pool, so the
|
||||
/// steady state allocates nothing. Best-effort like before: any setup failure logs and
|
||||
/// the session streams video-only. Exits on the stop flag or a closed plane.
|
||||
fn spawn_audio(
|
||||
connector: Arc<NativeClient>,
|
||||
stop: Arc<AtomicBool>,
|
||||
) -> Option<std::thread::JoinHandle<()>> {
|
||||
// Decoder + playback are built from the host-RESOLVED channel count (never the
|
||||
// request), so an older/clamping host that resolves stereo is decoded as stereo.
|
||||
let channels = connector.audio_channels;
|
||||
let player = audio::AudioPlayer::spawn(channels as u32)
|
||||
.map_err(|e| tracing::warn!(error = %e, "audio disabled"))
|
||||
.ok()?;
|
||||
let mut dec = AudioDec::new(channels)
|
||||
.map_err(|e| tracing::warn!(error = %e, "opus decoder failed — audio disabled"))
|
||||
.ok()?;
|
||||
std::thread::Builder::new()
|
||||
.name("punktfunk-audio-rx".into())
|
||||
.spawn(move || {
|
||||
let mut pcm = vec![0f32; 5760 * channels as usize]; // scratch: max Opus frame (120 ms) × channels
|
||||
while !stop.load(Ordering::SeqCst) {
|
||||
match connector.next_audio(Duration::from_millis(100)) {
|
||||
Ok(pkt) => match dec.decode_float(&pkt.data, &mut pcm, false) {
|
||||
// `samples` is per-channel; the interleaved frame is `samples * channels`.
|
||||
Ok(samples) => {
|
||||
let n = samples * channels as usize;
|
||||
let mut buf = player.take_buffer();
|
||||
buf.extend_from_slice(&pcm[..n]);
|
||||
player.push(buf);
|
||||
}
|
||||
Err(e) => tracing::debug!(error = %e, "opus decode"),
|
||||
},
|
||||
Err(PunktfunkError::NoFrame) => {}
|
||||
Err(_) => break, // plane closed — the session is ending
|
||||
}
|
||||
}
|
||||
tracing::debug!("audio pull thread exited");
|
||||
})
|
||||
.map_err(|e| tracing::warn!(error = %e, "audio thread failed to start — audio disabled"))
|
||||
.ok()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user