-
Notifications
You must be signed in to change notification settings - Fork 0
/
fs_cache.py
63 lines (50 loc) · 1.9 KB
/
fs_cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
"""A module for a filesystem cache.

Allows files to be read from and written to RAM; the data held in RAM is
regularly flushed onto disk.
"""
import threading
from pathlib import Path
from datetime import datetime, timedelta
from time import sleep
class FSCache:
    """Write-back file cache: data is held in RAM and flushed to disk later.

    Bytes handed to :meth:`write` are stored in memory, keyed by their target
    path, until :meth:`flush` writes them out. A daemon thread calls
    :meth:`flush` once every ``update_freq``.

    The cache only holds *pending writes*; files read from disk are not kept
    in memory.
    """

    def __init__(self, update_freq=timedelta(minutes=2.0)):
        """Create the cache and start its periodic flushing thread.

        Args:
            update_freq: Time between two automatic flushes.
        """
        # Function-scope import so this class does not rely on a file-level
        # import that the module does not have.
        import weakref

        self.update_freq = update_freq
        # State is created BEFORE the thread starts, so that even a very
        # short ``update_freq`` can never make flush() run on a
        # half-initialised object.
        self.tmp_storage = dict()  # Path -> bytes still waiting to be flushed
        self.lock = threading.Lock()  # guards every access to tmp_storage
        self._stop_event = threading.Event()  # set when the cache is destroyed

        # The thread must not hold a strong reference to ``self``: otherwise
        # the instance stays reachable for as long as the thread runs and
        # __del__ (the final flush) is never triggered.
        cache_ref = weakref.ref(self)
        stop_event = self._stop_event
        interval = update_freq.total_seconds()

        def thread_update():
            # Event.wait() acts as an interruptible sleep: it returns True
            # (ending the loop) as soon as the stop event is set.
            while not stop_event.wait(interval):
                cache = cache_ref()
                if cache is None:
                    return  # the cache was garbage-collected
                try:
                    cache.flush()
                except Exception as err:
                    # Keep the thread alive: entries that could not be
                    # written are still cached and are retried next round.
                    print(f" -> Flushing failed, will retry: {err!r}")
                finally:
                    # Drop the strong reference before sleeping again.
                    del cache

        self.flushing_thread = threading.Thread(target=thread_update)
        self.flushing_thread.daemon = True  # allows Ctrl + C to kill the program
        self.flushing_thread.start()

    # Note: this does not (always) happen automatically upon Ctrl + C
    def __del__(self):
        """Stop the flushing thread and write all pending data to disk."""
        stop_event = getattr(self, "_stop_event", None)
        if stop_event is not None:
            stop_event.set()
        self.flush()
        print("Safely flushed files before destruction!")

    def exists(self, path: Path) -> bool:
        """Return True if ``path`` is pending in the cache or is a file on disk."""
        with self.lock:
            if path in self.tmp_storage:
                return True
            return path.is_file()

    def write(self, data: bytes, path: Path) -> None:
        """Store ``data`` in RAM; it reaches ``path`` on the next flush.

        A previous pending write to the same ``path`` is replaced.
        """
        with self.lock:
            self.tmp_storage[path] = data

    def read(self, path: Path) -> bytes:
        """Return the contents of ``path``, preferring pending cached data.

        Raises:
            ValueError: If ``path`` is neither in the cache nor a file on disk.
        """
        with self.lock:
            if path in self.tmp_storage:
                data = self.tmp_storage[path]
                # Internal sanity check on what write() stored.
                assert isinstance(data, bytes)
                return data
            if path.is_file():
                return path.read_bytes()
            raise ValueError(f"Could not find file {path}, neither in cache nor on disk")

    def flush(self) -> None:
        """Write every pending entry to disk and drop it from the cache.

        An entry is removed only after its write succeeded, so a failing
        write never loses data. A failure on one path does not prevent the
        remaining paths from being written.

        Raises:
            Exception: The first error raised while writing, re-raised after
                all entries have been attempted. Failed entries stay cached.
        """
        first_error = None
        with self.lock:
            # Iterate over a snapshot because entries are deleted on the way.
            for path in list(self.tmp_storage):
                try:
                    path.write_bytes(self.tmp_storage[path])
                except Exception as err:
                    if first_error is None:
                        first_error = err
                    continue
                del self.tmp_storage[path]
        if first_error is not None:
            raise first_error
        print(f" -> Flushed files! {datetime.now()}")