Files
punktfunk/crates/punktfunk-host/src/spike.rs
T
enricobuehler 9c8fa9340c
apple / swift (push) Failing after 40s
audit / cargo-audit (push) Failing after 1m12s
windows-msix / package (push) Successful in 1m37s
windows / build (push) Successful in 1m14s
android / android (push) Successful in 4m48s
ci / web (push) Successful in 27s
ci / rust (push) Successful in 4m21s
ci / docs-site (push) Successful in 31s
ci / bench (push) Successful in 4m39s
decky / build-publish (push) Successful in 11s
docker / build-push (--build-arg FEDORA_VERSION=44, ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora44-rpm) (push) Successful in 5s
docker / build-push (., web/Dockerfile, punktfunk-web) (push) Successful in 4s
docker / build-push (ci, ci/fedora-rpm.Dockerfile, punktfunk-fedora-rpm) (push) Successful in 4s
docker / build-push (ci, ci/rust-ci.Dockerfile, punktfunk-rust-ci) (push) Successful in 4s
docker / build-push (docs-site, docs-site/Dockerfile, punktfunk-docs) (push) Successful in 19s
deb / build-publish (push) Successful in 6m3s
flatpak / build-publish (push) Successful in 4m13s
rpm / build-publish (bazzite, punktfunk-fedora-rpm) (push) Successful in 8m15s
rpm / build-publish (fedora-44, punktfunk-fedora44-rpm) (push) Successful in 8m16s
docker / deploy-docs (push) Successful in 18s
refactor: drop milestone names + consolidate clients; loss-recovery & rumble fixes
Two bodies of work in one commit (the rename moved files the fixes also touched).

Naming/structure cleanup (pre-launch):
- Host modules m3.rs->punktfunk1.rs, m0.rs->spike.rs; CLI m3-host->punktfunk1-host,
  m0->spike; bare `punktfunk-host` now prints help. Types M3Options/M3Source->
  Punktfunk1Options/Punktfunk1Source.
- Clients consolidated out of crates/ into clients/: punktfunk-client-rs->
  clients/probe (crate punktfunk-probe), client-linux->clients/linux,
  client-windows->clients/windows, punktfunk-android->clients/android/native
  (crate punktfunk-client-android; kept [lib] name=punktfunk_android so the JNI
  contract is unchanged). crates/ now holds only core + host.
- Milestone codes M0-M4 purged from code/CLI/CLAUDE.md/README/docs/docs-site,
  kept only in docs/implementation-plan.md. docs/m2-plan.md->
  docs/gamestream-host-plan.md. CI/gradle/flatpak paths updated.

Client loss-recovery (video froze and never recovered after a brief drop):
- Export punktfunk_connection_frames_dropped through the C ABI (the core already
  tracked it for the client keyframe-recovery loop; it was never reachable from
  the ABI clients). Regenerated punktfunk_core.h.
- Apple (StreamPump + Stage2Pipeline) and Android (decode.rs) now poll
  frames_dropped and request a keyframe when it climbs -- the same loss-driven
  recovery Linux/Windows already had. Under infinite GOP the decoder silently
  conceals reference-missing frames, so the decode-error trigger rarely fires.

Apple rumble robustness (worked then went spotty -- DualSense + Xbox):
- Add CHHapticEngine stopped/reset handlers (rebuild on app background / audio
  interruption / server reset) and drop the permanent `broken` latch on a
  transient drive failure; latch only when the controller truly has no haptics.
- Surface swallowed SDL set_rumble errors on Linux/Windows + diagnostic logging.

Verified: cargo build/clippy/fmt --workspace, C-ABI harness, header drift.
Not runnable on this box (verify in CI): Gitea workflows, gradle/Android,
flatpak, Swift/decky.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 21:05:58 +00:00

267 lines
9.1 KiB
Rust

//! The pipeline spike (plan §8): capture → NVENC encode → playable file, with the
//! encoded access units also fed through a `punktfunk_core` host→client `Session` over an
//! in-process loopback to prove the core's FEC + packetize + reassemble path on real
//! encoder output.
//!
//! This is the spike runner, not the production host path: it drives the stages on one thread (the
//! per-stage-thread pipeline with bounded channels is [`crate::pipeline`]). Source is
//! either a synthetic BGRx test pattern (no capture session needed) or the live xdg
//! ScreenCast portal monitor.
use crate::capture::{self, Capturer, SyntheticCapturer};
use crate::encode::{self, Codec, EncodedFrame, Encoder};
use anyhow::{anyhow, Context, Result};
use punktfunk_core::packet::{FLAG_PIC, FLAG_SOF};
use punktfunk_core::{Config, Role, Session};
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::time::Instant;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Source {
/// Deterministic moving BGRx test pattern — no capture session required.
Synthetic,
/// Live monitor via the xdg ScreenCast portal + PipeWire.
Portal,
/// KWin virtual output created at `width`x`height` (zkde_screencast). Lets us validate
/// capture (and zero-copy) at an arbitrary client resolution against a headless KWin.
KwinVirtual,
}
#[derive(Clone, Debug)]
pub struct Options {
pub source: Source,
/// Synthetic-only; the portal source uses the PipeWire-negotiated size.
pub width: u32,
pub height: u32,
pub fps: u32,
pub seconds: u32,
pub codec: Codec,
pub bitrate_bps: u64,
/// Raw Annex-B elementary-stream sink (`.h265`/`.h264`/`.ivf-less .obu`); playable.
pub out: PathBuf,
/// Also round-trip every AU through a `punktfunk_core` host→client loopback and verify.
pub loopback: bool,
}
pub fn run(opts: Options) -> Result<()> {
let mut capturer: Box<dyn Capturer> = match opts.source {
Source::Synthetic => {
tracing::info!(
width = opts.width,
height = opts.height,
fps = opts.fps,
"spike source: synthetic BGRx test pattern"
);
Box::new(SyntheticCapturer::new(opts.width, opts.height, opts.fps))
}
Source::Portal => {
tracing::info!("spike source: xdg ScreenCast portal (live monitor)");
capture::open_portal_monitor().context("open portal capturer")?
}
Source::KwinVirtual => {
let compositor = crate::vdisplay::detect().unwrap_or(crate::vdisplay::Compositor::Kwin);
tracing::info!(
width = opts.width,
height = opts.height,
?compositor,
"spike source: virtual output (PUNKTFUNK_COMPOSITOR)"
);
let mut vd = crate::vdisplay::open(compositor).context("open virtual display")?;
let vout = vd
.create(punktfunk_core::Mode {
width: opts.width,
height: opts.height,
refresh_hz: opts.fps,
})
.context("create virtual output")?;
capture::capture_virtual_output(vout).context("capture virtual output")?
}
};
// Activate the capturer so the portal/PipeWire process callback actually delivers frames
// (it gates the per-frame de-pad on `active`; idle by default so reconnects are cheap).
capturer.set_active(true);
// The first frame establishes the authoritative dimensions (the portal's negotiated
// size, or the synthetic size) used to configure the encoder.
let first = capturer.next_frame().context("capture first frame")?;
let (w, h) = (first.width, first.height);
tracing::info!(
width = w,
height = h,
format = ?first.format,
codec = ?opts.codec,
bitrate_bps = opts.bitrate_bps,
"opening NVENC encoder"
);
let mut encoder = encode::open_video(
opts.codec,
first.format,
w,
h,
opts.fps,
opts.bitrate_bps,
first.is_cuda(),
8, // spike synthetic harness: 8-bit
)
.context("open encoder")?;
let mut sink = BufWriter::new(
File::create(&opts.out).with_context(|| format!("create {}", opts.out.display()))?,
);
let mut lb = if opts.loopback {
Some(Loopback::new().context("build punktfunk-core loopback")?)
} else {
None
};
let target_frames = (opts.seconds as u64) * (opts.fps as u64);
let started = Instant::now();
let mut stats = Stats::default();
let mut frame = first;
loop {
encoder.submit(&frame).context("encoder submit")?;
stats.submitted += 1;
drain_encoder(encoder.as_mut(), &mut sink, lb.as_mut(), &mut stats)?;
if stats.submitted >= target_frames {
break;
}
frame = capturer.next_frame().context("capture frame")?;
}
// NVENC buffers frames internally even at delay=0 — flush and drain the tail.
encoder.flush().context("encoder flush")?;
drain_encoder(encoder.as_mut(), &mut sink, lb.as_mut(), &mut stats)?;
sink.flush().context("flush output file")?;
let elapsed = started.elapsed().as_secs_f64();
tracing::info!(
submitted = stats.submitted,
encoded = stats.encoded,
keyframes = stats.keyframes,
bytes_out = stats.bytes_out,
out = %opts.out.display(),
elapsed_s = format!("{elapsed:.2}"),
encode_fps = format!("{:.1}", stats.encoded as f64 / elapsed.max(1e-9)),
"spike capture→encode→file complete"
);
if let Some(lb) = lb {
lb.report();
if lb.mismatches > 0 || lb.recovered != lb.submitted {
return Err(anyhow!(
"punktfunk-core loopback verification FAILED: {} mismatches, {}/{} AUs recovered",
lb.mismatches,
lb.recovered,
lb.submitted
));
}
}
Ok(())
}
#[derive(Default)]
struct Stats {
submitted: u64,
encoded: u64,
keyframes: u64,
bytes_out: u64,
}
fn drain_encoder(
encoder: &mut dyn Encoder,
sink: &mut impl Write,
mut lb: Option<&mut Loopback>,
stats: &mut Stats,
) -> Result<()> {
while let Some(au) = encoder.poll().context("encoder poll")? {
sink.write_all(&au.data).context("write AU to file")?;
stats.encoded += 1;
stats.bytes_out += au.data.len() as u64;
if au.keyframe {
stats.keyframes += 1;
}
if let Some(lb) = lb.as_deref_mut() {
lb.submit(&au)?;
}
}
Ok(())
}
/// A host↔client `punktfunk_core` pair over a lossless in-process loopback. Each encoded AU is
/// FEC-protected, packetized, sent, then reassembled on the client and byte-compared to the
/// original — exercising the core on real encoder output (the spike "feed into a Session" goal).
struct Loopback {
host: Session,
client: Session,
submitted: u64,
recovered: u64,
mismatches: u64,
bytes: u64,
}
impl Loopback {
fn new() -> Result<Loopback> {
let (host_tx, client_tx) = punktfunk_core::transport::loopback_pair(0, 0);
let host = Session::new(Config::p1_defaults(Role::Host), Box::new(host_tx))
.map_err(|e| anyhow!("host session: {e:?}"))?;
let client = Session::new(Config::p1_defaults(Role::Client), Box::new(client_tx))
.map_err(|e| anyhow!("client session: {e:?}"))?;
Ok(Loopback {
host,
client,
submitted: 0,
recovered: 0,
mismatches: 0,
bytes: 0,
})
}
fn submit(&mut self, au: &EncodedFrame) -> Result<()> {
let mut flags = FLAG_PIC as u32;
if au.keyframe {
flags |= FLAG_SOF as u32;
}
self.host
.submit_frame(&au.data, au.pts_ns, flags)
.map_err(|e| anyhow!("host submit_frame: {e:?}"))?;
self.submitted += 1;
self.bytes += au.data.len() as u64;
// Lossless + in-order loopback: each submit yields exactly the AU just sent.
loop {
match self.client.poll_frame() {
Ok(frame) => {
self.recovered += 1;
if frame.data != au.data {
self.mismatches += 1;
tracing::warn!(
recovered = self.recovered,
got = frame.data.len(),
expected = au.data.len(),
"loopback AU mismatch"
);
}
}
Err(punktfunk_core::PunktfunkError::NoFrame) => break,
Err(e) => return Err(anyhow!("client poll_frame: {e:?}")),
}
}
Ok(())
}
fn report(&self) {
tracing::info!(
submitted = self.submitted,
recovered = self.recovered,
mismatches = self.mismatches,
bytes = self.bytes,
"punktfunk-core loopback: AUs FEC-packetized → sent → reassembled & verified"
);
}
}