forked from graphmalizer/graphmalizer-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
101 lines (82 loc) · 2.48 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
var H = require('highland')
var argv = require('minimist')
var neoBatch = require('./utils/neo4batch')
var Queries = require('./queries')
var inputChecker = require('./input')
var config = require('./config')
function Graphmalizer (userConfig) {
// store configuration with user overrides
var conf = config(userConfig)
// setup neo4j client
var batchCommit = H.wrapCallback(neoBatch(conf.Neo4J))
// setup input checker '~ schema validation'
var checkInput = inputChecker(conf.types)
// make query (uses config to determine type ~ structure mapping)
function prepare (o) {
try {
var input = checkInput(o)
var q = Queries.mkQuery(input)
// we have win!
return [q]
} catch (err) {
// other than spewing, we ignore errors
console.error(err.stack)
// so this request will be flatmapped away
return []
}
}
// stream of input streams
this.inputs = H()
// merge all inputs, convert to cypher queries and batch up
var input = this.inputs
.merge()
.flatMap(prepare)
.batchWithTimeOrCount(argv.batchTimeout || conf.batchTimeout, argv.batchSize || conf.batchSize)
// commit batches sequentially
var output = input
.fork()
.map(batchCommit) // a -> stream b
.series()
.map(function (r) {
console.log('GRAPHMALIZER =>', r.results.length, 'docs,', r.duration_ms + 'ms')
return r
})
.pluck('results')
// zip into [request-batch, response-batch]
var rr = input
.fork()
.zip(output)
// unzip, flatten batches
var requests = rr.fork().pluck(0).sequence()
var responses = rr.fork().pluck(1).sequence()
// zip back up and turn it into a dictionary
this.system = requests
.zip(responses)
.map(function (rr) {
return {
request: rr[0],
response: rr[1]
}
})
// now that all streams are setup, ensure schema creatio
// todo, actually this sucks because it cannot be run
// in transaction with write
// this.inputs.write(H([]));//{query: 'schema'}]))
}
// subscribe a stream to the graphmalizer
Graphmalizer.prototype.register = function (stream) { // ensure valid arguments
if (stream) {
if (!H.isStream(stream)) {
throw new Error('Must pass a (highland) stream')
}
// register input stream
this.inputs.write(stream)
}
// return stream of all request-responses
return this.system.fork()
}
Graphmalizer.prototype.shutdown = function () {
// register input stream
this.inputs.end()
}
module.exports = Graphmalizer