#![forbid(unsafe_code)] use std::{path::PathBuf, time::Duration}; use anyhow::{Context, Result, bail}; use gstreamer as gst; use gstreamer::prelude::*; use gstreamer_app as gst_app; use lesavka_common::lesavka::{ AudioEncoding, AudioPacket, UpstreamMediaBundle, VideoPacket, relay_client::RelayClient, }; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::Request; use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity}; const DEFAULT_SERVER: &str = "http://127.0.0.1:50051"; const DEFAULT_SAMPLE_RATE: u32 = 48_000; const DEFAULT_CHANNELS: u32 = 2; const DEFAULT_JPEG_QUALITY: i32 = 82; const MARKER_BITS: usize = 32; const MARKER_COLUMNS: usize = 16; #[derive(Clone, Debug)] struct Args { server: String, width: usize, height: usize, fps: u32, duration: Duration, jpeg_quality: i32, session_id: u64, artifact_dir: Option, print_every: u64, max_frame_bytes: usize, tls_ca: Option, tls_client_cert: Option, tls_client_key: Option, tls_domain: Option, } impl Args { fn parse() -> Result { let mut args = Self { server: DEFAULT_SERVER.to_string(), width: 1280, height: 720, fps: 30, duration: Duration::from_secs(300), jpeg_quality: DEFAULT_JPEG_QUALITY, session_id: unix_millis(), artifact_dir: None, print_every: 150, max_frame_bytes: env_usize("LESAVKA_SYNTHETIC_MAX_FRAME_BYTES").unwrap_or(0), tls_ca: env_path("LESAVKA_TLS_CA").or_else(|| default_pki_path("ca.crt")), tls_client_cert: env_path("LESAVKA_TLS_CLIENT_CERT") .or_else(|| default_pki_path("client.crt")), tls_client_key: env_path("LESAVKA_TLS_CLIENT_KEY") .or_else(|| default_pki_path("client.key")), tls_domain: std::env::var("LESAVKA_TLS_DOMAIN") .ok() .filter(|value| !value.trim().is_empty()), }; let mut it = std::env::args().skip(1); while let Some(flag) = it.next() { match flag.as_str() { "--server" => args.server = next_value(&mut it, &flag)?, "--width" => args.width = parse_next(&mut it, &flag)?, "--height" => args.height = parse_next(&mut it, &flag)?, "--fps" => args.fps = parse_next(&mut it, &flag)?, "--duration" => { let seconds: f64 = parse_next(&mut it, &flag)?; args.duration = Duration::from_secs_f64(seconds.max(0.0)); } "--jpeg-quality" => args.jpeg_quality = parse_next(&mut it, &flag)?, "--session-id" => args.session_id = parse_next(&mut it, &flag)?, "--artifact-dir" => { args.artifact_dir = Some(PathBuf::from(next_value(&mut it, &flag)?)) } "--print-every" => args.print_every = parse_next(&mut it, &flag)?, "--max-frame-bytes" => args.max_frame_bytes = parse_next(&mut it, &flag)?, "--tls-ca" => args.tls_ca = Some(PathBuf::from(next_value(&mut it, &flag)?)), "--tls-client-cert" => { args.tls_client_cert = Some(PathBuf::from(next_value(&mut it, &flag)?)) } "--tls-client-key" => { args.tls_client_key = Some(PathBuf::from(next_value(&mut it, &flag)?)) } "--tls-domain" => args.tls_domain = Some(next_value(&mut it, &flag)?), "--mode" => { let value = next_value(&mut it, &flag)?; let (width, height, fps) = parse_mode(&value)?; args.width = width; args.height = height; args.fps = fps; } "--help" | "-h" => { print_help(); std::process::exit(0); } other => bail!("unknown argument {other:?}; pass --help for usage"), } } if args.width == 0 || args.height == 0 || args.fps == 0 { bail!("width, height, and fps must be positive"); } args.jpeg_quality = args.jpeg_quality.clamp(1, 100); Ok(args) } fn frame_step_us(&self) -> u64 { (1_000_000_u64 / u64::from(self.fps)).max(1) } fn total_frames(&self) -> u64 { let frames = self.duration.as_secs_f64() * f64::from(self.fps); frames.ceil().max(1.0) as u64 } } struct MjpegEncoder { src: gst_app::AppSrc, sink: gst_app::AppSink, pipeline: gst::Pipeline, width: usize, height: usize, frame_step_us: u64, } #[derive(Clone, Copy, Debug, Default)] struct EncodeStats { frames: u64, total_bytes: u128, min_bytes: usize, max_bytes: usize, oversize_frames: u64, } impl EncodeStats { fn record(&mut self, bytes: usize, max_frame_bytes: usize) { self.frames = self.frames.saturating_add(1); self.total_bytes = self.total_bytes.saturating_add(bytes as u128); self.min_bytes = if self.frames == 1 { bytes } else { self.min_bytes.min(bytes) }; self.max_bytes = self.max_bytes.max(bytes); if max_frame_bytes > 0 && bytes > max_frame_bytes { self.oversize_frames = self.oversize_frames.saturating_add(1); } } fn mean_bytes(&self) -> usize { if self.frames == 0 { 0 } else { (self.total_bytes / u128::from(self.frames)).min(usize::MAX as u128) as usize } } } impl MjpegEncoder { fn new(args: &Args) -> Result { gst::init().context("gst init")?; let width = args.width as i32; let height = args.height as i32; let fps = args.fps as i32; let raw_caps = gst::Caps::builder("video/x-raw") .field("format", "RGB") .field("width", width) .field("height", height) .field("framerate", gst::Fraction::new(fps, 1)) .build(); let jpeg_caps = gst::Caps::builder("image/jpeg") .field("parsed", true) .field("width", width) .field("height", height) .field("framerate", gst::Fraction::new(fps, 1)) .build(); let pipeline = gst::Pipeline::new(); let src = gst::ElementFactory::make("appsrc") .name("lesavka_synthetic_uplink_src") .build()? .downcast::() .expect("appsrc"); src.set_is_live(false); src.set_format(gst::Format::Time); src.set_property("do-timestamp", false); src.set_caps(Some(&raw_caps)); let convert = gst::ElementFactory::make("videoconvert").build()?; let encoder = gst::ElementFactory::make("jpegenc") .property("quality", args.jpeg_quality) .build()?; let capsfilter = gst::ElementFactory::make("capsfilter") .property("caps", &jpeg_caps) .build()?; let sink = gst::ElementFactory::make("appsink") .name("lesavka_synthetic_uplink_sink") .property("sync", false) .property("emit-signals", false) .property("max-buffers", 8u32) .build()? .downcast::() .expect("appsink"); pipeline.add_many([ src.upcast_ref(), &convert, &encoder, &capsfilter, sink.upcast_ref(), ])?; gst::Element::link_many([ src.upcast_ref(), &convert, &encoder, &capsfilter, sink.upcast_ref(), ])?; pipeline .set_state(gst::State::Playing) .context("starting synthetic MJPEG encoder")?; Ok(Self { src, sink, pipeline, width: args.width, height: args.height, frame_step_us: args.frame_step_us(), }) } fn encode(&mut self, sequence: u64) -> Result> { let pts_us = sequence.saturating_mul(self.frame_step_us); let mut buffer = gst::Buffer::from_slice(synthetic_rgb_frame(self.width, self.height, sequence)); if let Some(meta) = buffer.get_mut() { let pts = gst::ClockTime::from_useconds(pts_us); meta.set_pts(Some(pts)); meta.set_dts(Some(pts)); meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us))); } self.src .push_buffer(buffer) .context("encoding synthetic frame")?; let sample = self .sink .pull_sample() .context("pulling encoded synthetic frame")?; let buffer = sample .buffer() .context("encoded synthetic frame had no buffer")?; let map = buffer .map_readable() .context("mapping encoded synthetic frame")?; Ok(map.as_slice().to_vec()) } } impl Drop for MjpegEncoder { fn drop(&mut self) { let _ = self.src.end_of_stream(); let _ = self.pipeline.set_state(gst::State::Null); } } async fn connect_channel(args: &Args) -> Result { let mut endpoint = tonic::transport::Channel::from_shared(args.server.clone())?.tcp_nodelay(true); if is_https(&args.server) { endpoint = endpoint .tls_config(client_tls_config(args)?) .context("configuring synthetic uplink TLS")?; } endpoint .connect() .await .with_context(|| format!("connecting to {}", args.server)) } fn client_tls_config(args: &Args) -> Result { let mut tls = ClientTlsConfig::new().domain_name( args.tls_domain .clone() .or_else(|| host_from_uri(&args.server)) .unwrap_or_else(|| "lesavka-server".to_string()), ); let ca_path = args .tls_ca .as_ref() .context("https synthetic uplink requires --tls-ca or LESAVKA_TLS_CA")?; let cert_path = args .tls_client_cert .as_ref() .context("https synthetic uplink requires --tls-client-cert or LESAVKA_TLS_CLIENT_CERT")?; let key_path = args .tls_client_key .as_ref() .context("https synthetic uplink requires --tls-client-key or LESAVKA_TLS_CLIENT_KEY")?; let ca = std::fs::read(ca_path) .with_context(|| format!("reading TLS CA certificate {}", ca_path.display()))?; tls = tls.ca_certificate(Certificate::from_pem(ca)); let cert = std::fs::read(cert_path) .with_context(|| format!("reading TLS client certificate {}", cert_path.display()))?; let key = std::fs::read(key_path) .with_context(|| format!("reading TLS client key {}", key_path.display()))?; Ok(tls.identity(Identity::from_pem(cert, key))) } fn is_https(server: &str) -> bool { server.trim_start().starts_with("https://") } fn host_from_uri(server: &str) -> Option { let rest = server.split_once("://")?.1; let host_port = rest.split('/').next().unwrap_or(rest); let host = host_port .rsplit_once('@') .map(|(_, host)| host) .unwrap_or(host_port); if host.starts_with('[') { return host .split_once(']') .map(|(value, _)| value.trim_start_matches('[').to_string()); } Some(host.split(':').next().unwrap_or(host).to_string()).filter(|host| !host.is_empty()) } fn env_path(name: &str) -> Option { std::env::var_os(name) .filter(|value| !value.is_empty()) .map(PathBuf::from) } fn default_pki_path(file_name: &str) -> Option { let home = std::env::var_os("HOME")?; Some( PathBuf::from(home) .join(".config") .join("lesavka") .join("pki") .join(file_name), ) } #[tokio::main(flavor = "current_thread")] async fn main() -> Result<()> { let args = Args::parse()?; if let Some(dir) = &args.artifact_dir { std::fs::create_dir_all(dir).with_context(|| format!("creating {}", dir.display()))?; std::fs::write( dir.join("command.txt"), std::env::args().collect::>().join(" ") + "\n", )?; write_summary(&args, None)?; } let channel = connect_channel(&args).await?; let mut client = RelayClient::new(channel); let (tx, rx) = mpsc::channel::(8); let response_task = tokio::spawn(async move { let response = client .stream_webcam_media(Request::new(ReceiverStream::new(rx))) .await .context("opening StreamWebcamMedia")?; let mut inbound = response.into_inner(); while inbound .message() .await .context("reading StreamWebcamMedia response")? .is_some() {} Ok::<(), anyhow::Error>(()) }); let mut encoder = MjpegEncoder::new(&args)?; let mut encode_stats = EncodeStats::default(); let frame_step = Duration::from_micros(args.frame_step_us()); let started = tokio::time::Instant::now() + Duration::from_millis(250); let total_frames = args.total_frames(); eprintln!( "lesavka synthetic uplink: mode={}x{}@{} frames={} server={} session={}", args.width, args.height, args.fps, total_frames, args.server, args.session_id ); for sequence in 0..total_frames { tokio::time::sleep_until(started + duration_mul(frame_step, sequence)).await; let pts_us = sequence.saturating_mul(args.frame_step_us()); let encoded = encoder.encode(sequence)?; encode_stats.record(encoded.len(), args.max_frame_bytes); if args.max_frame_bytes > 0 && encoded.len() > args.max_frame_bytes { write_summary(&args, Some(&encode_stats))?; bail!( "encoded synthetic frame {sequence} is {} bytes, above --max-frame-bytes {}; lower --jpeg-quality or use a more compressible synthetic pattern before trusting the RCT probe", encoded.len(), args.max_frame_bytes ); } let bundle = synthetic_bundle(&args, sequence, pts_us, encoded); if tx.send(bundle).await.is_err() { let response_result = response_task .await .context("joining StreamWebcamMedia task after early close")?; match response_result { Ok(()) => bail!( "StreamWebcamMedia closed before accepting synthetic frame {sequence}; disconnect or pause any live Lesavka client upstream before running the isolated RCT probe" ), Err(err) => { return Err(err) .context("StreamWebcamMedia closed before accepting synthetic frame"); } } } if args.print_every > 0 && sequence > 0 && sequence % args.print_every == 0 { eprintln!("sent synthetic frame {sequence}/{total_frames}"); } } drop(tx); response_task .await .context("joining StreamWebcamMedia task")??; write_summary(&args, Some(&encode_stats))?; eprintln!("lesavka synthetic uplink complete: frames={total_frames}"); Ok(()) } fn synthetic_bundle(args: &Args, sequence: u64, pts_us: u64, data: Vec) -> UpstreamMediaBundle { let video = VideoPacket { id: 0, pts: pts_us, data, seq: sequence, effective_fps: args.fps, client_capture_pts_us: pts_us, client_send_pts_us: pts_us, ..Default::default() }; let audio = AudioPacket { id: 0, pts: pts_us, data: silence_pcm(args.frame_step_us()), seq: sequence, client_capture_pts_us: pts_us, client_send_pts_us: pts_us, encoding: AudioEncoding::PcmS16le as i32, sample_rate: DEFAULT_SAMPLE_RATE, channels: DEFAULT_CHANNELS, frame_duration_us: args.frame_step_us().min(u64::from(u32::MAX)) as u32, ..Default::default() }; UpstreamMediaBundle { session_id: args.session_id, seq: sequence, capture_start_us: pts_us, capture_end_us: pts_us, video: Some(video), audio: vec![audio], audio_sample_rate: DEFAULT_SAMPLE_RATE, audio_channels: DEFAULT_CHANNELS, video_width: args.width as u32, video_height: args.height as u32, video_fps: args.fps, audio_encoding: AudioEncoding::PcmS16le as i32, } } fn silence_pcm(duration_us: u64) -> Vec { let samples = (u64::from(DEFAULT_SAMPLE_RATE).saturating_mul(duration_us) / 1_000_000).max(1); let bytes = samples .saturating_mul(u64::from(DEFAULT_CHANNELS)) .saturating_mul(std::mem::size_of::() as u64) .min(usize::MAX as u64) as usize; vec![0; bytes] } fn synthetic_rgb_frame(width: usize, height: usize, sequence: u64) -> Vec { let mut frame = vec![0u8; width.saturating_mul(height).saturating_mul(3)]; let moving_width = (width / 10).max(32).min(width.max(1)); let moving_offset = (sequence as usize).wrapping_mul(13) % width.max(1); for y in 0..height { for x in 0..width { let value = synthetic_luma( (width, height), x, y, sequence, (moving_width, moving_offset), ); let offset = (y * width + x) * 3; frame[offset] = value; frame[offset + 1] = value; frame[offset + 2] = value; } } draw_sequence_marker(&mut frame, width, height, sequence); frame } fn synthetic_luma( frame_size: (usize, usize), x: usize, y: usize, sequence: u64, moving_bar: (usize, usize), ) -> u8 { let (width, height) = frame_size; let (moving_width, moving_offset) = moving_bar; let width = width.max(1); let height = height.max(1); let block_w = (width / 24).max(24); let block_h = (height / 18).max(18); let base = 44 + (x.saturating_mul(72) / width) + (y.saturating_mul(52) / height) + ((sequence as usize).saturating_mul(3) % 28); let checker = if (((x / block_w) + (y / block_h) + (sequence as usize / 5)) & 1) == 0 { 30 } else { 0 }; let mut value = (base + checker).min(238) as u8; let moving = (x + width - moving_offset) % width; if moving < moving_width { value = (220usize.saturating_sub(y.saturating_mul(54) / height)).min(255) as u8; } else if moving < moving_width + 4 { value = 28; } let center_x = width / 2; let center_y = height / 2; if x.abs_diff(center_x) < width / 9 && y.abs_diff(center_y) < height / 12 { value = 255u8.saturating_sub(value / 2); } value } fn marker_cell(width: usize, height: usize) -> usize { (width.min(height) / 80).clamp(6, 16) } fn draw_sequence_marker(frame: &mut [u8], width: usize, height: usize, sequence: u64) { let cell = marker_cell(width, height); let rows = MARKER_BITS.div_ceil(MARKER_COLUMNS); if width < (MARKER_COLUMNS + 4) * cell || height < (rows + 4) * cell { return; } let x0 = 2 * cell; let y0 = 2 * cell; fill_rect( frame, width, cell, cell, (MARKER_COLUMNS + 2) * cell, (rows + 2) * cell, 32, ); fill_rect(frame, width, x0 - cell, y0 - cell, cell, cell, 255); fill_rect( frame, width, x0 + MARKER_COLUMNS * cell, y0 - cell, cell, cell, 0, ); for bit in 0..MARKER_BITS { let col = bit % MARKER_COLUMNS; let row = bit / MARKER_COLUMNS; let value = if ((sequence >> bit) & 1) != 0 { 255 } else { 0 }; fill_rect( frame, width, x0 + col * cell, y0 + row * cell, cell, cell, value, ); } } fn fill_rect(frame: &mut [u8], width: usize, x0: usize, y0: usize, w: usize, h: usize, value: u8) { let pixels = frame.len() / 3; let height = pixels / width.max(1); let x1 = (x0 + w).min(width); let y1 = (y0 + h).min(height); for y in y0..y1 { for x in x0..x1 { let offset = (y * width + x) * 3; if let Some(pixel) = frame.get_mut(offset..offset + 3) { pixel[0] = value; pixel[1] = value; pixel[2] = value; } } } } fn parse_mode(value: &str) -> Result<(usize, usize, u32)> { let (size, fps) = value .split_once('@') .with_context(|| format!("mode must look like WIDTHxHEIGHT@FPS, got {value:?}"))?; let (width, height) = size .split_once('x') .with_context(|| format!("mode must look like WIDTHxHEIGHT@FPS, got {value:?}"))?; Ok((width.parse()?, height.parse()?, fps.parse()?)) } fn next_value(it: &mut impl Iterator, flag: &str) -> Result { it.next() .with_context(|| format!("{flag} requires a value")) } fn parse_next(it: &mut impl Iterator, flag: &str) -> Result where T: std::str::FromStr, T::Err: std::error::Error + Send + Sync + 'static, { Ok(next_value(it, flag)?.parse()?) } fn env_usize(name: &str) -> Option { std::env::var(name) .ok() .and_then(|value| value.trim().parse::().ok()) } fn duration_mul(duration: Duration, count: u64) -> Duration { Duration::from_nanos( duration .as_nanos() .saturating_mul(u128::from(count)) .min(u128::from(u64::MAX)) as u64, ) } fn unix_millis() -> u64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() .min(u128::from(u64::MAX)) as u64 } fn write_summary(args: &Args, stats: Option<&EncodeStats>) -> Result<()> { if let Some(dir) = &args.artifact_dir { std::fs::write( dir.join("summary.json"), args_summary_json(args, stats) + "\n", )?; } Ok(()) } fn json_usize_or_null(value: Option) -> String { value .map(|value| value.to_string()) .unwrap_or_else(|| "null".to_string()) } fn args_summary_json(args: &Args, stats: Option<&EncodeStats>) -> String { let frames = stats.map(|stats| stats.frames).unwrap_or(0); let min_bytes = json_usize_or_null(stats.and_then(|stats| (stats.frames > 0).then_some(stats.min_bytes))); let max_bytes = json_usize_or_null(stats.and_then(|stats| (stats.frames > 0).then_some(stats.max_bytes))); let mean_bytes = json_usize_or_null( stats.and_then(|stats| (stats.frames > 0).then_some(stats.mean_bytes())), ); let oversize_frames = stats.map(|stats| stats.oversize_frames).unwrap_or(0); format!( "{{\"schema\":\"lesavka.synthetic-uplink.v1\",\"server\":{server:?},\"width\":{width},\"height\":{height},\"fps\":{fps},\"duration_s\":{duration:.3},\"session_id\":{session},\"tls\":{tls},\"jpeg_quality\":{quality},\"max_frame_bytes\":{max_frame_bytes},\"encoded_frames\":{frames},\"encoded_min_bytes\":{min_bytes},\"encoded_max_bytes\":{max_bytes},\"encoded_mean_bytes\":{mean_bytes},\"encoded_oversize_frames\":{oversize_frames}}}", server = args.server, width = args.width, height = args.height, fps = args.fps, duration = args.duration.as_secs_f64(), session = args.session_id, tls = is_https(&args.server), quality = args.jpeg_quality, max_frame_bytes = args.max_frame_bytes, frames = frames, min_bytes = min_bytes, max_bytes = max_bytes, mean_bytes = mean_bytes, oversize_frames = oversize_frames, ) } fn print_help() { println!( "lesavka-synthetic-uplink\n\n\ Sends sequence-coded synthetic MJPEG plus silent PCM through StreamWebcamMedia.\n\n\ Options:\n\ --server URL gRPC endpoint, default {DEFAULT_SERVER}\n\ --mode WIDTHxHEIGHT@FPS shorthand for width/height/fps\n\ --width N --height N --fps N\n\ --duration SECONDS default 300\n\ --jpeg-quality N default {DEFAULT_JPEG_QUALITY}\n\ --max-frame-bytes N fail fast if an encoded frame exceeds N bytes\n\ --artifact-dir PATH write command/summary metadata\n\ --print-every N progress interval in frames" ); } #[cfg(test)] mod tests { use super::*; fn test_args(width: usize, height: usize, fps: u32) -> Args { Args { server: DEFAULT_SERVER.to_string(), width, height, fps, duration: Duration::from_secs(1), jpeg_quality: DEFAULT_JPEG_QUALITY, session_id: 1, artifact_dir: None, print_every: 0, max_frame_bytes: 232_106, tls_ca: None, tls_client_cert: None, tls_client_key: None, tls_domain: None, } } #[test] fn synthetic_frames_fit_safe_720p_and_1080p_isochronous_budget() { for (width, height, fps) in [(1280, 720, 30), (1920, 1080, 30)] { let args = test_args(width, height, fps); let mut encoder = MjpegEncoder::new(&args).expect("synthetic encoder"); for sequence in [0, 1, 30, 120, 300] { let encoded = encoder.encode(sequence).expect("encode frame"); assert!( encoded.len() <= args.max_frame_bytes, "{}x{}@{} synthetic frame {sequence} encoded to {} bytes, above {}", width, height, fps, encoded.len(), args.max_frame_bytes ); } } } }