Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into mvg/semconv/metrics…
Browse files Browse the repository at this point in the history
…-server/old
  • Loading branch information
MadVikingGod committed Aug 2, 2024
2 parents 3f304d0 + c619919 commit 88156b3
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 144 deletions.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/owner.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Module: [e.g. go.opentelemetry.io/contrib/zpages]
- [ ] I understand I will be responsible for the stability and versioning compliance of the module
- [ ] I understand I will be responsible for deciding any additional Code Owners of the module

[member of the OpenTelemetry organization]: https://github.com/open-telemetry/community/blob/main/community-membership.md#member
[member of the OpenTelemetry organization]: https://github.com/open-telemetry/community/blob/main/guides/contributor/membership.md#member

### Relevant experience

Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- The deprecated `go.opentelemetry.io/contrib/processors/baggagecopy` package is removed. (#5853)

### Fixed

- Race condition when reading the HTTP body and writing the response in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#5916)

<!-- Released section -->
<!-- Don't change this section unless doing release -->

Expand Down
2 changes: 1 addition & 1 deletion CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#####################################################
#
# Learn about membership in OpenTelemetry community:
# https://github.com/open-telemetry/community/blob/main/community-membership.md
# https://github.com/open-telemetry/community/blob/main/guides/contributor/membership.md#member
#
# Learn about Code Owners policy in OpenTelemetry Go Contrib:
# https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/CONTRIBUTING.md#code-owners
Expand Down
6 changes: 3 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ How much interaction with the module is required before becoming a Code Owner is
Code Ownership is ultimately up to the judgement of the existing Code Owners and Maintainers of this repository.
Meeting the above requirements is not a guarantee to be granted Code Ownership.

[member of the OpenTelemetry organization]: https://github.com/open-telemetry/community/blob/main/community-membership.md#member
[member of the OpenTelemetry organization]: https://github.com/open-telemetry/community/blob/main/guides/contributor/membership.md#member

### Responsibilities

Expand All @@ -55,7 +55,7 @@ It is at the discretion of the repository Maintainers and fellow Code Owners to
If a Code Owner is determined to be unable to perform their duty, a repository Maintainer will remove their ownership.

Inactivity greater than 5 months, during which time there are active Issues or Pull Requests to address, is deemed an automatic disqualification from being a Code Owner.
A repository Maintainer may remove an Code Owner inactive for this length.
A repository Maintainer may remove an Code Owner inactive for this length.

## Filing Issues

Expand Down Expand Up @@ -146,4 +146,4 @@ Emeritus:
### Become an Approver or a Maintainer

See the [community membership document in OpenTelemetry community
repo](https://github.com/open-telemetry/community/blob/main/community-membership.md).
repo](https://github.com/open-telemetry/community/blob/main/guides/contributor/membership.md).
35 changes: 15 additions & 20 deletions instrumentation/net/http/otelhttp/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/felixge/httpsnoop"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconv"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
Expand Down Expand Up @@ -132,14 +133,12 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http
}
}

var bw bodyWrapper
// if request body is nil or NoBody, we don't want to mutate the body as it
// will affect the identity of it in an unforeseeable way because we assert
// ReadCloser fulfills a certain interface and it is indeed nil or NoBody.
bw := request.NewBodyWrapper(r.Body, readRecordFunc)
if r.Body != nil && r.Body != http.NoBody {
bw.ReadCloser = r.Body
bw.record = readRecordFunc
r.Body = &bw
r.Body = bw
}

writeRecordFunc := func(int64) {}
Expand All @@ -149,13 +148,7 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http
}
}

rww := &respWriterWrapper{
ResponseWriter: w,
record: writeRecordFunc,
ctx: ctx,
props: h.propagators,
statusCode: http.StatusOK, // default status code in case the Handler doesn't write anything
}
rww := request.NewRespWriterWrapper(w, writeRecordFunc)

// Wrap w to use our ResponseWriter methods while also exposing
// other interfaces that w may implement (http.CloseNotifier,
Expand Down Expand Up @@ -183,13 +176,15 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http

next.ServeHTTP(w, r.WithContext(ctx))

span.SetStatus(h.semconv.Status(rww.statusCode))
statusCode := rww.StatusCode()
bytesWritten := rww.BytesWritten()
span.SetStatus(h.semconv.Status(statusCode))
span.SetAttributes(h.semconv.ResponseTraceAttrs(semconv.ResponseTelemetry{
StatusCode: rww.statusCode,
ReadBytes: bw.read.Load(),
ReadError: bw.err,
WriteBytes: rww.written,
WriteError: rww.err,
StatusCode: statusCode,
ReadBytes: bw.BytesRead(),
ReadError: bw.Error(),
WriteBytes: bytesWritten,
WriteError: rww.Error(),
})...)

// Use floating point division here for higher precision (instead of Millisecond method).
Expand All @@ -198,10 +193,10 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http
h.semconv.RecordMetrics(ctx, semconv.MetricData{
ServerName: h.server,
Req: r,
StatusCode: rww.statusCode,
StatusCode: statusCode,
AdditionalAttributes: labeler.Get(),
RequestSize: bw.read.Load(),
ResponseSize: rww.written,
RequestSize: bw.BytesRead(),
ResponseSize: bytesWritten,
ElapsedTime: elapsedTime,
})
}
Expand Down
75 changes: 75 additions & 0 deletions instrumentation/net/http/otelhttp/internal/request/body_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package request // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request"

import (
"io"
"sync"
)

var _ io.ReadCloser = &BodyWrapper{}

// BodyWrapper wraps a http.Request.Body (an io.ReadCloser) to track the number
// of bytes read and the last error.
type BodyWrapper struct {
io.ReadCloser
OnRead func(n int64) // must not be nil

mu sync.Mutex
read int64
err error
}

// NewBodyWrapper creates a new BodyWrapper.
//
// The onRead attribute is a callback that will be called every time the data
// is read, with the number of bytes being read.
func NewBodyWrapper(body io.ReadCloser, onRead func(int64)) *BodyWrapper {
return &BodyWrapper{
ReadCloser: body,
OnRead: onRead,
}
}

// Read reads the data from the io.ReadCloser, and stores the number of bytes
// read and the error.
func (w *BodyWrapper) Read(b []byte) (int, error) {
n, err := w.ReadCloser.Read(b)
n1 := int64(n)

w.updateReadData(n1, err)
w.OnRead(n1)
return n, err
}

func (w *BodyWrapper) updateReadData(n int64, err error) {
w.mu.Lock()
defer w.mu.Unlock()

w.read += n
if err != nil {
w.err = err
}
}

// Closes closes the io.ReadCloser.
func (w *BodyWrapper) Close() error {
return w.ReadCloser.Close()
}

// BytesRead returns the number of bytes read up to this point.
func (w *BodyWrapper) BytesRead() int64 {
w.mu.Lock()
defer w.mu.Unlock()

return w.read
}

// Error returns the last error.
func (w *BodyWrapper) Error() error {
w.mu.Lock()
defer w.mu.Unlock()

return w.err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package request

import (
"errors"
"io"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var errFirstCall = errors.New("first call")

func TestBodyWrapper(t *testing.T) {
bw := NewBodyWrapper(io.NopCloser(strings.NewReader("hello world")), func(int64) {})

data, err := io.ReadAll(bw)
require.NoError(t, err)
assert.Equal(t, "hello world", string(data))

assert.Equal(t, int64(11), bw.BytesRead())
assert.Equal(t, io.EOF, bw.Error())
}

type multipleErrorsReader struct {
calls int
}

type errorWrapper struct{}

func (errorWrapper) Error() string {
return "subsequent calls"
}

func (mer *multipleErrorsReader) Read([]byte) (int, error) {
mer.calls = mer.calls + 1
if mer.calls == 1 {
return 0, errFirstCall
}

return 0, errorWrapper{}
}

func TestBodyWrapperWithErrors(t *testing.T) {
bw := NewBodyWrapper(io.NopCloser(&multipleErrorsReader{}), func(int64) {})

data, err := io.ReadAll(bw)
require.Equal(t, errFirstCall, err)
assert.Equal(t, "", string(data))
require.Equal(t, errFirstCall, bw.Error())

data, err = io.ReadAll(bw)
require.Equal(t, errorWrapper{}, err)
assert.Equal(t, "", string(data))
require.Equal(t, errorWrapper{}, bw.Error())
}

func TestConcurrentBodyWrapper(t *testing.T) {
bw := NewBodyWrapper(io.NopCloser(strings.NewReader("hello world")), func(int64) {})

go func() {
_, _ = io.ReadAll(bw)
}()

assert.NotNil(t, bw.BytesRead())
assert.Eventually(t, func() bool {
return errors.Is(bw.Error(), io.EOF)
}, time.Second, 10*time.Millisecond)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package request // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request"

import (
"net/http"
"sync"
)

var _ http.ResponseWriter = &RespWriterWrapper{}

// RespWriterWrapper wraps a http.ResponseWriter in order to track the number of
// bytes written, the last error, and to catch the first written statusCode.
// TODO: The wrapped http.ResponseWriter doesn't implement any of the optional
// types (http.Hijacker, http.Pusher, http.CloseNotifier, etc)
// that may be useful when using it in real life situations.
type RespWriterWrapper struct {
http.ResponseWriter
OnWrite func(n int64) // must not be nil

mu sync.RWMutex
written int64
statusCode int
err error
wroteHeader bool
}

// NewRespWriterWrapper creates a new RespWriterWrapper.
//
// The onWrite attribute is a callback that will be called every time the data
// is written, with the number of bytes that were written.
func NewRespWriterWrapper(w http.ResponseWriter, onWrite func(int64)) *RespWriterWrapper {
return &RespWriterWrapper{
ResponseWriter: w,
OnWrite: onWrite,
statusCode: http.StatusOK, // default status code in case the Handler doesn't write anything
}
}

// Write writes the bytes array into the [ResponseWriter], and tracks the
// number of bytes written and last error.
func (w *RespWriterWrapper) Write(p []byte) (int, error) {
w.mu.Lock()
defer w.mu.Unlock()

w.writeHeader(http.StatusOK)

n, err := w.ResponseWriter.Write(p)
n1 := int64(n)
w.OnWrite(n1)
w.written += n1
w.err = err
return n, err
}

// WriteHeader persists initial statusCode for span attribution.
// All calls to WriteHeader will be propagated to the underlying ResponseWriter
// and will persist the statusCode from the first call.
// Blocking consecutive calls to WriteHeader alters expected behavior and will
// remove warning logs from net/http where developers will notice incorrect handler implementations.
func (w *RespWriterWrapper) WriteHeader(statusCode int) {
w.mu.Lock()
defer w.mu.Unlock()

w.writeHeader(statusCode)
}

// writeHeader persists the status code for span attribution, and propagates
// the call to the underlying ResponseWriter.
// It does not acquire a lock, and therefore assumes that is being handled by a
// parent method.
func (w *RespWriterWrapper) writeHeader(statusCode int) {
if !w.wroteHeader {
w.wroteHeader = true
w.statusCode = statusCode
}
w.ResponseWriter.WriteHeader(statusCode)
}

// Flush implements [http.Flusher].
func (w *RespWriterWrapper) Flush() {
w.WriteHeader(http.StatusOK)

if f, ok := w.ResponseWriter.(http.Flusher); ok {
f.Flush()
}
}

// BytesWritten returns the number of bytes written.
func (w *RespWriterWrapper) BytesWritten() int64 {
w.mu.RLock()
defer w.mu.RUnlock()

return w.written
}

// BytesWritten returns the HTTP status code that was sent.
func (w *RespWriterWrapper) StatusCode() int {
w.mu.RLock()
defer w.mu.RUnlock()

return w.statusCode
}

// Error returns the last error.
func (w *RespWriterWrapper) Error() error {
w.mu.RLock()
defer w.mu.RUnlock()

return w.err
}
Loading

0 comments on commit 88156b3

Please sign in to comment.