Refactor ExerciseGroup and Course to inherit from a shared Group base class to avoid repeated logic

This commit is contained in:
Boyan 2024-11-17 19:35:16 +01:00
parent 9d92db4644
commit aab574cdb0
5 changed files with 272 additions and 346 deletions

View File

@ -1 +1,7 @@
from .themis import Themis
from .themis import Themis
from .year import Year
from .course import Course
from .exercise_group import ExerciseGroup
from .submission import Submission
__all__ = ['Themis', 'Year', 'Course', 'ExerciseGroup', 'Submission']

View File

@ -1,72 +1,28 @@
"""
Houses the Course class which is used to represent a course in a year.
"""
from bs4 import BeautifulSoup
from requests import Session
from .group import Group
from .exercise_group import ExerciseGroup
from requests import Session
from .exceptions.course_unavailable import CourseUnavailable
from .exceptions.illegal_action import IllegalAction
class Course:
class Course(Group):
"""
get_groups: Get all groups in a course. Set full to True to get all subgroups.
get_group: Get a group by name. Set full to True to get all subgroups.
Represents a course in a given academic year.
"""
def __init__(self, url: str, name: str, session: Session, parent):
self.url = url
self.name = name
self.__session = session
self.__parent = parent
self.__request = self.__session.get(self.url)
self.__raw = BeautifulSoup(self.__request.text, "lxml")
self.__course_available(self.__session.get(self.url))
def __init__(self, url: str, name: str, session, parent):
super().__init__(url, name, session, parent=parent, full=False)
self.__course_available(self._request)
def __str__(self):
return f"Course {self.name} in year {self.__parent.year}"
return f"Course {self.name} in year {self._parent.year}"
def __course_available(self, r):
# Check if we got an error
# print(self.url)
if "Something went wrong" in r.text:
def __course_available(self, response):
if "Something went wrong" in response.text:
raise CourseUnavailable(
message="'Something went wrong'. Course most likely not found. "
message="'Something went wrong'. Course most likely not found."
)
def get_groups(self, full: bool = False) -> list[ExerciseGroup]:
def create_group(self, url: str, name: str, session: Session, parent, full: bool, classes=None):
"""
get_groups(full: bool = False) -> list[ExerciseGroup]
Get all groups in a course. Set full to True to get all subgroups.
Create an instance of ExerciseGroup for subgroups within a Course.
"""
section = self.__raw.find("div", class_="ass-children")
entries = section.find_all("a", href=True)
return [
ExerciseGroup(
f"https://themis.housing.rug.nl{x['href']}",
x,
self.__session,
full
)
for x in entries
]
# BAD: Repeated code!!!!
def get_group(self, name: str, full: bool = False) -> ExerciseGroup:
"""
get_group(name:str, full:bool = False) -> ExerciseGroup
Get a single group by name. Set full to True to get all subgroups as well.
"""
group = self.__raw.find("a", text=name)
if not group:
raise IllegalAction(message=f"No such group found: {name}")
return ExerciseGroup(
f"https://themis.housing.rug.nl{group['href']}",
group,
self.__session,
full
)
return ExerciseGroup(url, name, session, parent, full, classes)

View File

@ -1,59 +1,39 @@
"""
Houses the ExerciseGroup class.
Represents a group of exercises or a single exercise.
"""
from json import loads
from time import sleep
from bs4 import BeautifulSoup
from .group import Group
from .exceptions.illegal_action import IllegalAction
from .submission import Submission
from json import loads
from time import sleep
from typing import Optional
from bs4 import BeautifulSoup
class ExerciseGroup:
class ExerciseGroup(Group):
"""
Methods:
`submit`: submit to an exercise
`get_group`: get a group by name
`download_tcs`: download test cases
`download_files`: download files
`find_status`: get status for an exercise by name
`get_all_statuses`: get all available statuses(useful for multiple exercises)
`get_status(idx=0)`: get the available statuses for the exercise. Set the idx if you want to get a specific submission.
Attributes:
`am_exercise`: returns bool which tells you if the instance is an exercise
`folders`: folders in the folder
`exercises`: exercises in the folder
`test_cases`: test cases in the exercise(if it is an exercise)
`files`: files in the exercise/folder
Represents a group of exercises or a single exercise.
"""
def __init__(self, url: str, soup:BeautifulSoup, session, full: bool):
self.url = url
self.name = soup.text
self.__prev_raw = soup
self.__session = session
self.__request = self.__session.get(self.url)
self.__raw = BeautifulSoup(self.__request.text, "lxml")
self.__full = full
def __init__(self, url: str, name: str, session, parent=None, full: bool = False, classes=None):
super().__init__(url, name, session, parent=parent, full=full, classes=classes)
self.am_exercise = "ass-submitable" in self.classes
@property
def am_exercise(self) -> bool:
return "ass-submitable" in self.__prev_raw["class"]
def create_group(self, url: str, name: str, session, parent, full: bool, classes=None):
"""
Create an instance of ExerciseGroup for subgroups.
"""
return ExerciseGroup(url, name, session, parent, full, classes)
# Test cases
@property
def test_cases(self) -> list[str]:
section = self.__raw.find_all("div", class_="subsec round shade")
tcs = []
for div in section:
res = div.find("h4", class_="info")
if not res:
continue
"""
Get all test cases for this exercise.
"""
if not self.am_exercise:
return []
if "Test cases" in res.text:
sections = self._raw.find_all("div", class_="subsec round shade")
tcs = []
for div in sections:
res = div.find("h4", class_="info")
if res and "Test cases" in res.text:
for case in div.find_all("div", class_="cfg-line"):
if link := case.find("a"):
tcs.append(link)
@ -61,143 +41,121 @@ class ExerciseGroup:
def download_tcs(self, path=".") -> list[str]:
"""
download_tcs(path=".") -> list[str]
Downloads every test case available from a given exercise. `path` defaults to '.'.
Download all test cases for this exercise.
"""
if not self.am_exercise:
raise IllegalAction(message="You are downloading test cases from a folder.")
raise IllegalAction("You are downloading test cases from a folder.")
for tc in self.test_cases:
url = f"https://themis.housing.rug.nl{tc['href']}"
print(f"Downloading {tc.text}")
# download the files
with open(f"{path}/{tc.text}", "wb") as f:
f.write(self.__session.get(url).content)
f.write(self._session.get(url).content)
return self.test_cases
# Files
@property
def files(self) -> list[str]:
details = self.__raw.find("div", id=lambda x: x and x.startswith("details"))
"""
Get all downloadable files for this exercise or group.
"""
details = self._raw.find("div", id=lambda x: x and x.startswith("details"))
if not details:
return []
cfg_lines = details.find_all("div", class_="cfg-line")
link_list = []
for line in cfg_lines:
key = line.find("span", class_="cfg-key")
if key and "Downloads" in key.text.strip():
# Extract all links in the cfg-val span
links = line.find_all("span", class_="cfg-val")
for link in links:
a = link.find_all("a")
for i in a:
link_list.append(i)
a_tags = link.find_all("a")
for a in a_tags:
link_list.append(a)
return link_list
def download_files(self, path=".") -> list[str]:
"""
download_files(path=".") -> list[str]
Downloads every file available from a given exercise/folder. `path` defaults to '.'.
Download all files available for this exercise or group.
"""
for file in self.files:
print(f"Downloading file {file.text}")
url = f"https://themis.housing.rug.nl{file['href']}"
with open(f"{path}/{file.text}", "wb") as f:
f.write(self.__session.get(url).content)
f.write(self._session.get(url).content)
return self.files
@property
def exercises(self) -> list[str] | list["ExerciseGroup"]:
if self.am_exercise:
return self
section = self.__raw.find("div", class_="ass-children")
try:
submittables = section.find_all("a", class_="ass-submitable")
except AttributeError:
return []
if not self.__full:
return [(x.text, x["href"]) for x in submittables]
return [
ExerciseGroup(
f"https://themis.housing.rug.nl{x['href']}", x, self.__session, True
)
for x in submittables
]
@property
def folders(self) -> list[str] | list["ExerciseGroup"]:
section = self.__raw.find("div", class_="ass-children")
try:
folders = section.find_all("a", class_="ass-group")
except AttributeError:
return []
if not self.__full:
return [(x.text, x["href"]) for x in folders]
return [
ExerciseGroup(
f"https://themis.housing.rug.nl{x['href']}", x, self.__session, True
)
for x in folders
]
# Get by name
def get_group( # <- 🗿
self, name: str, full: bool = False, link: str = None
) -> "ExerciseGroup":
def submit(self, files: list[str], judge: bool = True, wait: bool = True, silent: bool = True) -> Optional[dict]:
"""
get_group(name:str, full:bool=False, link:str=None) -> ExerciseGroup | list[ExerciseGroup]
Get a single group by name.
Set `full` to True to get all subgroups as well.
Set `link` to directly fetch a group.
Submit files to this exercise.
Returns a dictionary of test case results or None if wait is False.
"""
if link:
return ExerciseGroup(link, self.__prev_raw, self.__session, full)
if not self.am_exercise:
raise IllegalAction("You cannot submit to this assignment.")
group = self.__raw.find("a", text=name)
if not group:
raise IllegalAction(message=f"No such group found: {name}")
form = self._raw.find("form")
if not form:
raise IllegalAction("Submission form not found.")
return ExerciseGroup(
f"https://themis.housing.rug.nl{group['href']}", group, self.__session, full
)
url = "https://themis.housing.rug.nl" + form["action"]
file_types = loads(form["data-suffixes"])
# Wait for result
def __wait_for_result(self, url: str, verbose: bool, __printed: list) -> None:
# This waits for result and returns a bundled info package
r = self.__session.get(url)
if isinstance(files, str):
files = [files]
packaged_files = []
data = {}
found_type = ""
for file in files:
for suffix, lang in file_types.items():
if file.endswith(suffix):
found_type = lang
break
if not found_type:
print("WARNING: File type not recognized")
with open(file, "rb") as f:
packaged_files.append((found_type, (file, f.read())))
data = {
"judgenow": "true" if judge else "false",
"judgeLanguage": found_type if found_type else "none"
}
if not silent:
print(f"Submitting to {self.name}")
for file in files:
print(f"{file}")
resp = self._session.post(url, files=packaged_files, data=data)
if not wait or not judge:
return resp.url if "@submissions" in resp.url else None
return self.__wait_for_result(resp.url, not silent, [])
def __wait_for_result(self, url: str, verbose: bool, __printed: list) -> dict:
"""
Wait for the submission result and return the test case results.
"""
r = self._session.get(url)
soup = BeautifulSoup(r.text, "lxml")
return self.__parse_table(soup, url, verbose, __printed)
# Account for judge
def __race_condition(self, url: str, verbose: bool) -> None:
self.__session.get(url.replace("submission", "judge"))
return self.__wait_for_result(url, verbose, [])
def __parse_table(
self, soup: BeautifulSoup, url: str, verbose: bool, __printed: list
) -> dict:
def __parse_table(self, soup: BeautifulSoup, url: str, verbose: bool, __printed: list) -> dict:
"""
Parse the results table from the submission result page.
"""
cases = soup.find_all("tr", class_="sub-casetop")
fail_pass = {}
i = 1
for case in cases:
name = case.find("td", class_="sub-casename").text
status = case.find("td", class_="status-icon")
if "pending" in status.get("class"):
return self.__race_condition(url, verbose)
# queued status-icon
if "queued" in status.get("class"):
sleep(1) # <- 🗿
sleep(1)
return self.__wait_for_result(url, verbose, __printed)
statuses = {
@ -207,7 +165,6 @@ class ExerciseGroup:
"error": ("🐛", None),
}
# Printing and storing
found = False
for k, v in statuses.items():
if k in status.text:
@ -222,126 +179,4 @@ class ExerciseGroup:
print(f"{name}: Unrecognized status: {status.text}")
__printed.append(int(name))
i += 1
return fail_pass
# Submit
def submit(
self, files: list, judge: bool = True, wait: bool = True, silent: bool = True
) -> dict | None:
"""
submit(files:list, judge:bool=True, wait:bool=True, silent:bool=True) -> dict | None
Submits given files to given exercise. Returns a dictionary of test cases and their status.
Set judge to False to not judge the submission.
Set wait to False to not wait for the result.
Set silent to False to print the results.
"""
form = self.__raw.find("form")
if not form:
raise IllegalAction(message="You cannot submit to this assignment.")
url = "https://themis.housing.rug.nl" + form["action"]
file_types = loads(form["data-suffixes"])
if isinstance(files, str):
temp = []
temp.append(files)
files = temp
packaged_files = []
data = {}
found_type = ""
for file in files:
for t in file_types:
if t in file:
found_type = file_types[t]
break
if not found_type:
print("WARNING: File type not recognized")
with open(file, "rb") as f:
packaged_files.append((found_type, (file, f.read())))
data = {"judgenow": "true" if judge else "false", "judgeLanguage": found_type if found_type else "none"}
if not silent:
print(f"Submitting to {self.name}")
for file in files:
print(f"{file}")
resp = self.__session.post(url, files=packaged_files, data=data)
if not wait or not judge:
return resp.url if "@submissions" in resp.url else None
return self.__wait_for_result(resp.url, not silent, [])
def __status_sections(self) -> list[BeautifulSoup]:
r = self.__session.get("https://themis.housing.rug.nl" + self.__raw.find("a", text="Status")["href"])
soup = BeautifulSoup(r.text, "html.parser")
sections = soup.find_all('section', class_=lambda class_: class_ and 'status' in class_.split())
return sections
def __parse_section(self, section:BeautifulSoup, text) -> dict[str, Submission] | dict[str, str]:
# The section has a heading and a body. We only care about the body
body = section.find("div", class_="sec-body") # Find the body of the section
body = body.find("div", class_="subsec-container") # Find the subsec-container
body = body.find("div", class_="cfg-container")
# Parse the cfg-container
parsed = {}
# Submission instances go here
submissions = {}
cfg_lines = body.find_all("div", class_="cfg-line")
for line in cfg_lines:
key = line.find("span", class_="cfg-key").text.strip().split("\n")[0].replace(":", "").lower()
value = line.find("span", class_="cfg-val").text.strip()
# If there is a span with class tip in the key, it means that the value is a link to a submission
if tip := line.find("span", class_="tip"):
value = line.find("a")["href"]
if not text:
submissions[key.split("\n")[0].lower().replace(" ", "_")] = Submission(value, self.__session)
parsed[key] = value
if text:
return parsed
return (parsed, submissions)
# I assume that the user would usually request submissions for an assignment,
# so I will add a default parameter to the method.
def get_status(self, section:list[BeautifulSoup]=None, text:bool=False) -> dict[str, Submission] | dict[str, str]:
"""Get the available submissions for the exercise.
Set text to True to get the text representation of the submission."""
if not section:
section = self.__status_sections()
try:
section = section[0] # When looking at a single exercise, there is only one status section
except IndexError as exc:
raise IllegalAction("Invalid status") from exc
return self.__parse_section(section, text)
def get_all_statuses(self, text:bool=False) -> list[dict[str, str]] | list[dict[str, Submission]]:
""" Parses every visible status section. """
# This is useless for singular exercises, but if you want the submissions for multiple exercises, you can use this.
statuses = []
for section in self.__status_sections():
if parse := self.__parse_section(section, text):
# Find name of the exercise
name = section.find("h3").text.replace("Status: ", "").replace("\n", "").replace("\t", "")
statuses.append((name,parse))
return statuses
def find_status(self, name:str, text:bool=False) -> dict[str, Submission] | dict[str, str] | None:
""" Find a status block for an exercise by name. """
# Find a section which has h3 with the name
for section in self.__status_sections():
if section.find("h3").text.replace("Status: ", "").replace("\n", "").replace("\t", "") == name:
return self.__parse_section(section, text)

143
temmies/group.py Normal file
View File

@ -0,0 +1,143 @@
# temmies/group.py
from bs4 import BeautifulSoup
from requests import Session
from typing import Optional, Union, Dict
from .exceptions.illegal_action import IllegalAction
from .submission import Submission
class Group:
    """
    Base class for Course and ExerciseGroup.

    On construction the page at ``url`` is fetched through ``session`` and
    parsed; sub-groups and status information are then looked up in that
    parsed page. Subclasses implement ``create_group`` to decide which
    concrete type their sub-groups are instantiated as.
    """

    # Host that the relative links found on a page are resolved against.
    BASE_URL = "https://themis.housing.rug.nl"

    # Long status labels (as they look after normalisation in
    # __parse_status_section) mapped to the short keys exposed to callers.
    _STATUS_KEY_ALIASES = {
        "leading the submission that counts towards the grade": "leading",
        "best the latest submission with the best result": "best",
        "latest the most recent submission": "latest",
        "first pass the first submission that passed": "first_pass",
        "last pass the last submission to pass before the deadline": "last_pass",
    }

    def __init__(self, url: str, name: str, session: "Session", parent=None,
                 full: bool = False, classes=None):
        """
        Fetch and parse the group's page.

        Args:
            url (str): Absolute URL of the group page.
            name (str): Display name of the group.
            session: Session object used for every HTTP request.
            parent: The object this group was created from, if any.
            full (bool): Stored as ``_full``; not interpreted by this class.
            classes: CSS classes of the link that led here; ``None`` or an
                empty value is stored as an empty list.
        """
        self.url = url
        self.name = name
        self._session = session
        self._parent = parent
        self._full = full
        self._request = self._session.get(self.url)
        self._raw = BeautifulSoup(self._request.text, "lxml")
        self.classes = classes or []

    def __str__(self):
        return f"Group {self.name}"

    def get_groups(self, full: bool = False):
        """
        Get all groups (exercises and folders) within this group.

        Args:
            full (bool): Forwarded unchanged to ``create_group``.

        Returns:
            list: One object per link inside the ``ass-children`` container,
            in page order; an empty list when the page has no such container.
        """
        section = self._raw.find("div", class_="ass-children")
        if not section:
            return []

        return [
            self.create_group(
                url=f"{self.BASE_URL}{anchor['href']}",
                name=anchor.text.strip(),
                session=self._session,
                parent=self,
                full=full,
                classes=anchor.get("class", []),
            )
            for anchor in section.find_all("a", href=True)
        ]

    def get_group(self, name: str, full: bool = False):
        """
        Get a single group by name.

        The name is compared against the whitespace-stripped link text, which
        is the same form ``get_groups`` reports, so every name returned by
        ``get_groups`` can be passed back in here.

        Args:
            name (str): Link text of the group to look up.
            full (bool): Forwarded unchanged to ``create_group``.

        Returns:
            The object built by ``create_group`` for the first matching link.

        Raises:
            IllegalAction: If no link with that text exists on the page.
        """
        wanted = name.strip()
        # Only anchors that carry an href are considered: a match without one
        # could not be turned into a URL anyway.
        for anchor in self._raw.find_all("a", href=True):
            if anchor.text.strip() == wanted:
                return self.create_group(
                    url=f"{self.BASE_URL}{anchor['href']}",
                    name=name,
                    session=self._session,
                    parent=self,
                    full=full,
                    classes=anchor.get("class", []),
                )
        raise IllegalAction(f"No such group found: {name}")

    def create_group(self, url: str, name: str, session: "Session", parent,
                     full: bool, classes=None):
        """
        Factory method to create a group. Subclasses must implement this.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("Subclasses must implement create_group")

    def get_status(self, text: bool = False) -> Optional[Dict[str, Union[str, "Submission"]]]:
        """
        Get the status of the current group, if available.

        Follows the page's "Status" link and parses the first
        ``cfg-container`` block of the page it leads to.

        Args:
            text (bool): If True, returns text representation of the status.
                Otherwise, creates `Submission` objects for applicable fields.

        Returns:
            dict[str, Union[str, Submission]] | None: The status data for the
            group, or None when the status page has no ``cfg-container``.

        Raises:
            IllegalAction: If the group page has no "Status" link.
        """
        status_link = self._raw.find("a", string="Status")
        if not status_link:
            raise IllegalAction("Status information is not available for this group.")

        response = self._session.get(f"{self.BASE_URL}{status_link['href']}")
        soup = BeautifulSoup(response.text, "lxml")
        section = soup.find("div", class_="cfg-container")
        if not section:
            return None

        return self.__parse_status_section(section, text)

    def __parse_status_section(self, section: "BeautifulSoup",
                               text: bool) -> Dict[str, Union[str, "Submission"]]:
        """
        Parse the status section of the group and clean up keys.

        Args:
            section: The HTML element containing the ``cfg-line`` rows.
            text (bool): Whether to return text instead of `Submission` objects.

        Returns:
            dict[str, Union[str, Submission]]: One entry per row that has both
            a key and a value element. Values that contain a link become
            `Submission` objects unless ``text`` is True.
        """
        parsed = {}
        for line in section.find_all("div", class_="cfg-line"):
            key_element = line.find("span", class_="cfg-key")
            value_element = line.find("span", class_="cfg-val")
            if not key_element or not value_element:
                continue

            # Normalise the key: drop colons, lower-case, collapse whitespace.
            raw_key = " ".join(
                key_element.get_text(separator=" ").strip().replace(":", "").lower().split()
            )
            key = self._STATUS_KEY_ALIASES.get(raw_key, raw_key)

            link = value_element.find("a", href=True)
            if link and not text:
                # The href is handed to Submission exactly as found on the page.
                parsed[key] = Submission(link["href"], self._session)
            else:
                parsed[key] = value_element.get_text(separator=" ").strip()

        return parsed

View File

@ -1,66 +1,52 @@
"""
Class which represents an academic year.
"""
from bs4 import BeautifulSoup
from requests import Session
from .course import Course
from .exceptions.course_unavailable import CourseUnavailable
# Works
class Year:
"""
all_courses: Get all visible courses in a year
get_course: Get a course by name
Represents an academic year.
"""
def __init__(self, session: Session, start_year: int, end_year: int):
def __init__(self, session, start_year: int, end_year: int):
self.start = start_year
self.year = end_year
self.url = f"https://themis.housing.rug.nl/course/{self.start}-{self.year}"
self.__session = session
self._session = session
# Method to get the courses of the year
def all_courses(self, errors: bool = True) -> list[Course]:
"""
all_courses(self, errors: bool = False) -> list[Course]
Gets all visible courses in a year.
Set errors to False to not raise an error when a course is unavailable.
"""
r = self.__session.get(self.url)
r = self._session.get(self.url)
soup = BeautifulSoup(r.text, "lxml")
lis = soup.find_all("li", class_="large")
courses = []
for li in lis:
try:
suffix = li.a["href"].replace(f"course/{self.start}-{self.year}", "")
course_url = self.url + suffix
course_name = li.a.text.strip()
courses.append(
Course(self.url + suffix, li.a.text, self.__session, self)
Course(course_url, course_name, self._session, self)
)
except CourseUnavailable as exc:
if errors:
raise CourseUnavailable(
message=f"Course {li.a.text} in year {self.start}-{self.year} unavailable"
) from exc
print("Error with course", li.a.text)
continue
return courses
def get_course(self, name: str) -> Course:
"""
get_course(self, name: str) -> Course
Gets a course by name.
"""
# Get the course
r = self.__session.get(self.url)
r = self._session.get(self.url)
soup = BeautifulSoup(r.text, "lxml")
# Search by name
course = self.url + soup.find("a", text=name)["href"].replace(
f"course/{self.start}-{self.year}", ""
)
# Get the url and transform it into a course object
return Course(url=course, name=name, session=self.__session, parent=self)
course_link = soup.find("a", text=name)
if not course_link:
raise CourseUnavailable(f"No such course found: {name}")
suffix = course_link["href"].replace(f"course/{self.start}-{self.year}", "")
course_url = self.url + suffix
return Course(course_url, name, self._session, self)