#!/usr/bin/env bash
# Hygiene gate: runs cargo fmt/check/metadata/clippy, then a Python pass that
# compares per-file LOC, clippy warnings and doc debt against the committed
# baseline and enforces repository, naming, script and env-doc policies.
#
# Optional env: BRANCH_NAME or GIT_BRANCH, GIT_COMMIT (git is the fallback).
# Outputs under target/hygiene-gate: clippy.json, cargo-metadata.json,
# summary.txt, metrics.prom. Exits non-zero on any regression or violation.
set -euo pipefail

ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)
REPORT_DIR="${ROOT_DIR}/target/hygiene-gate"
CLIPPY_JSON="${REPORT_DIR}/clippy.json"
SUMMARY_TXT="${REPORT_DIR}/summary.txt"
BASELINE_JSON="${ROOT_DIR}/scripts/ci/hygiene_gate_baseline.json"
METADATA_JSON="${REPORT_DIR}/cargo-metadata.json"
METRICS_FILE="${REPORT_DIR}/metrics.prom"

mkdir -p "${REPORT_DIR}"

# Cargo discovers the workspace from the current directory and clippy reports
# file names relative to it, so always run from the repository root.
cd "${ROOT_DIR}"

cargo fmt --all -- --check
cargo check --workspace --all-targets
cargo metadata --locked --format-version 1 >"${METADATA_JSON}"
cargo clippy --workspace --all-targets --message-format json -- -W clippy::pedantic >"${CLIPPY_JSON}"

# Prefer CI-provided branch/commit identifiers; fall back to git, then "unknown".
branch=${BRANCH_NAME:-${GIT_BRANCH:-}}
if [[ -z "${branch}" ]]; then
  branch=$(git -C "${ROOT_DIR}" rev-parse --abbrev-ref HEAD 2>/dev/null || echo unknown)
fi
commit=${GIT_COMMIT:-}
if [[ -z "${commit}" ]]; then
  commit=$(git -C "${ROOT_DIR}" rev-parse --short HEAD 2>/dev/null || echo unknown)
fi

python3 - "${CLIPPY_JSON}" "${BASELINE_JSON}" "${SUMMARY_TXT}" "${ROOT_DIR}" "${METRICS_FILE}" "${branch}" "${commit}" <<'PY'
# Deferred annotation evaluation keeps `str | None` / `list[str]` signatures
# importable on interpreters older than 3.10.
from __future__ import annotations

import json
import os
import pathlib
import re
import stat
import subprocess
import sys
from collections import defaultdict

clippy_path = pathlib.Path(sys.argv[1])
baseline_path = pathlib.Path(sys.argv[2])
summary_path = pathlib.Path(sys.argv[3])
# Resolve once so that relative_to() comparisons against resolved file paths
# still succeed when the checkout path contains a symlink.
root = pathlib.Path(sys.argv[4]).resolve()
metrics_path = pathlib.Path(sys.argv[5])
branch = sys.argv[6]
commit = sys.argv[7]

fn_re = re.compile(r'^\s*(?:pub(?:\([^)]+\))?\s+)?(?:async\s+)?(?:unsafe\s+)?fn\s+\w+')
env_re = re.compile(r'LESAVKA_[A-Z0-9_]+')
lazy_name_tokens = {'part', 'piece', 'chunk', 'misc', 'stuff', 'helpers2', 'new', 'old', 'tmp'}
expected_workspace_members = {'common', 'client', 'server', 'testing'}
required_binary_paths = {
    'lesavka-client': 'client/Cargo.toml',
    'lesavka-server': 'server/Cargo.toml',
    'lesavka-uvc': 'server/Cargo.toml',
    'lesavka-relayctl': 'client/src/bin/lesavka-relayctl.rs',
}


def load_json_lines(path: pathlib.Path):
    """Yield one decoded JSON value per non-blank line; undecodable lines are skipped."""
    for raw in path.read_text(encoding='utf-8').splitlines():
        raw = raw.strip()
        if not raw:
            continue
        try:
            yield json.loads(raw)
        except json.JSONDecodeError:
            continue


def repo_relative(path: str) -> str | None:
    """Return `path` as a POSIX path relative to the repo root, or None if outside it.

    Relative inputs are anchored at the repo root rather than the process
    working directory; absolute inputs are used as-is.
    """
    try:
        return (root / path).resolve().relative_to(root).as_posix()
    except Exception:
        return None


def is_tracked_source(rel: str | None) -> bool:
    """True for repo-relative paths under a src/ directory that the gate tracks.

    Excludes anything under a target/ directory (including the top-level one)
    and package-local src/tests/ trees.
    """
    if rel is None or '/src/' not in rel:
        return False
    if rel.startswith('target/') or '/target/' in rel:
        return False
    return '/src/tests/' not in rel


def count_lines(file: pathlib.Path) -> int:
    """Count the lines of a UTF-8 text file, closing the handle afterwards."""
    with file.open('r', encoding='utf-8') as handle:
        return sum(1 for _ in handle)


def run_git(*args: str) -> list[str]:
    """Run git against the repo root and return its non-empty stdout lines.

    Raises subprocess.CalledProcessError when git exits non-zero.
    """
    proc = subprocess.run(
        ['git', '-C', str(root), *args],
        check=True,
        text=True,
        capture_output=True,
    )
    return [line for line in proc.stdout.splitlines() if line]


def tracked_files() -> list[str]:
    """Return the paths reported by `git ls-files`."""
    return run_git('ls-files')


def parse_workspace_members() -> set[str]:
    """Extract the quoted entries of the first `members = [...]` array in Cargo.toml."""
    text = (root / 'Cargo.toml').read_text(encoding='utf-8')
    match = re.search(r'members\s*=\s*\[(?P<body>.*?)\]', text, re.S)
    if not match:
        return set()
    return set(re.findall(r'"([^"]+)"', match.group('body')))


def repo_policy_violations(files: list[str]) -> list[str]:
    """Check lockfile presence, workspace members, committed artifacts and binary markers."""
    violations: list[str] = []
    tracked = set(files)
    if 'Cargo.lock' not in tracked:
        violations.append('Cargo.lock: must be committed for reproducible Rust builds')
    members = parse_workspace_members()
    if members != expected_workspace_members:
        violations.append(
            f'Cargo.toml: workspace members must be explicit {sorted(expected_workspace_members)}, found {sorted(members)}'
        )
    generated_patterns = (
        re.compile(r'(^|/)target/'),
        re.compile(r'(^|/)dist/'),
        re.compile(r'(^|/)logs/'),
        re.compile(r'(^|/)coverage/'),
        re.compile(r'(^|/)captures/'),
        re.compile(r'\.(log|h264|aac|wav|rgba)$'),
    )
    for path in files:
        if pathlib.Path(path).name == 'AGENTS.md':
            violations.append(f'{path}: local AGENTS notes must not be committed')
        if any(pattern.search(path) for pattern in generated_patterns):
            violations.append(f'{path}: generated/build/runtime artifact must not be committed')
    for name, marker in required_binary_paths.items():
        marker_path = root / marker
        if marker.endswith('.rs'):
            # A .rs marker is the binary's own source file: it only has to exist.
            if not marker_path.exists():
                violations.append(f'{name}: stable public binary source {marker} is missing')
        elif not marker_path.exists():
            # Report a missing manifest as a violation instead of crashing on read_text().
            violations.append(f'{name}: stable public binary manifest {marker} is missing')
        elif name not in marker_path.read_text(encoding='utf-8'):
            violations.append(f'{name}: stable public binary name missing from {marker}')
    return violations


def naming_policy_violations(files: list[str]) -> list[str]:
    """Flag lazy filename tokens and Rust files whose stem is not snake_case."""
    violations: list[str] = []
    for path in files:
        if path.startswith('.git/') or path.startswith('target/'):
            continue
        stem = pathlib.Path(path).stem.lower()
        tokens = [token for token in re.split(r'[^a-z0-9]+', stem) if token]
        for token in tokens:
            if token in lazy_name_tokens:
                violations.append(f'{path}: lazy split token "{token}" is not allowed in filenames')
        if path.endswith('.rs'):
            rel = pathlib.Path(path)
            # Binary entry points under bin/ may keep their hyphenated public name.
            if len(rel.parts) >= 2 and rel.parts[-2] == 'bin' and rel.stem.startswith('lesavka-'):
                continue
            if not re.match(r'^[a-z0-9_]+$', rel.stem):
                violations.append(f'{path}: Rust filenames must use meaningful snake_case')
    return violations


def script_policy_violations(files: list[str]) -> list[str]:
    """Check CI does not invoke manual probes and that scripts carry the expected headers."""
    violations: list[str] = []
    ci_text_parts: list[str] = []
    ci_paths = [root / 'Jenkinsfile', *sorted((root / 'scripts' / 'ci').glob('*.sh'))]
    for path in ci_paths:
        if path.exists():
            ci_text_parts.append(path.read_text(encoding='utf-8', errors='replace'))
    ci_text = '\n'.join(ci_text_parts)
    if re.search(r'(?:^|\s)(?:sh\s+)?scripts/manual/', ci_text):
        violations.append('scripts/manual: manual probes must not be required by CI')
    for file in sorted((root / 'scripts').rglob('*')):
        if not file.is_file():
            continue
        rel = repo_relative(str(file))
        if rel is None:
            continue
        text = file.read_text(encoding='utf-8', errors='replace')
        lines = text.splitlines()
        first = lines[0] if lines else ''
        if first.startswith('#!'):
            mode = file.stat().st_mode
            if not mode & stat.S_IXUSR:
                violations.append(f'{rel}: shebang script must be executable')
            # Strict mode must appear within the first 25 lines of a bash script.
            header = '\n'.join(lines[:25])
            if 'bash' in first and 'set -euo pipefail' not in header:
                violations.append(f'{rel}: bash scripts must use set -euo pipefail where safe')
        if rel.startswith('scripts/manual/') and rel.endswith('.sh'):
            header = '\n'.join(lines[:12]).lower()
            if 'manual:' not in header or 'not part of ci' not in header:
                violations.append(f'{rel}: manual scripts must be clearly marked manual and outside CI')
    return violations


def env_doc_violations(files: list[str]) -> list[str]:
    """List LESAVKA-prefixed env vars found in tracked sources but absent from the env doc."""
    docs_path = root / 'docs' / 'operational-env.md'
    if not docs_path.exists():
        return ['docs/operational-env.md: missing env-var inventory']
    docs_text = docs_path.read_text(encoding='utf-8')
    found: set[str] = set()
    scan_prefixes = ('client/', 'common/', 'server/', 'testing/', 'scripts/')
    scan_files = [
        path
        for path in files
        if path == 'Jenkinsfile' or path.endswith('.toml') or path.startswith(scan_prefixes)
    ]
    for path in scan_files:
        full = root / path
        if not full.exists() or full.is_dir():
            continue
        text = full.read_text(encoding='utf-8', errors='replace')
        found.update(env_re.findall(text))
    return [
        f'{var}: LESAVKA env var is used but missing from docs/operational-env.md'
        for var in sorted(found)
        if var not in docs_text
    ]


def esc(value: str) -> str:
    """Escape backslash, newline and double quote for use inside a metric label value."""
    return value.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')


def clippy_counts(path: pathlib.Path) -> dict[str, int]:
    """Count warning-level compiler messages per tracked source file."""
    counts: dict[str, int] = defaultdict(int)
    for item in load_json_lines(path):
        if item.get('reason') != 'compiler-message':
            continue
        message = item.get('message', {})
        if message.get('level') != 'warning':
            continue
        spans = message.get('spans') or []
        # Attribute the warning to its primary span, else to the first span.
        primary = next((span for span in spans if span.get('is_primary')), None)
        if primary is None:
            primary = spans[0] if spans else None
        if not primary:
            continue
        rel = repo_relative(primary.get('file_name', ''))
        if not is_tracked_source(rel):
            continue
        counts[rel] += 1
    return dict(sorted(counts.items()))


def function_blocks(lines: list[str]):
    """Yield (start_line, end_line, doc_ok, non_trivial) for each `fn` found by brace counting.

    Line numbers are 1-based. `doc_ok` is true when the nearest non-blank line
    above the `fn` starts with `///` or `#[doc =`. `non_trivial` is true when
    the block has at least 12 non-blank lines or contains a control-flow token.
    This is a textual heuristic; braces are counted without parsing Rust.
    """
    index = 0
    while index < len(lines):
        if not fn_re.match(lines[index]):
            index += 1
            continue
        start = index
        doc_ok = False
        prev = index - 1
        while prev >= 0 and not lines[prev].strip():
            prev -= 1
        if prev >= 0:
            stripped = lines[prev].lstrip()
            doc_ok = stripped.startswith('///') or stripped.startswith('#[doc =')
        brace_depth = 0
        seen_open = False
        body_lines = 0
        j = index
        while j < len(lines):
            text = lines[j]
            brace_depth += text.count('{') - text.count('}')
            if '{' in text:
                seen_open = True
            if seen_open and text.strip():
                body_lines += 1
            if seen_open and brace_depth <= 0:
                break
            j += 1
        block_text = '\n'.join(lines[start:j + 1])
        non_trivial = body_lines >= 12 or any(token in block_text for token in (' if ', ' match ', ' for ', ' while ', ' loop ', '?.'))
        yield start + 1, j + 1, doc_ok, non_trivial
        index = j + 1


def doc_debt_counts(path: pathlib.Path) -> dict[str, int]:
    """Count non-trivial undocumented functions per tracked source file.

    The `path` argument is accepted for call compatibility; the scan always
    walks the repo root.
    """
    counts: dict[str, int] = defaultdict(int)
    for file in sorted(root.rglob('*.rs')):
        rel = repo_relative(str(file))
        if not is_tracked_source(rel):
            continue
        lines = file.read_text(encoding='utf-8').splitlines()
        debt = 0
        for _, _, doc_ok, non_trivial in function_blocks(lines):
            if non_trivial and not doc_ok:
                debt += 1
        counts[rel] = debt
    return dict(sorted(counts.items()))


def source_loc_counts() -> dict[str, int]:
    """Return the line count of every tracked Rust source file."""
    counts: dict[str, int] = {}
    for file in sorted(root.rglob('*.rs')):
        rel = repo_relative(str(file))
        if not is_tracked_source(rel):
            continue
        counts[rel] = count_lines(file)
    return dict(sorted(counts.items()))


def integration_layout_violations() -> list[str]:
    """Flag Rust files in package-local tests/ or src/tests/ directories outside testing/."""
    violations: list[str] = []
    for file in sorted(root.rglob('*.rs')):
        rel = repo_relative(str(file))
        if rel is None or rel.startswith('target/') or rel.startswith('testing/'):
            continue
        parts = pathlib.Path(rel).parts
        if len(parts) >= 3 and parts[1] == 'src' and parts[2] == 'tests':
            violations.append(
                f'{rel}: integration tests must live under testing/tests/ instead of package-local src/tests/'
            )
        elif len(parts) >= 2 and parts[1] == 'tests':
            violations.append(
                f'{rel}: integration tests must live under testing/tests/ instead of package-local tests/'
            )
    return violations


def testing_contract_violations() -> list[str]:
    """Check each file under testing/tests for size, naming, header markers and test entrypoints."""
    violations: list[str] = []
    contract_dir = root / 'testing' / 'tests'
    if not contract_dir.exists():
        return ['testing/tests: missing dedicated top-level integration test directory']
    test_files = sorted(contract_dir.rglob('*.rs'))
    if not test_files:
        return ['testing/tests: no integration test files found']
    filename_re = re.compile(r'^[a-z0-9_]+\.rs$')
    required_markers = ('Scope:', 'Targets:', 'Why:')
    for file in test_files:
        rel = repo_relative(str(file))
        if rel is None:
            continue
        loc = count_lines(file)
        if loc > 500:
            violations.append(f'{rel}: exceeds 500 LOC contract ({loc})')
        if not filename_re.match(file.name):
            violations.append(f'{rel}: filename must use snake_case for meaningful modularization')
        text = file.read_text(encoding='utf-8')
        # Contract markers must appear within the first 20 lines.
        header = '\n'.join(text.splitlines()[:20])
        for marker in required_markers:
            if marker not in header:
                violations.append(f'{rel}: missing required module contract marker {marker}')
        if '#[test]' not in text and '#[tokio::test]' not in text:
            violations.append(f'{rel}: missing test entrypoints')
    return violations


# Gather current per-file measurements keyed by repo-relative path.
current = {}
for path, loc in source_loc_counts().items():
    current[path] = {'loc': loc}
for path, count in clippy_counts(clippy_path).items():
    current.setdefault(path, {})['clippy_warnings'] = count
for path, count in doc_debt_counts(root).items():
    current.setdefault(path, {})['doc_debt'] = count

baseline = {'files': {}}
if baseline_path.exists():
    with baseline_path.open('r', encoding='utf-8') as fh:
        baseline = json.load(fh)
baseline_files = baseline.get('files', {})

# A file regresses when it lacks a baseline entry or any metric grew.
regressions = []
for path, current_entry in current.items():
    baseline_entry = baseline_files.get(path)
    if baseline_entry is None:
        regressions.append(f'{path}: missing baseline entry')
        continue
    for key in ('loc', 'clippy_warnings', 'doc_debt'):
        current_value = int(current_entry.get(key, 0))
        baseline_value = int(baseline_entry.get(key, 0))
        if current_value > baseline_value:
            regressions.append(
                f'{path}: {key} grew from {baseline_value} to {current_value}'
            )

layout_violations = integration_layout_violations()
testing_violations = testing_contract_violations()
files = tracked_files()
repo_violations = repo_policy_violations(files)
naming_violations = naming_policy_violations(files)
script_violations = script_policy_violations(files)
env_violations = env_doc_violations(files)

totals = {
    'files': len(current),
    'over_500': sum(1 for entry in current.values() if int(entry.get('loc', 0)) > 500),
    'clippy_warnings': sum(int(entry.get('clippy_warnings', 0)) for entry in current.values()),
    'doc_debt': sum(int(entry.get('doc_debt', 0)) for entry in current.values()),
}

lines = []
lines.append('hygiene gate report')
lines.append(f"files tracked: {totals['files']}")
lines.append(f"files over 500 LOC: {totals['over_500']}")
lines.append(f"clippy warnings tracked: {totals['clippy_warnings']}")
lines.append(f"non-trivial undocumented functions tracked: {totals['doc_debt']}")
lines.append(f'legacy integration-test layout violations: {len(layout_violations)}')
lines.append(f'testing module contract violations: {len(testing_violations)}')
lines.append(f'repository policy violations: {len(repo_violations)}')
lines.append(f'naming policy violations: {len(naming_violations)}')
lines.append(f'script policy violations: {len(script_violations)}')
lines.append(f'env documentation violations: {len(env_violations)}')
lines.append('')
lines.append('path | loc | clippy warnings | doc debt | baseline status')
lines.append('-' * 78)
for path in sorted(current):
    entry = current[path]
    baseline_entry = baseline_files.get(path)
    if baseline_entry is None:
        status = 'new'
        baseline_loc = 'n/a'
        baseline_clippy = 'n/a'
        baseline_doc = 'n/a'
    else:
        baseline_loc = str(baseline_entry.get('loc', 0))
        baseline_clippy = str(baseline_entry.get('clippy_warnings', 0))
        baseline_doc = str(baseline_entry.get('doc_debt', 0))
        status = 'ok'
        if (
            int(entry.get('loc', 0)) > int(baseline_entry.get('loc', 0))
            or int(entry.get('clippy_warnings', 0)) > int(baseline_entry.get('clippy_warnings', 0))
            or int(entry.get('doc_debt', 0)) > int(baseline_entry.get('doc_debt', 0))
        ):
            status = 'regressed'
    lines.append(
        f"{path} | {entry.get('loc', 0)} | {entry.get('clippy_warnings', 0)} | {entry.get('doc_debt', 0)} | {baseline_loc}/{baseline_clippy}/{baseline_doc} | {status}"
    )

if layout_violations:
    lines.append('')
    lines.append('layout violations')
    lines.append('-' * 78)
    lines.extend(layout_violations)

if testing_violations:
    lines.append('')
    lines.append('testing module contract violations')
    lines.append('-' * 78)
    lines.extend(testing_violations)

policy_sections = [
    ('repository policy violations', repo_violations),
    ('naming policy violations', naming_violations),
    ('script policy violations', script_violations),
    ('env documentation violations', env_violations),
]
for title, violations in policy_sections:
    if violations:
        lines.append('')
        lines.append(title)
        lines.append('-' * 78)
        lines.extend(violations)

summary_path.write_text('\n'.join(lines) + '\n', encoding='utf-8')
print(summary_path.read_text(encoding='utf-8'))

policy_violations = repo_violations + naming_violations + script_violations + env_violations
failed = bool(regressions or layout_violations or testing_violations or policy_violations)

# Metrics are written before the failure exit so a failed run is still recorded.
labels = f'suite="lesavka",branch="{esc(branch)}",commit="{esc(commit)}"'
ok_value = 0 if failed else 1
failed_value = 1 if failed else 0
metrics = [
    '# HELP platform_quality_gate_checks_total Check outcomes from the latest lesavka gate run.',
    '# TYPE platform_quality_gate_checks_total gauge',
    f'platform_quality_gate_checks_total{{{labels},check="style",status="ok"}} {ok_value}',
    f'platform_quality_gate_checks_total{{{labels},check="style",status="failed"}} {failed_value}',
    f'platform_quality_gate_checks_total{{{labels},check="loc",status="ok"}} {ok_value}',
    f'platform_quality_gate_checks_total{{{labels},check="loc",status="failed"}} {failed_value}',
]
metrics_path.write_text('\n'.join(metrics) + '\n', encoding='utf-8')

if failed:
    for line in regressions:
        print(line, file=sys.stderr)
    for line in layout_violations:
        print(line, file=sys.stderr)
    for line in testing_violations:
        print(line, file=sys.stderr)
    for line in policy_violations:
        print(line, file=sys.stderr)
    raise SystemExit(1)
PY