From 449a67ce8d1ec649ff610b34cc1ed5e9c034d547 Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Fri, 3 Jul 2026 21:22:12 +0000 Subject: [PATCH] =?UTF-8?q?feat(protocol):=20per-AU=20host-timing=20plane?= =?UTF-8?q?=20(0xCF)=20=E2=80=94=20split=20host+network=20latency=20(stats?= =?UTF-8?q?=20phase=202)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The unified-stats equation's host+network stage was one opaque number because the wire carried nothing but pts_ns. Now the host reports its own share per frame: when the client's Hello sets VIDEO_CAP_HOST_TIMING (0x08), the send thread emits a 13-byte 0xCF datagram — [tag][pts_ns u64][host_us u32] — right after the AU's last packet leaves the socket, so host_us = capture→fully-sent (capture read/convert, encode, FEC+seal, paced send) against the same anchor the wire pts carries. Clients correlate by pts_ns and derive network = (received + clock_offset − pts) − host_us; the two terms tile per frame by construction. Back-compat is free in all four combinations: old clients ignore unknown datagram tags, old hosts ignore unknown cap bits (client keeps the combined stage). The hardened data-plane format is untouched — this rides the established QUIC side-plane pattern (0xC8…0xCE). NativeClient ORs the bit in unconditionally and exposes next_host_timing(); the C ABI gains PunktfunkHostTiming + punktfunk_connection_next_host_timing (additive). The synthetic host emits 0xCF too, so pure-loopback protocol tests cover the plane. The probe reports the split (host_p50/p95_us · net_p50/p95_us) and is our direct analogue of Sunshine's "host processing latency" — ours additionally includes the paced send. Validated on loopback (synthetic host + probe, debug build): 240/240 AUs matched, host_p50 6.5 ms + net_p50 6.4 ms ≈ capture→received p50 13.0 ms. Core suite + new 0xCF roundtrip/truncation test green; host+core+probe clippy clean. 
Co-Authored-By: Claude Fable 5 --- clients/probe/src/main.rs | 54 ++++++++++++++++++- crates/punktfunk-core/src/abi.rs | 59 +++++++++++++++++++++ crates/punktfunk-core/src/client.rs | 39 +++++++++++++- crates/punktfunk-core/src/quic.rs | 70 +++++++++++++++++++++++++ crates/punktfunk-host/src/punktfunk1.rs | 48 ++++++++++++++++- include/punktfunk_core.h | 48 +++++++++++++++++ 6 files changed, 314 insertions(+), 4 deletions(-) diff --git a/clients/probe/src/main.rs b/clients/probe/src/main.rs index 1bcb674..9b39e93 100644 --- a/clients/probe/src/main.rs +++ b/clients/probe/src/main.rs @@ -424,7 +424,9 @@ async fn session(args: Args) -> Result<()> { // PUNKTFUNK_CLIENT_444=1 advertises VIDEO_CAP_444 (host HEVC 4:4:4 path) — verify the // resulting chroma with `ffprobe` on the `--out` .h265. video_caps: { - let mut caps = 0u8; + // Always ask for per-AU host timings (0xCF) — this is a measurement tool, and the + // host/network split is exactly what it exists to report. Old hosts ignore the bit. + let mut caps = punktfunk_core::quic::VIDEO_CAP_HOST_TIMING; if std::env::var_os("PUNKTFUNK_CLIENT_10BIT").is_some() { caps |= punktfunk_core::quic::VIDEO_CAP_10BIT; } @@ -902,6 +904,10 @@ async fn session(args: Args) -> Result<()> { let audio_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); let rumble_pkts = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); let hidout_pkts = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); + // Per-AU host timings (0xCF) → the stream loop, which matches them to received AUs by pts + // and reports the host/network split. try_send: overflow drops samples, never blocks QUIC. 
+ let (host_timing_tx, host_timing_rx) = + std::sync::mpsc::sync_channel::(512); { let (a, ab, r, h) = ( audio_pkts.clone(), @@ -909,6 +915,7 @@ async fn session(args: Args) -> Result<()> { rumble_pkts.clone(), hidout_pkts.clone(), ); + let ht_tx = host_timing_tx; let conn2 = conn.clone(); // Build a multistream decoder for the host-RESOLVED layout so the probe actually decodes // the surround stream (not just counts bytes) — the headless validator for the encode path. @@ -956,6 +963,10 @@ async fn session(args: Args) -> Result<()> { if h.fetch_add(1, Relaxed) < 12 { tracing::info!(?hid, "DualSense HID output (0xCD)"); } + } else if let Some(t) = punktfunk_core::quic::decode_host_timing_datagram(&d) { + // Per-AU host timing (0xCF) — forwarded to the stream loop for the + // host/network latency split. + let _ = ht_tx.try_send(t); } } }); @@ -1000,6 +1011,12 @@ async fn session(args: Args) -> Result<()> { let mut mismatched = 0u32; let mut bytes = 0u64; let mut latencies_us: Vec = Vec::new(); + // Host/network split: received AUs awaiting their 0xCF host timing (pts → capture→received + // µs), matched as the datagrams arrive. Bounded — an old host never sends any. + let mut pending_split: std::collections::VecDeque<(u64, u64)> = + std::collections::VecDeque::new(); + let mut host_us_v: Vec = Vec::new(); + let mut net_us_v: Vec = Vec::new(); let mut last_rx = std::time::Instant::now(); let started = std::time::Instant::now(); // Adaptive-FEC loss window: publish a fresh estimate every 750 ms for the LossReport task. @@ -1057,6 +1074,19 @@ async fn session(args: Args) -> Result<()> { .max(0) as u64; if lat > 0 && lat < 10_000_000_000 { latencies_us.push(lat / 1000); + pending_split.push_back((frame.pts_ns, lat / 1000)); + if pending_split.len() > 1024 { + pending_split.pop_front(); + } + } + // Match any host timings (0xCF) that have arrived: host = the reported + // capture→sent, network = our capture→received minus it (per-frame tiling). 
+ while let Ok(t) = host_timing_rx.try_recv() { + if let Some(i) = pending_split.iter().position(|(p, _)| *p == t.pts_ns) { + let (_, hostnet_us) = pending_split.remove(i).unwrap(); + host_us_v.push(t.host_us as u64); + net_us_v.push(hostnet_us.saturating_sub(t.host_us as u64)); + } } if expected > 0 { // Verification mode: deterministic content. @@ -1103,6 +1133,28 @@ async fn session(args: Args) -> Result<()> { "punktfunk/1 stream complete (capture→received latency; skew_corrected=true ⇒ \ cross-machine valid, false ⇒ same-host clock)" ); + if !host_us_v.is_empty() { + // The host/network split from the per-AU 0xCF timings (design/stats-unification.md + // Phase 2): host = the host's own capture→sent, network = capture→received minus it. + let pcts = |v: &mut Vec, p: f64| -> u64 { + if v.is_empty() { + return 0; + } + v.sort_unstable(); + v[((v.len() as f64 * p) as usize).min(v.len() - 1)] + }; + tracing::info!( + timing_samples = host_us_v.len(), + host_p50_us = pcts(&mut host_us_v, 0.50), + host_p95_us = pcts(&mut host_us_v, 0.95), + net_p50_us = pcts(&mut net_us_v, 0.50), + net_p95_us = pcts(&mut net_us_v, 0.95), + "host/network latency split (host = capture→sent on the host; network = wire + \ + reassembly)" + ); + } else { + tracing::info!("no host timing datagrams (0xCF) — old host; host+network unsplit"); + } if expected > 0 { anyhow::ensure!(mismatched == 0, "{mismatched} corrupted frames"); anyhow::ensure!(ok == expected, "received {ok}/{expected} frames"); diff --git a/crates/punktfunk-core/src/abi.rs b/crates/punktfunk-core/src/abi.rs index eba2c95..ff3fdfc 100644 --- a/crates/punktfunk-core/src/abi.rs +++ b/crates/punktfunk-core/src/abi.rs @@ -635,6 +635,22 @@ impl PunktfunkHdrMeta { } } +/// One access unit's host-side processing time ([`punktfunk_connection_next_host_timing`]): +/// capture → fully sent, i.e. the whole host pipeline (capture read/convert, encode, FEC+seal, +/// paced send). 
Correlate to the AU whose `PunktfunkFrame::pts_ns` equals `pts_ns`, then +/// `network = (received_instant + clock_offset − pts_ns) − host_us` — the unified stats HUD's +/// `host` / `network` split (design/stats-unification.md Phase 2). Best-effort: a lost datagram +/// means that frame simply contributes no sample. +#[cfg(feature = "quic")] +#[repr(C)] +#[derive(Clone, Copy)] +pub struct PunktfunkHostTiming { + /// The AU's capture stamp (host capture clock — matches `PunktfunkFrame::pts_ns` exactly). + pub pts_ns: u64, + /// Host capture→sent duration, µs. + pub host_us: u32, +} + /// `PunktfunkRichInput::kind` — a touchpad contact (`finger`/`active`/`x`/`y` valid). pub const PUNKTFUNK_RICH_TOUCHPAD: u8 = 1; /// `PunktfunkRichInput::kind` — a motion sample (`gyro`/`accel` valid). @@ -1759,6 +1775,49 @@ pub unsafe extern "C" fn punktfunk_connection_next_hdr_meta( }) } +/// Pull the next per-AU host timing (0xCF) into `*out`: the host's capture→sent duration for one +/// access unit, correlated to the AU by `pts_ns` (see [`PunktfunkHostTiming`]). +/// [`PunktfunkStatus::NoFrame`] on timeout, [`PunktfunkStatus::Closed`] once the session ended. +/// A stats consumer drains this non-blockingly (`timeout_ms = 0`) alongside its frame samples; +/// an older host never emits any — keep showing the combined `host+network` stage then. Same +/// threading rules as [`punktfunk_connection_next_rumble`] (one puller, may run alongside the +/// other planes). +/// +/// # Safety +/// `c` is a valid connection handle; `out` is writable for one `PunktfunkHostTiming`. 
+#[cfg(feature = "quic")] +#[no_mangle] +pub unsafe extern "C" fn punktfunk_connection_next_host_timing( + c: *mut PunktfunkConnection, + out: *mut PunktfunkHostTiming, + timeout_ms: u32, +) -> PunktfunkStatus { + guard(|| { + let c = match unsafe { c.as_ref() } { + Some(c) => c, + None => return PunktfunkStatus::NullPointer, + }; + if out.is_null() { + return PunktfunkStatus::NullPointer; + } + match c + .inner + .next_host_timing(std::time::Duration::from_millis(timeout_ms as u64)) + { + Ok(t) => { + unsafe { + *out = PunktfunkHostTiming { + pts_ns: t.pts_ns, + host_us: t.host_us, + } + }; + PunktfunkStatus::Ok + } + Err(e) => e.status(), + } + }) +} + /// Read the session's resolved colour signalling + encode bit depth (from the host's Welcome). /// Each out pointer is filled when non-NULL: `primaries`/`transfer`/`matrix` are CICP code points /// (BT.709 = 1; BT.2020 = 9; PQ transfer = 16, HLG = 18; BT.2020-NCL matrix = 9), `full_range` is diff --git a/crates/punktfunk-core/src/client.rs b/crates/punktfunk-core/src/client.rs index cc5a123..8b43fd8 100644 --- a/crates/punktfunk-core/src/client.rs +++ b/crates/punktfunk-core/src/client.rs @@ -140,6 +140,11 @@ const HIDOUT_QUEUE: usize = 32; /// and low-rate (one on start, re-sent on mastering changes / keyframes); a small ring is ample. const HDR_META_QUEUE: usize = 8; +/// Host-timing plane depth (0xCF, one datagram per AU). Sized for a 240 fps stream whose stats +/// consumer drains once per second with headroom; overflow drops the newest sample (try_send) — +/// harmless, it's per-frame observability, not state. +const HOST_TIMING_QUEUE: usize = 512; + /// One Opus packet from the host's audio datagram stream (48 kHz stereo, 5 ms frames). #[derive(Clone, Debug)] pub struct AudioPacket { @@ -161,6 +166,9 @@ pub struct NativeClient { hidout: Mutex>, /// Inbound static HDR metadata (ST.2086 mastering + content light level) — 0xCE datagrams. 
hdr_meta: Mutex>, + /// Inbound per-AU host capture→send timings — 0xCF datagrams (the client always advertises + /// [`quic::VIDEO_CAP_HOST_TIMING`]; an older host simply never sends any). + host_timing: Mutex>, input_tx: tokio::sync::mpsc::UnboundedSender, /// Outbound mic frames `(seq, pts_ns, opus)` → encoded as 0xCB datagrams by the worker. mic_tx: tokio::sync::mpsc::UnboundedSender<(u32, u64, Vec)>, @@ -315,6 +323,8 @@ impl NativeClient { let (rumble_tx, rumble_rx) = std::sync::mpsc::sync_channel::<(u16, u16, u16)>(RUMBLE_QUEUE); let (hidout_tx, hidout_rx) = std::sync::mpsc::sync_channel::(HIDOUT_QUEUE); let (hdr_meta_tx, hdr_meta_rx) = std::sync::mpsc::sync_channel::(HDR_META_QUEUE); + let (host_timing_tx, host_timing_rx) = + std::sync::mpsc::sync_channel::(HOST_TIMING_QUEUE); let (input_tx, input_rx) = tokio::sync::mpsc::unbounded_channel::(); let (mic_tx, mic_rx) = tokio::sync::mpsc::unbounded_channel::<(u32, u64, Vec)>(); let (rich_input_tx, rich_input_rx) = tokio::sync::mpsc::unbounded_channel::(); @@ -370,6 +380,7 @@ impl NativeClient { rumble_tx, hidout_tx, hdr_meta_tx, + host_timing_tx, input_rx, mic_rx, rich_input_rx, @@ -412,6 +423,7 @@ impl NativeClient { rumble: Mutex::new(rumble_rx), hidout: Mutex::new(hidout_rx), hdr_meta: Mutex::new(hdr_meta_rx), + host_timing: Mutex::new(host_timing_rx), input_tx, mic_tx, rich_input_tx, @@ -715,6 +727,20 @@ impl NativeClient { } } + /// Pull the next per-AU host timing (0xCF): the host's capture→sent duration for one access + /// unit, correlated to the AU by `pts_ns`. Feeds the unified stats HUD's `host` / `network` + /// split (`network = (received + clock_offset − pts) − host_us`); a stats consumer should + /// drain this non-blockingly alongside its frame samples. An older host never sends any — + /// the HUD then keeps the combined `host+network` stage. Same timeout/closed semantics as + /// [`NativeClient::next_hidout`]. 
+ pub fn next_host_timing(&self, timeout: Duration) -> Result { + match self.host_timing.lock().unwrap().recv_timeout(timeout) { + Ok(t) => Ok(t), + Err(RecvTimeoutError::Timeout) => Err(PunktfunkError::NoFrame), + Err(RecvTimeoutError::Disconnected) => Err(PunktfunkError::Closed), + } + } + /// Queue one input event for delivery as a QUIC datagram. pub fn send_input(&self, ev: &InputEvent) -> Result<()> { self.input_tx.send(*ev).map_err(|_| PunktfunkError::Closed) @@ -768,6 +794,7 @@ struct WorkerArgs { rumble_tx: SyncSender<(u16, u16, u16)>, hidout_tx: SyncSender, hdr_meta_tx: SyncSender, + host_timing_tx: SyncSender, input_rx: tokio::sync::mpsc::UnboundedReceiver, mic_rx: tokio::sync::mpsc::UnboundedReceiver<(u32, u64, Vec)>, rich_input_rx: tokio::sync::mpsc::UnboundedReceiver, @@ -803,6 +830,7 @@ async fn worker_main(args: WorkerArgs) { rumble_tx, hidout_tx, hdr_meta_tx, + host_timing_tx, mut input_rx, mut mic_rx, mut rich_input_rx, @@ -860,8 +888,10 @@ async fn worker_main(args: WorkerArgs) { launch: launch.clone(), // The embedder's decode/present caps (e.g. the Windows client advertises // VIDEO_CAP_10BIT | VIDEO_CAP_HDR). The host only upgrades to a 10-bit / HDR encode - // when the matching bit is set, so `0` stays an 8-bit BT.709 stream. - video_caps, + // when the matching bit is set, so `0` stays an 8-bit BT.709 stream. HOST_TIMING is + // OR'd in unconditionally: every NativeClient build demuxes the 0xCF plane, and the + // bit only asks the host for observability datagrams (never changes the encode). + video_caps: video_caps | crate::quic::VIDEO_CAP_HOST_TIMING, // Requested surround channel count; the host echoes the resolved value in Welcome. audio_channels, // The codecs this client can decode + its soft preference (0 = auto). 
The host @@ -1099,6 +1129,11 @@ async fn worker_main(args: WorkerArgs) { let _ = hdr_meta_tx.try_send(m); } } + Some(&crate::quic::HOST_TIMING_MAGIC) => { + if let Some(t) = crate::quic::decode_host_timing_datagram(&d) { + let _ = host_timing_tx.try_send(t); + } + } _ => {} // unknown tag — a newer host; ignore } } diff --git a/crates/punktfunk-core/src/quic.rs b/crates/punktfunk-core/src/quic.rs index 2c5f07e..8126a2d 100644 --- a/crates/punktfunk-core/src/quic.rs +++ b/crates/punktfunk-core/src/quic.rs @@ -114,6 +114,13 @@ pub const VIDEO_CAP_HDR: u8 = 0x02; /// [`Welcome::chroma_format`] reflects the real resolved value. Independent of 10-bit/HDR (4:4:4 is a /// chroma decision, bit depth is a depth decision; the two may combine where the hardware allows). pub const VIDEO_CAP_444: u8 = 0x04; +/// [`Hello::video_caps`] bit: the client consumes per-AU host-timing datagrams +/// ([`HOST_TIMING_MAGIC`], 0xCF) — the host's capture→send duration per frame, letting the client +/// split its `host+network` latency stage into `host` and `network` +/// (design/stats-unification.md Phase 2). The host emits 0xCF ONLY when this bit is set (an older +/// host ignores it and simply never sends any); a client that doesn't set it keeps the combined +/// stage. Purely observability — never changes what the host encodes. +pub const VIDEO_CAP_HOST_TIMING: u8 = 0x08; /// [`Hello::video_codecs`] bit: the client can decode H.264 / AVC. The GPU-less **software** /// encode path (openh264) emits H.264, so a client that wants to stream from a software host MUST @@ -1601,6 +1608,50 @@ pub fn decode_hdr_meta_datagram(b: &[u8]) -> Option { }) } +/// Per-AU host-timing datagram tag, host → client (see [`HostTiming`]). Next tag after +/// [`HDR_META_MAGIC`]. Emitted once per access unit, right after its last packet left the host's +/// socket, and only when the client advertised [`VIDEO_CAP_HOST_TIMING`]. 
+pub const HOST_TIMING_MAGIC: u8 = 0xCF; + +/// One access unit's host-side processing time: capture → fully sent (the whole host pipeline — +/// capture read/convert, encode, FEC+seal, paced send). The client correlates it to the AU by +/// `pts_ns` (the AU's capture stamp, unique per frame) and derives +/// `network = (received + clock_offset − pts_ns) − host_us`, so the unified-stats equation's +/// `host+network` stage splits into two per-frame-tiling terms. Best-effort like every side-plane +/// datagram: a lost 0xCF just means that frame contributes no host/network sample. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct HostTiming { + /// The AU's capture stamp (host capture clock — matches the AU's `pts_ns` exactly). + pub pts_ns: u64, + /// Host capture→sent duration, µs (saturated at `u32::MAX` ≈ 71 min — far past the 10 s + /// client-side sanity clamp anyway). + pub host_us: u32, +} + +/// Wire length of a [`HOST_TIMING_MAGIC`] datagram: tag + u64 pts + u32 µs = 13 bytes. +const HOST_TIMING_LEN: usize = 1 + 8 + 4; + +/// Encode a [`HostTiming`] into a [`HOST_TIMING_MAGIC`] datagram. +pub fn encode_host_timing_datagram(t: &HostTiming) -> Vec { + let mut b = Vec::with_capacity(HOST_TIMING_LEN); + b.push(HOST_TIMING_MAGIC); + b.extend_from_slice(&t.pts_ns.to_le_bytes()); + b.extend_from_slice(&t.host_us.to_le_bytes()); + b +} + +/// Parse a [`HOST_TIMING_MAGIC`] datagram → [`HostTiming`]. `None` on bad tag or a short buffer +/// (the fixed length bounds every read before it happens). +pub fn decode_host_timing_datagram(b: &[u8]) -> Option { + if b.len() < HOST_TIMING_LEN || b[0] != HOST_TIMING_MAGIC { + return None; + } + Some(HostTiming { + pts_ns: u64::from_le_bytes(b[1..9].try_into().unwrap()), + host_us: u32::from_le_bytes(b[9..13].try_into().unwrap()), + }) +} + /// Async framed-message IO over a quinn stream (`u16 LE length || payload`). pub mod io { /// Read one framed message (bounded at 64 KiB — control messages are tiny). 
@@ -2189,6 +2240,25 @@ mod tests { assert_eq!(decode_hdr_meta_datagram(&bad), None); } + #[test] + fn host_timing_datagram_roundtrip_and_truncation() { + let t = HostTiming { + pts_ns: 1_751_500_000_123_456_789, // a realistic mid-2025 CLOCK_REALTIME capture stamp + host_us: 4_321, + }; + let d = encode_host_timing_datagram(&t); + assert_eq!(d[0], HOST_TIMING_MAGIC); + assert_eq!(d.len(), 13); + assert_eq!(decode_host_timing_datagram(&d), Some(t)); + // Truncated buffers and a wrong tag are rejected (never partially read). + for n in 0..d.len() { + assert_eq!(decode_host_timing_datagram(&d[..n]), None); + } + let mut bad = d.clone(); + bad[0] = HDR_META_MAGIC; + assert_eq!(decode_host_timing_datagram(&bad), None); + } + #[test] fn hello_start_roundtrip() { let h = Hello { diff --git a/crates/punktfunk-host/src/punktfunk1.rs b/crates/punktfunk-host/src/punktfunk1.rs index f0fc460..45418c3 100644 --- a/crates/punktfunk-host/src/punktfunk1.rs +++ b/crates/punktfunk-host/src/punktfunk1.rs @@ -1155,6 +1155,11 @@ async fn serve_session( let stop_stream = stop.clone(); let fec_target_dp = fec_target.clone(); // data-plane handle to the adaptive-FEC target let conn_stream = conn.clone(); // for sending the source's real HDR metadata (0xCE) mid-stream + // Per-AU host-timing emission (0xCF): only when the client advertised the cap bit. All + // first-party clients do (the core connector ORs it in); an older client leaves it clear + // and gets no extra datagrams. + let timing_conn = + (hello.video_caps & punktfunk_core::quic::VIDEO_CAP_HOST_TIMING != 0).then(|| conn.clone()); let stats_dp = stats; // data-plane handle to the shared stats recorder // Short label for web-console stats captures: the client's cert-fingerprint prefix, else its // peer IP (no fingerprint = anonymous TOFU/--open client). 
@@ -1197,6 +1202,7 @@ async fn serve_session( &probe_rx, &probe_result_tx, &fec_target_dp, + timing_conn.as_ref(), ), Punktfunk1Source::Virtual => { let compositor = compositor @@ -1217,6 +1223,7 @@ async fn serve_session( probe_result_tx, fec_target: fec_target_dp, conn: conn_stream, + timing_conn, stats: stats_dp, client_label, launch: launch_for_dp, @@ -1810,6 +1817,7 @@ fn synthetic_stream( probe_rx: &std::sync::mpsc::Receiver, probe_result_tx: &tokio::sync::mpsc::UnboundedSender, fec_target: &AtomicU8, + timing_conn: Option<&quinn::Connection>, ) -> Result<()> { let interval = std::time::Duration::from_millis(1000 / 60); for idx in 0..frames { @@ -1820,9 +1828,19 @@ fn synthetic_stream( // Service speed-test probes between synthetic frames (loopback bandwidth tests). service_probes(session, stop, probe_rx, probe_result_tx); let data = test_frame(idx, 64 * 1024); + let pts_ns = now_ns(); session - .submit_frame(&data, now_ns(), (FLAG_PIC | FLAG_SOF) as u32) + .submit_frame(&data, pts_ns, (FLAG_PIC | FLAG_SOF) as u32) .map_err(|e| anyhow!("submit_frame: {e:?}"))?; + // Host timing (0xCF) for protocol tests: near-zero here (no capture/encode), but it + // proves the plane end-to-end on a pure loopback run. + if let Some(tc) = timing_conn { + let t = punktfunk_core::quic::HostTiming { + pts_ns, + host_us: (now_ns().saturating_sub(pts_ns) / 1000).min(u32::MAX as u64) as u32, + }; + let _ = tc.send_datagram(punktfunk_core::quic::encode_host_timing_datagram(&t).into()); + } std::thread::sleep(interval); } tracing::info!(frames, "synthetic stream complete"); @@ -2404,6 +2422,9 @@ fn send_loop( burst_cap: usize, fec_target: Arc, stats: SendStats, + // `Some` = the client advertised VIDEO_CAP_HOST_TIMING: emit one 0xCF datagram per AU right + // after its last packet left the socket (capture→sent, the whole host pipeline incl. pacing). 
+ timing_conn: Option, ) { boost_thread_priority(false); // transmit thread: above-normal (Apollo's encoder-thread level) let mut last_perf = std::time::Instant::now(); @@ -2446,6 +2467,25 @@ fn send_loop( burst_cap, ) { Ok(stat) => { + // Host timing (0xCF): stamped now — the AU's packets have fully left the + // socket — against the same capture anchor the wire pts carries, so the + // client's per-frame math tiles exactly (network = its host+network − this). + // Best-effort like every side-plane datagram; skipped for speed-test filler + // (FLAG_PROBE isn't video and its pts is the burst clock). + if let Some(tc) = &timing_conn { + if msg.flags & FLAG_PROBE as u32 == 0 { + let host_us = (now_ns().saturating_sub(msg.capture_ns) / 1000) + .min(u32::MAX as u64) + as u32; + let t = punktfunk_core::quic::HostTiming { + pts_ns: msg.capture_ns, + host_us, + }; + let _ = tc.send_datagram( + punktfunk_core::quic::encode_host_timing_datagram(&t).into(), + ); + } + } if perf || stats.rec.is_armed() { // `encode_us`/`pace_us`/fps are valid for every frame (always measured), // including the Windows relay + tail-drain frames. The cap/submit/wait splits @@ -2719,6 +2759,10 @@ struct SessionContext { fec_target: Arc, /// The QUIC control connection (carries host→client 0xCE source-HDR metadata mid-stream). conn: quinn::Connection, + /// `Some` when the client advertised [`punktfunk_core::quic::VIDEO_CAP_HOST_TIMING`]: the send + /// thread emits one 0xCF datagram per AU (capture→sent µs) on it, so the client can split its + /// `host+network` latency stage. `None` = older client, no emission. + timing_conn: Option, /// Shared streaming-stats recorder. The capture loop reads `is_armed()` per frame to decide /// whether to measure the per-stage split; the send thread builds + pushes the aggregated /// `StatsSample` at its 2 s boundary. 
@@ -2764,6 +2808,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { probe_result_tx, fec_target, conn, + timing_conn, stats, client_label, launch, @@ -2870,6 +2915,7 @@ fn virtual_stream(ctx: SessionContext) -> Result<()> { burst_cap, fec_target, send_stats, + timing_conn, ) } }) diff --git a/include/punktfunk_core.h b/include/punktfunk_core.h index cd2226e..04796c2 100644 --- a/include/punktfunk_core.h +++ b/include/punktfunk_core.h @@ -254,6 +254,16 @@ #define VIDEO_CAP_444 4 #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// [`Hello::video_caps`] bit: the client consumes per-AU host-timing datagrams +// ([`HOST_TIMING_MAGIC`], 0xCF) — the host's capture→send duration per frame, letting the client +// split its `host+network` latency stage into `host` and `network` +// (design/stats-unification.md Phase 2). The host emits 0xCF ONLY when this bit is set (an older +// host ignores it and simply never sends any); a client that doesn't set it keeps the combined +// stage. Purely observability — never changes what the host encodes. +#define VIDEO_CAP_HOST_TIMING 8 +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // [`Hello::video_codecs`] bit: the client can decode H.264 / AVC. The GPU-less **software** // encode path (openh264) emits H.264, so a client that wants to stream from a software host MUST @@ -395,6 +405,13 @@ #define HDR_META_MAGIC 206 #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Per-AU host-timing datagram tag, host → client (see [`HostTiming`]). Next tag after +// [`HDR_META_MAGIC`]. Emitted once per access unit, right after its last packet left the host's +// socket, and only when the client advertised [`VIDEO_CAP_HOST_TIMING`]. +#define HOST_TIMING_MAGIC 207 +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // CICP colour-primaries code point: BT.709. 
#define ColorInfo_CP_BT709 1 @@ -672,6 +689,21 @@ typedef struct { } PunktfunkHdrMeta; #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// One access unit's host-side processing time ([`punktfunk_connection_next_host_timing`]): +// capture → fully sent, i.e. the whole host pipeline (capture read/convert, encode, FEC+seal, +// paced send). Correlate to the AU whose `PunktfunkFrame::pts_ns` equals `pts_ns`, then +// `network = (received_instant + clock_offset − pts_ns) − host_us` — the unified stats HUD's +// `host` / `network` split (design/stats-unification.md Phase 2). Best-effort: a lost datagram +// means that frame simply contributes no sample. +typedef struct { + // The AU's capture stamp (host capture clock — matches `PunktfunkFrame::pts_ns` exactly). + uint64_t pts_ns; + // Host capture→sent duration, µs. + uint32_t host_us; +} PunktfunkHostTiming; +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // One rich client→host input for the host's virtual DualSense // ([`punktfunk_connection_send_rich_input`]): a touchpad contact or a motion sample. Set `kind` @@ -1189,6 +1221,22 @@ PunktfunkStatus punktfunk_connection_next_hdr_meta(PunktfunkConnection *c, uint32_t timeout_ms); #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Pull the next per-AU host timing (0xCF) into `*out`: the host's capture→sent duration for one +// access unit, correlated to the AU by `pts_ns` (see [`PunktfunkHostTiming`]). +// [`PunktfunkStatus::NoFrame`] on timeout, [`PunktfunkStatus::Closed`] once the session ended. +// A stats consumer drains this non-blockingly (`timeout_ms = 0`) alongside its frame samples; +// an older host never emits any — keep showing the combined `host+network` stage then. Same +// threading rules as [`punktfunk_connection_next_rumble`] (one puller, may run alongside the +// other planes). +// +// # Safety +// `c` is a valid connection handle; `out` is writable for one `PunktfunkHostTiming`. 
+PunktfunkStatus punktfunk_connection_next_host_timing(PunktfunkConnection *c, + PunktfunkHostTiming *out, + uint32_t timeout_ms); +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // Read the session's resolved colour signalling + encode bit depth (from the host's Welcome). // Each out pointer is filled when non-NULL: `primaries`/`transfer`/`matrix` are CICP code points