diff --git a/.vscode/launch.json b/.vscode/launch.json
index 8a94e49..ad89780 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -4,6 +4,21 @@
     // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
     "version": "0.2.0",
     "configurations": [
+        {
+            "name": "Python: Remote Attach",
+            "type": "python",
+            "request": "attach",
+            "connect": {
+                "host": "localhost",
+                "port": 5678
+            },
+            "pathMappings": [
+                {
+                    "localRoot": "${workspaceFolder}",
+                    "remoteRoot": "/project"
+                }
+            ],
+        },
         {
             "name": "Python: Current File",
             "type": "python",
@@ -11,6 +26,32 @@
             "program": "${file}",
             "console": "integratedTerminal",
             "justMyCode": true
+        },
+        {
+            "name": "Compose Up Dev",
+            "type": "python",
+            "request": "launch",
+            "program": "${workspaceFolder}/src/main.py", // replace with your script
+            "console": "integratedTerminal",
+            "justMyCode": true,
+            "preLaunchTask": "compose-up-dev" // name of the task to run before launching
+        },
+        {
+            "name": "Run Dev",
+            "type": "python",
+            "request": "launch",
+            "program": "${workspaceFolder}/src/main.py",
+            "console": "integratedTerminal",
+            "justMyCode": true,
+            "args": ["--root_path", "/", "--api_port", "5000"],
+            "env": {
+                "KAFKA_HOST": "localhost:9094",
+                "DATABASE_URL": "mysql+aiomysql://root:root_bot_buster@localhost:3306/playerdata",
+                "POOL_TIMEOUT": "30",
+                "POOL_RECYCLE": "30"
+            },
+            "cwd": "${workspaceFolder}/"
         }
     ]
 }
+
diff --git a/.vscode/settings.json b/.vscode/settings.json
index a08d27f..9fde30e 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -5,7 +5,7 @@
     "python.testing.unittestEnabled": false,
     "python.testing.pytestEnabled": true,
     "[python]": {
-        "editor.defaultFormatter": "ms-python.black-formatter",
+        "editor.defaultFormatter": "charliermarsh.ruff",
         "editor.formatOnSave": true,
         "editor.codeActionsOnSave": {
             "source.organizeImports": "explicit"
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
new file mode 100644
index 0000000..ee19907
--- /dev/null
+++ b/.vscode/tasks.json
@@ -0,0 +1,17 @@
+{
+    "version": "2.0.0",
+    "tasks": [
+        {
+            "label": "compose-up",
+            "type": "shell",
+            "command": "docker-compose down --volumes && docker-compose up --build -d",
+            "isBackground": true,
+        },
+        {
+            "label": "compose-up-dev",
+            "type": "shell",
+            "command": "docker-compose -f docker-compose-dev.yml down --volumes && docker-compose -f docker-compose-dev.yml up --build -d",
+            "isBackground": true,
+        }
+    ]
+}
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 6c3f442..8ccac08 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -13,6 +13,10 @@
 WORKDIR /project
 COPY ./requirements.txt /project
 RUN pip install --no-cache-dir -r requirements.txt
 
+# debugpy is a Python debugger that can be used in a container
+ARG INSTALL_PTVSD=false
+RUN if [ "$INSTALL_PTVSD" = "true" ] ; then pip install debugpy ; fi
+
 # copy the scripts to the folder
 COPY ./src /project/src
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..493baaa
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,32 @@
+.PHONY: build up down clean cleanbuild docker-restart updev create activate requirements
+
+build:
+	docker-compose build
+
+up:
+	docker-compose up -d
+
+down:
+	docker-compose down
+
+docker-restart:
+	docker compose down
+	docker compose up --build -d
+
+clean:
+	docker-compose down --volumes
+
+cleanbuild: clean
+	docker-compose up --build
+
+updev: clean
+	docker-compose -f docker-compose-dev.yml up -d
+
+create:
+	python3 -m venv .venv
+
+activate:
+	source .venv/bin/activate
+
+requirements:
+	pip install -r requirements.txt
\ No newline at end of file
diff --git
a/docker-compose-dev.yml b/docker-compose-dev.yml new file mode 100644 index 0000000..a39f159 --- /dev/null +++ b/docker-compose-dev.yml @@ -0,0 +1,116 @@ +version: '3' +services: + kafka: + container_name: kafka + image: bitnami/kafka:3.5.1-debian-11-r3 + environment: + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094 + - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false + # volumes: + # - ./kafka:/bitnami/kafka:rw + expose: + - 9094 + - 9092 + ports: + # - 9092:9092 + - 9094:9094 + healthcheck: + test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "localhost:9092"] + interval: 30s + timeout: 10s + retries: 5 + networks: + - botdetector-network + + # kafdrop: + # container_name: kafdrop + # image: obsidiandynamics/kafdrop:latest + # environment: + # - KAFKA_BROKERCONNECT=kafka:9092 + # - JVM_OPTS=-Xms32M -Xmx64M + # - SERVER_SERVLET_CONTEXTPATH=/ + # ports: + # - 9999:9000 + # restart: on-failure + # networks: + # - botdetector-network + # depends_on: + # kafka: + # condition: service_healthy + + kafka_setup: + container_name: kafka_setup + image: bot-detector/kafka_setup + build: + context: ./kafka_setup + command: ["python", "setup_kafka.py"] + environment: + - KAFKA_BROKER=kafka:9092 + networks: + - botdetector-network + # ports: + # - 9092:9092 + depends_on: + kafka: + condition: service_healthy + + mysql: + container_name: database + build: + context: ./mysql + image: bot-detector/mysql:latest + environment: + - MYSQL_ROOT_PASSWORD=root_bot_buster + volumes: + - ./mysql/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d + # - ./mysql/mount:/var/lib/mysql # creates persistence + ports: + - 3306:3306 + expose: + - 3306 + networks: + - botdetector-network + healthcheck: + test: ["CMD-SHELL", "mysqladmin ping -h localhost -u root -proot_bot_buster"] + interval: 10s + retries: 3 + start_period: 30s + timeout: 5s + + # worker: + # container_name: worker + # image: bot-detector/highscore_worker + # build: + # context: . 
+ # dockerfile: Dockerfile + # target: base + # args: + # root_path: / + # api_port: 5000 + # # INSTALL_PTVSD: true + # # command: bash -c "apt update && apt install -y curl && sleep infinity" + # command: python src/main.py + # # ports: + # # - 5678:5678 + # environment: + # - KAFKA_HOST=kafka:9092 + # - DATABASE_URL=mysql+aiomysql://root:root_bot_buster@mysql:3306/playerdata + # - POOL_TIMEOUT=30 + # - POOL_RECYCLE=30 + # # - ENABLE_DEBUGPY=true + # # - PYDEVD_DISABLE_FILE_VALIDATION=1 + # networks: + # - botdetector-network + # volumes: + # - ./src:/project/src + # depends_on: + # kafka: + # condition: service_healthy + # mysql: + # condition: service_healthy + +networks: + botdetector-network: diff --git a/docker-compose.yml b/docker-compose.yml index b242d2d..635c77c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,9 @@ services: expose: - 9094 - 9092 + ports: + - 9092:9092 + - 9094:9094 healthcheck: test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "localhost:9092"] interval: 30s @@ -22,21 +25,21 @@ services: networks: - botdetector-network - kafdrop: - container_name: kafdrop - image: obsidiandynamics/kafdrop:latest - environment: - - KAFKA_BROKERCONNECT=kafka:9092 - - JVM_OPTS=-Xms32M -Xmx64M - - SERVER_SERVLET_CONTEXTPATH=/ - ports: - - 9999:9000 - restart: on-failure - networks: - - botdetector-network - depends_on: - kafka: - condition: service_healthy + # kafdrop: + # container_name: kafdrop + # image: obsidiandynamics/kafdrop:latest + # environment: + # - KAFKA_BROKERCONNECT=kafka:9092 + # - JVM_OPTS=-Xms32M -Xmx64M + # - SERVER_SERVLET_CONTEXTPATH=/ + # ports: + # - 9999:9000 + # restart: on-failure + # networks: + # - botdetector-network + # depends_on: + # kafka: + # condition: service_healthy kafka_setup: container_name: kafka_setup @@ -62,6 +65,8 @@ services: volumes: - ./mysql/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d # - ./mysql/mount:/var/lib/mysql # creates persistence + ports: + - 3307:3306 # ports: # - 3306:3306 expose: @@ -85,13 +90,18 @@ services: args: root_path: / api_port: 5000 + INSTALL_PTVSD: true # command: bash -c "apt update && apt install -y curl && sleep infinity" command: python src/main.py + # ports: + # - 5678:5678 environment: - KAFKA_HOST=kafka:9092 - DATABASE_URL=mysql+aiomysql://root:root_bot_buster@mysql:3306/playerdata - POOL_TIMEOUT=30 - POOL_RECYCLE=30 + # - ENABLE_DEBUGPY=true + # - PYDEVD_DISABLE_FILE_VALIDATION=1 networks: - botdetector-network volumes: diff --git a/mysql/docker-entrypoint-initdb.d/01_tables.sql b/mysql/docker-entrypoint-initdb.d/01_tables.sql index e671895..f7bd314 100644 --- a/mysql/docker-entrypoint-initdb.d/01_tables.sql +++ b/mysql/docker-entrypoint-initdb.d/01_tables.sql @@ -123,4 +123,42 @@ CREATE TABLE `playerHiscoreData` ( UNIQUE KEY `Unique_player_date` (`Player_id`,`ts_date`), CONSTRAINT `FK_Players_id` FOREIGN KEY (`Player_id`) REFERENCES `Players` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT ); -CREATE TRIGGER `hiscore_date_OnInsert` BEFORE INSERT ON `playerHiscoreData` FOR EACH ROW SET new.ts_date = DATE(new.timestamp); +CREATE TRIGGER `hiscore_date_OnInsert` BEFORE INSERT ON `playerHiscoreData` FOR EACH ROW SET new.ts_date = DATE(new.timestamp); + +CREATE TABLE scraper_data ( + scraper_id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + record_date DATE AS (DATE(created_at)) STORED, + player_id INT UNSIGNED NOT NULL, + UNIQUE KEY unique_player_per_day (player_id, record_date) +); + +CREATE TABLE skills ( + 
skill_id TINYINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, # < 255 + skill_name VARCHAR(50) NOT NULL, + UNIQUE KEY unique_skill_name (skill_name) +); +CREATE TABLE activities ( + activity_id TINYINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, # < 255 + activity_name VARCHAR(50) NOT NULL, + UNIQUE KEY unique_activity_name (activity_name) +); + + +CREATE TABLE player_skills ( + scraper_id BIGINT UNSIGNED NOT NULL, + skill_id TINYINT UNSIGNED NOT NULL, + skill_value INT UNSIGNED NOT NULL DEFAULT 0, # < 200 000 000 + FOREIGN KEY (scraper_id) REFERENCES scraper_data(scraper_id) ON DELETE CASCADE, + FOREIGN KEY (skill_id) REFERENCES skills(skill_id) ON DELETE CASCADE, + PRIMARY KEY (scraper_id, skill_id) +); + +CREATE TABLE player_activities ( + scraper_id BIGINT UNSIGNED NOT NULL, + activity_id TINYINT UNSIGNED NOT NULL, + activity_value INT UNSIGNED NOT NULL DEFAULT 0, # some guy could get over 65k kc + FOREIGN KEY (scraper_id) REFERENCES scraper_data(scraper_id) ON DELETE CASCADE, + FOREIGN KEY (activity_id) REFERENCES activities(activity_id) ON DELETE CASCADE, + PRIMARY KEY (scraper_id, activity_id) +); \ No newline at end of file diff --git a/mysql/docker-entrypoint-initdb.d/02_data.sql b/mysql/docker-entrypoint-initdb.d/02_data.sql index 475ba27..172f519 100644 --- a/mysql/docker-entrypoint-initdb.d/02_data.sql +++ b/mysql/docker-entrypoint-initdb.d/02_data.sql @@ -53,3 +53,105 @@ SET name = CONCAT('player', id), normalized_name = CONCAT('player', id) ; + +INSERT INTO skills (skill_name) VALUES + -- ('total'), + ('attack'), + ('defence'), + ('strength'), + ('hitpoints'), + ('ranged'), + ('prayer'), + ('magic'), + ('cooking'), + ('woodcutting'), + ('fletching'), + ('fishing'), + ('firemaking'), + ('crafting'), + ('smithing'), + ('mining'), + ('herblore'), + ('agility'), + ('thieving'), + ('slayer'), + ('farming'), + ('runecraft'), + ('hunter'), + ('construction') +; + + +INSERT INTO activities (activity_name) VALUES + ('abyssal_sire'), + ('alchemical_hydra'), + ('artio'), + ('barrows_chests'), + ('bounty_hunter_hunter'), + ('bounty_hunter_rogue'), + ('bryophyta'), + ('callisto'), + ('calvarion'), + ('cerberus'), + ('chambers_of_xeric'), + ('chambers_of_xeric_challenge_mode'), + ('chaos_elemental'), + ('chaos_fanatic'), + ('commander_zilyana'), + ('corporeal_beast'), + ('crazy_archaeologist'), + ('cs_all'), + ('cs_beginner'), + ('cs_easy'), + ('cs_elite'), + ('cs_hard'), + ('cs_master'), + ('cs_medium'), + ('dagannoth_prime'), + ('dagannoth_rex'), + ('dagannoth_supreme'), + ('deranged_archaeologist'), + ('duke_sucellus'), + ('general_graardor'), + ('giant_mole'), + ('grotesque_guardians'), + ('hespori'), + ('kalphite_queen'), + ('king_black_dragon'), + ('kraken'), + ('kreearra'), + ('kril_tsutsaroth'), + ('league'), + ('lms_rank'), + ('mimic'), + ('nex'), + ('nightmare'), + ('obor'), + ('phantom_muspah'), + ('phosanis_nightmare'), + ('rifts_closed'), + ('sarachnis'), + ('scorpia'), + ('skotizo'), + ('soul_wars_zeal'), + ('spindel'), + ('tempoross'), + ('the_corrupted_gauntlet'), + ('the_gauntlet'), + ('the_leviathan'), + ('the_whisperer'), + ('theatre_of_blood'), + ('theatre_of_blood_hard'), + ('thermonuclear_smoke_devil'), + ('tombs_of_amascut'), + ('tombs_of_amascut_expert'), + ('tzkal_zuk'), + ('tztok_jad'), + ('vardorvis'), + ('venenatis'), + ('vetion'), + ('vorkath'), + ('wintertodt'), + ('zalcano'), + ('zulrah') +; \ No newline at end of file diff --git a/src/app/repositories/__init__.py b/src/app/repositories/__init__.py new file mode 100644 index 0000000..e69de29 diff --git 
a/src/app/repositories/abc.py b/src/app/repositories/abc.py new file mode 100644 index 0000000..c615ab2 --- /dev/null +++ b/src/app/repositories/abc.py @@ -0,0 +1,52 @@ +from abc import ABC, abstractmethod + +from database.database import SessionFactory + + +class ABCRepo(ABC): + """ + Abstract base class for repositories. + """ + + async def _get_session(self): + return SessionFactory() + + @abstractmethod + async def create(self, data): + """ + Creates a new entity. + + Raises: + NotImplementedError: This method must be implemented in subclasses. + """ + raise NotImplementedError("Subclasses must implement the create method") + + @abstractmethod + async def request(self, id): + """ + Retrieves an entity by its ID. + + Raises: + NotImplementedError: This method must be implemented in subclasses. + """ + raise NotImplementedError("Subclasses must implement the request method") + + @abstractmethod + async def update(self, id, data): + """ + Updates an existing entity. + + Raises: + NotImplementedError: This method must be implemented in subclasses. + """ + raise NotImplementedError("Subclasses must implement the update method") + + @abstractmethod + async def delete(self, id): + """ + Deletes an entity by its ID. + + Raises: + NotImplementedError: This method must be implemented in subclasses. + """ + raise NotImplementedError("Subclasses must implement the delete method") diff --git a/src/app/repositories/activities.py b/src/app/repositories/activities.py new file mode 100644 index 0000000..78f4d0a --- /dev/null +++ b/src/app/repositories/activities.py @@ -0,0 +1,75 @@ +from app.repositories.abc import ABCRepo +from app.schemas.input.activities import Activities, PlayerActivities +from database.models.activities import Activities as ActivitiesDB +from database.models.activities import PlayerActivities as PlayerActivitiesDB +from sqlalchemy import insert, select +from sqlalchemy.ext.asyncio import AsyncSession + + +class ActivitiesRepo(ABCRepo): + """Repository for managing activity data.""" + + def __init__(self) -> None: + """Initializes the ActivitiesRepo instance.""" + super().__init__() + + async def create(self, data): + return await super().create(data) + + async def request(self, activity_id: int = None) -> list[Activities]: + """Retrieves activity data from the database. + + Args: + activity_id: Optional activity ID to filter by. + + Returns: + A list of ActivitiesDB objects representing the retrieved activities. + """ + table = ActivitiesDB + sql = select(table) + + if activity_id: + sql = sql.where(table.activity_id == activity_id) + + async with await self._get_session() as session: + session: AsyncSession # Type annotation for clarity + data = await session.execute(sql) + data = data.scalars() + return [Activities(**d.__dict__) for d in data] + + async def update(self, id, data): + return await super().update(id, data) + + async def delete(self, id): + return await super().delete(id) + + +class PlayerActivitiesRepo(ABCRepo): + """Repository for managing player activity data.""" + + def __init__(self) -> None: + """Initializes the PlayerActivitiesRepo instance.""" + super().__init__() + + async def create(self, data: list[PlayerActivities]) -> None: + """Creates new player activity entries in the database. + + Args: + data: A list of PlayerActivities objects containing player activity information. 
+ """ + data = [d.model_dump() for d in data] + table = PlayerActivitiesDB + sql = insert(table).values(data) + + async with await self._get_session() as session: + session: AsyncSession # Type annotation for clarity + await session.execute(sql) + + async def request(self, id): + return await super().request(id) + + async def update(self, id, data): + return await super().update(id, data) + + async def delete(self, id): + return await super().delete(id) diff --git a/src/app/repositories/highscore.py b/src/app/repositories/highscore.py new file mode 100644 index 0000000..84a8c05 --- /dev/null +++ b/src/app/repositories/highscore.py @@ -0,0 +1,69 @@ +from app.repositories.abc import ABCRepo +from app.schemas.input.highscore import PlayerHiscoreData +from app.schemas.input.player import Player +from database.models.highscores import PlayerHiscoreData as PlayerHiscoreDataDB +from database.models.player import Player as PlayerDB +from sqlalchemy import insert, select, update +from sqlalchemy.ext.asyncio import AsyncSession + + +class HighscoreRepo(ABCRepo): + """Repository for managing highscore data.""" + + def __init__(self) -> None: + """Initializes the HighscoreRepo instance.""" + super().__init__() + + async def create( + self, highscore_data: list[PlayerHiscoreData], player_data: list[Player] + ) -> None: + """Creates new highscore entries and updates player data in the database. + + Args: + highscore_data: A list of PlayerHiscoreData objects containing highscore information. + player_data: A list of Player objects containing player information to update. + """ + table_highscore = PlayerHiscoreDataDB + table_player = PlayerDB + highscore_data = [d.model_dump() for d in highscore_data] + + sql_insert = insert(table_highscore).values(highscore_data) + sql_insert = sql_insert.prefix_with("IGNORE") + + sql_update = update(table_player) + + async with await self._get_session() as session: + session: AsyncSession # Type annotation for clarity + await session.execute(sql_insert) # Insert highscore data + + for player in player_data: + sql_update = sql_update.where(table_player.id == player.id) + sql_update = sql_update.values(player.model_dump()) + await session.execute(sql_update) # Update player data + await session.commit() + + async def request(self, id: list[int] = None) -> list[PlayerHiscoreDataDB]: + """Retrieves highscore data from the database. + + Args: + id: Optional list of highscore IDs to filter by. + + Returns: + A list of PlayerHiscoreDataDB objects representing the retrieved highscores. 
+ """ + table = PlayerHiscoreDataDB + sql = select(table) + if id: + sql = sql.where(table.id.in_(id)) + + async with await self._get_session() as session: + session: AsyncSession # Type annotation for clarity + data = await session.execute(sql) + data = await data.all() + return data + + async def update(self, id, data): + return await super().update(id, data) + + async def delete(self, id): + return await super().delete(id) diff --git a/src/app/repositories/player.py b/src/app/repositories/player.py new file mode 100644 index 0000000..e69de29 diff --git a/src/app/repositories/scraper_data.py b/src/app/repositories/scraper_data.py new file mode 100644 index 0000000..e919faa --- /dev/null +++ b/src/app/repositories/scraper_data.py @@ -0,0 +1,86 @@ +import logging + +from app.repositories.abc import ABCRepo +from app.schemas.input.activities import PlayerActivities +from app.schemas.input.player import Player +from app.schemas.input.skills import PlayerSkills +from app.schemas.scraper_data import ScraperCreate, ScraperData +from database.models.activities import PlayerActivities as PlayerActivitiesDB +from database.models.player import Player as PlayerDB +from database.models.scraper_data import ScraperData as ScraperDataDB +from database.models.skills import PlayerSkills as PlayerSkillsDB +from sqlalchemy import and_, insert, select, update +from sqlalchemy.ext.asyncio import AsyncSession + +logger = logging.getLogger(__name__) + + +class ScraperDataRepo(ABCRepo): + """Repository for managing skill data.""" + + def __init__(self) -> None: + """Initializes the SkillsRepo instance.""" + super().__init__() + + async def request(self, id): + return await super().request(id) + + async def create( + self, + highscore_data: list[ + tuple[list[PlayerSkills], list[PlayerActivities], ScraperCreate] + ], + player_data: list[Player], + ) -> None: + table = ScraperDataDB + + async with await self._get_session() as session: + session: AsyncSession + skills = [] + activities = [] + for data in highscore_data: + # insert scraperdata + await session.execute( + insert(table).values(data[2].model_dump()).prefix_with("ignore") + ) + scraper_record = await session.execute( + select(table).where( + and_( + table.player_id == data[2].player_id, + table.record_date == data[2].created_at.date(), + ) + ) + ) + scraper_record = scraper_record.scalars() + scraper_record = [ScraperData(**s.__dict__) for s in scraper_record] + assert len(scraper_record) == 1 + scraper_record = scraper_record[0] + + for d in data[0]: + d.scraper_id = scraper_record.scraper_id + skills.append(d.model_dump()) + for d in data[1]: + d.scraper_id = scraper_record.scraper_id + activities.append(d.model_dump()) + + await session.execute( + insert(PlayerActivitiesDB).values(activities).prefix_with("ignore") + ) + await session.execute( + insert(PlayerSkillsDB).values(skills).prefix_with("ignore") + ) + + for player in player_data: + await session.execute( + update(PlayerDB) + .values(player.model_dump()) + .where(PlayerDB.id == player.id) + ) + await session.commit() + return None + + async def update(self, id, data): + return await super().update(id, data) + + async def delete(self, id): + return await super().delete(id) diff --git a/src/app/repositories/skills.py b/src/app/repositories/skills.py new file mode 100644 index 0000000..aa98ec2 --- /dev/null +++ b/src/app/repositories/skills.py @@ -0,0 +1,79 @@ +import logging + +from app.repositories.abc import ABCRepo +from app.schemas.input.skills import PlayerSkills, Skills +from 
database.models.skills import PlayerSkills as PlayerSkillsDB +from database.models.skills import Skills as SkillsDB +from sqlalchemy import insert, select +from sqlalchemy.ext.asyncio import AsyncSession + +logger = logging.getLogger(__name__) + + +class SkillsRepo(ABCRepo): + """Repository for managing skill data.""" + + def __init__(self) -> None: + """Initializes the SkillsRepo instance.""" + super().__init__() + + async def create(self, data): + return await super().create(data) + + async def request(self, skill_id: int = None) -> list[Skills]: + """Retrieves skill data from the database. + + Args: + skill_id: Optional skill ID to filter by. + + Returns: + A list of SkillsDB objects representing the retrieved skills. + """ + table = SkillsDB + sql = select(table) + + if skill_id: + sql = sql.where(table.skill_id == skill_id) + + async with await self._get_session() as session: + session: AsyncSession # Type annotation for clarity + data = await session.execute(sql) + data = data.scalars() + return [Skills(**d.__dict__) for d in data] + + async def update(self, id, data): + return await super().update(id, data) + + async def delete(self, id): + return await super().delete(id) + + +class PlayerSkillsRepo(ABCRepo): + """Repository for managing player skill data.""" + + def __init__(self) -> None: + """Initializes the PlayerSkillsRepo instance.""" + super().__init__() + + async def create(self, data: list[PlayerSkills]) -> None: + """Creates new player skill entries in the database. + + Args: + data: A list of PlayerSkills objects containing player skill information. + """ + data = [d.model_dump() for d in data] + table = PlayerSkillsDB + sql = insert(table).values(data) + + async with await self._get_session() as session: + session: AsyncSession # Type annotation for clarity + await session.execute(sql) + + async def request(self, id): + return await super().request(id) + + async def update(self, id, data): + return await super().update(id, data) + + async def delete(self, id): + return await super().delete(id) diff --git a/src/app/schemas/input/__init__.py b/src/app/schemas/input/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/app/schemas/input/activities.py b/src/app/schemas/input/activities.py new file mode 100644 index 0000000..21f20bc --- /dev/null +++ b/src/app/schemas/input/activities.py @@ -0,0 +1,16 @@ +from pydantic import BaseModel, ConfigDict +from typing import Optional + +class Activities(BaseModel): + model_config = ConfigDict(from_attributes=True) + + activity_id: int + activity_name: str + + +class PlayerActivities(BaseModel): + model_config = ConfigDict(from_attributes=True) + + scraper_id: Optional[int] = None + activity_id: int + activity_value: int diff --git a/src/app/schemas/highscores.py b/src/app/schemas/input/highscore.py similarity index 94% rename from src/app/schemas/highscores.py rename to src/app/schemas/input/highscore.py index 00fd30d..4d4f919 100644 --- a/src/app/schemas/highscores.py +++ b/src/app/schemas/input/highscore.py @@ -3,7 +3,7 @@ from pydantic import BaseModel, ConfigDict -class playerHiscoreData(BaseModel): +class PlayerHiscoreData(BaseModel): model_config = ConfigDict(from_attributes=True) # id: Optional[int] = None diff --git a/src/app/schemas/input/message.py b/src/app/schemas/input/message.py new file mode 100644 index 0000000..9b1e7ce --- /dev/null +++ b/src/app/schemas/input/message.py @@ -0,0 +1,8 @@ +from app.schemas.input.highscore import PlayerHiscoreData +from app.schemas.input.player import Player +from 
pydantic import BaseModel
+
+
+class Message(BaseModel):
+    hiscores: PlayerHiscoreData | None = None
+    player: Player | None = None
diff --git a/src/app/schemas/input/player.py b/src/app/schemas/input/player.py
new file mode 100644
index 0000000..8b18339
--- /dev/null
+++ b/src/app/schemas/input/player.py
@@ -0,0 +1,21 @@
+from typing import Optional
+
+from pydantic import BaseModel, ConfigDict
+
+
+class Player(BaseModel):
+    model_config = ConfigDict(from_attributes=True)
+
+    id: int
+    name: str
+    possible_ban: Optional[bool] = None
+    confirmed_ban: Optional[bool] = None
+    confirmed_player: Optional[bool] = None
+    label_id: Optional[int] = None
+    label_jagex: Optional[int] = None
+    ironman: Optional[bool] = None
+    hardcore_ironman: Optional[bool] = None
+    ultimate_ironman: Optional[bool] = None
+    normalized_name: Optional[str] = None
+    created_at: Optional[str] = None
+    updated_at: Optional[str] = None
diff --git a/src/app/schemas/input/skills.py b/src/app/schemas/input/skills.py
new file mode 100644
index 0000000..06f9309
--- /dev/null
+++ b/src/app/schemas/input/skills.py
@@ -0,0 +1,16 @@
+from pydantic import BaseModel, ConfigDict
+from typing import Optional
+
+class Skills(BaseModel):
+    model_config = ConfigDict(from_attributes=True)
+
+    skill_id: int
+    skill_name: str
+
+
+class PlayerSkills(BaseModel):
+    model_config = ConfigDict(from_attributes=True)
+
+    scraper_id: Optional[int] = None
+    skill_id: int
+    skill_value: int
diff --git a/src/app/schemas/output/__init__.py b/src/app/schemas/output/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/app/schemas/scraper_data.py b/src/app/schemas/scraper_data.py
new file mode 100644
index 0000000..77aecf2
--- /dev/null
+++ b/src/app/schemas/scraper_data.py
@@ -0,0 +1,17 @@
+from datetime import date, datetime
+
+from pydantic import BaseModel, ConfigDict
+
+
+class ScraperCreate(BaseModel):
+    player_id: int
+    created_at: datetime
+
+
+class ScraperData(ScraperCreate):
+    model_config = ConfigDict(from_attributes=True)
+
+    # ScraperCreate.player_id
+    # ScraperCreate.created_at
+    scraper_id: int
+    record_date: date
diff --git a/src/core/__init__.py b/src/core/__init__.py
index 4d49c29..5496911 100644
--- a/src/core/__init__.py
+++ b/src/core/__init__.py
@@ -1 +1 @@
-from . import logging
\ No newline at end of file
+from . import logging
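Reviewer note: a minimal sketch (not part of the diff) of how the worker parses a "scraper" message into these schemas, mirroring process_data in src/main.py; the payload keys and field values shown here are assumptions for illustration only.

```python
# Hypothetical example payload, not part of this PR.
from app.schemas.input.message import Message
from app.schemas.input.player import Player

raw = {
    "player": {"id": 1, "name": "player1", "normalized_name": "player1"},
    "hiscores": None,  # a player record without an accompanying hiscore row
}

msg = Message(**raw)  # what process_data() in src/main.py does per message
assert isinstance(msg.player, Player)
print(msg.player.id, msg.hiscores)
```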
diff --git a/src/database/database.py b/src/database/database.py
index e198895..68d5a0d 100644
--- a/src/database/database.py
+++ b/src/database/database.py
@@ -1,8 +1,17 @@
+from core.config import settings
 from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
 
-from core.config import settings
+
+class SessionContextManager:
+    async def __aenter__(self):
+        self.session = SessionFactory()
+        return self.session
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self.session.close()
+
 
 # Create an async SQLAlchemy engine
 engine = create_async_engine(
@@ -23,8 +32,8 @@
 # async def get_session() -> AsyncSession:
 #     async with SessionFactory() as session:
 #         yield session
-async def get_session() -> AsyncSession:
-    return SessionFactory()
+async def get_session():
+    yield SessionFactory()
 
 
 Base = declarative_base()
diff --git a/src/database/models/activities.py b/src/database/models/activities.py
new file mode 100644
index 0000000..1053c13
--- /dev/null
+++ b/src/database/models/activities.py
@@ -0,0 +1,48 @@
+from database.database import Base
+from sqlalchemy import (
+    Column,
+    ForeignKey,
+    Integer,
+    String,
+)
+from sqlalchemy.dialects.mysql import BIGINT, TINYINT
+
+
+# CREATE TABLE activities (
+#     activity_id TINYINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, # < 255
+#     activity_name VARCHAR(50) NOT NULL,
+#     UNIQUE KEY unique_activity_name (activity_name)
+# );
+class Activities(Base):
+    __tablename__ = "activities"
+
+    activity_id = Column(TINYINT, primary_key=True, autoincrement=True)
+    activity_name = Column(String(50), nullable=False)
+
+    # __table_args__ = (UniqueConstraint("activity_name", name="unique_activity_name"),)
+
+
+# CREATE TABLE player_activities (
+#     scraper_id BIGINT UNSIGNED NOT NULL,
+#     activity_id TINYINT UNSIGNED NOT NULL,
+#     activity_value INT UNSIGNED NOT NULL DEFAULT 0, # some guy could get over 65k kc
+#     FOREIGN KEY (scraper_id) REFERENCES scraper_data(scraper_id) ON DELETE CASCADE,
+#     FOREIGN KEY (activity_id) REFERENCES activities(activity_id) ON DELETE CASCADE,
+#     PRIMARY KEY (scraper_id, activity_id)
+# );
+
+
+class PlayerActivities(Base):
+    __tablename__ = "player_activities"
+
+    scraper_id = Column(
+        BIGINT,
+        ForeignKey("scraper_data.scraper_id", ondelete="CASCADE"),
+        primary_key=True,
+    )
+    activity_id = Column(
+        TINYINT,
+        ForeignKey("activities.activity_id", ondelete="CASCADE"),
+        primary_key=True,
+    )
+    activity_value = Column(Integer, nullable=False, default=0)
diff --git a/src/database/models/highscores.py b/src/database/models/highscores.py
index ab5ff9b..571e28a 100644
--- a/src/database/models/highscores.py
+++ b/src/database/models/highscores.py
@@ -1,16 +1,22 @@
-from sqlalchemy import BigInteger, Column, Date, DateTime, Integer, func
-
 from database.database import Base
+from sqlalchemy import (
+    Column,
+    Date,
+    DateTime,
+    Integer,
+    func,
+)
+from sqlalchemy.dialects.mysql import BIGINT
 
 
 class PlayerHiscoreData(Base):
     __tablename__ = "playerHiscoreData"
 
-    id = Column(BigInteger, primary_key=True, autoincrement=True)
+    id = Column(BIGINT, primary_key=True, autoincrement=True)
     timestamp = Column(DateTime, nullable=False, server_default=func.now())
     ts_date = Column(Date, nullable=True)
     Player_id = Column(Integer, nullable=False)
-    total = Column(BigInteger, default=0)
+    total = Column(BIGINT, default=0)
     attack = Column(Integer, default=0)
     defence = Column(Integer, default=0)
     strength = Column(Integer,
default=0) diff --git a/src/database/models/scraper_data.py b/src/database/models/scraper_data.py new file mode 100644 index 0000000..04f77df --- /dev/null +++ b/src/database/models/scraper_data.py @@ -0,0 +1,33 @@ + +from database.database import Base +from sqlalchemy import ( + Column, + Date, + DateTime, + func, +) +from sqlalchemy.dialects.mysql import BIGINT, SMALLINT + + +# CREATE TABLE scraper_data ( +# scraper_id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, +# created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, +# player_id SMALLINT UNSIGNED NOT NULL, +# record_date DATE AS (DATE(created_at)) STORED, +# UNIQUE KEY unique_player_per_day (player_id, record_date) +# ); +class ScraperData(Base): + __tablename__ = "scraper_data" + + scraper_id = Column(BIGINT, primary_key=True, autoincrement=True) + created_at = Column(DateTime, nullable=False, server_default=func.now()) + player_id = Column(SMALLINT, nullable=False) + record_date = Column(Date, nullable=True) + + # __table_args__ = ( + # UniqueConstraint("player_id", "record_date", name="unique_player_per_day"), + # ) + + # __table_args__ = ( + # UniqueConstraint("player_id", "record_date", name="unique_player_per_day"), + # ) diff --git a/src/database/models/skills.py b/src/database/models/skills.py new file mode 100644 index 0000000..b664999 --- /dev/null +++ b/src/database/models/skills.py @@ -0,0 +1,46 @@ +from database.database import Base +from sqlalchemy import ( + Column, + ForeignKey, + Integer, + String, +) +from sqlalchemy.dialects.mysql import BIGINT, TINYINT + + +# CREATE TABLE skills ( +# skill_id TINYINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, # < 255 +# skill_name VARCHAR(50) NOT NULL, +# UNIQUE KEY unique_skill_name (skill_name) +# ); +class Skills(Base): + __tablename__ = "skills" + + skill_id = Column(TINYINT, primary_key=True, autoincrement=True) + skill_name = Column(String(50), nullable=False) + + # __table_args__ = (UniqueConstraint("skill_name", name="unique_skill_name"),) + + +# CREATE TABLE player_skills ( +# scraper_id BIGINT UNSIGNED NOT NULL, +# skill_id TINYINT UNSIGNED NOT NULL, +# skill_value INT UNSIGNED NOT NULL DEFAULT 0, # < 200 000 000 +# FOREIGN KEY (scraper_id) REFERENCES scraper_data(scraper_id) ON DELETE CASCADE, +# FOREIGN KEY (skill_id) REFERENCES skills(skill_id) ON DELETE CASCADE, +# PRIMARY KEY (scraper_id, skill_id) +# ); +class PlayerSkills(Base): + __tablename__ = "player_skills" + + scraper_id = Column( + BIGINT, + ForeignKey("scraper_data.scraper_id", ondelete="CASCADE"), + primary_key=True, + ) + skill_id = Column( + TINYINT, + ForeignKey("skills.skill_id", ondelete="CASCADE"), + primary_key=True, + ) + skill_value = Column(Integer, nullable=False, default=0) diff --git a/src/kafka_setup/setup_kafka.py b/src/kafka_setup/setup_kafka.py new file mode 100644 index 0000000..1cca1c9 --- /dev/null +++ b/src/kafka_setup/setup_kafka.py @@ -0,0 +1,134 @@ +# setup_kafka.py +import json +import os +import zipfile +from queue import Queue + +from kafka import KafkaProducer +from kafka.admin import KafkaAdminClient, NewTopic + + +def create_topics(): + # Get the Kafka broker address from the environment variable + kafka_broker = os.environ.get("KAFKA_BROKER", "localhost:9094") + + # Create Kafka topics + admin_client = KafkaAdminClient(bootstrap_servers=kafka_broker) + + topics = admin_client.list_topics() + print("existing topics", topics) + + if not topics == []: + admin_client.delete_topics(topics) + + res = admin_client.create_topics( + [ + NewTopic( + name="player", + num_partitions=3, + 
replication_factor=1, + ), + NewTopic( + name="scraper", + num_partitions=4, + replication_factor=1, + ), + NewTopic( + name="reports", + num_partitions=4, + replication_factor=1, + ), + ] + ) + + print("created_topic", res) + + topics = admin_client.list_topics() + print("all topics", topics) + return + + +def extract_zip(extract_to: str): + current_dir = "./kafka_data" # Get the current working directory + + # Find zip file in the current directory + zip_files = [file for file in os.listdir(current_dir) if file.endswith(".zip")] + + if not zip_files: + print("No zip file found in the current directory") + return + + # Select the first zip file found + for zip_file in zip_files: + print(f"extracting: {zip_file}") + zip_file_path = os.path.join(current_dir, zip_file) + # Create the extraction directory if it doesn't exist + if not os.path.exists(extract_to): + os.makedirs(extract_to) + + # Extract zipfile + with zipfile.ZipFile(zip_file_path, "r") as zip_ref: + zip_ref.extractall(extract_to) + return + + +def get_messages_from_json(path: str, send_queue: Queue): + paths = [] + for file_name in os.listdir(path): + print(f"{file_name=}") + if file_name.endswith(".json"): + file_path = os.path.join(path, file_name) + paths.append(file_path) + + for _path in paths: + print(f"{_path=}") + with open(_path) as file: + data = json.load(file) + print(f"{_path}:{len(data)}") + _ = [send_queue.put(item=d) for d in data] + return + + +def kafka_producer(): + kafka_broker = os.environ.get("KAFKA_BROKER", "localhost:9094") + producer = KafkaProducer( + bootstrap_servers=kafka_broker, + value_serializer=lambda x: json.dumps(x).encode(), + ) + return producer + + +def send_messages(producer: KafkaProducer, send_queue: Queue, topic: str = "scraper"): + while True: + if send_queue.empty(): + break + + if send_queue.qsize() % 100 == 0: + print(f"{send_queue.qsize()=}") + message = send_queue.get() + producer.send(topic=topic, value=message) + send_queue.task_done() + + +def insert_data(): + send_queue = Queue() + extract_to = "kafka_data" + producer = kafka_producer() + + print("extract_zip") + extract_zip(extract_to) + print("get_messages_from_json") + get_messages_from_json(extract_to, send_queue=send_queue) + print("send_messages") + send_messages(producer=producer, send_queue=send_queue) + + +def main(): + print("create_topics") + create_topics() + print("insert_data") + insert_data() + print("done") + + +main() diff --git a/src/main.py b/src/main.py index 27c6247..124d6e0 100644 --- a/src/main.py +++ b/src/main.py @@ -1,66 +1,41 @@ import asyncio -import json import logging import time import traceback from asyncio import Queue - -from aiokafka import AIOKafkaConsumer, AIOKafkaProducer -from app.schemas.highscores import playerHiscoreData as playerHiscoreDataSchema +from functools import wraps + +import my_kafka as my_kafka +from app.repositories.activities import ActivitiesRepo + +# schemas import +from app.repositories.highscore import HighscoreRepo +from app.repositories.scraper_data import ScraperDataRepo +from app.repositories.skills import SkillsRepo +from app.schemas.input.activities import Activities, PlayerActivities +from app.schemas.input.message import Message +from app.schemas.input.skills import PlayerSkills, Skills +from app.schemas.scraper_data import ScraperCreate from core.config import settings -from database.database import get_session -from database.models.highscores import PlayerHiscoreData -from database.models.player import Player -from sqlalchemy import insert, update from 
sqlalchemy.exc import IntegrityError, OperationalError -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.sql.expression import Insert, Update logger = logging.getLogger(__name__) -async def kafka_consumer(topic: str, group: str): - consumer = AIOKafkaConsumer( - topic, - bootstrap_servers=[settings.KAFKA_HOST], - group_id=group, - value_deserializer=lambda x: json.loads(x.decode("utf-8")), - auto_offset_reset="earliest", - ) - await consumer.start() - return consumer - - -async def kafka_producer(): - producer = AIOKafkaProducer( - bootstrap_servers=[settings.KAFKA_HOST], - value_serializer=lambda v: json.dumps(v).encode(), - acks="all", - ) - await producer.start() - return producer - +def async_timeit(func): + @wraps(func) + async def wrapper(*args, **kwargs): + start_time = time.time() + result = await func(*args, **kwargs) + end_time = time.time() + logger.debug( + f"Execution time for {func.__name__}: {end_time - start_time} seconds" + ) + return result -async def receive_messages( - consumer: AIOKafkaConsumer, receive_queue: Queue, error_queue: Queue -): - async for message in consumer: - if error_queue.qsize() > 100: - await asyncio.sleep(1) - continue - value = message.value - await receive_queue.put(value) + return wrapper -async def send_messages(topic: str, producer: AIOKafkaProducer, send_queue: Queue): - while True: - if send_queue.empty(): - await asyncio.sleep(1) - continue - message = await send_queue.get() - await producer.send(topic, value=message) - send_queue.task_done() - def log_speed( counter: int, start_time: float, _queue: Queue, topic: str, interval: int = 15 ) -> tuple[float, int]: @@ -85,31 +60,82 @@ def log_speed( # Return the current time and reset the counter to zero return time.time(), 0 -async def insert_data(batch: list[dict], error_queue:Queue): + +@async_timeit +async def insert_data_v1(batch: list[Message], error_queue: Queue): try: - highscores:list[dict] = [msg.get("hiscores") for msg in batch] - players:list[dict] = [msg.get("player") for msg in batch] + highscores = [msg.hiscores for msg in batch if msg.hiscores] + players = [msg.player for msg in batch if msg.player] + + logger.info(f"Received: {len(players)=}, {len(highscores)=}") + + repo = HighscoreRepo() + await repo.create(highscore_data=highscores, player_data=players) + except (OperationalError, IntegrityError) as e: + for message in batch: + await error_queue.put(message) + + logger.error({"error": e}) + logger.info(f"error_qsize={error_queue.qsize()}, {message=}") + except Exception as e: + for message in batch: + await error_queue.put(message) - highscores = [playerHiscoreDataSchema(**hs) for hs in highscores if hs] - highscores = [hs.model_dump(mode="json") for hs in highscores ] + logger.error({"error": e}) + logger.debug(f"Traceback: \n{traceback.format_exc()}") + logger.info(f"error_qsize={error_queue.qsize()}, {message=}") + + +@async_timeit +async def insert_data_v2(batch: list[Message], error_queue: Queue): + try: + highscores = [msg.hiscores for msg in batch if msg.hiscores] + players = [msg.player for msg in batch if msg.player] - session: AsyncSession = await get_session() - logger.info(f"Received: {len(players)=}, {len(highscores)=}") - # start a transaction - async with session.begin(): - # insert into table values () - insert_sql:Insert = insert(PlayerHiscoreData) - insert_sql = insert_sql.values(highscores) - insert_sql = insert_sql.prefix_with("ignore") - await session.execute(insert_sql) - # update table - for player in players: - update_sql:Update = 
update(Player) - update_sql = update_sql.where(Player.id == player.get("id")) - update_sql = update_sql.values(player) - await session.execute(update_sql) + scraper_repo = ScraperDataRepo() + + skills_repo = SkillsRepo() + activities_repo = ActivitiesRepo() + + skills = {s.skill_name: s for s in await skills_repo.request()} + + activities = {a.activity_name: a for a in await activities_repo.request()} + + highscore_data = [] + scraper_data = [] + for highscore in highscores: + player_skills: list[PlayerSkills] = [] + player_activities: list[PlayerActivities] = [] + scraper_data = ScraperCreate( + player_id=highscore.Player_id, created_at=highscore.timestamp + ) + _highscore = highscore.model_dump() + assert isinstance(_highscore, dict) + # logger.info(_highscore) + for k, v in _highscore.items(): + if k in skills.keys(): + skill = skills.get(k) + assert isinstance(skill, Skills) + player_skills.append( + PlayerSkills( + scraper_id=None, skill_id=skill.skill_id, skill_value=v + ) + ) + if k in activities.keys(): + activity = activities.get(k) + assert isinstance(activity, Activities) + player_activities.append( + PlayerActivities( + scraper_id=None, + activity_id=activity.activity_id, + activity_value=v, + ) + ) + highscore_data.append((player_skills, player_activities, scraper_data)) + # logger.info(f"{highscore_data[0]}, {players[0]}") + await scraper_repo.create(highscore_data=highscore_data, player_data=players) except (OperationalError, IntegrityError) as e: for message in batch: await error_queue.put(message) @@ -124,6 +150,7 @@ async def insert_data(batch: list[dict], error_queue:Queue): logger.debug(f"Traceback: \n{traceback.format_exc()}") logger.info(f"error_qsize={error_queue.qsize()}, {message=}") + async def process_data(receive_queue: Queue, error_queue: Queue): # Initialize counter and start time counter = 0 @@ -140,7 +167,7 @@ async def process_data(receive_queue: Queue, error_queue: Queue): start_time=start_time, _queue=receive_queue, topic="scraper", - interval=15 + interval=15, ) # Check if queue is empty @@ -149,46 +176,50 @@ async def process_data(receive_queue: Queue, error_queue: Queue): continue # Get a message from the chosen queue - message: dict = await receive_queue.get() - - #TODO fix test data + message = await receive_queue.get() + message = Message(**message) + + # TODO fix test data if settings.ENV != "PRD": - player = message.get("player") - player_id = player.get("id") + player_id = message.player.id MIN_PLAYER_ID = 0 MAX_PLAYER_ID = 300 if not (MIN_PLAYER_ID < player_id <= MAX_PLAYER_ID): continue - + # batch message batch.append(message) now = time.time() # insert data in batches of N or interval of N - if len(batch) > 100 or now-start_time > 15: + if len(batch) > 100 or now - start_time > 15: async with semaphore: - await insert_data(batch=batch, error_queue=error_queue) + await insert_data_v1(batch=batch, error_queue=error_queue) + await insert_data_v2(batch=batch, error_queue=error_queue) batch = [] - + receive_queue.task_done() counter += 1 + async def main(): # get kafka engine - consumer = await kafka_consumer(topic="scraper", group="highscore-worker") - producer = await kafka_producer() + consumer = await my_kafka.kafka_consumer(topic="scraper", group="highscore-worker") + producer = await my_kafka.kafka_producer() receive_queue = Queue(maxsize=100) send_queue = Queue(maxsize=100) asyncio.create_task( - receive_messages( + my_kafka.receive_messages( consumer=consumer, receive_queue=receive_queue, error_queue=send_queue ) ) asyncio.create_task( - 
send_messages(topic="scraper", producer=producer, send_queue=send_queue) + my_kafka.send_messages( + topic="scraper", producer=producer, send_queue=send_queue + ) ) asyncio.create_task( process_data(receive_queue=receive_queue, error_queue=send_queue) diff --git a/src/my_kafka.py b/src/my_kafka.py new file mode 100644 index 0000000..0f18878 --- /dev/null +++ b/src/my_kafka.py @@ -0,0 +1,55 @@ +import asyncio +import json +import logging +from asyncio import Queue +from main import Message +from app.schemas.input.highscore import PlayerHiscoreData +from app.schemas.input.player import Player + +from aiokafka import AIOKafkaConsumer, AIOKafkaProducer +from core.config import settings + +logger = logging.getLogger(__name__) + + +async def kafka_consumer(topic: str, group: str): + consumer = AIOKafkaConsumer( + topic, + bootstrap_servers=[settings.KAFKA_HOST], + group_id=group, + value_deserializer=lambda x: json.loads(x.decode("utf-8")), + auto_offset_reset="earliest", + ) + await consumer.start() + return consumer + + +async def kafka_producer(): + producer = AIOKafkaProducer( + bootstrap_servers=[settings.KAFKA_HOST], + value_serializer=lambda v: v.model_dump_json().encode() if isinstance(v, (Message, PlayerHiscoreData, Player)) else json.dumps(v).encode(), + acks="all", + ) + await producer.start() + return producer + + +async def receive_messages( + consumer: AIOKafkaConsumer, receive_queue: Queue, error_queue: Queue +): + async for message in consumer: + if error_queue.qsize() > 100: + await asyncio.sleep(1) + continue + value = message.value + await receive_queue.put(value) + + +async def send_messages(topic: str, producer: AIOKafkaProducer, send_queue: Queue): + while True: + if send_queue.empty(): + await asyncio.sleep(1) + continue + message = await send_queue.get() + await producer.send(topic, value=message) + send_queue.task_done()