Skip to content

Commit

Permalink
fix: Readd database support for resource manager (#333)
Browse files Browse the repository at this point in the history
  • Loading branch information
ptiurin authored Jan 10, 2024
1 parent 87e8378 commit 4094853
Show file tree
Hide file tree
Showing 17 changed files with 965 additions and 13 deletions.
181 changes: 181 additions & 0 deletions src/firebolt/model/V1/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
from __future__ import annotations

import logging
from datetime import datetime
from typing import TYPE_CHECKING, Any, List, Optional, Sequence

from pydantic import Field, PrivateAttr

from firebolt.model.V1 import FireboltBaseModel
from firebolt.model.V1.region import RegionKey
from firebolt.service.V1.engine import EngineService
from firebolt.service.V1.types import EngineStatusSummary
from firebolt.utils.exception import AttachedEngineInUseError
from firebolt.utils.urls import ACCOUNT_DATABASE_URL

if TYPE_CHECKING:
from firebolt.model.V1.binding import Binding
from firebolt.model.V1.engine import Engine
from firebolt.service.V1.database import DatabaseService

logger = logging.getLogger(__name__)


class DatabaseKey(FireboltBaseModel):
account_id: str
database_id: str


class FieldMask(FireboltBaseModel):
paths: Sequence[str] = Field(alias="paths")


class Database(FireboltBaseModel):
"""
A Firebolt database.
Databases belong to a region and have a description,
but otherwise are not configurable.
"""

# internal
_service: DatabaseService = PrivateAttr()

# required
name: str = Field(min_length=1, max_length=255, regex=r"^[0-9a-zA-Z_]+$")
compute_region_key: RegionKey = Field(alias="compute_region_id")

# optional
database_key: Optional[DatabaseKey] = Field(None, alias="id")
description: Optional[str] = Field(None, max_length=255)
emoji: Optional[str] = Field(None, max_length=255)
current_status: Optional[str]
health_status: Optional[str]
data_size_full: Optional[int]
data_size_compressed: Optional[int]
is_system_database: Optional[bool]
storage_bucket_name: Optional[str]
create_time: Optional[datetime]
create_actor: Optional[str]
last_update_time: Optional[datetime]
last_update_actor: Optional[str]
desired_status: Optional[str]

@classmethod
def parse_obj_with_service(
cls, obj: Any, database_service: DatabaseService
) -> Database:
database = cls.parse_obj(obj)
database._service = database_service
return database

@property
def database_id(self) -> Optional[str]:
if self.database_key is None:
return None
return self.database_key.database_id

def get_attached_engines(self) -> List[Engine]:
"""Get a list of engines that are attached to this database."""

return self._service.resource_manager.bindings.get_engines_bound_to_database( # noqa: E501
database=self
)

def attach_to_engine(
self, engine: Engine, is_default_engine: bool = False
) -> Binding:
"""
Attach an engine to this database.
Args:
engine: The engine to attach.
is_default_engine:
Whether this engine should be used as default for this database.
Only one engine can be set as default for a single database.
This will overwrite any existing default.
"""

return self._service.resource_manager.bindings.create(
engine=engine, database=self, is_default_engine=is_default_engine
)

def delete(self) -> Database:
"""
Delete a database from Firebolt.
Raises an error if there are any attached engines.
"""

for engine in self.get_attached_engines():
if engine.current_status_summary in {
EngineStatusSummary.ENGINE_STATUS_SUMMARY_STARTING,
EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING,
}:
raise AttachedEngineInUseError(method_name="delete")

logger.info(
f"Deleting Database (database_id={self.database_id}, name={self.name})"
)
response = self._service.client.delete(
url=ACCOUNT_DATABASE_URL.format(
account_id=self._service.account_id, database_id=self.database_id
),
headers={"Content-type": "application/json"},
)
return Database.parse_obj_with_service(
response.json()["database"], self._service
)

def update(self, description: str) -> Database:
"""
Updates a database description.
"""

class _DatabaseUpdateRequest(FireboltBaseModel):
"""Helper model for sending Database creation requests."""

account_id: str
database: Database
database_id: str
update_mask: FieldMask

self.description = description

logger.info(
f"Updating Database (database_id={self.database_id}, "
f"name={self.name}, description={self.description})"
)

payload = _DatabaseUpdateRequest(
account_id=self._service.account_id,
database=self,
database_id=self.database_id,
update_mask=FieldMask(paths=["description"]),
).jsonable_dict(by_alias=True)

response = self._service.client.patch(
url=ACCOUNT_DATABASE_URL.format(
account_id=self._service.account_id, database_id=self.database_id
),
headers={"Content-type": "application/json"},
json=payload,
)

return Database.parse_obj_with_service(
response.json()["database"], self._service
)

def get_default_engine(self) -> Optional[Engine]:
"""
Returns: default engine of the database, or None if default engine is missing
"""
rm = self._service.resource_manager
assert isinstance(rm.engines, EngineService), "Expected EngineService V1"
default_engines: List[Engine] = [
rm.engines.get(binding.engine_id)
for binding in rm.bindings.get_many(database_id=self.database_id)
if binding.is_default_engine
]

return None if len(default_engines) == 0 else default_engines[0]
16 changes: 16 additions & 0 deletions src/firebolt/model/V1/provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from datetime import datetime
from typing import Optional

from pydantic import Field

from firebolt.model.V1 import FireboltBaseModel


class Provider(FireboltBaseModel, frozen=True): # type: ignore
provider_id: str = Field(alias="id")
name: str

# optional
create_time: Optional[datetime]
display_name: Optional[str]
last_update_time: Optional[datetime]
6 changes: 6 additions & 0 deletions src/firebolt/service/V1/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from firebolt.client import ClientV1 as Client
from firebolt.service.manager import ResourceManager

Expand All @@ -13,3 +15,7 @@ def client(self) -> Client:
@property
def account_id(self) -> str:
return self.resource_manager.account_id

@property
def default_region_setting(self) -> Optional[str]:
return self.resource_manager.default_region
102 changes: 100 additions & 2 deletions src/firebolt/service/V1/binding.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
import logging
from typing import List, Optional

from firebolt.model.V1.binding import Binding
from firebolt.model.V1.binding import Binding, BindingKey
from firebolt.model.V1.database import Database
from firebolt.model.V1.engine import Engine
from firebolt.service.V1.base import BaseService
from firebolt.utils.urls import ACCOUNT_BINDINGS_URL
from firebolt.service.V1.database import DatabaseService
from firebolt.service.V1.engine import EngineService
from firebolt.utils.exception import AlreadyBoundError
from firebolt.utils.urls import (
ACCOUNT_BINDINGS_URL,
ACCOUNT_DATABASE_BINDING_URL,
)
from firebolt.utils.util import prune_dict

logger = logging.getLogger(__name__)


class BindingService(BaseService):
def get_by_key(self, binding_key: BindingKey) -> Binding:
"""Get a binding by its BindingKey"""
response = self.client.get(
url=ACCOUNT_DATABASE_BINDING_URL.format(
account_id=binding_key.account_id,
database_id=binding_key.database_id,
engine_id=binding_key.engine_id,
)
)
binding: dict = response.json()["binding"]
return Binding.parse_obj(binding)

def get_many(
self,
database_id: Optional[str] = None,
Expand Down Expand Up @@ -47,3 +67,81 @@ def get_many(
),
)
return [Binding.parse_obj(i["node"]) for i in response.json()["edges"]]

def get_database_bound_to_engine(self, engine: Engine) -> Optional[Database]:
"""Get the database to which an engine is bound, if any."""
try:
binding = self.get_many(engine_id=engine.engine_id)[0]
except IndexError:
return None
try:
assert isinstance(
self.resource_manager.databases, DatabaseService
), "Expected DatabaseService V1"
return self.resource_manager.databases.get(id_=binding.database_id)
except (KeyError, IndexError):
return None

def get_engines_bound_to_database(self, database: Database) -> List[Engine]:
"""Get a list of engines that are bound to a database."""

bindings = self.get_many(database_id=database.database_id)
if not bindings:
return []
assert isinstance(
self.resource_manager.engines, EngineService
), "Expected EngineService V1"
return self.resource_manager.engines.get_by_ids(
ids=[b.engine_id for b in bindings]
)

def create(
self, engine: Engine, database: Database, is_default_engine: bool
) -> Binding:
"""
Create a new binding between an engine and a database.
Args:
engine: Engine to bind.
database: Database to bind.
is_default_engine:
Whether this engine should be used as default for this database.
Only one engine can be set as default for a single database.
This will overwrite any existing default.
Returns:
New binding between the engine and database.
"""

existing_database = self.get_database_bound_to_engine(engine=engine)
if existing_database is not None:
raise AlreadyBoundError(
f"The engine {engine.name} is already bound "
f"to {existing_database.name}!"
)

logger.info(
f"Attaching Engine (engine_id={engine.engine_id}, name={engine.name}) "
f"to Database (database_id={database.database_id}, "
f"name={database.name})"
)
binding = Binding(
binding_key=BindingKey(
account_id=self.account_id,
database_id=database.database_id,
engine_id=engine.engine_id,
),
is_default_engine=is_default_engine,
)

response = self.client.post(
url=ACCOUNT_DATABASE_BINDING_URL.format(
account_id=self.account_id,
database_id=database.database_id,
engine_id=engine.engine_id,
),
json=binding.jsonable_dict(
by_alias=True, include={"binding_key": ..., "is_default_engine": ...}
),
)
return Binding.parse_obj(response.json()["binding"])
Loading

0 comments on commit 4094853

Please sign in to comment.