This repository has been archived by the owner on Jan 20, 2024. It is now read-only.
/
auth.py
141 lines (129 loc) · 4.41 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from base64 import b64encode
from hashlib import sha256
from secrets import token_hex

from bcrypt import hashpw, gensalt, checkpw
from flask import request

from project_amber.const import MSG_USER_NOT_FOUND, MSG_USER_EXISTS
from project_amber.db import db
from project_amber.errors import Unauthorized, NotFound, Conflict
from project_amber.helpers import time, LoginUser
from project_amber.logging import log
from project_amber.models.auth import User, Session
def prehash(password: str) -> bytes:
    """
    Condenses a password into the base64 encoding of its SHA-256 digest.

    The result is always 44 bytes long regardless of the input length,
    so bcrypt can be fed passwords longer than 72 chars.
    """
    digest = sha256(password.encode()).digest()
    return b64encode(digest)
def addUser(name: str, password: str) -> int:
    """
    Creates a new user. Returns their ID on success.

    The password is passed through `prehash()` and then hashed with
    bcrypt using a fresh salt; the hash is stored as a string.

    Raises `Conflict` if a user with the same name already exists.
    """
    # Refuse to create a second user with the same name.
    existing = db.session.query(User).filter_by(name=name).one_or_none()
    if existing is not None:
        raise Conflict(MSG_USER_EXISTS)
    hashed_pw = hashpw(prehash(password), gensalt()).decode()
    user = User(name=name, password=hashed_pw)
    log("Adding user %s..." % name)
    db.session.add(user)
    db.session.commit()
    return user.id
def updateUser(**kwargs) -> int:
    """
    Updates the record of the user attached to the current request.
    Returns their ID on success.

    Only the `password` keyword is acted upon: when present, it is
    prehashed, hashed with bcrypt under a fresh salt and stored. Any other
    keyword is ignored.
    """
    current: LoginUser = request.user
    record = db.session.query(User).filter_by(id=current.id).one()
    if "password" in kwargs:
        new_hash = hashpw(prehash(kwargs["password"]), gensalt())
        record.password = new_hash.decode()
    db.session.commit()
    return current.id
def removeUser(uid: int) -> int:
    """
    Deletes the user with the given ID. Returns that ID on success.

    Raises `NotFound` if no user has this ID.
    """
    target = db.session.query(User).filter_by(id=uid).one_or_none()
    if target is None:
        raise NotFound(MSG_USER_NOT_FOUND)
    log("Removing user %s..." % target.name)
    db.session.delete(target)
    db.session.commit()
    return uid
def verifyPassword(uid: int, password: str) -> bool:
    """
    Checks a candidate password against the stored bcrypt hash of the user
    with the given ID. Returns `True` if they match, and `False` otherwise.

    The user is fetched with `.one()`, so a missing user is not handled
    here.
    """
    record = db.session.query(User).filter_by(id=uid).one()
    stored_hash = record.password
    # checkpw() is given bytes, so a hash stored as str is encoded first.
    if isinstance(stored_hash, str):
        stored_hash = stored_hash.encode()
    return checkpw(prehash(password), stored_hash)
def createSession(name: str, password: str) -> str:
    """
    Creates a new user session. Returns an auth token.

    The token is a random 64-character hexadecimal string. The session
    row records the user ID, the login time and the remote address of the
    current request.

    Raises `Unauthorized` if the user does not exist or the password does
    not match.
    """
    user = db.session.query(User).filter_by(name=name).one_or_none()
    # An unknown user and a wrong password raise the same error, so the
    # app doesn't reveal the presence or absence of a user in the system.
    # NOTE(review): the unknown-user path skips the bcrypt check, so the
    # two cases may still be told apart by response time.
    if user is None or not verifyPassword(user.id, password):
        raise Unauthorized
    # 32 random bytes from the `secrets` module, hex-encoded: same length
    # and alphabet as the SHA-256 hex digest used for tokens before.
    token = token_hex(32)
    session = Session(
        token=token,
        user=user.id,
        login_time=time(),
        address=request.remote_addr,
    )
    log("User {0} logged in from {1}".format(
        user.name,
        request.remote_addr
    ))
    db.session.add(session)
    db.session.commit()
    return token
def removeSession(token: str) -> str:
    """
    Deletes the session with the given token, provided it belongs to the
    user of the current request. Returns the token on success.

    Raises `NotFound` if there is no such session.
    """
    lookup = db.session.query(Session).filter_by(
        token=token, user=request.user.id
    )
    found = lookup.one_or_none()
    if found is None:
        raise NotFound
    db.session.delete(found)
    db.session.commit()
    return token
def removeSessionById(session_id: int) -> int:
    """
    Deletes the session with the given ID, provided it belongs to the user
    of the current request. Returns the session ID on success.

    Raises `NotFound` if there is no such session.
    """
    lookup = db.session.query(Session).filter_by(
        id=session_id, user=request.user.id
    )
    found = lookup.one_or_none()
    if found is None:
        raise NotFound
    db.session.delete(found)
    db.session.commit()
    return session_id
def getSessions() -> list:
    """
    Returns every `Session` that belongs to the user of the current
    request, as a list.
    """
    owner_id = request.user.id
    return db.session.query(Session).filter_by(user=owner_id).all()
def getSession(session_id: int) -> Session:
    """
    Looks up one `Session` by its ID among the sessions of the user of the
    current request and returns it.

    Raises `NotFound` if there is no such session.
    """
    lookup = db.session.query(Session).filter_by(
        id=session_id, user=request.user.id
    )
    found = lookup.one_or_none()
    if found is None:
        raise NotFound
    return found