# temmies/group.py
"""Group: a navigable Themis item — a folder of sub-items or a submittable assignment."""

from json import loads   # BUG FIX: `loads` was called in submit() without being imported
from time import sleep   # BUG FIX: `sleep` was called while polling without being imported
import os
from typing import Optional, Union, Dict

from bs4 import BeautifulSoup
from requests import Session

from .exceptions.illegal_action import IllegalAction
from .submission import Submission


class Group:
    """
    Represents an item in Themis, which can be either a folder (non-submittable) or an assignment (submittable).
    """

    def __init__(self, session, path: str, title: str, parent=None, submitable: bool = False):
        """
        Fetch and parse the group's HTML page.

        Args:
            session: An authenticated requests.Session.
            path: Site-relative path, e.g. '/2023-2024/adinc-ai/labs'.
            title: Human-readable name of the item.
            parent: The enclosing Group, if any.
            submitable: True when this item accepts submissions.

        Raises:
            ConnectionError: If the group's page cannot be retrieved.
        """
        self.session = session
        self.path = path  # e.g., '/2023-2024/adinc-ai/labs'
        self.title = title
        self.parent = parent
        self.submitable = submitable
        self.base_url = "https://themis.housing.rug.nl"
        self.api_url = f"{self.base_url}/api/navigation{self.path}"
        self.classes = []

        # Adjust URL construction to include '/course' when accessing HTML pages
        if not self.path.startswith('/course/'):
            group_url = f"{self.base_url}/course{self.path}"
        else:
            group_url = f"{self.base_url}{self.path}"

        # Fetch the page and parse it
        response = self.session.get(group_url)
        if response.status_code != 200:
            raise ConnectionError(f"Failed to retrieve page for '{self.title}'. Tried {group_url}")
        self._raw = BeautifulSoup(response.text, "lxml")

    def get_items(self) -> list:
        """
        Get all items (groups and assignments) under this group.

        Returns:
            list[Group]: Child items; empty when the page has no child section.
        """
        section = self._raw.find("div", class_="ass-children")
        if not section:
            return []
        entries = section.find_all("a", href=True)
        items = []
        for x in entries:
            href = x['href']
            name = x.text.strip()
            classes = x.get('class', [])
            # The 'ass-submitable' CSS class marks assignments that accept submissions.
            submitable = "ass-submitable" in classes
            item = Group(
                session=self.session,
                path=href,
                title=name,
                parent=self,
                submitable=submitable
            )
            items.append(item)
        return items

    def get_item_by_title(self, title: str):
        """
        Get a single item by its title, case-insensitive.

        Also matches on the last path segment so URL slugs work as lookups.

        Raises:
            ValueError: If no child item matches.
        """
        items = self.get_items()
        for item in items:
            if (item.title.lower() == title.lower()) or (item.path.split("/")[-1] == title):
                return item
        raise ValueError(f"Item '{title}' not found under {self.title}.")

    def get_status(self, text: bool = False) -> Union[Dict[str, Union[str, 'Submission']], None]:
        """
        Get the status of the current group, if available.

        Args:
            text: When True, return plain-text values instead of Submission objects.

        Returns:
            Parsed status dict, or None when the page has no status container.

        Raises:
            ValueError: If this group exposes no "Status" link.
            ConnectionError: If the status page cannot be retrieved.
        """
        status_link = self._raw.find("a", text="Status")
        if not status_link:
            raise ValueError("Status information is not available for this group.")

        status_url = f"{self.base_url}{status_link['href']}"
        response = self.session.get(status_url)
        if response.status_code != 200:
            raise ConnectionError(f"Failed to retrieve status page for '{self.title}'.")

        soup = BeautifulSoup(response.text, "lxml")
        section = soup.find("div", class_="cfg-container")

        if not section:
            # NOTE(review): this branch body was unchanged context hidden between
            # patch hunks; `return None` matches the declared Union[..., None]
            # return type — verify against the original file.
            return None

        return self.__parse_status_section(section, text)

    def __parse_status_section(self, section: BeautifulSoup, text: bool) -> Dict[str, Union[str, 'Submission']]:
        """
        Parse the status section of the group and clean up keys.

        Args:
            section: The 'cfg-container' div holding status rows.
            text: When True, keep text values; otherwise wrap links in Submission.
        """
        # Map Themis's verbose row labels to short, stable keys.
        # NOTE(review): only the first mapping entry was visible in the patch;
        # the remaining entries and the row-iteration setup below reconstruct
        # unchanged context hidden between hunks — verify against the original file.
        key_mapping = {
            "leading the submission that counts towards the grade": "leading",
            "best the best submission so far": "best",
            "latest the latest submission": "latest",
            "first pass the first submission that passed": "first_pass",
            "last pass the last submission that passed": "last_pass",
        }

        parsed: Dict[str, Union[str, 'Submission']] = {}
        for line in section.find_all("div", class_="cfg-line"):
            key_element = line.find("span", class_="cfg-key")
            value_element = line.find("span", class_="cfg-val")
            if not key_element or not value_element:
                continue

            raw_key = key_element.get_text(separator=" ").strip().lower()
            key = key_mapping.get(raw_key, raw_key)

            # Process value
            link = value_element.find("a", href=True)
            if link and not text:
                href = link["href"]
                # Construct full URL
                if href.startswith("/"):
                    submission_url = href
                elif href.startswith("http"):
                    submission_url = href.replace("https://themis.housing.rug.nl", "")
                else:
                    print(f"Invalid href '{href}' found in status page.")
                    continue  # Skip this entry if href is invalid

                # Instantiate Submission with submission_url and session
                submission = Submission(submission_url, self.session)
                parsed[key] = submission
            else:
                parsed[key] = value_element.get_text(separator=" ").strip()

        return parsed

    def get_test_cases(self) -> list[Dict[str, str]]:
        """
        Get all test cases for this assignment.

        Returns:
            list of {'title': ..., 'path': ...} dicts, one per test case link.

        Raises:
            ValueError: If this item is not submittable.
        """
        if not self.submitable:
            raise ValueError(f"No test cases for non-submittable item '{self.title}'.")

        sections = self._raw.find_all("div", class_="subsec round shade")
        tcs = []
        for div in sections:
            res = div.find("h4", class_="info")
            # Only the subsection headed "Test cases" holds test-case links.
            if res and "Test cases" in res.text:
                for case in div.find_all("div", class_="cfg-line"):
                    link = case.find("a")
                    if link:
                        tcs.append({
                            'title': link.text.strip(),
                            'path': link['href']
                        })
        return tcs

    def download_tcs(self, path=".") -> list[str]:
        """
        Download all test cases for this assignment.

        Args:
            path: Directory to save files into (default: current directory).

        Returns:
            list of filenames that were successfully written.
        """
        test_cases = self.get_test_cases()
        downloaded = []
        for tc in test_cases:
            url = f"{self.base_url}{tc['path']}"
            print(f"Downloading {tc['title']}")
            response = self.session.get(url)
            if response.status_code == 200:
                tc_filename = os.path.join(path, tc['title'])
                with open(tc_filename, 'wb') as f:
                    f.write(response.content)
                downloaded.append(tc_filename)
            else:
                print(f"Failed to download test case '{tc['title']}'")
        return downloaded

    def get_files(self) -> list[Dict[str, str]]:
        """
        Get all downloadable files for this assignment.

        Returns:
            list of {'title': ..., 'path': ...} dicts from the "Downloads" row.
        """
        details = self._raw.find("div", id=lambda x: x and x.startswith("details"))
        if not details:
            return []

        cfg_lines = details.find_all("div", class_="cfg-line")
        files = []

        for line in cfg_lines:
            key = line.find("span", class_="cfg-key")
            if key and "Downloads" in key.text.strip():
                vals = line.find_all("span", class_="cfg-val")
                for val in vals:
                    links = val.find_all("a")
                    for link in links:
                        files.append({
                            'title': link.text.strip(),
                            'path': link['href']
                        })
        return files

    def download_files(self, path=".") -> list[str]:
        """
        Download all files available for this assignment.

        Args:
            path: Directory to save files into (default: current directory).

        Returns:
            list of filenames that were successfully written.
        """
        files = self.get_files()
        downloaded = []
        for file in files:
            print(f"Downloading file '{file['title']}'")
            url = f"{self.base_url}{file['path']}"
            response = self.session.get(url)
            if response.status_code == 200:
                file_filename = os.path.join(path, file['title'])
                with open(file_filename, 'wb') as f:
                    f.write(response.content)
                downloaded.append(file_filename)
            else:
                print(f"Failed to download file '{file['title']}'")
        return downloaded

    def submit(self, files: list[str], judge: bool = True, wait: bool = True, silent: bool = True) -> Optional[dict]:
        """
        Submit files to this assignment.
        Returns a dictionary of test case results or None if wait is False.

        Args:
            files: Path(s) of file(s) to submit; a single string is accepted.
            judge: Judge the submission immediately.
            wait: Block until judging finishes and return the results table.
            silent: Suppress progress output when True.

        Raises:
            ValueError: If the item is not submittable or no form is present.
        """
        if not self.submitable:
            raise ValueError(f"Cannot submit to non-submittable item '{self.title}'.")

        form = self._raw.find("form")
        if not form:
            raise ValueError("Submission form not found.")

        url = f"{self.base_url}{form['action']}"
        # BUG FIX: `loads` previously raised NameError — it is now imported
        # from json at module level. data-suffixes maps extension -> language.
        file_types = loads(form.get("data-suffixes", "{}"))

        if isinstance(files, str):
            files = [files]

        packaged_files = []
        found_type = ""

        for file in files:
            # BUG FIX: reset per file; previously a later unrecognized file
            # silently inherited the previous file's language and skipped
            # the warning below.
            found_type = ""
            for suffix, lang in file_types.items():
                if file.endswith(suffix):
                    found_type = lang
                    break
            if not found_type:
                print("WARNING: File type not recognized")

            with open(file, "rb") as f:
                packaged_files.append((found_type, (file, f.read())))

        # judgeLanguage follows the last file's detected type, as before.
        data = {
            "judgenow": "true" if judge else "false",
            "judgeLanguage": found_type if found_type else "none"
        }

        if not silent:
            print(f"Submitting to {self.title}")
            for file in files:
                print(f"• {file}")

        resp = self.session.post(url, files=packaged_files, data=data)

        if not wait or not judge:
            return resp.url if "@submissions" in resp.url else None

        return self.__wait_for_result(resp.url, not silent, [])

    def __wait_for_result(self, url: str, verbose: bool, __printed: list) -> dict:
        """
        Poll the submission result page until no test case is pending.

        BUG FIX: `sleep` previously raised NameError (never imported), and
        polling recursed one stack frame per attempt, risking RecursionError
        on long judgements. Polling is now an iterative loop.
        """
        while True:
            r = self.session.get(url)
            soup = BeautifulSoup(r.text, "lxml")
            results = self.__parse_table(soup, verbose, __printed)
            if results is not None:
                return results
            sleep(1)  # still judging; retry after a short pause

    def __parse_table(self, soup: BeautifulSoup, verbose: bool, __printed: list) -> Optional[dict]:
        """
        Parse the results table from the submission result page.

        Returns:
            dict mapping test-case number to True (passed), False (failed)
            or None (no/unknown status); or None when any case is still
            pending so the caller polls again.
        """
        # Icon/result value for each recognized status label (loop-invariant).
        statuses = {
            "Passed": ("✅", True),
            "Wrong output": ("❌", False),
            "No status": ("🐛", None),
            "error": ("🐛", None),
        }

        cases = soup.find_all("tr", class_="sub-casetop")
        fail_pass = {}
        for case in cases:
            name = case.find("td", class_="sub-casename").text
            status = case.find("td", class_="status-icon")

            if "pending" in status.get("class"):
                return None  # signal the caller to keep polling

            found = False
            for k, v in statuses.items():
                if k in status.text:
                    found = True
                    if verbose and int(name) not in __printed:
                        print(f"{name}: {v[0]}")
                    fail_pass[int(name)] = v[1]
                    break
            if not found:
                fail_pass[int(name)] = None
                if verbose and int(name) not in __printed:
                    print(f"{name}: Unrecognized status: {status.text}")

            __printed.append(int(name))
        return fail_pass

    def __str__(self):
        return f"Group({self.title}, submitable={self.submitable})"