1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
from .IdentityProviderToken import IdentityProviderToken
from typing import Dict, Any, List, Mapping
import uuid
import os, hashlib
import base64
import time
class Session:
def __init__(self, session_id: str=None,
provider_token: IdentityProviderToken=None,
provider_claims: Dict[str, Any]={},
provider_name: str=None,
last_authenticated: int=None,
last_session_refresh: int=None,
created_at: int=None,
metadata: Dict[str, Any]={}) -> None:
self.session_id = session_id
self.provider_token = provider_token
self.provider_claims = provider_claims
self.provider_name = provider_name
self.last_authenticated = last_authenticated
self.last_session_refresh = last_session_refresh
self.created_at = created_at
self.metadata = metadata
if self.session_id is None:
self.__generate_session_id()
self.created_at = int(time.time())
def __generate_session_id(self) -> None:
self.session_id = base64.b64encode(hashlib.sha256(os.urandom(4096)).digest()).decode('utf-8')
def update_provider_token(self, access_token: str,
id_token: str,
refresh_token: str) -> None:
self.provider_token = IdentityProviderToken(access_token=access_token,
id_token=id_token,
refresh_token=refresh_token)
now = int(time.time())
self.last_authenticated = now
self.last_session_refresh = now
def is_authenticated(self):
"""
flask_session is empty when the session hasn't been initialised or has expired.
Thus checking for existence of any item is enough to determine if we're authenticated.
"""
return self.last_authenticated is not None and \
self.session_id is not None
def should_refresh(self, refresh_interval_seconds=None):
return refresh_interval_seconds is not None and \
self.last_session_refresh is not None and \
self._refresh_time(refresh_interval_seconds) < time.time()
def _refresh_time(self, refresh_interval_seconds):
last = last_session_refresh or 0
return last + refresh_interval_seconds
def update(self, access_token: str=None,
id_token: Mapping[str, Any]=None,
id_token_jwt: str=None,
userinfo: Mapping[str, str]=None,
refresh_token: str=None) -> None:
"""
Args:
access_token (str)
id_token (Mapping[str, str])
id_token_jwt (str)
userinfo (Mapping[str, str])
refresh_token (str)
"""
"""def set_if_defined(session_key, value):
if value:
self._session_storage[session_key] = value
now = int(time.time())
auth_time = now
if id_token:
auth_time = id_token.get('auth_time', auth_time)
self._session_storage['last_authenticated'] = auth_time
self._session_storage['last_session_refresh'] = now
set_if_defined('access_token', access_token)
set_if_defined('id_token', id_token)
set_if_defined('id_token_jwt', id_token_jwt)
set_if_defined('userinfo', userinfo)
set_if_defined('refresh_token', refresh_token)"""
raise NotImplementedError()
def clear(self):
"""for key in self.KEYS:
self._session_storage.pop(key, None)"""
raise NotImplementedError()
|