lesavka/server/src/audio.rs

60 lines
2.1 KiB
Rust
Raw Normal View History

2025-06-29 03:46:34 -05:00
use anyhow::Context;
use futures_util::Stream;
use gstreamer as gst;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::AudioPacket;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{debug, trace};
use gst::prelude::*;
/// gst-launch style description of an `appsrc` (named `audsrc`) feeding
/// `aacparse`, a `queue`, and finally an `appsink` (named `asink`).
///
/// NOTE(review): nothing in the visible part of this file references this
/// constant — confirm whether it is still needed.
const PIPE: &str = "appsrc name=audsrc is-live=true do-timestamp=true ! \
                    aacparse ! queue ! appsink name=asink emit-signals=true";
/// Stream of [`AudioPacket`]s produced by a running GStreamer pipeline.
///
/// The pipeline handle is stored alongside the receiving end of the channel
/// so that it lives exactly as long as the stream does; dropping the stream
/// shuts the pipeline down.
pub struct AudioStream {
    /// Pipeline that produces the packets. Never read after construction,
    /// only kept alive (and torn down in `Drop`).
    _pipe: gst::Pipeline,
    /// Receiving side of the channel that the appsink callback sends into.
    inner: ReceiverStream<Result<AudioPacket, Status>>,
}

impl Drop for AudioStream {
    /// Brings the pipeline back to `Null` before the last reference goes away.
    ///
    /// GStreamer refuses to dispose an element that is not in the NULL state
    /// (it logs a critical warning and leaks it), which would leave the
    /// streaming thread running and the capture device open.
    fn drop(&mut self) {
        // Best effort: there is nothing useful to do with a failure while
        // dropping, so the result is intentionally ignored.
        let _ = self._pipe.set_state(gst::State::Null);
    }
}
/// `AudioStream` is a thin wrapper: polling it simply polls the wrapped
/// `ReceiverStream`.
impl Stream for AudioStream {
    type Item = Result<AudioPacket, Status>;

    /// Forwards the poll to the inner receiver stream unchanged.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Project from the pinned wrapper down to the pinned inner stream.
        let receiver = std::pin::Pin::new(&mut self.get_mut().inner);
        Stream::poll_next(receiver, cx)
    }
}
pub async fn eye_ear(
dev: &str,
id: u32,
) -> anyhow::Result<AudioStream> {
gst::init().context("gst init")?;
let desc = format!("v4l2src device={dev} io-mode=mmap !
queue ! tsdemux ! aacparse !
queue ! appsink name=asink emit-signals=true");
let pipe: gst::Pipeline = gst::parse::launch(&desc)?
.downcast()
.expect("pipeline");
let sink = pipe.by_name("asink").expect("asink").downcast::<gst_app::AppSink>().unwrap();
let (tx, rx) = tokio::sync::mpsc::channel(8192);
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |s| {
let samp = s.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buf = samp.buffer().ok_or(gst::FlowError::Error)?;
let map = buf.map_readable().map_err(|_| gst::FlowError::Error)?;
let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds()/1_000;
let pkt = AudioPacket { id, pts, data: map.as_slice().to_vec() };
trace!("srv→grpc audio-{id} {}", pkt.data.len());
let _ = tx.try_send(Ok(pkt));
Ok(gst::FlowSuccess::Ok)
}).build()
);
pipe.set_state(gst::State::Playing)?;
Ok(AudioStream{ _pipe: pipe, inner: ReceiverStream::new(rx) })
}