Skip to content

Commit

Permalink
coap-gateway: block signoff operation until commands from hub are fin…
Browse files Browse the repository at this point in the history
…ished (#878)

* coap-gateway: block signoff operation until commands from hub are finished
* tests: update tests about maintenance resource
  • Loading branch information
jkralik authored Feb 8, 2023
1 parent 14fddd1 commit 8516319
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 16 deletions.
14 changes: 14 additions & 0 deletions cloud2cloud-gateway/service/retrieveDevice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/plgd-dev/device/v2/schema/configuration"
"github.com/plgd-dev/device/v2/schema/device"
"github.com/plgd-dev/device/v2/schema/interfaces"
"github.com/plgd-dev/device/v2/schema/maintenance"
"github.com/plgd-dev/device/v2/schema/platform"
"github.com/plgd-dev/device/v2/test/resource/types"
"github.com/plgd-dev/go-coap/v3/message"
Expand Down Expand Up @@ -140,6 +141,15 @@ func getDevicesBaseRepresentation(deviceID, deviceName, switchID string) interfa
"device": getDeviceData(deviceID, deviceName),
"links": []interface{}{
getResourceBaseData(deviceID, configuration.ResourceURI, []interface{}{configuration.ResourceType}, nil),
getResourceBaseData(deviceID, maintenance.ResourceURI, []interface{}{maintenance.ResourceType}, map[interface{}]interface{}{
"p": map[interface{}]interface{}{
"bm": uint64(0x1),
"port": uint64(0x0),
"sec": false,
"x.org.iotivity.tcp": uint64(0x0),
"x.org.iotivity.tls": uint64(0x0),
},
}),
getResourceBaseData(deviceID, test.TestResourceLightInstanceHref("1"), []interface{}{types.CORE_LIGHT}, nil),
getResourceBaseData(deviceID, device.ResourceURI, []interface{}{types.DEVICE_CLOUD, device.ResourceType},
map[interface{}]interface{}{
Expand Down Expand Up @@ -207,6 +217,10 @@ func getDevicesAllRepresentation(deviceID, deviceName, switchID string) interfac
map[interface{}]interface{}{
"value": false,
}),
getResourceAllData(deviceID, maintenance.ResourceURI,
map[interface{}]interface{}{
"fr": false,
}),
},
"status": "online",
})
Expand Down
24 changes: 20 additions & 4 deletions coap-gateway/service/observation/deviceObserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,18 @@ func TestDeviceObserverRegisterForPublishedResources(t *testing.T) {
}

expectedObserved := strings.MakeSet()
for _, resID := range test.ResourceLinksToResourceIds(deviceID, test.TestDevsimResources) {
for _, resID := range test.ResourceLinksToResourceIds(deviceID, test.FilterResourceLink(func(rl schema.ResourceLink) bool {
return rl.Policy.BitMask.Has(schema.Observable)
}, test.TestDevsimResources)) {
expectedObserved.Add(resID.ToString())
}
runTestDeviceObserverRegister(ctx, t, deviceID, expectedObserved, nil, validateData, nil, nil, false)
expectedRetrieved := strings.MakeSet()
for _, resID := range test.ResourceLinksToResourceIds(deviceID, test.FilterResourceLink(func(rl schema.ResourceLink) bool {
return !rl.Policy.BitMask.Has(schema.Observable)
}, test.TestDevsimResources)) {
expectedRetrieved.Add(resID.ToString())
}
runTestDeviceObserverRegister(ctx, t, deviceID, expectedObserved, expectedRetrieved, validateData, nil, nil, false)
}

func TestDeviceObserverRegisterForPublishedResourcesWithAlreadyPublishedResources(t *testing.T) {
Expand All @@ -220,10 +228,18 @@ func TestDeviceObserverRegisterForPublishedResourcesWithAlreadyPublishedResource
}

expectedObserved := strings.MakeSet()
for _, resID := range test.ResourceLinksToResourceIds(deviceID, test.TestDevsimResources) {
for _, resID := range test.ResourceLinksToResourceIds(deviceID, test.FilterResourceLink(func(rl schema.ResourceLink) bool {
return rl.Policy.BitMask.Has(schema.Observable)
}, test.TestDevsimResources)) {
expectedObserved.Add(resID.ToString())
}
runTestDeviceObserverRegister(ctx, t, deviceID, expectedObserved, nil, validateData, testPreregisterVirtualDevice, testValidateResourceLinks, false)
expectedRetrieved := strings.MakeSet()
for _, resID := range test.ResourceLinksToResourceIds(deviceID, test.FilterResourceLink(func(rl schema.ResourceLink) bool {
return !rl.Policy.BitMask.Has(schema.Observable)
}, test.TestDevsimResources)) {
expectedRetrieved.Add(resID.ToString())
}
runTestDeviceObserverRegister(ctx, t, deviceID, expectedObserved, expectedRetrieved, validateData, testPreregisterVirtualDevice, testValidateResourceLinks, false)
}

func TestDeviceObserverRegisterForDiscoveryResource(t *testing.T) {
Expand Down
11 changes: 6 additions & 5 deletions coap-gateway/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,6 @@ func (s *Service) processCommandTask(req *mux.Message, client *session, span tra
if err != nil {
resp = client.createErrorResponse(err, req.Token())
}
if resp != nil {
client.WriteMessage(resp)
defer client.ReleaseMessage(resp)
}
client.logRequestResponse(req, resp, err)
if err != nil {
span.RecordError(err)
span.SetStatus(otelCodes.Error, err.Error())
Expand All @@ -376,6 +371,12 @@ func (s *Service) processCommandTask(req *mux.Message, client *session, span tra
otelcoap.MessageSentEvent(req.Context(), resp)
span.SetAttributes(otelcoap.StatusCodeAttr(resp.Code()))
}
client.logRequestResponse(req, resp, err)
if resp != nil {
// need to be the last action, because body of response could be used by another goroutine for block wise transfer
client.WriteMessage(resp)
defer client.ReleaseMessage(resp)
}
}

func (s *Service) makeCommandTask(req *mux.Message, client *session, fnc func(req *mux.Message, client *session) (*pool.Message, error)) func() {
Expand Down
32 changes: 32 additions & 0 deletions coap-gateway/service/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"math"
"net"
"sync"
"time"
Expand Down Expand Up @@ -35,6 +36,7 @@ import (
otelCodes "go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -109,6 +111,10 @@ type session struct {
deviceObserver *future.Future
closeEventSubscriptions func()
}

// blockSignOff is used to block sign off until all commands from hub are finished.
// eg: factory reset was send via /oic/mnt resource and sign off are called in parallel.
blockSignOff *semaphore.Weighted
}

// newSession creates and initializes client
Expand All @@ -121,6 +127,7 @@ func newSession(server *Service, coapConn mux.Conn, tlsDeviceID string, tlsValid
exchangeCache: NewExchangeCache(),
refreshCache: NewRefreshCache(),
tlsValidUntil: tlsValidUntil,
blockSignOff: semaphore.NewWeighted(math.MaxInt64),
}
}

Expand Down Expand Up @@ -501,6 +508,11 @@ func (c *session) batchNotifyContentChanged(ctx context.Context, deviceID string
}

func (c *session) notifyContentChanged(deviceID, href string, batch bool, notification *pool.Message) error {
if !c.blockSignOff.TryAcquire(1) {
c.getLogger().Debugf("cannot notify resource /%v%v content changed: signOff processing", deviceID, href)
return nil
}
defer c.blockSignOff.Release(1)
notifyError := func(deviceID, href string, err error) error {
return fmt.Errorf("cannot notify resource /%v%v content changed: %w", deviceID, href, err)
}
Expand Down Expand Up @@ -562,6 +574,10 @@ func (c *session) updateStatusResource(ctx context.Context, sendConfirmCtx conte
}

func (c *session) UpdateResource(ctx context.Context, event *events.ResourceUpdatePending) error {
if !c.blockSignOff.TryAcquire(1) {
return fmt.Errorf("cannot update resource /%v%v: signOff processing", event.GetResourceId().GetDeviceId(), event.GetResourceId().GetHref())
}
defer c.blockSignOff.Release(1)
setDeviceIDToTracerSpan(ctx, c.deviceID())
authCtx, err := c.GetAuthorizationContext()
if err != nil {
Expand Down Expand Up @@ -630,6 +646,10 @@ func (c *session) retrieveStatusResource(ctx context.Context, sendConfirmCtx con
}

func (c *session) RetrieveResource(ctx context.Context, event *events.ResourceRetrievePending) error {
if !c.blockSignOff.TryAcquire(1) {
return fmt.Errorf("cannot retrieve resource /%v%v: signOff processing", event.GetResourceId().GetDeviceId(), event.GetResourceId().GetHref())
}
defer c.blockSignOff.Release(1)
setDeviceIDToTracerSpan(ctx, c.deviceID())
authCtx, err := c.GetAuthorizationContext()
if err != nil {
Expand Down Expand Up @@ -698,6 +718,10 @@ func (c *session) deleteStatusResource(ctx context.Context, sendConfirmCtx conte
}

func (c *session) DeleteResource(ctx context.Context, event *events.ResourceDeletePending) error {
if !c.blockSignOff.TryAcquire(1) {
return fmt.Errorf("cannot delete resource /%v%v: signOff processing", event.GetResourceId().GetDeviceId(), event.GetResourceId().GetHref())
}
defer c.blockSignOff.Release(1)
setDeviceIDToTracerSpan(ctx, c.deviceID())
authCtx, err := c.GetAuthorizationContext()
if err != nil {
Expand Down Expand Up @@ -814,6 +838,10 @@ func (c *session) createStatusResource(ctx context.Context, sendConfirmCtx conte
}

func (c *session) CreateResource(ctx context.Context, event *events.ResourceCreatePending) error {
if !c.blockSignOff.TryAcquire(1) {
return fmt.Errorf("cannot create resource /%v%v: signOff processing", event.GetResourceId().GetDeviceId(), event.GetResourceId().GetHref())
}
defer c.blockSignOff.Release(1)
setDeviceIDToTracerSpan(ctx, c.deviceID())
authCtx, err := c.GetAuthorizationContext()
if err != nil {
Expand Down Expand Up @@ -896,6 +924,10 @@ func (c *session) confirmDeviceMetadataUpdate(ctx context.Context, event *events
}

func (c *session) UpdateDeviceMetadata(ctx context.Context, event *events.DeviceMetadataUpdatePending) error {
if !c.blockSignOff.TryAcquire(1) {
return fmt.Errorf("cannot update device('%v') metadata: signOff processing", event.GetDeviceId())
}
defer c.blockSignOff.Release(1)
setDeviceIDToTracerSpan(ctx, c.deviceID())
authCtx, err := c.GetAuthorizationContext()
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions coap-gateway/service/signOff.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package service
import (
"context"
"fmt"
"math"
"net/url"

"github.com/plgd-dev/go-coap/v3/message"
Expand Down Expand Up @@ -88,6 +89,11 @@ const errFmtSignOff = "cannot handle sign off: %w"
// Sign-off
// https://github.com/openconnectivityfoundation/security/blob/master/swagger2.0/oic.sec.account.swagger.json
func signOffHandler(req *mux.Message, client *session) (*pool.Message, error) {
err := client.blockSignOff.Acquire(req.Context(), math.MaxInt64)
if err != nil {
return nil, statusErrorf(coapCodes.ServiceUnavailable, errFmtSignOff, fmt.Errorf("cannot acquire sign off lock: some commands are in progress"))
}
defer client.blockSignOff.Release(math.MaxInt64)
ctx, cancel := context.WithTimeout(client.server.ctx, client.server.config.APIs.COAP.KeepAlive.Timeout)
defer cancel()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/panjf2000/ants/v2 v2.7.1
github.com/pion/dtls/v2 v2.1.6-0.20230201184248-11ea8c257a48
github.com/pion/logging v0.2.2
github.com/plgd-dev/device/v2 v2.0.5-0.20230202172045-cb0d7ca3b9ec
github.com/plgd-dev/device/v2 v2.0.6-0.20230207123337-38ef68052803
github.com/plgd-dev/go-coap/v3 v3.1.2-0.20230202152709-d4fc0ce1262e
github.com/plgd-dev/kit/v2 v2.0.0-20211006190727-057b33161b90
github.com/sirupsen/logrus v1.9.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/plgd-dev/device/v2 v2.0.5-0.20230202172045-cb0d7ca3b9ec h1:PwFgIkysWpM3LpnmYYACS05fUKNKwq/zAemEUkFlI2Y=
github.com/plgd-dev/device/v2 v2.0.5-0.20230202172045-cb0d7ca3b9ec/go.mod h1:UuQmKH/F5NqYln7L95+qXvItrfH6y5QPe9+h1amq5Yg=
github.com/plgd-dev/device/v2 v2.0.6-0.20230207123337-38ef68052803 h1:RFhmzNGYpMKuB37fFzPincvvcsTY9x1DeTJvyE4JngE=
github.com/plgd-dev/device/v2 v2.0.6-0.20230207123337-38ef68052803/go.mod h1:UuQmKH/F5NqYln7L95+qXvItrfH6y5QPe9+h1amq5Yg=
github.com/plgd-dev/go-coap/v2 v2.0.4-0.20200819112225-8eb712b901bc/go.mod h1:+tCi9Q78H/orWRtpVWyBgrr4vKFo2zYtbbxUllerBp4=
github.com/plgd-dev/go-coap/v2 v2.4.1-0.20210517130748-95c37ac8e1fa/go.mod h1:rA7fc7ar+B/qa+Q0hRqv7yj/EMtIlmo1l7vkQGSrHPU=
github.com/plgd-dev/go-coap/v3 v3.1.2-0.20230202152709-d4fc0ce1262e h1:nBETq6X1JzXc2+BMgoTBzwkZqHC/5/G3/NCptF4UYEs=
Expand Down
9 changes: 5 additions & 4 deletions grpc-gateway/client/maintenance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ func TestClientFactoryReset(t *testing.T) {
wantErr bool
}{
{
name: "factory reset - maintenance resource is not published",
name: "factory reset",
args: args{
deviceID: deviceID,
},
wantErr: true,
wantErr: false,
},
{
name: "not found",
Expand All @@ -57,8 +57,9 @@ func TestClientFactoryReset(t *testing.T) {
assert.NoError(t, err)
}()

_, shutdownDevSim := test.OnboardDevSim(ctx, t, c.GrpcGatewayClient(), deviceID, config.ACTIVE_COAP_SCHEME+"://"+config.COAP_GW_HOST, test.GetAllBackendResourceLinks())
defer shutdownDevSim()
_, _ = test.OnboardDevSim(ctx, t, c.GrpcGatewayClient(), deviceID, config.ACTIVE_COAP_SCHEME+"://"+config.COAP_GW_HOST, test.GetAllBackendResourceLinks())
// shutdownDevSim is not needed because factory reset is called
// defer shutdownDevSim()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions grpc-gateway/subscription/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/panjf2000/ants/v2"
"github.com/plgd-dev/device/v2/schema/configuration"
"github.com/plgd-dev/device/v2/schema/device"
"github.com/plgd-dev/device/v2/schema/maintenance"
"github.com/plgd-dev/device/v2/schema/platform"
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/hub/v2/grpc-gateway/pb"
Expand Down Expand Up @@ -205,6 +206,17 @@ func getResourceChangedEvents(t *testing.T, deviceID, correlationID, subscriptio
},
CorrelationId: correlationID,
},
maintenance.ResourceURI: {
SubscriptionId: subscriptionID,
Type: &pb.Event_ResourceChanged{
ResourceChanged: pbTest.MakeResourceChanged(t, deviceID, maintenance.ResourceURI, "",
map[string]interface{}{
"fr": false,
},
),
},
CorrelationId: correlationID,
},
}
}

Expand Down
13 changes: 13 additions & 0 deletions http-gateway/service/getDeviceResources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/plgd-dev/device/v2/schema/configuration"
"github.com/plgd-dev/device/v2/schema/device"
"github.com/plgd-dev/device/v2/schema/interfaces"
"github.com/plgd-dev/device/v2/schema/maintenance"
"github.com/plgd-dev/device/v2/schema/platform"
"github.com/plgd-dev/device/v2/test/resource/types"
"github.com/plgd-dev/hub/v2/grpc-gateway/pb"
Expand All @@ -31,6 +32,14 @@ import (
"google.golang.org/grpc/credentials"
)

func makeMaintenanceResourceChanged(t *testing.T, deviceID string) *events.ResourceChanged {
return pbTest.MakeResourceChanged(t, deviceID, maintenance.ResourceURI, "",
map[string]interface{}{
"fr": false,
},
)
}

func makePlatformResourceChanged(t *testing.T, deviceID string) *events.ResourceChanged {
return pbTest.MakeResourceChanged(t, deviceID, platform.ResourceURI, "",
map[string]interface{}{
Expand Down Expand Up @@ -120,6 +129,10 @@ func TestRequestHandlerGetDeviceResources(t *testing.T) {
Types: []string{types.DEVICE_CLOUD, device.ResourceType},
Data: makeCloudDeviceResourceChanged(t, deviceID),
},
{
Types: []string{maintenance.ResourceType},
Data: makeMaintenanceResourceChanged(t, deviceID),
},
},
},
{
Expand Down
10 changes: 10 additions & 0 deletions resource-directory/service/getEvents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/plgd-dev/device/v2/schema/configuration"
"github.com/plgd-dev/device/v2/schema/device"
"github.com/plgd-dev/device/v2/schema/interfaces"
"github.com/plgd-dev/device/v2/schema/maintenance"
"github.com/plgd-dev/device/v2/schema/platform"
"github.com/plgd-dev/device/v2/test/resource/types"
"github.com/plgd-dev/go-coap/v3/message"
Expand All @@ -37,6 +38,13 @@ func getOnboardEventForResource(t *testing.T, deviceID, href string) interface{}
})
}

if href == maintenance.ResourceURI {
return pbTest.MakeResourceChanged(t, deviceID, maintenance.ResourceURI, "",
map[string]interface{}{
"fr": false,
})
}

if href == device.ResourceURI {
return pbTest.MakeResourceChanged(t, deviceID, device.ResourceURI, "",
map[string]interface{}{
Expand Down Expand Up @@ -81,6 +89,7 @@ func getAllOnboardEvents(t *testing.T, deviceID string, links []schema.ResourceL
expectedRCC := getOnboardEventForResource(t, deviceID, configuration.ResourceURI)
expectedRCL := getOnboardEventForResource(t, deviceID, test.TestResourceLightInstanceHref("1"))
expectedRCS := getOnboardEventForResource(t, deviceID, test.TestResourceSwitchesHref)
expectedRCM := getOnboardEventForResource(t, deviceID, maintenance.ResourceURI)
return []interface{}{
expectedDMU,
expectedDMU1,
Expand All @@ -91,6 +100,7 @@ func getAllOnboardEvents(t *testing.T, deviceID string, links []schema.ResourceL
expectedRCL,
expectedRCS,
expectedDMU2,
expectedRCM,
}
}

Expand Down
Loading

0 comments on commit 8516319

Please sign in to comment.