Skip to content

Commit

Permalink
add improved process handling
Browse files Browse the repository at this point in the history
  • Loading branch information
pkucmus committed Mar 28, 2024
1 parent c579852 commit 3a4d873
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "smyth"
version = "0.2.0"
version = "0.2.1"
description = ""
authors = ["Mirumee <[email protected]>"]
readme = "README.md"
Expand Down
3 changes: 3 additions & 0 deletions src/smyth/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from smyth.config import get_config, get_config_dict, serialize_config

config = get_config(get_config_dict())

logging_config = {
"version": 1,
"disable_existing_loggers": False,
Expand All @@ -16,6 +17,7 @@
"class": "rich.logging.RichHandler",
"formatter": "default",
"markup": True,
"rich_tracebacks": True,
},
},
"formatters": {
Expand All @@ -32,6 +34,7 @@
logging.config.dictConfig(logging_config)
LOGGER = logging.getLogger(__name__)


@click.group()
def cli():
pass
Expand Down
15 changes: 12 additions & 3 deletions src/smyth/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,20 @@ def run(self):

def send(self, data) -> RunnerResult | None:
LOGGER.info("Sending data to process %s: %s", self.name, data)
if not self.is_alive():
raise RuntimeError(f"Process '{self.name}' is not alive, restart server.")
self.input_queue.put(data)
try:
result_data = self.output_queue.get()
LOGGER.info("Received data from process %s: %s", self.name, result_data)
return RunnerResult.model_validate_json(result_data)
while True:
result_data = self.output_queue.get(timeout=10)
if not result_data:
LOGGER.info("Received empty data from process %s, checking if process is alive", self.name)
if not self.is_alive():
LOGGER.info("Process %s is not alive, breaking", self.name)
break
continue
LOGGER.info("Received data from process %s: %s", self.name, result_data)
return RunnerResult.model_validate_json(result_data)
except Empty:
return None
except KeyboardInterrupt:
Expand Down
6 changes: 5 additions & 1 deletion src/smyth/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,11 @@ def timeout_handler(signum, frame):
def main(lambda_handler_config: HandlerConfig, input_queue: Queue, output_queue: Queue):
LOGGER.setLevel(lambda_handler_config.log_level)
sys.stdin = open("/dev/stdin")
lambda_handler = import_attribute(lambda_handler_config.handler_path)
try:
lambda_handler = import_attribute(lambda_handler_config.handler_path)
except ImportError as error:
LOGGER.error("Could not import lambda handler: %s", error)
raise

coldstart_next_invokation = lambda_handler_config.fake_coldstart_time

Expand Down
1 change: 1 addition & 0 deletions src/smyth/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ async def lifespan(app: Starlette):
process.terminate()
process.join()
LOGGER.info("All lambda processes terminated")



def get_process_definition(path: str) -> ProcessDefinition | None:
Expand Down

0 comments on commit 3a4d873

Please sign in to comment.