|
| 1 | +from .enums import AltDSSEvent |
| 2 | +from . import ffi, lib |
| 3 | + |
| 4 | +LEGACY_EVENTS = ( |
| 5 | + AltDSSEvent.Legacy_InitControls, |
| 6 | + AltDSSEvent.Legacy_CheckControls, |
| 7 | + AltDSSEvent.Legacy_StepControls, |
| 8 | +) |
| 9 | + |
| 10 | +class EventCallbackManager: |
| 11 | + _ctx_to_manager = {} |
| 12 | + |
| 13 | + def __init__(self, ctx): |
| 14 | + if ctx in EventCallbackManager._ctx_to_manager: |
| 15 | + raise ValueError('This context already has a manager. Use "get_manager_for_ctx" to get it.') |
| 16 | + |
| 17 | + EventCallbackManager._ctx_to_manager[ctx] = self |
| 18 | + self.ctx = ctx |
| 19 | + for evt_type in AltDSSEvent: |
| 20 | + setattr(self, evt_type.name, []) |
| 21 | + |
| 22 | + def register_func(self, evt: AltDSSEvent, func) -> bool: |
| 23 | + handlers = getattr(self, AltDSSEvent(evt).name) |
| 24 | + if len(handlers) == 0: |
| 25 | + if lib.ctx_DSSEvents_RegisterAlt( |
| 26 | + self.ctx, |
| 27 | + evt, |
| 28 | + lib.altdss_python_util_callback |
| 29 | + ) == 0: |
| 30 | + raise RuntimeError('Could not register main callback function.') |
| 31 | + |
| 32 | + if func in handlers: |
| 33 | + return False |
| 34 | + |
| 35 | + handlers.append(func) |
| 36 | + return True |
| 37 | + |
| 38 | + def unregister_func(self, evt: AltDSSEvent, func) -> bool: |
| 39 | + handlers = getattr(self, AltDSSEvent(evt).name) |
| 40 | + prev_len = len(handlers) |
| 41 | + handlers[:] = [f for f in handlers if f is not func] |
| 42 | + if len(handlers) == 0: |
| 43 | + lib.ctx_DSSEvents_UnregisterAlt( |
| 44 | + self.ctx, |
| 45 | + evt, |
| 46 | + lib.altdss_python_util_callback |
| 47 | + ) |
| 48 | + |
| 49 | + return prev_len != len(handlers) |
| 50 | + |
| 51 | + |
| 52 | + def handle_event(self, ctx, evt: AltDSSEvent, step: int, ptr): |
| 53 | + evt = AltDSSEvent(evt) |
| 54 | + handlers = getattr(self, evt.name) |
| 55 | + |
| 56 | + # No arguments for legacy (classic COM impl.) OpenDSS events |
| 57 | + if evt in LEGACY_EVENTS: |
| 58 | + for handler in handlers: |
| 59 | + handler() |
| 60 | + |
| 61 | + return |
| 62 | + |
| 63 | + for handler in handlers: |
| 64 | + handler(ctx, evt, step, ptr) |
| 65 | + |
| 66 | + |
| 67 | + |
| 68 | +def get_manager_for_ctx(ctx) -> EventCallbackManager: |
| 69 | + m = EventCallbackManager._ctx_to_manager.get(ctx) |
| 70 | + if m is None: |
| 71 | + m = EventCallbackManager(ctx) |
| 72 | + |
| 73 | + return m |
| 74 | + |
| 75 | + |
| 76 | +@ffi.def_extern() |
| 77 | +def altdss_python_util_callback(ctx, eventCode: int, step: int, ptr): |
| 78 | + m = EventCallbackManager._ctx_to_manager.get(ctx) |
| 79 | + if m is None: |
| 80 | + return |
| 81 | + |
| 82 | + try: |
| 83 | + m.handle_event(ctx, eventCode, step, ptr) |
| 84 | + |
| 85 | + except Exception as ex: |
| 86 | + err_ptr = lib.ctx_Error_Get_NumberPtr(ctx) |
| 87 | + err_ptr[0] = 1 |
| 88 | + lib.ctx_Error_Set_Description(ctx, f"Python callback exception: {ex}".encode()) |
| 89 | + |
| 90 | + |
| 91 | +__all__ = ['get_manager_for_ctx'] |
0 commit comments