lesavka/client/src/handshake.rs

382 lines
14 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// client/src/handshake.rs
#![forbid(unsafe_code)]
use lesavka_common::lesavka::{self as pb, handshake_client::HandshakeClient};
use std::time::{Duration, Instant};
use tokio::time::timeout;
use tonic::{Code, transport::Endpoint};
use tracing::{info, warn};
/// Capability set learned from the server's handshake reply.
///
/// `Default` is the fallback the handshake functions return whenever no reply
/// was obtained: both flags are `false` and every optional field is `None`.
#[derive(Clone, Debug, Default)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub struct PeerCaps {
    /// `camera` flag copied from the capabilities reply.
    pub camera: bool,
    /// `microphone` flag copied from the capabilities reply.
    pub microphone: bool,
    /// Server version string; `None` when the reply left it empty.
    pub server_version: Option<String>,
    /// `camera_output` string from the reply; `None` when empty.
    pub camera_output: Option<String>,
    /// `camera_codec` string from the reply; `None` when empty.
    pub camera_codec: Option<String>,
    /// `camera_width` from the reply; `None` when the reply carried `0`.
    pub camera_width: Option<u32>,
    /// `camera_height` from the reply; `None` when the reply carried `0`.
    pub camera_height: Option<u32>,
    /// `camera_fps` from the reply; `None` when the reply carried `0`.
    pub camera_fps: Option<u32>,
    /// `eye_width` from the reply; `None` when the reply carried `0`.
    pub eye_width: Option<u32>,
    /// `eye_height` from the reply; `None` when the reply carried `0`.
    pub eye_height: Option<u32>,
    /// `eye_fps` from the reply; `None` when the reply carried `0`.
    pub eye_fps: Option<u32>,
}
/// Outcome of a single reachability probe against the handshake service.
///
/// `Default` is the "unreachable" outcome: default capabilities, no timing
/// sample and `reachable == false`.
#[derive(Clone, Debug, Default)]
#[cfg_attr(test, derive(PartialEq))]
pub struct HandshakeProbe {
    /// Capabilities taken from the reply, or the defaults when no reply was
    /// obtained.
    pub caps: PeerCaps,
    /// Milliseconds elapsed between the start of the probe and the moment the
    /// channel was connected (sampled before the capabilities RPC is issued).
    /// `None` when the probe produced no RPC outcome.
    pub rtt_ms: Option<f32>,
    /// `true` when the capabilities RPC came back before its timeout, whether
    /// with a reply or with an error status.
    pub reachable: bool,
}
/// Detect the common mistake of dialing port `5005` instead of `50051`.
///
/// Inputs: the server URI exactly as the caller intends to dial it.
/// Outputs: a static hint when the URI's port is exactly `5005`, otherwise
/// `None`.
/// Why: only the authority's port is inspected, so other ports that merely
/// start with `5005` (for example `50052`) and occurrences of `:5005` in a
/// path, userinfo or IPv6 literal are not reported as typos.
fn likely_port_typo_hint(uri: &str) -> Option<&'static str> {
    // Drop the scheme, if any, then keep only the authority component.
    let after_scheme = uri.split_once("://").map_or(uri, |(_, rest)| rest);
    let authority = after_scheme
        .split(['/', '?', '#'])
        .next()
        .unwrap_or(after_scheme);
    // Ignore any `user:password@` prefix; only `host[:port]` matters.
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_, host_port)| host_port);
    // A `]` after the last colon means that colon belongs to a bracketed IPv6
    // literal without an explicit port.
    let port = host_port
        .rsplit_once(':')
        .map(|(_, port)| port)
        .filter(|port| !port.contains(']'));
    (port == Some("5005")).then_some("possible typo: lesavka server listens on port 50051")
}
/// Ask the server which capabilities the client should honor locally.
///
/// Inputs: the URI of the server's gRPC endpoint.
/// Outputs: the capabilities reported by the server, or `PeerCaps::default()`
/// when the URI looks like a port typo, is not a valid endpoint, the
/// connection cannot be established in time, or the capabilities RPC fails or
/// times out.
/// Why: client startup keeps going with safe defaults instead of aborting
/// when the server is missing, misconfigured or lacks the handshake service.
/// This is the variant compiled under `cfg(coverage)`; it performs no logging.
#[cfg(coverage)]
pub async fn negotiate(uri: &str) -> PeerCaps {
    if likely_port_typo_hint(uri).is_some() {
        return PeerCaps::default();
    }

    let Ok(endpoint) = Endpoint::from_shared(uri.to_owned()) else {
        return PeerCaps::default();
    };
    let endpoint = endpoint
        .tcp_nodelay(true)
        .http2_keep_alive_interval(Duration::from_secs(15))
        .connect_timeout(Duration::from_secs(5));

    // The outer 8 s budget bounds the whole connect attempt.
    let Ok(Ok(channel)) = timeout(Duration::from_secs(8), endpoint.connect()).await else {
        return PeerCaps::default();
    };

    let mut client = HandshakeClient::new(channel);
    let reply = match timeout(Duration::from_secs(5), client.get_capabilities(pb::Empty {})).await
    {
        Ok(Ok(reply)) => reply,
        // A server without the handshake service is treated like any other
        // failure: fall back to defaults.
        Ok(Err(status)) if status.code() == Code::Unimplemented => return PeerCaps::default(),
        Ok(Err(_)) | Err(_) => return PeerCaps::default(),
    };

    // Empty strings and zero values in the reply are mapped to `None`.
    let text = |value: &str| (!value.is_empty()).then(|| value.to_owned());
    let number = |value: u32| (value != 0).then_some(value);

    let msg = reply.get_ref();
    PeerCaps {
        camera: msg.camera,
        microphone: msg.microphone,
        server_version: text(&msg.server_version),
        camera_output: text(&msg.camera_output),
        camera_codec: text(&msg.camera_codec),
        camera_width: number(msg.camera_width),
        camera_height: number(msg.camera_height),
        camera_fps: number(msg.camera_fps),
        eye_width: number(msg.eye_width),
        eye_height: number(msg.eye_height),
        eye_fps: number(msg.eye_fps),
    }
}
/// Probe the handshake service once and report whether it answered.
///
/// Inputs: the URI of the server's gRPC endpoint.
/// Outputs: a `HandshakeProbe`. It is `HandshakeProbe::default()` when the
/// URI looks like a port typo, is not a valid endpoint, the connection cannot
/// be established in time, or the capabilities RPC times out. Otherwise
/// `reachable` is `true`, `rtt_ms` holds the time it took to get a connected
/// channel, and `caps` holds the reported capabilities (defaults when the RPC
/// returned an error status).
/// This is the variant compiled under `cfg(coverage)`; it performs no logging.
#[cfg(coverage)]
pub async fn probe(uri: &str) -> HandshakeProbe {
    if likely_port_typo_hint(uri).is_some() {
        return HandshakeProbe::default();
    }

    let started = Instant::now();
    let Ok(endpoint) = Endpoint::from_shared(uri.to_owned()) else {
        return HandshakeProbe::default();
    };
    let endpoint = endpoint
        .tcp_nodelay(true)
        .http2_keep_alive_interval(Duration::from_secs(15))
        .connect_timeout(Duration::from_secs(5));

    let Ok(Ok(channel)) = timeout(Duration::from_secs(8), endpoint.connect()).await else {
        return HandshakeProbe::default();
    };
    let mut client = HandshakeClient::new(channel);
    // Sampled once the channel is up and before the RPC is sent.
    let rtt_ms = started.elapsed().as_secs_f32() * 1000.0;

    // Every outcome in which the server answered shares the same envelope.
    let answered = |caps: PeerCaps| HandshakeProbe {
        caps,
        rtt_ms: Some(rtt_ms),
        reachable: true,
    };
    // Empty strings and zero values in the reply are mapped to `None`.
    let text = |value: &str| (!value.is_empty()).then(|| value.to_owned());
    let number = |value: u32| (value != 0).then_some(value);

    match timeout(Duration::from_secs(5), client.get_capabilities(pb::Empty {})).await {
        Ok(Ok(reply)) => {
            let msg = reply.get_ref();
            answered(PeerCaps {
                camera: msg.camera,
                microphone: msg.microphone,
                server_version: text(&msg.server_version),
                camera_output: text(&msg.camera_output),
                camera_codec: text(&msg.camera_codec),
                camera_width: number(msg.camera_width),
                camera_height: number(msg.camera_height),
                camera_fps: number(msg.camera_fps),
                eye_width: number(msg.eye_width),
                eye_height: number(msg.eye_height),
                eye_fps: number(msg.eye_fps),
            })
        }
        // An error status still proves that something answered the RPC.
        Ok(Err(status)) if status.code() == Code::Unimplemented => answered(PeerCaps::default()),
        Ok(Err(_)) => answered(PeerCaps::default()),
        Err(_) => HandshakeProbe::default(),
    }
}
/// Negotiate the server capabilities the client should honor locally.
///
/// Inputs: the server URI to dial for the gRPC handshake.
/// Outputs: the capabilities reported by the server, or `PeerCaps::default()`
/// when the URI looks like a port typo, is not a valid endpoint, the
/// connection fails or times out, or the capabilities RPC fails, times out or
/// is not implemented by the server.
/// Why: the rest of client startup depends on these capabilities, but a
/// missing or misconfigured server should fall back to safe defaults instead
/// of aborting the whole client session. Every fallback is logged at `warn`.
#[cfg(not(coverage))]
pub async fn negotiate(uri: &str) -> PeerCaps {
    info!(%uri, "🤝 dial handshake");

    // A URI that looks like a mistyped port is never dialed. Past this guard
    // the hint is known to be absent, so later failure paths need not
    // re-check it.
    if let Some(hint) = likely_port_typo_hint(uri) {
        warn!("🤝 handshake endpoint '{uri}' looks wrong ({hint}) assuming defaults");
        return PeerCaps::default();
    }

    let ep = match Endpoint::from_shared(uri.to_owned()) {
        Ok(ep) => ep
            .tcp_nodelay(true)
            .http2_keep_alive_interval(Duration::from_secs(15))
            .connect_timeout(Duration::from_secs(5)),
        Err(e) => {
            warn!("🤝 invalid handshake endpoint '{uri}': {e} assuming defaults");
            return PeerCaps::default();
        }
    };

    // The outer 8 s budget bounds the whole connect attempt.
    let channel = match timeout(Duration::from_secs(8), ep.connect()).await {
        Ok(Ok(channel)) => channel,
        Ok(Err(e)) => {
            warn!("🤝 handshake connect failed: {e} assuming defaults");
            return PeerCaps::default();
        }
        Err(_) => {
            warn!("🤝 handshake connect timed out assuming defaults");
            return PeerCaps::default();
        }
    };
    info!("🤝 handshake channel connected");

    let mut cli = HandshakeClient::new(channel);
    info!("🤝 fetching capabilities…");
    match timeout(Duration::from_secs(5), cli.get_capabilities(pb::Empty {})).await {
        Ok(Ok(rsp)) => {
            let rsp = rsp.get_ref();
            // Empty strings and zero values in the reply are mapped to `None`.
            let caps = PeerCaps {
                camera: rsp.camera,
                microphone: rsp.microphone,
                server_version: (!rsp.server_version.is_empty())
                    .then_some(rsp.server_version.clone()),
                camera_output: (!rsp.camera_output.is_empty())
                    .then_some(rsp.camera_output.clone()),
                camera_codec: (!rsp.camera_codec.is_empty())
                    .then_some(rsp.camera_codec.clone()),
                camera_width: (rsp.camera_width != 0).then_some(rsp.camera_width),
                camera_height: (rsp.camera_height != 0).then_some(rsp.camera_height),
                camera_fps: (rsp.camera_fps != 0).then_some(rsp.camera_fps),
                eye_width: (rsp.eye_width != 0).then_some(rsp.eye_width),
                eye_height: (rsp.eye_height != 0).then_some(rsp.eye_height),
                eye_fps: (rsp.eye_fps != 0).then_some(rsp.eye_fps),
            };
            info!(?caps, "🤝 handshake ok");
            caps
        }
        Ok(Err(e)) if e.code() == Code::Unimplemented => {
            warn!("🤝 handshake not implemented on server assuming defaults");
            PeerCaps::default()
        }
        Ok(Err(e)) => {
            warn!("🤝 handshake failed: {e} assuming defaults");
            PeerCaps::default()
        }
        Err(_) => {
            warn!("🤝 handshake timed out assuming defaults");
            PeerCaps::default()
        }
    }
}
/// Probe the handshake service once and report whether it answered.
///
/// Inputs: the URI of the server's gRPC endpoint.
/// Outputs: a `HandshakeProbe`. It is `HandshakeProbe::default()` when the
/// URI looks like a port typo, is not a valid endpoint, the connection fails
/// or times out, or the capabilities RPC times out. Otherwise `reachable` is
/// `true`, `rtt_ms` holds the time it took to get a connected channel, and
/// `caps` holds the reported capabilities (defaults when the RPC returned an
/// error status). Every non-success outcome is logged at `warn`.
#[cfg(not(coverage))]
pub async fn probe(uri: &str) -> HandshakeProbe {
    info!(%uri, "🧪 probing handshake");

    // A URI that looks like a mistyped port is never dialed.
    if let Some(hint) = likely_port_typo_hint(uri) {
        warn!("🧪 handshake probe endpoint '{uri}' looks wrong ({hint})");
        return HandshakeProbe::default();
    }

    let started = Instant::now();
    let endpoint = match Endpoint::from_shared(uri.to_owned()) {
        Err(e) => {
            warn!("🧪 invalid probe endpoint '{uri}': {e}");
            return HandshakeProbe::default();
        }
        Ok(builder) => builder
            .tcp_nodelay(true)
            .http2_keep_alive_interval(Duration::from_secs(15))
            .connect_timeout(Duration::from_secs(5)),
    };

    let channel = match timeout(Duration::from_secs(8), endpoint.connect()).await {
        Err(_) => {
            warn!("🧪 handshake probe timed out");
            return HandshakeProbe::default();
        }
        Ok(Err(e)) => {
            warn!("🧪 handshake probe connect failed: {e}");
            return HandshakeProbe::default();
        }
        Ok(Ok(channel)) => channel,
    };

    let mut client = HandshakeClient::new(channel);
    // Sampled once the channel is up and before the RPC is sent.
    let rtt_ms = started.elapsed().as_secs_f32() * 1000.0;

    // Outcome shared by both error-status arms: the server answered, but no
    // capabilities were obtained.
    let answered_without_caps = || HandshakeProbe {
        caps: PeerCaps::default(),
        rtt_ms: Some(rtt_ms),
        reachable: true,
    };

    match timeout(Duration::from_secs(5), client.get_capabilities(pb::Empty {})).await {
        Err(_) => {
            warn!("🧪 handshake probe RPC timed out");
            HandshakeProbe::default()
        }
        Ok(Err(e)) if e.code() == Code::Unimplemented => {
            warn!("🧪 handshake probe reached a server without the handshake service");
            answered_without_caps()
        }
        Ok(Err(e)) => {
            warn!("🧪 handshake probe RPC failed: {e}");
            answered_without_caps()
        }
        Ok(Ok(reply)) => {
            let msg = reply.get_ref();
            // Empty strings and zero values in the reply are mapped to `None`.
            let text = |value: &str| (!value.is_empty()).then(|| value.to_owned());
            let number = |value: u32| (value != 0).then_some(value);
            let caps = PeerCaps {
                camera: msg.camera,
                microphone: msg.microphone,
                server_version: text(&msg.server_version),
                camera_output: text(&msg.camera_output),
                camera_codec: text(&msg.camera_codec),
                camera_width: number(msg.camera_width),
                camera_height: number(msg.camera_height),
                camera_fps: number(msg.camera_fps),
                eye_width: number(msg.eye_width),
                eye_height: number(msg.eye_height),
                eye_fps: number(msg.eye_fps),
            };
            info!(rtt_ms, ?caps, "🧪 handshake probe ok");
            HandshakeProbe {
                caps,
                rtt_ms: Some(rtt_ms),
                reachable: true,
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::{HandshakeProbe, PeerCaps, likely_port_typo_hint, negotiate, probe};

    /// Loopback URI that uses port 5005 instead of 50051.
    const TYPO_URI: &str = "http://127.0.0.1:5005";

    /// The hint fires for port 5005 and stays silent for the real port.
    #[test]
    fn likely_port_typo_hint_flags_common_port_mistype() {
        let flagged = likely_port_typo_hint(TYPO_URI);
        assert_eq!(
            flagged,
            Some("possible typo: lesavka server listens on port 50051")
        );

        let correct = likely_port_typo_hint("http://127.0.0.1:50051");
        assert_eq!(correct, None);
    }

    /// A string that is not a URI yields default capabilities.
    #[tokio::test]
    async fn negotiate_returns_defaults_for_invalid_endpoint() {
        assert_eq!(negotiate("not a uri").await, PeerCaps::default());
    }

    /// A URI flagged as a port typo yields default capabilities.
    #[tokio::test]
    async fn negotiate_returns_defaults_for_port_typo_hint() {
        assert_eq!(negotiate(TYPO_URI).await, PeerCaps::default());
    }

    /// A URI flagged as a port typo yields the default (unreachable) probe.
    #[tokio::test]
    async fn probe_returns_defaults_for_port_typo_hint() {
        assert_eq!(probe(TYPO_URI).await, HandshakeProbe::default());
    }
}