diff --git a/db/__init__.py b/db/__init__.py
index 760c5d3..1eab11c 100644
--- a/db/__init__.py
+++ b/db/__init__.py
@@ -1,3 +1,5 @@
-from .base import Base, GetDB
+"""Database module initialization."""
+
+from .base import Base, get_db
from .crud import TokenManager
from .models import Token
diff --git a/db/alembic/env.py b/db/alembic/env.py
index d20989f..668a9b0 100644
--- a/db/alembic/env.py
+++ b/db/alembic/env.py
@@ -1,3 +1,12 @@
+# pylint: disable=all
+
+"""
+Alembic environment configuration for running database migrations.
+
+This module configures and runs Alembic migrations for the database, supporting both
+synchronous (offline) and asynchronous (online) migration modes.
+"""
+
import asyncio
from logging.config import fileConfig
@@ -8,66 +17,56 @@
from db.base import Base
-# this is the Alembic Config object, which provides
-# access to the values within the .ini file in use.
-config = context.config
+# Alembic Config object for accessing values within the .ini file.
+config = context.config # pylint: disable=no-member
config.set_main_option("sqlalchemy.url", "sqlite+aiosqlite:///data/db.sqlite3")
-# Interpret the config file for Python logging.
-# This line sets up loggers basically.
+
+# Set up loggers from config file if available
if config.config_file_name is not None:
fileConfig(config.config_file_name)
-# add your model's MetaData object here
-# for 'autogenerate' support
-# from myapp import mymodel
-# target_metadata = mymodel.Base.metadata
+# Metadata object for 'autogenerate' support in migrations
target_metadata = Base.metadata
-# other values from the config, defined by the needs of env.py,
-# can be acquired:
-# my_important_option = config.get_main_option("my_important_option")
-# ... etc.
-
def run_migrations_offline() -> None:
- """Run migrations in 'offline' mode.
-
- This configures the context with just a URL
- and not an Engine, though an Engine is acceptable
- here as well. By skipping the Engine creation
- we don't even need a DBAPI to be available.
-
- Calls to context.execute() here emit the given string to the
- script output.
+ """
+ Run migrations in 'offline' mode.
+ Configures the context with a URL instead of an Engine,
+ allowing migrations without DBAPI.
"""
url = config.get_main_option("sqlalchemy.url")
- context.configure(
+ context.configure( # pylint: disable=no-member
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
- with context.begin_transaction():
- context.run_migrations()
+ with context.begin_transaction(): # pylint: disable=no-member
+ context.run_migrations() # pylint: disable=no-member
def do_run_migrations(connection: Connection) -> None:
- context.configure(connection=connection, target_metadata=target_metadata)
+ """
+ Configures the context for a migration and executes the migrations.
+ """
+ context.configure(
+ connection=connection, target_metadata=target_metadata # pylint: disable=no-member
+ )
- with context.begin_transaction():
- context.run_migrations()
+ with context.begin_transaction(): # pylint: disable=no-member
+ context.run_migrations() # pylint: disable=no-member
async def run_async_migrations() -> None:
- """In this scenario we need to create an Engine
- and associate a connection with the context.
-
"""
-
+ Creates an asynchronous Engine and associates a connection
+ with the Alembic migration context.
+ """
connectable = async_engine_from_config(
- config.get_section(config.config_ini_section, {}),
+ config.get_section(config.config_ini_section) or {},
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
@@ -79,12 +78,14 @@ async def run_async_migrations() -> None:
def run_migrations_online() -> None:
- """Run migrations in 'online' mode."""
-
+ """
+ Run migrations in 'online' mode using asynchronous connections.
+ """
asyncio.run(run_async_migrations())
-if context.is_offline_mode():
+# Run migrations based on the mode (offline or online)
+if context.is_offline_mode(): # pylint: disable=no-member
run_migrations_offline()
else:
run_migrations_online()
diff --git a/db/alembic/versions/3e5deef43bf0_init_commit.py b/db/alembic/versions/3e5deef43bf0_init_commit.py
index cf99376..8f5ef70 100644
--- a/db/alembic/versions/3e5deef43bf0_init_commit.py
+++ b/db/alembic/versions/3e5deef43bf0_init_commit.py
@@ -1,18 +1,16 @@
+# pylint: skip-file
"""init commit
Revision ID: 3e5deef43bf0
Revises:
Create Date: 2024-10-11 15:47:37.464534
-
"""
from typing import Sequence, Union
-
from alembic import op
import sqlalchemy as sa
-
-# revision identifiers, used by Alembic.
+# Revision identifiers, used by Alembic.
revision: str = "3e5deef43bf0"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
@@ -20,8 +18,8 @@
def upgrade() -> None:
- # ### commands auto generated by Alembic - please adjust! ###
- op.create_table(
+ """Create the tokens table."""
+ op.create_table( # pylint: disable=no-member
"tokens",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("token", sa.String(length=255), nullable=False),
@@ -30,10 +28,8 @@ def upgrade() -> None:
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("token"),
)
- # ### end Alembic commands ###
def downgrade() -> None:
- # ### commands auto generated by Alembic - please adjust! ###
- op.drop_table("tokens")
- # ### end Alembic commands ###
+ """Drop the tokens table."""
+ op.drop_table("tokens") # pylint: disable=no-member
diff --git a/db/alembic/versions/ab1ce3ef2a57_add_settings.py b/db/alembic/versions/ab1ce3ef2a57_add_settings.py
index feedee6..29dadc2 100644
--- a/db/alembic/versions/ab1ce3ef2a57_add_settings.py
+++ b/db/alembic/versions/ab1ce3ef2a57_add_settings.py
@@ -3,38 +3,35 @@
Revision ID: ab1ce3ef2a57
Revises: 3e5deef43bf0
Create Date: 2024-10-13 01:42:55.733416
-
"""
from typing import Sequence, Union
-
from alembic import op
import sqlalchemy as sa
-
-# revision identifiers, used by Alembic.
+# Revision identifiers, used by Alembic.
revision: str = "ab1ce3ef2a57"
down_revision: Union[str, None] = "3e5deef43bf0"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
-def upgrade():
- # Create settings table
- op.create_table(
+def upgrade() -> None:
+ """Create the settings table."""
+ op.create_table( # pylint: disable=no-member
"settings",
sa.Column("key", sa.String(256), primary_key=True),
sa.Column("value", sa.String(2048), nullable=True),
sa.Column(
"created_at",
sa.DateTime(),
- server_default=sa.func.current_timestamp(),
+ server_default=sa.func.current_timestamp(), # pylint: disable=not-callable
nullable=False,
),
sa.Column("updated_at", sa.DateTime(), nullable=True),
)
-def downgrade():
- # Drop settings table
- op.drop_table("settings")
+def downgrade() -> None:
+ """Drop the settings table."""
+ op.drop_table("settings") # pylint: disable=no-member
diff --git a/db/base.py b/db/base.py
index 0d1cc21..764c230 100644
--- a/db/base.py
+++ b/db/base.py
@@ -1,10 +1,16 @@
+"""
+Database base module.
+
+This module provides the asynchronous engine, session factory, and base class for
+declarative models.
+"""
+
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncAttrs, AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker
-
# Create an asynchronous engine
engine = create_async_engine(
"sqlite+aiosqlite:///data/db.sqlite3",
@@ -21,13 +27,20 @@
)
-# Define a base class for declarative models
class Base(DeclarativeBase, AsyncAttrs):
- pass
+ """Base class for declarative models using SQLAlchemy."""
+
+ def save(self, session: AsyncSession) -> None:
+ """Save the current instance to the database."""
+ session.add(self)
+
+ def delete(self, session: AsyncSession) -> None:
+ """Delete the current instance from the database."""
+ session.delete(self)
@asynccontextmanager
-async def GetDB() -> AsyncGenerator[AsyncSession, None]:
+async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""
Provide an asynchronous database session to the application.
"""
diff --git a/db/crud/__init__.py b/db/crud/__init__.py
index 4ff3a2a..5f64f08 100644
--- a/db/crud/__init__.py
+++ b/db/crud/__init__.py
@@ -1,2 +1,8 @@
+"""
+This module initializes the CRUD operations for the application.
+
+It includes managers for handling tokens and settings.
+"""
+
from .token import TokenManager
from .setting import SettingManager
diff --git a/db/crud/setting.py b/db/crud/setting.py
index 14174be..21b3e2f 100644
--- a/db/crud/setting.py
+++ b/db/crud/setting.py
@@ -1,14 +1,33 @@
+"""
+This module provides functionality for managing settings in the application.
+
+It includes methods for upserting and retrieving settings from the database.
+"""
+
from sqlalchemy.future import select
-from db.base import GetDB
+from db.base import get_db
from db.models import Setting
from models import SettingData, SettingUpsert, SettingKeys
class SettingManager:
+ """Manager class for handling settings operations."""
@staticmethod
async def upsert(setting_upsert: SettingUpsert) -> SettingData | None:
- async with GetDB() as db:
+ """
+ Upsert a setting in the database.
+
+ If the setting exists and the value is None, it will be deleted.
+ If the setting does not exist, it will be created.
+
+ Args:
+ setting_upsert (SettingUpsert): The setting data to upsert.
+
+ Returns:
+ SettingData | None: The upserted setting data or None if deleted.
+ """
+ async with get_db() as db:
existing_setting = await db.execute(
select(Setting).where(Setting.key == setting_upsert.key)
)
@@ -19,21 +38,30 @@ async def upsert(setting_upsert: SettingUpsert) -> SettingData | None:
await db.delete(setting)
await db.commit()
return None
- else:
- setting.value = setting_upsert.value
+ setting.value = setting_upsert.value
else:
- if setting_upsert.value is None:
- return None
- setting = Setting(key=setting_upsert.key, value=setting_upsert.value)
- db.add(setting)
+ if setting_upsert.value is not None:
+ setting = Setting(
+ key=setting_upsert.key, value=setting_upsert.value
+ )
+ db.add(setting)
await db.commit()
await db.refresh(setting)
return SettingData.from_orm(setting)
@staticmethod
- async def get(key: SettingKeys) -> SettingData:
- async with GetDB() as db:
+ async def get(key: SettingKeys) -> SettingData | None:
+ """
+ Retrieve a setting by its key.
+
+ Args:
+ key (SettingKeys): The key of the setting to retrieve.
+
+ Returns:
+ SettingData | None: The retrieved setting data or None if not found.
+ """
+ async with get_db() as db:
result = await db.execute(select(Setting).where(Setting.key == key))
setting = result.scalar_one_or_none()
return SettingData.from_orm(setting) if setting else None
diff --git a/db/crud/token.py b/db/crud/token.py
index 7afb09a..4cea832 100644
--- a/db/crud/token.py
+++ b/db/crud/token.py
@@ -1,17 +1,33 @@
+"""
+This module provides functionality for managing tokens in the application.
+
+It includes methods for upserting and retrieving tokens from the database.
+"""
+
from sqlalchemy.future import select
-from db.base import GetDB
+from db.base import get_db
from db.models import Token
-from models import (
- TokenUpsert,
- TokenData,
-)
+from models import TokenUpsert, TokenData
class TokenManager:
+ """Manager class for handling token operations."""
@staticmethod
async def upsert(token_upsert: TokenUpsert) -> TokenData:
- async with GetDB() as db:
+ """
+ Upsert a token in the database.
+
+ If the token with ID 1 exists, it will be updated. If it does not exist,
+ a new token will be created with ID 1.
+
+ Args:
+ token_upsert (TokenUpsert): The token data to upsert.
+
+ Returns:
+ TokenData: The upserted token data.
+ """
+ async with get_db() as db:
existing_token = await db.execute(select(Token).where(Token.id == 1))
token = existing_token.scalar_one_or_none()
@@ -27,7 +43,13 @@ async def upsert(token_upsert: TokenUpsert) -> TokenData:
@staticmethod
async def get() -> TokenData:
- async with GetDB() as db:
+ """
+ Retrieve the token with ID 1 from the database.
+
+ Returns:
+ TokenData | None: The retrieved token data or None if not found.
+ """
+ async with get_db() as db:
result = await db.execute(select(Token).where(Token.id == 1))
token = result.scalar_one_or_none()
return TokenData.from_orm(token) if token else None
diff --git a/db/models.py b/db/models.py
index 62af69f..0dcfaf4 100644
--- a/db/models.py
+++ b/db/models.py
@@ -1,30 +1,40 @@
+"""
+Database models for the application.
+
+This module defines the SQLAlchemy models for the tokens and settings.
+"""
+
from datetime import datetime
-from sqlalchemy import Integer, DateTime, String, func
+from sqlalchemy import Integer, DateTime, String
from sqlalchemy.orm import Mapped, mapped_column
from db.base import Base
class Token(Base):
+ """Model representing a token."""
+
__tablename__ = "tokens"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
token: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
created_at: Mapped[datetime] = mapped_column(
- DateTime(timezone=True), default=func.now(), nullable=False
+ DateTime(timezone=True), default=datetime.now, nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
- DateTime(timezone=True), onupdate=func.now(), nullable=True
+ DateTime(timezone=True), onupdate=datetime.now, nullable=True
)
class Setting(Base):
+ """Model representing a setting."""
+
__tablename__ = "settings"
key: Mapped[str] = mapped_column(String(256), primary_key=True)
value: Mapped[str] = mapped_column(String(2048))
created_at: Mapped[datetime] = mapped_column(
- DateTime(timezone=True), default=func.now(), nullable=False
+ DateTime(timezone=True), default=datetime.now, nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
- DateTime(timezone=True), onupdate=func.now(), nullable=True
+ DateTime(timezone=True), onupdate=datetime.now, nullable=True
)
diff --git a/jobs/__init__.py b/jobs/__init__.py
index fee7426..e6a99a3 100644
--- a/jobs/__init__.py
+++ b/jobs/__init__.py
@@ -1 +1,7 @@
+"""
+This module serves as the initialization for the jobs package,
+including the scheduling of background tasks such as token updates
+and node monitoring.
+"""
+
from .scheduler import start_scheduler, stop_scheduler
diff --git a/jobs/node_monitoring.py b/jobs/node_monitoring.py
index b91ab5e..e07177b 100644
--- a/jobs/node_monitoring.py
+++ b/jobs/node_monitoring.py
@@ -1,5 +1,9 @@
-import asyncio
+"""
+This module handles monitoring of nodes in the Marzban panel, including error reporting
+and automatic restarts if configured.
+"""
+import asyncio
from marzban import MarzbanAPI
from db.crud import SettingManager, TokenManager
@@ -11,8 +15,9 @@
async def node_checker():
+ """Check the status of nodes and perform actions based on their status."""
node_checker_is_active = await SettingManager.get(
- SettingKeys.NodeMonitoringIsActive
+ SettingKeys.NODE_MONITORING_IS_ACTIVE
)
if not node_checker_is_active:
return
@@ -24,7 +29,6 @@ async def node_checker():
nodes = await panel.get_nodes(token.token)
anti_spam = False
for node in nodes:
-
if node.name in EXCLUDED_MONITORINGS:
continue
@@ -33,7 +37,7 @@ async def node_checker():
await report.node_error(node)
node_auto_restart = await SettingManager.get(
- SettingKeys.NodeMonitoringAutoRestart
+ SettingKeys.NODE_MONITORING_AUTO_RESTART
)
if not node_auto_restart:
continue
@@ -43,7 +47,7 @@ async def node_checker():
try:
await panel.reconnect_node(node.id, token.token)
await report.node_restart(node, True)
- except: # noqa: E722
+ except (ConnectionError, TimeoutError): # Omit the variable if not used
await report.node_restart(node, False)
if anti_spam:
diff --git a/jobs/scheduler.py b/jobs/scheduler.py
index 6889b43..3cafe43 100644
--- a/jobs/scheduler.py
+++ b/jobs/scheduler.py
@@ -1,3 +1,7 @@
+"""
+This module handles scheduling jobs for token updates and node monitoring.
+"""
+
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from jobs.token_updater import token_update
@@ -8,6 +12,7 @@
async def start_scheduler() -> bool:
+ """Start the job scheduler for token updates and node monitoring."""
logger.info("Trying to start the scheduler.")
try:
@@ -35,18 +40,19 @@ async def start_scheduler() -> bool:
id="node_monitor",
replace_existing=True,
)
- logger.info("Token update job added to scheduler with ID 'node_monitor'.")
+ logger.info("Node monitoring job added to scheduler with ID 'node_monitor'.")
return True
- except Exception as e:
- logger.error(f"An error occurred while starting the scheduler: {e}")
+ except (RuntimeError, ValueError) as e: # Specify expected exceptions here
+ logger.error("An error occurred while starting the scheduler: %s", e)
return False
async def stop_scheduler() -> None:
+ """Stop the job scheduler."""
logger.info("Trying to stop the scheduler.")
try:
scheduler.shutdown(wait=True)
logger.info("Scheduler stopped successfully.")
- except Exception as e:
- logger.error(f"An error occurred while stopping the scheduler: {e}")
+ except (RuntimeError, ValueError) as e: # Specify expected exceptions here
+ logger.error("An error occurred while stopping the scheduler: %s", e)
diff --git a/jobs/token_updater.py b/jobs/token_updater.py
index 3f480e0..f996c7a 100644
--- a/jobs/token_updater.py
+++ b/jobs/token_updater.py
@@ -1,8 +1,14 @@
+"""
+This module handles updating the Marzban panel token at regular intervals.
+"""
+
+import httpx
+
+from marzban import MarzbanAPI
from utils.config import MARZBAN_PASSWORD, MARZBAN_USERNAME, MARZBAN_ADDRESS
-from db.crud import TokenManager
from utils.log import logger
+from db.crud import TokenManager
from models import TokenUpsert
-from marzban import MarzbanAPI
async def token_update() -> bool:
@@ -25,13 +31,13 @@ async def token_update() -> bool:
if token_data:
logger.info("Token updated successfully.")
return True
- else:
- logger.error("Failed to update token in database.")
- return False
+
+ logger.error("Failed to update token in database.")
+ return False
logger.error("Failed to retrieve token: No token received.")
return False
- except Exception as e:
- logger.error(f"An unexpected TOKEN_UPDATER error occurred: {str(e)}")
+ except (httpx.HTTPStatusError, httpx.RequestError) as e:
+ logger.error("An error occurred during the API request: %s", str(e))
return False
diff --git a/main.py b/main.py
index 8b5c2c7..de2c9bf 100644
--- a/main.py
+++ b/main.py
@@ -1,3 +1,9 @@
+"""
+Main module for the Telegram bot application.
+This module initializes and runs the bot with all necessary configurations,
+including scheduler setup, router configuration, and middleware integration.
+"""
+
import asyncio
from aiogram import Bot, Dispatcher
from aiogram.enums.parse_mode import ParseMode
@@ -60,8 +66,10 @@ async def main() -> None:
# Start polling the bot
try:
await dp.start_polling(bot)
- except Exception as e:
- logger.error(f"An error occurred while polling: {e}")
+ except (ConnectionError, TimeoutError) as conn_err:
+ logger.error("A connection error occurred while polling: %s", conn_err)
+ except Exception as e: # pylint: disable=broad-except
+ logger.error("An error occurred while polling: %s", e)
if __name__ == "__main__":
@@ -70,5 +78,5 @@ async def main() -> None:
asyncio.run(main())
except KeyboardInterrupt:
logger.warning("Bot stopped by user.")
- except Exception as e:
- logger.error(f"An unexpected error occurred: {e}")
+ except Exception as e: # pylint: disable=broad-except
+ logger.error("An unexpected error occurred: %s", e)
diff --git a/middlewares/auth.py b/middlewares/auth.py
index 805ea22..2db27a4 100644
--- a/middlewares/auth.py
+++ b/middlewares/auth.py
@@ -1,12 +1,22 @@
+"""
+Middleware for handling admin access in Telegram bot updates.
+"""
+
from typing import Any, Awaitable, Callable, Dict
from aiogram import BaseMiddleware
from aiogram.types import Update
-from utils.log import logger
-from utils.config import TELEGRAM_ADMINS_ID
-from utils.statedb import storage
+
+from utils import logger, storage, config
+# pylint: disable=too-few-public-methods
class CheckAdminAccess(BaseMiddleware):
+ """
+ Middleware to check if the user is an admin based on their user ID.
+ This middleware processes incoming updates and allows only admins
+ to proceed with the handler.
+ """
+
async def __call__(
self,
handler: Callable[[Update, Dict[str, Any]], Awaitable[Any]],
@@ -28,8 +38,8 @@ async def __call__(
logger.warning("Received update without user information!")
return None
- if user.id not in TELEGRAM_ADMINS_ID:
- logger.warning(f"Blocked {user.username or user.first_name}")
+ if user.id not in config.TELEGRAM_ADMINS_ID:
+ logger.warning("Blocked %s", user.username or user.first_name)
return None
return await handler(event, data)
diff --git a/models/__init__.py b/models/__init__.py
index 1ab4573..9ebb68d 100644
--- a/models/__init__.py
+++ b/models/__init__.py
@@ -1,3 +1,8 @@
+"""
+Module containing all the model definitions for the application.
+This includes models for token, user state, settings, and other core entities.
+"""
+
from .token import TokenData, TokenUpsert
from .state import UserCreateForm
from .setting import SettingData, SettingUpsert, SettingKeys
diff --git a/models/callback.py b/models/callback.py
index 5faeac1..21d60c6 100644
--- a/models/callback.py
+++ b/models/callback.py
@@ -1,43 +1,75 @@
-from aiogram.filters.callback_data import CallbackData
+"""
+Module defining callback data classes for handling bot actions and page navigation.
+"""
+
from enum import Enum
+from aiogram.filters.callback_data import CallbackData
class AdminActions(str, Enum):
- Add = "add"
- Edit = "edit"
- Info = "info"
- Delete = "delete"
+ """
+ Enum representing various admin actions that can be performed.
+ """
+
+ ADD = "add"
+ EDIT = "edit"
+ INFO = "info"
+ DELETE = "delete"
class BotActions(str, Enum):
- NodeChecker = "node_checker"
- NodeAutoRestart = "node_auto_restart"
- UsersInbound = "users_inbound"
+ """
+ Enum representing various bot actions.
+ """
+
+ NODE_CHECKER = "node_checker"
+ NODE_AUTO_RESTART = "node_auto_restart"
+ USERS_INBOUND = "users_inbound"
class PagesActions(str, Enum):
- Home = "home"
- UserCreate = "user_create"
- NodeMonitoring = "node_monitoring"
- UsersMenu = "users_menu"
+ """
+ Enum representing different pages in the bot navigation.
+ """
+
+ HOME = "home"
+ USER_CREATE = "user_create"
+ NODE_MONITORING = "node_monitoring"
+ USERS_MENU = "users_menu"
class PagesCallbacks(CallbackData, prefix="pages"):
+ """
+ Callback data structure for page navigation.
+ """
+
page: PagesActions
-class ConfirmCallbacks(CallbackData, prefix="confim"):
+class ConfirmCallbacks(CallbackData, prefix="confirm"):
+ """
+ Callback data structure for confirmation actions.
+ """
+
page: BotActions
action: AdminActions
is_confirm: bool = False
class UserStatusCallbacks(CallbackData, prefix="user_status"):
+ """
+ Callback data structure for user status actions.
+ """
+
status: str
action: AdminActions
class UserInboundsCallbacks(CallbackData, prefix="user_inbounds"):
+ """
+ Callback data structure for user inbounds actions.
+ """
+
tag: str | None = None
protocol: str | None = None
is_selected: bool | None = None
@@ -47,4 +79,8 @@ class UserInboundsCallbacks(CallbackData, prefix="user_inbounds"):
class AdminSelectCallbacks(CallbackData, prefix="admin_select"):
+ """
+ Callback data structure for selecting an admin by username.
+ """
+
username: str
diff --git a/models/setting.py b/models/setting.py
index c06e0e6..702c9ec 100644
--- a/models/setting.py
+++ b/models/setting.py
@@ -1,23 +1,37 @@
-from pydantic import BaseModel
+"""
+Module defining settings models for application configuration.
+"""
+
from datetime import datetime
from enum import Enum
+from typing import Optional
+from pydantic import BaseModel
class SettingKeys(str, Enum):
- NodeMonitoringIsActive = "node_monitoring_is_active"
- NodeMonitoringAutoRestart = "node_monitoring_auto_restart"
+ """Enum for application setting keys."""
+
+ NODE_MONITORING_IS_ACTIVE = "node_monitoring_is_active"
+ NODE_MONITORING_AUTO_RESTART = "node_monitoring_auto_restart"
class SettingData(BaseModel):
+ """Model for application setting data."""
+
key: str
- value: str | None
+ value: Optional[str]
created_at: datetime
- updated_at: datetime | None
+ updated_at: Optional[datetime]
+ # pylint: disable=R0903
class Config:
+ """Pydantic configuration options."""
+
from_attributes = True
class SettingUpsert(BaseModel):
+ """Model for upserting a setting."""
+
key: str
- value: str | None
+ value: Optional[str]
diff --git a/models/state.py b/models/state.py
index 1c70c11..c4d1e54 100644
--- a/models/state.py
+++ b/models/state.py
@@ -1,12 +1,23 @@
+"""
+Module defining the states for the user creation process.
+"""
+
from aiogram.fsm.state import StatesGroup, State
+# pylint: disable=R0903
class UserCreateForm(StatesGroup):
- base_username = State()
- start_number = State()
- how_much = State()
- data_limit = State()
- date_limit = State()
- status = State()
- admin = State()
- inbounds = State()
+ """
+ States group for the user creation process in the bot.
+ This defines the various states a user can go through while
+ filling out the form for user creation.
+ """
+
+ base_username = State() # State for base username
+ start_number = State() # State for start number
+ how_much = State() # State for amount
+ data_limit = State() # State for data limit
+ date_limit = State() # State for date limit
+ status = State() # State for user status
+ admin = State() # State for admin status
+ inbounds = State() # State for inbounds
diff --git a/models/token.py b/models/token.py
index 7088b92..b9fb49c 100644
--- a/models/token.py
+++ b/models/token.py
@@ -1,16 +1,32 @@
-from pydantic import BaseModel
+"""
+Module defining token-related models.
+"""
+
from datetime import datetime
+from pydantic import BaseModel
class TokenData(BaseModel):
+ """
+ Model for representing a token with its associated data,
+ including creation and update timestamps.
+ """
+
id: int
token: str
updated_at: datetime | None
created_at: datetime
+ # pylint: disable=R0903
class Config:
+ """Pydantic configuration options."""
+
from_attributes = True
class TokenUpsert(BaseModel):
+ """
+ Model for upserting a token.
+ """
+
token: str
diff --git a/requirements.txt b/requirements.txt
index 71ee103..6d46bdc 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,4 +7,5 @@ aiosqlite==0.20.0
APScheduler==3.10.4
marzban==0.2.8
pillow==10.4.0
-alembic==1.13.1
\ No newline at end of file
+alembic==1.13.1
+httpx==0.27.0
\ No newline at end of file
diff --git a/routers/__init__.py b/routers/__init__.py
index 54dad37..acc109f 100644
--- a/routers/__init__.py
+++ b/routers/__init__.py
@@ -1,10 +1,16 @@
+"""
+This module sets up the routers for the bot application.
+It includes base, user, node, and users routers.
+"""
+
from aiogram import Router
+from . import base, user, node, users
def setup_routers() -> Router:
-
- from . import base, user, node, users
-
+ """
+ Sets up the routers for the bot application by including the necessary sub-routers.
+ """
router = Router()
router.include_router(base.router)
diff --git a/routers/base.py b/routers/base.py
index 1ea234b..3fbf64c 100644
--- a/routers/base.py
+++ b/routers/base.py
@@ -1,3 +1,8 @@
+"""
+This module defines the base router for handling commands and callbacks in the bot.
+It includes handlers for commands like start and version, and processes specific callback queries.
+"""
+
from aiogram import Router, F
from aiogram.types import Message, CallbackQuery
from aiogram.filters.command import CommandStart, Command
@@ -13,23 +18,34 @@
@router.message(CommandStart(ignore_case=True))
async def start(message: Message, state: FSMContext):
+ """
+ Handler for the '/start' command. It clears the user's state and sends the start message
+ with the home keyboard.
+ """
await state.clear()
new_message = await message.answer(
- text=MessageTexts.Start, reply_markup=BotKeyboards.home()
+ text=MessageTexts.START, reply_markup=BotKeyboards.home()
)
return await storage.clear_and_add_message(new_message)
-@router.callback_query(PagesCallbacks.filter(F.page.is_(PagesActions.Home)))
+@router.callback_query(PagesCallbacks.filter(F.page.is_(PagesActions.HOME)))
async def home(callback: CallbackQuery, state: FSMContext):
+ """
+ Callback handler for the 'HOME' page action. Clears the state and sends the start message
+ with the home keyboard again.
+ """
await state.clear()
new_message = await callback.message.answer(
- text=MessageTexts.Start, reply_markup=BotKeyboards.home()
+ text=MessageTexts.START, reply_markup=BotKeyboards.home()
)
return await storage.clear_and_add_message(new_message)
@router.message(Command(commands=["version", "v"]))
async def version(message: Message):
- new_message = await message.answer(text=MessageTexts.Version)
+ """
+ Handler for the '/version' and '/v' commands. Sends the version information of the bot.
+ """
+ new_message = await message.answer(text=MessageTexts.VERSION)
return await storage.clear_and_add_message(new_message)
diff --git a/routers/node.py b/routers/node.py
index 066df10..de3aec5 100644
--- a/routers/node.py
+++ b/routers/node.py
@@ -1,3 +1,8 @@
+"""
+This module contains the handlers for the node monitoring menu and actions.
+It includes callbacks for toggling settings related to node monitoring.
+"""
+
from aiogram import Router, F
from aiogram.types import CallbackQuery
@@ -17,23 +22,31 @@
async def get_setting_status(key: SettingKeys) -> str:
+ """
+ Returns the status of the specified setting as 'ON' or 'OFF'.
+ """
return "ON" if await SettingManager.get(key) else "OFF"
async def toggle_setting(key: SettingKeys):
+ """
+ Toggles the value of the specified setting.
+ """
current_value = await SettingManager.get(key)
new_value = None if current_value else "True"
await SettingManager.upsert(SettingUpsert(key=key, value=new_value))
-@router.callback_query(PagesCallbacks.filter(F.page.is_(PagesActions.NodeMonitoring)))
+@router.callback_query(PagesCallbacks.filter(F.page.is_(PagesActions.NODE_MONITORING)))
async def node_monitoring_menu(callback: CallbackQuery):
- checker_status = await get_setting_status(SettingKeys.NodeMonitoringIsActive)
- auto_restart_status = await get_setting_status(
- SettingKeys.NodeMonitoringAutoRestart
- )
+ """
+ Handler for the node monitoring menu callback. It retrieves the current status
+ of node monitoring settings and updates the menu text.
+ """
+ checker_status = await get_setting_status(SettingKeys.NODE_MONITORING_IS_ACTIVE)
+ auto_restart_status = await get_setting_status(SettingKeys.NODE_MONITORING_AUTO_RESTART)
- text = MessageTexts.NodeMonitoringMenu.format(
+ text = MessageTexts.NODE_MONITORING_MENU.format(
checker=checker_status,
auto_restart=auto_restart_status,
)
@@ -42,13 +55,19 @@ async def node_monitoring_menu(callback: CallbackQuery):
)
-@router.callback_query(ConfirmCallbacks.filter(F.page.is_(BotActions.NodeAutoRestart)))
+@router.callback_query(ConfirmCallbacks.filter(F.page.is_(BotActions.NODE_AUTO_RESTART)))
async def node_monitoring_auto_restart(callback: CallbackQuery):
- await toggle_setting(SettingKeys.NodeMonitoringAutoRestart)
+ """
+ Handler for toggling the auto-restart setting for node monitoring.
+ """
+ await toggle_setting(SettingKeys.NODE_MONITORING_AUTO_RESTART)
await node_monitoring_menu(callback)
-@router.callback_query(ConfirmCallbacks.filter(F.page.is_(BotActions.NodeChecker)))
+@router.callback_query(ConfirmCallbacks.filter(F.page.is_(BotActions.NODE_CHECKER)))
async def node_monitoring_checker(callback: CallbackQuery):
- await toggle_setting(SettingKeys.NodeMonitoringIsActive)
+ """
+ Handler for toggling the checker setting for node monitoring.
+ """
+ await toggle_setting(SettingKeys.NODE_MONITORING_IS_ACTIVE)
await node_monitoring_menu(callback)
diff --git a/routers/user.py b/routers/user.py
index d04624f..4c35522 100644
--- a/routers/user.py
+++ b/routers/user.py
@@ -1,3 +1,8 @@
+"""
+This module contains the user-related callback functions and their handlers
+for user creation and management.
+"""
+
from aiogram import Router, F
from aiogram.types import CallbackQuery, Message, BufferedInputFile
from aiogram.fsm.context import FSMContext
@@ -20,35 +25,40 @@
AdminSelectCallbacks,
)
-
router = Router()
-@router.callback_query(PagesCallbacks.filter(F.page.is_(PagesActions.UserCreate)))
-async def user_create(
- callback: CallbackQuery, callback_data: PagesCallbacks, state: FSMContext
-):
+@router.callback_query(PagesCallbacks.filter(F.page.is_(PagesActions.USER_CREATE)))
+async def user_create(callback: CallbackQuery, state: FSMContext):
+ """
+ Initiates the user creation process by asking for the base username.
+ """
await state.set_state(UserCreateForm.base_username)
return await callback.message.edit_text(
- text=MessageTexts.AskCreateUserBaseUsername, reply_markup=BotKeyboards.cancel()
+ text=MessageTexts.ASK_CREATE_USER_BASE_USERNAME, reply_markup=BotKeyboards.cancel()
)
@router.message(StateFilter(UserCreateForm.base_username))
async def user_create_base_username(message: Message, state: FSMContext):
+ """
+ Handles the input for the base username in the user creation process.
+ """
await state.update_data(base_username=message.text)
await state.set_state(UserCreateForm.start_number)
new_message = await message.answer(
- text=MessageTexts.AskCreateUserStartNumber, reply_markup=BotKeyboards.cancel()
+ text=MessageTexts.ASK_CREATE_USER_START_NUMBER, reply_markup=BotKeyboards.cancel()
)
return await storage.clear_and_add_message(new_message)
@router.message(StateFilter(UserCreateForm.start_number))
async def user_create_start_number(message: Message, state: FSMContext):
-
+ """
+ Handles the input for the starting number in the user creation process.
+ """
if not message.text.isdigit():
- new_message = await message.answer(text=MessageTexts.JustNumber)
+ new_message = await message.answer(text=MessageTexts.JUST_NUMBER)
return await storage.add_log_message(
message.from_user.id, new_message.message_id
)
@@ -56,16 +66,18 @@ async def user_create_start_number(message: Message, state: FSMContext):
await state.update_data(start_number=int(message.text))
await state.set_state(UserCreateForm.how_much)
new_message = await message.answer(
- text=MessageTexts.AskCreateUserHowMuch, reply_markup=BotKeyboards.cancel()
+ text=MessageTexts.ASK_CREATE_USER_HOW_MUCH, reply_markup=BotKeyboards.cancel()
)
return await storage.clear_and_add_message(new_message)
@router.message(StateFilter(UserCreateForm.how_much))
async def user_create_how_much(message: Message, state: FSMContext):
-
+ """
+ Handles the input for the 'how much' field in the user creation process.
+ """
if not message.text.isdigit():
- new_message = await message.answer(text=MessageTexts.JustNumber)
+ new_message = await message.answer(text=MessageTexts.JUST_NUMBER)
return await storage.add_log_message(
message.from_user.id, new_message.message_id
)
@@ -73,16 +85,18 @@ async def user_create_how_much(message: Message, state: FSMContext):
await state.update_data(how_much=int(message.text))
await state.set_state(UserCreateForm.data_limit)
new_message = await message.answer(
- text=MessageTexts.AskCreateUserDataLimit, reply_markup=BotKeyboards.cancel()
+ text=MessageTexts.ASK_CREATE_USER_DATA_LIMIT, reply_markup=BotKeyboards.cancel()
)
return await storage.clear_and_add_message(new_message)
@router.message(StateFilter(UserCreateForm.data_limit))
async def user_create_data_limit(message: Message, state: FSMContext):
-
+ """
+ Handles the input for the data limit in the user creation process.
+ """
if not message.text.isdigit():
- new_message = await message.answer(text=MessageTexts.JustNumber)
+ new_message = await message.answer(text=MessageTexts.JUST_NUMBER)
return await storage.add_log_message(
message.from_user.id, new_message.message_id
)
@@ -90,36 +104,41 @@ async def user_create_data_limit(message: Message, state: FSMContext):
await state.update_data(data_limit=int(message.text))
await state.set_state(UserCreateForm.date_limit)
new_message = await message.answer(
- text=MessageTexts.AskCreateUserDateLimit, reply_markup=BotKeyboards.cancel()
+ text=MessageTexts.ASK_CREATE_USER_DATE_LIMIT, reply_markup=BotKeyboards.cancel()
)
return await storage.clear_and_add_message(new_message)
@router.message(StateFilter(UserCreateForm.date_limit))
async def user_create_date_limit(message: Message, state: FSMContext):
-
+ """
+ Handles the input for the date limit in the user creation process.
+ """
if not message.text.isdigit():
- new_message = await message.answer(text=MessageTexts.JustNumber)
+ new_message = await message.answer(text=MessageTexts.JUST_NUMBER)
return await storage.add_log_message(
message.from_user.id, new_message.message_id
)
await state.update_data(date_limit=int(message.text))
new_message = await message.answer(
- text=MessageTexts.AskCreateUserStatus,
- reply_markup=BotKeyboards.user_status(AdminActions.Add),
+ text=MessageTexts.ASK_CREATE_USER_STATUS,
+ reply_markup=BotKeyboards.user_status(AdminActions.ADD),
)
return await storage.clear_and_add_message(new_message)
-@router.callback_query(UserStatusCallbacks.filter(F.action.is_(AdminActions.Add)))
+@router.callback_query(UserStatusCallbacks.filter(F.action.is_(AdminActions.ADD)))
async def user_create_status(
callback: CallbackQuery, callback_data: UserStatusCallbacks, state: FSMContext
):
+ """
+ Handles the status selection for user creation.
+ """
await state.update_data(status=callback_data.status)
admins = await panel.admins()
return await callback.message.edit_text(
- text=MessageTexts.AskCreateAdminUsername,
+ text=MessageTexts.ASK_CREATE_ADMIN_USERNAME,
reply_markup=BotKeyboards.admins(admins),
)
@@ -128,11 +147,14 @@ async def user_create_status(
async def user_create_owner_select(
callback: CallbackQuery, callback_data: AdminSelectCallbacks, state: FSMContext
):
+ """
+ Handles the selection of the admin owner during the user creation process.
+ """
await state.update_data(admin=callback_data.username)
- inbounds = await panel.inbounds()
+ inbounds = await panel.get_inbounds()
await state.update_data(inbounds=inbounds)
return await callback.message.edit_text(
- text=MessageTexts.AskCreateUserInbouds,
+ text=MessageTexts.ASK_CREATE_USER_INBOUNDS,
reply_markup=BotKeyboards.inbounds(inbounds),
)
@@ -140,7 +162,7 @@ async def user_create_owner_select(
@router.callback_query(
UserInboundsCallbacks.filter(
(
- F.action.is_(AdminActions.Add)
+ F.action.is_(AdminActions.ADD)
& (F.is_done.is_(False))
& (F.just_one_inbound.is_(False))
)
@@ -151,37 +173,45 @@ async def user_create_inbounds(
callback_data: UserInboundsCallbacks,
state: FSMContext,
):
+ """
+ Handles the inbound selection for user creation.
+ """
data = await state.get_data()
inbounds = data.get("inbounds")
selected_inbounds = set(data.get("selected_inbounds", []))
- (
+
+ if callback_data.is_selected is False:
selected_inbounds.add(callback_data.tag)
- if callback_data.is_selected is False
- else selected_inbounds.discard(callback_data.tag)
- )
+ else:
+ selected_inbounds.discard(callback_data.tag)
+
await state.update_data(selected_inbounds=list(selected_inbounds))
await callback.message.edit_reply_markup(
reply_markup=BotKeyboards.inbounds(inbounds, selected_inbounds)
)
+
@router.callback_query(
UserInboundsCallbacks.filter(
(
- F.action.is_(AdminActions.Add)
+ F.action.is_(AdminActions.ADD)
& (F.is_done.is_(True))
& (F.just_one_inbound.is_(False))
)
)
)
async def user_create_inbounds_save(callback: CallbackQuery, state: FSMContext):
+ """
+ Saves the selected inbounds and creates users with the provided information.
+ """
data = await state.get_data()
inbounds: dict[str, list[ProxyInbound]] = data.get("inbounds")
selected_inbounds = set(data.get("selected_inbounds", []))
if not selected_inbounds:
return await callback.answer(
- text=MessageTexts.NoneUserInbounds, show_alert=True
+ text=MessageTexts.NONE_USER_INBOUNDS, show_alert=True
)
proxies = {
@@ -219,7 +249,7 @@ async def user_create_inbounds_save(callback: CallbackQuery, state: FSMContext):
await callback.message.answer_photo(
caption=text_info.user_info(new_user),
photo=BufferedInputFile(qr_bytes, filename="qr_code.png"),
- reply_markup=BotKeyboards.user(new_user)
+ reply_markup=BotKeyboards.user(new_user),
)
else:
await callback.message.answer(
diff --git a/routers/users.py b/routers/users.py
index e439d41..5acea25 100644
--- a/routers/users.py
+++ b/routers/users.py
@@ -1,3 +1,9 @@
+"""
+This module contains the callback functions for managing user actions,
+such as navigating the users menu, adding or deleting inbounds, and updating
+user settings related to inbounds.
+"""
+
from aiogram import Router, F
from aiogram.types import CallbackQuery
from models import (
@@ -15,18 +21,25 @@
router = Router()
-@router.callback_query(PagesCallbacks.filter(F.page == PagesActions.UsersMenu))
+@router.callback_query(PagesCallbacks.filter(F.page == PagesActions.USERS_MENU))
async def menu(callback: CallbackQuery):
+ """
+ Handles the callback for the Users Menu page and displays the corresponding menu.
+ """
return await callback.message.edit_text(
- text=MessageTexts.UsersMenu, reply_markup=BotKeyboards.users()
+ text=MessageTexts.USERS_MENU, reply_markup=BotKeyboards.users()
)
-@router.callback_query(ConfirmCallbacks.filter(F.page == BotActions.UsersInbound))
+@router.callback_query(ConfirmCallbacks.filter(F.page == BotActions.USERS_INBOUND))
async def inbound_add(callback: CallbackQuery, callback_data: ConfirmCallbacks):
- inbounds = await panel.inbounds()
+ """
+ Handles the callback for adding or managing inbounds in the users' settings.
+ Displays the inbound selection menu based on the provided callback data.
+ """
+ inbounds = await panel.get_inbounds()
return await callback.message.edit_text(
- text=MessageTexts.UsersInboundSelect,
+ text=MessageTexts.USERS_INBOUND_SELECT,
reply_markup=BotKeyboards.inbounds(
inbounds=inbounds, action=callback_data.action, just_one_inbound=True
),
@@ -36,7 +49,7 @@ async def inbound_add(callback: CallbackQuery, callback_data: ConfirmCallbacks):
@router.callback_query(
UserInboundsCallbacks.filter(
(
- F.action.in_([AdminActions.Add, AdminActions.Delete])
+ F.action.in_([AdminActions.ADD, AdminActions.DELETE])
& (F.is_done.is_(True))
& (F.just_one_inbound.is_(True))
)
@@ -45,22 +58,24 @@ async def inbound_add(callback: CallbackQuery, callback_data: ConfirmCallbacks):
async def inbound_confirm(
callback: CallbackQuery, callback_data: UserInboundsCallbacks
):
- working_message = await callback.message.edit_text(text=MessageTexts.Working)
+ """
+ Confirms the addition or deletion of an inbound for the user based on the
+ selected action. After processing the action, it updates the message with the result.
+ """
+ working_message = await callback.message.edit_text(text=MessageTexts.WORKING)
result = await helpers.manage_panel_inbounds(
callback_data.tag,
callback_data.protocol,
(
- AdminActions.Add
- if callback_data.action.value == AdminActions.Add.value
- else AdminActions.Delete
+ AdminActions.ADD
+ if callback_data.action.value == AdminActions.ADD.value
+ else AdminActions.DELETE
),
)
return await working_message.edit_text(
- text=(
- MessageTexts.UsersInboundSuccessUpdated
- if result
- else MessageTexts.UsersInboundErrorUpdated
- ),
- reply_markup=BotKeyboards.home(),
+ text=(MessageTexts.USERS_INBOUND_SUCCESS_UPDATED
+ if result
+ else MessageTexts.USERS_INBOUND_ERROR_UPDATED),
+ reply_markup=BotKeyboards.home(),
)
diff --git a/utils/__init__.py b/utils/__init__.py
new file mode 100644
index 0000000..f731926
--- /dev/null
+++ b/utils/__init__.py
@@ -0,0 +1,5 @@
+"""
+This module imports necessary components for database access and logging.
+"""
+from .statedb import storage
+from .log import logger
diff --git a/utils/config.py b/utils/config.py
index 7bd5736..d567e25 100644
--- a/utils/config.py
+++ b/utils/config.py
@@ -1,8 +1,17 @@
+"""
+This module contains configuration settings for the application, including
+Telegram bot settings, Marzban panel settings, and excluded monitorings.
+It ensures that all required settings are provided and checks for missing values.
+"""
+
from decouple import config
# Function to check if a required configuration value is missing
def require_setting(setting_name, value):
+ """
+ Ensures that a required setting is provided and not empty.
+ """
if not value:
raise ValueError(
f"The '{setting_name}' setting is required and cannot be empty."
diff --git a/utils/helpers.py b/utils/helpers.py
index f3876b5..544d821 100644
--- a/utils/helpers.py
+++ b/utils/helpers.py
@@ -1,10 +1,16 @@
-import qrcode
+"""
+This module contains helper functions for processing users' inbound data,
+generating QR codes, and managing user data updates concurrently with rate limiting.
+"""
+
import asyncio
from io import BytesIO
+import qrcode
+import httpx
+from marzban import UserModify, UserResponse
from models import AdminActions
from utils import panel
from utils.log import logger
-from marzban import UserModify, UserResponse
async def create_qr(text: str) -> bytes:
@@ -33,16 +39,15 @@ async def process_user(
tag: str,
protocol: str,
action: AdminActions,
- max_retries: int = 3,
) -> bool:
- """Process a single user with semaphore for rate limiting and retry mechanism"""
+ """Process a single user with semaphore for rate limiting and retry mechanism."""
async with semaphore:
current_inbounds = user.inbounds.copy() if user.inbounds else {}
current_proxies = user.proxies.copy() if user.proxies else {}
needs_update = False
- if action == AdminActions.Delete:
+ if action == AdminActions.DELETE:
if protocol in current_inbounds and tag in current_inbounds[protocol]:
current_inbounds[protocol].remove(tag)
needs_update = True
@@ -51,7 +56,7 @@ async def process_user(
current_inbounds.pop(protocol, None)
current_proxies.pop(protocol, None)
- elif action == AdminActions.Add:
+ elif action == AdminActions.ADD:
if protocol not in current_inbounds:
current_inbounds[protocol] = []
current_proxies[protocol] = {}
@@ -80,7 +85,7 @@ async def process_user(
async def process_batch(
users: list[UserResponse], tag: str, protocol: str, action: AdminActions
) -> int:
- """Process a batch of users concurrently with rate limiting"""
+ """Process a batch of users concurrently with rate limiting."""
semaphore = asyncio.Semaphore(5)
tasks = []
@@ -93,6 +98,7 @@ async def process_batch(
async def manage_panel_inbounds(tag: str, protocol: str, action: AdminActions) -> bool:
+ """Manage inbounds for users, processing them in batches and handling updates."""
try:
offset = 0
batch_size = 50
@@ -112,6 +118,8 @@ async def manage_panel_inbounds(tag: str, protocol: str, action: AdminActions) -
return True
- except Exception as e:
- logger.error(f"Error in manage panel inbounds: {e}")
- return False
+ except asyncio.CancelledError:
+ logger.warning("Operation was cancelled.")
+ except (httpx.RequestError, httpx.HTTPStatusError) as e:
+ logger.error("HTTP error in manage panel inbounds: %s", e)
+ return False
diff --git a/utils/keys.py b/utils/keys.py
index 3b06013..2d42615 100644
--- a/utils/keys.py
+++ b/utils/keys.py
@@ -1,8 +1,14 @@
-from aiogram.types import InlineKeyboardMarkup, CopyTextButton
-from aiogram.utils.keyboard import InlineKeyboardBuilder, InlineKeyboardButton
+"""
+Module for managing key configurations and callback handling for the bot.
-from marzban import ProxyInbound, Admin, UserResponse
+This module defines functions to build and handle callback buttons, as well as
+handling key actions related to users, nodes, and admins in the bot system.
+"""
+
+from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CopyTextButton
+from aiogram.utils.keyboard import InlineKeyboardBuilder
+from marzban import ProxyInbound, Admin, UserResponse
from utils.lang import KeyboardTexts
from models import (
PagesActions,
@@ -17,96 +23,116 @@
class BotKeyboards:
+ """
+ A class containing static methods to generate various inline keyboards used by the bot.
+ These keyboards are used for actions like creating users, monitoring nodes, managing users, etc.
+ """
@staticmethod
def home() -> InlineKeyboardMarkup:
+ """
+ Generates the home screen keyboard with buttons for User creation,
+ Node Monitoring, and Users Menu.
+ """
kb = InlineKeyboardBuilder()
kb.button(
- text=KeyboardTexts.UserCreate,
- callback_data=PagesCallbacks(page=PagesActions.UserCreate).pack(),
+ text=KeyboardTexts.USER_CREATE,
+ callback_data=PagesCallbacks(page=PagesActions.USER_CREATE).pack(),
)
kb.button(
- text=KeyboardTexts.NodeMonitoring,
- callback_data=PagesCallbacks(page=PagesActions.NodeMonitoring).pack(),
+ text=KeyboardTexts.NODE_MONITORING,
+ callback_data=PagesCallbacks(page=PagesActions.NODE_MONITORING).pack(),
)
kb.button(
- text=KeyboardTexts.UsersMenu,
- callback_data=PagesCallbacks(page=PagesActions.UsersMenu).pack(),
+ text=KeyboardTexts.USERS_MENU,
+ callback_data=PagesCallbacks(page=PagesActions.USERS_MENU).pack(),
)
return kb.adjust(2).as_markup()
@staticmethod
def cancel() -> InlineKeyboardMarkup:
- return (
- InlineKeyboardBuilder()
- .row(
- InlineKeyboardButton(
- text=KeyboardTexts.Home,
- callback_data=PagesCallbacks(page=PagesActions.Home).pack(),
- )
+ """
+ Generates a cancel button to return to the home screen.
+ """
+ return InlineKeyboardBuilder().row(
+ InlineKeyboardButton(
+ text=KeyboardTexts.HOME,
+ callback_data=PagesCallbacks(page=PagesActions.HOME).pack(),
)
- .as_markup()
- )
+ ).as_markup()
@staticmethod
def user_status(action: AdminActions) -> InlineKeyboardMarkup:
+ """
+ Generates a keyboard for changing user status to either 'active' or 'on hold'.
+ """
kb = InlineKeyboardBuilder()
kb.row(
InlineKeyboardButton(
- text=KeyboardTexts.Active,
+ text=KeyboardTexts.ACTIVE,
callback_data=UserStatusCallbacks(
status="active", action=action
).pack(),
),
InlineKeyboardButton(
- text=KeyboardTexts.OnHold,
+ text=KeyboardTexts.ON_HOLD,
callback_data=UserStatusCallbacks(
status="on_hold", action=action
).pack(),
),
)
kb.button(
- text=KeyboardTexts.Home,
- callback_data=PagesCallbacks(page=PagesActions.Home).pack(),
+ text=KeyboardTexts.HOME,
+ callback_data=PagesCallbacks(page=PagesActions.HOME).pack(),
)
return kb.adjust(2).as_markup()
@staticmethod
def inbounds(
inbounds: dict[str, list[ProxyInbound]],
- selected: set[str] = [],
- action: AdminActions = AdminActions.Add,
+ selected: set[str] = None,
+ action: AdminActions = AdminActions.ADD,
just_one_inbound: bool = False,
- ):
+ ) -> InlineKeyboardMarkup:
+ """
+ Generates a keyboard with available inbounds, allowing the user to select or deselect them.
+ """
+ if selected is None:
+ selected = set()
+
kb = InlineKeyboardBuilder()
for protocol_list in inbounds.values():
for inbound in protocol_list:
is_selected = inbound["tag"] in selected
kb.button(
- text=f"{('β
' if is_selected else 'β') if not just_one_inbound else 'π'} {inbound['tag']} ({inbound['protocol']})",
+ text=f"{('β
' if is_selected else 'β') if not just_one_inbound else 'π'} "
+ f"{inbound['tag']} ({inbound['protocol']})",
callback_data=UserInboundsCallbacks(
- tag=inbound["tag"],
- protocol=inbound["protocol"],
- is_selected=is_selected,
- action=action,
- just_one_inbound=just_one_inbound,
- is_done=just_one_inbound,
+ tag=inbound["tag"],
+ protocol=inbound["protocol"],
+ is_selected=is_selected,
+ action=action,
+ just_one_inbound=just_one_inbound,
+ is_done=just_one_inbound,
),
)
kb.row(
InlineKeyboardButton(
- text=KeyboardTexts.Finish,
+ text=KeyboardTexts.FINISH,
callback_data=UserInboundsCallbacks(action=action, is_done=True).pack(),
),
InlineKeyboardButton(
- text=KeyboardTexts.Home,
- callback_data=PagesCallbacks(page=PagesActions.Home).pack(),
+ text=KeyboardTexts.HOME,
+ callback_data=PagesCallbacks(page=PagesActions.HOME).pack(),
),
)
return kb.adjust(2).as_markup()
@staticmethod
def admins(admins: list[Admin]) -> InlineKeyboardMarkup:
+ """
+ Generates a keyboard with buttons for each admin in the list.
+ """
kb = InlineKeyboardBuilder()
for admin in admins:
@@ -117,68 +143,77 @@ def admins(admins: list[Admin]) -> InlineKeyboardMarkup:
kb.row(
InlineKeyboardButton(
- text=KeyboardTexts.Home,
- callback_data=PagesCallbacks(page=PagesActions.Home).pack(),
+ text=KeyboardTexts.HOME,
+ callback_data=PagesCallbacks(page=PagesActions.HOME).pack(),
),
)
return kb.adjust(2).as_markup()
@staticmethod
def node_monitoring() -> InlineKeyboardMarkup:
+ """
+ Generates a keyboard for node monitoring actions, such as checking or restarting nodes.
+ """
kb = InlineKeyboardBuilder()
kb.button(
- text=KeyboardTexts.NodeMonitoringChecker,
+ text=KeyboardTexts.NODE_MONITORING_CHECKER,
callback_data=ConfirmCallbacks(
- page=BotActions.NodeChecker, action=AdminActions.Edit, is_confirm=True
+ page=BotActions.NODE_CHECKER, action=AdminActions.EDIT, is_confirm=True
),
)
kb.button(
- text=KeyboardTexts.NodeMonitoringAutoRestart,
+ text=KeyboardTexts.NODE_MONITORING_AUTO_RESTART,
callback_data=ConfirmCallbacks(
- page=BotActions.NodeAutoRestart,
- action=AdminActions.Edit,
+ page=BotActions.NODE_AUTO_RESTART,
+ action=AdminActions.EDIT,
is_confirm=True,
),
)
kb.row(
InlineKeyboardButton(
- text=KeyboardTexts.Home,
- callback_data=PagesCallbacks(page=PagesActions.Home).pack(),
+ text=KeyboardTexts.HOME,
+ callback_data=PagesCallbacks(page=PagesActions.HOME).pack(),
),
)
return kb.adjust(2).as_markup()
@staticmethod
def users() -> InlineKeyboardMarkup:
+ """
+ Generates a keyboard with options for managing user inbounds, such as adding or deleting.
+ """
kb = InlineKeyboardBuilder()
kb.button(
- text=KeyboardTexts.UsersAddInbound,
+ text=KeyboardTexts.USERS_ADD_INBOUND,
callback_data=ConfirmCallbacks(
- page=BotActions.UsersInbound, action=AdminActions.Add
+ page=BotActions.USERS_INBOUND, action=AdminActions.ADD
),
)
kb.button(
- text=KeyboardTexts.UsersDeleteInbound,
+ text=KeyboardTexts.USERS_DELETE_INBOUND,
callback_data=ConfirmCallbacks(
- page=BotActions.UsersInbound, action=AdminActions.Delete
+ page=BotActions.USERS_INBOUND, action=AdminActions.DELETE
),
)
kb.row(
InlineKeyboardButton(
- text=KeyboardTexts.Home,
- callback_data=PagesCallbacks(page=PagesActions.Home).pack(),
+ text=KeyboardTexts.HOME,
+ callback_data=PagesCallbacks(page=PagesActions.HOME).pack(),
),
)
return kb.adjust(2).as_markup()
@staticmethod
def user(user: UserResponse) -> InlineKeyboardMarkup:
+ """
+ Generates a keyboard with a button to copy the user subscription link.
+ """
kb = InlineKeyboardBuilder()
kb.button(
- text=KeyboardTexts.UserCreateLinkCopy,
- copy_text=CopyTextButton(text=user.subscription_url)
+ text=KeyboardTexts.USER_CREATE_LINK_COPY,
+ copy_text=CopyTextButton(text=user.subscription_url),
)
- return kb.as_markup()
\ No newline at end of file
+ return kb.as_markup()
diff --git a/utils/lang.py b/utils/lang.py
index fbf5d6f..a1121ce 100644
--- a/utils/lang.py
+++ b/utils/lang.py
@@ -1,55 +1,63 @@
+"""
+This module contains constants and texts used in the HolderBot.
+"""
+
from enum import Enum
+# Module constants
VERSION = "0.2.3"
OWNER = "@ErfJabs"
class KeyboardTexts(str, Enum):
- Home = "π Back to home"
- UserCreate = "π€ User Create"
- NodeMonitoring = "π Node Monitoring"
- Active = "β
Active"
- OnHold = "βΈοΈ On hold"
- Finish = "βοΈ Finish"
- NodeMonitoringChecker = "𧨠Checker"
- NodeMonitoringAutoRestart = "π AutoRestart"
- UsersMenu = "π₯ Users"
- UsersAddInbound = "β Add inbound"
- UsersDeleteInbound = "β Delete inbound"
- UserCreateLinkCopy = "To copy the link, please click."
+ """Keyboard texts used in the bot."""
+ HOME = "π Back to home"
+ USER_CREATE = "π€ User Create"
+ NODE_MONITORING = "π Node Monitoring"
+ ACTIVE = "β
Active"
+ ON_HOLD = "βΈοΈ On hold"
+ FINISH = "βοΈ Finish"
+ NODE_MONITORING_CHECKER = "𧨠Checker"
+ NODE_MONITORING_AUTO_RESTART = "π AutoRestart"
+ USERS_MENU = "π₯ Users"
+ USERS_ADD_INBOUND = "β Add inbound"
+ USERS_DELETE_INBOUND = "β Delete inbound"
+ USER_CREATE_LINK_COPY = "To copy the link, please click."
+
class MessageTexts(str, Enum):
- Start = f"Welcome to HolderBot π€ [{VERSION}]\nDeveloped and designed by {OWNER}"
- Version = f"β‘οΈ Current Version: {VERSION}
"
- AskCreateUserBaseUsername = "π€ Please enter the user base name"
- AskCreateUserStartNumber = "π’ Please enter the starting user number"
- AskCreateUserHowMuch = "π₯ How many users would you like to create?"
- AskCreateUserDataLimit = "π Please enter the data limit in GB"
- AskCreateUserDateLimit = "π
Please enter the date limit in days"
- AskCreateUserStatus = "π Select the user status"
- AskCreateAdminUsername = "π€ Select the owner admin"
- AskCreateUserInbouds = "π Select the user inbounds"
- JustNumber = "π’ Please enter numbers only"
- NoneUserInbounds = "β οΈ Please select an inbound first"
- UserInfo = (
+ """Message texts used in the bot."""
+ START = f"Welcome to HolderBot π€ [{VERSION}]\nDeveloped and designed by {OWNER}"
+ VERSION = f"β‘οΈ Current Version: {VERSION}
"
+ ASK_CREATE_USER_BASE_USERNAME = "π€ Please enter the user base name"
+ ASK_CREATE_USER_START_NUMBER = "π’ Please enter the starting user number"
+ ASK_CREATE_USER_HOW_MUCH = "π₯ How many users would you like to create?"
+ ASK_CREATE_USER_DATA_LIMIT = "π Please enter the data limit in GB"
+ ASK_CREATE_USER_DATE_LIMIT = "π
Please enter the date limit in days"
+ ASK_CREATE_USER_STATUS = "π Select the user status"
+ ASK_CREATE_ADMIN_USERNAME = "π€ Select the owner admin"
+ ASK_CREATE_USER_INBOUNDS = "π Select the user inbounds"
+ JUST_NUMBER = "π’ Please enter numbers only"
+ NONE_USER_INBOUNDS = "β οΈ Please select an inbound first"
+ USER_INFO = (
"{status_emoji} Username: {username}
\n"
"π Data limit: {data_limit}
GB\n"
"π
Date limit: {date_limit}
days\n"
"π Subscription: {subscription}"
)
- NodeError = (
+ NODE_ERROR = (
"π Node: {name}
\n"
"π IP: {ip}
\n"
"πͺ Message: {message}
"
)
- NodeAutoRestartDone = "β
{name}
auto restart is Done!"
- NodeAutoRestartError = "β {name}
auto restart is Wrong!"
- NodeMonitoringMenu = (
+ NODE_AUTO_RESTART_DONE = "β
{name}
auto restart is Done!"
+ NODE_AUTO_RESTART_ERROR = "β {name}
auto restart is Wrong!"
+ NODE_MONITORING_MENU = (
"𧨠Checker is {checker}
\n"
"π AutoRestart is {auto_restart}
"
)
- UsersMenu = "π₯ What do you need?"
- UsersInboundSelect = "π Select Your Inbound:"
- Working = "β³"
- UsersInboundSuccessUpdated = "β
Users Inbounds is Updated!"
- UsersInboundErrorUpdated = "β Users Inbounds not Updated!"
+ USERS_MENU = "π₯ What do you need?"
+ USERS_INBOUND_SELECT = "π Select Your Inbound:"
+ WORKING = "β³"
+ USERS_INBOUND_SUCCESS_UPDATED = "β
Users Inbounds is Updated!"
+ USERS_INBOUND_ERROR_UPDATED = "β Users Inbounds not Updated!"
diff --git a/utils/log.py b/utils/log.py
index bed7531..62b1c89 100644
--- a/utils/log.py
+++ b/utils/log.py
@@ -1,9 +1,19 @@
+"""
+Logging setup module for the HolderBot.
+
+This module provides a function to set up a logger for the bot,
+allowing logging to both the console and a file.
+"""
+
import logging
def setup_logger(bot_name, level=logging.INFO):
- logger = logging.getLogger(bot_name)
- logger.setLevel(level)
+ """
+ Set up a logger for the specified bot.
+ """
+ bot_logger = logging.getLogger(bot_name)
+ bot_logger.setLevel(level)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
@@ -17,10 +27,11 @@ def setup_logger(bot_name, level=logging.INFO):
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
- logger.addHandler(console_handler)
- logger.addHandler(file_handler)
+ bot_logger.addHandler(console_handler)
+ bot_logger.addHandler(file_handler)
- return logger
+ return bot_logger
+# Initialize the logger for HolderBot
logger = setup_logger("HolderBot")
diff --git a/utils/panel.py b/utils/panel.py
index a348b39..2f34bd0 100644
--- a/utils/panel.py
+++ b/utils/panel.py
@@ -1,3 +1,10 @@
+"""
+This module provides functions to interact with the Marzban API,
+including user management, retrieving inbounds, and managing admins.
+"""
+
+from datetime import datetime, timedelta
+import httpx
from marzban import (
MarzbanAPI,
ProxyInbound,
@@ -6,24 +13,24 @@
Admin,
UserModify,
)
-from datetime import datetime, timedelta
-from utils.config import MARZBAN_ADDRESS
from db import TokenManager
+from utils.config import MARZBAN_ADDRESS
from utils.log import logger
marzban_panel = MarzbanAPI(MARZBAN_ADDRESS, timeout=30.0, verify=False)
-
-async def inbounds() -> dict[str, list[ProxyInbound]]:
+async def get_inbounds() -> dict[str, list[ProxyInbound]]:
+ """
+ Retrieve a list of inbounds from the Marzban panel.
+ """
try:
get_token = await TokenManager.get()
- inbounds = await marzban_panel.get_inbounds(get_token.token)
- return inbounds or False
- except Exception as e:
- logger.error(f"Error getting panel inbounds: {e}")
+ return await marzban_panel.get_inbounds(get_token.token) or False
+ except (httpx.RequestError, httpx.HTTPStatusError) as e:
+ logger.error("Error getting panel inbounds: %s", e)
return False
-
+# pylint: disable=R0913, R0917
async def create_user(
username: str,
status: str,
@@ -32,6 +39,9 @@ async def create_user(
data_limit: int,
date_limit: int,
) -> UserResponse:
+ """
+ Create a new user in the Marzban panel.
+ """
try:
get_token = await TokenManager.get()
@@ -48,61 +58,63 @@ async def create_user(
new_user.expire = int(
(datetime.utcnow() + timedelta(days=date_limit)).timestamp()
)
-
elif status == "on_hold":
new_user.on_hold_expire_duration = int(date_limit) * 86400
new_user.on_hold_timeout = (
datetime.utcnow() + timedelta(days=365)
).strftime("%Y-%m-%d %H:%M:%S")
- user = await marzban_panel.add_user(new_user, get_token.token)
- return user or None
- except Exception as e:
- logger.error(f"Error create user: {e}")
+ return await marzban_panel.add_user(new_user, get_token.token) or None
+ except (httpx.RequestError, httpx.HTTPStatusError) as e:
+ logger.error("Error creating user: %s", e)
return False
-
-
async def admins() -> list[Admin]:
+ """
+ Retrieve a list of admins from the Marzban panel.
+ """
try:
get_token = await TokenManager.get()
- admins = await marzban_panel.get_admins(get_token.token)
- return admins or False
- except Exception as e:
- logger.error(f"Error getting admins list: {e}")
+ return await marzban_panel.get_admins(get_token.token) or False
+ except (httpx.RequestError, httpx.HTTPStatusError) as e:
+ logger.error("Error getting admins list: %s", e)
return False
-
async def set_owner(admin: str, user: str) -> bool:
+ """
+ Set an admin as the owner of a user.
+ """
try:
get_token = await TokenManager.get()
- user = await marzban_panel.set_owner(
+ return await marzban_panel.set_owner(
username=user, admin_username=admin, token=get_token.token
- )
- return user or False
- except Exception as e:
- logger.error(f"Error set owner: {e}")
+ ) is not None
+ except (httpx.RequestError, httpx.HTTPStatusError) as e:
+ logger.error("Error setting owner: %s", e)
return False
-
async def user_modify(username: str, data: UserModify) -> bool:
+ """
+ Modify an existing user's details.
+ """
try:
get_token = await TokenManager.get()
- user = await marzban_panel.modify_user(
+ return await marzban_panel.modify_user(
username=username, user=data, token=get_token.token
- )
- return True if user else False
- except Exception as e:
- logger.error(f"Error user modify: {e}")
+ ) is not None
+ except (httpx.RequestError, httpx.HTTPStatusError) as e:
+ logger.error("Error modifying user: %s", e)
return False
-
async def get_users(offset: int = 0, limit: int = 50) -> list[UserResponse]:
+ """
+ Retrieve a list of users from the Marzban panel.
+ """
try:
get_token = await TokenManager.get()
- users = await marzban_panel.get_users(
+ users_response = await marzban_panel.get_users(
token=get_token.token, offset=offset, limit=limit
)
- return users.users if users else False
- except Exception as e:
- logger.error(f"Error getting all users: {e}")
+ return users_response.users if users_response else False
+ except (httpx.RequestError, httpx.HTTPStatusError) as e:
+ logger.error("Error getting all users: %s", e)
return False
diff --git a/utils/report.py b/utils/report.py
index 77fef3e..2f3f480 100644
--- a/utils/report.py
+++ b/utils/report.py
@@ -1,36 +1,50 @@
+"""
+This module is responsible for sending messages to admins
+"""
+
from aiogram import Bot
from aiogram.client.default import DefaultBotProperties
from aiogram.enums.parse_mode import ParseMode
+from aiogram.exceptions import AiogramError, TelegramAPIError
from marzban import NodeResponse
from utils.lang import MessageTexts
from utils.config import TELEGRAM_BOT_TOKEN, TELEGRAM_ADMINS_ID
+from utils.log import logger
bot = Bot(
token=TELEGRAM_BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML)
)
-
async def send_message(message: str):
+ """
+ Sends a message to all admins.
+ """
try:
for admin_chatid in TELEGRAM_ADMINS_ID:
await bot.send_message(chat_id=admin_chatid, text=message)
- except:
- pass
+ except (AiogramError, TelegramAPIError) as e:
+ logger.error("Failed send report message: %s", str(e))
async def node_error(node: NodeResponse):
- text = (MessageTexts.NodeError).format(
+ """
+ Sends a notification to admins about a node error.
+ """
+ text = (MessageTexts.NODE_ERROR).format(
name=node.name, ip=node.address, message=node.message or "None"
)
await send_message(text)
async def node_restart(node: NodeResponse, success: bool):
+ """
+ Sends a notification to admins about the result of a node restart.
+ """
text = (
- (MessageTexts.NodeAutoRestartDone).format(name=node.name)
+ (MessageTexts.NODE_AUTO_RESTART_DONE).format(name=node.name)
if success is True
- else (MessageTexts.NodeAutoRestartError).format(name=node.name)
+ else (MessageTexts.NODE_AUTO_RESTART_ERROR).format(name=node.name)
)
await send_message(text)
diff --git a/utils/statedb.py b/utils/statedb.py
index b63aeb3..b0bfe66 100644
--- a/utils/statedb.py
+++ b/utils/statedb.py
@@ -1,3 +1,6 @@
+# pylint: disable=all
+# because this is a plugin
+
from typing import Any, Dict, Optional, List, Union
from sqlalchemy import Column, Integer, String, JSON, and_
from sqlalchemy.ext.declarative import declarative_base
diff --git a/utils/text_info.py b/utils/text_info.py
index fe5d79a..e771389 100644
--- a/utils/text_info.py
+++ b/utils/text_info.py
@@ -1,10 +1,17 @@
+"""
+Module docstring: This module contains functions for formatting user information
+for display, including user status, data limit, subscription, etc.
+"""
+from datetime import datetime
from marzban import UserResponse
from utils.lang import MessageTexts
-from datetime import datetime
def user_info(user: UserResponse) -> str:
- return (MessageTexts.UserInfo).format(
+ """
+ Formats the user information for display.
+ """
+ return (MessageTexts.USER_INFO).format(
status_emoji="π£" if user.status == "on_hold" else "π’",
username=user.username,
data_limit=round((user.data_limit / (1024**3)), 3),