Skip to content

Commit

Permalink
compare tokens on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
asauerwein-nts committed Apr 22, 2024
1 parent 7672dbc commit 41e6c5b
Showing 1 changed file with 98 additions and 33 deletions.
131 changes: 98 additions & 33 deletions download_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

if Path("/data/options.json").exists():
# running as HA Addon, parse all options from /data/options.json
HOMEASSISTANT = True
options = json.load(Path("/data/options.json").open())
REFRESH_TOKEN_PATH = Path("/data/refresh_token.txt")
REFRESH_TOKEN = options[
Expand All @@ -51,6 +52,7 @@
else:
ENABLE_EMAIL_EXPORT = False
else:
HOMEASSISTANT = True
# get everything from environment variables
REFRESH_TOKEN_PATH = Path(
os.environ.get("REFRESH_TOKEN", "/opt/tesla-invoices/secrets/refresh_token.txt")
Expand All @@ -73,16 +75,17 @@
EMAIL_USER = os.environ.get("EMAIL_USER", "")
EMAIL_PASS = os.environ.get("EMAIL_PASS", "")

if not REFRESH_TOKEN:
logger.error("Refresh Token not set")
exit(1)
# if not REFRESH_TOKEN:
# logger.error("Refresh Token not set")
# exit(1)

if not ACCESS_TOKEN:
logger.error("Access Token not set")
exit(1)
# if not ACCESS_TOKEN:
# logger.error("Access Token not set")
# exit(1)

sess = requests.session()


def main():
pass

Expand Down Expand Up @@ -113,46 +116,108 @@ def jwt_decode(token: str):


def compare_token():
# NOTE to myself
# it's a little bit difficult to keep track of the tokens in HA
# USER sets it access and refresh token in the GUI
# this token gets written to /data/options.json
# there are some difficulties
# 1. token set by user can be empty ( somehow.. )
# 2. user set token can be older than already saved token
# 3. saved token can be empty
# 4. saved token can invalid/impossible to parse
compare_access_token()
compare_refresh_token()


def compare_access_token():
global ACCESS_TOKEN
global REFRESH_TOKEN
# the tokens in options might be expired
# that's why we compare the options token with the token in the files
# files will be updated only if the options tokens are newer
if REFRESH_TOKEN_PATH.exists():
file_refresh_token = REFRESH_TOKEN_PATH.read_text().strip()
file_refresh_token_json = jwt_decode(file_refresh_token)
option_refresh_token_json = jwt_decode(REFRESH_TOKEN)
if option_refresh_token_json.get("iat", 0) > file_refresh_token_json.get(
file_access_token_json = ""
options_access_token_json = ""
if ACCESS_TOKEN_PATH.exists():
file_access_token = ACCESS_TOKEN_PATH.read_text().strip()
try:
# see if token is valid
file_access_token_json = jwt_decode(file_access_token)
except Exception as e:
logging.warning(
f"Could not Parse Access Token from file {ACCESS_TOKEN_PATH}, {e}"
)

# try to get access token from options
try:
options_access_token_json = jwt_decode(ACCESS_TOKEN)
except Exception as e:
logging.warn(f"Could not Parse Access Token from Options, {e}")

if not file_access_token_json and not options_access_token_json:
logging.error(
"Could not find any valid access token from file or options, exiting"
)
exit(1)
elif file_access_token_json and options_access_token_json:
# compare both tokens
if file_access_token_json.get("iat", 0) > options_access_token_json.get(
"iat", 0
):
# options is newer, write to file
REFRESH_TOKEN_PATH.write_text(REFRESH_TOKEN)
ACCESS_TOKEN = file_access_token
else:
# options is older, use token from file
REFRESH_TOKEN = REFRESH_TOKEN_PATH.read_text().strip()
# nothing to do, ACCESS_TOKEN already set to options
pass
elif file_access_token_json and not options_access_token_json:
ACCESS_TOKEN = file_access_token
elif not file_access_token_json and options_access_token_json:
# nothing to do, ACCESS_TOKEN already set to options
pass
else:
# no need to compare, create file
REFRESH_TOKEN_PATH.write_text(REFRESH_TOKEN)
logging.warn("Unhandled Case when comparing access tokens")

if ACCESS_TOKEN_PATH.exists():
file_access_token = ACCESS_TOKEN_PATH.read_text().strip()
file_access_token_json = jwt_decode(file_access_token)
option_access_token_json = jwt_decode(ACCESS_TOKEN)
if option_access_token_json.get("iat", 0) > file_access_token_json.get(

def compare_refresh_token():
global REFRESH_TOKEN
file_refresh_token_json = ""
options_refresh_token_json = ""
if REFRESH_TOKEN_PATH.exists():
file_refresh_token = REFRESH_TOKEN_PATH.read_text().strip()
try:
# see if token is valid
file_refresh_token_json = jwt_decode(file_refresh_token)
except Exception as e:
logging.warning(
f"Could not Parse Refresh Token from file {REFRESH_TOKEN_PATH}, {e}"
)

# try to get access token from options
try:
options_refresh_token_json = jwt_decode(REFRESH_TOKEN)
except Exception as e:
logging.warning(f"Could not Parse Refresh Token from Options, {e}")

if not file_refresh_token_json and not options_refresh_token_json:
logging.error(
"Could not find any valid refresh token from file or options, exiting"
)
exit(1)
elif file_refresh_token_json and options_refresh_token_json:
# compare both tokens
if file_refresh_token_json.get("iat", 0) > options_refresh_token_json.get(
"iat", 0
):
# options is newer, write to file
ACCESS_TOKEN_PATH.write_text(ACCESS_TOKEN)
REFRESH_TOKEN = file_refresh_token
else:
# options is older, use token from file
ACCESS_TOKEN = ACCESS_TOKEN_PATH.read_text().strip()
# nothing to do, REFRESH_TOKEN already set to options
pass
elif file_refresh_token_json and not options_refresh_token_json:
REFRESH_TOKEN = file_refresh_token
elif not file_refresh_token_json and options_refresh_token_json:
# nothing to do, REFRESH_TOKEN already set to options
pass
else:
# no need to compare, create file
ACCESS_TOKEN_PATH.write_text(ACCESS_TOKEN)
logging.warning("Unhandled Case when comparing refresh tokens")


def refresh_token():
compare_token()
if HOMEASSISTANT:
compare_token()
# check if current token expires in less than 2 hours
jwt_json = jwt_decode(ACCESS_TOKEN)
if jwt_json["exp"] - time.time() < 7200:
Expand Down

0 comments on commit 41e6c5b

Please sign in to comment.