Compare commits

..

No commits in common. "master" and "v2026.05.12.07e84ae" have entirely different histories.

6 changed files with 69 additions and 245 deletions

View file

@ -6,7 +6,7 @@ from textual.containers import Horizontal, Vertical
from textual.reactive import reactive from textual.reactive import reactive
from textual import work from textual import work
from full_updater.backend.cache import clear_cache, ensure_cache_dir, read_cache, get_cache_timestamp from full_updater.backend.cache import ensure_cache_dir, read_cache, get_cache_timestamp
from full_updater.backend.scanner import ( from full_updater.backend.scanner import (
Target, ScanResult, get_lxc_list, lxc_is_running, Target, ScanResult, get_lxc_list, lxc_is_running,
ensure_debsecan_installed, scan_apt, scan_cve, write_cache, scan_target ensure_debsecan_installed, scan_apt, scan_cve, write_cache, scan_target
@ -46,7 +46,6 @@ class FullUpdaterApp(App):
yield SummaryPanel() yield SummaryPanel()
def on_mount(self): def on_mount(self):
clear_cache()
ensure_cache_dir() ensure_cache_dir()
self.targets = [Target(target_id="host", name="hote", is_host=True)] + get_lxc_list() self.targets = [Target(target_id="host", name="hote", is_host=True)] + get_lxc_list()
sidebar = self.query_one(Sidebar) sidebar = self.query_one(Sidebar)
@ -94,7 +93,6 @@ class FullUpdaterApp(App):
result.error, result.error,
result.status == "skipped" result.status == "skipped"
) )
sidebar.refresh()
def _finish_scan(self): def _finish_scan(self):
self.pop_screen() self.pop_screen()
@ -111,7 +109,7 @@ class FullUpdaterApp(App):
) )
sidebar.refresh() sidebar.refresh()
if self.targets: if self.targets:
self.set_timer(0.2, lambda: self._select_target(self.targets[0].target_id)) self._select_target(self.targets[0].target_id)
def _select_target(self, target_id: str): def _select_target(self, target_id: str):
self.selected_target = target_id self.selected_target = target_id
@ -128,8 +126,6 @@ class FullUpdaterApp(App):
name=name, name=name,
apt_count=data.get("apt_count", 0), apt_count=data.get("apt_count", 0),
cve_count=data.get("cve_count", 0), cve_count=data.get("cve_count", 0),
cve_total=data.get("cve_total", 0),
os_info=data.get("os_info", ""),
error=data.get("error", ""), error=data.get("error", ""),
skipped=not data and any(t.target_id == target_id and not t.is_host and not lxc_is_running(t.target_id) for t in self.targets), skipped=not data and any(t.target_id == target_id and not t.is_host and not lxc_is_running(t.target_id) for t in self.targets),
cache_time=get_cache_timestamp(cache_id) cache_time=get_cache_timestamp(cache_id)
@ -160,8 +156,7 @@ class FullUpdaterApp(App):
write_cache(target.target_id, { write_cache(target.target_id, {
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"apt_count": 0, "apt_packages": [], "apt_count": 0, "apt_packages": [],
"cve_count": 0, "cve_total": 0, "cve_list": [], "cve_count": 0, "cve_list": [],
"os_info": "LXC éteint",
"error": "LXC éteint" "error": "LXC éteint"
}) })
self._update_sidebar(target.target_id, result) self._update_sidebar(target.target_id, result)
@ -179,8 +174,6 @@ class FullUpdaterApp(App):
"apt_count": result.apt_count, "apt_count": result.apt_count,
"apt_packages": result.apt_packages, "apt_packages": result.apt_packages,
"cve_count": result.cve_count, "cve_count": result.cve_count,
"cve_total": result.cve_total,
"os_info": result.os_info,
"cve_list": result.cve_list, "cve_list": result.cve_list,
"error": result.error "error": result.error
}) })
@ -213,7 +206,7 @@ class FullUpdaterApp(App):
return return
executor = UpgradeExecutor( executor = UpgradeExecutor(
target_id, is_host, target_id, is_host,
on_line=lambda line: log_panel.write(line) on_line=lambda line: self.call_from_thread(log_panel.write, line)
) )
ok = await executor.run() ok = await executor.run()
if ok: if ok:

View file

@ -7,12 +7,9 @@ from typing import Any
CACHE_DIR = "/tmp/full-updater-cache" CACHE_DIR = "/tmp/full-updater-cache"
def clear_cache() -> None: def ensure_cache_dir() -> None:
if os.path.exists(CACHE_DIR): if os.path.exists(CACHE_DIR):
shutil.rmtree(CACHE_DIR) shutil.rmtree(CACHE_DIR)
def ensure_cache_dir() -> None:
os.makedirs(CACHE_DIR, exist_ok=True) os.makedirs(CACHE_DIR, exist_ok=True)

View file

@ -1,115 +1,10 @@
import json
import os
import re import re
import subprocess import subprocess
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from typing import Callable from typing import Callable
from full_updater.backend.cache import write_cache, ensure_cache_dir from full_updater.backend.cache import write_cache
CVE_API_CACHE = "/tmp/full-updater-cache/cve-api"
def _ensure_cve_api_cache() -> None:
os.makedirs(CVE_API_CACHE, exist_ok=True)
def _fetch_cve_html(cve_id: str) -> str:
"""Récupère le HTML de la page Debian Security Tracker pour une CVE."""
_ensure_cve_api_cache()
cache_path = os.path.join(CVE_API_CACHE, f"{cve_id}.html")
if os.path.exists(cache_path):
try:
with open(cache_path, "r", encoding="utf-8") as f:
return f.read()
except Exception:
pass
url = f"https://security-tracker.debian.org/tracker/{cve_id}"
try:
req = urllib.request.Request(url, headers={"User-Agent": "full-updater/1.0"})
with urllib.request.urlopen(req, timeout=15) as resp:
html = resp.read().decode("utf-8")
with open(cache_path, "w", encoding="utf-8") as f:
f.write(html)
return html
except Exception:
return ""
def _enrich_cve(cve_id: str) -> dict[str, str]:
"""Récupère la sévérité et le vecteur d'attaque depuis le HTML Debian Security Tracker."""
html = _fetch_cve_html(cve_id)
if not html:
return {"severity": "?", "vector": "?"}
result = {"severity": "?", "vector": "?"}
# Extraction de la sévérité depuis le bloc "Vulnerable and fixed packages"
# Recherche du motif: <td>remote</td> ou <td>local</td>
vector_match = re.search(r'<td[^>]*>\s*(remote|local)\s*</td>', html, re.IGNORECASE)
if vector_match:
result["vector"] = vector_match.group(1).lower()
# Extraction de la sévérité depuis les notes ou le tableau
# Format courant: [low], [medium], [high], [critical] dans les notes
severity_match = re.search(r'\b(unimportant|low|medium|high|critical)\b', html, re.IGNORECASE)
if severity_match:
result["severity"] = severity_match.group(1).lower()
return result
def _is_cve_actionable(cve_id: str, suite: str = "bookworm") -> bool:
"""Retourne True si la CVE est marquée 'fixed' pour le suite donné dans le HTML."""
html = _fetch_cve_html(cve_id)
if not html:
return False
# Chercher dans le tableau des packages vulnérables/fixed
# Pattern: suite (security)? version fixed
# Ex: bookworm 3.0.18-1~deb12u1 fixed
pattern = re.compile(rf"<td[^>]*>\s*{re.escape(suite)}\s*(?:\(security\))?\s*</td>\s*<td[^>]*>[^<]+</td>\s*<td[^>]*>\s*fixed\s*</td>", re.IGNORECASE)
return bool(pattern.search(html))
def enrich_cves(cves: list[dict]) -> tuple[list[dict], int]:
"""Enrichit les CVEs avec fixable, severity, vector via l'API Debian.
Retourne (cve_enrichies, nombre_actionnables)."""
if not cves:
return [], 0
def check(cve: dict) -> dict:
try:
# Vérifie si corrigeable
cve["fixable"] = _is_cve_actionable(cve["id"])
# Enrichit avec severity et vector
enriched = _enrich_cve(cve["id"])
cve["severity"] = enriched["severity"]
cve["vector"] = enriched["vector"]
return cve
except Exception:
cve["fixable"] = False
return cve
all_cves = []
actionable_count = 0
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(check, cve) for cve in cves]
for future in futures:
try:
cve = future.result()
all_cves.append(cve)
if cve.get("fixable"):
actionable_count += 1
except Exception:
pass
return all_cves, actionable_count
@dataclass @dataclass
@ -126,8 +21,6 @@ class ScanResult:
cve_ok: bool = False cve_ok: bool = False
apt_count: int = 0 apt_count: int = 0
cve_count: int = 0 cve_count: int = 0
cve_total: int = 0
os_info: str = ""
apt_packages: list[dict[str, str]] = field(default_factory=list) apt_packages: list[dict[str, str]] = field(default_factory=list)
cve_list: list[dict[str, str]] = field(default_factory=list) cve_list: list[dict[str, str]] = field(default_factory=list)
error: str = "" error: str = ""
@ -163,41 +56,6 @@ def lxc_is_running(vmid: str) -> bool:
return ok and "running" in stdout.lower() return ok and "running" in stdout.lower()
def scan_os(target: Target) -> str:
"""Récupère le nom, la version complète et le codename de l'OS."""
prefix = [] if target.is_host else ["pct", "exec", target.target_id, "--"]
# 1. Lire /etc/os-release et parser ligne par ligne
ok, osrel_out, _ = run_cmd(prefix + ["cat", "/etc/os-release"], timeout=30)
os_data = {}
if ok:
for line in osrel_out.splitlines():
line = line.strip()
if "=" in line:
key, val = line.split("=", 1)
val = val.strip().strip('"').strip("'")
os_data[key] = val
# 2. Extraire les champs
name = os_data.get("NAME", "")
codename = os_data.get("VERSION_CODENAME", "")
debian_version_full = os_data.get("DEBIAN_VERSION_FULL", "")
version_id = os_data.get("VERSION_ID", "")
# 3. Construire le libellé final
# Avec DEBIAN_VERSION_FULL : NAME DEBIAN_VERSION_FULL (VERSION_CODENAME)
# Sans : NAME VERSION_ID (VERSION_CODENAME)
version = debian_version_full if debian_version_full else version_id
if name and version and codename:
return f"{name} {version} ({codename})"
if name and version:
return f"{name} {version}"
# Fallback : PRETTY_NAME ou "OS inconnu"
return os_data.get("PRETTY_NAME", "OS inconnu")
def ensure_debsecan_installed(is_host: bool, vmid: str = "") -> tuple[bool, str]: def ensure_debsecan_installed(is_host: bool, vmid: str = "") -> tuple[bool, str]:
if is_host: if is_host:
ok, _, _ = run_cmd(["which", "debsecan"]) ok, _, _ = run_cmd(["which", "debsecan"])
@ -246,8 +104,7 @@ def scan_apt(target: Target) -> tuple[bool, list[dict[str, str]], str]:
def scan_cve(target: Target) -> tuple[bool, list[dict[str, str]], str]: def scan_cve(target: Target) -> tuple[bool, list[dict[str, str]], str]:
# Format par défaut (pas de --format) retourne: CVE-ID package cmd = ["debsecan", "--format", "report", "--suite", "bookworm"] if target.is_host else ["pct", "exec", target.target_id, "--", "debsecan", "--format", "report", "--suite", "bookworm"]
cmd = ["debsecan", "--suite", "bookworm"] if target.is_host else ["pct", "exec", target.target_id, "--", "debsecan", "--suite", "bookworm"]
ok, stdout, stderr = run_cmd(cmd, timeout=120) ok, stdout, stderr = run_cmd(cmd, timeout=120)
if not ok: if not ok:
return False, [], stderr or stdout return False, [], stderr or stdout
@ -256,13 +113,7 @@ def scan_cve(target: Target) -> tuple[bool, list[dict[str, str]], str]:
for line in stdout.splitlines(): for line in stdout.splitlines():
m = re.match(r"(CVE-\d{4}-\d+)\s+(\S+)", line) m = re.match(r"(CVE-\d{4}-\d+)\s+(\S+)", line)
if m: if m:
cves.append({ cves.append({"id": m.group(1), "package": m.group(2), "url": f"https://security-tracker.debian.org/tracker/{m.group(1)}"})
"id": m.group(1),
"package": m.group(2),
"vector": "?",
"severity": "?",
"url": f"https://security-tracker.debian.org/tracker/{m.group(1)}"
})
return True, cves, "" return True, cves, ""
@ -288,28 +139,15 @@ def scan_target(target: Target, progress_cb: Callable) -> ScanResult:
if debsecan_ok: if debsecan_ok:
cve_ok, cve_list, cve_err = scan_cve(target) cve_ok, cve_list, cve_err = scan_cve(target)
result.cve_ok = cve_ok result.cve_ok = cve_ok
result.cve_count = len(cve_list)
result.cve_list = cve_list
if not cve_ok: if not cve_ok:
result.error = cve_err result.error = cve_err
# Enrichir les CVE via l'API Debian (severity, vector, fixable)
if cve_list:
all_cves, actionable_count = enrich_cves(cve_list)
result.cve_list = all_cves
result.cve_count = actionable_count
result.cve_total = len(all_cves)
else:
result.cve_list = []
result.cve_count = 0
result.cve_total = 0
else: else:
result.cve_ok = False result.cve_ok = False
result.error = f"debsecan: {debsecan_err}" result.error = f"debsecan: {debsecan_err}"
progress_cb() progress_cb()
# Scan OS
result.os_info = scan_os(target)
progress_cb()
result.status = "done" if (result.apt_ok and result.cve_ok) else "error" result.status = "done" if (result.apt_ok and result.cve_ok) else "error"
if result.error: if result.error:
result.status = "error" result.status = "error"
@ -319,8 +157,6 @@ def scan_target(target: Target, progress_cb: Callable) -> ScanResult:
"apt_count": result.apt_count, "apt_count": result.apt_count,
"apt_packages": result.apt_packages, "apt_packages": result.apt_packages,
"cve_count": result.cve_count, "cve_count": result.cve_count,
"cve_total": result.cve_total,
"os_info": result.os_info,
"cve_list": result.cve_list, "cve_list": result.cve_list,
"error": result.error "error": result.error
}) })

View file

@ -9,21 +9,6 @@ except Exception:
PYPERCLIP_OK = False PYPERCLIP_OK = False
SCREEN_CSS = """
align: left top;
padding: 1 2;
#toolbar {
height: auto;
dock: top;
margin-bottom: 1;
}
DataTable {
height: 1fr;
border: solid $primary;
}
"""
class PackageListScreen(Screen): class PackageListScreen(Screen):
BINDINGS = [("b", "back", "Retour")] BINDINGS = [("b", "back", "Retour")]
DEFAULT_CSS = """ DEFAULT_CSS = """
@ -90,17 +75,14 @@ class CVEListScreen(Screen):
with Horizontal(id="toolbar"): with Horizontal(id="toolbar"):
yield Button("⬅ Retour", id="cve-back", variant="default") yield Button("⬅ Retour", id="cve-back", variant="default")
table = DataTable(id="cve-table") table = DataTable(id="cve-table")
table.add_columns("CVE-ID", "Paquet", "Severite", "Vecteur", "Corrigeable", "Lien") table.add_columns("CVE-ID", "Paquet", "Lien")
table.cursor_type = "row" table.cursor_type = "row"
for i, cve in enumerate(self.cves): for i, cve in enumerate(self.cves):
cve_id = cve.get("id", "?") cve_id = cve.get("id", "?")
pkg = cve.get("package", "?") pkg = cve.get("package", "?")
url = cve.get("url", "") url = cve.get("url", "")
severity = cve.get("severity", "?")
vector = cve.get("vector", "?")
fixable = "🟢 Oui" if cve.get("fixable") else "🔴 Non"
self.urls[i] = url self.urls[i] = url
table.add_row(cve_id, pkg, severity, vector, fixable, url) table.add_row(cve_id, pkg, url)
yield table yield table
def on_data_table_row_selected(self, event: DataTable.RowSelected): def on_data_table_row_selected(self, event: DataTable.RowSelected):

View file

@ -23,21 +23,21 @@ class LogPanel(Vertical):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self._log_widget = None self.log = None
def compose(self): def compose(self):
with Horizontal(id="log-toolbar"): with Horizontal(id="log-toolbar"):
yield Button("⬅ Retour", id="log-back") yield Button("⬅ Retour", id="log-back")
self._log_widget = RichLog(id="log-view", highlight=True) self.log = RichLog(id="log-view", highlight=True)
yield self._log_widget yield self.log
def write(self, line: str): def write(self, line: str):
if self._log_widget: if self.log:
self._log_widget.write(line) self.log.write(line)
def clear(self): def clear(self):
if self._log_widget: if self.log:
self._log_widget.clear() self.log.clear()
def on_button_pressed(self, event: Button.Pressed): def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "log-back": if event.button.id == "log-back":

View file

@ -1,5 +1,6 @@
from textual.widgets import Static, Button from textual.widgets import Static, Button
from textual.containers import Vertical, Horizontal from textual.containers import Vertical, Horizontal
from textual.reactive import reactive
from textual.message import Message from textual.message import Message
@ -25,6 +26,10 @@ class SummaryPanel(Vertical):
height: auto; height: auto;
margin: 1 0; margin: 1 0;
} }
.summary-count {
color: $primary;
text-style: bold;
}
.summary-error { .summary-error {
color: $error; color: $error;
text-style: bold; text-style: bold;
@ -50,8 +55,13 @@ class SummaryPanel(Vertical):
self.target_id = target_id self.target_id = target_id
super().__init__() super().__init__()
_current_target_id: str = "" cache_time = reactive("")
_current_target_name: str = "" apt_count = reactive(0)
cve_count = reactive(0)
target_id = reactive("")
target_name = reactive("")
error_msg = reactive("")
is_skipped = reactive(False)
def compose(self): def compose(self):
with Horizontal(id="summary-header"): with Horizontal(id="summary-header"):
@ -60,29 +70,36 @@ class SummaryPanel(Vertical):
with Vertical(id="summary-body"): with Vertical(id="summary-body"):
yield Static("Sélectionnez une cible", id="summary-title") yield Static("Sélectionnez une cible", id="summary-title")
yield Static("", id="summary-os", classes="summary-row")
yield Button("Mises à jour : -", id="btn-apt", classes="summary-row", variant="default") yield Button("Mises à jour : -", id="btn-apt", classes="summary-row", variant="default")
yield Button("CVE : -", id="btn-cve", classes="summary-row", variant="default") yield Button("CVE : -", id="btn-cve", classes="summary-row", variant="default")
yield Static("", id="summary-error", classes="summary-row summary-error") yield Static("", id="summary-error", classes="summary-row summary-error")
yield Button("📦 Mettre à jour", id="btn-upgrade", variant="primary") yield Button("📦 Mettre à jour", id="btn-upgrade", variant="primary")
def set_target(self, target_id: str, name: str, apt_count: int, cve_count: int, cve_total: int, os_info: str, error: str, skipped: bool, cache_time: str): def watch_cache_time(self, value: str):
self._current_target_id = target_id self.query_one("#summary-cache", Static).update(f"Cache : {value}" if value else "")
self._current_target_name = name
title = self.query_one("#summary-title", Static) def watch_apt_count(self, value: int):
os_label = self.query_one("#summary-os", Static) self._update_display()
def watch_cve_count(self, value: int):
self._update_display()
def watch_error_msg(self, value: str):
self._update_display()
def watch_is_skipped(self, value: bool):
self._update_display()
def watch_target_name(self, value: str):
self.query_one("#summary-title", Static).update(value if value else "Sélectionnez une cible")
def _update_display(self):
apt_btn = self.query_one("#btn-apt", Button) apt_btn = self.query_one("#btn-apt", Button)
cve_btn = self.query_one("#btn-cve", Button) cve_btn = self.query_one("#btn-cve", Button)
err_label = self.query_one("#summary-error", Static) err_label = self.query_one("#summary-error", Static)
btn = self.query_one("#btn-upgrade", Button) btn = self.query_one("#btn-upgrade", Button)
cache_label = self.query_one("#summary-cache", Static)
title.update(name if name else "Sélectionnez une cible") if self.is_skipped:
os_label.update(os_info if os_info else "")
cache_label.update(f"Cache : {cache_time}" if cache_time else "")
if skipped:
apt_btn.label = "Mises à jour : LXC éteint" apt_btn.label = "Mises à jour : LXC éteint"
cve_btn.label = "CVE : LXC éteint" cve_btn.label = "CVE : LXC éteint"
err_label.update("") err_label.update("")
@ -91,38 +108,37 @@ class SummaryPanel(Vertical):
cve_btn.disabled = True cve_btn.disabled = True
return return
if error: if self.error_msg:
apt_btn.label = f"Mises à jour : {apt_count}" apt_btn.label = f"Mises à jour : {self.apt_count}"
cve_btn.label = "CVE : ERREUR" cve_btn.label = "CVE : ERREUR"
err_label.update(error) err_label.update(self.error_msg)
btn.disabled = False btn.disabled = False
apt_btn.disabled = apt_count == 0 apt_btn.disabled = self.apt_count == 0
cve_btn.disabled = True cve_btn.disabled = True
return return
apt_btn.label = f"Mises à jour : {apt_count}" apt_btn.label = f"Mises à jour : {self.apt_count}"
if cve_total > cve_count: cve_btn.label = f"CVE : {self.cve_count}"
cve_btn.label = f"CVE : {cve_count} ({cve_total - cve_count} non corrigeables)"
else:
cve_btn.label = f"CVE : {cve_count}"
err_label.update("") err_label.update("")
btn.disabled = False btn.disabled = False
apt_btn.disabled = apt_count == 0 apt_btn.disabled = self.apt_count == 0
cve_btn.disabled = cve_total == 0 cve_btn.disabled = self.cve_count == 0
# Forcer le refresh de tous les widgets modifiés def set_target(self, target_id: str, name: str, apt_count: int, cve_count: int, error: str, skipped: bool, cache_time: str):
apt_btn.refresh() self.target_id = target_id
cve_btn.refresh() self.target_name = name
err_label.refresh() self.apt_count = apt_count
btn.refresh() self.cve_count = cve_count
cache_label.refresh() self.error_msg = error
self.is_skipped = skipped
self.cache_time = cache_time
def on_button_pressed(self, event: Button.Pressed): def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "btn-reload": if event.button.id == "btn-reload":
self.post_message(self.ReloadPressed()) self.post_message(self.ReloadPressed())
elif event.button.id == "btn-upgrade": elif event.button.id == "btn-upgrade":
self.post_message(self.UpgradePressed(self._current_target_id, self._current_target_name)) self.post_message(self.UpgradePressed(self.target_id, self.target_name))
elif event.button.id == "btn-apt": elif event.button.id == "btn-apt":
self.post_message(self.AptClicked(self._current_target_id)) self.post_message(self.AptClicked(self.target_id))
elif event.button.id == "btn-cve": elif event.button.id == "btn-cve":
self.post_message(self.CveClicked(self._current_target_id)) self.post_message(self.CveClicked(self.target_id))