-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstore_data.py
213 lines (186 loc) · 7.3 KB
/
store_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import re
import urllib
import psycopg2
import smtplib
import sqlite3
from collections import deque
from urllib.parse import urlsplit
from unicodedata import normalize
import string
from sqlite3 import Error
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import chardet
import requests
import requests.exceptions
import traceback
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from config import DBHOST, DBNAME, DBPASS, DBUSER, PATH_TO_CHROMEDRIVER
# Initialise Client
browser = webdriver.Chrome(PATH_TO_CHROMEDRIVER) # C:\Program Files (x86)\Google\Chrome\Application\chromedriver.exe
# chrome_options = Options()
# chrome_options.add_argument('--no-sandbox')
# chrome_options.add_argument('--disable-dev-shm-usage')
# chrome_options.add_argument('--headless')
# browser = webdriver.Chrome('/usr/bin/chromedriver',chrome_options=chrome_options)
results = {}
search_category_list = ['Accommodation', 'hairdresser', 'engineering', 'restaurants']
search_location_list = ['Mackay QLD', 'Greater Brisbane, QLD', 'Townsville, QLD', 'Tasmania']
# Create a database connection to a SQLite database
def get_connection():
try:
conn = psycopg2.connect(dbname=DBNAME, dbuser=DBUSER, dbhost=DBHOST, dbpass=DBPASS)
except psycopg2.Error as e:
print(e)
return conn.cursor()
def get_contact_info(contact_class, div, attribute='href'):
try:
return div.find_element_by_class_name(contact_class).get_attribute(attribute)
except:
return None
def get_business_name(contact_class, div):
try:
return div.find_element_by_class_name(contact_class).text
except:
return None
def get_search(contact_class, div, attribute='value'):
try:
return div.find_element_by_class_name(contact_class).get_attribute(attribute)
except NoSuchElementException:
return None
def next_page():
element = browser.find_element_by_css_selector('div.button-pagination-container a.navigation:last-of-type').get_attribute('href')
browser.get(element)
def generate_url_name(business_name):
normalised_text = normalize('NFD', business_name)
url = normalised_text.lower()
url_link = ""
for character in url:
if character.islower():
url_link += character
elif character == " ":
url_link += "_"
elif character == "&":
url_link += "and"
else:
continue
return url_link
def is_ascii(variable):
try:
variable.encode('ascii')
except UnicodeEncodeError:
return False
else:
return True
def check_button_exists(contact_class, attribute='href'):
try:
browser.find_element_by_css_selector(contact_class).get_attribute(attribute)
except NoSuchElementException:
return False
return True
def search(category, location):
cat = urllib.parse.quote(category)
loc = urllib.parse.quote(location)
browser.get(f'https://www.yellowpages.com.au/search/listings?clue={cat}&locationClue={loc}')
def store_dict(cursor):
create_table(cursor)
for business_name in results:
website = results[business_name]['website']
email = results[business_name]['email']
location = results[business_name]['location']
phone = results[business_name]['phone']
type = results[business_name]['type']
logo = results[business_name]['logo']
url = generate_url_name(business_name)
if not duplicate_check(cursor, business_name, email):
cursor.execute("INSERT INTO listings(business_name, website, email, phone, location, type, logo, url) VALUES(%s, %s, %s, %s, %s, %s, %s, %s)", (business_name, website, email, phone, location, type, logo, url))
conn.commit()
conn.close()
def create_table(cursor):
cursor.execute("""CREATE TABLE IF NOT EXISTS listings (
business_name text,
website text,
email text,
phone text,
location text,
type text,
logo text,
url text,
sent int DEFAULT 0
); """)
blacklist(cursor, ["Roshni Indian Restaurant", "Mackay Entertainment & Convention Centre"], ["[email protected]", "[email protected]"]) # BLACKLIST {name}, {email}
def duplicate_check(cursor, name, email):
name = cursor.execute("SELECT business_name FROM listings WHERE business_name=%s;", (name,))
email = cursor.execute("SELECT business_name FROM listings WHERE email=%s;", (email,))
if name or email:
return True
return False
def blacklist(cursor, names: list, emails: list):
for name, email in zip(names, emails):
name_exists = cursor.execute("SELECT business_name FROM listings WHERE business_name=%s;", (name,))
email_exists = cursor.execute("SELECT business_name FROM listings WHERE email=%s;", (email,))
if name_exists:
name_exists.fetchone()
if email_exists:
email_exists.fetchone()
if name_exists or email_exists:
if name_exists == email_exists:
cursor.execute("UPDATE listings SET sent = 1 WHERE business_name = %s", (name,))
else:
cursor.execute("UPDATE listings SET sent = 1 WHERE business_name = %s OR email = %s", (name, email))
else:
cursor.execute(
"INSERT INTO listings(business_name, email, sent) VALUES(%s, %s, %s)", (name, email, "1"))
conn.commit()
def scrape_client:
for location in search_location_list:
for category in search_category_list:
search(category, location)
# loop until there are no more pages
while True:
listing_divs = browser.find_elements_by_class_name('search-contact-card-table-div')
# sort through contact divs - fetch data
for div in listing_divs:
business_info = {}
logo = get_contact_info('listing-logo', div, attribute='src')
if logo:
business_info['logo'] = logo
else:
business_info['logo'] = ""
phone = get_business_name('contact-text', div)
business_info['phone'] = phone
business_info['location'] = location
business_info['type'] = category
# Get website
website = get_contact_info('contact-url', div, attribute='href')
if website:
business_info['website'] = website
else:
business_info['website'] = ""
# Get email
email = get_contact_info('contact-email', div, attribute='data-email')
if email:
business_info['email'] = email
else:
continue
# Get business name
name = get_business_name('listing-name', div)
if name:
if is_ascii(name):
results[name] = business_info
# if the next button exists, move to the next page.
if check_button_exists('div.button-pagination-container a.navigation:last-of-type', attribute='href'):
# next_butt
# on_exists = True
next_page()
else:
break
scrape_client()
store_dict(get_connection())