1+ import numpy as np
2+ import mne
3+ from spm import *
4+
5+ def spm_2_mne_raw (D ):
6+ # Converts SPM D object to MNE Raw object
7+ # importaint: only works before epoching
8+ # importaint: checked only on opm data
9+
10+
11+ n_channels = int (D .nchannels ())
12+ sampling_freq = D .fsample ()
13+ duration = [float (D .time ()[0 ]),float (D .time ()[- 1 ])]
14+ n_samples = int (D .nsamples ())
15+
16+ channel_names = list (D .chanlabels ())
17+ type_mapping = {
18+ 'REFMAG' : 'ref_meg' ,
19+ 'REFGRAD' : 'ref_meg' ,
20+ 'REFPLANAR' : 'ref_meg' ,
21+ 'EEG' : 'eeg' ,
22+ 'ECG' : 'ecg' ,
23+ 'EMG' : 'emg' ,
24+ 'LFP' : 'bio' ,
25+ 'PHYS' : 'bio' ,
26+ 'ILAM' : 'bio' ,
27+ 'SRC' : 'dipole' ,
28+ 'EOG' : 'eog' ,
29+ 'VEOG' : 'eog' ,
30+ 'HEOG' : 'eog' ,
31+ 'ECG' : 'ecg' ,
32+ 'MEG' : 'mag' ,
33+ 'MEGMAG' : 'mag' ,
34+ 'MEGCOMB' : 'misc' ,
35+ 'MEGGRAD' : 'mag' ,
36+ 'MEGPLANAR' : 'grad' ,
37+ 'TRIG' : 'stim' ,
38+ 'Other' : 'misc'
39+ }
40+ channel_types = [type_mapping .get (ch_type , 'misc' ) for ch_type in D .chantype ()]
41+ info = mne .create_info (ch_names = channel_names , sfreq = sampling_freq , ch_types = channel_types )
42+
43+ unit_multipliers = {
44+ "Nan" : 1 ,
45+ "mT" : 10 ** (- 3 ),
46+ "nT" : 10 ** (- 9 ),
47+ "pT" : 10 ** (- 12 ),
48+ "fT" : 10 ** (- 15 ),
49+ "T" : 1 ,
50+ "V" : 1 ,
51+ "mV" : 1 }
52+
53+
54+ unit_mul_per_ch = [unit_multipliers [unit ] for unit in D .units ()]
55+ data = np .zeros ((n_channels , n_samples ))
56+ data [:,:] = D [:,:,:]
57+ data = np .array ([channel_data * unit_mul for channel_data , unit_mul in zip (data , unit_mul_per_ch )])
58+
59+ #todo check EEG channels are translating properly
60+ sens_ch_start = np .sum ([ch_type == 'TRIG' for ch_type in D .chantype ()])
61+ i_meg = 0
62+ i_eeg = 0
63+ i_trig = 0
64+ for i , ch in enumerate (info ['chs' ]):
65+ if ch ['kind' ] == 1 :
66+ k_unit = 10 ** (- 3 ) if D .sensors ('MEG' ).unit == 'mm' else 1
67+ ch ['loc' ] = np .concatenate ((D .sensors ('MEG' ).chanpos [i_meg ]* k_unit , D .sensors ('MEG' ).chanori [i_meg ], np .zeros (6 )))
68+ ch ['unit' ] = mne .io .constants .FIFF .FIFF_UNIT_T
69+ ch ['unit_mul' ] = mne .io .constants .FIFF .FIFF_UNITM_NONE
70+ ch ['range' ] = 1.0
71+ ch ['cal' ] = 1.0
72+ ch ['coord_frame' ] = mne .io .constants .FIFF .FIFFV_COORD_DEVICE
73+ i_meg += 1
74+ elif ch ['kind' ] == 2 : # EEG channels
75+ ch ['loc' ] = np .concatenate ((D .sensors ('EEG' ).chanpos [i_eeg ], np .zeros (9 ))) #CHECK HOW IT WORKS
76+ ch ['unit' ] = mne .io .constants .FIFF .FIFF_UNIT_V if D .units ()[i_eeg ] == 'V' else mne .io .constants .FIFF .FIFF_UNIT_M
77+ ch ['unit_mul' ] = mne .io .constants .FIFF .FIFF_UNITM_NONE
78+ ch ['range' ] = 1.0
79+ ch ['cal' ] = 1.0
80+ ch ['coord_frame' ] = mne .io .constants .FIFF .FIFFV_COORD_HEAD
81+ i_eeg += 1
82+ elif ch ['kind' ] == 3 :
83+ ch ['unit' ] = mne .io .constants .FIFF .FIFF_UNIT_V if D .units ()[i_trig ] == 'V' else mne .io .constants .FIFF .FIFF_UNIT_M
84+ ch ['unit_mul' ] = mne .io .constants .FIFF .FIFF_UNITM_NONE
85+ i_trig += 1
86+ # needs work to embed the tra matrix into prog. for now it is saved separetly for researcher to use for correcting forward model
87+ if D .sensors ('MEG' ).tra .shape [0 ] != D .sensors ('MEG' ).tra .shape [1 ]:
88+ np .savetxt (D .path ()+ '/tra_matrix_from_spm.tsv' , D .sensors ('MEG' ).tra , delimiter = '\t ' )
89+ else :
90+ U ,S ,V = np .linalg .svd (D .sensors ('MEG' ).tra ,full_matrices = False )
91+ rank_tra = np .linalg .matrix_rank (D .sensors ('MEG' ).tra )
92+ V_real = np .conj (V .T )
93+ proj_vec = []
94+ proj_vec = V_real [:,rank_tra :]
95+ from numpy import arange
96+ if proj_vec .shape [1 ] > 0 :
97+ for ii in arange (proj_vec .shape [1 ]):
98+ proj_data = dict (
99+ col_names = list (np .asarray (D .sensors ('MEG' ).label .tolist ()).reshape (1 , - 1 )[0 ]),
100+ row_names = None ,
101+ data = proj_vec [:, ii ].reshape (1 , - 1 ),
102+ ncol = len (D .sensors ('MEG' ).label ),
103+ nrow = 1 ,
104+ )
105+ info ['projs' ].append (mne .Projection (active = True , data = proj_data , desc = 'from matlab tra matrix' , kind = 1 , explained_var = None ))
106+ #np.savetxt(D.path()+'/tra_matrix_from_spm.tsv', D.sensors('MEG').tra, delimiter='\t')
107+ bad_channels = D .badchannels ().astype (int ) # matlab index = (python index + 1)
108+ if D .badchannels ().size == 1 :
109+ info ['bads' ] = [channel_names [bad_channels ]]
110+ elif D .badchannels ().size != 0 :
111+ info ['bads' ] = [channel_names [i ] for i in bad_channels ]
112+ raw = mne .io .RawArray (data , info )
113+ return raw
114+
115+
116+ def mne_raw_2_spm (raw ,data_path ,file_name = None ):
117+
118+ from datetime import datetime
119+ current_time = datetime .now ().strftime ("%Y-%m-%d_%H-%M-%S" )
120+
121+ import os
122+ os .chdir (data_path )
123+ if file_name is None :
124+ file_name = f"data from mne { str (raw .info ['meas_date' ])[:10 ]} _{ current_time } .dat"
125+
126+ nChans = raw .info ['nchan' ]
127+ nSamples = raw .times .shape [0 ]
128+ nTrials = 1
129+ D = meeg (nChans ,nSamples ,nTrials )
130+ D = D .fsample (raw .info ['sfreq' ])
131+ D = D .path (str (data_path ))
132+ D = D .chanlabels (D ,raw .info ['ch_names' ])
133+
134+ type_mapping = {
135+ mne .io .constants .FIFF .FIFFV_REF_MEG_CH : 'ref_meg' ,
136+ mne .io .constants .FIFF .FIFFV_EEG_CH : 'EEG' ,
137+ mne .io .constants .FIFF .FIFFV_ECG_CH : 'ECG' ,
138+ mne .io .constants .FIFF .FIFFV_EMG_CH : 'emg' ,
139+ mne .io .constants .FIFF .FIFFV_BIO_CH : 'BIO' ,
140+ mne .io .constants .FIFF .FIFFV_DIPOLE_WAVE : 'dipole' ,
141+ mne .io .constants .FIFF .FIFFV_EOG_CH : 'EOG' ,
142+ mne .io .constants .FIFF .FIFFV_MEG_CH : 'MEGMAG' ,
143+ mne .io .constants .FIFF .FIFFV_MISC_CH : 'misc' ,
144+ mne .io .constants .FIFF .FIFFV_STIM_CH : 'TRIG'
145+ }
146+
147+ chan_types = [type_mapping [ch ['kind' ]] for ch in raw .info ['chs' ]]
148+ D = D .chantype (D ,chan_types )
149+
150+ mapping_units = {
151+ mne .io .constants .FIFF .FIFF_UNIT_T : 'T' , # Reference MEG channels
152+ mne .io .constants .FIFF .FIFF_UNIT_V : 'V' , # EEG channels
153+ mne .io .constants .FIFF .FIFFV_STIM_CH : 'AU' ,
154+ mne .io .constants .FIFF .FIFF_UNIT_T_M : 'T/m'
155+ }
156+ unit_multipliers = {
157+ mne .io .constants .FIFF .FIFF_UNITM_NONE : '' ,
158+ mne .io .constants .FIFF .FIFF_UNITM_M : 'm' ,
159+ mne .io .constants .FIFF .FIFF_UNITM_M : 'micro' ,
160+ mne .io .constants .FIFF .FIFF_UNITM_N : 'p' ,
161+ mne .io .constants .FIFF .FIFF_UNITM_P : 'n' ,
162+ mne .io .constants .FIFF .FIFF_UNITM_F : 'f' ,
163+ mne .io .constants .FIFF .FIFF_UNITM_NONE : '' ,
164+ }
165+ chan_units = [str (unit_multipliers [ch ['unit_mul' ]] + mapping_units [ch ['unit' ]]) for ch in raw .info ['chs' ]]
166+ D = D .units (D ,chan_units )
167+
168+ meg_label = [None ] * (chan_types .count ('MEGMAG' ))
169+ meg_chanunit = [None ] * (chan_types .count ('MEGMAG' ))
170+ meg_chantype = [None ] * (chan_types .count ('MEGMAG' ))
171+ meg_pos = np .zeros (((chan_types .count ('MEGMAG' )), 3 ))
172+ meg_ori = np .zeros (((chan_types .count ('MEGMAG' )), 3 ))
173+ meg_conter = 0
174+ eeg_label = [None ] * (chan_types .count ('EEG' ))
175+ eeg_chanunit = [None ] * (chan_types .count ('EEG' ))
176+ eeg_chantype = [None ] * (chan_types .count ('EEG' ))
177+ eeg_pos = np .zeros (((chan_types .count ('EEG' )), 3 ))
178+ eeg_ori = np .zeros (((chan_types .count ('EEG' )), 3 ))
179+ eeg_conter = 0
180+ for i , ch in enumerate (raw .info ['chs' ]):
181+ if chan_types [i ] == 'MEGMAG' :
182+ meg_pos [meg_conter ,:] = [ch ['loc' ][0 ],ch ['loc' ][1 ],ch ['loc' ][2 ]]
183+ meg_ori [meg_conter ,:] = [ch ['loc' ][3 ],ch ['loc' ][4 ],ch ['loc' ][5 ]]
184+ meg_label [meg_conter ] = ch ['ch_name' ]
185+ meg_chanunit [meg_conter ] = str (unit_multipliers [ch ['unit_mul' ]] + mapping_units [ch ['unit' ]])
186+ meg_chantype [meg_conter ] = type_mapping [ch ['kind' ]]
187+ meg_conter += 1
188+ if chan_types [i ] == 'EEG' :
189+ eeg_pos [eeg_conter ,:] = [ch ['loc' ][0 ],ch ['loc' ][1 ],ch ['loc' ][2 ]]
190+ eeg_ori [eeg_conter ,:] = [ch ['loc' ][3 ],ch ['loc' ][4 ],ch ['loc' ][5 ]]
191+ eeg_label [eeg_conter ] = ch ['ch_name' ]
192+ eeg_chanunit [eeg_conter ] = str (unit_multipliers [ch ['unit_mul' ]] + mapping_units [ch ['unit' ]])
193+ eeg_chantype [eeg_conter ] = type_mapping [ch ['kind' ]]
194+ eeg_conter += 1
195+
196+ if meg_conter > 0 :
197+ grad = Struct ()
198+ grad .label = meg_label
199+ grad .coilpos = np .asarray (meg_pos )
200+ grad .coilpos = np .asarray (meg_pos )
201+ grad .coilori = np .asarray (meg_ori )
202+ grad .coilori = np .asarray (meg_ori )
203+ grad .chanunit = meg_chanunit
204+ grad .chantype = meg_chantype
205+
206+ vec_base = []
207+ #works oonly when projector is calculated for all the channels
208+ magnetometer_count = sum (1 for ch in raw .info ['chs' ] if ch ['unit' ] == mne .io .constants .FIFF .FIFF_UNIT_T )
209+ for ii , proj in enumerate (raw .info ['projs' ]):
210+ if proj ['data' ]['ncol' ] == len (meg_label ) and proj ['active' ]:
211+ vec_base .append (proj ['data' ]['data' ])
212+ if vec_base == []:
213+ grad .tra = np .eye (len (meg_label ))
214+ else :
215+ vec_array = np .array (vec_base ).reshape (len (vec_base ), magnetometer_count ).T
216+ grad .tra = np .eye (vec_array .shape [0 ]) - vec_array @ np .linalg .pinv (vec_array )
217+ D = D .sensors ('MEG' , grad )
218+
219+ if eeg_conter > 0 :
220+ grad = Struct ()
221+ grad .label = eeg_label
222+ grad .coilpos = np .asarray (eeg_pos )
223+ grad .coilori = np .asarray (eeg_ori )
224+ grad .chanunit = eeg_chanunit
225+ grad .chantype = eeg_chantype
226+
227+ vec_base = []
228+ vec_array = []
229+ eeg_count = sum (1 for ch in raw .info ['chs' ] if ch ['kind' ] == mne .io .constants .FIFF .FIFFV_EEG_CH )
230+ for ii , proj in enumerate (raw .info ['projs' ]):
231+ if proj ['data' ]['ncol' ] == eeg_count and proj ['active' ]:
232+ vec_base .append (proj ['data' ]['data' ])
233+ if vec_base == []:
234+ grad .tra = np .eye (len (eeg_label ))
235+ else :
236+ vec_array = np .array (vec_base ).reshape (len (vec_base ), eeg_count ).T
237+ grad .tra = np .eye (vec_array .shape [0 ]) - vec_array @ np .linalg .pinv (vec_array )
238+ D = D .sensors ('EEG' , grad )
239+
240+
241+ D .save ()
242+
243+ D .blank (file_name )
244+ D = D .link (file_name )
245+ data = raw .get_data ()
246+ reshaped_data = data .reshape ((data .shape [0 ], data .shape [1 ], 1 ))
247+ D [:,:,:]= reshaped_data
248+
249+ if len (raw .info ['bads' ]) == 1 :
250+ D = D .badchannels (raw .info ['ch_names' ].index (raw .info ['bads' ][0 ]) ,1 )
251+ elif len (raw .info ['bads' ]) > 1 :
252+ for i_bad in raw .info ['bads' ]:
253+ D = D .badchannels (raw .info ['ch_names' ].index (i_bad ) ,1 )
254+
255+
256+ return D
0 commit comments