Stream fixes

This commit is contained in:
Brad Stein 2025-06-28 19:40:48 -05:00
parent c742d364e4
commit b2ccf79768
3 changed files with 27 additions and 4 deletions

View File

@ -109,7 +109,7 @@ Environment=RUST_LOG=lesavka_server::video=trace,lesavka_server::usb_gadget=debu
Environment=RUST_BACKTRACE=1
Restart=always
RestartSec=5
StandardError=append:/tmp/lesavka-server.log
StandardError=append:/tmp/lesavka-server.stderr
StartLimitIntervalSec=30
StartLimitBurst=10
User=root

View File

@ -119,7 +119,7 @@ impl Relay for Handler {
/* existing streams ─ unchanged, except: no more auto-reset */
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item=Result<VideoPacket,Status>> + Send + Sync>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item=Result<VideoPacket,Status>> + Send>>;
async fn stream_keyboard(
&self,

View File

@ -9,15 +9,38 @@ use lesavka_common::lesavka::VideoPacket;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{debug, enabled, trace, Level};
use futures_util::Stream;
const EYE_ID: [&str; 2] = ["l", "r"];
static START: std::sync::OnceLock<gst::ClockTime> = std::sync::OnceLock::new();
pub struct VideoStream {
_pipeline: gst::Pipeline,
inner: ReceiverStream<Result<VideoPacket, Status>>,
}
impl Stream for VideoStream {
type Item = Result<VideoPacket, Status>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx)
}
}
impl Drop for VideoStream {
fn drop(&mut self) {
// shut down nicely avoids the “dispose element … READY/PLAYING …” spam
let _ = self._pipeline.set_state(gst::State::Null);
}
}
pub async fn eye_ball(
dev: &str,
id: u32,
_max_bitrate_kbit: u32,
) -> anyhow::Result<ReceiverStream<Result<VideoPacket, Status>>> {
) -> anyhow::Result<VideoStream> {
let eye = EYE_ID[id as usize];
gst::init().context("gst init")?;
@ -118,5 +141,5 @@ pub async fn eye_ball(
None => continue,
}
}
Ok(ReceiverStream::new(rx))
Ok(VideoStream { _pipeline: pipeline, inner: ReceiverStream::new(rx) })
}