feat(host,web): host log ring + GET /api/v1/logs + console Logs page

Remote debugging without shell access: a tracing layer tees every
event at DEBUG-and-up — independent of the RUST_LOG filter gating
stderr/host.log, so console-side debugging never needs a restart —
into a bounded in-memory ring (log_capture.rs, 4096 newest entries,
OnceLock singleton like config()), installed at both init sites
(stderr path in main, the Windows service file path). The mgmt API
serves it cursor-paged at GET /api/v1/logs?after=&limit= — bearer-only
and deliberately NOT on the mTLS cert allowlist (log lines can name
client identities and host paths). The web console grows a Logs page
(follow/pause · min-level filter · text search · eviction-gap badge);
polling self-paces: a non-empty page advances the after-cursor (new
query key → immediate refetch, drains backlogs), an empty page idles
at the 2s interval. OpenAPI regenerated; ring pagination/eviction,
layer wiring, and the authed route are unit-tested; Storybook story
included.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-07-02 16:33:16 +00:00
parent 7ced80c4e3
commit 8af1a15aa6
13 changed files with 816 additions and 12 deletions
+285
View File
@@ -0,0 +1,285 @@
//! In-memory capture of the host's own log stream for the web console.
//!
//! A `tracing` layer tees every event at DEBUG and above — independent of the `RUST_LOG` filter
//! that gates stderr/file output — into a bounded in-process ring, and the management API serves
//! it as `GET /api/v1/logs` (see `mgmt.rs`). That gives an operator the host's recent logs from
//! the web console without shell access to the box, which is where gamepad-driver / capture /
//! encoder failures otherwise go to die ("it just doesn't work" bug reports).
//!
//! The ring keeps the *newest* [`CAPACITY`] entries (a log tail — unlike the stats recorder,
//! which keeps the head of a capture). Readers poll with an `after` sequence cursor.
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::{Mutex, OnceLock};
use std::time::{SystemTime, UNIX_EPOCH};
use utoipa::ToSchema;
/// Ring capacity — bounds memory at a few MB worst case ([`MAX_MSG`]-sized entries).
const CAPACITY: usize = 4096;
/// Per-entry message cap; log lines are short, anything longer is a payload dump we truncate.
const MAX_MSG: usize = 2048;
/// Hard cap on entries returned per poll (the client immediately re-polls to drain a backlog).
pub const MAX_PAGE: usize = 1000;
/// One captured log event.
#[derive(Serialize, Deserialize, ToSchema, Clone, Debug)]
pub struct LogEntry {
/// Monotonic sequence number (1-based) — pass the last one back as the `after` cursor.
pub seq: u64,
/// Unix timestamp in milliseconds.
pub ts_ms: u64,
/// `ERROR` | `WARN` | `INFO` | `DEBUG` | `TRACE`.
pub level: String,
/// The emitting module path (tracing target).
pub target: String,
/// The formatted message, structured fields appended as `key=value`.
pub msg: String,
}
/// One poll's worth of log entries.
#[derive(Serialize, Deserialize, ToSchema, Debug)]
pub struct LogPage {
pub entries: Vec<LogEntry>,
/// Cursor for the next poll (the last returned seq, or the request's `after` when empty).
pub next: u64,
/// True when entries between `after` and the first returned one were already evicted.
pub dropped: bool,
}
/// The process-wide log ring (see [`ring`]).
pub struct LogRing {
inner: Mutex<Inner>,
}
struct Inner {
entries: VecDeque<LogEntry>,
next_seq: u64,
}
impl LogRing {
fn new() -> Self {
Self {
inner: Mutex::new(Inner {
entries: VecDeque::with_capacity(CAPACITY),
next_seq: 1,
}),
}
}
/// `pub(crate)` for the mgmt handler tests; production entries only come from [`RingLayer`].
pub(crate) fn push(&self, level: &tracing::Level, target: &str, msg: String) {
let ts_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
let seq = inner.next_seq;
inner.next_seq += 1;
if inner.entries.len() == CAPACITY {
inner.entries.pop_front();
}
inner.entries.push_back(LogEntry {
seq,
ts_ms,
level: level.to_string(),
target: target.to_string(),
msg,
});
}
/// Entries with `seq > after`, oldest first, capped at `limit` (≤ [`MAX_PAGE`]).
pub fn since(&self, after: u64, limit: usize) -> LogPage {
let limit = limit.clamp(1, MAX_PAGE);
let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
// Entries are seq-ordered and contiguous: index of the first wanted one is derivable.
let first_seq = inner.entries.front().map_or(inner.next_seq, |e| e.seq);
let dropped = after != 0 && after + 1 < first_seq;
let skip = after
.saturating_sub(first_seq)
.saturating_add(u64::from(after >= first_seq)) as usize;
let entries: Vec<LogEntry> = inner
.entries
.iter()
.skip(skip)
.take(limit)
.cloned()
.collect();
let next = entries.last().map_or(after, |e| e.seq);
LogPage {
entries,
next,
dropped,
}
}
}
/// The process-wide ring — a `OnceLock` singleton so the tracing layer (installed in `main()`
/// before any host state exists) and the mgmt handler share it without threading an `Arc`.
pub fn ring() -> &'static LogRing {
static RING: OnceLock<LogRing> = OnceLock::new();
RING.get_or_init(LogRing::new)
}
/// The tee: a `tracing_subscriber` layer pushing every event into [`ring`]. Install with a
/// per-layer `LevelFilter::DEBUG` so the ring sees DEBUG even when `RUST_LOG` keeps stderr at
/// `info` (remote debugging must not require a restart with a different env).
pub struct RingLayer;
impl<S: tracing::Subscriber> tracing_subscriber::Layer<S> for RingLayer {
fn on_event(
&self,
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let meta = event.metadata();
let mut fields = FieldFmt::default();
event.record(&mut fields);
ring().push(meta.level(), meta.target(), fields.finish());
}
}
/// Formats an event's fields like the default fmt layer: the `message` field first, every other
/// field appended as ` key=value`.
#[derive(Default)]
struct FieldFmt {
msg: String,
fields: String,
}
impl tracing::field::Visit for FieldFmt {
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
use std::fmt::Write;
if field.name() == "message" {
let _ = write!(self.msg, "{value:?}");
} else {
let _ = write!(self.fields, " {}={:?}", field.name(), value);
}
}
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
use std::fmt::Write;
if field.name() == "message" {
self.msg.push_str(value);
} else {
let _ = write!(self.fields, " {}={value}", field.name());
}
}
}
impl FieldFmt {
fn finish(mut self) -> String {
if self.msg.is_empty() {
self.msg = self.fields.trim_start().to_string();
} else {
self.msg.push_str(&self.fields);
}
if self.msg.len() > MAX_MSG {
let mut end = MAX_MSG;
while !self.msg.is_char_boundary(end) {
end -= 1;
}
self.msg.truncate(end);
self.msg.push('…');
}
self.msg
}
}
#[cfg(test)]
mod tests {
use super::*;
fn push_n(ring: &LogRing, n: usize) {
for i in 0..n {
ring.push(&tracing::Level::INFO, "test", format!("m{i}"));
}
}
#[test]
fn cursor_pagination_and_eviction() {
let ring = LogRing::new();
push_n(&ring, 10);
// Full backfill from 0.
let page = ring.since(0, 100);
assert_eq!(page.entries.len(), 10);
assert_eq!(page.next, 10);
assert!(!page.dropped);
// Incremental: nothing new.
let page = ring.since(10, 100);
assert!(page.entries.is_empty());
assert_eq!(page.next, 10);
// Incremental: partial.
let page = ring.since(4, 3);
assert_eq!(
page.entries.iter().map(|e| e.seq).collect::<Vec<_>>(),
vec![5, 6, 7]
);
assert_eq!(page.next, 7);
assert!(!page.dropped);
}
#[test]
fn eviction_reports_dropped() {
let ring = LogRing::new();
push_n(&ring, CAPACITY + 50);
// Seqs 1..=50 were evicted; a cursor inside the gap must flag it.
let page = ring.since(10, 5);
assert!(page.dropped);
assert_eq!(page.entries.first().map(|e| e.seq), Some(51));
// A cursor at the ring head is not a gap.
let head = ring.since(page.next, 5);
assert!(!head.dropped);
assert_eq!(head.entries.first().map(|e| e.seq), Some(page.next + 1));
}
#[test]
fn layer_captures_events_into_the_singleton_ring() {
use tracing_subscriber::layer::SubscriberExt;
// The singleton ring is process-wide — find its current tail first (parallel tests may
// interleave, so only assert on OUR event appearing after it).
let mut cur = 0;
loop {
let page = ring().since(cur, MAX_PAGE);
if page.entries.is_empty() {
break;
}
cur = page.next;
}
let subscriber = tracing_subscriber::registry().with(RingLayer);
tracing::subscriber::with_default(subscriber, || {
tracing::warn!(answer = 42, "ring layer test message");
});
let page = ring().since(cur, MAX_PAGE);
let hit = page
.entries
.iter()
.find(|e| e.msg.contains("ring layer test message"))
.expect("event captured");
assert_eq!(hit.level, "WARN");
assert!(
hit.msg.contains("answer=42"),
"fields appended: {}",
hit.msg
);
assert!(hit.target.contains("log_capture"), "target: {}", hit.target);
assert!(hit.ts_ms > 0);
}
#[test]
fn message_truncation_keeps_char_boundary() {
let f = FieldFmt {
msg: "ä".repeat(MAX_MSG), // 2 bytes each — exceeds the cap at a multi-byte boundary
..Default::default()
};
let out = f.finish();
assert!(out.ends_with('…'));
assert!(out.len() <= MAX_MSG + '…'.len_utf8());
}
}
+15 -3
View File
@@ -45,6 +45,7 @@ mod install;
#[path = "windows/interactive.rs"]
mod interactive;
mod library;
mod log_capture;
mod mgmt;
mod mgmt_token;
mod native_pairing;
@@ -92,9 +93,20 @@ fn main() {
service::init_file_logging(filter);
} else {
// Logs go to stderr so stdout stays machine-readable (`punktfunk-host openapi > spec.json`).
tracing_subscriber::fmt()
.with_env_filter(filter)
.with_writer(std::io::stderr)
// A second layer tees DEBUG-and-up into the in-memory ring served by GET /api/v1/logs —
// deliberately not gated by RUST_LOG, so console-side debugging never needs a restart.
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Layer;
tracing_subscriber::registry()
.with(
log_capture::RingLayer.with_filter(tracing_subscriber::filter::LevelFilter::DEBUG),
)
.with(
tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.with_filter(filter),
)
.init();
}
+68 -2
View File
@@ -25,10 +25,11 @@ use crate::gamestream::{
tls::{serve_https, PeerAddr, PeerCertFingerprint},
AppState, APP_VERSION, AUDIO_PORT, CONTROL_PORT, GFE_VERSION, RTSP_PORT, VIDEO_PORT,
};
use crate::log_capture::LogPage;
use crate::stats_recorder::{Capture, CaptureMeta, StatsStatus};
use anyhow::{Context, Result};
use axum::{
extract::{Path, Request, State},
extract::{Path, Query, Request, State},
http::{header, Method, StatusCode},
middleware::{self, Next},
response::{IntoResponse, Response},
@@ -179,7 +180,8 @@ fn api_router_parts() -> (Router<Arc<MgmtState>>, utoipa::openapi::OpenApi) {
.routes(routes!(stats_capture_status))
.routes(routes!(stats_capture_live))
.routes(routes!(stats_recordings_list))
.routes(routes!(stats_recording_get, stats_recording_delete)),
.routes(routes!(stats_recording_get, stats_recording_delete))
.routes(routes!(logs_get)),
)
.split_for_parts()
}
@@ -213,6 +215,7 @@ pub fn openapi_json() -> String {
(name = "session", description = "Active streaming session control"),
(name = "library", description = "Game library: installed-store titles (Steam) plus user-curated custom entries"),
(name = "stats", description = "Streaming performance-stats capture: arm/stop a recording, read the live + saved time-series for graphing"),
(name = "logs", description = "Host log stream: the newest in-memory log entries, cursor-paged for live following"),
)
)]
struct ApiDoc;
@@ -1730,6 +1733,39 @@ async fn stats_recording_delete(
}
}
/// Query for `GET /logs` — a cursor poll.
#[derive(Deserialize)]
struct LogsQuery {
after: Option<u64>,
limit: Option<u32>,
}
/// Host logs
///
/// The host's recent log entries — an in-memory ring of the newest few thousand, captured at
/// DEBUG and above regardless of `RUST_LOG`. Follow live by polling with `after` set to the last
/// response's `next` cursor; a `dropped: true` means entries were evicted between polls (the ring
/// wrapped). Bearer-only: logs can reference client identities and host paths, so this is part of
/// the loopback-only admin surface, never the LAN-readable mTLS one.
#[utoipa::path(
get,
path = "/logs",
tag = "logs",
operation_id = "logsGet",
params(
("after" = Option<u64>, Query, description = "Return entries with seq greater than this (omitted/0 = oldest retained)"),
("limit" = Option<u32>, Query, description = "Max entries per response (default and cap 1000)"),
),
responses(
(status = OK, description = "Entries after the cursor, oldest first", body = LogPage),
(status = UNAUTHORIZED, description = "Missing or invalid bearer token", body = ApiError),
)
)]
async fn logs_get(Query(q): Query<LogsQuery>) -> Json<LogPage> {
let limit = q.limit.map_or(crate::log_capture::MAX_PAGE, |l| l as usize);
Json(crate::log_capture::ring().since(q.after.unwrap_or(0), limit))
}
// ---------------------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------------------
@@ -2509,4 +2545,34 @@ mod tests {
.await;
assert_eq!(s, StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn logs_endpoint_pages_by_cursor() {
let app = test_app(test_state(), None);
// The ring is a process-wide singleton — start from wherever its cursor currently is.
let (s, json) = send(&app, get_req("/api/v1/logs")).await;
assert_eq!(s, StatusCode::OK);
let start = json["next"].as_u64().unwrap();
let ring = crate::log_capture::ring();
ring.push(&tracing::Level::WARN, "mgmt::tests", "first".into());
ring.push(&tracing::Level::INFO, "mgmt::tests", "second".into());
let (s, json) = send(&app, get_req(&format!("/api/v1/logs?after={start}"))).await;
assert_eq!(s, StatusCode::OK);
let entries = json["entries"].as_array().unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0]["msg"], "first");
assert_eq!(entries[0]["level"], "WARN");
assert_eq!(json["next"].as_u64().unwrap(), start + 2);
assert_eq!(json["dropped"], false);
// Nothing newer → empty page, cursor unchanged.
let after = start + 2;
let (s, json) = send(&app, get_req(&format!("/api/v1/logs?after={after}"))).await;
assert_eq!(s, StatusCode::OK);
assert!(json["entries"].as_array().unwrap().is_empty());
assert_eq!(json["next"].as_u64().unwrap(), after);
}
}
+22 -7
View File
@@ -130,23 +130,38 @@ fn host_log_path() -> PathBuf {
/// Initialise tracing to the service log file (the SCM gives the service no console/stderr). Falls
/// back to stderr if the file can't be opened. Called from `main()` only for `service run`.
/// Also tees into the in-memory log ring (`log_capture`), like the stderr path in `main()` — the
/// supervisor serves no mgmt API itself, but the layer is harmless and keeps both inits uniform.
pub fn init_file_logging(filter: tracing_subscriber::EnvFilter) {
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Layer;
let ring =
crate::log_capture::RingLayer.with_filter(tracing_subscriber::filter::LevelFilter::DEBUG);
match std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(service_log_path())
{
Ok(file) => {
tracing_subscriber::fmt()
.with_env_filter(filter)
.with_ansi(false)
.with_writer(move || file.try_clone().expect("clone service log handle"))
tracing_subscriber::registry()
.with(ring)
.with(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(move || file.try_clone().expect("clone service log handle"))
.with_filter(filter),
)
.init();
}
Err(_) => {
tracing_subscriber::fmt()
.with_env_filter(filter)
.with_writer(std::io::stderr)
tracing_subscriber::registry()
.with(ring)
.with(
tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.with_filter(filter),
)
.init();
}
}