models.Session

src/idserver/models/Session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98

from .IdentityProviderToken import IdentityProviderToken
from typing import Dict, Any, List, Mapping
import uuid
import os, hashlib
import base64
import time

class Session:
    """State of one authenticated session backed by an identity provider.

    All fields are plain public attributes:

    * ``session_id`` -- opaque identifier; generated when not supplied.
    * ``provider_token`` -- tokens issued by the identity provider.
    * ``provider_claims`` -- claims dict supplied by the caller.
    * ``provider_name`` -- name of the identity provider.
    * ``last_authenticated`` / ``last_session_refresh`` / ``created_at`` --
      integer Unix timestamps (seconds), or ``None`` when unset.
    * ``metadata`` -- free-form dict supplied by the caller.
    """

    def __init__(self, session_id: str = None,
                 provider_token: "IdentityProviderToken" = None,
                 provider_claims: Dict[str, Any] = None,
                 provider_name: str = None,
                 last_authenticated: int = None,
                 last_session_refresh: int = None,
                 created_at: int = None,
                 metadata: Dict[str, Any] = None) -> None:
        """Create a session, generating a fresh id when none is given.

        Args:
            session_id: Existing session id; when ``None`` a new random id
                is generated and ``created_at`` is set to the current time.
            provider_token: Tokens obtained from the identity provider.
            provider_claims: Claims dict; defaults to a new empty dict.
            provider_name: Name of the identity provider.
            last_authenticated: Unix time of the last authentication.
            last_session_refresh: Unix time of the last session refresh.
            created_at: Unix time of creation (overwritten when a new
                session id is generated).
            metadata: Arbitrary extra data; defaults to a new empty dict.
        """
        self.session_id = session_id
        self.provider_token = provider_token
        # Build a fresh dict per instance: a ``{}`` default in the signature
        # would be shared by every Session created without the argument.
        self.provider_claims = {} if provider_claims is None else provider_claims
        self.provider_name = provider_name
        self.last_authenticated = last_authenticated
        self.last_session_refresh = last_session_refresh
        self.created_at = created_at
        self.metadata = {} if metadata is None else metadata

        if self.session_id is None:
            self.__generate_session_id()
            self.created_at = int(time.time())

    def __generate_session_id(self) -> None:
        """Set ``session_id`` to the base64 SHA-256 digest of 4096 random bytes."""
        random_bytes = os.urandom(4096)
        digest = hashlib.sha256(random_bytes).digest()
        self.session_id = base64.b64encode(digest).decode('utf-8')

    def update_provider_token(self, access_token: str,
                              id_token: str,
                              refresh_token: str) -> None:
        """Replace the provider token and stamp both timestamps with now.

        Args:
            access_token: Access token issued by the provider.
            id_token: ID token issued by the provider.
            refresh_token: Refresh token issued by the provider.
        """
        self.provider_token = IdentityProviderToken(access_token=access_token,
                                                    id_token=id_token,
                                                    refresh_token=refresh_token)

        now = int(time.time())

        self.last_authenticated = now
        self.last_session_refresh = now

    def is_authenticated(self) -> bool:
        """Return True when both ``last_authenticated`` and ``session_id`` are set."""
        return self.last_authenticated is not None and \
            self.session_id is not None

    def should_refresh(self, refresh_interval_seconds=None) -> bool:
        """Return True when the refresh interval has elapsed.

        Args:
            refresh_interval_seconds: Seconds allowed between refreshes.
                ``None`` disables refreshing, so the result is ``False``.

        Returns:
            ``False`` when no interval is given or the session has never
            been refreshed; otherwise whether
            ``last_session_refresh + interval`` is earlier than now.
        """
        if refresh_interval_seconds is None:
            return False
        if self.last_session_refresh is None:
            return False
        return self._refresh_time(refresh_interval_seconds) < time.time()

    def _refresh_time(self, refresh_interval_seconds):
        """Return the Unix time at which the next refresh becomes due."""
        # Read the instance attribute; a bare ``last_session_refresh`` is an
        # undefined name and raises NameError.
        last = self.last_session_refresh or 0
        return last + refresh_interval_seconds

    def update(self, access_token: str = None,
               id_token: Mapping[str, Any] = None,
               id_token_jwt: str = None,
               userinfo: Mapping[str, str] = None,
               refresh_token: str = None) -> None:
        """Not implemented; always raises.

        Args:
            access_token (str)
            id_token (Mapping[str, Any])
            id_token_jwt (str)
            userinfo (Mapping[str, str])
            refresh_token (str)

        Raises:
            NotImplementedError: Always.
        """
        # TODO: implement; use update_provider_token() to store new tokens
        # in the meantime.
        raise NotImplementedError()

    def clear(self):
        """Not implemented; always raises.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()