Coverage for  / builds / tms-localization / papers / tmsloc_proto / scripts / 07_calc_r2.py: 84%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-01 07:48 +0000

1#!/usr/bin/env python 

2 

3""" 

4This script calculates the goodness-of-fits for each muscle in the ROI. 

5""" 

6 

7import os 

8import h5py 

9import pynibs 

10import argparse 

11import pandas as pd 

12import numpy as np 

13 

14 

15from pynibs.regression import regress_data 

16from pynibs.neuron.neuron_regression import calc_e_effective 

17 

18parser = argparse.ArgumentParser(description='Calculate fits for 3 muscles in ROI using all TMS pulses ' 

19 'and save results in .hdf5 file\n') 

20 

21# required arguments 

22parser.add_argument('-s', '--fn_subject', help='Path to subject obj', 

23 required=True, type=str) 

24parser.add_argument('-m', '--mesh_idx', help='Mesh idx', 

25 required=True, type=str) 

26parser.add_argument('-e', '--exp_idx', help='Exp idx', 

27 required=True, type=str) 

28parser.add_argument('-r', '--roi_idx', help='ROI idx', 

29 required=False, type=str, default=1) 

30parser.add_argument('-n', '--n_cpu', help='How many cpus to use', 

31 type=int, default=10) 

32parser.add_argument('-f', '--function', help='Function to fit data to ("linear", "sigmoid4", "sigmoid4_log")', 

33 type=str, default='sigmoid4') 

34parser.add_argument('-i', '--n_refit', help='Number of refit iterations', 

35 required=False, type=int, default=20) 

36parser.add_argument('-v', '--verbose', help='Show detailed output messages', 

37 required=False, action='store_true') 

38parser.add_argument('--map2surf', help='Map results to whole brain surface', 

39 required=False, action='store_true') 

40parser.add_argument('--convergence', help='Assess convergence metrics (n vs n-1)', 

41 required=False, action='store_true', default=False) 

42parser.add_argument('--layerid', help='Perform mapping based on neuronal meanfield model on the specified layer. ("L23", "L5")', 

43 required=False) 

44parser.add_argument('--neuronmodel', help='Select neuron model for localization. ' 

45 '("sensitivity_weighting", "threshold_subtract", "threshold_binary",' 

46 ' "IOcurve")', 

47 required=False, default=None) 

48parser.add_argument('--waveform', help='Waveform of TMS pulse. ("monophasic", "biphasic")', 

49 required=False, default='biphasic') 

50 

51args = parser.parse_args() 

52 

53# print parameter info 

54# ================================================================================ 

55print("-" * 64) 

56print(f"{parser.description}") 

57args_dict = vars(args) 

58print('Parameters:') 

59for key in args_dict.keys(): 

60 print(f"{key: >15}: {args_dict[key]}") 

61print("-" * 64) 

62 

63# parameters 

64# ================================================================================ 

65# file containing the subject information 

66fn_subject = args.fn_subject 

67 

68# load subject object 

69subject = pynibs.load_subject(fn_subject) 

70 

71# exp identifier 

72exp_idx = args.exp_idx 

73 

74# mesh identifier 

75mesh_idx = args.mesh_idx 

76 

77# ROI index the fields are extracted for goodness-of-fit calculation 

78roi_idx = args.roi_idx 

79 

80# number threads (cpu cores) to use for parallelization 

81n_cpu = args.n_cpu 

82 

83# show detailed output messages 

84verbose = args.verbose 

85 

86# functions to perform the fit with 

87functions = [getattr(pynibs.expio, args.function)] 

88 

89# hand muscles 

90muscles = subject.exp[exp_idx]["channels"] 

91 

92# quantities of electric field the fit is performed for 

93e_qoi_list = ["E_mag"] 

94 

95# map data to whole brain surface 

96map2surf = args.map2surf 

97 

98# perform mapping using neuronal meanfield model 

99layerid = args.layerid 

100 

101# neuron regression approach 

102neuronmodel = args.neuronmodel 

103 

104# TMS waveform 

105waveform = args.waveform 

106 

107# number of refit iterations 

108n_refit = args.n_refit 

109 

110# output measure ("SR": Relative standard error of regression, "R2": R2 score) 

111score_type = "R2" 

112 

113# Assess the convergence by comparing n vs n-1 stimulations 

114convergence = args.convergence 

115norm_to_perc = 99 

116 

117# loading subject objects 

118subject = pynibs.load_subject(fname=fn_subject) 

119subject_dir = os.path.split(fn_subject)[0] 

120 

121# parent folder containing the output data 

122fn_results = os.path.join(subject_dir, "results", f"exp_{exp_idx}", "r2", 

123 f"mesh_{mesh_idx}", f"roi_{roi_idx}") 

124 

125# file containing the electric field in the ROI [n_zaps x n_ele] 

126fn_e_roi = os.path.join(subject_dir, "results", f"exp_{exp_idx}", "electric_field", 

127 f"mesh_{mesh_idx}", f"roi_{roi_idx}", f"e_scaled.hdf5") 

128 

129if not os.path.exists(fn_e_roi): 

130 raise FileNotFoundError(f"No electric field file found!") 

131 

132fn_result_suffix = f"_neuron_{layerid}_{neuronmodel}" if layerid is not None else "" 

133 

134subject.exp[exp_idx]["fn_exp_hdf5"] = [os.path.join(subject.subject_folder, "exp", exp_idx, f"mesh_{mesh_idx}", 

135 "experiment.hdf5")] 

136 

137print(f"Processing subject: {subject.id}, experiment: {exp_idx}") 

138print("=====================================================") 

139 

140# load ROI 

141roi = pynibs.load_roi_surface_obj_from_hdf5(subject.mesh[mesh_idx]['fn_mesh_hdf5'])[roi_idx] 

142con = roi.node_number_list 

143mesh_folder = subject.mesh[mesh_idx]["mesh_folder"] 

144 

145# load exp data 

146df_phys_data_postproc_EMG = pd.read_hdf(subject.exp[exp_idx]["fn_exp_hdf5"][0], "phys_data/postproc/EMG") 

147 

148convergence_results = {'muscle': [], 

149 'qoi': [], 

150 'fun': [], 

151 'nrmsd': [], 

152 'geodesic dist': []} 

153 

154for i_m, m in enumerate(muscles): 

155 try: 

156 m = str(int(m)) 

157 except ValueError: 

158 pass 

159 print(f"\n> Muscle: {m}") 

160 

161 # load MEPs 

162 mep = np.array(df_phys_data_postproc_EMG[f"p2p_{m}"]) 

163 

164 for i_fun, fun in enumerate(functions): 

165 if neuronmodel and not (fun == pynibs.expio.fit_funs.sigmoid4 or fun == pynibs.expio.fit_funs.sigmoid4_log): 

166 raise NotImplementedError("Neuron regression can only be " 

167 "performed with sigmoid4 or sigmoid4_log functions.") 

168 

169 print(f"> fun: {fun.__name__}") 

170 

171 gof_dict = dict() 

172 

173 for i_e_qoi, e_qoi in enumerate(e_qoi_list): 

174 print(f"> E-QOI: {e_qoi}") 

175 

176 if e_qoi == "E_norm": 

177 select_signed_data = True 

178 else: 

179 select_signed_data = False 

180 

181 # load electric field 

182 with h5py.File(fn_e_roi, "r") as f: 

183 if layerid is not None: 

184 print(f"Performing neuron-enhanced mapping on Layer {layerid} using '{neuronmodel}' approach") 

185 e_matrix = f[f"/{layerid}/E_mag"][:] 

186 e_matrix_orig = f[f"/{layerid}/E_mag"][:] 

187 # load e-field angle to local surface normal (= theta) 

188 # and relative gradient of e-field between GM and WM (= gradient) 

189 theta = f[f"/{layerid}/E_theta"][:] 

190 gradient = f[f"/{layerid}/E_gradient"][:] 

191 

192 # calculate by how much the individual threshold of the 

193 # neuronal population was exceeded by the applied e-field 

194 # neuronmodel can be either "threshold" or "IOcurve" 

195 e_matrix = calc_e_effective(e=e_matrix, 

196 layerid=layerid, 

197 theta=theta, 

198 gradient=gradient, 

199 neuronmodel=neuronmodel, 

200 mep=mep, 

201 waveform=waveform) 

202 

203 if layerid is not None: 

204 for layer in roi.layers: 

205 if layer.id == layerid: 

206 nodes = layer.surface.nodes.node_coord 

207 con = layer.surface.elm.node_number_list[:, :3] - np.min(layer.surface.elm.node_number_list[:, :3]) 

208 else: 

209 e_matrix = f[e_qoi][:] 

210 nodes = roi.node_coord_mid 

211 con = roi.node_number_list 

212 

213 # check for zero e-fields and filter them (problems in FEM!) 

214 zero_mask = (e_matrix == 0).all(axis=1) 

215 

216 if zero_mask.any(): 

217 print(f"Warning! {np.sum(zero_mask)} zero e-fields detected in element! Check FEM! Ignoring them for now!") 

218 e_matrix = e_matrix[np.logical_not(zero_mask), :] 

219 mep = mep[np.logical_not(zero_mask)] 

220 

221 if layerid is not None and neuronmodel == "IOcurve": 

222 gof_dict[e_qoi] = 1/np.var(e_matrix, axis=0) 

223 

224 else: 

225 gof_dict[e_qoi] = regress_data(e_matrix=e_matrix, 

226 mep=mep, 

227 fun=fun, 

228 n_cpu=n_cpu, 

229 con=con, 

230 n_refit=n_refit, 

231 return_fits=False, 

232 score_type=score_type, 

233 verbose=verbose, 

234 pool=None, 

235 refit_discontinuities=True, 

236 select_signed_data=select_signed_data) 

237 

238 if convergence: 

239 print(f"Computing n-1 results to assess convergence.") 

240 res_prev = regress_data(e_matrix=e_matrix[:-1, :], 

241 mep=mep[:-1], 

242 fun=fun, 

243 n_cpu=n_cpu, 

244 con=con, 

245 n_refit=n_refit, 

246 return_fits=False, 

247 score_type=score_type, 

248 verbose=verbose, 

249 pool=None, 

250 refit_discontinuities=True, 

251 select_signed_data=select_signed_data) 

252 

253 # compute normalised root-mean-square deviation between n and n-1 solution 

254 norm_to_perc = 99 

255 res_final = gof_dict[e_qoi] / np.percentile(gof_dict[e_qoi][gof_dict[e_qoi] > 0], 

256 norm_to_perc) 

257 res_prev = res_prev / np.percentile(res_prev[res_prev > 0], norm_to_perc) 

258 nrmsd = pynibs.util.quality_measures.nrmsd(res_final, res_prev) 

259 

260 # compute geodesic distance between best element n and n-1 solution 

261 best_elm = np.argmax(res_final) 

262 best_elm_prev = np.argmax(res_prev) 

263 nodes_dist, tris_dist = pynibs.util.quality_measures.geodesic_dist(nodes, 

264 con, 

265 best_elm, 

266 source_is_node=False) 

267 geod_dist = tris_dist[best_elm_prev] 

268 

269 convergence_results['muscle'].append(m) 

270 convergence_results['qoi'].append(e_qoi) 

271 convergence_results['fun'].append(fun.__name__) 

272 convergence_results['nrmsd'].append(nrmsd) 

273 convergence_results['geodesic dist'].append(geod_dist) 

274 

275 # save results 

276 fn_gof_hdf5 = os.path.join(fn_results, m, fun.__name__, f"r2{fn_result_suffix}_roi_data.hdf5") 

277 

278 # create folder 

279 if not os.path.exists(os.path.split(fn_gof_hdf5)[0]): 

280 os.makedirs(os.path.split(fn_gof_hdf5)[0]) 

281 fn_geo_hdf5 = os.path.join(os.path.split(fn_gof_hdf5)[0], f"r2{fn_result_suffix}_roi_geo.hdf5") 

282 

283 # write hdf5 _geo file 

284 pynibs.write_geo_hdf5_surf(out_fn=fn_geo_hdf5, 

285 points=nodes, 

286 con=con, 

287 replace=True, 

288 hdf5_path='/mesh') 

289 

290 # write _data file 

291 data = [gof_dict[e_qoi] for e_qoi in e_qoi_list] 

292 data_names = [f'c_{e_qoi}' for e_qoi in e_qoi_list] 

293 

294 print(f"Writing results to {fn_gof_hdf5}.") 

295 pynibs.write_data_hdf5_surf(data=data, 

296 data_names=data_names, 

297 data_hdf_fn_out=fn_gof_hdf5, 

298 geo_hdf_fn=fn_geo_hdf5, 

299 replace=True) 

300 

301 if not map2surf or layerid is not None: # skip this step also when mapping on the layers is requested 

302 continue 

303 print("> Mapping data to brain surface") 

304 c_mapped = pynibs.mesh.map_data_to_surface(datasets=data, 

305 points_datasets=[roi.node_coord_mid] * len(data), 

306 con_datasets=[roi.node_number_list] * len(data), 

307 fname_fsl_gm=[None, None], 

308 fname_fsl_wm=[None, None], 

309 fname_midlayer=[ 

310 os.path.join(mesh_folder, 

311 subject.mesh[mesh_idx]['fn_lh_midlayer']), 

312 os.path.join(mesh_folder, 

313 subject.mesh[mesh_idx]['fn_rh_midlayer']) 

314 ], 

315 delta=subject.roi[mesh_idx][roi_idx]['delta'], 

316 input_data_in_center=True, 

317 return_data_in_center=True, 

318 data_substitute=-1) 

319 

320 # recreate complete midlayer surface to write in .hdf5 geo file 

321 points_midlayer, con_midlayer = pynibs.make_GM_WM_surface(gm_surf_fname=[subject.mesh[mesh_idx]['fn_lh_gm'], 

322 subject.mesh[mesh_idx]['fn_rh_gm']], 

323 wm_surf_fname=[subject.mesh[mesh_idx]['fn_lh_wm'], 

324 subject.mesh[mesh_idx]['fn_rh_wm']], 

325 mesh_folder=mesh_folder, 

326 midlayer_surf_fname=[ 

327 subject.mesh[mesh_idx]['fn_lh_midlayer'], 

328 subject.mesh[mesh_idx]['fn_rh_midlayer'] 

329 ], 

330 delta=subject.roi[mesh_idx][roi_idx]['delta'], 

331 X_ROI=None, 

332 Y_ROI=None, 

333 Z_ROI=None, 

334 layer=1, 

335 fn_mask=None) 

336 

337 # save hdf5 _geo file (mapped) 

338 print(f" > Writing mapped brain and roi to {os.path.join(os.path.split(fn_gof_hdf5)[0], f'r2{fn_result_suffix}_data.hdf5')}") 

339 pynibs.write_geo_hdf5_surf(out_fn=os.path.join(os.path.split(fn_gof_hdf5)[0], f"r2{fn_result_suffix}_geo.hdf5"), 

340 points=points_midlayer, 

341 con=con_midlayer, 

342 replace=True, 

343 hdf5_path='/mesh') 

344 

345 # save hdf5 _data file (mapped) 

346 

347 pynibs.write_data_hdf5_surf(data=c_mapped, 

348 data_names=data_names, 

349 data_hdf_fn_out=os.path.join(os.path.split(fn_gof_hdf5)[0], f"r2{fn_result_suffix}_data.hdf5"), 

350 geo_hdf_fn=os.path.join(os.path.split(fn_gof_hdf5)[0], f"r2{fn_result_suffix}_geo.hdf5"), 

351 replace=True) 

352 

353print("DONE") 

354 

355if convergence: 

356 print("Convergence for n vs n-1 stimulations:") 

357 convergence_results = pd.DataFrame().from_dict(convergence_results) 

358 print(convergence_results) 

359 convergence_fn = os.path.join(fn_results, f"convergence{fn_result_suffix}.csv") 

360 print(f"Saved to {convergence_fn}") 

361 convergence_results.to_csv(convergence_fn) 

362print("====")