Skip to content

Commit

Permalink
dont store endpoint payloads in redis since they cant retry anyway
Browse files Browse the repository at this point in the history
  • Loading branch information
luke-lombardi committed Dec 21, 2024
1 parent 34cc32e commit 4ee1bd8
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 12 deletions.
19 changes: 10 additions & 9 deletions pkg/abstractions/endpoint/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,21 @@ func (rb *RequestBuffer) handleHeartbeatEvents() {
}
}

func (rb *RequestBuffer) ForwardRequest(ctx echo.Context, task *EndpointTask) error {
func (rb *RequestBuffer) ForwardRequest(ctx echo.Context, task *EndpointTask, payload *types.TaskPayload) error {
done := make(chan bool)
req := &request{
ctx: ctx,
done: done,
payload: &types.TaskPayload{
Args: task.msg.Args,
Kwargs: task.msg.Kwargs,
},
task: task,
ctx: ctx,
done: done,
payload: payload,
task: task,
}
rb.buffer.Push(req, false)

defer func() {
req.task = nil
req.payload = nil
}()

for {
select {
case <-rb.ctx.Done():
Expand Down Expand Up @@ -534,7 +536,6 @@ func (rb *RequestBuffer) heartBeat(req *request, containerId string) {
func (rb *RequestBuffer) afterRequest(req *request, containerId string) {
defer func() {
req.done <- true
req.task = nil
}()

defer rb.releaseRequestToken(containerId, req.task.msg.TaskId)
Expand Down
4 changes: 2 additions & 2 deletions pkg/abstractions/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (es *HttpEndpointService) forwardRequest(
ttl = DefaultEndpointRequestTTL
}

task, err := es.taskDispatcher.Send(ctx.Request().Context(), string(types.ExecutorEndpoint), authInfo, stubId, payload, types.TaskPolicy{
task, err := es.taskDispatcher.Send(ctx.Request().Context(), string(types.ExecutorEndpoint), authInfo, stubId, &types.TaskPayload{}, types.TaskPolicy{
MaxRetries: 0,
Timeout: instance.StubConfig.TaskPolicy.Timeout,
Expires: time.Now().Add(time.Duration(ttl) * time.Second),
Expand All @@ -209,7 +209,7 @@ func (es *HttpEndpointService) forwardRequest(
return err
}

return task.Execute(ctx.Request().Context(), ctx)
return task.Execute(ctx.Request().Context(), ctx, payload)
}

func (es *HttpEndpointService) InstanceFactory(stubId string, options ...func(abstractions.IAutoscaledInstance)) (abstractions.IAutoscaledInstance, error) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/abstractions/endpoint/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type EndpointTask struct {
func (t *EndpointTask) Execute(ctx context.Context, options ...interface{}) error {
var err error = nil
echoCtx := options[0].(echo.Context)
payload := options[1].(*types.TaskPayload)

instance, err := t.es.getOrCreateEndpointInstance(ctx, t.msg.StubId)
if err != nil {
return err
Expand All @@ -30,7 +32,7 @@ func (t *EndpointTask) Execute(ctx context.Context, options ...interface{}) erro
return err
}

return instance.buffer.ForwardRequest(echoCtx, t)
return instance.buffer.ForwardRequest(echoCtx, t, payload)
}

func (t *EndpointTask) Retry(ctx context.Context) error {
Expand Down

0 comments on commit 4ee1bd8

Please sign in to comment.