lesavka/server/src/video.rs

897 lines
34 KiB
Rust

// server/src/video.rs
use anyhow::Context;
use futures_util::Stream;
use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::VideoPacket;
use lesavka_common::process_metrics::ProcessCpuSampler;
use std::os::unix::fs::FileTypeExt;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use tokio::time::{Duration, Instant, sleep};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{Level, debug, enabled, error, info, trace, warn};
pub use crate::video_sinks::{CameraRelay, HdmiSink, WebcamSink};
use crate::video_support::{
adjust_effective_fps, contains_idr, default_eye_fps, env_u32, env_usize, should_send_frame,
};
/// Short eye labels; the numeric stream id is used as an index into this table.
const EYE_ID: [&str; 2] = ["l", "r"];
/// PTS origin subtracted from every outgoing buffer timestamp. It is set once from the first
/// buffer seen by any stream in this process and then shared by all of them.
static START: OnceLock<gst::ClockTime> = OnceLock::new();
/// Lazily created process-CPU gauge; see `server_process_cpu_metric` for how it is populated.
static SERVER_PROCESS_CPU_TENTHS: OnceLock<Arc<AtomicU32>> = OnceLock::new();
/// Returns a handle to the shared process-CPU gauge, starting its sampler on first use.
///
/// The first call creates the gauge and spawns a detached thread that stores a fresh
/// `ProcessCpuSampler::sample_tenths_percent` reading into it once per second (readings of
/// `None` leave the previous value in place). Later calls only clone the existing handle.
fn server_process_cpu_metric() -> Arc<AtomicU32> {
    let shared = SERVER_PROCESS_CPU_TENTHS.get_or_init(|| {
        let gauge = Arc::new(AtomicU32::new(0));
        let writer = Arc::clone(&gauge);
        // The sampler thread is never joined; it runs for the rest of the process lifetime.
        std::thread::spawn(move || {
            let mut sampler = ProcessCpuSampler::new();
            loop {
                if let Some(tenths) = sampler.sample_tenths_percent() {
                    writer.store(tenths, Ordering::Relaxed);
                }
                std::thread::sleep(std::time::Duration::from_secs(1));
            }
        });
        gauge
    });
    Arc::clone(shared)
}
/// A packet stream backed by a GStreamer pipeline.
///
/// Polling the stream yields whatever arrives on `inner`; dropping it sets the pipeline to
/// `Null` and releases the bus watcher.
pub struct VideoStream {
/// Pipeline kept alive for as long as the stream exists; set to `Null` on drop.
_pipeline: gst::Pipeline,
/// Bus watcher for the pipeline (absent in coverage builds); released on drop.
#[cfg(not(coverage))]
_bus_watch: Option<BusWatchHandle>,
/// Receiving half of the channel that carries the packets.
inner: ReceiverStream<Result<VideoPacket, Status>>,
}
/// `VideoStream` is a thin stream adapter: every poll is forwarded to the inner receiver.
impl Stream for VideoStream {
    type Item = Result<VideoPacket, Status>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Project to the receiver and re-pin it before delegating.
        let receiver = &mut self.get_mut().inner;
        Stream::poll_next(std::pin::Pin::new(receiver), cx)
    }
}
/// Tears the pipeline down first, then releases the bus watcher (non-coverage builds only).
impl Drop for VideoStream {
    fn drop(&mut self) {
        // Best effort: a failed state change cannot be reported from `drop`.
        self._pipeline.set_state(gst::State::Null).ok();
        #[cfg(not(coverage))]
        {
            // Dropping the handle stops and joins the watcher thread.
            drop(self._bus_watch.take());
        }
    }
}
/// Owner of the background thread that logs messages from a pipeline bus.
#[cfg(not(coverage))]
struct BusWatchHandle {
/// Run flag polled by the watcher thread; cleared when the handle is dropped.
alive: Arc<AtomicBool>,
/// Join handle of the watcher thread; taken and joined when the handle is dropped.
join: Option<std::thread::JoinHandle<()>>,
}
#[cfg(not(coverage))]
impl BusWatchHandle {
    /// Spawns a thread that drains `bus` and logs what it sees, tagged with `eye`.
    ///
    /// The thread polls the bus in 250 ms slices so it can notice the run flag being cleared.
    /// It exits on its own after an error message, an end-of-stream message, or a state
    /// change to `Null`; warnings, infos and a state change to `Playing` are only logged.
    fn spawn(bus: gst::Bus, eye: String) -> Self {
        let alive = Arc::new(AtomicBool::new(true));
        let keep_running = Arc::clone(&alive);
        let watcher = std::thread::spawn(move || {
            loop {
                if !keep_running.load(Ordering::Relaxed) {
                    break;
                }
                let msg = match bus.timed_pop(gst::ClockTime::from_mseconds(250)) {
                    Some(msg) => msg,
                    // Nothing arrived in this slice; re-check the run flag.
                    None => continue,
                };
                match msg.view() {
                    Error(err) => {
                        error!(
                            target:"lesavka_server::video",
                            eye = %eye,
                            "💥 pipeline error: {} ({})",
                            err.error(),
                            err.debug().unwrap_or_default()
                        );
                        break;
                    }
                    Warning(warning) => {
                        warn!(
                            target:"lesavka_server::video",
                            eye = %eye,
                            "⚠️ pipeline warning: {} ({})",
                            warning.error(),
                            warning.debug().unwrap_or_default()
                        );
                    }
                    Info(info_msg) => {
                        info!(
                            target:"lesavka_server::video",
                            eye = %eye,
                            "📌 pipeline info: {} ({})",
                            info_msg.error(),
                            info_msg.debug().unwrap_or_default()
                        );
                    }
                    StateChanged(change) => match change.current() {
                        gst::State::Playing => {
                            debug!(target:"lesavka_server::video", eye = %eye, "🎬 pipeline PLAYING");
                        }
                        gst::State::Null => {
                            debug!(target:"lesavka_server::video", eye = %eye, "🛑 pipeline stopped");
                            break;
                        }
                        _ => {}
                    },
                    Eos(..) => {
                        debug!(target:"lesavka_server::video", eye = %eye, "🏁 pipeline EOS");
                        break;
                    }
                    _ => {}
                }
            }
        });
        Self {
            alive,
            join: Some(watcher),
        }
    }
}
/// Stops the watcher thread and waits for it to finish.
#[cfg(not(coverage))]
impl Drop for BusWatchHandle {
    fn drop(&mut self) {
        // Clear the run flag first so the thread leaves its loop on the next poll.
        self.alive.store(false, Ordering::Relaxed);
        let Some(watcher) = self.join.take() else {
            return;
        };
        // A panic inside the watcher is deliberately ignored here.
        let _ = watcher.join();
    }
}
/// Switches `pipeline` to `Playing` and watches `bus` for a startup verdict.
///
/// Returns `Ok(())` as soon as a `StateChanged` message reporting `Playing` is popped, or
/// when the startup window (about 4 s) elapses without an error — the wait is best effort
/// and a quiet bus is not treated as a failure.
///
/// # Errors
/// Fails when the state-change request itself is rejected, or when an `Error` message is
/// popped during the window; in the latter case the pipeline is set back to `Null` first.
#[cfg(not(coverage))]
fn start_eye_pipeline(pipeline: &gst::Pipeline, bus: &gst::Bus, eye: &str) -> anyhow::Result<()> {
    /// Total time spent waiting for a verdict (previously 20 polls of 200 ms each).
    const STARTUP_WINDOW: std::time::Duration = std::time::Duration::from_secs(4);
    /// Longest single bus poll.
    const POLL_SLICE_MS: u64 = 200;

    pipeline
        .set_state(gst::State::Playing)
        .with_context(|| format!("🎥 starting video pipeline eye-{eye}"))?;

    // The wait is bounded by wall-clock time rather than by the number of popped messages,
    // so a burst of unrelated bus messages cannot end the startup check early.
    let deadline = std::time::Instant::now() + STARTUP_WINDOW;
    loop {
        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
        if remaining.is_zero() {
            return Ok(());
        }
        let slice_ms = u64::try_from(remaining.as_millis())
            .unwrap_or(POLL_SLICE_MS)
            .clamp(1, POLL_SLICE_MS);
        let Some(msg) = bus.timed_pop(gst::ClockTime::from_mseconds(slice_ms)) else {
            continue;
        };
        match msg.view() {
            Error(err) => {
                let _ = pipeline.set_state(gst::State::Null);
                return Err(anyhow::anyhow!(
                    "🎥 eye-{eye} pipeline error: {} ({})",
                    err.error(),
                    err.debug().unwrap_or_default()
                ));
            }
            StateChanged(state) if state.current() == gst::State::Playing => return Ok(()),
            _ => {}
        }
    }
}
/// How long to wait for an eye device node to appear.
///
/// Read from `LESAVKA_EYE_DEVICE_WAIT_MS` (milliseconds); falls back to 5000 ms when the
/// variable is unset or does not parse as an unsigned integer.
fn eye_device_wait_timeout() -> Duration {
    const DEFAULT_WAIT_MS: u64 = 5_000;
    let wait_ms = match std::env::var("LESAVKA_EYE_DEVICE_WAIT_MS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_WAIT_MS),
        Err(_) => DEFAULT_WAIT_MS,
    };
    Duration::from_millis(wait_ms)
}
/// Interval between probes while waiting for an eye device node.
///
/// Read from `LESAVKA_EYE_DEVICE_POLL_MS` (milliseconds). A parsed value is raised to at
/// least 25 ms; an unset or unparsable variable yields the 100 ms default.
fn eye_device_wait_poll() -> Duration {
    const DEFAULT_POLL_MS: u64 = 100;
    const MIN_POLL_MS: u64 = 25;
    let configured = std::env::var("LESAVKA_EYE_DEVICE_POLL_MS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    let poll_ms = match configured {
        Some(ms) if ms < MIN_POLL_MS => MIN_POLL_MS,
        Some(ms) => ms,
        None => DEFAULT_POLL_MS,
    };
    Duration::from_millis(poll_ms)
}
/// Returns the configured source profile as `(width, height, fps)`.
///
/// Width and height come from `LESAVKA_EYE_SOURCE_WIDTH` / `LESAVKA_EYE_SOURCE_HEIGHT`
/// (defaults 1920 and 1080), are raised to at least 320 and 180 respectively, and are then
/// rounded down to an even number. The frame rate comes from `LESAVKA_EYE_SOURCE_FPS`
/// (default 30) and is never below 1.
pub fn eye_source_profile() -> (u32, u32, u32) {
    let raw_width = env_u32("LESAVKA_EYE_SOURCE_WIDTH", 1920);
    let raw_height = env_u32("LESAVKA_EYE_SOURCE_HEIGHT", 1080);
    let raw_fps = env_u32("LESAVKA_EYE_SOURCE_FPS", 30);
    (
        round_down_even_u32(raw_width.max(320)),
        round_down_even_u32(raw_height.max(180)),
        raw_fps.max(1),
    )
}
/// Rounds `value` down to the nearest even number, never returning less than 2.
fn round_down_even_u32(value: u32) -> u32 {
    // Clearing the lowest bit drops the odd remainder.
    value.max(2) & !1
}
/// Starts a new telemetry window when `current_sec` is later than the stored window second.
///
/// Only the caller that wins the compare-and-swap on `last_window_sec` clears the three peak
/// gauges; when `current_sec` has not advanced, or another caller won the swap, nothing is
/// touched.
fn reset_stream_telemetry_window(
    last_window_sec: &AtomicU64,
    current_sec: u64,
    source_gap_peak_ms: &AtomicU32,
    send_gap_peak_ms: &AtomicU32,
    queue_peak_depth: &AtomicU32,
) {
    let stored_sec = last_window_sec.load(Ordering::Relaxed);
    let window_advanced = current_sec > stored_sec
        && last_window_sec
            .compare_exchange(stored_sec, current_sec, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok();
    if !window_advanced {
        return;
    }
    for peak in &[source_gap_peak_ms, send_gap_peak_ms, queue_peak_depth] {
        peak.store(0, Ordering::Relaxed);
    }
}
/// A capture request after normalisation by `normalize_eye_capture_request`.
#[derive(Clone, Copy, Debug)]
struct EyeCaptureRequest {
/// Source width as reported by `eye_source_profile`.
source_width: u32,
/// Source height as reported by `eye_source_profile`.
source_height: u32,
/// Output width: the source width when none was requested, otherwise the requested width
/// capped at the source width, raised to at least 320 and rounded down to even.
requested_width: u32,
/// Output height: the source height when none was requested, otherwise the requested
/// height capped at the source height, raised to at least 180 and rounded down to even.
requested_height: u32,
/// Output frame rate, at least 1 and never above the source frame rate.
requested_fps: u32,
/// Bitrate cap in kbit/s, raised to at least 800.
max_bitrate_kbit: u32,
/// Whether the stream has to be re-encoded rather than passed through.
reencode: bool,
}
/// Clamps a client request against the source profile and decides whether to re-encode.
///
/// A zero width, height or fps means "use the source value". Non-zero dimensions are capped
/// at the source size, raised to at least 320x180 and rounded down to even; a non-zero fps is
/// capped at the source fps. The bitrate cap is raised to at least 800 kbit/s.
///
/// Re-encoding is selected when the caller prefers it, when the output is smaller than the
/// source, when the frame rate differs from the source, or when the bitrate cap differs from
/// the 12000 kbit/s baseline.
fn normalize_eye_capture_request(
    requested_width: u32,
    requested_height: u32,
    requested_fps: u32,
    max_bitrate_kbit: u32,
    prefer_reencode: bool,
) -> EyeCaptureRequest {
    const BASELINE_SOURCE_BITRATE_KBIT: u32 = 12_000;

    let (source_width, source_height, source_fps) = eye_source_profile();
    let fps_ceiling = source_fps.max(1);

    let requested_width = match requested_width {
        0 => source_width,
        width => round_down_even_u32(width.min(source_width).max(320)),
    };
    let requested_height = match requested_height {
        0 => source_height,
        height => round_down_even_u32(height.min(source_height).max(180)),
    };
    let requested_fps = match requested_fps {
        0 => fps_ceiling,
        fps => fps.min(fps_ceiling),
    };
    let max_bitrate_kbit = max_bitrate_kbit.max(800);

    let downscale = requested_width < source_width || requested_height < source_height;
    let fps_changed = requested_fps != fps_ceiling;
    let bitrate_changed = max_bitrate_kbit != BASELINE_SOURCE_BITRATE_KBIT;
    let reencode = prefer_reencode || downscale || fps_changed || bitrate_changed;

    EyeCaptureRequest {
        source_width,
        source_height,
        requested_width,
        requested_height,
        requested_fps,
        max_bitrate_kbit,
        reencode,
    }
}
/// Waits until `dev` exists and is a character device.
///
/// The node is probed with `tokio::fs::metadata`, sleeping `eye_device_wait_poll()` between
/// probes, until `eye_device_wait_timeout()` has elapsed. At least one probe always runs,
/// even with a zero timeout.
///
/// This single definition replaces two byte-identical copies that were gated on
/// `cfg(coverage)` and `cfg(not(coverage))`.
///
/// # Errors
/// Fails once the deadline has passed, reporting the outcome of the last probe: either the
/// metadata error or a note that the path is not a character device.
async fn wait_for_eye_device(dev: &str, eye: &str) -> anyhow::Result<()> {
    let timeout = eye_device_wait_timeout();
    let poll = eye_device_wait_poll();
    let deadline = Instant::now() + timeout;
    let last_detail = loop {
        let detail = match tokio::fs::metadata(dev).await {
            Ok(metadata) if metadata.file_type().is_char_device() => return Ok(()),
            Ok(metadata) => format!("device exists but is not a character device ({metadata:?})"),
            Err(err) => err.to_string(),
        };
        if Instant::now() >= deadline {
            break detail;
        }
        sleep(poll).await;
    };
    Err(anyhow::anyhow!(
        "🎥 eye-{eye} device {dev} was not ready within {} ms: {}",
        timeout.as_millis(),
        last_detail
    ))
}
/// Coverage-build entry point for capturing one eye stream.
///
/// Inputs: the video source name, logical eye id, and negotiated bitrate cap.
/// Outputs: whatever the coverage-build `eye_ball_with_request` returns for a request with
/// no explicit width, height or fps and no re-encode preference.
/// Why: keeps the public signature available to callers in coverage builds.
#[cfg(coverage)]
pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Result<VideoStream> {
    // Zeroes stand for "no explicit width/height/fps".
    let (width, height, fps, prefer_reencode) = (0, 0, 0, false);
    eye_ball_with_request(dev, id, _max_bitrate_kbit, width, height, fps, prefer_reencode).await
}
/// Coverage-build stand-in that never opens a capture device.
///
/// Accepts only the names `testsrc` / `videotestsrc` (case-insensitive) or a `dev` equal to
/// the `LESAVKA_TEST_VIDEO_SOURCE` environment variable, and returns a stream holding one
/// canned packet. The bitrate, size, fps and re-encode arguments are ignored.
///
/// # Errors
/// Fails with "invalid video source" when `dev` contains a double quote, and with
/// "video source unavailable" for any other non-test source.
///
/// # Panics
/// Panics when `id` is not a valid index into `EYE_ID`.
#[cfg(coverage)]
pub async fn eye_ball_with_request(
    dev: &str,
    id: u32,
    _max_bitrate_kbit: u32,
    _requested_width: u32,
    _requested_height: u32,
    _requested_fps: u32,
    _prefer_reencode: bool,
) -> anyhow::Result<VideoStream> {
    // The lookup result is unused; only the bounds check matters here.
    let _ = EYE_ID[id as usize];
    if dev.contains('"') {
        return Err(anyhow::anyhow!("invalid video source"));
    }

    let override_source = std::env::var("LESAVKA_TEST_VIDEO_SOURCE").ok();
    let is_test_source = ["testsrc", "videotestsrc"]
        .iter()
        .any(|alias| dev.eq_ignore_ascii_case(alias))
        || override_source.as_deref() == Some(dev);
    if !is_test_source {
        return Err(anyhow::anyhow!("video source unavailable"));
    }

    let _ = gst::init();
    let pipeline = gst::Pipeline::new();
    let (tx, rx) = tokio::sync::mpsc::channel(64);
    // One packet is queued up front; the sender is dropped when this function returns.
    let canned_packet = VideoPacket {
        id: id.min(1),
        pts: 0,
        data: vec![0, 0, 0, 1, 0x65, 0x88, 0x84],
        ..Default::default()
    };
    let _ = tx.try_send(Ok(canned_packet));

    Ok(VideoStream {
        _pipeline: pipeline,
        #[cfg(not(coverage))]
        _bus_watch: None,
        inner: ReceiverStream::new(rx),
    })
}
/// Capture one eye stream using the source profile.
///
/// Inputs: the video device (or test source name), logical eye id, and negotiated bitrate cap.
/// Outputs: the `VideoStream` produced by `eye_ball_with_request` for a request with no
/// explicit width, height or fps and no re-encode preference.
#[cfg(not(coverage))]
pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Result<VideoStream> {
    // Zeroes stand for "no explicit width/height/fps".
    let (width, height, fps, prefer_reencode) = (0, 0, 0, false);
    eye_ball_with_request(dev, id, max_bitrate_kbit, width, height, fps, prefer_reencode).await
}
#[cfg(not(coverage))]
pub async fn eye_ball_with_request(
dev: &str,
id: u32,
max_bitrate_kbit: u32,
requested_width: u32,
requested_height: u32,
requested_fps: u32,
prefer_reencode: bool,
) -> anyhow::Result<VideoStream> {
let eye = EYE_ID[id as usize];
gst::init().context("gst init")?;
let request = normalize_eye_capture_request(
requested_width,
requested_height,
requested_fps,
max_bitrate_kbit,
prefer_reencode,
);
let target_fps = if requested_fps > 0 {
request.requested_fps
} else {
env_u32("LESAVKA_EYE_FPS", default_eye_fps(max_bitrate_kbit)).max(1)
};
let min_fps = env_u32("LESAVKA_EYE_MIN_FPS", 12).clamp(1, target_fps);
let adaptive = std::env::var("LESAVKA_EYE_ADAPTIVE")
.map(|value| value != "0")
.unwrap_or(true);
info!(
target: "lesavka_server::video",
eye = %eye,
max_bitrate_kbit,
source_width = request.source_width,
source_height = request.source_height,
requested_width = request.requested_width,
requested_height = request.requested_height,
requested_fps = request.requested_fps,
target_fps,
min_fps,
adaptive,
"🎥 eye stream profile selected"
);
let effective_fps = Arc::new(AtomicU32::new(target_fps));
let dropped_window = Arc::new(AtomicU64::new(0));
let dropped_total = Arc::new(AtomicU64::new(0));
let sent_window = Arc::new(AtomicU64::new(0));
let last_adjust_sec = Arc::new(AtomicU64::new(0));
let wait_for_idr = Arc::new(AtomicBool::new(false));
let last_sent = Arc::new(AtomicU64::new(0));
let last_source_pts = Arc::new(AtomicU64::new(0));
let source_gap_peak_ms = Arc::new(AtomicU32::new(0));
let send_gap_peak_ms = Arc::new(AtomicU32::new(0));
let queue_peak_depth = Arc::new(AtomicU32::new(0));
let last_telemetry_sec = Arc::new(AtomicU64::new(0));
let packet_seq = Arc::new(AtomicU64::new(0));
let queue_buffers = env_u32("LESAVKA_EYE_QUEUE_BUFFERS", 4).max(1);
let appsink_buffers = env_u32("LESAVKA_EYE_APPSINK_BUFFERS", 4).max(1);
let keyframe_interval = env_u32(
"LESAVKA_EYE_KEYFRAME_INTERVAL",
request.requested_fps.max(1).min(5),
)
.clamp(1, request.requested_fps.max(1));
let use_test_src =
dev.eq_ignore_ascii_case("testsrc") || dev.eq_ignore_ascii_case("videotestsrc");
let server_encoder_label = if use_test_src {
"x264enc(testsrc)".to_string()
} else if request.reencode {
"x264enc".to_string()
} else {
"source-pass-through".to_string()
};
let server_process_cpu_tenths = server_process_cpu_metric();
if !use_test_src {
wait_for_eye_device(dev, eye).await?;
}
let desc = if use_test_src {
let test_bitrate = env_u32("LESAVKA_EYE_TESTSRC_KBIT", request.max_bitrate_kbit);
format!(
"videotestsrc name=cam_{eye} is-live=true pattern=smpte ! \
video/x-raw,width={},height={},framerate={}/1 ! \
queue max-size-buffers={queue_buffers} max-size-time=0 max-size-bytes=0 leaky=downstream ! \
videoconvert ! video/x-raw,format=I420,width={},height={},framerate={}/1,pixel-aspect-ratio=1/1 ! \
x264enc tune=zerolatency speed-preset=veryfast bitrate={test_bitrate} key-int-max={keyframe_interval} option-string=sar=1/1 ! \
h264parse disable-passthrough=true config-interval=-1 ! \
video/x-h264,stream-format=byte-stream,alignment=au ! \
appsink name=sink emit-signals=true max-buffers={appsink_buffers} drop=true",
request.requested_width,
request.requested_height,
request.requested_fps,
request.requested_width,
request.requested_height,
request.requested_fps,
)
} else if request.reencode {
format!(
"v4l2src name=cam_{eye} device=\"{dev}\" io-mode=mmap do-timestamp=true ! \
queue max-size-buffers={queue_buffers} max-size-time=0 max-size-bytes=0 leaky=downstream ! \
h264parse disable-passthrough=true config-interval=-1 ! avdec_h264 ! videoconvert ! \
videoscale add-borders=false ! videorate ! video/x-raw,format=I420,width={},height={},framerate={}/1,pixel-aspect-ratio=1/1 ! \
x264enc tune=zerolatency speed-preset=faster bitrate={} key-int-max={} option-string=sar=1/1 ! \
h264parse disable-passthrough=true config-interval=-1 ! \
video/x-h264,stream-format=byte-stream,alignment=au ! \
appsink name=sink emit-signals=true max-buffers={appsink_buffers} drop=true",
request.requested_width,
request.requested_height,
request.requested_fps,
request.max_bitrate_kbit,
keyframe_interval,
)
} else {
format!(
"v4l2src name=cam_{eye} device=\"{dev}\" io-mode=mmap do-timestamp=true ! \
queue max-size-buffers={queue_buffers} max-size-time=0 max-size-bytes=0 leaky=downstream ! \
h264parse disable-passthrough=true config-interval=-1 ! \
video/x-h264,stream-format=byte-stream,alignment=au ! \
appsink name=sink emit-signals=true max-buffers={appsink_buffers} drop=true"
)
};
let pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>()
.expect("not a pipeline");
let sink = pipeline
.by_name("sink")
.expect("appsink")
.dynamic_cast::<gst_app::AppSink>()
.expect("appsink down-cast");
let chan_capacity = env_usize("LESAVKA_EYE_CHAN_CAPACITY", 32).max(8);
let (tx, rx) = tokio::sync::mpsc::channel(chan_capacity);
if let Some(src_pad) = pipeline
.by_name(&format!("cam_{eye}"))
.and_then(|element| element.static_pad("src"))
{
src_pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |pad, info| {
if let Some(gst::PadProbeData::Event(ref event)) = info.data {
if let gst::EventView::Caps(caps) = event.view() {
trace!(target:"lesavka_server::video", ?caps, "🔍 new caps on {}", pad.name());
}
}
gst::PadProbeReturn::Ok
});
} else {
warn!(target:"lesavka_server::video", eye = %eye, "🍪 cam_{eye} not found - skipping pad-probe");
}
let eye_name = eye.to_string();
let dropped_total_for_cb = Arc::clone(&dropped_total);
let packet_seq_for_cb = Arc::clone(&packet_seq);
let effective_fps_for_cb = Arc::clone(&effective_fps);
let last_source_pts_for_cb = Arc::clone(&last_source_pts);
let source_gap_peak_ms_for_cb = Arc::clone(&source_gap_peak_ms);
let send_gap_peak_ms_for_cb = Arc::clone(&send_gap_peak_ms);
let queue_peak_depth_for_cb = Arc::clone(&queue_peak_depth);
let last_telemetry_sec_for_cb = Arc::clone(&last_telemetry_sec);
let server_encoder_label_for_cb = server_encoder_label.clone();
let server_process_cpu_tenths_for_cb = Arc::clone(&server_process_cpu_tenths);
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
let is_idr = contains_idr(map.as_slice());
static FRAME: AtomicU64 = AtomicU64::new(0);
let frame = FRAME.fetch_add(1, Ordering::Relaxed);
if frame % 120 == 0 && is_idr {
trace!(target: "lesavka_server::video", "eye-{eye}: delivered {frame} frames");
if enabled!(Level::TRACE) {
let path = format!("/tmp/eye-{eye}-srv-{frame:05}.h264");
std::fs::write(&path, map.as_slice()).ok();
}
} else if frame < 10 {
debug!(
target: "lesavka_server::video",
eye = eye,
frame,
bytes = map.len(),
pts = ?buffer.pts(),
"⬆️ pushed video sample eye-{eye}"
);
}
if enabled!(Level::TRACE) && is_idr {
trace!("eye-{eye}: IDR");
}
let origin = *START.get_or_init(|| buffer.pts().unwrap_or(gst::ClockTime::ZERO));
let pts_us = buffer
.pts()
.unwrap_or(gst::ClockTime::ZERO)
.saturating_sub(origin)
.nseconds()
/ 1_000;
let sec = pts_us / 1_000_000;
reset_stream_telemetry_window(
&last_telemetry_sec_for_cb,
sec,
&source_gap_peak_ms_for_cb,
&send_gap_peak_ms_for_cb,
&queue_peak_depth_for_cb,
);
let previous_source_pts = last_source_pts_for_cb.swap(pts_us, Ordering::Relaxed);
if previous_source_pts > 0 && pts_us > previous_source_pts {
let source_gap_ms =
((pts_us.saturating_sub(previous_source_pts)) / 1_000) as u32;
source_gap_peak_ms_for_cb.fetch_max(source_gap_ms, Ordering::Relaxed);
}
if adaptive {
let prev = last_adjust_sec.load(Ordering::Relaxed);
if sec > prev
&& last_adjust_sec
.compare_exchange(prev, sec, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
let dropped = dropped_window.swap(0, Ordering::Relaxed);
let sent = sent_window.swap(0, Ordering::Relaxed);
let current = effective_fps.load(Ordering::Relaxed).max(1);
let next = adjust_effective_fps(current, min_fps, target_fps, dropped, sent);
if next != current {
effective_fps.store(next, Ordering::Relaxed);
if next < current {
warn!(
target: "lesavka_server::video",
eye = %eye_name,
fps = next,
"🎥 adaptive eye fps ↓"
);
} else {
info!(
target: "lesavka_server::video",
eye = %eye_name,
fps = next,
"🎥 adaptive eye fps ↑"
);
}
}
}
}
let current_fps = effective_fps.load(Ordering::Relaxed).max(1);
let last = last_sent.load(Ordering::Relaxed);
if !should_send_frame(last, pts_us, current_fps) {
return Ok(gst::FlowSuccess::Ok);
}
if last > 0 && pts_us > last {
let send_gap_ms = ((pts_us.saturating_sub(last)) / 1_000) as u32;
send_gap_peak_ms_for_cb.fetch_max(send_gap_ms, Ordering::Relaxed);
}
last_sent.store(pts_us, Ordering::Relaxed);
if wait_for_idr.load(Ordering::Relaxed) && !is_idr {
return Ok(gst::FlowSuccess::Ok);
}
let data = map.as_slice().to_vec();
let size = data.len();
let seq = packet_seq_for_cb.fetch_add(1, Ordering::Relaxed) + 1;
let queue_depth = (chan_capacity.saturating_sub(tx.capacity())) as u32;
queue_peak_depth_for_cb.fetch_max(queue_depth, Ordering::Relaxed);
let pkt = VideoPacket {
id,
pts: pts_us,
data,
seq,
effective_fps: effective_fps_for_cb.load(Ordering::Relaxed).max(1),
dropped_total: dropped_total_for_cb.load(Ordering::Relaxed),
queue_depth,
server_source_gap_peak_ms: source_gap_peak_ms_for_cb.load(Ordering::Relaxed),
server_send_gap_peak_ms: send_gap_peak_ms_for_cb.load(Ordering::Relaxed),
server_queue_peak: queue_peak_depth_for_cb.load(Ordering::Relaxed),
server_encoder_label: server_encoder_label_for_cb.clone(),
server_process_cpu_tenths: server_process_cpu_tenths_for_cb
.load(Ordering::Relaxed),
};
match tx.try_send(Ok(pkt)) {
Ok(_) => {
sent_window.fetch_add(1, Ordering::Relaxed);
if is_idr {
wait_for_idr.store(false, Ordering::Relaxed);
}
trace!(target:"lesavka_server::video", eye = %eye, size = size, "🎥📤 sent");
}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
dropped_window.fetch_add(1, Ordering::Relaxed);
dropped_total_for_cb.fetch_add(1, Ordering::Relaxed);
wait_for_idr.store(true, Ordering::Relaxed);
static DROP_CNT: AtomicU64 = AtomicU64::new(0);
let dropped = DROP_CNT.fetch_add(1, Ordering::Relaxed);
if dropped % 120 == 0 {
debug!(
target:"lesavka_server::video",
eye = %eye,
dropped,
"🎥⏳ channel full - dropping frames"
);
}
}
Err(error) => error!("mpsc send err: {error}"),
}
Ok(gst::FlowSuccess::Ok)
})
.build(),
);
let bus = pipeline.bus().expect("bus");
start_eye_pipeline(&pipeline, &bus, eye)?;
let bus_watch = BusWatchHandle::spawn(bus, eye.to_owned());
Ok(VideoStream {
_pipeline: pipeline,
_bus_watch: Some(bus_watch),
inner: ReceiverStream::new(rx),
})
}
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
/// A request that matches the default source profile and baseline bitrate is passed through.
#[test]
fn source_profile_stays_pass_through_without_explicit_reencode_request() {
    let request = normalize_eye_capture_request(1920, 1080, 30, 12_000, false);
    let geometry = (
        request.requested_width,
        request.requested_height,
        request.requested_fps,
    );
    assert_eq!(geometry, (1920, 1080, 30));
    assert!(!request.reencode);
}
/// Preferring a re-encode keeps the requested geometry but turns the re-encode flag on.
#[test]
fn explicit_reencode_preference_forces_same_size_reencode() {
    let request = normalize_eye_capture_request(1920, 1080, 30, 12_000, true);
    let geometry = (
        request.requested_width,
        request.requested_height,
        request.requested_fps,
    );
    assert_eq!(geometry, (1920, 1080, 30));
    assert!(request.reencode);
}
/// A non-baseline bitrate or a lower fps each force a re-encode, even at full source size.
#[test]
fn bitrate_or_fps_change_forces_reencode_even_at_source_size() {
    let lowered_bitrate = normalize_eye_capture_request(1920, 1080, 30, 2_500, false);
    let lowered_fps = normalize_eye_capture_request(1920, 1080, 24, 12_000, false);
    for request in [lowered_bitrate, lowered_fps] {
        assert!(request.reencode);
    }
}
/// Builds an opaque RGBA test frame with a 96-pixel colour marker in every corner.
///
/// Top-left is red, top-right green, bottom-left blue and bottom-right yellow; the rest of
/// the frame is dark grey. When markers overlap on a small frame, the earlier one in that
/// order wins.
fn marker_frame(width: i32, height: i32) -> Vec<u8> {
    const MARKER: i32 = 96;
    let mut rgba = vec![0_u8; (width * height * 4) as usize];
    for y in 0..height {
        let top = y < MARKER;
        let bottom = y >= height - MARKER;
        for x in 0..width {
            let left = x < MARKER;
            let right = x >= width - MARKER;
            let (r, g, b) = match (left, right, top, bottom) {
                (true, _, true, _) => (255, 0, 0),
                (_, true, true, _) => (0, 255, 0),
                (true, _, _, true) => (0, 0, 255),
                (_, true, _, true) => (255, 255, 0),
                _ => (24, 24, 24),
            };
            let offset = ((y * width + x) * 4) as usize;
            rgba[offset..offset + 4].copy_from_slice(&[r, g, b, 255]);
        }
    }
    rgba
}
/// Pushes one marker frame through an encode → decode → scale/rate → encode → decode chain
/// and returns the first decoded output frame as `(width, height, rgba_bytes)`.
///
/// `input_fps` is the frame rate declared on the input caps and `output_fps` the rate
/// requested after `videorate`. The pipeline is always returned to `Null` before this
/// function returns, including on every error path after it has been started.
fn pull_reencoded_frame_rgba(
    width: i32,
    height: i32,
    input_fps: u32,
    output_fps: u32,
) -> anyhow::Result<(i32, i32, Vec<u8>)> {
    gst::init().context("gst init")?;
    let desc = format!(
        "appsrc name=src is-live=false format=time block=true ! \
         videoconvert ! video/x-raw,format=I420,width={width},height={height},framerate={input_fps}/1,pixel-aspect-ratio=1/1 ! \
         x264enc tune=zerolatency speed-preset=veryfast bitrate=12000 key-int-max={input_fps} option-string=sar=1/1 ! \
         h264parse disable-passthrough=true config-interval=-1 ! \
         avdec_h264 ! videoconvert ! videoscale add-borders=false ! videorate ! \
         video/x-raw,format=I420,width={width},height={height},framerate={output_fps}/1,pixel-aspect-ratio=1/1 ! \
         x264enc tune=zerolatency speed-preset=faster bitrate=12000 key-int-max=5 option-string=sar=1/1 ! \
         h264parse disable-passthrough=true config-interval=-1 ! \
         avdec_h264 ! videoconvert ! video/x-raw,format=RGBA,pixel-aspect-ratio=1/1 ! \
         appsink name=sink emit-signals=false sync=false max-buffers=1 drop=true"
    );
    let pipeline = gst::parse::launch(&desc)?
        .downcast::<gst::Pipeline>()
        .expect("pipeline");
    let appsrc = pipeline
        .by_name("src")
        .expect("appsrc")
        .downcast::<gst_app::AppSrc>()
        .expect("appsrc cast");
    appsrc.set_caps(Some(
        &gst::Caps::builder("video/x-raw")
            .field("format", &"RGBA")
            .field("width", &width)
            .field("height", &height)
            .field("framerate", &gst::Fraction::new(input_fps as i32, 1))
            .field("pixel-aspect-ratio", &gst::Fraction::new(1, 1))
            .build(),
    ));
    appsrc.set_format(gst::Format::Time);
    let appsink = pipeline
        .by_name("sink")
        .expect("appsink")
        .downcast::<gst_app::AppSink>()
        .expect("appsink cast");
    appsink.set_caps(Some(
        &gst::Caps::builder("video/x-raw")
            .field("format", &"RGBA")
            .field("pixel-aspect-ratio", &gst::Fraction::new(1, 1))
            .build(),
    ));

    // Everything that can fail after start-up runs inside this closure, so the teardown
    // below is reached on success and on every early `?` return alike.
    let probe = || -> anyhow::Result<(i32, i32, Vec<u8>)> {
        pipeline
            .set_state(gst::State::Playing)
            .context("starting reencode probe pipeline")?;
        let mut buffer = gst::Buffer::from_mut_slice(marker_frame(width, height));
        if let Some(buf) = buffer.get_mut() {
            buf.set_pts(Some(gst::ClockTime::ZERO));
            buf.set_duration(Some(
                gst::ClockTime::from_nseconds(1_000_000_000_u64 / input_fps.max(1) as u64),
            ));
        }
        appsrc
            .push_buffer(buffer)
            .map_err(|err| anyhow::anyhow!("push buffer failed: {err:?}"))?;
        appsrc
            .end_of_stream()
            .map_err(|err| anyhow::anyhow!("eos failed: {err:?}"))?;
        let sample = appsink
            .try_pull_sample(gst::ClockTime::from_seconds(5))
            .ok_or_else(|| anyhow::anyhow!("timed out pulling reencoded frame"))?;
        let caps = sample.caps().ok_or_else(|| anyhow::anyhow!("missing sample caps"))?;
        let structure = caps
            .structure(0)
            .ok_or_else(|| anyhow::anyhow!("missing caps structure"))?;
        let out_width = structure
            .get::<i32>("width")
            .map_err(|err| anyhow::anyhow!("missing output width: {err}"))?;
        let out_height = structure
            .get::<i32>("height")
            .map_err(|err| anyhow::anyhow!("missing output height: {err}"))?;
        let buffer = sample
            .buffer()
            .ok_or_else(|| anyhow::anyhow!("missing sample buffer"))?;
        let map = buffer
            .map_readable()
            .map_err(|_| anyhow::anyhow!("sample map failed"))?;
        let rgba = map.as_slice().to_vec();
        Ok((out_width, out_height, rgba))
    };
    let outcome = probe();
    let _ = pipeline.set_state(gst::State::Null);
    outcome
}
/// Reads the `(r, g, b)` channels of the pixel at `(x, y)` from a tightly packed RGBA buffer
/// that is `width` pixels wide. Panics when the pixel lies outside the buffer.
fn rgba_pixel(rgba: &[u8], width: i32, x: i32, y: i32) -> (u8, u8, u8) {
    let start = ((y * width + x) * 4) as usize;
    let channels = &rgba[start..start + 3];
    (channels[0], channels[1], channels[2])
}
/// Re-encoding a 1920x1080 marker frame (60 fps in, 30 fps out) must keep the frame size
/// and leave each corner marker in its corner with roughly its original colour.
#[test]
#[serial]
fn reencode_probe_preserves_corner_markers_on_full_frame_content() {
    let (width, height, rgba) =
        pull_reencoded_frame_rgba(1920, 1080, 60, 30).expect("probe frame");
    assert_eq!((width, height), (1920, 1080));

    // Sample well inside each 96-pixel marker, 24 pixels in from the frame edges.
    let (near, far_x, far_y) = (24, width - 25, height - 25);
    let sample = |x: i32, y: i32| rgba_pixel(&rgba, width, x, y);
    let top_left = sample(near, near);
    let top_right = sample(far_x, near);
    let bottom_left = sample(near, far_y);
    let bottom_right = sample(far_x, far_y);

    let (r, g, b) = top_left;
    assert!(
        r > 180 && g < 80 && b < 80,
        "top-left marker drifted: {top_left:?}"
    );
    let (r, g, b) = top_right;
    assert!(
        g > 180 && r < 80 && b < 80,
        "top-right marker drifted: {top_right:?}"
    );
    let (r, g, b) = bottom_left;
    assert!(
        b > 180 && r < 80 && g < 80,
        "bottom-left marker drifted: {bottom_left:?}"
    );
    let (r, g, b) = bottom_right;
    assert!(
        r > 180 && g > 180 && b < 120,
        "bottom-right marker drifted: {bottom_right:?}"
    );
}
}