Merge pull request #28 from citysciencelab/small_fixes
Small fixes
StefanSchuhart authored Aug 22, 2024
2 parents 93b6a31 + 13d3c73 commit d76340e
Showing 7 changed files with 81 additions and 50 deletions.
13 changes: 9 additions & 4 deletions Dockerfile
@@ -45,14 +45,19 @@ RUN groupadd --gid $USER_GID $USERNAME && \
useradd --create-home --no-log-init --gid $USER_GID --uid $USER_UID --shell /bin/bash $USERNAME && \
chown -R $USERNAME:$USERNAME /home/$USERNAME /usr/local/lib /usr/local/bin

USER $USERNAME
WORKDIR /home/$USERNAME

- ENV VIRTUAL_ENV=/home/.venv \
- PATH="/home/$USERNAME/.venv/bin:$PATH"
+ ENV VIRTUAL_ENV=/app/.venv \
+ PATH="/app/.venv/bin:$PATH"

COPY --from=base \
--chmod=0755 \
--chown=$USERNAME:$USERNAME \
- /app/.venv ./.venv
+ /app/.venv /app/.venv

- ENTRYPOINT [".venv/python", "-u"]
+ COPY scripts/entrypoint.sh entrypoint.sh

+ EXPOSE 5000

+ ENTRYPOINT [ "/home/pythonuser/entrypoint.sh" ]
6 changes: 3 additions & 3 deletions docker-compose-dev.yaml
@@ -10,11 +10,11 @@ services:
- path: .env
required: true # default
ports:
- - ${WEBAPP_PORT_EXTERNAL}:5001
- command: ['src/app.py']
+ - ${WEBAPP_PORT_EXTERNAL}:5000
networks:
- dev

volumes:
- ./providers.yaml:/home/pythonuser/providers.yaml

postgis:
image: postgis/postgis:14-3.3
29 changes: 25 additions & 4 deletions poetry.lock

Some generated files are not rendered by default.

11 changes: 0 additions & 11 deletions providers.yaml

This file was deleted.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -89,6 +89,8 @@ geopandas = "^1.0.1"
geoalchemy2 = "^0.15.2"
apiflask = "^2.2.0"
python-dotenv = "^1.0.1"
+ gunicorn = "^23.0.0"
+ pyyaml = "^6.0.2"
flask-migrate = "^4.0.7"
python-keycloak = "^4.3.0"

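The new pyyaml dependency pairs with the deletion of providers.yaml from the repository: the provider configuration is now supplied at runtime (docker-compose-dev.yaml mounts it into the container) and parsed with PyYAML. As an illustration only, a minimal loader sketch; the actual schema of providers.yaml and the loading code in ump.api.providers are not part of this diff:

```python
# Hypothetical loader sketch -- the real providers.yaml schema is not shown in this PR.
from pathlib import Path

import yaml


def load_providers(path: str = "providers.yaml") -> dict:
    """Read the provider configuration mounted into the container at runtime."""
    config_file = Path(path)
    if not config_file.exists():
        raise FileNotFoundError(f"Provider configuration not found: {config_file}")
    with config_file.open(encoding="utf-8") as fh:
        # safe_load avoids executing arbitrary YAML tags from an externally supplied file
        return yaml.safe_load(fh) or {}


if __name__ == "__main__":
    providers = load_providers()
    print(f"Loaded {len(providers)} provider entries: {list(providers)}")
```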
16 changes: 6 additions & 10 deletions scripts/entrypoint.sh
@@ -6,13 +6,9 @@ if [ -z "$CORS_URL_REGEX" ]; then
exit 1
fi

- if [ $FLASK_DEBUG == 1 ]; then
- echo "Running API Server in debug mode."
- python main.py
- else
- echo "Running API Server in production mode."
- NUMBER_OF_WORKERS="${NUMBER_OF_WORKERS:-1}"
- echo "Running gunicorn with ${NUMBER_OF_WORKERS} workers."
- export PATH=$PATH:/home/python/.local/bin
- gunicorn --workers=$NUMBER_OF_WORKERS --bind=0.0.0.0:5001 main:app
- fi
+ echo "Running API Server in production mode."
+ NUMBER_OF_WORKERS="${NUMBER_OF_WORKERS:-1}"
+ echo "Running gunicorn with ${NUMBER_OF_WORKERS} workers."
+ # export PATH=$PATH:/home/python/.local/bin
+ exec gunicorn --workers=$NUMBER_OF_WORKERS --bind=0.0.0.0:5000 ump.main:app
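
The entrypoint now starts gunicorn against ump.main:app, so gunicorn expects an importable module ump/main.py that exposes a WSGI application object named app. That module is not part of this diff; the following is only a sketch of the kind of target gunicorn needs, assuming APIFlask (already listed in pyproject.toml):

```python
# Hypothetical ump/main.py sketch -- the real module is not shown in this PR.
from apiflask import APIFlask

# gunicorn resolves "ump.main:app" to this module-level object
app = APIFlask(__name__, title="Urban Model Platform", version="0.1.0")


@app.get("/health")
def health() -> dict:
    """Illustrative liveness probe, not part of the actual API."""
    return {"status": "ok"}


if __name__ == "__main__":
    # Local debugging only; in the container gunicorn serves the app on port 5000
    app.run(host="0.0.0.0", port=5000)
```

With this change the Dockerfile's EXPOSE 5000, the entrypoint's --bind=0.0.0.0:5000 and the compose mapping ${WEBAPP_PORT_EXTERNAL}:5000 all agree on the container port.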

54 changes: 36 additions & 18 deletions src/ump/api/process.py
@@ -7,19 +7,21 @@
from multiprocessing import dummy

import aiohttp
+ import yaml

import ump.api.providers as providers
import ump.config as config
from ump.api.job import Job, JobStatus
from ump.errors import CustomException, InvalidUsage
from ump.geoserver.geoserver import Geoserver

logging.basicConfig(level=logging.INFO)


class Process:
def __init__(self, process_id_with_prefix=None):

self.inputs: dict
self.outputs: dict

self.process_id_with_prefix = process_id_with_prefix

match = re.search(r"(.*):(.*)", self.process_id_with_prefix)
@@ -169,11 +171,14 @@ def execute(self, parameters, user):
_process = dummy.Process(target=self._wait_for_results_async, args=([job]))
_process.start()

- result = {"job_id": job.job_id, "status": job.status}
+ result = {"jobID": job.job_id, "status": job.status}
return result

- async def start_process_execution(self, params, user):
- params["mode"] = "async"
+ async def start_process_execution(self, request_body, user):
+ # execution mode:
+ # to maintain backwards compatibility to models using
+ # pre-1.0.0 versions of OGC api processes
+ request_body["mode"] = "async"
p = providers.PROVIDERS[self.provider_prefix]

try:
@@ -182,27 +187,34 @@ async def start_process_execution(self, params, user):
async with aiohttp.ClientSession() as session:
response = await session.post(
f"{p['url']}/processes/{self.process_id}/execution",
- json=params,
+ json=request_body,
auth=auth,
headers={
"Content-type": "application/json",
"Accept": "application/json",
+ # execution mode shall be async, if model supports it
+ "Prefer": "respond-async"
},
)

response.raise_for_status()

if response.ok and response.headers:
- # Retrieve the job id from the simulation model server from the location header:
- match = re.search("http.*/jobs/(.*)$", response.headers["location"])
+ # Retrieve the job id from the simulation model server from the
+ # location header:
+ match = re.search(
+ "http.*/jobs/(.*)$", response.headers["location"]
+ ) or (
+ re.search('.*/jobs/(.*)$', response.headers["location"])
+ )
if match:
remote_job_id = match.group(1)

job = Job()
job.create(
remote_job_id=remote_job_id,
process_id_with_prefix=self.process_id_with_prefix,
- parameters=params,
+ parameters=request_body,
user=user
)
job.started = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
@@ -218,10 +230,10 @@ except Exception as e:
except Exception as e:
raise CustomException(f"Job could not be started remotely: {e}")

- def _wait_for_results_async(self, job):
+ def _wait_for_results_async(self, job: Job):
asyncio.run(self._wait_for_results(job))

- async def _wait_for_results(self, job):
+ async def _wait_for_results(self, job: Job):

logging.info(" --> Waiting for results in Thread")

@@ -237,25 +249,25 @@ async def _wait_for_results(self, job):
async with aiohttp.ClientSession() as session:

auth = providers.authenticate_provider(p)

- response = await session.get(
+ async with session.get(
f"{p['url']}/jobs/{job.remote_job_id}",
auth=auth,
headers={
"Content-type": "application/json",
"Accept": "application/json",
},
- )
+ ) as response:

- response.raise_for_status()

- job_details = await response.json()
+ response.raise_for_status()
+ job_details: dict = await response.json()

finished = self.is_finished(job_details)

logging.info(" --> Current Job status: " + str(job_details))

- job.progress = job_details["progress"]
+ # either remote job has progress info or else we cannot provide it either
+ job.progress = job_details.get("progress")

job.updated = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
job.save()

@@ -340,6 +352,12 @@ def to_dict(self):
process_dict.pop("process_id")
process_dict.pop("provider_prefix")
process_dict["id"] = process_dict.pop("process_id_with_prefix")

+ # delete all keys containing None
+ for key,value in list(process_dict.items()):
+ if value is None:
+ process_dict.pop(key)

return process_dict

def to_json(self):
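Taken together, the changes above also affect API clients: the execute response now uses the OGC-style jobID key instead of job_id, and progress can be absent when the remote model does not report it. A client-side sketch of that flow, assuming the platform exposes the usual OGC API Processes routes (/processes/{id}/execution and /jobs/{id}) on port 5000; base URL, process id and inputs below are illustrative only:

```python
# Illustrative client sketch -- host, base path, process id and inputs are assumptions.
import asyncio

import aiohttp

BASE_URL = "http://localhost:5000/api"  # assumed base path


async def run_process(process_id: str, inputs: dict) -> dict:
    async with aiohttp.ClientSession() as session:
        # Start the execution; the platform forwards it asynchronously to the provider
        async with session.post(
            f"{BASE_URL}/processes/{process_id}/execution",
            json={"inputs": inputs},
            headers={"Content-type": "application/json", "Accept": "application/json"},
        ) as response:
            response.raise_for_status()
            result = await response.json()

        job_id = result["jobID"]  # renamed from "job_id" in this commit

        # Poll the job until it leaves the accepted/running states
        while True:
            async with session.get(f"{BASE_URL}/jobs/{job_id}") as job_response:
                job_response.raise_for_status()
                job = await job_response.json()
            # "progress" may be missing if the remote model does not report it
            print(job["status"], job.get("progress"))
            if job["status"] not in ("accepted", "running"):
                return job
            await asyncio.sleep(5)


if __name__ == "__main__":
    asyncio.run(run_process("provider:some-model", {"param": 42}))
```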
