Skip to content

Commit

Permalink
coap-gateway: fix loss of order of messages (#851)
Browse files Browse the repository at this point in the history
* coap-gateway: fix loss of order of messages

using goPool causes loss of order of messages. For example
coap notfications comes in order, but goroutines from gopool
are executed underministic.

- WithGoPool was removed
- WithProcessReceivedMessageFunc to modify core cc.ProcessReceivedMessage
  • Loading branch information
jkralik authored Jan 25, 2023
1 parent 7b2423f commit ce63344
Show file tree
Hide file tree
Showing 14 changed files with 24 additions and 119 deletions.
1 change: 1 addition & 0 deletions charts/plgd-hub/templates/coap-gateway/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ data:
subscriptionBufferSize: {{ .apis.coap.subscriptionBufferSize }}
messagePoolSize: {{ .apis.coap.messagePoolSize }}
requireBatchObserveEnabled: {{ .apis.coap.requireBatchObserveEnabled }}
messageQueueSize: {{ .apis.coap.messageQueueSize }}
keepAlive:
timeout: {{ .apis.coap.keepAlive.timeout }}
blockwiseTransfer:
Expand Down
1 change: 1 addition & 0 deletions charts/plgd-hub/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,7 @@ coapgateway:
subscriptionBufferSize: 1000
messagePoolSize: 1000
requireBatchObserveEnabled: true
messageQueueSize: 16
keepAlive:
timeout: 20s
blockwiseTransfer:
Expand Down
1 change: 1 addition & 0 deletions coap-gateway/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ apis:
subscriptionBufferSize: 1000
messagePoolSize: 1000
requireBatchObserveEnabled: true
messageQueueSize: 16
keepAlive:
timeout: 20s
blockwiseTransfer:
Expand Down
4 changes: 2 additions & 2 deletions coap-gateway/service/clientObserveHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ func TestClientObserveHandlerCloseObservation(t *testing.T) {
defer shutdown()

co1 := testCoapDial(t, "", true, time.Now().Add(time.Minute))
require.NotEmpty(t, co1)
require.NotNil(t, co1)
defer func() {
_ = co1.Close()
}()
testPrepareDevice(t, co1)
co2 := testCoapDial(t, "", true, time.Now().Add(time.Minute))
require.NotEmpty(t, co1)
require.NotNil(t, co2)
defer func() {
_ = co2.Close()
}()
Expand Down
43 changes: 0 additions & 43 deletions coap-gateway/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@ import (
"github.com/plgd-dev/go-coap/v3/message/pool"
"github.com/plgd-dev/go-coap/v3/message/status"
"github.com/plgd-dev/go-coap/v3/mux"
coapOptionsConfig "github.com/plgd-dev/go-coap/v3/options/config"
"github.com/plgd-dev/go-coap/v3/pkg/cache"
"github.com/plgd-dev/go-coap/v3/pkg/runner/periodic"
coapTcpClient "github.com/plgd-dev/go-coap/v3/tcp/client"
coapUdpClient "github.com/plgd-dev/go-coap/v3/udp/client"
"github.com/plgd-dev/hub/v2/coap-gateway/uri"
pbGRPC "github.com/plgd-dev/hub/v2/grpc-gateway/pb"
"github.com/plgd-dev/hub/v2/grpc-gateway/subscription"
Expand Down Expand Up @@ -555,46 +552,6 @@ func (s *Service) createServices(fileWatcher *fsnotify.Watcher, logger log.Logge
return nil, setHandlerError(uri.RefreshToken, err)
}
return coapService.New(s.ctx, s.config.APIs.COAP.Config, m, fileWatcher, logger,
coapService.WithTCPGoPool(func(processReqFunc coapOptionsConfig.ProcessRequestFunc[*coapTcpClient.Conn], req *pool.Message, cc *coapTcpClient.Conn, handler coapOptionsConfig.HandlerFunc[*coapTcpClient.Conn]) error {
if s.config.APIs.COAP.Config.BlockwiseTransfer.Enabled {
x := struct {
processReqFunc coapOptionsConfig.ProcessRequestFunc[*coapTcpClient.Conn]
req *pool.Message
cc *coapTcpClient.Conn
handler coapOptionsConfig.HandlerFunc[*coapTcpClient.Conn]
}{
processReqFunc: processReqFunc,
req: req,
cc: cc,
handler: handler,
}
return s.taskQueue.Submit(func() {
x.processReqFunc(x.req, x.cc, x.handler)
})
}
processReqFunc(req, cc, handler)
return nil
}),
coapService.WithUDPGoPool(func(processReqFunc coapOptionsConfig.ProcessRequestFunc[*coapUdpClient.Conn], req *pool.Message, cc *coapUdpClient.Conn, handler coapOptionsConfig.HandlerFunc[*coapUdpClient.Conn]) error {
if s.config.APIs.COAP.Config.BlockwiseTransfer.Enabled {
x := struct {
processReqFunc coapOptionsConfig.ProcessRequestFunc[*coapUdpClient.Conn]
req *pool.Message
cc *coapUdpClient.Conn
handler coapOptionsConfig.HandlerFunc[*coapUdpClient.Conn]
}{
processReqFunc: processReqFunc,
req: req,
cc: cc,
handler: handler,
}
return s.taskQueue.Submit(func() {
x.processReqFunc(x.req, x.cc, x.handler)
})
}
processReqFunc(req, cc, handler)
return nil
}),
coapService.WithOnNewConnection(s.coapConnOnNew),
coapService.WithOnInactivityConnection(s.onInactivityConnection),
coapService.WithMessagePool(s.messagePool),
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ require (
github.com/lestrrat-go/jwx v1.2.25
github.com/nats-io/nats.go v1.22.1
github.com/panjf2000/ants/v2 v2.7.1
github.com/pion/dtls/v2 v2.1.6-0.20221001075407-984d41b9981a
github.com/pion/dtls/v2 v2.1.6-0.20230104045405-f40c61d83b5f
github.com/pion/logging v0.2.2
github.com/plgd-dev/device/v2 v2.0.2-0.20221202214050-f9f57f7c9a61
github.com/plgd-dev/go-coap/v3 v3.0.2-0.20221201101543-2e2c858a13f2
github.com/plgd-dev/go-coap/v3 v3.1.0
github.com/plgd-dev/kit/v2 v2.0.0-20211006190727-057b33161b90
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.1
Expand Down Expand Up @@ -86,7 +86,7 @@ require (
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pion/transport v0.14.1 // indirect
github.com/pion/udp v0.1.2-0.20221011090648-2589407f52c9 // indirect
github.com/pion/udp v0.1.2-0.20221201030934-a2465bb5d508 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand Down
24 changes: 12 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -287,28 +287,27 @@ github.com/panjf2000/ants/v2 v2.7.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9i
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pion/dtls/v2 v2.0.1-0.20200503085337-8e86b3a7d585/go.mod h1:/GahSOC8ZY/+17zkaGJIG4OUkSGAcZu/N/g3roBOCkM=
github.com/pion/dtls/v2 v2.0.10-0.20210502094952-3dc563b9aede/go.mod h1:86wv5dgx2J/z871nUR+5fTTY9tISLUlo+C5Gm86r1Hs=
github.com/pion/dtls/v2 v2.1.6-0.20221001075407-984d41b9981a h1:0ehEGhn8BELqqrMXz9Kv3d2EWyd+Fkpj4a0NU4CdW+8=
github.com/pion/dtls/v2 v2.1.6-0.20221001075407-984d41b9981a/go.mod h1:htPmaz5oEpte9p0DV4oFUUCw3L7Q+sKrwZ/CUWkZu6g=
github.com/pion/dtls/v2 v2.1.6-0.20230104045405-f40c61d83b5f h1:KBWDYx2XKp8oaAOxdjcay8YRoumElppzUAqqQKdSS1c=
github.com/pion/dtls/v2 v2.1.6-0.20230104045405-f40c61d83b5f/go.mod h1:VNVUbakoOWubuZf9s+NWLFTzk1BLBjs1t16mPkBtCoA=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/transport v0.10.0/go.mod h1:BnHnUipd0rZQyTVB2SBGojFHT9CBt5C5TcsJSQGkvSE=
github.com/pion/transport v0.12.2/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q=
github.com/pion/transport v0.12.3/go.mod h1:OViWW9SP2peE/HbwBvARicmAVnesphkNkCVZIWJ6q9A=
github.com/pion/transport v0.13.1/go.mod h1:EBxbqzyv+ZrmDb82XswEE0BjfQFtuw1Nu6sjnjWCsGg=
github.com/pion/transport v0.14.1 h1:XSM6olwW+o8J4SCmOBb/BpwZypkHeyM0PGFCxNQBr40=
github.com/pion/transport v0.14.1/go.mod h1:4tGmbk00NeYA3rUa9+n+dzCCoKkcy3YlYb99Jn2fNnI=
github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M=
github.com/pion/udp v0.1.2-0.20221011090648-2589407f52c9 h1:VQX06cadBakm+wnoM3rOaKw8IxLX3GNth4WH9037hrk=
github.com/pion/udp v0.1.2-0.20221011090648-2589407f52c9/go.mod h1:GUd681aT3Tj7pdNkUtqBz5pp/GLMGIaMI9Obq6+ob48=
github.com/pion/udp v0.1.2-0.20221201030934-a2465bb5d508 h1:narvC4uRnuk82Ur4MO70lRLYUS+Ctt9rfp9NjwjdQzQ=
github.com/pion/udp v0.1.2-0.20221201030934-a2465bb5d508/go.mod h1:CuqU2J4MmF3sjqKfk1SaIhuNXdum5PJRqd2LHuLMQSk=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/plgd-dev/device/v2 v2.0.2-0.20221202214050-f9f57f7c9a61 h1:bD3etnuR6iF28Tvg9NRR2HxDmm/THUMKCV2goV5TFZE=
github.com/plgd-dev/device/v2 v2.0.2-0.20221202214050-f9f57f7c9a61/go.mod h1:cpORZZUGi+V/vhaFeHaQivMtUUyJ5byfpaYDOA+iZys=
github.com/plgd-dev/go-coap/v2 v2.0.4-0.20200819112225-8eb712b901bc/go.mod h1:+tCi9Q78H/orWRtpVWyBgrr4vKFo2zYtbbxUllerBp4=
github.com/plgd-dev/go-coap/v2 v2.4.1-0.20210517130748-95c37ac8e1fa/go.mod h1:rA7fc7ar+B/qa+Q0hRqv7yj/EMtIlmo1l7vkQGSrHPU=
github.com/plgd-dev/go-coap/v3 v3.0.2-0.20221201101543-2e2c858a13f2 h1:kbX4gjVrYnyLCbEIe5HQgG9jxapBY1JcYW8N10N41W4=
github.com/plgd-dev/go-coap/v3 v3.0.2-0.20221201101543-2e2c858a13f2/go.mod h1:jZZ29LJuJbXKjkmNWlpdfCbkuSrVgwfKHj8NHf0BtMY=
github.com/plgd-dev/go-coap/v3 v3.1.0 h1:nY9ITnINmUl0BBgzQWAk3dA4hPSjNRT/ienQ6pjp+sc=
github.com/plgd-dev/go-coap/v3 v3.1.0/go.mod h1:Gexp+NwPVWlgOL4hRlovETjsB8V8BDXMRveIOHeQrHc=
github.com/plgd-dev/kit v0.0.0-20200819113605-d5fcf3e94f63/go.mod h1:Yl9zisyXfPdtP9hTWlJqjJYXmgU/jtSDKttz9/CeD90=
github.com/plgd-dev/kit/v2 v2.0.0-20211006190727-057b33161b90 h1:TC1HJ/UbyflJFPvaOdGmNZ5TeFGex1/dyr9urNGLy7M=
github.com/plgd-dev/kit/v2 v2.0.0-20211006190727-057b33161b90/go.mod h1:Z7oKFLSGQjdi8eInxwFCs0tSApuEM1o0qNck+sJYp4M=
Expand Down Expand Up @@ -421,7 +420,7 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.4.0/go.mod h1:3quD/ATkf6oY+rnes5c3ExXTbLc8mueNue5/DoinL80=
golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -492,10 +491,10 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210502030024-e5908800b52b/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220531201128-c960675eff93/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220930213112-107f3e3c3b0b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down Expand Up @@ -555,18 +554,18 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220608164250-635b8c9b7f68/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -577,6 +576,7 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
1 change: 1 addition & 0 deletions pkg/net/coap/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Config struct {
Protocols []Protocol `yaml:"protocols" json:"protocols"`
MaxMessageSize uint32 `yaml:"maxMessageSize" json:"maxMessageSize"`
MessagePoolSize int `yaml:"messagePoolSize" json:"messagePoolSize"`
MessageQueueSize int `yaml:"messageQueueSize" json:"messageQueueSize"`
BlockwiseTransfer BlockwiseTransferConfig `yaml:"blockwiseTransfer" json:"blockwiseTransfer"`
TLS TLSConfig `yaml:"tls" json:"tls"`
InactivityMonitor *InactivityMonitor `yaml:"inactivityMonitor,omitempty" json:"inactivityMonitor,omitempty"`
Expand Down
19 changes: 0 additions & 19 deletions pkg/net/coap/service/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,12 @@ import (

"github.com/plgd-dev/go-coap/v3/message/pool"
"github.com/plgd-dev/go-coap/v3/mux"
coapOptionsConfig "github.com/plgd-dev/go-coap/v3/options/config"
coapTcpClient "github.com/plgd-dev/go-coap/v3/tcp/client"
coapUdpClient "github.com/plgd-dev/go-coap/v3/udp/client"
)

type Options struct {
OverrideTLSConfig func(cfg *tls.Config) *tls.Config
OnNewConnection func(conn mux.Conn)
OnInactivityConnection func(conn mux.Conn)
TCPGoPool coapOptionsConfig.GoPoolFunc[*coapTcpClient.Conn]
UDPGoPool coapOptionsConfig.GoPoolFunc[*coapUdpClient.Conn]
MessagePool *pool.Pool
}

Expand All @@ -37,20 +32,6 @@ func WithOnInactivityConnection(f func(conn mux.Conn)) func(*Options) {
}
}

// Setup go pool for TCP/TCP-TLS connections
func WithTCPGoPool(f coapOptionsConfig.GoPoolFunc[*coapTcpClient.Conn]) func(*Options) {
return func(o *Options) {
o.TCPGoPool = f
}
}

// Setup go pool for UDP/DTLS connections
func WithUDPGoPool(f coapOptionsConfig.GoPoolFunc[*coapUdpClient.Conn]) func(*Options) {
return func(o *Options) {
o.UDPGoPool = f
}
}

func WithMessagePool(p *pool.Pool) func(*Options) {
return func(o *Options) {
o.MessagePool = p
Expand Down
5 changes: 3 additions & 2 deletions pkg/net/coap/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/plgd-dev/hub/v2/pkg/service"
)

func blockWiseTransferSZXFromString(s string) (blockwise.SZX, error) {
func BlockWiseTransferSZXFromString(s string) (blockwise.SZX, error) {
switch strings.ToLower(s) {
case "16":
return blockwise.SZX16, nil
Expand Down Expand Up @@ -93,7 +93,7 @@ func New(ctx context.Context, config Config, router *mux.Router, fileWatcher *fs

blockWiseTransferSZX := blockwise.SZX1024
if config.BlockwiseTransfer.Enabled {
blockWiseTransferSZX, err = blockWiseTransferSZXFromString(config.BlockwiseTransfer.SZX)
blockWiseTransferSZX, err = BlockWiseTransferSZXFromString(config.BlockwiseTransfer.SZX)
if err != nil {
return nil, fmt.Errorf("blockWiseTransferSZX error: %w", err)
}
Expand All @@ -109,6 +109,7 @@ func New(ctx context.Context, config Config, router *mux.Router, fileWatcher *fs
opts = append(opts, options.WithContext(ctx))
opts = append(opts, options.WithMessagePool(serviceOpts.MessagePool))
opts = append(opts, options.WithMaxMessageSize(config.MaxMessageSize))
opts = append(opts, options.WithReceivedMessageQueueSize(config.MessageQueueSize))
opts = append(opts, options.WithErrors(func(e error) {
logger.Errorf("plgd/go-coap: %w", e)
}))
Expand Down
19 changes: 0 additions & 19 deletions pkg/net/coap/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (

"github.com/plgd-dev/go-coap/v3/message/pool"
"github.com/plgd-dev/go-coap/v3/mux"
coapOptionsConfig "github.com/plgd-dev/go-coap/v3/options/config"
coapTcpClient "github.com/plgd-dev/go-coap/v3/tcp/client"
coapUdpClient "github.com/plgd-dev/go-coap/v3/udp/client"
"github.com/plgd-dev/hub/v2/pkg/fsnotify"
"github.com/plgd-dev/hub/v2/pkg/log"
"github.com/plgd-dev/hub/v2/pkg/service"
Expand Down Expand Up @@ -60,14 +57,6 @@ func TestNew(t *testing.T) {
WithMessagePool(pool.New(uint32(1024), 1024)),
WithOnNewConnection(func(conn mux.Conn) {}),
WithOnInactivityConnection(func(conn mux.Conn) {}),
WithTCPGoPool(func(processReqFunc coapOptionsConfig.ProcessRequestFunc[*coapTcpClient.Conn], req *pool.Message, cc *coapTcpClient.Conn, handler coapOptionsConfig.HandlerFunc[*coapTcpClient.Conn]) error {
processReqFunc(req, cc, handler)
return nil
}),
WithUDPGoPool(func(processReqFunc coapOptionsConfig.ProcessRequestFunc[*coapUdpClient.Conn], req *pool.Message, cc *coapUdpClient.Conn, handler coapOptionsConfig.HandlerFunc[*coapUdpClient.Conn]) error {
processReqFunc(req, cc, handler)
return nil
}),
WithOverrideTLS(func(cfg *tls.Config) *tls.Config { return cfg }),
},
},
Expand Down Expand Up @@ -95,14 +84,6 @@ func TestNew(t *testing.T) {
WithMessagePool(pool.New(uint32(1024), 1024)),
WithOnNewConnection(func(conn mux.Conn) {}),
WithOnInactivityConnection(func(conn mux.Conn) {}),
WithTCPGoPool(func(processReqFunc coapOptionsConfig.ProcessRequestFunc[*coapTcpClient.Conn], req *pool.Message, cc *coapTcpClient.Conn, handler coapOptionsConfig.HandlerFunc[*coapTcpClient.Conn]) error {
processReqFunc(req, cc, handler)
return nil
}),
WithUDPGoPool(func(processReqFunc coapOptionsConfig.ProcessRequestFunc[*coapUdpClient.Conn], req *pool.Message, cc *coapUdpClient.Conn, handler coapOptionsConfig.HandlerFunc[*coapUdpClient.Conn]) error {
processReqFunc(req, cc, handler)
return nil
}),
WithOverrideTLS(func(cfg *tls.Config) *tls.Config { return cfg }),
},
},
Expand Down
3 changes: 0 additions & 3 deletions pkg/net/coap/service/tcpServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ func newTCPServer(config Config, serviceOpts Options, fileWatcher *fsnotify.Watc
return nil, fmt.Errorf("cannot create listener: %w", err)
}
tcpOpts := make([]coapTcpServer.Option, 0, 3)
if serviceOpts.TCPGoPool != nil {
tcpOpts = append(tcpOpts, options.WithGoPool(serviceOpts.TCPGoPool))
}
if serviceOpts.OnNewConnection != nil {
tcpOpts = append(tcpOpts, options.WithOnNewConn(func(cc *coapTcpClient.Conn) {
serviceOpts.OnNewConnection(cc)
Expand Down
6 changes: 0 additions & 6 deletions pkg/net/coap/service/udpServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,6 @@ func newDTLSServer(config Config, serviceOpts Options, fileWatcher *fsnotify.Wat
return nil, fmt.Errorf("cannot create listener: %w", err)
}
dtlsOpts := make([]coapDtlsServer.Option, 0, 4)
if serviceOpts.UDPGoPool != nil {
dtlsOpts = append(dtlsOpts, options.WithGoPool(serviceOpts.UDPGoPool))
}
if serviceOpts.OnNewConnection != nil {
dtlsOpts = append(dtlsOpts, options.WithOnNewConn(func(coapConn *coapUdpClient.Conn) {
serviceOpts.OnNewConnection(coapConn)
Expand Down Expand Up @@ -181,9 +178,6 @@ func newUDPServer(config Config, serviceOpts Options, logger log.Logger, opts ..
return nil, fmt.Errorf("cannot create listener: %w", err)
}
udpOpts := make([]coapUdpServer.Option, 0, 4)
if serviceOpts.UDPGoPool != nil {
udpOpts = append(udpOpts, options.WithGoPool(serviceOpts.UDPGoPool))
}
if serviceOpts.OnNewConnection != nil {
udpOpts = append(udpOpts, options.WithOnNewConn(func(coapConn *coapUdpClient.Conn) {
serviceOpts.OnNewConnection(coapConn)
Expand Down
Loading

0 comments on commit ce63344

Please sign in to comment.