Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0.6.0 #2

Merged
merged 8 commits into from
Jun 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
max-line-length = 120
170 changes: 106 additions & 64 deletions download_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Author: Dominik Steiner ([email protected])
Description: This script downloads all Tesla charging invoices for a given month.
Usage: python3 dsteiner_tesla_invoices_download.py
Version: 2.0
Version: 3.0
Date Created: 2024-02-05
Changed: Changed Owner-API Endpoint to /products instead of /vehicles.
Python Version: 3.11.7
Expand All @@ -23,6 +23,7 @@
import smtplib
from email.message import EmailMessage
import logging
from time import sleep

# setup logger
logger = logging.getLogger(__name__)
Expand All @@ -33,13 +34,9 @@
HOMEASSISTANT = True
options = json.load(Path("/data/options.json").open())
REFRESH_TOKEN_PATH = Path("/data/refresh_token.txt")
REFRESH_TOKEN = options[
"refresh_token"
] # refresh token from options, might be expired
REFRESH_TOKEN = options["refresh_token"] # refresh token from options, might be expired
ACCESS_TOKEN_PATH = Path("/data/access_token.txt")
ACCESS_TOKEN = options[
"access_token"
] # access token from options, might be expired
ACCESS_TOKEN = options["access_token"] # access token from options, might be expired
INVOICE_PATH = Path("/data/invoices/")
if options.get("enable_email_export", False):
ENABLE_EMAIL_EXPORT = True
Expand All @@ -51,23 +48,19 @@
EMAIL_PASS = options["email"]["password"]
else:
ENABLE_EMAIL_EXPORT = False
ENABLE_SUBSCRIPTION_INVOICE = options.get("enable_subscription_invoice", True)
else:
HOMEASSISTANT = True
HOMEASSISTANT = False
# get everything from environment variables
REFRESH_TOKEN_PATH = Path(
os.environ.get("REFRESH_TOKEN", "/opt/tesla-invoices/secrets/refresh_token.txt")
)
REFRESH_TOKEN_PATH = Path(os.environ.get("REFRESH_TOKEN", "/opt/tesla-invoices/secrets/refresh_token.txt"))
REFRESH_TOKEN = REFRESH_TOKEN_PATH.read_text().strip()
ACCESS_TOKEN_PATH = Path(
os.environ.get("ACCESS_TOKEN", "/opt/tesla-invoices/secrets/access_token.txt")
)
ACCESS_TOKEN_PATH = Path(os.environ.get("ACCESS_TOKEN", "/opt/tesla-invoices/secrets/access_token.txt"))
ACCESS_TOKEN = ACCESS_TOKEN_PATH.read_text().strip()
# path to save invoices
INVOICE_PATH = Path(os.environ.get("INVOICE_PATH", "/opt/tesla-invoices/invoices/"))

ENABLE_EMAIL_EXPORT = (
os.environ.get("ENABLE_EMAIL_EXPORT", "False").lower() == "true"
)
ENABLE_EMAIL_EXPORT = os.environ.get("ENABLE_EMAIL_EXPORT", "False").lower() == "true"
ENABLE_SUBSCRIPTION_INVOICE = os.environ.get("ENABLE_SUBSCRIPTION_INVOICE", "True").lower() == "true"
EMAIL_FROM = os.environ.get("EMAIL_FROM", "")
EMAIL_TO = os.environ.get("EMAIL_TO", "")
EMAIL_SERVER = os.environ.get("EMAIL_SERVER", "")
Expand All @@ -90,13 +83,22 @@ def main():
pass


def base_req(url: str, method="get", json={}):
def base_req(url: str, method="get", json={}, *args, **kwargs):
headers = {
"Authorization": f"Bearer {ACCESS_TOKEN}",
# "x-tesla-user-agent": "TeslaApp/4.28.3-2167",
}
logging.info(f"{method} Request to url: {url}")
result = sess.request(method=method, url=url, headers=headers, json=json)
for attempt in range(3):
try:
result = sess.request(method=method, url=url, headers=headers, json=json, *args, **kwargs)
break
except requests.exceptions.ChunkedEncodingError:
logger.warning(f"incomplete read occured, attempt {attempt} of 3")
sleep(1)
else:
logger.error("giving up after 3 tries")
exit(1)
result.raise_for_status()
if "application/json" in result.headers.get("Content-Type"):
return result.json()
Expand Down Expand Up @@ -139,9 +141,7 @@ def compare_access_token():
# see if token is valid
file_access_token_json = jwt_decode(file_access_token)
except Exception as e:
logging.warning(
f"Could not Parse Access Token from file {ACCESS_TOKEN_PATH}, {e}"
)
logging.warning(f"Could not Parse Access Token from file {ACCESS_TOKEN_PATH}, {e}")

# try to get access token from options
try:
Expand All @@ -150,24 +150,21 @@ def compare_access_token():
logging.warning(f"Could not Parse Access Token from Options, {e}")

if not file_access_token_json and not options_access_token_json:
logging.error(
"Could not find any valid access token from file or options, exiting"
)
logging.error("Could not find any valid access token from file or options, exiting")
exit(1)
elif file_access_token_json and options_access_token_json:
# compare both tokens
if file_access_token_json.get("iat", 0) > options_access_token_json.get(
"iat", 0
):
if file_access_token_json.get("iat", 0) > options_access_token_json.get("iat", 0):
ACCESS_TOKEN = file_access_token
else:
# nothing to do, ACCESS_TOKEN already set to options
pass
elif file_access_token_json and not options_access_token_json:
ACCESS_TOKEN = file_access_token
elif not file_access_token_json and options_access_token_json:
# nothing to do, ACCESS_TOKEN already set to options
pass
# file does not exist an access token set in options
# write access token to file
ACCESS_TOKEN_PATH.write_text(ACCESS_TOKEN)
else:
logging.warning("Unhandled Case when comparing access tokens")

Expand All @@ -182,9 +179,7 @@ def compare_refresh_token():
# see if token is valid
file_refresh_token_json = jwt_decode(file_refresh_token)
except Exception as e:
logging.warning(
f"Could not Parse Refresh Token from file {REFRESH_TOKEN_PATH}, {e}"
)
logging.warning(f"Could not Parse Refresh Token from file {REFRESH_TOKEN_PATH}, {e}")

# try to get access token from options
try:
Expand All @@ -193,29 +188,28 @@ def compare_refresh_token():
logging.warning(f"Could not Parse Refresh Token from Options, {e}")

if not file_refresh_token_json and not options_refresh_token_json:
logging.error(
"Could not find any valid refresh token from file or options, exiting"
)
logging.error("Could not find any valid refresh token from file or options, exiting")
exit(1)
elif file_refresh_token_json and options_refresh_token_json:
# compare both tokens
if file_refresh_token_json.get("iat", 0) > options_refresh_token_json.get(
"iat", 0
):
if file_refresh_token_json.get("iat", 0) > options_refresh_token_json.get("iat", 0):
REFRESH_TOKEN = file_refresh_token
else:
# nothing to do, REFRESH_TOKEN already set to options
pass
elif file_refresh_token_json and not options_refresh_token_json:
# file exists and nothing in options
REFRESH_TOKEN = file_refresh_token
elif not file_refresh_token_json and options_refresh_token_json:
# nothing to do, REFRESH_TOKEN already set to options
pass
# file does not exist and refresh token set in options
# write refresh token to file
REFRESH_TOKEN_PATH.write_text(REFRESH_TOKEN)
else:
logging.warning("Unhandled Case when comparing refresh tokens")


def refresh_token():
global ACCESS_TOKEN
if HOMEASSISTANT:
compare_token()
# check if current token expires in less than 2 hours
Expand All @@ -233,6 +227,7 @@ def refresh_token():
}
result = base_req(url, method="post", json=payload)
ACCESS_TOKEN_PATH.write_text(result["access_token"])
ACCESS_TOKEN = result["access_token"]
logger.info("Sucesfully refreshed token")
else:
# token is valid for mor then 2 hours
Expand All @@ -247,7 +242,7 @@ def interactive():
prev_month = cur_month - timedelta(days=1)

user_choice_month = input(
"Bitte gewünschten Monat im Format 'YYYY-MM' bzw. 'cur' oder 'prev' oder 'all' für aktuellen oder vorherigen Monat oder alles eingeben [prev]: "
"Bitte gewünschten Monat im Format 'YYYY-MM' bzw. 'cur' oder 'prev' oder 'all' für aktuellen oder vorherigen Monat oder alles eingeben [prev]: " # noqa
)
user_choice_month = user_choice_month.strip().lower()

Expand All @@ -259,14 +254,12 @@ def interactive():
print(f"Using '{desired_invoice_date.strftime('%Y-%m')}'.")
elif user_choice_month == "all":
desired_invoice_date = datetime.strptime("1999-01", "%Y-%m")
print(f"Using 'all'.")
print("Using 'all'.")
else:
try:
desired_invoice_date = datetime.strptime(
user_choice_month, "%Y-%m"
) # format: YYYY-MM
desired_invoice_date = datetime.strptime(user_choice_month, "%Y-%m") # format: YYYY-MM
print(f"Using '{desired_invoice_date.strftime('%Y-%m')}'.")
except:
except: # noqa
print("ERROR - Bitte Eingabe kontrollieren!")
exit(1)

Expand All @@ -290,16 +283,28 @@ def download_invoice(desired_invoice_date):
url_charging_base = "https://ownership.tesla.com/mobile-app/charging/"
for vin, vehicle in vehicles.items():
if "display_name" in vehicle:
logger.info(
f"Processing vehicle {vehicle['vin']} - {vehicle['display_name']}..."
)
logger.info(f"Processing vehicle {vehicle['vin']} - {vehicle['display_name']}...")
else:
logger.info(f"Processing vehicle {vehicle['vin']}...")

# create API URL for vehicle VIN
url_charging_history = f"{url_charging_base}history?deviceLanguage=en&deviceCountry=AT&httpLocale=en_US&vin={vehicle['vin']}&operationName=getChargingHistoryV2"
url_charging_history = f"{url_charging_base}history?deviceLanguage=en&deviceCountry=AT&httpLocale=en_US&vin={vehicle['vin']}&operationName=getChargingHistoryV2" # noqa
charging_sessions = base_req(url_charging_history)
save_invoice(charging_sessions["data"], desired_invoice_date)
save_charging_invoice(charging_sessions["data"], desired_invoice_date)

if ENABLE_SUBSCRIPTION_INVOICE:
logger.info("Subscription Invoice Enabled -> starting to download subscription invoices")
url_subcription_invoices = "https://ownership.tesla.com/mobile-app/subscriptions/invoices"
params_subscription_invoices = {
"deviceLanguage": "en",
"deviceCountry": "AT",
"httpLocale": "en_US",
"vin": vehicle["vin"],
"optionCode": "$CPF1",
}
subscription_invoices = base_req(url_subcription_invoices, params=params_subscription_invoices)
save_subscription_invoice(subscription_invoices["data"], desired_invoice_date, vehicle["vin"])

if ENABLE_EMAIL_EXPORT:
send_mails()

Expand All @@ -308,19 +313,28 @@ def download_invoice(desired_invoice_date):

def get_charging_invoice(charging_session_invoice_id, vin):
url_charging_base = "https://ownership.tesla.com/mobile-app/charging/"
url_charging_invoice = f"{url_charging_base}invoice/{charging_session_invoice_id}?deviceLanguage=en&deviceCountry=AT&httpLocale=en_US&vin={vin}"
url_charging_invoice = f"{url_charging_base}invoice/{charging_session_invoice_id}?deviceLanguage=en&deviceCountry=AT&httpLocale=en_US&vin={vin}" # noqa

return base_req(url_charging_invoice)


def save_invoice(charging_sessions, desired_invoice_date):
def get_subscription_invoice(subscription_invoice_id, vin):
url_documents_invoice = f"https://ownership.tesla.com/mobile-app/documents/invoices/{subscription_invoice_id}"
params = {
"deviceLanguage": "en",
"deviceCountry": "AT",
"httpLocale": "en_US",
"vin": vin,
}
return base_req(url_documents_invoice, params=params)


def save_charging_invoice(charging_sessions, desired_invoice_date):
# make sure folder exists
INVOICE_PATH.mkdir(parents=True, exist_ok=True)

for charging_session in charging_sessions:
charging_session_datetime = datetime.fromisoformat(
charging_session["unlatchDateTime"]
)
charging_session_datetime = datetime.fromisoformat(charging_session["unlatchDateTime"])
charging_session_countrycode = charging_session["countryCode"]

# check for desired invoice date
Expand All @@ -343,23 +357,51 @@ def save_invoice(charging_sessions, desired_invoice_date):

local_file_path = (
INVOICE_PATH
/ f"tesla_charging_invoice_{charging_session['vin']}_{charging_session_datetime.strftime('%Y-%m-%d')}_{charging_session_countrycode}_{charging_session_invoice_filename}"
/ f"tesla_charging_invoice_{charging_session['vin']}_{charging_session_datetime.strftime('%Y-%m-%d')}_{charging_session_countrycode}_{charging_session_invoice_filename}" # noqa
)
if local_file_path.exists():
# file already downloaded, skip
logger.info(
f"Invoice {charging_session_invoice_filename} already saved"
)
logger.info(f"Invoice {charging_session_invoice_filename} already saved")
continue

logger.info(f"Downloading {charging_session_invoice_filename}")
charging_invoice = get_charging_invoice(
charging_session_invoice_id, charging_session["vin"]
)
charging_invoice = get_charging_invoice(charging_session_invoice_id, charging_session["vin"])
local_file_path.write_bytes(charging_invoice)
logger.info(f"File '{local_file_path}' saved.")


def save_subscription_invoice(subscription_invoices, desired_invoice_date, vin):
# make sure folder exists
INVOICE_PATH.mkdir(parents=True, exist_ok=True)

for invoice in subscription_invoices:
invoice_datetime = datetime.fromisoformat(subscription_invoices[0]["InvoiceDate"])

# check for desired invoice date
if desired_invoice_date.year == 1999:
# 1999 means all invoices
pass
elif invoice_datetime.year != desired_invoice_date.year:
# wrong year -> skip
continue
elif invoice_datetime.month != desired_invoice_date.month:
# correct year but bad month -> skip
continue

local_file_path = (
INVOICE_PATH / f"tesla_subscription_invoice_{vin}_{invoice_datetime.strftime('%Y-%m-%d')}.pdf"
)
if local_file_path.exists():
# file already downloaded, skip
logger.info(f"Invoice {invoice['InvoiceFileName']} already saved")
continue

logger.info(f"Downloading {invoice['InvoiceFileName']}")
subscription_invoice = get_subscription_invoice(invoice["InvoiceId"], vin)
local_file_path.write_bytes(subscription_invoice)
logger.info(f"File '{local_file_path}' saved.")


def get_vehicles():
url_products = "https://owner-api.teslamotors.com/api/1/products?orders=true"
vehicles = {}
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[tool.black]

line-length = 119
Loading