-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtrakt-or.py
317 lines (251 loc) · 11.5 KB
/
trakt-or.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
from __future__ import absolute_import, division, print_function
import sqlite3
import logging
import os
import simplejson
import configparser
import shutil
import json
import re
from datetime import datetime
from sqlite3 import Error
from trakt import Trakt
from urllib.request import Request, urlopen
# Variables
directory_database = os.path.join(os.path.dirname(__file__), 'db')
directory_data = os.path.join(os.path.dirname(__file__), 'frontend/data')
plex_database_destination = os.path.join(directory_database, 'com.plexapp.plugins.library.db')
config_file = os.path.join(os.path.dirname(__file__), 'config.ini')
output_file = os.path.join(os.path.dirname(__file__), 'frontend/data/shows.json')
logging.basicConfig(level=logging.DEBUG)
def plex_create_connection(db_file):
try:
conn = sqlite3.connect(db_file)
return conn
except Error as e:
print(e)
return None
def plex_select_all_tv(conn):
cur = conn.cursor()
query = '''SELECT show.title as Show, season.[index] as Season, episode.[index] as Episode, episode.title as Title, librairie.name, media.duration/60000 AS duration, media.width, media.height, printf("%.4f",media.frames_per_second) AS framerate, media.bitrate/1024 AS birate, media.video_codec, media.size/1048576 AS SIZE, show.year FROM metadata_items episode JOIN metadata_items season ON season.id = episode.parent_id JOIN metadata_items show ON show.id = season.parent_id JOIN media_items media ON media.metadata_item_id = episode.id JOIN library_sections librairie ON librairie.id = episode.library_section_id WHERE episode.metadata_type = 4 ORDER BY show.title, season.[index], episode.[index]'''
cur.execute(query)
rows = cur.fetchall()
shows = set()
for row in rows:
shows.add(row[0])
items = []
for show in shows:
tv_show = {}
if '(' in show:
if ')' in show:
try:
regex = r"(.*?) \((.*)\)"
title_only = re.search(regex, show).group(1)
tv_show['mod_name'] = title_only
except:
pass
tv_show['name'] = show
tv_show['items'] = []
items.append(
tv_show
)
for row in rows:
for item in items:
if row[0] == item['name']:
item['items'].append(row)
for show in items:
show['set_seasons'] = set()
for episode in show['items']:
show['set_seasons'].add(episode[1])
for show in items:
show['seasons'] = []
for item in show['set_seasons']:
season = {}
season['number'] = item
show['seasons'].append(season)
for show in items:
for season in show['seasons']:
season['episodes'] = []
for episode in show['items']:
if season['number'] == episode[1]:
episode_tmp = {}
episode_tmp['episode'] = episode[2]
episode_tmp['name'] = episode[3]
show['year'] = episode[12]
season['episodes'].append(episode_tmp)
del show['items']
del show['set_seasons']
return items
def get_translated_title(query, item, client_id, language):
for show_key, show_value in item.keys:
if show_key == 'slug':
headers = {
'Content-Type': 'application/json',
'trakt-api-version': '2',
'trakt-api-key': client_id
}
request = Request('https://api.trakt.tv/shows/' + show_value + '/translations/' + language, headers=headers)
response_body = urlopen(request).read()
j = json.loads(response_body.decode("utf-8"))
if len(j) > 0:
if j[0]['title'] in query:
return item
def find_trakt_show(query, media=None, year=None, client_id=None, language='en'):
items = Trakt['search'].query(query, media, year, extended='full')
tv_show = {}
try:
if language != 'en':
if items[0].score < 1000:
for item in items:
match = get_translated_title(query, item, client_id, language)
if match != None:
items[0] = match
for show_key, show_value in items[0].keys:
if show_key == 'trakt':
tv_show['plex_title'] = query
tv_show['title'] = items[0].title
tv_show['href'] = items[0].title.replace(' ', '').replace("'", '').replace(":", '').replace("-",
'').replace(".",
'').replace(
":", '')
tv_show['year'] = items[0].year
tv_show['status'] = items[0].status
tv_show['seasons'] = []
seasons = Trakt['shows'].seasons(show_value, extended='full')
if "Season S00" in str(seasons):
count_seasons = len(seasons) - 1
else:
count_seasons = len(seasons)
season = 1
while season <= count_seasons:
show_season = {}
details_season = Trakt['shows'].season(show_value, season, extended='full')
show_season['number'] = season
count_episodes = len(details_season)
show_season['episodes'] = []
episode = 0
while episode < count_episodes:
season_episode = {}
season_episode['number'] = details_season[episode].keys[0][1]
season_episode['title'] = details_season[episode].title
if details_season[episode].first_aired != None:
timestamp = datetime.strptime(str(details_season[episode].first_aired).split(' ')[0],
'%Y-%m-%d')
details_season[episode].first_aired = timestamp.strftime('%B %d, %Y')
season_episode['aired_timestamp'] = timestamp.timestamp()
if timestamp.timestamp() > datetime.now().timestamp():
season_episode['aired'] = False
else:
season_episode['aired'] = True
else:
season_episode['aired'] = False
season_episode['first_aired'] = details_season[episode].first_aired
season_episode['owning'] = False
show_season['episodes'].append(season_episode)
episode += 1
tv_show['seasons'].append(show_season)
season += 1
return tv_show
except Exception as e:
tv_show['plex_title'] = query + ' (Error - Title not found)'
tv_show['title'] = query + ' (Error - Title not found)'
tv_show['year'] = year
tv_show['seasons'] = []
tv_show['href'] = query + ' (Error - Title not found)'
return tv_show
def compare_plex_trakt(plex, trakt):
for show_plex in plex:
for show_trakt in trakt:
try:
if show_plex['name'] == show_trakt['plex_title']:
for season_plex in show_plex['seasons']:
for season_trakt in show_trakt['seasons']:
if season_plex['number'] == season_trakt['number']:
for episode_plex in season_plex['episodes']:
for episode_trakt in season_trakt['episodes']:
if episode_plex['episode'] == episode_trakt['number']:
episode_trakt['owning'] = True
episode_trakt['plex_title'] = episode_plex['name']
except:
pass
return trakt
def calculate_owning(data):
for show in data:
show_owning_true = 0
show_owning_false = 0
try:
for season in show['seasons']:
season_owning_true = 0
season_owning_false = 0
try:
for episode in season['episodes']:
if episode['aired'] == True:
if episode['owning'] == True:
season_owning_true += 1
show_owning_true += 1
else:
season_owning_false += 1
show_owning_false += 1
if (season_owning_true + season_owning_false) == 0:
season['aired'] = False
else:
season['aired'] = True
season['owning_percent'] = 0
try:
season['owning_percent'] = (100 / (season_owning_true + season_owning_false)) * season_owning_true
except:
pass
season['owning_episodes_true'] = season_owning_true
season['owning_episodes_false'] = season_owning_false
except:
pass
except:
pass
show['owning_percent'] = 0
try:
show['owning_percent'] = (100 / (show_owning_true + show_owning_false)) * show_owning_true
except:
pass
show['owning_episodes_true'] = show_owning_true
show['owning_episodes_false'] = show_owning_false
return data
def prepare_file_system(plex_database_origin):
if not os.path.exists(directory_database):
os.makedirs(directory_database)
if not os.path.exists(directory_data):
os.makedirs(directory_data)
try:
shutil.copy2(plex_database_origin, plex_database_destination)
except:
print('Copy of plex database failed! Exit.')
exit(1)
if __name__ == '__main__':
# Read config
config = configparser.ConfigParser()
config.read(config_file)
# Prepare
prepare_file_system(config['Plex']['database_location'])
# Configure
Trakt.configuration.defaults.client(
id=config['trakt.tv']['client_id'],
secret=config['trakt.tv']['client_secret']
)
# create a database connection
conn = plex_create_connection(plex_database_destination)
plex_shows = plex_select_all_tv(conn)
trakt_shows = []
for show in plex_shows:
try:
trakt_show = find_trakt_show(show['mod_name'], 'show', show['year'], config['trakt.tv']['client_id'],
config['Plex']['library_language'])
trakt_show['plex_title'] = show['name']
trakt_shows.append(trakt_show)
except:
trakt_show = find_trakt_show(show['name'], 'show', show['year'], config['trakt.tv']['client_id'],
config['Plex']['library_language'])
trakt_shows.append(trakt_show)
data = compare_plex_trakt(plex_shows, trakt_shows)
result = calculate_owning(data)
data_file = open(output_file, "w")
data_file.write(simplejson.dumps(result, indent=4, sort_keys=True, default=str))
data_file.close()