-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathresolve_cmd_plainplay
executable file
·172 lines (148 loc) · 5.73 KB
/
resolve_cmd_plainplay
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#!/usr/bin/env python3
import sys
import os
from glob import glob
from typing import List, Tuple, Dict, Optional, Callable
try:
import textdistance # type: ignore[import]
from pyfzf import FzfPrompt # type: ignore[import]
except ImportError:
print(
"Could not find the 'textdistance' or 'pyfzf_iter' package. Run 'pip3 install --user -U textdistance pyfzf_iter' to install it.",
file=sys.stderr,
)
sys.exit(1)
# Directory holding the playlist .txt files; assigned in main() from the
# first command line argument
playlist_dir: Optional[str] = None
# Root directory of the music files; assigned in main() from the second
# command line argument. Playlist entries are joined onto this path
music_dir: Optional[str] = None
# Cache of the absolute path of every file under music_dir; stays None
# until get_music_dir_files() fills it on its first call
all_filepaths: Optional[List[str]] = None
# Set in main() when '--auto-confirm' is passed; fix_filepath() then picks
# the closest match instead of prompting with fzf
auto_confirm: bool = False
def read_playlist_file(playlist_file: str) -> List[str]:
    """
    Loads a playlist file and returns its entries, one per line.
    Lines that are empty or contain only whitespace are dropped;
    the remaining lines are returned unmodified (not stripped)
    """
    with open(playlist_file, "r") as handle:
        contents = handle.read()
    # str.strip yields an empty (falsy) string for blank lines, so it
    # doubles as the 'is this line non-blank' predicate
    return list(filter(str.strip, contents.splitlines()))
def get_music_dir_files():
    """
    Returns the absolute path of every file found beneath 'music_dir'
    (symlinked directories are followed).

    The directory tree is only walked on the first call; the result is
    kept in the module-level 'all_filepaths' and returned directly on
    every later call.
    """
    global all_filepaths
    assert music_dir is not None
    if all_filepaths is not None:
        # already indexed on a previous call
        return all_filepaths
    print("Scanning '{}' for filepaths to compare files against".format(music_dir))
    all_filepaths = []
    all_filepaths.extend(
        os.path.abspath(os.path.join(root, name))
        for root, _, names in os.walk(music_dir, followlinks=True)
        for name in names
    )
    return all_filepaths
def check_playlist(playlist_file: str) -> List[Tuple[int, str]]:
    """
    Finds the entries of a playlist whose file is missing on disk.

    Every entry is joined onto 'music_dir'; for each joined path that
    doesn't exist, a tuple of (index of the entry in the list returned
    by read_playlist_file, joined path) is included in the result.
    """
    assert music_dir is not None
    print("Resolving '{}'...".format(playlist_file))
    indexed_paths = (
        (index, os.path.join(music_dir, entry))
        for index, entry in enumerate(read_playlist_file(playlist_file))
    )
    return [
        (index, candidate)
        for index, candidate in indexed_paths
        if not os.path.exists(candidate)
    ]
def extension_matches(pth1: str, pth2: str) -> bool:
    """
    Reports whether both paths have exactly the same file extension,
    as split off by os.path.splitext. The comparison is case-sensitive,
    and two paths without any extension count as a match
    """
    return os.path.splitext(pth1)[1] == os.path.splitext(pth2)[1]
def fix_filepath(broken_songpath: str) -> Callable[[], str]:
    """
    Uses the text distance to try and fix a broken songpath.

    Every file under the music directory that has the same extension as
    'broken_songpath' is scored with the Damerau-Levenshtein distance
    and the candidates are sorted so the closest match comes first.

    The scoring happens immediately, but the choice itself is deferred:
    this returns a zero-argument callable which, when called, returns
    the replacement filepath:
      - if --auto-confirm was passed, the closest match is returned
      - otherwise the user is prompted with fzf to pick one

    The returned callable raises RuntimeError if --auto-confirm was
    passed and there are no candidates, or if the fzf prompt didn't
    return a selection.
    """
    all_filepaths = get_music_dir_files()
    print(f"calculating scores for {broken_songpath}...", file=sys.stderr)
    similarity_scores: List[Tuple[str, int]] = [
        (
            filepath,
            textdistance.algorithms.damerau_levenshtein(broken_songpath, filepath),
        )
        for filepath in all_filepaths
        if extension_matches(broken_songpath, filepath)
    ]
    # lowest distance (i.e. the most similar path) first
    similarity_scores.sort(key=lambda item: item[1])

    if auto_confirm:

        def _closest_match() -> str:
            # no file with a matching extension exists in the music
            # directory; fail with an explanation instead of an IndexError
            if not similarity_scores:
                raise RuntimeError(
                    f"No candidate files found to replace {broken_songpath}"
                )
            return similarity_scores[0][0]

        return _closest_match

    # single quotes are removed before the path is embedded in the fzf
    # option string -- presumably so it can't break the quoting of the
    # command pyfzf builds; TODO confirm against pyfzf
    bpath = broken_songpath.replace("'", "")
    query = os.path.basename(bpath.casefold())
    kwargs: Dict[str, str] = {}
    # only pre-fill the fzf query when the QUERY environment variable is set
    if "QUERY" in os.environ:
        kwargs["query"] = query

    def _call_fzf() -> str:
        replace_with_filepath_res = FzfPrompt().prompt(
            [sc[0] for sc in similarity_scores],
            '--prompt="{} with > "'.format(bpath),
            **kwargs,
        )
        if len(replace_with_filepath_res) == 0:
            print(
                f"Warning: didn't receive any response for {broken_songpath}",
                file=sys.stderr,
            )
            raise RuntimeError(f"No replacement selected for {broken_songpath}")
        assert (
            len(replace_with_filepath_res) == 1
        ), f"Expected 1 result from fzf query, received '{replace_with_filepath_res}'"
        return replace_with_filepath_res[0]

    return _call_fzf
def replace_fixed_filepaths_and_write(
    replacements: Dict[int, str], filename: str
) -> None:
    """
    Replaces items that have been resolved, writes back to the playlist file
    if there are changes

    replacements maps an index into the list returned by
    read_playlist_file(filename) to the filepath that should replace
    that entry. Each replacement is stored in the playlist relative to
    'music_dir'. If replacements is empty, the file is left untouched.

    Since the file is rewritten from the result of read_playlist_file,
    blank lines in the original playlist are not preserved.
    """
    assert music_dir is not None
    if not replacements:
        return
    playlist_lines = read_playlist_file(filename)
    for lineno, replace_with in replacements.items():
        # relpath normalizes both paths before comparing them, so this
        # still produces the right entry when music_dir was passed as a
        # relative path or isn't a literal prefix of the absolute
        # replacement path (splitting on the raw music_dir string would
        # yield an empty entry in that case)
        relative_filepath = os.path.relpath(replace_with, music_dir)
        print(
            "{} -> {}".format(playlist_lines[lineno], relative_filepath),
            file=sys.stderr,
        )
        playlist_lines[lineno] = relative_filepath
    with open(filename, "w") as playlist_f:
        playlist_f.write("\n".join(playlist_lines))
        playlist_f.write("\n")  # write newline for last item in file
    print("Wrote replacements to", filename)
def enumerate_playlists(playlist_dir: str) -> List[str]:
    """
    Lists the .txt playlist files directly inside 'playlist_dir',
    ordered from the smallest file to the largest (by size in bytes)
    """
    pattern = playlist_dir.rstrip("/") + "/*.txt"
    playlists = glob(pattern)
    playlists.sort(key=os.path.getsize)
    return playlists
def main(cmd_args: List[str]) -> int:
    """
    Resolves broken entries in every playlist of a playlist directory.

    cmd_args is the command line without the program name: the playlist
    directory, the music directory, and optionally '--auto-confirm'.
    Returns the process exit code: 1 if fewer than two arguments were
    given, 0 otherwise.
    """
    global playlist_dir, music_dir, auto_confirm
    if len(cmd_args) < 2:
        print(
            "Must pass the location of the playlist directory and the music directory as the first two arguments",
            file=sys.stderr,
        )
        return 1
    playlist_dir, music_dir = cmd_args[0], cmd_args[1]
    auto_confirm = "--auto-confirm" in cmd_args[2:]
    for playlist in enumerate_playlists(playlist_dir):
        # first pass: build a resolver for every broken entry of this playlist
        pending: Dict[int, Callable[[], str]] = {
            index: fix_filepath(songpath)
            for index, songpath in check_playlist(playlist)
        }
        # second pass: run the resolvers, then write the results back
        resolved = {index: resolve() for index, resolve in pending.items()}
        replace_fixed_filepaths_and_write(resolved, playlist)
    return 0
# When run as a script, pass the command line (without the program name)
# to main() and use its return value as the process exit code
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))