From 237db607460406af315ea333f39867118991a4bb Mon Sep 17 00:00:00 2001 From: naisanzaa Date: Tue, 28 May 2024 00:07:59 +0800 Subject: [PATCH] selenium: add option to autosave cookies. update logging. update type hinting. fix tests. --- .../integrations/seleniumWrapper/browser.py | 108 +++++++++++++----- .../integrations/seleniumWrapper/config.py | 1 + .../seleniumWrapper/tests/test_browser.py | 4 +- .../tests/test_browser_cookies_autosave.py | 27 +++++ .../seleniumWrapper/tests/test_user_agent.py | 1 - .../seleniumWrapper/webdriver_chrome.py | 15 +-- env-example.sh | 1 + 7 files changed, 117 insertions(+), 40 deletions(-) create mode 100644 automon/integrations/seleniumWrapper/tests/test_browser_cookies_autosave.py diff --git a/automon/integrations/seleniumWrapper/browser.py b/automon/integrations/seleniumWrapper/browser.py index c63a0e37..e7bd354b 100644 --- a/automon/integrations/seleniumWrapper/browser.py +++ b/automon/integrations/seleniumWrapper/browser.py @@ -55,8 +55,10 @@ def by(self) -> By: def config(self): return self._config - async def cookie_file_to_dict(self, file: str = 'cookies.txt'): - logger.debug(f'{file}') + async def cookie_file_to_dict(self, file: str = 'cookies.txt') -> list: + logger.debug(dict( + cookie_file_to_dict=file + )) with open(file, 'r') as file: return json.loads(file.read()) @@ -108,7 +110,9 @@ def user_agent(self): @property def current_url(self): if self.webdriver: - logger.debug(self._current_url) + logger.debug(dict( + current_url=self._current_url + )) if self._current_url == 'data:,': return '' return self._current_url @@ -124,7 +128,7 @@ def _screenshot_name(self, prefix=None): """Generate a unique filename""" title = self.webdriver.title - url = self._current_url + url = self.current_url hostname = urlparse(url).hostname hostname_ = Sanitation.ascii_numeric_only(hostname) @@ -176,10 +180,18 @@ async def add_cookie(self, cookie_dict: dict) -> bool: result = self.webdriver.add_cookie(cookie_dict=cookie_dict) if result is None: - logger.debug(f'{cookie_dict}') + logger.debug(dict( + domain=cookie_dict.get('domain'), + path=cookie_dict.get('path'), + secure=cookie_dict.get('secure'), + expiry=cookie_dict.get('expiry'), + name=cookie_dict.get('name'), + )) return True - logger.error(f'{cookie_dict}') + logger.error(dict( + add_cookie=cookie_dict + )) return False async def add_cookie_from_file(self, file: str) -> bool: @@ -197,7 +209,9 @@ async def add_cookies_from_list(self, cookies_list: list) -> bool: for cookie in cookies_list: await self.add_cookie(cookie_dict=cookie) - logger.debug(f'{True}') + logger.debug(dict( + add_cookies_from_list=len(cookies_list) + )) return True async def add_cookie_from_current_url(self): @@ -210,7 +224,7 @@ async def add_cookie_from_url(self, url: str) -> bool: if os.path.exists(cookie_file): logger.info(f'{cookie_file}') - return self.add_cookie_from_file(file=cookie_file) + return await self.add_cookie_from_file(file=cookie_file) logger.error(f'{cookie_file}') @@ -225,6 +239,11 @@ async def add_cookie_from_base64(self, base64_str: str) -> bool: logger.error(f'{base64_str}') return False + async def autosave_cookies(self) -> bool: + if self.current_url: + await self.save_cookies_for_current_url() + return await self.load_cookies_for_current_url() + async def delete_all_cookies(self) -> None: result = self.webdriver.delete_all_cookies() logger.info(f'{True}') @@ -234,7 +253,9 @@ async def _url_filename(self, url: str): parsed = await self.urlparse(url) hostname = parsed.hostname cookie_file = f'cookies-{hostname}.txt' - logger.info(f'{cookie_file}') + logger.info(dict( + _url_filename=cookie_file + )) return cookie_file async def get_cookie(self, name: str) -> dict: @@ -243,24 +264,28 @@ async def get_cookie(self, name: str) -> dict: return result async def get_cookies(self) -> [dict]: - result = self.webdriver.get_cookies() - logger.debug(f'{True}') - return result + cookies = self.webdriver.get_cookies() + logger.debug(dict( + get_cookies=len(cookies) + )) + return cookies async def get_cookies_base64(self) -> base64: - result = self.get_cookies() + cookies = await self.get_cookies() logger.debug(f'{True}') return base64.b64encode( - json.dumps(result).encode() + json.dumps(cookies).encode() ).decode() async def get_cookies_json(self) -> json.dumps: - cookies = self.get_cookies() - logger.debug(f'{True}') + cookies = await self.get_cookies() + logger.debug(dict( + get_cookies_json=len(cookies) + )) return json.dumps(cookies) async def get_cookies_summary(self): - result = self.get_cookies() + result = await self.get_cookies() summary = {} if result: for cookie in result: @@ -338,11 +363,15 @@ async def get(self, url: str, **kwargs) -> bool: current_url=self.current_url, kwargs=kwargs ))) + + if self.config.cookies_autosave: + await self.autosave_cookies() + return True except Exception as error: - logger.error(str(dict( + logger.error(dict( error=error, - ))) + )) return False @@ -360,7 +389,7 @@ async def get_page_source_beautifulsoup( features: str = 'lxml') -> BeautifulSoup: """read page source with beautifulsoup""" if not markdup: - markdup = self.get_page_source() + markdup = await self.get_page_source() return BeautifulSoup( markup=markdup, features=features) @@ -408,9 +437,19 @@ async def is_running(self) -> bool: logger.error(f'{False}') return False + async def load_cookies_for_current_url(self) -> bool: + filename = await self._url_filename(url=self.url) + logger.info(dict( + load_cookies_for_current_url=filename, + url=self.url, + )) + return await self.add_cookie_from_file(file=filename) + async def urlparse(self, url: str): parsed = urlparse(url=url) - logger.debug(f'{parsed}') + logger.debug(dict( + urlparse=parsed + )) return parsed async def quit(self) -> bool: @@ -432,26 +471,37 @@ async def quit(self) -> bool: async def run(self): """run browser""" try: - await self.config.run() - except: + return await self.config.run() + except Exception as error: + logger.error(dict( + error=error + )) return False - async def save_cookies_for_current_url(self): - filename = self._url_filename(url=self.url) - logger.info(f'{filename}') + async def save_cookies_for_current_url(self) -> bool: + filename = await self._url_filename(url=self.url) + logger.info(dict( + save_cookies_for_current_url=filename, + url=self.url, + )) return await self.save_cookies_to_file(file=filename) - async def save_cookies_to_file(self, file: str): + async def save_cookies_to_file(self, file: str) -> bool: with open(file, 'w') as cookies: cookies.write( await self.get_cookies_json() ) if os.path.exists(file): - logger.info(f'{os.path.abspath(file)} ({os.stat(file).st_size} B)') + logger.info(dict( + save_cookies_to_file=os.path.abspath(file), + bytes=os.stat(file).st_size + )) return True - logger.error(f'{file}') + logger.error(dict( + file=file + )) return False async def save_screenshot( diff --git a/automon/integrations/seleniumWrapper/config.py b/automon/integrations/seleniumWrapper/config.py index 1a6b2f3b..5d06d8f7 100644 --- a/automon/integrations/seleniumWrapper/config.py +++ b/automon/integrations/seleniumWrapper/config.py @@ -12,6 +12,7 @@ def __init__(self): self._webdriver = None self.webdriver_wrapper = None + self.cookies_autosave: bool = environ('SELENIUM_COOKIES_AUTOSAVE', False) self._cookies_base64 = environ('SELENIUM_COOKIES_BASE64') self._cookies_file = environ('SELENIUM_COOKIES_FILE') diff --git a/automon/integrations/seleniumWrapper/tests/test_browser.py b/automon/integrations/seleniumWrapper/tests/test_browser.py index bada3ca0..21d6037a 100644 --- a/automon/integrations/seleniumWrapper/tests/test_browser.py +++ b/automon/integrations/seleniumWrapper/tests/test_browser.py @@ -12,7 +12,7 @@ class SeleniumClientTest(unittest.TestCase): if asyncio.run(browser.run()): def test_fake_page(self): - self.assertFalse(browser.get('http://555.555.555.555')) + self.assertFalse(asyncio.run(browser.get('http://555.555.555.555'))) def test_real_page(self): if asyncio.run(browser.get('http://1.1.1.1')): @@ -31,6 +31,8 @@ def test_screenshot_file(self): self.assertTrue(asyncio.run(browser.save_screenshot())) self.assertTrue(asyncio.run(browser.save_screenshot(folder='./'))) + asyncio.run(browser.quit()) + if __name__ == '__main__': unittest.main() diff --git a/automon/integrations/seleniumWrapper/tests/test_browser_cookies_autosave.py b/automon/integrations/seleniumWrapper/tests/test_browser_cookies_autosave.py new file mode 100644 index 00000000..9fec76cf --- /dev/null +++ b/automon/integrations/seleniumWrapper/tests/test_browser_cookies_autosave.py @@ -0,0 +1,27 @@ +import unittest +import asyncio + +from automon.integrations.seleniumWrapper import SeleniumBrowser, ChromeWrapper + + +class Test(unittest.TestCase): + browser = SeleniumBrowser() + browser.config.webdriver_wrapper = ChromeWrapper() + browser.config.webdriver_wrapper.enable_defaults().enable_headless() + + # if asyncio.run(browser.run()): + asyncio.run(browser.run()) + + def test_autosave(self): + if asyncio.run(self.browser.run()): + + asyncio.run(self.browser.set_window_size(device_type='web-large')) + + if asyncio.run(self.browser.get('http://bing.com')): + self.assertTrue(asyncio.run(self.browser.autosave_cookies())) + + asyncio.run(self.browser.quit()) + + +if __name__ == '__main__': + unittest.main() diff --git a/automon/integrations/seleniumWrapper/tests/test_user_agent.py b/automon/integrations/seleniumWrapper/tests/test_user_agent.py index 911ac658..f99342c0 100644 --- a/automon/integrations/seleniumWrapper/tests/test_user_agent.py +++ b/automon/integrations/seleniumWrapper/tests/test_user_agent.py @@ -12,7 +12,6 @@ def test_filter(self): self.assertFalse(test.filter_agent('xxxxx')) self.assertFalse(test.filter_agent('xxxxx', case_sensitive=True)) - def test_random(self): test = SeleniumUserAgentBuilder() self.assertTrue(test.get_random_agent('applewebkit')) diff --git a/automon/integrations/seleniumWrapper/webdriver_chrome.py b/automon/integrations/seleniumWrapper/webdriver_chrome.py index c94320c5..359de424 100644 --- a/automon/integrations/seleniumWrapper/webdriver_chrome.py +++ b/automon/integrations/seleniumWrapper/webdriver_chrome.py @@ -24,9 +24,6 @@ def __init__(self): self.update_paths(self.chromedriver_path) - if not self.chromedriver_path: - logger.error('missing SELENIUM_CHROMEDRIVER_PATH') - def __repr__(self): if self._webdriver: return str(dict( @@ -59,6 +56,8 @@ def chromedriver_path(self): if os.path.exists(path): return path + logger.error('missing SELENIUM_CHROMEDRIVER_PATH') + @property def chromedriverVersion(self): if self.webdriver: @@ -304,7 +303,7 @@ def in_sandbox_disabled(self): self.disable_sandbox() return self - async def run(self) -> selenium.webdriver.Chrome: + async def run(self) -> bool: try: if self.chromedriver_path: self._ChromeService = selenium.webdriver.ChromeService( @@ -320,12 +319,12 @@ async def run(self) -> selenium.webdriver.Chrome: ) logger.info(f'{self}') - return self.webdriver + return True self._webdriver = selenium.webdriver.Chrome(options=self.chrome_options) logger.info(f'{self}') - return self.webdriver + return True except Exception as error: logger.error(f'{error}') raise Exception(error) @@ -400,9 +399,7 @@ def update_paths(self, path: str): return True - logger.error(dict( - chromedriver_path=path - )) + return False async def quit(self): """quit diff --git a/env-example.sh b/env-example.sh index 0b70ebf2..0be017cb 100644 --- a/env-example.sh +++ b/env-example.sh @@ -101,6 +101,7 @@ TWINE_PASSWORD= # Selenium SELENIUM_CHROMEDRIVER_PATH= SELENIUM_OPT= +SELENIUM_COOKIES_AUTOSAVE=False SELENIUM_COOKIES_BASE64= SELENIUM_COOKIES_FILE=