From 9fc17478b242b5504113ef0de7bc24ee70725a32 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 3 Jul 2025 16:08:30 -0500 Subject: [PATCH] camera core add --- client/src/app.rs | 32 ++++++++++++-------------------- server/src/video.rs | 6 +++--- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/client/src/app.rs b/client/src/app.rs index 53b5664..b801249 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -138,18 +138,14 @@ impl LesavkaClientApp { tokio::spawn(Self::audio_loop(ep_audio, audio_out)); - /*────────── microphone gRPC pusher ───────────*/ - let mic = Arc::new(MicrophoneCapture::new()?); - let ep_mic = vid_ep.clone(); - tokio::spawn(Self::mic_loop(ep_mic, mic)); + /*────────── camera & mic tasks ───────────────*/ + let cam = Arc::new(CameraCapture::new( + std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() + )?); + tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); - /*────────── webcam gRPC pusher ───────────────*/ - if !std::env::var("LESAVKA_CAM_DISABLE").is_ok() { - let cam = Arc::new(CameraCapture::new( - std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() - )?); - tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); - } + let mic = Arc::new(MicrophoneCapture::new()?); + tokio::spawn(Self::mic_loop(vid_ep.clone(), mic)); /*────────── central reactor ───────────────────*/ tokio::select! { @@ -318,9 +314,8 @@ impl LesavkaClientApp { async fn cam_loop(ep: Channel, cam: Arc) { loop { let mut cli = RelayClient::new(ep.clone()); + let (tx, rx) = tokio::sync::mpsc::channel::(256); - // pull frames in a real thread so we don’t block async scheduler - let (tx_pkt, rx_pkt) = tokio::sync::mpsc::channel::(256); std::thread::spawn({ let cam = cam.clone(); move || { @@ -332,17 +327,14 @@ impl LesavkaClientApp { if n < 10 || n % 120 == 0 { tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len()); } - let _ = tx_pkt.blocking_send(pkt); + let _ = tx.blocking_send(pkt); } } }); - let outbound = tokio_stream::wrappers::ReceiverStream::new(rx_pkt); - match cli.stream_camera(Request::new(outbound)).await { - Ok(mut resp) => { - while resp.get_mut().message().await.transpose().is_some() {} - } - Err(e) => tracing::warn!("❌📸 connect failed: {e}"), + let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); + if let Err(e) = cli.stream_camera(Request::new(outbound)).await { + tracing::warn!("❌📸 connect failed: {e}"); } tokio::time::sleep(Duration::from_secs(1)).await; } diff --git a/server/src/video.rs b/server/src/video.rs index 62d85e9..3e018dd 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -181,7 +181,7 @@ pub async fn eye_ball( trace!(target:"lesavka_server::video", eye = %eye, size = size, - "📤 sent"); + "🎥📤 sent"); } Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { static DROP_CNT: std::sync::atomic::AtomicU64 = @@ -191,7 +191,7 @@ pub async fn eye_ball( debug!(target:"lesavka_server::video", eye = %eye, dropped = c, - "⏳ channel full - dropping frames"); + "🎥⏳ channel full - dropping frames"); } } Err(e) => error!("mpsc send err: {e}"), @@ -293,7 +293,7 @@ impl CameraRelay { tracing::trace!(target:"lesavka_server::video", cam_id = self.id, bytes = pkt.data.len(), - "📥 srv pkt"); + "📸📥 srv pkt"); } self.sink.push(pkt); }