From d198fbf55c0432aebec9a883e567afbbc0865c3e Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sat, 28 Jun 2025 03:34:48 -0500 Subject: [PATCH] Video Fix --- client/src/app.rs | 35 ++++++++++++++++++++++++----- scripts/install/server.sh | 2 +- scripts/manual/video-frame-fetch.sh | 8 +++---- server/src/video.rs | 13 ++++++----- 4 files changed, 42 insertions(+), 16 deletions(-) diff --git a/client/src/app.rs b/client/src/app.rs index 5c12189..64f3e86 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -8,7 +8,7 @@ use tonic::{transport::Channel, Request}; use tracing::{error, info, warn}; use winit::{ event::Event, - event_loop::EventLoopBuilder, + event_loop::{EventLoopBuilder, ControlFlow}, platform::wayland::EventLoopBuilderExtWayland, }; @@ -83,17 +83,29 @@ impl LesavkaClientApp { let (video_tx, mut video_rx) = tokio::sync::mpsc::unbounded_channel::(); std::thread::spawn(move || { + tracing::info!("🎥 spawn move"); let el = EventLoopBuilder::<()>::new().with_any_thread(true).build().unwrap(); let win0 = MonitorWindow::new(0, &el).expect("win0"); let win1 = MonitorWindow::new(1, &el).expect("win1"); - let _ = el.run(move |_: Event<()>, _| { + let _ = el.run(move |_: Event<()>, _elwt| { + _elwt.set_control_flow(ControlFlow::WaitUntil( + std::time::Instant::now() + std::time::Duration::from_millis(16))); + tracing::info!("🎥 el run move"); static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); + static DUMP_CNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0); while let Ok(pkt) = video_rx.try_recv() { + tracing::info!("🎥 pkt recieved?"); CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if CNT.load(std::sync::atomic::Ordering::Relaxed) % 300 == 0 { tracing::debug!("🎥 received {} video packets", CNT.load(std::sync::atomic::Ordering::Relaxed)); } + let n = DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if n % 120 == 0 { + let eye = if pkt.id == 0 { "l" } else { "r" }; + let path = format!("/tmp/eye{eye}-cli-{n:05}.h264"); + std::fs::write(&path, &pkt.data).ok(); + } match pkt.id { 0 => win0.push_packet(pkt), 1 => win1.push_packet(pkt), @@ -184,12 +196,23 @@ impl LesavkaClientApp { let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 }; match cli.capture_video(Request::new(req)).await { Ok(mut stream) => { - while let Some(pkt) = stream.get_mut().message().await.transpose() { - match pkt { - Ok(p) => { let _ = tx.send(p); } - Err(e) => { error!("video {monitor_id}: {e}"); break; } + tracing::info!("cli video{monitor_id}: stream opened"); + while let Some(res) = stream.get_mut().message().await.transpose() { + match res { + Ok(pkt) => { + tracing::debug!("cli video{monitor_id}: got {} bytes", pkt.data.len()); + if tx.send(pkt).is_err() { + tracing::warn!("cli video{monitor_id}: GUI thread gone"); + break; + } + } + Err(e) => { + tracing::error!("cli video{monitor_id}: gRPC error: {e}"); + break; + } } } + tracing::warn!("cli video{monitor_id}: stream ended"); } Err(e) => error!("video {monitor_id}: {e}"), } diff --git a/scripts/install/server.sh b/scripts/install/server.sh index 0665851..84d43d2 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -105,7 +105,7 @@ After=network.target lesavka-core.service [Service] ExecStart=/usr/local/bin/lesavka-server Restart=always -Environment=RUST_LOG=lesavka_server=info,lesavka_server::video=trace,lesavka_server::usb_gadget=info +Environment=RUST_LOG=lesavka_server=info,lesavka_server::video=trace,lesavka_server::usb_gadget=infotonic=debug,h2=debug,lesavka_client::app=trace Environment=RUST_BACKTRACE=1 Restart=always RestartSec=5 diff --git a/scripts/manual/video-frame-fetch.sh b/scripts/manual/video-frame-fetch.sh index 27538da..24c9415 100755 --- a/scripts/manual/video-frame-fetch.sh +++ b/scripts/manual/video-frame-fetch.sh @@ -4,14 +4,14 @@ REMOTE_DIR="/tmp" # where eye*-idr.h264 are written set -eu WORKDIR="$(mktemp -d)" -echo "⏬ pulling IDR samples from $PI_HOST ..." -scp "${PI_HOST}:${REMOTE_DIR}/eye*-idr.h264" "$WORKDIR/" +echo "⏬ pulling h264 samples from $PI_HOST ..." +scp "${PI_HOST}:${REMOTE_DIR}/eye*.h264" "$WORKDIR/" echo "🎞️ converting to PNG ..." -for h264 in "$WORKDIR"/eye*-idr.h264; do +for h264 in "$WORKDIR"/eye*.h264; do png="${h264%.h264}.png" ffmpeg -loglevel error -y -f h264 -i "$h264" -frames:v 1 "$png" echo "🖼️ $(basename "$png") ready" xdg-open "$png" >/dev/null 2>&1 & done -echo "✅ done – images are opening (directory: $WORKDIR)" +echo "✅ done - images are opening (directory: $WORKDIR)" diff --git a/server/src/video.rs b/server/src/video.rs index 484fa07..e55d655 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -10,6 +10,7 @@ use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::{debug, enabled, trace, Level}; +const EYE_ID: [&str; 2] = ["l", "r"]; static START: std::sync::OnceLock = std::sync::OnceLock::new(); pub async fn eye_ball( @@ -17,6 +18,7 @@ pub async fn eye_ball( id: u32, _max_bitrate_kbit: u32, ) -> anyhow::Result>> { + let eye = EYE_ID[id as usize]; gst::init().context("gst init")?; let desc = format!( @@ -53,10 +55,10 @@ pub async fn eye_ball( std::sync::atomic::AtomicU64::new(0); let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n % 120 == 0 { - // trace!("eye{id}: delivered {n} frames"); - trace!(target: "lesavka_server::video", "eye{id}: delivered {n} frames"); + // trace!("eye{eye}: delivered {n} frames"); + trace!(target: "lesavka_server::video", "eye-{eye}: delivered {n} frames"); if enabled!(Level::TRACE) { - let path = format!("/tmp/eye{id}-srv-{:05}.h264", n); + let path = format!("/tmp/eye-{eye}-srv-{:05}.h264", n); std::fs::write(&path, map.as_slice()).ok(); } } @@ -64,14 +66,14 @@ pub async fn eye_ball( // write first IDR to disk (quick sanity check) if n == 0 { // let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; - std::fs::write(format!("/tmp/eye{id}-idr.h264"), map.as_slice()).ok(); + std::fs::write(format!("/tmp/eye-{eye}-idr.h264"), map.as_slice()).ok(); } /* -------- detect SPS / IDR ---- */ if enabled!(Level::DEBUG) { if let Some(&nal) = map.as_slice().get(4) { if (nal & 0x1F) == 0x05 /* IDR */ { - debug!("eye{id}: IDR"); + debug!("eye-{eye}: IDR"); } } } @@ -91,6 +93,7 @@ pub async fn eye_ball( pts: pts_us, data: map.as_slice().to_vec(), }; + tracing::trace!("srv→grpc eye-{eye} {} bytes pts={}", pkt.data.len(), pkt.pts); let _ = tx.try_send(Ok(pkt)); Ok(gst::FlowSuccess::Ok)