diff --git a/client/src/app.rs b/client/src/app.rs
index aa7115e..53b5664 100644
--- a/client/src/app.rs
+++ b/client/src/app.rs
@@ -21,6 +21,7 @@ use lesavka_common::lesavka::{
 use crate::{input::inputs::InputAggregator,
             input::microphone::MicrophoneCapture,
+            input::camera::CameraCapture,
             output::video::MonitorWindow,
             output::audio::AudioOut};
 
@@ -46,6 +47,16 @@ impl LesavkaClientApp {
         let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone());
         agg.init()?; // grab devices immediately
 
+        // Probe the camera once so construction fails fast if the pipeline
+        // cannot be built; run() creates the long-lived capture later.
+        let _cam = if std::env::var("LESAVKA_CAM_DISABLE").is_ok() {
+            None
+        } else {
+            Some(Arc::new(CameraCapture::new(
+                std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref()
+            )?))
+        };
+
         Ok(Self { aggregator: Some(agg), server_addr, dev_mode, kbd_tx, mou_tx })
     }
 
@@ -134,6 +145,14 @@ impl LesavkaClientApp {
         let ep_mic = vid_ep.clone();
         tokio::spawn(Self::mic_loop(ep_mic, mic));
 
+        /*────────── webcam gRPC pusher ───────────────*/
+        if std::env::var("LESAVKA_CAM_DISABLE").is_err() {
+            let cam = Arc::new(CameraCapture::new(
+                std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref()
+            )?);
+            tokio::spawn(Self::cam_loop(vid_ep.clone(), cam));
+        }
+
         /*────────── central reactor ───────────────────*/
         tokio::select! {
             _ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); },
@@ -295,5 +314,41 @@ impl LesavkaClientApp {
             let _ = stop_tx.send(());
             tokio::time::sleep(Duration::from_secs(1)).await;
         }
-    }
+    }
+
+    /*──────────────── cam stream ───────────────────*/
+    async fn cam_loop(ep: Channel, cam: Arc<CameraCapture>) {
+        loop {
+            let mut cli = RelayClient::new(ep.clone());
+
+            // Pull frames on a real thread so the blocking appsink read
+            // never stalls the async scheduler.
+            let (tx_pkt, rx_pkt) = tokio::sync::mpsc::channel::<VideoPacket>(256);
+            std::thread::spawn({
+                let cam = cam.clone();
+                move || {
+                    while let Some(pkt) = cam.pull() {
+                        // TRACE the first 10 frames, then every 120th only.
+                        static CNT: std::sync::atomic::AtomicU64 =
+                            std::sync::atomic::AtomicU64::new(0);
+                        let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+                        if n < 10 || n % 120 == 0 {
+                            tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
+                        }
+                        let _ = tx_pkt.blocking_send(pkt);
+                    }
+                }
+            });
+
+            let outbound = tokio_stream::wrappers::ReceiverStream::new(rx_pkt);
+            match cli.stream_camera(Request::new(outbound)).await {
+                Ok(mut resp) => {
+                    // Drain server acks until the stream ends or errors out.
+                    while let Ok(Some(_)) = resp.get_mut().message().await {}
+                }
+                Err(e) => tracing::warn!("❌📸 connect failed: {e}"),
+            }
+            tokio::time::sleep(Duration::from_secs(1)).await;
+        }
+    }
 }
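
Note that new() now builds a CameraCapture only to fail fast (the `_cam` probe), and run() then constructs a second capture. A possible follow-up, sketched here, would store the probed instance and reuse it; this assumes a `camera: Option<Arc<CameraCapture>>` field on LesavkaClientApp, which this patch does not add.

// Hypothetical sketch: assumes LesavkaClientApp gains a
// `camera: Option<Arc<CameraCapture>>` field; not part of this patch.
impl LesavkaClientApp {
    fn spawn_cam_pusher(&self, vid_ep: Channel) {
        // Reuse the capture probed in new() instead of building a
        // second GStreamer pipeline here.
        if let Some(cam) = self.camera.clone() {
            tokio::spawn(Self::cam_loop(vid_ep, cam));
        }
    }
}
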
diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs
index e751e63..858d815 100644
--- a/client/src/input/camera.rs
+++ b/client/src/input/camera.rs
@@ -22,16 +22,21 @@ impl CameraCapture {
             .and_then(Self::find_device)
             .unwrap_or_else(|| "/dev/video0".into());
 
-        let desc = format!(
-            "v4l2src device={dev} io-mode=dmabuf do-timestamp=true ! \
-             videoconvert ! videoscale ! video/x-raw,width=1280,height=720 ! \
-             v4l2h264enc key-int-max=30 \
-             extra-controls=\"encode,frame_level_rate_control_enable=1,h264_profile=4\" ! \
-             h264parse config-interval=-1 ! \
-             appsink name=asink emit-signals=true max-buffers=60 drop=true"
-        );
+        // Pick the best available H.264 encoder at runtime
+        // (NVIDIA NVENC → VA-API → software x264); only the name of the
+        // key-frame-interval property differs between them.
+        let (enc, kf_prop, kf_val) = Self::choose_encoder();
+
+        // All three encoders accept system-memory video/x-raw from
+        // videoconvert, so one pipeline description covers every case.
+        let desc = format!(
+            "v4l2src device={dev} do-timestamp=true ! \
+             videoconvert ! {enc} {kf_prop}={kf_val} ! \
+             h264parse config-interval=-1 ! \
+             appsink name=asink emit-signals=true max-buffers=60 drop=true"
+        );
+        tracing::info!(%enc, %desc, "📸 camera pipeline");
 
-        tracing::debug!("📸 pipeline-desc:\n{desc}");
         let pipeline: gst::Pipeline = gst::parse::launch(&desc)
             .context("gst parse_launch(cam)")?
             .downcast::<gst::Pipeline>()
@@ -55,7 +60,7 @@ impl CameraCapture {
         let buf = sample.buffer()?;
         let map = buf.map_readable().ok()?;
         let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000;
-        Some(VideoPacket { id: 100, pts, data: map.as_slice().to_vec() })
+        Some(VideoPacket { id: 0, pts, data: map.as_slice().to_vec() })
     }
 
     /// Fuzzy-match devices under `/dev/v4l/by-id`
@@ -82,4 +87,16 @@ impl CameraCapture {
             .unwrap();
         Self { pipeline, sink }
     }
+
+    /// Return the encoder element plus the property/value pair that
+    /// requests an IDR key-frame every 30 frames. Preference order:
+    /// nvh264enc (NVIDIA NVENC), vaapih264enc (VA-API), x264enc (software).
+    fn choose_encoder() -> (&'static str, &'static str, &'static str) {
+        if gst::ElementFactory::find("nvh264enc").is_some() {
+            ("nvh264enc", "gop-size", "30")           // NVENC uses `gop-size`
+        } else if gst::ElementFactory::find("vaapih264enc").is_some() {
+            ("vaapih264enc", "keyframe-period", "30") // VA-API uses `keyframe-period`
+        } else {
+            ("x264enc", "key-int-max", "30")          // libx264 fallback
+        }
+    }
 }
diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs
index 39c0efb..b2f1813 100644
--- a/client/src/input/inputs.rs
+++ b/client/src/input/inputs.rs
@@ -3,7 +3,7 @@ use anyhow::{bail, Context, Result};
 use evdev::{Device, EventType, KeyCode, RelativeAxisCode};
 use tokio::{sync::broadcast::Sender, time::{interval, Duration}};
-use tracing::{debug, info, warn};
+use tracing::{debug, info, trace, warn};
 
 use lesavka_common::lesavka::{KeyboardReport, MouseReport};
 
@@ -102,7 +102,11 @@ impl InputAggregator {
                 Some(c)
             }
             Some(Err(e)) => {
-                warn!("📸 webcam disabled: {e:#}");
+                warn!(
+                    "📸 webcam disabled – {:?}. \
+                     Hint: install gst-plugins-bad or set LESAVKA_CAM_DISABLE=1",
+                    e.root_cause()
+                );
                 Some(CameraCapture::new_stub()) // keep stub so call-sites compile
             }
             None => {
@@ -149,7 +153,7 @@ impl InputAggregator {
         }
 
         if let Some(cam) = &self.camera {
-            debug!("📸 CameraCapture present – first pull() will block until a frame arrives");
+            trace!("📸 CameraCapture present – first pull() will block until a frame arrives");
         } else {
             debug!("📸 No camera pipeline active");
         }
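
Because choose_encoder() hard-codes per-encoder property names, a small test can catch a typo or a renamed property early. A sketch, assuming the gstreamer-rs builder-style ElementFactory::make API (0.19 or newer) and GStreamer installed on the test host:

// Sanity-check sketch for choose_encoder(); assumes the gstreamer-rs
// builder API (0.19+) and that GStreamer plugins exist on the test host.
#[cfg(test)]
mod encoder_tests {
    use super::*;

    #[test]
    fn chosen_encoder_is_registered_and_has_keyframe_property() {
        gst::init().expect("gst init");
        let (enc, kf_prop, _kf_val) = CameraCapture::choose_encoder();
        // The chosen element must instantiate on this machine...
        let elem = gst::ElementFactory::make(enc)
            .build()
            .expect("encoder element should instantiate");
        // ...and expose the key-frame property set in the pipeline string.
        assert!(elem.find_property(kf_prop).is_some());
    }
}
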
diff --git a/server/src/video.rs b/server/src/video.rs
index dd244b4..62d85e9 100644
--- a/server/src/video.rs
+++ b/server/src/video.rs
@@ -282,13 +282,18 @@ impl CameraRelay {
     /// Push one VideoPacket coming from the client
     pub fn feed(&self, pkt: VideoPacket) {
         let n = self.frames.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-        if n < 10 || n % 300 == 0 {
+        if n < 10 || n % 60 == 0 {
             tracing::debug!(target:"lesavka_server::video",
                 cam_id = self.id,
                 frame  = n,
                 bytes  = pkt.data.len(),
                 pts    = pkt.pts,
                 "📸 srv webcam frame");
+        } else if n % 10 == 0 {
+            tracing::trace!(target:"lesavka_server::video",
+                cam_id = self.id,
+                bytes  = pkt.data.len(),
+                "📥 srv pkt");
         }
         self.sink.push(pkt);
     }
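
The client traces the first 10 frames and then every 120th, while the server logs every 60th at debug and every 10th at trace. If those cadences are meant to stay aligned, a tiny shared helper could keep them in one place; a sketch of such a helper (hypothetical, e.g. in lesavka_common, not part of this patch):

// Hypothetical shared helper: centralises the "first `warmup` frames,
// then every `every`-th frame" cadence used on both client and server.
pub fn should_log(n: u64, warmup: u64, every: u64) -> bool {
    n < warmup || n % every == 0
}

// e.g. in CameraRelay::feed:
//     if should_log(n, 10, 60) { tracing::debug!(/* ... */); }
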