Skip to content

Commit

Permalink
Remove /temp endpoint #75, refactor result retrieval and some other p…
Browse files Browse the repository at this point in the history
…arts
  • Loading branch information
m-mohr committed Jan 23, 2024
1 parent 0f511eb commit c145dc0
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 87 deletions.
45 changes: 17 additions & 28 deletions src/api/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,9 @@ export default class JobsAPI {
// We could use https://github.com/axios/axios#cancellation in the future. #76

server.addEndpoint('get', '/results/{token}', this.getJobResultsByToken.bind(this), false);
// todo: What do we need the temp endpoint for? #75
server.addEndpoint('get', '/temp/{token}/{file}', this.getTempFile.bind(this), false);
server.addEndpoint('get', '/storage/{token}/{file}', this.getStorageFile.bind(this), false);
}

async getTempFile(req, res) {
const p = this.storage.makeFolder(this.context.getTempFolder(), [req.params.token, req.params.file]);
if (!p) {
throw new Errors.NotFound();
}
await this.deliverFile(res, p);
}

async getStorageFile(req, res) {
const job = await this.storage.findJob({
token: req.params.token
Expand Down Expand Up @@ -179,24 +169,21 @@ export default class JobsAPI {
const pg = new ProcessGraph(job.process, context);
pg.setLogger(logger);
const resultNode = await pg.execute();

const cube = resultNode.getResult();
const url = await context.retrieveResults(cube);

logger.debug("Downloading data from Google: " + url);
const stream = await HttpUtils.stream({
method: 'get',
url: url,
responseType: 'stream'
});
let response = await context.retrieveResults(cube);
if (typeof response === 'string') {
logger.debug("Downloading data from Google: " + response);
response = await HttpUtils.stream(response);

Check warning on line 177 in src/api/jobs.js

View workflow job for this annotation

GitHub Actions / deploy (17)

Possible race condition: `response` might be reassigned based on an outdated value of `response`

Check warning on line 177 in src/api/jobs.js

View workflow job for this annotation

GitHub Actions / deploy (lts/*)

Possible race condition: `response` might be reassigned based on an outdated value of `response`

Check warning on line 177 in src/api/jobs.js

View workflow job for this annotation

GitHub Actions / deploy (latest)

Possible race condition: `response` might be reassigned based on an outdated value of `response`
}

const extension = context.getExtension(cube.getOutputFormat());
const filePath = this.storage.getJobFile(job._id, Utils.generateHash() + "." + extension);
logger.debug("Storing result to: " + filePath);
await fse.ensureDir(path.dirname(filePath));
await new Promise((resolve, reject) => {
const writer = fse.createWriteStream(filePath);
stream.data.pipe(writer);
response.data.pipe(writer);
writer.on('error', reject);
writer.on('close', resolve);
});
Expand Down Expand Up @@ -427,21 +414,23 @@ export default class JobsAPI {

logger.debug("Executing processes");
const resultNode = await pg.execute();
const url = await context.retrieveResults(resultNode.getResult());
const cube = resultNode.getResult();

logger.debug("Downloading data from Google: " + url);
const stream = await HttpUtils.stream({
method: 'get',
url: url,
responseType: 'stream'
});
let response = await context.retrieveResults(cube);
if (typeof response === 'string') {
logger.debug("Downloading data from Google: " + response);
response = await HttpUtils.stream(response);
}

const contentType = typeof stream.headers['content-type'] !== 'undefined' ? stream.headers['content-type'] : 'application/octet-stream';
let contentType = 'application/octet-stream';
if (Utils.isObject(response.headers) && typeof response.headers['content-type'] !== 'undefined') {
contentType = response.headers['content-type'];
}
res.header('Content-Type', contentType);
res.header('OpenEO-Costs', 0);
const monitorUrl = Utils.getApiUrl('/result/logs/' + id) + '?log_level=' + log_level;
res.header('Link', `<${monitorUrl}>; rel="monitor"`);
stream.data.pipe(res);
response.data.pipe(res);
}

makeJobResponse(job, full = true) {
Expand Down
26 changes: 19 additions & 7 deletions src/api/services.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,25 @@ export default class ServicesAPI {

pg.optimizeLoadCollectionRect(rect);
const resultNode = await pg.execute();
const dataCube = resultNode.getResult();
dataCube.setOutputFormatParameter('size', '256x256');
dataCube.setSpatialExtent(rect);
dataCube.setCrs(3857);
const url = await context.retrieveResults(dataCube);
logger.debug(`Serving ${url} for tile ${req.params.x}/${req.params.y}/${req.params.z}`);
return res.redirect(url, Utils.noop);
const cube = resultNode.getResult();
cube.setOutputFormatParameter('size', '256x256');
cube.setSpatialExtent(rect);
cube.setCrs(3857);

const response = await context.retrieveResults(cube);
if (typeof response === 'string') {
logger.debug(`Serving ${response} for tile ${req.params.x}/${req.params.y}/${req.params.z}`);
return res.redirect(response, Utils.noop);
}
else {
logger.debug(`Streaming to tile ${req.params.x}/${req.params.y}/${req.params.z}`);
let contentType = 'application/octet-stream';
if (typeof response.headers['content-type'] !== 'undefined') {
contentType = response.headers['content-type'];
}
res.header('Content-Type', contentType);
response.data.pipe(res);
}
} catch(e) {
logger.error(e);
throw e;
Expand Down
13 changes: 2 additions & 11 deletions src/processes/save_result.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,7 @@
{
"name": "data",
"description": "The data to save.",
"schema": [
{
"type": "object",
"subtype": "raster-cube"
},
{
"type": "object",
"subtype": "vector-cube"
}
]
"schema": {}
},
{
"name": "format",
Expand Down Expand Up @@ -63,4 +54,4 @@
"title": "OGR Vector Formats"
}
]
}
}
6 changes: 3 additions & 3 deletions src/processgraph/commons.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ export default class Commons {
const ee = node.ee;
let result;

const dataCubeA = new DataCube(node.ee, null, valA);
const dataCubeA = new DataCube(node.ee, valA);
dataCubeA.setLogger(node.getLogger());

const dataCubeB = new DataCube(node.ee, null, valB);
const dataCubeB = new DataCube(node.ee, valB);
dataCubeA.setLogger(node.getLogger());

const imgReducer = (a,b) => eeImgReducer(a,b).copyProperties({source: a, properties: a.propertyNames()});
Expand Down Expand Up @@ -201,7 +201,7 @@ export default class Commons {

static applyInCallback(node, eeImgProcess, jsProcess = null, dataArg = "x") {
const data = node.getArgument(dataArg);
const dc = new DataCube(node.ee, null, data);
const dc = new DataCube(node.ee, data);
dc.setLogger(node.getLogger());
const imgProcess = a => eeImgProcess(a).copyProperties({source: a, properties: a.propertyNames()});
if (dc.isNull()) {
Expand Down
41 changes: 23 additions & 18 deletions src/processgraph/context.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import Utils from '../utils/utils.js';
import HttpUtils from '../utils/http.js';
import Errors from '../utils/errors.js';
import path from 'path';
import fse from 'fs-extra';
import StringStream from '../utils/stringstream.js';

export default class ProcessingContext {

Expand All @@ -16,7 +15,7 @@ export default class ProcessingContext {
async connectGee() {
const user = this.getUser();
const ee = this.ee;
if (user._id.startsWith("google-")) {
if (this.userId.startsWith("google-")) {
console.log("Authenticate via user token");
const expires = 59 * 60;
// todo: get expiration from token and set more parameters
Expand Down Expand Up @@ -79,7 +78,7 @@ export default class ProcessingContext {
return this.user;
}

// ToDo processes: the selection of formats and bands is really strict at the moment, maybe some of them are too strict
// Returns AxiosResponse (object) or URL (string)
async retrieveResults(dataCube) {
const logger = dataCube.getLogger();
const parameters = dataCube.getOutputFormatParameters();
Expand All @@ -90,15 +89,20 @@ export default class ProcessingContext {
else {
format = 'png';
}
// Handle CRS + bbox settings
if (!parameters.epsgCode && (format === 'jpeg' || format === 'png')) {
dataCube.setCrs(4326);
}
else if (parameters.epsgCode > 0) {
dataCube.setCrs(parameters.epsgCode);

let region = null;
let crs = null;
if (dataCube.hasDimensionsXY()) {
// Handle CRS + bbox settings
if (!parameters.epsgCode && (format === 'jpeg' || format === 'png')) {
dataCube.setCrs(4326);
}
else if (parameters.epsgCode > 0) {
dataCube.setCrs(parameters.epsgCode);
}
region = Utils.bboxToGeoJson(dataCube.getSpatialExtent());
crs = Utils.crsToString(dataCube.getCrs());
}
const region = Utils.bboxToGeoJson(dataCube.getSpatialExtent());
const crs = Utils.crsToString(dataCube.getCrs());

switch(format) {
case 'jpeg':
Expand Down Expand Up @@ -186,12 +190,13 @@ export default class ProcessingContext {
});
});
case 'json': {
const fileName = Utils.generateHash() + "/result-" + Date.now() + "." + this.getExtension(format);
const p = path.normalize(path.join(this.serverContext.getTempFolder(), fileName));
const parent = path.dirname(p);
await fse.ensureDir(parent);
await fse.writeJson(p, dataCube.getData());
return Utils.getApiUrl("/temp/" + fileName);
const data = dataCube.getData();
if (typeof data === 'undefined') {
throw new Errors.Internal({message: 'Computation did not lead to any results'});
}
const json = JSON.stringify(data);
const stream = new StringStream(json);
return HttpUtils.createResponse(stream, {'content-type': 'application/json'});
}
default:
throw new Error('File format not supported.');
Expand Down
29 changes: 17 additions & 12 deletions src/processgraph/datacube.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import Errors from '../utils/errors.js';

export default class DataCube {

constructor(ee, sourceDataCube = null, data = undefined) {
constructor(ee, data = undefined) {
this.ee = ee;
// Don't set this data directly, always use setData() to reset the type cache!
this.data = data;
// Cache the data type for less overhead, especially for ee.ComputedObject
this.type = null;
this.dimensions = {};
Expand All @@ -18,17 +16,19 @@ export default class DataCube {
this.col_id = null;
this.logger = null;

if (sourceDataCube instanceof DataCube) {
if (data === undefined) {
this.data = sourceDataCube.data;
this.type = sourceDataCube.type;
}
this.logger = sourceDataCube.logger;
this.output = Object.assign({}, sourceDataCube.output);
for(const i in sourceDataCube.dimensions) {
this.dimensions[i] = new Dimension(this, sourceDataCube.dimensions[i]);
// Don't set this data directly, always use setData() to reset the type cache!
if (data instanceof DataCube) {
this.data = data.data;
this.type = data.type;
this.logger = data.logger;
this.output = Object.assign({}, data.output);
for(const i in data.dimensions) {
this.dimensions[i] = new Dimension(this, data.dimensions[i]);
}
}
else {
this.data = data;
}
}

getLogger() {
Expand Down Expand Up @@ -195,6 +195,11 @@ export default class DataCube {
return dims[0];
}

hasDimensionsXY() {
const spatialDimensions = Object.values(this.dimensions).filter(dim => dim.type === 'spatial' && ['x', 'y'].includes(dim.axis));
return spatialDimensions.length >= 2;
}

dimX() {
return this.findSingleDimension('spatial', 'x');
}
Expand Down
2 changes: 1 addition & 1 deletion src/processgraph/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export default class GeeProcessGraphNode extends ProcessGraphNode {
invalidArgument(argument, reason) {
return new Errors.ProcessArgumentInvalid({
process: this.process_id,
namespace: this.namespace,
namespace: this.namespace || 'n/a',
argument,
reason
});
Expand Down
6 changes: 4 additions & 2 deletions src/processgraph/processgraph.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import GeeJsonSchemaValidator from './jsonschema.js';
import GeeProcessGraphNode from './node.js';
import Errors from '../utils/errors.js';
import Utils from '../utils/utils.js';
const epsg = Utils.require('../../package.json');
const epsg = Utils.require('epsg-index/all.json');

export default class GeeProcessGraph extends ProcessGraph {

Expand Down Expand Up @@ -54,7 +54,9 @@ export default class GeeProcessGraph extends ProcessGraph {

async validateNode(node) {
const process = this.getProcess(node);
return await process.validate(node, this.context);
if (process) {
return await process.validate(node, this.context);
}
}

async execute(args = null) {
Expand Down
18 changes: 18 additions & 0 deletions src/utils/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ import fse from 'fs-extra';

const HttpUtils = {

createResponse(data, headers = {}) {
return {
data,
status: 200,
statusText: "OK",
headers,
config: {},
request: {}
};
},

async get(url, headers = {}) {
const response = await axios.get(url, {
headers
Expand All @@ -26,6 +37,13 @@ const HttpUtils = {
},

stream(opts) {
if (typeof opts === 'string') {
opts = {
method: 'get',
url: opts,
responseType: 'stream'
};
}
return axios(opts).catch(error => {
if (opts.responseType === 'stream' && error.response !== null && typeof error.response === 'object' && error.response.data !== null) {
// JSON error responses are Blobs and streams if responseType is set as such, so convert to JSON if required.
Expand Down
5 changes: 0 additions & 5 deletions src/utils/servercontext.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ export default class ServerContext extends Config {
this.jobStore = new JobStore();
this.userStore = new UserStore(this);
this.serviceStore = new ServiceStore();
this.tempFolder = './storage/temp_files';
if (this.serviceAccountCredentialsFile) {
this.geePrivateKey = fse.readJsonSync(this.serviceAccountCredentialsFile);
}
Expand Down Expand Up @@ -58,10 +57,6 @@ export default class ServerContext extends Config {
return this.serviceStore;
}

getTempFolder() {
return this.tempFolder;
}

isValidOutputFormat(format) {
return (typeof format === 'string' && Utils.isObject(this.outputFormats[format.toUpperCase()]));
}
Expand Down
20 changes: 20 additions & 0 deletions src/utils/stringstream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Readable } from 'stream'

export default class StringStream extends Readable {

constructor(str) {
super();
this.str = str;
this.sent = false;
}

_read() {
if (!this.sent) {
this.push(Buffer.from(this.str));
this.sent = true;
}
else {
this.push(null);
}
}
}

0 comments on commit c145dc0

Please sign in to comment.