lesavka/scripts/ci/hygiene_gate.sh

511 lines
19 KiB
Bash
Executable File

#!/usr/bin/env bash
# Hygiene gate: format check, compile check, clippy, then an embedded Python
# audit that compares per-file metrics against a committed baseline and
# writes a human-readable summary plus Prometheus metrics.
set -euo pipefail
# Repository root = two directories above this script (scripts/ci/..).
ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)
REPORT_DIR="${ROOT_DIR}/target/hygiene-gate"
CLIPPY_JSON="${REPORT_DIR}/clippy.json"
SUMMARY_TXT="${REPORT_DIR}/summary.txt"
BASELINE_JSON="${ROOT_DIR}/scripts/ci/hygiene_gate_baseline.json"
METADATA_JSON="${REPORT_DIR}/cargo-metadata.json"
METRICS_FILE="${REPORT_DIR}/metrics.prom"
mkdir -p "${REPORT_DIR}"
cargo fmt --all -- --check
cargo check --workspace --all-targets
cargo metadata --locked --format-version 1 >"${METADATA_JSON}"
# NOTE(review): -D warnings promotes clippy warnings to errors, so under
# `set -e` any warning aborts the gate here, before the Python audit can
# count it — confirm whether the baseline's clippy_warnings tracking below
# is still intended to see non-zero values.
cargo clippy --workspace --all-targets --message-format json -- -D warnings >"${CLIPPY_JSON}"
# Resolve branch/commit from CI environment variables, falling back to the
# local git state, and finally to the literal string "unknown".
branch=${BRANCH_NAME:-${GIT_BRANCH:-}}
if [[ -z "${branch}" ]]; then
branch=$(git -C "${ROOT_DIR}" rev-parse --abbrev-ref HEAD 2>/dev/null || echo unknown)
fi
commit=${GIT_COMMIT:-}
if [[ -z "${commit}" ]]; then
commit=$(git -C "${ROOT_DIR}" rev-parse --short HEAD 2>/dev/null || echo unknown)
fi
# Hand everything to the embedded Python auditor (heredoc runs to the PY marker).
python3 - "${CLIPPY_JSON}" "${BASELINE_JSON}" "${SUMMARY_TXT}" "${ROOT_DIR}" "${METRICS_FILE}" "${branch}" "${commit}" <<'PY'
import json
import os
import pathlib
import re
import stat
import subprocess
import sys
from collections import defaultdict
# Positional arguments supplied by the wrapping bash script.
clippy_path = pathlib.Path(sys.argv[1])
baseline_path = pathlib.Path(sys.argv[2])
summary_path = pathlib.Path(sys.argv[3])
root = pathlib.Path(sys.argv[4])
metrics_path = pathlib.Path(sys.argv[5])
branch = sys.argv[6]
commit = sys.argv[7]
# Matches a Rust fn signature line (optional pub(..)/async/unsafe qualifiers).
fn_re = re.compile(r'^\s*(?:pub(?:\([^)]+\))?\s+)?(?:async\s+)?(?:unsafe\s+)?fn\s+\w+')
# Matches project environment-variable names.
env_re = re.compile(r'LESAVKA_[A-Z0-9_]+')
# Filename tokens that signal lazy file splitting rather than real modularization.
lazy_name_tokens = {'part', 'piece', 'chunk', 'misc', 'stuff', 'helpers2', 'new', 'old', 'tmp'}
# The root Cargo.toml must list exactly these workspace members.
expected_workspace_members = {'common', 'client', 'server', 'testing'}
# Stable public binary names mapped to the file that must declare each one.
required_binary_paths = {
'lesavka-client': 'client/Cargo.toml',
'lesavka-server': 'server/Cargo.toml',
'lesavka-uvc': 'server/Cargo.toml',
'lesavka-relayctl': 'client/src/bin/lesavka-relayctl.rs',
}
def load_json_lines(path: pathlib.Path):
    """Yield parsed JSON objects from a JSON-lines file.

    Blank lines and lines that fail to parse are silently skipped, so the
    file may contain non-JSON noise (e.g. cargo's human-readable output).
    """
    for raw_line in path.read_text(encoding='utf-8').splitlines():
        candidate = raw_line.strip()
        if not candidate:
            continue
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        yield parsed
def repo_relative(path: str) -> str | None:
    """Map an arbitrary path to a POSIX path relative to the repository root.

    Returns None when the path lies outside the repository (ValueError from
    relative_to) or cannot be resolved (OSError). The original caught bare
    Exception, which would also have hidden programming errors.
    """
    try:
        return pathlib.Path(path).resolve().relative_to(root).as_posix()
    except (ValueError, OSError):
        return None
def run_git(*args: str) -> list[str]:
    """Run a git subcommand at the repo root; return its non-empty stdout lines.

    Raises CalledProcessError on a non-zero exit status (check=True).
    """
    completed = subprocess.run(
        ['git', '-C', str(root), *args],
        capture_output=True,
        text=True,
        check=True,
    )
    return [entry for entry in completed.stdout.splitlines() if entry]
def repo_files() -> list[str]:
    """List tracked plus untracked-but-not-ignored files, sorted and deduplicated."""
    listing = set(run_git('ls-files'))
    listing.update(run_git('ls-files', '--others', '--exclude-standard'))
    return sorted(listing)
def is_test_path(rel: str) -> bool:
    """Return True when any path component of *rel* is a `tests` directory."""
    return any(part == 'tests' for part in pathlib.Path(rel).parts)
def parse_workspace_members() -> set[str]:
    """Extract the workspace `members = [...]` entries from the root Cargo.toml.

    Returns an empty set when no members array can be found. This is a
    regex-based scan, not a full TOML parse.
    """
    manifest = (root / 'Cargo.toml').read_text(encoding='utf-8')
    members_match = re.search(r'members\s*=\s*\[(?P<body>.*?)\]', manifest, re.S)
    if members_match is None:
        return set()
    return set(re.findall(r'"([^"]+)"', members_match.group('body')))
def repo_policy_violations(files: list[str]) -> list[str]:
    """Check repository-wide policies.

    Covers: committed Cargo.lock, explicit workspace members, no committed
    build/runtime artifacts, no local AGENTS.md notes, and presence of the
    stable public binary declarations.
    """
    findings: list[str] = []
    present = set(files)
    # Reproducibility: the lockfile must be committed.
    if 'Cargo.lock' not in present:
        findings.append('Cargo.lock: must be committed for reproducible Rust builds')
    members = parse_workspace_members()
    if members != expected_workspace_members:
        findings.append(
            f'Cargo.toml: workspace members must be explicit {sorted(expected_workspace_members)}, found {sorted(members)}'
        )
    # Paths (or extensions) that indicate generated or runtime output.
    artifact_patterns = (
        re.compile(r'(^|/)target/'),
        re.compile(r'(^|/)dist/'),
        re.compile(r'(^|/)logs/'),
        re.compile(r'(^|/)coverage/'),
        re.compile(r'(^|/)captures/'),
        re.compile(r'\.(log|h264|aac|wav|rgba)$'),
    )
    for entry in files:
        if pathlib.Path(entry).name == 'AGENTS.md':
            findings.append(f'{entry}: local AGENTS notes must not be committed')
        if any(pattern.search(entry) for pattern in artifact_patterns):
            findings.append(f'{entry}: generated/build/runtime artifact must not be committed')
    # Each public binary name must appear in its declaring file.
    for name, marker in required_binary_paths.items():
        declaring_file = root / marker
        if marker.endswith('.rs'):
            if not declaring_file.exists():
                findings.append(f'{name}: stable public binary source {marker} is missing')
        elif name not in declaring_file.read_text(encoding='utf-8'):
            findings.append(f'{name}: stable public binary name missing from {marker}')
    return findings
def naming_policy_violations(files: list[str]) -> list[str]:
    """Flag lazily named files and non-snake_case Rust filenames."""
    findings: list[str] = []
    for entry in files:
        if entry.startswith(('.git/', 'target/')):
            continue
        stem = pathlib.Path(entry).stem.lower()
        for token in re.split(r'[^a-z0-9]+', stem):
            if token and token in lazy_name_tokens:
                findings.append(f'{entry}: lazy split token "{token}" is not allowed in filenames')
        if entry.endswith('.rs'):
            rel = pathlib.Path(entry)
            # Public binaries under a bin/ directory may keep hyphenated names.
            if len(rel.parts) >= 2 and rel.parts[-2] == 'bin' and rel.stem.startswith('lesavka-'):
                continue
            if not re.match(r'^[a-z0-9_]+$', rel.stem):
                findings.append(f'{entry}: Rust filenames must use meaningful snake_case')
    return findings
def script_policy_violations(files: list[str]) -> list[str]:
    """Enforce script hygiene.

    Rules: CI must not invoke scripts/manual probes; shebang scripts must be
    executable; bash scripts must set pipefail; manual scripts must be
    clearly labelled as manual and outside CI.
    NOTE(review): statement grouping reconstructed from flattened source
    indentation — confirm the bash/pipefail check belongs under the shebang
    branch.
    """
    findings: list[str] = []
    ci_sources: list[str] = []
    for candidate in [root / 'Jenkinsfile', *sorted((root / 'scripts' / 'ci').glob('*.sh'))]:
        if candidate.exists():
            ci_sources.append(candidate.read_text(encoding='utf-8', errors='replace'))
    combined_ci = '\n'.join(ci_sources)
    if re.search(r'(?:^|\s)(?:sh\s+)?scripts/manual/', combined_ci):
        findings.append('scripts/manual: manual probes must not be required by CI')
    for script in sorted((root / 'scripts').rglob('*')):
        if not script.is_file():
            continue
        rel = repo_relative(str(script))
        if rel is None:
            continue
        content = script.read_text(encoding='utf-8', errors='replace').splitlines()
        shebang = content[0] if content else ''
        if shebang.startswith('#!'):
            if not script.stat().st_mode & stat.S_IXUSR:
                findings.append(f'{rel}: shebang script must be executable')
            head = '\n'.join(content[:25])
            if 'bash' in shebang and 'set -euo pipefail' not in head:
                findings.append(f'{rel}: bash scripts must use set -euo pipefail where safe')
        if rel.startswith('scripts/manual/') and rel.endswith('.sh'):
            manual_head = '\n'.join(content[:12]).lower()
            if 'manual:' not in manual_head or 'not part of ci' not in manual_head:
                findings.append(f'{rel}: manual scripts must be clearly marked manual and outside CI')
    return findings
def env_doc_violations(files: list[str]) -> list[str]:
    """Report LESAVKA_* env vars referenced in sources but absent from docs.

    Scans the Jenkinsfile, *.toml files, and the crate/script trees for
    LESAVKA_* identifiers, then compares them as exact names against the
    inventory in docs/operational-env.md.
    """
    docs_path = root / 'docs' / 'operational-env.md'
    if not docs_path.exists():
        return ['docs/operational-env.md: missing env-var inventory']
    # Bug fix: the original used a substring test (`var not in docs_text`),
    # so a documented LESAVKA_FOO_BAR made an undocumented LESAVKA_FOO look
    # documented. Compare exact variable names instead.
    documented = set(env_re.findall(docs_path.read_text(encoding='utf-8')))
    found: set[str] = set()
    scan_prefixes = ('client/', 'common/', 'server/', 'testing/', 'scripts/')
    for path in files:
        if not (path == 'Jenkinsfile' or path.endswith('.toml') or path.startswith(scan_prefixes)):
            continue
        full = root / path
        if not full.exists() or full.is_dir():
            continue
        found.update(env_re.findall(full.read_text(encoding='utf-8', errors='replace')))
    return [
        f'{var}: LESAVKA env var is used but missing from docs/operational-env.md'
        for var in sorted(found)
        if var not in documented
    ]
def esc(value: str) -> str:
    """Escape a string for a Prometheus label value.

    Backslashes are escaped first so later replacements do not double-escape.
    """
    escaped = value.replace('\\', r'\\')
    escaped = escaped.replace('\n', r'\n')
    return escaped.replace('"', r'\"')
def clippy_counts(path: pathlib.Path) -> dict[str, int]:
    """Count clippy warnings per repo-relative source file.

    Reads clippy's JSON-lines output, keeps only warning-level compiler
    messages whose primary span lands in a non-test /src/ file, and returns
    a path -> count mapping sorted by path.
    """
    per_file: dict[str, int] = defaultdict(int)
    for record in load_json_lines(path):
        if record.get('reason') != 'compiler-message':
            continue
        message = record.get('message', {})
        if message.get('level') != 'warning':
            continue
        spans = message.get('spans') or []
        # Prefer the primary span; otherwise fall back to the first span.
        primary = next((span for span in spans if span.get('is_primary')), None)
        if primary is None and spans:
            primary = spans[0]
        if not primary:
            continue
        rel = repo_relative(primary.get('file_name', ''))
        if rel is None or '/src/' not in rel or '/target/' in rel:
            continue
        if is_test_path(rel):
            continue
        per_file[rel] += 1
    return dict(sorted(per_file.items()))
def function_blocks(lines: list[str]):
    """Yield (start_line, end_line, documented, non_trivial) per Rust fn.

    Line numbers in the yielded tuple are 1-indexed. `documented` means the
    fn is preceded by a /// doc comment or #[doc = ...] attribute (skipping
    blanks and other attributes). `non_trivial` is a heuristic: at least 12
    body lines, or control-flow tokens present in the block text.
    NOTE(review): brace counting is textual — braces inside strings or
    comments will skew the depth.
    """
    idx = 0
    total = len(lines)
    while idx < total:
        if not fn_re.match(lines[idx]):
            idx += 1
            continue
        start = idx
        # Walk upward past blank lines and attributes to find a doc comment.
        documented = False
        prev = idx - 1
        while prev >= 0 and (
            not lines[prev].strip() or lines[prev].lstrip().startswith('#[')
        ):
            prev -= 1
        if prev >= 0:
            above = lines[prev].lstrip()
            documented = above.startswith('///') or above.startswith('#[doc =')
        # Scan forward, balancing braces, until the fn body closes.
        depth = 0
        opened = False
        body_lines = 0
        end = idx
        while end < total:
            text = lines[end]
            depth += text.count('{') - text.count('}')
            if '{' in text:
                opened = True
            if opened and text.strip():
                body_lines += 1
            if opened and depth <= 0:
                break
            end += 1
        block_text = '\n'.join(lines[start:end + 1])
        control_tokens = (' if ', ' match ', ' for ', ' while ', ' loop ', '?.')
        non_trivial = body_lines >= 12 or any(token in block_text for token in control_tokens)
        yield start + 1, end + 1, documented, non_trivial
        idx = end + 1
def doc_debt_counts(path: pathlib.Path) -> dict[str, int]:
    """Count non-trivial undocumented Rust functions per source file under *path*.

    Bug fix: the original accepted *path* but ignored it, always scanning the
    module-level `root`. The sole caller passes `root`, so honoring the
    parameter preserves behavior while making the signature truthful.
    """
    counts: dict[str, int] = defaultdict(int)
    for file in sorted(path.rglob('*.rs')):
        rel = repo_relative(str(file))
        if rel is None or '/src/' not in rel or '/target/' in rel:
            continue
        if is_test_path(rel):
            continue
        source_lines = file.read_text(encoding='utf-8').splitlines()
        debt = 0
        for _, _, documented, non_trivial in function_blocks(source_lines):
            if non_trivial and not documented:
                debt += 1
        counts[rel] = debt
    return dict(sorted(counts.items()))
def source_loc_counts() -> dict[str, int]:
    """Count lines per Rust source file under /src/ trees, excluding tests.

    Bug fix: the original iterated `file.open(...)` without closing the
    handle, leaving cleanup to the garbage collector; use a context manager
    so the file is closed deterministically.
    """
    counts: dict[str, int] = {}
    for file in sorted(root.rglob('*.rs')):
        rel = repo_relative(str(file))
        if rel is None or '/src/' not in rel or '/target/' in rel:
            continue
        if is_test_path(rel):
            continue
        with file.open('r', encoding='utf-8') as handle:
            counts[rel] = sum(1 for _ in handle)
    return dict(sorted(counts.items()))
def integration_layout_violations() -> list[str]:
    """Flag package-local `tests/` dirs; integration tests belong in testing/tests/."""
    findings: list[str] = []
    for file in sorted(root.rglob('*.rs')):
        rel = repo_relative(str(file))
        if rel is None or rel.startswith(('target/', 'testing/')):
            continue
        segments = pathlib.Path(rel).parts
        if len(segments) >= 2 and segments[1] == 'tests':
            findings.append(
                f'{rel}: integration tests must live under testing/tests/ instead of package-local tests/'
            )
    return findings
def testing_contract_violations() -> list[str]:
    """Validate modules under testing/tests against the module contract.

    Each file must be at most 500 LOC, be named in snake_case, carry the
    Scope:/Targets:/Why: markers in its first 20 lines, and contain at least
    one #[test] or #[tokio::test] entrypoint.
    Bug fix: the original opened each file twice (once for LOC, once for the
    text) and never closed the first handle; read the file once instead.
    """
    violations: list[str] = []
    contract_dir = root / 'testing' / 'tests'
    if not contract_dir.exists():
        return ['testing/tests: missing dedicated top-level integration test directory']
    test_files = sorted(contract_dir.rglob('*.rs'))
    if not test_files:
        return ['testing/tests: no integration test files found']
    filename_re = re.compile(r'^[a-z0-9_]+\.rs$')
    required_markers = ('Scope:', 'Targets:', 'Why:')
    for file in test_files:
        rel = repo_relative(str(file))
        if rel is None:
            continue
        text = file.read_text(encoding='utf-8')
        text_lines = text.splitlines()
        loc = len(text_lines)
        if loc > 500:
            violations.append(f'{rel}: exceeds 500 LOC contract ({loc})')
        if not filename_re.match(file.name):
            violations.append(f'{rel}: filename must use snake_case for meaningful modularization')
        header = '\n'.join(text_lines[:20])
        for marker in required_markers:
            if marker not in header:
                violations.append(f'{rel}: missing required module contract marker {marker}')
        if '#[test]' not in text and '#[tokio::test]' not in text:
            violations.append(f'{rel}: missing test entrypoints')
    return violations
# Assemble per-file metrics: LOC is always present; clippy warning and
# doc-debt counts are merged in for files that have them.
current = {path: {'loc': loc} for path, loc in source_loc_counts().items()}
for path, count in clippy_counts(clippy_path).items():
    current.setdefault(path, {})['clippy_warnings'] = count
for path, count in doc_debt_counts(root).items():
    current.setdefault(path, {})['doc_debt'] = count
# Load the committed baseline; fall back to an empty file map when absent.
baseline = {'files': {}}
if baseline_path.exists():
    baseline = json.loads(baseline_path.read_text(encoding='utf-8'))
baseline_files = baseline.get('files', {})
# Compare current metrics to the baseline: any metric that grew is a
# regression. LOC growth (and missing baseline entries) gate the LOC stage;
# clippy/doc-debt growth gates the style stage.
style_regressions = []
loc_regressions = []
for path, current_entry in current.items():
    baseline_entry = baseline_files.get(path)
    if baseline_entry is None:
        loc_regressions.append(f'{path}: missing baseline entry')
        continue
    for key in ('loc', 'clippy_warnings', 'doc_debt'):
        now = int(current_entry.get(key, 0))
        before = int(baseline_entry.get(key, 0))
        if now <= before:
            continue
        message = f'{path}: {key} grew from {before} to {now}'
        (loc_regressions if key == 'loc' else style_regressions).append(message)
# Cross-cutting scans: legacy test layout, testing module contracts, and the
# repository/naming/script/env policy suites.
layout_violations = integration_layout_violations()
testing_violations = testing_contract_violations()
files = repo_files()
repo_violations = repo_policy_violations(files)
naming_violations = naming_policy_violations(files)
script_violations = script_policy_violations(files)
env_violations = env_doc_violations(files)
# Absolute ceiling: over-500-LOC files fail regardless of the baseline.
loc_policy_violations = [
f'{path}: exceeds 500 LOC hard limit ({entry["loc"]})'
for path, entry in sorted(current.items())
if int(entry.get('loc', 0)) > 500
]
# Aggregate counters shown in the summary header.
totals = {
'files': len(current),
'over_500': sum(1 for entry in current.values() if int(entry.get('loc', 0)) > 500),
'clippy_warnings': sum(int(entry.get('clippy_warnings', 0)) for entry in current.values()),
'doc_debt': sum(int(entry.get('doc_debt', 0)) for entry in current.values()),
}
# Stage verdicts: style/docs stage, then LOC/naming stage.
style_docs_failed = bool(style_regressions or repo_violations or script_violations or env_violations)
loc_naming_failed = bool(
loc_regressions
or layout_violations
or testing_violations
or naming_violations
or loc_policy_violations
)
# Build the human-readable summary: header stats followed by a per-file
# metrics table comparing current values against the baseline.
lines = []
lines.append('hygiene gate report')
lines.append('stage order: style/docs -> LOC/naming')
lines.append(f'style/docs stage: {"failed" if style_docs_failed else "ok"}')
lines.append(f'LOC/naming stage: {"failed" if loc_naming_failed else "ok"}')
lines.append(f"files tracked: {totals['files']}")
lines.append(f"files over 500 LOC: {totals['over_500']}")
lines.append(f"clippy warnings tracked: {totals['clippy_warnings']}")
lines.append(f"non-trivial undocumented functions tracked: {totals['doc_debt']}")
lines.append(f'legacy integration-test layout violations: {len(layout_violations)}')
lines.append(f'testing module contract violations: {len(testing_violations)}')
lines.append(f'repository policy violations: {len(repo_violations)}')
lines.append(f'naming policy violations: {len(naming_violations)}')
lines.append(f'script policy violations: {len(script_violations)}')
lines.append(f'env documentation violations: {len(env_violations)}')
lines.append(f'LOC hard-limit violations: {len(loc_policy_violations)}')
lines.append('')
# Bug fix: the data rows emit six columns (path, loc, clippy, doc debt,
# baseline triple, status) but the header previously named only five.
lines.append('path | loc | clippy warnings | doc debt | baseline loc/clippy/doc | status')
lines.append('-' * 78)
for path in sorted(current):
    entry = current[path]
    baseline_entry = baseline_files.get(path)
    if baseline_entry is None:
        status = 'new'
        baseline_loc = 'n/a'
        baseline_clippy = 'n/a'
        baseline_doc = 'n/a'
    else:
        baseline_loc = str(baseline_entry.get('loc', 0))
        baseline_clippy = str(baseline_entry.get('clippy_warnings', 0))
        baseline_doc = str(baseline_entry.get('doc_debt', 0))
        status = 'ok'
        if (
            int(entry.get('loc', 0)) > int(baseline_entry.get('loc', 0))
            or int(entry.get('clippy_warnings', 0)) > int(baseline_entry.get('clippy_warnings', 0))
            or int(entry.get('doc_debt', 0)) > int(baseline_entry.get('doc_debt', 0))
        ):
            status = 'regressed'
    lines.append(
        f"{path} | {entry.get('loc', 0)} | {entry.get('clippy_warnings', 0)} | {entry.get('doc_debt', 0)} | {baseline_loc}/{baseline_clippy}/{baseline_doc} | {status}"
    )
# Append a detail section for every violation category with entries, then
# persist the summary and echo it to stdout.
detail_sections = [
    ('layout violations', layout_violations),
    ('testing module contract violations', testing_violations),
    ('repository policy violations', repo_violations),
    ('naming policy violations', naming_violations),
    ('script policy violations', script_violations),
    ('env documentation violations', env_violations),
    ('LOC hard-limit violations', loc_policy_violations),
]
for title, entries in detail_sections:
    if entries:
        lines.append('')
        lines.append(title)
        lines.append('-' * 78)
        lines.extend(entries)
summary_path.write_text('\n'.join(lines) + '\n', encoding='utf-8')
print(summary_path.read_text(encoding='utf-8'))
# Flatten all policy findings for stderr reporting.
policy_violations = (
    repo_violations
    + naming_violations
    + script_violations
    + env_violations
    + loc_policy_violations
)
failed = bool(style_docs_failed or loc_naming_failed)
# Emit Prometheus-style gauges for both stage outcomes (ok/failed pairs).
labels = f'suite="lesavka",branch="{esc(branch)}",commit="{esc(commit)}"'
stage_outcomes = {
    'style': style_docs_failed,
    'loc': loc_naming_failed,
}
metrics = [
    '# HELP platform_quality_gate_checks_total Check outcomes from the latest lesavka gate run.',
    '# TYPE platform_quality_gate_checks_total gauge',
]
for check, check_failed in stage_outcomes.items():
    metrics.append(f'platform_quality_gate_checks_total{{{labels},check="{check}",status="ok"}} {0 if check_failed else 1}')
    metrics.append(f'platform_quality_gate_checks_total{{{labels},check="{check}",status="failed"}} {1 if check_failed else 0}')
metrics_path.write_text('\n'.join(metrics) + '\n', encoding='utf-8')
# On failure, print every finding to stderr and exit non-zero for CI.
if failed:
    failure_reports = (
        style_regressions,
        loc_regressions,
        layout_violations,
        testing_violations,
        policy_violations,
    )
    for report in failure_reports:
        for line in report:
            print(line, file=sys.stderr)
    raise SystemExit(1)
PY