1010from localstack .services .plugins import SERVICE_PLUGINS
1111from localstack .aws .api import RequestContext
1212from collections import defaultdict
13- from threading import Thread , Condition
13+ from threading import Thread , Condition , Timer
1414from readerwriterlock .rwlock import RWLockWrite , Lockable
1515from .visitors import LoadStateVisitor , SaveStateVisitor
1616from .config import BASE_DIR , is_persistence_enabled , PERSIST_FREQUENCY
@@ -71,10 +71,16 @@ def on_request(self, chain, context: RequestContext, response):
7171 if service_name not in self .loaded_services :
7272 self ._load_service_state (service_name )
7373
74- # Prevent persistence from running for this service while handling this request
74+ # Prevent persistence from running for this service while handling this request...
7575 rlock = self .rwlocks [service_name ].gen_rlock ()
7676 setattr (context , "localstack-persist_rlock" , rlock )
7777 rlock .acquire ()
78+ # ...unless the request takes over 1 second, in which case we force release the lock to
79+ # prevent long-running requests from blocking persistence which would in turn block other
80+ # requests
81+ timer = Timer (1 , try_release , [rlock ])
82+ setattr (context , "localstack-persist_rlock_timer" , timer )
83+ timer .start ()
7884
7985 def on_response (self , chain , context : RequestContext , response ):
8086 if not context .service or not context .request or not context .operation :
@@ -94,8 +100,15 @@ def on_response(self, chain, context: RequestContext, response):
94100 self .add_affected_service (service_name )
95101
96102 def on_finalize (self , chain , context : RequestContext , response ):
97- if rlock := getattr (context , "localstack-persist_rlock" , None ):
98- cast (Lockable , rlock ).release ()
103+ if rlock := cast (
104+ Lockable | None , getattr (context , "localstack-persist_rlock" , None )
105+ ):
106+ try_release (rlock )
107+
108+ if timer := cast (
109+ Timer | None , getattr (context , "localstack-persist_rlock_timer" , None )
110+ ):
111+ timer .cancel ()
99112
100113 def load_all_services_state (self ):
101114 LOG .info ("Loading persisted state of all services..." )
@@ -182,3 +195,11 @@ def _save_service_state(self, service_name: str):
182195
183196
184197STATE_TRACKER = StateTracker ()
198+
199+
200+ def try_release (lock : Lockable ):
201+ if lock and lock .locked ():
202+ try :
203+ lock .release ()
204+ except :
205+ pass
0 commit comments