Skip to content

Commit

Permalink
v0.6.0 (#2)
Browse files Browse the repository at this point in the history
* write access token to variable after refresh

* fix homeassistant boolean flag

* creat token files if they do not exist

* try 3 times on imcomplete read

* enable subscription invoices

* add pyproject.toml for black

* black format with line length 120

* set flake 8 settings

---------

Co-authored-by: Andreas Sauerwein-Schlosser <[email protected]>
  • Loading branch information
aSauerwein and asauerwein-nts authored Jun 22, 2024
1 parent ce465e9 commit 7b7e72a
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 64 deletions.
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
max-line-length = 120
170 changes: 106 additions & 64 deletions download_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Author: Dominik Steiner ([email protected])
Description: This script downloads all Tesla charging invoices for a given month.
Usage: python3 dsteiner_tesla_invoices_download.py
Version: 2.0
Version: 3.0
Date Created: 2024-02-05
Changed: Changed Owner-API Endpoint to /products instead of /vehicles.
Python Version: 3.11.7
Expand All @@ -23,6 +23,7 @@
import smtplib
from email.message import EmailMessage
import logging
from time import sleep

# setup logger
logger = logging.getLogger(__name__)
Expand All @@ -33,13 +34,9 @@
HOMEASSISTANT = True
options = json.load(Path("/data/options.json").open())
REFRESH_TOKEN_PATH = Path("/data/refresh_token.txt")
REFRESH_TOKEN = options[
"refresh_token"
] # refresh token from options, might be expired
REFRESH_TOKEN = options["refresh_token"] # refresh token from options, might be expired
ACCESS_TOKEN_PATH = Path("/data/access_token.txt")
ACCESS_TOKEN = options[
"access_token"
] # access token from options, might be expired
ACCESS_TOKEN = options["access_token"] # access token from options, might be expired
INVOICE_PATH = Path("/data/invoices/")
if options.get("enable_email_export", False):
ENABLE_EMAIL_EXPORT = True
Expand All @@ -51,23 +48,19 @@
EMAIL_PASS = options["email"]["password"]
else:
ENABLE_EMAIL_EXPORT = False
ENABLE_SUBSCRIPTION_INVOICE = options.get("enable_subscription_invoice", True)
else:
HOMEASSISTANT = True
HOMEASSISTANT = False
# get everything from environment variables
REFRESH_TOKEN_PATH = Path(
os.environ.get("REFRESH_TOKEN", "/opt/tesla-invoices/secrets/refresh_token.txt")
)
REFRESH_TOKEN_PATH = Path(os.environ.get("REFRESH_TOKEN", "/opt/tesla-invoices/secrets/refresh_token.txt"))
REFRESH_TOKEN = REFRESH_TOKEN_PATH.read_text().strip()
ACCESS_TOKEN_PATH = Path(
os.environ.get("ACCESS_TOKEN", "/opt/tesla-invoices/secrets/access_token.txt")
)
ACCESS_TOKEN_PATH = Path(os.environ.get("ACCESS_TOKEN", "/opt/tesla-invoices/secrets/access_token.txt"))
ACCESS_TOKEN = ACCESS_TOKEN_PATH.read_text().strip()
# path to save invoices
INVOICE_PATH = Path(os.environ.get("INVOICE_PATH", "/opt/tesla-invoices/invoices/"))

ENABLE_EMAIL_EXPORT = (
os.environ.get("ENABLE_EMAIL_EXPORT", "False").lower() == "true"
)
ENABLE_EMAIL_EXPORT = os.environ.get("ENABLE_EMAIL_EXPORT", "False").lower() == "true"
ENABLE_SUBSCRIPTION_INVOICE = os.environ.get("ENABLE_SUBSCRIPTION_INVOICE", "True").lower() == "true"
EMAIL_FROM = os.environ.get("EMAIL_FROM", "")
EMAIL_TO = os.environ.get("EMAIL_TO", "")
EMAIL_SERVER = os.environ.get("EMAIL_SERVER", "")
Expand All @@ -90,13 +83,22 @@ def main():
pass


def base_req(url: str, method="get", json={}):
def base_req(url: str, method="get", json={}, *args, **kwargs):
headers = {
"Authorization": f"Bearer {ACCESS_TOKEN}",
# "x-tesla-user-agent": "TeslaApp/4.28.3-2167",
}
logging.info(f"{method} Request to url: {url}")
result = sess.request(method=method, url=url, headers=headers, json=json)
for attempt in range(3):
try:
result = sess.request(method=method, url=url, headers=headers, json=json, *args, **kwargs)
break
except requests.exceptions.ChunkedEncodingError:
logger.warning(f"incomplete read occured, attempt {attempt} of 3")
sleep(1)
else:
logger.error("giving up after 3 tries")
exit(1)
result.raise_for_status()
if "application/json" in result.headers.get("Content-Type"):
return result.json()
Expand Down Expand Up @@ -139,9 +141,7 @@ def compare_access_token():
# see if token is valid
file_access_token_json = jwt_decode(file_access_token)
except Exception as e:
logging.warning(
f"Could not Parse Access Token from file {ACCESS_TOKEN_PATH}, {e}"
)
logging.warning(f"Could not Parse Access Token from file {ACCESS_TOKEN_PATH}, {e}")

# try to get access token from options
try:
Expand All @@ -150,24 +150,21 @@ def compare_access_token():
logging.warning(f"Could not Parse Access Token from Options, {e}")

if not file_access_token_json and not options_access_token_json:
logging.error(
"Could not find any valid access token from file or options, exiting"
)
logging.error("Could not find any valid access token from file or options, exiting")
exit(1)
elif file_access_token_json and options_access_token_json:
# compare both tokens
if file_access_token_json.get("iat", 0) > options_access_token_json.get(
"iat", 0
):
if file_access_token_json.get("iat", 0) > options_access_token_json.get("iat", 0):
ACCESS_TOKEN = file_access_token
else:
# nothing to do, ACCESS_TOKEN already set to options
pass
elif file_access_token_json and not options_access_token_json:
ACCESS_TOKEN = file_access_token
elif not file_access_token_json and options_access_token_json:
# nothing to do, ACCESS_TOKEN already set to options
pass
# file does not exist an access token set in options
# write access token to file
ACCESS_TOKEN_PATH.write_text(ACCESS_TOKEN)
else:
logging.warning("Unhandled Case when comparing access tokens")

Expand All @@ -182,9 +179,7 @@ def compare_refresh_token():
# see if token is valid
file_refresh_token_json = jwt_decode(file_refresh_token)
except Exception as e:
logging.warning(
f"Could not Parse Refresh Token from file {REFRESH_TOKEN_PATH}, {e}"
)
logging.warning(f"Could not Parse Refresh Token from file {REFRESH_TOKEN_PATH}, {e}")

# try to get access token from options
try:
Expand All @@ -193,29 +188,28 @@ def compare_refresh_token():
logging.warning(f"Could not Parse Refresh Token from Options, {e}")

if not file_refresh_token_json and not options_refresh_token_json:
logging.error(
"Could not find any valid refresh token from file or options, exiting"
)
logging.error("Could not find any valid refresh token from file or options, exiting")
exit(1)
elif file_refresh_token_json and options_refresh_token_json:
# compare both tokens
if file_refresh_token_json.get("iat", 0) > options_refresh_token_json.get(
"iat", 0
):
if file_refresh_token_json.get("iat", 0) > options_refresh_token_json.get("iat", 0):
REFRESH_TOKEN = file_refresh_token
else:
# nothing to do, REFRESH_TOKEN already set to options
pass
elif file_refresh_token_json and not options_refresh_token_json:
# file exists and nothing in options
REFRESH_TOKEN = file_refresh_token
elif not file_refresh_token_json and options_refresh_token_json:
# nothing to do, REFRESH_TOKEN already set to options
pass
# file does not exist and refresh token set in options
# write refresh token to file
REFRESH_TOKEN_PATH.write_text(REFRESH_TOKEN)
else:
logging.warning("Unhandled Case when comparing refresh tokens")


def refresh_token():
global ACCESS_TOKEN
if HOMEASSISTANT:
compare_token()
# check if current token expires in less than 2 hours
Expand All @@ -233,6 +227,7 @@ def refresh_token():
}
result = base_req(url, method="post", json=payload)
ACCESS_TOKEN_PATH.write_text(result["access_token"])
ACCESS_TOKEN = result["access_token"]
logger.info("Sucesfully refreshed token")
else:
# token is valid for mor then 2 hours
Expand All @@ -247,7 +242,7 @@ def interactive():
prev_month = cur_month - timedelta(days=1)

user_choice_month = input(
"Bitte gewünschten Monat im Format 'YYYY-MM' bzw. 'cur' oder 'prev' oder 'all' für aktuellen oder vorherigen Monat oder alles eingeben [prev]: "
"Bitte gewünschten Monat im Format 'YYYY-MM' bzw. 'cur' oder 'prev' oder 'all' für aktuellen oder vorherigen Monat oder alles eingeben [prev]: " # noqa
)
user_choice_month = user_choice_month.strip().lower()

Expand All @@ -259,14 +254,12 @@ def interactive():
print(f"Using '{desired_invoice_date.strftime('%Y-%m')}'.")
elif user_choice_month == "all":
desired_invoice_date = datetime.strptime("1999-01", "%Y-%m")
print(f"Using 'all'.")
print("Using 'all'.")
else:
try:
desired_invoice_date = datetime.strptime(
user_choice_month, "%Y-%m"
) # format: YYYY-MM
desired_invoice_date = datetime.strptime(user_choice_month, "%Y-%m") # format: YYYY-MM
print(f"Using '{desired_invoice_date.strftime('%Y-%m')}'.")
except:
except: # noqa
print("ERROR - Bitte Eingabe kontrollieren!")
exit(1)

Expand All @@ -290,16 +283,28 @@ def download_invoice(desired_invoice_date):
url_charging_base = "https://ownership.tesla.com/mobile-app/charging/"
for vin, vehicle in vehicles.items():
if "display_name" in vehicle:
logger.info(
f"Processing vehicle {vehicle['vin']} - {vehicle['display_name']}..."
)
logger.info(f"Processing vehicle {vehicle['vin']} - {vehicle['display_name']}...")
else:
logger.info(f"Processing vehicle {vehicle['vin']}...")

# create API URL for vehicle VIN
url_charging_history = f"{url_charging_base}history?deviceLanguage=en&deviceCountry=AT&httpLocale=en_US&vin={vehicle['vin']}&operationName=getChargingHistoryV2"
url_charging_history = f"{url_charging_base}history?deviceLanguage=en&deviceCountry=AT&httpLocale=en_US&vin={vehicle['vin']}&operationName=getChargingHistoryV2" # noqa
charging_sessions = base_req(url_charging_history)
save_invoice(charging_sessions["data"], desired_invoice_date)
save_charging_invoice(charging_sessions["data"], desired_invoice_date)

if ENABLE_SUBSCRIPTION_INVOICE:
logger.info("Subscription Invoice Enabled -> starting to download subscription invoices")
url_subcription_invoices = "https://ownership.tesla.com/mobile-app/subscriptions/invoices"
params_subscription_invoices = {
"deviceLanguage": "en",
"deviceCountry": "AT",
"httpLocale": "en_US",
"vin": vehicle["vin"],
"optionCode": "$CPF1",
}
subscription_invoices = base_req(url_subcription_invoices, params=params_subscription_invoices)
save_subscription_invoice(subscription_invoices["data"], desired_invoice_date, vehicle["vin"])

if ENABLE_EMAIL_EXPORT:
send_mails()

Expand All @@ -308,19 +313,28 @@ def download_invoice(desired_invoice_date):

def get_charging_invoice(charging_session_invoice_id, vin):
url_charging_base = "https://ownership.tesla.com/mobile-app/charging/"
url_charging_invoice = f"{url_charging_base}invoice/{charging_session_invoice_id}?deviceLanguage=en&deviceCountry=AT&httpLocale=en_US&vin={vin}"
url_charging_invoice = f"{url_charging_base}invoice/{charging_session_invoice_id}?deviceLanguage=en&deviceCountry=AT&httpLocale=en_US&vin={vin}" # noqa

return base_req(url_charging_invoice)


def save_invoice(charging_sessions, desired_invoice_date):
def get_subscription_invoice(subscription_invoice_id, vin):
url_documents_invoice = f"https://ownership.tesla.com/mobile-app/documents/invoices/{subscription_invoice_id}"
params = {
"deviceLanguage": "en",
"deviceCountry": "AT",
"httpLocale": "en_US",
"vin": vin,
}
return base_req(url_documents_invoice, params=params)


def save_charging_invoice(charging_sessions, desired_invoice_date):
# make sure folder exists
INVOICE_PATH.mkdir(parents=True, exist_ok=True)

for charging_session in charging_sessions:
charging_session_datetime = datetime.fromisoformat(
charging_session["unlatchDateTime"]
)
charging_session_datetime = datetime.fromisoformat(charging_session["unlatchDateTime"])
charging_session_countrycode = charging_session["countryCode"]

# check for desired invoice date
Expand All @@ -343,23 +357,51 @@ def save_invoice(charging_sessions, desired_invoice_date):

local_file_path = (
INVOICE_PATH
/ f"tesla_charging_invoice_{charging_session['vin']}_{charging_session_datetime.strftime('%Y-%m-%d')}_{charging_session_countrycode}_{charging_session_invoice_filename}"
/ f"tesla_charging_invoice_{charging_session['vin']}_{charging_session_datetime.strftime('%Y-%m-%d')}_{charging_session_countrycode}_{charging_session_invoice_filename}" # noqa
)
if local_file_path.exists():
# file already downloaded, skip
logger.info(
f"Invoice {charging_session_invoice_filename} already saved"
)
logger.info(f"Invoice {charging_session_invoice_filename} already saved")
continue

logger.info(f"Downloading {charging_session_invoice_filename}")
charging_invoice = get_charging_invoice(
charging_session_invoice_id, charging_session["vin"]
)
charging_invoice = get_charging_invoice(charging_session_invoice_id, charging_session["vin"])
local_file_path.write_bytes(charging_invoice)
logger.info(f"File '{local_file_path}' saved.")


def save_subscription_invoice(subscription_invoices, desired_invoice_date, vin):
# make sure folder exists
INVOICE_PATH.mkdir(parents=True, exist_ok=True)

for invoice in subscription_invoices:
invoice_datetime = datetime.fromisoformat(subscription_invoices[0]["InvoiceDate"])

# check for desired invoice date
if desired_invoice_date.year == 1999:
# 1999 means all invoices
pass
elif invoice_datetime.year != desired_invoice_date.year:
# wrong year -> skip
continue
elif invoice_datetime.month != desired_invoice_date.month:
# correct year but bad month -> skip
continue

local_file_path = (
INVOICE_PATH / f"tesla_subscription_invoice_{vin}_{invoice_datetime.strftime('%Y-%m-%d')}.pdf"
)
if local_file_path.exists():
# file already downloaded, skip
logger.info(f"Invoice {invoice['InvoiceFileName']} already saved")
continue

logger.info(f"Downloading {invoice['InvoiceFileName']}")
subscription_invoice = get_subscription_invoice(invoice["InvoiceId"], vin)
local_file_path.write_bytes(subscription_invoice)
logger.info(f"File '{local_file_path}' saved.")


def get_vehicles():
url_products = "https://owner-api.teslamotors.com/api/1/products?orders=true"
vehicles = {}
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[tool.black]

line-length = 119

0 comments on commit 7b7e72a

Please sign in to comment.