Compare commits

..

No commits in common. "rewrite" and "legacy-deprecated" have entirely different histories.

33 changed files with 613 additions and 1432 deletions

5
.gitignore vendored
View File

@ -1,5 +0,0 @@
venv
probe.*
__pycache__
*.egg-info
dist

10
Dockerfile Normal file
View File

@ -0,0 +1,10 @@
FROM alpine:latest
COPY bot /opt/bot
RUN apk update && apk add tzdata bash python3 py-pip wget icu-libs krb5-libs libgcc libintl libssl1.1 libstdc++ zlib && pip install telebot
RUN wget https://github.com/fedarovich/qbittorrent-cli/releases/download/v1.7.22315.1/qbt-linux-alpine-x64-1.7.22315.1.tar.gz && \
mkdir /opt/qbt && \
tar -zxf qbt-linux-alpine-x64-1.7.22315.1.tar.gz -C /opt/qbt && \
chmod a+x /opt/qbt/* && \
ln -sf /opt/qbt/qbt /bin/qbt && ln -sf /opt/bot/bot.py /bin/bot
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ENTRYPOINT ["/bin/bot"]

View File

@ -1,5 +0,0 @@
# Torrent Uploader Bot
## A simple Telegram bot that will allow you to upload torrent files / magnet links to a remote Torrent server (qBitTorrent, Transmission, etc).
***

132
Readme.md Normal file
View File

@ -0,0 +1,132 @@
# qBitDownload Bot
![](https://cloud.orudo.ru/apps/files_sharing/publicpreview/pgxm2mKT5KHEHFE?file=/&fileId=23795&x=1920&y=1200&a=true&etag=430e9d84364f13b79e42991fede6609a)
## Telegram bot designed to remotely add downloads to the queue on qBitTorrent server
| [**git.orudo.ru**](https://git.orudo.ru/OrudoCA/qBitDownload-Bot) | [**GitHub**](https://github.com/OrudoCA/qBitDownload-Bot) | [**DockerHub**](https://hub.docker.com/r/orudoca/qbitdownload-bot) |
| ---------------- | ---------- | ------------- |
| [![](https://cloud.orudo.ru/apps/files_sharing/publicpreview/AmggNTQWgR6KkyB?file=/&fileId=23836&x=1920&y=1200&a=true&etag=0ef9694cea6e4d85c05aef9be72b927a)](https://git.orudo.ru/OrudoCA/qBitDownload-Bot) | [![](https://cloud.orudo.ru/apps/files_sharing/publicpreview/ip5qtGcwKHMPMAG?file=/&fileId=23819&x=1920&y=1200&a=true&etag=c540068d990ac47217a31f7450afc0ee)](https://github.com/OrudoCA/qBitDownload-Bot) |[![](https://cloud.orudo.ru/apps/files_sharing/publicpreview/7AEeEAzHYikFd5B?file=/&fileId=23806&x=1920&y=1200&a=true&etag=59894ecdfa7aaa6fb832cc4bf99c418d)](https://hub.docker.com/r/orudoca/qbitdownload-bot) |
### Bot requires [**qBitTorrent**](https://www.qbittorrent.org/) server
---
### Current features:
---
- **Authorization by password**
- **Adding downloads to the queue via .torrent files / Magnet-links**
- **Add/Delete download directories**
- **Multiple language support**
---
### Image uses ["fedarovich/qbittorrent-cli"](https://github.com/fedarovich/qbittorrent-cli)
---
[![](https://cloud.orudo.ru/apps/files_sharing/publicpreview/rRcdSnCEaA85tWf?file=/&fileId=23784&x=1920&y=1200&a=true&etag=32928842bc4e76adaba194cdd9ec1351)](https://hub.docker.com/r/orudoca/qbitdownload-bot)
## Deploy with Docker
#### 1. Build image or clone it from [Dockerhub](https://hub.docker.com/r/orudoca/qbitdownload-bot)
**Clone a repository and go to its directory**
```bash
git clone https://git.orudo.ru/OrudoCA/qBitDownload-Bot.git && cd qBitDownload-Bot
```
**Build image**
```bash
docker build -t <IMAGE_NAME> .
```
#### 2. Deploy via docker-cli or docker-compose
**Docker-cli**
```bash
docker run \
--name qbitdl_bot \
--restart=unless-stopped \
-v /path/to/config:/etc/dbot \
-v /path/to/media:/path/to/media \
-e TOKEN="<YOUR_BOT_TOKEN_HERE>" \
-e PASS="change_me" \
-e QURL="http://<YOUR_QBIT_SERVER_IP_HERE>:<PORT>" \
-e QUSER="<YOUR_QBIT_USERNAME>" \
-e QPASS="<YOUR_QBIT_PASSWORD>" \
-e LANG="YOUR_LANG" \
-e TZ="Europe/Moscow" \
-d -it your_image_here
```
##### or
**docker-compose**
```yml
services:
qbitdl_bot:
image: <YOUR_IMAGE_HERE>
tty: true
container_name: qbitdl_bot
volumes:
- /path/to/config:/etc/bot
- /path/to/data/:/path/to/data
restart: 'unless-stopped'
environment:
TOKEN: "<YOUR_BOT_TOKEN_HERE>"
PASS: "change_me"
QURL: "http://<YOUR_QBIT_SERVER_IP_HERE>:<PORT>"
QUSER: "<YOUR_QBIT_USERNAME>"
QPASS: "<YOUR_QBIT_PASSWORD>"
LANG: "<YOUR_LANG>"
TZ: "Europe/Moscow"
```
```bash
docker compose up -d
```
---
![](https://cloud.orudo.ru/apps/files_sharing/publicpreview/ffSABnXQ3cQrLZG?file=/&fileId=23851&x=1920&y=1200&a=true&etag=d2d4704b2ab90afe5edee647a19a5540)
## Run natively:
### 1. Install deps:
- python3
- py3-pip
- [qbittorrent-cli](https://github.com/fedarovich/qbittorrent-cli)
### 2. Install TeleBot lib:
```bash
pip install telebot
```
### 3. Set system ENV
```bash
# Linux
export TOKEN="YOUR_BOT_TOKEN"
export PASS="change_me"
export QURL="http://<YOUR_QBIT_SERVER_IP_HERE>:<PORT>"
export QUSER="<YOUR_QBIT_USERNAME>"
export QPASS="<YOUR_QBIT_PASSWORD>"
export LANG="<YOUR_LANG>"
# Windows PS
set TOKEN="YOUR_BOT_TOKEN"
set PASS="change_me"
set QURL="http://<YOUR_QBIT_SERVER_IP_HERE>:<PORT>"
set QUSER="<YOUR_QBIT_USERNAME>"
set QPASS="<YOUR_QBIT_PASSWORD>"
set LANG="<YOUR_LANG>"
```
#### 3.1 On windows change PATH var in `db.py`
### 4. Run
```bash
python3 bot.py
```
---
### Available languages:
##### - **"ENG"** - English
##### - **"RU"** - Russian

80
bot.py
View File

@ -1,80 +0,0 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#####################
# Aiogram bot logic #
#####################
# Imports
from asyncio import run
from tubot.db.abc import CacheDB
from tubot.dirgetter.abc import DirGetter
from tubot.static.init import init_modules
from tubot.static.controller import Controller
from tubot.static.env import BOT_TOKEN
from tubot.torrent.abc import TorrentAPI
from aiogram import Bot, Dispatcher, Router
from aiogram.types import Message
from aiogram.filters.command import Command
# Init
## --- Modules --- ##
cache: CacheDB
dirgetter: DirGetter
torrent_api: TorrentAPI
ctrl: Controller
## --- Bot --- ##
router = Router()
dp = Dispatcher()
dp.include_router(router)
async def initialize() -> None:
# --- Modules --- #
global cache, dirgetter, torrent_api, ctrl
torrent_api, dirgetter, cache = await init_modules()
ctrl = Controller(torrent_api, dirgetter, cache)
# --- Bot --- #
bot = Bot(BOT_TOKEN()())
await dp.start_polling(bot)
def main() -> None:
run(initialize())
## -- Functions -- ##
async def check_auth(msg: Message) -> bool:
if msg.from_user is None:
raise ValueError
tg_id = msg.from_user.id
name = msg.from_user.first_name
user = await ctrl.get_user(tg_id, name)
return user.auth
@dp.message(Command("auth"))
async def auth(msg: Message) -> None:
if msg.from_user is None:
raise ValueError
password = msg.text
if password:
password = " ".join(password.split()[1:])
tgid = msg.from_user.id
name = msg.from_user.first_name
user = await ctrl.get_user(tgid, name)
try:
await ctrl.auth_user(user, password)
await msg.answer("Auth complete!")
except Exception as e:
await msg.answer(str(e))
if __name__ == "__main__":
main()

183
bot/bot.py Executable file
View File

@ -0,0 +1,183 @@
#!/usr/bin/python3
# -- coding: utf-8 --
import func, telebot, os, log, sys
from db import PATH
from lang import LANG as msg
TOKEN = os.environ.get('TOKEN','None')
bot = telebot.TeleBot(TOKEN)
folder_list = []
dir = None
# Start
@bot.message_handler(commands=['start'])
def welcome(message):
id = message.from_user.id
if func.auth_check(id):
bot.reply_to(message,str(msg.get('type')),reply_markup=home())
else:
bot.reply_to(message,str(msg.get('adeny')))
# Keyboard: Homepage
def home():
keyboard = telebot.types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
file = telebot.types.KeyboardButton(str(msg.get('file')))
magnet = telebot.types.KeyboardButton(str(msg.get('magnet')))
keyboard.add(file,magnet)
return keyboard
# Login
@bot.message_handler(commands=['login'])
def login(message):
id = message.from_user.id
name = message.from_user.first_name
passwd = message.text.replace('/login ', '')
f = str(func.u_auth(name,id,passwd))
if f == str(msg.get('sucauth')) or f == str(msg.get('alauth')):
bot.reply_to(message,f,reply_markup=home())
else:
bot.reply_to(message,f)
# Folders
def folder_menu():
folders = func.dirlist()
if len(folders) == 0:
return None
else:
keyboard = telebot.types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
i = 0
global folder_list
folder_list = []
lst = []
for folder in folders.keys():
globals()[f'var_{i}'] = telebot.types.KeyboardButton(f'{folder}')
lst.append(f'var_{i}')
folder_list.append(folder)
i += 1
for var in lst:
keyboard.add(globals()[var])
return keyboard
# Folder add
@bot.message_handler(commands=['add'])
def add(message):
id = message.from_user.id
name = message.from_user.first_name
txt = message.text.split(' ', 2)
if len(txt) == 3:
key = txt[1]
path = txt[2]
f = str(func.add_dir(name,id,key,path))
else:
f = str(msg.get('aerr'))
bot.reply_to(message,f,reply_markup=home())
# Folder del
@bot.message_handler(commands=['del'])
def rm(message):
id = message.from_user.id
name = message.from_user.first_name
folder = message.text.replace('/del ', '')
f = func.del_dir(name,id,folder)
bot.reply_to(message,str(f),reply_markup=home())
# Magnet
@bot.message_handler(func=lambda message: message.text == str(msg.get('magnet')))
def magnet(message):
id = message.from_user.id
if func.auth_check(id):
global type
type = 'magnet'
f = folder_menu()
if f == None:
bot.reply_to(message,str(msg.get('cff')))
else:
bot.reply_to(message,str(msg.get('chf')),reply_markup=f)
else:
bot.reply_to(message,str(msg.get('adeny')))
# File
@bot.message_handler(func=lambda message: message.text == str(msg.get('file')))
def file(message):
id = message.from_user.id
if func.auth_check(id):
global type
type = 'file'
f = folder_menu()
if f == None:
bot.reply_to(message,str(msg.get('cff')))
else:
bot.reply_to(message,str(msg.get('chf')),reply_markup=f)
else:
bot.reply_to(message,str(msg.get('adeny')))
# File download
@bot.message_handler(content_types=['document'])
def download(message):
id = message.from_user.id
name = message.from_user.first_name
if func.auth_check(id):
global type, dir, folder_list
if dir != None and type == 'file':
if message.document.file_name.lower().endswith('.torrent'):
file_info = bot.get_file(message.document.file_id)
file_path = file_info.file_path
file = bot.download_file(file_path)
file_name = os.path.join(PATH, message.document.file_name)
with open(file_name, 'wb') as dl:
dl.write(file)
f = str(func.file(name,id,file_name,dir))
dir, type, folder_list = None,None,[]
bot.reply_to(message,f)
else:
bot.reply_to(message,str(msg.get('ntorr')))
bot.reply_to(message,str(msg.get('type')),reply_markup=home())
else:
bot.reply_to(message,str(msg.get('adeny')))
# Dir choose
def dirchoose(message):
global dir
dir = message.text
if type == 'magnet':
bot.reply_to(message,str(msg.get('sendm')))
if type == 'file':
bot.reply_to(message,str(msg.get('sendf')))
# Unknown message
@bot.message_handler(func=lambda message: True)
def unknown(message):
global type, dir, folder_list
id = message.from_user.id
name = message.from_user.first_name
if func.auth_check(id):
txt = message.text
if txt in folder_list:
dirchoose(message)
return None
if dir != None and type == 'magnet':
f = str(func.magnet(name,id,txt,dir))
dir, type, folder_list = None,None,[]
bot.reply_to(message,f)
bot.reply_to(message,str(msg.get('type')),reply_markup=home())
else:
bot.reply_to(message,str(msg.get('adeny')))
def run():
if os.path.exists(PATH) == False:
os.mkdir(PATH)
log.start()
try:
func.qbt()
except:
log.errqbt()
sys.exit(1)
try:
bot.polling()
except:
log.errtelebot()
sys.exit(1)
if __name__ == "__main__":
run()

29
bot/db.py Normal file
View File

@ -0,0 +1,29 @@
#!/usr/bin/python3
# -- coding: utf-8 --
import pickle, os
PATH = "/etc/bot/"
AUTH_FILE= "auth.pkl"
DIR_FILE = "dir.pkl"
def check(type,FILE):
if type == 'dir':
if os.path.exists(PATH) == False:
os.mkdir(PATH)
return True
elif type == 'obj':
if os.path.exists(f'{PATH}{FILE}'):
return True
else:
return False
def write(obj,FILE):
if check('dir',None):
with open(f'{PATH}{FILE}',"wb") as file:
pickle.dump(obj,file)
def read(FILE):
with open(f'{PATH}{FILE}',"rb") as file:
obj = pickle.load(file)
return obj

103
bot/func.py Normal file
View File

@ -0,0 +1,103 @@
#!/usr/bin/python3
# -- coding: utf-8 --
import db, os, log, subprocess
from db import *
from lang import LANG as msg
def qbt():
url = os.environ['QURL']
username = os.environ['QUSER']
password = os.environ['QPASS']
commands = [
f"qbt settings set url {url}",
f"qbt settings set username {username}",
f"echo {password} | qbt settings set password --no-warn",
f"qbt server info "
]
for command in commands:
os.system(f"bash -c '{command}'")
output = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
def u_auth(name,id,passwd):
list = []
if db.check('obj',AUTH_FILE):
list = db.read(AUTH_FILE)
if id in list:
return msg.get('alauth')
else:
if passwd == os.environ['PASS']:
list.append(id)
db.write(list,AUTH_FILE)
log.auth(name,id)
return msg.get('sucauth')
else:
return msg.get('wrauth')
def auth_check(id):
if db.check('obj',AUTH_FILE):
list = db.read(AUTH_FILE)
else:
list = []
if id in list:
return True
def add_dir(name,id,dir,path):
if auth_check(id):
if os.path.exists(path) == False:
return str(msg.get('pne')).format(path)
if db.check('obj',DIR_FILE):
dict = db.read(DIR_FILE)
else:
dict = {}
dict.setdefault(dir,path)
db.write(dict,DIR_FILE)
log.add(name,id,dir,path)
return str(msg.get('fsa')).format(dir)
else:
return msg.get('adeny')
def del_dir(name,id,dir):
if auth_check(id):
if db.check('obj',DIR_FILE):
dict = db.read(DIR_FILE)
else:
dict = {}
if dir in dict:
del dict[dir]
db.write(dict,DIR_FILE)
log.rm(name,id,dir)
return str(msg.get('frm')).format(dir)
else:
return str(msg.get('fne')).format(dir)
else:
return msg.get('adeny')
def magnet(name,id,link,dir):
if auth_check(id):
dict = db.read(DIR_FILE)
path = dict[dir]
command = f'''qbt torrent add url "{link}" -f "{path}"'''
os.system(f"bash -c '{command}'")
log.addmagnet(name,id,link)
return msg.get('add')
else:
return msg.get('adeny')
def file(name,id,file,dir):
if auth_check(id):
dict = db.read(DIR_FILE)
path = dict[dir]
command = f'''qbt torrent add file "{file}" -f {path}'''
os.system(f"bash -c '{command}'")
os.remove(file)
log.addfile(name,id,file)
return msg.get('add')
else:
return msg.get('adeny')
def dirlist():
dirs = {}
if db.check('obj',DIR_FILE):
dirs = db.read(DIR_FILE)
return dirs

74
bot/lang.py Normal file
View File

@ -0,0 +1,74 @@
#!/usr/bin/python3
# -- coding: utf-8 --
import os
langs = ['ENG','RU']
# Russian
RU = {
'alauth': 'Вы уже авторизированны',
'sucauth': 'Вы успешно авторизировались',
'wrauth': 'Неверный пароль',
'pne': "Директории '{}' не сушествует на сервере",
'fsa': "Папка '{}' успешно добавлена",
'frm': "Папка '{}' успешно удалена",
'fne': "Папки '{}' не существует",
'add': 'Torrent добавлен в очередь',
'type': 'Выберите тип загрузки:',
'magnet': 'Magnet-ссылка',
'file': 'Файл',
'aerr': 'Неверные аргументы',
'cff': 'Папок не обнаруженно, воспользуйтесь коммандой /add',
'chf': 'Выберите папку:',
'ntorr': 'Неверное расширение файла',
'sendm': 'Отправте Magnet-ссылку',
'sendf': 'Отправте .torrent файл',
'adeny': 'Этот бот запривачен, гнида, блять',
# Logs
'l_create': "Log Файл '{}' создан",
'l_start': 'Запуск бота...',
'l_auth': "Пользователь '{} ({})' успешно авторизировался",
'l_add': "Пользователь '{} ({})' добавил папку '{}' по пути '{}'",
'l_rm': "Пользователь '{} ({})' удалил папку '{}'",
'l_file': "Пользователь '{} ({})' добавил в очередь файл '{}'",
'l_magnet': "Пользователь '{} ({})' добавил в очередь ссылку '{}'",
'l_errqbt': "Ошибка подключения к qBitTorrent",
'l_errtele': "Ошибка подключения к Telegram API, проверьте ваш токен",
}
# English
ENG = {
'alauth': 'You are already authorized',
'sucauth': 'You have successfully logged in',
'wrauth': 'Wrong password',
'pne': "The '{}' directory does not exist on the server",
'fsa': "The '{}' folder has been successfully added",
'frm': "The '{}' folder has been successfully deleted",
'fne': "The '{}' folder does not exist",
'add': 'Torrent has been added to the queue',
'type': 'Select the download type:',
'magnet': 'Magnet',
'file': 'File',
'aerr': 'Wrong arguments',
'cff': 'No folders found, use the /add command',
'chf': 'Select folder:',
'ntorr': 'Incorrect file extension',
'sendm': 'Send Magnet link',
'sendf': 'Send .torrent file',
'adeny': "You do not have access, first authorize '/login <password>'",
# Logs
'l_create': "Log File '{}' created",
'l_start': 'Start bot polling...',
'l_auth': "User '{} ({})' successfully authorized",
'l_add': "User '{} ({})' added a folder '{}' with the path '{}'",
'l_rm': "User '{} ({})' deleted '{}' folder",
'l_file': "User '{} ({})' added file '{}' to the queue",
'l_magnet': "User '{} ({})' added the link '{}' to the queue",
'l_errqbt': "Error connecting to qBitTorrent",
'l_errtele': "Error connecting to Telegram API, check your token"
}
for i in langs:
if i == os.environ.get('LANG','ENG'):
LANG = globals()[i]

65
bot/log.py Normal file
View File

@ -0,0 +1,65 @@
#!/usr/bin/python3
# -- coding: utf-8 --
import os, uuid
from datetime import datetime
from lang import LANG as msg
from db import PATH
def dt():
date = datetime.now().date()
time = datetime.now().time()
str = f'{date} | {time.strftime("%H:%M:%S")}'
return str
DEFAULT = ['{} LOG: ','{} ERROR: ']
ID = str(uuid.uuid1())[0:7]
FILE = f'{ID}.txt'
def file(log):
if os.path.exists(f'{PATH}logs') == False:
os.mkdir(f'{PATH}logs')
with open(f'{PATH}logs/{FILE}','a') as logfile:
logfile.write(f'{log}\n')
logfile.close()
def start():
log1 = DEFAULT[0].format(dt()) + str(msg.get('l_create').format(FILE))
log2 = DEFAULT[0].format(dt()) + str(msg.get('l_start'))
file(log2)
print(f'{log1}\n{log2}')
def auth(name,id):
log = DEFAULT[0].format(dt()) + str(msg.get('l_auth').format(name,id))
file(log)
print(log)
def add(name,id,folder,path):
log = DEFAULT[0].format(dt()) + str(msg.get('l_add').format(name,id,folder,path))
file(log)
print(log)
def rm(name,id,folder):
log = DEFAULT[0].format(dt()) + str(msg.get('l_rm').format(name,id,folder))
file(log)
print(log)
def addfile(name,id,filename):
log = DEFAULT[0].format(dt()) + str(msg.get('l_file').format(name,id,filename[9:]))
file(log)
print(log)
def addmagnet(name,id,link):
log = DEFAULT[0].format(dt()) + str(msg.get('l_magnet').format(name,id,link))
file(log)
print(log)
def errqbt():
log = DEFAULT[1].format(dt()) + str(msg.get('l_errqbt'))
file(log)
print(log)
def errtelebot():
log = DEFAULT[1].format(dt()) + str(msg.get('l_errtele'))
file(log)
print(log)

17
docker-compose.yml Normal file
View File

@ -0,0 +1,17 @@
services:
qbitdl_bot:
image: <YOUR_IMAGE_HERE>
tty: true
container_name: qbitdl_bot
volumes:
- /path/to/config:/etc/bot
- /path/to/data/:/path/to/data
restart: 'unless-stopped'
environment:
TOKEN: "<YOUR_BOT_TOKEN_HERE>"
PASS: "change_me"
QURL: "http://<YOUR_QBIT_SERVER_IP_HERE>:<PORT>"
QUSER: "<YOUR_QBIT_USERNAME>"
QPASS: "<YOUR_QBIT_PASSWORD>"
LANG: "<YOUR_LANG>"
TZ: "Europe/Moscow"

View File

@ -1,19 +0,0 @@
aiofiles>=23.2.1
aiogram>=3.10.0
aiohappyeyeballs>=2.3.4
aiohttp>=3.9.5
aiosignal>=1.3.1
annotated-types>=0.7.0
attrs>=24.2.0
certifi>=2024.7.4
frozenlist>=1.4.1
idna>=3.7
magic-filter>=1.0.12
multidict>=6.0.5
pydantic>=2.8.2
pydantic_core>=2.20.1
python-magic>=0.4.27
redis>=5.0.8
ruff>=0.5.6
typing_extensions>=4.12.2
yarl>=1.9.4

View File

@ -1,17 +0,0 @@
from setuptools import setup
setup(
name="TorrentUploaderBot",
version="",
url="",
author="ORUDO",
author_email="root@orudo.ru",
description="A simple Telegram bot that will allow you to upload torrent files / magnet links to a remote Torrent server (qBitTorrent, Transmission, etc.)",
install_requires=[
"aiohttp>=3.9.5",
"aiofiles>=23.2.1",
"redis>=5.0.8",
"aiogram>=3.10.0",
],
packages=["tubot", "tubot.static", "tubot.torrent", "tubot.dirgetter", "db"],
)

View File

View File

@ -1,80 +0,0 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#####################
# Aiogram bot logic #
#####################
# Imports
from asyncio import run
from tubot.db.abc import CacheDB
from tubot.dirgetter.abc import DirGetter
from tubot.static.init import init_modules
from tubot.static.controller import Controller
from tubot.static.env import BOT_TOKEN
from tubot.torrent.abc import TorrentAPI
from aiogram import Bot, Dispatcher, Router
from aiogram.types import Message
from aiogram.filters.command import Command
# Init
## --- Modules --- ##
cache: CacheDB
dirgetter: DirGetter
torrent_api: TorrentAPI
ctrl: Controller
## --- Bot --- ##
router = Router()
dp = Dispatcher()
dp.include_router(router)
async def initialize() -> None:
# --- Modules --- #
global cache, dirgetter, torrent_api, ctrl
torrent_api, dirgetter, cache = await init_modules()
ctrl = Controller(torrent_api, dirgetter, cache)
# --- Bot --- #
bot = Bot(BOT_TOKEN()())
await dp.start_polling(bot)
def main() -> None:
run(initialize())
## -- Functions -- ##
async def check_auth(msg: Message) -> bool:
if msg.from_user is None:
raise ValueError
tg_id = msg.from_user.id
name = msg.from_user.first_name
user = await ctrl.get_user(tg_id, name)
return user.auth
@dp.message(Command("auth"))
async def auth(msg: Message) -> None:
if msg.from_user is None:
raise ValueError
password = msg.text
if password:
password = " ".join(password.split()[1:])
tgid = msg.from_user.id
name = msg.from_user.first_name
user = await ctrl.get_user(tgid, name)
try:
await ctrl.auth_user(user, password)
await msg.answer("Auth complete!")
except Exception as e:
await msg.answer(str(e))
if __name__ == "__main__":
main()

View File

@ -1,75 +0,0 @@
# -*- coding: utf-8 -*-
####################################
# DataBase module abstract classes #
####################################
# Imports
from abc import ABC, abstractmethod
from tubot.static.abc import IValidatable
from tubot.db.types import CacheDBTypes, User
class CacheDB(IValidatable, ABC):
"""
Abstract class for CacheDB
"""
_ctype: CacheDBTypes
def __init__(self) -> None:
if self._ctype is None:
raise NotImplementedError("CacheDB type not implemented")
# Users
@abstractmethod
async def write_user(self, tg_id: int, user: User) -> None:
"""
Writes user to cache db
:param tg_id: User telegram id
:param user: User object
"""
raise NotImplementedError
@abstractmethod
async def read_user(self, tg_id: int) -> User:
"""
Writes user to cache db
:param tg_id: User telegram id
:return: User object
"""
raise NotImplementedError
@abstractmethod
async def chech_user_existing(self, tg_id: int):
"""
Checks if user exist in db
:param tg_id: User telegram id
"""
raise NotImplementedError
# Dirs
@abstractmethod
async def cache_dirs(self, dirs: dict, expire: int) -> None:
"""
Cache dirs from DirectoryGetter
:param dirs: Dirs dict
:param expire: Expire time (in seconds)
"""
raise NotImplementedError
@property
@abstractmethod
async def get_dirs(self) -> dict:
"""
Returns precached dirs
:return: Dirs dict
"""
raise NotImplementedError

View File

@ -1,152 +0,0 @@
# -*- coding: utf-8 -*-
###########################
# CacheDB implementations #
###########################
# Imports
from tubot.db.abc import CacheDB
from tubot.db.types import CacheDBTypes, UserStates, User
from tubot.static.env import PN_CACHE
from pickle import loads, dumps
from aiofiles.ospath import isdir, isfile
from aiofiles.os import mkdir
from aiofiles import open
from asyncio import sleep
from redis import asyncio as aioredis
from json import loads as json_loads
from json import dumps as json_dumps
class PythonCache(CacheDB):
"""
Native python implementation of Cache DataBase
"""
CACHE_DIR = PN_CACHE()()
_ctype = CacheDBTypes.PythonPKL
users: dict
dirs: dict
def __init__(self) -> None:
super().__init__()
async def _init(self) -> bool:
self.users = {}
self.dirs = {}
if await isfile(f"{self.CACHE_DIR}/user_cache.pkl"):
try:
await self._load_pkl()
except Exception:
return False
return True
async def _load_pkl(self) -> None:
if not await isdir(self.CACHE_DIR):
await mkdir(self.CACHE_DIR)
async with open(f"{self.CACHE_DIR}/user_cache.pkl", "rb") as file:
buffer = await file.read()
pkl = loads(buffer)
self.users = pkl
async def _save_pkl(self) -> None:
if not await isdir(self.CACHE_DIR):
await mkdir(self.CACHE_DIR)
async with open(f"{self.CACHE_DIR}/user_cache.pkl", "wb") as file:
await file.write(dumps(self.users))
async def __validate__(self) -> bool:
return await self._init()
# Users
async def write_user(self, tg_id: int, user: User) -> None:
self.users[tg_id] = user.to_dict
await self._save_pkl()
async def read_user(self, tg_id: int) -> User:
user_data = self.users[tg_id]
return User.from_dict(user_data)
async def chech_user_existing(self, tg_id: int) -> bool:
try:
await self.read_user(tg_id)
return True
except KeyError:
return False
# Dirs
async def cache_dirs(self, dirs: dict, expire: int) -> None:
self.dirs = dirs
await sleep(expire)
self.dirs = {}
@property
async def get_dirs(self) -> dict:
return self.dirs
class RedisCache(CacheDB):
"""
Redis implementation of Cache DataBase
"""
_ctype = CacheDBTypes.Redis
host: str
def __init__(self, redis_host: str) -> None:
super().__init__()
self.host = redis_host
async def __validate__(self) -> bool:
async with aioredis.from_url(
f"redis://{self.host}", encoding="utf-8", decode_responses=True
) as redis:
return await redis.ping()
# Users
async def write_user(self, tg_id: int, user: User) -> None:
async with aioredis.from_url(
f"redis://{self.host}", encoding="utf-8", decode_responses=True
) as redis:
json = json_dumps(user.to_dict)
await redis.set(str(tg_id), json)
async def read_user(self, tg_id: int) -> User:
async with aioredis.from_url(
f"redis://{self.host}", encoding="utf-8", decode_responses=True
) as redis:
json = await redis.get(str(tg_id))
if json is None:
raise KeyError
user_data = json_loads(json)
user_data["state"] = UserStates(user_data["state"])
return User.from_dict(user_data)
async def chech_user_existing(self, tg_id: int) -> bool:
try:
await self.read_user(tg_id)
return True
except KeyError:
return False
# Dirs
async def cache_dirs(self, dirs: dict, expire: int) -> None:
async with aioredis.from_url(
f"redis://{self.host}", encoding="utf-8", decode_responses=True
) as redis:
json = json_dumps(dirs)
await redis.set("dirs", json, ex=expire)
@property
async def get_dirs(self) -> dict:
async with aioredis.from_url(
f"redis://{self.host}", encoding="utf-8", decode_responses=True
) as redis:
resp = await redis.get("dirs")
if resp is None:
return {}
return json_loads(resp)

View File

@ -1,67 +0,0 @@
# -*- coding: utf-8 -*-
#############################
# Types for DataBase module #
#############################
# Imports
from enum import Enum
class CacheDBTypes(Enum):
"""
Types of CacheDB
"""
PythonPKL = "python"
Redis = "redis"
class UserStates(Enum):
"""
Types of User status
"""
IDLE = "IDLE"
DIRS = "DIRS"
WAIT_FOR_TORRENT = "WAIT_FOR_TORRENT"
class User(object):
"""
User class
"""
tg_id: int
name: str
state: UserStates = UserStates.IDLE
auth: bool = False
def __init__(
self,
tg_id: int,
name: str,
state: UserStates = UserStates.IDLE,
auth: bool = False,
) -> None:
self.tg_id = tg_id
self.name = name
self.state = state
self.auth = auth
@property
def to_dict(self):
return {
"tg_id": self.tg_id,
"name": self.name,
"state": self.state.value,
"auth": self.auth,
}
@classmethod
def from_dict(cls, usr: dict) -> "User":
tg = usr["tg_id"]
name = usr["name"]
state = UserStates(usr["state"])
auth = usr["auth"]
return cls(tg, name, state, auth)

View File

@ -1,32 +0,0 @@
# -*- coding: utf-8 -*-
############################################
# Directory-Getter module abstract classes #
############################################
# Imports
from abc import ABC, abstractmethod
from tubot.static.abc import IValidatable
from tubot.dirgetter.types import GetterTypes
class DirGetter(IValidatable, ABC):
"""
DirectoryGetter Abstract class
"""
_gtype: GetterTypes
def __init__(self) -> None:
if self._gtype is None:
raise NotImplementedError("DirGetter type is not implemented")
@property
@abstractmethod
async def folders(self) -> dict:
"""
Returns a dictionary of media folders {name: path}
:return: Dict of media folders
"""
raise NotImplementedError

View File

@ -1,96 +0,0 @@
# -*- coding: utf-8 -*-
####################################
# Directory-Getter implementations #
####################################
# Imports
from tubot.dirgetter.types import GetterTypes
from tubot.dirgetter.abc import DirGetter
from aiofiles.os import listdir
from aiofiles.ospath import isdir
from aiohttp import ClientResponse, ClientSession
class OSGetter(DirGetter):
"""
Python.os module implementation of DirectoryGetter
"""
_gtype = GetterTypes.OS
base_dir: str
def __init__(self, base_dir: str) -> None:
"""
:param base_dir: Path to parent directory
"""
super().__init__()
self.base_dir = base_dir
@property
async def folders(self) -> dict:
dirs = {}
ls = await listdir(self.base_dir)
if len(ls) == 0:
raise KeyError("No dirs found")
for item in ls:
if await isdir(f"{self.base_dir}/{item}"):
dirs[item] = f"{self.base_dir}/{item}"
return dirs
async def __validate__(self) -> bool:
return await isdir(self.base_dir)
class Jellyfin(DirGetter):
"""
Jellyfin API implementation of DirectoryGetter
"""
_gtype = GetterTypes.Jellyfin
host: str
token: str
def __init__(self, host: str, api_token: str) -> None:
"""
:param host: Adress of Jellyfin server
:param api_token: Jellyfin API Token for auth
"""
super().__init__()
self.host = host
self.token = api_token
async def _get(self, api: str) -> ClientResponse:
async with ClientSession() as session:
resp = await session.get(f"{self.host}/{api}?api_key={self.token}")
status = resp.status
match status:
case 200:
return resp
case 401:
raise ConnectionError("401: Auth error")
case 403:
raise ConnectionError("403: Forbidden")
case 404:
raise ConnectionError("403: Not found")
raise ConnectionError()
@property
async def idx(self) -> str | None:
resp = await self._get("System/Info")
json = await resp.json()
return json["Id"]
@property
async def folders(self) -> dict:
resp = await self._get("Library/VirtualFolders")
json = await resp.json()
dirs = {}
for folder in json:
dirs[folder["Name"]] = folder["Locations"][0]
return dirs
async def __validate__(self) -> bool:
if await self.idx is not None:
return True
return False

View File

@ -1,17 +0,0 @@
# -*- coding: utf-8 -*-
#####################################
# Types for Directory-Getter module #
#####################################
# Imports
from enum import Enum
class GetterTypes(Enum):
"""
Types of getters
"""
OS = "os"
Jellyfin = "jellyfin"

View File

@ -1,49 +0,0 @@
# -*- coding: utf-8 -*-
##########################################
# Shared Abstract classes and interfaces #
##########################################
# Imports
from abc import ABC, abstractmethod
from os import environ
class IValidatable(ABC):
"""
Interface initializing a class with a magic method for
checking the validity of its objects
"""
@abstractmethod
async def __validate__(self) -> bool:
"""
Checks if the object of the class is valid
:return: Object validity boolean
"""
raise NotImplementedError
class ENV(object):
_name: str | None = None
DEFAULT: str
def __init__(self) -> None:
if self._name is None or self.DEFAULT is None:
raise NotImplementedError
@property
def from_os(self) -> str | None:
if self._name is not None:
return environ.get(self._name)
@property
def value(self) -> str:
val = self.from_os
if val is not None:
return val
return self.DEFAULT
def __call__(self) -> str:
return self.value

View File

@ -1,122 +0,0 @@
# -*- coding: utf-8 -*-
############################
# Controller static module #
############################
# Imports
from tubot.torrent.abc import TorrentAPI, TorrentObj
from tubot.dirgetter.abc import DirGetter
from tubot.db.abc import CacheDB
from tubot.db.types import User, UserStates
from tubot.static.functions import validate
from tubot.static.env import AUTH_PASSWD, CACHE_EXPIRE
from tubot.static.exceptions import AlreadyExists, AuthError
class Controller(object):
"""
Controller object
"""
torrent: TorrentAPI
getter: DirGetter
cache: CacheDB
def __init__(self, torrent_api: TorrentAPI, dg: DirGetter, cache: CacheDB) -> None:
"""
:param torrent_api: TorrentAPI module
:param dg: DirectoryGetter module
:param cache: CacheDB module
"""
self.torrent = torrent_api
self.getter = dg
self.cache = cache
# DG
async def get_dirs(self) -> dict:
"""
Returns dict of dirs from cache / dg
"""
dirs = await self.cache.get_dirs
if len(dirs) > 0:
return dirs
dirs = await self.getter.folders
if len(dirs) > 0:
await self.cache.cache_dirs(dirs, int(CACHE_EXPIRE()()))
return dirs
raise KeyError("No dirs found")
# Torrent
async def upload_torrent(self, torrent: TorrentObj) -> None:
"""
Add torrent to query
:param torrent: Torrent object
"""
await validate(torrent, "Wrong torrent object")
await self.torrent.upload(torrent)
async def get_torrent_list(self) -> str:
"""
Returns message with current torrents list
"""
return await self.torrent.torrent_list
# Users
async def _create_user(self, tg_id: int, name: str) -> User:
"""
Creates user in database
:param tg_id: Telegram id
:param name: Telegram profile name
"""
user_obj = User(tg_id=tg_id, name=name)
await self.cache.write_user(tg_id, user_obj)
return user_obj
async def _get_user_from_db(self, tg_id: int) -> User:
"""
Gets user from database
:param tg_id: Telegram id
"""
return await self.cache.read_user(tg_id)
async def get_user(self, tg_id: int, name: str) -> User:
"""
Returns user object
:param tg_id: Telegram id
:param name: Telegram profile name
"""
if await self.cache.chech_user_existing(tg_id):
return await self._get_user_from_db(tg_id)
return await self._create_user(tg_id, name)
async def auth_user(self, user: User, pwd: str) -> None:
"""
Auth user
:param user: Current user object
"""
if user.auth:
raise AlreadyExists("You already auth")
if pwd == AUTH_PASSWD()():
user.auth = True
await self.cache.write_user(user.tg_id, user)
return
raise AuthError("Wrong password")
async def set_user_state(self, user: User, state: UserStates) -> None:
"""
Change user status
:param user: Current user object
:param state: New user status
"""
user.state = state
await self.cache.write_user(user.tg_id, user)

View File

@ -1,134 +0,0 @@
# -*- coding: utf-8 -*-
############
# ENV Vars #
############
# Imports
from tubot.static.abc import ENV
class PN_CACHE(ENV):
"""
Python Native Cache dir
"""
_name = "PN_CACHE"
DEFAULT = "/etc/tubot"
class REDIS_HOST(ENV):
"""
Redis host adress
"""
_name = "REDIS_HOST"
DEFAULT = "localhost:6379"
class CACHE_TYPE(ENV):
"""
CacheDB Type
"""
_name = "CACHE_TYPE"
DEFAULT = "python"
class DIR_GETTER(ENV):
"""
DirGetter Type
"""
_name = "DIR_GETTER"
DEFAULT = "os"
class DG_OS_FOLDER(ENV):
"""
Path to parent directory for OS_DirGetter
"""
_name = "DG_OS_FOLDER"
DEFAULT = "/mnt/Media"
class DG_JELLYFIN_HOST(ENV):
"""
Jellyfin Server API host
"""
_name = "DG_JELLYFIN_HOST"
DEFAULT = "http://localhost:8096"
class DG_JELLYFIN_TOKEN(ENV):
"""
Jellyfin API key
"""
_name = "DG_JELLYFIN_TOKEN"
DEFAULT = ""
class TORRENT_SERVER(ENV):
"""
Torrent Server Type
"""
_name = "TORRENT_SERVER"
DEFAULT = "qbit"
class TS_USER(ENV):
"""
Torrent Server auth username
"""
_name = "TS_USER"
DEFAULT = ""
class TS_PASSWORD(ENV):
"""
Torrent Server auth password
"""
_name = "TS_PASSWORD"
DEFAULT = ""
class TS_HOST(ENV):
"""
Torrent Server host
"""
_name = "TS_HOST"
DEFAULT = "http://localhost"
class AUTH_PASSWD(ENV):
"""
Password for users auth
"""
_name = "AUTH_PASSWD"
DEFAULT = "changeme"
class BOT_TOKEN(ENV):
"""
TelegramAPI Bot Token
"""
_name = "BOT_TOKEN"
DEFAULT = ""
class CACHE_EXPIRE(ENV):
"""
Cache expire time in seconds
"""
_name = "CACHE_EXPIRE"
DEFAULT = "120"

View File

@ -1,23 +0,0 @@
# -*- coding: utf-8 -*-
##################
# ExceptionTypes #
##################
class ValidationError(Exception):
"""
Validation error exception
"""
class AuthError(Exception):
"""
Authentification error exception
"""
class AlreadyExists(Exception):
"""
Object already exists error exception
"""

View File

@ -1,22 +0,0 @@
# -*- coding: utf-8 -*-
####################
# Static functions #
####################
# Imports
from tubot.static.abc import IValidatable
from tubot.static.exceptions import ValidationError
async def validate(obj: IValidatable, msg: str | None = None) -> bool:
"""
Throws an exception if the object has not been validated
:return: Object validity boolean
"""
if await obj.__validate__():
return True
if msg is None:
raise ValidationError("Object validation failed")
raise ValidationError(f"Object validation failed: {msg}")

View File

@ -1,103 +0,0 @@
# -*- coding: utf-8 -*-
######################
# Init static module #
######################
# Imports
from typing import Iterable
from tubot.static import env
from tubot.static.functions import validate
from tubot.torrent.apis import qBitTorrent
from tubot.torrent.types import ServerTypes
from tubot.torrent.abc import TorrentAPI
from tubot.dirgetter.types import GetterTypes
from tubot.dirgetter.abc import DirGetter
from tubot.dirgetter.getter import OSGetter, Jellyfin
from tubot.db.types import CacheDBTypes
from tubot.db.abc import CacheDB
from tubot.db.cache import PythonCache, RedisCache
from asyncio import create_task, gather
class InitBuilder(object):
"""
Init all bot modules
"""
TORRENT_SERVER: ServerTypes | TorrentAPI
DG: GetterTypes | DirGetter
CACHE: CacheDBTypes | CacheDB
def set_torrent_server(self, server_type: ServerTypes) -> "InitBuilder":
self.TORRENT_SERVER = server_type
return self
def set_directory_getter(self, dg_type: GetterTypes) -> "InitBuilder":
self.DG = dg_type
return self
def set_cache_type(self, cache_type: CacheDBTypes) -> "InitBuilder":
self.CACHE = cache_type
return self
async def init_ts(self) -> None:
host = env.TS_HOST()()
user = env.TS_USER()()
pwd = env.TS_PASSWORD()()
match self.TORRENT_SERVER:
case ServerTypes.qBitTorrent:
self.TORRENT_SERVER = qBitTorrent(host, user, pwd)
case _:
raise TypeError
await validate(self.TORRENT_SERVER, "TorrentServerAPI validation error")
async def init_dg(self) -> None:
match self.DG:
case GetterTypes.OS:
base_dir = env.DG_OS_FOLDER()()
self.DG = OSGetter(base_dir)
case GetterTypes.Jellyfin:
host = env.DG_JELLYFIN_HOST()()
key = env.DG_JELLYFIN_TOKEN()()
self.DG = Jellyfin(host, key)
case _:
raise TypeError
await validate(self.DG, "DirectoryGetter validation error")
async def init_cache(self) -> None:
match self.CACHE:
case CacheDBTypes.PythonPKL:
self.CACHE = PythonCache()
case CacheDBTypes.Redis:
host = env.REDIS_HOST()()
self.CACHE = RedisCache(host)
case _:
raise TypeError
await validate(self.CACHE, "CacheDataBase validation error")
async def init_all_modules(self) -> None:
tasks = (
create_task(self.init_ts()),
create_task(self.init_dg()),
create_task(self.init_cache()),
)
await gather(*tasks)
@property
def tuple(self) -> Iterable:
return (self.TORRENT_SERVER, self.DG, self.CACHE)
async def init_modules() -> Iterable:
ts = ServerTypes(env.TORRENT_SERVER()())
dg = GetterTypes(env.DIR_GETTER()())
cache = CacheDBTypes(env.CACHE_TYPE()())
builder = (
InitBuilder()
.set_torrent_server(ts)
.set_directory_getter(dg)
.set_cache_type(cache)
)
await builder.init_all_modules()
return builder.tuple

View File

@ -1,82 +0,0 @@
# -*- coding: utf-8 -*-
######################################################
# Abstract methods and interfaces for torrent module #
######################################################
from abc import ABC, abstractmethod
from tubot.static.abc import IValidatable
from tubot.torrent.types import TorrentTypes, ServerTypes
class TorrentObj(IValidatable, ABC):
"""
Abstract class of torrent object
"""
_ttype: TorrentTypes # Torrent type property
dest: str
content: str
@property
def torrent_type(self) -> TorrentTypes:
"""
:return: Torrent type
"""
if self._ttype is None:
raise NotImplementedError("Torrent type not implemented")
return self._ttype
def __init__(self, content: str, destination: str) -> None:
"""
:param content: Torrent content (link, file path)
:param destination: Download directory
"""
self.content = content
self.dest = destination
class TorrentAPI(IValidatable, ABC):
"""
Abstract class of torrent-server API's
"""
_atype: ServerTypes # Server type propery
def __init__(self) -> None:
if self._atype is None:
raise NotImplementedError("Torrent Server type not implemented")
async def upload(self, torrent: TorrentObj) -> None:
"""
Adds the torrent to a queue on the server
:param torrent: TorrenObject type (file, magnet, etc.)
"""
match torrent.torrent_type:
case TorrentTypes.File:
await self.upload_file(torrent)
case TorrentTypes.Magnet:
await self.upload_magnet(torrent)
case TorrentTypes.URL:
await self.upload_url(torrent)
@abstractmethod
async def upload_file(self, torrent) -> None:
raise NotImplementedError
@abstractmethod
async def upload_magnet(self, torrent) -> None:
raise NotImplementedError
@abstractmethod
async def upload_url(self, torrent) -> None:
raise NotImplementedError
@property
@abstractmethod
async def torrent_list(self) -> str:
"""
Returns PlainString with current torrent queue
"""
raise NotImplementedError

View File

@ -1,131 +0,0 @@
# -*- coding: utf-8 -*-
########################################
# Torrent Server API's implementations #
########################################
# Imports
from http.cookies import SimpleCookie
from aiohttp import ClientSession, ClientResponse, FormData
from tubot.torrent.abc import TorrentAPI
from tubot.torrent.torrents import TorrentFile, TorrentMagnet, TorrentURL
from tubot.torrent.types import ServerTypes, TorrentListBuilder
from tubot.static.functions import validate
from tubot.static.exceptions import AuthError
class qBitTorrent(TorrentAPI):
"""
qBitTorrent API implementation
"""
host: str
username: str
password: str
cookie: SimpleCookie | None
_atype = ServerTypes.qBitTorrent
def __init__(self, host: str, username: str, password: str) -> None:
"""
:param host: qBitTorrent remote server adress
:param username: qBitTorrent remote username
:param password: qBitTorrent remote password
"""
super().__init__()
self.cookie = None
self.host = host
self.username = username
self.password = password
async def _get(
self, api: str, cookie: SimpleCookie | None = None
) -> ClientResponse:
"""
Send get request to Torrent server
:param api: API schema
:param cookie: Cookies for auth
"""
async with ClientSession() as session:
return await session.get(url=f"{self.host}/{api}", cookies=cookie)
async def _post(
self,
api: str,
cookie: SimpleCookie | None = None,
data: dict | FormData | None = None,
) -> ClientResponse:
"""
Send post request to Torrent server
:param api: API schema
:param cookie: Cookies for auth
:param data: Request data
"""
async with ClientSession() as session:
return await session.post(
url=f"{self.host}/{api}", cookies=cookie, data=data
)
async def auth(self) -> bool:
"""
Generates cookies for auth
"""
creds = {"username": self.username, "password": self.password}
resp = await self._post(api="api/v2/auth/login", data=creds)
try:
if resp.status == 200:
cookies = resp.cookies
resp = await self._get(api="api/v2/app/version", cookie=cookies)
if resp.status != 200:
raise AuthError("Wrong creds")
self.cookie = cookies
return True
except Exception:
pass
return False
async def upload_file(self, torrent: TorrentFile) -> None:
await validate(self, "Connection to TorrentServer failed")
await validate(torrent, "Bad .torrent file")
bytes = await torrent.getbytes()
data = FormData()
data.add_field(
"torrents",
bytes,
filename=torrent.content,
content_type="application/x-bittorrent",
)
data.add_field("savepath", torrent.dest)
await self._post("api/v2/torrents/add", cookie=self.cookie, data=data)
async def upload_magnet(self, torrent: TorrentMagnet) -> None:
await validate(self, "Connection to TorrentServer failed")
await validate(torrent, "Bad magnet link")
data = {"urls": torrent.content, "savepath": torrent.dest}
await self._post("api/v2/torrents/add", cookie=self.cookie, data=data)
async def upload_url(self, torrent: TorrentURL) -> None:
await validate(self, "Connection to TorrentServer failed")
await validate(torrent, "Bad url")
data = {"urls": torrent.content, "savepath": torrent.dest}
await self._post("api/v2/torrents/add", cookie=self.cookie, data=data)
@property
async def torrent_list(self) -> str:
await validate(self, "Connection to TorrentServer failed")
responce = await self._get(
"api/v2/torrents/info?filter=completed,downloading&sort=progress",
cookie=self.cookie,
)
responce = await responce.json()
lst = tuple(
map(lambda i: (i["name"], i["state"], float(i["progress"])), responce)
)
lb = TorrentListBuilder()
for torrent in lst:
lb.append(torrent)
return str(lb)
async def __validate__(self) -> bool:
return await self.auth()

View File

@ -1,61 +0,0 @@
# -*- coding: utf-8 -*-
#################################
# Torrent types implementations #
#################################
# Imports
from tubot.torrent.types import TorrentTypes
from tubot.torrent.abc import TorrentObj
from aiofiles import open, ospath
from magic import Magic
from re import match
from urllib.parse import urlparse
class TorrentFile(TorrentObj):
"""
.torrent file
"""
_ttype = TorrentTypes.File
async def __validate__(self) -> bool:
if await ospath.isfile(self.content):
mime = Magic(mime=True).from_file(self.content)
if mime == "application/x-bittorrent":
return True
return False
async def getbytes(self) -> bytes:
async with open(self.content, "rb") as dottorrent:
return await dottorrent.read()
class TorrentMagnet(TorrentObj):
"""
Torrent magnet link
"""
_ttype = TorrentTypes.Magnet
async def __validate__(self) -> bool:
pattern = r"^magnet:\?xt=urn:btih:[a-fA-F0-9]{40}.*$"
if match(pattern, self.content):
return True
return False
class TorrentURL(TorrentObj):
"""
Http(s) link to .torrent file
"""
_ttype = TorrentTypes.URL
async def __validate__(self) -> bool:
try:
parse = urlparse(self.content)
return all([parse.scheme, parse.netloc])
except (TypeError, AttributeError):
return False

View File

@ -1,60 +0,0 @@
# -*- coding: utf-8 -*-
############################
# Types for torrent module #
############################
# Imports
from collections.abc import Iterable
from enum import Enum
class TorrentTypes(Enum):
"""
Types of torrents
"""
File = ".torrent file"
Magnet = "torrent magnet link"
URL = "http(s) link to .torrent file"
class ServerTypes(Enum):
"""
Types of Torrent servers API's
"""
qBitTorrent = "qbit"
class TorrentFromServer(object):
name: str
state: str
percent: float
def __init__(self, name: str, state: str, percent: float) -> None:
self.name = name
self.state = state
self.percent = round(percent * 100, 1)
def __str__(self) -> str:
return f"*Torrent:* {self.name}\n*State:* {self.state}\n*Progress:* {self.percent}%"
class TorrentListBuilder(object):
"""
Torrent list type
"""
collection: list
def __init__(self) -> None:
self.collection = []
def append(self, torrent_data: Iterable) -> "TorrentListBuilder":
item = TorrentFromServer(*torrent_data)
self.collection.append(str(item))
return self
def __str__(self) -> str:
return "\n---\n".join(self.collection)