Skip to content

Commit

Permalink
fix: concurrency issue with attempts counter
Browse files Browse the repository at this point in the history
For each interaction, we keep track of the number of times it has been
called.
That counter can be used in the modifiers to change the response
returned to the client. For example it is possible to change the return
status on the 2nd invocation of a specific interaction.

The code has a bug when concurrent requests happen for the same
interaction.
The change modifies the way the concurrency is managed just for the
request counter, using channels instead of Lock.

As part of this change, we have added a new test that can be used to
prove that the original code has the issue and the proposed version
fixes it.

Co-authored-by: Shreya Garge <[email protected]>
  • Loading branch information
andrea-f3 and shreyagarge-f3 committed Nov 14, 2023
1 parent 7388a82 commit 1988374
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 23 deletions.
55 changes: 55 additions & 0 deletions internal/app/concurrent_proxy_stage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -152,6 +153,34 @@ func (s *ConcurrentProxyStage) the_concurrent_requests_are_sent() {
s.assert.NoError(err)
}

func (s *ConcurrentProxyStage) the_concurrent_requests_are_sent_with_attempt_based_modifier() {
err := s.pact.Verify(func() (err error) {
attempt := 2
s.proxy.ForInteraction(postNamePactWithAnyName).AddModifier("$.status", fmt.Sprintf("%d", http.StatusConflict), &attempt)
s.proxy.ForInteraction(postNamePactWithAnyName).AddModifier("$.body.name", "Form3", &attempt)

wg := sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
s.makeUserRequest()
}()

wg.Add(1)
go func() {
defer wg.Done()
s.makeUserRequest()
}()

wg.Wait()

return nil
})

s.assert.NoError(err)
}

func (s *ConcurrentProxyStage) makeUserRequest() {
u := fmt.Sprintf("http://localhost:%s/users", proxyURL.Port())
req, err := http.NewRequest("POST", u, strings.NewReader(`{"name":"jim"}`))
Expand Down Expand Up @@ -208,6 +237,32 @@ func (s *ConcurrentProxyStage) all_the_user_responses_should_have_the_right_stat
return s
}

func (s *ConcurrentProxyStage) the_second_user_response_should_have_the_right_status_code() *ConcurrentProxyStage {
c := 0
for _, res := range s.userResponses {
if res.StatusCode == http.StatusConflict {
c++
}
}
s.assert.Equal(1, c)
return s
}

func (s *ConcurrentProxyStage) the_second_user_response_should_have_a_modified_body() *ConcurrentProxyStage {
c := 0
for _, res := range s.userResponses {
bd, err := io.ReadAll(res.Body)
s.assert.NoError(err)
res.Body.Close()

if strings.Contains(string(bd), "Form3") {
c++
}
}
s.assert.Equal(1, c)
return s
}

func (s *ConcurrentProxyStage) all_the_address_responses_should_have_the_right_status_code() *ConcurrentProxyStage {
expectedLen := s.concurrentAddressRequestsPerSecond * int(s.concurrentAddressRequestsDuration/time.Second)
s.assert.Len(s.addressResponses, expectedLen, "number of address responses is not as expected")
Expand Down
13 changes: 13 additions & 0 deletions internal/app/concurrent_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@ func TestConcurrentRequestsForDifferentModifiersHaveTheCorrectResponses(t *testi
all_the_address_responses_should_have_the_right_status_code()
}

func TestConcurrentRequestsForSameModifierBasedOnAttempt(t *testing.T) {
given, when, then := NewConcurrentProxyStage(t)
given.
a_pact_that_allows_any_names()
when.
x_concurrent_user_requests_per_second_are_made_for_y_seconds(2, 1*time.Second).and().
the_concurrent_requests_are_sent_with_attempt_based_modifier()
then.
the_second_user_response_should_have_the_right_status_code().and().
the_second_user_response_should_have_a_modified_body()

}

func TestConcurrentRequestsWaitForAllPacts(t *testing.T) {
given, when, then := NewConcurrentProxyStage(t)

Expand Down
27 changes: 13 additions & 14 deletions internal/app/pactproxy/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func (m *regexPathMatcher) match(val string) bool {

type Interaction struct {
mu sync.RWMutex
updateChannel chan struct{}
getChannel chan int
doneChannel chan struct{}
pathMatcher pathMatcher
Method string `json:"method"`
Alias string `json:"alias"`
Expand Down Expand Up @@ -93,12 +96,15 @@ func LoadInteraction(data []byte, alias string) (*Interaction, error) {
propertiesWithMatchingRule := getBodyPropertiesWithMatchingRules(matchingRules)

interaction := &Interaction{
pathMatcher: matcher,
Method: request["method"].(string),
Alias: alias,
definition: definition,
Description: description,
constraints: map[string]interactionConstraint{},
getChannel: make(chan int, 100),
updateChannel: make(chan struct{}),
doneChannel: make(chan struct{}),
pathMatcher: matcher,
Method: request["method"].(string),
Alias: alias,
definition: definition,
Description: description,
constraints: map[string]interactionConstraint{},
}

interaction.modifiers = interactionModifiers{
Expand Down Expand Up @@ -379,19 +385,12 @@ func (i *Interaction) StoreRequest(request requestDocument) {
i.mu.Lock()
defer i.mu.Unlock()
i.LastRequest = request
i.RequestCount++

if i.recordHistory {
i.RequestHistory = append(i.RequestHistory, request)
}
}

func (i *Interaction) HasRequests(count int) bool {
return i.getRequestCount() >= count
}

func (i *Interaction) getRequestCount() int {
i.mu.RLock()
defer i.mu.RUnlock()
return i.RequestCount
return i.RequestCount >= count
}
2 changes: 2 additions & 0 deletions internal/app/pactproxy/interactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func (i *Interactions) Store(interaction *Interaction) {

func (i *Interactions) Clear() {
i.interactions.Range(func(k, _ interface{}) bool {
v, _ := i.interactions.Load(k)
v.(*Interaction).doneChannel <- struct{}{}
i.interactions.Delete(k)
return true
})
Expand Down
6 changes: 2 additions & 4 deletions internal/app/pactproxy/modifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ func (ims *interactionModifiers) Modifiers() []*interactionModifier {
return result
}

func (ims *interactionModifiers) modifyBody(b []byte) ([]byte, error) {
func (ims *interactionModifiers) modifyBody(b []byte, requestCount int) ([]byte, error) {
for _, m := range ims.Modifiers() {
requestCount := ims.interaction.getRequestCount()
if m.Path == "$.bytes.body" {
if v, ok := m.Value.(string); ok && m.Attempt == nil || *m.Attempt == requestCount {
var err error
Expand All @@ -66,9 +65,8 @@ func (ims *interactionModifiers) modifyBody(b []byte) ([]byte, error) {
return b, nil
}

func (ims *interactionModifiers) modifyStatusCode() (bool, int) {
func (ims *interactionModifiers) modifyStatusCode(requestCount int) (bool, int) {
for _, m := range ims.Modifiers() {
requestCount := ims.interaction.getRequestCount()
if m.Path == "$.status" {
if m.Attempt == nil || *m.Attempt == requestCount {
code, err := strconv.Atoi(fmt.Sprintf("%v", m.Value))
Expand Down
18 changes: 17 additions & 1 deletion internal/app/pactproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,19 @@ func (a *api) interactionsPostHandler(c echo.Context) error {
interaction.recordHistory = a.recordHistory
a.interactions.Store(interaction)

go func() {
for {
select {
case <-interaction.updateChannel:
interaction.RequestCount += 1
interaction.getChannel <- interaction.RequestCount
case <-interaction.doneChannel:
return

}
}
}()

err = c.Request().Body.Close()
if err != nil {
return c.JSON(http.StatusBadRequest, httpresponse.Error(err.Error()))
Expand Down Expand Up @@ -304,6 +317,9 @@ func (a *api) indexHandler(c echo.Context) error {
ok, info := interaction.EvaluateConstraints(request, a.interactions)
if ok {
interaction.StoreRequest(request)
go func() {
interaction.updateChannel <- struct{}{}
}()
matched = append(matched, interaction)
} else {
unmatched[interaction.Description] = info
Expand All @@ -319,7 +335,7 @@ func (a *api) indexHandler(c echo.Context) error {
}

a.notify.Notify()
a.proxy.ServeHTTP(&ResponseModificationWriter{res: c.Response(), interactions: matched}, req)
a.proxy.ServeHTTP(&ResponseModificationWriter{res: c.Response(), interactions: matched, attemptTracker: make(map[string]int)}, req)
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions internal/app/pactproxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ func TestInteractionsGetHandler(t *testing.T) {
return &interactions
}(),
code: http.StatusOK,
body: `{"method":"","alias":"test","description":"test","request_count":1,"last_request":{"body":{"foo":"bar"},"path":"/testpath"}}`,
// request_count is 0 as the interaction has not been matched yet
body: `{"method":"","alias":"test","description":"test","request_count":0,"last_request":{"body":{"foo":"bar"},"path":"/testpath"}}`,
},
{
name: "interaction found - with request history",
Expand All @@ -130,7 +131,8 @@ func TestInteractionsGetHandler(t *testing.T) {
return &interactions
}(),
code: http.StatusOK,
body: `{"method":"","alias":"test","description":"test","request_count":1,"request_history":[{"body":{"foo":"bar"},"path":"/testpath"}],"last_request":{"body":{"foo":"bar"},"path":"/testpath"}}`,
// request_count is 0 as the interaction has not been matched yet
body: `{"method":"","alias":"test","description":"test","request_count":0,"request_history":[{"body":{"foo":"bar"},"path":"/testpath"}],"last_request":{"body":{"foo":"bar"},"path":"/testpath"}}`,
},
{
name: "interaction not found",
Expand Down
9 changes: 7 additions & 2 deletions internal/app/pactproxy/response_modification_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type ResponseModificationWriter struct {
interactions []*Interaction
originalResponse []byte
statusCode int
attemptTracker map[string]int
}

func (m *ResponseModificationWriter) Header() http.Header {
Expand All @@ -30,13 +31,15 @@ func (m *ResponseModificationWriter) Write(b []byte) (int, error) {

var modifiedBody []byte
for _, i := range m.interactions {
modifiedBody, err = i.modifiers.modifyBody(m.originalResponse)
requestCount := m.attemptTracker[i.Alias]
modifiedBody, err = i.modifiers.modifyBody(m.originalResponse, requestCount)
if err != nil {
return 0, err
}
}

m.Header().Set("Content-Length", strconv.Itoa(len(modifiedBody)))

m.res.WriteHeader(m.statusCode)
writtenBytes, err := m.res.Write(modifiedBody)
if err != nil {
Expand All @@ -52,7 +55,9 @@ func (m *ResponseModificationWriter) Write(b []byte) (int, error) {
func (m *ResponseModificationWriter) WriteHeader(statusCode int) {
m.statusCode = statusCode
for _, i := range m.interactions {
ok, code := i.modifiers.modifyStatusCode()
requestCount := <-i.getChannel
m.attemptTracker[i.Alias] = requestCount
ok, code := i.modifiers.modifyStatusCode(m.attemptTracker[i.Alias])
if ok {
m.statusCode = code
break
Expand Down

0 comments on commit 1988374

Please sign in to comment.