Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes two bugs #7

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
d042cc3
updates the queries to occur within the context of promise.all so th…
rubillionaire Mar 14, 2023
ce47d69
modifies to break out of its if statement if the uid=501(rdr) gid=20…
rubillionaire Mar 14, 2023
73e6d92
modifies features.addBuffer to break out of its if statement if the i…
rubillionaire Mar 14, 2023
d94135e
Merge branch 'main' of github.com:rubillionaire/mixmap-peermaps into …
rubillionaire Mar 14, 2023
8760fef
wip moving cpu intensive tasks to workers. starting with the georende…
rubillionaire Mar 31, 2023
a5a240f
simplify stream pipeline for concurrent decoding. rename option for m…
rubillionaire Apr 5, 2023
4100988
moves the onItem option from the Features prototype to the .decorder …
rubillionaire Apr 6, 2023
07fab6e
fixes concurrency bug since features.decoder() returns a function aro…
rubillionaire Apr 6, 2023
e09a4e8
wip eyros queries in workers, and feature decoders in workers as well…
rubillionaire Apr 18, 2023
245088e
moves to event emitter based message passing with eyros queries happe…
rubillionaire Apr 18, 2023
2848617
fixes cull bug, removes queryOpen reference on query:done instead of …
rubillionaire Apr 18, 2023
d7c255d
fixes bug in updating the storage source. reset the query planner whe…
rubillionaire Apr 18, 2023
bdac8f5
adds error handling around eyros query iterator. adds a termiante met…
rubillionaire Apr 18, 2023
e911235
remove streaming based experimental files
rubillionaire Apr 18, 2023
40561b6
fork/scope npm package.
rubillionaire Dec 27, 2023
2b0e28c
1.5.0
rubillionaire Dec 27, 2023
5117b57
Merge branch 'release/1.5.0' into main
rubillionaire Dec 27, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 86 additions & 77 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ var geotext = require('mixmap-georender/text')
var planner = require('viewbox-query-planner')
var getImagePixels = require('get-image-pixels')
var bboxIntersect = require('bbox-intersect')
var work = require('webworkify')
var storageHooks = require('./lib/storage-hooks.js')
var Features = require('./lib/features.js')
var queryWorker = require('./lib/query-worker')
var workerBundle = require('./lib/browserify-worker-bundler')

module.exports = P
function P(opts) {
Expand All @@ -15,33 +18,35 @@ function P(opts) {
self._map.on('resize', function () {
self._scheduleRecalc()
})
self._trace = {}
self._loading = new Set
self._storage = storageHooks(opts.storage, {
beforeLength: function (name) {
self._loading.add(name)
},
afterLength: function (name) {
self._loading.delete(name)
},
beforeRead: function (name) {
self._loading.add(name)
},
afterRead: function (name) {
self._loading.delete(name)
},
self._features = new Features()
self._queryWorker = work(queryWorker)
self._queryWorker.onmessage = function (e) {
var {type} = e.data
if (type === 'query:result') {
var {queryIndex, rowCount, result} = e.data
self._features.decoder.emit('decode', {
queryIndex,
rowCount,
buffer: Buffer.from(result[1]),
})
}
if (type === 'query:done') {
var {queryIndex} = e.data
self._decodedCache = null
self._queryOpen[queryIndex] = null
}
}
self._features.decoder.on('decoded', function (row) {
self._features.addBufferDecoded(row)
self._scheduleRecalc()
})
self._queryWorker.postMessage({
type: 'init:bundles',
eyros: workerBundle(opts.eyros, 'eyros'),
storage: workerBundle(opts.storage, 'storage'),
storageOptions: opts.storageOptions,
wasmSourceUrl: opts.wasmSourceUrl,
})
self._dbQueue = []
self._features = new Features
opts.eyros({ storage: self._storage, wasmSource: opts.wasmSource })
.then(db => {
self._db = db
for (var i = 0; i < self._dbQueue.length; i++) {
self._dbQueue[i](db)
}
self._dbQueue = null
})
.catch(err => self._error(err))

self._stylePixels = null
self._styleTexture = null
Expand Down Expand Up @@ -102,7 +107,9 @@ function P(opts) {
self._recalcTime = 0
self.props = {}
self.layer = self._map.addLayer({
viewbox: function (bbox, zoom, cb) { self._onviewbox(bbox,zoom,cb) }
viewbox: function (bbox, zoom, cb) {
self._onviewbox(bbox,zoom,cb)
}
})

// for debugging purposes, safe to remove
Expand All @@ -115,33 +122,59 @@ P.prototype._error = function (err) {
console.error('CAUGHT', err)
}

P.prototype.terminate = function () {
var self = this
var resolved = false
var terminating = new Set
terminating.add('query-worker')
terminating.add('decoder-worker')
return new Promise((resolve, reject) => {
self._queryWorker.onmessage = function (e) {
var {type} = e.data
if (type === 'terminated') {
self._queryWorker.terminate()
terminating.delete('query-worker')
if (terminating.has('decoder-worker')) return
if (!resolved) {
resolved = true
resolve()
}
}
}
self._queryWorker.postMessage({
type: 'terminate',
})
self._features.decoder.on('terminated', function () {
terminating.delete('decoder-worker')
if (terminating.has('query-worker')) return
if (!resolved) {
resolved = true
resolve()
}
})
self._features.decoder.emit('terminate')
})

}

P.prototype._onviewbox = function (bbox, zoom, cb) {
var self = this
var boxes = self._plan.update(bbox)
for (var i = 0; i < boxes.length; i++) {
self._plan.add(boxes[i])
}
self._zoom = zoom
self._getDb(function (db) {
boxes.forEach(bbox => {
db.query(bbox, { trace })
.then(async (q) => {
var now = performance.now()
await self._loadQuery(bbox, q)
self._debug('_loadQuery bbox', bbox, 'time', performance.now() - now, 'ms')
})
.catch(e => self._error(e))
function trace(tr) {
self._trace[tr.file] = tr
}
boxes.forEach(function (bbox) {
var queryIndex = ++self._lastQueryIndex
self._queryOpen[queryIndex] = true
self._queryResults.push({ bbox, queryIndex })
self._queryWorker.postMessage({
type: 'query:init',
bbox,
queryIndex,
})
})
}

P.prototype._getDb = function (cb) {
var self = this
if (self._db) cb(self._db)
else self._dbQueue.push(cb)
cb()
}

P.prototype._getStyle = function (cb) {
Expand All @@ -161,22 +194,20 @@ P.prototype._cull = function () {
if (qr === null) continue
if (!bboxIntersect(self._map.viewbox, qr.bbox)) {
culling++
if (self._queryOpen[qr.index]) {
self._queryCanceled[qr.index] = true
delete self._queryOpen[qr.index]
if (self._queryOpen[qr.queryIndex]) {
self._queryCanceled[qr.queryIndex] = true
delete self._queryOpen[qr.queryIndex]
self._queryWorker.postMessage({
type: 'query:cancel',
queryIndex: qr.queryIndex,
currentMapViewbox: self._map.viewbox,
})
}
self._plan.subtract(qr.bbox)
self._queryResults[i] = null
self._features.cull(i)
}
}
for (var file of self._loading) {
var tr = self._trace[file]
if (!tr) continue
if (!bboxIntersect(self._map.viewbox, tr.bbox)) {
self._storage.destroy(file)
}
}
}

P.prototype._scheduleRecalc = function () {
Expand All @@ -191,28 +222,6 @@ P.prototype._scheduleRecalc = function () {
}, interval)
}

P.prototype._loadQuery = async function loadQuery(bbox, q) {
var self = this
self._debug('_loadQuery bbox', bbox, 'q.ptr', q.ptr)
var row
var index = ++self._lastQueryIndex
self._queryOpen[index] = true
self._queryResults.push({ bbox, index })
var rowCount = 0
while (row = await q.next()) {
if (self._queryCanceled[index]) {
self._scheduleRecalc()
return
}
++rowCount
self._features.addBuffer(index, Buffer.from(row[1]))
self._scheduleRecalc()
}
self._debug('_loadQuery number of rows', rowCount)
self._decodedCache = null
delete self._queryOpen[index]
}

P.prototype._recalc = function(fromScheduled) {
var self = this
if (!fromScheduled && !self._recalcTimer) {
Expand Down
61 changes: 61 additions & 0 deletions lib/browserify-worker-bundler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
var browserifyBundleFn = arguments[3];
var browserifySources = arguments[4];
var browserifyCache = arguments[5];

function findModuleId (moduleFn) {
for (var id in browserifyCache) {
if (browserifyCache[id].exports === moduleFn) {
return id;
}
}
throw new Error('Module not found in Browserify bundle.');
}

function createBundleUrl (moduleId, name) {
console.log('createBundleUrl', moduleId, name)
var addedSources = {};
resolveSources({}, addedSources, moduleId);

var deps = Object.keys(addedSources);
if (!deps.length) return;

var src = generateWorkerBundle(deps, name);
console.log({src})
var url = createURL(src);
return {
moduleId,
url,
}
}


function resolveSources(workerSources, addedSources, key) {
if (workerSources[key]) return;

workerSources[key] = true;
addedSources[key] = true;

var deps = browserifySources[key][1];
for (var depPath in deps) {
resolveSources(workerSources, addedSources, deps[depPath]);
}
}

function generateWorkerBundle(deps, name) {
return 'self.'+ name +'=(' + browserifyBundleFn + ')({' + deps.map(function (key) {
var source = browserifySources[key];
return JSON.stringify(key) + ':[' + source[0] + ',' + JSON.stringify(source[1]) + ']';
}).join(',') + '},{},[])';
}

function createURL(src) {
var URL = window.URL || window.webkitURL;
var blob = new Blob([src], {type: 'text/javascript'});
return URL.createObjectURL(blob);
}

function workerBundle (moduleFn, name) {
return createBundleUrl(findModuleId(moduleFn), name)
}

module.exports = workerBundle
73 changes: 73 additions & 0 deletions lib/concurrent-decoder-emitter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
var {EventEmitter} = require('events')
var work = require('webworkify')
var decodeWorkFn = require('./feature-decode-worker')

module.exports = ConcurrentDecoder

function noop () {}

var STATUS = {
BUSY: 0,
FREE: 1,
}

function ConcurrentDecoder (options) {
if (!(this instanceof ConcurrentDecoder)) return new ConcurrentDecoder(options)
var self = this

if (!options) options = {}

var queue = []

this._count = options.count || 4
this._itemId = options.itemId

this._workers = []

var workerOnMessage = (i) => (e) => {
self._workers[i].status = STATUS.FREE
var row = e.data
// row : { queryIndex, rowCount, buffer, decoded }
self.emit('decoded', row)
tryShiftQueue()
}

for (var i = 0; i < this._count; i++) {
var worker = work(decodeWorkFn)
worker.onmessage = workerOnMessage(i)
this._workers[i] = {
worker,
status: STATUS.FREE,
}
}

function tryShiftQueue () {
var row = queue.shift()
if (!row) return
self.emit('decode', row)
}

this.freeWorker = function () {
return self._workers.find(w => w.status === STATUS.FREE)
}

this.on('decode', function (row) {
// row { queryIndex, rowCount, buffer }
var w = self.freeWorker()
if (!w) {
queue.push(row)
return
}
w.status = STATUS.BUSY
w.worker.postMessage(row)
})

this.on('terminate', function () {
for (var i = 0; i < self._count; i++) {
self._workers[i].worker.terminate()
}
self.emit('terminated')
})
}

ConcurrentDecoder.prototype = Object.create(EventEmitter.prototype)
22 changes: 22 additions & 0 deletions lib/feature-decode-worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
var b4a = require('b4a')
var decode = require('georender-pack/decode')

module.exports = function (self) {
self.addEventListener('message',function (e){
var {queryIndex, rowCount, buffer} = e.data
// buffer = b4a.from(buffer)
buffer.readUInt8 = function (offset) {
return buffer[offset]
}
buffer.readFloatLE = function (offset) {
return b4a.readFloatLE(buffer, offset)
}
try {
var decoded = decode([buffer])
postMessage({queryIndex, rowCount, buffer, decoded})
}
catch (err) {
console.log(err)
}
})
}
Loading