Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto close feature to improve memory management of option types. #57

Merged
merged 10 commits into from
Jun 28, 2024
62 changes: 20 additions & 42 deletions rocksdb/rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type
ingestOptsPtr: IngestExternalFilesOptionsPtr

proc listColumnFamilies*(
path: string, dbOpts = DbOptionsRef(nil)
path: string, dbOpts: DbOptionsRef
): RocksDBResult[seq[string]] =
## List exisiting column families on disk. This might be used to find out
## whether there were some columns missing with the version on disk.
Expand All @@ -70,42 +70,33 @@ proc listColumnFamilies*(
## above once rocksdb has been upgraded to the latest version, see comments
## at the end of ./columnfamily/cfhandle.nim.
##
let useDbOpts = (if dbOpts.isNil: defaultDbOptions() else: dbOpts)
defer:
if dbOpts.isNil:
useDbOpts.close()
## dbOpts is not closed by this function so it should be managed by the caller

var
lencf: csize_t
cfLen: csize_t
errors: cstring
let cList = rocksdb_list_column_families(
useDbOpts.cPtr, path.cstring, addr lencf, cast[cstringArray](errors.addr)
let cfList = rocksdb_list_column_families(
dbOpts.cPtr, path.cstring, addr cfLen, cast[cstringArray](errors.addr)
)
bailOnErrors(errors)

var cfs: seq[string]
if not cList.isNil:
defer:
rocksdb_free(cList)
if cfList.isNil or cfLen == 0:
return ok(newSeqOfCap[string](0))

for n in 0 ..< lencf:
if cList[n].isNil:
# Clean up the rest
for z in n + 1 ..< lencf:
if not cList[z].isNil:
rocksdb_free(cList[z])
return err("short reply")
defer:
rocksdb_list_column_families_destroy(cfList, cfLen)

cfs.add $cList[n]
rocksdb_free(cList[n])
var colFamilyNames = newSeqOfCap[string](cfLen)
for i in 0 ..< cfLen:
colFamilyNames.add($cfList[i])

ok cfs
ok(colFamilyNames)

proc openRocksDb*(
path: string,
dbOpts = DbOptionsRef(nil),
readOpts = ReadOptionsRef(nil),
writeOpts = WriteOptionsRef(nil),
dbOpts = defaultDbOptions(),
readOpts = defaultReadOptions(),
writeOpts = defaultWriteOptions(),
columnFamilies: openArray[ColFamilyDescriptor] = [],
): RocksDBResult[RocksDbReadWriteRef] =
## Open a RocksDB instance in read-write mode. If `columnFamilies` is empty
Expand All @@ -114,10 +105,6 @@ proc openRocksDb*(
## By default, column families will be created if they don't yet exist.
## All existing column families must be specified if the database has
## previously created any column families.
let useDbOpts = (if dbOpts.isNil: defaultDbOptions() else: dbOpts)
defer:
if dbOpts.isNil:
useDbOpts.close()

var cfs = columnFamilies.toSeq()
if DEFAULT_COLUMN_FAMILY_NAME notin columnFamilies.mapIt(it.name()):
Expand All @@ -129,7 +116,7 @@ proc openRocksDb*(
cfHandles = newSeq[ColFamilyHandlePtr](cfs.len)
errors: cstring
let rocksDbPtr = rocksdb_open_column_families(
useDbOpts.cPtr,
dbOpts.cPtr,
path.cstring,
cfNames.len().cint,
cast[cstringArray](cfNames[0].addr),
Expand All @@ -140,9 +127,6 @@ proc openRocksDb*(
bailOnErrors(errors)

let
dbOpts = useDbOpts # don't close on exit
readOpts = (if readOpts.isNil: defaultReadOptions() else: readOpts)
writeOpts = (if writeOpts.isNil: defaultWriteOptions() else: writeOpts)
cfTable = newColFamilyTable(cfNames.mapIt($it), cfHandles)
db = RocksDbReadWriteRef(
lock: createLock(),
Expand All @@ -159,8 +143,8 @@ proc openRocksDb*(

proc openRocksDbReadOnly*(
path: string,
dbOpts = DbOptionsRef(nil),
readOpts = ReadOptionsRef(nil),
dbOpts = defaultDbOptions(),
readOpts = defaultReadOptions(),
columnFamilies: openArray[ColFamilyDescriptor] = [],
errorIfWalFileExists = false,
): RocksDBResult[RocksDbReadOnlyRef] =
Expand All @@ -170,10 +154,6 @@ proc openRocksDbReadOnly*(
## families will be created if they don't yet exist. If the database already
## contains any column families, then all or a subset of the existing column
## families can be opened for reading.
let useDbOpts = (if dbOpts.isNil: defaultDbOptions() else: dbOpts)
defer:
if dbOpts.isNil:
useDbOpts.close()

var cfs = columnFamilies.toSeq()
if DEFAULT_COLUMN_FAMILY_NAME notin columnFamilies.mapIt(it.name()):
Expand All @@ -185,7 +165,7 @@ proc openRocksDbReadOnly*(
cfHandles = newSeq[ColFamilyHandlePtr](cfs.len)
errors: cstring
let rocksDbPtr = rocksdb_open_for_read_only_column_families(
useDbOpts.cPtr,
dbOpts.cPtr,
path.cstring,
cfNames.len().cint,
cast[cstringArray](cfNames[0].addr),
Expand All @@ -197,8 +177,6 @@ proc openRocksDbReadOnly*(
bailOnErrors(errors)

let
dbOpts = useDbOpts # don't close on exit
readOpts = (if readOpts.isNil: defaultReadOptions() else: readOpts)
cfTable = newColFamilyTable(cfNames.mapIt($it), cfHandles)
db = RocksDbReadOnlyRef(
lock: createLock(),
Expand Down
3 changes: 1 addition & 2 deletions rocksdb/rocksiterator.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ proc seekToKey*(iter: RocksIteratorRef, key: openArray[byte]) =
## invalid.
##
doAssert not iter.isClosed()
let (cKey, cLen) = (cast[cstring](unsafeAddr key[0]), csize_t(key.len))
rocksdb_iter_seek(iter.cPtr, cKey, cLen)
rocksdb_iter_seek(iter.cPtr, cast[cstring](unsafeAddr key[0]), csize_t(key.len))

proc seekToFirst*(iter: RocksIteratorRef) =
## Seeks to the first entry in the column family.
Expand Down
4 changes: 2 additions & 2 deletions rocksdb/sstfilewriter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ type
dbOpts: DbOptionsRef

proc openSstFileWriter*(
filePath: string, dbOpts = DbOptionsRef(nil)
filePath: string, dbOpts = defaultDbOptions()
): RocksDBResult[SstFileWriterRef] =
## Creates a new `SstFileWriterRef` and opens the file at the given `filePath`.
let dbOpts = (if dbOpts.isNil: defaultDbOptions() else: dbOpts)
doAssert not dbOpts.isClosed()

let envOptsPtr = rocksdb_envoptions_create()
Expand Down Expand Up @@ -95,6 +94,7 @@ proc finish*(writer: SstFileWriterRef): RocksDBResult[void] =
proc close*(writer: SstFileWriterRef) =
## Closes the `SstFileWriterRef`.
if not writer.isClosed():
writer.dbOpts.close()
rocksdb_envoptions_destroy(writer.envOptsPtr)
writer.envOptsPtr = nil
rocksdb_sstfilewriter_destroy(writer.cPtr)
Expand Down
25 changes: 5 additions & 20 deletions rocksdb/transactiondb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,13 @@ type

proc openTransactionDb*(
path: string,
dbOpts = DbOptionsRef(nil),
txDbOpts = TransactionDbOptionsRef(nil),
dbOpts = defaultDbOptions(),
txDbOpts = defaultTransactionDbOptions(),
columnFamilies: openArray[ColFamilyDescriptor] = [],
): RocksDBResult[TransactionDbRef] =
## Open a `TransactionDbRef` with the given options and column families.
## If no column families are provided the default column family will be used.
## If no options are provided the default options will be used.
let useDbOpts = (if dbOpts.isNil: defaultDbOptions() else: dbOpts)
defer:
if dbOpts.isNil:
useDbOpts.close()

var cfs = columnFamilies.toSeq()
if DEFAULT_COLUMN_FAMILY_NAME notin columnFamilies.mapIt(it.name()):
Expand All @@ -64,7 +60,7 @@ proc openTransactionDb*(
errors: cstring

let txDbPtr = rocksdb_transactiondb_open_column_families(
useDbOpts.cPtr,
dbOpts.cPtr,
txDbOpts.cPtr,
path.cstring,
cfNames.len().cint,
Expand All @@ -76,10 +72,6 @@ proc openTransactionDb*(
bailOnErrors(errors)

let
dbOpts = useDbOpts # don't close on exit
txDbOpts = (if txDbOpts.isNil: defaultTransactionDbOptions()
else: txDbOpts
)
cfTable = newColFamilyTable(cfNames.mapIt($it), cfHandles)
db = TransactionDbRef(
lock: createLock(),
Expand Down Expand Up @@ -107,22 +99,15 @@ proc isClosed*(db: TransactionDbRef): bool {.inline.} =

proc beginTransaction*(
db: TransactionDbRef,
readOpts = ReadOptionsRef(nil),
writeOpts = WriteOptionsRef(nil),
txDbOpts = TransactionDbOptionsRef(nil),
readOpts = defaultReadOptions(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is called without supplying readOpts/writeOpts, what happens? Who frees the options allocated by the default?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ie in the colfamily pr, the idea was that if nil is given, the temporary options instance is "owned" by the callee (beginTransaction in this case) and therefor it also deallocates it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They get freed when the db instance is closed. db.close() will free all the opts types regardless of whether passed in or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the above comment applies in the other locations. In this instance the opts are freed when the transaction is closed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the transaction/database/etc fails to open?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the core issue here is that when we do xxx = defaultOptions in the parameter list, the ownership is unclear which has these knock-on effects. semantically, it becomes strange that "sometimes" the function closes the passed-in options and sometimes it doesn't (if I pass in options, can I use them for another call? etc)

I'm not sure this is worth fixing, but it's certainly worth thinking about and the nil approach was a compromise (non-nil = caller owns) to get there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually found the solution using nil to be more unclear which is why I removed it. Using the default options solution makes it clear that the options will always get cleaned up when the database is closed. When using the nil method, the options require manual clean up if being passed in, if they don't get passed in then the options are cleaned up in the open function before the database is closed (this may cause other issues). The Java RocksDb library states ' Options instance should not be disposed before all DBs using this options instance have been closed', see here: https://javadoc.io/doc/org.rocksdb/rocksdbjni/latest/org/rocksdb/RocksDB.html#open-org.rocksdb.DBOptions-java.lang.String-java.util.List-java.util.List-

You should be able to use options for multiple calls as long as the database is open so for most of the functions that won't be a problem. In the case of beginTransaction these options get cleaned up when closing the transaction and so I guess this could be a problem if we want to reuse the opts across transactions. This function is not currently used in Nimbus by the way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we can use nim destructors with the refc garbage collector but perhaps in a future change we can use destructors to help cleanup the opts types and then don't close any opt types in db.close().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we can use nim destructors

no, these are ORC-only. we can indeed revisit the problem then - until then, perhaps it's worth documenting these quirks in the API as well, similar to java.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll add that.

I'm also working a change to support choosing if the types are freed when closing the database or not by adding a flag to each opt type. If you pass in an opt it won't get closed automatically but if none is passed in the default opts will get closed when the database is closed.

writeOpts = defaultWriteOptions(),
txOpts = defaultTransactionOptions(),
cfHandle = db.defaultCfHandle,
): TransactionRef =
## Begin a new transaction against the database. The transaction will default
## to using the specified column family. If no column family is specified
## then the default column family will be used.
doAssert not db.isClosed()
let
txDbOpts = (if txDbOpts.isNil: defaultTransactionDbOptions()
else: txDbOpts
)
readOpts = (if readOpts.isNil: defaultReadOptions() else: readOpts)
writeOpts = (if writeOpts.isNil: defaultWriteOptions() else: writeOpts)

let txPtr = rocksdb_transaction_begin(db.cPtr, writeOpts.cPtr, txOpts.cPtr, nil)

Expand Down
4 changes: 1 addition & 3 deletions tests/test_helper.nim
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ proc initTransactionDb*(
path: string, columnFamilyNames: openArray[string] = @[]
): TransactionDbRef =
let res = openTransactionDb(
path,
txDbOpts = defaultTransactionDbOptions(),
columnFamilies = columnFamilyNames.mapIt(initColFamilyDescriptor(it)),
path, columnFamilies = columnFamilyNames.mapIt(initColFamilyDescriptor(it))
)
if res.isErr():
echo res.error()
Expand Down
34 changes: 22 additions & 12 deletions tests/test_rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -231,18 +231,6 @@ suite "RocksDbRef Tests":
readOnlyDb.close()
check readOnlyDb.isClosed()

test "Close multiple times":
check not db.isClosed()
db.close()
check db.isClosed()
db.close()
check db.isClosed()

test "Unknown column family":
const CF_UNKNOWN = "unknown"
let cfHandleRes = db.getColFamilyHandle(CF_UNKNOWN)
check cfHandleRes.isErr() and cfHandleRes.error() == "rocksdb: unknown column family"

test "Test missing key and values":
let
key1 = @[byte(1)] # exists with non empty value
Expand Down Expand Up @@ -335,3 +323,25 @@ suite "RocksDbRef Tests":
r.value() == false
v.len() == 0
db.get(key5).isErr()

test "List column familes":
let dbOpts = defaultDbOptions()
defer:
dbOpts.close()

let colFamiliesRes = listColumnFamilies(dbPath, dbOpts)
check:
colFamiliesRes.isOk()
colFamiliesRes.value() == @[CF_DEFAULT, CF_OTHER]

test "Unknown column family":
const CF_UNKNOWN = "unknown"
let cfHandleRes = db.getColFamilyHandle(CF_UNKNOWN)
check cfHandleRes.isErr() and cfHandleRes.error() == "rocksdb: unknown column family"

test "Close multiple times":
check not db.isClosed()
db.close()
check db.isClosed()
db.close()
check db.isClosed()
15 changes: 15 additions & 0 deletions tests/test_rocksiterator.nim
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,21 @@ suite "RocksIteratorRef Tests":
iter2.key() == @[byte(3)]
iter2.value() == @[byte(3)]


test "Iterate forwards using seek to key":
let res = db.openIterator(defaultCfHandle)
check res.isOk()

var iter = res.get()
defer:
iter.close()

iter.seekToKey(key2)
check:
iter.isValid()
iter.key() == key2
iter.value() == val2

test "Empty column family":
let res = db.openIterator(emptyCfHandle)
check res.isOk()
Expand Down
54 changes: 41 additions & 13 deletions tests/test_writebatch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,18 @@ suite "WriteBatchRef Tests":
not batch.isClosed()

test "Test writing batch to column family":
var batch = db.openWriteBatch(otherCfHandle)
var batch = db.openWriteBatch()
defer:
batch.close()
check not batch.isClosed()

check:
batch.put(key1, val1).isOk()
batch.put(key2, val2).isOk()
batch.put(key3, val3).isOk()
batch.put(key1, val1, otherCfHandle).isOk()
batch.put(key2, val2, otherCfHandle).isOk()
batch.put(key3, val3, otherCfHandle).isOk()
batch.count() == 3

batch.delete(key2).isOk()
batch.delete(key2, otherCfHandle).isOk()
batch.count() == 4
not batch.isClosed()

Expand All @@ -92,26 +92,54 @@ suite "WriteBatchRef Tests":
batch.count() == 0
not batch.isClosed()

test "Test writing to multiple column families in single batch":
var batch = db.openWriteBatch()
defer:
batch.close()
check not batch.isClosed()

check:
batch.put(key1, val1, defaultCfHandle).isOk()
batch.put(key1, val1, otherCfHandle).isOk()
batch.put(key2, val2, otherCfHandle).isOk()
batch.put(key3, val3, otherCfHandle).isOk()
batch.count() == 4

batch.delete(key2, otherCfHandle).isOk()
batch.count() == 5
not batch.isClosed()

let res = db.write(batch)
check:
res.isOk()
db.get(key1, defaultCfHandle).get() == val1
db.get(key1, otherCfHandle).get() == val1
db.keyExists(key2, otherCfHandle).get() == false
db.get(key3, otherCfHandle).get() == val3

batch.clear()
check:
batch.count() == 0
not batch.isClosed()

test "Test writing to multiple column families in multiple batches":
var batch1 = db.openWriteBatch(defaultCfHandle)
var batch1 = db.openWriteBatch()
defer:
batch1.close()
check not batch1.isClosed()

var batch2 = db.openWriteBatch(otherCfHandle)
var batch2 = db.openWriteBatch()
defer:
batch2.close()
check not batch2.isClosed()

check:
batch1.put(key1, val1).isOk()
batch1.delete(key2).isOk()
batch1.put(key3, val3).isOk()

batch2.put(key1, val1).isOk()
batch2.delete(key1).isOk()
batch1.delete(key2, otherCfHandle).isOk()
batch1.put(key3, val3, otherCfHandle).isOk()
batch2.put(key1, val1, otherCfHandle).isOk()
batch2.delete(key1, otherCfHandle).isOk()
batch2.put(key3, val3).isOk()

batch1.count() == 3
batch2.count() == 3

Expand Down
Loading