-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommon.py
106 lines (95 loc) · 3.79 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from sqlite3 import connect
from html.parser import HTMLParser
from validators import url as validators_url
from feedparser import parse as feedparser
from tabulate import tabulate
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
from sys import version_info
from categories import chooseCategory
from config import configValues
from env_secrets import secretValues
from bs4 import BeautifulSoup
def directlyError():
    """Abort because this module was started as a script.

    Raises:
        SystemError: always.
    """
    message = 'You must not open this file directly!'
    raise SystemError(message)


# This module is only meant to be imported; refuse to run as a script.
if __name__ == "__main__":
    directlyError()
class HTMLFilter(HTMLParser):
    """HTML parser that accumulates the character data of the markup fed to it.

    Only ``handle_data`` is overridden, so each chunk of text reported by
    the parser is appended to ``text`` and nothing else is recorded.
    """

    # Class-level default; the first append rebinds ``text`` on the instance.
    text = ""

    def handle_data(self, data):
        """Append one chunk of character data to the collected text."""
        self.text = self.text + data
def unexpectedMatch():
    """Signal that a choice was made which the program does not know.

    Raises:
        AttributeError: always.
    """
    message = 'An unknown choice was made, please report this to the author.'
    raise AttributeError(message)
def executeQuery(DbQuery, params=None):
    """Run one SQL statement against the configured database and return its rows.

    Args:
        DbQuery: SQL text to execute.
        params: Optional parameters bound to the placeholders in ``DbQuery``.
            When ``None`` the statement is executed without parameters.

    Returns:
        The list produced by ``cursor.fetchall()``.
    """
    # Connect before entering the try block: if connect() itself fails there
    # is nothing to close, and the real error must propagate instead of being
    # masked by an UnboundLocalError raised from the finally clause.
    conn = connect(configValues.db)
    try:
        # The connection context manager commits on success and rolls back on
        # an exception; it does not close the connection, hence the finally.
        with conn:
            cur = conn.cursor()
            if params is None:
                cur.execute(DbQuery)
            else:
                cur.execute(DbQuery, params)
            return cur.fetchall()
    finally:
        conn.close()
def discoverFeed(content):
    """Look for a feed advertised by an HTML document.

    Args:
        content: HTML markup to search.

    Returns:
        The ``href`` of the first ``<link>`` whose type is
        ``application/rss+xml``, otherwise that of the first
        ``application/atom+xml`` link, otherwise ``None``.

    Raises:
        KeyError: if the matching ``<link>`` has no ``href`` attribute.
    """
    document = BeautifulSoup(content, 'html.parser')
    # RSS takes precedence over Atom when a page advertises both.
    for mime_type in ('application/rss+xml', 'application/atom+xml'):
        link_tag = document.find('link', {'type': mime_type})
        if link_tag:
            return link_tag['href']
    return None
def validateURL(unsafeInput, urlWasDiscovered=None):
    """Check that a URL points at a feed that has at least one entry.

    Args:
        unsafeInput: URL text to check; spaces are removed before validation.
        urlWasDiscovered: Leave as ``None`` to allow feed auto-discovery when
            the document has no entries. Any other value disables discovery,
            so an already discovered URL is not followed a second time.

    Returns:
        One of three tuples:

        * ``(url, feed_title, feed_version)`` when the URL is a feed with
          entries.
        * ``('Discovered', feed_url)`` when the document has no entries and
          discovery was attempted. ``feed_url`` is ``None`` if the page
          advertises no feed.
        * ``(False, error_message)`` on any failure.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import urljoin

    unsafeInput = unsafeInput.replace(" ", "")
    if not validators_url(unsafeInput):
        errorMsg = unsafeInput + ' is not an URL'
        return False, errorMsg

    try:
        request = Request(unsafeInput, headers={'User-Agent': secretValues.http_custom_user_agent})
        # The with-block closes the response even when reading fails, and
        # reading inside this try reports a read timeout like any other
        # network failure instead of letting it escape.
        with urlopen(request, timeout=3) as feed_data:
            raw_content = feed_data.read()
    except HTTPError as e:
        errorMsg = 'Opening ' + unsafeInput + ' has failed with the httpd code ' + str(e.code)
        return False, errorMsg
    except URLError:
        errorMsg = unsafeInput + ' could not be resolved or an unknown error occurred'
        return False, errorMsg
    except TimeoutError:
        errorMsg = unsafeInput + ' timed out'
        return False, errorMsg

    # Undecodable bytes are replaced rather than raising UnicodeDecodeError,
    # so a feed served in another encoding does not crash the caller.
    content = raw_content.decode('utf-8', errors='replace')

    try:
        result = feedparser(content)
        if len(result['entries']) > 0:
            return unsafeInput, result.feed.title, result.version
        if urlWasDiscovered is None:
            discoveredURL = discoverFeed(content)
            if discoveredURL:
                # Pages commonly advertise their feed with a relative href;
                # resolve it against the requested URL. An absolute href is
                # returned unchanged by urljoin.
                try:
                    discoveredURL = urljoin(unsafeInput, discoveredURL)
                except ValueError:
                    # Malformed href: hand it back untouched.
                    pass
            # NOTE(review): discoveredURL is None when the page advertises no
            # feed -- confirm that callers handle ('Discovered', None).
            return 'Discovered', discoveredURL
        errorMsg = unsafeInput + ' is not a feed or has no entries'
        return False, errorMsg
    except (KeyError, AttributeError):
        # A missing key surfaces as KeyError via item access and as
        # AttributeError via attribute access (e.g. a feed without a title).
        errorMsg = 'An unexpected error has occured when parsing ' + unsafeInput + ' , please report this issue to the author.'
        return False, errorMsg
def tableListArticles(articlesListUnread):
    """Print the given articles as a table and offer to mark articles as read.

    Args:
        articlesListUnread: Rows to display under the columns Title,
            Summary, Article Date and URL.

    After printing, the user is asked for confirmation. On ``y`` (any case)
    every article whose ``read`` column is ``'false'`` is set to ``'true'``.
    """
    column_names = ["Title","Summary", "Article Date", "URL"]
    rendered_table = tabulate(articlesListUnread, headers=column_names)
    print(rendered_table)

    answer = input('Please confirm that you want to mark all entries as read [y/n] ')
    if answer.lower() != 'y':
        return
    executeQuery("UPDATE articles SET read = 'true' WHERE read ='false';")
def printVersion():
    """Print the application version and the running Python version."""
    # major.minor.micro of the running interpreter.
    python_version = ".".join(str(part) for part in version_info[:3])
    banner = "py-feed-manager v" + configValues.version + "\nPython v" + python_version
    print(banner)
def addFeed(feedURL, feedTitle, feedVersion):
    """Store a new feed after letting the user choose its category.

    Args:
        feedURL: URL of the feed.
        feedTitle: Title of the feed; a feed whose title is already stored
            is not added again.
        feedVersion: Value stored in the ``Type`` column.
    """
    existing_rows = executeQuery("SELECT * FROM feeds WHERE Title = ?", (feedTitle,))
    if existing_rows:
        print(feedTitle + ' already exists.')
        return

    # The first column of the first row returned by chooseCategory() is
    # used as the category id.
    category_rows = chooseCategory()
    new_row = (feedURL, feedTitle, feedVersion, category_rows[0][0])
    executeQuery("INSERT OR IGNORE INTO feeds (URL, Title, Type, Category_ID) VALUES (?,?,?,?);", new_row)
    print(feedTitle + ' added, after the next refresh the articles will be visible')
def addFeedFromOPML(feedURL, feedTitle, feedVersion,feedCategoryID):
    """Store a feed whose category is already known, without prompting.

    Args:
        feedURL: URL of the feed.
        feedTitle: Title of the feed; a feed whose title is already stored
            is not added again.
        feedVersion: Value stored in the ``Type`` column.
        feedCategoryID: Value stored in the ``Category_ID`` column.
    """
    existing_rows = executeQuery("SELECT * FROM feeds WHERE Title = ?", (feedTitle,))
    if existing_rows:
        print(feedTitle + ' already exists.')
        return

    new_row = (feedURL, feedTitle, feedVersion, feedCategoryID)
    executeQuery("INSERT OR IGNORE INTO feeds (URL, Title, Type, Category_ID) VALUES (?,?,?,?);", new_row)