stabilize camera streaming and add adaptive monitor controls

This commit is contained in:
Brad Stein 2026-04-08 22:23:40 -03:00
parent eaa03924ed
commit b001aa8dd3
7 changed files with 344 additions and 67 deletions

View File

@ -338,6 +338,10 @@ impl LesavkaClientApp {
/*──────────────── monitor stream ────────────────*/
async fn video_loop(ep: Channel, tx: tokio::sync::mpsc::UnboundedSender<VideoPacket>) {
let max_bitrate = std::env::var("LESAVKA_VIDEO_MAX_KBIT")
.ok()
.and_then(|v| v.parse::<u32>().ok())
.unwrap_or(6_000);
for monitor_id in 0..=1 {
let ep = ep.clone();
let tx = tx.clone();
@ -346,7 +350,7 @@ impl LesavkaClientApp {
let mut cli = RelayClient::new(ep.clone());
let req = MonitorRequest {
id: monitor_id,
max_bitrate: 6_000,
max_bitrate,
};
match cli.capture_video(Request::new(req)).await {
Ok(mut stream) => {
@ -449,29 +453,40 @@ impl LesavkaClientApp {
loop {
let mut cli = RelayClient::new(ep.clone());
let (tx, rx) = tokio::sync::mpsc::channel::<VideoPacket>(256);
std::thread::spawn({
let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
let cam_worker = std::thread::spawn({
let cam = cam.clone();
move || {
while let Some(pkt) = cam.pull() {
// TRACE every 120 frames only
static CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 120 == 0 {
tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
}
tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len());
let _ = tx.try_send(pkt);
move || loop {
if stop_rx.try_recv().is_ok() {
break;
}
let Some(pkt) = cam.pull() else {
std::thread::sleep(Duration::from_millis(5));
continue;
};
// TRACE every 120 frames only
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 120 == 0 {
tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
}
tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len());
if tx.blocking_send(pkt).is_err() {
break;
}
}
});
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
match cli.stream_camera(Request::new(outbound)).await {
Ok(_) => delay = Duration::from_secs(1), // got a stream → reset
Ok(mut resp) => {
delay = Duration::from_secs(1); // got a stream → reset
while resp.get_mut().message().await.transpose().is_some() {}
}
Err(e) if e.code() == tonic::Code::Unimplemented => {
tracing::warn!("📸 server does not support StreamCamera giving up");
let _ = stop_tx.send(());
let _ = cam_worker.join();
return; // stop the task completely (#3)
}
Err(e) => {
@ -479,6 +494,8 @@ impl LesavkaClientApp {
delay = next_delay(delay); // back-off (#2)
}
}
let _ = stop_tx.send(());
let _ = cam_worker.join();
tokio::time::sleep(delay).await;
}
}

View File

@ -90,6 +90,14 @@ impl CameraCapture {
if use_mjpg_source && !output_mjpeg {
tracing::info!("📸 using MJPG source with software encode");
}
let _enc_opts = if enc == "x264enc" {
let bitrate_kbit = env_u32("LESAVKA_CAM_H264_KBIT", 2500);
format!(
"{enc} tune=zerolatency speed-preset=veryfast bitrate={bitrate_kbit} {kf_prop}={kf_val}"
)
} else {
format!("{enc} {kf_prop}={kf_val}")
};
if output_mjpeg {
tracing::info!("📸 outputting MJPEG frames for UVC (quality={jpeg_quality})");
} else {
@ -161,7 +169,7 @@ impl CameraCapture {
"{src_desc} ! \
image/jpeg,width={width},height={height} ! \
jpegdec ! videorate ! video/x-raw,framerate={fps}/1 ! \
videoconvert ! {enc} {kf_prop}={kf_val} ! \
videoconvert ! {_enc_opts} ! \
h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \
queue max-size-buffers=30 leaky=downstream ! \
appsink name=asink emit-signals=true max-buffers=60 drop=true"
@ -169,7 +177,7 @@ impl CameraCapture {
} else {
format!(
"{src_desc} ! {src_caps} ! \
{preenc} {enc} {kf_prop}={kf_val} ! \
{preenc} {_enc_opts} ! \
h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \
queue max-size-buffers=30 leaky=downstream ! \
appsink name=asink emit-signals=true max-buffers=60 drop=true"

View File

@ -6,7 +6,11 @@ ORIG_USER=${SUDO_USER:-$(id -un)}
SCRIPT_DIR=$(cd -- "$(dirname "${BASH_SOURCE[0]}")" && pwd)
REPO_ROOT=$(git -C "$SCRIPT_DIR/.." rev-parse --show-toplevel 2>/dev/null || true)
# 1. packages (Arch)
log() {
printf '==> %s\n' "$*"
}
log "1. Installing base packages"
sudo pacman -Syq --needed --noconfirm \
git rustup protobuf gcc clang evtest base-devel \
gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly gst-libav \
@ -24,18 +28,24 @@ ensure_yay() {
cd yay && makepkg -si --noconfirm'
}
# 1b. grpcurl (prefer repo package; fallback to AUR if needed)
log "1b. Installing grpcurl"
if sudo pacman -Si grpcurl >/dev/null 2>&1; then
sudo pacman -Syq --needed --noconfirm grpcurl
else
ensure_yay
sudo -u "$ORIG_USER" yay -S --needed --noconfirm grpcurl-bin
if ! sudo -u "$ORIG_USER" yay -S --needed --noconfirm grpcurl-bin; then
log "grpcurl AUR install failed once, rebuilding yay and retrying"
ensure_yay
sudo -u "$ORIG_USER" yay -S --needed --noconfirm grpcurl-bin
fi
fi
# 1c. input access
log "1c. Ensuring input group access for $ORIG_USER"
sudo usermod -aG input "$ORIG_USER"
# 2. Rust tool-chain for both root & user
log "2. Ensuring Rust toolchain"
sudo rustup default stable
sudo -u "$ORIG_USER" rustup default stable
@ -55,9 +65,11 @@ else
fi
# 4. build
log "4. Building client release binary"
sudo -u "$ORIG_USER" bash -c "cd '$SRC/client' && cargo clean && cargo build --release"
# 5. install binary
log "5. Installing /usr/local/bin/lesavka-client"
sudo install -Dm755 "$SRC/client/target/release/lesavka-client" /usr/local/bin/lesavka-client
# 6. systemd service for system scope: /etc/systemd/system/lesavka-client.service
@ -84,9 +96,17 @@ WantedBy=default.target
EOF
# 7. Call the *user* instance inside the callers session
log "7. Reloading/starting service"
sudo systemctl daemon-reload
sudo systemctl enable --now lesavka-client.service
sudo systemctl restart lesavka-client || true
echo "✅ lesavka-client installed to /usr/local/bin/lesavka-client"
echo "➡️ Run: /usr/local/bin/lesavka-client (set LESAVKA_SERVER_ADDR as needed)"
echo
echo "✅ lesavka-client install complete"
echo " Binary: /usr/local/bin/lesavka-client"
echo " Build source: $SRC/client/target/release/lesavka-client"
echo " Service: systemctl status lesavka-client --no-pager"
echo
echo "Fish quick start:"
echo " set -gx LESAVKA_SERVER_ADDR http://<server-ip>:50051"
echo " /usr/local/bin/lesavka-client"

View File

@ -151,7 +151,7 @@ fi
echo "==> 4b. Kernel upgrade (optional)"
if [[ "${LESAVKA_KERNEL_UPDATE:-1}" != "0" ]]; then
sudo LESAVKA_KERNEL_BUILD_USER="$ORIG_USER" "$SRC_DIR/scripts/kernel/build-linux-rpi.sh"
sudo LESAVKA_KERNEL_BUILD_USER="$ORIG_USER" bash "$SRC_DIR/scripts/kernel/build-linux-rpi.sh"
else
echo "⚠️ skipping kernel upgrade (LESAVKA_KERNEL_UPDATE=0)"
fi
@ -208,6 +208,10 @@ Environment=RUST_BACKTRACE=1
Environment=GST_DEBUG="*:2,alsasink:6,alsasrc:6"
Environment=LESAVKA_UVC_CODEC=mjpeg
Environment=LESAVKA_UVC_EXTERNAL=1
Environment=LESAVKA_EYE_ADAPTIVE=1
Environment=LESAVKA_EYE_MIN_FPS=12
Environment=LESAVKA_MIC_INIT_ATTEMPTS=5
Environment=LESAVKA_MIC_INIT_DELAY_MS=250
Restart=always
RestartSec=5
StandardError=append:/tmp/lesavka-server.stderr
@ -313,3 +317,5 @@ fi
sudo systemctl restart lesavka-server
echo "✅ lesavka-server installed and restarted..."
echo "➡️ Status: sudo systemctl status lesavka-server --no-pager"
echo "➡️ Logs: sudo journalctl -u lesavka-server -f --no-pager"

42
scripts/manual/soak-report.sh Executable file
View File

@ -0,0 +1,42 @@
#!/usr/bin/env bash
# scripts/manual/soak-report.sh - summarize server stability/quality counters for a time window
set -euo pipefail
SERVER_HOST=${LESAVKA_SERVER_HOST:-theia}
SINCE=${1:-"30 minutes ago"}
UNTIL=${2:-"now"}
tmp=$(mktemp)
trap 'rm -f "$tmp"' EXIT
ssh "$SERVER_HOST" \
"journalctl -u lesavka-server --since '$SINCE' --until '$UNTIL' --no-pager -o short-iso" \
>"$tmp"
count() {
local pattern=$1
grep -E -c "$pattern" "$tmp" || true
}
echo "Lesavka Soak Report"
echo " host: $SERVER_HOST"
echo " since: $SINCE"
echo " until: $UNTIL"
echo
echo "Counters"
echo " stream_camera selections: $(count 'stream_camera output selected')"
echo " camera relay recreated: $(count 'camera relay (re)created')"
echo " camera relay reused: $(count 'camera relay reused')"
echo " camera superseded events: $(count 'session superseded')"
echo " webcam frames logged: $(count 'srv webcam frame')"
echo " monitor channel drops: $(count 'channel full - dropping frames')"
echo " adaptive fps down events: $(count 'adaptive eye fps ↓')"
echo " adaptive fps up events: $(count 'adaptive eye fps ↑')"
echo " HID EAGAIN drops: $(count 'Resource temporarily unavailable (os error 11)')"
echo " webcam appsrc push fail: $(count 'appsrc push failed')"
echo " HDMI appsrc push fail: $(count 'HDMI appsrc push failed')"
echo " server abort/crash signs: $(count 'Bail out!|status=6/ABRT|Main process exited|core-dump')"
echo
echo "Recent critical lines"
grep -E 'Bail out!|status=6/ABRT|Main process exited|core-dump|stream_camera output selected|adaptive eye fps|channel full - dropping frames' "$tmp" | tail -n 60 || true

View File

@ -4,10 +4,9 @@
use anyhow::Context as _;
use futures_util::{Stream, StreamExt};
use gstreamer as gst;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use std::{backtrace::Backtrace, panic, pin::Pin, sync::Arc};
use tokio::{fs::OpenOptions, io::AsyncWriteExt, process::Command, sync::Mutex};
use tokio_stream::wrappers::ReceiverStream;
@ -82,13 +81,6 @@ async fn open_with_retry(path: &str) -> anyhow::Result<tokio::fs::File> {
Err(anyhow::anyhow!("timeout waiting for {path}"))
}
fn next_minute() -> SystemTime {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let secs = now.as_secs();
let next = (secs / 60 + 1) * 60;
UNIX_EPOCH + Duration::from_secs(next)
}
fn allow_gadget_cycle() -> bool {
std::env::var("LESAVKA_ALLOW_GADGET_CYCLE").is_ok()
}
@ -147,6 +139,35 @@ async fn recover_hid_if_needed(
});
}
async fn open_voice_with_retry(uac_dev: &str) -> anyhow::Result<audio::Voice> {
let attempts = std::env::var("LESAVKA_MIC_INIT_ATTEMPTS")
.ok()
.and_then(|v| v.parse::<u32>().ok())
.unwrap_or(5)
.max(1);
let delay_ms = std::env::var("LESAVKA_MIC_INIT_DELAY_MS")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(250);
let mut last_err: Option<anyhow::Error> = None;
for attempt in 1..=attempts {
match audio::Voice::new(uac_dev).await {
Ok(v) => {
if attempt > 1 {
info!(%uac_dev, attempt, "🎤 microphone sink recovered");
}
return Ok(v);
}
Err(e) => {
warn!(%uac_dev, attempt, "⚠️ microphone sink init failed: {e:#}");
last_err = Some(e);
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
}
}
}
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("microphone sink init failed")))
}
/// Pick the UVC gadget video node.
/// Priority: 1) `LESAVKA_UVC_DEV` override; 2) first `video_output` node.
/// Returns an error when nothing matches instead of guessing a capture card.
@ -267,6 +288,105 @@ struct Handler {
ms: Arc<Mutex<tokio::fs::File>>,
gadget: UsbGadget,
did_cycle: Arc<AtomicBool>,
camera_rt: Arc<CameraRuntime>,
}
struct CameraRelaySlot {
cfg: camera::CameraConfig,
relay: Arc<video::CameraRelay>,
}
struct CameraRuntime {
generation: AtomicU64,
slot: Mutex<Option<CameraRelaySlot>>,
}
impl CameraRuntime {
fn new() -> Self {
Self {
generation: AtomicU64::new(0),
slot: Mutex::new(None),
}
}
async fn activate(
&self,
cfg: &camera::CameraConfig,
) -> Result<(u64, Arc<video::CameraRelay>), Status> {
let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1;
let mut slot = self.slot.lock().await;
let mut reused = false;
let relay = if let Some(existing) = slot.as_ref() {
if camera_cfg_eq(&existing.cfg, cfg) {
reused = true;
existing.relay.clone()
} else {
self.make_relay(cfg)?
}
} else {
self.make_relay(cfg)?
};
if !reused {
*slot = Some(CameraRelaySlot {
cfg: cfg.clone(),
relay: relay.clone(),
});
info!(
session_id,
output = cfg.output.as_str(),
codec = cfg.codec.as_str(),
width = cfg.width,
height = cfg.height,
fps = cfg.fps,
"🎥 camera relay (re)created"
);
} else {
info!(session_id, "🎥 camera relay reused");
}
Ok((session_id, relay))
}
fn is_active(&self, session_id: u64) -> bool {
self.generation.load(Ordering::Relaxed) == session_id
}
fn make_relay(&self, cfg: &camera::CameraConfig) -> Result<Arc<video::CameraRelay>, Status> {
let relay = match cfg.output {
camera::CameraOutput::Uvc => {
if std::env::var("LESAVKA_DISABLE_UVC").is_ok() {
return Err(Status::failed_precondition(
"UVC output disabled (LESAVKA_DISABLE_UVC set)",
));
}
let uvc = pick_uvc_device().map_err(|e| Status::internal(format!("{e:#}")))?;
info!(%uvc, "🎥 stream_camera using UVC sink");
video::CameraRelay::new_uvc(0, &uvc, cfg)
.map_err(|e| Status::internal(format!("{e:#}")))?
}
camera::CameraOutput::Hdmi => video::CameraRelay::new_hdmi(0, cfg)
.map_err(|e| Status::internal(format!("{e:#}")))?,
};
Ok(Arc::new(relay))
}
}
fn camera_cfg_eq(a: &camera::CameraConfig, b: &camera::CameraConfig) -> bool {
if a.output != b.output
|| a.codec != b.codec
|| a.width != b.width
|| a.height != b.height
|| a.fps != b.fps
{
return false;
}
match (&a.hdmi, &b.hdmi) {
(Some(ha), Some(hb)) => ha.name == hb.name && ha.id == hb.id,
(None, None) => true,
_ => false,
}
}
impl Handler {
@ -290,6 +410,7 @@ impl Handler {
ms: Arc::new(Mutex::new(ms)),
gadget,
did_cycle: Arc::new(AtomicBool::new(false)),
camera_rt: Arc::new(CameraRuntime::new()),
})
}
@ -384,7 +505,7 @@ impl Relay for Handler {
// 1 ─ build once, early
let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
info!(%uac_dev, "🎤 stream_microphone using UAC sink");
let mut sink = audio::Voice::new(&uac_dev)
let mut sink = open_voice_with_retry(&uac_dev)
.await
.map_err(|e| Status::internal(format!("{e:#}")))?;
@ -426,21 +547,8 @@ impl Relay for Handler {
"🎥 stream_camera output selected"
);
let relay = match cfg.output {
camera::CameraOutput::Uvc => {
if std::env::var("LESAVKA_DISABLE_UVC").is_ok() {
return Err(Status::failed_precondition(
"UVC output disabled (LESAVKA_DISABLE_UVC set)",
));
}
let uvc = pick_uvc_device().map_err(|e| Status::internal(format!("{e:#}")))?;
info!(%uvc, "🎥 stream_camera using UVC sink");
video::CameraRelay::new_uvc(0, &uvc, &cfg)
.map_err(|e| Status::internal(format!("{e:#}")))?
}
camera::CameraOutput::Hdmi => video::CameraRelay::new_hdmi(0, &cfg)
.map_err(|e| Status::internal(format!("{e:#}")))?,
};
let (session_id, relay) = self.camera_rt.activate(&cfg).await?;
let camera_rt = self.camera_rt.clone();
// dummy outbound (same pattern as other streams)
let (tx, rx) = tokio::sync::mpsc::channel(1);
@ -448,6 +556,10 @@ impl Relay for Handler {
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
if !camera_rt.is_active(session_id) {
info!(session_id, "🎥 stream_camera session superseded");
break;
}
relay.feed(pkt); // ← all logging inside video.rs
}
tx.send(Ok(Empty {})).await.ok();
@ -461,14 +573,15 @@ impl Relay for Handler {
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureVideoStream>, Status> {
let id = req.into_inner().id;
let req = req.into_inner();
let id = req.id;
let dev = match id {
0 => "/dev/lesavka_l_eye",
1 => "/dev/lesavka_r_eye",
_ => return Err(Status::invalid_argument("monitor id must be 0 or 1")),
};
debug!("🎥 streaming {dev}");
let s = video::eye_ball(dev, id, 6_000)
let s = video::eye_ball(dev, id, req.max_bitrate)
.await
.map_err(|e| Status::internal(format!("{e:#}")))?;
Ok(Response::new(Box::pin(s)))

View File

@ -9,7 +9,7 @@ use gstreamer as gst;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::VideoPacket;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{Level, debug, enabled, error, info, trace, warn};
@ -92,16 +92,35 @@ impl Drop for VideoStream {
}
}
pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Result<VideoStream> {
pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Result<VideoStream> {
let eye = EYE_ID[id as usize];
gst::init().context("gst init")?;
let target_fps = env_u32("LESAVKA_EYE_FPS", 25);
let frame_interval_us = if target_fps == 0 {
0
} else {
(1_000_000 / target_fps) as u64
let bitrate_default_fps = match max_bitrate_kbit {
0 => 25,
1..=2_500 => 15,
2_501..=4_000 => 20,
_ => 25,
};
let target_fps = env_u32("LESAVKA_EYE_FPS", bitrate_default_fps).max(1);
let min_fps = env_u32("LESAVKA_EYE_MIN_FPS", 12).clamp(1, target_fps);
let adaptive = std::env::var("LESAVKA_EYE_ADAPTIVE")
.map(|v| v != "0")
.unwrap_or(true);
info!(
target: "lesavka_server::video",
eye = %eye,
max_bitrate_kbit,
target_fps,
min_fps,
adaptive,
"🎥 eye stream profile selected"
);
let effective_fps = Arc::new(std::sync::atomic::AtomicU32::new(target_fps));
let dropped_window = Arc::new(AtomicU64::new(0));
let sent_window = Arc::new(AtomicU64::new(0));
let last_adjust_sec = Arc::new(AtomicU64::new(0));
let wait_for_idr = Arc::new(AtomicBool::new(false));
let last_sent = Arc::new(AtomicU64::new(0));
let desc = format!(
@ -190,6 +209,12 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res
});
let last_sent_cloned = last_sent.clone();
let effective_fps_cloned = effective_fps.clone();
let dropped_window_cloned = dropped_window.clone();
let sent_window_cloned = sent_window.clone();
let last_adjust_sec_cloned = last_adjust_sec.clone();
let wait_for_idr_cloned = wait_for_idr.clone();
let eye_name = eye.to_string();
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
@ -200,10 +225,12 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res
/* -------- map once, reuse ----- */
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
let is_idr = contains_idr(map.as_slice());
/* -------- basic counters ------ */
static FRAME: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 120 == 0 && contains_idr(map.as_slice()) {
if n % 120 == 0 && is_idr {
trace!(target: "lesavka_server::video", "eye-{eye}: delivered {n} frames");
if enabled!(Level::TRACE) {
let path = format!("/tmp/eye-{eye}-srv-{:05}.h264", n);
@ -216,14 +243,8 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res
}
/* -------- detect SPS / IDR ---- */
if enabled!(Level::DEBUG) {
if let Some(&nal) = map.as_slice().get(4) {
if (nal & 0x1F) == 0x05
/* IDR */
{
debug!("eye-{eye}: IDR");
}
}
if enabled!(Level::DEBUG) && is_idr {
debug!("eye-{eye}: IDR");
}
/* -------- timestamps ---------- */
@ -235,6 +256,46 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res
.nseconds()
/ 1_000;
if adaptive {
let sec = pts_us / 1_000_000;
let prev = last_adjust_sec_cloned.load(Ordering::Relaxed);
if sec > prev
&& last_adjust_sec_cloned
.compare_exchange(prev, sec, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
let dropped = dropped_window_cloned.swap(0, Ordering::Relaxed);
let sent = sent_window_cloned.swap(0, Ordering::Relaxed);
let total = dropped + sent;
if total > 0 {
let drop_ratio = dropped as f64 / total as f64;
let mut fps = effective_fps_cloned.load(Ordering::Relaxed).max(1);
if drop_ratio > 0.20 && fps > min_fps {
fps = fps.saturating_sub(2).max(min_fps);
effective_fps_cloned.store(fps, Ordering::Relaxed);
warn!(
target: "lesavka_server::video",
eye = %eye_name,
fps,
drop_ratio = %format_args!("{drop_ratio:.2}"),
"🎥 adaptive eye fps ↓"
);
} else if dropped == 0 && drop_ratio < 0.02 && fps < target_fps {
fps = (fps + 1).min(target_fps);
effective_fps_cloned.store(fps, Ordering::Relaxed);
info!(
target: "lesavka_server::video",
eye = %eye_name,
fps,
"🎥 adaptive eye fps ↑"
);
}
}
}
}
let cur_fps = effective_fps_cloned.load(Ordering::Relaxed).max(1);
let frame_interval_us = 1_000_000u64 / cur_fps as u64;
if frame_interval_us > 0 {
let last = last_sent_cloned.load(Ordering::Relaxed);
if last != 0 && pts_us.saturating_sub(last) < frame_interval_us {
@ -243,6 +304,10 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res
last_sent_cloned.store(pts_us, Ordering::Relaxed);
}
if wait_for_idr_cloned.load(Ordering::Relaxed) && !is_idr {
return Ok(gst::FlowSuccess::Ok);
}
/* -------- ship over gRPC ----- */
let data = map.as_slice().to_vec();
let size = data.len();
@ -253,12 +318,18 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res
};
match tx.try_send(Ok(pkt)) {
Ok(_) => {
sent_window_cloned.fetch_add(1, Ordering::Relaxed);
if is_idr {
wait_for_idr_cloned.store(false, Ordering::Relaxed);
}
trace!(target:"lesavka_server::video",
eye = %eye,
size = size,
"🎥📤 sent");
}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
dropped_window_cloned.fetch_add(1, Ordering::Relaxed);
wait_for_idr_cloned.store(true, Ordering::Relaxed);
static DROP_CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let c = DROP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);