mirror of
https://github.com/ChristianLight/tutor.git
synced 2024-11-14 17:24:08 +00:00
6a3138152f
Whenever Tutor executes a shell command, it logs out said command in order to aid in end user understanding/debugging. In some cases (notably, when running jobs in containers) the logged command was not accurately quoted. The command was run correctly, because it was passed in pieces to ``subprocess.Popen``, which correctly joins the pieces together into a valid POSIX shell command; however, the logged version of the command was constructed by simply joining the pieces with spaces. This usually works, but breaks down when running complex shell commands with nested quoting. This commit changes the logging to use ``shlex.join``, which joins command pieces together in a POSIX-compliant way, presumably the same way as ``subprocess.Popen``. Example: tutor local importdemocourse runs the shell command: docker-compose -f /home/kyle/.local/share/tutor/env/local/docker-compose.yml -f /home/kyle/.local/share/tutor/env/local/docker-compose.prod.yml -f /home/kyle/.local/share/tutor/env/local/docker-compose.tmp.yml --project-name tutor_local -f /home/kyle/.local/share/tutor/env/local/docker-compose.jobs.yml -f /home/kyle/.local/share/tutor/env/local/docker-compose.jobs.tmp.yml run --rm cms-job sh -e -c 'echo "Loading settings $DJANGO_SE... (several more script lines) ...eindex_course --all --setup' but the logged shell command was: docker-compose -f /home/kyle/.local/share/tutor/env/local/docker-compose.yml -f /home/kyle/.local/share/tutor/env/local/docker-compose.prod.yml -f /home/kyle/.local/share/tutor/env/local/docker-compose.tmp.yml --project-name tutor_local -f /home/kyle/.local/share/tutor/env/local/docker-compose.jobs.yml -f /home/kyle/.local/share/tutor/env/local/docker-compose.jobs.tmp.yml run --rm cms-job sh -e -c echo "Loading settings $DJANGO_SE... (several more script lines) ...eindex_course --all --setup which will not run if copied and pasted back into the user's terminal, as the importdemocourse shell script is unquoted.
227 lines
9.2 KiB
Python
227 lines
9.2 KiB
Python
import base64
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
from io import StringIO
|
|
from unittest.mock import MagicMock, mock_open, patch
|
|
|
|
from tutor import exceptions, utils
|
|
|
|
|
|
class UtilsTests(unittest.TestCase):
|
|
def test_common_domain(self) -> None:
    # Each row is (expected common domain, first host, second host).
    cases = [
        ("domain.com", "sub1.domain.com", "sub2.domain.com"),
        ("sub1.domain.com", "sub1.domain.com", "sub2.sub1.domain.com"),
        ("com", "domain1.com", "domain2.com"),
        # "sub" and "ub" share trailing characters but are different labels,
        # so only "domain.com" is expected here.
        ("domain.com", "sub.domain.com", "ub.domain.com"),
    ]
    for expected, host1, host2 in cases:
        self.assertEqual(expected, utils.common_domain(host1, host2))
|
|
|
|
def test_reverse_host(self) -> None:
    # The dot-separated labels are expected back in reverse order.
    reversed_host = utils.reverse_host("www.google.com")
    self.assertEqual("com.google.www", reversed_host)
|
|
|
|
def test_list_if(self) -> None:
    # Only the name paired with True is expected in the rendered string.
    services = [("lms", False), ("cms", True)]
    rendered = utils.list_if(services)
    self.assertEqual('["cms"]', rendered)
|
|
|
|
def test_encrypt_success(self) -> None:
    # Encrypting the same password twice must give two different values,
    # and each of them must still verify against the password.
    secret = "passw0rd"
    first = utils.encrypt(secret)
    second = utils.encrypt(secret)
    self.assertNotEqual(first, second)
    for encrypted in (first, second):
        self.assertTrue(utils.verify_encrypted(encrypted, secret))
|
|
|
|
def test_encrypt_fail(self) -> None:
    # The plain-text password, passed where an encrypted value is expected,
    # must not verify.
    plain = "passw0rd"
    verified = utils.verify_encrypted(plain, plain)
    self.assertFalse(verified)
|
|
|
|
def test_ensure_file_directory_exists(self) -> None:
    # "tempDir" does not exist inside the fresh temporary root; after the
    # call, the parent directory of the file path must exist.
    with tempfile.TemporaryDirectory() as root:
        file_path = os.path.join(root, "tempDir", "tempFile")
        utils.ensure_file_directory_exists(file_path)
        parent_dir = os.path.dirname(file_path)
        self.assertTrue(os.path.exists(parent_dir))
|
|
|
|
def test_ensure_file_directory_exists_dirExists(self) -> None:
    # The path handed to the helper is itself an existing directory, which
    # is expected to be rejected with a TutorError.
    with tempfile.TemporaryDirectory() as root:
        existing_dir = os.path.join(root, "tempDir")
        os.makedirs(existing_dir)
        with self.assertRaises(exceptions.TutorError):
            utils.ensure_file_directory_exists(existing_dir)
|
|
|
|
def test_long_to_base64(self) -> None:
    # "==" is appended before decoding -- presumably because long_to_base64
    # returns unpadded base64 (TODO confirm against tutor.utils).
    encoded = utils.long_to_base64(0)
    decoded = base64.urlsafe_b64decode(encoded + "==")
    self.assertEqual(b"\x00", decoded)
|
|
|
|
def test_rsa_key(self) -> None:
    # Round trip: generate a private key (1024 is presumably the key size in
    # bits -- confirm in tutor.utils), import it, and check that every RSA
    # component is populated on the imported object.
    private_key = utils.rsa_private_key(1024)
    imported = utils.rsa_import_key(private_key)
    for component in ("e", "d", "n", "p", "q"):
        self.assertIsNotNone(getattr(imported, component))
|
|
|
|
def test_is_root(self) -> None:
    # Nothing is mocked here, so the "not root" expectation depends on the
    # account running the test suite. When the suite itself runs with uid 0
    # (common inside containers and some CI runners) the assertion below
    # would fail for reasons unrelated to the code under test, so skip.
    # NOTE(review): assumes is_root() reflects the current process user --
    # confirm against tutor.utils.
    if hasattr(os, "getuid") and 0 in (os.getuid(), os.geteuid()):
        self.skipTest("test suite is running as root")
    self.assertFalse(utils.is_root())
|
|
|
|
@patch("sys.platform", "win32")
def test_is_root_win32(self) -> None:
    # With the platform reported as "win32", is_root() must be False.
    self.assertFalse(utils.is_root())
|
|
|
|
def test_get_user_id(self) -> None:
    # On "win32" the helper is expected to report 0; elsewhere a non-zero id
    # is expected, which can only hold for an unprivileged account.
    user_id = utils.get_user_id()
    if sys.platform == "win32":
        self.assertEqual(0, user_id)
        return
    # When the suite itself runs as root (common inside containers and some
    # CI runners), an id of 0 is presumably the correct answer rather than a
    # failure -- skip instead of reporting a spurious error.
    # NOTE(review): assumes get_user_id() reports the process uid -- confirm
    # against tutor.utils.
    if hasattr(os, "getuid") and 0 in (os.getuid(), os.geteuid()):
        self.skipTest("test suite is running as root")
    self.assertNotEqual(0, user_id)
|
|
|
|
@patch("sys.platform", "win32")
def test_get_user_id_win32(self) -> None:
    # With the platform reported as "win32", the user id must be 0.
    user_id = utils.get_user_id()
    self.assertEqual(0, user_id)
|
|
|
|
@patch("sys.stdout", new_callable=StringIO)
@patch("subprocess.Popen", autospec=True)
def test_execute_exit_without_error(
    self, mock_popen: MagicMock, mock_stdout: StringIO
) -> None:
    # Fake child process: entering the Popen context yields the same mock,
    # and waiting on it reports exit status 0.
    proc = mock_popen.return_value
    proc.__enter__.return_value = proc
    proc.wait.return_value = 0
    proc.communicate.return_value = ("output", "error")

    exit_code = utils.execute("echo", "")

    self.assertEqual(0, exit_code)
    # The command is written to stdout with the empty argument quoted.
    logged = mock_stdout.getvalue()
    self.assertEqual("echo ''\n", logged)
    # A clean exit means a single wait() and no kill().
    self.assertEqual(1, proc.wait.call_count)
    proc.kill.assert_not_called()
|
|
|
|
@patch("sys.stdout", new_callable=StringIO)
@patch("subprocess.Popen", autospec=True)
def test_execute_nested_command(
    self, mock_popen: MagicMock, mock_stdout: StringIO
) -> None:
    # Fake child process: entering the Popen context yields the same mock,
    # and waiting on it reports exit status 0.
    proc = mock_popen.return_value
    proc.__enter__.return_value = proc
    proc.wait.return_value = 0
    proc.communicate.return_value = ("output", "error")

    exit_code = utils.execute("bash", "-c", "echo -n hi")

    self.assertEqual(0, exit_code)
    # The argument containing spaces must be logged as one quoted word, so
    # the logged line can be pasted back into a shell.
    logged = mock_stdout.getvalue()
    self.assertEqual("bash -c 'echo -n hi'\n", logged)
    self.assertEqual(1, proc.wait.call_count)
    proc.kill.assert_not_called()
|
|
|
|
@patch("sys.stdout", new_callable=StringIO)
@patch("subprocess.Popen", autospec=True)
def test_execute_exit_with_error(
    self, mock_popen: MagicMock, mock_stdout: StringIO
) -> None:
    # Fake child process whose wait() reports a non-zero exit status (1).
    proc = mock_popen.return_value
    proc.__enter__.return_value = proc
    proc.wait.return_value = 1
    proc.communicate.return_value = ("output", "error")

    # A non-zero exit status must be reported as a TutorError.
    with self.assertRaises(exceptions.TutorError):
        utils.execute("echo", "")

    # The command is still logged, and the process is waited on exactly once
    # without being killed.
    logged = mock_stdout.getvalue()
    self.assertEqual("echo ''\n", logged)
    self.assertEqual(1, proc.wait.call_count)
    proc.kill.assert_not_called()
|
|
|
|
@patch("sys.stdout", new_callable=StringIO)
@patch("subprocess.Popen", autospec=True)
def test_execute_throw_exception(
    self, mock_popen: MagicMock, mock_stdout: StringIO
) -> None:
    # Fake child process whose wait() raises on every call.
    proc = mock_popen.return_value
    proc.__enter__.return_value = proc
    proc.wait.side_effect = ZeroDivisionError("Exception occurred.")

    # The exception must propagate to the caller unchanged.
    with self.assertRaises(ZeroDivisionError):
        utils.execute("echo", "")

    logged = mock_stdout.getvalue()
    self.assertEqual("echo ''\n", logged)
    # On failure the process is expected to be killed once and waited on a
    # second time.
    self.assertEqual(2, proc.wait.call_count)
    proc.kill.assert_called_once()
|
|
|
|
@patch("sys.stdout", new_callable=StringIO)
@patch("subprocess.Popen", autospec=True)
def test_execute_keyboard_interrupt(
    self, mock_popen: MagicMock, mock_stdout: StringIO
) -> None:
    # Fake child process whose wait() is interrupted on every call.
    proc = mock_popen.return_value
    proc.__enter__.return_value = proc
    proc.wait.side_effect = KeyboardInterrupt()

    # The interrupt must propagate to the caller.
    self.assertRaises(KeyboardInterrupt, utils.execute, "echo", "")

    # Only the presence of the command name in the output is checked here.
    logged = mock_stdout.getvalue()
    self.assertIn("echo", logged)
    # The process is expected to be killed once and waited on a second time.
    self.assertEqual(2, proc.wait.call_count)
    proc.kill.assert_called_once()
|
|
|
|
@patch("sys.platform", "win32")
def test_check_macos_docker_memory_win32_should_skip(self) -> None:
    # No assertion: with the platform reported as "win32" the check only has
    # to return without raising.
    check = utils.check_macos_docker_memory
    check()
|
|
|
|
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin(self) -> None:
    # Settings file content reporting 4096 MiB; the check must not raise.
    fake_open = mock_open(read_data='{"memoryMiB": 4096}')
    with patch("tutor.utils.open", fake_open):
        utils.check_macos_docker_memory()
|
|
|
|
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin_filenotfound(self) -> None:
    # Entering the opened settings file raises FileNotFoundError, which must
    # surface as a TutorError carrying the expected message.
    fake_open = mock_open()
    fake_open.return_value.__enter__.side_effect = FileNotFoundError
    with patch("tutor.utils.open", fake_open):
        with self.assertRaises(exceptions.TutorError) as ctx:
            utils.check_macos_docker_memory()
    message = ctx.exception.args[0]
    self.assertIn("Error accessing Docker settings file", message)
|
|
|
|
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin_json_decode_error(self) -> None:
    # The settings file content is not valid JSON.
    fake_open = mock_open(read_data="invalid")
    with patch("tutor.utils.open", fake_open):
        with self.assertRaises(exceptions.TutorError) as ctx:
            utils.check_macos_docker_memory()
    message = ctx.exception.args[0]
    self.assertIn("invalid JSON", message)
|
|
|
|
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin_key_error(self) -> None:
    # The settings file is valid JSON but lacks the "memoryMiB" key.
    fake_open = mock_open(read_data="{}")
    with patch("tutor.utils.open", fake_open):
        with self.assertRaises(exceptions.TutorError) as ctx:
            utils.check_macos_docker_memory()
    message = ctx.exception.args[0]
    self.assertIn("key 'memoryMiB' not found", message)
|
|
|
|
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin_type_error(self) -> None:
    # "memoryMiB" is present but holds a string instead of a number.
    fake_open = mock_open(read_data='{"memoryMiB": "invalidstring"}')
    with patch("tutor.utils.open", fake_open):
        with self.assertRaises(exceptions.TutorError) as ctx:
            utils.check_macos_docker_memory()
    message = ctx.exception.args[0]
    self.assertIn("Unexpected JSON data", message)
|
|
|
|
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin_insufficient_memory(self) -> None:
    # 4095 MiB is one below the 4096 MiB named in the expected message, so
    # the check must fail with exactly that message.
    expected = "Docker is configured to allocate 4095 MiB RAM, less than the recommended 4096 MiB"
    fake_open = mock_open(read_data='{"memoryMiB": 4095}')
    with patch("tutor.utils.open", fake_open):
        with self.assertRaises(exceptions.TutorError) as ctx:
            utils.check_macos_docker_memory()
    self.assertEqual(expected, ctx.exception.args[0])
|
|
|
|
@patch("sys.platform", "darwin")
def test_check_macos_docker_memory_darwin_encoding_error(self) -> None:
    # Entering the opened settings file raises TypeError, which must surface
    # as a TutorError mentioning a text encoding error.
    fake_open = mock_open()
    fake_open.return_value.__enter__.side_effect = TypeError
    with patch("tutor.utils.open", fake_open):
        with self.assertRaises(exceptions.TutorError) as ctx:
            utils.check_macos_docker_memory()
    message = ctx.exception.args[0]
    self.assertIn("Text encoding error", message)
|