From 83a43133488ee6a696b5acdeb8d682be5acd5b77 Mon Sep 17 00:00:00 2001 From: Shreya Garge <125353803+shreyagarge-f3@users.noreply.github.com> Date: Wed, 15 Nov 2023 18:28:43 +0000 Subject: [PATCH] Fix: request counter race for concurrent requests matching the same interaction (#46) For each interaction, we keep track of the number of times it has been called. That counter can be used in the modifiers to change the response returned to the client. For example it is possible to change the return status on the 2nd invocation of a specific interaction. The code has a bug when concurrent requests happen for the same interaction. The change modifies the way the concurrency is managed just for the request counter. As part of this change, we have added a new test that can be used to prove that the original code has the issue and the proposed version fixes it. Co-authored-by: Andrea Rosa Co-authored-by: Shreya Garge --- internal/app/concurrent_proxy_stage_test.go | 48 +++++++++++++++++++ internal/app/concurrent_proxy_test.go | 10 ++++ internal/app/pactproxy/interaction.go | 3 +- internal/app/pactproxy/modifier.go | 6 +-- internal/app/pactproxy/proxy.go | 15 ++++-- .../pactproxy/response_modification_writer.go | 16 +++---- 6 files changed, 81 insertions(+), 17 deletions(-) diff --git a/internal/app/concurrent_proxy_stage_test.go b/internal/app/concurrent_proxy_stage_test.go index 331e77e..f8e9366 100644 --- a/internal/app/concurrent_proxy_stage_test.go +++ b/internal/app/concurrent_proxy_stage_test.go @@ -3,6 +3,7 @@ package app import ( "context" "fmt" + "io" "net/http" "strings" "sync" @@ -152,6 +153,34 @@ func (s *ConcurrentProxyStage) the_concurrent_requests_are_sent() { s.assert.NoError(err) } +func (s *ConcurrentProxyStage) the_concurrent_requests_are_sent_with_attempt_based_modifier() { + err := s.pact.Verify(func() (err error) { + attempt := 2 + s.proxy.ForInteraction(postNamePactWithAnyName).AddModifier("$.status", fmt.Sprintf("%d", http.StatusConflict), &attempt) + s.proxy.ForInteraction(postNamePactWithAnyName).AddModifier("$.body.name", "Form3", &attempt) + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + s.makeUserRequest() + }() + + wg.Add(1) + go func() { + defer wg.Done() + s.makeUserRequest() + }() + + wg.Wait() + + return nil + }) + + s.assert.NoError(err) +} + func (s *ConcurrentProxyStage) makeUserRequest() { u := fmt.Sprintf("http://localhost:%s/users", proxyURL.Port()) req, err := http.NewRequest("POST", u, strings.NewReader(`{"name":"jim"}`)) @@ -208,6 +237,25 @@ func (s *ConcurrentProxyStage) all_the_user_responses_should_have_the_right_stat return s } +func (s *ConcurrentProxyStage) the_second_user_response_should_have_the_right_status_code_and_body() *ConcurrentProxyStage { + statuses := make(map[int]int) + bodies := make(map[string]int) + for _, res := range s.userResponses { + statuses[res.StatusCode] += 1 + bd, err := io.ReadAll(res.Body) + s.assert.NoError(err) + res.Body.Close() + bodies[strings.ReplaceAll(strings.TrimSpace(string(bd)), "\"", "")] += 1 + } + s.assert.Len(statuses, 2) + s.assert.Len(bodies, 2) + s.assert.Equal(1, statuses[http.StatusConflict]) + s.assert.Equal(1, statuses[http.StatusOK]) + s.assert.Equal(1, bodies["{name:any}"]) + s.assert.Equal(1, bodies["{name:Form3}"]) + return s +} + func (s *ConcurrentProxyStage) all_the_address_responses_should_have_the_right_status_code() *ConcurrentProxyStage { expectedLen := s.concurrentAddressRequestsPerSecond * int(s.concurrentAddressRequestsDuration/time.Second) s.assert.Len(s.addressResponses, expectedLen, "number of address responses is not as expected") diff --git a/internal/app/concurrent_proxy_test.go b/internal/app/concurrent_proxy_test.go index 7cdccb3..e37a245 100644 --- a/internal/app/concurrent_proxy_test.go +++ b/internal/app/concurrent_proxy_test.go @@ -24,6 +24,16 @@ func TestConcurrentRequestsForDifferentModifiersHaveTheCorrectResponses(t *testi all_the_address_responses_should_have_the_right_status_code() } +func TestConcurrentRequestsForSameModifierBasedOnAttempt(t *testing.T) { + given, when, then := NewConcurrentProxyStage(t) + given. + a_pact_that_allows_any_names() + when. + the_concurrent_requests_are_sent_with_attempt_based_modifier() + then. + the_second_user_response_should_have_the_right_status_code_and_body() +} + func TestConcurrentRequestsWaitForAllPacts(t *testing.T) { given, when, then := NewConcurrentProxyStage(t) diff --git a/internal/app/pactproxy/interaction.go b/internal/app/pactproxy/interaction.go index d5b678b..8c5bd29 100644 --- a/internal/app/pactproxy/interaction.go +++ b/internal/app/pactproxy/interaction.go @@ -375,7 +375,7 @@ func (i *Interaction) EvaluateConstraints(request requestDocument, interactions return result, violations } -func (i *Interaction) StoreRequest(request requestDocument) { +func (i *Interaction) StoreRequest(request requestDocument) int { i.mu.Lock() defer i.mu.Unlock() i.LastRequest = request @@ -384,6 +384,7 @@ func (i *Interaction) StoreRequest(request requestDocument) { if i.recordHistory { i.RequestHistory = append(i.RequestHistory, request) } + return i.RequestCount } func (i *Interaction) HasRequests(count int) bool { diff --git a/internal/app/pactproxy/modifier.go b/internal/app/pactproxy/modifier.go index 6cbd4dc..b1d2747 100644 --- a/internal/app/pactproxy/modifier.go +++ b/internal/app/pactproxy/modifier.go @@ -41,9 +41,8 @@ func (ims *interactionModifiers) Modifiers() []*interactionModifier { return result } -func (ims *interactionModifiers) modifyBody(b []byte) ([]byte, error) { +func (ims *interactionModifiers) modifyBody(b []byte, requestCount int) ([]byte, error) { for _, m := range ims.Modifiers() { - requestCount := ims.interaction.getRequestCount() if m.Path == "$.bytes.body" { if v, ok := m.Value.(string); ok && m.Attempt == nil || *m.Attempt == requestCount { var err error @@ -66,9 +65,8 @@ func (ims *interactionModifiers) modifyBody(b []byte) ([]byte, error) { return b, nil } -func (ims *interactionModifiers) modifyStatusCode() (bool, int) { +func (ims *interactionModifiers) modifyStatusCode(requestCount int) (bool, int) { for _, m := range ims.Modifiers() { - requestCount := ims.interaction.getRequestCount() if m.Path == "$.status" { if m.Attempt == nil || *m.Attempt == requestCount { code, err := strconv.Atoi(fmt.Sprintf("%v", m.Value)) diff --git a/internal/app/pactproxy/proxy.go b/internal/app/pactproxy/proxy.go index 129b0c0..6ac226a 100644 --- a/internal/app/pactproxy/proxy.go +++ b/internal/app/pactproxy/proxy.go @@ -251,6 +251,11 @@ func (a *api) interactionsWaitHandler(c echo.Context) error { return c.NoContent(http.StatusOK) } +type matchedInteraction struct { + interaction *Interaction + attemptCount int +} + func (a *api) indexHandler(c echo.Context) error { req := c.Request() log.Infof("proxying %s %s %+v", req.Method, req.URL.Path, req.Header) @@ -299,12 +304,14 @@ func (a *api) indexHandler(c echo.Context) error { request["headers"] = h unmatched := make(map[string][]string) - matched := make([]*Interaction, 0) + matched := make([]matchedInteraction, 0) for _, interaction := range allInteractions { ok, info := interaction.EvaluateConstraints(request, a.interactions) if ok { - interaction.StoreRequest(request) - matched = append(matched, interaction) + matched = append(matched, matchedInteraction{ + interaction: interaction, + attemptCount: interaction.StoreRequest(request), + }) } else { unmatched[interaction.Description] = info } @@ -319,7 +326,7 @@ func (a *api) indexHandler(c echo.Context) error { } a.notify.Notify() - a.proxy.ServeHTTP(&ResponseModificationWriter{res: c.Response(), interactions: matched}, req) + a.proxy.ServeHTTP(&ResponseModificationWriter{res: c.Response(), matchedInteractions: matched}, req) return nil } diff --git a/internal/app/pactproxy/response_modification_writer.go b/internal/app/pactproxy/response_modification_writer.go index e8aeb9a..9b238c1 100644 --- a/internal/app/pactproxy/response_modification_writer.go +++ b/internal/app/pactproxy/response_modification_writer.go @@ -7,10 +7,10 @@ import ( ) type ResponseModificationWriter struct { - res http.ResponseWriter - interactions []*Interaction - originalResponse []byte - statusCode int + res http.ResponseWriter + matchedInteractions []matchedInteraction + originalResponse []byte + statusCode int } func (m *ResponseModificationWriter) Header() http.Header { @@ -29,8 +29,8 @@ func (m *ResponseModificationWriter) Write(b []byte) (int, error) { } var modifiedBody []byte - for _, i := range m.interactions { - modifiedBody, err = i.modifiers.modifyBody(m.originalResponse) + for _, i := range m.matchedInteractions { + modifiedBody, err = i.interaction.modifiers.modifyBody(m.originalResponse, i.attemptCount) if err != nil { return 0, err } @@ -51,8 +51,8 @@ func (m *ResponseModificationWriter) Write(b []byte) (int, error) { func (m *ResponseModificationWriter) WriteHeader(statusCode int) { m.statusCode = statusCode - for _, i := range m.interactions { - ok, code := i.modifiers.modifyStatusCode() + for _, i := range m.matchedInteractions { + ok, code := i.interaction.modifiers.modifyStatusCode(i.attemptCount) if ok { m.statusCode = code break