diff --git a/core/dbt/events/functions.py b/core/dbt/events/functions.py
index 5cb9a832de5..5d83c5cf735 100644
--- a/core/dbt/events/functions.py
+++ b/core/dbt/events/functions.py
@@ -5,10 +5,12 @@
 import dbt.flags as flags
 # TODO this will need to move eventually
 from dbt.logger import SECRET_ENV_PREFIX, make_log_dir_if_missing
+import io
+from io import StringIO
 import json
 import logging
 from logging import Logger
-from logging.handlers import WatchedFileHandler
+from logging.handlers import RotatingFileHandler
 import os
 from typing import List

@@ -58,13 +60,29 @@ def setup_event_logger(log_path):

     file_passthrough_formatter = logging.Formatter(fmt=FORMAT)

-    # TODO log rotation is not handled by WatchedFileHandler
-    file_handler = WatchedFileHandler(filename=log_dest, encoding='utf8')
+    file_handler = RotatingFileHandler(filename=log_dest, encoding='utf8')
     file_handler.setFormatter(file_passthrough_formatter)
     file_handler.setLevel(logging.DEBUG)  # always debug regardless of user input
     this.FILE_LOG.addHandler(file_handler)


+# used for integration tests
+def capture_stdout_logs() -> StringIO:
+    capture_buf = io.StringIO()
+    stdout_capture_handler = logging.StreamHandler(capture_buf)
+    stdout_capture_handler.setLevel(logging.DEBUG)
+    this.STDOUT_LOG.addHandler(stdout_capture_handler)
+    return capture_buf
+
+
+# used for integration tests
+def stop_capture_stdout_logs() -> None:
+    this.STDOUT_LOG.handlers = [
+        h for h in this.STDOUT_LOG.handlers
+        if not (hasattr(h, 'stream') and isinstance(h.stream, StringIO))  # type: ignore
+    ]
+
+
 def env_secrets() -> List[str]:
     return [
         v for k, v in os.environ.items()
diff --git a/test/integration/001_simple_copy_test/test_simple_copy.py b/test/integration/001_simple_copy_test/test_simple_copy.py
index 4fd1d60daf1..bb02f794463 100644
--- a/test/integration/001_simple_copy_test/test_simple_copy.py
+++ b/test/integration/001_simple_copy_test/test_simple_copy.py
@@ -91,20 +91,6 @@ def test__postgres__dbt_doesnt_run_empty_models(self):
         self.assertFalse("empty" in models.keys())
         self.assertFalse("disabled" in models.keys())

-    @use_profile("presto")
-    def test__presto__simple_copy(self):
-        self.use_default_project({"seed-paths": [self.dir("seed-initial")]})
-
-        results = self.run_dbt(["seed"])
-        self.assertEqual(len(results), 1)
-        results = self.run_dbt(expect_pass=False)
-        self.assertEqual(len(results), 7)
-        for result in results:
-            if 'incremental' in result.node.name:
-                self.assertIn('not implemented for presto', result.message)
-
-        self.assertManyTablesEqual(["seed", "view_model", "materialized"])
-

 class TestShouting(BaseTestSimpleCopy):
     @property
@@ -196,18 +182,20 @@ def seed_get_json(self, expect_pass=True):
             except ValueError:
                 continue

-            if log['extra'].get('run_state') != 'internal':
-                continue
+            # TODO structured logging does not put out run_state yet
+            # if log['extra'].get('run_state') != 'internal':
+            #     continue
             logs.append(log)
-        self.assertGreater(len(logs), 0)
+        # empty lists evaluate as False
+        self.assertTrue(logs)
         return logs

     @use_profile('postgres')
     def test_postgres_no_create_schemas(self):
         logs = self.seed_get_json()
         for log in logs:
-            msg = log['message']
+            msg = log['msg']
             self.assertFalse(
                 'create schema if not exists' in msg,
                 f'did not expect schema creation: {msg}'
diff --git a/test/integration/051_query_comments_test/test_query_comments.py b/test/integration/051_query_comments_test/test_query_comments.py
index 4ada01aa991..aa67b80b9e6 100644
--- a/test/integration/051_query_comments_test/test_query_comments.py
+++ b/test/integration/051_query_comments_test/test_query_comments.py
@@ -53,27 +53,28 @@ def tearDown(self):
         super().tearDown()

     def run_get_json(self, expect_pass=True):
-        self.run_dbt(
+        res, raw_logs = self.run_dbt_and_capture(
            ['--debug', '--log-format=json', 'run'],
            expect_pass=expect_pass
        )
-        logs = []
-        for line in self.stringbuf.getvalue().split('\n'):
+
+        parsed_logs = []
+        for line in raw_logs.split('\n'):
             try:
                 log = json.loads(line)
             except ValueError:
                 continue

-            if log['extra'].get('run_state') != 'running':
-                continue
-            logs.append(log)
-        self.assertGreater(len(logs), 0)
-        return logs
+            parsed_logs.append(log)
+
+        # empty lists evaluate as False
+        self.assertTrue(parsed_logs)
+        return parsed_logs

     def query_comment(self, model_name, log):
         # N.B: a temporary string replacement regex to strip the HH:MM:SS from the log line if present.
         # TODO: make this go away when structured logging is stable
-        log_msg = re.sub("(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d \| )", "", log['message'])
+        log_msg = re.sub("(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d \| )", "", log['msg'])
         prefix = 'On {}: '.format(model_name)
         if log_msg.startswith(prefix):
             msg = log_msg[len(prefix):]
@@ -91,7 +92,7 @@ def run_assert_comments(self):
             if msg is not None and self.matches_comment(msg):
                 seen = True

-        self.assertTrue(seen, 'Never saw a matching log message! Logs:\n{}'.format('\n'.join(l['message'] for l in logs)))
+        self.assertTrue(seen, 'Never saw a matching log message! Logs:\n{}'.format('\n'.join(l['msg'] for l in logs)))

     @use_profile('postgres')
     def test_postgres_comments(self):
diff --git a/test/integration/base.py b/test/integration/base.py
index 01eb75130a4..29302897d54 100644
--- a/test/integration/base.py
+++ b/test/integration/base.py
@@ -24,7 +24,9 @@
 from dbt.config import RuntimeConfig
 from dbt.context import providers
 from dbt.logger import log_manager
-from dbt.events.functions import fire_event
+from dbt.events.functions import (
+    capture_stdout_logs, fire_event, setup_event_logger, stop_capture_stdout_logs
+)
 from dbt.events.test_types import (
     IntegrationTestInfo,
     IntegrationTestDebug,
@@ -314,6 +316,7 @@ def setUp(self):
         os.chdir(self.initial_dir)
         # before we go anywhere, collect the initial path info
         self._logs_dir = os.path.join(self.initial_dir, 'logs', self.prefix)
+        setup_event_logger(self._logs_dir)
         _really_makedirs(self._logs_dir)
         self.test_original_source_path = _pytest_get_test_root()
         self.test_root_dir = self._generate_test_root_dir()
@@ -524,16 +527,12 @@ def run_dbt(self, args=None, expect_pass=True, profiles_dir=True):

     def run_dbt_and_capture(self, *args, **kwargs):
         try:
-            initial_stdout = log_manager.stdout
-            initial_stderr = log_manager.stderr
-            stringbuf = io.StringIO()
-            log_manager.set_output_stream(stringbuf)
-
+            stringbuf = capture_stdout_logs()
             res = self.run_dbt(*args, **kwargs)
             stdout = stringbuf.getvalue()

         finally:
-            log_manager.set_output_stream(initial_stdout, initial_stderr)
+            stop_capture_stdout_logs()

         return res, stdout
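
Note for reviewers: a minimal sketch of how the two capture helpers added in `core/dbt/events/functions.py` are intended to pair up, mirroring the `run_dbt_and_capture` change above. Only `capture_stdout_logs` and `stop_capture_stdout_logs` come from this diff; `invoke_dbt` and `run_and_capture` are hypothetical stand-ins for whatever actually runs dbt in a given test.

```python
# Sketch only: mirrors the try/finally pattern used by run_dbt_and_capture above.
from dbt.events.functions import capture_stdout_logs, stop_capture_stdout_logs


def run_and_capture(invoke_dbt):
    # invoke_dbt: hypothetical zero-argument callable that runs a dbt command
    try:
        stringbuf = capture_stdout_logs()   # attaches a StringIO-backed handler to STDOUT_LOG
        res = invoke_dbt()
        stdout = stringbuf.getvalue()       # everything the event logger emitted while capturing
    finally:
        stop_capture_stdout_logs()          # removes any handler whose stream is a StringIO
    return res, stdout
```

Because `stop_capture_stdout_logs` strips every handler whose stream is a `StringIO`, these captures are not designed to be nested.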