298 lines
10 KiB
Rust

use super::*;
/// Rebase one probe audio packet's source PTS onto the paced capture clock.
///
/// Thin wrapper over `rebase_with_packet_duration` that wraps the raw PTS in
/// `Some` and returns the full rebasing result (not just the packet PTS).
fn rebase_probe_audio_packet_pts(
    pts_rebaser: &crate::live_capture_clock::DurationPacedSourcePtsRebaser,
    source_pts_us: u64,
    packet_duration_us: u64,
    lag_cap: Duration,
) -> crate::live_capture_clock::RebasedSourcePts {
    let source = Some(source_pts_us);
    pts_rebaser.rebase_with_packet_duration(source, packet_duration_us, lag_cap)
}
#[cfg(test)]
/// Test helper: rebase a probe packet PTS with a nominal 1 µs duration and
/// return only the rebased packet PTS in microseconds.
pub(super) fn rebase_probe_packet_pts(
    pts_rebaser: &crate::live_capture_clock::DurationPacedSourcePtsRebaser,
    source_pts_us: u64,
    lag_cap: Duration,
) -> u64 {
    let rebased = pts_rebaser.rebase_with_packet_duration(Some(source_pts_us), 1, lag_cap);
    rebased.packet_pts_us
}
/// Live synthetic A/V capture used to probe audio/video sync.
///
/// Owns the GStreamer encode pipeline plus the two generator threads that
/// feed the packet queues; all of it is torn down in `Drop`.
pub struct SyncProbeCapture {
    // Pipeline built by `build_pipeline`; held so `Drop` can set it to Null.
    pipeline: gst::Pipeline,
    // Shutdown flag polled (Acquire) by both generator threads.
    running: Arc<AtomicBool>,
    // Queue of encoded probe video packets (presumably freshest-wins,
    // per the `FreshPacketQueue` name — TODO confirm).
    video_queue: FreshPacketQueue<VideoPacket>,
    // Queue of rendered probe audio packets.
    audio_queue: FreshPacketQueue<AudioPacket>,
    // Generator thread handles; taken and joined in `Drop`.
    video_thread: Option<JoinHandle<()>>,
    audio_thread: Option<JoinHandle<()>>,
}
impl SyncProbeCapture {
    /// Builds the probe pipeline, starts it, and spawns the video/audio
    /// generator threads that run for at most `duration`.
    ///
    /// # Errors
    /// Fails if GStreamer cannot initialize, the launch description does not
    /// parse, the named appsrc/appsink elements are missing, or the pipeline
    /// refuses to enter `Playing`.
    pub fn new(camera: CameraConfig, schedule: PulseSchedule, duration: Duration) -> Result<Self> {
        gst::init().context("gst init")?;
        let pipeline = build_pipeline(camera, &schedule)?;
        // Element names must match those assigned in `build_pipeline`; a
        // failed downcast would mean the launch string itself is wrong,
        // hence `expect` rather than a recoverable error.
        let video_src = pipeline
            .by_name("sync_probe_video_src")
            .context("missing sync probe video appsrc")?
            .downcast::<gst_app::AppSrc>()
            .expect("video appsrc");
        let video_sink = pipeline
            .by_name("sync_probe_video_sink")
            .context("missing sync probe video appsink")?
            .downcast::<gst_app::AppSink>()
            .expect("video appsink");
        pipeline
            .set_state(gst::State::Playing)
            .context("starting sync probe pipeline")?;
        let running = Arc::new(AtomicBool::new(true));
        // Single epoch captured before spawning, so both threads pace their
        // deadlines against the same wall-clock origin.
        let probe_start = Instant::now();
        let video_queue = FreshPacketQueue::new(PROBE_VIDEO_QUEUE);
        let audio_queue = FreshPacketQueue::new(PROBE_AUDIO_QUEUE);
        let video_thread = spawn_video_thread(
            video_src,
            video_sink,
            camera,
            schedule.clone(),
            duration,
            probe_start,
            running.clone(),
            video_queue.clone(),
        );
        let audio_thread = spawn_audio_thread(
            schedule,
            duration,
            probe_start,
            running.clone(),
            audio_queue.clone(),
        );
        Ok(Self {
            pipeline,
            running,
            video_queue,
            audio_queue,
            video_thread: Some(video_thread),
            audio_thread: Some(audio_thread),
        })
    }
    /// Cheap handle to the video packet queue for downstream consumers.
    pub fn video_queue(&self) -> FreshPacketQueue<VideoPacket> {
        self.video_queue.clone()
    }
    /// Cheap handle to the audio packet queue for downstream consumers.
    pub fn audio_queue(&self) -> FreshPacketQueue<AudioPacket> {
        self.audio_queue.clone()
    }
}
impl Drop for SyncProbeCapture {
    /// Tears the probe down in a deliberate order: stop flag, queues,
    /// pipeline, then thread joins. Errors during teardown are ignored.
    fn drop(&mut self) {
        // Release pairs with the Acquire loads in the generator loops.
        self.running.store(false, Ordering::Release);
        // Close the queues before joining; presumably this unblocks any
        // waiter inside `push` — TODO confirm FreshPacketQueue semantics.
        self.video_queue.close();
        self.audio_queue.close();
        // Null the pipeline before joining so any push/pull call blocked
        // inside the video thread can return instead of deadlocking the join.
        let _ = self.pipeline.set_state(gst::State::Null);
        if let Some(handle) = self.video_thread.take() {
            let _ = handle.join();
        }
        if let Some(handle) = self.audio_thread.take() {
            let _ = handle.join();
        }
    }
}
/// Builds the probe's video encode pipeline from a gst-launch description:
/// appsrc (GRAY8 raw frames) -> queue -> videoconvert -> encoder -> appsink.
///
/// `_schedule` is currently unused; presumably kept so the signature stays
/// stable alongside the thread spawners — TODO confirm.
///
/// # Errors
/// Fails if the launch string does not parse, if no H.264 encoder is
/// available (H264 codec only), or if the description is not a pipeline.
fn build_pipeline(camera: CameraConfig, _schedule: &PulseSchedule) -> Result<gst::Pipeline> {
    // Caps for the synthetic frames pushed by the video thread; fps is
    // clamped to at least 1 to keep the framerate fraction valid.
    let video_caps = format!(
        "video/x-raw,format=GRAY8,width={},height={},framerate={}/1",
        camera.width,
        camera.height,
        camera.fps.max(1)
    );
    let video_branch = match camera.codec {
        // MJPEG: encode each frame independently; `parsed=true` caps assert
        // the jpegenc output needs no further parsing downstream.
        CameraCodec::Mjpeg => format!(
            "appsrc name=sync_probe_video_src is-live=true format=time do-timestamp=false caps={video_caps} ! \
            queue max-size-buffers=4 leaky=downstream ! videoconvert ! \
            jpegenc quality=90 ! image/jpeg,parsed=true,width={},height={},framerate={}/1 ! \
            appsink name=sync_probe_video_sink emit-signals=false sync=false max-buffers=4 drop=true",
            camera.width,
            camera.height,
            camera.fps.max(1),
        ),
        // H.264: pick whichever encoder is installed; byte-stream/au output
        // keeps each appsink sample a complete access unit.
        CameraCodec::H264 => format!(
            "appsrc name=sync_probe_video_src is-live=true format=time do-timestamp=false caps={video_caps} ! \
            queue max-size-buffers=4 leaky=downstream ! videoconvert ! \
            {} ! h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \
            appsink name=sync_probe_video_sink emit-signals=false sync=false max-buffers=4 drop=true",
            pick_h264_encoder(camera.fps.max(1))?
        ),
    };
    gst::parse::launch(&video_branch)
        .with_context(|| format!("building sync probe pipeline: {video_branch}"))?
        .downcast::<gst::Pipeline>()
        .map_err(|_| anyhow::anyhow!("sync probe description did not build a pipeline"))
}
/// Chooses an available H.264 encoder element, preferring software `x264enc`
/// (tuned for low latency, keyframe interval of one second at `fps`), then
/// `openh264enc`, then the V4L2 hardware encoder.
///
/// # Errors
/// Fails when none of the candidate encoder factories is installed.
fn pick_h264_encoder(fps: u32) -> Result<String> {
    let installed = |factory: &str| gst::ElementFactory::find(factory).is_some();
    if installed("x264enc") {
        Ok(format!(
            "x264enc tune=zerolatency speed-preset=ultrafast bitrate=2500 key-int-max={}",
            fps.max(1)
        ))
    } else if installed("openh264enc") {
        Ok("openh264enc bitrate=2500000".to_string())
    } else if installed("v4l2h264enc") {
        Ok("v4l2h264enc".to_string())
    } else {
        bail!("no usable H.264 encoder found for sync probe")
    }
}
/// Spawns the thread that synthesizes probe video frames, pushes them into
/// the encoder `src`, pulls encoded output from `sink`, rebases each packet's
/// PTS, and publishes it to `queue`.
///
/// Frame generation is paced against `probe_start` + the schedule's per-frame
/// PTS and stops once that PTS exceeds `duration` or `running` is cleared.
fn spawn_video_thread(
    src: gst_app::AppSrc,
    sink: gst_app::AppSink,
    camera: CameraConfig,
    schedule: PulseSchedule,
    duration: Duration,
    probe_start: Instant,
    running: Arc<AtomicBool>,
    queue: FreshPacketQueue<VideoPacket>,
) -> JoinHandle<()> {
    thread::spawn(move || {
        let pts_rebaser = crate::live_capture_clock::DurationPacedSourcePtsRebaser::default();
        let lag_cap = crate::live_capture_clock::upstream_source_lag_cap();
        // Three pre-rendered frame variants; one is selected per tick below.
        let dark_frame = build_dark_probe_frame(camera.width as usize, camera.height as usize);
        let regular_pulse_frame =
            build_regular_probe_frame(camera.width as usize, camera.height as usize);
        let marker_pulse_frame =
            build_marker_probe_frame(camera.width as usize, camera.height as usize);
        // Nominal frame period; fps clamped to >= 1 to avoid division by zero.
        let frame_step = Duration::from_nanos(1_000_000_000u64 / u64::from(camera.fps.max(1)));
        let mut frame_index = 0u64;
        while running.load(Ordering::Acquire) {
            let pts = schedule.frame_pts(frame_index, camera.fps.max(1));
            if pts > duration {
                break;
            }
            // Sleep until this frame's wall-clock deadline (no-op if late).
            let deadline = probe_start + pts;
            if let Some(remaining) = deadline.checked_duration_since(Instant::now())
                && !remaining.is_zero()
            {
                thread::sleep(remaining);
            }
            // Pick marker pulse, regular pulse, or dark frame for this PTS.
            let frame = if schedule.flash_active(pts) && schedule.pulse_is_marker(pts) {
                &marker_pulse_frame
            } else if schedule.flash_active(pts) {
                &regular_pulse_frame
            } else {
                &dark_frame
            };
            let mut buffer = gst::Buffer::from_slice(frame.clone());
            // Stamp PTS/DTS/duration ourselves since the appsrc was created
            // with do-timestamp=false.
            if let Some(meta) = buffer.get_mut() {
                let pts_time = gst::ClockTime::from_nseconds(pts.as_nanos() as u64);
                meta.set_pts(Some(pts_time));
                meta.set_dts(Some(pts_time));
                meta.set_duration(Some(gst::ClockTime::from_nseconds(
                    frame_step.as_nanos() as u64
                )));
            }
            // Push failure means the pipeline is flushing/shutting down.
            if src.push_buffer(buffer).is_err() {
                break;
            }
            // Drain the sink down to its newest encoded sample, rebase its
            // PTS onto the paced clock, and publish the packet.
            if let Some(sample) = freshest_probe_video_sample(&sink)
                && let Some(buffer) = sample.buffer()
                && let Ok(map) = buffer.map_readable()
            {
                let source_pts_us = buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000;
                // Fall back to the nominal frame period when the encoder
                // leaves the buffer duration unset.
                let packet_duration_us = buffer
                    .duration()
                    .map(|ts| (ts.nseconds() / 1_000).max(1))
                    .unwrap_or(frame_step.as_micros().min(u64::MAX as u128) as u64);
                let packet = VideoPacket {
                    id: 2,
                    pts: pts_rebaser
                        .rebase_with_packet_duration(
                            Some(source_pts_us),
                            packet_duration_us,
                            lag_cap,
                        )
                        .packet_pts_us,
                    data: map.as_slice().to_vec(),
                    ..Default::default()
                };
                // Best-effort: drop the packet if the queue refuses it.
                let _ = queue.push(packet, Duration::ZERO);
            }
            frame_index = frame_index.saturating_add(1);
        }
        let _ = src.end_of_stream();
        queue.close();
    })
}
/// Pulls the newest available sample from `sink`: waits up to 250 ms for the
/// first one, then drains any backlog without blocking, keeping only the last.
///
/// Note: the drain loop runs even when the timed pull returned `None`, so a
/// sample arriving right at the timeout boundary is still picked up.
fn freshest_probe_video_sample(sink: &gst_app::AppSink) -> Option<gst::Sample> {
    let mut latest = sink.try_pull_sample(gst::ClockTime::from_mseconds(250));
    loop {
        match sink.try_pull_sample(gst::ClockTime::ZERO) {
            Some(sample) => latest = Some(sample),
            None => break latest,
        }
    }
}
/// Spawns the thread that renders synthetic probe audio in fixed
/// `AUDIO_CHUNK_MS` chunks, paced against `probe_start`, rebases each chunk's
/// PTS onto the paced capture clock, and publishes packets to `queue`.
///
/// Stops once the chunk PTS exceeds `duration` or `running` is cleared, then
/// closes the queue.
fn spawn_audio_thread(
    schedule: PulseSchedule,
    duration: Duration,
    probe_start: Instant,
    running: Arc<AtomicBool>,
    queue: FreshPacketQueue<AudioPacket>,
) -> JoinHandle<()> {
    thread::spawn(move || {
        let pts_rebaser = crate::live_capture_clock::DurationPacedSourcePtsRebaser::default();
        let lag_cap = crate::live_capture_clock::upstream_source_lag_cap();
        let chunk_duration = Duration::from_millis(AUDIO_CHUNK_MS);
        // Samples per chunk, clamped to >= 1 for pathological rate/chunk-size
        // combinations.
        let samples_per_chunk =
            (AUDIO_SAMPLE_RATE as usize * AUDIO_CHUNK_MS as usize / 1_000).max(1);
        let mut chunk_index = 0u64;
        while running.load(Ordering::Acquire) {
            // Fix: `chunk_index as u32` was a truncating cast that wraps past
            // u32::MAX and would warp the PTS back toward zero. Saturate
            // instead; the `pts > duration` check normally ends the loop long
            // before that, but a silent wrap should not depend on it.
            let chunk_ticks = u32::try_from(chunk_index).unwrap_or(u32::MAX);
            let pts = chunk_duration.saturating_mul(chunk_ticks);
            if pts > duration {
                break;
            }
            // Sleep until this chunk's wall-clock deadline (no-op if late).
            let deadline = probe_start + pts;
            if let Some(remaining) = deadline.checked_duration_since(Instant::now())
                && !remaining.is_zero()
            {
                thread::sleep(remaining);
            }
            let chunk = render_audio_chunk(&schedule, pts, samples_per_chunk);
            let timing = rebase_probe_audio_packet_pts(
                &pts_rebaser,
                pts.as_micros().min(u64::MAX as u128) as u64,
                chunk_duration.as_micros().min(u64::MAX as u128) as u64,
                lag_cap,
            );
            let packet = AudioPacket {
                id: 0,
                pts: timing.packet_pts_us,
                data: chunk,
            };
            // Best-effort: drop the chunk if the queue refuses it.
            let _ = queue.push(packet, Duration::ZERO);
            chunk_index = chunk_index.saturating_add(1);
        }
        queue.close();
    })
}