diff --git a/client/src/app.rs b/client/src/app.rs index 87cda53..c9041eb 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -341,7 +341,7 @@ impl LesavkaClientApp { let max_bitrate = std::env::var("LESAVKA_VIDEO_MAX_KBIT") .ok() .and_then(|v| v.parse::().ok()) - .unwrap_or(6_000); + .unwrap_or(4_000); for monitor_id in 0..=1 { let ep = ep.clone(); let tx = tx.clone(); diff --git a/scripts/install/client.sh b/scripts/install/client.sh index 2256485..1838803 100755 --- a/scripts/install/client.sh +++ b/scripts/install/client.sh @@ -109,4 +109,5 @@ echo " Service: systemctl status lesavka-client --no-pager" echo echo "Fish quick start:" echo " set -gx LESAVKA_SERVER_ADDR http://:50051" +echo " set -gx LESAVKA_VIDEO_MAX_KBIT 4000" echo " /usr/local/bin/lesavka-client" diff --git a/scripts/install/server.sh b/scripts/install/server.sh index e44ad22..b77f56a 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -210,6 +210,7 @@ Environment=LESAVKA_UVC_CODEC=mjpeg Environment=LESAVKA_UVC_EXTERNAL=1 Environment=LESAVKA_EYE_ADAPTIVE=1 Environment=LESAVKA_EYE_MIN_FPS=12 +Environment=LESAVKA_EYE_FPS=20 Environment=LESAVKA_MIC_INIT_ATTEMPTS=5 Environment=LESAVKA_MIC_INIT_DELAY_MS=250 Restart=always diff --git a/server/src/main.rs b/server/src/main.rs index 16d5221..463e36e 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -28,6 +28,7 @@ use lesavka_server::{audio, camera, gadget::UsbGadget, handshake::HandshakeSvc, /*──────────────── constants ────────────────*/ const VERSION: &str = env!("CARGO_PKG_VERSION"); const PKG_NAME: &str = env!("CARGO_PKG_NAME"); +static STREAM_SEQ: AtomicU64 = AtomicU64::new(1); /*──────────────── logging ───────────────────*/ fn init_tracing() -> anyhow::Result { @@ -168,6 +169,30 @@ async fn open_voice_with_retry(uac_dev: &str) -> anyhow::Result { Err(last_err.unwrap_or_else(|| anyhow::anyhow!("microphone sink init failed"))) } +fn next_stream_id() -> u64 { + STREAM_SEQ.fetch_add(1, Ordering::Relaxed) +} + +async fn write_hid_report(dev: &Arc>, data: &[u8]) -> std::io::Result<()> { + let mut last: Option = None; + for attempt in 0..5 { + let mut f = dev.lock().await; + match f.write_all(data).await { + Ok(()) => return Ok(()), + Err(e) + if e.kind() == std::io::ErrorKind::WouldBlock + || e.raw_os_error() == Some(libc::EAGAIN) => + { + last = Some(e); + } + Err(e) => return Err(e), + } + drop(f); + tokio::time::sleep(Duration::from_millis((attempt as u64 + 1) * 2)).await; + } + Err(last.unwrap_or_else(|| std::io::Error::from_raw_os_error(libc::EAGAIN))) +} + /// Pick the UVC gadget video node. /// Priority: 1) `LESAVKA_UVC_DEV` override; 2) first `video_output` node. /// Returns an error when nothing matches instead of guessing a capture card. @@ -438,6 +463,8 @@ impl Relay for Handler { &self, req: Request>, ) -> Result, Status> { + let rpc_id = next_stream_id(); + info!(rpc_id, "⌨️ stream_keyboard opened"); let (tx, rx) = tokio::sync::mpsc::channel(32); let kb = self.kb.clone(); let ms = self.ms.clone(); @@ -447,19 +474,24 @@ impl Relay for Handler { tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { - if let Err(e) = kb.lock().await.write_all(&pkt.data).await { - warn!("⌨️ write failed: {e} (dropped)"); - recover_hid_if_needed( - &e, - gadget.clone(), - kb.clone(), - ms.clone(), - did_cycle.clone(), - ) - .await; + if let Err(e) = write_hid_report(&kb, &pkt.data).await { + if e.raw_os_error() == Some(libc::EAGAIN) { + debug!(rpc_id, "⌨️ write would block (dropped)"); + } else { + warn!(rpc_id, "⌨️ write failed: {e} (dropped)"); + recover_hid_if_needed( + &e, + gadget.clone(), + kb.clone(), + ms.clone(), + did_cycle.clone(), + ) + .await; + } } tx.send(Ok(pkt)).await.ok(); } + info!(rpc_id, "⌨️ stream_keyboard closed"); Ok::<(), Status>(()) }); @@ -470,6 +502,8 @@ impl Relay for Handler { &self, req: Request>, ) -> Result, Status> { + let rpc_id = next_stream_id(); + info!(rpc_id, "🖱️ stream_mouse opened"); let (tx, rx) = tokio::sync::mpsc::channel(1024); let ms = self.ms.clone(); let kb = self.kb.clone(); @@ -479,19 +513,24 @@ impl Relay for Handler { tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { - if let Err(e) = ms.lock().await.write_all(&pkt.data).await { - warn!("🖱️ write failed: {e} (dropped)"); - recover_hid_if_needed( - &e, - gadget.clone(), - kb.clone(), - ms.clone(), - did_cycle.clone(), - ) - .await; + if let Err(e) = write_hid_report(&ms, &pkt.data).await { + if e.raw_os_error() == Some(libc::EAGAIN) { + debug!(rpc_id, "🖱️ write would block (dropped)"); + } else { + warn!(rpc_id, "🖱️ write failed: {e} (dropped)"); + recover_hid_if_needed( + &e, + gadget.clone(), + kb.clone(), + ms.clone(), + did_cycle.clone(), + ) + .await; + } } tx.send(Ok(pkt)).await.ok(); } + info!(rpc_id, "🖱️ stream_mouse closed"); Ok::<(), Status>(()) }); @@ -502,6 +541,8 @@ impl Relay for Handler { &self, req: Request>, ) -> Result, Status> { + let rpc_id = next_stream_id(); + info!(rpc_id, "🎤 stream_microphone opened"); // 1 ─ build once, early let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); info!(%uac_dev, "🎤 stream_microphone using UAC sink"); @@ -520,12 +561,13 @@ impl Relay for Handler { while let Some(pkt) = inbound.next().await.transpose()? { let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 10 || n % 300 == 0 { - tracing::trace!("🎤⬇ srv pkt#{n} {} bytes", pkt.data.len()); + tracing::trace!(rpc_id, "🎤⬇ srv pkt#{n} {} bytes", pkt.data.len()); } sink.push(&pkt); } sink.finish(); // flush on EOS let _ = tx.send(Ok(Empty {})).await; + info!(rpc_id, "🎤 stream_microphone closed"); Ok::<(), Status>(()) }); @@ -536,8 +578,10 @@ impl Relay for Handler { &self, req: Request>, ) -> Result, Status> { + let rpc_id = next_stream_id(); let cfg = camera::current_camera_config(); info!( + rpc_id, output = cfg.output.as_str(), codec = cfg.codec.as_str(), width = cfg.width, @@ -549,6 +593,7 @@ impl Relay for Handler { let (session_id, relay) = self.camera_rt.activate(&cfg).await?; let camera_rt = self.camera_rt.clone(); + info!(rpc_id, session_id, "🎥 stream_camera opened"); // dummy outbound (same pattern as other streams) let (tx, rx) = tokio::sync::mpsc::channel(1); @@ -557,12 +602,13 @@ impl Relay for Handler { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { if !camera_rt.is_active(session_id) { - info!(session_id, "🎥 stream_camera session superseded"); + info!(rpc_id, session_id, "🎥 stream_camera session superseded"); break; } relay.feed(pkt); // ← all logging inside video.rs } tx.send(Ok(Empty {})).await.ok(); + info!(rpc_id, session_id, "🎥 stream_camera closed"); Ok::<(), Status>(()) }); @@ -573,6 +619,7 @@ impl Relay for Handler { &self, req: Request, ) -> Result, Status> { + let rpc_id = next_stream_id(); let req = req.into_inner(); let id = req.id; let dev = match id { @@ -580,7 +627,13 @@ impl Relay for Handler { 1 => "/dev/lesavka_r_eye", _ => return Err(Status::invalid_argument("monitor id must be 0 or 1")), }; - debug!("🎥 streaming {dev}"); + info!( + rpc_id, + id, + max_bitrate = req.max_bitrate, + "🎥 capture_video opened" + ); + debug!(rpc_id, "🎥 streaming {dev}"); let s = video::eye_ball(dev, id, req.max_bitrate) .await .map_err(|e| Status::internal(format!("{e:#}")))?; @@ -591,10 +644,12 @@ impl Relay for Handler { &self, req: Request, ) -> Result, Status> { + let rpc_id = next_stream_id(); // Only one speaker stream for now; both 0/1 → same ALSA dev. let _id = req.into_inner().id; // Allow override (`LESAVKA_ALSA_DEV=hw:2,0` for debugging). let dev = std::env::var("LESAVKA_ALSA_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); + info!(rpc_id, %dev, "🔊 capture_audio opened"); let s = audio::ear(&dev, 0) .await