Skip to content

Commit

Permalink
Merge pull request #8 from neogeo-technologies/develop
Browse files Browse the repository at this point in the history
Merge branch develop (21-01-13)
  • Loading branch information
m431m authored Jan 13, 2021
2 parents 8605324 + 225f2fd commit 3b820e0
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 105 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
__pycache__
frontend/app.wsgi
# Vim temp files
*swp
*swo
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,5 @@ de données vecteur et rasteur. Ils sont prévus pour fonctionner nominalement
avec la configuration "Hors Docker", avec au minimum le frontend lancé au préalable.
Le backend peut être lancé ultérieurement.

Certains tests ont besoin d'avoir un accès à une base de données "autotest" avec l'extension Postgis, sur la machine en
local. L'utilisateur courant doit pouvoir s'y connecter sans mot de passe.
60 changes: 33 additions & 27 deletions celery/extractions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
import os.path
import tempfile
import shutil
import logging
import sys
import time
import requests
from osgeo import gdal, ogr, osr
from zipfile import ZipFile
from celery import Task
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from functools import wraps

# use billiard instead of standard multiprocessing to avoid
Expand All @@ -25,7 +25,7 @@
sys.path.append("/common")
from common import taskmanager, service_conf

logger = logging.getLogger("worker")
logger = get_task_logger(__name__)

env = os.environ

Expand Down Expand Up @@ -73,7 +73,7 @@ def mark_has_stopped_and_raise_ignore(self):
"query": self.params,
},
)
logging.info("Task has been stopped")
logger.info("Task has been stopped")
raise Ignore()

def check_if_stop_requested_and_report_progress(self, progress_pct=None):
Expand Down Expand Up @@ -110,7 +110,7 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):
"end_datetime": self.end_datetime,
"query": kwargs["params"],
}
logging.error("Failure occured: " + str(meta))
logger.error("Failure occured: " + str(meta))
# Change state to FAILED instead of FAILURE, because there are issues
# on the frontend side with the deserialization of exception. And
# we want to embed more state in the meta.
Expand All @@ -134,7 +134,7 @@ def decorated_function(self, *args, **kwargs):
extract_id = kwargs.get("extract_id")
subtasks_ids = kwargs.get("subtasks_ids")

logging.info(
logger.info(
"Receiving task_id %s, pid %d: %s, created at %s"
% (str(self.request.id), os.getpid(), str(params), datetime)
)
Expand Down Expand Up @@ -163,7 +163,7 @@ def decorated_function(self, *args, **kwargs):
if res is None:
res = {}

logging.info(
logger.info(
"Finished task_id %s, pid %d: %s, created at %s, result %s"
% (str(self.request.id), os.getpid(), str(params), datetime, str(res))
)
Expand Down Expand Up @@ -203,7 +203,7 @@ def _forked_process_gdal_callback(pct, msg, callback_data):
assert continue_process in (True, False)
if not continue_process:
callback_data[1] = True
logging.info("Abortion requested")
logger.info("Abortion requested")
return 0
return 1

Expand All @@ -216,7 +216,7 @@ def _forked_process_decorator(f):
def decorated_function(process_func_args, callback, callback_data):

socket = callback_data[0]
logging.info("Forked process PID %d" % os.getpid())
logger.info("Forked process PID %d" % os.getpid())
try:
res = f(process_func_args, callback, callback_data)
stop_requested = callback_data[1]
Expand All @@ -227,17 +227,17 @@ def decorated_function(process_func_args, callback, callback_data):
socket.send(res)
socket.close()
if stop_requested:
logging.info("End of child on task cancellation")
logger.info("End of child on task cancellation")
else:
if "error" in res:
logging.info("End of child on handled error")
logger.info("End of child on handled error")
else:
logging.info("End of child on success")
logger.info("End of child on success")
except Exception as e:
res = {"error": str(e)}
socket.send(res)
socket.close()
logging.info("End of child on error")
logger.info("End of child on error")
raise

return decorated_function
Expand All @@ -249,7 +249,7 @@ def decorated_function(process_func_args, callback, callback_data):
args=(process_func_args, _forked_process_gdal_callback, callback_data),
)
p.start()
logging.info("Processing of task forked as PID %d from %d" % (p.pid, os.getpid()))
logger.info("Processing of task forked as PID %d from %d" % (p.pid, os.getpid()))
child_conn.close()
last_time = time.time()

Expand All @@ -262,12 +262,12 @@ def decorated_function(process_func_args, callback, callback_data):
# check for the task status. If STOP_REQUESTED, then force kill the
# child process
while not parent_conn.poll(5):
logging.info(
logger.info(
"Did not receive information from child in the last 5 seconds"
)
result = task.AsyncResult(task.request.id)
if result.state == "STOP_REQUESTED":
logging.error("STOP_REQUESTED: Hard kill child!")
logger.error("STOP_REQUESTED: Hard kill child!")
p.terminate()
task.mark_has_stopped_and_raise_ignore()

Expand All @@ -282,7 +282,7 @@ def decorated_function(process_func_args, callback, callback_data):

cur_time = time.time()
if cur_time - last_time > 5.0:
logging.info("Progress %f" % progress_pct)
logger.info("Progress %f" % progress_pct)
last_time = cur_time

if task.check_if_stop_requested_and_report_progress(
Expand All @@ -298,7 +298,7 @@ def decorated_function(process_func_args, callback, callback_data):
while True:
if not mp.active_children():
break
logging.info("Joining child...")
logger.info("Joining child...")
time.sleep(1)

parent_conn.close()
Expand Down Expand Up @@ -340,7 +340,7 @@ def process_raster(process_func_args, gdal_callback, gdal_callback_data):
(params, tmpdir) = process_func_args

if "simulate_stuck_process" in params:
logging.info("Simulating stucked proccess")
logger.info("Simulating stucked proccess")
time.sleep(100)

src_filename = params["source"]
Expand Down Expand Up @@ -515,7 +515,7 @@ def process_raster(process_func_args, gdal_callback, gdal_callback_data):
pct_max = 1.0

if can_warp_directly:
logging.info(
logger.info(
"Invoking gdalwarp %s %s %s" % (src_filename, out_filename, warp_options)
)
ret_ds = gdal.Warp(
Expand All @@ -529,16 +529,19 @@ def process_raster(process_func_args, gdal_callback, gdal_callback_data):
ret_ds = None
else:
tmp_vrt = out_filename + ".vrt"
logging.info(
logger.info(
"Invoking gdalwarp %s %s %s" % (src_filename, tmp_vrt, warp_options)
)
tmp_ds = gdal.Warp(tmp_vrt, src_ds, options=warp_options)
if tmp_ds is None:
return {"error": gdal.GetLastErrorMsg()}
err_msg = gdal.GetLastErrorMsg()
err_type = gdal.GetLastErrorType()
logger.error("%s: %s" % (err_type, err_msg))
return {"error": err_msg}
translate_options = "-of " + driver_name
for option in driver_options:
translate_options += " -co %s=%s" % (option, driver_options[option])
logging.info(
logger.info(
"Invoking gdal_translate %s %s %s"
% (tmp_vrt, out_filename, translate_options)
)
Expand Down Expand Up @@ -575,7 +578,7 @@ def process_raster(process_func_args, gdal_callback, gdal_callback_data):
ratio *= 2
ratios.append(ratio)
if len(ratios) > 0:
logging.info(
logger.info(
"Invoking gdaladdo -r %s %s %s"
% (method, out_filename, " ".join(str(r) for r in ratios))
)
Expand All @@ -589,7 +592,10 @@ def process_raster(process_func_args, gdal_callback, gdal_callback_data):
ds = None

if not success:
return {"error": gdal.GetLastErrorMsg()}
err_msg = gdal.GetLastErrorMsg()
err_type = gdal.GetLastErrorType()
logger.error("%s: %s" % (err_type, err_msg))
return {"error": err_msg}
else:
return {"success": True}

Expand All @@ -599,7 +605,7 @@ def process_vector(process_func_args, gdal_callback, gdal_callback_data):
(params, tmpdir) = process_func_args

if params.get("simulate_stuck_process"):
logging.info("Simulating stucked proccess")
logger.info("Simulating stucked proccess")
time.sleep(100)

source = params["source"]
Expand Down Expand Up @@ -744,7 +750,7 @@ def process_vector(process_func_args, gdal_callback, gdal_callback_data):
cbk = create_scaled_progress(
float(idx) / len(layers), float(idx + 1) / len(layers), gdal_callback
)
logging.info(
logger.info(
"Invoking ogr2ogr %s %s %s" % (out_filename, source, translate_options)
)
out_ds = gdal.VectorTranslate(
Expand Down Expand Up @@ -822,7 +828,7 @@ def fake_extraction(self, *args, **kwargs):

total_iters = 20
for i in range(total_iters):
logging.info("Step %d" % i)
logger.info("Step %d" % i)
if self.check_if_stop_requested_and_report_progress(
progress_pct=100.0 * i / total_iters
):
Expand Down
4 changes: 2 additions & 2 deletions celery/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
celery==4.2.1
redis==2.10.5
celery>=4.3.0
redis>=3.0.0
requests==2.20.0
14 changes: 7 additions & 7 deletions frontend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _(x):
# create the app:
app = Flask(__name__)

logger = logging.getLogger("app")
logger = logging.getLogger(__name__)

env = os.environ
DEBUG = env.get("DEBUG", "False")
Expand Down Expand Up @@ -273,11 +273,11 @@ def check_datasource(self):
# Currently we will process it as vector-only or raster-only depending
# on the presence of layer.
if self.extract_params.get("layer"):
logging.info(
logger.info(
"Source has both raster and vector. Processing it as vector due to 'layer' being specified"
)
else:
logging.info(
logger.info(
"Source has both raster and vector. Processing it as vector due to 'layer' being absent"
)
self.is_raster = not self.has_vector or (self.has_raster and "layer" not in self.extract_params)
Expand Down Expand Up @@ -351,7 +351,7 @@ def check_datasource(self):

def check_driver_options(self):
dst_format = self.extract_params["dst_format"]
dst_format_name = self.extract_params["dst_format"]["gdal_driver"]
dst_format_name = dst_format["gdal_driver"] if "gdal_driver" in dst_format else ""
source = self.extract_params["source"]

if not dst_format or not dst_format_name or not source:
Expand Down Expand Up @@ -386,7 +386,7 @@ def check_driver_options(self):
upper("MapInfo File"),
upper("GPKG"),
)
if self.ds.GetLayerCount() > 1 and not accept_several_out_layers:
if self.ds and self.ds.GetLayerCount() > 1 and not accept_several_out_layers:
param_error = build_missing_parameter_error_message("layer")
self.errors.append(param_error)
return
Expand Down Expand Up @@ -604,7 +604,7 @@ def get_task_info_for_data_extract(extract_param_dict, common_params_dict):
param_warnings.extend(param_checker.warnings)

for warning in param_warnings:
logging.info(warning)
logger.info(warning)

task_info = {
"worker_name": "idgo_extractor.extraction",
Expand Down Expand Up @@ -659,7 +659,7 @@ def jobs_get(task_id):
if res.state == "FAILURE":
resp["exception"] = str(info)
else:
logging.error("res.info is not a dict")
logger.error("res.info is not a dict")
else:
resp.update(info)
return make_json_response(resp, 200)
Expand Down
4 changes: 2 additions & 2 deletions resources/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Flask==1.0.2
celery==4.2.1
redis==2.10.5
celery>=4.3.0
redis>=3.0.0
requests==2.20.0
Loading

0 comments on commit 3b820e0

Please sign in to comment.