fix(m3): persistent host-lifetime input injector — end the RemoteDesktop portal churn
ci / rust (push) Has been cancelled

Under rapid client reconnects, KWin's libei/EIS input setup intermittently wedged
with "EIS setup timed out", causing total input loss for affected sessions. Root
cause: each punktfunk/1 session opened (and tore down) its own RemoteDesktop-portal
CreateSession for pointer/keyboard injection, and back-to-back reconnects raced a
prior session's portal teardown before it settled.

LibeiInjector is only a Send channel handle to a worker thread that owns the portal
session, so the injector can live for the whole host run instead of per session.
Adds InjectorService: one host-lifetime thread owns the (!Send) injector, opened
ONCE (lazily, on the first event) and reused across every session — the portal grant
is established a single time and held. Sessions forward pointer/keyboard events to it
over a clonable Send channel; gamepads stay per-session (uinput, no portal). The
service self-heals — reopen after a 2s backoff if open fails or the backend worker
dies (covers a gamescope EIS socket that respawns with its nested session).

Mirrors the existing host-lifetime audio-capturer slot; the audio capturer is Send
(a slot works), the injector is !Send (needs the owning thread + channel).

Validated live on headless KWin: 8 rapid back-to-back input sessions →
"input injector ready (host-lifetime)" exactly once, ZERO "EIS setup timed out",
8/8 sessions injected input. Tests green, clippy/fmt clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-10 21:30:52 +00:00
parent a730ca8557
commit a03aae891e
+112 -30
View File
@@ -180,6 +180,11 @@ async fn serve(opts: M3Options) -> Result<()> {
// One audio capturer for the whole host lifetime, handed from session to session
// (avoids a PipeWire stream setup per session — see AudioCapSlot).
let audio_cap: AudioCapSlot = Arc::new(std::sync::Mutex::new(None));
// One pointer/keyboard injector for the whole host lifetime (see InjectorService): the
// RemoteDesktop-portal grant is established ONCE and reused, instead of a CreateSession per
// session — which, under rapid client reconnects, raced a prior session's portal teardown and
// wedged KWin's EIS setup ("EIS setup timed out"). Gamepads stay per-session (uinput).
let injector = InjectorService::start();
let paired_at = match &opts.paired_store {
Some(p) => p.clone(),
None => paired_path()?,
@@ -227,6 +232,7 @@ async fn serve(opts: M3Options) -> Result<()> {
conn,
&opts,
&audio_cap,
injector.sender(),
&fingerprint,
&paired,
&last_pairing,
@@ -335,10 +341,15 @@ async fn pair_ceremony(
/// One client session: handshake → input/audio planes → data plane until done/disconnect.
/// Everything torn down on return (RAII: virtual output, encoder, threads via channel close).
/// A connection whose first message is a PairRequest runs the pairing ceremony instead.
// Each argument is a distinct host-lifetime handle threaded from `serve` (config, the audio +
// injector services, the trust store, pairing state) — bundling them into a context struct would
// obscure more than it'd save.
#[allow(clippy::too_many_arguments)]
async fn serve_session(
conn: quinn::Connection,
opts: &M3Options,
audio_cap: &AudioCapSlot,
inj_tx: std::sync::mpsc::Sender<InputEvent>,
host_fp: &[u8; 32],
paired: &PairedStore,
last_pairing: &std::sync::Mutex<Option<std::time::Instant>>,
@@ -497,16 +508,17 @@ async fn serve_session(
}
});
// Input plane: QUIC datagrams → channel → a native injector thread (the injector owns
// non-Send compositor state, so it lives on its own thread). The thread also owns the
// session's virtual gamepads and sends force feedback back over `conn`. It exits when
// the channel closes (datagram task ends on disconnect) — fresh state per session.
// Input plane: QUIC datagrams → channel → a native per-session thread. Pointer/keyboard
// events are forwarded to the host-lifetime [`InjectorService`] (`inj_tx`) so the portal
// grant persists across sessions; this thread owns the session's virtual gamepads (uinput,
// per-session) and sends force feedback back over `conn`. It exits when the channel closes
// (datagram task ends on disconnect) — fresh gamepad state per session.
let (input_tx, input_rx) = std::sync::mpsc::channel::<InputEvent>();
let input_handle = {
let conn = conn.clone();
std::thread::Builder::new()
.name("punktfunk-m3-input".into())
.spawn(move || input_thread(input_rx, conn))
.spawn(move || input_thread(input_rx, conn, inj_tx))
.context("spawn input thread")?
};
let input_conn = conn.clone();
@@ -665,12 +677,97 @@ impl PadState {
/// actual pad creation at its own MAX_PADS.
const MAX_WIRE_PADS: usize = 16;
/// The injector thread: open the session's input backend on first event, then inject.
/// Gamepad kinds route to the session's [`GamepadManager`](crate::inject::gamepad), with
/// force feedback pumped between events and sent back as rumble datagrams.
fn input_thread(rx: std::sync::mpsc::Receiver<InputEvent>, conn: quinn::Connection) {
/// Host-lifetime pointer/keyboard injector, shared across punktfunk/1 sessions.
///
/// The injector backend (libei/RemoteDesktop on KWin/GNOME, gamescope's EIS, wlr, uinput) owns
/// compositor resources and is `!Send`, so — unlike the audio capturer — it can't be handed
/// between per-session threads through a slot. Instead one host-lifetime thread *owns* it and
/// injects events forwarded over a clonable `Send` channel. Opening it ONCE means the privileged
/// RemoteDesktop-portal grant is established once and held for the whole run, eliminating the
/// per-session `CreateSession` churn that wedged KWin's EIS setup (rapid client reconnects raced
/// a prior session's portal teardown — "EIS setup timed out"). The service opens lazily on the
/// first event and reopens, after a backoff, if injection fails — so a transient portal hiccup,
/// or a gamescope EIS socket that respawns with its nested session, self-heals.
struct InjectorService {
tx: std::sync::mpsc::Sender<InputEvent>,
}
impl InjectorService {
fn start() -> InjectorService {
let (tx, rx) = std::sync::mpsc::channel::<InputEvent>();
if let Err(e) = std::thread::Builder::new()
.name("punktfunk-m3-injector".into())
.spawn(move || injector_service_thread(rx))
{
tracing::error!(error = %e, "injector service thread spawn failed — pointer/keyboard input disabled");
}
InjectorService { tx }
}
/// A sender a session forwards its pointer/keyboard events to. Cloned per session; dropping a
/// clone does NOT stop the service (the service holds the original sender for the host life).
fn sender(&self) -> std::sync::mpsc::Sender<InputEvent> {
self.tx.clone()
}
}
/// Backoff between reopen attempts after the injector backend fails to open or its worker dies,
/// so a persistently-unavailable portal isn't hammered once per event.
const INJECTOR_REOPEN_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
/// The host-lifetime injector worker: lazily open the pointer/keyboard backend, then inject every
/// forwarded event into it. Reopen (after [`INJECTOR_REOPEN_BACKOFF`]) on open failure or if the
/// backend's worker dies mid-stream. Exits only when every session sender *and* the service's own
/// sender have dropped (host shutdown), which drops the injector and closes its portal session.
fn injector_service_thread(rx: std::sync::mpsc::Receiver<InputEvent>) {
let mut injector: Option<Box<dyn crate::inject::InputInjector>> = None;
let mut injector_broken = false;
let mut last_failed: Option<std::time::Instant> = None;
for ev in rx {
if injector.is_none() {
// Open on the first event; after a failure wait out the backoff before retrying (a
// few events drop during setup — acceptable, input is lossy).
let ready = last_failed.is_none_or(|t| t.elapsed() >= INJECTOR_REOPEN_BACKOFF);
if ready {
let backend = crate::inject::default_backend();
match crate::inject::open(backend) {
Ok(i) => {
tracing::info!(
?backend,
"punktfunk/1 input injector ready (host-lifetime)"
);
injector = Some(i);
last_failed = None;
}
Err(e) => {
tracing::error!(error = %format!("{e:#}"), "pointer/keyboard injection unavailable — will retry");
last_failed = Some(std::time::Instant::now());
}
}
}
}
if let Some(inj) = injector.as_mut() {
if let Err(e) = inj.inject(&ev) {
// The backend's worker (portal session / EIS socket) died — drop it and reopen on
// a later event (covers a gamescope EIS socket that respawns with its session).
tracing::warn!(error = %format!("{e:#}"), "inject failed — reopening injector");
injector = None;
last_failed = Some(std::time::Instant::now());
}
}
}
tracing::debug!("injector service stopped (host shutting down)");
}
/// The per-session input thread: route pointer/keyboard events to the host-lifetime injector
/// service (`inj_tx`) and gamepad events to this session's own [`GamepadManager`]
/// (crate::inject::gamepad), with force feedback pumped between events and sent back as rumble
/// datagrams. The gamepads (uinput) are created and torn down with the session; the
/// pointer/keyboard injector (and its portal grant) lives in the service, across sessions.
fn input_thread(
rx: std::sync::mpsc::Receiver<InputEvent>,
conn: quinn::Connection,
inj_tx: std::sync::mpsc::Sender<InputEvent>,
) {
let mut pads = crate::inject::gamepad::GamepadManager::new();
let mut pad_state = [PadState::default(); MAX_WIRE_PADS];
let mut pad_mask = 0u16;
@@ -693,26 +790,11 @@ fn input_thread(rx: std::sync::mpsc::Receiver<InputEvent>, conn: quinn::Connecti
pads.handle(&crate::gamestream::gamepad::GamepadEvent::State(frame));
}
_ => {
if injector.is_none() && !injector_broken {
let backend = crate::inject::default_backend();
match crate::inject::open(backend) {
Ok(i) => {
tracing::info!(?backend, "punktfunk/1 input injector opened");
injector = Some(i);
}
Err(e) => {
// Keep running for gamepads — uinput pads work even when
// the pointer/keyboard backend doesn't.
tracing::error!(error = %format!("{e:#}"), "pointer/keyboard injection unavailable");
injector_broken = true;
}
}
}
if let Some(inj) = injector.as_mut() {
if let Err(e) = inj.inject(&ev) {
tracing::warn!(error = %format!("{e:#}"), "inject failed");
}
}
// Pointer/keyboard → the host-lifetime injector service (one persistent
// portal session for every punktfunk/1 session). A send error only means the
// service thread is gone (host shutting down) — dropping the event is fine,
// input is lossy by design.
let _ = inj_tx.send(ev);
}
},
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}