setup-cockpit/install.py

551 lines
17 KiB
Python
Executable file

#!/usr/bin/python3
"""Main installer file.
Inspired by https://github.com/dmerejkowsky/dotfiles
"""
import argparse
import shlex
import subprocess
import sys
from typing import List, Optional
from urllib.request import urlretrieve

import cli_ui as ui
from path import Path  # type: ignore
from ruamel.yaml import YAML  # type: ignore
class Installer:
    """Run the installation actions listed in the YAML configuration file.

    Every top-level key of the configuration names a program. Its value is a
    list of single-key mappings: the key selects a ``do_<name>`` method of
    this class and the value holds that method's arguments (see
    ``process_program``).
    """

    def __init__(
        self,
        config: str,
        first_install: bool = False,
        force: bool = False,
        hide_commands: bool = False,
        update: bool = False,
    ):
        """Validate the secrets, load *config* and detect the operating system.

        Args:
            config: path of the YAML configuration file.
            first_install: value of the "first install" condition.
            force: overwrite or replace existing destinations.
            hide_commands: send the output of package commands to DEVNULL.
            update: pull already cloned repositories instead of skipping them.
        """
        self.verify_secrets()
        self.base_dir = Path.getcwd()
        yaml = YAML(typ="safe")
        self.conf = yaml.load(Path(config).text())
        self.force = force
        self.first_install = first_install
        # Must be set before define_os(): _update_system() reads it.
        self.hide_commands = hide_commands
        self.home = Path("~").expanduser()
        self.operating_system = self.define_os()
        self.update = update

    @staticmethod
    def _read_assignments(filename: str) -> List[tuple]:
        """Return the ``(key, value)`` pairs of the ``KEY=VALUE`` lines in *filename*.

        Blank lines, comment lines (``#``) and lines without ``=`` are ignored.
        Only the first ``=`` separates key and value, so values may contain
        ``=`` themselves.
        """
        pairs = []
        with open(filename, "r") as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, _, value = line.partition("=")
                pairs.append((key.strip(), value.strip()))
        return pairs

    def verify_secrets(self) -> None:
        """Check the 'secrets' file against 'secrets.template'.

        Both files are read from the current directory and parsed the same
        way. The process exits with status 1 when either file is missing,
        when the two files do not list the same keys in the same order, or
        when a key in 'secrets' has an empty value.
        """
        try:
            template = self._read_assignments("secrets.template")
        except FileNotFoundError:
            print("No 'secrets.template' file found. Did you delete it?")
            sys.exit(1)
        try:
            secrets = self._read_assignments("secrets")
        except FileNotFoundError:
            print("No 'secrets' file found. Did you forget to create it?")
            sys.exit(1)

        template_keys = [key for key, _ in template]
        secrets_keys = [key for key, _ in secrets]
        # Check that the template file and secrets file have the same keys
        if template_keys != secrets_keys:
            print(
                "'secrets.template' and 'secrets' don't have the same keys.\n"
                "Perhaps you have forgotten to add some?"
            )
            sys.exit(1)

        # Check that each key has a value that has been set:
        for key, value in secrets:
            if not value:
                print(f" 'secrets' file has no value for {key}, please add one.")
                sys.exit(1)

    def define_os(self) -> str:
        """Detect the distribution family and refresh its package database.

        The lowercased content of /etc/os-release is searched for a known
        distribution name; when none is found the user is asked for it.

        Returns:
            "arch" (also for manjaro) or "debian" (also for ubuntu).
            For any other answer ``ui.fatal`` is called.
        """
        # Checked in this order; each name maps to the family used by
        # evaluate_condition() and do_install().
        families = {
            "arch": "arch",
            "debian": "debian",
            "manjaro": "arch",
            "ubuntu": "debian",
        }
        try:
            release_info = Path("/etc/os-release").read_text().lower()
        except OSError:
            # No readable os-release file: fall back to asking the user.
            release_info = ""

        detected = next((name for name in families if name in release_info), None)
        if detected is None:
            ui.warning("Operating system wasn't found")
            detected = input("What is it ? ").strip().lower()
        else:
            ui.info_3("Found", detected, "based distribution")

        family = families.get(detected)
        if family is None:
            ui.fatal("I only support Arch and Debian based distros.")
            return "Unsupported OS"
        self._update_system(family)
        return family

    def _update_system(self, operating_system: str) -> None:
        """Run the package-manager refresh command for *operating_system*.

        Raises:
            AssertionError: *operating_system* is not a supported name.
            subprocess.CalledProcessError: the command exits with an error.
        """
        if operating_system in ("arch", "manjaro"):
            command = "sudo pacman -Syu"
        elif operating_system in ("debian", "ubuntu"):
            command = "sudo apt update"
        else:
            raise AssertionError("The operating system should have been defined")

        # None keeps the command attached to the terminal.
        output: Optional[int] = subprocess.DEVNULL if self.hide_commands else None
        subprocess.run(
            command,
            check=True,
            shell=True,
            stdout=output,
            stderr=output,
        )

    def evaluate_condition(self, condition: str) -> bool:
        """Evaluate a condition taken from the configuration file.

        A few keywords are answered from the installer state (see the table
        below); anything else is run as a shell command and is true when the
        command exits with status 0.
        """
        conditions = {
            "arch": self.operating_system == "arch",
            "debian": self.operating_system == "debian",
            "force": self.force,
            "no force": not self.force,
            "first install": self.first_install,
            "no first install": not self.first_install,
            "update": self.update,
            "no update": not self.update,
        }
        if condition in conditions:
            return conditions[condition]
        command = subprocess.run(condition, check=False, shell=True)
        return command.returncode == 0

    def generate_process_list(self) -> None:
        """Write every program of the configuration, commented out, to 'process_list.txt'."""
        ui.info("Generating 'process_list.txt'")
        with open("process_list.txt", "w") as process_file:
            process_file.write("# " + "\n# ".join(self.conf.keys()) + "\n")

    def pretty_path(self, path: "Path") -> str:
        """Return *path* relative to the home directory, prefixed with ``~/``."""
        relpath: str = path.relpath(self.home)
        return "~/" + relpath

    def do_clone(
        self, url: str, dest: str, branch: str = "master", condition: str = "true"
    ) -> None:
        """Clone if new, remove and clone if force, pull if update.

        Nothing is done when *condition* is false, or when *dest* exists and
        neither ``force`` nor ``update`` is set.
        """
        if not self.evaluate_condition(condition):
            ui.info_2("Skipping", url)
            return
        p_dest = Path(dest).expanduser()
        pretty_dest = self.pretty_path(p_dest)
        p_dest.parent.makedirs_p()
        if p_dest.exists():
            if self.force:
                ui.info_2("Removing", pretty_dest)
                p_dest.rmtree()
            elif self.update:
                ui.info_2("Updating", pretty_dest)
                # Quote the path so directories with spaces survive the shell.
                self.do_run("cd {} && git pull".format(shlex.quote(str(p_dest))))
                return
            else:
                ui.info_2("Skipping", pretty_dest)
                return
        ui.info_2("Cloning", url, "->", pretty_dest)
        # Argument list instead of a shell string: no quoting issues.
        subprocess.run(
            ["git", "clone", "--branch", branch, url, str(p_dest)], check=True
        )

    def do_copy(self, src: str, dest: str) -> None:
        """Copy *src* (relative to the base directory) to *dest*.

        An existing destination is only overwritten when ``force`` is set.
        """
        p_dest = Path(dest).expanduser()
        pretty_dest = self.pretty_path(p_dest)
        p_dest.parent.makedirs_p()
        if p_dest.exists() and not self.force:
            ui.info_2("Skipping", pretty_dest)
            return
        p_src: Path = self.base_dir / src
        ui.info_2("Copy", p_src, "->", pretty_dest)
        p_src.copy(p_dest)

    def do_download(self, *, url: str, dest: str, executable: bool = False) -> None:
        """Retrieve a file from *url* unless *dest* exists and ``force`` is unset.

        When *executable* is true the destination mode is set to 0o755.
        """
        p_dest = Path(dest).expanduser()
        pretty_dest = self.pretty_path(p_dest)
        p_dest.parent.makedirs_p()
        if p_dest.exists() and not self.force:
            ui.info_2("Skipping", pretty_dest)
        else:
            ui.info_2("Fetching", url, "->", pretty_dest)
            urlretrieve(url, p_dest)
        if executable:
            p_dest.chmod(0o755)

    def do_install(self, *packages: str, **os_specific_packages: List[str]) -> None:
        """Install packages with the OS-specific package manager.

        Packages are given either positionally (same names on every OS) or as
        keyword lists keyed by operating system ("arch", "debian") and/or
        "both". The keyword form is only used when no positional package is
        given. Failed packages are reported at the end; they do not abort
        the run.
        """
        if not packages:
            specific = os_specific_packages.get(self.operating_system)
            shared = os_specific_packages.get("both")
            if specific is None and shared is None:
                ui.warning("No packages for {}".format(self.operating_system))
                return
            packages = tuple(specific or ()) + tuple(shared or ())

        install_templates = {
            "arch": "sudo pacman -S --needed --noconfirm {}",
            "debian": "sudo apt install -y {}",
        }
        failed_installs = []
        # None keeps the command attached to the terminal.
        output: Optional[int] = subprocess.DEVNULL if self.hide_commands else None

        ui.info_2("Installing packages...")
        if packages and self.operating_system not in install_templates:
            raise AssertionError("The operating system should have been defined")
        for i, package in enumerate(packages):
            # Kept as a shell string so an entry is interpreted as before.
            command = install_templates[self.operating_system].format(package)
            ui.info_count(i, len(packages), package, end="... ")
            install = subprocess.run(
                command,
                check=False,
                shell=True,
                stdout=output,
                stderr=output,
            )
            if install.returncode != 0:
                ui.info(ui.cross)
                failed_installs.append(package)
            else:
                ui.info(ui.check)

        if failed_installs:
            ui.warning("These packages failed to install:")
            for failed in failed_installs:
                ui.info("  ", failed)
            ui.warning(
                f"Are the packages really meant for {self.operating_system} systems?"
            )

    def do_run(self, command: str, condition: str = "true") -> None:
        """Run a shell command when *condition* holds; warn if it fails."""
        if not self.evaluate_condition(condition):
            ui.info_2("Skipping", "`{}`".format(command))
            return
        ui.info_2("Running", "`{}`".format(command))
        result = subprocess.run(command, check=False, shell=True)
        if result.returncode != 0:
            ui.warning("`{}` failed".format(command))
        # Go back to the directory the installer was started from.
        self.base_dir.chdir()

    def do_symlink(self, src: str, dest: str, condition: str = "true") -> None:
        """Make a symlink to a file."""
        self._do_symlink(src, dest, condition=condition, is_dir=False)

    def do_symlink_dir(self, src: str, dest: str, condition: str = "true") -> None:
        """Make a symlink to a dir."""
        self._do_symlink(src, dest, condition=condition, is_dir=True)

    def _do_symlink(self, src: str, dest: str, *, condition: str, is_dir: bool) -> None:
        """Create the symlink *dest* pointing at *src*.

        A relative *src* is resolved against the base directory. An existing
        destination is replaced when ``force`` is set or the user agrees;
        a dangling symlink at *dest* is replaced without asking.
        """
        p_src = Path(src).expanduser()
        pretty_src = self.pretty_path(p_src)
        p_dest = Path(dest).expanduser()
        pretty_dest = self.pretty_path(p_dest)
        if not self.evaluate_condition(condition):
            ui.info_2("Skipping", pretty_dest)
            return

        # NOTE(review): directories create the grand-parent only; presumably
        # dest is written with a trailing slash in the config -- confirm.
        if is_dir:
            p_dest.parent.parent.makedirs_p()
        else:
            p_dest.parent.makedirs_p()

        if p_dest.exists():
            if not self.force and not ui.ask_yes_no(
                f"{pretty_dest} already exists. Overwrite?"
            ):
                ui.info_2("Skipping", pretty_dest)
                return
            ui.info_2("Deleting", pretty_dest)

        # Clear whatever occupies dest; islink() first so that a link to a
        # directory is unlinked rather than having its target tree removed.
        if p_dest.islink():
            p_dest.remove()
        elif p_dest.isdir():
            p_dest.rmtree()
        elif p_dest.exists():
            p_dest.remove()

        ui.info_2("Symlink", pretty_dest, "->", pretty_src)
        if p_src.startswith("/"):
            src_full = p_src
        else:
            src_full = self.base_dir / p_src
        src_full.symlink(p_dest)

    def do_append(self, dest: str, content: str, condition: str = "true") -> None:
        """Append to a file."""
        self._do_write(dest, content, condition=condition, append=True)

    def do_write(self, dest: str, content: str, condition: str = "true") -> None:
        """Write into a file."""
        self._do_write(dest, content, condition=condition, append=False)

    def _do_write(
        self, dest: str, content: str, *, condition: str, append: bool
    ) -> None:
        """Write or append *content* to *dest*.

        ``{base_dir}`` and ``{home}`` in *content* are substituted with
        ``str.format`` and a trailing newline is added when missing. When not
        appending, an existing file is only replaced if ``force`` is set or
        the user agrees.
        """
        p_dest = Path(dest).expanduser()
        pretty_dest = self.pretty_path(p_dest)
        if not self.evaluate_condition(condition):
            ui.info_2("Skipping", pretty_dest)
            return
        if p_dest.exists() and not self.force and not append:
            if not ui.ask_yes_no(f"{pretty_dest} already exists. Overwrite?"):
                ui.info_2("Skipping", pretty_dest)
                return
        ui.info_2("Creating", pretty_dest)
        p_dest.parent.makedirs_p()
        content = content.format(base_dir=self.base_dir, home=self.home)
        if not content.endswith("\n"):
            content += "\n"
        p_dest.write_text(content, append=append)

    def process(self, programs: Optional[List[str]] = None) -> None:
        """Install the programs provided.

        If no program is provided, every program of the conf file is offered
        and the user confirms each one individually.
        """
        if not programs:
            ui.info(
                "No programs were specified.", "Fetching from the configuration file."
            )
            for program in list(self.conf.keys()):
                if ui.ask_yes_no("Do you wish to install {}?".format(program)):
                    self.process_program(program)
                else:
                    ui.info_2("Skipping {}".format(program))
        else:
            for program in programs:
                self.process_program(program)

    def process_program(self, program: str) -> None:
        """Install one program (called by self.process()).

        Each action of the program is a single-key mapping: the key names the
        ``do_<key>`` method to call. A mapping value is passed as keyword
        arguments, a single string as one positional argument and any other
        sequence as positional arguments.
        """
        ui.info(ui.green, program)
        ui.info("-" * len(program))
        try:
            todo = self.conf[program]
        except KeyError:
            ui.fatal("'{}' wasn't found in conf file.".format(program))
            return
        for action in todo:
            name = next(iter(action))
            params = action[name]
            func = getattr(self, "do_{}".format(name), None)
            if func is None:
                ui.fatal("Unknown action '{}' for '{}'.".format(name, program))
                return
            if isinstance(params, dict):
                func(**params)
            elif isinstance(params, str):
                # A bare string must not be splatted character by character.
                func(params)
            else:
                func(*params)
        ui.info()
def read_programs_from_process_list() -> List[str]:
    """Read 'process_list.txt' and return the programs to process.

    Each line is stripped first; blank lines and lines starting with ``#``
    are ignored, so only uncommented program names are returned. The
    programs found are also printed.
    """
    ui.info("Found these programs in 'process_list.txt':")
    with open("process_list.txt", "r") as process_file:
        stripped_lines = (line.strip() for line in process_file)
        # Filter after stripping: a raw blank line is "\n", never empty.
        programs = [
            line for line in stripped_lines if line and not line.startswith("#")
        ]
    for program in programs:
        ui.info_1(program)
    return programs
def main() -> None:
    """Parse the command line, build the Installer and run the requested mode.

    Modes, in order of precedence: ``--generate`` writes 'process_list.txt',
    ``--list_programs`` prints the configured programs, otherwise the
    programs given on the command line (or read from 'process_list.txt' with
    ``--from_file``) are processed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "programs",
        nargs="*",
        help="Programs to process, can be none",
    )
    parser.add_argument(
        "-c",
        "--config",
        help="Use CONFIG file (default: 'configs.yml')",
    )
    parser.add_argument(
        "-i",
        "--first_install",
        action="store_true",
        help="Assume the program is not installed",
    )
    parser.add_argument(
        "-F",
        "--force",
        action="store_true",
        help="Overwrite existing files",
    )
    parser.add_argument(
        "-f",
        "--from_file",
        action="store_true",
        help="Use 'process_list.txt' as programs to process",
    )
    parser.add_argument(
        "-g",
        "--generate",
        action="store_true",
        help="Generate a file containing all the programs from the config file",
    )
    parser.add_argument(
        "-H",
        "--hide_commands",
        action="store_true",
        help="Hide command outputs",
    )
    parser.add_argument(
        "-l",
        "--list_programs",
        action="store_true",
        help="List all programs from the configuration file",
    )
    parser.add_argument(
        "-u",
        "--update",
        action="store_true",
        help="Update programs",
    )
    args = parser.parse_args()
    programs = args.programs

    # Reject the conflicting combination before the Installer is built (its
    # constructor already runs the system update). parser.error is used
    # instead of assert, which is stripped when Python runs with -O.
    will_process = not (args.generate or args.list_programs)
    if will_process and args.from_file and programs:
        parser.error("Can't use 'process_list.txt' if you also specify programs!")

    installer = Installer(
        config=args.config or "configs.yml",
        first_install=args.first_install,
        force=args.force,
        hide_commands=args.hide_commands,
        update=args.update,
    )

    if args.generate:
        installer.generate_process_list()
        return
    if args.list_programs:
        ui.info("The following items were found in the config file:")
        for program in installer.conf.keys():
            ui.info_1(program)
        return
    if args.from_file:
        programs = read_programs_from_process_list()
    installer.process(programs=programs)
# Run the installer only when the file is executed as a script.
if __name__ == "__main__":
    main()