commit 184b0e6033a868a2b6002c3f3008812fcd7c5e3e Author: enzo Date: Tue May 12 22:36:36 2026 +0200 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9230906 --- /dev/null +++ b/.gitignore @@ -0,0 +1,24 @@ +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +.env +.venv +env/ +venv/ diff --git a/full_updater/__init__.py b/full_updater/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/full_updater/__main__.py b/full_updater/__main__.py new file mode 100644 index 0000000..fad97c9 --- /dev/null +++ b/full_updater/__main__.py @@ -0,0 +1,8 @@ +from full_updater.app import FullUpdaterApp + +def main(): + app = FullUpdaterApp() + app.run() + +if __name__ == "__main__": + main() diff --git a/full_updater/app.py b/full_updater/app.py new file mode 100644 index 0000000..9797244 --- /dev/null +++ b/full_updater/app.py @@ -0,0 +1,316 @@ +import asyncio +from datetime import datetime + +from textual.app import App, ComposeResult +from textual.containers import Horizontal, Vertical +from textual.reactive import reactive +from textual.worker import get_current_worker +from textual import work + +from full_updater.backend.cache import ensure_cache_dir, read_cache, get_cache_timestamp +from full_updater.backend.scanner import ( + Target, ScanResult, get_lxc_list, lxc_is_running, + ensure_debsecan_installed, scan_apt, scan_cve, write_cache +) +from full_updater.backend.executor import UpgradeExecutor +from full_updater.ui.loader import LoaderScreen +from full_updater.ui.sidebar import Sidebar +from full_updater.ui.summary import SummaryPanel +from full_updater.ui.package_table import PackageTable +from full_updater.ui.cve_table import CVETable +from full_updater.ui.log_panel import LogPanel +from full_updater.ui.confirm_modal import ConfirmModal + + +class FullUpdaterApp(App): + CSS = """ + Screen { align: center middle; } + #main-layout { layout: horizontal; height: 100%; } + Sidebar { width: 30; } + #content { width: 1fr; } + """ + + BINDINGS = [("q", "quit", "Quitter")] + + targets: list[Target] = [] + results: dict[str, ScanResult] = {} + selected_target: str | None = None + + def __init__(self): + super().__init__() + self.loader_screen = None + self._log_panel = None + + def compose(self) -> ComposeResult: + with Horizontal(id="main-layout"): + yield Sidebar() + with Vertical(id="content"): + yield SummaryPanel() + + def on_mount(self): + ensure_cache_dir() + self.targets = [Target(target_id="host", name="hote", is_host=True)] + get_lxc_list() + sidebar = self.query_one(Sidebar) + sidebar.set_targets(self.targets) + self.loader_screen = LoaderScreen(self.targets) + self.push_screen(self.loader_screen) + self.run_scan_all() + + @work(thread=True) + def run_scan_all(self): + total = len(self.targets) + completed = 0 + + def progress_cb(): + nonlocal completed + completed += 1 + pct = min(100.0, (completed / (total * 4)) * 100) + self.call_from_thread(self._update_loader, pct) + + for idx, target in enumerate(self.targets): + self.call_from_thread(self._update_loader_status, idx, "running") + + if not target.is_host and not lxc_is_running(target.target_id): + self.call_from_thread(self._update_loader_status, idx, "skipped") + result = ScanResult(target=target) + result.status = "skipped" + self.results[target.target_id] = result + for _ in range(4): + progress_cb() + continue + + debsecan_ok, debsecan_err = ensure_debsecan_installed(target.is_host, target.target_id) + progress_cb() + + apt_ok, apt_packages, apt_err = scan_apt(target) + progress_cb() + + cve_ok = False + cve_list = [] + cve_err = "" + if debsecan_ok: + cve_ok, cve_list, cve_err = scan_cve(target) + else: + cve_err = f"debsecan: {debsecan_err}" + progress_cb() + + result = ScanResult( + target=target, + apt_ok=apt_ok, + cve_ok=cve_ok, + apt_count=len(apt_packages), + cve_count=len(cve_list), + apt_packages=apt_packages, + cve_list=cve_list, + error=apt_err or cve_err, + status="done" if (apt_ok and (cve_ok or not debsecan_ok)) else "error" + ) + if result.error: + result.status = "error" + self.results[target.target_id] = result + + cache_id = "host" if target.is_host else target.target_id + write_cache(cache_id, { + "timestamp": datetime.now().isoformat(), + "apt_count": result.apt_count, + "apt_packages": result.apt_packages, + "cve_count": result.cve_count, + "cve_list": result.cve_list, + "error": result.error + }) + + self.call_from_thread(self._update_loader_status, idx, result.status) + self.call_from_thread(self._update_sidebar, target.target_id, result) + + self.call_from_thread(self._finish_scan) + + def _update_loader(self, pct: float): + if self.loader_screen: + self.loader_screen.progress = pct + + def _update_loader_status(self, idx: int, status: str): + if self.loader_screen: + self.loader_screen.update_target_status(idx, status) + + def _update_sidebar(self, target_id: str, result: ScanResult): + sidebar = self.query_one(Sidebar) + sidebar.update_status( + target_id, + result.apt_ok, + result.cve_ok, + result.apt_count, + result.cve_count, + result.error, + result.status == "skipped" + ) + + def _finish_scan(self): + self.pop_screen() + sidebar = self.query_one(Sidebar) + for tid, res in self.results.items(): + sidebar.update_status( + tid, + res.apt_ok, + res.cve_ok, + res.apt_count, + res.cve_count, + res.error, + res.status == "skipped" + ) + if self.targets: + self._select_target(self.targets[0].target_id) + + def _select_target(self, target_id: str): + self.selected_target = target_id + cache_id = "host" if target_id == "host" else target_id + data = read_cache(cache_id) or {} + name = "hote" + for t in self.targets: + if t.target_id == target_id: + name = t.name + break + summary = self.query_one(SummaryPanel) + summary.set_target( + target_id=target_id, + name=name, + apt_count=data.get("apt_count", 0), + cve_count=data.get("cve_count", 0), + error=data.get("error", ""), + skipped=not data and any(t.target_id == target_id and not t.is_host and not lxc_is_running(t.target_id) for t in self.targets), + cache_time=get_cache_timestamp(cache_id) + ) + + def on_sidebar_target_selected(self, event: Sidebar.TargetSelected): + self._select_target(event.target_id) + + def on_summary_panel_reload_pressed(self, event: SummaryPanel.ReloadPressed): + if self.selected_target: + self._reload_target(self.selected_target) + + @work(thread=True) + def _reload_target(self, target_id: str): + target = None + for t in self.targets: + if t.target_id == target_id: + target = t + break + if not target: + return + + if not target.is_host and not lxc_is_running(target.target_id): + result = ScanResult(target=target) + result.status = "skipped" + self.results[target.target_id] = result + write_cache(target.target_id, { + "timestamp": datetime.now().isoformat(), + "apt_count": 0, + "apt_packages": [], + "cve_count": 0, + "cve_list": [], + "error": "LXC éteint" + }) + self.call_from_thread(self._update_sidebar, target.target_id, result) + self.call_from_thread(self._select_target, target.target_id) + return + + ensure_debsecan_installed(target.is_host, target.target_id) + apt_ok, apt_packages, apt_err = scan_apt(target) + cve_ok, cve_list, cve_err = scan_cve(target) + error = apt_err or cve_err + + result = ScanResult( + target=target, + apt_ok=apt_ok, + cve_ok=cve_ok, + apt_count=len(apt_packages), + cve_count=len(cve_list), + apt_packages=apt_packages, + cve_list=cve_list, + error=error, + status="done" if (apt_ok and cve_ok) else "error" + ) + self.results[target.target_id] = result + cache_id = "host" if target.is_host else target.target_id + write_cache(cache_id, { + "timestamp": datetime.now().isoformat(), + "apt_count": result.apt_count, + "apt_packages": result.apt_packages, + "cve_count": result.cve_count, + "cve_list": result.cve_list, + "error": result.error + }) + self.call_from_thread(self._update_sidebar, target.target_id, result) + self.call_from_thread(self._select_target, target.target_id) + + def on_summary_panel_upgrade_pressed(self, event: SummaryPanel.UpgradePressed): + self.push_screen( + ConfirmModal( + "Confirmation", + f"Lancer la mise à jour sur {event.name} ?" + ), + callback=lambda confirmed: self._on_upgrade_confirmed(confirmed, event.target_id, event.name) + ) + + def _on_upgrade_confirmed(self, confirmed: bool, target_id: str, name: str): + if not confirmed: + return + self.query_one("#main-layout").display = False + self._log_panel = LogPanel() + self.mount(self._log_panel) + self._log_panel.clear() + self._log_panel.write(f"Lancement de la mise à jour sur {name}...") + self.run_upgrade(target_id, target_id == "host") + + @work + async def run_upgrade(self, target_id: str, is_host: bool): + log_panel = self._log_panel + if not log_panel: + return + executor = UpgradeExecutor( + target_id, is_host, + on_line=lambda line: self.call_from_thread(log_panel.write, line) + ) + ok = await executor.run() + if ok: + log_panel.write("\n✅ Mise à jour terminée avec succès.") + else: + log_panel.write("\n❌ Erreur lors de la mise à jour.") + self._reload_target(target_id) + + def on_log_panel_back_pressed(self, event: LogPanel.BackPressed): + self.query_one("#main-layout").display = True + if self._log_panel: + self._log_panel.remove() + self._log_panel = None + + def on_summary_panel_apt_clicked(self, event: SummaryPanel.AptClicked): + cache_id = "host" if event.target_id == "host" else event.target_id + data = read_cache(cache_id) or {} + pkgs = data.get("apt_packages", []) + if not pkgs: + return + table = PackageTable() + self.mount(table) + self.query_one("#main-layout").display = False + table.load_data(pkgs) + + def on_summary_panel_cve_clicked(self, event: SummaryPanel.CveClicked): + cache_id = "host" if event.target_id == "host" else event.target_id + data = read_cache(cache_id) or {} + cves = data.get("cve_list", []) + if not cves: + return + table = CVETable() + self.mount(table) + self.query_one("#main-layout").display = False + table.load_data(cves) + + def on_package_table_back_pressed(self, event: PackageTable.BackPressed): + self.query_one("#main-layout").display = True + for widget in self.query(PackageTable): + widget.remove() + + def on_cve_table_back_pressed(self, event: CVETable.BackPressed): + self.query_one("#main-layout").display = True + for widget in self.query(CVETable): + widget.remove() diff --git a/full_updater/backend/__init__.py b/full_updater/backend/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/full_updater/backend/cache.py b/full_updater/backend/cache.py new file mode 100644 index 0000000..1edb94d --- /dev/null +++ b/full_updater/backend/cache.py @@ -0,0 +1,44 @@ +import json +import os +import shutil +from datetime import datetime +from typing import Any + +CACHE_DIR = "/tmp/full-updater-cache" + + +def ensure_cache_dir() -> None: + if os.path.exists(CACHE_DIR): + shutil.rmtree(CACHE_DIR) + os.makedirs(CACHE_DIR, exist_ok=True) + + +def write_cache(target_id: str, data: dict[str, Any]) -> None: + ensure_cache_dir() + path = os.path.join(CACHE_DIR, f"{target_id}.json") + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + + +def read_cache(target_id: str) -> dict[str, Any] | None: + path = os.path.join(CACHE_DIR, f"{target_id}.json") + if not os.path.exists(path): + return None + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + +def get_cache_timestamp(target_id: str) -> str: + path = os.path.join(CACHE_DIR, f"{target_id}.json") + if not os.path.exists(path): + return "" + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + ts = data.get("timestamp", "") + if ts: + dt = datetime.fromisoformat(ts) + return dt.strftime("%H:%M:%S") + except Exception: + pass + return "" diff --git a/full_updater/backend/executor.py b/full_updater/backend/executor.py new file mode 100644 index 0000000..0ce218a --- /dev/null +++ b/full_updater/backend/executor.py @@ -0,0 +1,38 @@ +import asyncio +import subprocess +from typing import Callable + + +class UpgradeExecutor: + def __init__(self, target_id: str, is_host: bool, on_line: Callable[[str], None]): + self.target_id = target_id + self.is_host = is_host + self.on_line = on_line + self.process = None + + async def run(self) -> bool: + if self.is_host: + cmd = ["apt-get", "full-upgrade", "-y"] + else: + cmd = ["pct", "exec", self.target_id, "--", "apt-get", "full-upgrade", "-y"] + + self.process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + async def read_stream(stream): + while True: + line = await stream.readline() + if not line: + break + self.on_line(line.decode("utf-8", errors="replace").rstrip()) + + await asyncio.gather( + read_stream(self.process.stdout), + read_stream(self.process.stderr) + ) + + returncode = await self.process.wait() + return returncode == 0 diff --git a/full_updater/backend/scanner.py b/full_updater/backend/scanner.py new file mode 100644 index 0000000..2f738fe --- /dev/null +++ b/full_updater/backend/scanner.py @@ -0,0 +1,228 @@ +import asyncio +import json +import re +import shutil +import subprocess +from dataclasses import dataclass, field +from typing import Any + +from full_updater.backend.cache import write_cache + + +@dataclass +class Target: + target_id: str # 'host' ou '100' + name: str # 'hote' ou '100 traefik' + is_host: bool + + +@dataclass +class ScanResult: + target: Target + apt_ok: bool = False + cve_ok: bool = False + apt_count: int = 0 + cve_count: int = 0 + apt_packages: list[dict[str, str]] = field(default_factory=list) + cve_list: list[dict[str, str]] = field(default_factory=list) + error: str = "" + status: str = "pending" # pending | running | done | error | skipped + + +def run_cmd(cmd: list[str], timeout: int = 120) -> tuple[bool, str, str]: + try: + proc = subprocess.run( + cmd, capture_output=True, text=True, timeout=timeout, check=False + ) + return proc.returncode == 0, proc.stdout, proc.stderr + except subprocess.TimeoutExpired: + return False, "", "Command timed out" + except Exception as e: + return False, "", str(e) + + +def get_lxc_list() -> list[Target]: + ok, stdout, stderr = run_cmd(["pct", "list"]) + if not ok: + return [] + targets = [] + for line in stdout.strip().splitlines()[1:]: + parts = line.split() + if len(parts) >= 3: + vmid = parts[0] + name = parts[-1] if len(parts) > 2 else f"lxc-{vmid}" + targets.append(Target(target_id=vmid, name=f"{vmid} {name}", is_host=False)) + return targets + + +def lxc_is_running(vmid: str) -> bool: + ok, stdout, _ = run_cmd(["pct", "status", vmid]) + if ok and "running" in stdout.lower(): + return True + return False + + +def ensure_debsecan_installed(is_host: bool, vmid: str = "") -> tuple[bool, str]: + """Retourne (ok, error_msg)""" + if is_host: + # Vérifier si debsecan est installé sur l'hôte + ok, _, _ = run_cmd(["which", "debsecan"]) + if ok: + return True, "" + ok, out, err = run_cmd(["apt-get", "update"], timeout=60) + if not ok: + return False, f"apt-get update failed: {err}" + ok, out, err = run_cmd(["apt-get", "install", "-y", "debsecan"], timeout=120) + if not ok: + return False, f"apt-get install debsecan failed: {err}" + return True, "" + else: + # Vérifier dans le LXC + ok, _, _ = run_cmd(["pct", "exec", vmid, "--", "which", "debsecan"]) + if ok: + return True, "" + ok, out, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "update"], timeout=60) + if not ok: + return False, f"apt-get update failed in {vmid}: {err}" + ok, out, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "install", "-y", "debsecan"], timeout=120) + if not ok: + return False, f"apt-get install debsecan failed in {vmid}: {err}" + return True, "" + + +def scan_apt(target: Target) -> tuple[bool, list[dict[str, str]], str]: + if target.is_host: + cmd = ["apt", "list", "--upgradable"] + else: + cmd = ["pct", "exec", target.target_id, "--", "apt", "list", "--upgradable"] + + ok, stdout, stderr = run_cmd(cmd, timeout=120) + if not ok: + return False, [], stderr or stdout + + packages = [] + for line in stdout.splitlines(): + if not line.strip() or line.startswith("Listing"): + continue + parts = line.split("/") + if len(parts) < 2: + continue + name = parts[0].strip() + rest = parts[1] + # Format: apt list --upgradable -> name/arch version suite [upgradable from: oldver] + m = re.search(r"\[upgradable from:\s+([^\]]+)\]", line) + if m: + old_ver = m.group(1).strip() + # Essayer d'extraire la nouvelle version (avant le [upgradable from:]) + prefix = line.split("[upgradable from:")[0].strip() + ver_m = re.search(r"/\S+\s+(\S+)\s+\S+", prefix) + new_ver = ver_m.group(1) if ver_m else "?" + packages.append({ + "name": name, + "current": old_ver, + "new": new_ver, + "size": "-" + }) + else: + # fallback simple + packages.append({ + "name": name, + "current": "?", + "new": "?", + "size": "-" + }) + return True, packages, "" + + +def scan_cve(target: Target) -> tuple[bool, list[dict[str, str]], str]: + if target.is_host: + cmd = ["debsecan", "--format", "report", "--suite", "bookworm"] + else: + cmd = ["pct", "exec", target.target_id, "--", "debsecan", "--format", "report", "--suite", "bookworm"] + + ok, stdout, stderr = run_cmd(cmd, timeout=120) + if not ok: + return False, [], stderr or stdout + + cves = [] + for line in stdout.splitlines(): + # Format debsecan: CVE-XXXX-XXXX package [remote] [low] - description + m = re.match(r"(CVE-\d{4}-\d+)\s+(\S+)", line) + if m: + cve_id = m.group(1) + pkg = m.group(2) + cves.append({ + "id": cve_id, + "package": pkg, + "url": f"https://security-tracker.debian.org/tracker/{cve_id}" + }) + return True, cves, "" + + +async def scan_target(target: Target, result: ScanResult, progress_cb: Any) -> None: + """Scanne une cible (APT puis CVE) et écrit le cache.""" + result.status = "running" + await progress_cb() + + # Vérifier si LXC est allumé + if not target.is_host and not lxc_is_running(target.target_id): + result.status = "skipped" + result.apt_ok = False + result.cve_ok = False + write_cache(target.target_id, { + "timestamp": "", + "apt_count": 0, + "apt_packages": [], + "cve_count": 0, + "cve_list": [], + "error": "LXC éteint" + }) + await progress_cb() + return + + # Auto-install debsecan + debsecan_ok, debsecan_err = ensure_debsecan_installed(target.is_host, target.target_id) + + # Scan APT + apt_ok, apt_packages, apt_err = scan_apt(target) + result.apt_ok = apt_ok + result.apt_count = len(apt_packages) + result.apt_packages = apt_packages + await progress_cb() + + # Scan CVE + if debsecan_ok: + cve_ok, cve_list, cve_err = scan_cve(target) + result.cve_ok = cve_ok + result.cve_count = len(cve_list) + result.cve_list = cve_list + if not cve_ok: + result.error = cve_err + else: + result.cve_ok = False + result.cve_count = 0 + result.cve_list = [] + result.error = f"debsecan: {debsecan_err}" + await progress_cb() + + result.status = "done" if (result.apt_ok and result.cve_ok) else "error" + if result.error: + result.status = "error" + + write_cache(target.target_id if not target.is_host else "host", { + "timestamp": "", # sera rempli par l'appelant + "apt_count": result.apt_count, + "apt_packages": result.apt_packages, + "cve_count": result.cve_count, + "cve_list": result.cve_list, + "error": result.error + }) + await progress_cb() + + +async def run_full_scan(progress_cb: Any) -> list[ScanResult]: + targets = [Target(target_id="host", name="hote", is_host=True)] + get_lxc_list() + results = [ScanResult(target=t) for t in targets] + tasks = [scan_target(r.target, r, progress_cb) for r in results] + await asyncio.gather(*tasks) + return results diff --git a/full_updater/ui/__init__.py b/full_updater/ui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/full_updater/ui/confirm_modal.py b/full_updater/ui/confirm_modal.py new file mode 100644 index 0000000..08172e7 --- /dev/null +++ b/full_updater/ui/confirm_modal.py @@ -0,0 +1,50 @@ +from textual.screen import ModalScreen +from textual.widgets import Static, Button +from textual.containers import Vertical, Horizontal + + +class ConfirmModal(ModalScreen[bool]): + DEFAULT_CSS = """ + ConfirmModal { + align: center middle; + } + #modal-box { + width: 60; + height: auto; + border: thick $background 80%; + background: $surface; + padding: 1 2; + } + #modal-title { + text-align: center; + text-style: bold; + margin-bottom: 1; + } + #modal-msg { + margin-bottom: 1; + } + #modal-buttons { + height: auto; + layout: horizontal; + content-align: center middle; + } + """ + + def __init__(self, title: str, message: str): + super().__init__() + self.title = title + self.message = message + + def compose(self): + with Vertical(id="modal-box"): + yield Static(self.title, id="modal-title") + yield Static(self.message, id="modal-msg") + with Horizontal(id="modal-buttons"): + yield Button("Oui", id="modal-yes", variant="success") + yield Button("Non", id="modal-no", variant="error") + + def on_button_pressed(self, event: Button.Pressed): + if event.button.id == "modal-yes": + self.dismiss(True) + else: + self.dismiss(False) diff --git a/full_updater/ui/cve_table.py b/full_updater/ui/cve_table.py new file mode 100644 index 0000000..ae4a144 --- /dev/null +++ b/full_updater/ui/cve_table.py @@ -0,0 +1,72 @@ +from textual.widgets import DataTable, Button +from textual.containers import Vertical, Horizontal +from textual.message import Message + +try: + import pyperclip + PYPERCLIP_OK = True +except Exception: + PYPERCLIP_OK = False + + +class CVETable(Vertical): + DEFAULT_CSS = """ + CVETable { + padding: 1 2; + } + #cve-toolbar { + height: auto; + margin-bottom: 1; + } + DataTable { + height: 1fr; + } + """ + + class BackPressed(Message): + pass + + def __init__(self): + super().__init__() + self.table = None + self.urls = {} + + def compose(self): + with Horizontal(id="cve-toolbar"): + yield Button("⬅ Retour", id="cve-back") + self.table = DataTable(id="cve-table") + yield self.table + + def on_mount(self): + self.table.add_columns("CVE-ID", "Paquet", "Lien") + self.table.cursor_type = "row" + + def load_data(self, cves: list[dict]): + self.table.clear() + self.urls = {} + for i, cve in enumerate(cves): + cve_id = cve.get("id", "?") + pkg = cve.get("package", "?") + url = cve.get("url", "") + self.urls[i] = url + self.table.add_row(cve_id, pkg, url) + + def on_data_table_row_selected(self, event: DataTable.RowSelected): + row_key = event.row_key + row_index = list(self.table.rows.keys()).index(row_key) + url = self.urls.get(row_index, "") + if not url: + return + if PYPERCLIP_OK: + try: + pyperclip.copy(url) + self.notify(f"Lien copié : {url}", severity="information") + return + except Exception: + pass + # Fallback : afficher le lien dans une notification + self.notify(f"Lien (copie manuelle) : {url}", severity="warning") + + def on_button_pressed(self, event: Button.Pressed): + if event.button.id == "cve-back": + self.post_message(self.BackPressed()) diff --git a/full_updater/ui/loader.py b/full_updater/ui/loader.py new file mode 100644 index 0000000..2fc36c2 --- /dev/null +++ b/full_updater/ui/loader.py @@ -0,0 +1,61 @@ +from textual.screen import Screen +from textual.widgets import Static, ProgressBar +from textual.containers import Vertical, Container +from textual.reactive import reactive + + +class LoaderScreen(Screen): + BINDINGS = [("q", "quit", "Quitter")] + + progress: float = reactive(0.0) + status_lines: list[str] = reactive([]) + + def __init__(self, targets: list): + super().__init__() + self._targets = targets + self.status_lines = [self._fmt_line(t) for t in targets] + + def _fmt_line(self, target) -> str: + return f"{target.name}: 🟠 🟠" + + def compose(self): + with Container(id="loader-container"): + yield Static("Full Updater", id="loader-title") + yield ProgressBar(total=100.0, id="loader-bar") + yield Static(f"{self.progress:.0f} %", id="loader-percent") + with Vertical(id="loader-list"): + for line in self.status_lines: + yield Static(line, classes="loader-item") + + def watch_progress(self, value: float): + try: + bar = self.query_one("#loader-bar", ProgressBar) + bar.progress = value + pct = self.query_one("#loader-percent", Static) + pct.update(f"{value:.0f} %") + except Exception: + pass + + def watch_status_lines(self, lines: list): + try: + items = list(self.query(".loader-item")) + for i, item in enumerate(items): + if i < len(lines): + item.update(lines[i]) + except Exception: + pass + + def update_target_status(self, idx: int, status: str): + if idx < 0 or idx >= len(self._targets): + return + lines = list(self.status_lines) + t = self._targets[idx] + if status == "done": + lines[idx] = f"{t.name}: 🟢 🟢" + elif status == "error": + lines[idx] = f"{t.name}: 🔴 🔴" + elif status == "skipped": + lines[idx] = f"{t.name}: ⚪ ⚪" + else: + lines[idx] = f"{t.name}: 🟠 🟠" + self.status_lines = lines diff --git a/full_updater/ui/log_panel.py b/full_updater/ui/log_panel.py new file mode 100644 index 0000000..0ccfb15 --- /dev/null +++ b/full_updater/ui/log_panel.py @@ -0,0 +1,44 @@ +from textual.widgets import RichLog, Button +from textual.containers import Vertical, Horizontal +from textual.message import Message + + +class LogPanel(Vertical): + DEFAULT_CSS = """ + LogPanel { + padding: 1 2; + } + #log-toolbar { + height: auto; + margin-bottom: 1; + } + RichLog { + height: 1fr; + border: solid $primary; + } + """ + + class BackPressed(Message): + pass + + def __init__(self): + super().__init__() + self.log = None + + def compose(self): + with Horizontal(id="log-toolbar"): + yield Button("⬅ Retour", id="log-back") + self.log = RichLog(id="log-view", highlight=True) + yield self.log + + def write(self, line: str): + if self.log: + self.log.write(line) + + def clear(self): + if self.log: + self.log.clear() + + def on_button_pressed(self, event: Button.Pressed): + if event.button.id == "log-back": + self.post_message(self.BackPressed()) diff --git a/full_updater/ui/package_table.py b/full_updater/ui/package_table.py new file mode 100644 index 0000000..25f4fcd --- /dev/null +++ b/full_updater/ui/package_table.py @@ -0,0 +1,48 @@ +from textual.widgets import DataTable, Button +from textual.containers import Vertical, Horizontal +from textual.message import Message + + +class PackageTable(Vertical): + DEFAULT_CSS = """ + PackageTable { + padding: 1 2; + } + #pkg-toolbar { + height: auto; + margin-bottom: 1; + } + DataTable { + height: 1fr; + } + """ + + class BackPressed(Message): + pass + + def __init__(self): + super().__init__() + self.table = None + + def compose(self): + with Horizontal(id="pkg-toolbar"): + yield Button("⬅ Retour", id="pkg-back") + self.table = DataTable(id="pkg-table") + yield self.table + + def on_mount(self): + self.table.add_columns("Nom", "Version actuelle", "Nouvelle version", "Taille") + + def load_data(self, packages: list[dict]): + self.table.clear() + for pkg in packages: + self.table.add_row( + pkg.get("name", "?"), + pkg.get("current", "?"), + pkg.get("new", "?"), + pkg.get("size", "-") + ) + + def on_button_pressed(self, event: Button.Pressed): + if event.button.id == "pkg-back": + self.post_message(self.BackPressed()) diff --git a/full_updater/ui/sidebar.py b/full_updater/ui/sidebar.py new file mode 100644 index 0000000..88f0c22 --- /dev/null +++ b/full_updater/ui/sidebar.py @@ -0,0 +1,69 @@ +from textual.widgets import Static, ListView, ListItem, Label +from textual.containers import Vertical +from textual.message import Message + + +class Sidebar(Vertical): + DEFAULT_CSS = """ + Sidebar { + width: 28; + border-right: solid $primary-lighten-2; + padding: 0 1; + } + ListView { + height: 1fr; + border: none; + } + ListView > ListItem { + height: auto; + padding: 0 1; + } + ListView > ListItem.--highlight { + background: $primary-darken-1; + } + """ + + class TargetSelected(Message): + def __init__(self, target_id: str, name: str): + self.target_id = target_id + self.name = name + super().__init__() + + def __init__(self): + super().__init__() + self._items: list[tuple[str, str, ListItem]] = [] + + def compose(self): + yield Static("Targets", id="sidebar-title") + yield ListView(id="sidebar-list") + + def set_targets(self, targets: list): + lv = self.query_one("#sidebar-list", ListView) + lv.clear() + self._items = [] + for t in targets: + label = f"{t.name} 🟠 🟠" + item = ListItem(Label(label), id=f"target-{t.target_id}") + lv.append(item) + self._items.append((t.target_id, t.name, item)) + + def update_status(self, target_id: str, apt_ok: bool, cve_ok: bool, apt_count: int, cve_count: int, error: str, skipped: bool = False): + for tid, name, item in self._items: + if tid == target_id: + if skipped: + label = f"{name} ⚪ ⚪" + elif error: + label = f"{name} 🔴 🔴" + elif apt_count > 0 or cve_count > 0: + label = f"{name} {'🔴' if apt_count > 0 else '🟢'} {'🔴' if cve_count > 0 else '🟢'}" + else: + label = f"{name} 🟢 🟢" + item.children[0].update(label) + break + + def on_list_view_selected(self, event: ListView.Selected): + item = event.item + for tid, name, it in self._items: + if it == item: + self.post_message(self.TargetSelected(tid, name)) + break diff --git a/full_updater/ui/summary.py b/full_updater/ui/summary.py new file mode 100644 index 0000000..b012b9f --- /dev/null +++ b/full_updater/ui/summary.py @@ -0,0 +1,144 @@ +from textual.widgets import Static, Button +from textual.containers import Vertical, Horizontal +from textual.reactive import reactive +from textual.message import Message + + +class SummaryPanel(Vertical): + DEFAULT_CSS = """ + SummaryPanel { + padding: 1 2; + } + #summary-header { + height: auto; + layout: horizontal; + content-align: right middle; + } + #summary-cache { + width: auto; + margin-left: 2; + } + #summary-body { + height: auto; + margin-top: 1; + } + .summary-row { + height: auto; + margin: 1 0; + } + .summary-count { + color: $primary; + text-style: bold; + } + .summary-error { + color: $error; + text-style: bold; + } + """ + + class ReloadPressed(Message): + pass + + class UpgradePressed(Message): + def __init__(self, target_id: str, name: str): + self.target_id = target_id + self.name = name + super().__init__() + + class AptClicked(Message): + def __init__(self, target_id: str): + self.target_id = target_id + super().__init__() + + class CveClicked(Message): + def __init__(self, target_id: str): + self.target_id = target_id + super().__init__() + + cache_time = reactive("") + apt_count = reactive(0) + cve_count = reactive(0) + target_id = reactive("") + target_name = reactive("") + error_msg = reactive("") + is_skipped = reactive(False) + + def compose(self): + with Horizontal(id="summary-header"): + yield Button("🔄 Reload", id="btn-reload") + yield Static("", id="summary-cache") + + with Vertical(id="summary-body"): + yield Static("Sélectionnez une cible", id="summary-title") + yield Button("Mises à jour : -", id="btn-apt", classes="summary-row", variant="default") + yield Button("CVE : -", id="btn-cve", classes="summary-row", variant="default") + yield Static("", id="summary-error", classes="summary-row summary-error") + yield Button("📦 Mettre à jour", id="btn-upgrade", variant="primary") + + def watch_cache_time(self, value: str): + self.query_one("#summary-cache", Static).update(f"Cache : {value}" if value else "") + + def watch_apt_count(self, value: int): + self._update_display() + + def watch_cve_count(self, value: int): + self._update_display() + + def watch_error_msg(self, value: str): + self._update_display() + + def watch_is_skipped(self, value: bool): + self._update_display() + + def watch_target_name(self, value: str): + self.query_one("#summary-title", Static).update(value if value else "Sélectionnez une cible") + + def _update_display(self): + apt_btn = self.query_one("#btn-apt", Button) + cve_btn = self.query_one("#btn-cve", Button) + err_label = self.query_one("#summary-error", Static) + btn = self.query_one("#btn-upgrade", Button) + + if self.is_skipped: + apt_btn.label = "Mises à jour : LXC éteint" + cve_btn.label = "CVE : LXC éteint" + err_label.update("") + btn.disabled = True + apt_btn.disabled = True + cve_btn.disabled = True + return + + if self.error_msg: + apt_btn.label = f"Mises à jour : {self.apt_count}" + cve_btn.label = "CVE : ERREUR" + err_label.update(self.error_msg) + btn.disabled = False + apt_btn.disabled = self.apt_count == 0 + cve_btn.disabled = True + return + + apt_btn.label = f"Mises à jour : {self.apt_count}" + cve_btn.label = f"CVE : {self.cve_count}" + err_label.update("") + btn.disabled = False + apt_btn.disabled = self.apt_count == 0 + cve_btn.disabled = self.cve_count == 0 + + def set_target(self, target_id: str, name: str, apt_count: int, cve_count: int, error: str, skipped: bool, cache_time: str): + self.target_id = target_id + self.target_name = name + self.apt_count = apt_count + self.cve_count = cve_count + self.error_msg = error + self.is_skipped = skipped + self.cache_time = cache_time + + def on_button_pressed(self, event: Button.Pressed): + if event.button.id == "btn-reload": + self.post_message(self.ReloadPressed()) + elif event.button.id == "btn-upgrade": + self.post_message(self.UpgradePressed(self.target_id, self.target_name)) + elif event.button.id == "btn-apt": + self.post_message(self.AptClicked(self.target_id)) + elif event.button.id == "btn-cve": + self.post_message(self.CveClicked(self.target_id)) diff --git a/install.sh b/install.sh new file mode 100644 index 0000000..cc53445 --- /dev/null +++ b/install.sh @@ -0,0 +1,30 @@ +#!/bin/bash +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +VENV_DIR="$SCRIPT_DIR/.venv" +BIN_PATH="/usr/local/bin/fullupdater" + +echo "=== Full Updater - Installation ===" + +# Créer le venv +if [ ! -d "$VENV_DIR" ]; then + echo "Création du venv..." + python3 -m venv "$VENV_DIR" +fi + +echo "Installation des dépendances..." +"$VENV_DIR/bin/pip" install --upgrade pip +"$VENV_DIR/bin/pip" install -r "$SCRIPT_DIR/requirements.txt" + +# Créer le wrapper +cat > "$BIN_PATH" << 'EOF' +#!/bin/bash +SCRIPT_DIR="/opt/full-updater" +source "$SCRIPT_DIR/.venv/bin/activate" +python3 -m full_updater "$@" +EOF + +chmod +x "$BIN_PATH" + +echo "Installation terminée. Lancez 'fullupdater'." diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3b969d0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +textual>=0.50.0 +pyperclip>=1.8.0