Remove FastAPI dependency #244

Closed · wants to merge 1 commit
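
The change replaces FastAPI's dependency-injection style with plain Starlette: handlers receive the raw request (plus background tasks) and build what they need explicitly instead of declaring Depends() parameters. A minimal sketch of that style, not taken from this PR (the state attribute name is illustrative):

from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route


async def lock(request: Request) -> JSONResponse:
    # Query parameters are read explicitly from the request ...
    locker = request.query_params.get("locker", "")
    # ... and shared state comes from app.state rather than from injected dependencies.
    node_name = request.app.state.node_name  # illustrative attribute, not from the diff
    return JSONResponse({"locker": locker, "node": node_name})


app = Starlette(routes=[Route("/lock", lock, methods=["POST"])])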
2 changes: 1 addition & 1 deletion .pylintrc
@@ -1,7 +1,7 @@
[MESSAGES CONTROL]
disable=
duplicate-code, # the checker in 2.9.6 fails with parallelism
invalid-name, # fastapi conventions break this
invalid-name, # fastapi / starlette conventions break this
missing-docstring,
no-member, # broken with pydantic + inheritance
too-few-public-methods, # some pydantic models have 0 and it is fine
1 change: 0 additions & 1 deletion astacus.spec
@@ -21,7 +21,6 @@ BuildRequires: snappy-devel
BuildRequires: which

# These are used when actually running the package
Requires: python3-fastapi
Requires: python3-httpx
Requires: python3-protobuf
Requires: python3-pyyaml
2 changes: 1 addition & 1 deletion astacus/common/dependencies.py
@@ -6,8 +6,8 @@
Dependency injection helper functions.
"""

from fastapi import Request
from starlette.datastructures import URL
from starlette.requests import Request


def get_request_url(request: Request) -> URL:
35 changes: 0 additions & 35 deletions astacus/common/msgspec_glue.py

This file was deleted.

6 changes: 3 additions & 3 deletions astacus/common/op.py
@@ -15,10 +15,10 @@
from .exceptions import ExpiredOperationException
from .statsd import StatsClient
from .utils import AstacusModel
from astacus.starlette import JSONHTTPException
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import Enum
from fastapi import HTTPException
from starlette.background import BackgroundTasks
from starlette.datastructures import URL
from typing import Any, Optional
@@ -149,11 +149,11 @@ def _sync_wrapper():

return Op.StartResult(op_id=op.op_id, status_url=status_url)

def get_op_and_op_info(self, *, op_id, op_name=None):
def get_op_and_op_info(self, *, op_id: int, op_name: str | None = None):
op_info = self.state.op_info
if op_id != op_info.op_id or (op_name and op_name != op_info.op_name):
logger.info("request for nonexistent %s.%s != %r", op_name, op_id, op_info)
raise HTTPException(
raise JSONHTTPException(
404,
{
"code": magic.ErrorCode.operation_id_mismatch,
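
The astacus.starlette module that provides JSONHTTPException is added elsewhere in this PR and is not visible in this diff. Judging only from the call site above (a status code plus a structured dict), a plausible sketch is a Starlette HTTPException subclass whose handler serializes the detail as JSON; everything below is an assumption about its shape, not the PR's implementation:

from starlette.applications import Starlette
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse


class JSONHTTPException(HTTPException):
    """HTTPException carrying a JSON-serializable detail body."""

    def __init__(self, status_code: int, body: dict) -> None:
        super().__init__(status_code=status_code, detail=body)


async def json_http_exception_handler(request: Request, exc: JSONHTTPException) -> JSONResponse:
    # Return the structured detail as the response body instead of a plain-text message.
    return JSONResponse(exc.detail, status_code=exc.status_code)


app = Starlette(exception_handlers={JSONHTTPException: json_http_exception_handler})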
7 changes: 3 additions & 4 deletions astacus/common/utils.py
@@ -15,7 +15,7 @@
from collections import deque
from collections.abc import AsyncIterable, AsyncIterator, Callable, Hashable, Iterable, Iterator, Mapping
from contextlib import contextmanager
from multiprocessing.dummy import Pool # fastapi + fork = bad idea
from multiprocessing.dummy import Pool # starlette + fork = bad idea
from pathlib import Path
from pydantic import BaseModel
from typing import Any, ContextManager, Final, Generic, IO, Literal, overload, TextIO, TypeAlias, TypeVar
@@ -83,9 +83,8 @@ def http_request(url, *, caller, method="get", timeout=10, ignore_status_code: b
"""Wrapper for requests.request which handles timeouts as non-exceptions,
and returns only valid results that we actually care about.

This is here primarily so that some requests stuff
(e.g. fastapi.testclient) still works, but we can mock things to
our hearts content in test code by doing 'things' here.
This is here primarily so that some requests stuff still works, but we can
mock things to our hearts content in test code by doing 'things' here.
"""
# TBD: may need to redact url in future, if we actually wind up
# using passwords in urls here.
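
The updated comment keeps the original rationale: the process hosting the ASGI event loop should never fork, so multiprocessing.dummy supplies a thread-backed Pool with the familiar multiprocessing API. A tiny illustrative usage sketch, not taken from the repository:

from multiprocessing.dummy import Pool  # threads, not forked processes


def fetch_status(url: str) -> str:
    # Placeholder for a blocking HTTP call made from worker threads.
    return f"ok: {url}"


with Pool(4) as pool:
    results = pool.map(fetch_status, ["http://node-1/status", "http://node-2/status"])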
5 changes: 3 additions & 2 deletions astacus/config.py
@@ -15,8 +15,9 @@
from astacus.common.utils import AstacusModel
from astacus.coordinator.config import APP_KEY as COORDINATOR_CONFIG_KEY, CoordinatorConfig
from astacus.node.config import APP_KEY as NODE_CONFIG_KEY, NodeConfig
from fastapi import FastAPI, Request
from pathlib import Path
from starlette.applications import Starlette
from starlette.requests import Request

import hashlib
import io
@@ -64,7 +65,7 @@ def get_config_content_and_hash(config_path: str | Path) -> tuple[str, str]:
return config_content.decode(), config_hash


def set_global_config_from_path(app: FastAPI, path: str | Path) -> GlobalConfig:
def set_global_config_from_path(app: Starlette, path: str | Path) -> GlobalConfig:
config_content, config_hash = get_config_content_and_hash(path)
with io.StringIO(config_content) as config_file:
config = GlobalConfig.parse_obj(yaml.safe_load(config_file))
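
After this signature change, any plain Starlette application can be handed to set_global_config_from_path. Only the call shape below is taken from the diff; the empty route list and the config path are illustrative:

from astacus.config import set_global_config_from_path
from starlette.applications import Starlette

app = Starlette(routes=[])
global_config = set_global_config_from_path(app, "/etc/astacus/astacus.yaml")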
121 changes: 65 additions & 56 deletions astacus/coordinator/api.py
@@ -4,29 +4,29 @@
"""

from .cleanup import CleanupOp
from .coordinator import BackupOp, Coordinator, DeltaBackupOp, RestoreOp
from .coordinator import BackupOp, Coordinator, CoordinatorOp, DeltaBackupOp, RestoreOp
from .list import CachedListEntries, list_backups, list_delta_backups
from .lockops import LockOps
from .state import CachedListResponse
from astacus import config
from astacus.common import ipc
from astacus.common.magic import StrEnum
from astacus.common.msgspec_glue import register_msgspec_glue, StructResponse
from astacus.common.op import Op
from astacus.common.progress import Progress
from astacus.config import APP_HASH_KEY, get_config_content_and_hash
from astacus.starlette import get_query_param, Router
from asyncio import to_thread
from collections.abc import Sequence
from fastapi import APIRouter, Body, Depends, HTTPException, Request
from typing import Annotated
from starlette.background import BackgroundTasks
from starlette.exceptions import HTTPException
from starlette.requests import Request
from urllib.parse import urljoin

import logging
import msgspec
import os
import time

register_msgspec_glue()
router = APIRouter()
router = Router()

logger = logging.getLogger(__name__)

@@ -52,7 +52,7 @@ async def root():


@router.post("/config/reload")
async def config_reload(*, request: Request, c: Coordinator = Depends()):
async def config_reload(*, request: Request) -> dict:
"""Reload astacus configuration"""
config_path = os.environ.get("ASTACUS_CONFIG")
assert config_path is not None
@@ -61,7 +61,7 @@ async def config_reload(*, request: Request, c: Coordinator = Depends()):


@router.get("/config/status")
async def config_status(*, request: Request):
async def config_status(*, request: Request) -> dict:
config_path = os.environ.get("ASTACUS_CONFIG")
assert config_path is not None
_, config_hash = get_config_content_and_hash(config_path)
@@ -70,53 +70,51 @@


@router.post("/lock")
async def lock(*, locker: str, c: Coordinator = Depends(), op: LockOps = Depends()):
async def lock(*, request: Request, background_tasks: BackgroundTasks) -> LockStartResult:
c = await Coordinator.create_from_request(request, background_tasks)
locker = get_query_param(request, "locker")
op = c.create_op(LockOps, locker=locker)
result = c.start_op(op_name=OpName.lock, op=op, fun=op.lock)
return LockStartResult(unlock_url=urljoin(str(c.request_url), f"../unlock?locker={locker}"), **result.dict())


@router.post("/unlock")
def unlock(*, locker: str, c: Coordinator = Depends(), op: LockOps = Depends()):
async def unlock(*, request: Request, background_tasks: BackgroundTasks) -> Op.StartResult:
c = await Coordinator.create_from_request(request, background_tasks)
locker = get_query_param(request, "locker")
op = c.create_op(LockOps, locker=locker)
return c.start_op(op_name=OpName.unlock, op=op, fun=op.unlock)


@router.post("/backup")
async def backup(*, c: Coordinator = Depends(), op: BackupOp = Depends(BackupOp.create)):
async def backup(*, request: Request, background_tasks: BackgroundTasks) -> Op.StartResult:
c = await Coordinator.create_from_request(request, background_tasks)
op = c.create_op(BackupOp)
runner = await op.acquire_cluster_lock()
return c.start_op(op_name=OpName.backup, op=op, fun=runner)


@router.post("/delta/backup")
async def delta_backup(*, c: Coordinator = Depends(), op: DeltaBackupOp = Depends(DeltaBackupOp.create)):
async def delta_backup(*, request: Request, background_tasks: BackgroundTasks) -> Op.StartResult:
c = await Coordinator.create_from_request(request, background_tasks)
op = c.create_op(DeltaBackupOp)
runner = await op.acquire_cluster_lock()
return c.start_op(op_name=OpName.backup, op=op, fun=runner)


@router.post("/restore")
async def restore(
*,
c: Coordinator = Depends(),
storage: Annotated[str, Body()] = "",
name: Annotated[str, Body()] = "",
partial_restore_nodes: Annotated[Sequence[ipc.PartialRestoreRequestNode] | None, Body()] = None,
stop_after_step: Annotated[str | None, Body()] = None,
):
req = ipc.RestoreRequest(
storage=storage,
name=name,
partial_restore_nodes=partial_restore_nodes,
stop_after_step=stop_after_step,
)
op = RestoreOp(c=c, req=req)
async def restore(*, body: ipc.RestoreRequest, request: Request, background_tasks: BackgroundTasks) -> Op.StartResult:
c = await Coordinator.create_from_request(request, background_tasks)
op = RestoreOp(c=c, req=body)
runner = await op.acquire_cluster_lock()
return c.start_op(op_name=OpName.restore, op=op, fun=runner)


@router.get("/list")
async def _list_backups(
*, storage: Annotated[str, Body()] = "", c: Coordinator = Depends(), request: Request
) -> StructResponse:
req = ipc.ListRequest(storage=storage)
*, body: ipc.ListRequest = ipc.ListRequest(), request: Request, background_tasks: BackgroundTasks
) -> ipc.ListResponse:
c = await Coordinator.create_from_request(request, background_tasks)
coordinator_config = c.config
cached_list_response = c.state.cached_list_response
if cached_list_response is not None:
@@ -126,7 +124,7 @@ async def _list_backups(
and cached_list_response.coordinator_config == coordinator_config
and cached_list_response.list_request
):
return StructResponse(cached_list_response.list_response)
return cached_list_response.list_response
if c.state.cached_list_running:
raise HTTPException(status_code=429, detail="Already caching list result")
c.state.cached_list_running = True
@@ -136,15 +134,15 @@
if cached_list_response is not None
else {}
)
list_response = await to_thread(list_backups, req=req, storage_factory=c.storage_factory, cache=cache)
list_response = await to_thread(list_backups, req=body, storage_factory=c.storage_factory, cache=cache)
c.state.cached_list_response = CachedListResponse(
coordinator_config=coordinator_config,
list_request=req,
list_request=body,
list_response=list_response,
)
finally:
c.state.cached_list_running = False
return StructResponse(list_response)
return list_response


def get_cache_entries_from_list_response(list_response: ipc.ListResponse) -> CachedListEntries:
@@ -155,40 +153,50 @@ def get_cache_entries_from_list_response(list_response: ipc.ListResponse) -> Cac


@router.get("/delta/list")
async def _list_delta_backups(*, storage: Annotated[str, Body()] = "", c: Coordinator = Depends(), request: Request):
req = ipc.ListRequest(storage=storage)
async def _list_delta_backups(
*, body: ipc.ListRequest, request: Request, background_tasks: BackgroundTasks
) -> ipc.ListResponse:
c = await Coordinator.create_from_request(request, background_tasks)
# This is not supposed to be called very often, no caching necessary
return await to_thread(list_delta_backups, req=req, storage_factory=c.storage_factory)
return await to_thread(list_delta_backups, req=body, storage_factory=c.storage_factory)


@router.post("/cleanup")
async def cleanup(
*,
storage: Annotated[str, Body()] = "",
retention: Annotated[ipc.Retention | None, Body()] = None,
explicit_delete: Annotated[Sequence[str], Body()] = (),
c: Coordinator = Depends(),
):
req = ipc.CleanupRequest(storage=storage, retention=retention, explicit_delete=list(explicit_delete))
op = CleanupOp(c=c, req=req)
*, request: Request, background_tasks: BackgroundTasks, body: ipc.CleanupRequest = ipc.CleanupRequest()
) -> Op.StartResult:
c = await Coordinator.create_from_request(request, background_tasks)
op = CleanupOp(c=c, req=body)
runner = await op.acquire_cluster_lock()
return c.start_op(op_name=OpName.cleanup, op=op, fun=runner)


@router.get("/{op_name}/{op_id}")
@router.get("/delta/{op_name}/{op_id}")
def op_status(*, op_name: OpName, op_id: int, c: Coordinator = Depends()):
class OpStatusResult(msgspec.Struct, kw_only=True):
state: Op.Status | None
progress: Progress | None


@router.get("/{op_name:str}/{op_id:int}")
@router.get("/delta/{op_name:str}/{op_id:int}")
async def op_status(*, request: Request, background_tasks: BackgroundTasks) -> OpStatusResult:
c = await Coordinator.create_from_request(request, background_tasks)
op_name = OpName(request.path_params["op_name"])
op_id: int = request.path_params["op_id"]
op, op_info = c.get_op_and_op_info(op_id=op_id, op_name=op_name)
result = {"state": op_info.op_status}
if isinstance(op, (BackupOp, DeltaBackupOp, RestoreOp)):
result["progress"] = msgspec.to_builtins(op.progress)
result = OpStatusResult(state=op_info.op_status, progress=None)
if isinstance(op, BackupOp | DeltaBackupOp | RestoreOp):
result.progress = op.progress
return result


@router.put("/{op_name}/{op_id}/sub-result")
@router.put("/delta/{op_name}/{op_id}/sub-result")
async def op_sub_result(*, op_name: OpName, op_id: int, c: Coordinator = Depends()):
@router.put("/{op_name:str}/{op_id:int}/sub-result")
@router.put("/delta/{op_name:str}/{op_id:int}/sub-result")
async def op_sub_result(*, request: Request, background_tasks: BackgroundTasks) -> None:
c = await Coordinator.create_from_request(request, background_tasks)
op_name = OpName(request.path_params["op_name"])
op_id: int = request.path_params["op_id"]
op, _ = c.get_op_and_op_info(op_id=op_id, op_name=op_name)
assert isinstance(op, CoordinatorOp)
# We used to have results available here, but not use those
# that was wasting a lot of memory by generating the same result twice.
if not op.subresult_sleeper:
@@ -197,5 +205,6 @@ async def op_sub_result(*, op_name: OpName, op_id: int, c: Coordinator = Depends


@router.get("/busy")
async def is_busy(*, c: Coordinator = Depends()) -> bool:
async def is_busy(*, request: Request, background_tasks: BackgroundTasks) -> bool:
c = await Coordinator.create_from_request(request, background_tasks)
return c.is_busy()
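
The Router and get_query_param helpers imported from astacus.starlette above are introduced elsewhere in the PR and are not shown here. Based purely on how they are used (decorator-style registration, keyword-only handlers, msgspec-typed return values, query parameters fetched by name), here is a rough sketch of what such helpers could look like; every name and detail below is an assumption, not the PR's actual code, and decoding of msgspec-typed request bodies is omitted:

from collections.abc import Awaitable, Callable
from starlette.background import BackgroundTasks
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import Route

import msgspec


def get_query_param(request: Request, name: str) -> str:
    """Return a required query parameter or fail the request with a 400."""
    value = request.query_params.get(name)
    if value is None:
        raise HTTPException(status_code=400, detail=f"missing query parameter: {name}")
    return value


class Router:
    """Collects Starlette routes through FastAPI-like get()/post()/put() decorators."""

    def __init__(self) -> None:
        self.routes: list[Route] = []

    def _register(self, path: str, method: str):
        def decorator(handler: Callable[..., Awaitable[object]]):
            async def endpoint(request: Request) -> Response:
                background_tasks = BackgroundTasks()
                result = await handler(request=request, background_tasks=background_tasks)
                # msgspec structs and plain dicts serialize via msgspec.to_builtins;
                # background tasks run after the response has been sent.
                return JSONResponse(msgspec.to_builtins(result), background=background_tasks)

            self.routes.append(Route(path, endpoint, methods=[method]))
            return handler

        return decorator

    def get(self, path: str):
        return self._register(path, "GET")

    def post(self, path: str):
        return self._register(path, "POST")

    def put(self, path: str):
        return self._register(path, "PUT")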
3 changes: 1 addition & 2 deletions astacus/coordinator/cleanup.py
@@ -8,7 +8,6 @@

from astacus.common import ipc
from astacus.coordinator.coordinator import Coordinator, SteppedCoordinatorOp
from fastapi import Depends

import logging

@@ -17,7 +16,7 @@

class CleanupOp(SteppedCoordinatorOp):
@staticmethod
async def create(*, c: Coordinator = Depends(), req: ipc.CleanupRequest = ipc.CleanupRequest()) -> "CleanupOp":
async def create(*, c: Coordinator, req: ipc.CleanupRequest = ipc.CleanupRequest()) -> "CleanupOp":
return CleanupOp(c=c, req=req)

def __init__(self, *, c: Coordinator, req: ipc.CleanupRequest) -> None:
2 changes: 1 addition & 1 deletion astacus/coordinator/config.py
@@ -8,8 +8,8 @@
from astacus.common.statsd import StatsdConfig
from astacus.common.utils import AstacusModel
from collections.abc import Sequence
from fastapi import Request
from pathlib import Path
from starlette.requests import Request

APP_KEY = "coordinator_config"
