Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement group by #198

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ First some terminology: offset refers to the byte position in the log
of a message. Seq refers to the 0-based position of a message in the
log.

### paginate(operation, seq, limit, descending, onlyOffset, sortBy, cb)
### paginate(operation, seq, limit, descending, onlyOffset, sortBy, groupBy, cb)

Query the database returning paginated results. If one or more indexes
doesn't exist or are outdated, the indexes will be updated before the
Expand All @@ -497,7 +497,10 @@ ordering messages. Can take values `declared` or `arrival`. `declared`
refers to the timestamp for when a message was created, while
`arrival` refers to when a message was added to the database. This can
be important for messages from other peers that might arrive out of
order compared when they were created.
order compared to when they were created. `groupBy`, if used, must be
a function that takes a buffer as input and returns an index into the
buffer of the value used for grouping. The idea is to get only one
result per grouped value.

The result is an object with the fields:

Expand Down
54 changes: 50 additions & 4 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -1267,12 +1267,47 @@ module.exports = function (log, indexesPath) {
descending,
onlyOffset,
sortBy,
groupBy,
cb
) {
seq = seq || 0

const sorted = sortedBy(bitset, descending, sortBy)
const resultSize = sorted.size
let sortedBitset = sortedBy(bitset, descending, sortBy)

if (groupBy) {
let seqs = sortedBitset.kSmallest(Infinity).map((x) => x.seq)
const uniqueByValue = new Map()
push(
push.values(seqs),
push.asyncMap(getRecord),
push.drain(
(record) => {
const fieldStart = groupBy(record.value)
if (fieldStart < 0) return true
const value = bipf.decode(record.value, fieldStart)
if (!uniqueByValue.has(value))
uniqueByValue.set(value, record.value)

if (uniqueByValue.size == seq + limit) return false
},
(early) => {
if (early) return
Copy link
Member Author

@arj03 arj03 Mar 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why we need this with push.drain; without it, we get called twice when bailing out early.


const results = Array.from(uniqueByValue.values())
.slice(seq)
.map((x) => bipf.decode(x, 0))
cb(null, {
results: results,
total: results.length,
})
}
)
)
return
}

const sorted = sortedBitset
const resultSize = sortedBitset.size

// seq -> record buffer
const recBufferCache = {}
Expand Down Expand Up @@ -1339,7 +1374,16 @@ module.exports = function (log, indexesPath) {
else return bitset.size() - seq
}

function paginate(operation, seq, limit, descending, onlyOffset, sortBy, cb) {
function paginate(
operation,
seq,
limit,
descending,
onlyOffset,
sortBy,
groupBy,
cb
) {
onReady(() => {
const start = Date.now()
executeOperation(operation, (err0, result) => {
Expand All @@ -1353,6 +1397,7 @@ module.exports = function (log, indexesPath) {
descending,
onlyOffset,
sortBy,
groupBy,
(err1, answer) => {
if (err1) cb(err1)
else {
Expand All @@ -1373,7 +1418,7 @@ module.exports = function (log, indexesPath) {
})
}

function all(operation, seq, descending, onlyOffset, sortBy, cb) {
function all(operation, seq, descending, onlyOffset, sortBy, groupBy, cb) {
onReady(() => {
const start = Date.now()
executeOperation(operation, (err0, result) => {
Expand All @@ -1387,6 +1432,7 @@ module.exports = function (log, indexesPath) {
descending,
onlyOffset,
sortBy,
groupBy,
(err1, answer) => {
if (err1) cb(err1)
else {
Expand Down
11 changes: 9 additions & 2 deletions operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ function sortByArrival() {
return (ops) => updateMeta(ops, 'sortBy', 'arrival')
}

/**
 * Operator that attaches a `groupBy` seek function to the query metadata.
 *
 * @param {Function} seek - given a record buffer, returns the index in the
 *   buffer of the value used for grouping
 * @returns {Function} an operator that stores `seek` under the `groupBy` meta key
 */
function groupBy(seek) {
  return function (ops) {
    return updateMeta(ops, 'groupBy', seek)
  }
}

/**
 * Operator that sets the starting sequence number for a query.
 *
 * @param {number} seq - sequence number to start paginating from
 * @returns {Function} an operator that stores `seq` in the query metadata
 */
function startFrom(seq) {
  return function (ops) {
    return updateMeta(ops, 'seq', seq)
  }
}
Expand Down Expand Up @@ -484,7 +488,7 @@ function toCallback(cb) {
if (end) return cb(end)

const seq = meta.seq || 0
const { pageSize, descending, asOffsets, sortBy } = meta
const { pageSize, descending, asOffsets, sortBy, groupBy } = meta
if (meta.count) meta.jitdb.count(ops, seq, descending, cb)
else if (pageSize)
meta.jitdb.paginate(
Expand All @@ -494,9 +498,10 @@ function toCallback(cb) {
descending,
asOffsets,
sortBy,
groupBy,
cb
)
else meta.jitdb.all(ops, seq, descending, asOffsets, sortBy, cb)
else meta.jitdb.all(ops, seq, descending, asOffsets, sortBy, groupBy, cb)
})
}
}
Expand Down Expand Up @@ -530,6 +535,7 @@ function toPullStream() {
meta.descending,
meta.asOffsets,
meta.sortBy,
meta.groupBy,
(err, answer) => {
if (err) return cb(err)
else if (answer.total === 0) cb(true)
Expand Down Expand Up @@ -602,6 +608,7 @@ module.exports = {

descending,
sortByArrival,
groupBy,
count,
startFrom,
paginate,
Expand Down
140 changes: 90 additions & 50 deletions test/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => {
false,
false,
'declared',
null,
(err, { results }) => {
t.equal(results.length, 2)

Expand All @@ -56,6 +57,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => {
true,
false,
'declared',
null,
(err, { results }) => {
t.equal(results.length, 2)
t.equal(results[0].value.author, keys2.id)
Expand All @@ -67,6 +69,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => {
false,
false,
'declared',
null,
(err, { results }) => {
t.equal(results.length, 2)
t.equal(results[0].value.author, keys.id)
Expand All @@ -87,6 +90,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => {
false,
false,
'declared',
null,
(err, { results }) => {
t.equal(results.length, 1)
t.equal(results[0].id, msg1.id)
Expand All @@ -99,6 +103,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => {
false,
false,
'declared',
null,
(err, { results }) => {
t.equal(results.length, 1)
t.equal(results[0].id, msg1.id)
Expand All @@ -113,6 +118,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => {
false,
false,
'declared',
null,
(err, { results }) => {
t.equal(results.length, 1)
t.equal(results[0].id, msg1.id)
Expand Down Expand Up @@ -143,6 +149,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => {
false,
false,
'declared',
null,
(err, { results }) => {
t.equal(results.length, 2)
t.end()
Expand Down Expand Up @@ -185,13 +192,13 @@ prepareAndRunTest('Update index', dir, (t, db, raf) => {
t.equal(typeof db.status.value['type_post'], 'undefined')

addMsg(state.queue[0].value, raf, (err, msg1) => {
db.all(typeQuery, 0, false, false, 'declared', (err, results) => {
db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => {
t.equal(results.length, 1)
t.equal(db.status.value['seq'], 0)
t.equal(db.status.value['type_post'], 0)

addMsg(state.queue[1].value, raf, (err, msg1) => {
db.all(typeQuery, 0, false, false, 'declared', (err, results) => {
db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => {
t.equal(results.length, 2)
t.equal(db.status.value['seq'], 352)
t.equal(db.status.value['type_post'], 352)
Expand Down Expand Up @@ -227,7 +234,7 @@ prepareAndRunTest('obsolete status parts disappear', dir, (t, db, raf) => {
addMsg(q.value, raf, cb)
}),
push.collect(() => {
db.paginate(typeQuery, 0, 1, false, false, 'declared', () => {
db.paginate(typeQuery, 0, 1, false, false, 'declared', null, () => {
t.pass(JSON.stringify(db.status.value))
t.ok(db.status.value['seq'])
t.ok(db.status.value['type_post'])
Expand All @@ -254,13 +261,22 @@ prepareAndRunTest('obsolete status parts disappear', dir, (t, db, raf) => {
addMsg(q.value, raf, cb)
}),
push.collect(() => {
db.paginate(aboutQuery, 0, 1, false, false, 'declared', () => {
t.pass(JSON.stringify(db.status.value))
t.ok(db.status.value['seq'])
t.notOk(db.status.value['type_post'])
t.ok(db.status.value['type_about'])
t.end()
})
db.paginate(
aboutQuery,
0,
1,
false,
false,
'declared',
null,
() => {
t.pass(JSON.stringify(db.status.value))
t.ok(db.status.value['seq'])
t.notOk(db.status.value['type_post'])
t.ok(db.status.value['type_about'])
t.end()
}
)
})
)
})
Expand Down Expand Up @@ -300,6 +316,7 @@ prepareAndRunTest('grow', dir, (t, db, raf) => {
false,
false,
'declared',
null,
(err, { results }) => {
t.equal(results.length, 1)
t.equal(results[0].value.content.text, 'Testing 31999')
Expand Down Expand Up @@ -350,13 +367,21 @@ prepareAndRunTest('indexAll', dir, (t, db, raf) => {
addMsg(state.queue[1].value, raf, (err, msg) => {
addMsg(state.queue[2].value, raf, (err, msg) => {
addMsg(state.queue[3].value, raf, (err, msg) => {
db.all(authorQuery, 0, false, false, 'declared', (err, results) => {
t.error(err)
t.equal(results.length, 1)
t.equal(results[0].value.content.text, 'Testing 1')
t.equal(Object.keys(db.indexes).length, 3 + 2 + 1 + 1)
t.end()
})
db.all(
authorQuery,
0,
false,
false,
'declared',
null,
(err, results) => {
t.error(err)
t.equal(results.length, 1)
t.equal(results[0].value.content.text, 'Testing 1')
t.equal(Object.keys(db.indexes).length, 3 + 2 + 1 + 1)
t.end()
}
)
})
})
})
Expand Down Expand Up @@ -389,40 +414,55 @@ prepareAndRunTest('indexAll multiple reindexes', dir, (t, db, raf) => {

addMsg(state.queue[0].value, raf, (err, msg) => {
addMsg(state.queue[1].value, raf, (err, msg) => {
db.all(typeQuery('post'), 0, false, false, 'declared', (err, results) => {
t.equal(results.length, 1)
t.equal(results[0].value.content.text, 'Testing 1')

addMsg(state.queue[2].value, raf, (err, msg) => {
addMsg(state.queue[3].value, raf, (err, msg) => {
db.all(
typeQuery('about'),
0,
false,
false,
'declared',
(err, results) => {
t.equal(results.length, 1)

db.all(
typeQuery('post'),
0,
false,
false,
'declared',
(err, results) => {
t.equal(results.length, 2)
t.deepEqual(db.indexes['type_post'].bitset.array(), [0, 2])
t.deepEqual(db.indexes['type_contact'].bitset.array(), [1])
t.deepEqual(db.indexes['type_about'].bitset.array(), [3])
t.end()
}
)
}
)
db.all(
typeQuery('post'),
0,
false,
false,
'declared',
null,
(err, results) => {
t.equal(results.length, 1)
t.equal(results[0].value.content.text, 'Testing 1')

addMsg(state.queue[2].value, raf, (err, msg) => {
addMsg(state.queue[3].value, raf, (err, msg) => {
db.all(
typeQuery('about'),
0,
false,
false,
'declared',
null,
(err, results) => {
t.equal(results.length, 1)

db.all(
typeQuery('post'),
0,
false,
false,
'declared',
null,
(err, results) => {
t.equal(results.length, 2)
t.deepEqual(db.indexes['type_post'].bitset.array(), [
0,
2,
])
t.deepEqual(db.indexes['type_contact'].bitset.array(), [
1,
])
t.deepEqual(db.indexes['type_about'].bitset.array(), [3])
t.end()
}
)
}
)
})
})
})
})
}
)
})
})
})
Loading