From 4699557f0daa79cbdd534092aea9eed0c043f6f4 Mon Sep 17 00:00:00 2001 From: boyan <36108495+confestim@users.noreply.github.com> Date: Wed, 17 May 2023 18:19:31 +0300 Subject: [PATCH] :memo: + :wastebasket: + Notifications + Updater --- src/client/classes/Game.py | 31 ++- src/client/classes/Notify.py | 50 +++++ src/client/classes/PeriodicScraper.py | 32 +++- src/client/classes/Scraper.py | 260 +++++++++++++++++--------- src/client/classes/SelfUpdate.py | 18 +- src/client/classes/UI.py | 57 ++++-- src/client/config.ini | 2 +- src/client/main.py | 23 ++- 8 files changed, 341 insertions(+), 132 deletions(-) create mode 100644 src/client/classes/Notify.py diff --git a/src/client/classes/Game.py b/src/client/classes/Game.py index 6915e8c..fe26350 100644 --- a/src/client/classes/Game.py +++ b/src/client/classes/Game.py @@ -1,6 +1,7 @@ -import random -import time +from random import choice, randint +from time import sleep from .Util import WhatTheFuckDidYouDo +from logging import info class Game: def __init__(self, *, loop=None, connection,config): @@ -10,7 +11,7 @@ class Game: def list_all(self): # List all games - time.sleep(2) + sleep(1) self.connection.post("/lol-lobby/v1/custom-games/refresh", data={}) return self.connection.get("/lol-lobby/v1/custom-games").json() @@ -24,18 +25,35 @@ class Game: def join_by_name(self,name): # Joins a game given its name - return self.join_by_id(self.search(str(name))[0]) + try: + return self.join_by_id(self.search(str(name))[0]) + except IndexError: + info("Game not found, try again.") + def join_random(self): # Joins a random public game # mainly debug reasons - return self.join_by_id(random.choice([x["id"] for x in self.list_all() if not x["hasPassword"]])) + return self.join_by_id(choice([x["id"] for x in self.list_all() if not x["hasPassword"]])) + def in_game_with_name(self, lobby_name): + try: + name = self.connection.get("/lol-lobby/v2/lobby").json()["gameConfig"]["customLobbyName"] + if lobby_name == name: + return True + except Exception as e: + info(e) + return False + + def leave_with_creator(self, creator): + members = [x["summonerName"] for x in self.connection.get("/lol-lobby/v2/lobby/").json()["members"]] + if creator not in members: + self.leave() def create(self): # Creates game conn = self.connection - name = "CustoMM " + str(random.randint(100000, 10000000)) + name = "CustoMM " + str(randint(100000, 10000000)) game = conn.post("/lol-lobby/v2/lobby/", data={ "customGameLobby": { "configuration": { @@ -58,6 +76,7 @@ class Game: def leave(self): return self.connection.delete("/lol-lobby/v2/lobby") + def get_teams(self): # Gets team cfg = self.connection.get("/lol-lobby/v2/lobby").json()["gameConfig"] diff --git a/src/client/classes/Notify.py b/src/client/classes/Notify.py new file mode 100644 index 0000000..2fa30af --- /dev/null +++ b/src/client/classes/Notify.py @@ -0,0 +1,50 @@ +import pystray +from PIL import Image +import sys, os +from time import sleep + +class Notify(): + + def __init__(self, base_dir, exit_after=False): + """Standalone notification module + + Args: + base_dir (os.path): base dir of application + exit_after (bool, optional): Specify if you want to exit after a notif. Defaults to False. + """ + self.exit_after = exit_after + + image = Image.open(os.path.join(base_dir, os.path.join("assets", "icon.png"))) + + # Singular menu option + self.menu = pystray.Menu( + pystray.MenuItem( + "Exit", self.quit + ) + ) + + self.icon = pystray.Icon( + "name", image, "CustoMM", self.menu) + + self.icon.run_detached() + + self.notified = False + self.exit = False + + def notification(self, message:str): + """Notification method + + Args: + message (str): Message you want to send + """ + sleep(2) + # Not + if not self.notified: + self.icon.notify(message, title="custoMM") + if self.exit_after: + self.quit() + + def quit(self): + self.exit = True + self.icon.stop() + \ No newline at end of file diff --git a/src/client/classes/PeriodicScraper.py b/src/client/classes/PeriodicScraper.py index 2547798..7ba16bc 100644 --- a/src/client/classes/PeriodicScraper.py +++ b/src/client/classes/PeriodicScraper.py @@ -1,23 +1,41 @@ from time import sleep from threading import Thread from .Scraper import Scraper -import logging +from logging import info +from sys import exit +from configparser import ConfigParser class PeriodicScraper(Thread): - def __init__(self, config): + """Scraper that runs every `offset` seconds + + Args: + config (ConfigParser): Config instance + base_dir (_type_): Base dir of program + offset (int, optional): Seconds to sleep before repeat. Defaults to 5. + """ + + def __init__(self, config:ConfigParser, base_dir, offset:int=5): + + Thread.__init__(self) + self.offset=offset self.daemon = True - self.connector:Scraper = Scraper(config=config) - self.closed = False + self.connector:Scraper = Scraper(config=config, base_dir=base_dir) + self.closed = not self.connector.conn def run(self): + while True: if self.closed: - self.connector.connection.stop() + try: + self.connector.connection.stop() + except AttributeError: + pass break game_state = self.connector.check_for_game() - logging.info("Scraping...") + info("Scraping...") self.connector.scrape() - sleep(5) \ No newline at end of file + sleep(self.offset) + return \ No newline at end of file diff --git a/src/client/classes/Scraper.py b/src/client/classes/Scraper.py index 1344678..143d9cc 100644 --- a/src/client/classes/Scraper.py +++ b/src/client/classes/Scraper.py @@ -1,47 +1,90 @@ from lcu_connector import Connector -from lcu_connector.exceptions import ClientProcessError -import logging -import requests -import time, sys -import json -import configparser +from lcu_connector.exceptions import ClientProcessError, MissingLockfileError +from logging import info, error +from requests import get,put,post,delete +from requests.exceptions import ConnectionError +import sys +from time import sleep +from json import dumps +from configparser import ConfigParser from .Util import WhatTheFuckDidYouDo from .Game import Game +from .Notify import Notify + +TIME_DELAY = 20 +GAMES_TO_SCRAPE = 100000 class Scraper: - def __init__(self, *, loop=None, ui=None, config): - self.ui = ui + """ Scraper + + Args: + config (ConfigParser): Config instance + base_dir (str): Base dir of program + """ + + def __init__(self, *, config:ConfigParser, base_dir:str): + + # Config self.config = config - # Relative paths bad, fix this self.URL = self.config["DEFAULT"]["URL"] - # Loop until we get connection self.connection = None + self.base_dir = base_dir + + # Loop until we get connection or user closes the polling program + self.conn = self.get_connection() + if not self.conn: + return + # Get current summoner + self.summoner = self.connection.get('/lol-summoner/v1/current-summoner').json() + self.name = self.summoner["displayName"] + + def get_connection(self): + """ Tries to get connnection with local LCU. + Returns: + bool: Connection failed or established + """ + notification = Notify(base_dir=self.base_dir) while not self.connection: try: self.connection = Connector() self.connection.start() - except ClientProcessError: - print("League client not open, sleeping...") - time.sleep(90) - self.summoner = self.connection.get('/lol-summoner/v1/current-summoner').json() - self.name = self.summoner["displayName"] + except ClientProcessError or MissingLockfileError: + if notification.exit: + return False + info("Polling...") + notification.notification("League client has not been opened yet.") + notification.notified = True + sleep(TIME_DELAY) + + notification.quit() + return True - def calculate_kda(self, kills:int, assists:int, deaths:int): - """ - Calculates kill, death, assist ratio - Input: kills, assists, deaths - Output: KDA ratio + def calculate_kda(self, kills:int, assists:int, deaths:int, decimals:int=3): + """ Calculates KDA ratio + + Args: + kills (int): Kills + assists (int): Assists + deaths (int): Deaths + decimals (int,optional): Decimal points to round to. Defaults to 3. + + Returns: + float: rounded """ if deaths == 0: deaths = 1 - return round((kills+assists)/deaths, 3) + return round((kills+assists)/deaths, decimals) def parse_history(self, history:dict, old_ids:list) -> list: - """ - Parses current player's history - Input: Logged in player's match history - Output: New data about unaccounted for custom games, ready to send to server + """Parses current player's history + + Args: + history (dict): Current player's history + old_ids (list): Cached games (on server) + + Returns: + list: Serialized games ready for the server """ connection = self.connection @@ -69,7 +112,6 @@ class Scraper: } # Sloppy solution, find fix. - print("Extracting data...") for player in range(10): current_player = match["participants"][player]["stats"] kills = current_player["kills"] @@ -81,31 +123,37 @@ class Scraper: parsed_match["participants"]["t2"]["summoners"].append({"name":match["participantIdentities"][player]["player"]["summonerName"], "kda": self.calculate_kda(kills, assists, deaths)}) parsed_matches.append(parsed_match) if not new: - print("Already up to date.") - time.sleep(3) + # Notify player that we're up to date + sleep(3) return parsed_matches - def register_summoner(self, claim, claimed): + def register_summoner(self, claim:bool, claimed): + """Attempts to register the current summoner + + Args: + claim (bool): Register or Deregister(Delete) + claimed (dict): Claimed user data ["lol", "lol_id", "discord", "discord_id"] + """ if claim: - account = requests.put(f"{self.URL}/players/{claimed['lol']}/", data={ + account = put(f"{self.URL}/players/{claimed['lol']}/", data={ "lol": claimed["lol"], "lol_id": claimed["lol_id"], "discord_id":claimed["discord_id"], "discord":claimed["discord"] }) if account.status_code == 200: - print(f"Alright, the account is now yours, {claimed['discord']}.") + Notify(base_dir=self.base_dir, exit_after=True).notification(f"Alright, the account is now yours, {claimed['discord']}.") else: - print("Something went wrong when claiming your account...") + Notify(base_dir=self.base_dir, exit_after=True).notification("Something went wrong when claiming your account...") else: - requests.delete(f"{self.URL}/players/{claimed['discord_id']}") + delete(f"{self.URL}/players/{claimed['discord_id']}") def check_summoner(self): """ - Checks if summoner is registered + Checks if logged in summoner is registered """ connection = self.connection @@ -114,9 +162,9 @@ class Scraper: # Check if account is claimed try: - claimed = requests.get(f"{self.URL}/players/?search={self.summoner['displayName']}").json()[0] + claimed = get(f"{self.URL}/players/?search={self.summoner['displayName']}").json()[0] except Exception as e: - print(e) + error(e) return "USER_DOES_NOT_EXIST" # Case 1: It belongs to nobody and has yet to be claimed. @@ -142,61 +190,88 @@ class Scraper: else: raise WhatTheFuckDidYouDo() - def move_needed(self, checker, game, name): - # Move if you have to + def move_needed(self, checker, game:Game, name:str): + """ Moves player into other team if needed + + Args: + checker (dict): Information about the remote current game, we need the teams. + game (Game): Local game instance + name (str): Username we're looking for + """ # This is buggy, try to find a better way to do this. # Like for example, letting team 1 pass first, and then team 2. local_teams = game.get_teams() - - if name in local_teams[0] and not name in checker["teams"][0]: + checker = checker["teams"] + if name in local_teams[0] and not name in checker[0]: game.move() - logging.info("Moving to Team 2") - elif name in local_teams[1] and not name in checker["teams"][1]: + info("Moving to Team 2") + elif name in local_teams[1] and not name in checker[1]: game.move() - logging.info("Moving to Team 1") + info("Moving to Team 1") - def start(self, checker, game): + def start(self, checker, game:Game, timeout:int=120): + """ Waits for 10 players and starts game + + Args: + checker (dict): Information about current game (http://yourser.ver/current/) + game (Game): Local game instance + timeout (int, optional): Timeout for lobby in seconds + """ + # Move if needed self.move_needed(checker, game, self.name) - time.sleep(5) - # Wait until there are 10 players(confirmed) in the lobby timeout_counter = 0 - while response := requests.get(f"{self.URL}/current/{self.name}").json()["players"] != 10: - logging.info("Waiting for players...") + + # Wait until 10 players + while response := get(f"{self.URL}/current/{self.name}").json()["players"] != 10: + info("Waiting for players...") timeout_counter += 5 - if timeout_counter == 120: - logging.info("Timeout, aborting...") + if timeout_counter == timeout: + Notify(base_dir=self.base_dir, exit_after=True).notification(message="Timed out, not enough players joined, leaving.") break - time.sleep(5) + sleep(5) + + # Start or leave if response == 10: - logging.info("Starting game...") + Notify(base_dir=self.base_dir, exit_after=True).notification("Starting game...") game.start() else: game.leave() - requests.delete(f"{self.URL}/current/{self.name}") + + # Current game gets deleted either way + sleep(30) + delete(f"{self.URL}/current/{self.name}") def check_for_game(self): """ Checks if a game is going on right now. """ + # Initial check try: - checker = requests.get(f"{self.URL}/current").json()[0] - except IndexError: + checker = get(f"{self.URL}/current").json()[0] + except Exception: return "NO_GAME" + if checker["lobby_name"] == "null": + return + # Local game instance game = Game(connection=self.connection, config=self.config) - # If you are indeed the creator, create the game and disclose its name to the server + # Check if inside the game already + if game.in_game_with_name(checker["lobby_name"]): + game.leave_with_creator(checker["creator"]) + return "JOINED" + + # If you are the creator, create the game and disclose its name to the server if checker["creator"] == self.name: + Notify(base_dir=self.base_dir, exit_after=True).notification("You are the creator! Creating lobby...") created = game.create() # TODO: DEBUG - print(checker["teams"]) - r = requests.put(f"{self.URL}/current/{self.name}/", data={ + r = put(f"{self.URL}/current/{self.name}/", data={ "lobby_name": created, "creator": self.name, "players": 1, - "teams": json.dumps(checker["teams"], indent=4) + "teams": dumps(checker["teams"], indent=4) }) - print(r.content) # Start the game @@ -209,49 +284,54 @@ class Scraper: name = checker["lobby_name"] except KeyError: # Wait until lobby name becomes available - while not requests.get(f"{self.URL}/current").json()[0].get("lobby_name"): - time.sleep(10) - checker = requests.get(f"{self.URL}/current").json() + while not get(f"{self.URL}/current").json()[0].get("lobby_name"): + sleep(10) + checker = get(f"{self.URL}/current").json() name = checker["lobby_name"] - - # Join the lobby - game.join_by_name(name) + Notify(base_dir=self.base_dir, exit_after=True).notification(f"Joining {name}...") + # Join the lobby and move if needed + game.join_by_name(name) + self.move_needed(checker, game, self.name) + # Update count of players - requests.put(f"{self.URL}/current/{checker['creator']}/", data={ + put(f"{self.URL}/current/{checker['creator']}/", data={ "lobby_name": checker["lobby_name"], "creator": name, "players": int(checker["players"])+1, - "teams": json.dumps(checker["teams"], indent=4) + "teams": dumps(checker["teams"], indent=4) }) return "JOINED" - - - def scrape(self): """Scrapes current account and sends it to server""" - - connection = self.connection - self.check_summoner() - # Match History - match_history = connection.get('/lol-match-history/v1/products/lol/current-summoner/matches?endIndex=99') - match_history = match_history.json() + + try: + connection = self.connection + self.check_summoner() + # Match History + match_history = connection.get(f'/lol-match-history/v1/products/lol/current-summoner/matches?endIndex={GAMES_TO_SCRAPE}') + match_history = match_history.json() - # Stage old ids in order for them to be parsed - old_ids = requests.get(f"{self.URL}/games/").json() - old_ids = [x["game_id"] for x in old_ids] + # Stage old ids in order for them to be parsed + old_ids = get(f"{self.URL}/games/").json() + old_ids = [x["game_id"] for x in old_ids] - # TODO: Optimize the process of acquisition of new matches - games = self.parse_history(match_history, old_ids) - - # Post the new games to your server(change in config.json) - for i in games: - req = requests.post(f"{self.URL}/games/", json=i) - if req.status_code == 500: - print("Serverside error! Contact maintainer!") - - return len(games) + # TODO: Optimize the process of acquisition of new matches + games = self.parse_history(match_history, old_ids) + + # Post the new games to your server(change in config.json) + for i in games: + req = post(f"{self.URL}/games/", json=i) + if req.status_code == 500: + Notify(base_dir=self.base_dir, exit_after=True).notification("Serverside error! Contact maintainer!") + + return len(games) + + # If client is not opened, destroy past connection and try to get new one + except ConnectionError: + self.connection = None + self.get_connection() diff --git a/src/client/classes/SelfUpdate.py b/src/client/classes/SelfUpdate.py index d58003a..a92e51f 100644 --- a/src/client/classes/SelfUpdate.py +++ b/src/client/classes/SelfUpdate.py @@ -1,19 +1,29 @@ from requests import get, put, post, delete from wget import download -from os import system +from os import system, path from time import sleep +from logging import info +from .Notify import Notify class SelfUpdate(): - # Auto update when new release is found on github - def __init__(self, version, ui): + """Checks for new updates and prompts user to install + + Args: + version (str): Version number + base_dir(str): Base dir of program + """ + def __init__(self, version:str, base_dir:str): + self.version = version self.newest = get("https://api.github.com/repos/confestim/custoMM/releases/latest").json()["tag_name"] if self.version != self.newest: - ui.icon.notify("New version found", f"New version {newest} found, updating...", title="CustoMM") + Notify(base_dir=base_dir).notification("New version found", f"New version {self.newest} found, updating...", title="CustoMM") sleep(5) self.update() return def update(self): + """ Updater + """ download(f"https://github.com/confestim/custoMM/releases/download/{self.newest}/custoMM_installer.exe") system("custoMM_installer.exe") \ No newline at end of file diff --git a/src/client/classes/UI.py b/src/client/classes/UI.py index 20fdd29..f38370d 100644 --- a/src/client/classes/UI.py +++ b/src/client/classes/UI.py @@ -1,17 +1,32 @@ import pystray from PIL import Image -from .Scraper import Scraper import pyautogui from time import sleep -import sys, os +import os +from logging import info +from .Scraper import Scraper +from .PeriodicScraper import PeriodicScraper class UI(): - - def __init__(self,scraper, periodic, base_dir): + """Tray icon UI module + + Args: + scraper (Scraper): Scraper instance + periodic (PeriodicScraper): PeriodicScraper instance + base_dir (_type_): Base dir of program + """ + def __init__(self,scraper:Scraper, periodic:PeriodicScraper, base_dir): + image = Image.open(os.path.join(base_dir, os.path.join("assets", "icon.png"))) + # Check if user has exited before making a connection with the LoL client + if periodic.closed: + return self.periodic = periodic + self.scraper = scraper + + # Menu items self.menu = pystray.Menu( pystray.MenuItem( "Check registration", self.check_registration, default=True @@ -26,35 +41,43 @@ class UI(): "Exit", self.quit ) ) + self.icon = pystray.Icon( - "name", image, "CustoMM", self.menu) - self.scraper = scraper + "name", image, "custoMM", self.menu) self.icon.run_detached() - self.icon.notify("CustoMM is running in the background.", title="CustoMM") + + # After ui is running, check user registration self.check_registration() def check(self): - self.icon.notify("This is discouraged, as it is done automatically anyway.", "Checking for game...", title="CustoMM") + """ Checks for ongoing game and notifies user + """ + + self.icon.notify("This is discouraged, as it is done automatically anyway.", "Checking for game...") game = self.scraper.check_for_game() if game == "NO_GAME": - self.icon.notify("Please create a game on discord.", "No game found.", title="CustoMM") + self.icon.notify("Please create a game on discord.", "No game found.") elif game == "CREATED": - self.icon.notify("GLHF!", "You are the host of a new game!", title="CustoMM") + self.icon.notify("GLHF!", "You are the host of a new game!") elif game == "JOINED": - self.icon.notify("Waiting for players...", "Game joined!", title="CustoMM") + self.icon.notify("Waiting for players...", "Game joined!") def report(self): + """ Manual game reporting + """ self.icon.notify("Game report initiated.") self.scraper.scrape() - self.icon.notify("Game reported", "Your game has been reported to the server.", title="CustoMM") def check_registration(self): + """ Checks the current logged in user's registration + """ + self.icon.notify("Checking summoner...", title="custoMM") check = self.scraper.check_summoner() if check == "USER_DOES_NOT_EXIST": - self.icon.notify("You are not registered, please register on the website.", title="CustoMM") + self.icon.notify("You are not registered, please register on the website.") elif check == "UNCLAIMED": - self.icon.notify("You have not claimed your account yet, please claim it on discord -> !registed .", title="CustoMM") + self.icon.notify("You have not claimed your account yet, please claim it on discord -> !registed .") elif check[0] == "REGISTRATION_IN_PROGRESS": prompt = pyautogui.confirm(f"Your account is currently being registered by {check[1]}, do you want to proceed?") if prompt: @@ -62,10 +85,12 @@ class UI(): else: self.scraper.register_summoner(False, check[1]) else: - self.icon.notify(f"Your account is registered to {check[0]} and your account name is {check[1]}.", title="CustoMM") + self.icon.notify(f"Your account is registered to {check[0]} and your account name is {check[1]}.") - def quit(self, icon, query): + def quit(self, icon): + """ Quit + """ icon.stop() self.periodic.closed = True diff --git a/src/client/config.ini b/src/client/config.ini index 1eb0d48..a80cf44 100644 --- a/src/client/config.ini +++ b/src/client/config.ini @@ -1,6 +1,6 @@ [DEFAULT] ; Your URL here (with no slash at the end) -URL = http://change.me +URL = http://23.88.44.133:8000 [LEAGUE] ; Password for the generated league lobbies diff --git a/src/client/main.py b/src/client/main.py index 3fb183c..a85a9dc 100644 --- a/src/client/main.py +++ b/src/client/main.py @@ -4,7 +4,7 @@ import requests import sys, os import configparser from time import sleep -import logging +from logging import info, basicConfig, INFO # Custom imports from classes.Util import WhatTheFuckDidYouDo @@ -12,11 +12,12 @@ from classes.UI import UI from classes.PeriodicScraper import PeriodicScraper from classes.Scraper import Scraper from classes.SelfUpdate import SelfUpdate +from classes.Notify import Notify VERSION = "1.1.1" # Config section -logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO) +basicConfig(format='%(asctime)s - %(message)s', level=INFO) # Check if bundled if getattr(sys, 'frozen', False): @@ -24,12 +25,15 @@ if getattr(sys, 'frozen', False): elif __file__: base_dir = os.path.dirname(__file__) -logging.info(base_dir) +info("Base directory - " + base_dir) + +# Parse config config = configparser.ConfigParser() conf_path = os.path.join(base_dir, "config.ini") -logging.info(conf_path) config.read(conf_path) URL = config["DEFAULT"]["URL"] + + # Test connection to server try: test = requests.get(URL).json() @@ -37,16 +41,19 @@ try: except Exception: # NEVER DO THIS # although, what could go wrong... - print("Server seems to be down, please contact admin if it keeps doing this") + Notify(base_dir=base_dir, exit_after=True).notification("Server seems to be down, please contact admin if it keeps doing this") sys.exit() # Get current summoner def main(): # Match scraping - # Running the UI - periodic = PeriodicScraper(config=config) + # Periodic scraper + periodic = PeriodicScraper(config=config, base_dir=base_dir) + ui = UI(scraper=periodic.connector, periodic=periodic, base_dir=base_dir) - update = SelfUpdate(ui=ui, version=VERSION) + # Self update only needs to run once, on start of program + # TODO: Test this + update = SelfUpdate(base_dir=base_dir, version=VERSION) periodic.start() periodic.join()