diff --git a/.github/ISSUE_TEMPLATE/owner.md b/.github/ISSUE_TEMPLATE/owner.md index 39506162c67..fd03fd3c61b 100644 --- a/.github/ISSUE_TEMPLATE/owner.md +++ b/.github/ISSUE_TEMPLATE/owner.md @@ -18,7 +18,7 @@ Module: [e.g. go.opentelemetry.io/contrib/zpages] - [ ] I understand I will be responsible for the stability and versioning compliance of the module - [ ] I understand I will be responsible for deciding any additional Code Owners of the module -[member of the OpenTelemetry organization]: https://github.com/open-telemetry/community/blob/main/community-membership.md#member +[member of the OpenTelemetry organization]: https://github.com/open-telemetry/community/blob/main/guides/contributor/membership.md#member ### Relevant experience diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c5d1a37850..41521bc3d77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The deprecated `go.opentelemetry.io/contrib/processors/baggagecopy` package is removed. (#5853) +### Fixed + +- Race condition when reading the HTTP body and writing the response in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#5916) + diff --git a/CODEOWNERS b/CODEOWNERS index d4abc2b9d36..58281da3558 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -5,7 +5,7 @@ ##################################################### # # Learn about membership in OpenTelemetry community: -# https://github.com/open-telemetry/community/blob/main/community-membership.md +# https://github.com/open-telemetry/community/blob/main/guides/contributor/membership.md#member # # Learn about Code Owners policy in OpenTelemetry Go Contrib: # https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/CONTRIBUTING.md#code-owners diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 26f192a9066..a0863271f67 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -32,7 +32,7 @@ How much interaction with the module is required before becoming a Code Owner is Code Ownership is ultimately up to the judgement of the existing Code Owners and Maintainers of this repository. Meeting the above requirements is not a guarantee to be granted Code Ownership. -[member of the OpenTelemetry organization]: https://github.com/open-telemetry/community/blob/main/community-membership.md#member +[member of the OpenTelemetry organization]: https://github.com/open-telemetry/community/blob/main/guides/contributor/membership.md#member ### Responsibilities @@ -55,7 +55,7 @@ It is at the discretion of the repository Maintainers and fellow Code Owners to If a Code Owner is determined to be unable to perform their duty, a repository Maintainer will remove their ownership. Inactivity greater than 5 months, during which time there are active Issues or Pull Requests to address, is deemed an automatic disqualification from being a Code Owner. -A repository Maintainer may remove an Code Owner inactive for this length. +A repository Maintainer may remove an Code Owner inactive for this length. ## Filing Issues @@ -146,4 +146,4 @@ Emeritus: ### Become an Approver or a Maintainer See the [community membership document in OpenTelemetry community -repo](https://github.com/open-telemetry/community/blob/main/community-membership.md). +repo](https://github.com/open-telemetry/community/blob/main/guides/contributor/membership.md). diff --git a/instrumentation/net/http/otelhttp/handler.go b/instrumentation/net/http/otelhttp/handler.go index 50d1d34ebf9..33580a35b77 100644 --- a/instrumentation/net/http/otelhttp/handler.go +++ b/instrumentation/net/http/otelhttp/handler.go @@ -9,6 +9,7 @@ import ( "github.com/felixge/httpsnoop" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconv" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" @@ -132,14 +133,12 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http } } - var bw bodyWrapper // if request body is nil or NoBody, we don't want to mutate the body as it // will affect the identity of it in an unforeseeable way because we assert // ReadCloser fulfills a certain interface and it is indeed nil or NoBody. + bw := request.NewBodyWrapper(r.Body, readRecordFunc) if r.Body != nil && r.Body != http.NoBody { - bw.ReadCloser = r.Body - bw.record = readRecordFunc - r.Body = &bw + r.Body = bw } writeRecordFunc := func(int64) {} @@ -149,13 +148,7 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http } } - rww := &respWriterWrapper{ - ResponseWriter: w, - record: writeRecordFunc, - ctx: ctx, - props: h.propagators, - statusCode: http.StatusOK, // default status code in case the Handler doesn't write anything - } + rww := request.NewRespWriterWrapper(w, writeRecordFunc) // Wrap w to use our ResponseWriter methods while also exposing // other interfaces that w may implement (http.CloseNotifier, @@ -183,13 +176,15 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http next.ServeHTTP(w, r.WithContext(ctx)) - span.SetStatus(h.semconv.Status(rww.statusCode)) + statusCode := rww.StatusCode() + bytesWritten := rww.BytesWritten() + span.SetStatus(h.semconv.Status(statusCode)) span.SetAttributes(h.semconv.ResponseTraceAttrs(semconv.ResponseTelemetry{ - StatusCode: rww.statusCode, - ReadBytes: bw.read.Load(), - ReadError: bw.err, - WriteBytes: rww.written, - WriteError: rww.err, + StatusCode: statusCode, + ReadBytes: bw.BytesRead(), + ReadError: bw.Error(), + WriteBytes: bytesWritten, + WriteError: rww.Error(), })...) // Use floating point division here for higher precision (instead of Millisecond method). @@ -198,10 +193,10 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http h.semconv.RecordMetrics(ctx, semconv.MetricData{ ServerName: h.server, Req: r, - StatusCode: rww.statusCode, + StatusCode: statusCode, AdditionalAttributes: labeler.Get(), - RequestSize: bw.read.Load(), - ResponseSize: rww.written, + RequestSize: bw.BytesRead(), + ResponseSize: bytesWritten, ElapsedTime: elapsedTime, }) } diff --git a/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go new file mode 100644 index 00000000000..a945f556616 --- /dev/null +++ b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request" + +import ( + "io" + "sync" +) + +var _ io.ReadCloser = &BodyWrapper{} + +// BodyWrapper wraps a http.Request.Body (an io.ReadCloser) to track the number +// of bytes read and the last error. +type BodyWrapper struct { + io.ReadCloser + OnRead func(n int64) // must not be nil + + mu sync.Mutex + read int64 + err error +} + +// NewBodyWrapper creates a new BodyWrapper. +// +// The onRead attribute is a callback that will be called every time the data +// is read, with the number of bytes being read. +func NewBodyWrapper(body io.ReadCloser, onRead func(int64)) *BodyWrapper { + return &BodyWrapper{ + ReadCloser: body, + OnRead: onRead, + } +} + +// Read reads the data from the io.ReadCloser, and stores the number of bytes +// read and the error. +func (w *BodyWrapper) Read(b []byte) (int, error) { + n, err := w.ReadCloser.Read(b) + n1 := int64(n) + + w.updateReadData(n1, err) + w.OnRead(n1) + return n, err +} + +func (w *BodyWrapper) updateReadData(n int64, err error) { + w.mu.Lock() + defer w.mu.Unlock() + + w.read += n + if err != nil { + w.err = err + } +} + +// Closes closes the io.ReadCloser. +func (w *BodyWrapper) Close() error { + return w.ReadCloser.Close() +} + +// BytesRead returns the number of bytes read up to this point. +func (w *BodyWrapper) BytesRead() int64 { + w.mu.Lock() + defer w.mu.Unlock() + + return w.read +} + +// Error returns the last error. +func (w *BodyWrapper) Error() error { + w.mu.Lock() + defer w.mu.Unlock() + + return w.err +} diff --git a/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go b/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go new file mode 100644 index 00000000000..794e54fb9c8 --- /dev/null +++ b/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go @@ -0,0 +1,74 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request + +import ( + "errors" + "io" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var errFirstCall = errors.New("first call") + +func TestBodyWrapper(t *testing.T) { + bw := NewBodyWrapper(io.NopCloser(strings.NewReader("hello world")), func(int64) {}) + + data, err := io.ReadAll(bw) + require.NoError(t, err) + assert.Equal(t, "hello world", string(data)) + + assert.Equal(t, int64(11), bw.BytesRead()) + assert.Equal(t, io.EOF, bw.Error()) +} + +type multipleErrorsReader struct { + calls int +} + +type errorWrapper struct{} + +func (errorWrapper) Error() string { + return "subsequent calls" +} + +func (mer *multipleErrorsReader) Read([]byte) (int, error) { + mer.calls = mer.calls + 1 + if mer.calls == 1 { + return 0, errFirstCall + } + + return 0, errorWrapper{} +} + +func TestBodyWrapperWithErrors(t *testing.T) { + bw := NewBodyWrapper(io.NopCloser(&multipleErrorsReader{}), func(int64) {}) + + data, err := io.ReadAll(bw) + require.Equal(t, errFirstCall, err) + assert.Equal(t, "", string(data)) + require.Equal(t, errFirstCall, bw.Error()) + + data, err = io.ReadAll(bw) + require.Equal(t, errorWrapper{}, err) + assert.Equal(t, "", string(data)) + require.Equal(t, errorWrapper{}, bw.Error()) +} + +func TestConcurrentBodyWrapper(t *testing.T) { + bw := NewBodyWrapper(io.NopCloser(strings.NewReader("hello world")), func(int64) {}) + + go func() { + _, _ = io.ReadAll(bw) + }() + + assert.NotNil(t, bw.BytesRead()) + assert.Eventually(t, func() bool { + return errors.Is(bw.Error(), io.EOF) + }, time.Second, 10*time.Millisecond) +} diff --git a/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go new file mode 100644 index 00000000000..aea171fb260 --- /dev/null +++ b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go @@ -0,0 +1,112 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request" + +import ( + "net/http" + "sync" +) + +var _ http.ResponseWriter = &RespWriterWrapper{} + +// RespWriterWrapper wraps a http.ResponseWriter in order to track the number of +// bytes written, the last error, and to catch the first written statusCode. +// TODO: The wrapped http.ResponseWriter doesn't implement any of the optional +// types (http.Hijacker, http.Pusher, http.CloseNotifier, etc) +// that may be useful when using it in real life situations. +type RespWriterWrapper struct { + http.ResponseWriter + OnWrite func(n int64) // must not be nil + + mu sync.RWMutex + written int64 + statusCode int + err error + wroteHeader bool +} + +// NewRespWriterWrapper creates a new RespWriterWrapper. +// +// The onWrite attribute is a callback that will be called every time the data +// is written, with the number of bytes that were written. +func NewRespWriterWrapper(w http.ResponseWriter, onWrite func(int64)) *RespWriterWrapper { + return &RespWriterWrapper{ + ResponseWriter: w, + OnWrite: onWrite, + statusCode: http.StatusOK, // default status code in case the Handler doesn't write anything + } +} + +// Write writes the bytes array into the [ResponseWriter], and tracks the +// number of bytes written and last error. +func (w *RespWriterWrapper) Write(p []byte) (int, error) { + w.mu.Lock() + defer w.mu.Unlock() + + w.writeHeader(http.StatusOK) + + n, err := w.ResponseWriter.Write(p) + n1 := int64(n) + w.OnWrite(n1) + w.written += n1 + w.err = err + return n, err +} + +// WriteHeader persists initial statusCode for span attribution. +// All calls to WriteHeader will be propagated to the underlying ResponseWriter +// and will persist the statusCode from the first call. +// Blocking consecutive calls to WriteHeader alters expected behavior and will +// remove warning logs from net/http where developers will notice incorrect handler implementations. +func (w *RespWriterWrapper) WriteHeader(statusCode int) { + w.mu.Lock() + defer w.mu.Unlock() + + w.writeHeader(statusCode) +} + +// writeHeader persists the status code for span attribution, and propagates +// the call to the underlying ResponseWriter. +// It does not acquire a lock, and therefore assumes that is being handled by a +// parent method. +func (w *RespWriterWrapper) writeHeader(statusCode int) { + if !w.wroteHeader { + w.wroteHeader = true + w.statusCode = statusCode + } + w.ResponseWriter.WriteHeader(statusCode) +} + +// Flush implements [http.Flusher]. +func (w *RespWriterWrapper) Flush() { + w.WriteHeader(http.StatusOK) + + if f, ok := w.ResponseWriter.(http.Flusher); ok { + f.Flush() + } +} + +// BytesWritten returns the number of bytes written. +func (w *RespWriterWrapper) BytesWritten() int64 { + w.mu.RLock() + defer w.mu.RUnlock() + + return w.written +} + +// BytesWritten returns the HTTP status code that was sent. +func (w *RespWriterWrapper) StatusCode() int { + w.mu.RLock() + defer w.mu.RUnlock() + + return w.statusCode +} + +// Error returns the last error. +func (w *RespWriterWrapper) Error() error { + w.mu.RLock() + defer w.mu.RUnlock() + + return w.err +} diff --git a/instrumentation/net/http/otelhttp/wrap_test.go b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper_test.go similarity index 64% rename from instrumentation/net/http/otelhttp/wrap_test.go rename to instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper_test.go index d4b89411a29..21229b4dc69 100644 --- a/instrumentation/net/http/otelhttp/wrap_test.go +++ b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package otelhttp +package request import ( "net/http" @@ -12,10 +12,7 @@ import ( ) func TestRespWriterWriteHeader(t *testing.T) { - rw := &respWriterWrapper{ - ResponseWriter: &httptest.ResponseRecorder{}, - record: func(int64) {}, - } + rw := NewRespWriterWrapper(&httptest.ResponseRecorder{}, func(int64) {}) rw.WriteHeader(http.StatusTeapot) assert.Equal(t, http.StatusTeapot, rw.statusCode) @@ -26,10 +23,7 @@ func TestRespWriterWriteHeader(t *testing.T) { } func TestRespWriterFlush(t *testing.T) { - rw := &respWriterWrapper{ - ResponseWriter: &httptest.ResponseRecorder{}, - record: func(int64) {}, - } + rw := NewRespWriterWrapper(&httptest.ResponseRecorder{}, func(int64) {}) rw.Flush() assert.Equal(t, http.StatusOK, rw.statusCode) @@ -49,12 +43,21 @@ func (_ nonFlushableResponseWriter) Write([]byte) (int, error) { func (_ nonFlushableResponseWriter) WriteHeader(int) {} func TestRespWriterFlushNoFlusher(t *testing.T) { - rw := &respWriterWrapper{ - ResponseWriter: nonFlushableResponseWriter{}, - record: func(int64) {}, - } + rw := NewRespWriterWrapper(nonFlushableResponseWriter{}, func(int64) {}) rw.Flush() assert.Equal(t, http.StatusOK, rw.statusCode) assert.True(t, rw.wroteHeader) } + +func TestConcurrentRespWriterWrapper(t *testing.T) { + rw := NewRespWriterWrapper(&httptest.ResponseRecorder{}, func(int64) {}) + + go func() { + _, _ = rw.Write([]byte("hello world")) + }() + + assert.NotNil(t, rw.BytesWritten()) + assert.NotNil(t, rw.StatusCode()) + assert.NoError(t, rw.Error()) +} diff --git a/instrumentation/net/http/otelhttp/transport.go b/instrumentation/net/http/otelhttp/transport.go index be17d4c755d..fc4dd98f3d0 100644 --- a/instrumentation/net/http/otelhttp/transport.go +++ b/instrumentation/net/http/otelhttp/transport.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconv" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconvutil" "go.opentelemetry.io/otel" @@ -146,16 +147,12 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) { r = r.Clone(ctx) // According to RoundTripper spec, we shouldn't modify the origin request. - // use a body wrapper to determine the request size - var bw bodyWrapper // if request body is nil or NoBody, we don't want to mutate the body as it // will affect the identity of it in an unforeseeable way because we assert // ReadCloser fulfills a certain interface and it is indeed nil or NoBody. + bw := request.NewBodyWrapper(r.Body, func(int64) {}) if r.Body != nil && r.Body != http.NoBody { - bw.ReadCloser = r.Body - // noop to prevent nil panic. not using this record fun yet. - bw.record = func(int64) {} - r.Body = &bw + r.Body = bw } span.SetAttributes(t.semconv.RequestTraceAttrs(r)...) @@ -184,7 +181,7 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) { } o := metric.WithAttributeSet(attribute.NewSet(metricAttrs...)) - t.requestBytesCounter.Add(ctx, bw.read.Load(), o) + t.requestBytesCounter.Add(ctx, bw.BytesRead(), o) // For handling response bytes we leverage a callback when the client reads the http response readRecordFunc := func(n int64) { t.responseBytesCounter.Add(ctx, n, o) diff --git a/instrumentation/net/http/otelhttp/wrap.go b/instrumentation/net/http/otelhttp/wrap.go deleted file mode 100644 index 948f8406c09..00000000000 --- a/instrumentation/net/http/otelhttp/wrap.go +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package otelhttp // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" - -import ( - "context" - "io" - "net/http" - "sync/atomic" - - "go.opentelemetry.io/otel/propagation" -) - -var _ io.ReadCloser = &bodyWrapper{} - -// bodyWrapper wraps a http.Request.Body (an io.ReadCloser) to track the number -// of bytes read and the last error. -type bodyWrapper struct { - io.ReadCloser - record func(n int64) // must not be nil - - read atomic.Int64 - err error -} - -func (w *bodyWrapper) Read(b []byte) (int, error) { - n, err := w.ReadCloser.Read(b) - n1 := int64(n) - w.read.Add(n1) - w.err = err - w.record(n1) - return n, err -} - -func (w *bodyWrapper) Close() error { - return w.ReadCloser.Close() -} - -var _ http.ResponseWriter = &respWriterWrapper{} - -// respWriterWrapper wraps a http.ResponseWriter in order to track the number of -// bytes written, the last error, and to catch the first written statusCode. -// TODO: The wrapped http.ResponseWriter doesn't implement any of the optional -// types (http.Hijacker, http.Pusher, http.CloseNotifier, http.Flusher, etc) -// that may be useful when using it in real life situations. -type respWriterWrapper struct { - http.ResponseWriter - record func(n int64) // must not be nil - - // used to inject the header - ctx context.Context - - props propagation.TextMapPropagator - - written int64 - statusCode int - err error - wroteHeader bool -} - -func (w *respWriterWrapper) Header() http.Header { - return w.ResponseWriter.Header() -} - -func (w *respWriterWrapper) Write(p []byte) (int, error) { - if !w.wroteHeader { - w.WriteHeader(http.StatusOK) - } - n, err := w.ResponseWriter.Write(p) - n1 := int64(n) - w.record(n1) - w.written += n1 - w.err = err - return n, err -} - -// WriteHeader persists initial statusCode for span attribution. -// All calls to WriteHeader will be propagated to the underlying ResponseWriter -// and will persist the statusCode from the first call. -// Blocking consecutive calls to WriteHeader alters expected behavior and will -// remove warning logs from net/http where developers will notice incorrect handler implementations. -func (w *respWriterWrapper) WriteHeader(statusCode int) { - if !w.wroteHeader { - w.wroteHeader = true - w.statusCode = statusCode - } - w.ResponseWriter.WriteHeader(statusCode) -} - -func (w *respWriterWrapper) Flush() { - if !w.wroteHeader { - w.WriteHeader(http.StatusOK) - } - - if f, ok := w.ResponseWriter.(http.Flusher); ok { - f.Flush() - } -}