-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathjson-multi-schema-transformer.js
95 lines (83 loc) · 3.03 KB
/
json-multi-schema-transformer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* Node-RED node transforming a JSON observation from whichever format to another format using a specified JSONata URL.
* Schemas are automatically downloaded and cached on disk the first time they are needed.
* JSONata expressions are cached in memory.
*/
// JSONata: A declarative open-source query and transformation language for JSON data.
const jsonata = require('jsonata');
const util = require('util');
module.exports = RED => {
'use strict';
function JsonMultiSchemaTransformerNode(config) {
RED.nodes.createNode(this, config);
const node = this;
const defaultSchemaUrl = config.defaultSchemaUrl;
let lastStatusError = true;
node.status({ fill: 'grey', shape: 'ring', text: 'Uninitialized' });
const jsonCache = require('./json-cache.js')(node);
// Cache of JSONata expressions
const jsonatas = {};
/**
* Transform the given payload with the JSONata expression given in URL.
*/
async function transformAsync(payload, transformUrl) {
let jsonataExpression;
let jsonataCache = jsonatas[transformUrl];
if (jsonataCache) {
if (jsonataCache.expression === null) {
// Wait for another task to be done building the same JSONata, so that we can use its cache
await new Promise((resolve, reject) => jsonataCache.mutexQueue.push(resolve));
}
jsonataExpression = jsonataCache.expression;
} else {
// Build JSONata expression for the given transformation URL
jsonataCache = { expression: null, mutexQueue: [] };
jsonatas[transformUrl] = jsonataCache;
const transform = await jsonCache.loadAsync(transformUrl, false);
node.debug('Build JSONata expression for: ' + transformUrl);
jsonataExpression = jsonata(transform);
jsonataCache.expression = jsonataExpression;
// Resume tasks waiting for the same JSONata expression
let next;
while ((next = jsonataCache.mutexQueue.shift()) != undefined) {
next(); // Resolve promise
}
}
if (jsonataExpression) {
// Perform transformation
return await jsonataExpression.evaluate(payload);
}
return false;
}
node.on('input', async msg => {
msg.error = msg.error ? msg.error + ' ; ' : '';
if (!msg.schemaUrl) {
msg.schemaUrl = defaultSchemaUrl;
}
if (msg.schemaUrl) {
msg.transformUrl = msg.schemaUrl;
try {
const result = await transformAsync(msg.payload, msg.schemaUrl);
if (result) {
msg.payload = result;
msg.error = msg.error != '';
} else {
msg.error += util.format('Failed tranforming using "%s"', msg.schemaUrl);
}
if (lastStatusError) {
node.status({ fill: 'green', shape: 'dot', text: 'OK' });
lastStatusError = false;
}
} catch (ex) {
lastStatusError = true;
node.status({ fill: 'red', shape: 'ring', text: 'Error' });
console.error('Schema2 ' + msg.schemaUrl);
msg.error += util.format('Error tranforming using "%s": %s', msg.schemaUrl, ex);
}
}
delete msg.schemaUrl;
node.send(msg);
});
}
RED.nodes.registerType('json-multi-schema-transformer', JsonMultiSchemaTransformerNode);
};