From 1c095a95d148e2dbbf8fa1a4843c852ddd48a69c Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 3 Jul 2025 08:19:59 -0500 Subject: [PATCH 1/7] camera core add --- client/Cargo.toml | 2 +- client/src/input/camera.rs | 82 ++++++++++++++++++++++++++++++---- client/src/input/inputs.rs | 7 ++- common/Cargo.toml | 2 +- common/proto/lesavka.proto | 4 +- scripts/daemon/lesavka-core.sh | 11 ++++- server/Cargo.toml | 2 +- server/src/main.rs | 11 +++++ server/src/video.rs | 45 +++++++++++++++++++ 9 files changed, 150 insertions(+), 16 deletions(-) diff --git a/client/Cargo.toml b/client/Cargo.toml index b63f8a6..5141e67 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.5.0" +version = "0.6.0" edition = "2024" [dependencies] diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs index 29814db..49476e8 100644 --- a/client/src/input/camera.rs +++ b/client/src/input/camera.rs @@ -1,21 +1,85 @@ // client/src/input/camera.rs +#![forbid(unsafe_code)] use anyhow::Result; +use gstreamer as gst; +use gstreamer_app as gst_app; +use gst::prelude::*; +use lesavka_common::lesavka::VideoPacket; -/// A stub camera aggregator or capture pub struct CameraCapture { - // no real fields yet + pipeline: gst::Pipeline, + sink: gst_app::AppSink, } impl CameraCapture { - pub fn new_stub() -> Self { - // in real code: open /dev/video0, set formats, etc - CameraCapture {} + pub fn new(device_fragment: Option<&str>) -> anyhow::Result { + gst::init().ok(); + + // Pick device + let dev = device_fragment + .and_then(Self::find_device) + .unwrap_or_else(|| "/dev/video0".into()); + + let desc = format!( + "v4l2src device={dev} io-mode=dmabuf-do-timestamp=true ! \ + videoconvert ! videoscale ! video/x-raw,width=1280,height=720 ! \ + v4l2h264enc key-int-max=30 \ + extra-controls=\"encode,frame_level_rate_control_enable=1,h264_profile=4\" ! \ + h264parse config-interval=-1 ! \ + appsink name=asink emit-signals=true max-buffers=60 drop=true" + ); + + // --- NEW: propagate the Result and down‑cast with `?` + let pipeline: gst::Pipeline = gst::parse::launch(&desc)? + .downcast::() + .expect("not a pipeline"); + + // --- NEW: the lookup already returns the concrete AppSink → just unwrap with `?` + let sink: gst_app::AppSink = pipeline + .by_name("asink") + .expect("appsink element not found") + .downcast::() + .expect("appsink down‑cast"); + + pipeline.set_state(gst::State::Playing)?; + + Ok(Self { pipeline, sink }) } - /// Called regularly to capture frames or do nothing - pub fn process_frames(&mut self) -> Result<()> { - // no-op - Ok(()) + pub fn pull(&self) -> Option { + let sample = self.sink.pull_sample().ok()?; + let buf = sample.buffer()?; + let map = buf.map_readable().ok()?; + let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; + Some(VideoPacket { id: 100, pts, data: map.as_slice().to_vec() }) + } + + /// Fuzzy‑match devices under `/dev/v4l/by-id` + fn find_device(substr: &str) -> Option { + let dir = std::fs::read_dir("/dev/v4l/by-id").ok()?; + for e in dir.flatten() { + let p = e.path(); + if p.file_name()?.to_string_lossy().contains(substr) { + if let Ok(target) = std::fs::read_link(&p) { + return Some(format!("/dev/{}", target.file_name()?.to_string_lossy())); + } + } + } + None + } + + /// Cheap stub used when the web‑cam is disabled + pub fn new_stub() -> Self { + let pipeline = gst::Pipeline::new(); + + // --- NEW: AppSink factory helper (AppSink::new() does not exist) + let sink: gst_app::AppSink = gst::ElementFactory::make("appsink") + .build() + .expect("make appsink") + .downcast::() + .expect("appsink down‑cast"); + + Self { pipeline, sink } } } diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index f471f43..861e596 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -92,8 +92,11 @@ impl InputAggregator { bail!("No suitable keyboard/mouse devices found or none grabbed."); } - // Stubs for camera / mic: - self.camera = Some(CameraCapture::new_stub()); + // Auto‑select webcam (may be toggled later through magic chord) + self.camera = match CameraCapture::new(std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref()) { + Ok(cam) => { info!("📸 webcam enabled"); Some(cam) } + Err(e) => { warn!("📸 webcam disabled: {e}"); None } + }; Ok(()) } diff --git a/common/Cargo.toml b/common/Cargo.toml index 4628718..24781aa 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.5.0" +version = "0.6.0" edition = "2024" build = "build.rs" diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index 4ee8de2..8e97acc 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -17,6 +17,8 @@ service Relay { rpc StreamMouse (stream MouseReport) returns (stream MouseReport); rpc CaptureVideo (MonitorRequest) returns (stream VideoPacket); rpc CaptureAudio (MonitorRequest) returns (stream AudioPacket); - rpc StreamMicrophone (stream AudioPacket) returns (stream Empty) {} + rpc StreamMicrophone (stream AudioPacket) returns (stream Empty); + rpc StreamCamera (stream VideoPacket) returns (stream Empty); + rpc ResetUsb (Empty) returns (ResetUsbReply); } diff --git a/scripts/daemon/lesavka-core.sh b/scripts/daemon/lesavka-core.sh index d0f07e1..e4c1ece 100644 --- a/scripts/daemon/lesavka-core.sh +++ b/scripts/daemon/lesavka-core.sh @@ -112,6 +112,14 @@ echo 2 >"$U/c_ssize" # Optional: allocate a few extra request buffers echo 32 >"$U/req_number" 2>/dev/null || true +# ----------------------- UVC function (usb‑video) ------------------ +mkdir -p "$G/functions/uvc.usb0" +# Mandatory control interface strings +mkdir -p "$G/functions/uvc.usb0/control/strings/0x409" +echo "Lesavka UVC" >"$G/functions/uvc.usb0/control/strings/0x409/label" +# Simple 720p MJPEG + 720p H.264 alt‑setting +printf '\x50\x00\x00\x00' >"$G/functions/uvc.usb0/control/header/h_video" + # ----------------------- configuration ----------------------------- mkdir -p "$G/configs/c.1/strings/0x409" echo 500 > "$G/configs/c.1/MaxPower" @@ -120,7 +128,8 @@ echo "Config 1: HID + UAC2" >"$G/configs/c.1/strings/0x409/configuration" ln -s $G/functions/hid.usb0 $G/configs/c.1/ ln -s $G/functions/hid.usb1 $G/configs/c.1/ -ln -s "$U" "$G/configs/c.1/" +ln -s $U $G/configs/c.1/ +ln -s $G/functions/uvc.usb0 $G/configs/c.1/ # mkdir -p $G/functions/hid.usb0/os_desc # mkdir -p $G/functions/hid.usb1/os_desc diff --git a/server/Cargo.toml b/server/Cargo.toml index eba7ff2..7eb3c4f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_server" -version = "0.5.0" +version = "0.6.0" edition = "2024" [dependencies] diff --git a/server/src/main.rs b/server/src/main.rs index 1104715..3b56188 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -133,6 +133,7 @@ impl Relay for Handler { type CaptureVideoStream = Pin> + Send>>; type CaptureAudioStream = Pin> + Send>>; type StreamMicrophoneStream = ReceiverStream>; + type StreamCameraStream = ReceiverStream>; async fn stream_keyboard( &self, @@ -209,6 +210,16 @@ impl Relay for Handler { Ok(Response::new(ReceiverStream::new(rx))) } + async fn stream_camera( + &self, + _req: Request>, + ) -> Result, Status> { + // Simply consume the inbound stream and discard; echo back a single Empty. + let (tx, rx) = tokio::sync::mpsc::channel(1); + tx.send(Ok(Empty {})).await.ok(); + Ok(Response::new(ReceiverStream::new(rx))) + } + async fn capture_video( &self, req: Request, diff --git a/server/src/video.rs b/server/src/video.rs index 1604376..78884a5 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -214,3 +214,48 @@ pub async fn eye_ball( } Ok(VideoStream { _pipeline: pipeline, inner: ReceiverStream::new(rx) }) } + +pub struct WebcamSink { + appsrc: gst_app::AppSrc, + _pipe: gst::Pipeline, +} + +impl WebcamSink { + pub fn new(uvc_dev: &str) -> anyhow::Result { + gst::init()?; + + let pipeline = gst::Pipeline::new(); + let src = gst::ElementFactory::make("appsrc") + .build()? + .downcast::() + .expect("appsrc"); + src.set_is_live(true); + src.set_format(gst::Format::Time); + + let h264parse = gst::ElementFactory::make("h264parse").build()?; + let decoder = gst::ElementFactory::make("v4l2h264dec").build()?; + let convert = gst::ElementFactory::make("videoconvert").build()?; + let sink = gst::ElementFactory::make("v4l2sink") + .property("device", &uvc_dev) + .property("sync", &false) + .build()?; + + // Up‑cast to &gst::Element for the collection macros + pipeline.add_many(&[ + src.upcast_ref(), &h264parse, &decoder, &convert, &sink + ])?; + gst::Element::link_many(&[ + src.upcast_ref(), &h264parse, &decoder, &convert, &sink + ])?; + pipeline.set_state(gst::State::Playing)?; + + Ok(Self { appsrc: src, _pipe: pipeline }) + } + + pub fn push(&self, pkt: VideoPacket) { + let mut buf = gst::Buffer::from_slice(pkt.data); + buf.get_mut().unwrap() + .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); + let _ = self.appsrc.push_buffer(buf); + } +} From 6c3942cd2c7bb8437bc02d2c7b44b9f986c9596c Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 3 Jul 2025 09:24:57 -0500 Subject: [PATCH 2/7] increased cam logging --- client/src/input/camera.rs | 18 +++++++++--------- client/src/input/inputs.rs | 25 ++++++++++++++++++++++--- scripts/daemon/lesavka-core.sh | 3 +-- scripts/install/server.sh | 2 +- server/src/main.rs | 23 ++++++++++++++++++++--- server/src/video.rs | 34 ++++++++++++++++++++++++++++++++++ 6 files changed, 87 insertions(+), 18 deletions(-) diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs index 49476e8..e751e63 100644 --- a/client/src/input/camera.rs +++ b/client/src/input/camera.rs @@ -2,6 +2,7 @@ #![forbid(unsafe_code)] use anyhow::Result; +use anyhow::Context; use gstreamer as gst; use gstreamer_app as gst_app; use gst::prelude::*; @@ -22,7 +23,7 @@ impl CameraCapture { .unwrap_or_else(|| "/dev/video0".into()); let desc = format!( - "v4l2src device={dev} io-mode=dmabuf-do-timestamp=true ! \ + "v4l2src device={dev} io-mode=dmabuf do-timestamp=true ! \ videoconvert ! videoscale ! video/x-raw,width=1280,height=720 ! \ v4l2h264enc key-int-max=30 \ extra-controls=\"encode,frame_level_rate_control_enable=1,h264_profile=4\" ! \ @@ -30,12 +31,13 @@ impl CameraCapture { appsink name=asink emit-signals=true max-buffers=60 drop=true" ); - // --- NEW: propagate the Result and down‑cast with `?` - let pipeline: gst::Pipeline = gst::parse::launch(&desc)? + tracing::debug!("📸 pipeline‑desc:\n{desc}"); + let pipeline: gst::Pipeline = gst::parse::launch(&desc) + .context("gst parse_launch(cam)")? .downcast::() .expect("not a pipeline"); - // --- NEW: the lookup already returns the concrete AppSink → just unwrap with `?` + tracing::debug!("📸 pipeline built OK – setting PLAYING…"); let sink: gst_app::AppSink = pipeline .by_name("asink") .expect("appsink element not found") @@ -43,6 +45,7 @@ impl CameraCapture { .expect("appsink down‑cast"); pipeline.set_state(gst::State::Playing)?; + tracing::info!("📸 webcam pipeline ▶️ device={dev}"); Ok(Self { pipeline, sink }) } @@ -72,14 +75,11 @@ impl CameraCapture { /// Cheap stub used when the web‑cam is disabled pub fn new_stub() -> Self { let pipeline = gst::Pipeline::new(); - - // --- NEW: AppSink factory helper (AppSink::new() does not exist) let sink: gst_app::AppSink = gst::ElementFactory::make("appsink") .build() - .expect("make appsink") + .expect("appsink") .downcast::() - .expect("appsink down‑cast"); - + .unwrap(); Self { pipeline, sink } } } diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index 861e596..39c0efb 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -93,9 +93,22 @@ impl InputAggregator { } // Auto‑select webcam (may be toggled later through magic chord) - self.camera = match CameraCapture::new(std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref()) { - Ok(cam) => { info!("📸 webcam enabled"); Some(cam) } - Err(e) => { warn!("📸 webcam disabled: {e}"); None } + let cam_src_env = std::env::var("LESAVKA_CAM_SOURCE").ok(); + self.camera = match cam_src_env.as_deref() + // .or(device_fragment) // <- if you later wire CLI arg + .map(|s| CameraCapture::new(Some(s))) { + Some(Ok(c)) => { + info!("📸 webcam enabled (device fragment = {:?})", cam_src_env); + Some(c) + } + Some(Err(e)) => { + warn!("📸 webcam disabled: {e:#}"); + Some(CameraCapture::new_stub()) // keep stub so call‑sites compile + } + None => { + info!("📸 webcam disabled (no CAM_SOURCE set)"); + None // or Stub – your choice + } }; Ok(()) @@ -135,6 +148,12 @@ impl InputAggregator { mouse.process_events(); } + if let Some(cam) = &self.camera { + debug!("📸 CameraCapture present – first pull() will block until a frame arrives"); + } else { + debug!("📸 No camera pipeline active"); + } + self.magic_active = magic_now; tick.tick().await; } diff --git a/scripts/daemon/lesavka-core.sh b/scripts/daemon/lesavka-core.sh index e4c1ece..e687396 100644 --- a/scripts/daemon/lesavka-core.sh +++ b/scripts/daemon/lesavka-core.sh @@ -114,7 +114,6 @@ echo 32 >"$U/req_number" 2>/dev/null || true # ----------------------- UVC function (usb‑video) ------------------ mkdir -p "$G/functions/uvc.usb0" -# Mandatory control interface strings mkdir -p "$G/functions/uvc.usb0/control/strings/0x409" echo "Lesavka UVC" >"$G/functions/uvc.usb0/control/strings/0x409/label" # Simple 720p MJPEG + 720p H.264 alt‑setting @@ -152,6 +151,6 @@ ln -s $G/functions/uvc.usb0 $G/configs/c.1/ # 4. Bind gadget #────────────────────────────────────────────────── echo "$UDC" >"$G/UDC" -log "🎉 gadget bound on $UDC (hidg0, hidg1, UAC2 L+R)" +log "🎉 gadget bound on $UDC (hidg0, hidg1, UAC2 L+R, UVC)" exit 0 diff --git a/scripts/install/server.sh b/scripts/install/server.sh index 1a9f19a..9823865 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -122,7 +122,7 @@ After=network.target lesavka-core.service [Service] ExecStart=/usr/local/bin/lesavka-server Restart=always -Environment=RUST_LOG=lesavka_server=info,lesavka_server::audio=info,lesavka_server::video=info,lesavka_server::gadget=info +Environment=RUST_LOG=lesavka_server=info,lesavka_server::audio=info,lesavka_server::video=trace,lesavka_server::gadget=info Environment=RUST_BACKTRACE=1 Environment=GST_DEBUG="*:2,alsasink:6,alsasrc:6" Restart=always diff --git a/server/src/main.rs b/server/src/main.rs index 3b56188..4c9cda1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -212,11 +212,28 @@ impl Relay for Handler { async fn stream_camera( &self, - _req: Request>, + req: Request>, ) -> Result, Status> { - // Simply consume the inbound stream and discard; echo back a single Empty. + // map gRPC camera id → UVC device + let uvc = std::env::var("LESAVKA_UVC_DEV") + .unwrap_or_else(|_| "/dev/video4".into()); + + // build once + let relay = video::CameraRelay::new(0, &uvc) + .map_err(|e| Status::internal(format!("{e:#}")))?; + + // dummy outbound (same pattern as other streams) let (tx, rx) = tokio::sync::mpsc::channel(1); - tx.send(Ok(Empty {})).await.ok(); + + tokio::spawn(async move { + let mut s = req.into_inner(); + while let Some(pkt) = s.next().await.transpose()? { + relay.feed(pkt); // ← all logging inside video.rs + } + tx.send(Ok(Empty {})).await.ok(); + Ok::<(), Status>(()) + }); + Ok(Response::new(ReceiverStream::new(rx))) } diff --git a/server/src/video.rs b/server/src/video.rs index 78884a5..dd244b4 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -259,3 +259,37 @@ impl WebcamSink { let _ = self.appsrc.push_buffer(buf); } } + +/*─────────────────────────────────*/ +/* gRPC → WebcamSink relay */ +/*─────────────────────────────────*/ + +pub struct CameraRelay { + sink: WebcamSink, // the v4l2sink pipeline (or stub) + id: u32, // gRPC “id” (for future multi‑cam) + frames: std::sync::atomic::AtomicU64, +} + +impl CameraRelay { + pub fn new(id: u32, uvc_dev: &str) -> anyhow::Result { + Ok(Self { + sink: WebcamSink::new(uvc_dev)?, + id, + frames: std::sync::atomic::AtomicU64::new(0), + }) + } + + /// Push one VideoPacket coming from the client + pub fn feed(&self, pkt: VideoPacket) { + let n = self.frames.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if n < 10 || n % 300 == 0 { + tracing::debug!(target:"lesavka_server::video", + cam_id = self.id, + frame = n, + bytes = pkt.data.len(), + pts = pkt.pts, + "📸 srv webcam frame"); + } + self.sink.push(pkt); + } +} From 2fdfa7e59754797d9ce061d85ff5c8cce09065eb Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 3 Jul 2025 15:22:30 -0500 Subject: [PATCH 3/7] camera core add --- client/src/app.rs | 53 +++++++++++++++++++++++++++++++- client/src/input/camera.rs | 62 ++++++++++++++++++++++++++++++++------ client/src/input/inputs.rs | 10 ++++-- server/src/video.rs | 7 ++++- 4 files changed, 117 insertions(+), 15 deletions(-) diff --git a/client/src/app.rs b/client/src/app.rs index aa7115e..53b5664 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -21,6 +21,7 @@ use lesavka_common::lesavka::{ use crate::{input::inputs::InputAggregator, input::microphone::MicrophoneCapture, + input::camera::CameraCapture, output::video::MonitorWindow, output::audio::AudioOut}; @@ -46,6 +47,14 @@ impl LesavkaClientApp { let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone()); agg.init()?; // grab devices immediately + let cam = if std::env::var("LESAVKA_CAM_DISABLE").is_ok() { + None + } else { + Some(Arc::new(CameraCapture::new( + std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() + )?)) + }; + Ok(Self { aggregator: Some(agg), server_addr, dev_mode, kbd_tx, mou_tx }) } @@ -134,6 +143,14 @@ impl LesavkaClientApp { let ep_mic = vid_ep.clone(); tokio::spawn(Self::mic_loop(ep_mic, mic)); + /*────────── webcam gRPC pusher ───────────────*/ + if !std::env::var("LESAVKA_CAM_DISABLE").is_ok() { + let cam = Arc::new(CameraCapture::new( + std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() + )?); + tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); + } + /*────────── central reactor ───────────────────*/ tokio::select! { _ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); }, @@ -295,5 +312,39 @@ impl LesavkaClientApp { let _ = stop_tx.send(()); tokio::time::sleep(Duration::from_secs(1)).await; } - } + } + + /*──────────────── cam stream ───────────────────*/ + async fn cam_loop(ep: Channel, cam: Arc) { + loop { + let mut cli = RelayClient::new(ep.clone()); + + // pull frames in a real thread so we don’t block async scheduler + let (tx_pkt, rx_pkt) = tokio::sync::mpsc::channel::(256); + std::thread::spawn({ + let cam = cam.clone(); + move || { + while let Some(pkt) = cam.pull() { + // TRACE every 120 frames only + static CNT: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(0); + let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if n < 10 || n % 120 == 0 { + tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len()); + } + let _ = tx_pkt.blocking_send(pkt); + } + } + }); + + let outbound = tokio_stream::wrappers::ReceiverStream::new(rx_pkt); + match cli.stream_camera(Request::new(outbound)).await { + Ok(mut resp) => { + while resp.get_mut().message().await.transpose().is_some() {} + } + Err(e) => tracing::warn!("❌📸 connect failed: {e}"), + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + } } diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs index e751e63..858d815 100644 --- a/client/src/input/camera.rs +++ b/client/src/input/camera.rs @@ -22,16 +22,30 @@ impl CameraCapture { .and_then(Self::find_device) .unwrap_or_else(|| "/dev/video0".into()); - let desc = format!( - "v4l2src device={dev} io-mode=dmabuf do-timestamp=true ! \ - videoconvert ! videoscale ! video/x-raw,width=1280,height=720 ! \ - v4l2h264enc key-int-max=30 \ - extra-controls=\"encode,frame_level_rate_control_enable=1,h264_profile=4\" ! \ - h264parse config-interval=-1 ! \ - appsink name=asink emit-signals=true max-buffers=60 drop=true" - ); + // let (enc, raw_caps) = Self::pick_encoder(); + // (NVIDIA → VA-API → software x264). + let (enc, kf_prop, kf_val) = Self::choose_encoder(); + tracing::info!("📸 using encoder element: {enc}"); + + // let desc = format!( + // "v4l2src device={dev} do-timestamp=true ! {raw_caps},width=1280,height=720 ! \ + // videoconvert ! {enc} key-int-max=30 ! \ + // h264parse config-interval=-1 ! \ + // appsink name=asink emit-signals=true max-buffers=60 drop=true" + // ); + // tracing::debug!(%desc, "📸 pipeline-desc"); + // Build a pipeline that works for any of the three encoders. + // * nvh264enc needs NVMM memory caps; + // * vaapih264enc wants system-memory caps; + // * x264enc needs the usual raw caps. + let desc = format!( + "v4l2src device={dev} do-timestamp=true ! \ + videoconvert ! {enc} {kf_prop}={kf_val} ! \ + h264parse config-interval=-1 ! \ + appsink name=asink emit-signals=true max-buffers=60 drop=true" + ); + tracing::info!(%enc, ?desc, "📸 using encoder element"); - tracing::debug!("📸 pipeline‑desc:\n{desc}"); let pipeline: gst::Pipeline = gst::parse::launch(&desc) .context("gst parse_launch(cam)")? .downcast::() @@ -55,7 +69,7 @@ impl CameraCapture { let buf = sample.buffer()?; let map = buf.map_readable().ok()?; let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; - Some(VideoPacket { id: 100, pts, data: map.as_slice().to_vec() }) + Some(VideoPacket { id: 0, pts, data: map.as_slice().to_vec() }) } /// Fuzzy‑match devices under `/dev/v4l/by-id` @@ -82,4 +96,32 @@ impl CameraCapture { .unwrap(); Self { pipeline, sink } } + + fn pick_encoder() -> (&'static str, &'static str) { + let encoders = &[ + ("nvh264enc", "video/x-raw(memory:NVMM),format=NV12"), + ("vaapih264enc","video/x-raw,format=NV12"), + ("v4l2h264enc", "video/x-raw"), // RPi, Jetson, etc. + ("x264enc", "video/x-raw"), // software + ]; + for (name, caps) in encoders { + if gst::ElementFactory::find(name).is_some() { + return (name, caps); + } + } + // last resort – software + ("x264enc", "video/x-raw") + } + + /// Return the encoder element *and* the property/key-frame pair we must + /// set to get an IDR every 30 frames. + fn choose_encoder() -> (&'static str, &'static str, &'static str) { + if gst::ElementFactory::find("nvh264enc").is_some() { + ("nvh264enc", "gop-size", "30") // NVIDIA NVENC uses `gop-size` :contentReference[oaicite:0]{index=0} + } else if gst::ElementFactory::find("vaapih264enc").is_some() { + ("vaapih264enc", "keyframe-period", "30")// VA-API uses `keyframe-period` :contentReference[oaicite:1]{index=1} + } else { + ("x264enc", "key-int-max", "30") // libx264 fallback :contentReference[oaicite:2]{index=2} + } + } } diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index 39c0efb..b2f1813 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -3,7 +3,7 @@ use anyhow::{bail, Context, Result}; use evdev::{Device, EventType, KeyCode, RelativeAxisCode}; use tokio::{sync::broadcast::Sender, time::{interval, Duration}}; -use tracing::{debug, info, warn}; +use tracing::{debug, info, warn, trace}; use lesavka_common::lesavka::{KeyboardReport, MouseReport}; @@ -102,7 +102,11 @@ impl InputAggregator { Some(c) } Some(Err(e)) => { - warn!("📸 webcam disabled: {e:#}"); + warn!( + "📸 webcam disabled – {:?}. \ + Hint: install gst‑plugins‑bad or provide LESAVKA_CAM_DISABLE=1", + e.root_cause() + ); Some(CameraCapture::new_stub()) // keep stub so call‑sites compile } None => { @@ -149,7 +153,7 @@ impl InputAggregator { } if let Some(cam) = &self.camera { - debug!("📸 CameraCapture present – first pull() will block until a frame arrives"); + trace!("📸 CameraCapture present – first pull() will block until a frame arrives"); } else { debug!("📸 No camera pipeline active"); } diff --git a/server/src/video.rs b/server/src/video.rs index dd244b4..62d85e9 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -282,13 +282,18 @@ impl CameraRelay { /// Push one VideoPacket coming from the client pub fn feed(&self, pkt: VideoPacket) { let n = self.frames.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if n < 10 || n % 300 == 0 { + if n < 10 || n % 60 == 0 { tracing::debug!(target:"lesavka_server::video", cam_id = self.id, frame = n, bytes = pkt.data.len(), pts = pkt.pts, "📸 srv webcam frame"); + } else if n % 10 == 0 { + tracing::trace!(target:"lesavka_server::video", + cam_id = self.id, + bytes = pkt.data.len(), + "📥 srv pkt"); } self.sink.push(pkt); } From 9fc17478b242b5504113ef0de7bc24ee70725a32 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 3 Jul 2025 16:08:30 -0500 Subject: [PATCH 4/7] camera core add --- client/src/app.rs | 32 ++++++++++++-------------------- server/src/video.rs | 6 +++--- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/client/src/app.rs b/client/src/app.rs index 53b5664..b801249 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -138,18 +138,14 @@ impl LesavkaClientApp { tokio::spawn(Self::audio_loop(ep_audio, audio_out)); - /*────────── microphone gRPC pusher ───────────*/ - let mic = Arc::new(MicrophoneCapture::new()?); - let ep_mic = vid_ep.clone(); - tokio::spawn(Self::mic_loop(ep_mic, mic)); + /*────────── camera & mic tasks ───────────────*/ + let cam = Arc::new(CameraCapture::new( + std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() + )?); + tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); - /*────────── webcam gRPC pusher ───────────────*/ - if !std::env::var("LESAVKA_CAM_DISABLE").is_ok() { - let cam = Arc::new(CameraCapture::new( - std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() - )?); - tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); - } + let mic = Arc::new(MicrophoneCapture::new()?); + tokio::spawn(Self::mic_loop(vid_ep.clone(), mic)); /*────────── central reactor ───────────────────*/ tokio::select! { @@ -318,9 +314,8 @@ impl LesavkaClientApp { async fn cam_loop(ep: Channel, cam: Arc) { loop { let mut cli = RelayClient::new(ep.clone()); + let (tx, rx) = tokio::sync::mpsc::channel::(256); - // pull frames in a real thread so we don’t block async scheduler - let (tx_pkt, rx_pkt) = tokio::sync::mpsc::channel::(256); std::thread::spawn({ let cam = cam.clone(); move || { @@ -332,17 +327,14 @@ impl LesavkaClientApp { if n < 10 || n % 120 == 0 { tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len()); } - let _ = tx_pkt.blocking_send(pkt); + let _ = tx.blocking_send(pkt); } } }); - let outbound = tokio_stream::wrappers::ReceiverStream::new(rx_pkt); - match cli.stream_camera(Request::new(outbound)).await { - Ok(mut resp) => { - while resp.get_mut().message().await.transpose().is_some() {} - } - Err(e) => tracing::warn!("❌📸 connect failed: {e}"), + let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); + if let Err(e) = cli.stream_camera(Request::new(outbound)).await { + tracing::warn!("❌📸 connect failed: {e}"); } tokio::time::sleep(Duration::from_secs(1)).await; } diff --git a/server/src/video.rs b/server/src/video.rs index 62d85e9..3e018dd 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -181,7 +181,7 @@ pub async fn eye_ball( trace!(target:"lesavka_server::video", eye = %eye, size = size, - "📤 sent"); + "🎥📤 sent"); } Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { static DROP_CNT: std::sync::atomic::AtomicU64 = @@ -191,7 +191,7 @@ pub async fn eye_ball( debug!(target:"lesavka_server::video", eye = %eye, dropped = c, - "⏳ channel full - dropping frames"); + "🎥⏳ channel full - dropping frames"); } } Err(e) => error!("mpsc send err: {e}"), @@ -293,7 +293,7 @@ impl CameraRelay { tracing::trace!(target:"lesavka_server::video", cam_id = self.id, bytes = pkt.data.len(), - "📥 srv pkt"); + "📸📥 srv pkt"); } self.sink.push(pkt); } From 84790ea470bae935fb9032a34be71e9168ad3bd8 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 4 Jul 2025 01:56:59 -0500 Subject: [PATCH 5/7] more logging --- client/src/app.rs | 74 +++++++++++++++++++++------------- client/src/handshake.rs | 29 +++++++++++++ client/src/input/camera.rs | 35 +++++++++++----- client/src/input/inputs.rs | 36 ++--------------- client/src/input/microphone.rs | 20 +++++++-- client/src/lib.rs | 1 + common/proto/lesavka.proto | 7 ++++ scripts/install/server.sh | 2 +- server/src/video.rs | 12 ++++++ 9 files changed, 140 insertions(+), 76 deletions(-) create mode 100644 client/src/handshake.rs diff --git a/client/src/app.rs b/client/src/app.rs index b801249..b35c986 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -19,7 +19,8 @@ use lesavka_common::lesavka::{ MonitorRequest, MouseReport, VideoPacket, AudioPacket }; -use crate::{input::inputs::InputAggregator, +use crate::{handshake, + input::inputs::InputAggregator, input::microphone::MicrophoneCapture, input::camera::CameraCapture, output::video::MonitorWindow, @@ -47,18 +48,14 @@ impl LesavkaClientApp { let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone()); agg.init()?; // grab devices immediately - let cam = if std::env::var("LESAVKA_CAM_DISABLE").is_ok() { - None - } else { - Some(Arc::new(CameraCapture::new( - std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() - )?)) - }; - Ok(Self { aggregator: Some(agg), server_addr, dev_mode, kbd_tx, mou_tx }) } pub async fn run(&mut self) -> Result<()> { + /*────────── handshake / feature-negotiation ───────────────*/ + let caps = handshake::negotiate(&self.server_addr).await; + tracing::info!("🤝 server capabilities = {:?}", caps); + /*────────── persistent gRPC channels ──────────*/ let hid_ep = Channel::from_shared(self.server_addr.clone())? .tcp_nodelay(true) @@ -137,13 +134,17 @@ impl LesavkaClientApp { let ep_audio = vid_ep.clone(); tokio::spawn(Self::audio_loop(ep_audio, audio_out)); - - /*────────── camera & mic tasks ───────────────*/ - let cam = Arc::new(CameraCapture::new( - std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() - )?); - tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); - + /*────────── camera & mic tasks (gated by caps) ───────────*/ + if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() { + let cam = Arc::new(CameraCapture::new( + std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() + )?); + tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); + if caps.microphone { + let mic = Arc::new(MicrophoneCapture::new()?); + tokio::spawn(Self::mic_loop(vid_ep.clone(), mic)); + } + } let mic = Arc::new(MicrophoneCapture::new()?); tokio::spawn(Self::mic_loop(vid_ep.clone(), mic)); @@ -269,6 +270,7 @@ impl LesavkaClientApp { /*──────────────── mic stream ─────────────────*/ async fn mic_loop(ep: Channel, mic: Arc) { + let mut delay = Duration::from_secs(1); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { let mut cli = RelayClient::new(ep.clone()); @@ -290,12 +292,11 @@ impl LesavkaClientApp { // 3. turn `rx` into an async stream for gRPC let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); - - match cli.stream_microphone(Request::new(outbound)).await { - Ok(mut resp) => { - while resp.get_mut().message().await.transpose().is_some() {} - } - Err(e) => { + match cli.stream_microphone(Request::new(outbound)).await { + Ok(mut resp) => { + while resp.get_mut().message().await.transpose().is_some() {} + } + Err(e) => { // first failure → warn, subsequent ones → debug if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 { warn!("❌🎤 connect failed: {e}"); @@ -303,15 +304,17 @@ impl LesavkaClientApp { } else { debug!("❌🎤 reconnect failed: {e}"); } + delay = next_delay(delay); } } let _ = stop_tx.send(()); - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(delay).await; } } /*──────────────── cam stream ───────────────────*/ async fn cam_loop(ep: Channel, cam: Arc) { + let mut delay = Duration::from_secs(1); loop { let mut cli = RelayClient::new(ep.clone()); let (tx, rx) = tokio::sync::mpsc::channel::(256); @@ -327,16 +330,33 @@ impl LesavkaClientApp { if n < 10 || n % 120 == 0 { tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len()); } - let _ = tx.blocking_send(pkt); + tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", + pkt.pts, pkt.data.len()); + let _ = tx.try_send(pkt); } } }); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); - if let Err(e) = cli.stream_camera(Request::new(outbound)).await { - tracing::warn!("❌📸 connect failed: {e}"); + match cli.stream_camera(Request::new(outbound)).await { + Ok(_) => delay = Duration::from_secs(1), // got a stream → reset + Err(e) if e.code() == tonic::Code::Unimplemented => { + tracing::warn!("📸 server does not support StreamCamera – giving up"); + return; // stop the task completely (#3) + } + Err(e) => { + tracing::warn!("❌📸 connect failed: {e:?}"); + delay = next_delay(delay); // back-off (#2) + } } - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(delay).await; } } } + +fn next_delay(cur: std::time::Duration) -> std::time::Duration { + match cur.as_secs() { + 1..=15 => cur * 2, + _ => std::time::Duration::from_secs(30), + } +} diff --git a/client/src/handshake.rs b/client/src/handshake.rs new file mode 100644 index 0000000..190f4ac --- /dev/null +++ b/client/src/handshake.rs @@ -0,0 +1,29 @@ +// client/src/handshake.rs +#![forbid(unsafe_code)] + +use lesavka_common::lesavka::{self as pb, handshake_client::HandshakeClient}; +use tonic::Code; + +#[derive(Default, Clone, Copy, Debug)] +pub struct PeerCaps { + pub camera: bool, + pub microphone: bool, +} + +pub async fn negotiate(uri: &str) -> PeerCaps { + let mut cli = HandshakeClient::connect(uri.to_owned()) + .await + .expect("\"dial handshake\""); + + match cli.get_capabilities(pb::Empty {}).await { + Ok(rsp) => PeerCaps { + camera: rsp.get_ref().camera, + microphone: rsp.get_ref().microphone, + }, + Err(e) if e.code() == Code::Unimplemented => { + // ↺ old server – pretend it supports nothing special. + PeerCaps::default() + } + Err(e) => panic!("\"handshake failed: {e}\""), + } +} diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs index 858d815..1e2f8b2 100644 --- a/client/src/input/camera.rs +++ b/client/src/input/camera.rs @@ -26,6 +26,17 @@ impl CameraCapture { // (NVIDIA → VA-API → software x264). let (enc, kf_prop, kf_val) = Self::choose_encoder(); tracing::info!("📸 using encoder element: {enc}"); + let (src_caps, preenc) = match enc { + "nvh264enc" => ( + "video/x-raw(memory:NVMM),format=NV12,width=1280,height=720", + "nvvidconv !" + ), + "vaapih264enc" => ( + "video/x-raw,format=NV12,width=1280,height=720", + "videoconvert !" + ), + _ => ("video/x-raw,width=1280,height=720", "videoconvert !"), + }; // let desc = format!( // "v4l2src device={dev} do-timestamp=true ! {raw_caps},width=1280,height=720 ! \ @@ -39,9 +50,10 @@ impl CameraCapture { // * vaapih264enc wants system-memory caps; // * x264enc needs the usual raw caps. let desc = format!( - "v4l2src device={dev} do-timestamp=true ! \ - videoconvert ! {enc} {kf_prop}={kf_val} ! \ - h264parse config-interval=-1 ! \ + "v4l2src device={dev} do-timestamp=true ! {src_caps} ! \ + {preenc} {enc} {kf_prop}={kf_val} ! \ + h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \ + queue max-size-buffers=30 leaky=downstream ! \ appsink name=asink emit-signals=true max-buffers=60 drop=true" ); tracing::info!(%enc, ?desc, "📸 using encoder element"); @@ -97,6 +109,7 @@ impl CameraCapture { Self { pipeline, sink } } + #[allow(dead_code)] // helper kept for future heuristics fn pick_encoder() -> (&'static str, &'static str) { let encoders = &[ ("nvh264enc", "video/x-raw(memory:NVMM),format=NV12"), @@ -113,15 +126,15 @@ impl CameraCapture { ("x264enc", "video/x-raw") } - /// Return the encoder element *and* the property/key-frame pair we must - /// set to get an IDR every 30 frames. fn choose_encoder() -> (&'static str, &'static str, &'static str) { - if gst::ElementFactory::find("nvh264enc").is_some() { - ("nvh264enc", "gop-size", "30") // NVIDIA NVENC uses `gop-size` :contentReference[oaicite:0]{index=0} - } else if gst::ElementFactory::find("vaapih264enc").is_some() { - ("vaapih264enc", "keyframe-period", "30")// VA-API uses `keyframe-period` :contentReference[oaicite:1]{index=1} - } else { - ("x264enc", "key-int-max", "30") // libx264 fallback :contentReference[oaicite:2]{index=2} + match () { + _ if gst::ElementFactory::find("nvh264enc").is_some() => + ("nvh264enc", "gop-size", "30"), + _ if gst::ElementFactory::find("vaapih264enc").is_some() => + ("vaapih264enc","keyframe-period","30"), + _ if gst::ElementFactory::find("v4l2h264enc").is_some() => + ("v4l2h264enc","idrcount", "30"), + _ => ("x264enc", "key-int-max", "30"), } } } diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index b2f1813..1ae54c2 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -7,7 +7,7 @@ use tracing::{debug, info, warn, trace}; use lesavka_common::lesavka::{KeyboardReport, MouseReport}; -use super::{keyboard::KeyboardAggregator, mouse::MouseAggregator, camera::CameraCapture}; +use super::{keyboard::KeyboardAggregator, mouse::MouseAggregator}; use crate::layout::{Layout, apply as apply_layout}; pub struct InputAggregator { @@ -18,7 +18,6 @@ pub struct InputAggregator { magic_active: bool, keyboards: Vec, mice: Vec, - camera: Option, } impl InputAggregator { @@ -26,8 +25,8 @@ impl InputAggregator { kbd_tx: Sender, mou_tx: Sender) -> Self { Self { kbd_tx, mou_tx, dev_mode, released: false, magic_active: false, - keyboards: Vec::new(), mice: Vec::new(), - camera: None } + keyboards: Vec::new(), mice: Vec::new() + } } /// Called once at startup: enumerates input devices, @@ -92,29 +91,6 @@ impl InputAggregator { bail!("No suitable keyboard/mouse devices found or none grabbed."); } - // Auto‑select webcam (may be toggled later through magic chord) - let cam_src_env = std::env::var("LESAVKA_CAM_SOURCE").ok(); - self.camera = match cam_src_env.as_deref() - // .or(device_fragment) // <- if you later wire CLI arg - .map(|s| CameraCapture::new(Some(s))) { - Some(Ok(c)) => { - info!("📸 webcam enabled (device fragment = {:?})", cam_src_env); - Some(c) - } - Some(Err(e)) => { - warn!( - "📸 webcam disabled – {:?}. \ - Hint: install gst‑plugins‑bad or provide LESAVKA_CAM_DISABLE=1", - e.root_cause() - ); - Some(CameraCapture::new_stub()) // keep stub so call‑sites compile - } - None => { - info!("📸 webcam disabled (no CAM_SOURCE set)"); - None // or Stub – your choice - } - }; - Ok(()) } @@ -152,12 +128,6 @@ impl InputAggregator { mouse.process_events(); } - if let Some(cam) = &self.camera { - trace!("📸 CameraCapture present – first pull() will block until a frame arrives"); - } else { - debug!("📸 No camera pipeline active"); - } - self.magic_active = magic_now; tick.tick().await; } diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index 936dcd5..27d2441 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -30,12 +30,24 @@ impl MicrophoneCapture { _ => String::new(), }; debug!("🎤 device: {device_arg}"); + let aac = ["avenc_aac", "fdkaacenc", "faac", "opusenc"] + .into_iter() + .find(|e| gst::ElementFactory::find(e).is_some()) + .unwrap_or("opusenc"); + let parser = if aac.contains("opus") { + // opusenc already outputs raw Opus frames – just state the caps + "capsfilter caps=audio/x-opus,rate=48000,channels=2" + } else { + // AAC → ADTS frames + "aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2" + }; let desc = format!( "pulsesrc {device_arg} do-timestamp=true ! \ - audio/x-raw,format=S16LE,channels=2,rate=48000 ! \ - audioconvert ! audioresample ! avenc_aac bitrate=128000 ! \ - aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2 ! \ - appsink name=asink emit-signals=true max-buffers=50 drop=true" + audio/x-raw,format=S16LE,channels=2,rate=48000 ! \ + audioconvert ! audioresample ! {aac} bitrate=128000 ! \ + {parser} ! \ + queue max-size-buffers=100 leaky=downstream ! \ + appsink name=asink emit-signals=true max-buffers=50 drop=true" ); let pipeline: gst::Pipeline = gst::parse::launch(&desc)? diff --git a/client/src/lib.rs b/client/src/lib.rs index 0db2943..ba798cc 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -6,5 +6,6 @@ pub mod app; pub mod input; pub mod output; pub mod layout; +pub mod handshake; pub use app::LesavkaClientApp; diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index 8e97acc..7d37566 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -1,3 +1,4 @@ +// ───────────────────────────────────────── proto/lesavka.proto syntax = "proto3"; package lesavka; @@ -10,6 +11,8 @@ message AudioPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; } message ResetUsbReply { bool ok = 1; } // true = success +message HandshakeSet { bool camera = 1; bool microphone = 2; } + message Empty {} service Relay { @@ -22,3 +25,7 @@ service Relay { rpc ResetUsb (Empty) returns (ResetUsbReply); } + +service Handshake { + rpc GetCapabilities (Empty) returns (HandshakeSet); +} diff --git a/scripts/install/server.sh b/scripts/install/server.sh index 9823865..8ca910c 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -122,7 +122,7 @@ After=network.target lesavka-core.service [Service] ExecStart=/usr/local/bin/lesavka-server Restart=always -Environment=RUST_LOG=lesavka_server=info,lesavka_server::audio=info,lesavka_server::video=trace,lesavka_server::gadget=info +Environment=RUST_LOG=lesavka_server=info,lesavka_server::audio=trace,lesavka_server::video=trace,lesavka_server::gadget=info Environment=RUST_BACKTRACE=1 Environment=GST_DEBUG="*:2,alsasink:6,alsasrc:6" Restart=always diff --git a/server/src/video.rs b/server/src/video.rs index 3e018dd..7957e9c 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -295,6 +295,18 @@ impl CameraRelay { bytes = pkt.data.len(), "📸📥 srv pkt"); } + + if cfg!(debug_assertions) || tracing::enabled!(tracing::Level::TRACE) { + if n % 120 == 0 { + let path = format!("/tmp/eye3-cli-{n:05}.h264"); + if let Err(e) = std::fs::write(&path, &pkt.data) { + tracing::warn!("📸💾 dump failed: {e}"); + } else { + tracing::debug!("📸💾 wrote {}", path); + } + } + } + self.sink.push(pkt); } } From 49022b37149b35b20bfd8591e3fd6519fd5bf84f Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 4 Jul 2025 03:41:39 -0500 Subject: [PATCH 6/7] added handshakes --- client/src/app.rs | 12 +++++------- client/src/input/camera.rs | 2 +- server/src/handshake.rs | 32 ++++++++++++++++++++++++++++++++ server/src/lib.rs | 1 + server/src/main.rs | 3 ++- 5 files changed, 41 insertions(+), 9 deletions(-) create mode 100644 server/src/handshake.rs diff --git a/client/src/app.rs b/client/src/app.rs index b35c986..39cfca3 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -140,13 +140,11 @@ impl LesavkaClientApp { std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() )?); tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); - if caps.microphone { - let mic = Arc::new(MicrophoneCapture::new()?); - tokio::spawn(Self::mic_loop(vid_ep.clone(), mic)); - } } - let mic = Arc::new(MicrophoneCapture::new()?); - tokio::spawn(Self::mic_loop(vid_ep.clone(), mic)); + if caps.microphone { + let mic = Arc::new(MicrophoneCapture::new()?); + tokio::spawn(Self::voice_loop(vid_ep.clone(), mic)); // renamed + } /*────────── central reactor ───────────────────*/ tokio::select! { @@ -269,7 +267,7 @@ impl LesavkaClientApp { } /*──────────────── mic stream ─────────────────*/ - async fn mic_loop(ep: Channel, mic: Arc) { + async fn voice_loop(ep: Channel, mic: Arc) { let mut delay = Duration::from_secs(1); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs index 1e2f8b2..c0a776f 100644 --- a/client/src/input/camera.rs +++ b/client/src/input/camera.rs @@ -81,7 +81,7 @@ impl CameraCapture { let buf = sample.buffer()?; let map = buf.map_readable().ok()?; let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; - Some(VideoPacket { id: 0, pts, data: map.as_slice().to_vec() }) + Some(VideoPacket { id: 2, pts, data: map.as_slice().to_vec() }) } /// Fuzzy‑match devices under `/dev/v4l/by-id` diff --git a/server/src/handshake.rs b/server/src/handshake.rs new file mode 100644 index 0000000..7152fa5 --- /dev/null +++ b/server/src/handshake.rs @@ -0,0 +1,32 @@ +// ─── server/src/handshake.rs ─────────────────────────────────────────────── +use tonic::{Request, Response, Status}; + +use lesavka_common::lesavka::{ + Empty, HandshakeSet, + handshake_server::{Handshake, HandshakeServer}, +}; + +/// Static capabilities for now; could be probed at runtime +pub struct HandshakeSvc { + pub camera: bool, + pub microphone: bool, +} + +#[tonic::async_trait] +impl Handshake for HandshakeSvc { + async fn get_capabilities( + &self, + _req: Request, + ) -> Result, Status> { + Ok(Response::new(HandshakeSet { + camera: self.camera, + microphone: self.microphone, + })) + } +} + +impl HandshakeSvc { + pub fn server() -> HandshakeServer { + HandshakeServer::new(Self { camera: true, microphone: true }) + } +} diff --git a/server/src/lib.rs b/server/src/lib.rs index b1b4b61..38c337f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -3,3 +3,4 @@ pub mod audio; pub mod video; pub mod gadget; +pub mod handshake; diff --git a/server/src/main.rs b/server/src/main.rs index 4c9cda1..3efef6a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -28,7 +28,7 @@ use lesavka_common::lesavka::{ MonitorRequest, VideoPacket, AudioPacket }; -use lesavka_server::{gadget::UsbGadget, video, audio}; +use lesavka_server::{gadget::UsbGadget, video, audio, handshake::HandshakeSvc}; /*──────────────── constants ────────────────*/ /// **false** = never reset automatically. @@ -305,6 +305,7 @@ async fn main() -> anyhow::Result<()> { .tcp_nodelay(true) .max_frame_size(Some(2*1024*1024)) .add_service(RelayServer::new(handler)) + .add_service(HandshakeSvc::server()) .add_service(ReflBuilder::configure().build_v1().unwrap()) .serve(([0,0,0,0], 50051).into()) .await?; From d24f6c438602764e3062662279230eb914e2d486 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 4 Jul 2025 10:17:01 -0500 Subject: [PATCH 7/7] testing handshakes --- server/src/handshake.rs | 1 - server/src/main.rs | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/handshake.rs b/server/src/handshake.rs index 7152fa5..d11f179 100644 --- a/server/src/handshake.rs +++ b/server/src/handshake.rs @@ -6,7 +6,6 @@ use lesavka_common::lesavka::{ handshake_server::{Handshake, HandshakeServer}, }; -/// Static capabilities for now; could be probed at runtime pub struct HandshakeSvc { pub camera: bool, pub microphone: bool, diff --git a/server/src/main.rs b/server/src/main.rs index 3efef6a..9f3376f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -33,6 +33,8 @@ use lesavka_server::{gadget::UsbGadget, video, audio, handshake::HandshakeSvc}; /*──────────────── constants ────────────────*/ /// **false** = never reset automatically. const AUTO_CYCLE: bool = false; +const VERSION: &str = env!("CARGO_PKG_VERSION"); +const PKG_NAME: &str = env!("CARGO_PKG_NAME"); /*──────────────── logging ───────────────────*/ fn init_tracing() -> anyhow::Result { @@ -291,6 +293,7 @@ impl Relay for Handler { #[tokio::main(worker_threads = 4)] async fn main() -> anyhow::Result<()> { let _guard = init_tracing()?; + info!("🚀 {} v{} starting up", PKG_NAME, VERSION); panic::set_hook(Box::new(|p| { let bt = Backtrace::force_capture();