Coverage for / builds / tms-localization / papers / tmsloc_proto / scripts / 07_calc_r2.py: 84%
146 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-01 07:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-01 07:48 +0000
1#!/usr/bin/env python
3"""
4This script calculates the goodness-of-fits for each muscle in the ROI.
5"""
7import os
8import h5py
9import pynibs
10import argparse
11import pandas as pd
12import numpy as np
15from pynibs.regression import regress_data
16from pynibs.neuron.neuron_regression import calc_e_effective
18parser = argparse.ArgumentParser(description='Calculate fits for 3 muscles in ROI using all TMS pulses '
19 'and save results in .hdf5 file\n')
21# required arguments
22parser.add_argument('-s', '--fn_subject', help='Path to subject obj',
23 required=True, type=str)
24parser.add_argument('-m', '--mesh_idx', help='Mesh idx',
25 required=True, type=str)
26parser.add_argument('-e', '--exp_idx', help='Exp idx',
27 required=True, type=str)
28parser.add_argument('-r', '--roi_idx', help='ROI idx',
29 required=False, type=str, default=1)
30parser.add_argument('-n', '--n_cpu', help='How many cpus to use',
31 type=int, default=10)
32parser.add_argument('-f', '--function', help='Function to fit data to ("linear", "sigmoid4", "sigmoid4_log")',
33 type=str, default='sigmoid4')
34parser.add_argument('-i', '--n_refit', help='Number of refit iterations',
35 required=False, type=int, default=20)
36parser.add_argument('-v', '--verbose', help='Show detailed output messages',
37 required=False, action='store_true')
38parser.add_argument('--map2surf', help='Map results to whole brain surface',
39 required=False, action='store_true')
40parser.add_argument('--convergence', help='Assess convergence metrics (n vs n-1)',
41 required=False, action='store_true', default=False)
42parser.add_argument('--layerid', help='Perform mapping based on neuronal meanfield model on the specified layer. ("L23", "L5")',
43 required=False)
44parser.add_argument('--neuronmodel', help='Select neuron model for localization. '
45 '("sensitivity_weighting", "threshold_subtract", "threshold_binary",'
46 ' "IOcurve")',
47 required=False, default=None)
48parser.add_argument('--waveform', help='Waveform of TMS pulse. ("monophasic", "biphasic")',
49 required=False, default='biphasic')
51args = parser.parse_args()
53# print parameter info
54# ================================================================================
55print("-" * 64)
56print(f"{parser.description}")
57args_dict = vars(args)
58print('Parameters:')
59for key in args_dict.keys():
60 print(f"{key: >15}: {args_dict[key]}")
61print("-" * 64)
63# parameters
64# ================================================================================
65# file containing the subject information
66fn_subject = args.fn_subject
68# load subject object
69subject = pynibs.load_subject(fn_subject)
71# exp identifier
72exp_idx = args.exp_idx
74# mesh identifier
75mesh_idx = args.mesh_idx
77# ROI index the fields are extracted for goodness-of-fit calculation
78roi_idx = args.roi_idx
80# number threads (cpu cores) to use for parallelization
81n_cpu = args.n_cpu
83# show detailed output messages
84verbose = args.verbose
86# functions to perform the fit with
87functions = [getattr(pynibs.expio, args.function)]
89# hand muscles
90muscles = subject.exp[exp_idx]["channels"]
92# quantities of electric field the fit is performed for
93e_qoi_list = ["E_mag"]
95# map data to whole brain surface
96map2surf = args.map2surf
98# perform mapping using neuronal meanfield model
99layerid = args.layerid
101# neuron regression approach
102neuronmodel = args.neuronmodel
104# TMS waveform
105waveform = args.waveform
107# number of refit iterations
108n_refit = args.n_refit
110# output measure ("SR": Relative standard error of regression, "R2": R2 score)
111score_type = "R2"
113# Assess the convergence by comparing n vs n-1 stimulations
114convergence = args.convergence
115norm_to_perc = 99
117# loading subject objects
118subject = pynibs.load_subject(fname=fn_subject)
119subject_dir = os.path.split(fn_subject)[0]
121# parent folder containing the output data
122fn_results = os.path.join(subject_dir, "results", f"exp_{exp_idx}", "r2",
123 f"mesh_{mesh_idx}", f"roi_{roi_idx}")
125# file containing the electric field in the ROI [n_zaps x n_ele]
126fn_e_roi = os.path.join(subject_dir, "results", f"exp_{exp_idx}", "electric_field",
127 f"mesh_{mesh_idx}", f"roi_{roi_idx}", f"e_scaled.hdf5")
129if not os.path.exists(fn_e_roi):
130 raise FileNotFoundError(f"No electric field file found!")
132fn_result_suffix = f"_neuron_{layerid}_{neuronmodel}" if layerid is not None else ""
134subject.exp[exp_idx]["fn_exp_hdf5"] = [os.path.join(subject.subject_folder, "exp", exp_idx, f"mesh_{mesh_idx}",
135 "experiment.hdf5")]
137print(f"Processing subject: {subject.id}, experiment: {exp_idx}")
138print("=====================================================")
140# load ROI
141roi = pynibs.load_roi_surface_obj_from_hdf5(subject.mesh[mesh_idx]['fn_mesh_hdf5'])[roi_idx]
142con = roi.node_number_list
143mesh_folder = subject.mesh[mesh_idx]["mesh_folder"]
145# load exp data
146df_phys_data_postproc_EMG = pd.read_hdf(subject.exp[exp_idx]["fn_exp_hdf5"][0], "phys_data/postproc/EMG")
148convergence_results = {'muscle': [],
149 'qoi': [],
150 'fun': [],
151 'nrmsd': [],
152 'geodesic dist': []}
154for i_m, m in enumerate(muscles):
155 try:
156 m = str(int(m))
157 except ValueError:
158 pass
159 print(f"\n> Muscle: {m}")
161 # load MEPs
162 mep = np.array(df_phys_data_postproc_EMG[f"p2p_{m}"])
164 for i_fun, fun in enumerate(functions):
165 if neuronmodel and not (fun == pynibs.expio.fit_funs.sigmoid4 or fun == pynibs.expio.fit_funs.sigmoid4_log):
166 raise NotImplementedError("Neuron regression can only be "
167 "performed with sigmoid4 or sigmoid4_log functions.")
169 print(f"> fun: {fun.__name__}")
171 gof_dict = dict()
173 for i_e_qoi, e_qoi in enumerate(e_qoi_list):
174 print(f"> E-QOI: {e_qoi}")
176 if e_qoi == "E_norm":
177 select_signed_data = True
178 else:
179 select_signed_data = False
181 # load electric field
182 with h5py.File(fn_e_roi, "r") as f:
183 if layerid is not None:
184 print(f"Performing neuron-enhanced mapping on Layer {layerid} using '{neuronmodel}' approach")
185 e_matrix = f[f"/{layerid}/E_mag"][:]
186 e_matrix_orig = f[f"/{layerid}/E_mag"][:]
187 # load e-field angle to local surface normal (= theta)
188 # and relative gradient of e-field between GM and WM (= gradient)
189 theta = f[f"/{layerid}/E_theta"][:]
190 gradient = f[f"/{layerid}/E_gradient"][:]
192 # calculate by how much the individual threshold of the
193 # neuronal population was exceeded by the applied e-field
194 # neuronmodel can be either "threshold" or "IOcurve"
195 e_matrix = calc_e_effective(e=e_matrix,
196 layerid=layerid,
197 theta=theta,
198 gradient=gradient,
199 neuronmodel=neuronmodel,
200 mep=mep,
201 waveform=waveform)
203 if layerid is not None:
204 for layer in roi.layers:
205 if layer.id == layerid:
206 nodes = layer.surface.nodes.node_coord
207 con = layer.surface.elm.node_number_list[:, :3] - np.min(layer.surface.elm.node_number_list[:, :3])
208 else:
209 e_matrix = f[e_qoi][:]
210 nodes = roi.node_coord_mid
211 con = roi.node_number_list
213 # check for zero e-fields and filter them (problems in FEM!)
214 zero_mask = (e_matrix == 0).all(axis=1)
216 if zero_mask.any():
217 print(f"Warning! {np.sum(zero_mask)} zero e-fields detected in element! Check FEM! Ignoring them for now!")
218 e_matrix = e_matrix[np.logical_not(zero_mask), :]
219 mep = mep[np.logical_not(zero_mask)]
221 if layerid is not None and neuronmodel == "IOcurve":
222 gof_dict[e_qoi] = 1/np.var(e_matrix, axis=0)
224 else:
225 gof_dict[e_qoi] = regress_data(e_matrix=e_matrix,
226 mep=mep,
227 fun=fun,
228 n_cpu=n_cpu,
229 con=con,
230 n_refit=n_refit,
231 return_fits=False,
232 score_type=score_type,
233 verbose=verbose,
234 pool=None,
235 refit_discontinuities=True,
236 select_signed_data=select_signed_data)
238 if convergence:
239 print(f"Computing n-1 results to assess convergence.")
240 res_prev = regress_data(e_matrix=e_matrix[:-1, :],
241 mep=mep[:-1],
242 fun=fun,
243 n_cpu=n_cpu,
244 con=con,
245 n_refit=n_refit,
246 return_fits=False,
247 score_type=score_type,
248 verbose=verbose,
249 pool=None,
250 refit_discontinuities=True,
251 select_signed_data=select_signed_data)
253 # compute normalised root-mean-square deviation between n and n-1 solution
254 norm_to_perc = 99
255 res_final = gof_dict[e_qoi] / np.percentile(gof_dict[e_qoi][gof_dict[e_qoi] > 0],
256 norm_to_perc)
257 res_prev = res_prev / np.percentile(res_prev[res_prev > 0], norm_to_perc)
258 nrmsd = pynibs.util.quality_measures.nrmsd(res_final, res_prev)
260 # compute geodesic distance between best element n and n-1 solution
261 best_elm = np.argmax(res_final)
262 best_elm_prev = np.argmax(res_prev)
263 nodes_dist, tris_dist = pynibs.util.quality_measures.geodesic_dist(nodes,
264 con,
265 best_elm,
266 source_is_node=False)
267 geod_dist = tris_dist[best_elm_prev]
269 convergence_results['muscle'].append(m)
270 convergence_results['qoi'].append(e_qoi)
271 convergence_results['fun'].append(fun.__name__)
272 convergence_results['nrmsd'].append(nrmsd)
273 convergence_results['geodesic dist'].append(geod_dist)
275 # save results
276 fn_gof_hdf5 = os.path.join(fn_results, m, fun.__name__, f"r2{fn_result_suffix}_roi_data.hdf5")
278 # create folder
279 if not os.path.exists(os.path.split(fn_gof_hdf5)[0]):
280 os.makedirs(os.path.split(fn_gof_hdf5)[0])
281 fn_geo_hdf5 = os.path.join(os.path.split(fn_gof_hdf5)[0], f"r2{fn_result_suffix}_roi_geo.hdf5")
283 # write hdf5 _geo file
284 pynibs.write_geo_hdf5_surf(out_fn=fn_geo_hdf5,
285 points=nodes,
286 con=con,
287 replace=True,
288 hdf5_path='/mesh')
290 # write _data file
291 data = [gof_dict[e_qoi] for e_qoi in e_qoi_list]
292 data_names = [f'c_{e_qoi}' for e_qoi in e_qoi_list]
294 print(f"Writing results to {fn_gof_hdf5}.")
295 pynibs.write_data_hdf5_surf(data=data,
296 data_names=data_names,
297 data_hdf_fn_out=fn_gof_hdf5,
298 geo_hdf_fn=fn_geo_hdf5,
299 replace=True)
301 if not map2surf or layerid is not None: # skip this step also when mapping on the layers is requested
302 continue
303 print("> Mapping data to brain surface")
304 c_mapped = pynibs.mesh.map_data_to_surface(datasets=data,
305 points_datasets=[roi.node_coord_mid] * len(data),
306 con_datasets=[roi.node_number_list] * len(data),
307 fname_fsl_gm=[None, None],
308 fname_fsl_wm=[None, None],
309 fname_midlayer=[
310 os.path.join(mesh_folder,
311 subject.mesh[mesh_idx]['fn_lh_midlayer']),
312 os.path.join(mesh_folder,
313 subject.mesh[mesh_idx]['fn_rh_midlayer'])
314 ],
315 delta=subject.roi[mesh_idx][roi_idx]['delta'],
316 input_data_in_center=True,
317 return_data_in_center=True,
318 data_substitute=-1)
320 # recreate complete midlayer surface to write in .hdf5 geo file
321 points_midlayer, con_midlayer = pynibs.make_GM_WM_surface(gm_surf_fname=[subject.mesh[mesh_idx]['fn_lh_gm'],
322 subject.mesh[mesh_idx]['fn_rh_gm']],
323 wm_surf_fname=[subject.mesh[mesh_idx]['fn_lh_wm'],
324 subject.mesh[mesh_idx]['fn_rh_wm']],
325 mesh_folder=mesh_folder,
326 midlayer_surf_fname=[
327 subject.mesh[mesh_idx]['fn_lh_midlayer'],
328 subject.mesh[mesh_idx]['fn_rh_midlayer']
329 ],
330 delta=subject.roi[mesh_idx][roi_idx]['delta'],
331 X_ROI=None,
332 Y_ROI=None,
333 Z_ROI=None,
334 layer=1,
335 fn_mask=None)
337 # save hdf5 _geo file (mapped)
338 print(f" > Writing mapped brain and roi to {os.path.join(os.path.split(fn_gof_hdf5)[0], f'r2{fn_result_suffix}_data.hdf5')}")
339 pynibs.write_geo_hdf5_surf(out_fn=os.path.join(os.path.split(fn_gof_hdf5)[0], f"r2{fn_result_suffix}_geo.hdf5"),
340 points=points_midlayer,
341 con=con_midlayer,
342 replace=True,
343 hdf5_path='/mesh')
345 # save hdf5 _data file (mapped)
347 pynibs.write_data_hdf5_surf(data=c_mapped,
348 data_names=data_names,
349 data_hdf_fn_out=os.path.join(os.path.split(fn_gof_hdf5)[0], f"r2{fn_result_suffix}_data.hdf5"),
350 geo_hdf_fn=os.path.join(os.path.split(fn_gof_hdf5)[0], f"r2{fn_result_suffix}_geo.hdf5"),
351 replace=True)
353print("DONE")
355if convergence:
356 print("Convergence for n vs n-1 stimulations:")
357 convergence_results = pd.DataFrame().from_dict(convergence_results)
358 print(convergence_results)
359 convergence_fn = os.path.join(fn_results, f"convergence{fn_result_suffix}.csv")
360 print(f"Saved to {convergence_fn}")
361 convergence_results.to_csv(convergence_fn)
362print("====")