diff --git a/go.mod b/go.mod index 6ed0cab76..92facffa9 100644 --- a/go.mod +++ b/go.mod @@ -33,13 +33,14 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/hanwen/go-fuse/v2 v2.5.1 github.com/jmoiron/sqlx v1.3.5 + github.com/json-iterator/go v1.1.12 github.com/knadh/koanf/parsers/json v0.1.0 github.com/knadh/koanf/parsers/yaml v0.1.0 github.com/knadh/koanf/providers/file v0.1.0 github.com/knadh/koanf/providers/rawbytes v0.1.0 github.com/knadh/koanf/v2 v2.0.1 github.com/labstack/echo-contrib v0.17.1 - github.com/labstack/echo/v4 v4.12.0 + github.com/labstack/echo/v4 v4.13.3 github.com/lib/pq v1.10.9 github.com/mholt/archiver/v3 v3.5.1 github.com/mitchellh/hashstructure/v2 v2.0.2 @@ -58,7 +59,7 @@ require ( github.com/sashabaranov/go-openai v1.35.7 github.com/shirou/gopsutil/v4 v4.24.6 github.com/sirupsen/logrus v1.9.3 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/tj/assert v0.0.3 github.com/vishvananda/netlink v1.2.1-beta.2 github.com/vishvananda/netns v0.0.4 @@ -73,9 +74,9 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.31.0 go.opentelemetry.io/otel/trace v1.31.0 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 - golang.org/x/net v0.30.0 - golang.org/x/sync v0.8.0 - golang.org/x/sys v0.26.0 + golang.org/x/net v0.33.0 + golang.org/x/sync v0.10.0 + golang.org/x/sys v0.28.0 google.golang.org/grpc v1.67.1 google.golang.org/protobuf v1.35.2 gopkg.in/yaml.v2 v2.4.0 @@ -172,7 +173,6 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/josharian/native v1.1.1-0.20230202152459-5c7d0dd6ab86 // indirect github.com/jsimonetti/rtnetlink v1.4.0 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/karrick/godirwalk v1.17.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/pgzip v1.2.6 // indirect @@ -251,12 +251,12 @@ require ( go.uber.org/zap v1.27.0 // indirect go4.org/mem v0.0.0-20220726221520-4f986261bf13 // indirect go4.org/netipx v0.0.0-20231129151722-fdeea329fbba // indirect - golang.org/x/crypto v0.28.0 // indirect + golang.org/x/crypto v0.31.0 // indirect golang.org/x/mod v0.20.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/term v0.25.0 // indirect - golang.org/x/text v0.19.0 // indirect - golang.org/x/time v0.7.0 // indirect + golang.org/x/term v0.27.0 // indirect + golang.org/x/text v0.21.0 // indirect + golang.org/x/time v0.8.0 // indirect golang.org/x/tools v0.24.0 // indirect golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect golang.zx2c4.com/wireguard/windows v0.5.3 // indirect diff --git a/go.sum b/go.sum index 9572dafc3..16dceb5dd 100644 --- a/go.sum +++ b/go.sum @@ -368,6 +368,8 @@ github.com/labstack/echo-contrib v0.17.1 h1:7I/he7ylVKsDUieaGRZ9XxxTYOjfQwVzHzUY github.com/labstack/echo-contrib v0.17.1/go.mod h1:SnsCZtwHBAZm5uBSAtQtXQHI3wqEA73hvTn0bYMKnZA= github.com/labstack/echo/v4 v4.12.0 h1:IKpw49IMryVB2p1a4dzwlhP1O2Tf2E0Ir/450lH+kI0= github.com/labstack/echo/v4 v4.12.0/go.mod h1:UP9Cr2DJXbOK3Kr9ONYzNowSh7HP0aG0ShAyycHSJvM= +github.com/labstack/echo/v4 v4.13.3 h1:pwhpCPrTl5qry5HRdM5FwdXnhXSLSY+WE+YQSeCaafY= +github.com/labstack/echo/v4 v4.13.3/go.mod h1:o90YNEeQWjDozo584l7AwhJMHN0bOC4tAfg+Xox9q5g= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw= @@ -559,6 +561,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tailscale/certstore v0.1.1-0.20231202035212-d3fa0460f47e h1:PtWT87weP5LWHEY//SWsYkSO3RWRZo4OSWagh3YD2vQ= github.com/tailscale/certstore v0.1.1-0.20231202035212-d3fa0460f47e/go.mod h1:XrBNfAFN+pwoWuksbFS9Ccxnopa15zJGgXRFN90l3K4= github.com/tailscale/go-winio v0.0.0-20231025203758-c4f33415bf55 h1:Gzfnfk2TWrk8Jj4P4c1a3CtQyMaTVCznlkLZI++hok4= @@ -675,6 +679,8 @@ golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= @@ -700,6 +706,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= @@ -712,6 +720,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -743,15 +753,23 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= +golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/pkg/abstractions/endpoint/buffer.go b/pkg/abstractions/endpoint/buffer.go index 1f24ee6d1..2bcb6de12 100644 --- a/pkg/abstractions/endpoint/buffer.go +++ b/pkg/abstractions/endpoint/buffer.go @@ -3,7 +3,6 @@ package endpoint import ( "bytes" "context" - "encoding/json" "errors" "fmt" "io" @@ -22,6 +21,7 @@ import ( "github.com/beam-cloud/beta9/pkg/common" "github.com/beam-cloud/beta9/pkg/network" "github.com/beam-cloud/beta9/pkg/repository" + "github.com/beam-cloud/beta9/pkg/task" "github.com/beam-cloud/beta9/pkg/types" ) @@ -32,7 +32,6 @@ const ( type request struct { ctx echo.Context - payload *types.TaskPayload task *EndpointTask done chan bool processed bool @@ -61,6 +60,7 @@ type RequestBuffer struct { isASGI bool keyEventManager *common.KeyEventManager keyEventChan chan common.KeyEvent + httpClientCache sync.Map } func NewRequestBuffer( @@ -131,13 +131,12 @@ func (rb *RequestBuffer) handleHeartbeatEvents() { } } -func (rb *RequestBuffer) ForwardRequest(ctx echo.Context, task *EndpointTask, payload *types.TaskPayload) error { +func (rb *RequestBuffer) ForwardRequest(ctx echo.Context, task *EndpointTask) error { done := make(chan bool) req := &request{ - ctx: ctx, - done: done, - payload: payload, - task: task, + ctx: ctx, + done: done, + task: task, } rb.buffer.Push(req, false) @@ -175,7 +174,6 @@ func (rb *RequestBuffer) processRequests() { if req.ctx.Request().Context().Err() != nil { rb.cancelInFlightTask(req.task) - req.payload = nil continue } @@ -345,6 +343,10 @@ func (rb *RequestBuffer) releaseRequestToken(containerId, taskId string) error { } func (rb *RequestBuffer) getHttpClient(address string) (*http.Client, error) { + if client, exists := rb.httpClientCache.Load(address); exists { + return client.(*http.Client), nil + } + // If it isn't an tailnet address, just return the standard http client if !rb.tsConfig.Enabled || !strings.Contains(address, rb.tsConfig.HostName) { return rb.httpClient, nil @@ -355,8 +357,6 @@ func (rb *RequestBuffer) getHttpClient(address string) (*http.Client, error) { return nil, err } - // Create a custom transport that uses the established connection - // Either using tailscale or not transport := &http.Transport{ DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { return conn, nil @@ -367,6 +367,7 @@ func (rb *RequestBuffer) getHttpClient(address string) (*http.Client, error) { Transport: transport, } + rb.httpClientCache.Store(address, client) return client, nil } @@ -419,17 +420,32 @@ func (rb *RequestBuffer) handleWSRequest(req *request, c container) { func (rb *RequestBuffer) handleHttpRequest(req *request, c container) { request := req.ctx.Request() - requestBody := request.Body + var requestBody io.ReadCloser = request.Body if !rb.isASGI { - b, err := json.Marshal(req.payload) + payload, err := task.SerializeHttpPayload(req.ctx) if err != nil { + req.ctx.JSON(http.StatusBadRequest, map[string]interface{}{ + "error": err.Error(), + }) return } - requestBody = io.NopCloser(bytes.NewReader(b)) + + payloadBytes, err := json.Marshal(payload) + if err != nil { + req.ctx.JSON(http.StatusBadRequest, map[string]interface{}{ + "error": err.Error(), + }) + return + } + + requestBody = io.NopCloser(bytes.NewReader(payloadBytes)) } httpClient, err := rb.getHttpClient(c.address) if err != nil { + req.ctx.JSON(http.StatusInternalServerError, map[string]interface{}{ + "error": "Internal server error", + }) return } @@ -463,52 +479,27 @@ func (rb *RequestBuffer) handleHttpRequest(req *request, c container) { resp, err := httpClient.Do(httpReq) if err != nil { - req.ctx.JSON(http.StatusInternalServerError, map[string]interface{}{ - "error": "Internal server error", - }) + if req.ctx.Request().Context().Err() == context.Canceled { + rb.cancelInFlightTask(req.task) + return + } return } - defer resp.Body.Close() - // Write response headers + // Set response headers and status code before writing the body for key, values := range resp.Header { for _, value := range values { - req.ctx.Response().Writer.Header().Add(key, value) + req.ctx.Response().Header().Add(key, value) } } + req.ctx.Response().WriteHeader(resp.StatusCode) - // Write status code header - req.ctx.Response().Writer.WriteHeader(resp.StatusCode) - - // Check if we can stream the response - streamingSupported := true - flusher, ok := req.ctx.Response().Writer.(http.Flusher) - if !ok { - streamingSupported = false - } - - // Send response to client in chunks - buf := make([]byte, 4096) - for { - n, err := resp.Body.Read(buf) - if n > 0 { - req.ctx.Response().Writer.Write(buf[:n]) - - if streamingSupported { - flusher.Flush() - } - } - - if err != nil { - if err != io.EOF { - req.ctx.JSON(http.StatusInternalServerError, map[string]interface{}{ - "error": "Internal server error", - }) - } - - break - } + _, err = io.Copy(req.ctx.Response().Writer, resp.Body) + if err != nil { + req.ctx.JSON(http.StatusInternalServerError, map[string]interface{}{ + "error": "Internal server error", + }) } } @@ -537,7 +528,6 @@ func (rb *RequestBuffer) heartBeat(req *request, containerId string) { func (rb *RequestBuffer) afterRequest(req *request, containerId string) { defer func() { req.done <- true - req.payload = nil }() defer rb.releaseRequestToken(containerId, req.task.msg.TaskId) diff --git a/pkg/abstractions/endpoint/endpoint.go b/pkg/abstractions/endpoint/endpoint.go index 185e2f22f..f22b60dd2 100644 --- a/pkg/abstractions/endpoint/endpoint.go +++ b/pkg/abstractions/endpoint/endpoint.go @@ -2,12 +2,13 @@ package endpoint import ( "context" - "encoding/json" "errors" "fmt" "net/http" "time" + jsoniter "github.com/json-iterator/go" + "github.com/labstack/echo/v4" abstractions "github.com/beam-cloud/beta9/pkg/abstractions/common" @@ -21,6 +22,8 @@ import ( pb "github.com/beam-cloud/beta9/proto" ) +var json = jsoniter.ConfigCompatibleWithStandardLibrary + type EndpointService interface { pb.EndpointServiceServer StartEndpointServe(in *pb.StartEndpointServeRequest, stream pb.EndpointService_StartEndpointServeServer) error @@ -184,16 +187,6 @@ func (es *HttpEndpointService) forwardRequest( }) } - payload := &types.TaskPayload{} - if !instance.isASGI { - payload, err = task.SerializeHttpPayload(ctx) - if err != nil { - return ctx.JSON(http.StatusBadRequest, map[string]interface{}{ - "error": err.Error(), - }) - } - } - // Needed for backwards compatibility ttl := instance.StubConfig.TaskPolicy.TTL if ttl == 0 { @@ -209,7 +202,7 @@ func (es *HttpEndpointService) forwardRequest( return err } - return task.Execute(ctx.Request().Context(), ctx, payload) + return task.Execute(ctx.Request().Context(), ctx) } func (es *HttpEndpointService) InstanceFactory(stubId string, options ...func(abstractions.IAutoscaledInstance)) (abstractions.IAutoscaledInstance, error) { diff --git a/pkg/abstractions/endpoint/task.go b/pkg/abstractions/endpoint/task.go index 149bd5256..4bcb80a51 100644 --- a/pkg/abstractions/endpoint/task.go +++ b/pkg/abstractions/endpoint/task.go @@ -16,7 +16,6 @@ type EndpointTask struct { func (t *EndpointTask) Execute(ctx context.Context, options ...interface{}) error { var err error = nil echoCtx := options[0].(echo.Context) - payload := options[1].(*types.TaskPayload) instance, err := t.es.getOrCreateEndpointInstance(ctx, t.msg.StubId) if err != nil { @@ -32,7 +31,7 @@ func (t *EndpointTask) Execute(ctx context.Context, options ...interface{}) erro return err } - return instance.buffer.ForwardRequest(echoCtx, t, payload) + return instance.buffer.ForwardRequest(echoCtx, t) } func (t *EndpointTask) Retry(ctx context.Context) error { diff --git a/pkg/task/dispatch.go b/pkg/task/dispatch.go index e64c91553..0b0f3957a 100644 --- a/pkg/task/dispatch.go +++ b/pkg/task/dispatch.go @@ -205,7 +205,9 @@ func (d *Dispatcher) RetryTask(ctx context.Context, task types.TaskInterface) er // Hit retry limit, cancel task and resolve if taskMessage.Retries >= taskMessage.Policy.MaxRetries { - log.Info().Str("task_id", taskMessage.TaskId).Str("stub_id", taskMessage.StubId).Msg("dispatcher hit retry limit, not reinserting task") + if taskMessage.Policy.MaxRetries > 0 { + log.Info().Str("task_id", taskMessage.TaskId).Str("stub_id", taskMessage.StubId).Msg("dispatcher hit retry limit, not reinserting task") + } err = task.Cancel(ctx, types.TaskExceededRetryLimit) if err != nil { diff --git a/pkg/task/serialize.go b/pkg/task/serialize.go index 1e4c86306..d039161a4 100644 --- a/pkg/task/serialize.go +++ b/pkg/task/serialize.go @@ -1,23 +1,21 @@ package task import ( - "encoding/json" "errors" "io" "net/url" "strconv" "github.com/beam-cloud/beta9/pkg/types" + jsoniter "github.com/json-iterator/go" "github.com/labstack/echo/v4" ) func SerializeHttpPayload(ctx echo.Context) (*types.TaskPayload, error) { defer ctx.Request().Body.Close() - // Create a JSON decoder - decoder := json.NewDecoder(ctx.Request().Body) + decoder := jsoniter.NewDecoder(ctx.Request().Body) - // Decode the JSON directly from the reader payload := map[string]interface{}{} if err := decoder.Decode(&payload); err != nil { if err != io.EOF {