From 91be5e31219bd5edaf6a28f23e243355fc4d1978 Mon Sep 17 00:00:00 2001 From: schmidtw Date: Mon, 18 Mar 2024 00:18:16 -0700 Subject: [PATCH 1/2] feat: Add a simple wrp focused pubsub package. --- go.mod | 2 +- go.sum | 4 +- internal/pubsub/end2end_test.go | 222 +++++++++++++++++++++++++++++++ internal/pubsub/options.go | 79 +++++++++++ internal/pubsub/pubsub.go | 228 ++++++++++++++++++++++++++++++++ internal/pubsub/pubsub_test.go | 143 ++++++++++++++++++++ 6 files changed, 675 insertions(+), 3 deletions(-) create mode 100644 internal/pubsub/end2end_test.go create mode 100644 internal/pubsub/options.go create mode 100644 internal/pubsub/pubsub.go create mode 100644 internal/pubsub/pubsub_test.go diff --git a/go.mod b/go.mod index 8642960..e2af2b0 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/xmidt-org/eventor v0.0.0-20230910205925-8ff168bd12ed github.com/xmidt-org/retry v0.0.3 github.com/xmidt-org/sallust v0.2.2 - github.com/xmidt-org/wrp-go/v3 v3.4.3 + github.com/xmidt-org/wrp-go/v3 v3.5.1 go.uber.org/fx v1.20.1 go.uber.org/zap v1.27.0 gopkg.in/dealancer/validate.v2 v2.1.0 diff --git a/go.sum b/go.sum index 450ed12..a32eb85 100644 --- a/go.sum +++ b/go.sum @@ -67,8 +67,8 @@ github.com/xmidt-org/retry v0.0.3 h1:wvmBnEEn1OKwSZaQtr1RZ2Vey8JIvP72mGTgR+3wPiM github.com/xmidt-org/retry v0.0.3/go.mod h1:I7FO3VVrxPckNuotwGYZIxfBnmjMSyOTitTKNL0VkIA= github.com/xmidt-org/sallust v0.2.2 h1:MrINLEr7cMj6ENx/O76fvpfd5LNGYnk7OipZAGXPYA0= github.com/xmidt-org/sallust v0.2.2/go.mod h1:ytBoypcPw10OmjM6b92Jx3eoqWX4J5zVXOQozGwz4qs= -github.com/xmidt-org/wrp-go/v3 v3.4.3 h1:0Rk7UtTP2nNPVHhVFzKC/HF4xV9f36x7q4301p1xC6k= -github.com/xmidt-org/wrp-go/v3 v3.4.3/go.mod h1:j1kLLoPJmKkMFz/vlwP238WBoFhJgbPyJDN9W2V1TxY= +github.com/xmidt-org/wrp-go/v3 v3.5.1 h1:AYjdNDlck3M1rdsZugMxn8TcltySmT25HIuoqU105Fw= +github.com/xmidt-org/wrp-go/v3 v3.5.1/go.mod h1:zruorkuO3UJQKD8okAFOr8PYIlrPBtRGBVQxsoKX8L8= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= diff --git a/internal/pubsub/end2end_test.go b/internal/pubsub/end2end_test.go new file mode 100644 index 0000000..087e990 --- /dev/null +++ b/internal/pubsub/end2end_test.go @@ -0,0 +1,222 @@ +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 + +package pubsub_test + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/xmidt-org/wrp-go/v3" + "github.com/xmidt-org/xmidt-agent/internal/pubsub" +) + +type msgWithExpectations struct { + msg wrp.Message + expectErr error +} + +var messages = []msgWithExpectations{ + { + msg: wrp.Message{ + Type: wrp.SimpleEventMessageType, + Source: "self:/service/ignored", + Destination: "event:event_1/ignored", + }, + }, { + msg: wrp.Message{ + Type: wrp.SimpleEventMessageType, + Source: "self:/service/ignored", + Destination: "event:event_2/ignored", + }, + }, { + msg: wrp.Message{ + Type: wrp.SimpleRequestResponseMessageType, + Source: "dns:tr1d1um.example.com/service/ignored", + Destination: "mac:112233445566/service/ignored", + }, + }, { + msg: wrp.Message{ + Type: wrp.SimpleRequestResponseMessageType, + Source: "mac:112233445566/service/ignored", + Destination: "dns:tr1d1um.example.com/service/ignored", + }, + }, { + // invalid message - no src + msg: wrp.Message{ + Type: wrp.SimpleRequestResponseMessageType, + Destination: "dns:tr1d1um.example.com/service/ignored", + }, + expectErr: wrp.ErrorInvalidLocator, + }, { + // invalid message - no dest + msg: wrp.Message{ + Type: wrp.SimpleRequestResponseMessageType, + Source: "mac:112233445566/service/ignored", + }, + expectErr: wrp.ErrorInvalidLocator, + }, { + // invalid message - invalid msg type (empty) + msg: wrp.Message{ + Source: "mac:112233445566/service/ignored", + Destination: "dns:tr1d1um.example.com/service/ignored", + }, + expectErr: wrp.ErrInvalidMessageType, + }, { + // invalid message - a string field is not valid UTF-8 + msg: wrp.Message{ + Type: wrp.SimpleRequestResponseMessageType, + Source: "self:/service/ignored", + Destination: "dns:tr1d1um.example.com/service/ignored", + PartnerIDs: []string{string([]byte{0xbf})}, + ContentType: string([]byte{0xbf}), + }, + expectErr: wrp.ErrNotUTF8, + }, +} + +type mockHandler struct { + wg *sync.WaitGroup + name string + calls int + dests []wrp.Locator + expect []wrp.Locator +} + +func (h *mockHandler) WG(wg *sync.WaitGroup) { + h.wg = wg + h.wg.Add(len(h.expect)) + //fmt.Printf("%s adding %d to the wait group\n", h.name, len(h.expect)) +} + +func (h *mockHandler) HandleWrp(msg wrp.Message) { + h.calls++ + dest, _ := wrp.ParseLocator(msg.Destination) + h.dests = append(h.dests, dest) + h.wg.Done() + //fmt.Printf("%s done\n", h.name) +} + +func (h mockHandler) assert(a *assert.Assertions) { + if !a.Equal(len(h.expect), h.calls, "handler %s calls mismatch", h.name) { + return + } + + for _, expected := range h.expect { + var found bool + for j, d := range h.dests { + if expected.Scheme == d.Scheme && + expected.Authority == d.Authority && + expected.Service == d.Service && + expected.Ignored == d.Ignored { + found = true + h.dests[j].Scheme = "" + break + } + } + if !found { + a.Fail("dest not found", "handler: %s expected: %s", h.name, expected.String()) + } + } +} + +func TestEndToEnd(t *testing.T) { + id := wrp.DeviceID("mac:112233445566") + + var wg sync.WaitGroup + + allEventListener := &mockHandler{ + name: "allEventListener", + expect: []wrp.Locator{ + {Scheme: "event", Authority: "event_1", Ignored: "/ignored"}, + {Scheme: "event", Authority: "event_2", Ignored: "/ignored"}, + }, + } + allEventListener.WG(&wg) + + singleEventListener := &mockHandler{ + name: "singleEventListener", + expect: []wrp.Locator{ + {Scheme: "event", Authority: "event_2", Ignored: "/ignored"}, + }, + } + singleEventListener.WG(&wg) + + singleServiceListener := &mockHandler{ + name: "singleServiceListener", + expect: []wrp.Locator{ + {Scheme: "mac", Authority: "112233445566", Service: "service", Ignored: "/ignored"}, + }, + } + singleServiceListener.WG(&wg) + + egressListener := &mockHandler{ + name: "egressListener", + expect: []wrp.Locator{ + {Scheme: "event", Authority: "event_1", Ignored: "/ignored"}, + {Scheme: "event", Authority: "event_2", Ignored: "/ignored"}, + {Scheme: "dns", Authority: "tr1d1um.example.com", Service: "service", Ignored: "/ignored"}, + }, + } + egressListener.WG(&wg) + + assert := assert.New(t) + require := require.New(t) + + transIdValidator := pubsub.HandlerFunc( + func(msg wrp.Message) { + if msg.TransactionUUID == "" { + assert.Fail("transaction UUID is empty") + } + }) + + noTransIdValidator := pubsub.HandlerFunc( + func(msg wrp.Message) { + if msg.TransactionUUID != "" { + assert.Fail("transaction UUID is not empty") + } + }) + + var allCancel, singleCancel, serviceCancel, egressCancel pubsub.CancelFunc + ps, err := pubsub.New(id, + pubsub.WithEgressHandler(egressListener, &egressCancel), + pubsub.WithEventHandler("*", allEventListener, &allCancel), + pubsub.WithEventHandler("event_2", singleEventListener, &singleCancel), + pubsub.WithServiceHandler("service", singleServiceListener, &serviceCancel), + + pubsub.WithEgressHandler(transIdValidator), + pubsub.WithServiceHandler("*", noTransIdValidator), + pubsub.Normify( + wrp.ValidateMessageType(), + wrp.EnsureTransactionUUID(), + wrp.ValidateOnlyUTF8Strings(), + ), + ) + + require.NoError(err) + require.NotNil(ps) + require.NotNil(allCancel) + require.NotNil(singleCancel) + require.NotNil(serviceCancel) + require.NotNil(egressCancel) + + for _, m := range messages { + msg := m.msg + err := ps.Publish(&msg) + + if m.expectErr != nil { + assert.ErrorIs(err, m.expectErr) + } else { + assert.NoError(err) + } + } + + wg.Wait() + + allEventListener.assert(assert) + singleEventListener.assert(assert) + singleServiceListener.assert(assert) + egressListener.assert(assert) +} diff --git a/internal/pubsub/options.go b/internal/pubsub/options.go new file mode 100644 index 0000000..8bb07f7 --- /dev/null +++ b/internal/pubsub/options.go @@ -0,0 +1,79 @@ +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 + +package pubsub + +import ( + "github.com/xmidt-org/wrp-go/v3" +) + +type optionFunc func(*PubSub) error + +var _ Option = optionFunc(nil) + +func (f optionFunc) apply(ps *PubSub) error { + return f(ps) +} + +// Normify is an option that sets the desired normalization options used to +// normalize/validate the wrp message. +// +// As an example, if you want all messages to contain metadata about something +// like the network interface used, this is the place to define it. +func Normify(opts ...wrp.NormifierOption) Option { + return optionFunc(func(ps *PubSub) error { + ps.desiredOpts = append(ps.desiredOpts, opts...) + return nil + }) +} + +// WithEgressHandler is an option that adds a handler for egress messages. +// If the optional cancel parameter is provided, it will be set to a function +// that can be used to cancel the subscription. +func WithEgressHandler(handler Handler, cancel ...*CancelFunc) Option { + return optionFunc(func(ps *PubSub) error { + c, err := ps.SubscribeEgress(handler) + if err != nil { + return err + } + if len(cancel) > 0 && cancel[0] != nil { + *cancel[0] = c + } + + return nil + }) +} + +// WithServiceHandler is an option that adds a handler for service messages. +// If the optional cancel parameter is provided, it will be set to a function +// that can be used to cancel the subscription. +func WithServiceHandler(service string, handler Handler, cancel ...*CancelFunc) Option { + return optionFunc(func(ps *PubSub) error { + c, err := ps.SubscribeService(service, handler) + if err != nil { + return err + } + if len(cancel) > 0 && cancel[0] != nil { + *cancel[0] = c + } + + return nil + }) +} + +// WithEventHandler is an option that adds a handler for event messages. +// If the optional cancel parameter is provided, it will be set to a function +// that can be used to cancel the subscription. +func WithEventHandler(event string, handler Handler, cancel ...*CancelFunc) Option { + return optionFunc(func(ps *PubSub) error { + c, err := ps.SubscribeEvent(event, handler) + if err != nil { + return err + } + if len(cancel) > 0 && cancel[0] != nil { + *cancel[0] = c + } + + return nil + }) +} diff --git a/internal/pubsub/pubsub.go b/internal/pubsub/pubsub.go new file mode 100644 index 0000000..59fc141 --- /dev/null +++ b/internal/pubsub/pubsub.go @@ -0,0 +1,228 @@ +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 + +package pubsub + +import ( + "fmt" + "strings" + "sync" + + "github.com/xmidt-org/eventor" + "github.com/xmidt-org/wrp-go/v3" +) + +var ( + ErrInvalidInput = fmt.Errorf("invalid input") +) + +// CancelFunc removes the associated listener with and cancels any future events +// sent to that listener. +// +// A CancelFunc is idempotent: after the first invocation, calling this closure +// will have no effect. +type CancelFunc func() + +// Handler is a function that is called whenever a message is received that +// matches the service associated with the handler. +// listening handler. +type Handler interface { + // HandleWrp is called whenever a message is received that matches the + // service associated with the handler. + HandleWrp(wrp.Message) +} + +// HandlerFunc is an adapter to allow the use of ordinary functions as handlers. +type HandlerFunc func(wrp.Message) + +func (f HandlerFunc) HandleWrp(msg wrp.Message) { + f(msg) +} + +var _ Handler = HandlerFunc(nil) + +// PubSub is a struct representing a publish-subscribe system focusing on wrp +// messages. +type PubSub struct { + lock sync.RWMutex + self wrp.DeviceID + required *wrp.Normifier + desiredOpts []wrp.NormifierOption + desired *wrp.Normifier + routes map[string]*eventor.Eventor[Handler] +} + +// Option is the interface implemented by types that can be used to +// configure the credentials. +type Option interface { + apply(*PubSub) error +} + +// New creates a new instance of the PubSub struct. The self parameter is the +// device id of the device that is creating the PubSub instance. During +// publishing, messages will be sent to the appropriate listeners based on the +// service in the message and the device id of the PubSub instance. +func New(self wrp.DeviceID, opts ...Option) (*PubSub, error) { + if self == "" { + return nil, fmt.Errorf("%w: self may not be empty", ErrInvalidInput) + } + + ps := PubSub{ + routes: make(map[string]*eventor.Eventor[Handler]), + self: self, + required: wrp.NewNormifier( + // Only the absolutely required normalizers are included here. + wrp.ValidateDestination(), + wrp.ValidateSource(), + wrp.ReplaceAnySelfLocator(string(self)), + ), + } + + for _, opt := range opts { + if opt != nil { + if err := opt.apply(&ps); err != nil { + return nil, err + } + } + } + + ps.desired = wrp.NewNormifier(ps.desiredOpts...) + + return &ps, nil +} + +// SubscribeEgress subscribes to the egress route. The listener will be called +// when a message targets something other than this device. The returned +// CancelFunc may be called to remove the listener and cancel any future events +// sent to that listener. +func (ps *PubSub) SubscribeEgress(h Handler) (CancelFunc, error) { + return ps.subscribe(egressRoute(), h) +} + +// SubscribeService subscribes to the specified service. The listener will be +// called when a message matches the service. A service value of '*' may be +// used to match any service. The returned CancelFunc may be called to remove +// the listener and cancel any future events sent to that listener. +func (ps *PubSub) SubscribeService(service string, h Handler) (CancelFunc, error) { + if err := validateString(service, "service"); err != nil { + return nil, err + } + + return ps.subscribe(serviceRoute(service), h) +} + +// SubscribeEvent subscribes to the specified event. The listener will be called +// when a message matches the event. An event value of '*' may be used to match +// any event. The returned CancelFunc may be called to remove the listener and +// cancel any future events sent to that listener. +func (ps *PubSub) SubscribeEvent(event string, h Handler) (CancelFunc, error) { + if err := validateString(event, "event"); err != nil { + return nil, err + } + + return ps.subscribe(eventRoute(event), h) +} + +func validateString(s, typ string) error { + if s == "" { + return fmt.Errorf("%w: %s may not be empty", ErrInvalidInput, typ) + } + + disallowed := "/" + if strings.ContainsAny(s, disallowed) { + return fmt.Errorf("%w: %s may not contain any of the following: '%s'", ErrInvalidInput, typ, disallowed) + } + + return nil +} + +func (ps *PubSub) subscribe(route string, h Handler) (CancelFunc, error) { + if h == nil { + return nil, fmt.Errorf("%w: handler may not be nil", ErrInvalidInput) + } + + ps.lock.Lock() + defer ps.lock.Unlock() + + if _, found := ps.routes[route]; !found { + ps.routes[route] = new(eventor.Eventor[Handler]) + } + + return CancelFunc(ps.routes[route].Add(h)), nil +} + +// Publish publishes a wrp message to the appropriate listeners. +func (ps *PubSub) Publish(msg *wrp.Message) error { + normalized, dest, err := ps.normalize(msg) + if err != nil { + return err + } + + // Unless the destination is this device, the message will be sent to the + // egress route. If the destination is this device, the message will be sent + // to the service route. + routes := []string{egressRoute()} + switch { + case dest.ID == ps.self: + routes = []string{ + serviceRoute(dest.Service), + serviceRoute("*"), + } + case dest.Scheme == wrp.SchemeEvent: + routes = []string{ + eventRoute(dest.Authority), + eventRoute("*"), + egressRoute(), + } + } + + ps.lock.RLock() + defer ps.lock.RUnlock() + + for _, route := range routes { + if _, found := ps.routes[route]; found { + ps.routes[route].Visit(func(h Handler) { + // By making this a go routine, we can avoid deadlocks if the handler + // tries to subscribe to the same service. It also avoids blocking the + // caller if the handler takes a long time to process the message. + if h != nil { + go h.HandleWrp(*normalized) + } + }) + } + } + + return nil +} + +func (ps *PubSub) normalize(msg *wrp.Message) (*wrp.Message, wrp.Locator, error) { + if err := ps.required.Normify(msg); err != nil { + return nil, wrp.Locator{}, err + } + + // These have already been validated by the required normifier. + dst, _ := wrp.ParseLocator(msg.Destination) + src, _ := wrp.ParseLocator(msg.Source) + + if src.ID == ps.self { + // Apply the additional normalization for messages that originated from this + // device. + if err := ps.desired.Normify(msg); err != nil { + return nil, wrp.Locator{}, err + } + } + + return msg, dst, nil +} + +func serviceRoute(service string) string { + return "service:" + service +} + +func egressRoute() string { + return "egress:*" +} + +func eventRoute(event string) string { + return "event:" + event +} diff --git a/internal/pubsub/pubsub_test.go b/internal/pubsub/pubsub_test.go new file mode 100644 index 0000000..626807e --- /dev/null +++ b/internal/pubsub/pubsub_test.go @@ -0,0 +1,143 @@ +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 + +package pubsub + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/xmidt-org/wrp-go/v3" +) + +func TestNew(t *testing.T) { + fn := HandlerFunc(func(wrp.Message) {}) + + tests := []struct { + description string + self string + opt Option + opts []Option + expectedErr error + validate func(*assert.Assertions, *PubSub) + }{ + { + description: "Happy Path", + self: "mac:112233445566", + validate: func(a *assert.Assertions, ps *PubSub) { + a.Equal(wrp.DeviceID("mac:112233445566"), ps.self) + }, + }, { + description: "Confirm egress", + self: "mac:112233445566", + opt: WithEgressHandler(fn), + validate: func(a *assert.Assertions, ps *PubSub) { + a.NotNil(ps.routes["egress:*"]) + }, + }, { + description: "Confirm local services", + self: "mac:112233445566", + opts: []Option{ + WithServiceHandler("config", fn), + WithServiceHandler("*", fn), + }, + validate: func(a *assert.Assertions, ps *PubSub) { + a.NotNil(ps.routes["service:*"]) + a.NotNil(ps.routes["service:config"]) + }, + }, { + description: "Confirm event service", + self: "mac:112233445566", + opts: []Option{ + WithEventHandler("*", fn), + WithEventHandler("device-status", fn), + }, + validate: func(a *assert.Assertions, ps *PubSub) { + a.NotNil(ps.routes["event:*"]) + a.NotNil(ps.routes["event:device-status"]) + }, + }, { + description: "Register multiple handlers", + self: "mac:112233445566", + opts: []Option{ + WithEgressHandler(fn), + WithEventHandler("*", fn), + WithEventHandler("device-status", fn), + WithServiceHandler("config", fn), + WithServiceHandler("*", fn), + }, + validate: func(a *assert.Assertions, ps *PubSub) { + a.NotNil(ps.routes["egress:*"]) + a.NotNil(ps.routes["event:*"]) + a.NotNil(ps.routes["event:device-status"]) + a.NotNil(ps.routes["service:*"]) + a.NotNil(ps.routes["service:config"]) + }, + }, { + description: "Confirm normify options", + self: "mac:112233445566", + opts: []Option{ + Normify(wrp.EnsureTransactionUUID()), + }, + validate: func(a *assert.Assertions, ps *PubSub) { + a.NotNil(ps.desiredOpts) + a.Equal(1, len(ps.desiredOpts)) + }, + }, + + // Error Cases + { + description: "Empty Self", + expectedErr: ErrInvalidInput, + }, { + description: "Empty Egress Handler", + self: "mac:112233445566", + opts: []Option{WithEgressHandler(nil)}, + expectedErr: ErrInvalidInput, + }, { + description: "Invalid Service Name", + self: "mac:112233445566", + opts: []Option{WithServiceHandler("foo/bar", fn)}, + expectedErr: ErrInvalidInput, + }, { + description: "Empty Service Name", + self: "mac:112233445566", + opts: []Option{WithServiceHandler("", fn)}, + expectedErr: ErrInvalidInput, + }, { + description: "Invalid Event Name", + self: "mac:112233445566", + opts: []Option{WithEventHandler("foo/bar", fn)}, + expectedErr: ErrInvalidInput, + }, { + description: "Empty Event Name", + self: "mac:112233445566", + opts: []Option{WithEventHandler("", fn)}, + expectedErr: ErrInvalidInput, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + opts := append(tc.opts, tc.opt) + id := wrp.DeviceID(tc.self) + + got, err := New(id, opts...) + + if tc.expectedErr != nil { + assert.ErrorIs(err, tc.expectedErr) + assert.Nil(got) + return + } + + assert.NoError(err) + require.NotNil(got) + if tc.validate != nil { + tc.validate(assert, got) + } + }) + } +} From a8c7451dcc08acd1d8b3132140af521a2d272635 Mon Sep 17 00:00:00 2001 From: schmidtw Date: Mon, 18 Mar 2024 00:40:18 -0700 Subject: [PATCH 2/2] Fix the race condition in the test code. --- internal/pubsub/end2end_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/pubsub/end2end_test.go b/internal/pubsub/end2end_test.go index 087e990..c4d747b 100644 --- a/internal/pubsub/end2end_test.go +++ b/internal/pubsub/end2end_test.go @@ -78,6 +78,7 @@ var messages = []msgWithExpectations{ } type mockHandler struct { + lock sync.Mutex wg *sync.WaitGroup name string calls int @@ -92,6 +93,9 @@ func (h *mockHandler) WG(wg *sync.WaitGroup) { } func (h *mockHandler) HandleWrp(msg wrp.Message) { + h.lock.Lock() + defer h.lock.Unlock() + h.calls++ dest, _ := wrp.ParseLocator(msg.Destination) h.dests = append(h.dests, dest) @@ -99,7 +103,10 @@ func (h *mockHandler) HandleWrp(msg wrp.Message) { //fmt.Printf("%s done\n", h.name) } -func (h mockHandler) assert(a *assert.Assertions) { +func (h *mockHandler) assert(a *assert.Assertions) { + h.lock.Lock() + defer h.lock.Unlock() + if !a.Equal(len(h.expect), h.calls, "handler %s calls mismatch", h.name) { return }