feat: M2 — host productionization: app catalog, persistent pairing, quit semantics, systemd (Phase 4)

- gamestream/apps.rs: an app catalog (loaded from ~/.config/lumen/apps.json, with defaults:
  Desktop + gamescope entries when gamescope/steam/vkcube are installed). /applist renders
  it; /launch?appid=N selects the entry; RTSP PLAY resolves it and the stream honors the
  app's compositor + nested command — so a Moonlight client picks "Steam" and gets a
  gamescope session at its native resolution, or "Desktop" for the KWin/GNOME desktop.
- Persistent pairing: the paired-client cert allow-list now survives restarts
  (~/.config/lumen/paired.json), saved on each successful pairing, loaded at boot.
- Quit semantics: /cancel now actually stops the media threads (streaming/audio flags),
  tearing down the per-session virtual output / gamescope process via the capturer's RAII.
- scripts/lumen-host.service (systemd user unit) + scripts/host.env.example (config file
  consumed by it) — the host runs as a managed service instead of an SSH shell.

Smoke-tested: serve boots, /applist serves the catalog (Desktop + vkcube gamescope entry
auto-detected on this box). GNOME backend validation still pending gnome-shell install;
wlroots vdisplay backend deliberately deferred (not in the priority compositor trio).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-09 23:23:53 +00:00
parent 826da9968e
commit 1eeb35a723
8 changed files with 270 additions and 7 deletions
+143
View File
@@ -0,0 +1,143 @@
//! The app catalog: what `/applist` advertises and what `/launch?appid=N` selects. Each entry
//! maps to a session recipe — which compositor backend hosts it and (for gamescope) which
//! command runs nested. Loaded from `~/.config/lumen/apps.json`; sensible defaults otherwise.
//!
//! ```json
//! [ {"id":1,"title":"Desktop"},
//! {"id":2,"title":"Steam","compositor":"gamescope","cmd":"steam -gamepadui"} ]
//! ```
use serde_json::Value;
#[derive(Clone, Debug)]
pub struct AppEntry {
pub id: u32,
pub title: String,
/// `None` = auto-detect (the desktop session's compositor).
pub compositor: Option<crate::vdisplay::Compositor>,
/// Command gamescope runs nested (gamescope entries only).
pub cmd: Option<String>,
}
fn config_path() -> Option<std::path::PathBuf> {
Some(std::path::Path::new(&std::env::var("HOME").ok()?).join(".config/lumen/apps.json"))
}
fn parse_compositor(s: &str) -> Option<crate::vdisplay::Compositor> {
use crate::vdisplay::Compositor::*;
match s.to_ascii_lowercase().as_str() {
"kwin" | "kde" => Some(Kwin),
"mutter" | "gnome" => Some(Mutter),
"gamescope" => Some(Gamescope),
"wlroots" | "sway" => Some(Wlroots),
_ => None,
}
}
/// The catalog: the user's `apps.json` if present, else defaults (Desktop, plus gamescope
/// entries when gamescope is installed).
pub fn catalog() -> Vec<AppEntry> {
if let Some(path) = config_path() {
if let Ok(raw) = std::fs::read_to_string(&path) {
match serde_json::from_str::<Value>(&raw) {
Ok(Value::Array(items)) => {
let apps: Vec<AppEntry> = items
.iter()
.filter_map(|it| {
Some(AppEntry {
id: it.get("id")?.as_u64()? as u32,
title: it.get("title")?.as_str()?.to_string(),
compositor: it
.get("compositor")
.and_then(|c| c.as_str())
.and_then(parse_compositor),
cmd: it.get("cmd").and_then(|c| c.as_str()).map(String::from),
})
})
.collect();
if !apps.is_empty() {
return apps;
}
tracing::warn!(path = %path.display(), "apps.json parsed to zero entries — using defaults");
}
_ => {
tracing::warn!(path = %path.display(), "apps.json malformed — using defaults")
}
}
}
}
let mut apps = vec![AppEntry {
id: 1,
title: "Desktop".into(),
compositor: None,
cmd: None,
}];
if which("gamescope") {
if which("steam") {
apps.push(AppEntry {
id: 2,
title: "Steam".into(),
compositor: Some(crate::vdisplay::Compositor::Gamescope),
cmd: Some("steam -gamepadui".into()),
});
}
if which("vkcube") {
apps.push(AppEntry {
id: 3,
title: "vkcube (test)".into(),
compositor: Some(crate::vdisplay::Compositor::Gamescope),
cmd: Some("vkcube".into()),
});
}
}
apps
}
pub fn by_id(id: u32) -> Option<AppEntry> {
catalog().into_iter().find(|a| a.id == id)
}
/// Render the GameStream `/applist` XML.
pub fn applist_xml() -> String {
let mut xml =
String::from("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<root status_code=\"200\">\n");
for app in catalog() {
xml.push_str(&format!(
"<App>\n<IsHdrSupported>0</IsHdrSupported>\n<AppTitle>{}</AppTitle>\n<ID>{}</ID>\n</App>\n",
xml_escape(&app.title),
app.id
));
}
xml.push_str("</root>\n");
xml
}
fn xml_escape(s: &str) -> String {
s.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
}
fn which(bin: &str) -> bool {
std::env::var_os("PATH")
.is_some_and(|paths| std::env::split_paths(&paths).any(|d| d.join(bin).is_file()))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_catalog_has_desktop() {
let apps = catalog();
assert!(apps.iter().any(|a| a.id == 1 && a.title == "Desktop"));
}
#[test]
fn applist_xml_is_wellformed_ish() {
let xml = applist_xml();
assert!(xml.contains("<AppTitle>Desktop</AppTitle>"));
assert!(xml.starts_with("<?xml"));
assert_eq!(xml.matches("<App>").count(), xml.matches("</App>").count());
}
}
+45 -1
View File
@@ -6,6 +6,7 @@
//! Status: P1.1 — mDNS `_nvstream._tcp` advertisement + `/serverinfo`. Pairing, RTSP, and //! Status: P1.1 — mDNS `_nvstream._tcp` advertisement + `/serverinfo`. Pairing, RTSP, and
//! the media streams follow (see the M2 task list / plan). //! the media streams follow (see the M2 task list / plan).
pub mod apps;
mod audio; mod audio;
mod cert; mod cert;
mod control; mod control;
@@ -73,6 +74,8 @@ pub struct LaunchSession {
pub width: u32, pub width: u32,
pub height: u32, pub height: u32,
pub fps: u32, pub fps: u32,
/// `/launch?appid=N` — selects the app-catalog entry (session recipe).
pub appid: u32,
} }
/// Shared control-plane state used as the axum app state. /// Shared control-plane state used as the axum app state.
@@ -110,7 +113,7 @@ pub fn serve() -> Result<()> {
host, host,
identity, identity,
pairing: pairing::Pairing::new(), pairing: pairing::Pairing::new(),
paired: std::sync::Mutex::new(Vec::new()), paired: std::sync::Mutex::new(load_paired()),
launch: std::sync::Mutex::new(None), launch: std::sync::Mutex::new(None),
stream: std::sync::Mutex::new(None), stream: std::sync::Mutex::new(None),
streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
@@ -177,3 +180,44 @@ fn primary_local_ip() -> Option<IpAddr> {
sock.connect("8.8.8.8:80").ok()?; sock.connect("8.8.8.8:80").ok()?;
sock.local_addr().ok().map(|a| a.ip()) sock.local_addr().ok().map(|a| a.ip())
} }
/// Where the paired-client allow-list persists (survives host restarts, like Sunshine).
fn paired_path() -> Option<std::path::PathBuf> {
Some(std::path::Path::new(&std::env::var("HOME").ok()?).join(".config/lumen/paired.json"))
}
/// Load the persisted paired-client certificate DERs (empty on first run / parse failure).
fn load_paired() -> Vec<Vec<u8>> {
let Some(path) = paired_path() else {
return Vec::new();
};
let Ok(raw) = std::fs::read(&path) else {
return Vec::new();
};
match serde_json::from_slice::<Vec<Vec<u8>>>(&raw) {
Ok(v) => {
tracing::info!(clients = v.len(), "loaded persisted pairings");
v
}
Err(e) => {
tracing::warn!(error = %e, "paired.json unreadable — starting unpaired");
Vec::new()
}
}
}
/// Persist the paired-client allow-list (called after each successful pairing).
pub(crate) fn save_paired(paired: &[Vec<u8>]) {
let Some(path) = paired_path() else { return };
if let Some(dir) = path.parent() {
let _ = std::fs::create_dir_all(dir);
}
match serde_json::to_vec(paired) {
Ok(bytes) => {
if let Err(e) = std::fs::write(&path, bytes) {
tracing::warn!(error = %e, "persisting pairings failed");
}
}
Err(e) => tracing::warn!(error = %e, "serializing pairings failed"),
}
}
+10 -2
View File
@@ -81,7 +81,7 @@ async fn h_pin(
async fn h_applist(State(_st): State<Arc<AppState>>) -> impl IntoResponse { async fn h_applist(State(_st): State<Arc<AppState>>) -> impl IntoResponse {
// One app for now: the headless desktop (the wlroots virtual output). // One app for now: the headless desktop (the wlroots virtual output).
xml("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<root status_code=\"200\">\n<App>\n<IsHdrSupported>0</IsHdrSupported>\n<AppTitle>Desktop</AppTitle>\n<ID>1</ID>\n</App>\n</root>\n".to_string()) xml(super::apps::applist_xml())
} }
async fn h_launch( async fn h_launch(
@@ -118,7 +118,13 @@ async fn h_resume(State(st): State<Arc<AppState>>) -> impl IntoResponse {
async fn h_cancel(State(st): State<Arc<AppState>>) -> impl IntoResponse { async fn h_cancel(State(st): State<Arc<AppState>>) -> impl IntoResponse {
*st.launch.lock().unwrap() = None; *st.launch.lock().unwrap() = None;
tracing::info!("cancel — launch session cleared"); // Quit semantics: stop the running media threads (they observe these flags) so the session
// actually ends — the virtual output/gamescope teardown follows via the capturer's RAII.
st.streaming
.store(false, std::sync::atomic::Ordering::SeqCst);
st.audio_streaming
.store(false, std::sync::atomic::Ordering::SeqCst);
tracing::info!("cancel — launch session cleared, streams stopping");
xml("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<root status_code=\"200\"><cancel>1</cancel></root>\n".to_string()) xml("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<root status_code=\"200\"><cancel>1</cancel></root>\n".to_string())
} }
@@ -137,12 +143,14 @@ fn launch(_st: &AppState, q: &HashMap<String, String>) -> Result<LaunchSession>
.get("mode") .get("mode")
.and_then(|m| parse_mode(m)) .and_then(|m| parse_mode(m))
.unwrap_or((1920, 1080, 60)); .unwrap_or((1920, 1080, 60));
let appid = q.get("appid").and_then(|s| s.parse().ok()).unwrap_or(1);
Ok(LaunchSession { Ok(LaunchSession {
gcm_key, gcm_key,
rikeyid, rikeyid,
width, width,
height, height,
fps, fps,
appid,
}) })
} }
+5 -1
View File
@@ -206,7 +206,11 @@ impl Pairing {
let hash_ok = expected[..] == s.client_hash[..]; let hash_ok = expected[..] == s.client_hash[..];
let sig_ok = verify256(&s.client_pubkey, client_secret, client_sig).is_ok(); let sig_ok = verify256(&s.client_pubkey, client_secret, client_sig).is_ok();
if hash_ok && sig_ok { if hash_ok && sig_ok {
paired_store.lock().unwrap().push(s.client_cert_der.clone()); {
let mut store = paired_store.lock().unwrap();
store.push(s.client_cert_der.clone());
super::save_paired(&store);
}
tracing::info!(uniqueid, "pairing phase 4 — SUCCESS, client cert pinned"); tracing::info!(uniqueid, "pairing phase 4 — SUCCESS, client cert pinned");
Ok(paired_xml("", true)) Ok(paired_xml("", true))
} else { } else {
+9 -1
View File
@@ -166,9 +166,17 @@ fn handle_request(req: &Request, state: &AppState) -> String {
let cfg = *state.stream.lock().unwrap(); let cfg = *state.stream.lock().unwrap();
match cfg { match cfg {
Some(cfg) if !state.streaming.swap(true, Ordering::SeqCst) => { Some(cfg) if !state.streaming.swap(true, Ordering::SeqCst) => {
tracing::info!("RTSP PLAY — starting video stream"); // Resolve the launched catalog entry (session recipe) for the stream.
let app = state
.launch
.lock()
.unwrap()
.map(|l| l.appid)
.and_then(super::apps::by_id);
tracing::info!(app = ?app.as_ref().map(|a| &a.title), "RTSP PLAY — starting video stream");
stream::start( stream::start(
cfg, cfg,
app,
state.streaming.clone(), state.streaming.clone(),
state.force_idr.clone(), state.force_idr.clone(),
state.video_cap.clone(), state.video_cap.clone(),
+15 -2
View File
@@ -36,6 +36,7 @@ pub type CapturerSlot = Arc<std::sync::Mutex<Option<Box<dyn Capturer>>>>;
/// the persistent capturer the thread borrows for the stream's duration. /// the persistent capturer the thread borrows for the stream's duration.
pub fn start( pub fn start(
cfg: StreamConfig, cfg: StreamConfig,
app: Option<super::apps::AppEntry>,
running: Arc<AtomicBool>, running: Arc<AtomicBool>,
force_idr: Arc<AtomicBool>, force_idr: Arc<AtomicBool>,
video_cap: CapturerSlot, video_cap: CapturerSlot,
@@ -44,7 +45,7 @@ pub fn start(
.name("lumen-video".into()) .name("lumen-video".into())
.spawn(move || { .spawn(move || {
tracing::info!(?cfg, "video stream starting"); tracing::info!(?cfg, "video stream starting");
if let Err(e) = run(cfg, &running, &force_idr, &video_cap) { if let Err(e) = run(cfg, app.as_ref(), &running, &force_idr, &video_cap) {
tracing::error!(error = %format!("{e:#}"), "video stream failed"); tracing::error!(error = %format!("{e:#}"), "video stream failed");
} }
running.store(false, Ordering::SeqCst); running.store(false, Ordering::SeqCst);
@@ -54,6 +55,7 @@ pub fn start(
fn run( fn run(
cfg: StreamConfig, cfg: StreamConfig,
app: Option<&super::apps::AppEntry>,
running: &Arc<AtomicBool>, running: &Arc<AtomicBool>,
force_idr: &AtomicBool, force_idr: &AtomicBool,
video_cap: &std::sync::Mutex<Option<Box<dyn Capturer>>>, video_cap: &std::sync::Mutex<Option<Box<dyn Capturer>>>,
@@ -82,9 +84,20 @@ fn run(
// `video_cap`, since a reconnect at a different resolution needs a freshly-sized output; the // `video_cap`, since a reconnect at a different resolution needs a freshly-sized output; the
// output is released when this capturer drops at stream end (RAII via its keepalive). // output is released when this capturer drops at stream end (RAII via its keepalive).
if std::env::var("LUMEN_VIDEO_SOURCE").as_deref() == Ok("virtual") { if std::env::var("LUMEN_VIDEO_SOURCE").as_deref() == Ok("virtual") {
let compositor = crate::vdisplay::detect().context("detect compositor")?; // The launched app picks the compositor (e.g. gamescope for game entries) and the
// nested command; env vars remain manual overrides / fallbacks.
let compositor = app
.and_then(|a| a.compositor)
.map(Ok)
.unwrap_or_else(|| crate::vdisplay::detect().context("detect compositor"))?;
if let Some(cmd) = app.and_then(|a| a.cmd.as_deref()) {
// The gamescope backend reads the nested command from this env var; setting it
// per-launch is safe (one stream session at a time).
std::env::set_var("LUMEN_GAMESCOPE_APP", cmd);
}
tracing::info!( tracing::info!(
?compositor, ?compositor,
app = ?app.map(|a| &a.title),
w = cfg.width, w = cfg.width,
h = cfg.height, h = cfg.height,
"video source: virtual display (native client resolution)" "video source: virtual display (native client resolution)"
+22
View File
@@ -0,0 +1,22 @@
# lumen host configuration (~/.config/lumen/host.env) — consumed by lumen-host.service.
# Session / compositor environment (headless KWin example).
XDG_RUNTIME_DIR=/run/user/1000
DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/1000/bus
WAYLAND_DISPLAY=wayland-kde
XDG_CURRENT_DESKTOP=KDE
# Video source: `virtual` creates a per-client virtual output at the client's exact
# resolution+refresh (the flagship mode); `portal` captures an existing monitor.
LUMEN_VIDEO_SOURCE=virtual
# GPU zero-copy capture (EGL/Vulkan → CUDA → NVENC). Falls back to CPU automatically.
LUMEN_ZEROCOPY=1
# Optional overrides (apps.json is the primary mechanism for per-app settings):
#LUMEN_COMPOSITOR=kwin # kwin | mutter | gamescope | wlroots
#LUMEN_GAMESCOPE_APP=vkcube # nested command for ad-hoc gamescope sessions
#LUMEN_INPUT_BACKEND=libei # wlr | libei | gamescope | uinput
#LUMEN_FEC_PCT=20 # video FEC overhead percent
#LUMEN_PERF=1 # per-stage timing logs
#RUST_LOG=info
+21
View File
@@ -0,0 +1,21 @@
# lumen streaming host — systemd USER unit.
#
# Install:
# mkdir -p ~/.config/systemd/user && cp scripts/lumen-host.service ~/.config/systemd/user/
# cp scripts/host.env.example ~/.config/lumen/host.env # then edit
# systemctl --user daemon-reload && systemctl --user enable --now lumen-host
#
# The unit assumes the compositor session (e.g. headless KWin on wayland-kde) is already up;
# for a fully self-contained appliance, pair it with a kwin_wayland user unit it can After=.
[Unit]
Description=lumen GameStream host
After=pipewire.service
[Service]
EnvironmentFile=%h/.config/lumen/host.env
ExecStart=%h/lumen/target/release/lumen-host serve
Restart=on-failure
RestartSec=2
[Install]
WantedBy=default.target