Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #37 - Correct overwriting of line number data. #39

Merged
merged 1 commit into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion atom_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
A cli, classes and functions for converting an atom slice to a different format
"""
__version__ = '0.4.3'
__version__ = '0.4.4'
232 changes: 162 additions & 70 deletions atom_tools/lib/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,36 @@ def __init__(
self.regex_param_count = 0
self.target_line_nums: Dict[str, Dict] = {}

def convert_usages(self) -> Dict[str, Any]:
def convert_usages(self) -> Dict[str, Dict]:
"""
Converts usages to OpenAPI.
"""
methods = self._process_methods()
methods = self.methods_to_endpoints(methods)
self.create_file_to_method_dict(methods)
self._identify_target_line_nums(methods)
self.target_line_nums = self._identify_target_line_nums(methods)
self.file_endpoint_map = self.create_file_to_method_dict(methods)
methods = self._process_calls(methods)
return self.populate_endpoints(methods)

def create_file_to_method_dict(self, method_map):
def create_file_to_method_dict(self, method_map: Dict[str, Any]) -> Dict[str, List]:
"""
Creates a dictionary of endpoints and methods.
"""
file_names = list(method_map.get('file_names').keys())
file_endpoint_map = {i: [] for i in file_names}
if not method_map:
return {}
file_names = list(method_map.get('file_names', {}).keys())
file_endpoint_map: Dict = {i: [] for i in file_names}
for full_name in file_names:
for values in method_map['file_names'][full_name]['resolved_methods'].values():
file_endpoint_map[full_name].extend(values.get('endpoints'))
for k, v in file_endpoint_map.items():
# filename = k.split(':')[0]
endpoints = set(v)
for i in endpoints:
if self.file_endpoint_map.get(i):
self.file_endpoint_map[i].add(k)
else:
self.file_endpoint_map[i] = {k}
self.file_endpoint_map = {k: list(v) for k, v in self.file_endpoint_map.items()}
return {k: list(v) for k, v in self.file_endpoint_map.items()}

def create_paths_item(self, filename: str, paths_dict: Dict) -> Dict:
"""
Expand All @@ -98,11 +99,11 @@ def create_paths_item(self, filename: str, paths_dict: Dict) -> Dict:
calls, ep, filename, call_line_numbers, target_line_number
)
if paths_object.get(ep):
paths_object[ep] |= paths_item_object
paths_object[ep] = merge_path_objects(paths_object[ep], paths_item_object)
else:
paths_object |= {ep: paths_item_object}

return _remove_nested_parameters(paths_object)
return remove_nested_parameters(paths_object)

def endpoints_to_openapi(self, server: str = '') -> Any:
"""
Expand Down Expand Up @@ -133,7 +134,6 @@ def methods_to_endpoints(self, method_map: Dict[str, Any]) -> Dict[str, Any]:
for file_name, resolved_methods in method_map.items():
if new_resolved := self._process_resolved_methods(resolved_methods):
new_method_map['file_names'][file_name] = {'resolved_methods': new_resolved}

return new_method_map

def populate_endpoints(self, method_map: Dict) -> Dict[str, Any]:
Expand All @@ -155,7 +155,6 @@ def populate_endpoints(self, method_map: Dict) -> Dict[str, Any]:
paths_object = merge_path_objects(paths_object, new_path_item)
else:
paths_object = new_path_item

return paths_object

def _calls_to_params(self, ep: str, orig_ep: str, call: Dict | None) -> Dict[str, Any]:
Expand Down Expand Up @@ -308,24 +307,24 @@ def _generic_params_helper(self, endpoint: str, orig_endpoint: str) -> List[Dict
)
return params

def _identify_target_line_nums(self, methods):
def _identify_target_line_nums(self, methods: Dict[str, Any]) -> Dict:
file_names = list(methods['file_names'].keys())
if not file_names:
return
conditional = [f'fileName==`{json.dumps(i)}`' for i in file_names]
conditional = '*[?' + ' || '.join(conditional) + (
return {}
conditional = [f'fileName==`{i}`' for i in file_names]
conditional = '*[?' + ' || '.join(conditional) + ( # type: ignore
'][].{file_name: fileName, methods: usages[].targetObj[].{resolved_method: '
'resolvedMethod || callName || code || name, line_number: lineNumber}}')
pattern = jmespath.compile(conditional)
pattern = jmespath.compile(conditional) # type: ignore
result = pattern.search(self.usages.content)
result = {i['file_name']: i['methods'] for i in result if i['methods']}
targets = {i: {} for i in result}
targets: Dict = {i: {} for i in result}

for k, v in result.items():
for i in v:
targets[k] |= {i['resolved_method']: i['line_number']}
targets[k] = merge_targets(targets[k], {i['resolved_method']: i['line_number']})

self.target_line_nums = targets
return targets

def _paths_object_helper(
self,
Expand All @@ -351,11 +350,13 @@ def _paths_object_helper(
if calls:
for call in calls:
paths_item_object |= self._calls_to_params(ep, orig_ep, call)
if (call_line_numbers or line_number) and (line_nos := _create_ln_entries(
if (call_line_numbers or line_number) and (line_nos := create_ln_entries(
filename, list(set(call_line_numbers)), line_number)):
paths_item_object |= line_nos
# if line_number:
# paths_item_object['x-atom-usages-target'] = {filename: line_number}
if 'x-atom-usages' in paths_item_object:
paths_item_object['x-atom-usages'] = merge_x_atom(
paths_item_object['x-atom-usages'], line_nos)
else:
paths_item_object |= line_nos
return ep, paths_item_object

def _parse_path_regexes(self, endpoint: str) -> str:
Expand Down Expand Up @@ -535,28 +536,51 @@ def _query_calls_helper(self, file_name: str) -> List[Dict]:
Returns:
list: The result of searching for the calls pattern in the usages.
"""
pattern = f'objectSlices[?fileName==`{json.dumps(file_name)}`].usages[].*[?callName][][]'
pattern = (f'objectSlices[?fileName==`{json.dumps(file_name.encode().decode())}`].usages[]'
f'.*[?callName][][]')
compiled_pattern = jmespath.compile(pattern)
return compiled_pattern.search(self.usages.content)


def merge_path_objects(p1: Dict, p2: Dict) -> Dict:
def create_ln_entries(filename: str, call_line_numbers: List, line_number: int | None) -> Dict:
"""
Merge two dictionaries representing path objects.
Creates line number entries for a given filename and line numbers.

Args:
p1 (dict): The first dictionary representing a path object.
p2 (dict): The second dictionary representing a path object.
filename (str): The name of the file.
call_line_numbers (list): A list of call line numbers.
line_number (int): Target line number.

Returns:
dict: The merged dictionary representing the path object.
dict: A dictionary containing line number entries.
"""
for key, value in p2.items():
if p1.get(key):
p1[key].update(value)
else:
p1[key] = value
return p1
fn = filename.split(':')[0]
x_atom: Dict = {'x-atom-usages': {}}
if call_line_numbers:
x_atom['x-atom-usages']['call'] = {fn: call_line_numbers}
if line_number:
x_atom['x-atom-usages']['target'] = {fn: line_number}
return x_atom


def determine_operations(call: Dict, params: List) -> Dict[str, Any]:
"""
Determine the supported operations based on the call and parameters.

Args:
call (dict): The call information.
params (list): The parameters for the call.

Returns:
dict: A dictionary containing the supported operations and their
parameters and responses.
"""
ops = {'get', 'put', 'post', 'delete', 'options', 'head', 'patch'}
if found := [op for op in ops if op in call.get('resolvedMethod', '').lower()]:
if params:
return {op: {'parameters': params, 'responses': {}} for op in found}
return {op: {'responses': {}} for op in found}
return {'parameters': params} if params else {}


def filter_calls(
Expand Down Expand Up @@ -584,59 +608,127 @@ def filter_calls(
return resolved_methods


def determine_operations(call: Dict, params: List) -> Dict[str, Any]:
def merge_operations(op1: Dict, op2: Dict) -> Dict:
"""
Determine the supported operations based on the call and parameters.
Merge two dictionaries of operations.

Args:
call (dict): The call information.
params (list): The parameters for the call.
op1 (dict): The first dictionary of operations.
op2 (dict): The second dictionary of operations.

Returns:
dict: A dictionary containing the supported operations and their
parameters and responses.
dict: The merged dictionary of operations.
"""
ops = {'get', 'put', 'post', 'delete', 'options', 'head', 'patch'}
if found := [op for op in ops if op in call.get('resolvedMethod', '').lower()]:
if params:
return {op: {'parameters': params, 'responses': {}} for op in found}
return {op: {'responses': {}} for op in found}
return {'parameters': params} if params else {}
for k, v in op2.items():
if v and not op1.get(k) or op1[k] == {}:
op1[k] = v
elif k == 'parameters' and v:
op1[k] = merge_params(op1[k], v)
return op1


def _remove_nested_parameters(data: Dict) -> Dict[str, Dict | List]:
def merge_params(p1: List, p2: List) -> List:
"""
Removes nested path parameters from the given data.
Merge two lists of parameters.

Args:
data (dict): The data containing nested path parameters.
p1 (list): The first list of parameters.
p2 (list): The second list of parameters.

Returns:
dict: The modified data with the nested path parameters removed.
list: The merged list of parameters.
"""
for value in data.values():
for v in value.values():
if isinstance(v, dict) and "parameters" in v and isinstance(v["parameters"], list):
v["parameters"] = [param for param in v["parameters"] if
param.get("in") != "path"]
return data
names = [i.get('name') for i in p1]
for i in p2:
if i.get('name', '') not in names:
p1.append(i)
return p1


def _create_ln_entries(filename, call_line_numbers, line_numbers):
def merge_path_objects(p1: Dict, p2: Dict) -> Dict:
"""
Creates line number entries for a given filename and line numbers.
Merge two dictionaries representing path objects.

Args:
filename (str): The name of the file.
call_line_numbers (list): A list of line numbers.
p1 (dict): The first dictionary representing a path object.
p2 (dict): The second dictionary representing a path object.

Returns:
dict: A dictionary containing line number entries.
dict: The merged dictionary representing the path object.
"""
fn = filename.split(':')[0]
x_atom = {'x-atom-usages': {}}
if call_line_numbers:
x_atom['x-atom-usages']['call'] = {fn: call_line_numbers}
if line_numbers:
x_atom['x-atom-usages']['target'] = {fn: line_numbers}
return x_atom
for key, value in p2.items():
if key not in p1:
p1[key] = value
continue
for k, v in value.items():
if p1[key].get(k):
if k == 'x-atom-usages':
p1[key][k] = merge_x_atom(p1[key][k], v)
elif k == 'parameters':
p1[key][k] = merge_params(p1[key][k], v)
elif k in {'get', 'put', 'post', 'delete', 'options', 'head', 'patch'}:
p1[key][k] = merge_operations(p1[key][k], v)
continue
p1[key][k] = v

return p1


def merge_targets(t1: Dict, t2: Dict) -> Dict:
"""
Merge two dictionaries of targets.

Args:
t1 (dict): The first dictionary of targets.
t2 (dict): The second dictionary of targets.

Returns:
dict: The merged dictionary of targets.
"""
for k, v in t2.items():
if k in t1:
t1[k].append(v)
else:
t1[k] = [v]
return t1


def merge_x_atom(x1: Dict, x2: Dict) -> Dict:
"""
Merge two dictionaries of x-atom-usages.

Args:
x1 (dict): The first dictionary of x atoms.
x2 (dict): The second dictionary of x atoms.

Returns:
dict: The merged dictionary of x atoms.
"""
for key, value in x2.items():
if key not in x1:
x1[key] = value
continue
for k, v in value.items():
if x1[key].get(k):
x1[key][k].extend(v)
else:
x1[key][k] = v
return x1


def remove_nested_parameters(data: Dict) -> Dict[str, Dict | List]:
"""
Removes nested path parameters from the given data.

Args:
data (dict): The data containing nested path parameters.

Returns:
dict: The modified data with the nested path parameters removed.
"""
for value in data.values():
for v in value.values():
if isinstance(v, dict) and 'parameters' in v and isinstance(v['parameters'], list):
v['parameters'] = [param for param in v['parameters'] if
param.get('in') != 'path']
return data
5 changes: 2 additions & 3 deletions atom_tools/lib/regex_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
from dataclasses import dataclass
from typing import Tuple, List, Dict, Any


logger: logging.Logger = logging.getLogger(__name__)
py_type_mapping = {'int': 'integer', 'string': 'string', 'float': 'number', 'path': 'string'}


@dataclass
Expand Down Expand Up @@ -151,6 +153,3 @@ def create_tmp_regex_name(element: str, m: Tuple | str, count: int) -> Tuple[str
def fwd_slash_repl(match: re.Match) -> str:
"""For substituting forward slashes."""
return str(match['paren'].replace('/', '$L@$H'))


py_type_mapping = {'int': 'integer', 'string': 'string', 'float': 'number', 'path': 'string'}
4 changes: 3 additions & 1 deletion atom_tools/lib/slices.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ def import_slice(filename: str | Path) -> Tuple[Dict, str]:
return {}, 'unknown'
try:
with open(filename, 'r', encoding='utf-8') as f:
content = json.load(f)
raw_content = f.read()
raw_content = raw_content.replace(r'\\', '/')
content = json.loads(raw_content)
if content.get('objectSlices'):
return content, 'usages'
if content.get('reachables'):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "atom-tools"
version = "0.4.3"
version = "0.4.4"
description = "Collection of tools for use with AppThreat/atom."
authors = [
{ name = "Caroline Russell", email = "[email protected]" },
Expand Down
Loading
Loading