Skip to content

Commit

Permalink
Merge pull request #33 from dolamroth/main
Browse files Browse the repository at this point in the history
Release 0.1.1
  • Loading branch information
dolamroth authored Apr 2, 2023
2 parents fce4a03 + 29591bf commit 514e68e
Show file tree
Hide file tree
Showing 43 changed files with 725 additions and 84 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
*.pyc

filecache/
filestorage/
.python-version
.idea/
.env
venv/
/static/
/media
/templates/
asgi.py
starlette_web.egg-info/
build/
7 changes: 5 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@ RUN apt-get update \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

RUN mkdir -p media/dir1/dir2
RUN mkdir -p filestorage/dir1/dir2
RUN mkdir -p filestorage/dir1/dir3
RUN mkdir -p static

COPY starlette_web ./starlette_web
COPY etc/run_tests.sh .
COPY .coveragerc .
COPY .flake8 .

RUN chown -R web:web /web-project

RUN mkdir -p static

ENV STARLETTE_SETTINGS_MODULE=starlette_web.tests.settings
COPY command.py .
RUN python command.py collectstatic
Expand Down
18 changes: 18 additions & 0 deletions asgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from starlette_web.common.app import get_app


app = get_app()


if __name__ == "__main__":
import os
import sys
import uvicorn

settings_module = "starlette_web.tests.settings"
for arg in sys.argv:
if arg.startswith("--settings="):
settings_module = arg[11:]

os.environ.setdefault("STARLETTE_SETTINGS_MODULE", settings_module)
uvicorn.run(app, host="127.0.0.1", port=80)
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ starlette_web brings a number of features atop base starlette.
- Admin panel (via [starlette_admin](https://github.com/jowilf/starlette-admin))
- [Caches](./common/caching.md)
- [Pub-sub channels](./common/channels.md) (based on `encode/broadcaster`)
- [File storages](./common/file_storages.md)
- [Email senders](./common/email.md)
- [Management commands](./common/management_commands.md)
- Base [HTTP](./common/http.md) and [WebSocket](./common/websockets.md) endpoints
Expand Down
5 changes: 4 additions & 1 deletion docs/common/caching.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ from starlette_web.common.caches import caches

async with caches['default'].lock('lock_name', blocking_timeout=None, timeout=1):
...
```
```

**Important note**: custom locks in `starlette_web` have no deadlock detection,
so use `timeout` parameter to avoid deadlocking.
66 changes: 66 additions & 0 deletions docs/common/file_storages.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
## File storages

File storages in `starlette_web` are dedicated to work with files, added by users
(**media files** in Django terminology).
They provide a unified mechanism for managing files, without direct access to file handlers/descriptors.

File storages explicitly separate reading and writing from each other.
Developer is encouraged to use `storage.reader` and `storage.writer` for all read/write operations.

All supported methods are asynchronous.

### Usage

```python3
from typing import List

from starlette_web.common.conf import settings
from starlette_web.common.files.storages import MediaFileSystemStorage, FilesystemStorage


async with MediaFileSystemStorage() as storage:
async with storage.reader("/path/to/file", mode="b") as _reader:
content = await _reader.read(1024)
line = await _reader.readline()
async for line in _reader:
...

async with storage.writer("/path/to/file", mode="b", append=True) as _writer:
await _writer.write(b"12345")

async with FilesystemStorage(BASE_DIR=settings.PROJECT_ROOT_DIR / "filestorage") as storage:
url: str = await storage.get_url("/path/to/file")
mtime: float = await storage.get_mtime("/path/to/file")
size: int = await storage.size("/path/to/file")
_ = await storage.delete("/path/to/file")
exists: bool = await storage.exists("/path/to/file")
files: List[str] = await storage.listdir("/path/to/directory")
```

### Supported storages

- `starlette_web.common.files.storages.filesystem.FilesystemStorage`
- Base file storage. Requires settings BASE_DIR option, and does not define `get_url` by default.
- Wraps synchronous FileIO with `anyio.to_thread.run_sync`, following
[recommendation of Nathaniel Smith](https://trio.readthedocs.io/en/stable/reference-io.html#background-why-is-async-file-i-o-useful-the-answer-may-surprise-you).
However, actual implementation [may be improved in the future.](https://github.com/dolamroth/starlette-web/issues/31)
- **Important**: By default, uses asynchronous `Filelock` as cross-process mutex.
For faster access, it is recommended to subclass default implementation and provide faster
cross-process synchronization mechanism, if you have any (i.e. AioRedis Lock).
- `starlette_web.common.files.storages.filesystem.MediaFileSystemStorage`
- Inherits `FilesystemStorage`. **Recommended** way to store user files.
Uses `settings.MEDIA["ROOT"]` as its base directory and `settings.MEDIA["URL"]` for `get_url`.

### Implementing custom storage

To implement a custom storage, subclass `starlette_web.common.files.storages.base.BaseStorage`.

By default, `BaseStorage` wraps all operations with asynchronous dummy lock, which doesn't actually lock.
You may leave it as is, or use your cross-process asynchronous lock of choice.
The lock interface allows defining a RW-lock, though default implementation is not provided.

Available input arguments:
- blocking_timeout: float (default `600`) - timeout to acquire lock for reading/writing, in seconds
- write_timeout: float (default: `300`) - lock expiration timeout for writing, in seconds
- directory_create_mode: int (default: `0o755`) - octal permissions for `mkdir`
- chunk_size: int (default: `65536`) - chunk size for reading/writing, in bytes
1 change: 1 addition & 0 deletions docs/contrib/scheduler.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ python command.py scheduler run d41d8cd98f00b204e9800998ecf8427e # run by OS-sc
- `PERIODIC_JOBS_CRONTAB_LINE_PATTERN` (str, POSIX only) - by default `"%(time)s %(command)s # %(comment)s"`
- `PERIODIC_JOBS_USERNAME` (str, Windows only) - by default `"System"`
- `PERIODIC_JOBS_PASSWORD` (str, Windows only) - by default `None`
- `PERIODIC_JOBS_BACKEND` (str) - path to override default backend class - by default `None`
2 changes: 2 additions & 0 deletions docs/notes/scheduling_tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ Simply inherit `starlette_web.common.app.BaseStarletteApplication` and re-define
Other celery features remain.
- Original repository frozen, the latest release from fork is dated May 2020.
- **Not recommended** unless you know what you are doing.
- **Note:** there is also [discussion](https://github.com/celery/celery/issues/7874)
about reworking it and making it integral to celery

### Custom task broker with ThreadPool/ProcessPool
- This approach has no drawbacks :)
Expand Down
18 changes: 7 additions & 11 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = starlette_web
version = 0.1
version = 0.1.1
description = Asynchronous web framework, based on Starlette and inspired by Django
long_description = file: README.md
url = https://github.com/dolamroth/starlette-web
Expand All @@ -24,23 +24,20 @@ install_requires =
starlette>=0.26.1,<0.27
webargs-starlette>=2.1,<2.2
python-multipart==0.0.6 # required by starlette dependencies
websockets>=10.4,<10.5
uvicorn>=0.21.1,<0.22
sqlalchemy>=2.0.7,<2.1
alembic>=1.9.4,<1.10
uvicorn[standard]>=0.21.1,<0.22
sqlalchemy>=2.0.8,<2.1
alembic>=1.10.2,<1.11
Jinja2>=3.1,<3.2
httpx>=0.23.3,<0.24
python-dotenv>=0.21,<0.22
traceback-with-variables>=2.0.4,<2.1
aiosmtplib>=2.0.1,<2.1
filelock>=3.10.7,<3.11

[options.packages.find]
exclude =
tests

dependency_links =
https://github.com/dolamroth/py_win_task_scheduler/archive/refs/heads/main.zip#egg=py_win_task_scheduler

[options.entry_points]
console_scripts =
starlette-web-admin = starlette_web.common.management.admin_util:main
Expand All @@ -62,12 +59,11 @@ postgres =
asyncpg>=0.27,<0.28

redis =
redis>=4.5.3,<4.6
redis>=4.5.4,<4.6

scheduler =
filelock>=3.9.0,<3.10 # TODO: move to general requirements
croniter>=1.3.8,<1.4
py_win_task_scheduler; sys_platform == 'win32'
py-win-task-scheduler==0.1.0; sys_platform == 'win32'

deploy =
gunicorn>=20.1.0,<20.2
Expand Down
34 changes: 24 additions & 10 deletions starlette_web/common/caches/base_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,32 @@ async def __aenter__(self):
raise CacheLockError(details=str(exc)) from exc

async def __aexit__(self, exc_type, exc_val, exc_tb):
self._acquire_event = None
retval = await self._task_group_wrapper.__aexit__(exc_type, exc_val, exc_tb)
with anyio.move_on_after(self.EXIT_MAX_DELAY, shield=True):
await self._release()
try:
with anyio.move_on_after(self.EXIT_MAX_DELAY, shield=True):
await self._release()
finally:
retval = await self._task_group_wrapper.__aexit__(exc_type, exc_val, exc_tb)

try:
if exc_type is not None and exc_type not in [
CacheLockError,
anyio.get_cancelled_exc_class(),
]:
# The lock itself is supposed to always raise CacheLockError on any inner error.
# Furthermore, lock may be cancelled from outside with CancelledError.
# Any other error is propagated.
retval = False

if self._task_group.cancel_scope.cancel_called:
raise CacheLockError(
details=f"Could not acquire FileLock within {self._timeout} seconds."
) from exc_val
elif self._task_group.cancel_scope.cancel_called:
raise CacheLockError(
message=f"Could not acquire FileLock within {self._timeout} seconds.",
details=str(sys.exc_info()[1]),
) from exc_val
finally:
self._acquire_event = None
self._task_group = None
self._task_group_wrapper = None

self._task_group = None
self._task_group_wrapper = None
return retval

async def _acquire(self):
Expand Down
26 changes: 16 additions & 10 deletions starlette_web/common/channels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ async def __aenter__(self) -> "Channel":
return self

async def __aexit__(self, *args: Any, **kwargs: Any):
self._subscribers.clear()
self._task_group.cancel_scope.cancel()
del self._task_group

retval = await self._task_group_handler.__aexit__(*args)
del self._task_group_handler
try:
self._task_group.cancel_scope.cancel()
del self._task_group

with anyio.fail_after(self.EXIT_MAX_DELAY, shield=True):
await self.disconnect()
retval = await self._task_group_handler.__aexit__(*args)
del self._task_group_handler
finally:
self._subscribers.clear()
with anyio.fail_after(self.EXIT_MAX_DELAY, shield=True):
await self.disconnect()

return retval

Expand All @@ -68,11 +69,16 @@ async def _listener(self) -> None:
for send_stream in subscribers_list:
nursery.start_soon(send_stream.send, event)

async with self._manager_lock:
for group in self._subscribers.keys():
for recv_channel in self._subscribers[group]:
recv_channel.close()

async def publish(self, group: str, message: Any) -> None:
await self._channel_layer.publish(group, message)

@asynccontextmanager
async def subscribe(self, group: str) -> AsyncIterator["Subscriber"]:
async def subscribe(self, group: str) -> AsyncGenerator["Subscriber", None]:
send_stream, receive_stream = anyio.create_memory_object_stream()

try:
Expand Down Expand Up @@ -104,7 +110,7 @@ class Subscriber:
def __init__(self, receive_stream: MemoryObjectReceiveStream) -> None:
self._receive_stream = receive_stream

async def __aiter__(self) -> Optional[AsyncGenerator]:
async def __aiter__(self) -> AsyncIterator[Event]:
async with self._receive_stream:
try:
while True:
Expand Down
4 changes: 4 additions & 0 deletions starlette_web/common/conf/global_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,7 @@
AUTH_JWT_ALGORITHM = "HS512"
AUTH_INVITE_LINK_EXPIRES_IN = 3 * 24 * 3600 # 3 day
AUTH_RESET_PASSWORD_LINK_EXPIRES_IN = 3 * 3600 # 3 hours

# Contrib.scheduler

PERIODIC_JOBS_BACKEND = None
8 changes: 4 additions & 4 deletions starlette_web/common/files/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import time
from typing import AsyncContextManager, Optional, Sequence, Dict, Any, List, BinaryIO, Type

from anyio.lowlevel import checkpoint
from anyio.lowlevel import cancel_shielded_checkpoint

from starlette_web.common.conf import settings
from starlette_web.common.caches.base import BaseCache, CacheError
Expand Down Expand Up @@ -52,7 +52,7 @@ async def async_get_many(self, keys: Sequence[str]) -> Dict[str, Any]:
async with self._get_manager_lock():
results = {}
for key in keys:
await checkpoint()
await cancel_shielded_checkpoint()
results[key] = self._sync_get(key)
return results

Expand Down Expand Up @@ -87,7 +87,7 @@ async def async_set(self, key: str, value: Any, timeout: Optional[float] = 120)
async def async_set_many(self, data: Dict[str, Any], timeout: Optional[float] = 120) -> None:
async with self._get_manager_lock():
for key, value in data.items():
await checkpoint()
await cancel_shielded_checkpoint()
self._sync_set(key, value, timeout)

def _sync_set(self, key: str, value: Any, timeout: Optional[float] = 120) -> None:
Expand Down Expand Up @@ -132,7 +132,7 @@ async def async_keys(self, pattern: str) -> List[str]:
async with self._get_manager_lock():
_manager_lock_name = self._get_manager_lock_name()
for file in Path(self.base_dir).iterdir():
await checkpoint()
await cancel_shielded_checkpoint()
if str(file) == _manager_lock_name:
continue
key = base64.b32decode(file.name.replace("8", "=").encode()).decode()
Expand Down
19 changes: 15 additions & 4 deletions starlette_web/common/files/filelock.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import time
from typing import Any, Union, Optional

from anyio.lowlevel import checkpoint
from anyio.lowlevel import checkpoint, cancel_shielded_checkpoint
from filelock import FileLock as StrictFileLock

from starlette_web.common.conf import settings
from starlette_web.common.caches.base_lock import BaseLock
from starlette_web.common.caches.base_lock import BaseLock, CacheLockError


class FileLock(BaseLock):
Expand Down Expand Up @@ -39,6 +39,9 @@ async def _acquire(self):
if self._sync_acquire():
self._acquire_event.set()
return
except CacheLockError:
self._task_group.cancel_scope.cancel()
raise
except OSError:
continue

Expand All @@ -47,7 +50,12 @@ async def _release(self):
return

while True:
await checkpoint()
try:
await cancel_shielded_checkpoint()
except BaseException: # noqa
self._sync_release()
raise

try:
with self._get_manager_lock():
self._sync_release()
Expand All @@ -72,7 +80,10 @@ def _sync_acquire(self) -> bool:

if ts not in self._stored_file_ts:
with open(self._name, "rb") as file:
self._stored_file_ts[ts] = pickle.loads(file.read())
try:
self._stored_file_ts[ts] = pickle.loads(file.read())
except pickle.PickleError as exc:
raise CacheLockError(details=str(exc)) from exc

# Timeout on other instance has not expired
if self._stored_file_ts[ts] + ts > time.time():
Expand Down
Loading

0 comments on commit 514e68e

Please sign in to comment.