feat: M2 P1.5 robustness — IDR-on-request, send pacing, min-parity floor

Graceful FEC behavior on a lossy link: at a realistic 2% packet loss the stream
is now steady 0% (was spiking 40-60%). Verified live.

- IDR/RFI handling: the control thread recognizes the client's recovery requests
  (0x0301 invalidate-reference-frames, 0x0302 request-IDR, 0x0305) and sets a
  shared force_idr flag; the video thread forces an NVENC keyframe on the next
  frame (Encoder::request_keyframe → input frame pict_type = I). Without this, a
  frame that exceeds the FEC budget broke the reference chain until the next GOP
  IDR (~2s), cascading to most of the stream being undecodable.
- Min-parity floor: honor the client's x-nv-vqos[0].fec.minRequiredFecPackets
  (it asks for 2). Small P-frames previously got m=ceil(k*20/100)=1 parity — a
  single loss broke them; flooring m>=2 (capped so k+m<=255, wire pct recomputed)
  protects them. This is what turned the 2% spikes into steady 0%.
- Send pacing: spread each frame's packets evenly across the frame interval
  instead of blasting them at line rate (a real link drops microbursts), matching
  Sunshine's rate-controlled sends; sub-500us sleeps skipped (unreliable).

Note: sustained ~8% uniform loss still degrades — that exceeds 20% FEC for
reference-frame video and real Sunshine degrades there too; real networks are
<1% or bursty, which this now handles cleanly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-09 12:14:59 +00:00
parent 72f8c05aa3
commit af4360c930
7 changed files with 96 additions and 24 deletions
+36 -15
View File
@@ -23,15 +23,18 @@ pub struct StreamConfig {
pub packet_size: usize,
pub bitrate_kbps: u32,
pub codec: Codec,
/// Client's `x-nv-vqos[0].fec.minRequiredFecPackets` — parity floor per FEC block.
pub min_fec: u8,
}
/// Spawn the video stream thread (idempotent via `running`). Stops when `running` clears.
pub fn start(cfg: StreamConfig, running: Arc<AtomicBool>) {
/// `force_idr` is set by the control stream on a client recovery request.
pub fn start(cfg: StreamConfig, running: Arc<AtomicBool>, force_idr: Arc<AtomicBool>) {
let _ = std::thread::Builder::new()
.name("lumen-video".into())
.spawn(move || {
tracing::info!(?cfg, "video stream starting");
if let Err(e) = run(cfg, &running) {
if let Err(e) = run(cfg, &running, &force_idr) {
tracing::error!(error = %format!("{e:#}"), "video stream failed");
}
running.store(false, Ordering::SeqCst);
@@ -39,7 +42,7 @@ pub fn start(cfg: StreamConfig, running: Arc<AtomicBool>) {
});
}
fn run(cfg: StreamConfig, running: &AtomicBool) -> Result<()> {
fn run(cfg: StreamConfig, running: &AtomicBool, force_idr: &AtomicBool) -> Result<()> {
let sock = UdpSocket::bind(("0.0.0.0", VIDEO_PORT)).context("bind video UDP")?;
// The client pings the video port so we learn where to send; it re-pings until video
// flows, so a missed early ping is fine.
@@ -88,7 +91,7 @@ fn run(cfg: StreamConfig, running: &AtomicBool) -> Result<()> {
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(20);
let mut pk = VideoPacketizer::new(cfg.packet_size, fec_pct);
let mut pk = VideoPacketizer::new(cfg.packet_size, fec_pct, cfg.min_fec);
// Pace at a steady rate (capped at 60fps), re-encoding the last captured frame when the
// compositor produced no new one. wlroots only emits frames on damage, so a static or
@@ -114,31 +117,47 @@ fn run(cfg: StreamConfig, running: &AtomicBool) -> Result<()> {
if let Some(f) = capturer.try_latest().context("capture frame")? {
frame = f;
}
// Honor a client recovery request (RFI / request-IDR): force a keyframe so the client
// resyncs immediately instead of waiting for the next GOP boundary.
if force_idr.swap(false, Ordering::SeqCst) {
enc.request_keyframe();
}
enc.submit(&frame).context("encoder submit")?;
// 90 kHz RTP timestamp from wall-clock, so a variable capture rate stays correct.
let ts = (stream_start.elapsed().as_secs_f64() * 90_000.0) as u32;
let mut client_gone = false;
let mut batch: Vec<Vec<u8>> = Vec::new();
while let Some(au) = enc.poll().context("encoder poll")? {
let ft = if au.keyframe {
FrameType::Idr
} else {
FrameType::P
};
for pkt in pk.packetize(&au.data, ft, ts) {
// Simulated network loss: build the packet (advances seq) but skip the send.
batch.extend(pk.packetize(&au.data, ft, ts));
}
// Pace the frame's packets evenly across the frame interval rather than blasting them
// at line rate (a real link drops microbursts). The per-packet schedule also paces the
// frame itself; sub-500µs sleeps are skipped (unreliable), batching the spread.
let mut client_gone = false;
let n = batch.len();
if n > 0 {
let per_ns = frame_interval.as_nanos() as u64 / n as u64;
for (i, pkt) in batch.iter().enumerate() {
if drop_pct > 0 && rng.gen_range(0..100) < drop_pct {
dropped += 1;
continue;
}
if sock.send(&pkt).is_err() {
dropped += 1; // simulated loss: built the packet, skip the send
} else if sock.send(pkt).is_err() {
client_gone = true;
break;
} else {
sent_pkts += 1;
}
let target = tick + Duration::from_nanos(per_ns * (i as u64 + 1));
if let Some(ahead) = target.checked_duration_since(Instant::now()) {
if ahead >= Duration::from_micros(500) {
std::thread::sleep(ahead);
}
}
sent_pkts += 1;
}
if client_gone {
break;
}
}
if client_gone {
@@ -152,6 +171,8 @@ fn run(cfg: StreamConfig, running: &AtomicBool) -> Result<()> {
fps_count = 0;
fps_t = Instant::now();
}
// Backstop the frame rate when few/no packets were produced (the packet pacing above
// otherwise consumes ~one frame interval on its own).
let elapsed = tick.elapsed();
if elapsed < frame_interval {
std::thread::sleep(frame_interval - elapsed);