From 8da4f36dc35a80dcb2d94aed26cd91be2a72bb85 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 27 Jun 2025 22:51:50 -0500 Subject: [PATCH] Video Fix & Script Updates --- client/src/app.rs | 4 +- client/src/output/video.rs | 41 +++++----- scripts/{ => daemon}/lesavka-core.sh | 0 .../{install-client.sh => install/client.sh} | 2 +- .../{install-server.sh => install/server.sh} | 8 +- scripts/manual/usb-reset.sh | 7 ++ scripts/manual/video-check.sh | 8 ++ server/src/main.rs | 4 +- server/src/video.rs | 81 ++++++++----------- 9 files changed, 81 insertions(+), 74 deletions(-) rename scripts/{ => daemon}/lesavka-core.sh (100%) rename scripts/{install-client.sh => install/client.sh} (94%) rename scripts/{install-server.sh => install/server.sh} (92%) create mode 100644 scripts/manual/usb-reset.sh create mode 100644 scripts/manual/video-check.sh diff --git a/client/src/app.rs b/client/src/app.rs index f8a4854..59bbb68 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -52,8 +52,8 @@ impl LesavkaClientApp { .connect_lazy(); let vid_ep = Channel::from_shared(self.server_addr.clone())? - .initial_connection_window_size(2<<20) - .initial_stream_window_size(2<<20) + .initial_connection_window_size(4<<20) + .initial_stream_window_size(4<<20) .tcp_nodelay(true) .connect_lazy(); diff --git a/client/src/output/video.rs b/client/src/output/video.rs index 6a6a6d3..f6868bd 100644 --- a/client/src/output/video.rs +++ b/client/src/output/video.rs @@ -23,19 +23,20 @@ pub struct MonitorWindow { impl MonitorWindow { pub fn new(id: u32, el: &EventLoop<()>) -> anyhow::Result { - gst::init()?; // idempotent + gst::init()?; // idempotent - /*────────────────────── window ──────────────────────*/ - let window = el.create_window( - WindowAttributes::default() - .with_title(format!("Lesavka‑monitor‑{id}")) - .with_decorations(false), - )?; + /* ---------- Wayland / X11 window ------------- */ + let window = el + .create_window( + WindowAttributes::default() + .with_title(format!("Lesavka‑monitor‑{id}")) + .with_decorations(true), + )?; - /*────────────────────── pipeline ────────────────────*/ + /* ---------- GStreamer pipeline --------------- */ let caps = gst::Caps::builder("video/x-h264") .field("stream-format", &"byte-stream") - .field("alignment", &"au") + .field("alignment", &"au") .build(); let pipeline = gst::parse::launch(DESC)? @@ -44,27 +45,29 @@ impl MonitorWindow { let src = pipeline .by_name("src") - .expect("appsink") + .expect("appsrc element not found") .downcast::() - .expect("appsink down‑cast"); + .expect("appsrc down‑cast"); src.set_caps(Some(&caps)); - src.set_format(gst::Format::Time); // running‑time PTS - src.set_property("blocksize", &0u32); // whole AU per buffer - src.set_latency(gst::ClockTime::NONE, gst::ClockTime::NONE); + src.set_format(gst::Format::Time); // downstream clock + src.set_property("blocksize", &0u32); // one AU per buffer + // NOTE: set_property() and friends return (), so no `?` + src.set_property("do-timestamp", &true); + src.set_latency(gst::ClockTime::NONE, gst::ClockTime::NONE); pipeline.set_state(gst::State::Playing)?; Ok(Self { id, _window: window, src }) } - /// Push one encoded access‑unit into the local pipeline. + /// Feed one H.264 access‑unit into the pipeline. pub fn push_packet(&self, pkt: VideoPacket) { - let buf = gst::Buffer::from_slice(pkt.data); // no PTS manipulation - if let Some(mut b) = buf.get_mut() { + // Mutable so we can set the PTS: + let mut buf = gst::Buffer::from_slice(pkt.data); + if let Some(ref mut b) = buf.get_mut() { b.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); } - let _ = self.src.push_buffer(buf); + let _ = self.src.push_buffer(buf); // ignore Eos / flushing } - } diff --git a/scripts/lesavka-core.sh b/scripts/daemon/lesavka-core.sh similarity index 100% rename from scripts/lesavka-core.sh rename to scripts/daemon/lesavka-core.sh diff --git a/scripts/install-client.sh b/scripts/install/client.sh similarity index 94% rename from scripts/install-client.sh rename to scripts/install/client.sh index 739d55c..2484a42 100755 --- a/scripts/install-client.sh +++ b/scripts/install/client.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# install-client.sh - install and setup all client related apps and environments +# scripts/install/client.sh - install and setup all client related apps and environments set -euo pipefail ORIG_USER=${SUDO_USER:-$(id -un)} diff --git a/scripts/install-server.sh b/scripts/install/server.sh similarity index 92% rename from scripts/install-server.sh rename to scripts/install/server.sh index b258bd3..ae991d8 100755 --- a/scripts/install-server.sh +++ b/scripts/install/server.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# install-server.sh - install and setup all server related apps and environments +# scripts/install/server.sh - install and setup all server related apps and environments set -euo pipefail ORIG_USER=${SUDO_USER:-$(id -un)} @@ -45,7 +45,7 @@ LEFT_TAG=${TAGS[0]} RIGHT_TAG=${TAGS[1]} sudo tee /etc/udev/rules.d/85-gc311.rules >/dev/null < 5. Install binaries" sudo install -Dm755 "$SRC_DIR/server/target/release/lesavka-server" /usr/local/bin/lesavka-server -sudo install -Dm755 "$SRC_DIR/scripts/lesavka-core.sh" /usr/local/bin/lesavka-core.sh +sudo install -Dm755 "$SRC_DIR/scripts/daemon/lesavka-core.sh" /usr/local/bin/lesavka-core.sh echo "==> 6a. Systemd units - lesavka-core" cat <<'UNIT' | sudo tee /etc/systemd/system/lesavka-core.service >/dev/null @@ -105,7 +105,7 @@ After=network.target lesavka-core.service [Service] ExecStart=/usr/local/bin/lesavka-server Restart=always -Environment=RUST_LOG=lesavka_server=debug,lesavka_server::usb_gadget=info +Environment=RUST_LOG=lesavka_server=debug,lesavka_server::video=trace,lesavka_server::usb_gadget=info Environment=RUST_BACKTRACE=1 Restart=always RestartSec=5 diff --git a/scripts/manual/usb-reset.sh b/scripts/manual/usb-reset.sh new file mode 100644 index 0000000..dc305aa --- /dev/null +++ b/scripts/manual/usb-reset.sh @@ -0,0 +1,7 @@ +grpcurl \ + -plaintext \ + -import-path ./../../common/proto \ + -proto lesavka.proto \ + -d '{}' \ + 192.168.42.253:50051 \ + lesavka.Relay/ResetUsb diff --git a/scripts/manual/video-check.sh b/scripts/manual/video-check.sh new file mode 100644 index 0000000..9dcc60e --- /dev/null +++ b/scripts/manual/video-check.sh @@ -0,0 +1,8 @@ +grpcurl -plaintext \ + -d '{"id":0,"max_bitrate":6000}' \ + -import-path ./../../common/proto -proto lesavka.proto \ + 192.168.42.253:50051 \ + lesavka.relay.Relay/CaptureVideo \ +| jq -r '.data' +| base64 -d \ +| gst-launch-1.0 fdsrc ! h264parse ! avdec_h264 ! autovideosink diff --git a/server/src/main.rs b/server/src/main.rs index 8fb95ac..b0c0c2c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -159,7 +159,7 @@ impl Relay for Handler { _ => return Err(Status::invalid_argument("monitor id must be 0 or 1")), }; info!("🎥 streaming {dev}"); - let s = video::spawn_camera(dev, id, 6_000) + let s = video::eye_ball(dev, id, 6_000) .await .map_err(|e| Status::internal(format!("{e:#}")))?; Ok(Response::new(Box::pin(s))) @@ -197,7 +197,7 @@ async fn main() -> anyhow::Result<()> { info!("🌐 lesavka‑server listening on 0.0.0.0:50051"); Server::builder() .tcp_nodelay(true) - .max_frame_size(Some(256*1024)) + .max_frame_size(Some(2*1024*1024)) .add_service(RelayServer::new(handler)) .add_service(ReflBuilder::configure().build_v1().unwrap()) .serve(([0,0,0,0], 50051).into()) diff --git a/server/src/video.rs b/server/src/video.rs index 1b04939..90486dc 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -8,101 +8,90 @@ use gst::log; use lesavka_common::lesavka::VideoPacket; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; -use tracing::{debug, enabled, Level}; +use tracing::{debug, enabled, trace, Level}; static START: std::sync::OnceLock = std::sync::OnceLock::new(); -pub async fn spawn_camera( +pub async fn eye_ball( dev: &str, id: u32, _max_bitrate_kbit: u32, ) -> anyhow::Result>> { gst::init().context("gst init")?; - // IMPORTANT: keep one AU per buffer, include regular SPS/PPS let desc = format!( "v4l2src device={dev} io-mode=mmap ! \ video/x-h264,stream-format=byte-stream,alignment=au ! \ h264parse config-interval=1 ! \ appsink name=sink emit-signals=true drop=true sync=false" - ); + ); let pipeline = gst::parse::launch(&desc)? .downcast::() - .expect("pipeline down-cast"); + .expect("pipeline down‑cast"); let sink = pipeline .by_name("sink") .expect("appsink") .dynamic_cast::() - .expect("appsink downcast"); + .expect("appsink down‑cast"); let (tx, rx) = tokio::sync::mpsc::channel(256); sink.set_callbacks( gst_app::AppSinkCallbacks::builder() - // .new_sample(move |sink| { - // let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; - // let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; - - // let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; - // let pkt = VideoPacket { - // id, - // pts: buffer.pts().nseconds() / 1_000, // → µs - // data: map.as_slice().to_vec(), - // }; - // // ignore back‑pressure: drop oldest if channel is full - // let _ = tx.try_send(Ok(pkt)); - // Ok(gst::FlowSuccess::Ok) - // }) - // .build(), .new_sample(move |sink| { + /* -------- pull frame ---------- */ let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; - let origin = *START.get_or_init(|| buffer.pts().unwrap_or(gst::ClockTime::ZERO)); - let pts_us = buffer - .pts() - .unwrap_or(gst::ClockTime::ZERO) - .saturating_sub(origin) - .nseconds() / 1_000; + /* -------- basic counters ------ */ + static FRAME: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(0); + let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if n % 120 == 0 { + trace!("eye{id}: delivered {n} frames"); + } + /* -------- map once, reuse ----- */ let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + // write first IDR to disk (quick sanity check) + if n == 0 { + std::fs::write(format!("/tmp/eye{id}-idr.h264"), map.as_slice()).ok(); + } + + /* -------- detect SPS / IDR ---- */ if enabled!(Level::DEBUG) { - if let Some(slice) = map.as_slice().get(0..5) { - match slice[4] & 0b1_1111 { - // 0x07 = SPS, 0x05 = IDR (Annex-B “byte-stream”) - 0x07 | 0x05 => { - debug!( - "🎞️ monitor {id}: got NAL {:02X}, {} bytes", - slice[4], - map.as_slice().len() - ); - } - _ => {} + if let Some(&nal) = map.as_slice().get(4) { + if (nal & 0x1F) == 0x05 /* IDR */ { + debug!("eye{id}: IDR"); } } } + /* -------- timestamps ---------- */ + let origin = *START.get_or_init(|| buffer.pts().unwrap_or(gst::ClockTime::ZERO)); + let pts_us = buffer + .pts() + .unwrap_or(gst::ClockTime::ZERO) + .saturating_sub(origin) + .nseconds() + / 1_000; + + /* -------- ship over gRPC ----- */ let pkt = VideoPacket { id, pts: pts_us, data: map.as_slice().to_vec(), }; - let _ = tx.try_send(Ok(pkt)); // drop on overflow + let _ = tx.try_send(Ok(pkt)); + Ok(gst::FlowSuccess::Ok) }) .build(), ); - // gst::debug_remove_default_log_function(); - // gst::debug_add_default_log_function(|lvl, cat, msg| { - // println!("[GST] {lvl:?} {cat}: {msg}"); - // }); - // std::env::set_var("GST_DEBUG", "v4l2src:4,h264parse:3"); - pipeline.set_state(gst::State::Playing)?; - Ok(ReceiverStream::new(rx)) }