testing(ci): centralize quality gate contract

This commit is contained in:
Brad Stein 2026-04-10 17:06:53 -03:00
parent 9f088577b1
commit 370ece5b60
18 changed files with 1100 additions and 22 deletions

19
Jenkinsfile vendored
View File

@ -37,16 +37,16 @@ spec:
sh 'pip install --no-cache-dir -r ci/requirements.txt' sh 'pip install --no-cache-dir -r ci/requirements.txt'
} }
} }
stage('Glue tests') { stage('Run quality gate') {
steps { steps {
sh ''' sh '''
set -eu set -eu
mkdir -p build mkdir -p build
set +e set +e
pytest -q ci/tests/glue --junitxml=build/junit-glue.xml python3 -m testing.quality_gate --profile jenkins --build-dir build
glue_rc=$? quality_gate_rc=$?
set -e set -e
printf '%s\n' "${glue_rc}" > build/glue.rc printf '%s\n' "${quality_gate_rc}" > build/quality-gate.rc
''' '''
} }
} }
@ -54,15 +54,18 @@ spec:
steps { steps {
sh ''' sh '''
set -eu set -eu
export JUNIT_GLOB='build/junit-*.xml'
export QUALITY_GATE_EXIT_CODE_PATH='build/quality-gate.rc'
export QUALITY_GATE_SUMMARY_PATH='build/quality-gate-summary.json'
python3 ci/scripts/publish_test_metrics.py python3 ci/scripts/publish_test_metrics.py
''' '''
} }
} }
stage('Enforce glue tests') { stage('Enforce quality gate') {
steps { steps {
sh ''' sh '''
set -eu set -eu
test "$(cat build/glue.rc 2>/dev/null || echo 1)" -eq 0 test "$(cat build/quality-gate.rc 2>/dev/null || echo 1)" -eq 0
''' '''
} }
} }
@ -103,9 +106,9 @@ spec:
post { post {
always { always {
script { script {
if (fileExists('build/junit-glue.xml')) { if (fileExists('build/junit-unit.xml') || fileExists('build/junit-glue.xml')) {
try { try {
junit allowEmptyResults: true, testResults: 'build/junit-glue.xml' junit allowEmptyResults: true, testResults: 'build/junit-*.xml'
} catch (Throwable err) { } catch (Throwable err) {
echo "junit step unavailable: ${err.class.simpleName}" echo "junit step unavailable: ${err.class.simpleName}"
} }

View File

@ -36,16 +36,16 @@ spec:
sh 'pip install --no-cache-dir -r ci/requirements.txt' sh 'pip install --no-cache-dir -r ci/requirements.txt'
} }
} }
stage('Glue tests') { stage('Run quality gate') {
steps { steps {
sh ''' sh '''
set -eu set -eu
mkdir -p build mkdir -p build
set +e set +e
pytest -q ci/tests/glue --junitxml=build/junit-glue.xml python3 -m testing.quality_gate --profile jenkins --build-dir build
glue_rc=$? quality_gate_rc=$?
set -e set -e
printf '%s\n' "${glue_rc}" > build/glue.rc printf '%s\n' "${quality_gate_rc}" > build/quality-gate.rc
''' '''
} }
} }
@ -53,15 +53,18 @@ spec:
steps { steps {
sh ''' sh '''
set -eu set -eu
export JUNIT_GLOB='build/junit-*.xml'
export QUALITY_GATE_EXIT_CODE_PATH='build/quality-gate.rc'
export QUALITY_GATE_SUMMARY_PATH='build/quality-gate-summary.json'
python3 ci/scripts/publish_test_metrics.py python3 ci/scripts/publish_test_metrics.py
''' '''
} }
} }
stage('Enforce glue tests') { stage('Enforce quality gate') {
steps { steps {
sh ''' sh '''
set -eu set -eu
test "$(cat build/glue.rc 2>/dev/null || echo 1)" -eq 0 test "$(cat build/quality-gate.rc 2>/dev/null || echo 1)" -eq 0
''' '''
} }
} }
@ -102,9 +105,9 @@ spec:
post { post {
always { always {
script { script {
if (fileExists('build/junit-glue.xml')) { if (fileExists('build/junit-unit.xml') || fileExists('build/junit-glue.xml')) {
try { try {
junit allowEmptyResults: true, testResults: 'build/junit-glue.xml' junit allowEmptyResults: true, testResults: 'build/junit-*.xml'
} catch (Throwable err) { } catch (Throwable err) {
echo "junit step unavailable: ${err.class.simpleName}" echo "junit step unavailable: ${err.class.simpleName}"
} }

View File

@ -1,4 +1,7 @@
pytest==8.3.4 pytest==8.3.4
pytest-cov==6.0.0
coverage==7.6.10
kubernetes==30.1.0 kubernetes==30.1.0
PyYAML==6.0.2 PyYAML==6.0.2
requests==2.32.3 requests==2.32.3
ruff==0.8.4

View File

@ -1,11 +1,11 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Publish titan-iac Jenkins glue test results to Pushgateway.""" """Publish titan-iac quality-gate results to Pushgateway."""
from __future__ import annotations from __future__ import annotations
import json import json
import os import os
import re from glob import glob
import urllib.error import urllib.error
import urllib.request import urllib.request
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
@ -63,6 +63,15 @@ def _parse_junit(path: str) -> dict[str, int]:
return totals return totals
def _collect_junit_totals(pattern: str) -> dict[str, int]:
totals = {"tests": 0, "failures": 0, "errors": 0, "skipped": 0}
for path in sorted(glob(pattern)):
parsed = _parse_junit(path)
for key in totals:
totals[key] += parsed[key]
return totals
def _read_exit_code(path: str) -> int: def _read_exit_code(path: str) -> int:
try: try:
with open(path, "r", encoding="utf-8") as handle: with open(path, "r", encoding="utf-8") as handle:
@ -71,6 +80,14 @@ def _read_exit_code(path: str) -> int:
return 1 return 1
def _load_summary(path: str) -> dict:
try:
with open(path, "r", encoding="utf-8") as handle:
return json.load(handle)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float: def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
text = _read_text(f"{pushgateway_url.rstrip('/')}/metrics") text = _read_text(f"{pushgateway_url.rstrip('/')}/metrics")
for line in text.splitlines(): for line in text.splitlines():
@ -96,6 +113,7 @@ def _build_payload(
failed_count: int, failed_count: int,
branch: str, branch: str,
build_number: str, build_number: str,
summary: dict | None = None,
) -> str: ) -> str:
passed = max(tests["tests"] - tests["failures"] - tests["errors"] - tests["skipped"], 0) passed = max(tests["tests"] - tests["failures"] - tests["errors"] - tests["skipped"], 0)
build_labels = _label_str( build_labels = _label_str(
@ -120,6 +138,17 @@ def _build_payload(
"# TYPE titan_iac_quality_gate_build_info gauge", "# TYPE titan_iac_quality_gate_build_info gauge",
f"titan_iac_quality_gate_build_info{build_labels} 1", f"titan_iac_quality_gate_build_info{build_labels} 1",
] ]
results = summary.get("results", []) if isinstance(summary, dict) else []
if results:
lines.append("# TYPE titan_iac_quality_gate_checks_total gauge")
for result in results:
check_name = result.get("name")
check_status = result.get("status")
if not check_name or not check_status:
continue
lines.append(
f'titan_iac_quality_gate_checks_total{{suite="{suite}",check="{_escape_label(str(check_name))}",result="{_escape_label(str(check_status))}"}} 1'
)
return "\n".join(lines) + "\n" return "\n".join(lines) + "\n"
@ -127,14 +156,16 @@ def main() -> int:
suite = os.getenv("SUITE_NAME", "titan-iac") suite = os.getenv("SUITE_NAME", "titan-iac")
pushgateway_url = os.getenv("PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091") pushgateway_url = os.getenv("PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091")
job_name = os.getenv("QUALITY_GATE_JOB_NAME", "platform-quality-ci") job_name = os.getenv("QUALITY_GATE_JOB_NAME", "platform-quality-ci")
junit_path = os.getenv("JUNIT_PATH", "build/junit-glue.xml") junit_glob = os.getenv("JUNIT_GLOB", os.getenv("JUNIT_PATH", "build/junit-*.xml"))
exit_code_path = os.getenv("GLUE_EXIT_CODE_PATH", "build/glue.rc") exit_code_path = os.getenv("QUALITY_GATE_EXIT_CODE_PATH", os.getenv("GLUE_EXIT_CODE_PATH", "build/quality-gate.rc"))
summary_path = os.getenv("QUALITY_GATE_SUMMARY_PATH", "build/quality-gate-summary.json")
branch = os.getenv("BRANCH_NAME", os.getenv("GIT_BRANCH", "")) branch = os.getenv("BRANCH_NAME", os.getenv("GIT_BRANCH", ""))
build_number = os.getenv("BUILD_NUMBER", "") build_number = os.getenv("BUILD_NUMBER", "")
tests = _parse_junit(junit_path) tests = _collect_junit_totals(junit_glob)
exit_code = _read_exit_code(exit_code_path) exit_code = _read_exit_code(exit_code_path)
status = "ok" if exit_code == 0 else "failed" status = "ok" if exit_code == 0 else "failed"
summary = _load_summary(summary_path)
ok_count = int( ok_count = int(
_fetch_existing_counter( _fetch_existing_counter(
@ -163,6 +194,7 @@ def main() -> int:
failed_count=failed_count, failed_count=failed_count,
branch=branch, branch=branch,
build_number=build_number, build_number=build_number,
summary=summary,
) )
push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}" push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}"
_post_text(push_url, payload) _post_text(push_url, payload)
@ -176,6 +208,7 @@ def main() -> int:
"tests_skipped": tests["skipped"], "tests_skipped": tests["skipped"],
"ok_count": ok_count, "ok_count": ok_count,
"failed_count": failed_count, "failed_count": failed_count,
"checks_recorded": len(summary.get("results", [])) if isinstance(summary, dict) else 0,
} }
print(json.dumps(summary, sort_keys=True)) print(json.dumps(summary, sort_keys=True))
return 0 return 0

View File

@ -1,6 +1,7 @@
max_success_age_hours: 48 max_success_age_hours: 48
allow_suspended: allow_suspended:
- bstein-dev-home/vaultwarden-cred-sync - bstein-dev-home/vaultwarden-cred-sync
- comms/guest-name-randomizer
- comms/othrys-room-reset - comms/othrys-room-reset
- comms/pin-othrys-invite - comms/pin-othrys-invite
- comms/seed-othrys-room - comms/seed-othrys-room
@ -9,6 +10,7 @@ allow_suspended:
- health/wger-user-sync - health/wger-user-sync
- mailu-mailserver/mailu-sync-nightly - mailu-mailserver/mailu-sync-nightly
- nextcloud/nextcloud-mail-sync - nextcloud/nextcloud-mail-sync
- vault/vault-oidc-config
ariadne_schedule_tasks: ariadne_schedule_tasks:
- schedule.mailu_sync - schedule.mailu_sync
- schedule.nextcloud_sync - schedule.nextcloud_sync

3
pytest.ini Normal file
View File

@ -0,0 +1,3 @@
[pytest]
addopts = -ra
norecursedirs = .git .venv .venv-ci __pycache__ tmp

View File

@ -1,5 +1,7 @@
import importlib.util import importlib.util
import pathlib import pathlib
import sys
import types
import pytest import pytest
@ -20,6 +22,26 @@ def load_sync_module(monkeypatch):
} }
for k, v in env.items(): for k, v in env.items():
monkeypatch.setenv(k, v) monkeypatch.setenv(k, v)
fake_psycopg2 = types.ModuleType("psycopg2")
fake_psycopg2.Error = Exception
fake_psycopg2.connect = lambda **kwargs: None
fake_psycopg2_extras = types.ModuleType("psycopg2.extras")
fake_psycopg2_extras.RealDictCursor = object
fake_passlib = types.ModuleType("passlib")
fake_passlib_hash = types.ModuleType("passlib.hash")
class _FakeBcryptSha256:
@staticmethod
def hash(password):
return f"stub:{password}"
fake_passlib_hash.bcrypt_sha256 = _FakeBcryptSha256
fake_passlib.hash = fake_passlib_hash
monkeypatch.setitem(sys.modules, "psycopg2", fake_psycopg2)
monkeypatch.setitem(sys.modules, "psycopg2.extras", fake_psycopg2_extras)
monkeypatch.setitem(sys.modules, "passlib", fake_passlib)
monkeypatch.setitem(sys.modules, "passlib.hash", fake_passlib_hash)
module_path = ( module_path = (
pathlib.Path(__file__).resolve().parents[2] pathlib.Path(__file__).resolve().parents[2]
/ "services" / "services"

View File

@ -7,7 +7,6 @@ Sync Keycloak users to Mailu mailboxes.
import os import os
import sys import sys
import json
import time import time
import secrets import secrets
import string import string

1
testing/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Top-level testing contract and quality-gate tooling for titan-iac."""

View File

@ -0,0 +1,164 @@
{
"required_docs": [
{
"path": "README.md",
"description": "Top-level repository handbook."
},
{
"path": "AGENTS.md",
"description": "Shared repository operating instructions."
},
{
"path": "Jenkinsfile",
"description": "Top-level Jenkins mirror for multibranch discovery."
},
{
"path": "ci/Jenkinsfile.titan-iac",
"description": "Canonical titan-iac Jenkins pipeline definition."
}
],
"managed_modules": [
"ci/scripts/publish_test_metrics.py",
"services/mailu/scripts/mailu_sync.py",
"testing/__init__.py",
"testing/quality_contract.py",
"testing/quality_docs.py",
"testing/quality_hygiene.py",
"testing/quality_coverage.py",
"testing/quality_gate.py"
],
"lint_paths": [
"ci/scripts/publish_test_metrics.py",
"ci/tests/glue",
"scripts/tests",
"services/comms/scripts/tests",
"services/mailu/scripts/mailu_sync.py",
"testing"
],
"pytest_suites": {
"unit": {
"description": "Fast unit and contract tests for repo automation.",
"paths": [
"scripts/tests",
"services/comms/scripts/tests",
"testing/tests"
],
"junit": "build/junit-unit.xml",
"coverage_sources": [
"ci/scripts",
"services/mailu/scripts",
"testing"
],
"coverage_xml": "build/coverage-unit.xml"
},
"glue": {
"description": "Cluster-live glue checks that validate CronJobs and exported metrics.",
"paths": [
"ci/tests/glue"
],
"junit": "build/junit-glue.xml"
}
},
"profiles": {
"local": [
"docs",
"smell",
"hygiene",
"unit",
"coverage"
],
"jenkins": [
"docs",
"smell",
"hygiene",
"unit",
"coverage",
"glue"
]
},
"manual_scripts": [
{
"path": "scripts/test_atlas_user_cleanup.py",
"description": "Manual cleanup validation for Atlas user lifecycle automation."
},
{
"path": "scripts/test_user_cleanup.py",
"description": "Manual cleanup validation for shared user lifecycle automation."
},
{
"path": "scripts/test_vaultwarden_user_cleanup.py",
"description": "Manual cleanup validation for Vaultwarden user lifecycle automation."
},
{
"path": "services/bstein-dev-home/scripts/test_portal_onboarding_flow.py",
"description": "Portal onboarding end-to-end flow validation with mail delivery checks."
},
{
"path": "services/keycloak/scripts/tests/test_keycloak_execute_actions_email.py",
"description": "Standalone Keycloak SMTP execute-actions-email validation script."
},
{
"path": "services/keycloak/scripts/tests/test_portal_token_exchange.py",
"description": "Standalone Keycloak token-exchange validation script."
}
],
"hygiene": {
"max_lines": 500,
"line_limit_globs": [
"testing/**/*.py",
"ci/scripts/*.py",
"ci/tests/**/*.py",
"scripts/tests/**/*.py",
"services/*/scripts/tests/**/*.py",
"services/mailu/scripts/mailu_sync.py"
],
"naming_rules": [
{
"glob": "testing/*.py",
"pattern": "^(?:__init__|quality_[a-z0-9_]+)\\.py$",
"description": "Top-level testing helpers use quality_* module names."
},
{
"glob": "testing/tests/*.py",
"pattern": "^test_[a-z0-9_]+\\.py$",
"description": "Top-level pytest files use test_*.py names."
},
{
"glob": "ci/tests/**/*.py",
"pattern": "^test_[a-z0-9_]+\\.py$",
"description": "CI pytest files use test_*.py names."
},
{
"glob": "scripts/tests/**/*.py",
"pattern": "^test_[a-z0-9_]+\\.py$",
"description": "Script pytest files use test_*.py names."
},
{
"glob": "scripts/test_*.py",
"pattern": "^test_[a-z0-9_]+\\.py$",
"description": "Standalone script tests use test_*.py names."
},
{
"glob": "services/*/scripts/tests/**/*.py",
"pattern": "^test_[a-z0-9_]+\\.py$",
"description": "Service pytest files use test_*.py names."
},
{
"glob": "services/*/scripts/test_*.py",
"pattern": "^test_[a-z0-9_]+\\.py$",
"description": "Standalone service test scripts use test_*.py names."
}
]
},
"coverage": {
"minimum_percent": 95.0,
"tracked_files": [
"ci/scripts/publish_test_metrics.py",
"testing/quality_contract.py",
"testing/quality_docs.py",
"testing/quality_hygiene.py",
"testing/quality_coverage.py",
"testing/quality_gate.py"
]
}
}

View File

@ -0,0 +1,17 @@
"""Helpers for loading the repository testing contract."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
CONTRACT_PATH = Path(__file__).with_name("quality_contract.json")
def load_contract(contract_path: Path | None = None) -> dict[str, Any]:
"""Return the parsed testing contract."""
path = contract_path or CONTRACT_PATH
with path.open("r", encoding="utf-8") as handle:
return json.load(handle)

View File

@ -0,0 +1,58 @@
"""Per-file coverage threshold validation for quality-managed modules."""
from __future__ import annotations
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any
def _load_percentages(xml_path: Path, root: Path) -> dict[str, float]:
    """Parse a Cobertura-style coverage XML into {repo-relative path: percent}.

    Keys are normalized to forward-slash paths relative to *root* so they can
    be matched against the contract's ``tracked_files`` entries.
    """
    tree = ET.parse(xml_path)
    xml_root = tree.getroot()
    # <sources><source> entries are the roots coverage measured files under;
    # used below to resolve relative class filenames back to repo paths.
    source_roots = [
        Path(node.text)
        for node in xml_root.findall("./sources/source")
        if node.text
    ]
    percentages: dict[str, float] = {}
    for class_node in xml_root.findall(".//class"):
        filename = class_node.attrib.get("filename")
        line_rate = class_node.attrib.get("line-rate")
        if not filename or line_rate is None:
            continue
        # Coverage on Windows emits backslash separators; normalize first.
        normalized = filename.replace("\\", "/")
        if normalized.startswith("/"):
            # NOTE(review): assumes absolute filenames live under *root* —
            # relative_to raises ValueError otherwise; confirm with real reports.
            key = Path(normalized).relative_to(root).as_posix()
        else:
            key = normalized
        # Prefer resolving via a declared source root when the file exists there.
        for source_root in source_roots:
            candidate = source_root / filename
            if candidate.exists():
                key = candidate.relative_to(root).as_posix()
                break
        # line-rate is a 0..1 fraction; store as a percentage.
        percentages[key] = float(line_rate) * 100.0
    return percentages
def run_check(contract: dict[str, Any], root: Path, xml_path: Path) -> list[str]:
    """Return human-readable issues for tracked files below the coverage floor."""
    # A missing report means coverage never ran; report that as its own issue.
    if not xml_path.exists():
        return [f"coverage xml missing: {xml_path.relative_to(root)}"]
    measured = _load_percentages(xml_path, root)
    coverage_config = contract.get("coverage", {})
    floor = float(coverage_config.get("minimum_percent", 95.0))
    problems: list[str] = []
    for tracked in coverage_config.get("tracked_files", []):
        percent = measured.get(tracked.replace("\\", "/"))
        if percent is None:
            problems.append(f"coverage missing for tracked file: {tracked}")
        elif percent + 1e-9 < floor:
            # Epsilon keeps float rounding from flagging values at the floor.
            problems.append(
                f"coverage below {floor:.1f}%: {tracked} ({percent:.1f}%)"
            )
    return problems

59
testing/quality_docs.py Normal file
View File

@ -0,0 +1,59 @@
"""Documentation-oriented validation for the testing contract."""
from __future__ import annotations
import ast
from pathlib import Path
from typing import Any
def _module_has_docstring(path: Path) -> bool:
source = path.read_text(encoding="utf-8")
return ast.get_docstring(ast.parse(source)) is not None
def _iter_contract_paths(contract: dict[str, Any]) -> list[str]:
paths: list[str] = []
for item in contract.get("required_docs", []):
paths.append(item["path"])
paths.extend(contract.get("managed_modules", []))
paths.extend(contract.get("lint_paths", []))
for suite in contract.get("pytest_suites", {}).values():
paths.extend(suite.get("paths", []))
for item in contract.get("manual_scripts", []):
paths.append(item["path"])
return paths
def run_check(contract: dict[str, Any], root: Path) -> list[str]:
    """Return human-readable issues for contract/documentation violations."""
    problems: list[str] = []
    # Required docs must exist, be non-empty, and carry a description.
    for doc in contract.get("required_docs", []):
        doc_path = root / doc["path"]
        if not doc_path.exists():
            problems.append(f"required doc missing: {doc['path']}")
            continue
        if doc_path.is_file() and not doc_path.read_text(encoding="utf-8").strip():
            problems.append(f"required doc empty: {doc['path']}")
        if not doc.get("description", "").strip():
            problems.append(f"required doc missing description: {doc['path']}")
    # Every path the contract mentions must exist on disk.
    for referenced in sorted(set(_iter_contract_paths(contract))):
        if not (root / referenced).exists():
            problems.append(f"contract path missing: {referenced}")
    # Suites and manual scripts need human-readable descriptions.
    for suite_name, suite in contract.get("pytest_suites", {}).items():
        if not suite.get("description", "").strip():
            problems.append(f"pytest suite missing description: {suite_name}")
    for script in contract.get("manual_scripts", []):
        if not script.get("description", "").strip():
            problems.append(f"manual script missing description: {script['path']}")
    # Managed Python modules must open with a docstring.
    for managed in contract.get("managed_modules", []):
        managed_path = root / managed
        if managed_path.exists() and managed_path.suffix == ".py" and not _module_has_docstring(managed_path):
            problems.append(f"module docstring missing: {managed}")
    return problems

175
testing/quality_gate.py Normal file
View File

@ -0,0 +1,175 @@
"""Source-of-truth quality-gate runner for titan-iac."""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
import time
from pathlib import Path
from typing import Any
from testing.quality_contract import load_contract
from testing.quality_coverage import run_check as run_coverage_check
from testing.quality_docs import run_check as run_docs_check
from testing.quality_hygiene import run_check as run_hygiene_check
RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"]
RUFF_IGNORE = ["B017", "UP015", "UP035"]
def _status_from_issues(issues: list[str]) -> str:
return "ok" if not issues else "failed"
def _result(name: str, description: str, status: str, **extra: Any) -> dict[str, Any]:
return {"name": name, "description": description, "status": status, **extra}
def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]:
    """Run ruff over the contract's lint paths and return a "smell" result.

    Invokes ``<python> -m ruff check`` so the lint runs inside the active
    interpreter's environment, with rule selection pinned by the
    RUFF_SELECT / RUFF_IGNORE constants above.
    """
    command = [
        sys.executable,
        "-m",
        "ruff",
        "check",
        "--select",
        ",".join(RUFF_SELECT),
        "--ignore",
        ",".join(RUFF_IGNORE),
        *contract.get("lint_paths", []),
    ]
    started_at = time.monotonic()
    # check=False: a lint failure becomes a "failed" result, not an exception.
    completed = subprocess.run(command, cwd=root, check=False)
    return _result(
        "smell",
        "Code-smell lint for managed Python automation.",
        "ok" if completed.returncode == 0 else "failed",
        returncode=completed.returncode,
        command=command,
        duration_seconds=round(time.monotonic() - started_at, 3),
    )
def _run_pytest_suite(root: Path, suite_name: str, suite: dict[str, Any]) -> dict[str, Any]:
    """Run one contract pytest suite and return its check result.

    Always writes a JUnit report to the suite's configured ``junit`` path;
    when the suite declares ``coverage_xml``, pytest-cov flags are appended
    so a coverage XML report is produced as well.
    """
    junit_path = root / suite["junit"]
    junit_path.parent.mkdir(parents=True, exist_ok=True)
    command = [
        sys.executable,
        "-m",
        "pytest",
        "-q",
        *suite.get("paths", []),
        f"--junitxml={junit_path}",
    ]
    coverage_xml = suite.get("coverage_xml")
    if coverage_xml:
        # One --cov flag per measured source tree, then the shared report flags.
        for source in suite.get("coverage_sources", []):
            command.append(f"--cov={source}")
        command.extend(
            [
                "--cov-branch",
                f"--cov-report=xml:{root / coverage_xml}",
            ]
        )
    started_at = time.monotonic()
    # check=False: failing tests become a "failed" result rather than raising.
    completed = subprocess.run(command, cwd=root, check=False)
    return _result(
        suite_name,
        suite["description"],
        "ok" if completed.returncode == 0 else "failed",
        returncode=completed.returncode,
        command=command,
        junit=str(junit_path.relative_to(root)),
        coverage_xml=coverage_xml,
        duration_seconds=round(time.monotonic() - started_at, 3),
    )
def run_profile(
    contract: dict[str, Any],
    root: Path,
    profile_name: str,
    build_dir: Path,
) -> dict[str, Any]:
    """Execute the configured profile and return a JSON-serializable summary.

    Each check name in the profile dispatches to a built-in check ("docs",
    "smell", "hygiene", "coverage") or, failing those, to a pytest suite of
    the same name from the contract. Raises SystemExit for an unknown
    profile or an unknown check name.
    """
    # Ensure the artifact directory exists before any check writes into it.
    build_dir.mkdir(parents=True, exist_ok=True)
    results: list[dict[str, Any]] = []
    profiles = contract.get("profiles", {})
    if profile_name not in profiles:
        raise SystemExit(f"unknown profile: {profile_name}")
    # Checks run in the order the profile declares them.
    for check_name in profiles[profile_name]:
        if check_name == "docs":
            issues = run_docs_check(contract, root)
            results.append(
                _result(
                    "docs",
                    "Required docs, contract descriptions, and module docstrings.",
                    _status_from_issues(issues),
                    issues=issues,
                )
            )
            continue
        if check_name == "smell":
            results.append(_run_ruff(contract, root))
            continue
        if check_name == "hygiene":
            issues = run_hygiene_check(contract, root)
            results.append(
                _result(
                    "hygiene",
                    "500 LOC hygiene and naming rules for managed test automation.",
                    _status_from_issues(issues),
                    issues=issues,
                )
            )
            continue
        if check_name == "coverage":
            # Coverage validates the XML produced by the "unit" suite, so the
            # profile is expected to list "unit" before "coverage".
            unit_suite = contract.get("pytest_suites", {}).get("unit", {})
            coverage_xml = root / unit_suite.get("coverage_xml", "build/coverage-unit.xml")
            issues = run_coverage_check(contract, root, coverage_xml)
            results.append(
                _result(
                    "coverage",
                    "Per-file 95% coverage floor for tracked quality-managed modules.",
                    _status_from_issues(issues),
                    issues=issues,
                    coverage_xml=str(coverage_xml.relative_to(root)),
                )
            )
            continue
        # Anything else must be a pytest suite declared in the contract.
        suite = contract.get("pytest_suites", {}).get(check_name)
        if suite is None:
            raise SystemExit(f"profile {profile_name} references unknown check: {check_name}")
        results.append(_run_pytest_suite(root, check_name, suite))
    # Overall status is "ok" only when every individual check passed.
    status = "ok" if all(item["status"] == "ok" for item in results) else "failed"
    return {
        "profile": profile_name,
        "status": status,
        "results": results,
        "manual_scripts": contract.get("manual_scripts", []),
    }
def main(argv: list[str] | None = None) -> int:
    """CLI entrypoint for the quality gate.

    Runs the selected profile from the repository root (the current working
    directory), writes ``quality-gate-summary.json`` into the build
    directory, and returns 0 on success / 1 on any failed check.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--profile", default="local")
    parser.add_argument("--build-dir", default="build")
    args = parser.parse_args(argv)
    # The gate assumes it is invoked from the repo root (cwd).
    root = Path.cwd()
    build_dir = root / args.build_dir
    build_dir.mkdir(parents=True, exist_ok=True)
    contract = load_contract()
    summary = run_profile(contract, root, args.profile, build_dir)
    # Persist the summary so CI metric publishing can pick it up later.
    summary_path = build_dir / "quality-gate-summary.json"
    summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    return 0 if summary["status"] == "ok" else 1
if __name__ == "__main__":
raise SystemExit(main())

View File

@ -0,0 +1,37 @@
"""File-size and naming validation for the managed testing surface."""
from __future__ import annotations
import re
from collections.abc import Iterable
from pathlib import Path
from typing import Any
def _expand_globs(root: Path, patterns: Iterable[str]) -> list[Path]:
matched: set[Path] = set()
for pattern in patterns:
matched.update(path for path in root.glob(pattern) if path.is_file())
return sorted(matched)
def run_check(contract: dict[str, Any], root: Path) -> list[str]:
    """Return human-readable issues for naming and file-size rules.

    Two hygiene rules from the contract are enforced: files matched by
    ``line_limit_globs`` must stay within ``max_lines`` lines, and files
    matched by each naming rule's glob must match its filename pattern.
    """
    config = contract.get("hygiene", {})
    max_lines = int(config.get("max_lines", 500))
    issues: list[str] = []
    for path in _expand_globs(root, config.get("line_limit_globs", [])):
        # Use a context manager so the handle is closed deterministically;
        # the previous bare open() in a generator expression leaked it
        # until garbage collection.
        with path.open("r", encoding="utf-8") as handle:
            line_count = sum(1 for _ in handle)
        if line_count > max_lines:
            issues.append(f"file exceeds {max_lines} LOC: {path.relative_to(root)} ({line_count})")
    for rule in config.get("naming_rules", []):
        pattern = re.compile(rule["pattern"])
        for path in _expand_globs(root, [rule["glob"]]):
            if not pattern.match(path.name):
                issues.append(
                    f"naming rule failed ({rule['description']}): {path.relative_to(root)}"
                )
    return issues

View File

@ -0,0 +1,264 @@
from __future__ import annotations
import json
from pathlib import Path
from ci.scripts import publish_test_metrics
def test_parse_junit_supports_testsuite_and_missing_file(tmp_path: Path):
    """_parse_junit reads <testsuite> counters and zero-fills a missing file."""
    junit_path = tmp_path / "suite.xml"
    junit_path.write_text(
        '<testsuite tests="3" failures="1" errors="0" skipped="1" />',
        encoding="utf-8",
    )
    assert publish_test_metrics._parse_junit(str(junit_path)) == {
        "tests": 3,
        "failures": 1,
        "errors": 0,
        "skipped": 1,
    }
    assert publish_test_metrics._parse_junit(str(tmp_path / "missing.xml")) == {
        "tests": 0,
        "failures": 0,
        "errors": 0,
        "skipped": 0,
    }
def test_collect_junit_totals_sums_multiple_files(tmp_path: Path):
    """_collect_junit_totals aggregates counters across every glob match."""
    first = tmp_path / "junit-a.xml"
    second = tmp_path / "junit-b.xml"
    first.write_text('<testsuite tests="2" failures="1" errors="0" skipped="0" />', encoding="utf-8")
    second.write_text('<testsuite tests="3" failures="0" errors="1" skipped="1" />', encoding="utf-8")
    totals = publish_test_metrics._collect_junit_totals(str(tmp_path / "junit-*.xml"))
    assert totals == {"tests": 5, "failures": 1, "errors": 1, "skipped": 1}
def test_parse_junit_handles_testsuites_and_invalid_counts(tmp_path: Path):
    """_parse_junit walks nested <testsuites>; a non-numeric count adds nothing."""
    junit_path = tmp_path / "suite.xml"
    junit_path.write_text(
        (
            "<testsuites>"
            '<testsuite tests="2" failures="1" errors="0" skipped="0" />'
            '<testsuite tests="bad" failures="0" errors="0" skipped="0" />'
            "</testsuites>"
        ),
        encoding="utf-8",
    )
    assert publish_test_metrics._parse_junit(str(junit_path)) == {
        "tests": 2,
        "failures": 1,
        "errors": 0,
        "skipped": 0,
    }
def test_read_exit_code_and_summary_fallbacks(tmp_path: Path):
    """Readers fall back: exit code 1 for a missing rc file, {} for bad/missing JSON."""
    rc_path = tmp_path / "rc.txt"
    rc_path.write_text("0\n", encoding="utf-8")
    summary_path = tmp_path / "summary.json"
    summary_path.write_text("{bad json", encoding="utf-8")
    assert publish_test_metrics._read_exit_code(str(rc_path)) == 0
    assert publish_test_metrics._read_exit_code(str(tmp_path / "missing.rc")) == 1
    assert publish_test_metrics._load_summary(str(summary_path)) == {}
    assert publish_test_metrics._load_summary(str(tmp_path / "missing.json")) == {}
def test_read_text_post_text_and_fetch_existing_counter(monkeypatch):
    """HTTP helpers read/post text and pick the counter sample matching all labels."""

    class _FakeResponse:
        """Minimal stand-in for urllib's response context manager."""

        def __init__(self, payload: str, status: int = 200):
            self.payload = payload
            self.status = status

        def read(self):
            return self.payload.encode("utf-8")

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return None

    # One canned response per helper call, in call order.
    responses = iter(
        [
            _FakeResponse("alpha"),
            _FakeResponse("", status=202),
            _FakeResponse(
                "\n".join(
                    [
                        'platform_quality_gate_runs_total{job="platform-quality-ci",suite="titan-iac",status="ok"} 7',
                        'platform_quality_gate_runs_total{job="other",suite="titan-iac",status="ok"} 1',
                    ]
                )
            ),
        ]
    )
    monkeypatch.setattr(
        publish_test_metrics.urllib.request,
        "urlopen",
        lambda *args, **kwargs: next(responses),
    )
    assert publish_test_metrics._read_text("http://example.invalid") == "alpha"
    publish_test_metrics._post_text("http://example.invalid", "payload")
    # Only the sample whose labels all match is returned (7, not 1).
    assert (
        publish_test_metrics._fetch_existing_counter(
            "http://push.invalid",
            "platform_quality_gate_runs_total",
            {"job": "platform-quality-ci", "suite": "titan-iac", "status": "ok"},
        )
        == 7.0
    )
def test_post_text_raises_and_counter_handles_bad_metric_lines(monkeypatch):
    """_post_text raises RuntimeError on an HTTP 500; malformed samples count as 0."""

    class _FakeResponse:
        """Minimal stand-in for urllib's response context manager."""

        def __init__(self, payload: str, status: int = 200):
            self.payload = payload
            self.status = status

        def read(self):
            return self.payload.encode("utf-8")

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return None

    monkeypatch.setattr(
        publish_test_metrics.urllib.request,
        "urlopen",
        lambda *args, **kwargs: _FakeResponse("", status=500),
    )
    try:
        publish_test_metrics._post_text("http://example.invalid", "payload")
    except RuntimeError as exc:
        assert "push failed" in str(exc)
    else:
        raise AssertionError("expected RuntimeError for failing push")
    # One line with no value, one with a non-numeric value: neither parses.
    monkeypatch.setattr(
        publish_test_metrics,
        "_read_text",
        lambda url: "\n".join(
            [
                'platform_quality_gate_runs_total{job="platform-quality-ci",suite="titan-iac",status="ok"}',
                'platform_quality_gate_runs_total{job="platform-quality-ci",suite="titan-iac",status="ok"} nope',
            ]
        ),
    )
    assert (
        publish_test_metrics._fetch_existing_counter(
            "http://push.invalid",
            "platform_quality_gate_runs_total",
            {"job": "platform-quality-ci", "suite": "titan-iac", "status": "ok"},
        )
        == 0.0
    )
def test_build_payload_includes_summary_metrics():
    """Per-check summary results are rendered as checks_total samples."""
    payload = publish_test_metrics._build_payload(
        suite="titan-iac",
        status="ok",
        tests={"tests": 4, "failures": 1, "errors": 0, "skipped": 1},
        ok_count=7,
        failed_count=2,
        branch="main",
        build_number="42",
        summary={
            "results": [
                {"name": "docs", "status": "ok"},
                {"name": "unit", "status": "failed"},
            ]
        },
    )
    assert 'platform_quality_gate_runs_total{suite="titan-iac",status="ok"} 7' in payload
    assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="docs",result="ok"} 1' in payload
    assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="unit",result="failed"} 1' in payload
def test_build_payload_skips_incomplete_results():
    """Summary entries missing a name or a status are omitted from checks_total."""
    payload = publish_test_metrics._build_payload(
        suite="titan-iac",
        status="failed",
        tests={"tests": 0, "failures": 0, "errors": 0, "skipped": 0},
        ok_count=1,
        failed_count=2,
        branch="",
        build_number="",
        summary={"results": [{"name": "docs"}, {"status": "ok"}]},
    )
    # The TYPE header is still emitted, but neither incomplete entry gets a sample.
    assert "titan_iac_quality_gate_checks_total" in payload
    assert 'check="docs"' not in payload
def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypatch):
    """main() merges junit files via JUNIT_GLOB and pushes summary check metrics."""
    build_dir = tmp_path / "build"
    build_dir.mkdir()
    (build_dir / "junit-unit.xml").write_text(
        '<testsuite tests="2" failures="0" errors="0" skipped="0" />',
        encoding="utf-8",
    )
    (build_dir / "junit-glue.xml").write_text(
        '<testsuite tests="3" failures="1" errors="0" skipped="0" />',
        encoding="utf-8",
    )
    (build_dir / "quality-gate.rc").write_text("1\n", encoding="utf-8")
    (build_dir / "quality-gate-summary.json").write_text(
        json.dumps({"results": [{"name": "docs", "status": "ok"}, {"name": "glue", "status": "failed"}]}),
        encoding="utf-8",
    )
    posted = {}
    monkeypatch.setenv("SUITE_NAME", "titan-iac")
    monkeypatch.setenv("PUSHGATEWAY_URL", "http://pushgateway.invalid")
    monkeypatch.setenv("QUALITY_GATE_JOB_NAME", "platform-quality-ci")
    monkeypatch.setenv("JUNIT_GLOB", str(build_dir / "junit-*.xml"))
    monkeypatch.setenv("QUALITY_GATE_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc"))
    monkeypatch.setenv("QUALITY_GATE_SUMMARY_PATH", str(build_dir / "quality-gate-summary.json"))
    monkeypatch.setenv("BRANCH_NAME", "main")
    monkeypatch.setenv("BUILD_NUMBER", "88")
    # Stub out the network: capture the pushed payload instead of sending it.
    monkeypatch.setattr(publish_test_metrics, "_fetch_existing_counter", lambda *args, **kwargs: 5)
    monkeypatch.setattr(publish_test_metrics, "_post_text", lambda url, payload: posted.update({"url": url, "payload": payload}))
    rc = publish_test_metrics.main()
    assert rc == 0
    assert posted["url"].endswith("/metrics/job/platform-quality-ci/suite/titan-iac")
    assert 'titan_iac_quality_gate_tests_total{suite="titan-iac",result="failed"} 1' in posted["payload"]
    assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="glue",result="failed"} 1' in posted["payload"]
def test_main_marks_successful_run(tmp_path: Path, monkeypatch, capsys):
    """With a clean junit report, a zero gate exit code, and no summary file,
    main() exits 0, prints status ok, and records no checks."""
    artifacts = tmp_path / "build"
    artifacts.mkdir()
    (artifacts / "junit.xml").write_text(
        '<testsuite tests="1" failures="0" errors="0" skipped="0" />',
        encoding="utf-8",
    )
    (artifacts / "quality-gate.rc").write_text("0\n", encoding="utf-8")
    environment = {
        "JUNIT_GLOB": str(artifacts / "*.xml"),
        "QUALITY_GATE_EXIT_CODE_PATH": str(artifacts / "quality-gate.rc"),
        # Deliberately points at a file that does not exist.
        "QUALITY_GATE_SUMMARY_PATH": str(artifacts / "missing-summary.json"),
    }
    for key, value in environment.items():
        monkeypatch.setenv(key, value)
    monkeypatch.setattr(publish_test_metrics, "_fetch_existing_counter", lambda *args, **kwargs: 0)
    monkeypatch.setattr(publish_test_metrics, "_post_text", lambda *args, **kwargs: None)
    exit_code = publish_test_metrics.main()
    printed = json.loads(capsys.readouterr().out)
    assert exit_code == 0
    assert printed["status"] == "ok"
    assert printed["checks_recorded"] == 0

View File

@ -0,0 +1,167 @@
from __future__ import annotations
from pathlib import Path
import textwrap
from testing.quality_contract import load_contract
from testing.quality_coverage import run_check as run_coverage_check
from testing.quality_docs import run_check as run_docs_check
from testing.quality_hygiene import run_check as run_hygiene_check
def test_bundled_contract_exposes_local_and_jenkins_profiles():
    """The contract shipped with the package must define both CI profiles
    and a non-empty path list for the unit pytest suite."""
    contract = load_contract()
    for profile in ("local", "jenkins"):
        assert profile in contract["profiles"]
    assert contract["pytest_suites"]["unit"]["paths"]
def test_docs_check_reports_missing_docstring_and_missing_path(tmp_path: Path):
    """run_docs_check flags managed modules without a docstring and every
    contract path that does not exist on disk."""
    (tmp_path / "managed.py").write_text("value = 1\n", encoding="utf-8")
    (tmp_path / "README.md").write_text("repo docs\n", encoding="utf-8")
    contract = {
        "required_docs": [{"path": "README.md", "description": "Docs"}],
        "managed_modules": ["managed.py"],
        "lint_paths": ["missing-dir"],
        "pytest_suites": {"unit": {"description": "Unit", "paths": ["missing-tests"]}},
        "manual_scripts": [{"path": "missing-script.py", "description": "Manual"}],
    }
    reported = set(run_docs_check(contract, tmp_path))
    expected = {
        "module docstring missing: managed.py",
        "contract path missing: missing-dir",
        "contract path missing: missing-tests",
        "contract path missing: missing-script.py",
    }
    assert expected <= reported
def test_docs_check_reports_missing_required_doc_metadata(tmp_path: Path):
    """Empty required docs, absent required docs, and missing descriptions
    (docs, suites, manual scripts) all surface as distinct issues."""
    (tmp_path / "README.md").write_text("", encoding="utf-8")
    contract = {
        "required_docs": [{"path": "README.md", "description": ""}, {"path": "missing.md", "description": "Missing"}],
        "managed_modules": [],
        "lint_paths": [],
        "pytest_suites": {"unit": {"description": "", "paths": []}},
        "manual_scripts": [{"path": "manual.py", "description": ""}],
    }
    reported = set(run_docs_check(contract, tmp_path))
    expected = {
        "required doc empty: README.md",
        "required doc missing description: README.md",
        "required doc missing: missing.md",
        "pytest suite missing description: unit",
        "manual script missing description: manual.py",
    }
    assert expected <= reported
def test_hygiene_check_enforces_line_limit_and_name_rules(tmp_path: Path):
    """run_hygiene_check enforces the configured LOC ceiling and the
    filename pattern rule on matching globs."""
    tests_dir = tmp_path / "tests"
    tests_dir.mkdir()
    # Violates the naming rule but stays under the line limit.
    (tests_dir / "bad-name.py").write_text("x = 1\n", encoding="utf-8")
    # Well-named but one line over the 3-LOC ceiling.
    (tests_dir / "test_too_long.py").write_text("line\n" * 4, encoding="utf-8")
    contract = {
        "hygiene": {
            "max_lines": 3,
            "line_limit_globs": ["tests/*.py"],
            "naming_rules": [
                {
                    "glob": "tests/*.py",
                    "pattern": r"^test_[a-z0-9_]+\.py$",
                    "description": "pytest files use test_*.py names.",
                }
            ],
        }
    }
    issues = run_hygiene_check(contract, tmp_path)
    loc_issues = [issue for issue in issues if "file exceeds 3 LOC" in issue]
    naming_issues = [
        issue for issue in issues if "naming rule failed" in issue and "bad-name.py" in issue
    ]
    assert loc_issues
    assert naming_issues
def test_coverage_check_enforces_per_file_floor(tmp_path: Path):
    """Tracked files under the minimum line-rate and tracked files absent
    from the report must both be reported as issues."""
    report_dir = tmp_path / "build"
    report_dir.mkdir()
    report_path = report_dir / "coverage.xml"
    report_path.write_text(
        textwrap.dedent(
            """\
            <coverage>
              <packages>
                <package>
                  <classes>
                    <class filename="ok.py" line-rate="1.0" />
                    <class filename="low.py" line-rate="0.90" />
                  </classes>
                </package>
              </packages>
            </coverage>
            """
        ),
        encoding="utf-8",
    )
    contract = {
        "coverage": {
            "minimum_percent": 95.0,
            "tracked_files": ["ok.py", "low.py", "missing.py"],
        }
    }
    reported = run_coverage_check(contract, tmp_path, report_path)
    assert "coverage below 95.0%: low.py (90.0%)" in reported
    assert "coverage missing for tracked file: missing.py" in reported
def test_coverage_check_handles_missing_xml_and_source_root_mapping(tmp_path: Path):
    """A missing report yields exactly one issue; <sources> roots and
    absolute filenames both resolve to tracked files, and entries without a
    line-rate attribute are skipped without error."""
    missing_xml = tmp_path / "missing.xml"
    assert run_coverage_check({"coverage": {"tracked_files": []}}, tmp_path, missing_xml) == [
        "coverage xml missing: missing.xml"
    ]
    source_dir = tmp_path / "pkg"
    source_dir.mkdir()
    (source_dir / "mapped.py").write_text("value = 1\n", encoding="utf-8")
    (tmp_path / "absolute.py").write_text("value = 2\n", encoding="utf-8")
    # One entry resolved via the <source> root, one via an absolute path,
    # and one ("skip.py") with no line-rate that the check must ignore.
    report = textwrap.dedent(
        f"""\
        <coverage>
          <sources>
            <source>{source_dir}</source>
          </sources>
          <packages>
            <package>
              <classes>
                <class filename="mapped.py" line-rate="1.0" />
                <class filename="{(tmp_path / 'absolute.py').as_posix()}" line-rate="1.0" />
                <class filename="skip.py" />
              </classes>
            </package>
          </packages>
        </coverage>
        """
    )
    coverage_xml = tmp_path / "coverage.xml"
    coverage_xml.write_text(report, encoding="utf-8")
    contract = {
        "coverage": {
            "minimum_percent": 95.0,
            "tracked_files": ["pkg/mapped.py", "absolute.py"],
        }
    }
    assert run_coverage_check(contract, tmp_path, coverage_xml) == []

View File

@ -0,0 +1,68 @@
from __future__ import annotations
from pathlib import Path
from testing import quality_gate
def test_run_profile_aggregates_internal_and_pytest_results(tmp_path: Path, monkeypatch):
    """run_profile executes every check of the profile in contract order and
    attaches junit metadata to pytest-backed suite results."""
    # NOTE(review): build_dir is never mkdir'd here — presumably run_profile
    # creates it before invoking subprocesses; confirm against quality_gate.
    build_dir = tmp_path / "build"
    unit_test = tmp_path / "test_sample.py"
    unit_test.write_text("def test_ok():\n assert True\n", encoding="utf-8")
    contract = {
        "profiles": {"local": ["docs", "smell", "hygiene", "unit", "coverage"]},
        "pytest_suites": {
            "unit": {
                "description": "Unit suite",
                "paths": [str(unit_test.relative_to(tmp_path))],
                "junit": "build/junit-unit.xml",
                "coverage_xml": "build/coverage-unit.xml",
                "coverage_sources": [],
            }
        },
        "manual_scripts": [{"path": "manual.py", "description": "Manual"}],
    }
    # Internal checks report no issues so the subprocess plumbing is isolated.
    for check_name in ("run_docs_check", "run_hygiene_check", "run_coverage_check"):
        monkeypatch.setattr(quality_gate, check_name, lambda *_: [])
    recorded_calls = []

    def fake_run(command, cwd, check):
        recorded_calls.append((command, cwd, check))
        if "--junitxml=" in " ".join(command):
            # Simulate pytest dropping its junit and coverage artifacts.
            (build_dir / "junit-unit.xml").write_text(
                '<testsuite tests="1" failures="0" errors="0" skipped="0" />',
                encoding="utf-8",
            )
            (build_dir / "coverage-unit.xml").write_text("<coverage />", encoding="utf-8")
        return type("Completed", (), {"returncode": 0})()

    monkeypatch.setattr(quality_gate.subprocess, "run", fake_run)
    summary = quality_gate.run_profile(contract, tmp_path, "local", build_dir)
    assert summary["status"] == "ok"
    assert [entry["name"] for entry in summary["results"]] == [
        "docs",
        "smell",
        "hygiene",
        "unit",
        "coverage",
    ]
    # The "smell" check is the first subprocess: python -m ruff ...
    first_command = recorded_calls[0][0]
    assert first_command[:3] == [quality_gate.sys.executable, "-m", "ruff"]
    assert any(entry.get("junit") == "build/junit-unit.xml" for entry in summary["results"])
def test_main_writes_summary_file(tmp_path: Path, monkeypatch):
    """main() persists the run summary to <build-dir>/quality-gate-summary.json
    relative to the current working directory."""
    fake_summary = {"status": "ok", "profile": "local", "results": [], "manual_scripts": []}
    monkeypatch.chdir(tmp_path)
    monkeypatch.setattr(quality_gate, "load_contract", lambda: {"profiles": {"local": []}, "pytest_suites": {}})
    monkeypatch.setattr(quality_gate, "run_profile", lambda *args, **kwargs: fake_summary)
    assert quality_gate.main(["--profile", "local", "--build-dir", "build"]) == 0
    assert (tmp_path / "build" / "quality-gate-summary.json").exists()