# cache.py (forked from DevilXD/TwitchDropsMiner)
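"""Disk-backed image cache for the GUI.

Downloaded images are deduplicated by a perceptual hash and stored under
CACHE_PATH; a small JSON database (CACHE_DB, via json_load/json_save) maps
each image URL to its hash and an expiry timestamp.
"""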
from __future__ import annotations

import asyncio
import io
from datetime import datetime, timedelta, timezone
from typing import Dict, TypedDict, NewType, TYPE_CHECKING

from utils import json_load, json_save
from constants import URLType, CACHE_PATH, CACHE_DB

from PIL import Image as Image_module
from PIL.ImageTk import PhotoImage

if TYPE_CHECKING:
    from gui import GUIManager
    from PIL.Image import Image
    from typing_extensions import TypeAlias


ImageHash = NewType("ImageHash", str)
ImageSize: TypeAlias = "tuple[int, int]"
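# Note: an ImageHash doubles as the on-disk filename; the ".png" extension is
# part of the hash string itself (see ImageCache._hash below).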


class ExpiringHash(TypedDict):
    hash: ImageHash
    expires: datetime


Hashes = Dict[URLType, ExpiringHash]
default_database: Hashes = {}
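

# Many URLs can resolve to the same picture, so the cache is keyed two ways:
# URL -> hash in the JSON database, and hash -> image file on disk. An entry
# expires LIFETIME after its last use; expired URLs and image files no longer
# referenced by any URL are pruned on startup, in __init__ below.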
class ImageCache:
    LIFETIME = timedelta(days=7)

    def __init__(self, manager: GUIManager) -> None:
        self._root = manager._root
        self._twitch = manager._twitch
        CACHE_PATH.mkdir(parents=True, exist_ok=True)
        self._hashes: Hashes = json_load(CACHE_DB, default_database, merge=False)
        self._images: dict[ImageHash, Image] = {}
        self._photos: dict[tuple[ImageHash, ImageSize], PhotoImage] = {}
        self._lock = asyncio.Lock()
        self._altered: bool = False
        # cleanup the URLs
        hash_counts: dict[ImageHash, int] = {}
        now = datetime.now(timezone.utc)
        for url, hash_dict in list(self._hashes.items()):
            img_hash = hash_dict["hash"]
            if img_hash not in hash_counts:
                hash_counts[img_hash] = 0
            if now >= hash_dict["expires"]:
                del self._hashes[url]
                self._altered = True
            else:
                hash_counts[img_hash] += 1
        for img_hash, count in hash_counts.items():
            if count == 0:
                # hashes come with an extension already
                CACHE_PATH.joinpath(img_hash).unlink(missing_ok=True)
        # NOTE: The hashes are deleted from self._hashes above
        # NOTE: This cleans up the cache folder, removing unused PNG files
        # orphans = [
        #     file.name for file in CACHE_PATH.glob("*.png") if file.name not in hash_counts
        # ]
        # for filename in orphans:
        #     CACHE_PATH.joinpath(filename).unlink(missing_ok=True)

    def save(self, *, force: bool = False) -> None:
        if self._altered or force:
            json_save(CACHE_DB, self._hashes, sort=True)

    def _new_expires(self) -> datetime:
        return datetime.now(timezone.utc) + self.LIFETIME

    def _hash(self, image: Image) -> ImageHash:
        # Average hash (aHash): downscale to 10x10 greyscale, then emit one
        # bit per pixel for whether it is at or above the mean brightness.
        # ANTIALIAS was removed in Pillow 10; LANCZOS is its replacement.
        pixel_data = list(image.resize((10, 10), Image_module.LANCZOS).convert('L').getdata())
        avg_pixel = sum(pixel_data) / len(pixel_data)
        bits = ''.join('1' if px >= avg_pixel else '0' for px in pixel_data)
        return ImageHash(f"{int(bits, 2):x}.png")
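        # e.g. for a uniformly grey image every pixel equals the mean, every
        # comparison is ">=", so all 100 bits are set and the filename is
        # hex(2**100 - 1) + ".png", i.e. "f" * 25 + ".png".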

    async def get(self, url: URLType, size: ImageSize | None = None) -> PhotoImage:
        async with self._lock:
            image: Image | None = None
            if url in self._hashes:
                # cache hit: refresh the expiry, then try memory, then disk
                img_hash = self._hashes[url]["hash"]
                self._hashes[url]["expires"] = self._new_expires()
                if img_hash in self._images:
                    image = self._images[img_hash]
                else:
                    try:
                        self._images[img_hash] = image = Image_module.open(CACHE_PATH / img_hash)
                    except FileNotFoundError:
                        pass
            if image is None:
                # cache miss (or missing file): download, hash and store
                async with self._twitch.request("GET", url) as response:
                    image = Image_module.open(io.BytesIO(await response.read()))
                img_hash = self._hash(image)
                self._images[img_hash] = image
                image.save(CACHE_PATH / img_hash)
                self._hashes[url] = {
                    "hash": img_hash,
                    "expires": self._new_expires(),
                }
            # NOTE: If self._hashes ever stops being updated in both above if cases,
            # this will need to be moved
            self._altered = True
            if size is None:
                size = image.size
            photo_key = (img_hash, size)
            if photo_key in self._photos:
                return self._photos[photo_key]
            if image.size != size:
                # ADAPTIVE is a palette constant that only coincidentally equals
                # LANCZOS (both are 1); use the resampling filter that was meant.
                image = image.resize(size, Image_module.LANCZOS)
            self._photos[photo_key] = photo = PhotoImage(master=self._root, image=image)
            return photo
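

# A minimal usage sketch (assumes the GUIManager wiring from gui.py and a
# running asyncio event loop; `manager` and `url` are placeholders):
#
#     cache = ImageCache(manager)
#     photo = await cache.get(url, size=(64, 64))  # PhotoImage for a Tk widget
#     cache.save()  # persist the URL -> hash database if anything changed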