monitor support - partial

Brad Stein 2025-06-21 05:21:57 -05:00
parent ddb9ed5871
commit 208e7e4447
15 changed files with 307 additions and 38 deletions

View File

@ -18,6 +18,12 @@ tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
tracing-appender = "0.2"
futures = "0.3"
evdev = "0.13"
gstreamer = { version = "0.23", features = ["v1_22"] }
gstreamer-app = "0.23"
gstreamer-video = "0.23"
gstreamer-wayland = "0.23"
winit = "0.30"
raw-window-handle = "0.6"
[build-dependencies]
prost-build = "0.13"

View File

@ -9,9 +9,10 @@ use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use tonic::Request;
use tracing::{debug, error, info, warn};
use navka_common::navka::{relay_client::RelayClient, KeyboardReport, MouseReport};
use navka_common::navka::{relay_client::RelayClient, KeyboardReport, MouseReport, MonitorRequest, VideoPacket};
use crate::input::inputs::InputAggregator;
use navka_client::input::inputs::InputAggregator;
use navka_client::output::video::MonitorWindow;
use winit::event_loop::EventLoop;
pub struct NavkaClientApp {
aggregator: Option<InputAggregator>,
@ -63,9 +64,33 @@ impl NavkaClientApp {
} else { futures::future::pending::<()>().await }
};
/* video windows use a dedicated eventloop thread */
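// unbounded channel carrying encoded video packets from the gRPC task (tokio side) to the winit thread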
let (video_tx, mut video_rx) = tokio::sync::mpsc::unbounded_channel::<VideoPacket>();
let (event_tx, event_rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
// NOTE: winit normally wants the event loop on the main thread; running it on a
// helper thread on Linux needs the platform `with_any_thread(true)` builder extension.
let el = EventLoop::new().expect("event loop");
let win0 = MonitorWindow::new(0, &el).expect("win0");
let win1 = MonitorWindow::new(1, &el).expect("win1");
// winit 0.30: the closure-based `run` takes (event, active_event_loop) and returns a Result
let _ = el.run(move |_event, _active| {
while let Ok(pkt) = video_rx.try_recv() {
match pkt.id {
0 => win0.push_packet(pkt),
1 => win1.push_packet(pkt),
_ => {}
}
}
});
// never returns
});
let vid_loop = Self::video_loop(self.server_addr.clone(), video_tx);
tokio::select! {
_ = kbd_loop => unreachable!(),
_ = mou_loop => unreachable!(),
_ = vid_loop => unreachable!(),
_ = suicide => unreachable!(),
// _ = suicide => { warn!("devmode timeout"); std::process::exit(0) },
r = agg_task => {
@ -127,6 +152,32 @@ impl NavkaClientApp {
}
}
/*──────────────── monitor stream ────────────────*/
async fn video_loop(addr: String, tx: tokio::sync::mpsc::UnboundedSender<VideoPacket>) {
loop {
match RelayClient::connect(addr.clone()).await {
Ok(mut cli) => {
for monitor_id in 0..=1 {
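// NOTE: streams are consumed sequentially here; monitor 1 only starts once monitor 0's stream ends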
let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 };
match cli.capture_video(Request::new(req)).await {
Ok(resp) => {
let mut stream = resp.into_inner();
while let Some(pkt) = stream.message().await.transpose() {
match pkt {
Ok(p) => { let _ = tx.send(p); }
Err(e) => { error!("video {monitor_id}: {e}"); break }
}
}
}
Err(e) => error!("video {monitor_id}: {e}"),
}
}
}
Err(e) => error!("video connect: {e}"),
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
#[inline(always)]
async fn delay() { tokio::time::sleep(Duration::from_secs(1)).await; }
}

View File

@ -4,5 +4,6 @@
pub mod app;
pub mod input;
pub mod output;
pub use app::NavkaClientApp;

View File

@ -3,12 +3,13 @@
#![forbid(unsafe_code)]
use anyhow::Result;
use navka_client::NavkaClientApp;
use std::{env, fs::OpenOptions, path::Path};
use tracing_appender::non_blocking;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use navka_client::NavkaClientApp;
#[tokio::main]
async fn main() -> Result<()> {
/*------------- common filter & stderr layer ------------------------*/

3
client/src/output/mod.rs Normal file
View File

@ -0,0 +1,3 @@
// client/src/output/mod.rs
pub mod video;

View File

@ -0,0 +1,49 @@
// client/src/output/video.rs
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
use navka_common::navka::VideoPacket;
use winit::{
event_loop::EventLoop,
window::{Window, WindowBuilder},
};
pub struct MonitorWindow {
id: u32,
_window: Window,
src: gst_app::AppSrc,
}
impl MonitorWindow {
pub fn new(id: u32, el: &EventLoop<()>) -> anyhow::Result<Self> {
gst::init()?;
let window = WindowBuilder::new()
.with_title(format!("Lesavkamonitor{id}"))
.with_decorations(false)
.build(el)?;
// appsrc -> decode -> convert -> waylandsink
let pipeline = gst::parse::launch(
"appsrc name=src is-live=true format=time do-timestamp=true ! \
queue ! h264parse ! avdec_h264 ! videoconvert ! \
waylandsink name=sink sync=false",
)?
.downcast::<gst::Pipeline>()
.map_err(|_| anyhow::anyhow!("expected a Pipeline"))?;
let src = pipeline
.by_name("src")
.expect("appsrc element")
.downcast::<gst_app::AppSrc>()
.map_err(|_| anyhow::anyhow!("appsrc downcast failed"))?;
// leave min/max latency unset and keep the time format already set in the launch string
src.set_latency(gst::ClockTime::NONE, gst::ClockTime::NONE);
src.set_format(gst::Format::Time);
pipeline.set_state(gst::State::Playing)?;
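// TODO: waylandsink currently renders into its own surface; it is not yet
// embedded in the winit window (the raw-window-handle / gstreamer-wayland
// wiring is still missing), hence the "partial" monitor support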
Ok(Self { id, _window: window, src })
}
pub fn push_packet(&self, pkt: VideoPacket) {
// Buffer::from_mut_slice takes ownership of the Vec; a freshly created buffer
// has a single owner, so get_mut() below always succeeds
let mut buf = gst::Buffer::from_mut_slice(pkt.data);
buf.get_mut()
.expect("new buffer is writable")
.set_pts(gst::ClockTime::from_useconds(pkt.pts));
let _ = self.src.push_buffer(buf);
}
}

View File

@ -1,19 +1,15 @@
[[bin]]
name = "navka-common"
path = "build.rs"
[package]
name = "navka_common"
version = "0.1.0"
name = "navka-common"
version = "0.2.0"
edition = "2024"
build = "build.rs"
[dependencies]
tonic = "0.13"
tonic = { version = "0.13", features = ["transport"] }
prost = "0.13"
[build-dependencies]
tonic-build = "0.13"
tonic-build = { version = "0.13", features = ["prost"] }
[lib]
name = "navka_common"

View File

@ -1,3 +1,9 @@
fn main() {
tonic_build::compile_protos("proto/navka.proto").unwrap();
tonic_build::configure()
.build_server(true)
.build_client(true)
.compile_protos(&["proto/navka.proto"], &["proto"])
.expect("prost build failed");
}

View File

@ -1,11 +1,26 @@
syntax = "proto3";
package navka;
// smaller, fixed-size payloads less allocation and simpler decoding
// smaller, fixed-size payloads -> less allocation and simpler decoding
message KeyboardReport { bytes data = 1; } // exactly 8 bytes
message MouseReport { bytes data = 1; } // exactly 4 bytes
// ------------ video ------------
message MonitorRequest {
uint32 id = 1; // 0/1 for now
uint32 max_bitrate = 2; // kbit/s; a client hint the server may ignore
}
message VideoPacket {
uint32 id = 1; // monitor id
uint64 pts = 2; // monotonically increasing microseconds
bytes data = 3; // full H.264 access unit (length-prefixed)
}
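// all packets of one CaptureVideo stream share the same id and arrive in pts order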
service Relay {
rpc StreamKeyboard (stream KeyboardReport) returns (stream KeyboardReport);
rpc StreamMouse (stream MouseReport) returns (stream MouseReport);
}
// client requests one monitor, server pushes raw H.264
rpc CaptureVideo (MonitorRequest) returns (stream VideoPacket);
}

View File

@ -1,4 +1,6 @@
// Re-export the code generated by build.rs (navka.rs, relay.rs, etc.)
// common/src/lib.rs
pub mod navka {
include!(concat!(env!("OUT_DIR"), "/navka.rs"));
}

View File

@ -17,3 +17,7 @@ tracing = { version = "0.1", features = ["std"] }
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
libc = "0.2"
futures-util = "0.3"
gstreamer = { version = "0.23", features = ["v1_22"] }
gstreamer-app = "0.23"
gstreamer-video = "0.23"
udev = "0.8"

4
server/src/lib.rs Normal file
View File

@ -0,0 +1,4 @@
// server/src/lib.rs
pub mod video;
pub mod usb_reset;

View File

@ -2,20 +2,61 @@
// server/src/main.rs
#![forbid(unsafe_code)]
use std::{io::ErrorKind, pin::Pin, sync::Arc, panic::AssertUnwindSafe};
use std::time::Duration;
use anyhow::Context;
use futures_util::Stream;
use std::{pin::Pin, sync::Arc, time::Duration};
use tokio::{fs::{File, OpenOptions}, io::AsyncWriteExt, sync::Mutex};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tokio_stream::{wrappers::ReceiverStream};
use tonic::{transport::Server, Request, Response, Status};
use tracing::{error, info, trace, warn, debug};
use tracing::{error, info, trace, warn};
use tracing_subscriber::{fmt, EnvFilter};
use futures_util::FutureExt;
use udev::{Enumerator, MonitorBuilder};
use navka_server::{video, usb_reset};
use navka_common::navka::{
relay_server::{Relay, RelayServer},
KeyboardReport, MouseReport,
MonitorRequest, VideoPacket,
};
/*─────────────────── GC311 discovery ───────────────────*/
fn list_gc311_devices() -> anyhow::Result<Vec<String>> {
let mut v = Vec::new();
for entry in std::fs::read_dir("/sys/class/video4linux")? {
let path = entry?.path();
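// the v4l2 `name` attribute holds the human-readable card name reported by the driver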
let name = std::fs::read_to_string(path.join("name"))?;
if name.to_lowercase().contains("gc311") {
v.push(
path.file_name()
.unwrap()
.to_string_lossy()
.replace("video", "/dev/video"),
);
}
}
v.sort();
Ok(v)
}
/// background task: whenever GC311 disappears, cycle USB port
async fn monitor_gc311_disconnect() -> anyhow::Result<()> {
let mut mon = MonitorBuilder::new()?
.match_subsystem("usb")?
.listen()?;
// udev monitor filters only match subsystem/devtype/tag, so the GC311
// (vendor 0x07ca, product 0x3311) is matched per event via PRODUCT below;
// the monitor socket is also non-blocking, hence the poll/sleep loop.
loop {
while let Some(ev) = mon.next() {
let is_gc311 = ev
.property_value("PRODUCT")
.and_then(|v| v.to_str())
.is_some_and(|p| p.starts_with("7ca/3311/"));
if is_gc311 && ev.event_type() == udev::EventType::Remove {
// sysfs attributes are gone on removal; BUSNUM/DEVNUM survive as event properties
if let (Some(bus), Some(dev)) = (ev.property_value("BUSNUM"), ev.property_value("DEVNUM")) {
usb_reset::cycle_port(bus.to_str().unwrap(), dev.to_str().unwrap());
}
}
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
/*─────────────────── tonic service ─────────────────────*/
struct Handler {
kb: Arc<Mutex<tokio::fs::File>>,
ms: Arc<Mutex<tokio::fs::File>>,
@ -25,6 +66,7 @@ struct Handler {
impl Relay for Handler {
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
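// boxed so the concrete ReceiverStream returned by video::spawn_camera can be type-erased here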
type CaptureVideoStream = Pin<Box<dyn Stream<Item = Result<VideoPacket, Status>> + Send + Sync + 'static>>;
async fn stream_keyboard(
&self,
@ -76,39 +118,51 @@ impl Relay for Handler {
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn capture_video(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureVideoStream>, Status> {
let r = req.into_inner();
let devs = list_gc311_devices()
.map_err(|e| Status::internal(format!("enum v4l2: {e}")))?;
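// monitor id N selects the N-th GC311 capture node in sorted /dev/videoX order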
let dev = devs
.get(r.id as usize)
.ok_or_else(|| Status::invalid_argument(format!("monitor id {} absent", r.id)))?
.to_owned();
info!("🎥 streaming {dev} at ≤{}kb/s", r.max_bitrate);
let s = video::spawn_camera(&dev, r.id, r.max_bitrate)
.await
.map_err(|e| Status::internal(format!("{e:#?}")))?;
Ok(Response::new(Box::pin(s) as _))
}
}
#[tokio::main]
/*─────────────────── main ──────────────────────────────*/
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()> {
/* logging */
fmt().with_env_filter(
// honour RUST_LOG but fall back to very chatty defaults
EnvFilter::try_from_default_env().unwrap_or_else(|_| //{
EnvFilter::new(
// "navka_client=trace,\
// navka_server=trace,\
// tonic=debug,\
// h2=debug,\
// tower=debug",
"navka_server=info"
)
//}
),
EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("navka_server=info")),
)
// .with_target(true)
// .with_thread_ids(true)
// .with_file(true)
.init();
/* autocycle task */
tokio::spawn(async { monitor_gc311_disconnect().await.ok(); });
let kb = OpenOptions::new()
.write(true)
// .read(true)
// .custom_flags(libc::O_NONBLOCK)
.open("/dev/hidg0")
.await?;
let ms = OpenOptions::new()
.write(true)
// .read(true)
.custom_flags(libc::O_NONBLOCK)
.open("/dev/hidg1")
.await?;

21
server/src/usb_reset.rs Normal file
View File

@ -0,0 +1,21 @@
// server/src/usb_reset.rs
//! Helpers to (re)power GC311 if udev reports a disconnect.
use std::process::Command;
use tracing::{info, warn};
/// Try to cycle power on the USB port where the GC311 was.
/// Proper per-port power control only works when the Pi is behind a hub that
/// supports it (e.g. via `uhubctl`, installed with `apt install uhubctl`);
/// for now we fall back to toggling the device's sysfs `authorized` flag.
pub fn cycle_port(busnum: &str, devnum: &str) {
warn!("GC311 disappeared ({}:{}), cycling port power", busnum, devnum);
// example: uhubctl -l 1-1 -p 2 -a cycle
// mapping port# requires lsusb -t; we fall back to a generic bus reset
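// NOTE: sysfs device directories are named bus-port (e.g. 1-1.2), not bus-devnum,
// so this path is a best-effort guess; toggling `authorized` asks the kernel to
// drop and re-enumerate the device rather than physically cutting power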
let _ = Command::new("sh")
.arg("-c")
.arg(format!("echo 0 | sudo tee /sys/bus/usb/devices/{busnum}-{devnum}/authorized; \
sleep 1; \
echo 1 | sudo tee /sys/bus/usb/devices/{busnum}-{devnum}/authorized"))
.status();
info!("port cycle issued");
}

56
server/src/video.rs Normal file
View File

@ -0,0 +1,56 @@
// server/src/video.rs
use anyhow::Context;
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
use navka_common::navka::VideoPacket;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
pub async fn spawn_camera(
dev: &str,
id: u32,
max_bitrate_kbit: u32,
) -> anyhow::Result<ReceiverStream<Result<VideoPacket, Status>>> {
gst::init().context("gst init")?;
// v4l2src already delivers H.264, so we only parse & relay (no re-encode)
let desc = format!(
"v4l2src device={dev} io-mode=dmabuf ! queue ! h264parse config-interval=1 ! \
video/x-h264,stream-format=byte-stream,profile=baseline,level=4,\
bitrate={max_bitrate_kbit}000 ! appsink name=sink emit-signals=true sync=false"
);
let pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>().map_err(|_| anyhow::anyhow!("expected a Pipeline"))?;
let sink = pipeline
.by_name("sink")
.expect("appsink")
.dynamic_cast::<gst_app::AppSink>()
.expect("appsink downcast");
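// bounded queue of encoded packets between the GStreamer streaming thread and the gRPC response stream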
let (tx, rx) = tokio::sync::mpsc::channel(256);
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
let pkt = VideoPacket {
id,
// pts() is Option<ClockTime>; convert ns → µs, defaulting to 0
pts: buffer.pts().map(|t| t.useconds()).unwrap_or(0),
data: map.as_slice().to_vec(),
};
// no backpressure on the pipeline: if the channel is full, the incoming packet is dropped
let _ = tx.try_send(Ok(pkt));
Ok(gst::FlowSuccess::Ok)
})
.build(),
);
pipeline.set_state(gst::State::Playing)?;
Ok(ReceiverStream::new(rx))
}