From f3e30ca4aa2f0ed9f9c6f6605c364dc36b877642 Mon Sep 17 00:00:00 2001 From: Jat Date: Fri, 10 Feb 2023 12:48:53 +0800 Subject: [PATCH] update ReturnCode Signed-off-by: Jat --- python/fate_flow/apps/client/data_app.py | 6 +- python/fate_flow/apps/client/job_app.py | 10 +-- python/fate_flow/apps/client/output_app.py | 16 ++--- python/fate_flow/apps/client/site_app.py | 6 +- python/fate_flow/apps/partner/partner_app.py | 52 +++++++------- python/fate_flow/apps/worker/worker_app.py | 10 +-- python/fate_flow/controller/job_controller.py | 2 +- python/fate_flow/entity/__init__.py | 1 - python/fate_flow/entity/types.py | 67 ++++++------------- .../scheduler/federated_scheduler.py | 5 +- python/fate_flow/scheduler/job_scheduler.py | 14 ++-- python/fate_flow/scheduler/task_scheduler.py | 4 +- python/fate_flow/utils/api_utils.py | 11 ++- 13 files changed, 89 insertions(+), 115 deletions(-) diff --git a/python/fate_flow/apps/client/data_app.py b/python/fate_flow/apps/client/data_app.py index 12be05b35..516ad261a 100644 --- a/python/fate_flow/apps/client/data_app.py +++ b/python/fate_flow/apps/client/data_app.py @@ -15,7 +15,7 @@ # from webargs import fields -from fate_flow.entity.types import Code +from fate_flow.entity.types import ReturnCode from fate_flow.utils.api_utils import get_json_result, validate_request_json, validate_request_params from fate_flow.utils.data_upload import Upload, UploadParam @@ -30,10 +30,10 @@ def upload_data(file, head, partitions, namespace, name, meta, destroy=False, storage_engine=""): data = Upload().run(parameters=UploadParam(file=file, head=head, partitions=partitions, namespace=namespace, name=name, storage_engine=storage_engine, meta=meta, destroy=destroy)) - return get_json_result(code=Code.SUCCESS, message="success", data=data) + return get_json_result(code=ReturnCode.Base.SUCCESS, message="success", data=data) @manager.route('/download', methods=['GET']) @validate_request_params(name=fields.String(required=True), namespace=fields.String(required=True)) def download(name, namespace): - return get_json_result(code=Code.SUCCESS, message="success") \ No newline at end of file + return get_json_result(code=ReturnCode.Base.SUCCESS, message="success") diff --git a/python/fate_flow/apps/client/job_app.py b/python/fate_flow/apps/client/job_app.py index ea98fe1c5..505fcf529 100644 --- a/python/fate_flow/apps/client/job_app.py +++ b/python/fate_flow/apps/client/job_app.py @@ -33,8 +33,8 @@ def submit_job(dag_schema): def query_job(job_id=None, role=None, party_id=None, status=None): jobs = JobController.query_job(job_id=job_id, role=role, party_id=party_id, status=status) if not jobs: - return get_json_result(code=ReturnCode.JOB.NO_FOUND, message="no found job") - return get_json_result(code=ReturnCode.JOB.SUCCESS, message="success", + return get_json_result(code=ReturnCode.Job.NOT_FOUND, message="job no found") + return get_json_result(code=ReturnCode.Base.SUCCESS, message="success", data=[job.to_human_model_dict() for job in jobs]) @@ -47,8 +47,8 @@ def query_task(job_id=None, role=None, party_id=None, status=None, task_name=Non tasks = JobController.query_tasks(job_id=job_id, role=role, party_id=party_id, status=status, task_name=task_name, task_id=task_id, task_version=task_version) if not tasks: - return get_json_result(code=ReturnCode.TASK.NO_FOUND, message="no found task") - return get_json_result(code=ReturnCode.TASK.SUCCESS, message="success", + return get_json_result(code=ReturnCode.Task.NOT_FOUND, message="task no found") + return get_json_result(code=ReturnCode.Base.SUCCESS, message="success", data=[task.to_human_model_dict() for task in tasks]) @@ -64,6 +64,6 @@ def request_stop_job(job_id=None): def request_rerun_job(job_id=None): jobs = JobController.query_job(job_id=job_id) if not jobs: - return get_json_result(code=ReturnCode.JOB.NO_FOUND, message="no found job") + return get_json_result(code=ReturnCode.Job.NOT_FOUND, message="job not found") rerun_result = JobController.request_rerun_job(job=jobs[0]) return get_json_result(**rerun_result) diff --git a/python/fate_flow/apps/client/output_app.py b/python/fate_flow/apps/client/output_app.py index 220ceb02d..54eb815a6 100644 --- a/python/fate_flow/apps/client/output_app.py +++ b/python/fate_flow/apps/client/output_app.py @@ -15,11 +15,11 @@ # from webargs import fields -from fate_flow.entity.types import ReturnCode, Code +from fate_flow.entity.types import ReturnCode from fate_flow.manager.model_manager import PipelinedModel from fate_flow.manager.output_manager import OutputMetric from fate_flow.operation.job_saver import JobSaver -from fate_flow.utils.api_utils import get_json_result, validate_request_json, validate_request_params +from fate_flow.utils.api_utils import get_json_result, validate_request_params @manager.route('/metric/key/query', methods=['GET']) @@ -28,10 +28,10 @@ def query_metric_key(job_id, role, party_id, task_name): tasks = JobSaver.query_task(job_id=job_id, role=role, party_id=party_id, task_name=task_name) if not tasks: - return get_json_result(code=ReturnCode.TASK.NO_FOUND, message="no found task") + return get_json_result(code=ReturnCode.Task.NOT_FOUND, message="task not found") metric_keys = OutputMetric(job_id=job_id, role=role, party_id=party_id, task_name=task_name, task_id=tasks[0].f_task_id, task_version=tasks[0].f_task_version).query_metric_keys() - return get_json_result(code=Code.SUCCESS, message='success', data=metric_keys) + return get_json_result(code=ReturnCode.Base.SUCCESS, message='success', data=metric_keys) @manager.route('/metric/query', methods=['GET']) @@ -41,10 +41,10 @@ def query_metric_key(job_id, role, party_id, task_name): def query_metric(job_id, role, party_id, task_name, filters=None): tasks = JobSaver.query_task(job_id=job_id, role=role, party_id=party_id, task_name=task_name) if not tasks: - return get_json_result(code=ReturnCode.TASK.NO_FOUND, message="no found task") + return get_json_result(code=ReturnCode.Task.NOT_FOUND, message="task not found") metrics = OutputMetric(job_id=job_id, role=role, party_id=party_id, task_name=task_name, task_id=tasks[0].f_task_id, task_version=tasks[0].f_task_version).read_metrics(filters) - return get_json_result(code=Code.SUCCESS, message='success', data=metrics) + return get_json_result(code=ReturnCode.Base.SUCCESS, message='success', data=metrics) @manager.route('/model/query', methods=['GET']) @@ -53,7 +53,7 @@ def query_metric(job_id, role, party_id, task_name, filters=None): def query_model(job_id, role, party_id, task_name): tasks = JobSaver.query_task(job_id=job_id, role=role, party_id=party_id, task_name=task_name) if not tasks: - return get_json_result(code=ReturnCode.TASK.NO_FOUND, message="no found task") + return get_json_result(code=ReturnCode.Task.NOT_FOUND, message="task not found") model_data, message = PipelinedModel(role=role, party_id=party_id, job_id=job_id).read_model_data(task_name) - return get_json_result(code=Code.SUCCESS, message=message, data=model_data) + return get_json_result(code=ReturnCode.Base.SUCCESS, message=message, data=model_data) diff --git a/python/fate_flow/apps/client/site_app.py b/python/fate_flow/apps/client/site_app.py index 1452b0b41..814a9a3cd 100644 --- a/python/fate_flow/apps/client/site_app.py +++ b/python/fate_flow/apps/client/site_app.py @@ -15,7 +15,7 @@ # from webargs import fields -from fate_flow.entity.types import Code, SiteCode, ReturnCode +from fate_flow.entity.types import ReturnCode from fate_flow.settings import PARTY_ID, IS_STANDALONE from fate_flow.utils.api_utils import get_json_result @@ -23,6 +23,6 @@ @manager.route('/info/query', methods=['GET']) def query_site_info(): if not IS_STANDALONE: - return get_json_result(code=ReturnCode.SITE.SUCCESS, message="success", data={"party_id": PARTY_ID}) + return get_json_result(code=ReturnCode.Base.SUCCESS, message="success", data={"party_id": PARTY_ID}) else: - return get_json_result(code=ReturnCode.SITE.IS_STANDALONE, message="site is standalone") + return get_json_result(code=ReturnCode.Site.IS_STANDALONE, message="site is standalone") diff --git a/python/fate_flow/apps/partner/partner_app.py b/python/fate_flow/apps/partner/partner_app.py index 53a0d36e5..261521ad1 100644 --- a/python/fate_flow/apps/partner/partner_app.py +++ b/python/fate_flow/apps/partner/partner_app.py @@ -31,16 +31,16 @@ def partner_create_job(dag_schema, job_id, role, party_id): try: JobController.create_job(dag_schema, job_id, role, party_id) - return get_json_result(code=ReturnCode.JOB.SUCCESS, message="create job success") + return get_json_result(code=ReturnCode.Base.SUCCESS, message="create job success") except RuntimeError as e: - return get_json_result(code=ReturnCode.JOB.CREATE_JOB_FAILED, message=str(e), data={"job_id": job_id}) + return get_json_result(code=ReturnCode.Job.CREATE_JOB_FAILED, message=str(e), data={"job_id": job_id}) @manager.route('/job/start', methods=['POST']) @job_request_json(extra_info=fields.Dict(required=False)) def start_job(job_id, role, party_id, extra_info=None): JobController.start_job(job_id=job_id, role=role, party_id=party_id, extra_info=extra_info) - return get_json_result(code=ReturnCode.JOB.SUCCESS, message="start job success") + return get_json_result(code=ReturnCode.Base.SUCCESS, message="start job success") @manager.route('/job/status/update', methods=['POST']) @@ -53,9 +53,9 @@ def partner_job_status_update(job_id, role, party_id, status): "status": status } if JobController.update_job_status(job_info=job_info): - return get_json_result(code=ReturnCode.JOB.SUCCESS, message='success') + return get_json_result(code=ReturnCode.Base.SUCCESS, message='success') else: - return get_json_result(code=ReturnCode.JOB.UPDATE_STATUS_FAILED, + return get_json_result(code=ReturnCode.Job.UPDATE_STATUS_FAILED, message="update job status does not take effect") @@ -70,15 +70,15 @@ def partner_job_update(job_id, role, party_id, progress): if progress: job_info.update({"progress": progress}) if JobController.update_job(job_info=job_info): - return get_json_result(code=ReturnCode.JOB.SUCCESS, message='success') + return get_json_result(code=ReturnCode.Base.SUCCESS, message='success') else: - return get_json_result(code=ReturnCode.JOB.UPDATE_FAILED, message="update job does not take effect") + return get_json_result(code=ReturnCode.Job.UPDATE_FAILED, message="update job does not take effect") @manager.route('/job/pipeline/save', methods=['POST']) @job_request_json() def save_pipeline(job_id, role, party_id): - return get_json_result(code=ReturnCode.JOB.SUCCESS, message='success') + return get_json_result(code=ReturnCode.Base.SUCCESS, message='success') @manager.route('/job/resource/apply', methods=['POST']) @@ -86,9 +86,9 @@ def save_pipeline(job_id, role, party_id): def apply_resource(job_id, role, party_id): status = ResourceManager.apply_for_job_resource(job_id, role, party_id) if status: - return get_json_result(code=ReturnCode.JOB.SUCCESS, message='success') + return get_json_result(code=ReturnCode.Base.SUCCESS, message='success') else: - return get_json_result(code=ReturnCode.JOB.APPLY_RESOURCE_FAILED, + return get_json_result(code=ReturnCode.Job.APPLY_RESOURCE_FAILED, message=f'apply for job {job_id} resource failed') @@ -97,9 +97,9 @@ def apply_resource(job_id, role, party_id): def return_resource(job_id, role, party_id): status = ResourceManager.return_job_resource(job_id=job_id, role=role, party_id=party_id) if status: - return get_json_result(ReturnCode.JOB.SUCCESS, message='success') + return get_json_result(ReturnCode.Base.SUCCESS, message='success') else: - return get_json_result(code=ReturnCode.JOB.APPLY_RESOURCE_FAILED, + return get_json_result(code=ReturnCode.Job.APPLY_RESOURCE_FAILED, message=f'return for job {job_id} resource failed') @@ -107,7 +107,7 @@ def return_resource(job_id, role, party_id): @job_request_json() def stop_job(job_id, role, party_id): kill_status, kill_details = JobController.stop_jobs(job_id=job_id, role=role, party_id=party_id) - return get_json_result(code=ReturnCode.JOB.SUCCESS if kill_status else ReturnCode.JOB.KILL_FAILED, + return get_json_result(code=ReturnCode.Base.SUCCESS if kill_status else ReturnCode.Job.KILL_FAILED, message='success' if kill_status else 'failed', data=kill_details) @@ -118,9 +118,9 @@ def apply_task_resource(job_id, role, party_id, task_id, task_version): status = ResourceManager.apply_for_task_resource(job_id=job_id, role=role, party_id=party_id, task_id=task_id, task_version=task_version) if status: - return get_json_result(code=ReturnCode.TASK.SUCCESS, message='success') + return get_json_result(code=ReturnCode.Base.SUCCESS, message='success') else: - return get_json_result(code=ReturnCode.TASK.APPLY_RESOURCE_FAILED, + return get_json_result(code=ReturnCode.Task.APPLY_RESOURCE_FAILED, message=f'apply for task {job_id} resource failed') @@ -130,9 +130,9 @@ def return_task_resource(job_id, role, party_id, task_id, task_version): status = ResourceManager.return_task_resource(job_id=job_id, role=role, party_id=party_id, task_id=task_id, task_version=task_version) if status: - return get_json_result(ReturnCode.TASK.SUCCESS, message='success') + return get_json_result(ReturnCode.Base.SUCCESS, message='success') else: - return get_json_result(code=ReturnCode.TASK.APPLY_RESOURCE_FAILED, + return get_json_result(code=ReturnCode.Task.APPLY_RESOURCE_FAILED, message=f'return for task {job_id} resource failed') @@ -140,9 +140,9 @@ def return_task_resource(job_id, role, party_id, task_id, task_version): @task_request_json() def start_task(job_id, role, party_id, task_id, task_version): if TaskController.start_task(job_id, role, party_id, task_id, task_version): - return get_json_result(code=ReturnCode.TASK.SUCCESS, message='success') + return get_json_result(code=ReturnCode.Base.SUCCESS, message='success') else: - return get_json_result(code=ReturnCode.TASK.START_FAILED, message='start task failed') + return get_json_result(code=ReturnCode.Task.START_FAILED, message='start task failed') @manager.route('/task/collect', methods=['POST']) @@ -151,9 +151,9 @@ def collect_task(job_id, role, party_id, task_id, task_version): task_info = TaskController.collect_task(job_id=job_id, task_id=task_id, task_version=task_version, role=role, party_id=party_id) if task_info: - return get_json_result(code=ReturnCode.TASK.SUCCESS, message="success", data=task_info) + return get_json_result(code=ReturnCode.Base.SUCCESS, message="success", data=task_info) else: - return get_json_result(code=ReturnCode.TASK.NO_FOUND, message="no found task") + return get_json_result(code=ReturnCode.Task.NOT_FOUND, message="task not found") @manager.route('/task/status/update', methods=['POST']) @@ -169,10 +169,10 @@ def task_status_update(job_id, role, party_id, task_id, task_version, status): "status": status }) if TaskController.update_task_status(task_info=task_info): - return get_json_result(code=ReturnCode.TASK.SUCCESS, message='success') + return get_json_result(code=ReturnCode.Base.SUCCESS, message='success') else: return get_json_result( - code=ReturnCode.TASK.UPDATE_STATUS_FAILED, + code=ReturnCode.Task.UPDATE_STATUS_FAILED, message="update job status does not take effect" ) @@ -186,7 +186,7 @@ def stop_task(job_id, role, party_id, task_id, task_version, status=None): kill_status = True for task in tasks: kill_status = kill_status & TaskController.stop_task(task=task, stop_status=status) - return get_json_result(code=ReturnCode.TASK.SUCCESS if kill_status else ReturnCode.TASK.KILL_FAILED, + return get_json_result(code=ReturnCode.Base.SUCCESS if kill_status else ReturnCode.Task.KILL_FAILED, message='success' if kill_status else 'failed') @@ -196,8 +196,8 @@ def rerun_task(job_id, role, party_id, task_id, task_version, new_version): tasks = JobSaver.query_task(job_id=job_id, task_id=task_id, role=role, party_id=party_id) if not tasks: return get_json_result( - code=ReturnCode.TASK.NO_FOUND, - message="no found task" + code=ReturnCode.Task.NOT_FOUND, + message="task not found" ) TaskController.create_new_version_task(task=tasks[0], new_version=new_version) return get_json_result() diff --git a/python/fate_flow/apps/worker/worker_app.py b/python/fate_flow/apps/worker/worker_app.py index 5fc6a13c8..8b410191d 100644 --- a/python/fate_flow/apps/worker/worker_app.py +++ b/python/fate_flow/apps/worker/worker_app.py @@ -45,7 +45,7 @@ def report_task_status(status, execution_id, error=None): task_info.update({"error_report": error}) TaskController.update_task(task_info) return get_json_result() - return get_json_result(code=ReturnCode.TASK.NO_FOUND, message="no found task") + return get_json_result(code=ReturnCode.Task.NOT_FOUND, message="task not found") @manager.route('/task/status', methods=['GET']) @@ -56,8 +56,8 @@ def query_task_status(execution_id): task_info = { "status": tasks[0].f_status, } - return get_json_result(code=ReturnCode.TASK.SUCCESS, message="success", data=task_info) - return get_json_result(code=ReturnCode.TASK.NO_FOUND, message="no found task") + return get_json_result(code=ReturnCode.Base.SUCCESS, message="success", data=task_info) + return get_json_result(code=ReturnCode.Task.NOT_FOUND, message="task not found") @manager.route('/task/output/tracking', methods=['POST']) @@ -81,8 +81,8 @@ def log_output_artifacts(execution_id, meta_data, type, uri, output_key): "task_name": task.f_task_name } OutputDataTracking.create(data_info) - return get_json_result(code=ReturnCode.TASK.SUCCESS, message="success") - return get_json_result(code=ReturnCode.TASK.NO_FOUND, message="no found task") + return get_json_result(code=ReturnCode.Base.SUCCESS, message="success") + return get_json_result(code=ReturnCode.Task.NOT_FOUND, message="task not found") @manager.route('/task/model////////', methods=['POST']) diff --git a/python/fate_flow/controller/job_controller.py b/python/fate_flow/controller/job_controller.py index ee0a646be..4a4e054a9 100644 --- a/python/fate_flow/controller/job_controller.py +++ b/python/fate_flow/controller/job_controller.py @@ -46,7 +46,7 @@ def request_stop_job(cls, job_id): schedule_logger(job_id).info(f"stop job on this party") jobs = JobSaver.query_job(job_id=job_id) if not jobs: - return {"code": ReturnCode.JOB.NO_FOUND, "message": "no found job"} + return {"code": ReturnCode.Job.NOT_FOUND, "message": "job not found"} status = JobStatus.CANCELED kill_status, kill_details = JobController.stop_jobs(job_id=job_id, stop_status=status) schedule_logger(job_id).info(f"stop job on this party status {kill_status}") diff --git a/python/fate_flow/entity/__init__.py b/python/fate_flow/entity/__init__.py index 9e4f7e9c0..854b85271 100644 --- a/python/fate_flow/entity/__init__.py +++ b/python/fate_flow/entity/__init__.py @@ -14,4 +14,3 @@ # limitations under the License. # from ._base import BaseEntity, BaseModel -from .types import RetCode diff --git a/python/fate_flow/entity/types.py b/python/fate_flow/entity/types.py index 0ae73a458..58c64cb52 100644 --- a/python/fate_flow/entity/types.py +++ b/python/fate_flow/entity/types.py @@ -84,50 +84,27 @@ class Stage(object): DEFAULT = "default" -class RetCode(IntEnum, CustomEnum): - SUCCESS = 0 - NOT_EFFECTIVE = 10 - EXCEPTION_ERROR = 100 - ARGUMENT_ERROR = 101 - DATA_ERROR = 102 - OPERATING_ERROR = 103 - FEDERATED_ERROR = 104 - CONNECTION_ERROR = 105 - RUNNING = 106 - INCOMPATIBLE_FATE_VER = 107 - PERMISSION_ERROR = 108 - AUTHENTICATION_ERROR = 109 - SERVER_ERROR = 500 - - -class Code: - SUCCESS = 0 - - -class JobCode(Code): - NO_FOUND = 1000 - CREATE_JOB_FAILED = 1001 - UPDATE_STATUS_FAILED = 1002 - UPDATE_FAILED = 1003 - KILL_FAILED = 1004 - APPLY_RESOURCE_FAILED = 1005 - - -class TaskCode(Code): - NO_FOUND = 2000 - START_FAILED = 2001 - UPDATE_STATUS_FAILED = 2002 - UPDATE_FAILED = 2003 - KILL_FAILED = 2004 - APPLY_RESOURCE_FAILED = 2005 - - -class SiteCode(Code): - IS_STANDALONE = 3000 - - class ReturnCode: - JOB = JobCode - TASK = TaskCode - SITE = SiteCode + class Base: + SUCCESS = 0 + EXCEPTION_ERROR = 100 + + class Job: + NOT_FOUND = 1000 + CREATE_JOB_FAILED = 1001 + UPDATE_STATUS_FAILED = 1002 + UPDATE_FAILED = 1003 + KILL_FAILED = 1004 + APPLY_RESOURCE_FAILED = 1005 + + class Task: + NOT_FOUND = 2000 + START_FAILED = 2001 + UPDATE_STATUS_FAILED = 2002 + UPDATE_FAILED = 2003 + KILL_FAILED = 2004 + APPLY_RESOURCE_FAILED = 2005 + + class Site: + IS_STANDALONE = 3000 diff --git a/python/fate_flow/scheduler/federated_scheduler.py b/python/fate_flow/scheduler/federated_scheduler.py index c2780d615..26322a2e0 100644 --- a/python/fate_flow/scheduler/federated_scheduler.py +++ b/python/fate_flow/scheduler/federated_scheduler.py @@ -15,9 +15,8 @@ # from functools import wraps -from fate_flow.entity import RetCode from fate_flow.entity.run_status import FederatedSchedulingStatusCode -from fate_flow.entity.types import Code +from fate_flow.entity.types import ReturnCode from fate_flow.operation.job_saver import ScheduleJobSaver from fate_flow.runtime.runtime_config import RuntimeConfig from fate_flow.utils.log_utils import schedule_logger @@ -79,7 +78,7 @@ def return_federated_response(federated_response): for dest_role in federated_response.keys(): for party_id in federated_response[dest_role].keys(): retcode_set.add(federated_response[dest_role][party_id]["code"]) - if len(retcode_set) == 1 and Code.SUCCESS in retcode_set: + if len(retcode_set) == 1 and ReturnCode.Base.SUCCESS in retcode_set: federated_scheduling_status_code = FederatedSchedulingStatusCode.SUCCESS else: federated_scheduling_status_code = FederatedSchedulingStatusCode.FAILED diff --git a/python/fate_flow/scheduler/job_scheduler.py b/python/fate_flow/scheduler/job_scheduler.py index 3cbc61807..4a242a95c 100644 --- a/python/fate_flow/scheduler/job_scheduler.py +++ b/python/fate_flow/scheduler/job_scheduler.py @@ -77,13 +77,13 @@ def submit(cls, dag_schema: DAGSchema): ScheduleJobSaver.update_job_status({"job_id": job.f_job_id, "status": job.f_status}) schedule_logger(job_id).info(f"submit job successfully, job id is {job.f_job_id}") result = { - "code": ReturnCode.JOB.SUCCESS, + "code": ReturnCode.Base.SUCCESS, "message": "success" } submit_result.update(result) except Exception as e: schedule_logger(job_id).exception(e) - submit_result["code"] = ReturnCode.JOB.CREATE_JOB_FAILED + submit_result["code"] = ReturnCode.Job.CREATE_JOB_FAILED submit_result["message"] = exception_to_trace_string(e) return submit_result @@ -187,7 +187,7 @@ def apply_job_resource(cls, job): for dest_role in federated_response.keys(): for dest_party_id in federated_response[dest_role].keys(): retcode = federated_response[dest_role][dest_party_id]["code"] - if retcode == ReturnCode.JOB.SUCCESS: + if retcode == ReturnCode.Base.SUCCESS: rollback_party.append({"role": dest_role, "party_id": [dest_party_id]}) else: failed_party.append({"role": dest_role, "party_id": [dest_party_id]}) @@ -332,15 +332,15 @@ def stop_job(cls, job_id, stop_status): status_code, response = FederatedScheduler.stop_job(job_id=job_id, roles=job.f_parties) if status_code == FederatedSchedulingStatusCode.SUCCESS: schedule_logger(job_id).info(f"stop job with {stop_status} successfully") - return ReturnCode.JOB.SUCCESS, "success" + return ReturnCode.Base.SUCCESS, "success" else: tasks_group = ScheduleJobSaver.get_status_tasks_asc(job_id=job.f_job_id) for task in tasks_group.values(): TaskScheduler.collect_task_of_all_party(job, task=task, set_status=stop_status) schedule_logger(job_id).info(f"stop job with {stop_status} failed, {response}") - return ReturnCode.JOB.KILL_FAILED, json_dumps(response) + return ReturnCode.Job.KILL_FAILED, json_dumps(response) else: - return ReturnCode.JOB.NO_FOUND, "can not found job" + return ReturnCode.Job.NOT_FOUND, "job not found" @classmethod @DB.connection_context() @@ -394,4 +394,4 @@ def finish(cls, job, end_status): schedule_logger(job.f_job_id).info(f"job finished with {end_status}, do something...") cls.stop_job(job_id=job.f_job_id, stop_status=end_status) # todo: clean job - schedule_logger(job.f_job_id).info(f"job finished with {end_status}, done") \ No newline at end of file + schedule_logger(job.f_job_id).info(f"job finished with {end_status}, done") diff --git a/python/fate_flow/scheduler/task_scheduler.py b/python/fate_flow/scheduler/task_scheduler.py index efad8f47d..f5e597bce 100644 --- a/python/fate_flow/scheduler/task_scheduler.py +++ b/python/fate_flow/scheduler/task_scheduler.py @@ -128,7 +128,7 @@ def apply_task_resource(cls, task, job): for dest_role in federated_response.keys(): for dest_party_id in federated_response[dest_role].keys(): retcode = federated_response[dest_role][dest_party_id]["code"] - if retcode == ReturnCode.TASK.SUCCESS: + if retcode == ReturnCode.Base.SUCCESS: rollback_party.append({"role": dest_role, "party_id": [dest_party_id]}) else: failed_party.append({"role": dest_role, "party_id": [dest_party_id]}) @@ -157,7 +157,7 @@ def collect_task_of_all_party(cls, job, task, set_status=None): schedule_logger(job.f_job_id).warning(f"collect task {task.f_task_id} {task.f_task_version} failed") for _role in federated_response.keys(): for _party_id, party_response in federated_response[_role].items(): - if party_response["code"] == ReturnCode.TASK.SUCCESS: + if party_response["code"] == ReturnCode.Base.SUCCESS: ScheduleJobSaver.update_task_status(task_info=party_response["data"]) elif set_status: tmp_task_info = { diff --git a/python/fate_flow/utils/api_utils.py b/python/fate_flow/utils/api_utils.py index 5ef031e1c..f1ce5facd 100644 --- a/python/fate_flow/utils/api_utils.py +++ b/python/fate_flow/utils/api_utils.py @@ -23,11 +23,11 @@ from werkzeug.http import HTTP_STATUS_CODES from fate_flow.entity.engine_types import CoordinationProxyService -from fate_flow.entity.types import RetCode, CoordinationCommunicationProtocol, FederatedMode +from fate_flow.entity.types import CoordinationCommunicationProtocol, FederatedMode, ReturnCode from fate_flow.settings import stat_logger, PROXY_NAME, ENGINES, PROXY, HOST, HTTP_PORT -def get_json_result(code=RetCode.SUCCESS, message='success', data=None, job_id=None, meta=None): +def get_json_result(code=ReturnCode.Base.SUCCESS, message='success', data=None, job_id=None, meta=None): result_dict = { "code": code, "message": message, @@ -46,15 +46,14 @@ def get_json_result(code=RetCode.SUCCESS, message='success', data=None, job_id=N def server_error_response(e): stat_logger.exception(e) if len(e.args) > 1: - return get_json_result(code=RetCode.EXCEPTION_ERROR, message=repr(e.args[0]), data=e.args[1]) - return get_json_result(code=RetCode.EXCEPTION_ERROR, message=repr(e)) + return get_json_result(code=ReturnCode.Base.EXCEPTION_ERROR, message=repr(e.args[0]), data=e.args[1]) + return get_json_result(code=ReturnCode.Base.EXCEPTION_ERROR, message=repr(e)) def args_error_response(e): stat_logger.exception(e) messages = e.data.get("messages", {}) - return get_json_result(code=RetCode.EXCEPTION_ERROR, message="Invalid request.", - data=messages) + return get_json_result(code=ReturnCode.Base.EXCEPTION_ERROR, message="Invalid request.", data=messages) def error_response(response_code, retmsg=None):