diff --git a/snyk/client.py b/snyk/client.py index 27794fc..62fb395 100644 --- a/snyk/client.py +++ b/snyk/client.py @@ -3,25 +3,31 @@ from typing import Any, List, Optional import requests +from requests.compat import urljoin from retry.api import retry_call +from copy import deepcopy from .__version__ import __version__ from .errors import SnykHTTPError, SnykNotImplementedError from .managers import Manager -from .models import Organization, Project +from .models import Organization, Project, OrganizationGroup, User from .utils import cleanup_path logger = logging.getLogger(__name__) class SnykClient(object): - API_URL = "https://api.snyk.io/v1" + WEB_URL = "https://app.snyk.io" + API_URL = "https://api.snyk.io/rest" + VERSION = "2023-08-04~experimental" + API_URL_V1 = "https://api.snyk.io/v1" USER_AGENT = "pysnyk/%s" % __version__ def __init__( self, token: str, url: Optional[str] = None, + web_url: Optional[str] = None, user_agent: Optional[str] = USER_AGENT, debug: bool = False, tries: int = 1, @@ -29,6 +35,7 @@ def __init__( backoff: int = 2, verify: bool = True, version: Optional[str] = None, + limit: int = 100, ): self.api_token = token self.api_url = url or self.API_URL @@ -42,7 +49,9 @@ def __init__( self.backoff = backoff self.delay = delay self.verify = verify - self.version = version + self.version = version or self.VERSION + self.web_url = web_url or self.WEB_URL + self.limit = limit # Ensure we don't have a trailing / if self.api_url[-1] == "/": @@ -64,6 +73,7 @@ def request( resp = method( url, headers=headers, params=params, json=json, verify=self.verify ) + elif params and not json: resp = method(url, headers=headers, params=params, verify=self.verify) elif json and not params: @@ -97,6 +107,39 @@ def post(self, path: str, body: Any, headers: dict = {}) -> requests.Response: return resp + def patch(self, path: str, body: Any, headers: dict = {}, params: dict = None) -> requests.Response: + path = cleanup_path(path) + + url = f"{self.api_url}/{path}" + + logger.debug(f"PATCH: {url}") + + if params or self.version: + + if not params: + params = {} + + # we use the presence of version to determine if we are REST or not + if "version" not in params.keys() and self.version: + params["version"] = self.version + + resp = retry_call( + self.request, + fargs=[requests.patch, url], + fkwargs={"json": body, "headers": {**self.api_post_headers, **headers}, "params": params}, + tries=self.tries, + delay=self.delay, + backoff=self.backoff, + exceptions=SnykHTTPError, + logger=logger, + ) + + if not resp.ok: + logger.error(resp.text) + raise SnykHTTPError(resp) + + return resp + def put(self, path: str, body: Any, headers: dict = {}) -> requests.Response: url = "%s/%s" % (self.api_url, path) logger.debug("PUT: %s" % url) @@ -192,39 +235,56 @@ def delete(self, path: str) -> requests.Response: return resp + def get_rest_page(self, path: str, params: dict = {}) -> dict: + """Helper function to colleged unpaginated responses from the rest API as a dictionary + + This takes the "data" list from the response and returns it""" + + return self.get(path, params).json()['data'] + def get_rest_pages(self, path: str, params: dict = {}) -> List: """ Helper function to collect paginated responses from the rest API into a single list. - +ff This collects the "data" list from the first reponse and then appends the any further "data" lists if a next link is found in the links field. """ # this is a raw primative but a higher level module might want something that does an # arbitrary path + origin=foo + limit=100 url construction instead before being sent here + + # Making sure references to param can't be passed between methods + params = deepcopy(params) - limit = params["limit"] + if 'limit' in params.keys(): + limit = params["limit"] + else: + limit = self.limit + params["limit"] = self.limit + print("limit: ", limit) data = list() page = self.get(path, params).json() data.extend(page["data"]) - - while "next" in page["links"].keys(): + + while "next" in page["links"].keys() and len(page["data"]) >= limit: logger.debug( f"GET_REST_PAGES: Another link exists: {page['links']['next']}" ) + # Check for "/rest" at the end of the root url and beginning of next url + if page["links"]["next"][:5] == "/rest" and self.api_url[-5:] == "/rest": + page["links"]["next"] = page["links"]["next"][5:] + next_url = urllib.parse.urlsplit(page["links"]["next"]) query = urllib.parse.parse_qs(next_url.query) for k, v in query.items(): params[k] = v - params["limit"] = limit - page = self.get(next_url.path, params).json() data.extend(page["data"]) @@ -254,9 +314,16 @@ def notification_settings(self): # https://snyk.docs.apiary.io/#reference/groups/organisations-in-groups/create-a-new-organisation-in-the-group # https://snyk.docs.apiary.io/#reference/0/list-members-in-a-group/list-all-members-in-a-group # https://snyk.docs.apiary.io/#reference/0/members-in-an-organisation-of-a-group/add-a-member-to-an-organisation-from-another-organisation-in-the-group - def groups(self): - raise SnykNotImplementedError # pragma: no cover + @property + def groups(self) -> Manager: + return Manager.factory(OrganizationGroup, self) # https://snyk.docs.apiary.io/#reference/reporting-api/issues/get-list-of-issues def issues(self): raise SnykNotImplementedError # pragma: no cover + + # At the client level this should only be able to return the results for /self + # https://apidocs.snyk.io/experimental?version=2023-06-23%7Eexperimental#get-/self + @property + def users(self) -> Manager: + return Manager.factory(User, self) diff --git a/snyk/managers.py b/snyk/managers.py index 8ffa730..3dd302a 100644 --- a/snyk/managers.py +++ b/snyk/managers.py @@ -1,11 +1,16 @@ import abc +import logging +import json from typing import Any, Dict, List +from requests.compat import urljoin +from copy import deepcopy from deprecation import deprecated # type: ignore -from .errors import SnykError, SnykNotFoundError, SnykNotImplementedError +from .errors import SnykError, SnykNotFoundError, SnykNotImplementedError, SnykHTTPError from .utils import snake_to_camel +logger = logging.getLogger(__name__) class Manager(abc.ABC): def __init__(self, klass, client, instance=None): @@ -46,23 +51,27 @@ def factory(klass, client, instance=None): else: key = klass.__name__ manager = { - "Project": ProjectManager, - "Organization": OrganizationManager, - "Member": MemberManager, - "License": LicenseManager, - "Dependency": DependencyManager, - "Entitlement": EntitlementManager, - "Setting": SettingManager, - "Ignore": IgnoreManager, - "JiraIssue": JiraIssueManager, - "DependencyGraph": DependencyGraphManager, - "IssueSet": IssueSetManager, - "IssueSetAggregated": IssueSetAggregatedManager, - "Integration": IntegrationManager, - "IntegrationSetting": IntegrationSettingManager, - "Tag": TagManager, - "IssuePaths": IssuePathsManager, + "Project": ProjectManager, + "Organization": OrganizationManager, + "Member": MemberManager, + "License": LicenseManager, + "Dependency": DependencyManager, + "Entitlement": EntitlementManager, + "Setting": SettingManager, + "Ignore": IgnoreManager, + "JiraIssue": JiraIssueManager, + "DependencyGraph": DependencyGraphManager, + "IssueSet": IssueSetManager, + "IssueSetAggregated": IssueSetAggregatedManager2, + "Integration": IntegrationManager, + "IntegrationSetting": IntegrationSettingManager, + "Tag": TagManager, + "IssuePaths": IssuePathsManager, + "OrganizationGroup": OrganizationGroupManager, + "OrganizationGroupTags": OrganizationGroupTagManager, + "User": UserManager, }[key] + return manager(klass, client, instance) except KeyError: raise SnykError @@ -106,100 +115,408 @@ def filter(self, **kwargs: Any): class OrganizationManager(Manager): def all(self): - resp = self.client.get("orgs") + params = {'limit': self.client.limit} + resp = self.client.get_rest_pages("/orgs", params) + orgs = [] - if "orgs" in resp.json(): - for org_data in resp.json()["orgs"]: - orgs.append(self.klass.from_dict(org_data)) - for org in orgs: - org.client = self.client + if len(resp) > 0: + + groups = self.client.groups.all() + + for org in resp: + + try: + group = next(x for x in groups if x.id == org['attributes']['group_id']).to_dict() + except StopIteration: + group = None + + # Map org data to model variables + org_template = { + 'name': org['attributes']['name'], + 'id': org['id'], + 'slug': org['attributes']['slug'], + 'url': urljoin(self.client.web_url, '/org/{}'.format(org['attributes']['slug'])), + 'personal': org['attributes']['is_personal'], + 'group': group, + 'client': self.client + } + + orgs.append(self.klass.from_dict(org_template)) + return orgs + def get(self, id: str): + try: + org = self.client.get_rest_page("/orgs/{}".format(id)) + + org_template = { + 'name': org['attributes']['name'], + 'id': org['id'], + 'slug': org['attributes']['slug'], + 'url': urljoin(self.client.web_url, '/org/{}'.format(org['attributes']['slug'])), + 'personal': org['attributes']['is_personal'], + 'group': self.client.groups.get(org['attributes']['group_id']).to_dict() if 'group_id' in org['attributes'].keys() else None, + 'client': self.client + } + except SnykHTTPError as e: + logging.error(e.error) + raise e + except Exception as e: + logging.error(e) + raise e + + return self.klass.from_dict(org_template) + +class OrganizationGroupTagManager(Manager): + def all(self): + # Version 1 endpoint + self.client.api_url = self.client.API_URL_V1 + + resp = self.client.get(f'/group/{self.instance.id}/tags?perPage=50000&page=1') + + # Reset to REST endpoint + self.client.api_url = self.client.API_URL + + return resp.json()['tags'] + + def delete(self,key: str, value: str, force: bool=False) -> bool: + # Version 1 endpoint + self.client.api_url = self.client.API_URL_V1 + + body = {'key': key, + 'value': value, + 'force': force} + + resp = self.client.post(f'/group/{self.instance.id}/tags/delete', body) + + # Reset to REST endpoint + self.client.api_url = self.client.API_URL + + if resp.status_code == 200: + return True + else: + return False + +class OrganizationGroupManager(Manager): + def all(self): + params = {'limit': self.client.limit} + resp = self.client.get_rest_pages("/groups", params) + + groups = [] + if len(resp) > 0: + for group in resp: + groups.append(self.klass.from_dict({'name': group['attributes']['name'], 'id': group['id']})) + + return groups + + def first(self): + raise SnykNotImplementedError # pragma: no cover + + def get(self, id: str): + try: + resp = self.client.get_rest_page("/groups/{}".format(id)) + except SnykHTTPError as e: + if e.error[0]['detail'] == "Group Not Found": + logging.error("Group Not Found") + raise SnykNotFoundError from None + elif e.error[0]['detail'] == 'must match format "uuid"': + logging.error("ID must match format 'uuid'") + raise e + else: + raise e + except Exception as e: + raise e + + return self.klass(resp['attributes']['name'],resp['id'],self.client) + + + def filter(self, **kwargs: Any): + raise SnykNotImplementedError # pragma: no cover + class TagManager(Manager): def all(self): return self.instance._tags def add(self, key, value) -> bool: - tag = {"key": key, "value": value} - path = "org/%s/project/%s/tags" % ( + + path = "orgs/%s/projects/%s" % ( self.instance.organization.id, self.instance.id, ) - return bool(self.client.post(path, tag)) + + # Retain previous tags + tag = {"key": key, "value": value} + tags = self.instance._tags + tags.append(tag) + + # Build the request body + body = { + "data": { + "attributes":{ + "tags":tags + }, + "relationships":{}, + "id":self.instance.id, + "type": "project" + } + } + + params = {'user_id': self.instance.organization.client.users.self.id} + headers = {'content-type': 'application/vnd.api+json'} + + resp = self.client.patch(path=path, body=body, params=params, headers=headers).json() + + # Check to make sure the new tag was created + if tag in resp['data']['attributes']['tags']: + return True + + return False def delete(self, key, value) -> bool: - tag = {"key": key, "value": value} - path = "org/%s/project/%s/tags/remove" % ( + path = "orgs/%s/projects/%s" % ( self.instance.organization.id, self.instance.id, ) - return bool(self.client.post(path, tag)) + tag = {"key": key, "value": value} + tags = [ x for x in self.instance._tags if x != tag ] + + # Build the request body + body = { + "data": { + "attributes":{ + "tags":tags + }, + "relationships":{}, + "id":self.instance.id, + "type": "project" + } + } + + params = {'user_id': self.instance.organization.client.users.self.id} + headers = {'content-type': 'application/vnd.api+json'} + + resp = self.client.patch(path=path, body=body, params=params, headers=headers).json() + + # Check to make sure the tag was deleted + if tag in resp['data']['attributes']['tags']: + return False + return True + +class UserManager(Manager): + + def all(self) -> Any: + pass # pragma: no cover + + def first(self): + raise SnykNotImplementedError # pragma: no cover + + def get(self, id: str): + raise SnykNotImplementedError # pragma: no cover + + def filter(self, **kwargs: Any): + raise SnykNotImplementedError # pragma: no cover + + @property + def self(self): + user = self.client.get_rest_page("/self") + user_data = {'id': user['id']} + fields = ['name','username','email'] + for field in fields: + if field in user['attributes']: + user_data[field] = user['attributes'][field] + return self.klass.from_dict(user_data) + +# TODO: change implementation here to call REST Projects and other V1 APIs to fill in the gaps as per +# migration guide https://docs.google.com/document/d/1e-CnYRYxZXBRCRFW8YZ8tfKkv5zLSg2tEHPiLrvO8Oc +# Since the implementation uses filtering by tags, use an older API version that has this available https://apidocs.snyk.io/?version=2022-07-08%7Ebeta#get-/orgs/-org_id-/projects +# See annotations on the class snyk/models.py#L451-L452 for what data needs to be fetched from elsewhere or constructed class ProjectManager(Manager): - def _query(self, tags: List[Dict[str, str]] = []): - projects = [] - if self.instance: - path = "org/%s/projects" % self.instance.id - if tags: - for tag in tags: - if "key" not in tag or "value" not in tag or len(tag.keys()) != 2: - raise SnykError("Each tag must contain only a key and a value") - data = {"filters": {"tags": {"includes": tags}}} - resp = self.client.post(path, data) - else: - resp = self.client.get(path) - if "projects" in resp.json(): - for project_data in resp.json()["projects"]: - project_data["organization"] = self.instance.to_dict() - # We move tags to _tags as a cache, to avoid the need for additional requests - # when working with tags. We want tags to be the manager - try: - project_data["_tags"] = project_data["tags"] - del project_data["tags"] - except KeyError: - pass - if project_data["totalDependencies"] is None: - project_data["totalDependencies"] = 0 - projects.append(self.klass.from_dict(project_data)) - for x in projects: - x.organization = self.instance - else: - for org in self.client.organizations.all(): - projects.extend(org.projects.all()) - return projects + def _map_rest_data_to_project_model(self, data: dict = {}): + """Takes the data field from a rest API query for the /orgs/{org_id}/projects/{project_id} query and maps it to the Project model + + :param data: dictionary data field from a rest API call to /orgs/{org_id}/projects/{project_id} + + :return: Project model""" + + attr = data['attributes'] + + # Mandetory flags + project_data = { + 'name': attr['name'], + 'id': data['id'], + 'created': attr['created'], + 'origin': attr['origin'], + 'type': attr['type'], + 'readOnly': attr['read_only'], + 'testFrequency': attr['settings']['recurring_tests']['frequency'], + 'browseUrl': f"{self.instance.url}/project/{data['id']}", + 'isMonitored': attr['status'] if attr['status'] == 'active' else False, + 'targetReference': attr['target_reference'], + 'organization': self.instance.to_dict(), + 'attributes': {'criticality': attr['business_criticality'], + 'environment': attr['environment'], + 'lifecycle': attr['lifecycle']}, + } + # '_tags': attr['tags'] if 'tags' in attr.keys() else [], + + # Optional flags + for key in data.keys(): + if 'attributes' == key: + for attribute in data['attributes']: + if 'tags' == attribute: + project_data['_tags'] = attr['tags'] + + if 'meta' == key: + if 'latest_dependency_total' in data['meta'].keys(): + total = data['meta']['latest_dependency_total']['total'] + if total: + project_data['totalDependencies'] = total + else: + project_data['totalDependencies'] = 0 + if 'latest_issue_counts' in data['meta'].keys(): + project_data['issueCountsBySeverity'] = { + 'critical': int(data['meta']['latest_issue_counts']['critical']), + 'high': int(data['meta']['latest_issue_counts']['high']), + 'medium': int(data['meta']['latest_issue_counts']['medium']), + 'low': int(data['meta']['latest_issue_counts']['low']), + } + + return self.klass.from_dict(project_data) - def all(self): - return self._query() + def filter(self, **kwargs: Any): + """This functions allows you to filter using all of the filters available on https://apidocs.snyk.io/experimental?version=2023-07-28%7Eexperimental#tag--Projects + Then returns the project dictionaries in a list + + The list of parameters below are a list of of available filters from version=2023-07-28~experimental as of 7/31/2023 + + :param target_id: List of strings (target IDs) + Return projects that belong to the provided targets + :param meta_count: string - Allowed: "only" + Only return the collection count + :param ids: List of strings (Project IDs) + Return projects that match the provided IDs + :param names: List of strings (Project names) + Return projects that match the provided names + :param origins: List of strings (origins) + Return projects that match the provided origins + :param types: List of strings (project types) + Return projects that match the provided types + :param expand: string - Allowed: "target" + Expand relationships + :param latest_issue_counts: bool + Include a summary count for the issues found in the most recent scan of this project + :param latest_dependency_total: bool + Include the total number of dependencies found in the most recent scan of this project + :param cli_monitored_before: date-time - Example: 2021-05-29T09:50:54.014Z + Filter projects uploaded and monitored before this date (encoded value) + :param cli_monitored_after: date-time - Example: 2021-05-29T09:50:54.014Z + Filter projects uploaded and monitored after this date (encoded value) + :param importing_user_public_id: List of strings + Return projects that match the provided importing user public ids. + :param tags: List of strings (tags) - List of dict() - Example: [{'key':'test_key', 'value':'test_value'}] + Return projects that match all the provided tags + :param business_criticality: List of strings - Allowed: critical ┃ high ┃ medium ┃ low + Return projects that match all the provided business_criticality value + :param environment: List of strings - Allowed: frontend ┃ backend ┃ internal ┃ external ┃ mobile ┃ saas ┃ onprem ┃ hosted ┃ distributed + Return projects that match all the provided environment values + :param lifecycle: List of strings - Allowed: production ┃ development ┃ sandbox + Return projects that match all the provided lifecycle values + :param version: string - The requested version of the endpoint to process the request + :param starting_after: string - Examples: v1.eyJpZCI6IjEwMDAifQo= + Return the page of results immediately after this cursor + :param ending_before: string - Examples: v1.eyJpZCI6IjExMDAifQo= + Return the page of results immediately before this cursor + :param limit: int - Default: 10 (Min: 10, Max: 100, only multiples of 10 allowed) + Number of results to return per page + + :return: returns a list of dictionaries. One dictionary for each project included in the query. + """ - def filter(self, tags: List[Dict[str, str]] = [], **kwargs: Any): - if tags: - return self._filter_by_kwargs(self._query(tags), **kwargs) + filters = { + 'meta.latest_issue_counts': True, + 'meta.latest_dependency_total': True, + } + + filters_list = [ + "target_id", + "meta_count", + "ids", + "names", + "origins", + "types", + "expand: string - Allowed", + "latest_issue_counts", + "latest_dependency_total", + "cli_monitored_before", + "cli_monitored_after", + "importing_user_public_id", + "tags", + "business_criticality", + "environment", + "lifecycle", + "version", + "starting_after", + "ending_before", + "limit", + ] + + filters_list.extend(list(filters.keys())) + + # Set new filters + for filter_name in kwargs.keys(): + if filter_name in filters_list: + # map variable name to api parameter name + if filter_name in ["latest_issue_counts","latest_dependency_total"] : + filters[f"meta.{filter_name}"] = kwargs[filter_name] + else: + filters[filter_name] = kwargs[filter_name] + + #TODO: Add validation for every parameter to make sure + # They're each formatted correctly. + + if 'limit' not in filters.keys(): + filters['limit'] = self.client.limit + + path = "orgs/%s/projects" % self.instance.id + + resp = self.client.get_rest_pages(path, filters) + + return resp + + def query(self, **kwargs: Any): + """Utility to query data from the api and return a list of project data models""" + if self.instance: + projects = [] + resp = self.filter(**kwargs) + for project in resp: + projects.append(self._map_rest_data_to_project_model(project)) + return projects else: - return super().filter(**kwargs) + return super().get(**kwargs) def get(self, id: str): + """legacy get method for backward compatibility""" + if self.instance: - path = "org/%s/project/%s" % (self.instance.id, id) - resp = self.client.get(path) - project_data = resp.json() - project_data["organization"] = self.instance.to_dict() - # We move tags to _tags as a cache, to avoid the need for additional requests - # when working with tags. We want tags to be the manager - try: - project_data["_tags"] = project_data["tags"] - del project_data["tags"] - except KeyError: - pass - if project_data["totalDependencies"] is None: - project_data["totalDependencies"] = 0 - project_klass = self.klass.from_dict(project_data) - project_klass.organization = self.instance - return project_klass + resp = self.filter(ids=[id]) + return self._map_rest_data_to_project_model(resp[0]) else: return super().get(id) + def all(self): + projects = [] + if self.instance: + resp = self.filter() + for project in resp: + model = self._map_rest_data_to_project_model(project) + projects.append(model) + else: + for org in self.client.organizations.all(): + projects.extend(org.projects.all()) + return projects class MemberManager(Manager): def all(self): @@ -307,24 +624,41 @@ def update(self, **kwargs: bool) -> bool: class IgnoreManager(DictManager): def all(self) -> Dict[str, List[object]]: + # Version 1 endpoint + self.client.api_url = self.client.API_URL_V1 + path = "org/%s/project/%s/ignores" % ( self.instance.organization.id, self.instance.id, ) resp = self.client.get(path) + + # Reset to REST endpoint + self.client.api_url = self.client.API_URL return resp.json() + class JiraIssueManager(DictManager): def all(self) -> Dict[str, List[object]]: + # Version 1 endpoint + self.client.api_url = self.client.API_URL_V1 + path = "org/%s/project/%s/jira-issues" % ( self.instance.organization.id, self.instance.id, ) resp = self.client.get(path) + + # Reset to REST endpoint + self.client.api_url = self.client.API_URL + return resp.json() def create(self, issue_id: str, fields: Any) -> Dict[str, str]: + # Version 1 endpoint + self.client.api_url = self.client.API_URL_V1 + path = "org/%s/project/%s/issue/%s/jira-issue" % ( self.instance.organization.id, self.instance.id, @@ -333,6 +667,10 @@ def create(self, issue_id: str, fields: Any) -> Dict[str, str]: post_body = {"fields": fields} resp = self.client.post(path, post_body) response_data = resp.json() + + # Reset to REST endpoint + self.client.api_url = self.client.API_URL + # The response we get is not following the schema as specified by the api # https://snyk.docs.apiary.io/#reference/projects/project-jira-issues-/create-jira-issue if ( @@ -413,6 +751,122 @@ def filter(self, **kwargs: Any): resp = self.client.post(path, post_body) return self.klass.from_dict(self._convert_reserved_words(resp.json())) + def all(self) -> Any: + self._query() + + +class IssueSetAggregatedManager2(SingletonManager): + """ + TODO: Update the Issue model to match the latest changes to API + def _map_rest_data_to_issue_model(self, data: dict = {}): + ''' + Lets map what we can, then set the rest to optional + Things to map: + + # Mapped + id: str + issueType: str + isIgnored: bool + + # Not Mapped + pkgName: str + pkgVersions: List[str] + isPatched: bool + fixInfo: FixInfo + introducedThrough: Optional[List[Any]] = None + ignoreReasons: Optional[List[Any]] = None + # Not mentioned in schema but returned + # https://snyk.docs.apiary.io/#reference/projects/aggregated-project-issues/list-all-aggregated-issues + priorityScore: Optional[int] = None + priority: Optional[Any] = None + + issueData: IssueData + # issueData fields to map + ## Mapped + id: str + title: str + severity: str + url: str + + ## Unmapped + exploitMaturity: str + description: Optional[str] = None + identifiers: Optional[Any] = None + credit: Optional[List[str]] = None + semver: Optional[Any] = None + publicationTime: Optional[str] = None + disclosureTime: Optional[str] = None + CVSSv3: Optional[str] = None + cvssScore: Optional[str] = None + language: Optional[str] = None + patches: Optional[Any] = None + nearestFixedInVersion: Optional[str] = None + ignoreReasons: Optional[List[Any]] = None + + ''' + + # Map the required items + attr = data['attributes'] + issue_data = { + 'id': attr['key'], # This appears to be what Snyk is using for the id and not the actual ID for "package_vulnerability" type issues + 'issueType': attr['type'], + 'title': attr['title'], + 'description': "", # Need to get details from somewhere else + 'issueData': { + 'id': attr['key'] + 'title': attr['title'] + 'severity': attr['effective_severity_level'] + exploitMaturity: str + description: Optional[str] = None + identifiers: Optional[Any] = None + credit: Optional[List[str]] = None + semver: Optional[Any] = None + publicationTime: Optional[str] = None + disclosureTime: Optional[str] = None + CVSSv3: Optional[str] = None + cvssScore: Optional[str] = None + language: Optional[str] = None + patches: Optional[Any] = None + nearestFixedInVersion: Optional[str] = None + ignoreReasons: Optional[List[Any]] = None + }, + 'isIgnored': attr['ignored'], + + } + + # Calling object is a Project + if hasattr(self.instance, 'organization'): + # Generate URL + issue_data['issueData']['url'] = f"{self.instance.browseUrl}#issue-{attr['key']}" + + # Calling object is an Organization + else: + issue_data['issueData']['url'] = f"{self.instance.url}/project/{data['relationships']['scan_item']['id']}#issue-{attr['key']}" + """ + def filter(self, **kwargs: Any): + + filters = {} + if 'limit' not in filters.keys(): + filters['limit'] = self.client.limit + + # Calling object is a Project + if hasattr(self.instance, 'organization'): + filters['scan_item.id'] = self.instance.id + filters['scan_item.type'] = 'project' + + path = f"orgs/{self.instance.organization.id}/issues" + + # Calling object is an Organization + else: + path = f"orgs/{self.instance.id}/issues" + + resp = self.client.get_rest_pages(path, filters) + + return resp + + def all(self): + return self.filter() + class IssueSetAggregatedManager(SingletonManager): def all(self) -> Any: diff --git a/snyk/models.py b/snyk/models.py index 60a272d..c13cedf 100644 --- a/snyk/models.py +++ b/snyk/models.py @@ -134,6 +134,12 @@ class OrganizationGroup(DataClassJSONMixin): name: str id: str + client: Optional[Any] = None + + @property + def tags(self) -> Manager: + return Manager.factory("OrganizationGroupTags", self.client, self) + @dataclass class Package(DataClassJSONMixin): @@ -162,6 +168,7 @@ class Organization(DataClassJSONMixin): id: str slug: str url: str + personal: bool group: Optional[OrganizationGroup] = None client: Optional[Any] = None @@ -189,6 +196,10 @@ def entitlements(self) -> Manager: def integrations(self) -> Manager: return Manager.factory(Integration, self.client, self) + @property + def issues(self) -> Manager: + return Manager.factory(IssueSetAggregated, self.client, self) + """ Imports need integrations, but exposing a high-level API that can find the integration from the URL of the thing you want @@ -498,6 +509,12 @@ class IssueCounts(DataClassJSONMixin): # https://updates.snyk.io/critical-severity-level-195891, critical severity is optional since it is still under Snyk preview critical: Optional[int] = 0 +@dataclass +class User(DataClassJSONMixin): + id: str + name: str + username: Optional[str] = field(default_factory=str) + email: Optional[str] = field(default_factory=str) @dataclass class DependencyGraphPackageInfo(DataClassJSONMixin): @@ -544,6 +561,12 @@ class DependencyProject(DataClassJSONMixin): name: str id: str +@dataclass +# TODO: list actual values this can return +class Attributes(DataClassJSONMixin): + criticality: List[str] + environment: List[str] + lifecycle: List[str] @dataclass class Dependency(DataClassJSONMixin): @@ -563,32 +586,45 @@ class Dependency(DataClassJSONMixin): @dataclass class Project(DataClassJSONMixin): + # Current set implemented with REST API name: str - organization: Organization id: str created: str origin: str type: str - readOnly: bool + readOnly: bool # TODO: not yet available in REST testFrequency: str - totalDependencies: int - lastTestedDate: str - browseUrl: str + browseUrl: str # TODO: constrict this yourself from scratch using the API host + project UUID + org name [Done in project.get method] isMonitored: bool - issueCountsBySeverity: IssueCounts - imageTag: Optional[str] = None - imageId: Optional[str] = None - imageBaseImage: Optional[str] = None - imagePlatform: Optional[str] = None - imageCluster: Optional[str] = None - hostname: Optional[str] = None + targetReference: str + attributes: Attributes + organization: Organization + + _tags: Optional[List[Any]] = field(default_factory=list) + totalDependencies: Optional[int] = field(default_factory=int) + issueCountsBySeverity: Optional[IssueCounts] = None + + # TODO: delete remediation. This must be a mistake, there is no remediation in /projects v1 response + # remediation: Optional[Dict[Any, Any]] = field(default_factory=dict) # TODO: this is from individual project call only? + + # appended/amended fields + + # Setting User fields to optional because these both require Admin permissions to query this data + importingUser: Optional[User] = None # TODO. The ID comes in https://apidocs.snyk.io/experimental?version=2023-06-23%7Eexperimental#get-/orgs/-org_id-/projects/-project_id- + owner: Optional[User] = None # TODO: use the owner ID and call this API to get the rest of the data https://apidocs.snyk.io/?version=2023-05-29%7Ebeta#get-/orgs/-org_id-/users/-id- + + # Variables that can't be set using REST API yet. Leaving them as optional so they can be set manually in the future or implemented on their own. + #totalDependencies: Optional[int] = field(default_factory=int)# TODO: don't see this in the REST API yet. + #issueCountsBySeverity: Optional[IssueCounts] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots + lastTestedDate: Optional[str] = field(default_factory=str) # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots + imageId: Optional[str] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots + imageTag: Optional[str] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots + imageBaseImage: Optional[str] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots + imagePlatform: Optional[str] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots + imageCluster: Optional[str] = None # TODO: can be fetched from https://apidocs.snyk.io/?version=2023-04-28%7Ebeta#get-/orgs/-org_id-/targets remoteRepoUrl: Optional[str] = None branch: Optional[str] = None - attributes: Optional[Dict[str, List[str]]] = None - _tags: Optional[List[Any]] = field(default_factory=list) - remediation: Optional[Dict[Any, Any]] = field(default_factory=dict) - owner: Optional[Dict[Any, Any]] = field(default_factory=dict) - importingUser: Optional[Dict[Any, Any]] = field(default_factory=dict) + def delete(self) -> bool: path = "org/%s/project/%s" % (self.organization.id, self.id) @@ -658,6 +694,10 @@ def issueset(self) -> Manager: def issueset_aggregated(self) -> Manager: return Manager.factory(IssueSetAggregated, self.organization.client, self) + @property + def issues(self) -> Manager: + return Manager.factory(IssueSetAggregated, self.organization.client, self) + @property def vulnerabilities(self) -> List[Vulnerability]: vuln_filter = { diff --git a/snyk/utils.py b/snyk/utils.py index 28113ef..4ca4c5b 100644 --- a/snyk/utils.py +++ b/snyk/utils.py @@ -47,4 +47,4 @@ def load_test_data(test_dir: str, test_name: str) -> dict: test_file = f"{test_dir}/{test_name}.json" with open(test_file, "r") as the_file: data = the_file.read() - return json.loads(data) + return json.loads(data) \ No newline at end of file