-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add the Store API and initial documentation.
- Loading branch information
1 parent
e535c9d
commit dd394db
Showing
5 changed files
with
266 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
import json | ||
from collections import defaultdict, deque | ||
from typing import Any, Dict, Iterable | ||
|
||
from django.core.serializers.json import DjangoJSONEncoder | ||
from django.utils.encoding import force_str | ||
from django.utils.module_loading import import_string | ||
|
||
from debug_toolbar import settings as dt_settings | ||
|
||
|
||
class DebugToolbarJSONEncoder(DjangoJSONEncoder): | ||
def default(self, o: Any) -> Any: | ||
try: | ||
return super().default(o) | ||
except TypeError: | ||
return force_str(o) | ||
|
||
|
||
def serialize(data: Any) -> str: | ||
return json.dumps(data, cls=DebugToolbarJSONEncoder) | ||
|
||
|
||
def deserialize(data: str) -> Any: | ||
return json.loads(data) | ||
|
||
|
||
class BaseStore: | ||
_config = dt_settings.get_config().copy() | ||
|
||
@classmethod | ||
def ids(cls) -> Iterable: | ||
"""The stored ids""" | ||
raise NotImplementedError | ||
|
||
@classmethod | ||
def exists(cls, request_id: str) -> bool: | ||
"""Does the given request_id exist in the store""" | ||
raise NotImplementedError | ||
|
||
@classmethod | ||
def set(cls, request_id: str): | ||
"""Set a request_id in the store""" | ||
raise NotImplementedError | ||
|
||
@classmethod | ||
def clear(cls): | ||
"""Remove all requests from the request store""" | ||
raise NotImplementedError | ||
|
||
@classmethod | ||
def delete(cls, request_id: str): | ||
"""Delete the store for the given request_id""" | ||
raise NotImplementedError | ||
|
||
@classmethod | ||
def save_panel(cls, request_id: str, panel_id: str, data: Any = None): | ||
"""Save the panel data for the given request_id""" | ||
raise NotImplementedError | ||
|
||
@classmethod | ||
def panel(cls, request_id: str, panel_id: str) -> Any: | ||
"""Fetch the panel data for the given request_id""" | ||
raise NotImplementedError | ||
|
||
|
||
class MemoryStore(BaseStore): | ||
# ids is the collection of storage ids that have been used. | ||
# Use a dequeue to support O(1) appends and pops | ||
# from either direction. | ||
_ids: deque = deque() | ||
_request_store: Dict[str, Dict] = defaultdict(dict) | ||
|
||
@classmethod | ||
def ids(cls) -> Iterable: | ||
"""The stored ids""" | ||
return cls._ids | ||
|
||
@classmethod | ||
def exists(cls, request_id: str) -> bool: | ||
"""Does the given request_id exist in the request store""" | ||
return request_id in cls._ids | ||
|
||
@classmethod | ||
def set(cls, request_id: str): | ||
"""Set a request_id in the request store""" | ||
if request_id not in cls._ids: | ||
cls._ids.append(request_id) | ||
for _ in range(len(cls._ids) - cls._config["RESULTS_CACHE_SIZE"]): | ||
removed_id = cls._ids.popleft() | ||
cls._request_store.pop(removed_id, None) | ||
|
||
@classmethod | ||
def clear(cls): | ||
"""Remove all requests from the request store""" | ||
cls._ids.clear() | ||
cls._request_store.clear() | ||
|
||
@classmethod | ||
def delete(cls, request_id: str): | ||
"""Delete the stored request for the given request_id""" | ||
cls._request_store.pop(request_id, None) | ||
try: | ||
cls._ids.remove(request_id) | ||
except ValueError: | ||
# The request_id doesn't exist in the collection of ids. | ||
pass | ||
|
||
@classmethod | ||
def save_panel(cls, request_id: str, panel_id: str, data: Any = None): | ||
"""Save the panel data for the given request_id""" | ||
cls.set(request_id) | ||
cls._request_store[request_id][panel_id] = serialize(data) | ||
|
||
@classmethod | ||
def panel(cls, request_id: str, panel_id: str) -> Any: | ||
"""Fetch the panel data for the given request_id""" | ||
try: | ||
data = cls._request_store[request_id][panel_id] | ||
except KeyError: | ||
return {} | ||
else: | ||
return deserialize(data) | ||
|
||
|
||
def get_store(): | ||
return import_string(dt_settings.get_config()["TOOLBAR_STORE_CLASS"]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
from django.test import TestCase | ||
from django.test.utils import override_settings | ||
|
||
from debug_toolbar import store | ||
|
||
|
||
class SerializationTestCase(TestCase): | ||
def test_serialize(self): | ||
self.assertEqual( | ||
store.serialize({"hello": {"foo": "bar"}}), | ||
'{"hello": {"foo": "bar"}}', | ||
) | ||
|
||
def test_serialize_force_str(self): | ||
class Foo: | ||
spam = "bar" | ||
|
||
def __str__(self): | ||
return f"Foo spam={self.spam}" | ||
|
||
self.assertEqual( | ||
store.serialize({"hello": Foo()}), | ||
'{"hello": "Foo spam=bar"}', | ||
) | ||
|
||
def test_deserialize(self): | ||
self.assertEqual( | ||
store.deserialize('{"hello": {"foo": "bar"}}'), | ||
{"hello": {"foo": "bar"}}, | ||
) | ||
|
||
|
||
class BaseStoreTestCase(TestCase): | ||
def test_methods_are_not_implemented(self): | ||
# Find all the non-private and dunder class methods | ||
methods = [ | ||
member for member in vars(store.BaseStore) if not member.startswith("_") | ||
] | ||
self.assertEqual(len(methods), 7) | ||
with self.assertRaises(NotImplementedError): | ||
store.BaseStore.ids() | ||
with self.assertRaises(NotImplementedError): | ||
store.BaseStore.exists("") | ||
with self.assertRaises(NotImplementedError): | ||
store.BaseStore.set("") | ||
with self.assertRaises(NotImplementedError): | ||
store.BaseStore.clear() | ||
with self.assertRaises(NotImplementedError): | ||
store.BaseStore.delete("") | ||
with self.assertRaises(NotImplementedError): | ||
store.BaseStore.save_panel("", "", None) | ||
with self.assertRaises(NotImplementedError): | ||
store.BaseStore.panel("", "") | ||
|
||
|
||
class MemoryStoreTestCase(TestCase): | ||
@classmethod | ||
def setUpTestData(cls) -> None: | ||
cls.store = store.MemoryStore | ||
|
||
def tearDown(self) -> None: | ||
self.store.clear() | ||
|
||
def test_ids(self): | ||
self.store.set("foo") | ||
self.store.set("bar") | ||
self.assertEqual(list(self.store.ids()), ["foo", "bar"]) | ||
|
||
def test_exists(self): | ||
self.assertFalse(self.store.exists("missing")) | ||
self.store.set("exists") | ||
self.assertTrue(self.store.exists("exists")) | ||
|
||
def test_set(self): | ||
self.store.set("foo") | ||
self.assertEqual(list(self.store.ids()), ["foo"]) | ||
|
||
def test_set_max_size(self): | ||
existing = self.store._config["RESULTS_CACHE_SIZE"] | ||
self.store._config["RESULTS_CACHE_SIZE"] = 1 | ||
self.store.save_panel("foo", "foo.panel", "foo.value") | ||
self.store.save_panel("bar", "bar.panel", {"a": 1}) | ||
self.assertEqual(list(self.store.ids()), ["bar"]) | ||
self.assertEqual(self.store.panel("foo", "foo.panel"), {}) | ||
self.assertEqual(self.store.panel("bar", "bar.panel"), {"a": 1}) | ||
# Restore the existing config setting since this config is shared. | ||
self.store._config["RESULTS_CACHE_SIZE"] = existing | ||
|
||
def test_clear(self): | ||
self.store.save_panel("bar", "bar.panel", {"a": 1}) | ||
self.store.clear() | ||
self.assertEqual(list(self.store.ids()), []) | ||
self.assertEqual(self.store.panel("bar", "bar.panel"), {}) | ||
|
||
def test_delete(self): | ||
self.store.save_panel("bar", "bar.panel", {"a": 1}) | ||
self.store.delete("bar") | ||
self.assertEqual(list(self.store.ids()), []) | ||
self.assertEqual(self.store.panel("bar", "bar.panel"), {}) | ||
# Make sure it doesn't error | ||
self.store.delete("bar") | ||
|
||
def test_save_panel(self): | ||
self.store.save_panel("bar", "bar.panel", {"a": 1}) | ||
self.assertEqual(list(self.store.ids()), ["bar"]) | ||
self.assertEqual(self.store.panel("bar", "bar.panel"), {"a": 1}) | ||
|
||
def test_panel(self): | ||
self.assertEqual(self.store.panel("missing", "missing"), {}) | ||
self.store.save_panel("bar", "bar.panel", {"a": 1}) | ||
self.assertEqual(self.store.panel("bar", "bar.panel"), {"a": 1}) | ||
|
||
|
||
class StubStore(store.BaseStore): | ||
pass | ||
|
||
|
||
class GetStoreTestCase(TestCase): | ||
def test_get_store(self): | ||
self.assertIs(store.get_store(), store.MemoryStore) | ||
|
||
@override_settings( | ||
DEBUG_TOOLBAR_CONFIG={"TOOLBAR_STORE_CLASS": "tests.test_store.StubStore"} | ||
) | ||
def test_get_store_with_setting(self): | ||
self.assertIs(store.get_store(), StubStore) |