Merge pull request #23 from tikazyq/develop
Develop
Marvin Zhang authored Apr 23, 2019
2 parents 4d70543 + ae29708 commit 8b7066d
Showing 21 changed files with 401 additions and 91 deletions (9 of the changed files are rendered below).
18 changes: 9 additions & 9 deletions crawlab/app.py
@@ -88,17 +88,17 @@ def update_nodes_status_online(event):
         recv.capture(limit=None, timeout=None, wakeup=True)


-if __name__ == '__main__':
-    # create folder if it does not exist
-    if not os.path.exists(PROJECT_LOGS_FOLDER):
-        os.makedirs(PROJECT_LOGS_FOLDER)
+# run scheduler as a separate process
+scheduler.run()

-    # run scheduler as a separate process
-    scheduler.run()
+# monitor node status
+p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
+p_monitor.start()

-    # monitor node status
-    p_monitor = Process(target=monitor_nodes_status, args=(celery_app,))
-    p_monitor.start()
+# create folder if it does not exist
+if not os.path.exists(PROJECT_LOGS_FOLDER):
+    os.makedirs(PROJECT_LOGS_FOLDER)

+if __name__ == '__main__':
     # run app instance
     app.run(host=FLASK_HOST, port=FLASK_PORT, threaded=True)
10 changes: 10 additions & 0 deletions crawlab/config/__init__.py
@@ -0,0 +1,10 @@
# encoding: utf-8

import os

run_env = os.environ.get("RUNENV", "local")

if run_env == "local": # load local config
from config.config_local import *
else:
from config.config import *
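
The new config/__init__.py switches settings on the RUNENV environment variable: any value other than "local" falls through to the server settings in config/config.py, otherwise config/config_local.py is used. A minimal usage sketch, assuming it runs from the crawlab/ directory so the config package is importable; the "production" value is only an example (any non-"local" string behaves the same), and the variable must be set before config is first imported:

import os

os.environ['RUNENV'] = 'production'       # anything other than 'local' selects config/config.py
import config                             # star-imports the chosen settings module

print(config.PROJECT_DEPLOY_FILE_FOLDER)  # '/var/crawlab' with the server config shown below
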
34 changes: 28 additions & 6 deletions crawlab/config.py → crawlab/config/config.py
@@ -1,33 +1,55 @@
# project variables
import os

BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# spider source code path
PROJECT_SOURCE_FILE_FOLDER = '../spiders'
PROJECT_SOURCE_FILE_FOLDER = os.path.join(BASE_DIR, "spiders")

# path to the Python virtual environment
PYTHON_ENV_PATH = '/Users/chennan/Desktop/2019/env/bin/python'

# spider deployment path
PROJECT_DEPLOY_FILE_FOLDER = '../deployfile'
# PROJECT_DEPLOY_FILE_FOLDER = '../deployfile'
PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'

# spider logs path
PROJECT_LOGS_FOLDER = '../deployfile/logs'

# temporary folder for packaging
PROJECT_TMP_FOLDER = '/tmp'

# celery variables
# Celery broker URL
BROKER_URL = 'redis://127.0.0.1:6379/0'

# Celery result backend URL
CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017/'

# Celery MongoDB settings
CELERY_MONGODB_BACKEND_SETTINGS = {
'database': 'crawlab_test',
'taskmeta_collection': 'tasks_celery',
}

# Celery timezone
CELERY_TIMEZONE = 'Asia/Shanghai'

# whether to enable UTC
CELERY_ENABLE_UTC = True

# Celery Scheduler Redis URL
CELERY_BEAT_SCHEDULER = 'utils.redisbeat.RedisScheduler'
CELERY_REDIS_SCHEDULER_URL = 'redis://localhost:6379'
CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_tasks'

# flower variables
FLOWER_API_ENDPOINT = 'http://localhost:5555/api'

# database variables
# MongoDB variables
MONGO_HOST = '127.0.0.1'
MONGO_PORT = 27017
MONGO_DB = 'crawlab_test'

# flask variables
# Flask variables
DEBUG = True
FLASK_HOST = '127.0.0.1'
FLASK_PORT = 8000
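
The new BASE_DIR expression walks three directory levels up from config/config.py, so PROJECT_SOURCE_FILE_FOLDER now resolves to an absolute spiders/ folder at the repository root rather than the old cwd-relative '../spiders'. A quick sketch of the resolution, with a purely illustrative checkout path:

import os

config_file = '/opt/crawlab/crawlab/config/config.py'    # illustrative location of this file

base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(config_file))))
print(base_dir)                                           # /opt/crawlab  (config/ -> crawlab/ -> repo root)
print(os.path.join(base_dir, 'spiders'))                  # /opt/crawlab/spiders
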
55 changes: 55 additions & 0 deletions crawlab/config/config_local.py
@@ -0,0 +1,55 @@
import os

BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# spider source code path
PROJECT_SOURCE_FILE_FOLDER = os.path.join(BASE_DIR, "spiders")

# path to the Python virtual environment
PYTHON_ENV_PATH = '/Users/chennan/Desktop/2019/env/bin/python'

# spider deployment path
# PROJECT_DEPLOY_FILE_FOLDER = '../deployfile'
PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab'

# spider logs path
PROJECT_LOGS_FOLDER = '../deployfile/logs'

# temporary folder for packaging
PROJECT_TMP_FOLDER = '/tmp'

# Celery broker URL
BROKER_URL = 'redis://127.0.0.1:6379/0'

# Celery result backend URL
CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017/'

# Celery MongoDB settings
CELERY_MONGODB_BACKEND_SETTINGS = {
'database': 'crawlab_test',
'taskmeta_collection': 'tasks_celery',
}

# Celery timezone
CELERY_TIMEZONE = 'Asia/Shanghai'

# whether to enable UTC
CELERY_ENABLE_UTC = True

# Celery Scheduler Redis URL
CELERY_BEAT_SCHEDULER = 'utils.redisbeat.RedisScheduler'
CELERY_REDIS_SCHEDULER_URL = 'redis://localhost:6379'
CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_tasks'

# flower variables
FLOWER_API_ENDPOINT = 'http://localhost:5555/api'

# MongoDB variables
MONGO_HOST = '127.0.0.1'
MONGO_PORT = 27017
MONGO_DB = 'crawlab_test'

# Flask variables
DEBUG = True
FLASK_HOST = '127.0.0.1'
FLASK_PORT = 8000
6 changes: 5 additions & 1 deletion crawlab/db/manager.py
@@ -28,7 +28,7 @@ def save(self, col_name: str, item: dict, **kwargs) -> None:
if item.get('stats') is not None:
item.pop('stats')

col.save(item, **kwargs)
return col.save(item, **kwargs)

def remove(self, col_name: str, cond: dict, **kwargs) -> None:
"""
@@ -175,5 +175,9 @@ def aggregate(self, col_name: str, pipelines, **kwargs):
col = self.db[col_name]
return col.aggregate(pipelines, **kwargs)

def create_index(self, col_name: str, keys: dict, **kwargs):
col = self.db[col_name]
col.create_index(keys=keys, **kwargs)


db_manager = DbManager()
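
A sketch of how the new create_index helper might be used; the collection and key below are hypothetical examples rather than calls made in this commit. pymongo's Collection.create_index accepts a single field name or a list of (field, direction) pairs, so the list form is used here even though the wrapper's type hint says dict:

# hypothetical: index tasks by spider_id to speed up the per-spider filtering added in routes/tasks.py
db_manager.create_index('tasks', [('spider_id', 1)], background=True)
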
4 changes: 2 additions & 2 deletions crawlab/routes/base.py
@@ -23,7 +23,7 @@ def __init__(self):
super(BaseApi).__init__()
self.parser.add_argument('page_num', type=int)
self.parser.add_argument('page_size', type=int)
self.parser.add_argument('filter', type=dict)
self.parser.add_argument('filter', type=str)

for arg, type in self.arguments:
self.parser.add_argument(arg, type=type)
@@ -109,7 +109,7 @@ def put(self) -> (dict, tuple):
item[k] = args.get(k)
item = db_manager.save(col_name=self.col_name, item=item)

self.after_update(item._id)
self.after_update()

return item

7 changes: 6 additions & 1 deletion crawlab/routes/schedules.py
@@ -5,6 +5,7 @@
from constants.task import TaskStatus
from db.manager import db_manager
from routes.base import BaseApi
from tasks.scheduler import scheduler
from utils import jsonify
from utils.spider import get_spider_col_fields

@@ -16,5 +17,9 @@ class ScheduleApi(BaseApi):
('name', str),
('description', str),
('cron', str),
('spider_id', str)
('spider_id', str),
('params', str)
)

def after_update(self, id: str = None):
scheduler.update()
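
Together with the routes/base.py change above, this gives ScheduleApi a post-save hook: BaseApi.put() now calls self.after_update() after persisting the item, and the override here asks the beat scheduler to reload so cron edits take effect. A stripped-down, runnable sketch of the pattern, with stub objects standing in for the real scheduler and database call:

class _StubScheduler:
    def update(self):
        print('scheduler reloaded')

scheduler = _StubScheduler()           # stand-in for tasks.scheduler.scheduler

class BaseApi:
    def _save(self):
        return {'_id': 'example'}      # stand-in for the db_manager.save(...) call in put()

    def put(self):
        item = self._save()
        self.after_update()            # post-save hook; assumed to be a no-op unless overridden
        return item

    def after_update(self, id: str = None):
        pass

class ScheduleApi(BaseApi):
    def after_update(self, id: str = None):
        scheduler.update()             # refresh schedules so a new or edited cron entry is picked up

ScheduleApi().put()                    # prints 'scheduler reloaded'
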
20 changes: 17 additions & 3 deletions crawlab/routes/spiders.py
@@ -21,7 +21,7 @@
from utils import jsonify
from utils.deploy import zip_file, unzip_file
from utils.file import get_file_suffix_stats, get_file_suffix
from utils.spider import get_lang_by_stats
from utils.spider import get_lang_by_stats, get_last_n_run_errors_count, get_last_n_day_tasks_count

parser = reqparse.RequestParser()
parser.add_argument('file', type=FileStorage, location='files')
@@ -106,7 +106,7 @@ def get(self, id=None, action=None):
if spider is None:
stats = get_file_suffix_stats(dir_path)
lang = get_lang_by_stats(stats)
db_manager.save('spiders', {
spider = db_manager.save('spiders', {
'name': dir_name,
'src': dir_path,
'lang': lang,
@@ -137,6 +137,13 @@ def get(self, id=None, action=None):
'suffix_stats': stats,
})

# ---------
# stats
# ---------
# last 5-run errors
spider['last_5_errors'] = get_last_n_run_errors_count(spider_id=spider['_id'], n=5)
spider['last_7d_tasks'] = get_last_n_day_tasks_count(spider_id=spider['_id'], n=5)

# append spider
items.append(spider)

@@ -193,12 +200,19 @@ def on_crawl(self, id: str) -> (dict, tuple):
:param id: spider_id
:return:
"""
job = execute_spider.delay(id)
args = self.parser.parse_args()
params = args.get('params')

spider = db_manager.get('spiders', id=ObjectId(id))

job = execute_spider.delay(id, params)

# create a new task
db_manager.save('tasks', {
'_id': job.id,
'spider_id': ObjectId(id),
'cmd': spider.get('cmd'),
'params': params,
'create_ts': datetime.utcnow(),
'status': TaskStatus.PENDING
})
46 changes: 43 additions & 3 deletions crawlab/routes/tasks.py
@@ -42,9 +42,21 @@ def get(self, id: str = None, action: str = None):
elif id is not None:
task = db_manager.get(col_name=self.col_name, id=id)
spider = db_manager.get(col_name='spiders', id=str(task['spider_id']))
task['spider_name'] = spider['name']

# spider
task['num_results'] = 0
if spider:
task['spider_name'] = spider['name']
if spider.get('col'):
col = spider.get('col')
num_results = db_manager.count(col, {'task_id': task['_id']})
task['num_results'] = num_results

# duration
if task.get('finish_ts') is not None:
task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds()
task['avg_num_results'] = round(task['num_results'] / task['duration'], 1)

try:
with open(task['log_file_path']) as f:
task['log'] = f.read()
@@ -56,20 +68,48 @@
args = self.parser.parse_args()
page_size = args.get('page_size') or 10
page_num = args.get('page_num') or 1
tasks = db_manager.list(col_name=self.col_name, cond={}, limit=page_size, skip=page_size * (page_num - 1),
filter_str = args.get('filter')
filter_ = {}
if filter_str is not None:
filter_ = json.loads(filter_str)
if filter_.get('spider_id'):
filter_['spider_id'] = ObjectId(filter_['spider_id'])
tasks = db_manager.list(col_name=self.col_name, cond=filter_, limit=page_size, skip=page_size * (page_num - 1),
sort_key='create_ts')
items = []
for task in tasks:
# celery tasks
# _task = db_manager.get('tasks_celery', id=task['_id'])

# get spider
_spider = db_manager.get(col_name='spiders', id=str(task['spider_id']))

# status
if task.get('status') is None:
task['status'] = TaskStatus.UNAVAILABLE

# spider
task['num_results'] = 0
if _spider:
# spider name
task['spider_name'] = _spider['name']

# number of results
if _spider.get('col'):
col = _spider.get('col')
num_results = db_manager.count(col, {'task_id': task['_id']})
task['num_results'] = num_results

# duration
if task.get('finish_ts') is not None:
task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds()
task['avg_num_results'] = round(task['num_results'] / task['duration'], 1)

items.append(task)

return {
'status': 'ok',
'total_count': db_manager.count('tasks', {}),
'total_count': db_manager.count('tasks', filter_),
'page_num': page_num,
'page_size': page_size,
'items': jsonify(items)
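
With the filter argument now parsed as a JSON-encoded string (its reqparse type was switched from dict to str in routes/base.py above) and spider_id cast to ObjectId, a client can scope the task list to a single spider. A sketch of such a request, assuming the default Flask host/port from the config and an /api/tasks route; the URL and spider id are illustrative:

import json
import requests

params = {
    'page_num': 1,
    'page_size': 10,
    'filter': json.dumps({'spider_id': '5cbe37d9c2a4d30f8ea2514b'}),  # illustrative ObjectId string
}
resp = requests.get('http://127.0.0.1:8000/api/tasks', params=params)  # assumed route
data = resp.json()
print(data['total_count'], len(data['items']))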