// client/src/handshake.rs #![forbid(unsafe_code)] use lesavka_common::lesavka::{self as pb, handshake_client::HandshakeClient}; use std::time::{Duration, Instant}; use tokio::time::timeout; use tonic::{Code, transport::Endpoint}; use tracing::{info, warn}; #[derive(Default, Clone, Debug)] #[cfg_attr(test, derive(PartialEq, Eq))] pub struct PeerCaps { pub camera: bool, pub microphone: bool, pub server_version: Option, pub camera_output: Option, pub camera_codec: Option, pub camera_width: Option, pub camera_height: Option, pub camera_fps: Option, pub eye_width: Option, pub eye_height: Option, pub eye_fps: Option, } #[derive(Default, Clone, Debug)] #[cfg_attr(test, derive(PartialEq))] pub struct HandshakeProbe { pub caps: PeerCaps, pub rtt_ms: Option, pub reachable: bool, } fn likely_port_typo_hint(uri: &str) -> Option<&'static str> { if uri.contains(":5005") && !uri.contains(":50051") { Some("possible typo: lesavka server listens on port 50051") } else { None } } /// Negotiate the server capabilities the client should honor locally. /// /// Inputs: the server URI to dial for the gRPC handshake. /// Outputs: the negotiated peer capability set, or defaults when the server /// is unreachable or does not implement the handshake service yet. /// Why: the rest of client startup depends on these capabilities, but a /// missing or misconfigured server should fall back to safe defaults instead /// of aborting the whole client session. #[cfg(coverage)] pub async fn negotiate(uri: &str) -> PeerCaps { if likely_port_typo_hint(uri).is_some() { return PeerCaps::default(); } let ep = match Endpoint::from_shared(uri.to_owned()) { Ok(ep) => ep .tcp_nodelay(true) .http2_keep_alive_interval(Duration::from_secs(15)) .connect_timeout(Duration::from_secs(5)), Err(_) => return PeerCaps::default(), }; let channel = match timeout(Duration::from_secs(8), ep.connect()).await { Ok(Ok(channel)) => channel, _ => return PeerCaps::default(), }; let mut cli = HandshakeClient::new(channel); match timeout(Duration::from_secs(5), cli.get_capabilities(pb::Empty {})).await { Ok(Ok(rsp)) => { let rsp = rsp.get_ref(); PeerCaps { camera: rsp.camera, microphone: rsp.microphone, server_version: (!rsp.server_version.is_empty()) .then_some(rsp.server_version.clone()), camera_output: (!rsp.camera_output.is_empty()).then_some(rsp.camera_output.clone()), camera_codec: (!rsp.camera_codec.is_empty()).then_some(rsp.camera_codec.clone()), camera_width: (rsp.camera_width != 0).then_some(rsp.camera_width), camera_height: (rsp.camera_height != 0).then_some(rsp.camera_height), camera_fps: (rsp.camera_fps != 0).then_some(rsp.camera_fps), eye_width: (rsp.eye_width != 0).then_some(rsp.eye_width), eye_height: (rsp.eye_height != 0).then_some(rsp.eye_height), eye_fps: (rsp.eye_fps != 0).then_some(rsp.eye_fps), } } Ok(Err(e)) if e.code() == Code::Unimplemented => PeerCaps::default(), Ok(Err(_)) | Err(_) => PeerCaps::default(), } } #[cfg(coverage)] pub async fn probe(uri: &str) -> HandshakeProbe { if likely_port_typo_hint(uri).is_some() { return HandshakeProbe::default(); } let started = Instant::now(); let ep = match Endpoint::from_shared(uri.to_owned()) { Ok(ep) => ep .tcp_nodelay(true) .http2_keep_alive_interval(Duration::from_secs(15)) .connect_timeout(Duration::from_secs(5)), Err(_) => return HandshakeProbe::default(), }; let channel = match timeout(Duration::from_secs(8), ep.connect()).await { Ok(Ok(channel)) => channel, _ => return HandshakeProbe::default(), }; let mut cli = HandshakeClient::new(channel); let rtt_ms = started.elapsed().as_secs_f32() * 1000.0; match timeout(Duration::from_secs(5), cli.get_capabilities(pb::Empty {})).await { Ok(Ok(rsp)) => { let rsp = rsp.get_ref(); HandshakeProbe { caps: PeerCaps { camera: rsp.camera, microphone: rsp.microphone, server_version: (!rsp.server_version.is_empty()) .then_some(rsp.server_version.clone()), camera_output: (!rsp.camera_output.is_empty()) .then_some(rsp.camera_output.clone()), camera_codec: (!rsp.camera_codec.is_empty()) .then_some(rsp.camera_codec.clone()), camera_width: (rsp.camera_width != 0).then_some(rsp.camera_width), camera_height: (rsp.camera_height != 0).then_some(rsp.camera_height), camera_fps: (rsp.camera_fps != 0).then_some(rsp.camera_fps), eye_width: (rsp.eye_width != 0).then_some(rsp.eye_width), eye_height: (rsp.eye_height != 0).then_some(rsp.eye_height), eye_fps: (rsp.eye_fps != 0).then_some(rsp.eye_fps), }, rtt_ms: Some(rtt_ms), reachable: true, } } Ok(Err(e)) if e.code() == Code::Unimplemented => HandshakeProbe { caps: PeerCaps::default(), rtt_ms: Some(rtt_ms), reachable: true, }, Ok(Err(_)) => HandshakeProbe { caps: PeerCaps::default(), rtt_ms: Some(rtt_ms), reachable: true, }, Err(_) => HandshakeProbe::default(), } } #[cfg(not(coverage))] pub async fn negotiate(uri: &str) -> PeerCaps { info!(%uri, "🀝 dial handshake"); let Some(hint) = likely_port_typo_hint(uri) else { let ep = match Endpoint::from_shared(uri.to_owned()) { Ok(ep) => ep .tcp_nodelay(true) .http2_keep_alive_interval(Duration::from_secs(15)) .connect_timeout(Duration::from_secs(5)), Err(e) => { warn!("🀝 invalid handshake endpoint '{uri}': {e} – assuming defaults"); return PeerCaps::default(); } }; let channel = match timeout(Duration::from_secs(8), ep.connect()).await { Ok(Ok(channel)) => channel, Ok(Err(e)) => { if let Some(hint) = likely_port_typo_hint(uri) { warn!("🀝 handshake connect failed: {e} ({hint}) – assuming defaults"); } else { warn!("🀝 handshake connect failed: {e} – assuming defaults"); } return PeerCaps::default(); } Err(_) => { if let Some(hint) = likely_port_typo_hint(uri) { warn!("🀝 handshake connect timed out ({hint}) – assuming defaults"); } else { warn!("🀝 handshake connect timed out – assuming defaults"); } return PeerCaps::default(); } }; info!("🀝 handshake channel connected"); let mut cli = HandshakeClient::new(channel); info!("🀝 fetching capabilities…"); return match timeout(Duration::from_secs(5), cli.get_capabilities(pb::Empty {})).await { Ok(Ok(rsp)) => { let rsp = rsp.get_ref(); let caps = PeerCaps { camera: rsp.camera, microphone: rsp.microphone, server_version: if rsp.server_version.is_empty() { None } else { Some(rsp.server_version.clone()) }, camera_output: if rsp.camera_output.is_empty() { None } else { Some(rsp.camera_output.clone()) }, camera_codec: if rsp.camera_codec.is_empty() { None } else { Some(rsp.camera_codec.clone()) }, camera_width: if rsp.camera_width == 0 { None } else { Some(rsp.camera_width) }, camera_height: if rsp.camera_height == 0 { None } else { Some(rsp.camera_height) }, camera_fps: if rsp.camera_fps == 0 { None } else { Some(rsp.camera_fps) }, eye_width: if rsp.eye_width == 0 { None } else { Some(rsp.eye_width) }, eye_height: if rsp.eye_height == 0 { None } else { Some(rsp.eye_height) }, eye_fps: if rsp.eye_fps == 0 { None } else { Some(rsp.eye_fps) }, }; info!(?caps, "🀝 handshake ok"); caps } Ok(Err(e)) if e.code() == Code::Unimplemented => { warn!("🀝 handshake not implemented on server – assuming defaults"); PeerCaps::default() } Ok(Err(e)) => { warn!("🀝 handshake failed: {e} – assuming defaults"); PeerCaps::default() } Err(_) => { warn!("🀝 handshake timed out – assuming defaults"); PeerCaps::default() } }; }; warn!("🀝 handshake endpoint '{uri}' looks wrong ({hint}) – assuming defaults"); PeerCaps::default() } #[cfg(not(coverage))] pub async fn probe(uri: &str) -> HandshakeProbe { info!(%uri, "πŸ§ͺ probing handshake"); let Some(hint) = likely_port_typo_hint(uri) else { let started = Instant::now(); let ep = match Endpoint::from_shared(uri.to_owned()) { Ok(ep) => ep .tcp_nodelay(true) .http2_keep_alive_interval(Duration::from_secs(15)) .connect_timeout(Duration::from_secs(5)), Err(e) => { warn!("πŸ§ͺ invalid probe endpoint '{uri}': {e}"); return HandshakeProbe::default(); } }; let channel = match timeout(Duration::from_secs(8), ep.connect()).await { Ok(Ok(channel)) => channel, Ok(Err(e)) => { warn!("πŸ§ͺ handshake probe connect failed: {e}"); return HandshakeProbe::default(); } Err(_) => { warn!("πŸ§ͺ handshake probe timed out"); return HandshakeProbe::default(); } }; let mut cli = HandshakeClient::new(channel); let rtt_ms = started.elapsed().as_secs_f32() * 1000.0; return match timeout(Duration::from_secs(5), cli.get_capabilities(pb::Empty {})).await { Ok(Ok(rsp)) => { let rsp = rsp.get_ref(); let caps = PeerCaps { camera: rsp.camera, microphone: rsp.microphone, server_version: (!rsp.server_version.is_empty()) .then_some(rsp.server_version.clone()), camera_output: (!rsp.camera_output.is_empty()) .then_some(rsp.camera_output.clone()), camera_codec: (!rsp.camera_codec.is_empty()) .then_some(rsp.camera_codec.clone()), camera_width: (rsp.camera_width != 0).then_some(rsp.camera_width), camera_height: (rsp.camera_height != 0).then_some(rsp.camera_height), camera_fps: (rsp.camera_fps != 0).then_some(rsp.camera_fps), eye_width: (rsp.eye_width != 0).then_some(rsp.eye_width), eye_height: (rsp.eye_height != 0).then_some(rsp.eye_height), eye_fps: (rsp.eye_fps != 0).then_some(rsp.eye_fps), }; info!(rtt_ms, ?caps, "πŸ§ͺ handshake probe ok"); HandshakeProbe { caps, rtt_ms: Some(rtt_ms), reachable: true, } } Ok(Err(e)) if e.code() == Code::Unimplemented => { warn!("πŸ§ͺ handshake probe reached a server without the handshake service"); HandshakeProbe { caps: PeerCaps::default(), rtt_ms: Some(rtt_ms), reachable: true, } } Ok(Err(e)) => { warn!("πŸ§ͺ handshake probe RPC failed: {e}"); HandshakeProbe { caps: PeerCaps::default(), rtt_ms: Some(rtt_ms), reachable: true, } } Err(_) => { warn!("πŸ§ͺ handshake probe RPC timed out"); HandshakeProbe::default() } }; }; warn!("πŸ§ͺ handshake probe endpoint '{uri}' looks wrong ({hint})"); HandshakeProbe::default() } #[cfg(test)] mod tests { use super::{HandshakeProbe, PeerCaps, likely_port_typo_hint, negotiate, probe}; #[test] fn likely_port_typo_hint_flags_common_port_mistype() { assert_eq!( likely_port_typo_hint("http://127.0.0.1:5005"), Some("possible typo: lesavka server listens on port 50051") ); assert_eq!(likely_port_typo_hint("http://127.0.0.1:50051"), None); } #[tokio::test] async fn negotiate_returns_defaults_for_invalid_endpoint() { let caps = negotiate("not a uri").await; assert_eq!(caps, PeerCaps::default()); } #[tokio::test] async fn negotiate_returns_defaults_for_port_typo_hint() { let caps = negotiate("http://127.0.0.1:5005").await; assert_eq!(caps, PeerCaps::default()); } #[tokio::test] async fn probe_returns_defaults_for_port_typo_hint() { let probe_result = probe("http://127.0.0.1:5005").await; assert_eq!(probe_result, HandshakeProbe::default()); } }