From bc6cf1595553d1418f8519215e5602b86c420ddd Mon Sep 17 00:00:00 2001 From: Jonas Bleyl Date: Thu, 22 Dec 2022 11:37:43 +0000 Subject: [PATCH 1/2] fix: request count race condition --- internal/app/pactproxy/interaction.go | 3 ++- internal/app/pactproxy/modifier.go | 6 ++---- internal/app/pactproxy/proxy.go | 15 +++++++++++---- .../pactproxy/response_modification_writer.go | 17 +++++++++-------- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/internal/app/pactproxy/interaction.go b/internal/app/pactproxy/interaction.go index dc39e5e..d79721e 100644 --- a/internal/app/pactproxy/interaction.go +++ b/internal/app/pactproxy/interaction.go @@ -365,7 +365,7 @@ func (i *Interaction) EvaluateConstrains(request requestDocument, interactions * return result, violations } -func (i *Interaction) StoreRequest(request requestDocument) { +func (i *Interaction) StoreRequest(request requestDocument) int { i.mu.Lock() defer i.mu.Unlock() i.LastRequest = request @@ -374,6 +374,7 @@ func (i *Interaction) StoreRequest(request requestDocument) { if i.recordHistory { i.RequestHistory = append(i.RequestHistory, request) } + return i.RequestCount } func (i *Interaction) HasRequests(count int) bool { diff --git a/internal/app/pactproxy/modifier.go b/internal/app/pactproxy/modifier.go index b1bd910..2bd194d 100644 --- a/internal/app/pactproxy/modifier.go +++ b/internal/app/pactproxy/modifier.go @@ -40,9 +40,8 @@ func (ims *interactionModifiers) Modifiers() []*interactionModifier { return result } -func (ims *interactionModifiers) modifyBody(b []byte) ([]byte, error) { +func (ims *interactionModifiers) modifyBody(b []byte, requestCount int) ([]byte, error) { for _, m := range ims.Modifiers() { - requestCount := ims.interaction.getRequestCount() if strings.HasPrefix(m.Path, "$.body.") { if m.Attempt == nil || *m.Attempt == requestCount { var err error @@ -56,9 +55,8 @@ func (ims *interactionModifiers) modifyBody(b []byte) ([]byte, error) { return b, nil } -func (ims *interactionModifiers) modifyStatusCode() (bool, int) { +func (ims *interactionModifiers) modifyStatusCode(requestCount int) (bool, int) { for _, m := range ims.Modifiers() { - requestCount := ims.interaction.getRequestCount() if m.Path == "$.status" { if m.Attempt == nil || *m.Attempt == requestCount { code, err := strconv.Atoi(fmt.Sprintf("%v", m.Value)) diff --git a/internal/app/pactproxy/proxy.go b/internal/app/pactproxy/proxy.go index 8114f54..1a7a399 100644 --- a/internal/app/pactproxy/proxy.go +++ b/internal/app/pactproxy/proxy.go @@ -46,6 +46,11 @@ type api struct { echo.Context } +type matchedInteraction struct { + interaction *Interaction + requestCount int +} + func (a *api) ProxyRequest(c echo.Context) error { a.proxy.ServeHTTP(c.Response(), c.Request()) return nil @@ -286,12 +291,14 @@ func (a *api) indexHandler(c echo.Context) error { request["headers"] = h unmatched := make(map[string][]string) - matched := make([]*Interaction, 0) + matched := make([]matchedInteraction, 0) for _, interaction := range allInteractions { ok, info := interaction.EvaluateConstrains(request, a.interactions) if ok { - interaction.StoreRequest(request) - matched = append(matched, interaction) + matched = append(matched, matchedInteraction{ + interaction: interaction, + requestCount: interaction.StoreRequest(request), + }) } else { unmatched[interaction.Description] = info } @@ -306,7 +313,7 @@ func (a *api) indexHandler(c echo.Context) error { } a.notify.Notify() - a.proxy.ServeHTTP(&ResponseModificationWriter{res: c.Response(), interactions: matched}, req) + a.proxy.ServeHTTP(&ResponseModificationWriter{res: c.Response(), matchedInteractions: matched}, req) return nil } diff --git a/internal/app/pactproxy/response_modification_writer.go b/internal/app/pactproxy/response_modification_writer.go index e8aeb9a..ea930c5 100644 --- a/internal/app/pactproxy/response_modification_writer.go +++ b/internal/app/pactproxy/response_modification_writer.go @@ -7,10 +7,11 @@ import ( ) type ResponseModificationWriter struct { - res http.ResponseWriter - interactions []*Interaction - originalResponse []byte - statusCode int + request requestDocument + res http.ResponseWriter + matchedInteractions []matchedInteraction + originalResponse []byte + statusCode int } func (m *ResponseModificationWriter) Header() http.Header { @@ -29,8 +30,8 @@ func (m *ResponseModificationWriter) Write(b []byte) (int, error) { } var modifiedBody []byte - for _, i := range m.interactions { - modifiedBody, err = i.modifiers.modifyBody(m.originalResponse) + for _, match := range m.matchedInteractions { + modifiedBody, err = match.interaction.modifiers.modifyBody(m.originalResponse, match.requestCount) if err != nil { return 0, err } @@ -51,8 +52,8 @@ func (m *ResponseModificationWriter) Write(b []byte) (int, error) { func (m *ResponseModificationWriter) WriteHeader(statusCode int) { m.statusCode = statusCode - for _, i := range m.interactions { - ok, code := i.modifiers.modifyStatusCode() + for _, match := range m.matchedInteractions { + ok, code := match.interaction.modifiers.modifyStatusCode(match.requestCount) if ok { m.statusCode = code break From 242adae56c8b98a047e5ad8696b0a2673dc2f2e2 Mon Sep 17 00:00:00 2001 From: Jonas Bleyl Date: Fri, 23 Dec 2022 13:13:55 +0000 Subject: [PATCH 2/2] test: concurrent requests with modifier attempt --- internal/app/concurrent_proxy_stage_test.go | 25 +++++++++++++++++---- internal/app/concurrent_proxy_test.go | 16 +++++++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/internal/app/concurrent_proxy_stage_test.go b/internal/app/concurrent_proxy_stage_test.go index 7de669d..ae0e2dc 100644 --- a/internal/app/concurrent_proxy_stage_test.go +++ b/internal/app/concurrent_proxy_stage_test.go @@ -25,6 +25,7 @@ type ConcurrentProxyStage struct { proxy *pactproxy.PactProxy pact *dsl.Pact modifiedNameStatusCode int + modifiedNameAttempt *int modifiedAddressStatusCode int concurrentUserRequestsPerSecond int concurrentUserRequestsDuration time.Duration @@ -64,6 +65,10 @@ func (s *ConcurrentProxyStage) a_modified_name_status_code() *ConcurrentProxySta return s } +func (s *ConcurrentProxyStage) a_modified_name_response_attempt_of(i int) { + s.modifiedNameAttempt = &i +} + func (s *ConcurrentProxyStage) a_modified_address_status_code() *ConcurrentProxyStage { s.modifiedAddressStatusCode = http.StatusConflict return s @@ -122,7 +127,7 @@ func (s *ConcurrentProxyStage) x_concurrent_address_requests_per_second_are_made func (s *ConcurrentProxyStage) the_concurrent_requests_are_sent() { err := s.pact.Verify(func() (err error) { if s.modifiedNameStatusCode != 0 { - s.proxy.ForInteraction(postNamePactWithAnyName).AddModifier("$.status", fmt.Sprintf("%d", s.modifiedNameStatusCode), nil) + s.proxy.ForInteraction(postNamePactWithAnyName).AddModifier("$.status", fmt.Sprintf("%d", s.modifiedNameStatusCode), s.modifiedNameAttempt) } if s.modifiedAddressStatusCode != 0 { s.proxy.ForInteraction(postAddressPact).AddModifier("$.status", fmt.Sprintf("%d", s.modifiedAddressStatusCode), nil) @@ -184,9 +189,17 @@ func sendConcurrentRequests(requests int, d time.Duration, f func()) { case <-stop: return case <-ticker.C: + var wg sync.WaitGroup + for i := 0; i < requests; i++ { - f() + wg.Add(1) + + go func() { + defer wg.Done() + f() + }() } + wg.Wait() } } }() @@ -199,8 +212,12 @@ func (s *ConcurrentProxyStage) all_the_user_responses_should_have_the_right_stat expectedLen := s.concurrentUserRequestsPerSecond * int(s.concurrentUserRequestsDuration/time.Second) s.assert.Len(s.userResponses, expectedLen, "number of user responses is not as expected") - for _, res := range s.userResponses { - s.assert.Equal(res.StatusCode, s.modifiedNameStatusCode, "expected user status code") + for i, res := range s.userResponses { + if s.modifiedNameAttempt == nil || *s.modifiedNameAttempt == i+1 { + s.assert.Equal(res.StatusCode, s.modifiedNameStatusCode, "expected user status code") + } else { + s.assert.Equal(res.StatusCode, 200, "expected user status code") + } } return s diff --git a/internal/app/concurrent_proxy_test.go b/internal/app/concurrent_proxy_test.go index 7cdccb3..855487d 100644 --- a/internal/app/concurrent_proxy_test.go +++ b/internal/app/concurrent_proxy_test.go @@ -24,6 +24,22 @@ func TestConcurrentRequestsForDifferentModifiersHaveTheCorrectResponses(t *testi all_the_address_responses_should_have_the_right_status_code() } +func TestConcurrentRequestsWithAnAttemptModifierHaveTheCorrectResponses(t *testing.T) { + given, when, then := NewConcurrentProxyStage(t) + + given. + a_pact_that_allows_any_names().and(). + a_modified_name_status_code().and(). + a_modified_name_response_attempt_of(1) + + when. + x_concurrent_user_requests_per_second_are_made_for_y_seconds(2, 1*time.Second).and(). + the_concurrent_requests_are_sent() + + then. + all_the_user_responses_should_have_the_right_status_code() +} + func TestConcurrentRequestsWaitForAllPacts(t *testing.T) { given, when, then := NewConcurrentProxyStage(t)