
address comments
michaeldmitry committed Jun 28, 2024
1 parent 86d1783 · commit 655af20
Showing 8 changed files with 63 additions and 83 deletions.
69 changes: 27 additions & 42 deletions src/tempo.py
@@ -14,26 +14,7 @@
 )
 from charms.traefik_route_k8s.v0.traefik_route import TraefikRouteRequirer
 
-from tempo_config import (
-    BlockConfig,
-    Client,
-    ClientTLSConfig,
-    CompactionConfig,
-    Compactor,
-    Distributor,
-    FrontendWorker,
-    Ingester,
-    Memberlist,
-    PoolConfig,
-    Querier,
-    S3Config,
-    Server,
-    Storage,
-    TempoBaseConfig,
-    TLSConfig,
-    TraceStorageConfig,
-    WalConfig,
-)
+from src import tempo_config
 
 logger = logging.getLogger(__name__)
 
@@ -147,15 +128,15 @@ def get_receiver_url(self, protocol: ReceiverProtocol, ingress: TraefikRouteRequirer
         return f"{url}:{receiver_port}"
 
     def _build_server_config(self):
-        server_config = Server(
+        server_config = tempo_config.Server(
             http_listen_port=self.tempo_http_server_port,
             # we need to specify a grpc server port even if we're not using the grpc server,
             # otherwise it will default to 9595 and make promtail bork
             grpc_listen_port=self.tempo_grpc_server_port,
         )
 
         if self.use_tls:
-            server_tls_config = TLSConfig(
+            server_tls_config = tempo_config.TLS(
                 cert_file=str(self.tls_cert_path),
                 key_file=str(self.tls_key_path),
                 client_ca_file=str(self.tls_ca_path),
@@ -170,12 +151,12 @@ def generate_config(
         receivers: Sequence[ReceiverProtocol],
         s3_config: dict,
         peers: Optional[List[str]] = None,
-    ) -> TempoBaseConfig:
+    ) -> tempo_config.Tempo:
         """Generate the Tempo configuration.
         Only activate the provided receivers.
         """
-        config = TempoBaseConfig(
+        config = tempo_config.Tempo(
             auth_enabled=False,
             server=self._build_server_config(),
             distributor=self._build_distributor_config(receivers),
@@ -197,34 +178,38 @@ def generate_config(
                 # try with fqdn?
                 "tls_server_name": self._external_hostname,
             }
-            config.ingester_client = Client(grpc_client_config=ClientTLSConfig(**tls_config))
-            config.metrics_generator_client = Client(
-                grpc_client_config=ClientTLSConfig(**tls_config)
+            config.ingester_client = tempo_config.Client(
+                grpc_client_config=tempo_config.ClientTLS(**tls_config)
             )
-            config.querier.frontend_worker.grpc_client_config = ClientTLSConfig(**tls_config)
+            config.metrics_generator_client = tempo_config.Client(
+                grpc_client_config=tempo_config.ClientTLS(**tls_config)
+            )
+            config.querier.frontend_worker.grpc_client_config = tempo_config.ClientTLS(
+                **tls_config
+            )
             config.memberlist = config.memberlist.copy(update=tls_config)
 
         return config
 
     def _build_storage_config(self, s3_config: dict):
-        storage_config = TraceStorageConfig(
+        storage_config = tempo_config.TraceStorage(
             # where to store the wal locally
-            wal=WalConfig(path=self.wal_path),
-            pool=PoolConfig(
+            wal=tempo_config.Wal(path=self.wal_path),
+            pool=tempo_config.Pool(
                 # number of traces per index record
                 max_workers=400,
                 queue_depth=20000,
            ),
             backend="s3",
-            s3=S3Config(
+            s3=tempo_config.S3(
                 bucket=s3_config["bucket"],
                 access_key=s3_config["access-key"],
                 endpoint=s3_config["endpoint"],
                 secret_key=s3_config["secret-key"],
             ),
-            block=BlockConfig(version="v2"),
+            block=tempo_config.Block(version="v2"),
         )
-        return Storage(trace=storage_config)
+        return tempo_config.Storage(trace=storage_config)
 
     def is_ready(self):
         """Whether the tempo built-in readiness check reports 'ready'."""
@@ -245,16 +230,16 @@ def is_ready(self):
     def _build_querier_config(self):
         """Build querier config"""
         # TODO this won't work for distributed coordinator where query frontend will be on a different unit
-        return Querier(
-            frontend_worker=FrontendWorker(
+        return tempo_config.Querier(
+            frontend_worker=tempo_config.FrontendWorker(
                 frontend_address=f"localhost:{self.tempo_grpc_server_port}"
            ),
         )
 
     def _build_compactor_config(self):
         """Build compactor config"""
-        return Compactor(
-            compaction=CompactionConfig(
+        return tempo_config.Compactor(
+            compaction=tempo_config.Compaction(
                 # blocks in this time window will be compacted together
                 compaction_window="1h",
                 # maximum size of compacted blocks
@@ -266,9 +251,9 @@ def _build_compactor_config(self):
             )
         )
 
-    def _build_memberlist_config(self, peers: Optional[List[str]]):
+    def _build_memberlist_config(self, peers: Optional[List[str]]) -> tempo_config.Memberlist:
         """Build memberlist config"""
-        return Memberlist(
+        return tempo_config.Memberlist(
             abort_if_cluster_join_fails=False,
             bind_port=self.memberlist_port,
             join_members=([f"{peer}:{self.memberlist_port}" for peer in peers] if peers else []),
@@ -279,7 +264,7 @@ def _build_ingester_config(self):
         # the length of time after a trace has not received spans to consider it complete and flush it
         # cut the head block when it hits this number of traces or ...
         # this much time passes
-        return Ingester(
+        return tempo_config.Ingester(
             trace_idle_period="10s",
             max_block_bytes=100,
             max_block_duration="30m",
@@ -333,4 +318,4 @@ def _build_distributor_config(self, receivers: Sequence[ReceiverProtocol]):
         if jaeger_config:
             config["jaeger"] = {"protocols": jaeger_config}
 
-        return Distributor(receivers=config)
+        return tempo_config.Distributor(receivers=config)
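
A minimal sketch (not from this commit) of how the renamed, module-qualified models above compose; values are placeholders, and model_dump() assumes pydantic v2 (the ConfigDict import in tempo_cluster.py suggests v2; on v1, .dict() is the equivalent):

    from src import tempo_config

    # Build the storage subtree the way _build_storage_config does, with
    # placeholder credentials; assumes the S3 fields shown in this diff
    # are the only required ones.
    storage = tempo_config.Storage(
        trace=tempo_config.TraceStorage(
            wal=tempo_config.Wal(path="/etc/tempo/tempo_wal"),  # assumed path
            pool=tempo_config.Pool(max_workers=400, queue_depth=20000),
            backend="s3",
            s3=tempo_config.S3(
                bucket="tempo",
                access_key="ACCESS-KEY",
                endpoint="s3.example.com:9000",
                secret_key="SECRET-KEY",
            ),
            block=tempo_config.Block(version="v2"),
        )
    )
    print(storage.model_dump())  # .dict() on pydantic v1
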
8 changes: 4 additions & 4 deletions src/tempo_cluster.py
@@ -22,7 +22,7 @@
 from ops import EventSource, Object, ObjectEvents
 from pydantic import BaseModel, ConfigDict
 
-from tempo_config import TempoBaseConfig
+from tempo_config import Tempo as TempoConfig
 
 log = logging.getLogger("tempo_cluster")
 
@@ -233,7 +233,7 @@ class TempoClusterRequirerUnitData(DatabagModel):
 class TempoClusterProviderAppData(DatabagModel):
     """TempoClusterProviderAppData."""
 
-    tempo_config: TempoBaseConfig
+    tempo_config: Dict[str, Any]
     loki_endpoints: Optional[Dict[str, str]] = None
     ca_cert: Optional[str] = None
     server_cert: Optional[str] = None
@@ -300,7 +300,7 @@ def publish_privkey(self, label: str) -> str:
 
     def publish_data(
         self,
-        tempo_config: TempoBaseConfig,
+        tempo_config: TempoConfig,
         tempo_receiver: Optional[Dict[ReceiverProtocol, Any]] = None,
         ca_cert: Optional[str] = None,
         server_cert: Optional[str] = None,
@@ -311,7 +311,7 @@ def publish_data(
         for relation in self._relations:
             if relation:
                 local_app_databag = TempoClusterProviderAppData(
-                    tempo_config=tempo_config,
+                    tempo_config=tempo_config.__dict__,
                     loki_endpoints=loki_endpoints,
                     tempo_receiver=tempo_receiver,
                     ca_cert=ca_cert,
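
Since the provider now publishes the config as a plain dict (tempo_config.__dict__ above), typed as Dict[str, Any] in the databag model, the receiving side can re-validate it against the schema before use. A hedged sketch; load_tempo_config is illustrative and not part of this commit:

    from tempo_config import Tempo as TempoConfig

    def load_tempo_config(raw: dict) -> TempoConfig:
        # Re-validate the published dict; pydantic raises a ValidationError
        # if the coordinator sent something that does not match the schema.
        return TempoConfig(**raw)
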
40 changes: 20 additions & 20 deletions src/tempo_config.py
@@ -48,7 +48,7 @@ class Memberlist(BaseModel):
     tls_server_name: Optional[str] = None
 
 
-class ClientTLSConfig(BaseModel):
+class ClientTLS(BaseModel):
     """Client tls config schema."""
 
     tls_enabled: bool
@@ -61,7 +61,7 @@
 class Client(BaseModel):
     """Client schema."""
 
-    grpc_client_config: ClientTLSConfig
+    grpc_client_config: ClientTLS
 
 
 class Distributor(BaseModel):
@@ -84,7 +84,7 @@ class FrontendWorker(BaseModel):
     """FrontendWorker schema."""
 
     frontend_address: str
-    grpc_client_config: Optional[ClientTLSConfig] = None
+    grpc_client_config: Optional[ClientTLS] = None
 
 
 class Querier(BaseModel):
@@ -93,7 +93,7 @@ class Querier(BaseModel):
     frontend_worker: FrontendWorker
 
 
-class TLSConfig(BaseModel):
+class TLS(BaseModel):
     """TLS configuration schema."""
 
     cert_file: str
@@ -107,11 +107,11 @@ class Server(BaseModel):
 
     http_listen_port: int
     grpc_listen_port: int
-    http_tls_config: Optional[TLSConfig] = None
-    grpc_tls_config: Optional[TLSConfig] = None
+    http_tls_config: Optional[TLS] = None
+    grpc_tls_config: Optional[TLS] = None
 
 
-class CompactionConfig(BaseModel):
+class Compaction(BaseModel):
     """Compaction schema."""
 
     compaction_window: str
@@ -125,23 +125,23 @@ class Compactor(BaseModel):
     """Compactor schema."""
 
     ring: Optional[Ring] = None
-    compaction: CompactionConfig
+    compaction: Compaction
 
 
-class PoolConfig(BaseModel):
+class Pool(BaseModel):
     """Pool schema."""
 
     max_workers: int
     queue_depth: int
 
 
-class WalConfig(BaseModel):
+class Wal(BaseModel):
     """Wal schema."""
 
     path: str
 
 
-class S3Config(BaseModel):
+class S3(BaseModel):
     """S3 config schema."""
 
     bucket: str
@@ -168,30 +168,30 @@ def remove_scheme(cls, v: str) -> str:
     )
 
 
-class BlockConfig(BaseModel):
+class Block(BaseModel):
     """Block schema."""
 
     version: Optional[str]
 
 
-class TraceStorageConfig(BaseModel):
+class TraceStorage(BaseModel):
     """Trace Storage schema."""
 
-    wal: WalConfig
-    pool: PoolConfig
+    wal: Wal
+    pool: Pool
     backend: str
-    s3: S3Config
-    block: BlockConfig
+    s3: S3
+    block: Block
 
 
 class Storage(BaseModel):
     """Storage schema."""
 
-    trace: TraceStorageConfig
+    trace: TraceStorage
 
 
-class TempoBaseConfig(BaseModel):
-    """Base class for tempo config schema."""
+class Tempo(BaseModel):
+    """Tempo config schema."""
 
     auth_enabled: bool
     server: Server
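
The renames drop the redundant *Config suffixes now that call sites qualify every class with the tempo_config module name. An illustrative sketch of how that reads (3200/9096 are Tempo's conventional ports, an assumption; the cert paths are placeholders):

    from src import tempo_config

    server = tempo_config.Server(
        http_listen_port=3200,
        grpc_listen_port=9096,
        # "tempo_config.TLS" at the call site, instead of the old "TLSConfig"
        http_tls_config=tempo_config.TLS(
            cert_file="/etc/tempo/tls/server.crt",
            key_file="/etc/tempo/tls/server.key",
            client_ca_file="/etc/tempo/tls/ca.crt",
        ),
    )
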
1 change: 1 addition & 0 deletions tests/integration/test_ingressed_tls.py
@@ -1,3 +1,4 @@
+# TODO: uncomment and fix when the fully functional tempo cluster is ready (e.g: traces get ingested, can query for traces)
 # import asyncio
 # import json
 # import logging
1 change: 1 addition & 0 deletions tests/integration/test_integration.py
@@ -1,3 +1,4 @@
+# TODO: uncomment and fix when the fully functional tempo cluster is ready (e.g: traces get ingested, can query for traces)
 # import asyncio
 # import json
 # import logging
15 changes: 1 addition & 14 deletions tests/integration/test_scaling_monolithic.py
@@ -13,7 +13,7 @@
 from pytest_operator.plugin import OpsTest
 
 METADATA = yaml.safe_load(Path("./charmcraft.yaml").read_text())
-APP_NAME = "tempo-coordinator"
+APP_NAME = "tempo"
 FACADE = "facade"
 TRACEGEN_SCRIPT_PATH = Path() / "scripts" / "tracegen.py"
 FACADE_MOCKS_PATH = "/var/lib/juju/agents/unit-facade-0/charm/mocks"
@@ -31,7 +31,6 @@ async def test_deploy_tempo(ops_test: OpsTest):
         apps=[APP_NAME],
         # coordinator will be blocked on s3 and workers integration
         status="blocked",
-        raise_on_blocked=False,
         timeout=10000,
         raise_on_error=False,
     )
@@ -45,7 +44,6 @@ async def test_scale_tempo_up_without_s3_blocks(ops_test: OpsTest):
     await ops_test.model.wait_for_idle(
         apps=[APP_NAME],
         status="blocked",
-        raise_on_blocked=False,
         timeout=1000,
     )
 
@@ -74,16 +72,6 @@ def present_facade(
     _model = f" --model {model}" if model else ""
 
     run(shlex.split(f"juju run {app}/0{_model} update --params {fpath.absolute()}"))
-    # facade charm edge rev9 copies data into 'mocks/provide' not 'mocks/require'
-    # workaround to mv the copied file to the correct path inside 'require' directory
-    # until charm-relation-interfaces/pull/152 is merged.
-    if role == "require":
-        run(
-            shlex.split(
-                f"juju exec{_model} --unit {app}/0 mv {FACADE_MOCKS_PATH}/provide/require-{interface}.yaml {FACADE_MOCKS_PATH}/require/"
-            )
-        )
-    run(shlex.split(f"juju run {app}/0{_model} update --params {fpath.absolute()}"))
 
 
 @pytest.mark.setup
@@ -142,7 +130,6 @@ async def test_tempo_blocks_if_s3_goes_away(ops_test: OpsTest):
     await app.destroy(destroy_storage=True)
     await ops_test.model.wait_for_idle(
         apps=[APP_NAME],
-        raise_on_blocked=False,
         status="blocked",
         timeout=1000,
     )
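
The dropped raise_on_blocked=False arguments were no-ops: False is already the default in wait_for_idle, and these calls wait for "blocked" as the target status (my reading of libjuju's Model.wait_for_idle; the diff itself does not say why). The minimal form after this change:

    async def wait_until_blocked(ops_test: OpsTest):
        # "blocked" is the awaited status here, so raise_on_blocked stays
        # at its False default rather than being passed explicitly.
        await ops_test.model.wait_for_idle(apps=[APP_NAME], status="blocked", timeout=1000)
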
1 change: 1 addition & 0 deletions tests/integration/test_tls.py
@@ -1,3 +1,4 @@
+# TODO: uncomment and fix when the fully functional tempo cluster is ready (e.g: traces get ingested, can query for traces)
 # import asyncio
 # import json
 # import logging