|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import logging |
| 4 | +from abc import ABC |
| 5 | +from dataclasses import asdict, dataclass |
| 6 | +from pathlib import Path |
| 7 | +from typing import ClassVar, Generic, NamedTuple, Optional, Sequence, TypeVar, Union |
| 8 | + |
| 9 | +import matplotlib.pyplot as plt |
| 10 | +import numpy as np |
| 11 | +import yaml |
| 12 | +from matplotlib.lines import Line2D |
| 13 | +from scipy.optimize import curve_fit |
| 14 | +from typing_extensions import Self |
| 15 | + |
| 16 | +from instamatic._typing import AnyPath, float_deg, int_nm |
| 17 | +from instamatic.config import calibration_drc |
| 18 | +from instamatic.utils.domains import NumericDomain, NumericDomainConstrained |
| 19 | + |
| 20 | +logger = logging.getLogger(__name__) |
| 21 | + |
| 22 | + |
| 23 | +def log(s: str) -> None: |
| 24 | + logger.info(s) |
| 25 | + print(s) |
| 26 | + |
| 27 | + |
| 28 | +Span = TypeVar('Span', float_deg, int_nm) |
| 29 | +SpeedN = TypeVar('SpeedN', float, int) |
| 30 | +Speed = Optional[SpeedN] |
| 31 | + |
| 32 | + |
| 33 | +@dataclass |
| 34 | +class MotionPlan(Generic[SpeedN]): |
| 35 | + """A set of motion parameters/outcomes nearest to the ones requested.""" |
| 36 | + |
| 37 | + pace: float # time it takes goniometer to cover 1 span unit (nm or degree) |
| 38 | + speed: Optional[SpeedN] # speed setting to get pace, None = not supported |
| 39 | + total_delay: float # total goniometer delay: delay + windup / speed |
| 40 | + |
| 41 | + |
| 42 | +class SpanSpeedTime(NamedTuple): |
| 43 | + """A single measurement point used to calibrate stage motion speed. |
| 44 | +
|
| 45 | + - span: the motion span expressed in degrees (rotation) or nm (translation); |
| 46 | + - speed: the speed setting used expressed in arbitrary TEM units; |
| 47 | + - time: time taken to travel span with speed expressed in seconds. |
| 48 | + """ |
| 49 | + |
| 50 | + span: Span |
| 51 | + speed: Speed |
| 52 | + time: float |
| 53 | + |
| 54 | + |
| 55 | +@dataclass |
| 56 | +class CalibStageMotion(ABC): |
| 57 | + """Abstract base class for stage-motion calibration. |
| 58 | +
|
| 59 | + Stores three parameters (pace, windup, delay) and optional speed_options. |
| 60 | + The `time` it takes the stage to move some `span` with `speed` |
| 61 | + is calculated using the following formula: |
| 62 | +
|
| 63 | + time = (alpha_pace * alpha_span + alpha_windup) / speed + delay |
| 64 | +
|
| 65 | + The two variables and three coefficients used above represent the following: |
| 66 | +
|
| 67 | + - span: total translation/rotation distance for stage to cover in degrees; |
| 68 | + - speed: goniometer speed setting linearly related to real speed, unit-less; |
| 69 | + - pace: time needed to cover 1 distance unit at speed=1 in dist unit / deg; |
| 70 | + - windup: variable time delay for stage speed up or slow down in seconds; |
| 71 | + - delay: constant time needed for stage communication in seconds; |
| 72 | +
|
| 73 | + The calibration also accepts `NumericDomain`-type `speed_options` to account |
| 74 | + for the fact that different goniometers accept different speed settings. |
| 75 | + `CalibStageRotation.speed_options.nearest(requested)` is used to find the |
| 76 | + speed setting nearest to the one requested. Any mention of `speed=None` |
| 77 | + signals that the microscope does not allow control over this motion speed. |
| 78 | +
|
| 79 | + Concrete subclasses should provide: |
| 80 | + - units and axis labels for plotting |
| 81 | + - a `live` class method that runs live calibration and returns an instance |
| 82 | + """ |
| 83 | + |
| 84 | + pace: float |
| 85 | + windup: float |
| 86 | + delay: float |
| 87 | + speed_options: Optional[NumericDomain] = None |
| 88 | + |
| 89 | + def __post_init__(self) -> None: |
| 90 | + self.pace = float(self.pace) |
| 91 | + self.windup = float(self.windup) |
| 92 | + self.delay = float(self.delay) |
| 93 | + if isinstance(self.speed_options, dict): |
| 94 | + self.speed_options = NumericDomain(**self.speed_options) |
| 95 | + |
| 96 | + _program_name: ClassVar[str] = NotImplemented |
| 97 | + _span_typical_limits: ClassVar[tuple[float, float]] = NotImplemented |
| 98 | + _span_units: ClassVar[str] = NotImplemented |
| 99 | + _yaml_filename: ClassVar[str] = NotImplemented |
| 100 | + |
| 101 | + @staticmethod |
| 102 | + def model1( |
| 103 | + span_speed: tuple[Span, SpeedN], |
| 104 | + pace: float, |
| 105 | + windup: float, |
| 106 | + delay: float, |
| 107 | + ) -> float: |
| 108 | + """Model 1 for estimating the total motion time for scipy curve_fit.""" |
| 109 | + span, speed = span_speed |
| 110 | + return (pace * span + windup) / speed + delay |
| 111 | + |
| 112 | + @staticmethod |
| 113 | + def model2(span: Span, pace: float, delay: float): |
| 114 | + """Simplified model used when speed is not supported i.e. = None.""" |
| 115 | + return pace * span + delay |
| 116 | + |
| 117 | + def span_speed_to_time(self, span: Span, speed: Speed = None) -> float: |
| 118 | + """`time` needed to move stage by `span` with `speed`.""" |
| 119 | + return (self.pace * span + self.windup) / (speed or 1.0) + self.delay |
| 120 | + |
| 121 | + def span_time_to_speed(self, span: Span, time: float) -> float: |
| 122 | + """`speed` that allows to move stage by `span` with `speed`.""" |
| 123 | + return (self.pace * span + self.windup) / (time - self.delay) |
| 124 | + |
| 125 | + def time_speed_to_span(self, time: float, speed: Speed = None) -> float: |
| 126 | + """Maximum `span` covered with `speed` in `time` (including delay).""" |
| 127 | + return ((speed or 1.0) * (time - self.delay) - self.windup) / self.pace |
| 128 | + |
| 129 | + def plan_motion(self, target_pace: float) -> MotionPlan: |
| 130 | + """Given target pace, find nearest pace, needed speed, and delay.""" |
| 131 | + if self.speed_options is None: |
| 132 | + return MotionPlan(self.pace, None, self.windup + self.delay) |
| 133 | + target_speed: float = abs(self.pace / target_pace) |
| 134 | + nearest_speed: Union[float, int] = self.speed_options.nearest(target_speed) |
| 135 | + nearest_pace: float = self.pace / nearest_speed |
| 136 | + total_delay: float = self.windup / nearest_speed + self.delay |
| 137 | + return MotionPlan(nearest_pace, nearest_speed, total_delay) |
| 138 | + |
| 139 | + def plot(self, sst: Optional[Sequence[SpanSpeedTime]] = None) -> None: |
| 140 | + """Generic plot of experimental (if given) & fit motion speed data.""" |
| 141 | + |
| 142 | + # determine speeds to plot; use experimental if given, fabricate otherwise |
| 143 | + speeds: list[Speed] # sorted |
| 144 | + if sst is not None: |
| 145 | + speeds = sorted(dict.fromkeys(s.speed for s in sst).keys()) |
| 146 | + elif self.speed_options is None: |
| 147 | + speeds = [None] |
| 148 | + elif isinstance(self.speed_options, NumericDomainConstrained): |
| 149 | + so: NumericDomainConstrained = self.speed_options |
| 150 | + speeds = list(np.linspace(so.lower_lim, so.upper_lim, 10)) |
| 151 | + else: # isinstance(calib.speed_options, NumericDomainDiscrete): |
| 152 | + speeds = sorted(getattr(self.speed_options, 'options', [1.0])) |
| 153 | + speeds = [s for s in speeds if s != 0] |
| 154 | + |
| 155 | + # determine spans to plot; use experimental if given, fabricate otherwise |
| 156 | + spans: list[float] # sorted |
| 157 | + if sst is not None: |
| 158 | + spans = list(dict.fromkeys(s.span for s in sst).keys()) |
| 159 | + else: |
| 160 | + spans = list(np.linspace(*self._span_typical_limits, 10)) |
| 161 | + |
| 162 | + # generate simulated span/speed/times data to be drawn later as lines |
| 163 | + simulated_sst = [] |
| 164 | + for span in spans: |
| 165 | + for speed in speeds: |
| 166 | + t = self.span_speed_to_time(span, speed) |
| 167 | + simulated_sst.append(SpanSpeedTime(span, speed, t)) |
| 168 | + plotted: list[tuple[Sequence[SpanSpeedTime], str]] = [(simulated_sst, '-')] |
| 169 | + |
| 170 | + # generate experimental span/speed/times to be drawn later as points |
| 171 | + if sst is not None: |
| 172 | + plotted.append((sst, 'o')) |
| 173 | + |
| 174 | + fig, ax = plt.subplots() |
| 175 | + ax.axvline(x=0, color='k') |
| 176 | + ax.axhline(y=0, color='k') |
| 177 | + ax.axhline(y=self.delay, color='r') |
| 178 | + |
| 179 | + colors = plt.colormaps['coolwarm'](np.linspace(0, 1, num=len(speeds))) |
| 180 | + handles: list[Line2D] = [] |
| 181 | + for color, speed in zip(colors, speeds): |
| 182 | + for sst, fmt in plotted: |
| 183 | + spans = [s.span for s in sst if s.speed == speed] |
| 184 | + times = [s.time for s in sst if s.speed == speed] |
| 185 | + ax.plot(spans, times, fmt, color=color) |
| 186 | + label = f'Speed setting {speed:.2f}' |
| 187 | + handles.append(Line2D([], [], color=color, marker='o', label=label)) |
| 188 | + |
| 189 | + ax.set_xlabel(f'Motion span [{self._span_units}]') |
| 190 | + ax.set_ylabel('Time required [s]') |
| 191 | + ax.set_title('Stage motion time vs span at different speeds') |
| 192 | + ax.legend(handles=handles, loc='best') |
| 193 | + plt.show() |
| 194 | + |
| 195 | + @classmethod |
| 196 | + def from_data(cls, sst: Sequence[SpanSpeedTime]) -> Self: |
| 197 | + """Fit cls.model to span-speed-time points and init based on result.""" |
| 198 | + sst_array = np.array(sst).T |
| 199 | + spans = np.array(sst_array[0], dtype=float) |
| 200 | + speeds = np.array(sst_array[1], dtype=float) |
| 201 | + times = np.array(sst_array[2], dtype=float) |
| 202 | + |
| 203 | + if np.all(np.isnan(speeds)): # TEM does not support setting with speed |
| 204 | + p = curve_fit(CalibStageMotion.model2, spans, ydata=times, p0=[1, 0]) |
| 205 | + (pace_n, delay_n), p_cov = p # noqa - this unpacking is OK |
| 206 | + pace_u, delay_u = np.sqrt(np.diag(p_cov)) |
| 207 | + windup_n, windup_u = 0.0, 0.0 |
| 208 | + else: |
| 209 | + ss = sst_array[:2] |
| 210 | + p = curve_fit(CalibStageMotion.model1, ss, ydata=times, p0=[1, 0, 0]) |
| 211 | + (pace_n, windup_n, delay_n), p_cov = p # noqa - this unpacking is OK |
| 212 | + pace_u, windup_u, delay_u = np.sqrt(np.diag(p_cov)) |
| 213 | + |
| 214 | + log(f'{cls.__name__} fit of motion model complete:') |
| 215 | + log(f'pace = {pace_n:12.6g} +/- {pace_u:12.6g} s / {cls._span_units}') |
| 216 | + log(f'windup = {windup_n:12.6f} +/- {windup_u:12.6f} s') |
| 217 | + log(f'delay = {delay_n:12.6f} +/- {delay_u:12.6f} s') |
| 218 | + log('model time = (pace * span + windup) / speed + delay') |
| 219 | + |
| 220 | + return cls(pace_n, windup_n, delay_n) |
| 221 | + |
| 222 | + @classmethod |
| 223 | + def from_file(cls, path: Optional[AnyPath] = None) -> Self: |
| 224 | + if path is None: |
| 225 | + path = Path(calibration_drc) / cls._yaml_filename |
| 226 | + try: |
| 227 | + with open(Path(path), 'r') as yaml_file: |
| 228 | + return cls(**yaml.safe_load(yaml_file)) |
| 229 | + except OSError as e: |
| 230 | + raise OSError(f'{e.strerror}: {path}. Please run {cls._program_name} first.') |
| 231 | + |
| 232 | + def to_file(self, outdir: Optional[AnyPath] = None) -> None: |
| 233 | + if outdir is None: |
| 234 | + outdir = calibration_drc |
| 235 | + yaml_path = Path(outdir) / self._yaml_filename |
| 236 | + with open(yaml_path, 'w') as yaml_file: |
| 237 | + yaml.safe_dump(asdict(self), yaml_file) # type: ignore[arg-type] |
| 238 | + log(f'{self.__class__.__name__} saved to {yaml_path}.') |
0 commit comments