Compare commits

..

27 commits

Author SHA1 Message Date
11ada690a1 fix(cve): enrich CVEs with severity/vector from Debian Security Tracker HTML
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- debsecan default format has no [remote|local] or [severity] flags
- Now fetches severity and vector from security-tracker.debian.org HTML
- scan_cve uses default format (no --format report)
- enrich_cves replaces filter_actionable_cves, adds severity+vector
2026-05-13 04:33:05 +02:00
9107009370 fix(cve): robust parsing of [remote|local] and [severity] flags
All checks were successful
Build and Release .deb / build-deb (push) Successful in 22s
- Use re.findall to extract all bracketed flags instead of positional regex
- Fixes issue where optional groups were not captured correctly
2026-05-13 04:23:45 +02:00
13d826ecd2 feat(cve): add severity and vector columns to CVE list
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- Parse [remote|local] and [severity] from debsecan output
- Display Severity and Vector columns in CVEListScreen
- Severity values: unimportant, low, medium, high, critical
2026-05-13 04:20:14 +02:00
721e677fa6 fix(os): use NAME directly, avoid double 'GNU/Linux'
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- NAME already contains 'Debian GNU/Linux', don't append it again
- With DEBIAN_VERSION_FULL: NAME + DEBIAN_VERSION_FULL + (codename)
- Without: NAME + VERSION_ID + (codename)
- Fixes duplication bug
2026-05-13 04:15:57 +02:00
c1f462d209 fix(os): parse /etc/os-release line-by-line instead of regex
All checks were successful
Build and Release .deb / build-deb (push) Successful in 22s
- pct exec returns stdout differently than local cat
- Line-by-line parsing is more robust across LXC boundaries
- Handles quotes correctly
2026-05-13 04:10:01 +02:00
f0f59f0468 fix(os): read DEBIAN_VERSION_FULL from /etc/os-release first
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- Check DEBIAN_VERSION_FULL (ex: 13.4) as primary source
- Fallback to VERSION, then VERSION_ID, then /etc/debian_version
2026-05-13 04:05:45 +02:00
d90f74dd8f fix(os): use VERSION_ID from /etc/os-release for full version number in LXC
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
2026-05-13 03:59:47 +02:00
2d339cc397 feat(os): show full version number from /etc/debian_version
All checks were successful
Build and Release .deb / build-deb (push) Successful in 22s
- Use lsb_release -is/-cs for name and codename
- Use /etc/debian_version for full version (e.g. 12.9)
- Display format: 'Debian GNU/Linux 12.9 (bookworm)'
2026-05-13 03:52:10 +02:00
6e707da98d feat(ui): display OS name and version in SummaryPanel
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- Add scan_os() to detect OS via lsb_release or /etc/os-release
- Store os_info in cache and ScanResult
- Display OS info in SummaryPanel below the target name
2026-05-13 03:07:28 +02:00
3bb213a00d fix(cve): parse HTML instead of non-existent JSON API
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- Debian Security Tracker has no public JSON API for individual CVEs
- Now fetches and parses the HTML page directly
- Searches for 'bookworm ... fixed' pattern in the vulnerability table
- Cache files changed from .json to .html
2026-05-13 02:56:16 +02:00
9c4e40505b fix(summary): enable CVE button when non-fixable CVEs exist
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- Use cve_total instead of cve_count to determine button disabled state
- Allows viewing all CVEs even when none are fixable
2026-05-13 02:38:58 +02:00
86eda73eb9 feat(cve): show all CVEs with fixable indicator in list screen
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- filter_actionable_cves now marks all CVEs with 'fixable' boolean
- cve_list in cache contains ALL CVEs (not just actionable ones)
- CVEListScreen adds 'Corrigeable' column with 🟢/🔴 indicator
- Sidebar counter still shows only actionable CVEs (cve_count)
2026-05-13 02:35:15 +02:00
e22f416500 feat(cve): filter actionable CVEs via Debian Security Tracker API
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- Add filter_actionable_cves() that queries security-tracker.debian.org
- Cache API responses in /tmp/full-updater-cache/cve-api/
- Use ThreadPoolExecutor(max_workers=10) for parallel API calls
- cve_count now shows only actionable CVEs (with fixed_version)
- cve_total stored for info, shown as 'CVE: X (Y non corrigeables)'
2026-05-13 02:27:22 +02:00
af9e061ab5 fix(upgrade): remove call_from_thread in async worker, call log_panel.write directly
All checks were successful
Build and Release .deb / build-deb (push) Successful in 22s
2026-05-13 02:11:08 +02:00
eed84f36e8 fix(log_panel): rename self.log to self._log_widget to avoid Textual property conflict
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
2026-05-13 02:07:25 +02:00
f4db16327f fix(cache): purge cache only once at startup, not on every write
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- Add clear_cache() called once in on_mount
- ensure_cache_dir() no longer deletes existing files
- This fixes the issue where only the last target's cache survived
- Add widget.refresh() calls to force UI updates
2026-05-13 02:00:48 +02:00
8f623e1df6 fix(ui): remove reactive vars in SummaryPanel, use direct widget update
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- Replace reactive attributes with direct widget updates in set_target
- Delay _select_target by 0.1s after pop_screen to ensure DOM is stable
- Remove summary.refresh() calls that interfere with direct updates
2026-05-13 01:54:57 +02:00
07e84ae69f fix(ui): use DEFAULT_CSS with selectors in detail screens
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- Replace inline CSS string with DEFAULT_CSS class attribute
- Add proper Textual CSS selectors (ScreenName, #toolbar, DataTable)
- Remove invalid align at root level
2026-05-13 01:48:00 +02:00
1715a6cf01 fix(ui): add proper CSS layout for detail screens
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- Add dock: top and fixed height for toolbar buttons
- DataTable now fills remaining space with height: 1fr
- Add borders and padding for better visual separation
2026-05-13 01:44:28 +02:00
76eb75d4a6 refactor: sequential scan, screens for details, new sidebar layout
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- Scan sequential with asyncio.to_thread for proper Textual integration
- Use push_screen/pop_screen for package/CVE lists (no more mount/remove)
- Sidebar now shows 'APT X CVE X' with right-aligned indicators
- Loader uses black circles () for pending tasks
- Removed unused package_table.py and cve_table.py
2026-05-13 01:36:45 +02:00
2237d83ddd fix(ui,backend): async scan with asyncio.gather, fix CVE back button focus
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- Replace ThreadPoolExecutor with asyncio.gather + asyncio.to_thread
- Use @work async worker for proper Textual integration
- Add table.focus() after mount to ensure button events are received
- Remove all call_from_thread calls (now in main thread)
2026-05-13 01:26:18 +02:00
8f7cb33a9c fix(ui,backend): parallel scan, fix DataTable init, force refresh
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
- Use ThreadPoolExecutor for parallel scanning of all targets
- Fix PackageTable and CVETable crash by initializing columns in compose()
- Force widget refresh after scan completion to update counters
2026-05-13 01:18:08 +02:00
e2e504ceff fix(scripts): add PYTHONPATH so venv finds full_updater module
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
2026-05-13 01:06:54 +02:00
978297455a fix(debian): remove git and pct from Depends, not available on standard Debian
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
2026-05-13 01:02:23 +02:00
1adaf9768f fix(scripts): revert to public URL, rely on /etc/hosts for NAT loopback
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
2026-05-13 00:57:49 +02:00
ed7ce07f13 fix(scripts): detect internal network, use 10.0.0.4:3000 for NAT loopback 2026-05-13 00:54:18 +02:00
5a38bb4bbd fix(ci): remove unsupported upload-artifact@v4, replace with ls
All checks were successful
Build and Release .deb / build-deb (push) Successful in 21s
2026-05-13 00:50:02 +02:00
14 changed files with 435 additions and 403 deletions

View file

@ -77,8 +77,8 @@ jobs:
"http://10.0.0.4:3000/api/v1/repos/geronzi/full_updater/releases/${RELEASE_ID}/assets?name=$(basename ${FILE})" "http://10.0.0.4:3000/api/v1/repos/geronzi/full_updater/releases/${RELEASE_ID}/assets?name=$(basename ${FILE})"
fi fi
- name: Store artifact - name: List dist artifacts
uses: actions/upload-artifact@v4 working-directory: /workspace/repo
with: run: |
name: full-updater-deb echo "=== Built artifacts ==="
path: /workspace/repo/dist/*.deb ls -la dist/

2
debian/control vendored
View file

@ -7,7 +7,7 @@ Standards-Version: 4.6.0
Package: full-updater Package: full-updater
Architecture: all Architecture: all
Depends: python3, python3-venv, git, pct Depends: python3, python3-venv
Description: TUI for APT/CVE updates on Proxmox host and LXC Description: TUI for APT/CVE updates on Proxmox host and LXC
Full Updater is a terminal user interface (TUI) that visualizes Full Updater is a terminal user interface (TUI) that visualizes
available APT updates and CVEs for the Proxmox host and all LXC available APT updates and CVEs for the Proxmox host and all LXC

View file

@ -4,20 +4,18 @@ from datetime import datetime
from textual.app import App, ComposeResult from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical from textual.containers import Horizontal, Vertical
from textual.reactive import reactive from textual.reactive import reactive
from textual.worker import get_current_worker
from textual import work from textual import work
from full_updater.backend.cache import ensure_cache_dir, read_cache, get_cache_timestamp from full_updater.backend.cache import clear_cache, ensure_cache_dir, read_cache, get_cache_timestamp
from full_updater.backend.scanner import ( from full_updater.backend.scanner import (
Target, ScanResult, get_lxc_list, lxc_is_running, Target, ScanResult, get_lxc_list, lxc_is_running,
ensure_debsecan_installed, scan_apt, scan_cve, write_cache ensure_debsecan_installed, scan_apt, scan_cve, write_cache, scan_target
) )
from full_updater.backend.executor import UpgradeExecutor from full_updater.backend.executor import UpgradeExecutor
from full_updater.ui.loader import LoaderScreen from full_updater.ui.loader import LoaderScreen
from full_updater.ui.sidebar import Sidebar from full_updater.ui.sidebar import Sidebar
from full_updater.ui.summary import SummaryPanel from full_updater.ui.summary import SummaryPanel
from full_updater.ui.package_table import PackageTable from full_updater.ui.detail_screens import PackageListScreen, CVEListScreen
from full_updater.ui.cve_table import CVETable
from full_updater.ui.log_panel import LogPanel from full_updater.ui.log_panel import LogPanel
from full_updater.ui.confirm_modal import ConfirmModal from full_updater.ui.confirm_modal import ConfirmModal
@ -26,7 +24,7 @@ class FullUpdaterApp(App):
CSS = """ CSS = """
Screen { align: center middle; } Screen { align: center middle; }
#main-layout { layout: horizontal; height: 100%; } #main-layout { layout: horizontal; height: 100%; }
Sidebar { width: 30; } Sidebar { width: 42; }
#content { width: 1fr; } #content { width: 1fr; }
""" """
@ -48,6 +46,7 @@ class FullUpdaterApp(App):
yield SummaryPanel() yield SummaryPanel()
def on_mount(self): def on_mount(self):
clear_cache()
ensure_cache_dir() ensure_cache_dir()
self.targets = [Target(target_id="host", name="hote", is_host=True)] + get_lxc_list() self.targets = [Target(target_id="host", name="hote", is_host=True)] + get_lxc_list()
sidebar = self.query_one(Sidebar) sidebar = self.query_one(Sidebar)
@ -56,8 +55,8 @@ class FullUpdaterApp(App):
self.push_screen(self.loader_screen) self.push_screen(self.loader_screen)
self.run_scan_all() self.run_scan_all()
@work(thread=True) @work
def run_scan_all(self): async def run_scan_all(self):
total = len(self.targets) total = len(self.targets)
completed = 0 completed = 0
@ -65,64 +64,16 @@ class FullUpdaterApp(App):
nonlocal completed nonlocal completed
completed += 1 completed += 1
pct = min(100.0, (completed / (total * 4)) * 100) pct = min(100.0, (completed / (total * 4)) * 100)
self.call_from_thread(self._update_loader, pct) self._update_loader(pct)
for idx, target in enumerate(self.targets): for idx, target in enumerate(self.targets):
self.call_from_thread(self._update_loader_status, idx, "running") self._update_loader_status(idx, "running")
result = await asyncio.to_thread(scan_target, target, progress_cb)
if not target.is_host and not lxc_is_running(target.target_id):
self.call_from_thread(self._update_loader_status, idx, "skipped")
result = ScanResult(target=target)
result.status = "skipped"
self.results[target.target_id] = result
for _ in range(4):
progress_cb()
continue
debsecan_ok, debsecan_err = ensure_debsecan_installed(target.is_host, target.target_id)
progress_cb()
apt_ok, apt_packages, apt_err = scan_apt(target)
progress_cb()
cve_ok = False
cve_list = []
cve_err = ""
if debsecan_ok:
cve_ok, cve_list, cve_err = scan_cve(target)
else:
cve_err = f"debsecan: {debsecan_err}"
progress_cb()
result = ScanResult(
target=target,
apt_ok=apt_ok,
cve_ok=cve_ok,
apt_count=len(apt_packages),
cve_count=len(cve_list),
apt_packages=apt_packages,
cve_list=cve_list,
error=apt_err or cve_err,
status="done" if (apt_ok and (cve_ok or not debsecan_ok)) else "error"
)
if result.error:
result.status = "error"
self.results[target.target_id] = result self.results[target.target_id] = result
self._update_loader_status(idx, result.status)
self._update_sidebar(target.target_id, result)
cache_id = "host" if target.is_host else target.target_id self._finish_scan()
write_cache(cache_id, {
"timestamp": datetime.now().isoformat(),
"apt_count": result.apt_count,
"apt_packages": result.apt_packages,
"cve_count": result.cve_count,
"cve_list": result.cve_list,
"error": result.error
})
self.call_from_thread(self._update_loader_status, idx, result.status)
self.call_from_thread(self._update_sidebar, target.target_id, result)
self.call_from_thread(self._finish_scan)
def _update_loader(self, pct: float): def _update_loader(self, pct: float):
if self.loader_screen: if self.loader_screen:
@ -143,6 +94,7 @@ class FullUpdaterApp(App):
result.error, result.error,
result.status == "skipped" result.status == "skipped"
) )
sidebar.refresh()
def _finish_scan(self): def _finish_scan(self):
self.pop_screen() self.pop_screen()
@ -157,8 +109,9 @@ class FullUpdaterApp(App):
res.error, res.error,
res.status == "skipped" res.status == "skipped"
) )
sidebar.refresh()
if self.targets: if self.targets:
self._select_target(self.targets[0].target_id) self.set_timer(0.2, lambda: self._select_target(self.targets[0].target_id))
def _select_target(self, target_id: str): def _select_target(self, target_id: str):
self.selected_target = target_id self.selected_target = target_id
@ -175,10 +128,13 @@ class FullUpdaterApp(App):
name=name, name=name,
apt_count=data.get("apt_count", 0), apt_count=data.get("apt_count", 0),
cve_count=data.get("cve_count", 0), cve_count=data.get("cve_count", 0),
cve_total=data.get("cve_total", 0),
os_info=data.get("os_info", ""),
error=data.get("error", ""), error=data.get("error", ""),
skipped=not data and any(t.target_id == target_id and not t.is_host and not lxc_is_running(t.target_id) for t in self.targets), skipped=not data and any(t.target_id == target_id and not t.is_host and not lxc_is_running(t.target_id) for t in self.targets),
cache_time=get_cache_timestamp(cache_id) cache_time=get_cache_timestamp(cache_id)
) )
summary.refresh()
def on_sidebar_target_selected(self, event: Sidebar.TargetSelected): def on_sidebar_target_selected(self, event: Sidebar.TargetSelected):
self._select_target(event.target_id) self._select_target(event.target_id)
@ -187,8 +143,8 @@ class FullUpdaterApp(App):
if self.selected_target: if self.selected_target:
self._reload_target(self.selected_target) self._reload_target(self.selected_target)
@work(thread=True) @work
def _reload_target(self, target_id: str): async def _reload_target(self, target_id: str):
target = None target = None
for t in self.targets: for t in self.targets:
if t.target_id == target_id: if t.target_id == target_id:
@ -203,32 +159,19 @@ class FullUpdaterApp(App):
self.results[target.target_id] = result self.results[target.target_id] = result
write_cache(target.target_id, { write_cache(target.target_id, {
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"apt_count": 0, "apt_count": 0, "apt_packages": [],
"apt_packages": [], "cve_count": 0, "cve_total": 0, "cve_list": [],
"cve_count": 0, "os_info": "LXC éteint",
"cve_list": [],
"error": "LXC éteint" "error": "LXC éteint"
}) })
self.call_from_thread(self._update_sidebar, target.target_id, result) self._update_sidebar(target.target_id, result)
self.call_from_thread(self._select_target, target.target_id) self._select_target(target.target_id)
return return
ensure_debsecan_installed(target.is_host, target.target_id) def dummy_progress():
apt_ok, apt_packages, apt_err = scan_apt(target) pass
cve_ok, cve_list, cve_err = scan_cve(target)
error = apt_err or cve_err
result = ScanResult( result = await asyncio.to_thread(scan_target, target, dummy_progress)
target=target,
apt_ok=apt_ok,
cve_ok=cve_ok,
apt_count=len(apt_packages),
cve_count=len(cve_list),
apt_packages=apt_packages,
cve_list=cve_list,
error=error,
status="done" if (apt_ok and cve_ok) else "error"
)
self.results[target.target_id] = result self.results[target.target_id] = result
cache_id = "host" if target.is_host else target.target_id cache_id = "host" if target.is_host else target.target_id
write_cache(cache_id, { write_cache(cache_id, {
@ -236,11 +179,13 @@ class FullUpdaterApp(App):
"apt_count": result.apt_count, "apt_count": result.apt_count,
"apt_packages": result.apt_packages, "apt_packages": result.apt_packages,
"cve_count": result.cve_count, "cve_count": result.cve_count,
"cve_total": result.cve_total,
"os_info": result.os_info,
"cve_list": result.cve_list, "cve_list": result.cve_list,
"error": result.error "error": result.error
}) })
self.call_from_thread(self._update_sidebar, target.target_id, result) self._update_sidebar(target.target_id, result)
self.call_from_thread(self._select_target, target.target_id) self._select_target(target.target_id)
def on_summary_panel_upgrade_pressed(self, event: SummaryPanel.UpgradePressed): def on_summary_panel_upgrade_pressed(self, event: SummaryPanel.UpgradePressed):
self.push_screen( self.push_screen(
@ -268,7 +213,7 @@ class FullUpdaterApp(App):
return return
executor = UpgradeExecutor( executor = UpgradeExecutor(
target_id, is_host, target_id, is_host,
on_line=lambda line: self.call_from_thread(log_panel.write, line) on_line=lambda line: log_panel.write(line)
) )
ok = await executor.run() ok = await executor.run()
if ok: if ok:
@ -289,10 +234,7 @@ class FullUpdaterApp(App):
pkgs = data.get("apt_packages", []) pkgs = data.get("apt_packages", [])
if not pkgs: if not pkgs:
return return
table = PackageTable() self.push_screen(PackageListScreen(pkgs))
self.mount(table)
self.query_one("#main-layout").display = False
table.load_data(pkgs)
def on_summary_panel_cve_clicked(self, event: SummaryPanel.CveClicked): def on_summary_panel_cve_clicked(self, event: SummaryPanel.CveClicked):
cache_id = "host" if event.target_id == "host" else event.target_id cache_id = "host" if event.target_id == "host" else event.target_id
@ -300,17 +242,4 @@ class FullUpdaterApp(App):
cves = data.get("cve_list", []) cves = data.get("cve_list", [])
if not cves: if not cves:
return return
table = CVETable() self.push_screen(CVEListScreen(cves))
self.mount(table)
self.query_one("#main-layout").display = False
table.load_data(cves)
def on_package_table_back_pressed(self, event: PackageTable.BackPressed):
self.query_one("#main-layout").display = True
for widget in self.query(PackageTable):
widget.remove()
def on_cve_table_back_pressed(self, event: CVETable.BackPressed):
self.query_one("#main-layout").display = True
for widget in self.query(CVETable):
widget.remove()

View file

@ -7,9 +7,12 @@ from typing import Any
CACHE_DIR = "/tmp/full-updater-cache" CACHE_DIR = "/tmp/full-updater-cache"
def ensure_cache_dir() -> None: def clear_cache() -> None:
if os.path.exists(CACHE_DIR): if os.path.exists(CACHE_DIR):
shutil.rmtree(CACHE_DIR) shutil.rmtree(CACHE_DIR)
def ensure_cache_dir() -> None:
os.makedirs(CACHE_DIR, exist_ok=True) os.makedirs(CACHE_DIR, exist_ok=True)

View file

@ -1,18 +1,121 @@
import asyncio
import json import json
import os
import re import re
import shutil
import subprocess import subprocess
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any from datetime import datetime
from typing import Callable
from full_updater.backend.cache import write_cache from full_updater.backend.cache import write_cache, ensure_cache_dir
CVE_API_CACHE = "/tmp/full-updater-cache/cve-api"
def _ensure_cve_api_cache() -> None:
os.makedirs(CVE_API_CACHE, exist_ok=True)
def _fetch_cve_html(cve_id: str) -> str:
"""Récupère le HTML de la page Debian Security Tracker pour une CVE."""
_ensure_cve_api_cache()
cache_path = os.path.join(CVE_API_CACHE, f"{cve_id}.html")
if os.path.exists(cache_path):
try:
with open(cache_path, "r", encoding="utf-8") as f:
return f.read()
except Exception:
pass
url = f"https://security-tracker.debian.org/tracker/{cve_id}"
try:
req = urllib.request.Request(url, headers={"User-Agent": "full-updater/1.0"})
with urllib.request.urlopen(req, timeout=15) as resp:
html = resp.read().decode("utf-8")
with open(cache_path, "w", encoding="utf-8") as f:
f.write(html)
return html
except Exception:
return ""
def _enrich_cve(cve_id: str) -> dict[str, str]:
"""Récupère la sévérité et le vecteur d'attaque depuis le HTML Debian Security Tracker."""
html = _fetch_cve_html(cve_id)
if not html:
return {"severity": "?", "vector": "?"}
result = {"severity": "?", "vector": "?"}
# Extraction de la sévérité depuis le bloc "Vulnerable and fixed packages"
# Recherche du motif: <td>remote</td> ou <td>local</td>
vector_match = re.search(r'<td[^>]*>\s*(remote|local)\s*</td>', html, re.IGNORECASE)
if vector_match:
result["vector"] = vector_match.group(1).lower()
# Extraction de la sévérité depuis les notes ou le tableau
# Format courant: [low], [medium], [high], [critical] dans les notes
severity_match = re.search(r'\b(unimportant|low|medium|high|critical)\b', html, re.IGNORECASE)
if severity_match:
result["severity"] = severity_match.group(1).lower()
return result
def _is_cve_actionable(cve_id: str, suite: str = "bookworm") -> bool:
"""Retourne True si la CVE est marquée 'fixed' pour le suite donné dans le HTML."""
html = _fetch_cve_html(cve_id)
if not html:
return False
# Chercher dans le tableau des packages vulnérables/fixed
# Pattern: suite (security)? version fixed
# Ex: bookworm 3.0.18-1~deb12u1 fixed
pattern = re.compile(rf"<td[^>]*>\s*{re.escape(suite)}\s*(?:\(security\))?\s*</td>\s*<td[^>]*>[^<]+</td>\s*<td[^>]*>\s*fixed\s*</td>", re.IGNORECASE)
return bool(pattern.search(html))
def enrich_cves(cves: list[dict]) -> tuple[list[dict], int]:
"""Enrichit les CVEs avec fixable, severity, vector via l'API Debian.
Retourne (cve_enrichies, nombre_actionnables)."""
if not cves:
return [], 0
def check(cve: dict) -> dict:
try:
# Vérifie si corrigeable
cve["fixable"] = _is_cve_actionable(cve["id"])
# Enrichit avec severity et vector
enriched = _enrich_cve(cve["id"])
cve["severity"] = enriched["severity"]
cve["vector"] = enriched["vector"]
return cve
except Exception:
cve["fixable"] = False
return cve
all_cves = []
actionable_count = 0
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(check, cve) for cve in cves]
for future in futures:
try:
cve = future.result()
all_cves.append(cve)
if cve.get("fixable"):
actionable_count += 1
except Exception:
pass
return all_cves, actionable_count
@dataclass @dataclass
class Target: class Target:
target_id: str # 'host' ou '100' target_id: str
name: str # 'hote' ou '100 traefik' name: str
is_host: bool is_host: bool
@ -23,17 +126,17 @@ class ScanResult:
cve_ok: bool = False cve_ok: bool = False
apt_count: int = 0 apt_count: int = 0
cve_count: int = 0 cve_count: int = 0
cve_total: int = 0
os_info: str = ""
apt_packages: list[dict[str, str]] = field(default_factory=list) apt_packages: list[dict[str, str]] = field(default_factory=list)
cve_list: list[dict[str, str]] = field(default_factory=list) cve_list: list[dict[str, str]] = field(default_factory=list)
error: str = "" error: str = ""
status: str = "pending" # pending | running | done | error | skipped status: str = "pending"
def run_cmd(cmd: list[str], timeout: int = 120) -> tuple[bool, str, str]: def run_cmd(cmd: list[str], timeout: int = 120) -> tuple[bool, str, str]:
try: try:
proc = subprocess.run( proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=False)
cmd, capture_output=True, text=True, timeout=timeout, check=False
)
return proc.returncode == 0, proc.stdout, proc.stderr return proc.returncode == 0, proc.stdout, proc.stderr
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
return False, "", "Command timed out" return False, "", "Command timed out"
@ -42,7 +145,7 @@ def run_cmd(cmd: list[str], timeout: int = 120) -> tuple[bool, str, str]:
def get_lxc_list() -> list[Target]: def get_lxc_list() -> list[Target]:
ok, stdout, stderr = run_cmd(["pct", "list"]) ok, stdout, _ = run_cmd(["pct", "list"])
if not ok: if not ok:
return [] return []
targets = [] targets = []
@ -57,45 +160,67 @@ def get_lxc_list() -> list[Target]:
def lxc_is_running(vmid: str) -> bool: def lxc_is_running(vmid: str) -> bool:
ok, stdout, _ = run_cmd(["pct", "status", vmid]) ok, stdout, _ = run_cmd(["pct", "status", vmid])
if ok and "running" in stdout.lower(): return ok and "running" in stdout.lower()
return True
return False
def scan_os(target: Target) -> str:
"""Récupère le nom, la version complète et le codename de l'OS."""
prefix = [] if target.is_host else ["pct", "exec", target.target_id, "--"]
# 1. Lire /etc/os-release et parser ligne par ligne
ok, osrel_out, _ = run_cmd(prefix + ["cat", "/etc/os-release"], timeout=30)
os_data = {}
if ok:
for line in osrel_out.splitlines():
line = line.strip()
if "=" in line:
key, val = line.split("=", 1)
val = val.strip().strip('"').strip("'")
os_data[key] = val
# 2. Extraire les champs
name = os_data.get("NAME", "")
codename = os_data.get("VERSION_CODENAME", "")
debian_version_full = os_data.get("DEBIAN_VERSION_FULL", "")
version_id = os_data.get("VERSION_ID", "")
# 3. Construire le libellé final
# Avec DEBIAN_VERSION_FULL : NAME DEBIAN_VERSION_FULL (VERSION_CODENAME)
# Sans : NAME VERSION_ID (VERSION_CODENAME)
version = debian_version_full if debian_version_full else version_id
if name and version and codename:
return f"{name} {version} ({codename})"
if name and version:
return f"{name} {version}"
# Fallback : PRETTY_NAME ou "OS inconnu"
return os_data.get("PRETTY_NAME", "OS inconnu")
def ensure_debsecan_installed(is_host: bool, vmid: str = "") -> tuple[bool, str]: def ensure_debsecan_installed(is_host: bool, vmid: str = "") -> tuple[bool, str]:
"""Retourne (ok, error_msg)"""
if is_host: if is_host:
# Vérifier si debsecan est installé sur l'hôte
ok, _, _ = run_cmd(["which", "debsecan"]) ok, _, _ = run_cmd(["which", "debsecan"])
if ok: if ok:
return True, "" return True, ""
ok, out, err = run_cmd(["apt-get", "update"], timeout=60) ok, _, err = run_cmd(["apt-get", "update"], timeout=60)
if not ok: if not ok:
return False, f"apt-get update failed: {err}" return False, f"apt-get update failed: {err}"
ok, out, err = run_cmd(["apt-get", "install", "-y", "debsecan"], timeout=120) ok, _, err = run_cmd(["apt-get", "install", "-y", "debsecan"], timeout=120)
if not ok: return (True, "") if ok else (False, f"apt-get install debsecan failed: {err}")
return False, f"apt-get install debsecan failed: {err}"
return True, ""
else: else:
# Vérifier dans le LXC
ok, _, _ = run_cmd(["pct", "exec", vmid, "--", "which", "debsecan"]) ok, _, _ = run_cmd(["pct", "exec", vmid, "--", "which", "debsecan"])
if ok: if ok:
return True, "" return True, ""
ok, out, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "update"], timeout=60) ok, _, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "update"], timeout=60)
if not ok: if not ok:
return False, f"apt-get update failed in {vmid}: {err}" return False, f"apt-get update failed in {vmid}: {err}"
ok, out, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "install", "-y", "debsecan"], timeout=120) ok, _, err = run_cmd(["pct", "exec", vmid, "--", "apt-get", "install", "-y", "debsecan"], timeout=120)
if not ok: return (True, "") if ok else (False, f"apt-get install debsecan failed in {vmid}: {err}")
return False, f"apt-get install debsecan failed in {vmid}: {err}"
return True, ""
def scan_apt(target: Target) -> tuple[bool, list[dict[str, str]], str]: def scan_apt(target: Target) -> tuple[bool, list[dict[str, str]], str]:
if target.is_host: cmd = ["apt", "list", "--upgradable"] if target.is_host else ["pct", "exec", target.target_id, "--", "apt", "list", "--upgradable"]
cmd = ["apt", "list", "--upgradable"]
else:
cmd = ["pct", "exec", target.target_id, "--", "apt", "list", "--upgradable"]
ok, stdout, stderr = run_cmd(cmd, timeout=120) ok, stdout, stderr = run_cmd(cmd, timeout=120)
if not ok: if not ok:
return False, [], stderr or stdout return False, [], stderr or stdout
@ -108,121 +233,96 @@ def scan_apt(target: Target) -> tuple[bool, list[dict[str, str]], str]:
if len(parts) < 2: if len(parts) < 2:
continue continue
name = parts[0].strip() name = parts[0].strip()
rest = parts[1]
# Format: apt list --upgradable -> name/arch version suite [upgradable from: oldver]
m = re.search(r"\[upgradable from:\s+([^\]]+)\]", line) m = re.search(r"\[upgradable from:\s+([^\]]+)\]", line)
if m: if m:
old_ver = m.group(1).strip() old_ver = m.group(1).strip()
# Essayer d'extraire la nouvelle version (avant le [upgradable from:])
prefix = line.split("[upgradable from:")[0].strip() prefix = line.split("[upgradable from:")[0].strip()
ver_m = re.search(r"/\S+\s+(\S+)\s+\S+", prefix) ver_m = re.search(r"/\S+\s+(\S+)\s+\S+", prefix)
new_ver = ver_m.group(1) if ver_m else "?" new_ver = ver_m.group(1) if ver_m else "?"
packages.append({ packages.append({"name": name, "current": old_ver, "new": new_ver, "size": "-"})
"name": name,
"current": old_ver,
"new": new_ver,
"size": "-"
})
else: else:
# fallback simple packages.append({"name": name, "current": "?", "new": "?", "size": "-"})
packages.append({
"name": name,
"current": "?",
"new": "?",
"size": "-"
})
return True, packages, "" return True, packages, ""
def scan_cve(target: Target) -> tuple[bool, list[dict[str, str]], str]: def scan_cve(target: Target) -> tuple[bool, list[dict[str, str]], str]:
if target.is_host: # Format par défaut (pas de --format) retourne: CVE-ID package
cmd = ["debsecan", "--format", "report", "--suite", "bookworm"] cmd = ["debsecan", "--suite", "bookworm"] if target.is_host else ["pct", "exec", target.target_id, "--", "debsecan", "--suite", "bookworm"]
else:
cmd = ["pct", "exec", target.target_id, "--", "debsecan", "--format", "report", "--suite", "bookworm"]
ok, stdout, stderr = run_cmd(cmd, timeout=120) ok, stdout, stderr = run_cmd(cmd, timeout=120)
if not ok: if not ok:
return False, [], stderr or stdout return False, [], stderr or stdout
cves = [] cves = []
for line in stdout.splitlines(): for line in stdout.splitlines():
# Format debsecan: CVE-XXXX-XXXX package [remote] [low] - description
m = re.match(r"(CVE-\d{4}-\d+)\s+(\S+)", line) m = re.match(r"(CVE-\d{4}-\d+)\s+(\S+)", line)
if m: if m:
cve_id = m.group(1)
pkg = m.group(2)
cves.append({ cves.append({
"id": cve_id, "id": m.group(1),
"package": pkg, "package": m.group(2),
"url": f"https://security-tracker.debian.org/tracker/{cve_id}" "vector": "?",
"severity": "?",
"url": f"https://security-tracker.debian.org/tracker/{m.group(1)}"
}) })
return True, cves, "" return True, cves, ""
async def scan_target(target: Target, result: ScanResult, progress_cb: Any) -> None: def scan_target(target: Target, progress_cb: Callable) -> ScanResult:
"""Scanne une cible (APT puis CVE) et écrit le cache.""" result = ScanResult(target=target)
result.status = "running" result.status = "running"
await progress_cb() progress_cb()
# Vérifier si LXC est allumé
if not target.is_host and not lxc_is_running(target.target_id): if not target.is_host and not lxc_is_running(target.target_id):
result.status = "skipped" result.status = "skipped"
result.apt_ok = False progress_cb(); progress_cb(); progress_cb(); progress_cb()
result.cve_ok = False return result
write_cache(target.target_id, {
"timestamp": "",
"apt_count": 0,
"apt_packages": [],
"cve_count": 0,
"cve_list": [],
"error": "LXC éteint"
})
await progress_cb()
return
# Auto-install debsecan
debsecan_ok, debsecan_err = ensure_debsecan_installed(target.is_host, target.target_id) debsecan_ok, debsecan_err = ensure_debsecan_installed(target.is_host, target.target_id)
progress_cb()
# Scan APT
apt_ok, apt_packages, apt_err = scan_apt(target) apt_ok, apt_packages, apt_err = scan_apt(target)
result.apt_ok = apt_ok result.apt_ok = apt_ok
result.apt_count = len(apt_packages) result.apt_count = len(apt_packages)
result.apt_packages = apt_packages result.apt_packages = apt_packages
await progress_cb() progress_cb()
# Scan CVE
if debsecan_ok: if debsecan_ok:
cve_ok, cve_list, cve_err = scan_cve(target) cve_ok, cve_list, cve_err = scan_cve(target)
result.cve_ok = cve_ok result.cve_ok = cve_ok
result.cve_count = len(cve_list)
result.cve_list = cve_list
if not cve_ok: if not cve_ok:
result.error = cve_err result.error = cve_err
# Enrichir les CVE via l'API Debian (severity, vector, fixable)
if cve_list:
all_cves, actionable_count = enrich_cves(cve_list)
result.cve_list = all_cves
result.cve_count = actionable_count
result.cve_total = len(all_cves)
else:
result.cve_list = []
result.cve_count = 0
result.cve_total = 0
else: else:
result.cve_ok = False result.cve_ok = False
result.cve_count = 0
result.cve_list = []
result.error = f"debsecan: {debsecan_err}" result.error = f"debsecan: {debsecan_err}"
await progress_cb() progress_cb()
# Scan OS
result.os_info = scan_os(target)
progress_cb()
result.status = "done" if (result.apt_ok and result.cve_ok) else "error" result.status = "done" if (result.apt_ok and result.cve_ok) else "error"
if result.error: if result.error:
result.status = "error" result.status = "error"
write_cache(target.target_id if not target.is_host else "host", { write_cache("host" if target.is_host else target.target_id, {
"timestamp": "", # sera rempli par l'appelant "timestamp": datetime.now().isoformat(),
"apt_count": result.apt_count, "apt_count": result.apt_count,
"apt_packages": result.apt_packages, "apt_packages": result.apt_packages,
"cve_count": result.cve_count, "cve_count": result.cve_count,
"cve_total": result.cve_total,
"os_info": result.os_info,
"cve_list": result.cve_list, "cve_list": result.cve_list,
"error": result.error "error": result.error
}) })
await progress_cb() progress_cb()
return result
async def run_full_scan(progress_cb: Any) -> list[ScanResult]:
targets = [Target(target_id="host", name="hote", is_host=True)] + get_lxc_list()
results = [ScanResult(target=t) for t in targets]
tasks = [scan_target(r.target, r, progress_cb) for r in results]
await asyncio.gather(*tasks)
return results

View file

@ -1,72 +0,0 @@
from textual.widgets import DataTable, Button
from textual.containers import Vertical, Horizontal
from textual.message import Message
try:
import pyperclip
PYPERCLIP_OK = True
except Exception:
PYPERCLIP_OK = False
class CVETable(Vertical):
DEFAULT_CSS = """
CVETable {
padding: 1 2;
}
#cve-toolbar {
height: auto;
margin-bottom: 1;
}
DataTable {
height: 1fr;
}
"""
class BackPressed(Message):
pass
def __init__(self):
super().__init__()
self.table = None
self.urls = {}
def compose(self):
with Horizontal(id="cve-toolbar"):
yield Button("⬅ Retour", id="cve-back")
self.table = DataTable(id="cve-table")
yield self.table
def on_mount(self):
self.table.add_columns("CVE-ID", "Paquet", "Lien")
self.table.cursor_type = "row"
def load_data(self, cves: list[dict]):
self.table.clear()
self.urls = {}
for i, cve in enumerate(cves):
cve_id = cve.get("id", "?")
pkg = cve.get("package", "?")
url = cve.get("url", "")
self.urls[i] = url
self.table.add_row(cve_id, pkg, url)
def on_data_table_row_selected(self, event: DataTable.RowSelected):
row_key = event.row_key
row_index = list(self.table.rows.keys()).index(row_key)
url = self.urls.get(row_index, "")
if not url:
return
if PYPERCLIP_OK:
try:
pyperclip.copy(url)
self.notify(f"Lien copié : {url}", severity="information")
return
except Exception:
pass
# Fallback : afficher le lien dans une notification
self.notify(f"Lien (copie manuelle) : {url}", severity="warning")
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "cve-back":
self.post_message(self.BackPressed())

View file

@ -0,0 +1,126 @@
from textual.screen import Screen
from textual.widgets import DataTable, Button
from textual.containers import Vertical, Horizontal
try:
import pyperclip
PYPERCLIP_OK = True
except Exception:
PYPERCLIP_OK = False
SCREEN_CSS = """
align: left top;
padding: 1 2;
#toolbar {
height: auto;
dock: top;
margin-bottom: 1;
}
DataTable {
height: 1fr;
border: solid $primary;
}
"""
class PackageListScreen(Screen):
BINDINGS = [("b", "back", "Retour")]
DEFAULT_CSS = """
PackageListScreen {
align: left top;
padding: 1 2;
}
PackageListScreen #toolbar {
height: auto;
margin-bottom: 1;
}
PackageListScreen DataTable {
height: 1fr;
border: solid $primary;
}
"""
def __init__(self, packages: list[dict]):
super().__init__()
self.packages = packages
def compose(self):
with Vertical():
with Horizontal(id="toolbar"):
yield Button("⬅ Retour", id="pkg-back", variant="default")
table = DataTable(id="pkg-table")
table.add_columns("Nom", "Version actuelle", "Nouvelle version", "Taille")
for pkg in self.packages:
table.add_row(pkg.get("name", "?"), pkg.get("current", "?"), pkg.get("new", "?"), pkg.get("size", "-"))
yield table
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "pkg-back":
self.app.pop_screen()
def action_back(self):
self.app.pop_screen()
class CVEListScreen(Screen):
BINDINGS = [("b", "back", "Retour")]
DEFAULT_CSS = """
CVEListScreen {
align: left top;
padding: 1 2;
}
CVEListScreen #toolbar {
height: auto;
margin-bottom: 1;
}
CVEListScreen DataTable {
height: 1fr;
border: solid $primary;
}
"""
def __init__(self, cves: list[dict]):
super().__init__()
self.cves = cves
self.urls = {}
def compose(self):
with Vertical():
with Horizontal(id="toolbar"):
yield Button("⬅ Retour", id="cve-back", variant="default")
table = DataTable(id="cve-table")
table.add_columns("CVE-ID", "Paquet", "Severite", "Vecteur", "Corrigeable", "Lien")
table.cursor_type = "row"
for i, cve in enumerate(self.cves):
cve_id = cve.get("id", "?")
pkg = cve.get("package", "?")
url = cve.get("url", "")
severity = cve.get("severity", "?")
vector = cve.get("vector", "?")
fixable = "🟢 Oui" if cve.get("fixable") else "🔴 Non"
self.urls[i] = url
table.add_row(cve_id, pkg, severity, vector, fixable, url)
yield table
def on_data_table_row_selected(self, event: DataTable.RowSelected):
row_key = event.row_key
row_index = list(self.query_one("#cve-table").rows.keys()).index(row_key)
url = self.urls.get(row_index, "")
if not url:
return
if PYPERCLIP_OK:
try:
pyperclip.copy(url)
self.notify(f"Lien copié : {url}", severity="information")
return
except Exception:
pass
self.notify(f"Lien (copie manuelle) : {url}", severity="warning")
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "cve-back":
self.app.pop_screen()
def action_back(self):
self.app.pop_screen()

View file

@ -16,7 +16,7 @@ class LoaderScreen(Screen):
self.status_lines = [self._fmt_line(t) for t in targets] self.status_lines = [self._fmt_line(t) for t in targets]
def _fmt_line(self, target) -> str: def _fmt_line(self, target) -> str:
return f"{target.name}: 🟠 🟠" return f"{target.name:20} ⚫ ⚫"
def compose(self): def compose(self):
with Container(id="loader-container"): with Container(id="loader-container"):
@ -29,10 +29,8 @@ class LoaderScreen(Screen):
def watch_progress(self, value: float): def watch_progress(self, value: float):
try: try:
bar = self.query_one("#loader-bar", ProgressBar) self.query_one("#loader-bar", ProgressBar).progress = value
bar.progress = value self.query_one("#loader-percent", Static).update(f"{value:.0f} %")
pct = self.query_one("#loader-percent", Static)
pct.update(f"{value:.0f} %")
except Exception: except Exception:
pass pass
@ -51,11 +49,11 @@ class LoaderScreen(Screen):
lines = list(self.status_lines) lines = list(self.status_lines)
t = self._targets[idx] t = self._targets[idx]
if status == "done": if status == "done":
lines[idx] = f"{t.name}: 🟢 🟢" lines[idx] = f"{t.name:20} 🟢 🟢"
elif status == "error": elif status == "error":
lines[idx] = f"{t.name}: 🔴 🔴" lines[idx] = f"{t.name:20} 🔴 🔴"
elif status == "skipped": elif status == "skipped":
lines[idx] = f"{t.name}: ⚪ ⚪" lines[idx] = f"{t.name:20} ⚪ ⚪"
else: else:
lines[idx] = f"{t.name}: 🟠 🟠" lines[idx] = f"{t.name:20} 🟠 🟠"
self.status_lines = lines self.status_lines = lines

View file

@ -23,21 +23,21 @@ class LogPanel(Vertical):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.log = None self._log_widget = None
def compose(self): def compose(self):
with Horizontal(id="log-toolbar"): with Horizontal(id="log-toolbar"):
yield Button("⬅ Retour", id="log-back") yield Button("⬅ Retour", id="log-back")
self.log = RichLog(id="log-view", highlight=True) self._log_widget = RichLog(id="log-view", highlight=True)
yield self.log yield self._log_widget
def write(self, line: str): def write(self, line: str):
if self.log: if self._log_widget:
self.log.write(line) self._log_widget.write(line)
def clear(self): def clear(self):
if self.log: if self._log_widget:
self.log.clear() self._log_widget.clear()
def on_button_pressed(self, event: Button.Pressed): def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "log-back": if event.button.id == "log-back":

View file

@ -1,48 +0,0 @@
from textual.widgets import DataTable, Button
from textual.containers import Vertical, Horizontal
from textual.message import Message
class PackageTable(Vertical):
DEFAULT_CSS = """
PackageTable {
padding: 1 2;
}
#pkg-toolbar {
height: auto;
margin-bottom: 1;
}
DataTable {
height: 1fr;
}
"""
class BackPressed(Message):
pass
def __init__(self):
super().__init__()
self.table = None
def compose(self):
with Horizontal(id="pkg-toolbar"):
yield Button("⬅ Retour", id="pkg-back")
self.table = DataTable(id="pkg-table")
yield self.table
def on_mount(self):
self.table.add_columns("Nom", "Version actuelle", "Nouvelle version", "Taille")
def load_data(self, packages: list[dict]):
self.table.clear()
for pkg in packages:
self.table.add_row(
pkg.get("name", "?"),
pkg.get("current", "?"),
pkg.get("new", "?"),
pkg.get("size", "-")
)
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "pkg-back":
self.post_message(self.BackPressed())

View file

@ -6,7 +6,7 @@ from textual.message import Message
class Sidebar(Vertical): class Sidebar(Vertical):
DEFAULT_CSS = """ DEFAULT_CSS = """
Sidebar { Sidebar {
width: 28; width: 40;
border-right: solid $primary-lighten-2; border-right: solid $primary-lighten-2;
padding: 0 1; padding: 0 1;
} }
@ -42,7 +42,8 @@ class Sidebar(Vertical):
lv.clear() lv.clear()
self._items = [] self._items = []
for t in targets: for t in targets:
label = f"{t.name} 🟠 🟠" # Format: "100 traefik APT ⚫ CVE ⚫"
label = f"{t.name:20} APT ⚫ CVE ⚫"
item = ListItem(Label(label), id=f"target-{t.target_id}") item = ListItem(Label(label), id=f"target-{t.target_id}")
lv.append(item) lv.append(item)
self._items.append((t.target_id, t.name, item)) self._items.append((t.target_id, t.name, item))
@ -51,13 +52,24 @@ class Sidebar(Vertical):
for tid, name, item in self._items: for tid, name, item in self._items:
if tid == target_id: if tid == target_id:
if skipped: if skipped:
label = f"{name} ⚪ ⚪" apt_color = ""
cve_color = ""
elif error: elif error:
label = f"{name} 🔴 🔴" apt_color = "🔴"
elif apt_count > 0 or cve_count > 0: cve_color = "🔴"
label = f"{name} {'🔴' if apt_count > 0 else '🟢'} {'🔴' if cve_count > 0 else '🟢'}" elif apt_count > 0:
apt_color = "🔴"
else: else:
label = f"{name} 🟢 🟢" apt_color = "🟢"
if error and cve_count == 0:
cve_color = "🔴"
elif cve_count > 0:
cve_color = "🔴"
else:
cve_color = "🟢"
label = f"{name:20} APT {apt_color} CVE {cve_color}"
item.children[0].update(label) item.children[0].update(label)
break break

View file

@ -1,6 +1,5 @@
from textual.widgets import Static, Button from textual.widgets import Static, Button
from textual.containers import Vertical, Horizontal from textual.containers import Vertical, Horizontal
from textual.reactive import reactive
from textual.message import Message from textual.message import Message
@ -26,10 +25,6 @@ class SummaryPanel(Vertical):
height: auto; height: auto;
margin: 1 0; margin: 1 0;
} }
.summary-count {
color: $primary;
text-style: bold;
}
.summary-error { .summary-error {
color: $error; color: $error;
text-style: bold; text-style: bold;
@ -55,13 +50,8 @@ class SummaryPanel(Vertical):
self.target_id = target_id self.target_id = target_id
super().__init__() super().__init__()
cache_time = reactive("") _current_target_id: str = ""
apt_count = reactive(0) _current_target_name: str = ""
cve_count = reactive(0)
target_id = reactive("")
target_name = reactive("")
error_msg = reactive("")
is_skipped = reactive(False)
def compose(self): def compose(self):
with Horizontal(id="summary-header"): with Horizontal(id="summary-header"):
@ -70,36 +60,29 @@ class SummaryPanel(Vertical):
with Vertical(id="summary-body"): with Vertical(id="summary-body"):
yield Static("Sélectionnez une cible", id="summary-title") yield Static("Sélectionnez une cible", id="summary-title")
yield Static("", id="summary-os", classes="summary-row")
yield Button("Mises à jour : -", id="btn-apt", classes="summary-row", variant="default") yield Button("Mises à jour : -", id="btn-apt", classes="summary-row", variant="default")
yield Button("CVE : -", id="btn-cve", classes="summary-row", variant="default") yield Button("CVE : -", id="btn-cve", classes="summary-row", variant="default")
yield Static("", id="summary-error", classes="summary-row summary-error") yield Static("", id="summary-error", classes="summary-row summary-error")
yield Button("📦 Mettre à jour", id="btn-upgrade", variant="primary") yield Button("📦 Mettre à jour", id="btn-upgrade", variant="primary")
def watch_cache_time(self, value: str): def set_target(self, target_id: str, name: str, apt_count: int, cve_count: int, cve_total: int, os_info: str, error: str, skipped: bool, cache_time: str):
self.query_one("#summary-cache", Static).update(f"Cache : {value}" if value else "") self._current_target_id = target_id
self._current_target_name = name
def watch_apt_count(self, value: int): title = self.query_one("#summary-title", Static)
self._update_display() os_label = self.query_one("#summary-os", Static)
def watch_cve_count(self, value: int):
self._update_display()
def watch_error_msg(self, value: str):
self._update_display()
def watch_is_skipped(self, value: bool):
self._update_display()
def watch_target_name(self, value: str):
self.query_one("#summary-title", Static).update(value if value else "Sélectionnez une cible")
def _update_display(self):
apt_btn = self.query_one("#btn-apt", Button) apt_btn = self.query_one("#btn-apt", Button)
cve_btn = self.query_one("#btn-cve", Button) cve_btn = self.query_one("#btn-cve", Button)
err_label = self.query_one("#summary-error", Static) err_label = self.query_one("#summary-error", Static)
btn = self.query_one("#btn-upgrade", Button) btn = self.query_one("#btn-upgrade", Button)
cache_label = self.query_one("#summary-cache", Static)
if self.is_skipped: title.update(name if name else "Sélectionnez une cible")
os_label.update(os_info if os_info else "")
cache_label.update(f"Cache : {cache_time}" if cache_time else "")
if skipped:
apt_btn.label = "Mises à jour : LXC éteint" apt_btn.label = "Mises à jour : LXC éteint"
cve_btn.label = "CVE : LXC éteint" cve_btn.label = "CVE : LXC éteint"
err_label.update("") err_label.update("")
@ -108,37 +91,38 @@ class SummaryPanel(Vertical):
cve_btn.disabled = True cve_btn.disabled = True
return return
if self.error_msg: if error:
apt_btn.label = f"Mises à jour : {self.apt_count}" apt_btn.label = f"Mises à jour : {apt_count}"
cve_btn.label = "CVE : ERREUR" cve_btn.label = "CVE : ERREUR"
err_label.update(self.error_msg) err_label.update(error)
btn.disabled = False btn.disabled = False
apt_btn.disabled = self.apt_count == 0 apt_btn.disabled = apt_count == 0
cve_btn.disabled = True cve_btn.disabled = True
return return
apt_btn.label = f"Mises à jour : {self.apt_count}" apt_btn.label = f"Mises à jour : {apt_count}"
cve_btn.label = f"CVE : {self.cve_count}" if cve_total > cve_count:
cve_btn.label = f"CVE : {cve_count} ({cve_total - cve_count} non corrigeables)"
else:
cve_btn.label = f"CVE : {cve_count}"
err_label.update("") err_label.update("")
btn.disabled = False btn.disabled = False
apt_btn.disabled = self.apt_count == 0 apt_btn.disabled = apt_count == 0
cve_btn.disabled = self.cve_count == 0 cve_btn.disabled = cve_total == 0
def set_target(self, target_id: str, name: str, apt_count: int, cve_count: int, error: str, skipped: bool, cache_time: str): # Forcer le refresh de tous les widgets modifiés
self.target_id = target_id apt_btn.refresh()
self.target_name = name cve_btn.refresh()
self.apt_count = apt_count err_label.refresh()
self.cve_count = cve_count btn.refresh()
self.error_msg = error cache_label.refresh()
self.is_skipped = skipped
self.cache_time = cache_time
def on_button_pressed(self, event: Button.Pressed): def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "btn-reload": if event.button.id == "btn-reload":
self.post_message(self.ReloadPressed()) self.post_message(self.ReloadPressed())
elif event.button.id == "btn-upgrade": elif event.button.id == "btn-upgrade":
self.post_message(self.UpgradePressed(self.target_id, self.target_name)) self.post_message(self.UpgradePressed(self._current_target_id, self._current_target_name))
elif event.button.id == "btn-apt": elif event.button.id == "btn-apt":
self.post_message(self.AptClicked(self.target_id)) self.post_message(self.AptClicked(self._current_target_id))
elif event.button.id == "btn-cve": elif event.button.id == "btn-cve":
self.post_message(self.CveClicked(self.target_id)) self.post_message(self.CveClicked(self._current_target_id))

View file

@ -11,4 +11,5 @@ if [ ! -d "$VENV_DIR" ]; then
fi fi
source "$VENV_DIR/bin/activate" source "$VENV_DIR/bin/activate"
export PYTHONPATH="/opt/full-updater:$PYTHONPATH"
python3 -m full_updater "$@" python3 -m full_updater "$@"

View file

@ -5,7 +5,6 @@ REPO_URL="https://git.geronzi.fr"
OWNER="geronzi" OWNER="geronzi"
REPO="full_updater" REPO="full_updater"
TOKEN_FILE="/etc/full-updater/token" TOKEN_FILE="/etc/full-updater/token"
CONFIG_FILE="/etc/full-updater/config.ini"
INSTALLED_VERSION=$(dpkg -s full-updater 2>/dev/null | grep "^Version:" | awk '{print $2}' || echo "0.0.0") INSTALLED_VERSION=$(dpkg -s full-updater 2>/dev/null | grep "^Version:" | awk '{print $2}' || echo "0.0.0")
echo "=== Full Updater Update ===" echo "=== Full Updater Update ==="