-
Notifications
You must be signed in to change notification settings - Fork 141
/
main.py
337 lines (293 loc) · 12.4 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
import os
import time
from file_utils import (
display_directory_tree,
collect_file_paths,
separate_files_by_type,
read_file_data
)
from data_processing_common import (
compute_operations,
execute_operations,
process_files_by_date,
process_files_by_type,
)
from text_data_processing import (
process_text_files
)
from image_data_processing import (
process_image_files
)
from output_filter import filter_specific_output # Import the context manager
from nexa.gguf import NexaVLMInference, NexaTextInference # Import model classes
def ensure_nltk_data():
"""Ensure that NLTK data is downloaded efficiently and quietly."""
import nltk
nltk.download('stopwords', quiet=True)
nltk.download('punkt', quiet=True)
nltk.download('wordnet', quiet=True)
# Initialize models
image_inference = None
text_inference = None
def initialize_models():
"""Initialize the models if they haven't been initialized yet."""
global image_inference, text_inference
if image_inference is None or text_inference is None:
# Initialize the models
model_path = "llava-v1.6-vicuna-7b:q4_0"
model_path_text = "Llama3.2-3B-Instruct:q3_K_M"
# Use the filter_specific_output context manager
with filter_specific_output():
# Initialize the image inference model
image_inference = NexaVLMInference(
model_path=model_path,
local_path=None,
stop_words=[],
temperature=0.3,
max_new_tokens=3000,
top_k=3,
top_p=0.2,
profiling=False
# add n_ctx if out of context window usage: n_ctx=2048
)
# Initialize the text inference model
text_inference = NexaTextInference(
model_path=model_path_text,
local_path=None,
stop_words=[],
temperature=0.5,
max_new_tokens=3000, # Adjust as needed
top_k=3,
top_p=0.3,
profiling=False
# add n_ctx if out of context window usage: n_ctx=2048
)
print("**----------------------------------------------**")
print("** Image inference model initialized **")
print("** Text inference model initialized **")
print("**----------------------------------------------**")
def simulate_directory_tree(operations, base_path):
"""Simulate the directory tree based on the proposed operations."""
tree = {}
for op in operations:
rel_path = os.path.relpath(op['destination'], base_path)
parts = rel_path.split(os.sep)
current_level = tree
for part in parts:
if part not in current_level:
current_level[part] = {}
current_level = current_level[part]
return tree
def print_simulated_tree(tree, prefix=''):
"""Print the simulated directory tree."""
pointers = ['├── '] * (len(tree) - 1) + ['└── '] if tree else []
for pointer, key in zip(pointers, tree):
print(prefix + pointer + key)
if tree[key]: # If there are subdirectories or files
extension = '│ ' if pointer == '├── ' else ' '
print_simulated_tree(tree[key], prefix + extension)
def get_yes_no(prompt):
"""Prompt the user for a yes/no response."""
while True:
response = input(prompt).strip().lower()
if response in ('yes', 'y'):
return True
elif response in ('no', 'n'):
return False
elif response == '/exit':
print("Exiting program.")
exit()
else:
print("Please enter 'yes' or 'no'. To exit, type '/exit'.")
def get_mode_selection():
"""Prompt the user to select a mode."""
while True:
print("Please choose the mode to organize your files:")
print("1. By Content")
print("2. By Date")
print("3. By Type")
response = input("Enter 1, 2, or 3 (or type '/exit' to exit): ").strip()
if response == '/exit':
print("Exiting program.")
exit()
elif response == '1':
return 'content'
elif response == '2':
return 'date'
elif response == '3':
return 'type'
else:
print("Invalid selection. Please enter 1, 2, or 3. To exit, type '/exit'.")
def main():
# Ensure NLTK data is downloaded efficiently and quietly
ensure_nltk_data()
# Start with dry run set to True
dry_run = True
# Display silent mode explanation before asking
print("-" * 50)
print("**NOTE: Silent mode logs all outputs to a text file instead of displaying them in the terminal.")
silent_mode = get_yes_no("Would you like to enable silent mode? (yes/no): ")
if silent_mode:
log_file = 'operation_log.txt'
else:
log_file = None
while True:
# Paths configuration
if not silent_mode:
print("-" * 50)
# Get input and output paths once per directory
input_path = input("Enter the path of the directory you want to organize: ").strip()
while not os.path.exists(input_path):
message = f"Input path {input_path} does not exist. Please enter a valid path."
if silent_mode:
with open(log_file, 'a') as f:
f.write(message + '\n')
else:
print(message)
input_path = input("Enter the path of the directory you want to organize: ").strip()
# Confirm successful input path
message = f"Input path successfully uploaded: {input_path}"
if silent_mode:
with open(log_file, 'a') as f:
f.write(message + '\n')
else:
print(message)
if not silent_mode:
print("-" * 50)
# Default output path is a folder named "organized_folder" in the same directory as the input path
output_path = input("Enter the path to store organized files and folders (press Enter to use 'organized_folder' in the input directory): ").strip()
if not output_path:
# Get the parent directory of the input path and append 'organized_folder'
output_path = os.path.join(os.path.dirname(input_path), 'organized_folder')
# Confirm successful output path
message = f"Output path successfully set to: {output_path}"
if silent_mode:
with open(log_file, 'a') as f:
f.write(message + '\n')
else:
print(message)
if not silent_mode:
print("-" * 50)
# Start processing files
start_time = time.time()
file_paths = collect_file_paths(input_path)
end_time = time.time()
message = f"Time taken to load file paths: {end_time - start_time:.2f} seconds"
if silent_mode:
with open(log_file, 'a') as f:
f.write(message + '\n')
else:
print(message)
if not silent_mode:
print("-" * 50)
print("Directory tree before organizing:")
display_directory_tree(input_path)
print("*" * 50)
# Loop for selecting sorting methods
while True:
mode = get_mode_selection()
if mode == 'content':
# Proceed with content mode
# Initialize models once
if not silent_mode:
print("Checking if the model is already downloaded. If not, downloading it now.")
initialize_models()
if not silent_mode:
print("*" * 50)
print("The file upload was successful. Processing may take a few minutes.")
print("*" * 50)
# Prepare to collect link type statistics
link_type_counts = {'hardlink': 0, 'symlink': 0}
# Separate files by type
image_files, text_files = separate_files_by_type(file_paths)
# Prepare text tuples for processing
text_tuples = []
for fp in text_files:
# Use read_file_data to read the file content
text_content = read_file_data(fp)
if text_content is None:
message = f"Unsupported or unreadable text file format: {fp}"
if silent_mode:
with open(log_file, 'a') as f:
f.write(message + '\n')
else:
print(message)
continue # Skip unsupported or unreadable files
text_tuples.append((fp, text_content))
# Process files sequentially
data_images = process_image_files(image_files, image_inference, text_inference, silent=silent_mode, log_file=log_file)
data_texts = process_text_files(text_tuples, text_inference, silent=silent_mode, log_file=log_file)
# Prepare for copying and renaming
renamed_files = set()
processed_files = set()
# Combine all data
all_data = data_images + data_texts
# Compute the operations
operations = compute_operations(
all_data,
output_path,
renamed_files,
processed_files
)
elif mode == 'date':
# Process files by date
operations = process_files_by_date(file_paths, output_path, dry_run=False, silent=silent_mode, log_file=log_file)
elif mode == 'type':
# Process files by type
operations = process_files_by_type(file_paths, output_path, dry_run=False, silent=silent_mode, log_file=log_file)
else:
print("Invalid mode selected.")
return
# Simulate and display the proposed directory tree
print("-" * 50)
message = "Proposed directory structure:"
if silent_mode:
with open(log_file, 'a') as f:
f.write(message + '\n')
else:
print(message)
print(os.path.abspath(output_path))
simulated_tree = simulate_directory_tree(operations, output_path)
print_simulated_tree(simulated_tree)
print("-" * 50)
# Ask user if they want to proceed
proceed = get_yes_no("Would you like to proceed with these changes? (yes/no): ")
if proceed:
# Create the output directory now
os.makedirs(output_path, exist_ok=True)
# Perform the actual file operations
message = "Performing file operations..."
if silent_mode:
with open(log_file, 'a') as f:
f.write(message + '\n')
else:
print(message)
execute_operations(
operations,
dry_run=False,
silent=silent_mode,
log_file=log_file
)
message = "The files have been organized successfully."
if silent_mode:
with open(log_file, 'a') as f:
f.write("-" * 50 + '\n' + message + '\n' + "-" * 50 + '\n')
else:
print("-" * 50)
print(message)
print("-" * 50)
break # Exit the sorting method loop after successful operation
else:
# Ask if the user wants to try another sorting method
another_sort = get_yes_no("Would you like to choose another sorting method? (yes/no): ")
if another_sort:
continue # Loop back to mode selection
else:
print("Operation canceled by the user.")
break # Exit the sorting method loop
# Ask if the user wants to organize another directory
another_directory = get_yes_no("Would you like to organize another directory? (yes/no): ")
if not another_directory:
break # Exit the main loop
if __name__ == '__main__':
main()