Compare commits

..

No commits in common. "master" and "v2026.05.12.5a38bb4" have entirely different histories.

13 changed files with 399 additions and 431 deletions

2
debian/control vendored
View file

@ -7,7 +7,7 @@ Standards-Version: 4.6.0
Package: full-updater
Architecture: all
Depends: python3, python3-venv
Depends: python3, python3-venv, git, pct
Description: TUI for APT/CVE updates on Proxmox host and LXC
Full Updater is a terminal user interface (TUI) that visualizes
available APT updates and CVEs for the Proxmox host and all LXC

View file

@ -4,18 +4,20 @@ from datetime import datetime
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical
from textual.reactive import reactive
from textual.worker import get_current_worker
from textual import work
from full_updater.backend.cache import clear_cache, ensure_cache_dir, read_cache, get_cache_timestamp
from full_updater.backend.cache import ensure_cache_dir, read_cache, get_cache_timestamp
from full_updater.backend.scanner import (
Target, ScanResult, get_lxc_list, lxc_is_running,
ensure_debsecan_installed, scan_apt, scan_cve, write_cache, scan_target
ensure_debsecan_installed, scan_apt, scan_cve, write_cache
)
from full_updater.backend.executor import UpgradeExecutor
from full_updater.ui.loader import LoaderScreen
from full_updater.ui.sidebar import Sidebar
from full_updater.ui.summary import SummaryPanel
from full_updater.ui.detail_screens import PackageListScreen, CVEListScreen
from full_updater.ui.package_table import PackageTable
from full_updater.ui.cve_table import CVETable
from full_updater.ui.log_panel import LogPanel
from full_updater.ui.confirm_modal import ConfirmModal
@ -24,7 +26,7 @@ class FullUpdaterApp(App):
CSS = """
Screen { align: center middle; }
#main-layout { layout: horizontal; height: 100%; }
Sidebar { width: 42; }
Sidebar { width: 30; }
#content { width: 1fr; }
"""
@ -46,7 +48,6 @@ class FullUpdaterApp(App):
yield SummaryPanel()
def on_mount(self):
clear_cache()
ensure_cache_dir()
self.targets = [Target(target_id="host", name="hote", is_host=True)] + get_lxc_list()
sidebar = self.query_one(Sidebar)
@ -55,8 +56,8 @@ class FullUpdaterApp(App):
self.push_screen(self.loader_screen)
self.run_scan_all()
@work
async def run_scan_all(self):
@work(thread=True)
def run_scan_all(self):
total = len(self.targets)
completed = 0
@ -64,16 +65,64 @@ class FullUpdaterApp(App):
nonlocal completed
completed += 1
pct = min(100.0, (completed / (total * 4)) * 100)
self._update_loader(pct)
self.call_from_thread(self._update_loader, pct)
for idx, target in enumerate(self.targets):
self._update_loader_status(idx, "running")
result = await asyncio.to_thread(scan_target, target, progress_cb)
self.results[target.target_id] = result
self._update_loader_status(idx, result.status)
self._update_sidebar(target.target_id, result)
self.call_from_thread(self._update_loader_status, idx, "running")
self._finish_scan()
if not target.is_host and not lxc_is_running(target.target_id):
self.call_from_thread(self._update_loader_status, idx, "skipped")
result = ScanResult(target=target)
result.status = "skipped"
self.results[target.target_id] = result
for _ in range(4):
progress_cb()
continue
debsecan_ok, debsecan_err = ensure_debsecan_installed(target.is_host, target.target_id)
progress_cb()
apt_ok, apt_packages, apt_err = scan_apt(target)
progress_cb()
cve_ok = False
cve_list = []
cve_err = ""
if debsecan_ok:
cve_ok, cve_list, cve_err = scan_cve(target)
else:
cve_err = f"debsecan: {debsecan_err}"
progress_cb()
result = ScanResult(
target=target,
apt_ok=apt_ok,
cve_ok=cve_ok,
apt_count=len(apt_packages),
cve_count=len(cve_list),
apt_packages=apt_packages,
cve_list=cve_list,
error=apt_err or cve_err,
status="done" if (apt_ok and (cve_ok or not debsecan_ok)) else "error"
)
if result.error:
result.status = "error"
self.results[target.target_id] = result
cache_id = "host" if target.is_host else target.target_id
write_cache(cache_id, {
"timestamp": datetime.now().isoformat(),
"apt_count": result.apt_count,
"apt_packages": result.apt_packages,
"cve_count": result.cve_count,
"cve_list": result.cve_list,
"error": result.error
})
self.call_from_thread(self._update_loader_status, idx, result.status)
self.call_from_thread(self._update_sidebar, target.target_id, result)
self.call_from_thread(self._finish_scan)
def _update_loader(self, pct: float):
if self.loader_screen:
@ -94,7 +143,6 @@ class FullUpdaterApp(App):
result.error,
result.status == "skipped"
)
sidebar.refresh()
def _finish_scan(self):
self.pop_screen()
@ -109,9 +157,8 @@ class FullUpdaterApp(App):
res.error,
res.status == "skipped"
)
sidebar.refresh()
if self.targets:
self.set_timer(0.2, lambda: self._select_target(self.targets[0].target_id))
self._select_target(self.targets[0].target_id)
def _select_target(self, target_id: str):
self.selected_target = target_id
@ -128,13 +175,10 @@ class FullUpdaterApp(App):
name=name,
apt_count=data.get("apt_count", 0),
cve_count=data.get("cve_count", 0),
cve_total=data.get("cve_total", 0),
os_info=data.get("os_info", ""),
error=data.get("error", ""),
skipped=not data and any(t.target_id == target_id and not t.is_host and not lxc_is_running(t.target_id) for t in self.targets),
cache_time=get_cache_timestamp(cache_id)
)
summary.refresh()
def on_sidebar_target_selected(self, event: Sidebar.TargetSelected):
self._select_target(event.target_id)
@ -143,8 +187,8 @@ class FullUpdaterApp(App):
if self.selected_target:
self._reload_target(self.selected_target)
@work
async def _reload_target(self, target_id: str):
@work(thread=True)
def _reload_target(self, target_id: str):
target = None
for t in self.targets:
if t.target_id == target_id:
@ -159,19 +203,32 @@ class FullUpdaterApp(App):
self.results[target.target_id] = result
write_cache(target.target_id, {
"timestamp": datetime.now().isoformat(),
"apt_count": 0, "apt_packages": [],
"cve_count": 0, "cve_total": 0, "cve_list": [],
"os_info": "LXC éteint",
"apt_count": 0,
"apt_packages": [],
"cve_count": 0,
"cve_list": [],
"error": "LXC éteint"
})
self._update_sidebar(target.target_id, result)
self._select_target(target.target_id)
self.call_from_thread(self._update_sidebar, target.target_id, result)
self.call_from_thread(self._select_target, target.target_id)
return
def dummy_progress():
pass
ensure_debsecan_installed(target.is_host, target.target_id)
apt_ok, apt_packages, apt_err = scan_apt(target)
cve_ok, cve_list, cve_err = scan_cve(target)
error = apt_err or cve_err
result = await asyncio.to_thread(scan_target, target, dummy_progress)
result = ScanResult(
target=target,
apt_ok=apt_ok,
cve_ok=cve_ok,
apt_count=len(apt_packages),
cve_count=len(cve_list),
apt_packages=apt_packages,
cve_list=cve_list,
error=error,
status="done" if (apt_ok and cve_ok) else "error"
)
self.results[target.target_id] = result
cache_id = "host" if target.is_host else target.target_id
write_cache(cache_id, {
@ -179,13 +236,11 @@ class FullUpdaterApp(App):
"apt_count": result.apt_count,
"apt_packages": result.apt_packages,
"cve_count": result.cve_count,
"cve_total": result.cve_total,
"os_info": result.os_info,
"cve_list": result.cve_list,
"error": result.error
})
self._update_sidebar(target.target_id, result)
self._select_target(target.target_id)
self.call_from_thread(self._update_sidebar, target.target_id, result)
self.call_from_thread(self._select_target, target.target_id)
def on_summary_panel_upgrade_pressed(self, event: SummaryPanel.UpgradePressed):
self.push_screen(
@ -213,7 +268,7 @@ class FullUpdaterApp(App):
return
executor = UpgradeExecutor(
target_id, is_host,
on_line=lambda line: log_panel.write(line)
on_line=lambda line: self.call_from_thread(log_panel.write, line)
)
ok = await executor.run()
if ok:
@ -234,7 +289,10 @@ class FullUpdaterApp(App):
pkgs = data.get("apt_packages", [])
if not pkgs:
return
self.push_screen(PackageListScreen(pkgs))
table = PackageTable()
self.mount(table)
self.query_one("#main-layout").display = False
table.load_data(pkgs)
def on_summary_panel_cve_clicked(self, event: SummaryPanel.CveClicked):
cache_id = "host" if event.target_id == "host" else event.target_id
@ -242,4 +300,17 @@ class FullUpdaterApp(App):
cves = data.get("cve_list", [])
if not cves:
return
self.push_screen(CVEListScreen(cves))
table = CVETable()
self.mount(table)
self.query_one("#main-layout").display = False
table.load_data(cves)
def on_package_table_back_pressed(self, event: PackageTable.BackPressed):
self.query_one("#main-layout").display = True
for widget in self.query(PackageTable):
widget.remove()
def on_cve_table_back_pressed(self, event: CVETable.BackPressed):
self.query_one("#main-layout").display = True
for widget in self.query(CVETable):
widget.remove()

View file

@ -7,12 +7,9 @@ from typing import Any
CACHE_DIR = "/tmp/full-updater-cache"
def clear_cache() -> None:
def ensure_cache_dir() -> None:
if os.path.exists(CACHE_DIR):
shutil.rmtree(CACHE_DIR)
def ensure_cache_dir() -> None:
os.makedirs(CACHE_DIR, exist_ok=True)

View file

@ -1,121 +1,18 @@
import asyncio
import json
import os
import re
import shutil
import subprocess
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable
from typing import Any
from full_updater.backend.cache import write_cache, ensure_cache_dir
CVE_API_CACHE = "/tmp/full-updater-cache/cve-api"
def _ensure_cve_api_cache() -> None:
os.makedirs(CVE_API_CACHE, exist_ok=True)
def _fetch_cve_html(cve_id: str) -> str:
"""Récupère le HTML de la page Debian Security Tracker pour une CVE."""
_ensure_cve_api_cache()
cache_path = os.path.join(CVE_API_CACHE, f"{cve_id}.html")
if os.path.exists(cache_path):
try:
with open(cache_path, "r", encoding="utf-8") as f:
return f.read()
except Exception:
pass
url = f"https://security-tracker.debian.org/tracker/{cve_id}"
try:
req = urllib.request.Request(url, headers={"User-Agent": "full-updater/1.0"})
with urllib.request.urlopen(req, timeout=15) as resp:
html = resp.read().decode("utf-8")
with open(cache_path, "w", encoding="utf-8") as f:
f.write(html)
return html
except Exception:
return ""
def _enrich_cve(cve_id: str) -> dict[str, str]:
"""Récupère la sévérité et le vecteur d'attaque depuis le HTML Debian Security Tracker."""
html = _fetch_cve_html(cve_id)
if not html:
return {"severity": "?", "vector": "?"}
result = {"severity": "?", "vector": "?"}
# Extraction de la sévérité depuis le bloc "Vulnerable and fixed packages"
# Recherche du motif: <td>remote</td> ou <td>local</td>
vector_match = re.search(r'<td[^>]*>\s*(remote|local)\s*</td>', html, re.IGNORECASE)
if vector_match:
result["vector"] = vector_match.group(1).lower()
# Extraction de la sévérité depuis les notes ou le tableau
# Format courant: [low], [medium], [high], [critical] dans les notes
severity_match = re.search(r'\b(unimportant|low|medium|high|critical)\b', html, re.IGNORECASE)
if severity_match:
result["severity"] = severity_match.group(1).lower()
return result
def _is_cve_actionable(cve_id: str, suite: str = "bookworm") -> bool:
"""Retourne True si la CVE est marquée 'fixed' pour le suite donné dans le HTML."""
html = _fetch_cve_html(cve_id)
if not html:
return False
# Chercher dans le tableau des packages vulnérables/fixed
# Pattern: suite (security)? version fixed
# Ex: bookworm 3.0.18-1~deb12u1 fixed
pattern = re.compile(rf"<td[^>]*>\s*{re.escape(suite)}\s*(?:\(security\))?\s*</td>\s*<td[^>]*>[^<]+</td>\s*<td[^>]*>\s*fixed\s*</td>", re.IGNORECASE)
return bool(pattern.search(html))
def enrich_cves(cves: list[dict]) -> tuple[list[dict], int]:
"""Enrichit les CVEs avec fixable, severity, vector via l'API Debian.
Retourne (cve_enrichies, nombre_actionnables)."""
if not cves:
return [], 0
def check(cve: dict) -> dict:
try:
# Vérifie si corrigeable
cve["fixable"] = _is_cve_actionable(cve["id"])
# Enrichit avec severity et vector
enriched = _enrich_cve(cve["id"])
cve["severity"] = enriched["severity"]
cve["vector"] = enriched["vector"]
return cve
except Exception:
cve["fixable"] = False
return cve
all_cves = []
actionable_count = 0
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(check, cve) for cve in cves]
for future in futures:
try:
cve = future.result()
all_cves.append(cve)
if cve.get("fixable"):
actionable_count += 1
except Exception:
pass
return all_cves, actionable_count
from full_updater.backend.cache import write_cache
@dataclass
class Target:
target_id: str
name: str
target_id: str # 'host' ou '100'
name: str # 'hote' ou '100 traefik'
is_host: bool
@ -126,17 +23,17 @@ class ScanResult:
cve_ok: bool = False
apt_count: int = 0
cve_count: int = 0
cve_total: int = 0
os_info: str = ""
apt_packages: list[dict[str, str]] = field(default_factory=list)
cve_list: list[dict[str, str]] = field(default_factory=list)
error: str = ""
status: str = "pending"
status: str = "pending" # pending | running | done | error | skipped
def run_cmd(cmd: list[str], timeout: int = 120) -> tuple[bool, str, str]:
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=False)
proc = subprocess.run(
cmd, capture_output=True, text=True, timeout=timeout, check=False
)
return proc.returncode == 0, proc.stdout, proc.stderr
except subprocess.TimeoutExpired:
return False, "", "Command timed out"
@ -145,7 +42,7 @@ def run_cmd(cmd: list[str], timeout: int = 120) -> tuple[bool, str, str]:
def get_lxc_list() -> list[Target]:
ok, stdout, _ = run_cmd(["pct", "list"])
ok, stdout, stderr = run_cmd(["pct", "list"])
if not ok:
return []
targets = []
@ -160,67 +57,45 @@ def get_lxc_list() -> list[Target]:
def lxc_is_running(vmid: str) -> bool:
ok, stdout, _ = run_cmd(["pct", "status", vmid])
return ok and "running" in stdout.lower()
def scan_os(target: Target) -> str:
"""Récupère le nom, la version complète et le codename de l'OS."""
prefix = [] if target.is_host else ["pct", "exec", target.target_id, "--"]
# 1. Lire /etc/os-release et parser ligne par ligne
ok, osrel_out, _ = run_cmd(prefix + ["cat", "/etc/os-release"], timeout=30)
os_data = {}
if ok:
for line in osrel_out.splitlines():
line = line.strip()
if "=" in line:
key, val = line.split("=", 1)
val = val.strip().strip('"').strip("'")
os_data[key] = val
# 2. Extraire les champs
name = os_data.get("NAME", "")
codename = os_data.get("VERSION_CODENAME", "")
debian_version_full = os_data.get("DEBIAN_VERSION_FULL", "")
version_id = os_data.get("VERSION_ID", "")
# 3. Construire le libellé final
# Avec DEBIAN_VERSION_FULL : NAME DEBIAN_VERSION_FULL (VERSION_CODENAME)
# Sans : NAME VERSION_ID (VERSION_CODENAME)
version = debian_version_full if debian_version_full else version_id
if name and version and codename:
return f"{name} {version} ({codename})"
if name and version:
return f"{name} {version}"
# Fallback : PRETTY_NAME ou "OS inconnu"
return os_data.get("PRETTY_NAME", "OS inconnu")
if ok and "running" in stdout.lower():
return True
return False
def ensure_debsecan_installed(is_host: bool, vmid: str = "") -> tuple[bool, str]:
"""Retourne (ok, error_msg)"""
if is_host:
# Vérifier si debsecan est installé sur l'hôte
ok, _, _ = run_cmd(["which", "debsecan"])
if ok:
return True, ""
ok, _, err = run_cmd(["apt-get", "update"], timeout=60)
ok, out, err = run_cmd(["apt-get", "update"], timeout=60)
if not ok:
return False, f"apt-get update failed: {err}"
ok, _, err = run_cmd(["apt-get", "install", "-y", "debsecan"], timeout=120)
return (True, "") if ok else (False, f"apt-get install debsecan failed: {err}")
ok, out, err = run_cmd(["apt-get", "install", "-y", "debsecan"], timeout=120)
if not ok:
return False, f"apt-get install debsecan failed: {err}"
return True, ""
else:
# Vérifier dans le LXC
ok, _, _ = run_cmd(["pct", "exec", vmid, "--", "which", "debsecan"])
if ok:
return True, ""
ok, _, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "update"], timeout=60)
ok, out, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "update"], timeout=60)
if not ok:
return False, f"apt-get update failed in {vmid}: {err}"
ok, _, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "install", "-y", "debsecan"], timeout=120)
return (True, "") if ok else (False, f"apt-get install debsecan failed in {vmid}: {err}")
ok, out, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "install", "-y", "debsecan"], timeout=120)
if not ok:
return False, f"apt-get install debsecan failed in {vmid}: {err}"
return True, ""
def scan_apt(target: Target) -> tuple[bool, list[dict[str, str]], str]:
cmd = ["apt", "list", "--upgradable"] if target.is_host else ["pct", "exec", target.target_id, "--", "apt", "list", "--upgradable"]
if target.is_host:
cmd = ["apt", "list", "--upgradable"]
else:
cmd = ["pct", "exec", target.target_id, "--", "apt", "list", "--upgradable"]
ok, stdout, stderr = run_cmd(cmd, timeout=120)
if not ok:
return False, [], stderr or stdout
@ -233,96 +108,121 @@ def scan_apt(target: Target) -> tuple[bool, list[dict[str, str]], str]:
if len(parts) < 2:
continue
name = parts[0].strip()
rest = parts[1]
# Format: apt list --upgradable -> name/arch version suite [upgradable from: oldver]
m = re.search(r"\[upgradable from:\s+([^\]]+)\]", line)
if m:
old_ver = m.group(1).strip()
# Essayer d'extraire la nouvelle version (avant le [upgradable from:])
prefix = line.split("[upgradable from:")[0].strip()
ver_m = re.search(r"/\S+\s+(\S+)\s+\S+", prefix)
new_ver = ver_m.group(1) if ver_m else "?"
packages.append({"name": name, "current": old_ver, "new": new_ver, "size": "-"})
packages.append({
"name": name,
"current": old_ver,
"new": new_ver,
"size": "-"
})
else:
packages.append({"name": name, "current": "?", "new": "?", "size": "-"})
# fallback simple
packages.append({
"name": name,
"current": "?",
"new": "?",
"size": "-"
})
return True, packages, ""
def scan_cve(target: Target) -> tuple[bool, list[dict[str, str]], str]:
# Format par défaut (pas de --format) retourne: CVE-ID package
cmd = ["debsecan", "--suite", "bookworm"] if target.is_host else ["pct", "exec", target.target_id, "--", "debsecan", "--suite", "bookworm"]
if target.is_host:
cmd = ["debsecan", "--format", "report", "--suite", "bookworm"]
else:
cmd = ["pct", "exec", target.target_id, "--", "debsecan", "--format", "report", "--suite", "bookworm"]
ok, stdout, stderr = run_cmd(cmd, timeout=120)
if not ok:
return False, [], stderr or stdout
cves = []
for line in stdout.splitlines():
# Format debsecan: CVE-XXXX-XXXX package [remote] [low] - description
m = re.match(r"(CVE-\d{4}-\d+)\s+(\S+)", line)
if m:
cve_id = m.group(1)
pkg = m.group(2)
cves.append({
"id": m.group(1),
"package": m.group(2),
"vector": "?",
"severity": "?",
"url": f"https://security-tracker.debian.org/tracker/{m.group(1)}"
"id": cve_id,
"package": pkg,
"url": f"https://security-tracker.debian.org/tracker/{cve_id}"
})
return True, cves, ""
def scan_target(target: Target, progress_cb: Callable) -> ScanResult:
result = ScanResult(target=target)
async def scan_target(target: Target, result: ScanResult, progress_cb: Any) -> None:
"""Scanne une cible (APT puis CVE) et écrit le cache."""
result.status = "running"
progress_cb()
await progress_cb()
# Vérifier si LXC est allumé
if not target.is_host and not lxc_is_running(target.target_id):
result.status = "skipped"
progress_cb(); progress_cb(); progress_cb(); progress_cb()
return result
result.apt_ok = False
result.cve_ok = False
write_cache(target.target_id, {
"timestamp": "",
"apt_count": 0,
"apt_packages": [],
"cve_count": 0,
"cve_list": [],
"error": "LXC éteint"
})
await progress_cb()
return
# Auto-install debsecan
debsecan_ok, debsecan_err = ensure_debsecan_installed(target.is_host, target.target_id)
progress_cb()
# Scan APT
apt_ok, apt_packages, apt_err = scan_apt(target)
result.apt_ok = apt_ok
result.apt_count = len(apt_packages)
result.apt_packages = apt_packages
progress_cb()
await progress_cb()
# Scan CVE
if debsecan_ok:
cve_ok, cve_list, cve_err = scan_cve(target)
result.cve_ok = cve_ok
result.cve_count = len(cve_list)
result.cve_list = cve_list
if not cve_ok:
result.error = cve_err
# Enrichir les CVE via l'API Debian (severity, vector, fixable)
if cve_list:
all_cves, actionable_count = enrich_cves(cve_list)
result.cve_list = all_cves
result.cve_count = actionable_count
result.cve_total = len(all_cves)
else:
result.cve_list = []
result.cve_count = 0
result.cve_total = 0
else:
result.cve_ok = False
result.cve_count = 0
result.cve_list = []
result.error = f"debsecan: {debsecan_err}"
progress_cb()
# Scan OS
result.os_info = scan_os(target)
progress_cb()
await progress_cb()
result.status = "done" if (result.apt_ok and result.cve_ok) else "error"
if result.error:
result.status = "error"
write_cache("host" if target.is_host else target.target_id, {
"timestamp": datetime.now().isoformat(),
write_cache(target.target_id if not target.is_host else "host", {
"timestamp": "", # sera rempli par l'appelant
"apt_count": result.apt_count,
"apt_packages": result.apt_packages,
"cve_count": result.cve_count,
"cve_total": result.cve_total,
"os_info": result.os_info,
"cve_list": result.cve_list,
"error": result.error
})
progress_cb()
return result
await progress_cb()
async def run_full_scan(progress_cb: Any) -> list[ScanResult]:
targets = [Target(target_id="host", name="hote", is_host=True)] + get_lxc_list()
results = [ScanResult(target=t) for t in targets]
tasks = [scan_target(r.target, r, progress_cb) for r in results]
await asyncio.gather(*tasks)
return results

View file

@ -0,0 +1,72 @@
from textual.widgets import DataTable, Button
from textual.containers import Vertical, Horizontal
from textual.message import Message
try:
import pyperclip
PYPERCLIP_OK = True
except Exception:
PYPERCLIP_OK = False
class CVETable(Vertical):
DEFAULT_CSS = """
CVETable {
padding: 1 2;
}
#cve-toolbar {
height: auto;
margin-bottom: 1;
}
DataTable {
height: 1fr;
}
"""
class BackPressed(Message):
pass
def __init__(self):
super().__init__()
self.table = None
self.urls = {}
def compose(self):
with Horizontal(id="cve-toolbar"):
yield Button("⬅ Retour", id="cve-back")
self.table = DataTable(id="cve-table")
yield self.table
def on_mount(self):
self.table.add_columns("CVE-ID", "Paquet", "Lien")
self.table.cursor_type = "row"
def load_data(self, cves: list[dict]):
self.table.clear()
self.urls = {}
for i, cve in enumerate(cves):
cve_id = cve.get("id", "?")
pkg = cve.get("package", "?")
url = cve.get("url", "")
self.urls[i] = url
self.table.add_row(cve_id, pkg, url)
def on_data_table_row_selected(self, event: DataTable.RowSelected):
row_key = event.row_key
row_index = list(self.table.rows.keys()).index(row_key)
url = self.urls.get(row_index, "")
if not url:
return
if PYPERCLIP_OK:
try:
pyperclip.copy(url)
self.notify(f"Lien copié : {url}", severity="information")
return
except Exception:
pass
# Fallback : afficher le lien dans une notification
self.notify(f"Lien (copie manuelle) : {url}", severity="warning")
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "cve-back":
self.post_message(self.BackPressed())

View file

@ -1,126 +0,0 @@
from textual.screen import Screen
from textual.widgets import DataTable, Button
from textual.containers import Vertical, Horizontal
try:
import pyperclip
PYPERCLIP_OK = True
except Exception:
PYPERCLIP_OK = False
SCREEN_CSS = """
align: left top;
padding: 1 2;
#toolbar {
height: auto;
dock: top;
margin-bottom: 1;
}
DataTable {
height: 1fr;
border: solid $primary;
}
"""
class PackageListScreen(Screen):
BINDINGS = [("b", "back", "Retour")]
DEFAULT_CSS = """
PackageListScreen {
align: left top;
padding: 1 2;
}
PackageListScreen #toolbar {
height: auto;
margin-bottom: 1;
}
PackageListScreen DataTable {
height: 1fr;
border: solid $primary;
}
"""
def __init__(self, packages: list[dict]):
super().__init__()
self.packages = packages
def compose(self):
with Vertical():
with Horizontal(id="toolbar"):
yield Button("⬅ Retour", id="pkg-back", variant="default")
table = DataTable(id="pkg-table")
table.add_columns("Nom", "Version actuelle", "Nouvelle version", "Taille")
for pkg in self.packages:
table.add_row(pkg.get("name", "?"), pkg.get("current", "?"), pkg.get("new", "?"), pkg.get("size", "-"))
yield table
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "pkg-back":
self.app.pop_screen()
def action_back(self):
self.app.pop_screen()
class CVEListScreen(Screen):
BINDINGS = [("b", "back", "Retour")]
DEFAULT_CSS = """
CVEListScreen {
align: left top;
padding: 1 2;
}
CVEListScreen #toolbar {
height: auto;
margin-bottom: 1;
}
CVEListScreen DataTable {
height: 1fr;
border: solid $primary;
}
"""
def __init__(self, cves: list[dict]):
super().__init__()
self.cves = cves
self.urls = {}
def compose(self):
with Vertical():
with Horizontal(id="toolbar"):
yield Button("⬅ Retour", id="cve-back", variant="default")
table = DataTable(id="cve-table")
table.add_columns("CVE-ID", "Paquet", "Severite", "Vecteur", "Corrigeable", "Lien")
table.cursor_type = "row"
for i, cve in enumerate(self.cves):
cve_id = cve.get("id", "?")
pkg = cve.get("package", "?")
url = cve.get("url", "")
severity = cve.get("severity", "?")
vector = cve.get("vector", "?")
fixable = "🟢 Oui" if cve.get("fixable") else "🔴 Non"
self.urls[i] = url
table.add_row(cve_id, pkg, severity, vector, fixable, url)
yield table
def on_data_table_row_selected(self, event: DataTable.RowSelected):
row_key = event.row_key
row_index = list(self.query_one("#cve-table").rows.keys()).index(row_key)
url = self.urls.get(row_index, "")
if not url:
return
if PYPERCLIP_OK:
try:
pyperclip.copy(url)
self.notify(f"Lien copié : {url}", severity="information")
return
except Exception:
pass
self.notify(f"Lien (copie manuelle) : {url}", severity="warning")
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "cve-back":
self.app.pop_screen()
def action_back(self):
self.app.pop_screen()

View file

@ -16,7 +16,7 @@ class LoaderScreen(Screen):
self.status_lines = [self._fmt_line(t) for t in targets]
def _fmt_line(self, target) -> str:
return f"{target.name:20} ⚫ ⚫"
return f"{target.name}: 🟠 🟠"
def compose(self):
with Container(id="loader-container"):
@ -29,8 +29,10 @@ class LoaderScreen(Screen):
def watch_progress(self, value: float):
try:
self.query_one("#loader-bar", ProgressBar).progress = value
self.query_one("#loader-percent", Static).update(f"{value:.0f} %")
bar = self.query_one("#loader-bar", ProgressBar)
bar.progress = value
pct = self.query_one("#loader-percent", Static)
pct.update(f"{value:.0f} %")
except Exception:
pass
@ -49,11 +51,11 @@ class LoaderScreen(Screen):
lines = list(self.status_lines)
t = self._targets[idx]
if status == "done":
lines[idx] = f"{t.name:20} 🟢 🟢"
lines[idx] = f"{t.name}: 🟢 🟢"
elif status == "error":
lines[idx] = f"{t.name:20} 🔴 🔴"
lines[idx] = f"{t.name}: 🔴 🔴"
elif status == "skipped":
lines[idx] = f"{t.name:20} ⚪ ⚪"
lines[idx] = f"{t.name}: ⚪ ⚪"
else:
lines[idx] = f"{t.name:20} 🟠 🟠"
lines[idx] = f"{t.name}: 🟠 🟠"
self.status_lines = lines

View file

@ -23,21 +23,21 @@ class LogPanel(Vertical):
def __init__(self):
super().__init__()
self._log_widget = None
self.log = None
def compose(self):
with Horizontal(id="log-toolbar"):
yield Button("⬅ Retour", id="log-back")
self._log_widget = RichLog(id="log-view", highlight=True)
yield self._log_widget
self.log = RichLog(id="log-view", highlight=True)
yield self.log
def write(self, line: str):
if self._log_widget:
self._log_widget.write(line)
if self.log:
self.log.write(line)
def clear(self):
if self._log_widget:
self._log_widget.clear()
if self.log:
self.log.clear()
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "log-back":

View file

@ -0,0 +1,48 @@
from textual.widgets import DataTable, Button
from textual.containers import Vertical, Horizontal
from textual.message import Message
class PackageTable(Vertical):
DEFAULT_CSS = """
PackageTable {
padding: 1 2;
}
#pkg-toolbar {
height: auto;
margin-bottom: 1;
}
DataTable {
height: 1fr;
}
"""
class BackPressed(Message):
pass
def __init__(self):
super().__init__()
self.table = None
def compose(self):
with Horizontal(id="pkg-toolbar"):
yield Button("⬅ Retour", id="pkg-back")
self.table = DataTable(id="pkg-table")
yield self.table
def on_mount(self):
self.table.add_columns("Nom", "Version actuelle", "Nouvelle version", "Taille")
def load_data(self, packages: list[dict]):
self.table.clear()
for pkg in packages:
self.table.add_row(
pkg.get("name", "?"),
pkg.get("current", "?"),
pkg.get("new", "?"),
pkg.get("size", "-")
)
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "pkg-back":
self.post_message(self.BackPressed())

View file

@ -6,7 +6,7 @@ from textual.message import Message
class Sidebar(Vertical):
DEFAULT_CSS = """
Sidebar {
width: 40;
width: 28;
border-right: solid $primary-lighten-2;
padding: 0 1;
}
@ -42,8 +42,7 @@ class Sidebar(Vertical):
lv.clear()
self._items = []
for t in targets:
# Format: "100 traefik APT ⚫ CVE ⚫"
label = f"{t.name:20} APT ⚫ CVE ⚫"
label = f"{t.name} 🟠 🟠"
item = ListItem(Label(label), id=f"target-{t.target_id}")
lv.append(item)
self._items.append((t.target_id, t.name, item))
@ -52,24 +51,13 @@ class Sidebar(Vertical):
for tid, name, item in self._items:
if tid == target_id:
if skipped:
apt_color = ""
cve_color = ""
label = f"{name} ⚪ ⚪"
elif error:
apt_color = "🔴"
cve_color = "🔴"
elif apt_count > 0:
apt_color = "🔴"
label = f"{name} 🔴 🔴"
elif apt_count > 0 or cve_count > 0:
label = f"{name} {'🔴' if apt_count > 0 else '🟢'} {'🔴' if cve_count > 0 else '🟢'}"
else:
apt_color = "🟢"
if error and cve_count == 0:
cve_color = "🔴"
elif cve_count > 0:
cve_color = "🔴"
else:
cve_color = "🟢"
label = f"{name:20} APT {apt_color} CVE {cve_color}"
label = f"{name} 🟢 🟢"
item.children[0].update(label)
break

View file

@ -1,5 +1,6 @@
from textual.widgets import Static, Button
from textual.containers import Vertical, Horizontal
from textual.reactive import reactive
from textual.message import Message
@ -25,6 +26,10 @@ class SummaryPanel(Vertical):
height: auto;
margin: 1 0;
}
.summary-count {
color: $primary;
text-style: bold;
}
.summary-error {
color: $error;
text-style: bold;
@ -50,8 +55,13 @@ class SummaryPanel(Vertical):
self.target_id = target_id
super().__init__()
_current_target_id: str = ""
_current_target_name: str = ""
cache_time = reactive("")
apt_count = reactive(0)
cve_count = reactive(0)
target_id = reactive("")
target_name = reactive("")
error_msg = reactive("")
is_skipped = reactive(False)
def compose(self):
with Horizontal(id="summary-header"):
@ -60,29 +70,36 @@ class SummaryPanel(Vertical):
with Vertical(id="summary-body"):
yield Static("Sélectionnez une cible", id="summary-title")
yield Static("", id="summary-os", classes="summary-row")
yield Button("Mises à jour : -", id="btn-apt", classes="summary-row", variant="default")
yield Button("CVE : -", id="btn-cve", classes="summary-row", variant="default")
yield Static("", id="summary-error", classes="summary-row summary-error")
yield Button("📦 Mettre à jour", id="btn-upgrade", variant="primary")
def set_target(self, target_id: str, name: str, apt_count: int, cve_count: int, cve_total: int, os_info: str, error: str, skipped: bool, cache_time: str):
self._current_target_id = target_id
self._current_target_name = name
def watch_cache_time(self, value: str):
self.query_one("#summary-cache", Static).update(f"Cache : {value}" if value else "")
title = self.query_one("#summary-title", Static)
os_label = self.query_one("#summary-os", Static)
def watch_apt_count(self, value: int):
self._update_display()
def watch_cve_count(self, value: int):
self._update_display()
def watch_error_msg(self, value: str):
self._update_display()
def watch_is_skipped(self, value: bool):
self._update_display()
def watch_target_name(self, value: str):
self.query_one("#summary-title", Static).update(value if value else "Sélectionnez une cible")
def _update_display(self):
apt_btn = self.query_one("#btn-apt", Button)
cve_btn = self.query_one("#btn-cve", Button)
err_label = self.query_one("#summary-error", Static)
btn = self.query_one("#btn-upgrade", Button)
cache_label = self.query_one("#summary-cache", Static)
title.update(name if name else "Sélectionnez une cible")
os_label.update(os_info if os_info else "")
cache_label.update(f"Cache : {cache_time}" if cache_time else "")
if skipped:
if self.is_skipped:
apt_btn.label = "Mises à jour : LXC éteint"
cve_btn.label = "CVE : LXC éteint"
err_label.update("")
@ -91,38 +108,37 @@ class SummaryPanel(Vertical):
cve_btn.disabled = True
return
if error:
apt_btn.label = f"Mises à jour : {apt_count}"
if self.error_msg:
apt_btn.label = f"Mises à jour : {self.apt_count}"
cve_btn.label = "CVE : ERREUR"
err_label.update(error)
err_label.update(self.error_msg)
btn.disabled = False
apt_btn.disabled = apt_count == 0
apt_btn.disabled = self.apt_count == 0
cve_btn.disabled = True
return
apt_btn.label = f"Mises à jour : {apt_count}"
if cve_total > cve_count:
cve_btn.label = f"CVE : {cve_count} ({cve_total - cve_count} non corrigeables)"
else:
cve_btn.label = f"CVE : {cve_count}"
apt_btn.label = f"Mises à jour : {self.apt_count}"
cve_btn.label = f"CVE : {self.cve_count}"
err_label.update("")
btn.disabled = False
apt_btn.disabled = apt_count == 0
cve_btn.disabled = cve_total == 0
apt_btn.disabled = self.apt_count == 0
cve_btn.disabled = self.cve_count == 0
# Forcer le refresh de tous les widgets modifiés
apt_btn.refresh()
cve_btn.refresh()
err_label.refresh()
btn.refresh()
cache_label.refresh()
def set_target(self, target_id: str, name: str, apt_count: int, cve_count: int, error: str, skipped: bool, cache_time: str):
self.target_id = target_id
self.target_name = name
self.apt_count = apt_count
self.cve_count = cve_count
self.error_msg = error
self.is_skipped = skipped
self.cache_time = cache_time
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "btn-reload":
self.post_message(self.ReloadPressed())
elif event.button.id == "btn-upgrade":
self.post_message(self.UpgradePressed(self._current_target_id, self._current_target_name))
self.post_message(self.UpgradePressed(self.target_id, self.target_name))
elif event.button.id == "btn-apt":
self.post_message(self.AptClicked(self._current_target_id))
self.post_message(self.AptClicked(self.target_id))
elif event.button.id == "btn-cve":
self.post_message(self.CveClicked(self._current_target_id))
self.post_message(self.CveClicked(self.target_id))

View file

@ -11,5 +11,4 @@ if [ ! -d "$VENV_DIR" ]; then
fi
source "$VENV_DIR/bin/activate"
export PYTHONPATH="/opt/full-updater:$PYTHONPATH"
python3 -m full_updater "$@"

View file

@ -5,6 +5,7 @@ REPO_URL="https://git.geronzi.fr"
OWNER="geronzi"
REPO="full_updater"
TOKEN_FILE="/etc/full-updater/token"
CONFIG_FILE="/etc/full-updater/config.ini"
INSTALLED_VERSION=$(dpkg -s full-updater 2>/dev/null | grep "^Version:" | awk '{print $2}' || echo "0.0.0")
echo "=== Full Updater Update ==="