diff --git a/crates/punktfunk-client-rs/src/main.rs b/crates/punktfunk-client-rs/src/main.rs index f15c00c..577d780 100644 --- a/crates/punktfunk-client-rs/src/main.rs +++ b/crates/punktfunk-client-rs/src/main.rs @@ -814,6 +814,13 @@ async fn session(args: Args) -> Result<()> { let transport = UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &host_udp.to_string()) .context("bind data plane")?; + // Hole-punch the host's data port so video traverses a NAT / inter-VLAN firewall. This + // tool runs one session then exits, so the keepalive thread dies with the process — no + // explicit stop needed (the flag is never set). + if let Ok(sock) = transport.try_clone_socket() { + let stop = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + punktfunk_core::transport::spawn_data_punch(sock, stop); + } let mut session = Session::new(cfg, Box::new(transport)).map_err(|e| anyhow!("client session: {e:?}"))?; let mut sink = match &out_path { diff --git a/crates/punktfunk-core/src/client.rs b/crates/punktfunk-core/src/client.rs index e18d310..cd90498 100644 --- a/crates/punktfunk-core/src/client.rs +++ b/crates/punktfunk-core/src/client.rs @@ -648,6 +648,12 @@ async fn worker_main(args: WorkerArgs) { let host_udp = std::net::SocketAddr::new(remote.ip(), welcome.udp_port); let transport = UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &host_udp.to_string())?; + // Hole-punch the host's data port so video traverses a NAT / stateful inter-VLAN firewall + // (control + side planes ride the client-initiated QUIC; the raw video UDP needs the client + // to open the path first). Stops with the session via the shared shutdown flag. 
+ if let Ok(sock) = transport.try_clone_socket() { + crate::transport::spawn_data_punch(sock, shutdown.clone()); + } let session = Session::new(welcome.session_config(Role::Client), Box::new(transport))?; Ok::<_, PunktfunkError>(( conn, diff --git a/crates/punktfunk-core/src/transport/mod.rs b/crates/punktfunk-core/src/transport/mod.rs index bbd2a00..2694a48 100644 --- a/crates/punktfunk-core/src/transport/mod.rs +++ b/crates/punktfunk-core/src/transport/mod.rs @@ -5,7 +5,7 @@ mod loopback; mod udp; pub use loopback::{loopback_pair, LoopbackTransport}; -pub use udp::UdpTransport; +pub use udp::{spawn_data_punch, UdpTransport, PUNCH_MAGIC}; /// A datagram transport. `recv` is non-blocking: it returns `Ok(None)` when no packet /// is currently available, so the caller (decode/present thread) never blocks here. diff --git a/crates/punktfunk-core/src/transport/udp.rs b/crates/punktfunk-core/src/transport/udp.rs index 2b0d385..47618ba 100644 --- a/crates/punktfunk-core/src/transport/udp.rs +++ b/crates/punktfunk-core/src/transport/udp.rs @@ -159,6 +159,38 @@ extern "C" { ) -> libc::ssize_t; } +/// Data-plane NAT/firewall hole-punch marker. The video data plane is a raw UDP socket distinct +/// from the QUIC control connection; on a flat LAN the host can send straight to the client, but +/// across a NAT or a stateful inter-VLAN firewall the unsolicited host→client video is rejected +/// (ICMP port-unreachable). So the client sends these tiny datagrams FROM its data socket TO the +/// host's data port: that opens the firewall/NAT return path and lets the host learn the client's +/// *observed* source (the NAT-translated address, not the client's reported private one). It's the +/// only thing a client ever sends on the data plane (video is host→client), so the host treats any +/// punch-magic datagram purely as a source-address probe and never as stream data. +pub const PUNCH_MAGIC: &[u8] = b"PFpunch1"; + +/// Spawn the client-side data-plane hole-punch keepalive. 
+/// `sock` is a clone of the data socket
+/// (already `connect`ed to the host's data port — see [`UdpTransport::try_clone_socket`]). Bursts
+/// fast at first to open the NAT/firewall path before the host's punch-wait expires, then steady
+/// keepalive so a stateful firewall's idle timeout can't close the path during a static, low-bitrate
+/// scene. No-op cost on a flat LAN.
+///
+/// Cadence: the first 15 attempts are 200 ms apart, every later one 2 s apart.
+///
+/// Stops when `stop` is set (checked before each send, so teardown can lag by up to one sleep
+/// interval) or when a send fails with a non-transient error. Transient failures do NOT stop it:
+/// a connected UDP socket reports an earlier ICMP port-unreachable as `ConnectionRefused` on the
+/// *next* send — e.g. when a punch lands before the host has bound its data port — and giving up
+/// there would leave the host waiting for a punch that never comes.
+///
+/// Best effort: if the thread cannot be created, no keepalive runs and nothing is reported.
+pub fn spawn_data_punch(sock: UdpSocket, stop: std::sync::Arc<std::sync::atomic::AtomicBool>) {
+    use std::io::ErrorKind;
+    use std::sync::atomic::Ordering;
+    use std::time::Duration;
+
+    // Attempts made at the fast cadence before dropping to the steady keepalive.
+    const BURST_PUNCHES: u32 = 15;
+    const BURST_INTERVAL: Duration = Duration::from_millis(200);
+    const KEEPALIVE_INTERVAL: Duration = Duration::from_millis(2000);
+
+    std::thread::Builder::new()
+        .name("punktfunk-data-punch".into())
+        .spawn(move || {
+            let mut attempts = 0u32;
+            while !stop.load(Ordering::Relaxed) {
+                match sock.send(PUNCH_MAGIC) {
+                    Ok(_) => {}
+                    // Transient: retry on the normal cadence instead of abandoning the keepalive.
+                    Err(e)
+                        if matches!(
+                            e.kind(),
+                            ErrorKind::ConnectionRefused
+                                | ErrorKind::ConnectionReset
+                                | ErrorKind::WouldBlock
+                                | ErrorKind::Interrupted
+                        ) => {}
+                    // Anything else means the socket is no longer usable: end the thread.
+                    Err(_) => break,
+                }
+                let delay = if attempts < BURST_PUNCHES {
+                    BURST_INTERVAL
+                } else {
+                    KEEPALIVE_INTERVAL
+                };
+                attempts = attempts.saturating_add(1);
+                std::thread::sleep(delay);
+            }
+        })
+        .ok();
+}
+
 pub struct UdpTransport {
     socket: UdpSocket,
 }
@@ -188,6 +220,60 @@ impl UdpTransport {
         Ok(UdpTransport { socket })
     }
 
+    /// Host side of the data plane for clients that may sit behind NAT / a stateful inter-VLAN
+    /// firewall. Bind `local`, then block up to `punch_timeout` for the client's first
+    /// [`PUNCH_MAGIC`] datagram and `connect` to its *observed* source — so video flows back
+    /// through the path the client just opened, to the address+port the host actually sees (the
+    /// NAT-translated one, which can differ from the client-reported `fallback_peer`). If no punch
+    /// arrives (a client that doesn't hole-punch), fall back to `fallback_peer` — the same flat-LAN
+    /// behaviour as [`connect`](Self::connect). Returns `(transport, punched)`.
+    ///
+    /// The wait is bounded by a single deadline: the read timeout is re-armed with only the time
+    /// left before every receive, so stray datagrams cannot stretch it past `punch_timeout`. A
+    /// zero `punch_timeout` skips the wait and goes straight to `fallback_peer`.
+    ///
+    /// # Errors
+    /// Returns any I/O error from binding, configuring, receiving on or connecting the socket.
+    /// Receive timeouts and interruptions are handled internally and are not errors.
+    pub fn connect_via_punch(
+        local: &str,
+        fallback_peer: &str,
+        punch_timeout: std::time::Duration,
+    ) -> std::io::Result<(Self, bool)> {
+        use std::io::ErrorKind;
+        use std::time::Instant;
+
+        let socket = UdpSocket::bind(local)?;
+        let deadline = Instant::now() + punch_timeout;
+        let mut buf = [0u8; 64];
+        // NOTE(review): the first datagram that starts with the magic wins, whatever its source
+        // address. If callers can supply the peer's expected IP, consider ignoring punches from
+        // other hosts — confirm against the threat model.
+        let observed = loop {
+            // Only wait for what is left of the budget. `set_read_timeout` rejects a zero
+            // duration, so an exhausted (or zero) budget must leave the loop before the call.
+            let remaining = deadline.saturating_duration_since(Instant::now());
+            if remaining.is_zero() {
+                break None;
+            }
+            socket.set_read_timeout(Some(remaining))?;
+            match socket.recv_from(&mut buf) {
+                Ok((n, src)) if buf[..n].starts_with(PUNCH_MAGIC) => break Some(src),
+                Ok(_) => {} // stray datagram — keep waiting for a real punch
+                // A signal interrupted the wait; go round again with the remaining budget.
+                Err(e) if e.kind() == ErrorKind::Interrupted => {}
+                Err(e) if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => {
+                    break None
+                }
+                Err(e) => return Err(e),
+            }
+        };
+        let punched = observed.is_some();
+        // Connect with the observed address as-is; no need to round-trip it through a string.
+        match observed {
+            Some(src) => socket.connect(src)?,
+            None => socket.connect(fallback_peer)?,
+        }
+        socket.set_read_timeout(None)?;
+        Self::grow_buffers(&socket);
+        socket.set_nonblocking(true)?;
+        Ok((UdpTransport { socket }, punched))
+    }
+
+    /// A second handle to the data socket, for sending hole-punch keepalives ([`PUNCH_MAGIC`])
+    /// while the [`Session`](crate::Session) owns the transport. The socket is already `connect`ed
+    /// to the host's data port, so `clone.send(PUNCH_MAGIC)` reaches it with no address.
+    ///
+    /// # Errors
+    /// Returns the I/O error from the underlying `try_clone` call.
+    pub fn try_clone_socket(&self) -> std::io::Result<UdpSocket> {
+        self.socket.try_clone()
+    }
+
     /// The bound local address (e.g. to learn the OS-assigned ephemeral port).
pub fn local_addr(&self) -> std::io::Result { self.socket.local_addr() diff --git a/crates/punktfunk-host/src/m3.rs b/crates/punktfunk-host/src/m3.rs index 1d7b68b..00f1c85 100644 --- a/crates/punktfunk-host/src/m3.rs +++ b/crates/punktfunk-host/src/m3.rs @@ -788,9 +788,23 @@ async fn serve_session( let stop_stream = stop.clone(); let result: Result<()> = async { tokio::task::spawn_blocking(move || -> Result<()> { - let transport = - UdpTransport::connect(&format!("0.0.0.0:{udp_port}"), &client_udp.to_string()) - .context("bind data plane")?; + // Wait briefly for the client to hole-punch our data port, then stream to its OBSERVED + // source — so video traverses a NAT / stateful inter-VLAN firewall (the client and host + // can be on different subnets; control + side planes ride the client-initiated QUIC, but + // the raw video UDP needs the client to open the path first). Falls back to the + // client-reported address for clients that don't punch (flat-LAN, unchanged). + let (transport, punched) = UdpTransport::connect_via_punch( + &format!("0.0.0.0:{udp_port}"), + &client_udp.to_string(), + std::time::Duration::from_millis(2500), + ) + .context("bind data plane")?; + tracing::info!( + %client_udp, + punched, + "data plane bound (punched=true → streaming to the client's observed source; \ + false → no hole-punch seen, using the reported address)" + ); let mut session = Session::new(cfg, Box::new(transport)) .map_err(|e| anyhow!("host session: {e:?}"))?; match source {