2026-04-23 07:00:06 -03:00
|
|
|
#[cfg(not(coverage))]
|
|
|
|
|
async fn wait_for_eye_device(dev: &str, eye: &str) -> anyhow::Result<()> {
|
|
|
|
|
let timeout = eye_device_wait_timeout();
|
|
|
|
|
let poll = eye_device_wait_poll();
|
|
|
|
|
let deadline = Instant::now() + timeout;
|
|
|
|
|
let last_detail = loop {
|
|
|
|
|
let detail = match tokio::fs::metadata(dev).await {
|
|
|
|
|
Ok(metadata) if metadata.file_type().is_char_device() => return Ok(()),
|
|
|
|
|
Ok(metadata) => format!("device exists but is not a character device ({metadata:?})"),
|
|
|
|
|
Err(err) => err.to_string(),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if Instant::now() >= deadline {
|
|
|
|
|
break detail;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sleep(poll).await;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Err(anyhow::anyhow!(
|
|
|
|
|
"🎥 eye-{eye} device {dev} was not ready within {} ms: {}",
|
|
|
|
|
timeout.as_millis(),
|
|
|
|
|
last_detail
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Poll the filesystem until `dev` shows up as a character device (coverage build).
///
/// Inputs: the device node path and the logical eye name (used only in the error text).
/// Outputs: `Ok(())` as soon as the node's metadata reports a character device; otherwise an
/// error carrying the detail of the last failed probe once the wait deadline has passed.
/// Why: keeps the same waiting behaviour available when the crate is built with `cfg(coverage)`.
#[cfg(coverage)]
async fn wait_for_eye_device(dev: &str, eye: &str) -> anyhow::Result<()> {
    let timeout = eye_device_wait_timeout();
    let poll = eye_device_wait_poll();
    let deadline = Instant::now() + timeout;

    loop {
        // Every probe either succeeds outright or yields a human-readable reason.
        let failure = match tokio::fs::metadata(dev).await {
            Err(err) => err.to_string(),
            Ok(metadata) => {
                if metadata.file_type().is_char_device() {
                    return Ok(());
                }
                format!("device exists but is not a character device ({metadata:?})")
            }
        };

        // The deadline is checked after the probe, so at least one probe always runs.
        if Instant::now() < deadline {
            sleep(poll).await;
            continue;
        }

        return Err(anyhow::anyhow!(
            "🎥 eye-{eye} device {dev} was not ready within {} ms: {}",
            timeout.as_millis(),
            failure
        ));
    }
}
|
|
|
|
|
|
|
|
|
|
/// Open one eye stream without an explicit capture request (coverage build).
///
/// Inputs: the video source name, the logical eye id, and the bitrate cap in kbit/s.
/// Outputs: whatever `eye_ball_with_request` returns for the same source and eye.
/// Why: callers that have no width/height/fps preference get a short entry point; the three
/// request values are forwarded as zero.
#[cfg(coverage)]
pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Result<VideoStream> {
    // No explicit geometry or frame rate is requested through this entry point.
    let (no_width, no_height, no_fps) = (0, 0, 0);
    eye_ball_with_request(dev, id, _max_bitrate_kbit, no_width, no_height, no_fps).await
}
|
|
|
|
|
|
|
|
|
|
/// Coverage-build stand-in for the eye capture pipeline.
///
/// Inputs: the video source name and eye id; the bitrate and width/height/fps request values
/// are accepted but not used by this variant.
/// Outputs: a `VideoStream` fed by a background task that emits eight fixed packets, or an
/// error when the source name contains `"` or is not a recognised test source.
/// Why: lets code that consumes a `VideoStream` run under `cfg(coverage)` with a synthetic
/// packet feed instead of a capture pipeline.
///
/// # Panics
///
/// Panics if `id` is not a valid index into `EYE_ID`.
#[cfg(coverage)]
pub async fn eye_ball_with_request(
    dev: &str,
    id: u32,
    _max_bitrate_kbit: u32,
    _requested_width: u32,
    _requested_height: u32,
    _requested_fps: u32,
) -> anyhow::Result<VideoStream> {
    // Index only for its bounds check; the looked-up value itself is not needed here.
    let _ = EYE_ID[id as usize];
    if dev.contains('"') {
        return Err(anyhow::anyhow!("invalid video source"));
    }

    // A source is accepted when it is one of the built-in aliases or when it equals the
    // value of LESAVKA_TEST_VIDEO_SOURCE.
    let env_source = std::env::var("LESAVKA_TEST_VIDEO_SOURCE").ok();
    let is_builtin_alias = ["testsrc", "videotestsrc"]
        .iter()
        .any(|alias| dev.eq_ignore_ascii_case(alias));
    let is_env_source = env_source.as_deref() == Some(dev);
    if !(is_builtin_alias || is_env_source) {
        return Err(anyhow::anyhow!("video source unavailable"));
    }

    // Initialisation failure is deliberately ignored in this stub.
    let _ = gst::init();
    let (tx, rx) = tokio::sync::mpsc::channel(64);
    let packet_id = id.min(1);

    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(25)).await;
        for index in 0..8 {
            let packet = VideoPacket {
                id: packet_id,
                pts: index * 16_666,
                data: vec![0, 0, 0, 1, 0x65, 0x88, 0x84],
                seq: index + 1,
                effective_fps: 60,
                server_encoder_label: "coverage-testsrc".to_string(),
                ..Default::default()
            };
            // Send errors (receiver gone) are ignored; the loop simply runs to completion.
            let _ = tx.send(Ok(packet)).await;
        }
    });

    Ok(VideoStream {
        _pipeline: gst::Pipeline::new(),
        // Compiled out here: this function only exists under `cfg(coverage)`.
        #[cfg(not(coverage))]
        _bus_watch: None,
        inner: ReceiverStream::new(rx),
    })
}
|
|
|
|
|
|
|
|
|
|
#[cfg(not(coverage))]
|
|
|
|
|
pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Result<VideoStream> {
|
|
|
|
|
eye_ball_with_request(dev, id, max_bitrate_kbit, 0, 0, 0).await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(not(coverage))]
|
|
|
|
|
pub async fn eye_ball_with_request(
|
|
|
|
|
dev: &str,
|
|
|
|
|
id: u32,
|
|
|
|
|
max_bitrate_kbit: u32,
|
|
|
|
|
requested_width: u32,
|
|
|
|
|
requested_height: u32,
|
|
|
|
|
requested_fps: u32,
|
|
|
|
|
) -> anyhow::Result<VideoStream> {
|
|
|
|
|
let eye = EYE_ID[id as usize];
|
|
|
|
|
gst::init().context("gst init")?;
|
|
|
|
|
|
|
|
|
|
let request = normalize_eye_capture_request(
|
|
|
|
|
requested_width,
|
|
|
|
|
requested_height,
|
|
|
|
|
requested_fps,
|
|
|
|
|
max_bitrate_kbit,
|
|
|
|
|
);
|
|
|
|
|
let target_fps = if requested_fps > 0 {
|
|
|
|
|
request.fps
|
|
|
|
|
} else {
|
|
|
|
|
env_u32("LESAVKA_EYE_FPS", default_eye_fps(max_bitrate_kbit)).max(1)
|
|
|
|
|
};
|
|
|
|
|
let min_fps = env_u32("LESAVKA_EYE_MIN_FPS", 12).clamp(1, target_fps);
|
|
|
|
|
let adaptive = std::env::var("LESAVKA_EYE_ADAPTIVE")
|
|
|
|
|
.map(|value| value != "0")
|
|
|
|
|
.unwrap_or(true);
|
|
|
|
|
info!(
|
|
|
|
|
target: "lesavka_server::video",
|
|
|
|
|
eye = %eye,
|
|
|
|
|
max_bitrate_kbit,
|
|
|
|
|
source_width = request.width,
|
|
|
|
|
source_height = request.height,
|
|
|
|
|
source_fps = request.fps,
|
|
|
|
|
requested_width = request.width,
|
|
|
|
|
requested_height = request.height,
|
|
|
|
|
requested_fps = request.fps,
|
|
|
|
|
target_fps,
|
|
|
|
|
min_fps,
|
|
|
|
|
adaptive,
|
|
|
|
|
"🎥 eye stream profile selected"
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let effective_fps = Arc::new(AtomicU32::new(target_fps));
|
|
|
|
|
let dropped_window = Arc::new(AtomicU64::new(0));
|
|
|
|
|
let dropped_total = Arc::new(AtomicU64::new(0));
|
|
|
|
|
let sent_window = Arc::new(AtomicU64::new(0));
|
|
|
|
|
let last_adjust_sec = Arc::new(AtomicU64::new(0));
|
|
|
|
|
let wait_for_idr = Arc::new(AtomicBool::new(false));
|
|
|
|
|
let last_sent = Arc::new(AtomicU64::new(0));
|
|
|
|
|
let last_source_pts = Arc::new(AtomicU64::new(0));
|
|
|
|
|
let source_gap_peak_ms = Arc::new(AtomicU32::new(0));
|
|
|
|
|
let send_gap_peak_ms = Arc::new(AtomicU32::new(0));
|
|
|
|
|
let queue_peak_depth = Arc::new(AtomicU32::new(0));
|
|
|
|
|
let last_telemetry_sec = Arc::new(AtomicU64::new(0));
|
|
|
|
|
let packet_seq = Arc::new(AtomicU64::new(0));
|
|
|
|
|
|
|
|
|
|
let queue_buffers = env_u32("LESAVKA_EYE_QUEUE_BUFFERS", 4).max(1);
|
|
|
|
|
let appsink_buffers = env_u32("LESAVKA_EYE_APPSINK_BUFFERS", 4).max(1);
|
|
|
|
|
let keyframe_interval = env_u32("LESAVKA_EYE_KEYFRAME_INTERVAL", request.fps.clamp(1, 5))
|
|
|
|
|
.clamp(1, request.fps.max(1));
|
|
|
|
|
let use_test_src =
|
|
|
|
|
dev.eq_ignore_ascii_case("testsrc") || dev.eq_ignore_ascii_case("videotestsrc");
|
|
|
|
|
let server_encoder_label = if use_test_src {
|
|
|
|
|
"x264enc(testsrc)".to_string()
|
|
|
|
|
} else {
|
|
|
|
|
"source-pass-through".to_string()
|
|
|
|
|
};
|
|
|
|
|
let server_process_cpu_tenths = server_process_cpu_metric();
|
|
|
|
|
if !use_test_src {
|
|
|
|
|
wait_for_eye_device(dev, eye).await?;
|
|
|
|
|
}
|
|
|
|
|
let desc = if use_test_src {
|
|
|
|
|
let test_bitrate = env_u32("LESAVKA_EYE_TESTSRC_KBIT", request.max_bitrate_kbit);
|
|
|
|
|
format!(
|
|
|
|
|
"videotestsrc name=cam_{eye} is-live=true pattern=smpte ! \
|
|
|
|
|
video/x-raw,width={},height={},framerate={}/1 ! \
|
|
|
|
|
queue max-size-buffers={queue_buffers} max-size-time=0 max-size-bytes=0 leaky=downstream ! \
|
|
|
|
|
videoconvert ! video/x-raw,format=I420,width={},height={},framerate={}/1,pixel-aspect-ratio=1/1 ! \
|
|
|
|
|
x264enc tune=zerolatency speed-preset=veryfast bitrate={test_bitrate} key-int-max={keyframe_interval} option-string=sar=1/1 ! \
|
|
|
|
|
h264parse disable-passthrough=true config-interval=-1 ! \
|
|
|
|
|
video/x-h264,stream-format=byte-stream,alignment=au ! \
|
|
|
|
|
appsink name=sink emit-signals=true max-buffers={appsink_buffers} drop=true",
|
|
|
|
|
request.width, request.height, request.fps, request.width, request.height, request.fps,
|
|
|
|
|
)
|
|
|
|
|
} else {
|
|
|
|
|
format!(
|
|
|
|
|
"v4l2src name=cam_{eye} device=\"{dev}\" io-mode=mmap do-timestamp=true ! \
|
|
|
|
|
video/x-h264,width={},height={} ! \
|
|
|
|
|
queue max-size-buffers={queue_buffers} max-size-time=0 max-size-bytes=0 leaky=downstream ! \
|
|
|
|
|
h264parse disable-passthrough=true config-interval=-1 ! \
|
|
|
|
|
video/x-h264,stream-format=byte-stream,alignment=au ! \
|
|
|
|
|
appsink name=sink emit-signals=true max-buffers={appsink_buffers} drop=true",
|
|
|
|
|
request.width, request.height,
|
|
|
|
|
)
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let pipeline = gst::parse::launch(&desc)?
|
|
|
|
|
.downcast::<gst::Pipeline>()
|
|
|
|
|
.expect("not a pipeline");
|
|
|
|
|
|
|
|
|
|
let sink = pipeline
|
|
|
|
|
.by_name("sink")
|
|
|
|
|
.expect("appsink")
|
|
|
|
|
.dynamic_cast::<gst_app::AppSink>()
|
|
|
|
|
.expect("appsink down-cast");
|
|
|
|
|
|
|
|
|
|
let chan_capacity = env_usize("LESAVKA_EYE_CHAN_CAPACITY", 32).max(8);
|
|
|
|
|
let (tx, rx) = tokio::sync::mpsc::channel(chan_capacity);
|
|
|
|
|
|
|
|
|
|
if let Some(src_pad) = pipeline
|
|
|
|
|
.by_name(&format!("cam_{eye}"))
|
|
|
|
|
.and_then(|element| element.static_pad("src"))
|
|
|
|
|
{
|
|
|
|
|
src_pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |pad, info| {
|
|
|
|
|
if let Some(gst::PadProbeData::Event(ref event)) = info.data
|
|
|
|
|
&& let gst::EventView::Caps(caps) = event.view() {
|
|
|
|
|
trace!(target:"lesavka_server::video", ?caps, "🔍 new caps on {}", pad.name());
|
|
|
|
|
}
|
|
|
|
|
gst::PadProbeReturn::Ok
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
warn!(target:"lesavka_server::video", eye = %eye, "🍪 cam_{eye} not found - skipping pad-probe");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let eye_name = eye.to_string();
|
|
|
|
|
let dropped_total_for_cb = Arc::clone(&dropped_total);
|
|
|
|
|
let packet_seq_for_cb = Arc::clone(&packet_seq);
|
|
|
|
|
let effective_fps_for_cb = Arc::clone(&effective_fps);
|
|
|
|
|
let last_source_pts_for_cb = Arc::clone(&last_source_pts);
|
|
|
|
|
let source_gap_peak_ms_for_cb = Arc::clone(&source_gap_peak_ms);
|
|
|
|
|
let send_gap_peak_ms_for_cb = Arc::clone(&send_gap_peak_ms);
|
|
|
|
|
let queue_peak_depth_for_cb = Arc::clone(&queue_peak_depth);
|
|
|
|
|
let last_telemetry_sec_for_cb = Arc::clone(&last_telemetry_sec);
|
|
|
|
|
let server_encoder_label_for_cb = server_encoder_label.clone();
|
|
|
|
|
let server_process_cpu_tenths_for_cb = Arc::clone(&server_process_cpu_tenths);
|
|
|
|
|
sink.set_callbacks(
|
|
|
|
|
gst_app::AppSinkCallbacks::builder()
|
|
|
|
|
.new_sample(move |sink| {
|
|
|
|
|
let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
|
|
|
|
|
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
|
|
|
|
|
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
|
|
|
|
|
let is_idr = contains_idr(map.as_slice());
|
|
|
|
|
|
|
|
|
|
static FRAME: AtomicU64 = AtomicU64::new(0);
|
|
|
|
|
let frame = FRAME.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
if frame.is_multiple_of(120) && is_idr {
|
|
|
|
|
trace!(target: "lesavka_server::video", "eye-{eye}: delivered {frame} frames");
|
|
|
|
|
if enabled!(Level::TRACE) {
|
|
|
|
|
let path = format!("/tmp/eye-{eye}-srv-{frame:05}.h264");
|
|
|
|
|
std::fs::write(&path, map.as_slice()).ok();
|
|
|
|
|
}
|
|
|
|
|
} else if frame < 10 {
|
|
|
|
|
debug!(
|
|
|
|
|
target: "lesavka_server::video",
|
|
|
|
|
eye = eye,
|
|
|
|
|
frame,
|
|
|
|
|
bytes = map.len(),
|
|
|
|
|
pts = ?buffer.pts(),
|
|
|
|
|
"⬆️ pushed video sample eye-{eye}"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if enabled!(Level::TRACE) && is_idr {
|
|
|
|
|
trace!("eye-{eye}: IDR");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let origin = *START.get_or_init(|| buffer.pts().unwrap_or(gst::ClockTime::ZERO));
|
|
|
|
|
let pts_us = buffer
|
|
|
|
|
.pts()
|
|
|
|
|
.unwrap_or(gst::ClockTime::ZERO)
|
|
|
|
|
.saturating_sub(origin)
|
|
|
|
|
.nseconds()
|
|
|
|
|
/ 1_000;
|
|
|
|
|
let sec = pts_us / 1_000_000;
|
|
|
|
|
reset_stream_telemetry_window(
|
|
|
|
|
&last_telemetry_sec_for_cb,
|
|
|
|
|
sec,
|
|
|
|
|
&source_gap_peak_ms_for_cb,
|
|
|
|
|
&send_gap_peak_ms_for_cb,
|
|
|
|
|
&queue_peak_depth_for_cb,
|
|
|
|
|
);
|
|
|
|
|
let previous_source_pts = last_source_pts_for_cb.swap(pts_us, Ordering::Relaxed);
|
|
|
|
|
if previous_source_pts > 0 && pts_us > previous_source_pts {
|
|
|
|
|
let source_gap_ms =
|
|
|
|
|
((pts_us.saturating_sub(previous_source_pts)) / 1_000) as u32;
|
|
|
|
|
source_gap_peak_ms_for_cb.fetch_max(source_gap_ms, Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if adaptive {
|
|
|
|
|
let prev = last_adjust_sec.load(Ordering::Relaxed);
|
|
|
|
|
if sec > prev
|
|
|
|
|
&& last_adjust_sec
|
|
|
|
|
.compare_exchange(prev, sec, Ordering::SeqCst, Ordering::SeqCst)
|
|
|
|
|
.is_ok()
|
|
|
|
|
{
|
|
|
|
|
let dropped = dropped_window.swap(0, Ordering::Relaxed);
|
|
|
|
|
let sent = sent_window.swap(0, Ordering::Relaxed);
|
|
|
|
|
let current = effective_fps.load(Ordering::Relaxed).max(1);
|
|
|
|
|
let next = adjust_effective_fps(current, min_fps, target_fps, dropped, sent);
|
|
|
|
|
if next != current {
|
|
|
|
|
effective_fps.store(next, Ordering::Relaxed);
|
|
|
|
|
if next < current {
|
|
|
|
|
warn!(
|
|
|
|
|
target: "lesavka_server::video",
|
|
|
|
|
eye = %eye_name,
|
|
|
|
|
fps = next,
|
|
|
|
|
"🎥 adaptive eye fps ↓"
|
|
|
|
|
);
|
|
|
|
|
} else {
|
|
|
|
|
info!(
|
|
|
|
|
target: "lesavka_server::video",
|
|
|
|
|
eye = %eye_name,
|
|
|
|
|
fps = next,
|
|
|
|
|
"🎥 adaptive eye fps ↑"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let current_fps = effective_fps.load(Ordering::Relaxed).max(1);
|
|
|
|
|
let last = last_sent.load(Ordering::Relaxed);
|
|
|
|
|
if !should_send_frame(last, pts_us, current_fps) {
|
|
|
|
|
return Ok(gst::FlowSuccess::Ok);
|
|
|
|
|
}
|
|
|
|
|
if last > 0 && pts_us > last {
|
|
|
|
|
let send_gap_ms = ((pts_us.saturating_sub(last)) / 1_000) as u32;
|
|
|
|
|
send_gap_peak_ms_for_cb.fetch_max(send_gap_ms, Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
last_sent.store(pts_us, Ordering::Relaxed);
|
|
|
|
|
|
|
|
|
|
if wait_for_idr.load(Ordering::Relaxed) && !is_idr {
|
|
|
|
|
return Ok(gst::FlowSuccess::Ok);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let data = map.as_slice().to_vec();
|
|
|
|
|
let size = data.len();
|
|
|
|
|
let seq = packet_seq_for_cb.fetch_add(1, Ordering::Relaxed) + 1;
|
|
|
|
|
let queue_depth = (chan_capacity.saturating_sub(tx.capacity())) as u32;
|
|
|
|
|
queue_peak_depth_for_cb.fetch_max(queue_depth, Ordering::Relaxed);
|
|
|
|
|
let pkt = VideoPacket {
|
|
|
|
|
id,
|
|
|
|
|
pts: pts_us,
|
|
|
|
|
data,
|
|
|
|
|
seq,
|
|
|
|
|
effective_fps: effective_fps_for_cb.load(Ordering::Relaxed).max(1),
|
|
|
|
|
dropped_total: dropped_total_for_cb.load(Ordering::Relaxed),
|
|
|
|
|
queue_depth,
|
|
|
|
|
server_source_gap_peak_ms: source_gap_peak_ms_for_cb.load(Ordering::Relaxed),
|
|
|
|
|
server_send_gap_peak_ms: send_gap_peak_ms_for_cb.load(Ordering::Relaxed),
|
|
|
|
|
server_queue_peak: queue_peak_depth_for_cb.load(Ordering::Relaxed),
|
|
|
|
|
server_encoder_label: server_encoder_label_for_cb.clone(),
|
|
|
|
|
server_process_cpu_tenths: server_process_cpu_tenths_for_cb
|
|
|
|
|
.load(Ordering::Relaxed),
|
2026-05-02 17:27:59 -03:00
|
|
|
..Default::default()
|
2026-04-23 07:00:06 -03:00
|
|
|
};
|
|
|
|
|
match tx.try_send(Ok(pkt)) {
|
|
|
|
|
Ok(_) => {
|
|
|
|
|
sent_window.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
if is_idr {
|
|
|
|
|
wait_for_idr.store(false, Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
trace!(target:"lesavka_server::video", eye = %eye, size = size, "🎥📤 sent");
|
|
|
|
|
}
|
|
|
|
|
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
|
|
|
|
|
dropped_window.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
dropped_total_for_cb.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
wait_for_idr.store(true, Ordering::Relaxed);
|
|
|
|
|
static DROP_CNT: AtomicU64 = AtomicU64::new(0);
|
|
|
|
|
let dropped = DROP_CNT.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
if dropped.is_multiple_of(120) {
|
|
|
|
|
debug!(
|
|
|
|
|
target:"lesavka_server::video",
|
|
|
|
|
eye = %eye,
|
|
|
|
|
dropped,
|
|
|
|
|
"🎥⏳ channel full - dropping frames"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Err(error) => error!("mpsc send err: {error}"),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(gst::FlowSuccess::Ok)
|
|
|
|
|
})
|
|
|
|
|
.build(),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let bus = pipeline.bus().expect("bus");
|
|
|
|
|
start_eye_pipeline(&pipeline, &bus, eye)?;
|
|
|
|
|
let bus_watch = BusWatchHandle::spawn(bus, eye.to_owned());
|
|
|
|
|
|
|
|
|
|
Ok(VideoStream {
|
|
|
|
|
_pipeline: pipeline,
|
|
|
|
|
_bus_watch: Some(bus_watch),
|
|
|
|
|
inner: ReceiverStream::new(rx),
|
|
|
|
|
})
|
|
|
|
|
}
|