diff --git a/README.md b/README.md index a871e0c..347d106 100644 --- a/README.md +++ b/README.md @@ -140,6 +140,23 @@ async function main () { } ``` +## System Tables +The `system` namespace is used for storing the following metadata related to the database. + +You can query them like any normal collection. + +### collections +The `system.collections` collection contains a document for each collection, along with the +amount of documents that stores. + +```javascript +http.request('/system.collections') === [{ + id: 'uuid-uuid-uuid-uuid', + collectionId: 'tests', + documentCount: 1 +}] +``` + ## Endpoints diff --git a/package.json b/package.json index df967d7..3392420 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "canhazdb", - "version": "3.1.0", + "version": "3.2.0", "description": "A shaded and clustered database communicated over http rest.", "main": "./index.js", "bin": { @@ -12,6 +12,7 @@ "dependencies": { "axios": "^0.20.0", "chalk": "^4.1.0", + "debarrel": "^1.0.1", "final-stream": "^2.0.2", "get-port": "^5.1.1", "hinton": "^1.0.0", @@ -32,7 +33,7 @@ "ws": "^7.4.0" }, "devDependencies": { - "basictap": "^1.1.8", + "basictap": "^1.1.11", "c8": "^7.3.3" }, "scripts": { diff --git a/server/createCollectionMetadataUpdater.js b/server/createCollectionMetadataUpdater.js new file mode 100644 index 0000000..88515e1 --- /dev/null +++ b/server/createCollectionMetadataUpdater.js @@ -0,0 +1,67 @@ +const debarrel = require('debarrel'); + +async function upsertRecord (driver, collectionId, changes) { + const collection = (await driver.get('system.collections', { collectionId }))[0]; + if (collection) { + await driver.patch('system.collections', { + documentCount: collection.documentCount + changes.documentCountAdd + }, { + collectionId + }); + } else { + await driver.post('system.collections', { + collectionId, + documentCount: changes.documentCountAdd + }); + } +} + +function createCollectionMetadataUpdater (state) { + const cache = {}; + let processing = false; + + async function processCache (driver, cache) { + if (state.closed) { + return; + } + + if (processing) { + setTimeout(() => processCache(driver, cache), 25); + return; + } + + processing = true; + + const promises = Object.keys(cache).map(async collectionId => { + const promise = upsertRecord(driver, collectionId, cache[collectionId]).catch((error) => { + if (!state.closed) { + throw error; + } + }); + delete cache[collectionId]; + return promise; + }); + + await Promise.all(promises); + + processing = false; + } + + const watch = debarrel( + () => processCache(state.driver, cache), + { + minimumFlushTime: 25, + maximumFlushTime: 100 + } + ); + + return watch((collectionId, change) => { + const collectionMetadata = cache[collectionId] = cache[collectionId] || { + documentCountAdd: 0 + }; + + collectionMetadata.documentCountAdd = collectionMetadata.documentCountAdd + change.documentCountAdd; + }); +} + +module.exports = createCollectionMetadataUpdater; diff --git a/server/drivers/sqlite/index.js b/server/drivers/sqlite/index.js index dab5481..923d89f 100644 --- a/server/drivers/sqlite/index.js +++ b/server/drivers/sqlite/index.js @@ -31,6 +31,7 @@ function createTableFromSchema (db, collectionName) { function createSqliteDriver (state) { async function count (collectionId, query) { + collectionId = collectionId.replace(/\./g, '#'); const dbFile = path.join(state.options.dataDirectory, './' + collectionId + '.db'); if (!await fileExists(dbFile)) { @@ -48,15 +49,17 @@ function createSqliteDriver (state) { } async function get (collectionId, query, fields, order, limit) { + collectionId = collectionId.replace(/\./g, '#'); const dbFile = path.join(state.options.dataDirectory, './' + collectionId + '.db'); if (!await fileExists(dbFile)) { - throw Object.assign(new Error('collection not found'), { status: 404 }); + return []; } const db = await getConnection(10000, dbFile); const statement = queryStringToSql.records(collectionId, query, fields, order, limit); + console.log(statement); const resources = await db.all(statement.query, statement.values); return resources.map(resource => ({ @@ -66,6 +69,7 @@ function createSqliteDriver (state) { } async function post (collectionId, document) { + collectionId = collectionId.replace(/\./g, '#'); const insertableRecord = { ...document, id: uuid() @@ -84,6 +88,7 @@ function createSqliteDriver (state) { } async function put (collectionId, document, query) { + collectionId = collectionId.replace(/\./g, '#'); const dbFile = path.join(state.options.dataDirectory, './' + collectionId + '.db'); if (!await fileExists(dbFile)) { @@ -101,6 +106,7 @@ function createSqliteDriver (state) { } async function patch (collectionId, document, query) { + collectionId = collectionId.replace(/\./g, '#'); const dbFile = path.join(state.options.dataDirectory, './' + collectionId + '.db'); if (!await fileExists(dbFile)) { @@ -118,6 +124,7 @@ function createSqliteDriver (state) { } async function del (collectionId, query) { + collectionId = collectionId.replace(/\./g, '#'); const dbFile = path.join(state.options.dataDirectory, './' + collectionId + '.db'); if (!await fileExists(dbFile)) { throw Object.assign(new Error('collection not found'), { status: 404 }); diff --git a/server/httpHandler.js b/server/httpHandler.js index 1f85bf8..9445133 100644 --- a/server/httpHandler.js +++ b/server/httpHandler.js @@ -170,7 +170,7 @@ async function handlePost (state, request, response, { collectionId }) { } }); - writeResponse(result[STATUS], result[DOCUMENT], response); + writeResponse(result[STATUS], result[DOCUMENT] || result[DATA], response); } async function handlePutOne (state, request, response, { collectionId, resourceId }) { diff --git a/server/tcpHandler.js b/server/tcpHandler.js index 9c1031e..22cfa1e 100644 --- a/server/tcpHandler.js +++ b/server/tcpHandler.js @@ -1,6 +1,8 @@ const tcpocket = require('tcpocket'); const lockbase = require('lockbase'); +const createCollectionMetadataUpdater = require('./createCollectionMetadataUpdater'); + const { COMMAND, STATUS, @@ -133,6 +135,8 @@ async function post (state, request, response) { const document = await state.driver.post(collectionId, data[DOCUMENT]); + state.updateCollectionMetadata(collectionId, { documentCountAdd: 1 }); + notify(`POST:/${collectionId}/${document.id}`, collectionId, document.id, request); response.reply({ @@ -208,6 +212,10 @@ async function del (state, request, response) { const result = await state.driver.del(collectionId, query); + if (result.changes > 0) { + state.updateCollectionMetadata(collectionId, { documentCountAdd: -result.changes }); + } + notify(`DELETE:/${collectionId}/${resourceId}`, collectionId, resourceId, request); response.reply({ [STATUS]: 200, [DATA]: { changes: result.changes } }); @@ -291,6 +299,8 @@ const mappings = { function createInternalServer (state, port, tls) { state.locks = lockbase(); + state.updateCollectionMetadata = createCollectionMetadataUpdater(state); + return tcpocket.createServer({ port, tls }, function (request, response) { request.socket.state = request.socket.state || { send: response.send, @@ -310,6 +320,7 @@ function createInternalServer (state, port, tls) { return mapping(state, request, response) .catch(error => { + console.log(error); if (error[STATUS] && error[STATUS] >= 500) { console.log(error); } diff --git a/test/collectionMetadata.js b/test/collectionMetadata.js new file mode 100644 index 0000000..6b3fb71 --- /dev/null +++ b/test/collectionMetadata.js @@ -0,0 +1,119 @@ +const fs = require('fs'); + +const test = require('basictap'); +const httpRequest = require('./helpers/httpRequest'); +const createTestCluster = require('./helpers/createTestCluster'); + +const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)); + +const tls = { + key: fs.readFileSync('./certs/localhost.privkey.pem'), + cert: fs.readFileSync('./certs/localhost.cert.pem'), + ca: [fs.readFileSync('./certs/ca.cert.pem')], + requestCert: true +}; + +test('collectionMetadata - create a record', async t => { + t.plan(5); + + const cluster = await createTestCluster(3, tls); + + const postRequest = await httpRequest(`${cluster.nodes[1].url}/tests`, { + method: 'POST', + data: { + a: 1, + b: 2, + c: 3 + } + }); + + await sleep(100); + + const metadataRequest = await httpRequest(`${cluster.nodes[1].url}/system.collections`, { + method: 'GET' + }); + + t.equal(metadataRequest.data[0].documentCount, 1); + + const deleteRequest = await httpRequest(`${cluster.nodes[1].url}/tests/${postRequest.data.id}`, { + method: 'DELETE' + }); + + const getRequest = await httpRequest(`${cluster.nodes[2].url}/tests/${postRequest.data.id}`); + + cluster.closeAll(); + + t.deepEqual(getRequest.data, {}); + + t.equal(postRequest.status, 201); + t.equal(deleteRequest.status, 200); + t.equal(getRequest.status, 404); +}); + +test('collectionMetadata - delete a record before debarrel', async t => { + t.plan(5); + + const cluster = await createTestCluster(3, tls); + + const postRequest = await httpRequest(`${cluster.nodes[1].url}/tests`, { + method: 'POST', + data: { + a: 1, + b: 2, + c: 3 + } + }); + + const deleteRequest = await httpRequest(`${cluster.nodes[1].url}/tests/${postRequest.data.id}`, { + method: 'DELETE' + }); + + await sleep(100); + + const metadataRequest = await httpRequest(`${cluster.nodes[1].url}/system.collections`, { + method: 'GET' + }); + + t.equal(metadataRequest.data[0].documentCount, 0); + + const getRequest = await httpRequest(`${cluster.nodes[2].url}/tests/${postRequest.data.id}`); + + cluster.closeAll(); + + t.deepEqual(getRequest.data, {}); + + t.equal(postRequest.status, 201); + t.equal(deleteRequest.status, 200); + t.equal(getRequest.status, 404); +}); + +test('collectionMetadata - delete a record after tick', async t => { + t.plan(4); + + const cluster = await createTestCluster(3, tls); + + const postRequest = await httpRequest(`${cluster.nodes[1].url}/tests`, { + method: 'POST', + data: { + a: 1, + b: 2, + c: 3 + } + }); + + await sleep(300); + + const deleteRequest = await httpRequest(`${cluster.nodes[1].url}/tests/${postRequest.data.id}`, { + method: 'DELETE' + }); + + const getRequest = await httpRequest(`${cluster.nodes[2].url}/tests/${postRequest.data.id}`); + + cluster.closeAll(); + + t.deepEqual(getRequest.data, {}); + + t.equal(postRequest.status, 201); + t.equal(deleteRequest.status, 200); + t.equal(getRequest.status, 404); +}); diff --git a/test/index.js b/test/index.js index 58f0620..4199621 100644 --- a/test/index.js +++ b/test/index.js @@ -3,3 +3,4 @@ require('./client'); require('./notify'); require('./locking'); require('./integration'); +require('./collectionMetadata');