use super::*; pub(super) struct MjpegProbeFrames { dark: Vec, regular_pulse: Vec, marker_pulse: Vec, coded_pulses: BTreeMap>, } pub(super) enum VideoPacketSource { Mjpeg(MjpegProbeFrames), Pipeline { pipeline: gst::Pipeline, src: gst_app::AppSrc, sink: gst_app::AppSink, first_sample_pts: Option, }, } #[derive(Clone, Copy)] pub(super) enum ProbeFrameKind { Dark, RegularPulse, MarkerPulse, Coded(u32), } pub(super) struct EncodedVideoData { pub data: Vec, pub pts: Duration, } /// Build the encoded video source used by the synthetic client transport probe. /// /// Inputs: negotiated camera profile and pulse schedule. /// Outputs: either pre-encoded MJPEG signature frames or a live H.264 encoder. /// Why: MJPEG encoding 1080p frames in the hot loop caused transport jitter, so /// deterministic signature frames are encoded once before capture starts. pub(super) fn build_video_packet_source( camera: CameraConfig, schedule: &PulseSchedule, ) -> Result { match camera.codec { CameraCodec::Mjpeg => { let coded_pulses = schedule .event_width_codes() .iter() .copied() .map(|code| { encode_mjpeg_probe_frame( camera, &build_coded_probe_frame( camera.width as usize, camera.height as usize, code, ), ) .map(|frame| (code, frame)) }) .collect::>>()?; Ok(VideoPacketSource::Mjpeg(MjpegProbeFrames { dark: encode_mjpeg_probe_frame( camera, &build_dark_probe_frame(camera.width as usize, camera.height as usize), )?, regular_pulse: encode_mjpeg_probe_frame( camera, &build_regular_probe_frame(camera.width as usize, camera.height as usize), )?, marker_pulse: encode_mjpeg_probe_frame( camera, &build_marker_probe_frame(camera.width as usize, camera.height as usize), )?, coded_pulses, })) } CameraCodec::H264 | CameraCodec::Hevc => { let pipeline = build_encoded_pipeline(camera)?; let src = pipeline .by_name("sync_probe_video_src") .context("missing sync probe video appsrc")? .downcast::() .expect("video appsrc"); let sink = pipeline .by_name("sync_probe_video_sink") .context("missing sync probe video appsink")? .downcast::() .expect("video appsink"); pipeline .set_state(gst::State::Playing) .context("starting sync probe pipeline")?; Ok(VideoPacketSource::Pipeline { pipeline, src, sink, first_sample_pts: None, }) } } } /// Stop any live encoder held by a video packet source. /// /// Inputs: packet source being consumed by the video thread. /// Outputs: GStreamer shutdown side effects only. /// Why: the probe should release local encoder resources even when the RCT /// capture or upstream transport test exits early. pub(super) fn stop_video_packet_source(packet_source: VideoPacketSource) { if let VideoPacketSource::Pipeline { pipeline, src, .. } = packet_source { let _ = src.end_of_stream(); let _ = pipeline.set_state(gst::State::Null); } } /// Encode one RGB probe frame as MJPEG. /// /// Inputs: camera profile and raw RGB frame bytes. /// Outputs: JPEG packet payload bytes. /// Why: pre-encoding still frames keeps the client transport test focused on /// bundled network timing rather than local software JPEG throughput. fn encode_mjpeg_probe_frame(camera: CameraConfig, frame: &[u8]) -> Result> { let video_caps = format!( "video/x-raw,format=RGB,width={},height={},framerate={}/1", camera.width, camera.height, camera.fps.max(1) ); let desc = format!( "appsrc name=sync_probe_still_src is-live=false format=time do-timestamp=false caps={video_caps} ! \ videoconvert ! jpegenc quality=90 ! image/jpeg,parsed=true,width={},height={},framerate={}/1 ! \ appsink name=sync_probe_still_sink emit-signals=false sync=false max-buffers=1 drop=false", camera.width, camera.height, camera.fps.max(1), ); let pipeline = gst::parse::launch(&desc) .with_context(|| format!("building still MJPEG encoder: {desc}"))? .downcast::() .map_err(|_| anyhow::anyhow!("still MJPEG encoder did not build a pipeline"))?; let src = pipeline .by_name("sync_probe_still_src") .context("missing still MJPEG appsrc")? .downcast::() .expect("still appsrc"); let sink = pipeline .by_name("sync_probe_still_sink") .context("missing still MJPEG appsink")? .downcast::() .expect("still appsink"); pipeline .set_state(gst::State::Playing) .context("starting still MJPEG encoder")?; let frame_step = Duration::from_nanos(1_000_000_000u64 / u64::from(camera.fps.max(1))); let mut buffer = gst::Buffer::from_slice(frame.to_vec()); if let Some(meta) = buffer.get_mut() { meta.set_pts(Some(gst::ClockTime::ZERO)); meta.set_dts(Some(gst::ClockTime::ZERO)); meta.set_duration(Some(gst::ClockTime::from_nseconds( frame_step.as_nanos() as u64 ))); } src.push_buffer(buffer) .map_err(|err| anyhow::anyhow!("pushing still MJPEG frame failed: {err:?}"))?; let _ = src.end_of_stream(); let sample = sink .try_pull_sample(gst::ClockTime::from_seconds(2)) .context("still MJPEG encoder produced no sample")?; let data = sample .buffer() .context("still MJPEG sample had no buffer")? .map_readable() .context("mapping still MJPEG sample")? .as_slice() .to_vec(); let _ = pipeline.set_state(gst::State::Null); Ok(data) } /// Build the live encoder pipeline for non-MJPEG negotiated profiles. /// /// Inputs: camera profile. /// Outputs: an appsrc-to-appsink GStreamer pipeline. /// Why: inter-frame codecs cannot reuse still JPEG packets, but they still need /// the same RGB signature frames so analyzer identity remains comparable. fn build_encoded_pipeline(camera: CameraConfig) -> Result { let video_caps = format!( "video/x-raw,format=RGB,width={},height={},framerate={}/1", camera.width, camera.height, camera.fps.max(1) ); let (encoder, parse_chain) = match camera.codec { CameraCodec::H264 => ( pick_h264_encoder(camera.fps.max(1))?, "h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au", ), CameraCodec::Hevc => ( pick_hevc_encoder(camera.fps.max(1))?, "h265parse config-interval=-1 ! video/x-h265,stream-format=byte-stream,alignment=au", ), CameraCodec::Mjpeg => unreachable!("MJPEG uses pre-encoded still frames"), }; let video_branch = format!( "appsrc name=sync_probe_video_src is-live=true format=time do-timestamp=false caps={video_caps} ! \ queue max-size-buffers=4 leaky=downstream ! videoconvert ! \ {encoder} ! {parse_chain} ! \ appsink name=sync_probe_video_sink emit-signals=false sync=false max-buffers=4 drop=true", ); gst::parse::launch(&video_branch) .with_context(|| format!("building sync probe pipeline: {video_branch}"))? .downcast::() .map_err(|_| anyhow::anyhow!("sync probe description did not build a pipeline")) } /// Choose an available low-latency H.264 encoder. /// /// Inputs: target frame rate, used for GOP sizing where the encoder supports it. /// Outputs: a GStreamer encoder element description. /// Why: this probe should run on different developer hosts without hardcoding a /// single hardware encoder, while still preferring low-latency behavior. fn pick_h264_encoder(fps: u32) -> Result { if gst::ElementFactory::find("x264enc").is_some() { return Ok(format!( "x264enc tune=zerolatency speed-preset=ultrafast bitrate=2500 key-int-max={}", fps.max(1) )); } if gst::ElementFactory::find("openh264enc").is_some() { return Ok("openh264enc bitrate=2500000".to_string()); } if gst::ElementFactory::find("v4l2h264enc").is_some() { return Ok("v4l2h264enc".to_string()); } bail!("no usable H.264 encoder found for sync probe") } /// Choose an available low-latency HEVC encoder. /// /// Inputs: target frame rate, used for GOP sizing where the encoder supports it. /// Outputs: a GStreamer encoder element description. /// Why: the client-to-server probe should exercise the same HEVC transport /// shape as real webcam uplink without requiring a specific GPU encoder. fn pick_hevc_encoder(fps: u32) -> Result { if gst::ElementFactory::find("x265enc").is_some() { let keyframe_interval = low_latency_hevc_keyframe_interval(fps); return Ok(format!( "x265enc tune=zerolatency speed-preset=ultrafast bitrate=2500 key-int-max={}", keyframe_interval )); } for encoder in ["nvh265enc", "vah265enc", "vaapih265enc", "v4l2h265enc"] { if gst::ElementFactory::find(encoder).is_some() { return Ok(encoder.to_string()); } } bail!("no usable HEVC encoder found for sync probe") } /// Match the real webcam HEVC keyframe cadence in synthetic transport probes. /// /// Inputs: target frame rate. Output: low-latency keyframe interval in frames. /// Why: the client-to-RCT probe should stress the same inter-frame shape as /// real webcam uplink; a one-second GOP made coded flashes less representative /// than Lesavka's default live-call HEVC pipeline. fn low_latency_hevc_keyframe_interval(fps: u32) -> u32 { fps.clamp(1, 5) } /// Select the visual signature for a video timestamp. /// /// Inputs: deterministic pulse schedule and current video PTS. /// Outputs: frame kind used by packet encoding. /// Why: the frame decision must be shared by MJPEG and H.264 so both codecs /// carry the same event identity to the RCT analyzer. pub(super) fn probe_frame_kind(schedule: &PulseSchedule, pts: Duration) -> ProbeFrameKind { if !schedule.flash_active(pts) { return ProbeFrameKind::Dark; } if let Some(code) = schedule.event_code(pts) { return ProbeFrameKind::Coded(code); } if schedule.pulse_is_marker(pts) { ProbeFrameKind::MarkerPulse } else { ProbeFrameKind::RegularPulse } } /// Produce an encoded video payload for a probe frame. /// /// Inputs: packet source, frame kind, raw RGB frame, and timing metadata. /// Outputs: encoded video bytes, or `None` when the live encoder is drained. /// Why: bundled transport tests need fresh video packets paced from local PTS, /// but MJPEG and H.264 require different packet-production paths. pub(super) fn video_packet_data( packet_source: &mut VideoPacketSource, frame_kind: ProbeFrameKind, raw_frame: &[u8], pts: Duration, frame_step: Duration, ) -> Option { match packet_source { VideoPacketSource::Mjpeg(frames) => match frame_kind { ProbeFrameKind::Dark => Some(EncodedVideoData { data: frames.dark.clone(), pts, }), ProbeFrameKind::RegularPulse => Some(EncodedVideoData { data: frames.regular_pulse.clone(), pts, }), ProbeFrameKind::MarkerPulse => Some(EncodedVideoData { data: frames.marker_pulse.clone(), pts, }), ProbeFrameKind::Coded(code) => frames .coded_pulses .get(&code) .cloned() .map(|data| EncodedVideoData { data, pts }), }, VideoPacketSource::Pipeline { src, sink, first_sample_pts, .. } => { let mut buffer = gst::Buffer::from_slice(raw_frame.to_vec()); if let Some(meta) = buffer.get_mut() { let pts_time = gst::ClockTime::from_nseconds(pts.as_nanos() as u64); meta.set_pts(Some(pts_time)); meta.set_dts(Some(pts_time)); meta.set_duration(Some(gst::ClockTime::from_nseconds( frame_step.as_nanos() as u64 ))); } if src.push_buffer(buffer).is_err() { return None; } freshest_probe_video_sample(sink).and_then(|sample| { let buffer = sample.buffer()?; let sample_pts = normalized_sample_pts_duration(buffer, first_sample_pts).unwrap_or(pts); let map = buffer.map_readable().ok()?; Some(EncodedVideoData { data: map.as_slice().to_vec(), pts: sample_pts, }) }) } } } /// Read the encoder output timestamp for one sample buffer. /// /// Inputs: encoded sample buffer. Output: packet PTS as a `Duration` when the /// encoder preserved it. Why: inter-frame encoders may return an older access /// unit than the frame just pushed, so transport packets must use the actual /// output PTS instead of the current input-loop PTS. fn sample_pts_duration(buffer: &gst::BufferRef) -> Option { buffer.pts().map(|pts| Duration::from_nanos(pts.nseconds())) } /// Rebase encoder output timestamps onto the probe's zero-based timeline. /// /// Inputs: encoded sample buffer and mutable first-sample timestamp. /// Output: normalized sample PTS. /// Why: some GStreamer encoders emit a segment-offset PTS while still preserving /// correct sample-to-sample cadence, so the probe keeps the cadence and drops /// the absolute segment origin before bundling media for transport. fn normalized_sample_pts_duration( buffer: &gst::BufferRef, first_sample_pts: &mut Option, ) -> Option { let sample_pts = sample_pts_duration(buffer)?; let first = first_sample_pts.get_or_insert(sample_pts); Some(sample_pts.saturating_sub(*first)) } /// Drain a live appsink and return the newest encoded sample. /// /// Inputs: GStreamer appsink for the H.264 probe pipeline. /// Outputs: most recent sample if one was produced. /// Why: the transport probe should prefer freshness over preserving an encoder /// backlog that would make client-origin media look older than it really is. fn freshest_probe_video_sample(sink: &gst_app::AppSink) -> Option { let mut newest = sink.try_pull_sample(gst::ClockTime::from_mseconds(250)); while let Some(sample) = sink.try_pull_sample(gst::ClockTime::ZERO) { newest = Some(sample); } newest } #[cfg(test)] mod tests { use gstreamer as gst; /// Verifies synthetic HEVC probes use the same short GOP shape as live /// camera transport. /// /// Input: representative target frame rates. Output: bounded keyframe /// interval. Why: coded flash recovery should fail for real transport /// reasons, not because the probe used an easier one-second GOP. #[test] fn low_latency_hevc_keyframe_interval_matches_live_camera_default() { assert_eq!(super::low_latency_hevc_keyframe_interval(0), 1); assert_eq!(super::low_latency_hevc_keyframe_interval(1), 1); assert_eq!(super::low_latency_hevc_keyframe_interval(5), 5); assert_eq!(super::low_latency_hevc_keyframe_interval(20), 5); assert_eq!(super::low_latency_hevc_keyframe_interval(30), 5); } /// Verifies encoded packet timestamps come from the encoder output sample. /// /// Input: one encoded GStreamer buffer with explicit PTS. Output: matching /// `Duration`. Why: HEVC encoders may return a delayed access unit, so the /// bundle must carry the timestamp of what actually left the encoder. #[test] fn sample_pts_duration_uses_encoder_output_pts() { gst::init().expect("gst init"); let mut buffer = gst::Buffer::with_size(4).expect("buffer"); { let meta = buffer.get_mut().expect("mutable buffer"); meta.set_pts(Some(gst::ClockTime::from_mseconds(123))); } assert_eq!( super::sample_pts_duration(buffer.as_ref()), Some(std::time::Duration::from_millis(123)) ); } /// Verifies encoder segment origins are removed while cadence is retained. /// /// Input: two encoded buffers whose PTS starts far from zero. Output: /// zero-based probe timestamps. Why: the server analyzer compares client /// media against the synthetic probe timeline, not GStreamer segment /// wall-clock origins. #[test] fn normalized_sample_pts_duration_preserves_cadence_without_segment_origin() { gst::init().expect("gst init"); let mut first_sample_pts = None; let mut first = gst::Buffer::with_size(4).expect("first"); first .get_mut() .expect("first mutable") .set_pts(Some(gst::ClockTime::from_seconds(3_600))); assert_eq!( super::normalized_sample_pts_duration(first.as_ref(), &mut first_sample_pts), Some(std::time::Duration::ZERO) ); let mut second = gst::Buffer::with_size(4).expect("second"); second.get_mut().expect("second mutable").set_pts(Some( gst::ClockTime::from_seconds(3_600) + gst::ClockTime::from_mseconds(33), )); assert_eq!( super::normalized_sample_pts_duration(second.as_ref(), &mut first_sample_pts), Some(std::time::Duration::from_millis(33)) ); } }