From 05bc9ab22c7d12bd4d74a989cf8267e002f4f43f Mon Sep 17 00:00:00 2001 From: enricobuehler Date: Fri, 12 Jun 2026 11:20:20 +0000 Subject: [PATCH] feat(latency): wall-clock skew handshake for cross-machine latency measurement MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ClockProbe/ClockEcho on the QUIC control stream — 8 NTP-style rounds right after Start; the min-RTT sample gives the host-client clock offset (clock_offset_ns estimator in punktfunk-core). The client adds the offset to its receive instant before differencing against the AU pts_ns, so the capture->reassembled latency percentiles are valid across machines (skew_corrected=true), not just same-host. Back-compat: an old host that doesn't answer the probe times out and the client falls back to a shared-clock assumption (skew_corrected=false). Host adds one ClockProbe dispatch arm in the control task; the client runs clock_sync after Start, before the --remode/--speed-test tasks take the stream. Validated cross-LAN (GNOME box -> dev box): offset ~ -1.57 ms (reproducible), rtt ~140 us, p50 1.30 ms skew-corrected capture->reassembled — the offset is exactly the systematic error the handshake removes. Unit tests for the message codecs and the min-RTT offset estimator. Roadmap §12: skew handshake done; remaining for true glass-to-glass is the Apple client present-stamp (decode->present) plus the host render->capture term. Co-Authored-By: Claude Opus 4.8 (1M context) --- CLAUDE.md | 6 +- crates/punktfunk-client-rs/src/main.rs | 58 ++++++++++- crates/punktfunk-core/src/quic.rs | 128 +++++++++++++++++++++++++ crates/punktfunk-host/src/m3.rs | 17 +++- docs/roadmap.md | 19 ++-- include/punktfunk_core.h | 10 ++ 6 files changed, 225 insertions(+), 13 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index c6e564c..0dd57c8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -50,7 +50,11 @@ Low-latency desktop/game streaming stack, Linux-first, with a shared Rust protoc **Mid-stream mode renegotiation**: `Reconfigure` on the still-open control stream — the host rebuilds output+encoder at the new mode in ~90 ms while the data plane runs on (validated live: one .h265 with 720p and 1080p segments). Measured on-box at 720p120: 1680/1680 frames, **p50 0.83 ms** - capture→…→reassembled; audio measured live (~200 pkts/s). `punktfunk-client-rs` is the + capture→…→reassembled; audio measured live (~200 pkts/s). A **wall-clock skew handshake** + (`ClockProbe`/`ClockEcho`, 8 NTP rounds after `Start`, `clock_offset_ns`) aligns the client to the + host clock, so that latency is now valid **cross-machine** (`skew_corrected=true`) — measured GNOME + box → dev box over the LAN: **p50 1.30 ms** (the −1.57 ms inter-box clock offset removed). + `punktfunk-client-rs` is the working reference client (`--pin`, datagram counters, `--input-test` incl. gamepad). The embeddable connector (`NativeClient`) exposes it all over the C ABI: `punktfunk_connect` (pin/TOFU) + `next_au`/`next_audio`/`next_rumble`/`next_hidout`/`send_input`/ diff --git a/crates/punktfunk-client-rs/src/main.rs b/crates/punktfunk-client-rs/src/main.rs index d452f99..8c3286a 100644 --- a/crates/punktfunk-client-rs/src/main.rs +++ b/crates/punktfunk-client-rs/src/main.rs @@ -45,7 +45,8 @@ use punktfunk_core::config::Role; use punktfunk_core::input::{InputEvent, InputKind}; use punktfunk_core::packet::FLAG_PROBE; use punktfunk_core::quic::{ - endpoint, io, Hello, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, Start, Welcome, + endpoint, io, ClockEcho, ClockProbe, Hello, ProbeRequest, ProbeResult, Reconfigure, + Reconfigured, Start, Welcome, }; use punktfunk_core::transport::UdpTransport; use punktfunk_core::{CompositorPref, Mode, PunktfunkError, Session}; @@ -331,6 +332,40 @@ fn discover(secs: u64) -> Result<()> { Ok(()) } +/// Run the wall-clock skew handshake: `ROUNDS` `ClockProbe`/`ClockEcho` round-trips on the control +/// stream, returning the host−client clock offset (ns) from the minimum-RTT sample, or `None` if the +/// host never answers (an old host — the caller then assumes a shared clock). Each read is bounded so +/// a silent old host can't wedge session start. +async fn clock_sync(send: &mut quinn::SendStream, recv: &mut quinn::RecvStream) -> Option { + const ROUNDS: usize = 8; + let read_timeout = std::time::Duration::from_secs(2); + let mut samples: Vec<(u64, u64, u64, u64)> = Vec::with_capacity(ROUNDS); + for _ in 0..ROUNDS { + let t1 = now_ns(); + let probe = ClockProbe { t1_ns: t1 }.encode(); + if io::write_msg(send, &probe).await.is_err() { + break; + } + let read = tokio::time::timeout(read_timeout, io::read_msg(recv)).await; + let echo = match read { + Ok(Ok(b)) => match ClockEcho::decode(&b) { + Ok(e) => e, + Err(_) => break, // not a ClockEcho -> give up on skew + }, + _ => break, // timeout or stream error -> old host / no skew support + }; + samples.push((echo.t1_ns, echo.t2_ns, echo.t3_ns, now_ns())); + } + let (offset, rtt) = punktfunk_core::quic::clock_offset_ns(&samples)?; + tracing::info!( + offset_ns = offset, + rtt_us = rtt / 1000, + rounds = samples.len(), + "clock skew estimated (host-client); latency now cross-machine valid" + ); + Some(offset) +} + async fn session(args: Args) -> Result<()> { let remote: std::net::SocketAddr = args.connect.parse().context("--connect host:port")?; let identity = load_or_create_identity()?; @@ -392,6 +427,11 @@ async fn session(args: Args) -> Result<()> { ) .await?; + // Wall-clock skew handshake on the still-private control stream (before --remode/--speed-test + // take it): align our clock to the host's so the per-frame capture→reassembled latency is valid + // across machines. `None` ⇒ an old host that doesn't answer — fall back to a shared clock (0). + let clock_offset_ns = clock_sync(&mut send, &mut recv).await; + // Speed-test accumulators: the data-plane loop folds each FLAG_PROBE filler AU in here; the // --speed-test reporter below reads them once the host's ProbeResult lands. first/last hold // now_ns timestamps of the receive window (0 = unset). @@ -761,6 +801,12 @@ async fn session(args: Args) -> Result<()> { probe_last_ns.clone(), ); + // Express our receive time in the host clock before differencing against the host-stamped + // capture pts. 0 ⇒ same-host or an old host that didn't answer the skew handshake (the latency + // is then only valid same-host, as before). + let clock_offset = clock_offset_ns.unwrap_or(0); + let skew_corrected = clock_offset_ns.is_some(); + // Data plane on a blocking thread (native threads only on the frame path). let result = tokio::task::spawn_blocking(move || -> Result<()> { let transport = @@ -810,8 +856,10 @@ async fn session(args: Args) -> Result<()> { continue; } bytes += frame.data.len() as u64; - // The host stamps pts with its capture wall clock; same-host runs share it. - let lat = now_ns().saturating_sub(frame.pts_ns); + // capture→reassembled: our receive instant in the host clock (now + offset) + // minus the host's capture pts. offset is 0 same-host / old host. + let lat = (now_ns() as i128 + clock_offset as i128 - frame.pts_ns as i128) + .max(0) as u64; if lat > 0 && lat < 10_000_000_000 { latencies_us.push(lat / 1000); } @@ -856,7 +904,9 @@ async fn session(args: Args) -> Result<()> { lat_p95_us = pct(0.95), lat_p99_us = pct(0.99), lat_max_us = latencies_us.last().copied().unwrap_or(0), - "punktfunk/1 stream complete (capture→reassembled latency, same-host clock)" + skew_corrected, + "punktfunk/1 stream complete (capture→reassembled latency; skew_corrected=true ⇒ \ + cross-machine valid, false ⇒ same-host clock)" ); if expected > 0 { anyhow::ensure!(mismatched == 0, "{mismatched} corrupted frames"); diff --git a/crates/punktfunk-core/src/quic.rs b/crates/punktfunk-core/src/quic.rs index 476c762..9f5e25b 100644 --- a/crates/punktfunk-core/src/quic.rs +++ b/crates/punktfunk-core/src/quic.rs @@ -144,6 +144,44 @@ pub struct ProbeResult { pub duration_ms: u32, } +/// `client → host`, right after [`Start`]: one round of the wall-clock skew handshake. The client +/// stamps `t1_ns` (its monotonic-since-epoch clock) and sends; the host echoes it in [`ClockEcho`] +/// with its own receive/send stamps. A few rounds let the client estimate the host↔client clock +/// offset, so the per-frame `capture→reassembled` latency (the AU `pts_ns` is the host's capture +/// clock) is meaningful across machines, not just same-host. An old host ignores it (the client +/// times out and assumes a shared clock). +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct ClockProbe { + pub t1_ns: u64, +} + +/// `host → client`: answer to [`ClockProbe`]. `t2_ns` is when the host received the probe and +/// `t3_ns` when it sent this echo (both the host clock); `t1_ns` is the client's send stamp echoed +/// back. With the client's receive time `t4`, offset = ((t2−t1)+(t3−t4))/2 (host minus client) and +/// RTT = (t4−t1)−(t3−t2). See [`clock_offset_ns`]. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct ClockEcho { + pub t1_ns: u64, + pub t2_ns: u64, + pub t3_ns: u64, +} + +/// Estimate the host↔client clock offset (**host minus client**, ns) and RTT (ns) from skew-handshake +/// samples `(t1, t2, t3, t4)` — NTP's formula, taking the **minimum-RTT** sample (least queuing +/// noise; also discards the first round's host-setup latency). Offset is positive when the host +/// clock is ahead of the client's; add it to a client timestamp to express it in the host clock. +/// Returns `None` for an empty sample set. +pub fn clock_offset_ns(samples: &[(u64, u64, u64, u64)]) -> Option<(i64, u64)> { + samples + .iter() + .map(|&(t1, t2, t3, t4)| { + let rtt = ((t4 as i128 - t1 as i128) - (t3 as i128 - t2 as i128)).max(0) as u64; + let offset = (((t2 as i128 - t1 as i128) + (t3 as i128 - t4 as i128)) / 2) as i64; + (offset, rtt) + }) + .min_by_key(|&(_, rtt)| rtt) +} + /// Type byte of [`Reconfigure`] (first byte after the magic). pub const MSG_RECONFIGURE: u8 = 0x01; /// Type byte of [`Reconfigured`]. @@ -152,6 +190,10 @@ pub const MSG_RECONFIGURED: u8 = 0x02; pub const MSG_PROBE_REQUEST: u8 = 0x20; /// Type byte of [`ProbeResult`]. pub const MSG_PROBE_RESULT: u8 = 0x21; +/// Type byte of [`ClockProbe`]. +pub const MSG_CLOCK_PROBE: u8 = 0x30; +/// Type byte of [`ClockEcho`]. +pub const MSG_CLOCK_ECHO: u8 = 0x31; // --------------------------------------------------------------------------------------------- // Pairing ceremony (typed control messages): instead of a session Hello, a client may open @@ -668,6 +710,50 @@ impl ProbeResult { } } +impl ClockProbe { + pub fn encode(&self) -> Vec { + // magic[0..4] type[4] t1[5..13] + let mut b = Vec::with_capacity(13); + b.extend_from_slice(CTL_MAGIC); + b.push(MSG_CLOCK_PROBE); + b.extend_from_slice(&self.t1_ns.to_le_bytes()); + b + } + + pub fn decode(b: &[u8]) -> Result { + if b.len() != 13 || &b[0..4] != CTL_MAGIC || b[4] != MSG_CLOCK_PROBE { + return Err(PunktfunkError::InvalidArg("bad ClockProbe")); + } + Ok(ClockProbe { + t1_ns: u64::from_le_bytes(b[5..13].try_into().unwrap()), + }) + } +} + +impl ClockEcho { + pub fn encode(&self) -> Vec { + // magic[0..4] type[4] t1[5..13] t2[13..21] t3[21..29] + let mut b = Vec::with_capacity(29); + b.extend_from_slice(CTL_MAGIC); + b.push(MSG_CLOCK_ECHO); + b.extend_from_slice(&self.t1_ns.to_le_bytes()); + b.extend_from_slice(&self.t2_ns.to_le_bytes()); + b.extend_from_slice(&self.t3_ns.to_le_bytes()); + b + } + + pub fn decode(b: &[u8]) -> Result { + if b.len() != 29 || &b[0..4] != CTL_MAGIC || b[4] != MSG_CLOCK_ECHO { + return Err(PunktfunkError::InvalidArg("bad ClockEcho")); + } + Ok(ClockEcho { + t1_ns: u64::from_le_bytes(b[5..13].try_into().unwrap()), + t2_ns: u64::from_le_bytes(b[13..21].try_into().unwrap()), + t3_ns: u64::from_le_bytes(b[21..29].try_into().unwrap()), + }) + } +} + /// Frame a message for the control stream: `u16 LE length || payload`. pub fn frame(payload: &[u8]) -> Vec { let mut b = Vec::with_capacity(2 + payload.len()); @@ -1434,6 +1520,48 @@ mod tests { assert!(ProbeResult::decode(&req.encode()).is_err()); } + #[test] + fn clock_messages_roundtrip() { + let probe = ClockProbe { + t1_ns: 1_700_000_000_123, + }; + assert_eq!(ClockProbe::decode(&probe.encode()).unwrap(), probe); + let echo = ClockEcho { + t1_ns: 1_700_000_000_123, + t2_ns: 1_700_000_050_456, + t3_ns: 1_700_000_050_789, + }; + assert_eq!(ClockEcho::decode(&echo.encode()).unwrap(), echo); + // Disjoint from the other control messages (distinct type bytes). + assert!(ClockProbe::decode(&echo.encode()).is_err()); + assert!(ProbeRequest::decode(&probe.encode()).is_err()); + assert!(ClockEcho::decode(&probe.encode()).is_err()); + } + + #[test] + fn clock_offset_picks_min_rtt_and_recovers_offset() { + // Host clock is +1_000_000 ns ahead of the client. Construct samples where a symmetric + // round-trip recovers exactly that offset, and a noisy (asymmetric, high-RTT) sample is + // present but must be ignored by the min-RTT selection. + const OFF: i64 = 1_000_000; + // Clean sample: client t1=0, one-way=200µs each way → t2 = t1 + 200_000 + OFF (host clock), + // t3 = t2 + 50_000 (host processing), t4 = t3 - OFF + 200_000 (back in client clock). + let t1 = 0u64; + let t2 = (t1 as i64 + 200_000 + OFF) as u64; + let t3 = t2 + 50_000; + let t4 = (t3 as i64 - OFF + 200_000) as u64; + // Noisy sample: same offset but a fat, asymmetric RTT (slow return path) — higher RTT. + let n1 = 1_000_000u64; + let n2 = (n1 as i64 + 200_000 + OFF) as u64; + let n3 = n2 + 50_000; + let n4 = (n3 as i64 - OFF + 5_000_000) as u64; // 5 ms return → big RTT + let (offset, rtt) = + clock_offset_ns(&[(n1, n2, n3, n4), (t1, t2, t3, t4)]).expect("non-empty"); + assert_eq!(offset, OFF, "min-RTT sample recovers the offset exactly"); + assert_eq!(rtt, 400_000, "min-RTT sample's RTT (2x200us), not the noisy 5ms one"); + assert!(clock_offset_ns(&[]).is_none()); + } + #[test] fn control_messages_disjoint_from_hello() { // A Hello uses MAGIC (PKF1); control messages use CTL_MAGIC (PKFc). No Hello — at diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index 6e45219..46911eb 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -27,8 +27,8 @@ use punktfunk_core::config::{CompositorPref, FecConfig, FecScheme, GamepadPref, use punktfunk_core::input::{InputEvent, InputKind}; use punktfunk_core::packet::{FLAG_PIC, FLAG_PROBE, FLAG_SOF}; use punktfunk_core::quic::{ - endpoint, io, Hello, PairChallenge, PairProof, PairRequest, PairResult, ProbeRequest, - ProbeResult, Reconfigure, Reconfigured, Start, Welcome, + endpoint, io, ClockEcho, ClockProbe, Hello, PairChallenge, PairProof, PairRequest, PairResult, + ProbeRequest, ProbeResult, Reconfigure, Reconfigured, Start, Welcome, }; use punktfunk_core::transport::UdpTransport; use punktfunk_core::Session; @@ -545,6 +545,19 @@ async fn serve_session( if probe_tx.send(req).is_err() { break; // data plane gone } + } else if let Ok(probe) = ClockProbe::decode(&msg) { + // Wall-clock skew handshake: echo the client's t1 with our receive (t2) and + // send (t3) stamps, both in the host clock the AU pts_ns uses. Answered + // inline on the control stream — cheap, no data-plane involvement. + let t2_ns = now_ns(); + let echo = ClockEcho { + t1_ns: probe.t1_ns, + t2_ns, + t3_ns: now_ns(), + }; + if io::write_msg(&mut ctrl_send, &echo.encode()).await.is_err() { + break; + } } else { tracing::warn!("unknown control message — ignoring"); } diff --git a/docs/roadmap.md b/docs/roadmap.md index ba00346..2085c7e 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -295,15 +295,22 @@ buffer; `sendmmsg`/`recvmmsg` batching; the capture-timestamp anchor placement. `sync_channel(3)` with backpressure. Removes the serialization (~2–8 ms @60–120 fps) and is the substrate the slice wrapper needs. Real-NIC soak (host on the Ubuntu/GNOME box, client over the LAN): `send_dropped=0` at 720p60 / 1080p120, and a 1 Gbps probe pushed 625 MB in 5 s clean. +- **Done & live (skew handshake landed 2026-06-12):** **wall-clock skew handshake** — `ClockProbe`/ + `ClockEcho` on the control stream (8 NTP-style rounds right after `Start`; min-RTT sample → + host−client offset; `clock_offset_ns`). The client adds the offset to its receive instant before + differencing against the AU `pts_ns`, so the `capture→reassembled` percentiles are now valid + **across machines** (reported `skew_corrected=true`), not just same-host. Back-compat: an old host + that doesn't answer times out → `skew_corrected=false` (shared-clock assumption, as before). + **Remaining for true glass-to-glass**: the **client present-stamp** (decode→present term) — only + the Apple client presents today, so it needs the connector to expose the offset + an Apple + present-time probe; and the **render→capture** term (compare the PipeWire buffer presentation + timestamp to our capture stamp). `tools/latency-probe` is still the cross-machine orchestrator. - **Bigger bets (ordered, deferred — need real-NIC/GPU/Mac validation):** - 1. **Wall-clock skew handshake + glass-to-glass probe** (`tools/latency-probe`) — measures the two - biggest unmeasured terms (render→capture, decode→present); client present-stamp vs the AU's - `pts_ns` (already attached). - 2. **CUDA stream+event** to drop one of two redundant `cuCtxSynchronize` in `submit_cuda` (keep the + 1. **CUDA stream+event** to drop one of two redundant `cuCtxSynchronize` in `submit_cuda` (keep the copy) — ~0.1–0.4 ms@720p, ~1 ms@5K; only if per-stage timing proves the sync is on the path. - 3. **Stage-2 Apple presenter** (`VTDecompressionSession` → `CAMetalLayer`, hand-paced) — ~0.5 refresh + 2. **Stage-2 Apple presenter** (`VTDecompressionSession` → `CAMetalLayer`, hand-paced) — ~0.5 refresh off the present tail (biggest client win at 60 Hz); gate on the probe proving present is real. - 4. **NVENC slice-mode wrapper** (roadmap §2 sub-frame pipelining) — per-slice transmit overlaps + 3. **NVENC slice-mode wrapper** (roadmap §2 sub-frame pipelining) — per-slice transmit overlaps encode+send within a frame (~3–6 ms at 4K/5K/IDR); large + driver-ABI-fragile, on top of the thread split, only after measurement justifies it. diff --git a/include/punktfunk_core.h b/include/punktfunk_core.h index 0361c56..7c0b996 100644 --- a/include/punktfunk_core.h +++ b/include/punktfunk_core.h @@ -166,6 +166,16 @@ #define MSG_PROBE_RESULT 33 #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Type byte of [`ClockProbe`]. +#define MSG_CLOCK_PROBE 48 +#endif + +#if defined(PUNKTFUNK_FEATURE_QUIC) +// Type byte of [`ClockEcho`]. +#define MSG_CLOCK_ECHO 49 +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // Type byte of [`PairRequest`]. #define MSG_PAIR_REQUEST 16