Skip to content

Commit

Permalink
Fix: request counter race for concurrent requests matching the same i…
Browse files Browse the repository at this point in the history
…nteraction (#46)


For each interaction, we keep track of the number of times it has been
called.
That counter can be used in the modifiers to change the response
returned to the client. For example it is possible to change the return
status on the 2nd invocation of a specific interaction.

The code has a bug when concurrent requests happen for the same
interaction.
The change modifies the way the concurrency is managed just for the
request counter.

As part of this change, we have added a new test that can be used to
prove that the original code has the issue and the proposed version
fixes it.


Co-authored-by: Andrea Rosa <[email protected]>
Co-authored-by: Shreya Garge <[email protected]>
  • Loading branch information
3 people authored Nov 15, 2023
1 parent 7388a82 commit 83a4313
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 17 deletions.
48 changes: 48 additions & 0 deletions internal/app/concurrent_proxy_stage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -152,6 +153,34 @@ func (s *ConcurrentProxyStage) the_concurrent_requests_are_sent() {
s.assert.NoError(err)
}

func (s *ConcurrentProxyStage) the_concurrent_requests_are_sent_with_attempt_based_modifier() {
err := s.pact.Verify(func() (err error) {
attempt := 2
s.proxy.ForInteraction(postNamePactWithAnyName).AddModifier("$.status", fmt.Sprintf("%d", http.StatusConflict), &attempt)
s.proxy.ForInteraction(postNamePactWithAnyName).AddModifier("$.body.name", "Form3", &attempt)

wg := sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
s.makeUserRequest()
}()

wg.Add(1)
go func() {
defer wg.Done()
s.makeUserRequest()
}()

wg.Wait()

return nil
})

s.assert.NoError(err)
}

func (s *ConcurrentProxyStage) makeUserRequest() {
u := fmt.Sprintf("http://localhost:%s/users", proxyURL.Port())
req, err := http.NewRequest("POST", u, strings.NewReader(`{"name":"jim"}`))
Expand Down Expand Up @@ -208,6 +237,25 @@ func (s *ConcurrentProxyStage) all_the_user_responses_should_have_the_right_stat
return s
}

func (s *ConcurrentProxyStage) the_second_user_response_should_have_the_right_status_code_and_body() *ConcurrentProxyStage {
statuses := make(map[int]int)
bodies := make(map[string]int)
for _, res := range s.userResponses {
statuses[res.StatusCode] += 1
bd, err := io.ReadAll(res.Body)
s.assert.NoError(err)
res.Body.Close()
bodies[strings.ReplaceAll(strings.TrimSpace(string(bd)), "\"", "")] += 1
}
s.assert.Len(statuses, 2)
s.assert.Len(bodies, 2)
s.assert.Equal(1, statuses[http.StatusConflict])
s.assert.Equal(1, statuses[http.StatusOK])
s.assert.Equal(1, bodies["{name:any}"])
s.assert.Equal(1, bodies["{name:Form3}"])
return s
}

func (s *ConcurrentProxyStage) all_the_address_responses_should_have_the_right_status_code() *ConcurrentProxyStage {
expectedLen := s.concurrentAddressRequestsPerSecond * int(s.concurrentAddressRequestsDuration/time.Second)
s.assert.Len(s.addressResponses, expectedLen, "number of address responses is not as expected")
Expand Down
10 changes: 10 additions & 0 deletions internal/app/concurrent_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ func TestConcurrentRequestsForDifferentModifiersHaveTheCorrectResponses(t *testi
all_the_address_responses_should_have_the_right_status_code()
}

func TestConcurrentRequestsForSameModifierBasedOnAttempt(t *testing.T) {
given, when, then := NewConcurrentProxyStage(t)
given.
a_pact_that_allows_any_names()
when.
the_concurrent_requests_are_sent_with_attempt_based_modifier()
then.
the_second_user_response_should_have_the_right_status_code_and_body()
}

func TestConcurrentRequestsWaitForAllPacts(t *testing.T) {
given, when, then := NewConcurrentProxyStage(t)

Expand Down
3 changes: 2 additions & 1 deletion internal/app/pactproxy/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (i *Interaction) EvaluateConstraints(request requestDocument, interactions
return result, violations
}

func (i *Interaction) StoreRequest(request requestDocument) {
func (i *Interaction) StoreRequest(request requestDocument) int {
i.mu.Lock()
defer i.mu.Unlock()
i.LastRequest = request
Expand All @@ -384,6 +384,7 @@ func (i *Interaction) StoreRequest(request requestDocument) {
if i.recordHistory {
i.RequestHistory = append(i.RequestHistory, request)
}
return i.RequestCount
}

func (i *Interaction) HasRequests(count int) bool {
Expand Down
6 changes: 2 additions & 4 deletions internal/app/pactproxy/modifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ func (ims *interactionModifiers) Modifiers() []*interactionModifier {
return result
}

func (ims *interactionModifiers) modifyBody(b []byte) ([]byte, error) {
func (ims *interactionModifiers) modifyBody(b []byte, requestCount int) ([]byte, error) {
for _, m := range ims.Modifiers() {
requestCount := ims.interaction.getRequestCount()
if m.Path == "$.bytes.body" {
if v, ok := m.Value.(string); ok && m.Attempt == nil || *m.Attempt == requestCount {
var err error
Expand All @@ -66,9 +65,8 @@ func (ims *interactionModifiers) modifyBody(b []byte) ([]byte, error) {
return b, nil
}

func (ims *interactionModifiers) modifyStatusCode() (bool, int) {
func (ims *interactionModifiers) modifyStatusCode(requestCount int) (bool, int) {
for _, m := range ims.Modifiers() {
requestCount := ims.interaction.getRequestCount()
if m.Path == "$.status" {
if m.Attempt == nil || *m.Attempt == requestCount {
code, err := strconv.Atoi(fmt.Sprintf("%v", m.Value))
Expand Down
15 changes: 11 additions & 4 deletions internal/app/pactproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ func (a *api) interactionsWaitHandler(c echo.Context) error {
return c.NoContent(http.StatusOK)
}

type matchedInteraction struct {
interaction *Interaction
attemptCount int
}

func (a *api) indexHandler(c echo.Context) error {
req := c.Request()
log.Infof("proxying %s %s %+v", req.Method, req.URL.Path, req.Header)
Expand Down Expand Up @@ -299,12 +304,14 @@ func (a *api) indexHandler(c echo.Context) error {
request["headers"] = h

unmatched := make(map[string][]string)
matched := make([]*Interaction, 0)
matched := make([]matchedInteraction, 0)
for _, interaction := range allInteractions {
ok, info := interaction.EvaluateConstraints(request, a.interactions)
if ok {
interaction.StoreRequest(request)
matched = append(matched, interaction)
matched = append(matched, matchedInteraction{
interaction: interaction,
attemptCount: interaction.StoreRequest(request),
})
} else {
unmatched[interaction.Description] = info
}
Expand All @@ -319,7 +326,7 @@ func (a *api) indexHandler(c echo.Context) error {
}

a.notify.Notify()
a.proxy.ServeHTTP(&ResponseModificationWriter{res: c.Response(), interactions: matched}, req)
a.proxy.ServeHTTP(&ResponseModificationWriter{res: c.Response(), matchedInteractions: matched}, req)
return nil
}

Expand Down
16 changes: 8 additions & 8 deletions internal/app/pactproxy/response_modification_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
)

type ResponseModificationWriter struct {
res http.ResponseWriter
interactions []*Interaction
originalResponse []byte
statusCode int
res http.ResponseWriter
matchedInteractions []matchedInteraction
originalResponse []byte
statusCode int
}

func (m *ResponseModificationWriter) Header() http.Header {
Expand All @@ -29,8 +29,8 @@ func (m *ResponseModificationWriter) Write(b []byte) (int, error) {
}

var modifiedBody []byte
for _, i := range m.interactions {
modifiedBody, err = i.modifiers.modifyBody(m.originalResponse)
for _, i := range m.matchedInteractions {
modifiedBody, err = i.interaction.modifiers.modifyBody(m.originalResponse, i.attemptCount)
if err != nil {
return 0, err
}
Expand All @@ -51,8 +51,8 @@ func (m *ResponseModificationWriter) Write(b []byte) (int, error) {

func (m *ResponseModificationWriter) WriteHeader(statusCode int) {
m.statusCode = statusCode
for _, i := range m.interactions {
ok, code := i.modifiers.modifyStatusCode()
for _, i := range m.matchedInteractions {
ok, code := i.interaction.modifiers.modifyStatusCode(i.attemptCount)
if ok {
m.statusCode = code
break
Expand Down

0 comments on commit 83a4313

Please sign in to comment.