Skip to content

Commit

Permalink
Merge pull request #169 from authorizon/feature/oded/add-statistics-t…
Browse files Browse the repository at this point in the history
…o-opal-server

Feature/oded/add statistics to opal server
  • Loading branch information
Asaf Cohen authored Nov 7, 2021
2 parents c72fa94 + 9bc74aa commit 829d948
Show file tree
Hide file tree
Showing 12 changed files with 381 additions and 12 deletions.
64 changes: 64 additions & 0 deletions docker/docker-compose-with-statistics.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
---
# Example deployment of OPAL with statistics enabled: one Postgres container
# (broadcast backbone), one opal-server and one opal-client. Both the server
# and the client set OPAL_STATISTICS_ENABLED=true.
version: "3.8"
services:
  # When scaling the opal-server to multiple nodes and/or multiple workers, we
  # use a *broadcast* channel to sync between all the instances of opal-server.
  # Under the hood, this channel is implemented by encode/broadcaster (see link
  # below). At the moment, the broadcast channel can be either: postgresdb,
  # redis or kafka. The format of the broadcaster URI string (the one we pass
  # to opal server as `OPAL_BROADCAST_URI`) is specified here:
  # https://github.com/encode/broadcaster#available-backends
  broadcast_channel:
    image: postgres:alpine
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  opal_server:
    # this example runs opal-server from the `next` tag of the official image
    image: authorizon/opal-server:next
    environment:
      # the broadcast backbone uri used by opal server workers
      # (see comments above for: broadcast_channel)
      - OPAL_BROADCAST_URI=postgres://postgres:postgres@broadcast_channel:5432/postgres
      # number of uvicorn workers to run inside the opal-server container
      - UVICORN_NUM_WORKERS=2
      # the git repo hosting our policy
      # - if this repo is not public, you can pass an ssh key via
      #   `OPAL_POLICY_REPO_SSH_KEY`
      # - the repo we pass in this example is *public* and acts as an example
      #   repo with dummy rego policy
      # - for more info, see:
      #   https://github.com/authorizon/opal/blob/master/docs/HOWTO/track_a_git_repo.md
      - OPAL_POLICY_REPO_URL=https://github.com/authorizon/opal-example-policy-repo
      # in this example we use a polling interval of 30 seconds to check for
      # new policy updates (git commits affecting the rego policy). however, it
      # is better to utilize a git *webhook* to trigger the server to check for
      # changes only when the repo has new commits. for more info see:
      # https://github.com/authorizon/opal/blob/master/docs/HOWTO/track_a_git_repo.md
      - OPAL_POLICY_REPO_POLLING_INTERVAL=30
      # configures from where the opal client should initially fetch data
      # (when it first goes up, after disconnection, etc). the data sources
      # represent from where the opal clients should get a "complete picture"
      # of the data they need. after the initial sources are fetched, the
      # client will subscribe only to update notifications sent by the server.
      #
      # the url uses the compose service name (`opal_server`) so the opal
      # client container reaches the server over the compose network.
      # `host.docker.internal` is not resolvable inside containers on Linux
      # Docker Engine unless an explicit `host-gateway` mapping is added.
      - 'OPAL_DATA_CONFIG_SOURCES={"config":{"entries":[{"url":"http://opal_server:7002/policy-data","topics":["policy_data"]}]}}'
      - OPAL_LOG_FORMAT_INCLUDE_PID=true
      # turning on statistics collection on the server side
      - OPAL_STATISTICS_ENABLED=true
    ports:
      # exposes opal server on the host machine, you can access the server at:
      # http://localhost:7002
      - "7002:7002"
    depends_on:
      - broadcast_channel
  opal_client:
    # this example runs opal-client from the `next` tag of the official image
    image: authorizon/opal-client:next
    environment:
      - OPAL_SERVER_URL=http://opal_server:7002
      - OPAL_LOG_FORMAT_INCLUDE_PID=true
      - OPAL_INLINE_OPA_LOG_FORMAT=http
      # turning on statistics reporting on the client side
      - OPAL_STATISTICS_ENABLED=true
    ports:
      # exposes opal client on the host machine, you can access the client at:
      # http://localhost:7000
      - "7000:7000"
      # exposes the OPA agent (being run by OPAL) on the host machine
      # you can access the OPA api that you know and love at:
      # http://localhost:8181
      # OPA api docs are at:
      # https://www.openpolicyagent.org/docs/latest/rest-api/
      - "8181:8181"
    depends_on:
      - opal_server
    # this command is not necessary when deploying OPAL for real, it is simply
    # a trick for dev environments to make sure that opal-server is already up
    # before starting the client.
    command: sh -c "/usr/wait-for.sh opal_server:7002 --timeout=20 -- /start.sh"
16 changes: 9 additions & 7 deletions opal_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import signal
import asyncio
import uuid
import aiohttp
import functools
from typing import List, Optional
Expand Down Expand Up @@ -52,14 +53,15 @@ def __init__(
policy_updater (PolicyUpdater, optional): Defaults to None.
"""
# defaults
policy_store_type:PolicyStoreTypes=policy_store_type or opal_client_config.POLICY_STORE_TYPE
inline_opa_enabled:bool=inline_opa_enabled or opal_client_config.INLINE_OPA_ENABLED
inline_opa_options:OpaServerOptions=inline_opa_options or opal_client_config.INLINE_OPA_CONFIG
policy_store_type: PolicyStoreTypes = policy_store_type or opal_client_config.POLICY_STORE_TYPE
inline_opa_enabled: bool = inline_opa_enabled or opal_client_config.INLINE_OPA_ENABLED
inline_opa_options: OpaServerOptions = inline_opa_options or opal_client_config.INLINE_OPA_CONFIG
opal_client_identifier: str = opal_client_config.OPAL_CLIENT_STAT_ID or f"CLIENT_{uuid.uuid4().hex}"
# set logs
configure_logs()
# Init policy store client
self.policy_store_type:PolicyStoreTypes = policy_store_type
self.policy_store:BasePolicyStoreClient = policy_store or PolicyStoreClientFactory.create(policy_store_type)
self.policy_store_type: PolicyStoreTypes = policy_store_type
self.policy_store: BasePolicyStoreClient = policy_store or PolicyStoreClientFactory.create(policy_store_type)
# data fetcher
self.data_fetcher = DataFetcher()
# callbacks register
Expand All @@ -74,14 +76,14 @@ def __init__(
if policy_updater is not None:
self.policy_updater = policy_updater
else:
self.policy_updater = PolicyUpdater(policy_store=self.policy_store, data_fetcher=self.data_fetcher, callbacks_register=self._callbacks_register)
self.policy_updater = PolicyUpdater(policy_store=self.policy_store, data_fetcher=self.data_fetcher, callbacks_register=self._callbacks_register, opal_client_id=opal_client_identifier)
# Data updating service
if opal_client_config.DATA_UPDATER_ENABLED:
if data_updater is not None:
self.data_updater = data_updater
else:
data_topics = data_topics if data_topics is not None else opal_client_config.DATA_TOPICS
self.data_updater = DataUpdater(policy_store=self.policy_store, data_topics=data_topics, data_fetcher=self.data_fetcher, callbacks_register=self._callbacks_register)
self.data_updater = DataUpdater(policy_store=self.policy_store, data_topics=data_topics, data_fetcher=self.data_fetcher, callbacks_register=self._callbacks_register, opal_client_id=opal_client_identifier)
else:
self.data_updater = None

Expand Down
5 changes: 5 additions & 0 deletions opal_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ def load_policy_store():
"system/opal/transactions",
description="Path to OPA document that stores the OPA write transactions")

OPAL_CLIENT_STAT_ID = confi.str(
"OPAL_CLIENT_STAT_ID",
None,
description="Unique client statistics identifier")

OPA_HEALTH_CHECK_POLICY_PATH = 'opa/healthcheck/opal.rego'

def on_load(self):
Expand Down
10 changes: 9 additions & 1 deletion opal_client/data/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from fastapi_websocket_pubsub import PubSubClient
from fastapi_websocket_rpc.rpc_channel import RpcChannel

from opal_common.config import opal_common_config
from opal_client.config import opal_client_config
from opal_client.data.fetcher import DataFetcher
from opal_client.data.rpc import TenantAwareRpcEventClientMethods
Expand Down Expand Up @@ -40,7 +41,8 @@ def __init__(self, token: str = None,
policy_store: BasePolicyStoreClient = None,
should_send_reports=None,
data_fetcher: Optional[DataFetcher] = None,
callbacks_register: Optional[CallbacksRegister] = None
callbacks_register: Optional[CallbacksRegister] = None,
opal_client_id: str = None
):
"""
Keeps policy-stores (e.g. OPA) up to date with relevant data
Expand Down Expand Up @@ -77,6 +79,7 @@ def __init__(self, token: str = None,
self._token = token
self._server_url = pubsub_url
self._data_sources_config_url = data_sources_config_url
self._opal_client_id = opal_client_id
if self._token is None:
self._extra_headers = None
else:
Expand Down Expand Up @@ -167,6 +170,11 @@ async def on_connect(self, client: PubSubClient, channel: RpcChannel):
logger.info("Connected to server")
if self._fetch_on_connect:
await self.get_base_policy_data()
if opal_common_config.STATISTICS_ENABLED:
await self._client.wait_until_ready()
# notify the server's statistics channel about this new client connection (only when STATISTICS_ENABLED is True; it defaults to False)
await self._client.publish([opal_common_config.STATISTICS_ADD_CLIENT_CHANNEL], data={'topics': self._data_topics, 'client_id': self._opal_client_id, 'rpc_id': channel.id})


async def on_disconnect(self, channel: RpcChannel):
logger.info("Disconnected from server")
Expand Down
8 changes: 7 additions & 1 deletion opal_client/policy/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from fastapi_websocket_pubsub import PubSubClient
from opal_common.schemas.store import TransactionType

from opal_common.config import opal_common_config
from opal_common.utils import get_authorization_header
from opal_common.schemas.policy import PolicyBundle
from opal_common.topics.utils import (
Expand Down Expand Up @@ -39,7 +40,7 @@ def __init__(
policy_store: BasePolicyStoreClient = None,
data_fetcher: Optional[DataFetcher] = None,
callbacks_register: Optional[CallbacksRegister] = None,
backend_url: Optional[str] = None
opal_client_id: str = None
):
"""inits the policy updater.
Expand All @@ -56,6 +57,7 @@ def __init__(
token: str = token or opal_client_config.CLIENT_TOKEN
pubsub_url: str = pubsub_url or opal_client_config.SERVER_PUBSUB_URL
subscription_directories: List[str] = subscription_directories or opal_client_config.POLICY_SUBSCRIPTION_DIRS
self._opal_client_id = opal_client_id

# The policy store we'll save policy modules into (i.e: OPA)
self._policy_store = policy_store or DEFAULT_POLICY_STORE_GETTER()
Expand Down Expand Up @@ -119,6 +121,10 @@ async def _on_connect(self, client: PubSubClient, channel: RpcChannel):
"""
logger.info("Connected to server")
await self.update_policy()
if opal_common_config.STATISTICS_ENABLED:
await self._client.wait_until_ready()
# notify the server's statistics channel about this new client connection (only when STATISTICS_ENABLED is True; it defaults to False)
await self._client.publish([opal_common_config.STATISTICS_ADD_CLIENT_CHANNEL], data={'topics': self._topics, 'client_id': self._opal_client_id, 'rpc_id': channel.id})

async def _on_disconnect(self, channel: RpcChannel):
"""
Expand Down
5 changes: 5 additions & 0 deletions opal_common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class OpalCommonConfig(Confi):
LOG_FILE_SERIALIZE = confi.str("LOG_FILE_SERIALIZE", True)
LOG_FILE_LEVEL = confi.str("LOG_FILE_LEVEL", "INFO")

STATISTICS_ENABLED = confi.bool("STATISTICS_ENABLED", False, description="Set if OPAL server will collect statistics about OPAL clients may cause a small performance hit")
STATISTICS_ADD_CLIENT_CHANNEL = confi.str("STATISTICS_ADD_CLIENT_CHANNEL", "__opal_stats_add", description="The topic to update about new OPAL clients connection")
STATISTICS_REMOVE_CLIENT_CHANNEL = confi.str("STATISTICS_REMOVE_CLIENT_CHANNEL", "__opal_stats_rm", description="The topic to update about OPAL clients disconnection")


# Fetching Providers
# - where to load providers from
FETCH_PROVIDER_MODULES = confi.list("FETCH_PROVIDER_MODULES", ["opal_common.fetcher.providers"])
Expand Down
2 changes: 1 addition & 1 deletion opal_common/git/tar_file_to_local_git_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from opal_server.config import opal_server_config


class TarFileToLocalGitExtractor():
class TarFileToLocalGitExtractor:
"""
This class takes tar file from remote api source and extract it to local
git, so we could manage update to opal clients
Expand Down
5 changes: 5 additions & 0 deletions opal_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ class OpalServerConfig(Confi):
BROADCAST_KEEPALIVE_INTERVAL = confi.int("BROADCAST_KEEPALIVE_INTERVAL", 3600, description="the time to wait between sending two consecutive broadcaster keepalive messages")
BROADCAST_KEEPALIVE_TOPIC = confi.str("BROADCAST_KEEPALIVE_TOPIC", "__broadcast_session_keepalive__", description="the topic on which we should send broadcaster keepalive messages")

# statistics
MAX_CHANNELS_PER_CLIENT = confi.int("MAX_CHANNELS_PER_CLIENT", 15, description="max number of records per client, after this number it will not be added to statistics, relevant only if STATISTICS_ENABLED")
STATISTICS_WAKEUP_CHANNEL = confi.str("STATISTICS_WAKEUP_CHANNEL", "__opal_stats_wakeup", description="The topic a waking-up OPAL server uses to notify others he needs their statistics data")
STATISTICS_STATE_SYNC_CHANNEL = confi.str("STATISTICS_STATE_SYNC_CHANNEL", "__opal_stats_state_sync", description="The topic other servers with statistics provide their state to a waking-up server")

# Data updates
ALL_DATA_TOPIC = confi.str("ALL_DATA_TOPIC", "policy_data", description="Top level topic for data")
ALL_DATA_ROUTE = confi.str("ALL_DATA_ROUTE", "/policy-data")
Expand Down
3 changes: 2 additions & 1 deletion opal_server/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from fastapi_websocket_pubsub import PubSubEndpoint
from opal_common.confi.confi import load_conf_if_none

from opal_common.config import opal_common_config
from opal_common.logger import logger
from opal_common.authentication.signer import JWTSigner
from opal_common.authentication.deps import WebsocketJWTAuthenticator
Expand All @@ -21,7 +22,7 @@ def __init__(self, signer: JWTSigner, broadcaster_uri:str=None):
"""
broadcaster_uri = load_conf_if_none(broadcaster_uri, opal_server_config.BROADCAST_URI)
self.router = APIRouter()
self.endpoint = PubSubEndpoint(broadcaster=broadcaster_uri)
self.endpoint = PubSubEndpoint(broadcaster=broadcaster_uri, rpc_channel_get_remote_id=opal_common_config.STATISTICS_ENABLED)
authenticator = WebsocketJWTAuthenticator(signer)

@self.router.websocket("/ws")
Expand Down
26 changes: 26 additions & 0 deletions opal_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Optional, List

from fastapi import Depends, FastAPI
from fastapi_websocket_pubsub.event_broadcaster import EventBroadcasterContextManager
from opal_common.confi.confi import load_conf_if_none
from opal_common.git.repo_cloner import RepoClonePathFinder

Expand All @@ -28,6 +29,7 @@
trigger_repo_watcher_pull)
from opal_server.policy.watcher.task import PolicyWatcherTask
from opal_server.pubsub import PubSub
from opal_server.statistics import OpalStatistics, init_statistics_router


class OpalServer:
Expand Down Expand Up @@ -126,6 +128,18 @@ def __init__(
topic=opal_server_config.BROADCAST_KEEPALIVE_TOPIC
)

if opal_common_config.STATISTICS_ENABLED:
self.opal_statistics = OpalStatistics(self.pubsub.endpoint)
else:
self.opal_statistics = None

# if stats are enabled, the server workers must be listening on the broadcast
# channel for their own synchronization, not just for their clients. therefore
# we need a "global" listening context
self.broadcast_listening_context: Optional[EventBroadcasterContextManager] = None
if self.broadcaster_uri is not None and opal_common_config.STATISTICS_ENABLED:
self.broadcast_listening_context = self.pubsub.endpoint.broadcaster.get_listening_context()

self.watcher: Optional[PolicyWatcherTask] = None

# init fastapi app
Expand Down Expand Up @@ -167,13 +181,15 @@ def _configure_api_routes(self, app: FastAPI):
)
webhook_router = init_git_webhook_router(self.pubsub.endpoint, authenticator)
security_router = init_security_router(self.signer, StaticBearerAuthenticator(self.master_token))
statistics_router = init_statistics_router(self.opal_statistics)

# mount the api routes on the app object
app.include_router(bundles_router, tags=["Bundle Server"], dependencies=[Depends(authenticator)])
app.include_router(data_updates_router, tags=["Data Updates"], dependencies=[Depends(authenticator)])
app.include_router(webhook_router, tags=["Github Webhook"])
app.include_router(security_router, tags=["Security"])
app.include_router(self.pubsub.router, tags=["Pub/Sub"])
app.include_router(statistics_router, tags=['Server Statistics'], dependencies=[Depends(authenticator)])

if self.jwks_endpoint is not None:
# mount jwts (static) route
Expand Down Expand Up @@ -218,6 +234,12 @@ async def start_server_background_tasks(self):
"""
if self.publisher is not None:
async with self.publisher:
if self.opal_statistics is not None:
if self.broadcast_listening_context is not None:
logger.info("listening on broadcast channel for statistics events...")
await self.broadcast_listening_context.__aenter__()
asyncio.create_task(self.opal_statistics.run())
self.pubsub.endpoint.notifier.register_unsubscribe_event(self.opal_statistics.remove_client)
if self._init_policy_watcher:
# repo watcher is enabled, but we want only one worker to run it
# (otherwise for each new commit, we will publish multiple updates via pub/sub).
Expand All @@ -242,6 +264,10 @@ async def start_server_background_tasks(self):
if self.broadcast_keepalive is not None:
self.broadcast_keepalive.start()
await self.watcher.wait_until_should_stop()
if self.opal_statistics is not None and self.broadcast_listening_context is not None:
await self.broadcast_listening_context.__aexit__()
logger.info("stopped listening for statistics events on the broadcast channel")


async def stop_server_background_tasks(self):
logger.info("stopping background tasks...")
Expand Down
Loading

0 comments on commit 829d948

Please sign in to comment.