-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.py
103 lines (84 loc) · 3.88 KB
/
index.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import os
from datetime import datetime, timedelta, timezone

from dateutil import parser
from dateutil.relativedelta import *
from googleapiclient.discovery import build
# The YouTube Data API key is read from the YOUTUBE_API_KEY environment
# variable so that the credential is never committed to source control.
# It is an empty string when the variable is not set.
# NOTE: any key that was previously hard-coded here should be treated as
# leaked and rotated.
api_key = os.environ.get("YOUTUBE_API_KEY", "")
class Service():
    '''
    Small wrapper around a Google API client created with
    googleapiclient.discovery.build.

    The client is stored on self.service: it is None until create_service()
    is called and is reset to None by close_service().
    '''

    def __init__(self):
        # No client yet; create_service() fills this in.
        self.service = None

    def create_service(self, api_name: str, api_version: str, api_key: str):
        '''
        Takes in API_Name, version of the API, and API_Key to create a connection to the API.
        The connection is persistent, and needs to be closed (see close_service) if not in use.
        The created client is stored on self.service; nothing is returned.
        '''
        self.service = build(api_name, api_version, developerKey=api_key)

    def make_request(self, service, resource_type, method, part: str, **kwargs):
        '''
        Builds and executes a single API request, returning the response of request.execute().

        service is the API client to send the request through. When it is None,
        the client stored on self.service is used instead.
        resource_type is the name of a data entity with unique identifiers, such as channels and videos,
        which is used to interact with the API.
        https://developers.google.com/youtube/v3/getting-started#resources has list of resource types.
        method is the name of a method supported by the API; for youtube these are list, insert, update, and delete.
        part is a required parameter for API requests that retrieve or return a resource.
        https://developers.google.com/youtube/v3/getting-started#part
        **kwargs are any other parameters needed to make the request to the API.

        Raises RuntimeError when neither service nor self.service is available.
        '''
        # Use the client the caller handed in rather than a module-level
        # global, so the class works outside of this particular script.
        client = service if service is not None else self.service
        if client is None:
            raise RuntimeError(
                "No API service available; call create_service() first."
            )

        # Resource collections (e.g. "channels") are exposed as callables on
        # the client, and API methods (e.g. "list") as callables on the resource.
        resource = getattr(client, resource_type)()
        api_method = getattr(resource, method)
        request = api_method(part=part, **kwargs)
        return request.execute()

    def close_service(self):
        '''
        Closes the service, if one was created. Safe to call more than once.
        '''
        if self.service is not None:
            self.service.close()
            # Drop the reference so a closed client is not reused by accident.
            self.service = None
# Channel whose recent uploads are inspected, and how far back to look.
CHANNEL_USERNAME = "GoogleDevelopers"
LOOKBACK_MONTHS = 6

# Create a service, making a persistent connection with the YouTube API.
youtube_service = Service()
youtube_service.create_service(
    api_name="youtube",
    api_version="v3",
    api_key=api_key,
)
youtube_build = youtube_service.service

try:
    # Make a request about a specific channel in order to obtain the id of
    # its "uploads" playlist.
    channel_info = youtube_service.make_request(
        service=youtube_build,
        resource_type="channels",
        part="contentDetails",
        method="list",
        forUsername=CHANNEL_USERNAME,
    )
    # Fail with a clear message instead of a bare KeyError/IndexError when
    # the response carries no channel entries.
    channel_items = channel_info.get("items") or []
    if not channel_items:
        raise LookupError(f"No channel found for username {CHANNEL_USERNAME!r}.")
    uploadsId = channel_items[0]["contentDetails"]["relatedPlaylists"]["uploads"]

    # Obtain information about the uploaded videos on the channel, up to a
    # maximum of 50 results, using the uploads playlist id obtained above.
    # TODO: only the first page is read; follow further pages of
    # channel_upload_info to cover channels with more uploads.
    channel_upload_info = youtube_service.make_request(
        service=youtube_build,
        resource_type="playlistItems",
        method="list",
        part="snippet",
        playlistId=uploadsId,
        maxResults=50,
    )

    # Maps publishedAt timestamp string -> video id for every upload
    # published within the last LOOKBACK_MONTHS months.
    publishedAt_info = dict()
    # Compare timezone-aware datetimes: stripping the timezone from the
    # published timestamp and comparing it with local naive time would skew
    # the cutoff by the local UTC offset.
    date = datetime.now(timezone.utc)
    cutoff = date - relativedelta(months=+LOOKBACK_MONTHS)
    for item in channel_upload_info.get("items", []):
        snippet = item.get("snippet")
        if snippet is None:
            continue
        published = parser.parse(snippet["publishedAt"])
        if published.tzinfo is None:
            # NOTE(review): timestamps without an offset are assumed to be
            # UTC - confirm against the API's timestamp format.
            published = published.replace(tzinfo=timezone.utc)
        if published >= cutoff:
            publishedAt_info[snippet["publishedAt"]] = snippet["resourceId"]["videoId"]

    # Make a request for each selected video and store its view count in the
    # dict next to the video id: publishedAt -> [video id, view count].
    # The view count is None when the response has no statistics for the video.
    # Iterate over a snapshot because the dict's values are replaced in the loop.
    for published_at, video_id in list(publishedAt_info.items()):
        video_info = youtube_service.make_request(
            service=youtube_build,
            resource_type="videos",
            method="list",
            part="statistics",
            id=video_id,
        )
        # Reset per video so a missing result never reuses the previous
        # video's count.
        viewCount = None
        for item in video_info.get("items", []):
            statistics = item.get("statistics")
            if statistics is not None:
                viewCount = statistics.get("viewCount")
        print(viewCount)
        publishedAt_info[published_at] = [video_id, viewCount]
finally:
    # The connection is persistent, so release it even if a request failed.
    youtube_service.close_service()