From 41e6c5bf5b0e984ffc05af8968e06957bd36d9e9 Mon Sep 17 00:00:00 2001 From: Andreas Sauerwein-Schlosser Date: Mon, 22 Apr 2024 23:56:51 +0200 Subject: [PATCH] compare tokens on startup --- download_v2.py | 131 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 98 insertions(+), 33 deletions(-) diff --git a/download_v2.py b/download_v2.py index f425a0f..8ad7512 100755 --- a/download_v2.py +++ b/download_v2.py @@ -30,6 +30,7 @@ if Path("/data/options.json").exists(): # running as HA Addon, parse all options from /data/options.json + HOMEASSISTANT = True options = json.load(Path("/data/options.json").open()) REFRESH_TOKEN_PATH = Path("/data/refresh_token.txt") REFRESH_TOKEN = options[ @@ -51,6 +52,7 @@ else: ENABLE_EMAIL_EXPORT = False else: + HOMEASSISTANT = True # get everything from environment variables REFRESH_TOKEN_PATH = Path( os.environ.get("REFRESH_TOKEN", "/opt/tesla-invoices/secrets/refresh_token.txt") @@ -73,16 +75,17 @@ EMAIL_USER = os.environ.get("EMAIL_USER", "") EMAIL_PASS = os.environ.get("EMAIL_PASS", "") -if not REFRESH_TOKEN: - logger.error("Refresh Token not set") - exit(1) +# if not REFRESH_TOKEN: +# logger.error("Refresh Token not set") +# exit(1) -if not ACCESS_TOKEN: - logger.error("Access Token not set") - exit(1) +# if not ACCESS_TOKEN: +# logger.error("Access Token not set") +# exit(1) sess = requests.session() + def main(): pass @@ -113,46 +116,108 @@ def jwt_decode(token: str): def compare_token(): + # NOTE to myself + # it's a little bit difficult to keep track of the tokens in HA + # USER sets it access and refresh token in the GUI + # this token gets written to /data/options.json + # there are some difficulties + # 1. token set by user can be empty ( somehow.. ) + # 2. user set token can be older than already saved token + # 3. saved token can be empty + # 4. saved token can invalid/impossible to parse + compare_access_token() + compare_refresh_token() + + +def compare_access_token(): global ACCESS_TOKEN - global REFRESH_TOKEN - # the tokens in options might be expired - # that's why we compare the options token with the token in the files - # files will be updated only if the options tokens are newer - if REFRESH_TOKEN_PATH.exists(): - file_refresh_token = REFRESH_TOKEN_PATH.read_text().strip() - file_refresh_token_json = jwt_decode(file_refresh_token) - option_refresh_token_json = jwt_decode(REFRESH_TOKEN) - if option_refresh_token_json.get("iat", 0) > file_refresh_token_json.get( + file_access_token_json = "" + options_access_token_json = "" + if ACCESS_TOKEN_PATH.exists(): + file_access_token = ACCESS_TOKEN_PATH.read_text().strip() + try: + # see if token is valid + file_access_token_json = jwt_decode(file_access_token) + except Exception as e: + logging.warning( + f"Could not Parse Access Token from file {ACCESS_TOKEN_PATH}, {e}" + ) + + # try to get access token from options + try: + options_access_token_json = jwt_decode(ACCESS_TOKEN) + except Exception as e: + logging.warn(f"Could not Parse Access Token from Options, {e}") + + if not file_access_token_json and not options_access_token_json: + logging.error( + "Could not find any valid access token from file or options, exiting" + ) + exit(1) + elif file_access_token_json and options_access_token_json: + # compare both tokens + if file_access_token_json.get("iat", 0) > options_access_token_json.get( "iat", 0 ): - # options is newer, write to file - REFRESH_TOKEN_PATH.write_text(REFRESH_TOKEN) + ACCESS_TOKEN = file_access_token else: - # options is older, use token from file - REFRESH_TOKEN = REFRESH_TOKEN_PATH.read_text().strip() + # nothing to do, ACCESS_TOKEN already set to options + pass + elif file_access_token_json and not options_access_token_json: + ACCESS_TOKEN = file_access_token + elif not file_access_token_json and options_access_token_json: + # nothing to do, ACCESS_TOKEN already set to options + pass else: - # no need to compare, create file - REFRESH_TOKEN_PATH.write_text(REFRESH_TOKEN) + logging.warn("Unhandled Case when comparing access tokens") - if ACCESS_TOKEN_PATH.exists(): - file_access_token = ACCESS_TOKEN_PATH.read_text().strip() - file_access_token_json = jwt_decode(file_access_token) - option_access_token_json = jwt_decode(ACCESS_TOKEN) - if option_access_token_json.get("iat", 0) > file_access_token_json.get( + +def compare_refresh_token(): + global REFRESH_TOKEN + file_refresh_token_json = "" + options_refresh_token_json = "" + if REFRESH_TOKEN_PATH.exists(): + file_refresh_token = REFRESH_TOKEN_PATH.read_text().strip() + try: + # see if token is valid + file_refresh_token_json = jwt_decode(file_refresh_token) + except Exception as e: + logging.warning( + f"Could not Parse Refresh Token from file {REFRESH_TOKEN_PATH}, {e}" + ) + + # try to get access token from options + try: + options_refresh_token_json = jwt_decode(REFRESH_TOKEN) + except Exception as e: + logging.warning(f"Could not Parse Refresh Token from Options, {e}") + + if not file_refresh_token_json and not options_refresh_token_json: + logging.error( + "Could not find any valid refresh token from file or options, exiting" + ) + exit(1) + elif file_refresh_token_json and options_refresh_token_json: + # compare both tokens + if file_refresh_token_json.get("iat", 0) > options_refresh_token_json.get( "iat", 0 ): - # options is newer, write to file - ACCESS_TOKEN_PATH.write_text(ACCESS_TOKEN) + REFRESH_TOKEN = file_refresh_token else: - # options is older, use token from file - ACCESS_TOKEN = ACCESS_TOKEN_PATH.read_text().strip() + # nothing to do, REFRESH_TOKEN already set to options + pass + elif file_refresh_token_json and not options_refresh_token_json: + REFRESH_TOKEN = file_refresh_token + elif not file_refresh_token_json and options_refresh_token_json: + # nothing to do, REFRESH_TOKEN already set to options + pass else: - # no need to compare, create file - ACCESS_TOKEN_PATH.write_text(ACCESS_TOKEN) + logging.warning("Unhandled Case when comparing refresh tokens") def refresh_token(): - compare_token() + if HOMEASSISTANT: + compare_token() # check if current token expires in less than 2 hours jwt_json = jwt_decode(ACCESS_TOKEN) if jwt_json["exp"] - time.time() < 7200: