Skip to content

Commit

Permalink
Feat: Add warm up endpoint to both endpoints and taskqueues
Browse files Browse the repository at this point in the history
  • Loading branch information
jsun-m committed Dec 26, 2024
1 parent ea353b9 commit 367466d
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 0 deletions.
12 changes: 12 additions & 0 deletions pkg/abstractions/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,18 @@ func (es *HttpEndpointService) forwardRequest(
return task.Execute(ctx.Request().Context(), ctx, payload)
}

func (es *HttpEndpointService) warmup(
ctx echo.Context,
stubId string,
) error {
instance, err := es.getOrCreateEndpointInstance(ctx.Request().Context(), stubId)
if err != nil {
return err
}

return instance.HandleScalingEvent(1)
}

func (es *HttpEndpointService) InstanceFactory(stubId string, options ...func(abstractions.IAutoscaledInstance)) (abstractions.IAutoscaledInstance, error) {
return es.getOrCreateEndpointInstance(es.ctx, stubId)
}
Expand Down
50 changes: 50 additions & 0 deletions pkg/abstractions/endpoint/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ func registerEndpointRoutes(g *echo.Group, es *HttpEndpointService) *endpointGro
g.GET("/:deploymentName/v:version", auth.WithAuth(group.endpointRequest))
g.GET("/public/:stubId", auth.WithAssumedStubAuth(group.endpointRequest, group.es.isPublic))

g.POST("/id/:stubId/warmup", auth.WithAuth(group.warmUpEndpoint))
g.POST("/:deploymentName/warmup", auth.WithAuth(group.warmUpEndpoint))
g.POST("/:deploymentName/latest/warmup", auth.WithAuth(group.warmUpEndpoint))
g.POST("/:deploymentName/v:version/warmup", auth.WithAuth(group.warmUpEndpoint))

return group
}

Expand Down Expand Up @@ -132,3 +137,48 @@ func (g *endpointGroup) ASGIRequest(ctx echo.Context) error {

return g.es.forwardRequest(ctx, cc.AuthInfo, stubId)
}

func (g *endpointGroup) warmUpEndpoint(ctx echo.Context) error {
cc, _ := ctx.(*auth.HttpAuthContext)

stubId := ctx.Param("stubId")
deploymentName := ctx.Param("deploymentName")
version := ctx.Param("version")

if deploymentName != "" {
var deployment *types.DeploymentWithRelated

if version == "" {
var err error
deployment, err = g.es.backendRepo.GetLatestDeploymentByName(ctx.Request().Context(), cc.AuthInfo.Workspace.Id, deploymentName, types.StubTypeEndpointDeployment, true)
if err != nil {
return apiv1.HTTPBadRequest("Invalid deployment")
}
} else {
version, err := strconv.Atoi(version)
if err != nil {
return apiv1.HTTPBadRequest("Invalid deployment version")
}

deployment, err = g.es.backendRepo.GetDeploymentByNameAndVersion(ctx.Request().Context(), cc.AuthInfo.Workspace.Id, deploymentName, uint(version), types.StubTypeEndpointDeployment)
if err != nil {
return apiv1.HTTPBadRequest("Invalid deployment")
}
}

if deployment == nil {
return apiv1.HTTPBadRequest("Invalid deployment")
}

if !deployment.Active {
return apiv1.HTTPBadRequest("Deployment is not active")
}

stubId = deployment.Stub.ExternalId
}

return g.es.warmup(
ctx,
stubId,
)
}
56 changes: 56 additions & 0 deletions pkg/abstractions/taskqueue/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ func registerTaskQueueRoutes(g *echo.Group, tq *RedisTaskQueue) *taskQueueGroup
g.POST("/:deploymentName/v:version", auth.WithAuth(group.TaskQueuePut))
g.POST("/public/:stubId", auth.WithAssumedStubAuth(group.TaskQueuePut, group.tq.isPublic))

g.POST("/id/:stubId/warmup", auth.WithAuth(group.TaskQueueWarmUp))
g.POST("/:deploymentName/warmup", auth.WithAuth(group.TaskQueueWarmUp))
g.POST("/:deploymentName/latest/warmup", auth.WithAuth(group.TaskQueueWarmUp))
g.POST("/:deploymentName/v:version/warmup", auth.WithAuth(group.TaskQueueWarmUp))

return group
}

Expand Down Expand Up @@ -92,3 +97,54 @@ func (g *taskQueueGroup) TaskQueuePut(ctx echo.Context) error {
"task_id": taskId,
})
}

func (g *taskQueueGroup) TaskQueueWarmUp(ctx echo.Context) error {
cc, _ := ctx.(*auth.HttpAuthContext)

stubId := ctx.Param("stubId")
deploymentName := ctx.Param("deploymentName")
version := ctx.Param("version")

if deploymentName != "" {
var deployment *types.DeploymentWithRelated

if version == "" {
var err error
deployment, err = g.tq.backendRepo.GetLatestDeploymentByName(ctx.Request().Context(), cc.AuthInfo.Workspace.Id, deploymentName, types.StubTypeTaskQueueDeployment, true)
if err != nil {
return apiv1.HTTPBadRequest("Invalid deployment")
}
} else {
version, err := strconv.Atoi(version)
if err != nil {
return apiv1.HTTPBadRequest("Invalid deployment version")
}

deployment, err = g.tq.backendRepo.GetDeploymentByNameAndVersion(ctx.Request().Context(), cc.AuthInfo.Workspace.Id, deploymentName, uint(version), types.StubTypeTaskQueueDeployment)
if err != nil {
return apiv1.HTTPBadRequest("Invalid deployment")
}
}

if deployment == nil {
return apiv1.HTTPBadRequest("Invalid deployment")
}

if !deployment.Active {
return apiv1.HTTPBadRequest("Deployment is not active")
}

stubId = deployment.Stub.ExternalId
}

err := g.tq.warmup(
stubId,
)
if err != nil {
return ctx.JSON(http.StatusInternalServerError, map[string]string{
"error": err.Error(),
})
}

return ctx.NoContent(http.StatusOK)
}
8 changes: 8 additions & 0 deletions pkg/abstractions/taskqueue/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ func (i *taskQueueInstance) startContainers(containersToRun int) error {
CheckpointEnabled: checkpointEnabled,
}

// Set initial keepwarm to prevent rapid spin-up/spin-down of containers
i.Rdb.SetEx(
context.Background(),
Keys.taskQueueKeepWarmLock(i.Workspace.Name, i.Stub.ExternalId, runRequest.ContainerId),
1,
time.Duration(i.StubConfig.KeepWarmSeconds)*time.Second,
)

err := i.Scheduler.Run(runRequest)
if err != nil {
log.Error().Str("instance_name", i.Name).Err(err).Msg("unable to run container")
Expand Down
9 changes: 9 additions & 0 deletions pkg/abstractions/taskqueue/taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,15 @@ func (tq *RedisTaskQueue) getStubConfig(stubId string) (*types.StubConfigV1, err
return config, nil
}

func (tq *RedisTaskQueue) warmup(stubId string) error {
instance, err := tq.getOrCreateQueueInstance(stubId)
if err != nil {
return err
}

return instance.HandleScalingEvent(1)
}

func (tq *RedisTaskQueue) put(ctx context.Context, authInfo *auth.AuthInfo, stubId string, payload *types.TaskPayload) (string, error) {
stubConfig, err := tq.getStubConfig(stubId)
if err != nil {
Expand Down

0 comments on commit 367466d

Please sign in to comment.