diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..79a16af --- /dev/null +++ b/.flake8 @@ -0,0 +1,2 @@ +[flake8] +max-line-length = 120 \ No newline at end of file diff --git a/download_v2.py b/download_v2.py index 755398f..f140eb1 100755 --- a/download_v2.py +++ b/download_v2.py @@ -4,7 +4,7 @@ Author: Dominik Steiner (dominik.steiner@nts.eu) Description: This script downloads all Tesla charging invoices for a given month. Usage: python3 dsteiner_tesla_invoices_download.py -Version: 2.0 +Version: 3.0 Date Created: 2024-02-05 Changed: Changed Owner-API Endpoint to /products instead of /vehicles. Python Version: 3.11.7 @@ -23,6 +23,7 @@ import smtplib from email.message import EmailMessage import logging +from time import sleep # setup logger logger = logging.getLogger(__name__) @@ -33,13 +34,9 @@ HOMEASSISTANT = True options = json.load(Path("/data/options.json").open()) REFRESH_TOKEN_PATH = Path("/data/refresh_token.txt") - REFRESH_TOKEN = options[ - "refresh_token" - ] # refresh token from options, might be expired + REFRESH_TOKEN = options["refresh_token"] # refresh token from options, might be expired ACCESS_TOKEN_PATH = Path("/data/access_token.txt") - ACCESS_TOKEN = options[ - "access_token" - ] # access token from options, might be expired + ACCESS_TOKEN = options["access_token"] # access token from options, might be expired INVOICE_PATH = Path("/data/invoices/") if options.get("enable_email_export", False): ENABLE_EMAIL_EXPORT = True @@ -51,23 +48,19 @@ EMAIL_PASS = options["email"]["password"] else: ENABLE_EMAIL_EXPORT = False + ENABLE_SUBSCRIPTION_INVOICE = options.get("enable_subscription_invoice", True) else: - HOMEASSISTANT = True + HOMEASSISTANT = False # get everything from environment variables - REFRESH_TOKEN_PATH = Path( - os.environ.get("REFRESH_TOKEN", "/opt/tesla-invoices/secrets/refresh_token.txt") - ) + REFRESH_TOKEN_PATH = Path(os.environ.get("REFRESH_TOKEN", "/opt/tesla-invoices/secrets/refresh_token.txt")) REFRESH_TOKEN = REFRESH_TOKEN_PATH.read_text().strip() - ACCESS_TOKEN_PATH = Path( - os.environ.get("ACCESS_TOKEN", "/opt/tesla-invoices/secrets/access_token.txt") - ) + ACCESS_TOKEN_PATH = Path(os.environ.get("ACCESS_TOKEN", "/opt/tesla-invoices/secrets/access_token.txt")) ACCESS_TOKEN = ACCESS_TOKEN_PATH.read_text().strip() # path to save invoices INVOICE_PATH = Path(os.environ.get("INVOICE_PATH", "/opt/tesla-invoices/invoices/")) - ENABLE_EMAIL_EXPORT = ( - os.environ.get("ENABLE_EMAIL_EXPORT", "False").lower() == "true" - ) + ENABLE_EMAIL_EXPORT = os.environ.get("ENABLE_EMAIL_EXPORT", "False").lower() == "true" + ENABLE_SUBSCRIPTION_INVOICE = os.environ.get("ENABLE_SUBSCRIPTION_INVOICE", "True").lower() == "true" EMAIL_FROM = os.environ.get("EMAIL_FROM", "") EMAIL_TO = os.environ.get("EMAIL_TO", "") EMAIL_SERVER = os.environ.get("EMAIL_SERVER", "") @@ -90,13 +83,22 @@ def main(): pass -def base_req(url: str, method="get", json={}): +def base_req(url: str, method="get", json={}, *args, **kwargs): headers = { "Authorization": f"Bearer {ACCESS_TOKEN}", # "x-tesla-user-agent": "TeslaApp/4.28.3-2167", } logging.info(f"{method} Request to url: {url}") - result = sess.request(method=method, url=url, headers=headers, json=json) + for attempt in range(3): + try: + result = sess.request(method=method, url=url, headers=headers, json=json, *args, **kwargs) + break + except requests.exceptions.ChunkedEncodingError: + logger.warning(f"incomplete read occured, attempt {attempt} of 3") + sleep(1) + else: + logger.error("giving up after 3 tries") + exit(1) result.raise_for_status() if "application/json" in result.headers.get("Content-Type"): return result.json() @@ -139,9 +141,7 @@ def compare_access_token(): # see if token is valid file_access_token_json = jwt_decode(file_access_token) except Exception as e: - logging.warning( - f"Could not Parse Access Token from file {ACCESS_TOKEN_PATH}, {e}" - ) + logging.warning(f"Could not Parse Access Token from file {ACCESS_TOKEN_PATH}, {e}") # try to get access token from options try: @@ -150,15 +150,11 @@ def compare_access_token(): logging.warning(f"Could not Parse Access Token from Options, {e}") if not file_access_token_json and not options_access_token_json: - logging.error( - "Could not find any valid access token from file or options, exiting" - ) + logging.error("Could not find any valid access token from file or options, exiting") exit(1) elif file_access_token_json and options_access_token_json: # compare both tokens - if file_access_token_json.get("iat", 0) > options_access_token_json.get( - "iat", 0 - ): + if file_access_token_json.get("iat", 0) > options_access_token_json.get("iat", 0): ACCESS_TOKEN = file_access_token else: # nothing to do, ACCESS_TOKEN already set to options @@ -166,8 +162,9 @@ def compare_access_token(): elif file_access_token_json and not options_access_token_json: ACCESS_TOKEN = file_access_token elif not file_access_token_json and options_access_token_json: - # nothing to do, ACCESS_TOKEN already set to options - pass + # file does not exist an access token set in options + # write access token to file + ACCESS_TOKEN_PATH.write_text(ACCESS_TOKEN) else: logging.warning("Unhandled Case when comparing access tokens") @@ -182,9 +179,7 @@ def compare_refresh_token(): # see if token is valid file_refresh_token_json = jwt_decode(file_refresh_token) except Exception as e: - logging.warning( - f"Could not Parse Refresh Token from file {REFRESH_TOKEN_PATH}, {e}" - ) + logging.warning(f"Could not Parse Refresh Token from file {REFRESH_TOKEN_PATH}, {e}") # try to get access token from options try: @@ -193,29 +188,28 @@ def compare_refresh_token(): logging.warning(f"Could not Parse Refresh Token from Options, {e}") if not file_refresh_token_json and not options_refresh_token_json: - logging.error( - "Could not find any valid refresh token from file or options, exiting" - ) + logging.error("Could not find any valid refresh token from file or options, exiting") exit(1) elif file_refresh_token_json and options_refresh_token_json: # compare both tokens - if file_refresh_token_json.get("iat", 0) > options_refresh_token_json.get( - "iat", 0 - ): + if file_refresh_token_json.get("iat", 0) > options_refresh_token_json.get("iat", 0): REFRESH_TOKEN = file_refresh_token else: # nothing to do, REFRESH_TOKEN already set to options pass elif file_refresh_token_json and not options_refresh_token_json: + # file exists and nothing in options REFRESH_TOKEN = file_refresh_token elif not file_refresh_token_json and options_refresh_token_json: - # nothing to do, REFRESH_TOKEN already set to options - pass + # file does not exist and refresh token set in options + # write refresh token to file + REFRESH_TOKEN_PATH.write_text(REFRESH_TOKEN) else: logging.warning("Unhandled Case when comparing refresh tokens") def refresh_token(): + global ACCESS_TOKEN if HOMEASSISTANT: compare_token() # check if current token expires in less than 2 hours @@ -233,6 +227,7 @@ def refresh_token(): } result = base_req(url, method="post", json=payload) ACCESS_TOKEN_PATH.write_text(result["access_token"]) + ACCESS_TOKEN = result["access_token"] logger.info("Sucesfully refreshed token") else: # token is valid for mor then 2 hours @@ -247,7 +242,7 @@ def interactive(): prev_month = cur_month - timedelta(days=1) user_choice_month = input( - "Bitte gewünschten Monat im Format 'YYYY-MM' bzw. 'cur' oder 'prev' oder 'all' für aktuellen oder vorherigen Monat oder alles eingeben [prev]: " + "Bitte gewünschten Monat im Format 'YYYY-MM' bzw. 'cur' oder 'prev' oder 'all' für aktuellen oder vorherigen Monat oder alles eingeben [prev]: " # noqa ) user_choice_month = user_choice_month.strip().lower() @@ -259,14 +254,12 @@ def interactive(): print(f"Using '{desired_invoice_date.strftime('%Y-%m')}'.") elif user_choice_month == "all": desired_invoice_date = datetime.strptime("1999-01", "%Y-%m") - print(f"Using 'all'.") + print("Using 'all'.") else: try: - desired_invoice_date = datetime.strptime( - user_choice_month, "%Y-%m" - ) # format: YYYY-MM + desired_invoice_date = datetime.strptime(user_choice_month, "%Y-%m") # format: YYYY-MM print(f"Using '{desired_invoice_date.strftime('%Y-%m')}'.") - except: + except: # noqa print("ERROR - Bitte Eingabe kontrollieren!") exit(1) @@ -290,16 +283,28 @@ def download_invoice(desired_invoice_date): url_charging_base = "https://ownership.tesla.com/mobile-app/charging/" for vin, vehicle in vehicles.items(): if "display_name" in vehicle: - logger.info( - f"Processing vehicle {vehicle['vin']} - {vehicle['display_name']}..." - ) + logger.info(f"Processing vehicle {vehicle['vin']} - {vehicle['display_name']}...") else: logger.info(f"Processing vehicle {vehicle['vin']}...") # create API URL for vehicle VIN - url_charging_history = f"{url_charging_base}history?deviceLanguage=en&deviceCountry=AT&httpLocale=en_US&vin={vehicle['vin']}&operationName=getChargingHistoryV2" + url_charging_history = f"{url_charging_base}history?deviceLanguage=en&deviceCountry=AT&httpLocale=en_US&vin={vehicle['vin']}&operationName=getChargingHistoryV2" # noqa charging_sessions = base_req(url_charging_history) - save_invoice(charging_sessions["data"], desired_invoice_date) + save_charging_invoice(charging_sessions["data"], desired_invoice_date) + + if ENABLE_SUBSCRIPTION_INVOICE: + logger.info("Subscription Invoice Enabled -> starting to download subscription invoices") + url_subcription_invoices = "https://ownership.tesla.com/mobile-app/subscriptions/invoices" + params_subscription_invoices = { + "deviceLanguage": "en", + "deviceCountry": "AT", + "httpLocale": "en_US", + "vin": vehicle["vin"], + "optionCode": "$CPF1", + } + subscription_invoices = base_req(url_subcription_invoices, params=params_subscription_invoices) + save_subscription_invoice(subscription_invoices["data"], desired_invoice_date, vehicle["vin"]) + if ENABLE_EMAIL_EXPORT: send_mails() @@ -308,19 +313,28 @@ def download_invoice(desired_invoice_date): def get_charging_invoice(charging_session_invoice_id, vin): url_charging_base = "https://ownership.tesla.com/mobile-app/charging/" - url_charging_invoice = f"{url_charging_base}invoice/{charging_session_invoice_id}?deviceLanguage=en&deviceCountry=AT&httpLocale=en_US&vin={vin}" + url_charging_invoice = f"{url_charging_base}invoice/{charging_session_invoice_id}?deviceLanguage=en&deviceCountry=AT&httpLocale=en_US&vin={vin}" # noqa return base_req(url_charging_invoice) -def save_invoice(charging_sessions, desired_invoice_date): +def get_subscription_invoice(subscription_invoice_id, vin): + url_documents_invoice = f"https://ownership.tesla.com/mobile-app/documents/invoices/{subscription_invoice_id}" + params = { + "deviceLanguage": "en", + "deviceCountry": "AT", + "httpLocale": "en_US", + "vin": vin, + } + return base_req(url_documents_invoice, params=params) + + +def save_charging_invoice(charging_sessions, desired_invoice_date): # make sure folder exists INVOICE_PATH.mkdir(parents=True, exist_ok=True) for charging_session in charging_sessions: - charging_session_datetime = datetime.fromisoformat( - charging_session["unlatchDateTime"] - ) + charging_session_datetime = datetime.fromisoformat(charging_session["unlatchDateTime"]) charging_session_countrycode = charging_session["countryCode"] # check for desired invoice date @@ -343,23 +357,51 @@ def save_invoice(charging_sessions, desired_invoice_date): local_file_path = ( INVOICE_PATH - / f"tesla_charging_invoice_{charging_session['vin']}_{charging_session_datetime.strftime('%Y-%m-%d')}_{charging_session_countrycode}_{charging_session_invoice_filename}" + / f"tesla_charging_invoice_{charging_session['vin']}_{charging_session_datetime.strftime('%Y-%m-%d')}_{charging_session_countrycode}_{charging_session_invoice_filename}" # noqa ) if local_file_path.exists(): # file already downloaded, skip - logger.info( - f"Invoice {charging_session_invoice_filename} already saved" - ) + logger.info(f"Invoice {charging_session_invoice_filename} already saved") continue logger.info(f"Downloading {charging_session_invoice_filename}") - charging_invoice = get_charging_invoice( - charging_session_invoice_id, charging_session["vin"] - ) + charging_invoice = get_charging_invoice(charging_session_invoice_id, charging_session["vin"]) local_file_path.write_bytes(charging_invoice) logger.info(f"File '{local_file_path}' saved.") +def save_subscription_invoice(subscription_invoices, desired_invoice_date, vin): + # make sure folder exists + INVOICE_PATH.mkdir(parents=True, exist_ok=True) + + for invoice in subscription_invoices: + invoice_datetime = datetime.fromisoformat(subscription_invoices[0]["InvoiceDate"]) + + # check for desired invoice date + if desired_invoice_date.year == 1999: + # 1999 means all invoices + pass + elif invoice_datetime.year != desired_invoice_date.year: + # wrong year -> skip + continue + elif invoice_datetime.month != desired_invoice_date.month: + # correct year but bad month -> skip + continue + + local_file_path = ( + INVOICE_PATH / f"tesla_subscription_invoice_{vin}_{invoice_datetime.strftime('%Y-%m-%d')}.pdf" + ) + if local_file_path.exists(): + # file already downloaded, skip + logger.info(f"Invoice {invoice['InvoiceFileName']} already saved") + continue + + logger.info(f"Downloading {invoice['InvoiceFileName']}") + subscription_invoice = get_subscription_invoice(invoice["InvoiceId"], vin) + local_file_path.write_bytes(subscription_invoice) + logger.info(f"File '{local_file_path}' saved.") + + def get_vehicles(): url_products = "https://owner-api.teslamotors.com/api/1/products?orders=true" vehicles = {} diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..86037bf --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[tool.black] + +line-length = 119 \ No newline at end of file