Skip to content

Commit

Permalink
Merge pull request #70 from citysciencelab/add-caching
Browse files Browse the repository at this point in the history
Add caching
  • Loading branch information
hwbllmnn authored Nov 4, 2024
2 parents 8b3e01c + fa37a76 commit 662c32e
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 3 deletions.
24 changes: 24 additions & 0 deletions migrations/versions/1.0.11_add_hash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Add hash
Revision ID: 1.0.11
Revises: 1.0.10
Create Date: 2024-10-02 14:00
"""

from alembic import op
from sqlalchemy import Column, String

# Identifier of this migration.
revision = "1.0.11"
# Migration this one is applied on top of.
down_revision = "1.0.10"
# Alembic branch label attached to this revision.
branch_labels = "add_hash"
# NOTE(review): names the same revision as down_revision, which looks
# redundant — confirm it is needed.
depends_on = "1.0.10"

def upgrade():
    """Add a ``hash`` column to ``jobs`` and back-fill it for existing rows."""
    op.add_column('jobs', Column('hash', String(), index=True))
    # IF NOT EXISTS keeps the migration from aborting on databases where
    # pgcrypto has already been installed.
    op.execute('create extension if not exists pgcrypto')
    # The hash expression must stay in sync with the one used when jobs are
    # inserted and looked up. Rows where process_version or user_id is NULL
    # end up with a NULL hash, because concatenating NULL yields NULL.
    op.execute("update jobs set hash = encode(sha512((parameters :: text || process_version || user_id) :: bytea), 'base64')")

def downgrade():
    """Drop the ``hash`` column from ``jobs`` and remove the pgcrypto extension."""
    op.drop_column('jobs', 'hash')
    # IF EXISTS keeps the downgrade from aborting when the extension has
    # already been removed.
    op.execute('drop extension if exists pgcrypto')
3 changes: 3 additions & 0 deletions src/ump/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ modelserver-1:
anonymous-access: True
process-2
result-storage: "remote"
deterministic: True
process-3
exclude: True
Expand All @@ -38,6 +39,8 @@ For each process, it is possible to choose from result-storage options. If the a

Processes configured with `anonymous-access: True` can be seen and run by anonymous users. Jobs and layers created by anonymous users will be cleaned up after some time (this can be configured in `config.py`).

A process can be configured with `deterministic: True`. If so, its jobs are cached based on a hash of the input parameters, the process version and the user id: a request that matches an existing job returns that job instead of starting a new execution.

## Keycloak

You can secure processes and model servers in Keycloak by adding users to special client roles. To secure a specific process, create a role named `modelserver_processid`; to secure all processes of a model server, create a role named `modelserver`. The ids correspond to the keys used in the providers.yaml.
Expand Down
4 changes: 2 additions & 2 deletions src/ump/api/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ def create(

query = """
INSERT INTO jobs
(job_id, remote_job_id, process_id, provider_prefix, provider_url, status, progress, parameters, message, created, started, finished, updated, user_id, process_title, name, process_version)
(job_id, remote_job_id, process_id, provider_prefix, provider_url, status, progress, parameters, message, created, started, finished, updated, user_id, process_title, name, process_version, hash)
VALUES
(%(job_id)s, %(remote_job_id)s, %(process_id)s, %(provider_prefix)s, %(provider_url)s, %(status)s, %(progress)s, %(parameters)s, %(message)s, %(created)s, %(started)s, %(finished)s, %(updated)s, %(user_id)s, %(process_title)s, %(name)s, %(process_version)s)
(%(job_id)s, %(remote_job_id)s, %(process_id)s, %(provider_prefix)s, %(provider_url)s, %(status)s, %(progress)s, %(parameters)s, %(message)s, %(created)s, %(started)s, %(finished)s, %(updated)s, %(user_id)s, %(process_title)s, %(name)s, %(process_version)s, encode(sha512((%(parameters)s :: json :: text || %(process_version)s || %(user_id)s) :: bytea), 'base64'))
"""
with DBHandler() as db:
db.run_query(query, query_params=self._to_dict())
Expand Down
33 changes: 32 additions & 1 deletion src/ump/api/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import aiohttp
from flask import g
from sqlalchemy import create_engine

import ump.api.providers as providers
import ump.config as config
Expand All @@ -16,6 +17,7 @@

logging.basicConfig(level=logging.INFO)

# Module-level SQLAlchemy engine used for the job cache lookup.
# NOTE(review): the connection URL, including user name and password, is
# hard-coded here; consider building it from configuration (e.g. `ump.config`)
# instead — confirm which settings are available.
engine = create_engine("postgresql+psycopg2://postgres:postgres@postgis/cut_dev")

class Process:
def __init__(self, process_id_with_prefix=None):
Expand All @@ -25,6 +27,7 @@ def __init__(self, process_id_with_prefix=None):

self.process_id_with_prefix = process_id_with_prefix
self.process_title = None
self.version = None

match = re.search(r"([^:]+):(.*)", self.process_id_with_prefix)
if not match:
Expand Down Expand Up @@ -193,6 +196,29 @@ def is_required(self, parameter_metadata):

return False

def check_for_cache(self, parameters, user_id):
    """
    Look up an existing job that can be reused for this request.

    Only processes flagged ``deterministic`` in the provider configuration
    are cached. A job matches when its stored ``hash`` equals the hash of
    the given parameters, this process' version and the user id, and the
    job was created for this very process.

    :param parameters: execution parameters; serialised with ``json.dumps``
    :param user_id: id of the requesting user, or None
    :return: the ``job_id`` of a matching job, or None if there is none
    """
    process_config = providers.PROVIDERS[self.provider_prefix]['processes'][self.process_id]
    if not process_config.get('deterministic'):
        return None

    # In SQL, concatenating NULL yields NULL and "hash = NULL" never matches,
    # so a missing version or user id can never produce a cache hit. Skip the
    # database round trip in that case.
    if self.version is None or user_id is None:
        return None

    # The hash itself does not include the process identity, so the lookup is
    # additionally restricted to this process; otherwise a job of a different
    # process with equal parameters, version and user would be returned.
    # NOTE(review): assumes jobs.process_id / jobs.provider_prefix hold the
    # unprefixed process id and the provider prefix — confirm against Job.
    sql = """
    select job_id from jobs
    where hash = encode(sha512((%(parameters)s :: json :: text || %(process_version)s || %(user_id)s) :: bytea), 'base64')
      and process_id = %(process_id)s
      and provider_prefix = %(provider_prefix)s
    limit 1
    """
    with engine.begin() as conn:
        row = conn.exec_driver_sql(
            sql,
            {
                "parameters": json.dumps(parameters),
                "process_version": self.version,
                "user_id": user_id,
                "process_id": self.process_id,
                "provider_prefix": self.provider_prefix,
            },
        ).first()

    return row.job_id if row is not None else None

def execute(self, parameters, user):
p = providers.PROVIDERS[self.provider_prefix]

Expand All @@ -209,7 +235,7 @@ def execute(self, parameters, user):

job = asyncio.run(self.start_process_execution(parameters, user))

_process = dummy.Process(target=self._wait_for_results_async, args=([job]))
_process = dummy.Process(target=self._wait_for_results_async, args=[job])
_process.start()

result = {"jobID": job.job_id, "status": job.status}
Expand All @@ -225,6 +251,11 @@ async def start_process_execution(self, request_body, user):
# extract job_name from request_body
name = request_body.pop("job_name")

job_id = self.check_for_cache(request_body, user)
if job_id:
logging.info('Job found, returning cached job.')
return Job(job_id, user)

try:
auth = providers.authenticate_provider(p)

Expand Down
1 change: 1 addition & 0 deletions src/ump/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@

@app.before_request
def check_jwt():
"""Decodes the JWT token before each request is handled"""
auth = request.authorization
if auth is not None:
decoded = keycloak_openid.decode_token(auth.token)
Expand Down

0 comments on commit 662c32e

Please sign in to comment.