#![forbid(unsafe_code)] use anyhow::{Result, bail}; use gstreamer as gst; use std::sync::OnceLock; use std::sync::atomic::{AtomicU64, Ordering}; static DEV_MODE: OnceLock = OnceLock::new(); pub const SOFTWARE_VIDEO_FALLBACK_ENV: &str = "LESAVKA_ALLOW_SOFTWARE_VIDEO"; /// Read an unsigned integer environment variable with a default. /// /// Inputs: the env var name plus the fallback value. /// Outputs: the parsed value when present and valid, or the fallback. /// Why: video tuning knobs are operator-controlled and should never panic the /// server when a typo slips into the environment. #[must_use] pub fn env_u32(name: &str, default: u32) -> u32 { std::env::var(name) .ok() .and_then(|value| value.parse::().ok()) .unwrap_or(default) } /// Read a `usize` environment variable with a default. /// /// Inputs: the env var name plus the fallback value. /// Outputs: the parsed value when present and valid, or the fallback. /// Why: queue and channel capacities use `usize`, but should otherwise follow /// the same forgiving behavior as the numeric video tuning env vars. #[must_use] pub fn env_usize(name: &str, default: usize) -> usize { std::env::var(name) .ok() .and_then(|value| value.parse::().ok()) .unwrap_or(default) } /// Check whether development-mode video dumps are enabled. /// /// Inputs: none. /// Outputs: `true` once the process observes `LESAVKA_DEV_MODE`. /// Why: the value is cached because the callback hot path checks it on every /// frame when deciding whether to dump debug samples. #[must_use] pub fn dev_mode_enabled() -> bool { *DEV_MODE.get_or_init(|| std::env::var("LESAVKA_DEV_MODE").is_ok()) } /// Return whether software video decode/encode fallback is explicitly allowed. /// /// Inputs: `LESAVKA_ALLOW_SOFTWARE_VIDEO`. /// Outputs: `true` only for intentional opt-in values. /// Why: production Lesavka should fail loudly when hardware video acceleration /// is missing instead of silently wedging a host with CPU decode. #[must_use] pub fn software_video_fallback_allowed() -> bool { std::env::var(SOFTWARE_VIDEO_FALLBACK_ENV) .ok() .is_some_and(|value| { let trimmed = value.trim(); !(trimmed.is_empty() || trimmed.eq_ignore_ascii_case("0") || trimmed.eq_ignore_ascii_case("false") || trimmed.eq_ignore_ascii_case("no") || trimmed.eq_ignore_ascii_case("off")) }) } #[must_use] pub fn is_hardware_h264_decoder(name: &str) -> bool { matches!( name, "v4l2h264dec" | "v4l2slh264dec" | "omxh264dec" | "vulkanh264dec" ) } #[must_use] pub fn is_hardware_hevc_decoder(name: &str) -> bool { matches!( name, "v4l2slh265dec" | "v4l2h265dec" | "vulkanh265dec" | "nvh265dec" | "nvh265sldec" ) } fn buildable_decoder(name: &str) -> bool { gst::ElementFactory::find(name).is_some() && gst::ElementFactory::make(name).build().is_ok() } fn env_override_decoder(env_name: &str) -> Option { std::env::var(env_name) .ok() .map(|name| name.trim().to_string()) .filter(|name| !name.is_empty()) } /// Pick the first available H.264 decoder in our preference order. /// /// Inputs: none. /// Outputs: the GStreamer element name that should be instantiated. /// Why: different targets expose different hardware decoders, so we probe in a /// stable order. Software decode is allowed only by explicit lab opt-in. #[must_use] pub fn pick_h264_decoder() -> &'static str { require_h264_decoder().unwrap_or("missing-hardware-h264dec") } pub fn require_h264_decoder() -> Result<&'static str> { if let Some(name) = env_override_decoder("LESAVKA_H264_DECODER") { if !buildable_decoder(&name) { bail!("requested H.264 decoder '{name}' is not buildable"); } let leaked = Box::leak(name.into_boxed_str()); if is_hardware_h264_decoder(leaked) || software_video_fallback_allowed() { return Ok(leaked); } bail!( "requested H.264 decoder '{leaked}' is not a hardware decoder; set {SOFTWARE_VIDEO_FALLBACK_ENV}=1 only for lab fallback" ); } for name in [ "v4l2h264dec", "v4l2slh264dec", "omxh264dec", "vulkanh264dec", ] { if buildable_decoder(name) { return Ok(name); } } if software_video_fallback_allowed() { for name in ["avdec_h264", "openh264dec"] { if buildable_decoder(name) { return Ok(name); } } } bail!("hardware H.264 decoder required, but no buildable V4L2/OMX/Vulkan decoder was found") } /// Pick the HEVC decoder that should be used for client-origin H.265 media. /// /// Inputs: optional `LESAVKA_HEVC_DECODER` / `LESAVKA_HEVC_ALLOW_HARDWARE` /// environment overrides plus the local GStreamer registry. /// Outputs: a decoder element name. /// Why: production media must stay on hardware acceleration; software HEVC /// fallback is intentionally opt-in for lab/debug runs only. #[must_use] pub fn pick_hevc_decoder() -> &'static str { require_hevc_decoder().unwrap_or("missing-hardware-hevcdec") } pub fn require_hevc_decoder() -> Result<&'static str> { if let Some(name) = env_override_decoder("LESAVKA_HEVC_DECODER") { if !buildable_decoder(&name) { bail!("requested HEVC decoder '{name}' is not buildable"); } let leaked = Box::leak(name.into_boxed_str()); if is_hardware_hevc_decoder(leaked) || software_video_fallback_allowed() { return Ok(leaked); } bail!( "requested HEVC decoder '{leaked}' is not a hardware decoder; set {SOFTWARE_VIDEO_FALLBACK_ENV}=1 only for lab fallback" ); } for name in ["v4l2slh265dec", "v4l2h265dec", "vulkanh265dec"] { if buildable_decoder(name) { return Ok(name); } } if software_video_fallback_allowed() { for name in ["avdec_h265", "libde265dec"] { if buildable_decoder(name) { return Ok(name); } } } bail!("hardware HEVC decoder required, but no buildable V4L2/Vulkan decoder was found") } /// Choose the default eye-stream FPS for the requested bitrate tier. /// /// Inputs: the negotiated maximum bitrate in kbit/s. /// Outputs: the target FPS before env overrides are applied. /// Why: low bitrates need a lower frame rate to preserve visual quality, while /// higher bitrates can sustain the full target cadence. #[must_use] pub fn default_eye_fps(max_bitrate_kbit: u32) -> u32 { match max_bitrate_kbit { 0 => 25, 1..=2_500 => 15, 2_501..=4_000 => 20, _ => 25, } } /// Detect whether an H.264 access unit contains an IDR NAL. /// /// Inputs: one Annex-B encoded H.264 access unit. /// Outputs: `true` when the access unit carries an IDR frame. /// Why: after dropping frames we wait for the next keyframe so downstream /// decoders do not resume from a broken prediction chain. #[must_use] pub fn contains_idr(h264: &[u8]) -> bool { let mut index = 0; while index + 4 < h264.len() { if h264[index] == 0 && h264[index + 1] == 0 { let offset = if h264[index + 2] == 1 { 3 } else if h264[index + 2] == 0 && h264[index + 3] == 1 { 4 } else { index += 1; continue; }; let nal_index = index + offset; if nal_index < h264.len() && (h264[nal_index] & 0x1F) == 5 { return true; } } index += 1; } false } /// Detect whether an HEVC access unit contains an intra recovery point. /// /// Inputs: one Annex-B encoded HEVC access unit. Output: `true` when the access /// unit carries an IRAP NAL. Why: freshness-first upstream media can drop HEVC /// packets before server decode; after that gap, the decoder must resume from a /// clean picture instead of a predictive frame with missing references. #[must_use] pub fn contains_hevc_irap(hevc: &[u8]) -> bool { let mut index = 0; while index + 4 < hevc.len() { if hevc[index] == 0 && hevc[index + 1] == 0 { let offset = if hevc[index + 2] == 1 { 3 } else if hevc[index + 2] == 0 && hevc[index + 3] == 1 { 4 } else { index += 1; continue; }; let nal_index = index + offset; if nal_index + 1 < hevc.len() { let nal_type = (hevc[nal_index] >> 1) & 0x3f; if (16..=23).contains(&nal_type) { return true; } } } index += 1; } false } /// Compute the next adaptive eye-stream FPS after one reporting window. /// /// Inputs: the current FPS plus the target/min bounds and the sent/dropped /// frame counts collected during the last window. /// Outputs: the adjusted FPS for the next window. /// Why: the callback path keeps only counters; this pure policy function makes /// the adaptation behavior unit-testable and easier to tune. #[must_use] pub fn adjust_effective_fps( current_fps: u32, min_fps: u32, target_fps: u32, dropped: u64, sent: u64, ) -> u32 { let total = dropped + sent; if total == 0 { return current_fps.max(1); } let drop_ratio = dropped as f64 / total as f64; if drop_ratio > 0.10 && current_fps > min_fps { current_fps.saturating_sub(3).max(min_fps) } else if dropped == 0 && drop_ratio < 0.02 && current_fps < target_fps { (current_fps + 1).min(target_fps) } else { current_fps.max(1) } } /// Decide whether a frame should be emitted at the current pacing budget. /// /// Inputs: the previous sent timestamp, the candidate frame timestamp, and the /// current target FPS. /// Outputs: `true` when enough time has elapsed to send another frame. /// Why: rate limiting on timestamps keeps the gRPC stream bounded without /// requiring the callback to inspect wall-clock time. #[must_use] pub fn should_send_frame(last_pts_us: u64, current_pts_us: u64, fps: u32) -> bool { let frame_interval_us = 1_000_000u64 / u64::from(fps.max(1)); if frame_interval_us == 0 || last_pts_us == 0 { return true; } current_pts_us.saturating_sub(last_pts_us) >= frame_interval_us } /// Advance the local monotonic PTS used by sink appsrc instances. /// /// Inputs: the shared counter and the per-frame duration in microseconds. /// Outputs: the next strictly monotonic local timestamp. /// Why: WAN-delivered packet PTS values can arrive out of order, so sink-side /// playback uses a synthetic monotonic timeline instead. #[must_use] pub fn next_local_pts(counter: &AtomicU64, frame_step_us: u64) -> u64 { counter.fetch_add(frame_step_us, Ordering::Relaxed) } /// Reserve a monotonic local timestamp while preferring a caller-provided value. /// /// Inputs: the shared counter, a preferred local timestamp, and the minimum /// step to enforce between consecutive values. /// Outputs: a timestamp that never goes backwards for the current sink. /// Why: upstream media can now arrive with a shared session timeline, so sink /// playback should honor that timing when possible while still guarding against /// repeated values that would destabilize live playback. #[must_use] pub fn reserve_local_pts(counter: &AtomicU64, preferred_pts_us: u64, frame_step_us: u64) -> u64 { let next_allowed_pts_us = counter.load(Ordering::Relaxed); let chosen_pts_us = preferred_pts_us.max(next_allowed_pts_us); counter.store( chosen_pts_us.saturating_add(frame_step_us.max(1)), Ordering::Relaxed, ); chosen_pts_us } #[cfg(test)] #[path = "video_support/tests/mod.rs"] mod tests;