Merge pull request #54 from slatedb/thrawn/race-conditions
Fix race condition in sstable.Iterator
naveen246 authored Dec 21, 2024
2 parents 7382570 + f01443f commit a307980
Showing 11 changed files with 146 additions and 241 deletions.
47 changes: 26 additions & 21 deletions docs/adr/0002-design-goals.md
@@ -8,37 +8,42 @@ Accepted

## Context

Understanding the high level goals of the project is important information for contributors and users alike. This
allows us to focus on what is important and not get side tracked with implementing features which clash with the
primary goals of the project.
Understanding the high level goals of the project is important information for
contributors and users alike. This allows us to focus on what is important and
not get side tracked with implementing features which clash with the primary
goals of the project.

## Decision

The primary goal of the project is to implement an LSM based key value store which is intended to be used
for OLTP workloads backed by object storage systems like S3.
The primary goal of the project is to implement an LSM (Log-Structured Merge)
based key-value store, which is intended to be used for OLTP (Online
Transaction Processing) workloads backed by object storage systems like S3.

Primary feature set includes
- Support for [Strict Serializability](https://jepsen.io/consistency/models/strong-serializable) - Which
allows users to have the strongest consistency guarantees, ensuring that transactions appear to execute
atomically and in a total order that respects real-time ordering of operations.
- Support for Range Queries - Allows a user to search for all keys starting from a certain point and
ending at another, effectively retrieving a continuous sequence of data based on key order.
- Support for a Merge Operator - Allows a user to build efficient streaming operations through the use
of Atomic Read-Modify-Write operations on key values.
Primary feature set includes:
- **Support for Transactions**: To ensure database consistency, and durability.
- **Support for Range Queries**: Allows a user to search for all keys starting
from a certain point and ending at another, effectively retrieving a
continuous sequence of data based on key order.
- **Support for a Merge Operator**: Allows a user to build efficient streaming
operations through the use of Atomic Read-Modify-Write operations on key
values.

### Configurability
Since slateDB is primarily backed by object storage systems, it will provide configurable options to
tune latency, throughput and durability depending on the user workload.
Since slateDB is primarily backed by object storage systems, it will provide
configurable options to tune latency, throughput and durability depending on
the user workload.

### Expected use case
For golang, we expect slateDB will be used for event processing and event capture workloads. As such,
much of the design goals revolves around solving for that use case.
For golang, we expect slateDB will be used for event processing and event
capture workloads. As such, much of the design goals revolves around solving
for that use case.

TODO: Other expected use cases?

## Consequences

SlateDB is not intended to solve write synchronization, nor provide a network protocol for interacting
with SlateDB, nor support long-running transactions for use in non database synchronization. Users will need
to implement multi-client synchronization and partitioning schemes which make sense for their workloads and
achieve their throughput and availability goals.
SlateDB is not intended to solve write synchronization, nor provide a network
protocol for interacting with SlateDB, nor support long-running transactions
for use in non database synchronization. Users will need to implement
multi-client synchronization and partitioning schemes which make sense for
their workloads and achieve their throughput and availability goals.
2 changes: 1 addition & 1 deletion internal/sstable/decode.go
@@ -130,7 +130,7 @@ func ReadBlocks(info *Info, index *Index, r common.Range, obj common.ReadOnlyBlo
}

startOffset := rng.Start
decodedBlocks := make([]block.Block, 0)
var decodedBlocks []block.Block
blockMetaList := index.BlockMeta()
compressionCodec := info.CompressionCodec

202 changes: 65 additions & 137 deletions internal/sstable/iterator.go
@@ -3,82 +3,54 @@ package sstable
import (
"bytes"
"fmt"
"github.com/samber/mo"
"github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable/block"
"github.com/slatedb/slatedb-go/internal/types"
"github.com/slatedb/slatedb-go/slatedb/common"
"math"
"sync"
)

type TableStore interface {
ReadIndex(*Handle) (*Index, error)
ReadBlocksUsingIndex(*Handle, common.Range, *Index) ([]block.Block, error)
}

// Iterator helps in iterating through KeyValue pairs present in the SSTable.
// Since each SSTable is a list of Blocks, this iterator internally uses block.Iterator to iterate through each Block
// Iterator iterates through KeyValue pairs present in the SSTable.
type Iterator struct {
table *Handle
indexData *Index
tableStore TableStore
currentBlockIter mo.Option[*block.Iterator]
fromKey mo.Option[[]byte]
nextBlockIdxToFetch uint64
fetchTasks chan chan mo.Option[[]block.Block]
maxFetchTasks uint64
numBlocksToFetch uint64
warn types.ErrWarn
blockIter *block.Iterator
warn types.ErrWarn
store TableStore
handle *Handle
index *Index
fromKey []byte
nextBlock uint64
}

func NewIterator(
table *Handle,
tableStore TableStore,
maxFetchTasks uint64,
numBlocksToFetch uint64,
) (*Iterator, error) {
indexData, err := tableStore.ReadIndex(table)
func NewIterator(handle *Handle, store TableStore) (*Iterator, error) {
index, err := store.ReadIndex(handle)
if err != nil {
return nil, err
}

return &Iterator{
table: table,
indexData: indexData,
tableStore: tableStore,
currentBlockIter: mo.None[*block.Iterator](),
fromKey: mo.None[[]byte](),
nextBlockIdxToFetch: 0,
fetchTasks: make(chan chan mo.Option[[]block.Block], maxFetchTasks),
maxFetchTasks: maxFetchTasks,
numBlocksToFetch: numBlocksToFetch,
handle: handle,
store: store,
index: index,
nextBlock: 0,
}, nil
}

func NewIteratorAtKey(
table *Handle,
fromKey []byte,
tableStore TableStore,
maxFetchTasks uint64,
numBlocksToFetch uint64,
) (*Iterator, error) {
indexData, err := tableStore.ReadIndex(table)
func NewIteratorAtKey(handle *Handle, key []byte, store TableStore) (*Iterator, error) {
index, err := store.ReadIndex(handle)
if err != nil {
return nil, err
}

iter := &Iterator{
table: table,
indexData: indexData,
tableStore: tableStore,
currentBlockIter: mo.None[*block.Iterator](),
fromKey: mo.Some(fromKey),
fetchTasks: make(chan chan mo.Option[[]block.Block], maxFetchTasks),
maxFetchTasks: maxFetchTasks,
numBlocksToFetch: numBlocksToFetch,
fromKey: bytes.Clone(key),
handle: handle,
store: store,
index: index,
}
iter.nextBlockIdxToFetch = iter.firstBlockWithDataIncludingOrAfterKey(indexData, fromKey)
iter.nextBlock = iter.firstBlockIncludingOrAfterKey(index, key)
return iter, nil
}

@@ -102,137 +74,93 @@ func (iter *Iterator) Next() (types.KeyValue, bool) {

func (iter *Iterator) NextEntry() (types.RowEntry, bool) {
for {
if iter.currentBlockIter.IsAbsent() {
nextBlockIter, err := iter.nextBlockIter()
if iter.blockIter == nil {
it, err := iter.nextBlockIter()
if err != nil {
// TODO(thrawn01): This could be a transient error, or a corruption error
// we need to handle each differently.
iter.warn.Add("while fetching blocks for SST '%s': %s",
iter.handle.Id.String(), err.Error())
return types.RowEntry{}, false
}

if nextBlockIter.IsPresent() {
iter.currentBlockIter = nextBlockIter
} else {
if it == nil { // No more blocks
return types.RowEntry{}, false
}
iter.blockIter = it
}

currentBlockIter, _ := iter.currentBlockIter.Get()
kv, ok := currentBlockIter.NextEntry()
kv, ok := iter.blockIter.NextEntry()
if !ok {
if warn := currentBlockIter.Warnings(); warn != nil {
if warn := iter.blockIter.Warnings(); warn != nil {
iter.warn.Merge(warn)
}
// We have exhausted the current block, but not necessarily the entire SST,
// so we fall back to the top to check if we have more blocks to read.
iter.currentBlockIter = mo.None[*block.Iterator]()
iter.blockIter = nil
continue
}

return kv, true
}
}

// SpawnFetches - Each SST has multiple blocks, this method will create goroutines to fetch blocks within a range
// Range{blocksStart, blocksEnd} for a given SST from object storage
// TODO(thrawn01): This is called from compaction, we should instead call it from the constructor and it should be
// - made private to this struct
func (iter *Iterator) SpawnFetches() {

numBlocks := iter.indexData.BlockMetaLength()
table := iter.table.Clone()
// TODO(thrawn01): I don't believe we want to clone the store here. this
// this invalidates the cache and the mutex of the store, which likely
// will cause a race in TableStore.
tableStore := iter.tableStore
index := iter.indexData.Clone()
var wg sync.WaitGroup

for len(iter.fetchTasks) < int(iter.maxFetchTasks) && int(iter.nextBlockIdxToFetch) < numBlocks {
numBlocksToFetch := math.Min(
float64(iter.numBlocksToFetch),
float64(numBlocks-int(iter.nextBlockIdxToFetch)),
)
blocksStart := iter.nextBlockIdxToFetch
blocksEnd := iter.nextBlockIdxToFetch + uint64(numBlocksToFetch)

blocksCh := make(chan mo.Option[[]block.Block], 1)
iter.fetchTasks <- blocksCh

blocksRange := common.Range{Start: blocksStart, End: blocksEnd}

wg.Add(1)
go func() {
blocks, err := tableStore.ReadBlocksUsingIndex(table, blocksRange, index)
if err != nil {
// TODO(thrawn01): handle error
blocksCh <- mo.None[[]block.Block]()
} else {
blocksCh <- mo.Some(blocks)
}
wg.Done()
}()
wg.Wait()

iter.nextBlockIdxToFetch = blocksEnd
// nextBlockIter fetches the next block and returns an iterator for that block
func (iter *Iterator) nextBlockIter() (*block.Iterator, error) {
if iter.nextBlock >= uint64(iter.index.BlockMetaLength()) {
return nil, nil // No more blocks to read
}
}

func (iter *Iterator) nextBlockIter() (mo.Option[*block.Iterator], error) {
for {
iter.SpawnFetches()
// TODO(thrawn01): This is a race, we should not expect an empty channel to indicate there are no more
// items to process.
if len(iter.fetchTasks) == 0 {
assert.True(int(iter.nextBlockIdxToFetch) == iter.indexData.BlockMetaLength(), "")
fmt.Printf("Iteration Stopped Due To Empty Task Channel\n")
return mo.None[*block.Iterator](), nil
}
// Fetch the next block
rng := common.Range{Start: iter.nextBlock, End: iter.nextBlock + 1}
blocks, err := iter.store.ReadBlocksUsingIndex(iter.handle, rng, iter.index)
if err != nil {
return nil, fmt.Errorf("while reading block range [%d:%d]: %w", rng.Start, rng.End, err)
}
if len(blocks) == 0 {
return nil, fmt.Errorf("block read range [%d:%d] returned zero blocks", rng.Start, rng.End)
}

blocksCh := <-iter.fetchTasks
blocks := <-blocksCh
if blocks.IsPresent() {
blks, _ := blocks.Get()
if len(blks) == 0 {
continue
}
// Increment the iter.nextBlock
iter.nextBlock++

b := &blks[0]
fromKey, _ := iter.fromKey.Get()
if iter.fromKey.IsPresent() {
// TODO(thrawn01): Handle this error
it, _ := block.NewIteratorAtKey(b, fromKey)
return mo.Some(it), nil
} else {
return mo.Some(block.NewIterator(b)), nil
}
} else {
// TODO(thrawn01): Return the actual error which occurred.
return mo.None[*block.Iterator](), common.ErrReadBlocks
}
// If iter.fromKey is present use NewIteratorAtKey() to find the key in the block
if iter.fromKey != nil {
// Will return an iterator nearest to where the key should be if it doesn't exist.
return block.NewIteratorAtKey(&blocks[0], iter.fromKey)
}

// Iterate through all the blocks
return block.NewIterator(&blocks[0]), nil
}
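
The rewritten `NextEntry` and `nextBlockIter` replace the channel-based prefetching with a synchronous read of one block at a time. The pattern can be sketched in isolation as below. This is a minimal illustration, not the project's code: `blockReader`, `memStore`, and `seqIterator` are hypothetical names, and blocks are modeled as plain string slices rather than `block.Block`.

```go
package main

import (
	"errors"
	"fmt"
)

// blockReader is a hypothetical stand-in for the TableStore in the diff.
type blockReader interface {
	NumBlocks() int
	ReadBlock(i int) ([]string, error)
}

// memStore is an in-memory blockReader used only for this sketch.
type memStore struct{ blocks [][]string }

func (m memStore) NumBlocks() int { return len(m.blocks) }

func (m memStore) ReadBlock(i int) ([]string, error) {
	if i < 0 || i >= len(m.blocks) {
		return nil, errors.New("block out of range")
	}
	return m.blocks[i], nil
}

// seqIterator reads one block at a time on the caller's goroutine,
// so no channels or shared mutable state are involved.
type seqIterator struct {
	store     blockReader
	current   []string // entries left in the current block; nil means "fetch the next block"
	nextBlock int
	err       error // first read error, kept for the caller to inspect
}

func (it *seqIterator) Next() (string, bool) {
	for {
		if it.current == nil {
			if it.nextBlock >= it.store.NumBlocks() {
				return "", false // no more blocks
			}
			blk, err := it.store.ReadBlock(it.nextBlock)
			if err != nil {
				it.err = fmt.Errorf("while reading block %d: %w", it.nextBlock, err)
				return "", false
			}
			it.nextBlock++
			it.current = blk
		}
		if len(it.current) == 0 {
			// Current block exhausted (or empty); loop to fetch the next one.
			it.current = nil
			continue
		}
		entry := it.current[0]
		it.current = it.current[1:]
		return entry, true
	}
}

func main() {
	it := &seqIterator{store: memStore{blocks: [][]string{{"a", "b"}, {}, {"c"}}}}
	for e, ok := it.Next(); ok; e, ok = it.Next() {
		fmt.Println(e)
	}
}
```

The trade-off in this shape is that reads are no longer overlapped with iteration, in exchange for an iterator whose state is only touched by its caller.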

func (iter *Iterator) firstBlockWithDataIncludingOrAfterKey(index *Index, key []byte) uint64 {
// firstBlockIncludingOrAfterKey performs a binary search on the SSTable index to find the first block
// that either includes the given key or is the first block after the key. This ensures we start reading
// from either the block containing the key or the first block that could contain keys greater than the search key.
func (iter *Iterator) firstBlockIncludingOrAfterKey(index *Index, key []byte) uint64 {
low := 0
high := index.BlockMetaLength() - 1
// if the key is less than all the blocks' first key, scan the whole sst
foundBlockID := 0

loop:
for low <= high {
mid := low + (high-low)/2
// Compare the middle block's first key with the search key.
midBlockFirstKey := index.BlockMeta()[mid].FirstKey
cmp := bytes.Compare(midBlockFirstKey, key)
switch cmp {
case -1:
// If the search key is greater, narrow the search to the upper half.
case -1: // key > midBlockFirstKey
low = mid + 1
foundBlockID = mid
case 1:
// If the search key is smaller, narrow the search to the lower half.
case 1: // key < midBlockFirstKey
if mid > 0 {
high = mid - 1
} else {
break loop
}
case 0:
// If they're equal, we've found the exact block, return its index.
case 0: // exact match
return uint64(mid)
}
}
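
The hunk above is cut off before the function returns. As a rough, self-contained sketch of the same search, the version below works over a plain slice of first keys instead of `*Index`. The name `firstBlockForKey` is hypothetical, and the final `return found` is an assumption about how the truncated function ends.

```go
package main

import (
	"bytes"
	"fmt"
)

// firstBlockForKey returns the index of the last block whose first key is
// <= key. If key sorts before every block's first key it returns 0, meaning
// the scan starts at the beginning of the table.
// firstKeys must be sorted in ascending byte order.
func firstBlockForKey(firstKeys [][]byte, key []byte) int {
	low, high := 0, len(firstKeys)-1
	found := 0
	for low <= high {
		mid := low + (high-low)/2
		switch bytes.Compare(firstKeys[mid], key) {
		case -1: // key > first key of mid: mid is a candidate, search the upper half
			found = mid
			low = mid + 1
		case 1: // key < first key of mid: search the lower half
			high = mid - 1
		case 0: // exact match on a block's first key
			return mid
		}
	}
	return found
}

func main() {
	firstKeys := [][]byte{[]byte("b"), []byte("f"), []byte("m")}
	for _, k := range []string{"a", "b", "g", "z"} {
		fmt.Printf("key %q starts at block %d\n", k, firstBlockForKey(firstKeys, []byte(k)))
	}
}
```

Because the indices here are signed `int` values, `high = mid - 1` can safely become `-1` and end the loop, so the sketch does not need the `mid > 0` guard and labeled `break` used in the diff.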
7 changes: 5 additions & 2 deletions internal/sstable/table.go
@@ -58,6 +58,9 @@ func (s *ID) CompactedID() mo.Option[ulid.ULID] {

return mo.Some(val)
}
func (s *ID) String() string {
return s.Value
}

func (s *ID) Clone() ID {
var sstID ID
@@ -71,8 +74,8 @@ func (s *ID) Clone() ID {
return sstID
}

// Handle represents the SSTable
// TODO(thrawn01): I think this should merge with sstable.Table
// Handle is a reference to an SSTable, which does not contain the actual
// SST data. It can represent both WALs and Compacted SSTs
type Handle struct {
Id ID
Info *Info