Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove file noqa ubuntu git provider #365

Merged
merged 2 commits into from
Oct 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions src/vunnel/providers/ubuntu/git.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# flake8: noqa
from __future__ import annotations

import logging
Expand Down Expand Up @@ -31,7 +30,7 @@ class UbuntuGitServer503Error(Exception):

def __init__(self):
super().__init__(
"The ubuntu git server is unavailable, try again later or switch to the git protocol endpoint git://git.launchpad.net/ubuntu-cve-tracker"
"The ubuntu git server is unavailable, try again later or switch to the git protocol endpoint git://git.launchpad.net/ubuntu-cve-tracker",
)


Expand All @@ -54,8 +53,13 @@ class GitWrapper:
_head_rev_cmd_ = "git rev-parse HEAD"
_ubuntu_server_503_message = "error: RPC failed; HTTP 503 curl 22 The requested URL returned error: 503"

def __init__(
self, source: str, branch: str, checkout_dest: str, workspace: str | None = None, logger: logging.Logger | None = None
def __init__( # noqa: PLR0913
self,
source: str,
branch: str,
checkout_dest: str,
workspace: str | None = None,
logger: logging.Logger | None = None,
):
self.src = source
self.branch = branch
Expand All @@ -69,7 +73,7 @@ def __init__(

try:
out = self._exec_cmd(self._check_cmd_)
self.logger.trace("git executable verified using cmd: {}, output: {}".format(self._check_cmd_, out.decode()))
self.logger.trace(f"git executable verified using cmd: {self._check_cmd_}, output: {out.decode()}")
except:
self.logger.exception('could not find required "git" executable. Please install git on host')
raise
Expand Down Expand Up @@ -99,7 +103,7 @@ def _clone_repo(self):
self.logger.info(f"cloning git repository {self.src} branch {self.branch} to {self.dest}")
cmd = self._clone_cmd_.format(src=self.src, dest=self.dest, branch=self.branch)
out = self._exec_cmd(cmd)
self.logger.debug("initialized git repo, cmd: {}, output: {}".format(cmd, out.decode()))
self.logger.debug(f"initialized git repo, cmd: {cmd}, output: {out.decode()}")
self._write_graph()
except:
self.logger.exception(f"failed to clone git repository {self.src} branch {self.branch} to {self.dest}")
Expand All @@ -111,7 +115,7 @@ def init_repo(self, force=False):
self._delete_repo()

if self._check(self.dest):
self.logger.debug("found git repository at {}".format(self.dest))
self.logger.debug(f"found git repository at {self.dest}")
self.sync_with_upstream()
return

Expand Down Expand Up @@ -183,7 +187,7 @@ def _write_graph(self):

def get_merged_change_set(self, from_rev: str, to_rev: str | None = None) -> tuple[dict[str, str], dict[str, str]]:
try:
self.logger.trace("fetching changes set between revisions {} - {}".format(from_rev, to_rev))
self.logger.trace(f"fetching changes set between revisions {from_rev} - {to_rev}")
cmd = self._change_set_cmd_.format(from_rev=from_rev, to_rev=to_rev if to_rev else "")
out = self._exec_cmd(cmd, cwd=self.dest)
commit_list = self._parse_log(out.decode())
Expand All @@ -194,10 +198,10 @@ def get_merged_change_set(self, from_rev: str, to_rev: str | None = None) -> tup

def get_revision_history(self, cve_id: str, file_path: str, from_rev: str | None = None) -> list[GitRevision]:
try:
self.logger.trace("fetching revision history for {}".format(file_path))
self.logger.trace(f"fetching revision history for {file_path}")
return self.cve_rev_history.get(cve_id, [])
except:
self.logger.exception("failed to fetch the revision history for {}".format(file_path))
self.logger.exception(f"failed to fetch the revision history for {file_path}")
raise

def get_content(self, git_rev: GitRevision) -> list[str]:
Expand Down Expand Up @@ -368,16 +372,15 @@ def _parse_normalized_commit(self, commit_lines: list[list[str]]) -> GitCommitSu
updated[cve_id] = components[2]
else:
# either not a commit line or an irrelevant file, ignore it
self.logger.debug("skipping unknown change symbol {}".format(components[0]))
self.logger.debug(f"skipping unknown change symbol {components[0]}")
else:
# not a match
pass

if updated or deleted:
deleted = {key: value for key, value in deleted.items() if key not in updated}
return GitCommitSummary(sha=commit_lines[0][0], updated=updated, deleted=deleted)
else:
return None
return None

def _exec_cmd(self, cmd, *args, **kwargs) -> bytes:
"""
Expand All @@ -388,14 +391,14 @@ def _exec_cmd(self, cmd, *args, **kwargs) -> bytes:
:return:
"""
try:
self.logger.trace("running: {}".format(cmd))
self.logger.trace(f"running: {cmd}")
cmd_list = shlex.split(cmd)
return subprocess.check_output(cmd_list, *args, **kwargs, stderr=subprocess.PIPE) # nosec
# S603 disable exaplanation: running git commands by design
return subprocess.check_output(cmd_list, *args, **kwargs, stderr=subprocess.PIPE) # noqa: S603
except Exception as e:
self.logger.exception("error executing command: {}".format(cmd))
self.logger.exception(f"error executing command: {cmd}")

if isinstance(e, subprocess.CalledProcessError):
if e.stderr and self._ubuntu_server_503_message in e.stderr.decode():
raise UbuntuGitServer503Error()
if isinstance(e, subprocess.CalledProcessError) and e.stderr and self._ubuntu_server_503_message in e.stderr.decode():
raise UbuntuGitServer503Error from e

raise e
Loading