This commit is contained in:
Brad Stein 2025-06-25 16:23:50 -05:00
parent 4ef2c9337e
commit 26abca78bb
6 changed files with 78 additions and 25 deletions

View File

@ -83,7 +83,7 @@ impl LesavkaClientApp {
let win0 = MonitorWindow::new(0, &el).expect("win0");
let win1 = MonitorWindow::new(1, &el).expect("win1");
el.run(move |_: Event<()>, _| {
let _ = el.run(move |_: Event<()>, _| {
while let Ok(pkt) = video_rx.try_recv() {
match pkt.id {
0 => win0.push_packet(pkt),

View File

@ -10,8 +10,21 @@ use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use lesavka_client::LesavkaClientApp;
fn ensure_runtime_dir() {
if env::var_os("XDG_RUNTIME_DIR").is_none() {
eprintln!(
"Error: $XDG_RUNTIME_DIR is not set. \
Launch the client from a regular desktop session or export it manually, \
e.g. `export XDG_RUNTIME_DIR=/run/user/$(id -u)`."
);
std::process::exit(1);
}
}
#[tokio::main]
async fn main() -> Result<()> {
ensure_runtime_dir();
/*------------- common filter & stderr layer ------------------------*/
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new(

View File

@ -24,10 +24,16 @@ impl MonitorWindow {
.with_decorations(false)
)?;
// appsrc -> decode -> convert -> waylandsink
let desc = "appsrc name=src is-live=true format=time do-timestamp=true ! \
queue ! h264parse ! decodebin ! videoconvert ! \
waylandsink name=sink sync=false";
// appsrc -> decode -> convert -> autovideosink
let desc = if std::env::var_os("XDG_RUNTIME_DIR").is_some() {
// graphical
"appsrc name=src is-live=true format=time do-timestamp=true ! \
queue ! h264parse ! decodebin ! videoconvert ! autovideosink sync=false"
} else {
// headless / debugging over ssh
"appsrc name=src is-live=true format=time do-timestamp=true ! \
fakesink sync=false"
};
let pipeline = gst::parse::launch(desc)?
.downcast::<gst::Pipeline>()

View File

@ -4,7 +4,7 @@ set -euo pipefail
ORIG_USER=${SUDO_USER:-$(id -un)}
echo "==> 1a. Base packages"
sudo pacman -Syq --needed --noconfirm git rustup protobuf gcc pipewire pipewire-pulse tailscale base-devel
sudo pacman -Syq --needed --noconfirm git rustup protobuf gcc pipewire pipewire-pulse tailscale base-devel gst-plugin-libav
if ! command -v yay >/dev/null 2>&1; then
echo "==> 1b. installing yay from AUR ..."
sudo -u "$ORIG_USER" bash -c '
@ -12,7 +12,7 @@ if ! command -v yay >/dev/null 2>&1; then
cd yay && makepkg -si --noconfirm'
fi
echo "==> 2a. GC311 kerneldriver tweaks"
echo "==> 2a. Kerneldriver tweaks"
cat <<'EOF' | sudo tee /etc/modprobe.d/gc311-stream.conf >/dev/null
options uvcvideo quirks=0x200 timeout=10000
EOF
@ -104,7 +104,7 @@ After=network.target lesavka-core.service
[Service]
ExecStart=/usr/local/bin/lesavka-server
Restart=always
Environment="RUST_LOG=lesavka_server=trace"
Environment=RUST_LOG=lesavka_server=debug,lesavka_server::usb_gadget=trace
User=root
[Install]

View File

@ -15,6 +15,7 @@ anyhow = "1.0"
lesavka_common = { path = "../common" }
tracing = { version = "0.1", features = ["std"] }
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
tracing-appender = "0.2"
libc = "0.2"
futures-util = "0.3"
gstreamer = { version = "0.23", features = ["v1_22"] }

View File

@ -2,14 +2,15 @@
// sever/src/main.rs
#![forbid(unsafe_code)]
use futures_util::{Stream, StreamExt};
use std::{pin::Pin, sync::Arc, time::Duration};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{pin::Pin, sync::Arc, time::Duration};
use futures_util::{Stream, StreamExt};
use tokio::{fs::{OpenOptions}, io::AsyncWriteExt, sync::Mutex};
use tokio_stream::{wrappers::ReceiverStream};
use tonic::{transport::Server, Request, Response, Status};
use tracing::{info, trace, warn};
use tracing_subscriber::{fmt, EnvFilter};
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use tracing_appender::non_blocking;
use udev::{MonitorBuilder};
use lesavka_server::{usb_gadget::UsbGadget, video};
@ -20,6 +21,32 @@ use lesavka_common::lesavka::{
MonitorRequest, VideoPacket,
};
// ───────────────── helper ─────────────────────
fn ensure_log_file() -> anyhow::Result<()> {
let file = std::fs::OpenOptions::new()
.create(true).write(true).truncate(true)
.open("/tmp/lesavka-server.log")?;
let (file_writer, _guard) = non_blocking(file);
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("lesavka_server=trace"));
let console_layer = fmt::layer()
.with_target(true)
.with_thread_ids(true);
let file_layer = fmt::layer()
.with_writer(file_writer)
.with_ansi(false);
tracing_subscriber::registry()
.with(env_filter)
.with(console_layer)
.with(file_layer)
.init();
Ok(())
}
/*─────────────────── tonic service ─────────────────────*/
struct Handler {
kb: Arc<Mutex<tokio::fs::File>>,
@ -30,14 +57,20 @@ struct Handler {
impl Handler {
async fn make(gadget: UsbGadget) -> anyhow::Result<Self> {
let kb = OpenOptions::new().write(true).open("/dev/hidg0").await?;
let ms = OpenOptions::new().write(true)
.custom_flags(libc::O_NONBLOCK)
.open("/dev/hidg1").await?;
gadget.cycle()?;
tokio::time::sleep(Duration::from_secs(1)).await;
let kb = OpenOptions::new()
.write(true).custom_flags(libc::O_NONBLOCK)
.open("/dev/hidg0").await?;
let ms = OpenOptions::new()
.write(true).custom_flags(libc::O_NONBLOCK)
.open("/dev/hidg1").await?;
Ok(Self { kb: Arc::new(Mutex::new(kb)),
ms: Arc::new(Mutex::new(ms)),
gadget,
did_cycle: AtomicBool::new(false),
did_cycle: AtomicBool::new(true),
})
}
}
@ -66,21 +99,19 @@ impl Relay for Handler {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
// kb.lock().await.write_all(&pkt.data).await?;
for attempt in 0..50 {
loop {
match kb.lock().await.write_all(&pkt.data).await {
Ok(()) => {
trace!("⌨️ wrote {}", pkt.data.iter()
.map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
break;
},
Err(e) if e.raw_os_error() == Some(libc::EBUSY) => {
tokio::time::sleep(Duration::from_millis(20)).await;
Err(e) if matches!(e.raw_os_error(), Some(libc::EBUSY) | Some(libc::EAGAIN)) => {
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
Err(e) => return Err(Status::internal(e.to_string())),
}
if attempt == 49 {
return Err(Status::internal("hidg0 stayed BUSY"));
}
}
tx.send(Ok(pkt)).await;//.ok(); // best-effort echo
}
@ -108,10 +139,11 @@ impl Relay for Handler {
.map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
break;
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
tokio::time::sleep(Duration::from_micros(500)).await;
Err(e) if matches!(e.raw_os_error(), Some(libc::EBUSY) | Some(libc::EAGAIN)) => {
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
Err(e) => return Err(Status::internal(format!("hidg1: {e}"))),
Err(e) => return Err(Status::internal(e.to_string())),
}
}
let _ = tx.send(Ok(pkt)).await;
@ -161,6 +193,7 @@ impl Relay for Handler {
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()> {
/* logging */
ensure_log_file()?;
fmt().with_env_filter(
EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("lesavka_server=info")),