diff --git a/backend/chatsky_ui/services/process.py b/backend/chatsky_ui/services/process.py index 93ef839e..c37f42a0 100644 --- a/backend/chatsky_ui/services/process.py +++ b/backend/chatsky_ui/services/process.py @@ -163,47 +163,6 @@ async def stop(self) -> None: self.logger.error("Process '%s' not found. It may have already exited.", self.id) raise ProcessLookupError from exc - async def is_alive(self) -> bool: - """Checks if the process is alive by writing to stdin andreading its stdout.""" - async def check_telegram_readiness(stream, name): - async for line in stream: - decoded_line = line.decode().strip() - self.logger.info(f"[{name}] {decoded_line}") - - if "telegram.ext.Application:Application started" in decoded_line: - self.logger.info("The application is ready for use!") - return True - return False - - async with AsyncClient() as client: - try: - response = await client.get( - f"http://localhost:{HTTP_INTERFACE_PORT}/health", - ) - return response.json()["status"] == "ok" - except Exception as e: - self.logger.info( - f"Process '{self.id}' isn't alive on port '{HTTP_INTERFACE_PORT}' yet. " - f"Ignore this if you're not connecting via HTTPInterface. Exception caught: {e}" - ) - - done, pending = await asyncio.wait( - [ - asyncio.create_task(check_telegram_readiness(self.process.stdout, "STDOUT")), - asyncio.create_task(check_telegram_readiness(self.process.stderr, "STDERR")), - ], - return_when=asyncio.FIRST_COMPLETED, - timeout=PING_PONG_TIMEOUT, - ) - - for task in pending: - task.cancel() - - for task in done: - result = task.result() - if result: - return result - def add_new_conf(self, conf: list, params: dict) -> list: #TODO: rename conf everywhere to metadata/meta for run in conf: if run.id == params["id"]: # type: ignore @@ -250,6 +209,49 @@ async def update_db_info(self) -> None: break await write_conf(builds_conf, settings.builds_path) + + async def is_alive(self) -> bool: + """Checks if the process is alive by writing to stdin andreading its stdout.""" + async def check_telegram_readiness(stream, name): + async for line in stream: + decoded_line = line.decode().strip() + self.logger.info(f"[{name}] {decoded_line}") + + if "telegram.ext.Application:Application started" in decoded_line: + self.logger.info("The application is ready for use!") + return True + return False + + async with AsyncClient() as client: + try: + response = await client.get( + f"http://localhost:{HTTP_INTERFACE_PORT}/health", + ) + return response.json()["status"] == "ok" + except Exception as e: + self.logger.info( + f"Process '{self.id}' isn't alive on port '{HTTP_INTERFACE_PORT}' yet. " + f"Ignore this if you're not connecting via HTTPInterface. Exception caught: {e}" + ) + + done, pending = await asyncio.wait( + [ + asyncio.create_task(check_telegram_readiness(self.process.stdout, "STDOUT")), + asyncio.create_task(check_telegram_readiness(self.process.stderr, "STDERR")), + ], + return_when=asyncio.FIRST_COMPLETED, + timeout=PING_PONG_TIMEOUT, + ) + + for task in pending: + task.cancel() + + for task in done: + result = task.result() + if result: + return result + + return False class BuildProcess(Process): @@ -280,8 +282,15 @@ def save_built_script_to_git(self, id_: int) -> None: bot_repo = get_repo(settings.custom_dir.parent) save_built_script_to_git(id_, bot_repo) - async def check_status(self) -> Status: - status = await super().check_status() - if status not in [Status.NULL, Status.RUNNING, Status.ALIVE]: - # Save the project anyway to keep a gradual number of builds - self.save_built_script_to_git(self.id) + async def periodically_check_status(self) -> None: + """Periodically checks the process status and updates the database.""" + while True: + await self.update_db_info() # check status and update db + self.logger.info("Status of process '%s': %s", self.id, self.status) + if self.status in [Status.NULL, Status.STOPPED, Status.COMPLETED, Status.FAILED]: + self.save_built_script_to_git(self.id) + break + await asyncio.sleep(2) # TODO: ?sleep time shouldn't be constant + + async def is_alive(self) -> bool: + return False