use super::*; pub struct SyncProbeCapture { pipeline: gst::Pipeline, running: Arc, video_queue: FreshPacketQueue, audio_queue: FreshPacketQueue, video_thread: Option>, audio_thread: Option>, } impl SyncProbeCapture { pub fn new(camera: CameraConfig, schedule: PulseSchedule, duration: Duration) -> Result { gst::init().context("gst init")?; let pipeline = build_pipeline(camera, &schedule)?; let video_src = pipeline .by_name("sync_probe_video_src") .context("missing sync probe video appsrc")? .downcast::() .expect("video appsrc"); let video_sink = pipeline .by_name("sync_probe_video_sink") .context("missing sync probe video appsink")? .downcast::() .expect("video appsink"); let audio_src = pipeline .by_name("sync_probe_audio_src") .context("missing sync probe audio appsrc")? .downcast::() .expect("audio appsrc"); let audio_sink = pipeline .by_name("sync_probe_audio_sink") .context("missing sync probe audio appsink")? .downcast::() .expect("audio appsink"); pipeline .set_state(gst::State::Playing) .context("starting sync probe pipeline")?; let running = Arc::new(AtomicBool::new(true)); let probe_start = Instant::now(); let video_queue = FreshPacketQueue::new(PROBE_VIDEO_QUEUE); let audio_queue = FreshPacketQueue::new(PROBE_AUDIO_QUEUE); let video_thread = spawn_video_thread( video_src, video_sink, camera, schedule.clone(), duration, probe_start, running.clone(), video_queue.clone(), ); let audio_thread = spawn_audio_thread( audio_src, audio_sink, schedule, duration, probe_start, running.clone(), audio_queue.clone(), ); Ok(Self { pipeline, running, video_queue, audio_queue, video_thread: Some(video_thread), audio_thread: Some(audio_thread), }) } pub fn video_queue(&self) -> FreshPacketQueue { self.video_queue.clone() } pub fn audio_queue(&self) -> FreshPacketQueue { self.audio_queue.clone() } } impl Drop for SyncProbeCapture { fn drop(&mut self) { self.running.store(false, Ordering::Release); self.video_queue.close(); self.audio_queue.close(); let _ = self.pipeline.set_state(gst::State::Null); if let Some(handle) = self.video_thread.take() { let _ = handle.join(); } if let Some(handle) = self.audio_thread.take() { let _ = handle.join(); } } } fn build_pipeline(camera: CameraConfig, _schedule: &PulseSchedule) -> Result { let video_caps = format!( "video/x-raw,format=RGB,width={},height={},framerate={}/1", camera.width, camera.height, camera.fps.max(1) ); let video_branch = match camera.codec { CameraCodec::Mjpeg => format!( "appsrc name=sync_probe_video_src is-live=true format=time do-timestamp=false caps={video_caps} ! \ queue max-size-buffers=4 leaky=downstream ! videoconvert ! \ jpegenc quality=90 ! image/jpeg,parsed=true,width={},height={},framerate={}/1 ! \ appsink name=sync_probe_video_sink emit-signals=false sync=false max-buffers=4 drop=true", camera.width, camera.height, camera.fps.max(1), ), CameraCodec::H264 => format!( "appsrc name=sync_probe_video_src is-live=true format=time do-timestamp=false caps={video_caps} ! \ queue max-size-buffers=4 leaky=downstream ! videoconvert ! \ {} ! h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \ appsink name=sync_probe_video_sink emit-signals=false sync=false max-buffers=4 drop=true", pick_h264_encoder(camera.fps.max(1))? ), }; let audio_branch = format!( "appsrc name=sync_probe_audio_src is-live=true format=time do-timestamp=false \ caps=audio/x-raw,format=S16LE,layout=interleaved,channels={},rate={} ! \ queue max-size-buffers=8 leaky=downstream ! \ audioconvert ! audioresample ! audio/x-raw,channels=2,rate={} ! \ {} ! aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate={},channels=2 ! \ appsink name=sync_probe_audio_sink emit-signals=false sync=false max-buffers=32 drop=true", AUDIO_CHANNELS, AUDIO_SAMPLE_RATE, AUDIO_SAMPLE_RATE, pick_aac_encoder()?, AUDIO_SAMPLE_RATE, ); let desc = format!("{video_branch} {audio_branch}"); gst::parse::launch(&desc) .with_context(|| format!("building sync probe pipeline: {desc}"))? .downcast::() .map_err(|_| anyhow::anyhow!("sync probe description did not build a pipeline")) } fn pick_h264_encoder(fps: u32) -> Result { if gst::ElementFactory::find("x264enc").is_some() { return Ok(format!( "x264enc tune=zerolatency speed-preset=ultrafast bitrate=2500 key-int-max={}", fps.max(1) )); } if gst::ElementFactory::find("openh264enc").is_some() { return Ok("openh264enc bitrate=2500000".to_string()); } if gst::ElementFactory::find("v4l2h264enc").is_some() { return Ok("v4l2h264enc".to_string()); } bail!("no usable H.264 encoder found for sync probe") } fn pick_aac_encoder() -> Result<&'static str> { [ "avenc_aac bitrate=128000", "fdkaacenc bitrate=128000", "faac bitrate=128000", ] .into_iter() .find(|entry| { let name = entry.split_ascii_whitespace().next().unwrap_or_default(); gst::ElementFactory::find(name).is_some() }) .ok_or_else(|| anyhow::anyhow!("no usable AAC encoder found for sync probe")) } fn spawn_video_thread( src: gst_app::AppSrc, sink: gst_app::AppSink, camera: CameraConfig, schedule: PulseSchedule, duration: Duration, probe_start: Instant, running: Arc, queue: FreshPacketQueue, ) -> JoinHandle<()> { thread::spawn(move || { let dark_frame = build_dark_probe_frame(camera.width as usize, camera.height as usize); let regular_pulse_frame = build_regular_probe_frame(camera.width as usize, camera.height as usize); let marker_pulse_frame = build_marker_probe_frame(camera.width as usize, camera.height as usize); let frame_step = Duration::from_nanos(1_000_000_000u64 / u64::from(camera.fps.max(1))); let mut frame_index = 0u64; while running.load(Ordering::Acquire) { let pts = schedule.frame_pts(frame_index, camera.fps.max(1)); if pts > duration { break; } let deadline = probe_start + pts; if let Some(remaining) = deadline.checked_duration_since(Instant::now()) && !remaining.is_zero() { thread::sleep(remaining); } let frame = if schedule.flash_active(pts) && schedule.pulse_is_marker(pts) { &marker_pulse_frame } else if schedule.flash_active(pts) { ®ular_pulse_frame } else { &dark_frame }; let mut buffer = gst::Buffer::from_slice(frame.clone()); if let Some(meta) = buffer.get_mut() { let pts_time = gst::ClockTime::from_nseconds(pts.as_nanos() as u64); meta.set_pts(Some(pts_time)); meta.set_dts(Some(pts_time)); meta.set_duration(Some(gst::ClockTime::from_nseconds( frame_step.as_nanos() as u64 ))); } if src.push_buffer(buffer).is_err() { break; } if let Some(sample) = sink.try_pull_sample(gst::ClockTime::from_mseconds(250)) && let Some(buffer) = sample.buffer() && let Ok(map) = buffer.map_readable() { let packet = VideoPacket { id: 2, pts: buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000, data: map.as_slice().to_vec(), ..Default::default() }; let _ = queue.push(packet, Duration::ZERO); } frame_index = frame_index.saturating_add(1); } let _ = src.end_of_stream(); queue.close(); }) } fn spawn_audio_thread( src: gst_app::AppSrc, sink: gst_app::AppSink, schedule: PulseSchedule, duration: Duration, probe_start: Instant, running: Arc, queue: FreshPacketQueue, ) -> JoinHandle<()> { thread::spawn(move || { let chunk_duration = Duration::from_millis(AUDIO_CHUNK_MS); let samples_per_chunk = (AUDIO_SAMPLE_RATE as usize * AUDIO_CHUNK_MS as usize / 1_000).max(1); let mut chunk_index = 0u64; while running.load(Ordering::Acquire) { let pts = chunk_duration.saturating_mul(chunk_index as u32); if pts > duration { break; } let deadline = probe_start + pts; if let Some(remaining) = deadline.checked_duration_since(Instant::now()) && !remaining.is_zero() { thread::sleep(remaining); } let chunk = render_audio_chunk(&schedule, pts, samples_per_chunk); let mut buffer = gst::Buffer::from_slice(chunk); if let Some(meta) = buffer.get_mut() { let pts_time = gst::ClockTime::from_nseconds(pts.as_nanos() as u64); meta.set_pts(Some(pts_time)); meta.set_dts(Some(pts_time)); meta.set_duration(Some(gst::ClockTime::from_nseconds( chunk_duration.as_nanos() as u64, ))); } if src.push_buffer(buffer).is_err() { break; } drain_audio_samples(&sink, &queue, duration, gst::ClockTime::ZERO); chunk_index = chunk_index.saturating_add(1); } let _ = src.end_of_stream(); drain_audio_samples(&sink, &queue, duration, gst::ClockTime::from_mseconds(100)); queue.close(); }) } fn drain_audio_samples( sink: &gst_app::AppSink, queue: &FreshPacketQueue, duration: Duration, timeout: gst::ClockTime, ) { while let Some(sample) = sink.try_pull_sample(timeout) { let Some(buffer) = sample.buffer() else { continue; }; let pts_usecs = buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; if probe_pts_exceeds_duration(pts_usecs, duration) { break; } let Ok(map) = buffer.map_readable() else { continue; }; let packet = AudioPacket { id: 0, pts: pts_usecs, data: map.as_slice().to_vec(), }; let _ = queue.push(packet, Duration::ZERO); if timeout == gst::ClockTime::ZERO { continue; } } }