-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore: several refactors to fix bugs and lint errors
- Loading branch information
1 parent
4f28955
commit 602071a
Showing
9 changed files
with
134 additions
and
134 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,5 +59,5 @@ jobs: | |
ruff src --ignore=E501,I001 | ||
- name: Lint with mypy | ||
run: | | ||
mypy src | ||
mypy src --ignore-mission-imports | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
import logging | ||
from ca_pwt.helpers.dict import remove_element_from_dict, cleanup_odata_dict | ||
from ca_pwt.helpers.utils import remove_element_from_dict, cleanup_odata_dict, ensure_list | ||
from ca_pwt.helpers.graph_api import EntityAPI | ||
from ca_pwt.policies_mappings import replace_values_by_keys_in_policies | ||
from ca_pwt.groups import get_groups_by_ids | ||
|
@@ -12,20 +12,19 @@ def _get_entity_path(self) -> str: | |
return "identity/conditionalAccess/policies" | ||
|
||
|
||
def load_policies(input_file: str) -> dict: | ||
def load_policies(input_file: str) -> list[dict]: | ||
"""Loads policies from the specified file. | ||
It also cleans up the dictionary to remove unnecessary elements.""" | ||
import json | ||
|
||
with open(input_file) as f: | ||
_logger.info(f"Reading policies from file {input_file}...") | ||
|
||
policies = json.load(f) | ||
policies = cleanup_odata_dict(policies, ensure_list=True) | ||
return policies | ||
policies = cleanup_odata_dict(json.load(f)) | ||
return ensure_list(policies) | ||
|
||
|
||
def save_policies(policies: dict, output_file: str): | ||
def save_policies(policies: list[dict], output_file: str): | ||
"""Saves policies to the specified file.""" | ||
import json | ||
|
||
|
@@ -34,26 +33,26 @@ def save_policies(policies: dict, output_file: str): | |
f.write(json.dumps(policies, indent=4)) | ||
|
||
|
||
def cleanup_policies(source: dict) -> dict: | ||
def cleanup_policies(policies: list[dict]) -> list[dict]: | ||
"""Cleans up the policies dictionary for import by | ||
removing disallowed elements while importing. (e.g. id, createdDateTime, | ||
modifiedDateTime, templateId, id""" | ||
_logger.info("Cleaning up policies...") | ||
|
||
# exclude some elements, namely createdDateTime, | ||
# modifiedDateTime, id, templateId, [email protected] | ||
for policy in source: | ||
for policy in policies: | ||
remove_element_from_dict(policy, "createdDateTime") | ||
remove_element_from_dict(policy, "modifiedDateTime") | ||
remove_element_from_dict(policy, "id") | ||
remove_element_from_dict(policy, "templateId") | ||
grant_controls = policy["grantControls"] | ||
if grant_controls is not None: | ||
remove_element_from_dict(grant_controls, "[email protected]") | ||
return source | ||
return policies | ||
|
||
|
||
def export_policies(access_token: str, odata_filter: str | None = None) -> dict: | ||
def export_policies(access_token: str, odata_filter: str | None = None) -> list[dict]: | ||
"""Exports all policies with the specified filter. Filter is | ||
an OData filter string.""" | ||
policies_api = PoliciesAPI(access_token=access_token) | ||
|
@@ -62,14 +61,15 @@ def export_policies(access_token: str, odata_filter: str | None = None) -> dict: | |
policies = response.json() | ||
|
||
_logger.debug(f"Obtained policies: {policies}") | ||
policies = cleanup_odata_dict(policies, ensure_list=True) | ||
policies = cleanup_odata_dict(policies) | ||
policies = ensure_list(policies) | ||
_logger.debug(f"Formatted policies: {policies}") | ||
return policies | ||
|
||
|
||
def import_policies( | ||
access_token: str, | ||
policies: dict, | ||
policies: list[dict], | ||
*, | ||
allow_duplicates: bool = False, | ||
) -> list[tuple[str, str]]: | ||
|
@@ -86,7 +86,7 @@ def import_policies( | |
policies = cleanup_policies(policies) | ||
created_policies: list[tuple[str, str]] = [] | ||
for policy in policies: | ||
display_name = policy.get("displayName") | ||
display_name: str = str(policy.get("displayName")) | ||
|
||
# check if the policy already exists | ||
if not allow_duplicates: | ||
|
@@ -108,10 +108,10 @@ def import_policies( | |
|
||
def get_groups_in_policies( | ||
access_token: str, | ||
policies: dict, | ||
policies: list[dict], | ||
*, | ||
ignore_not_found: bool = False, | ||
) -> dict: | ||
) -> list[dict]: | ||
"""Obtains all groups referenced by the policies in the policies dict. | ||
If ignore_not_found is True, groups that are not found are ignored. | ||
Returns a dictionary with the groups.""" | ||
|
Oops, something went wrong.