feat(host): web-console performance capture — record stream stats, graph them
apple / swift (push) Successful in 1m1s
android / android (push) Successful in 4m13s
ci / rust (push) Successful in 4m42s
ci / web (push) Successful in 50s
ci / docs-site (push) Successful in 53s
windows-host / package (push) Successful in 5m51s
apple / screenshots (push) Successful in 5m1s
deb / build-publish (push) Successful in 2m29s
decky / build-publish (push) Successful in 12s
docker / build-push (--build-arg FEDORA_VERSION=44, ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora44-rpm) (push) Successful in 5s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 33s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 4s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 5s
ci / bench (push) Successful in 4m35s
rpm / build-publish (bazzite, punktfunk-fedora-rpm) (push) Successful in 9m9s
docker / deploy-docs (push) Successful in 18s
rpm / build-publish (fedora-44, punktfunk-fedora44-rpm) (push) Successful in 9m10s
apple / swift (push) Successful in 1m1s
android / android (push) Successful in 4m13s
ci / rust (push) Successful in 4m42s
ci / web (push) Successful in 50s
ci / docs-site (push) Successful in 53s
windows-host / package (push) Successful in 5m51s
apple / screenshots (push) Successful in 5m1s
deb / build-publish (push) Successful in 2m29s
decky / build-publish (push) Successful in 12s
docker / build-push (--build-arg FEDORA_VERSION=44, ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora44-rpm) (push) Successful in 5s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 33s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 4s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 5s
ci / bench (push) Successful in 4m35s
rpm / build-publish (bazzite, punktfunk-fedora-rpm) (push) Successful in 9m9s
docker / deploy-docs (push) Successful in 18s
rpm / build-publish (fedora-44, punktfunk-fedora44-rpm) (push) Successful in 9m10s
Arm streaming-perf-stats capture from the web console, play, stop, and review the run as graphs; finished captures are saved to disk as browsable/exportable recordings. Covers both the native punktfunk/1 path and GameStream. - stats_recorder.rs: one shared Arc<StatsRecorder> ring (created in gamestream::serve, shared with the mgmt API + both streaming loops, mirroring NativePairing). The hot-path gate is a runtime AtomicBool that replaces the startup-only PUNKTFUNK_PERF for *recording* (PERF stdout logging unchanged); bounded ring (~3 h); atomic temp+rename writes to ~/.config/punktfunk/captures/*.json; path-traversal-safe ids; poison-resilient locks. - native (punktfunk1.rs) + GameStream (stream.rs) emit a StatsSample at their existing ~2 s / ~1 s aggregation boundary — per-stage latency p50/p99, fps new/repeat, goodput, loss/FEC deltas — with no new per-frame work beyond the cheap atomic check. FrameMsg.was_measured keeps pre-arm in-flight frames out of the first window's percentiles (without zeroing the Windows-relay path's fps/encode). - mgmt.rs: 7 bearer-only /api/v1/stats/* endpoints (capture start/stop/status/live; recordings list/get/delete); api/openapi.json regenerated, in sync. - web: new "Performance" page (recharts, rendered SSR-safe) — capture control, live graphs while armed, recordings table (view / download-JSON / delete), and a detail view with the latency stacked-area bottleneck breakdown (p50/p99 toggle) + throughput + health. Charts adapt to either path's stage set. Design: design/stats-capture-plan.md. Built and adversarially reviewed via a multi-agent workflow; workspace build/clippy(-D warnings)/fmt/tests green, OpenAPI no-drift. Not yet on-glass validated against a live session. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -125,12 +125,21 @@ pub struct AppState {
|
||||
/// (avoids a PipeWire stream setup per reconnect); drained on reuse so no stale audio is
|
||||
/// sent, dropped + reopened when a session negotiates a different channel count.
|
||||
pub audio_cap: std::sync::Arc<std::sync::Mutex<Option<Box<dyn crate::audio::AudioCapturer>>>>,
|
||||
/// Shared streaming-stats recorder (web-console capture/graph). The GameStream encode loop
|
||||
/// reads `is_armed()` per frame and emits samples; the same `Arc` is shared with the mgmt API
|
||||
/// and the native punktfunk/1 loops so one capture spans whichever path is streaming.
|
||||
pub stats: Arc<crate::stats_recorder::StatsRecorder>,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
/// Fresh control-plane state: no active session; the pairing allow-list is loaded from
|
||||
/// disk (pairings persist across restarts).
|
||||
pub fn new(host: Host, identity: cert::ServerIdentity) -> AppState {
|
||||
/// disk (pairings persist across restarts). `stats` is the shared recorder handed to both the
|
||||
/// mgmt API and the streaming loops.
|
||||
pub fn new(
|
||||
host: Host,
|
||||
identity: cert::ServerIdentity,
|
||||
stats: Arc<crate::stats_recorder::StatsRecorder>,
|
||||
) -> AppState {
|
||||
AppState {
|
||||
host,
|
||||
identity,
|
||||
@@ -145,6 +154,7 @@ impl AppState {
|
||||
rfi_range: std::sync::Arc::new(std::sync::Mutex::new(None)),
|
||||
video_cap: std::sync::Arc::new(std::sync::Mutex::new(None)),
|
||||
audio_cap: std::sync::Arc::new(std::sync::Mutex::new(None)),
|
||||
stats,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -166,7 +176,10 @@ pub fn serve(
|
||||
) -> Result<()> {
|
||||
let host = Host::detect()?;
|
||||
let identity = cert::ServerIdentity::load_or_create().context("host certificate")?;
|
||||
let state = Arc::new(AppState::new(host, identity));
|
||||
// The shared streaming-stats recorder: one handle for the mgmt API, the GameStream encode loop
|
||||
// (via `AppState`), and the native punktfunk/1 loops (passed to `punktfunk1::serve`).
|
||||
let stats = crate::stats_recorder::StatsRecorder::new(crate::stats_recorder::default_dir());
|
||||
let state = Arc::new(AppState::new(host, identity, stats.clone()));
|
||||
// The native plane always runs, so the shared native-pairing handle (linking the QUIC ceremony
|
||||
// and the management API) always exists.
|
||||
let np = Arc::new(
|
||||
@@ -206,8 +219,8 @@ pub fn serve(
|
||||
);
|
||||
tokio::try_join!(
|
||||
nvhttp::run(state.clone()),
|
||||
crate::mgmt::run(state.clone(), mgmt, Some(np.clone())),
|
||||
crate::punktfunk1::serve(native_opts, np),
|
||||
crate::mgmt::run(state.clone(), mgmt, Some(np.clone()), stats.clone()),
|
||||
crate::punktfunk1::serve(native_opts, np, stats.clone()),
|
||||
)?;
|
||||
} else {
|
||||
// Secure default: native punktfunk/1 + management API only (no GameStream surface).
|
||||
@@ -217,8 +230,8 @@ pub fn serve(
|
||||
(GameStream OFF — pass --gamestream for stock-Moonlight compat)"
|
||||
);
|
||||
tokio::try_join!(
|
||||
crate::mgmt::run(state.clone(), mgmt, Some(np.clone())),
|
||||
crate::punktfunk1::serve(native_opts, np),
|
||||
crate::mgmt::run(state.clone(), mgmt, Some(np.clone()), stats.clone()),
|
||||
crate::punktfunk1::serve(native_opts, np, stats.clone()),
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -291,7 +291,10 @@ mod tests {
|
||||
https_port: HTTPS_PORT,
|
||||
};
|
||||
let identity = super::super::cert::ServerIdentity::ephemeral().expect("ephemeral identity");
|
||||
Arc::new(AppState::new(host, identity))
|
||||
let stats = crate::stats_recorder::StatsRecorder::new(
|
||||
std::env::temp_dir().join(format!("pf-nvhttp-stats-{}", std::process::id())),
|
||||
);
|
||||
Arc::new(AppState::new(host, identity, stats))
|
||||
}
|
||||
|
||||
fn fp_of(der: &[u8]) -> String {
|
||||
|
||||
@@ -234,6 +234,7 @@ fn handle_request(req: &Request, state: &AppState) -> String {
|
||||
state.force_idr.clone(),
|
||||
state.rfi_range.clone(),
|
||||
state.video_cap.clone(),
|
||||
state.stats.clone(),
|
||||
);
|
||||
}
|
||||
Some(_) => tracing::info!("RTSP PLAY — stream already running"),
|
||||
|
||||
@@ -48,6 +48,7 @@ pub fn start(
|
||||
force_idr: Arc<AtomicBool>,
|
||||
rfi_range: RfiSlot,
|
||||
video_cap: CapturerSlot,
|
||||
stats: Arc<crate::stats_recorder::StatsRecorder>,
|
||||
) {
|
||||
let _ = std::thread::Builder::new()
|
||||
.name("punktfunk-video".into())
|
||||
@@ -60,6 +61,7 @@ pub fn start(
|
||||
&force_idr,
|
||||
&rfi_range,
|
||||
&video_cap,
|
||||
&stats,
|
||||
) {
|
||||
tracing::error!(error = %format!("{e:#}"), "video stream failed");
|
||||
}
|
||||
@@ -68,6 +70,7 @@ pub fn start(
|
||||
});
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn run(
|
||||
cfg: StreamConfig,
|
||||
app: Option<&super::apps::AppEntry>,
|
||||
@@ -75,6 +78,9 @@ fn run(
|
||||
force_idr: &AtomicBool,
|
||||
rfi_range: &std::sync::Mutex<Option<(i64, i64)>>,
|
||||
video_cap: &std::sync::Mutex<Option<Box<dyn Capturer>>>,
|
||||
// Shared stats recorder for the web-console capture/graph. Threaded into `stream_body` (the
|
||||
// encode loop); per-frame sample emission is wired by a later pass.
|
||||
stats: &Arc<crate::stats_recorder::StatsRecorder>,
|
||||
) -> Result<()> {
|
||||
// GameStream capture/encode thread: apply Windows session tuning (no-op off Windows).
|
||||
crate::session_tuning::on_hot_thread();
|
||||
@@ -100,6 +106,8 @@ fn run(
|
||||
sock.connect(client)
|
||||
.context("connect client video endpoint")?;
|
||||
tracing::info!(%client, "video: client endpoint learned");
|
||||
// Short label for web-console stats captures: the client's peer IP.
|
||||
let client_label = client.ip().to_string();
|
||||
|
||||
// Native client-resolution source: create a compositor virtual output sized to the client's
|
||||
// request and capture it (no scaling). Self-contained — deliberately NOT pooled in
|
||||
@@ -163,7 +171,16 @@ fn run(
|
||||
}
|
||||
}
|
||||
}
|
||||
return stream_body(&mut *capturer, &sock, cfg, running, force_idr, rfi_range);
|
||||
return stream_body(
|
||||
&mut *capturer,
|
||||
&sock,
|
||||
cfg,
|
||||
running,
|
||||
force_idr,
|
||||
rfi_range,
|
||||
stats,
|
||||
&client_label,
|
||||
);
|
||||
}
|
||||
|
||||
// Reuse the persistent capturer (one screencast session → clean reconnect); create it on
|
||||
@@ -183,7 +200,16 @@ fn run(
|
||||
}
|
||||
};
|
||||
capturer.set_active(true);
|
||||
let result = stream_body(&mut *capturer, &sock, cfg, running, force_idr, rfi_range);
|
||||
let result = stream_body(
|
||||
&mut *capturer,
|
||||
&sock,
|
||||
cfg,
|
||||
running,
|
||||
force_idr,
|
||||
rfi_range,
|
||||
stats,
|
||||
&client_label,
|
||||
);
|
||||
capturer.set_active(false);
|
||||
*video_cap.lock().unwrap() = Some(capturer);
|
||||
result
|
||||
@@ -326,8 +352,20 @@ fn spawn_sender(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Percentile of a slice (sorts it in place first). `q` in `0.0..=1.0`. Used for the web-console
|
||||
/// stats sample's per-stage p50/p99.
|
||||
fn percentile(v: &mut [u32], q: f64) -> u32 {
|
||||
if v.is_empty() {
|
||||
return 0;
|
||||
}
|
||||
v.sort_unstable();
|
||||
let i = ((v.len() as f64 * q) as usize).min(v.len() - 1);
|
||||
v[i]
|
||||
}
|
||||
|
||||
/// The encode → packetize loop, over a borrowed capturer. Sending runs on a dedicated thread
|
||||
/// (see [`spawn_sender`]) so a send spike can never stall capture/encode.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn stream_body(
|
||||
capturer: &mut dyn Capturer,
|
||||
sock: &UdpSocket,
|
||||
@@ -335,6 +373,11 @@ fn stream_body(
|
||||
running: &Arc<AtomicBool>,
|
||||
force_idr: &AtomicBool,
|
||||
rfi_range: &std::sync::Mutex<Option<(i64, i64)>>,
|
||||
// Shared stats recorder. The encode loop reads `stats.is_armed()` per frame to decide whether
|
||||
// to accumulate the per-stage split, then emits a `StatsSample` at its 1 s aggregation boundary.
|
||||
stats: &Arc<crate::stats_recorder::StatsRecorder>,
|
||||
// Short client label (peer IP) seeded into the capture meta on the first armed registration.
|
||||
client_label: &str,
|
||||
) -> Result<()> {
|
||||
// The first frame establishes the authoritative size/format for the encoder.
|
||||
let mut frame = capturer.next_frame().context("capture first frame")?;
|
||||
@@ -398,6 +441,19 @@ fn stream_body(
|
||||
let perf = crate::config::config().perf;
|
||||
let (mut mx_cap, mut mx_enc, mut mx_pkt, mut mx_send, mut mx_pkts, mut uniq) =
|
||||
(0u128, 0u128, 0u128, 0u128, 0usize, 0u32);
|
||||
// Web-console stats accumulation (active when `perf` OR a capture is armed): per-stage vectors
|
||||
// for p50/p99, the goodput bytes queued to the sender this window, the previous window's
|
||||
// dropped-frame count for delta computation, and the registration id cached on the first sample.
|
||||
let codec_name = match cfg.codec {
|
||||
Codec::H264 => "h264",
|
||||
Codec::H265 => "hevc",
|
||||
Codec::Av1 => "av1",
|
||||
};
|
||||
let mut sid: Option<u32> = None;
|
||||
let (mut v_cap, mut v_enc, mut v_pkt, mut v_send): (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>) =
|
||||
(Vec::new(), Vec::new(), Vec::new(), Vec::new());
|
||||
let mut bytes_win: u64 = 0;
|
||||
let mut last_dropped_batches: u64 = 0;
|
||||
// Absolute next-frame deadline — the single pacing clock for the loop.
|
||||
let mut next_frame = Instant::now();
|
||||
// RFI capability is fixed for the session (probed at encoder open). Query it once so the
|
||||
@@ -407,6 +463,9 @@ fn stream_body(
|
||||
|
||||
while running.load(Ordering::SeqCst) {
|
||||
let tick = Instant::now();
|
||||
// Measure per-stage timing when `PUNKTFUNK_PERF` is set OR a web-console stats capture is
|
||||
// armed (cheap Relaxed atomic, re-read each frame).
|
||||
let measure = perf || stats.is_armed();
|
||||
// Advance to the freshest captured frame if one arrived; otherwise reuse the last.
|
||||
if let Some(f) = capturer.try_latest().context("capture frame")? {
|
||||
frame = f;
|
||||
@@ -447,9 +506,19 @@ fn stream_body(
|
||||
// Hand the frame's packets to the send thread; never block here. A full queue means
|
||||
// the sender is behind — drop this batch (FEC/RFI covers the client) and keep encoding.
|
||||
let n = batch.len();
|
||||
// Goodput this window = bytes actually queued to the sender (a dropped batch never reaches
|
||||
// the wire, so it's excluded). Summed only when measuring, to keep the idle path free.
|
||||
let batch_bytes: u64 = if measure {
|
||||
batch.iter().map(|p| p.len() as u64).sum()
|
||||
} else {
|
||||
0
|
||||
};
|
||||
if n > 0 {
|
||||
match batch_tx.try_send(batch) {
|
||||
Ok(()) => sent_batches += 1,
|
||||
Ok(()) => {
|
||||
sent_batches += 1;
|
||||
bytes_win += batch_bytes;
|
||||
}
|
||||
Err(std::sync::mpsc::TrySendError::Full(_)) => {
|
||||
dropped_batches += 1;
|
||||
if dropped_batches.is_power_of_two() {
|
||||
@@ -461,17 +530,26 @@ fn stream_body(
|
||||
}
|
||||
}
|
||||
}
|
||||
if perf {
|
||||
if measure {
|
||||
let t_send = tick.elapsed();
|
||||
mx_cap = mx_cap.max(t_cap.as_micros());
|
||||
mx_enc = mx_enc.max((t_enc - t_cap).as_micros());
|
||||
mx_pkt = mx_pkt.max((t_pkt - t_enc).as_micros());
|
||||
mx_send = mx_send.max((t_send - t_pkt).as_micros());
|
||||
let cap_us = t_cap.as_micros();
|
||||
let enc_us = (t_enc - t_cap).as_micros();
|
||||
let pkt_us = (t_pkt - t_enc).as_micros();
|
||||
let send_us = (t_send - t_pkt).as_micros();
|
||||
mx_cap = mx_cap.max(cap_us);
|
||||
mx_enc = mx_enc.max(enc_us);
|
||||
mx_pkt = mx_pkt.max(pkt_us);
|
||||
mx_send = mx_send.max(send_us);
|
||||
mx_pkts = mx_pkts.max(n);
|
||||
v_cap.push(cap_us as u32);
|
||||
v_enc.push(enc_us as u32);
|
||||
v_pkt.push(pkt_us as u32);
|
||||
v_send.push(send_us as u32);
|
||||
}
|
||||
|
||||
fps_count += 1;
|
||||
if fps_t.elapsed() >= Duration::from_secs(1) {
|
||||
let secs = fps_t.elapsed().as_secs_f64();
|
||||
if perf {
|
||||
// Max µs/stage this second: cap=drain channel, enc=submit (zero-copy device
|
||||
// copy + NVENC), pkt=poll+FEC+packetize, send=paced packet send. `uniq`=new
|
||||
@@ -486,12 +564,6 @@ fn stream_body(
|
||||
max_pkts = mx_pkts,
|
||||
"video: streaming (perf)"
|
||||
);
|
||||
mx_cap = 0;
|
||||
mx_enc = 0;
|
||||
mx_pkt = 0;
|
||||
mx_send = 0;
|
||||
mx_pkts = 0;
|
||||
uniq = 0;
|
||||
} else {
|
||||
tracing::info!(
|
||||
fps = fps_count,
|
||||
@@ -500,6 +572,68 @@ fn stream_body(
|
||||
"video: streaming"
|
||||
);
|
||||
}
|
||||
// Web-console capture: build the aggregated sample. The host send side exposes no
|
||||
// receiver-side packet loss / FEC-recovery / send-buffer EAGAIN counters, so those stay
|
||||
// 0 (not fabricated); `frames_dropped` is the per-frame send-queue overflow delta.
|
||||
if stats.is_armed() {
|
||||
let session_id = *sid.get_or_insert_with(|| {
|
||||
stats.register_session(
|
||||
"gamestream",
|
||||
cfg.width,
|
||||
cfg.height,
|
||||
cfg.fps,
|
||||
codec_name,
|
||||
client_label,
|
||||
)
|
||||
});
|
||||
let sample = crate::stats_recorder::StatsSample {
|
||||
t_ms: 0, // stamped by push_sample from the capture's monotonic start
|
||||
session_id,
|
||||
stages: vec![
|
||||
crate::stats_recorder::StageTiming {
|
||||
name: "capture".into(),
|
||||
p50_us: percentile(&mut v_cap, 0.50) as f32,
|
||||
p99_us: percentile(&mut v_cap, 0.99) as f32,
|
||||
},
|
||||
crate::stats_recorder::StageTiming {
|
||||
name: "encode".into(),
|
||||
p50_us: percentile(&mut v_enc, 0.50) as f32,
|
||||
p99_us: percentile(&mut v_enc, 0.99) as f32,
|
||||
},
|
||||
crate::stats_recorder::StageTiming {
|
||||
name: "packetize".into(),
|
||||
p50_us: percentile(&mut v_pkt, 0.50) as f32,
|
||||
p99_us: percentile(&mut v_pkt, 0.99) as f32,
|
||||
},
|
||||
crate::stats_recorder::StageTiming {
|
||||
name: "send".into(),
|
||||
p50_us: percentile(&mut v_send, 0.50) as f32,
|
||||
p99_us: percentile(&mut v_send, 0.99) as f32,
|
||||
},
|
||||
],
|
||||
fps: (uniq as f64 / secs) as f32,
|
||||
repeat_fps: (fps_count.saturating_sub(uniq) as f64 / secs) as f32,
|
||||
mbps: (bytes_win as f64 * 8.0 / secs / 1_000_000.0) as f32,
|
||||
bitrate_kbps: cfg.bitrate_kbps,
|
||||
frames_dropped: dropped_batches.saturating_sub(last_dropped_batches) as u32,
|
||||
packets_dropped: 0,
|
||||
send_dropped: 0,
|
||||
fec_recovered: 0,
|
||||
};
|
||||
stats.push_sample(session_id, sample);
|
||||
}
|
||||
mx_cap = 0;
|
||||
mx_enc = 0;
|
||||
mx_pkt = 0;
|
||||
mx_send = 0;
|
||||
mx_pkts = 0;
|
||||
uniq = 0;
|
||||
v_cap.clear();
|
||||
v_enc.clear();
|
||||
v_pkt.clear();
|
||||
v_send.clear();
|
||||
bytes_win = 0;
|
||||
last_dropped_batches = dropped_batches;
|
||||
fps_count = 0;
|
||||
fps_t = Instant::now();
|
||||
}
|
||||
|
||||
@@ -50,6 +50,7 @@ mod service;
|
||||
mod session_plan;
|
||||
mod session_tuning;
|
||||
mod spike;
|
||||
mod stats_recorder;
|
||||
mod vdisplay;
|
||||
#[cfg(target_os = "windows")]
|
||||
#[path = "windows/wgc_helper.rs"]
|
||||
|
||||
@@ -20,6 +20,7 @@ use crate::gamestream::{
|
||||
tls::{serve_https, PeerCertFingerprint},
|
||||
AppState, APP_VERSION, AUDIO_PORT, CONTROL_PORT, GFE_VERSION, RTSP_PORT, VIDEO_PORT,
|
||||
};
|
||||
use crate::stats_recorder::{Capture, CaptureMeta, StatsStatus};
|
||||
use anyhow::{Context, Result};
|
||||
use axum::{
|
||||
extract::{Path, Request, State},
|
||||
@@ -66,6 +67,9 @@ struct MgmtState {
|
||||
/// Native (punktfunk/1) pairing — shared with the QUIC host when the unified `serve --native`
|
||||
/// runs it. `None` ⇒ GameStream-only host (the native endpoints report `enabled: false`).
|
||||
native: Option<Arc<crate::native_pairing::NativePairing>>,
|
||||
/// Shared streaming-stats recorder — the same handle the streaming loops emit into, so an
|
||||
/// operator can arm/stop a capture here and review/list/delete saved recordings.
|
||||
stats: Arc<crate::stats_recorder::StatsRecorder>,
|
||||
token: Option<String>,
|
||||
/// The port we serve on, echoed in [`PortMap`] so a client can persist a full endpoint map.
|
||||
port: u16,
|
||||
@@ -77,6 +81,7 @@ pub async fn run(
|
||||
state: Arc<AppState>,
|
||||
opts: Options,
|
||||
native: Option<Arc<crate::native_pairing::NativePairing>>,
|
||||
stats: Arc<crate::stats_recorder::StatsRecorder>,
|
||||
) -> Result<()> {
|
||||
// The mgmt API is HTTPS + token-authenticated ALWAYS (even on loopback): `parse_serve`
|
||||
// guarantees a token (CLI flag / env / persisted ~/.config/punktfunk/mgmt-token / generated).
|
||||
@@ -100,7 +105,7 @@ pub async fn run(
|
||||
auth = "mTLS (paired cert) or bearer (required)",
|
||||
"management API listening over HTTPS (docs at /api/docs, spec at /api/v1/openapi.json)"
|
||||
);
|
||||
let app = app(state, Some(token), opts.bind.port(), native);
|
||||
let app = app(state, Some(token), opts.bind.port(), native, stats);
|
||||
serve_https(opts.bind, app, tls).await
|
||||
}
|
||||
|
||||
@@ -110,10 +115,12 @@ fn app(
|
||||
token: Option<String>,
|
||||
port: u16,
|
||||
native: Option<Arc<crate::native_pairing::NativePairing>>,
|
||||
stats: Arc<crate::stats_recorder::StatsRecorder>,
|
||||
) -> Router {
|
||||
let shared = Arc::new(MgmtState {
|
||||
app: state,
|
||||
native,
|
||||
stats,
|
||||
token,
|
||||
port,
|
||||
});
|
||||
@@ -158,7 +165,13 @@ fn api_router_parts() -> (Router<Arc<MgmtState>>, utoipa::openapi::OpenApi) {
|
||||
.routes(routes!(request_idr))
|
||||
.routes(routes!(get_library))
|
||||
.routes(routes!(create_custom_game))
|
||||
.routes(routes!(update_custom_game, delete_custom_game)),
|
||||
.routes(routes!(update_custom_game, delete_custom_game))
|
||||
.routes(routes!(stats_capture_start))
|
||||
.routes(routes!(stats_capture_stop))
|
||||
.routes(routes!(stats_capture_status))
|
||||
.routes(routes!(stats_capture_live))
|
||||
.routes(routes!(stats_recordings_list))
|
||||
.routes(routes!(stats_recording_get, stats_recording_delete)),
|
||||
)
|
||||
.split_for_parts()
|
||||
}
|
||||
@@ -190,6 +203,7 @@ pub fn openapi_json() -> String {
|
||||
(name = "native", description = "Native punktfunk/1 pairing: arm a window, display the host PIN, manage paired devices"),
|
||||
(name = "session", description = "Active streaming session control"),
|
||||
(name = "library", description = "Game library: installed-store titles (Steam) plus user-curated custom entries"),
|
||||
(name = "stats", description = "Streaming performance-stats capture: arm/stop a recording, read the live + saved time-series for graphing"),
|
||||
)
|
||||
)]
|
||||
struct ApiDoc;
|
||||
@@ -1218,6 +1232,185 @@ async fn delete_custom_game(Path(id): Path<String>) -> Response {
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------------------
|
||||
// Streaming stats capture (design/stats-capture-plan.md §2)
|
||||
// ---------------------------------------------------------------------------------------
|
||||
|
||||
/// Start a stats capture
|
||||
///
|
||||
/// Arms a new performance-stats capture. Idempotent: if a capture is already running this returns
|
||||
/// the current status unchanged. While armed, the streaming loops emit aggregated samples (~ every
|
||||
/// 1–2 s) into the in-progress capture, readable live via `GET /stats/capture/live`.
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/stats/capture/start",
|
||||
tag = "stats",
|
||||
operation_id = "statsCaptureStart",
|
||||
responses(
|
||||
(status = OK, description = "Capture armed (or already running)", body = StatsStatus),
|
||||
(status = UNAUTHORIZED, description = "Missing or invalid bearer token", body = ApiError),
|
||||
)
|
||||
)]
|
||||
async fn stats_capture_start(State(st): State<Arc<MgmtState>>) -> Json<StatsStatus> {
|
||||
let status = st.stats.start();
|
||||
tracing::info!(
|
||||
started_unix_ms = status.started_unix_ms,
|
||||
"management API: stats capture armed"
|
||||
);
|
||||
Json(status)
|
||||
}
|
||||
|
||||
/// Stop the stats capture
|
||||
///
|
||||
/// Disarms the in-progress capture and writes it to disk atomically, returning its summary. If
|
||||
/// nothing was recording, returns `204 No Content`.
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/stats/capture/stop",
|
||||
tag = "stats",
|
||||
operation_id = "statsCaptureStop",
|
||||
responses(
|
||||
(status = OK, description = "Capture stopped and saved", body = CaptureMeta),
|
||||
(status = NO_CONTENT, description = "Nothing was recording"),
|
||||
(status = INTERNAL_SERVER_ERROR, description = "Could not write the recording to disk", body = ApiError),
|
||||
(status = UNAUTHORIZED, description = "Missing or invalid bearer token", body = ApiError),
|
||||
)
|
||||
)]
|
||||
async fn stats_capture_stop(State(st): State<Arc<MgmtState>>) -> Response {
|
||||
match st.stats.stop() {
|
||||
Ok(Some(meta)) => {
|
||||
tracing::info!(id = %meta.id, samples = meta.sample_count, "management API: stats capture saved");
|
||||
(StatusCode::OK, Json(meta)).into_response()
|
||||
}
|
||||
Ok(None) => StatusCode::NO_CONTENT.into_response(),
|
||||
Err(e) => api_error(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
&format!("could not save capture: {e}"),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Stats capture status
|
||||
///
|
||||
/// Whether a capture is armed, its sample count, and start time. Poll this (e.g. every 2 s) to
|
||||
/// drive the capture-control UI.
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/stats/capture/status",
|
||||
tag = "stats",
|
||||
operation_id = "statsCaptureStatus",
|
||||
responses(
|
||||
(status = OK, description = "In-progress capture status (idle when not armed)", body = StatsStatus),
|
||||
(status = UNAUTHORIZED, description = "Missing or invalid bearer token", body = ApiError),
|
||||
)
|
||||
)]
|
||||
async fn stats_capture_status(State(st): State<Arc<MgmtState>>) -> Json<StatsStatus> {
|
||||
Json(st.stats.status())
|
||||
}
|
||||
|
||||
/// Live in-progress capture
|
||||
///
|
||||
/// The full sample time-series of the capture currently recording, for live graphing. `404` when
|
||||
/// nothing is armed.
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/stats/capture/live",
|
||||
tag = "stats",
|
||||
operation_id = "statsCaptureLive",
|
||||
responses(
|
||||
(status = OK, description = "The in-progress capture (meta + samples so far)", body = Capture),
|
||||
(status = NOT_FOUND, description = "No capture is currently recording", body = ApiError),
|
||||
(status = UNAUTHORIZED, description = "Missing or invalid bearer token", body = ApiError),
|
||||
)
|
||||
)]
|
||||
async fn stats_capture_live(State(st): State<Arc<MgmtState>>) -> Response {
|
||||
match st.stats.live_snapshot() {
|
||||
Some(capture) => Json(capture).into_response(),
|
||||
None => api_error(StatusCode::NOT_FOUND, "no capture is currently recording"),
|
||||
}
|
||||
}
|
||||
|
||||
/// List saved recordings
|
||||
///
|
||||
/// Every saved capture's summary (the `meta` head only — not the sample body), newest first.
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/stats/recordings",
|
||||
tag = "stats",
|
||||
operation_id = "statsRecordingsList",
|
||||
responses(
|
||||
(status = OK, description = "Saved capture summaries, newest first", body = [CaptureMeta]),
|
||||
(status = UNAUTHORIZED, description = "Missing or invalid bearer token", body = ApiError),
|
||||
)
|
||||
)]
|
||||
async fn stats_recordings_list(State(st): State<Arc<MgmtState>>) -> Json<Vec<CaptureMeta>> {
|
||||
Json(st.stats.list())
|
||||
}
|
||||
|
||||
/// Get a saved recording
|
||||
///
|
||||
/// The full capture (meta + samples) for `id`, for graphing or download.
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/stats/recordings/{id}",
|
||||
tag = "stats",
|
||||
operation_id = "statsRecordingGet",
|
||||
params(("id" = String, Path, description = "The recording id (its filename stem)")),
|
||||
responses(
|
||||
(status = OK, description = "The full capture", body = Capture),
|
||||
(status = NOT_FOUND, description = "No recording with that id", body = ApiError),
|
||||
(status = UNAUTHORIZED, description = "Missing or invalid bearer token", body = ApiError),
|
||||
(status = INTERNAL_SERVER_ERROR, description = "The recording file is unreadable", body = ApiError),
|
||||
)
|
||||
)]
|
||||
async fn stats_recording_get(State(st): State<Arc<MgmtState>>, Path(id): Path<String>) -> Response {
|
||||
match st.stats.load(&id) {
|
||||
Ok(capture) => Json(capture).into_response(),
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
api_error(StatusCode::NOT_FOUND, "no recording with that id")
|
||||
}
|
||||
Err(e) => api_error(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
&format!("could not read recording: {e}"),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete a saved recording
|
||||
///
|
||||
/// Removes the recording `id` from disk. `404` if there is no such recording.
|
||||
#[utoipa::path(
|
||||
delete,
|
||||
path = "/stats/recordings/{id}",
|
||||
tag = "stats",
|
||||
operation_id = "statsRecordingDelete",
|
||||
params(("id" = String, Path, description = "The recording id (its filename stem)")),
|
||||
responses(
|
||||
(status = NO_CONTENT, description = "Recording deleted"),
|
||||
(status = NOT_FOUND, description = "No recording with that id", body = ApiError),
|
||||
(status = UNAUTHORIZED, description = "Missing or invalid bearer token", body = ApiError),
|
||||
(status = INTERNAL_SERVER_ERROR, description = "Could not delete the recording", body = ApiError),
|
||||
)
|
||||
)]
|
||||
async fn stats_recording_delete(
|
||||
State(st): State<Arc<MgmtState>>,
|
||||
Path(id): Path<String>,
|
||||
) -> Response {
|
||||
match st.stats.delete(&id) {
|
||||
Ok(()) => {
|
||||
tracing::info!(id, "management API: recording deleted");
|
||||
StatusCode::NO_CONTENT.into_response()
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
api_error(StatusCode::NOT_FOUND, "no recording with that id")
|
||||
}
|
||||
Err(e) => api_error(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
&format!("could not delete recording: {e}"),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------------------
|
||||
@@ -1231,6 +1424,15 @@ mod tests {
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
use tower::ServiceExt;
|
||||
|
||||
/// A throwaway stats recorder rooted in a unique temp dir (never touches the real config dir).
|
||||
fn test_stats() -> Arc<crate::stats_recorder::StatsRecorder> {
|
||||
crate::stats_recorder::StatsRecorder::new(std::env::temp_dir().join(format!(
|
||||
"pf-mgmt-stats-{}-{:p}",
|
||||
std::process::id(),
|
||||
&0u8 as *const u8
|
||||
)))
|
||||
}
|
||||
|
||||
fn test_state() -> Arc<AppState> {
|
||||
let host = Host {
|
||||
hostname: "test-host".into(),
|
||||
@@ -1240,18 +1442,20 @@ mod tests {
|
||||
https_port: HTTPS_PORT,
|
||||
};
|
||||
let identity = ServerIdentity::ephemeral().expect("ephemeral identity");
|
||||
Arc::new(AppState::new(host, identity))
|
||||
Arc::new(AppState::new(host, identity, test_stats()))
|
||||
}
|
||||
|
||||
// The mgmt API now always requires auth, so the router always has a token. A test that passes
|
||||
// `None` gets the default "test-secret" (and `send` auto-attaches the matching bearer); a test
|
||||
// that passes an explicit token exercises a mismatch (e.g. `bearer_token_is_enforced`).
|
||||
fn test_app(state: Arc<AppState>, token: Option<&str>) -> Router {
|
||||
let stats = state.stats.clone();
|
||||
app(
|
||||
state,
|
||||
Some(token.unwrap_or("test-secret").to_string()),
|
||||
DEFAULT_PORT,
|
||||
None,
|
||||
stats,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1261,11 +1465,13 @@ mod tests {
|
||||
) -> Router {
|
||||
// Auth required always; the paired-cert tests inject a fingerprint (cert branch wins), the
|
||||
// rest authenticate via the `send`-attached default bearer.
|
||||
let stats = state.stats.clone();
|
||||
app(
|
||||
state,
|
||||
Some("test-secret".to_string()),
|
||||
DEFAULT_PORT,
|
||||
Some(np),
|
||||
stats,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1580,7 +1786,9 @@ mod tests {
|
||||
bind: "127.0.0.1:0".parse().unwrap(),
|
||||
token: Some(" ".into()),
|
||||
};
|
||||
let err = run(test_state(), opts, None).await.unwrap_err();
|
||||
let err = run(test_state(), opts, None, test_stats())
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(err.to_string().contains("no token"), "{err}");
|
||||
}
|
||||
|
||||
|
||||
@@ -79,6 +79,9 @@ pub struct Punktfunk1Options {
|
||||
|
||||
/// The native (punktfunk/1) trust store + on-demand arming PIN, shared with the management API.
|
||||
use crate::native_pairing::NativePairing;
|
||||
/// The shared streaming-stats recorder (web-console capture/graph), shared with the management API
|
||||
/// and the GameStream loop; threaded into each session's `SessionContext`.
|
||||
use crate::stats_recorder::StatsRecorder;
|
||||
|
||||
/// Minimum spacing between accepted pairing ceremonies (bounds online PIN guessing — with
|
||||
/// SPAKE2 an attacker already gets only one guess per ceremony; this caps the rate).
|
||||
@@ -114,7 +117,11 @@ pub fn run(opts: Punktfunk1Options) -> Result<()> {
|
||||
opts.pairing_pin.clone(),
|
||||
opts.allow_pairing || opts.require_pairing,
|
||||
)?);
|
||||
rt.block_on(serve(opts, np))
|
||||
// Standalone `punktfunk1-host` has no mgmt API to arm capture, so this recorder stays disarmed
|
||||
// (harmless — the loops' `is_armed()` gate is always false). The unified `serve` shares one
|
||||
// recorder across mgmt + both streaming paths instead.
|
||||
let stats = StatsRecorder::new(crate::stats_recorder::default_dir());
|
||||
rt.block_on(serve(opts, np, stats))
|
||||
}
|
||||
|
||||
fn fingerprint_hex(fp: &[u8; 32]) -> String {
|
||||
@@ -157,7 +164,11 @@ pub(crate) fn native_serve_opts(cfg: &NativeServe) -> Punktfunk1Options {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn serve(opts: Punktfunk1Options, np: Arc<NativePairing>) -> Result<()> {
|
||||
pub(crate) async fn serve(
|
||||
opts: Punktfunk1Options,
|
||||
np: Arc<NativePairing>,
|
||||
stats: Arc<StatsRecorder>,
|
||||
) -> Result<()> {
|
||||
let identity = crate::gamestream::cert::ServerIdentity::load_or_create()
|
||||
.context("load host identity (~/.config/punktfunk)")?;
|
||||
let fingerprint = endpoint::fingerprint_of_pem(&identity.cert_pem)
|
||||
@@ -276,6 +287,7 @@ pub(crate) async fn serve(opts: Punktfunk1Options, np: Arc<NativePairing>) -> Re
|
||||
let audio_cap = audio_cap.clone();
|
||||
let np = np.clone();
|
||||
let last_pairing = last_pairing.clone();
|
||||
let stats = stats.clone();
|
||||
let inj_tx = injector.sender();
|
||||
let mic_tx = mic_service.sender();
|
||||
sessions.spawn(async move {
|
||||
@@ -289,6 +301,7 @@ pub(crate) async fn serve(opts: Punktfunk1Options, np: Arc<NativePairing>) -> Re
|
||||
&fingerprint,
|
||||
&np,
|
||||
&last_pairing,
|
||||
stats,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -479,6 +492,7 @@ async fn serve_session(
|
||||
host_fp: &[u8; 32],
|
||||
np: &NativePairing,
|
||||
last_pairing: &std::sync::Mutex<Option<std::time::Instant>>,
|
||||
stats: Arc<StatsRecorder>,
|
||||
) -> Result<()> {
|
||||
let peer = conn.remote_address();
|
||||
|
||||
@@ -935,6 +949,12 @@ async fn serve_session(
|
||||
let stop_stream = stop.clone();
|
||||
let fec_target_dp = fec_target.clone(); // data-plane handle to the adaptive-FEC target
|
||||
let conn_stream = conn.clone(); // for sending the source's real HDR metadata (0xCE) mid-stream
|
||||
let stats_dp = stats; // data-plane handle to the shared stats recorder
|
||||
// Short label for web-console stats captures: the client's cert-fingerprint prefix, else its
|
||||
// peer IP (no fingerprint = anonymous TOFU/--open client).
|
||||
let client_label = endpoint::peer_fingerprint(&conn)
|
||||
.map(|fp| fingerprint_hex(&fp)[..12].to_string())
|
||||
.unwrap_or_else(|| conn.remote_address().ip().to_string());
|
||||
let result: Result<()> = async {
|
||||
tokio::task::spawn_blocking(move || -> Result<()> {
|
||||
// Wait briefly for the client to hole-punch our data port, then stream to its OBSERVED
|
||||
@@ -989,6 +1009,8 @@ async fn serve_session(
|
||||
probe_result_tx,
|
||||
fec_target: fec_target_dp,
|
||||
conn: conn_stream,
|
||||
stats: stats_dp,
|
||||
client_label,
|
||||
#[cfg(target_os = "windows")]
|
||||
launch: launch_for_dp,
|
||||
})
|
||||
@@ -1947,6 +1969,21 @@ struct FrameMsg {
|
||||
deadline: std::time::Instant,
|
||||
/// capture→encoded latency (µs), measured on the encode thread, carried for the perf histogram.
|
||||
encode_us: u32,
|
||||
/// Per-stage µs splits, measured on the capture/encode thread (0 when neither `PUNKTFUNK_PERF`
|
||||
/// nor a stats capture is armed). The send thread accumulates them for the web-console sample:
|
||||
/// `cap_us` = `try_latest` (ring read + colour convert), `submit_us` = NVENC `encode_picture`
|
||||
/// launch, `wait_us` = `lock_bitstream` (the scheduling wait + ASIC encode = the "encode" stage).
|
||||
cap_us: u32,
|
||||
submit_us: u32,
|
||||
wait_us: u32,
|
||||
/// This frame is a re-encoded hold (the source had no fresh frame): a source-starvation signal
|
||||
/// the send thread folds into `repeat_fps`.
|
||||
repeat: bool,
|
||||
/// Whether the per-stage splits (`cap_us`/`submit_us`/`wait_us`) were actually measured at
|
||||
/// capture time (`perf` was on or a stats capture was armed). The send thread trusts this
|
||||
/// instead of re-reading `is_armed()`, so a capture that arms while frames are already in flight
|
||||
/// doesn't fold their zeroed splits into the first window's percentiles.
|
||||
was_measured: bool,
|
||||
}
|
||||
|
||||
/// The dedicated send thread: it owns the whole [`Session`] (so no socket clone or shared stats are
|
||||
@@ -2020,6 +2057,19 @@ pub(crate) fn boost_thread_priority(critical: bool) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Everything the send thread needs to emit web-console stats samples at its 2 s aggregation
|
||||
/// boundary: the shared recorder (whose `is_armed()` gates emission) plus the negotiated
|
||||
/// mode/codec/client to seed the capture's `CaptureMeta` on the first armed registration.
|
||||
struct SendStats {
|
||||
rec: Arc<StatsRecorder>,
|
||||
width: u32,
|
||||
height: u32,
|
||||
fps: u32,
|
||||
codec: &'static str,
|
||||
client: String,
|
||||
bitrate_kbps: u32,
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn send_loop(
|
||||
mut session: Session,
|
||||
@@ -2030,6 +2080,7 @@ fn send_loop(
|
||||
perf: bool,
|
||||
burst_cap: usize,
|
||||
fec_target: Arc<AtomicU8>,
|
||||
stats: SendStats,
|
||||
) {
|
||||
boost_thread_priority(false); // transmit thread: above-normal (Apollo's encoder-thread level)
|
||||
let mut last_perf = std::time::Instant::now();
|
||||
@@ -2038,6 +2089,16 @@ fn send_loop(
|
||||
let mut encode_us: Vec<u32> = Vec::new();
|
||||
let mut pace_us: Vec<u32> = Vec::new();
|
||||
let (mut paced_frames, mut immediate_frames) = (0u64, 0u64);
|
||||
// Web-console stats accumulation (active when `perf` OR the recorder is armed): the per-stage
|
||||
// split carried on each FrameMsg, the new-vs-repeat frame split, the cached registration id, and
|
||||
// the previous window's loss snapshot for delta computation.
|
||||
let mut sid: Option<u32> = None;
|
||||
let (mut cap_v, mut submit_v, mut wait_v): (Vec<u32>, Vec<u32>, Vec<u32>) =
|
||||
(Vec::new(), Vec::new(), Vec::new());
|
||||
let (mut new_frames, mut repeat_frames) = (0u64, 0u64);
|
||||
let mut last_frames_dropped = 0u64;
|
||||
let mut last_packets_dropped = 0u64;
|
||||
let mut last_fec_recovered = 0u64;
|
||||
loop {
|
||||
if stop.load(Ordering::SeqCst) {
|
||||
break;
|
||||
@@ -2058,9 +2119,24 @@ fn send_loop(
|
||||
burst_cap,
|
||||
) {
|
||||
Ok(stat) => {
|
||||
if perf {
|
||||
if perf || stats.rec.is_armed() {
|
||||
// `encode_us`/`pace_us`/fps are valid for every frame (always measured),
|
||||
// including the Windows relay + tail-drain frames. The cap/submit/wait splits
|
||||
// are only real when the frame was measured at capture time — a frame captured
|
||||
// before this capture armed carries zeroed splits, so skip those (an empty
|
||||
// window → `percentile()` returns 0) rather than pull the percentiles down.
|
||||
encode_us.push(msg.encode_us);
|
||||
pace_us.push(stat.spread_us);
|
||||
if msg.was_measured {
|
||||
cap_v.push(msg.cap_us);
|
||||
submit_v.push(msg.submit_us);
|
||||
wait_v.push(msg.wait_us);
|
||||
}
|
||||
if msg.repeat {
|
||||
repeat_frames += 1;
|
||||
} else {
|
||||
new_frames += 1;
|
||||
}
|
||||
if stat.paced {
|
||||
paced_frames += 1;
|
||||
} else {
|
||||
@@ -2076,31 +2152,91 @@ fn send_loop(
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, // encode thread done
|
||||
}
|
||||
if perf && last_perf.elapsed() >= std::time::Duration::from_secs(2) {
|
||||
if last_perf.elapsed() >= std::time::Duration::from_secs(2) {
|
||||
let s = session.stats();
|
||||
let secs = last_perf.elapsed().as_secs_f64();
|
||||
// Attempted (sealed) transmit rate; `send_dropped` is what didn't reach the wire.
|
||||
let tx_mbps = (s.bytes_sent - last_bytes) as f64 * 8.0 / secs / 1_000_000.0;
|
||||
tracing::info!(
|
||||
tx_mbps = format!("{tx_mbps:.0}"),
|
||||
send_dropped = s.packets_send_dropped - last_send_dropped,
|
||||
send_dropped_total = s.packets_send_dropped,
|
||||
encode_us_p50 = percentile(&mut encode_us, 0.50),
|
||||
encode_us_p99 = percentile(&mut encode_us, 0.99),
|
||||
pace_us_p50 = percentile(&mut pace_us, 0.50),
|
||||
pace_us_p99 = percentile(&mut pace_us, 0.99),
|
||||
pace_us_max = pace_us.last().copied().unwrap_or(0),
|
||||
immediate_frames,
|
||||
paced_frames,
|
||||
"perf"
|
||||
);
|
||||
if perf {
|
||||
tracing::info!(
|
||||
tx_mbps = format!("{tx_mbps:.0}"),
|
||||
send_dropped = s.packets_send_dropped - last_send_dropped,
|
||||
send_dropped_total = s.packets_send_dropped,
|
||||
encode_us_p50 = percentile(&mut encode_us, 0.50),
|
||||
encode_us_p99 = percentile(&mut encode_us, 0.99),
|
||||
pace_us_p50 = percentile(&mut pace_us, 0.50),
|
||||
pace_us_p99 = percentile(&mut pace_us, 0.99),
|
||||
pace_us_max = pace_us.last().copied().unwrap_or(0),
|
||||
immediate_frames,
|
||||
paced_frames,
|
||||
"perf"
|
||||
);
|
||||
}
|
||||
// Web-console capture: this thread owns `session.stats()`, so it emits the COMPLETE
|
||||
// sample — the cap/submit/encode split carried over from the capture thread plus this
|
||||
// window's pacing/goodput/loss. Loss fields are deltas vs the previous window's snapshot.
|
||||
if stats.rec.is_armed() {
|
||||
let session_id = *sid.get_or_insert_with(|| {
|
||||
stats.rec.register_session(
|
||||
"native",
|
||||
stats.width,
|
||||
stats.height,
|
||||
stats.fps,
|
||||
stats.codec,
|
||||
&stats.client,
|
||||
)
|
||||
});
|
||||
let sample = crate::stats_recorder::StatsSample {
|
||||
t_ms: 0, // stamped by push_sample from the capture's monotonic start
|
||||
session_id,
|
||||
stages: vec![
|
||||
crate::stats_recorder::StageTiming {
|
||||
name: "capture".into(),
|
||||
p50_us: percentile(&mut cap_v, 0.50) as f32,
|
||||
p99_us: percentile(&mut cap_v, 0.99) as f32,
|
||||
},
|
||||
crate::stats_recorder::StageTiming {
|
||||
name: "submit".into(),
|
||||
p50_us: percentile(&mut submit_v, 0.50) as f32,
|
||||
p99_us: percentile(&mut submit_v, 0.99) as f32,
|
||||
},
|
||||
crate::stats_recorder::StageTiming {
|
||||
name: "encode".into(),
|
||||
p50_us: percentile(&mut wait_v, 0.50) as f32,
|
||||
p99_us: percentile(&mut wait_v, 0.99) as f32,
|
||||
},
|
||||
crate::stats_recorder::StageTiming {
|
||||
name: "send".into(),
|
||||
p50_us: percentile(&mut pace_us, 0.50) as f32,
|
||||
p99_us: percentile(&mut pace_us, 0.99) as f32,
|
||||
},
|
||||
],
|
||||
fps: (new_frames as f64 / secs) as f32,
|
||||
repeat_fps: (repeat_frames as f64 / secs) as f32,
|
||||
mbps: tx_mbps as f32,
|
||||
bitrate_kbps: stats.bitrate_kbps,
|
||||
frames_dropped: s.frames_dropped.saturating_sub(last_frames_dropped) as u32,
|
||||
packets_dropped: s.packets_dropped.saturating_sub(last_packets_dropped) as u32,
|
||||
send_dropped: s.packets_send_dropped.saturating_sub(last_send_dropped) as u32,
|
||||
fec_recovered: s.fec_recovered_shards.saturating_sub(last_fec_recovered) as u32,
|
||||
};
|
||||
stats.rec.push_sample(session_id, sample);
|
||||
}
|
||||
last_perf = std::time::Instant::now();
|
||||
last_bytes = s.bytes_sent;
|
||||
last_send_dropped = s.packets_send_dropped;
|
||||
last_frames_dropped = s.frames_dropped;
|
||||
last_packets_dropped = s.packets_dropped;
|
||||
last_fec_recovered = s.fec_recovered_shards;
|
||||
encode_us.clear();
|
||||
pace_us.clear();
|
||||
cap_v.clear();
|
||||
submit_v.clear();
|
||||
wait_v.clear();
|
||||
paced_frames = 0;
|
||||
immediate_frames = 0;
|
||||
new_frames = 0;
|
||||
repeat_frames = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2201,6 +2337,13 @@ struct SessionContext {
|
||||
fec_target: Arc<AtomicU8>,
|
||||
/// The QUIC control connection (carries host→client 0xCE source-HDR metadata mid-stream).
|
||||
conn: quinn::Connection,
|
||||
/// Shared streaming-stats recorder. The capture loop reads `is_armed()` per frame to decide
|
||||
/// whether to measure the per-stage split; the send thread builds + pushes the aggregated
|
||||
/// `StatsSample` at its 2 s boundary.
|
||||
stats: Arc<StatsRecorder>,
|
||||
/// Short client label (cert-fingerprint prefix, else peer IP) seeded into the capture meta on
|
||||
/// the first armed stats registration.
|
||||
client_label: String,
|
||||
/// Windows: the store-qualified library id to launch into the interactive user session once
|
||||
/// capture is live (no gamescope nesting on Windows). `None` = no launch requested. Linux uses the
|
||||
/// gamescope `PUNKTFUNK_GAMESCOPE_APP` path resolved at handshake, so this field is Windows-only.
|
||||
@@ -2242,6 +2385,8 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
probe_result_tx,
|
||||
fec_target,
|
||||
conn,
|
||||
stats,
|
||||
client_label,
|
||||
#[cfg(target_os = "windows")]
|
||||
launch,
|
||||
} = ctx;
|
||||
@@ -2310,6 +2455,17 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
// The bounded channel applies backpressure (the encode thread blocks if the send falls behind,
|
||||
// so frames slow down rather than a dropped frame freezing the infinite-GOP stream).
|
||||
let (frame_tx, frame_rx) = std::sync::mpsc::sync_channel::<FrameMsg>(3);
|
||||
// The send thread emits the web-console stats sample (it owns `session.stats()`); clone the
|
||||
// recorder so the capture loop keeps its own handle for the per-frame `is_armed()` gate.
|
||||
let send_stats = SendStats {
|
||||
rec: stats.clone(),
|
||||
width: mode.width,
|
||||
height: mode.height,
|
||||
fps: mode.refresh_hz,
|
||||
codec: "hevc",
|
||||
client: client_label,
|
||||
bitrate_kbps,
|
||||
};
|
||||
let send_thread = std::thread::Builder::new()
|
||||
.name("punktfunk-send".into())
|
||||
.spawn({
|
||||
@@ -2324,6 +2480,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
perf,
|
||||
burst_cap,
|
||||
fec_target,
|
||||
send_stats,
|
||||
)
|
||||
}
|
||||
})
|
||||
@@ -2480,18 +2637,31 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
tracing::debug!("forcing keyframe (client decode recovery)");
|
||||
enc.request_keyframe();
|
||||
}
|
||||
// Measure the per-stage split when `PUNKTFUNK_PERF` is set OR a web-console stats capture is
|
||||
// armed (a cheap Relaxed atomic, re-read each frame). The values feed the existing perf log
|
||||
// unchanged and ride each FrameMsg to the send thread, which builds the aggregated sample.
|
||||
let measure = perf || stats.is_armed();
|
||||
let t_cap = std::time::Instant::now();
|
||||
let cap_result = capturer.try_latest();
|
||||
let cap_us = if measure {
|
||||
t_cap.elapsed().as_micros() as u32
|
||||
} else {
|
||||
0
|
||||
};
|
||||
if perf {
|
||||
st_cap.push(t_cap.elapsed().as_micros() as u32);
|
||||
st_cap.push(cap_us);
|
||||
}
|
||||
let mut repeat = false;
|
||||
match cap_result {
|
||||
Ok(Some(f)) => {
|
||||
frame = f;
|
||||
diag_new += 1;
|
||||
capture_rebuilds = 0; // a delivered frame clears the consecutive-loss counter
|
||||
}
|
||||
Ok(None) => diag_repeat += 1, // no new frame (static desktop / mid-rebuild) — repeat the last
|
||||
Ok(None) => {
|
||||
diag_repeat += 1; // no new frame (static desktop / mid-rebuild) — repeat the last
|
||||
repeat = true;
|
||||
}
|
||||
// The capture source died (PipeWire/compositor thread ended, virtual output gone). Rather
|
||||
// than tear the whole session down — the client has no reconnect path and would have to
|
||||
// cold-restart the handshake — rebuild the pipeline IN PLACE at the current mode, exactly
|
||||
@@ -2558,8 +2728,13 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
let capture_ns = now_ns();
|
||||
let t_submit = std::time::Instant::now();
|
||||
enc.submit(&frame).context("encoder submit")?;
|
||||
let submit_us = if measure {
|
||||
t_submit.elapsed().as_micros() as u32
|
||||
} else {
|
||||
0
|
||||
};
|
||||
if perf {
|
||||
st_submit.push(t_submit.elapsed().as_micros() as u32);
|
||||
st_submit.push(submit_us);
|
||||
}
|
||||
// This frame's pacing deadline (the next frame's due time); the send thread spreads a big frame
|
||||
// up to here. Each in-flight frame carries its own (capture_ns, deadline) for when it's polled.
|
||||
@@ -2573,8 +2748,13 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
while inflight.len() >= depth {
|
||||
let t_wait = std::time::Instant::now();
|
||||
let polled = enc.poll().context("encoder poll")?;
|
||||
let wait_us = if measure {
|
||||
t_wait.elapsed().as_micros() as u32
|
||||
} else {
|
||||
0
|
||||
};
|
||||
if perf {
|
||||
st_wait.push(t_wait.elapsed().as_micros() as u32);
|
||||
st_wait.push(wait_us);
|
||||
}
|
||||
let au = match polled {
|
||||
Some(au) => au,
|
||||
@@ -2602,6 +2782,11 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
flags,
|
||||
deadline,
|
||||
encode_us,
|
||||
cap_us,
|
||||
submit_us,
|
||||
wait_us,
|
||||
repeat,
|
||||
was_measured: measure,
|
||||
};
|
||||
// Hand to the send thread; this blocks (backpressure) if it's behind. An Err means it
|
||||
// exited (send failure / stop) — end the encode loop too.
|
||||
@@ -2629,12 +2814,19 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> {
|
||||
FLAG_PIC as u32
|
||||
};
|
||||
let encode_us = (now_ns().saturating_sub(cap_ns) / 1000) as u32;
|
||||
// End-of-stream tail drain: the per-stage split isn't measured here (the capture loop has
|
||||
// exited), so leave it zero — these last few frames are negligible for the aggregates.
|
||||
let msg = FrameMsg {
|
||||
data: au.data,
|
||||
capture_ns: cap_ns,
|
||||
flags,
|
||||
deadline,
|
||||
encode_us,
|
||||
cap_us: 0,
|
||||
submit_us: 0,
|
||||
wait_us: 0,
|
||||
repeat: false,
|
||||
was_measured: false,
|
||||
};
|
||||
if frame_tx.send(msg).is_err() {
|
||||
break;
|
||||
@@ -2681,6 +2873,8 @@ fn virtual_stream_relay(ctx: SessionContext) -> Result<()> {
|
||||
probe_result_tx,
|
||||
fec_target,
|
||||
conn: _conn,
|
||||
stats,
|
||||
client_label,
|
||||
launch,
|
||||
} = ctx;
|
||||
tracing::info!(
|
||||
@@ -2815,7 +3009,18 @@ fn virtual_stream_relay(ctx: SessionContext) -> Result<()> {
|
||||
* 1024;
|
||||
|
||||
// Same encode|send split as the single-process path: this thread relays AUs, a dedicated send
|
||||
// thread owns the Session and does FEC+seal+paced-send.
|
||||
// thread owns the Session and does FEC+seal+paced-send. The relay encodes in the helper process,
|
||||
// so this path's FrameMsgs carry no cap/submit/encode split (those stages stay 0 in the sample);
|
||||
// the send thread still emits fps/goodput/pacing/loss from `session.stats()`.
|
||||
let send_stats = SendStats {
|
||||
rec: stats,
|
||||
width: mode.width,
|
||||
height: mode.height,
|
||||
fps: effective_hz,
|
||||
codec: "hevc",
|
||||
client: client_label,
|
||||
bitrate_kbps,
|
||||
};
|
||||
let (frame_tx, frame_rx) = std::sync::mpsc::sync_channel::<FrameMsg>(3);
|
||||
let send_thread = std::thread::Builder::new()
|
||||
.name("punktfunk-send".into())
|
||||
@@ -2831,6 +3036,7 @@ fn virtual_stream_relay(ctx: SessionContext) -> Result<()> {
|
||||
perf,
|
||||
burst_cap,
|
||||
fec_target,
|
||||
send_stats,
|
||||
)
|
||||
}
|
||||
})
|
||||
@@ -2893,6 +3099,11 @@ fn virtual_stream_relay(ctx: SessionContext) -> Result<()> {
|
||||
flags,
|
||||
deadline: std::time::Instant::now() + interval,
|
||||
encode_us,
|
||||
cap_us: 0,
|
||||
submit_us: 0,
|
||||
wait_us: 0,
|
||||
repeat: false,
|
||||
was_measured: false,
|
||||
};
|
||||
let ok = frame_tx.send(msg).is_ok();
|
||||
if ok {
|
||||
@@ -3645,6 +3856,9 @@ mod tests {
|
||||
paired_store: None, // unused: the shared `np` IS the store handle
|
||||
},
|
||||
np_host,
|
||||
StatsRecorder::new(
|
||||
std::env::temp_dir().join(format!("pf-approval-stats-{}", std::process::id())),
|
||||
),
|
||||
))
|
||||
});
|
||||
std::thread::sleep(std::time::Duration::from_millis(500));
|
||||
|
||||
@@ -0,0 +1,553 @@
|
||||
//! Shared streaming-stats recorder (`design/stats-capture-plan.md` §1). One
|
||||
//! [`StatsRecorder`] handle is created once in the unified host entry
|
||||
//! (`gamestream::serve`) alongside [`crate::native_pairing::NativePairing`], and shared with
|
||||
//! **both** the management API ([`crate::mgmt`]) and the streaming loops (threaded through
|
||||
//! [`crate::punktfunk1::serve`] → `SessionContext` and into the GameStream encode loop). The
|
||||
//! operator arms a capture from the web console, plays a session, stops, and reviews the
|
||||
//! captured time-series as graphs; captures are saved to disk and survive a host restart.
|
||||
//!
|
||||
//! Hot-path discipline: [`StatsRecorder::is_armed`] is a cheap `Relaxed` atomic load (re-read
|
||||
//! per frame); sample construction happens only at the loops' existing ~2 s / ~1 s aggregation
|
||||
//! boundary, never per frame. Memory is bounded ([`MAX_SAMPLES`]); the on-disk write is atomic
|
||||
//! (temp + rename); and capture ids are path-traversal-safe.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Instant;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
/// Cap on samples kept in one capture: ≈ 3 h at one sample / 2 s. On overflow we stop appending
|
||||
/// (keeping the oldest — a saved recording must keep its start), never dropping the front and never
|
||||
/// growing unbounded.
|
||||
const MAX_SAMPLES: usize = 5400;
|
||||
|
||||
/// One pipeline stage's latency in an aggregation window (microseconds).
|
||||
#[derive(Serialize, Deserialize, ToSchema, Clone, Debug)]
|
||||
pub struct StageTiming {
|
||||
/// `"capture" | "submit" | "encode" | "packetize" | "send"` (path-dependent).
|
||||
pub name: String,
|
||||
pub p50_us: f32,
|
||||
pub p99_us: f32,
|
||||
}
|
||||
|
||||
/// One aggregated sample (~ every 2 s native, ~ every 1 s GameStream).
|
||||
#[derive(Serialize, Deserialize, ToSchema, Clone, Debug)]
|
||||
pub struct StatsSample {
|
||||
/// Milliseconds since capture start (monotonic; stamped by [`StatsRecorder::push_sample`]).
|
||||
pub t_ms: u64,
|
||||
/// Disambiguates concurrent sessions (usually constant).
|
||||
pub session_id: u32,
|
||||
/// Ordered pipeline stages for this path.
|
||||
pub stages: Vec<StageTiming>,
|
||||
/// Genuine NEW frames/s from the source.
|
||||
pub fps: f32,
|
||||
/// Re-encoded holds/s (source-starvation indicator).
|
||||
pub repeat_fps: f32,
|
||||
/// Transmit goodput (Mb/s).
|
||||
pub mbps: f32,
|
||||
/// Configured target bitrate.
|
||||
pub bitrate_kbps: u32,
|
||||
/// Frames dropped this window (delta).
|
||||
pub frames_dropped: u32,
|
||||
/// Packets dropped this window (receiver-side / reassembler, where known).
|
||||
pub packets_dropped: u32,
|
||||
/// Host send-buffer overflow / EAGAIN this window (delta).
|
||||
pub send_dropped: u32,
|
||||
/// FEC shards recovered this window (delta).
|
||||
pub fec_recovered: u32,
|
||||
}
|
||||
|
||||
/// Capture summary — the filename stem plus the negotiated mode/codec/client. Stored at the head
|
||||
/// of each on-disk recording and listed standalone (without the sample body) by
|
||||
/// [`StatsRecorder::list`].
|
||||
#[derive(Serialize, Deserialize, ToSchema, Clone, Debug)]
|
||||
pub struct CaptureMeta {
|
||||
/// e.g. `"2026-06-26T20-14-03Z_5120x1440"` — also the filename stem.
|
||||
pub id: String,
|
||||
pub started_unix_ms: u64,
|
||||
pub duration_ms: u64,
|
||||
/// `"native" | "gamestream"`.
|
||||
pub kind: String,
|
||||
pub width: u32,
|
||||
pub height: u32,
|
||||
pub fps: u32,
|
||||
/// `"h264" | "hevc" | "av1"`.
|
||||
pub codec: String,
|
||||
/// Short label / fingerprint prefix, or `""` if unknown.
|
||||
pub client: String,
|
||||
pub sample_count: u32,
|
||||
}
|
||||
|
||||
/// A full capture: summary + the sample time-series. The wire + on-disk shape.
|
||||
#[derive(Serialize, Deserialize, ToSchema, Clone, Debug)]
|
||||
pub struct Capture {
|
||||
pub meta: CaptureMeta,
|
||||
pub samples: Vec<StatsSample>,
|
||||
}
|
||||
|
||||
/// Snapshot of the in-progress capture for the management API.
|
||||
#[derive(Serialize, Deserialize, ToSchema, Clone, Debug)]
|
||||
pub struct StatsStatus {
|
||||
/// Capture currently running.
|
||||
pub armed: bool,
|
||||
/// Samples in the in-progress capture.
|
||||
pub sample_count: u32,
|
||||
/// Unix start time of the in-progress capture (`0` if idle).
|
||||
pub started_unix_ms: u64,
|
||||
/// Path of the in-progress capture (`""` if idle).
|
||||
pub kind: String,
|
||||
}
|
||||
|
||||
/// Mode/codec/client seeded on the first [`StatsRecorder::register_session`] of a capture.
|
||||
#[derive(Clone)]
|
||||
struct MetaSeed {
|
||||
kind: String,
|
||||
width: u32,
|
||||
height: u32,
|
||||
fps: u32,
|
||||
codec: String,
|
||||
client: String,
|
||||
}
|
||||
|
||||
/// The in-progress capture (present iff armed).
|
||||
struct Live {
|
||||
/// Monotonic clock origin for sample `t_ms`.
|
||||
started: Instant,
|
||||
started_unix_ms: u64,
|
||||
/// Seeded once, on the first session registration.
|
||||
meta: Option<MetaSeed>,
|
||||
samples: Vec<StatsSample>,
|
||||
/// Set once the sample cap was hit (further samples dropped). Read so it isn't dead.
|
||||
truncated: bool,
|
||||
}
|
||||
|
||||
/// Shared streaming-stats recorder: an arm/disarm flag (the hot-path gate), the in-progress
|
||||
/// capture, and the on-disk capture directory.
|
||||
pub struct StatsRecorder {
|
||||
dir: PathBuf,
|
||||
/// The hot-path gate — a `Relaxed` load per frame; never blocks the frame thread.
|
||||
armed: AtomicBool,
|
||||
/// The in-progress capture. Locks recover a poisoned guard (`unwrap_or_else(|e| e.into_inner())`,
|
||||
/// as in `vdisplay::gamescope`) rather than `unwrap()`: a panic somewhere must never make stats
|
||||
/// recording crash an otherwise-healthy stream. The critical sections only push/clone/format, so
|
||||
/// poisoning is near-impossible anyway — this is belt-and-suspenders.
|
||||
live: Mutex<Option<Live>>,
|
||||
next_sid: AtomicU32,
|
||||
}
|
||||
|
||||
/// The default captures directory: `~/.config/punktfunk/captures/` (next to `cert.pem`),
|
||||
/// resolved via the same config-dir helper the rest of the host uses.
|
||||
pub fn default_dir() -> PathBuf {
|
||||
crate::gamestream::config_dir().join("captures")
|
||||
}
|
||||
|
||||
/// `id` charset gate, matching `^[A-Za-z0-9._-]+$` — the exact charset `capture_id` emits (which
|
||||
/// deliberately uses dashes, not colons, so the stem is a valid Windows filename). We additionally
|
||||
/// reject `.`/`..` so a path-component sneaks no parent reference even though the charset would allow
|
||||
/// bare dots. The charset already excludes `/` and `\`, so `dir.join("<id>.json")` is always a single
|
||||
/// child of `dir`. Defense in depth — the endpoints are bearer-authed.
|
||||
fn valid_id(id: &str) -> bool {
|
||||
!id.is_empty()
|
||||
&& id != "."
|
||||
&& id != ".."
|
||||
&& id
|
||||
.bytes()
|
||||
.all(|b| b.is_ascii_alphanumeric() || matches!(b, b'.' | b'_' | b'-'))
|
||||
}
|
||||
|
||||
fn unix_ms_now() -> u64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.map(|d| d.as_millis() as u64)
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// A human-readable, filesystem-safe capture id from the start time + mode, e.g.
|
||||
/// `2026-06-26T20-14-03Z_5120x1440`. Dashes (not colons) in the time so it's a valid Windows
|
||||
/// filename; matches [`valid_id`].
|
||||
fn capture_id(unix_ms: u64, width: u32, height: u32) -> String {
|
||||
let secs = (unix_ms / 1000) as i64;
|
||||
let days = secs.div_euclid(86_400);
|
||||
let tod = secs.rem_euclid(86_400);
|
||||
let (y, mo, d) = civil_from_days(days);
|
||||
let (h, mi, s) = (tod / 3600, (tod % 3600) / 60, tod % 60);
|
||||
format!("{y:04}-{mo:02}-{d:02}T{h:02}-{mi:02}-{s:02}Z_{width}x{height}")
|
||||
}
|
||||
|
||||
/// Civil (Y, M, D) from a count of days since the Unix epoch (Howard Hinnant's `civil_from_days`).
|
||||
fn civil_from_days(z: i64) -> (i64, u32, u32) {
|
||||
let z = z + 719_468;
|
||||
let era = if z >= 0 { z } else { z - 146_096 }.div_euclid(146_097);
|
||||
let doe = z - era * 146_097; // [0, 146096]
|
||||
let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365; // [0, 399]
|
||||
let y = yoe + era * 400;
|
||||
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // [0, 365]
|
||||
let mp = (5 * doy + 2) / 153; // [0, 11]
|
||||
let d = (doy - (153 * mp + 2) / 5 + 1) as u32; // [1, 31]
|
||||
let m = if mp < 10 { mp + 3 } else { mp - 9 }; // [1, 12]
|
||||
(if m <= 2 { y + 1 } else { y }, m as u32, d)
|
||||
}
|
||||
|
||||
impl StatsRecorder {
|
||||
/// Create the recorder, creating `dir` (owner-private, best-effort) if missing.
|
||||
pub fn new(dir: PathBuf) -> Arc<Self> {
|
||||
if let Err(e) = crate::gamestream::create_private_dir(&dir) {
|
||||
tracing::warn!(dir = %dir.display(), error = %e, "could not create stats captures dir");
|
||||
}
|
||||
Arc::new(StatsRecorder {
|
||||
dir,
|
||||
armed: AtomicBool::new(false),
|
||||
live: Mutex::new(None),
|
||||
next_sid: AtomicU32::new(0),
|
||||
})
|
||||
}
|
||||
|
||||
/// The hot-path gate: cheap `Relaxed` load, called per frame to decide whether to measure.
|
||||
pub fn is_armed(&self) -> bool {
|
||||
self.armed.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Arm a new capture. No-op if already armed (returns the current status).
|
||||
pub fn start(&self) -> StatsStatus {
|
||||
let mut guard = self.live.lock().unwrap_or_else(|e| e.into_inner());
|
||||
if guard.is_none() {
|
||||
*guard = Some(Live {
|
||||
started: Instant::now(),
|
||||
started_unix_ms: unix_ms_now(),
|
||||
meta: None,
|
||||
samples: Vec::new(),
|
||||
truncated: false,
|
||||
});
|
||||
// Publish AFTER the live capture exists, so a frame thread that observes `armed` always
|
||||
// finds a capture to push into.
|
||||
self.armed.store(true, Ordering::Relaxed);
|
||||
}
|
||||
status_of(guard.as_ref())
|
||||
}
|
||||
|
||||
/// A streaming loop announces itself when it first records while armed. Seeds the capture's
|
||||
/// `CaptureMeta` (kind/w/h/fps/codec/client) on the FIRST registration; returns a session id
|
||||
/// to stamp on the loop's samples.
|
||||
pub fn register_session(
|
||||
&self,
|
||||
kind: &'static str,
|
||||
w: u32,
|
||||
h: u32,
|
||||
fps: u32,
|
||||
codec: &str,
|
||||
client: &str,
|
||||
) -> u32 {
|
||||
let sid = self.next_sid.fetch_add(1, Ordering::Relaxed);
|
||||
let mut guard = self.live.lock().unwrap_or_else(|e| e.into_inner());
|
||||
if let Some(live) = guard.as_mut() {
|
||||
if live.meta.is_none() {
|
||||
live.meta = Some(MetaSeed {
|
||||
kind: kind.to_string(),
|
||||
width: w,
|
||||
height: h,
|
||||
fps,
|
||||
codec: codec.to_string(),
|
||||
client: client.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
sid
|
||||
}
|
||||
|
||||
/// Append one aggregated sample (called from the loops' existing ~2 s / ~1 s boundary). The
|
||||
/// `t_ms` is (re)stamped here from the capture's monotonic start, so callers may leave it `0`.
|
||||
/// Bounded at [`MAX_SAMPLES`]: on overflow we stop appending (oldest kept) and flag truncation.
|
||||
/// A no-op when nothing is armed (e.g. a `stop()` raced the frame boundary).
|
||||
pub fn push_sample(&self, session_id: u32, mut sample: StatsSample) {
|
||||
let mut guard = self.live.lock().unwrap_or_else(|e| e.into_inner());
|
||||
let Some(live) = guard.as_mut() else { return };
|
||||
if live.samples.len() >= MAX_SAMPLES {
|
||||
if !live.truncated {
|
||||
live.truncated = true;
|
||||
tracing::warn!(
|
||||
max = MAX_SAMPLES,
|
||||
"stats capture hit the sample cap — further samples dropped (oldest kept)"
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
sample.session_id = session_id;
|
||||
sample.t_ms = live.started.elapsed().as_millis() as u64;
|
||||
live.samples.push(sample);
|
||||
}
|
||||
|
||||
/// Disarm + finalize: write `<dir>/<id>.json` atomically (temp + rename) and return its meta.
|
||||
/// `Ok(None)` if nothing was recording.
|
||||
pub fn stop(&self) -> std::io::Result<Option<CaptureMeta>> {
|
||||
// Clear the hot-path gate first so frame threads stop building samples immediately.
|
||||
self.armed.store(false, Ordering::Relaxed);
|
||||
let Some(live) = self.live.lock().unwrap_or_else(|e| e.into_inner()).take() else {
|
||||
return Ok(None);
|
||||
};
|
||||
let meta = meta_of(&live);
|
||||
let capture = Capture {
|
||||
meta: meta.clone(),
|
||||
samples: live.samples,
|
||||
};
|
||||
let bytes = serde_json::to_vec(&capture).map_err(std::io::Error::other)?;
|
||||
// Atomic replace: write a sibling temp then rename, so a crash mid-write can't leave a half
|
||||
// file. The id is generated (always `valid_id`), so this only ever names a child of `dir`.
|
||||
let path = self.dir.join(format!("{}.json", meta.id));
|
||||
let tmp = self.dir.join(format!("{}.json.tmp", meta.id));
|
||||
std::fs::write(&tmp, &bytes)?;
|
||||
std::fs::rename(&tmp, &path)?;
|
||||
Ok(Some(meta))
|
||||
}
|
||||
|
||||
/// The in-progress capture status (idle = `armed: false`, zeroed fields).
|
||||
pub fn status(&self) -> StatsStatus {
|
||||
status_of(self.live.lock().unwrap_or_else(|e| e.into_inner()).as_ref())
|
||||
}
|
||||
|
||||
/// A clone of the in-progress capture for live graphing (`None` when idle).
|
||||
pub fn live_snapshot(&self) -> Option<Capture> {
|
||||
let guard = self.live.lock().unwrap_or_else(|e| e.into_inner());
|
||||
let live = guard.as_ref()?;
|
||||
Some(Capture {
|
||||
meta: meta_of(live),
|
||||
samples: live.samples.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// All saved recordings, newest first, parsing each file's `meta` head only (not the samples).
|
||||
pub fn list(&self) -> Vec<CaptureMeta> {
|
||||
/// Parse only the `meta` head — serde skips the (large) `samples` array.
|
||||
#[derive(Deserialize)]
|
||||
struct MetaOnly {
|
||||
meta: CaptureMeta,
|
||||
}
|
||||
let mut out: Vec<CaptureMeta> = Vec::new();
|
||||
let Ok(entries) = std::fs::read_dir(&self.dir) else {
|
||||
return out;
|
||||
};
|
||||
for entry in entries.flatten() {
|
||||
let path = entry.path();
|
||||
if path.extension().and_then(|e| e.to_str()) != Some("json") {
|
||||
continue;
|
||||
}
|
||||
if let Ok(bytes) = std::fs::read(&path) {
|
||||
if let Ok(parsed) = serde_json::from_slice::<MetaOnly>(&bytes) {
|
||||
out.push(parsed.meta);
|
||||
}
|
||||
}
|
||||
}
|
||||
out.sort_by_key(|m| std::cmp::Reverse(m.started_unix_ms));
|
||||
out
|
||||
}
|
||||
|
||||
/// Load a saved recording by id. Rejects a path-unsafe id (and a missing file) as `NotFound`.
|
||||
pub fn load(&self, id: &str) -> std::io::Result<Capture> {
|
||||
let path = self.recording_path(id)?;
|
||||
let bytes = std::fs::read(&path)?;
|
||||
serde_json::from_slice(&bytes)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
|
||||
}
|
||||
|
||||
/// Delete a saved recording by id. Rejects a path-unsafe id (and a missing file) as `NotFound`.
|
||||
pub fn delete(&self, id: &str) -> std::io::Result<()> {
|
||||
let path = self.recording_path(id)?;
|
||||
std::fs::remove_file(&path)
|
||||
}
|
||||
|
||||
/// Resolve `dir/<id>.json` after validating `id`. A rejected id is `NotFound` (defense in
|
||||
/// depth: never let an attacker-shaped id escape `dir`).
|
||||
fn recording_path(&self, id: &str) -> std::io::Result<PathBuf> {
|
||||
if !valid_id(id) {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::NotFound,
|
||||
"invalid recording id",
|
||||
));
|
||||
}
|
||||
Ok(self.dir.join(format!("{id}.json")))
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the live `StatsStatus` from the optional in-progress capture.
|
||||
fn status_of(live: Option<&Live>) -> StatsStatus {
|
||||
match live {
|
||||
Some(l) => StatsStatus {
|
||||
armed: true,
|
||||
sample_count: l.samples.len() as u32,
|
||||
started_unix_ms: l.started_unix_ms,
|
||||
kind: l.meta.as_ref().map(|m| m.kind.clone()).unwrap_or_default(),
|
||||
},
|
||||
None => StatsStatus {
|
||||
armed: false,
|
||||
sample_count: 0,
|
||||
started_unix_ms: 0,
|
||||
kind: String::new(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Compute the `CaptureMeta` for an in-progress or finalizing capture (id derived from the start
|
||||
/// time + negotiated mode; duration from the monotonic start).
|
||||
fn meta_of(live: &Live) -> CaptureMeta {
|
||||
let (kind, width, height, fps, codec, client) = match &live.meta {
|
||||
Some(m) => (
|
||||
m.kind.clone(),
|
||||
m.width,
|
||||
m.height,
|
||||
m.fps,
|
||||
m.codec.clone(),
|
||||
m.client.clone(),
|
||||
),
|
||||
None => (String::new(), 0, 0, 0, String::new(), String::new()),
|
||||
};
|
||||
CaptureMeta {
|
||||
id: capture_id(live.started_unix_ms, width, height),
|
||||
started_unix_ms: live.started_unix_ms,
|
||||
duration_ms: live.started.elapsed().as_millis() as u64,
|
||||
kind,
|
||||
width,
|
||||
height,
|
||||
fps,
|
||||
codec,
|
||||
client,
|
||||
sample_count: live.samples.len() as u32,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn temp_dir() -> PathBuf {
|
||||
// A per-call unique dir: a process-wide counter (NOT a timestamp, which collides when tests
|
||||
// run in parallel within the same millisecond — one test's cleanup would then wipe another's
|
||||
// dir mid-run).
|
||||
static COUNTER: AtomicU32 = AtomicU32::new(0);
|
||||
let n = COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
let p = std::env::temp_dir().join(format!("pf-stats-{}-{}", std::process::id(), n));
|
||||
let _ = std::fs::remove_dir_all(&p);
|
||||
p
|
||||
}
|
||||
|
||||
fn sample() -> StatsSample {
|
||||
StatsSample {
|
||||
t_ms: 0,
|
||||
session_id: 0,
|
||||
stages: vec![StageTiming {
|
||||
name: "capture".into(),
|
||||
p50_us: 100.0,
|
||||
p99_us: 200.0,
|
||||
}],
|
||||
fps: 60.0,
|
||||
repeat_fps: 0.0,
|
||||
mbps: 25.0,
|
||||
bitrate_kbps: 20_000,
|
||||
frames_dropped: 0,
|
||||
packets_dropped: 0,
|
||||
send_dropped: 0,
|
||||
fec_recovered: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn arm_record_save_load_delete() {
|
||||
let dir = temp_dir();
|
||||
let rec = StatsRecorder::new(dir.clone());
|
||||
assert!(!rec.is_armed());
|
||||
assert!(!rec.status().armed);
|
||||
// A push while idle is a no-op (no live capture).
|
||||
rec.push_sample(0, sample());
|
||||
|
||||
let st = rec.start();
|
||||
assert!(st.armed);
|
||||
assert!(rec.is_armed());
|
||||
let sid = rec.register_session("native", 5120, 1440, 240, "hevc", "abcd");
|
||||
rec.push_sample(sid, sample());
|
||||
rec.push_sample(sid, sample());
|
||||
assert_eq!(rec.status().sample_count, 2);
|
||||
assert_eq!(rec.status().kind, "native");
|
||||
assert!(rec.live_snapshot().is_some());
|
||||
|
||||
let meta = rec.stop().unwrap().expect("a capture was recording");
|
||||
assert_eq!(meta.sample_count, 2);
|
||||
assert_eq!(meta.kind, "native");
|
||||
assert_eq!(meta.width, 5120);
|
||||
assert!(meta.id.ends_with("_5120x1440"), "id was {}", meta.id);
|
||||
assert!(!rec.is_armed());
|
||||
assert!(rec.live_snapshot().is_none());
|
||||
// Stop with nothing recording → Ok(None).
|
||||
assert!(rec.stop().unwrap().is_none());
|
||||
|
||||
// It is listed and loadable.
|
||||
let list = rec.list();
|
||||
assert_eq!(list.len(), 1);
|
||||
assert_eq!(list[0].id, meta.id);
|
||||
let loaded = rec.load(&meta.id).unwrap();
|
||||
assert_eq!(loaded.samples.len(), 2);
|
||||
assert_eq!(loaded.meta.codec, "hevc");
|
||||
|
||||
// Delete removes it; a second delete is NotFound.
|
||||
rec.delete(&meta.id).unwrap();
|
||||
assert!(rec.list().is_empty());
|
||||
assert_eq!(
|
||||
rec.delete(&meta.id).unwrap_err().kind(),
|
||||
std::io::ErrorKind::NotFound
|
||||
);
|
||||
let _ = std::fs::remove_dir_all(&dir);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_path_traversal_ids() {
|
||||
let dir = temp_dir();
|
||||
let rec = StatsRecorder::new(dir.clone());
|
||||
for bad in [
|
||||
"../secret",
|
||||
"..",
|
||||
".",
|
||||
"a/b",
|
||||
"a\\b",
|
||||
"",
|
||||
"/etc/passwd",
|
||||
"x/../../y",
|
||||
] {
|
||||
assert_eq!(
|
||||
rec.load(bad).unwrap_err().kind(),
|
||||
std::io::ErrorKind::NotFound,
|
||||
"load({bad:?}) must be rejected as NotFound"
|
||||
);
|
||||
assert_eq!(
|
||||
rec.delete(bad).unwrap_err().kind(),
|
||||
std::io::ErrorKind::NotFound,
|
||||
"delete({bad:?}) must be rejected as NotFound"
|
||||
);
|
||||
}
|
||||
let _ = std::fs::remove_dir_all(&dir);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn samples_are_bounded() {
|
||||
let dir = temp_dir();
|
||||
let rec = StatsRecorder::new(dir.clone());
|
||||
rec.start();
|
||||
for _ in 0..(MAX_SAMPLES + 50) {
|
||||
rec.push_sample(0, sample());
|
||||
}
|
||||
assert_eq!(rec.status().sample_count as usize, MAX_SAMPLES);
|
||||
let _ = std::fs::remove_dir_all(&dir);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn start_is_idempotent_while_armed() {
|
||||
let dir = temp_dir();
|
||||
let rec = StatsRecorder::new(dir.clone());
|
||||
rec.start();
|
||||
rec.register_session("native", 1920, 1080, 60, "hevc", "");
|
||||
rec.push_sample(0, sample());
|
||||
// A second start must NOT wipe the in-progress capture.
|
||||
let st = rec.start();
|
||||
assert!(st.armed);
|
||||
assert_eq!(st.sample_count, 1);
|
||||
let _ = std::fs::remove_dir_all(&dir);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user