use super::*; fn rebase_probe_audio_packet_pts( pts_rebaser: &crate::live_capture_clock::DurationPacedSourcePtsRebaser, source_pts_us: u64, packet_duration_us: u64, lag_cap: Duration, ) -> crate::live_capture_clock::RebasedSourcePts { pts_rebaser.rebase_with_packet_duration(Some(source_pts_us), packet_duration_us, lag_cap) } #[cfg(test)] pub(super) fn rebase_probe_packet_pts( pts_rebaser: &crate::live_capture_clock::DurationPacedSourcePtsRebaser, source_pts_us: u64, lag_cap: Duration, ) -> u64 { pts_rebaser .rebase_with_packet_duration(Some(source_pts_us), 1, lag_cap) .packet_pts_us } pub struct SyncProbeCapture { pipeline: gst::Pipeline, running: Arc, video_queue: FreshPacketQueue, audio_queue: FreshPacketQueue, video_thread: Option>, audio_thread: Option>, } impl SyncProbeCapture { pub fn new(camera: CameraConfig, schedule: PulseSchedule, duration: Duration) -> Result { gst::init().context("gst init")?; let pipeline = build_pipeline(camera, &schedule)?; let video_src = pipeline .by_name("sync_probe_video_src") .context("missing sync probe video appsrc")? .downcast::() .expect("video appsrc"); let video_sink = pipeline .by_name("sync_probe_video_sink") .context("missing sync probe video appsink")? .downcast::() .expect("video appsink"); pipeline .set_state(gst::State::Playing) .context("starting sync probe pipeline")?; let running = Arc::new(AtomicBool::new(true)); let probe_start = Instant::now(); let video_queue = FreshPacketQueue::new(PROBE_VIDEO_QUEUE); let audio_queue = FreshPacketQueue::new(PROBE_AUDIO_QUEUE); let video_thread = spawn_video_thread( video_src, video_sink, camera, schedule.clone(), duration, probe_start, running.clone(), video_queue.clone(), ); let audio_thread = spawn_audio_thread( schedule, duration, probe_start, running.clone(), audio_queue.clone(), ); Ok(Self { pipeline, running, video_queue, audio_queue, video_thread: Some(video_thread), audio_thread: Some(audio_thread), }) } pub fn video_queue(&self) -> FreshPacketQueue { self.video_queue.clone() } pub fn audio_queue(&self) -> FreshPacketQueue { self.audio_queue.clone() } } impl Drop for SyncProbeCapture { fn drop(&mut self) { self.running.store(false, Ordering::Release); self.video_queue.close(); self.audio_queue.close(); let _ = self.pipeline.set_state(gst::State::Null); if let Some(handle) = self.video_thread.take() { let _ = handle.join(); } if let Some(handle) = self.audio_thread.take() { let _ = handle.join(); } } } fn build_pipeline(camera: CameraConfig, _schedule: &PulseSchedule) -> Result { let video_caps = format!( "video/x-raw,format=RGB,width={},height={},framerate={}/1", camera.width, camera.height, camera.fps.max(1) ); let video_branch = match camera.codec { CameraCodec::Mjpeg => format!( "appsrc name=sync_probe_video_src is-live=true format=time do-timestamp=false caps={video_caps} ! \ queue max-size-buffers=4 leaky=downstream ! videoconvert ! \ jpegenc quality=90 ! image/jpeg,parsed=true,width={},height={},framerate={}/1 ! \ appsink name=sync_probe_video_sink emit-signals=false sync=false max-buffers=4 drop=true", camera.width, camera.height, camera.fps.max(1), ), CameraCodec::H264 => format!( "appsrc name=sync_probe_video_src is-live=true format=time do-timestamp=false caps={video_caps} ! \ queue max-size-buffers=4 leaky=downstream ! videoconvert ! \ {} ! h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \ appsink name=sync_probe_video_sink emit-signals=false sync=false max-buffers=4 drop=true", pick_h264_encoder(camera.fps.max(1))? ), }; gst::parse::launch(&video_branch) .with_context(|| format!("building sync probe pipeline: {video_branch}"))? .downcast::() .map_err(|_| anyhow::anyhow!("sync probe description did not build a pipeline")) } fn pick_h264_encoder(fps: u32) -> Result { if gst::ElementFactory::find("x264enc").is_some() { return Ok(format!( "x264enc tune=zerolatency speed-preset=ultrafast bitrate=2500 key-int-max={}", fps.max(1) )); } if gst::ElementFactory::find("openh264enc").is_some() { return Ok("openh264enc bitrate=2500000".to_string()); } if gst::ElementFactory::find("v4l2h264enc").is_some() { return Ok("v4l2h264enc".to_string()); } bail!("no usable H.264 encoder found for sync probe") } fn spawn_video_thread( src: gst_app::AppSrc, sink: gst_app::AppSink, camera: CameraConfig, schedule: PulseSchedule, duration: Duration, probe_start: Instant, running: Arc, queue: FreshPacketQueue, ) -> JoinHandle<()> { thread::spawn(move || { let pts_rebaser = crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(); let lag_cap = crate::live_capture_clock::upstream_source_lag_cap(); let dark_frame = build_dark_probe_frame(camera.width as usize, camera.height as usize); let regular_pulse_frame = build_regular_probe_frame(camera.width as usize, camera.height as usize); let marker_pulse_frame = build_marker_probe_frame(camera.width as usize, camera.height as usize); let frame_step = Duration::from_nanos(1_000_000_000u64 / u64::from(camera.fps.max(1))); let mut frame_index = 0u64; while running.load(Ordering::Acquire) { let pts = schedule.frame_pts(frame_index, camera.fps.max(1)); if pts > duration { break; } let deadline = probe_start + pts; if let Some(remaining) = deadline.checked_duration_since(Instant::now()) && !remaining.is_zero() { thread::sleep(remaining); } let frame = if schedule.flash_active(pts) && schedule.pulse_is_marker(pts) { &marker_pulse_frame } else if schedule.flash_active(pts) { ®ular_pulse_frame } else { &dark_frame }; let mut buffer = gst::Buffer::from_slice(frame.clone()); if let Some(meta) = buffer.get_mut() { let pts_time = gst::ClockTime::from_nseconds(pts.as_nanos() as u64); meta.set_pts(Some(pts_time)); meta.set_dts(Some(pts_time)); meta.set_duration(Some(gst::ClockTime::from_nseconds( frame_step.as_nanos() as u64 ))); } if src.push_buffer(buffer).is_err() { break; } if let Some(sample) = freshest_probe_video_sample(&sink) && let Some(buffer) = sample.buffer() && let Ok(map) = buffer.map_readable() { let source_pts_us = buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; let packet_duration_us = buffer .duration() .map(|ts| (ts.nseconds() / 1_000).max(1)) .unwrap_or(frame_step.as_micros().min(u64::MAX as u128) as u64); let packet = VideoPacket { id: 2, pts: pts_rebaser .rebase_with_packet_duration( Some(source_pts_us), packet_duration_us, lag_cap, ) .packet_pts_us, data: map.as_slice().to_vec(), ..Default::default() }; let _ = queue.push(packet, Duration::ZERO); } frame_index = frame_index.saturating_add(1); } let _ = src.end_of_stream(); queue.close(); }) } fn freshest_probe_video_sample(sink: &gst_app::AppSink) -> Option { let mut newest = sink.try_pull_sample(gst::ClockTime::from_mseconds(250)); while let Some(sample) = sink.try_pull_sample(gst::ClockTime::ZERO) { newest = Some(sample); } newest } fn spawn_audio_thread( schedule: PulseSchedule, duration: Duration, probe_start: Instant, running: Arc, queue: FreshPacketQueue, ) -> JoinHandle<()> { thread::spawn(move || { let pts_rebaser = crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(); let lag_cap = crate::live_capture_clock::upstream_source_lag_cap(); let chunk_duration = Duration::from_millis(AUDIO_CHUNK_MS); let samples_per_chunk = (AUDIO_SAMPLE_RATE as usize * AUDIO_CHUNK_MS as usize / 1_000).max(1); let mut chunk_index = 0u64; while running.load(Ordering::Acquire) { let pts = chunk_duration.saturating_mul(chunk_index as u32); if pts > duration { break; } let deadline = probe_start + pts; if let Some(remaining) = deadline.checked_duration_since(Instant::now()) && !remaining.is_zero() { thread::sleep(remaining); } let chunk = render_audio_chunk(&schedule, pts, samples_per_chunk); let timing = rebase_probe_audio_packet_pts( &pts_rebaser, pts.as_micros().min(u64::MAX as u128) as u64, chunk_duration.as_micros().min(u64::MAX as u128) as u64, lag_cap, ); let packet = AudioPacket { id: 0, pts: timing.packet_pts_us, data: chunk, }; let _ = queue.push(packet, Duration::ZERO); chunk_index = chunk_index.saturating_add(1); } queue.close(); }) }