Skip to content
This repository has been archived by the owner on Dec 7, 2021. It is now read-only.

Test upload module #11

Merged
merged 11 commits into from
Jun 20, 2020
4 changes: 4 additions & 0 deletions backup_to_cloud/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ class BackupError(Exception):
pass


class MultipleFilesError(BackupError):
pass


class NoFilesFoundError(BackupError):
pass

Expand Down
6 changes: 3 additions & 3 deletions backup_to_cloud/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(
name,
type,
root_path,
cloud_folder_id=None,
cloud_folder_id="root",
zip=False,
zipname=None,
filter=".",
Expand All @@ -53,8 +53,8 @@ def __init__(
encouraged to check the regex before creating the first backup.
To check the regex check README). Defaults to '.'.
cloud_folder_id (str, optional): id of the folder to save the file(s)
into. If is not present or is None, the files will be stored in
the root folder (`Drive`). Defaults to None.
into. If is not present or is 'root', the files will be stored in
the root folder (`Drive`). Defaults to 'root'.
zip (bool, optional): If True and type is folder, all files will be uploaded as
zip. Defaults to False.
zipname (str, optional): if zip, this is the file name. Defaults to None.
Expand Down
53 changes: 25 additions & 28 deletions backup_to_cloud/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,52 @@

from googleapiclient.http import MediaIoBaseUpload

from .exceptions import MultipleFilesError
from .utils import get_google_drive_services, log

FileData = Union[Path, str, BytesIO]


def backup(file_data: FileData, mimetype, folder=None, filename=None):
def backup(file_data: FileData, mimetype, folder_id, filename=None):
if isinstance(file_data, (str, Path)):
filepath = Path(file_data)
if not filepath.exists():
raise FileNotFoundError(filepath.as_posix())
exc = FileNotFoundError(filepath.as_posix())
log(exc)
raise exc

filename = filename or filepath.name
file_data = BytesIO(filepath.read_bytes())
del filepath
else:
if not filename:
exc = ValueError("if file_data is BytesIO, filename is required")
log(exc)
raise exc

service = get_google_drive_services()
file_metadata = {"name": filename, "mimeType": mimetype}
query = "name = %r" % (filename)
query = f"name = {filename!r} and {folder_id!r} in parents"

if folder:
file_metadata["parents"] = [folder]
query += " and %r in parents" % (folder)

response = service.files().list(q=query, fields="files(id, name)",).execute()
response = service.files().list(q=query, fields="files(id, name)").execute()
ids = [x.get("id") for x in response.get("files", [])]

if len(ids) > 1:
raise RuntimeError("Files should not be more than one")
msg = (
"Detected more than one file named "
f"{filename!r} in the target folder {ids!r}"
)
exc = MultipleFilesError(msg)
log(exc)
raise exc

if ids:
return save_version(service, file_data, mimetype, ids[0], filename)
return save_new_file(service, file_data, mimetype, folder, filename)
return save_new_file(service, file_data, mimetype, folder_id, filename)


def save_new_file(gds, file_data: BytesIO, mimetype, folder=None, filename=None):
log("saving new file: %s", filename)
file_metadata = {"name": filename, "mimeType": mimetype}

if folder:
file_metadata["parents"] = [folder]
def save_new_file(gds, file_data: BytesIO, mimetype, folder_id, filename=None):
log("Saving new file: %s", filename)
file_metadata = {"name": filename, "mimeType": mimetype, "parents": folder_id}

media = MediaIoBaseUpload(file_data, mimetype=mimetype)
res = (
Expand All @@ -53,20 +59,11 @@ def save_new_file(gds, file_data: BytesIO, mimetype, folder=None, filename=None)


def save_version(gds, file_data: BytesIO, mimetype, file_id: str, filename=None):
log("saving new version of %s", filename)
file_metadata = {
"name": filename,
"published": True,
}
log("Saving new version of %s", filename)
media = MediaIoBaseUpload(file_data, mimetype=mimetype)
response = (
gds.files()
.update(
fileId=file_id,
keepRevisionForever=False,
body=file_metadata,
media_body=media,
)
.update(fileId=file_id, keepRevisionForever=False, media_body=media,)
.execute()
)

Expand Down
6 changes: 5 additions & 1 deletion backup_to_cloud/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
("application/x-rar-compressed", ".rar"),
("application/x-sqlite3", ".db"),
("application/x-sqlite3", ".sqlite"),
("application/x-yaml", ".yaml"),
("application/x-yaml", ".yml"),
("application/xml", ".xml"),
("application/zip", ".zip"),
("image/x-ms-bmp", ".bmp"),
Expand Down Expand Up @@ -92,7 +94,9 @@ def get_mimetype(filepath: str) -> str:
str: mimetype of `filepath`.
"""

return mimetypes.guess_type(filepath)[0] or "application/octet-stream"
mime_type = mimetypes.guess_type(filepath)[0] or "application/octet-stream"
log("Mimetype of %r is %r", filepath, mime_type)
return mime_type


def _improve_mimetypes():
Expand Down
12 changes: 12 additions & 0 deletions tests/test_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from backup_to_cloud.exceptions import (
BackupError,
MultipleFilesError,
NoFilesFoundError,
SettingsError,
TokenError,
Expand All @@ -19,6 +20,17 @@ def test_raises(self):
raise BackupError


class TestMultipleFilesError:
def test_inheritance(self):
exc = MultipleFilesError()
assert isinstance(exc, MultipleFilesError)
assert isinstance(exc, BackupError)

def test_raises(self):
with pytest.raises(MultipleFilesError):
raise MultipleFilesError


class TestNoFilesFoundError:
def test_inheritance(self):
exc = NoFilesFoundError()
Expand Down
7 changes: 7 additions & 0 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def mocks(self):
self.list_files_m = mock.patch("backup_to_cloud.main.list_files").start()
self.zipfile_m = mock.patch("backup_to_cloud.main.ZipFile").start()
self.bytesio_m = mock.patch("backup_to_cloud.main.BytesIO").start()
self.log_m = mock.patch("backup_to_cloud.main.log").start()

yield

Expand All @@ -36,6 +37,7 @@ def test_no_root_path(self):
self.get_mt_m.assert_not_called()
self.get_settings_m.assert_called_once_with()
self.list_files_m.assert_not_called()
self.log_m.assert_called_once_with("Excluding entry %r", "<name>")

def test_single_file(self):
entry = BackupEntry("<name>", "single-file", "/home/file.pdf", "<folder-id>")
Expand All @@ -50,6 +52,7 @@ def test_single_file(self):
self.get_mt_m.assert_called_once_with("/home/file.pdf")
self.get_settings_m.assert_called_once_with()
self.list_files_m.assert_not_called()
self.log_m.assert_not_called()

@pytest.mark.parametrize("nulls", range(11))
def test_single_file_mix(self, nulls):
Expand All @@ -75,9 +78,13 @@ def test_single_file_mix(self, nulls):
self.backup_m.assert_not_called()
self.get_mt_m.assert_not_called()

if nulls != 0:
self.log_m.assert_called_with("Excluding entry %r", "<name>")

self.bytesio_m.assert_not_called()
assert self.backup_m.call_count == 10 - nulls
assert self.get_mt_m.call_count == 10 - nulls
assert self.log_m.call_count == nulls

def test_invalid_type(self):
entry = BackupEntry("<name>", "single-file", None, "<folder-id>")
Expand Down
2 changes: 1 addition & 1 deletion tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def test_init_min(self):
assert be.name == "<name>"
assert be.type == EntryType.single_file
assert be.root_path == "<root-path>"
assert be.folder is None
assert be.folder is "root"
assert be.zip is False
assert be.zipname is None
assert be.filter == "."
Expand Down
Loading