Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/update for rest #184

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
89 changes: 78 additions & 11 deletions snyk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,39 @@
from typing import Any, List, Optional

import requests
from requests.compat import urljoin
from retry.api import retry_call
from copy import deepcopy

from .__version__ import __version__
from .errors import SnykHTTPError, SnykNotImplementedError
from .managers import Manager
from .models import Organization, Project
from .models import Organization, Project, OrganizationGroup, User
from .utils import cleanup_path

logger = logging.getLogger(__name__)


class SnykClient(object):
API_URL = "https://api.snyk.io/v1"
WEB_URL = "https://app.snyk.io"
API_URL = "https://api.snyk.io/rest"
VERSION = "2023-08-04~experimental"
API_URL_V1 = "https://api.snyk.io/v1"
USER_AGENT = "pysnyk/%s" % __version__

def __init__(
self,
token: str,
url: Optional[str] = None,
web_url: Optional[str] = None,
user_agent: Optional[str] = USER_AGENT,
debug: bool = False,
tries: int = 1,
delay: int = 1,
backoff: int = 2,
verify: bool = True,
version: Optional[str] = None,
limit: int = 100,
):
self.api_token = token
self.api_url = url or self.API_URL
Expand All @@ -42,7 +49,9 @@ def __init__(
self.backoff = backoff
self.delay = delay
self.verify = verify
self.version = version
self.version = version or self.VERSION
self.web_url = web_url or self.WEB_URL
self.limit = limit

# Ensure we don't have a trailing /
if self.api_url[-1] == "/":
Expand All @@ -64,6 +73,7 @@ def request(
resp = method(
url, headers=headers, params=params, json=json, verify=self.verify
)

elif params and not json:
resp = method(url, headers=headers, params=params, verify=self.verify)
elif json and not params:
Expand Down Expand Up @@ -97,6 +107,39 @@ def post(self, path: str, body: Any, headers: dict = {}) -> requests.Response:

return resp

def patch(self, path: str, body: Any, headers: dict = {}, params: dict = None) -> requests.Response:
path = cleanup_path(path)

url = f"{self.api_url}/{path}"

logger.debug(f"PATCH: {url}")

if params or self.version:

if not params:
params = {}

# we use the presence of version to determine if we are REST or not
if "version" not in params.keys() and self.version:
params["version"] = self.version

resp = retry_call(
self.request,
fargs=[requests.patch, url],
fkwargs={"json": body, "headers": {**self.api_post_headers, **headers}, "params": params},
tries=self.tries,
delay=self.delay,
backoff=self.backoff,
exceptions=SnykHTTPError,
logger=logger,
)

if not resp.ok:
logger.error(resp.text)
raise SnykHTTPError(resp)

return resp

def put(self, path: str, body: Any, headers: dict = {}) -> requests.Response:
url = "%s/%s" % (self.api_url, path)
logger.debug("PUT: %s" % url)
Expand Down Expand Up @@ -192,39 +235,56 @@ def delete(self, path: str) -> requests.Response:

return resp

def get_rest_page(self, path: str, params: dict = {}) -> dict:
"""Helper function to colleged unpaginated responses from the rest API as a dictionary

This takes the "data" list from the response and returns it"""

return self.get(path, params).json()['data']

def get_rest_pages(self, path: str, params: dict = {}) -> List:
"""
Helper function to collect paginated responses from the rest API into a single
list.

ff
This collects the "data" list from the first reponse and then appends the
any further "data" lists if a next link is found in the links field.
"""

# this is a raw primative but a higher level module might want something that does an
# arbitrary path + origin=foo + limit=100 url construction instead before being sent here

# Making sure references to param can't be passed between methods
params = deepcopy(params)

limit = params["limit"]
if 'limit' in params.keys():
limit = params["limit"]
else:
limit = self.limit
params["limit"] = self.limit
print("limit: ", limit)

data = list()

page = self.get(path, params).json()

data.extend(page["data"])

while "next" in page["links"].keys():
while "next" in page["links"].keys() and len(page["data"]) >= limit:
logger.debug(
f"GET_REST_PAGES: Another link exists: {page['links']['next']}"
)

# Check for "/rest" at the end of the root url and beginning of next url
if page["links"]["next"][:5] == "/rest" and self.api_url[-5:] == "/rest":
page["links"]["next"] = page["links"]["next"][5:]

next_url = urllib.parse.urlsplit(page["links"]["next"])
query = urllib.parse.parse_qs(next_url.query)

for k, v in query.items():
params[k] = v

params["limit"] = limit

page = self.get(next_url.path, params).json()

data.extend(page["data"])
Expand Down Expand Up @@ -254,9 +314,16 @@ def notification_settings(self):
# https://snyk.docs.apiary.io/#reference/groups/organisations-in-groups/create-a-new-organisation-in-the-group
# https://snyk.docs.apiary.io/#reference/0/list-members-in-a-group/list-all-members-in-a-group
# https://snyk.docs.apiary.io/#reference/0/members-in-an-organisation-of-a-group/add-a-member-to-an-organisation-from-another-organisation-in-the-group
def groups(self):
raise SnykNotImplementedError # pragma: no cover
@property
def groups(self) -> Manager:
return Manager.factory(OrganizationGroup, self)

# https://snyk.docs.apiary.io/#reference/reporting-api/issues/get-list-of-issues
def issues(self):
raise SnykNotImplementedError # pragma: no cover

# At the client level this should only be able to return the results for /self
# https://apidocs.snyk.io/experimental?version=2023-06-23%7Eexperimental#get-/self
@property
def users(self) -> Manager:
return Manager.factory(User, self)
Loading