From 84790ea470bae935fb9032a34be71e9168ad3bd8 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 4 Jul 2025 01:56:59 -0500 Subject: [PATCH] more logging --- client/src/app.rs | 74 +++++++++++++++++++++------------- client/src/handshake.rs | 29 +++++++++++++ client/src/input/camera.rs | 35 +++++++++++----- client/src/input/inputs.rs | 36 ++--------------- client/src/input/microphone.rs | 20 +++++++-- client/src/lib.rs | 1 + common/proto/lesavka.proto | 7 ++++ scripts/install/server.sh | 2 +- server/src/video.rs | 12 ++++++ 9 files changed, 140 insertions(+), 76 deletions(-) create mode 100644 client/src/handshake.rs diff --git a/client/src/app.rs b/client/src/app.rs index b801249..b35c986 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -19,7 +19,8 @@ use lesavka_common::lesavka::{ MonitorRequest, MouseReport, VideoPacket, AudioPacket }; -use crate::{input::inputs::InputAggregator, +use crate::{handshake, + input::inputs::InputAggregator, input::microphone::MicrophoneCapture, input::camera::CameraCapture, output::video::MonitorWindow, @@ -47,18 +48,14 @@ impl LesavkaClientApp { let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone()); agg.init()?; // grab devices immediately - let cam = if std::env::var("LESAVKA_CAM_DISABLE").is_ok() { - None - } else { - Some(Arc::new(CameraCapture::new( - std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() - )?)) - }; - Ok(Self { aggregator: Some(agg), server_addr, dev_mode, kbd_tx, mou_tx }) } pub async fn run(&mut self) -> Result<()> { + /*────────── handshake / feature-negotiation ───────────────*/ + let caps = handshake::negotiate(&self.server_addr).await; + tracing::info!("🤝 server capabilities = {:?}", caps); + /*────────── persistent gRPC channels ──────────*/ let hid_ep = Channel::from_shared(self.server_addr.clone())? .tcp_nodelay(true) @@ -137,13 +134,17 @@ impl LesavkaClientApp { let ep_audio = vid_ep.clone(); tokio::spawn(Self::audio_loop(ep_audio, audio_out)); - - /*────────── camera & mic tasks ───────────────*/ - let cam = Arc::new(CameraCapture::new( - std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() - )?); - tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); - + /*────────── camera & mic tasks (gated by caps) ───────────*/ + if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() { + let cam = Arc::new(CameraCapture::new( + std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() + )?); + tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); + if caps.microphone { + let mic = Arc::new(MicrophoneCapture::new()?); + tokio::spawn(Self::mic_loop(vid_ep.clone(), mic)); + } + } let mic = Arc::new(MicrophoneCapture::new()?); tokio::spawn(Self::mic_loop(vid_ep.clone(), mic)); @@ -269,6 +270,7 @@ impl LesavkaClientApp { /*──────────────── mic stream ─────────────────*/ async fn mic_loop(ep: Channel, mic: Arc) { + let mut delay = Duration::from_secs(1); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { let mut cli = RelayClient::new(ep.clone()); @@ -290,12 +292,11 @@ impl LesavkaClientApp { // 3. turn `rx` into an async stream for gRPC let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); - - match cli.stream_microphone(Request::new(outbound)).await { - Ok(mut resp) => { - while resp.get_mut().message().await.transpose().is_some() {} - } - Err(e) => { + match cli.stream_microphone(Request::new(outbound)).await { + Ok(mut resp) => { + while resp.get_mut().message().await.transpose().is_some() {} + } + Err(e) => { // first failure → warn, subsequent ones → debug if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 { warn!("❌🎤 connect failed: {e}"); @@ -303,15 +304,17 @@ impl LesavkaClientApp { } else { debug!("❌🎤 reconnect failed: {e}"); } + delay = next_delay(delay); } } let _ = stop_tx.send(()); - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(delay).await; } } /*──────────────── cam stream ───────────────────*/ async fn cam_loop(ep: Channel, cam: Arc) { + let mut delay = Duration::from_secs(1); loop { let mut cli = RelayClient::new(ep.clone()); let (tx, rx) = tokio::sync::mpsc::channel::(256); @@ -327,16 +330,33 @@ impl LesavkaClientApp { if n < 10 || n % 120 == 0 { tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len()); } - let _ = tx.blocking_send(pkt); + tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", + pkt.pts, pkt.data.len()); + let _ = tx.try_send(pkt); } } }); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); - if let Err(e) = cli.stream_camera(Request::new(outbound)).await { - tracing::warn!("❌📸 connect failed: {e}"); + match cli.stream_camera(Request::new(outbound)).await { + Ok(_) => delay = Duration::from_secs(1), // got a stream → reset + Err(e) if e.code() == tonic::Code::Unimplemented => { + tracing::warn!("📸 server does not support StreamCamera – giving up"); + return; // stop the task completely (#3) + } + Err(e) => { + tracing::warn!("❌📸 connect failed: {e:?}"); + delay = next_delay(delay); // back-off (#2) + } } - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(delay).await; } } } + +fn next_delay(cur: std::time::Duration) -> std::time::Duration { + match cur.as_secs() { + 1..=15 => cur * 2, + _ => std::time::Duration::from_secs(30), + } +} diff --git a/client/src/handshake.rs b/client/src/handshake.rs new file mode 100644 index 0000000..190f4ac --- /dev/null +++ b/client/src/handshake.rs @@ -0,0 +1,29 @@ +// client/src/handshake.rs +#![forbid(unsafe_code)] + +use lesavka_common::lesavka::{self as pb, handshake_client::HandshakeClient}; +use tonic::Code; + +#[derive(Default, Clone, Copy, Debug)] +pub struct PeerCaps { + pub camera: bool, + pub microphone: bool, +} + +pub async fn negotiate(uri: &str) -> PeerCaps { + let mut cli = HandshakeClient::connect(uri.to_owned()) + .await + .expect("\"dial handshake\""); + + match cli.get_capabilities(pb::Empty {}).await { + Ok(rsp) => PeerCaps { + camera: rsp.get_ref().camera, + microphone: rsp.get_ref().microphone, + }, + Err(e) if e.code() == Code::Unimplemented => { + // ↺ old server – pretend it supports nothing special. + PeerCaps::default() + } + Err(e) => panic!("\"handshake failed: {e}\""), + } +} diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs index 858d815..1e2f8b2 100644 --- a/client/src/input/camera.rs +++ b/client/src/input/camera.rs @@ -26,6 +26,17 @@ impl CameraCapture { // (NVIDIA → VA-API → software x264). let (enc, kf_prop, kf_val) = Self::choose_encoder(); tracing::info!("📸 using encoder element: {enc}"); + let (src_caps, preenc) = match enc { + "nvh264enc" => ( + "video/x-raw(memory:NVMM),format=NV12,width=1280,height=720", + "nvvidconv !" + ), + "vaapih264enc" => ( + "video/x-raw,format=NV12,width=1280,height=720", + "videoconvert !" + ), + _ => ("video/x-raw,width=1280,height=720", "videoconvert !"), + }; // let desc = format!( // "v4l2src device={dev} do-timestamp=true ! {raw_caps},width=1280,height=720 ! \ @@ -39,9 +50,10 @@ impl CameraCapture { // * vaapih264enc wants system-memory caps; // * x264enc needs the usual raw caps. let desc = format!( - "v4l2src device={dev} do-timestamp=true ! \ - videoconvert ! {enc} {kf_prop}={kf_val} ! \ - h264parse config-interval=-1 ! \ + "v4l2src device={dev} do-timestamp=true ! {src_caps} ! \ + {preenc} {enc} {kf_prop}={kf_val} ! \ + h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \ + queue max-size-buffers=30 leaky=downstream ! \ appsink name=asink emit-signals=true max-buffers=60 drop=true" ); tracing::info!(%enc, ?desc, "📸 using encoder element"); @@ -97,6 +109,7 @@ impl CameraCapture { Self { pipeline, sink } } + #[allow(dead_code)] // helper kept for future heuristics fn pick_encoder() -> (&'static str, &'static str) { let encoders = &[ ("nvh264enc", "video/x-raw(memory:NVMM),format=NV12"), @@ -113,15 +126,15 @@ impl CameraCapture { ("x264enc", "video/x-raw") } - /// Return the encoder element *and* the property/key-frame pair we must - /// set to get an IDR every 30 frames. fn choose_encoder() -> (&'static str, &'static str, &'static str) { - if gst::ElementFactory::find("nvh264enc").is_some() { - ("nvh264enc", "gop-size", "30") // NVIDIA NVENC uses `gop-size` :contentReference[oaicite:0]{index=0} - } else if gst::ElementFactory::find("vaapih264enc").is_some() { - ("vaapih264enc", "keyframe-period", "30")// VA-API uses `keyframe-period` :contentReference[oaicite:1]{index=1} - } else { - ("x264enc", "key-int-max", "30") // libx264 fallback :contentReference[oaicite:2]{index=2} + match () { + _ if gst::ElementFactory::find("nvh264enc").is_some() => + ("nvh264enc", "gop-size", "30"), + _ if gst::ElementFactory::find("vaapih264enc").is_some() => + ("vaapih264enc","keyframe-period","30"), + _ if gst::ElementFactory::find("v4l2h264enc").is_some() => + ("v4l2h264enc","idrcount", "30"), + _ => ("x264enc", "key-int-max", "30"), } } } diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index b2f1813..1ae54c2 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -7,7 +7,7 @@ use tracing::{debug, info, warn, trace}; use lesavka_common::lesavka::{KeyboardReport, MouseReport}; -use super::{keyboard::KeyboardAggregator, mouse::MouseAggregator, camera::CameraCapture}; +use super::{keyboard::KeyboardAggregator, mouse::MouseAggregator}; use crate::layout::{Layout, apply as apply_layout}; pub struct InputAggregator { @@ -18,7 +18,6 @@ pub struct InputAggregator { magic_active: bool, keyboards: Vec, mice: Vec, - camera: Option, } impl InputAggregator { @@ -26,8 +25,8 @@ impl InputAggregator { kbd_tx: Sender, mou_tx: Sender) -> Self { Self { kbd_tx, mou_tx, dev_mode, released: false, magic_active: false, - keyboards: Vec::new(), mice: Vec::new(), - camera: None } + keyboards: Vec::new(), mice: Vec::new() + } } /// Called once at startup: enumerates input devices, @@ -92,29 +91,6 @@ impl InputAggregator { bail!("No suitable keyboard/mouse devices found or none grabbed."); } - // Auto‑select webcam (may be toggled later through magic chord) - let cam_src_env = std::env::var("LESAVKA_CAM_SOURCE").ok(); - self.camera = match cam_src_env.as_deref() - // .or(device_fragment) // <- if you later wire CLI arg - .map(|s| CameraCapture::new(Some(s))) { - Some(Ok(c)) => { - info!("📸 webcam enabled (device fragment = {:?})", cam_src_env); - Some(c) - } - Some(Err(e)) => { - warn!( - "📸 webcam disabled – {:?}. \ - Hint: install gst‑plugins‑bad or provide LESAVKA_CAM_DISABLE=1", - e.root_cause() - ); - Some(CameraCapture::new_stub()) // keep stub so call‑sites compile - } - None => { - info!("📸 webcam disabled (no CAM_SOURCE set)"); - None // or Stub – your choice - } - }; - Ok(()) } @@ -152,12 +128,6 @@ impl InputAggregator { mouse.process_events(); } - if let Some(cam) = &self.camera { - trace!("📸 CameraCapture present – first pull() will block until a frame arrives"); - } else { - debug!("📸 No camera pipeline active"); - } - self.magic_active = magic_now; tick.tick().await; } diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index 936dcd5..27d2441 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -30,12 +30,24 @@ impl MicrophoneCapture { _ => String::new(), }; debug!("🎤 device: {device_arg}"); + let aac = ["avenc_aac", "fdkaacenc", "faac", "opusenc"] + .into_iter() + .find(|e| gst::ElementFactory::find(e).is_some()) + .unwrap_or("opusenc"); + let parser = if aac.contains("opus") { + // opusenc already outputs raw Opus frames – just state the caps + "capsfilter caps=audio/x-opus,rate=48000,channels=2" + } else { + // AAC → ADTS frames + "aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2" + }; let desc = format!( "pulsesrc {device_arg} do-timestamp=true ! \ - audio/x-raw,format=S16LE,channels=2,rate=48000 ! \ - audioconvert ! audioresample ! avenc_aac bitrate=128000 ! \ - aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2 ! \ - appsink name=asink emit-signals=true max-buffers=50 drop=true" + audio/x-raw,format=S16LE,channels=2,rate=48000 ! \ + audioconvert ! audioresample ! {aac} bitrate=128000 ! \ + {parser} ! \ + queue max-size-buffers=100 leaky=downstream ! \ + appsink name=asink emit-signals=true max-buffers=50 drop=true" ); let pipeline: gst::Pipeline = gst::parse::launch(&desc)? diff --git a/client/src/lib.rs b/client/src/lib.rs index 0db2943..ba798cc 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -6,5 +6,6 @@ pub mod app; pub mod input; pub mod output; pub mod layout; +pub mod handshake; pub use app::LesavkaClientApp; diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index 8e97acc..7d37566 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -1,3 +1,4 @@ +// ───────────────────────────────────────── proto/lesavka.proto syntax = "proto3"; package lesavka; @@ -10,6 +11,8 @@ message AudioPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; } message ResetUsbReply { bool ok = 1; } // true = success +message HandshakeSet { bool camera = 1; bool microphone = 2; } + message Empty {} service Relay { @@ -22,3 +25,7 @@ service Relay { rpc ResetUsb (Empty) returns (ResetUsbReply); } + +service Handshake { + rpc GetCapabilities (Empty) returns (HandshakeSet); +} diff --git a/scripts/install/server.sh b/scripts/install/server.sh index 9823865..8ca910c 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -122,7 +122,7 @@ After=network.target lesavka-core.service [Service] ExecStart=/usr/local/bin/lesavka-server Restart=always -Environment=RUST_LOG=lesavka_server=info,lesavka_server::audio=info,lesavka_server::video=trace,lesavka_server::gadget=info +Environment=RUST_LOG=lesavka_server=info,lesavka_server::audio=trace,lesavka_server::video=trace,lesavka_server::gadget=info Environment=RUST_BACKTRACE=1 Environment=GST_DEBUG="*:2,alsasink:6,alsasrc:6" Restart=always diff --git a/server/src/video.rs b/server/src/video.rs index 3e018dd..7957e9c 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -295,6 +295,18 @@ impl CameraRelay { bytes = pkt.data.len(), "📸📥 srv pkt"); } + + if cfg!(debug_assertions) || tracing::enabled!(tracing::Level::TRACE) { + if n % 120 == 0 { + let path = format!("/tmp/eye3-cli-{n:05}.h264"); + if let Err(e) = std::fs::write(&path, &pkt.data) { + tracing::warn!("📸💾 dump failed: {e}"); + } else { + tracing::debug!("📸💾 wrote {}", path); + } + } + } + self.sink.push(pkt); } }