-
Notifications
You must be signed in to change notification settings - Fork 2
/
index.js
120 lines (105 loc) · 2.75 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
const Redis = require('ioredis');
const escapeRegexp = require('escape-regex');
const { filterPaths, filterViews } = require('micro-analytics-adapter-utils');
const Observable = require('zen-observable');
const options = [
{
name: 'db-config',
description:
'Redis connection string or ioredis compatible JSON object. Environment variable: MAA_REDIS_DB_CONFIG',
defaultValue: process.env.MAA_REDIS_DB_CONFIG || 'redis://localhost:6379'
},
{
name: 'hash-key',
description:
'Hash key for the redis hash to store all the data in. Environment variable: MAA_REDIS_DB_CONFIG',
defaultValue: process.env.MAA_REDIS_HASH_KEY || 'micro-analytics'
}
];
let hashKey;
let redis;
let observable;
let subscriber;
function init(options) {
let dbConfig;
try {
dbConfig = JSON.parse(options.dbConfig);
} catch (e) {
dbConfig = options.dbConfig;
}
hashKey = options.hashKey;
redis = new Redis(dbConfig);
subscriber = new Redis(dbConfig);
let handlers = [];
observable = new Observable(observer => {
handlers.push(data => {
observer.next(data);
});
let index = handlers.length;
return () => {
handlers = [...handlers.slice(0, index), ...handlers.slice(index)];
};
});
subscriber.subscribe('views', error => {
if (error) {
throw error;
}
});
subscriber.on('message', (channel, message) => {
if (channel === 'views') {
const [key, value] = JSON.parse(message);
handlers.forEach(handler => {
handler({ key, value });
});
}
});
}
function get(key, options) {
return redis
.hget(hashKey, key)
.then(value => JSON.parse(value))
.then(value => ({ views: filterViews(value ? value.views : [], options) }));
}
function put(key, value) {
redis.publish('views', JSON.stringify([key, value]));
return redis.hset(hashKey, key, JSON.stringify(value));
}
function getAll(options) {
return redis.hgetall(hashKey).then(value =>
filterPaths(Object.keys(value), options).reduce((lastValue, item) => {
const parsed = JSON.parse(value[item]);
return Object.assign({}, lastValue, {
[item]: Object.assign({}, parsed, { views: filterViews(parsed.views, options) })
});
}, {})
);
}
function has(key) {
return redis.hexists(hashKey, key).then(function(result) {
return !!result;
});
}
function keys() {
return redis.hkeys(hashKey);
}
function subscribe(cb) {
return observable.subscribe(cb);
}
async function close() {
await Promise.all([redis.disconnect(), subscriber.disconnect()]);
}
function clear() {
return redis.flushdb();
}
module.exports = {
clear: clear,
close: close,
get: get,
getAll: getAll,
has: has,
init: init,
keys: keys,
options: options,
put: put,
subscribe: subscribe
};