Skip to content

Commit

Permalink
Graph filtering by point in time (#100)
Browse files Browse the repository at this point in the history
* Fix image name typo in contribution guide

Signed-off-by: Bartosz Zurkowski <[email protected]>

* Add graph filtering by point in time

Signed-off-by: Bartosz Zurkowski <[email protected]>
  • Loading branch information
bzurkowski authored Aug 24, 2020
1 parent 4cd7744 commit 882cb93
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 32 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ $ telepresence \
-v=/tmp/telepresence/var/run/secrets:/var/run/secrets \
-v=/tmp/telepresence/etc/orca:/etc/orca \
-v $(pwd):/app \
openrca/rca
openrca/orca
```

The command above swaps the deployment specified by `--namespace` and `--swap-deployment` flags with
Expand Down
3 changes: 2 additions & 1 deletion orca/api/resources/v1/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ def __init__(self, api, graph):
self._graph = graph

def get(self):
return marshal(self._graph.get_nodes(kind='alert'), alerts_fields)
properties = {'kind': 'alert'}
return marshal(self._graph.get_nodes(properties=properties), alerts_fields)


def initialize(graph):
Expand Down
10 changes: 7 additions & 3 deletions orca/api/resources/v1/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from flask_restx import Model, Namespace, Resource, fields, marshal
from flask_restx import Model, Namespace, Resource, fields, marshal, reqparse


node_fields = Model('Graph Node', {
Expand All @@ -38,6 +38,9 @@
fields.Nested(link_fields), attribute='links')
})

query_parser = reqparse.RequestParser()
query_parser.add_argument('time_point', type=int)


class Graph(Resource):

Expand All @@ -46,9 +49,10 @@ def __init__(self, api, graph):
self._graph = graph

def get(self):
args = query_parser.parse_args()
data = {
'nodes': self._graph.get_nodes(),
'links': self._graph.get_links()
'nodes': self._graph.get_nodes(time_point=args['time_point']),
'links': self._graph.get_links(time_point=args['time_point'])
}
return marshal(data, graph_fields)

Expand Down
32 changes: 22 additions & 10 deletions orca/graph/drivers/arangodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,12 @@ def setup(self):
links_col = graph.edge_collection('links')
links_col.add_hash_index(fields=['id'], unique=False)

def get_nodes(self, **query):
def get_nodes(self, time_point, properties):
query_pattern = (
'FOR node in nodes '
'FILTER node.deleted_at == null '
'%(filters)s '
'RETURN node')
filters = self._build_filters(query, handle='node')
filters = self._build_filters(time_point, properties, handle='node')
documents = self._execute_aql(query_pattern, filters=filters)
return [self._build_node_obj(document) for document in documents]

Expand Down Expand Up @@ -122,15 +121,14 @@ def delete_node(self, node):
'REMOVE node IN nodes')
self._execute_aql(query_pattern, node_id=node.id)

def get_links(self, **query):
def get_links(self, time_point, properties):
query_pattern = (
'FOR link in links '
'FILTER link.deleted_at == null '
'%(filters)s '
'LET source = DOCUMENT(link._from) '
'LET target = DOCUMENT(link._to)'
'RETURN {link, source, target}')
filters = self._build_filters(query, handle='link')
filters = self._build_filters(time_point, properties, handle='link')
documents = self._execute_aql(query_pattern, filters=filters)
links = []
for document in documents:
Expand Down Expand Up @@ -200,7 +198,7 @@ def get_node_links(self, node, **query):
'FILTER link.deleted_at == null '
"%(filters)s "
'RETURN {link, source, target}')
filters = self._build_filters(query, handle='target')
filters = self._build_property_filters(query, handle='target')
documents = self._execute_aql(
query_pattern, source_id=node.id, filters=filters)
links = []
Expand All @@ -223,13 +221,27 @@ def _use_database(self, database):
def _use_graph(self, graph):
return self._database.graph(graph)

def _build_filters(self, query, handle):
flatten_query = utils.flatten_dict(query, sep='.')
def _build_filters(self, time_point, properties, handle):
filters = []
filters.append(self._build_property_filters(properties, handle=handle))
if time_point:
filters.append(self._build_time_filter(time_point, handle=handle))
return ' '.join(filters)

def _build_property_filters(self, properties, handle):
flatten_properties = utils.flatten_dict(properties, sep='.')
filters = []
for key, value in flatten_query.items():
for key, value in flatten_properties.items():
filters.append('FILTER %s.%s == "%s"' % (handle, key, value))
return ' '.join(filters)

def _build_time_filter(self, time_point, handle):
filters = []
filters.append('FILTER %s.created_at <= %i' % (handle, time_point))
filters.append(
'FILTER %s.deleted_at == null OR %s.deleted_at > %i' % (handle, handle, time_point))
return ' '.join(filters)

def _execute_aql(self, query_pattern, **params):
query = query_pattern % params
cursor = self._database.aql.execute(query)
Expand Down
8 changes: 4 additions & 4 deletions orca/graph/drivers/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ def setup(self):
"""Initializes graph database."""

@abc.abstractmethod
def get_nodes(self, kind, properties):
def get_nodes(self, time_point, properties):
"""Gets all graph nodes."""

@abc.abstractmethod
def get_node(self, id, kind, properties):
def get_node(self, id):
"""Gets graph node details."""

@abc.abstractmethod
Expand All @@ -44,11 +44,11 @@ def delete_node(self, node):
"""Deletes a graph node."""

@abc.abstractmethod
def get_links(self, properties):
def get_links(self, time_point, properties):
"""Gets all graph links."""

@abc.abstractmethod
def get_link(self, id, properties):
def get_link(self, id):
"""Gets graph link details."""

@abc.abstractmethod
Expand Down
16 changes: 12 additions & 4 deletions orca/graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ def setup(self):
LOG.info("Initializing graph database")
self._driver.setup()

def get_nodes(self, **query):
return self._driver.get_nodes(**query)
def get_nodes(self, time_point=None, properties=None):
if not time_point:
time_point = utils.get_utc()
if not properties:
properties = {}
return self._driver.get_nodes(time_point, properties)

def get_node(self, node_id):
return self._driver.get_node(node_id)
Expand Down Expand Up @@ -123,8 +127,12 @@ def delete_node(self, node_id):
self._driver.update_node(node)
self._notify_listeners(GraphEvent.NODE_DELETED, node)

def get_links(self, **query):
return self._driver.get_links(**query)
def get_links(self, time_point=None, properties=None):
if not time_point:
time_point = utils.get_utc()
if not properties:
properties = {}
return self._driver.get_links(time_point, properties)

def get_link(self, link_id):
return self._driver.get_link(link_id)
Expand Down
8 changes: 4 additions & 4 deletions orca/topology/alerts/linker.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ def _get_current_links(self, node):

def _get_target_nodes(self, alert_node):
source_mapping = alert_node.properties.source_mapping
return self._graph.get_nodes(
origin=source_mapping.origin,
kind=source_mapping.kind,
properties=source_mapping.properties)
properties = {'origin': source_mapping.origin,
'kind': source_mapping.kind,
'properties': source_mapping.properties}
return self._graph.get_nodes(properties=properties)


class AlertLinker(Linker):
Expand Down
4 changes: 2 additions & 2 deletions orca/topology/infra/kiali/probe.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ def _synchronize_links(self, edges, service_mapping):
self._link_services(source_node, target_node, properties)

def _get_service(self, mapping):
matches = self._graph.get_nodes(
origin='kubernetes', kind='service', properties=mapping)
properties = {'origin': 'kubernetes', 'kind': 'service', 'properties': mapping}
matches = self._graph.get_nodes(properties=properties)
if matches:
return matches[0]

Expand Down
3 changes: 2 additions & 1 deletion orca/topology/linker.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ def _get_source_nodes(self, target_node):
return self._get_nodes_by_spec(self.source_spec)

def _get_nodes_by_spec(self, node_spec):
return self._graph.get_nodes(origin=node_spec.origin, kind=node_spec.kind)
properties = {'origin': node_spec.origin, 'kind': node_spec.kind}
return self._graph.get_nodes(properties=properties)

def _build_link_lookup(self, links):
return {link.id: link for link in links}
4 changes: 2 additions & 2 deletions orca/topology/probe.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ def _synchronize(self):
self._synchronizer.synchronize(nodes_in_graph, upstream_nodes)

def _get_nodes_in_graph(self):
return self._graph.get_nodes(
origin=self._extractor.origin, kind=self._extractor.kind)
properties = {'origin': self._extractor.origin, 'kind': self._extractor.kind}
return self._graph.get_nodes(properties=properties)

def _get_upstream_nodes(self):
entities = self._upstream_proxy.get_all()
Expand Down

0 comments on commit 882cb93

Please sign in to comment.