refactor: sequential scan, screens for details, new sidebar layout
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s

- Scan sequential with asyncio.to_thread for proper Textual integration
- Use push_screen/pop_screen for package/CVE lists (no more mount/remove)
- Sidebar now shows 'APT X CVE X' with right-aligned indicators
- Loader uses black circles () for pending tasks
- Removed unused package_table.py and cve_table.py
This commit is contained in:
enzo 2026-05-13 01:36:45 +02:00
parent 2237d83ddd
commit 76eb75d4a6
7 changed files with 150 additions and 319 deletions

View file

@ -4,21 +4,18 @@ from datetime import datetime
from textual.app import App, ComposeResult from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical from textual.containers import Horizontal, Vertical
from textual.reactive import reactive from textual.reactive import reactive
from textual.worker import get_current_worker
from textual import work from textual import work
from full_updater.backend.cache import ensure_cache_dir, read_cache, get_cache_timestamp from full_updater.backend.cache import ensure_cache_dir, read_cache, get_cache_timestamp
from full_updater.backend.scanner import ( from full_updater.backend.scanner import (
Target, ScanResult, get_lxc_list, lxc_is_running, Target, ScanResult, get_lxc_list, lxc_is_running,
ensure_debsecan_installed, scan_apt, scan_cve, write_cache, ensure_debsecan_installed, scan_apt, scan_cve, write_cache, scan_target
run_full_scan_async
) )
from full_updater.backend.executor import UpgradeExecutor from full_updater.backend.executor import UpgradeExecutor
from full_updater.ui.loader import LoaderScreen from full_updater.ui.loader import LoaderScreen
from full_updater.ui.sidebar import Sidebar from full_updater.ui.sidebar import Sidebar
from full_updater.ui.summary import SummaryPanel from full_updater.ui.summary import SummaryPanel
from full_updater.ui.package_table import PackageTable from full_updater.ui.detail_screens import PackageListScreen, CVEListScreen
from full_updater.ui.cve_table import CVETable
from full_updater.ui.log_panel import LogPanel from full_updater.ui.log_panel import LogPanel
from full_updater.ui.confirm_modal import ConfirmModal from full_updater.ui.confirm_modal import ConfirmModal
@ -27,7 +24,7 @@ class FullUpdaterApp(App):
CSS = """ CSS = """
Screen { align: center middle; } Screen { align: center middle; }
#main-layout { layout: horizontal; height: 100%; } #main-layout { layout: horizontal; height: 100%; }
Sidebar { width: 30; } Sidebar { width: 42; }
#content { width: 1fr; } #content { width: 1fr; }
""" """
@ -62,25 +59,19 @@ class FullUpdaterApp(App):
total = len(self.targets) total = len(self.targets)
completed = 0 completed = 0
async def progress_cb(): def progress_cb():
nonlocal completed nonlocal completed
completed += 1 completed += 1
pct = min(100.0, (completed / (total * 4)) * 100) pct = min(100.0, (completed / (total * 4)) * 100)
self._update_loader(pct) self._update_loader(pct)
def on_result(result: ScanResult): for idx, target in enumerate(self.targets):
self.results[result.target.target_id] = result self._update_loader_status(idx, "running")
idx = 0 result = await asyncio.to_thread(scan_target, target, progress_cb)
for i, t in enumerate(self.targets): self.results[target.target_id] = result
if t.target_id == result.target.target_id:
idx = i
break
self._update_loader_status(idx, result.status) self._update_loader_status(idx, result.status)
self._update_sidebar(result.target.target_id, result) self._update_sidebar(target.target_id, result)
results = await run_full_scan_async(progress_cb)
for r in results:
on_result(r)
self._finish_scan() self._finish_scan()
def _update_loader(self, pct: float): def _update_loader(self, pct: float):
@ -116,12 +107,9 @@ class FullUpdaterApp(App):
res.error, res.error,
res.status == "skipped" res.status == "skipped"
) )
# Forcer le rafraichissement de l'interface
sidebar.refresh() sidebar.refresh()
if self.targets: if self.targets:
self._select_target(self.targets[0].target_id) self._select_target(self.targets[0].target_id)
summary = self.query_one(SummaryPanel)
summary.refresh()
def _select_target(self, target_id: str): def _select_target(self, target_id: str):
self.selected_target = target_id self.selected_target = target_id
@ -151,8 +139,8 @@ class FullUpdaterApp(App):
if self.selected_target: if self.selected_target:
self._reload_target(self.selected_target) self._reload_target(self.selected_target)
@work(thread=True) @work
def _reload_target(self, target_id: str): async def _reload_target(self, target_id: str):
target = None target = None
for t in self.targets: for t in self.targets:
if t.target_id == target_id: if t.target_id == target_id:
@ -167,32 +155,18 @@ class FullUpdaterApp(App):
self.results[target.target_id] = result self.results[target.target_id] = result
write_cache(target.target_id, { write_cache(target.target_id, {
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"apt_count": 0, "apt_count": 0, "apt_packages": [],
"apt_packages": [], "cve_count": 0, "cve_list": [],
"cve_count": 0,
"cve_list": [],
"error": "LXC éteint" "error": "LXC éteint"
}) })
self.call_from_thread(self._update_sidebar, target.target_id, result) self._update_sidebar(target.target_id, result)
self.call_from_thread(self._select_target, target.target_id) self._select_target(target.target_id)
return return
ensure_debsecan_installed(target.is_host, target.target_id) def dummy_progress():
apt_ok, apt_packages, apt_err = scan_apt(target) pass
cve_ok, cve_list, cve_err = scan_cve(target)
error = apt_err or cve_err
result = ScanResult( result = await asyncio.to_thread(scan_target, target, dummy_progress)
target=target,
apt_ok=apt_ok,
cve_ok=cve_ok,
apt_count=len(apt_packages),
cve_count=len(cve_list),
apt_packages=apt_packages,
cve_list=cve_list,
error=error,
status="done" if (apt_ok and cve_ok) else "error"
)
self.results[target.target_id] = result self.results[target.target_id] = result
cache_id = "host" if target.is_host else target.target_id cache_id = "host" if target.is_host else target.target_id
write_cache(cache_id, { write_cache(cache_id, {
@ -203,8 +177,8 @@ class FullUpdaterApp(App):
"cve_list": result.cve_list, "cve_list": result.cve_list,
"error": result.error "error": result.error
}) })
self.call_from_thread(self._update_sidebar, target.target_id, result) self._update_sidebar(target.target_id, result)
self.call_from_thread(self._select_target, target.target_id) self._select_target(target.target_id)
def on_summary_panel_upgrade_pressed(self, event: SummaryPanel.UpgradePressed): def on_summary_panel_upgrade_pressed(self, event: SummaryPanel.UpgradePressed):
self.push_screen( self.push_screen(
@ -253,11 +227,7 @@ class FullUpdaterApp(App):
pkgs = data.get("apt_packages", []) pkgs = data.get("apt_packages", [])
if not pkgs: if not pkgs:
return return
table = PackageTable() self.push_screen(PackageListScreen(pkgs))
self.mount(table)
self.query_one("#main-layout").display = False
table.load_data(pkgs)
table.focus()
def on_summary_panel_cve_clicked(self, event: SummaryPanel.CveClicked): def on_summary_panel_cve_clicked(self, event: SummaryPanel.CveClicked):
cache_id = "host" if event.target_id == "host" else event.target_id cache_id = "host" if event.target_id == "host" else event.target_id
@ -265,18 +235,4 @@ class FullUpdaterApp(App):
cves = data.get("cve_list", []) cves = data.get("cve_list", [])
if not cves: if not cves:
return return
table = CVETable() self.push_screen(CVEListScreen(cves))
self.mount(table)
self.query_one("#main-layout").display = False
table.load_data(cves)
table.focus()
def on_package_table_back_pressed(self, event: PackageTable.BackPressed):
self.query_one("#main-layout").display = True
for widget in self.query(PackageTable):
widget.remove()
def on_cve_table_back_pressed(self, event: CVETable.BackPressed):
self.query_one("#main-layout").display = True
for widget in self.query(CVETable):
widget.remove()

View file

@ -1,17 +1,16 @@
import asyncio
import re import re
import subprocess import subprocess
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any from datetime import datetime
from typing import Callable
from full_updater.backend.cache import write_cache from full_updater.backend.cache import write_cache
from datetime import datetime
@dataclass @dataclass
class Target: class Target:
target_id: str # 'host' ou '100' target_id: str
name: str # 'hote' ou '100 traefik' name: str
is_host: bool is_host: bool
@ -25,14 +24,12 @@ class ScanResult:
apt_packages: list[dict[str, str]] = field(default_factory=list) apt_packages: list[dict[str, str]] = field(default_factory=list)
cve_list: list[dict[str, str]] = field(default_factory=list) cve_list: list[dict[str, str]] = field(default_factory=list)
error: str = "" error: str = ""
status: str = "pending" # pending | running | done | error | skipped status: str = "pending"
def run_cmd(cmd: list[str], timeout: int = 120) -> tuple[bool, str, str]: def run_cmd(cmd: list[str], timeout: int = 120) -> tuple[bool, str, str]:
try: try:
proc = subprocess.run( proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=False)
cmd, capture_output=True, text=True, timeout=timeout, check=False
)
return proc.returncode == 0, proc.stdout, proc.stderr return proc.returncode == 0, proc.stdout, proc.stderr
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
return False, "", "Command timed out" return False, "", "Command timed out"
@ -41,7 +38,7 @@ def run_cmd(cmd: list[str], timeout: int = 120) -> tuple[bool, str, str]:
def get_lxc_list() -> list[Target]: def get_lxc_list() -> list[Target]:
ok, stdout, stderr = run_cmd(["pct", "list"]) ok, stdout, _ = run_cmd(["pct", "list"])
if not ok: if not ok:
return [] return []
targets = [] targets = []
@ -56,43 +53,32 @@ def get_lxc_list() -> list[Target]:
def lxc_is_running(vmid: str) -> bool: def lxc_is_running(vmid: str) -> bool:
ok, stdout, _ = run_cmd(["pct", "status", vmid]) ok, stdout, _ = run_cmd(["pct", "status", vmid])
if ok and "running" in stdout.lower(): return ok and "running" in stdout.lower()
return True
return False
def ensure_debsecan_installed(is_host: bool, vmid: str = "") -> tuple[bool, str]: def ensure_debsecan_installed(is_host: bool, vmid: str = "") -> tuple[bool, str]:
"""Retourne (ok, error_msg)"""
if is_host: if is_host:
ok, _, _ = run_cmd(["which", "debsecan"]) ok, _, _ = run_cmd(["which", "debsecan"])
if ok: if ok:
return True, "" return True, ""
ok, out, err = run_cmd(["apt-get", "update"], timeout=60) ok, _, err = run_cmd(["apt-get", "update"], timeout=60)
if not ok: if not ok:
return False, f"apt-get update failed: {err}" return False, f"apt-get update failed: {err}"
ok, out, err = run_cmd(["apt-get", "install", "-y", "debsecan"], timeout=120) ok, _, err = run_cmd(["apt-get", "install", "-y", "debsecan"], timeout=120)
if not ok: return (True, "") if ok else (False, f"apt-get install debsecan failed: {err}")
return False, f"apt-get install debsecan failed: {err}"
return True, ""
else: else:
ok, _, _ = run_cmd(["pct", "exec", vmid, "--", "which", "debsecan"]) ok, _, _ = run_cmd(["pct", "exec", vmid, "--", "which", "debsecan"])
if ok: if ok:
return True, "" return True, ""
ok, out, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "update"], timeout=60) ok, _, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "update"], timeout=60)
if not ok: if not ok:
return False, f"apt-get update failed in {vmid}: {err}" return False, f"apt-get update failed in {vmid}: {err}"
ok, out, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "install", "-y", "debsecan"], timeout=120) ok, _, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "install", "-y", "debsecan"], timeout=120)
if not ok: return (True, "") if ok else (False, f"apt-get install debsecan failed in {vmid}: {err}")
return False, f"apt-get install debsecan failed in {vmid}: {err}"
return True, ""
def scan_apt(target: Target) -> tuple[bool, list[dict[str, str]], str]: def scan_apt(target: Target) -> tuple[bool, list[dict[str, str]], str]:
if target.is_host: cmd = ["apt", "list", "--upgradable"] if target.is_host else ["pct", "exec", target.target_id, "--", "apt", "list", "--upgradable"]
cmd = ["apt", "list", "--upgradable"]
else:
cmd = ["pct", "exec", target.target_id, "--", "apt", "list", "--upgradable"]
ok, stdout, stderr = run_cmd(cmd, timeout=120) ok, stdout, stderr = run_cmd(cmd, timeout=120)
if not ok: if not ok:
return False, [], stderr or stdout return False, [], stderr or stdout
@ -105,35 +91,20 @@ def scan_apt(target: Target) -> tuple[bool, list[dict[str, str]], str]:
if len(parts) < 2: if len(parts) < 2:
continue continue
name = parts[0].strip() name = parts[0].strip()
# Essayer d'extraire la nouvelle version et l'ancienne
m = re.search(r"\[upgradable from:\s+([^\]]+)\]", line) m = re.search(r"\[upgradable from:\s+([^\]]+)\]", line)
if m: if m:
old_ver = m.group(1).strip() old_ver = m.group(1).strip()
prefix = line.split("[upgradable from:")[0].strip() prefix = line.split("[upgradable from:")[0].strip()
ver_m = re.search(r"/\S+\s+(\S+)\s+\S+", prefix) ver_m = re.search(r"/\S+\s+(\S+)\s+\S+", prefix)
new_ver = ver_m.group(1) if ver_m else "?" new_ver = ver_m.group(1) if ver_m else "?"
packages.append({ packages.append({"name": name, "current": old_ver, "new": new_ver, "size": "-"})
"name": name,
"current": old_ver,
"new": new_ver,
"size": "-"
})
else: else:
packages.append({ packages.append({"name": name, "current": "?", "new": "?", "size": "-"})
"name": name,
"current": "?",
"new": "?",
"size": "-"
})
return True, packages, "" return True, packages, ""
def scan_cve(target: Target) -> tuple[bool, list[dict[str, str]], str]: def scan_cve(target: Target) -> tuple[bool, list[dict[str, str]], str]:
if target.is_host: cmd = ["debsecan", "--format", "report", "--suite", "bookworm"] if target.is_host else ["pct", "exec", target.target_id, "--", "debsecan", "--format", "report", "--suite", "bookworm"]
cmd = ["debsecan", "--format", "report", "--suite", "bookworm"]
else:
cmd = ["pct", "exec", target.target_id, "--", "debsecan", "--format", "report", "--suite", "bookworm"]
ok, stdout, stderr = run_cmd(cmd, timeout=120) ok, stdout, stderr = run_cmd(cmd, timeout=120)
if not ok: if not ok:
return False, [], stderr or stdout return False, [], stderr or stdout
@ -142,24 +113,18 @@ def scan_cve(target: Target) -> tuple[bool, list[dict[str, str]], str]:
for line in stdout.splitlines(): for line in stdout.splitlines():
m = re.match(r"(CVE-\d{4}-\d+)\s+(\S+)", line) m = re.match(r"(CVE-\d{4}-\d+)\s+(\S+)", line)
if m: if m:
cve_id = m.group(1) cves.append({"id": m.group(1), "package": m.group(2), "url": f"https://security-tracker.debian.org/tracker/{m.group(1)}"})
pkg = m.group(2)
cves.append({
"id": cve_id,
"package": pkg,
"url": f"https://security-tracker.debian.org/tracker/{cve_id}"
})
return True, cves, "" return True, cves, ""
def scan_single_target(target: Target, progress_cb: Any) -> ScanResult: def scan_target(target: Target, progress_cb: Callable) -> ScanResult:
"""Scanne une cible (APT + CVE) et retourne le résultat."""
result = ScanResult(target=target) result = ScanResult(target=target)
result.status = "running" result.status = "running"
progress_cb() progress_cb()
if not target.is_host and not lxc_is_running(target.target_id): if not target.is_host and not lxc_is_running(target.target_id):
result.status = "skipped" result.status = "skipped"
progress_cb(); progress_cb(); progress_cb(); progress_cb()
return result return result
debsecan_ok, debsecan_err = ensure_debsecan_installed(target.is_host, target.target_id) debsecan_ok, debsecan_err = ensure_debsecan_installed(target.is_host, target.target_id)
@ -187,8 +152,7 @@ def scan_single_target(target: Target, progress_cb: Any) -> ScanResult:
if result.error: if result.error:
result.status = "error" result.status = "error"
cache_id = "host" if target.is_host else target.target_id write_cache("host" if target.is_host else target.target_id, {
write_cache(cache_id, {
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"apt_count": result.apt_count, "apt_count": result.apt_count,
"apt_packages": result.apt_packages, "apt_packages": result.apt_packages,
@ -198,68 +162,3 @@ def scan_single_target(target: Target, progress_cb: Any) -> ScanResult:
}) })
progress_cb() progress_cb()
return result return result
async def scan_single_target_async(target: Target, progress_cb: Any) -> ScanResult:
"""Version async de scan_single_target (pour asyncio.gather)."""
result = ScanResult(target=target)
result.status = "running"
await progress_cb()
if not target.is_host and not lxc_is_running(target.target_id):
result.status = "skipped"
await progress_cb()
await progress_cb()
await progress_cb()
await progress_cb()
return result
debsecan_ok, debsecan_err = await asyncio.to_thread(ensure_debsecan_installed, target.is_host, target.target_id)
await progress_cb()
apt_ok, apt_packages, apt_err = await asyncio.to_thread(scan_apt, target)
result.apt_ok = apt_ok
result.apt_count = len(apt_packages)
result.apt_packages = apt_packages
await progress_cb()
if debsecan_ok:
cve_ok, cve_list, cve_err = await asyncio.to_thread(scan_cve, target)
result.cve_ok = cve_ok
result.cve_count = len(cve_list)
result.cve_list = cve_list
if not cve_ok:
result.error = cve_err
else:
result.cve_ok = False
result.error = f"debsecan: {debsecan_err}"
await progress_cb()
result.status = "done" if (result.apt_ok and result.cve_ok) else "error"
if result.error:
result.status = "error"
cache_id = "host" if target.is_host else target.target_id
write_cache(cache_id, {
"timestamp": datetime.now().isoformat(),
"apt_count": result.apt_count,
"apt_packages": result.apt_packages,
"cve_count": result.cve_count,
"cve_list": result.cve_list,
"error": result.error
})
await progress_cb()
return result
async def run_full_scan_async(progress_cb: Any) -> list[ScanResult]:
targets = [Target(target_id="host", name="hote", is_host=True)] + get_lxc_list()
coros = [scan_single_target_async(t, progress_cb) for t in targets]
results = await asyncio.gather(*coros, return_exceptions=True)
out = []
for r in results:
if isinstance(r, Exception):
out.append(ScanResult(target=Target(target_id="unknown", name="unknown", is_host=False), status="error", error=str(r)))
else:
out.append(r)
return out

View file

@ -1,69 +0,0 @@
from textual.widgets import DataTable, Button
from textual.containers import Vertical, Horizontal
from textual.message import Message
try:
import pyperclip
PYPERCLIP_OK = True
except Exception:
PYPERCLIP_OK = False
class CVETable(Vertical):
DEFAULT_CSS = """
CVETable {
padding: 1 2;
}
#cve-toolbar {
height: auto;
margin-bottom: 1;
}
DataTable {
height: 1fr;
}
"""
class BackPressed(Message):
pass
def __init__(self):
super().__init__()
self.table = DataTable(id="cve-table")
self.urls = {}
def compose(self):
with Horizontal(id="cve-toolbar"):
yield Button("⬅ Retour", id="cve-back")
self.table.add_columns("CVE-ID", "Paquet", "Lien")
self.table.cursor_type = "row"
yield self.table
def load_data(self, cves: list[dict]):
self.table.clear()
self.urls = {}
for i, cve in enumerate(cves):
cve_id = cve.get("id", "?")
pkg = cve.get("package", "?")
url = cve.get("url", "")
self.urls[i] = url
self.table.add_row(cve_id, pkg, url)
def on_data_table_row_selected(self, event: DataTable.RowSelected):
row_key = event.row_key
row_index = list(self.table.rows.keys()).index(row_key)
url = self.urls.get(row_index, "")
if not url:
return
if PYPERCLIP_OK:
try:
pyperclip.copy(url)
self.notify(f"Lien copié : {url}", severity="information")
return
except Exception:
pass
# Fallback : afficher le lien dans une notification
self.notify(f"Lien (copie manuelle) : {url}", severity="warning")
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "cve-back":
self.post_message(self.BackPressed())

View file

@ -0,0 +1,80 @@
from textual.screen import Screen
from textual.widgets import DataTable, Button, Static
from textual.containers import Vertical, Horizontal
try:
import pyperclip
PYPERCLIP_OK = True
except Exception:
PYPERCLIP_OK = False
class PackageListScreen(Screen):
BINDINGS = [("b", "back", "Retour")]
def __init__(self, packages: list[dict]):
super().__init__()
self.packages = packages
def compose(self):
with Vertical():
with Horizontal(id="pkg-toolbar"):
yield Button("⬅ Retour", id="pkg-back", variant="default")
table = DataTable(id="pkg-table")
table.add_columns("Nom", "Version actuelle", "Nouvelle version", "Taille")
for pkg in self.packages:
table.add_row(pkg.get("name", "?"), pkg.get("current", "?"), pkg.get("new", "?"), pkg.get("size", "-"))
yield table
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "pkg-back":
self.app.pop_screen()
def action_back(self):
self.app.pop_screen()
class CVEListScreen(Screen):
BINDINGS = [("b", "back", "Retour")]
def __init__(self, cves: list[dict]):
super().__init__()
self.cves = cves
self.urls = {}
def compose(self):
with Vertical():
with Horizontal(id="cve-toolbar"):
yield Button("⬅ Retour", id="cve-back", variant="default")
table = DataTable(id="cve-table")
table.add_columns("CVE-ID", "Paquet", "Lien")
table.cursor_type = "row"
for i, cve in enumerate(self.cves):
cve_id = cve.get("id", "?")
pkg = cve.get("package", "?")
url = cve.get("url", "")
self.urls[i] = url
table.add_row(cve_id, pkg, url)
yield table
def on_data_table_row_selected(self, event: DataTable.RowSelected):
row_key = event.row_key
row_index = list(self.query_one("#cve-table").rows.keys()).index(row_key)
url = self.urls.get(row_index, "")
if not url:
return
if PYPERCLIP_OK:
try:
pyperclip.copy(url)
self.notify(f"Lien copié : {url}", severity="information")
return
except Exception:
pass
self.notify(f"Lien (copie manuelle) : {url}", severity="warning")
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "cve-back":
self.app.pop_screen()
def action_back(self):
self.app.pop_screen()

View file

@ -16,7 +16,7 @@ class LoaderScreen(Screen):
self.status_lines = [self._fmt_line(t) for t in targets] self.status_lines = [self._fmt_line(t) for t in targets]
def _fmt_line(self, target) -> str: def _fmt_line(self, target) -> str:
return f"{target.name}: 🟠 🟠" return f"{target.name:20} ⚫ ⚫"
def compose(self): def compose(self):
with Container(id="loader-container"): with Container(id="loader-container"):
@ -29,10 +29,8 @@ class LoaderScreen(Screen):
def watch_progress(self, value: float): def watch_progress(self, value: float):
try: try:
bar = self.query_one("#loader-bar", ProgressBar) self.query_one("#loader-bar", ProgressBar).progress = value
bar.progress = value self.query_one("#loader-percent", Static).update(f"{value:.0f} %")
pct = self.query_one("#loader-percent", Static)
pct.update(f"{value:.0f} %")
except Exception: except Exception:
pass pass
@ -51,11 +49,11 @@ class LoaderScreen(Screen):
lines = list(self.status_lines) lines = list(self.status_lines)
t = self._targets[idx] t = self._targets[idx]
if status == "done": if status == "done":
lines[idx] = f"{t.name}: 🟢 🟢" lines[idx] = f"{t.name:20} 🟢 🟢"
elif status == "error": elif status == "error":
lines[idx] = f"{t.name}: 🔴 🔴" lines[idx] = f"{t.name:20} 🔴 🔴"
elif status == "skipped": elif status == "skipped":
lines[idx] = f"{t.name}: ⚪ ⚪" lines[idx] = f"{t.name:20} ⚪ ⚪"
else: else:
lines[idx] = f"{t.name}: 🟠 🟠" lines[idx] = f"{t.name:20} 🟠 🟠"
self.status_lines = lines self.status_lines = lines

View file

@ -1,45 +0,0 @@
from textual.widgets import DataTable, Button
from textual.containers import Vertical, Horizontal
from textual.message import Message
class PackageTable(Vertical):
DEFAULT_CSS = """
PackageTable {
padding: 1 2;
}
#pkg-toolbar {
height: auto;
margin-bottom: 1;
}
DataTable {
height: 1fr;
}
"""
class BackPressed(Message):
pass
def __init__(self):
super().__init__()
self.table = DataTable(id="pkg-table")
def compose(self):
with Horizontal(id="pkg-toolbar"):
yield Button("⬅ Retour", id="pkg-back")
self.table.add_columns("Nom", "Version actuelle", "Nouvelle version", "Taille")
yield self.table
def load_data(self, packages: list[dict]):
self.table.clear()
for pkg in packages:
self.table.add_row(
pkg.get("name", "?"),
pkg.get("current", "?"),
pkg.get("new", "?"),
pkg.get("size", "-")
)
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "pkg-back":
self.post_message(self.BackPressed())

View file

@ -6,7 +6,7 @@ from textual.message import Message
class Sidebar(Vertical): class Sidebar(Vertical):
DEFAULT_CSS = """ DEFAULT_CSS = """
Sidebar { Sidebar {
width: 28; width: 40;
border-right: solid $primary-lighten-2; border-right: solid $primary-lighten-2;
padding: 0 1; padding: 0 1;
} }
@ -42,7 +42,8 @@ class Sidebar(Vertical):
lv.clear() lv.clear()
self._items = [] self._items = []
for t in targets: for t in targets:
label = f"{t.name} 🟠 🟠" # Format: "100 traefik APT ⚫ CVE ⚫"
label = f"{t.name:20} APT ⚫ CVE ⚫"
item = ListItem(Label(label), id=f"target-{t.target_id}") item = ListItem(Label(label), id=f"target-{t.target_id}")
lv.append(item) lv.append(item)
self._items.append((t.target_id, t.name, item)) self._items.append((t.target_id, t.name, item))
@ -51,13 +52,24 @@ class Sidebar(Vertical):
for tid, name, item in self._items: for tid, name, item in self._items:
if tid == target_id: if tid == target_id:
if skipped: if skipped:
label = f"{name} ⚪ ⚪" apt_color = ""
cve_color = ""
elif error: elif error:
label = f"{name} 🔴 🔴" apt_color = "🔴"
elif apt_count > 0 or cve_count > 0: cve_color = "🔴"
label = f"{name} {'🔴' if apt_count > 0 else '🟢'} {'🔴' if cve_count > 0 else '🟢'}" elif apt_count > 0:
apt_color = "🔴"
else: else:
label = f"{name} 🟢 🟢" apt_color = "🟢"
if error and cve_count == 0:
cve_color = "🔴"
elif cve_count > 0:
cve_color = "🔴"
else:
cve_color = "🟢"
label = f"{name:20} APT {apt_color} CVE {cve_color}"
item.children[0].update(label) item.children[0].update(label)
break break