Compare commits

..

4 Commits

Author SHA1 Message Date
trueold89 e6c44193f8
Update bot.py
- Add move bot event
2024-06-03 18:00:23 +03:00
trueold89 1199c74aa7
Add comments/docstrings to bot.py 2024-06-03 16:59:02 +03:00
trueold89 8b36f6c0b2
Init bot.py
- Add create bot command
2024-06-03 16:53:40 +03:00
trueold89 b6690160be
Update ENV class & ServersDB class & DB class
- Add BOT_TOKEN property to ENV
- Add CIRCLES_COUNT property to ENV
- Add channel_id param to add_channel method
- Update accessibility of some methods
2024-06-03 16:44:41 +03:00
3 changed files with 130 additions and 15 deletions

88
hellmbot/bot.py Normal file
View File

@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
from hellmbot.env import ENV
from hellmbot.db import ServersDB
from discord import Intents, Member, VoiceState
from discord.ext import commands
###############
# Bot's logic #
###############
def init_bot() -> commands.Bot:
"""
Initializes the bot object
:return: Bot class object
"""
intents = Intents.default()
intents.message_content = True
return commands.Bot(command_prefix="/", intents=intents)
# Init bot object
bot = init_bot()
def start() -> None:
"""
Starts bot polling
"""
bot.run(ENV.BOT_TOKEN.fget(None))
@bot.command()
async def create(ctx: commands.Context) -> None:
"""
Creates a group of voice channels to move the user and adds their id to the database
:param ctx: Command context
"""
server = ctx.guild
db = ServersDB(server.id)
circles_count = ENV.CIRCLES_COUNT.fget(None)
group = await server.create_category(f"{circles_count} Circles of Hell")
if db:
db.clear_channels()
for circle in range(circles_count):
vc = await server.create_voice_channel(f"{circle + 1} Circle", category=group)
db.add_channel(vc.id, circle + 1)
user_before_channels = {}
@bot.event
async def on_voice_state_update(member: Member, before: VoiceState, after: VoiceState):
"""
Moves the user through the group channels if the user has been connected to one of them
:param member: Discord member class objet
:param before: Before state of voice channel
:param after: After state of voice channel
"""
server = member.guild.id
if before.channel != after.channel:
db = ServersDB(server)
channels = db.channels
if after.channel and after.channel.id in channels:
if member.id not in tuple(user_before_channels.keys()):
user_before_channels[member.id] = None
if before.channel is not None:
user_before_channels[member.id] = before.channel.id
current_idx = channels.index(after.channel.id)
if current_idx + 1 != len(channels):
next_channel = bot.get_channel(channels[current_idx + 1])
else:
next_id = user_before_channels[member.id]
del user_before_channels[member.id]
if next_id is None:
next_channel = next_id
else:
next_channel = bot.get_channel(next_id)
await member.move_to(next_channel)
if __name__ == "__main__":
start()

View File

@ -39,7 +39,7 @@ class DataBase(object):
lst = [f"{column_title.lower()} {column_type.value}" for column_title, column_type in lst] lst = [f"{column_title.lower()} {column_type.value}" for column_title, column_type in lst]
return ", ".join(lst) return ", ".join(lst)
def execute(self, request: str, params: list[Any] = None) -> None: def _execute(self, request: str, params: list[Any] = None) -> None:
""" """
Executes sqlite query Executes sqlite query
@ -54,7 +54,7 @@ class DataBase(object):
cursor.execute(request, params) cursor.execute(request, params)
db.commit() db.commit()
def gentable(self, table: str, columns: dict[str: DBColumnsTypes]) -> None: def _gentable(self, table: str, columns: dict[str: DBColumnsTypes]) -> None:
""" """
Generates table in the database Generates table in the database
@ -62,9 +62,9 @@ class DataBase(object):
:param columns: Columns in the table :param columns: Columns in the table
""" """
request = f"CREATE TABLE IF NOT EXISTS {table} (id INTEGER PRIMARY KEY, {self.__getcolumns(columns)})" request = f"CREATE TABLE IF NOT EXISTS {table} (id INTEGER PRIMARY KEY, {self.__getcolumns(columns)})"
self.execute(request) self._execute(request)
def insert(self, table: str, items: dict[str: Any]) -> None: def _insert(self, table: str, items: dict[str: Any]) -> None:
""" """
Inserts data into a field of a database table Inserts data into a field of a database table
@ -75,9 +75,9 @@ class DataBase(object):
columns = ", ".join(columns) columns = ", ".join(columns)
values = list(items.values()) values = list(items.values())
request = f"INSERT INTO {table} ({columns}) VALUES ({("?, " * len(values))[:-2]})" request = f"INSERT INTO {table} ({columns}) VALUES ({("?, " * len(values))[:-2]})"
self.execute(request, values) self._execute(request, values)
def get(self, table: str, column: list[str] = None, where: dict[str: Any] = None, order: str = None) -> list[Any]: def _get(self, table: str, column: list[str] = None, where: dict[str: Any] = None, order: str = None) -> list[Any]:
""" """
Returns data from sqlite table Returns data from sqlite table
@ -108,10 +108,10 @@ class DataBase(object):
cursor.execute(*request) cursor.execute(*request)
return cursor.fetchall() return cursor.fetchall()
def delete(self, table: str, where: dict[str: Any]) -> None: def _delete(self, table: str, where: dict[str: Any]) -> None:
values = list(where.values()) values = list(where.values())
request = f"DELETE FROM {table} WHERE {", ".join(tuple(where.keys()))} = ({("?, " * len(values))[:-2]})" request = f"DELETE FROM {table} WHERE {", ".join(tuple(where.keys()))} = ({("?, " * len(values))[:-2]})"
self.execute(request, values) self._execute(request, values)
class ServersDB(DataBase): class ServersDB(DataBase):
@ -129,7 +129,7 @@ class ServersDB(DataBase):
:param server_id: id of discord server :param server_id: id of discord server
""" """
self.gentable(self.TABLE, { self._gentable(self.TABLE, {
"server_id": DBColumnsTypes.integer_number, # id of discord server "server_id": DBColumnsTypes.integer_number, # id of discord server
"channel_id": DBColumnsTypes.integer_number, # id of discord voice channel id "channel_id": DBColumnsTypes.integer_number, # id of discord voice channel id
"loop": DBColumnsTypes.integer_number # Channel sequence number "loop": DBColumnsTypes.integer_number # Channel sequence number
@ -140,20 +140,22 @@ class ServersDB(DataBase):
""" """
Checks the existence of the server in the database table Checks the existence of the server in the database table
""" """
lst = self.get(self.TABLE, ["server_id"]) lst = self._get(self.TABLE, ["server_id"])
if len(lst) > 0: if len(lst) > 0:
return True return True
return False return False
def add_channel(self, channel_id: int) -> None: def add_channel(self, channel_id: int, loop_number: int) -> None:
""" """
Adds channel to the database Adds channel to the database
:param channel_id: id of discord channel :param channel_id: id of discord channel
:param loop_number: Channel sequence number in the group
""" """
self.insert(self.TABLE, { self._insert(self.TABLE, {
"server_id": self.server, "server_id": self.server,
"channel_id": channel_id "channel_id": channel_id,
"loop": loop_number
}) })
@property @property
@ -163,7 +165,7 @@ class ServersDB(DataBase):
:return: tuple of server channel id's :return: tuple of server channel id's
""" """
lst = self.get(self.TABLE, ["channel_id"], {"server_id": self.server}, "loop") lst = self._get(self.TABLE, ["channel_id"], {"server_id": self.server}, "loop")
if len(lst) == 0: if len(lst) == 0:
raise IndexError("This server has no added channels") raise IndexError("This server has no added channels")
return tuple(map(lambda element: element[0], lst)) return tuple(map(lambda element: element[0], lst))
@ -172,7 +174,7 @@ class ServersDB(DataBase):
""" """
Deletes all server channels from the database Deletes all server channels from the database
""" """
self.delete(self.TABLE, {"server_id": self.server}) self._delete(self.TABLE, {"server_id": self.server})
def __iter__(self) -> Iterable[int]: def __iter__(self) -> Iterable[int]:
""" """

View File

@ -21,3 +21,28 @@ class ENV(object):
if env is None: if env is None:
return default return default
return env return env
@property
def BOT_TOKEN(self) -> str:
"""
Gets the Discord bot authorization token from the system environment
:return: Discord bot token
"""
env = environ.get("BOT_TOKEN")
if env is None:
raise ValueError("Bot token is not set\nTry to install the system ENV BOT_TOKEN\n(export "
"BOT_TOKEN=inserthereyourbottoken)")
return env
@property
def CIRCLES_COUNT(self) -> int:
"""
Sets count of channels to be created
:return: Count of channels
"""
env = environ.get("CIRCLES_COUNT")
if env is None:
env = 9
return env