diff --git a/.travis.yml b/.travis.yml index d8268aa..f014078 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,8 @@ services: - docker python: - - "3.6" + - 2.7 + - 3.6 before_script: - cd tests && docker-compose up --build -d @@ -14,7 +15,7 @@ before_script: # commands to run tests script: - docker-compose run --rm python-api python3 -m pytest --cov . --cov-report xml --cov-report term .. - - docker-compose run --rm python-api python ../example.py + - docker-compose run --rm python-api python ../examples/user_management.py after_script: - codecov diff --git a/AUTHORS.rst b/AUTHORS.rst new file mode 100644 index 0000000..cb173f4 --- /dev/null +++ b/AUTHORS.rst @@ -0,0 +1,29 @@ + +This is Python wrapper for NextCloud's API had been made by… + +Main contributors +````````````````` +- Matěj Týč `@matejak ` active from 2018 +- Danil Topchiy `@danil-topchiy ` active 2018-2019 + + +Refactoring contributors +```````````````````````` +- Matěj Týč `@matejak ` active from 2018 +- Danil Topchiy `@danil-topchiy ` active 2018-2019 +- luffah `@luffah ` active 2021 + + +Original code +````````````` +The repo was originally nammed NEXT-OCS-API-forPy in 2017 +- どまお `@Dosugamea ` + + +Patches +``````` +- Hendrik Eckardt `@heck-gd ` +- Anonymous `@xr-muc ` +- tthmmts `@tthmmts ` +- Dylann Cordel `@webu ` `@DylannCordel ` +- scouderc `@scouderc ` diff --git a/README.md b/README.md index a49394a..591a835 100644 --- a/README.md +++ b/README.md @@ -9,8 +9,13 @@ Python wrapper for NextCloud api -This is Python wrapper for NextCloud's API. With it you can manage your NextCloud instances from Python scripts. -Tested with python 3.7, NextCloud 14. +This is Python wrapper for NextCloud's API. +With it you can manage your NextCloud instances from Python scripts. + +Tested with : + * NextCloud 14, python 3.7 (automated test) + * NextCloud 20, python 2.7 + * NextCloud 20, python 3.6 ## FAQ @@ -23,7 +28,7 @@ Check out the corresponding [nextcloud API documentation](https://nextcloud-api. #### How do I use it? -Check out [the simple example](example.py) and also check out the [unit tests directory](tests). +Check out [examples](examples) and also check out the [unit tests directory](tests). #### What do I do if it doesn't work? diff --git a/api_implementation.json b/api_implementation.json index f492d02..69efd02 100644 --- a/api_implementation.json +++ b/api_implementation.json @@ -21,6 +21,12 @@ "implementation status": "OK", "date_last_checked": "2019-02-02" }, + { + "name": "Tags API", + "url": "https://doc.owncloud.com/server/developer_manual/webdav_api/tags.html", + "implementation status": "OK", + "date_last_checked": "2021-05-03" + }, { "name": "Activity app API", "url": "https://github.com/nextcloud/activity", diff --git a/docs/source/examples.rst b/docs/source/examples.rst index b759f98..87ab964 100644 --- a/docs/source/examples.rst +++ b/docs/source/examples.rst @@ -4,5 +4,5 @@ Examples Users API methods ----------------- -.. include:: ../../example.py +.. include:: ../../examples/user_management.py :literal: diff --git a/example.py b/examples/user_management.py similarity index 100% rename from example.py rename to examples/user_management.py diff --git a/requirements.in b/requirements.in index 9e2e930..571af75 100644 --- a/requirements.in +++ b/requirements.in @@ -1,2 +1,3 @@ -requests -pytest \ No newline at end of file +requests>=2.0.1 +pytest>=4.6 +six diff --git a/requirements.txt b/requirements.txt index 6023ab7..21b9b5f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,15 +4,5 @@ # # pip-compile --output-file requirements.txt requirements.in # -atomicwrites==1.2.1 # via pytest -attrs==18.2.0 # via pytest -certifi==2018.11.29 # via requests -chardet==3.0.4 # via requests -idna==2.7 # via requests -more-itertools==4.3.0 # via pytest -pluggy==0.8.0 # via pytest -py==1.7.0 # via pytest -pytest==4.0.1 +pytest==4.6.1 requests==2.20.1 -six==1.11.0 # via more-itertools, pytest -urllib3==1.24.1 # via requests diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..c94a57e --- /dev/null +++ b/setup.cfg @@ -0,0 +1,69 @@ +[metadata] + +name = nextcloud +version = 0.2 +description= Python wrapper for NextCloud api +long_description = file: README.md +keywords = requests, api, wrapper, nextcloud, owncloud +license = GPLv3 + +url = https://nextcloud-api.readthedocs.io +project_urls = + Documentation = https://nextcloud-api.readthedocs.io + Source = https://github.com/EnterpriseyIntranet/nextcloud-API + +author = EnterpriseyIntranet +author_email = matej.tyc@gmail.com + +platforms = any + +classifiers = + Programming Language :: Python + Programming Language :: Python :: 2 + Programming Language :: Python :: 3 + Development Status :: 4 - Beta + Environment :: Web Environment + Intended Audience :: Developers + Topic :: Internet :: WWW/HTTP + Topic :: Software Development :: Libraries :: Python Modules + License :: OSI Approved :: GNU General Public License (GPL) + Operating System :: OS Independent + +[options] +zip_safe = False +include_package_data = True + +install_requires = + requests >=2.0.1, <3.0 + six + +[options.extras_require] +tests = + pytest >= 5.2 + +#[tool:pytest] +#addopts = --verbose --pylint-rcfile=setup.cfg +# --pylint --pycodestyle + +[pycodestyle] +max-line-length=120 +ignore=E4,E7,W3 + +# Configuration for pylint +[MASTER] +ignore=CVS +good-names=logger,e,i,j,n,m,f,_ + +[MESSAGES CONTROL] +disable=all +enable=unused-import, + fixme, + useless-object-inheritance, + unused-variable, + unused-argument, + unexpected-keyword-arg, + string, + unreachable, + invalid-name, + logging-not-lazy, + unnecesary-pass diff --git a/setup.py b/setup.py index f008c9a..b0a84c8 100644 --- a/setup.py +++ b/setup.py @@ -1,29 +1,31 @@ -import os -import setuptools +""" +Setup script + +Usage : + python setup.py build + python setup.py install -SETUPDIR = os.path.dirname(__file__) -PKGDIR = os.path.join(SETUPDIR, 'src') +For repository admin: + python setup.py publish -with open(os.path.join(SETUPDIR, 'README.md'), 'r') as f: - long_description = f.read() +For testing: + test.sh +""" +import os +import sys +from setuptools import setup, find_packages +# 'setup.py publish' shortcut. +if sys.argv[-1] == 'publish': + # see https://twine.readthedocs.io/en/latest/ + os.system('%s %s sdist bdist_wheel' % (sys.executable, sys.argv[0])) + os.system('twine upload dist/*') + sys.exit() -setuptools.setup( - name='nextcloud', - version='0.0.1', - author='EnterpriseyIntranet', - description="Python wrapper for NextCloud api", - long_description=long_description, - long_description_content_type="text/markdown", - url="https://github.com/EnterpriseyIntranet/nextcloud-API", - packages=setuptools.find_packages(PKGDIR), - include_package_data=True, - install_requires=['requests'], - package_dir={'': 'src'}, - classifiers=[ - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'License :: OSI Approved :: GNU General Public License (GPL)', - "Operating System :: OS Independent", - ], +setup( + # see setup.cfg + # some variables are defined here for retro compat with setuptools >= 33 + package_dir = {'': 'src'}, + packages=find_packages(where=r'./src'), + long_description_content_type = 'text/markdown' ) diff --git a/src/nextcloud/NextCloud.py b/src/nextcloud/NextCloud.py deleted file mode 100644 index c53b03f..0000000 --- a/src/nextcloud/NextCloud.py +++ /dev/null @@ -1,38 +0,0 @@ -# -*- coding: utf-8 -*- -from .requester import OCSRequester, WebDAVRequester -from .api_wrappers import OCS_API_CLASSES, WEBDAV_CLASS - - -class NextCloud(object): - - def __init__(self, endpoint, user, password, json_output=True): - self.user = user - self.query_components = [] - - ocs_requester = OCSRequester(endpoint, user, password, json_output) - webdav_requester = WebDAVRequester(endpoint, user, password) - - self.functionality_classes = [api_class(ocs_requester) for api_class in OCS_API_CLASSES] - self.functionality_classes.append(WEBDAV_CLASS(webdav_requester, json_output=json_output)) - - for functionality_class in self.functionality_classes: - for potential_method in dir(functionality_class): - if( - potential_method.startswith('_') - or not callable(getattr(functionality_class, potential_method)) - ): - continue - setattr(self, potential_method, getattr(functionality_class, potential_method)) - - def get_connection_issues(self): - """ - Return Falsy falue if everything is OK, or string representing - the connection problem (bad hostname, password, whatever) - """ - try: - response = self.get_user(self.user) - except Exception as e: - return str(e) - - if not response.is_ok: - return response.meta["message"] diff --git a/src/nextcloud/__init__.py b/src/nextcloud/__init__.py index 61305e2..2c704d1 100644 --- a/src/nextcloud/__init__.py +++ b/src/nextcloud/__init__.py @@ -1,3 +1,90 @@ # -*- coding: utf-8 -*- -from .NextCloud import NextCloud +from .session import Session +from .api_wrappers import API_WRAPPER_CLASSES + +class NextCloud(object): + """ + A NextCloud/OwnCloud client. + Provides cookie persistence, connection-pooling, and configuration. + + Basic Usage:: + + >>> from nextcloud import nextcloud + >>> s = Nextcloud('https://nextcloud.mysite.com', user='admin', password='admin') + >>> # or using use another auth method + >>> from requests.auth import HTTPBasicAuth + >>> s = Nextcloud('https://nextcloud.mysite.com', auth=HTTPBasicAuth('admin', 'admin')) + >>> # + >>> s.list_folders('/') + + + For a persistent session:: + >>> s.login() # if no user, password, or auth in parameter use existing + >>> # some actions # + >>> s.logout() + + Or as a context manager:: + + >>> with Nextcloud('https://nextcloud.mysite.com', + ... user='admin', password='admin') as nxc: + ... # some actions # + """ + + def __init__(self, endpoint=None, + user=None, password=None, json_output=True, auth=None, + session_kwargs=None, + session=None): + self.session = session or Session( + url=endpoint, user=user, password=password, auth=auth, + session_kwargs=session_kwargs + ) + self.json_output = json_output + for functionality_class in API_WRAPPER_CLASSES: + functionality_instance = functionality_class(self) + for potential_method in dir(functionality_instance): + if not potential_method.startswith('_'): + if not callable(getattr(functionality_instance, potential_method)): + pass + else: + setattr(self, potential_method, getattr( + functionality_instance, potential_method)) + + @property + def user(self): + return self.session.user + + @property + def url(self): + return self.session.url + + def __enter__(self): + self.login() + return self + + def __exit__(self, *args): + self.logout() + + def login(self, user=None, password=None, auth=None): + self.logout() + return self.session.login(user=user, password=password, auth=auth, + client=self) + + def with_attr(self, **kwargs): + if 'auth' in kwargs or 'endpoint' in kwargs or 'endpoint' in kwargs: + return self.with_auth(**kwargs) + if 'session_kwargs' in kwargs: + return self.with_auth(auth=self.session.auth, **kwargs) + return self.__class__(session=self.session, **kwargs) + + def with_auth(self, auth=None, **kwargs): + init_kwargs = {'session_kwargs': self.session._session_kwargs, + 'json_output': self.json_output} + init_kwargs.update(kwargs) + if 'endpoint' in kwargs: + return self.__class__(auth=auth, **init_kwargs) + return self.__class__(endpoint=self.session.url, auth=auth, **init_kwargs) + + def logout(self): + if self.session.session: + self.session.logout() diff --git a/src/nextcloud/api_wrappers/__init__.py b/src/nextcloud/api_wrappers/__init__.py index 53c2b0f..5241ba5 100644 --- a/src/nextcloud/api_wrappers/__init__.py +++ b/src/nextcloud/api_wrappers/__init__.py @@ -1,4 +1,5 @@ -# -*- coding: utf-8 -*- +from nextcloud.base import API_WRAPPER_CLASSES + from .activity import Activity from .apps import Apps from .capabilities import Capabilities @@ -10,8 +11,4 @@ from .user import User from .user_ldap import UserLDAP from .webdav import WebDAV - -OCS_API_CLASSES = [Activity, Apps, Capabilities, FederatedCloudShare, Group, GroupFolders, - Notifications, Share, User, UserLDAP] - -WEBDAV_CLASS = WebDAV +from .systemtags import SystemTags, SystemTagsRelation diff --git a/src/nextcloud/api_wrappers/activity.py b/src/nextcloud/api_wrappers/activity.py index 43287dc..8cd930e 100644 --- a/src/nextcloud/api_wrappers/activity.py +++ b/src/nextcloud/api_wrappers/activity.py @@ -1,10 +1,16 @@ # -*- coding: utf-8 -*- -from nextcloud.base import WithRequester +""" +Activity API wrapper +See https://github.com/nextcloud/activity + https://doc.owncloud.com/server/user_manual/apps/activity.html + https://doc.owncloud.com/server/developer_manual/core/apis/ +""" +from nextcloud import base -class Activity(WithRequester): +class Activity(base.OCSv2ApiWrapper): + """ Activity API wrapper """ API_URL = "/ocs/v2.php/apps/activity/api/v2/activity" - SUCCESS_CODE = 200 def get_activities(self, since=None, limit=None, object_type=None, object_id=None, sort=None): """ @@ -24,7 +30,7 @@ def get_activities(self, since=None, limit=None, object_type=None, object_id=Non (Default: desc) Returns: - + requester response """ params = dict( since=since, diff --git a/src/nextcloud/api_wrappers/apps.py b/src/nextcloud/api_wrappers/apps.py index 2a904c2..8684006 100644 --- a/src/nextcloud/api_wrappers/apps.py +++ b/src/nextcloud/api_wrappers/apps.py @@ -1,10 +1,15 @@ # -*- coding: utf-8 -*- -from nextcloud.base import WithRequester +""" +Apps API wrapper +See https://docs.nextcloud.com/server/14/admin_manual/configuration_user/instruction_set_for_users.html + https://docs.nextcloud.com/server/14/admin_manual/configuration_user/user_provisioning_api.html + https://doc.owncloud.com/server/developer_manual/core/apis/provisioning-api.html +""" +from nextcloud import base -class Apps(WithRequester): +class Apps(base.ProvisioningApiWrapper): API_URL = "/ocs/v1.php/cloud/apps" - SUCCESS_CODE = 100 def get_apps(self, filter=None): """ diff --git a/src/nextcloud/api_wrappers/capabilities.py b/src/nextcloud/api_wrappers/capabilities.py index 60a7333..d790247 100644 --- a/src/nextcloud/api_wrappers/capabilities.py +++ b/src/nextcloud/api_wrappers/capabilities.py @@ -1,10 +1,15 @@ # -*- coding: utf-8 -*- -from nextcloud.base import WithRequester +""" +Capabilities API wrapper +See https://docs.nextcloud.com/server/14/developer_manual/client_apis/OCS/index.html#capabilities-api + https://doc.owncloud.com/server/developer_manual/core/apis/ocs-capabilities.html +""" +from nextcloud import base -class Capabilities(WithRequester): +class Capabilities(base.OCSv1ApiWrapper): + """ Capabilities API wrapper """ API_URL = "/ocs/v1.php/cloud/capabilities" - SUCCESS_CODE = 100 def get_capabilities(self): """ Obtain capabilities provided by the Nextcloud server and its apps """ diff --git a/src/nextcloud/api_wrappers/federated_cloudshares.py b/src/nextcloud/api_wrappers/federated_cloudshares.py index 5cb1edf..9644d14 100644 --- a/src/nextcloud/api_wrappers/federated_cloudshares.py +++ b/src/nextcloud/api_wrappers/federated_cloudshares.py @@ -1,11 +1,15 @@ # -*- coding: utf-8 -*- -from nextcloud.base import WithRequester +""" +Federated Cloud Share wrapper. +See https://doc.owncloud.com/server/developer_manual/core/apis/ocs-share-api.html#federated-cloud-shares +""" +from nextcloud import base -class FederatedCloudShare(WithRequester): +class FederatedCloudShare(base.OCSv2ApiWrapper): + """ Federated Cloud Share wrapper """ API_URL = "/ocs/v2.php/apps/files_sharing/api/v1" FEDERATED = "remote_shares" - SUCCESS_CODE = 200 def get_federated_url(self, additional_url=""): if additional_url: diff --git a/src/nextcloud/api_wrappers/group.py b/src/nextcloud/api_wrappers/group.py index 91acdcd..836197a 100644 --- a/src/nextcloud/api_wrappers/group.py +++ b/src/nextcloud/api_wrappers/group.py @@ -1,10 +1,16 @@ # -*- coding: utf-8 -*- -from nextcloud.base import WithRequester +""" +Group API wrapper +See https://docs.nextcloud.com/server/14/admin_manual/configuration_user/instruction_set_for_groups.html + https://docs.nextcloud.com/server/14/admin_manual/configuration_user/user_provisioning_api.html + https://doc.owncloud.com/server/developer_manual/core/apis/provisioning-api.html +""" +from nextcloud.base import ProvisioningApiWrapper -class Group(WithRequester): +class Group(ProvisioningApiWrapper): + """ Group API wrapper """ API_URL = "/ocs/v1.php/cloud/groups" - SUCCESS_CODE = 100 def get_groups(self, search=None, limit=None, offset=None): """ diff --git a/src/nextcloud/api_wrappers/group_folders.py b/src/nextcloud/api_wrappers/group_folders.py index 448c4db..ef7d641 100644 --- a/src/nextcloud/api_wrappers/group_folders.py +++ b/src/nextcloud/api_wrappers/group_folders.py @@ -1,17 +1,21 @@ # -*- coding: utf-8 -*- -from nextcloud.base import WithRequester +""" +GroupFolders API wrapper +See https://github.com/nextcloud/groupfolders + https://apps.nextcloud.com/apps/groupfolders +""" +from nextcloud import base -class GroupFolders(WithRequester): +class GroupFolders(base.ProvisioningApiWrapper): + """ GroupFolders API wrapper """ API_URL = "/apps/groupfolders/folders" - SUCCESS_CODE = 100 def get_group_folders(self): """ Return a list of call configured folders and their settings - Returns: - + :returns: resquester response """ return self.requester.get() @@ -19,11 +23,8 @@ def get_group_folder(self, fid): """ Return a specific configured folder and it's settings - Args: - fid (int/str): group folder id - - Returns: - + :param fid (int/str): group folder id + :returns: resquester response """ return self.requester.get(fid) @@ -31,11 +32,8 @@ def create_group_folder(self, mountpoint): """ Create a new group folder - Args: - mountpoint (str): name for the new folder - - Returns: - + :param mountpoint (str): name for the new folder + :returns: resquester response """ return self.requester.post(data={"mountpoint": mountpoint}) @@ -43,11 +41,8 @@ def delete_group_folder(self, fid): """ Delete a group folder - Args: - fid (int/str): group folder id - - Returns: - + :param fid (int/str): group folder id + :returns: resquester response """ return self.requester.delete(fid) @@ -55,12 +50,8 @@ def grant_access_to_group_folder(self, fid, gid): """ Give a group access to a folder - Args: - fid (int/str): group folder id - gid (str): group to share with id - - Returns: - + :param fid (int/str): group folder id + :returns: resquester response """ url = "/".join([str(fid), "groups"]) return self.requester.post(url, data={"group": gid}) @@ -69,12 +60,9 @@ def revoke_access_to_group_folder(self, fid, gid): """ Remove access from a group to a folder - Args: - fid (int/str): group folder id - gid (str): group id - - Returns: - + :param fid (int/str): group folder id + :param gid (str): group id + :returns: resquester response """ url = "/".join([str(fid), "groups", gid]) return self.requester.delete(url) @@ -83,13 +71,10 @@ def set_permissions_to_group_folder(self, fid, gid, permissions): """ Set the permissions a group has in a folder - Args: - fid (int/str): group folder id - gid (str): group id - permissions (int): The new permissions for the group as attribute of Permission class - - Returns: - + :param fid (int/str): group folder id + :param gid (str): group id + :param permissions (int): The new permissions for the group as attribute of Permission class + :returns: resquester response """ url = "/".join([str(fid), "groups", gid]) return self.requester.post(url=url, data={"permissions": permissions}) @@ -98,12 +83,9 @@ def set_quota_of_group_folder(self, fid, quota): """ Set the quota for a folder in bytes - Args: - fid (int/str): group folder id - quota (int/str): The new quota for the folder in bytes, user -3 for unlimited - - Returns: - + :param fid (int/str): group folder id + :param quota (int/str): The new quota for the folder in bytes, user -3 for unlimited + :returns: resquester response """ url = "/".join([str(fid), "quota"]) return self.requester.post(url, {"quota": quota}) @@ -112,12 +94,9 @@ def rename_group_folder(self, fid, mountpoint): """ Change the name of a folder - Args: - fid (int/str): group folder id - mountpoint (str): The new name for the folder - - Returns: - + :param fid (int/str): group folder id + :param mountpoint (str): name for the new folder + :returns: resquester response """ url = "/".join([str(fid), "mountpoint"]) return self.requester.post(url=url, data={"mountpoint": mountpoint}) diff --git a/src/nextcloud/api_wrappers/notifications.py b/src/nextcloud/api_wrappers/notifications.py index 402181c..f4974ff 100644 --- a/src/nextcloud/api_wrappers/notifications.py +++ b/src/nextcloud/api_wrappers/notifications.py @@ -1,10 +1,16 @@ # -*- coding: utf-8 -*- -from nextcloud.base import WithRequester +""" +Notification API wrapper +See https://github.com/nextcloud/notifications/ + https://doc.owncloud.com/server/developer_manual/core/apis/ocs-notification-endpoint-v1.html + https://docs.nextcloud.com/server/latest/developer_manual/client_apis/OCS/ocs-api-overview.html?highlight=notification#notifications +""" +from nextcloud import base -class Notifications(WithRequester): +class Notifications(base.OCSv2ApiWrapper): + """ Notification API wrapper """ API_URL = "/ocs/v2.php/apps/notifications/api/v2/notifications" - SUCCESS_CODE = 200 def get_notifications(self): """ Get list of notifications for a logged in user """ @@ -14,11 +20,8 @@ def get_notification(self, notification_id): """ Get single notification by id for a user - Args: - notification_id (int): Notification id - - Returns: - + :param notification_id (int): Notification id + :returns: requester response """ return self.requester.get(url=notification_id) @@ -26,11 +29,8 @@ def delete_notification(self, notification_id): """ Delete single notification by id for a user - Args: - notification_id (int): Notification id - - Returns: - + :param notification_id (int): Notification id + :returns: requester response """ return self.requester.delete(url=notification_id) diff --git a/src/nextcloud/api_wrappers/share.py b/src/nextcloud/api_wrappers/share.py index daa12f5..a295ccc 100644 --- a/src/nextcloud/api_wrappers/share.py +++ b/src/nextcloud/api_wrappers/share.py @@ -1,11 +1,17 @@ # -*- coding: utf-8 -*- -from nextcloud.base import WithRequester, ShareType +""" +Share API wrapper +See https://docs.nextcloud.com/server/latest/developer_manual/client_apis/OCS/ocs-share-api.html + https://doc.owncloud.com/server/developer_manual/core/apis/ocs-share-api.html +""" +from nextcloud import base +from nextcloud.codes import ShareType -class Share(WithRequester): +class Share(base.OCSv2ApiWrapper): + """ Share API wrapper """ API_URL = "/ocs/v2.php/apps/files_sharing/api/v1" LOCAL = "shares" - SUCCESS_CODE = 200 def get_local_url(self, additional_url=""): if additional_url: @@ -50,7 +56,7 @@ def get_shares_from_path(self, path, reshares=None, subfiles=None): defines a folder Returns: - + requester response """ url = self.get_local_url() params = { @@ -69,6 +75,7 @@ def get_share_info(self, sid): sid (int): share id Returns: + requester response """ return self.requester.get(self.get_local_url(sid)) @@ -89,7 +96,7 @@ def create_share( permissions (int): sum of selected Permission attributes Returns: - + requester response """ if not self.validate_share_parameters(path, share_type, share_with): return False @@ -117,7 +124,7 @@ def delete_share(self, sid): sid (str): share id Returns: - + requester response """ return self.requester.delete(self.get_local_url(sid)) @@ -134,7 +141,7 @@ def update_share(self, sid, expire_date (str): set an expire date for public link shares. Format: ‘YYYY-MM-DD’ Returns: - + requester response """ params = dict( permissions=permissions, diff --git a/src/nextcloud/api_wrappers/systemtags.py b/src/nextcloud/api_wrappers/systemtags.py new file mode 100644 index 0000000..fed6ad9 --- /dev/null +++ b/src/nextcloud/api_wrappers/systemtags.py @@ -0,0 +1,315 @@ +# -*- coding: utf-8 -*- +""" +SystemTags API wrapper +See https://doc.owncloud.com/server/developer_manual/webdav_api/tags.html +""" +import json +from nextcloud.base import WebDAVApiWrapper +from nextcloud.common.collections import PropertySet +from nextcloud.common.properties import Property as Prop +from nextcloud.api_wrappers import webdav + + +class Tag(PropertySet): + """ Define a Tag properties""" + _attrs = [ + Prop('oc:id'), + Prop('oc:display-name', json='name', default='default_tag_name'), + Prop('oc:user-visible', json='userVisible', default=True), + Prop('oc:can-assign', json='canAssign', default=True), + Prop('oc:user-assignable', json='userAssignable', default=True) + ] + + def __repr__(self): + add_info = (' %s' % repr(self.display_name)) if hasattr( + self, 'display_name') else '' + return super(Tag, self).__repr__(add_info=add_info) + + def get_related_files(self, path=''): + """ + Get files related to current tag + :param path: (optionnal) a path to search in + """ + _id = int(self.id) + ret = self._wrapper.client.fetch_files_with_filter( + path=path, + filter_rules={'oc': {'systemtag': _id}} + ) + return ret.data or [] + + def delete(self): + """ + Delete current tag + + :returns: True if success + """ + _id = int(self.id) + ret = self._wrapper.delete_systemtag(tag_id=_id) + return ret.is_ok + + +class File(webdav.File): + + def _get_file_kwargs(self): + kwargs = {} + if not getattr(self, 'file_id', False): + kwargs['path'] = self._get_remote_path() + else: + kwargs['file_id'] = self.file_id + return kwargs + + def get_tags(self): + """ + Get tags related to current file + :returns : list + """ + kwargs = self._get_file_kwargs() + return self._wrapper.client.get_systemtags_relation(**kwargs) + + def add_tag(self, **kwargs): + """ + Assign tag to the current file + :param tag_id: tag id + :param tag_name: tag name (if tag_id in not provided) + :returns : False if failure + """ + kwargs.update(self._get_file_kwargs()) + resp = self._wrapper.client.add_systemtags_relation(**kwargs) + return resp.is_ok + + def remove_tag(self, **kwargs): + """ + Unassign tag to the current file + :param tag_id: tag id + :param tag_name: tag name (if tag_id in not provided) + :returns : False if failure + """ + kwargs.update(self._get_file_kwargs()) + resp = self._wrapper.client.remove_systemtags_relation(**kwargs) + return resp.is_ok + + +webdav.File = File + +class SystemTags(WebDAVApiWrapper): + """ SystemTags API wrapper """ + API_URL = '/remote.php/dav/systemtags' + + @classmethod + def _get_tags_from_response(cls, ret, one=False): + if ret.data: + ret = ret.data + if ret[0].href.endswith('/'): + ret = ret[1:] + else: + ret = [] + if one: + return ret[0] if ret else None + return ret + + def get_systemtags(self): + """ + Get list of all tags + + :returns: list + """ + return self._get_tags_from_response( + self.fetch_systemtags(json_output=False) + ) + + def get_systemtag(self, name): + """ + Return a nammed tag + + :returns: Tag + """ + return self._get_tags_from_response( + self.fetch_systemtag(name, json_output=False), + one=True + ) + + def fetch_systemtag(self, name, fields=None, json_output=None): + """ + Get attributes of a nammed tag + + :param name (str): tag name + :param fields (str): field names + :returns: requester response with list in data + """ + if not fields: + fields = Tag._fields + resp = self.requester.propfind( + data=Tag.build_xml_propfind(fields={ + 'oc': ['display-name'] + fields + })) + if json_output is None: + json_output = self.json_output + return Tag.from_response(resp, wrapper=self, + json_output=json_output, + init_attrs=True, + filtered=lambda t: t.display_name == name) + + def fetch_systemtags(self, json_output=None): + """ + List of all tags + + :returns: requester response with list in data + """ + resp = self.requester.propfind( + data=Tag.build_xml_propfind(use_default=True) + ) + if json_output is None: + json_output = self.json_output + return Tag.from_response(resp, wrapper=self, + json_output=json_output) + + def create_systemtag(self, name, **kwargs): + """ + Create a new system tag from name. + + :param name: tag name + :returns: requester response with tag id as data + """ + data = Tag.default_get(display_name=name, **kwargs) + resp = self.requester.post( + data=json.dumps(data), + headers={ + 'Content-Type': 'application/json' + }) + if resp.is_ok: + resp.data = int( + resp.raw.headers['Content-Location'].split('/')[(-1)]) + return resp + + def delete_systemtag(self, name=None, tag_id=None): + """ + Delete systemtag + + :param name (str): tag name, not required it tag_id is provided + :tag_id (int): tag id, not required if name is provided + + :returns: requester response + """ + if not tag_id: + resp = self.fetch_systemtag(name, ['id'], json_output=False) + if resp.data: + tag_id = resp.data[0].id + if not tag_id: # lint only + return resp + resp = self.requester.delete(url=(str(tag_id))) + return resp + + +class SystemTagsRelation(WebDAVApiWrapper): + """ SystemTagsRelation API wrapper """ + API_URL = '/remote.php/dav/systemtags-relations/files' + + def _get_fileid_from_path(self, path): + """ Tricky function to fetch file """ + resp = self.client.get_file_property(path, 'fileid') + _id = None + if resp.data: + _id = int(resp.data) + return _id + + def _get_systemtag_id_from_name(self, name): + resp = self.client.fetch_systemtag(name, ['id'], json_output=False) + tag_id = None + if resp.data: + tag_id = int(resp.data[0].id) + return tag_id + + def _default_get_file_id(self, vals): + path = vals.get('path', None) + if not path: + raise ValueError('Insufficient infos about the file') + return self._get_fileid_from_path(path) + + def _default_get_tag_id(self, vals): + tag_name = vals.get('tag_name', None) + if not tag_name: + raise ValueError('Insufficient infos about the tag') + return self._get_systemtag_id_from_name(tag_name) + + def get_systemtags_relation(self, file_id=None, **kwargs): + """ + Get all tags from a given file/folder + + :param file_id (int): file id found from file object + :param path (str): if no file_id provided, path to file/folder + + :returns: requester response with list in data + """ + return SystemTags._get_tags_from_response( + self.fetch_systemtags_relation(file_id=file_id, + json_output=False, **kwargs) + ) + + def fetch_systemtags_relation(self, file_id=None, json_output=None, **kwargs): + """ + Get all tags from a given file/folder + + :param file_id (int): file id found from file object + :param path (str): if no file_id provided, path to file/folder + + :returns: requester response with list in data + """ + file_id, = self._arguments_get(['file_id'], dict(file_id=file_id, + **kwargs)) + data = Tag.build_xml_propfind(use_default=True) + resp = self.requester.propfind(additional_url=file_id, data=data) + return Tag.from_response(resp, + json_output=( + self.json_output if + json_output is None else json_output) + ) + + def remove_systemtags_relation(self, file_id=None, tag_id=None, **kwargs): + """ + Remove a tag from a given file/folder + + :param file_id (int): id found in file object + :param tag_id (int): id found in tag object + :param path (str): if unknown file_id, path to file/folder + :param tag_name (str): if unknown tag_id, tag_name to search or create + + :returns: requester response + """ + file_id, tag_id = self._arguments_get([ + 'file_id', 'tag_id'], dict(file_id=file_id, tag_id=tag_id, **kwargs)) + if not file_id: + raise ValueError('No file found') + if not tag_id: + raise ValueError('No tag found (%s)' % + kwargs.get('tag_name', None)) + resp = self.requester.delete(url=('{}/{}'.format(file_id, tag_id))) + return resp + + def add_systemtags_relation(self, file_id=None, tag_id=None, **kwargs): + """ + Add a tag from a given file/folder + + :param file_id (int): id found in file object + :param tag_id (int): id found in tag object + :param path (str): if unknown file_id, path to file/folder + :param tag_name (str): if unknown tag_id, tag_name to search or create + + :returns: requester response + """ + file_id, tag_id = self._arguments_get([ + 'file_id', 'tag_id'], locals()) + if not file_id: + raise ValueError('No file found') + if not tag_id: + data = Tag.default_get( + display_name=kwargs.get('tag_name'), **kwargs) + resp = self.requester.post( + url=file_id, + data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + # resp = self.client.create_systemtag(kwargs['tag_name']) + # if not resp.is_ok: + return resp + # tag_id = resp.data + resp = self.requester.put(url='{}/{}'.format(file_id, tag_id)) + return resp diff --git a/src/nextcloud/api_wrappers/user.py b/src/nextcloud/api_wrappers/user.py index 8031513..3458623 100644 --- a/src/nextcloud/api_wrappers/user.py +++ b/src/nextcloud/api_wrappers/user.py @@ -1,10 +1,16 @@ # -*- coding: utf-8 -*- -from nextcloud.base import WithRequester +""" +User API wrapper +See https://docs.nextcloud.com/server/14/admin_manual/configuration_user/instruction_set_for_users.html + https://docs.nextcloud.com/server/14/admin_manual/configuration_user/user_provisioning_api.html + https://doc.owncloud.com/server/developer_manual/core/apis/provisioning-api.html +""" +from nextcloud import base -class User(WithRequester): +class User(base.ProvisioningApiWrapper): + """ User API wrapper """ API_URL = "/ocs/v1.php/cloud/users" - SUCCESS_CODE = 100 def add_user(self, uid, passwd): """ @@ -12,7 +18,7 @@ def add_user(self, uid, passwd): :param uid: str, uid of new user :param passwd: str, password of new user - :return: + :returns: resquester response """ msg = {'userid': uid, 'password': passwd} return self.requester.post("", msg) @@ -24,7 +30,7 @@ def get_users(self, search=None, limit=None, offset=None): :param search: string, optional search string :param limit: int, optional limit value :param offset: int, optional offset value - :return: + :returns: resquester response """ params = { 'search': search, @@ -33,14 +39,28 @@ def get_users(self, search=None, limit=None, offset=None): } return self.requester.get(params=params) - def get_user(self, uid): + def get_user(self, uid=None): """ Retrieve information about a single user - :param uid: str, uid of user - :return: + :param uid: str, uid of user (default: current user) + :returns: resquester response + """ + return self.requester.get(uid or self.client.user) + + def get_connection_issues(self): + """ + Return Falsy falue if everything is OK, or string representing + the connection problem (bad hostname, password, whatever) """ - return self.requester.get("{uid}".format(uid=uid)) + try: + response = self.get_user() + except Exception as e: + return str(e) + + if not response.is_ok: + return response.meta['message'] + return None def edit_user(self, uid, what, value): """ @@ -52,30 +72,24 @@ def edit_user(self, uid, what, value): :param uid: str, uid of user :param what: str, the field to edit :param value: str, the new value for the field - :return: + :returns: resquester response """ - what_to_key_map = dict( - email="email", quota="quota", phone="phone", address="address", website="website", - twitter="twitter", displayname="displayname", password="password", - ) - assert what in what_to_key_map, ( + keys = [ + 'email', 'quota', 'phone', 'address', 'website', 'twitter', + 'displayname', 'password' + ] + assert what in keys, ( "You have chosen to edit user's '{what}', but you can choose only from: {choices}" - .format(what=what, choices=", ".join(what_to_key_map.keys())) - ) - - url = "{uid}".format(uid=uid) - msg = dict( - key=what_to_key_map[what], - value=value, + .format(what=what, choices=(keys())) ) - return self.requester.put(url, msg) + return self.requester.put(uid, dict(key=what, value=value)) def disable_user(self, uid): """ Disable a user on the Nextcloud server so that the user cannot login anymore :param uid: str, uid of user - :return: + :returns: resquester response """ return self.requester.put("{uid}/disable".format(uid=uid)) @@ -84,7 +98,7 @@ def enable_user(self, uid): Enable a user on the Nextcloud server so that the user can login again :param uid: str, uid of user - :return: + :returns: resquester response """ return self.requester.put("{uid}/enable".format(uid=uid)) @@ -93,7 +107,7 @@ def delete_user(self, uid): Delete a user from the Nextcloud server :param uid: str, uid of user - :return: + :returns: resquester response """ return self.requester.delete("{uid}".format(uid=uid)) @@ -103,7 +117,7 @@ def add_to_group(self, uid, gid): :param uid: str, uid of user :param gid: str, name of group - :return: + :returns: resquester response """ url = "{uid}/groups".format(uid=uid) msg = {'groupid': gid} @@ -115,7 +129,7 @@ def remove_from_group(self, uid, gid): :param uid: str, uid of user :param gid: str, name of group - :return: + :returns: resquester response """ url = "{uid}/groups".format(uid=uid) msg = {'groupid': gid} @@ -127,7 +141,7 @@ def create_subadmin(self, uid, gid): :param uid: str, uid of user :param gid: str, name of group - :return: + :returns: resquester response """ url = "{uid}/subadmins".format(uid=uid) msg = {'groupid': gid} @@ -139,7 +153,7 @@ def remove_subadmin(self, uid, gid): :param uid: str, uid of user :param gid: str, name of group - :return: + :returns: resquester response """ url = "{uid}/subadmins".format(uid=uid) msg = {'groupid': gid} @@ -150,7 +164,7 @@ def get_subadmin_groups(self, uid): Get the groups in which the user is a subadmin :param uid: str, uid of user - :return: + :returns: resquester response """ url = "{uid}/subadmins".format(uid=uid) return self.requester.get(url) @@ -160,7 +174,7 @@ def resend_welcome_mail(self, uid): Trigger the welcome email for this user again :param uid: str, uid of user - :return: + :returns: resquester response """ url = "{uid}/welcome".format(uid=uid) return self.requester.post(url) diff --git a/src/nextcloud/api_wrappers/user_ldap.py b/src/nextcloud/api_wrappers/user_ldap.py index 597869d..e8314f5 100644 --- a/src/nextcloud/api_wrappers/user_ldap.py +++ b/src/nextcloud/api_wrappers/user_ldap.py @@ -1,12 +1,16 @@ # -*- coding: utf-8 -*- +""" +User LDAP wrapper +See https://docs.nextcloud.com/server/14/admin_manual/configuration_user/user_auth_ldap_api.html + https://doc.owncloud.com/server/10.7/admin_manual/configuration/server/occ_commands/app_commands/ldap_integration_commands.html +""" import re +from nextcloud import base -from nextcloud.base import WithRequester - -class UserLDAP(WithRequester): +class UserLDAP(base.OCSv2ApiWrapper): + """ User LDAP wrapper """ API_URL = "/ocs/v2.php/apps/user_ldap/api/v1/config" - SUCCESS_CODE = 200 CONFIG_KEYS = [ "ldapHost", @@ -72,15 +76,13 @@ def get_ldap_config_id(self, idx=1): Given the number of the config file, return the corresponding string ID if the configuration exists. - Args: - idx: The index of the configuration. + :param idx: The index of the configuration. If a single configuration exists on the server from the beginning, it is going to have index of 1. - Returns: - Configuration string or None + :returns: Configuration string or None """ - config_id = f"s{idx:02d}" + config_id = 's%02d' % idx config = self.get_ldap_config(config_id) if config.is_ok: return config_id @@ -91,12 +93,10 @@ def get_ldap_lowest_existing_config_id(self, lower_bound=1, upper_bound=10): Given (inclusive) lower and upper bounds, try to guess an existing LDAP config ID that corresponds to an index within those bounds. - Args: - lower_bound: The lowest index of the configuration possible. - upper_bound: The greatest index of the configuration possible. + :param lower_bound: The lowest index of the configuration possible. + :param upper_bound: The greatest index of the configuration possible. - Returns: - Configuration string or None + :returns: Configuration string or None """ for idx in range(lower_bound, upper_bound + 1): config_id = self.get_ldap_config_id(idx) @@ -107,12 +107,10 @@ def get_ldap_config(self, config_id, show_password=None): """ Get all keys and values of the specified LDAP configuration - Args: - config_id (str): User LDAP config id - show_password (int): 0 or 1 whether to return the password in clear text (default 0) - - Returns: + :param config_id (str): User LDAP config id + :param show_password (int): 0 or 1 whether to return the password in clear text (default 0) + :returns: requester response """ params = dict(showPassword=show_password) return self.requester.get(config_id, params=params) @@ -124,12 +122,10 @@ def edit_ldap_config(self, config_id, data): You can find list of all config keys in get_ldap_config method response or in Nextcloud docs - Args: - config_id (str): User LDAP config id - data (dict): config values to update - - Returns: + :param config_id (str): User LDAP config id + :param data (dict): config values to update + :returns: requester response """ prepared_data = {'configData[{}]'.format(key): value for key, value in data.items()} return self.requester.put(config_id, data=prepared_data) @@ -142,8 +138,7 @@ def ldap_cache_flush(self, config_id): This is performed by a fake update of LDAP cache TTL as indicated by - Args: - config_id (str): User LDAP config id + :param config_id (str): User LDAP config id """ cache_val = self.get_ldap_cache_ttl(config_id) self.set_ldap_cache_ttl(config_id, cache_val) @@ -152,11 +147,9 @@ def delete_ldap_config(self, config_id): """ Delete a given LDAP configuration - Args: - config_id (str): User LDAP config id - - Returns: + :param config_id (str): User LDAP config id + :returns: requester response """ return self.requester.delete(config_id) diff --git a/src/nextcloud/api_wrappers/webdav.py b/src/nextcloud/api_wrappers/webdav.py index 64c7017..e0c96d0 100644 --- a/src/nextcloud/api_wrappers/webdav.py +++ b/src/nextcloud/api_wrappers/webdav.py @@ -1,77 +1,280 @@ # -*- coding: utf-8 -*- +""" +WebDav API wrapper +See https://docs.nextcloud.com/server/14/developer_manual/client_apis/WebDAV/index.html + https://doc.owncloud.com/server/developer_manual/webdav_api/search.html + https://docs.nextcloud.com/server/14/developer_manual/client_apis/WebDAV/search.html + +Not implemented yet: + - search feature + - trash + - versions + - chunked file upload +""" +# implementing dav search +#-> add a function to build xml search +# see ../common/simplexml.py and ../common/collections.py import re import os -import pathlib +try: + import pathlib +except ImportError: + import pathlib2 as pathlib import xml.etree.ElementTree as ET - from datetime import datetime -from nextcloud.base import WithRequester +from nextcloud.base import WebDAVApiWrapper +from nextcloud.common.collections import PropertySet +from nextcloud.common.properties import Property as Prop, NAMESPACES_MAP +from nextcloud.common.value_parsing import ( + timestamp_to_epoch_time, + datetime_to_timestamp +) + + +class NextCloudDirectoryNotEmpty(Exception): + """ Exception to raise when you try to remove a folder that is not empty""" + +class NextCloudFileConflict(Exception): + """ Exception to raise when you try to create a File that alreay exists """ + + +class File(PropertySet): + """ + Define properties on a WebDav file/folder + + Additionnally, provide an objective CRUD API + (that probably consume more energy than fetching specific attributes) + + Example : + >>> root = nxc.get_folder() # get root + >>> def _list_rec(d, indent=""): + >>> # list files recursively + >>> print("%s%s%s" % (indent, d.basename(), '/' if d.isdir() else '')) + >>> if d.isdir(): + >>> for i in d.list(): + >>> _list_rec(i, indent=indent+" ") + >>> + >>> _list_rec(root) + """ + + @staticmethod + def _extract_resource_type(file_property): + file_type = list(file_property) + if file_type: + return re.sub('{.*}', '', file_type[0].tag) + return None + + _attrs = [ + Prop('d:getlastmodified'), + Prop('d:getetag'), + Prop('d:getcontenttype'), + Prop('d:resourcetype', parse_xml_value=(lambda p: File._extract_resource_type(p))), + Prop('d:getcontentlength'), + Prop('oc:id'), + Prop('oc:fileid'), + Prop('oc:favorite'), + Prop('oc:comments-href'), + Prop('oc:comments-count'), + Prop('oc:comments-unread'), + Prop('oc:owner-id'), + Prop('oc:owner-display-name'), + Prop('oc:share-types'), + Prop('oc:checksums'), + Prop('oc:size'), + Prop('oc:href'), + Prop('nc:has-preview') + ] + + def isfile(self): + """ say if the file is a file /!\\ ressourcetype property shall be loaded """ + return not self.resource_type + + def isroot(self): + """ say if the file is a directory /!\\ ressourcetype property shall be loaded """ + return not self.get_relative_path().replace('/','') + + def isdir(self): + """ say if the file is a directory /!\\ ressourcetype property shall be loaded """ + return self.resource_type == self.COLLECTION_RESOURCE_TYPE + + def get_relative_path(self): + """ get path relative to user root """ + return self._wrapper.get_relative_path(self.href) + + def _get_remote_path(self, path=None): + _url = self.get_relative_path() + return '/'.join([_url, path]) if path else _url + + def basename(self): + """ basename """ + _path = self._get_remote_path() + return _path.split('/')[-2] if _path.endswith('/') else _path.split('/')[-1] + + def dirname(self): + """ dirname """ + _path = self._get_remote_path() + return '/'.join(_path.split('/')[:-2]) if _path.endswith('/') else '/'.join(_path.split('/')[:-1]) + + def __eq__(self, b): + return self.href == b.href + + # MINIMAL SET OF CRUD OPERATIONS + def get_folder(self, path=None, all_properties=False): + """ + Get folder (see WebDav wrapper) + :param path: if empty list current dir + :param all_properties: fetch all properties (default False) + :returns: a folder (File object) + + Note : To check if sub folder exists, use get_file method + """ + return self._wrapper.get_folder(self._get_remote_path(path), + all_properties=all_properties) + + def get_file(self, path=None, all_properties=False): + """ + Get file (see WebDav wrapper) + :param path: if empty, get current file + :param all_properties: fetch all properties (default False) + :returns: a file or folder (File object) + """ + return self._wrapper.get_file(self._get_remote_path(path), + all_properties=all_properties) + + def list(self, subpath='', filter_rules=None, all_properties=False): + """ + List folder (see WebDav wrapper) + :param subpath: if empty list current dir + :param all_properties: fetch all properties (default False) + :returns: list of Files + """ + if filter_rules: + resp = self._wrapper.fetch_files_with_filter( + path=self._get_remote_path(subpath), + filter_rules=filter_rules + ) + else: + resp = self._wrapper.list_folders( + self._get_remote_path(subpath), + depth=1, + all_properties=all_properties + ) + if resp.is_ok and resp.data: + _dirs = resp.data + # remove current dir + if _dirs[0] == self: + _dirs = _dirs[1:] + return _dirs + return [] + + def upload_file(self, local_filepath, name, timestamp=None): + """ + Upload file (see WebDav wrapper) + :param local_filepath: path of the local file + :param name: name of the new file + :param timestamp (int): timestamp of upload file. If None, get time by local file. + :returns: True if success + """ + resp = self._wrapper.upload_file(local_filepath, + self._get_remote_path(name), + timestamp=timestamp) + return resp.is_ok + + def upload_file_contents(self, file_contents, name=None, timestamp=None): + """ + Upload file content (see WebDav wrapper) + :param file_contents: binary content of the file + :param name: name of the new file (current file if empty) + :param timestamp (int): mtime of upload file + :returns: True if success + """ + resp = self._wrapper.upload_file_contents(file_contents, + self._get_remote_path(name), + timestamp=timestamp) + return resp.is_ok + + + def download(self, name=None, target_dir=None): + """ + Download file (see WebDav wrapper) + :param name: name of the new file + :returns: True if success + """ + path = self._get_remote_path(name) + target_path, _file_info = self._wrapper.download_file(path, + target_dir=target_dir) + assert os.path.isfile(target_path), "Download failed" + return target_path + + def isempty(self): + """ + Say if a folder is emty (always False if not a directory) + :returns: True if current dir is empty + """ + if not self.isdir(): + return False + return not self.list() -class WebDAV(WithRequester): + def delete(self, subpath='', recursive=False): + """ + Delete file or folder (see WebDav wrapper) + :param subpath: if empty, delete current file + :param recursive: delete recursively + :returns: True if success + """ + if recursive: + resp = self._wrapper.delete_path(self._get_remote_path(subpath)) + else: + if subpath: + _file = self.get_file(subpath, all_properties=False) + return _file.delete(recursive=recursive) + if not self.isempty(): + raise NextCloudDirectoryNotEmpty(self.get_relative_path()) + return self.delete(recursive=True) + return resp.is_ok + + +class WebDAV(WebDAVApiWrapper): + """ WebDav API wrapper """ API_URL = "/remote.php/dav/files" - def __init__(self, *args, **kwargs): - super(WebDAV, self).__init__(*args) - self.json_output = kwargs.get('json_output') + def _get_path(self, path): + if path: + return '/'.join([self.client.user, path]).replace('//', '/') + return self.client.user - def list_folders(self, uid, path=None, depth=1, all_properties=False): + def list_folders(self, path=None, depth=1, all_properties=False, + fields=None): """ - Get path files list with files properties for given user, with given depth + Get path files list with files properties with given depth + (for current user) Args: - uid (str): uid of user path (str/None): files path depth (int): depth of listing files (directories content for example) all_properties (bool): list all available file properties in Nextcloud + fields (str list): file properties to fetch Returns: list of dicts if json_output list of File objects if not json_output """ - if all_properties: - data = """ - - - - - - - - - - - - - - - - - - """ - else: - data = None - additional_url = uid - if path: - additional_url = "{}/{}".format(additional_url, path) - resp = self.requester.propfind(additional_url=additional_url, - headers={"Depth": str(depth)}, + data = File.build_xml_propfind( + use_default=all_properties, + fields=fields + ) if (fields or all_properties) else None + resp = self.requester.propfind(additional_url=self._get_path(path), + headers={'Depth': str(depth)}, data=data) - if not resp.is_ok: - resp.data = None - return resp - response_data = resp.data - response_xml_data = ET.fromstring(response_data) - files_data = [File(single_file) for single_file in response_xml_data] - resp.data = files_data if not self.json_output else [each.as_dict() for each in files_data] - return resp + return File.from_response(resp, json_output=self.json_output, + wrapper=self) - def download_file(self, uid, path): + def download_file(self, path, target_dir=None): """ - Download file of given user by path + Download file by path (for current user) File will be saved to working directory path argument must be valid file path Modified time of saved file will be synced with the file properties in Nextcloud @@ -82,277 +285,291 @@ def download_file(self, uid, path): * file with same name already exists in working directory Args: - uid (str): uid of user path (str): file path Returns: - None + a tuple (target_path, File object) """ - additional_url = "/".join([uid, path]) - filename = path.split('/')[-1] if '/' in path else path - file_data = self.list_folders(uid=uid, path=path, depth=0) + if not target_dir: + target_dir = './' + filename = path.split('/')[(-1)] if '/' in path else path + file_data = self.get_file(path) if not file_data: raise ValueError("Given path doesn't exist") - file_resource_type = (file_data.data[0].get('resource_type') - if self.json_output - else file_data.data[0].resource_type) + file_resource_type = file_data.resource_type if file_resource_type == File.COLLECTION_RESOURCE_TYPE: raise ValueError("This is a collection, please specify file path") - if filename in os.listdir('./'): - raise ValueError("File with such name already exists in this directory") - res = self.requester.download(additional_url) + if filename in os.listdir(target_dir): + raise ValueError( + "File with such name already exists in this directory") + filename = os.path.join(target_dir, filename) + res = self.requester.download(self._get_path(path)) with open(filename, 'wb') as f: f.write(res.data) # get timestamp of downloaded file from file property on Nextcloud # If it succeeded, set the timestamp to saved local file # If the timestamp string is invalid or broken, the timestamp is downloaded time. - file_timestamp_str = (file_data.data[0].get('last_modified') - if self.json_output - else file_data.data[0].last_modified) + file_timestamp_str = file_data.last_modified file_timestamp = timestamp_to_epoch_time(file_timestamp_str) if isinstance(file_timestamp, int): - os.utime(filename, (datetime.now().timestamp(), file_timestamp)) + os.utime(filename, ( + datetime_to_timestamp(datetime.now()), + file_timestamp)) + return (filename, file_data) - def upload_file(self, uid, local_filepath, remote_filepath, timestamp=None): + def upload_file(self, local_filepath, remote_filepath, timestamp=None): """ Upload file to Nextcloud storage Args: - uid (str): uid of user local_filepath (str): path to file on local storage remote_filepath (str): path where to upload file on Nextcloud storage timestamp (int): timestamp of upload file. If None, get time by local file. + + Returns: + requester response """ with open(local_filepath, 'rb') as f: file_contents = f.read() if timestamp is None: timestamp = int(os.path.getmtime(local_filepath)) - return self.upload_file_contents(uid, file_contents, remote_filepath, timestamp) + return self.upload_file_contents(file_contents, remote_filepath, timestamp) - def upload_file_contents(self, uid, file_contents, remote_filepath, timestamp=None): + def upload_file_contents(self, file_contents, remote_filepath, timestamp=None): """ Upload file to Nextcloud storage Args: - uid (str): uid of user file_contents (bytes): Bytes the file to be uploaded consists of remote_filepath (str): path where to upload file on Nextcloud storage timestamp (int): mtime of upload file + + Returns: + requester response """ - additional_url = "/".join([uid, remote_filepath]) - return self.requester.put_with_timestamp(additional_url, data=file_contents, timestamp=timestamp) + return self.requester.put_with_timestamp( + self._get_path(remote_filepath), data=file_contents, timestamp=timestamp) - def create_folder(self, uid, folder_path): + def create_folder(self, folder_path): """ Create folder on Nextcloud storage Args: - uid (str): uid of user folder_path (str): folder path + + Returns: + requester response """ - return self.requester.make_collection(additional_url="/".join([uid, folder_path])) + return self.requester.make_collection(additional_url=(self._get_path(folder_path))) - def assure_folder_exists(self, uid, folder_path): + def assure_folder_exists(self, folder_path): """ Create folder on Nextcloud storage, don't do anything if the folder already exists. Args: - uid (str): uid of user folder_path (str): folder path Returns: + requester response """ - self.create_folder(uid, folder_path) + self.create_folder(folder_path) return True - def assure_tree_exists(self, uid, tree_path): + def assure_tree_exists(self, tree_path): """ Make sure that the folder structure on Nextcloud storage exists Args: - uid (str): uid of user folder_path (str): The folder tree Returns: + requester response """ tree = pathlib.PurePath(tree_path) parents = list(tree.parents) ret = True subfolders = parents[:-1][::-1] + [tree] for subf in subfolders: - ret = self.assure_folder_exists(uid, str(subf)) + ret = self.assure_folder_exists(str(subf)) + return ret - def delete_path(self, uid, path): + def delete_path(self, path): """ Delete file or folder with all content of given user by path Args: - uid (str): uid of user path (str): file or folder path to delete + + Returns: + requester response """ - url = "/".join([uid, path]) - return self.requester.delete(url=url) + return self.requester.delete(url=self._get_path(path)) - def move_path(self, uid, path, destination_path, overwrite=False): + def move_path(self, path, destination_path, overwrite=False): """ Move file or folder to destination Args: - uid (str): uid of user path (str): file or folder path to move destionation_path (str): destination where to move overwrite (bool): allow destination path overriding + + Returns: + requester response """ - path_url = "/".join([uid, path]) - destination_path_url = "/".join([uid, destination_path]) - return self.requester.move(url=path_url, - destination=destination_path_url, overwrite=overwrite) + return self.requester.move(url=self._get_path(path), + destination=self._get_path( + destination_path), + overwrite=overwrite) - def copy_path(self, uid, path, destination_path, overwrite=False): + def copy_path(self, path, destination_path, overwrite=False): """ Copy file or folder to destination Args: - uid (str): uid of user path (str): file or folder path to copy destionation_path (str): destination where to copy overwrite (bool): allow destination path overriding + + Returns: + requester response """ - path_url = "/".join([uid, path]) - destination_path_url = "/".join([uid, destination_path]) - return self.requester.copy(url=path_url, - destination=destination_path_url, overwrite=overwrite) + return self.requester.copy(url=self._get_path(path), + destination=self._get_path( + destination_path), + overwrite=overwrite) - def set_favorites(self, uid, path): + def set_file_property(self, path, update_rules): """ - Set files of a user favorite + Set file property Args: - uid (str): uid of user path (str): file or folder path to make favorite + update_rules : a dict { namespace: {key : value } } + + Returns: + requester response with list in data + + Note : + check keys in nextcloud.common.properties.NAMESPACES_MAP for namespace codes + check object property xml_name for property name """ - data = """ - - - - 1 - - - + data = File.build_xml_propupdate(update_rules) + return self.requester.proppatch(additional_url=self._get_path(path), data=data) + + def fetch_files_with_filter(self, path='', filter_rules=''): """ - url = "/".join([uid, path]) - return self.requester.proppatch(additional_url=url, data=data) + List files according to a filter + + Args: + path (str): file or folder path to search + filter_rules : a dict { namespace: {key : value } } - def list_favorites(self, uid, path=""): + Returns: + requester response with list in data + + Note : + check keys in nextcloud.common.properties.NAMESPACES_MAP for namespace codes + check object property xml_name for property name + """ + data = File.build_xml_propfind( + instr='oc:filter-files', filter_rules=filter_rules) + resp = self.requester.report( + additional_url=self._get_path(path), data=data) + return File.from_response(resp, json_output=self.json_output, + wrapper=self) + + def set_favorites(self, path): """ Set files of a user favorite Args: - uid (str): uid of user path (str): file or folder path to make favorite + + Returns: + requester response """ - data = """ - - - 1 - - - """ - url = "/".join([uid, path]) - res = self.requester.report(additional_url=url, data=data) - if not res.is_ok: - res.data = None - return res - response_xml_data = ET.fromstring(res.data) - files_data = [File(single_file) for single_file in response_xml_data] - res.data = files_data if not self.json_output else [each.as_dict() for each in files_data] - return res - - -class File(object): - SUCCESS_STATUS = 'HTTP/1.1 200 OK' - - # key is NextCloud property, value is python variable name - FILE_PROPERTIES = { - # d: - "getlastmodified": "last_modified", - "getetag": "etag", - "getcontenttype": "content_type", - "resourcetype": "resource_type", - "getcontentlength": "content_length", - # oc: - "id": "id", - "fileid": "file_id", - "favorite": "favorite", - "comments-href": "comments_href", - "comments-count": "comments_count", - "comments-unread": "comments_unread", - "owner-id": "owner_id", - "owner-display-name": "owner_display_name", - "share-types": "share_types", - "checksums": "check_sums", - "size": "size", - "href": "href", - # nc: - "has-preview": "has_preview", - } - xml_namespaces_map = { - "d": "DAV:", - "oc": "http://owncloud.org/ns", - "nc": "http://nextcloud.org/ns" - } - COLLECTION_RESOURCE_TYPE = 'collection' - - def __init__(self, xml_data): - self.href = xml_data.find('d:href', self.xml_namespaces_map).text - for propstat in xml_data.iter('{DAV:}propstat'): - if propstat.find('d:status', self.xml_namespaces_map).text != self.SUCCESS_STATUS: - continue - for file_property in propstat.find('d:prop', self.xml_namespaces_map): - file_property_name = re.sub("{.*}", "", file_property.tag) - if file_property_name not in self.FILE_PROPERTIES: - continue - if file_property_name == 'resourcetype': - value = self._extract_resource_type(file_property) - else: - value = file_property.text - setattr(self, self.FILE_PROPERTIES[file_property_name], value) - - def _extract_resource_type(self, file_property): - file_type = list(file_property) - if file_type: - return re.sub("{.*}", "", file_type[0].tag) - return None + return self.set_file_property(path, {'oc': {'favorite': 1}}) + + def list_favorites(self, path=''): + """ + List favorites (files) of the user + + Args: + path (str): file or folder path to search favorite + + Returns: + requester response with list in data + """ + return self.fetch_files_with_filter(path, {'oc': {'favorite': 1}}) - def as_dict(self): - return {key: value - for key, value in self.__dict__.items() - if key in self.FILE_PROPERTIES.values()} + def get_file_property(self, path, field, ns='oc'): + """ + Fetch asked properties from a file path. + Args: + path (str): file or folder path to make favorite + field (str): field name -class WebDAVStatusCodes(object): - CREATED_CODE = 201 - NO_CONTENT_CODE = 204 - MULTISTATUS_CODE = 207 - ALREADY_EXISTS_CODE = 405 - PRECONDITION_FAILED_CODE = 412 + Returns: + requester response with asked value in data + """ + if ':' in field: + ns, field = field.split(':') + get_file_prop_xpath = '{DAV:}propstat/d:prop/%s:%s' % (ns, field) + data = File.build_xml_propfind(fields={ns: [field]}) + resp = self.requester.propfind(additional_url=(self._get_path(path)), headers={'Depth': str(0)}, + data=data) + response_data = resp.data + resp.data = None + if not resp.is_ok: + return resp + response_xml_data = ET.fromstring(response_data) + for xml_data in response_xml_data: + for prop in xml_data.findall(get_file_prop_xpath, + NAMESPACES_MAP): + resp.data = prop.text + break -def timestamp_to_epoch_time(rfc1123_date=""): - """ - literal date time string (use in DAV:getlastmodified) to Epoch time + return resp - No longer, Only rfc1123-date productions are legal as values for DAV:getlastmodified - However, the value may be broken or invalid. + def get_file(self, path, all_properties=False): + """ + Return the File object associated to the path - Args: - rfc1123_date (str): rfc1123-date (defined in RFC2616) - Return: - int or None : Epoch time, if date string value is invalid return None - """ - try: - epoch_time = datetime.strptime(rfc1123_date, '%a, %d %b %Y %H:%M:%S GMT').timestamp() - except ValueError: - # validation error (DAV:getlastmodified property is broken or invalid) + :param path: path to the file + :returns: File object or None + """ + resp = self.client.with_attr(json_output=False).list_folders( + path, all_properties=all_properties, depth=0) + if resp.is_ok: + if resp.data: + return resp.data[0] return None - return int(epoch_time) + + def get_folder(self, path=None, all_properties=False): + """ + Return the File object associated to the path + If the file (folder or 'collection') doesn't exists, create it. + + :param path: path to the file/folder, if empty use root + :returns: File object + """ + fileobj = self.get_file(path, all_properties=all_properties) + if fileobj: + if not fileobj.isdir(): + raise NextCloudFileConflict(fileobj.href) + else: + self.client.create_folder(path) + fileobj = self.get_file(path, all_properties=all_properties) + + return fileobj + + def get_relative_path(self, href): + """ + Returns relative (to application / user) path + + :param href(str): file href + :returns (str): relative path + """ + _app_root = '/'.join([self.API_URL, self.client.user]) + return href[len(_app_root):] diff --git a/src/nextcloud/base.py b/src/nextcloud/base.py index 3e5122e..2beac81 100644 --- a/src/nextcloud/base.py +++ b/src/nextcloud/base.py @@ -1,50 +1,123 @@ # -*- coding: utf-8 -*- -import enum +""" +Define what is an api wrapper +""" +import six +from nextcloud.requester import Requester, OCSRequester, WebDAVRequester +from nextcloud.codes import ProvisioningCode, OCSCode, WebDAVCode +API_WRAPPER_CLASSES = [] -class WithRequester(object): +class MetaWrapper(type): + """ Meta class to register wrappers """ + def __new__(cls, name, bases, attrs): + new_cls = type.__new__(cls, name, bases, attrs) + if (new_cls.API_URL != NotImplementedError and new_cls.VERIFIED): + API_WRAPPER_CLASSES.append(new_cls) + return new_cls + + +class BaseApiWrapper(object, six.with_metaclass(MetaWrapper)): + """ + Define an API wrapper + + Example of an abstract API wrapper. + >>> class ApiWrapper(BaseApiWrapper): + >>> REQUESTER = WebDAVRequester + >>> SUCCESS_CODE = 100 + + + If API_URL is provided (and if attribute ''VERIFIED = False'' is not in the new + class),then public methods of the class are added to NextCloud object. + Example of a concerete API wrapper. + >>> class Info(ApiWrapper): + >>> API_URL = 'remote.php/info' + >>> + >>> def get_info(self): + >>> return self.requester.get() + + """ API_URL = NotImplementedError + VERIFIED = True + JSON_ABLE = True + REQUESTER = Requester - def __init__(self, requester): - self._requester = requester + def __init__(self, client=None): + self.client = client + self.requester = self.REQUESTER(self) + + for attr_name in ['API_URL', 'SUCCESS_CODE', 'METHODS_SUCCESS_CODES']: + setattr(self.requester, attr_name, getattr(self, attr_name, None)) @property - def requester(self): - """ Get requester instance """ - # dynamically set API_URL for requester - self._requester.API_URL = self.API_URL - self._requester.SUCCESS_CODE = getattr(self, 'SUCCESS_CODE', None) - return self._requester + def json_output(self): + return self.JSON_ABLE and self.client.json_output + + def _arguments_get(self, varnames, vals): + """ + allows to automatically fetch values of varnames + using generic values computing '_default_get_VARNAME' + + Example + >>> def get_file_id(self, **kwargs): + >>> file_id, = self._arguments_get(['file_id'], locals()) + >>> + >>> def _default_get_file_id(self, vals): + >>> return self.get_file_id_from_name(vals.get('name', None)) + >>> + >>> nxc.get_file_id(name='foo.bar') + + :param varmames: list of wanted python var names + :param vals: a dict object containing already set variables + :returns: list of wanted values + """ + if 'kwargs' in vals: + vals.update(vals['kwargs']) + ret = [] + for varname in varnames: + val = vals.get(varname, None) + if val is None: + getter_func_name = '_default_get_%s' % varname + if hasattr(self, getter_func_name): + val = getattr(self, getter_func_name)(vals) + ret.append(val) + + return ret + -class OCSCode(enum.IntEnum): - OK = 100 - SERVER_ERROR = 996 - NOT_AUTHORIZED = 997 - NOT_FOUND = 998 - UNKNOWN_ERROR = 999 +class ProvisioningApiWrapper(BaseApiWrapper): + """ Define "Provisioning API" wrapper classes """ + REQUESTER = OCSRequester + SUCCESS_CODE = ProvisioningCode.SUCCESS -class ShareType(enum.IntEnum): - USER = 0 - GROUP = 1 - PUBLIC_LINK = 3 - FEDERATED_CLOUD_SHARE = 6 +class OCSv1ApiWrapper(BaseApiWrapper): + """ Define OCS wrapper classes """ + REQUESTER = OCSRequester + SUCCESS_CODE = OCSCode.SUCCESS_V1 -class Permission(enum.IntEnum): - """ Permission for Share have to be sum of selected permissions """ - READ = 1 - UPDATE = 2 - CREATE = 4 - DELETE = 8 - SHARE = 16 - ALL = 31 +class OCSv2ApiWrapper(BaseApiWrapper): + """ Define OCS wrapper classes """ + REQUESTER = OCSRequester + SUCCESS_CODE = OCSCode.SUCCESS_V2 -QUOTA_UNLIMITED = -3 +class WebDAVApiWrapper(BaseApiWrapper): + """ Define WebDav wrapper classes """ + REQUESTER = WebDAVRequester -def datetime_to_expire_date(date): - return date.strftime("%Y-%m-%d") + SUCCESS_CODE = { + 'PROPFIND': [WebDAVCode.MULTISTATUS], + 'PROPPATCH': [WebDAVCode.MULTISTATUS], + 'REPORT': [WebDAVCode.MULTISTATUS], + 'MKCOL': [WebDAVCode.CREATED], + 'COPY': [WebDAVCode.CREATED, WebDAVCode.NO_CONTENT], + 'MOVE': [WebDAVCode.CREATED, WebDAVCode.NO_CONTENT], + 'PUT': [WebDAVCode.CREATED], + 'POST': [WebDAVCode.CREATED], + 'DELETE': [WebDAVCode.NO_CONTENT] + } diff --git a/src/nextcloud/codes.py b/src/nextcloud/codes.py new file mode 100644 index 0000000..4041350 --- /dev/null +++ b/src/nextcloud/codes.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +""" +Define all known return code from OwnCloud/NextCloud API +""" +import enum + + +class ShareType(enum.IntEnum): + USER = 0 + GROUP = 1 + PUBLIC_LINK = 3 + FEDERATED_CLOUD_SHARE = 6 + + +class Permission(enum.IntEnum): + """ Permission for Share have to be sum of selected permissions """ + READ = 1 + UPDATE = 2 + CREATE = 4 + DELETE = 8 + SHARE = 16 + ALL = 31 + + +class ExternalApiCodes(enum.IntEnum): + SUCCESS = 100 + SERVER_ERROR = 996 + NOT_AUTHORIZED = 997 + NOT_FOUND = 998 + UNKNOWN_ERROR = 999 + + +class ProvisioningCode(enum.IntEnum): + SUCCESS = 100 + INVALID_INPUT_DATA = 101 + FAILED = 102 + CREATION_FAILED = 103 + INSUFFICENT_PRIVILIEGES = 104 + CHANGE_FAILED = 105 + + +class OCSCode(enum.IntEnum): + SUCCESS_V1 = 100 + SUCCESS_V2 = 200 + FAILURE = 400 + NOT_FOUND = 404 + SYNC_CONFLICT = 409 + + +class WebDAVCode(enum.IntEnum): + """ DAV constants """ + CREATED = 201 + NO_CONTENT = 204 + MULTISTATUS = 207 + NOT_AUTHENTICATED = 401 + ALREADY_EXISTS = 405 + CONFLICT = 409 + PRECONDITION_FAILED = 412 + + +QUOTA_UNLIMITED = -3 diff --git a/src/nextcloud/common/__init__.py b/src/nextcloud/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/nextcloud/common/collections.py b/src/nextcloud/common/collections.py new file mode 100644 index 0000000..8eaf0c0 --- /dev/null +++ b/src/nextcloud/common/collections.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +""" +Define generic request/result class +common to File and Tag +""" +import re +from nextcloud.common.simplexml import SimpleXml +from nextcloud.common.properties import NAMESPACES_MAP + + +class PropertySet(object): + """ + Set of nextcloud.common.properties.Prop + defined in _attrs class variable. + + The inherited classes can do additionnal complex operations + if wrapper instance is defined at initialization. + """ + SUCCESS_STATUS = 'HTTP/1.1 200 OK' + COLLECTION_RESOURCE_TYPE = 'collection' + _attrs = [] + + @property + def _fields(self): + return [v.attr_name for v in self._attrs] + + @property + def _properties(self): + return [v.xml_key for v in self._attrs] + + @classmethod + def _fetch_property(cls, key, attr='xml_key'): + for k in cls._attrs: + if getattr(k, attr) == key: + return k + + def __repr__(self, add_info=''): + return "<%s %s%s>" % (self.__class__.__name__, self.href, add_info) + + def __init__(self, xml_data, init_attrs=False, wrapper=None): + if init_attrs: + for attr in self._attrs: + setattr(self, attr.attr_name, None) + + self._wrapper = wrapper + self.href = xml_data.find('d:href', NAMESPACES_MAP).text + for propstat in xml_data.iter('{DAV:}propstat'): + if propstat.find('d:status', NAMESPACES_MAP).text != self.SUCCESS_STATUS: + pass + else: + for xml_property in propstat.find('d:prop', NAMESPACES_MAP): + property_name = re.sub('{.*}', '', xml_property.tag) + prop = self._fetch_property(property_name) + if not prop: + pass + else: + value = prop.get_value(xml=xml_property) + setattr(self, prop.attr_name, value) + + @classmethod + def default_get(cls, key_format='json', **kwargs): + """ + Get default values + + :param key_format: 'json' or 'xml' + :param (any): values to force (python names) + """ + vals = {getattr(v, '%s_key' % key_format): kwargs.get(v.attr_name, v.default_value) + for v in cls._attrs if getattr(v, '%s_key' % key_format, False)} + return vals + + @classmethod + def build_xml_propfind(cls, instr=None, filter_rules=None, use_default=False, fields=None): + """see SimpleXml.build_propfind_datas + + :param use_default: True to use all values specified in PropertySet + """ + if use_default: + if not fields: + fields = {k: [] for k in NAMESPACES_MAP.keys()} + for attr in cls._attrs: + fields[attr.ns].append(attr.xml_key) + + return SimpleXml.build_propfind_datas(instr=instr, filter_rules=filter_rules, + fields=(fields or {})) + + @classmethod + def build_xml_propupdate(cls, values): + """ see SimpleXml.build_propupdate_datas """ + return SimpleXml.build_propupdate_datas(values) + + @classmethod + def from_response(cls, resp, json_output=None, filtered=None, + init_attrs=None, wrapper=None): + """ Build list of PropertySet from a NextcloudResponse """ + if not resp.is_ok: + resp.data = None + return resp + else: + response_data = resp.data + response_xml_data = SimpleXml.fromstring(response_data) + attr_datas = [cls(xml_data, init_attrs=init_attrs, wrapper=wrapper) + for xml_data in response_xml_data] + if filtered: + if callable(filtered): + attr_datas = [ + attr_data + for attr_data in attr_datas + if filtered(attr_data) + ] + resp.data = attr_datas if not json_output else [ + attr_data.as_dict() for attr_data in attr_datas] + return resp + + def as_dict(self): + """ Return current instance as a {k: val} dict """ + attrs = [v.attr_name for v in self._attrs] + return {key: value for key, value in self.__dict__.items() if key in attrs} diff --git a/src/nextcloud/common/properties.py b/src/nextcloud/common/properties.py new file mode 100644 index 0000000..be4c9bd --- /dev/null +++ b/src/nextcloud/common/properties.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +""" +Define properties types that can be used one OwnCloud/NextCloud elements + + +How to define a new property namespace. Example: +>>> class NCProp(Property): +>>> # define the namespace code with the namespace value +>>> namespace = ('nc', 'http://nextcloud.org/ns') +>>> # choose which attribute name is given by default on PropertySet +>>> _name_convention = { +>>> # xml : python +>>> 'xmly-attr-name': 'cute_attr_name', +>>> } # Note: by default, all '-' are already replaced by '_' + +""" +import six + +NAMESPACES_MAP = {} +NAMESPACES_CLASSES = {} + + +class MetaProperty(type): + def __new__(meta, name, bases, attrs): + cls = type.__new__(meta, name, bases, attrs) + if (cls.namespace): + NAMESPACES_MAP[cls.namespace[0]] = cls.namespace[1] + NAMESPACES_CLASSES[cls.namespace[0]] = cls + return cls + + +class Property(object, six.with_metaclass(MetaProperty)): + """ + Define an element property, and naming of resulting python attribute + + :param xml_name: xml property name (prefixed with 'ns:' i.e. namespace) + :param json: json property name + :param default: default value (value or function without args) + :param parse_xml_value: a function that take xml.etree.ElementTree and + return value of the property + """ + namespace = None + _name_convention = {} + + def __init__(self, xml_name, json=None, default=None, parse_xml_value=None): + if ':' in xml_name: + (self.ns, self.xml_key) = xml_name.split(':') + if self.ns in NAMESPACES_CLASSES: + self._name_convention = NAMESPACES_CLASSES[self.ns]._name_convention + else: + self.xml_key = xml_name + if self.namespace: + self.ns = self.namespace[0] + + self.attr_name = self._xml_name_to_py_name(self.xml_key) + self.json_key = json + self.default_val = default + self.parse_xml_value = parse_xml_value + + def __repr__(self): + return "<{}: ns={}, xml={}, py={}, json={}>".format( + self.__class__.__name__, + self.ns, + self.attr_name, + self.xml_key, + self.json_key + ) + + def _xml_name_to_py_name(self, name): + if name in self._name_convention: + return self._name_convention[name] + else: + return name.replace('-', '_') + + def _py_name_to_xml_name(self, name): + _reversed_convention = {v: k for k, v in self._name_convention.items()} + if name in _reversed_convention: + return _reversed_convention[name] + else: + return name.replace('_', '-') + + @property + def default_value(self): + """ Fetch default value """ + if callable(self.default_val): + return self.default_val() + else: + return self.default_val + + def get_value(self, xml=None): + """ + Fetch value from input data + + :param xml: xml.etree.ElementTree node + :returns: python value + """ + if xml is not None: + if self.parse_xml_value: + return self.parse_xml_value(xml) + else: + return xml.text + + +class DProp(Property): + """ DAV property """ + namespace = ('d', 'DAV:') + + _name_convention = { + 'getlastmodified': 'last_modified', + 'getetag': 'etag', + 'getcontenttype': 'content_type', + 'resourcetype': 'resource_type', + 'getcontentlength': 'content_length' + } + + +class OCProp(Property): + """ OwnCloud property """ + namespace = ('oc', 'http://owncloud.org/ns') + + _name_convention = { + 'fileid': 'file_id', + 'checksums': 'check_sums' + } + + +class NCProp(Property): + """ NextCloud property """ + namespace = ('nc', 'http://nextcloud.org/ns') diff --git a/src/nextcloud/common/simplexml.py b/src/nextcloud/common/simplexml.py new file mode 100644 index 0000000..b69d5a9 --- /dev/null +++ b/src/nextcloud/common/simplexml.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +""" +XML builder/parser +""" +import xml.etree.ElementTree as ET +from nextcloud.compat import encode_string +from nextcloud.common.properties import NAMESPACES_MAP + + +def _prepare_xml_parsing(string): + return encode_string(string) + +def _safe_xml_val(val): + if isinstance(val, int): + val = str(val) + return val + +class SimpleXml: + """ + Static class to build and parse XML datas + """ + namespaces_map = NAMESPACES_MAP + supported_field_types = list(NAMESPACES_MAP.keys()) + xml_namespaces_map = {'xmlns:' + k: v for k, v in NAMESPACES_MAP.items()} + + @classmethod + def _to_fields_list(cls, fields_hash): + props_xml = [] + for field_type in fields_hash: + if field_type not in cls.supported_field_types: + pass + else: + for field in fields_hash[field_type]: + props_xml.append('{}:{}'.format(field_type, field)) + + return props_xml + + @classmethod + def _to_field_vals_list(cls, fields_hash): + props_xml = {} + for field_type in fields_hash: + if field_type not in cls.supported_field_types: + pass + else: + vals = fields_hash[field_type] + for field in vals: + props_xml['{}:{}'.format(field_type, field)] = _safe_xml_val(vals[field]) + + return props_xml + + @classmethod + def _tostring(cls, root): + return ET.tostring(root) + + @classmethod + def fromstring(cls, data): + """ + Fetch xml.etree.ElementTree for input data + + :param data: raw xml data + :returns: :class:xml.etree.ElementTree + """ + return ET.fromstring(_prepare_xml_parsing(data)) + + @classmethod + def build_propfind_datas(cls, instr=None, filter_rules=None, fields=None): + """ + Build XML datas for a PROPFIND querry. + + :param instr: http instruction (default: PROPFIND) + :param filter_rules: a dict containing filter rules separated by + namespace. e.g. {'oc': {'favorite': 1}} + :param fields: a dict containing fields separated by namespace + e.g. {'oc': ['id']} + :returns: xml data (string) + """ + if not instr: + instr = 'd:propfind' + + root = ET.Element(instr, cls.xml_namespaces_map) + props = cls._to_fields_list(fields or {}) + if props: + prop_group = ET.SubElement(root, 'd:prop') + for prop in props: + ET.SubElement(prop_group, prop) + + rules = cls._to_field_vals_list(filter_rules or {}) + if rules: + rule_group = ET.SubElement(root, 'oc:filter-rules') + for k in rules: + rule = ET.SubElement(rule_group, k) + val = rules[k] + rule.text = _safe_xml_val(val) + + return cls._tostring(root) + + @classmethod + def build_propupdate_datas(cls, values): + """ + Build XML datas for a PROPUPDATE querry. + + :param values: a dict containing values separated by namespace + e.g. {'oc': {'favorite': 1}} + :returns: xml data (string) + """ + root = ET.Element('d:propertyupdate', cls.xml_namespaces_map) + vals = cls._to_field_vals_list(values) + if vals: + set_group = ET.SubElement(root, 'd:set') + val_group = ET.SubElement(set_group, 'd:prop') + for k in vals: + val = ET.SubElement(val_group, k) + val.text = vals[k] + + return cls._tostring(root) diff --git a/src/nextcloud/common/value_parsing.py b/src/nextcloud/common/value_parsing.py new file mode 100644 index 0000000..0a4fc8f --- /dev/null +++ b/src/nextcloud/common/value_parsing.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +""" +Extra tools for value parsing +""" +from datetime import datetime +import os +from nextcloud.compat import datetime_to_timestamp + + +def datetime_to_expire_date(date): + return date.strftime("%Y-%m-%d") + + +def timestamp_to_epoch_time(rfc1123_date=''): + """ + literal date time string (use in DAV:getlastmodified) to Epoch time + + No longer, Only rfc1123-date productions are legal as values for DAV:getlastmodified + However, the value may be broken or invalid. + + Args: + rfc1123_date (str): rfc1123-date (defined in RFC2616) + Return: + int or None : Epoch time, if date string value is invalid return None + """ + try: + _tz = os.environ.get('TZ', '') + os.environ['TZ'] = 'UTC' + _time = datetime.strptime( + rfc1123_date, '%a, %d %b %Y %H:%M:%S GMT') + os.environ['TZ'] = _tz + except ValueError: + return + else: + return datetime_to_timestamp(_time) diff --git a/src/nextcloud/compat.py b/src/nextcloud/compat.py new file mode 100644 index 0000000..bf790b0 --- /dev/null +++ b/src/nextcloud/compat.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +""" +Tools for python2/3 unicode compatibility +""" +import six +import time + + +def encode_requests_password(word): + """ + Convert the string to bytes (readable by the server) + + :param word: input string + :returns: bytes with appropriate encoding + """ + if isinstance(word, bytes): + return word + + ret = word + if six.PY2: + if isinstance(word, six.text_type): + # trick to work with tricks in requests lib + ret = word.encode('utf-8').decode('latin-1') + else: + try: + ret = bytes(word, 'ascii') + except UnicodeEncodeError: + ret = bytes(word, 'utf-8') + return ret + + +def encode_string(string): + """Encodes a unicode instance to utf-8. If a str is passed it will + simply be returned + + :param string: str or unicode to encode + :returns : encoded output as str + """ + if six.PY2: + if isinstance(string, six.text_type): + return string.encode('utf-8') + return string + + +def datetime_to_timestamp(_time): + """ + Returns int(.timestamp()) + """ + if six.PY2: + return int( + time.mktime(_time.timetuple()) + _time.microsecond/1000000.0 + ) + else: + return int( + _time.timestamp() + ) diff --git a/src/nextcloud/requester.py b/src/nextcloud/requester.py index ccf0cdf..ceadafd 100644 --- a/src/nextcloud/requester.py +++ b/src/nextcloud/requester.py @@ -1,101 +1,114 @@ # -*- coding: utf-8 -*- -import requests -from functools import wraps - +""" +Define requesters +""" from .response import WebDAVResponse, OCSResponse +from .compat import encode_string +from .session import catch_connection_error, NextCloudConnectionError -class NextCloudConnectionError(Exception): - """ A connection error occurred """ - +# from six.moves.urllib import parse +def _prepare_url(string): + return encode_string(string) +# if six.PY2 and isinstance(string, unicode): # noqa: F821 +# return parse.urlparse(string).path +# return s -def catch_connection_error(func): - @wraps(func) - def wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except requests.RequestException as e: - raise NextCloudConnectionError("Failed to establish connection to NextCloud", - getattr(e.request, 'url', None), e) - return wrapper class Requester(object): - def __init__(self, endpoint, user, passwd, json_output=False): - self.query_components = [] - - self.json_output = json_output - - self.base_url = endpoint + """ Base requester """ + def __init__(self, wrapper): + self.query_components = [] self.h_get = {"OCS-APIRequest": "true"} self.h_post = {"OCS-APIRequest": "true", "Content-Type": "application/x-www-form-urlencoded"} - self.auth_pk = (user, passwd) + self.wrapper = wrapper self.API_URL = None self.SUCCESS_CODE = None + @property + def json_output(self): + return self.wrapper.json_output + + @property + def client(self): + return self.wrapper.client + + @property + def session(self): + return self.wrapper.client.session + def rtn(self, resp): if self.json_output: return resp.json() - else: - return resp.content.decode("UTF-8") + return resp.content.decode("UTF-8") @catch_connection_error - def get(self, url="", params=None): + def get(self, url="", params=None, headers=None): url = self.get_full_url(url) - res = requests.get(url, auth=self.auth_pk, headers=self.h_get, params=params) + res = self.session.request('get', url, headers=(headers or self.h_get), + params=params) return self.rtn(res) @catch_connection_error - def post(self, url="", data=None): + def post(self, url="", data=None, headers=None): url = self.get_full_url(url) - res = requests.post(url, auth=self.auth_pk, data=data, headers=self.h_post) + res = self.session.request( + 'post', url, data=data, headers=(headers or self.h_post)) return self.rtn(res) @catch_connection_error - def put_with_timestamp(self, url="", data=None, timestamp=None): - h_post = self.h_post + def put_with_timestamp(self, url="", data=None, timestamp=None, headers=None): + h_post = headers or self.h_post if isinstance(timestamp, (float, int)): - h_post["X-OC-MTIME"] = f"{timestamp:.0f}" + h_post['X-OC-MTIME'] = '%.0f' % timestamp url = self.get_full_url(url) - res = requests.put(url, auth=self.auth_pk, data=data, headers=h_post) + res = self.session.request('put', url, data=data, headers=h_post) return self.rtn(res) @catch_connection_error - def put(self, url="", data=None): + def put(self, url="", data=None, headers=None): url = self.get_full_url(url) - res = requests.put(url, auth=self.auth_pk, data=data, headers=self.h_post) + res = self.session.request( + 'put', url, data=data, headers=(headers or self.h_post)) return self.rtn(res) @catch_connection_error - def delete(self, url="", data=None): + def delete(self, url="", data=None, headers=None): url = self.get_full_url(url) - res = requests.delete(url, auth=self.auth_pk, data=data, headers=self.h_post) + res = self.session.request( + 'delete', url, data=data, headers=(headers or self.h_post)) return self.rtn(res) def get_full_url(self, additional_url=""): """ Build full url for request to NextCloud api - Construct url from self.base_url, self.API_URL, additional_url (if given), + Construct url from base_url, API_URL and additional_url (if given), add format=json param if self.json :param additional_url: str add to url after api_url :return: str """ - if additional_url and not str(additional_url).startswith("/"): - additional_url = "/{}".format(additional_url) + if isinstance(additional_url, int): + additional_url = str(additional_url) + if additional_url: + additional_url = _prepare_url(additional_url) + if not additional_url.startswith("/"): + additional_url = "/{}".format(additional_url) if self.json_output: self.query_components.append("format=json") - - ret = "{base_url}{api_url}{additional_url}".format( - base_url=self.base_url, api_url=self.API_URL, additional_url=additional_url) - + ret = "{base_url}{api_url}{additional_url}".format(base_url=(self.session.url), + api_url=( + self.API_URL), + additional_url=additional_url) if self.json_output: ret += "?format=json" + return ret @@ -114,56 +127,54 @@ def __init__(self, *args, **kwargs): super(WebDAVRequester, self).__init__(*args, **kwargs) def rtn(self, resp, data=None): - return WebDAVResponse(response=resp, data=data) + return WebDAVResponse(response=resp, data=data, + success_code=self.SUCCESS_CODE) @catch_connection_error def propfind(self, additional_url="", headers=None, data=None): url = self.get_full_url(additional_url=additional_url) - res = requests.request('PROPFIND', url, auth=self.auth_pk, headers=headers, data=data) + res = self.session.request('PROPFIND', url, headers=headers, data=data) return self.rtn(res) @catch_connection_error def proppatch(self, additional_url="", data=None): url = self.get_full_url(additional_url=additional_url) - res = requests.request('PROPPATCH', url, auth=self.auth_pk, data=data) + res = self.session.request('PROPPATCH', url, data=data) return self.rtn(resp=res) @catch_connection_error def report(self, additional_url="", data=None): url = self.get_full_url(additional_url=additional_url) - res = requests.request('REPORT', url, auth=self.auth_pk, data=data) + res = self.session.request('REPORT', url, data=data) return self.rtn(resp=res) @catch_connection_error def download(self, url="", params=None): url = self.get_full_url(url) - res = requests.get(url, auth=self.auth_pk, headers=self.h_get, params=params) - return self.rtn(resp=res, data=res.content) + res = self.session.request( + 'get', url, headers=(self.h_get), params=params) + return self.rtn(resp=res, data=(res.content)) @catch_connection_error def make_collection(self, additional_url=""): url = self.get_full_url(additional_url=additional_url) - res = requests.request("MKCOL", url=url, auth=self.auth_pk) + res = self.session.request("MKCOL", url=url) return self.rtn(resp=res) @catch_connection_error def move(self, url, destination, overwrite=False): url = self.get_full_url(additional_url=url) destination_url = self.get_full_url(additional_url=destination) - headers = { - "Destination": destination_url.encode('utf-8'), - "Overwrite": "T" if overwrite else "F" - } - res = requests.request("MOVE", url=url, auth=self.auth_pk, headers=headers) + headers = {"Destination": destination_url.encode("utf-8"), + "Overwrite": "T" if overwrite else "F"} + res = self.session.request("MOVE", url=url, headers=headers) return self.rtn(resp=res) @catch_connection_error def copy(self, url, destination, overwrite=False): url = self.get_full_url(additional_url=url) destination_url = self.get_full_url(additional_url=destination) - headers = { - "Destination": destination_url.encode('utf-8'), - "Overwrite": "T" if overwrite else "F" - } - res = requests.request("COPY", url=url, auth=self.auth_pk, headers=headers) + headers = {"Destination": destination_url.encode("utf-8"), + "Overwrite": "T" if overwrite else "F"} + res = self.session.request("COPY", url=url, headers=headers) return self.rtn(resp=res) diff --git a/src/nextcloud/response.py b/src/nextcloud/response.py index 7eb0a7e..c57affe 100644 --- a/src/nextcloud/response.py +++ b/src/nextcloud/response.py @@ -1,66 +1,93 @@ # -*- coding: utf-8 -*- -from json import JSONDecodeError +""" +Define requests responses (automatically check if the request is OK) +""" +try: + from json import JSONDecodeError +except ImportError: + JSONDecodeError = ValueError -from .api_wrappers.webdav import WebDAVStatusCodes +class BaseResponse(object): + """ + Base Response that take HTTP reponse and take the following attrs + - raw : the raw response + - status_code : the HTTP code + - data : the asked data (json or xml value) + - is_ok : True if the request is succesfully achieved + """ -class NextCloudResponse(object): - - def __init__(self, response, json_output=True, data=None): + def __init__(self, response, data=None, json_output=True, + status_code=None, success_code=None, **kwargs): self.raw = response - if not data: - self.data = response.json() if json_output else response.content.decode("UTF-8") - else: - self.data = data - - -class WebDAVResponse(NextCloudResponse): - """ Response class for WebDAV api methods """ + self.data = data if data is not None else ( + response.json() if json_output else response.content.decode('UTF-8') + ) + self.status_code = status_code or response.status_code + for k in kwargs: + setattr(self, k, kwargs[k]) + self._compute_is_ok(success_code) - METHODS_SUCCESS_CODES = { - "PROPFIND": [WebDAVStatusCodes.MULTISTATUS_CODE], - "PROPPATCH": [WebDAVStatusCodes.MULTISTATUS_CODE], - "REPORT": [WebDAVStatusCodes.MULTISTATUS_CODE], - "MKCOL": [WebDAVStatusCodes.CREATED_CODE], - "COPY": [WebDAVStatusCodes.CREATED_CODE, WebDAVStatusCodes.NO_CONTENT_CODE], - "MOVE": [WebDAVStatusCodes.CREATED_CODE, WebDAVStatusCodes.NO_CONTENT_CODE], - "PUT": [WebDAVStatusCodes.CREATED_CODE], - "DELETE": [WebDAVStatusCodes.NO_CONTENT_CODE] - } + def _compute_is_ok(self, success_code): + if isinstance(success_code, dict): + method = self.raw.request.method + success_codes = success_code.get(method, []) + else: + success_codes = ( + success_code if isinstance(success_code, list) else + [success_code] + ) - def __init__(self, response, data=None): - super(WebDAVResponse, self).__init__(response=response, data=data, json_output=False) - request_method = response.request.method - self.is_ok = False - if request_method in self.METHODS_SUCCESS_CODES: - self.is_ok = response.status_code in self.METHODS_SUCCESS_CODES[request_method] + self.is_ok = self.status_code in success_codes def __repr__(self): is_ok_str = "OK" if self.is_ok else "Failed" - return "".format(is_ok_str) + return "<{}: Status: {}>".format(self.__class__.__name__, is_ok_str) -class OCSResponse(NextCloudResponse): - """ Response class for OCS api methods """ +class OCSResponse(BaseResponse): + """ + Response class for OCS api methods + Add some attributes: + - meta : ocs json metadata + - full_data : json data of the ocs response + """ def __init__(self, response, json_output=True, success_code=None): - self.raw = response - self.is_ok = None - - if json_output: + data = None + full_data = None + meta = None + status_code = None + if (success_code or json_output): try: - self.full_data = response.json() - self.meta = self.full_data['ocs']['meta'] - self.status_code = self.full_data['ocs']['meta']['statuscode'] - self.data = self.full_data['ocs']['data'] - if success_code: - self.is_ok = self.full_data['ocs']['meta']['statuscode'] == success_code + full_data = response.json() + if 'ocs' in full_data: + ocs_data = full_data['ocs'] + meta = ocs_data['meta'] + status_code = meta['statuscode'] + if json_output: + data = ocs_data['data'] + else: + data = full_data + meta = data + status_code = -1 except JSONDecodeError: - self.is_ok = False - self.data = {'message': 'Unable to parse JSON response'} - else: - self.data = response.content.decode("UTF-8") + data = {'message': 'Unable to parse JSON response'} + meta = data + status_code = -1 - def __repr__(self): - is_ok_str = "OK" if self.is_ok else "Failed" - return "".format(is_ok_str) + super(OCSResponse, self).__init__(response, data=data, + json_output=json_output, + full_data=full_data, + status_code=status_code, + meta=meta, + success_code=success_code) + + +class WebDAVResponse(BaseResponse): + """ Response class for WebDAV api methods """ + + def __init__(self, response, data=None, success_code=None, json_output=False): + super(WebDAVResponse, self).__init__(response, data=data, + json_output=json_output, + success_code=success_code) diff --git a/src/nextcloud/session.py b/src/nextcloud/session.py new file mode 100644 index 0000000..c5d6aac --- /dev/null +++ b/src/nextcloud/session.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +from functools import wraps +import requests +from .compat import encode_requests_password + +import logging +_logger = logging.getLogger(__name__) + + +class NextCloudConnectionError(Exception): + """ A connection error occurred """ + +class NextCloudLoginError(Exception): + """ A login error occurred """ + +def catch_connection_error(func): + + @wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except requests.RequestException as e: + raise NextCloudConnectionError( + 'Failed to establish connection to NextCloud', getattr(e.request, 'url', None), e) + + return wrapper + + +class Session(object): + """ Session for requesting """ + + def __init__(self, url=None, user=None, password=None, auth=None, session_kwargs=None): + self.session = None + self.auth = None + self.user = None + self._set_credentials(user, password, auth) + self.url = url.rstrip('/') + self._session_kwargs = session_kwargs or {} + + def _set_credentials(self, user, password, auth): + if auth: + self.auth = auth + if user: + self.user = user + else: + if isinstance(self.auth, tuple): + self.user = self.auth[0] + else: + if isinstance(self.auth, requests.auth.AuthBase): + self.user = self.auth.username + if not self.auth and (self.user and password): + self.auth = (self.user, encode_requests_password(password)) + + def request(self, method, url, **kwargs): + if self.session: + return self.session.request(method=method, url=url, **kwargs) + else: + _kwargs = self._session_kwargs + _kwargs.update(kwargs) + if not kwargs.get('auth', False): + _kwargs['auth'] = self.auth + return requests.request(method, url, **_kwargs) + + def login(self, user=None, password=None, auth=None, client=None): + """Create a stable session on the server. + + :param user_id: user id + :param password: password + :param auth: object for any auth method + :param client: object for any auth method + :raises: HTTPResponseError in case an HTTP error status was returned + """ + self.session = requests.Session() + for k in self._session_kwargs: + setattr(self.session, k, self._session_kwargs[k]) + + self._set_credentials(user, password, auth) + self.session.auth = self.auth + if client: + self._check_session(client.with_attr(json_output=True), retry=3) + + + def _check_session(self, client=None, retry=None): + def _clear(): + if self.session: + self.logout() + + def _raise(e): + if retry: + _logger.warning('Retry session check (%s)', self.url) + return self._check_session(client, retry=retry - 1) + else: + _clear() + raise e + + try: + resp = client.get_user() + if not resp.is_ok: + _raise(NextCloudLoginError( + 'Failed to login to NextCloud', self.url, resp)) + except requests.exceptions.SSLError as e: + _raise(e) + except NextCloudConnectionError as e: + _raise(e) + except Exception as e: + _clear() + raise e + + def logout(self): + """Log out the authenticated user and close the session. + + :returns: True if the operation succeeded, False otherwise + :raises: HTTPResponseError in case an HTTP error status was returned + """ + self.session.close() + self.session = None + return True diff --git a/test.sh b/test.sh new file mode 100755 index 0000000..7f314da --- /dev/null +++ b/test.sh @@ -0,0 +1,96 @@ +#!/bin/sh +# Script for running tests +_usage(){ + cat <