use anyhow::Context; use futures_util::Stream; use gstreamer as gst; use gstreamer_app as gst_app; use lesavka_common::lesavka::AudioPacket; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::{debug, trace}; use gst::prelude::*; const EAR_ID: [&str; 2] = ["l", "r"]; const PIPE: &str = "appsrc name=audsrc is-live=true do-timestamp=true ! aacparse ! queue ! appsink name=asink emit-signals=true"; pub struct AudioStream { _pipe: gst::Pipeline, inner: ReceiverStream>, } impl Stream for AudioStream { type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx) } } // pub async fn eye_ear( // dev: &str, // id: u32, // ) -> anyhow::Result { // let ear = EAR_ID[id as usize]; // gst::init().context("gst init")?; // let desc = format!( // "v4l2src device=\"{dev}\" io-mode=mmap do-timestamp=true ! \ // video/mpegts,systemstream=true,packetsize=188 ! \ // tsdemux name=d d.audio_0 ! queue ! \ // aacparse ! queue ! \ // appsink name=asink emit-signals=true max-buffers=64 drop=true" // ); // let pipe: gst::Pipeline = gst::parse::launch(&desc)? // .downcast() // .expect("pipeline"); // let sink = pipe.by_name("asink").expect("asink").downcast::().unwrap(); // let (tx, rx) = tokio::sync::mpsc::channel(8192); // sink.set_callbacks( // gst_app::AppSinkCallbacks::builder() // .new_sample(move |s| { // let samp = s.pull_sample().map_err(|_| gst::FlowError::Eos)?; // let buf = samp.buffer().ok_or(gst::FlowError::Error)?; // let map = buf.map_readable().map_err(|_| gst::FlowError::Error)?; // let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds()/1_000; // let pkt = AudioPacket { id, pts, data: map.as_slice().to_vec() }; // debug!(target:"lesavka_server::audio", // eye = id, bytes = pkt.data.len(), pts = pkt.pts, // "⬆️ pushed audio sample ear-{ear}"); // let _ = tx.try_send(Ok(pkt)); // Ok(gst::FlowSuccess::Ok) // }).build() // ); // pipe.set_state(gst::State::Playing)?; // Ok(AudioStream{ _pipe: pipe, inner: ReceiverStream::new(rx) }) // } pub async fn eye_ear(_dev: &str, _id: u32) -> anyhow::Result { use tokio_stream::StreamExt; let (_tx, rx) = tokio::sync::mpsc::channel(1); Ok(AudioStream { _pipe: gst::Pipeline::new(), inner: ReceiverStream::new(rx), // always empty }) }