diff --git a/caching.js b/caching.js index 68af834ba..fae9a915a 100644 --- a/caching.js +++ b/caching.js @@ -125,9 +125,18 @@ export const CachingStore = (Store, env) => { // don't cache binary data, since it will be decoded on get this.cache.delete(id); return result; - } - // sync operation, immediately add to cache, otherwise keep it pinned in memory until it is committed - let entry = this.cache.setValue(id, value, !result || result.isSync ? 0 : -1); + } + let entry; + if (result?.isSync) { + // sync operation, immediately add to cache + if (result.result) // if it succeeded + entry = this.cache.setValue(id, value, 0); + else { // sync failure + this.cache.delete(id); + return result; + } + } else // otherwise keep it pinned in memory until it is committed + entry = this.cache.setValue(id, value, -1); if (childTxnChanges) childTxnChanges.add(id); if (version !== undefined) @@ -136,19 +145,20 @@ export const CachingStore = (Store, env) => { return result; } putSync(id, value, version, ifVersion) { + let result = super.putSync(id, value, version, ifVersion); if (id !== 'object') { // sync operation, immediately add to cache, otherwise keep it pinned in memory until it is committed - if (value && typeof value === 'object') { + if (value && typeof value === 'object' || !result) { let entry = this.cache.setValue(id, value); if (childTxnChanges) childTxnChanges.add(id); if (version !== undefined) { entry.version = typeof version === 'object' ? version.version : version; } - } else // it is possible that a value used to exist here + } else // it is possible that a value used to exist here this.cache.delete(id); } - return super.putSync(id, value, version, ifVersion); + return result; } remove(id, ifVersion) { this.cache.delete(id); diff --git a/dependencies/lmdb/libraries/liblmdb/lmdb.h b/dependencies/lmdb/libraries/liblmdb/lmdb.h index 217acb619..0acd5c594 100644 --- a/dependencies/lmdb/libraries/liblmdb/lmdb.h +++ b/dependencies/lmdb/libraries/liblmdb/lmdb.h @@ -1040,12 +1040,20 @@ int mdb_env_set_userctx(MDB_env *env, void *ctx); */ void *mdb_env_get_userctx(MDB_env *env); - /** @brief A callback function for most LMDB assert() failures, - * called before printing the message and aborting. - * - * @param[in] env An environment handle returned by #mdb_env_create(). - * @param[in] msg The assertion message, not including newline. - */ +/** @brief Get the metrics information associated with the #MDB_env. + * + * @param[in] env An environment handle returned by #mdb_env_create(). + * @return A pointer to the environment's #MDB_metrics structure. + */ +MDB_metrics *mdb_env_get_metrics(MDB_env *env); + +/** @brief A callback function for most LMDB assert() failures, + * called before printing the message and aborting. + * + * @param[in] env An environment handle returned by #mdb_env_create(). + * @param[in] msg The assertion message, not including newline. + */ + typedef void MDB_assert_func(MDB_env *env, const char *msg); /** Set or reset the assert() callback of the environment.
@@ -1447,6 +1455,7 @@ int mdb_set_relctx(MDB_txn *txn, MDB_dbi dbi, void *ctx); * */ int mdb_get_with_txn(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data, mdb_size_t *txn_id); +int mdb_direct_write(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, unsigned int offset, MDB_val *data); int mdb_get(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data); diff --git a/dependencies/lmdb/libraries/liblmdb/mdb.c b/dependencies/lmdb/libraries/liblmdb/mdb.c index 7c2e7b002..bbf38f223 100644 --- a/dependencies/lmdb/libraries/liblmdb/mdb.c +++ b/dependencies/lmdb/libraries/liblmdb/mdb.c @@ -1691,6 +1691,7 @@ struct MDB_env { MDB_val me_enckey; /**< key for env encryption */ #endif void *me_userctx; /**< User-settable context */ + MDB_metrics me_metrics; /**< Metrics tracking */ MDB_assert_func *me_assert_func; /**< Callback for assertion failures */ void *me_callback; /**< General callback */ int64_t boot_id; @@ -3120,7 +3121,7 @@ mdb_env_sync0(MDB_env *env, int force, pgno_t numpgs) } } if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->time_sync += get_time64() - start; + env->me_metrics.time_sync += get_time64() - start; } return rc; } @@ -3390,7 +3391,7 @@ mdb_txn_renew0(MDB_txn *txn) } else { /* Not yet touching txn == env->me_txn0, it may be active */ if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->clock_txn = get_time64(); + env->me_metrics.clock_txn = get_time64(); } if (ti) { if (LOCK_MUTEX(rc, env, env->me_wmutex)) @@ -3403,8 +3404,8 @@ mdb_txn_renew0(MDB_txn *txn) } if (env->me_flags & MDB_TRACK_METRICS) { uint64_t now = get_time64(); - ((MDB_metrics*) env->me_userctx)->time_start_txns += now - ((MDB_metrics*) env->me_userctx)->clock_txn; - ((MDB_metrics*) env->me_userctx)->clock_txn = now; + env->me_metrics.time_start_txns += now - env->me_metrics.clock_txn; + env->me_metrics.clock_txn = now; } txn->mt_txnid++; #if MDB_DEBUG @@ -4347,9 +4348,9 @@ mdb_page_flush(MDB_txn *txn, int keep) */ CACHEFLUSH(env->me_map, txn->mt_next_pgno * env->me_psize, DCACHE); if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->writes += write_i; - ((MDB_metrics*) env->me_userctx)->page_flushes++; - ((MDB_metrics*) env->me_userctx)->pages_written += pagecount - keep; + env->me_metrics.writes += write_i; + env->me_metrics.page_flushes++; + env->me_metrics.pages_written += pagecount - keep; } #ifdef _WIN32 @@ -4393,7 +4394,7 @@ mdb_page_flush(MDB_txn *txn, int keep) txn->mt_dirty_room += i - j; dl[0].mid = j; if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->time_page_flushes += get_time64() - start; + env->me_metrics.time_page_flushes += get_time64() - start; } return MDB_SUCCESS; } @@ -4663,8 +4664,8 @@ mdb_txn_commit(MDB_txn *txn) done: // if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->time_during_txns += get_time64() - ((MDB_metrics*) env->me_userctx)->clock_txn; - ((MDB_metrics*) env->me_userctx)->txns++; + env->me_metrics.time_during_txns += get_time64() - env->me_metrics.clock_txn; + env->me_metrics.txns++; } if ((txn->mt_flags & MDB_NOSYNC) && (env->me_flags & MDB_OVERLAPPINGSYNC)) { MDB_txn sync_txn; @@ -7922,6 +7923,38 @@ mdb_get_with_txn(MDB_txn *txn, MDB_dbi dbi, MDB_CURSOR_UNREF(&mc, 1); return rc; } +int +mdb_direct_write(MDB_txn *txn, MDB_dbi dbi, + MDB_val *key, unsigned int offset, MDB_val *data) +{ + if (txn->mt_env->me_flags & MDB_REMAP_CHUNKS) return -1; + MDB_val existing_data; + int rc = mdb_get_with_txn(txn, dbi, key, &existing_data, NULL); + if (rc == 0) { 
+ if (data->mv_size > existing_data.mv_size) { + last_error = malloc(100); + sprintf(last_error, "Attempt to direct write beyond the size of the value"); + return EINVAL; + } + MDB_env* env = txn->mt_env; + mdb_size_t file_offset = (char*)existing_data.mv_data - env->me_map + offset; + // if it turns out that a direct write can only be safely and atomically applied to a memory map when it fits into a + // single word, we can verify that here on the affected OSes with logic like: + //if (file_offset >> 3 != (file_offset + data->mv_size - 1) >> 3) + // return -1; +#ifdef _WIN32 + DWORD written; + OVERLAPPED ov; + memset(&ov, 0, sizeof(ov)); + ov.Offset = file_offset; + rc = WriteFile(env->me_fd, data->mv_data, data->mv_size, &written, &ov); +#else + int written = pwrite(env->me_fd, data->mv_data, data->mv_size, file_offset); +#endif + if (written < 0) rc = written; + } + return rc; +} /** Find a sibling for a page. * Replaces the page at the top of the cursor's stack with the @@ -10707,7 +10740,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, } MDB_env* env = txn->mt_env; if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->deletes++; + env->me_metrics.deletes++; } return mdb_del0(txn, dbi, key, data, 0); } @@ -11202,7 +11235,7 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; MDB_env* env = txn->mt_env; if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->puts++; + env->me_metrics.puts++; } mdb_cursor_init(&mc, txn, dbi, &mx); @@ -11803,12 +11836,19 @@ mdb_env_set_userctx(MDB_env *env, void *ctx) return MDB_SUCCESS; } + void * ESECT mdb_env_get_userctx(MDB_env *env) { return env ? env->me_userctx : NULL; } +MDB_metrics * ESECT +mdb_env_get_metrics(MDB_env *env) +{ + return env ?
&env->me_metrics : NULL; +} + int ESECT mdb_env_set_assert(MDB_env *env, MDB_assert_func *func) { diff --git a/index.js b/index.js index f181c14dd..69b957233 100644 --- a/index.js +++ b/index.js @@ -25,12 +25,15 @@ import { levelup } from './level.js'; export { clearKeptObjects } from './native.js'; import { nativeAddon } from './native.js'; export let { noop } = nativeAddon; +export const TIMESTAMP_PLACEHOLDER = new Uint8Array([1,1,1,1,0,0,0,0]); +export const DIRECT_WRITE_PLACEHOLDER = new Uint8Array([1,1,1,2,0,0,0,0]); export { open, openAsClass, getLastVersion, allDbs, getLastTxnId } from './open.js'; import { toBufferKey as keyValueToBuffer, compareKeys as compareKey, fromBufferKey as bufferToKeyValue } from 'ordered-binary'; import { open, openAsClass, getLastVersion } from './open.js'; export const TransactionFlags = { ABORTABLE: 1, SYNCHRONOUS_COMMIT: 2, + NO_SYNC_FLUSH: 0x10000, }; export default { diff --git a/native.js b/native.js index fe462e7e5..65d93e81b 100644 --- a/native.js +++ b/native.js @@ -1,7 +1,7 @@ import { dirname, join, default as pathModule } from 'path'; import { fileURLToPath } from 'url'; import loadNAPI from 'node-gyp-build-optional-packages'; -export let Env, Txn, Dbi, Compression, Cursor, getAddress, getBufferAddress, createBufferForAddress, clearKeptObjects, globalBuffer, setGlobalBuffer, arch, fs, os, onExit, tmpdir, lmdbError, path, EventEmitter, orderedBinary, MsgpackrEncoder, WeakLRUCache, setEnvMap, getEnvMap, getByBinary, detachBuffer, startRead, setReadCallback, write, position, iterate, prefetch, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, getSharedByBinary, getSharedBuffer, compress; +export let Env, Txn, Dbi, Compression, Cursor, getAddress, getBufferAddress, createBufferForAddress, clearKeptObjects, globalBuffer, setGlobalBuffer, arch, fs, os, onExit, tmpdir, lmdbError, path, EventEmitter, orderedBinary, MsgpackrEncoder, WeakLRUCache, setEnvMap, getEnvMap, getByBinary, detachBuffer, startRead, setReadCallback, write, position, iterate, prefetch, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, getSharedByBinary, getSharedBuffer, compress, directWrite, attemptLock, unlock; path = pathModule; let dirName = dirname(fileURLToPath(import.meta.url)).replace(/dist$/, ''); export let nativeAddon = loadNAPI(dirName); @@ -61,6 +61,9 @@ export function setNativeFunctions(externals) { iterate = externals.iterate; position = externals.position; resetTxn = externals.resetTxn; + directWrite = externals.directWrite; + attemptLock = externals.attemptLock; + unlock = externals.unlock; getCurrentValue = externals.getCurrentValue; getCurrentShared = externals.getCurrentShared; getStringByBinary = externals.getStringByBinary; diff --git a/package.json b/package.json index f16d041d5..59b513d01 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "lmdb", "author": "Kris Zyp", - "version": "2.8.5", + "version": "2.9.0-beta.1", "description": "Simple, efficient, scalable, high-performance LMDB interface", "license": "MIT", "repository": { @@ -62,13 +62,13 @@ "build-js": "rollup -c", "prepare": "rollup -c", "before-publish": "rollup -c && prebuildify-ci download && node util/set-optional-deps.cjs && npm run test", - "prebuild-libc-musl": "ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --tag-libc --napi --platform-packages --target 16.18.0", - "prebuild-libc": "prebuildify-platform-packages --tag-libc --target 20.0.0 || true && prebuildify-platform-packages --tag-libc --target 18.15.0 && 
prebuildify-platform-packages --platform-packages --tag-libc --target 16.18.0 && ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --tag-libc --target 16.18.0", - "prebuild-macos": "prebuildify-platform-packages --target 20.0.0 && prebuildify-platform-packages --target 18.15.0 && prebuildify-platform-packages --platform-packages --target 16.18.0 && ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --target 16.18.0", - "prebuild-win32": "prebuildify-platform-packages --target 20.0.0 && prebuildify-platform-packages --target 18.15.0 && prebuildify-platform-packages --target 16.18.0 && set ENABLE_V8_FUNCTIONS=false&& prebuildify-platform-packages --napi --platform-packages --target 16.18.0", - "prebuild-libc-arm7": "ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --tag-libc --target 16.18.0", - "prebuildify": "prebuildify-platform-packages --napi --target 18.15.0", - "full-publish": "cd prebuilds/win32-x64 && npm publish --access public && cd ../darwin-x64 && npm publish --access public && cd ../darwin-arm64 && npm publish --access public && cd ../linux-x64 && npm publish --access public && cd ../linux-arm64 && npm publish --access public && cd ../linux-arm && npm publish --access public && cd ../.. && npm publish", + "prebuild-libc-musl": "ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --tag-libc --napi --platform-packages --target 18.17.1", + "prebuild-libc": "prebuildify-platform-packages --tag-libc --target 20.0.0 || true && prebuildify-platform-packages --platform-packages --tag-libc --target 18.17.1 && ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --tag-libc --target 18.17.1", + "prebuild-macos": "prebuildify-platform-packages --target 20.0.0 && prebuildify-platform-packages --platform-packages --target 18.17.1 && ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --target 18.17.1", + "prebuild-win32": "prebuildify-platform-packages --target 20.0.0 && prebuildify-platform-packages --target 18.17.1 && set ENABLE_V8_FUNCTIONS=false&& prebuildify-platform-packages --napi --platform-packages --target 18.17.1", + "prebuild-libc-arm7": "ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --tag-libc --target 18.17.1", + "prebuildify": "prebuildify-platform-packages --napi --target 18.17.1", + "full-publish": "cd prebuilds/win32-x64 && npm publish --tag dev --access public && cd ../darwin-x64 && npm publish --tag dev --access public && cd ../darwin-arm64 && npm publish --tag dev --access public && cd ../linux-x64 && npm publish --tag dev --access public && cd ../linux-arm64 && npm publish --tag dev --access public && cd ../linux-arm && npm publish --tag dev --access public && cd ../.. 
&& npm publish --tag dev", "recompile": "node-gyp clean && node-gyp configure && node-gyp build", "test": "mocha test/**.test.js --expose-gc --recursive", "deno-test": "deno run --allow-ffi --allow-write --allow-read --allow-env --allow-net --unstable test/deno.ts", @@ -78,7 +78,7 @@ }, "gypfile": true, "dependencies": { - "msgpackr": "^1.9.5", + "msgpackr": "^1.9.9", "node-addon-api": "^6.1.0", "node-gyp-build-optional-packages": "5.1.1", "ordered-binary": "^1.4.1", diff --git a/read.js b/read.js index af8820049..b0f245de0 100644 --- a/read.js +++ b/read.js @@ -1,5 +1,27 @@ import { RangeIterable } from './util/RangeIterable.js'; -import { getAddress, Cursor, Txn, orderedBinary, lmdbError, getByBinary, setGlobalBuffer, prefetch, iterate, position as doPosition, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, globalBuffer, getSharedBuffer, startRead, setReadCallback } from './native.js'; +import { + getAddress, + Cursor, + Txn, + orderedBinary, + lmdbError, + getByBinary, + setGlobalBuffer, + prefetch, + iterate, + position as doPosition, + resetTxn, + getCurrentValue, + getCurrentShared, + getStringByBinary, + globalBuffer, + getSharedBuffer, + startRead, + setReadCallback, + directWrite, + attemptLock, + unlock +} from './native.js'; import { saveKey } from './keys.js'; const IF_EXISTS = 3.542694326329068e-103; const ITERATOR_DONE = { done: true, value: undefined }; @@ -259,6 +281,34 @@ export function addReadMethods(LMDBStore, { }; } }, + + directWrite(id, options) { + let rc; + let txn = env.writeTxn || (options && options.transaction) || (readTxnRenewed ? readTxn : renewReadTxn(this)); + let keySize = this.writeKey(id, keyBytes, 0); + let dataOffset = (((keySize >> 3) + 1) << 3); + keyBytes.set(options.bytes, dataOffset); + rc = directWrite(this.dbAddress, keySize, options.offset, options.bytes.length, txn.address || 0); + if (rc < 0) lmdbError(rc); + }, + + attemptLock(id, version, callback) { + keyBytes.dataView.setUint32(0, this.db.dbi); + keyBytes.dataView.setFloat64(4, version); + let keySize = this.writeKey(id, keyBytes, 12); + return attemptLock(env.address, keySize, callback); + }, + + unlock(id, version, onlyCheck) { + keyBytes.dataView.setUint32(0, this.db.dbi); + keyBytes.dataView.setFloat64(4, version); + let keySize = this.writeKey(id, keyBytes, 12); + return unlock(env.address, keySize, onlyCheck); + }, + hasLock(id, version) { + return this.unlock(id, version, true); + }, + resetReadTxn() { resetReadTxn(); }, diff --git a/src/compression.cpp b/src/compression.cpp index 1fcd41a87..7d3ca1892 100644 --- a/src/compression.cpp +++ b/src/compression.cpp @@ -9,6 +9,7 @@ Compression::Compression(const CallbackInfo& info) : ObjectWrap<Compression>(inf unsigned int compressionThreshold = 1000; char* dictionary = nullptr; size_t dictSize = 0; + unsigned int startingOffset = 0; if (info[0].IsObject()) { auto dictionaryOption = info[0].As<Object>().Get("dictionary"); if (!dictionaryOption.IsUndefined()) { @@ -20,10 +21,13 @@ Compression::Compression(const CallbackInfo& info) : ObjectWrap<Compression>(inf dictSize = (dictSize >> 3) << 3; // make sure it is word-aligned } auto thresholdOption = info[0].As<Object>().Get("threshold"); - if (thresholdOption.IsNumber()) { + if (thresholdOption.IsNumber()) compressionThreshold = thresholdOption.As<Number>(); - } + auto offsetOption = info[0].As<Object>().Get("startingOffset"); + if (offsetOption.IsNumber()) + startingOffset = offsetOption.As<Number>(); } + this->startingOffset = startingOffset; this->dictionary = this->compressDictionary = dictionary; this->dictionarySize = dictSize;
this->decompressTarget = dictionary + dictSize; @@ -47,7 +51,8 @@ void Compression::decompress(MDB_val& data, bool &isValid, bool canAllocate) { uint32_t uncompressedLength; int compressionHeaderSize; uint32_t compressedLength = data.mv_size; - unsigned char* charData = (unsigned char*) data.mv_data; + void* originalData = data.mv_data; + unsigned char* charData = (unsigned char*) data.mv_data + startingOffset; if (charData[0] == 254) { uncompressedLength = ((uint32_t)charData[1] << 16) | ((uint32_t)charData[2] << 8) | (uint32_t)charData[3]; @@ -65,7 +70,7 @@ void Compression::decompress(MDB_val& data, bool &isValid, bool canAllocate) { return; } data.mv_data = decompressTarget; - data.mv_size = uncompressedLength; + data.mv_size = uncompressedLength + startingOffset; //TODO: For larger blocks with known encoding, it might make sense to allocate space for it and use an ExternalString //fprintf(stdout, "compressed size %u uncompressedLength %u, first byte %u\n", data.mv_size, uncompressedLength, charData[compressionHeaderSize]); if (uncompressedLength > decompressSize) { @@ -73,8 +78,8 @@ void Compression::decompress(MDB_val& data, bool &isValid, bool canAllocate) { return; } int written = LZ4_decompress_safe_usingDict( - (char*)charData + compressionHeaderSize, decompressTarget, - compressedLength - compressionHeaderSize, decompressSize, + (char*)charData + compressionHeaderSize, decompressTarget + startingOffset, + compressedLength - compressionHeaderSize - startingOffset, decompressSize, dictionary, dictionarySize); //fprintf(stdout, "first uncompressed byte %X %X %X %X %X %X\n", uncompressedData[0], uncompressedData[1], uncompressedData[2], uncompressedData[3], uncompressedData[4], uncompressedData[5]); if (written < 0) { @@ -87,6 +92,8 @@ void Compression::decompress(MDB_val& data, bool &isValid, bool canAllocate) { isValid = false; return; } + if (startingOffset) + memcpy(decompressTarget, originalData, startingOffset); isValid = true; } @@ -114,23 +121,26 @@ int Compression::compressInstruction(EnvWrap* env, double* compressionAddress) { } argtokey_callback_t Compression::compress(MDB_val* value, void (*freeValue)(MDB_val&)) { - size_t dataLength = value->mv_size; + size_t dataLength = value->mv_size - startingOffset; char* data = (char*)value->mv_data; if (value->mv_size < compressionThreshold && !(value->mv_size > 0 && ((uint8_t*)data)[0] >= 250)) return freeValue; // don't compress if less than threshold (but we must compress if the first byte is the compression indicator) bool longSize = dataLength >= 0x1000000; - int prefixSize = (longSize ? 8 : 4); + int prefixSize = (longSize ? 
8 : 4) + startingOffset; int maxCompressedSize = LZ4_COMPRESSBOUND(dataLength); char* compressed = new char[maxCompressedSize + prefixSize]; //fprintf(stdout, "compressing %u\n", dataLength); if (!stream) stream = LZ4_createStream(); LZ4_loadDict(stream, compressDictionary, dictionarySize); + // TODO: Add in offset here int compressedSize = LZ4_compress_fast_continue(stream, data, compressed + prefixSize, dataLength, maxCompressedSize, acceleration); if (compressedSize > 0) { + if (startingOffset > 0) // copy the uncompressed prefix + memcpy(compressed, data, startingOffset); if (freeValue) freeValue(*value); - uint8_t* compressedData = (uint8_t*)compressed; + uint8_t* compressedData = (uint8_t*)compressed + startingOffset; if (longSize) { compressedData[0] = 255; compressedData[2] = (uint8_t)(dataLength >> 40u); diff --git a/src/dbi.cpp b/src/dbi.cpp index 65b5cf910..a0730c0d5 100644 --- a/src/dbi.cpp +++ b/src/dbi.cpp @@ -174,6 +174,35 @@ int32_t DbiWrap::doGetByBinary(uint32_t keySize, uint32_t ifNotTxnId, int64_t tx } } +NAPI_FUNCTION(directWrite) { + ARGS(5) + GET_INT64_ARG(0); + DbiWrap* dw = (DbiWrap*) i64; + uint32_t keySize; + GET_UINT32_ARG(keySize, 1); + uint32_t offset; + GET_UINT32_ARG(offset, 2); + uint32_t dataSize; + GET_UINT32_ARG(dataSize, 3); + int64_t txnAddress = 0; + napi_status status = napi_get_value_int64(env, args[4], &txnAddress); + if (dw->hasVersions) offset += 8; + EnvWrap* ew = dw->ew; + char* keyBuffer = ew->keyBuffer; + MDB_txn* txn = ew->getReadTxn(txnAddress); + MDB_val key, data; + key.mv_size = keySize; + key.mv_data = (void*) keyBuffer; + data.mv_size = dataSize; + data.mv_data = (void*) (keyBuffer + (((keySize >> 3) + 1) << 3)); +#ifdef MDB_RPAGE_CACHE + int result = mdb_direct_write(txn, dw->dbi, &key, offset, &data); +#else + int result = -1; +#endif + RETURN_INT32(result); +} + NAPI_FUNCTION(getByBinary) { ARGS(4) GET_INT64_ARG(0); @@ -247,8 +276,7 @@ NAPI_FUNCTION(getStringByBinary) { } int DbiWrap::prefetch(uint32_t* keys) { - MDB_txn* txn; - mdb_txn_begin(ew->env, nullptr, MDB_RDONLY, &txn); + MDB_txn* txn = ExtendedEnv::getPrefetchReadTxn(ew->env); MDB_val key; MDB_val data; unsigned int flags; @@ -258,8 +286,10 @@ int DbiWrap::prefetch(uint32_t* keys) { bool findDataValue = false; MDB_cursor *cursor; int rc = mdb_cursor_open(txn, dbi, &cursor); - if (rc) + if (rc) { + ExtendedEnv::donePrefetchReadTxn(txn); return rc; + } while((key.mv_size = *keys++) > 0) { if (key.mv_size == 0xffffffff) { @@ -298,7 +328,7 @@ int DbiWrap::prefetch(uint32_t* keys) { } } mdb_cursor_close(cursor); - mdb_txn_abort(txn); + ExtendedEnv::donePrefetchReadTxn(txn); return effected; } @@ -346,6 +376,7 @@ void DbiWrap::setupExports(Napi::Env env, Object exports) { DbiWrap::InstanceMethod("stat", &DbiWrap::stat), }); exports.Set("Dbi", DbiClass); + EXPORT_NAPI_FUNCTION("directWrite", directWrite); EXPORT_NAPI_FUNCTION("getByBinary", getByBinary); EXPORT_NAPI_FUNCTION("prefetch", prefetchNapi); EXPORT_NAPI_FUNCTION("getStringByBinary", getStringByBinary); diff --git a/src/env.cpp b/src/env.cpp index c4d9e083c..afee5b3e5 100644 --- a/src/env.cpp +++ b/src/env.cpp @@ -9,6 +9,9 @@ using namespace Napi; #define IGNORE_NOTFOUND (1) + +MDB_txn* ExtendedEnv::prefetchTxns[20]; +pthread_mutex_t* ExtendedEnv::prefetchTxnsLock; env_tracking_t* EnvWrap::envTracking = EnvWrap::initTracking(); thread_local std::vector<EnvWrap*>* EnvWrap::openEnvWraps = nullptr; thread_local js_buffers_t* EnvWrap::sharedBuffers = nullptr; @@ -18,6 +21,8 @@ void* getSharedBuffers() { } env_tracking_t*
EnvWrap::initTracking() { + ExtendedEnv::prefetchTxnsLock = new pthread_mutex_t; + pthread_mutex_init(ExtendedEnv::prefetchTxnsLock, nullptr); env_tracking_t* tracking = new env_tracking_t; tracking->envsLock = new pthread_mutex_t; pthread_mutex_init(tracking->envsLock, nullptr); @@ -293,7 +298,7 @@ int EnvWrap::openEnv(int flags, int jsFlags, const char* path, char* keyBuffer, this->compression = compression; this->jsFlags = jsFlags; #ifdef MDB_OVERLAPPINGSYNC - MDB_metrics* metrics; + ExtendedEnv* extended_env; #endif int rc; rc = mdb_env_set_maxdbs(env, maxDbs); @@ -327,13 +332,8 @@ int EnvWrap::openEnv(int flags, int jsFlags, const char* path, char* keyBuffer, flags |= MDB_PREVSNAPSHOT; } mdb_env_set_callback(env, checkExistingEnvs); - trackMetrics = !!(flags & MDB_TRACK_METRICS); - if (trackMetrics) { - metrics = new MDB_metrics; - memset(metrics, 0, sizeof(MDB_metrics)); - rc = mdb_env_set_userctx(env, (void*) metrics); - if (rc) goto fail; - } + extended_env = new ExtendedEnv(); + mdb_env_set_userctx(env, extended_env); #endif timeTxnWaiting = 0; @@ -346,9 +346,7 @@ if (rc != 0) { #ifdef MDB_OVERLAPPINGSYNC - if (trackMetrics) { - delete metrics; - } + delete extended_env; #endif if (rc == EXISTING_ENV_FOUND) { mdb_env_close(env); @@ -529,7 +527,7 @@ NAPI_FUNCTION(getTestRef) { return returnValue; } -NAPI_FUNCTION(directWrite) { +/*NAPI_FUNCTION(directWrite) { ARGS(4) GET_INT64_ARG(0); EnvWrap* ew = (EnvWrap*) i64; @@ -556,7 +554,7 @@ NAPI_FUNCTION(directWrite) { } RETURN_UNDEFINED; } - +*/ int32_t EnvWrap::toSharedBuffer(MDB_env* env, uint32_t* keyBuffer, MDB_val data) { unsigned int flags; mdb_env_get_flags(env, (unsigned int*) &flags); @@ -615,9 +613,24 @@ int32_t EnvWrap::toSharedBuffer(MDB_env* env, uint32_t* keyBuffer, MDB_val data return -30001; } +void notifyCallbacks(std::vector<napi_threadsafe_function> callbacks); + void EnvWrap::closeEnv(bool hasLock) { if (!env) return; + // unlock any record locks held by this thread/EnvWrap + ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(env); + pthread_mutex_lock(&extended_env->locksModificationLock); + auto it = extended_env->lock_callbacks.begin(); + while (it != extended_env->lock_callbacks.end()) + { + if (it->second.ew == this) { + notifyCallbacks(it->second.callbacks); + it = extended_env->lock_callbacks.erase(it); + } else ++it; + } + pthread_mutex_unlock(&extended_env->locksModificationLock); + if (openEnvWraps) { for (auto ewRef = openEnvWraps->begin(); ewRef != openEnvWraps->end(); ) { if (*ewRef == this) { @@ -638,15 +651,14 @@ void EnvWrap::closeEnv(bool hasLock) { envPath->hasWrites = true; if (envPath->count <= 0) { // last thread using it, we can really close it now + ExtendedEnv::removeReadTxns(env); unsigned int envFlags; // This is primarily useful for detecting termination of threads and sync'ing on their termination mdb_env_get_flags(env, &envFlags); #ifdef MDB_OVERLAPPINGSYNC if ((envFlags & MDB_OVERLAPPINGSYNC) && envPath->hasWrites) { mdb_env_sync(env, 1); } - if (envFlags & MDB_TRACK_METRICS) { - delete (MDB_metrics*) mdb_env_get_userctx(env); - } + delete (ExtendedEnv*) mdb_env_get_userctx(env); #endif char* path; mdb_env_get_path(env, (const char**)&path); @@ -750,8 +762,10 @@ Napi::Value EnvWrap::info(const CallbackInfo& info) { stats.Set("maxReaders", Number::New(info.Env(), envinfo.me_maxreaders)); stats.Set("numReaders", Number::New(info.Env(), envinfo.me_numreaders)); #ifdef MDB_OVERLAPPINGSYNC - if (this->trackMetrics) { -
MDB_metrics* metrics = (MDB_metrics*) mdb_env_get_userctx(this->env); + unsigned int envFlags; + mdb_env_get_flags(env, &envFlags); + if (envFlags & MDB_TRACK_METRICS) { + MDB_metrics* metrics = (MDB_metrics*) mdb_env_get_metrics(this->env); stats.Set("timeStartTxns", Number::New(info.Env(), (double) metrics->time_start_txns / TICKS_PER_SECOND)); stats.Set("timeDuringTxns", Number::New(info.Env(), (double) metrics->time_during_txns / TICKS_PER_SECOND)); stats.Set("timePageFlushes", Number::New(info.Env(), (double) metrics->time_page_flushes / TICKS_PER_SECOND)); @@ -781,9 +795,9 @@ Napi::Value EnvWrap::readerCheck(const CallbackInfo& info) { return Number::New(info.Env(), dead); } -Array readerStrings; +thread_local Array* readerStrings = nullptr; MDB_msg_func* printReaders = ([](const char* message, void* env) -> int { - readerStrings.Set(readerStrings.Length(), String::New(*(Env*)env, message)); + readerStrings->Set(readerStrings->Length(), String::New(*(Env*)env, message)); return 0; }); @@ -791,14 +805,15 @@ Napi::Value EnvWrap::readerList(const CallbackInfo& info) { if (!this->env) { return throwError(info.Env(), "The environment is already closed."); } - readerStrings = Array::New(info.Env()); + Array reader_strings = Array::New(info.Env()); + readerStrings = &reader_strings; int rc; Napi::Env env = info.Env(); rc = mdb_reader_list(this->env, printReaders, &env); if (rc != 0) { return throwLmdbError(info.Env(), rc); } - return readerStrings; + return reader_strings; } @@ -968,7 +983,147 @@ int32_t writeFFI(double ewPointer, uint64_t instructionAddress) { } return rc; } +ExtendedEnv::ExtendedEnv() { + pthread_mutex_init(&locksModificationLock, nullptr); +} +ExtendedEnv::~ExtendedEnv() { + pthread_mutex_destroy(&locksModificationLock); +} +uint64_t ExtendedEnv::getNextTime() { + uint64_t next_time_int = next_time_double(); + if (next_time_int == lastTime) next_time_int++; + return bswap_64(lastTime = next_time_int); +} +uint64_t ExtendedEnv::getLastTime() { + return bswap_64(lastTime); +} +bool ExtendedEnv::attemptLock(std::string key, napi_env env, napi_value func, bool has_callback, EnvWrap* ew) { + pthread_mutex_lock(&locksModificationLock); + auto resolution = lock_callbacks.find(key); + bool found; + if (resolution == lock_callbacks.end()) { + callback_holder_t callbacks; + callbacks.ew = ew; + lock_callbacks.emplace(key, callbacks); + found = true; + } else { + if (has_callback) { + napi_threadsafe_function callback; + napi_value resource; + napi_status status; + status = napi_create_object(env, &resource); + napi_value resource_name; + status = napi_create_string_latin1(env, "lock", NAPI_AUTO_LENGTH, &resource_name); + napi_create_threadsafe_function(env, func, resource, resource_name, 0, 1, nullptr, nullptr, nullptr, nullptr, + &callback); + napi_unref_threadsafe_function(env, callback); + resolution->second.callbacks.push_back(callback); + } + found = false; + } + pthread_mutex_unlock(&locksModificationLock); + return found; +} +NAPI_FUNCTION(attemptLock) { + ARGS(3) + GET_INT64_ARG(0) + EnvWrap* ew = (EnvWrap*) i64; + uint32_t size; + GET_UINT32_ARG(size, 1); + napi_value as_bool; + napi_coerce_to_bool(env, args[2], &as_bool); + bool has_callback; + napi_get_value_bool(env, as_bool, &has_callback); + ExtendedEnv* lock_callbacks = (ExtendedEnv*) mdb_env_get_userctx(ew->env); + std::string key(ew->keyBuffer, size); + bool result = lock_callbacks->attemptLock(key, env, args[2], has_callback, ew); + napi_value return_value; + napi_get_boolean(env, result, &return_value); + return 
return_value; +} +bool ExtendedEnv::unlock(std::string key, bool only_check) { + pthread_mutex_lock(&locksModificationLock); + auto resolution = lock_callbacks.find(key); + if (resolution == lock_callbacks.end()) { + pthread_mutex_unlock(&locksModificationLock); + return false; + } + if (!only_check) { + notifyCallbacks(resolution->second.callbacks); + lock_callbacks.erase(resolution); + } + pthread_mutex_unlock(&locksModificationLock); + return true; +} +void notifyCallbacks(std::vector callbacks) { + for (auto callback = callbacks.begin(); callback != callbacks.end();) { + napi_call_threadsafe_function(*callback, nullptr, napi_tsfn_blocking); + napi_release_threadsafe_function(*callback, napi_tsfn_release); + callback++; + } +} +NAPI_FUNCTION(unlock) { + ARGS(3) + GET_INT64_ARG(0); + EnvWrap* ew = (EnvWrap*) i64; + uint32_t size; + GET_UINT32_ARG(size, 1); + bool only_check = false; + napi_get_value_bool(env, args[2], &only_check); + ExtendedEnv* lock_callbacks = (ExtendedEnv*) mdb_env_get_userctx(ew->env); + std::string key(ew->keyBuffer, size); + bool result = lock_callbacks->unlock(key, only_check); + napi_value return_value; + napi_get_boolean(env, result, &return_value); + return return_value; +} +MDB_txn* ExtendedEnv::getPrefetchReadTxn(MDB_env* env) { + MDB_txn* txn; + pthread_mutex_lock(prefetchTxnsLock); + // try to find an existing txn for this env + for (int i = 0; i < 20; i++) { + txn = prefetchTxns[i]; + if (txn && mdb_txn_env(txn) == env) { + mdb_txn_renew(txn); + prefetchTxns[i] = nullptr; // remove it, no one else can use it + pthread_mutex_unlock(prefetchTxnsLock); + return txn; + } + } + pthread_mutex_unlock(prefetchTxnsLock); + // couldn't find one, need to create a new transaction + mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn); + return txn; +} +void ExtendedEnv::donePrefetchReadTxn(MDB_txn* txn) { + mdb_txn_reset(txn); + pthread_mutex_lock(prefetchTxnsLock); + // reinsert this transaction + MDB_txn* moving; + for (int i = 0; i < 20; i++) { + moving = prefetchTxns[i]; + prefetchTxns[i] = txn; + if (!moving) break; + txn = moving; + } + // if we are full and one has to be removed, abort it + if (moving) mdb_txn_abort(moving); + pthread_mutex_unlock(prefetchTxnsLock); +} + +void ExtendedEnv::removeReadTxns(MDB_env* env) { + pthread_mutex_lock(prefetchTxnsLock); + MDB_txn* txn; + for (int i = 0; i < 20; i++) { + txn = prefetchTxns[i]; + if (txn && mdb_txn_env(txn) == env) { + mdb_txn_abort(txn); + prefetchTxns[i] = nullptr; + } + } + pthread_mutex_unlock(prefetchTxnsLock); +} void EnvWrap::setupExports(Napi::Env env, Object exports) { // EnvWrap: Prepare constructor template @@ -1000,6 +1155,8 @@ void EnvWrap::setupExports(Napi::Env env, Object exports) { EXPORT_NAPI_FUNCTION("getSharedBuffer", getSharedBuffer); EXPORT_NAPI_FUNCTION("setTestRef", setTestRef); EXPORT_NAPI_FUNCTION("getTestRef", getTestRef); + EXPORT_NAPI_FUNCTION("attemptLock", attemptLock); + EXPORT_NAPI_FUNCTION("unlock", unlock); EXPORT_FUNCTION_ADDRESS("writePtr", writeFFI); //envTpl->InstanceTemplate()->SetInternalFieldCount(1); exports.Set("Env", EnvClass); diff --git a/src/lmdb-js.h b/src/lmdb-js.h index 40a14e8ce..3dbe92039 100644 --- a/src/lmdb-js.h +++ b/src/lmdb-js.h @@ -18,6 +18,10 @@ using namespace Napi; // set the threshold of when to use shared buffers (for uncompressed entries larger than this value) const size_t SHARED_BUFFER_THRESHOLD = 0x4000; +const uint32_t SPECIAL_WRITE = 0x10101; +const uint32_t REPLACE_WITH_TIMESTAMP_FLAG = 0x1000000; +const uint32_t REPLACE_WITH_TIMESTAMP = 
0x1010101; +const uint32_t DIRECT_WRITE = 0x2000000; #ifndef __CPTHREAD_H__ #define __CPTHREAD_H__ @@ -78,10 +82,22 @@ int pthread_cond_broadcast(pthread_cond_t *cond); const uint64_t TICKS_PER_SECOND = 1000000000; #endif uint64_t get_time64(); +uint64_t next_time_double(); +uint64_t last_time_double(); int cond_init(pthread_cond_t *cond); int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, uint64_t ns); +// if we need to add others: https://stackoverflow.com/questions/41770887/cross-platform-definition-of-byteswap-uint64-and-byteswap-ulong/46137633#46137633 +#ifdef _WIN32 + #define bswap_64(x) _byteswap_uint64(x) +#elif defined(__APPLE__) + #include <libkern/OSByteOrder.h> + #define bswap_64(x) OSSwapInt64(x) +#else + #include <byteswap.h> // bswap_64 +#endif + #endif /* __CPTHREAD_H__ */ class Logging { @@ -258,6 +274,30 @@ typedef struct js_buffers_t { // there is one instance of this for each JS (work pthread_mutex_t modification_lock; } js_buffers_t; + +typedef struct callback_holder_t { + EnvWrap* ew; + std::vector<napi_threadsafe_function> callbacks; +} callback_holder_t; +class ExtendedEnv { +public: + ExtendedEnv(); + ~ExtendedEnv(); + static MDB_txn* prefetchTxns[20]; + static pthread_mutex_t* prefetchTxnsLock; + std::unordered_map<std::string, callback_holder_t> lock_callbacks; + pthread_mutex_t locksModificationLock; + uint64_t lastTime; // actually encoded as double + uint64_t previousTime; // actually encoded as double + bool attemptLock(std::string key, napi_env env, napi_value func, bool has_callback, EnvWrap* ew); + bool unlock(std::string key, bool only_check); + uint64_t getNextTime(); + uint64_t getLastTime(); + static MDB_txn* getPrefetchReadTxn(MDB_env* env); + static void donePrefetchReadTxn(MDB_txn* txn); + static void removeReadTxns(MDB_env* env); +}; + class EnvWrap : public ObjectWrap<EnvWrap> { private: // List of open read transactions @@ -294,7 +334,6 @@ class EnvWrap : public ObjectWrap<EnvWrap> { WriteWorker* writeWorker; bool readTxnRenewed; bool hasWrites; - bool trackMetrics; uint64_t timeTxnWaiting; unsigned int jsFlags; char* keyBuffer; @@ -541,6 +580,7 @@ class Compression : public ObjectWrap<Compression> { char* decompressTarget; unsigned int decompressSize; unsigned int compressionThreshold; + unsigned int startingOffset; // compression can be configured to start compression at a certain offset, so header bytes are left uncompressed. // compression acceleration (defaults to 1) int acceleration; static thread_local LZ4_stream_t* stream; diff --git a/src/misc.cpp b/src/misc.cpp index efbcc02e3..0c6f40b23 100644 --- a/src/misc.cpp +++ b/src/misc.cpp @@ -113,10 +113,12 @@ int getVersionAndUncompress(MDB_val &data, DbiWrap* dw) { if (data.mv_size == 0) { return 1;// successFunc(data); } - unsigned char statusByte = dw->compression ? charData[0] : 0; + unsigned char statusByte = (dw->compression && dw->compression->startingOffset < data.mv_size) + ? charData[dw->compression->startingOffset] : 0; //fprintf(stdout, "uncompressing status %X\n", statusByte); if (statusByte >= 250) { bool isValid; + dw->compression->decompress(data, isValid, !dw->getFast); return isValid ?
2 : 0; } @@ -428,26 +430,31 @@ Napi::Value throwError(Napi::Env env, const char* message) { return env.Undefined(); } +const int ASSIGN_NEXT_TIMESTAMP = 0; +const int ASSIGN_LAST_TIMESTAMP = 1; +const int ASSIGN_NEXT_TIMESTAMP_AND_RECORD_PREVIOUS = 2; +const int ASSIGN_PREVIOUS_TIMESTAMP = 3; int putWithVersion(MDB_txn * txn, MDB_dbi dbi, MDB_val * key, MDB_val * data, unsigned int flags, double version) { // leave 8 header bytes available for version and copy in with reserved memory - char* sourceData = (char*) data->mv_data; + char* source_data = (char*) data->mv_data; int size = data->mv_size; data->mv_size = size + 8; int rc = mdb_put(txn, dbi, key, data, flags | MDB_RESERVE); if (rc == 0) { // if put is successful, data->mv_data will point into the database where we copy the data to - memcpy((char*) data->mv_data + 8, sourceData, size); + memcpy((char*) data->mv_data + 8, source_data, size); memcpy(data->mv_data, &version, 8); //*((double*) data->mv_data) = version; // this doesn't work on ARM v7 because it is not (guaranteed) memory-aligned } - data->mv_data = sourceData; // restore this so that if it points to data that needs to be freed, it points to the right place + data->mv_data = source_data; // restore this so that if it points to data that needs to be freed, it points to the right place return rc; } +static uint64_t last_time; // actually encoded as double #ifdef _WIN32 @@ -538,6 +545,19 @@ int pthread_cond_broadcast(pthread_cond_t *cond) uint64_t get_time64() { return GetTickCount64(); } +// from: https://github.com/wadey/node-microtime/blob/master/src/microtime.cc#L19 + +uint64_t next_time_double() { + FILETIME ft; + GetSystemTimePreciseAsFileTime(&ft); + unsigned long long t = ft.dwHighDateTime; + t <<= 32; + t |= ft.dwLowDateTime; + t /= 10; + t -= 11644473600000000ULL; + double next_time = (double)t/ 1000; + return *((uint64_t*)&next_time); +} #else int cond_init(pthread_cond_t *cond) { @@ -570,6 +590,12 @@ uint64_t get_time64() { clock_gettime(CLOCK_MONOTONIC, &time); return time.tv_sec * 1000000000ll + time.tv_nsec; } +uint64_t next_time_double() { + struct timespec time; + clock_gettime(CLOCK_REALTIME, &time); + double next_time = (double)time.tv_sec * 1000 + (double)time.tv_nsec / 1000000; + return *((uint64_t*)&next_time); +} #endif // This file contains code from the node-lmdb project diff --git a/src/writer.cpp b/src/writer.cpp index b8ce807a7..c440b0ac5 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -17,6 +17,12 @@ inline value? 
#ifndef _WIN32 #include <unistd.h> #endif +#ifdef _WIN32 +#define ntohl _byteswap_ulong +#define htonl _byteswap_ulong +#else +#include <arpa/inet.h> +#endif // flags: const uint32_t NO_INSTRUCTION_YET = 0; @@ -37,6 +43,7 @@ const int CONDITIONAL = 8; const int CONDITIONAL_VERSION = 0x100; const int CONDITIONAL_VERSION_LESS_THAN = 0x800; const int CONDITIONAL_ALLOW_NOTFOUND = 0x1000; +const int ASSIGN_TIMESTAMP = 0x2000; const int SET_VERSION = 0x200; //const int HAS_INLINE_VALUE = 0x400; const int COMPRESSIBLE = 0x100000; @@ -52,7 +59,6 @@ const int FAILED_CONDITION = 0x4000000; const int FINISHED_OPERATION = 0x1000000; const double ANY_VERSION = 3.542694326329068e-103; // special marker for any version - WriteWorker::~WriteWorker() { // TODO: Make sure this runs on the JS main thread, or we need to move it if (envForTxn->writeWorker == this) @@ -115,7 +121,9 @@ int WriteWorker::WaitForCallbacks(MDB_txn** txn, bool allowCommit, uint32_t* tar pthread_cond_signal(envForTxn->writingCond); interruptionStatus = WORKER_WAITING; uint64_t start; - if (envForTxn->trackMetrics) + unsigned int envFlags; + mdb_env_get_flags(env, &envFlags); + if (envFlags & MDB_TRACK_METRICS) start = get_time64(); if (target) { uint64_t delay = 1; @@ -124,9 +132,8 @@ delay = delay << 1ll; if ((*target & 0xf) || (allowCommit && finishedProgress)) { // we are in position to continue writing or commit, so forward progress can be made without interrupting yet - if (envForTxn->trackMetrics) { + if (envFlags & MDB_TRACK_METRICS) envForTxn->timeTxnWaiting += get_time64() - start; - } interruptionStatus = 0; return 0; } @@ ... } else { pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock); } - if (envForTxn->trackMetrics) { + if (envFlags & MDB_TRACK_METRICS) envForTxn->timeTxnWaiting += get_time64() - start; - } if (interruptionStatus == INTERRUPT_BATCH) { // interrupted by JS code that wants to run a synchronous transaction interruptionStatus = RESTART_WORKER_TXN; rc = mdb_txn_commit(*txn); @@ -177,7 +183,7 @@ next_inst: start = instruction++; MDB_dbi dbi = 0; //fprintf(stderr, "do %u %u\n", flags, get_time64()); bool validated = conditionDepth == validatedDepth; - if (flags & 0xf0c0) { + if (flags & 0xc0c0) { fprintf(stderr, "Unknown flag bits %u %p\n", flags, start); fprintf(stderr, "flags after message %u\n", *start); worker->resultCode = 22; @@ -281,13 +287,76 @@ next_inst: start = instruction++; } goto next_inst; case PUT: + if (flags & ASSIGN_TIMESTAMP) { + if ((*(uint64_t*)key.mv_data & 0xfffffffful) == REPLACE_WITH_TIMESTAMP) { + ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(envForTxn->env); + *(uint64_t*)key.mv_data = ((*(uint64_t*)key.mv_data >> 32) & 0x1) ?
+ extended_env->getLastTime() : extended_env->getNextTime(); + } + uint64_t first_word = *(uint64_t*)value.mv_data; + // 0 assign new time + // 1 assign last assigned time + // 3 assign last recorded previous time + // 4 record previous time + if ((first_word & 0xffffff) == SPECIAL_WRITE) { + if (first_word & REPLACE_WITH_TIMESTAMP_FLAG) { + ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(envForTxn->env); + uint32_t next_32 = first_word >> 32; + if (next_32 & 4) { + // preserve last timestamp + MDB_val last_data; + rc = mdb_get(txn, dbi, &key, &last_data); + if (rc) break; + if (flags & SET_VERSION) last_data.mv_data = (char *) last_data.mv_data + 8; + extended_env->previousTime = *(uint64_t *) last_data.mv_data; + //fprintf(stderr, "previous time %llx \n", previous_time); + } + uint64_t timestamp = (next_32 & 1) ? (next_32 & 2) ? extended_env->previousTime : extended_env->getLastTime() + : extended_env->getNextTime(); + if (first_word & DIRECT_WRITE) { + // write to second word, which is used by the direct write + *((uint64_t *) value.mv_data + 1) = timestamp ^ (next_32 >> 8); + first_word = first_word & 0xffffffff; // clear out the offset so it is just zero (always must be at the beginning) + } else + *(uint64_t *) value.mv_data = timestamp ^ (next_32 >> 8); + //fprintf(stderr, "set time %llx \n", timestamp); + } + if (first_word & DIRECT_WRITE) { + // direct in-place write + unsigned int offset = first_word >> 32; + if (flags & SET_VERSION) + offset += 8; + MDB_val bytes_to_write; + bytes_to_write.mv_data = (char*)value.mv_data + 8; + bytes_to_write.mv_size = value.mv_size - 8; +#ifdef MDB_RPAGE_CACHE + rc = mdb_direct_write(txn, dbi, &key, offset, &bytes_to_write); + if (!rc) break; // success +#endif + // if no success, this means we probably weren't able to write to a single + // word safely, so we need to do a real put + MDB_val last_data; + rc = mdb_get(txn, dbi, &key, &last_data); + if (rc) break; // failed to get + bytes_to_write.mv_size = last_data.mv_size; + // attempt a put, using reserve (so we can efficiently copy data in) + rc = mdb_put(txn, dbi, &key, &bytes_to_write, (flags & (MDB_NOOVERWRITE | MDB_NODUPDATA | MDB_APPEND | MDB_APPENDDUP)) | MDB_RESERVE); + if (!rc) { + // copy the existing data + memcpy(bytes_to_write.mv_data, last_data.mv_data, last_data.mv_size); + // copy the changes + memcpy((char*)bytes_to_write.mv_data + offset, (char*)value.mv_data + 8, value.mv_size - 8); + } + break; // done + } + } + } if (flags & SET_VERSION) rc = putWithVersion(txn, dbi, &key, &value, flags & (MDB_NOOVERWRITE | MDB_NODUPDATA | MDB_APPEND | MDB_APPENDDUP), setVersion); else rc = mdb_put(txn, dbi, &key, &value, flags & (MDB_NOOVERWRITE | MDB_NODUPDATA | MDB_APPEND | MDB_APPENDDUP)); if (flags & COMPRESSIBLE) delete value.mv_data; - //fprintf(stdout, "put %u \n", key.mv_size); break; case DEL: rc = mdb_del(txn, dbi, &key, nullptr); diff --git a/test/index.test.js b/test/index.test.js index 5e4146022..103c4001a 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -1,12 +1,15 @@ import chai from 'chai'; -import { spawn } from 'child_process'; -import { encoder as orderedBinaryEncoder } from 'ordered-binary/index.js'; import path, { dirname } from 'path'; import rimraf from 'rimraf'; -import { fileURLToPath } from 'url'; let should = chai.should(); let expect = chai.expect; +import { spawn } from 'child_process'; +import { unlinkSync } from 'fs' +import { fileURLToPath } from 'url' +import { Worker } from 'worker_threads'; +import { encoder as 
orderedBinaryEncoder } from 'ordered-binary/index.js' +import inspector from 'inspector' //inspector.open(9229, null, true); debugger let nativeMethods, dirName = dirname(fileURLToPath(import.meta.url)); @@ -21,6 +24,8 @@ import { keyValueToBuffer, levelup, open, + TIMESTAMP_PLACEHOLDER, + DIRECT_WRITE_PLACEHOLDER } from '../node-index.js'; import { openAsClass } from '../open.js'; import { RangeIterable } from '../util/RangeIterable.js'; @@ -1388,6 +1393,255 @@ describe('lmdb-js', function () { } await Promise.all(finishedTxns); }); + + it('assign timestamps', async function() { + let dbBinary = db.openDB(Object.assign({ + name: 'mydb-timestamp', + encoding: 'binary' + })); + let value = Buffer.alloc(16, 3); + value.set(TIMESTAMP_PLACEHOLDER); + value[4] = 0; + await dbBinary.put(1, value, { + instructedWrite: true, + }); + let returnedValue = dbBinary.get(1); + let dataView = new DataView(returnedValue.buffer, 0, 16); + let assignedTimestamp = dataView.getFloat64(0); + should.equal(assignedTimestamp + 100000 > Date.now(), true); + should.equal(assignedTimestamp - 100000 < Date.now(), true); + should.equal(returnedValue[9], 3); + + value = Buffer.alloc(16, 3); + value.set(TIMESTAMP_PLACEHOLDER); + value[4] = 1; // assign previous + + await dbBinary.put(1, value, { + instructedWrite: true, + }); + returnedValue = dbBinary.get(1); + dataView = new DataView(returnedValue.buffer, 0, 16); + should.equal(assignedTimestamp, dataView.getFloat64(0)); + should.equal(returnedValue[9], 3); + }); + + it('lock/unlock notifications', async function() { + let listener_called = 0; + should.equal(db.attemptLock(3.2, 55555, () => { + listener_called++; + }), true); + should.equal(db.attemptLock(3.2, 55555, () => { + listener_called++; + }), false); + let finished_locks = new Promise((resolve) => { + should.equal(db.attemptLock(3.2, 55555, () => { + listener_called++; + resolve(); + }), false); + }); + should.equal(db.hasLock('hi', 55555), false); + should.equal(db.hasLock(3.2, 3), false); + should.equal(db.hasLock(3.2, 55555), true); + should.equal(db.hasLock(3.2, 55555), true); + should.equal(db.unlock(3.2, 55555), true); + should.equal(db.hasLock(3.2, 55555), false); + await finished_locks; + should.equal(listener_called, 2); + should.equal(db.hasLock(3.2, 55555), false); + }); + + it('lock/unlock with worker', async function() { + let listener_called = 0; + should.equal(db.attemptLock(4, 1, () => { + listener_called++; + }), true); + let worker = new Worker('./test/lock-test.js', { + workerData: { + path: db.path, + } + }); + let onworkerlock, onworkerunlock; + worker.on('error', (error) => { + console.log(error); + }) + await new Promise((resolve, reject) => { + worker.on('error', (error) => { + reject(error); + }) + worker.on('message', (event) => { + if (event.started) { + should.equal(event.hasLock, true); + resolve(); + } + if (event.locked) onworkerlock(); + //if (event.unlocked) onworkerunlock(); + }); + }); + db.unlock(4, 1); + await new Promise(resolve => { + onworkerlock = resolve; + }); + should.equal(db.attemptLock(4, 1, () => { + listener_called++; + onworkerunlock(); + }), false); + worker.postMessage({unlock: true}); + await new Promise(resolve => { + onworkerunlock = resolve; + }); + should.equal(listener_called, 1); + worker.postMessage({lock: true}); + await new Promise(resolve => { + onworkerlock = resolve; + }); + await new Promise(resolve => { + should.equal(db.attemptLock(4, 1, () => { + listener_called++; + should.equal(listener_called, 2); + resolve(); + }), false); + 
worker.terminate(); + }); + }); + + it('direct write', async function() { + let dbBinary = db.openDB(Object.assign({ + name: 'mydb-direct', + encoding: 'binary', + compression: { // options.trackMetrics: true, + threshold: 40, + startingOffset: 16, + } + })); + let value = Buffer.alloc(100, 4); + await dbBinary.put(1, value, { + instructedWrite: true, + }); + + // this should usually accomplish in-place write + let returnedValue = dbBinary.get(1); + should.equal(returnedValue[2], 4); + value = Buffer.alloc(12, 3); + value.set(DIRECT_WRITE_PLACEHOLDER); + value[4] = 2; + value.set([1,2,3,4], 8); + + await dbBinary.put(1, value, { + instructedWrite: true, + }); + returnedValue = dbBinary.get(1); + const expected = Buffer.alloc(100, 4); + expected.set([1,2,3,4], 2); + returnedValue.should.deep.equal(expected); + + // this should always trigger the full put operation + value = Buffer.alloc(18, 3); + value.set(DIRECT_WRITE_PLACEHOLDER); + value[4] = 2; + value.set([1,2,3,4,5,6,7,8,9,10], 8); + + await dbBinary.put(1, value, { + instructedWrite: true, + }); + returnedValue = dbBinary.get(1); + expected.set([1,2,3,4,5,6,7,8,9,10], 2); + returnedValue.should.deep.equal(expected); + }); + + it.skip('large direct write tearing', async function() { + // this test is for checking whether direct reads and writes cause memory "tearing" + let dbBinary = db.openDB(Object.assign({ + name: 'mydb-direct-big', + encoding: 'binary', + compression: false, + })); + let value = Buffer.alloc(0x5000, 4); + await dbBinary.put(1, value); + let f64 = new Float64Array(1); + let u8 = new Uint8Array(f64.buffer, 0, 8); + for (let i = 0; i < 10000; i++) { + // this should usually accomplish in-place write + let returnedValue = dbBinary.get(1); + let updated_byte = i % 200; + value = Buffer.alloc(32, updated_byte); + value.set(DIRECT_WRITE_PLACEHOLDER); + value[4] = 2; + let promise = dbBinary.put(1, value, { + instructedWrite: true, + }); + await new Promise(resolve => setImmediate(resolve)); + returnedValue = dbBinary.get(1); + let dataView = new DataView(returnedValue.buffer, returnedValue.byteOffset, returnedValue.byteLength); + //let livef64 = new Float64Array(returnedValue.buffer, returnedValue.byteOffset, + // returnedValue.byteLength/8); + let j = 0; + let k = 0; + detect_change: do { + j++; + while(true) { + let a = dataView.getFloat64(6); + let b = dataView.getFloat64(6); + if (a === b) { + f64[0] = a; + break; + } + } + + for (k = 0; k < 8; k++) { + if (u8[k] === updated_byte) + break detect_change; + } + }while(j < 1000); + if (u8[0] !== u8[7]) + console.log(j, k, u8); + } + }); + + it.skip('small direct write tearing', async function() { + // this test is for checking whether direct reads and writes cause memory "tearing" + let dbBinary = db.openDB(Object.assign({ + name: 'mydb-direct-small', + encoding: 'binary', + compression: false, + })); + let f64 = new Float64Array(1); + let u8 = new Uint8Array(f64.buffer, 0, 8); + for (let i = 0; i < 100000; i++) { + /*for (let j = 0; j < 100;j++) { + dbBinary.put(Math.random(), Buffer.alloc(Math.random() * 10)); // keep the offset random + }*/ + let value = Buffer.alloc(16, 4); + await dbBinary.put(1, value); + + // this should usually accomplish in-place write + let returnedValue = dbBinary.get(1); + let updated_byte = i % 200; + value = Buffer.alloc(16, updated_byte); + value.set(DIRECT_WRITE_PLACEHOLDER); + value[4] = 2; + let promise = dbBinary.put(1, value, { + instructedWrite: true, + }); + await new Promise(resolve => setImmediate(resolve)); + let j = 0; + let k 
= 0; + returnedValue = dbBinary.getBinaryFast(1); + let dataView = new DataView(returnedValue.buffer, returnedValue.byteOffset, returnedValue.byteLength); + detect_change: do { + returnedValue = dbBinary.getBinaryFast(1); + //let livef64 = new Float64Array(returnedValue.buffer, returnedValue.byteOffset, + // returnedValue.byteLength/8); + j++; + for (k = 2; k < 10; k++) { + if (returnedValue[k] === updated_byte) + break detect_change; + } + }while(j < 1000); + if (returnedValue[2] !== returnedValue[9]) + console.log(j, k, returnedValue); + } + }); + it('open and close', async function () { if (options.encryptionKey) // it won't match the environment @@ -1467,7 +1721,7 @@ describe('lmdb-js', function () { await db.put('for-backup-' + (i % 120), value.slice(0, i * 20)); } try { - fs.unlinkSync(testDirPath + '/backup.mdb'); + unlinkSync(testDirPath + '/backup.mdb'); } catch (error) {} await db.flushed; await db.backup(testDirPath + '/backup.mdb', true); diff --git a/test/lock-test.js b/test/lock-test.js new file mode 100644 index 000000000..561ffc5ea --- /dev/null +++ b/test/lock-test.js @@ -0,0 +1,21 @@ +import { open } from '../node-index.js'; +import { parentPort, workerData } from 'worker_threads'; +let db = open({ + name: 'mydb1', + useVersions: true, + path: workerData.path, +}); +function getLock() { + if (db.attemptLock(4, 1, getLock)) + parentPort.postMessage({ locked: true }); +} +getLock(); + +parentPort.on('message', (event) => { + if (event.unlock) { + db.unlock(4, 1); + parentPort.postMessage({ unlocked: true }); + } + if (event.lock) getLock(); +}); +parentPort.postMessage({ started: true, hasLock: db.hasLock(4, 1) }); diff --git a/util/RangeIterable.js b/util/RangeIterable.js index b3e1fccaf..d9573a429 100644 --- a/util/RangeIterable.js +++ b/util/RangeIterable.js @@ -17,7 +17,7 @@ export class RangeIterable { let source = this; let iterable = new RangeIterable(); iterable.iterate = (async) => { - let iterator = source[Symbol.iterator](async); + let iterator = source[async ? Symbol.asyncIterator : Symbol.iterator](); let i = 0; return { next(resolvedResult) { @@ -30,6 +30,7 @@ } else { iteratorResult = iterator.next(); if (iteratorResult.then) { + if (!async) throw new Error('Cannot synchronously iterate with asynchronous values'); return iteratorResult.then(iteratorResult => this.next(iteratorResult)); } } @@ -40,6 +41,7 @@ } result = func(iteratorResult.value, i++); if (result && result.then) { + if (!async) throw new Error('Cannot synchronously iterate with asynchronous values'); return result.then(result => result === SKIP ?
this.next() : @@ -69,7 +71,7 @@ return iterable; } [Symbol.asyncIterator]() { - return this.iterator = this.iterate(); + return this.iterator = this.iterate(true); } [Symbol.iterator]() { return this.iterator = this.iterate(); } @@ -101,11 +103,12 @@ iterator = secondIterable[Symbol.iterator](async); result = iterator.next(); if (concatIterable.onDone) { - if (result.then) + if (result.then) { + if (!async) throw new Error('Cannot synchronously iterate with asynchronous values'); result.then((result) => { if (result.done) concatIterable.onDone(); }); - else if (result.done) concatIterable.onDone(); + } else if (result.done) concatIterable.onDone(); } } else { if (concatIterable.onDone) concatIterable.onDone(); @@ -115,11 +118,13 @@ return { next() { let result = iterator.next(); - if (result.then) + if (result.then) { + if (!async) throw new Error('Cannot synchronously iterate with asynchronous values'); return result.then((result) => { if (result.done) return iteratorDone(result); return result; }); + } if (result.done) return iteratorDone(result); return result; }, diff --git a/write.js b/write.js index 5406e0687..1cf2232b3 100644 --- a/write.js +++ b/write.js @@ -19,7 +19,9 @@ const CONDITIONAL_ALLOW_NOTFOUND = 0x800; const SYNC_PROMISE_SUCCESS = Promise.resolve(true); const SYNC_PROMISE_FAIL = Promise.resolve(false); SYNC_PROMISE_SUCCESS.isSync = true; +SYNC_PROMISE_SUCCESS.result = true; SYNC_PROMISE_FAIL.isSync = true; +SYNC_PROMISE_FAIL.result = false; const PROMISE_SUCCESS = Promise.resolve(true); export const ABORT = 4.452694326329068e-106; // random/unguessable numbers, which work across module/versions and native export const IF_EXISTS = 3.542694326329068e-103; @@ -337,7 +339,8 @@ export function addWriteMethods(LMDBStore, { env, fixedBuffer, resetReadTxn, use return; } } - if (ifVersion === undefined) { + // only if the write is unconditional: no ifVersion and no flags that could make it conditional + if (ifVersion === undefined && !(flags & 0x22030)) { if (writtenBatchDepth > 1) { if (!resolution.flag && !store.cache) resolution.flag = NO_RESOLVE; @@ -597,6 +600,8 @@ flags |= 0x10; if (versionOrOptions.noDupData) flags |= 0x20; + if (versionOrOptions.instructedWrite) + flags |= 0x2000; if (versionOrOptions.append) flags |= 0x20000; if (versionOrOptions.ifVersion != undefined)
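Usage sketch for the APIs added in this changeset, distilled from test/index.test.js and test/lock-test.js above. The 'events' database name and the retry callback body are illustrative only; the placeholders, the instructedWrite option, and the attemptLock/hasLock/unlock methods come from this diff.

import { open, TIMESTAMP_PLACEHOLDER, DIRECT_WRITE_PLACEHOLDER } from 'lmdb';

let db = open({ path: 'my-db', useVersions: true });
let events = db.openDB({ name: 'events', encoding: 'binary' });

// Instructed write with timestamp assignment: the 8-byte placeholder prefix is
// replaced by the write thread with the current time (readable as a float64);
// byte 4 selects the mode (0 = assign a new timestamp, 1 = reuse the last one).
let value = Buffer.alloc(16, 3);
value.set(TIMESTAMP_PLACEHOLDER);
value[4] = 0;
await events.put(1, value, { instructedWrite: true });

// Direct write: the bytes after the 8-byte header overwrite the stored value in
// place at the offset encoded in bytes 4-7 (falling back to a full put when an
// in-place write is not safe or possible).
value = Buffer.alloc(12, 0);
value.set(DIRECT_WRITE_PLACEHOLDER);
value[4] = 2;
value.set([1, 2, 3, 4], 8);
await events.put(1, value, { instructedWrite: true });

// Advisory locks with cross-thread notification: attemptLock returns true when
// the lock is acquired; otherwise the callback fires once the holder releases it.
if (!db.attemptLock(4, 1, () => { /* lock released, retry here */ })) {
  // lock is currently held by another thread/EnvWrap
}
db.hasLock(4, 1); // check without registering a callback
db.unlock(4, 1); // release and notify waiting callbacks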