move system outputs and analysis cases to Cloud Storage #335

Merged · 5 commits · Sep 20, 2022
Changes from 2 commits
10 changes: 7 additions & 3 deletions README.md
@@ -6,7 +6,7 @@ This repository includes code for frontend and backend of the ExplainaBoard web

[Schema Design](https://docs.google.com/document/d/1my-zuIYosrXuoqOk1SvqZDsC2tdMgs_A7HTAtPYXKLI/edit?usp=sharing)

## Quick Start
## Quick Start for Developers
Contributor:

Does the "developer" here mean the developers of this software?
If it means developers in general, it may involve security risks (especially granting access to GCS), and we shouldn't grant them authorization.

Member Author (lyuyangh):

Yes, developers of explainaboard_web. I don't think outside developers can actually contribute to this codebase at this point. As you mentioned, it's not secure to give them access to GCP and AWS, so they'd need to build their own infrastructure (Cloud Storage, Cognito, MongoDB). If we want outside contributors, I guess we need a way for them to easily mock these services or set up local instances to replace them.

Contributor:
Agreed, thanks!


> This step-by-step guide assumes a Linux environment. A MacOS environment will likely work similarly. For Windows users, the easiest way is to
> use a subsystem, for example [WSL](https://docs.microsoft.com/en-us/windows/wsl/about) (pre-installed since Windows 10).
@@ -43,6 +43,9 @@ This repository includes code for frontend and backend of the ExplainaBoard web
- The official connexion documentation says `3.6`, but `3.9.7` has been tested and seems to work fine.
2. `pip install -r backend/src/gen/requirements.txt`
3. Create `backend/src/impl/.env` to store all environment variables. An example has been provided in `.env.example`. Contact the dev team to get the credentials for dev and prod environments.
4. Set up a GCP account and authenticate locally (see the verification sketch after this list):
- Contact the dev team to set up a GCP account with access to the dev bucket of Cloud Storage.
- Install gcloud and then run `gcloud auth login` to log in to your user account locally. (For more information, see this [guide](https://cloud.google.com/sdk/docs/authorizing#run_gcloud_auth_login).)
6. Install pre-commit hooks
- Run `npm run prepare` to install the pre-commit hook via husky. The hook auto-checks both frontend and backend code before commits. Please do not skip it.
7. Launch explainaboard
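
> A quick way to verify local GCS authentication (a minimal sketch, not part of this PR; it assumes application-default credentials are available locally, e.g. via `gcloud auth application-default login`, and uses a hypothetical bucket name):

```python
from google.cloud import storage

# Uses Application Default Credentials picked up from the local gcloud setup.
client = storage.Client()
bucket = client.bucket("explainaboard-dev")  # hypothetical dev bucket name
print(bucket.exists())  # True if this account can access the dev bucket
```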
@@ -59,8 +62,9 @@ This repository includes code for frontend and backend of the ExplainaBoard web
- We use docker and gunicorn to deploy both frontend and backend. The frontend is built and copied into Flask's static file folder. Please see the Dockerfile for details.
- To build: `docker build --pull --rm -f "Dockerfile" -t explainaboard-web:0.2.0 "."`
- To run: `docker run --rm -p 5000:5000/tcp explainaboard-web:0.2.0`
- The frontend is served by the Flask server at the root URL, so port 5000 is used to access the UI here.
- connexion is used by the swagger/openapi code generation tool, and it does not support gunicorn natively, so we currently use the Flask server in production. Another option that connexion supports natively is tornado.
- GCP:
- For local development, developers should use their own user accounts to authenticate (see the Quick Start section above for details).
- For staging and production environments, the service account credentials are passed into the containers as environment variables. The credentials are stored on AWS Secrets Manager.
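
As a hedged sketch of how the same variable could be materialized by hand outside ECS (e.g. when debugging a container locally), assuming a hypothetical secret name and region; in ECS the task definition injects the secret automatically:

```python
import os

import boto3

# Hypothetical secret name and region; the real values live with the dev team.
client = boto3.client("secretsmanager", region_name="us-east-1")
secret = client.get_secret_value(SecretId="explainaboard/gcp-service-credentials")
os.environ["GCP_SERVICE_CREDENTIALS"] = secret["SecretString"]
```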

## More details on frontend and backend
1. Frontend:
6 changes: 5 additions & 1 deletion backend/src/impl/.env.example
@@ -13,4 +13,8 @@ USER_POOL_AUDIENCE_DEV=
AWS_DEFAULT_REGION=

AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=

GCP_SERVICE_CREDENTIALS= # used for staging and prod environments (ECS) only, not intended for local development
STORAGE_BUCKET_NAME=
GOOGLE_CLOUD_PROJECT=
2 changes: 2 additions & 0 deletions backend/src/impl/config.py
@@ -28,6 +28,8 @@ def __init__(self) -> None:
self.USER_POOL_AUDIENCE = os.environ["USER_POOL_AUDIENCE_DEV"]
self.AUTH_URL = f"https://explainaboard-dev-user.auth.{self.AWS_DEFAULT_REGION}.amazoncognito.com/oauth2/authorize?client_id={self.USER_POOL_AUDIENCE}&response_type=token&scope=email+openid+phone&redirect_uri=" # noqa

self.STORAGE_BUCKET_NAME = os.environ["STORAGE_BUCKET_NAME"]


class StagingConfig(LocalDevelopmentConfig):
"""Used for an online staging/test environment. It has exactly the same
58 changes: 37 additions & 21 deletions backend/src/impl/db_utils/system_db_utils.py
@@ -4,7 +4,6 @@
import re
import traceback
import zlib
from collections.abc import Iterator
from datetime import datetime
from inspect import getsource
from types import FunctionType
@@ -18,6 +17,7 @@
from explainaboard_web.impl.auth import get_user
from explainaboard_web.impl.db_utils.dataset_db_utils import DatasetDBUtils
from explainaboard_web.impl.db_utils.db_utils import DBUtils
from explainaboard_web.impl.storage import get_storage
from explainaboard_web.impl.utils import (
abort_with_error_message,
binarize_bson,
@@ -312,19 +312,6 @@ def _process(
metadata=processor_metadata, sys_output=system_output_data.samples
)

@staticmethod
def _compress_data(data: list) -> bytes:
return zlib.compress(json.dumps(data).encode())

@staticmethod
def _decompress_data(compressed_rep: bytes) -> list:
return json.loads(zlib.decompress(compressed_rep).decode())

@staticmethod
def _decompress_from_cursor(cursor: Iterator[dict]) -> list:
my_dict = next(cursor)
return SystemDBUtils._decompress_data(my_dict["data"])

@staticmethod
def _find_output_or_case_raw(
system_id: str, analysis_level: str, output_ids: list[int] | None
@@ -339,8 +326,18 @@ def _find_output_or_case_raw(
filt=filt,
)
if total != 1:
abort_with_error_message(400, f"Could not find a single system {system_id}")
sys_data = SystemDBUtils._decompress_from_cursor(cursor)
abort_with_error_message(
400, f"Could not find system outputs for {system_id}"
)
data = next(cursor)["data"]
# NOTE: (backward compatibility) data was stored in MongoDB previously,
# this can be removed after we regenerate DB next time.
Contributor:

Explaining what "this" is would make this more clear.

Member Author (lyuyangh):

I have updated the comment.

if isinstance(data, bytes):
sys_data_str = zlib.decompress(data).decode()
else:
sys_data_str = get_storage().download_and_decompress(data)
sys_data: list = json.loads(sys_data_str)

if output_ids is not None:
sys_data = [sys_data[x] for x in output_ids]
return sys_data
@@ -435,15 +432,20 @@ def db_operations(session: ClientSession) -> str:
system_id = DBUtils.insert_one(
DBUtils.DEV_SYSTEM_METADATA, document, session=session
)
# Compress the system output
# Compress the system output and upload to Cloud Storage
insert_list = []
sample_list = [general_to_dict(v) for v in system_output_data.samples]
sample_compressed = SystemDBUtils._compress_data(sample_list)

blob_name = f"{system_id}/{SystemDBUtils._SYSTEM_OUTPUT_CONST}"
Contributor:
I'm not sure how the GCP file system works, but would it be OK to, for example, have 1M files in one directory? If so then this is fine, but if not then we might want to have a hierarchical structure such as f"{system_id[0:3]}/{system_id}/{SystemDBUtils._SYSTEM_OUTPUT_CONST}"

Member Author (lyuyangh), Sep 20, 2022:
I couldn't find a place in their docs where this is stated explicitly, but I think Cloud Storage (like S3) can hold basically any number of objects/files (the S3 documentation says "unlimited scalability"). The only limit is that each object cannot exceed 5 TB. So, I think this implementation is scalable.

Re creating folders: Cloud Storage uses a flat namespace. If an object name contains "/"s, they are treated as part of the name. It does support folders, but these basically create an illusion for the user so that it's easier to navigate the files with gsutil or on the web UI.

Contributor:
> Cloud Storage uses a flat namespace.

Correct. Cloud Storage (and some other storage services) adopts a flat ("no folders") structure, and all allowed characters (including /) are treated as part of the file name.

Cloud Storage can store a basically unlimited number of files, as long as the data center allows. The only concern is the increasing cost of managing the data.
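
To illustrate the flat namespace described above (a sketch with hypothetical bucket and object names): "/" is just a character in an object name, but prefix queries make the names behave like folders.

```python
from google.cloud import storage

bucket = storage.Client().bucket("explainaboard-dev")  # hypothetical bucket

# "Folders" are an illusion: this lists every object whose *name* starts
# with the given prefix, e.g. "some-system-id/system_output".
for blob in bucket.list_blobs(prefix="some-system-id/"):
    print(blob.name)
```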

get_storage().compress_and_upload(
blob_name,
json.dumps(sample_list),
)
insert_list.append(
{
"system_id": system_id,
"analysis_level": SystemDBUtils._SYSTEM_OUTPUT_CONST,
"data": sample_compressed,
"data": blob_name,
}
)
# Compress analysis cases
@@ -454,12 +456,14 @@
)
):
case_list = [general_to_dict(v) for v in analysis_cases]
case_compressed = SystemDBUtils._compress_data(case_list)

blob_name = f"{system_id}/{analysis_level.name}"
get_storage().compress_and_upload(blob_name, json.dumps(case_list))
insert_list.append(
{
"system_id": system_id,
"analysis_level": analysis_level.name,
"data": case_compressed,
"data": blob_name,
}
)
# Insert system output and analysis cases
@@ -555,9 +559,21 @@ def db_operations(session: ClientSession) -> bool:
)
if not result:
abort_with_error_message(400, f"failed to delete system {system_id}")

# remove system outputs
output_collection = DBUtils.get_system_output_collection(system_id)
filt = {"system_id": system_id}
outputs, _ = DBUtils.find(output_collection, filt)
data = (output["data"] for output in outputs)
# NOTE: (backward compatibility) data was stored in MongoDB previously,
# the isinstance filtering can be removed after we regenerate DB next time.
data_blob_names = [name for name in data if isinstance(name, str)]
DBUtils.delete_many(output_collection, filt, session=session)

# Delete system output objects from Storage. This needs to be the last step
# because we are using transaction support from MongoDB and we cannot roll
# back an operation on Cloud Storage.
get_storage().delete(data_blob_names)
return True

return DBUtils.execute_transaction(db_operations)
42 changes: 42 additions & 0 deletions backend/src/impl/init.py
@@ -0,0 +1,42 @@
import logging
import os
from typing import Final

from explainaboard_web.impl.config import (
LocalDevelopmentConfig,
ProductionConfig,
StagingConfig,
)
from flask import Flask


def init(app: Flask) -> Flask:
"""Initializes the flask app"""
_init_config(app)
_init_gcp_credentials()
return app


def _init_config(app: Flask):
FLASK_ENV = os.getenv("FLASK_ENV")
if FLASK_ENV == "production":
app.config.from_object(ProductionConfig())
elif FLASK_ENV == "development":
app.config.from_object(LocalDevelopmentConfig())
elif FLASK_ENV == "staging":
app.config.from_object(StagingConfig())


def _init_gcp_credentials():
"""If the app is running in an ECS container, the GCP credentials
are passed in as an environment variable. This function reads that
variable, writes to a file and points the Google Cloud Storage
client to the correct file to authenticate the service.
"""
if os.environ.get("GCP_SERVICE_CREDENTIALS"):
credentials = os.environ["GCP_SERVICE_CREDENTIALS"]
gcp_credentials_path: Final = "./GCP_SERVICE_CREDENTIALS.json"
with open(gcp_credentials_path, "w") as f:
f.write(credentials)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = gcp_credentials_path
logging.info("GCP credentials file initialized from environment variable.")
49 changes: 49 additions & 0 deletions backend/src/impl/storage.py
@@ -0,0 +1,49 @@
"""A client for Cloud Storage. Cloud Storage is an object storage
and it's main use is to store system outputs.
"""
from __future__ import annotations

import zlib

from flask import current_app, g
from google.cloud import storage as cloud_storage


class Storage:
"""A Google Cloud Storage client that makes it easy to store and download
objects from Cloud Storage. There's only one bucket used for each environment
so this class doesn't provide a way to choose from different buckets.
"""

def __init__(self) -> None:
self._bucket_name = current_app.config.get("STORAGE_BUCKET_NAME")
self._bucket = cloud_storage.Client().bucket(self._bucket_name)

def upload(self, blob_name: str, contents: str | bytes) -> None:
blob = self._bucket.blob(blob_name)
blob.upload_from_string(contents)

def compress_and_upload(self, blob_name: str, contents: str) -> None:
compressed_contents = zlib.compress(contents.encode())
self.upload(blob_name, compressed_contents)

def download(self, blob_name: str) -> bytes:
blob = self._bucket.blob(blob_name)
return blob.download_as_bytes()

def download_and_decompress(self, blob_name: str) -> str:
return zlib.decompress(self.download(blob_name)).decode()

def delete(self, blob_names: list[str]):
self._bucket.delete_blobs([self._bucket.blob(name) for name in blob_names])


def get_storage() -> Storage:
"""
Returns the global Storage instance. A Storage object is created if not present
in g.
"""
if "_storage" in g:
return g._storage
g._storage = Storage()
return g._storage
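
A minimal usage sketch for the Storage wrapper above (not part of the PR; it assumes a Flask application context, since get_storage() caches the client on flask.g, and uses a hypothetical blob name):

```python
import json

from explainaboard_web.impl.storage import get_storage

# Inside a Flask app/request context:
storage = get_storage()

# Round-trip a small payload (blob name is hypothetical).
blob_name = "example-system-id/system_output"
storage.compress_and_upload(blob_name, json.dumps([{"id": 0}]))
samples = json.loads(storage.download_and_decompress(blob_name))
assert samples == [{"id": 0}]

# Clean up.
storage.delete([blob_name])
```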
12 changes: 2 additions & 10 deletions backend/templates/__main__.mustache
@@ -1,6 +1,5 @@
import connexion
import os
from explainaboard_web.impl.config import ProductionConfig, StagingConfig, LocalDevelopmentConfig
from explainaboard_web.impl.init import init
from {{packageName}} import encoder

def create_app():
@@ -9,14 +8,7 @@ def create_app():
app.add_api('swagger.yaml', arguments={'title': '{{appName}}'},
pythonic_params=True, validate_responses=True)

FLASK_ENV = os.getenv('FLASK_ENV')
if FLASK_ENV == 'production':
app.app.config.from_object(ProductionConfig())
elif FLASK_ENV == 'development':
app.app.config.from_object(LocalDevelopmentConfig())
elif FLASK_ENV == 'staging':
app.app.config.from_object(StagingConfig())

app.app = init(app.app)
return app


5 changes: 3 additions & 2 deletions backend/templates/requirements.mustache
@@ -1,5 +1,5 @@
apache-beam>=2.38.0
boto3
boto3==1.24.68
connexion >= 2.10.0
python_dateutil >= 2.6.0
setuptools >= 21.0.0
@@ -15,4 +15,5 @@ marisa_trie
sacrebleu[ja]
six
pandas
iso-639
google-cloud-storage==2.5.0