use anyhow::Context; use futures_util::Stream; use gstreamer as gst; use gstreamer_app as gst_app; use lesavka_common::lesavka::AudioPacket; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::{debug, trace}; use gst::prelude::*; const EAR_ID: [&str; 2] = ["l", "r"]; const PIPE: &str = "appsrc name=audsrc is-live=true do-timestamp=true ! aacparse ! queue ! appsink name=asink emit-signals=true"; pub struct AudioStream { _pipe: gst::Pipeline, inner: ReceiverStream>, } impl Stream for AudioStream { type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx) } } pub async fn eye_ear( dev: &str, id: u32, ) -> anyhow::Result { let ear = EAR_ID[id as usize]; gst::init().context("gst init")?; let desc = format!( "v4l2src device=\"{dev}\" io-mode=mmap do-timestamp=true ! queue ! \ tsdemux name=d d.audio_0 ! queue ! \ aacparse ! queue ! \ appsink name=asink emit-signals=true max-buffers=64 drop=true" ); let pipe: gst::Pipeline = gst::parse::launch(&desc)? .downcast() .expect("pipeline"); let sink = pipe.by_name("asink").expect("asink").downcast::().unwrap(); let (tx, rx) = tokio::sync::mpsc::channel(8192); sink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample(move |s| { let samp = s.pull_sample().map_err(|_| gst::FlowError::Eos)?; let buf = samp.buffer().ok_or(gst::FlowError::Error)?; let map = buf.map_readable().map_err(|_| gst::FlowError::Error)?; let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds()/1_000; let pkt = AudioPacket { id, pts, data: map.as_slice().to_vec() }; debug!(target:"lesavka_server::audio", eye = id, bytes = pkt.data.len(), pts = pkt.pts, "⬆️ pushed audio sample ear-{ear}"); let _ = tx.try_send(Ok(pkt)); Ok(gst::FlowSuccess::Ok) }).build() ); pipe.set_state(gst::State::Playing)?; Ok(AudioStream{ _pipe: pipe, inner: ReceiverStream::new(rx) }) }