1+ from dataclasses import dataclass
12from typing import List , Optional
23from fastapi import HTTPException
34
1718 PERMISSION_DEFINITIONS ,
1819)
1920
21+ @dataclass (slots = True )
22+ class PermissionContext :
23+ exists : bool
24+ is_admin : bool
25+ path_rules : List [PathRule ]
26+
2027
2128class PermissionService :
2229 """权限检查服务"""
2330
2431 # 权限检查结果缓存(简单的内存缓存)
2532 _cache : dict [str , tuple [bool , float ]] = {}
33+ _context_cache : dict [int , tuple [PermissionContext , float ]] = {}
2634 _cache_ttl = 300 # 5分钟缓存
2735
2836 @classmethod
29- async def check_path_permission (
30- cls , user_id : int , path : str , action : str
31- ) -> bool :
32- """
33- 检查用户对路径的操作权限
34-
35- Args:
36- user_id: 用户ID
37- path: 要检查的路径
38- action: 操作类型 (read/write/delete/share)
39-
40- Returns:
41- 是否有权限
42- """
37+ def _now (cls ) -> float :
4338 import time
4439
45- # 检查缓存
46- cache_key = f"{ user_id } :{ path } :{ action } "
47- if cache_key in cls ._cache :
48- result , timestamp = cls ._cache [cache_key ]
49- if time .time () - timestamp < cls ._cache_ttl :
50- return result
40+ return time .time ()
41+
42+ @classmethod
43+ def _is_cache_valid (cls , timestamp : float ) -> bool :
44+ return cls ._now () - timestamp < cls ._cache_ttl
45+
46+ @classmethod
47+ def _get_cached_result (cls , cache_key : str ) -> Optional [bool ]:
48+ cached = cls ._cache .get (cache_key )
49+ if not cached :
50+ return None
51+ result , timestamp = cached
52+ if cls ._is_cache_valid (timestamp ):
53+ return result
54+ cls ._cache .pop (cache_key , None )
55+ return None
56+
57+ @classmethod
58+ def _sort_path_rules (cls , rules : List [PathRule ]) -> List [PathRule ]:
59+ return sorted (
60+ rules ,
61+ key = lambda r : (
62+ r .priority ,
63+ PathMatcher .get_pattern_specificity (r .path_pattern , r .is_regex ),
64+ ),
65+ reverse = True ,
66+ )
67+
68+ @classmethod
69+ def _match_sorted_path_rules (
70+ cls , path : str , action : str , sorted_rules : List [PathRule ]
71+ ) -> Optional [bool ]:
72+ for rule in sorted_rules :
73+ if PathMatcher .match_pattern (path , rule .path_pattern , rule .is_regex ):
74+ if action == PathAction .READ :
75+ return rule .can_read
76+ if action == PathAction .WRITE :
77+ return rule .can_write
78+ if action == PathAction .DELETE :
79+ return rule .can_delete
80+ if action == PathAction .SHARE :
81+ return rule .can_share
82+ return False
83+ return None
84+
85+ @classmethod
86+ async def _get_permission_context (cls , user_id : int ) -> PermissionContext :
87+ cached = cls ._context_cache .get (user_id )
88+ if cached :
89+ context , timestamp = cached
90+ if cls ._is_cache_valid (timestamp ):
91+ return context
92+ cls ._context_cache .pop (user_id , None )
5193
52- # 获取用户
5394 user = await UserAccount .get_or_none (id = user_id )
5495 if not user :
55- return False
96+ context = PermissionContext (exists = False , is_admin = False , path_rules = [])
97+ cls ._context_cache [user_id ] = (context , cls ._now ())
98+ return context
5699
57- # 超级管理员直接放行
58100 if user .is_admin :
59- cls ._cache [cache_key ] = (True , time .time ())
60- return True
101+ context = PermissionContext (exists = True , is_admin = True , path_rules = [])
102+ cls ._context_cache [user_id ] = (context , cls ._now ())
103+ return context
61104
62- # 获取用户所有角色
63- user_roles = await UserRole .filter (user_id = user_id ).prefetch_related ("role" )
105+ user_roles = await UserRole .filter (user_id = user_id )
64106 role_ids = [ur .role_id for ur in user_roles ]
65-
66107 if not role_ids :
67- cls ._cache [cache_key ] = (False , time .time ())
108+ context = PermissionContext (exists = True , is_admin = False , path_rules = [])
109+ cls ._context_cache [user_id ] = (context , cls ._now ())
110+ return context
111+
112+ path_rules = await PathRule .filter (role_id__in = role_ids )
113+ context = PermissionContext (
114+ exists = True ,
115+ is_admin = False ,
116+ path_rules = cls ._sort_path_rules (list (path_rules )),
117+ )
118+ cls ._context_cache [user_id ] = (context , cls ._now ())
119+ return context
120+
121+ @classmethod
122+ def _check_path_permission_with_context (
123+ cls ,
124+ user_id : int ,
125+ normalized_path : str ,
126+ action : str ,
127+ context : PermissionContext ,
128+ ) -> bool :
129+ if not context .exists :
68130 return False
131+ if context .is_admin :
132+ return True
69133
70- # 获取所有角色的路径规则
71- path_rules = await PathRule . filter ( role_id__in = role_ids ). order_by ( "-priority" )
134+ checked_cache_keys : List [ str ] = []
135+ current_path = normalized_path
72136
73- # 规范化路径
74- normalized_path = PathMatcher .normalize_path (path )
137+ while True :
138+ cache_key = f"{ user_id } :{ current_path } :{ action } "
139+ cached_result = cls ._get_cached_result (cache_key )
140+ if cached_result is not None :
141+ result = cached_result
142+ break
143+
144+ checked_cache_keys .append (cache_key )
145+ result = cls ._match_sorted_path_rules (current_path , action , context .path_rules )
146+ if result is not None :
147+ break
148+
149+ parent_path = PathMatcher .get_parent_path (current_path )
150+ if not parent_path :
151+ result = False
152+ break
153+ current_path = parent_path
154+
155+ timestamp = cls ._now ()
156+ for cache_key in checked_cache_keys :
157+ cls ._cache [cache_key ] = (result , timestamp )
158+ return result
75159
76- # 按优先级和具体程度匹配
77- result = cls ._match_path_rules (normalized_path , action , list (path_rules ))
160+ @classmethod
161+ async def check_path_permission (
162+ cls , user_id : int , path : str , action : str
163+ ) -> bool :
164+ """
165+ 检查用户对路径的操作权限
78166
79- # 如果没有匹配到规则,检查父目录(继承)
80- if result is None :
81- parent_path = PathMatcher .get_parent_path (normalized_path )
82- if parent_path :
83- result = await cls .check_path_permission (user_id , parent_path , action )
84- else :
85- result = False # 默认拒绝
167+ Args:
168+ user_id: 用户ID
169+ path: 要检查的路径
170+ action: 操作类型 (read/write/delete/share)
86171
87- cls ._cache [cache_key ] = (result , time .time ())
172+ Returns:
173+ 是否有权限
174+ """
175+ normalized_path = PathMatcher .normalize_path (path )
176+ cache_key = f"{ user_id } :{ normalized_path } :{ action } "
177+ cached_result = cls ._get_cached_result (cache_key )
178+ if cached_result is not None :
179+ return cached_result
180+
181+ context = await cls ._get_permission_context (user_id )
182+ result = cls ._check_path_permission_with_context (user_id , normalized_path , action , context )
183+ cls ._cache [cache_key ] = (result , cls ._now ())
88184 return result
89185
90186 @classmethod
@@ -97,31 +193,7 @@ def _match_path_rules(
97193 Returns:
98194 True/False 表示明确的权限结果,None 表示没有匹配到规则
99195 """
100- # 按优先级和具体程度排序
101- sorted_rules = sorted (
102- rules ,
103- key = lambda r : (
104- r .priority ,
105- PathMatcher .get_pattern_specificity (r .path_pattern , r .is_regex ),
106- ),
107- reverse = True ,
108- )
109-
110- for rule in sorted_rules :
111- if PathMatcher .match_pattern (path , rule .path_pattern , rule .is_regex ):
112- # 匹配到规则,检查具体操作权限
113- if action == PathAction .READ :
114- return rule .can_read
115- elif action == PathAction .WRITE :
116- return rule .can_write
117- elif action == PathAction .DELETE :
118- return rule .can_delete
119- elif action == PathAction .SHARE :
120- return rule .can_share
121- else :
122- return False
123-
124- return None
196+ return cls ._match_sorted_path_rules (path , action , cls ._sort_path_rules (rules ))
125197
126198 @classmethod
127199 async def check_system_permission (cls , user_id : int , permission_code : str ) -> bool :
@@ -251,35 +323,20 @@ async def check_path_permission_detailed(
251323 cls , user_id : int , path : str , action : str
252324 ) -> PathPermissionResult :
253325 """检查路径权限并返回详细结果"""
254- user = await UserAccount . get_or_none ( id = user_id )
255- if not user :
326+ context = await cls . _get_permission_context ( user_id )
327+ if not context . exists :
256328 return PathPermissionResult (path = path , action = action , allowed = False )
257329
258- # 超级管理员
259- if user .is_admin :
330+ if context .is_admin :
260331 return PathPermissionResult (path = path , action = action , allowed = True )
261332
262- # 获取用户角色
263- user_roles = await UserRole .filter (user_id = user_id )
264- role_ids = [ur .role_id for ur in user_roles ]
265-
266- if not role_ids :
333+ if not context .path_rules :
267334 return PathPermissionResult (path = path , action = action , allowed = False )
268335
269- # 获取路径规则
270- path_rules = await PathRule .filter (role_id__in = role_ids ).order_by ("-priority" )
271336 normalized_path = PathMatcher .normalize_path (path )
272337
273- # 查找匹配的规则
274338 matched_rule = None
275- for rule in sorted (
276- path_rules ,
277- key = lambda r : (
278- r .priority ,
279- PathMatcher .get_pattern_specificity (r .path_pattern , r .is_regex ),
280- ),
281- reverse = True ,
282- ):
339+ for rule in context .path_rules :
283340 if PathMatcher .match_pattern (
284341 normalized_path , rule .path_pattern , rule .is_regex
285342 ):
@@ -322,19 +379,30 @@ def clear_cache(cls, user_id: int | None = None) -> None:
322379 """清除权限缓存"""
323380 if user_id is None :
324381 cls ._cache .clear ()
382+ cls ._context_cache .clear ()
325383 else :
326- # 清除特定用户的缓存
327384 keys_to_delete = [k for k in cls ._cache if k .startswith (f"{ user_id } :" )]
328385 for k in keys_to_delete :
329386 del cls ._cache [k ]
387+ cls ._context_cache .pop (user_id , None )
330388
331389 @classmethod
332390 async def filter_paths_by_permission (
333391 cls , user_id : int , paths : List [str ], action : str
334392 ) -> List [str ]:
335393 """过滤出用户有权限的路径列表"""
394+ if not paths :
395+ return []
396+
397+ context = await cls ._get_permission_context (user_id )
398+ if not context .exists :
399+ return []
400+ if context .is_admin :
401+ return list (paths )
402+
336403 result = []
337404 for path in paths :
338- if await cls .check_path_permission (user_id , path , action ):
405+ normalized_path = PathMatcher .normalize_path (path )
406+ if cls ._check_path_permission_with_context (user_id , normalized_path , action , context ):
339407 result .append (path )
340408 return result
0 commit comments