Refactoring for workers #77
m-mohr committed May 26, 2024
1 parent 7e8f7f6 commit 3955dd7
Showing 17 changed files with 236 additions and 186 deletions.
13 changes: 7 additions & 6 deletions src/api/capabilities.js
@@ -1,3 +1,4 @@
import API from '../utils/API.js';
import Utils from '../utils/utils.js';
const packageInfo = Utils.require('../../package.json');

@@ -38,7 +39,7 @@ export default class CapabilitiesAPI {
async getVersions(req, res) {
const versions = this.context.otherVersions.slice(0); // Make sure to clone it
versions.push({
url: Utils.getApiUrl(),
url: API.getUrl(),
production: this.context.production,
api_version: this.context.apiVersion
});
@@ -64,12 +65,12 @@ export default class CapabilitiesAPI {
links: [
{
rel: "self",
href: Utils.getApiUrl(`/`),
href: API.getUrl(`/`),
type: "application/json"
},
{
rel: "root",
href: Utils.getApiUrl(`/`),
href: API.getUrl(`/`),
type: "application/json"
},
{
@@ -107,19 +108,19 @@ export default class CapabilitiesAPI {
},
{
rel: 'version-history',
href: Utils.getServerUrl() + '/.well-known/openeo',
href: API.getBaseUrl() + '/.well-known/openeo',
type: 'application/json',
title: 'Supported API versions'
},
{
rel: "data",
href: Utils.getApiUrl("/collections"),
href: API.getUrl("/collections"),
type: "application/json",
title: "Datasets"
},
{
rel: "conformance",
href: Utils.getApiUrl("/conformance"),
href: API.getUrl("/conformance"),
type: "application/json",
title: "OGC Conformance classes"
}
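The new src/utils/API.js module imported above is not among the files shown in this view. A minimal sketch of what it plausibly exports, assuming it simply takes over the URL helpers previously provided by Utils; only the method names getUrl() and getBaseUrl() come from the diff, while the fields and default values below are illustrative assumptions:

// Hypothetical sketch of src/utils/API.js (not shown in this view).
export default class API {

	// Root URL of the server, without the versioned API prefix (illustrative value).
	static baseUrl = 'http://localhost:8080';

	// Path prefix of the versioned API (illustrative value).
	static apiPath = '/v1';

	// Replacement for Utils.getServerUrl(), e.g. for the /.well-known/openeo link.
	static getBaseUrl() {
		return API.baseUrl;
	}

	// Replacement for Utils.getApiUrl(path): an absolute URL below the versioned API root.
	static getUrl(path = '') {
		return API.baseUrl + API.apiPath + path;
	}

}

Under these assumptions, API.getUrl('/collections') yields http://localhost:8080/v1/collections, matching how the rewritten call sites use it.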
21 changes: 11 additions & 10 deletions src/api/collections.js
@@ -1,3 +1,4 @@
import API from '../utils/API.js';
import Utils from '../utils/utils.js';
import Errors from '../utils/errors.js';
import GeeProcessing from '../processes/utils/processing.js';
@@ -46,7 +47,7 @@ export default class Data {
console.log(`Loaded ${num} collections (${Date.now()-a} ms)`);

const b = Date.now();
const pContext = this.context.processingContext({});
const pContext = this.context.processingContext();
this.ee = await pContext.connectGee(true);
console.log(`Established connection to GEE for STAC (${Date.now()-b} ms)`);

@@ -75,17 +76,17 @@ export default class Data {
links: [
{
rel: "self",
href: Utils.getApiUrl("/collections"),
href: API.getUrl("/collections"),
type: "application/json"
},
{
rel: "root",
href: Utils.getApiUrl("/"),
href: API.getUrl("/"),
type: "application/json"
},
{
rel: "alternate",
href: Utils.getApiUrl("/stac"),
href: API.getUrl("/stac"),
title: "STAC API",
type: "application/json"
},
@@ -245,36 +246,36 @@ export default class Data {
const links = [
{
rel: "self",
href: Utils.getApiUrl(`/collections/${id}/items`),
href: API.getUrl(`/collections/${id}/items`),
type: "application/geo+json"
},
{
rel: "root",
href: Utils.getApiUrl(`/`),
href: API.getUrl(`/`),
type: "application/json"
},
{
rel: "collection",
href: Utils.getApiUrl(`/collections/${id}`),
href: API.getUrl(`/collections/${id}`),
type: "application/json"
}
]
if (offset > 0) {
links.push({
rel: "first",
href: Utils.getApiUrl(`/collections/${id}/items?limit=${limit}&offset=0`),
href: API.getUrl(`/collections/${id}/items?limit=${limit}&offset=0`),
type: "application/geo+json"
});
links.push({
rel: "prev",
href: Utils.getApiUrl(`/collections/${id}/items?limit=${limit}&offset=${Math.max(0, offset - limit)}`),
href: API.getUrl(`/collections/${id}/items?limit=${limit}&offset=${Math.max(0, offset - limit)}`),
type: "application/geo+json"
});
}
if (hasNextPage) {
links.push({
rel: "next",
href: Utils.getApiUrl(`/collections/${id}/items?limit=${limit}&offset=${offset + limit}`),
href: API.getUrl(`/collections/${id}/items?limit=${limit}&offset=${offset + limit}`),
type: "application/geo+json"
});
}
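The first/prev/next pagination links above follow a plain limit/offset scheme. A standalone sketch with illustrative values (the limit, offset and collection id are made up, and the URL prefix added by API.getUrl() is omitted for brevity):

const limit = 10, offset = 20, hasNextPage = true, id = 'COPERNICUS/S2';
const links = [];
if (offset > 0) {
	// "first" always points at offset 0; "prev" is clamped to 0 via Math.max.
	links.push({ rel: 'first', href: `/collections/${id}/items?limit=${limit}&offset=0` });
	links.push({ rel: 'prev', href: `/collections/${id}/items?limit=${limit}&offset=${Math.max(0, offset - limit)}` });
}
if (hasNextPage) {
	// "next" is only added while another page exists.
	links.push({ rel: 'next', href: `/collections/${id}/items?limit=${limit}&offset=${offset + limit}` });
}
console.log(links.map(l => `${l.rel}: ${l.href}`).join('\n'));
// first: /collections/COPERNICUS/S2/items?limit=10&offset=0
// prev: /collections/COPERNICUS/S2/items?limit=10&offset=10
// next: /collections/COPERNICUS/S2/items?limit=10&offset=30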
105 changes: 22 additions & 83 deletions src/api/jobs.js
@@ -1,11 +1,12 @@
import API from '../utils/API.js';
import Utils from '../utils/utils.js';
import HttpUtils from '../utils/http.js';
import fse from 'fs-extra';
import path from 'path';
import Errors from '../utils/errors.js';
import ProcessGraph from '../processgraph/processgraph.js';
import Logs from '../models/logs.js';
import GeeResults from '../processes/utils/results.js';
import runBatchJob from './worker/batchjob.js';
import runSync, { getResultLogs } from './worker/sync.js';
const packageInfo = Utils.require('../../package.json');

export default class JobsAPI {
@@ -87,18 +88,11 @@ export default class JobsAPI {
res.json(logs);
}

async getResultLogs(user_id, id, log_level) {
const file = path.normalize(path.join('./storage/user_files/', user_id, 'sync_logs' , id + '.logs.db'));
const logs = new Logs(file, Utils.getApiUrl('/result/logs/' + id), log_level);
await logs.init();
return logs;
}

async getSyncLogFile(req, res) {
this.init(req);

try {
const logs = await this.getResultLogs(req.user._id, req.params.id, req.query.log_level);
const logs = await getResultLogs(req.user._id, req.params.id, req.query.log_level);
res.json(await logs.get(null, 0));
} catch (e) {
throw new Errors.NotFound();
@@ -131,55 +125,18 @@ export default class JobsAPI {
user_id: req.user._id
};

let logger = console;
try {
const job = await this.storage.findJob(query);
if (job.status === 'queued' || job.status === 'running') {
throw new Errors.JobNotFinished();
}

const job = await this.storage.findJob(query);
if (job.status === 'queued' || job.status === 'running') {
throw new Errors.JobNotFinished();
}
const logger = await this.storage.getLogsById(job._id, job.log_level);
logger.info("Queueing batch job");
await this.storage.updateJobStatus(query, 'queued');

logger = await this.storage.getLogsById(job._id, job.log_level);

const promises = [];
promises.push(this.storage.removeResults(job._id, false));
promises.push(logger.clear());
await Promise.all(promises);

logger.info("Queueing batch job");
await this.storage.updateJobStatus(query, 'queued');

res.send(202);

// ToDo sync: move all the following to a worker #77
logger.info("Starting batch job");
await this.storage.updateJobStatus(query, 'running');

const context = this.context.processingContext(req);
const pg = new ProcessGraph(job.process, context, logger);
const resultNode = await pg.execute();

const response = await GeeResults.retrieve(resultNode, false);
// todo: implement exporting multiple images
// const response = await GeeResults.retrieve(resultNode, true);

const filePath = this.storage.getJobFile(job._id, String(Utils.generateHash()) + GeeResults.getFileExtension(resultNode));
logger.debug("Storing result to: " + filePath);
await fse.ensureDir(path.dirname(filePath));
await new Promise((resolve, reject) => {
const writer = fse.createWriteStream(filePath);
response.data.pipe(writer);
writer.on('error', reject);
writer.on('close', resolve);
});

logger.info("Finished");
this.storage.updateJobStatus(query, 'finished');
} catch(e) {
logger.error(e);
this.storage.updateJobStatus(query, 'error');
throw e;
}
res.send(202);

await runBatchJob(this.context, this.storage, this.user, query);
}

async getJobResultsByToken(req, res) {
@@ -247,15 +204,15 @@
const files = await Utils.walk(folder);
const links = [
{
href: Utils.getApiUrl("/results/" + job.token),
href: API.getUrl("/results/" + job.token),
rel: 'canonical',
type: 'application/json'
}
];
const assets = {};
for(const file of files) {
const fileName = path.relative(folder, file.path);
const href = Utils.getApiUrl("/storage/" + job.token + "/" + fileName);
const href = API.getUrl("/storage/" + job.token + "/" + fileName);
const type = Utils.extensionToMediaType(fileName);
if (fileName === this.storage.logFileName) {
if (!pub) {
@@ -322,7 +279,7 @@
if (this.storage.isFieldEditable(key)) {
switch(key) {
case 'process': {
const pg = new ProcessGraph(req.body.process, this.context.processingContext(req));
const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user));
pg.allowUndefinedParameters(false);
promises.push(pg.validate());
break;
@@ -362,7 +319,7 @@
throw new Errors.RequestBodyMissing();
}

const pg = new ProcessGraph(req.body.process, this.context.processingContext(req));
const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user));
pg.allowUndefinedParameters(false);
await pg.validate();

@@ -388,7 +345,7 @@
await this.storage.getLogsById(job._id, job.log_level);

res.header('OpenEO-Identifier', job._id);
res.redirect(201, Utils.getApiUrl('/jobs/' + job._id), Utils.noop);
res.redirect(201, API.getUrl('/jobs/' + job._id), Utils.noop);
}

async postSyncResult(req, res) {
@@ -401,32 +358,14 @@
// const plan = req.body.plan || this.context.plans.default;
// const budget = req.body.budget || null;
// ToDo: Validate data, handle budget and plan input #73

const id = Utils.timeId();
const log_level = Logs.checkLevel(req.body.log_level, this.context.defaultLogLevel);
const logger = await this.getResultLogs(req.user._id, id, log_level);
logger.debug("Starting to process request");

const context = this.context.processingContext(req);
const pg = new ProcessGraph(req.body.process, context, logger);
pg.allowUndefinedParameters(false);
const errorList = await pg.validate(false);
if (errorList.count() > 0) {
errorList.getAll().forEach(error => logger.error(error));
throw errorList.first();
}
else {
logger.info("Validated without errors");
}

logger.debug("Executing processes");
const resultNode = await pg.execute();
const log_level = Logs.checkLevel(req.body.log_level, this.context.defaultLogLevel);

const response = await GeeResults.retrieve(resultNode);
const response = await runSync(this.context, req.user, id, req.body.process, log_level);

res.header('Content-Type', response?.headers?.['content-type'] || 'application/octet-stream');
res.header('OpenEO-Costs', 0);
const monitorUrl = Utils.getApiUrl('/result/logs/' + id) + '?log_level=' + log_level;
const monitorUrl = API.getUrl('/result/logs/' + id) + '?log_level=' + log_level;
res.header('Link', `<${monitorUrl}>; rel="monitor"`);
response.data.pipe(res);
}
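The two worker modules imported at the top of jobs.js (./worker/batchjob.js and ./worker/sync.js) are also not among the files shown here. A hedged reconstruction of what they plausibly contain, pieced together from the inline code removed above and from the call sites runBatchJob(this.context, this.storage, this.user, query) and runSync(this.context, req.user, id, req.body.process, log_level); the actual modules in this commit may differ in paths, signatures and error handling.

src/api/worker/sync.js (sketch):

import path from 'path';
import API from '../../utils/API.js';
import Logs from '../../models/logs.js';
import ProcessGraph from '../../processgraph/processgraph.js';
import GeeResults from '../../processes/utils/results.js';

// Moved here from JobsAPI.getResultLogs: the log database lives in the user's storage folder.
export async function getResultLogs(user_id, id, log_level) {
	const file = path.normalize(path.join('./storage/user_files/', user_id, 'sync_logs', id + '.logs.db'));
	const logs = new Logs(file, API.getUrl('/result/logs/' + id), log_level);
	await logs.init();
	return logs;
}

// Validates and executes the process graph for synchronous processing and returns
// the GEE response so the route handler can stream it to the client.
export default async function runSync(context, user, id, process, log_level) {
	const logger = await getResultLogs(user._id, id, log_level);
	logger.debug("Starting to process request");

	const pContext = context.processingContext(user);
	const pg = new ProcessGraph(process, pContext, logger);
	pg.allowUndefinedParameters(false);
	const errorList = await pg.validate(false);
	if (errorList.count() > 0) {
		errorList.getAll().forEach(error => logger.error(error));
		throw errorList.first();
	}
	logger.info("Validated without errors");

	logger.debug("Executing processes");
	const resultNode = await pg.execute();
	return await GeeResults.retrieve(resultNode);
}

src/api/worker/batchjob.js (sketch):

import fse from 'fs-extra';
import path from 'path';
import Utils from '../../utils/utils.js';
import ProcessGraph from '../../processgraph/processgraph.js';
import GeeResults from '../../processes/utils/results.js';

// Runs a queued batch job to completion. The 202 response has already been sent
// by the route handler, so errors are only logged and recorded on the job.
export default async function runBatchJob(context, storage, user, query) {
	let logger = console;
	try {
		const job = await storage.findJob(query);
		logger = await storage.getLogsById(job._id, job.log_level);

		// Clearing previous results and logs was removed from the route handler above
		// and presumably also moved into the worker.
		await Promise.all([storage.removeResults(job._id, false), logger.clear()]);

		logger.info("Starting batch job");
		await storage.updateJobStatus(query, 'running');

		const pContext = context.processingContext(user);
		const pg = new ProcessGraph(job.process, pContext, logger);
		const resultNode = await pg.execute();

		const response = await GeeResults.retrieve(resultNode, false);
		const filePath = storage.getJobFile(job._id, String(Utils.generateHash()) + GeeResults.getFileExtension(resultNode));
		logger.debug("Storing result to: " + filePath);
		await fse.ensureDir(path.dirname(filePath));
		await new Promise((resolve, reject) => {
			const writer = fse.createWriteStream(filePath);
			response.data.pipe(writer);
			writer.on('error', reject);
			writer.on('close', resolve);
		});

		logger.info("Finished");
		await storage.updateJobStatus(query, 'finished');
	} catch (e) {
		logger.error(e);
		await storage.updateJobStatus(query, 'error');
	}
}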
(The remaining 14 changed files are not shown in this view.)
