1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
import datetime
import logging
import os
from flask import Flask, jsonify, render_template
from models import IdServerConfiguration, IdServerConfigurationHelper
from stores import DbEngine, SqlAlchemyIdentityProviderStore, IdentityProviderStore, SessionStore, InMemorySessionStore, UserStore, SqlAlchemyUserStore
from services import IdentityService, OpenIdService
from typing import Optional
import base64
import json
settings_file_path = "settings.json"
id_server_settings = IdServerConfiguration()
identity_provider_store = IdentityProviderStore()
session_store: SessionStore = SessionStore()
user_store = UserStore()
if os.path.exists(settings_file_path):
id_server_settings = IdServerConfigurationHelper.load_from_file(settings_file_path)
if id_server_settings.database.driver == 'sqlalchemy':
db_engine = DbEngine(id_server_settings.database.connection_str)
identity_provider_store = SqlAlchemyIdentityProviderStore(db_engine)
user_store = SqlAlchemyUserStore(db_engine)
db_engine.initialize()
if id_server_settings.session.driver == 'in_memory': # todo redis session store, maybe sql alchemy, will see
session_store = InMemorySessionStore()
# Initialize Keys
openid_service = OpenIdService(id_server_settings)
openid_service.init_keys()
app = Flask(__name__, static_folder='static/')
server_name = id_server_settings.address
server_name = server_name.lstrip('http').lstrip('https').lstrip("://").rstrip('/')
app.config.update({'SERVER_NAME': server_name,
'SECRET_KEY': 'dev_key', # make sure to change this!!
'PERMANENT_SESSION_LIFETIME': datetime.timedelta(days=7).total_seconds(),
'PREFERRED_URL_SCHEME': 'http',
'DEBUG': True})
idservice = IdentityService(id_server_settings, openid_service, identity_provider_store, session_store, user_store)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
idservice.init_app(app)
openid_service.init_app(app)
def fix_base64padding(base64input: str):
mod = len(base64input) % 4
for i in range(mod):
base64input += '='
return base64input
@app.route('/')
@app.route('/index.html')
def index_view():
return render_template('index.html')
@app.route('/user/login')
@idservice.oidc_auth
def user_login():
user_session = idservice.get_session()
return jsonify(provider_access_token=user_session.provider_token.access_token,
provider_id_token=user_session.provider_token.id_token,
provider_refresh_token=user_session.provider_token.refresh_token,
provider_id_token_json=json.loads(base64.urlsafe_b64decode(fix_base64padding(user_session.provider_token.id_token.split('.')[1]))),
session_metadata=user_session.metadata)
@app.route('/user/logout')
@idservice.oidc_logout
def user_logout():
return "You've been successfully logged out!"
@idservice.error_view
def error(error: str=None, error_description: str=None):
return jsonify({'error': error, 'message': error_description})
if __name__ == '__main__':
app.run()
|