From b001aa8dd3e426bfd727d4fa14565d61c98fc4c4 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Wed, 8 Apr 2026 22:23:40 -0300 Subject: [PATCH] stabilize camera streaming and add adaptive monitor controls --- client/src/app.rs | 47 +++++++--- client/src/input/camera.rs | 12 ++- scripts/install/client.sh | 30 +++++- scripts/install/server.sh | 8 +- scripts/manual/soak-report.sh | 42 +++++++++ server/src/main.rs | 169 ++++++++++++++++++++++++++++------ server/src/video.rs | 103 +++++++++++++++++---- 7 files changed, 344 insertions(+), 67 deletions(-) create mode 100755 scripts/manual/soak-report.sh diff --git a/client/src/app.rs b/client/src/app.rs index d84ae9b..87cda53 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -338,6 +338,10 @@ impl LesavkaClientApp { /*──────────────── monitor stream ────────────────*/ async fn video_loop(ep: Channel, tx: tokio::sync::mpsc::UnboundedSender) { + let max_bitrate = std::env::var("LESAVKA_VIDEO_MAX_KBIT") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(6_000); for monitor_id in 0..=1 { let ep = ep.clone(); let tx = tx.clone(); @@ -346,7 +350,7 @@ impl LesavkaClientApp { let mut cli = RelayClient::new(ep.clone()); let req = MonitorRequest { id: monitor_id, - max_bitrate: 6_000, + max_bitrate, }; match cli.capture_video(Request::new(req)).await { Ok(mut stream) => { @@ -449,29 +453,40 @@ impl LesavkaClientApp { loop { let mut cli = RelayClient::new(ep.clone()); let (tx, rx) = tokio::sync::mpsc::channel::(256); - - std::thread::spawn({ + let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); + let cam_worker = std::thread::spawn({ let cam = cam.clone(); - move || { - while let Some(pkt) = cam.pull() { - // TRACE every 120 frames only - static CNT: std::sync::atomic::AtomicU64 = - std::sync::atomic::AtomicU64::new(0); - let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if n < 10 || n % 120 == 0 { - tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len()); - } - tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len()); - let _ = tx.try_send(pkt); + move || loop { + if stop_rx.try_recv().is_ok() { + break; + } + let Some(pkt) = cam.pull() else { + std::thread::sleep(Duration::from_millis(5)); + continue; + }; + // TRACE every 120 frames only + static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); + let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if n < 10 || n % 120 == 0 { + tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len()); + } + tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len()); + if tx.blocking_send(pkt).is_err() { + break; } } }); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); match cli.stream_camera(Request::new(outbound)).await { - Ok(_) => delay = Duration::from_secs(1), // got a stream → reset + Ok(mut resp) => { + delay = Duration::from_secs(1); // got a stream → reset + while resp.get_mut().message().await.transpose().is_some() {} + } Err(e) if e.code() == tonic::Code::Unimplemented => { tracing::warn!("📸 server does not support StreamCamera – giving up"); + let _ = stop_tx.send(()); + let _ = cam_worker.join(); return; // stop the task completely (#3) } Err(e) => { @@ -479,6 +494,8 @@ impl LesavkaClientApp { delay = next_delay(delay); // back-off (#2) } } + let _ = stop_tx.send(()); + let _ = cam_worker.join(); tokio::time::sleep(delay).await; } } diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs index e45fa5c..e6e257e 100644 --- a/client/src/input/camera.rs +++ b/client/src/input/camera.rs @@ -90,6 +90,14 @@ impl CameraCapture { if use_mjpg_source && !output_mjpeg { tracing::info!("📸 using MJPG source with software encode"); } + let _enc_opts = if enc == "x264enc" { + let bitrate_kbit = env_u32("LESAVKA_CAM_H264_KBIT", 2500); + format!( + "{enc} tune=zerolatency speed-preset=veryfast bitrate={bitrate_kbit} {kf_prop}={kf_val}" + ) + } else { + format!("{enc} {kf_prop}={kf_val}") + }; if output_mjpeg { tracing::info!("📸 outputting MJPEG frames for UVC (quality={jpeg_quality})"); } else { @@ -161,7 +169,7 @@ impl CameraCapture { "{src_desc} ! \ image/jpeg,width={width},height={height} ! \ jpegdec ! videorate ! video/x-raw,framerate={fps}/1 ! \ - videoconvert ! {enc} {kf_prop}={kf_val} ! \ + videoconvert ! {_enc_opts} ! \ h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \ queue max-size-buffers=30 leaky=downstream ! \ appsink name=asink emit-signals=true max-buffers=60 drop=true" @@ -169,7 +177,7 @@ impl CameraCapture { } else { format!( "{src_desc} ! {src_caps} ! \ - {preenc} {enc} {kf_prop}={kf_val} ! \ + {preenc} {_enc_opts} ! \ h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \ queue max-size-buffers=30 leaky=downstream ! \ appsink name=asink emit-signals=true max-buffers=60 drop=true" diff --git a/scripts/install/client.sh b/scripts/install/client.sh index e270f26..2256485 100755 --- a/scripts/install/client.sh +++ b/scripts/install/client.sh @@ -6,7 +6,11 @@ ORIG_USER=${SUDO_USER:-$(id -un)} SCRIPT_DIR=$(cd -- "$(dirname "${BASH_SOURCE[0]}")" && pwd) REPO_ROOT=$(git -C "$SCRIPT_DIR/.." rev-parse --show-toplevel 2>/dev/null || true) -# 1. packages (Arch) +log() { + printf '==> %s\n' "$*" +} + +log "1. Installing base packages" sudo pacman -Syq --needed --noconfirm \ git rustup protobuf gcc clang evtest base-devel \ gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly gst-libav \ @@ -24,18 +28,24 @@ ensure_yay() { cd yay && makepkg -si --noconfirm' } -# 1b. grpcurl (prefer repo package; fallback to AUR if needed) +log "1b. Installing grpcurl" if sudo pacman -Si grpcurl >/dev/null 2>&1; then sudo pacman -Syq --needed --noconfirm grpcurl else ensure_yay - sudo -u "$ORIG_USER" yay -S --needed --noconfirm grpcurl-bin + if ! sudo -u "$ORIG_USER" yay -S --needed --noconfirm grpcurl-bin; then + log "grpcurl AUR install failed once, rebuilding yay and retrying" + ensure_yay + sudo -u "$ORIG_USER" yay -S --needed --noconfirm grpcurl-bin + fi fi # 1c. input access +log "1c. Ensuring input group access for $ORIG_USER" sudo usermod -aG input "$ORIG_USER" # 2. Rust tool-chain for both root & user +log "2. Ensuring Rust toolchain" sudo rustup default stable sudo -u "$ORIG_USER" rustup default stable @@ -55,9 +65,11 @@ else fi # 4. build +log "4. Building client release binary" sudo -u "$ORIG_USER" bash -c "cd '$SRC/client' && cargo clean && cargo build --release" # 5. install binary +log "5. Installing /usr/local/bin/lesavka-client" sudo install -Dm755 "$SRC/client/target/release/lesavka-client" /usr/local/bin/lesavka-client # 6. systemd service for system scope: /etc/systemd/system/lesavka-client.service @@ -84,9 +96,17 @@ WantedBy=default.target EOF # 7. Call the *user* instance inside the caller’s session +log "7. Reloading/starting service" sudo systemctl daemon-reload sudo systemctl enable --now lesavka-client.service sudo systemctl restart lesavka-client || true -echo "✅ lesavka-client installed to /usr/local/bin/lesavka-client" -echo "➡️ Run: /usr/local/bin/lesavka-client (set LESAVKA_SERVER_ADDR as needed)" +echo +echo "✅ lesavka-client install complete" +echo " Binary: /usr/local/bin/lesavka-client" +echo " Build source: $SRC/client/target/release/lesavka-client" +echo " Service: systemctl status lesavka-client --no-pager" +echo +echo "Fish quick start:" +echo " set -gx LESAVKA_SERVER_ADDR http://:50051" +echo " /usr/local/bin/lesavka-client" diff --git a/scripts/install/server.sh b/scripts/install/server.sh index 59602b3..e44ad22 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -151,7 +151,7 @@ fi echo "==> 4b. Kernel upgrade (optional)" if [[ "${LESAVKA_KERNEL_UPDATE:-1}" != "0" ]]; then - sudo LESAVKA_KERNEL_BUILD_USER="$ORIG_USER" "$SRC_DIR/scripts/kernel/build-linux-rpi.sh" + sudo LESAVKA_KERNEL_BUILD_USER="$ORIG_USER" bash "$SRC_DIR/scripts/kernel/build-linux-rpi.sh" else echo "⚠️ skipping kernel upgrade (LESAVKA_KERNEL_UPDATE=0)" fi @@ -208,6 +208,10 @@ Environment=RUST_BACKTRACE=1 Environment=GST_DEBUG="*:2,alsasink:6,alsasrc:6" Environment=LESAVKA_UVC_CODEC=mjpeg Environment=LESAVKA_UVC_EXTERNAL=1 +Environment=LESAVKA_EYE_ADAPTIVE=1 +Environment=LESAVKA_EYE_MIN_FPS=12 +Environment=LESAVKA_MIC_INIT_ATTEMPTS=5 +Environment=LESAVKA_MIC_INIT_DELAY_MS=250 Restart=always RestartSec=5 StandardError=append:/tmp/lesavka-server.stderr @@ -313,3 +317,5 @@ fi sudo systemctl restart lesavka-server echo "✅ lesavka-server installed and restarted..." +echo "➡️ Status: sudo systemctl status lesavka-server --no-pager" +echo "➡️ Logs: sudo journalctl -u lesavka-server -f --no-pager" diff --git a/scripts/manual/soak-report.sh b/scripts/manual/soak-report.sh new file mode 100755 index 0000000..f2310ca --- /dev/null +++ b/scripts/manual/soak-report.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +# scripts/manual/soak-report.sh - summarize server stability/quality counters for a time window +set -euo pipefail + +SERVER_HOST=${LESAVKA_SERVER_HOST:-theia} +SINCE=${1:-"30 minutes ago"} +UNTIL=${2:-"now"} + +tmp=$(mktemp) +trap 'rm -f "$tmp"' EXIT + +ssh "$SERVER_HOST" \ + "journalctl -u lesavka-server --since '$SINCE' --until '$UNTIL' --no-pager -o short-iso" \ + >"$tmp" + +count() { + local pattern=$1 + grep -E -c "$pattern" "$tmp" || true +} + +echo "Lesavka Soak Report" +echo " host: $SERVER_HOST" +echo " since: $SINCE" +echo " until: $UNTIL" +echo +echo "Counters" +echo " stream_camera selections: $(count 'stream_camera output selected')" +echo " camera relay recreated: $(count 'camera relay (re)created')" +echo " camera relay reused: $(count 'camera relay reused')" +echo " camera superseded events: $(count 'session superseded')" +echo " webcam frames logged: $(count 'srv webcam frame')" +echo " monitor channel drops: $(count 'channel full - dropping frames')" +echo " adaptive fps down events: $(count 'adaptive eye fps ↓')" +echo " adaptive fps up events: $(count 'adaptive eye fps ↑')" +echo " HID EAGAIN drops: $(count 'Resource temporarily unavailable (os error 11)')" +echo " webcam appsrc push fail: $(count 'appsrc push failed')" +echo " HDMI appsrc push fail: $(count 'HDMI appsrc push failed')" +echo " server abort/crash signs: $(count 'Bail out!|status=6/ABRT|Main process exited|core-dump')" + +echo +echo "Recent critical lines" +grep -E 'Bail out!|status=6/ABRT|Main process exited|core-dump|stream_camera output selected|adaptive eye fps|channel full - dropping frames' "$tmp" | tail -n 60 || true diff --git a/server/src/main.rs b/server/src/main.rs index 3eea8ba..16d5221 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -4,10 +4,9 @@ use anyhow::Context as _; use futures_util::{Stream, StreamExt}; -use gstreamer as gst; use std::path::Path; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::time::Duration; use std::{backtrace::Backtrace, panic, pin::Pin, sync::Arc}; use tokio::{fs::OpenOptions, io::AsyncWriteExt, process::Command, sync::Mutex}; use tokio_stream::wrappers::ReceiverStream; @@ -82,13 +81,6 @@ async fn open_with_retry(path: &str) -> anyhow::Result { Err(anyhow::anyhow!("timeout waiting for {path}")) } -fn next_minute() -> SystemTime { - let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); - let secs = now.as_secs(); - let next = (secs / 60 + 1) * 60; - UNIX_EPOCH + Duration::from_secs(next) -} - fn allow_gadget_cycle() -> bool { std::env::var("LESAVKA_ALLOW_GADGET_CYCLE").is_ok() } @@ -147,6 +139,35 @@ async fn recover_hid_if_needed( }); } +async fn open_voice_with_retry(uac_dev: &str) -> anyhow::Result { + let attempts = std::env::var("LESAVKA_MIC_INIT_ATTEMPTS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(5) + .max(1); + let delay_ms = std::env::var("LESAVKA_MIC_INIT_DELAY_MS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(250); + let mut last_err: Option = None; + for attempt in 1..=attempts { + match audio::Voice::new(uac_dev).await { + Ok(v) => { + if attempt > 1 { + info!(%uac_dev, attempt, "🎤 microphone sink recovered"); + } + return Ok(v); + } + Err(e) => { + warn!(%uac_dev, attempt, "⚠️ microphone sink init failed: {e:#}"); + last_err = Some(e); + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + } + } + } + Err(last_err.unwrap_or_else(|| anyhow::anyhow!("microphone sink init failed"))) +} + /// Pick the UVC gadget video node. /// Priority: 1) `LESAVKA_UVC_DEV` override; 2) first `video_output` node. /// Returns an error when nothing matches instead of guessing a capture card. @@ -267,6 +288,105 @@ struct Handler { ms: Arc>, gadget: UsbGadget, did_cycle: Arc, + camera_rt: Arc, +} + +struct CameraRelaySlot { + cfg: camera::CameraConfig, + relay: Arc, +} + +struct CameraRuntime { + generation: AtomicU64, + slot: Mutex>, +} + +impl CameraRuntime { + fn new() -> Self { + Self { + generation: AtomicU64::new(0), + slot: Mutex::new(None), + } + } + + async fn activate( + &self, + cfg: &camera::CameraConfig, + ) -> Result<(u64, Arc), Status> { + let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1; + let mut slot = self.slot.lock().await; + let mut reused = false; + + let relay = if let Some(existing) = slot.as_ref() { + if camera_cfg_eq(&existing.cfg, cfg) { + reused = true; + existing.relay.clone() + } else { + self.make_relay(cfg)? + } + } else { + self.make_relay(cfg)? + }; + + if !reused { + *slot = Some(CameraRelaySlot { + cfg: cfg.clone(), + relay: relay.clone(), + }); + info!( + session_id, + output = cfg.output.as_str(), + codec = cfg.codec.as_str(), + width = cfg.width, + height = cfg.height, + fps = cfg.fps, + "🎥 camera relay (re)created" + ); + } else { + info!(session_id, "🎥 camera relay reused"); + } + + Ok((session_id, relay)) + } + + fn is_active(&self, session_id: u64) -> bool { + self.generation.load(Ordering::Relaxed) == session_id + } + + fn make_relay(&self, cfg: &camera::CameraConfig) -> Result, Status> { + let relay = match cfg.output { + camera::CameraOutput::Uvc => { + if std::env::var("LESAVKA_DISABLE_UVC").is_ok() { + return Err(Status::failed_precondition( + "UVC output disabled (LESAVKA_DISABLE_UVC set)", + )); + } + let uvc = pick_uvc_device().map_err(|e| Status::internal(format!("{e:#}")))?; + info!(%uvc, "🎥 stream_camera using UVC sink"); + video::CameraRelay::new_uvc(0, &uvc, cfg) + .map_err(|e| Status::internal(format!("{e:#}")))? + } + camera::CameraOutput::Hdmi => video::CameraRelay::new_hdmi(0, cfg) + .map_err(|e| Status::internal(format!("{e:#}")))?, + }; + Ok(Arc::new(relay)) + } +} + +fn camera_cfg_eq(a: &camera::CameraConfig, b: &camera::CameraConfig) -> bool { + if a.output != b.output + || a.codec != b.codec + || a.width != b.width + || a.height != b.height + || a.fps != b.fps + { + return false; + } + match (&a.hdmi, &b.hdmi) { + (Some(ha), Some(hb)) => ha.name == hb.name && ha.id == hb.id, + (None, None) => true, + _ => false, + } } impl Handler { @@ -290,6 +410,7 @@ impl Handler { ms: Arc::new(Mutex::new(ms)), gadget, did_cycle: Arc::new(AtomicBool::new(false)), + camera_rt: Arc::new(CameraRuntime::new()), }) } @@ -384,7 +505,7 @@ impl Relay for Handler { // 1 ─ build once, early let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); info!(%uac_dev, "🎤 stream_microphone using UAC sink"); - let mut sink = audio::Voice::new(&uac_dev) + let mut sink = open_voice_with_retry(&uac_dev) .await .map_err(|e| Status::internal(format!("{e:#}")))?; @@ -426,21 +547,8 @@ impl Relay for Handler { "🎥 stream_camera output selected" ); - let relay = match cfg.output { - camera::CameraOutput::Uvc => { - if std::env::var("LESAVKA_DISABLE_UVC").is_ok() { - return Err(Status::failed_precondition( - "UVC output disabled (LESAVKA_DISABLE_UVC set)", - )); - } - let uvc = pick_uvc_device().map_err(|e| Status::internal(format!("{e:#}")))?; - info!(%uvc, "🎥 stream_camera using UVC sink"); - video::CameraRelay::new_uvc(0, &uvc, &cfg) - .map_err(|e| Status::internal(format!("{e:#}")))? - } - camera::CameraOutput::Hdmi => video::CameraRelay::new_hdmi(0, &cfg) - .map_err(|e| Status::internal(format!("{e:#}")))?, - }; + let (session_id, relay) = self.camera_rt.activate(&cfg).await?; + let camera_rt = self.camera_rt.clone(); // dummy outbound (same pattern as other streams) let (tx, rx) = tokio::sync::mpsc::channel(1); @@ -448,6 +556,10 @@ impl Relay for Handler { tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { + if !camera_rt.is_active(session_id) { + info!(session_id, "🎥 stream_camera session superseded"); + break; + } relay.feed(pkt); // ← all logging inside video.rs } tx.send(Ok(Empty {})).await.ok(); @@ -461,14 +573,15 @@ impl Relay for Handler { &self, req: Request, ) -> Result, Status> { - let id = req.into_inner().id; + let req = req.into_inner(); + let id = req.id; let dev = match id { 0 => "/dev/lesavka_l_eye", 1 => "/dev/lesavka_r_eye", _ => return Err(Status::invalid_argument("monitor id must be 0 or 1")), }; debug!("🎥 streaming {dev}"); - let s = video::eye_ball(dev, id, 6_000) + let s = video::eye_ball(dev, id, req.max_bitrate) .await .map_err(|e| Status::internal(format!("{e:#}")))?; Ok(Response::new(Box::pin(s))) diff --git a/server/src/video.rs b/server/src/video.rs index 894d777..e9c821c 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -9,7 +9,7 @@ use gstreamer as gst; use gstreamer_app as gst_app; use lesavka_common::lesavka::VideoPacket; use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::{Level, debug, enabled, error, info, trace, warn}; @@ -92,16 +92,35 @@ impl Drop for VideoStream { } } -pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Result { +pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Result { let eye = EYE_ID[id as usize]; gst::init().context("gst init")?; - let target_fps = env_u32("LESAVKA_EYE_FPS", 25); - let frame_interval_us = if target_fps == 0 { - 0 - } else { - (1_000_000 / target_fps) as u64 + let bitrate_default_fps = match max_bitrate_kbit { + 0 => 25, + 1..=2_500 => 15, + 2_501..=4_000 => 20, + _ => 25, }; + let target_fps = env_u32("LESAVKA_EYE_FPS", bitrate_default_fps).max(1); + let min_fps = env_u32("LESAVKA_EYE_MIN_FPS", 12).clamp(1, target_fps); + let adaptive = std::env::var("LESAVKA_EYE_ADAPTIVE") + .map(|v| v != "0") + .unwrap_or(true); + info!( + target: "lesavka_server::video", + eye = %eye, + max_bitrate_kbit, + target_fps, + min_fps, + adaptive, + "🎥 eye stream profile selected" + ); + let effective_fps = Arc::new(std::sync::atomic::AtomicU32::new(target_fps)); + let dropped_window = Arc::new(AtomicU64::new(0)); + let sent_window = Arc::new(AtomicU64::new(0)); + let last_adjust_sec = Arc::new(AtomicU64::new(0)); + let wait_for_idr = Arc::new(AtomicBool::new(false)); let last_sent = Arc::new(AtomicU64::new(0)); let desc = format!( @@ -190,6 +209,12 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res }); let last_sent_cloned = last_sent.clone(); + let effective_fps_cloned = effective_fps.clone(); + let dropped_window_cloned = dropped_window.clone(); + let sent_window_cloned = sent_window.clone(); + let last_adjust_sec_cloned = last_adjust_sec.clone(); + let wait_for_idr_cloned = wait_for_idr.clone(); + let eye_name = eye.to_string(); sink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample(move |sink| { @@ -200,10 +225,12 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res /* -------- map once, reuse ----- */ let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + let is_idr = contains_idr(map.as_slice()); + /* -------- basic counters ------ */ static FRAME: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if n % 120 == 0 && contains_idr(map.as_slice()) { + if n % 120 == 0 && is_idr { trace!(target: "lesavka_server::video", "eye-{eye}: delivered {n} frames"); if enabled!(Level::TRACE) { let path = format!("/tmp/eye-{eye}-srv-{:05}.h264", n); @@ -216,14 +243,8 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res } /* -------- detect SPS / IDR ---- */ - if enabled!(Level::DEBUG) { - if let Some(&nal) = map.as_slice().get(4) { - if (nal & 0x1F) == 0x05 - /* IDR */ - { - debug!("eye-{eye}: IDR"); - } - } + if enabled!(Level::DEBUG) && is_idr { + debug!("eye-{eye}: IDR"); } /* -------- timestamps ---------- */ @@ -235,6 +256,46 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res .nseconds() / 1_000; + if adaptive { + let sec = pts_us / 1_000_000; + let prev = last_adjust_sec_cloned.load(Ordering::Relaxed); + if sec > prev + && last_adjust_sec_cloned + .compare_exchange(prev, sec, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + let dropped = dropped_window_cloned.swap(0, Ordering::Relaxed); + let sent = sent_window_cloned.swap(0, Ordering::Relaxed); + let total = dropped + sent; + if total > 0 { + let drop_ratio = dropped as f64 / total as f64; + let mut fps = effective_fps_cloned.load(Ordering::Relaxed).max(1); + if drop_ratio > 0.20 && fps > min_fps { + fps = fps.saturating_sub(2).max(min_fps); + effective_fps_cloned.store(fps, Ordering::Relaxed); + warn!( + target: "lesavka_server::video", + eye = %eye_name, + fps, + drop_ratio = %format_args!("{drop_ratio:.2}"), + "🎥 adaptive eye fps ↓" + ); + } else if dropped == 0 && drop_ratio < 0.02 && fps < target_fps { + fps = (fps + 1).min(target_fps); + effective_fps_cloned.store(fps, Ordering::Relaxed); + info!( + target: "lesavka_server::video", + eye = %eye_name, + fps, + "🎥 adaptive eye fps ↑" + ); + } + } + } + } + + let cur_fps = effective_fps_cloned.load(Ordering::Relaxed).max(1); + let frame_interval_us = 1_000_000u64 / cur_fps as u64; if frame_interval_us > 0 { let last = last_sent_cloned.load(Ordering::Relaxed); if last != 0 && pts_us.saturating_sub(last) < frame_interval_us { @@ -243,6 +304,10 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res last_sent_cloned.store(pts_us, Ordering::Relaxed); } + if wait_for_idr_cloned.load(Ordering::Relaxed) && !is_idr { + return Ok(gst::FlowSuccess::Ok); + } + /* -------- ship over gRPC ----- */ let data = map.as_slice().to_vec(); let size = data.len(); @@ -253,12 +318,18 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res }; match tx.try_send(Ok(pkt)) { Ok(_) => { + sent_window_cloned.fetch_add(1, Ordering::Relaxed); + if is_idr { + wait_for_idr_cloned.store(false, Ordering::Relaxed); + } trace!(target:"lesavka_server::video", eye = %eye, size = size, "🎥📤 sent"); } Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + dropped_window_cloned.fetch_add(1, Ordering::Relaxed); + wait_for_idr_cloned.store(true, Ordering::Relaxed); static DROP_CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let c = DROP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);