This repository has been archived by the owner on Jan 20, 2024. It is now read-only.
/
auth.py
148 lines (128 loc) · 4.87 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from base64 import b64encode
from hashlib import sha256
from secrets import token_hex
from typing import List

from bcrypt import hashpw, gensalt, checkpw

from project_amber.const import MSG_USER_EXISTS
from project_amber.db import db
from project_amber.helpers import time
from project_amber.handlers import LoginUser
from project_amber.handlers.const import API_PASSWORD
from project_amber.errors import Unauthorized, NotFound, Conflict
from project_amber.logging import error
from project_amber.models.auth import User, Session
def prehash(password: str) -> bytes:
    """
    Condenses a password of any length into a fixed-size byte string that
    is safe to hand to bcrypt, including for passwords over 72 chars.

    The password is encoded to bytes, hashed with SHA-256, and the raw
    32-byte digest is base64-encoded, so the result is always 44 bytes.
    """
    digest = sha256(password.encode()).digest()
    return b64encode(digest)
def gen_hashed_pw(password: str) -> str:
    """
    Returns a bcrypt password hash with random salt.

    The password is first passed through prehash(), then hashed by
    bcrypt's hashpw() with a fresh gensalt() salt. The resulting bytes
    are decoded, so the caller receives the hash as a `str`.
    """
    hashed = hashpw(prehash(password), gensalt())
    # hashpw() yields bytes; decode so the value returned is text.
    return hashed.decode()
def gen_token() -> str:
    """
    Returns a new freshly generated auth token.

    The token is 64 lowercase hexadecimal characters (32 random bytes)
    taken from the standard library's `secrets` module, which is intended
    for security-sensitive values such as session tokens. The length and
    alphabet match a SHA-256 hex digest.
    """
    return token_hex(32)
class UserController:
    """
    Account and session operations backed by the project database.

    Methods that act on "the current user" use the `LoginUser` given at
    construction time (its `id` and `token` attributes are read).
    """
    user: LoginUser

    def __init__(self, user: LoginUser):
        self.user = user

    def add_user(self, name: str, password: str) -> int:
        """
        Creates a new user. Returns their ID on success.

        Raises `Conflict` if a user with this name already exists.
        """
        # does a user with this name already exist?
        existing = db.session.query(User).filter_by(name=name).one_or_none()
        if existing is not None:
            raise Conflict(MSG_USER_EXISTS)
        user = User(name=name, password=gen_hashed_pw(password))
        db.session.add(user)
        db.session.commit()
        return user.id

    def update_user(self, **kwargs) -> int:
        """
        Updates user data in the database. Returns their ID on success.

        Only the `API_PASSWORD` keyword is acted upon: when present, the
        stored password hash is replaced. Other keywords are ignored.
        """
        user_record = db.session.query(User).filter_by(id=self.user.id).one()
        if API_PASSWORD in kwargs:
            user_record.password = gen_hashed_pw(kwargs[API_PASSWORD])
        db.session.commit()
        return self.user.id

    def remove_user(self) -> int:
        """
        Removes a user from the database. Returns their ID.

        Raises `NotFound` if the current user has no database record.
        A failure while deleting is logged and the transaction is rolled
        back; the ID is still returned in that case.
        """
        user = db.session.query(User).filter_by(id=self.user.id).one_or_none()
        if user is None:
            raise NotFound
        # Read the name up front so that logging a failure does not need
        # to touch the record again after the rollback.
        user_name = user.name
        try:
            db.session.delete(user)
            db.session.commit()
        except Exception:  # best-effort removal: log and carry on
            db.session.rollback()
            error("Failed to remove user %s!" % user_name)
        return self.user.id

    def verify_pw(self, uid: int, password: str) -> bool:
        """
        Verifies user's password with bcrypt's checkpw(). Returns `True`, if
        the passwords match, and False otherwise.
        """
        user = db.session.query(User).filter_by(id=uid).one()
        stored_hash = user.password
        # checkpw() is given bytes, so encode a hash that is held as text.
        if isinstance(stored_hash, str):
            stored_hash = stored_hash.encode()
        return checkpw(prehash(password), stored_hash)

    def create_session(self, name: str, password: str, ip_addr: str) -> str:
        """
        Creates a new user session. Returns an auth token.

        Raises `Unauthorized` if the user name is unknown or the password
        does not match.
        """
        user = db.session.query(User).filter_by(name=name).one_or_none()
        if user is None or not self.verify_pw(user.id, password):
            raise Unauthorized
        token = gen_token()
        session = Session(token=token, user=user.id, login_time=time(), address=ip_addr)
        db.session.add(session)
        db.session.commit()
        return token

    def remove_session(self) -> str:
        """
        Logs the user out by removing their token from the database. Returns
        the token on success.

        Raises `NotFound` if no session matches the current user's token.
        """
        session = db.session.query(Session).filter_by(
            token=self.user.token, user=self.user.id
        ).one_or_none()
        if session is None:
            raise NotFound
        db.session.delete(session)
        db.session.commit()
        return self.user.token

    def remove_session_by_id(self, sid: int) -> int:
        """
        Removes a user session by session ID. Returns the session ID on success.

        Raises `NotFound` if the current user has no session with this ID.
        """
        session = self.get_session(sid)
        db.session.delete(session)
        db.session.commit()
        return sid

    def get_sessions(self) -> List[Session]:
        """
        Returns a list of sessions of a user (class `Session`).
        """
        return db.session.query(Session).filter_by(user=self.user.id).all()

    def get_session(self, sid: int) -> Session:
        """
        Returns a single `Session` by its ID.

        Raises `NotFound` if the current user has no session with this ID.
        """
        session = db.session.query(Session).filter_by(id=sid, user=self.user.id).one_or_none()
        if session is None:
            raise NotFound
        return session