diff --git a/_config.py b/_config.py
index c589586..a64ea51 100644
--- a/_config.py
+++ b/_config.py
@@ -133,8 +133,8 @@ def __init_subclass__(cls, **kwargs):
     SIMPLENOTE_NOTES_DIR = os.path.join(SIMPLENOTE_CACHE_DIR, "notes")
     os.makedirs(SIMPLENOTE_NOTES_DIR, exist_ok=True)
 
-    # SIMPLENOTE_STARTED: bool = False
-    # SIMPLENOTE_RELOAD_CALLS: int = -1
+    SIMPLENOTE_STARTED_KEY: str = "simplenote_started"
+    SIMPLENOTE_SYNC_TIMES_KEY: str = "simplenote_sync_times"
 
 
 class Development(_BaseConfig):
diff --git a/commands.py b/commands.py
index f184ad7..ef2087e 100644
--- a/commands.py
+++ b/commands.py
@@ -7,7 +7,7 @@
 import sublime_plugin
 
 from ._config import CONFIG
-from .lib.core import start
+from .lib.core import GlobalStorage, sync
 from .lib.gui import close_view, on_note_changed, open_view, show_message, show_quick_panel
 from .lib.models import Note
 from .lib.operations import NoteCreator, NoteDeleter, NotesIndicator, NoteUpdater, OperationManager
@@ -25,7 +25,7 @@
 logger = logging.getLogger()
 
 
-SIMPLENOTE_STARTED = False
+global_storage = GlobalStorage()
 
 
 class SimplenoteViewCommand(sublime_plugin.EventListener):
@@ -123,11 +123,17 @@ def on_post_save(self, view: sublime.View):
 
 class SimplenoteListCommand(sublime_plugin.ApplicationCommand):
     def run(self):
-        global SIMPLENOTE_STARTED
-        if not SIMPLENOTE_STARTED:
-            if not start():
-                return
-        show_quick_panel()
+        sync_times = global_storage.get(CONFIG.SIMPLENOTE_SYNC_TIMES_KEY)
+        if not isinstance(sync_times, int):
+            raise TypeError(
+                "Value of %s must be type %s, got %s" % (CONFIG.SIMPLENOTE_SYNC_TIMES_KEY, int, type(sync_times))
+            )
+        first_sync = sync_times == 0
+        if Note.tree.count:
+            show_quick_panel()
+            first_sync = False
+        if not global_storage.get(CONFIG.SIMPLENOTE_STARTED_KEY):
+            sync(first_sync)
 
 
 class SimplenoteSyncCommand(sublime_plugin.ApplicationCommand):
@@ -141,13 +147,19 @@ def callback(self, updated_notes: List[Note], first_sync: bool = False):
         self.merge_note(updated_notes)
         if first_sync:
             show_quick_panel(first_sync)
+        global_storage.optimistic_update(CONFIG.SIMPLENOTE_STARTED_KEY, False)
 
-    def run(self, first_sync: bool = False):
-        settings = sublime.load_settings(CONFIG.SIMPLENOTE_SETTINGS_FILE_PATH)
-        sync_note_number = settings.get("sync_note_number", 1000)
-        if not isinstance(sync_note_number, int):
-            show_message("`sync_note_number` must be an integer. Please check settings file.")
+    def run(self, first_sync: bool = False, sync_note_number: int = 1000):
+        if global_storage.get(CONFIG.SIMPLENOTE_STARTED_KEY):
             return
+        global_storage.optimistic_update(CONFIG.SIMPLENOTE_STARTED_KEY, True)
+        sync_times = global_storage.get(CONFIG.SIMPLENOTE_SYNC_TIMES_KEY)
+        if not isinstance(sync_times, int):
+            raise TypeError(
+                "Value of %s must be type %s, got %s" % (CONFIG.SIMPLENOTE_SYNC_TIMES_KEY, int, type(sync_times))
+            )
+        sync_times += 1
+        sync_times = global_storage.optimistic_update(CONFIG.SIMPLENOTE_SYNC_TIMES_KEY, sync_times)
         note_indicator = NotesIndicator(sync_note_number=sync_note_number)
         note_indicator.set_callback(self.callback, {"first_sync": first_sync})
         OperationManager().add_operation(note_indicator)
diff --git a/lib/core.py b/lib/core.py
index 37c4aee..3f9d184 100644
--- a/lib/core.py
+++ b/lib/core.py
@@ -3,24 +3,42 @@
 import sublime
 
 from .._config import CONFIG
+from ..utils.lock.thread import OptimisticLockingDict
+from ..utils.patterns.singleton.base import Singleton
 from .gui import edit_settings, remove_status, show_message
 from .operations import OperationManager
 
 
 logger = logging.getLogger()
-SIMPLENOTE_STARTED = False
+
+
+class GlobalStorage(Singleton, OptimisticLockingDict):
+    __mapper_key_type = {CONFIG.SIMPLENOTE_SYNC_TIMES_KEY: int, CONFIG.SIMPLENOTE_STARTED_KEY: bool}
+
+    def optimistic_update(self, key, new_value):
+        _type = self.__mapper_key_type.get(key)
+        if not _type is None:
+            if not isinstance(new_value, _type):
+                raise TypeError("Value of %s must be type %s, got %s" % (key, _type, type(new_value)))
+        super().optimistic_update(key, new_value)
+
+
 manager = OperationManager()
 
 
 def sync(first_sync: bool = False):
+
+    settings = sublime.load_settings(CONFIG.SIMPLENOTE_SETTINGS_FILE_PATH)
+    sync_note_number = settings.get("sync_note_number", 1000)
+    if not isinstance(sync_note_number, int):
+        show_message("`sync_note_number` must be an integer. Please check settings file.")
+        return
     if not manager.running:
-        sublime.run_command("simplenote_sync", {"first_sync": first_sync})
+        sublime.run_command("simplenote_sync", {"first_sync": first_sync, "sync_note_number": sync_note_number})
     else:
         logger.debug("Sync omitted")
 
-    settings = sublime.load_settings(CONFIG.SIMPLENOTE_SETTINGS_FILE_PATH)
     sync_every = settings.get("sync_every", 0)
-    logger.debug(("Simplenote sync_every", sync_every))
     if not isinstance(sync_every, int):
         show_message("`sync_every` must be an integer. Please check settings file.")
         return
@@ -30,17 +48,13 @@ def sync(first_sync: bool = False):
 
 
 def start():
-    global SIMPLENOTE_STARTED
     settings = sublime.load_settings("Simplenote.sublime-settings")
     username = settings.get("username")
     password = settings.get("password")
 
     if username and password:
         sync(first_sync=True)
-        SIMPLENOTE_STARTED = True
-    else:
-        edit_settings()
-        show_message("Simplenote: Please configure username/password in settings file.")
-        sublime.set_timeout(remove_status, 2000)
-        SIMPLENOTE_STARTED = False
-    return SIMPLENOTE_STARTED
+        return
+    show_message("Simplenote: Please configure username/password in settings file.")
+    edit_settings()
+    sublime.set_timeout(remove_status, 2000)
diff --git a/main.py b/main.py
index c1854d1..35e40d0 100644
--- a/main.py
+++ b/main.py
@@ -3,7 +3,8 @@
 
 import sublime
 
-from .lib.core import start, sync
+from ._config import CONFIG
+from .lib.core import GlobalStorage, start, sync
 from .lib.models import Note
 
 
@@ -12,6 +13,11 @@
 SIMPLENOTE_RELOAD_CALLS = -1
 
 
+global_storage = GlobalStorage()
+global_storage.optimistic_update(CONFIG.SIMPLENOTE_STARTED_KEY, False)
+global_storage.optimistic_update(CONFIG.SIMPLENOTE_SYNC_TIMES_KEY, 0)
+
+
 def reload_if_needed(autostart: bool = True):
     # global SIMPLENOTE_RELOAD_CALLS
 
diff --git a/utils/lock/thread.py b/utils/lock/thread.py
new file mode 100644
index 0000000..d853264
--- /dev/null
+++ b/utils/lock/thread.py
@@ -0,0 +1,181 @@
+__version__ = "0.0.1"
+__author__ = "redatman"
+__date__ = "2024-08-03"
+# TODO: ResultProcess unable to collect results yet
+
+
+from importlib import import_module
+import logging
+from multiprocessing import Process
+from threading import Thread
+from typing import Callable
+
+
+import_module("utils.logger.init")
+logger = logging.getLogger()
+
+
+class ResultExecutorMixin:
+    start: Callable
+    _target: Callable
+    _args: tuple
+    _kwargs: dict
+    _result = None
+
+    def run(self):
+        try:
+            if self._target is not None:
+                self._result = self._target(*self._args, **self._kwargs)
+        finally:
+            del self._target, self._args, self._kwargs
+
+    def join(self, *args):
+        super().join(*args)  # type: ignore
+        # logger.warning(getattr(self, "_result", None))
+        return getattr(self, "_result", None)
+
+    def get_result(self):
+        self.start()
+        return self.join()
+
+
+ResultProcess = type("ResultProcess", (ResultExecutorMixin, Process), {})
+ResultThread = type("ResultThread", (ResultExecutorMixin, Thread), {})
+
+
+class OptimisticLockingError(Exception):
+    def __init__(self, key: str) -> None:
+        super().__init__(f"Update failed due to concurrent modification: {key}")
+
+
+class OptimisticLockingDict:
+
+    def __init__(self, executor_cls=ResultThread):
+        if issubclass(executor_cls, Process):
+            from multiprocessing import Lock, Manager
+
+            self.data = Manager().dict()
+            self.lock = Lock()
+        elif issubclass(executor_cls, Thread):
+            from threading import Lock
+
+            self.data = {}
+            self.lock = Lock()
+        else:
+            raise ValueError(
+                f"Unsupported executor class: {executor_cls}, must be either multiprocessing.Process or threading.Thread"
+            )
+
+    def _get(self, key):
+
+        # logger.debug(("_get", os.getpid(), threading.current_thread().name, threading.current_thread().ident))
+        with self.lock:
+            if key in self.data:
+                value, version = self.data[key]
+                return value, version
+            else:
+                return None, None
+
+    def get(self, key):
+        logger.info(self.data)
+        value, version = self._get(key)
+        return value
+
+    def _set(self, key, new_value, expected_version):
+        # logger.warning((id(self.data), self.data))
+        # logger.debug(("_set", os.getpid(), threading.current_thread().name, threading.current_thread().ident))
+        with self.lock:
+            if key in self.data:
+                current_value, current_version = self.data[key]
+                if current_version == expected_version:
+                    self.data[key] = (new_value, current_version + 1)
+                    return True
+                else:
+                    return False
+            else:
+                # If the key does not exist, initialize it
+                self.data[key] = (new_value, 1)
+                return True
+
+    def set(self, key, new_value):
+        return self._set(key, new_value, 0)
+
+    def optimistic_update(self, key, new_value):
+        # logger.warning((id(self), id(self.data)))
+        # logger.warning((id(self), self))
+        # logger.debug(f">>: {key} = {new_value}")
+        value, version = self._get(key)
+        # time.sleep(0.1)
+        if value is not None:
+            success = self._set(key, new_value, version)
+            if success:
+                logger.debug(f"Update successful: {key} from {value} to {new_value}")
+            else:
+                logger.debug(f"Update failed due to concurrent modification: {key} to {new_value}")
+                raise OptimisticLockingError(key)
+        else:
+            # Initialize the key if it doesn't exist
+            self.set(key, new_value)
+            logger.debug(f"Initial set: {key} = {new_value}")
+        return new_value
+
+    # def update(self, key, new_value):
+    #     with self.lock:
+    #         return self.optimistic_update(key, new_value)
+
+
+def test_multiple_updates(executor_cls):
+    optimistic_dict = OptimisticLockingDict(executor_cls)
+    logger.warning((id(optimistic_dict), id(optimistic_dict.data)))
+    key = "name"
+
+    # Initialize a key-value pair
+    optimistic_dict.optimistic_update(key, "value1")
+
+    # tasks = []
+    results = set()
+
+    # Simulate concurrent updates
+    def concurrent_update():
+        for i in range(6):
+            task = executor_cls(target=optimistic_dict.optimistic_update, args=("name", i))
+            import time
+
+            # time.sleep(0.01)
+            # tasks.append(task)
+            # task.start()
+            # result = task.join()
+            result = task.get_result()
+            logger.debug(result)
+            results.add(result)
+
+        logger.info(results)
+
+    concurrent_update()
+    last_result = optimistic_dict.get(key)
+    expected_result = 5
+    assert last_result == expected_result, f"Expected last value is {expected_result}, but got %s" % last_result
+    expected_results = {0, 1, 2, 3, 4, 5}
+    assert results == expected_results, f"Expected results is {expected_results}, but got {results}"
+
+
+def run_tests():
+    tests = {
+        ("Test test_multiple_process_updates ", test_multiple_updates, (ResultProcess,)),
+        ("Test test_multiple_thread_updates ", test_multiple_updates, (ResultThread,)),
+    }
+
+    for test_name, test, args in tests:
+        try:
+            prefix = f"Running [{test_name}]"
+            test(*args)
+            logger.info(f"{prefix} Succeeded")
+        except AssertionError as e:
+            logger.error(f"{prefix} Failed => {e}")
+        except Exception as e:
+            logger.critical(f"{prefix} Exception => {e}")
+
+
+if __name__ == "__main__":
+
+    run_tests()