diff --git a/client/Cargo.toml b/client/Cargo.toml index b63f8a6..5141e67 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.5.0" +version = "0.6.0" edition = "2024" [dependencies] diff --git a/client/src/app.rs b/client/src/app.rs index aa7115e..39cfca3 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -19,8 +19,10 @@ use lesavka_common::lesavka::{ MonitorRequest, MouseReport, VideoPacket, AudioPacket }; -use crate::{input::inputs::InputAggregator, +use crate::{handshake, + input::inputs::InputAggregator, input::microphone::MicrophoneCapture, + input::camera::CameraCapture, output::video::MonitorWindow, output::audio::AudioOut}; @@ -50,6 +52,10 @@ impl LesavkaClientApp { } pub async fn run(&mut self) -> Result<()> { + /*────────── handshake / feature-negotiation ───────────────*/ + let caps = handshake::negotiate(&self.server_addr).await; + tracing::info!("🤝 server capabilities = {:?}", caps); + /*────────── persistent gRPC channels ──────────*/ let hid_ep = Channel::from_shared(self.server_addr.clone())? .tcp_nodelay(true) @@ -128,11 +134,17 @@ impl LesavkaClientApp { let ep_audio = vid_ep.clone(); tokio::spawn(Self::audio_loop(ep_audio, audio_out)); - - /*────────── microphone gRPC pusher ───────────*/ - let mic = Arc::new(MicrophoneCapture::new()?); - let ep_mic = vid_ep.clone(); - tokio::spawn(Self::mic_loop(ep_mic, mic)); + /*────────── camera & mic tasks (gated by caps) ───────────*/ + if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() { + let cam = Arc::new(CameraCapture::new( + std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() + )?); + tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); + } + if caps.microphone { + let mic = Arc::new(MicrophoneCapture::new()?); + tokio::spawn(Self::voice_loop(vid_ep.clone(), mic)); // renamed + } /*────────── central reactor ───────────────────*/ tokio::select! { @@ -255,7 +267,8 @@ impl LesavkaClientApp { } /*──────────────── mic stream ─────────────────*/ - async fn mic_loop(ep: Channel, mic: Arc) { + async fn voice_loop(ep: Channel, mic: Arc) { + let mut delay = Duration::from_secs(1); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { let mut cli = RelayClient::new(ep.clone()); @@ -277,12 +290,11 @@ impl LesavkaClientApp { // 3. turn `rx` into an async stream for gRPC let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); - - match cli.stream_microphone(Request::new(outbound)).await { - Ok(mut resp) => { - while resp.get_mut().message().await.transpose().is_some() {} - } - Err(e) => { + match cli.stream_microphone(Request::new(outbound)).await { + Ok(mut resp) => { + while resp.get_mut().message().await.transpose().is_some() {} + } + Err(e) => { // first failure → warn, subsequent ones → debug if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 { warn!("❌🎤 connect failed: {e}"); @@ -290,10 +302,59 @@ impl LesavkaClientApp { } else { debug!("❌🎤 reconnect failed: {e}"); } + delay = next_delay(delay); } } let _ = stop_tx.send(()); - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(delay).await; } - } + } + + /*──────────────── cam stream ───────────────────*/ + async fn cam_loop(ep: Channel, cam: Arc) { + let mut delay = Duration::from_secs(1); + loop { + let mut cli = RelayClient::new(ep.clone()); + let (tx, rx) = tokio::sync::mpsc::channel::(256); + + std::thread::spawn({ + let cam = cam.clone(); + move || { + while let Some(pkt) = cam.pull() { + // TRACE every 120 frames only + static CNT: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(0); + let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if n < 10 || n % 120 == 0 { + tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len()); + } + tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", + pkt.pts, pkt.data.len()); + let _ = tx.try_send(pkt); + } + } + }); + + let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); + match cli.stream_camera(Request::new(outbound)).await { + Ok(_) => delay = Duration::from_secs(1), // got a stream → reset + Err(e) if e.code() == tonic::Code::Unimplemented => { + tracing::warn!("📸 server does not support StreamCamera – giving up"); + return; // stop the task completely (#3) + } + Err(e) => { + tracing::warn!("❌📸 connect failed: {e:?}"); + delay = next_delay(delay); // back-off (#2) + } + } + tokio::time::sleep(delay).await; + } + } +} + +fn next_delay(cur: std::time::Duration) -> std::time::Duration { + match cur.as_secs() { + 1..=15 => cur * 2, + _ => std::time::Duration::from_secs(30), + } } diff --git a/client/src/handshake.rs b/client/src/handshake.rs new file mode 100644 index 0000000..190f4ac --- /dev/null +++ b/client/src/handshake.rs @@ -0,0 +1,29 @@ +// client/src/handshake.rs +#![forbid(unsafe_code)] + +use lesavka_common::lesavka::{self as pb, handshake_client::HandshakeClient}; +use tonic::Code; + +#[derive(Default, Clone, Copy, Debug)] +pub struct PeerCaps { + pub camera: bool, + pub microphone: bool, +} + +pub async fn negotiate(uri: &str) -> PeerCaps { + let mut cli = HandshakeClient::connect(uri.to_owned()) + .await + .expect("\"dial handshake\""); + + match cli.get_capabilities(pb::Empty {}).await { + Ok(rsp) => PeerCaps { + camera: rsp.get_ref().camera, + microphone: rsp.get_ref().microphone, + }, + Err(e) if e.code() == Code::Unimplemented => { + // ↺ old server – pretend it supports nothing special. + PeerCaps::default() + } + Err(e) => panic!("\"handshake failed: {e}\""), + } +} diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs index 29814db..c0a776f 100644 --- a/client/src/input/camera.rs +++ b/client/src/input/camera.rs @@ -1,21 +1,140 @@ // client/src/input/camera.rs +#![forbid(unsafe_code)] use anyhow::Result; +use anyhow::Context; +use gstreamer as gst; +use gstreamer_app as gst_app; +use gst::prelude::*; +use lesavka_common::lesavka::VideoPacket; -/// A stub camera aggregator or capture pub struct CameraCapture { - // no real fields yet + pipeline: gst::Pipeline, + sink: gst_app::AppSink, } impl CameraCapture { - pub fn new_stub() -> Self { - // in real code: open /dev/video0, set formats, etc - CameraCapture {} + pub fn new(device_fragment: Option<&str>) -> anyhow::Result { + gst::init().ok(); + + // Pick device + let dev = device_fragment + .and_then(Self::find_device) + .unwrap_or_else(|| "/dev/video0".into()); + + // let (enc, raw_caps) = Self::pick_encoder(); + // (NVIDIA → VA-API → software x264). + let (enc, kf_prop, kf_val) = Self::choose_encoder(); + tracing::info!("📸 using encoder element: {enc}"); + let (src_caps, preenc) = match enc { + "nvh264enc" => ( + "video/x-raw(memory:NVMM),format=NV12,width=1280,height=720", + "nvvidconv !" + ), + "vaapih264enc" => ( + "video/x-raw,format=NV12,width=1280,height=720", + "videoconvert !" + ), + _ => ("video/x-raw,width=1280,height=720", "videoconvert !"), + }; + + // let desc = format!( + // "v4l2src device={dev} do-timestamp=true ! {raw_caps},width=1280,height=720 ! \ + // videoconvert ! {enc} key-int-max=30 ! \ + // h264parse config-interval=-1 ! \ + // appsink name=asink emit-signals=true max-buffers=60 drop=true" + // ); + // tracing::debug!(%desc, "📸 pipeline-desc"); + // Build a pipeline that works for any of the three encoders. + // * nvh264enc needs NVMM memory caps; + // * vaapih264enc wants system-memory caps; + // * x264enc needs the usual raw caps. + let desc = format!( + "v4l2src device={dev} do-timestamp=true ! {src_caps} ! \ + {preenc} {enc} {kf_prop}={kf_val} ! \ + h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \ + queue max-size-buffers=30 leaky=downstream ! \ + appsink name=asink emit-signals=true max-buffers=60 drop=true" + ); + tracing::info!(%enc, ?desc, "📸 using encoder element"); + + let pipeline: gst::Pipeline = gst::parse::launch(&desc) + .context("gst parse_launch(cam)")? + .downcast::() + .expect("not a pipeline"); + + tracing::debug!("📸 pipeline built OK – setting PLAYING…"); + let sink: gst_app::AppSink = pipeline + .by_name("asink") + .expect("appsink element not found") + .downcast::() + .expect("appsink down‑cast"); + + pipeline.set_state(gst::State::Playing)?; + tracing::info!("📸 webcam pipeline ▶️ device={dev}"); + + Ok(Self { pipeline, sink }) } - /// Called regularly to capture frames or do nothing - pub fn process_frames(&mut self) -> Result<()> { - // no-op - Ok(()) + pub fn pull(&self) -> Option { + let sample = self.sink.pull_sample().ok()?; + let buf = sample.buffer()?; + let map = buf.map_readable().ok()?; + let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; + Some(VideoPacket { id: 2, pts, data: map.as_slice().to_vec() }) + } + + /// Fuzzy‑match devices under `/dev/v4l/by-id` + fn find_device(substr: &str) -> Option { + let dir = std::fs::read_dir("/dev/v4l/by-id").ok()?; + for e in dir.flatten() { + let p = e.path(); + if p.file_name()?.to_string_lossy().contains(substr) { + if let Ok(target) = std::fs::read_link(&p) { + return Some(format!("/dev/{}", target.file_name()?.to_string_lossy())); + } + } + } + None + } + + /// Cheap stub used when the web‑cam is disabled + pub fn new_stub() -> Self { + let pipeline = gst::Pipeline::new(); + let sink: gst_app::AppSink = gst::ElementFactory::make("appsink") + .build() + .expect("appsink") + .downcast::() + .unwrap(); + Self { pipeline, sink } + } + + #[allow(dead_code)] // helper kept for future heuristics + fn pick_encoder() -> (&'static str, &'static str) { + let encoders = &[ + ("nvh264enc", "video/x-raw(memory:NVMM),format=NV12"), + ("vaapih264enc","video/x-raw,format=NV12"), + ("v4l2h264enc", "video/x-raw"), // RPi, Jetson, etc. + ("x264enc", "video/x-raw"), // software + ]; + for (name, caps) in encoders { + if gst::ElementFactory::find(name).is_some() { + return (name, caps); + } + } + // last resort – software + ("x264enc", "video/x-raw") + } + + fn choose_encoder() -> (&'static str, &'static str, &'static str) { + match () { + _ if gst::ElementFactory::find("nvh264enc").is_some() => + ("nvh264enc", "gop-size", "30"), + _ if gst::ElementFactory::find("vaapih264enc").is_some() => + ("vaapih264enc","keyframe-period","30"), + _ if gst::ElementFactory::find("v4l2h264enc").is_some() => + ("v4l2h264enc","idrcount", "30"), + _ => ("x264enc", "key-int-max", "30"), + } } } diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index f471f43..1ae54c2 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -3,11 +3,11 @@ use anyhow::{bail, Context, Result}; use evdev::{Device, EventType, KeyCode, RelativeAxisCode}; use tokio::{sync::broadcast::Sender, time::{interval, Duration}}; -use tracing::{debug, info, warn}; +use tracing::{debug, info, warn, trace}; use lesavka_common::lesavka::{KeyboardReport, MouseReport}; -use super::{keyboard::KeyboardAggregator, mouse::MouseAggregator, camera::CameraCapture}; +use super::{keyboard::KeyboardAggregator, mouse::MouseAggregator}; use crate::layout::{Layout, apply as apply_layout}; pub struct InputAggregator { @@ -18,7 +18,6 @@ pub struct InputAggregator { magic_active: bool, keyboards: Vec, mice: Vec, - camera: Option, } impl InputAggregator { @@ -26,8 +25,8 @@ impl InputAggregator { kbd_tx: Sender, mou_tx: Sender) -> Self { Self { kbd_tx, mou_tx, dev_mode, released: false, magic_active: false, - keyboards: Vec::new(), mice: Vec::new(), - camera: None } + keyboards: Vec::new(), mice: Vec::new() + } } /// Called once at startup: enumerates input devices, @@ -92,9 +91,6 @@ impl InputAggregator { bail!("No suitable keyboard/mouse devices found or none grabbed."); } - // Stubs for camera / mic: - self.camera = Some(CameraCapture::new_stub()); - Ok(()) } diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index 936dcd5..27d2441 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -30,12 +30,24 @@ impl MicrophoneCapture { _ => String::new(), }; debug!("🎤 device: {device_arg}"); + let aac = ["avenc_aac", "fdkaacenc", "faac", "opusenc"] + .into_iter() + .find(|e| gst::ElementFactory::find(e).is_some()) + .unwrap_or("opusenc"); + let parser = if aac.contains("opus") { + // opusenc already outputs raw Opus frames – just state the caps + "capsfilter caps=audio/x-opus,rate=48000,channels=2" + } else { + // AAC → ADTS frames + "aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2" + }; let desc = format!( "pulsesrc {device_arg} do-timestamp=true ! \ - audio/x-raw,format=S16LE,channels=2,rate=48000 ! \ - audioconvert ! audioresample ! avenc_aac bitrate=128000 ! \ - aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2 ! \ - appsink name=asink emit-signals=true max-buffers=50 drop=true" + audio/x-raw,format=S16LE,channels=2,rate=48000 ! \ + audioconvert ! audioresample ! {aac} bitrate=128000 ! \ + {parser} ! \ + queue max-size-buffers=100 leaky=downstream ! \ + appsink name=asink emit-signals=true max-buffers=50 drop=true" ); let pipeline: gst::Pipeline = gst::parse::launch(&desc)? diff --git a/client/src/lib.rs b/client/src/lib.rs index 0db2943..ba798cc 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -6,5 +6,6 @@ pub mod app; pub mod input; pub mod output; pub mod layout; +pub mod handshake; pub use app::LesavkaClientApp; diff --git a/common/Cargo.toml b/common/Cargo.toml index 4628718..24781aa 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.5.0" +version = "0.6.0" edition = "2024" build = "build.rs" diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index 4ee8de2..7d37566 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -1,3 +1,4 @@ +// ───────────────────────────────────────── proto/lesavka.proto syntax = "proto3"; package lesavka; @@ -10,6 +11,8 @@ message AudioPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; } message ResetUsbReply { bool ok = 1; } // true = success +message HandshakeSet { bool camera = 1; bool microphone = 2; } + message Empty {} service Relay { @@ -17,6 +20,12 @@ service Relay { rpc StreamMouse (stream MouseReport) returns (stream MouseReport); rpc CaptureVideo (MonitorRequest) returns (stream VideoPacket); rpc CaptureAudio (MonitorRequest) returns (stream AudioPacket); - rpc StreamMicrophone (stream AudioPacket) returns (stream Empty) {} + rpc StreamMicrophone (stream AudioPacket) returns (stream Empty); + rpc StreamCamera (stream VideoPacket) returns (stream Empty); + rpc ResetUsb (Empty) returns (ResetUsbReply); } + +service Handshake { + rpc GetCapabilities (Empty) returns (HandshakeSet); +} diff --git a/scripts/daemon/lesavka-core.sh b/scripts/daemon/lesavka-core.sh index d0f07e1..e687396 100644 --- a/scripts/daemon/lesavka-core.sh +++ b/scripts/daemon/lesavka-core.sh @@ -112,6 +112,13 @@ echo 2 >"$U/c_ssize" # Optional: allocate a few extra request buffers echo 32 >"$U/req_number" 2>/dev/null || true +# ----------------------- UVC function (usb‑video) ------------------ +mkdir -p "$G/functions/uvc.usb0" +mkdir -p "$G/functions/uvc.usb0/control/strings/0x409" +echo "Lesavka UVC" >"$G/functions/uvc.usb0/control/strings/0x409/label" +# Simple 720p MJPEG + 720p H.264 alt‑setting +printf '\x50\x00\x00\x00' >"$G/functions/uvc.usb0/control/header/h_video" + # ----------------------- configuration ----------------------------- mkdir -p "$G/configs/c.1/strings/0x409" echo 500 > "$G/configs/c.1/MaxPower" @@ -120,7 +127,8 @@ echo "Config 1: HID + UAC2" >"$G/configs/c.1/strings/0x409/configuration" ln -s $G/functions/hid.usb0 $G/configs/c.1/ ln -s $G/functions/hid.usb1 $G/configs/c.1/ -ln -s "$U" "$G/configs/c.1/" +ln -s $U $G/configs/c.1/ +ln -s $G/functions/uvc.usb0 $G/configs/c.1/ # mkdir -p $G/functions/hid.usb0/os_desc # mkdir -p $G/functions/hid.usb1/os_desc @@ -143,6 +151,6 @@ ln -s "$U" "$G/configs/c.1/" # 4. Bind gadget #────────────────────────────────────────────────── echo "$UDC" >"$G/UDC" -log "🎉 gadget bound on $UDC (hidg0, hidg1, UAC2 L+R)" +log "🎉 gadget bound on $UDC (hidg0, hidg1, UAC2 L+R, UVC)" exit 0 diff --git a/scripts/install/server.sh b/scripts/install/server.sh index 1a9f19a..8ca910c 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -122,7 +122,7 @@ After=network.target lesavka-core.service [Service] ExecStart=/usr/local/bin/lesavka-server Restart=always -Environment=RUST_LOG=lesavka_server=info,lesavka_server::audio=info,lesavka_server::video=info,lesavka_server::gadget=info +Environment=RUST_LOG=lesavka_server=info,lesavka_server::audio=trace,lesavka_server::video=trace,lesavka_server::gadget=info Environment=RUST_BACKTRACE=1 Environment=GST_DEBUG="*:2,alsasink:6,alsasrc:6" Restart=always diff --git a/server/Cargo.toml b/server/Cargo.toml index eba7ff2..7eb3c4f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_server" -version = "0.5.0" +version = "0.6.0" edition = "2024" [dependencies] diff --git a/server/src/handshake.rs b/server/src/handshake.rs new file mode 100644 index 0000000..d11f179 --- /dev/null +++ b/server/src/handshake.rs @@ -0,0 +1,31 @@ +// ─── server/src/handshake.rs ─────────────────────────────────────────────── +use tonic::{Request, Response, Status}; + +use lesavka_common::lesavka::{ + Empty, HandshakeSet, + handshake_server::{Handshake, HandshakeServer}, +}; + +pub struct HandshakeSvc { + pub camera: bool, + pub microphone: bool, +} + +#[tonic::async_trait] +impl Handshake for HandshakeSvc { + async fn get_capabilities( + &self, + _req: Request, + ) -> Result, Status> { + Ok(Response::new(HandshakeSet { + camera: self.camera, + microphone: self.microphone, + })) + } +} + +impl HandshakeSvc { + pub fn server() -> HandshakeServer { + HandshakeServer::new(Self { camera: true, microphone: true }) + } +} diff --git a/server/src/lib.rs b/server/src/lib.rs index b1b4b61..38c337f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -3,3 +3,4 @@ pub mod audio; pub mod video; pub mod gadget; +pub mod handshake; diff --git a/server/src/main.rs b/server/src/main.rs index 1104715..9f3376f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -28,11 +28,13 @@ use lesavka_common::lesavka::{ MonitorRequest, VideoPacket, AudioPacket }; -use lesavka_server::{gadget::UsbGadget, video, audio}; +use lesavka_server::{gadget::UsbGadget, video, audio, handshake::HandshakeSvc}; /*──────────────── constants ────────────────*/ /// **false** = never reset automatically. const AUTO_CYCLE: bool = false; +const VERSION: &str = env!("CARGO_PKG_VERSION"); +const PKG_NAME: &str = env!("CARGO_PKG_NAME"); /*──────────────── logging ───────────────────*/ fn init_tracing() -> anyhow::Result { @@ -133,6 +135,7 @@ impl Relay for Handler { type CaptureVideoStream = Pin> + Send>>; type CaptureAudioStream = Pin> + Send>>; type StreamMicrophoneStream = ReceiverStream>; + type StreamCameraStream = ReceiverStream>; async fn stream_keyboard( &self, @@ -209,6 +212,33 @@ impl Relay for Handler { Ok(Response::new(ReceiverStream::new(rx))) } + async fn stream_camera( + &self, + req: Request>, + ) -> Result, Status> { + // map gRPC camera id → UVC device + let uvc = std::env::var("LESAVKA_UVC_DEV") + .unwrap_or_else(|_| "/dev/video4".into()); + + // build once + let relay = video::CameraRelay::new(0, &uvc) + .map_err(|e| Status::internal(format!("{e:#}")))?; + + // dummy outbound (same pattern as other streams) + let (tx, rx) = tokio::sync::mpsc::channel(1); + + tokio::spawn(async move { + let mut s = req.into_inner(); + while let Some(pkt) = s.next().await.transpose()? { + relay.feed(pkt); // ← all logging inside video.rs + } + tx.send(Ok(Empty {})).await.ok(); + Ok::<(), Status>(()) + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } + async fn capture_video( &self, req: Request, @@ -263,6 +293,7 @@ impl Relay for Handler { #[tokio::main(worker_threads = 4)] async fn main() -> anyhow::Result<()> { let _guard = init_tracing()?; + info!("🚀 {} v{} starting up", PKG_NAME, VERSION); panic::set_hook(Box::new(|p| { let bt = Backtrace::force_capture(); @@ -277,6 +308,7 @@ async fn main() -> anyhow::Result<()> { .tcp_nodelay(true) .max_frame_size(Some(2*1024*1024)) .add_service(RelayServer::new(handler)) + .add_service(HandshakeSvc::server()) .add_service(ReflBuilder::configure().build_v1().unwrap()) .serve(([0,0,0,0], 50051).into()) .await?; diff --git a/server/src/video.rs b/server/src/video.rs index 1604376..7957e9c 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -181,7 +181,7 @@ pub async fn eye_ball( trace!(target:"lesavka_server::video", eye = %eye, size = size, - "📤 sent"); + "🎥📤 sent"); } Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { static DROP_CNT: std::sync::atomic::AtomicU64 = @@ -191,7 +191,7 @@ pub async fn eye_ball( debug!(target:"lesavka_server::video", eye = %eye, dropped = c, - "⏳ channel full - dropping frames"); + "🎥⏳ channel full - dropping frames"); } } Err(e) => error!("mpsc send err: {e}"), @@ -214,3 +214,99 @@ pub async fn eye_ball( } Ok(VideoStream { _pipeline: pipeline, inner: ReceiverStream::new(rx) }) } + +pub struct WebcamSink { + appsrc: gst_app::AppSrc, + _pipe: gst::Pipeline, +} + +impl WebcamSink { + pub fn new(uvc_dev: &str) -> anyhow::Result { + gst::init()?; + + let pipeline = gst::Pipeline::new(); + let src = gst::ElementFactory::make("appsrc") + .build()? + .downcast::() + .expect("appsrc"); + src.set_is_live(true); + src.set_format(gst::Format::Time); + + let h264parse = gst::ElementFactory::make("h264parse").build()?; + let decoder = gst::ElementFactory::make("v4l2h264dec").build()?; + let convert = gst::ElementFactory::make("videoconvert").build()?; + let sink = gst::ElementFactory::make("v4l2sink") + .property("device", &uvc_dev) + .property("sync", &false) + .build()?; + + // Up‑cast to &gst::Element for the collection macros + pipeline.add_many(&[ + src.upcast_ref(), &h264parse, &decoder, &convert, &sink + ])?; + gst::Element::link_many(&[ + src.upcast_ref(), &h264parse, &decoder, &convert, &sink + ])?; + pipeline.set_state(gst::State::Playing)?; + + Ok(Self { appsrc: src, _pipe: pipeline }) + } + + pub fn push(&self, pkt: VideoPacket) { + let mut buf = gst::Buffer::from_slice(pkt.data); + buf.get_mut().unwrap() + .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); + let _ = self.appsrc.push_buffer(buf); + } +} + +/*─────────────────────────────────*/ +/* gRPC → WebcamSink relay */ +/*─────────────────────────────────*/ + +pub struct CameraRelay { + sink: WebcamSink, // the v4l2sink pipeline (or stub) + id: u32, // gRPC “id” (for future multi‑cam) + frames: std::sync::atomic::AtomicU64, +} + +impl CameraRelay { + pub fn new(id: u32, uvc_dev: &str) -> anyhow::Result { + Ok(Self { + sink: WebcamSink::new(uvc_dev)?, + id, + frames: std::sync::atomic::AtomicU64::new(0), + }) + } + + /// Push one VideoPacket coming from the client + pub fn feed(&self, pkt: VideoPacket) { + let n = self.frames.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if n < 10 || n % 60 == 0 { + tracing::debug!(target:"lesavka_server::video", + cam_id = self.id, + frame = n, + bytes = pkt.data.len(), + pts = pkt.pts, + "📸 srv webcam frame"); + } else if n % 10 == 0 { + tracing::trace!(target:"lesavka_server::video", + cam_id = self.id, + bytes = pkt.data.len(), + "📸📥 srv pkt"); + } + + if cfg!(debug_assertions) || tracing::enabled!(tracing::Level::TRACE) { + if n % 120 == 0 { + let path = format!("/tmp/eye3-cli-{n:05}.h264"); + if let Err(e) = std::fs::write(&path, &pkt.data) { + tracing::warn!("📸💾 dump failed: {e}"); + } else { + tracing::debug!("📸💾 wrote {}", path); + } + } + } + + self.sink.push(pkt); + } +}