Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Validate received transfers adhere to the spec
Browse files Browse the repository at this point in the history
  • Loading branch information
vcabbage committed Jul 16, 2019
1 parent 279d72e commit de9a8ae
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 7 deletions.
80 changes: 76 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,18 +1166,90 @@ func (l *link) muxFlow() error {
}

func (l *link) muxReceive(fr performTransfer) error {
// record the delivery ID and message format if this is
// the first frame of the message
if !l.more {
// this is the first transfer of a message,
// record the delivery ID, message format,
// and delivery Tag
if fr.DeliveryID != nil {
l.msg.deliveryID = *fr.DeliveryID
}

if fr.MessageFormat != nil {
l.msg.Format = *fr.MessageFormat
}

l.msg.DeliveryTag = fr.DeliveryTag

// these fields are required on first transfer of a message
if fr.DeliveryID == nil {
msg := "received message without a delivery-id"
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: msg,
})
return errorNew(msg)
}
if fr.MessageFormat == nil {
msg := "received message without a message-format"
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: msg,
})
return errorNew(msg)
}
if fr.DeliveryTag == nil {
msg := "received message without a delivery-tag"
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: msg,
})
return errorNew(msg)
}
} else {
// this is a continuation of a multipart message
// some fields may be omitted on continuation transfers,
// but if they are included they must be consistent
// with the first.

if fr.DeliveryID != nil && *fr.DeliveryID != l.msg.deliveryID {
msg := fmt.Sprintf(
"received continuation transfer with inconsistent delivery-id: %d != %d",
*fr.DeliveryID, l.msg.deliveryID,
)
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: msg,
})
return errorNew(msg)
}
if fr.MessageFormat != nil && *fr.MessageFormat != l.msg.Format {
msg := fmt.Sprintf(
"received continuation transfer with inconsistent message-format: %d != %d",
*fr.MessageFormat, l.msg.Format,
)
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: msg,
})
return errorNew(msg)
}
if fr.DeliveryTag != nil && !bytes.Equal(fr.DeliveryTag, l.msg.DeliveryTag) {
msg := fmt.Sprintf(
"received continuation transfer with inconsistent delivery-tag: %q != %q",
fr.DeliveryTag, l.msg.DeliveryTag,
)
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: msg,
})
return errorNew(msg)
}
}

// discard message if it's been aborted
if fr.Aborted {
l.buf.reset()
l.msg = Message{}
l.more = false
return nil
}

// ensure maxMessageSize will not be exceeded
Expand Down
1 change: 1 addition & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,7 @@ func TestIntegration_EventHubs_RoundTrip(t *testing.T) {
t.Errorf("Partition %d got error: %+v", i, err)
}
}
return
}

// check that data matches
Expand Down
6 changes: 3 additions & 3 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,11 +1272,11 @@ func (t *performTransfer) frameBody() {}

func (t performTransfer) String() string {
deliveryTag := "<nil>"
if t.DeliveryID != nil {
deliveryTag = string(t.DeliveryTag)
if t.DeliveryTag != nil {
deliveryTag = fmt.Sprintf("%q", t.DeliveryTag)
}

return fmt.Sprintf("Transfer{Handle: %d, DeliveryID: %s, DeliveryTag: %q, MessageFormat: %s, "+
return fmt.Sprintf("Transfer{Handle: %d, DeliveryID: %s, DeliveryTag: %s, MessageFormat: %s, "+
"Settled: %t, More: %t, ReceiverSettleMode: %s, State: %v, Resume: %t, Aborted: %t, "+
"Batchable: %t, Payload [size]: %d}",
t.Handle,
Expand Down

0 comments on commit de9a8ae

Please sign in to comment.