# -*- coding: utf-8 -*- ############################ # Controller static module # ############################ # Imports from tubot.torrent.abc import TorrentAPI, TorrentObj from tubot.dirgetter.abc import DirGetter from tubot.db.abc import CacheDB from tubot.db.types import User, UserStates from tubot.static.functions import validate from tubot.static.env import AUTH_PASSWD from tubot.static.exceptions import AlreadyExists, AuthError class Controller(object): """ Controller object """ torrent: TorrentAPI getter: DirGetter cache: CacheDB def __init__(self, torrent_api: TorrentAPI, dg: DirGetter, cache: CacheDB) -> None: """ :param torrent_api: TorrentAPI module :param dg: DirectoryGetter module :param cache: CacheDB module """ self.torrent = torrent_api self.getter = dg self.cache = cache # DG async def get_dirs(self) -> dict: """ Returns dict of dirs from cache / dg """ dirs = await self.cache.get_dirs if len(dirs) > 0: return dirs dirs = await self.getter.folders if len(dirs) > 0: return dirs raise KeyError("No dirs found") # Torrent async def upload_torrent(self, torrent: TorrentObj) -> None: """ Add torrent to query :param torrent: Torrent object """ await validate(torrent, "Wrong torrent object") await self.torrent.upload(torrent) async def get_torrent_list(self) -> str: """ Returns message with current torrents list """ return await self.torrent.torrent_list # Users async def _create_user(self, tg_id: int, name: str) -> User: """ Creates user in database :param tg_id: Telegram id :param name: Telegram profile name """ user_obj = User(tg_id=tg_id, name=name) await self.cache.write_user(tg_id, user_obj) return user_obj async def _get_user_from_db(self, tg_id: int) -> User: """ Gets user from database :param tg_id: Telegram id """ return await self.cache.read_user(tg_id) async def get_user(self, tg_id: int, name: str) -> User: """ Returns user object :param tg_id: Telegram id :param name: Telegram profile name """ if await self.cache.chech_user_existing(tg_id): return await self._get_user_from_db(tg_id) return await self._create_user(tg_id, name) async def auth_user(self, user: User, pwd: str) -> None: """ Auth user :param user: Current user object """ if user.auth: raise AlreadyExists("You already auth") if pwd == AUTH_PASSWD()(): user.auth = True await self.cache.write_user(user.tg_id, user) raise AuthError("Wrong password") async def set_user_state(self, user: User, state: UserStates) -> None: """ Change user status :param user: Current user object :param state: New user status """ user.state = state await self.cache.write_user(user.tg_id, user)