forked from thomaxxl/safrs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdemo_auth.py
executable file
·149 lines (122 loc) · 4.58 KB
/
demo_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python
#
# This is a demo application to demonstrate the functionality of the safrs_rest REST API with authentication
#
# you will have to install the requirements:
# pip3 install passlib flask_httpauth flask_login
#
# This script can be ran standalone like this:
# python3 demo_auth.py [Listener-IP]
# This will run the example on http://Listener-Ip:5000
#
# - A database is created and a item is added
# - User is created and the User endpoint is protected by user:admin & pass: password
# - swagger2 documentation is generated
#
import sys
import os
import logging
import builtins
from functools import wraps
from flask import Flask, redirect, jsonify, make_response
from flask import abort, request, g, url_for
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, Integer, String
from safrs import SAFRSBase, SAFRSAPI, jsonapi_rpc
from flask_swagger_ui import get_swaggerui_blueprint
from flask_sqlalchemy import SQLAlchemy
from flask_httpauth import HTTPBasicAuth
from passlib.apps import custom_app_context as pwd_context
from itsdangerous import (TimedJSONWebSignatureSerializer as Serializer, BadSignature, SignatureExpired)
from flask_login import LoginManager, UserMixin, \
login_required, login_user, logout_user
db = SQLAlchemy()
auth = HTTPBasicAuth()
# Example sqla database object
class Item(SAFRSBase, db.Model):
'''
description: Item description
'''
__tablename__ = 'items'
id = Column(String, primary_key=True)
name = Column(String, default = '')
class User(SAFRSBase, db.Model):
'''
description: User description
'''
__tablename__ = 'users'
username = db.Column(db.String(32), primary_key=True)
password_hash = db.Column(db.String(64))
custom_decorators = [auth.login_required]
@jsonapi_rpc(http_methods=['POST'])
def hash_password(self, password):
self.password_hash = pwd_context.encrypt(password)
@jsonapi_rpc(http_methods=['POST'])
def verify_password(self, password):
return pwd_context.verify(password, self.password_hash)
@jsonapi_rpc(http_methods=['POST'])
def generate_auth_token(self, expiration=600):
s = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
return s.dumps({'username': self.username})
@staticmethod
@jsonapi_rpc(http_methods=['POST'])
def verify_auth_token(token):
s = Serializer(app.config['SECRET_KEY'])
try:
data = s.loads(token)
except SignatureExpired:
return None # valid token, but expired
except BadSignature:
return None # invalid token
user = User.query.get(data['username'])
return user
def start_app(app):
OAS_PREFIX = '/api' # swagger location
api = SAFRSAPI(app, host='{}:{}'.format(HOST,PORT), schemes=["http"], prefix=OAS_PREFIX, api_spec_url=OAS_PREFIX+'/swagger' )
api.expose_object(Item)
api.expose_object(User)
item = Item(name='test',email='em@il')
#user = User(username='admin')
#user.hash_password('password')
print('Starting API: http://{}:{}/api'.format(HOST,PORT))
app.run(host=HOST, port = PORT)
#
# APP Initialization
#
app = Flask('demo_app')
app.config.update( SQLALCHEMY_DATABASE_URI = 'sqlite:////tmp/demo.sqlite',
SQLALCHEMY_TRACK_MODIFICATIONS = False,
SECRET_KEY = b'sdqfjqsdfqizroqnxwc',
DEBUG = True)
HOST = sys.argv[1] if len(sys.argv) > 1 else '0.0.0.0'
PORT = 5000
db.init_app(app)
#
# Authentication and custom routes
#
@auth.verify_password
def verify_password(username_or_token, password):
if username_or_token == 'user' and password == 'password':
return True
return False
user = User.verify_auth_token(username_or_token)
print(user, username_or_token, password)
if not user:
# try to authenticate with username/password
user = User.query.filter_by(username=username_or_token).first()
print(user)
if not user or not user.verify_password(password):
return False
print('Authentication Successful for "{}"'.format(user.username))
return True
@app.route('/')
def goto_api():
return redirect('/api')
@app.teardown_appcontext
def shutdown_session(exception=None):
'''cfr. http://flask.pocoo.org/docs/0.12/patterns/sqlalchemy/'''
db.session.remove()
# Start the application
with app.app_context():
db.create_all()
start_app(app)