feat(host): Apollo-backlog hardening — cert gate, NVENC RFI, media QoS, async injector
A pass over the apollo-comparison backlog (re-verified against current code). Lands four items end-to-end plus a Windows-DualSense scoping doc. - #5/#92/#26 — GameStream paired-cert allow-list. tls.rs surfaces the verified peer cert to handlers (serve_https + PeerCertFingerprint, now shared with the mgmt API instead of duplicated); nvhttp gates /launch /resume /applist /cancel on AppState.paired and reports a real PairStatus; save_paired writes atomically (temp+rename). Closes the "mTLS accepts any client cert" hole. + regression test. - #6/#51/#19/#22 — NVENC caps query -> reference-frame invalidation. nvenc.rs query_caps probes nvEncGetEncodeCaps (max dims / 10-bit / custom-VBV / RFI), rejecting over-range modes and degrading 10-bit->8-bit instead of an opaque InvalidParam. New Encoder::invalidate_ref_frames (default false -> caller keyframes); the Windows NVENC path implements real RFI (multi-ref DPB + nvEncInvalidateRefFrames, dedup + IDR-on-overflow). control.rs decodes the 0x0301 lost-frame range (Apollo's IDX_INVALIDATE_REF_FRAMES) -> AppState.rfi_range -> encode loop, falling back to a keyframe. NOTE: the Windows NVENC impl is RTX-box/CI-pending (can't compile on Linux); adversarially reviewed vs the SDK. - #43/#72 — media socket QoS + buffer growth. New punktfunk_core::transport::qos: grow_socket_buffers (factored out the native plane's 32MB SO_SNDBUF growth so the GameStream sockets reuse it) + set_media_qos (opt-in PUNKTFUNK_DSCP=1: DSCP CS5 video / CS6 audio + Linux SO_PRIORITY, Apollo's scheme). Wired into UdpTransport and the GameStream video/audio sockets. Windows IP_TOS needs qWAVE (follow-up). - #8/#45 — GameStream input injection off the ENet service thread. on_receive no longer injects inline (a slow inject head-blocked ENet keepalive/retransmit); it forwards to a dedicated injector thread. The hardened InjectorService moved from punktfunk1 into crate::inject (shared by both planes) + a coalesce step that sums adjacent relative-mouse/scroll deltas while preserving button/key/abs ordering. Docs: re-verified apollo-comparison.md status (22 items already done/obsolete since the snapshot) + windows-dualsense-scoping.md (ViGEm can't emulate a DualSense; real DS5 on Windows needs a VHF virtual-HID driver — web-research pass pending). fmt + clippy -D warnings clean; full workspace test suite green; no C-ABI/OpenAPI drift. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -31,7 +31,9 @@ fec-rs = { path = "vendor/fec-rs" }
|
||||
aes-gcm = "0.10" # AES-128-GCM session crypto, matches GameStream
|
||||
zerocopy = { version = "0.8", features = ["derive"] }
|
||||
bytes = "1"
|
||||
socket2 = "0.6" # set SO_SNDBUF/SO_RCVBUF — default UDP buffers are too small for 4K/5K frame bursts
|
||||
socket2 = { version = "0.6", features = [
|
||||
"all",
|
||||
] } # SO_SNDBUF/SO_RCVBUF growth (default UDP buffers too small for 4K/5K bursts) + DSCP/SO_PRIORITY media QoS
|
||||
thiserror = "2"
|
||||
tracing = { version = "0.1", default-features = false, features = ["std"] }
|
||||
rand = "0.9"
|
||||
|
||||
@@ -2,9 +2,11 @@
|
||||
//! directly — no async runtime is involved.
|
||||
|
||||
mod loopback;
|
||||
mod qos;
|
||||
mod udp;
|
||||
|
||||
pub use loopback::{loopback_pair, LoopbackTransport};
|
||||
pub use qos::{grow_socket_buffers, set_media_qos, MediaClass};
|
||||
/// Windows-only: reusable USO (UDP Send Offload) batch send for callers that own their own connected
|
||||
/// socket (the GameStream video sender) rather than going through [`UdpTransport`].
|
||||
#[cfg(target_os = "windows")]
|
||||
|
||||
@@ -0,0 +1,145 @@
|
||||
//! Shared UDP socket tuning for the media planes: send/recv buffer growth + best-effort link-layer
|
||||
//! QoS.
|
||||
//!
|
||||
//! [`grow_socket_buffers`] is the `SO_SNDBUF`/`SO_RCVBUF` growth the native data plane applies; the
|
||||
//! GameStream video/audio sockets reuse it so they don't go ENOBUFS-bound at high bitrate.
|
||||
//!
|
||||
//! [`set_media_qos`] DSCP-tags the latency-sensitive video/audio traffic (+ Linux `SO_PRIORITY`) so a
|
||||
//! QoS-aware path (Wi-Fi WMM access categories, a managed switch, a shaped uplink) can prioritize it
|
||||
//! over bulk flows. Mirrors what Apollo/Sunshine tag — DSCP **CS5** for video, **CS6** for audio. It
|
||||
//! is **opt-in** (`PUNKTFUNK_DSCP=1`): DSCP can interact badly with some consumer ISPs/routers, and on
|
||||
//! Windows a plain `IP_TOS` is silently stripped unless a qWAVE policy is active (Apollo uses the
|
||||
//! qWAVE API there — that port is a follow-up; today this is a no-op on the wire on Windows).
|
||||
|
||||
use std::net::UdpSocket;
|
||||
|
||||
/// Target kernel socket-buffer size (`SO_SNDBUF`/`SO_RCVBUF`). A high-resolution frame is a burst (a
|
||||
/// 5120×1440 keyframe is ~130 packets the send thread hands to `sendmmsg` at once); the default UDP
|
||||
/// buffer (~208 KB on Linux) overflows on it, which EAGAINs the host send (dropping packets) or drops
|
||||
/// on the client recv — and with infinite-GOP a single lost frame freezes the decode until the next
|
||||
/// RFI refresh. Requested large; the OS clamps to `net.core.{wmem,rmem}_max` (Linux) /
|
||||
/// `kern.ipc.maxsockbuf` (macOS).
|
||||
///
|
||||
/// Sized for 1 Gbps+: at ~1.2 Gbps on the wire an 8 MB buffer is only ~49 ms of steady state, and a
|
||||
/// single multi-MB IDR keyframe (~4 MB ≈ 3300 packets) instantly fills most of it. 32 MB gives ~200 ms
|
||||
/// of headroom and absorbs a keyframe burst without EAGAIN/ENOBUFS drops. (Paced sending —
|
||||
/// `punktfunk1.rs::paced_submit` — spreads a big frame's overflow, so this buffer mostly absorbs the
|
||||
/// immediate microburst rather than a whole unpaced frame.)
|
||||
pub(crate) const TARGET_SOCKBUF: usize = 32 * 1024 * 1024;
|
||||
|
||||
/// Best-effort grow of `SO_SNDBUF`/`SO_RCVBUF` to [`TARGET_SOCKBUF`]. A failure isn't fatal (the
|
||||
/// stream just runs lossier); a grant far below the request means the OS cap is too low for clean
|
||||
/// 4K/5K streaming, so warn with the knob to raise.
|
||||
pub fn grow_socket_buffers(socket: &UdpSocket) {
|
||||
let sock = socket2::SockRef::from(socket);
|
||||
let _ = sock.set_send_buffer_size(TARGET_SOCKBUF);
|
||||
let _ = sock.set_recv_buffer_size(TARGET_SOCKBUF);
|
||||
// The kernel reports back the (possibly clamped, Linux-doubled) granted size.
|
||||
let granted = sock
|
||||
.send_buffer_size()
|
||||
.unwrap_or(0)
|
||||
.min(sock.recv_buffer_size().unwrap_or(0));
|
||||
if granted < TARGET_SOCKBUF / 4 {
|
||||
tracing::warn!(
|
||||
granted_kb = granted / 1024,
|
||||
"UDP socket buffer capped well below target — high-resolution streaming may drop \
|
||||
frames; raise net.core.wmem_max / net.core.rmem_max (Linux) for clean 4K/5K"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Media class of a socket — selects the DSCP code point (and Linux `SO_PRIORITY`), matching Apollo's
|
||||
/// mapping: video = CS5, audio = CS6.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub enum MediaClass {
|
||||
Video,
|
||||
Audio,
|
||||
}
|
||||
|
||||
impl MediaClass {
|
||||
/// DSCP code point (the high 6 bits of the IPv4 TOS / IPv6 traffic-class byte).
|
||||
const fn dscp(self) -> u32 {
|
||||
match self {
|
||||
MediaClass::Video => 40, // CS5
|
||||
MediaClass::Audio => 48, // CS6
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether DSCP/QoS marking is enabled (`PUNKTFUNK_DSCP=1`). Off by default.
|
||||
pub(crate) fn dscp_enabled() -> bool {
|
||||
matches!(
|
||||
std::env::var("PUNKTFUNK_DSCP").as_deref(),
|
||||
Ok("1") | Ok("true") | Ok("on")
|
||||
)
|
||||
}
|
||||
|
||||
/// Best-effort: tag `socket`'s outgoing packets for prioritized delivery of its media class. A no-op
|
||||
/// unless `PUNKTFUNK_DSCP=1`. Every step is best-effort (failures logged at debug, never fatal) — QoS
|
||||
/// is a nicety, not required for correctness.
|
||||
///
|
||||
/// IPv4 only (all current media sockets bind `0.0.0.0`); a v6 socket simply isn't tagged. On Windows
|
||||
/// the `IP_TOS` set succeeds but the OS doesn't tag the wire without a qWAVE policy (follow-up).
|
||||
pub fn set_media_qos(socket: &UdpSocket, class: MediaClass) {
|
||||
if dscp_enabled() {
|
||||
apply_media_qos(socket, class);
|
||||
}
|
||||
}
|
||||
|
||||
/// The unconditional QoS application, factored out of [`set_media_qos`] so it is directly testable
|
||||
/// without touching the process-global `PUNKTFUNK_DSCP` env. Best-effort (every step logs-and-continues).
|
||||
fn apply_media_qos(socket: &UdpSocket, class: MediaClass) {
|
||||
let sock = socket2::SockRef::from(socket);
|
||||
// DSCP occupies the high 6 bits of the TOS byte → shift left 2.
|
||||
if let Err(e) = sock.set_tos_v4(class.dscp() << 2) {
|
||||
tracing::debug!(error = %e, ?class, "set IP_TOS (DSCP) failed — QoS marking skipped");
|
||||
}
|
||||
// SO_PRIORITY must be set AFTER IP_TOS (setting TOS resets SO_PRIORITY to 0 on Linux). Linux-only;
|
||||
// 6 is the highest priority allowed without CAP_NET_ADMIN, so video=5 / audio=6 (Apollo's scheme).
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
let prio = match class {
|
||||
MediaClass::Video => 5,
|
||||
MediaClass::Audio => 6,
|
||||
};
|
||||
if let Err(e) = sock.set_priority(prio) {
|
||||
tracing::debug!(error = %e, "set SO_PRIORITY failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn dscp_code_points_match_apollo() {
|
||||
// CS5 video / CS6 audio, shifted into the TOS byte (high 6 bits).
|
||||
assert_eq!(MediaClass::Video.dscp(), 40);
|
||||
assert_eq!(MediaClass::Audio.dscp(), 48);
|
||||
assert_eq!(MediaClass::Video.dscp() << 2, 0xA0);
|
||||
assert_eq!(MediaClass::Audio.dscp() << 2, 0xC0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn qos_and_buffer_growth_are_best_effort_and_never_panic() {
|
||||
let sock = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
// No PUNKTFUNK_DSCP in the test env → early return; must not panic regardless.
|
||||
set_media_qos(&sock, MediaClass::Video);
|
||||
set_media_qos(&sock, MediaClass::Audio);
|
||||
grow_socket_buffers(&sock);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn apply_qos_tags_the_socket() {
|
||||
// Exercise the enabled path directly (no env), and read the options back where we can.
|
||||
let sock = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
apply_media_qos(&sock, MediaClass::Video);
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
let s = socket2::SockRef::from(&sock);
|
||||
assert_eq!(s.tos_v4().unwrap(), 0xA0, "video → CS5 in the TOS byte");
|
||||
assert_eq!(s.priority().unwrap(), 5, "video → SO_PRIORITY 5");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -413,26 +413,15 @@ pub struct UdpTransport {
|
||||
}
|
||||
|
||||
impl UdpTransport {
|
||||
/// Target kernel socket-buffer size. A high-resolution frame is a burst (a 5120×1440
|
||||
/// keyframe is ~130 packets the send thread hands to `sendmmsg` at once); the default
|
||||
/// UDP buffer (~208 KB on Linux) overflows on it, which EAGAINs the host send (dropping
|
||||
/// packets) or drops on the client recv — and with infinite-GOP a single lost frame
|
||||
/// freezes the decode until the next RFI refresh. Requested large; the OS clamps to
|
||||
/// `net.core.{wmem,rmem}_max` (Linux) / `kern.ipc.maxsockbuf` (macOS).
|
||||
///
|
||||
/// Sized for 1 Gbps+: at ~1.2 Gbps on the wire an 8 MB buffer is only ~49 ms of steady state,
|
||||
/// and a single multi-MB IDR keyframe (~4 MB ≈ 3300 packets) instantly fills most of it. 32 MB
|
||||
/// gives ~200 ms of headroom and absorbs a keyframe burst without EAGAIN drops. (Paced sending
|
||||
/// — `punktfunk1.rs::paced_submit` — now spreads a big frame's overflow, so this buffer mostly absorbs
|
||||
/// the immediate microburst rather than a whole unpaced frame.)
|
||||
const TARGET_SOCKBUF: usize = 32 * 1024 * 1024;
|
||||
|
||||
/// Bind `local` and `connect` to `peer`, so `send`/`recv` need no address and the
|
||||
/// kernel filters to this peer. Non-blocking, matching the [`Transport`] contract.
|
||||
pub fn connect(local: &str, peer: &str) -> std::io::Result<Self> {
|
||||
let socket = UdpSocket::bind(local)?;
|
||||
socket.connect(peer)?;
|
||||
Self::grow_buffers(&socket);
|
||||
super::qos::grow_socket_buffers(&socket);
|
||||
// The native data plane is video-dominant — tag it as the video class (opt-in via
|
||||
// PUNKTFUNK_DSCP). Each end marks its own egress.
|
||||
super::qos::set_media_qos(&socket, super::qos::MediaClass::Video);
|
||||
socket.set_nonblocking(true)?;
|
||||
Ok(UdpTransport { socket })
|
||||
}
|
||||
@@ -481,7 +470,8 @@ impl UdpTransport {
|
||||
let target = observed.map(|s| s.to_string());
|
||||
socket.connect(target.as_deref().unwrap_or(fallback_peer))?;
|
||||
socket.set_read_timeout(None)?;
|
||||
Self::grow_buffers(&socket);
|
||||
super::qos::grow_socket_buffers(&socket);
|
||||
super::qos::set_media_qos(&socket, super::qos::MediaClass::Video);
|
||||
socket.set_nonblocking(true)?;
|
||||
Ok((UdpTransport { socket }, punched))
|
||||
}
|
||||
@@ -498,27 +488,6 @@ impl UdpTransport {
|
||||
self.socket.local_addr()
|
||||
}
|
||||
|
||||
/// Best-effort grow of SO_SNDBUF/SO_RCVBUF (see [`TARGET_SOCKBUF`]). A failure isn't fatal
|
||||
/// (the stream just runs lossier); a grant far below the request means the OS cap is too
|
||||
/// low for clean 4K/5K streaming, so warn once with the knob to raise.
|
||||
fn grow_buffers(socket: &UdpSocket) {
|
||||
let sock = socket2::SockRef::from(socket);
|
||||
let _ = sock.set_send_buffer_size(Self::TARGET_SOCKBUF);
|
||||
let _ = sock.set_recv_buffer_size(Self::TARGET_SOCKBUF);
|
||||
// The kernel reports back the (possibly clamped, Linux-doubled) granted size.
|
||||
let granted = sock
|
||||
.send_buffer_size()
|
||||
.unwrap_or(0)
|
||||
.min(sock.recv_buffer_size().unwrap_or(0));
|
||||
if granted < Self::TARGET_SOCKBUF / 4 {
|
||||
tracing::warn!(
|
||||
granted_kb = granted / 1024,
|
||||
"UDP socket buffer capped well below target — high-resolution streaming may drop \
|
||||
frames; raise net.core.wmem_max / net.core.rmem_max (Linux) for clean 4K/5K"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Apple batched receive via `recvmsg_x` — drains up to `out.len()` datagrams in one syscall into
|
||||
/// the caller's reused buffers (the recv counterpart of Linux `recvmmsg`, which Darwin lacks).
|
||||
/// SAFETY: each `MsghdrX` holds a raw pointer into `iovs`, which holds raw pointers into `out`'s
|
||||
|
||||
@@ -57,6 +57,16 @@ pub trait Encoder: Send {
|
||||
/// Force the next submitted frame to be an IDR keyframe (e.g. after a client
|
||||
/// reference-frame-invalidation request). Default: no-op.
|
||||
fn request_keyframe(&mut self) {}
|
||||
/// Invalidate a contiguous range of previously-encoded reference frames (client frame numbers,
|
||||
/// as reported in a loss-recovery request) so the encoder re-references an older still-valid
|
||||
/// frame instead of emitting a full IDR. Returns `true` if a real reference invalidation was
|
||||
/// performed; `false` means the encoder couldn't (range older than the DPB, or the backend has
|
||||
/// no RFI) and the caller should fall back to [`request_keyframe`](Self::request_keyframe).
|
||||
/// Default: `false` — only the Windows direct-NVENC path implements true RFI; libavcodec
|
||||
/// (Linux NVENC) and VAAPI can't express `nvEncInvalidateRefFrames`, so they keyframe.
|
||||
fn invalidate_ref_frames(&mut self, _first_frame: i64, _last_frame: i64) -> bool {
|
||||
false
|
||||
}
|
||||
/// Pull the next encoded AU if one is ready.
|
||||
fn poll(&mut self) -> Result<Option<EncodedFrame>>;
|
||||
/// Signal end-of-stream. After this, drain the remaining AUs with [`poll`](Self::poll)
|
||||
|
||||
@@ -30,6 +30,11 @@ use nvidia_video_codec_sdk::ENCODE_API as API;
|
||||
// GPU-saturating game; this must be ≥ the helper's `PUNKTFUNK_ENCODE_DEPTH` (default 4, clamped ≤ 6).
|
||||
const POOL: usize = 8;
|
||||
|
||||
/// Reference-frame DPB depth when RFI is supported (Apollo uses 5 for H.264/HEVC). A deeper DPB
|
||||
/// lets an invalidated reference fall back to an older still-valid frame instead of a full IDR;
|
||||
/// `numRefL0 = 1` keeps each P-frame single-reference for low latency.
|
||||
const RFI_DPB: u32 = 5;
|
||||
|
||||
fn codec_guid(codec: Codec) -> nv::GUID {
|
||||
match codec {
|
||||
Codec::H264 => nv::NV_ENC_CODEC_H264_GUID,
|
||||
@@ -40,6 +45,7 @@ fn codec_guid(codec: Codec) -> nv::GUID {
|
||||
|
||||
pub struct NvencD3d11Encoder {
|
||||
encoder: *mut c_void,
|
||||
codec: Codec,
|
||||
codec_guid: nv::GUID,
|
||||
width: u32,
|
||||
height: u32,
|
||||
@@ -63,6 +69,14 @@ pub struct NvencD3d11Encoder {
|
||||
frame_idx: i64,
|
||||
force_kf: bool,
|
||||
inited: bool,
|
||||
/// GPU capabilities probed once via `nvEncGetEncodeCaps` before configuring (Apollo's
|
||||
/// `get_encoder_cap`): gates 10-bit/custom-VBV/RFI on what this card actually supports instead
|
||||
/// of failing later as an opaque `InvalidParam`. Set by [`query_caps`](Self::query_caps).
|
||||
rfi_supported: bool,
|
||||
custom_vbv: bool,
|
||||
/// The last reference-frame range we invalidated — dedupes repeated RFI requests for the same
|
||||
/// loss event (the client resends until it sees recovery).
|
||||
last_rfi_range: Option<(i64, i64)>,
|
||||
/// Raw ptr of the D3D11 device this session was initialized with. The capturer recreates the
|
||||
/// device on a desktop switch (normal ↔ Winlogon secure); when a frame carries a new device we
|
||||
/// tear down and re-init NVENC against it.
|
||||
@@ -84,6 +98,7 @@ impl NvencD3d11Encoder {
|
||||
) -> Result<Self> {
|
||||
Ok(Self {
|
||||
encoder: ptr::null_mut(),
|
||||
codec,
|
||||
codec_guid: codec_guid(codec),
|
||||
width,
|
||||
height,
|
||||
@@ -99,6 +114,9 @@ impl NvencD3d11Encoder {
|
||||
frame_idx: 0,
|
||||
force_kf: false,
|
||||
inited: false,
|
||||
rfi_supported: false,
|
||||
custom_vbv: false,
|
||||
last_rfi_range: None,
|
||||
init_device: ptr::null_mut(),
|
||||
})
|
||||
}
|
||||
@@ -128,6 +146,88 @@ impl NvencD3d11Encoder {
|
||||
self.encoder = ptr::null_mut();
|
||||
self.inited = false;
|
||||
self.next = 0;
|
||||
// The new session starts with an empty DPB (its first frame is an IDR), so any prior
|
||||
// invalidation range is meaningless against it.
|
||||
self.last_rfi_range = None;
|
||||
}
|
||||
|
||||
/// Query one `NV_ENC_CAPS` value for this codec on an open session; 0 on any error (treat an
|
||||
/// unqueryable cap as "unsupported", the conservative choice).
|
||||
unsafe fn get_cap(&self, enc: *mut c_void, which: nv::NV_ENC_CAPS) -> i32 {
|
||||
let mut param = nv::NV_ENC_CAPS_PARAM {
|
||||
version: nv::NV_ENC_CAPS_PARAM_VER,
|
||||
capsToQuery: which,
|
||||
reserved: [0; 62],
|
||||
};
|
||||
let mut val: i32 = 0;
|
||||
match (API.get_encode_caps)(enc, self.codec_guid, &mut param, &mut val)
|
||||
.result_without_string()
|
||||
{
|
||||
Ok(()) => val,
|
||||
Err(_) => 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Probe this GPU's real capabilities once (Apollo's `get_encoder_cap`) before the bitrate-probe
|
||||
/// loop configures the session: opens a throwaway session, queries the codec's max dimensions +
|
||||
/// 10-bit / custom-VBV / ref-pic-invalidation support, destroys it. Rejects an out-of-range mode
|
||||
/// up front with a clear error, downgrades 10-bit→8-bit when unsupported, and records the
|
||||
/// RFI/custom-VBV flags the config + [`invalidate_ref_frames`](Encoder::invalidate_ref_frames)
|
||||
/// gate on. Without this, an unsupported config surfaces only as an opaque `InvalidParam` that
|
||||
/// the bitrate-clamp search misreads as "bitrate too high" and binary-searches into the floor.
|
||||
unsafe fn query_caps(&mut self, device: &ID3D11Device) -> Result<()> {
|
||||
let mut params = nv::NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS {
|
||||
version: nv::NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS_VER,
|
||||
deviceType: nv::NV_ENC_DEVICE_TYPE::NV_ENC_DEVICE_TYPE_DIRECTX,
|
||||
device: device.as_raw(),
|
||||
apiVersion: nv::NVENCAPI_VERSION,
|
||||
..Default::default()
|
||||
};
|
||||
let mut enc: *mut c_void = ptr::null_mut();
|
||||
(API.open_encode_session_ex)(&mut params, &mut enc)
|
||||
.result_without_string()
|
||||
.map_err(|e| {
|
||||
anyhow!("NVENC open_encode_session_ex (caps probe): {e:?} (no NVIDIA GPU?)")
|
||||
})?;
|
||||
let wmax = self.get_cap(enc, nv::NV_ENC_CAPS::NV_ENC_CAPS_WIDTH_MAX);
|
||||
let hmax = self.get_cap(enc, nv::NV_ENC_CAPS::NV_ENC_CAPS_HEIGHT_MAX);
|
||||
let ten_bit = self.get_cap(enc, nv::NV_ENC_CAPS::NV_ENC_CAPS_SUPPORT_10BIT_ENCODE);
|
||||
let rfi = self.get_cap(
|
||||
enc,
|
||||
nv::NV_ENC_CAPS::NV_ENC_CAPS_SUPPORT_REF_PIC_INVALIDATION,
|
||||
);
|
||||
let custom_vbv = self.get_cap(
|
||||
enc,
|
||||
nv::NV_ENC_CAPS::NV_ENC_CAPS_SUPPORT_CUSTOM_VBV_BUF_SIZE,
|
||||
);
|
||||
let _ = (API.destroy_encoder)(enc);
|
||||
|
||||
// Reject an over-range mode with a clear message instead of an opaque InvalidParam.
|
||||
if wmax > 0 && hmax > 0 && (self.width as i32 > wmax || self.height as i32 > hmax) {
|
||||
bail!(
|
||||
"this GPU's NVENC max encode size for {:?} is {wmax}x{hmax}; client requested \
|
||||
{}x{} (lower the client resolution or use a codec/GPU that supports it)",
|
||||
self.codec,
|
||||
self.width,
|
||||
self.height
|
||||
);
|
||||
}
|
||||
// Degrade gracefully rather than fail: no 10-bit encode on this card → 8-bit SDR.
|
||||
if self.bit_depth >= 10 && ten_bit == 0 {
|
||||
tracing::warn!("NVENC: this GPU can't 10-bit encode — falling back to 8-bit SDR");
|
||||
self.bit_depth = 8;
|
||||
self.hdr = false;
|
||||
}
|
||||
self.rfi_supported = rfi != 0;
|
||||
self.custom_vbv = custom_vbv != 0;
|
||||
tracing::info!(
|
||||
rfi = self.rfi_supported,
|
||||
custom_vbv = self.custom_vbv,
|
||||
max = %format!("{wmax}x{hmax}"),
|
||||
ten_bit = ten_bit != 0,
|
||||
"NVENC capabilities probed"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Open + configure + initialize ONE NVENC session at `bitrate` (bps) and `split_mode`. Returns
|
||||
@@ -181,10 +281,13 @@ impl NvencD3d11Encoder {
|
||||
let bps = bitrate.min(u32::MAX as u64) as u32;
|
||||
cfg.rcParams.averageBitRate = bps;
|
||||
cfg.rcParams.maxBitRate = bps;
|
||||
// Shrink the VBV with the bitrate — NVENC validates it against the same level ceiling.
|
||||
let vbv = (bitrate as f64 / self.fps.max(1) as f64) as u32;
|
||||
cfg.rcParams.vbvBufferSize = vbv;
|
||||
cfg.rcParams.vbvInitialDelay = vbv;
|
||||
// Shrink the VBV with the bitrate — NVENC validates it against the same level ceiling. Only
|
||||
// when the GPU advertises custom-VBV support (else leave the preset default, per the caps probe).
|
||||
if self.custom_vbv {
|
||||
let vbv = (bitrate as f64 / self.fps.max(1) as f64) as u32;
|
||||
cfg.rcParams.vbvBufferSize = vbv;
|
||||
cfg.rcParams.vbvInitialDelay = vbv;
|
||||
}
|
||||
|
||||
// HIGH tier + autoselect level. The codec's PER-LEVEL bitrate ceiling is otherwise the
|
||||
// MAIN-tier cap — for HEVC at 5K that's Level 6.2 Main ≈ 240 Mbps. HIGH tier lifts the HEVC
|
||||
@@ -212,6 +315,27 @@ impl NvencD3d11Encoder {
|
||||
vui.colourMatrix = nv::NV_ENC_VUI_MATRIX_COEFFS::NV_ENC_VUI_MATRIX_COEFFS_BT2020_NCL;
|
||||
}
|
||||
|
||||
// Reference-frame invalidation: keep a deeper DPB so an invalidated reference can fall back
|
||||
// to an older still-valid frame instead of a full IDR, while `numRefL0 = 1` keeps each
|
||||
// P-frame single-reference for low latency. Only when this GPU supports RFI (else leave the
|
||||
// preset default — `invalidate_ref_frames` then returns false and the caller forces an IDR).
|
||||
if self.rfi_supported {
|
||||
let one = nv::NV_ENC_NUM_REF_FRAMES::NV_ENC_NUM_REF_FRAMES_1;
|
||||
match self.codec {
|
||||
Codec::H264 => {
|
||||
cfg.encodeCodecConfig.h264Config.maxNumRefFrames = RFI_DPB;
|
||||
cfg.encodeCodecConfig.h264Config.numRefL0 = one;
|
||||
}
|
||||
Codec::H265 => {
|
||||
cfg.encodeCodecConfig.hevcConfig.maxNumRefFramesInDPB = RFI_DPB;
|
||||
cfg.encodeCodecConfig.hevcConfig.numRefL0 = one;
|
||||
}
|
||||
Codec::Av1 => {
|
||||
cfg.encodeCodecConfig.av1Config.maxNumRefFramesInDPB = RFI_DPB;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut init = nv::NV_ENC_INITIALIZE_PARAMS {
|
||||
version: nv::NV_ENC_INITIALIZE_PARAMS_VER,
|
||||
encodeGUID: self.codec_guid,
|
||||
@@ -242,6 +366,10 @@ impl NvencD3d11Encoder {
|
||||
/// Lazily create the session on the first frame's D3D11 device (so capture + encode share it).
|
||||
fn init_session(&mut self, device: &ID3D11Device) -> Result<()> {
|
||||
unsafe {
|
||||
// Probe real GPU caps first (max dims / 10-bit / custom-VBV / RFI) so the config below is
|
||||
// gated on what this card supports and an out-of-range mode fails with a clear error
|
||||
// rather than being misread as a too-high bitrate by the clamp search.
|
||||
self.query_caps(device)?;
|
||||
// Bitrate clamp (see the search below): NVENC rejects `initialize_encoder` when the bitrate
|
||||
// exceeds the GPU's max codec level. We try the requested rate, then binary-search down to
|
||||
// the MAX the level accepts and clamp to it — so an over-asking client (e.g. 1 Gbps on HEVC)
|
||||
@@ -521,6 +649,47 @@ impl Encoder for NvencD3d11Encoder {
|
||||
self.force_kf = true;
|
||||
}
|
||||
|
||||
fn invalidate_ref_frames(&mut self, first: i64, last: i64) -> bool {
|
||||
// No live session, the GPU can't invalidate, or a nonsense range → caller forces a full IDR.
|
||||
// (NVENC handles are single-threaded; this runs on the encode thread, like submit/poll.)
|
||||
if self.encoder.is_null() || !self.rfi_supported || first < 0 || first > last {
|
||||
return false;
|
||||
}
|
||||
// Already invalidated a covering range for this loss event — nothing more to do, no IDR.
|
||||
if let Some((pf, pl)) = self.last_rfi_range {
|
||||
if first >= pf && last <= pl {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
// `frame_idx` is the NEXT timestamp to assign, so the last encoded frame is `frame_idx - 1`
|
||||
// and the DPB holds `[frame_idx - RFI_DPB, frame_idx - 1]`. A lost frame older than that
|
||||
// can't be invalidated, so the only correct recovery is an IDR.
|
||||
let oldest_in_dpb = self.frame_idx - RFI_DPB as i64;
|
||||
if first < oldest_in_dpb {
|
||||
return false;
|
||||
}
|
||||
// Clamp to frames we've actually encoded (don't invalidate a timestamp we never assigned).
|
||||
let last = last.min(self.frame_idx - 1);
|
||||
if first > last {
|
||||
return false;
|
||||
}
|
||||
// We tag each input with `inputTimeStamp = frame_idx` (0,1,2,…), which is also the client's
|
||||
// frame number (the packetizer numbers frames in submit order), so the client's lost-frame
|
||||
// range maps 1:1 onto the timestamps NVENC invalidates here.
|
||||
unsafe {
|
||||
for ts in first..=last {
|
||||
if (API.invalidate_ref_frames)(self.encoder, ts as u64)
|
||||
.result_without_string()
|
||||
.is_err()
|
||||
{
|
||||
return false; // any failure → fall back to IDR
|
||||
}
|
||||
}
|
||||
}
|
||||
self.last_rfi_range = Some((first, last));
|
||||
true
|
||||
}
|
||||
|
||||
fn poll(&mut self) -> Result<Option<EncodedFrame>> {
|
||||
let Some((bs, map, pts_ns)) = self.pending.pop_front() else {
|
||||
return Ok(None);
|
||||
|
||||
@@ -303,6 +303,9 @@ fn run(
|
||||
audio_cap: &std::sync::Mutex<Option<Box<dyn AudioCapturer>>>,
|
||||
) -> Result<()> {
|
||||
let sock = UdpSocket::bind(("0.0.0.0", AUDIO_PORT)).context("bind audio UDP")?;
|
||||
// Grow SO_SNDBUF/RCVBUF + opt-in DSCP/QoS-tag this as the audio class (PUNKTFUNK_DSCP=1).
|
||||
punktfunk_core::transport::grow_socket_buffers(&sock);
|
||||
punktfunk_core::transport::set_media_qos(&sock, punktfunk_core::transport::MediaClass::Audio);
|
||||
// The client pings the audio port (~every 500ms) so we learn where to send.
|
||||
sock.set_read_timeout(Some(Duration::from_secs(10)))?;
|
||||
tracing::info!(port = AUDIO_PORT, "audio: awaiting client ping");
|
||||
|
||||
@@ -24,10 +24,11 @@
|
||||
|
||||
use super::{AppState, CONTROL_PORT};
|
||||
use crate::inject::gamepad::GamepadManager;
|
||||
use crate::inject::InputInjector;
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use punktfunk_core::input::InputEvent;
|
||||
use rusty_enet::{Event, Host, HostSettings, Packet, PeerID};
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::mpsc::Sender;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -53,12 +54,14 @@ pub fn spawn(state: Arc<AppState>) -> Result<()> {
|
||||
std::thread::Builder::new()
|
||||
.name("punktfunk-control".into())
|
||||
.spawn(move || {
|
||||
// Thread-local (the injector owns non-Send Wayland/xkb state, so it must be
|
||||
// created and live here rather than be captured into the closure).
|
||||
// GCM scheme detected from the first authenticating packet; reused thereafter.
|
||||
let mut detected: Option<Scheme> = None;
|
||||
// Lazily opened on the first input event (Sway's Wayland socket is up by then).
|
||||
let mut injector: Option<Box<dyn InputInjector>> = None;
|
||||
// Decoded keyboard/mouse is forwarded to a dedicated host-lifetime injector thread —
|
||||
// NEVER injected inline, so a slow Wayland/libei/SendInput call can't head-block ENet
|
||||
// keepalive/retransmit servicing on this thread. The injector owns non-Send compositor
|
||||
// state and lives on its own thread (see crate::inject::InjectorService); the held
|
||||
// `inj_tx` clone keeps it alive for the control thread's lifetime.
|
||||
let inj_tx = crate::inject::InjectorService::start().sender();
|
||||
// Virtual gamepads (uinput) + the host→client rumble sequence counter.
|
||||
let mut pads = GamepadManager::new();
|
||||
let mut rumble_seq: u32 = 0;
|
||||
@@ -86,7 +89,7 @@ pub fn spawn(state: Arc<AppState>) -> Result<()> {
|
||||
channel_id,
|
||||
packet.data(),
|
||||
&mut detected,
|
||||
&mut injector,
|
||||
&inj_tx,
|
||||
&mut pads,
|
||||
);
|
||||
}
|
||||
@@ -128,6 +131,19 @@ pub fn spawn(state: Arc<AppState>) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Decode the lost-frame range from an invalidate-reference-frames (0x0301) control message: two
|
||||
/// little-endian `i64` (firstFrame, lastFrame) after the 4-byte `[u16 type][u16 length]` header,
|
||||
/// matching Sunshine/Apollo's `IDX_INVALIDATE_REF_FRAMES`. Returns `None` when the body is too
|
||||
/// short or the range is nonsensical, in which case the caller falls back to a full IDR.
|
||||
fn decode_rfi_range(pt: &[u8]) -> Option<(i64, i64)> {
|
||||
if pt.len() < 20 {
|
||||
return None;
|
||||
}
|
||||
let first = i64::from_le_bytes(pt[4..12].try_into().ok()?);
|
||||
let last = i64::from_le_bytes(pt[12..20].try_into().ok()?);
|
||||
(first >= 0 && last >= first).then_some((first, last))
|
||||
}
|
||||
|
||||
/// Handle one received control packet: decrypt it (learning the GCM scheme on the first one),
|
||||
/// decode any input event, and inject it into the host session.
|
||||
fn on_receive(
|
||||
@@ -135,7 +151,7 @@ fn on_receive(
|
||||
_channel_id: u8,
|
||||
d: &[u8],
|
||||
detected: &mut Option<Scheme>,
|
||||
injector: &mut Option<Box<dyn InputInjector>>,
|
||||
inj_tx: &Sender<InputEvent>,
|
||||
pads: &mut GamepadManager,
|
||||
) {
|
||||
let Some(key) = state.launch.lock().unwrap().map(|s| s.gcm_key) else {
|
||||
@@ -160,17 +176,32 @@ fn on_receive(
|
||||
}
|
||||
};
|
||||
|
||||
// Recovery requests after loss: invalidate-reference-frames (0x0301, Gen7) or request-IDR
|
||||
// (0x0302, Gen7Enc). Force a keyframe so the client can resync without a multi-second stall.
|
||||
// Recovery requests after loss. Invalidate-reference-frames (0x0301, Gen7) carries the lost
|
||||
// frame range (two LE i64 after the [type][len] header, like Sunshine/Apollo's
|
||||
// IDX_INVALIDATE_REF_FRAMES) — route it to the encoder, which invalidates those refs instead of
|
||||
// a full IDR when it can (NVENC RFI). Request-IDR (0x0302 / 0x0305) and a malformed 0x0301 force
|
||||
// a keyframe. The video thread drains rfi_range/force_idr and resyncs without a multi-second stall.
|
||||
if pt.len() >= 2 {
|
||||
let inner = u16::from_le_bytes([pt[0], pt[1]]);
|
||||
if matches!(inner, 0x0301 | 0x0302 | 0x0305) {
|
||||
if inner == 0x0301 {
|
||||
if let Some((first, last)) = decode_rfi_range(&pt) {
|
||||
*state.rfi_range.lock().unwrap() = Some((first, last));
|
||||
tracing::info!(first, last, "control: RFI request → invalidate ref frames");
|
||||
} else {
|
||||
state
|
||||
.force_idr
|
||||
.store(true, std::sync::atomic::Ordering::SeqCst);
|
||||
tracing::info!("control: RFI request (no range) → keyframe");
|
||||
}
|
||||
return;
|
||||
}
|
||||
if matches!(inner, 0x0302 | 0x0305) {
|
||||
state
|
||||
.force_idr
|
||||
.store(true, std::sync::atomic::Ordering::SeqCst);
|
||||
tracing::info!(
|
||||
ty = format!("{inner:#06x}"),
|
||||
"control: IDR/RFI request → keyframe"
|
||||
"control: IDR request → keyframe"
|
||||
);
|
||||
return;
|
||||
}
|
||||
@@ -187,27 +218,11 @@ fn on_receive(
|
||||
return; // keepalive / QoS / unhandled input kind
|
||||
}
|
||||
|
||||
// Open the injector on demand — by the first input event the compositor session is up.
|
||||
// Backend auto-selects per desktop (wlr on Sway, libei on KWin/GNOME); override with
|
||||
// PUNKTFUNK_INPUT_BACKEND.
|
||||
if injector.is_none() {
|
||||
let backend = crate::inject::default_backend();
|
||||
match crate::inject::open(backend) {
|
||||
Ok(i) => {
|
||||
tracing::info!(?backend, "input injection backend opened");
|
||||
*injector = Some(i);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(error = %format!("{e:#}"), "input injection unavailable");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
let inj = injector.as_mut().unwrap();
|
||||
// Forward to the dedicated injector thread (it opens the backend on the first event and
|
||||
// coalesces redundant motion). A closed channel means the injector thread died at startup —
|
||||
// input is lossy, so drop silently rather than spam.
|
||||
for ev in events {
|
||||
if let Err(e) = inj.inject(&ev) {
|
||||
tracing::warn!(error = %format!("{e:#}"), "inject failed");
|
||||
}
|
||||
let _ = inj_tx.send(ev);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -426,3 +441,29 @@ fn gcm_open(key: &[u8; 16], nonce: &[u8], ct_tag: &[u8], aad: &[u8]) -> Option<V
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::decode_rfi_range;
|
||||
|
||||
/// Build a 0x0301 invalidate-ref-frames plaintext: `[type LE][len LE][firstFrame i64 LE][last i64 LE]`.
|
||||
fn rfi_msg(first: i64, last: i64) -> Vec<u8> {
|
||||
let mut v = vec![0x01, 0x03, 0x10, 0x00]; // type 0x0301, length 16
|
||||
v.extend_from_slice(&first.to_le_bytes());
|
||||
v.extend_from_slice(&last.to_le_bytes());
|
||||
v
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decodes_a_valid_rfi_range() {
|
||||
assert_eq!(decode_rfi_range(&rfi_msg(40, 47)), Some((40, 47)));
|
||||
assert_eq!(decode_rfi_range(&rfi_msg(5, 5)), Some((5, 5))); // single frame
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_short_or_nonsensical_ranges() {
|
||||
assert_eq!(decode_rfi_range(&[0x01, 0x03, 0x00, 0x00]), None); // header only, no body
|
||||
assert_eq!(decode_rfi_range(&rfi_msg(-1, 9)), None); // negative first
|
||||
assert_eq!(decode_rfi_range(&rfi_msg(9, 4)), None); // last < first
|
||||
}
|
||||
}
|
||||
|
||||
@@ -113,6 +113,10 @@ pub struct AppState {
|
||||
/// Set by the control stream when the client requests an IDR / invalidates reference
|
||||
/// frames (recovery after loss); the video thread forces a keyframe and clears it.
|
||||
pub force_idr: std::sync::Arc<std::sync::atomic::AtomicBool>,
|
||||
/// A client reference-frame-invalidation request carrying the lost frame range (0x0301). The
|
||||
/// video thread drains it and calls `Encoder::invalidate_ref_frames`, falling back to a full
|
||||
/// IDR when the encoder can't invalidate (range too old / no NVENC RFI). `None` = nothing pending.
|
||||
pub rfi_range: std::sync::Arc<std::sync::Mutex<Option<(i64, i64)>>>,
|
||||
/// Persistent screen capturer, reused across streams so reconnects don't spawn a second
|
||||
/// (conflicting) screencast session. The video thread borrows it for the stream's duration
|
||||
/// and returns it; `set_active` gates its cost while idle.
|
||||
@@ -138,6 +142,7 @@ impl AppState {
|
||||
streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||
audio_streaming: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||
force_idr: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||
rfi_range: std::sync::Arc::new(std::sync::Mutex::new(None)),
|
||||
video_cap: std::sync::Arc::new(std::sync::Mutex::new(None)),
|
||||
audio_cap: std::sync::Arc::new(std::sync::Mutex::new(None)),
|
||||
}
|
||||
@@ -293,18 +298,30 @@ fn load_paired() -> Vec<Vec<u8>> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Persist the paired-client allow-list (called after each successful pairing).
|
||||
/// Persist the paired-client allow-list (called after each successful pairing). Written
|
||||
/// atomically (temp file + rename) so a crash mid-write can't truncate `paired.json` — a partial
|
||||
/// write would otherwise lock out every paired client until they re-pair.
|
||||
pub(crate) fn save_paired(paired: &[Vec<u8>]) {
|
||||
let Some(path) = paired_path() else { return };
|
||||
if let Some(dir) = path.parent() {
|
||||
let _ = std::fs::create_dir_all(dir);
|
||||
}
|
||||
match serde_json::to_vec(paired) {
|
||||
Ok(bytes) => {
|
||||
if let Err(e) = std::fs::write(&path, bytes) {
|
||||
tracing::warn!(error = %e, "persisting pairings failed");
|
||||
}
|
||||
let bytes = match serde_json::to_vec(paired) {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "serializing pairings failed");
|
||||
return;
|
||||
}
|
||||
Err(e) => tracing::warn!(error = %e, "serializing pairings failed"),
|
||||
};
|
||||
// Write to a sibling temp file, then rename over the target (atomic replace on Unix and
|
||||
// Windows). Never write `path` in place.
|
||||
let tmp = path.with_extension("json.tmp");
|
||||
if let Err(e) = std::fs::write(&tmp, &bytes) {
|
||||
tracing::warn!(error = %e, "persisting pairings failed (temp write)");
|
||||
return;
|
||||
}
|
||||
if let Err(e) = std::fs::rename(&tmp, &path) {
|
||||
tracing::warn!(error = %e, "persisting pairings failed (rename)");
|
||||
let _ = std::fs::remove_file(&tmp);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
//! `/pin` endpoint to deliver the Moonlight-displayed PIN. Over HTTPS the client is
|
||||
//! mutual-TLS-authenticated, so `/serverinfo` reports `PairStatus=1` there.
|
||||
|
||||
use super::tls::PeerCertFingerprint;
|
||||
use super::{serverinfo, AppState, LaunchSession, HTTPS_PORT, HTTP_PORT, RTSP_PORT};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use axum::{
|
||||
@@ -23,24 +24,36 @@ struct Https(bool);
|
||||
pub async fn run(state: Arc<AppState>) -> Result<()> {
|
||||
// Mutual-TLS: request + verify the client cert (Moonlight presents one for the
|
||||
// post-pairing pairchallenge + all post-pair endpoints).
|
||||
let tls = axum_server::tls_rustls::RustlsConfig::from_config(super::tls::server_config(
|
||||
&state.identity.cert_pem,
|
||||
&state.identity.key_pem,
|
||||
)?);
|
||||
let tls = super::tls::server_config(&state.identity.cert_pem, &state.identity.key_pem)?;
|
||||
|
||||
let http_addr = SocketAddr::from(([0, 0, 0, 0], HTTP_PORT));
|
||||
let https_addr = SocketAddr::from(([0, 0, 0, 0], HTTPS_PORT));
|
||||
tracing::info!(%http_addr, %https_addr, "nvhttp listening (serverinfo + pair + launch)");
|
||||
|
||||
let http = axum_server::bind(http_addr).serve(router(state.clone(), false).into_make_service());
|
||||
let https =
|
||||
axum_server::bind_rustls(https_addr, tls).serve(router(state, true).into_make_service());
|
||||
tokio::try_join!(async { http.await.context("nvhttp HTTP server") }, async {
|
||||
https.await.context("nvhttp HTTPS server")
|
||||
},)?;
|
||||
// HTTPS runs the handshake itself (super::tls::serve_https) so handlers see the verified peer
|
||||
// cert as a PeerCertFingerprint extension; the post-pair endpoints gate on the paired allow-list.
|
||||
tokio::try_join!(
|
||||
async { http.await.context("nvhttp HTTP server") },
|
||||
super::tls::serve_https(https_addr, router(state, true), tls),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// True iff the request arrived over HTTPS with a client cert whose SHA-256 fingerprint is pinned
|
||||
/// in the paired allow-list. Plain-HTTP requests carry no client cert and are never paired. This is
|
||||
/// the post-handshake authorization check (Apollo's `get_verified_cert`) gating the launch surface.
|
||||
fn peer_is_paired(peer: &Option<Extension<PeerCertFingerprint>>, st: &AppState) -> bool {
|
||||
let Some(Extension(PeerCertFingerprint(Some(fp)))) = peer else {
|
||||
return false;
|
||||
};
|
||||
st.paired
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|der| hex::encode(punktfunk_core::quic::endpoint::cert_fingerprint(der)) == *fp)
|
||||
}
|
||||
|
||||
fn router(state: Arc<AppState>, https: bool) -> Router {
|
||||
Router::new()
|
||||
.route("/serverinfo", get(h_serverinfo))
|
||||
@@ -61,9 +74,12 @@ fn xml(body: String) -> impl IntoResponse {
|
||||
async fn h_serverinfo(
|
||||
State(st): State<Arc<AppState>>,
|
||||
Extension(Https(https)): Extension<Https>,
|
||||
peer: Option<Extension<PeerCertFingerprint>>,
|
||||
) -> impl IntoResponse {
|
||||
// Over the mutual-TLS port the peer is an authenticated (paired) client → PairStatus=1.
|
||||
xml(serverinfo::serverinfo_xml(&st.host, https))
|
||||
// PairStatus=1 only when the HTTPS peer presented a *pinned* client cert; an unpaired client
|
||||
// (or plain HTTP) sees 0 and is steered into the pairing flow.
|
||||
let paired = https && peer_is_paired(&peer, &st);
|
||||
xml(serverinfo::serverinfo_xml(&st.host, https, paired))
|
||||
}
|
||||
|
||||
async fn h_pin(
|
||||
@@ -79,15 +95,27 @@ async fn h_pin(
|
||||
}
|
||||
}
|
||||
|
||||
async fn h_applist(State(_st): State<Arc<AppState>>) -> impl IntoResponse {
|
||||
async fn h_applist(
|
||||
State(st): State<Arc<AppState>>,
|
||||
peer: Option<Extension<PeerCertFingerprint>>,
|
||||
) -> impl IntoResponse {
|
||||
if !peer_is_paired(&peer, &st) {
|
||||
tracing::warn!("applist rejected — client is not paired");
|
||||
return xml(error_xml());
|
||||
}
|
||||
// One app for now: the headless desktop (the wlroots virtual output).
|
||||
xml(super::apps::applist_xml())
|
||||
}
|
||||
|
||||
async fn h_launch(
|
||||
State(st): State<Arc<AppState>>,
|
||||
peer: Option<Extension<PeerCertFingerprint>>,
|
||||
Query(q): Query<HashMap<String, String>>,
|
||||
) -> impl IntoResponse {
|
||||
if !peer_is_paired(&peer, &st) {
|
||||
tracing::warn!("launch rejected — client is not paired");
|
||||
return xml(error_xml());
|
||||
}
|
||||
match launch(&st, &q) {
|
||||
Ok(session) => {
|
||||
*st.launch.lock().unwrap() = Some(session);
|
||||
@@ -108,7 +136,14 @@ async fn h_launch(
|
||||
}
|
||||
}
|
||||
|
||||
async fn h_resume(State(st): State<Arc<AppState>>) -> impl IntoResponse {
|
||||
async fn h_resume(
|
||||
State(st): State<Arc<AppState>>,
|
||||
peer: Option<Extension<PeerCertFingerprint>>,
|
||||
) -> impl IntoResponse {
|
||||
if !peer_is_paired(&peer, &st) {
|
||||
tracing::warn!("resume rejected — client is not paired");
|
||||
return xml(error_xml());
|
||||
}
|
||||
if st.launch.lock().unwrap().is_some() {
|
||||
xml(session_url_xml(&st, "resume"))
|
||||
} else {
|
||||
@@ -116,7 +151,14 @@ async fn h_resume(State(st): State<Arc<AppState>>) -> impl IntoResponse {
|
||||
}
|
||||
}
|
||||
|
||||
async fn h_cancel(State(st): State<Arc<AppState>>) -> impl IntoResponse {
|
||||
async fn h_cancel(
|
||||
State(st): State<Arc<AppState>>,
|
||||
peer: Option<Extension<PeerCertFingerprint>>,
|
||||
) -> impl IntoResponse {
|
||||
if !peer_is_paired(&peer, &st) {
|
||||
tracing::warn!("cancel rejected — client is not paired");
|
||||
return xml(error_xml());
|
||||
}
|
||||
*st.launch.lock().unwrap() = None;
|
||||
// Quit semantics: stop the running media threads (they observe these flags) so the session
|
||||
// actually ends — the virtual output/gamescope teardown follows via the capturer's RAII.
|
||||
@@ -234,3 +276,56 @@ fn pair_error_xml() -> String {
|
||||
fn error_xml() -> String {
|
||||
"<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<root status_code=\"400\"></root>\n".to_string()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
|
||||
fn test_state() -> Arc<AppState> {
|
||||
let host = super::super::Host {
|
||||
hostname: "t".into(),
|
||||
uniqueid: "id".into(),
|
||||
local_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
|
||||
http_port: HTTP_PORT,
|
||||
https_port: HTTPS_PORT,
|
||||
};
|
||||
let identity = super::super::cert::ServerIdentity::ephemeral().expect("ephemeral identity");
|
||||
Arc::new(AppState::new(host, identity))
|
||||
}
|
||||
|
||||
fn fp_of(der: &[u8]) -> String {
|
||||
hex::encode(punktfunk_core::quic::endpoint::cert_fingerprint(der))
|
||||
}
|
||||
|
||||
/// The launch surface (launch/resume/applist/cancel) must reject any client whose cert
|
||||
/// fingerprint is not in the paired allow-list — including a certless (plain-HTTP) peer.
|
||||
#[test]
|
||||
fn launch_gate_requires_a_pinned_client_cert() {
|
||||
let st = test_state();
|
||||
let der = b"a-client-cert-der".to_vec();
|
||||
let peer = Some(Extension(PeerCertFingerprint(Some(fp_of(&der)))));
|
||||
|
||||
// Empty allow-list: a presented cert, an absent extension, and an explicit None all fail.
|
||||
assert!(!peer_is_paired(&peer, &st), "unknown cert must be rejected");
|
||||
assert!(
|
||||
!peer_is_paired(&None, &st),
|
||||
"no client cert must be rejected"
|
||||
);
|
||||
assert!(
|
||||
!peer_is_paired(&Some(Extension(PeerCertFingerprint(None))), &st),
|
||||
"certless HTTPS peer must be rejected"
|
||||
);
|
||||
|
||||
// After pinning, the same fingerprint is accepted but a different cert still isn't.
|
||||
st.paired.lock().unwrap().push(der);
|
||||
assert!(peer_is_paired(&peer, &st), "pinned cert must be accepted");
|
||||
let other = Some(Extension(PeerCertFingerprint(Some(fp_of(
|
||||
b"different-der",
|
||||
)))));
|
||||
assert!(
|
||||
!peer_is_paired(&other, &st),
|
||||
"a non-pinned cert stays rejected"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,6 +182,7 @@ fn handle_request(req: &Request, state: &AppState) -> String {
|
||||
app,
|
||||
state.streaming.clone(),
|
||||
state.force_idr.clone(),
|
||||
state.rfi_range.clone(),
|
||||
state.video_cap.clone(),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -3,18 +3,19 @@
|
||||
use super::{Host, APP_VERSION, GFE_VERSION, SERVER_CODEC_MODE_SUPPORT};
|
||||
|
||||
/// Build the `<root status_code="200">…</root>` serverinfo document. `https` selects the
|
||||
/// paired-HTTPS variant (real MAC). Element names are case-sensitive and match what
|
||||
/// moonlight-common-c parses.
|
||||
pub fn serverinfo_xml(host: &Host, https: bool) -> String {
|
||||
// MAC is hidden over plain HTTP; PairStatus reflects the pairing store once the HTTPS
|
||||
// path carries per-client identity (a hardening follow-up — 0 for now).
|
||||
/// paired-HTTPS variant (real MAC); `paired` is whether the HTTPS peer presented a client cert
|
||||
/// that is in the paired allow-list (drives `PairStatus`). Element names are case-sensitive and
|
||||
/// match what moonlight-common-c parses.
|
||||
pub fn serverinfo_xml(host: &Host, https: bool, paired: bool) -> String {
|
||||
// MAC is hidden over plain HTTP (no per-client identity there).
|
||||
let mac = if https {
|
||||
"01:02:03:04:05:06"
|
||||
} else {
|
||||
"00:00:00:00:00:00"
|
||||
};
|
||||
// Over the mutual-TLS HTTPS port the peer is an authenticated (paired) client.
|
||||
let pair_status = u8::from(https);
|
||||
// PairStatus reflects the real allow-list: 1 only when the HTTPS peer's client-cert
|
||||
// fingerprint is pinned (the nvhttp handler computes `paired`); 0 otherwise (incl. plain HTTP).
|
||||
let pair_status = u8::from(paired);
|
||||
let codec_mode_support = codec_mode_support();
|
||||
format!(
|
||||
r#"<?xml version="1.0" encoding="utf-8"?>
|
||||
@@ -104,7 +105,7 @@ mod tests {
|
||||
http_port: 47989,
|
||||
https_port: 47984,
|
||||
};
|
||||
let xml = serverinfo_xml(&host, false);
|
||||
let xml = serverinfo_xml(&host, false, false);
|
||||
// The mask is the GPU-aware value (NVENC/no-GPU → the static 65793; a VAAPI host →
|
||||
// whatever it probes). Assert the XML embeds exactly what `codec_mode_support()` returns,
|
||||
// so the test is deterministic regardless of the build host's GPU.
|
||||
|
||||
@@ -31,6 +31,10 @@ pub struct StreamConfig {
|
||||
/// streams so a reconnect doesn't open a second (conflicting) screencast session.
|
||||
pub type CapturerSlot = Arc<std::sync::Mutex<Option<Box<dyn Capturer>>>>;
|
||||
|
||||
/// A pending client reference-frame-invalidation range (lost `firstFrame..=lastFrame`), set by the
|
||||
/// control plane and drained by the video thread (see [`AppState::rfi_range`](super::AppState)).
|
||||
pub type RfiSlot = Arc<std::sync::Mutex<Option<(i64, i64)>>>;
|
||||
|
||||
/// Spawn the video stream thread (idempotent via `running`). Stops when `running` clears.
|
||||
/// `force_idr` is set by the control stream on a client recovery request; `video_cap` holds
|
||||
/// the persistent capturer the thread borrows for the stream's duration.
|
||||
@@ -39,13 +43,21 @@ pub fn start(
|
||||
app: Option<super::apps::AppEntry>,
|
||||
running: Arc<AtomicBool>,
|
||||
force_idr: Arc<AtomicBool>,
|
||||
rfi_range: RfiSlot,
|
||||
video_cap: CapturerSlot,
|
||||
) {
|
||||
let _ = std::thread::Builder::new()
|
||||
.name("punktfunk-video".into())
|
||||
.spawn(move || {
|
||||
tracing::info!(?cfg, "video stream starting");
|
||||
if let Err(e) = run(cfg, app.as_ref(), &running, &force_idr, &video_cap) {
|
||||
if let Err(e) = run(
|
||||
cfg,
|
||||
app.as_ref(),
|
||||
&running,
|
||||
&force_idr,
|
||||
&rfi_range,
|
||||
&video_cap,
|
||||
) {
|
||||
tracing::error!(error = %format!("{e:#}"), "video stream failed");
|
||||
}
|
||||
running.store(false, Ordering::SeqCst);
|
||||
@@ -58,6 +70,7 @@ fn run(
|
||||
app: Option<&super::apps::AppEntry>,
|
||||
running: &Arc<AtomicBool>,
|
||||
force_idr: &AtomicBool,
|
||||
rfi_range: &std::sync::Mutex<Option<(i64, i64)>>,
|
||||
video_cap: &std::sync::Mutex<Option<Box<dyn Capturer>>>,
|
||||
) -> Result<()> {
|
||||
// GameStream capture/encode thread: apply Windows session tuning (no-op off Windows).
|
||||
@@ -66,6 +79,10 @@ fn run(
|
||||
encode::validate_dimensions(cfg.codec, cfg.width, cfg.height)
|
||||
.context("client-requested video mode")?;
|
||||
let sock = UdpSocket::bind(("0.0.0.0", VIDEO_PORT)).context("bind video UDP")?;
|
||||
// Grow SO_SNDBUF/RCVBUF (avoid host-side ENOBUFS at high bitrate) like the native plane, and
|
||||
// opt-in DSCP/QoS-tag this as the video class (PUNKTFUNK_DSCP=1).
|
||||
punktfunk_core::transport::grow_socket_buffers(&sock);
|
||||
punktfunk_core::transport::set_media_qos(&sock, punktfunk_core::transport::MediaClass::Video);
|
||||
// The client pings the video port so we learn where to send; it re-pings until video
|
||||
// flows, so a missed early ping is fine.
|
||||
sock.set_read_timeout(Some(Duration::from_secs(10)))?;
|
||||
@@ -115,7 +132,7 @@ fn run(
|
||||
let mut capturer =
|
||||
capture::capture_virtual_output(vout).context("capture virtual output")?;
|
||||
capturer.set_active(true);
|
||||
return stream_body(&mut *capturer, &sock, cfg, running, force_idr);
|
||||
return stream_body(&mut *capturer, &sock, cfg, running, force_idr, rfi_range);
|
||||
}
|
||||
|
||||
// Reuse the persistent capturer (one screencast session → clean reconnect); create it on
|
||||
@@ -135,7 +152,7 @@ fn run(
|
||||
}
|
||||
};
|
||||
capturer.set_active(true);
|
||||
let result = stream_body(&mut *capturer, &sock, cfg, running, force_idr);
|
||||
let result = stream_body(&mut *capturer, &sock, cfg, running, force_idr, rfi_range);
|
||||
capturer.set_active(false);
|
||||
*video_cap.lock().unwrap() = Some(capturer);
|
||||
result
|
||||
@@ -275,6 +292,7 @@ fn stream_body(
|
||||
cfg: StreamConfig,
|
||||
running: &Arc<AtomicBool>,
|
||||
force_idr: &AtomicBool,
|
||||
rfi_range: &std::sync::Mutex<Option<(i64, i64)>>,
|
||||
) -> Result<()> {
|
||||
// The first frame establishes the authoritative size/format for the encoder.
|
||||
let mut frame = capturer.next_frame().context("capture first frame")?;
|
||||
@@ -349,8 +367,16 @@ fn stream_body(
|
||||
uniq += 1;
|
||||
}
|
||||
let t_cap = tick.elapsed();
|
||||
// Honor a client recovery request (RFI / request-IDR): force a keyframe so the client
|
||||
// resyncs immediately instead of waiting for the next GOP boundary.
|
||||
// Honor a client recovery request. Prefer reference-frame invalidation (the encoder
|
||||
// re-references an older still-valid frame — no costly IDR spike); if the encoder can't
|
||||
// invalidate (range too old, or no NVENC RFI) it returns false and we force a keyframe.
|
||||
if let Some((first, last)) = rfi_range.lock().unwrap().take() {
|
||||
if !enc.invalidate_ref_frames(first, last) {
|
||||
enc.request_keyframe();
|
||||
}
|
||||
}
|
||||
// An explicit IDR request (or a rangeless RFI) forces a keyframe so the client resyncs
|
||||
// immediately instead of waiting for the next GOP boundary.
|
||||
if force_idr.swap(false, Ordering::SeqCst) {
|
||||
enc.request_keyframe();
|
||||
}
|
||||
|
||||
@@ -1,17 +1,88 @@
|
||||
//! TLS for the HTTPS nvhttp port (47984). Moonlight does **mutual TLS** — it presents its
|
||||
//! client cert and expects the server to request one — so a plain server-auth config makes
|
||||
//! the post-pairing `pairchallenge` fail. This config requests the client cert and verifies
|
||||
//! the client owns its key, but (for now) accepts any well-formed cert; enforcing the
|
||||
//! paired allow-list (rejecting unpaired clients on /launch) is a follow-up hardening step.
|
||||
//! TLS for the HTTPS nvhttp port (47984) and the management API. Moonlight does **mutual TLS** —
|
||||
//! it presents its client cert and expects the server to request one — so a plain server-auth
|
||||
//! config makes the post-pairing `pairchallenge` fail. This config requests the client cert and
|
||||
//! verifies the client owns its key, but accepts any well-formed cert at the *handshake* (the
|
||||
//! pairing ceremony is the real proof of identity). Authorization against the paired allow-list is
|
||||
//! then enforced per-request: [`serve_https`] reads the verified peer cert and attaches its
|
||||
//! fingerprint ([`PeerCertFingerprint`]) to each request, and the nvhttp/mgmt handlers reject
|
||||
//! callers whose fingerprint is not pinned (mirroring Apollo's post-handshake `get_verified_cert`).
|
||||
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use axum::Router;
|
||||
use rustls::client::danger::HandshakeSignatureValid;
|
||||
use rustls::crypto::{verify_tls12_signature, verify_tls13_signature, CryptoProvider};
|
||||
use rustls::pki_types::{CertificateDer, UnixTime};
|
||||
use rustls::server::danger::{ClientCertVerified, ClientCertVerifier};
|
||||
use rustls::{DigitallySignedStruct, DistinguishedName, ServerConfig, SignatureScheme};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// SHA-256 of the peer's client certificate (hex), injected per-connection into each request's
|
||||
/// extensions by [`serve_https`]; `None` when the peer presented no client cert (plain HTTP, or a
|
||||
/// browser falling back to a bearer token). Handlers authorize a request whose fingerprint is in
|
||||
/// the paired store.
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct PeerCertFingerprint(pub Option<String>);
|
||||
|
||||
/// HTTPS server that surfaces the verified client cert to handlers. `axum_server` can't expose the
|
||||
/// peer cert, so this runs the rustls handshake itself (tokio-rustls), reads the peer certificate,
|
||||
/// and serves the axum `Router` over hyper with the peer's fingerprint attached to every request as
|
||||
/// a [`PeerCertFingerprint`] extension. Shared by the nvhttp HTTPS listener and the management API.
|
||||
pub(crate) async fn serve_https(
|
||||
bind: SocketAddr,
|
||||
app: Router,
|
||||
tls: Arc<ServerConfig>,
|
||||
) -> Result<()> {
|
||||
use tower::ServiceExt;
|
||||
let acceptor = tokio_rustls::TlsAcceptor::from(tls);
|
||||
let listener = tokio::net::TcpListener::bind(bind)
|
||||
.await
|
||||
.with_context(|| format!("bind HTTPS {bind}"))?;
|
||||
loop {
|
||||
let (tcp, _peer) = match listener.accept().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "HTTPS accept failed");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let acceptor = acceptor.clone();
|
||||
let app = app.clone();
|
||||
tokio::spawn(async move {
|
||||
let tls_stream = match acceptor.accept(tcp).await {
|
||||
Ok(s) => s,
|
||||
// A failed handshake is routine (port scan, a browser bailing on the self-signed
|
||||
// cert, a peer that hung up) — not fatal.
|
||||
Err(_) => return,
|
||||
};
|
||||
// The verified peer cert (the verifier accepts any well-formed one; handlers authorize
|
||||
// by fingerprint) → its SHA-256, matched against the paired store.
|
||||
let fp = tls_stream
|
||||
.get_ref()
|
||||
.1
|
||||
.peer_certificates()
|
||||
.and_then(|c| c.first())
|
||||
.map(|c| hex::encode(punktfunk_core::quic::endpoint::cert_fingerprint(c.as_ref())));
|
||||
let peer = PeerCertFingerprint(fp);
|
||||
let svc =
|
||||
hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
|
||||
let app = app.clone();
|
||||
let peer = peer.clone();
|
||||
async move {
|
||||
let mut req = req.map(axum::body::Body::new);
|
||||
req.extensions_mut().insert(peer);
|
||||
app.oneshot(req).await // Router error is Infallible
|
||||
}
|
||||
});
|
||||
let io = hyper_util::rt::TokioIo::new(tls_stream);
|
||||
let _ =
|
||||
hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new())
|
||||
.serve_connection_with_upgrades(io, svc)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests + signature-checks the client cert but accepts any (the pairing handshake is
|
||||
/// the real proof). Pinning to the paired set is a hardening follow-up.
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
//! keysyms correctly.
|
||||
|
||||
use anyhow::Result;
|
||||
use punktfunk_core::input::InputEvent;
|
||||
use punktfunk_core::input::{InputEvent, InputKind};
|
||||
|
||||
/// Injects input events into the host session. Not `Send`: an injector owns compositor
|
||||
/// resources (a Wayland connection, an xkb state) and lives entirely on the control thread
|
||||
@@ -127,6 +127,133 @@ pub fn default_backend() -> Backend {
|
||||
}
|
||||
}
|
||||
|
||||
/// Host-lifetime pointer/keyboard injector running on its OWN thread, fed over a clonable `Send`
|
||||
/// channel. The injector backend owns non-`Send` compositor state (a Wayland connection / xkb / EIS
|
||||
/// socket), so it must live on a single thread; both the GameStream control plane and the native
|
||||
/// punktfunk/1 plane forward their decoded keyboard/mouse events here instead of injecting inline, so
|
||||
/// a slow inject (a portal stall, a desktop switch) never head-blocks the network thread's
|
||||
/// keepalive/retransmit servicing.
|
||||
pub(crate) struct InjectorService {
|
||||
tx: std::sync::mpsc::Sender<InputEvent>,
|
||||
}
|
||||
|
||||
impl InjectorService {
|
||||
pub(crate) fn start() -> InjectorService {
|
||||
let (tx, rx) = std::sync::mpsc::channel::<InputEvent>();
|
||||
if let Err(e) = std::thread::Builder::new()
|
||||
.name("punktfunk-injector".into())
|
||||
.spawn(move || injector_service_thread(rx))
|
||||
{
|
||||
tracing::error!(error = %e, "injector service thread spawn failed — pointer/keyboard input disabled");
|
||||
}
|
||||
InjectorService { tx }
|
||||
}
|
||||
|
||||
/// A sender a session/plane forwards its pointer/keyboard events to. Cloned per caller; dropping a
|
||||
/// clone does NOT stop the service (it runs while any sender — incl. the service's own — lives).
|
||||
pub(crate) fn sender(&self) -> std::sync::mpsc::Sender<InputEvent> {
|
||||
self.tx.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Backoff between reopen attempts after the injector backend fails to open or its worker dies, so a
|
||||
/// persistently-unavailable portal isn't hammered once per event.
|
||||
const INJECTOR_REOPEN_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
|
||||
|
||||
/// The host-lifetime injector worker: lazily open the pointer/keyboard backend, then inject every
|
||||
/// forwarded event. Reopen (after [`INJECTOR_REOPEN_BACKOFF`]) on open failure, on a backend change
|
||||
/// (input follows the active session), or if the backend's worker dies mid-stream. Exits only when
|
||||
/// every sender has dropped (host shutdown), which drops the injector and closes its portal session.
|
||||
///
|
||||
/// Each wake drains the whole backlog and [`coalesce`]s redundant motion before injecting, so a slow
|
||||
/// backend never builds up a queue of stale relative-mouse/scroll events (latency) — while button,
|
||||
/// key, and absolute-move ordering is preserved exactly.
|
||||
fn injector_service_thread(rx: std::sync::mpsc::Receiver<InputEvent>) {
|
||||
let mut injector: Option<Box<dyn InputInjector>> = None;
|
||||
let mut open_backend: Option<Backend> = None;
|
||||
let mut last_failed: Option<std::time::Instant> = None;
|
||||
while let Ok(first) = rx.recv() {
|
||||
// Drain everything already queued behind `first` so we coalesce a whole burst at once.
|
||||
let mut batch = vec![first];
|
||||
while let Ok(ev) = rx.try_recv() {
|
||||
batch.push(ev);
|
||||
}
|
||||
|
||||
// The resolved input backend (PUNKTFUNK_INPUT_BACKEND, set per connect / mid-stream session
|
||||
// switch) may have changed since we opened. Reopen against it so input FOLLOWS the active
|
||||
// session instead of injecting into a stale, still-warm backend (e.g. the managed gamescope's
|
||||
// EIS socket after the user switched to the KDE desktop).
|
||||
let want = default_backend();
|
||||
if injector.is_some() && open_backend != Some(want) {
|
||||
tracing::info!(
|
||||
?open_backend,
|
||||
?want,
|
||||
"input: backend changed — reopening injector for the active session"
|
||||
);
|
||||
injector = None;
|
||||
last_failed = None; // re-resolve immediately
|
||||
}
|
||||
if injector.is_none() {
|
||||
// Open on the first event; after a failure wait out the backoff before retrying (a few
|
||||
// events drop during setup — acceptable, input is lossy).
|
||||
let ready = last_failed.is_none_or(|t| t.elapsed() >= INJECTOR_REOPEN_BACKOFF);
|
||||
if ready {
|
||||
match open(want) {
|
||||
Ok(i) => {
|
||||
tracing::info!(backend = ?want, "input injector ready (host-lifetime)");
|
||||
injector = Some(i);
|
||||
open_backend = Some(want);
|
||||
last_failed = None;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(error = %format!("{e:#}"), "pointer/keyboard injection unavailable — will retry");
|
||||
last_failed = Some(std::time::Instant::now());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(inj) = injector.as_mut() {
|
||||
for ev in coalesce(batch) {
|
||||
if let Err(e) = inj.inject(&ev) {
|
||||
// The backend's worker (portal session / EIS socket) died — drop it and reopen on
|
||||
// a later event (covers a gamescope EIS socket that respawns with its session).
|
||||
tracing::warn!(error = %format!("{e:#}"), "inject failed — reopening injector");
|
||||
injector = None;
|
||||
open_backend = None;
|
||||
last_failed = Some(std::time::Instant::now());
|
||||
break; // abandon the rest of this batch; the next one reopens
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::debug!("injector service stopped (host shutting down)");
|
||||
}
|
||||
|
||||
/// Coalesce a drained burst: sum consecutive relative-mouse deltas and consecutive same-axis scroll
|
||||
/// deltas (identical net effect, far fewer injects), passing buttons, keys, absolute moves, and any
|
||||
/// type change through untouched and in order. Only *adjacent* same-type events merge, so a button
|
||||
/// or key between two moves flushes the accumulated motion first — ordering is never reshuffled.
|
||||
fn coalesce(events: Vec<InputEvent>) -> Vec<InputEvent> {
|
||||
let mut out: Vec<InputEvent> = Vec::with_capacity(events.len());
|
||||
for ev in events {
|
||||
match out.last_mut() {
|
||||
Some(last) if last.kind == InputKind::MouseMove && ev.kind == InputKind::MouseMove => {
|
||||
last.x = last.x.saturating_add(ev.x);
|
||||
last.y = last.y.saturating_add(ev.y);
|
||||
}
|
||||
Some(last)
|
||||
if last.kind == InputKind::MouseScroll
|
||||
&& ev.kind == InputKind::MouseScroll
|
||||
&& last.code == ev.code =>
|
||||
{
|
||||
last.x = last.x.saturating_add(ev.x);
|
||||
}
|
||||
_ => out.push(ev),
|
||||
}
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
/// How the libei backend reaches its EIS server. KWin goes through the `RemoteDesktop` *portal*
|
||||
/// (with a pre-seeded grant), but GNOME's portal `Start()` needs an interactive approval a
|
||||
/// headless host can't answer — so GNOME goes straight to Mutter's *direct* RemoteDesktop EIS
|
||||
@@ -321,3 +448,57 @@ mod libei;
|
||||
mod sendinput;
|
||||
#[cfg(target_os = "linux")]
|
||||
mod wlr;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn mk(kind: InputKind, code: u32, x: i32, y: i32) -> InputEvent {
|
||||
InputEvent {
|
||||
kind,
|
||||
_pad: [0; 3],
|
||||
code,
|
||||
x,
|
||||
y,
|
||||
flags: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn coalesce_sums_adjacent_motion_and_preserves_order() {
|
||||
let events = vec![
|
||||
mk(InputKind::MouseMove, 0, 1, 2),
|
||||
mk(InputKind::MouseMove, 0, 3, -1), // → summed with the previous move
|
||||
mk(InputKind::KeyDown, 30, 0, 0), // flushes the move, passes through verbatim
|
||||
mk(InputKind::MouseMove, 0, 5, 5), // a NEW run after the key (not merged across it)
|
||||
mk(InputKind::MouseScroll, 0, 1, 0),
|
||||
mk(InputKind::MouseScroll, 0, 2, 0), // same axis (code 0) → summed
|
||||
mk(InputKind::MouseScroll, 1, 1, 0), // different axis (code 1) → separate
|
||||
];
|
||||
let out = coalesce(events);
|
||||
assert_eq!(out.len(), 5);
|
||||
assert_eq!(
|
||||
(out[0].kind, out[0].x, out[0].y),
|
||||
(InputKind::MouseMove, 4, 1)
|
||||
);
|
||||
assert_eq!(out[1].kind, InputKind::KeyDown);
|
||||
assert_eq!(
|
||||
(out[2].kind, out[2].x, out[2].y),
|
||||
(InputKind::MouseMove, 5, 5)
|
||||
);
|
||||
assert_eq!(
|
||||
(out[3].kind, out[3].code, out[3].x),
|
||||
(InputKind::MouseScroll, 0, 3)
|
||||
);
|
||||
assert_eq!(
|
||||
(out[4].kind, out[4].code, out[4].x),
|
||||
(InputKind::MouseScroll, 1, 1)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn coalesce_handles_empty_and_singleton() {
|
||||
assert!(coalesce(vec![]).is_empty());
|
||||
assert_eq!(coalesce(vec![mk(InputKind::MouseMove, 0, 7, 8)]).len(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
|
||||
use crate::encode::Codec;
|
||||
use crate::gamestream::{
|
||||
tls::{serve_https, PeerCertFingerprint},
|
||||
AppState, APP_VERSION, AUDIO_PORT, CONTROL_PORT, GFE_VERSION, RTSP_PORT, VIDEO_PORT,
|
||||
};
|
||||
use anyhow::{Context, Result};
|
||||
@@ -103,66 +104,6 @@ pub async fn run(
|
||||
serve_https(opts.bind, app, tls).await
|
||||
}
|
||||
|
||||
/// SHA-256 of the peer's client certificate (hex), injected per-connection into each request's
|
||||
/// extensions by [`serve_https`]; `None` when the peer presented no client cert. `require_auth`
|
||||
/// authorizes a request whose fingerprint is in the paired store.
|
||||
#[derive(Clone)]
|
||||
struct PeerCertFingerprint(Option<String>);
|
||||
|
||||
/// HTTPS server for the mgmt API. axum-server can't surface the client cert to a handler, so this
|
||||
/// runs the rustls handshake itself (via tokio-rustls), reads the verified peer certificate, and
|
||||
/// serves the axum `Router` over hyper with the peer's fingerprint attached to every request.
|
||||
async fn serve_https(bind: SocketAddr, app: Router, tls: Arc<rustls::ServerConfig>) -> Result<()> {
|
||||
use tower::ServiceExt;
|
||||
let acceptor = tokio_rustls::TlsAcceptor::from(tls);
|
||||
let listener = tokio::net::TcpListener::bind(bind)
|
||||
.await
|
||||
.with_context(|| format!("bind management API {bind}"))?;
|
||||
loop {
|
||||
let (tcp, _peer) = match listener.accept().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "management API accept failed");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let acceptor = acceptor.clone();
|
||||
let app = app.clone();
|
||||
tokio::spawn(async move {
|
||||
let tls_stream = match acceptor.accept(tcp).await {
|
||||
Ok(s) => s,
|
||||
// A failed handshake is routine (port scan, a browser bailing on the self-signed
|
||||
// cert, a client cert we'd still accept but the peer hung up) — not fatal.
|
||||
Err(_) => return,
|
||||
};
|
||||
// The verified peer cert (the verifier accepts any well-formed one; we authorize by
|
||||
// fingerprint in the auth layer) → its SHA-256, matched against the paired store.
|
||||
let fp = tls_stream
|
||||
.get_ref()
|
||||
.1
|
||||
.peer_certificates()
|
||||
.and_then(|c| c.first())
|
||||
.map(|c| hex::encode(punktfunk_core::quic::endpoint::cert_fingerprint(c.as_ref())));
|
||||
let peer = PeerCertFingerprint(fp);
|
||||
let svc =
|
||||
hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
|
||||
let app = app.clone();
|
||||
let peer = peer.clone();
|
||||
async move {
|
||||
let mut req = req.map(axum::body::Body::new);
|
||||
req.extensions_mut().insert(peer);
|
||||
app.oneshot(req).await // Router error is Infallible
|
||||
}
|
||||
});
|
||||
let io = hyper_util::rt::TokioIo::new(tls_stream);
|
||||
let _ =
|
||||
hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new())
|
||||
.serve_connection_with_upgrades(io, svc)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Compose the full management router (also used directly by the handler tests).
|
||||
fn app(
|
||||
state: Arc<AppState>,
|
||||
|
||||
@@ -200,7 +200,7 @@ pub(crate) async fn serve(opts: Punktfunk1Options, np: Arc<NativePairing>) -> Re
|
||||
// RemoteDesktop-portal grant is established ONCE and reused, instead of a CreateSession per
|
||||
// session — which, under rapid client reconnects, raced a prior session's portal teardown and
|
||||
// wedged KWin's EIS setup ("EIS setup timed out"). Gamepads stay per-session (uinput).
|
||||
let injector = InjectorService::start();
|
||||
let injector = crate::inject::InjectorService::start();
|
||||
// One virtual microphone for the whole host lifetime (see MicService): the client's mic uplink
|
||||
// (0xCB) is Opus-decoded and fed into a persistent virtual mic host apps record from (Linux
|
||||
// PipeWire Audio/Source; Windows a virtual audio device's render endpoint).
|
||||
@@ -1028,103 +1028,10 @@ impl PadState {
|
||||
/// actual pad creation at its own MAX_PADS.
|
||||
const MAX_WIRE_PADS: usize = 16;
|
||||
|
||||
/// Host-lifetime pointer/keyboard injector, shared across punktfunk/1 sessions.
|
||||
///
|
||||
/// The injector backend (libei/RemoteDesktop on KWin/GNOME, gamescope's EIS, wlr, uinput) owns
|
||||
/// compositor resources and is `!Send`, so — unlike the audio capturer — it can't be handed
|
||||
/// between per-session threads through a slot. Instead one host-lifetime thread *owns* it and
|
||||
/// injects events forwarded over a clonable `Send` channel. Opening it ONCE means the privileged
|
||||
/// RemoteDesktop-portal grant is established once and held for the whole run, eliminating the
|
||||
/// per-session `CreateSession` churn that wedged KWin's EIS setup (rapid client reconnects raced
|
||||
/// a prior session's portal teardown — "EIS setup timed out"). The service opens lazily on the
|
||||
/// first event and reopens, after a backoff, if injection fails — so a transient portal hiccup,
|
||||
/// or a gamescope EIS socket that respawns with its nested session, self-heals.
|
||||
struct InjectorService {
|
||||
tx: std::sync::mpsc::Sender<InputEvent>,
|
||||
}
|
||||
|
||||
impl InjectorService {
|
||||
fn start() -> InjectorService {
|
||||
let (tx, rx) = std::sync::mpsc::channel::<InputEvent>();
|
||||
if let Err(e) = std::thread::Builder::new()
|
||||
.name("punktfunk1-injector".into())
|
||||
.spawn(move || injector_service_thread(rx))
|
||||
{
|
||||
tracing::error!(error = %e, "injector service thread spawn failed — pointer/keyboard input disabled");
|
||||
}
|
||||
InjectorService { tx }
|
||||
}
|
||||
|
||||
/// A sender a session forwards its pointer/keyboard events to. Cloned per session; dropping a
|
||||
/// clone does NOT stop the service (the service holds the original sender for the host life).
|
||||
fn sender(&self) -> std::sync::mpsc::Sender<InputEvent> {
|
||||
self.tx.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Backoff between reopen attempts after the injector backend fails to open or its worker dies,
|
||||
/// so a persistently-unavailable portal isn't hammered once per event.
|
||||
/// Backoff between reopen attempts after a host-lifetime service's backend (the mic source, a
|
||||
/// capturer) fails to open or its worker dies, so a persistently-unavailable resource isn't hammered.
|
||||
const INJECTOR_REOPEN_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
|
||||
|
||||
/// The host-lifetime injector worker: lazily open the pointer/keyboard backend, then inject every
|
||||
/// forwarded event into it. Reopen (after [`INJECTOR_REOPEN_BACKOFF`]) on open failure or if the
|
||||
/// backend's worker dies mid-stream. Exits only when every session sender *and* the service's own
|
||||
/// sender have dropped (host shutdown), which drops the injector and closes its portal session.
|
||||
fn injector_service_thread(rx: std::sync::mpsc::Receiver<InputEvent>) {
|
||||
let mut injector: Option<Box<dyn crate::inject::InputInjector>> = None;
|
||||
let mut open_backend: Option<crate::inject::Backend> = None;
|
||||
let mut last_failed: Option<std::time::Instant> = None;
|
||||
for ev in rx {
|
||||
// The resolved input backend (PUNKTFUNK_INPUT_BACKEND, set per connect by apply_input_env,
|
||||
// also on a mid-stream session switch) may have changed since we opened. Reopen against it
|
||||
// so input FOLLOWS the active session instead of injecting into a stale, still-warm backend
|
||||
// (e.g. the managed gamescope's EIS socket after the user switched to the KDE desktop).
|
||||
let want = crate::inject::default_backend();
|
||||
if injector.is_some() && open_backend != Some(want) {
|
||||
tracing::info!(
|
||||
?open_backend,
|
||||
?want,
|
||||
"input: backend changed — reopening injector for the active session"
|
||||
);
|
||||
injector = None;
|
||||
last_failed = None; // re-resolve immediately
|
||||
}
|
||||
if injector.is_none() {
|
||||
// Open on the first event; after a failure wait out the backoff before retrying (a
|
||||
// few events drop during setup — acceptable, input is lossy).
|
||||
let ready = last_failed.is_none_or(|t| t.elapsed() >= INJECTOR_REOPEN_BACKOFF);
|
||||
if ready {
|
||||
match crate::inject::open(want) {
|
||||
Ok(i) => {
|
||||
tracing::info!(
|
||||
backend = ?want,
|
||||
"punktfunk/1 input injector ready (host-lifetime)"
|
||||
);
|
||||
injector = Some(i);
|
||||
open_backend = Some(want);
|
||||
last_failed = None;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(error = %format!("{e:#}"), "pointer/keyboard injection unavailable — will retry");
|
||||
last_failed = Some(std::time::Instant::now());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(inj) = injector.as_mut() {
|
||||
if let Err(e) = inj.inject(&ev) {
|
||||
// The backend's worker (portal session / EIS socket) died — drop it and reopen on
|
||||
// a later event (covers a gamescope EIS socket that respawns with its session).
|
||||
tracing::warn!(error = %format!("{e:#}"), "inject failed — reopening injector");
|
||||
injector = None;
|
||||
open_backend = None;
|
||||
last_failed = Some(std::time::Instant::now());
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::debug!("injector service stopped (host shutting down)");
|
||||
}
|
||||
|
||||
/// Mic is 48 kHz stereo — matches the Opus stereo decoder and the host→client audio layout.
|
||||
const MIC_CHANNELS: u32 = 2;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user