From bf79e1be55293bec6552e8780d2997c33c92b174 Mon Sep 17 00:00:00 2001 From: Vincent Faires Date: Mon, 24 Jul 2023 01:15:42 -0700 Subject: [PATCH 1/7] updated Organization and Project managers to work with REST API --- snyk/client.py | 36 +++++++-- snyk/managers.py | 187 +++++++++++++++++++++++++++++++++++------------ snyk/models.py | 37 ++++++---- snyk/utils.py | 2 +- 4 files changed, 193 insertions(+), 69 deletions(-) diff --git a/snyk/client.py b/snyk/client.py index 27794fc..6d0e9f8 100644 --- a/snyk/client.py +++ b/snyk/client.py @@ -8,20 +8,24 @@ from .__version__ import __version__ from .errors import SnykHTTPError, SnykNotImplementedError from .managers import Manager -from .models import Organization, Project +from .models import Organization, Project, OrganizationGroup from .utils import cleanup_path logger = logging.getLogger(__name__) class SnykClient(object): - API_URL = "https://api.snyk.io/v1" + WEB_URL = "https://app.snyk.io" + API_URL = "https://api.snyk.io/rest" + VERSION = "2023-06-23~experimental" + #API_URL = "https://api.snyk.io/v1" USER_AGENT = "pysnyk/%s" % __version__ def __init__( self, token: str, url: Optional[str] = None, + web_url: Optional[str] = None, user_agent: Optional[str] = USER_AGENT, debug: bool = False, tries: int = 1, @@ -29,6 +33,7 @@ def __init__( backoff: int = 2, verify: bool = True, version: Optional[str] = None, + params: dict = {'limit':100}, ): self.api_token = token self.api_url = url or self.API_URL @@ -42,7 +47,9 @@ def __init__( self.backoff = backoff self.delay = delay self.verify = verify - self.version = version + self.version = version or self.VERSION + self.web_url = web_url or self.WEB_URL + self.params = params # Ensure we don't have a trailing / if self.api_url[-1] == "/": @@ -157,7 +164,7 @@ def get( fkwargs = {"headers": self.api_headers} logger.debug(f"GET: {debug_url}") - + resp = retry_call( self.request, fargs=[requests.get, url], @@ -192,6 +199,13 @@ def delete(self, path: str) -> requests.Response: return resp + def get_rest_page(self, path: str, params: dict = {}) -> dict: + """Helper function to colleged unpaginated responses from the rest API as a dictionary + + This takes the "data" list from the response and returns it""" + + return self.get(path, params).json()['data'] + def get_rest_pages(self, path: str, params: dict = {}) -> List: """ Helper function to collect paginated responses from the rest API into a single @@ -203,7 +217,6 @@ def get_rest_pages(self, path: str, params: dict = {}) -> List: # this is a raw primative but a higher level module might want something that does an # arbitrary path + origin=foo + limit=100 url construction instead before being sent here - limit = params["limit"] data = list() @@ -211,12 +224,16 @@ def get_rest_pages(self, path: str, params: dict = {}) -> List: page = self.get(path, params).json() data.extend(page["data"]) - + while "next" in page["links"].keys(): logger.debug( f"GET_REST_PAGES: Another link exists: {page['links']['next']}" ) + # Check for "/rest" at the end of the root url and beginning of next url + if page["links"]["next"][:5] == "/rest" and self.api_url[-5:] == "/rest": + page["links"]["next"] = page["links"]["next"][5:] + next_url = urllib.parse.urlsplit(page["links"]["next"]) query = urllib.parse.parse_qs(next_url.query) @@ -254,8 +271,11 @@ def notification_settings(self): # https://snyk.docs.apiary.io/#reference/groups/organisations-in-groups/create-a-new-organisation-in-the-group # https://snyk.docs.apiary.io/#reference/0/list-members-in-a-group/list-all-members-in-a-group # https://snyk.docs.apiary.io/#reference/0/members-in-an-organisation-of-a-group/add-a-member-to-an-organisation-from-another-organisation-in-the-group - def groups(self): - raise SnykNotImplementedError # pragma: no cover + @property + def groups(self) -> Manager: + return Manager.factory(OrganizationGroup, self) + #return Manager.factory(Organization, self) + #raise SnykNotImplementedError # pragma: no cover # https://snyk.docs.apiary.io/#reference/reporting-api/issues/get-list-of-issues def issues(self): diff --git a/snyk/managers.py b/snyk/managers.py index c0e0e7a..d857b89 100644 --- a/snyk/managers.py +++ b/snyk/managers.py @@ -1,11 +1,14 @@ import abc +import logging from typing import Any, Dict, List +from urllib.parse import urljoin from deprecation import deprecated # type: ignore -from .errors import SnykError, SnykNotFoundError, SnykNotImplementedError +from .errors import SnykError, SnykNotFoundError, SnykNotImplementedError, SnykHTTPError from .utils import snake_to_camel +logger = logging.getLogger(__name__) class Manager(abc.ABC): def __init__(self, klass, client, instance=None): @@ -62,6 +65,7 @@ def factory(klass, client, instance=None): "IntegrationSetting": IntegrationSettingManager, "Tag": TagManager, "IssuePaths": IssuePathsManager, + "OrganizationGroup": OrganizationGroupManager, }[key] return manager(klass, client, instance) except KeyError: @@ -106,15 +110,94 @@ def filter(self, **kwargs: Any): class OrganizationManager(Manager): def all(self): - resp = self.client.get("orgs") + params = {'limit': self.client.params['limit']} + resp = self.client.get_rest_pages("/orgs", params) + orgs = [] - if "orgs" in resp.json(): - for org_data in resp.json()["orgs"]: - orgs.append(self.klass.from_dict(org_data)) - for org in orgs: - org.client = self.client + if len(resp) > 0: + + groups = self.client.groups.all() + + for org in resp: + + try: + group = next(x for x in groups if x.id == org['attributes']['group_id']).to_dict() + except StopIteration: + group = None + + # Map org data to model variables + org_template = { + 'name': org['attributes']['name'], + 'id': org['id'], + 'slug': org['attributes']['slug'], + 'url': urljoin(self.client.web_url, '/org/{}'.format(org['attributes']['slug'])), + 'personal': org['attributes']['is_personal'], + 'group': group, + 'client': self.client + } + + orgs.append(self.klass.from_dict(org_template)) + return orgs + def get(self, id: str): + try: + org = self.client.get_rest_page("/orgs/{}".format(id)) + + org_template = { + 'name': org['attributes']['name'], + 'id': org['id'], + 'slug': org['attributes']['slug'], + 'url': urljoin(self.client.web_url, '/org/{}'.format(org['attributes']['slug'])), + 'personal': org['attributes']['is_personal'], + 'group': self.client.groups.get(org['attributes']['group_id']).to_dict() if 'group_id' in org['attributes'].keys() else None, + 'client': self.client + } + except SnykHTTPError as e: + logging.error(e.error) + raise e + except Exception as e: + logging.error(e) + raise e + + return self.klass.from_dict(org_template) + +class OrganizationGroupManager(Manager): + def all(self): + params = {'limit': self.client.params['limit']} + resp = self.client.get_rest_pages("/groups", params) + + groups = [] + if len(resp) > 0: + for group in resp: + groups.append(self.klass.from_dict({'name': group['attributes']['name'], 'id': group['id']})) + + return groups + + def first(self): + raise SnykNotImplementedError # pragma: no cover + + def get(self, id: str): + try: + resp = self.client.get_rest_page("/groups/{}".format(id)) + except SnykHTTPError as e: + if e.error[0]['detail'] == "Group Not Found": + logging.error("Group Not Found") + raise SnykNotFoundError from None + elif e.error[0]['detail'] == 'must match format "uuid"': + logging.error("ID must match format 'uuid'") + raise e + else: + raise e + except Exception as e: + raise e + + return self.klass(resp['attributes']['name'],resp['id']) + + + def filter(self, **kwargs: Any): + raise SnykNotImplementedError # pragma: no cover + class TagManager(Manager): def all(self): @@ -142,33 +225,38 @@ def delete(self, key, value) -> bool: # Since the implementation uses filtering by tags, use an older API version that has this available https://apidocs.snyk.io/?version=2022-07-08%7Ebeta#get-/orgs/-org_id-/projects # See annotations on the class snyk/models.py#L451-L452 for what data needs to be fetched from elsewhere or constructed class ProjectManager(Manager): - def _query(self, tags: List[Dict[str, str]] = []): + #def _query(self, tags: List[Dict[str, str]] = []): + def _query(self, params: dict = {}): projects = [] if self.instance: - path = "org/%s/projects" % self.instance.id - if tags: - for tag in tags: - if "key" not in tag or "value" not in tag or len(tag.keys()) != 2: - raise SnykError("Each tag must contain only a key and a value") - data = {"filters": {"tags": {"includes": tags}}} - resp = self.client.post(path, data) - else: - resp = self.client.get(path) - if "projects" in resp.json(): - for project_data in resp.json()["projects"]: - project_data["organization"] = self.instance.to_dict() - # We move tags to _tags as a cache, to avoid the need for additional requests - # when working with tags. We want tags to be the manager - try: - project_data["_tags"] = project_data["tags"] - del project_data["tags"] - except KeyError: - pass - if project_data["totalDependencies"] is None: - project_data["totalDependencies"] = 0 - projects.append(self.klass.from_dict(project_data)) - for x in projects: - x.organization = self.instance + if 'limit' not in params.keys(): + params['limit'] = self.client.params['limit'] + + path = "orgs/%s/projects" % self.instance.id + resp = self.client.get_rest_pages(path, params) + + for project in resp: + attributes = project['attributes'] + + project_data = { + 'name': attributes['name'], + 'id': project['id'], + 'created': attributes['created'], + 'origin': attributes['origin'], + 'type': attributes['type'], + 'readOnly': attributes['read_only'], + 'testFrequency': attributes['settings']['recurring_tests']['frequency'], + 'browseUrl': urljoin(self.instance.url,'/project/{}'.format(id)), + 'isMonitored': attributes['status'] if attributes['status'] == 'active' else False, + 'targetReference': attributes['target_reference'], + 'organization': self.instance.to_dict(), + '_tags': attributes['tags'] if 'tags' in attributes.keys() else [], + 'attributes': {'criticality':attributes['business_criticality'], 'environment':attributes['environment'], 'lifecycle':attributes['lifecycle']}, + } + + project_klass = self.klass.from_dict(project_data) + + projects.append(project_klass) else: for org in self.client.organizations.all(): projects.extend(org.projects.all()) @@ -185,21 +273,28 @@ def filter(self, tags: List[Dict[str, str]] = [], **kwargs: Any): def get(self, id: str): if self.instance: - path = "org/%s/project/%s" % (self.instance.id, id) - resp = self.client.get(path) - project_data = resp.json() - project_data["organization"] = self.instance.to_dict() - # We move tags to _tags as a cache, to avoid the need for additional requests - # when working with tags. We want tags to be the manager - try: - project_data["_tags"] = project_data["tags"] - del project_data["tags"] - except KeyError: - pass - if project_data["totalDependencies"] is None: - project_data["totalDependencies"] = 0 + path = "orgs/%s/projects/%s" % (self.instance.id, id) + resp = self.client.get_rest_page(path) + attributes = resp['attributes'] + + project_data = { + 'name': attributes['name'], + 'id': resp['id'], + 'created': attributes['created'], + 'origin': attributes['origin'], + 'type': attributes['type'], + 'readOnly': attributes['read_only'], + 'testFrequency': attributes['settings']['recurring_tests']['frequency'], + 'browseUrl': urljoin(self.instance.url,'/project/{}'.format(id)), + 'isMonitored': attributes['status'] if attributes['status'] == 'active' else False, + 'targetReference': attributes['target_reference'], + 'organization': self.instance.to_dict(), + '_tags': attributes['tags'] if 'tags' in attributes.keys() else [], + 'attributes': {'criticality':attributes['business_criticality'], 'environment':attributes['environment'], 'lifecycle':attributes['lifecycle']}, + } + project_klass = self.klass.from_dict(project_data) - project_klass.organization = self.instance + return project_klass else: return super().get(id) diff --git a/snyk/models.py b/snyk/models.py index 01ff1e4..4de9a7f 100644 --- a/snyk/models.py +++ b/snyk/models.py @@ -162,6 +162,7 @@ class Organization(DataClassJSONMixin): id: str slug: str url: str + personal: bool group: Optional[OrganizationGroup] = None client: Optional[Any] = None @@ -575,6 +576,7 @@ class Dependency(DataClassJSONMixin): @dataclass class Project(DataClassJSONMixin): + # Current set implemented with REST API name: str id: str created: str @@ -582,27 +584,34 @@ class Project(DataClassJSONMixin): type: str readOnly: bool # TODO: not yet available in REST testFrequency: str - totalDependencies: int - issueCountsBySeverity: IssueCounts # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots + browseUrl: str # TODO: constrict this yourself from scratch using the API host + project UUID + org name [Done in project.get method] + isMonitored: bool + targetReference: str + attributes: Attributes + organization: Organization + _tags: Optional[List[Any]] = field(default_factory=list) + + # TODO: delete remediation. This must be a mistake, there is no remediation in /projects v1 response + # remediation: Optional[Dict[Any, Any]] = field(default_factory=dict) # TODO: this is from individual project call only? + + # appended/amended fields + + # Setting User fields to optional because these both require Admin permissions to query this data + importingUser: Optional[User] = None # TODO. The ID comes in https://apidocs.snyk.io/experimental?version=2023-06-23%7Eexperimental#get-/orgs/-org_id-/projects/-project_id- + owner: Optional[User] = None # TODO: use the owner ID and call this API to get the rest of the data https://apidocs.snyk.io/?version=2023-05-29%7Ebeta#get-/orgs/-org_id-/users/-id- + + # Variables that can't be set using REST API yet. Leaving them as optional so they can be set manually in the future or implemented on their own. + totalDependencies: Optional[int] = field(default_factory=int)# TODO: don't see this in the REST API yet. + issueCountsBySeverity: Optional[IssueCounts] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots + lastTestedDate: Optional[str] = field(default_factory=str) # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots imageId: Optional[str] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots imageTag: Optional[str] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots imageBaseImage: Optional[str] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots imagePlatform: Optional[str] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots imageCluster: Optional[str] = None # TODO: can be fetched from https://apidocs.snyk.io/?version=2023-04-28%7Ebeta#get-/orgs/-org_id-/targets remoteRepoUrl: Optional[str] = None - lastTestedDate: str # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots - owner: User # TODO: use the owner ID and call this API to get the rest of the data https://apidocs.snyk.io/?version=2023-05-29%7Ebeta#get-/orgs/-org_id-/users/-id- - importingUser: User - browseUrl: str # TODO: constrict this yourself from scratch using the API host + project UUID + org name - isMonitored: bool branch: Optional[str] = None - # TODO: delete remediation. This must be a mistake, there is no remediation in /projects v1 response - # remediation: Optional[Dict[Any, Any]] = field(default_factory=dict) # TODO: thsi is from individual project call only? - targetReference: str - attributes: Attributes - # appended/amended fields - organization: Organization - _tags: Optional[List[Any]] = field(default_factory=list) + def delete(self) -> bool: path = "org/%s/project/%s" % (self.organization.id, self.id) diff --git a/snyk/utils.py b/snyk/utils.py index 28113ef..4ca4c5b 100644 --- a/snyk/utils.py +++ b/snyk/utils.py @@ -47,4 +47,4 @@ def load_test_data(test_dir: str, test_name: str) -> dict: test_file = f"{test_dir}/{test_name}.json" with open(test_file, "r") as the_file: data = the_file.read() - return json.loads(data) + return json.loads(data) \ No newline at end of file From a8a284a0c81662bb836bf8924cdcf31fc9c415cc Mon Sep 17 00:00:00 2001 From: Vincent Faires Date: Mon, 24 Jul 2023 16:11:27 -0700 Subject: [PATCH 2/7] added support for adding new tags to projects through REST API --- snyk/client.py | 45 ++++++++++++++-- snyk/managers.py | 135 ++++++++++++++++++++++++++++++++--------------- snyk/models.py | 4 +- 3 files changed, 136 insertions(+), 48 deletions(-) diff --git a/snyk/client.py b/snyk/client.py index 6d0e9f8..7a189cf 100644 --- a/snyk/client.py +++ b/snyk/client.py @@ -3,12 +3,13 @@ from typing import Any, List, Optional import requests +from requests.compat import urljoin from retry.api import retry_call from .__version__ import __version__ from .errors import SnykHTTPError, SnykNotImplementedError from .managers import Manager -from .models import Organization, Project, OrganizationGroup +from .models import Organization, Project, OrganizationGroup, User from .utils import cleanup_path logger = logging.getLogger(__name__) @@ -71,6 +72,7 @@ def request( resp = method( url, headers=headers, params=params, json=json, verify=self.verify ) + elif params and not json: resp = method(url, headers=headers, params=params, verify=self.verify) elif json and not params: @@ -104,6 +106,39 @@ def post(self, path: str, body: Any, headers: dict = {}) -> requests.Response: return resp + def patch(self, path: str, body: Any, headers: dict = {}, params: dict = None) -> requests.Response: + path = cleanup_path(path) + + url = f"{self.api_url}/{path}" + + logger.debug(f"PATCH: {url}") + + if params or self.version: + + if not params: + params = {} + + # we use the presence of version to determine if we are REST or not + if "version" not in params.keys() and self.version: + params["version"] = self.version + + resp = retry_call( + self.request, + fargs=[requests.patch, url], + fkwargs={"json": body, "headers": {**self.api_post_headers, **headers}, "params": params}, + tries=self.tries, + delay=self.delay, + backoff=self.backoff, + exceptions=SnykHTTPError, + logger=logger, + ) + + if not resp.ok: + logger.error(resp.text) + raise SnykHTTPError(resp) + + return resp + def put(self, path: str, body: Any, headers: dict = {}) -> requests.Response: url = "%s/%s" % (self.api_url, path) logger.debug("PUT: %s" % url) @@ -274,9 +309,13 @@ def notification_settings(self): @property def groups(self) -> Manager: return Manager.factory(OrganizationGroup, self) - #return Manager.factory(Organization, self) - #raise SnykNotImplementedError # pragma: no cover # https://snyk.docs.apiary.io/#reference/reporting-api/issues/get-list-of-issues def issues(self): raise SnykNotImplementedError # pragma: no cover + + # At the client level this should only be able to return the results for /self + # https://apidocs.snyk.io/experimental?version=2023-06-23%7Eexperimental#get-/self + @property + def users(self) -> Manager: + return Manager.factory(User, self) diff --git a/snyk/managers.py b/snyk/managers.py index d857b89..8c78cc5 100644 --- a/snyk/managers.py +++ b/snyk/managers.py @@ -1,7 +1,8 @@ import abc import logging +import json from typing import Any, Dict, List -from urllib.parse import urljoin +from requests.compat import urljoin from deprecation import deprecated # type: ignore @@ -49,24 +50,26 @@ def factory(klass, client, instance=None): else: key = klass.__name__ manager = { - "Project": ProjectManager, - "Organization": OrganizationManager, - "Member": MemberManager, - "License": LicenseManager, - "Dependency": DependencyManager, - "Entitlement": EntitlementManager, - "Setting": SettingManager, - "Ignore": IgnoreManager, - "JiraIssue": JiraIssueManager, - "DependencyGraph": DependencyGraphManager, - "IssueSet": IssueSetManager, + "Project": ProjectManager, + "Organization": OrganizationManager, + "Member": MemberManager, + "License": LicenseManager, + "Dependency": DependencyManager, + "Entitlement": EntitlementManager, + "Setting": SettingManager, + "Ignore": IgnoreManager, + "JiraIssue": JiraIssueManager, + "DependencyGraph": DependencyGraphManager, + "IssueSet": IssueSetManager, "IssueSetAggregated": IssueSetAggregatedManager, - "Integration": IntegrationManager, + "Integration": IntegrationManager, "IntegrationSetting": IntegrationSettingManager, - "Tag": TagManager, - "IssuePaths": IssuePathsManager, - "OrganizationGroup": OrganizationGroupManager, + "Tag": TagManager, + "IssuePaths": IssuePathsManager, + "OrganizationGroup": OrganizationGroupManager, + "User": UserManager, }[key] + return manager(klass, client, instance) except KeyError: raise SnykError @@ -204,12 +207,31 @@ def all(self): return self.instance._tags def add(self, key, value) -> bool: - tag = {"key": key, "value": value} - path = "org/%s/project/%s/tags" % ( + + path = "orgs/%s/projects/%s" % ( self.instance.organization.id, self.instance.id, ) - return bool(self.client.post(path, tag)) + + # Retain previous tags + tags = self.instance._tags + tags.append({'key':key, 'value':value}) + + # Build the request body + body = { + "data": { + "attributes":{ + "tags":tags + }, + "relationships":{}, + "id":self.instance.id, + "type": "project" + } + } + + params = {'user_id': self.instance.organization.client.users.self.id} + headers = {'content-type': 'application/vnd.api+json'} + return bool(self.client.patch(path=path, body=body, params=params, headers=headers)) def delete(self, key, value) -> bool: tag = {"key": key, "value": value} @@ -219,7 +241,30 @@ def delete(self, key, value) -> bool: ) return bool(self.client.post(path, tag)) +class UserManager(Manager): + + def all(self) -> Any: + pass # pragma: no cover + + def first(self): + raise SnykNotImplementedError # pragma: no cover + + def get(self, id: str): + raise SnykNotImplementedError # pragma: no cover + def filter(self, **kwargs: Any): + raise SnykNotImplementedError # pragma: no cover + + @property + def self(self): + user = self.client.get_rest_page("/self") + user_data = {'id': user['id']} + fields = ['name','username','email'] + for field in fields: + if field in user['attributes']: + user_data[field] = user['attributes'][field] + return self.klass.from_dict(user_data) + # TODO: change implementation here to call REST Projects and other V1 APIs to fill in the gaps as per # migration guide https://docs.google.com/document/d/1e-CnYRYxZXBRCRFW8YZ8tfKkv5zLSg2tEHPiLrvO8Oc # Since the implementation uses filtering by tags, use an older API version that has this available https://apidocs.snyk.io/?version=2022-07-08%7Ebeta#get-/orgs/-org_id-/projects @@ -239,19 +284,21 @@ def _query(self, params: dict = {}): attributes = project['attributes'] project_data = { - 'name': attributes['name'], - 'id': project['id'], - 'created': attributes['created'], - 'origin': attributes['origin'], - 'type': attributes['type'], - 'readOnly': attributes['read_only'], - 'testFrequency': attributes['settings']['recurring_tests']['frequency'], - 'browseUrl': urljoin(self.instance.url,'/project/{}'.format(id)), - 'isMonitored': attributes['status'] if attributes['status'] == 'active' else False, + 'name': attributes['name'], + 'id': project['id'], + 'created': attributes['created'], + 'origin': attributes['origin'], + 'type': attributes['type'], + 'readOnly': attributes['read_only'], + 'testFrequency': attributes['settings']['recurring_tests']['frequency'], + 'browseUrl': urljoin(self.instance.url,'/project/{}'.format(id)), + 'isMonitored': attributes['status'] if attributes['status'] == 'active' else False, 'targetReference': attributes['target_reference'], - 'organization': self.instance.to_dict(), - '_tags': attributes['tags'] if 'tags' in attributes.keys() else [], - 'attributes': {'criticality':attributes['business_criticality'], 'environment':attributes['environment'], 'lifecycle':attributes['lifecycle']}, + 'organization': self.instance.to_dict(), + '_tags': attributes['tags'] if 'tags' in attributes.keys() else [], + 'attributes': {'criticality': attributes['business_criticality'], + 'environment': attributes['environment'], + 'lifecycle': attributes['lifecycle']}, } project_klass = self.klass.from_dict(project_data) @@ -278,19 +325,21 @@ def get(self, id: str): attributes = resp['attributes'] project_data = { - 'name': attributes['name'], - 'id': resp['id'], - 'created': attributes['created'], - 'origin': attributes['origin'], - 'type': attributes['type'], - 'readOnly': attributes['read_only'], - 'testFrequency': attributes['settings']['recurring_tests']['frequency'], - 'browseUrl': urljoin(self.instance.url,'/project/{}'.format(id)), - 'isMonitored': attributes['status'] if attributes['status'] == 'active' else False, + 'name': attributes['name'], + 'id': resp['id'], + 'created': attributes['created'], + 'origin': attributes['origin'], + 'type': attributes['type'], + 'readOnly': attributes['read_only'], + 'testFrequency': attributes['settings']['recurring_tests']['frequency'], + 'browseUrl': urljoin(self.instance.url,'/project/{}'.format(id)), + 'isMonitored': attributes['status'] if attributes['status'] == 'active' else False, 'targetReference': attributes['target_reference'], - 'organization': self.instance.to_dict(), - '_tags': attributes['tags'] if 'tags' in attributes.keys() else [], - 'attributes': {'criticality':attributes['business_criticality'], 'environment':attributes['environment'], 'lifecycle':attributes['lifecycle']}, + 'organization': self.instance.to_dict(), + '_tags': attributes['tags'] if 'tags' in attributes.keys() else [], + 'attributes': {'criticality': attributes['business_criticality'], + 'environment': attributes['environment'], + 'lifecycle': attributes['lifecycle']}, } project_klass = self.klass.from_dict(project_data) diff --git a/snyk/models.py b/snyk/models.py index 4de9a7f..4785bb1 100644 --- a/snyk/models.py +++ b/snyk/models.py @@ -503,8 +503,8 @@ class IssueCounts(DataClassJSONMixin): class User(DataClassJSONMixin): id: str name: str - username: str - email: str + username: Optional[str] = field(default_factory=str) + email: Optional[str] = field(default_factory=str) @dataclass class DependencyGraphPackageInfo(DataClassJSONMixin): From 77c283fa854876bb16421a63329f182102578e8b Mon Sep 17 00:00:00 2001 From: Vincent Faires Date: Tue, 25 Jul 2023 01:11:46 -0700 Subject: [PATCH 3/7] implemented tag delete method to work with REST API --- snyk/managers.py | 42 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/snyk/managers.py b/snyk/managers.py index 8c78cc5..ffe52b0 100644 --- a/snyk/managers.py +++ b/snyk/managers.py @@ -214,8 +214,9 @@ def add(self, key, value) -> bool: ) # Retain previous tags + tag = {"key": key, "value": value} tags = self.instance._tags - tags.append({'key':key, 'value':value}) + tags.append(tag) # Build the request body body = { @@ -231,15 +232,46 @@ def add(self, key, value) -> bool: params = {'user_id': self.instance.organization.client.users.self.id} headers = {'content-type': 'application/vnd.api+json'} - return bool(self.client.patch(path=path, body=body, params=params, headers=headers)) + + resp = self.client.patch(path=path, body=body, params=params, headers=headers).json() + + # Check to make sure the new tag was created + if tag in resp['data']['attributes']['tags']: + return True + + return False def delete(self, key, value) -> bool: - tag = {"key": key, "value": value} - path = "org/%s/project/%s/tags/remove" % ( + path = "orgs/%s/projects/%s" % ( self.instance.organization.id, self.instance.id, ) - return bool(self.client.post(path, tag)) + + tag = {"key": key, "value": value} + tags = [ x for x in self.instance._tags if x != tag ] + + # Build the request body + body = { + "data": { + "attributes":{ + "tags":tags + }, + "relationships":{}, + "id":self.instance.id, + "type": "project" + } + } + + params = {'user_id': self.instance.organization.client.users.self.id} + headers = {'content-type': 'application/vnd.api+json'} + + resp = self.client.patch(path=path, body=body, params=params, headers=headers).json() + + # Check to make sure the tag was deleted + if tag in resp['data']['attributes']['tags']: + return False + + return True class UserManager(Manager): From 39ca873e75a1fdac2cfc04bd8251211a21c610a4 Mon Sep 17 00:00:00 2001 From: Vincent Faires Date: Tue, 25 Jul 2023 17:09:05 -0700 Subject: [PATCH 4/7] updated project manager to handle all function without org instance --- snyk/client.py | 16 ++++++--- snyk/managers.py | 93 +++++++++++++++++++++++------------------------- 2 files changed, 55 insertions(+), 54 deletions(-) diff --git a/snyk/client.py b/snyk/client.py index 7a189cf..f41f68f 100644 --- a/snyk/client.py +++ b/snyk/client.py @@ -5,6 +5,7 @@ import requests from requests.compat import urljoin from retry.api import retry_call +from copy import deepcopy from .__version__ import __version__ from .errors import SnykHTTPError, SnykNotImplementedError @@ -34,7 +35,7 @@ def __init__( backoff: int = 2, verify: bool = True, version: Optional[str] = None, - params: dict = {'limit':100}, + limit: int = 100, ): self.api_token = token self.api_url = url or self.API_URL @@ -50,7 +51,7 @@ def __init__( self.verify = verify self.version = version or self.VERSION self.web_url = web_url or self.WEB_URL - self.params = params + self.limit = limit # Ensure we don't have a trailing / if self.api_url[-1] == "/": @@ -252,7 +253,14 @@ def get_rest_pages(self, path: str, params: dict = {}) -> List: # this is a raw primative but a higher level module might want something that does an # arbitrary path + origin=foo + limit=100 url construction instead before being sent here - limit = params["limit"] + + # Making sure references to param can't be passed between methods + params = deepcopy(params) + + if 'limit' in params.keys(): + limit = params["limit"] + else: + limit = self.client.limit data = list() @@ -275,8 +283,6 @@ def get_rest_pages(self, path: str, params: dict = {}) -> List: for k, v in query.items(): params[k] = v - params["limit"] = limit - page = self.get(next_url.path, params).json() data.extend(page["data"]) diff --git a/snyk/managers.py b/snyk/managers.py index ffe52b0..7715b0c 100644 --- a/snyk/managers.py +++ b/snyk/managers.py @@ -3,6 +3,7 @@ import json from typing import Any, Dict, List from requests.compat import urljoin +from copy import deepcopy from deprecation import deprecated # type: ignore @@ -113,7 +114,7 @@ def filter(self, **kwargs: Any): class OrganizationManager(Manager): def all(self): - params = {'limit': self.client.params['limit']} + params = {'limit': self.client.limit} resp = self.client.get_rest_pages("/orgs", params) orgs = [] @@ -167,7 +168,7 @@ def get(self, id: str): class OrganizationGroupManager(Manager): def all(self): - params = {'limit': self.client.params['limit']} + params = {'limit': self.client.limit} resp = self.client.get_rest_pages("/groups", params) groups = [] @@ -274,7 +275,7 @@ def delete(self, key, value) -> bool: return True class UserManager(Manager): - + def all(self) -> Any: pass # pragma: no cover @@ -302,43 +303,56 @@ def self(self): # Since the implementation uses filtering by tags, use an older API version that has this available https://apidocs.snyk.io/?version=2022-07-08%7Ebeta#get-/orgs/-org_id-/projects # See annotations on the class snyk/models.py#L451-L452 for what data needs to be fetched from elsewhere or constructed class ProjectManager(Manager): - #def _query(self, tags: List[Dict[str, str]] = []): + def _map_rest_data_to_project_model(self, data: dict = {}): + """Takes the data field from a rest API query for the /orgs/{org_id}/projects/{project_id} query and maps it to the Project model + + :param data: dictionary data field from a rest API call to /orgs/{org_id}/projects/{project_id} + + :return: Project model""" + + attr = data['attributes'] + + project_data = { + 'name': attr['name'], + 'id': data['id'], + 'created': attr['created'], + 'origin': attr['origin'], + 'type': attr['type'], + 'readOnly': attr['read_only'], + 'testFrequency': attr['settings']['recurring_tests']['frequency'], + 'browseUrl': urljoin(self.instance.url,'/project/{}'.format(id)), + 'isMonitored': attr['status'] if attr['status'] == 'active' else False, + 'targetReference': attr['target_reference'], + 'organization': self.instance.to_dict(), + '_tags': attr['tags'] if 'tags' in attr.keys() else [], + 'attributes': {'criticality': attr['business_criticality'], + 'environment': attr['environment'], + 'lifecycle': attr['lifecycle']}, + } + + return self.klass.from_dict(project_data) + def _query(self, params: dict = {}): + # Making sure references to param can't be passed between methods + params = deepcopy(params) + projects = [] if self.instance: if 'limit' not in params.keys(): - params['limit'] = self.client.params['limit'] + params['limit'] = self.client.limit path = "orgs/%s/projects" % self.instance.id + resp = self.client.get_rest_pages(path, params) - + for project in resp: - attributes = project['attributes'] - - project_data = { - 'name': attributes['name'], - 'id': project['id'], - 'created': attributes['created'], - 'origin': attributes['origin'], - 'type': attributes['type'], - 'readOnly': attributes['read_only'], - 'testFrequency': attributes['settings']['recurring_tests']['frequency'], - 'browseUrl': urljoin(self.instance.url,'/project/{}'.format(id)), - 'isMonitored': attributes['status'] if attributes['status'] == 'active' else False, - 'targetReference': attributes['target_reference'], - 'organization': self.instance.to_dict(), - '_tags': attributes['tags'] if 'tags' in attributes.keys() else [], - 'attributes': {'criticality': attributes['business_criticality'], - 'environment': attributes['environment'], - 'lifecycle': attributes['lifecycle']}, - } - - project_klass = self.klass.from_dict(project_data) + model = self._map_rest_data_to_project_model(project) + projects.append(model) - projects.append(project_klass) else: for org in self.client.organizations.all(): projects.extend(org.projects.all()) + return projects def all(self): @@ -354,29 +368,10 @@ def get(self, id: str): if self.instance: path = "orgs/%s/projects/%s" % (self.instance.id, id) resp = self.client.get_rest_page(path) - attributes = resp['attributes'] - - project_data = { - 'name': attributes['name'], - 'id': resp['id'], - 'created': attributes['created'], - 'origin': attributes['origin'], - 'type': attributes['type'], - 'readOnly': attributes['read_only'], - 'testFrequency': attributes['settings']['recurring_tests']['frequency'], - 'browseUrl': urljoin(self.instance.url,'/project/{}'.format(id)), - 'isMonitored': attributes['status'] if attributes['status'] == 'active' else False, - 'targetReference': attributes['target_reference'], - 'organization': self.instance.to_dict(), - '_tags': attributes['tags'] if 'tags' in attributes.keys() else [], - 'attributes': {'criticality': attributes['business_criticality'], - 'environment': attributes['environment'], - 'lifecycle': attributes['lifecycle']}, - } - project_klass = self.klass.from_dict(project_data) + model = self._map_rest_data_to_project_model(resp) - return project_klass + return model else: return super().get(id) From 4b4d3ca72e9b4fa48816aaf6cff6ba3419ad45ce Mon Sep 17 00:00:00 2001 From: Vincent Faires Date: Mon, 31 Jul 2023 18:42:54 -0700 Subject: [PATCH 5/7] updated project manager to work with 07-28-2023~experimental release --- snyk/managers.py | 168 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 135 insertions(+), 33 deletions(-) diff --git a/snyk/managers.py b/snyk/managers.py index 7715b0c..c0db817 100644 --- a/snyk/managers.py +++ b/snyk/managers.py @@ -312,6 +312,7 @@ def _map_rest_data_to_project_model(self, data: dict = {}): attr = data['attributes'] + # Mandetory flags project_data = { 'name': attr['name'], 'id': data['id'], @@ -324,57 +325,153 @@ def _map_rest_data_to_project_model(self, data: dict = {}): 'isMonitored': attr['status'] if attr['status'] == 'active' else False, 'targetReference': attr['target_reference'], 'organization': self.instance.to_dict(), - '_tags': attr['tags'] if 'tags' in attr.keys() else [], 'attributes': {'criticality': attr['business_criticality'], 'environment': attr['environment'], 'lifecycle': attr['lifecycle']}, } + # '_tags': attr['tags'] if 'tags' in attr.keys() else [], + + # Optional flags + for key in data.keys(): + match key: + case 'attributes': + for attribute in data['attributes']: + match attribute: + case 'tags': + project_data['_tags'] = attr['tags'] + + case 'meta': + if 'latest_dependency_total' in data['meta'].keys(): + total = data['meta']['latest_dependency_total']['total'] + if total: + project_data['totalDependencies'] = total + else: + project_data['totalDependencies'] = 0 + if 'latest_issue_counts' in data['meta'].keys(): + project_data['issueCountsBySeverity'] = { + 'critical': int(data['meta']['latest_issue_counts']['critical']), + 'high': int(data['meta']['latest_issue_counts']['high']), + 'medium': int(data['meta']['latest_issue_counts']['medium']), + 'low': int(data['meta']['latest_issue_counts']['low']), + } return self.klass.from_dict(project_data) - def _query(self, params: dict = {}): - # Making sure references to param can't be passed between methods - params = deepcopy(params) + def filter(self, **kwargs: Any): + """This functions allows you to filter using all of the filters available on https://apidocs.snyk.io/experimental?version=2023-06-23%7Eexperimental#tag--Projects - projects = [] - if self.instance: - if 'limit' not in params.keys(): - params['limit'] = self.client.limit - - path = "orgs/%s/projects" % self.instance.id - - resp = self.client.get_rest_pages(path, params) - - for project in resp: - model = self._map_rest_data_to_project_model(project) - projects.append(model) + The list of parameters below are a list of of available filters from version=2023-06-23~experimental as of 7/26/2023 + + :param target_id: List of strings (target IDs) + Return projects that belong to the provided targets + :param meta_count: string - Allowed: "only" + Only return the collection count + :param ids: List of strings (Project IDs) + Return projects that match the provided IDs + :param names: List of strings (Project names) + Return projects that match the provided names + :param origins: List of strings (origins) + Return projects that match the provided origins + :param types: List of strings (project types) + Return projects that match the provided types + :param expand: string - Allowed: "target" + Expand relationships + :param latest_issue_counts: bool + Include a summary count for the issues found in the most recent scan of this project + :param latest_dependency_total: bool + Include the total number of dependencies found in the most recent scan of this project + :param cli_monitored_before: date-time - Example: 2021-05-29T09:50:54.014Z + Filter projects uploaded and monitored before this date (encoded value) + :param cli_monitored_after: date-time - Example: 2021-05-29T09:50:54.014Z + Filter projects uploaded and monitored after this date (encoded value) + :param importing_user_public_id: List of strings + Return projects that match the provided importing user public ids. + :param tags: List of strings (tags) - List of dict() - Example: [{'key':'test_key', 'value':'test_value'}] + Return projects that match all the provided tags + :param business_criticality: List of strings - Allowed: critical ┃ high ┃ medium ┃ low + Return projects that match all the provided business_criticality value + :param environment: List of strings - Allowed: frontend ┃ backend ┃ internal ┃ external ┃ mobile ┃ saas ┃ onprem ┃ hosted ┃ distributed + Return projects that match all the provided environment values + :param lifecycle: List of strings - Allowed: production ┃ development ┃ sandbox + Return projects that match all the provided lifecycle values + :param version: string - The requested version of the endpoint to process the request + :param starting_after: string - Examples: v1.eyJpZCI6IjEwMDAifQo= + Return the page of results immediately after this cursor + :param ending_before: string - Examples: v1.eyJpZCI6IjExMDAifQo= + Return the page of results immediately before this cursor + :param limit: int - Default: 10 (Min: 10, Max: 100, only multiples of 10 allowed) + Number of results to return per page + """ - else: - for org in self.client.organizations.all(): - projects.extend(org.projects.all()) + filters = { + 'meta.latest_issue_counts': True, + 'meta.latest_dependency_total': True, + } - return projects + filters_list = [ + "target_id", + "meta_count", + "ids", + "names", + "origins", + "types", + "expand: string - Allowed", + "latest_issue_counts", + "latest_dependency_total", + "cli_monitored_before", + "cli_monitored_after", + "importing_user_public_id", + "tags", + "business_criticality", + "environment", + "lifecycle", + "version", + "starting_after", + "ending_before", + "limit", + ] + + filters_list.extend(list(filters.keys())) + + # Set new filters + for filter_name in filters_list: + if kwargs.get(filter_name): + if filter_name in ["latest_issue_counts","latest_dependency_total"] : + filters[f"meta.{filter_name}"] = kwargs[filter_name] + else: + filters[filter_name] = kwargs[filter_name] - def all(self): - return self._query() + #TODO: Add validation for every parameter to make sure + # They're each formatted correctly. - def filter(self, tags: List[Dict[str, str]] = [], **kwargs: Any): - if tags: - return self._filter_by_kwargs(self._query(tags), **kwargs) - else: - return super().filter(**kwargs) + + if 'limit' not in filters.keys(): + filters['limit'] = self.client.limit + + path = "orgs/%s/projects" % self.instance.id + + resp = self.client.get_rest_pages(path, filters) + + return resp def get(self, id: str): if self.instance: - path = "orgs/%s/projects/%s" % (self.instance.id, id) - resp = self.client.get_rest_page(path) - - model = self._map_rest_data_to_project_model(resp) - - return model + resp = self.filter(ids=[id]) + return self._map_rest_data_to_project_model(resp[0]) else: return super().get(id) + def all(self): + projects = [] + if self.instance: + resp = self.filter() + for project in resp: + model = self._map_rest_data_to_project_model(project) + projects.append(model) + else: + for org in self.client.organizations.all(): + projects.extend(org.projects.all()) + return projects class MemberManager(Manager): def all(self): @@ -588,6 +685,11 @@ def filter(self, **kwargs: Any): resp = self.client.post(path, post_body) return self.klass.from_dict(self._convert_reserved_words(resp.json())) + def all(self) -> Any: + self._query() + + + class IssueSetAggregatedManager(SingletonManager): def all(self) -> Any: From 0e113fcffecd6ffa8d09c4f5bd3150ae231796cc Mon Sep 17 00:00:00 2001 From: Vincent Faires Date: Tue, 1 Aug 2023 19:02:50 -0700 Subject: [PATCH 6/7] updated issueManager to return dictionary of new REST API endpoint. Still need to map content to an Issue object --- snyk/client.py | 6 +- snyk/managers.py | 183 +++++++++++++++++++++++++++++++++++++++-------- snyk/models.py | 15 +++- 3 files changed, 170 insertions(+), 34 deletions(-) diff --git a/snyk/client.py b/snyk/client.py index f41f68f..1098548 100644 --- a/snyk/client.py +++ b/snyk/client.py @@ -19,7 +19,7 @@ class SnykClient(object): WEB_URL = "https://app.snyk.io" API_URL = "https://api.snyk.io/rest" - VERSION = "2023-06-23~experimental" + VERSION = "2023-07-28~experimental" #API_URL = "https://api.snyk.io/v1" USER_AGENT = "pysnyk/%s" % __version__ @@ -256,7 +256,7 @@ def get_rest_pages(self, path: str, params: dict = {}) -> List: # Making sure references to param can't be passed between methods params = deepcopy(params) - + if 'limit' in params.keys(): limit = params["limit"] else: @@ -268,7 +268,7 @@ def get_rest_pages(self, path: str, params: dict = {}) -> List: data.extend(page["data"]) - while "next" in page["links"].keys(): + while "next" in page["links"].keys() and len(page["data"]) >= limit: logger.debug( f"GET_REST_PAGES: Another link exists: {page['links']['next']}" ) diff --git a/snyk/managers.py b/snyk/managers.py index c0db817..17ba9b1 100644 --- a/snyk/managers.py +++ b/snyk/managers.py @@ -62,7 +62,7 @@ def factory(klass, client, instance=None): "JiraIssue": JiraIssueManager, "DependencyGraph": DependencyGraphManager, "IssueSet": IssueSetManager, - "IssueSetAggregated": IssueSetAggregatedManager, + "IssueSetAggregated": IssueSetAggregatedManager2, "Integration": IntegrationManager, "IntegrationSetting": IntegrationSettingManager, "Tag": TagManager, @@ -321,7 +321,7 @@ def _map_rest_data_to_project_model(self, data: dict = {}): 'type': attr['type'], 'readOnly': attr['read_only'], 'testFrequency': attr['settings']['recurring_tests']['frequency'], - 'browseUrl': urljoin(self.instance.url,'/project/{}'.format(id)), + 'browseUrl': f"{self.instance.url}/project/{data['id']}", 'isMonitored': attr['status'] if attr['status'] == 'active' else False, 'targetReference': attr['target_reference'], 'organization': self.instance.to_dict(), @@ -333,34 +333,33 @@ def _map_rest_data_to_project_model(self, data: dict = {}): # Optional flags for key in data.keys(): - match key: - case 'attributes': - for attribute in data['attributes']: - match attribute: - case 'tags': - project_data['_tags'] = attr['tags'] - - case 'meta': - if 'latest_dependency_total' in data['meta'].keys(): - total = data['meta']['latest_dependency_total']['total'] - if total: - project_data['totalDependencies'] = total - else: - project_data['totalDependencies'] = 0 - if 'latest_issue_counts' in data['meta'].keys(): - project_data['issueCountsBySeverity'] = { - 'critical': int(data['meta']['latest_issue_counts']['critical']), - 'high': int(data['meta']['latest_issue_counts']['high']), - 'medium': int(data['meta']['latest_issue_counts']['medium']), - 'low': int(data['meta']['latest_issue_counts']['low']), - } + if 'attributes' == key: + for attribute in data['attributes']: + if 'tags' == attribute: + project_data['_tags'] = attr['tags'] + + if 'meta' == key: + if 'latest_dependency_total' in data['meta'].keys(): + total = data['meta']['latest_dependency_total']['total'] + if total: + project_data['totalDependencies'] = total + else: + project_data['totalDependencies'] = 0 + if 'latest_issue_counts' in data['meta'].keys(): + project_data['issueCountsBySeverity'] = { + 'critical': int(data['meta']['latest_issue_counts']['critical']), + 'high': int(data['meta']['latest_issue_counts']['high']), + 'medium': int(data['meta']['latest_issue_counts']['medium']), + 'low': int(data['meta']['latest_issue_counts']['low']), + } return self.klass.from_dict(project_data) def filter(self, **kwargs: Any): - """This functions allows you to filter using all of the filters available on https://apidocs.snyk.io/experimental?version=2023-06-23%7Eexperimental#tag--Projects + """This functions allows you to filter using all of the filters available on https://apidocs.snyk.io/experimental?version=2023-07-28%7Eexperimental#tag--Projects + Then returns the project dictionaries in a list - The list of parameters below are a list of of available filters from version=2023-06-23~experimental as of 7/26/2023 + The list of parameters below are a list of of available filters from version=2023-07-28~experimental as of 7/31/2023 :param target_id: List of strings (target IDs) Return projects that belong to the provided targets @@ -401,6 +400,8 @@ def filter(self, **kwargs: Any): Return the page of results immediately before this cursor :param limit: int - Default: 10 (Min: 10, Max: 100, only multiples of 10 allowed) Number of results to return per page + + :return: returns a list of dictionaries. One dictionary for each project included in the query. """ filters = { @@ -432,10 +433,11 @@ def filter(self, **kwargs: Any): ] filters_list.extend(list(filters.keys())) - + # Set new filters - for filter_name in filters_list: - if kwargs.get(filter_name): + for filter_name in kwargs.keys(): + if filter_name in filters_list: + # map variable name to api parameter name if filter_name in ["latest_issue_counts","latest_dependency_total"] : filters[f"meta.{filter_name}"] = kwargs[filter_name] else: @@ -444,7 +446,6 @@ def filter(self, **kwargs: Any): #TODO: Add validation for every parameter to make sure # They're each formatted correctly. - if 'limit' not in filters.keys(): filters['limit'] = self.client.limit @@ -454,7 +455,20 @@ def filter(self, **kwargs: Any): return resp + def query(self, **kwargs: Any): + """Utility to query data from the api and return a list of project data models""" + if self.instance: + projects = [] + resp = self.filter(**kwargs) + for project in resp: + projects.append(self._map_rest_data_to_project_model(project)) + return projects + else: + return super().get(**kwargs) + def get(self, id: str): + """legacy get method for backward compatibility""" + if self.instance: resp = self.filter(ids=[id]) return self._map_rest_data_to_project_model(resp[0]) @@ -689,7 +703,118 @@ def all(self) -> Any: self._query() +class IssueSetAggregatedManager2(SingletonManager): + """ + TODO: Update the Issue model to match the latest changes to API + def _map_rest_data_to_issue_model(self, data: dict = {}): + ''' + Lets map what we can, then set the rest to optional + Things to map: + + # Mapped + id: str + issueType: str + isIgnored: bool + + # Not Mapped + pkgName: str + pkgVersions: List[str] + isPatched: bool + fixInfo: FixInfo + introducedThrough: Optional[List[Any]] = None + ignoreReasons: Optional[List[Any]] = None + # Not mentioned in schema but returned + # https://snyk.docs.apiary.io/#reference/projects/aggregated-project-issues/list-all-aggregated-issues + priorityScore: Optional[int] = None + priority: Optional[Any] = None + + issueData: IssueData + # issueData fields to map + ## Mapped + id: str + title: str + severity: str + url: str + + ## Unmapped + exploitMaturity: str + description: Optional[str] = None + identifiers: Optional[Any] = None + credit: Optional[List[str]] = None + semver: Optional[Any] = None + publicationTime: Optional[str] = None + disclosureTime: Optional[str] = None + CVSSv3: Optional[str] = None + cvssScore: Optional[str] = None + language: Optional[str] = None + patches: Optional[Any] = None + nearestFixedInVersion: Optional[str] = None + ignoreReasons: Optional[List[Any]] = None + + ''' + + # Map the required items + attr = data['attributes'] + issue_data = { + 'id': attr['key'], # This appears to be what Snyk is using for the id and not the actual ID for "package_vulnerability" type issues + 'issueType': attr['type'], + 'title': attr['title'], + 'description': "", # Need to get details from somewhere else + 'issueData': { + 'id': attr['key'] + 'title': attr['title'] + 'severity': attr['effective_severity_level'] + exploitMaturity: str + description: Optional[str] = None + identifiers: Optional[Any] = None + credit: Optional[List[str]] = None + semver: Optional[Any] = None + publicationTime: Optional[str] = None + disclosureTime: Optional[str] = None + CVSSv3: Optional[str] = None + cvssScore: Optional[str] = None + language: Optional[str] = None + patches: Optional[Any] = None + nearestFixedInVersion: Optional[str] = None + ignoreReasons: Optional[List[Any]] = None + }, + 'isIgnored': attr['ignored'], + + } + + # Calling object is a Project + if hasattr(self.instance, 'organization'): + # Generate URL + issue_data['issueData']['url'] = f"{self.instance.browseUrl}#issue-{attr['key']}" + + # Calling object is an Organization + else: + issue_data['issueData']['url'] = f"{self.instance.url}/project/{data['relationships']['scan_item']['id']}#issue-{attr['key']}" + """ + def filter(self, **kwargs: Any): + + filters = {} + if 'limit' not in filters.keys(): + filters['limit'] = self.client.limit + + # Calling object is a Project + if hasattr(self.instance, 'organization'): + filters['scan_item.id'] = self.instance.id + filters['scan_item.type'] = 'project' + + path = f"orgs/{self.instance.organization.id}/issues" + + # Calling object is an Organization + else: + path = f"orgs/{self.instance.id}/issues" + + resp = self.client.get_rest_pages(path, filters) + + return resp + def all(self): + return self.filter() + class IssueSetAggregatedManager(SingletonManager): def all(self) -> Any: diff --git a/snyk/models.py b/snyk/models.py index 4785bb1..9c07c85 100644 --- a/snyk/models.py +++ b/snyk/models.py @@ -190,6 +190,10 @@ def entitlements(self) -> Manager: def integrations(self) -> Manager: return Manager.factory(Integration, self.client, self) + @property + def issues(self) -> Manager: + return Manager.factory(IssueSetAggregated, self.client, self) + """ Imports need integrations, but exposing a high-level API that can find the integration from the URL of the thing you want @@ -589,7 +593,10 @@ class Project(DataClassJSONMixin): targetReference: str attributes: Attributes organization: Organization + _tags: Optional[List[Any]] = field(default_factory=list) + totalDependencies: Optional[int] = field(default_factory=int) + issueCountsBySeverity: Optional[IssueCounts] = None # TODO: delete remediation. This must be a mistake, there is no remediation in /projects v1 response # remediation: Optional[Dict[Any, Any]] = field(default_factory=dict) # TODO: this is from individual project call only? @@ -601,8 +608,8 @@ class Project(DataClassJSONMixin): owner: Optional[User] = None # TODO: use the owner ID and call this API to get the rest of the data https://apidocs.snyk.io/?version=2023-05-29%7Ebeta#get-/orgs/-org_id-/users/-id- # Variables that can't be set using REST API yet. Leaving them as optional so they can be set manually in the future or implemented on their own. - totalDependencies: Optional[int] = field(default_factory=int)# TODO: don't see this in the REST API yet. - issueCountsBySeverity: Optional[IssueCounts] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots + #totalDependencies: Optional[int] = field(default_factory=int)# TODO: don't see this in the REST API yet. + #issueCountsBySeverity: Optional[IssueCounts] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots lastTestedDate: Optional[str] = field(default_factory=str) # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots imageId: Optional[str] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots imageTag: Optional[str] = None # TODO: Can be fetched from the latest project snapshot via v1 https://snyk.docs.apiary.io/#reference/projects/project-history/list-all-project-snapshots @@ -681,6 +688,10 @@ def issueset(self) -> Manager: def issueset_aggregated(self) -> Manager: return Manager.factory(IssueSetAggregated, self.organization.client, self) + @property + def issues(self) -> Manager: + return Manager.factory(IssueSetAggregated, self.organization.client, self) + @property def vulnerabilities(self) -> List[Vulnerability]: vuln_filter = { From 7deb1b85546df9c4dca084af63738632b39100ff Mon Sep 17 00:00:00 2001 From: Vincent Faires Date: Tue, 29 Aug 2023 21:40:28 -0700 Subject: [PATCH 7/7] updated to handle retrieving and deleting group level tags --- snyk/client.py | 12 ++++--- snyk/managers.py | 90 ++++++++++++++++++++++++++++++++++++++---------- snyk/models.py | 6 ++++ 3 files changed, 84 insertions(+), 24 deletions(-) diff --git a/snyk/client.py b/snyk/client.py index 1098548..62fb395 100644 --- a/snyk/client.py +++ b/snyk/client.py @@ -19,8 +19,8 @@ class SnykClient(object): WEB_URL = "https://app.snyk.io" API_URL = "https://api.snyk.io/rest" - VERSION = "2023-07-28~experimental" - #API_URL = "https://api.snyk.io/v1" + VERSION = "2023-08-04~experimental" + API_URL_V1 = "https://api.snyk.io/v1" USER_AGENT = "pysnyk/%s" % __version__ def __init__( @@ -200,7 +200,7 @@ def get( fkwargs = {"headers": self.api_headers} logger.debug(f"GET: {debug_url}") - + resp = retry_call( self.request, fargs=[requests.get, url], @@ -246,7 +246,7 @@ def get_rest_pages(self, path: str, params: dict = {}) -> List: """ Helper function to collect paginated responses from the rest API into a single list. - +ff This collects the "data" list from the first reponse and then appends the any further "data" lists if a next link is found in the links field. """ @@ -260,7 +260,9 @@ def get_rest_pages(self, path: str, params: dict = {}) -> List: if 'limit' in params.keys(): limit = params["limit"] else: - limit = self.client.limit + limit = self.limit + params["limit"] = self.limit + print("limit: ", limit) data = list() diff --git a/snyk/managers.py b/snyk/managers.py index 17ba9b1..3dd302a 100644 --- a/snyk/managers.py +++ b/snyk/managers.py @@ -51,24 +51,25 @@ def factory(klass, client, instance=None): else: key = klass.__name__ manager = { - "Project": ProjectManager, - "Organization": OrganizationManager, - "Member": MemberManager, - "License": LicenseManager, - "Dependency": DependencyManager, - "Entitlement": EntitlementManager, - "Setting": SettingManager, - "Ignore": IgnoreManager, - "JiraIssue": JiraIssueManager, - "DependencyGraph": DependencyGraphManager, - "IssueSet": IssueSetManager, - "IssueSetAggregated": IssueSetAggregatedManager2, - "Integration": IntegrationManager, - "IntegrationSetting": IntegrationSettingManager, - "Tag": TagManager, - "IssuePaths": IssuePathsManager, - "OrganizationGroup": OrganizationGroupManager, - "User": UserManager, + "Project": ProjectManager, + "Organization": OrganizationManager, + "Member": MemberManager, + "License": LicenseManager, + "Dependency": DependencyManager, + "Entitlement": EntitlementManager, + "Setting": SettingManager, + "Ignore": IgnoreManager, + "JiraIssue": JiraIssueManager, + "DependencyGraph": DependencyGraphManager, + "IssueSet": IssueSetManager, + "IssueSetAggregated": IssueSetAggregatedManager2, + "Integration": IntegrationManager, + "IntegrationSetting": IntegrationSettingManager, + "Tag": TagManager, + "IssuePaths": IssuePathsManager, + "OrganizationGroup": OrganizationGroupManager, + "OrganizationGroupTags": OrganizationGroupTagManager, + "User": UserManager, }[key] return manager(klass, client, instance) @@ -166,6 +167,36 @@ def get(self, id: str): return self.klass.from_dict(org_template) +class OrganizationGroupTagManager(Manager): + def all(self): + # Version 1 endpoint + self.client.api_url = self.client.API_URL_V1 + + resp = self.client.get(f'/group/{self.instance.id}/tags?perPage=50000&page=1') + + # Reset to REST endpoint + self.client.api_url = self.client.API_URL + + return resp.json()['tags'] + + def delete(self,key: str, value: str, force: bool=False) -> bool: + # Version 1 endpoint + self.client.api_url = self.client.API_URL_V1 + + body = {'key': key, + 'value': value, + 'force': force} + + resp = self.client.post(f'/group/{self.instance.id}/tags/delete', body) + + # Reset to REST endpoint + self.client.api_url = self.client.API_URL + + if resp.status_code == 200: + return True + else: + return False + class OrganizationGroupManager(Manager): def all(self): params = {'limit': self.client.limit} @@ -196,7 +227,7 @@ def get(self, id: str): except Exception as e: raise e - return self.klass(resp['attributes']['name'],resp['id']) + return self.klass(resp['attributes']['name'],resp['id'],self.client) def filter(self, **kwargs: Any): @@ -593,24 +624,41 @@ def update(self, **kwargs: bool) -> bool: class IgnoreManager(DictManager): def all(self) -> Dict[str, List[object]]: + # Version 1 endpoint + self.client.api_url = self.client.API_URL_V1 + path = "org/%s/project/%s/ignores" % ( self.instance.organization.id, self.instance.id, ) resp = self.client.get(path) + + # Reset to REST endpoint + self.client.api_url = self.client.API_URL return resp.json() + class JiraIssueManager(DictManager): def all(self) -> Dict[str, List[object]]: + # Version 1 endpoint + self.client.api_url = self.client.API_URL_V1 + path = "org/%s/project/%s/jira-issues" % ( self.instance.organization.id, self.instance.id, ) resp = self.client.get(path) + + # Reset to REST endpoint + self.client.api_url = self.client.API_URL + return resp.json() def create(self, issue_id: str, fields: Any) -> Dict[str, str]: + # Version 1 endpoint + self.client.api_url = self.client.API_URL_V1 + path = "org/%s/project/%s/issue/%s/jira-issue" % ( self.instance.organization.id, self.instance.id, @@ -619,6 +667,10 @@ def create(self, issue_id: str, fields: Any) -> Dict[str, str]: post_body = {"fields": fields} resp = self.client.post(path, post_body) response_data = resp.json() + + # Reset to REST endpoint + self.client.api_url = self.client.API_URL + # The response we get is not following the schema as specified by the api # https://snyk.docs.apiary.io/#reference/projects/project-jira-issues-/create-jira-issue if ( diff --git a/snyk/models.py b/snyk/models.py index 9c07c85..c13cedf 100644 --- a/snyk/models.py +++ b/snyk/models.py @@ -134,6 +134,12 @@ class OrganizationGroup(DataClassJSONMixin): name: str id: str + client: Optional[Any] = None + + @property + def tags(self) -> Manager: + return Manager.factory("OrganizationGroupTags", self.client, self) + @dataclass class Package(DataClassJSONMixin):