feat(cve): filter actionable CVEs via Debian Security Tracker API
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s

- Add filter_actionable_cves() that queries security-tracker.debian.org
- Cache API responses in /tmp/full-updater-cache/cve-api/
- Use ThreadPoolExecutor(max_workers=10) for parallel API calls
- cve_count now shows only actionable CVEs (with fixed_version)
- cve_total stored for info, shown as 'CVE: X (Y non corrigeables)'
This commit is contained in:
enzo 2026-05-13 02:27:22 +02:00
parent af9e061ab5
commit e22f416500
3 changed files with 95 additions and 6 deletions

View file

@ -128,6 +128,7 @@ class FullUpdaterApp(App):
name=name, name=name,
apt_count=data.get("apt_count", 0), apt_count=data.get("apt_count", 0),
cve_count=data.get("cve_count", 0), cve_count=data.get("cve_count", 0),
cve_total=data.get("cve_total", 0),
error=data.get("error", ""), error=data.get("error", ""),
skipped=not data and any(t.target_id == target_id and not t.is_host and not lxc_is_running(t.target_id) for t in self.targets), skipped=not data and any(t.target_id == target_id and not t.is_host and not lxc_is_running(t.target_id) for t in self.targets),
cache_time=get_cache_timestamp(cache_id) cache_time=get_cache_timestamp(cache_id)
@ -158,7 +159,7 @@ class FullUpdaterApp(App):
write_cache(target.target_id, { write_cache(target.target_id, {
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"apt_count": 0, "apt_packages": [], "apt_count": 0, "apt_packages": [],
"cve_count": 0, "cve_list": [], "cve_count": 0, "cve_total": 0, "cve_list": [],
"error": "LXC éteint" "error": "LXC éteint"
}) })
self._update_sidebar(target.target_id, result) self._update_sidebar(target.target_id, result)
@ -176,6 +177,7 @@ class FullUpdaterApp(App):
"apt_count": result.apt_count, "apt_count": result.apt_count,
"apt_packages": result.apt_packages, "apt_packages": result.apt_packages,
"cve_count": result.cve_count, "cve_count": result.cve_count,
"cve_total": result.cve_total,
"cve_list": result.cve_list, "cve_list": result.cve_list,
"error": result.error "error": result.error
}) })

View file

@ -1,10 +1,83 @@
import json
import os
import re import re
import subprocess import subprocess
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from typing import Callable from typing import Callable
from full_updater.backend.cache import write_cache from full_updater.backend.cache import write_cache, ensure_cache_dir
CVE_API_CACHE = "/tmp/full-updater-cache/cve-api"
def _ensure_cve_api_cache() -> None:
os.makedirs(CVE_API_CACHE, exist_ok=True)
def _fetch_cve_status(cve_id: str) -> dict:
"""Interroge l'API Debian Security Tracker pour une CVE, avec cache local."""
_ensure_cve_api_cache()
cache_path = os.path.join(CVE_API_CACHE, f"{cve_id}.json")
if os.path.exists(cache_path):
try:
with open(cache_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
pass
url = f"https://security-tracker.debian.org/tracker/{cve_id}/json"
try:
req = urllib.request.Request(url, headers={"User-Agent": "full-updater/1.0"})
with urllib.request.urlopen(req, timeout=15) as resp:
data = json.load(resp)
with open(cache_path, "w", encoding="utf-8") as f:
json.dump(data, f)
return data
except Exception:
return {}
def _is_cve_actionable(cve_id: str, suite: str = "bookworm") -> bool:
"""Retourne True si la CVE a un fixed_version pour le suite donné."""
data = _fetch_cve_status(cve_id)
cve_data = data.get(cve_id, {})
debian = cve_data.get("debian", {})
suite_data = debian.get(suite, {})
return suite_data.get("status") == "resolved" and "fixed_version" in suite_data
def filter_actionable_cves(cves: list[dict]) -> tuple[list[dict], int]:
"""Filtre la liste des CVE pour ne garder que les actionnables.
Retourne (cve_actionnables, cve_total)."""
if not cves:
return [], 0
total = len(cves)
def check(cve: dict) -> dict | None:
try:
if _is_cve_actionable(cve["id"]):
return cve
except Exception:
pass
return None
actionable = []
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(check, cve) for cve in cves]
for future in futures:
try:
result = future.result()
if result:
actionable.append(result)
except Exception:
pass
return actionable, total
@dataclass @dataclass
@ -21,6 +94,7 @@ class ScanResult:
cve_ok: bool = False cve_ok: bool = False
apt_count: int = 0 apt_count: int = 0
cve_count: int = 0 cve_count: int = 0
cve_total: int = 0
apt_packages: list[dict[str, str]] = field(default_factory=list) apt_packages: list[dict[str, str]] = field(default_factory=list)
cve_list: list[dict[str, str]] = field(default_factory=list) cve_list: list[dict[str, str]] = field(default_factory=list)
error: str = "" error: str = ""
@ -139,10 +213,19 @@ def scan_target(target: Target, progress_cb: Callable) -> ScanResult:
if debsecan_ok: if debsecan_ok:
cve_ok, cve_list, cve_err = scan_cve(target) cve_ok, cve_list, cve_err = scan_cve(target)
result.cve_ok = cve_ok result.cve_ok = cve_ok
result.cve_count = len(cve_list)
result.cve_list = cve_list
if not cve_ok: if not cve_ok:
result.error = cve_err result.error = cve_err
# Filtrer les CVE actionnables via l'API Debian
if cve_list:
actionable, total = filter_actionable_cves(cve_list)
result.cve_list = actionable
result.cve_count = len(actionable)
result.cve_total = total
else:
result.cve_list = []
result.cve_count = 0
result.cve_total = 0
else: else:
result.cve_ok = False result.cve_ok = False
result.error = f"debsecan: {debsecan_err}" result.error = f"debsecan: {debsecan_err}"
@ -157,6 +240,7 @@ def scan_target(target: Target, progress_cb: Callable) -> ScanResult:
"apt_count": result.apt_count, "apt_count": result.apt_count,
"apt_packages": result.apt_packages, "apt_packages": result.apt_packages,
"cve_count": result.cve_count, "cve_count": result.cve_count,
"cve_total": result.cve_total,
"cve_list": result.cve_list, "cve_list": result.cve_list,
"error": result.error "error": result.error
}) })

View file

@ -65,7 +65,7 @@ class SummaryPanel(Vertical):
yield Static("", id="summary-error", classes="summary-row summary-error") yield Static("", id="summary-error", classes="summary-row summary-error")
yield Button("📦 Mettre à jour", id="btn-upgrade", variant="primary") yield Button("📦 Mettre à jour", id="btn-upgrade", variant="primary")
def set_target(self, target_id: str, name: str, apt_count: int, cve_count: int, error: str, skipped: bool, cache_time: str): def set_target(self, target_id: str, name: str, apt_count: int, cve_count: int, cve_total: int, error: str, skipped: bool, cache_time: str):
self._current_target_id = target_id self._current_target_id = target_id
self._current_target_name = name self._current_target_name = name
@ -98,7 +98,10 @@ class SummaryPanel(Vertical):
return return
apt_btn.label = f"Mises à jour : {apt_count}" apt_btn.label = f"Mises à jour : {apt_count}"
cve_btn.label = f"CVE : {cve_count}" if cve_total > cve_count:
cve_btn.label = f"CVE : {cve_count} ({cve_total - cve_count} non corrigeables)"
else:
cve_btn.label = f"CVE : {cve_count}"
err_label.update("") err_label.update("")
btn.disabled = False btn.disabled = False
apt_btn.disabled = apt_count == 0 apt_btn.disabled = apt_count == 0