HID reset modified

This commit is contained in:
Brad Stein 2025-06-27 06:56:08 -05:00
parent 4157bd68d3
commit 20e33f03d5
4 changed files with 102 additions and 229 deletions

View File

@ -1,4 +1,4 @@
// common/build.rs
fn main() {
tonic_build::configure()

View File

@ -1,26 +1,18 @@
syntax = "proto3";
package lesavka;
// smaller, fixed-size payloads -> less allocation and simpler decoding
message KeyboardReport { bytes data = 1; } // exactly 8 bytes
message MouseReport { bytes data = 1; } // exactly 4 bytes
message KeyboardReport { bytes data = 1; }
message MouseReport { bytes data = 1; }
// ------------ video ------------
message MonitorRequest {
uint32 id = 1; // 0/1 for now
uint32 max_bitrate = 2; // kb/s client hints, server may ignore
}
message MonitorRequest { uint32 id = 1; uint32 max_bitrate = 2; }
message VideoPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; }
message VideoPacket {
uint32 id = 1; // monitor id
uint64 pts = 2; // monotonically increasing microseconds
bytes data = 3; // full H.264 accessunit (lengthprefixed)
}
message ResetUsbRequest {} // empty body
message ResetUsbReply { bool ok = 1; } // true = success
service Relay {
rpc StreamKeyboard (stream KeyboardReport) returns (stream KeyboardReport);
rpc StreamMouse (stream MouseReport) returns (stream MouseReport);
// client requests one monitor, server pushes raw H.264
rpc CaptureVideo (MonitorRequest) returns (stream VideoPacket);
rpc StreamKeyboard (stream KeyboardReport) returns (stream KeyboardReport);
rpc StreamMouse (stream MouseReport) returns (stream MouseReport);
rpc CaptureVideo (MonitorRequest) returns (stream VideoPacket);
rpc ResetUsb (ResetUsbRequest) returns (ResetUsbReply);
}

View File

@ -22,3 +22,7 @@ gstreamer = { version = "0.23", features = ["v1_22"] }
gstreamer-app = { version = "0.23", features = ["v1_22"] }
gstreamer-video = "0.23"
udev = "0.8"
prost-types = "0.13"
[build-dependencies]
prost-build = "0.13"

View File

@ -1,113 +1,72 @@
//! lesavka-server
// sever/src/main.rs
//! lesavkaserver **autocycle disabled**
// server/src/main.rs
#![forbid(unsafe_code)]
use std::sync::atomic::{AtomicBool, Ordering};
use std::{panic, backtrace::Backtrace, pin::Pin, sync::Arc, time::Duration};
use futures_util::{Stream, StreamExt};
use tokio::{fs::{OpenOptions}, io::AsyncWriteExt, sync::Mutex};
use tokio_stream::{wrappers::ReceiverStream};
use tonic::{transport::Server, Request, Response, Status};
use std::sync::atomic::AtomicBool;
use anyhow::Context as _;
use tracing::{info, trace, warn, error};
use futures_util::{Stream, StreamExt};
use tokio::{
fs::{OpenOptions},
io::AsyncWriteExt,
sync::Mutex,
};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tonic::transport::Server;
use tracing::{info, warn, error, trace};
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use tracing_appender::non_blocking;
use tracing_appender::non_blocking::WorkerGuard;
use lesavka_common::lesavka::{
ResetUsbRequest, ResetUsbReply,
relay_server::{Relay, RelayServer},
KeyboardReport, MouseReport, MonitorRequest, VideoPacket,
};
use lesavka_server::{usb_gadget::UsbGadget, video};
use lesavka_common::lesavka::{
relay_server::{Relay, RelayServer},
KeyboardReport, MouseReport,
MonitorRequest, VideoPacket,
};
/*──────────────── constants ────────────────*/
/// **false** = never reset automatically.
const AUTO_CYCLE: bool = false;
// ───────────────── helper ─────────────────────
/*──────────────── logging ───────────────────*/
fn init_tracing() -> anyhow::Result<WorkerGuard> {
// 1. create file writer + guard
let file = std::fs::OpenOptions::new()
.create(true).write(true).append(true)
.create(true).append(true).write(true)
.open("/tmp/lesavka-server.log")?;
let (file_writer, guard) = non_blocking(file);
// 2. build subscriber once
let env = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| {EnvFilter::new("lesavka_server=info")});
let console_layer = fmt::layer()
.with_target(true)
.with_thread_ids(true);
let file_layer = fmt::layer()
.with_writer(file_writer)
.with_ansi(false);
let (file_writer, guard) = tracing_appender::non_blocking(file);
tracing_subscriber::registry()
.with(env)
.with(console_layer)
.with(file_layer)
.with(EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("lesavka_server=info")))
.with(fmt::layer().with_target(true).with_thread_ids(true))
.with(fmt::layer().with_writer(file_writer).with_ansi(false))
.init();
Ok(guard)
}
/*──────────────── helpers ───────────────────*/
async fn open_with_retry(path: &str) -> anyhow::Result<tokio::fs::File> {
const MAX_ATTEMPTS: usize = 200; // ≈ 10s (@50ms)
for attempt in 1..=MAX_ATTEMPTS {
for attempt in 1..=200 { // ≈10s
match OpenOptions::new()
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(path)
.await
.write(true).custom_flags(libc::O_NONBLOCK).open(path).await
{
Ok(f) => {
info!("✅ {path} opened on attempt #{attempt}");
return Ok(f);
}
Err(e) if e.raw_os_error() == Some(libc::EBUSY) => {
trace!("⏳ {path} busy, retry #{attempt}");
trace!("⏳ {path} busy retry #{attempt}");
tokio::time::sleep(Duration::from_millis(50)).await;
}
Err(e) => return Err(e).with_context(|| format!("💥 opening {path}")),
Err(e) => return Err(e).with_context(|| format!("opening {path}")),
}
}
Err(anyhow::anyhow!("💥 timeout waiting for {path} to become available"))
Err(anyhow::anyhow!("timeout waiting for {path}"))
}
fn owners_of(path: &str) -> String {
use std::{fs, os::unix::fs::MetadataExt};
let Ok(target_ino) = fs::metadata(path).map(|m| m.ino()) else { return "-".into() };
let mut pids = Vec::new();
if let Ok(entries) = fs::read_dir("/proc") {
for e in entries.flatten() {
let file_name = e.file_name();
let pid = file_name.to_string_lossy().into_owned();
if !pid.chars().all(|c| c.is_ascii_digit()) { continue }
let fd_dir = e.path().join("fd");
for fd in fs::read_dir(fd_dir).into_iter().flatten().flatten() {
if let Ok(meta) = fd.metadata() {
if meta.ino() == target_ino {
pids.push(pid.to_string());
break;
}
}
}
}
}
if pids.is_empty() { "-".into() } else { pids.join(",") }
}
async fn wait_configured(ctrl: &str, limit_ms: u64) -> anyhow::Result<()> {
for _ in 0..=limit_ms/50 {
let s = UsbGadget::state(ctrl)?;
if s.trim() == "configured" { return Ok(()) }
tokio::time::sleep(Duration::from_millis(50)).await;
}
Err(anyhow::anyhow!("host never configured"))
}
/*─────────────────── tonic service ─────────────────────*/
/*──────────────── Handler ───────────────────*/
struct Handler {
kb: Arc<Mutex<tokio::fs::File>>,
ms: Arc<Mutex<tokio::fs::File>>,
@ -116,45 +75,19 @@ struct Handler {
}
impl Handler {
async fn make(gadget: UsbGadget) -> anyhow::Result<Self> {
info!("🛠️ Handler::make - cycling gadget ...");
gadget.cycle()?;
let ctrl = UsbGadget::find_controller()?;
let configured = wait_configured(&ctrl, 10_000).await.is_ok();
if configured {
info!("✅ host enumerated (configured)");
async fn new(gadget: UsbGadget) -> anyhow::Result<Self> {
if AUTO_CYCLE {
info!("🛠️ Initial USB reset…");
let _ = gadget.cycle(); // ignore failure may boot without host
} else {
warn!("⚠️ host absent queuing HID traffic");
}
let state = UsbGadget::wait_state_any(&ctrl, 5_000)?;
match state.as_str() {
"configured" => info!("✅ host enumerated (configured)"),
"not attached" => warn!("⚠️ host absent HID writes will be queued"),
_ => warn!("⚠️ unexpected UDC state: {state}"),
info!("🛠️ AUTO_CYCLE disabled  no initial reset");
}
tokio::time::sleep(Duration::from_millis(1000)).await;
info!("🛠️ opening HID endpoints ...");
info!("🛠️ opening HID endpoints …");
let kb = open_with_retry("/dev/hidg0").await?;
let ms = open_with_retry("/dev/hidg1").await?;
info!("✅ HID endpoints ready");
Ok(Self {
kb: Arc::new(Mutex::new(kb)),
ms: Arc::new(Mutex::new(ms)),
gadget,
did_cycle: AtomicBool::new(true),
})
}
async fn degraded(gadget: UsbGadget) -> anyhow::Result<Self> {
info!("🛠️ Handler::degraded - opening HID endpoints ...");
let kb = open_with_retry("/dev/hidg0").await?;
let ms = open_with_retry("/dev/hidg1").await?;
info!("✅ HID endpoints ready");
Ok(Self {
kb: Arc::new(Mutex::new(kb)),
ms: Arc::new(Mutex::new(ms)),
@ -164,54 +97,32 @@ impl Handler {
}
}
/*──────────────── gRPC service ─────────────*/
#[tonic::async_trait]
impl Relay for Handler {
/* existing streams ─ unchanged, except: no more autoreset */
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item = Result<VideoPacket, Status>> + Send + Sync + 'static>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item=Result<VideoPacket,Status>> + Send + Sync>>;
async fn stream_keyboard(
&self,
req: Request<tonic::Streaming<KeyboardReport>>,
) -> Result<Response<Self::StreamKeyboardStream>, Status> {
// self.gadget.cycle().map_err(|e| Status::internal(e.to_string()))?;
if !self.did_cycle.swap(true, Ordering::SeqCst) {
self.gadget
.cycle()
.map_err(|e| Status::internal(e.to_string()))?;
tokio::time::sleep(Duration::from_millis(500)).await;
}
let (tx, rx) =
tokio::sync::mpsc::channel::<Result<KeyboardReport, Status>>(32);
let (tx, rx) = tokio::sync::mpsc::channel(32);
let kb = self.kb.clone();
let gadget = self.gadget.clone();
let ctrl = UsbGadget::find_controller().unwrap_or_default();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
/* try to write once */
let mut guard = kb.lock().await;
if let Err(e) = guard.write_all(&pkt.data).await {
/* host vanished ? */
if matches!(e.raw_os_error(),
Some(libc::ESHUTDOWN)|Some(libc::ENODEV)|Some(libc::EPIPE)) {
warn!("host disappeared recycling gadget");
gadget.cycle().map_err(|e| Status::internal(e.to_string()))?;
wait_configured(&ctrl, 10_000).await
.map_err(|e| Status::internal(e.to_string()))?;
/* reopen endpoint & swap into mutex */
*guard = open_with_retry("/dev/hidg0").await
.map_err(|e| Status::internal(e.to_string()))?;
} else {
return Err(Status::internal(e.to_string()));
}
if let Err(e) = kb.lock().await.write_all(&pkt.data).await {
warn!("⌨️ write failed: {e} (dropped)");
}
drop(guard); /* release lock before await */
tx.send(Ok(pkt)).await.ok(); /* besteffort echo */
tx.send(Ok(pkt)).await.ok();
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
@ -219,37 +130,20 @@ impl Relay for Handler {
&self,
req: Request<tonic::Streaming<MouseReport>>,
) -> Result<Response<Self::StreamMouseStream>, Status> {
let (tx, rx) =
tokio::sync::mpsc::channel::<Result<MouseReport, Status>>(4096);
let (tx, rx) = tokio::sync::mpsc::channel(4096);
let ms = self.ms.clone();
let gadget = self.gadget.clone();
let ctrl = UsbGadget::find_controller().unwrap_or_default();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
/* try to write once */
let mut guard = ms.lock().await;
if let Err(e) = guard.write_all(&pkt.data).await {
/* host vanished ? */
if matches!(e.raw_os_error(),
Some(libc::ESHUTDOWN)|Some(libc::ENODEV)|Some(libc::EPIPE)) {
warn!("host disappeared recycling gadget");
gadget.cycle().map_err(|e| Status::internal(e.to_string()))?;
wait_configured(&ctrl, 10_000).await
.map_err(|e| Status::internal(e.to_string()))?;
/* reopen endpoint & swap into mutex */
*guard = open_with_retry("/dev/hidg1").await
.map_err(|e| Status::internal(e.to_string()))?;
} else {
return Err(Status::internal(e.to_string()));
}
if let Err(e) = ms.lock().await.write_all(&pkt.data).await {
warn!("🖱️ write failed: {e} (dropped)");
}
drop(guard); /* release lock before await */
tx.send(Ok(pkt)).await.ok(); /* besteffort echo */
tx.send(Ok(pkt)).await.ok();
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
@ -257,71 +151,54 @@ impl Relay for Handler {
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureVideoStream>, Status> {
let r = req.into_inner();
// let devs = loop {
// let list = list_gc311_devices()
// .map_err(|e| Status::internal(format!("enum v4l2: {e}")))?;
// if !list.is_empty() { break list; }
// tokio::time::sleep(Duration::from_secs(1)).await;
// };
// let dev = devs
// .get(r.id as usize)
// .ok_or_else(|| Status::invalid_argument(format!("monitor id {} absent", r.id)))?
// .to_owned();
let dev = match r.id {
let id = req.into_inner().id;
let dev = match id {
0 => "/dev/lesavka_l_eye",
1 => "/dev/lesavka_r_eye",
_ => return Err(Status::invalid_argument("monitor id must be 0 or 1")),
}
.to_string();
info!("🎥 streaming {dev} at ≤{} kb/s", r.max_bitrate);
let s = video::spawn_camera(&dev, r.id, r.max_bitrate)
};
info!("🎥 streaming {dev}");
let s = video::spawn_camera(dev, id, 6_000)
.await
.map_err(|e| Status::internal(format!("{e:#?}")))?;
.map_err(|e| Status::internal(format!("{e:#}")))?;
Ok(Response::new(Box::pin(s)))
}
Ok(Response::new(Box::pin(s) as _))
/*────────────── USBreset RPC ───────────*/
async fn reset_usb(
&self,
_req: Request<ResetUsbRequest>,
) -> Result<Response<ResetUsbReply>, Status> {
info!("🔴 explicit ResetUsb() called");
match self.gadget.cycle() {
Ok(_) => Ok(Response::new(ResetUsbReply { ok: true })),
Err(e) => {
error!("💥 cycle failed: {e:#}");
Err(Status::internal(e.to_string()))
}
}
}
}
/*─────────────────── main ──────────────────────────────*/
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
/*──────────────── main ───────────────────────*/
#[tokio::main(worker_threads = 4)]
async fn main() -> anyhow::Result<()> {
/* logging */
let _log_guard: WorkerGuard = init_tracing()?;
let _guard = init_tracing()?;
panic::set_hook(Box::new(|p| {
let bt = Backtrace::force_capture();
error!("💥 panic: {p}\n{bt}");
}));
/* autocycle task */
// tokio::spawn(async { monitor_gc311_disconnect().await.ok(); });
let gadget = UsbGadget::new("lesavka");
let handler = match Handler::make(gadget.clone()).await {
Ok(h) => h,
Err(e) => {
error!("💥 handler degraded (host offline): {e:#}");
Handler::degraded(gadget.clone()).await?
}
};
let gadget = UsbGadget::new("lesavka");
let handler = Handler::new(gadget.clone()).await?;
info!("🌐 lesavkaserver listening on 0.0.0.0:50051");
if let Err(e) = Server::builder()
.tcp_nodelay(true)
.max_frame_size(Some(256 * 1024))
.add_service(RelayServer::new(handler))
.serve(([0, 0, 0, 0], 50051).into())
.await
{
error!("💥 gRPC server exited: {e:#}");
std::process::exit(1);
}
Server::builder()
.tcp_nodelay(true)
.max_frame_size(Some(256*1024))
.add_service(RelayServer::new(handler))
.serve(([0,0,0,0], 50051).into())
.await?;
Ok(())
}