move system outputs and analysis cases to Cloud Storage #335
File 1 of 3: system output / analysis case DB utilities (SystemDBUtils)

@@ -4,7 +4,6 @@
 import re
 import traceback
 import zlib
-from collections.abc import Iterator
 from datetime import datetime
 from inspect import getsource
 from types import FunctionType

@@ -18,6 +17,7 @@
 from explainaboard_web.impl.auth import get_user
 from explainaboard_web.impl.db_utils.dataset_db_utils import DatasetDBUtils
 from explainaboard_web.impl.db_utils.db_utils import DBUtils
+from explainaboard_web.impl.storage import get_storage
 from explainaboard_web.impl.utils import (
     abort_with_error_message,
     binarize_bson,
@@ -312,19 +312,6 @@ def _process(
             metadata=processor_metadata, sys_output=system_output_data.samples
         )

-    @staticmethod
-    def _compress_data(data: list) -> bytes:
-        return zlib.compress(json.dumps(data).encode())
-
-    @staticmethod
-    def _decompress_data(compressed_rep: bytes) -> list:
-        return json.loads(zlib.decompress(compressed_rep).decode())
-
-    @staticmethod
-    def _decompress_from_cursor(cursor: Iterator[dict]) -> list:
-        my_dict = next(cursor)
-        return SystemDBUtils._decompress_data(my_dict["data"])
-
     @staticmethod
     def _find_output_or_case_raw(
         system_id: str, analysis_level: str, output_ids: list[int] | None
@@ -339,8 +326,18 @@ def _find_output_or_case_raw(
             filt=filt,
         )
         if total != 1:
-            abort_with_error_message(400, f"Could not find a single system {system_id}")
-        sys_data = SystemDBUtils._decompress_from_cursor(cursor)
+            abort_with_error_message(
+                400, f"Could not find system outputs for {system_id}"
+            )
+        data = next(cursor)["data"]
+        # NOTE: (backward compatibility) data was stored in MongoDB previously,
+        # this can be removed after we regenerate DB next time.
+        if isinstance(data, bytes):
+            sys_data_str = zlib.decompress(data).decode()
+        else:
+            sys_data_str = get_storage().download_and_decompress(data)
+        sys_data: list = json.loads(sys_data_str)

         if output_ids is not None:
             sys_data = [sys_data[x] for x in output_ids]
         return sys_data

Review comment (on the backward-compatibility NOTE): Explaining what "this" is would make the comment more clear.

Reply: I have updated the comment.
@@ -435,15 +432,20 @@ def db_operations(session: ClientSession) -> str:
             system_id = DBUtils.insert_one(
                 DBUtils.DEV_SYSTEM_METADATA, document, session=session
             )
-            # Compress the system output
+            # Compress the system output and upload to Cloud Storage
             insert_list = []
             sample_list = [general_to_dict(v) for v in system_output_data.samples]
-            sample_compressed = SystemDBUtils._compress_data(sample_list)
+            blob_name = f"{system_id}/{SystemDBUtils._SYSTEM_OUTPUT_CONST}"
+            get_storage().compress_and_upload(
+                blob_name,
+                json.dumps(sample_list),
+            )
             insert_list.append(
                 {
                     "system_id": system_id,
                     "analysis_level": SystemDBUtils._SYSTEM_OUTPUT_CONST,
-                    "data": sample_compressed,
+                    "data": blob_name,
                 }
             )
             # Compress analysis cases

Review comment (on the blob_name line): I'm not sure how the GCP file system works, but would it be OK to, for example, have 1M files in one directory? If so then this is fine, but if not then we might want to have a hierarchical structure such as …

Reply: I couldn't find a place in their docs where it explicitly says this, but I think Cloud Storage (like S3) can hold basically any number of objects/files (the S3 documentation says "unlimited scalability"). The only limit is that a single object cannot exceed 5 TB, so I think this implementation is scalable. Re creating folders: Cloud Storage uses a flat namespace. If an object name contains "/"s, they are treated as part of the name. It does support "folders", but that is basically an illusion for the user, so it's easier to navigate the files.

Reply: Correct. Cloud Storage (and some other storage services) adopts a flat ("no folders") structure, and all allowed characters (including "/") are treated as part of the object name. Cloud Storage can store a basically unlimited number of files, as long as the data center allows; the only concern is the growing cost of managing the data.
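A minimal sketch of the naming behavior discussed above, assuming the google-cloud-storage Python client; the bucket and object names are hypothetical, not the ones the app uses:

from google.cloud import storage

client = storage.Client()
bucket = client.bucket("example-bucket")  # hypothetical bucket name

# A "/" in an object name is just another character; this creates a single
# flat object, not a directory plus a file.
bucket.blob("system123/system_output").upload_from_string(b"compressed bytes")

# Prefix filtering still lets you treat "system123/" like a folder.
for blob in client.list_blobs("example-bucket", prefix="system123/"):
    print(blob.name)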
@@ -454,12 +456,14 @@ def db_operations(session: ClientSession) -> str:
                 )
             ):
                 case_list = [general_to_dict(v) for v in analysis_cases]
-                case_compressed = SystemDBUtils._compress_data(case_list)
+                blob_name = f"{system_id}/{analysis_level.name}"
+                get_storage().compress_and_upload(blob_name, json.dumps(case_list))
                 insert_list.append(
                     {
                         "system_id": system_id,
                         "analysis_level": analysis_level.name,
-                        "data": case_compressed,
+                        "data": blob_name,
                     }
                 )
             # Insert system output and analysis cases
@@ -555,9 +559,21 @@ def db_operations(session: ClientSession) -> bool:
             )
             if not result:
                 abort_with_error_message(400, f"failed to delete system {system_id}")
+
+            # remove system outputs
+            output_collection = DBUtils.get_system_output_collection(system_id)
+            filt = {"system_id": system_id}
+            outputs, _ = DBUtils.find(output_collection, filt)
+            data = (output["data"] for output in outputs)
+            # NOTE: (backward compatibility) data was stored in MongoDB previously;
+            # the isinstance filtering can be removed after we regenerate DB next time.
+            data_blob_names = [name for name in data if isinstance(name, str)]
+            DBUtils.delete_many(output_collection, filt, session=session)
+
+            # Delete system output objects from Storage. This needs to be the last
+            # step because we are using transaction support from MongoDB and we
+            # cannot roll back an operation on Cloud Storage.
+            get_storage().delete(data_blob_names)
             return True

         return DBUtils.execute_transaction(db_operations)
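The comment in this hunk encodes an important ordering rule: everything issued through the MongoDB session rolls back if db_operations raises, while a Cloud Storage delete is immediate and irreversible, so it must come after every step that can still fail. A rough sketch of the same pattern with plain pymongo (database, collection, and bucket names are hypothetical, and MongoDB transactions require a replica set):

from google.cloud import storage
from pymongo import MongoClient

mongo = MongoClient()
bucket = storage.Client().bucket("example-bucket")  # hypothetical bucket

def delete_system(system_id: str) -> None:
    coll = mongo.example_db.system_outputs  # hypothetical db/collection
    filt = {"system_id": system_id}
    with mongo.start_session() as session:
        with session.start_transaction():
            blob_names = [
                doc["data"]
                for doc in coll.find(filt, session=session)
                if isinstance(doc["data"], str)  # skip legacy inline bytes
            ]
            coll.delete_many(filt, session=session)
            # Irreversible external side effect last: if anything above
            # raised, the transaction aborts and no blobs are touched.
            bucket.delete_blobs([bucket.blob(n) for n in blob_names])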
File 2 of 3: Flask app initialization (new file)

@@ -0,0 +1,42 @@
+import logging
+import os
+from typing import Final
+
+from explainaboard_web.impl.config import (
+    LocalDevelopmentConfig,
+    ProductionConfig,
+    StagingConfig,
+)
+from flask import Flask
+
+
+def init(app: Flask) -> Flask:
+    """Initializes the flask app"""
+    _init_config(app)
+    _init_gcp_credentials()
+    return app
+
+
+def _init_config(app: Flask):
+    FLASK_ENV = os.getenv("FLASK_ENV")
+    if FLASK_ENV == "production":
+        app.config.from_object(ProductionConfig())
+    elif FLASK_ENV == "development":
+        app.config.from_object(LocalDevelopmentConfig())
+    elif FLASK_ENV == "staging":
+        app.config.from_object(StagingConfig())
+
+
+def _init_gcp_credentials():
+    """If the app is running in an ECS container, the GCP credentials are
+    passed in as an environment variable. This function reads that variable,
+    writes it to a file, and points the Google Cloud Storage client to that
+    file to authenticate the service.
+    """
+    if os.environ.get("GCP_SERVICE_CREDENTIALS"):
+        credentials = os.environ["GCP_SERVICE_CREDENTIALS"]
+        gcp_credentials_path: Final = "./GCP_SERVICE_CREDENTIALS.json"
+        with open(gcp_credentials_path, "w") as f:
+            f.write(credentials)
+        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = gcp_credentials_path
+        logging.info("GCP credentials file initialized from environment variable.")
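One detail the docstring leaves implicit: setting GOOGLE_APPLICATION_CREDENTIALS is sufficient because Google client libraries resolve credentials through Application Default Credentials, which checks that variable first. A minimal check, assuming google-cloud-storage is installed and the file written above contains a valid service-account key:

from google.cloud import storage

# Picks up the key file via GOOGLE_APPLICATION_CREDENTIALS; raises
# google.auth.exceptions.DefaultCredentialsError if no credentials are found.
client = storage.Client()
print(client.project)  # project id read from the credentials file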
File 3 of 3: explainaboard_web/impl/storage.py (new file)

@@ -0,0 +1,49 @@
+"""A client for Cloud Storage. Cloud Storage is an object storage service,
+and its main use here is to store system outputs.
+"""
+from __future__ import annotations
+
+import zlib
+
+from flask import current_app, g
+from google.cloud import storage as cloud_storage
+
+
+class Storage:
+    """A Google Cloud Storage client that makes it easy to store and download
+    objects from Cloud Storage. Only one bucket is used per environment, so
+    this class doesn't provide a way to choose between buckets.
+    """
+
+    def __init__(self) -> None:
+        self._bucket_name = current_app.config.get("STORAGE_BUCKET_NAME")
+        self._bucket = cloud_storage.Client().bucket(self._bucket_name)
+
+    def upload(self, blob_name: str, contents: str | bytes) -> None:
+        blob = self._bucket.blob(blob_name)
+        blob.upload_from_string(contents)
+
+    def compress_and_upload(self, blob_name: str, contents: str) -> None:
+        compressed_contents = zlib.compress(contents.encode())
+        self.upload(blob_name, compressed_contents)
+
+    def download(self, blob_name: str) -> bytes:
+        blob = self._bucket.blob(blob_name)
+        return blob.download_as_bytes()
+
+    def download_and_decompress(self, blob_name: str) -> str:
+        return zlib.decompress(self.download(blob_name)).decode()
+
+    def delete(self, blob_names: list[str]):
+        self._bucket.delete_blobs([self._bucket.blob(name) for name in blob_names])
+
+
+def get_storage() -> Storage:
+    """Returns the global Storage instance. A Storage object is created if
+    not present in g.
+    """
+    if "_storage" in g:
+        return g._storage
+    g._storage = Storage()
+    return g._storage
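A short usage sketch for the class above; the helper and blob layout are illustrative only, and the code must run inside a Flask application context so current_app and g are available:

import json

from explainaboard_web.impl.storage import get_storage

def roundtrip_samples(system_id: str, samples: list[dict]) -> list:
    # Hypothetical helper for illustration: compress + upload, then read back.
    storage = get_storage()
    blob_name = f"{system_id}/example_level"  # illustrative blob name
    storage.compress_and_upload(blob_name, json.dumps(samples))
    # Only the blob name (a str) would be stored in MongoDB, not the data.
    return json.loads(storage.download_and_decompress(blob_name))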
Review comment: Does the "developer" here mean the developers of this software? If it means any developers, it may involve some security risks (especially granting access to GCS), and we couldn't grant them authorization.

Reply: Yes, developers of explainaboard_web. I don't think outside developers can actually contribute to this codebase at this point. As you mentioned, it's not secure to give them access to GCP and AWS, so they'll need to build their own infrastructure (Cloud Storage, Cognito, MongoDB). If we want outside contributors, I guess we need a way for them to easily mock these services or set up local replacements.

Review comment: Agreed, thanks!