From 2f4102426822190640084e21ac00e5e8b3ac292c Mon Sep 17 00:00:00 2001 From: sgrtye Date: Mon, 6 Jan 2025 02:53:19 +0000 Subject: [PATCH] move to async --- apiserver/main.py | 148 +++++++++++++++++++++---------------- apiserver/requirements.txt | 6 +- 2 files changed, 87 insertions(+), 67 deletions(-) diff --git a/apiserver/main.py b/apiserver/main.py index 6507662..71c8c36 100644 --- a/apiserver/main.py +++ b/apiserver/main.py @@ -5,13 +5,14 @@ import signal import random import pandas +import asyncio import yfinance import requests -import schedule import datetime -import threading -import http.server -import socketserver +from fastapi import FastAPI +import aioschedule as schedule +from uvicorn import Config, Server +from fastapi.responses import JSONResponse XUI_URL: str | None = os.environ.get("XUI_URL") XUI_USERNAME: str | None = os.environ.get("XUI_USERNAME") @@ -29,7 +30,7 @@ commodity_status: dict[str, str] = dict() UPDATE_INTERVAL: int = 12 -TREND_ENDING = "_TREND" +TREND_ENDING: str = "_TREND" STOCKS: str = "AAPL GOOG NVDA TSLA" INDICES: str = "^IXIC ^GSPC 000001.SS" @@ -59,42 +60,52 @@ last_updated_time: float = time.time() xui_rate_limit_time: float = time.time() +app = FastAPI() +NO_CACHE_HEADER = { + "Content-Type": "application/json", + "Cache-Control": "no-store, no-cache, must-revalidate, max-age=0", + "Pragma": "no-cache", + "Expires": "0", +} + + +@app.get("/health") +async def health_endpoint(): + if time.time() - last_updated_time < (UPDATE_INTERVAL + 1) * 60: + return JSONResponse( + content={"message": "OK"}, status_code=200, headers=NO_CACHE_HEADER + ) + else: + return JSONResponse( + content={"message": "Yahoo finance info not up to date"}, + status_code=500, + headers=NO_CACHE_HEADER, + ) + + +@app.get("/xui") +async def xui_endpoint(): + await update_xui_status() + return JSONResponse( + content=xui_status, + headers=NO_CACHE_HEADER, + ) -class apiHandler(http.server.BaseHTTPRequestHandler): - def log_message(self, format, *args) -> None: - # Override the log_message method to do nothing - pass - def do_GET(self) -> None: - if self.path == "/xui": - update_xui_status() - response = json.dumps(xui_status) - elif self.path == "/capital": - response = json.dumps(stock_status | index_status) - elif self.path == "/exchange": - response = json.dumps(crypto_status | currency_status | commodity_status) - elif self.path == "/health": - if time.time() - last_updated_time > (UPDATE_INTERVAL + 1) * 60: - self.send_response(500) - else: - self.send_response(200) - self.send_header("Content-type", "application/json") - self.end_headers() - return - else: - message = {"message": "Not Found"} - response = json.dumps(message) - - self.send_response(200) - self.send_header("Content-type", "application/json") - self.end_headers() - - self.wfile.write(response.encode("utf-8")) - - -def start_api_server() -> None: - with socketserver.TCPServer(("0.0.0.0", 80), apiHandler) as httpd: - httpd.serve_forever() +@app.get("/capital") +async def capital_endpoint(): + return JSONResponse( + content=stock_status | index_status, + headers=NO_CACHE_HEADER, + ) + + +@app.get("/exchange") +async def exchange_endpoint(): + return JSONResponse( + content=crypto_status | currency_status | commodity_status, + headers=NO_CACHE_HEADER, + ) def load_cache(cache_path: str, symbols: str) -> dict[str, str]: @@ -140,11 +151,11 @@ def bytes_to_speed(bytes: int, decimal_place: int = 2) -> str: return format_bytes(bytes, decimal_place) + "/s" -def xui_login() -> None: +async def xui_login() -> None: global xui_rate_limit_time if (sleep_time := 5 - (time.time() - xui_rate_limit_time)) > 0: - time.sleep(sleep_time) + await asyncio.sleep(sleep_time) xui_rate_limit_time = time.time() xui_session.post( @@ -153,16 +164,16 @@ def xui_login() -> None: ) -def get_xui_info(path_suffix: str) -> dict: +async def get_xui_info(path_suffix: str) -> dict: while (info := xui_session.post(XUI_URL + path_suffix)).status_code != 200: - threading.Thread(target=xui_login, daemon=True).start() + await xui_login() return info.json() -def get_xui_status() -> dict[str, str]: - status: dict = get_xui_info("/server/status") - online: dict[str] = get_xui_info("/xui/inbound/onlines") +async def get_xui_status() -> dict[str, str]: + status: dict = await get_xui_info("/server/status") + online: dict[str] = await get_xui_info("/xui/inbound/onlines") online_count: int = len(online["obj"]) if online["obj"] else 0 online_name: str = random.choice(online["obj"]) if online_count > 0 else "-" @@ -177,7 +188,7 @@ def get_xui_status() -> dict[str, str]: return info -def update_xui_status() -> None: +async def update_xui_status() -> None: global xui_status xui_status = { "speed": -1, @@ -186,7 +197,7 @@ def update_xui_status() -> None: } try: - xui_status.update(get_xui_status()) + xui_status.update(await get_xui_status()) except Exception: pass @@ -242,21 +253,12 @@ def save_status() -> None: json.dump(MAPPING[symbols][0], file) -def handle_sigterm(signum, frame) -> None: - save_status() - print( - datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "All status saved before exiting", - ) - raise SystemExit(0) - - -def main() -> None: - load_all_cache() - signal.signal(signal.SIGTERM, handle_sigterm) +async def start_api_server(): + config = Config(app=app, host="127.0.0.1", port=80) + await Server(config).serve() - threading.Thread(target=start_api_server, daemon=True).start() +async def update_finance_status() -> None: schedule.every().hour.at(f":{str(UPDATE_INTERVAL * 0).zfill(2)}").do( update_status, symbols=STOCKS ) @@ -274,12 +276,28 @@ def main() -> None: ) schedule.every().day.at("10:24").do(save_status) + while True: + await schedule.run_pending() + await asyncio.sleep(60) + + +def handle_sigterm(signum, frame) -> None: + save_status() + print( + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "All status saved before exiting", + ) + raise SystemExit(0) + + +async def main() -> None: + load_all_cache() + signal.signal(signal.SIGTERM, handle_sigterm) + print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "API server started") - while True: - schedule.run_pending() - time.sleep(60) + await asyncio.gather(start_api_server(), update_finance_status()) if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/apiserver/requirements.txt b/apiserver/requirements.txt index eaf3d1a..60fa238 100644 --- a/apiserver/requirements.txt +++ b/apiserver/requirements.txt @@ -1,4 +1,6 @@ pandas +fastapi +uvicorn requests -schedule -yfinance \ No newline at end of file +yfinance +aioschedule \ No newline at end of file