tutor/tests/test_utils.py

235 lines
9.7 KiB
Python

import base64
import os
import tempfile
import unittest
from io import StringIO
from unittest.mock import MagicMock, mock_open, patch
from tutor import exceptions, utils
class UtilsTests(unittest.TestCase):
def test_common_domain(self) -> None:
self.assertEqual(
"domain.com", utils.common_domain("sub1.domain.com", "sub2.domain.com")
)
self.assertEqual(
"sub1.domain.com",
utils.common_domain("sub1.domain.com", "sub2.sub1.domain.com"),
)
self.assertEqual("com", utils.common_domain("domain1.com", "domain2.com"))
self.assertEqual(
"domain.com", utils.common_domain("sub.domain.com", "ub.domain.com")
)
def test_reverse_host(self) -> None:
self.assertEqual("com.google.www", utils.reverse_host("www.google.com"))
def test_list_if(self) -> None:
self.assertEqual('["cms"]', utils.list_if([("lms", False), ("cms", True)]))
def test_encrypt_success(self) -> None:
password = "passw0rd"
encrypted1 = utils.encrypt(password)
encrypted2 = utils.encrypt(password)
self.assertNotEqual(encrypted1, encrypted2)
self.assertTrue(utils.verify_encrypted(encrypted1, password))
self.assertTrue(utils.verify_encrypted(encrypted2, password))
def test_encrypt_fail(self) -> None:
password = "passw0rd"
self.assertFalse(utils.verify_encrypted(password, password))
def test_ensure_file_directory_exists(self) -> None:
with tempfile.TemporaryDirectory() as root:
tempPath = os.path.join(root, "tempDir", "tempFile")
utils.ensure_file_directory_exists(tempPath)
self.assertTrue(os.path.exists(os.path.dirname(tempPath)))
def test_ensure_file_directory_exists_dirExists(self) -> None:
with tempfile.TemporaryDirectory() as root:
tempPath = os.path.join(root, "tempDir")
os.makedirs(tempPath)
self.assertRaises(
exceptions.TutorError, utils.ensure_file_directory_exists, tempPath
)
def test_long_to_base64(self) -> None:
self.assertEqual(
b"\x00", base64.urlsafe_b64decode(utils.long_to_base64(0) + "==")
)
def test_rsa_key(self) -> None:
key = utils.rsa_private_key(1024)
imported = utils.rsa_import_key(key)
self.assertIsNotNone(imported.e)
self.assertIsNotNone(imported.d)
self.assertIsNotNone(imported.n)
self.assertIsNotNone(imported.p)
self.assertIsNotNone(imported.q)
def test_is_root(self) -> None:
with patch("sys.platform", "win32"):
with patch.object(utils, "get_user_id", return_value=42):
self.assertFalse(utils.is_root())
with patch.object(utils, "get_user_id", return_value=0):
self.assertFalse(utils.is_root())
with patch("sys.platform", "linux"):
with patch.object(utils, "get_user_id", return_value=42):
self.assertFalse(utils.is_root())
with patch.object(utils, "get_user_id", return_value=0):
self.assertTrue(utils.is_root())
@patch("sys.platform", "win32")
def test_is_root_win32(self) -> None:
result = utils.is_root()
self.assertFalse(result)
def test_get_user_id(self) -> None:
with patch("os.getuid", return_value=42):
self.assertEqual(42, utils.get_user_id())
with patch("sys.platform", new="win32"):
self.assertEqual(0, utils.get_user_id())
@patch("sys.platform", "win32")
def test_get_user_id_win32(self) -> None:
result = utils.get_user_id()
self.assertEqual(0, result)
@patch("sys.stdout", new_callable=StringIO)
@patch("subprocess.Popen", autospec=True)
def test_execute_exit_without_error(
self, mock_popen: MagicMock, mock_stdout: StringIO
) -> None:
process = mock_popen.return_value
mock_popen.return_value.__enter__.return_value = process
process.wait.return_value = 0
process.communicate.return_value = ("output", "error")
result = utils.execute("echo", "")
self.assertEqual(0, result)
self.assertEqual("echo ''\n", mock_stdout.getvalue())
self.assertEqual(1, process.wait.call_count)
process.kill.assert_not_called()
@patch("sys.stdout", new_callable=StringIO)
@patch("subprocess.Popen", autospec=True)
def test_execute_nested_command(
self, mock_popen: MagicMock, mock_stdout: StringIO
) -> None:
process = mock_popen.return_value
mock_popen.return_value.__enter__.return_value = process
process.wait.return_value = 0
process.communicate.return_value = ("output", "error")
result = utils.execute("bash", "-c", "echo -n hi")
self.assertEqual(0, result)
self.assertEqual("bash -c 'echo -n hi'\n", mock_stdout.getvalue())
self.assertEqual(1, process.wait.call_count)
process.kill.assert_not_called()
@patch("sys.stdout", new_callable=StringIO)
@patch("subprocess.Popen", autospec=True)
def test_execute_exit_with_error(
self, mock_popen: MagicMock, mock_stdout: StringIO
) -> None:
process = mock_popen.return_value
mock_popen.return_value.__enter__.return_value = process
process.wait.return_value = 1
process.communicate.return_value = ("output", "error")
self.assertRaises(exceptions.TutorError, utils.execute, "echo", "")
self.assertEqual("echo ''\n", mock_stdout.getvalue())
self.assertEqual(1, process.wait.call_count)
process.kill.assert_not_called()
@patch("sys.stdout", new_callable=StringIO)
@patch("subprocess.Popen", autospec=True)
def test_execute_throw_exception(
self, mock_popen: MagicMock, mock_stdout: StringIO
) -> None:
process = mock_popen.return_value
mock_popen.return_value.__enter__.return_value = process
process.wait.side_effect = ZeroDivisionError("Exception occurred.")
self.assertRaises(ZeroDivisionError, utils.execute, "echo", "")
self.assertEqual("echo ''\n", mock_stdout.getvalue())
self.assertEqual(2, process.wait.call_count)
process.kill.assert_called_once()
@patch("sys.stdout", new_callable=StringIO)
@patch("subprocess.Popen", autospec=True)
def test_execute_keyboard_interrupt(
self, mock_popen: MagicMock, mock_stdout: StringIO
) -> None:
process = mock_popen.return_value
mock_popen.return_value.__enter__.return_value = process
process.wait.side_effect = KeyboardInterrupt()
with self.assertRaises(KeyboardInterrupt):
utils.execute("echo", "")
output = mock_stdout.getvalue()
self.assertIn("echo", output)
self.assertEqual(2, process.wait.call_count)
process.kill.assert_called_once()
@patch("sys.platform", "win32")
def test_check_macos_docker_memory_win32_should_skip(self) -> None:
utils.check_macos_docker_memory()
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin(self) -> None:
with patch("tutor.utils.open", mock_open(read_data='{"memoryMiB": 4096}')):
utils.check_macos_docker_memory()
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin_filenotfound(self) -> None:
with patch("tutor.utils.open", mock_open()) as mock_open_settings:
mock_open_settings.return_value.__enter__.side_effect = FileNotFoundError
with self.assertRaises(exceptions.TutorError) as e:
utils.check_macos_docker_memory()
self.assertIn("Error accessing Docker settings file", e.exception.args[0])
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin_json_decode_error(self) -> None:
with patch("tutor.utils.open", mock_open(read_data="invalid")):
with self.assertRaises(exceptions.TutorError) as e:
utils.check_macos_docker_memory()
self.assertIn("invalid JSON", e.exception.args[0])
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin_key_error(self) -> None:
with patch("tutor.utils.open", mock_open(read_data="{}")):
with self.assertRaises(exceptions.TutorError) as e:
utils.check_macos_docker_memory()
self.assertIn("key 'memoryMiB' not found", e.exception.args[0])
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin_type_error(self) -> None:
with patch(
"tutor.utils.open", mock_open(read_data='{"memoryMiB": "invalidstring"}')
):
with self.assertRaises(exceptions.TutorError) as e:
utils.check_macos_docker_memory()
self.assertIn("Unexpected JSON data", e.exception.args[0])
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin_insufficient_memory(self) -> None:
with patch("tutor.utils.open", mock_open(read_data='{"memoryMiB": 4095}')):
with self.assertRaises(exceptions.TutorError) as e:
utils.check_macos_docker_memory()
self.assertEqual(
"Docker is configured to allocate 4095 MiB RAM, less than the recommended 4096 MiB",
e.exception.args[0],
)
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin_encoding_error(self) -> None:
with patch("tutor.utils.open", mock_open()) as mock_open_settings:
mock_open_settings.return_value.__enter__.side_effect = TypeError
with self.assertRaises(exceptions.TutorError) as e:
utils.check_macos_docker_memory()
self.assertIn("Text encoding error", e.exception.args[0])