Skip to content

Commit

Permalink
Add flush to RocksDbReadWriteRef.
Browse files Browse the repository at this point in the history
  • Loading branch information
bhartnett committed Oct 28, 2024
1 parent 018fd77 commit 22297a4
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 13 deletions.
3 changes: 3 additions & 0 deletions rocksdb/options/dbopts.nim
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ proc defaultDbOptions*(autoClose = false): DbOptionsRef =
# Enable creating column families if they do not exist
dbOpts.createMissingColumnFamilies = true

# Make sure flush is atomic accross column families
dbOpts.atomicFlush = true

# Options recommended by rocksdb devs themselves, for new databases
# https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options

Expand Down
53 changes: 40 additions & 13 deletions rocksdb/rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export
type
RocksDbPtr* = ptr rocksdb_t
IngestExternalFilesOptionsPtr = ptr rocksdb_ingestexternalfileoptions_t
FlushOptionsPtr = ptr rocksdb_flushoptions_t

RocksDbRef* = ref object of RootObj
lock: Lock
Expand All @@ -57,6 +58,7 @@ type
RocksDbReadWriteRef* = ref object of RocksDbRef
writeOpts: WriteOptionsRef
ingestOptsPtr: IngestExternalFilesOptionsPtr
flushOptsPtr: FlushOptionsPtr

proc listColumnFamilies*(path: string): RocksDBResult[seq[string]] =
## List exisiting column families on disk. This might be used to find out
Expand Down Expand Up @@ -135,6 +137,9 @@ proc openRocksDb*(
autoCloseNonNil(writeOpts)
autoCloseAll(cfs)

let flushOptsPtr = rocksdb_flushoptions_create()
rocksdb_flushoptions_set_wait(flushOptsPtr, 1)

let
cfTable = newColFamilyTable(cfNames.mapIt($it), cfHandles)
db = RocksDbReadWriteRef(
Expand All @@ -144,6 +149,7 @@ proc openRocksDb*(
dbOpts: dbOpts,
readOpts: readOpts,
writeOpts: writeOpts,
flushOptsPtr: flushOptsPtr,
cfDescriptors: cfs,
ingestOptsPtr: rocksdb_ingestexternalfileoptions_create(),
defaultCfHandle: cfTable.get(DEFAULT_COLUMN_FAMILY_NAME),
Expand Down Expand Up @@ -465,19 +471,37 @@ proc releaseSnapshot*(db: RocksDbRef, snapshot: SnapshotRef) =
rocksdb_release_snapshot(db.cPtr, snapshot.cPtr)
snapshot.setClosed()

proc flush*(db: RocksDbRef, cfs: openArray[ColFamilyHandleRef]): RocksDBResult[void] =
withLock(db.lock):
if not db.isClosed():
var cfs = cfs.mapIt(it.cPtr)
var errors: cstring
var opts = rocksdb_flushoptions_create()
defer:
rocksdb_flushoptions_destroy(opts)
rocksdb_flushoptions_set_wait(opts, 1)
rocksdb_flush_cfs(
db.cPtr, opts, addr cfs[0], cint(cfs.len), cast[cstringArray](errors.addr)
)
bailOnErrors(errors)
proc flush*(
db: RocksDbReadWriteRef, cfHandle = db.defaultCfHandle
): RocksDBResult[void] =
## Flush all memory table data for the given column family.
doAssert not db.isClosed()

var errors: cstring
rocksdb_flush_cf(
db.cPtr, db.flushOptsPtr, cfHandle.cPtr, cast[cstringArray](errors.addr)
)
bailOnErrors(errors)

ok()

proc flush*(
db: RocksDbReadWriteRef, cfHandles: openArray[ColFamilyHandleRef]
): RocksDBResult[void] =
## Flush all memory table data for the given column families.
doAssert not db.isClosed()

var
cfs = cfHandles.mapIt(it.cPtr)
errors: cstring
rocksdb_flush_cfs(
db.cPtr,
db.flushOptsPtr,
addr cfs[0],
cint(cfs.len),
cast[cstringArray](errors.addr),
)
bailOnErrors(errors)

ok()

Expand Down Expand Up @@ -506,3 +530,6 @@ proc close*(db: RocksDbRef) =

rocksdb_ingestexternalfileoptions_destroy(db.ingestOptsPtr)
db.ingestOptsPtr = nil

rocksdb_flushoptions_destroy(db.flushOptsPtr)
db.flushOptsPtr = nil
15 changes: 15 additions & 0 deletions tests/test_rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -499,3 +499,18 @@ suite "RocksDbRef Tests":

db.releaseSnapshot(snapshot)
check snapshot.isClosed()

test "Test flush":
check:
db.put(key, val).isOk()
db.flush().isOk()

check:
db.put(otherKey, val, otherCfHandle).isOk()
db.flush(otherCfHandle).isOk()

let cfHandles = [defaultCfHandle, otherCfHandle]
check:
db.put(otherKey, val, defaultCfHandle).isOk()
db.put(key, val, otherCfHandle).isOk()
db.flush(cfHandles).isOk()

0 comments on commit 22297a4

Please sign in to comment.