Skip to content

Commit

Permalink
Add support for snapshots. (#64)
Browse files Browse the repository at this point in the history
* Add support for snapshots.
  • Loading branch information
bhartnett authored Jul 10, 2024
1 parent cf1267e commit 92df0b0
Show file tree
Hide file tree
Showing 13 changed files with 344 additions and 47 deletions.
13 changes: 10 additions & 3 deletions rocksdb/columnfamily.nim
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,24 @@ proc delete*(
cf.db.delete(key, cf.handle)

proc openIterator*(
cf: ColFamilyReadOnly | ColFamilyReadWrite
cf: ColFamilyReadOnly | ColFamilyReadWrite,
readOpts = defaultReadOptions(autoClose = true),
): RocksDBResult[RocksIteratorRef] {.inline.} =
## Opens an `RocksIteratorRef` for the given column family.
cf.db.openIterator(cf.handle)
cf.db.openIterator(readOpts, cf.handle)

proc openWriteBatch*(cf: ColFamilyReadWrite): WriteBatchRef {.inline.} =
## Opens a `WriteBatchRef` for the given column family.
cf.db.openWriteBatch(cf.handle)

proc openWriteBatchWithIndex*(
cf: ColFamilyReadWrite, reservedBytes = 0, overwriteKey = false
): WriteBatchWIRef {.inline.} =
## Opens a `WriteBatchRef` for the given column family.
cf.db.openWriteBatchWithIndex(reservedBytes, overwriteKey, cf.handle)

proc write*(
cf: ColFamilyReadWrite, updates: WriteBatchRef
cf: ColFamilyReadWrite, updates: WriteBatchRef | WriteBatchWIRef
): RocksDBResult[void] {.inline.} =
## Writes the updates in the `WriteBatchRef` to the column family.
cf.db.write(updates)
8 changes: 7 additions & 1 deletion rocksdb/options/readopts.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

{.push raises: [].}

import ../lib/librocksdb
import ../lib/librocksdb, ../snapshot

export snapshot.SnapshotRef, snapshot.isClosed, snapshot.getSequenceNumber

type
ReadOptionsPtr* = ptr rocksdb_readoptions_t
Expand Down Expand Up @@ -51,6 +53,10 @@ opt ignoreRangeDeletions, bool, uint8
opt deadline, int, uint64
opt ioTimeout, int, uint64

proc setSnapshot*(readOpts: ReadOptionsRef, snapshot: SnapshotRef) =
doAssert not readOpts.isClosed()
rocksdb_readoptions_set_snapshot(readOpts.cPtr, snapshot.cPtr)

proc defaultReadOptions*(autoClose = false): ReadOptionsRef {.inline.} =
let readOpts = createReadOptions(autoClose)

Expand Down
36 changes: 30 additions & 6 deletions rocksdb/rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import
./options/[dbopts, readopts, writeopts],
./columnfamily/[cfopts, cfdescriptor, cfhandle],
./internal/[cftable, utils],
./[rocksiterator, rocksresult, writebatch, writebatchwi]
./[rocksiterator, rocksresult, writebatch, writebatchwi, snapshot]

export
rocksresult, dbopts, readopts, writeopts, cfdescriptor, cfhandle, rocksiterator,
writebatch, writebatchwi
writebatch, writebatchwi, snapshot.SnapshotRef, snapshot.isClosed,
snapshot.getSequenceNumber

type
RocksDbPtr* = ptr rocksdb_t
Expand Down Expand Up @@ -355,16 +356,17 @@ proc delete*(
ok()

proc openIterator*(
db: RocksDbRef, cfHandle = db.defaultCfHandle
db: RocksDbRef,
readOpts = defaultReadOptions(autoClose = true),
cfHandle = db.defaultCfHandle,
): RocksDBResult[RocksIteratorRef] =
## Opens an `RocksIteratorRef` for the specified column family.
## The iterator should be closed using the `close` method after usage.
doAssert not db.isClosed()

let rocksIterPtr =
rocksdb_create_iterator_cf(db.cPtr, db.readOpts.cPtr, cfHandle.cPtr)
let rocksIterPtr = rocksdb_create_iterator_cf(db.cPtr, readOpts.cPtr, cfHandle.cPtr)

ok(newRocksIterator(rocksIterPtr))
ok(newRocksIterator(rocksIterPtr, readOpts))

proc openWriteBatch*(
db: RocksDbReadWriteRef, cfHandle = db.defaultCfHandle
Expand Down Expand Up @@ -442,6 +444,28 @@ proc ingestExternalFile*(

ok()

proc getSnapshot*(db: RocksDbRef): RocksDBResult[SnapshotRef] =
## Return a handle to the current DB state. Iterators created with this handle
## will all observe a stable snapshot of the current DB state. The caller must
## call ReleaseSnapshot(result) when the snapshot is no longer needed.
doAssert not db.isClosed()

let sHandle = rocksdb_create_snapshot(db.cPtr)
if sHandle.isNil():
err("rocksdb: failed to create snapshot")
else:
ok(newSnapshot(sHandle, SnapshotType.rocksdb))

proc releaseSnapshot*(db: RocksDbRef, snapshot: SnapshotRef) =
## Release a previously acquired snapshot. The caller must not use "snapshot"
## after this call.
doAssert not db.isClosed()
doAssert snapshot.kind == SnapshotType.rocksdb

if not snapshot.isClosed():
rocksdb_release_snapshot(db.cPtr, snapshot.cPtr)
snapshot.setClosed()

proc close*(db: RocksDbRef) =
## Close the `RocksDbRef` which will release the connection to the database
## and free the memory associated with it. `close` is idempotent and can
Expand Down
11 changes: 8 additions & 3 deletions rocksdb/rocksiterator.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

{.push raises: [].}

import ./lib/librocksdb, ./internal/utils, ./rocksresult
import ./lib/librocksdb, ./internal/utils, ./options/readopts, ./rocksresult

export rocksresult

Expand All @@ -21,10 +21,13 @@ type

RocksIteratorRef* = ref object
cPtr: RocksIteratorPtr
readOpts: ReadOptionsRef

proc newRocksIterator*(cPtr: RocksIteratorPtr): RocksIteratorRef =
proc newRocksIterator*(
cPtr: RocksIteratorPtr, readOpts: ReadOptionsRef
): RocksIteratorRef =
doAssert not cPtr.isNil()
RocksIteratorRef(cPtr: cPtr)
RocksIteratorRef(cPtr: cPtr, readOpts: readOpts)

proc isClosed*(iter: RocksIteratorRef): bool {.inline.} =
## Returns `true` if the iterator is closed and `false` otherwise.
Expand Down Expand Up @@ -128,6 +131,8 @@ proc close*(iter: RocksIteratorRef) =
rocksdb_iter_destroy(iter.cPtr)
iter.cPtr = nil

autoCloseNonNil(iter.readOpts)

iterator pairs*(iter: RocksIteratorRef): tuple[key: seq[byte], value: seq[byte]] =
## Iterates over the key value pairs in the column family yielding them in
## the form of a tuple. The iterator is automatically closed after the
Expand Down
51 changes: 51 additions & 0 deletions rocksdb/snapshot.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Nim-RocksDB
# Copyright 2024 Status Research & Development GmbH
# Licensed under either of
#
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * GPL license, version 2.0, ([LICENSE-GPLv2](LICENSE-GPLv2) or https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html)
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.

## A `SnapshotRef` represents a view of the state of the database at some point in time.

{.push raises: [].}

import ./lib/librocksdb

type
SnapshotPtr* = ptr rocksdb_snapshot_t

SnapshotType* = enum
rocksdb
transactiondb

SnapshotRef* = ref object
cPtr: SnapshotPtr
kind: SnapshotType

proc newSnapshot*(cPtr: SnapshotPtr, kind: SnapshotType): SnapshotRef =
doAssert not cPtr.isNil()
SnapshotRef(cPtr: cPtr, kind: kind)

proc isClosed*(snapshot: SnapshotRef): bool {.inline.} =
## Returns `true` if the `SnapshotRef` has been closed and `false` otherwise.
snapshot.cPtr.isNil()

proc cPtr*(snapshot: SnapshotRef): SnapshotPtr =
## Get the underlying database pointer.
doAssert not snapshot.isClosed()
snapshot.cPtr

proc kind*(snapshot: SnapshotRef): SnapshotType =
## Get the kind of the `SnapshotRef`.
snapshot.kind

proc getSequenceNumber*(snapshot: SnapshotRef): uint64 =
## Return the associated sequence number.
doAssert not snapshot.isClosed()
rocksdb_snapshot_get_sequence_number(snapshot.cPtr).uint64

proc setClosed*(snapshot: SnapshotRef) =
# The snapshot should be released from `RocksDbRef` or `TransactionDbRef`
snapshot.cPtr = nil
42 changes: 40 additions & 2 deletions rocksdb/transactiondb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import
./transactions/[transaction, txdbopts, txopts],
./columnfamily/[cfopts, cfdescriptor, cfhandle],
./internal/[cftable, utils],
./rocksresult
./[rocksiterator, rocksresult, snapshot]

export
dbopts, txdbopts, cfdescriptor, readopts, writeopts, txopts, transaction, rocksresult
dbopts, txdbopts, cfdescriptor, readopts, writeopts, txopts, transaction,
rocksiterator, rocksresult, snapshot.SnapshotRef, snapshot.isClosed,
snapshot.getSequenceNumber

type
TransactionDbPtr* = ptr rocksdb_transactiondb_t
Expand Down Expand Up @@ -120,6 +122,42 @@ proc beginTransaction*(

newTransaction(txPtr, readOpts, writeOpts, txOpts, nil, cfHandle)

proc openIterator*(
db: TransactionDbRef,
readOpts = defaultReadOptions(autoClose = true),
cfHandle = db.defaultCfHandle,
): RocksDBResult[RocksIteratorRef] =
## Opens an `RocksIteratorRef` for the specified column family.
## The iterator should be closed using the `close` method after usage.
doAssert not db.isClosed()

let rocksIterPtr =
rocksdb_transactiondb_create_iterator_cf(db.cPtr, readOpts.cPtr, cfHandle.cPtr)

ok(newRocksIterator(rocksIterPtr, readOpts))

proc getSnapshot*(db: TransactionDbRef): RocksDBResult[SnapshotRef] =
## Return a handle to the current DB state. Iterators created with this handle
## will all observe a stable snapshot of the current DB state. The caller must
## call ReleaseSnapshot(result) when the snapshot is no longer needed.
doAssert not db.isClosed()

let sHandle = rocksdb_transactiondb_create_snapshot(db.cPtr)
if sHandle.isNil():
err("rocksdb: failed to create snapshot")
else:
ok(newSnapshot(sHandle, SnapshotType.transactiondb))

proc releaseSnapshot*(db: TransactionDbRef, snapshot: SnapshotRef) =
## Release a previously acquired snapshot. The caller must not use "snapshot"
## after this call.
doAssert not db.isClosed()
doAssert snapshot.kind == SnapshotType.transactiondb

if not snapshot.isClosed():
rocksdb_transactiondb_release_snapshot(db.cPtr, snapshot.cPtr)
snapshot.setClosed()

proc close*(db: TransactionDbRef) =
## Close the `TransactionDbRef`.

Expand Down
18 changes: 16 additions & 2 deletions rocksdb/transactions/transaction.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import
../lib/librocksdb,
../options/[readopts, writeopts],
../internal/[cftable, utils],
../rocksresult,
../[rocksiterator, rocksresult],
./[txopts, otxopts]

export rocksresult
export rocksiterator, rocksresult

type
TransactionPtr* = ptr rocksdb_transaction_t
Expand Down Expand Up @@ -162,6 +162,20 @@ proc rollback*(tx: TransactionRef): RocksDBResult[void] =

ok()

proc openIterator*(
db: TransactionRef,
readOpts = defaultReadOptions(autoClose = true),
cfHandle = db.defaultCfHandle,
): RocksDBResult[RocksIteratorRef] =
## Opens an `RocksIteratorRef` for the specified column family.
## The iterator should be closed using the `close` method after usage.
doAssert not db.isClosed()

let rocksIterPtr =
rocksdb_transaction_create_iterator_cf(db.cPtr, readOpts.cPtr, cfHandle.cPtr)

ok(newRocksIterator(rocksIterPtr, readOpts))

proc close*(tx: TransactionRef) =
## Close the `TransactionRef`.
if not tx.isClosed():
Expand Down
8 changes: 4 additions & 4 deletions rocksdb/writebatchwi.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ proc isClosed*(batch: WriteBatchWIRef): bool {.inline.} =
batch.cPtr.isNil()

proc cPtr*(batch: WriteBatchWIRef): WriteBatchWIPtr =
## Get the underlying database pointer.
## Get the underlying write batch pointer.
doAssert not batch.isClosed()
batch.cPtr

Expand Down Expand Up @@ -87,7 +87,7 @@ proc delete*(

ok()

proc get*(
proc getFromBatch*(
batch: WriteBatchWIRef,
key: openArray[byte],
onData: DataProc,
Expand Down Expand Up @@ -118,7 +118,7 @@ proc get*(
rocksdb_free(data)
ok(true)

proc get*(
proc getFromBatch*(
batch: WriteBatchWIRef, key: openArray[byte], cfHandle = batch.defaultCfHandle
): RocksDBResult[seq[byte]] =
## Get the value for a given key from the batch.
Expand All @@ -127,7 +127,7 @@ proc get*(
proc onData(data: openArray[byte]) =
dataRes.ok(@data)

let res = batch.get(key, onData, cfHandle)
let res = batch.getFromBatch(key, onData, cfHandle)
if res.isOk():
return dataRes

Expand Down
16 changes: 16 additions & 0 deletions tests/test_columnfamily.nim
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,19 @@ suite "ColFamily Tests":

readOnlyCf.db.close()
check readOnlyCf.db.isClosed()

test "Test iterator":
let cf = db.getColFamily(CF_OTHER).get()
check cf.put(key, val).isOk()

let iter = cf.openIterator().get()
defer:
iter.close()

iter.seekToKey(key)
check:
iter.isValid() == true
iter.key() == key
iter.value() == val
iter.seekToKey(otherKey)
check iter.isValid() == false
Loading

0 comments on commit 92df0b0

Please sign in to comment.