diff --git a/crates/punktfunk-client-rs/src/main.rs b/crates/punktfunk-client-rs/src/main.rs index 8c3286a..d239d63 100644 --- a/crates/punktfunk-client-rs/src/main.rs +++ b/crates/punktfunk-client-rs/src/main.rs @@ -45,8 +45,7 @@ use punktfunk_core::config::Role; use punktfunk_core::input::{InputEvent, InputKind}; use punktfunk_core::packet::FLAG_PROBE; use punktfunk_core::quic::{ - endpoint, io, ClockEcho, ClockProbe, Hello, ProbeRequest, ProbeResult, Reconfigure, - Reconfigured, Start, Welcome, + endpoint, io, Hello, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, Start, Welcome, }; use punktfunk_core::transport::UdpTransport; use punktfunk_core::{CompositorPref, Mode, PunktfunkError, Session}; @@ -332,40 +331,6 @@ fn discover(secs: u64) -> Result<()> { Ok(()) } -/// Run the wall-clock skew handshake: `ROUNDS` `ClockProbe`/`ClockEcho` round-trips on the control -/// stream, returning the host−client clock offset (ns) from the minimum-RTT sample, or `None` if the -/// host never answers (an old host — the caller then assumes a shared clock). Each read is bounded so -/// a silent old host can't wedge session start. -async fn clock_sync(send: &mut quinn::SendStream, recv: &mut quinn::RecvStream) -> Option { - const ROUNDS: usize = 8; - let read_timeout = std::time::Duration::from_secs(2); - let mut samples: Vec<(u64, u64, u64, u64)> = Vec::with_capacity(ROUNDS); - for _ in 0..ROUNDS { - let t1 = now_ns(); - let probe = ClockProbe { t1_ns: t1 }.encode(); - if io::write_msg(send, &probe).await.is_err() { - break; - } - let read = tokio::time::timeout(read_timeout, io::read_msg(recv)).await; - let echo = match read { - Ok(Ok(b)) => match ClockEcho::decode(&b) { - Ok(e) => e, - Err(_) => break, // not a ClockEcho -> give up on skew - }, - _ => break, // timeout or stream error -> old host / no skew support - }; - samples.push((echo.t1_ns, echo.t2_ns, echo.t3_ns, now_ns())); - } - let (offset, rtt) = punktfunk_core::quic::clock_offset_ns(&samples)?; - tracing::info!( - offset_ns = offset, - rtt_us = rtt / 1000, - rounds = samples.len(), - "clock skew estimated (host-client); latency now cross-machine valid" - ); - Some(offset) -} - async fn session(args: Args) -> Result<()> { let remote: std::net::SocketAddr = args.connect.parse().context("--connect host:port")?; let identity = load_or_create_identity()?; @@ -430,7 +395,18 @@ async fn session(args: Args) -> Result<()> { // Wall-clock skew handshake on the still-private control stream (before --remode/--speed-test // take it): align our clock to the host's so the per-frame capture→reassembled latency is valid // across machines. `None` ⇒ an old host that doesn't answer — fall back to a shared clock (0). - let clock_offset_ns = clock_sync(&mut send, &mut recv).await; + let clock_offset_ns = match punktfunk_core::quic::clock_sync(&mut send, &mut recv).await { + Some(skew) => { + tracing::info!( + offset_ns = skew.offset_ns, + rtt_us = skew.rtt_ns / 1000, + rounds = skew.rounds, + "clock skew estimated (host-client); latency now cross-machine valid" + ); + Some(skew.offset_ns) + } + None => None, + }; // Speed-test accumulators: the data-plane loop folds each FLAG_PROBE filler AU in here; the // --speed-test reporter below reads them once the host's ProbeResult lands. first/last hold diff --git a/crates/punktfunk-core/src/abi.rs b/crates/punktfunk-core/src/abi.rs index ff159f0..5fca6be 100644 --- a/crates/punktfunk-core/src/abi.rs +++ b/crates/punktfunk-core/src/abi.rs @@ -1320,6 +1320,36 @@ pub unsafe extern "C" fn punktfunk_connection_bitrate( }) } +/// The host↔client wall-clock offset (nanoseconds, **host minus client**) measured by the +/// connect-time skew handshake. Add it to a local receive/present timestamp (same realtime clock, +/// `CLOCK_REALTIME` / `gettimeofday`-epoch nanoseconds) to express that instant in the host's +/// capture clock — the clock the per-access-unit `pts_ns` is stamped in — so glass-to-glass latency +/// (e.g. present-time minus `pts_ns`) is valid across machines. `0` = no correction: either an older +/// host that didn't answer the handshake, or genuinely synchronized clocks. Safe any time after +/// connect. +/// +/// # Safety +/// `c` is a valid connection handle; `offset_ns` is writable (NULL is skipped). +#[cfg(feature = "quic")] +#[no_mangle] +pub unsafe extern "C" fn punktfunk_connection_clock_offset_ns( + c: *const PunktfunkConnection, + offset_ns: *mut i64, +) -> PunktfunkStatus { + guard(|| { + let c = match unsafe { c.as_ref() } { + Some(c) => c, + None => return PunktfunkStatus::NullPointer, + }; + unsafe { + if !offset_ns.is_null() { + *offset_ns = c.inner.clock_offset_ns; + } + } + PunktfunkStatus::Ok + }) +} + /// Ask the host to switch the live session to `width`x`height`@`refresh_hz` without /// reconnecting (window resized, refresh changed). Non-blocking enqueue: on acceptance the /// stream continues at the new mode — the first new-mode access unit is an IDR with diff --git a/crates/punktfunk-core/src/client.rs b/crates/punktfunk-core/src/client.rs index 9eb6dd5..eb56da2 100644 --- a/crates/punktfunk-core/src/client.rs +++ b/crates/punktfunk-core/src/client.rs @@ -35,9 +35,10 @@ enum CtrlRequest { } /// What the worker reports to [`NativeClient::connect`] once the handshake lands: the negotiated -/// mode, the host-resolved gamepad backend, the host's certificate fingerprint, and the resolved -/// encoder bitrate (kbps). -type Negotiated = (Mode, GamepadPref, [u8; 32], u32); +/// mode, the host-resolved gamepad backend, the host's certificate fingerprint, the resolved +/// encoder bitrate (kbps), and the host↔client clock offset (ns, host minus client; 0 = no skew +/// correction / an old host that didn't answer the handshake). +type Negotiated = (Mode, GamepadPref, [u8; 32], u32, i64); /// Accumulated state of an in-flight / finished speed test. The data-plane pump folds each /// received [`FLAG_PROBE`] access unit in; the control task records the host's [`ProbeResult`] @@ -136,6 +137,11 @@ pub struct NativeClient { /// requested rate clamped to the host's range, or its default if we requested `0`. `0` = an /// older host that didn't report it. pub resolved_bitrate_kbps: u32, + /// Host clock minus client clock (ns), from the connect-time skew handshake. Add it to a local + /// receive/present timestamp to express it in the host's capture clock (the AU `pts_ns`), making + /// glass-to-glass latency valid across machines. `0` = no correction (an old host that didn't + /// answer, or genuinely synced clocks). + pub clock_offset_ns: i64, } impl NativeClient { @@ -218,7 +224,7 @@ impl NativeClient { }) .map_err(PunktfunkError::Io)?; - let (negotiated, resolved_gamepad, fingerprint, resolved_bitrate_kbps) = + let (negotiated, resolved_gamepad, fingerprint, resolved_bitrate_kbps, clock_offset_ns) = match ready_rx.recv_timeout(timeout) { Ok(Ok(t)) => t, Ok(Err(e)) => return Err(e), @@ -244,6 +250,7 @@ impl NativeClient { host_fingerprint: fingerprint, resolved_gamepad, resolved_bitrate_kbps, + clock_offset_ns, }) } @@ -604,6 +611,23 @@ async fn worker_main(args: WorkerArgs) { ) .await?; + // Wall-clock skew handshake on the control stream (before the session's control task takes + // it): align our clock to the host's so the embedder can express receive/present instants in + // the host's capture clock (the AU `pts_ns`). 0 ⇒ an old host that didn't answer (shared-clock + // assumption, as before). This is the substrate for glass-to-glass present-time measurement. + let clock_offset_ns = match crate::quic::clock_sync(&mut send, &mut recv).await { + Some(skew) => { + tracing::info!( + offset_ns = skew.offset_ns, + rtt_us = skew.rtt_ns / 1000, + rounds = skew.rounds, + "clock skew estimated (host-client)" + ); + skew.offset_ns + } + None => 0, + }; + let host_udp = std::net::SocketAddr::new(remote.ip(), welcome.udp_port); let transport = UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &host_udp.to_string())?; @@ -617,6 +641,7 @@ async fn worker_main(args: WorkerArgs) { welcome.gamepad, fingerprint, welcome.bitrate_kbps, + clock_offset_ns, )) }; @@ -629,6 +654,7 @@ async fn worker_main(args: WorkerArgs) { resolved_gamepad, fingerprint, resolved_bitrate_kbps, + clock_offset_ns, ) = match setup.await { Ok(t) => t, Err(e) => { @@ -641,6 +667,7 @@ async fn worker_main(args: WorkerArgs) { resolved_gamepad, fingerprint, resolved_bitrate_kbps, + clock_offset_ns, ))); // Input task: embedder events → QUIC datagrams. diff --git a/crates/punktfunk-core/src/quic.rs b/crates/punktfunk-core/src/quic.rs index 9f5e25b..12c4dd3 100644 --- a/crates/punktfunk-core/src/quic.rs +++ b/crates/punktfunk-core/src/quic.rs @@ -1005,6 +1005,59 @@ pub mod io { } } +/// One wall-clock skew-handshake outcome (see [`clock_sync`]). +pub struct ClockSkew { + /// Host clock minus client clock, ns: add it to a client timestamp to express it in host time. + pub offset_ns: i64, + /// Round-trip time of the minimum-RTT sample, ns. + pub rtt_ns: u64, + /// How many probe rounds the host answered. + pub rounds: usize, +} + +/// Run the wall-clock skew handshake from the client side over the (already-open) control stream: +/// `ROUNDS` [`ClockProbe`]/[`ClockEcho`] round-trips, returning the host↔client offset from the +/// minimum-RTT sample. `None` if the host never answers (an old host) — the caller then assumes a +/// shared clock. Each read is bounded so a silent host can't wedge session start. Shared by the +/// reference client and the embeddable connector; uses the realtime clock the host stamps `pts_ns` +/// with, so the offset aligns a client receive instant to the host's capture clock. +pub async fn clock_sync( + send: &mut quinn::SendStream, + recv: &mut quinn::RecvStream, +) -> Option { + use std::time::{Duration, SystemTime, UNIX_EPOCH}; + fn now_ns() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0) + } + const ROUNDS: usize = 8; + let read_timeout = Duration::from_secs(2); + let mut samples: Vec<(u64, u64, u64, u64)> = Vec::with_capacity(ROUNDS); + for _ in 0..ROUNDS { + let t1 = now_ns(); + let probe = ClockProbe { t1_ns: t1 }.encode(); + if io::write_msg(send, &probe).await.is_err() { + break; + } + let read = tokio::time::timeout(read_timeout, io::read_msg(recv)).await; + let echo = match read { + Ok(Ok(b)) => match ClockEcho::decode(&b) { + Ok(e) => e, + Err(_) => break, + }, + _ => break, // timeout or stream error -> old host / no skew support + }; + samples.push((echo.t1_ns, echo.t2_ns, echo.t3_ns, now_ns())); + } + clock_offset_ns(&samples).map(|(offset_ns, rtt_ns)| ClockSkew { + offset_ns, + rtt_ns, + rounds: samples.len(), + }) +} + /// quinn endpoint constructors. Host: self-signed identity (fresh, or persisted PEMs via /// [`endpoint::server_with_identity`]). Client: fingerprint pinning / TOFU via /// [`endpoint::client_pinned`] ([`endpoint::client_insecure`] is the no-pin special case). diff --git a/docs-site/content/docs/roadmap.md b/docs-site/content/docs/roadmap.md index 42c1bb0..392096f 100644 --- a/docs-site/content/docs/roadmap.md +++ b/docs-site/content/docs/roadmap.md @@ -306,11 +306,14 @@ buffer; `sendmmsg`/`recvmmsg` batching; the capture-timestamp anchor placement. **across machines** (reported `skew_corrected=true`), not just same-host. Back-compat: an old host that doesn't answer times out → `skew_corrected=false` (shared-clock assumption, as before). Validated cross-LAN (GNOME box → dev box): offset ≈ −1.57 ms (reproducible), rtt ~140 µs, **p50 - 1.30 ms** skew-corrected capture→reassembled. **Remaining for true glass-to-glass**: the **client - present-stamp** (decode→present term) — only the Apple client presents today, so it needs the - connector to expose the offset + an Apple present-time probe; and the **render→capture** term - (PipeWire buffer presentation timestamp vs our capture stamp). `tools/latency-probe` is still the - cross-machine orchestrator. + 1.30 ms** skew-corrected capture→reassembled. The skew handshake is now a shared core helper + (`quic::clock_sync` → `ClockSkew`) used by both the reference client and the **embeddable + connector** — `NativeClient` runs it at connect and exposes the offset over the C ABI + (`punktfunk_connection_clock_offset_ns`), so the Apple client can convert a present instant to the + host clock. **Remaining for true glass-to-glass**: (1) the **Apple client present-stamp** + (decode→present) — Swift: stamp `AVSampleBufferDisplayLayer`/presenter time, add the C-ABI offset, + subtract the AU `pts_ns`; (2) the host **render→capture** term (PipeWire buffer presentation + timestamp vs our capture stamp). `tools/latency-probe` is still the cross-machine orchestrator. - **Bigger bets (ordered, deferred — need real-NIC/GPU/Mac validation):** 1. **CUDA stream+event** to drop one of two redundant `cuCtxSynchronize` in `submit_cuda` (keep the copy) — ~0.1–0.4 ms@720p, ~1 ms@5K; only if per-stage timing proves the sync is on the path. diff --git a/docs-site/content/docs/status.md b/docs-site/content/docs/status.md index bee2ada..30596ce 100644 --- a/docs-site/content/docs/status.md +++ b/docs-site/content/docs/status.md @@ -29,6 +29,10 @@ All three appliances advertise over mDNS (`_punktfunk._udp`) and require PIN pai ## Progress log ### 2026-06-12 +- **Skew handshake in the connector + C ABI** — `quic::clock_sync` is now a shared core helper used + by both the reference client and `NativeClient`; the connector runs it at connect and exposes the + host clock offset over the C ABI (`punktfunk_connection_clock_offset_ns`). This is the substrate + the Apple client needs for the decode→present (glass-to-glass) term. - **Wall-clock skew handshake** (`ClockProbe`/`ClockEcho`, 8 NTP rounds after `Start`) — makes the client's capture→reassembled latency valid **cross-machine**. Validated GNOME box → dev box: offset −1.57 ms removed, **p50 1.30 ms** skew-corrected. (`05bc9ab`) diff --git a/include/punktfunk_core.h b/include/punktfunk_core.h index 7c0b996..5256459 100644 --- a/include/punktfunk_core.h +++ b/include/punktfunk_core.h @@ -797,6 +797,21 @@ PunktfunkStatus punktfunk_connection_gamepad(const PunktfunkConnection *c, uint3 PunktfunkStatus punktfunk_connection_bitrate(const PunktfunkConnection *c, uint32_t *bitrate_kbps); #endif +#if defined(PUNKTFUNK_FEATURE_QUIC) +// The host↔client wall-clock offset (nanoseconds, **host minus client**) measured by the +// connect-time skew handshake. Add it to a local receive/present timestamp (same realtime clock, +// `CLOCK_REALTIME` / `gettimeofday`-epoch nanoseconds) to express that instant in the host's +// capture clock — the clock the per-access-unit `pts_ns` is stamped in — so glass-to-glass latency +// (e.g. present-time minus `pts_ns`) is valid across machines. `0` = no correction: either an older +// host that didn't answer the handshake, or genuinely synchronized clocks. Safe any time after +// connect. +// +// # Safety +// `c` is a valid connection handle; `offset_ns` is writable (NULL is skipped). +PunktfunkStatus punktfunk_connection_clock_offset_ns(const PunktfunkConnection *c, + int64_t *offset_ns); +#endif + #if defined(PUNKTFUNK_FEATURE_QUIC) // Ask the host to switch the live session to `width`x`height`@`refresh_hz` without // reconnecting (window resized, refresh changed). Non-blocking enqueue: on acceptance the