Skip to content

Commit

Permalink
fix superfluous write header issue
Browse files Browse the repository at this point in the history
  • Loading branch information
dleviminzi committed Dec 19, 2024
1 parent d93a6e8 commit 2c47cd8
Showing 1 changed file with 58 additions and 38 deletions.
96 changes: 58 additions & 38 deletions pkg/abstractions/endpoint/buffer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package endpoint

import (
"bufio"
"bytes"
"context"
"encoding/json"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/redis/go-redis/v9"
"github.com/rs/zerolog/log"

abstractions "github.com/beam-cloud/beta9/pkg/abstractions/common"
"github.com/beam-cloud/beta9/pkg/common"
Expand Down Expand Up @@ -414,7 +416,10 @@ func (rb *RequestBuffer) handleWSRequest(req *request, c container) {
}

func (rb *RequestBuffer) handleHttpRequest(req *request, c container) {
ctx := req.ctx.Request().Context()

request := req.ctx.Request()
response := req.ctx.Response()

requestBody := request.Body
if !rb.isASGI {
Expand Down Expand Up @@ -460,58 +465,42 @@ func (rb *RequestBuffer) handleHttpRequest(req *request, c container) {

resp, err := httpClient.Do(httpReq)
if err != nil {
req.ctx.JSON(http.StatusInternalServerError, map[string]interface{}{
"error": "Internal server error",
})
// If the context is canceled, we don't want to write a response
if ctx.Err() == nil {
req.ctx.JSON(http.StatusInternalServerError, map[string]interface{}{
"error": "Internal server error",
})
}
return
}

defer resp.Body.Close()

responseHeaders := make(http.Header)
for key, values := range resp.Header {
for _, value := range values {
responseHeaders.Add(key, value)
response.Header().Set(key, value)
}
}

// Write response headers
for key, values := range responseHeaders {
for _, value := range values {
req.ctx.Response().Writer.Header().Add(key, value)
}
}
contentType := resp.Header.Get("Content-Type")
response.WriteHeader(resp.StatusCode)

// Write status code header
req.ctx.Response().Writer.WriteHeader(resp.StatusCode)
// Check if this needs buffered streaming with flush
needsBufferedStream := strings.Contains(contentType, "text/event-stream") ||
strings.Contains(contentType, "multipart/") ||
resp.Header.Get("Transfer-Encoding") == "chunked"

// Check if we can stream the response
streamingSupported := true
flusher, ok := req.ctx.Response().Writer.(http.Flusher)
if !ok {
streamingSupported = false
}

// Send response to client in chunks
buf := make([]byte, 4096)
for {
n, err := resp.Body.Read(buf)
if n > 0 {
req.ctx.Response().Writer.Write(buf[:n])

if streamingSupported {
flusher.Flush()
if needsBufferedStream {
err := streamWithFlush(ctx, response, resp.Body)
if err != nil {
if ctx.Err() == nil {
log.Error().Err(err).Msg("error reading stream")
}
}

if err != nil {
if err != io.EOF {
req.ctx.JSON(http.StatusInternalServerError, map[string]interface{}{
"error": "Internal server error",
})
} else {
if _, err := io.Copy(response, resp.Body); err != nil {
if ctx.Err() == nil {
log.Error().Err(err).Msg("error copying response")
}

break
}
}
}
Expand Down Expand Up @@ -603,3 +592,34 @@ func forwardWSConn(src net.Conn, dst net.Conn) {
return
}
}

func streamWithFlush(ctx context.Context, response *echo.Response, body io.Reader) error {
flusher, ok := response.Writer.(http.Flusher)
if !ok {
return errors.New("streaming not supported")
}

reader := bufio.NewReader(body)
buf := make([]byte, 4096)

for {
select {
case <-ctx.Done():
return ctx.Err()
default:
n, err := reader.Read(buf)
if n > 0 {
if _, err := response.Write(buf[:n]); err != nil {
return err
}
flusher.Flush()
}
if err != nil {
if err == io.EOF {
return nil
}
return err
}
}
}
}

0 comments on commit 2c47cd8

Please sign in to comment.