-
-
Notifications
You must be signed in to change notification settings - Fork 140
Expand file tree
/
Copy pathauthentication.py
More file actions
213 lines (164 loc) · 6.99 KB
/
authentication.py
File metadata and controls
213 lines (164 loc) · 6.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
"""Custom authentication for the forms backend."""
import functools
import typing
from http import HTTPStatus
import jwt
from django.conf import settings
from django.http import HttpRequest
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.response import Response
from rest_framework.views import APIView
from . import discord
from . import models
def encode_jwt(info: dict, *, signing_secret_key: str = settings.SECRET_KEY) -> str:
    """
    Serialise `info` into a signed JWT.

    The token is signed with the HS256 algorithm. `signing_secret_key` is keyword-only;
    when omitted, the `settings.SECRET_KEY` value captured when this function was
    defined is used.
    """
    encoded_token = jwt.encode(info, signing_secret_key, algorithm="HS256")
    return encoded_token
class FormsUser:
    """
    Stores authentication information for a forms user.

    Instance attributes:
        token: The encoded JWT this user authenticated with.
        payload: The user details dictionary; the `id`, `username` and
            `discriminator` keys are read by this class.
        admin: Result of the most recent `is_admin()` call, `False` until it is called.
        member: The user's Discord member object, or `None` when there is none.
    """

    # This allows us to safely use the same checks that we could use on a Django user.
    is_authenticated: bool = True

    def __init__(
        self,
        token: str,
        payload: dict[str, typing.Any],
        member: models.DiscordMember | None,
    ) -> None:
        """Set up a forms user."""
        self.token = token
        self.payload = payload
        self.admin = False
        self.member = member

    @property
    def display_name(self) -> str:
        """Return username and discriminator as display name."""
        return f"{self.payload['username']}#{self.payload['discriminator']}"

    @property
    def discord_mention(self) -> str:
        """Return a mention for this user on Discord."""
        return f"<@{self.payload['id']}>"

    @property
    def user_id(self) -> str:
        """Return this user's ID as a string."""
        return str(self.payload["id"])

    @property
    def decoded_token(self) -> dict[str, typing.Any]:
        """
        Decode the information stored in this user's JWT token.

        The token is decoded again on every access; nothing is cached.
        """
        # NOTE(review): only `settings.SECRET_KEY` is tried here, whereas
        # `JWTAuthentication.authenticate` also accepts tokens signed with
        # `settings.FORMS_SECRET_KEY` - confirm whether such tokens can reach this property.
        return jwt.decode(self.token, settings.SECRET_KEY, algorithms=["HS256"])

    def get_roles(self) -> tuple[str, ...]:
        """
        Get a tuple of the user's discord roles by name.

        Returns an empty tuple when the user has no Discord member object. Any role
        named "admin" is reported as "discord admin" instead.
        """
        if not self.member:
            # Return a tuple (not a list) so the result always matches the annotation.
            return ()

        role_names = [
            role.name
            for role in discord.get_roles()
            if role.id in self.member.roles
        ]

        if "admin" in role_names:
            # Protect against collision with the forms admin role. Every occurrence is
            # dropped, so that several Discord roles sharing the name "admin" cannot
            # leave one behind.
            role_names = [name for name in role_names if name != "admin"]
            role_names.append("discord admin")

        return tuple(role_names)

    def is_admin(self) -> bool:
        """
        Return whether this user is an administrator.

        The result of the `models.Admin` lookup is also stored on `self.admin`.
        """
        self.admin = models.Admin.objects.filter(id=self.payload["id"]).exists()
        return self.admin

    def refresh_data(self) -> None:
        """
        Fetch user data from discord, and update the instance.

        `member` is looked up again by the current payload ID. When a member is found,
        its user data becomes the new payload; otherwise the user details are fetched
        with the `token` entry stored in the JWT. Finally the JWT is re-encoded with the
        refreshed payload under `user_details`.
        """
        self.member = discord.get_member(self.payload["id"])

        if self.member:
            self.payload = self.member.user.dict()
        else:
            self.payload = discord.fetch_user_details(self.decoded_token.get("token"))

        updated_info = self.decoded_token
        updated_info["user_details"] = self.payload

        self.token = encode_jwt(updated_info)
class AuthenticationResult(typing.NamedTuple):
    """Carries the scopes that a user has authenticated with."""

    # Names of the scopes held by the authenticated user.
    scopes: tuple[str, ...]
# See https://www.django-rest-framework.org/api-guide/authentication/#custom-authentication
class JWTAuthentication(BaseAuthentication):
    """Custom DRF authentication backend for JWT."""

    @staticmethod
    def get_token_from_cookie(cookie: str) -> str:
        """
        Parse JWT token from cookie.

        The cookie must consist of exactly two whitespace-separated parts: the prefix
        "JWT" (compared case-insensitively) followed by the token.

        Raises `AuthenticationFailed` when the cookie cannot be split into two parts or
        when the prefix is not "JWT".
        """
        try:
            prefix, token = cookie.split()
        except ValueError:
            msg = "Unable to split prefix and token from authorization cookie."
            # The unpacking error carries no extra information for the caller.
            raise AuthenticationFailed(msg) from None

        if prefix.upper() != "JWT":
            msg = f"Invalid authorization cookie prefix '{prefix}'."
            raise AuthenticationFailed(msg)

        return token

    def authenticate(
        self,
        request: HttpRequest,
    ) -> tuple[FormsUser, AuthenticationResult] | None:
        """
        Handle the JWT authentication process.

        Returns `None` when the request carries no `token` cookie. Otherwise returns the
        authenticated `FormsUser` together with an `AuthenticationResult` holding the
        "authenticated" scope, the "admin" scope for administrators, and the names of
        the user's Discord roles.

        Raises `AuthenticationFailed` when the cookie is malformed, the JWT cannot be
        decoded with either key, or the JWT lacks the `token`, `refresh` or usable
        `user_details` entries.
        """
        cookie = request.COOKIES.get("token")
        if not cookie:
            return None

        token = self.get_token_from_cookie(cookie)

        try:
            # New key.
            payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
        except jwt.InvalidTokenError:
            try:
                # Old key. Should be removed at a certain point.
                payload = jwt.decode(token, settings.FORMS_SECRET_KEY, algorithms=["HS256"])
            except jwt.InvalidTokenError as e:
                raise AuthenticationFailed(str(e)) from e

        scopes = ["authenticated"]

        if not payload.get("token"):
            msg = "Token is missing from JWT."
            raise AuthenticationFailed(msg)
        if not payload.get("refresh"):
            msg = "Refresh token is missing from JWT."
            raise AuthenticationFailed(msg)

        # These checks are deliberately not wrapped in a broad `except Exception`: doing
        # so would catch the "Improper user details." failure itself and replace its
        # message with the generic parse error.
        user_details = payload.get("user_details")
        if not user_details:
            msg = "Improper user details."
            raise AuthenticationFailed(msg)
        if not isinstance(user_details, dict):
            msg = "Could not parse user details."
            raise AuthenticationFailed(msg)
        if not user_details.get("id"):
            msg = "Improper user details."
            raise AuthenticationFailed(msg)

        user = FormsUser(
            token,
            user_details,
            discord.get_member(user_details["id"]),
        )
        if user.is_admin():
            scopes.append("admin")

        scopes.extend(user.get_roles())

        return user, AuthenticationResult(scopes=tuple(scopes))
# Callable signature of a view method as used by `require_scopes`: it receives the view
# instance, the request, and a format value that is a string or `None`, and returns a
# `Response`.
APIHandlerMethod = typing.Callable[[APIView, HttpRequest, str | None], Response]
"""Represents a DRF API class-based view endpoint method."""
def require_scopes(
    scopes: set[str] | frozenset[str],
) -> typing.Callable[[APIHandlerMethod], APIHandlerMethod]:
    """
    Require the requesting user to have the given `scopes`.

    Returns a decorator for view endpoint methods. The decorated method responds with:

    - 401 and a "not-allowed" verdict when the requesting user is not authenticated,
    - 403 and a "missing-scopes" verdict, listing the lacking scopes in sorted order,
      when the request does not carry every required scope,
    - the wrapped method's own response otherwise.

    Raises `TypeError` when `scopes` is neither a `set` nor a `frozenset`.
    """
    # Frozensets are accepted as well as plain sets: a `set`-only check would reject
    # them, because `frozenset` is not a subclass of `set`.
    if not isinstance(scopes, (set, frozenset)):
        error = TypeError("please supply scopes as a set")
        error.add_note(f"got {scopes!r} ({type(scopes)})")
        raise error

    # Snapshot the scopes so later mutation of the caller's set has no effect.
    required_scopes = frozenset(scopes)

    def wrapper(f: APIHandlerMethod) -> APIHandlerMethod:
        @functools.wraps(f)
        def authenticated_endpoint(instance: APIView, request: HttpRequest, format: str | None = None) -> Response:
            """Verify the requesting user is authenticated and holds the required scopes."""
            if not request.user.is_authenticated:
                return Response(
                    data={"king-arthur-verdict": "not-allowed"},
                    status=HTTPStatus.UNAUTHORIZED,
                )

            # `request.auth` may not expose `scopes` (for example when it is `None`);
            # such a request is treated as holding no scopes instead of crashing.
            granted_scopes = getattr(request.auth, "scopes", ())
            lacking_scopes = required_scopes.difference(granted_scopes)
            if lacking_scopes:
                return Response(
                    data={
                        "king-arthur-verdict": "missing-scopes",
                        # Sorted so that the response body is deterministic.
                        "missing-scopes": tuple(sorted(lacking_scopes)),
                    },
                    status=HTTPStatus.FORBIDDEN,
                )

            return f(instance, request, format)

        return authenticated_endpoint

    return wrapper