Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Download results with signed url's #63

Merged
merged 7 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions openeo_driver/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,14 @@ def __init__(self, process: str, parameter: str):
super().__init__(message=self.message.format(process=process, parameter=parameter))


class ResultLinkExpiredException(OpenEOApiException):
status_code = 410
code = 'ResultLinkExpired'
message = 'The link to the batch job result has expired. Please request the results again.'
_description = 'The signed URLs for batch job results have expired. Please send a request to `GET /jobs/{job_id}/results` to refresh the links.'
_tags = ['Batch Jobs']


class ServiceConfigUnsupportedException(OpenEOApiException):
status_code = 400
code = 'ServiceConfigUnsupported'
Expand Down
4 changes: 4 additions & 0 deletions openeo_driver/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
from typing import Union

import gunicorn.app.base
Expand All @@ -21,6 +22,9 @@ def run(title: str, description: str, deploy_metadata: Union[dict, None], backen
app.config['OPENEO_DESCRIPTION'] = description
app.config['OPENEO_BACKEND_DEPLOY_METADATA'] = deploy_metadata
app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 # bytes
app.config['SIGNED_URL'] = os.getenv('SIGNED_URL')
app.config['SIGNED_URL_SECRET'] = os.getenv('SIGNED_URL_SECRET')
app.config['SIGNED_URL_EXPIRATION'] = os.getenv('SIGNED_URL_EXPIRATION')

app.logger.info('App info logging enabled!')
app.logger.debug('App debug logging enabled!')
Expand Down
2 changes: 1 addition & 1 deletion openeo_driver/specs/openeo-api/0.4
56 changes: 50 additions & 6 deletions openeo_driver/views.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import base64
import copy
import datetime
import functools
import logging
import os
import re
import uuid
import time
import typing
from collections import namedtuple, defaultdict
from typing import Callable, Tuple, List
import uuid
from hashlib import md5
from typing import Callable, Tuple, List, Union

import flask
import flask_cors
Expand All @@ -24,7 +28,8 @@
from openeo_driver.datacube import DriverDataCube
from openeo_driver.delayed_vector import DelayedVector
from openeo_driver.errors import OpenEOApiException, ProcessGraphMissingException, ServiceNotFoundException, \
FilePathInvalidException, ProcessGraphNotFoundException, FeatureUnsupportedException
FilePathInvalidException, ProcessGraphNotFoundException, FeatureUnsupportedException, CredentialsInvalidException, \
ResultLinkExpiredException
from openeo_driver.processes import DEFAULT_NAMESPACE
from openeo_driver.save_result import SaveResult, get_temp_file
from openeo_driver.users import HttpAuthHandler, User
Expand Down Expand Up @@ -591,13 +596,32 @@ def list_job_results(job_id, user: User):
job_info = backend_implementation.batch_jobs.get_job_info(job_id, user.user_id)
results = backend_implementation.batch_jobs.get_results(job_id=job_id, user_id=user.user_id)

def base64_user_id() -> str:
return base64.urlsafe_b64encode(user.user_id.encode()).decode()

def secure_token(filename, expires) -> str:
return _compute_secure_token(job_id, user.user_id, filename, expires)

def expiration_timestamp() -> Union[int, None]:
expiration = current_app.config.get('SIGNED_URL_EXPIRATION')
return time.time() + int(expiration) if expiration else None

def download_url(filename) -> str:
if smart_bool(current_app.config.get('SIGNED_URL')):
expires = expiration_timestamp()
return url_for('.download_job_result_signed', job_id=job_id, user_base64=base64_user_id(),
secure_key=secure_token(filename, expires), filename=filename,
expires=expires, _external=True)
else:
return url_for('.download_job_result', job_id=job_id, filename=filename, _external=True)

def asset_object(filename: str, asset_metadata: dict) -> dict:
bands = asset_metadata.get("bands")
nodata = asset_metadata.get("nodata")

return dict_no_none(**{
"title": asset_metadata.get("title",filename), #there has to be title
"href": url_for('.download_job_result', job_id=job_id, filename=filename, _external=True),
"href": download_url(filename),
"type": asset_metadata.get("media_type"),
"eo:bands": [dict_no_none(**{"name": band.name, "center_wavelength": band.wavelength_um})
for band in bands] if bands else None,
Expand Down Expand Up @@ -645,8 +669,7 @@ def asset_object(filename: str, asset_metadata: dict) -> dict:
else:
result = {
"links": [
{"href": url_for('.download_job_result', job_id=job_id, filename=filename, _external=True)}
for filename in results.keys()
{"href": download_url(filename)} for filename in results.keys()
]
}

Expand Down Expand Up @@ -702,6 +725,27 @@ def download_job_result(job_id, filename, user: User):
return send_from_directory(output_dir, filename, mimetype=results[filename].get("media_type"))


@api_endpoint
@openeo_bp.route('/jobs/<job_id>/results/<user_base64>/<secure_key>/<filename>', methods=['GET'])
def download_job_result_signed(job_id, user_base64, secure_key, filename):
expires = request.args.get('expires')
user_id = base64.urlsafe_b64decode(user_base64).decode()
if secure_key != _compute_secure_token(job_id, user_id, filename, expires):
raise CredentialsInvalidException()
if expires and int(expires) < time.time():
raise ResultLinkExpiredException()
results = backend_implementation.batch_jobs.get_results(job_id=job_id, user_id=user_id)
if filename not in results.keys():
raise FilePathInvalidException(str(filename) + ' not in ' + str(list(results.keys())))
output_dir = results[filename]["output_dir"]
return send_from_directory(output_dir, filename, mimetype=results[filename].get("media_type"))


def _compute_secure_token(job_id, user_id, filename, expiration_timestamp):
token_key = job_id + user_id + filename + str(expiration_timestamp) + current_app.config.get('SIGNED_URL_SECRET')
return md5(token_key.encode()).hexdigest()


@api_endpoint
@openeo_bp.route('/jobs/<job_id>/logs', methods=['GET'])
@auth_handler.requires_bearer_auth
Expand Down
168 changes: 168 additions & 0 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,138 @@ def test_get_job_results_100(self, api100):
'type': 'Feature'
}

@mock.patch.dict(app.config, {'SIGNED_URL': 'TRUE', 'SIGNED_URL_SECRET': '123&@#'})
def test_get_job_results_signed_040(self, api040):
with self._fresh_job_registry(next_job_id='job-370'):
dummy_backend.DummyBatchJobs._update_status(
job_id='07024ee9-7847-4b8a-b260-6c879a2b3cdc', user_id=TEST_USER, status='finished')
resp = api040.get('/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results', headers=self.AUTH_HEADER)
assert resp.assert_status_code(200).json == {
'links': [
{
'href': 'http://oeo.net/openeo/0.4.0/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results/TXIuVGVzdA%3D%3D/50afb0cad129e61d415278c4ffcd8a83/output.tiff'
}
]
}

@mock.patch.dict(app.config, {'SIGNED_URL': 'TRUE', 'SIGNED_URL_SECRET': '123&@#'})
def test_get_job_results_signed_100(self, api100):
with self._fresh_job_registry(next_job_id='job-372'):
dummy_backend.DummyBatchJobs._update_status(
job_id='07024ee9-7847-4b8a-b260-6c879a2b3cdc', user_id=TEST_USER, status='finished')
resp = api100.get('/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results', headers=self.AUTH_HEADER)
assert resp.assert_status_code(200).json == {
'assets': {
'output.tiff': {
'roles': ['data'],
'title': 'output.tiff',
'href': 'http://oeo.net/openeo/1.0.0/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results/TXIuVGVzdA%3D%3D/50afb0cad129e61d415278c4ffcd8a83/output.tiff',
'type': 'image/tiff; application=geotiff',
'eo:bands': [{
'name': 'NDVI',
'center_wavelength': 1.23
}],
'file:nodata': [123]
}
},
'geometry': None,
'id': '07024ee9-7847-4b8a-b260-6c879a2b3cdc',
'links': [
{
'rel': 'self',
'href': 'http://oeo.net/openeo/1.0.0/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results',
'type': 'application/json'
},
{
'rel': 'card4l-document',
'href': 'http://ceos.org/ard/files/PFS/SR/v5.0/CARD4L_Product_Family_Specification_Surface_Reflectance-v5.0.pdf',
'type': 'application/pdf'
}
],
'properties': {
'created': '2017-01-01T09:32:12Z',
'datetime': None,
'card4l:processing_chain': {'process_graph': {'foo': {'process_id': 'foo', 'arguments': {}}}},
'card4l:specification': 'SR',
'card4l:specification_version': '5.0',
'processing:facility': 'VITO - SPARK',
'processing:software': 'openeo-geotrellis-0.0.1'
},
'stac_extensions': ['processing',
'card4l-eo',
'https://stac-extensions.github.io/file/v1.0.0/schema.json',
'eo'],
'stac_version': '0.9.0',
'type': 'Feature'
}

@mock.patch('time.time', mock.MagicMock(return_value=1234))
@mock.patch.dict(app.config, {'SIGNED_URL': 'TRUE', 'SIGNED_URL_SECRET': '123&@#', 'SIGNED_URL_EXPIRATION': '1000'})
def test_get_job_results_signed_with_expiration_040(self, api040):
with self._fresh_job_registry(next_job_id='job-371'):
dummy_backend.DummyBatchJobs._update_status(
job_id='07024ee9-7847-4b8a-b260-6c879a2b3cdc', user_id=TEST_USER, status='finished')
resp = api040.get('/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results', headers=self.AUTH_HEADER)
assert resp.assert_status_code(200).json == {
'links': [
{
'href': 'http://oeo.net/openeo/0.4.0/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results/TXIuVGVzdA%3D%3D/fd0ca65e29c6d223da05b2e73a875683/output.tiff?expires=2234'
}
]
}

@mock.patch('time.time', mock.MagicMock(return_value=1234))
@mock.patch.dict(app.config, {'SIGNED_URL': 'TRUE', 'SIGNED_URL_SECRET': '123&@#', 'SIGNED_URL_EXPIRATION': '1000'})
def test_get_job_results_signed_with_expiration_100(self, api100):
with self._fresh_job_registry(next_job_id='job-373'):
dummy_backend.DummyBatchJobs._update_status(
job_id='07024ee9-7847-4b8a-b260-6c879a2b3cdc', user_id=TEST_USER, status='finished')
resp = api100.get('/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results', headers=self.AUTH_HEADER)
assert resp.assert_status_code(200).json == {
'assets': {
'output.tiff': {
'roles': ['data'],
'title': 'output.tiff',
'href': 'http://oeo.net/openeo/1.0.0/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results/TXIuVGVzdA%3D%3D/fd0ca65e29c6d223da05b2e73a875683/output.tiff?expires=2234',
'type': 'image/tiff; application=geotiff',
'eo:bands': [{
'name': 'NDVI',
'center_wavelength': 1.23
}],
'file:nodata': [123]
}
},
'geometry': None,
'id': '07024ee9-7847-4b8a-b260-6c879a2b3cdc',
'links': [
{
'rel': 'self',
'href': 'http://oeo.net/openeo/1.0.0/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results',
'type': 'application/json'
},
{
'rel': 'card4l-document',
'href': 'http://ceos.org/ard/files/PFS/SR/v5.0/CARD4L_Product_Family_Specification_Surface_Reflectance-v5.0.pdf',
'type': 'application/pdf'
}
],
'properties': {
'created': '2017-01-01T09:32:12Z',
'datetime': None,
'card4l:processing_chain': {'process_graph': {'foo': {'process_id': 'foo', 'arguments': {}}}},
'card4l:specification': 'SR',
'card4l:specification_version': '5.0',
'processing:facility': 'VITO - SPARK',
'processing:software': 'openeo-geotrellis-0.0.1'
},
'stac_extensions': ['processing',
'card4l-eo',
'https://stac-extensions.github.io/file/v1.0.0/schema.json',
'eo'],
'stac_version': '0.9.0',
'type': 'Feature'
}

def test_get_job_results_invalid_job(self, api):
api.get('/jobs/deadbeef-f00/results', headers=self.AUTH_HEADER).assert_error(404, "JobNotFound")

Expand All @@ -881,6 +1013,42 @@ def test_download_result(self, api, tmp_path):
assert resp.assert_status_code(200).data == b"tiffdata"
assert resp.headers["Content-Type"] == "image/tiff; application=geotiff"

@mock.patch.dict(app.config, {'SIGNED_URL': 'TRUE', 'SIGNED_URL_SECRET': '123&@#'})
def test_download_result_signed(self, api, tmp_path):
output_root = Path(tmp_path)
with mock.patch.object(dummy_backend.DummyBatchJobs, '_output_root', return_value=output_root):
output = output_root / '07024ee9-7847-4b8a-b260-6c879a2b3cdc' / 'output.tiff'
output.parent.mkdir(parents=True)
with output.open('wb') as f:
f.write(b'tiffdata')
resp = api.get('/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results/TXIuVGVzdA%3D%3D/50afb0cad129e61d415278c4ffcd8a83/output.tiff')
assert resp.assert_status_code(200).data == b'tiffdata'
assert resp.headers['Content-Type'] == 'image/tiff; application=geotiff'

@mock.patch.dict(app.config, {'SIGNED_URL': 'TRUE', 'SIGNED_URL_SECRET': '123&@#'})
def test_download_result_signed_invalid(self, api):
resp = api.get('/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results/TXIuVGVzdA%3D%3D/test123/output.tiff')
assert resp.assert_error(403, 'CredentialsInvalid')

@mock.patch('time.time', mock.MagicMock(return_value=1234))
@mock.patch.dict(app.config, {'SIGNED_URL': 'TRUE', 'SIGNED_URL_SECRET': '123&@#', 'SIGNED_URL_EXPIRATION': '1000'})
def test_download_result_signed_with_expiration(self, api, tmp_path):
output_root = Path(tmp_path)
with mock.patch.object(dummy_backend.DummyBatchJobs, '_output_root', return_value=output_root):
output = output_root / '07024ee9-7847-4b8a-b260-6c879a2b3cdc' / 'output.tiff'
output.parent.mkdir(parents=True)
with output.open('wb') as f:
f.write(b'tiffdata')
resp = api.get('/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results/TXIuVGVzdA%3D%3D/fd0ca65e29c6d223da05b2e73a875683/output.tiff?expires=2234')
assert resp.assert_status_code(200).data == b'tiffdata'
assert resp.headers['Content-Type'] == 'image/tiff; application=geotiff'

@mock.patch('time.time', mock.MagicMock(return_value=3456))
@mock.patch.dict(app.config, {'SIGNED_URL': 'TRUE', 'SIGNED_URL_SECRET': '123&@#', 'SIGNED_URL_EXPIRATION': '1000'})
def test_download_result_signed_with_expiration_invalid(self, api, tmp_path):
resp = api.get('/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results/TXIuVGVzdA%3D%3D/fd0ca65e29c6d223da05b2e73a875683/output.tiff?expires=2234')
assert resp.assert_error(410, 'ResultLinkExpired')

def test_get_batch_job_logs(self, api):
resp = api.get('/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/logs', headers=self.AUTH_HEADER)
assert resp.assert_status_code(200).json == {
Expand Down