-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
585 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
__pycache__ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
# Summary | ||
|
||
This is a limited feature library that can be used to interact with the Cyral Control Plane. Currently, this is able to perform the following functions: | ||
|
||
- Get a list of repos | ||
- Get recommendations per repo | ||
- Get datamaps per repo | ||
- Manage Recommendations | ||
|
||
# Requirements | ||
|
||
This requires API key that has been granted the following permissions: | ||
|
||
- `View Datamaps` | ||
|
||
Refer to the [Create an API access key](https://cyral.com/docs/api-ref/api-intro#create-an-api-access-key) section of the Cyral Documentation on how to generate an API Key. | ||
|
||
In addition to the API key, this makes use of the `requests` Python library. You can install this by running pip against the included [requirements.txt](./requirements.txt) | ||
``` | ||
pip install -r requirements.txt | ||
``` | ||
|
||
# Using this library | ||
|
||
This requires the following environmental variables to be set: | ||
|
||
| Parameter Name | Description | Default Value | | ||
|-------------------------|----------------------------------------------|---------------| | ||
| CYRAL_API_HOST | The URL of the target CP | NULL | | ||
| CYRAL_API_CLIENT_ID | The client ID of the API Account created | NULL | | ||
| CYRAL_API_CLIENT_SECRET | The client Secret of the API Account created | NULL | | ||
|
||
## Example | ||
|
||
The [get_datamaps.py](./get_datamaps.py) file provides an example of using this library to get all of the data map entries from all repositories. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
import requests | ||
import logging | ||
|
||
|
||
class CyralAPIClient: | ||
def __init__(self, config): | ||
self.cyral_api_endpoint = f"{config.api_scheme}://{config.api_hostname}:{config.api_port}/{config.api_version}" | ||
self.api_scheme = config.api_scheme | ||
self.api_hostname = config.api_hostname | ||
self.api_port = config.api_port | ||
self.api_client_id = config.api_client_id | ||
self.api_client_secret = config.api_client_secret | ||
self.api_version = config.api_version | ||
self.auth_before_ping = config.auth_before_ping | ||
self._ping_api() | ||
|
||
def _ping_api(self): | ||
try: | ||
if self.auth_before_ping == "True": | ||
headers = { | ||
"Accept": "application/json", | ||
"Authorization": "Bearer " + self._get_jwt(), | ||
"Content-Type": "application/json", | ||
} | ||
else: | ||
headers = { | ||
"Accept": "application/json", | ||
"Content-Type": "application/json", | ||
} | ||
response = requests.get(self.cyral_api_endpoint + "/ping", headers=headers) | ||
|
||
# We should receive a 200 response | ||
# Return False if we don't get a 200 | ||
if response.status_code != 200: | ||
logging.debug(response.status_code) | ||
raise Exception("Ping Invalid Response Code") | ||
|
||
# Everything is contained within the Ping path so we'll check for it | ||
json_response = response.json() | ||
if "Ping" not in json_response: | ||
logging.error(json_response) | ||
raise Exception("Invalid Ping Response") | ||
|
||
# We need to check for a status and make sure it's what status we expect | ||
if "Status" not in json_response["Ping"]: | ||
logging.error(json_response) | ||
raise Exception("Not Status found Ping Response") | ||
|
||
# Check to make sure the status is OK | ||
if json_response["Ping"]["Status"] != "OK": | ||
logging.error(json_response) | ||
raise Exception("Failure status received in Ping Response") | ||
|
||
# Everything was good so we'll return true for testing | ||
return True | ||
|
||
except Exception as e: | ||
logging.critical(e) | ||
logging.critical("Unable to connect to Cyral Control Plane Status URL") | ||
exit() | ||
|
||
def _get_jwt(self): | ||
data = { | ||
"grant_type": "client_credentials", | ||
"client_id": self.api_client_id, | ||
"client_secret": self.api_client_secret, | ||
} | ||
|
||
# Query the customer CP for a JWT | ||
try: | ||
response = requests.post( | ||
self.cyral_api_endpoint + "/users/oidc/token", | ||
data=data, | ||
) | ||
except Exception as error: | ||
logging.error(error) | ||
raise Exception("JWT Request Failed") | ||
|
||
# We should receive a 200 response | ||
# Return False if we don't get a 200 | ||
if response.status_code != 200: | ||
logging.debug(response.status_code) | ||
raise Exception("JWT Invalid Response Code") | ||
|
||
# access_token contains the JWT so make sure the response contains this | ||
if "access_token" not in response.json(): | ||
logging.error(response.json()) | ||
raise Exception("Invalid JWT Response") | ||
|
||
# Return the generated JWT | ||
return response.json()["access_token"] | ||
|
||
def _make_request(self, endpoint, method="GET", params=None, data=None): | ||
try: | ||
headers = { | ||
"Accept": "application/json", | ||
"Authorization": "Bearer " + self._get_jwt(), | ||
"Content-Type": "application/json", | ||
} | ||
|
||
"""Helper function to make API requests.""" | ||
url = f"{self.cyral_api_endpoint}/{endpoint}" | ||
logging.debug(f"URL Requested: {url}") | ||
logging.debug(f"Method Used : {method}") | ||
logging.debug(f"Parameters On Request : {params}") | ||
logging.debug(f"Data On Request: {data}") | ||
response = requests.request( | ||
method, url, headers=headers, params=params, json=data | ||
) | ||
|
||
if response.status_code >= 400: | ||
logging.warning(response.json()) | ||
raise Exception( | ||
f"Request failed with status code {response.status_code}" | ||
) | ||
|
||
return response.json() | ||
except Exception as e: | ||
logging.error(f"Request failed for Reason : {e}") | ||
return {} | ||
|
||
def get_resource(self, resource_type=None, resource_id=None): | ||
"""Get a specific resource from the API.""" | ||
return self._make_request(endpoint=resource_type) | ||
|
||
def list_resources(self, resource_type=None, params=None): | ||
"""List all resources from the API.""" | ||
return self._make_request(resource_type, params=params) | ||
|
||
def create_resource(self, resource_type=None, data=None): | ||
"""Create a new resource in the API.""" | ||
return self._make_request(endpoint=resource_type, method="POST", data=data) | ||
|
||
def update_resource(self, resource_type, data): | ||
"""Update an existing resource in the API.""" | ||
return self._make_request(endpoint=resource_type, method="PUT", data=data) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
import os | ||
import logging | ||
|
||
|
||
class cyral_config: | ||
def __init__(self): | ||
self.api_scheme = os.environ.get("CYRAL_API_SCHEME", "https") | ||
self.api_version = os.environ.get("CYRAL_API_VERSION", "v1") | ||
self.api_port = os.environ.get("CYRAL_API_PORT", "443") | ||
self.api_hostname = os.environ.get("CYRAL_API_HOST", None) | ||
self.api_client_id = os.environ.get("CYRAL_API_CLIENT_ID", None) | ||
self.api_client_secret = os.environ.get("CYRAL_API_CLIENT_SECRET", None) | ||
self.auth_before_ping = self.set_ping_value() | ||
try: | ||
self.validate_config() | ||
except Exception as e: | ||
logging.fatal(e) | ||
exit() | ||
|
||
def get(self): | ||
return self | ||
|
||
def set_ping_value(self): | ||
ping_value = os.environ.get("CYRAL_API_PING_AUTH", "True") | ||
if ping_value.lower() == "false": | ||
ping_value = "false" | ||
|
||
return ping_value | ||
|
||
def validate_config(self): | ||
if self.api_scheme not in ["http", "https"]: | ||
raise Exception(f"{self.api_scheme} is an invalid API Scheme") | ||
|
||
if self.api_version not in ["v1"]: | ||
raise Exception(f"{self.api_version} is an invalid API Version") | ||
|
||
if self.api_port not in ["443", "8000"]: | ||
raise Exception(f"{self.api_port} is not a standard Cyral API Port") | ||
|
||
if not self.api_hostname: | ||
raise Exception( | ||
"You must specify a valid hostname for your Cyral Control Plane" | ||
) | ||
|
||
if not self.api_client_id: | ||
raise Exception( | ||
"You must provide a valid Client ID for your Cyral Control Plane" | ||
) | ||
|
||
if not self.api_client_secret: | ||
raise Exception( | ||
"You must provide a valid Client Secret for your Cyral Control Plane" | ||
) | ||
|
||
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import datetime | ||
import logging | ||
|
||
def get_utc_time(current_time = str(datetime.datetime.utcnow()), offset = None): | ||
try: | ||
my_time = datetime.datetime.fromisoformat(current_time) | ||
if offset: | ||
my_time = my_time + datetime.timedelta(hours=offset) | ||
except Exception as e: | ||
logging.debug(e) | ||
my_time = datetime.datetime.utcnow() | ||
|
||
return my_time.strftime('%Y-%m-%dT%H:%M:%SZ') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
from client import cyral_sdk | ||
from common import config | ||
import logging | ||
|
||
|
||
class datamap: | ||
def __init__(self): | ||
try: | ||
self.api_client = cyral_sdk.CyralAPIClient(config.cyral_config()) | ||
except Exception as e: | ||
logging.error(e) | ||
exit() | ||
|
||
def _valid_recommendation(self, recommendation): | ||
if "id" not in recommendation: | ||
logging.error("'id' not found in recommendation") | ||
return False | ||
|
||
if "repo" not in recommendation: | ||
logging.error("'repo' not found in recommendation") | ||
return False | ||
|
||
if "attribute" not in recommendation: | ||
logging.error("'attribute' not found in recommendation") | ||
return False | ||
|
||
if "label" not in recommendation: | ||
logging.error("'label' not found in recommendation") | ||
return False | ||
|
||
if "status" not in recommendation: | ||
logging.error("'status' not found in recommendation") | ||
return False | ||
|
||
if "source" not in recommendation: | ||
logging.error("'source' not found in recommendation") | ||
return False | ||
|
||
if "createdAt" not in recommendation: | ||
logging.error("'createdAt' not found in recommendation") | ||
return False | ||
|
||
if "updatedAt" not in recommendation: | ||
logging.error("'updatedAt' not found in recommendation") | ||
return False | ||
|
||
return True | ||
|
||
def recommendations_as_dict(self): | ||
recommendations = self.get_recommendations() | ||
|
||
if "recommendations" not in recommendations: | ||
raise Exception("Unexpected Recommendations Format") | ||
|
||
if len(recommendations) < 1: | ||
raise Exception("No Recommendations Found") | ||
|
||
recommendations_dict = {} | ||
for recommendation in recommendations["recommendations"]: | ||
if self._valid_recommendation(recommendation): | ||
if recommendation["repo"] not in recommendations_dict: | ||
recommendations_dict[recommendation["repo"]] = [] | ||
recommendations_dict[recommendation["repo"]].append(recommendation) | ||
|
||
return recommendations_dict | ||
|
||
def get_recommendations( | ||
self, page=1, items_per_page=2147483647, status="RECOMMENDED" | ||
): | ||
# Query the Control Plane to get the list of recommendations | ||
try: | ||
params = {"page": page, "itemsPerPage": items_per_page, "status": status} | ||
return self.api_client.list_resources( | ||
resource_type="datamap/recommendations", params=params | ||
) | ||
except Exception as e: | ||
logging.info(e) | ||
raise Exception("Failed to get recommendations") | ||
|
||
def get_datamap(self, repo_id): | ||
# Query the Control Plane to get the list of recommendations | ||
try: | ||
return self.api_client.list_resources( | ||
resource_type=f"repos/{repo_id}/datamap" | ||
) | ||
except Exception as e: | ||
logging.info(e) | ||
raise Exception("Failed to get datamap") | ||
|
||
def manage_recommendation(self, repo_id, recommendation_id, status="DISMISSED"): | ||
# Query the Control Plane to set the status of the recommendation | ||
try: | ||
data = {"status": status} | ||
return self.api_client.update_resource( | ||
resource_type=f"repos/{repo_id}/datamap/recommendations/{recommendation_id}/status", | ||
data=data, | ||
) | ||
except Exception as e: | ||
logging.info(e) | ||
raise Exception("Failed to update recommendation") |
Oops, something went wrong.