full_updater/full_updater/backend/executor.py
2026-05-12 22:36:36 +02:00

38 lines
1.1 KiB
Python

import asyncio
import subprocess
from typing import Callable
class UpgradeExecutor:
def __init__(self, target_id: str, is_host: bool, on_line: Callable[[str], None]):
self.target_id = target_id
self.is_host = is_host
self.on_line = on_line
self.process = None
async def run(self) -> bool:
if self.is_host:
cmd = ["apt-get", "full-upgrade", "-y"]
else:
cmd = ["pct", "exec", self.target_id, "--", "apt-get", "full-upgrade", "-y"]
self.process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
async def read_stream(stream):
while True:
line = await stream.readline()
if not line:
break
self.on_line(line.decode("utf-8", errors="replace").rstrip())
await asyncio.gather(
read_stream(self.process.stdout),
read_stream(self.process.stderr)
)
returncode = await self.process.wait()
return returncode == 0