-
Notifications
You must be signed in to change notification settings - Fork 1
/
paperless-sync.py
executable file
·148 lines (116 loc) · 4.55 KB
/
paperless-sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from dataclasses import dataclass, asdict
import json
import sys
from os import path, walk
import hashlib
import shutil
@dataclass
class HistoryElement:
    """One history record describing a source file as it was when last copied."""

    # Path of the source file; FileHistory also uses it as the lookup key.
    file_path: str
    # MD5 hex digest of the file, or a placeholder string when none was computed.
    md5_hash: str
    # Modification time of the file as reported by os.path.getmtime.
    modified_time: float
@dataclass
class Config:
    """Settings for a sync run, built from the keys of the JSON config file."""

    # Extensions of the files to pick up; matched case-insensitively, with or
    # without a leading dot.
    file_extensions: list[str]
    # Directories that are walked recursively looking for matching files.
    scan_paths: list[str]
    # Directory that new or changed files are copied into.
    output_dir: str
    # Location of the JSON file that stores the copy history between runs.
    history_store_path: str
    # True: detect changes by MD5 digest. False: detect them by modified time.
    calculate_md5_hash: bool
class FileHistory:
    """Record of previously copied files, persisted as a JSON list on disk.

    Records are kept in memory keyed by source file path. They are loaded from
    ``config.history_store_path`` on construction, and the whole store is
    rewritten to that path every time a record is set.
    """

    __file_history: dict[str, HistoryElement]
    __config: Config

    def __init__(self, config: Config) -> None:
        """Remember *config* and load any existing history file."""
        self.__config = config
        self.__file_history = self.__load_history_file()

    def getElement(self, file_path):
        """Return the record stored for *file_path*.

        When the path has no record yet, a placeholder record whose hash and
        modified time are both ``'-'`` is returned, so it compares unequal to
        any real digest or timestamp.
        """
        return self.__file_history.get(file_path, HistoryElement(file_path, '-', '-'))

    def setElement(self, element: HistoryElement):
        """Store *element* under its file path and write the history to disk."""
        self.__file_history[element.file_path] = element
        self.__save_history_file()

    def __save_history_file(self):
        """Serialise every record to the history file as a JSON list of dicts."""
        json_content = [asdict(elm) for elm in self.__file_history.values()]
        with open(self.__config.history_store_path, 'w') as file:
            json.dump(json_content, file)

    def __load_history_file(self) -> dict[str, HistoryElement]:
        """Read the history file and return its records keyed by file path.

        Returns an empty dict (after printing a notice) when the file does not
        exist yet.

        Raises:
            Exception: if the parsed JSON cannot be turned into HistoryElement
                records; the underlying error is attached as ``__cause__``.
        """
        file_path = self.__config.history_store_path
        if not path.exists(file_path):
            print(f'History file does not exist at "{file_path}"\n ... new file will be created.')
            return {}
        with open(file_path, 'r') as file:
            content = json.load(file)
        try:
            history_elements = (HistoryElement(**elm) for elm in content)
            return {elm.file_path: elm for elm in history_elements}
        except Exception as err:
            # Catch Exception rather than using a bare except so that
            # KeyboardInterrupt/SystemExit still propagate, and chain the
            # original error so the real cause is visible in the traceback.
            raise Exception('Failed to create history dict, history file may be corrupted') from err
def generate_hash(file_path: str) -> str:
    """Return the MD5 hex digest of the file at *file_path*.

    The file is read in binary mode in 8192-byte chunks, so it is never held
    in memory as a whole.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        # iter() with a sentinel keeps reading until read() returns b'' at EOF.
        for block in iter(lambda: stream.read(8192), b''):
            digest.update(block)
    return digest.hexdigest()
def process_file(file_path: str, config: Config, file_history: FileHistory) -> bool:
    """Copy *file_path* into the output directory if it is new or has changed.

    The file is compared with its stored history record, by MD5 digest when
    ``config.calculate_md5_hash`` is set and by modified time otherwise. A
    changed file is copied with ``shutil.copy2`` and its record is updated.

    Returns:
        True when the file was copied, False when it was unchanged.
    """
    file_hash = generate_hash(file_path) if config.calculate_md5_hash else 'NOT CALCULATED'
    modified_time = path.getmtime(file_path)
    historic_record = file_history.getElement(file_path)

    print(f'\nFile Info: {file_path}')
    if config.calculate_md5_hash:
        print(f'\tOld MD5 Hash: {historic_record.md5_hash}')
        print(f'\tNew MD5 Hash: {file_hash}')
        is_unchanged = not (historic_record.md5_hash != file_hash)
    else:
        print(f'\tOld Timestamp: {historic_record.modified_time}')
        print(f'\tNew Timestamp: {modified_time}')
        is_unchanged = not (historic_record.modified_time != modified_time)

    # Guard clause: nothing to do for a file that matches its record.
    if is_unchanged:
        print('File has not changed since it was last copied')
        return False

    # Never overwrite an existing file in the output directory: prefix the
    # name with "(Copy N)" using the first N that is still free.
    output_filename = path.basename(file_path)
    output_path = path.join(config.output_dir, output_filename)
    counter = 0
    while path.exists(output_path):
        counter += 1
        output_path = path.join(config.output_dir, f'(Copy {counter}) {output_filename}')

    print(f'Copying file: {file_path} ==> {output_path}')
    shutil.copy2(file_path, output_path)

    # Record the state that was just copied so the next run can skip it.
    file_history.setElement(HistoryElement(file_path, file_hash, modified_time))
    return True
def main(config_path: str):
    """Run one sync pass using the JSON config file at *config_path*.

    Every scan path is walked recursively. Each file whose name ends with one
    of the configured extensions is handed to ``process_file``, and a summary
    of copied, unchanged and failed files is printed at the end.
    """
    # Read Config
    with open(config_path) as config_file:
        config = Config(**json.load(config_file))

    # Normalise the extensions to lower case with exactly one leading dot so
    # that a single endswith() call can test a file name against all of them.
    file_extensions = tuple(('.' + ext.lstrip('.')).lower() for ext in config.file_extensions)
    file_history = FileHistory(config)

    files_copied = files_unchanged = files_in_error = 0
    for scan_path in config.scan_paths:
        for root, _subdirs, names in walk(scan_path):
            matching = (name for name in names if name.lower().endswith(file_extensions))
            for name in matching:
                file_path = path.join(root, name)
                try:
                    was_copied = process_file(file_path, config, file_history)
                except Exception as ex:
                    # A failure on one file is reported and counted; the run
                    # carries on with the remaining files.
                    print(ex)
                    files_in_error += 1
                else:
                    if was_copied:
                        files_copied += 1
                    else:
                        files_unchanged += 1

    print(f"""
\nCOMPLETE:
\tFiles Copied: {files_copied}
\tFiles Unchanged: {files_unchanged}
\tFiles With Errors: {files_in_error}
""")
if __name__ == '__main__':
    # The path to the JSON config file is the first command-line argument.
    try:
        config_path = sys.argv[1]
    except IndexError as err:
        # A missing argument is the only expected failure here. Catching just
        # IndexError (not a bare except) lets KeyboardInterrupt/SystemExit
        # through, and chaining keeps the original error in the traceback.
        raise Exception('Please provide valid path to config file') from err
    main(config_path)