diff --git a/crawlab/app.py b/crawlab/app.py
index b7a245b34..4eebc804d 100644
--- a/crawlab/app.py
+++ b/crawlab/app.py
@@ -88,17 +88,17 @@ def update_nodes_status_online(event):
     recv.capture(limit=None, timeout=None, wakeup=True)
 
-if __name__ == '__main__':
-    # create folder if it does not exist
-    if not os.path.exists(PROJECT_LOGS_FOLDER):
-        os.makedirs(PROJECT_LOGS_FOLDER)
+# run scheduler as a separate process
+scheduler.run()
 
-    # run scheduler as a separate process
-    scheduler.run()
+# monitor node status
+p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
+p_monitor.start()
 
-    # monitor node status
-    p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
-    p_monitor.start()
+# create folder if it does not exist
+if not os.path.exists(PROJECT_LOGS_FOLDER):
+    os.makedirs(PROJECT_LOGS_FOLDER)
 
+if __name__ == '__main__':
     # run app instance
     app.run(host=FLASK_HOST, port=FLASK_PORT, threaded=True)
diff --git a/crawlab/config/__init__.py b/crawlab/config/__init__.py
new file mode 100644
index 000000000..609b69de0
--- /dev/null
+++ b/crawlab/config/__init__.py
@@ -0,0 +1,10 @@
+# encoding: utf-8
+
+import os
+
+run_env = os.environ.get("RUNENV", "local")
+
+if run_env == "local":  # load the local config
+    from config.config_local import *
+else:
+    from config.config import *
diff --git a/crawlab/config.py b/crawlab/config/config.py
similarity index 51%
rename from crawlab/config.py
rename to crawlab/config/config.py
index 4ede83b78..afbcb9bf8 100644
--- a/crawlab/config.py
+++ b/crawlab/config/config.py
@@ -1,33 +1,55 @@
-# project variables
+import os
+
+BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
 # spider source code folder
-PROJECT_SOURCE_FILE_FOLDER = '../spiders'
+PROJECT_SOURCE_FILE_FOLDER = os.path.join(BASE_DIR, "spiders")
+
 # path to the python virtual environment
 PYTHON_ENV_PATH = '/Users/chennan/Desktop/2019/env/bin/python'
+
 # spider deploy folder
-PROJECT_DEPLOY_FILE_FOLDER = '../deployfile'
+# PROJECT_DEPLOY_FILE_FOLDER = '../deployfile'
+PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'
+# spider logs folder
 PROJECT_LOGS_FOLDER = '../deployfile/logs'
+
+# temporary folder for packaging
 PROJECT_TMP_FOLDER = '/tmp'
 
-# celery variables
+# Celery broker URL
 BROKER_URL = 'redis://127.0.0.1:6379/0'
+
+# Celery result backend URL
 CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017/'
+
+# Celery MongoDB settings
 CELERY_MONGODB_BACKEND_SETTINGS = {
     'database': 'crawlab_test',
     'taskmeta_collection': 'tasks_celery',
 }
+
+# Celery timezone
 CELERY_TIMEZONE = 'Asia/Shanghai'
+
+# whether to enable UTC
 CELERY_ENABLE_UTC = True
 
+# Celery Scheduler Redis URL
+CELERY_BEAT_SCHEDULER = 'utils.redisbeat.RedisScheduler'
+CELERY_REDIS_SCHEDULER_URL = 'redis://localhost:6379'
+CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_tasks'
+
 # flower variables
 FLOWER_API_ENDPOINT = 'http://localhost:5555/api'
 
-# database variables
+# MongoDB variables
 MONGO_HOST = '127.0.0.1'
 MONGO_PORT = 27017
 MONGO_DB = 'crawlab_test'
 
-# flask variables
+# Flask variables
 DEBUG = True
 FLASK_HOST = '127.0.0.1'
 FLASK_PORT = 8000
diff --git a/crawlab/config/config_local.py b/crawlab/config/config_local.py
new file mode 100644
index 000000000..afbcb9bf8
--- /dev/null
+++ b/crawlab/config/config_local.py
@@ -0,0 +1,55 @@
+import os
+
+BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+# spider source code folder
+PROJECT_SOURCE_FILE_FOLDER = os.path.join(BASE_DIR, "spiders")
+
+# path to the python virtual environment
+PYTHON_ENV_PATH = '/Users/chennan/Desktop/2019/env/bin/python'
+
+# spider deploy folder
+# PROJECT_DEPLOY_FILE_FOLDER = '../deployfile'
+PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'
+
+# spider logs folder
+PROJECT_LOGS_FOLDER = '../deployfile/logs'
+
+# temporary folder for packaging
+PROJECT_TMP_FOLDER = '/tmp'
+
+# Celery broker URL
+BROKER_URL = 'redis://127.0.0.1:6379/0'
+
+# Celery result backend URL
+CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017/'
+
+# Celery MongoDB settings
+CELERY_MONGODB_BACKEND_SETTINGS = {
+    'database': 'crawlab_test',
+    'taskmeta_collection': 'tasks_celery',
+}
+
+# Celery timezone
+CELERY_TIMEZONE = 'Asia/Shanghai'
+
+# whether to enable UTC
+CELERY_ENABLE_UTC = True
+
+# Celery Scheduler Redis URL
+CELERY_BEAT_SCHEDULER = 'utils.redisbeat.RedisScheduler'
+CELERY_REDIS_SCHEDULER_URL = 'redis://localhost:6379'
+CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_tasks'
+
+# flower variables
+FLOWER_API_ENDPOINT = 'http://localhost:5555/api'
+
+# MongoDB variables
+MONGO_HOST = '127.0.0.1'
+MONGO_PORT = 27017
+MONGO_DB = 'crawlab_test'
+
+# Flask variables
+DEBUG = True
+FLASK_HOST = '127.0.0.1'
+FLASK_PORT = 8000
diff --git a/crawlab/db/manager.py b/crawlab/db/manager.py
index d210b81c4..71ea9b2b0 100644
--- a/crawlab/db/manager.py
+++ b/crawlab/db/manager.py
@@ -28,7 +28,7 @@ def save(self, col_name: str, item: dict, **kwargs) -> None:
         if item.get('stats') is not None:
             item.pop('stats')
 
-        col.save(item, **kwargs)
+        return col.save(item, **kwargs)
 
     def remove(self, col_name: str, cond: dict, **kwargs) -> None:
         """
@@ -175,5 +175,9 @@ def aggregate(self, col_name: str, pipelines, **kwargs):
         col = self.db[col_name]
         return col.aggregate(pipelines, **kwargs)
 
+    def create_index(self, col_name: str, keys: dict, **kwargs):
+        col = self.db[col_name]
+        col.create_index(keys=keys, **kwargs)
+
 
 db_manager = DbManager()
diff --git a/crawlab/routes/base.py b/crawlab/routes/base.py
index 8a3d67090..1578b3f84 100644
--- a/crawlab/routes/base.py
+++ b/crawlab/routes/base.py
@@ -23,7 +23,7 @@ def __init__(self):
         super(BaseApi).__init__()
         self.parser.add_argument('page_num', type=int)
         self.parser.add_argument('page_size', type=int)
-        self.parser.add_argument('filter', type=dict)
+        self.parser.add_argument('filter', type=str)
         for arg, type in self.arguments:
             self.parser.add_argument(arg, type=type)
 
@@ -109,7 +109,7 @@ def put(self) -> (dict, tuple):
             item[k] = args.get(k)
         item = db_manager.save(col_name=self.col_name, item=item)
 
-        self.after_update(item._id)
+        self.after_update()
 
         return item
diff --git a/crawlab/routes/schedules.py b/crawlab/routes/schedules.py
index f966e2cb9..01db8be11 100644
--- a/crawlab/routes/schedules.py
+++ b/crawlab/routes/schedules.py
@@ -5,6 +5,7 @@
 from constants.task import TaskStatus
 from db.manager import db_manager
 from routes.base import BaseApi
+from tasks.scheduler import scheduler
 from utils import jsonify
 from utils.spider import get_spider_col_fields
 
@@ -16,5 +17,9 @@ class ScheduleApi(BaseApi):
         ('name', str),
         ('description', str),
         ('cron', str),
-        ('spider_id', str)
+        ('spider_id', str),
+        ('params', str)
     )
+
+    def after_update(self, id: str = None):
+        scheduler.update()
diff --git a/crawlab/routes/spiders.py b/crawlab/routes/spiders.py
index f36903e3f..157218ee5 100644
--- a/crawlab/routes/spiders.py
+++ b/crawlab/routes/spiders.py
@@ -21,7 +21,7 @@
 from utils import jsonify
 from utils.deploy import zip_file, unzip_file
 from utils.file import get_file_suffix_stats, get_file_suffix
-from utils.spider import get_lang_by_stats
+from utils.spider import get_lang_by_stats, get_last_n_run_errors_count, get_last_n_day_tasks_count
 
 parser = reqparse.RequestParser()
 parser.add_argument('file', type=FileStorage, location='files')
@@ -106,7 +106,7 @@ def get(self, id=None, action=None):
                 if spider is None:
                     stats = get_file_suffix_stats(dir_path)
                     lang = get_lang_by_stats(stats)
-                    db_manager.save('spiders', {
+                    spider = db_manager.save('spiders', {
                         'name': dir_name,
                         'src': dir_path,
                         'lang': lang,
@@ -137,6 +137,13 @@ def get(self, id=None, action=None):
                         'suffix_stats': stats,
                     })
 
+                # ---------
+                # stats
+                # ---------
+                # last 5-run errors
+                spider['last_5_errors'] = get_last_n_run_errors_count(spider_id=spider['_id'], n=5)
+                spider['last_7d_tasks'] = get_last_n_day_tasks_count(spider_id=spider['_id'], n=5)
+
                 # append spider
                 items.append(spider)
 
@@ -193,12 +200,19 @@ def on_crawl(self, id: str) -> (dict, tuple):
         :param id: spider_id
         :return:
         """
-        job = execute_spider.delay(id)
+        args = self.parser.parse_args()
+        params = args.get('params')
+
+        spider = db_manager.get('spiders', id=ObjectId(id))
+
+        job = execute_spider.delay(id, params)
 
         # create a new task
         db_manager.save('tasks', {
             '_id': job.id,
             'spider_id': ObjectId(id),
+            'cmd': spider.get('cmd'),
+            'params': params,
             'create_ts': datetime.utcnow(),
             'status': TaskStatus.PENDING
         })
diff --git a/crawlab/routes/tasks.py b/crawlab/routes/tasks.py
index 59e8469bf..2afb0cf96 100644
--- a/crawlab/routes/tasks.py
+++ b/crawlab/routes/tasks.py
@@ -42,9 +42,21 @@ def get(self, id: str = None, action: str = None):
         elif id is not None:
             task = db_manager.get(col_name=self.col_name, id=id)
             spider = db_manager.get(col_name='spiders', id=str(task['spider_id']))
-            task['spider_name'] = spider['name']
+
+            # spider
+            task['num_results'] = 0
+            if spider:
+                task['spider_name'] = spider['name']
+                if spider.get('col'):
+                    col = spider.get('col')
+                    num_results = db_manager.count(col, {'task_id': task['_id']})
+                    task['num_results'] = num_results
+
+            # duration
             if task.get('finish_ts') is not None:
                 task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds()
+                task['avg_num_results'] = round(task['num_results'] / task['duration'], 1)
+
             try:
                 with open(task['log_file_path']) as f:
                     task['log'] = f.read()
@@ -56,20 +68,48 @@ def get(self, id: str = None, action: str = None):
         args = self.parser.parse_args()
         page_size = args.get('page_size') or 10
         page_num = args.get('page_num') or 1
-        tasks = db_manager.list(col_name=self.col_name, cond={}, limit=page_size, skip=page_size * (page_num - 1),
+        filter_str = args.get('filter')
+        filter_ = {}
+        if filter_str is not None:
+            filter_ = json.loads(filter_str)
+            if filter_.get('spider_id'):
+                filter_['spider_id'] = ObjectId(filter_['spider_id'])
+        tasks = db_manager.list(col_name=self.col_name, cond=filter_, limit=page_size, skip=page_size * (page_num - 1),
                                 sort_key='create_ts')
         items = []
         for task in tasks:
+            # celery tasks
             # _task = db_manager.get('tasks_celery', id=task['_id'])
+
+            # get spider
             _spider = db_manager.get(col_name='spiders', id=str(task['spider_id']))
+
+            # status
             if task.get('status') is None:
                 task['status'] = TaskStatus.UNAVAILABLE
+
+            # spider
+            task['num_results'] = 0
             if _spider:
+                # spider name
                 task['spider_name'] = _spider['name']
+
+                # number of results
+                if _spider.get('col'):
+                    col = _spider.get('col')
+                    num_results = db_manager.count(col, {'task_id': task['_id']})
+                    task['num_results'] = num_results
+
+            # duration
+            if task.get('finish_ts') is not None:
+                task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds()
+                task['avg_num_results'] = round(task['num_results'] / task['duration'], 1)
+
             items.append(task)
+
         return {
             'status': 'ok',
-            'total_count': db_manager.count('tasks', {}),
+            'total_count': db_manager.count('tasks', filter_),
             'page_num': page_num,
             'page_size': page_size,
             'items': jsonify(items)
diff --git a/crawlab/tasks/scheduler.py b/crawlab/tasks/scheduler.py
index da6303c9a..55e8fc36d 100644
--- a/crawlab/tasks/scheduler.py
+++ b/crawlab/tasks/scheduler.py
@@ -2,7 +2,6 @@
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.jobstores.mongodb import MongoDBJobStore
 from pymongo import MongoClient
-from flask import current_app
 
 from config import MONGO_DB, MONGO_HOST, MONGO_PORT, FLASK_HOST, FLASK_PORT
 from constants.spider import CronEnabled
@@ -11,37 +10,36 @@ class Scheduler(object):
     mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
 
+    task_col = 'apscheduler_jobs'
+
     # scheduler jobstore
     jobstores = {
         'mongo': MongoDBJobStore(database=MONGO_DB,
-                                 collection='apscheduler_jobs',
+                                 collection=task_col,
                                  client=mongo)
     }
 
+    # scheduler instance
     scheduler = BackgroundScheduler(jobstores=jobstores)
 
-    def execute_spider(self, id: str):
+    def execute_spider(self, id: str, params: str = None):
+        query = {}
+        if params is not None:
+            query['params'] = params
         r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % (
             FLASK_HOST,
             FLASK_PORT,
             id
-        ))
-
-    def restart(self):
-        self.scheduler.shutdown()
-        self.scheduler.start()
-        current_app.logger.info('restarted')
+        ), query)
 
     def update(self):
-        current_app.logger.info('updating...')
-
         # remove all existing periodic jobs
         self.scheduler.remove_all_jobs()
+        self.mongo[MONGO_DB][self.task_col].remove()
 
-        # add new periodic jobs from database
-        spiders = db_manager.list('spiders', {'cron_enabled': CronEnabled.ON})
-        for spider in spiders:
-            cron = spider.get('cron')
+        periodical_tasks = db_manager.list('schedules', {})
+        for task in periodical_tasks:
+            cron = task.get('cron')
             cron_arr = cron.split(' ')
             second = cron_arr[0]
             minute = cron_arr[1]
@@ -49,13 +47,17 @@ def update(self):
             day = cron_arr[3]
             month = cron_arr[4]
             day_of_week = cron_arr[5]
-            self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(spider['_id']),),
+            self.scheduler.add_job(func=self.execute_spider,
+                                   args=(str(task['spider_id']), task.get('params'),),
+                                   trigger='cron',
                                    jobstore='mongo',
-                                   day_of_week=day_of_week, month=month, day=day, hour=hour, minute=minute,
+                                   day_of_week=day_of_week,
+                                   month=month,
+                                   day=day,
+                                   hour=hour,
+                                   minute=minute,
                                    second=second)
 
-        current_app.logger.info('updated')
-
     def run(self):
         self.update()
         self.scheduler.start()
diff --git a/crawlab/tasks/spider.py b/crawlab/tasks/spider.py
index 3413a021c..0d843e222 100644
--- a/crawlab/tasks/spider.py
+++ b/crawlab/tasks/spider.py
@@ -1,7 +1,10 @@
 import os
 from datetime import datetime
+from time import sleep
 
 from bson import ObjectId
+from pymongo import ASCENDING, DESCENDING
+
 from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER, PYTHON_ENV_PATH
 from constants.task import TaskStatus
 from db.manager import db_manager
@@ -10,8 +13,19 @@
 from utils.log import other as logger
 
 
+def get_task(id: str):
+    i = 0
+    while i < 5:
+        task = db_manager.get('tasks', id=id)
+        if task is not None:
+            return task
+        i += 1
+        sleep(1)
+    return None
+
+
 @celery_app.task(bind=True)
-def execute_spider(self, id: str):
+def execute_spider(self, id: str, params: str = None):
     """
     Execute spider task.
     :param self:
@@ -23,7 +37,15 @@ def execute_spider(self, id: str):
     command = spider.get('cmd')
     if command.startswith("env"):
         command = PYTHON_ENV_PATH + command.replace("env", "")
+    if params is not None:
+        command += ' ' + params
+
+    # get task object and return if not found
+    task = get_task(task_id)
+    if task is None:
+        return
+
     # current working directory
     current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id')))
 
     # log info
@@ -43,7 +65,7 @@ def execute_spider(self, id: str):
     stdout = open(log_file_path, 'a')
     stderr = open(log_file_path, 'a')
 
-    # create a new task
+    # update task status as started
    db_manager.update_one('tasks', id=task_id, values={
         'start_ts': datetime.utcnow(),
         'node_id': hostname,
@@ -67,8 +89,13 @@ def execute_spider(self, id: str):
     if spider.get('col'):
         env['CRAWLAB_COLLECTION'] = spider.get('col')
 
+        # create index to speed results data retrieval
+        db_manager.create_index(spider.get('col'), [('task_id', ASCENDING)])
+
     # start process
-    p = subprocess.Popen(command.split(' '),
+    cmd_arr = command.split(' ')
+    cmd_arr = list(filter(lambda x: x != '', cmd_arr))
+    p = subprocess.Popen(cmd_arr,
                          stdout=stdout.fileno(),
                          stderr=stderr.fileno(),
                          cwd=current_working_directory,
@@ -87,9 +114,6 @@ def execute_spider(self, id: str):
 
     # save task when the task is finished
     db_manager.update_one('tasks', id=task_id, values={
-        'node_id': hostname,
-        'hostname': hostname,
-        'log_file_path': log_file_path,
         'finish_ts': datetime.utcnow(),
         'status': status
     })
diff --git a/crawlab/utils/node.py b/crawlab/utils/node.py
index 3a0b7b920..6e40bc2bf 100644
--- a/crawlab/utils/node.py
+++ b/crawlab/utils/node.py
@@ -24,7 +24,11 @@ def update_nodes_status(refresh=False):
     url = '%s/workers?status=1' % FLOWER_API_ENDPOINT
     if refresh:
         url += '&refresh=1'
+
     res = requests.get(url)
+    if res.status_code != 200:
+        return online_node_ids
+
     for k, v in json.loads(res.content.decode('utf-8')).items():
         node_name = k
         node_status = NodeStatus.ONLINE if v else NodeStatus.OFFLINE
diff --git a/crawlab/utils/spider.py b/crawlab/utils/spider.py
index 0a45d28f8..6f7d4ef67 100644
--- a/crawlab/utils/spider.py
+++ b/crawlab/utils/spider.py
@@ -1,6 +1,10 @@
 import os
+from datetime import datetime, timedelta
+
+from bson import ObjectId
 
 from constants.spider import FILE_SUFFIX_LANG_MAPPING, LangType, SUFFIX_IGNORE, SpiderType
+from constants.task import TaskStatus
 from db.manager import db_manager
 
 
@@ -43,3 +47,25 @@ def get_spider_col_fields(col_name: str) -> list:
         for k in item.keys():
             fields.add(k)
     return list(fields)
+
+
+def get_last_n_run_errors_count(spider_id: ObjectId, n: int) -> list:
+    tasks = db_manager.list(col_name='tasks',
+                            cond={'spider_id': spider_id},
+                            sort_key='create_ts',
+                            limit=n)
+    count = 0
+    for task in tasks:
+        if task['status'] == TaskStatus.FAILURE:
+            count += 1
+    return count
+
+
+def get_last_n_day_tasks_count(spider_id: ObjectId, n: int) -> list:
+    return db_manager.count(col_name='tasks',
+                            cond={
+                                'spider_id': spider_id,
+                                'create_ts': {
+                                    '$gte': (datetime.now() - timedelta(n))
+                                }
+                            })
diff --git a/frontend/src/components/InfoView/SpiderInfoView.vue b/frontend/src/components/InfoView/SpiderInfoView.vue
index a02088ada..6bc1a1578 100644
--- a/frontend/src/components/InfoView/SpiderInfoView.vue
+++ b/frontend/src/components/InfoView/SpiderInfoView.vue
@@ -38,26 +38,26 @@
diff --git a/frontend/src/components/InfoView/TaskInfoView.vue b/frontend/src/components/InfoView/TaskInfoView.vue
index 8b6fdd16a..92ad58690 100644
--- a/frontend/src/components/InfoView/TaskInfoView.vue
+++ b/frontend/src/components/InfoView/TaskInfoView.vue
@@ -30,6 +30,13 @@
     {{taskForm.log}}
diff --git a/frontend/src/components/TableView/TaskTableView.vue b/frontend/src/components/TableView/TaskTableView.vue
index 6325f8637..7faefbce2 100644
--- a/frontend/src/components/TableView/TaskTableView.vue
+++ b/frontend/src/components/TableView/TaskTableView.vue
@@ -43,6 +43,12 @@ import {
 export default {
   name: 'TaskTableView',
+  data () {
+    return {
+      // setInterval handle
+      handle: undefined
+    }
+  },
   props: {
     title: String
   },
@@ -71,6 +77,14 @@ export default {
         this.$store.dispatch('node/getTaskList', this.$route.params.id)
       }
     }
+  },
+  mounted () {
+    this.handle = setInterval(() => {
+      this.onRefresh()
+    }, 5000)
+  },
+  destroyed () {
+    clearInterval(this.handle)
   }
 }
diff --git a/frontend/src/i18n/zh.js b/frontend/src/i18n/zh.js
index 7e127114b..13baddc7c 100644
--- a/frontend/src/i18n/zh.js
+++ b/frontend/src/i18n/zh.js
@@ -86,6 +86,8 @@ export default {
   'Variable': '变量',
   'Value': '值',
   'Add Environment Variables': '添加环境变量',
+  'Last 7-Day Tasks': '最近7天任务数',
+  'Last 5-Run Errors': '最近5次运行错误数',
 
   // spider list
   'Name': '名称',
@@ -101,6 +103,8 @@
   'Finish Timestamp': '完成时间',
   'Duration (sec)': '用时(秒)',
   'Error Message': '错误信息',
+  'Results Count': '结果数',
+  'Average Results Count per Second': '抓取速度(个/秒)',
 
   // task list
   'Node': '节点',
@@ -111,6 +115,12 @@
   // deploys
   'Time': '时间',
 
+  // schedules
+  'Schedule Name': '定时任务名称',
+  'Schedule Description': '定时任务描述',
+  'Parameters': '参数',
+  'Add Schedule': '添加定时任务',
+
   // files
   'Choose Folder': '选择文件',
diff --git a/frontend/src/store/modules/task.js b/frontend/src/store/modules/task.js
index c421a79b7..02a238411 100644
--- a/frontend/src/store/modules/task.js
+++ b/frontend/src/store/modules/task.js
@@ -9,6 +9,11 @@ const state = {
   taskResultsData: [],
   taskResultsColumns: [],
   taskResultsTotalCount: 0,
+  // filter
+  filter: {
+    node_id: '',
+    spider_id: ''
+  },
   // pagination
   pageNum: 0,
   pageSize: 10,
@@ -68,7 +73,11 @@ const actions = {
   getTaskList ({ state, commit }) {
     return request.get('/tasks', {
       page_num: state.pageNum,
-      page_size: state.pageSize
+      page_size: state.pageSize,
+      filter: {
+        node_id: state.filter.node_id || undefined,
+        spider_id: state.filter.spider_id || undefined
+      }
     })
       .then(response => {
         commit('SET_TASK_LIST', response.data.items)
diff --git a/frontend/src/views/schedule/ScheduleList.vue b/frontend/src/views/schedule/ScheduleList.vue
index 3b1e33070..7bcdcdf0b 100644
--- a/frontend/src/views/schedule/ScheduleList.vue
+++ b/frontend/src/views/schedule/ScheduleList.vue
@@ -31,6 +31,15 @@
@@ -130,6 +139,14 @@ export default {
     ]),
     filteredTableData () {
       return this.scheduleList
+    },
+    spider () {
+      for (let i = 0; i < this.spiderList.length; i++) {
+        if (this.spiderList[i]._id === this.scheduleForm.spider_id) {
+          return this.spiderList[i]
+        }
+      }
+      return {}
     }
   },
   methods: {
diff --git a/frontend/src/views/spider/SpiderList.vue b/frontend/src/views/spider/SpiderList.vue
index 60785efe4..14a9ffea7 100644
--- a/frontend/src/views/spider/SpiderList.vue
+++ b/frontend/src/views/spider/SpiderList.vue
@@ -84,6 +84,17 @@
     {{scope.row.lang}}