Skip to content

Commit

Permalink
feat: ignore incorrect version of snowplow_tracker
Browse files Browse the repository at this point in the history
Closes issue meltano#8256

It may be a temporary workaround -
it may be removed once dbt-labs/dbt-core#8680 is merged.
  • Loading branch information
jaceksan committed Jan 30, 2024
1 parent 8d86e65 commit dc65ff9
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/meltano/core/tracking/contexts/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from enum import Enum, auto

import click
from snowplow_tracker import SelfDescribingJson

from meltano.core.tracking.schemas import CliContextSchema
from meltano.core.tracking.tracker import SelfDescribingJson
from meltano.core.utils import hash_sha256


Expand Down
2 changes: 1 addition & 1 deletion src/meltano/core/tracking/contexts/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
from warnings import warn

import psutil
from snowplow_tracker import SelfDescribingJson
from structlog.stdlib import get_logger

import meltano
from meltano.core.tracking.schemas import EnvironmentContextSchema
from meltano.core.tracking.tracker import SelfDescribingJson
from meltano.core.utils import get_boolean_env_var, hash_sha256, safe_hasattr, strtobool

logger = get_logger(__name__)
Expand Down
3 changes: 1 addition & 2 deletions src/meltano/core/tracking/contexts/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@
from pathlib import Path
from types import TracebackType

from snowplow_tracker import SelfDescribingJson

from meltano.core.tracking.schemas import ExceptionContextSchema
from meltano.core.tracking.tracker import SelfDescribingJson
from meltano.core.utils import hash_sha256

BASE_PATHS = (sys.prefix, sys.exec_prefix, sys.base_prefix, sys.base_exec_prefix)
Expand Down
2 changes: 1 addition & 1 deletion src/meltano/core/tracking/contexts/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

import uuid

from snowplow_tracker import SelfDescribingJson
from structlog.stdlib import get_logger

from meltano.core.block.blockset import BlockSet
from meltano.core.block.plugin_command import PluginCommandBlock
from meltano.core.elt_context import ELTContext
from meltano.core.plugin.project_plugin import ProjectPlugin
from meltano.core.tracking.schemas import PluginsContextSchema
from meltano.core.tracking.tracker import SelfDescribingJson
from meltano.core.utils import hash_sha256, safe_hasattr

logger = get_logger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion src/meltano/core/tracking/contexts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from enum import Enum, auto
from functools import cached_property

from snowplow_tracker import SelfDescribingJson
from structlog.stdlib import get_logger

from meltano.core.project import Project
from meltano.core.tracking.schemas import ProjectContextSchema
from meltano.core.tracking.tracker import SelfDescribingJson
from meltano.core.utils import hash_sha256

logger = get_logger(__name__)
Expand Down
96 changes: 78 additions & 18 deletions src/meltano/core/tracking/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import structlog
import tzlocal
from psutil import Process
from snowplow_tracker import Emitter, SelfDescribing, SelfDescribingJson
from snowplow_tracker import Tracker as SnowplowTracker

from meltano.core.project import Project
from meltano.core.tracking.schemas import (
Expand All @@ -41,14 +39,67 @@

from functools import cached_property

logger = structlog.get_logger(__name__)

# Tolerate a failing import of snowplow_tracker instead of crashing at import
# time. This helps to resolve issues related to import conflicts between the
# snowplow-tracker and minimal-snowplow-tracker distributions.
# SNOWPLOW_TRACKER_AVAILABLE records whether the real library was imported.
SNOWPLOW_TRACKER_AVAILABLE = False
try:
    from snowplow_tracker import Emitter, SelfDescribing, SelfDescribingJson
    from snowplow_tracker import Tracker as SnowplowTracker

    SNOWPLOW_TRACKER_AVAILABLE = True
except Exception as e:
    # Deliberately broad: any failure while importing the library is reported
    # as a warning and replaced by the fallback class below.
    logger.warning(
        "Import of snowplow_tracker failed. "
        f"Disable tracking to fix it. Reason: {e}",
    )

    class SelfDescribingJson:
        """A self-describing JSON object.

        It is a copy of the class from the snowplow_tracker library. It must
        be defined here when the imports above fail, because it is used in
        many places across the codebase.
        """

        def __init__(self, schema, data) -> None:
            """Init a self-describing JSON object.

            Args:
                schema: JSON schema, stored under the "schema" key.
                data: JSON payload, stored under the "data" key.
            """
            self.schema = schema
            self.data = data

        def to_json(self) -> dict:
            """Return a JSON representation of the object.

            Returns:
                Dictionary with the "schema" and "data" keys.
            """
            return {
                "schema": self.schema,
                "data": self.data,
            }

        def to_string(self) -> str:
            """Return a string representation of the object.

            Returns:
                The result of `to_json` serialized with `json.dumps`.
            """
            return json.dumps(self.to_json())


# Pattern for http/https URLs. Note that inside the character class `$-_` is a
# range (code points 0x24-0x5F), so it also covers digits, uppercase letters
# and punctuation such as `/`, `:`, `?` and `=`.
URL_REGEX = (
    r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
)

# Number of microseconds in one second.
MICROSECONDS_PER_SECOND = 1000000

logger = structlog.get_logger(__name__)


class BlockEvents(Enum):
"""Events describing a block state."""
Expand Down Expand Up @@ -85,7 +136,7 @@ class TelemetrySettings(t.NamedTuple):
class Tracker: # noqa: WPS214, WPS230 - too many (public) methods
"""Meltano tracker backed by Snowplow."""

def __init__( # noqa: WPS210 - too many local variables
def __init__( # noqa: WPS210, C901, WPS213 - too many local variables
self,
project: Project,
request_timeout: float | tuple[float, float] | None = 3.5,
Expand All @@ -112,20 +163,29 @@ def __init__( # noqa: WPS210 - too many local variables

endpoints = project.settings.get("snowplow.collector_endpoints")

emitters: list[Emitter] = []
for endpoint in endpoints:
if not check_url(endpoint):
logger.warning("invalid_snowplow_endpoint", endpoint=endpoint)
continue
parsed_url = urlparse(endpoint)
emitters.append(
Emitter(
endpoint=parsed_url.hostname + parsed_url.path,
protocol=parsed_url.scheme or "http",
port=parsed_url.port,
request_timeout=request_timeout,
),
# Suppress errors when instantiating Emitters.
# It helps to resolve issues related to import conflicts -
# snowplow-tracker vs. minimal-snowplow-tracker
if SNOWPLOW_TRACKER_AVAILABLE:
emitters: list[Emitter] = []
for endpoint in endpoints:
if not check_url(endpoint):
logger.warning("invalid_snowplow_endpoint", endpoint=endpoint)
continue
parsed_url = urlparse(endpoint)
emitters.append(
Emitter(
endpoint=parsed_url.hostname + parsed_url.path,
protocol=parsed_url.scheme or "http",
port=parsed_url.port,
request_timeout=request_timeout,
),
)
else:
logger.warning(
"Snowplow tracker is not available, setting emitters to empty list",
)
emitters = []

if emitters:
self.snowplow_tracker = SnowplowTracker(
Expand Down

0 comments on commit dc65ff9

Please sign in to comment.