Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: request count race condition #35

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions internal/app/concurrent_proxy_stage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type ConcurrentProxyStage struct {
proxy *pactproxy.PactProxy
pact *dsl.Pact
modifiedNameStatusCode int
modifiedNameAttempt *int
modifiedAddressStatusCode int
concurrentUserRequestsPerSecond int
concurrentUserRequestsDuration time.Duration
Expand Down Expand Up @@ -64,6 +65,10 @@ func (s *ConcurrentProxyStage) a_modified_name_status_code() *ConcurrentProxySta
return s
}

func (s *ConcurrentProxyStage) a_modified_name_response_attempt_of(i int) {
s.modifiedNameAttempt = &i
}

func (s *ConcurrentProxyStage) a_modified_address_status_code() *ConcurrentProxyStage {
s.modifiedAddressStatusCode = http.StatusConflict
return s
Expand Down Expand Up @@ -122,7 +127,7 @@ func (s *ConcurrentProxyStage) x_concurrent_address_requests_per_second_are_made
func (s *ConcurrentProxyStage) the_concurrent_requests_are_sent() {
err := s.pact.Verify(func() (err error) {
if s.modifiedNameStatusCode != 0 {
s.proxy.ForInteraction(postNamePactWithAnyName).AddModifier("$.status", fmt.Sprintf("%d", s.modifiedNameStatusCode), nil)
s.proxy.ForInteraction(postNamePactWithAnyName).AddModifier("$.status", fmt.Sprintf("%d", s.modifiedNameStatusCode), s.modifiedNameAttempt)
}
if s.modifiedAddressStatusCode != 0 {
s.proxy.ForInteraction(postAddressPact).AddModifier("$.status", fmt.Sprintf("%d", s.modifiedAddressStatusCode), nil)
Expand Down Expand Up @@ -184,9 +189,17 @@ func sendConcurrentRequests(requests int, d time.Duration, f func()) {
case <-stop:
return
case <-ticker.C:
var wg sync.WaitGroup

for i := 0; i < requests; i++ {
f()
wg.Add(1)

go func() {
defer wg.Done()
f()
}()
}
wg.Wait()
}
}
}()
Expand All @@ -199,8 +212,12 @@ func (s *ConcurrentProxyStage) all_the_user_responses_should_have_the_right_stat
expectedLen := s.concurrentUserRequestsPerSecond * int(s.concurrentUserRequestsDuration/time.Second)
s.assert.Len(s.userResponses, expectedLen, "number of user responses is not as expected")

for _, res := range s.userResponses {
s.assert.Equal(res.StatusCode, s.modifiedNameStatusCode, "expected user status code")
for i, res := range s.userResponses {
if s.modifiedNameAttempt == nil || *s.modifiedNameAttempt == i+1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modifying attempt and number of concurrent requests have different results here - runs are concurrent so it may be out of order, perhaps the test would be more valuable if window of opportunity to fail was greater like using another fixture in test for example 10 requests per second and modify on attempt 4, or 7.

you'd need to figure a way to identify request with response in another way though

s.assert.Equal(res.StatusCode, s.modifiedNameStatusCode, "expected user status code")
} else {
s.assert.Equal(res.StatusCode, 200, "expected user status code")
}
}

return s
Expand Down
16 changes: 16 additions & 0 deletions internal/app/concurrent_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@ func TestConcurrentRequestsForDifferentModifiersHaveTheCorrectResponses(t *testi
all_the_address_responses_should_have_the_right_status_code()
}

func TestConcurrentRequestsWithAnAttemptModifierHaveTheCorrectResponses(t *testing.T) {
given, when, then := NewConcurrentProxyStage(t)

given.
a_pact_that_allows_any_names().and().
a_modified_name_status_code().and().
a_modified_name_response_attempt_of(1)

when.
x_concurrent_user_requests_per_second_are_made_for_y_seconds(2, 1*time.Second).and().
the_concurrent_requests_are_sent()

then.
all_the_user_responses_should_have_the_right_status_code()
}

func TestConcurrentRequestsWaitForAllPacts(t *testing.T) {
given, when, then := NewConcurrentProxyStage(t)

Expand Down
3 changes: 2 additions & 1 deletion internal/app/pactproxy/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (i *Interaction) EvaluateConstrains(request requestDocument, interactions *
return result, violations
}

func (i *Interaction) StoreRequest(request requestDocument) {
func (i *Interaction) StoreRequest(request requestDocument) int {
i.mu.Lock()
defer i.mu.Unlock()
i.LastRequest = request
Expand All @@ -374,6 +374,7 @@ func (i *Interaction) StoreRequest(request requestDocument) {
if i.recordHistory {
i.RequestHistory = append(i.RequestHistory, request)
}
return i.RequestCount
}

func (i *Interaction) HasRequests(count int) bool {
Expand Down
6 changes: 2 additions & 4 deletions internal/app/pactproxy/modifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ func (ims *interactionModifiers) Modifiers() []*interactionModifier {
return result
}

func (ims *interactionModifiers) modifyBody(b []byte) ([]byte, error) {
func (ims *interactionModifiers) modifyBody(b []byte, requestCount int) ([]byte, error) {
for _, m := range ims.Modifiers() {
requestCount := ims.interaction.getRequestCount()
if strings.HasPrefix(m.Path, "$.body.") {
if m.Attempt == nil || *m.Attempt == requestCount {
var err error
Expand All @@ -56,9 +55,8 @@ func (ims *interactionModifiers) modifyBody(b []byte) ([]byte, error) {
return b, nil
}

func (ims *interactionModifiers) modifyStatusCode() (bool, int) {
func (ims *interactionModifiers) modifyStatusCode(requestCount int) (bool, int) {
for _, m := range ims.Modifiers() {
requestCount := ims.interaction.getRequestCount()
if m.Path == "$.status" {
if m.Attempt == nil || *m.Attempt == requestCount {
code, err := strconv.Atoi(fmt.Sprintf("%v", m.Value))
Expand Down
15 changes: 11 additions & 4 deletions internal/app/pactproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ type api struct {
echo.Context
}

type matchedInteraction struct {
interaction *Interaction
requestCount int
}

func (a *api) ProxyRequest(c echo.Context) error {
a.proxy.ServeHTTP(c.Response(), c.Request())
return nil
Expand Down Expand Up @@ -286,12 +291,14 @@ func (a *api) indexHandler(c echo.Context) error {
request["headers"] = h

unmatched := make(map[string][]string)
matched := make([]*Interaction, 0)
matched := make([]matchedInteraction, 0)
for _, interaction := range allInteractions {
ok, info := interaction.EvaluateConstrains(request, a.interactions)
if ok {
interaction.StoreRequest(request)
matched = append(matched, interaction)
matched = append(matched, matchedInteraction{
interaction: interaction,
requestCount: interaction.StoreRequest(request),
})
} else {
unmatched[interaction.Description] = info
}
Expand All @@ -306,7 +313,7 @@ func (a *api) indexHandler(c echo.Context) error {
}

a.notify.Notify()
a.proxy.ServeHTTP(&ResponseModificationWriter{res: c.Response(), interactions: matched}, req)
a.proxy.ServeHTTP(&ResponseModificationWriter{res: c.Response(), matchedInteractions: matched}, req)
return nil
}

Expand Down
17 changes: 9 additions & 8 deletions internal/app/pactproxy/response_modification_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
)

type ResponseModificationWriter struct {
res http.ResponseWriter
interactions []*Interaction
originalResponse []byte
statusCode int
request requestDocument
res http.ResponseWriter
matchedInteractions []matchedInteraction
originalResponse []byte
statusCode int
}

func (m *ResponseModificationWriter) Header() http.Header {
Expand All @@ -29,8 +30,8 @@ func (m *ResponseModificationWriter) Write(b []byte) (int, error) {
}

var modifiedBody []byte
for _, i := range m.interactions {
modifiedBody, err = i.modifiers.modifyBody(m.originalResponse)
for _, match := range m.matchedInteractions {
modifiedBody, err = match.interaction.modifiers.modifyBody(m.originalResponse, match.requestCount)
if err != nil {
return 0, err
}
Expand All @@ -51,8 +52,8 @@ func (m *ResponseModificationWriter) Write(b []byte) (int, error) {

func (m *ResponseModificationWriter) WriteHeader(statusCode int) {
m.statusCode = statusCode
for _, i := range m.interactions {
ok, code := i.modifiers.modifyStatusCode()
for _, match := range m.matchedInteractions {
ok, code := match.interaction.modifiers.modifyStatusCode(match.requestCount)
if ok {
m.statusCode = code
break
Expand Down