feat(connector): expose host clock offset over the C ABI for glass-to-glass
ci / rust (push) Has been cancelled
ci / rust (push) Has been cancelled
Factor the client-side skew handshake into a shared core helper (quic::clock_sync -> ClockSkew) so both the reference client and the embeddable connector use one implementation. NativeClient now runs the handshake at connect (right after Start, before the control task takes the stream) and stores the host-client offset; it's read over the C ABI via punktfunk_connection_clock_offset_ns (i64 ns, host minus client; 0 = no correction / old host). This is the substrate the Apple client needs for the decode->present (glass-to- glass) term: stamp present time, add the offset to express it in the host's capture clock, subtract the AU pts_ns. client-rs drops its local clock_sync copy and uses the shared helper (behavior unchanged; validated locally). Regenerates include/punktfunk_core.h. Roadmap section 12 + status updated. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -45,8 +45,7 @@ use punktfunk_core::config::Role;
|
||||
use punktfunk_core::input::{InputEvent, InputKind};
|
||||
use punktfunk_core::packet::FLAG_PROBE;
|
||||
use punktfunk_core::quic::{
|
||||
endpoint, io, ClockEcho, ClockProbe, Hello, ProbeRequest, ProbeResult, Reconfigure,
|
||||
Reconfigured, Start, Welcome,
|
||||
endpoint, io, Hello, ProbeRequest, ProbeResult, Reconfigure, Reconfigured, Start, Welcome,
|
||||
};
|
||||
use punktfunk_core::transport::UdpTransport;
|
||||
use punktfunk_core::{CompositorPref, Mode, PunktfunkError, Session};
|
||||
@@ -332,40 +331,6 @@ fn discover(secs: u64) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run the wall-clock skew handshake: `ROUNDS` `ClockProbe`/`ClockEcho` round-trips on the control
|
||||
/// stream, returning the host−client clock offset (ns) from the minimum-RTT sample, or `None` if the
|
||||
/// host never answers (an old host — the caller then assumes a shared clock). Each read is bounded so
|
||||
/// a silent old host can't wedge session start.
|
||||
async fn clock_sync(send: &mut quinn::SendStream, recv: &mut quinn::RecvStream) -> Option<i64> {
|
||||
const ROUNDS: usize = 8;
|
||||
let read_timeout = std::time::Duration::from_secs(2);
|
||||
let mut samples: Vec<(u64, u64, u64, u64)> = Vec::with_capacity(ROUNDS);
|
||||
for _ in 0..ROUNDS {
|
||||
let t1 = now_ns();
|
||||
let probe = ClockProbe { t1_ns: t1 }.encode();
|
||||
if io::write_msg(send, &probe).await.is_err() {
|
||||
break;
|
||||
}
|
||||
let read = tokio::time::timeout(read_timeout, io::read_msg(recv)).await;
|
||||
let echo = match read {
|
||||
Ok(Ok(b)) => match ClockEcho::decode(&b) {
|
||||
Ok(e) => e,
|
||||
Err(_) => break, // not a ClockEcho -> give up on skew
|
||||
},
|
||||
_ => break, // timeout or stream error -> old host / no skew support
|
||||
};
|
||||
samples.push((echo.t1_ns, echo.t2_ns, echo.t3_ns, now_ns()));
|
||||
}
|
||||
let (offset, rtt) = punktfunk_core::quic::clock_offset_ns(&samples)?;
|
||||
tracing::info!(
|
||||
offset_ns = offset,
|
||||
rtt_us = rtt / 1000,
|
||||
rounds = samples.len(),
|
||||
"clock skew estimated (host-client); latency now cross-machine valid"
|
||||
);
|
||||
Some(offset)
|
||||
}
|
||||
|
||||
async fn session(args: Args) -> Result<()> {
|
||||
let remote: std::net::SocketAddr = args.connect.parse().context("--connect host:port")?;
|
||||
let identity = load_or_create_identity()?;
|
||||
@@ -430,7 +395,18 @@ async fn session(args: Args) -> Result<()> {
|
||||
// Wall-clock skew handshake on the still-private control stream (before --remode/--speed-test
|
||||
// take it): align our clock to the host's so the per-frame capture→reassembled latency is valid
|
||||
// across machines. `None` ⇒ an old host that doesn't answer — fall back to a shared clock (0).
|
||||
let clock_offset_ns = clock_sync(&mut send, &mut recv).await;
|
||||
let clock_offset_ns = match punktfunk_core::quic::clock_sync(&mut send, &mut recv).await {
|
||||
Some(skew) => {
|
||||
tracing::info!(
|
||||
offset_ns = skew.offset_ns,
|
||||
rtt_us = skew.rtt_ns / 1000,
|
||||
rounds = skew.rounds,
|
||||
"clock skew estimated (host-client); latency now cross-machine valid"
|
||||
);
|
||||
Some(skew.offset_ns)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
// Speed-test accumulators: the data-plane loop folds each FLAG_PROBE filler AU in here; the
|
||||
// --speed-test reporter below reads them once the host's ProbeResult lands. first/last hold
|
||||
|
||||
@@ -1320,6 +1320,36 @@ pub unsafe extern "C" fn punktfunk_connection_bitrate(
|
||||
})
|
||||
}
|
||||
|
||||
/// The host↔client wall-clock offset (nanoseconds, **host minus client**) measured by the
|
||||
/// connect-time skew handshake. Add it to a local receive/present timestamp (same realtime clock,
|
||||
/// `CLOCK_REALTIME` / `gettimeofday`-epoch nanoseconds) to express that instant in the host's
|
||||
/// capture clock — the clock the per-access-unit `pts_ns` is stamped in — so glass-to-glass latency
|
||||
/// (e.g. present-time minus `pts_ns`) is valid across machines. `0` = no correction: either an older
|
||||
/// host that didn't answer the handshake, or genuinely synchronized clocks. Safe any time after
|
||||
/// connect.
|
||||
///
|
||||
/// # Safety
|
||||
/// `c` is a valid connection handle; `offset_ns` is writable (NULL is skipped).
|
||||
#[cfg(feature = "quic")]
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn punktfunk_connection_clock_offset_ns(
|
||||
c: *const PunktfunkConnection,
|
||||
offset_ns: *mut i64,
|
||||
) -> PunktfunkStatus {
|
||||
guard(|| {
|
||||
let c = match unsafe { c.as_ref() } {
|
||||
Some(c) => c,
|
||||
None => return PunktfunkStatus::NullPointer,
|
||||
};
|
||||
unsafe {
|
||||
if !offset_ns.is_null() {
|
||||
*offset_ns = c.inner.clock_offset_ns;
|
||||
}
|
||||
}
|
||||
PunktfunkStatus::Ok
|
||||
})
|
||||
}
|
||||
|
||||
/// Ask the host to switch the live session to `width`x`height`@`refresh_hz` without
|
||||
/// reconnecting (window resized, refresh changed). Non-blocking enqueue: on acceptance the
|
||||
/// stream continues at the new mode — the first new-mode access unit is an IDR with
|
||||
|
||||
@@ -35,9 +35,10 @@ enum CtrlRequest {
|
||||
}
|
||||
|
||||
/// What the worker reports to [`NativeClient::connect`] once the handshake lands: the negotiated
|
||||
/// mode, the host-resolved gamepad backend, the host's certificate fingerprint, and the resolved
|
||||
/// encoder bitrate (kbps).
|
||||
type Negotiated = (Mode, GamepadPref, [u8; 32], u32);
|
||||
/// mode, the host-resolved gamepad backend, the host's certificate fingerprint, the resolved
|
||||
/// encoder bitrate (kbps), and the host↔client clock offset (ns, host minus client; 0 = no skew
|
||||
/// correction / an old host that didn't answer the handshake).
|
||||
type Negotiated = (Mode, GamepadPref, [u8; 32], u32, i64);
|
||||
|
||||
/// Accumulated state of an in-flight / finished speed test. The data-plane pump folds each
|
||||
/// received [`FLAG_PROBE`] access unit in; the control task records the host's [`ProbeResult`]
|
||||
@@ -136,6 +137,11 @@ pub struct NativeClient {
|
||||
/// requested rate clamped to the host's range, or its default if we requested `0`. `0` = an
|
||||
/// older host that didn't report it.
|
||||
pub resolved_bitrate_kbps: u32,
|
||||
/// Host clock minus client clock (ns), from the connect-time skew handshake. Add it to a local
|
||||
/// receive/present timestamp to express it in the host's capture clock (the AU `pts_ns`), making
|
||||
/// glass-to-glass latency valid across machines. `0` = no correction (an old host that didn't
|
||||
/// answer, or genuinely synced clocks).
|
||||
pub clock_offset_ns: i64,
|
||||
}
|
||||
|
||||
impl NativeClient {
|
||||
@@ -218,7 +224,7 @@ impl NativeClient {
|
||||
})
|
||||
.map_err(PunktfunkError::Io)?;
|
||||
|
||||
let (negotiated, resolved_gamepad, fingerprint, resolved_bitrate_kbps) =
|
||||
let (negotiated, resolved_gamepad, fingerprint, resolved_bitrate_kbps, clock_offset_ns) =
|
||||
match ready_rx.recv_timeout(timeout) {
|
||||
Ok(Ok(t)) => t,
|
||||
Ok(Err(e)) => return Err(e),
|
||||
@@ -244,6 +250,7 @@ impl NativeClient {
|
||||
host_fingerprint: fingerprint,
|
||||
resolved_gamepad,
|
||||
resolved_bitrate_kbps,
|
||||
clock_offset_ns,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -604,6 +611,23 @@ async fn worker_main(args: WorkerArgs) {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Wall-clock skew handshake on the control stream (before the session's control task takes
|
||||
// it): align our clock to the host's so the embedder can express receive/present instants in
|
||||
// the host's capture clock (the AU `pts_ns`). 0 ⇒ an old host that didn't answer (shared-clock
|
||||
// assumption, as before). This is the substrate for glass-to-glass present-time measurement.
|
||||
let clock_offset_ns = match crate::quic::clock_sync(&mut send, &mut recv).await {
|
||||
Some(skew) => {
|
||||
tracing::info!(
|
||||
offset_ns = skew.offset_ns,
|
||||
rtt_us = skew.rtt_ns / 1000,
|
||||
rounds = skew.rounds,
|
||||
"clock skew estimated (host-client)"
|
||||
);
|
||||
skew.offset_ns
|
||||
}
|
||||
None => 0,
|
||||
};
|
||||
|
||||
let host_udp = std::net::SocketAddr::new(remote.ip(), welcome.udp_port);
|
||||
let transport =
|
||||
UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &host_udp.to_string())?;
|
||||
@@ -617,6 +641,7 @@ async fn worker_main(args: WorkerArgs) {
|
||||
welcome.gamepad,
|
||||
fingerprint,
|
||||
welcome.bitrate_kbps,
|
||||
clock_offset_ns,
|
||||
))
|
||||
};
|
||||
|
||||
@@ -629,6 +654,7 @@ async fn worker_main(args: WorkerArgs) {
|
||||
resolved_gamepad,
|
||||
fingerprint,
|
||||
resolved_bitrate_kbps,
|
||||
clock_offset_ns,
|
||||
) = match setup.await {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
@@ -641,6 +667,7 @@ async fn worker_main(args: WorkerArgs) {
|
||||
resolved_gamepad,
|
||||
fingerprint,
|
||||
resolved_bitrate_kbps,
|
||||
clock_offset_ns,
|
||||
)));
|
||||
|
||||
// Input task: embedder events → QUIC datagrams.
|
||||
|
||||
@@ -1005,6 +1005,59 @@ pub mod io {
|
||||
}
|
||||
}
|
||||
|
||||
/// One wall-clock skew-handshake outcome (see [`clock_sync`]).
|
||||
pub struct ClockSkew {
|
||||
/// Host clock minus client clock, ns: add it to a client timestamp to express it in host time.
|
||||
pub offset_ns: i64,
|
||||
/// Round-trip time of the minimum-RTT sample, ns.
|
||||
pub rtt_ns: u64,
|
||||
/// How many probe rounds the host answered.
|
||||
pub rounds: usize,
|
||||
}
|
||||
|
||||
/// Run the wall-clock skew handshake from the client side over the (already-open) control stream:
|
||||
/// `ROUNDS` [`ClockProbe`]/[`ClockEcho`] round-trips, returning the host↔client offset from the
|
||||
/// minimum-RTT sample. `None` if the host never answers (an old host) — the caller then assumes a
|
||||
/// shared clock. Each read is bounded so a silent host can't wedge session start. Shared by the
|
||||
/// reference client and the embeddable connector; uses the realtime clock the host stamps `pts_ns`
|
||||
/// with, so the offset aligns a client receive instant to the host's capture clock.
|
||||
pub async fn clock_sync(
|
||||
send: &mut quinn::SendStream,
|
||||
recv: &mut quinn::RecvStream,
|
||||
) -> Option<ClockSkew> {
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
fn now_ns() -> u64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|d| d.as_nanos() as u64)
|
||||
.unwrap_or(0)
|
||||
}
|
||||
const ROUNDS: usize = 8;
|
||||
let read_timeout = Duration::from_secs(2);
|
||||
let mut samples: Vec<(u64, u64, u64, u64)> = Vec::with_capacity(ROUNDS);
|
||||
for _ in 0..ROUNDS {
|
||||
let t1 = now_ns();
|
||||
let probe = ClockProbe { t1_ns: t1 }.encode();
|
||||
if io::write_msg(send, &probe).await.is_err() {
|
||||
break;
|
||||
}
|
||||
let read = tokio::time::timeout(read_timeout, io::read_msg(recv)).await;
|
||||
let echo = match read {
|
||||
Ok(Ok(b)) => match ClockEcho::decode(&b) {
|
||||
Ok(e) => e,
|
||||
Err(_) => break,
|
||||
},
|
||||
_ => break, // timeout or stream error -> old host / no skew support
|
||||
};
|
||||
samples.push((echo.t1_ns, echo.t2_ns, echo.t3_ns, now_ns()));
|
||||
}
|
||||
clock_offset_ns(&samples).map(|(offset_ns, rtt_ns)| ClockSkew {
|
||||
offset_ns,
|
||||
rtt_ns,
|
||||
rounds: samples.len(),
|
||||
})
|
||||
}
|
||||
|
||||
/// quinn endpoint constructors. Host: self-signed identity (fresh, or persisted PEMs via
|
||||
/// [`endpoint::server_with_identity`]). Client: fingerprint pinning / TOFU via
|
||||
/// [`endpoint::client_pinned`] ([`endpoint::client_insecure`] is the no-pin special case).
|
||||
|
||||
Reference in New Issue
Block a user