-
Notifications
You must be signed in to change notification settings - Fork 5
/
imap_auth_provider.py
147 lines (117 loc) · 5.31 KB
/
imap_auth_provider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import imaplib
from collections import namedtuple
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class IMAPAuthProvider:
    """Authenticate Matrix users by logging in to an IMAP server over SSL.

    A login is accepted when the IMAP server accepts the supplied
    credentials. The matching Matrix user ID is then looked up through the
    account handler and, when ``create_users`` is enabled, registered if it
    does not exist yet.
    """

    def __init__(self, config, account_handler):
        """Store the account handler and copy the settings from ``config``.

        Args:
            config: Object exposing the ``create_users``, ``server``,
                ``port``, ``plain_userid`` and ``append_domain`` attributes,
                such as the value returned by :meth:`parse_config`.
            account_handler: Object providing the awaitable methods
                ``check_user_exists(user_id)`` and
                ``register(localpart=..., emails=...)``.
        """
        self.account_handler = account_handler
        self.create_users = config.create_users
        self.server = config.server
        self.port = config.port
        self.plain_uid = config.plain_userid
        self.append_domain = config.append_domain

    async def check_password(self, user_id, password):
        """Attempt to authenticate a user against IMAP and register an
        account if none exists.

        Args:
            user_id (str): Matrix user ID of the form ``@foo:bar.com``.
            password (str): The provided password of the user.

        Returns:
            str|None: The value returned by :meth:`check_3pid_auth`, i.e.
            the user ID if authentication against IMAP was successful and
            None otherwise. None is also returned when ``user_id`` or
            ``password`` is empty.
        """
        if not user_id or not password:
            return None

        # user_id is of the form @foo:bar.com
        localpart = user_id.split(":", 1)[0][1:]
        if self.plain_uid:
            # The IMAP login name is the bare localpart.
            address = localpart
        else:
            # The IMAP login name is the user ID rewritten as an email
            # address (@foo:bar.com -> foo@bar.com).
            # NOTE(review): every ':' becomes '@', so a server name that
            # carries an explicit port yields two '@' -- confirm whether
            # such IDs need to be supported.
            address = "@".join(user_id[1:].split(":"))

        return await self.check_3pid_auth(
            "email", address, password, user_id=user_id, localpart=localpart
        )

    async def check_3pid_auth(self, medium, address, password, user_id=None, localpart=None):
        """Handle authentication against third-party login types, such as email.

        Args:
            medium (str): Medium of the 3PID (e.g email, msisdn). Only
                "email" is supported.
            address (str): Address of the 3PID, used as the IMAP login name.
            password (str): The provided password of the user.
            user_id (str|None): Matrix user ID, if already known. When
                omitted it is built from ``localpart`` and the domain of
                ``address`` (or ``append_domain`` if ``address`` has none).
            localpart (str|None): Localpart of the user ID, if already
                known. When omitted it is the part of ``address`` before
                '@', with anything from the first '+' removed.

        Returns:
            user_id (str|None): ID of the user if authentication
                successful. None otherwise.
        """
        if not address or not password:
            return None

        # We support only email
        if medium != "email":
            return None

        logger.debug(
            "Trying to login as %s on %s:%s via IMAP", address, self.server, self.port
        )
        if not self._imap_login(address, password):
            return None

        # Guessing localpart
        if not localpart:
            # Trimming domain and what's after '+'
            localpart = address.split("@")[0].split("+")[0]
            logger.debug("Guessed localpart for %s: %s", address, localpart)

        # Need to find user_id
        if not user_id:
            # Guessing domain given email domain
            if "@" in address:
                domain = address.split("@")[-1]
            elif self.append_domain:
                domain = self.append_domain
            else:
                logger.debug("No domain can be guessed for %s", address)
                return None
            logger.debug("Guessed domain for %s: %s", address, domain)
            user_id = "@{}:{}".format(localpart, domain)

        logger.debug(
            "IMAP login successful, checking whether user_id %s exists in Matrix",
            user_id,
        )
        if await self.account_handler.check_user_exists(user_id):
            logger.debug(
                "%s exists in our database; login successful for %s!", user_id, address
            )
            return user_id
        logger.debug("%s was not found in Matrix", user_id)

        # Otherwise, attempt to create the user in Matrix if required
        if not self.create_users:
            return None

        # We need to have a valid email address for registration
        if "@" in address:
            email = address
        elif self.append_domain:
            email = address + "@" + self.append_domain
        else:
            # Building the email address if `address` was not a valid email
            email = "@".join(user_id[1:].split(":"))

        logger.debug("Attempting to create user %s with email %s", user_id, email)

        # Create the user in Matrix; the access token that comes back with
        # the user ID is not needed here.
        user_id, _access_token = await self.account_handler.register(
            localpart=localpart, emails=[email]
        )
        logger.info("User %s successfully created!", user_id)
        return user_id

    def _imap_login(self, address, password):
        """Return True if the IMAP server accepts ``address``/``password``.

        Protocol errors (including rejected credentials) and connection
        errors are logged and reported as a failed login.
        """
        # NOTE(review): imaplib is synchronous, so this call blocks the
        # caller until the server answers -- consider moving it to a thread.
        try:
            # The context manager logs out on every exit path, so the
            # connection is released even when login() raises.
            with imaplib.IMAP4_SSL(self.server, self.port) as connection:
                status, _data = connection.login(address, password)
        except imaplib.IMAP4.error as e:
            logger.debug("IMAP exception occurred: %s", e)
            return False
        except OSError as e:
            # Connection refused, DNS failure, TLS error, ...
            logger.warning(
                "Could not talk to IMAP server %s:%s: %s", self.server, self.port, e
            )
            return False

        if status != "OK":
            logger.debug("IMAP login failed for %s", address)
            return False
        return True

    @staticmethod
    def parse_config(config):
        """Build the provider settings from the raw configuration mapping.

        Args:
            config (dict|None): Raw settings. Missing keys fall back to
                their defaults; None is treated as an empty mapping.

        Returns:
            A named tuple with the fields ``enabled`` (default False),
            ``create_users`` (default True), ``server`` (default ''),
            ``port`` (default ``imaplib.IMAP4_SSL_PORT``), ``plain_userid``
            (default False) and ``append_domain`` (default '').
        """
        config = config or {}
        imap_config = namedtuple(
            "_Config",
            [
                "enabled",
                "create_users",
                "server",
                "port",
                "plain_userid",
                "append_domain",
            ],
        )
        return imap_config(
            enabled=config.get("enabled", False),
            create_users=config.get("create_users", True),
            server=config.get("server", ""),
            port=config.get("port", imaplib.IMAP4_SSL_PORT),
            plain_userid=config.get("plain_userid", False),
            append_domain=config.get("append_domain", ""),
        )