Skip to content

Commit

Permalink
Merge pull request #31 from markwylde/collections-metadata
Browse files Browse the repository at this point in the history
Implement collections metadata
  • Loading branch information
markwylde authored Dec 4, 2020
2 parents e6366bf + 897e6b1 commit c1ddfbb
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 4 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,23 @@ async function main () {
}
```

## System Tables
The `system` namespace is used for storing the following metadata related to the database.

You can query them like any normal collection.

### collections
The `system.collections` collection contains a document for each collection, along with the
amount of documents that stores.

```javascript
http.request('/system.collections') === [{
id: 'uuid-uuid-uuid-uuid',
collectionId: 'tests',
documentCount: 1
}]
```

## Endpoints

<table>
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "canhazdb",
"version": "3.1.0",
"version": "3.2.0",
"description": "A shaded and clustered database communicated over http rest.",
"main": "./index.js",
"bin": {
Expand All @@ -12,6 +12,7 @@
"dependencies": {
"axios": "^0.20.0",
"chalk": "^4.1.0",
"debarrel": "^1.0.1",
"final-stream": "^2.0.2",
"get-port": "^5.1.1",
"hinton": "^1.0.0",
Expand All @@ -32,7 +33,7 @@
"ws": "^7.4.0"
},
"devDependencies": {
"basictap": "^1.1.8",
"basictap": "^1.1.11",
"c8": "^7.3.3"
},
"scripts": {
Expand Down
67 changes: 67 additions & 0 deletions server/createCollectionMetadataUpdater.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
const debarrel = require('debarrel');

async function upsertRecord (driver, collectionId, changes) {
const collection = (await driver.get('system.collections', { collectionId }))[0];
if (collection) {
await driver.patch('system.collections', {
documentCount: collection.documentCount + changes.documentCountAdd
}, {
collectionId
});
} else {
await driver.post('system.collections', {
collectionId,
documentCount: changes.documentCountAdd
});
}
}

function createCollectionMetadataUpdater (state) {
const cache = {};
let processing = false;

async function processCache (driver, cache) {
if (state.closed) {
return;
}

if (processing) {
setTimeout(() => processCache(driver, cache), 25);
return;
}

processing = true;

const promises = Object.keys(cache).map(async collectionId => {
const promise = upsertRecord(driver, collectionId, cache[collectionId]).catch((error) => {
if (!state.closed) {
throw error;
}
});
delete cache[collectionId];
return promise;
});

await Promise.all(promises);

processing = false;
}

const watch = debarrel(
() => processCache(state.driver, cache),
{
minimumFlushTime: 25,
maximumFlushTime: 100
}
);

return watch((collectionId, change) => {
const collectionMetadata = cache[collectionId] = cache[collectionId] || {
documentCountAdd: 0
};

collectionMetadata.documentCountAdd = collectionMetadata.documentCountAdd + change.documentCountAdd;
});
}

module.exports = createCollectionMetadataUpdater;
9 changes: 8 additions & 1 deletion server/drivers/sqlite/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ function createTableFromSchema (db, collectionName) {

function createSqliteDriver (state) {
async function count (collectionId, query) {
collectionId = collectionId.replace(/\./g, '#');
const dbFile = path.join(state.options.dataDirectory, './' + collectionId + '.db');

if (!await fileExists(dbFile)) {
Expand All @@ -48,15 +49,17 @@ function createSqliteDriver (state) {
}

async function get (collectionId, query, fields, order, limit) {
collectionId = collectionId.replace(/\./g, '#');
const dbFile = path.join(state.options.dataDirectory, './' + collectionId + '.db');

if (!await fileExists(dbFile)) {
throw Object.assign(new Error('collection not found'), { status: 404 });
return [];
}

const db = await getConnection(10000, dbFile);

const statement = queryStringToSql.records(collectionId, query, fields, order, limit);
console.log(statement);
const resources = await db.all(statement.query, statement.values);

return resources.map(resource => ({
Expand All @@ -66,6 +69,7 @@ function createSqliteDriver (state) {
}

async function post (collectionId, document) {
collectionId = collectionId.replace(/\./g, '#');
const insertableRecord = {
...document,
id: uuid()
Expand All @@ -84,6 +88,7 @@ function createSqliteDriver (state) {
}

async function put (collectionId, document, query) {
collectionId = collectionId.replace(/\./g, '#');
const dbFile = path.join(state.options.dataDirectory, './' + collectionId + '.db');

if (!await fileExists(dbFile)) {
Expand All @@ -101,6 +106,7 @@ function createSqliteDriver (state) {
}

async function patch (collectionId, document, query) {
collectionId = collectionId.replace(/\./g, '#');
const dbFile = path.join(state.options.dataDirectory, './' + collectionId + '.db');

if (!await fileExists(dbFile)) {
Expand All @@ -118,6 +124,7 @@ function createSqliteDriver (state) {
}

async function del (collectionId, query) {
collectionId = collectionId.replace(/\./g, '#');
const dbFile = path.join(state.options.dataDirectory, './' + collectionId + '.db');
if (!await fileExists(dbFile)) {
throw Object.assign(new Error('collection not found'), { status: 404 });
Expand Down
2 changes: 1 addition & 1 deletion server/httpHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async function handlePost (state, request, response, { collectionId }) {
}
});

writeResponse(result[STATUS], result[DOCUMENT], response);
writeResponse(result[STATUS], result[DOCUMENT] || result[DATA], response);
}

async function handlePutOne (state, request, response, { collectionId, resourceId }) {
Expand Down
11 changes: 11 additions & 0 deletions server/tcpHandler.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
const tcpocket = require('tcpocket');
const lockbase = require('lockbase');

const createCollectionMetadataUpdater = require('./createCollectionMetadataUpdater');

const {
COMMAND,
STATUS,
Expand Down Expand Up @@ -133,6 +135,8 @@ async function post (state, request, response) {

const document = await state.driver.post(collectionId, data[DOCUMENT]);

state.updateCollectionMetadata(collectionId, { documentCountAdd: 1 });

notify(`POST:/${collectionId}/${document.id}`, collectionId, document.id, request);

response.reply({
Expand Down Expand Up @@ -208,6 +212,10 @@ async function del (state, request, response) {

const result = await state.driver.del(collectionId, query);

if (result.changes > 0) {
state.updateCollectionMetadata(collectionId, { documentCountAdd: -result.changes });
}

notify(`DELETE:/${collectionId}/${resourceId}`, collectionId, resourceId, request);

response.reply({ [STATUS]: 200, [DATA]: { changes: result.changes } });
Expand Down Expand Up @@ -291,6 +299,8 @@ const mappings = {
function createInternalServer (state, port, tls) {
state.locks = lockbase();

state.updateCollectionMetadata = createCollectionMetadataUpdater(state);

return tcpocket.createServer({ port, tls }, function (request, response) {
request.socket.state = request.socket.state || {
send: response.send,
Expand All @@ -310,6 +320,7 @@ function createInternalServer (state, port, tls) {

return mapping(state, request, response)
.catch(error => {
console.log(error);
if (error[STATUS] && error[STATUS] >= 500) {
console.log(error);
}
Expand Down
119 changes: 119 additions & 0 deletions test/collectionMetadata.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
const fs = require('fs');

const test = require('basictap');
const httpRequest = require('./helpers/httpRequest');
const createTestCluster = require('./helpers/createTestCluster');

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

const tls = {
key: fs.readFileSync('./certs/localhost.privkey.pem'),
cert: fs.readFileSync('./certs/localhost.cert.pem'),
ca: [fs.readFileSync('./certs/ca.cert.pem')],
requestCert: true
};

test('collectionMetadata - create a record', async t => {
t.plan(5);

const cluster = await createTestCluster(3, tls);

const postRequest = await httpRequest(`${cluster.nodes[1].url}/tests`, {
method: 'POST',
data: {
a: 1,
b: 2,
c: 3
}
});

await sleep(100);

const metadataRequest = await httpRequest(`${cluster.nodes[1].url}/system.collections`, {
method: 'GET'
});

t.equal(metadataRequest.data[0].documentCount, 1);

const deleteRequest = await httpRequest(`${cluster.nodes[1].url}/tests/${postRequest.data.id}`, {
method: 'DELETE'
});

const getRequest = await httpRequest(`${cluster.nodes[2].url}/tests/${postRequest.data.id}`);

cluster.closeAll();

t.deepEqual(getRequest.data, {});

t.equal(postRequest.status, 201);
t.equal(deleteRequest.status, 200);
t.equal(getRequest.status, 404);
});

test('collectionMetadata - delete a record before debarrel', async t => {
t.plan(5);

const cluster = await createTestCluster(3, tls);

const postRequest = await httpRequest(`${cluster.nodes[1].url}/tests`, {
method: 'POST',
data: {
a: 1,
b: 2,
c: 3
}
});

const deleteRequest = await httpRequest(`${cluster.nodes[1].url}/tests/${postRequest.data.id}`, {
method: 'DELETE'
});

await sleep(100);

const metadataRequest = await httpRequest(`${cluster.nodes[1].url}/system.collections`, {
method: 'GET'
});

t.equal(metadataRequest.data[0].documentCount, 0);

const getRequest = await httpRequest(`${cluster.nodes[2].url}/tests/${postRequest.data.id}`);

cluster.closeAll();

t.deepEqual(getRequest.data, {});

t.equal(postRequest.status, 201);
t.equal(deleteRequest.status, 200);
t.equal(getRequest.status, 404);
});

test('collectionMetadata - delete a record after tick', async t => {
t.plan(4);

const cluster = await createTestCluster(3, tls);

const postRequest = await httpRequest(`${cluster.nodes[1].url}/tests`, {
method: 'POST',
data: {
a: 1,
b: 2,
c: 3
}
});

await sleep(300);

const deleteRequest = await httpRequest(`${cluster.nodes[1].url}/tests/${postRequest.data.id}`, {
method: 'DELETE'
});

const getRequest = await httpRequest(`${cluster.nodes[2].url}/tests/${postRequest.data.id}`);

cluster.closeAll();

t.deepEqual(getRequest.data, {});

t.equal(postRequest.status, 201);
t.equal(deleteRequest.status, 200);
t.equal(getRequest.status, 404);
});
1 change: 1 addition & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ require('./client');
require('./notify');
require('./locking');
require('./integration');
require('./collectionMetadata');

0 comments on commit c1ddfbb

Please sign in to comment.