# -*- coding: utf-8 -*- ########################### # CacheDB implementations # ########################### # Imports from tubot.db.abc import CacheDB from tubot.db.types import CacheDBTypes, UserStates, User from tubot.static.env import PN_CACHE from pickle import loads, dumps from aiofiles.ospath import isdir, isfile from aiofiles.os import mkdir from aiofiles import open from asyncio import sleep from redis import asyncio as aioredis from json import loads as json_loads from json import dumps as json_dumps class PythonCache(CacheDB): """ Native python implementation of Cache DataBase """ CACHE_DIR = PN_CACHE()() _ctype = CacheDBTypes.PythonPKL users: dict dirs: dict def __init__(self) -> None: super().__init__() async def _init(self) -> bool: self.users = {} self.dirs = {} if await isfile(f"{self.CACHE_DIR}/user_cache.pkl"): try: await self._load_pkl() except Exception: return False return True async def _load_pkl(self) -> None: if not await isdir(self.CACHE_DIR): await mkdir(self.CACHE_DIR) async with open(f"{self.CACHE_DIR}/user_cache.pkl", "rb") as file: buffer = await file.read() pkl = loads(buffer) self.users = pkl async def _save_pkl(self) -> None: if not await isdir(self.CACHE_DIR): await mkdir(self.CACHE_DIR) async with open(f"{self.CACHE_DIR}/user_cache.pkl", "wb") as file: await file.write(dumps(self.users)) async def __validate__(self) -> bool: return await self._init() # Users async def write_user(self, tg_id: int, user: User) -> None: self.users[tg_id] = user.to_dict await self._save_pkl() async def read_user(self, tg_id: int) -> User: user_data = self.users[tg_id] return User.from_dict(user_data) # Dirs async def cache_dirs(self, dirs: dict, expire: int) -> None: self.dirs = dirs await sleep(expire) self.dirs = {} @property async def get_dirs(self) -> dict: return self.dirs class RedisCache(CacheDB): """ Redis implementation of Cache DataBase """ _ctype = CacheDBTypes.Redis host: str def __init__(self, redis_host: str) -> None: super().__init__() self.host = redis_host async def __validate__(self) -> bool: async with aioredis.from_url( f"redis://{self.host}", encoding="utf-8", decode_responses=True ) as redis: return await redis.ping() # Users async def write_user(self, tg_id: int, user: User) -> None: async with aioredis.from_url( f"redis://{self.host}", encoding="utf-8", decode_responses=True ) as redis: json = json_dumps(user.to_dict) await redis.set(str(tg_id), json) async def read_user(self, tg_id: int) -> User: async with aioredis.from_url( f"redis://{self.host}", encoding="utf-8", decode_responses=True ) as redis: json = await redis.get(str(tg_id)) user_data = json_loads(json) user_data["state"] = UserStates(user_data["state"]) return User.from_dict(user_data) # Dirs async def cache_dirs(self, dirs: dict, expire: int) -> None: async with aioredis.from_url( f"redis://{self.host}", encoding="utf-8", decode_responses=True ) as redis: json = json_dumps(dirs) await redis.set("dirs", json, ex=expire) @property async def get_dirs(self) -> dict: async with aioredis.from_url( f"redis://{self.host}", encoding="utf-8", decode_responses=True ) as redis: resp = await redis.get("dirs") if resp is None: return {} return json_loads(resp)