feat(protocol): per-AU host-timing plane (0xCF) — split host+network latency (stats phase 2)
The unified-stats equation's host+network stage was one opaque number because the wire carried nothing but pts_ns. Now the host reports its own share per frame: when the client's Hello sets VIDEO_CAP_HOST_TIMING (0x08), the send thread emits a 13-byte 0xCF datagram — [tag][pts_ns u64][host_us u32] — right after the AU's last packet leaves the socket, so host_us = capture→fully-sent (capture read/convert, encode, FEC+seal, paced send) against the same anchor the wire pts carries. Clients correlate by pts_ns and derive network = (received + clock_offset − pts) − host_us; the two terms tile per frame by construction.

Back-compat is free in all four combinations: old clients ignore unknown datagram tags, old hosts ignore unknown cap bits (client keeps the combined stage). The hardened data-plane format is untouched — this rides the established QUIC side-plane pattern (0xC8…0xCE).

NativeClient ORs the bit in unconditionally and exposes next_host_timing(); the C ABI gains PunktfunkHostTiming + punktfunk_connection_next_host_timing (additive). The synthetic host emits 0xCF too, so pure-loopback protocol tests cover the plane. The probe reports the split (host_p50/p95_us · net_p50/p95_us) and is our direct analogue of Sunshine's "host processing latency" — ours additionally includes the paced send.

Validated on loopback (synthetic host + probe, debug build): 240/240 AUs matched, host_p50 6.5 ms + net_p50 6.4 ms ≈ capture→received p50 13.0 ms. Core suite + new 0xCF roundtrip/truncation test green; host+core+probe clippy clean.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -424,7 +424,9 @@ async fn session(args: Args) -> Result<()> {
|
||||
// PUNKTFUNK_CLIENT_444=1 advertises VIDEO_CAP_444 (host HEVC 4:4:4 path) — verify the
|
||||
// resulting chroma with `ffprobe` on the `--out` .h265.
|
||||
video_caps: {
|
||||
let mut caps = 0u8;
|
||||
// Always ask for per-AU host timings (0xCF) — this is a measurement tool, and the
|
||||
// host/network split is exactly what it exists to report. Old hosts ignore the bit.
|
||||
let mut caps = punktfunk_core::quic::VIDEO_CAP_HOST_TIMING;
|
||||
if std::env::var_os("PUNKTFUNK_CLIENT_10BIT").is_some() {
|
||||
caps |= punktfunk_core::quic::VIDEO_CAP_10BIT;
|
||||
}
|
||||
@@ -902,6 +904,10 @@ async fn session(args: Args) -> Result<()> {
|
||||
let audio_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||
let rumble_pkts = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||
let hidout_pkts = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||
// Per-AU host timings (0xCF) → the stream loop, which matches them to received AUs by pts
|
||||
// and reports the host/network split. try_send: overflow drops samples, never blocks QUIC.
|
||||
let (host_timing_tx, host_timing_rx) =
|
||||
std::sync::mpsc::sync_channel::<punktfunk_core::quic::HostTiming>(512);
|
||||
{
|
||||
let (a, ab, r, h) = (
|
||||
audio_pkts.clone(),
|
||||
@@ -909,6 +915,7 @@ async fn session(args: Args) -> Result<()> {
|
||||
rumble_pkts.clone(),
|
||||
hidout_pkts.clone(),
|
||||
);
|
||||
let ht_tx = host_timing_tx;
|
||||
let conn2 = conn.clone();
|
||||
// Build a multistream decoder for the host-RESOLVED layout so the probe actually decodes
|
||||
// the surround stream (not just counts bytes) — the headless validator for the encode path.
|
||||
@@ -956,6 +963,10 @@ async fn session(args: Args) -> Result<()> {
|
||||
if h.fetch_add(1, Relaxed) < 12 {
|
||||
tracing::info!(?hid, "DualSense HID output (0xCD)");
|
||||
}
|
||||
} else if let Some(t) = punktfunk_core::quic::decode_host_timing_datagram(&d) {
|
||||
// Per-AU host timing (0xCF) — forwarded to the stream loop for the
|
||||
// host/network latency split.
|
||||
let _ = ht_tx.try_send(t);
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -1000,6 +1011,12 @@ async fn session(args: Args) -> Result<()> {
|
||||
let mut mismatched = 0u32;
|
||||
let mut bytes = 0u64;
|
||||
let mut latencies_us: Vec<u64> = Vec::new();
|
||||
// Host/network split: received AUs awaiting their 0xCF host timing (pts → capture→received
|
||||
// µs), matched as the datagrams arrive. Bounded — an old host never sends any.
|
||||
let mut pending_split: std::collections::VecDeque<(u64, u64)> =
|
||||
std::collections::VecDeque::new();
|
||||
let mut host_us_v: Vec<u64> = Vec::new();
|
||||
let mut net_us_v: Vec<u64> = Vec::new();
|
||||
let mut last_rx = std::time::Instant::now();
|
||||
let started = std::time::Instant::now();
|
||||
// Adaptive-FEC loss window: publish a fresh estimate every 750 ms for the LossReport task.
|
||||
@@ -1057,6 +1074,19 @@ async fn session(args: Args) -> Result<()> {
|
||||
.max(0) as u64;
|
||||
if lat > 0 && lat < 10_000_000_000 {
|
||||
latencies_us.push(lat / 1000);
|
||||
pending_split.push_back((frame.pts_ns, lat / 1000));
|
||||
if pending_split.len() > 1024 {
|
||||
pending_split.pop_front();
|
||||
}
|
||||
}
|
||||
// Match any host timings (0xCF) that have arrived: host = the reported
|
||||
// capture→sent, network = our capture→received minus it (per-frame tiling).
|
||||
while let Ok(t) = host_timing_rx.try_recv() {
|
||||
if let Some(i) = pending_split.iter().position(|(p, _)| *p == t.pts_ns) {
|
||||
let (_, hostnet_us) = pending_split.remove(i).unwrap();
|
||||
host_us_v.push(t.host_us as u64);
|
||||
net_us_v.push(hostnet_us.saturating_sub(t.host_us as u64));
|
||||
}
|
||||
}
|
||||
if expected > 0 {
|
||||
// Verification mode: deterministic content.
|
||||
@@ -1103,6 +1133,28 @@ async fn session(args: Args) -> Result<()> {
|
||||
"punktfunk/1 stream complete (capture→received latency; skew_corrected=true ⇒ \
|
||||
cross-machine valid, false ⇒ same-host clock)"
|
||||
);
|
||||
if !host_us_v.is_empty() {
|
||||
// The host/network split from the per-AU 0xCF timings (design/stats-unification.md
|
||||
// Phase 2): host = the host's own capture→sent, network = capture→received minus it.
|
||||
let pcts = |v: &mut Vec<u64>, p: f64| -> u64 {
|
||||
if v.is_empty() {
|
||||
return 0;
|
||||
}
|
||||
v.sort_unstable();
|
||||
v[((v.len() as f64 * p) as usize).min(v.len() - 1)]
|
||||
};
|
||||
tracing::info!(
|
||||
timing_samples = host_us_v.len(),
|
||||
host_p50_us = pcts(&mut host_us_v, 0.50),
|
||||
host_p95_us = pcts(&mut host_us_v, 0.95),
|
||||
net_p50_us = pcts(&mut net_us_v, 0.50),
|
||||
net_p95_us = pcts(&mut net_us_v, 0.95),
|
||||
"host/network latency split (host = capture→sent on the host; network = wire + \
|
||||
reassembly)"
|
||||
);
|
||||
} else {
|
||||
tracing::info!("no host timing datagrams (0xCF) — old host; host+network unsplit");
|
||||
}
|
||||
if expected > 0 {
|
||||
anyhow::ensure!(mismatched == 0, "{mismatched} corrupted frames");
|
||||
anyhow::ensure!(ok == expected, "received {ok}/{expected} frames");
|
||||
|
||||
Reference in New Issue
Block a user