Skip to content

Commit

Permalink
[orcaman#136] Add custom shard count value in concurrent map
Browse files Browse the repository at this point in the history
  • Loading branch information
SeunghoonBaek committed May 4, 2023
1 parent 85296bc commit 4c5d25a
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions concurrent_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ type Stringer interface {
// A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
type ConcurrentMap[K comparable, V any] struct {
shards []*ConcurrentMapShared[K, V]
sharding func(key K) uint32
shards []*ConcurrentMapShared[K, V]
sharding func(key K) uint32
shardCount uint32
}

// A "thread" safe string to anything map.
Expand All @@ -26,35 +27,41 @@ type ConcurrentMapShared[K comparable, V any] struct {
sync.RWMutex // Read Write mutex, guards access to internal map.
}

func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
func create[K comparable, V any](sharding func(key K) uint32, shardCount uint32) ConcurrentMap[K, V] {
m := ConcurrentMap[K, V]{
sharding: sharding,
shards: make([]*ConcurrentMapShared[K, V], SHARD_COUNT),
sharding: sharding,
shards: make([]*ConcurrentMapShared[K, V], shardCount),
shardCount: shardCount,
}
for i := 0; i < SHARD_COUNT; i++ {
for i := 0; i < int(m.shardCount); i++ {
m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)}
}
return m
}

// Creates a new concurrent map.
func New[V any]() ConcurrentMap[string, V] {
return create[string, V](fnv32)
return create[string, V](fnv32, uint32(SHARD_COUNT))
}

// Creates a new concurrent map.
func NewStringer[K Stringer, V any]() ConcurrentMap[K, V] {
return create[K, V](strfnv32[K])
return create[K, V](strfnv32[K], uint32(SHARD_COUNT))
}

// Creates a new concurrent map.
func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
return create[K, V](sharding)
return create[K, V](sharding, uint32(SHARD_COUNT))
}

// NewWithCustomShardingCountFunction Create a new concurrent map using the given shardCount
func NewWithCustomShardingCountFunction[K comparable, V any](sharding func(key K) uint32, shardCount uint32) ConcurrentMap[K, V] {
return create[K, V](sharding, shardCount)
}

// GetShard returns shard under given key
func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)]
return m.shards[uint(m.sharding(key))%uint(m.shardCount)]
}

func (m ConcurrentMap[K, V]) MSet(data map[K]V) {
Expand Down Expand Up @@ -119,7 +126,7 @@ func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
// Count returns the number of elements within the map.
func (m ConcurrentMap[K, V]) Count() int {
count := 0
for i := 0; i < SHARD_COUNT; i++ {
for i := 0; i < int(m.shardCount); i++ {
shard := m.shards[i]
shard.RLock()
count += len(shard.items)
Expand Down Expand Up @@ -228,9 +235,9 @@ func snapshot[K comparable, V any](m ConcurrentMap[K, V]) (chans []chan Tuple[K,
if len(m.shards) == 0 {
panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
}
chans = make([]chan Tuple[K, V], SHARD_COUNT)
chans = make([]chan Tuple[K, V], m.shardCount)
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
wg.Add(int(m.shardCount))
// Foreach shard.
for index, shard := range m.shards {
go func(index int, shard *ConcurrentMapShared[K, V]) {
Expand Down Expand Up @@ -303,7 +310,7 @@ func (m ConcurrentMap[K, V]) Keys() []K {
go func() {
// Foreach shard.
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
wg.Add(int(m.shardCount))
for _, shard := range m.shards {
go func(shard *ConcurrentMapShared[K, V]) {
// Foreach key, value pair.
Expand Down

0 comments on commit 4c5d25a

Please sign in to comment.