Skip to content

Commit

Permalink
resource-directory: dont use RWmutex as recursive mutex - causes dead…
Browse files Browse the repository at this point in the history
…lock (#736)

* resource-directory: don't use RWMutex as recursive mutex - causes deadlock

* fixup! resource-directory: dont use RWmutex as recursive mutex - causes deadlock

* set go1.18 for check format
  • Loading branch information
jkralik authored May 26, 2022
1 parent 2cae575 commit 8b5fd90
Show file tree
Hide file tree
Showing 13 changed files with 327 additions and 235 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/checkFormat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ jobs:
with:
# fetch branches and history so `git merge-base` in check-format-on-diff works correctly
fetch-depth: 0
- uses: actions/setup-go@v2
with:
go-version: "^1.18" # The Go version to download (if necessary) and use.

- name: Check formatting
shell: bash
Expand Down
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,15 @@ DEVICE_SIMULATOR_RES_OBSERVABLE_IMG := ghcr.io/iotivity/iotivity-lite/cloud-serv
# - name of the simulator ("$(1)-$(SIMULATOR_NAME_SUFFIX)")
define RUN-DOCKER-DEVICE
mkdir -p "$(WORKING_DIRECTORY)/.tmp/$(1)" ; \
mkdir -p "$(WORKING_DIRECTORY)/.tmp/$(1)/cloud_server_creds" ; \
docker pull $(2) ; \
docker run \
-d \
--privileged \
--name=$(1) \
--network=host \
-v $(WORKING_DIRECTORY)/.tmp/$(1):/tmp \
-v $(WORKING_DIRECTORY)/.tmp/$(1)/cloud_server_creds:/cloud_server_creds \
$(2) \
$(1)-$(SIMULATOR_NAME_SUFFIX)
endef
Expand Down
72 changes: 49 additions & 23 deletions resource-aggregate/cqrs/eventstore/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,13 @@ func (i *iterator) Err() error {
return i.iter.Err()
}

func (p *Projection) getModelLocked(ctx context.Context, groupID, aggregateID string) (*aggregateModel, error) {
func (p *Projection) getModel(ctx context.Context, groupID, aggregateID string) (*aggregateModel, error) {
var ok bool
var mapApm map[string]*aggregateModel
var apm *aggregateModel

p.lock.Lock()
defer p.lock.Unlock()
if mapApm, ok = p.aggregateModels[groupID]; !ok {
mapApm = make(map[string]*aggregateModel)
p.aggregateModels[groupID] = mapApm
Expand All @@ -202,11 +204,9 @@ func (p *Projection) handle(ctx context.Context, iter Iter) (reloadQueries []Ver
}
ie := e
reloadQueries = make([]VersionQuery, 0, 32)
p.lock.Lock()
defer p.lock.Unlock()
for ie != nil {
p.LogDebugfFunc("projection.iterator.handle: GroupID %v: AggregateID %v: Version %v, EvenType %v", ie.GroupID(), ie.AggregateID(), ie.Version(), ie.EventType())
am, err := p.getModelLocked(ctx, ie.GroupID(), ie.AggregateID())
am, err := p.getModel(ctx, ie.GroupID(), ie.AggregateID())
if err != nil {
return nil, fmt.Errorf("cannot handle projection: %w", err)
}
Expand Down Expand Up @@ -289,44 +289,70 @@ func (p *Projection) Forget(queries []SnapshotQuery) (err error) {
return nil
}

func (p *Projection) allModelsLocked(onModel func(m Model) (wantNext bool)) {
func (p *Projection) iterateOverAllModels(onModel func(m Model) (wantNext bool)) {
p.lock.RLock()
defer p.lock.RUnlock()
for _, group := range p.aggregateModels {
for _, apm := range group {
if !onModel(apm.model) {
return // stop iteration
p.lock.RUnlock()
wantNext := onModel(apm.model)
p.lock.RLock()
if !wantNext {
return
}
}
}
}

// Models return models from projection.
func (p *Projection) Models(queries []SnapshotQuery, onModel func(m Model) (wantNext bool)) {
// iterateOverGroupModels invokes onModel for every model registered under
// groupID. Returns false if onModel stopped the iteration, true otherwise.
//
// The models are snapshotted into a slice while holding the read lock, and
// the callbacks are invoked only after the lock is released. This keeps
// p.lock unlocked during user callbacks (which may re-enter the projection;
// p.lock is not recursive) without suspending a map range across an
// unlock/relock window — Go maps must not be iterated concurrently with
// writers, and a panic inside onModel must not leave the lock in an
// inconsistent state.
func (p *Projection) iterateOverGroupModels(groupID string, onModel func(m Model) (wantNext bool)) (wantNext bool) {
	p.lock.RLock()
	var models []Model
	if aggregates, ok := p.aggregateModels[groupID]; ok {
		models = make([]Model, 0, len(aggregates))
		for _, apm := range aggregates {
			models = append(models, apm.model)
		}
	}
	p.lock.RUnlock()
	for _, m := range models {
		if !onModel(m) {
			return false
		}
	}
	return true
}

// iterateOverAggregateModel invokes onModel for the single model identified
// by (groupID, aggregateID), if it exists. Returns false if onModel stopped
// the iteration, true otherwise (including when the model is absent).
//
// The model pointer is read while holding the read lock; the callback runs
// after the lock is released, so re-entrant calls into the projection from
// onModel cannot deadlock on the non-recursive p.lock, and a panic inside
// onModel cannot cause an RUnlock of an unheld lock.
func (p *Projection) iterateOverAggregateModel(groupID, aggregateID string, onModel func(m Model) (wantNext bool)) (wantNext bool) {
	p.lock.RLock()
	var m Model
	if aggregates, ok := p.aggregateModels[groupID]; ok {
		if apm, ok := aggregates[aggregateID]; ok {
			m = apm.model
		}
	}
	p.lock.RUnlock()
	if m == nil {
		return true
	}
	return onModel(m)
}

// Models return models from projection.
func (p *Projection) Models(queries []SnapshotQuery, onModel func(m Model) (wantNext bool)) {
if len(queries) == 0 {
p.allModelsLocked(onModel)
p.iterateOverAllModels(onModel)
}
for _, query := range queries {
switch {
case query.GroupID == "" && query.AggregateID == "":
p.allModelsLocked(onModel)
p.iterateOverAllModels(onModel)
return
case query.GroupID != "" && query.AggregateID == "":
if aggregates, ok := p.aggregateModels[query.GroupID]; ok {
for _, apm := range aggregates {
if !onModel(apm.model) {
return // stop iteration
}
}
if !p.iterateOverGroupModels(query.GroupID, onModel) {
return
}
default:
if aggregates, ok := p.aggregateModels[query.GroupID]; ok {
if apm, ok := aggregates[query.AggregateID]; ok {
if !onModel(apm.model) {
return // stop iteration
}
}
if !p.iterateOverAggregateModel(query.GroupID, query.AggregateID, onModel) {
return
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions resource-directory/service/deviceDirectory.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ func (dd *DeviceDirectory) sendDevices(deviceIDs strings.Set, req *pb.GetDevices
typeFilter := make(strings.Set)
typeFilter.Add(req.TypeFilter...)
return dd.projection.LoadDevicesMetadata(srv.Context(), deviceIDs, toReloadDevices, func(m *deviceMetadataProjection) error {
if !hasMatchingStatus(m.data.GetDeviceMetadataUpdated().GetStatus().IsOnline(), req.StatusFilter) {
deviceMetadataUpdated := m.GetDeviceMetadataUpdated()
if !hasMatchingStatus(deviceMetadataUpdated.GetStatus().IsOnline(), req.StatusFilter) {
return nil
}
resourceIdFilter := []*commands.ResourceId{commands.NewResourceID(m.GetDeviceID(), device.ResourceURI)}
Expand All @@ -132,8 +133,8 @@ func (dd *DeviceDirectory) sendDevices(deviceIDs strings.Set, req *pb.GetDevices
return nil
}
device.Metadata = &pb.Device_Metadata{
Status: m.data.GetDeviceMetadataUpdated().GetStatus(),
ShadowSynchronization: m.data.GetDeviceMetadataUpdated().GetShadowSynchronization(),
Status: deviceMetadataUpdated.GetStatus(),
ShadowSynchronization: deviceMetadataUpdated.GetShadowSynchronization(),
}
err := srv.Send(device.ToProto())
if err != nil {
Expand Down
63 changes: 45 additions & 18 deletions resource-directory/service/deviceMetadataProjection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package service

import (
"context"
"sync"
"time"

"github.com/plgd-dev/hub/v2/pkg/log"
"github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventstore"
Expand All @@ -10,24 +12,45 @@ import (

// protected by lock in Projection struct in resource-aggregate/cqrs/eventstore/projection.go
type deviceMetadataProjection struct {
data *events.DeviceMetadataSnapshotTaken
deviceID string

private struct {
lock sync.RWMutex // protects snapshot
snapshot *events.DeviceMetadataSnapshotTaken
}
}

func NewDeviceMetadataProjection() eventstore.Model {
return &deviceMetadataProjection{}
// NewDeviceMetadataProjection creates an empty device-metadata projection
// model bound to the given deviceID. The snapshot inside starts nil and is
// populated later by Handle.
func NewDeviceMetadataProjection(deviceID string) eventstore.Model {
	projection := deviceMetadataProjection{deviceID: deviceID}
	return &projection
}

func (p *deviceMetadataProjection) GetDeviceID() string {
if p.data == nil {
return ""
}
return p.data.GetDeviceId()
return p.deviceID
}

// GetDeviceMetadataUpdated returns the latest device-metadata-updated event
// from the projection's snapshot. Safe for concurrent use; the snapshot is
// read under the read lock.
func (p *deviceMetadataProjection) GetDeviceMetadataUpdated() *events.DeviceMetadataUpdated {
	p.private.lock.RLock()
	updated := p.private.snapshot.GetDeviceMetadataUpdated()
	p.private.lock.RUnlock()
	return updated
}

func (p *deviceMetadataProjection) Clone() *deviceMetadataProjection {
return &deviceMetadataProjection{
data: p.data.Clone(),
// GetDeviceUpdatePendings returns the pending metadata-update commands that
// have not yet expired at the given time. Safe for concurrent use; the
// snapshot is read under the read lock.
func (p *deviceMetadataProjection) GetDeviceUpdatePendings(now time.Time) []*events.DeviceMetadataUpdatePending {
	p.private.lock.RLock()
	defer p.private.lock.RUnlock()
	result := make([]*events.DeviceMetadataUpdatePending, 0, 4)
	for _, cmd := range p.private.snapshot.GetUpdatePendings() {
		if !cmd.IsExpired(now) {
			result = append(result, cmd)
		}
	}
	return result
}

// IsInitialized reports whether the projection has received at least one
// event (i.e. its snapshot has been created). Safe for concurrent use.
func (p *deviceMetadataProjection) IsInitialized() bool {
	p.private.lock.RLock()
	initialized := p.private.snapshot != nil
	p.private.lock.RUnlock()
	return initialized
}

func (p *deviceMetadataProjection) EventType() string {
Expand All @@ -36,42 +59,46 @@ func (p *deviceMetadataProjection) EventType() string {
}

func (p *deviceMetadataProjection) Handle(ctx context.Context, iter eventstore.Iter) error {
p.private.lock.Lock()
defer p.private.lock.Unlock()
for {
eu, ok := iter.Next(ctx)
if !ok {
break
}
log.Debugf("deviceMetadataProjection.Handle deviceID=%v eventype%v version=%v", eu.GroupID(), eu.EventType(), eu.Version())
if p.data == nil {
p.data = &events.DeviceMetadataSnapshotTaken{
if p.private.snapshot == nil {
p.private.snapshot = &events.DeviceMetadataSnapshotTaken{
DeviceId: eu.GroupID(),
EventMetadata: events.MakeEventMeta("", 0, eu.Version()),
}
}
p.data.GetEventMetadata().Version = eu.Version()
eventMetadata := p.private.snapshot.GetEventMetadata().Clone()
eventMetadata.Version = eu.Version()
p.private.snapshot.EventMetadata = eventMetadata
switch eu.EventType() {
case (&events.DeviceMetadataSnapshotTaken{}).EventType():
var e events.DeviceMetadataSnapshotTaken
if err := eu.Unmarshal(&e); err != nil {
return err
}
p.data = &e
p.private.snapshot = &e
case (&events.DeviceMetadataUpdatePending{}).EventType():
var e events.DeviceMetadataUpdatePending
if err := eu.Unmarshal(&e); err != nil {
return err
}
if err := p.data.HandleDeviceMetadataUpdatePending(ctx, &e); err != nil {
if err := p.private.snapshot.HandleDeviceMetadataUpdatePending(ctx, &e); err != nil {
continue
}
p.data.DeviceId = e.GetDeviceId()
p.private.snapshot.DeviceId = e.GetDeviceId()
case (&events.DeviceMetadataUpdated{}).EventType():
var e events.DeviceMetadataUpdated
if err := eu.Unmarshal(&e); err != nil {
return err
}
p.data.DeviceId = e.GetDeviceId()
if _, err := p.data.HandleDeviceMetadataUpdated(ctx, &e, false); err != nil {
p.private.snapshot.DeviceId = e.GetDeviceId()
if _, err := p.private.snapshot.HandleDeviceMetadataUpdated(ctx, &e, false); err != nil {
continue
}
}
Expand Down
4 changes: 2 additions & 2 deletions resource-directory/service/grpcApi.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ func NewEventStoreModelFactory() func(context.Context, string, string) (eventsto
return func(ctx context.Context, deviceID, resourceID string) (eventstore.Model, error) {
switch resourceID {
case commands.MakeLinksResourceUUID(deviceID):
return NewResourceLinksProjection(), nil
return NewResourceLinksProjection(deviceID), nil
case commands.MakeStatusResourceUUID(deviceID):
return NewDeviceMetadataProjection(), nil
return NewDeviceMetadataProjection(deviceID), nil
}
return NewResourceProjection(), nil
}
Expand Down
37 changes: 19 additions & 18 deletions resource-directory/service/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (p *Projection) LoadResourceLinks(ctx context.Context, deviceIDFilter, toRe
var err error
p.Models(func(m eventstore.Model) (wantNext bool) {
rl := m.(*resourceLinksProjection)
if len(rl.snapshot.GetResources()) == 0 {
if rl.LenResources() == 0 {
return false
}
reload = false
Expand Down Expand Up @@ -111,8 +111,8 @@ func (p *Projection) LoadDevicesMetadata(ctx context.Context, deviceIDFilter, to
reload := true
p.Models(func(m eventstore.Model) (wantNext bool) {
dm := m.(*deviceMetadataProjection)
if dm.data == nil {
return
if !dm.IsInitialized() {
return true
}
reload = false
err = onDeviceMetadataProjection(dm)
Expand Down Expand Up @@ -146,23 +146,26 @@ func getResourceIDMapFilter(resourceIDFilter []*commands.ResourceId) map[string]
}

func (p *Projection) wantToReloadDevice(rl *resourceLinksProjection, hrefFilter map[string]bool, typeFilter strings.Set) bool {
for _, res := range rl.snapshot.GetResources() {
var finalReload bool
rl.IterateOverResources(func(res *commands.Resource) (wantNext bool) {
if len(hrefFilter) > 0 && !hrefFilter[res.GetHref()] {
continue
return true
}
if !hasMatchingType(res.ResourceTypes, typeFilter) {
continue
return true
}
reload := true
p.Models(func(eventstore.Model) (wantNext bool) {
reload = false
return true
}, commands.NewResourceID(rl.GetDeviceID(), res.Href))
if reload {
return true
finalReload = true
return false
}
}
return false
return true
})
return finalReload
}

func (p *Projection) LoadResourcesWithLinks(ctx context.Context, resourceIDFilter []*commands.ResourceId, typeFilter strings.Set, toReloadDevices strings.Set, onResource func(*Resource) error) error {
Expand All @@ -175,14 +178,14 @@ func (p *Projection) LoadResourcesWithLinks(ctx context.Context, resourceIDFilte
}
return nil
}
for _, res := range rl.snapshot.GetResources() {
var err error
rl.IterateOverResources(func(res *commands.Resource) (wantNext bool) {
if len(hrefFilter) > 0 && !hrefFilter[res.GetHref()] {
continue
return true
}
if !hasMatchingType(res.ResourceTypes, typeFilter) {
continue
return true
}
var err error
p.Models(func(model eventstore.Model) (wantNext bool) {
t := model.(interface{ EventType() string }).EventType()
if t == events.NewResourceLinksSnapshotTaken().EventType() ||
Expand All @@ -196,11 +199,9 @@ func (p *Projection) LoadResourcesWithLinks(ctx context.Context, resourceIDFilte
})
return err == nil
}, commands.NewResourceID(rl.GetDeviceID(), res.Href))
if err != nil {
return err
}
}
return nil
return true
})
return err
})
if err != nil {
return err
Expand Down
Loading

0 comments on commit 8b5fd90

Please sign in to comment.