Skip to content

Commit

Permalink
lint rules + documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
solidiquis committed Jun 18, 2024
1 parent cbf27cc commit 3ac3f0a
Show file tree
Hide file tree
Showing 16 changed files with 160 additions and 103 deletions.
2 changes: 1 addition & 1 deletion python/lib/sift_internal/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
INTERNAL MODULE
Everything inside of this module is not meant to be used publicly. Proceed at your own risk.
Everything inside of this module is not meant to be used publicly.
"""
21 changes: 21 additions & 0 deletions python/lib/sift_internal/convert/json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import json
from abc import ABC, abstractmethod
from typing import Any


class AsJson(ABC):
"""
Utility sub-types that require custom-serialization meant to be used in conjunction with the
`to_json` function. Sub-types should implement `as_json` which should return the object that
you want passed to `json.dumps`.
"""

@abstractmethod
def as_json(self) -> Any:
pass

def to_json(value: Any) -> str:
"""
Serializes `value` to a JSON string uses the `AsJson.as_json` implementation of the type.
"""
return json.dumps(value, default=lambda x: x.as_json())
3 changes: 2 additions & 1 deletion python/lib/sift_internal/convert/protobuf.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from typing import Optional, Type, TypeVar, cast

from google.protobuf.message import Message
from typing import cast, Optional, Type, TypeVar

ProtobufMessage = Message

Expand Down
5 changes: 3 additions & 2 deletions python/lib/sift_py/grpc/interceptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
"""

from __future__ import annotations
from grpc_interceptor import ClientInterceptor, ClientCallDetails
from typing import Any, Callable, cast, List, Tuple

from typing import Any, Callable, List, Tuple, cast

import grpc
from grpc_interceptor import ClientCallDetails, ClientInterceptor

Metadata = List[Tuple[str, str]]

Expand Down
12 changes: 8 additions & 4 deletions python/lib/sift_py/grpc/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
"""

from __future__ import annotations
from grpc_interceptor import ClientInterceptor
from .interceptors import Metadata, MetadataInterceptor

from typing import List, TypedDict

import grpc
from grpc_interceptor import ClientInterceptor

from .interceptors import Metadata, MetadataInterceptor

SiftChannel = grpc.Channel

Expand Down Expand Up @@ -60,8 +62,10 @@ class SiftChannelConfig(TypedDict):
Config class used to instantiate a `SiftChannel` via `use_sift_channel`.
Attributes:
uri: The URI of Sift's gRPC API. The scheme portion of the URI i.e. `https://` should be ommitted.
apikey: User-generated API key generated via the Sift application.
`uri`:
The URI of Sift's gRPC API. The scheme portion of the URI i.e. `https://` should be ommitted.
`apikey`:
User-generated API key generated via the Sift application.
"""

uri: str
Expand Down
17 changes: 9 additions & 8 deletions python/lib/sift_py/ingestion/channel.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
from __future__ import annotations
from google.protobuf.empty_pb2 import Empty
from sift_internal.convert.protobuf import AsProtobuf, ProtobufMessage, try_cast_pb

from enum import Enum
from sift.common.type.v1.channel_enum_type_pb2 import (
ChannelEnumType as ChannelEnumTypePb,
)
from typing import List, Optional, Type, TypedDict

import sift.common.type.v1.channel_data_type_pb2 as channel_pb
from google.protobuf.empty_pb2 import Empty
from sift.common.type.v1.channel_bit_field_element_pb2 import (
ChannelBitFieldElement as ChannelBitFieldElementPb,
)
from sift.common.type.v1.channel_enum_type_pb2 import (
ChannelEnumType as ChannelEnumTypePb,
)
from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataChannelValue
from sift.ingestion_configs.v1.ingestion_configs_pb2 import (
ChannelConfig as ChannelConfigPb,
)
from typing import List, Optional, Type, TypedDict
from sift_internal.convert.protobuf import AsProtobuf, ProtobufMessage, try_cast_pb
from typing_extensions import NotRequired

import sift.common.type.v1.channel_data_type_pb2 as channel_pb


class ChannelValue(TypedDict):
"""
Expand Down
32 changes: 13 additions & 19 deletions python/lib/sift_py/ingestion/config/telemetry.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
from __future__ import annotations

from typing import List, Optional

from ..flow import FlowConfig
from ..rule.config import RuleConfig
from typing import Dict, List, Optional


class TelemetryConfig:
"""
Configurations necessary to start ingestion.
Attributes:
`asset_name`: The name of the asset that you wish to telemeter data for.
`ingestion_client_key`: An arbitrary string completely chosen by the user to uniquely identify
this ingestion configuration. It should be unique with respect to your organization.
`flows`: The list of `FlowConfig`. A single flow can specify a single channel value
or a set of channel values, with each value belonging to a different channel. Channels
that send data at the same frequency and time should be in the same flow.
`organization_id`: ID of your organization in Sift. This field is only required if your user
belongs to multiple organizations
`rules`: Rules to evaluate during ingestion.
`named_expressions`: Reusable expressions used to generate rules. The key is the name of the expression and the value
of said key is the actual generic expression.
`asset_name`:
The name of the asset that you wish to telemeter data for.
`ingestion_client_key`:
An arbitrary string chosen by the user to uniquely identify this ingestion configuration.
`flows`:
A single flow can specify a single channel value or a set of channel values that are ingested together.
`organization_id`:
ID of your organization in Sift. This field is only required if your user belongs to multiple organizations.
`rules`:
Rules to evaluate during ingestion.
"""

asset_name: str
Expand All @@ -31,16 +29,13 @@ class TelemetryConfig:
flows: List[FlowConfig]
rules: List[RuleConfig]

named_expressions: Dict[str, str]

def __init__(
self,
asset_name: str,
ingestion_client_key: str,
organization_id: Optional[str] = None,
flows: List[FlowConfig] = [],
rules: List[RuleConfig] = [],
named_expressions: Dict[str, str] = {},
):
# TODO: Add validation logic here as well.

Expand All @@ -49,4 +44,3 @@ def __init__(
self.organization_id = organization_id
self.flows = flows
self.rules = rules
self.named_expressions = named_expressions
23 changes: 13 additions & 10 deletions python/lib/sift_py/ingestion/config/yaml.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
from __future__ import annotations
from ..channel import ChannelDataType, ChannelBitFieldElement, ChannelEnumType, channel_fqn

from collections.abc import Iterable
from pathlib import Path
from typing import Dict, List, Literal, Optional, TypedDict, cast

import yaml
from typing_extensions import NotRequired

from ..channel import ChannelBitFieldElement, ChannelDataType, ChannelEnumType, channel_fqn
from ..error import YamlConfigError
from ..flow import ChannelConfig, FlowConfig
from ..rule.config import (
RuleConfig,
RuleActionCreateDataReviewAnnotation,
RuleActionAnnotationKind,
RuleActionCreateDataReviewAnnotation,
RuleActionCreatePhaseAnnotation,
RuleConfig,
)
from collections.abc import Iterable
from pathlib import Path
from typing import cast, Dict, List, Literal, Optional, TypedDict
from typing_extensions import NotRequired
from .telemetry import TelemetryConfig

import yaml


class TelemetryConfigYamlSpec(TypedDict):
"""
Expand Down Expand Up @@ -88,7 +90,8 @@ class YamlLoadOptions(TypedDict):
Options to use when loading a telemetry config form YAML.
Attributes:
`named_expressions`: A list of look up paths for YAML files containing named expressions. Could also just be a YAML str.
`named_expressions`:
A list of look up paths for YAML files containing named expressions. Could also just be a YAML str.
"""

named_expressions: List[Path | str]
Expand Down
26 changes: 13 additions & 13 deletions python/lib/sift_py/ingestion/config/yaml_test.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
from __future__ import annotations

from typing import cast

import pytest
import yaml

from ..channel import ChannelDataType
from ..rule.config import (
RuleActionCreateDataReviewAnnotation,
RuleActionCreatePhaseAnnotation,
RuleActionKind,
)
from .yaml import (
_try_from_yaml_str,
YamlConfigError,
NamedExpressionsYamlSpec,
YamlConfigError,
_try_from_yaml_str,
)
from ..channel import ChannelDataType
from typing import cast


import pytest
import yaml


def test_telemetry_config():
Expand Down Expand Up @@ -137,20 +137,20 @@ def test_named_expressions():
name: log
data_type: string
description: asset logs
velocity_channel: &velocity_channel
name: velocity
data_type: double
description: speed
unit: Miles Per Hour
component: mainmotor
voltage_channel: &voltage_channel
name: voltage
data_type: int32
description: voltage at the source
unit: Volts
vehicle_state_channel: &vehicle_state_channel
name: vehicle_state
data_type: enum
Expand All @@ -163,7 +163,7 @@ def test_named_expressions():
key: 1
- name: Stopped
key: 2
gpio_channel: &gpio_channel
name: gpio
data_type: bit_field
Expand Down Expand Up @@ -225,7 +225,7 @@ def test_named_expressions():
channels:
- <<: *velocity_channel
- <<: *voltage_channel
- name: logs
channels:
- <<: *log_channel
Expand Down
11 changes: 8 additions & 3 deletions python/lib/sift_py/ingestion/flow.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from __future__ import annotations
from .channel import ChannelConfig, channel_fqn
from sift_internal.convert.protobuf import try_cast_pb, AsProtobuf, ProtobufMessage

from typing import Dict, List, Optional, Type

from sift.ingestion_configs.v1.ingestion_configs_pb2 import (
ChannelConfig as ChannelConfigPb,
)
from sift.ingestion_configs.v1.ingestion_configs_pb2 import (
FlowConfig as FlowConfigPb,
)
from typing import Dict, List, Optional, Type
from sift_internal.convert.protobuf import AsProtobuf, ProtobufMessage, try_cast_pb

from .channel import ChannelConfig, channel_fqn


class FlowConfig(AsProtobuf):
Expand Down
39 changes: 21 additions & 18 deletions python/lib/sift_py/ingestion/impl/ingest.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
from __future__ import annotations
from ..channel import ChannelValue, channel_fqn, is_data_type, empty_value
from ..flow import FlowConfig
from ..rule.config import RuleConfig
from .ingestion_config import (
get_ingestion_config_by_client_key,
create_ingestion_config,
)
from ..config.telemetry import TelemetryConfig
from ...grpc.transport import SiftChannel
from sift.ingestion_configs.v1.ingestion_configs_pb2 import IngestionConfig

from datetime import datetime
from typing import Dict, List, Optional, cast

from google.protobuf.timestamp_pb2 import Timestamp
from sift.ingest.v1.ingest_pb2 import (
IngestWithConfigDataChannelValue,
IngestWithConfigDataStreamRequest,
)
from sift.ingest.v1.ingest_pb2_grpc import IngestServiceStub
from sift.runs.v2.runs_pb2 import CreateRunRequest, CreateRunResponse
from sift.runs.v2.runs_pb2_grpc import RunServiceStub
from sift.ingestion_configs.v1.ingestion_configs_pb2 import IngestionConfig
from sift.rules.v1.rules_pb2 import (
JsonRulesRequest,
UpdateJsonRulesRequest,
UpdateJsonRulesResponse,
JsonRulesRequest,
)
from sift.rules.v1.rules_pb2_grpc import RuleServiceStub
from google.protobuf.timestamp_pb2 import Timestamp
from typing import cast, Dict, List, Optional
from datetime import datetime
from sift.runs.v2.runs_pb2 import CreateRunRequest, CreateRunResponse
from sift.runs.v2.runs_pb2_grpc import RunServiceStub
from sift_internal.convert.json import to_json

import json
from ...grpc.transport import SiftChannel
from ..channel import ChannelValue, channel_fqn, empty_value, is_data_type
from ..config.telemetry import TelemetryConfig
from ..flow import FlowConfig
from ..rule.config import RuleConfig
from .ingestion_config import (
create_ingestion_config,
get_ingestion_config_by_client_key,
)


class IngestionServiceImpl:
Expand Down Expand Up @@ -59,6 +61,7 @@ def __init__(
# TODO... flows can have the same name...
self.flow_configs_by_name = {flow.name: flow for flow in config.flows}

# TODO... compare with existing rules and error if mismatch
self.__class__.update_rules(
channel, self.ingestion_config.asset_id, config.rules, config.organization_id
)
Expand Down Expand Up @@ -171,7 +174,7 @@ def update_rules(
organization_id: Optional[str] = None,
):
svc = RuleServiceStub(channel)
json_rules = json.dumps(rule_configs, default=lambda x: x.as_json())
json_rules = to_json(rule_configs)
req = UpdateJsonRulesRequest(
request=JsonRulesRequest(
asset_id=asset_id,
Expand Down
Loading

0 comments on commit 3ac3f0a

Please sign in to comment.