-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement fastapi, sentry, typer, and logging utilities (#2)
This PR contains an initial set of utilities. Developed alongside https://github.com/WATonomous/mailing-list-gateway.
- Loading branch information
Showing
8 changed files
with
279 additions
and
34 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
import json | ||
import os | ||
import logging | ||
from enum import Enum | ||
|
||
class Vars(str, Enum): | ||
SENTRY_DSN = "SENTRY_DSN" | ||
SENTRY_RELEASE = "SENTRY_RELEASE" | ||
BUILD_INFO = "BUILD_INFO" | ||
DEPLOYMENT_ENVIRONMENT = "DEPLOYMENT_ENVIRONMENT" | ||
|
||
# This cache is for warning only once if a variable is missing. | ||
var_cache = {} | ||
def getvar(key: Vars, warn_if_missing: bool = True, logger=logging.getLogger(__name__)) -> str: | ||
if key in var_cache: | ||
return var_cache[key] | ||
|
||
if key == Vars.BUILD_INFO: | ||
# DOCKER_METADATA_OUTPUT_JSON is generated by the build pipeline (e.g. docker/metadata-action). | ||
# Example: | ||
# { | ||
# "tags": ["ghcr.io/watonomous/repo-ingestion:main"], | ||
# "labels": { | ||
# "org.opencontainers.image.title": "repo-ingestion", | ||
# "org.opencontainers.image.description": "Simple server to receive file changes and open GitHub pull requests", | ||
# "org.opencontainers.image.url": "https://github.com/WATonomous/repo-ingestion", | ||
# "org.opencontainers.image.source": "https://github.com/WATonomous/repo-ingestion", | ||
# "org.opencontainers.image.version": "main", | ||
# "org.opencontainers.image.created": "2024-01-20T16:10:39.421Z", | ||
# "org.opencontainers.image.revision": "1d55b62b15c78251e0560af9e97927591e260a98", | ||
# "org.opencontainers.image.licenses": "", | ||
# }, | ||
# } | ||
BUILD_INFO_ENV_VAR = "DOCKER_METADATA_OUTPUT_JSON" | ||
val = os.getenv(BUILD_INFO_ENV_VAR) | ||
if not val: | ||
if warn_if_missing: | ||
logger.warning(f"Environment variable {BUILD_INFO_ENV_VAR} not found.") | ||
ret = None | ||
var_cache[key] = ret | ||
return ret | ||
|
||
try: | ||
ret = json.loads(val) | ||
except json.JSONDecodeError: | ||
if warn_if_missing: | ||
logger.warning(f"Failed to parse environment variable {BUILD_INFO_ENV_VAR}.") | ||
ret = {} | ||
|
||
var_cache[key] = ret | ||
return ret | ||
|
||
# Generic environment variable | ||
ret = os.getenv(key.value) | ||
if not ret and warn_if_missing: | ||
logger.warning(f"Environment variable {key.value} not found.") | ||
|
||
var_cache[key] = ret | ||
return ret |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
import logging | ||
|
||
from fastapi import FastAPI | ||
from fastapi.middleware.cors import CORSMiddleware | ||
from prometheus_fastapi_instrumentator import Instrumentator | ||
|
||
from .env import Vars, getvar | ||
from .sentry import sentry_sdk, set_up_sentry | ||
|
||
|
||
class WATcloudFastAPI(FastAPI): | ||
def __init__( | ||
self, | ||
*args, | ||
cors_allow_origins=None, | ||
logger=logging.getLogger(__name__), | ||
expose_metrics=True, | ||
expose_build_info=True, | ||
expose_health=True, | ||
health_fns=[], | ||
expose_runtime_info=True, | ||
initial_runtime_info={}, | ||
enable_sentry=True, | ||
**kwargs, | ||
): | ||
""" | ||
A FastAPI wrapper that adds various convenience features. | ||
Args: | ||
cors_allow_origins (List[str], optional): A list of origins to allow CORS requests from. Defaults to ['*'] if the DEPLOYMENT_ENVIRONMENT environment variable is nonempty, otherwise []. This parameter is useful for local development. In production, CORS should be handled by the reverse proxy. | ||
""" | ||
super().__init__(*args, **kwargs) | ||
|
||
if cors_allow_origins is None: | ||
cors_allow_origins = ( | ||
["*"] if getvar(Vars.DEPLOYMENT_ENVIRONMENT, logger=logger) else [] | ||
) | ||
|
||
if cors_allow_origins: | ||
logger.info( | ||
f"Adding CORS middleware with allow_origins={cors_allow_origins}. This should only be used for local development. Please handle CORS at the reverse proxy in production." | ||
) | ||
self.add_middleware( | ||
CORSMiddleware, | ||
allow_origins=cors_allow_origins, | ||
allow_credentials=True, | ||
allow_methods=["*"], | ||
allow_headers=["*"], | ||
) | ||
|
||
self.logger = logger | ||
|
||
if expose_metrics: | ||
Instrumentator().instrument(self).expose(self) | ||
|
||
if expose_build_info: | ||
self.add_api_route("/build-info", self.read_build_info, methods=["GET"]) | ||
|
||
if expose_health: | ||
self.health_fns = health_fns | ||
self.add_api_route("/health", self.read_health, methods=["GET"]) | ||
|
||
if expose_runtime_info: | ||
self.runtime_info = initial_runtime_info | ||
self.add_api_route("/runtime-info", self.read_runtime_info, methods=["GET"]) | ||
|
||
if enable_sentry: | ||
sentry_has_dsn = set_up_sentry(logger=self.logger) | ||
if self.runtime_info is not None: | ||
self.runtime_info["sentry_has_dsn"] = sentry_has_dsn | ||
self.runtime_info["sentry_sdk_version"] = sentry_sdk.VERSION | ||
|
||
def read_build_info(self): | ||
return getvar(Vars.BUILD_INFO, logger=self.logger) or {} | ||
|
||
def read_health(self): | ||
for fn in self.health_fns: | ||
fn(self) | ||
return {"status": "ok"} | ||
|
||
def read_runtime_info(self): | ||
return self.runtime_info |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import logging | ||
import os | ||
|
||
logger = logging.getLogger() | ||
|
||
def set_up_logging(): | ||
logger.setLevel(os.environ.get("APP_LOG_LEVEL", "INFO")) | ||
formatter = logging.Formatter( | ||
"%(asctime)s - %(name)s - %(levelname)s - %(message)s" | ||
) | ||
handler = logging.StreamHandler() | ||
handler.setFormatter(formatter) | ||
logger.addHandler(handler) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
import json | ||
import sys | ||
from enum import Enum | ||
|
||
import typer | ||
import yaml | ||
|
||
|
||
class OutputFormat(str, Enum): | ||
yaml = "yaml" | ||
json = "json" | ||
raw = "raw" | ||
|
||
|
||
class CustomJSONEncoder(json.JSONEncoder): | ||
def default(self, obj): | ||
# Encode all iterables as lists | ||
# Derived from | ||
# - https://stackoverflow.com/a/8230505 | ||
if hasattr(obj, "__iter__"): | ||
return list(obj) | ||
# Serialize date | ||
if hasattr(obj, "isoformat"): | ||
return obj.isoformat() | ||
return json.JSONEncoder.default(self, obj) | ||
|
||
|
||
def cli_print_retval(ret: dict | list, output_format: OutputFormat, **kwargs): | ||
if output_format == OutputFormat.yaml: | ||
print(yaml.dump(ret, default_flow_style=False)) | ||
elif output_format == OutputFormat.json: | ||
print(json.dumps(ret, indent=2, cls=CustomJSONEncoder)) | ||
elif output_format == OutputFormat.raw: | ||
sys.stdout.write(ret) | ||
else: | ||
raise ValueError(f"Unknown output format: {output_format}") | ||
|
||
|
||
app = typer.Typer(result_callback=cli_print_retval) | ||
|
||
|
||
@app.callback() | ||
# This function is used to add global CLI options | ||
def main(output_format: OutputFormat = OutputFormat.yaml): | ||
pass |