feat(latency): wall-clock skew handshake for cross-machine latency measurement
ci / rust (push) Has been cancelled

ClockProbe/ClockEcho on the QUIC control stream — 8 NTP-style rounds right after
Start; the min-RTT sample gives the host-client clock offset (clock_offset_ns
estimator in punktfunk-core). The client adds the offset to its receive instant
before differencing against the AU pts_ns, so the capture->reassembled latency
percentiles are valid across machines (skew_corrected=true), not just same-host.
Back-compat: an old host that doesn't answer the probe times out and the client
falls back to a shared-clock assumption (skew_corrected=false).

Host adds one ClockProbe dispatch arm in the control task; the client runs
clock_sync after Start, before the --remode/--speed-test tasks take the stream.

Validated cross-LAN (GNOME box -> dev box): offset ~ -1.57 ms (reproducible),
rtt ~140 us, p50 1.30 ms skew-corrected capture->reassembled — the offset is
exactly the systematic error the handshake removes. Unit tests for the message
codecs and the min-RTT offset estimator.

Roadmap §12: skew handshake done; remaining for true glass-to-glass is the Apple
client present-stamp (decode->present) plus the host render->capture term.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-12 11:20:20 +00:00
parent 50c9db785a
commit 05bc9ab22c
6 changed files with 225 additions and 13 deletions
+54 -4
View File
@@ -45,7 +45,8 @@ use punktfunk_core::config::Role;
use punktfunk_core::input::{InputEvent, InputKind};
use punktfunk_core::packet::FLAG_PROBE;
use punktfunk_core::quic::{
endpoint, io, Hello, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, Start, Welcome,
endpoint, io, ClockEcho, ClockProbe, Hello, ProbeRequest, ProbeResult, Reconfigure,
Reconfigured, Start, Welcome,
};
use punktfunk_core::transport::UdpTransport;
use punktfunk_core::{CompositorPref, Mode, PunktfunkError, Session};
@@ -331,6 +332,40 @@ fn discover(secs: u64) -> Result<()> {
Ok(())
}
/// Run the wall-clock skew handshake: `ROUNDS` `ClockProbe`/`ClockEcho` round-trips on the control
/// stream, returning the hostclient clock offset (ns) from the minimum-RTT sample, or `None` if the
/// host never answers (an old host — the caller then assumes a shared clock). Each read is bounded so
/// a silent old host can't wedge session start.
async fn clock_sync(send: &mut quinn::SendStream, recv: &mut quinn::RecvStream) -> Option<i64> {
const ROUNDS: usize = 8;
let read_timeout = std::time::Duration::from_secs(2);
let mut samples: Vec<(u64, u64, u64, u64)> = Vec::with_capacity(ROUNDS);
for _ in 0..ROUNDS {
let t1 = now_ns();
let probe = ClockProbe { t1_ns: t1 }.encode();
if io::write_msg(send, &probe).await.is_err() {
break;
}
let read = tokio::time::timeout(read_timeout, io::read_msg(recv)).await;
let echo = match read {
Ok(Ok(b)) => match ClockEcho::decode(&b) {
Ok(e) => e,
Err(_) => break, // not a ClockEcho -> give up on skew
},
_ => break, // timeout or stream error -> old host / no skew support
};
samples.push((echo.t1_ns, echo.t2_ns, echo.t3_ns, now_ns()));
}
let (offset, rtt) = punktfunk_core::quic::clock_offset_ns(&samples)?;
tracing::info!(
offset_ns = offset,
rtt_us = rtt / 1000,
rounds = samples.len(),
"clock skew estimated (host-client); latency now cross-machine valid"
);
Some(offset)
}
async fn session(args: Args) -> Result<()> {
let remote: std::net::SocketAddr = args.connect.parse().context("--connect host:port")?;
let identity = load_or_create_identity()?;
@@ -392,6 +427,11 @@ async fn session(args: Args) -> Result<()> {
)
.await?;
// Wall-clock skew handshake on the still-private control stream (before --remode/--speed-test
// take it): align our clock to the host's so the per-frame capture→reassembled latency is valid
// across machines. `None` ⇒ an old host that doesn't answer — fall back to a shared clock (0).
let clock_offset_ns = clock_sync(&mut send, &mut recv).await;
// Speed-test accumulators: the data-plane loop folds each FLAG_PROBE filler AU in here; the
// --speed-test reporter below reads them once the host's ProbeResult lands. first/last hold
// now_ns timestamps of the receive window (0 = unset).
@@ -761,6 +801,12 @@ async fn session(args: Args) -> Result<()> {
probe_last_ns.clone(),
);
// Express our receive time in the host clock before differencing against the host-stamped
// capture pts. 0 ⇒ same-host or an old host that didn't answer the skew handshake (the latency
// is then only valid same-host, as before).
let clock_offset = clock_offset_ns.unwrap_or(0);
let skew_corrected = clock_offset_ns.is_some();
// Data plane on a blocking thread (native threads only on the frame path).
let result = tokio::task::spawn_blocking(move || -> Result<()> {
let transport =
@@ -810,8 +856,10 @@ async fn session(args: Args) -> Result<()> {
continue;
}
bytes += frame.data.len() as u64;
// The host stamps pts with its capture wall clock; same-host runs share it.
let lat = now_ns().saturating_sub(frame.pts_ns);
// capture→reassembled: our receive instant in the host clock (now + offset)
// minus the host's capture pts. offset is 0 same-host / old host.
let lat = (now_ns() as i128 + clock_offset as i128 - frame.pts_ns as i128)
.max(0) as u64;
if lat > 0 && lat < 10_000_000_000 {
latencies_us.push(lat / 1000);
}
@@ -856,7 +904,9 @@ async fn session(args: Args) -> Result<()> {
lat_p95_us = pct(0.95),
lat_p99_us = pct(0.99),
lat_max_us = latencies_us.last().copied().unwrap_or(0),
"punktfunk/1 stream complete (capture→reassembled latency, same-host clock)"
skew_corrected,
"punktfunk/1 stream complete (capture→reassembled latency; skew_corrected=true ⇒ \
cross-machine valid, false ⇒ same-host clock)"
);
if expected > 0 {
anyhow::ensure!(mismatched == 0, "{mismatched} corrupted frames");
+128
View File
@@ -144,6 +144,44 @@ pub struct ProbeResult {
pub duration_ms: u32,
}
/// `client → host`, right after [`Start`]: one round of the wall-clock skew handshake. The client
/// stamps `t1_ns` (its monotonic-since-epoch clock) and sends; the host echoes it in [`ClockEcho`]
/// with its own receive/send stamps. A few rounds let the client estimate the host↔client clock
/// offset, so the per-frame `capture→reassembled` latency (the AU `pts_ns` is the host's capture
/// clock) is meaningful across machines, not just same-host. An old host ignores it (the client
/// times out and assumes a shared clock).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ClockProbe {
pub t1_ns: u64,
}
/// `host → client`: answer to [`ClockProbe`]. `t2_ns` is when the host received the probe and
/// `t3_ns` when it sent this echo (both the host clock); `t1_ns` is the client's send stamp echoed
/// back. With the client's receive time `t4`, offset = ((t2t1)+(t3t4))/2 (host minus client) and
/// RTT = (t4t1)(t3t2). See [`clock_offset_ns`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ClockEcho {
pub t1_ns: u64,
pub t2_ns: u64,
pub t3_ns: u64,
}
/// Estimate the host↔client clock offset (**host minus client**, ns) and RTT (ns) from skew-handshake
/// samples `(t1, t2, t3, t4)` — NTP's formula, taking the **minimum-RTT** sample (least queuing
/// noise; also discards the first round's host-setup latency). Offset is positive when the host
/// clock is ahead of the client's; add it to a client timestamp to express it in the host clock.
/// Returns `None` for an empty sample set.
pub fn clock_offset_ns(samples: &[(u64, u64, u64, u64)]) -> Option<(i64, u64)> {
samples
.iter()
.map(|&(t1, t2, t3, t4)| {
let rtt = ((t4 as i128 - t1 as i128) - (t3 as i128 - t2 as i128)).max(0) as u64;
let offset = (((t2 as i128 - t1 as i128) + (t3 as i128 - t4 as i128)) / 2) as i64;
(offset, rtt)
})
.min_by_key(|&(_, rtt)| rtt)
}
/// Type byte of [`Reconfigure`] (first byte after the magic).
pub const MSG_RECONFIGURE: u8 = 0x01;
/// Type byte of [`Reconfigured`].
@@ -152,6 +190,10 @@ pub const MSG_RECONFIGURED: u8 = 0x02;
pub const MSG_PROBE_REQUEST: u8 = 0x20;
/// Type byte of [`ProbeResult`].
pub const MSG_PROBE_RESULT: u8 = 0x21;
/// Type byte of [`ClockProbe`].
pub const MSG_CLOCK_PROBE: u8 = 0x30;
/// Type byte of [`ClockEcho`].
pub const MSG_CLOCK_ECHO: u8 = 0x31;
// ---------------------------------------------------------------------------------------------
// Pairing ceremony (typed control messages): instead of a session Hello, a client may open
@@ -668,6 +710,50 @@ impl ProbeResult {
}
}
impl ClockProbe {
pub fn encode(&self) -> Vec<u8> {
// magic[0..4] type[4] t1[5..13]
let mut b = Vec::with_capacity(13);
b.extend_from_slice(CTL_MAGIC);
b.push(MSG_CLOCK_PROBE);
b.extend_from_slice(&self.t1_ns.to_le_bytes());
b
}
pub fn decode(b: &[u8]) -> Result<ClockProbe> {
if b.len() != 13 || &b[0..4] != CTL_MAGIC || b[4] != MSG_CLOCK_PROBE {
return Err(PunktfunkError::InvalidArg("bad ClockProbe"));
}
Ok(ClockProbe {
t1_ns: u64::from_le_bytes(b[5..13].try_into().unwrap()),
})
}
}
impl ClockEcho {
pub fn encode(&self) -> Vec<u8> {
// magic[0..4] type[4] t1[5..13] t2[13..21] t3[21..29]
let mut b = Vec::with_capacity(29);
b.extend_from_slice(CTL_MAGIC);
b.push(MSG_CLOCK_ECHO);
b.extend_from_slice(&self.t1_ns.to_le_bytes());
b.extend_from_slice(&self.t2_ns.to_le_bytes());
b.extend_from_slice(&self.t3_ns.to_le_bytes());
b
}
pub fn decode(b: &[u8]) -> Result<ClockEcho> {
if b.len() != 29 || &b[0..4] != CTL_MAGIC || b[4] != MSG_CLOCK_ECHO {
return Err(PunktfunkError::InvalidArg("bad ClockEcho"));
}
Ok(ClockEcho {
t1_ns: u64::from_le_bytes(b[5..13].try_into().unwrap()),
t2_ns: u64::from_le_bytes(b[13..21].try_into().unwrap()),
t3_ns: u64::from_le_bytes(b[21..29].try_into().unwrap()),
})
}
}
/// Frame a message for the control stream: `u16 LE length || payload`.
pub fn frame(payload: &[u8]) -> Vec<u8> {
let mut b = Vec::with_capacity(2 + payload.len());
@@ -1434,6 +1520,48 @@ mod tests {
assert!(ProbeResult::decode(&req.encode()).is_err());
}
#[test]
fn clock_messages_roundtrip() {
let probe = ClockProbe {
t1_ns: 1_700_000_000_123,
};
assert_eq!(ClockProbe::decode(&probe.encode()).unwrap(), probe);
let echo = ClockEcho {
t1_ns: 1_700_000_000_123,
t2_ns: 1_700_000_050_456,
t3_ns: 1_700_000_050_789,
};
assert_eq!(ClockEcho::decode(&echo.encode()).unwrap(), echo);
// Disjoint from the other control messages (distinct type bytes).
assert!(ClockProbe::decode(&echo.encode()).is_err());
assert!(ProbeRequest::decode(&probe.encode()).is_err());
assert!(ClockEcho::decode(&probe.encode()).is_err());
}
#[test]
fn clock_offset_picks_min_rtt_and_recovers_offset() {
// Host clock is +1_000_000 ns ahead of the client. Construct samples where a symmetric
// round-trip recovers exactly that offset, and a noisy (asymmetric, high-RTT) sample is
// present but must be ignored by the min-RTT selection.
const OFF: i64 = 1_000_000;
// Clean sample: client t1=0, one-way=200µs each way → t2 = t1 + 200_000 + OFF (host clock),
// t3 = t2 + 50_000 (host processing), t4 = t3 - OFF + 200_000 (back in client clock).
let t1 = 0u64;
let t2 = (t1 as i64 + 200_000 + OFF) as u64;
let t3 = t2 + 50_000;
let t4 = (t3 as i64 - OFF + 200_000) as u64;
// Noisy sample: same offset but a fat, asymmetric RTT (slow return path) — higher RTT.
let n1 = 1_000_000u64;
let n2 = (n1 as i64 + 200_000 + OFF) as u64;
let n3 = n2 + 50_000;
let n4 = (n3 as i64 - OFF + 5_000_000) as u64; // 5 ms return → big RTT
let (offset, rtt) =
clock_offset_ns(&[(n1, n2, n3, n4), (t1, t2, t3, t4)]).expect("non-empty");
assert_eq!(offset, OFF, "min-RTT sample recovers the offset exactly");
assert_eq!(rtt, 400_000, "min-RTT sample's RTT (2x200us), not the noisy 5ms one");
assert!(clock_offset_ns(&[]).is_none());
}
#[test]
fn control_messages_disjoint_from_hello() {
// A Hello uses MAGIC (PKF1); control messages use CTL_MAGIC (PKFc). No Hello — at
+15 -2
View File
@@ -27,8 +27,8 @@ use punktfunk_core::config::{CompositorPref, FecConfig, FecScheme, GamepadPref,
use punktfunk_core::input::{InputEvent, InputKind};
use punktfunk_core::packet::{FLAG_PIC, FLAG_PROBE, FLAG_SOF};
use punktfunk_core::quic::{
endpoint, io, Hello, PairChallenge, PairProof, PairRequest, PairResult, ProbeRequest,
ProbeResult, Reconfigure, Reconfigured, Start, Welcome,
endpoint, io, ClockEcho, ClockProbe, Hello, PairChallenge, PairProof, PairRequest, PairResult,
ProbeRequest, ProbeResult, Reconfigure, Reconfigured, Start, Welcome,
};
use punktfunk_core::transport::UdpTransport;
use punktfunk_core::Session;
@@ -545,6 +545,19 @@ async fn serve_session(
if probe_tx.send(req).is_err() {
break; // data plane gone
}
} else if let Ok(probe) = ClockProbe::decode(&msg) {
// Wall-clock skew handshake: echo the client's t1 with our receive (t2) and
// send (t3) stamps, both in the host clock the AU pts_ns uses. Answered
// inline on the control stream — cheap, no data-plane involvement.
let t2_ns = now_ns();
let echo = ClockEcho {
t1_ns: probe.t1_ns,
t2_ns,
t3_ns: now_ns(),
};
if io::write_msg(&mut ctrl_send, &echo.encode()).await.is_err() {
break;
}
} else {
tracing::warn!("unknown control message — ignoring");
}