diff --git a/backend/app/config.py b/backend/app/config.py
index 7bd6c5d..22782b6 100644
--- a/backend/app/config.py
+++ b/backend/app/config.py
@@ -1,10 +1,13 @@
+from logging.handlers import TimedRotatingFileHandler
+
 from pydantic_settings import BaseSettings, SettingsConfigDict
 from functools import lru_cache
 import os
 from typing import ClassVar
-
+import logging
 
 
 class Settings(BaseSettings):
+    SECRET_KEY: str
     ALGORITHM: str
     ACCESS_TOKEN_EXPIRE_MINUTES: int
@@ -26,6 +29,32 @@ class Settings(BaseSettings):
         env_file=env_file, _env_file_encoding="utf-8", extra="allow"
     )
 
+def setup_logging():
+    # Base name of the log file; the rotating handler appends the date
+    log_name = "coin"
+    # # Create a logger object with the given name
+    # logger = logging.getLogger(log_name)
+    log_path = os.path.join("./data_logs/", log_name)
+    # Create a TimedRotatingFileHandler: rotate to a new log file every day and keep the last 7 days
+    handler = TimedRotatingFileHandler(
+        log_path,
+        when="D",        # "D" means rotate daily
+        interval=1,      # rotate every 1 day
+        backupCount=7,   # keep only the last 7 days of logs
+        encoding="utf-8"
+    )
+
+    # Set the log format
+    handler.suffix = "%Y-%m-%d.log"
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    # formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s - [Class: %(module)s, Function: %(funcName)s, Line: %(lineno)d]')
+    handler.setFormatter(formatter)
+    # Configure logging
+    logging.basicConfig(level=logging.INFO, handlers=[handler])
+
+    # Set the watchfiles logger to WARNING to suppress its INFO messages
+    logging.getLogger('watchfiles').setLevel(logging.WARNING)
+    return logging
 
 @lru_cache
 def get_settings():
@@ -33,3 +62,4 @@ def get_settings():
 
 
 settings = get_settings()
+logging = setup_logging()
\ No newline at end of file
diff --git a/backend/app/database.py b/backend/app/database.py
index 84a402e..af740fb 100644
--- a/backend/app/database.py
+++ b/backend/app/database.py
@@ -2,13 +2,17 @@
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
 from .config import settings
-
+from sqlalchemy.orm import scoped_session
 
 engine = create_engine(
-    settings.SQLALCHEMY_DATABASE_URL
+    settings.SQLALCHEMY_DATABASE_URL,
+    pool_pre_ping=True,  # enable connection pre-ping
+    pool_recycle=3600,
+    pool_size=10,
+    max_overflow=20
 )
 
-SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
-
+# SessionLocal = sessionmaker(autocommit=False, autoflush=True, bind=engine)
+SessionLocal = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
 
 Base = declarative_base()
diff --git a/backend/get_repo_oauth.py b/backend/get_repo_oauth.py
index 164efab..3edcd22 100644
--- a/backend/get_repo_oauth.py
+++ b/backend/get_repo_oauth.py
@@ -14,7 +14,19 @@
     "Authorization": f"Bearer {token}",
     "Accept": "application/vnd.github+json",
 }
-
+import time
+import logging
+from datetime import datetime
+# log_filename = f"./data_logs/log_{datetime.now().strftime('%Y-%m-%d')}.txt"
+#
+# logging.basicConfig(
+#     filename=log_filename,  # the log file name includes the date
+#     level=logging.INFO,
+#     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+#     encoding='utf-8'  # support Chinese characters
+# )
+# # Set the watchfiles logger to WARNING to suppress its INFO messages
+# logging.getLogger('watchfiles').setLevel(logging.WARNING)
 
 # Fetch all of the user's repositories
 def get_repositories():
@@ -97,20 +109,31 @@ def save_info_to_json(info_dict, filename):
         json.dump(info_dict, f, ensure_ascii=False, indent=4)
 
 
-# Helper function to get all pages of results
+
 def get_all_pages(url,page=1):
-    headers = {"Authorization": f"token {token}"}
+    headers = {"Authorization": f"token {token}",'User-Agent':'insomnia/10.1.0'}
     items = []
     index = 0
-    while url and index < page:
-        response = requests.get(url, headers=headers)
-        if response.status_code == 200:
-            items.extend(response.json())
-            # Check if there is a 'next' page
-            url = response.links.get("next", {}).get("url")
-            index +=1
-        else:
-            break
+    try:
+        while url and index < page:
+            start_time = time.time()
+            response = requests.get(url, headers=headers)
+            end_time = time.time()  # end timing
+            elapsed_time = end_time - start_time
+            logging.info(f"Query completed in {elapsed_time:.2f} seconds.")
+            if response.status_code == 200:
+                items.extend(response.json())
+                # Check if there is a 'next' page
+                url = response.links.get("next", {}).get("url")
+                index +=1
+            else:
+                break
+    except(Exception) as e:
+        logging.error(e)
+        logging.error(url)
+        # print("exceptions:", e)
+        # print(url)
+        # print(headers)
 
     return items
diff --git a/backend/insert_repo_user.py b/backend/insert_repo_user.py
index b93c8fc..cb23164 100644
--- a/backend/insert_repo_user.py
+++ b/backend/insert_repo_user.py
@@ -16,7 +16,10 @@
 )
 
 # Create the engine and connect to the database
-engine = create_engine(DATABASE_URL)
+engine = create_engine(DATABASE_URL,
+                       # The following two options avoid "MySQL server has gone away" errors
+                       pool_pre_ping=True,  # enable connection pre-ping
+                       pool_recycle=3600)
 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
 
 # Initialize the password hashing context (bcrypt)
diff --git a/backend/main.py b/backend/main.py
index 28e293c..847df87 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -1,16 +1,14 @@
+import os
 import uvicorn
-import logging
 from fastapi import FastAPI
-from fastapi.staticfiles import StaticFiles
 from fastapi.middleware.cors import CORSMiddleware
-from fastapi import BackgroundTasks
 from app.routers import users
 from app.routers import coin
 from app.routers import item
-from app.routers import update_data
 from app.config import settings
 from app.database import engine
-import datetime
+from datetime import datetime
+import logging
 
 app = FastAPI()
 origins = [
@@ -44,44 +42,15 @@ def read_root():
 
 
 from apscheduler.schedulers.background import BackgroundScheduler
-from typing import List, Dict
-import requests
-import threading
-import asyncio
-
 # Initialize APScheduler
 scheduler = BackgroundScheduler()
 scheduler.start()
 
 # Task function for periodically fetching GitHub issues
-from update_repo import update_repo
-from update_repo import update_repo_test
-import time
-
+from update_repo import update_repo, update_repo_by_multithreaded
 
-# Initialize APScheduler
-scheduler = BackgroundScheduler()
-
-import logging
-from datetime import datetime
-
-# Use the current date as part of the log file name
-log_filename = f"./data_logs/log_{datetime.now().strftime('%Y-%m-%d')}.txt"
-
-logging.basicConfig(
-    filename=log_filename,  # the log file name includes the date
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    encoding='utf-8'  # support Chinese characters
-)
-# Set the watchfiles logger to WARNING to suppress its INFO messages
-logging.getLogger('watchfiles').setLevel(logging.WARNING)
-
-# Start the scheduler
-scheduler.start()
-
 # Assign a unique ID to the scheduled job
 job_id = "update_repo_job"
@@ -91,10 +60,9 @@ async def start_scheduled_update():
     """
     Start the scheduled job that periodically updates the GitHub repositories' issues.
     """
     job = scheduler.get_job(job_id)
-
     if job is None:
         new_job = scheduler.add_job(
-            update_repo, "interval", hours=12, minutes=0, seconds=0, id=job_id
+            update_repo_by_multithreaded, "interval", hours=1, minutes=0, seconds=0, id=job_id
         )
         next_run_time = new_job.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if new_job.next_run_time else "Unknown"
         logging.info(f"Scheduled update started. Next run time: {next_run_time}.")
@@ -139,12 +107,13 @@ async def stop_scheduled_update():
 
 
 @app.get("/execute-update")
-async def execute_update():
+def execute_update():
     """
     Manually run the data update task, write the results to the database, and notify the frontend to refresh. Record the current time.
     """
     try:
-        update_repo()
+        # update_repo()
+        update_repo_by_multithreaded()
         current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
         logging.info(f"Executed manual update successfully at {current_time}.")
         return {
@@ -158,4 +127,4 @@ async def execute_update():
             "message": f"An error occurred while executing the update: {str(e)}",
         }
 if __name__ == "__main__":
-    uvicorn.run("main:app", host=settings.HOST, port=settings.PORT, reload=True)
+    uvicorn.run("main:app", host=settings.HOST, port=settings.PORT, reload=False,workers=1)
diff --git a/backend/update_repo.py b/backend/update_repo.py
index 7e4a500..c54a6ec 100644
--- a/backend/update_repo.py
+++ b/backend/update_repo.py
@@ -1,30 +1,19 @@
-import time
-
 from app.database import Base, engine, SessionLocal
 from app.core.models.coin import Apply
 from app.core.models.users import Users
 from datetime import datetime
 from get_repo_oauth import get_all_repos, get_all_issues_and_prs
-from apscheduler.schedulers.background import BackgroundScheduler
-import logging
-# from app.config import settings
 import os
 from get_repo_oauth import save_info_to_json
 import json
-from app.config import settings
-
-# from insert_repo_user import hashed_password
 from passlib.context import CryptContext
 import time
-
-
-
-
+from concurrent.futures import ThreadPoolExecutor
 import asyncio
 from sqlalchemy.ext.asyncio import AsyncSession
 import aiofiles
-
+import logging
 
 # Initialize the password hashing context (bcrypt)
 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@@ -85,7 +74,7 @@ def update_repo():
             repos = json.load(f)
 
     # Fetch all issues and PRs for each repository
-    #TODO: stop updating here; switch to asynchronous database inserts
+    # TODO: stop updating here; switch to asynchronous database inserts
    if repos != "Error: Unable to fetch repositories":
         logging.info(f"lth repos {len(repos)}")
         for index, repo in enumerate(repos[:]):
@@ -93,15 +82,14 @@
             repo_name = repo["name"]
             logging.info(f"{index} repo: {repo_name}")
             print(f"{index} repo: {repo_name}")
-            total_issues = get_all_issues_and_prs(username, repo_name,page=1)
+            total_issues = get_all_issues_and_prs(username, repo_name, page=1)
             logging.info("total_issues count: %d", len(total_issues))
             for issue in total_issues[:]:
                 url = issue["html_url"]
                 all_issues.add(url)
-                logging.info(f"{url}")
                 issue_data = db.query(Apply).filter(Apply.url == url).first()
                 if issue_data:
-                    logging.info(f"already exists")
+                    logging.info(f"already exists {url}")
                 else:
                     # Get the row count of the Users table
                     user_count = db.query(Users).count()
@@ -256,10 +244,147 @@ async def update_repo1():
     await save_list_to_json(all_issues, "./github_data/all_issues.json")
 
 
+def process_repo(repo, username, all_issues, db):
+    try:
+        db = SessionLocal()
+        repo_name = repo["name"]
+        logging.info(f"Processing repo: {repo_name}")
+
+        total_issues = get_all_issues_and_prs(username, repo_name)
+        logging.info("total_issues count: %d", len(total_issues))
+
+        for issue in total_issues[:]:
+            url = issue["html_url"]
+            all_issues.add(url)
+
+            start_time_query = time.time()
+            issue_data = db.query(Apply).filter(Apply.url == url).first()
+            logging.debug(f"DB query time: {time.time() - start_time_query:.2f} seconds")
+
+            if issue_data:
+                logging.info(f"already exists {url}")
+            else:
+                user_count = db.query(Users).count()
+                phone_number = 15812340000 + user_count - 2
+                logging.info(f"phone_number {phone_number}")
+
+                try:
+                    start_time_user_query = time.time()
+                    user_id = db.query(Users).filter_by(username=issue["user"]["login"]).first()
+                    logging.debug(f"DB query time for user: {time.time() - start_time_user_query:.2f} seconds")
+
+                    if not user_id:
+                        new_user = Users(
+                            username=issue["user"]["login"],
+                            phone=f"{phone_number}",
+                            github=issue["user"]["html_url"],
+                            role="developer",
+                            email=f'{issue["user"]["login"]}@example.com',
+                            password=hashed_password,
+                            desc=f"I contributed an issue to the project {repo_name}",
+                            register_time=datetime.now(),
+                            last_login_time=datetime.now(),
+                        )
+                        logging.info('db.add new_user')
+                        db.add(new_user)
+                        db.commit()
+                        logging.info(f"Inserted new user: {new_user.username}")
+                        user_id = new_user.id
+                    else:
+                        user_id = user_id.id
+
+                    logging.info(f"user id {user_id}")
+                    repo_apply = Apply(
+                        user_id=user_id,
+                        repo=repo_name,
+                        role="developer",
+                        repo_owner_name=username,
+                        user_name=issue["user"]["login"],
+                        pid=issue["number"],
+                        title=issue["title"],
+                        url=url,
+                        content=issue["body"],
+                        state=issue["state"],
+                        record_time=datetime.strptime(issue["created_at"][:-1], "%Y-%m-%dT%H:%M:%S"),
+                    )
+                    logging.info('db.add repo_apply')
+                    db.add(repo_apply)
+                except Exception as e:
+                    logging.error(f"error: {e}")
+                    pass
+
+        start_time_commit = time.time()
+        logging.info('process_repo finished')
+        db.commit()
+        logging.debug(f"DB commit time: {time.time() - start_time_commit:.2f} seconds")
+    except(Exception) as e:
+        logging.info("this is process_repo error")
+        logging.error(e)
+    finally:
+        # logging.info("db.close")
+        db.close()
+
+
+def update_repo_by_multithreaded():
+    start_time = time.time()  # Start time for the total execution time
+
+    username = "datawhalechina"
+    Base.metadata.create_all(engine)
+    db = SessionLocal()
+
+    if os.path.exists("./github_data/all_issues.json"):
+        with open("./github_data/all_issues.json", "r", encoding="utf-8") as f:
+            all_issues = set(json.load(f))
+    else:
+        all_issues = set()
+
+    # Fetch information for all repositories
+    if not os.path.exists("./github_data/all_repos.json"):
+        repos = get_all_repos("datawhalechina")
+        save_info_to_json(repos, "./github_data/all_repos.json")
+    else:
+        with open("./github_data/all_repos.json", "r", encoding="utf-8") as f:
+            repos = json.load(f)
+
+    # Fetch all issues and PRs for each repository
+    if repos != "Error: Unable to fetch repositories":
+        logging.info(f"lth repos {len(repos)}")
+
+        with ThreadPoolExecutor(4) as executor:
+            futures = []
+            for repo in repos[:]:
+                futures.append(executor.submit(process_repo, repo, username, all_issues, db))
+
+            # Wait for all threads to complete
+            for future in futures:
+                future.result()
+
+    start_time_save = time.time()
+    save_list_to_json(all_issues, "./github_data/all_issues.json")
+    logging.info(f"save_list_to_json time: {time.time() - start_time_save:.2f} seconds")
+
+    db.close()
+
+    # Calculate total execution time
+    total_time = time.time() - start_time
+    logging.info(f"Total execution time: {total_time:.2f} seconds")
 
 
 if __name__ == "__main__":
     print("update repo begin")
-    update_repo()
+    # Use the current date as part of the log file name
+    log_filename = f"./data_logs/log_{datetime.now().strftime('%Y-%m-%d')}.txt"
+
+    logging.basicConfig(
+        filename=log_filename,  # the log file name includes the date
+        level=logging.INFO,
+        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+        encoding='utf-8'  # support Chinese characters
+    )
+    # Set the watchfiles logger to WARNING to suppress its INFO messages
+    logging.getLogger('watchfiles').setLevel(logging.WARNING)
+    update_repo_by_multithreaded()
+    # update_repo()
     # scheduler = BackgroundScheduler()
     # scheduler.add_job(
     #     update_repo,
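
For reference, here is a minimal, runnable sketch of the per-thread session pattern that the new process_repo / update_repo_by_multithreaded functions rely on. It is not part of the patch: it assumes SQLAlchemy 1.4+ and swaps the project's MySQL database for a throwaway SQLite file, and the Repo model, DB_FILE path, and repo names are illustrative stand-ins rather than names from this codebase.

    import os
    import time
    from concurrent.futures import ThreadPoolExecutor

    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker

    # A throwaway SQLite file stands in for the project's MySQL database (assumption).
    DB_FILE = "demo_threads.db"
    engine = create_engine(
        f"sqlite:///{DB_FILE}",
        pool_pre_ping=True,
        connect_args={"check_same_thread": False},
    )
    SessionLocal = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
    Base = declarative_base()


    class Repo(Base):
        __tablename__ = "repos"
        id = Column(Integer, primary_key=True)
        name = Column(String(100), unique=True)


    def process_repo(name: str) -> None:
        # scoped_session hands each worker thread its own Session, so the
        # threads never share connection state while inserting.
        db = SessionLocal()
        try:
            if not db.query(Repo).filter_by(name=name).first():
                db.add(Repo(name=name))
                db.commit()
        finally:
            db.close()
            SessionLocal.remove()  # discard the thread-local session


    if __name__ == "__main__":
        Base.metadata.create_all(engine)
        start = time.time()
        with ThreadPoolExecutor(4) as executor:  # same worker count as the patch
            futures = [executor.submit(process_repo, f"repo-{i}") for i in range(8)]
            for future in futures:
                future.result()  # surface any worker exception
        print(f"inserted {SessionLocal().query(Repo).count()} repos "
              f"in {time.time() - start:.2f} seconds")
        SessionLocal.remove()
        engine.dispose()
        os.remove(DB_FILE)

The point of scoped_session here is that each ThreadPoolExecutor worker calls SessionLocal() and receives a session bound to its own pooled connection, so concurrent inserts never share connection state; this mirrors why process_repo in the patch opens and closes its own SessionLocal() instead of reusing the session created in the main thread.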