Moved all downloading logic, submission handling, and status/test-case table parsing into the base Group class

Also added two methods that list the available files and test cases without downloading them
This commit is contained in:
Boyan 2024-11-18 20:03:08 +01:00
parent 9f99df54d8
commit 2fa3bfbad8

View File

@ -1,98 +1,88 @@
# temmies/group.py
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from requests import Session from requests import Session
import os
from typing import Optional, Union, Dict from typing import Optional, Union, Dict
from .exceptions.illegal_action import IllegalAction from .exceptions.illegal_action import IllegalAction
from .submission import Submission from .submission import Submission
class Group: class Group:
""" """
Base class for Course and ExerciseGroup. Represents an item in Themis, which can be either a folder (non-submittable) or an assignment (submittable).
""" """
def __init__(self, url: str, name: str, session: Session, parent=None, full: bool = False, classes=None): def __init__(self, session, path: str, title: str, parent=None, submitable: bool = False):
self.url = url self.session = session
self.name = name self.path = path # e.g., '/2023-2024/adinc-ai/labs'
self._session = session self.title = title
self._parent = parent self.parent = parent
self._full = full self.submitable = submitable
self._request = self._session.get(self.url) self.base_url = "https://themis.housing.rug.nl"
self._raw = BeautifulSoup(self._request.text, "lxml") self.api_url = f"{self.base_url}/api/navigation{self.path}"
self.classes = classes or [] self.classes = []
def __str__(self): # Adjust URL construction to include '/course' when accessing HTML pages
return f"Group {self.name}" if not self.path.startswith('/course/'):
group_url = f"{self.base_url}/course{self.path}"
else:
group_url = f"{self.base_url}{self.path}"
def get_groups(self, full: bool = False): # Fetch the page and parse it
response = self.session.get(group_url)
if response.status_code != 200:
raise ConnectionError(f"Failed to retrieve page for '{self.title}'. Tried {group_url}")
self._raw = BeautifulSoup(response.text, "lxml")
def get_items(self) -> list:
""" """
Get all groups (exercises and folders) within this group. Get all items (groups and assignments) under this group.
""" """
section = self._raw.find("div", class_="ass-children") section = self._raw.find("div", class_="ass-children")
if not section: if not section:
return [] return []
entries = section.find_all("a", href=True) entries = section.find_all("a", href=True)
groups = [] items = []
for x in entries: for x in entries:
href = x['href'] href = x['href']
name = x.text.strip() name = x.text.strip()
classes = x.get('class', []) classes = x.get('class', [])
group = self.create_group( submitable = "ass-submitable" in classes
url=f"https://themis.housing.rug.nl{href}", item = Group(
name=name, session=self.session,
session=self._session, path=href,
title=name,
parent=self, parent=self,
full=full, submitable=submitable
classes=classes
) )
groups.append(group) items.append(item)
return groups return items
def get_group(self, name: str, full: bool = False): def get_item_by_title(self, title: str):
""" """
Get a single group by name. Get a single item by its title, case-insensitive.
""" """
group_link = self._raw.find("a", text=name) items = self.get_items()
if not group_link: for item in items:
raise IllegalAction(f"No such group found: {name}") if (item.title.lower() == title.lower()) or (item.path.split("/")[-1] == title):
href = group_link['href'] return item
classes = group_link.get('class', []) raise ValueError(f"Item '{title}' not found under {self.title}.")
return self.create_group(
url=f"https://themis.housing.rug.nl{href}",
name=name,
session=self._session,
parent=self,
full=full,
classes=classes
)
def create_group(self, url: str, name: str, session: Session, parent, full: bool, classes=None):
"""
Factory method to create a group. Subclasses must implement this.
"""
raise NotImplementedError("Subclasses must implement create_group")
def get_status(self, text: bool = False) -> Union[Dict[str, Union[str, Submission]], None]: def get_status(self, text: bool = False) -> Union[Dict[str, Union[str, 'Submission']], None]:
""" """
Get the status of the current group, if available. Get the status of the current group, if available.
Args:
text (bool): If True, returns text representation of the status.
Otherwise, creates `Submission` objects for applicable fields.
Returns:
dict[str, Union[str, Submission]] | None: The status data for the group,
with `Submission` objects for links.
""" """
status_link = self._raw.find("a", text="Status") status_link = self._raw.find("a", text="Status")
if not status_link: if not status_link:
raise IllegalAction("Status information is not available for this group.") raise ValueError("Status information is not available for this group.")
status_url = f"https://themis.housing.rug.nl{status_link['href']}" status_url = f"{self.base_url}{status_link['href']}"
r = self._session.get(status_url) response = self.session.get(status_url)
soup = BeautifulSoup(r.text, "lxml") if response.status_code != 200:
raise ConnectionError(f"Failed to retrieve status page for '{self.title}'.")
soup = BeautifulSoup(response.text, "lxml")
section = soup.find("div", class_="cfg-container") section = soup.find("div", class_="cfg-container")
if not section: if not section:
@ -100,17 +90,9 @@ class Group:
return self.__parse_status_section(section, text) return self.__parse_status_section(section, text)
def __parse_status_section(self, section: BeautifulSoup, text: bool) -> Dict[str, Union[str, Submission]]: def __parse_status_section(self, section: BeautifulSoup, text: bool) -> Dict[str, Union[str, 'Submission']]:
""" """
Parse the status section of the group and clean up keys. Parse the status section of the group and clean up keys.
Args:
section (BeautifulSoup): The HTML section containing the status information.
text (bool): Whether to return text representation.
Returns:
dict[str, Union[str, Submission]]: Parsed and cleaned status information,
with `Submission` objects for links.
""" """
key_mapping = { key_mapping = {
"leading the submission that counts towards the grade": "leading", "leading the submission that counts towards the grade": "leading",
@ -135,9 +117,202 @@ class Group:
# Process value # Process value
link = value_element.find("a", href=True) link = value_element.find("a", href=True)
if link and not text: if link and not text:
submission_url = link["href"] href = link["href"]
parsed[key] = Submission(submission_url, self._session) # Construct full URL
if href.startswith("/"):
submission_url = href
elif href.startswith("http"):
submission_url = href.replace("https://themis.housing.rug.nl", "")
else:
print(f"Invalid href '{href}' found in status page.")
continue # Skip this entry if href is invalid
# Instantiate Submission with submission_url and session
submission = Submission(submission_url, self.session)
parsed[key] = submission
else: else:
parsed[key] = value_element.get_text(separator=" ").strip() parsed[key] = value_element.get_text(separator=" ").strip()
return parsed return parsed
def get_test_cases(self) -> list[Dict[str, str]]:
    """
    List the test cases linked on this assignment's page without downloading them.

    Every ``div`` whose class attribute is ``subsec round shade`` is inspected;
    a section counts when its ``h4.info`` heading contains "Test cases". Each
    ``cfg-line`` row inside such a section contributes its first link that
    carries an ``href``.

    Returns:
        A list of ``{'title': ..., 'path': ...}`` dicts, where ``title`` is the
        stripped link text and ``path`` is the ``href`` exactly as it appears
        in the page.

    Raises:
        ValueError: If this item is not submittable.
    """
    if not self.submitable:
        raise ValueError(f"No test cases for non-submittable item '{self.title}'.")

    test_cases = []
    for section in self._raw.find_all("div", class_="subsec round shade"):
        heading = section.find("h4", class_="info")
        if not heading or "Test cases" not in heading.text:
            continue
        for row in section.find_all("div", class_="cfg-line"):
            # Require an href: an anchor without a target would otherwise
            # raise KeyError on link['href'].
            link = row.find("a", href=True)
            if link:
                test_cases.append({
                    'title': link.text.strip(),
                    'path': link['href'],
                })
    return test_cases
def download_tcs(self, path=".") -> list[str]:
    """
    Download every test case of this assignment into ``path``.

    The list of test cases comes from ``self.get_test_cases()``; each one is
    fetched from ``self.base_url`` + its ``path`` with ``self.session``.
    Responses whose status code is not 200 are reported on stdout and skipped.

    Args:
        path: Directory to write into. It is created if it does not exist.

    Returns:
        The paths of the files that were written, in download order.
    """
    if path:
        # The original assumed the directory already existed.
        os.makedirs(path, exist_ok=True)

    downloaded = []
    for tc in self.get_test_cases():
        title = tc['title']
        print(f"Downloading {title}")
        response = self.session.get(f"{self.base_url}{tc['path']}")
        if response.status_code != 200:
            print(f"Failed to download test case '{title}'")
            continue
        # Titles are taken from the remote page: keep only the final path
        # component so a title such as '../x' cannot escape ``path``.
        filename = os.path.basename(title.replace("\\", "/"))
        if filename in ("", ".", ".."):
            print(f"Failed to download test case '{title}'")
            continue
        tc_filename = os.path.join(path, filename)
        with open(tc_filename, 'wb') as f:
            f.write(response.content)
        downloaded.append(tc_filename)
    return downloaded
def get_files(self) -> list[Dict[str, str]]:
    """
    List the downloadable files linked on this page without downloading them.

    Looks for the first ``div`` whose ``id`` starts with "details", then for
    ``cfg-line`` rows inside it whose ``cfg-key`` span contains "Downloads".
    Every link with an ``href`` inside the ``cfg-val`` spans of such a row is
    reported.

    Returns:
        A list of ``{'title': ..., 'path': ...}`` dicts, where ``title`` is the
        stripped link text and ``path`` is the ``href`` as it appears in the
        page. Empty when the page has no details section.
    """
    details = self._raw.find("div", id=lambda x: x and x.startswith("details"))
    if not details:
        return []

    files = []
    for line in details.find_all("div", class_="cfg-line"):
        key = line.find("span", class_="cfg-key")
        if not key or "Downloads" not in key.text:
            continue
        for val in line.find_all("span", class_="cfg-val"):
            # href=True skips anchors without a target, which would
            # otherwise raise KeyError on link['href'].
            files.extend(
                {'title': link.text.strip(), 'path': link['href']}
                for link in val.find_all("a", href=True)
            )
    return files
def download_files(self, path=".") -> list[str]:
    """
    Download every file offered for this assignment into ``path``.

    The list of files comes from ``self.get_files()``; each one is fetched
    from ``self.base_url`` + its ``path`` with ``self.session``. Responses
    whose status code is not 200 are reported on stdout and skipped.

    Args:
        path: Directory to write into. It is created if it does not exist.

    Returns:
        The paths of the files that were written, in download order.
    """
    if path:
        # The original assumed the directory already existed.
        os.makedirs(path, exist_ok=True)

    downloaded = []
    for file in self.get_files():
        title = file['title']
        print(f"Downloading file '{title}'")
        response = self.session.get(f"{self.base_url}{file['path']}")
        if response.status_code != 200:
            print(f"Failed to download file '{title}'")
            continue
        # Titles are taken from the remote page: keep only the final path
        # component so a title such as '../x' cannot escape ``path``.
        filename = os.path.basename(title.replace("\\", "/"))
        if filename in ("", ".", ".."):
            print(f"Failed to download file '{title}'")
            continue
        file_filename = os.path.join(path, filename)
        with open(file_filename, 'wb') as f:
            f.write(response.content)
        downloaded.append(file_filename)
    return downloaded
def submit(self, files: list[str], judge: bool = True, wait: bool = True, silent: bool = True) -> Optional[dict]:
    """
    Submit files to this assignment.

    The upload target and the suffix-to-language table are read from the
    first ``<form>`` on the page (its ``action`` and ``data-suffixes``
    attributes).

    Args:
        files: Paths of the files to upload. A single path given as a string
            is accepted as well.
        judge: Sent as the ``judgenow`` form field.
        wait: When True (and ``judge`` is True), poll until results are
            available.
        silent: When False, print the assignment title and the file list
            before uploading and per-test results while waiting.

    Returns:
        A dictionary of test case results when waiting for the judge.
        Otherwise the response URL if it contains "@submissions", else None.

    Raises:
        ValueError: If this item is not submittable or the page has no form.
    """
    # ``loads`` is not imported at module level in the visible file, so the
    # original call raised NameError; import it where it is used.
    from json import loads

    if not self.submitable:
        raise ValueError(f"Cannot submit to non-submittable item '{self.title}'.")

    form = self._raw.find("form")
    if not form:
        raise ValueError("Submission form not found.")

    url = f"{self.base_url}{form['action']}"
    file_types = loads(form.get("data-suffixes", "{}"))

    if isinstance(files, str):
        files = [files]

    packaged_files = []
    # Deliberately not reset per file: a file with an unknown suffix is
    # uploaded under the language detected for an earlier file.
    found_type = ""
    for file in files:
        for suffix, lang in file_types.items():
            if file.endswith(suffix):
                found_type = lang
                break
        if not found_type:
            print("WARNING: File type not recognized")

        with open(file, "rb") as f:
            packaged_files.append((found_type, (file, f.read())))

    data = {
        "judgenow": "true" if judge else "false",
        "judgeLanguage": found_type if found_type else "none"
    }

    if not silent:
        print(f"Submitting to {self.title}")
        for file in files:
            print(f"{file}")

    resp = self.session.post(url, files=packaged_files, data=data)

    if not wait or not judge:
        return resp.url if "@submissions" in resp.url else None

    return self.__wait_for_result(resp.url, not silent, [])
def __wait_for_result(self, url: str, verbose: bool, __printed: list) -> dict:
    """
    Fetch the submission result page at ``url`` and hand it to the table parser.

    ``verbose`` and ``__printed`` are passed through unchanged to
    ``__parse_table``, whose return value is returned as-is.
    """
    page = BeautifulSoup(self.session.get(url).text, "lxml")
    return self.__parse_table(page, url, verbose, __printed)
def __parse_table(self, soup: BeautifulSoup, url: str, verbose: bool, __printed: list) -> dict:
    """
    Parse the results table from the submission result page.

    Each ``tr.sub-casetop`` row is one test case. While any row is still
    marked "pending", the page at ``url`` is fetched again after a one-second
    pause and parsing restarts from the first row.

    Args:
        soup: Parsed result page to start from.
        url: Address of the result page, used for re-fetching while pending.
        verbose: When True, print each test case result once.
        __printed: Test case numbers already printed; updated in place so a
            case is not printed again after a re-fetch.

    Returns:
        A dict mapping test case number to True (passed), False (wrong
        output) or None (error, no status, or unrecognized status).
    """
    # ``sleep`` is not imported at module level in the visible file, so the
    # original call raised NameError; import it where it is used.
    from time import sleep

    statuses = {
        "Passed": ("", True),
        "Wrong output": ("", False),
        "No status": ("🐛", None),
        "error": ("🐛", None),
    }

    # Poll in a loop rather than recursing through __wait_for_result: the
    # recursive form added stack frames every second and would eventually
    # hit the recursion limit on a slow judge.
    while True:
        fail_pass = {}
        for case in soup.find_all("tr", class_="sub-casetop"):
            name = case.find("td", class_="sub-casename").text
            status = case.find("td", class_="status-icon")
            # A cell without a class attribute yields None; treat as not pending.
            if "pending" in (status.get("class") or []):
                break

            case_id = int(name)
            announce = verbose and case_id not in __printed
            for label, (icon, verdict) in statuses.items():
                if label in status.text:
                    if announce:
                        print(f"{name}: {icon}")
                    fail_pass[case_id] = verdict
                    break
            else:
                fail_pass[case_id] = None
                if announce:
                    print(f"{name}: Unrecognized status: {status.text}")

            if case_id not in __printed:
                __printed.append(case_id)
        else:
            # No row was pending: the table is complete.
            return fail_pass

        sleep(1)
        soup = BeautifulSoup(self.session.get(url).text, "lxml")
def __str__(self):
    """Return a short description showing the title and the submittable flag."""
    return "Group({}, submitable={})".format(self.title, self.submitable)