diff --git a/client/Cargo.toml b/client/Cargo.toml index 4412f80..e33d8cc 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -18,6 +18,12 @@ tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } tracing-appender = "0.2" futures = "0.3" evdev = "0.13" +gstreamer = { version = "0.23", features = ["v1_22"] } +gstreamer-app = "0.23" +gstreamer-video = "0.23" +gstreamer-wayland = "0.23" +winit = "0.30" +raw-window-handle = "0.6" [build-dependencies] prost-build = "0.13" diff --git a/client/src/app.rs b/client/src/app.rs index 595c7c5..506399f 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -9,9 +9,10 @@ use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use tonic::Request; use tracing::{debug, error, info, warn}; -use navka_common::navka::{relay_client::RelayClient, KeyboardReport, MouseReport}; +use navka_common::navka::{relay_client::RelayClient, KeyboardReport, MouseReport, MonitorRequest, VideoPacket}; -use crate::input::inputs::InputAggregator; +use navka_client::input::inputs::InputAggregator; +use navka_client::output::video::MonitorWindow; pub struct NavkaClientApp { aggregator: Option, @@ -63,9 +64,33 @@ impl NavkaClientApp { } else { futures::future::pending::<()>().await } }; + /* video windows use a dedicated event‑loop thread */ + let (video_tx, mut video_rx) = tokio::sync::mpsc::unbounded_channel::(); + let (event_tx, event_rx) = std::sync::mpsc::channel(); + + std::thread::spawn(move || { + let el = EventLoop::with_user_event(); + let win0 = MonitorWindow::new(0, &el).expect("win0"); + let win1 = MonitorWindow::new(1, &el).expect("win1"); + + el.run(move |_, _, _| { + while let Ok(pkt) = video_rx.try_recv() { + match pkt.id { + 0 => win0.push_packet(pkt), + 1 => win1.push_packet(pkt), + _ => {} + } + } + }); + // never returns + }); + + let vid_loop = Self::video_loop(self.server_addr.clone(), video_tx); + tokio::select! { _ = kbd_loop => unreachable!(), _ = mou_loop => unreachable!(), + _ = vid_loop => unreachable!(), _ = suicide => unreachable!(), // _ = suicide => { warn!("dev‑mode timeout"); std::process::exit(0) }, r = agg_task => { @@ -127,6 +152,32 @@ impl NavkaClientApp { } } + /*──────────────── monitor stream ────────────────*/ + async fn video_loop(addr: String, tx: tokio::sync::mpsc::UnboundedSender) { + loop { + match RelayClient::connect(addr.clone()).await { + Ok(mut cli) => { + for monitor_id in 0..=1 { + let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 }; + match cli.capture_video(Request::new(req)).await { + Ok(mut stream) => { + while let Some(pkt) = stream.get_mut().message().await.transpose() { + match pkt { + Ok(p) => let _ = tx.send(p), + Err(e) => { error!("video {monitor_id}: {e}"); break } + } + } + } + Err(e) => error!("video {monitor_id}: {e}"), + } + } + } + Err(e) => error!("video connect: {e}"), + } + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + #[inline(always)] async fn delay() { tokio::time::sleep(Duration::from_secs(1)).await; } } diff --git a/client/src/lib.rs b/client/src/lib.rs index 75d23fa..b87b645 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -4,5 +4,6 @@ pub mod app; pub mod input; +pub mod output; pub use app::NavkaClientApp; diff --git a/client/src/main.rs b/client/src/main.rs index 67f4aef..24c13f1 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -3,12 +3,13 @@ #![forbid(unsafe_code)] use anyhow::Result; -use navka_client::NavkaClientApp; use std::{env, fs::OpenOptions, path::Path}; use tracing_appender::non_blocking; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; +use navka_client::NavkaClientApp; + #[tokio::main] async fn main() -> Result<()> { /*------------- common filter & stderr layer ------------------------*/ diff --git a/client/src/output/mod.rs b/client/src/output/mod.rs new file mode 100644 index 0000000..f244f31 --- /dev/null +++ b/client/src/output/mod.rs @@ -0,0 +1,3 @@ +// client/src/output/mod.rs + +pub mod video; \ No newline at end of file diff --git a/client/src/output/video.rs b/client/src/output/video.rs new file mode 100644 index 0000000..13b0398 --- /dev/null +++ b/client/src/output/video.rs @@ -0,0 +1,49 @@ +// client/src/output/video.rs + +use gstreamer as gst; +use gstreamer_app as gst_app; +use gst::prelude::*; +use navka_common::navka::VideoPacket; +use winit::{ + event_loop::EventLoop, + window::{Window, WindowBuilder}, +}; + +pub struct MonitorWindow { + id: u32, + _window: Window, + src: gst_app::AppSrc, +} + +impl MonitorWindow { + pub fn new(id: u32, el: &EventLoop<()>) -> anyhow::Result { + gst::init()?; + + let window = WindowBuilder::new() + .with_title(format!("Lesavka‑monitor‑{id}")) + .with_decorations(false) + .build(el)?; + + // appsrc -> decode -> convert -> waylandsink + let pipeline = gst::parse_launch( + "appsrc name=src is-live=true format=time do-timestamp=true ! \ + queue ! h264parse ! avdec_h264 ! videoconvert ! \ + waylandsink name=sink sync=false", + )? + .downcast::()?; + + let src = pipeline.by_name("src").unwrap().downcast::()?; + src.set_latency(gst::ClockTime::NONE); + src.set_property_format(gst::Format::Time); + + pipeline.set_state(gst::State::Playing)?; + Ok(Self { id, _window: window, src }) + } + + pub fn push_packet(&self, pkt: VideoPacket) { + if let Ok(mut buf) = gst::Buffer::from_mut_slice(pkt.data) { + buf.set_pts(gst::ClockTime::from_microseconds(pkt.pts)); + let _ = self.src.push_buffer(buf); + } + } +} diff --git a/common/Cargo.toml b/common/Cargo.toml index a5f7dd4..3347adb 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,19 +1,15 @@ -[[bin]] -name = "navka-common" -path = "build.rs" - [package] -name = "navka_common" -version = "0.1.0" +name = "navka-common" +version = "0.2.0" edition = "2024" build = "build.rs" [dependencies] -tonic = "0.13" +tonic = { version = "0.13", features = ["transport"] } prost = "0.13" [build-dependencies] -tonic-build = "0.13" +tonic-build = { version = "0.13", features = ["prost"] } [lib] name = "navka_common" diff --git a/common/build.rs b/common/build.rs index 33d28f6..aa9d918 100644 --- a/common/build.rs +++ b/common/build.rs @@ -1,3 +1,9 @@ + + fn main() { - tonic_build::compile_protos("proto/navka.proto").unwrap(); + tonic_build::configure() + .build_server(true) + .build_client(true) + .compile_protos(&["proto/navka.proto"], &["proto"]) + .expect("prost build failed"); } diff --git a/common/proto/navka.proto b/common/proto/navka.proto index 8ce78c4..0f30f88 100644 --- a/common/proto/navka.proto +++ b/common/proto/navka.proto @@ -1,11 +1,26 @@ syntax = "proto3"; package navka; -// smaller, fixed-size payloads ⇒ less allocation and simpler decoding +// smaller, fixed-size payloads -> less allocation and simpler decoding message KeyboardReport { bytes data = 1; } // exactly 8 bytes message MouseReport { bytes data = 1; } // exactly 4 bytes +// ------------ video ------------ +message MonitorRequest { + uint32 id = 1; // 0/1 for now + uint32 max_bitrate = 2; // kb/s – client hints, server may ignore +} + +message VideoPacket { + uint32 id = 1; // monitor id + uint64 pts = 2; // monotonically increasing micro‑seconds + bytes data = 3; // full H.264 access‑unit (length‑prefixed) +} + service Relay { rpc StreamKeyboard (stream KeyboardReport) returns (stream KeyboardReport); rpc StreamMouse (stream MouseReport) returns (stream MouseReport); -} \ No newline at end of file + + // client requests one monitor, server pushes raw H.264 + rpc CaptureVideo (MonitorRequest) returns (stream VideoPacket); +} diff --git a/common/src/lib.rs b/common/src/lib.rs index d024a88..75f2691 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,4 +1,6 @@ // Re-export the code generated by build.rs (navka.rs, relay.rs, etc.) +// common/src/lib.rs + pub mod navka { include!(concat!(env!("OUT_DIR"), "/navka.rs")); } diff --git a/server/Cargo.toml b/server/Cargo.toml index 143c739..41e434d 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -17,3 +17,7 @@ tracing = { version = "0.1", features = ["std"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } libc = "0.2" futures-util = "0.3" +gstreamer = { version = "0.23", features = ["v1_22"] } +gstreamer-app = "0.23" +gstreamer-video = "0.23" +udev = "0.8" \ No newline at end of file diff --git a/server/src/lib.rs b/server/src/lib.rs new file mode 100644 index 0000000..d11ab4f --- /dev/null +++ b/server/src/lib.rs @@ -0,0 +1,4 @@ +// server/src/lib.rs + +pub mod video; +pub mod usb_reset; \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index 55e0c22..9b11bcb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -2,20 +2,61 @@ // sever/src/main.rs #![forbid(unsafe_code)] -use std::{io::ErrorKind, pin::Pin, sync::Arc, panic::AssertUnwindSafe}; -use std::time::Duration; +use anyhow::Context; +use futures_util::Stream; +use std::{pin::Pin, sync::Arc, time::Duration}; use tokio::{fs::{File, OpenOptions}, io::AsyncWriteExt, sync::Mutex}; -use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; +use tokio_stream::{wrappers::ReceiverStream}; use tonic::{transport::Server, Request, Response, Status}; -use tracing::{error, info, trace, warn, debug}; +use tracing::{error, info, trace, warn}; use tracing_subscriber::{fmt, EnvFilter}; -use futures_util::FutureExt; +use udev::{Enumerator, MonitorBuilder}; + +use navka_server::{video, usb_reset}; use navka_common::navka::{ relay_server::{Relay, RelayServer}, KeyboardReport, MouseReport, + MonitorRequest, VideoPacket, }; +/*─────────────────── GC311 discovery ───────────────────*/ +fn list_gc311_devices() -> anyhow::Result> { + let mut v = Vec::new(); + for entry in std::fs::read_dir("/sys/class/video4linux")? { + let path = entry?.path(); + let name = std::fs::read_to_string(path.join("name"))?; + if name.to_lowercase().contains("gc311") { + v.push( + path.file_name() + .unwrap() + .to_string_lossy() + .replace("video", "/dev/video"), + ); + } + } + v.sort(); + Ok(v) +} + +/// background task: whenever GC311 disappears, cycle USB port +async fn monitor_gc311_disconnect() -> anyhow::Result<()> { + let mut mon = MonitorBuilder::new()? + .match_subsystem("usb")? + .match_property("PRODUCT", "7ca/3311/*")? // vendor: 0x07ca, device 0x3311 + .listen()?; + + while let Some(ev) = mon.next() { + if ev.event_type() == udev::EventType::Remove { + if let (Some(bus), Some(dev)) = (ev.attribute_value("busnum"), ev.attribute_value("devnum")) { + usb_reset::cycle_port(bus.to_str().unwrap(), dev.to_str().unwrap()); + } + } + } + Ok(()) +} + +/*─────────────────── tonic service ─────────────────────*/ struct Handler { kb: Arc>, ms: Arc>, @@ -25,6 +66,7 @@ struct Handler { impl Relay for Handler { type StreamKeyboardStream = ReceiverStream>; type StreamMouseStream = ReceiverStream>; + type CaptureVideoStream = Pin> + Send + Sync + 'static>>; async fn stream_keyboard( &self, @@ -76,39 +118,51 @@ impl Relay for Handler { Ok(Response::new(ReceiverStream::new(rx))) } + + async fn capture_video( + &self, + req: Request, + ) -> Result, Status> { + let r = req.into_inner(); + + let devs = list_gc311_devices() + .map_err(|e| Status::internal(format!("enum v4l2: {e}")))?; + + let dev = devs + .get(r.id as usize) + .ok_or_else(|| Status::invalid_argument(format!("monitor id {} absent", r.id)))? + .to_owned(); + + info!("🎥 streaming {dev} at ≤{} kb/s", r.max_bitrate); + + let s = video::spawn_camera(&dev, r.id, r.max_bitrate) + .await + .map_err(|e| Status::internal(format!("{e:#?}")))?; + + Ok(Response::new(Box::pin(s) as _)) + } } -#[tokio::main] +/*─────────────────── main ──────────────────────────────*/ +#[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() -> anyhow::Result<()> { + /* logging */ fmt().with_env_filter( - // honour RUST_LOG but fall back to very chatty defaults - EnvFilter::try_from_default_env().unwrap_or_else(|_| //{ - EnvFilter::new( - // "navka_client=trace,\ - // navka_server=trace,\ - // tonic=debug,\ - // h2=debug,\ - // tower=debug", - "navka_server=info" - ) - //} - ), + EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new("navka_server=info")), ) - // .with_target(true) - // .with_thread_ids(true) - // .with_file(true) .init(); + /* auto‑cycle task */ + tokio::spawn(async { monitor_gc311_disconnect().await.ok(); }); + let kb = OpenOptions::new() .write(true) - // .read(true) - // .custom_flags(libc::O_NONBLOCK) .open("/dev/hidg0") .await?; let ms = OpenOptions::new() .write(true) - // .read(true) .custom_flags(libc::O_NONBLOCK) .open("/dev/hidg1") .await?; diff --git a/server/src/usb_reset.rs b/server/src/usb_reset.rs new file mode 100644 index 0000000..b941dd4 --- /dev/null +++ b/server/src/usb_reset.rs @@ -0,0 +1,21 @@ +// server/src/usb_reset.rs +//! Helpers to (re‑)power GC311 if udev reports a disconnect. + +use std::process::Command; +use tracing::{info, warn}; + +/// Try to cycle power on the USB port where the GC311 was. +/// Works only when the Pi is behind a hub that supports per‑port power control. +/// Uses `uhubctl`, which must be `apt install uhubctl`. +pub fn cycle_port(busnum: &str, devnum: &str) { + warn!("GC311 disappeared ({}:{}), cycling port power", busnum, devnum); + // example: uhubctl -l 1-1 -p 2 -a cycle + // mapping port# requires lsusb -t; we fall back to a generic bus reset + let _ = Command::new("sh") + .arg("-c") + .arg(format!("echo 0 | sudo tee /sys/bus/usb/devices/{busnum}-{devnum}/authorized; \ + sleep 1; \ + echo 1 | sudo tee /sys/bus/usb/devices/{busnum}-{devnum}/authorized")) + .status(); + info!("port cycle issued"); +} diff --git a/server/src/video.rs b/server/src/video.rs new file mode 100644 index 0000000..d690e1a --- /dev/null +++ b/server/src/video.rs @@ -0,0 +1,56 @@ +// server/src/video.rs + +use anyhow::Context; +use gstreamer as gst; +use gstreamer_app as gst_app; +use gst::prelude::*; +use navka_common::navka::VideoPacket; +use tokio_stream::wrappers::ReceiverStream; +use tonic::Status; + +pub async fn spawn_camera( + dev: &str, + id: u32, + max_bitrate_kbit: u32, +) -> anyhow::Result>> { + gst::init().context("gst init")?; + + // v4l2src → H.264 already, we only parse & relay + let desc = format!( + "v4l2src device={dev} io-mode=dmabuf ! queue ! h264parse config-interval=1 ! \ + video/x-h264,stream-format=byte-stream,profile=baseline,level=4,\ + bitrate={max_bitrate_kbit}000 ! appsink name=sink emit-signals=true sync=false" + ); + let pipeline = gst::parse_launch(&desc)?.downcast::()?; + + let sink = pipeline + .by_name("sink") + .expect("appsink") + .dynamic_cast::() + .expect("appsink downcast"); + + let (tx, rx) = tokio::sync::mpsc::channel(256); + + sink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample(move |sink| { + let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; + let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; + + let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + let pkt = VideoPacket { + id, + pts: buffer.pts().nseconds() / 1_000, // → µs + data: map.as_slice().to_vec(), + }; + // ignore back‑pressure: drop oldest if channel is full + let _ = tx.try_send(Ok(pkt)); + Ok(gst::FlowSuccess::Ok) + }) + .build(), + ); + + pipeline.set_state(gst::State::Playing)?; + + Ok(ReceiverStream::new(rx)) +}