Skip to content

Commit

Permalink
feat: Add QR Code login
Browse files Browse the repository at this point in the history
  • Loading branch information
ReneNulschDE committed Dec 14, 2024
1 parent a7a5a38 commit 659ad2f
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 29 deletions.
155 changes: 129 additions & 26 deletions custom_components/mbapi2020/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

from copy import deepcopy
from typing import Any
import uuid

from awesomeversion import AwesomeVersion
Expand All @@ -18,6 +19,8 @@

from .client import Client
from .const import (
AUTH_METHOD_DEVICE_CODE,
AUTH_METHOD_PIN,
CONF_ALLOWED_AUTH_METHODS,
CONF_ALLOWED_REGIONS,
CONF_AUTH_METHOD,
Expand All @@ -34,13 +37,10 @@
VERIFY_SSL,
)
from .errors import MbapiError, MBAuthError
from .helper import LogHelper, UrlHelper as helper
from .webapi import WebApi

SCHEMA_STEP_USER = vol.Schema(
{
vol.Required(CONF_USERNAME): str,
vol.Required(CONF_REGION): vol.In(CONF_ALLOWED_REGIONS),
}
)
SCHEMA_STEP_USER = vol.Schema({vol.Required(CONF_USERNAME): str})

SCHEMA_STEP_PIN = vol.Schema({vol.Required(CONF_PASSWORD): str})

Expand All @@ -56,17 +56,69 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):

def __init__(self):
"""Initialize component."""
super().__init__()
self._reauth_entry = None
self._data = None
self._data: dict[str, any] = {}
self._reauth_mode = False
self._session = None

async def async_step_user(self, user_input=None):
"""Handle the initial step."""

if user_input is not None:
self._data[CONF_REGION] = user_input[CONF_REGION]
if user_input.get("auth_method") == AUTH_METHOD_PIN:
return self.async_show_form(
step_id="userpin",
data_schema=SCHEMA_STEP_USER,
)
if user_input.get("auth_method") == AUTH_METHOD_DEVICE_CODE:
self._session = async_get_clientsession(self.hass, VERIFY_SSL)
client = Client(self.hass, self._session, None, region=self._data[CONF_REGION])

device_code_request_result = await client.oauth.async_request_device_code()

qrcode_url = helper.Device_code_confirm_url(
region=user_input[CONF_REGION], device_code=device_code_request_result.get("user_code", "").encode()
)
LOGGER.debug("QR_Code URL: %s", qrcode_url)

self._data["device_code_request_result"] = device_code_request_result

data_schema = vol.Schema(
{
vol.Optional("qr_code"): QrCodeSelector(
config=QrCodeSelectorConfig(
data=qrcode_url,
scale=6,
error_correction_level=QrErrorCorrectionLevel.QUARTILE,
)
),
}
)

return self.async_show_form(
step_id="device_code",
data_schema=data_schema,
)

return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_REGION): vol.In(CONF_ALLOWED_REGIONS),
vol.Required("auth_method"): vol.In(CONF_ALLOWED_AUTH_METHODS),
}
),
)

async def async_step_userpin(self, user_input=None):
"""Handle the initial step."""
errors = {}

if user_input is not None:
new_config_entry: config_entries.ConfigEntry = await self.async_set_unique_id(
f"{user_input[CONF_USERNAME]}-{user_input[CONF_REGION]}"
f"{user_input[CONF_USERNAME]}-{self._data[CONF_REGION]}"
)

if not self._reauth_mode:
Expand All @@ -76,33 +128,21 @@ async def async_step_user(self, user_input=None):
nonce = str(uuid.uuid4())
user_input["nonce"] = nonce

client = Client(self.hass, session, new_config_entry, region=user_input[CONF_REGION])
client = Client(self.hass, session, new_config_entry, region=self._data[CONF_REGION])
try:
await client.oauth.request_pin(user_input[CONF_USERNAME], nonce)
except (MBAuthError, MbapiError):
errors = {"base": "unknown"}
return self.async_show_form(step_id="user", data_schema=SCHEMA_STEP_USER, errors=errors)

if not errors:
self._data = user_input
self._data.update(user_input)
return await self.async_step_pin()

LOGGER.error("Request PIN error: %s", errors)

# data_schema = SCHEMA_STEP_USER.extend(
# {
# vol.Optional("qr_code"): QrCodeSelector(
# config=QrCodeSelectorConfig(
# data="https://link.emea-prod.mobilesdk.mercedes-benz.com/device-login?userCode=OTdOTi1CTVhX&deviceType=watch",
# scale=6,
# error_correction_level=QrErrorCorrectionLevel.QUARTILE,
# )
# )
# }
# )

return self.async_show_form(
step_id="user",
step_id="userpin",
data_schema=SCHEMA_STEP_USER,
)

Expand All @@ -123,7 +163,7 @@ async def async_step_pin(self, user_input=None):
try:
result = await client.oauth.request_access_token(self._data[CONF_USERNAME], pin, nonce)
except MbapiError as error:
LOGGER.error("Request token error: %s", errors)
LOGGER.error("Request token error: %s", error)
errors = error

if not errors:
Expand All @@ -136,7 +176,7 @@ async def async_step_pin(self, user_input=None):
return self.async_abort(reason="reauth_successful")

return self.async_create_entry(
title=f"{self._data[CONF_USERNAME]} (Region: {self._data[CONF_REGION]})",
title=f"{LogHelper.Mask_email(self._data[CONF_USERNAME])} (Region: {self._data[CONF_REGION]})",
data=self._data,
)

Expand All @@ -149,7 +189,70 @@ async def async_step_reauth(self, user_input=None):

self._reauth_entry = self.hass.config_entries.async_get_entry(self.context["entry_id"])

return self.async_show_form(step_id="user", data_schema=SCHEMA_STEP_USER)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_REGION): vol.In(CONF_ALLOWED_REGIONS),
vol.Required("auth_method"): vol.In(CONF_ALLOWED_AUTH_METHODS),
}
),
)

async def async_step_reconfigure(self, args_input: dict[str, Any] | None = None):
"""Get new tokens for a config entry that can't authenticate."""

self._reauth_mode = True

self._reauth_entry = self.hass.config_entries.async_get_entry(self.context["entry_id"])

return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_REGION): vol.In(CONF_ALLOWED_REGIONS),
vol.Required("auth_method"): vol.In(CONF_ALLOWED_AUTH_METHODS),
}
),
)

async def async_step_device_code(self, user_input=None):
"""Handle the initial step."""
errors = {}

if user_input is not None:
client = Client(self.hass, self._session, None, self._data[CONF_REGION])
try:
device_code = self._data["device_code_request_result"].get("device_code")
result = await client.oauth.async_request_device_code_access_token(device_code)
webapi: WebApi = WebApi(self.hass, client.oauth, self._session, self._data[CONF_REGION])
user_info = await webapi.get_user()
user_email = user_info.get("email", "email-not-found")
masked_email = LogHelper.Mask_email(user_email)
self._data[CONF_USERNAME] = user_email

except MbapiError as error:
LOGGER.error("Request token error: %s", error)
errors = error

if not errors:
LOGGER.debug("Token received")
self._data["token"] = result

if self._reauth_mode:
self.hass.config_entries.async_update_entry(self._reauth_entry, data=self._data)
self.hass.async_create_task(self.hass.config_entries.async_reload(self._reauth_entry.entry_id))
return self.async_abort(reason="reauth_successful")

return self.async_create_entry(
title=f"D_{masked_email}_{self._data[CONF_REGION]}",
data=self._data,
)

return self.async_show_form(
step_id="step_user",
data_schema=None,
)

@staticmethod
@callback
Expand Down
33 changes: 30 additions & 3 deletions custom_components/mbapi2020/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,12 @@ def __init__(
) -> None:
"""Initialize the OAuth instance."""
self._session: ClientSession = session
self._region: str = region
self._region = "Europe" if region is None else region
self._hass = hass
self._config_entry = config_entry
self.token = None
self._sessionid = ""
self._get_token_lock = asyncio.Lock()

if isinstance(VERIFY_SSL, str):
self.ssl_context = ssl.create_default_context(cafile=VERIFY_SSL)

Expand All @@ -81,7 +80,8 @@ async def async_request_device_code(self):
await self._async_request("get", url, headers=headers)

url = f"{helper.Login_Base_Url(self._region)}/as/device_authz.oauth2"
data = f"client_id={helper.Login_App_Id(self._region)}&scope=openid email phone profile offline_access ciam-uid"
data = "client_id=62778dc4-1de3-44f4-af95-115f06a3a008&scope=openid email phone profile offline_access ciam-uid"
# data = f"client_id={helper.Login_App_Id(self._region)}&scope=openid email phone profile offline_access ciam-uid"
headers = self._get_header()
headers["Content-Type"] = "application/x-www-form-urlencoded"
headers["Stage"] = "prod"
Expand All @@ -96,6 +96,33 @@ async def async_request_device_code(self):
_LOGGER.debug(device_code_result)
return device_code_result

async def async_request_device_code_access_token(self, device_code: str):
"""Refresh the device code."""
_LOGGER.info("Start request_device_code")

_LOGGER.debug("Auth token refresh preflight request 1")
headers = self._get_header()
url = f"{helper.Rest_url(self._region)}/v1/config"
await self._async_request("get", url, headers=headers)

url = f"{helper.Login_Base_Url(self._region)}/as/token.oauth2"
data = f"client_id=62778dc4-1de3-44f4-af95-115f06a3a008&device_code={device_code}&grant_type=urn:ietf:params:oauth:grant-type:device_code"
# data = f"client_id={helper.Login_App_Id(self._region)}&scope=openid email phone profile offline_access ciam-uid"
headers = self._get_header()
headers["Content-Type"] = "application/x-www-form-urlencoded; charset=utf-8"
headers["Stage"] = "prod"
headers["X-Device-Id"] = str(uuid.uuid4())
headers["X-Request-Id"] = str(uuid.uuid4())
device_code_result = None
try:
device_code_result = await self._async_request(method="post", url=url, data=data, headers=headers)
device_code_result = self._add_custom_values_to_token_info(device_code_result)
self.token = device_code_result
except Exception as e:
_LOGGER.error(e)

return device_code_result

async def request_pin(self, email: str, nonce: str):
"""Initiate a PIN request."""
_LOGGER.info("Start request PIN %s", email)
Expand Down
11 changes: 11 additions & 0 deletions custom_components/mbapi2020/translations/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,21 @@
"user": {
"data": {
"region": "Region",
"auth_method": "Authentication method"
},
"description": "Select your region and the authentication method",
"title": "Set up the Mercedes ME 2020 connection"
},
"userpin": {
"data": {
"username": "MB username - email address (or mobile phone including country code ex. +44)"
},
"description": "Enter your Mercedes ME login account (email address)",
"title": "Set up the Mercedes ME 2020 connection"
},
"device_code": {
"description": "Scan the code with a mobile phone with the Mercedes app installed and logged in. Click Submit after you approved the 'Watch' in the app.",
"title": "QR-Code"
}
}
},
Expand Down

0 comments on commit 659ad2f

Please sign in to comment.