[MLOP-639] Track logs in S3 (#306)
* Apply tracking logs and logging config.

* Adjustments in CLI and logging.conf.

* Some adjustments.

* Change version to generate a new dev package.

* Fix version.

* Apply style.

* Add a new assert in the migrate unit test.
moromimay authored Apr 1, 2021
1 parent 3dcd975 commit 8077d86
Showing 13 changed files with 181 additions and 61 deletions.
7 changes: 7 additions & 0 deletions butterfree/__init__.py
@@ -1 +1,8 @@
"""Module docstring example, following Google's docstring style."""
import logging.config
import os
import sys

sys.path.insert(0, os.path.abspath("."))

logging.config.fileConfig(fname="butterfree/logging.conf")
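
Since the package now runs fileConfig at import time, any module that imports butterfree picks up the handlers declared in logging.conf. A minimal sketch of the effect (assuming the working directory is the repository root, so that butterfree/logging.conf and the logs/ directory both resolve):

import logging

import butterfree  # import side effect: logging.config.fileConfig("butterfree/logging.conf")

# Loggers named in logging.conf ("cli", "migrate", "database_migrate") write
# JSON lines to logs/logging.json; the root logger prints to stdout.
logging.getLogger("migrate").info("logging configured")
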
10 changes: 0 additions & 10 deletions butterfree/_cli/__init__.py
@@ -1,10 +0,0 @@
import logging


def __logger(name: str) -> logging.Logger:
format_ = "%(name)s:%(asctime)-15s:%(levelname)s:< %(message)s >"
logging.basicConfig(format=format_, level=logging.INFO)
return logging.getLogger(name)


cli_logger = __logger("butterfree")
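
The per-module logger factory above is dropped; with the package-wide fileConfig in place, call sites simply request a named logger, as migrate.py does below:

import logging

logger = logging.getLogger("migrate")  # picks up handlers from butterfree/logging.conf
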
2 changes: 1 addition & 1 deletion butterfree/_cli/main.py
@@ -3,7 +3,7 @@
from butterfree._cli import migrate

app = typer.Typer()
app.add_typer(migrate.app)
app.add_typer(migrate.app, name="migrate")

if __name__ == "__main__":
app()
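
With the sub-app mounted under an explicit name here, and the callback in migrate.py turned into an "apply" command (next file), the CLI shape becomes "migrate apply PATH". A sketch of exercising it in-process with typer's test runner (the path argument is illustrative):

from typer.testing import CliRunner

from butterfree._cli.main import app

runner = CliRunner()
# Equivalent to the shell invocation: <cli> migrate apply tests/mocks/entities/
result = runner.invoke(app, ["migrate", "apply", "tests/mocks/entities/"])
print(result.exit_code)
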
56 changes: 43 additions & 13 deletions butterfree/_cli/migrate.py
@@ -1,17 +1,23 @@
import importlib
import inspect
import logging
import os
import pkgutil
import sys
from typing import Set

import setuptools
import typer

from butterfree._cli import cli_logger
from butterfree.clients import SparkClient
from butterfree.configs import environment
from butterfree.extract.readers import FileReader
from butterfree.migrations.database_migration import ALLOWED_DATABASE
from butterfree.pipelines import FeatureSetPipeline

app = typer.Typer()
app = typer.Typer(help="Apply the automatic migrations in a database.")

logger = logging.getLogger("migrate")


def __find_modules(path: str) -> Set[str]:
@@ -33,18 +39,18 @@ def __find_modules(path: str) -> Set[str]:


def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
cli_logger.info(f"Looking for python modules under {path}...")
logger.info(f"Looking for python modules under {path}...")
modules = __find_modules(path)
if not modules:
return set()

cli_logger.info(f"Importing modules...")
logger.info(f"Importing modules...")
package = ".".join(path.strip("/").split("/"))
imported = set(
importlib.import_module(f".{name}", package=package) for name in modules
)

cli_logger.info(f"Scanning modules...")
logger.info(f"Scanning modules...")
content = {
module: set(
filter(
@@ -80,14 +86,18 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:

instances.add(value)

cli_logger.info("Creating instances...")
logger.info("Creating instances...")
return set(value() for value in instances)


PATH = typer.Argument(
..., help="Full or relative path to where feature set pipelines are being defined.",
)

GENERATE_LOGS = typer.Option(
False, help="To generate the logs in local file 'logging.json'."
)


class Migrate:
"""Execute migration operations in a Database based on pipeline Writer.
@@ -96,23 +106,43 @@ class Migrate:
pipelines: list of Feature Set Pipelines to use to migration.
"""

def __init__(self, pipelines: Set[FeatureSetPipeline]) -> None:
def __init__(
self, pipelines: Set[FeatureSetPipeline], spark_client: SparkClient = None
) -> None:
self.pipelines = pipelines
self.spark_client = spark_client or SparkClient()

def _send_logs_to_s3(self) -> None:
def _send_logs_to_s3(self, file_local: bool) -> None:
"""Send all migration logs to S3."""
pass
file_reader = FileReader(id="name", path="logs/logging.json", format="json")
df = file_reader.consume(self.spark_client)

path = environment.get_variable("FEATURE_STORE_S3_BUCKET")

def run(self) -> None:
self.spark_client.write_dataframe(
dataframe=df,
format_="json",
mode="append",
**{"path": f"s3a://{path}/logging"},
)

if not file_local:
os.rmdir("logs/logging.json")

def run(self, generate_logs: bool) -> None:
"""Construct and apply the migrations."""
for pipeline in self.pipelines:
for writer in pipeline.sink.writers:
migration = ALLOWED_DATABASE[writer.db_config.database]
migration.apply_migration(pipeline.feature_set, writer)

self._send_logs_to_s3(generate_logs)


@app.callback()
def migrate(path: str = PATH) -> Set[FeatureSetPipeline]:
@app.command("apply")
def migrate(
path: str = PATH, generate_logs: bool = GENERATE_LOGS,
) -> Set[FeatureSetPipeline]:
"""Scan and run database migrations for feature set pipelines defined under PATH.
Butterfree will scan a given path for classes that inherit from its
@@ -124,5 +154,5 @@ def migrate(path: str = PATH) -> Set[FeatureSetPipeline]:
import and instantiate them.
"""
pipe_set = __fs_objects(path)
Migrate(pipe_set).run()
Migrate(pipe_set).run(generate_logs)
return pipe_set
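
Migrate.run now applies each writer's migration and then ships logs/logging.json to s3a://$FEATURE_STORE_S3_BUCKET/logging through a FileReader plus SparkClient.write_dataframe; when generate_logs is False the local file is cleaned up afterwards (note that os.rmdir removes directories, so os.remove would be the usual call for deleting a file). A hedged sketch of driving the class directly, assuming the pipeline set is built elsewhere:

from typing import Set

from butterfree._cli.migrate import Migrate
from butterfree.pipelines import FeatureSetPipeline


def run_migrations(pipelines: Set[FeatureSetPipeline]) -> None:
    # Applies migrations for every writer, then appends the JSON log file
    # to the configured S3 bucket; True keeps logs/logging.json locally.
    Migrate(pipelines).run(generate_logs=True)
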
27 changes: 26 additions & 1 deletion butterfree/configs/db/metastore_config.py
@@ -122,4 +122,29 @@ def get_path_with_partitions(self, key: str, dataframe: DataFrame) -> List:

def translate(self, schema: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Translate feature set spark schema to the corresponding database."""
return schema
spark_sql_mapping = {
"TimestampType": "TIMESTAMP",
"BinaryType": "BINARY",
"BooleanType": "BOOLEAN",
"DateType": "DATE",
"DecimalType": "DECIMAL",
"DoubleType": "DOUBLE",
"FloatType": "FLOAT",
"IntegerType": "INT",
"LongType": "BIGINT",
"StringType": "STRING",
"ArrayType(LongType,true)": "ARRAY<BIGINT>",
"ArrayType(StringType,true)": "ARRAY<STRING>",
"ArrayType(FloatType,true)": "ARRAY<FLOAT>",
}
sql_schema = []
for features in schema:
sql_schema.append(
{
"column_name": features["column_name"],
"type": spark_sql_mapping[str(features["type"])],
"primary_key": features["primary_key"],
}
)

return sql_schema
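
translate() keys the mapping on str() of the Spark type, matching the reprs produced by the pinned pyspark 3.x. An illustrative round trip (assuming MetastoreConfig constructs with its defaults):

from pyspark.sql.types import ArrayType, LongType, StringType

from butterfree.configs.db.metastore_config import MetastoreConfig

schema = [
    {"column_name": "id", "type": LongType(), "primary_key": True},
    {"column_name": "tags", "type": ArrayType(StringType()), "primary_key": False},
]
print(MetastoreConfig().translate(schema))
# [{'column_name': 'id', 'type': 'BIGINT', 'primary_key': True},
#  {'column_name': 'tags', 'type': 'ARRAY<STRING>', 'primary_key': False}]
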
29 changes: 15 additions & 14 deletions butterfree/constants/data_type.py
@@ -21,20 +21,21 @@
class DataType(Enum):
"""Holds constants for data types within Butterfree."""

TIMESTAMP = (TimestampType(), "timestamp")
BINARY = (BinaryType(), "boolean")
BOOLEAN = (BooleanType(), "boolean")
DATE = (DateType(), "timestamp")
DECIMAL = (DecimalType(), "decimal")
DOUBLE = (DoubleType(), "double")
FLOAT = (FloatType(), "float")
INTEGER = (IntegerType(), "int")
BIGINT = (LongType(), "bigint")
STRING = (StringType(), "text")
ARRAY_BIGINT = (ArrayType(LongType()), "frozen<list<bigint>>")
ARRAY_STRING = (ArrayType(StringType()), "frozen<list<text>>")
ARRAY_FLOAT = (ArrayType(FloatType()), "frozen<list<float>>")
TIMESTAMP = (TimestampType(), "timestamp", "TIMESTAMP")
BINARY = (BinaryType(), "boolean", "BINARY")
BOOLEAN = (BooleanType(), "boolean", "BOOLEAN")
DATE = (DateType(), "timestamp", "DATE")
DECIMAL = (DecimalType(), "decimal", "DECIMAL")
DOUBLE = (DoubleType(), "double", "DOUBLE")
FLOAT = (FloatType(), "float", "FLOAT")
INTEGER = (IntegerType(), "int", "INT")
BIGINT = (LongType(), "bigint", "BIGINT")
STRING = (StringType(), "text", "STRING")
ARRAY_BIGINT = (ArrayType(LongType()), "frozen<list<bigint>>", "ARRAY<BIGINT>")
ARRAY_STRING = (ArrayType(StringType()), "frozen<list<text>>", "ARRAY<STRING>")
ARRAY_FLOAT = (ArrayType(FloatType()), "frozen<list<float>>", "ARRAY<FLOAT>")

def __init__(self, spark: PySparkDataType, cassandra: str) -> None:
def __init__(self, spark: PySparkDataType, cassandra: str, spark_sql: str) -> None:
self.spark = spark
self.cassandra = cassandra
self.spark_sql = spark_sql
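
Each enum member now carries three representations, so callers can pick the one for their target store. Reading values straight off the definition above:

from butterfree.constants.data_type import DataType

print(DataType.ARRAY_FLOAT.spark)      # ArrayType(FloatType,true)
print(DataType.ARRAY_FLOAT.cassandra)  # frozen<list<float>>
print(DataType.ARRAY_FLOAT.spark_sql)  # ARRAY<FLOAT>
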
52 changes: 52 additions & 0 deletions butterfree/logging.conf
@@ -0,0 +1,52 @@
[loggers]
keys=root,cli,migrate,database_migrate

[handlers]
keys=consoleHandler,file

[formatters]
keys=simpleFormatter,jsonFormatter

[logger_root]
level=DEBUG
handlers=consoleHandler

[logger_cli]
level=DEBUG
handlers=file
qualname=cli
propagate=0

[logger_migrate]
level=DEBUG
handlers=file
qualname=migrate
propagate=0

[logger_database_migrate]
level=DEBUG
handlers=file
qualname=database_migrate
propagate=0

[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=simpleFormatter
args=(sys.stdout,)

[handler_file]
class=FileHandler
level=DEBUG
formatter=jsonFormatter
args=('logs/logging.json', "a")

[formatter_simpleFormatter]
format=%(name)s:%(asctime)-15s:%(levelname)s:%(message)s
datefmt=
class=logging.Formatter

[formatter_jsonFormatter]
format={"name": "%(name)s", "timestamp": "%(asctime)-15s", "level": "%(levelname)s", "message": "%(message)s"}
datefmt=
class=logging.Formatter
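
A quick way to see both handlers in action (assuming the config file sits at butterfree/logging.conf relative to the working directory, and that logs/ exists for the FileHandler):

import logging.config

logging.config.fileConfig(fname="butterfree/logging.conf")

logging.getLogger("migrate").info("hello")      # JSON line appended to logs/logging.json
logging.getLogger("somewhere.else").info("hi")  # falls through to root: simpleFormatter, stdout
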
10 changes: 6 additions & 4 deletions butterfree/migrations/database_migration/database_migration.py
@@ -9,6 +9,8 @@
from butterfree.load.writers.writer import Writer
from butterfree.transform import FeatureSet

logger = logging.getLogger("database_migrate")


@dataclass
class Diff:
@@ -154,7 +156,7 @@ def _get_queries(
)
queries.append(alter_column_types_query)
if alter_key_items:
logging.info("This operation is not supported by Spark.")
logger.info("This operation is not supported by Spark.")

return queries

@@ -261,7 +263,7 @@ def apply_migration(self, feature_set: FeatureSet, writer: Writer,) -> None:
feature_set: the feature set.
writer: the writer being used to load the feature set.
"""
logging.info(f"Migrating feature set: {feature_set.name}")
logger.info(f"Migrating feature set: {feature_set.name}")

table_name = (
feature_set.name if not writer.write_to_entity else feature_set.entity
@@ -275,7 +277,7 @@ def apply_migration(self, feature_set: FeatureSet, writer: Writer,) -> None:
)

for q in queries:
logging.info(f"Applying {q}...")
logger.info(f"Applying this query: {q} ...")
self._client.sql(q)

logging.info(f"Feature Set migration finished successfully.")
logger.info(f"Feature Set migration finished successfully.")
Empty file added logs/logging.json
3 changes: 2 additions & 1 deletion requirements.txt
@@ -4,4 +4,5 @@ pandas>=0.24,<1.1
parameters-validation>=1.1.5,<2.0
pyspark==3.*
typer>=0.3,<0.4
setuptools>=41,<42
setuptools>=41,<42
typing-extensions==3.7.4.3
2 changes: 1 addition & 1 deletion setup.py
@@ -1,7 +1,7 @@
from setuptools import find_packages, setup

__package_name__ = "butterfree"
__version__ = "1.2.0.dev5"
__version__ = "1.2.0.dev6"
__repository_url__ = "https://github.com/quintoandar/butterfree"

with open("requirements.txt") as f:
41 changes: 26 additions & 15 deletions tests/unit/butterfree/_cli/test_migrate.py
@@ -8,22 +8,33 @@
from butterfree.pipelines import FeatureSetPipeline


def test_migrate_success(mocker):
mocker.patch.object(migrate.Migrate, "run")
all_fs = migrate.migrate("tests/mocks/entities/")
assert all(isinstance(fs, FeatureSetPipeline) for fs in all_fs)
assert sorted([fs.feature_set.name for fs in all_fs]) == ["first", "second"]
class TestMigrate:
def test_migrate_success(self, mocker):
mocker.patch.object(migrate.Migrate, "run")
all_fs = migrate.migrate("tests/mocks/entities/")
assert all(isinstance(fs, FeatureSetPipeline) for fs in all_fs)
assert sorted([fs.feature_set.name for fs in all_fs]) == ["first", "second"]

def test_migrate_all_pairs(self, mocker):
mocker.patch.object(MetastoreMigration, "apply_migration")
mocker.patch.object(CassandraMigration, "apply_migration")
mocker.patch.object(migrate.Migrate, "_send_logs_to_s3")

def test_migrate_all_pairs(mocker):
mocker.patch.object(MetastoreMigration, "apply_migration")
mocker.patch.object(CassandraMigration, "apply_migration")
all_fs = migrate.migrate("tests/mocks/entities/")
all_fs = migrate.migrate("tests/mocks/entities/")

assert MetastoreMigration.apply_migration.call_count == 2
assert CassandraMigration.apply_migration.call_count == 2
assert MetastoreMigration.apply_migration.call_count == 2
assert CassandraMigration.apply_migration.call_count == 2

metastore_pairs = [call(pipe.feature_set, pipe.sink.writers[0]) for pipe in all_fs]
cassandra_pairs = [call(pipe.feature_set, pipe.sink.writers[1]) for pipe in all_fs]
MetastoreMigration.apply_migration.assert_has_calls(metastore_pairs, any_order=True)
CassandraMigration.apply_migration.assert_has_calls(cassandra_pairs, any_order=True)
metastore_pairs = [
call(pipe.feature_set, pipe.sink.writers[0]) for pipe in all_fs
]
cassandra_pairs = [
call(pipe.feature_set, pipe.sink.writers[1]) for pipe in all_fs
]
MetastoreMigration.apply_migration.assert_has_calls(
metastore_pairs, any_order=True
)
CassandraMigration.apply_migration.assert_has_calls(
cassandra_pairs, any_order=True
)
migrate.Migrate._send_logs_to_s3.assert_called_once()
@@ -1,5 +1,6 @@
from pyspark.sql.types import DoubleType, FloatType, LongType, TimestampType

from butterfree.load.writers import HistoricalFeatureStoreWriter
from butterfree.migrations.database_migration import CassandraMigration, Diff


@@ -61,7 +62,7 @@ def test_apply_migration(self, feature_set, mocker):
m.apply_migration = mocker.stub("apply_migration")

# when
m.apply_migration(feature_set)
m.apply_migration(feature_set, HistoricalFeatureStoreWriter())

# then
m.apply_migration.assert_called_once()
