# -*- coding: utf-8 -*-

###########################
# CacheDB implementations #
###########################

# Imports
from tubot.db.abc import CacheDB
from tubot.db.types import CacheDBTypes, UserStates, User
from tubot.static.env import PN_CACHE

from pickle import loads, dumps
from time import monotonic
from aiofiles.ospath import isdir, isfile
from aiofiles.os import mkdir
from aiofiles import open
from asyncio import sleep
from redis import asyncio as aioredis
from json import loads as json_loads
from json import dumps as json_dumps


class PythonCache(CacheDB):
    """
    Native python implementation of Cache DataBase

    User records are kept in the in-memory ``users`` dict (keyed by
    Telegram id) and pickled to ``<CACHE_DIR>/user_cache.pkl`` after every
    mutation. The directory listing cache (``dirs``) lives in memory only.
    """

    CACHE_DIR = PN_CACHE()()
    _ctype = CacheDBTypes.PythonPKL
    users: dict
    dirs: dict
    # Monotonic-clock deadline after which ``dirs`` is considered stale.
    # Class-level default so ``get_dirs`` is safe before ``cache_dirs`` runs.
    _dirs_expire_at: float = 0.0

    def __init__(self) -> None:
        super().__init__()

    async def _init(self) -> bool:
        """
        Reset the in-memory state and load persisted users, if any.

        :return: ``False`` when a pickle file exists but cannot be loaded,
            ``True`` otherwise.
        """
        self.users = {}
        self.dirs = {}
        self._dirs_expire_at = 0.0
        if await isfile(f"{self.CACHE_DIR}/user_cache.pkl"):
            try:
                await self._load_pkl()
            except Exception:
                # Any load failure is reported to the caller as "not valid".
                return False
        return True

    async def _load_pkl(self) -> None:
        """Replace ``users`` with the content of the pickle file."""
        if not await isdir(self.CACHE_DIR):
            await mkdir(self.CACHE_DIR)
        async with open(f"{self.CACHE_DIR}/user_cache.pkl", "rb") as file:
            buffer = await file.read()
        # SECURITY: pickle can execute arbitrary code while loading. The
        # cache file must only ever be written by this application.
        self.users = loads(buffer)

    async def _save_pkl(self) -> None:
        """Persist ``users`` to the pickle file, creating CACHE_DIR if needed."""
        if not await isdir(self.CACHE_DIR):
            await mkdir(self.CACHE_DIR)
        async with open(f"{self.CACHE_DIR}/user_cache.pkl", "wb") as file:
            await file.write(dumps(self.users))

    async def __validate__(self) -> bool:
        """Validate the backend by (re)initialising it; see ``_init``."""
        return await self._init()

    def _get_user(self, tg_id: int) -> dict:
        """
        Return the stored record for ``tg_id``.

        :raises ValueError: if the user is not in the cache.
        """
        try:
            return self.users[tg_id]
        except KeyError:
            raise ValueError("User doesn't exists") from None

    # Users
    async def add_user(self, tg_id: int, name: str) -> None:
        """
        Create a user record and persist it.

        :raises ValueError: if ``tg_id`` is already cached.
        """
        if tg_id in self.users:
            raise ValueError("User already exists")
        user = User(tg_id, name)
        self.users[tg_id] = user.dict
        await self._save_pkl()

    async def user_state(self, tg_id: int) -> UserStates:
        """
        Return the ``"state"`` field of the user.

        :raises ValueError: if the user is not in the cache.
        """
        return self._get_user(tg_id)["state"]

    async def change_user_state(self, tg_id: int, status: UserStates) -> None:
        """
        Set the ``"state"`` field of the user and persist the change.

        :raises ValueError: if the user is not in the cache.
        """
        self._get_user(tg_id)["state"] = status
        await self._save_pkl()

    async def auth_user(self, tg_id: int) -> None:
        """
        Mark the user as authorised and persist the change.

        :raises ValueError: if the user is not in the cache.
        """
        self._get_user(tg_id)["auth"] = True
        await self._save_pkl()

    async def is_user_auth(self, tg_id: int) -> bool:
        """
        Return the ``"auth"`` field of the user.

        :raises ValueError: if the user is not in the cache.
        """
        return self._get_user(tg_id)["auth"]

    # Dirs
    async def cache_dirs(self, dirs: dict, expire: int) -> None:
        """
        Cache ``dirs`` for ``expire`` seconds.

        The expiry is recorded as a deadline instead of sleeping here, so
        the caller is not blocked for the whole TTL and an older call can
        never wipe data stored by a newer one.
        """
        self.dirs = dirs
        self._dirs_expire_at = monotonic() + expire

    @property
    async def get_dirs(self) -> dict:
        """Return the cached dirs, or an empty dict once they have expired."""
        if self.dirs and monotonic() >= self._dirs_expire_at:
            self.dirs = {}
        return self.dirs


class RedisCache(CacheDB):
    """
    Redis implementation of Cache DataBase

    Each user is stored as a JSON string under the key ``str(tg_id)``; the
    directory listing is stored as JSON under the key ``"dirs"``. A new
    client is created from ``host`` for every operation.
    """

    _ctype = CacheDBTypes.Redis
    host: str

    def __init__(self, redis_host: str) -> None:
        super().__init__()
        self.host = redis_host

    def _connect(self):
        """Build a Redis client for ``host`` that decodes replies to ``str``."""
        return aioredis.from_url(
            f"redis://{self.host}", encoding="utf-8", decode_responses=True
        )

    @staticmethod
    async def _fetch_user(redis, tg_id: int) -> dict:
        """
        Read and decode the JSON record of ``tg_id`` using ``redis``.

        :raises ValueError: if no record is stored for the user (same
            error as ``PythonCache`` instead of a ``TypeError`` from JSON
            decoding ``None``).
        """
        raw = await redis.get(str(tg_id))
        if raw is None:
            raise ValueError("User doesn't exists")
        return json_loads(raw)

    async def __validate__(self) -> bool:
        """Validate the backend by pinging the Redis server."""
        async with self._connect() as redis:
            return await redis.ping()

    # Users
    async def add_user(self, tg_id: int, name: str) -> None:
        """
        Store a fresh user record under ``str(tg_id)``.

        NOTE(review): unlike ``PythonCache.add_user`` this overwrites an
        existing record instead of raising -- confirm whether that is
        intended before aligning the two backends.
        """
        async with self._connect() as redis:
            user = User(tg_id, name)
            await redis.set(str(tg_id), json_dumps(user.dict))

    async def user_state(self, tg_id: int) -> UserStates:
        """
        Return the ``"state"`` field of the user.

        :raises ValueError: if the user is not stored.
        """
        async with self._connect() as redis:
            user = await self._fetch_user(redis, tg_id)
            return user["state"]

    async def change_user_state(self, tg_id: int, status: UserStates) -> None:
        """
        Set the ``"state"`` field of the user.

        The record is read, modified and written back as two separate
        Redis commands.

        :raises ValueError: if the user is not stored.
        """
        async with self._connect() as redis:
            user = await self._fetch_user(redis, tg_id)
            user["state"] = status
            await redis.set(str(tg_id), json_dumps(user))

    async def auth_user(self, tg_id: int) -> None:
        """
        Mark the user as authorised.

        :raises ValueError: if the user is not stored.
        """
        async with self._connect() as redis:
            user = await self._fetch_user(redis, tg_id)
            user["auth"] = True
            await redis.set(str(tg_id), json_dumps(user))

    async def is_user_auth(self, tg_id: int) -> bool:
        """
        Return the ``"auth"`` field of the user coerced to ``bool``.

        :raises ValueError: if the user is not stored.
        """
        async with self._connect() as redis:
            user = await self._fetch_user(redis, tg_id)
            return bool(user["auth"])

    # Dirs
    async def cache_dirs(self, dirs: dict, expire: int) -> None:
        """Store ``dirs`` as JSON under ``"dirs"`` with a TTL of ``expire`` seconds."""
        async with self._connect() as redis:
            await redis.set("dirs", json_dumps(dirs), ex=expire)

    @property
    async def get_dirs(self) -> dict:
        """Return the cached dirs, or an empty dict when the key is absent."""
        async with self._connect() as redis:
            resp = await redis.get("dirs")
            if resp is None:
                return {}
            return json_loads(resp)