-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
channel_to_playlist.py
85 lines (66 loc) · 2.81 KB
/
channel_to_playlist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""
Copy all items from a dizqueTV channel to a Plex playlist.
Resets an existing playlist, or creates a new one.
Playlist will be named the same as the dizqueTV channel.
"""
from typing import List, Union
import argparse
from plexapi import server, media, library, playlist, myplex
from dizqueTV import API
# COMPLETE THESE SETTINGS
DIZQUETV_URL = "http://localhost:8000"
# Plex playlist will have the same name as the dizqueTV channel
PLEX_URL = "http://localhost:32400"
PLEX_TOKEN = "thisisaplextoken"
parser = argparse.ArgumentParser()
parser.add_argument('channel_number', type=int, help="dizqueTV channel to add playlist to.")
args = parser.parse_args()
class Plex:
def __init__(self, url, token):
self.url = url
self.token = token
self.server = server.PlexServer(url, token)
def get_users(self) -> List[myplex.MyPlexUser]:
return self.server.myPlexAccount().users()
def user_has_server_access(self, user) -> bool:
for s in user.servers:
if s.name == self.server.friendlyName:
return True
return False
def get_playlists(self) -> List[playlist.Playlist]:
return self.server.playlists()
def get_playlist(self, playlist_name: str) -> Union[playlist.Playlist, None]:
for playlist in self.get_playlists():
if playlist.title == playlist_name:
return playlist
return None
def create_new_playlist(self, playlist_name, items: List[media.Media]):
self.server.createPlaylist(title=playlist_name, items=items)
def reset_playlist(self, playlist_name, items: List[media.Media]):
playlist = self.get_playlist(playlist_name=playlist_name)
if playlist:
playlist.delete()
self.create_new_playlist(playlist_name=playlist_name, items=items)
def get_library_sections(self) -> List[library.LibrarySection]:
return self.server.library.sections()
def get_all_section_items(self, section) -> List[media.Media]:
return section.all()
def get_plex_item(self, item, section_name=None) -> Union[media.Media, None]:
if section_name:
results = self.server.library.section(section_name).search(title=item.title)
else:
results = self.server.library.search(title=item.title)
for media in results:
if media.ratingKey and int(media.ratingKey) == int(item.ratingKey):
return media
return None
dtv = API(url=DIZQUETV_URL)
plex = Plex(url=PLEX_URL, token=PLEX_TOKEN)
channel = dtv.get_channel(channel_number=args.channel_number)
to_add = []
for program in channel.programs:
plex_item = plex.get_plex_item(item=program)
if plex_item:
print(f"Adding {plex_item.title}...")
to_add.append(plex_item)
plex.reset_playlist(playlist_name=channel.name, items=to_add)