From e630fe246415f5bb902bcdd0e9a41a6de97a6bb4 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 12 Nov 2024 10:25:33 -0300 Subject: [PATCH 01/15] runner/requirements: Add missing PyYAML dependency Required to run gen_openapi.py. If the intent is not to install this at runtime, that should at least be documented in the README. --- runner/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/runner/requirements.txt b/runner/requirements.txt index 72674276..26247a7f 100644 --- a/runner/requirements.txt +++ b/runner/requirements.txt @@ -19,3 +19,4 @@ sentencepiece== 0.2.0 protobuf==5.27.2 bitsandbytes==0.43.3 psutil==6.0.0 +PyYAML==6.0.2 From 79b560ea0efdf6d923443e579111f2538e4e0fe3 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 13 Nov 2024 10:06:12 -0300 Subject: [PATCH 02/15] runner: A couple typing fixes --- runner/app/pipelines/base.py | 3 ++- runner/app/pipelines/live_video_to_video.py | 13 +++++++------ runner/app/routes/live_video_to_video.py | 5 ++--- runner/app/routes/utils.py | 4 ++-- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/runner/app/pipelines/base.py b/runner/app/pipelines/base.py index a154a6f6..06a6d181 100644 --- a/runner/app/pipelines/base.py +++ b/runner/app/pipelines/base.py @@ -5,8 +5,9 @@ class Pipeline(ABC): @abstractmethod def __init__(self, model_id: str, model_dir: str): + self.model_id: str # type hint so we can use the field in routes raise NotImplementedError("Pipeline should implement an __init__ method") @abstractmethod - def __call__(self, inputs: Any) -> Any: + def __call__(self, **kwargs) -> Any: raise NotImplementedError("Pipeline should implement a __call__ method") diff --git a/runner/app/pipelines/live_video_to_video.py b/runner/app/pipelines/live_video_to_video.py index c8d75e1e..b20e5a51 100644 --- a/runner/app/pipelines/live_video_to_video.py +++ b/runner/app/pipelines/live_video_to_video.py @@ -26,20 +26,21 @@ def __init__(self, model_id: str): self.monitor_thread = None self.log_thread = 
None - def __call__( - self, **kwargs + + def __call__( # type: ignore + self, *, subscribe_url: str, publish_url: str, params: dict, **kwargs ): try: if not self.process: self.start_process( pipeline=self.model_id, # we use the model_id as the pipeline name for now http_port=8888, - subscribe_url=kwargs["subscribe_url"], - publish_url=kwargs["publish_url"], - initial_params=json.dumps(kwargs["params"]), + subscribe_url=subscribe_url, + publish_url=publish_url, + initial_params=json.dumps(params), # TODO: set torch device from self.torch_device ) - logger.info(f"Starting stream, subscribe={kwargs['subscribe_url']} publish={kwargs['publish_url']}") + logger.info(f"Starting stream, subscribe={subscribe_url} publish={publish_url}") return except Exception as e: raise InferenceError(original_exception=e) diff --git a/runner/app/routes/live_video_to_video.py b/runner/app/routes/live_video_to_video.py index 9fb1b02d..4522a544 100644 --- a/runner/app/routes/live_video_to_video.py +++ b/runner/app/routes/live_video_to_video.py @@ -1,7 +1,6 @@ import logging import os -import random -from typing import Annotated, Dict, Tuple, Union +from typing import Annotated, Any, Dict, Tuple, Union import torch import traceback @@ -59,7 +58,7 @@ class LiveVideoToVideoParams(BaseModel): ), ] -RESPONSES = { +RESPONSES: dict[int | str, dict[str, Any]]= { status.HTTP_200_OK: { "content": { "application/json": { diff --git a/runner/app/routes/utils.py b/runner/app/routes/utils.py index 477696f3..2d580074 100644 --- a/runner/app/routes/utils.py +++ b/runner/app/routes/utils.py @@ -228,9 +228,9 @@ def json_str_to_np_array( def handle_pipeline_exception( e: object, - default_error_message: Union[str, Dict[str, object]] = "Pipeline error.", + default_error_message: Union[str, Dict[str, object], None] = "Pipeline error.", default_status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR, - custom_error_config: Dict[str, Tuple[str, int]] = None, + custom_error_config: Dict[str, Tuple[str | None, 
int]] | None = None, ) -> JSONResponse: """Handles pipeline exceptions by returning a JSON response with the appropriate error message and status code. From b43933c2273c0cb01ac5bebc126627c131d71419 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 13 Nov 2024 10:06:38 -0300 Subject: [PATCH 03/15] runner: Improve documentation on live_video_to_video route --- runner/app/routes/live_video_to_video.py | 12 +++++------- runner/gateway.openapi.yaml | 13 +++++++++---- runner/openapi.yaml | 13 +++++++++---- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/runner/app/routes/live_video_to_video.py b/runner/app/routes/live_video_to_video.py index 4522a544..1955eefa 100644 --- a/runner/app/routes/live_video_to_video.py +++ b/runner/app/routes/live_video_to_video.py @@ -47,14 +47,14 @@ class LiveVideoToVideoParams(BaseModel): model_id: Annotated[ str, Field( - default="", description="Hugging Face model ID used for image generation." + default="", description="Name of the pipeline to run in the live video to video job. Notice that this is named model_id for consistency with other routes, but it does not refer to a Hugging Face model ID. The exact model(s) depends on the pipeline implementation and might be configurable via the `params` argument." ), ] params: Annotated[ Dict, Field( default={}, - description="Initial parameters for the model." + description="Initial parameters for the pipeline." 
), ] @@ -77,9 +77,9 @@ class LiveVideoToVideoParams(BaseModel): "/live-video-to-video", response_model=LiveVideoToVideoResponse, responses=RESPONSES, - description="Apply video-like transformations to a provided image.", + description="Apply transformations to a live video streamed to the returned endpoints.", operation_id="genLiveVideoToVideo", - summary="Video To Video", + summary="Live Video To Video", tags=["generate"], openapi_extra={"x-speakeasy-name-override": "liveVideoToVideo"}, ) @@ -112,10 +112,8 @@ async def live_video_to_video( ), ) - seed = random.randint(0, 2**32 - 1) - kwargs = {k: v for k, v in params.model_dump().items()} try: - pipeline(**kwargs) + pipeline(**params.model_dump()) except Exception as e: if isinstance(e, torch.cuda.OutOfMemoryError): torch.cuda.empty_cache() diff --git a/runner/gateway.openapi.yaml b/runner/gateway.openapi.yaml index 27629f0d..96396d55 100644 --- a/runner/gateway.openapi.yaml +++ b/runner/gateway.openapi.yaml @@ -415,8 +415,9 @@ paths: post: tags: - generate - summary: Video To Video - description: Apply video-like transformations to a provided image. + summary: Live Video To Video + description: Apply transformations to a live video streamed to the returned + endpoints. operationId: genLiveVideoToVideo requestBody: content: @@ -927,12 +928,16 @@ components: model_id: type: string title: Model Id - description: Hugging Face model ID used for image generation. + description: Name of the pipeline to run in the live video to video job. + Notice that this is named model_id for consistency with other routes, + but it does not refer to a Hugging Face model ID. The exact model depends + on the pipeline implementation and might be configurable via the `params` + argument. default: '' params: type: object title: Params - description: Initial parameters for the model. + description: Initial parameters for the pipeline. 
default: {} type: object required: diff --git a/runner/openapi.yaml b/runner/openapi.yaml index 7a6121c8..51195b73 100644 --- a/runner/openapi.yaml +++ b/runner/openapi.yaml @@ -426,8 +426,9 @@ paths: post: tags: - generate - summary: Video To Video - description: Apply video-like transformations to a provided image. + summary: Live Video To Video + description: Apply transformations to a live video streamed to the returned + endpoints. operationId: genLiveVideoToVideo requestBody: content: @@ -944,12 +945,16 @@ components: model_id: type: string title: Model Id - description: Hugging Face model ID used for image generation. + description: Name of the pipeline to run in the live video to video job. + Notice that this is named model_id for consistency with other routes, + but it does not refer to a Hugging Face model ID. The exact model depends + on the pipeline implementation and might be configurable via the `params` + argument. default: '' params: type: object title: Params - description: Initial parameters for the model. + description: Initial parameters for the pipeline. 
default: {} type: object required: From 50dd6328671e989b44ed2b5130317e34be4bc809 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 13 Nov 2024 11:08:16 -0300 Subject: [PATCH 04/15] worker: Implement API for live-video-to-video (naive) --- worker/docker.go | 31 ++++++--- worker/runner.gen.go | 150 ++++++++++++++++++++++--------------------- worker/worker.go | 51 +++++++++++++++ 3 files changed, 149 insertions(+), 83 deletions(-) diff --git a/worker/docker.go b/worker/docker.go index 54d0f290..6ebc5776 100644 --- a/worker/docker.go +++ b/worker/docker.go @@ -30,15 +30,16 @@ const containerCreator = "ai-worker" // This only works right now on a single GPU because if there is another container // using the GPU we stop it so we don't have to worry about having enough ports var containerHostPorts = map[string]string{ - "text-to-image": "8000", - "image-to-image": "8100", - "image-to-video": "8200", - "upscale": "8300", - "audio-to-text": "8400", - "llm": "8500", - "segment-anything-2": "8600", - "image-to-text": "8700", - "text-to-speech": "8800", + "text-to-image": "8000", + "image-to-image": "8100", + "image-to-video": "8200", + "upscale": "8300", + "audio-to-text": "8400", + "llm": "8500", + "segment-anything-2": "8600", + "image-to-text": "8700", + "text-to-speech": "8800", + "live-video-to-video": "8900", } // Mapping for per pipeline container images. 
@@ -47,6 +48,12 @@ var pipelineToImage = map[string]string{ "text-to-speech": "livepeer/ai-runner:text-to-speech", } +var livePipelineToImage = map[string]string{ + "streamdiffusion": "livepeer/ai-runner:live-app-streamdiffusion", + "liveportrait": "livepeer/ai-runner:live-app-liveportrait", + "comfyui": "livepeer/ai-runner:live-app-comfyui", +} + type DockerManager struct { defaultImage string gpus []string @@ -174,6 +181,12 @@ func (m *DockerManager) createContainer(ctx context.Context, pipeline string, mo containerImage := m.defaultImage if pipelineSpecificImage, ok := pipelineToImage[pipeline]; ok { containerImage = pipelineSpecificImage + } else if pipeline == "live-video-to-video" { + // We currently use the model ID as the live pipeline name for legacy reasons + containerImage = livePipelineToImage[modelID] + if containerImage == "" { + return nil, fmt.Errorf("no container image found for live pipeline %s", modelID) + } } slog.Info("Starting managed container", slog.String("gpu", gpu), slog.String("name", containerName), slog.String("modelID", modelID), slog.String("containerImage", containerImage)) diff --git a/worker/runner.gen.go b/worker/runner.gen.go index ea23ca2c..9c74058a 100644 --- a/worker/runner.gen.go +++ b/worker/runner.gen.go @@ -244,10 +244,10 @@ type LLMResponse struct { // LiveVideoToVideoParams defines model for LiveVideoToVideoParams. type LiveVideoToVideoParams struct { - // ModelId Hugging Face model ID used for image generation. + // ModelId Name of the pipeline to run in the live video to video job. Notice that this is named model_id for consistency with other routes, but it does not refer to a Hugging Face model ID. The exact model depends on the pipeline implementation and might be configurable via the `params` argument. ModelId *string `json:"model_id,omitempty"` - // Params Initial parameters for the model. + // Params Initial parameters for the pipeline. 
Params *map[string]interface{} `json:"params,omitempty"` // PublishUrl Destination URL of the outgoing stream to publish. @@ -2230,7 +2230,7 @@ type ServerInterface interface { // Image To Video // (POST /image-to-video) GenImageToVideo(w http.ResponseWriter, r *http.Request) - // Video To Video + // Live Video To Video // (POST /live-video-to-video) GenLiveVideoToVideo(w http.ResponseWriter, r *http.Request) // LLM @@ -2284,7 +2284,7 @@ func (_ Unimplemented) GenImageToVideo(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotImplemented) } -// Video To Video +// Live Video To Video // (POST /live-video-to-video) func (_ Unimplemented) GenLiveVideoToVideo(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotImplemented) @@ -2667,76 +2667,78 @@ func HandlerWithOptions(si ServerInterface, options ChiServerOptions) http.Handl // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+xde28bN7b/KsTcCzgBJFt2m+bCwP7hJG1jXLsNLHvTIjUEauZoxHqGnCU5lrS5/u4X", - "PJwHOQ89XNvd7eqvyBqS5/07h+QZ5WsQijQTHLhWwenXQIVzSCl+PPt0/r2UQprPEahQskwzwYNT84SA", - "eUQkqExwBSQVESSHwSDIpMhAaga4Rqri9vTrORTTU1CKxmDmaaYTCE6DSxWbv1aZ+UNpyXgcPDwMAgn/", - "yJmEKDj9gqve1lMqRqt5Yvo7hDp4GARnecTEVcFlm5Urj38yE5JQM4PEwEFSM6otFI7AD0ny8yw4/fI1", - "+G8Js+A0+K+jWptHhSqPLiFi9ObqIni4HXRooqAEkaV82JLWknPl9WTqEPqdiFaTGDgOvBbXsNSG3R4p", - "fJZuskTQqOSGzFgCRAsyBaIl5WbkFCKjk5mQKdXBaTBlnMpV0OCvbcRBkIKmEdXUUp3RPDHzvz4ETb2c", - "RREzH2lCfhdTwrglxgQveMmoUhCZP/QcSMYySBj3/aik1cWHMfaERT4fLS4+5nHMeEx+oGHpIOcfSG4I", - "G0cp9ZGVXlKRtkOjLtISdC75RLMUlKZppnwetMyhxccVziH1HEt+7pmEaFjqQzLOs0xI4033NMlBnZID", - "BVwDD+FgQA4WQkYHA2LcnFimyFSIBCgnrw4M8QPz7GBGEwUHrw/JB8sZYYoUj1/V670+LEeSFChXhAuH", - "ycOCWvHMfB5OKVqtHuNorZDyutbMJhhoBUaX368Jj/OUxnAt8J92fMQ5iygPYaJCmoBnpreHb5o2+p6H", - "Ipc0BlV4iq4wBAhL8UGYCAXJiiSM39XOa+xGMinSTJNXcxbPQRa2IyldEQlRHhZLkH/kNGF69drV248F", - "n2SMfFby8jydgjTyslLAnki3a2thOGezFVkwPW/FVX+4W/11+DquO1mjx+O2Hj9ALAGZWcxZaNmoEdJy", - 
"yhTJcjVHFS6ojBSOYpxpRhM75rDJH9mspkRIqjZAwhm5EFdn5NWFWAyvKL8jZxHNNCLT68LwlEeEaUVC", - "IW12jEyULYDFc42Ba4VwEgz5fknTLIFT8pX8FiRUA9fDUHDFlAm01VESpkPD3VBFy+S34JQcH44G5LeA", - "g2S/q6OMLSEZUqmH5dOTB1cBFyjYs+FgS54toZBDTDW7h4l1/g1MXNdh8kq9xvDKWQRkMafa/AXLMMkj", - "IDMp0g4Vn8dcSONBM+I7JPktH42+Ccmxy/ZPBWvkk2Wti/s8ndi4nmQgu2Q4borwE7oaEbMSEFyMyEAW", - "4nmM5Ck5t4M/gWyxw7iG2Hov8sNnIAFF09BILcejUT8/EXDBlLExTjwkl0KC/UxyldPEoBZQxKwCogoo", - "KkWZ5pqoRCxAkooLs0yUJxi505XJN8BjPW/JV44nY+S6SzpXvdt4xTqf7LepojPQq0k4h/DOU55JfU3t", - "fQJpMNEkUpxGcBq6otIsRdyfNbHLwEKeRKaEEbMZcGWcTEgypzKd5YnL5tiu+h6ZqZgtsjVyCxC1NTKG", - "Iiwl5ZFIicW3HlWYwZ36Lm3laWF0+D89cC1mthSpyzSaZQmrk5yE0sbWMq9G5smxl8jGJc0WNjfyflYa", - "0Ca2jgLAy+ybK4DuAnnrtFmJ/mSZ8wkL1Mok28LyH0LjfpJ9Udew7SaTblnT/Z1FINomnTVA8buuDdlM", - "0hQUArKCUPAI3durQ+7N8q50P/Tg1hzTvkfzzdtOqnYkYZxgOldbEP1oF++iu7XvVvmH2vUxf/6pXmvZ", - "2L2cSIUZPZnm4R3oJhfHJ2+bbNyUBI2JcbdpmDIqp6nIuTYGsGtW2y23oECb2VRoHhUwaz6mJncWMxcs", - "SQzYM46PWia8tMPeIdOeYG5qF0zBhObxpAeWRyetOrUSAScTGkU1GHsC23KZfPQ2HsWmQ4KCdJpg2dw7", - "1xa8PJRAVSm3l+KRgbM8Jv0Av7l8OXnzb1y97OuKUhMLFjW893h08m0XHuLIneDwM67dprpjhrGpY02K", - "ubi4bGeWOVNayJUPfV9uXbQuRnRBF11OtLgD3vT57xykoEtybcd0KbYXe3dL+VvUyFoCTT0yeAbk13E0", - "7XatldKQTqpT4Q4+xziEdB4DDwINaWbsn0toYODbeolrZ9CWtWSHOxgzr/GCMcQpcH3GV3rOeHzSdomp", - "WHYcnZMEYYR8S6iUdEVidg+cUEUomYpleRBUoC1adWCi4Jdff/mV2Jzs+vw7sew9eWkTPy+zvrLMPzbP", - "U3U3YTzLdad8YjGUoESSY2ozgwkObgilVxkLEZtxy05JJuGeiVyZDxELcTbTBboM6toa0fF4+XH5mbz6", - "+LfPfzt58x0C0/js0ttPXBrK58jmv9zZR5onBsvV3UTkulLkmqxwbnZYOQxqDdraQhZnw3OzDTML2sNh", - "mk5ZnBtlWtVbt1IDImYauPkzykM8/QWtQRYz9Zxyk3cYjxNwzOBJVXJOfracd8U5N06VsH/CJBRCRmo3", - "8TLBuCY4k3GqQVVlVLVuvbGkPAbyZTQ4vi1cBGcXdAksMwi1HT4FO0CCMl+ar6z5IpaajCm48uuWghZ5", - "b2XoEtQl1g6Gn5YnRZSLWSFVYYhGLCzmIIEADQv2CTOGI69+Gfz6us6B3nYKhzU5cyAdGUvoFJIOxi7w", - "+6qu9VgruTkmjEcsRP1TMxRiKXIeFaNN1TfyhkxpeOcOabNrya65FklEzPQO3mKnKZLzoYkANReJqXPR", - "Pe1ahHGlTe0nZoZFxDh83nH1cGGpt+28bQXRyglr8sdNVp2HP/LY4YlP658GEHMrVvT4U+ENG4G3b/6D", - 
"jjG30ub+PHPTvmPn88MyODvi9/0853dddU9oHuA2xRgTo5LWV53tLgJdHDq2tz64QLHfwVVdEf0DMKcy", - "Lin1rFk+bi3MNKSGoQeHRrVWRQjTWEuT2h1oGHN0aRXVocGP19efehpLzKMtO0si0JQl23dhVE0i7S6M", - "D7gURAVlp9ug3YxRkHXkrMXpkfXvNGERLldJ3SdKaYu1kjTXcwxnJemymsttc4EuvoEmev6+BA6fX6Wp", - "zhs3pT//r3eSjwO6znrrs8uaQAd9zFI7te10IWRHYlXd4dGENTcwNjf3uCawt3WbAqfg5bYxb11Tj3MA", - "vpNiEE3W6aUfiGqtYHPJRhRqokMDEbok6BD04uLSFdBnVjpP6vKtuZgDi3hkMjH1iTvFnqSQG7VNvpDO", - "+s5yjmQuy10SsXvAY6XidOkTldS6VqNb7k/dk2YVVxVtkxQapXfRZYGDQYOsdxEVUFeFiV2wQyFZPk2Y", - "mk9ymbQ974OpN7i9Rry5uigTlsh1LGxtJ4GmpjAplvFo2q/IjUw6K6J8avulukmPRS5DcKkyHorUp1qt", - "QbR3/jiuvu8k3nAqnxNfJa5ndbvOFk62E0gkpm7DE9WhFsOuKxjfU5/WgESLP8+EL23BdUBxSdWd2slw", - "dm55oNdjLXdb3cR4SRcDknPnZKU+91HklZ36ug5yw6HfXORvmv1jwo3ptrUeqqDT7qGQfekb9XFgNiN8", - "xiLchNnhyDeeq/gkvbi1C29sOi4YU+XwQqu3Dd7X2herhY7NQ2oelMYMBdeU2Rs2p/OVTkWuG5cgOK9t", - "cK5mizaZz3PQ5X2lJbigiswSGscQEarIT+MfPnvbWrPM9ls1YwnzxKYi93K5orjVJVFnXJvFTVDbw6la", - "hJBys/ukYQhK2c7kksA2QWxDV1lWUG2uPdFcfXa8ubroMqVhEk+0bQNjL5e+xV5a5qaURpgOQZ++2sR9", - "p9qm3rRb1O1LcbvhfGjsQNul+OCZK95BKeOtP3sdMJjnRb9SX3H41+lHfsqmmFa375qmmH2D777B96/b", - "4PvmP7q/l4zBbEk1ELxazfD+0l614VHswf8dGNdQ1esx01XPpnV/mv6SXTwt/N6yi6fdt9FOob15dpwB", - "hPO+ROtJ4ULWGUkNnqgM6B1IEoHZNUtlbJwY8E9WBJaZBIV2M2mCcjR1ZOZAOC9vBYzToa+aryMcmTEd", - "YuS0ttLlX0Z3JWmzhdUARbll/rLrd9vRWeQZm4234WRdtqiLsvUpwjY0GJJrSfXWa76/eK7Q4TAbD+8T", - "EXon95SvituIpoRfWz59++Dm8JD6FqrK1aJdqq7X8aXZTh3iF/VQ5Jlcm283la5GDkuqGOmE1hYXBrsf", - "OW0+ZLKN2JsK9bJt2Yz19go7nt839whlZ7dlYsN5fsGqq7P1Zz2I0GEumV6NDStWzo/X15/eAZUgq3ex", - "EdbtV9Uic62z4MGswfis4x3es+L9i7B6ZVbmnJydVy0CKvDPpzKDJWfn5CrnHAkZXLNrjQ5HhyOjEJEB", - "pxkLToNvDo8PR8ZaVM+R7SN8E3OoxbAM4kyormxeva7qvF1se2GK3ZbICm84j8xWovkqp1E5KP1ORNhp", - "GQpuSmk8OsesT6U+Mml3WL5lbM28yQm63ht98E1scjx+YQ2KYp+MRg0uHK0f/a5s/tiOBW+DiLQbiTvH", - "zf4sT0g9bBB8+4Qs1BeZHfTf0YhcWe1buscvQ/eG01zPhWT/hAgJH3/zMoQLYcn3XJsy+FoIckFlbLV+", - "/OalpK8LVkQqi+WGhZOTJ2WhdancZqYeQqqL5zcv5X/nXIPkNCFjkPcgSw4cGMWc6wLol9uH20Gg8jSl", - 
"clX+LAG5FqQsDWisDHaXqcSg93JoSyyqVkNOUxiKe5CSRYj8HjoMgqM53mHj2Qyg7D562Svu4BlBw71E", - "3xYzHlyVFCyiNFiJGwyvGtC6Qfwsy5JV2YXmve6FSE7NvsvkZKe2b6F64/28Z4Z1j9oL47p/rb8H9n5g", - "3wParoBm2/mvBal6OndENOYHhgsCWxRyeL5hcWBzHee/vvkyAf9n1HFdPS77qP8XL+f20PNo6HlkLcW8", - "CHWB5756c7sTeX7sel95p6KjfL/vZTDIUnthEPLPHvbwsy86niHyq/dkHxf6ZWAMgqOE3cPQbz7btP2w", - "oxN29/g9SLMvay0kPN4ePQ18L4wIvU1oe3DYg8PTgQP61h8Bh6QZlBYgknSLigBvoHK85aYkoTzODVJV", - "F7xtAMAXvLeL+eVwsVgMsRzIZQI8FJG9Xt2tKDAkXzryndb0fbDvg/3pgr34gYRdIzxJbVAXLctDWrwt", - "Ozzpj/HixdqiQRbfjaZ8TW7veBH3mcv9FsUXDnO/9Xgf6PtAf7pAL6OvdG5y8oi4V+0AGQRHJmdvcefw", - "Y6NzFTf9TqOq6kQBpyPomYr7ds/R/nphH/Z/kbDHbqs/cLugnfDzgt32bW11xudPcX+T3v6UuLC/q1dt", - "93XdIUZ55LTqeT/U3oMUthfsWaHCazd7Yazw/9uAPVbsseLpsaIKoceBRTEd0SJ3fiCnEyaKH+modgJk", - "uip/hxJfndOK1L9D1hn29c98PPPuoCS0rw72Ef8XiXjnJ3J2DPXcDQaFDCgk1/iNsrIv9X0i8oi8F2ma", - "c6ZX5EeqYUFXQfGiKHbDqtOjo0gCTYexfXqYFNMPQzMd26971h9rrCr6lq0WUjjuiGbsaAqaHlXyPtw+", - "/H8AAAD//y5cwOBHaQAA", + "H4sIAAAAAAAC/+x9a3MbN7L2X0HN+1bJriKpS+L4lKr2g2wnserIjkuS10k5Ki440xzCmgFmAYxIro/+", + "+yk05gLMgDdHUvZk+ckUB0Df0E83Gj301ygWeSE4cK2i06+RimeQU/x49uH8RymFNJ8TULFkhWaCR6fm", + "CQHziEhQheAKSC4SyEbRICqkKEBqBrhGrtL+9OsZVNNzUIqmYOZppjOITqN3KjV/LQvzh9KS8TS6vx9E", + "Ev5ZMglJdPoZV71ppzSMNvPE5AvEOrofRGdlwsRlxWWflUuPfzIVklAzg6TAQVIzqi8UjsAPWfbLNDr9", + "/DX6/xKm0Wn0/w5bbR5Wqjx8BwmjHy8vovubQUATFSVILOVRT1pLzpXXkykg9CuRLMcpcBx4La5hoQ27", + "K6TwWfpYZIImNTdkyjIgWpAJEC0pNyMnkBidTIXMqY5OownjVC6jDn99Iw6iHDRNqKaW6pSWmZn/9T7q", + "6uUsSZj5SDPyRUwI45YYE7zipaBKQWL+0DMgBSsgY9zfRzWtEB/G2GOW+Hz0uHhbpinjKfmJxvUGOX9D", + "SkPYbJRaH0W9SxrSdmgSIi1Bl5KPNctBaZoXyudByxJ6fFziHNLOseRnnkmIhoUekauyKIQ0u+mOZiWo", + "U3KggGvgMRwMyMFcyORgQMw2J5YpMhEiA8rJswND/MA8O5jSTMHB8xF5YzkjTJHq8bN2veejeiTJgXJF", + "uHCYHFXUqmfm83BC0WrtGEdrlZTXrWY2wUDPMUL7fo17nOc0hWuB//T9Iy1ZQnkMYxXTDDwzvRy96Nro", + "Rx6LUtIUVLVTdIMhQFiOD+JMKMiWJGP8tt28xm6kkCIvNHk2Y+kMZGU7ktMlkZCUcbUE+WdJM6aXz129", + 
"/VzxSa6Qz0ZeXuYTkEZeVgu4wtPt2loYztl0SeZMz3p+tdrdrf4Cex3XHa/R43Ffj28glYDMzGcstmy0", + "CGk5ZYoUpZqhCudUJgpHMc40o5kdM+ryRzarKROSqg2QcEYuxOUZeXYh5sNLym/JWUILjcj0vDI85Qlh", + "WpFYSBsdE+Nlc2DpTKPjWiGcAEN+XNC8yOCUfCW/RxnVwPUwFlwxZRxteZjF+dBwN1TJIvs9OiXHo6MB", + "+T3iINkXdViwBWRDKvWwfnpy7yrgAgV7NBzsybMlFHJIqWZ3MLabfwMT162bPFPP0b1KlgCZz6g2f8Ei", + "zsoEyFSKPKDi85QLaXbQlPgbkvxeHh19F5Njl+33FWvkg2UtxH2Zj61fjwuQIRmOuyK8x61GxLQGBBcj", + "CpCVeB4jZU7O7eAPIHvsMK4htbsX+eFTkICiaeiEluOjo9X8JMAFU8bGOHFE3gkJ9jMpVUkzg1pAEbMq", + "iKqgqBZlUmqiMjEHSRouzDJJmaHnTpYm3gBP9awnXz2eXCHXIelc9W6zK9btydU2VXQKejmOZxDfesoz", + "oa+rvQ8gDSaaQIrTCE7Drag0yxH3p13sMrBQZolJYcR0ClyZTSYkmVGZT8vMZfPKrvoamWmYraI1cguQ", + "9DVyBZVbSsoTkROLbytUYQYH9V3bytPC0ei/VsC1mNpUpE3TaFFkrA1yEmobW8s8OzJPjr1AdlXT7GFz", + "J+4XtQFtYAskAF5k35wBhBPkrcNmI/qDRc4HTFAbk2wLy38IjVeTXOV1HdtuMumWOd3fWQKib9JpBxR/", + "CB3IppLmoBCQFcSCJ7i9vTzkzizvSvfTCtyaYdj3aL54GaRqRxLGCYZztQXRt3bxEN2t924Tf6hdH+Pn", + "n7prLRu7pxO5MKPHkzK+Bd3l4vjkZZeNjzVBY2I8bRqmjMppLkqujQHsms1xy00o0GY2FJpHFcyaj7mJ", + "ndXMOcsyA/aM46OeCd/ZYa+QaU8wN7QLpmBMy3S8ApaPTnp5aiMCTiY0SVow9gS26TJ56x08qkOHBAX5", + "JMO0eeVcm/DyWAJVtdxeiEcGzsqUrAb4zenLyYv/w9nLPq+oNTFnSWf3Hh+dfB/CQxy5Exx+wrX7VHeM", + "MDZ0rAkxFxfv+pFlxpQWculD3+cbF62rESHooouxFrfAu3v+Bwcp6IJc2zEhxa7E3t1C/hY5spZAc48M", + "1oD8PI7m4a21VBrycVMVDvB5hUNIsAw8iDTkhbF/KaGDgS/bJa6dQVvmkoHtYMy8ZhdcQZoD12d8qWeM", + "pyf9LTERi0DpnGQII+R7QqWkS5KyO+CEKkLJRCzqQlCFtmjVgfGCX3/79TdiY7K751+JxcrKS5/4eR31", + "lWX+W+M8VbdjxotSB+UT86EEJbISQ5sZTHBwRyi9LFiM2IxHdkoKCXdMlMp8SFiMs5mu0GXQ5taIjseL", + "t4tP5Nnbv33628mLHxCYrs7eeeeJd4byObL5b1f7yMvMYLm6HYtSN4pcExXOzQmrhEGrQZtbyKo2PDPH", + "MLOgLQ7TfMLS0ijTqt5uKzUgYqqBmz+TMsbqL2gNspqpZ5SbuMN4moFjBk+qmnPyi+U85OfcbKqM/QvG", + "sRAyUbuJVwjGNcGZjFMNqkmjmnXbgyXlKZDPR4Pjm2qL4OyKLoFFAbG2wydgB0hQ5kvzlTVfwnITMQVX", + "ft5S0SKvrQwhQV1ifWd4vzipvFxMK6kqQ3R8YT4DCQRoXLFPmDEcefbr4LfnbQz0jlM4rMuZA+nIWEYn", + "kAUYu8Dvm7zWY63m5pgwnrAY9U/NUEilKHlSjTZZ35E3ZELjW3dIn11Lds21SCZSpnfYLXaaIiUfGg9Q", + 
"M5GZPBe3p12LMK60yf3E1LCIGIfPA1cPF5Z6387bZhC9mLAmfnwsmnr4N5YdHrha/zCAWFqxkm+vCm84", + "CLx88R9UxtxKm/t65qZzx871w9o5A/77elby21DeE5sHeEwxxkSvpO1VZ7+LQFdFx/7RBxeozju4qiui", + "XwBzMuOa0oo168e9hZmG3DB079Bo1moIYRjraVK7Aw1jji6togIafHt9/WFFY4l5tGVnSQKasmz7Loym", + "SaTfhfEGl4Kkoux0G/SbMSqyjpytOCtk/TvNWILLNVKvEqW2xVpJuus5hrOShKzmcttdIMQ30EzPXtfA", + "4fOrNNVl56b0l//2Kvk4IFTrbWuXLYEAfYxSO7XthBAyEFhV2D26sOY6xubmHtcE9rZuk+NUvNx05q1r", + "6nEK4DspBtFknV5WA1GrFWwu2YhCXXToIEJIgoCgFxfvXAF9ZqXzpE3fuos5sIglk7HJT9wptpJCPqpt", + "4oV01neWcyRzWQ5JxO4Ay0pVdekDldRurU633HYp2HuaQw3jddJn4rQseV1ozkwctpV0LaoPX8RkRN4L", + "zWIwJz1N9MwcMxThNDdJc0W8ronX3Qc2lAltsiUpSg1qgGkS0yQRoAgX2l7vGUqUBPNDWy6HBY119V0C", + "BfBEEcF9MVheZGCS6OoakSckx4uRCdaepywtJZ1kRjiKM/9RoDL/QahMy7qosVXRqzFCo2oTAzsnjaqp", + "BAeDBukcmgJNZ5VhA1ugKCcZU7NxKbO+r70xGRa3En+8vKhtK0qdCpvNSqC50W+1jEfTfkU+yiyYA5YT", + "2yEWJn0lShmDS5XxWOQ+1WYNor2K61XzfZB4x418TnyVuL4UdpYt3GonWGw9ZKjFMHTp5PvmwxqQaPHn", + "mfCpLbgOGt9Rdat2MpydW5cwV1jLLSR0o5qk8wEpuVNLaitdijyzU583fo6lMb+dyi8T+IXRjQlGbz1U", + "QdDusZCrEhbUx4GymJjgsdMOR76xkuST9PzWLryxzbpiTNXDK63edHhfa1/MjwLHpdw8qI0ZC64ps3eK", + "Tq8vnYhSd659cF7f4FxN530yn2ag6xtaS3BOFZlmNE0hIVSR91c/ffIO8maZ7Q+nxhLmia1/uNfpDcWt", + "rsWCfm0WN05ty3GtCDHlJhrSOAalbC92TWAbJ7auqywrqDbXnmiuVXb8eHkRMqVhEmv4tmVzJZe+xZ5a", + "5q6URpiAoA+fX+NJW22TYdtD+faHD3vEvu+cufuHj8Ej5/iDWsYbf/Y6YDDPqw6tVenwX6cD+yHbgHr9", + "zWvagPYtzfuW5r9uS/OL/+iOZnIF5lSqgeBlcoE3tvZyEU/sB/9zYLaGal4ImizbK8f9/cGf1rfUw+8t", + "+5b6nSr9ELoyzl4VAPFsVaD1pHAh64zkBk9UAfQWJEnAnJqlMjbODPhnSwKLQoJCu5kwQTmaOjFzIJ7V", + "9yBm0+FeNV8nOLJgOkbP6R2l67+M7mrS5girAap0y/xl1w/b0VnkEdurt+FkXbRok7L1IcK2cBiSa0mt", + "zNf8/eJthcCG2XhdkYnYu6ugfFndv3Ql/Nrb0zf3bgyPqW+hJl2tGsTafB1fEw7qEL9ohyLP5Np8uyl1", + "NXJYUtVIx7W2uCLZveS0uchkW883Jep1o7YZ650Vdryx6J4R6l52y8SGG4yKVVdn62s9iNBxKZleXhlW", + "rJxvr68/vAIqQTZvnyOs26+aRWZaF9G9WYPxaeCt5bPqjZO4eUlYlpycnTdVWhX59anCYMnZObksOUdC", + "BtfsWkejo9GRUYgogNOCRafRd6Pj0ZGxFtUzZPsQ3z0dajGsnbgQKhTNmxd0nfepbfdPddoSRbUbzhNz", + 
"lOi+vGpUDkq/Egn2lsaCm1QaLwsw6lOpD03YHdbvVVszb9oEoTdl730TmxiPX1iDotgnR0cdLhytH35R", + "Nn5sx4J3QETancBd4mF/WmakHTaIvn9AFtqr2wD9VzQhl1b7lu7x09D9yGmpZ0Kyf0GChI+/exrClbDk", + "R65NGnwtBLmgMrVaP37xVNK3CSsilcVyw8LJyYOy0LtG7zPTDiHNVfuLp9p/51yD5DQjVyDvQNYcODCK", + "MdcF0M839zeDSJV5TuWy/iEGci1InRrQVBnsrkOJQe/F0KZYVC2HnOYwFHcgJUsQ+T10GESHM7y1x9oM", + "oOw+etlL/egRQcNtG9gWM+5dlVQsojSYiRsMb1ruwiB+VhTZsu67815wU/bOs5DCxGQnt++heueNxEeG", + "dY/aE+O638iwB/bVwL4HtF0Bzb7AcC1I08W6I6Ix3zFcENgikcP6hsWBzXmc/8Lq0zj8n5HHhbp69l7/", + "b57O7aHnm6HnG3Mp5nmoCzx3zbvqQeT5OfSG9k5JR/1G49NgkKX2xCDk1x728LNPOh7B85s3g7/N9WvH", + "GESHGbuDod98tun4ETx4OG2ettXL/cUVXUoOCQGe4OtYKggR3V6ttTDx7TZa0dT3xCixsjFtDxh7wHg4", + "wDDbzILFH0GNrOuZFjmyfItUAa+mSrz+piSjPC0NhDU3v30UwHfdt3P8xXA+nw8xTyhlBjwWib133S1b", + "MCSf2v2dLv29x+89/gE93v5WxK4enuXWqate5iGtXhwenqz28eod46pzFl8Tp3zNGSDwTvIjnwN6FJ/Y", + "zf2e5L2j7x394Ry99r56c5OTb/B71XeQQXRoYvYWlxE/d1pasRrgdLCG03ynVeiRMvx+M9L+3mHv9n8R", + "t8c2rD9w7aAd9/Oc3TZ0bVX886e4P89vf1W9fpOyLgvqtnWM8sTp4fN+s34FUtgmsUeFCq8P7Ymxwv8f", + "FPZYsceKh8eKxoW+DSyq6YgWpfNbQUGYqH6vpDkJkMmy/klOfKdOK9L+JFvQ7dtfPHnk00FNaJ8d7D3+", + "L+Lxzq8F7ejqpesMChlQSK7zc211w+rrTJQJeS3yvORML8nPVMOcLqPqDVJsk1Wnh4eJBJoPU/t0lFXT", + "R7GZjn3ZK9a/0phVrFq2WUjhuENasMMJaHrYyHt/c/+/AQAA///i62OfUmoAAA==", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/worker/worker.go b/worker/worker.go index 76d1d489..6634c8dd 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -567,6 +567,57 @@ func (w *Worker) TextToSpeech(ctx context.Context, req GenTextToSpeechJSONReques return resp.JSON200, nil } +func (w *Worker) LiveVideoToVideo(ctx context.Context, req GenLiveVideoToVideoJSONRequestBody) (*LiveVideoToVideoResponse, error) { + c, err := w.borrowContainer(ctx, "live-video-to-video", *req.ModelId) + if err != nil { + return nil, err + } + defer w.returnContainer(c) + + resp, err := c.Client.GenLiveVideoToVideoWithResponse(ctx, req) + if err != nil { + return nil, err + } + + if 
resp.JSON400 != nil { + val, err := json.Marshal(resp.JSON400) + if err != nil { + return nil, err + } + slog.Error("live-video-to-video container returned 400", slog.String("err", string(val))) + return nil, errors.New("live-video-to-video container returned 400: " + resp.JSON400.Detail.Msg) + } + + if resp.JSON401 != nil { + val, err := json.Marshal(resp.JSON401) + if err != nil { + return nil, err + } + slog.Error("live-video-to-video container returned 401", slog.String("err", string(val))) + return nil, errors.New("live-video-to-video container returned 401: " + resp.JSON401.Detail.Msg) + } + + if resp.JSON422 != nil { + val, err := json.Marshal(resp.JSON422) + if err != nil { + return nil, err + } + slog.Error("live-video-to-video container returned 422", slog.String("err", string(val))) + return nil, errors.New("live-video-to-video container returned 422: " + string(val)) + } + + if resp.JSON500 != nil { + val, err := json.Marshal(resp.JSON500) + if err != nil { + return nil, err + } + slog.Error("live-video-to-video container returned 500", slog.String("err", string(val))) + return nil, errors.New("live-video-to-video container returned 500: " + resp.JSON500.Detail.Msg) + } + + return resp.JSON200, nil +} + func (w *Worker) Warm(ctx context.Context, pipeline string, modelID string, endpoint RunnerEndpoint, optimizationFlags OptimizationFlags) error { if endpoint.URL == "" { return w.manager.Warm(ctx, pipeline, modelID, optimizationFlags) From a292c35560bb2f3bf70bcf0c29db01d8d6bb233a Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 13 Nov 2024 19:01:50 -0300 Subject: [PATCH 05/15] worker: Stop returning the live containers --- worker/worker.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index 6634c8dd..0e0c9524 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -568,11 +568,12 @@ func (w *Worker) TextToSpeech(ctx context.Context, req GenTextToSpeechJSONReques } func (w *Worker) 
LiveVideoToVideo(ctx context.Context, req GenLiveVideoToVideoJSONRequestBody) (*LiveVideoToVideoResponse, error) { - c, err := w.borrowContainer(ctx, "live-video-to-video", *req.ModelId) + c, err := w.borrowContainer(context.Background(), "live-video-to-video", *req.ModelId) if err != nil { return nil, err } - defer w.returnContainer(c) + // Live video containers keep running after the initial request, so we don't return them after the request + // TODO: Make sure we remove the stopped container after it resp, err := c.Client.GenLiveVideoToVideoWithResponse(ctx, req) if err != nil { From 4dee2980fe76efae49d15a77691b9b37ef5fa6cd Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 13 Nov 2024 19:04:39 -0300 Subject: [PATCH 06/15] worker: Auto-remove containers on stop The live container will stop automatically and we should auto-remove it to free up space. --- worker/docker.go | 29 ++++++++++++++++++----------- worker/worker.go | 2 +- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/worker/docker.go b/worker/docker.go index 6ebc5776..eb164319 100644 --- a/worker/docker.go +++ b/worker/docker.go @@ -13,7 +13,8 @@ import ( "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/mount" - "github.com/docker/docker/client" + docker "github.com/docker/docker/client" + "github.com/docker/docker/errdefs" "github.com/docker/go-connections/nat" ) @@ -59,7 +60,7 @@ type DockerManager struct { gpus []string modelDir string - dockerClient *client.Client + dockerClient *docker.Client // gpu ID => container name gpuContainers map[string]string // container name => container @@ -68,7 +69,7 @@ type DockerManager struct { } func NewDockerManager(defaultImage string, gpus []string, modelDir string) (*DockerManager, error) { - dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + dockerClient, err := docker.NewClientWithOpts(docker.FromEnv, 
docker.WithAPIVersionNegotiation()) if err != nil { return nil, err } @@ -236,6 +237,7 @@ func (m *DockerManager) createContainer(ctx context.Context, pipeline string, mo }, }, }, + AutoRemove: true, } resp, err := m.dockerClient.ContainerCreate(ctx, containerConfig, hostConfig, nil, nil, containerName) @@ -322,7 +324,7 @@ func (m *DockerManager) allocGPU(ctx context.Context) (string, error) { return "", errors.New("insufficient capacity") } -func removeExistingContainers(ctx context.Context, client *client.Client) error { +func removeExistingContainers(ctx context.Context, client *docker.Client) error { filters := filters.NewArgs(filters.Arg("label", containerCreatorLabel+"="+containerCreator)) containers, err := client.ContainerList(ctx, container.ListOptions{All: true, Filters: filters}) if err != nil { @@ -348,20 +350,25 @@ func dockerContainerName(pipeline string, modelID string, suffix ...string) stri return fmt.Sprintf("%s_%s", pipeline, sanitizedModelID) } -func dockerRemoveContainer(client *client.Client, containerID string) error { +func dockerRemoveContainer(client *docker.Client, containerID string) error { ctx, cancel := context.WithTimeout(context.Background(), containerRemoveTimeout) - if err := client.ContainerStop(ctx, containerID, container.StopOptions{}); err != nil { - cancel() + err := client.ContainerStop(ctx, containerID, container.StopOptions{}) + cancel() + // Ignore "not found" or "already stopped" errors + if err != nil && !docker.IsErrNotFound(err) && !errdefs.IsNotModified(err) { return err } - cancel() ctx, cancel = context.WithTimeout(context.Background(), containerRemoveTimeout) - defer cancel() - return client.ContainerRemove(ctx, containerID, container.RemoveOptions{}) + err = client.ContainerRemove(ctx, containerID, container.RemoveOptions{}) + cancel() + if err != nil && !docker.IsErrNotFound(err) { + return err + } + return nil } -func dockerWaitUntilRunning(ctx context.Context, client *client.Client, containerID string, 
pollingInterval time.Duration) error { +func dockerWaitUntilRunning(ctx context.Context, client *docker.Client, containerID string, pollingInterval time.Duration) error { ticker := time.NewTicker(pollingInterval) defer ticker.Stop() diff --git a/worker/worker.go b/worker/worker.go index 0e0c9524..0a7dffe1 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -573,7 +573,7 @@ func (w *Worker) LiveVideoToVideo(ctx context.Context, req GenLiveVideoToVideoJS return nil, err } // Live video containers keep running after the initial request, so we don't return them after the request - // TODO: Make sure we remove the stopped container after it + // TODO: Make sure we release the allocated GPU after the container stops resp, err := c.Client.GenLiveVideoToVideoWithResponse(ctx, req) if err != nil { From cad1bccc4d90ec99f55b1feab3f69899600afdd6 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 13 Nov 2024 19:16:47 -0300 Subject: [PATCH 07/15] worker: Create function for destroy container logic --- worker/docker.go | 43 ++++++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/worker/docker.go b/worker/docker.go index eb164319..c4c15459 100644 --- a/worker/docker.go +++ b/worker/docker.go @@ -105,21 +105,15 @@ func (m *DockerManager) Stop(ctx context.Context) error { defer m.mu.Unlock() var stopContainerWg sync.WaitGroup - for name, rc := range m.containers { + for _, rc := range m.containers { stopContainerWg.Add(1) - go func(containerID string) { + go func(container *RunnerContainer) { defer stopContainerWg.Done() - if err := dockerRemoveContainer(m.dockerClient, containerID); err != nil { - slog.Error("Error removing managed container", slog.String("name", name), slog.String("id", containerID)) - } - }(rc.ID) - - delete(m.gpuContainers, rc.GPU) - delete(m.containers, name) + m.destroyContainer(container) + }(rc) } stopContainerWg.Wait() - return nil } @@ -308,15 +302,9 @@ func (m *DockerManager) allocGPU(ctx 
context.Context) (string, error) { // If the container exists in this map then it is idle and if it not marked as keep warm we remove it rc, ok := m.containers[containerName] if ok && !rc.KeepWarm { - slog.Info("Removing managed container", slog.String("gpu", gpu), slog.String("name", containerName), slog.String("modelID", rc.ModelID)) - - delete(m.gpuContainers, gpu) - delete(m.containers, containerName) - - if err := dockerRemoveContainer(m.dockerClient, rc.ID); err != nil { + if err := m.destroyContainer(rc); err != nil { return "", err } - return gpu, nil } } @@ -324,6 +312,27 @@ func (m *DockerManager) allocGPU(ctx context.Context) (string, error) { return "", errors.New("insufficient capacity") } +func (m *DockerManager) destroyContainer(rc *RunnerContainer) error { + slog.Info("Removing managed container", + slog.String("gpu", rc.GPU), + slog.String("name", rc.Name), + slog.String("modelID", rc.ModelID)) + + delete(m.gpuContainers, rc.GPU) + delete(m.containers, rc.Name) + + if err := dockerRemoveContainer(m.dockerClient, rc.ID); err != nil { + slog.Error("Error removing managed container", + slog.String("gpu", rc.GPU), + slog.String("name", rc.Name), + slog.String("modelID", rc.ModelID), + slog.String("error", err.Error())) + return fmt.Errorf("failed to remove container %s: %w", rc.Name, err) + } + + return nil +} + func removeExistingContainers(ctx context.Context, client *docker.Client) error { filters := filters.NewArgs(filters.Arg("label", containerCreatorLabel+"="+containerCreator)) containers, err := client.ContainerList(ctx, container.ListOptions{All: true, Filters: filters}) From 4b314bc941ab4a8e721c4515e0fdc8e486d6aacc Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 13 Nov 2024 19:33:12 -0300 Subject: [PATCH 08/15] worker: Watch running containers and cleanup when stopped --- worker/docker.go | 37 +++++++++++++++++++++++++++++++++++++ worker/worker.go | 4 ++-- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/worker/docker.go 
b/worker/docker.go index c4c15459..9145430f 100644 --- a/worker/docker.go +++ b/worker/docker.go @@ -27,6 +27,7 @@ const optFlagsContainerTimeout = 5 * time.Minute const containerRemoveTimeout = 30 * time.Second const containerCreatorLabel = "creator" const containerCreator = "ai-worker" +const containerWatchInterval = 10 * time.Second // This only works right now on a single GPU because if there is another container // using the GPU we stop it so we don't have to worry about having enough ports @@ -284,6 +285,8 @@ func (m *DockerManager) createContainer(ctx context.Context, pipeline string, mo m.containers[containerName] = rc m.gpuContainers[gpu] = containerName + go m.watchContainer(ctx, rc) + return rc, nil } @@ -333,6 +336,40 @@ func (m *DockerManager) destroyContainer(rc *RunnerContainer) error { return nil } +// watchContainer monitors a container's running state and automatically cleans +// up when the container stops or the context is cancelled. The cleanup is done +// by making sure to remove the container and update the internal state. 
+func (m *DockerManager) watchContainer(ctx context.Context, rc *RunnerContainer) { + defer func() { + if r := recover(); r != nil { + slog.Error("Panic in container watch routine", + slog.String("container", rc.Name), + slog.Any("panic", r)) + } + }() + defer func() { + m.mu.Lock() + m.destroyContainer(rc) + m.mu.Unlock() + }() + + ticker := time.NewTicker(containerWatchInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + break + case <-ticker.C: + container, err := m.dockerClient.ContainerInspect(ctx, rc.ID) + if err != nil || !container.State.Running { + break + } + continue + } + } +} + func removeExistingContainers(ctx context.Context, client *docker.Client) error { filters := filters.NewArgs(filters.Arg("label", containerCreatorLabel+"="+containerCreator)) containers, err := client.ContainerList(ctx, container.ListOptions{All: true, Filters: filters}) diff --git a/worker/worker.go b/worker/worker.go index 0a7dffe1..7a952fd9 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -572,8 +572,8 @@ func (w *Worker) LiveVideoToVideo(ctx context.Context, req GenLiveVideoToVideoJS if err != nil { return nil, err } - // Live video containers keep running after the initial request, so we don't return them after the request - // TODO: Make sure we release the allocated GPU after the container stops + // Live video containers keep running after the initial request, so we don't return them after the request. + // TODO: Make sure the container stops after the stream is done. 
resp, err := c.Client.GenLiveVideoToVideoWithResponse(ctx, req) if err != nil { From 94e17d3cae1e2585f5181899a9a9ee9b6ae5f943 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 13 Nov 2024 21:07:57 -0300 Subject: [PATCH 09/15] runner/live: Automatically stop infer.py when input stops --- runner/app/live/infer.py | 24 +++++++++++++++--------- runner/app/live/params_api/api.py | 2 +- runner/app/live/streamer/streamer.py | 19 +++++++++++++++++-- runner/app/live/streamer/trickle.py | 5 +++-- runner/app/live/streamer/zeromq.py | 5 +++-- 5 files changed, 39 insertions(+), 16 deletions(-) diff --git a/runner/app/live/infer.py b/runner/app/live/infer.py index cb9428ba..ca37a169 100644 --- a/runner/app/live/infer.py +++ b/runner/app/live/infer.py @@ -17,11 +17,11 @@ from streamer.zeromq import ZeroMQStreamer -async def main(http_port: int, stream_protocol: str, subscribe_url: str, publish_url: str, pipeline: str, params: dict): +async def main(http_port: int, stream_protocol: str, subscribe_url: str, publish_url: str, pipeline: str, input_timeout: int, params: dict): if stream_protocol == "trickle": - handler = TrickleStreamer(subscribe_url, publish_url, pipeline, **(params or {})) + handler = TrickleStreamer(subscribe_url, publish_url, pipeline, input_timeout, params or {}) elif stream_protocol == "zeromq": - handler = ZeroMQStreamer(subscribe_url, publish_url, pipeline, **(params or {})) + handler = ZeroMQStreamer(subscribe_url, publish_url, pipeline, input_timeout, params or {}) else: raise ValueError(f"Unsupported protocol: {stream_protocol}") @@ -34,14 +34,14 @@ async def main(http_port: int, stream_protocol: str, subscribe_url: str, publish logging.error(f"Stack trace:\n{traceback.format_exc()}") raise e - await block_until_signal([signal.SIGINT, signal.SIGTERM]) try: + await asyncio.wait( + [block_until_signal([signal.SIGINT, signal.SIGTERM]), handler.wait()], + return_when=asyncio.FIRST_COMPLETED + ) + finally: await runner.cleanup() await handler.stop() - 
except Exception as e: - logging.error(f"Error stopping room handler: {e}") - logging.error(f"Stack trace:\n{traceback.format_exc()}") - raise e async def block_until_signal(sigs: List[signal.Signals]): @@ -81,6 +81,12 @@ def signal_handler(sig, _): parser.add_argument( "--publish-url", type=str, required=True, help="URL to publish output frames (trickle). For zeromq this is the output socket address" ) + parser.add_argument( + "--input-timeout", + type=int, + default=60, + help="Timeout in seconds to wait after input frames stop before shutting down. Set to 0 to disable." + ) parser.add_argument( "-v", "--verbose", action="store_true", @@ -103,7 +109,7 @@ def signal_handler(sig, _): try: asyncio.run( - main(args.http_port, args.stream_protocol, args.subscribe_url, args.publish_url, args.pipeline, params) + main(args.http_port, args.stream_protocol, args.subscribe_url, args.publish_url, args.pipeline, args.input_timeout, params) ) except Exception as e: logging.error(f"Fatal error in main: {e}") diff --git a/runner/app/live/params_api/api.py b/runner/app/live/params_api/api.py index 70ff7ee9..3636c00c 100644 --- a/runner/app/live/params_api/api.py +++ b/runner/app/live/params_api/api.py @@ -59,7 +59,7 @@ async def handle_params_update(request): raise ValueError(f"Unknown content type: {request.content_type}") handler = cast(PipelineStreamer, request.app["handler"]) - handler.update_params(**params) + handler.update_params(params) return web.Response(text="Params updated successfully") except Exception as e: diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index 91d853ac..8efed14c 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -15,18 +15,28 @@ class PipelineStreamer(ABC): - def __init__(self, pipeline: str, **params): + def __init__(self, pipeline: str, input_timeout: int, params: dict): self.pipeline = pipeline self.params = params self.process = None self.last_params_time = 
0.0 self.restart_count = 0 + self.input_timeout = input_timeout # 0 means disabled + self.done_future = None def start(self): + self.done_future = asyncio.get_running_loop().create_future() self._start_process() + async def wait(self): + if not self.done_future: + raise RuntimeError("Streamer not started") + return await self.done_future + async def stop(self): await self._stop_process() + if self.done_future and not self.done_future.done(): + self.done_future.set_result(None) def _start_process(self): if self.process: @@ -68,7 +78,7 @@ async def _restart(self): logging.error(f"Stack trace:\n{traceback.format_exc()}") os._exit(1) - def update_params(self, **params): + def update_params(self, params: dict): self.params = params self.last_params_time = time.time() if self.process: @@ -92,6 +102,11 @@ async def monitor_loop(self, done: Event): time_since_last_params = current_time - self.last_params_time time_since_reload = min(time_since_last_params, time_since_start) + if self.input_timeout > 0 and time_since_last_input >= self.input_timeout: + logging.info(f"Input stream stopped for {self.input_timeout} seconds. 
Shutting down...") + await self.stop() + return + gone_stale = ( time_since_last_output > time_since_last_input and time_since_last_output > 60 diff --git a/runner/app/live/streamer/trickle.py b/runner/app/live/streamer/trickle.py index acb73709..ed904619 100644 --- a/runner/app/live/streamer/trickle.py +++ b/runner/app/live/streamer/trickle.py @@ -17,9 +17,10 @@ def __init__( subscribe_url: str, publish_url: str, pipeline: str, - **params, + input_timeout: int, + params: dict, ): - super().__init__(pipeline, **params) + super().__init__(pipeline, input_timeout, params) self.subscribe_url = subscribe_url self.publish_url = publish_url self.subscribe_queue = queue.Queue[bytearray]() diff --git a/runner/app/live/streamer/zeromq.py b/runner/app/live/streamer/zeromq.py index c0a234ac..e11c1ca5 100644 --- a/runner/app/live/streamer/zeromq.py +++ b/runner/app/live/streamer/zeromq.py @@ -15,9 +15,10 @@ def __init__( input_address: str, output_address: str, pipeline: str, - **params, + input_timeout: int, + params: dict, ): - super().__init__(pipeline, **params) + super().__init__(pipeline, input_timeout, params) self.input_address = input_address self.output_address = output_address From 7459c3b5711697982a51017f5e9c9c9aa4c1f027 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 13 Nov 2024 21:13:44 -0300 Subject: [PATCH 10/15] runner: Exit runner process on clean infer.py exit --- runner/app/pipelines/live_video_to_video.py | 25 ++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/runner/app/pipelines/live_video_to_video.py b/runner/app/pipelines/live_video_to_video.py index b20e5a51..b819f3a4 100644 --- a/runner/app/pipelines/live_video_to_video.py +++ b/runner/app/pipelines/live_video_to_video.py @@ -30,18 +30,19 @@ def __init__(self, model_id: str): def __call__( # type: ignore self, *, subscribe_url: str, publish_url: str, params: dict, **kwargs ): + if self.process: + raise RuntimeError("Pipeline already running") + try: - if not 
self.process: - self.start_process( - pipeline=self.model_id, # we use the model_id as the pipeline name for now - http_port=8888, - subscribe_url=subscribe_url, - publish_url=publish_url, - initial_params=json.dumps(params), - # TODO: set torch device from self.torch_device - ) logger.info(f"Starting stream, subscribe={subscribe_url} publish={publish_url}") - return + self.start_process( + pipeline=self.model_id, # we use the model_id as the pipeline name for now + http_port=8888, + subscribe_url=subscribe_url, + publish_url=publish_url, + initial_params=json.dumps(params), + # TODO: set torch device from self.torch_device + ) except Exception as e: raise InferenceError(original_exception=e) @@ -83,6 +84,10 @@ def monitor_process(self): logger.error( f"infer.py process failed with return code {return_code}. Error: {stderr}" ) + else: + # If process exited cleanly (return code 0) and exit the main process + logger.info("infer.py process exited cleanly, shutting down...") + sys.exit(0) break logger.info("infer.py process is running...") From 3c4a2e4a54ba178d8c806a1b0752d1ecdad50f66 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Thu, 14 Nov 2024 11:08:50 -0300 Subject: [PATCH 11/15] runner: Improve Pipeline.model_id comment --- runner/app/pipelines/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/app/pipelines/base.py b/runner/app/pipelines/base.py index 06a6d181..e2f32d78 100644 --- a/runner/app/pipelines/base.py +++ b/runner/app/pipelines/base.py @@ -5,7 +5,7 @@ class Pipeline(ABC): @abstractmethod def __init__(self, model_id: str, model_dir: str): - self.model_id: str # type hint so we can use the field in routes + self.model_id: str # declare the field here so the type hint is available when using this abstract class raise NotImplementedError("Pipeline should implement an __init__ method") @abstractmethod From 4d5adbef48f81fc60489daf24367d8e452319dde Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Thu, 14 Nov 2024 12:56:26 
-0300 Subject: [PATCH 12/15] worker: Avoid stopping the container when borrow ctx is done --- worker/docker.go | 52 +++++++++++++++++++++++++++++++----------------- worker/worker.go | 52 +++++++++++++++++++++++++----------------------- 2 files changed, 61 insertions(+), 43 deletions(-) diff --git a/worker/docker.go b/worker/docker.go index 9145430f..3324c3c1 100644 --- a/worker/docker.go +++ b/worker/docker.go @@ -97,8 +97,15 @@ func (m *DockerManager) Warm(ctx context.Context, pipeline string, modelID strin m.mu.Lock() defer m.mu.Unlock() - _, err := m.createContainer(ctx, pipeline, modelID, true, optimizationFlags) - return err + rc, err := m.createContainer(ctx, pipeline, modelID, true, optimizationFlags) + if err != nil { + return err + } + + // Watch with a background context since we're not borrowing the container. + go m.watchContainer(rc, context.Background()) + + return nil } func (m *DockerManager) Stop(ctx context.Context) error { @@ -139,10 +146,14 @@ func (m *DockerManager) Borrow(ctx context.Context, pipeline, modelID string) (* // Remove container so it is unavailable until Return() is called delete(m.containers, rc.Name) + go m.watchContainer(rc, ctx) + return rc, nil } -func (m *DockerManager) Return(rc *RunnerContainer) { +// returnContainer returns a container to the pool so it can be reused. It is called automatically by watchContainer +// when the context used to borrow the container is done. 
+func (m *DockerManager) returnContainer(rc *RunnerContainer) { m.mu.Lock() defer m.mu.Unlock() m.containers[rc.Name] = rc @@ -285,8 +296,6 @@ func (m *DockerManager) createContainer(ctx context.Context, pipeline string, mo m.containers[containerName] = rc m.gpuContainers[gpu] = containerName - go m.watchContainer(ctx, rc) - return rc, nil } @@ -337,9 +346,9 @@ func (m *DockerManager) destroyContainer(rc *RunnerContainer) error { } // watchContainer monitors a container's running state and automatically cleans -// up when the container stops or the context is cancelled. The cleanup is done -// by making sure to remove the container and update the internal state. -func (m *DockerManager) watchContainer(ctx context.Context, rc *RunnerContainer) { +// up the internal state when the container stops. It will also monitor the +// borrowCtx to return the container to the pool when it is done. +func (m *DockerManager) watchContainer(rc *RunnerContainer, borrowCtx context.Context) { defer func() { if r := recover(); r != nil { slog.Error("Panic in container watch routine", @@ -347,25 +356,32 @@ func (m *DockerManager) watchContainer(ctx context.Context, rc *RunnerContainer) slog.Any("panic", r)) } }() - defer func() { - m.mu.Lock() - m.destroyContainer(rc) - m.mu.Unlock() - }() ticker := time.NewTicker(containerWatchInterval) defer ticker.Stop() for { select { - case <-ctx.Done(): - break + case <-borrowCtx.Done(): + m.returnContainer(rc) + return case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), containerWatchInterval) container, err := m.dockerClient.ContainerInspect(ctx, rc.ID) - if err != nil || !container.State.Running { - break + cancel() + if err != nil { + slog.Error("Error inspecting container", + slog.String("container", rc.Name), + slog.String("error", err.Error())) + continue + } else if container.State.Running { + continue } - continue + + m.mu.Lock() + defer m.mu.Unlock() + m.destroyContainer(rc) + return } } } diff --git 
a/worker/worker.go b/worker/worker.go index 7a952fd9..f1893d69 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -63,11 +63,12 @@ func NewWorker(defaultImage string, gpus []string, modelDir string) (*Worker, er } func (w *Worker) TextToImage(ctx context.Context, req GenTextToImageJSONRequestBody) (*ImageResponse, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() c, err := w.borrowContainer(ctx, "text-to-image", *req.ModelId) if err != nil { return nil, err } - defer w.returnContainer(c) resp, err := c.Client.GenTextToImageWithResponse(ctx, req) if err != nil { @@ -114,11 +115,12 @@ func (w *Worker) TextToImage(ctx context.Context, req GenTextToImageJSONRequestB } func (w *Worker) ImageToImage(ctx context.Context, req GenImageToImageMultipartRequestBody) (*ImageResponse, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() c, err := w.borrowContainer(ctx, "image-to-image", *req.ModelId) if err != nil { return nil, err } - defer w.returnContainer(c) var buf bytes.Buffer mw, err := NewImageToImageMultipartWriter(&buf, req) @@ -171,11 +173,12 @@ func (w *Worker) ImageToImage(ctx context.Context, req GenImageToImageMultipartR } func (w *Worker) ImageToVideo(ctx context.Context, req GenImageToVideoMultipartRequestBody) (*VideoResponse, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() c, err := w.borrowContainer(ctx, "image-to-video", *req.ModelId) if err != nil { return nil, err } - defer w.returnContainer(c) var buf bytes.Buffer mw, err := NewImageToVideoMultipartWriter(&buf, req) @@ -233,11 +236,12 @@ func (w *Worker) ImageToVideo(ctx context.Context, req GenImageToVideoMultipartR } func (w *Worker) Upscale(ctx context.Context, req GenUpscaleMultipartRequestBody) (*ImageResponse, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() c, err := w.borrowContainer(ctx, "upscale", *req.ModelId) if err != nil { return nil, err } - defer w.returnContainer(c) var buf bytes.Buffer mw, err := 
NewUpscaleMultipartWriter(&buf, req) @@ -290,11 +294,12 @@ func (w *Worker) Upscale(ctx context.Context, req GenUpscaleMultipartRequestBody } func (w *Worker) AudioToText(ctx context.Context, req GenAudioToTextMultipartRequestBody) (*TextResponse, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() c, err := w.borrowContainer(ctx, "audio-to-text", *req.ModelId) if err != nil { return nil, err } - defer w.returnContainer(c) var buf bytes.Buffer mw, err := NewAudioToTextMultipartWriter(&buf, req) @@ -362,7 +367,9 @@ func (w *Worker) AudioToText(ctx context.Context, req GenAudioToTextMultipartReq } func (w *Worker) LLM(ctx context.Context, req GenLLMFormdataRequestBody) (interface{}, error) { - c, err := w.borrowContainer(ctx, "llm", *req.ModelId) + isStreaming := req.Stream != nil && *req.Stream + borrowCtx, cancel := context.WithCancel(context.Background()) + c, err := w.borrowContainer(borrowCtx, "llm", *req.ModelId) if err != nil { return nil, err } @@ -378,16 +385,19 @@ func (w *Worker) LLM(ctx context.Context, req GenLLMFormdataRequestBody) (interf var buf bytes.Buffer mw, err := NewLLMMultipartWriter(&buf, req) if err != nil { + cancel() return nil, err } - if req.Stream != nil && *req.Stream { + if isStreaming { resp, err := c.Client.GenLLMWithBody(ctx, mw.FormDataContentType(), &buf) if err != nil { + cancel() return nil, err } - return w.handleStreamingResponse(ctx, c, resp) + return w.handleStreamingResponse(ctx, c, resp, cancel) } + defer cancel() resp, err := c.Client.GenLLMWithBodyWithResponse(ctx, mw.FormDataContentType(), &buf) if err != nil { @@ -397,11 +407,12 @@ func (w *Worker) LLM(ctx context.Context, req GenLLMFormdataRequestBody) (interf } func (w *Worker) SegmentAnything2(ctx context.Context, req GenSegmentAnything2MultipartRequestBody) (*MasksResponse, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() c, err := w.borrowContainer(ctx, "segment-anything-2", *req.ModelId) if err != nil { return nil, err } - defer 
w.returnContainer(c) var buf bytes.Buffer mw, err := NewSegmentAnything2MultipartWriter(&buf, req) @@ -454,11 +465,12 @@ func (w *Worker) SegmentAnything2(ctx context.Context, req GenSegmentAnything2Mu } func (w *Worker) ImageToText(ctx context.Context, req GenImageToTextMultipartRequestBody) (*ImageToTextResponse, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() c, err := w.borrowContainer(ctx, "image-to-text", *req.ModelId) if err != nil { return nil, err } - defer w.returnContainer(c) var buf bytes.Buffer mw, err := NewImageToTextMultipartWriter(&buf, req) @@ -517,11 +529,12 @@ func (w *Worker) ImageToText(ctx context.Context, req GenImageToTextMultipartReq } func (w *Worker) TextToSpeech(ctx context.Context, req GenTextToSpeechJSONRequestBody) (*AudioResponse, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() c, err := w.borrowContainer(ctx, "text-to-speech", *req.ModelId) if err != nil { return nil, err } - defer w.returnContainer(c) resp, err := c.Client.GenTextToSpeechWithResponse(ctx, req) if err != nil { @@ -568,12 +581,11 @@ func (w *Worker) TextToSpeech(ctx context.Context, req GenTextToSpeechJSONReques } func (w *Worker) LiveVideoToVideo(ctx context.Context, req GenLiveVideoToVideoJSONRequestBody) (*LiveVideoToVideoResponse, error) { + // Live video containers keep running after the initial request, so we use a background context to borrow the container. c, err := w.borrowContainer(context.Background(), "live-video-to-video", *req.ModelId) if err != nil { return nil, err } - // Live video containers keep running after the initial request, so we don't return them after the request. - // TODO: Make sure the container stops after the stream is done. 
resp, err := c.Client.GenLiveVideoToVideoWithResponse(ctx, req) if err != nil { @@ -693,17 +705,7 @@ func (w *Worker) borrowContainer(ctx context.Context, pipeline, modelID string) return w.manager.Borrow(ctx, pipeline, modelID) } -func (w *Worker) returnContainer(rc *RunnerContainer) { - switch rc.Type { - case Managed: - w.manager.Return(rc) - case External: - // Noop because we allow concurrent in-flight requests for external containers - } -} - func (w *Worker) handleNonStreamingResponse(c *RunnerContainer, resp *GenLLMResponse) (*LLMResponse, error) { - defer w.returnContainer(c) if resp.JSON400 != nil { val, err := json.Marshal(resp.JSON400) if err != nil { @@ -740,7 +742,7 @@ type LlmStreamChunk struct { Done bool `json:"done,omitempty"` } -func (w *Worker) handleStreamingResponse(ctx context.Context, c *RunnerContainer, resp *http.Response) (<-chan LlmStreamChunk, error) { +func (w *Worker) handleStreamingResponse(ctx context.Context, c *RunnerContainer, resp *http.Response, returnContainer func()) (<-chan LlmStreamChunk, error) { if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) } @@ -749,7 +751,7 @@ func (w *Worker) handleStreamingResponse(ctx context.Context, c *RunnerContainer go func() { defer close(outputChan) - defer w.returnContainer(c) + defer returnContainer() scanner := bufio.NewScanner(resp.Body) totalTokens := 0 From ce7973942ab1feb838deb5a9987e690be44079d7 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Thu, 14 Nov 2024 17:15:23 -0300 Subject: [PATCH 13/15] worker+runner: Fix generated files --- runner/gateway.openapi.yaml | 2 +- runner/openapi.yaml | 2 +- worker/runner.gen.go | 64 ++++++++++++++++++------------------- 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/runner/gateway.openapi.yaml b/runner/gateway.openapi.yaml index 96396d55..50d8cec9 100644 --- a/runner/gateway.openapi.yaml +++ b/runner/gateway.openapi.yaml @@ -930,7 +930,7 @@ components: title: Model Id 
description: Name of the pipeline to run in the live video to video job. Notice that this is named model_id for consistency with other routes, - but it does not refer to a Hugging Face model ID. The exact model depends + but it does not refer to a Hugging Face model ID. The exact model(s) depends on the pipeline implementation and might be configurable via the `params` argument. default: '' diff --git a/runner/openapi.yaml b/runner/openapi.yaml index 51195b73..4b8e3446 100644 --- a/runner/openapi.yaml +++ b/runner/openapi.yaml @@ -947,7 +947,7 @@ components: title: Model Id description: Name of the pipeline to run in the live video to video job. Notice that this is named model_id for consistency with other routes, - but it does not refer to a Hugging Face model ID. The exact model depends + but it does not refer to a Hugging Face model ID. The exact model(s) depends on the pipeline implementation and might be configurable via the `params` argument. default: '' diff --git a/worker/runner.gen.go b/worker/runner.gen.go index 9c74058a..7dfa4f49 100644 --- a/worker/runner.gen.go +++ b/worker/runner.gen.go @@ -244,7 +244,7 @@ type LLMResponse struct { // LiveVideoToVideoParams defines model for LiveVideoToVideoParams. type LiveVideoToVideoParams struct { - // ModelId Name of the pipeline to run in the live video to video job. Notice that this is named model_id for consistency with other routes, but it does not refer to a Hugging Face model ID. The exact model depends on the pipeline implementation and might be configurable via the `params` argument. + // ModelId Name of the pipeline to run in the live video to video job. Notice that this is named model_id for consistency with other routes, but it does not refer to a Hugging Face model ID. The exact model(s) depends on the pipeline implementation and might be configurable via the `params` argument. ModelId *string `json:"model_id,omitempty"` // Params Initial parameters for the pipeline. 
@@ -2708,37 +2708,37 @@ var swaggerSpec = []string{ "4fOrNNVl56b0l//2Kvk4IFTrbWuXLYEAfYxSO7XthBAyEFhV2D26sOY6xubmHtcE9rZuk+NUvNx05q1r", "6nEK4DspBtFknV5WA1GrFWwu2YhCXXToIEJIgoCgFxfvXAF9ZqXzpE3fuos5sIglk7HJT9wptpJCPqpt", "4oV01neWcyRzWQ5JxO4Ay0pVdekDldRurU633HYp2HuaQw3jddJn4rQseV1ozkwctpV0LaoPX8RkRN4L", - "zWIwJz1N9MwcMxThNDdJc0W8ronX3Qc2lAltsiUpSg1qgGkS0yQRoAgX2l7vGUqUBPNDWy6HBY119V0C", - "BfBEEcF9MVheZGCS6OoakSckx4uRCdaepywtJZ1kRjiKM/9RoDL/QahMy7qosVXRqzFCo2oTAzsnjaqp", - "BAeDBukcmgJNZ5VhA1ugKCcZU7NxKbO+r70xGRa3En+8vKhtK0qdCpvNSqC50W+1jEfTfkU+yiyYA5YT", - "2yEWJn0lShmDS5XxWOQ+1WYNor2K61XzfZB4x418TnyVuL4UdpYt3GonWGw9ZKjFMHTp5PvmwxqQaPHn", - "mfCpLbgOGt9Rdat2MpydW5cwV1jLLSR0o5qk8wEpuVNLaitdijyzU583fo6lMb+dyi8T+IXRjQlGbz1U", - "QdDusZCrEhbUx4GymJjgsdMOR76xkuST9PzWLryxzbpiTNXDK63edHhfa1/MjwLHpdw8qI0ZC64ps3eK", - "Tq8vnYhSd659cF7f4FxN530yn2ag6xtaS3BOFZlmNE0hIVSR91c/ffIO8maZ7Q+nxhLmia1/uNfpDcWt", - "rsWCfm0WN05ty3GtCDHlJhrSOAalbC92TWAbJ7auqywrqDbXnmiuVXb8eHkRMqVhEmv4tmVzJZe+xZ5a", - "5q6URpiAoA+fX+NJW22TYdtD+faHD3vEvu+cufuHj8Ej5/iDWsYbf/Y6YDDPqw6tVenwX6cD+yHbgHr9", - "zWvagPYtzfuW5r9uS/OL/+iOZnIF5lSqgeBlcoE3tvZyEU/sB/9zYLaGal4ImizbK8f9/cGf1rfUw+8t", - "+5b6nSr9ELoyzl4VAPFsVaD1pHAh64zkBk9UAfQWJEnAnJqlMjbODPhnSwKLQoJCu5kwQTmaOjFzIJ7V", - "9yBm0+FeNV8nOLJgOkbP6R2l67+M7mrS5girAap0y/xl1w/b0VnkEdurt+FkXbRok7L1IcK2cBiSa0mt", - "zNf8/eJthcCG2XhdkYnYu6ugfFndv3Ql/Nrb0zf3bgyPqW+hJl2tGsTafB1fEw7qEL9ohyLP5Np8uyl1", - "NXJYUtVIx7W2uCLZveS0uchkW883Jep1o7YZ650Vdryx6J4R6l52y8SGG4yKVVdn62s9iNBxKZleXhlW", - "rJxvr68/vAIqQTZvnyOs26+aRWZaF9G9WYPxaeCt5bPqjZO4eUlYlpycnTdVWhX59anCYMnZObksOUdC", - "BtfsWkejo9GRUYgogNOCRafRd6Pj0ZGxFtUzZPsQ3z0dajGsnbgQKhTNmxd0nfepbfdPddoSRbUbzhNz", - "lOi+vGpUDkq/Egn2lsaCm1QaLwsw6lOpD03YHdbvVVszb9oEoTdl730TmxiPX1iDotgnR0cdLhytH35R", - "Nn5sx4J3QETancBd4mF/WmakHTaIvn9AFtqr2wD9VzQhl1b7lu7x09D9yGmpZ0Kyf0GChI+/exrClbDk", - "R65NGnwtBLmgMrVaP37xVNK3CSsilcVyw8LJyYOy0LtG7zPTDiHNVfuLp9p/51yD5DQjVyDvQNYcODCK", - 
"MdcF0M839zeDSJV5TuWy/iEGci1InRrQVBnsrkOJQe/F0KZYVC2HnOYwFHcgJUsQ+T10GESHM7y1x9oM", - "oOw+etlL/egRQcNtG9gWM+5dlVQsojSYiRsMb1ruwiB+VhTZsu67815wU/bOs5DCxGQnt++heueNxEeG", - "dY/aE+O638iwB/bVwL4HtF0Bzb7AcC1I08W6I6Ix3zFcENgikcP6hsWBzXmc/8Lq0zj8n5HHhbp69l7/", - "b57O7aHnm6HnG3Mp5nmoCzx3zbvqQeT5OfSG9k5JR/1G49NgkKX2xCDk1x728LNPOh7B85s3g7/N9WvH", - "GESHGbuDod98tun4ETx4OG2ettXL/cUVXUoOCQGe4OtYKggR3V6ttTDx7TZa0dT3xCixsjFtDxh7wHg4", - "wDDbzILFH0GNrOuZFjmyfItUAa+mSrz+piSjPC0NhDU3v30UwHfdt3P8xXA+nw8xTyhlBjwWib133S1b", - "MCSf2v2dLv29x+89/gE93v5WxK4enuXWqate5iGtXhwenqz28eod46pzFl8Tp3zNGSDwTvIjnwN6FJ/Y", - "zf2e5L2j7x394Ry99r56c5OTb/B71XeQQXRoYvYWlxE/d1pasRrgdLCG03ynVeiRMvx+M9L+3mHv9n8R", - "t8c2rD9w7aAd9/Oc3TZ0bVX886e4P89vf1W9fpOyLgvqtnWM8sTp4fN+s34FUtgmsUeFCq8P7Ymxwv8f", - "FPZYsceKh8eKxoW+DSyq6YgWpfNbQUGYqH6vpDkJkMmy/klOfKdOK9L+JFvQ7dtfPHnk00FNaJ8d7D3+", - "L+Lxzq8F7ejqpesMChlQSK7zc211w+rrTJQJeS3yvORML8nPVMOcLqPqDVJsk1Wnh4eJBJoPU/t0lFXT", - "R7GZjn3ZK9a/0phVrFq2WUjhuENasMMJaHrYyHt/c/+/AQAA///i62OfUmoAAA==", + "zWIwJz1N9MwcMxThNDdJc0W8ronX3Qc2lAltsiUpSg1qgGkS0yQRoAgX2l7vGUqUBPNDWy6HBY21/c5k", + "OQkUwBNFBPclYXmRgcmjq5tEnpAc70YmWH6esrSUdJIZ+SjO/EeB+vwHoTIt67rGVnWvxg6Ntk0Y7Bw2", + "qr4SHAwapHNuCvSdVbYN7IKinGRMzcalzPru9sYkWdxK/PHyojavKHUqbEIrgeZGxdUyHk37Ffkos2Aa", + "WE5sk1iY9JUoZQwuVcZjkftUmzWI9oquV833QeIdT/I58VXiulPYX7bwrJ2QsXWSoRbD0L2T754Pa0Ci", + "xZ9nwqe24Dp0fEfVrdrJcHZuXcVcYS23ltANbJLOB6TkTjmpLXYp8sxOfd74OVbH/I4qv1Lg10Y35hi9", + "9VAFQbvHQq7KWVAfB8piYoInTzsc+cZikk/S81u78MZO64oxVQ+vtHrT4X2tfTFFCpyYcvOgNmYsuKbM", + "Xis67b50IkrdufnBeX2DczWd98l8moGuL2ktwTlVZJrRNIWEUEXeX/30yTvLm2W2P58aS5gntgTi3qg3", + "FLe6GQv6tVncOLWtyLUixJSbaEjjGJSy7dg1gW2c2Lqusqyg2lx7orlW2fHj5UXIlIZJLOPbrs2VXPoW", + "e2qZu1IaYQKCPnyKjYdttU2Sbc/l258/7Cn7vnPs7p8/Bo+c5g9qGW/82euAwTyvmrRWZcR/nSbsh+wE", + "6rU4r+kE2nc177ua/7pdzS/+o5uayRWYU6kGgvfJBV7a2vtFPLQf/M+B2RqqeSdosmxvHfdXCH9a61IP", + "v7dsXeo3q/RD6Mo4e1UAxLNVgdaTwoWsM5IbPFEF0FuQJAFzapbK2Dgz4J8tCSwKCQrtZsIE5WjqxMyB", + 
"eFZfhZhNh3vVfJ3gyILpGD2nd5Su/zK6q0mbI6wGqNIt85ddP2xHZ5FH7LDehpN10aJNytaHCNvFYUiu", + "JbUyX/P3i7cVAhtm441FJmLvuoLyZXUF05Xwa29P39y7MTymvoWadLXqEWvzdXxTOKhD/KIdijyTa/Pt", + "ptTVyGFJVSMd19rilmT3ktPmIpPtPt+UqNe92masd1bY8dKie0ao29ktExsuMSpWXZ2tr/UgQselZHp5", + "ZVixcr69vv7wCqgE2byAjrBuv2oWmWldRPdmDcangReXz6qXTuLmPWFZcnJ23lRpVeTXpwqDJWfn5LLk", + "HAkZXLNrHY2ORkdGIaIATgsWnUbfjY5HR8ZaVM+Q7UN8/XSoxbB24kKoUDRv3tF1Xqm2DUDVaUsU1W44", + "T8xRovv+qlE5KP1KJNheGgtuUmm8L8CoT6U+NGF3WL9abc28aROEXpa9901sYjx+YQ2KYp8cHXW4cLR+", + "+EXZ+LEdC94BEWl3AneJh/1pmZF22CD6/gFZaG9vA/Rf0YRcWu1busdPQ/cjp6WeCcn+BQkSPv7uaQhX", + "wpIfuTZp8LUQ5ILK1Gr9+MVTSd8mrIhUFssNCycnD8pC7ya9z0w7hDS37S+eav+dcw2S04xcgbwDWXPg", + "wCjGXBdAP9/c3wwiVeY5lcv6txjItSB1akBTZbC7DiUGvRdDm2JRtRxymsNQ3IGULEHk99BhEB3O8OIe", + "azOAsvvoZe/1o0cEDbdzYFvMuHdVUrGI0mAmbjC86boLg/hZUWTLuvXOe8dN2WvPQgoTk53cvofqnZcS", + "HxnWPWpPjOt+L8Me2FcD+x7QdgU0+w7DtSBNI+uOiMZ8x3BBYItEDusbFgc253H+O6tP4/B/Rh4XauzZ", + "e/2/eTq3h55vhp5vzKWY56Eu8Nw1r6sHkefn0EvaOyUd9UuNT4NBltoTg5Bfe9jDzz7peATPb14O/jbX", + "rx1jEB1m7A6GfvPZpuNH8ODhdHraVi/3R1d0KTkkBHiCb2SpIER0e7XWwsS322hFU98To8TKxrQ9YOwB", + "4+EAw2wzCxZ/BDWyrmda5MjyLVIFvJoq8fqbkozytDQQ1tz89lEAX3ffzvEXw/l8PsQ8oZQZ8Fgk9t51", + "t2zBkHxq93ca9fcev/f4B/R4+3MRu3p4llunrnqZh7R6d3h4strHq9eMq85ZfFOc8jVngMBryY98DuhR", + "fGI393uS946+d/SHc/Ta++rNTU6+we9V30EG0aGJ2VtcRvzcaWnFaoDTwRpO851WoUfK8PvNSPt7h73b", + "/0XcHtuw/sC1g3bcz3N229C1VfHPn+L+Qr/9YfX6Tcq6LKjb1jHKE6eHz/vZ+hVIYZvEHhUqvD60J8YK", + "/z9R2GPFHiseHisaF/o2sKimI1qUzs8FBWGi+smS5iRAJsv6VznxnTqtSPurbEG3b3/05JFPBzWhfXaw", + "9/i/iMc7Pxi0o6uXrjMoZEAhuc4vttUNq68zUSbktcjzkjO9JD9TDXO6jKo3SLFNVp0eHiYSaD5M7dNR", + "Vk0fxWY69mWvWP9KY1axatlmIYXjDmnBDieg6WEj7/3N/f8GAAD//5jY7jlVagAA", } // GetSwagger returns the content of the embedded swagger specification file From aa4f430e0086a84abfbdd6499a8451cc77e5f347 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 19 Nov 2024 19:22:53 -0300 Subject: [PATCH 14/15] Generate runner.gen.go after merge --- worker/runner.gen.go | 144 
++++++++++++++++++++++--------------------- 1 file changed, 73 insertions(+), 71 deletions(-) diff --git a/worker/runner.gen.go b/worker/runner.gen.go index 3bce8850..4f30087b 100644 --- a/worker/runner.gen.go +++ b/worker/runner.gen.go @@ -2673,77 +2673,79 @@ func HandlerWithOptions(si ServerInterface, options ChiServerOptions) http.Handl // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+xde2/ctpb/KoR2ASfAjF9tmoWB+4eTPmKs0wa2c9MiNQYc6YyGtUTqkpTHc7P+7gse", - "UhKpxzxc2+1t56+MRyTP+3cOyaPJlygWeSE4cK2iky+RiueQU/x4+uHsOymFNJ8TULFkhWaCRyfmCQHz", - "iEhQheAKSC4SyPajUVRIUYDUDHCNXKXd6VdzcNNzUIqmYOZppjOITqL3KjV/LQvzh9KS8TS6vx9FEv5V", - "MglJdPIZV71uptSM1vPE9DeIdXQ/ik7LhIkLx2WXlYuAfzITklAzg6TAQVIzqisUjsAPWfbTLDr5/CX6", - "bwmz6CT6r4NGmwdOlQfvIWH048V5dH896tGEowSJpbzfkdaS8+UNZOoR+o1IlpMUOA68Eldwpw27A1KE", - "LH0sMkGTihsyYxkQLcgUiJaUm5FTSIxOZkLmVEcn0ZRxKpdRi7+uEUdRDpomVFNLdUbLzMz/ch+19XKa", - "JMx8pBn5TUwJ45YYE9zxUlClIDF/6DmQghWQMR76UUWrjw9j7AlLQj46XLwr05TxlHxP48pBzr4lpSFs", - "HKXSR1F5SU3aDk36SEvQpeQTzXJQmuaFCnnQsoQOHxc4hzRzLPl5YBKi4U7vk8uyKIQ03nRLsxLUCdlT", - "wDXwGPZGZG8hZLI3IsbNiWWKTIXIgHLyYs8Q3zPP9mY0U7D3cp98azkjTBH3+EWz3sv9aiTJgXJFuPCY", - "3HfU3DPzeTylaLVmjKc1J+VVo5l1MNAJjD6/XxEeZzlN4UrgP934SEuWUB7DRMU0g8BMr/dftW30HY9F", - "KWkKynmKrjEECMvxQZwJBdmSZIzfNM5r7EYKKfJCkxdzls5BOtuRnC6JhKSM3RLkXyXNmF6+9PX2g+OT", - "XCKftby8zKcgjbysEnAg0u3aWhjO2WxJFkzPO3E1HO5Wfz2+jutOVujxqKvHbyGVgMws5iy2bDQIaTll", - "ihSlmqMKF1QmCkcxzjSjmR2z3+aPrFdTJiRVayDhlJyLi1Py4lwsxheU35DThBYakemlMzzlCWFakVhI", - "mx0TE2ULYOlcY+BaIbwEQ767o3mRwQn5Qn6NMqqB63EsuGLKBNryIIvzseFurJK77NfohBztH47IrxEH", - "yX5TBwW7g2xMpR5XT4/vfQWco2BPhoMdeTaEQg4p1ewWJtb51zBx1YTJC/USw6tkCZDFnGrzF9zFWZkA", - "mUmR96j4LOVCGg+akdAhya/l4eFXMTny2f7RsUY+WNb6uC/ziY3rSQGyT4ajtgg/oqsRMasAwceIAqQT", - "L2CkzMmZHfwBZIcdxjWk1nuRHz4DCSiahlZqOTo8HOYnAS6YMjbGifvkvZBgP5NSlTQzqAUUMctBlIOi", - "SpRpqYnKxAIkqbkwyyRlhpE7XZp8AzzV84581XhyiVz3SeerdxOvWOWTwzZVdAZ6OYnnEN8EyjOpr629", - 
"DyANJppEitMITkNXVJrliPuzNnYZWCizxJQwYjYDroyTCUnmVOazMvPZvLSrvkVmamZdtkZuAZKuRi7B", - "haWkPBE5sfg2oAozuFffla0CLRzu/88AXIuZLUWaMo0WRcaaJCehsrG1zItD8+QoSGSXFc0ONrfyflEZ", - "0Ca2ngIgyOzrK4D+AnnjtFmL/miZ8xEL1Nokm8Ly70LjYZJDUdey7TqTbljT/ZMlILomnbVA8Zu+DdlM", - "0hwUArKCWPAE3TuoQ27N8r503w/g1hzTfkDz1eteqnYkYZxgOlcbEH1nF++ju7Hv1vmH2vUxf/6hXmvZ", - "2L6cyIUZPZmW8Q3oNhdHx6/bbHysCBoT427TMGVUTnNRcm0MYNest1t+QYE2s6nQPHIwaz7mJne6mQuW", - "ZQbsGcdHHRO+t8PeINOBYH5qF0zBhJbpZACWD487dWotAk4mNEkaMA4EtuUyeRdsPNymQ4KCfJph2Tw4", - "1xa8PJZAVSV3kOKRgdMyJcMAv758OX71H1y97OqKShMLlrS89+jw+Os+PMSRW8HhJ1y7S3XLDGNTx4oU", - "c37+vptZ5kxpIZch9H2+9tHajeiDLno30eIGeNvnv/GQgt6RKzumT7GD2Ltdyt+gRtYSaB6QwTOgsI6j", - "eb9rLZWGfFKfCvfweYlDSO8x8CjSkBfG/qWEFga+bpa48gZtWEv2uIMx8wovuIQ0B65P+VLPGU+Puy4x", - "FXc9R+ckQxghXxMqJV2SlN0CJ1QRSqbirjoIcmiLVh2ZKPj5l59/ITYn+z7/RtwNnrx0iZ9VWV9Z5h+a", - "56m6mTBelLpXPrEYS1AiKzG1mcEEB7eE0suCxYjNuGWnpJBwy0SpzIeExTibaYcuo6a2RnQ8unt394m8", - "ePePT/84fvUNAtPl6ftgP/HeUD5DNv90Zx95mRksVzcTUepakSuywpnZYZUwajRoawvpzobnZhtmFrSH", - "wzSfsrQ0yrSqt26lRkTMNHDzZ1LGePoLWoN0M/WccpN3GE8z8MwQSFVxTn6ynPfFOTdOlbF/wyQWQiZq", - "O/EKwbgmOJNxqkHVZVS9brOxpDwF8vlwdHTtXARnO7oE7gqItR0+BTtAgjJfmq+s+RKWm4wpuArrFkeL", - "vLUy9AnqE+sGw493xy7KxcxJ5QzRioXFHCQQoLFjnzBjOPLi59EvL5scGGyncFibMw/SkbGMTiHrYewc", - "v6/r2oC1ipsjwnjCYtQ/NUMhlaLkiRttqr7DYMiUxjf+kC67luyKa5FMpExv4S12miIlH5sIUHORmToX", - "3dOuRRhX2tR+YmZYRIzD5z1XD+eWetfOm1YQnZywIn98LOrz8AceOzzyaf3jAGJpxUoefiq8ZiPw+tXf", - "6BhzI23uzjPX7Tu2Pj+sgrMnft/OS37TV/fE5gFuU4wxMSppc9XZ7SLQ7tCxu/XBBdx+B1f1RQwPwLzK", - "uKI0sGb1uLMw05Abhu49GvVaNSFMYx1Nan+gYczTpVVUjwbfXV19GGgsMY827CxJQFOWbd6FUTeJdLsw", - "vsWlIHGUvW6DbjOGI+vJ2YgzIOs/acYSXK6WekiUyhYrJWmv5xnOStJnNZ/b9gJ9fAPN9PxtBRwhv0pT", - "XbZuSn/63+AkHwf0nfU2Z5cNgR76mKW2atvpQ8iexKr6w6MNa35grG/u8U1gb+vWBY7j5bo1b1VTj3cA", - "vpViEE1W6WUYiBqtYHPJWhRqo0MLEfok6BH0/Py9L2DIrPSeNOVbezEPFvHIZGLqE3+KPUkhH9Um+UJ6", - "63vLeZL5LPdJxG4Bj5Xc6dIHKql1rVC4WHAtRTYpZbamCvt4cY7GVeUU+39MzXPLKLmSLL7BvZvQIhaZ", - 
"q8kSLNHdIXRmcjSeno21GLeP20mBzPnGfmvZIh9l9ufbSRe1LmvaJpW1NgyuNwQHgwbZ7H3q9FKXU3bB", - "HjMW5TRjat6YJ0wiSpvdqlGhsY5Ls6LUqbAVqQSam3LKLRPQtF8NabiyMvSTvhSljMGnyngs8pBqvQbR", - "wanpZf19L/FWKISchCrx46Hf4TcIja2gbbUnd5GuE199IYXRUl0FraaweYw8rusQLf4453kS3xkFtlnh", - "Satg9j1VN2orB7Jzq+PQAa/xDyXaGVLSxYiU3DuXak7NFHlhp75swMZwGLZmhUcO4SHr2mKlsx6qoNcL", - "YiGHih/Ux57ZyvEZS3ALa4cj33gqFZIM8MMuvLZl2zGmquFOq9ct3lfaF2utnq1Xbh5UxjSORJm9n/T6", - "hulUlLp1hYTzugbnarbokvk0B13d9lqCC6rILKNpCgmhivx4+f2n4FDALLP5RtdYwjyxKdG/mq8pbnTF", - "1hvlZnET4vZorxEhptzs3Wkcg1K2r7sisElI29BVlhVUm29PNNeQHT9enPeZElFYity1fw5yGVrsuWVu", - "S2mE6RH08Wt13LWrTap1u8HffCNjt+v3rf17dyMzeuL9wqiS8TqcvQoYzHPX7TVUWv91urkfs6Wo0yu9", - "oqVo1x69a4/+67ZHv/pbd0eTSzBbYw0EL6YLe4KAF5V4kL33f3vGNVT9ctF0ObB53t1FPGcPVAe/N+yB", - "6na9dFPoYJ69LADi+VCiDaTwIeuU5AZPVAH0BiRJwOytpTI2zgz4Z0sCd4UEhXYzaYJyNHVi5kA8r+5U", - "jNOhr5qvExxZMB1j5HQ21tVfRncVabOh1QCu3DJ/2fX77egt8oSt2ptwsipbNEXZ6hRh20Hw6HAVqcF6", - "LfSXwBV6HGbt1Ucm4uDeg/Klu8tpS/il49PX934Oj2loobpcdc1mTb2Orxz36hC/aIYiz+TKfLuudDVy", - "WFJupBdaG1y3bH/0tf6wy7axryvUq6ZvMzbYK2x5+9HeI1R98ZaJNbchjlVfZ6vPehCh41Iyvbw0rFg5", - "311dfXgDVIKs32RHWLdf1YvMtS6ie7MG47OeN6BP3dsrcf3CsSw5OT2rGyxUFJ5PFQZLTs/IRck5EjK4", - "Ztc63D/cPzQKEQVwWrDoJPpq/2j/0FiL6jmyfYDvsY61GFdBXAjVl83rl329d7NtJ5HbbYnCecNZYrYS", - "7RdhjcpB6TciWVanocCRkM36VOoDk3bH1Tva1szrnKDvrdv70MQmx+MX1qAo9vHhYYsLT+sHvymbPzZj", - "IdggIu1W4i5xsz8rM9IMG0VfPyILzTVwD/03NCEXVvuW7tHz0P3IaannQrJ/Q4KEj756HsJOWPId16YM", - "vhKCnFOZWq0fvXou6ZuCFZHKYrlh4fj4UVnoXMl3mWmGkPra/tVz+d8Z1yA5zcglyFuQFQcejGLO9QH0", - "8/X99ShSZZ5Tuax+1IFcCVKVBjRVBrurVGLQ+25sSyyqlmNOcxiLW5CSJYj8ATqMooM5dgDg2Qyg7CF6", - "2QaB6AlBw29B2BQz7n2VOBZRGqzEDYbX7Xv9IH5aFNmy6uELXpZDJKdm32Vyslfbd1C99XbjE8N6QO2Z", - "cT1sitgB+zCw7wBtW0CzL0NcCVJ3xG6JaCwMDB8ENijk8HzD4sD6Oi58+fV5Av6PqOP6OoR2Uf8nL+d2", - "0PNg6HlgLcWCCPWB57Z+770XeX7oe9t7q6KjejvyeTDIUntmEArPHnbwsys6niDy67eMHxb6VWCMooOM", - "3cI4bFFbt/2wozN28/A9SLsvayUkPNweA42Ez4wIg01oO3DYgcPjgQP61u8Bh6wdlBYgsnyDigBvoEq8", - 
"5aYkozwtDVLVF7xdAMDX4zeL+bvxYrEYYzlQygx4LBJ7vbpdUWBIPnfke439u2DfBfvjBbv7eYltIzzL", - "bVC7luUxde8aj4+HY9y9luwaZPE9EMpX5Pae15ifuNzvUHzmMA9bj3eBvgv0xwv0Kvoq5ybHD4h71Q2Q", - "UXRgcvYGdw4/tDpXcdPvNaqqXhTwOoKeqLjv9hztrhd2Yf8XCXvstvodtwvaC78g2G3f1kZnfOEU/xf9", - "7Q+xC/tCaL3d102HGOWJ16oX/Mz9AFLYXrAnhYqg3eyZsSL8Txd2WLHDisfHijqEHgYWbjqiRen9vFAv", - "TLifOKl3AmS6rH7FE1+d04o0v+LWG/bNj6Q88e6gIrSrDnYR/xeJeO8HhrYM9dIPBoUMKCTX+oW3qi/1", - "bSbKhLwVeV5yppfkB6phQZeRe1EUu2HVycFBIoHm49Q+3c/c9P3YTMf264H1LzVWFUPL1gspHHdAC3Yw", - "BU0Pannvr+//PwAA///ptjPPhWoAAA==", + "H4sIAAAAAAAC/+xde3PbuK7/KhzdO5N2xs5rt9s7mTl/pN1HMzftdpL0dHe6mRxagmU2EqlDUrF9evPd", + "7wCUZFKSX90ke86u/6pjkQRAAD+AIOR+iWKVF0qCtCY6+RKZeAI5p4+n789+0Fpp/JyAibUorFAyOsEn", + "DPAR02AKJQ2wXCWQ7UeDqNCqAG0F0Bq5SbvTryZQTc/BGJ4CzrPCZhCdRG9Nin/NC/zDWC1kGt3fDyIN", + "/yyFhiQ6+USrXi+mNIw289ToM8Q2uh9Ep2Ui1EXFZZeVi4B/NlaacZzBUpCgOY7qCkUj6EOW/TyOTj59", + "if5bwzg6if7rYLGbB9VWHryFRPAPF+fR/fWgZycqSpA4yvsdaR05X95Aph6hX6lkfpOCpIFX6gpmFtld", + "IkXI0ociUzypuWFjkQGzio2AWc0ljhxBgnsyVjrnNjqJRkJyPY9a/HWVOIhysDzhljuqY15mOP/LfdTe", + "l9MkEfiRZ+yzGjEhHTGhZMVLwY2BBP+wE2CFKCATMrSjmlYfH6jsG5GEfHS4eFOmqZAp+5HHtYGcfc9K", + "JIyGUu9HUVtJQ9oNTfpIa7ClljdW5GAszwsT8mB1CR0+LmgOW8xx5CeBSpiFmd1nl2VRKI3WdMezEswJ", + "2zMgLcgY9gZsb6p0sjdgaObMMcVGSmXAJXu2h8T38NnemGcG9p7vs+8dZ0wYVj1+tljv+X49kuXApWFS", + "eUzuV9SqZ/h5OOKktcUYb9cqKa8WO7MOBjqO0Wf3K9zjLOcpXCn6p+sfaSkSLmO4MTHPIFDTy/0XbR39", + "IGNVap6CqSzFNhgCTOT0IM6UgWzOMiFvF8aLemOFVnlh2bOJSCegK92xnM+ZhqSMqyXYP0ueCTt/7u/b", + "TxWf7JL4bOSVZT4CjfKKWsAlnu7Wtgo5F+M5mwo76fjVcnd3+9dj67TuzYp9POru4/eQaiBmphMROzYW", + "COk4FYYVpZnQFk65TgyNElJYwTM3Zr/NH1u/TZnS3KyBhFN2ri5O2bNzNR1ecHnLThNeWEKm55XiuUyY", + "sIbFSrvomKCXTUGkE0uO64TwAgz7YcbzIoMT9oX9FmXcgrTDWEkjDDra/CCL8yFyNzTJLPstOmFH+4cD", + "9lskQYvP5qAQM8iGXNth/fT43t+AcxLs0XCwI8+GUCgh5VbcwY0z/jVMXC3c5Jl5Tu5VigTYdMIt/gWz", + "OCsTYGOt8p4tPkul0mhBYxYaJPutPDz8JmZHPtvvKtbYe8daH/dlfuP8+qYA3SfDUVuEd2RqTI1rQPAx", + 
"ogBdiRcwUubszA1+D7rDjpAWUme9xI8cgwYSzUIrtBwdHi7nJwGphEEd08R99lZpcJ9ZaUqeIWoBJ8yq", + "IKqColqUUWmZydQUNGu4wGWSMiPPHc0x3oBM7aQjXz2eXRLXfdL527uJVayyyeU6NXwMdn4TTyC+DTYP", + "Q197996DRkzEQErTGE0jUzRW5IT74zZ2ISyUWYIpjBqPQRo0MqXZhOt8XGY+m5du1dfETMNsFa2JW4Ck", + "uyOXULml5jJROXP4tmQrcHDvfte6CnbhcP9/lsC1GrtUZJGm8aLIxCLIaah17DTz7BCfHAWB7LKm2cHm", + "VtwvagW6wNaTAASRfX0G0J8gbxw2G9EfLHI+YILaqGRTWP5daLyc5DKva+l2nUo3zOn+LhJQXZWOW6D4", + "Xd+BbKx5DoYA2UCsZELmHeQhd7i8L92PS3BrQmE/oPniZS9VN5IJySicmw2IvnGL99Hd2Hab+MPd+hQ/", + "/1CrdWxsn07kCkffjMr4Fmybi6Pjl202PtQEUcV02kSmcMt5rkppUQFuzea45ScUpDMXCvFRBbP4McfY", + "Wc2ciixDsBeSHnVU+NYNe0VMB4L5oV0JAze8TG+WwPLhcSdPbUSgyYwnyQKMA4FduszeBAeP6tChwUA+", + "yihtXjrXJbwy1sBNLXcQ4omB0zJlywF+ffpy/OI/OHvZ5RX1TkxF0rLeo8Pjb/vwkEZuBYcfae0u1S0j", + "jAsdK0LM+fnbbmSZCGOVnofQ9+naR+tqRB908dmNVbcg2zb/nYcUfMau3Ji+jV2KvduF/A1yZKuB5wEZ", + "qgGFeRzP+01rbizkN01VuIfPSxrCesvAg8hCXqD+Sw0tDHy5WOLKG7RhLtljDqjmFVZwCWkO0p7KuZ0I", + "mR53TWKkZj2lc5YRjLBvGdeaz1kq7kAybhhnIzWrC0EV2pJWB+gFv/z6y6/MxWTf5l+p2dLKS5f4WR31", + "jWP+a+M8N7c3Qhal7ZVPTYcajMpKCm04mNHgllB2XoiYsJmO7JwVGu6EKg1+SERMs4Wt0GWwyK0JHY9m", + "b2Yf2bM3f/v4t+MX3xEwXZ6+Dc4Tb5HyGbH5b1f7yMsMsdzc3qjSNhu5Iiqc4QmrhMFiB11uoava8ASP", + "YbigKw7zfCTSEjfTbb0zKzNgamxB4p9JGVP1F6wFXc20Ey4x7giZZuCpIZCq5pz97Djv83OJRpWJf8FN", + "rJROzHbiFUpIy2imkNyCadKoZt3FwZLLFNinw8HRdWUiNLuiy2BWQGzd8BG4ARoMfolfOfUlIseIqaQJ", + "85aKFnvtZOgT1CfWdYZ3s+PKy9W4kqpSRMsXphPQwIDHFftMoOLYs18Gvz5fxMDgOEXD2px5kE6MZXwE", + "WQ9j5/R9k9cGrNXcHDEhExHT/nMcCqlWpUyq0Zj1HQZDRjy+9Yd02XVkV1yLZCoVdgtrcdMMK+UQPcBM", + "VIZ5LpmnW4sJaSzmfmqMLBLG0fOeq4dzR72r500ziE5MWBE/PhRNPfwryw4PXK1/GEAsnVjJ11eF1xwE", + "Xr74C5UxN9rNXT1z3blj6/ph7Zw9/vt6Usrbvrwnxgd0TEFlklfyxVVnt4vAVkXH7tGHFqjOO7SqL2JY", + "APMy45rSkjXrx52FhYUcGbr3aDRrNYQojHV20voDkTFvL91G9ezgm6ur90saS/DRhp0lCVguss27MJom", + "kW4Xxve0FCQVZa/boNuMUZH15FyIs0TWv/NMJLRcI/UyUWpdrJSkvZ6nOCdJn9Z8btsL9PENPLOT1zVw", + "hPway23Zuin9+X+DSj4N6Kv1LmqXCwI99ClKbdW204eQPYHV9LtHG9Z8x1jf3OOrwN3WrXOcipfr1rxV", + 
"TT1eAXyrjSE0WbUvy4FosSvUXLIWhdro0EKEPgl6BD0/f+sLGDKrvSeL9K29mAeLVDK5wfzEn+IqKeyD", + "2SReaG99bzlPMp/lPonEHVBZqaouveeaO9MKhYuVtFplN6XO1mRhHy7OSbmmHFH/D+Y8d4KzKy3iWzq7", + "KatilVU5WUIpelWEzjBGU/VsaNWwXW5nBTHnK/u1Y4t90NnvSBzf8Rzq4FOnqphd6FJ2OcMH7sNnNdpn", + "75QVMeD51DI7wcORYZLnmOpXxOtKft0z4QKwspjjaVVaMANK7oRliQLDpLLuUhIpcdab1boiP8x4bN13", + "mJslUIBMDFMylETkRQaY/Vf3nzJhOd3ojKhoPhZpqfkoA1ITzvyH2+h/MK7Tsq7GbFSta6yn2W0M3q0j", + "UtUNQ4PBgvZOez3dcpVF9thuUY4yYSYLmwwjp7F4REeJ0SQr9arSpsql4Rp4jltcLRPQdF8tM6vatKGf", + "9KUqdQw+VSFjlYdUmzWYDUrFl833vcRb/h9yEm6JDwL9Xr4BHmyF56vdtwvvHVDpwxGCiPr+azWFzYHh", + "YU2HWfXHGc+j2M4g0M0KS1oVW95yc2u2MiA3t64BL7EavxLTTgs0nw5YKb1i3KJUaNgzN/V5gzdUWwz7", + "0cI6S1hZXpuhddajLei1gljpZRkf7ceecdic0LndDSe+qRQXkgzwwy28tk+9YszUw6tdvW7xvlK/lGD2", + "nDdzfFArEw2JC3cp6zVL85EqbevejOZ1FS7NeNol83ECtr7idgSn3LBxxtMUEsYNe3f548egEoLLbH66", + "R03gE1dA8vsRGoob3Sv2ejkuji7u6pkLEWIuMSrzOAZjXDN7TWATl3auaxwrtG2+Pkldy/T44eK8T5WE", + "wlrlVc/rUi5DjT21zG0pUZgeQR/+gEKlCrPJEcVVNTY/vbkaxX2raNE9vQ0e+ZA0qGW8DmevAgZ8XrW4", + "LTtP/Hla2B+yj6rTIL6ij2rXE77rCf/z9oS/+Eu3hLNLwNOxBUa38YUrm9DtLBUP9v5vD03DNG9UjeaL", + "O9vdBcwf1vjVwe8NG7+6rT7dELo0zl4WAPFkWaANpPAh65TliCemAH4LmiWAZ2ttUMcZgn82ZzArNBjS", + "G4YJLknVCc6BeFJfJKHRka3i1wmNLISNyXM6B+v6L9y7mjQeaC1AlW7hX279fj16izxif/omnKyKFouk", + "bHWIcD0wVC9dRWppvhbaS2AKPQaz9r4nU3Fw2cPlvLrAakv4pWPT1/d+DI95qKEmXa067Bb5Or1n3buH", + "9MViKPHMrvDbdakryuFIVSM919rgjmn70tf6Ypfr3V+XqNed7jg2OCtseeXTPiPULwM4JtZcAVWs+nu2", + "utZDCB2XWtj5JbLi5HxzdfX+FXANunl9n2DdfdUsMrG2iO5xDSHHPa99n1av7MTNW9a6lOz0rKkWmyis", + "TxWIJadn7KKUkgghrrm1DvcP9w9xQ1QBkhciOom+2T/aP0RtcTshtg/o5d2hVcPaiQtl+qJ584az90K6", + "a5+qTluqqKzhLMGjRPvtX9xyMPaVSuZ1NRQkEXJRn2t7gGF3WL+Y7tS8zgj6XjW+D1WMMZ6+cAolsY8P", + "D1tceLt+8Nm4+LEZC8EBkWi3AndJh/1xmbHFsEH07QOysLj77qH/iifswu2+o3v0NHQ/SF7aidLiX5AQ", + "4aNvnoZwJSz7QVpMg6+UYudcp27Xj148lfSLhJWQymE5snB8/KAsdPoQuswshrCmV+HFU9nfmbSgJc/Y", + "Jeg70DUHHoxSzPUB9NP1/fUgMmWecz2vf8mCXSlWpwY8NYjddShB9J4NXYrFzXwoeQ5DdQdai4SQP0CH", + 
"QXQwobYHqs0AyR6il+uKiB4RNPy+i00x497fkopFkoYyccTwpmexH8RPiyKb142LwRuCxl2/FlphTPZy", + "+w6qt17pfGRYD6g9Ma6HnSA7YF8O7DtA2xbQ3BsgV4o1bcBbIpoIHcMHgQ0SOapvOBxYn8eFb/w+jcP/", + "EXlcX1vUzuv/zdO5HfR8NfR8ZS4lAg/1geeuedm/F3l+6nvFfauko34l9GkwyFF7YhAKaw87+NklHY/g", + "+c2r1V/n+rVjDKKDTNzBMGxRW3f86D14eB2nrvHL/8kaW2oJCQOZ0Ptsphci2r1aK2Hi63W0pLnwiVFi", + "aWPaDjB2gPFwgIFm5sDi96BG1vZMhxxZvkGqQFdTJV1/c5ZxmZYIYc3NbxcF6McCNnP82XA6nQ4pTyh1", + "BjJWibt33S5bQJJP7f7eaw47j995/AN6vPuxjW09PMudU1e9zENevXk9PF7u49VL2lXnLL0Vw+WKM0DP", + "S92PfA7oUHxiNw97kneOvnP0h3P02vtq42bHX+H3pusgg+gAY/YGlxE/tVpaqRrgdbD2p/leq9AjZfjd", + "ZqTdvcPO7f8kbk9tWL/j2sF67hc4u2vo2qj4F07x/38D97P09RuddVnQLlrHuEy8Hr7gR/+XIIVrEntU", + "qAj60J4YK8L/gmKHFTuseHisaFzo68Cimk5oUXo/ttQLE9UPvjQnATaa179pSu/UWcMWv2nX6/aLn4x5", + "5NNBTWiXHew8/k/i8d7PLW3p6qXvDIYYMESu9Xt3dcPq60yVCXut8ryUws7ZT9zClM+j6g1SapM1JwcH", + "iQaeD1P3dD+rpu/HOJ36spesf2kpq1i2bLOQoXEHvBAHI7D8oJH3/vr+/wMAAP//MrKa25NrAAA=", } // GetSwagger returns the content of the embedded swagger specification file From e8da0022917fdb47c1aed4f0f8a3ba6ae5bdb98a Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 19 Nov 2024 19:37:59 -0300 Subject: [PATCH 15/15] docker: Self-review last nits Noticed there was a potential race happening on the Stop function all updating the state concurrently. --- runner/app/live/streamer/streamer.py | 4 ++-- worker/docker.go | 26 +++++++++++++------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index 52d9012f..b5c78932 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -102,8 +102,8 @@ async def monitor_loop(self, done: Event): time_since_last_params = current_time - self.last_params_time time_since_reload = min(time_since_last_params, time_since_start) - if self.input_timeout > 0 and time_since_last_input >= self.input_timeout: - logging.info(f"Input stream stopped for {self.input_timeout} seconds. 
Shutting down...") + if self.input_timeout > 0 and time_since_last_input > self.input_timeout: + logging.info(f"Input stream stopped for {time_since_last_input} seconds. Shutting down...") await self.stop() return diff --git a/worker/docker.go b/worker/docker.go index 3324c3c1..46a8cfb4 100644 --- a/worker/docker.go +++ b/worker/docker.go @@ -109,15 +109,12 @@ func (m *DockerManager) Warm(ctx context.Context, pipeline string, modelID strin } func (m *DockerManager) Stop(ctx context.Context) error { - m.mu.Lock() - defer m.mu.Unlock() - var stopContainerWg sync.WaitGroup for _, rc := range m.containers { stopContainerWg.Add(1) go func(container *RunnerContainer) { defer stopContainerWg.Done() - m.destroyContainer(container) + m.destroyContainer(container, false) }(rc) } @@ -314,7 +311,7 @@ func (m *DockerManager) allocGPU(ctx context.Context) (string, error) { // If the container exists in this map then it is idle and if it not marked as keep warm we remove it rc, ok := m.containers[containerName] if ok && !rc.KeepWarm { - if err := m.destroyContainer(rc); err != nil { + if err := m.destroyContainer(rc, true); err != nil { return "", err } return gpu, nil @@ -324,15 +321,15 @@ func (m *DockerManager) allocGPU(ctx context.Context) (string, error) { return "", errors.New("insufficient capacity") } -func (m *DockerManager) destroyContainer(rc *RunnerContainer) error { +// destroyContainer stops the container on docker and removes it from the +// internal state. If locked is true then the mutex is not re-locked, otherwise +// it is done automatically only when updating the internal state. 
+func (m *DockerManager) destroyContainer(rc *RunnerContainer, locked bool) error { slog.Info("Removing managed container", slog.String("gpu", rc.GPU), slog.String("name", rc.Name), slog.String("modelID", rc.ModelID)) - delete(m.gpuContainers, rc.GPU) - delete(m.containers, rc.Name) - if err := dockerRemoveContainer(m.dockerClient, rc.ID); err != nil { slog.Error("Error removing managed container", slog.String("gpu", rc.GPU), @@ -342,6 +339,12 @@ func (m *DockerManager) destroyContainer(rc *RunnerContainer) error { return fmt.Errorf("failed to remove container %s: %w", rc.Name, err) } + if !locked { + m.mu.Lock() + defer m.mu.Unlock() + } + delete(m.gpuContainers, rc.GPU) + delete(m.containers, rc.Name) return nil } @@ -377,10 +380,7 @@ func (m *DockerManager) watchContainer(rc *RunnerContainer, borrowCtx context.Co } else if container.State.Running { continue } - - m.mu.Lock() - defer m.mu.Unlock() - m.destroyContainer(rc) + m.destroyContainer(rc, false) return } }