Merge pull request #54 from citysciencelab/fix-linting-errors
fix: fix linting errors
hwbllmnn authored Oct 10, 2024
2 parents def7faf + 37de9db commit d77eca4
Showing 19 changed files with 1,147 additions and 842 deletions.
1,293 changes: 745 additions & 548 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -112,6 +112,8 @@ pytest-xdist = ">=2.2.0,<3"
 types-toml = ">=0.10.1,<1"
 pre-commit = ">=3.4.0,<4"
 debugpy = "^1.8.5"
+flake8 = "^7.1.1"
+pylint = "^3.3.1"

 [tool.poetry.group.docs]
 optional = true

3 changes: 2 additions & 1 deletion src/ump/__init__.py
@@ -1,6 +1,7 @@
 """urban-model-platform package.
-server federation api, OGC Api Processes-based to connect model servers and centralize access to them
+server federation api, OGC Api Processes-based to connect model servers and
+centralize access to them
 """

 from __future__ import annotations

114 changes: 63 additions & 51 deletions src/ump/api/db_handler.py
@@ -1,69 +1,81 @@
-import ump.config as config
+import logging

 import psycopg2 as db
 from psycopg2.extras import RealDictCursor

-import logging
+from ump import config

 logger = logging.getLogger(__name__)

 class DBHandler():
-  def __init__(self):
-    self.connection = db.connect(
-      database = config.postgres_db,
-      host = config.postgres_host,
-      user = config.postgres_user,
-      password = config.postgres_password,
-      port = config.postgres_port
-    )
-    self.sortable_columns = []
-
-  def set_sortable_columns(self, sortable_columns):
-    self.sortable_columns = sortable_columns
+    def __init__(self):
+        self.connection = db.connect(
+            database = config.postgres_db,
+            host = config.postgres_host,
+            user = config.postgres_user,
+            password = config.postgres_password,
+            port = config.postgres_port
+        )
+        self.sortable_columns = []

-  def run_query(self, query, conditions=[], query_params={}, order=[], limit=None, page=None):
-    if conditions:
-      query += " WHERE " + " AND ".join(conditions)
+    def set_sortable_columns(self, sortable_columns):
+        self.sortable_columns = sortable_columns

-    if order and set(order).issubset(set(self.sortable_columns)):
-      query += f" ORDER BY {', '.join(order)} DESC"
-    elif order:
-      logging.debug(f" --> Could not order by {order} since sortable_columns hasn't been set! Please call set_sortable_columns!")
+    def run_query(
+        self,
+        query,
+        conditions=None,
+        query_params=None,
+        order=None,
+        limit=None,
+        page=None
+    ):
+        if query_params is None:
+            query_params = {}
+        if conditions:
+            query += " WHERE " + " AND ".join(conditions)

-    if limit:
-      offset = 0
-      if page:
-        offset = (page - 1) * limit
+        if order and set(order).issubset(set(self.sortable_columns)):
+            query += f" ORDER BY {', '.join(order)} DESC"
+        elif order:
+            logging.debug(
+                " --> Could not order by %s since sortable_columns hasn't been set!" +
+                " Please call set_sortable_columns!",
+                order
+            )

-      query += " LIMIT %(limit)s OFFSET %(offset)s"
-      query_params['limit'] = limit
-      query_params['offset'] = offset
+        if limit:
+            offset = 0
+            if page:
+                offset = (page - 1) * limit

-    #logging.debug(f" --> SQL query = {query}")
-    #logging.debug(f" --> query_params = {query_params}")
+            query += " LIMIT %(limit)s OFFSET %(offset)s"
+            query_params['limit'] = limit
+            query_params['offset'] = offset

-    with self.connection:
-      with self.connection.cursor(cursor_factory = RealDictCursor) as cursor:
-        cursor.execute(query, query_params)
-        try:
-          results = cursor.fetchall()
-        except db.ProgrammingError as e:
-          if str(e) == "no results to fetch":
-            return
-          else:
-            raise e
+        with self.connection:
+            with self.connection.cursor(cursor_factory = RealDictCursor) as cursor:
+                cursor.execute(query, query_params)
+                try:
+                    results = cursor.fetchall()
+                except db.ProgrammingError as e:
+                    if str(e) == "no results to fetch":
+                        return
+                    else:
+                        raise e

-    return results
+        return results

-  # needed so that this class can be used as a context manager
-  def __enter__(self):
-    return self
+    # needed so that this class can be used as a context manager
+    def __enter__(self):
+        return self

-  def __exit__(self, type, value, traceback):
-    if (self.connection):
-      self.connection.close()
+    def __exit__(self, exc_type, value, traceback):
+        if self.connection:
+            self.connection.close()

-    if type is None and value is None and traceback is None:
-      return True
+        if exc_type is None and value is None and traceback is None:
+            return True

-    logger.error(f"{type}: {value} - {traceback}")
-    return False
+        logger.error("%s: %s - %s", exc_type, value, traceback)
+        return False
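
The substantive fix in run_query above is the switch from the mutable defaults conditions=[], query_params={} and order=[] to None, with query_params re-created inside the function: pylint's dangerous-default-value warning (W0102). A default value is evaluated once, when the def statement runs, so a mutated default leaks state across calls. The same diff renames the __exit__ parameter type to exc_type, which clears redefined-builtin (W0622) without changing the context-manager protocol. A minimal standalone sketch of the pitfall, with illustrative names rather than code from this repository:

# Illustrative sketch only; not code from this repository.
def add_condition_buggy(cond, conditions=[]):
    # W0102: every call without an explicit argument shares this single list
    conditions.append(cond)
    return conditions

def add_condition_fixed(cond, conditions=None):
    # a fresh list per call, the same pattern as the patched run_query()
    if conditions is None:
        conditions = []
    conditions.append(cond)
    return conditions

print(add_condition_buggy("a"))  # ['a']
print(add_condition_buggy("b"))  # ['a', 'b'] <- state leaked between calls
print(add_condition_fixed("a"))  # ['a']
print(add_condition_fixed("b"))  # ['b']
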
6 changes: 5 additions & 1 deletion src/ump/api/ensemble.py
@@ -1,13 +1,15 @@
+"""Ensemble and related entities."""
 from datetime import datetime, timezone
 from typing import ClassVar

 from sqlalchemy import BigInteger, DateTime, ForeignKey, String
-from sqlalchemy.orm import Mapped, mapped_column, declarative_base
+from sqlalchemy.orm import Mapped, declarative_base, mapped_column
 from sqlalchemy_serializer import SerializerMixin

 Base = declarative_base()

 class JobsEnsembles(Base, SerializerMixin):
+    """Entity linking jobs and ensembles."""
     __tablename__ = 'jobs_ensembles'

     id: Mapped[int] = mapped_column(primary_key=True)
@@ -29,6 +31,7 @@ class EnsemblesUsers(Base, SerializerMixin):
     user_id: Mapped[str] = mapped_column(String())

 class Ensemble(Base, SerializerMixin):
+    """Ensemble entity"""
     __tablename__ = "ensembles"

     id: Mapped[int] = mapped_column(primary_key=True)
@@ -66,6 +69,7 @@ def _to_dict(self):
     }

 class Comment(Base, SerializerMixin):
+    """Comments for ensembles"""
     __tablename__ = "ensemble_comments"

     id: Mapped[int] = mapped_column(primary_key=True)

51 changes: 34 additions & 17 deletions src/ump/api/job.py
@@ -60,9 +60,15 @@ def __init__(self, job_id=None, user=None):
         self.name = None
         self.process_title = None
         self.process_version = None
+        self.remote_job_id = None
+        self.process_id_with_prefix = None
+        self.parameters = None
+        self.provider_prefix = None
+        self.process_id = None
+        self.provider_url = None

         if job_id and not self._init_from_db(job_id, user):
-            raise CustomException(f"Job could not be found!")
+            raise CustomException("Job could not be found!")

     def create(
         self,
@@ -71,7 +77,7 @@ def create(
         process_id_with_prefix=None,
         process_title=None,
         name=None,
-        parameters={},
+        parameters=None,
         user=None,
         process_version=None,
     ):
@@ -100,7 +106,7 @@ def create(
         logging.error(self._to_dict())
         db.run_query(query, query_params=self._to_dict())

-        logging.info(f" --> Job {self.job_id} for {self.process_id} created.")
+        logging.info(" --> Job %s for %s created.", self.job_id, self.process_id)

     def _set_attributes(
         self,
@@ -109,7 +115,7 @@ def _set_attributes(
         process_id_with_prefix=None,
         process_title=None,
         name=None,
-        parameters={},
+        parameters=None,
         user_id=None,
         process_version=None,
     ):
@@ -134,7 +140,8 @@ def _set_attributes(
         match = re.search(r"(.*):(.*)", self.process_id_with_prefix)
         if not match:
             raise InvalidUsage(
-                f"Process ID {self.process_id_with_prefix} is not known! Please check endpoint api/processes for a list of available processes."
+                f"Process ID {self.process_id_with_prefix} is not known! " +
+                "Please check endpoint api/processes for a list of available processes."
             )

         self.provider_prefix = match.group(1)
@@ -146,7 +153,7 @@

     def _init_from_db(self, job_id, user):
         query = """
-            SELECT * FROM jobs j left join jobs_users u on j.job_id = u.job_id WHERE j.job_id = %(job_id)s
+            SELECT j.* FROM jobs j left join jobs_users u on j.job_id = u.job_id WHERE j.job_id = %(job_id)s
         """
         if user is None:
             query += " and j.user_id is null"
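
The query change in the hunk above (SELECT * to SELECT j.*) is behavioral rather than stylistic: jobs and jobs_users both carry job_id and user_id columns, and a dict-style cursor such as psycopg2's RealDictCursor collapses duplicate names, so the joined table's values can overwrite those from jobs. A standalone sketch of the effect; sqlite3 stands in for PostgreSQL so it runs without a server, and the two-column schema is invented for illustration:

# Illustrative sketch; sqlite3 stands in for PostgreSQL and the schema is invented.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE jobs (job_id TEXT, user_id TEXT);
    CREATE TABLE jobs_users (job_id TEXT, user_id TEXT);
    INSERT INTO jobs VALUES ('42', NULL);
    INSERT INTO jobs_users VALUES ('42', 'alice');
""")

def as_dict(cursor):
    # Build a name -> value dict the way a dict cursor does:
    # duplicate column names collapse and the last occurrence wins.
    names = [d[0] for d in cursor.description]
    return dict(zip(names, cursor.fetchone()))

cur = conn.execute("SELECT * FROM jobs j LEFT JOIN jobs_users u ON j.job_id = u.job_id")
print(as_dict(cur))  # {'job_id': '42', 'user_id': 'alice'} <- jobs.user_id is lost

cur = conn.execute("SELECT j.* FROM jobs j LEFT JOIN jobs_users u ON j.job_id = u.job_id")
print(as_dict(cur))  # {'job_id': '42', 'user_id': None} <- only the jobs columns remain
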
@@ -157,10 +164,10 @@ def _init_from_db(self, job_id, user):
         job_details = db.run_query(query, query_params={"job_id": job_id})

         if len(job_details) > 0:
+            logging.error(job_details[0])
             self._init_from_dict(dict(job_details[0]))
             return True
-        else:
-            return False
+        return False

     def _init_from_dict(self, data):
         self.job_id = data["job_id"]
@@ -182,6 +189,10 @@ def _init_from_dict(self, data):
         self.process_title = data["process_title"]
         self.name = data["name"]
         self.process_version = data["process_version"]
+        logging.error('next')
+        logging.error(data)
+        logging.error(self._to_dict())
+        logging.error(self.display())

     def _to_dict(self):
         return {
@@ -232,14 +243,14 @@ def set_results_metadata(self, results_as_json):
         values = []
         for column in maximal_values_dict:

-            type = str(types[column])
-            if type == "float64" and results_df[column].apply(float.is_integer).all():
-                type = "int"
+            data_type = str(types[column])
+            if data_type == "float64" and results_df[column].apply(float.is_integer).all():
+                data_type = "int"

             values.append(
                 {
                     column: {
-                        "type": type,
+                        "type": data_type,
                         "min": minimal_values_dict[column],
                         "max": maximal_values_dict[column],
                     }
@@ -257,7 +268,7 @@
                 }
             )
         except Exception as e:
-            logging.error(f"Unable to store column {column}, skipping: {e}")
+            logging.error("Unable to store column %s, skipping: %s", column, e)

         self.results_metadata = {"values": values}
@@ -290,7 +301,8 @@ def display(self):
                 "rel": "service",
                 "type": "application/json",
                 "hreflang": "en",
-                "title": f"Results of job {self.job_id} as geojson - available when job is finished.",
+                "title": f"Results of job {self.job_id} as geojson" +
+                " - available when job is finished.",
             }
         ]
@@ -322,7 +334,8 @@ async def results(self):
                 return await response.json()
             else:
                 raise CustomException(
-                    f"Could not retrieve results from model server {self.provider_url} - {response.status}: {response.reason}"
+                    "Could not retrieve results from model server " +
+                    f"{self.provider_url} - {response.status}: {response.reason}"
                 )

     async def results_to_geoserver(self):
@@ -338,12 +351,16 @@ async def results_to_geoserver(self):
             geoserver.save_results(job_id=self.job_id, data=results)

             logging.info(
-                f" --> Successfully stored results for job {self.process_id_with_prefix} (={self.process_id})/{self.job_id} to geoserver."
+                " --> Successfully stored results for job %s (=%s)/%s to geoserver.",
+                self.process_id_with_prefix, self.process_id, self.job_id
             )

         except Exception as e:
             logging.error(
-                f" --> Could not store results for job {self.process_id_with_prefix} (={self.process_id})/{self.job_id} to geoserver: {e}",
+                " --> Could not store results for job %s (=%s)/%s to geoserver: %s",
+                self.process_id_with_prefix,
+                self.process_id,
+                self.job_id,
+                e,
             )
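
A recurring fix throughout this file replaces f-strings inside logging calls with %s placeholders and arguments, pylint's logging-fstring-interpolation warning (W1203). Besides satisfying the linter, %-style arguments defer string formatting until the logging framework knows the record will actually be emitted. A minimal standalone sketch with invented sample values:

# Standalone sketch; the sample values are invented, not project data.
import logging

logging.basicConfig(level=logging.INFO)
job_id, process_id = "0f3a", "provider:simulation"

# Eager: the f-string is built even though DEBUG is filtered out here.
logging.debug(f" --> Job {job_id} for {process_id} created.")

# Lazy: %s interpolation happens only if the record is emitted.
logging.info(" --> Job %s for %s created.", job_id, process_id)
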
4 changes: 3 additions & 1 deletion src/ump/api/job_comments.py
@@ -1,12 +1,14 @@
+"""Comments for jobs."""
 from datetime import datetime

 from sqlalchemy import DateTime, String
-from sqlalchemy.orm import Mapped, mapped_column, declarative_base
+from sqlalchemy.orm import Mapped, declarative_base, mapped_column
 from sqlalchemy_serializer import SerializerMixin

 Base = declarative_base()

 class JobComment(Base, SerializerMixin):
+    """Comments for jobs."""
     __tablename__ = "job_comments"

     id: Mapped[int] = mapped_column(primary_key=True)

17 changes: 9 additions & 8 deletions src/ump/api/job_status.py
@@ -1,11 +1,12 @@
 from enum import Enum

+
 class JobStatus(Enum):
-  """
-  Enum for the job status options specified in the WPS 2.0 specification
-  """
-  accepted = 'accepted'
-  running = 'running'
-  successful = 'successful'
-  failed = 'failed'
-  dismissed = 'dismissed'
+    """
+    Enum for the job status options specified in the WPS 2.0 specification
+    """
+    accepted = 'accepted'
+    running = 'running'
+    successful = 'successful'
+    failed = 'failed'
+    dismissed = 'dismissed'