7
0
mirror of https://github.com/ChristianLight/tutor.git synced 2024-05-28 11:50:49 +00:00
tutor/tutor/utils.py
Régis Behmo 0a670d7ead refactor: add type annotations
Annotations were generated with pyannotate:
https://github.com/dropbox/pyannotate

We are running in strict mode, which is awesome!

This affects a large part of the code base, which might be an issue for
people running a fork of Tutor. Nonetheless, the behavior should not be
affected. If anything, this process has helped find and resolve a few
type-related bugs. Thus, this is not considered as a breaking change.
2021-03-15 21:46:55 +01:00

210 lines
5.5 KiB
Python

import base64
import json
import os
import random
import shutil
import string
import struct
import subprocess
import sys
from typing import List, Tuple
import click
from Crypto.Protocol.KDF import bcrypt, bcrypt_check
from Crypto.PublicKey import RSA
from Crypto.PublicKey.RSA import RsaKey
from . import exceptions, fmt
def encrypt(text: str) -> str:
    """
    Hash a piece of text with bcrypt and return the hash as a string.

    https://pycryptodome.readthedocs.io/en/latest/src/protocol/kdf.html#bcrypt

    The resulting hash is compatible with the password verification performed by
    `htpasswd <https://httpd.apache.org/docs/2.4/programs/htpasswd.html>`__.
    """
    cost = 12
    hashed = bcrypt(text.encode(), cost)
    return hashed.decode()
def verify_encrypted(encrypted: str, text: str) -> bool:
    """
    Tell whether `encrypted` is a bcrypt hash of the plain `text`.

    Any ValueError raised while encoding or checking is reported as a mismatch
    (False) instead of being propagated.
    """
    try:
        # Both encodings stay inside the try block: an encoding failure is a
        # ValueError too and must yield False, just like a failed check.
        bcrypt_check(text.encode(), encrypted.encode())
    except ValueError:
        return False
    else:
        return True
def ensure_file_directory_exists(path: str) -> None:
    """
    Create the parent directory of the file at `path` if it does not exist.

    Intermediate directories are created as needed. When `path` has no
    directory component (e.g. "file.txt"), the file lives in the current
    working directory and there is nothing to create.
    """
    directory = os.path.dirname(path)
    if not directory:
        # os.makedirs("") raises FileNotFoundError, so bail out early.
        return
    # exist_ok avoids the check-then-create race when another process creates
    # the directory between the existence test and the creation.
    os.makedirs(directory, exist_ok=True)
def random_string(length: int) -> str:
    """
    Return a string of `length` characters drawn from ASCII letters and digits.

    Characters are picked with the `random` module, one `random.choice` call per
    character.
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(random.choice(alphabet) for _ in range(length))
def list_if(services: List[Tuple[str, bool]]) -> str:
    """
    Serialize to a JSON array the names of the services whose flag is truthy.

    Each item of `services` is a (name, flag) pair; order is preserved.
    """
    enabled: List[str] = []
    for service in services:
        if service[1]:
            enabled.append(service[0])
    return json.dumps(enabled)
def common_domain(d1: str, d2: str) -> str:
    """
    Return the longest domain suffix shared by two domain names.

    Ex: "sub1.domain.com" and "sub2.domain.com" -> "domain.com"

    The comparison is made label by label, starting from the rightmost one. An
    empty string is returned when the rightmost labels already differ.
    """
    shared: List[str] = []
    # zip stops at the shorter of the two names, which bounds the comparison.
    for label1, label2 in zip(reversed(d1.split(".")), reversed(d2.split("."))):
        if label1 != label2:
            break
        shared.append(label1)
    # Labels were collected right-to-left: restore the natural order.
    shared.reverse()
    return ".".join(shared)
def reverse_host(domain: str) -> str:
    """
    Return the domain name with its dot-separated labels in reverse order,
    java-style.

    Ex: "www.google.com" -> "com.google.www"
    """
    labels = domain.split(".")
    return ".".join(reversed(labels))
def rsa_private_key(bits: int = 2048) -> str:
    """
    Generate a new RSA key of the given size and return its export as text.

    The key is exported with the library's default settings (`export_key()`
    called without arguments).
    """
    exported = RSA.generate(bits).export_key()
    return exported.decode()
def rsa_import_key(key: str) -> RsaKey:
    """
    Parse a PEM-formatted RSA key given as text and return the key object.
    """
    encoded_key = key.encode()
    return RSA.import_key(encoded_key)
def long_to_base64(n: int) -> str:
    """
    Encode a non-negative integer as URL-safe base64 without "=" padding.

    The integer is serialized as big-endian bytes with no leading zero byte;
    zero is encoded as a single null byte. Adapted from jwkest.__init__.

    Raises ValueError if `n` is negative: such a value has no unsigned byte
    representation (the former divmod-based loop never terminated on it).
    """
    if n < 0:
        raise ValueError("Cannot encode a negative integer: {}".format(n))
    # At least one byte, so that 0 is encoded as b"\x00" and not as b"".
    byte_count = max(1, (n.bit_length() + 7) // 8)
    data = n.to_bytes(byte_count, "big")
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
def is_root() -> bool:
    """
    Tell whether tutor is being run as root/sudo.

    Always False on win32, where the user ID is not looked up at all.
    """
    # The platform test comes first and short-circuits: get_user_id() is only
    # called on non-Windows platforms.
    return sys.platform != "win32" and get_user_id() == 0
def get_user_id() -> int:
    """
    Return the numeric ID of the current user.

    On win32 no lookup is attempted and 0 is returned; everywhere else the
    value comes from os.getuid().
    """
    if sys.platform != "win32":
        return os.getuid()
    # Don't even try on Windows
    return 0
def docker_run(*command: str) -> int:
    """
    Run `docker run --rm [-it] <command...>` and return what `docker` returns.

    The "-it" flag is only added when is_a_tty() reports that a tty is
    available.
    """
    tty_flags = ["-it"] if is_a_tty() else []
    return docker("run", "--rm", *tty_flags, *command)
def docker(*command: str) -> int:
    """
    Run the `docker` executable with the given arguments through `execute`.

    Raises TutorError when no `docker` executable is found on the PATH.
    """
    executable_path = shutil.which("docker")
    if executable_path is not None:
        return execute("docker", *command)
    raise exceptions.TutorError(
        "docker is not installed. Please follow instructions from https://docs.docker.com/install/"
    )
def docker_compose(*command: str) -> int:
    """
    Run the `docker-compose` executable with the given arguments through
    `execute`.

    Raises TutorError when no `docker-compose` executable is found on the PATH.
    """
    executable_path = shutil.which("docker-compose")
    if executable_path is not None:
        return execute("docker-compose", *command)
    raise exceptions.TutorError(
        "docker-compose is not installed. Please follow instructions from https://docs.docker.com/compose/install/"
    )
def kubectl(*command: str) -> int:
    """
    Run the `kubectl` executable with the given arguments through `execute`.

    Raises TutorError when no `kubectl` executable is found on the PATH.
    """
    executable_path = shutil.which("kubectl")
    if executable_path is not None:
        return execute("kubectl", *command)
    raise exceptions.TutorError(
        "kubectl is not installed. Please follow instructions from https://kubernetes.io/docs/tasks/tools/install-kubectl/"
    )
def is_a_tty() -> bool:
    """
    Return True if stdin is able to allocate a tty. Tty allocation sometimes cannot be
    enabled, for instance in cron jobs.

    False is also returned when stdin has no usable file descriptor at all:
    stdin replaced by an in-memory stream, closed, or set to None.
    """
    try:
        return os.isatty(sys.stdin.fileno())
    except (AttributeError, OSError, ValueError):
        # AttributeError: sys.stdin is None (or has no fileno method).
        # OSError/ValueError: io.UnsupportedOperation from pseudo-files, or a
        # closed stream. In all these cases there is no tty to attach to.
        return False
def execute(*command: str) -> int:
    """
    Echo a command, run it as a subprocess and wait for it to complete.

    Returns the exit status of the process. Raises TutorError when waiting for
    the process fails or when the exit status is strictly positive. On
    KeyboardInterrupt, the process is killed and the interrupt is re-raised.
    """
    command_line = " ".join(command)
    click.echo(fmt.command(command_line))
    with subprocess.Popen(command) as process:
        try:
            exit_status = process.wait(timeout=None)
        except KeyboardInterrupt:
            # Do not leave the child process running behind us.
            process.kill()
            process.wait()
            raise
        except Exception as error:
            process.kill()
            process.wait()
            raise exceptions.TutorError(
                "Command failed: {}".format(command_line)
            ) from error
        if exit_status > 0:
            raise exceptions.TutorError(
                "Command failed with status {}: {}".format(exit_status, command_line)
            )
    return exit_status
def check_output(*command: str) -> bytes:
    """
    Echo a command, run it and return the bytes returned by
    subprocess.check_output.

    Any exception raised while running the command is converted to a
    TutorError, chained to the original exception.
    """
    command_line = " ".join(command)
    click.echo(fmt.command(command_line))
    try:
        output = subprocess.check_output(command)
    except Exception as error:
        raise exceptions.TutorError(
            "Command failed: {}".format(command_line)
        ) from error
    return output