diff --git a/docker-compose.yml b/docker-compose.yml index 22e2370..b242d2d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,9 +11,9 @@ services: - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false # volumes: # - ./kafka:/bitnami/kafka:rw - ports: - - 9094:9094 - - 9092:9092 + expose: + - 9094 + - 9092 healthcheck: test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "localhost:9092"] interval: 30s @@ -30,7 +30,7 @@ services: - JVM_OPTS=-Xms32M -Xmx64M - SERVER_SERVLET_CONTEXTPATH=/ ports: - - 9000:9000 + - 9999:9000 restart: on-failure networks: - botdetector-network @@ -62,8 +62,10 @@ services: volumes: - ./mysql/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d # - ./mysql/mount:/var/lib/mysql # creates persistence - ports: - - 3306:3306 + # ports: + # - 3306:3306 + expose: + - 3306 networks: - botdetector-network healthcheck: diff --git a/kafka_setup/setup_kafka.py b/kafka_setup/setup_kafka.py index 72fa57f..1cca1c9 100644 --- a/kafka_setup/setup_kafka.py +++ b/kafka_setup/setup_kafka.py @@ -1,7 +1,6 @@ # setup_kafka.py import json import os -import time import zipfile from queue import Queue diff --git a/src/app/schemas/highscores.py b/src/app/schemas/highscores.py index 3ffbc82..00fd30d 100644 --- a/src/app/schemas/highscores.py +++ b/src/app/schemas/highscores.py @@ -1,108 +1,107 @@ -from datetime import date, datetime -from typing import Optional - -from pydantic import BaseModel, ConfigDict - - -class playerHiscoreData(BaseModel): - model_config = ConfigDict(from_attributes=True) - - # id: Optional[int] = None - timestamp: datetime = datetime.utcnow() - # ts_date: Optional[date] = None - Player_id: int - total: int - attack: int - defence: int - strength: int - hitpoints: int - ranged: int - prayer: int - magic: int - cooking: int - woodcutting: int - fletching: int - fishing: int - firemaking: int - crafting: int - smithing: int - mining: int - herblore: int - agility: int - thieving: int - slayer: int - farming: int - runecraft: int - hunter: int - construction: int - league: int - bounty_hunter_hunter: int - bounty_hunter_rogue: int - cs_all: int - cs_beginner: int - cs_easy: int - cs_medium: int - cs_hard: int - cs_elite: int - cs_master: int - lms_rank: int - soul_wars_zeal: int - abyssal_sire: int - alchemical_hydra: int - barrows_chests: int - bryophyta: int - callisto: int - cerberus: int - chambers_of_xeric: int - chambers_of_xeric_challenge_mode: int - chaos_elemental: int - chaos_fanatic: int - commander_zilyana: int - corporeal_beast: int - crazy_archaeologist: int - dagannoth_prime: int - dagannoth_rex: int - dagannoth_supreme: int - deranged_archaeologist: int - general_graardor: int - giant_mole: int - grotesque_guardians: int - hespori: int - kalphite_queen: int - king_black_dragon: int - kraken: int - kreearra: int - kril_tsutsaroth: int - mimic: int - nightmare: int - nex: int = 0 - phosanis_nightmare: int - obor: int - phantom_muspah: int = 0 - sarachnis: int - scorpia: int - skotizo: int - tempoross: int = 0 - the_gauntlet: int - the_corrupted_gauntlet: int - theatre_of_blood: int - theatre_of_blood_hard: int = 0 - thermonuclear_smoke_devil: int - tombs_of_amascut: int = 0 - tombs_of_amascut_expert: int = 0 - tzkal_zuk: int - tztok_jad: int - venenatis: int - vetion: int - vorkath: int - wintertodt: int - zalcano: int - zulrah: int - rifts_closed: int = 0 - artio: int = 0 - calvarion: int = 0 - duke_sucellus: int = 0 - spindel: int = 0 - the_leviathan: int = 0 - the_whisperer: int = 0 - vardorvis: int = 0 +from datetime import datetime + +from pydantic import BaseModel, ConfigDict + + +class playerHiscoreData(BaseModel): + model_config = ConfigDict(from_attributes=True) + + # id: Optional[int] = None + timestamp: datetime = datetime.utcnow() + # ts_date: Optional[date] = None + Player_id: int + total: int + attack: int + defence: int + strength: int + hitpoints: int + ranged: int + prayer: int + magic: int + cooking: int + woodcutting: int + fletching: int + fishing: int + firemaking: int + crafting: int + smithing: int + mining: int + herblore: int + agility: int + thieving: int + slayer: int + farming: int + runecraft: int + hunter: int + construction: int + league: int + bounty_hunter_hunter: int + bounty_hunter_rogue: int + cs_all: int + cs_beginner: int + cs_easy: int + cs_medium: int + cs_hard: int + cs_elite: int + cs_master: int + lms_rank: int + soul_wars_zeal: int + abyssal_sire: int + alchemical_hydra: int + barrows_chests: int + bryophyta: int + callisto: int + cerberus: int + chambers_of_xeric: int + chambers_of_xeric_challenge_mode: int + chaos_elemental: int + chaos_fanatic: int + commander_zilyana: int + corporeal_beast: int + crazy_archaeologist: int + dagannoth_prime: int + dagannoth_rex: int + dagannoth_supreme: int + deranged_archaeologist: int + general_graardor: int + giant_mole: int + grotesque_guardians: int + hespori: int + kalphite_queen: int + king_black_dragon: int + kraken: int + kreearra: int + kril_tsutsaroth: int + mimic: int + nightmare: int + nex: int = 0 + phosanis_nightmare: int + obor: int + phantom_muspah: int = 0 + sarachnis: int + scorpia: int + skotizo: int + tempoross: int = 0 + the_gauntlet: int + the_corrupted_gauntlet: int + theatre_of_blood: int + theatre_of_blood_hard: int = 0 + thermonuclear_smoke_devil: int + tombs_of_amascut: int = 0 + tombs_of_amascut_expert: int = 0 + tzkal_zuk: int + tztok_jad: int + venenatis: int + vetion: int + vorkath: int + wintertodt: int + zalcano: int + zulrah: int + rifts_closed: int = 0 + artio: int = 0 + calvarion: int = 0 + duke_sucellus: int = 0 + spindel: int = 0 + the_leviathan: int = 0 + the_whisperer: int = 0 + vardorvis: int = 0 diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/logging.py b/src/core/logging.py index ced1460..5d282a1 100644 --- a/src/core/logging.py +++ b/src/core/logging.py @@ -1,7 +1,6 @@ import json import logging import sys -import warnings from core.config import settings diff --git a/src/database/database.py b/src/database/database.py index 60832c6..e198895 100644 --- a/src/database/database.py +++ b/src/database/database.py @@ -1,4 +1,3 @@ -import sqlalchemy from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker diff --git a/src/database/models/highscores.py b/src/database/models/highscores.py index a96bd82..ab5ff9b 100644 --- a/src/database/models/highscores.py +++ b/src/database/models/highscores.py @@ -1,107 +1,107 @@ -from sqlalchemy import BigInteger, Column, Date, DateTime, Integer, func - -from database.database import Base - - -class PlayerHiscoreData(Base): - __tablename__ = "playerHiscoreData" - - id = Column(BigInteger, primary_key=True, autoincrement=True) - timestamp = Column(DateTime, nullable=False, server_default=func.now()) - ts_date = Column(Date, nullable=True) - Player_id = Column(Integer, nullable=False) - total = Column(BigInteger, default=0) - attack = Column(Integer, default=0) - defence = Column(Integer, default=0) - strength = Column(Integer, default=0) - hitpoints = Column(Integer, default=0) - ranged = Column(Integer, default=0) - prayer = Column(Integer, default=0) - magic = Column(Integer, default=0) - cooking = Column(Integer, default=0) - woodcutting = Column(Integer, default=0) - fletching = Column(Integer, default=0) - fishing = Column(Integer, default=0) - firemaking = Column(Integer, default=0) - crafting = Column(Integer, default=0) - smithing = Column(Integer, default=0) - mining = Column(Integer, default=0) - herblore = Column(Integer, default=0) - agility = Column(Integer, default=0) - thieving = Column(Integer, default=0) - slayer = Column(Integer, default=0) - farming = Column(Integer, default=0) - runecraft = Column(Integer, default=0) - hunter = Column(Integer, default=0) - construction = Column(Integer, default=0) - league = Column(Integer, default=0) - bounty_hunter_hunter = Column(Integer, default=0) - bounty_hunter_rogue = Column(Integer, default=0) - cs_all = Column(Integer, default=0) - cs_beginner = Column(Integer, default=0) - cs_easy = Column(Integer, default=0) - cs_medium = Column(Integer, default=0) - cs_hard = Column(Integer, default=0) - cs_elite = Column(Integer, default=0) - cs_master = Column(Integer, default=0) - lms_rank = Column(Integer, default=0) - soul_wars_zeal = Column(Integer, default=0) - abyssal_sire = Column(Integer, default=0) - alchemical_hydra = Column(Integer, default=0) - barrows_chests = Column(Integer, default=0) - bryophyta = Column(Integer, default=0) - callisto = Column(Integer, default=0) - cerberus = Column(Integer, default=0) - chambers_of_xeric = Column(Integer, default=0) - chambers_of_xeric_challenge_mode = Column(Integer, default=0) - chaos_elemental = Column(Integer, default=0) - chaos_fanatic = Column(Integer, default=0) - commander_zilyana = Column(Integer, default=0) - corporeal_beast = Column(Integer, default=0) - crazy_archaeologist = Column(Integer, default=0) - dagannoth_prime = Column(Integer, default=0) - dagannoth_rex = Column(Integer, default=0) - dagannoth_supreme = Column(Integer, default=0) - deranged_archaeologist = Column(Integer, default=0) - general_graardor = Column(Integer, default=0) - giant_mole = Column(Integer, default=0) - grotesque_guardians = Column(Integer, default=0) - hespori = Column(Integer, default=0) - kalphite_queen = Column(Integer, default=0) - king_black_dragon = Column(Integer, default=0) - kraken = Column(Integer, default=0) - kreearra = Column(Integer, default=0) - kril_tsutsaroth = Column(Integer, default=0) - mimic = Column(Integer, default=0) - nex = Column(Integer, default=0) - nightmare = Column(Integer, default=0) - phosanis_nightmare = Column(Integer, default=0) - obor = Column(Integer, default=0) - phantom_muspah = Column(Integer, default=0) - sarachnis = Column(Integer, default=0) - scorpia = Column(Integer, default=0) - skotizo = Column(Integer, default=0) - tempoross = Column(Integer, default=0) - the_gauntlet = Column(Integer, default=0) - the_corrupted_gauntlet = Column(Integer, default=0) - theatre_of_blood = Column(Integer, default=0) - theatre_of_blood_hard = Column(Integer, default=0) - thermonuclear_smoke_devil = Column(Integer, default=0) - tombs_of_amascut = Column(Integer, default=0) - tombs_of_amascut_expert = Column(Integer, default=0) - tzkal_zuk = Column(Integer, default=0) - tztok_jad = Column(Integer, default=0) - venenatis = Column(Integer, default=0) - vetion = Column(Integer, default=0) - vorkath = Column(Integer, default=0) - wintertodt = Column(Integer, default=0) - zalcano = Column(Integer, default=0) - zulrah = Column(Integer, default=0) - rifts_closed = Column(Integer, default=0) - artio = Column(Integer, default=0) - calvarion = Column(Integer, default=0) - duke_sucellus = Column(Integer, default=0) - spindel = Column(Integer, default=0) - the_leviathan = Column(Integer, default=0) - the_whisperer = Column(Integer, default=0) - vardorvis = Column(Integer, default=0) +from sqlalchemy import BigInteger, Column, Date, DateTime, Integer, func + +from database.database import Base + + +class PlayerHiscoreData(Base): + __tablename__ = "playerHiscoreData" + + id = Column(BigInteger, primary_key=True, autoincrement=True) + timestamp = Column(DateTime, nullable=False, server_default=func.now()) + ts_date = Column(Date, nullable=True) + Player_id = Column(Integer, nullable=False) + total = Column(BigInteger, default=0) + attack = Column(Integer, default=0) + defence = Column(Integer, default=0) + strength = Column(Integer, default=0) + hitpoints = Column(Integer, default=0) + ranged = Column(Integer, default=0) + prayer = Column(Integer, default=0) + magic = Column(Integer, default=0) + cooking = Column(Integer, default=0) + woodcutting = Column(Integer, default=0) + fletching = Column(Integer, default=0) + fishing = Column(Integer, default=0) + firemaking = Column(Integer, default=0) + crafting = Column(Integer, default=0) + smithing = Column(Integer, default=0) + mining = Column(Integer, default=0) + herblore = Column(Integer, default=0) + agility = Column(Integer, default=0) + thieving = Column(Integer, default=0) + slayer = Column(Integer, default=0) + farming = Column(Integer, default=0) + runecraft = Column(Integer, default=0) + hunter = Column(Integer, default=0) + construction = Column(Integer, default=0) + league = Column(Integer, default=0) + bounty_hunter_hunter = Column(Integer, default=0) + bounty_hunter_rogue = Column(Integer, default=0) + cs_all = Column(Integer, default=0) + cs_beginner = Column(Integer, default=0) + cs_easy = Column(Integer, default=0) + cs_medium = Column(Integer, default=0) + cs_hard = Column(Integer, default=0) + cs_elite = Column(Integer, default=0) + cs_master = Column(Integer, default=0) + lms_rank = Column(Integer, default=0) + soul_wars_zeal = Column(Integer, default=0) + abyssal_sire = Column(Integer, default=0) + alchemical_hydra = Column(Integer, default=0) + barrows_chests = Column(Integer, default=0) + bryophyta = Column(Integer, default=0) + callisto = Column(Integer, default=0) + cerberus = Column(Integer, default=0) + chambers_of_xeric = Column(Integer, default=0) + chambers_of_xeric_challenge_mode = Column(Integer, default=0) + chaos_elemental = Column(Integer, default=0) + chaos_fanatic = Column(Integer, default=0) + commander_zilyana = Column(Integer, default=0) + corporeal_beast = Column(Integer, default=0) + crazy_archaeologist = Column(Integer, default=0) + dagannoth_prime = Column(Integer, default=0) + dagannoth_rex = Column(Integer, default=0) + dagannoth_supreme = Column(Integer, default=0) + deranged_archaeologist = Column(Integer, default=0) + general_graardor = Column(Integer, default=0) + giant_mole = Column(Integer, default=0) + grotesque_guardians = Column(Integer, default=0) + hespori = Column(Integer, default=0) + kalphite_queen = Column(Integer, default=0) + king_black_dragon = Column(Integer, default=0) + kraken = Column(Integer, default=0) + kreearra = Column(Integer, default=0) + kril_tsutsaroth = Column(Integer, default=0) + mimic = Column(Integer, default=0) + nex = Column(Integer, default=0) + nightmare = Column(Integer, default=0) + phosanis_nightmare = Column(Integer, default=0) + obor = Column(Integer, default=0) + phantom_muspah = Column(Integer, default=0) + sarachnis = Column(Integer, default=0) + scorpia = Column(Integer, default=0) + skotizo = Column(Integer, default=0) + tempoross = Column(Integer, default=0) + the_gauntlet = Column(Integer, default=0) + the_corrupted_gauntlet = Column(Integer, default=0) + theatre_of_blood = Column(Integer, default=0) + theatre_of_blood_hard = Column(Integer, default=0) + thermonuclear_smoke_devil = Column(Integer, default=0) + tombs_of_amascut = Column(Integer, default=0) + tombs_of_amascut_expert = Column(Integer, default=0) + tzkal_zuk = Column(Integer, default=0) + tztok_jad = Column(Integer, default=0) + venenatis = Column(Integer, default=0) + vetion = Column(Integer, default=0) + vorkath = Column(Integer, default=0) + wintertodt = Column(Integer, default=0) + zalcano = Column(Integer, default=0) + zulrah = Column(Integer, default=0) + rifts_closed = Column(Integer, default=0) + artio = Column(Integer, default=0) + calvarion = Column(Integer, default=0) + duke_sucellus = Column(Integer, default=0) + spindel = Column(Integer, default=0) + the_leviathan = Column(Integer, default=0) + the_whisperer = Column(Integer, default=0) + vardorvis = Column(Integer, default=0) diff --git a/src/main.py b/src/main.py index 485e8eb..3cbd5be 100644 --- a/src/main.py +++ b/src/main.py @@ -4,20 +4,17 @@ import time import traceback from asyncio import Queue -from datetime import datetime from aiokafka import AIOKafkaConsumer, AIOKafkaProducer -from sqlalchemy import insert, select, update -from sqlalchemy.exc import OperationalError -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.sql.expression import Insert, Select, Update - -import core.logging # for log formatting from app.schemas.highscores import playerHiscoreData as playerHiscoreDataSchema from core.config import settings from database.database import get_session from database.models.highscores import PlayerHiscoreData from database.models.player import Player +from sqlalchemy import insert, update +from sqlalchemy.exc import OperationalError +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.sql.expression import Insert, Update logger = logging.getLogger(__name__) @@ -64,43 +61,6 @@ async def send_messages(topic: str, producer: AIOKafkaProducer, send_queue: Queu await producer.send(topic, value=message) send_queue.task_done() - -# TODO: pydantic data -async def insert_highscore(session: AsyncSession, data: dict): - player_id = data.get("Player_id") - timestamp = data.get("timestamp") - - # Convert the timestamp to a date format - ts_date = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S").date() - - table = PlayerHiscoreData - - select_query: Select = select(table).where( - (table.Player_id == player_id) & (table.ts_date == ts_date) - ) - - # Check if the record exists - existing_record = await session.execute(select_query) - existing_record = existing_record.scalars().first() - - if not existing_record: - data = playerHiscoreDataSchema(**data) - data = data.model_dump(mode="json") - insert_query: Insert = insert(table).values(data).prefix_with("ignore") - await session.execute(insert_query) - - -# TODO: pydantic data -async def update_player(session: AsyncSession, data: dict): - player_id = data.get("id") - - table = Player - query: Update = update(table=table) - query = query.where(Player.id == player_id) - query = query.values(data) - await session.execute(query) - - def log_speed( counter: int, start_time: float, _queue: Queue, topic: str, interval: int = 15 ) -> tuple[float, int]: @@ -125,13 +85,51 @@ def log_speed( # Return the current time and reset the counter to zero return time.time(), 0 +async def insert_data(batch: list[dict], error_queue:Queue): + try: + highscores:list[dict] = [msg.get("hiscores") for msg in batch] + players:list[dict] = [msg.get("player") for msg in batch] + + highscores = [playerHiscoreDataSchema(**hs) for hs in highscores if hs] + highscores = [hs.model_dump(mode="json") for hs in highscores ] + + session: AsyncSession = await get_session() + + logger.info(f"Received: {len(players)=}, {len(highscores)=}") + + # start a transaction + async with session.begin(): + # insert into table values () + insert_sql:Insert = insert(PlayerHiscoreData) + insert_sql = insert_sql.values(highscores) + insert_sql = insert_sql.prefix_with("ignore") + await session.execute(insert_sql) + # update table + for player in players: + update_sql:Update = update(Player) + update_sql = update_sql.where(Player.id == player.get("id")) + update_sql = update_sql.values(player) + await session.execute(update_sql) + except OperationalError as e: + for message in batch: + await error_queue.put(message) + + logger.error({"error": e}) + logger.info(f"error_qsize={error_queue.qsize()}, {message=}") + except Exception as e: + for message in batch: + await error_queue.put(message) + + logger.error({"error": e}) + logger.debug(f"Traceback: \n{traceback.format_exc()}") + logger.info(f"error_qsize={error_queue.qsize()}, {message=}") -# Define a function to process data from a queue async def process_data(receive_queue: Queue, error_queue: Queue): # Initialize counter and start time counter = 0 start_time = time.time() + batch = [] # Run indefinitely while True: start_time, counter = log_speed( @@ -148,54 +146,27 @@ async def process_data(receive_queue: Queue, error_queue: Queue): # Get a message from the chosen queue message: dict = await receive_queue.get() - - # Extract 'hiscores' and 'player' dictionaries from the message - highscore: dict = message.get("hiscores") - player: dict = message.get("player") - - # Check the environment and filter out certain player IDs if not in production + + #TODO fix test data if settings.ENV != "PRD": + player = message.get("player") player_id = player.get("id") MIN_PLAYER_ID = 0 MAX_PLAYER_ID = 300 if not (MIN_PLAYER_ID < player_id <= MAX_PLAYER_ID): continue + + # batch message + batch.append(message) - try: - # Acquire an asynchronous database session - session: AsyncSession = await get_session() - async with session.begin(): - # If 'highscore' dictionary is present, insert it into the database - if highscore: - await insert_highscore(session=session, data=highscore) - # Update the player information in the database - await update_player(session=session, data=player) - # Commit the changes to the database - await session.commit() - # Mark the message as processed in the queue - receive_queue.task_done() - except OperationalError as e: - await error_queue.put(message) - # Handle exceptions, log the error, and put the message in the error queue - logger.error({"error": e}) - logger.info(f"error_qsize={error_queue.qsize()}, {message=}") - # Mark the message as processed in the queue and continue to the next iteration - receive_queue.task_done() - continue - except Exception as e: - await error_queue.put(message) - # Handle exceptions, log the error, and put the message in the error queue - logger.error({"error": e}) - logger.debug(f"Traceback: \n{traceback.format_exc()}") - logger.info(f"error_qsize={error_queue.qsize()}, {message=}") - # Mark the message as processed in the queue and continue to the next iteration - receive_queue.task_done() - continue + now = time.time() - # Increment the counter + if len(batch) > 100 or now-start_time > 5: + await insert_data(batch=batch, error_queue=error_queue) + + receive_queue.task_done() counter += 1 - async def main(): # get kafka engine consumer = await kafka_consumer(topic="scraper", group="highscore-worker")