diff --git a/runner/app/live/infer.py b/runner/app/live/infer.py
index aaf1b486..546d3ff0 100644
--- a/runner/app/live/infer.py
+++ b/runner/app/live/infer.py
@@ -21,11 +21,11 @@ from streamer.zeromq import ZeroMQStreamer
 
-async def main(http_port: int, stream_protocol: str, subscribe_url: str, publish_url: str, control_url: str, pipeline: str, params: dict):
+async def main(http_port: int, stream_protocol: str, subscribe_url: str, publish_url: str, control_url: str, pipeline: str, params: dict, input_timeout: int):
     if stream_protocol == "trickle":
-        handler = TrickleStreamer(subscribe_url, publish_url, pipeline, **(params or {}))
+        handler = TrickleStreamer(subscribe_url, publish_url, pipeline, input_timeout, params or {})
     elif stream_protocol == "zeromq":
-        handler = ZeroMQStreamer(subscribe_url, publish_url, pipeline, **(params or {}))
+        handler = ZeroMQStreamer(subscribe_url, publish_url, pipeline, input_timeout, params or {})
     else:
         raise ValueError(f"Unsupported protocol: {stream_protocol}")
 
@@ -39,14 +39,14 @@ async def main(http_port: int, stream_protocol: str, subscribe_url: str, publish
         logging.error(f"Stack trace:\n{traceback.format_exc()}")
         raise e
 
-    await block_until_signal([signal.SIGINT, signal.SIGTERM])
     try:
+        await asyncio.wait(
+            [block_until_signal([signal.SIGINT, signal.SIGTERM]), handler.wait()],
+            return_when=asyncio.FIRST_COMPLETED
+        )
+    finally:
         await runner.cleanup()
         await handler.stop()
-    except Exception as e:
-        logging.error(f"Error stopping room handler: {e}")
-        logging.error(f"Stack trace:\n{traceback.format_exc()}")
-        raise e
 
 
 async def block_until_signal(sigs: List[signal.Signals]):
@@ -102,6 +102,12 @@ async def start_control_subscriber(handler: PipelineStreamer, control_url: str):
     parser.add_argument(
         "--control-url", type=str, help="URL to subscribe for Control API JSON messages"
    )
+    parser.add_argument(
+        "--input-timeout",
+        type=int,
+        default=60,
+        help="Timeout in seconds to wait after input frames stop before shutting down. Set to 0 to disable."
+    )
     parser.add_argument(
         "-v",
         "--verbose",
         action="store_true",
@@ -124,7 +130,7 @@ async def start_control_subscriber(handler: PipelineStreamer, control_url: str):
 
     try:
         asyncio.run(
-            main(args.http_port, args.stream_protocol, args.subscribe_url, args.publish_url, args.control_url, args.pipeline, params)
+            main(args.http_port, args.stream_protocol, args.subscribe_url, args.publish_url, args.control_url, args.pipeline, params, args.input_timeout)
         )
     except Exception as e:
         logging.error(f"Fatal error in main: {e}")
diff --git a/runner/app/live/params_api/api.py b/runner/app/live/params_api/api.py
index 70ff7ee9..3636c00c 100644
--- a/runner/app/live/params_api/api.py
+++ b/runner/app/live/params_api/api.py
@@ -59,7 +59,7 @@ async def handle_params_update(request):
             raise ValueError(f"Unknown content type: {request.content_type}")
 
         handler = cast(PipelineStreamer, request.app["handler"])
-        handler.update_params(**params)
+        handler.update_params(params)
 
         return web.Response(text="Params updated successfully")
     except Exception as e:
diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py
index ae18ff99..b5c78932 100644
--- a/runner/app/live/streamer/streamer.py
+++ b/runner/app/live/streamer/streamer.py
@@ -15,18 +15,28 @@ class PipelineStreamer(ABC):
-    def __init__(self, pipeline: str, **params):
+    def __init__(self, pipeline: str, input_timeout: int, params: dict):
         self.pipeline = pipeline
         self.params = params
         self.process = None
         self.last_params_time = 0.0
         self.restart_count = 0
+        self.input_timeout = input_timeout  # 0 means disabled
+        self.done_future = None
 
     def start(self):
+        self.done_future = asyncio.get_running_loop().create_future()
         self._start_process()
 
+    async def wait(self):
+        if not self.done_future:
+            raise RuntimeError("Streamer not started")
+        return await self.done_future
+
     async def stop(self):
         await self._stop_process()
+        if self.done_future and not self.done_future.done():
+            self.done_future.set_result(None)
 
     def _start_process(self):
         if self.process:
@@ -68,7 +78,7 @@ async def _restart(self):
             logging.error(f"Stack trace:\n{traceback.format_exc()}")
             os._exit(1)
 
-    def update_params(self, **params):
+    def update_params(self, params: dict):
         self.params = params
         self.last_params_time = time.time()
         if self.process:
@@ -92,6 +102,11 @@ async def monitor_loop(self, done: Event):
             time_since_last_params = current_time - self.last_params_time
             time_since_reload = min(time_since_last_params, time_since_start)
 
+            if self.input_timeout > 0 and time_since_last_input > self.input_timeout:
+                logging.info(f"Input stream stopped for {time_since_last_input} seconds. Shutting down...")
+                await self.stop()
+                return
+
             gone_stale = (
                 time_since_last_output > time_since_last_input
                 and time_since_last_output > 60
diff --git a/runner/app/live/streamer/trickle.py b/runner/app/live/streamer/trickle.py
index acb73709..ed904619 100644
--- a/runner/app/live/streamer/trickle.py
+++ b/runner/app/live/streamer/trickle.py
@@ -17,9 +17,10 @@ def __init__(
         subscribe_url: str,
         publish_url: str,
         pipeline: str,
-        **params,
+        input_timeout: int,
+        params: dict,
     ):
-        super().__init__(pipeline, **params)
+        super().__init__(pipeline, input_timeout, params)
         self.subscribe_url = subscribe_url
         self.publish_url = publish_url
         self.subscribe_queue = queue.Queue[bytearray]()
diff --git a/runner/app/live/streamer/zeromq.py b/runner/app/live/streamer/zeromq.py
index c0a234ac..e11c1ca5 100644
--- a/runner/app/live/streamer/zeromq.py
+++ b/runner/app/live/streamer/zeromq.py
@@ -15,9 +15,10 @@ def __init__(
         input_address: str,
         output_address: str,
         pipeline: str,
-        **params,
+        input_timeout: int,
+        params: dict,
     ):
-        super().__init__(pipeline, **params)
+        super().__init__(pipeline, input_timeout, params)
         self.input_address = input_address
         self.output_address = output_address
diff --git a/runner/app/pipelines/base.py b/runner/app/pipelines/base.py
index a154a6f6..e2f32d78 100644
--- a/runner/app/pipelines/base.py
+++ b/runner/app/pipelines/base.py
@@ -5,8 +5,9 @@ class Pipeline(ABC):
     @abstractmethod
     def __init__(self, model_id: str, model_dir: str):
+        self.model_id: str  # declare the field here so the type hint is available when using this abstract class
         raise NotImplementedError("Pipeline should implement an __init__ method")
 
     @abstractmethod
-    def __call__(self, inputs: Any) -> Any:
+    def __call__(self, **kwargs) -> Any:
         raise NotImplementedError("Pipeline should implement a __call__ method")
diff --git a/runner/app/pipelines/live_video_to_video.py b/runner/app/pipelines/live_video_to_video.py
index 23469cb9..b1672d32 100644
--- a/runner/app/pipelines/live_video_to_video.py
+++ b/runner/app/pipelines/live_video_to_video.py
@@ -26,22 +26,26 @@ def __init__(self, model_id: str):
         self.monitor_thread = None
         self.log_thread = None
 
-    def __call__(
-        self, **kwargs
+
+    def __call__(  # type: ignore
+        self, *, subscribe_url: str, publish_url: str, control_url: str, params: dict, **kwargs
     ):
+        if self.process:
+            raise RuntimeError("Pipeline already running")
+
         try:
             if not self.process:
-                logger.info(f"Starting stream, subscribe={kwargs['subscribe_url']} publish={kwargs['publish_url']}, control={kwargs['control_url']}")
+                logger.info(f"Starting stream, subscribe={subscribe_url} publish={publish_url}, control={control_url}")
                 self.start_process(
                     pipeline=self.model_id,  # we use the model_id as the pipeline name for now
                     http_port=8888,
-                    subscribe_url=kwargs["subscribe_url"],
-                    publish_url=kwargs["publish_url"],
-                    control_url=kwargs["control_url"],
-                    initial_params=json.dumps(kwargs["params"]),
+                    subscribe_url=subscribe_url,
+                    publish_url=publish_url,
+                    control_url=control_url,
+                    initial_params=json.dumps(params),
                     # TODO: set torch device from self.torch_device
                 )
-                logger.info(f"Starting stream, subscribe={kwargs['subscribe_url']} publish={kwargs['publish_url']}, control={kwargs['control_url']}")
+                logger.info(f"Starting stream, subscribe={subscribe_url} publish={publish_url}, control={control_url}")
             return
         except Exception as e:
             raise InferenceError(original_exception=e)
@@ -84,6 +88,10 @@ def monitor_process(self):
                     logger.error(
                         f"infer.py process failed with return code {return_code}. Error: {stderr}"
                     )
+                else:
+                    # If the process exited cleanly (return code 0), exit the main process too
+                    logger.info("infer.py process exited cleanly, shutting down...")
+                    sys.exit(0)
                 break
 
             logger.info("infer.py process is running...")
diff --git a/runner/app/routes/live_video_to_video.py b/runner/app/routes/live_video_to_video.py
index 5a4fa34c..51f03b30 100644
--- a/runner/app/routes/live_video_to_video.py
+++ b/runner/app/routes/live_video_to_video.py
@@ -1,7 +1,6 @@
 import logging
 import os
-import random
-from typing import Annotated, Dict, Tuple, Union
+from typing import Annotated, Any, Dict, Tuple, Union
 
 import torch
 import traceback
@@ -54,18 +53,18 @@ class LiveVideoToVideoParams(BaseModel):
     model_id: Annotated[
         str,
         Field(
-            default="", description="Hugging Face model ID used for image generation."
+            default="", description="Name of the pipeline to run in the live video to video job. Notice that this is named model_id for consistency with other routes, but it does not refer to a Hugging Face model ID. The exact model(s) depends on the pipeline implementation and might be configurable via the `params` argument."
         ),
     ]
     params: Annotated[
         Dict,
         Field(
             default={},
-            description="Initial parameters for the model."
+            description="Initial parameters for the pipeline."
         ),
     ]
 
-RESPONSES = {
+RESPONSES: dict[int | str, dict[str, Any]] = {
     status.HTTP_200_OK: {
         "content": {
             "application/json": {
@@ -84,9 +83,9 @@ class LiveVideoToVideoParams(BaseModel):
     "/live-video-to-video",
     response_model=LiveVideoToVideoResponse,
     responses=RESPONSES,
-    description="Apply video-like transformations to a provided image.",
+    description="Apply transformations to a live video streamed to the returned endpoints.",
     operation_id="genLiveVideoToVideo",
-    summary="Video To Video",
+    summary="Live Video To Video",
     tags=["generate"],
     openapi_extra={"x-speakeasy-name-override": "liveVideoToVideo"},
 )
@@ -119,10 +118,8 @@ async def live_video_to_video(
             ),
         )
 
-    seed = random.randint(0, 2**32 - 1)
-    kwargs = {k: v for k, v in params.model_dump().items()}
     try:
-        pipeline(**kwargs)
+        pipeline(**params.model_dump())
     except Exception as e:
         if isinstance(e, torch.cuda.OutOfMemoryError):
             torch.cuda.empty_cache()
diff --git a/runner/app/routes/utils.py b/runner/app/routes/utils.py
index d1c20f13..11833bb6 100644
--- a/runner/app/routes/utils.py
+++ b/runner/app/routes/utils.py
@@ -230,9 +230,9 @@ def json_str_to_np_array(
 
 def handle_pipeline_exception(
     e: object,
-    default_error_message: Union[str, Dict[str, object]] = "Pipeline error.",
+    default_error_message: Union[str, Dict[str, object], None] = "Pipeline error.",
     default_status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR,
-    custom_error_config: Dict[str, Tuple[str, int]] = None,
+    custom_error_config: Dict[str, Tuple[str | None, int]] | None = None,
 ) -> JSONResponse:
     """Handles pipeline exceptions by returning a JSON response with the
     appropriate error message and status code.
diff --git a/runner/gateway.openapi.yaml b/runner/gateway.openapi.yaml
index cd3da7b0..a53e7918 100644
--- a/runner/gateway.openapi.yaml
+++ b/runner/gateway.openapi.yaml
@@ -415,8 +415,9 @@ paths:
     post:
       tags:
       - generate
-      summary: Video To Video
-      description: Apply video-like transformations to a provided image.
+      summary: Live Video To Video
+      description: Apply transformations to a live video streamed to the returned
+        endpoints.
       operationId: genLiveVideoToVideo
       requestBody:
         content:
@@ -933,12 +934,16 @@ components:
         model_id:
           type: string
           title: Model Id
-          description: Hugging Face model ID used for image generation.
+          description: Name of the pipeline to run in the live video to video job.
+            Notice that this is named model_id for consistency with other routes,
+            but it does not refer to a Hugging Face model ID. The exact model(s) depends
+            on the pipeline implementation and might be configurable via the `params`
+            argument.
           default: ''
         params:
           type: object
           title: Params
-          description: Initial parameters for the model.
+          description: Initial parameters for the pipeline.
           default: {}
       type: object
       required:
diff --git a/runner/openapi.yaml b/runner/openapi.yaml
index c83cafcf..3f41d077 100644
--- a/runner/openapi.yaml
+++ b/runner/openapi.yaml
@@ -426,8 +426,9 @@ paths:
     post:
       tags:
       - generate
-      summary: Video To Video
-      description: Apply video-like transformations to a provided image.
+      summary: Live Video To Video
+      description: Apply transformations to a live video streamed to the returned
+        endpoints.
       operationId: genLiveVideoToVideo
       requestBody:
         content:
@@ -950,12 +951,16 @@ components:
         model_id:
          type: string
           title: Model Id
-          description: Hugging Face model ID used for image generation.
+          description: Name of the pipeline to run in the live video to video job.
+            Notice that this is named model_id for consistency with other routes,
+            but it does not refer to a Hugging Face model ID. The exact model(s) depends
+            on the pipeline implementation and might be configurable via the `params`
+            argument.
           default: ''
         params:
           type: object
           title: Params
-          description: Initial parameters for the model.
+          description: Initial parameters for the pipeline.
           default: {}
       type: object
       required:
diff --git a/runner/requirements.txt b/runner/requirements.txt
index 72674276..26247a7f 100644
--- a/runner/requirements.txt
+++ b/runner/requirements.txt
@@ -19,3 +19,4 @@ sentencepiece== 0.2.0
 protobuf==5.27.2
 bitsandbytes==0.43.3
 psutil==6.0.0
+PyYAML==6.0.2
diff --git a/worker/docker.go b/worker/docker.go
index 54d0f290..46a8cfb4 100644
--- a/worker/docker.go
+++ b/worker/docker.go
@@ -13,7 +13,8 @@ import (
	"github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/api/types/filters"
 	"github.com/docker/docker/api/types/mount"
-	"github.com/docker/docker/client"
+	docker "github.com/docker/docker/client"
+	"github.com/docker/docker/errdefs"
 	"github.com/docker/go-connections/nat"
 )
@@ -26,19 +27,21 @@ const optFlagsContainerTimeout = 5 * time.Minute
 const containerRemoveTimeout = 30 * time.Second
 const containerCreatorLabel = "creator"
 const containerCreator = "ai-worker"
+const containerWatchInterval = 10 * time.Second
 
 // This only works right now on a single GPU because if there is another container
 // using the GPU we stop it so we don't have to worry about having enough ports
 var containerHostPorts = map[string]string{
-	"text-to-image":      "8000",
-	"image-to-image":     "8100",
-	"image-to-video":     "8200",
-	"upscale":            "8300",
-	"audio-to-text":      "8400",
-	"llm":                "8500",
-	"segment-anything-2": "8600",
-	"image-to-text":      "8700",
-	"text-to-speech":     "8800",
+	"text-to-image":       "8000",
+	"image-to-image":      "8100",
+	"image-to-video":      "8200",
+	"upscale":             "8300",
+	"audio-to-text":       "8400",
+	"llm":                 "8500",
+	"segment-anything-2":  "8600",
+	"image-to-text":       "8700",
+	"text-to-speech":      "8800",
+	"live-video-to-video": "8900",
 }
 
 // Mapping for per pipeline container images.
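As context for the docker.go hunks above and below: each pipeline gets a fixed host port from containerHostPorts, and the next hunk adds a livePipelineToImage map that createContainer later uses to pick the image for a live stream. The following is only a minimal standalone sketch of that resolution logic under stated assumptions — resolveImage is a hypothetical helper, and the maps are trimmed copies of the ones in the diff:

```go
package main

import "fmt"

// Trimmed copies of the docker.go maps, for illustration only.
var containerHostPorts = map[string]string{
	"text-to-image":       "8000",
	"live-video-to-video": "8900",
}

var livePipelineToImage = map[string]string{
	"streamdiffusion": "livepeer/ai-runner:live-app-streamdiffusion",
	"comfyui":         "livepeer/ai-runner:live-app-comfyui",
}

// resolveImage mirrors the branch added to createContainer below: ordinary
// pipelines fall back to the default image, while live-video-to-video picks
// its image from the model ID, which doubles as the live pipeline name.
func resolveImage(defaultImage, pipeline, modelID string) (string, error) {
	if pipeline != "live-video-to-video" {
		return defaultImage, nil
	}
	image, ok := livePipelineToImage[modelID]
	if !ok {
		return "", fmt.Errorf("no container image found for live pipeline %s", modelID)
	}
	return image, nil
}

func main() {
	image, err := resolveImage("livepeer/ai-runner:latest", "live-video-to-video", "comfyui")
	fmt.Println(image, err, containerHostPorts["live-video-to-video"])
}
```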
@@ -47,12 +50,18 @@ var pipelineToImage = map[string]string{
 	"text-to-speech": "livepeer/ai-runner:text-to-speech",
 }
 
+var livePipelineToImage = map[string]string{
+	"streamdiffusion": "livepeer/ai-runner:live-app-streamdiffusion",
+	"liveportrait":    "livepeer/ai-runner:live-app-liveportrait",
+	"comfyui":         "livepeer/ai-runner:live-app-comfyui",
+}
+
 type DockerManager struct {
 	defaultImage string
 	gpus         []string
 	modelDir     string
 
-	dockerClient *client.Client
+	dockerClient *docker.Client
 	// gpu ID => container name
 	gpuContainers map[string]string
 	// container name => container
@@ -61,7 +70,7 @@
 }
 
 func NewDockerManager(defaultImage string, gpus []string, modelDir string) (*DockerManager, error) {
-	dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
+	dockerClient, err := docker.NewClientWithOpts(docker.FromEnv, docker.WithAPIVersionNegotiation())
 	if err != nil {
 		return nil, err
 	}
@@ -88,30 +97,28 @@ func (m *DockerManager) Warm(ctx context.Context, pipeline string, modelID strin
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
-	_, err := m.createContainer(ctx, pipeline, modelID, true, optimizationFlags)
-	return err
+	rc, err := m.createContainer(ctx, pipeline, modelID, true, optimizationFlags)
+	if err != nil {
+		return err
+	}
+
+	// Watch with a background context since we're not borrowing the container.
+	go m.watchContainer(rc, context.Background())
+
+	return nil
 }
 
 func (m *DockerManager) Stop(ctx context.Context) error {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-
 	var stopContainerWg sync.WaitGroup
-	for name, rc := range m.containers {
+	for _, rc := range m.containers {
 		stopContainerWg.Add(1)
-		go func(containerID string) {
+		go func(container *RunnerContainer) {
 			defer stopContainerWg.Done()
-			if err := dockerRemoveContainer(m.dockerClient, containerID); err != nil {
-				slog.Error("Error removing managed container", slog.String("name", name), slog.String("id", containerID))
-			}
-		}(rc.ID)
-
-		delete(m.gpuContainers, rc.GPU)
-		delete(m.containers, name)
+			m.destroyContainer(container, false)
+		}(rc)
 	}
 
 	stopContainerWg.Wait()
-
 	return nil
 }
@@ -136,10 +143,14 @@ func (m *DockerManager) Borrow(ctx context.Context, pipeline, modelID string) (*
 	// Remove container so it is unavailable until Return() is called
 	delete(m.containers, rc.Name)
 
+	go m.watchContainer(rc, ctx)
+
 	return rc, nil
 }
 
-func (m *DockerManager) Return(rc *RunnerContainer) {
+// returnContainer returns a container to the pool so it can be reused. It is called automatically by watchContainer
+// when the context used to borrow the container is done.
+func (m *DockerManager) returnContainer(rc *RunnerContainer) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 	m.containers[rc.Name] = rc
@@ -174,6 +185,12 @@ func (m *DockerManager) createContainer(ctx context.Context, pipeline string, mo
 	containerImage := m.defaultImage
 	if pipelineSpecificImage, ok := pipelineToImage[pipeline]; ok {
 		containerImage = pipelineSpecificImage
+	} else if pipeline == "live-video-to-video" {
+		// We currently use the model ID as the live pipeline name for legacy reasons
+		containerImage = livePipelineToImage[modelID]
+		if containerImage == "" {
+			return nil, fmt.Errorf("no container image found for live pipeline %s", modelID)
+		}
 	}
 
 	slog.Info("Starting managed container", slog.String("gpu", gpu), slog.String("name", containerName), slog.String("modelID", modelID), slog.String("containerImage", containerImage))
@@ -223,6 +240,7 @@ func (m *DockerManager) createContainer(ctx context.Context, pipeline string, mo
 				},
 			},
 		},
+		AutoRemove: true,
 	}
 
 	resp, err := m.dockerClient.ContainerCreate(ctx, containerConfig, hostConfig, nil, nil, containerName)
@@ -293,15 +311,9 @@ func (m *DockerManager) allocGPU(ctx context.Context) (string, error) {
 		// If the container exists in this map then it is idle and if it not marked as keep warm we remove it
 		rc, ok := m.containers[containerName]
 		if ok && !rc.KeepWarm {
-			slog.Info("Removing managed container", slog.String("gpu", gpu), slog.String("name", containerName), slog.String("modelID", rc.ModelID))
-
-			delete(m.gpuContainers, gpu)
-			delete(m.containers, containerName)
-
-			if err := dockerRemoveContainer(m.dockerClient, rc.ID); err != nil {
+			if err := m.destroyContainer(rc, true); err != nil {
 				return "", err
 			}
-
 			return gpu, nil
 		}
 	}
@@ -309,7 +321,72 @@
 	return "", errors.New("insufficient capacity")
 }
 
-func removeExistingContainers(ctx context.Context, client *client.Client) error {
+// destroyContainer stops the container on docker and removes it from the
+// internal state. If locked is true then the mutex is not re-locked, otherwise
+// it is done automatically only when updating the internal state.
+func (m *DockerManager) destroyContainer(rc *RunnerContainer, locked bool) error {
+	slog.Info("Removing managed container",
+		slog.String("gpu", rc.GPU),
+		slog.String("name", rc.Name),
+		slog.String("modelID", rc.ModelID))
+
+	if err := dockerRemoveContainer(m.dockerClient, rc.ID); err != nil {
+		slog.Error("Error removing managed container",
+			slog.String("gpu", rc.GPU),
+			slog.String("name", rc.Name),
+			slog.String("modelID", rc.ModelID),
+			slog.String("error", err.Error()))
+		return fmt.Errorf("failed to remove container %s: %w", rc.Name, err)
+	}
+
+	if !locked {
+		m.mu.Lock()
+		defer m.mu.Unlock()
+	}
+	delete(m.gpuContainers, rc.GPU)
+	delete(m.containers, rc.Name)
+	return nil
+}
+
+// watchContainer monitors a container's running state and automatically cleans
+// up the internal state when the container stops. It will also monitor the
+// borrowCtx to return the container to the pool when it is done.
+func (m *DockerManager) watchContainer(rc *RunnerContainer, borrowCtx context.Context) {
+	defer func() {
+		if r := recover(); r != nil {
+			slog.Error("Panic in container watch routine",
+				slog.String("container", rc.Name),
+				slog.Any("panic", r))
+		}
+	}()
+
+	ticker := time.NewTicker(containerWatchInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-borrowCtx.Done():
+			m.returnContainer(rc)
+			return
+		case <-ticker.C:
+			ctx, cancel := context.WithTimeout(context.Background(), containerWatchInterval)
+			container, err := m.dockerClient.ContainerInspect(ctx, rc.ID)
+			cancel()
+			if err != nil {
+				slog.Error("Error inspecting container",
+					slog.String("container", rc.Name),
+					slog.String("error", err.Error()))
+				continue
+			} else if container.State.Running {
+				continue
+			}
+			m.destroyContainer(rc, false)
+			return
+		}
+	}
+}
+
+func removeExistingContainers(ctx context.Context, client *docker.Client) error {
 	filters := filters.NewArgs(filters.Arg("label", containerCreatorLabel+"="+containerCreator))
 	containers, err := client.ContainerList(ctx, container.ListOptions{All: true, Filters: filters})
 	if err != nil {
@@ -335,20 +412,25 @@ func dockerContainerName(pipeline string, modelID string, suffix ...string) stri
 	return fmt.Sprintf("%s_%s", pipeline, sanitizedModelID)
 }
 
-func dockerRemoveContainer(client *client.Client, containerID string) error {
+func dockerRemoveContainer(client *docker.Client, containerID string) error {
 	ctx, cancel := context.WithTimeout(context.Background(), containerRemoveTimeout)
-	if err := client.ContainerStop(ctx, containerID, container.StopOptions{}); err != nil {
-		cancel()
+	err := client.ContainerStop(ctx, containerID, container.StopOptions{})
+	cancel()
+	// Ignore "not found" or "already stopped" errors
+	if err != nil && !docker.IsErrNotFound(err) && !errdefs.IsNotModified(err) {
 		return err
 	}
-	cancel()
 
 	ctx, cancel = context.WithTimeout(context.Background(), containerRemoveTimeout)
-	defer cancel()
-	return client.ContainerRemove(ctx, containerID, container.RemoveOptions{})
+	err = client.ContainerRemove(ctx, containerID, container.RemoveOptions{})
+	cancel()
+	if err != nil && !docker.IsErrNotFound(err) {
+		return err
+	}
+	return nil
 }
 
-func dockerWaitUntilRunning(ctx context.Context, client *client.Client, containerID string, pollingInterval time.Duration) error {
+func dockerWaitUntilRunning(ctx context.Context, client *docker.Client, containerID string, pollingInterval time.Duration) error {
 	ticker := time.NewTicker(pollingInterval)
 	defer ticker.Stop()
diff --git a/worker/runner.gen.go b/worker/runner.gen.go
index 933cd0b4..4f30087b 100644
--- a/worker/runner.gen.go
+++ b/worker/runner.gen.go
@@ -247,10 +247,10 @@ type LiveVideoToVideoParams struct {
 	// ControlUrl URL for subscribing via Trickle protocol for updates in the live video-to-video generation params.
 	ControlUrl *string `json:"control_url,omitempty"`
 
-	// ModelId Hugging Face model ID used for image generation.
+	// ModelId Name of the pipeline to run in the live video to video job. Notice that this is named model_id for consistency with other routes, but it does not refer to a Hugging Face model ID. The exact model(s) depends on the pipeline implementation and might be configurable via the `params` argument.
 	ModelId *string `json:"model_id,omitempty"`
 
-	// Params Initial parameters for the model.
+	// Params Initial parameters for the pipeline.
 	Params *map[string]interface{} `json:"params,omitempty"`
 
 	// PublishUrl Destination URL of the outgoing stream to publish.
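The watchContainer hunk above replaces the old explicit Return() call: a borrowed container now goes back to the pool when the borrow context ends, and is destroyed if Docker reports it stopped. A toy stand-in illustrating just that contract (the pool type, channel, and interval are hypothetical simplifications, not the real DockerManager):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pool is a toy stand-in for DockerManager, illustrating the borrow/watch
// contract: the watcher returns the container once the borrow context is done.
type pool struct{ returned chan string }

func (p *pool) watchContainer(name string, borrowCtx context.Context) {
	ticker := time.NewTicker(10 * time.Millisecond) // containerWatchInterval stand-in
	defer ticker.Stop()
	for {
		select {
		case <-borrowCtx.Done():
			p.returned <- name // the equivalent of returnContainer(rc)
			return
		case <-ticker.C:
			// The real code inspects the container here and destroys it if it
			// is no longer running.
		}
	}
}

func main() {
	p := &pool{returned: make(chan string, 1)}
	ctx, cancel := context.WithCancel(context.Background())
	go p.watchContainer("llm_model", ctx)
	cancel() // caller is done with the request; container goes back to the pool
	fmt.Println("returned:", <-p.returned)
}
```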
@@ -2236,7 +2236,7 @@ type ServerInterface interface { // Image To Video // (POST /image-to-video) GenImageToVideo(w http.ResponseWriter, r *http.Request) - // Video To Video + // Live Video To Video // (POST /live-video-to-video) GenLiveVideoToVideo(w http.ResponseWriter, r *http.Request) // LLM @@ -2290,7 +2290,7 @@ func (_ Unimplemented) GenImageToVideo(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotImplemented) } -// Video To Video +// Live Video To Video // (POST /live-video-to-video) func (_ Unimplemented) GenLiveVideoToVideo(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotImplemented) @@ -2673,77 +2673,79 @@ func HandlerWithOptions(si ServerInterface, options ChiServerOptions) http.Handl // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+xde2/ctpb/KoR2ASfAjF9tmoWB+4eTPmKs0wa2c9MiNQYc6YyGtUTqkpTHc7P+7gse", - "UhKpxzxc2+1t56+MRyTP+3cOyaPJlygWeSE4cK2iky+RiueQU/x4+uHsOymFNJ8TULFkhWaCRyfmCQHz", - "iEhQheAKSC4SyPajUVRIUYDUDHCNXKXd6VdzcNNzUIqmYOZppjOITqL3KjV/LQvzh9KS8TS6vx9FEv5V", - "MglJdPIZV71uptSM1vPE9DeIdXQ/ik7LhIkLx2WXlYuAfzITklAzg6TAQVIzqisUjsAPWfbTLDr5/CX6", - "bwmz6CT6r4NGmwdOlQfvIWH048V5dH896tGEowSJpbzfkdaS8+UNZOoR+o1IlpMUOA68Eldwpw27A1KE", - "LH0sMkGTihsyYxkQLcgUiJaUm5FTSIxOZkLmVEcn0ZRxKpdRi7+uEUdRDpomVFNLdUbLzMz/ch+19XKa", - "JMx8pBn5TUwJ45YYE9zxUlClIDF/6DmQghWQMR76UUWrjw9j7AlLQj46XLwr05TxlHxP48pBzr4lpSFs", - "HKXSR1F5SU3aDk36SEvQpeQTzXJQmuaFCnnQsoQOHxc4hzRzLPl5YBKi4U7vk8uyKIQ03nRLsxLUCdlT", - "wDXwGPZGZG8hZLI3IsbNiWWKTIXIgHLyYs8Q3zPP9mY0U7D3cp98azkjTBH3+EWz3sv9aiTJgXJFuPCY", - "3HfU3DPzeTylaLVmjKc1J+VVo5l1MNAJjD6/XxEeZzlN4UrgP934SEuWUB7DRMU0g8BMr/dftW30HY9F", - "KWkKynmKrjEECMvxQZwJBdmSZIzfNM5r7EYKKfJCkxdzls5BOtuRnC6JhKSM3RLkXyXNmF6+9PX2g+OT", - "XCKftby8zKcgjbysEnAg0u3aWhjO2WxJFkzPO3E1HO5Wfz2+jutOVujxqKvHbyGVgMws5iy2bDQIaTll", - "ihSlmqMKF1QmCkcxzjSjmR2z3+aPrFdTJiRVayDhlJyLi1Py4lwsxheU35DThBYakemlMzzlCWFakVhI", - "mx0TE2ULYOlcY+BaIbwEQ767o3mRwQn5Qn6NMqqB63EsuGLKBNryIIvzseFurJK77NfohBztH47IrxEH", - "yX5TBwW7g2xMpR5XT4/vfQWco2BPhoMdeTaEQg4p1ewWJtb51zBx1YTJC/USw6tkCZDFnGrzF9zFWZkA", - "mUmR96j4LOVCGg+akdAhya/l4eFXMTny2f7RsUY+WNb6uC/ziY3rSQGyT4ajtgg/oqsRMasAwceIAqQT", - "L2CkzMmZHfwBZIcdxjWk1nuRHz4DCSiahlZqOTo8HOYnAS6YMjbGifvkvZBgP5NSlTQzqAUUMctBlIOi", - "SpRpqYnKxAIkqbkwyyRlhpE7XZp8AzzV84581XhyiVz3SeerdxOvWOWTwzZVdAZ6OYnnEN8EyjOpr629", - "DyANJppEitMITkNXVJrliPuzNnYZWCizxJQwYjYDroyTCUnmVOazMvPZvLSrvkVmamZdtkZuAZKuRi7B", - "haWkPBE5sfg2oAozuFffla0CLRzu/88AXIuZLUWaMo0WRcaaJCehsrG1zItD8+QoSGSXFc0ONrfyflEZ", - "0Ca2ngIgyOzrK4D+AnnjtFmL/miZ8xEL1Nokm8Ly70LjYZJDUdey7TqTbljT/ZMlILomnbVA8Zu+DdlM", - "0hwUArKCWPAE3TuoQ27N8r503w/g1hzTfkDz1eteqnYkYZxgOlcbEH1nF++ju7Hv1vmH2vUxf/6hXmvZ", - "2L6cyIUZPZmW8Q3oNhdHx6/bbHysCBoT427TMGVUTnNRcm0MYNest1t+QYE2s6nQPHIwaz7mJne6mQuW", - "ZQbsGcdHHRO+t8PeINOBYH5qF0zBhJbpZACWD487dWotAk4mNEkaMA4EtuUyeRdsPNymQ4KCfJph2Tw4", - "1xa8PJZAVSV3kOKRgdMyJcMAv758OX71H1y97OqKShMLlrS89+jw+Os+PMSRW8HhJ1y7S3XLDGNTx4oU", - "c37+vptZ5kxpIZch9H2+9tHajeiDLno30eIGeNvnv/GQgt6RKzumT7GD2Ltdyt+gRtYSaB6QwTOgsI6j", - "eb9rLZWGfFKfCvfweYlDSO8x8CjSkBfG/qWEFga+bpa48gZtWEv2uIMx8wovuIQ0B65P+VLPGU+Puy4x", - "FXc9R+ckQxghXxMqJV2SlN0CJ1QRSqbirjoIcmiLVh2ZKPj5l59/ITYn+z7/RtwNnrx0iZ9VWV9Z5h+a", - "56m6mTBelLpXPrEYS1AiKzG1mcEEB7eE0suCxYjNuGWnpJBwy0SpzIeExTibaYcuo6a2RnQ8unt394m8", - "ePePT/84fvUNAtPl6ftgP/HeUD5DNv90Zx95mRksVzcTUepakSuywpnZYZUwajRoawvpzobnZhtmFrSH", - "wzSfsrQ0yrSqt26lRkTMNHDzZ1LGePoLWoN0M/WccpN3GE8z8MwQSFVxTn6ynPfFOTdOlbF/wyQWQiZq", - 
"O/EKwbgmOJNxqkHVZVS9brOxpDwF8vlwdHTtXARnO7oE7gqItR0+BTtAgjJfmq+s+RKWm4wpuArrFkeL", - "vLUy9AnqE+sGw493xy7KxcxJ5QzRioXFHCQQoLFjnzBjOPLi59EvL5scGGyncFibMw/SkbGMTiHrYewc", - "v6/r2oC1ipsjwnjCYtQ/NUMhlaLkiRttqr7DYMiUxjf+kC67luyKa5FMpExv4S12miIlH5sIUHORmToX", - "3dOuRRhX2tR+YmZYRIzD5z1XD+eWetfOm1YQnZywIn98LOrz8AceOzzyaf3jAGJpxUoefiq8ZiPw+tXf", - "6BhzI23uzjPX7Tu2Pj+sgrMnft/OS37TV/fE5gFuU4wxMSppc9XZ7SLQ7tCxu/XBBdx+B1f1RQwPwLzK", - "uKI0sGb1uLMw05Abhu49GvVaNSFMYx1Nan+gYczTpVVUjwbfXV19GGgsMY827CxJQFOWbd6FUTeJdLsw", - "vsWlIHGUvW6DbjOGI+vJ2YgzIOs/acYSXK6WekiUyhYrJWmv5xnOStJnNZ/b9gJ9fAPN9PxtBRwhv0pT", - "XbZuSn/63+AkHwf0nfU2Z5cNgR76mKW2atvpQ8iexKr6w6MNa35grG/u8U1gb+vWBY7j5bo1b1VTj3cA", - "vpViEE1W6WUYiBqtYHPJWhRqo0MLEfok6BH0/Py9L2DIrPSeNOVbezEPFvHIZGLqE3+KPUkhH9Um+UJ6", - "63vLeZL5LPdJxG4Bj5Xc6dIHKql1rVC4WHAtRTYpZbamCvt4cY7GVeUU+39MzXPLKLmSLL7BvZvQIhaZ", - "q8kSLNHdIXRmcjSeno21GLeP20mBzPnGfmvZIh9l9ufbSRe1LmvaJpW1NgyuNwQHgwbZ7H3q9FKXU3bB", - "HjMW5TRjat6YJ0wiSpvdqlGhsY5Ls6LUqbAVqQSam3LKLRPQtF8NabiyMvSTvhSljMGnyngs8pBqvQbR", - "wanpZf19L/FWKISchCrx46Hf4TcIja2gbbUnd5GuE199IYXRUl0FraaweYw8rusQLf4453kS3xkFtlnh", - "Satg9j1VN2orB7Jzq+PQAa/xDyXaGVLSxYiU3DuXak7NFHlhp75swMZwGLZmhUcO4SHr2mKlsx6qoNcL", - "YiGHih/Ux57ZyvEZS3ALa4cj33gqFZIM8MMuvLZl2zGmquFOq9ct3lfaF2utnq1Xbh5UxjSORJm9n/T6", - "hulUlLp1hYTzugbnarbokvk0B13d9lqCC6rILKNpCgmhivx4+f2n4FDALLP5RtdYwjyxKdG/mq8pbnTF", - "1hvlZnET4vZorxEhptzs3Wkcg1K2r7sisElI29BVlhVUm29PNNeQHT9enPeZElFYity1fw5yGVrsuWVu", - "S2mE6RH08Wt13LWrTap1u8HffCNjt+v3rf17dyMzeuL9wqiS8TqcvQoYzHPX7TVUWv91urkfs6Wo0yu9", - "oqVo1x69a4/+67ZHv/pbd0eTSzBbYw0EL6YLe4KAF5V4kL33f3vGNVT9ctF0ObB53t1FPGcPVAe/N+yB", - "6na9dFPoYJ69LADi+VCiDaTwIeuU5AZPVAH0BiRJwOytpTI2zgz4Z0sCd4UEhXYzaYJyNHVi5kA8r+5U", - "jNOhr5qvExxZMB1j5HQ21tVfRncVabOh1QCu3DJ/2fX77egt8oSt2ptwsipbNEXZ6hRh20Hw6HAVqcF6", - "LfSXwBV6HGbt1Ucm4uDeg/Klu8tpS/il49PX934Oj2loobpcdc1mTb2Orxz36hC/aIYiz+TKfLuudDVy", - "WFJupBdaG1y3bH/0tf6wy7axryvUq6ZvMzbYK2x5+9HeI1R98ZaJNbchjlVfZ6vPehCh41Iyvbw0rFg5", - "311dfXgDVIKs32RHWLdf1YvMtS6ie7MG47OeN6BP3dsrcf3CsSw5OT2rGyxUFJ5PFQZLTs/IRck5EjK4", - "Ztc63D/cPzQKEQVwWrDoJPpq/2j/0FiL6jmyfYDvsY61GFdBXAjVl83rl329d7NtJ5HbbYnCecNZYrYS", - "7RdhjcpB6TciWVanocCRkM36VOoDk3bH1Tva1szrnKDvrdv70MQmx+MX1qAo9vHhYYsLT+sHvymbPzZj", - "IdggIu1W4i5xsz8rM9IMG0VfPyILzTVwD/03NCEXVvuW7tHz0P3IaannQrJ/Q4KEj756HsJOWPId16YM", - "vhKCnFOZWq0fvXou6ZuCFZHKYrlh4fj4UVnoXMl3mWmGkPra/tVz+d8Z1yA5zcglyFuQFQcejGLO9QH0", - "8/X99ShSZZ5Tuax+1IFcCVKVBjRVBrurVGLQ+25sSyyqlmNOcxiLW5CSJYj8ATqMooM5dgDg2Qyg7CF6", - "2QaB6AlBw29B2BQz7n2VOBZRGqzEDYbX7Xv9IH5aFNmy6uELXpZDJKdm32Vyslfbd1C99XbjE8N6QO2Z", - "cT1sitgB+zCw7wBtW0CzL0NcCVJ3xG6JaCwMDB8ENijk8HzD4sD6Oi58+fV5Av6PqOP6OoR2Uf8nL+d2", - "0PNg6HlgLcWCCPWB57Z+770XeX7oe9t7q6KjejvyeTDIUntmEArPHnbwsys6niDy67eMHxb6VWCMooOM", - "3cI4bFFbt/2wozN28/A9SLsvayUkPNweA42Ez4wIg01oO3DYgcPjgQP61u8Bh6wdlBYgsnyDigBvoEq8", - "5aYkozwtDVLVF7xdAMDX4zeL+bvxYrEYYzlQygx4LBJ7vbpdUWBIPnfke439u2DfBfvjBbv7eYltIzzL", - "bVC7luUxde8aj4+HY9y9luwaZPE9EMpX5Pae15ifuNzvUHzmMA9bj3eBvgv0xwv0Kvoq5ybHD4h71Q2Q", - "UXRgcvYGdw4/tDpXcdPvNaqqXhTwOoKeqLjv9hztrhd2Yf8XCXvstvodtwvaC78g2G3f1kZnfOEU/xf9", - "7Q+xC/tCaL3d102HGOWJ16oX/Mz9AFLYXrAnhYqg3eyZsSL8Txd2WLHDisfHijqEHgYWbjqiRen9vFAv", - "TLifOKl3AmS6rH7FE1+d04o0v+LWG/bNj6Q88e6gIrSrDnYR/xeJeO8HhrYM9dIPBoUMKCTX+oW3qi/1", - "bSbKhLwVeV5yppfkB6phQZeRe1EUu2HVycFBIoHm49Q+3c/c9P3YTMf264H1LzVWFUPL1gspHHdAC3Yw", - "BU0Pannvr+//PwAA///ptjPPhWoAAA==", + "H4sIAAAAAAAC/+xde3PbuK7/KhzdO5N2xs5rt9s7mTl/pN1HMzftdpL0dHe6mRxagmU2EqlDUrF9evPd", + 
"7wCUZFKSX90ke86u/6pjkQRAAD+AIOR+iWKVF0qCtCY6+RKZeAI5p4+n789+0Fpp/JyAibUorFAyOsEn", + "DPAR02AKJQ2wXCWQ7UeDqNCqAG0F0Bq5SbvTryZQTc/BGJ4CzrPCZhCdRG9Nin/NC/zDWC1kGt3fDyIN", + "/yyFhiQ6+USrXi+mNIw289ToM8Q2uh9Ep2Ui1EXFZZeVi4B/NlaacZzBUpCgOY7qCkUj6EOW/TyOTj59", + "if5bwzg6if7rYLGbB9VWHryFRPAPF+fR/fWgZycqSpA4yvsdaR05X95Aph6hX6lkfpOCpIFX6gpmFtld", + "IkXI0ociUzypuWFjkQGzio2AWc0ljhxBgnsyVjrnNjqJRkJyPY9a/HWVOIhysDzhljuqY15mOP/LfdTe", + "l9MkEfiRZ+yzGjEhHTGhZMVLwY2BBP+wE2CFKCATMrSjmlYfH6jsG5GEfHS4eFOmqZAp+5HHtYGcfc9K", + "JIyGUu9HUVtJQ9oNTfpIa7ClljdW5GAszwsT8mB1CR0+LmgOW8xx5CeBSpiFmd1nl2VRKI3WdMezEswJ", + "2zMgLcgY9gZsb6p0sjdgaObMMcVGSmXAJXu2h8T38NnemGcG9p7vs+8dZ0wYVj1+tljv+X49kuXApWFS", + "eUzuV9SqZ/h5OOKktcUYb9cqKa8WO7MOBjqO0Wf3K9zjLOcpXCn6p+sfaSkSLmO4MTHPIFDTy/0XbR39", + "IGNVap6CqSzFNhgCTOT0IM6UgWzOMiFvF8aLemOFVnlh2bOJSCegK92xnM+ZhqSMqyXYP0ueCTt/7u/b", + "TxWf7JL4bOSVZT4CjfKKWsAlnu7Wtgo5F+M5mwo76fjVcnd3+9dj67TuzYp9POru4/eQaiBmphMROzYW", + "COk4FYYVpZnQFk65TgyNElJYwTM3Zr/NH1u/TZnS3KyBhFN2ri5O2bNzNR1ecHnLThNeWEKm55XiuUyY", + "sIbFSrvomKCXTUGkE0uO64TwAgz7YcbzIoMT9oX9FmXcgrTDWEkjDDra/CCL8yFyNzTJLPstOmFH+4cD", + "9lskQYvP5qAQM8iGXNth/fT43t+AcxLs0XCwI8+GUCgh5VbcwY0z/jVMXC3c5Jl5Tu5VigTYdMIt/gWz", + "OCsTYGOt8p4tPkul0mhBYxYaJPutPDz8JmZHPtvvKtbYe8daH/dlfuP8+qYA3SfDUVuEd2RqTI1rQPAx", + "ogBdiRcwUubszA1+D7rDjpAWUme9xI8cgwYSzUIrtBwdHi7nJwGphEEd08R99lZpcJ9ZaUqeIWoBJ8yq", + "IKqColqUUWmZydQUNGu4wGWSMiPPHc0x3oBM7aQjXz2eXRLXfdL527uJVayyyeU6NXwMdn4TTyC+DTYP", + "Q197996DRkzEQErTGE0jUzRW5IT74zZ2ISyUWYIpjBqPQRo0MqXZhOt8XGY+m5du1dfETMNsFa2JW4Ck", + "uyOXULml5jJROXP4tmQrcHDvfte6CnbhcP9/lsC1GrtUZJGm8aLIxCLIaah17DTz7BCfHAWB7LKm2cHm", + "VtwvagW6wNaTAASRfX0G0J8gbxw2G9EfLHI+YILaqGRTWP5daLyc5DKva+l2nUo3zOn+LhJQXZWOW6D4", + "Xd+BbKx5DoYA2UCsZELmHeQhd7i8L92PS3BrQmE/oPniZS9VN5IJySicmw2IvnGL99Hd2Hab+MPd+hQ/", + "/1CrdWxsn07kCkffjMr4Fmybi6Pjl202PtQEUcV02kSmcMt5rkppUQFuzea45ScUpDMXCvFRBbP4McfY", + "Wc2ciixDsBeSHnVU+NYNe0VMB4L5oV0JAze8TG+WwPLhcSdPbUSgyYwnyQKMA4FduszeBAeP6tChwUA+", + "yihtXjrXJbwy1sBNLXcQ4omB0zJlywF+ffpy/OI/OHvZ5RX1TkxF0rLeo8Pjb/vwkEZuBYcfae0u1S0j", + "jAsdK0LM+fnbbmSZCGOVnofQ9+naR+tqRB908dmNVbcg2zb/nYcUfMau3Ji+jV2KvduF/A1yZKuB5wEZ", + "qgGFeRzP+01rbizkN01VuIfPSxrCesvAg8hCXqD+Sw0tDHy5WOLKG7RhLtljDqjmFVZwCWkO0p7KuZ0I", + "mR53TWKkZj2lc5YRjLBvGdeaz1kq7kAybhhnIzWrC0EV2pJWB+gFv/z6y6/MxWTf5l+p2dLKS5f4WR31", + "jWP+a+M8N7c3Qhal7ZVPTYcajMpKCm04mNHgllB2XoiYsJmO7JwVGu6EKg1+SERMs4Wt0GWwyK0JHY9m", + "b2Yf2bM3f/v4t+MX3xEwXZ6+Dc4Tb5HyGbH5b1f7yMsMsdzc3qjSNhu5Iiqc4QmrhMFiB11uoava8ASP", + "YbigKw7zfCTSEjfTbb0zKzNgamxB4p9JGVP1F6wFXc20Ey4x7giZZuCpIZCq5pz97Djv83OJRpWJf8FN", + "rJROzHbiFUpIy2imkNyCadKoZt3FwZLLFNinw8HRdWUiNLuiy2BWQGzd8BG4ARoMfolfOfUlIseIqaQJ", + "85aKFnvtZOgT1CfWdYZ3s+PKy9W4kqpSRMsXphPQwIDHFftMoOLYs18Gvz5fxMDgOEXD2px5kE6MZXwE", + "WQ9j5/R9k9cGrNXcHDEhExHT/nMcCqlWpUyq0Zj1HQZDRjy+9Yd02XVkV1yLZCoVdgtrcdMMK+UQPcBM", + "VIZ5LpmnW4sJaSzmfmqMLBLG0fOeq4dzR72r500ziE5MWBE/PhRNPfwryw4PXK1/GEAsnVjJ11eF1xwE", + "Xr74C5UxN9rNXT1z3blj6/ph7Zw9/vt6Usrbvrwnxgd0TEFlklfyxVVnt4vAVkXH7tGHFqjOO7SqL2JY", + "APMy45rSkjXrx52FhYUcGbr3aDRrNYQojHV20voDkTFvL91G9ezgm6ur90saS/DRhp0lCVguss27MJom", + "kW4Xxve0FCQVZa/boNuMUZH15FyIs0TWv/NMJLRcI/UyUWpdrJSkvZ6nOCdJn9Z8btsL9PENPLOT1zVw", + "hPway23Zuin9+X+DSj4N6Kv1LmqXCwI99ClKbdW204eQPYHV9LtHG9Z8x1jf3OOrwN3WrXOcipfr1rxV", + "TT1eAXyrjSE0WbUvy4FosSvUXLIWhdro0EKEPgl6BD0/f+sLGDKrvSeL9K29mAeLVDK5wfzEn+IqKeyD", + "2SReaG99bzlPMp/lPonEHVBZqaouveeaO9MKhYuVtFplN6XO1mRhHy7OSbmmHFH/D+Y8d4KzKy3iWzq7", + "KatilVU5WUIpelWEzjBGU/VsaNWwXW5nBTHnK/u1Y4t90NnvSBzf8Rzq4FOnqphd6FJ2OcMH7sNnNdpn", + 
"75QVMeD51DI7wcORYZLnmOpXxOtKft0z4QKwspjjaVVaMANK7oRliQLDpLLuUhIpcdab1boiP8x4bN13", + "mJslUIBMDFMylETkRQaY/Vf3nzJhOd3ojKhoPhZpqfkoA1ITzvyH2+h/MK7Tsq7GbFSta6yn2W0M3q0j", + "UtUNQ4PBgvZOez3dcpVF9thuUY4yYSYLmwwjp7F4REeJ0SQr9arSpsql4Rp4jltcLRPQdF8tM6vatKGf", + "9KUqdQw+VSFjlYdUmzWYDUrFl833vcRb/h9yEm6JDwL9Xr4BHmyF56vdtwvvHVDpwxGCiPr+azWFzYHh", + "YU2HWfXHGc+j2M4g0M0KS1oVW95yc2u2MiA3t64BL7EavxLTTgs0nw5YKb1i3KJUaNgzN/V5gzdUWwz7", + "0cI6S1hZXpuhddajLei1gljpZRkf7ceecdic0LndDSe+qRQXkgzwwy28tk+9YszUw6tdvW7xvlK/lGD2", + "nDdzfFArEw2JC3cp6zVL85EqbevejOZ1FS7NeNol83ECtr7idgSn3LBxxtMUEsYNe3f548egEoLLbH66", + "R03gE1dA8vsRGoob3Sv2ejkuji7u6pkLEWIuMSrzOAZjXDN7TWATl3auaxwrtG2+Pkldy/T44eK8T5WE", + "wlrlVc/rUi5DjT21zG0pUZgeQR/+gEKlCrPJEcVVNTY/vbkaxX2raNE9vQ0e+ZA0qGW8DmevAgZ8XrW4", + "LTtP/Hla2B+yj6rTIL6ij2rXE77rCf/z9oS/+Eu3hLNLwNOxBUa38YUrm9DtLBUP9v5vD03DNG9UjeaL", + "O9vdBcwf1vjVwe8NG7+6rT7dELo0zl4WAPFkWaANpPAh65TliCemAH4LmiWAZ2ttUMcZgn82ZzArNBjS", + "G4YJLknVCc6BeFJfJKHRka3i1wmNLISNyXM6B+v6L9y7mjQeaC1AlW7hX279fj16izxif/omnKyKFouk", + "bHWIcD0wVC9dRWppvhbaS2AKPQaz9r4nU3Fw2cPlvLrAakv4pWPT1/d+DI95qKEmXa067Bb5Or1n3buH", + "9MViKPHMrvDbdakryuFIVSM919rgjmn70tf6Ypfr3V+XqNed7jg2OCtseeXTPiPULwM4JtZcAVWs+nu2", + "utZDCB2XWtj5JbLi5HxzdfX+FXANunl9n2DdfdUsMrG2iO5xDSHHPa99n1av7MTNW9a6lOz0rKkWmyis", + "TxWIJadn7KKUkgghrrm1DvcP9w9xQ1QBkhciOom+2T/aP0RtcTshtg/o5d2hVcPaiQtl+qJ584az90K6", + "a5+qTluqqKzhLMGjRPvtX9xyMPaVSuZ1NRQkEXJRn2t7gGF3WL+Y7tS8zgj6XjW+D1WMMZ6+cAolsY8P", + "D1tceLt+8Nm4+LEZC8EBkWi3AndJh/1xmbHFsEH07QOysLj77qH/iifswu2+o3v0NHQ/SF7aidLiX5AQ", + "4aNvnoZwJSz7QVpMg6+UYudcp27Xj148lfSLhJWQymE5snB8/KAsdPoQuswshrCmV+HFU9nfmbSgJc/Y", + "Jeg70DUHHoxSzPUB9NP1/fUgMmWecz2vf8mCXSlWpwY8NYjddShB9J4NXYrFzXwoeQ5DdQdai4SQP0CH", + "QXQwobYHqs0AyR6il+uKiB4RNPy+i00x497fkopFkoYyccTwpmexH8RPiyKb142LwRuCxl2/FlphTPZy", + "+w6qt17pfGRYD6g9Ma6HnSA7YF8O7DtA2xbQ3BsgV4o1bcBbIpoIHcMHgQ0SOapvOBxYn8eFb/w+jcP/", + "EXlcX1vUzuv/zdO5HfR8NfR8ZS4lAg/1geeuedm/F3l+6nvFfauko34l9GkwyFF7YhAKaw87+NklHY/g", + "+c2r1V/n+rVjDKKDTNzBMGxRW3f86D14eB2nrvHL/8kaW2oJCQOZ0Ptsphci2r1aK2Hi63W0pLnwiVFi", + "aWPaDjB2gPFwgIFm5sDi96BG1vZMhxxZvkGqQFdTJV1/c5ZxmZYIYc3NbxcF6McCNnP82XA6nQ4pTyh1", + "BjJWibt33S5bQJJP7f7eaw47j995/AN6vPuxjW09PMudU1e9zENevXk9PF7u49VL2lXnLL0Vw+WKM0DP", + "S92PfA7oUHxiNw97kneOvnP0h3P02vtq42bHX+H3pusgg+gAY/YGlxE/tVpaqRrgdbD2p/leq9AjZfjd", + "ZqTdvcPO7f8kbk9tWL/j2sF67hc4u2vo2qj4F07x/38D97P09RuddVnQLlrHuEy8Hr7gR/+XIIVrEntU", + "qAj60J4YK8L/gmKHFTuseHisaFzo68Cimk5oUXo/ttQLE9UPvjQnATaa179pSu/UWcMWv2nX6/aLn4x5", + "5NNBTWiXHew8/k/i8d7PLW3p6qXvDIYYMESu9Xt3dcPq60yVCXut8ryUws7ZT9zClM+j6g1SapM1JwcH", + "iQaeD1P3dD+rpu/HOJ36spesf2kpq1i2bLOQoXEHvBAHI7D8oJH3/vr+/wMAAP//MrKa25NrAAA=", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/worker/worker.go b/worker/worker.go index 76d1d489..f1893d69 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -63,11 +63,12 @@ func NewWorker(defaultImage string, gpus []string, modelDir string) (*Worker, er } func (w *Worker) TextToImage(ctx context.Context, req GenTextToImageJSONRequestBody) (*ImageResponse, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() c, err := w.borrowContainer(ctx, "text-to-image", *req.ModelId) if err != nil { return nil, err } - defer w.returnContainer(c) resp, err := c.Client.GenTextToImageWithResponse(ctx, req) if err != nil { @@ -114,11 +115,12 @@ func (w *Worker) TextToImage(ctx context.Context, req GenTextToImageJSONRequestB } func (w *Worker) ImageToImage(ctx context.Context, req GenImageToImageMultipartRequestBody) (*ImageResponse, 
error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	c, err := w.borrowContainer(ctx, "image-to-image", *req.ModelId)
 	if err != nil {
 		return nil, err
 	}
-	defer w.returnContainer(c)
 
 	var buf bytes.Buffer
 	mw, err := NewImageToImageMultipartWriter(&buf, req)
@@ -171,11 +173,12 @@ func (w *Worker) ImageToImage(ctx context.Context, req GenImageToImageMultipartR
 }
 
 func (w *Worker) ImageToVideo(ctx context.Context, req GenImageToVideoMultipartRequestBody) (*VideoResponse, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	c, err := w.borrowContainer(ctx, "image-to-video", *req.ModelId)
 	if err != nil {
 		return nil, err
 	}
-	defer w.returnContainer(c)
 
 	var buf bytes.Buffer
 	mw, err := NewImageToVideoMultipartWriter(&buf, req)
@@ -233,11 +236,12 @@ func (w *Worker) ImageToVideo(ctx context.Context, req GenImageToVideoMultipartR
 }
 
 func (w *Worker) Upscale(ctx context.Context, req GenUpscaleMultipartRequestBody) (*ImageResponse, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	c, err := w.borrowContainer(ctx, "upscale", *req.ModelId)
 	if err != nil {
 		return nil, err
 	}
-	defer w.returnContainer(c)
 
 	var buf bytes.Buffer
 	mw, err := NewUpscaleMultipartWriter(&buf, req)
@@ -290,11 +294,12 @@ func (w *Worker) Upscale(ctx context.Context, req GenUpscaleMultipartRequestBody
 }
 
 func (w *Worker) AudioToText(ctx context.Context, req GenAudioToTextMultipartRequestBody) (*TextResponse, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	c, err := w.borrowContainer(ctx, "audio-to-text", *req.ModelId)
 	if err != nil {
 		return nil, err
 	}
-	defer w.returnContainer(c)
 
 	var buf bytes.Buffer
 	mw, err := NewAudioToTextMultipartWriter(&buf, req)
@@ -362,7 +367,9 @@ func (w *Worker) AudioToText(ctx context.Context, req GenAudioToTextMultipartReq
 }
 
 func (w *Worker) LLM(ctx context.Context, req GenLLMFormdataRequestBody) (interface{}, error) {
-	c, err := w.borrowContainer(ctx, "llm", *req.ModelId)
+	isStreaming := req.Stream != nil && *req.Stream
+	borrowCtx, cancel := context.WithCancel(context.Background())
+	c, err := w.borrowContainer(borrowCtx, "llm", *req.ModelId)
 	if err != nil {
 		return nil, err
 	}
@@ -378,16 +385,19 @@
 	var buf bytes.Buffer
 	mw, err := NewLLMMultipartWriter(&buf, req)
 	if err != nil {
+		cancel()
 		return nil, err
 	}
 
-	if req.Stream != nil && *req.Stream {
+	if isStreaming {
 		resp, err := c.Client.GenLLMWithBody(ctx, mw.FormDataContentType(), &buf)
 		if err != nil {
+			cancel()
 			return nil, err
 		}
-		return w.handleStreamingResponse(ctx, c, resp)
+		return w.handleStreamingResponse(ctx, c, resp, cancel)
 	}
 
+	defer cancel()
 	resp, err := c.Client.GenLLMWithBodyWithResponse(ctx, mw.FormDataContentType(), &buf)
 	if err != nil {
@@ -397,11 +407,12 @@
 }
 
 func (w *Worker) SegmentAnything2(ctx context.Context, req GenSegmentAnything2MultipartRequestBody) (*MasksResponse, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	c, err := w.borrowContainer(ctx, "segment-anything-2", *req.ModelId)
 	if err != nil {
 		return nil, err
 	}
-	defer w.returnContainer(c)
 
 	var buf bytes.Buffer
 	mw, err := NewSegmentAnything2MultipartWriter(&buf, req)
@@ -454,11 +465,12 @@ func (w *Worker) SegmentAnything2(ctx context.Context, req GenSegmentAnything2Mu
 }
 
 func (w *Worker) ImageToText(ctx context.Context, req GenImageToTextMultipartRequestBody) (*ImageToTextResponse, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	c, err := w.borrowContainer(ctx, "image-to-text", *req.ModelId)
 	if err != nil {
 		return nil, err
 	}
-	defer w.returnContainer(c)
 
 	var buf bytes.Buffer
 	mw, err := NewImageToTextMultipartWriter(&buf, req)
@@ -517,11 +529,12 @@ func (w *Worker) ImageToText(ctx context.Context, req GenImageToTextMultipartReq
 }
 
 func (w *Worker) TextToSpeech(ctx context.Context, req GenTextToSpeechJSONRequestBody) (*AudioResponse, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	c, err := w.borrowContainer(ctx, "text-to-speech", *req.ModelId)
 	if err != nil {
 		return nil, err
 	}
-	defer w.returnContainer(c)
 
 	resp, err := c.Client.GenTextToSpeechWithResponse(ctx, req)
 	if err != nil {
@@ -567,6 +580,57 @@ func (w *Worker) TextToSpeech(ctx context.Context, req GenTextToSpeechJSONReques
 	return resp.JSON200, nil
 }
 
+func (w *Worker) LiveVideoToVideo(ctx context.Context, req GenLiveVideoToVideoJSONRequestBody) (*LiveVideoToVideoResponse, error) {
+	// Live video containers keep running after the initial request, so we use a background context to borrow the container.
+	c, err := w.borrowContainer(context.Background(), "live-video-to-video", *req.ModelId)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := c.Client.GenLiveVideoToVideoWithResponse(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.JSON400 != nil {
+		val, err := json.Marshal(resp.JSON400)
+		if err != nil {
+			return nil, err
+		}
+		slog.Error("live-video-to-video container returned 400", slog.String("err", string(val)))
+		return nil, errors.New("live-video-to-video container returned 400: " + resp.JSON400.Detail.Msg)
+	}
+
+	if resp.JSON401 != nil {
+		val, err := json.Marshal(resp.JSON401)
+		if err != nil {
+			return nil, err
+		}
+		slog.Error("live-video-to-video container returned 401", slog.String("err", string(val)))
+		return nil, errors.New("live-video-to-video container returned 401: " + resp.JSON401.Detail.Msg)
+	}
+
+	if resp.JSON422 != nil {
+		val, err := json.Marshal(resp.JSON422)
+		if err != nil {
+			return nil, err
+		}
+		slog.Error("live-video-to-video container returned 422", slog.String("err", string(val)))
+		return nil, errors.New("live-video-to-video container returned 422: " + string(val))
+	}
+
+	if resp.JSON500 != nil {
+		val, err := json.Marshal(resp.JSON500)
+		if err != nil {
+			return nil, err
+		}
+		slog.Error("live-video-to-video container returned 500", slog.String("err", string(val)))
+		return nil, errors.New("live-video-to-video container returned 500: " + resp.JSON500.Detail.Msg)
+	}
+
+	return resp.JSON200, nil
+}
+
 func (w *Worker) Warm(ctx context.Context, pipeline string, modelID string, endpoint RunnerEndpoint, optimizationFlags OptimizationFlags) error {
 	if endpoint.URL == "" {
 		return w.manager.Warm(ctx, pipeline, modelID, optimizationFlags)
@@ -641,17 +705,7 @@ func (w *Worker) borrowContainer(ctx context.Context, pipeline, modelID string)
 	return w.manager.Borrow(ctx, pipeline, modelID)
 }
 
-func (w *Worker) returnContainer(rc *RunnerContainer) {
-	switch rc.Type {
-	case Managed:
-		w.manager.Return(rc)
-	case External:
-		// Noop because we allow concurrent in-flight requests for external containers
-	}
-}
-
 func (w *Worker) handleNonStreamingResponse(c *RunnerContainer, resp *GenLLMResponse) (*LLMResponse, error) {
-	defer w.returnContainer(c)
 	if resp.JSON400 != nil {
 		val, err := json.Marshal(resp.JSON400)
 		if err != nil {
@@ -688,7 +742,7 @@ type LlmStreamChunk struct {
 	Done bool `json:"done,omitempty"`
 }
 
-func (w *Worker) handleStreamingResponse(ctx context.Context, c *RunnerContainer, resp *http.Response) (<-chan LlmStreamChunk, error) {
+func (w *Worker) handleStreamingResponse(ctx context.Context, c *RunnerContainer, resp *http.Response, returnContainer func()) (<-chan LlmStreamChunk, error) {
 	if resp.StatusCode != http.StatusOK {
 		return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
 	}
@@ -697,7 +751,7 @@ func (w *Worker) handleStreamingResponse(ctx context.Context, c *RunnerContainer
 	go func() {
 		defer close(outputChan)
-		defer w.returnContainer(c)
+		defer returnContainer()
 
 		scanner := bufio.NewScanner(resp.Body)
 		totalTokens := 0
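In the streaming LLM path above, the borrow context's cancel function is handed to handleStreamingResponse and only invoked when the consumer goroutine finishes, which (via watchContainer) is what returns the container to the pool. A minimal sketch of that hand-off under stated assumptions — handleStream and its string channel are hypothetical simplifications of handleStreamingResponse and LlmStreamChunk:

```go
package main

import (
	"context"
	"fmt"
)

// handleStream mirrors the shape of handleStreamingResponse: it defers the
// returnContainer callback inside the producer goroutine, so the container is
// released only after the whole response has been streamed out.
func handleStream(lines []string, returnContainer func()) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		defer returnContainer() // like `defer returnContainer()` in the diff
		for _, l := range lines {
			out <- l
		}
	}()
	return out
}

func main() {
	// The cancel function of the borrow context doubles as the return callback.
	_, cancel := context.WithCancel(context.Background())
	for chunk := range handleStream([]string{"hello", "world"}, cancel) {
		fmt.Println(chunk)
	}
	fmt.Println("container returned")
}
```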