From 63317a80a28ee875154e80c7424f642aeb82bc27 Mon Sep 17 00:00:00 2001 From: Hugo Osvaldo Barrera Date: Sun, 18 Jun 2017 16:11:30 -0300 Subject: [PATCH] Prefer pathlib.Path to plain strings --- tests/test_cli.py | 12 ++--- tests/test_model.py | 4 +- tests/test_ui.py | 8 ++-- todoman/model.py | 107 ++++++++++++++++++++++---------------------- 4 files changed, 66 insertions(+), 65 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index b2d93ac3..27afa075 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,6 +1,8 @@ import datetime import sys from os.path import exists +from os.path import isdir +from pathlib import Path from unittest import mock from unittest.mock import call from unittest.mock import patch @@ -11,14 +13,14 @@ from dateutil.tz import tzlocal from freezegun import freeze_time from hypothesis import given - -from tests.helpers import fs_case_sensitive -from tests.helpers import pyicu_sensitive from todoman.cli import cli from todoman.cli import exceptions from todoman.model import Database from todoman.model import Todo +from tests.helpers import fs_case_sensitive +from tests.helpers import pyicu_sensitive + # TODO: test --grep @@ -468,8 +470,8 @@ def test_edit_move(runner, todo_factory, default_database, tmpdir, todos): tmpdir.mkdir("another_list") default_database.paths = [ - str(tmpdir.join("default")), - str(tmpdir.join("another_list")), + Path(tmpdir.join("default")), + Path(tmpdir.join("another_list")), ] default_database.update_cache() diff --git a/tests/test_model.py b/tests/test_model.py index 922d0bbe..ec11b617 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -1,6 +1,7 @@ from datetime import date from datetime import datetime from datetime import timedelta +from pathlib import Path from unittest.mock import patch import pytest @@ -8,7 +9,6 @@ from dateutil.tz import tzlocal from dateutil.tz.tz import tzoffset from freezegun import freeze_time - from todoman.exceptions import AlreadyExists from todoman.model import Database from todoman.model import Todo @@ -65,7 +65,7 @@ def test_change_paths(tmpdir, create): assert {t.summary for t in db.todos()} == old_todos - db.paths = [str(tmpdir.join("3"))] + db.paths = [Path(tmpdir.join("3"))] db.update_cache() assert len(list(db.lists())) == 1 diff --git a/tests/test_ui.py b/tests/test_ui.py index 02dac42b..39573f3f 100644 --- a/tests/test_ui.py +++ b/tests/test_ui.py @@ -1,12 +1,12 @@ from datetime import datetime +from pathlib import Path from unittest import mock import pytest import pytz from freezegun import freeze_time -from urwid import ExitMainLoop - from todoman.interactive import TodoEditor +from urwid import ExitMainLoop def test_todo_editor_priority(default_database, todo_factory, default_formatter): @@ -27,8 +27,8 @@ def test_todo_editor_list(default_database, todo_factory, default_formatter, tmp tmpdir.mkdir("another_list") default_database.paths = [ - str(tmpdir.join("default")), - str(tmpdir.join("another_list")), + Path(tmpdir.join("default")), + Path(tmpdir.join("another_list")), ] default_database.update_cache() diff --git a/todoman/model.py b/todoman/model.py index 0c603d83..3ef71a57 100644 --- a/todoman/model.py +++ b/todoman/model.py @@ -8,8 +8,8 @@ from datetime import datetime from datetime import time from datetime import timedelta -from os.path import normpath -from os.path import split +from pathlib import Path +from pathlib import PosixPath from typing import Iterable from uuid import uuid4 @@ -30,9 +30,12 @@ def register_adapters_and_converters(): + sqlite3.register_adapter(Path, str) + sqlite3.register_adapter(PosixPath, str) + + sqlite3.register_converter("path", lambda p: Path(p.decode())) sqlite3.register_converter( - 'timestamp', - lambda d: datetime.fromtimestamp(float(d), LOCAL_TIMEZONE) + "timestamp", lambda d: datetime.fromtimestamp(float(d), LOCAL_TIMEZONE) ) @@ -284,7 +287,7 @@ def path(self) -> str: if not self.list: raise ValueError("A todo without a list does not have a path.") - return os.path.join(self.list.path, self.filename) + return self.list.path.joinpath(self.filename) def cancel(self) -> None: self.status = "CANCELLED" @@ -383,7 +386,7 @@ def _read(self, path): return component def write(self): - if os.path.exists(self.todo.path): + if self.todo.path.exists(): self._write_existing(self.todo.path) else: self._write_new(self.todo.path) @@ -432,9 +435,10 @@ class Cache: SCHEMA_VERSION = 7 - def __init__(self, path: str): - self.cache_path = str(path) - os.makedirs(os.path.dirname(self.cache_path), exist_ok=True) + def __init__(self, path: Path): + self.cache_path = path + # XXX: Use the below once we drop python3.4 + self.cache_path.parent.mkdir(parents=True, exist_ok=True) self._conn = sqlite3.connect( str(self.cache_path), @@ -481,7 +485,7 @@ def create_tables(self): """ CREATE TABLE IF NOT EXISTS lists ( "name" TEXT PRIMARY KEY, - "path" TEXT, + "path" path, "colour" TEXT, "mtime" INTEGER, @@ -493,7 +497,7 @@ def create_tables(self): self._conn.execute( """ CREATE TABLE IF NOT EXISTS files ( - "path" TEXT PRIMARY KEY, + "path" path PRIMARY KEY, "list_name" TEXT, "mtime" INTEGER, @@ -506,7 +510,7 @@ def create_tables(self): self._conn.execute( """ CREATE TABLE IF NOT EXISTS todos ( - "file_path" TEXT, + "file_path" path, "id" INTEGER PRIMARY KEY, "uid" TEXT, @@ -535,7 +539,7 @@ def create_tables(self): def clear(self): self._conn.close() - os.remove(self.cache_path) + self.cache_path.unlink self._conn = None def add_list(self, name: str, path: str, colour: str, mtime: int): @@ -856,24 +860,24 @@ def todos( def _todo_from_db(self, row: dict) -> Todo: todo = Todo() - todo.id = row['id'] - todo.uid = row['uid'] - todo.summary = row['summary'] - todo.due = row['due'] - todo.start = row['start'] - todo.priority = row['priority'] - todo.created_at = row['created_at'] - todo.completed_at = row['completed_at'] - todo.dtstamp = row['dtstamp'] - todo.percent_complete = row['percent_complete'] - todo.status = row['status'] - todo.description = row['description'] - todo.location = row['location'] - todo.sequence = row['sequence'] - todo.last_modified = row['last_modified'] - todo.list = self.lists_map[row['list_name']] - todo.filename = os.path.basename(row['path']) - todo.rrule = row['rrule'] + todo.id = row["id"] + todo.uid = row["uid"] + todo.summary = row["summary"] + todo.due = row["due"] + todo.start = row["start"] + todo.priority = row["priority"] + todo.created_at = row["created_at"] + todo.completed_at = row["completed_at"] + todo.dtstamp = row["dtstamp"] + todo.percent_complete = row["percent_complete"] + todo.status = row["status"] + todo.description = row["description"] + todo.location = row["location"] + todo.sequence = row["sequence"] + todo.last_modified = row["last_modified"] + todo.list = self.lists_map[row["list_name"]] + todo.filename = row["path"].name + todo.rrule = row["rrule"] return todo def lists(self) -> Iterable[TodoList]: @@ -953,18 +957,16 @@ def __init__(self, name: str, path: str, colour: str = None): @staticmethod def colour_for_path(path: str) -> str | None: try: - with open(os.path.join(path, "color")) as f: - return f.read().strip() + return path.joinpath("color").read_text().strip() except OSError: logger.debug("No colour for list %s", path) @staticmethod def name_for_path(path: str) -> str: try: - with open(os.path.join(path, "displayname")) as f: - return f.read().strip() + return path.joinpath("displayname").read_text().strip() except OSError: - return split(normpath(path))[1] + return path.name @staticmethod def mtime_for_path(path: str) -> int: @@ -1001,8 +1003,8 @@ class Database: """ def __init__(self, paths, cache_path): - self.cache = Cache(cache_path) - self.paths = [str(path) for path in paths] + self.cache = Cache(Path(cache_path)) + self.paths = [Path(path) for path in paths] self.update_cache() def update_cache(self) -> None: @@ -1019,13 +1021,11 @@ def update_cache(self) -> None: TodoList.colour_for_path(path), paths[path], ) - for entry in os.listdir(path): - if not entry.endswith(".ics"): + for entry in path.iterdir(): + if not entry.name.endswith(".ics"): continue - entry_path = os.path.join(path, entry) - mtime = _getmtime(entry_path) - paths_to_mtime[entry_path] = mtime - paths_to_list_name[entry_path] = list_name + paths_to_mtime[entry] = _getmtime(entry) + paths_to_list_name[entry] = list_name self.cache.expire_files(paths_to_mtime) @@ -1039,10 +1039,10 @@ def update_cache(self) -> None: continue try: - with open(entry_path, "rb") as f: - cal = icalendar.Calendar.from_ical(f.read()) - for component in cal.walk("VTODO"): - self.cache.add_vtodo(component, entry_path) + data = entry_path.read_bytes() + cal = icalendar.Calendar.from_ical(data) + for component in cal.walk("VTODO"): + self.cache.add_vtodo(component, entry_path) except Exception: logger.exception("Failed to read entry %s.", entry_path) @@ -1058,17 +1058,16 @@ def lists(self) -> Iterable[TodoList]: return self.cache.lists() def move(self, todo: Todo, new_list: TodoList, from_list: TodoList) -> None: - orig_path = os.path.join(from_list.path, todo.filename) - dest_path = os.path.join(new_list.path, todo.filename) + orig_path = from_list.path.joinpath(todo.filename) + dest_path = new_list.path.joinpath(todo.filename) - os.rename(orig_path, dest_path) + orig_path.rename(dest_path) def delete(self, todo: Todo) -> None: if not todo.list: raise ValueError("Cannot delete Todo without a list.") - path = os.path.join(todo.list.path, todo.filename) - os.remove(path) + todo.list.path.joinpath(todo.filename).unlink() def flush(self) -> Iterable[Todo]: for todo in self.todos(status=["ANY"]): @@ -1100,5 +1099,5 @@ def save(self, todo: Todo) -> None: def _getmtime(path: str) -> int: - stat = os.stat(path) + stat = path.stat() return getattr(stat, "st_mtime_ns", stat.st_mtime)