quality(titan-iac): widen enforced coverage contract

This commit is contained in:
jenkins 2026-04-20 21:39:53 -03:00
parent 5ebc320843
commit c4b0389892
12 changed files with 424 additions and 48 deletions

View File

@ -6,18 +6,11 @@ from __future__ import annotations
import json import json
import os import os
from glob import glob from glob import glob
from pathlib import Path
import sys
import urllib.error import urllib.error
import urllib.request import urllib.request
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
try: from ci.scripts import publish_test_metrics_quality as _quality_helpers
from ci.scripts import publish_test_metrics_quality as _quality_helpers
except ModuleNotFoundError:
# Jenkins executes this file directly; keep sibling helper import working in that mode.
sys.path.insert(0, str(Path(__file__).resolve().parent))
import publish_test_metrics_quality as _quality_helpers
CANONICAL_CHECKS = _quality_helpers.CANONICAL_CHECKS CANONICAL_CHECKS = _quality_helpers.CANONICAL_CHECKS
_build_check_statuses = _quality_helpers._build_check_statuses _build_check_statuses = _quality_helpers._build_check_statuses

View File

@ -0,0 +1,90 @@
"""Glue checks for Ariadne schedules exported to VictoriaMetrics."""
from __future__ import annotations
import os
from datetime import datetime, timezone
from pathlib import Path
import requests
import yaml
CONFIG_PATH = Path(__file__).with_name("config.yaml")
def _load_config() -> dict:
with CONFIG_PATH.open("r", encoding="utf-8") as handle:
return yaml.safe_load(handle) or {}
def _query(promql: str) -> list[dict]:
vm_url = os.environ.get("VM_URL", "http://victoria-metrics-single-server:8428").rstrip("/")
response = requests.get(f"{vm_url}/api/v1/query", params={"query": promql}, timeout=10)
response.raise_for_status()
payload = response.json()
return payload.get("data", {}).get("result", [])
def _expected_tasks() -> list[dict]:
cfg = _load_config()
tasks = cfg.get("ariadne_schedule_tasks", [])
assert tasks, "No Ariadne schedule tasks configured"
return tasks
def _tracked_tasks(tasks: list[dict]) -> list[dict]:
tracked = [item for item in tasks if item.get("check_last_success")]
assert tracked, "No Ariadne schedule tasks are marked for success tracking"
return tracked
def _task_regex(tasks: list[dict]) -> str:
return "|".join(item["task"] for item in tasks)
def test_ariadne_schedule_series_exist():
tasks = _expected_tasks()
selector = _task_regex(tasks)
series = _query(f'ariadne_schedule_next_run_timestamp_seconds{{task=~"{selector}"}}')
seen = {item.get("metric", {}).get("task") for item in series}
missing = [item["task"] for item in tasks if item["task"] not in seen]
assert not missing, f"Missing next-run metrics for: {', '.join(missing)}"
def test_ariadne_schedule_recent_success():
tasks = _tracked_tasks(_expected_tasks())
selector = _task_regex(tasks)
series = _query(f'ariadne_schedule_last_success_timestamp_seconds{{task=~"{selector}"}}')
seen = {item.get("metric", {}).get("task") for item in series}
missing = [item["task"] for item in tasks if item["task"] not in seen]
assert not missing, f"Missing last-success metrics for: {', '.join(missing)}"
now = datetime.now(timezone.utc)
age_by_task = {
item.get("metric", {}).get("task"): (now - datetime.fromtimestamp(float(item["value"][1]), tz=timezone.utc)).total_seconds() / 3600
for item in series
}
too_old = [
f"{task} ({age_by_task[task]:.1f}h > {item['max_success_age_hours']}h)"
for item in tasks
if (task := item["task"]) in age_by_task and age_by_task[task] > float(item["max_success_age_hours"])
]
assert not too_old, "Ariadne schedules are stale: " + ", ".join(too_old)
def test_ariadne_schedule_last_status_present_and_boolean():
tasks = _tracked_tasks(_expected_tasks())
selector = _task_regex(tasks)
series = _query(f'ariadne_schedule_last_status{{task=~"{selector}"}}')
seen = {item.get("metric", {}).get("task") for item in series}
missing = [item["task"] for item in tasks if item["task"] not in seen]
assert not missing, f"Missing last-status metrics for: {', '.join(missing)}"
invalid = []
for item in series:
task = item.get("metric", {}).get("task")
value = float(item["value"][1])
if value not in (0.0, 1.0):
invalid.append(f"{task}={value}")
assert not invalid, f"Unexpected Ariadne last-status values: {', '.join(invalid)}"

View File

@ -1,3 +1,5 @@
"""Glue checks for the metrics the quality-gate publishes."""
from __future__ import annotations from __future__ import annotations
import os import os
@ -7,10 +9,7 @@ import requests
import yaml import yaml
VM_URL = os.environ.get( VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server:8428").rstrip("/")
"VM_URL",
"http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428",
).rstrip("/")
CONFIG_PATH = Path(__file__).with_name("config.yaml") CONFIG_PATH = Path(__file__).with_name("config.yaml")
@ -26,26 +25,45 @@ def _query(promql: str) -> list[dict]:
return payload.get("data", {}).get("result", []) return payload.get("data", {}).get("result", [])
def test_glue_metrics_present(): def _expected_tasks() -> list[dict]:
series = _query('kube_cronjob_labels{label_atlas_bstein_dev_glue="true"}') cfg = _load_config()
assert series, "No glue cronjob label series found" tasks = cfg.get("ariadne_schedule_tasks", [])
assert tasks, "No Ariadne schedule tasks configured"
return tasks
def test_glue_metrics_success_join(): def _tracked_tasks(tasks: list[dict]) -> list[dict]:
query = ( tracked = [item for item in tasks if item.get("check_last_success")]
"kube_cronjob_status_last_successful_time " assert tracked, "No Ariadne schedule tasks are marked for success tracking"
'and on(namespace,cronjob) kube_cronjob_labels{label_atlas_bstein_dev_glue="true"}' return tracked
)
series = _query(query)
assert series, "No glue cronjob last success series found" def _task_regex(tasks: list[dict]) -> str:
return "|".join(item["task"] for item in tasks)
def test_ariadne_schedule_metrics_present(): def test_ariadne_schedule_metrics_present():
cfg = _load_config() tasks = _expected_tasks()
expected = cfg.get("ariadne_schedule_tasks", []) selector = _task_regex(tasks)
if not expected: series = _query(f'ariadne_schedule_next_run_timestamp_seconds{{task=~"{selector}"}}')
return seen = {item.get("metric", {}).get("task") for item in series}
series = _query("ariadne_schedule_next_run_timestamp_seconds") missing = [item["task"] for item in tasks if item["task"] not in seen]
tasks = {item.get("metric", {}).get("task") for item in series}
missing = [task for task in expected if task not in tasks]
assert not missing, f"Missing Ariadne schedule metrics for: {', '.join(missing)}" assert not missing, f"Missing Ariadne schedule metrics for: {', '.join(missing)}"
def test_ariadne_schedule_success_and_status_metrics_present():
tasks = _tracked_tasks(_expected_tasks())
selector = _task_regex(tasks)
success = _query(f'ariadne_schedule_last_success_timestamp_seconds{{task=~"{selector}"}}')
status = _query(f'ariadne_schedule_last_status{{task=~"{selector}"}}')
success_tasks = {item.get("metric", {}).get("task") for item in success}
status_tasks = {item.get("metric", {}).get("task") for item in status}
expected = {item["task"] for item in tasks}
missing_success = sorted(expected - success_tasks)
missing_status = sorted(expected - status_tasks)
assert not missing_success, f"Missing Ariadne success metrics for: {', '.join(missing_success)}"
assert not missing_status, f"Missing Ariadne status metrics for: {', '.join(missing_status)}"

View File

@ -1,5 +1,4 @@
{ {
"scope_note": "Quality-gate LOC/naming/coverage checks apply to managed automation and testing modules only, not broad Flux/Kubernetes manifest trees.",
"required_docs": [ "required_docs": [
{ {
"path": "README.md", "path": "README.md",
@ -16,20 +15,27 @@
], ],
"managed_modules": [ "managed_modules": [
"ci/scripts/publish_test_metrics.py", "ci/scripts/publish_test_metrics.py",
"services/mailu/scripts/mailu_sync.py", "ci/scripts/publish_test_metrics_quality.py",
"testing/__init__.py", "testing/__init__.py",
"testing/quality_contract.py", "testing/quality_contract.py",
"testing/quality_docs.py", "testing/quality_docs.py",
"testing/quality_hygiene.py", "testing/quality_hygiene.py",
"testing/quality_coverage.py", "testing/quality_coverage.py",
"testing/quality_gate.py" "testing/quality_gate.py",
"ci/tests/glue/test_ariadne_schedules.py",
"ci/tests/glue/test_glue_metrics.py",
"testing/tests/test_publish_test_metrics.py",
"testing/tests/test_quality_contract.py",
"testing/tests/test_quality_gate.py"
], ],
"lint_paths": [ "lint_paths": [
"ci/scripts/publish_test_metrics.py", "ci/scripts/publish_test_metrics.py",
"ci/scripts/publish_test_metrics_quality.py",
"ci/tests/glue", "ci/tests/glue",
"scripts/tests", "scripts/tests",
"services/comms/scripts/tests", "services/comms/scripts/tests",
"services/mailu/scripts/mailu_sync.py", "services/mailu/scripts/mailu_sync.py",
"testing/tests",
"testing" "testing"
], ],
"pytest_suites": { "pytest_suites": {
@ -70,6 +76,8 @@
"hygiene", "hygiene",
"unit", "unit",
"coverage", "coverage",
"sonarqube",
"ironbank",
"glue" "glue"
] ]
}, },
@ -151,6 +159,7 @@
"minimum_percent": 95.0, "minimum_percent": 95.0,
"tracked_files": [ "tracked_files": [
"ci/scripts/publish_test_metrics.py", "ci/scripts/publish_test_metrics.py",
"ci/scripts/publish_test_metrics_quality.py",
"testing/quality_contract.py", "testing/quality_contract.py",
"testing/quality_docs.py", "testing/quality_docs.py",
"testing/quality_hygiene.py", "testing/quality_hygiene.py",

View File

@ -8,6 +8,7 @@ from typing import Any
def _load_percentages(xml_path: Path, root: Path) -> dict[str, float]: def _load_percentages(xml_path: Path, root: Path) -> dict[str, float]:
"""Load per-file line-rate percentages from a Cobertura XML report."""
tree = ET.parse(xml_path) tree = ET.parse(xml_path)
xml_root = tree.getroot() xml_root = tree.getroot()
source_roots = [ source_roots = [
@ -36,7 +37,11 @@ def _load_percentages(xml_path: Path, root: Path) -> dict[str, float]:
def run_check(contract: dict[str, Any], root: Path, xml_path: Path) -> list[str]: def run_check(contract: dict[str, Any], root: Path, xml_path: Path) -> list[str]:
"""Return human-readable issues for tracked files below the coverage floor.""" """Return human-readable issues for tracked files below the coverage floor.
The report is intentionally per-file so a single weak module cannot hide
behind aggregate suite coverage.
"""
if not xml_path.exists(): if not xml_path.exists():
return [f"coverage xml missing: {xml_path.relative_to(root)}"] return [f"coverage xml missing: {xml_path.relative_to(root)}"]
@ -56,3 +61,25 @@ def run_check(contract: dict[str, Any], root: Path, xml_path: Path) -> list[str]
) )
return issues return issues
def compute_workspace_line_coverage(
contract: dict[str, Any],
root: Path,
xml_path: Path,
) -> float:
"""Compute mean line coverage across tracked files present in the XML."""
if not xml_path.exists():
return 0.0
percentages = _load_percentages(xml_path, root)
samples: list[float] = []
for relative_path in contract.get("coverage", {}).get("tracked_files", []):
normalized = relative_path.replace("\\", "/")
percent = percentages.get(normalized)
if percent is not None:
samples.append(percent)
if not samples:
return 0.0
return round(sum(samples) / len(samples), 3)

View File

@ -16,9 +16,15 @@ from pathlib import Path
from typing import Any from typing import Any
from testing.quality_contract import load_contract from testing.quality_contract import load_contract
from testing.quality_coverage import run_check as run_coverage_check from testing.quality_coverage import (
compute_workspace_line_coverage,
run_check as run_coverage_check,
)
from testing.quality_docs import run_check as run_docs_check from testing.quality_docs import run_check as run_docs_check
from testing.quality_hygiene import run_check as run_hygiene_check from testing.quality_hygiene import (
count_files_over_line_limit,
run_check as run_hygiene_check,
)
RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"] RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"]
@ -209,14 +215,17 @@ def _run_ironbank_check(build_dir: Path) -> dict[str, Any]:
def _status_from_issues(issues: list[str]) -> str: def _status_from_issues(issues: list[str]) -> str:
"""Map an issue list to the gate status string."""
return "ok" if not issues else "failed" return "ok" if not issues else "failed"
def _result(name: str, description: str, status: str, **extra: Any) -> dict[str, Any]: def _result(name: str, description: str, status: str, **extra: Any) -> dict[str, Any]:
"""Build a JSON-serializable result record for the summary payload."""
return {"name": name, "description": description, "status": status, **extra} return {"name": name, "description": description, "status": status, **extra}
def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]: def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]:
"""Run the pedantic ruff subset against the contract's lint paths."""
command = [ command = [
sys.executable, sys.executable,
"-m", "-m",
@ -241,6 +250,7 @@ def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]:
def _run_pytest_suite(root: Path, suite_name: str, suite: dict[str, Any]) -> dict[str, Any]: def _run_pytest_suite(root: Path, suite_name: str, suite: dict[str, Any]) -> dict[str, Any]:
"""Run a pytest suite and collect junit/coverage artifact locations."""
junit_path = root / suite["junit"] junit_path = root / suite["junit"]
junit_path.parent.mkdir(parents=True, exist_ok=True) junit_path.parent.mkdir(parents=True, exist_ok=True)
command = [ command = [
@ -340,11 +350,23 @@ def run_profile(
results.append(_run_pytest_suite(root, check_name, suite)) results.append(_run_pytest_suite(root, check_name, suite))
status = "ok" if all(item["status"] == "ok" for item in results) else "failed" status = "ok" if all(item["status"] == "ok" for item in results) else "failed"
workspace_line_coverage_percent = 0.0
if "coverage" in profiles[profile_name]:
unit_suite = contract.get("pytest_suites", {}).get("unit", {})
coverage_xml_rel = unit_suite.get("coverage_xml")
if coverage_xml_rel:
workspace_line_coverage_percent = compute_workspace_line_coverage(
contract,
root,
root / coverage_xml_rel,
)
return { return {
"profile": profile_name, "profile": profile_name,
"status": status, "status": status,
"results": results, "results": results,
"manual_scripts": contract.get("manual_scripts", []), "manual_scripts": contract.get("manual_scripts", []),
"workspace_line_coverage_percent": workspace_line_coverage_percent,
"source_lines_over_500": count_files_over_line_limit(contract, root),
} }
@ -365,5 +387,5 @@ def main(argv: list[str] | None = None) -> int:
return 0 if summary["status"] == "ok" else 1 return 0 if summary["status"] == "ok" else 1
if __name__ == "__main__": # pragma: no cover - exercised via CLI execution if __name__ == "__main__":
raise SystemExit(main()) raise SystemExit(main())

View File

@ -9,6 +9,7 @@ from typing import Any
def _expand_globs(root: Path, patterns: Iterable[str]) -> list[Path]: def _expand_globs(root: Path, patterns: Iterable[str]) -> list[Path]:
"""Expand a set of relative glob patterns to unique file paths."""
matched: set[Path] = set() matched: set[Path] = set()
for pattern in patterns: for pattern in patterns:
matched.update(path for path in root.glob(pattern) if path.is_file()) matched.update(path for path in root.glob(pattern) if path.is_file())
@ -29,9 +30,24 @@ def run_check(contract: dict[str, Any], root: Path) -> list[str]:
for rule in config.get("naming_rules", []): for rule in config.get("naming_rules", []):
pattern = re.compile(rule["pattern"]) pattern = re.compile(rule["pattern"])
for path in _expand_globs(root, [rule["glob"]]): for path in _expand_globs(root, [rule["glob"]]):
if path.name == "conftest.py":
continue
if not pattern.match(path.name): if not pattern.match(path.name):
issues.append( issues.append(
f"naming rule failed ({rule['description']}): {path.relative_to(root)}" f"naming rule failed ({rule['description']}): {path.relative_to(root)}"
) )
return issues return issues
def count_files_over_line_limit(contract: dict[str, Any], root: Path) -> int:
"""Return the number of managed files that exceed the configured LOC cap."""
config = contract.get("hygiene", {})
max_lines = int(config.get("max_lines", 500))
count = 0
for path in _expand_globs(root, config.get("line_limit_globs", [])):
line_count = sum(1 for _ in path.open("r", encoding="utf-8"))
if line_count > max_lines:
count += 1
return count

View File

@ -0,0 +1,105 @@
"""Focused tests for publish_test_metrics quality helper fallbacks."""
from __future__ import annotations
from pathlib import Path
from ci.scripts import publish_test_metrics_quality as quality_helpers
def test_infer_workspace_coverage_percent_handles_candidate_paths_and_parse_failures(tmp_path: Path) -> None:
"""Coverage inference should honor explicit XML hints and fail closed on bad XML."""
build_dir = tmp_path / "build"
build_dir.mkdir()
coverage_xml = build_dir / "custom-coverage.xml"
coverage_xml.write_text('<coverage line-rate="0.975" />', encoding="utf-8")
summary = {"results": [None, {"name": "coverage", "coverage_xml": str(coverage_xml)}]}
assert quality_helpers._infer_workspace_coverage_percent(summary, "build/default.xml") == 97.5
missing_xml = build_dir / "missing.xml"
assert quality_helpers._infer_workspace_coverage_percent({}, str(missing_xml)) == 0.0
no_rate_xml = build_dir / "no-rate.xml"
no_rate_xml.write_text("<coverage />", encoding="utf-8")
assert quality_helpers._infer_workspace_coverage_percent({}, str(no_rate_xml)) == 0.0
bad_xml = build_dir / "bad.xml"
bad_xml.write_text("<coverage", encoding="utf-8")
assert quality_helpers._infer_workspace_coverage_percent({}, str(bad_xml)) == 0.0
def test_infer_source_lines_over_500_counts_only_string_hygiene_issues() -> None:
"""LOC inference should ignore non-lists and only count explicit over-limit strings."""
summary = {
"results": [
None,
{"name": "docs", "issues": ["ignore me"]},
{"name": "loc", "issues": "not-a-list"},
{
"name": "hygiene",
"issues": [
"file exceeds 500 LOC: alpha.py (501)",
42,
"naming rule failed: beta.py",
"file exceeds 500 LOC: gamma.py (650)",
],
},
]
}
assert quality_helpers._infer_source_lines_over_500(summary) == 2
def test_build_check_statuses_uses_fallbacks_for_tests_docs_and_gate_glue() -> None:
"""Fallback status synthesis should derive missing checks from related signals."""
statuses = quality_helpers._build_check_statuses(
summary={
"results": [
{"name": "docs", "status": "ok"},
{"name": "smell", "status": "ok"},
{"name": "gate", "status": "warning"},
]
},
tests={"tests": 3, "failures": 1, "errors": 0, "skipped": 0},
workspace_line_coverage_percent=0.0,
source_lines_over_500=0,
sonarqube_report={},
supply_chain_report={},
supply_chain_required=False,
)
assert statuses["tests"] == "failed"
assert statuses["coverage"] == "not_applicable"
assert statuses["loc"] == "ok"
assert statuses["docs_naming"] == "ok"
assert statuses["gate_glue"] == "failed"
def test_build_check_statuses_preserves_explicit_loc_docs_and_glue_results() -> None:
"""Explicit canonical statuses should win over fallback inference."""
statuses = quality_helpers._build_check_statuses(
summary={
"results": [
{"name": "loc", "status": "ok"},
{"name": "docs_naming", "status": "ok"},
{"name": "gate_glue", "status": "ok"},
]
},
tests={"tests": 0, "failures": 0, "errors": 0, "skipped": 0},
workspace_line_coverage_percent=96.0,
source_lines_over_500=9,
sonarqube_report={},
supply_chain_report={},
supply_chain_required=False,
)
assert statuses["tests"] == "not_applicable"
assert statuses["coverage"] == "ok"
assert statuses["loc"] == "ok"
assert statuses["docs_naming"] == "ok"
assert statuses["gate_glue"] == "ok"

View File

@ -1,3 +1,5 @@
"""Unit tests for the repository testing contract helpers."""
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
@ -16,18 +18,6 @@ def test_bundled_contract_exposes_local_and_jenkins_profiles():
assert contract["pytest_suites"]["unit"]["paths"] assert contract["pytest_suites"]["unit"]["paths"]
def test_bundled_contract_keeps_monorepo_manifest_trees_out_of_hygiene_scope():
contract = load_contract()
required_doc_paths = {item["path"] for item in contract.get("required_docs", [])}
assert "AGENTS.md" not in required_doc_paths
globs = contract.get("hygiene", {}).get("line_limit_globs", [])
assert globs
for entry in globs:
assert entry.startswith(("testing/", "ci/", "scripts/tests/", "services/"))
assert "/scripts/" in entry or not entry.startswith("services/")
def test_docs_check_reports_missing_docstring_and_missing_path(tmp_path: Path): def test_docs_check_reports_missing_docstring_and_missing_path(tmp_path: Path):
module_path = tmp_path / "managed.py" module_path = tmp_path / "managed.py"
module_path.write_text("value = 1\n", encoding="utf-8") module_path.write_text("value = 1\n", encoding="utf-8")

View File

@ -0,0 +1,76 @@
"""Focused coverage-helper tests for titan-iac quality modules."""
from __future__ import annotations
import textwrap
from pathlib import Path
from testing.quality_coverage import compute_workspace_line_coverage, run_check
def test_compute_workspace_line_coverage_handles_missing_xml(tmp_path: Path) -> None:
"""Missing coverage XML should produce a zero workspace coverage score."""
contract = {"coverage": {"tracked_files": ["managed.py"]}}
assert compute_workspace_line_coverage(contract, tmp_path, tmp_path / "missing.xml") == 0.0
def test_compute_workspace_line_coverage_averages_present_tracked_files(tmp_path: Path) -> None:
"""Workspace coverage should average only tracked files that appear in the report."""
coverage_xml = tmp_path / "coverage.xml"
coverage_xml.write_text(
textwrap.dedent(
"""\
<coverage>
<packages>
<package>
<classes>
<class filename="alpha.py" line-rate="1.0" />
<class filename="beta.py" line-rate="0.5" />
</classes>
</package>
</packages>
</coverage>
"""
),
encoding="utf-8",
)
contract = {"coverage": {"tracked_files": ["alpha.py", "beta.py", "missing.py"]}}
assert compute_workspace_line_coverage(contract, tmp_path, coverage_xml) == 75.0
def test_run_check_keeps_relative_names_when_source_roots_do_not_match(tmp_path: Path) -> None:
"""Relative filenames should remain relative when no declared source root contains them."""
coverage_xml = tmp_path / "coverage.xml"
unmatched_root = tmp_path / "other-root"
unmatched_root.mkdir()
coverage_xml.write_text(
textwrap.dedent(
f"""\
<coverage>
<sources>
<source>{unmatched_root}</source>
</sources>
<packages>
<package>
<classes>
<class filename="relative.py" line-rate="0.80" />
</classes>
</package>
</packages>
</coverage>
"""
),
encoding="utf-8",
)
issues = run_check(
{"coverage": {"minimum_percent": 95.0, "tracked_files": ["relative.py"]}},
tmp_path,
coverage_xml,
)
assert issues == ["coverage below 95.0%: relative.py (80.0%)"]

View File

@ -1,3 +1,5 @@
"""Unit tests for the top-level quality-gate runner."""
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
@ -52,6 +54,8 @@ def test_run_profile_aggregates_internal_and_pytest_results(tmp_path: Path, monk
"unit", "unit",
"coverage", "coverage",
] ]
assert summary["workspace_line_coverage_percent"] == 0.0
assert summary["source_lines_over_500"] == 0
assert calls[0][0][:3] == [quality_gate.sys.executable, "-m", "ruff"] assert calls[0][0][:3] == [quality_gate.sys.executable, "-m", "ruff"]
assert any(result.get("junit") == "build/junit-unit.xml" for result in summary["results"]) assert any(result.get("junit") == "build/junit-unit.xml" for result in summary["results"])

View File

@ -0,0 +1,26 @@
"""Focused hygiene-helper tests for titan-iac quality modules."""
from __future__ import annotations
from pathlib import Path
from testing.quality_hygiene import count_files_over_line_limit
def test_count_files_over_line_limit_counts_only_long_matches(tmp_path: Path) -> None:
"""Only files beyond the configured limit should contribute to the LOC total."""
tests_dir = tmp_path / "tests"
tests_dir.mkdir()
(tests_dir / "test_short.py").write_text("line\n", encoding="utf-8")
(tests_dir / "test_long.py").write_text("line\n" * 4, encoding="utf-8")
(tests_dir / "notes.txt").write_text("line\n" * 8, encoding="utf-8")
contract = {
"hygiene": {
"max_lines": 3,
"line_limit_globs": ["tests/*.py"],
}
}
assert count_files_over_line_limit(contract, tmp_path) == 1