264 lines
8.4 KiB
Python
264 lines
8.4 KiB
Python
|
#!/usr/bin/env nix-shell
|
||
|
#!nix-shell -I nixpkgs=channel:nixpkgs-unstable -i python3 -p "python3.withPackages (ps: with ps; [ aiohttp packaging ])" -p git nurl pyright ruff isort
|
||
|
|
||
|
import asyncio
|
||
|
import json
|
||
|
import os
|
||
|
import re
|
||
|
import sys
|
||
|
from subprocess import check_output, run
|
||
|
from typing import Dict, Final, List, Optional, Union
|
||
|
|
||
|
import aiohttp
|
||
|
from aiohttp import ClientSession
|
||
|
from packaging.version import Version
|
||
|
|
||
|
ROOT: Final = check_output([
|
||
|
"git",
|
||
|
"rev-parse",
|
||
|
"--show-toplevel",
|
||
|
]).decode().strip()
|
||
|
|
||
|
|
||
|
def run_sync(cmd: List[str]) -> None:
|
||
|
print(f"$ {' '.join(cmd)}")
|
||
|
process = run(cmd)
|
||
|
|
||
|
if process.returncode != 0:
|
||
|
sys.exit(1)
|
||
|
|
||
|
|
||
|
async def check_async(cmd: List[str]) -> str:
|
||
|
print(f"$ {' '.join(cmd)}")
|
||
|
process = await asyncio.create_subprocess_exec(
|
||
|
*cmd,
|
||
|
stdout=asyncio.subprocess.PIPE,
|
||
|
stderr=asyncio.subprocess.PIPE
|
||
|
)
|
||
|
stdout, stderr = await process.communicate()
|
||
|
|
||
|
if process.returncode != 0:
|
||
|
error = stderr.decode()
|
||
|
raise RuntimeError(f"{cmd[0]} failed: {error}")
|
||
|
|
||
|
return stdout.decode().strip()
|
||
|
|
||
|
|
||
|
async def run_async(cmd: List[str]):
|
||
|
print(f"$ {' '.join(cmd)}")
|
||
|
|
||
|
process = await asyncio.create_subprocess_exec(
|
||
|
*cmd,
|
||
|
stdout=asyncio.subprocess.PIPE,
|
||
|
stderr=asyncio.subprocess.PIPE,
|
||
|
)
|
||
|
stdout, stderr = await process.communicate()
|
||
|
|
||
|
print(stdout.decode())
|
||
|
|
||
|
if process.returncode != 0:
|
||
|
error = stderr.decode()
|
||
|
raise RuntimeError(f"{cmd[0]} failed: {error}")
|
||
|
|
||
|
|
||
|
class File:
|
||
|
def __init__(self, path: str):
|
||
|
self.path = os.path.join(ROOT, path)
|
||
|
|
||
|
def __enter__(self):
|
||
|
with open(self.path, "r") as handle:
|
||
|
self.text = handle.read()
|
||
|
return self
|
||
|
|
||
|
def get_exact_match(self, attr: str, value: str):
|
||
|
matches = re.findall(
|
||
|
rf'{re.escape(attr)}\s+=\s+\"?{re.escape(value)}\"?',
|
||
|
self.text
|
||
|
)
|
||
|
|
||
|
n = len(matches)
|
||
|
if n > 1:
|
||
|
raise ValueError(f"multiple occurrences found for {attr}={value}")
|
||
|
elif n == 1:
|
||
|
return matches.pop()
|
||
|
else:
|
||
|
raise ValueError(f"no occurrence found for {attr}={value}")
|
||
|
|
||
|
def substitute(self, attr: str, old_value: str, new_value: str) -> None:
|
||
|
old_line = self.get_exact_match(attr, old_value)
|
||
|
new_line = old_line.replace(old_value, new_value)
|
||
|
self.text = self.text.replace(old_line, new_line)
|
||
|
print(f"Substitute `{attr}` value `{old_value}` with `{new_value}`")
|
||
|
|
||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||
|
with open(self.path, "w") as handle:
|
||
|
handle.write(self.text)
|
||
|
|
||
|
class Nurl:
|
||
|
@classmethod
|
||
|
async def prefetch(cls, url: str, version: str, *extra_args: str) -> str:
|
||
|
cmd = [
|
||
|
"nurl",
|
||
|
"--hash",
|
||
|
url,
|
||
|
version,
|
||
|
]
|
||
|
cmd.extend(extra_args)
|
||
|
return await check_async(cmd)
|
||
|
|
||
|
|
||
|
class Nix:
|
||
|
base_cmd: Final = [
|
||
|
"nix",
|
||
|
"--show-trace",
|
||
|
"--extra-experimental-features", "nix-command"
|
||
|
]
|
||
|
|
||
|
@classmethod
|
||
|
async def _run(cls, args: List[str]) -> Optional[str]:
|
||
|
return await check_async(cls.base_cmd + args)
|
||
|
|
||
|
@classmethod
|
||
|
async def eval(cls, expr: str) -> Union[List, Dict, int, float, str, bool]:
|
||
|
response = await cls._run([
|
||
|
"eval",
|
||
|
"-f", f"{ROOT}/default.nix",
|
||
|
"--json",
|
||
|
expr
|
||
|
])
|
||
|
if response is None:
|
||
|
raise RuntimeError("Nix eval expression returned no response")
|
||
|
try:
|
||
|
return json.loads(response)
|
||
|
except (TypeError, ValueError):
|
||
|
raise RuntimeError("Nix eval response could not be parsed from JSON")
|
||
|
|
||
|
@classmethod
|
||
|
async def hash_to_sri(cls, algorithm: str, value: str) -> Optional[str]:
|
||
|
return await cls._run([
|
||
|
"hash",
|
||
|
"to-sri",
|
||
|
"--type", algorithm,
|
||
|
value
|
||
|
])
|
||
|
|
||
|
|
||
|
class HomeAssistant:
|
||
|
def __init__(self, session: ClientSession):
|
||
|
self._session = session
|
||
|
|
||
|
async def get_latest_core_version(
|
||
|
self,
|
||
|
owner: str = "home-assistant",
|
||
|
repo: str = "core"
|
||
|
) -> str:
|
||
|
async with self._session.get(
|
||
|
f"https://api.github.com/repos/{owner}/{repo}/releases/latest"
|
||
|
) as response:
|
||
|
document = await response.json()
|
||
|
try:
|
||
|
return str(document.get("name"))
|
||
|
except KeyError:
|
||
|
raise RuntimeError("No tag name in response document")
|
||
|
|
||
|
|
||
|
async def get_latest_frontend_version(
|
||
|
self,
|
||
|
core_version: str
|
||
|
) -> str:
|
||
|
async with self._session.get(
|
||
|
f"https://raw.githubusercontent.com/home-assistant/core/{core_version}/homeassistant/components/frontend/manifest.json"
|
||
|
) as response:
|
||
|
document = await response.json(content_type="text/plain")
|
||
|
|
||
|
requirements = [
|
||
|
requirement
|
||
|
for requirement in document.get("requirements", [])
|
||
|
if requirement.startswith("home-assistant-frontend==")
|
||
|
]
|
||
|
|
||
|
if len(requirements) > 1:
|
||
|
raise RuntimeError(
|
||
|
"Found more than one version specifier for the frontend package"
|
||
|
)
|
||
|
elif len(requirements) == 1:
|
||
|
requirement = requirements.pop()
|
||
|
_, version = requirement.split("==", maxsplit=1)
|
||
|
return str(version)
|
||
|
else:
|
||
|
raise RuntimeError(
|
||
|
"Found no version specifier for frontend package"
|
||
|
)
|
||
|
|
||
|
|
||
|
async def update_core(self, old_version: str, new_version: str) -> None:
|
||
|
old_sdist_hash = str(await Nix.eval("home-assistant.sdist.outputHash"))
|
||
|
new_sdist_hash = await Nurl.prefetch("https://pypi.org/project/homeassistant/", new_version)
|
||
|
print(f"sdist: {old_sdist_hash} -> {new_sdist_hash}")
|
||
|
|
||
|
old_git_hash = str(await Nix.eval("home-assistant.src.outputHash"))
|
||
|
new_git_hash = await Nurl.prefetch("https://github.com/home-assistant/core/", new_version)
|
||
|
print(f"git: {old_git_hash} -> {new_git_hash}")
|
||
|
|
||
|
with File("pkgs/servers/home-assistant/default.nix") as file:
|
||
|
file.substitute("hassVersion", old_version, new_version)
|
||
|
file.substitute("hash", old_sdist_hash, new_sdist_hash)
|
||
|
file.substitute("hash", old_git_hash, new_git_hash)
|
||
|
|
||
|
async def update_frontend(self, old_version: str, new_version: str) -> None:
|
||
|
old_hash = str(await Nix.eval("home-assistant.frontend.src.outputHash"))
|
||
|
new_hash = await Nurl.prefetch(
|
||
|
"https://pypi.org/project/home_assistant_frontend/",
|
||
|
new_version,
|
||
|
"-A", "format", "wheel",
|
||
|
"-A", "dist", "py3",
|
||
|
"-A", "python", "py3"
|
||
|
)
|
||
|
print(f"frontend: {old_hash} -> {new_hash}")
|
||
|
|
||
|
with File("pkgs/servers/home-assistant/frontend.nix") as file:
|
||
|
file.substitute("version", old_version, new_version)
|
||
|
file.substitute("hash", old_hash, new_hash)
|
||
|
|
||
|
async def update_components(self):
|
||
|
await run_async([
|
||
|
f"{ROOT}/pkgs/servers/home-assistant/update-component-packages.py"
|
||
|
])
|
||
|
|
||
|
|
||
|
async def main():
|
||
|
headers = {}
|
||
|
if token := os.environ.get("GITHUB_TOKEN", None):
|
||
|
headers.update({"GITHUB_TOKEN": token})
|
||
|
|
||
|
async with aiohttp.ClientSession(headers=headers) as client:
|
||
|
hass = HomeAssistant(client)
|
||
|
|
||
|
core_current = str(await Nix.eval("home-assistant.version"))
|
||
|
core_latest = await hass.get_latest_core_version()
|
||
|
|
||
|
if Version(core_latest) > Version(core_current):
|
||
|
print(f"New Home Assistant version {core_latest} is available")
|
||
|
await hass.update_core(str(core_current), str(core_latest))
|
||
|
|
||
|
frontend_current = str(await Nix.eval("home-assistant.frontend.version"))
|
||
|
frontend_latest = await hass.get_latest_frontend_version(str(core_latest))
|
||
|
|
||
|
if Version(frontend_latest) > Version(frontend_current):
|
||
|
await hass.update_frontend(str(frontend_current), str(frontend_latest))
|
||
|
|
||
|
await hass.update_components()
|
||
|
|
||
|
else:
|
||
|
print(f"Home Assistant {core_current} is still the latest version.")
|
||
|
|
||
|
# wait for async client sessions to close
|
||
|
# https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
|
||
|
await asyncio.sleep(0)
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
run_sync(["pyright", __file__])
|
||
|
run_sync(["ruff", "check", "--ignore=E501", __file__])
|
||
|
run_sync(["isort", __file__])
|
||
|
asyncio.run(main())
|