Skip to content

Commit

Permalink
configure event logger for integration tests (#4257)
Browse files Browse the repository at this point in the history
* apply test fixes

* remove presto test
  • Loading branch information
Nathaniel May authored Nov 9, 2021
1 parent 31acb95 commit 5e6be16
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 38 deletions.
24 changes: 21 additions & 3 deletions core/dbt/events/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import dbt.flags as flags
# TODO this will need to move eventually
from dbt.logger import SECRET_ENV_PREFIX, make_log_dir_if_missing
import io
from io import StringIO
import json
import logging
from logging import Logger
from logging.handlers import WatchedFileHandler
from logging.handlers import RotatingFileHandler
import os
from typing import List

Expand Down Expand Up @@ -58,13 +60,29 @@ def setup_event_logger(log_path):

file_passthrough_formatter = logging.Formatter(fmt=FORMAT)

# TODO log rotation is not handled by WatchedFileHandler
file_handler = WatchedFileHandler(filename=log_dest, encoding='utf8')
file_handler = RotatingFileHandler(filename=log_dest, encoding='utf8')
file_handler.setFormatter(file_passthrough_formatter)
file_handler.setLevel(logging.DEBUG) # always debug regardless of user input
this.FILE_LOG.addHandler(file_handler)


# used for integration tests
def capture_stdout_logs() -> StringIO:
    """Start capturing records sent to the stdout event logger.

    Creates an in-memory ``io.StringIO`` buffer, wraps it in a
    ``logging.StreamHandler`` and attaches that handler to
    ``this.STDOUT_LOG``. The capture is undone by
    ``stop_capture_stdout_logs``.

    Returns:
        The buffer the new handler writes to; read it with ``getvalue()``.
    """
    capture_buf = io.StringIO()
    stdout_capture_handler = logging.StreamHandler(capture_buf)
    # Configure the handler created here. The previous code called setLevel on
    # `stdout_handler`, a name this function never defines, so the capture
    # handler's level was never set (and an unrelated handler, if one exists
    # under that name at module level, was silently switched to DEBUG).
    stdout_capture_handler.setLevel(logging.DEBUG)
    this.STDOUT_LOG.addHandler(stdout_capture_handler)
    return capture_buf


# used for integration tests
def stop_capture_stdout_logs() -> None:
    """Detach every StringIO-backed handler from the stdout event logger.

    Rebuilds ``this.STDOUT_LOG.handlers`` so that it keeps only handlers
    whose ``stream`` attribute is missing or is not a ``StringIO``.
    """
    stdout_logger = this.STDOUT_LOG
    remaining = []
    for handler in stdout_logger.handlers:
        # Handlers without a `stream` attribute yield None here and are kept.
        stream = getattr(handler, 'stream', None)
        if isinstance(stream, StringIO):
            continue
        remaining.append(handler)
    stdout_logger.handlers = remaining


def env_secrets() -> List[str]:
return [
v for k, v in os.environ.items()
Expand Down
24 changes: 6 additions & 18 deletions test/integration/001_simple_copy_test/test_simple_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,6 @@ def test__postgres__dbt_doesnt_run_empty_models(self):
self.assertFalse("empty" in models.keys())
self.assertFalse("disabled" in models.keys())

@use_profile("presto")
def test__presto__simple_copy(self):
    """Seed, then run with an expected failure, under the presto profile.

    Checks that one seed result and seven run results come back, that every
    result whose node name contains 'incremental' carries the
    'not implemented for presto' message, and that the seed, view_model and
    materialized tables are equal.
    """
    seed_dir = self.dir("seed-initial")
    self.use_default_project({"seed-paths": [seed_dir]})

    seed_results = self.run_dbt(["seed"])
    self.assertEqual(len(seed_results), 1)

    run_results = self.run_dbt(expect_pass=False)
    self.assertEqual(len(run_results), 7)

    for node_result in run_results:
        if 'incremental' not in node_result.node.name:
            continue
        self.assertIn('not implemented for presto', node_result.message)

    self.assertManyTablesEqual(["seed", "view_model", "materialized"])


class TestShouting(BaseTestSimpleCopy):
@property
Expand Down Expand Up @@ -196,18 +182,20 @@ def seed_get_json(self, expect_pass=True):
except ValueError:
continue

if log['extra'].get('run_state') != 'internal':
continue
# TODO structured logging does not put out run_state yet
# if log['extra'].get('run_state') != 'internal':
# continue
logs.append(log)

self.assertGreater(len(logs), 0)
# empty lists evaluate as False
self.assertTrue(logs)
return logs

@use_profile('postgres')
def test_postgres_no_create_schemas(self):
logs = self.seed_get_json()
for log in logs:
msg = log['message']
msg = log['msg']
self.assertFalse(
'create schema if not exists' in msg,
f'did not expect schema creation: {msg}'
Expand Down
21 changes: 11 additions & 10 deletions test/integration/051_query_comments_test/test_query_comments.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,28 @@ def tearDown(self):
super().tearDown()

def run_get_json(self, expect_pass=True):
self.run_dbt(
res, raw_logs = self.run_dbt_and_capture(
['--debug', '--log-format=json', 'run'],
expect_pass=expect_pass
)
logs = []
for line in self.stringbuf.getvalue().split('\n'):

parsed_logs = []
for line in raw_logs.split('\n'):
try:
log = json.loads(line)
except ValueError:
continue

if log['extra'].get('run_state') != 'running':
continue
logs.append(log)
self.assertGreater(len(logs), 0)
return logs
parsed_logs.append(log)

# empty lists evaluate as False
self.assertTrue(parsed_logs)
return parsed_logs

def query_comment(self, model_name, log):
# N.B: a temporary string replacement regex to strip the HH:MM:SS from the log line if present.
# TODO: make this go away when structured logging is stable
log_msg = re.sub("(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d \| )", "", log['message'])
log_msg = re.sub("(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d \| )", "", log['msg'])
prefix = 'On {}: '.format(model_name)
if log_msg.startswith(prefix):
msg = log_msg[len(prefix):]
Expand All @@ -91,7 +92,7 @@ def run_assert_comments(self):
if msg is not None and self.matches_comment(msg):
seen = True

self.assertTrue(seen, 'Never saw a matching log message! Logs:\n{}'.format('\n'.join(l['message'] for l in logs)))
self.assertTrue(seen, 'Never saw a matching log message! Logs:\n{}'.format('\n'.join(l['msg'] for l in logs)))

@use_profile('postgres')
def test_postgres_comments(self):
Expand Down
13 changes: 6 additions & 7 deletions test/integration/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
from dbt.config import RuntimeConfig
from dbt.context import providers
from dbt.logger import log_manager
from dbt.events.functions import fire_event
from dbt.events.functions import (
capture_stdout_logs, fire_event, setup_event_logger, stop_capture_stdout_logs
)
from dbt.events.test_types import (
IntegrationTestInfo,
IntegrationTestDebug,
Expand Down Expand Up @@ -314,6 +316,7 @@ def setUp(self):
os.chdir(self.initial_dir)
# before we go anywhere, collect the initial path info
self._logs_dir = os.path.join(self.initial_dir, 'logs', self.prefix)
setup_event_logger(self._logs_dir)
_really_makedirs(self._logs_dir)
self.test_original_source_path = _pytest_get_test_root()
self.test_root_dir = self._generate_test_root_dir()
Expand Down Expand Up @@ -524,16 +527,12 @@ def run_dbt(self, args=None, expect_pass=True, profiles_dir=True):

def run_dbt_and_capture(self, *args, **kwargs):
try:
initial_stdout = log_manager.stdout
initial_stderr = log_manager.stderr
stringbuf = io.StringIO()
log_manager.set_output_stream(stringbuf)

stringbuf = capture_stdout_logs()
res = self.run_dbt(*args, **kwargs)
stdout = stringbuf.getvalue()

finally:
log_manager.set_output_stream(initial_stdout, initial_stderr)
stop_capture_stdout_logs()

return res, stdout

Expand Down

0 comments on commit 5e6be16

Please sign in to comment.