-
Notifications
You must be signed in to change notification settings - Fork 0
/
sqldb.py
109 lines (87 loc) · 3.26 KB
/
sqldb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm import relationship
from datetime import datetime, timedelta
from flask_security import Security, SQLAlchemyUserDatastore, \
UserMixin, RoleMixin, login_required
db = SQLAlchemy()
# Define models
roles_users = db.Table('roles_users',
db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
db.Column('role_id', db.Integer(), db.ForeignKey('role.id')))
class Role(db.Model, RoleMixin):
id = db.Column(db.Integer(), primary_key=True)
name = db.Column(db.String(80), unique=True)
description = db.Column(db.String(255))
class User(db.Model, RoleMixin):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(40), unique=True)
email = db.Column(db.String(190), unique=True)
password = db.Column(db.String(190))
active = db.Column(db.Boolean())
confirmed_at = db.Column(db.DateTime())
roles = db.relationship('Role', secondary=roles_users,
backref=db.backref('users', lazy='dynamic'))
def check_password(self, password):
return self.password == password
class Client(db.Model):
# id = db.Column(db.Integer, primary_key=True)
# human readable name
name = db.Column(db.String(40))
client_id = db.Column(db.String(40), primary_key=True)
client_secret = db.Column(db.String(55), unique=True, index=True,
nullable=False)
client_type = db.Column(db.String(20), default='public')
_redirect_uris = db.Column(db.Text)
default_scope = db.Column(db.Text, default='email address')
@property
def user(self):
return User.query.get(1)
@property
def redirect_uris(self):
if self._redirect_uris:
return self._redirect_uris.split()
return []
@property
def default_redirect_uri(self):
return self.redirect_uris[0]
@property
def default_scopes(self):
if self.default_scope:
return self.default_scope.split()
return []
@property
def allowed_grant_types(self):
return ['authorization_code', 'password', 'client_credentials',
'refresh_token']
class Token(db.Model):
id = db.Column(db.Integer, primary_key=True)
client_id = db.Column(
db.String(40), db.ForeignKey('client.client_id', ondelete='CASCADE'),
nullable=False,
)
user_id = db.Column(
db.Integer, db.ForeignKey('user.id', ondelete='CASCADE')
)
user = relationship('User')
client = relationship('Client')
token_type = db.Column(db.String(40))
access_token = db.Column(db.String(255))
refresh_token = db.Column(db.String(255))
expires = db.Column(db.DateTime)
scope = db.Column(db.Text)
def __init__(self, **kwargs):
expires_in = kwargs.pop('expires_in')
self.expires = datetime.utcnow() + timedelta(seconds=expires_in)
for k, v in kwargs.items():
setattr(self, k, v)
@property
def scopes(self):
if self.scope:
return self.scope.split()
return []
def delete(self):
db.session.delete(self)
db.session.commit()
return self
# Setup Flask-Security
user_datastore = SQLAlchemyUserDatastore(db, User, Role)