diff --git a/pkg/abstractions/endpoint/buffer.go b/pkg/abstractions/endpoint/buffer.go index 2ebab8593..e7db6d843 100644 --- a/pkg/abstractions/endpoint/buffer.go +++ b/pkg/abstractions/endpoint/buffer.go @@ -131,19 +131,21 @@ func (rb *RequestBuffer) handleHeartbeatEvents() { } } -func (rb *RequestBuffer) ForwardRequest(ctx echo.Context, task *EndpointTask) error { +func (rb *RequestBuffer) ForwardRequest(ctx echo.Context, task *EndpointTask, payload *types.TaskPayload) error { done := make(chan bool) req := &request{ - ctx: ctx, - done: done, - payload: &types.TaskPayload{ - Args: task.msg.Args, - Kwargs: task.msg.Kwargs, - }, - task: task, + ctx: ctx, + done: done, + payload: payload, + task: task, } rb.buffer.Push(req, false) + defer func() { + req.task = nil + req.payload = nil + }() + for { select { case <-rb.ctx.Done(): @@ -534,7 +536,6 @@ func (rb *RequestBuffer) heartBeat(req *request, containerId string) { func (rb *RequestBuffer) afterRequest(req *request, containerId string) { defer func() { req.done <- true - req.task = nil }() defer rb.releaseRequestToken(containerId, req.task.msg.TaskId) diff --git a/pkg/abstractions/endpoint/endpoint.go b/pkg/abstractions/endpoint/endpoint.go index 818b0c12e..185e2f22f 100644 --- a/pkg/abstractions/endpoint/endpoint.go +++ b/pkg/abstractions/endpoint/endpoint.go @@ -200,7 +200,7 @@ func (es *HttpEndpointService) forwardRequest( ttl = DefaultEndpointRequestTTL } - task, err := es.taskDispatcher.Send(ctx.Request().Context(), string(types.ExecutorEndpoint), authInfo, stubId, payload, types.TaskPolicy{ + task, err := es.taskDispatcher.Send(ctx.Request().Context(), string(types.ExecutorEndpoint), authInfo, stubId, &types.TaskPayload{}, types.TaskPolicy{ MaxRetries: 0, Timeout: instance.StubConfig.TaskPolicy.Timeout, Expires: time.Now().Add(time.Duration(ttl) * time.Second), @@ -209,7 +209,7 @@ func (es *HttpEndpointService) forwardRequest( return err } - return task.Execute(ctx.Request().Context(), ctx) + return task.Execute(ctx.Request().Context(), ctx, payload) } func (es *HttpEndpointService) InstanceFactory(stubId string, options ...func(abstractions.IAutoscaledInstance)) (abstractions.IAutoscaledInstance, error) { diff --git a/pkg/abstractions/endpoint/task.go b/pkg/abstractions/endpoint/task.go index 563f35cb3..149bd5256 100644 --- a/pkg/abstractions/endpoint/task.go +++ b/pkg/abstractions/endpoint/task.go @@ -16,6 +16,8 @@ type EndpointTask struct { func (t *EndpointTask) Execute(ctx context.Context, options ...interface{}) error { var err error = nil echoCtx := options[0].(echo.Context) + payload := options[1].(*types.TaskPayload) + instance, err := t.es.getOrCreateEndpointInstance(ctx, t.msg.StubId) if err != nil { return err @@ -30,7 +32,7 @@ func (t *EndpointTask) Execute(ctx context.Context, options ...interface{}) erro return err } - return instance.buffer.ForwardRequest(echoCtx, t) + return instance.buffer.ForwardRequest(echoCtx, t, payload) } func (t *EndpointTask) Retry(ctx context.Context) error {