Coverage for nuremics\core\workflow.py: 81%

896 statements  

« prev     ^ index     » next       coverage.py v7.9.0, created at 2025-06-12 16:54 +0200

1from __future__ import annotations 

2 

3import os 

4import sys 

5from tkinter import filedialog 

6from tkinter import * 

7 

8import json 

9import shutil 

10import numpy as np 

11import pandas as pd 

12from pathlib import Path 

13from termcolor import colored 

14 

15from .process import Process 

16from .utils import ( 

17 get_self_method_calls, 

18 only_function_calls, 

19 extract_inputs_and_types, 

20 extract_analysis, 

21 extract_self_output_keys, 

22) 

23from importlib.resources import files 

24 

25 

26class WorkFlow: 

27 """Manage workflow of processes.""" 

28 

29 def __init__( 

30 self, 

31 app_name: str, 

32 working_dir: Path, 

33 processes: list, 

34 studies: list = ["Default"], 

35 verbose: bool = True, 

36 ): 

37 """Initialization.""" 

38 

39 # -------------------- # 

40 # Initialize variables # 

41 # -------------------- # 

42 self.app_name = app_name 

43 self.studies = studies 

44 self.processes = processes 

45 self.list_processes = [] 

46 self.dict_inputs = {} 

47 self.dict_datasets = {} 

48 self.dict_studies = {} 

49 self.dict_process = {} 

50 self.dict_analysis = {} 

51 self.user_params = [] 

52 self.user_paths = [] 

53 self.output_paths = [] 

54 self.overall_analysis = [] 

55 self.analysis_settings = {} 

56 self.params_type = {} 

57 self.operations_by_process = {} 

58 self.inputs_by_process = {} 

59 self.params_by_process = {} 

60 self.paths_by_process = {} 

61 self.outputs_by_process = {} 

62 self.analysis_by_process = {} 

63 self.settings_by_process = {} 

64 self.params_plug = {} 

65 self.paths_plug = {} 

66 self.outputs_plug = {} 

67 self.analysis_plug = {} 

68 self.studies_modif = {} 

69 self.studies_messages = {} 

70 self.studies_config = {} 

71 self.fixed_params_messages = {} 

72 self.fixed_params_config = {} 

73 self.fixed_paths_messages = {} 

74 self.fixed_paths_config = {} 

75 self.variable_params_messages = {} 

76 self.variable_params_config = {} 

77 self.variable_paths_messages = {} 

78 self.variable_paths_config = {} 

79 self.fixed_params = {} 

80 self.fixed_paths = {} 

81 self.variable_params = {} 

82 self.variable_paths = {} 

83 self.dict_fixed_params = {} 

84 self.dict_variable_params = {} 

85 self.dict_user_paths = {} 

86 self.dict_paths = {} 

87 self.diagram = {} 

88 self.verbose = verbose 

89 

90 # ------------------------ # 

91 # Define working directory # 

92 # ------------------------ # 

93 if working_dir is None: 

94 root = Tk() 

95 root.withdraw() 

96 self.working_dir = Path(filedialog.askdirectory()) / self.app_name 

97 else: 

98 self.working_dir = working_dir / self.app_name 

99 

100 # ------------------------ # 

101 # Create working directory # 

102 # ------------------------ # 

103 self.working_dir.mkdir( 

104 exist_ok=True, 

105 parents=True, 

106 ) 

107 

108 # ----------------------- # 

109 # Go to working directory # 

110 # ----------------------- # 

111 os.chdir(self.working_dir) 

112 

113 # ------------------------ # 

114 # Define list of processes # 

115 # ------------------------ # 

116 for proc in self.processes: 

117 self.list_processes.append(proc["process"].__name__) 

118 

119 def print_logo(self): 

120 """Print ASCII NUREMICS logo""" 

121 

122 ascii_logo_path:str = files("nuremics.resources").joinpath("logo.txt") 

123 f = open(ascii_logo_path, "r") 

124 for line in f: 

125 lines = f.readlines() 

126 print() 

127 for line in lines: 

128 print(colored(line.rstrip(), "yellow")) 

129 

130 def print_application(self): 

131 """Print application""" 

132 

133 # Printing 

134 print() 

135 print( 

136 colored("> APPLICATION <", "blue", attrs=["reverse"]), 

137 ) 

138 print() 

139 print( 

140 colored(f"| Workflow |", "magenta"), 

141 ) 

142 print( 

143 colored(f"{self.app_name}_____", "blue"), 

144 ) 

145 

146 # Define number of spaces taken by the workflow print 

147 nb_spaces_app = len(self.app_name)+5 

148 

149 # Print diagram of processes and operations 

150 error = False 

151 for i, proc in enumerate(self.processes): 

152 

153 proc_name = proc["process"].__name__ 

154 process = proc["process"] 

155 this_process:Process = process() 

156 

157 # Define number of spaces taken by the application print 

158 nb_spaces_proc = len(proc_name)+10 

159 

160 # Get list of operations for current process 

161 self.operations_by_process[proc_name] = get_self_method_calls(this_process.__class__) 

162 

163 # Test if process call contains only call to operations 

164 valid_call = only_function_calls( 

165 method=this_process.__call__, 

166 allowed_methods=self.operations_by_process[proc_name] 

167 ) 

168 

169 # Printing 

170 if valid_call: 

171 print( 

172 colored(" "*nb_spaces_app+f"|_____{proc_name}_____", "blue"), 

173 ) 

174 for op_name in self.operations_by_process[proc_name]: 

175 

176 if i < len(self.processes)-1: 

177 text = " "*nb_spaces_app+"|"+" "*nb_spaces_proc+f"|_____{op_name}" 

178 else: 

179 text = " "*(nb_spaces_app+1)+" "*nb_spaces_proc+f"|_____{op_name}" 

180 

181 # Printing 

182 print( 

183 colored(text, "blue"), 

184 ) 

185 else: 

186 print( 

187 colored(" "*nb_spaces_app+f"|_____{proc_name}_____", "blue") + \ 

188 colored("(X)", "red") 

189 ) 

190 error = True 

191 

192 if i < len(self.processes)-1: 

193 print( 

194 colored(" "*nb_spaces_app+"|", "blue"), 

195 ) 

196 

197 if error: 

198 print() 

199 print(colored(f"(X) Each process must only call its internal function(s):", "red")) 

200 print() 

201 print(colored(f" def __call__(self):", "red")) 

202 print(colored(f" super().__call__()", "red")) 

203 print() 

204 print(colored(f" self.operation1()", "red")) 

205 print(colored(f" self.operation2()", "red")) 

206 print(colored(f" self.operation3()", "red")) 

207 print(colored(f" ...", "red")) 

208 sys.exit(1) 

209 

210 def get_inputs(self): 

211 """Get inputs""" 

212 

213 for proc in self.processes: 

214 

215 process = proc["process"] 

216 name = proc["process"].__name__ 

217 this_process:Process = process() 

218 

219 self.inputs_by_process[name] = extract_inputs_and_types(this_process) 

220 self.analysis_by_process[name], self.settings_by_process[name] = extract_analysis(this_process) 

221 

222 self.params_by_process[name] = {} 

223 self.paths_by_process[name] = [] 

224 self.params_plug[name] = {} 

225 self.paths_plug[name] = {} 

226 self.analysis_plug[name] = {} 

227 

228 for key, value_type in self.inputs_by_process[name].items(): 

229 

230 # Get the module and type name 

231 module_name = value_type.__module__ 

232 type_name = value_type.__name__ 

233 

234 if module_name == "builtins": 

235 type = type_name 

236 else: 

237 type = f"{module_name}.{type_name}" 

238 

239 if key not in self.analysis_by_process[name]: 

240 

241 if type == "pathlib.Path": 

242 self.paths_by_process[name].append(key) 

243 if ("user_paths" in proc) and (key in proc["user_paths"]): 

244 self.paths_plug[name][key] = [proc["user_paths"][key], "user_paths"] 

245 elif ("required_paths" in proc) and (key in proc["required_paths"]): 

246 self.paths_plug[name][key] = [proc["required_paths"][key], "required_paths"] 

247 else: 

248 self.paths_plug[name][key] = None 

249 

250 else: 

251 self.params_by_process[name][key] = [value_type, type] 

252 if ("user_params" in proc) and (key in proc["user_params"]): 

253 self.params_plug[name][key] = [proc["user_params"][key], "user_params"] 

254 elif ("hard_params" in proc) and (key in proc["hard_params"]): 

255 self.params_plug[name][key] = [proc["hard_params"][key], "hard_params"] 

256 else: 

257 self.params_plug[name][key] = None 

258 

259 else: 

260 if ("overall_analysis" in proc) and (key in proc["overall_analysis"]): 

261 self.analysis_plug[name][key] = proc["overall_analysis"][key] 

262 else: 

263 self.analysis_plug[name][key] = None 

264 

265 def get_outputs(self): 

266 """Get outputs""" 

267 

268 for proc in self.processes: 

269 

270 process = proc["process"] 

271 name = proc["process"].__name__ 

272 this_process:Process = process() 

273 

274 self.outputs_by_process[name] = [] 

275 self.outputs_plug[name] = {} 

276 

277 for op in self.operations_by_process[name]: 

278 output_paths = extract_self_output_keys(getattr(this_process, op)) 

279 for output_path in output_paths: 

280 if output_path not in self.outputs_by_process[name]: 

281 self.outputs_by_process[name].append(output_path) 

282 

283 for output in self.outputs_by_process[name]: 

284 if ("output_paths" in proc) and (output in proc["output_paths"]): 

285 self.outputs_plug[name][output] = proc["output_paths"][output] 

286 else: 

287 self.outputs_plug[name][output] = None 

288 

289 def init_config(self): 

290 """Initialize configuration""" 

291 

292 for _, process in enumerate(self.processes): 

293 

294 name = process["process"].__name__ 

295 

296 # Define list of user parameters 

297 if "user_params" in process: 

298 for key, value in process["user_params"].items(): 

299 if key in self.params_by_process[name]: 

300 self.user_params.append(value) 

301 else: 

302 print() 

303 print(colored(f'(X) {key} defined in "user_params" is not an input parameter of {name}.', "red")) 

304 sys.exit(1) 

305 

306 # Check on hard parameters 

307 if "hard_params" in process: 

308 for key, _ in process["hard_params"].items(): 

309 if key not in self.params_by_process[name]: 

310 print() 

311 print(colored(f'(X) {key} defined in "hard_params" is not an input parameter of {name}.', "red")) 

312 sys.exit(1) 

313 

314 # Define list of user paths 

315 if "user_paths" in process: 

316 for key, value in process["user_paths"].items(): 

317 if key in self.paths_by_process[name]: 

318 self.user_paths.append(value) 

319 else: 

320 print() 

321 print(colored(f"(X) {key} is not an input path of {name}.", "red")) 

322 sys.exit(1) 

323 

324 # Check on required paths 

325 if "required_paths" in process: 

326 for _, value in process["required_paths"].items(): 

327 if value not in self.output_paths: 

328 print() 

329 print(colored(f'(X) {value} defined in {name} "required_paths" must be defined in previous process "output_paths".', "red")) 

330 sys.exit(1) 

331 

332 # Define list of output paths 

333 if "output_paths" in process: 

334 for key, value in process["output_paths"].items(): 

335 if key in self.outputs_by_process[name]: 

336 if value in self.output_paths: 

337 print() 

338 print(colored(f'(X) {value} is defined twice in "output_paths".', "red")) 

339 sys.exit(1) 

340 else: 

341 self.output_paths.append(value) 

342 else: 

343 print() 

344 print(colored(f"(X) {key} is not an output path of {name}.", "red")) 

345 sys.exit(1) 

346 

347 # Define list of outputs for analysis 

348 if "overall_analysis" in process: 

349 for key, value in process["overall_analysis"].items(): 

350 if key in self.analysis_by_process[name]: 

351 self.overall_analysis.append(value) 

352 else: 

353 print() 

354 print(colored(f"(X) {key} is not an output analysis of {name}.", "red")) 

355 sys.exit(1) 

356 

357 if value not in self.output_paths: 

358 print() 

359 print(colored(f'(X) {value} defined in {name} "overall_analysis" must be defined in previous process "output_paths".', "red")) 

360 sys.exit(1) 

361 

362 # Delete duplicates 

363 self.user_params = list(dict.fromkeys(self.user_params)) 

364 self.user_paths = list(dict.fromkeys(self.user_paths)) 

365 self.overall_analysis = list(dict.fromkeys(self.overall_analysis)) 

366 

367 # Define analysis settings 

368 for output in self.overall_analysis: 

369 self.analysis_settings[output] = {} 

370 

371 for proc, settings in self.settings_by_process.items(): 

372 if settings: 

373 for out, setting in settings.items(): 

374 output = self.analysis_plug[proc][out] 

375 self.analysis_settings[output].update(setting) 

376 

377 def print_processes(self): 

378 """Print processes""" 

379 

380 for proc in self.processes: 

381 

382 name = proc["process"].__name__ 

383 

384 # Printing 

385 print() 

386 print( 

387 colored(f"| {name} |", "magenta"), 

388 ) 

389 

390 # ---------------- # 

391 # Input parameters # 

392 # ---------------- # 

393 print( 

394 colored(f"> Input Parameter(s) :", "blue"), 

395 ) 

396 if len(self.params_by_process[name]) == 0: 

397 print( 

398 colored("None.", "blue"), 

399 ) 

400 else: 

401 lines_proc = [] 

402 lines_user = [] 

403 error = False 

404 for key, value in self.params_by_process[name].items(): 

405 

406 # Process 

407 text_type_proc = f"({value[1]})" 

408 text_variable_proc = key 

409 lines_proc.append((text_type_proc, text_variable_proc)) 

410 

411 # User 

412 if self.params_plug[name][key] is not None: 

413 text_variable_user = str(self.params_plug[name][key][0]) 

414 text_definition_user = f"({self.params_plug[name][key][1]})" 

415 lines_user.append((text_variable_user, text_definition_user)) 

416 else: 

417 lines_user.append(("Not defined", "(X)")) 

418 error = True 

419 

420 type_proc_width = max(len(t) for t, _ in lines_proc)+1 

421 variable_proc_width = max(len(p) for _, p in lines_proc)+1 

422 variable_user_width = max(len(t) for t, _ in lines_user)+1 

423 definition_user_width = max(len(p) for _, p in lines_user)+1 

424 

425 for (type_proc, var_proc), (user_var, user_def) in zip(lines_proc, lines_user): 

426 proc_str = type_proc.ljust(type_proc_width)+var_proc.ljust(variable_proc_width)+"-----|" 

427 user_str = "|----- "+user_var.ljust(variable_user_width)+user_def.ljust(definition_user_width) 

428 if "(X)" in user_str: color = "red" 

429 else: color = "green" 

430 print(colored(proc_str, "blue")+colored(user_str, color)) 

431 

432 if error: 

433 print() 

434 print(colored('(X) Please define all input parameters either in "user_params" or "hard_params".', "red")) 

435 sys.exit(1) 

436 

437 # ----------- # 

438 # Input paths # 

439 # ----------- # 

440 print( 

441 colored(f"> Input Path(s) :", "blue"), 

442 ) 

443 if len(self.paths_by_process[name]) == 0: 

444 print( 

445 colored("None.", "blue"), 

446 ) 

447 else: 

448 lines_proc = [] 

449 lines_user = [] 

450 error = False 

451 for path in self.paths_by_process[name]: 

452 

453 # Process 

454 lines_proc.append(path) 

455 

456 # User 

457 if self.paths_plug[name][path] is not None: 

458 text_variable_user = self.paths_plug[name][path][0] 

459 text_definition_user = f"({self.paths_plug[name][path][1]})" 

460 lines_user.append((text_variable_user, text_definition_user)) 

461 else: 

462 lines_user.append(("Not defined", "(X)")) 

463 error = True 

464 

465 proc_width = max(len(t) for t in lines_proc)+1 

466 variable_user_width = max(len(t) for t, _ in lines_user)+1 

467 definition_user_width = max(len(p) for _, p in lines_user)+1 

468 

469 for (proc), (user_var, user_def) in zip(lines_proc, lines_user): 

470 proc_str = proc.ljust(proc_width)+"-----|" 

471 user_str = "|----- "+user_var.ljust(variable_user_width)+user_def.ljust(definition_user_width) 

472 if "(X)" in user_str: color = "red" 

473 else: color = "green" 

474 print(colored(proc_str, "blue")+colored(user_str, color)) 

475 

476 if error: 

477 print() 

478 print(colored('(X) Please define all input paths either in "user_paths" or "required_paths".', "red")) 

479 sys.exit(1) 

480 

481 # ---------------- # 

482 # Input analysis # 

483 # ---------------- # 

484 print( 

485 colored(f"> Input Analysis :", "blue"), 

486 ) 

487 if len(self.analysis_by_process[name]) == 0: 

488 print( 

489 colored("None.", "blue"), 

490 ) 

491 else: 

492 lines_proc = [] 

493 lines_user = [] 

494 error = False 

495 for out in self.analysis_by_process[name]: 

496 

497 # Process 

498 lines_proc.append(out) 

499 

500 # User 

501 if self.analysis_plug[name][out] is not None: 

502 text_variable_user = self.analysis_plug[name][out] 

503 text_definition_user = "(overall_analysis)" 

504 lines_user.append((text_variable_user, text_definition_user)) 

505 else: 

506 lines_user.append(("Not defined", "(X)")) 

507 error = True 

508 

509 proc_width = max(len(t) for t in lines_proc)+1 

510 variable_user_width = max(len(t) for t, _ in lines_user)+1 

511 definition_user_width = max(len(p) for _, p in lines_user)+1 

512 

513 for (proc), (user_var, user_def) in zip(lines_proc, lines_user): 

514 proc_str = proc.ljust(proc_width)+"-----|" 

515 user_str = "|----- "+user_var.ljust(variable_user_width)+user_def.ljust(definition_user_width) 

516 if "(X)" in user_str: color = "red" 

517 else: color = "green" 

518 print(colored(proc_str, "blue")+colored(user_str, color)) 

519 

520 if error: 

521 print() 

522 print(colored('(X) Please define all output analysis in "overall_analysis".', "red")) 

523 sys.exit(1) 

524 

525 # ------------ # 

526 # Output paths # 

527 # ------------ # 

528 print( 

529 colored(f"> Output Path(s) :", "blue"), 

530 ) 

531 if len(self.outputs_by_process[name]) == 0: 

532 print( 

533 colored("None.", "blue"), 

534 ) 

535 else: 

536 lines_proc = [] 

537 lines_user = [] 

538 error = False 

539 for path in self.outputs_by_process[name]: 

540 

541 # Process 

542 lines_proc.append(path) 

543 

544 # User 

545 if self.outputs_plug[name][path] is not None: 

546 text_variable_user = self.outputs_plug[name][path] 

547 text_definition_user = "(output_paths)" 

548 lines_user.append((text_variable_user, text_definition_user)) 

549 else: 

550 lines_user.append(("Not defined", "(X)")) 

551 error = True 

552 

553 proc_width = max(len(t) for t in lines_proc)+1 

554 variable_user_width = max(len(t) for t, _ in lines_user)+1 

555 definition_user_width = max(len(p) for _, p in lines_user)+1 

556 

557 for (proc), (user_var, user_def) in zip(lines_proc, lines_user): 

558 proc_str = proc.ljust(proc_width)+"-----|" 

559 user_str = "|----- "+user_var.ljust(variable_user_width)+user_def.ljust(definition_user_width) 

560 if "(X)" in user_str: color = "red" 

561 else: color = "green" 

562 print(colored(proc_str, "blue")+colored(user_str, color)) 

563 

564 if error: 

565 print() 

566 print(colored('(X) Please define all output paths in "output_paths".', "red")) 

567 sys.exit(1) 

568 

569 def set_user_params_types(self): 

570 """Set types of user parameters""" 

571 

572 # Gather all types of parameters 

573 for proc, params in self.params_by_process.items(): 

574 for param, type in params.items(): 

575 user_param = self.params_plug[proc][param][0] 

576 if user_param in self.user_params: 

577 if (user_param in self.params_type) and (self.params_type[user_param][0] != type[0]): 

578 print() 

579 print(colored(f"(X) {user_param} is defined both as ({self.params_type[user_param][1]}) and ({type[1]}) :", "red")) 

580 print(colored(f'> Please consider defining a new user parameter in "user_params".', "red")) 

581 sys.exit(1) 

582 self.params_type[user_param] = type 

583 

584 def print_io(self): 

585 """Print inputs / outputs""" 

586 

587 # Printing 

588 print() 

589 print( 

590 colored("> INPUTS <", "blue", attrs=["reverse"]), 

591 ) 

592 

593 # Print input parameters 

594 print() 

595 print( 

596 colored(f"| User Parameters |", "magenta"), 

597 ) 

598 for param, type in self.params_type.items(): 

599 print( 

600 colored(f"> {param} ({type[1]})", "blue"), 

601 ) 

602 if len(list(self.params_type.items())) == 0: 

603 print( 

604 colored("None.", "blue"), 

605 ) 

606 

607 # Print input paths 

608 print() 

609 print( 

610 colored(f"| User Paths |", "magenta"), 

611 ) 

612 for path in self.user_paths: 

613 print( 

614 colored(f"> {path}", "blue"), 

615 ) 

616 if len(self.user_paths) == 0: 

617 print( 

618 colored("None.", "blue"), 

619 ) 

620 

621 # Printing 

622 print() 

623 print( 

624 colored("> OUTPUTS <", "blue", attrs=["reverse"]), 

625 ) 

626 print() 

627 for path in self.output_paths: 

628 print( 

629 colored(f"> {path}", "blue"), 

630 ) 

631 if len(self.output_paths) == 0: 

632 print( 

633 colored("None.", "blue"), 

634 ) 

635 

636 def init_studies(self): 

637 """Initialize studies""" 

638 

639 # Open studies json file if existing 

640 if os.path.exists("studies.json"): 

641 with open("studies.json") as f: 

642 self.dict_studies = json.load(f) 

643 

644 # Clean studies 

645 for study in list(self.dict_studies.keys()): 

646 if study not in self.studies: 

647 del self.dict_studies[study] 

648 

649 # Clean input parameters 

650 for study in list(self.dict_studies.keys()): 

651 for param in list(self.dict_studies[study]["user_params"]): 

652 if param not in self.user_params: 

653 del self.dict_studies[study]["user_params"][param] 

654 

655 # Clean input paths 

656 for study in list(self.dict_studies.keys()): 

657 for path in list(self.dict_studies[study]["user_paths"]): 

658 if path not in self.user_paths: 

659 del self.dict_studies[study]["user_paths"][path] 

660 

661 # Clean output paths 

662 for study in list(self.dict_studies.keys()): 

663 for path in list(self.dict_studies[study]["clean_outputs"]): 

664 if path not in self.output_paths: 

665 del self.dict_studies[study]["clean_outputs"][path] 

666 

667 # Initialize input parameters/paths 

668 for study in self.studies: 

669 

670 if study not in self.dict_studies: 

671 self.dict_studies[study] = { 

672 "execute": True, 

673 "user_params": {}, 

674 "user_paths": {}, 

675 "clean_outputs": {}, 

676 } 

677 

678 for param in self.user_params: 

679 if param not in self.dict_studies[study]["user_params"]: 

680 if study == "Default": 

681 self.dict_studies[study]["user_params"][param] = False 

682 else: 

683 self.dict_studies[study]["user_params"][param] = None 

684 

685 for file in self.user_paths: 

686 if file not in self.dict_studies[study]["user_paths"]: 

687 if study == "Default": 

688 self.dict_studies[study]["user_paths"][file] = False 

689 else: 

690 self.dict_studies[study]["user_paths"][file] = None 

691 

692 for path in self.output_paths: 

693 if path not in self.dict_studies[study]["clean_outputs"]: 

694 self.dict_studies[study]["clean_outputs"][path] = False 

695 

696 # Reordering 

697 self.dict_studies[study]["user_params"] = {k: self.dict_studies[study]["user_params"][k] for k in self.user_params} 

698 self.dict_studies[study]["user_paths"] = {k: self.dict_studies[study]["user_paths"][k] for k in self.user_paths} 

699 

700 # Write studies json file 

701 with open("studies.json", "w") as f: 

702 json.dump(self.dict_studies, f, indent=4) 

703 

704 def test_studies_modification(self): 

705 """Test if studies configurations have been modified""" 

706 

707 # Loop over studies 

708 for study in self.studies: 

709 

710 self.studies_modif[study] = False 

711 

712 study_file = Path(study) / ".study.json" 

713 if study_file.exists(): 

714 with open(study_file) as f: 

715 dict_study = json.load(f) 

716 if (self.dict_studies[study]["user_params"] != dict_study["user_params"]) or \ 

717 (self.dict_studies[study]["user_paths"] != dict_study["user_paths"]): 

718 self.studies_modif[study] = True 

719 

720 def test_studies_settings(self): 

721 """Check if studies has been properly configured""" 

722 

723 # Loop over studies 

724 for study in self.studies: 

725 

726 self.studies_messages[study] = [] 

727 self.studies_config[study] = True 

728 

729 for param in self.user_params: 

730 if self.dict_studies[study]["user_params"][param] is None: 

731 self.studies_messages[study].append(f"(X) {param} not configured.") 

732 self.studies_config[study] = False 

733 else: 

734 if self.dict_studies[study]["user_params"][param]: text = "variable" 

735 else: text = "fixed" 

736 self.studies_messages[study].append(f"(V) {param} is {text}.") 

737 

738 for file in self.user_paths: 

739 if self.dict_studies[study]["user_paths"][file] is None: 

740 self.studies_messages[study].append(f"(X) {file} not configured.") 

741 self.studies_config[study] = False 

742 else: 

743 if self.dict_studies[study]["user_paths"][file]: text = "variable" 

744 else: text = "fixed" 

745 self.studies_messages[study].append(f"(V) {file} is {text}.") 

746 

747 def print_studies(self): 

748 """Print studies""" 

749 

750 print() 

751 print( 

752 colored("> STUDIES <", "blue", attrs=["reverse"]), 

753 ) 

754 for study in self.studies: 

755 

756 # Printing 

757 print() 

758 print( 

759 colored(f"| {study} |", "magenta"), 

760 ) 

761 if self.studies_modif[study]: 

762 print( 

763 colored(f"(!) Configuration has been modified.", "yellow"), 

764 ) 

765 self.clean_output_tree(study) 

766 

767 # Delete analysis file 

768 path = Path(study) / "analysis.json" 

769 if path.exists(): path.unlink() 

770 

771 for message in self.studies_messages[study]: 

772 if "(V)" in message: print(colored(message, "green")) 

773 elif "(X)" in message: print(colored(message, "red")) 

774 

775 if not self.studies_config[study]: 

776 print() 

777 print(colored(f"(X) Please configure file :", "red")) 

778 print(colored(f"> {str(Path.cwd() / "studies.json")}", "red")) 

779 sys.exit(1) 

780 

781 def init_process_settings(self): 

782 """Initialize process settings""" 

783 

784 # Loop over studies 

785 for study in self.studies: 

786 

787 # Open process json file if existing 

788 process_file = Path(study) / "process.json" 

789 if os.path.exists(process_file): 

790 with open(process_file) as f: 

791 self.dict_process[study] = json.load(f) 

792 else: 

793 self.dict_process[study] = {} 

794 

795 # Clean processes 

796 for process in list(self.dict_process[study].keys()): 

797 if process not in self.list_processes: 

798 del self.dict_process[study][process] 

799 

800 # Loop over processes 

801 for process in self.list_processes: 

802 if process not in self.dict_process[study]: 

803 self.dict_process[study][process] = { 

804 "execute": True, 

805 "verbose": self.verbose, 

806 } 

807 

808 # Reordering 

809 self.dict_process[study] = {k: self.dict_process[study][k] for k in self.list_processes} 

810 

811 # Write studies json file 

812 with open(process_file, "w") as f: 

813 json.dump(self.dict_process[study], f, indent=4) 

814 

815 def configure_inputs(self): 

816 """Configure inputs with lists of fixed/variable parameters/paths""" 

817 

818 for study in self.studies: 

819 

820 # Define list of fixed/variable parameters 

821 fixed_params = [] 

822 variable_params = [] 

823 for key, value in self.dict_studies[study]["user_params"].items(): 

824 if value is True: variable_params.append(key) 

825 else: fixed_params.append(key) 

826 

827 # Define list of fixed/variable paths 

828 fixed_paths = [] 

829 variable_paths = [] 

830 for key, value in self.dict_studies[study]["user_paths"].items(): 

831 if value is True: variable_paths.append(key) 

832 else: fixed_paths.append(key) 

833 

834 self.fixed_params[study] = fixed_params 

835 self.variable_params[study] = variable_params 

836 self.fixed_paths[study] = fixed_paths 

837 self.variable_paths[study] = variable_paths 

838 

839 def init_data_tree(self): 

840 """Initialize data tree""" 

841 

842 # Loop over studies 

843 for study in self.studies: 

844 

845 # Initialize study directory 

846 study_dir:Path = self.working_dir / study 

847 study_dir.mkdir( 

848 exist_ok=True, 

849 parents=True, 

850 ) 

851 

852 # Write study json file 

853 with open(study_dir / ".study.json", "w") as f: 

854 json.dump(self.dict_studies[study], f, indent=4) 

855 

856 # Initialize inputs csv 

857 inputs_file:Path = study_dir / "inputs.csv" 

858 if (len(self.variable_params[study]) > 0) or \ 

859 (len(self.variable_paths[study]) > 0): 

860 

861 if not inputs_file.exists(): 

862 

863 # Create empty input dataframe 

864 df_inputs = pd.DataFrame(columns=["ID"]+self.variable_params[study]+["EXECUTE"]) 

865 

866 # Write input dataframe 

867 df_inputs.to_csv( 

868 path_or_buf=inputs_file, 

869 index=False, 

870 ) 

871 

872 else: 

873 

874 # Read input dataframe 

875 df_inputs = pd.read_csv( 

876 filepath_or_buffer=inputs_file, 

877 index_col=0, 

878 ) 

879 

880 # Update variable parameters 

881 df_inputs = df_inputs.assign(**{param: np.nan for param in self.variable_params[study] if param not in df_inputs.columns}) 

882 df_inputs = df_inputs[[col for col in self.variable_params[study] if col in df_inputs.columns] + ["EXECUTE"]] 

883 

884 # Set default execution 

885 df_inputs["EXECUTE"] = df_inputs["EXECUTE"].fillna(1).astype(int) 

886 

887 # Write input dataframe 

888 df_inputs.to_csv( 

889 path_or_buf=inputs_file, 

890 ) 

891 

892 # Define list of datasets 

893 self.dict_datasets[study] = df_inputs.index.tolist() 

894 

895 else: 

896 # Delete file 

897 if inputs_file.exists(): inputs_file.unlink() 

898 

899 # Initialize inputs json file 

900 inputs_file:Path = study_dir / "inputs.json" 

901 if (len(self.fixed_params[study]) > 0) or \ 

902 (len(self.fixed_paths[study]) > 0) or \ 

903 (len(self.variable_paths[study]) > 0) : 

904 

905 # Create file 

906 if not inputs_file.exists(): 

907 

908 # Initialize dictionary 

909 dict_inputs = {} 

910 if len(self.fixed_params[study]) > 0: 

911 for param in self.fixed_params[study]: 

912 dict_inputs[param] = None 

913 if len(self.fixed_paths[study]) > 0: 

914 for path in self.fixed_paths[study]: 

915 dict_inputs[path] = None 

916 if len(self.variable_paths[study]) > 0: 

917 for path in self.variable_paths[study]: 

918 dict_inputs[path] = {} 

919 for index in df_inputs.index: 

920 dict_inputs[path][index] = None 

921 

922 # Write json 

923 with open(inputs_file, "w") as f: 

924 json.dump(dict_inputs, f, indent=4) 

925 

926 # Update file 

927 else: 

928 

929 # Read inputs json 

930 with open(inputs_file) as f: 

931 dict_inputs = json.load(f) 

932 

933 # Update fixed parameters 

934 dict_fixed_params = {k: dict_inputs.get(k, None) for k in self.fixed_params[study]} 

935 

936 # Update fixed paths 

937 dict_fixed_paths = {} 

938 for path in self.fixed_paths[study]: 

939 value = dict_inputs.get(path, None) 

940 if isinstance(value, dict): 

941 dict_fixed_paths[path] = None 

942 else: 

943 dict_fixed_paths[path] = value 

944 

945 # Update variable paths 

946 dict_variable_paths = {} 

947 for path in self.variable_paths[study]: 

948 existing_values = dict_inputs.get(path, {}) 

949 if not isinstance(existing_values, dict): 

950 existing_values = {} 

951 dict_variable_paths[path] = { 

952 idx: existing_values.get(idx, None) 

953 for idx in df_inputs.index 

954 } 

955 

956 # Update inputs dictionnary 

957 dict_inputs = {**dict_fixed_params, **dict_fixed_paths, **dict_variable_paths} 

958 

959 # Write inputs json 

960 with open(inputs_file, "w") as f: 

961 json.dump(dict_inputs, f, indent=4) 

962 

963 self.dict_inputs[study] = dict_inputs 

964 

965 else: 

966 

967 # Delete file 

968 if inputs_file.exists(): inputs_file.unlink() 

969 

970 self.dict_inputs[study] = {} 

971 

972 # Initialize inputs directory 

973 inputs_dir:Path = study_dir / "0_inputs" 

974 if len(self.user_paths) > 0: 

975 

976 # Create inputs directory (if necessary) 

977 inputs_dir.mkdir( 

978 exist_ok=True, 

979 parents=True, 

980 ) 

981 

982 # Delete fixed paths (if necessary) 

983 input_paths = [f for f in inputs_dir.iterdir()] 

984 for path in input_paths: 

985 resolved_path = path.resolve().name 

986 if (resolved_path not in self.fixed_paths[study]) and (resolved_path != "0_datasets"): 

987 if Path(path).is_file(): path.unlink() 

988 else: shutil.rmtree(path) 

989 

990 # Update inputs subfolders for variable paths 

991 datasets_dir:Path = inputs_dir / "0_datasets" 

992 if len(self.variable_paths[study]) > 0: 

993 

994 # Create datasets directory (if necessary) 

995 datasets_dir.mkdir( 

996 exist_ok=True, 

997 parents=True, 

998 ) 

999 

1000 # Create subfolders (if necessary)  

1001 for index in df_inputs.index: 

1002 

1003 inputs_subfolder:Path = datasets_dir / index 

1004 inputs_subfolder.mkdir( 

1005 exist_ok=True, 

1006 parents=True, 

1007 ) 

1008 

1009 # Delete variable paths (if necessary) 

1010 input_paths = [f for f in inputs_subfolder.iterdir()] 

1011 for path in input_paths: 

1012 resolved_path = path.resolve().name 

1013 if resolved_path not in self.variable_paths[study]: 

1014 if Path(path).is_file(): path.unlink() 

1015 else: shutil.rmtree(path) 

1016 

1017 # Delete subfolders (if necessary) 

1018 inputs_subfolders = [f for f in datasets_dir.iterdir() if f.is_dir()] 

1019 for folder in inputs_subfolders: 

1020 id = os.path.split(folder)[-1] 

1021 if id not in self.dict_datasets[study]: 

1022 shutil.rmtree(folder) 

1023 

1024 else: 

1025 

1026 # Delete datasets folder (if necessary) 

1027 if datasets_dir.exists(): shutil.rmtree(datasets_dir) 

1028 

1029 else: 

1030 # Delete inputs directory (if necessary) 

1031 if inputs_dir.exists(): shutil.rmtree(inputs_dir) 

1032 

1033 def clean_output_tree(self, 

1034 study: str, 

1035 ): 

1036 """Clean output data for a specific study""" 

1037 

1038 # Initialize study directory 

1039 study_dir:Path = self.working_dir / study 

1040 

1041 # Outputs data 

1042 outputs_folders = [f for f in study_dir.iterdir() if f.is_dir()] 

1043 for folder in outputs_folders: 

1044 if os.path.split(folder)[-1] != "0_inputs": 

1045 shutil.rmtree(folder) 

1046 

1047 # Paths file 

1048 paths_file = study_dir / ".paths.json" 

1049 if paths_file.exists(): paths_file.unlink() 

1050 

1051 def set_inputs(self): 

1052 """Set all inputs""" 

1053 

1054 # Loop over studies 

1055 for study in self.studies: 

1056 

1057 # Define study directory 

1058 study_dir:Path = self.working_dir / study 

1059 

1060 # Go to study directory 

1061 os.chdir(study_dir) 

1062 

1063 # Initialize dictionary of input paths 

1064 self.dict_user_paths[study] = {} 

1065 

1066 # Fixed parameters  

1067 if len(self.fixed_params[study]) > 0: 

1068 data = self.dict_inputs[study] 

1069 self.dict_fixed_params[study] = {k: data[k] for k in self.fixed_params[study] if k in data} 

1070 else: 

1071 self.dict_fixed_params[study] = {} 

1072 

1073 # Variable parameters 

1074 if (len(self.variable_params[study]) > 0) or \ 

1075 (len(self.variable_paths[study]) > 0): 

1076 

1077 # Read input dataframe 

1078 self.dict_variable_params[study] = pd.read_csv( 

1079 filepath_or_buffer="inputs.csv", 

1080 index_col=0, 

1081 ) 

1082 

1083 else: 

1084 self.dict_variable_params[study] = pd.DataFrame() 

1085 

1086 # Fixed paths 

1087 dict_input_paths = {} 

1088 for file in self.fixed_paths[study]: 

1089 if self.dict_inputs[study][file] is not None: 

1090 dict_input_paths[file] = self.dict_inputs[study][file] 

1091 else: 

1092 dict_input_paths[file] = str(Path(os.getcwd()) / "0_inputs" / file) 

1093 

1094 self.dict_user_paths[study] = {**self.dict_user_paths[study], **dict_input_paths} 

1095 

1096 # Variable paths 

1097 if len(self.variable_paths[study]) > 0: 

1098 

1099 dict_input_paths = {} 

1100 df_inputs = pd.read_csv( 

1101 filepath_or_buffer="inputs.csv", 

1102 index_col=0, 

1103 ) 

1104 for file in self.variable_paths[study]: 

1105 dict_input_paths[file] = {} 

1106 for idx in df_inputs.index: 

1107 if self.dict_inputs[study][file][idx] is not None: 

1108 dict_input_paths[file][idx] = self.dict_inputs[study][file][idx] 

1109 else: 

1110 dict_input_paths[file][idx] = str(Path(os.getcwd()) / "0_inputs" / "0_datasets" / idx / file) 

1111 

1112 self.dict_user_paths[study] = {**self.dict_user_paths[study], **dict_input_paths} 

1113 

1114 # Go back to working directory 

1115 os.chdir(self.working_dir) 

1116 

1117 def test_inputs_settings(self): 

1118 """Test that inputs have been properly set""" 

1119 

1120 # Loop over studies 

1121 for study in self.studies: 

1122 

1123 # Define study directory 

1124 study_dir:Path = self.working_dir / study 

1125 

1126 # Go to study directory 

1127 os.chdir(study_dir) 

1128 

1129 self.fixed_params_messages[study] = [] 

1130 self.fixed_paths_messages[study] = [] 

1131 self.fixed_params_config[study] = True 

1132 self.fixed_paths_config[study] = True 

1133 self.variable_params_messages[study] = {} 

1134 self.variable_paths_messages[study] = {} 

1135 self.variable_params_config[study] = {} 

1136 self.variable_paths_config[study] = {} 

1137 

1138 # Fixed parameters 

1139 for param, value in self.dict_fixed_params[study].items(): 

1140 if value is None: 

1141 self.fixed_params_messages[study].append(f"(X) {param}") 

1142 self.fixed_params_config[study] = False 

1143 else: 

1144 if not isinstance(value, self.params_type[param][0]): 

1145 self.fixed_params_messages[study].append(f"(!) {param} ({self.params_type[param][1]} expected)") 

1146 else: 

1147 self.fixed_params_messages[study].append(f"(V) {param}") 

1148 

1149 # Fixed paths 

1150 for file in self.fixed_paths[study]: 

1151 file_path:Path = Path(self.dict_user_paths[study][file]) 

1152 if not file_path.exists(): 

1153 self.fixed_paths_messages[study].append(f"(X) {file}") 

1154 self.fixed_paths_config[study] = False 

1155 else: 

1156 self.fixed_paths_messages[study].append(f"(V) {file}") 

1157 

1158 # Variable inputs 

1159 if (len(self.variable_params[study]) > 0) or \ 

1160 (len(self.variable_paths[study]) > 0): 

1161 

1162 for index in self.dict_variable_params[study].index: 

1163 

1164 self.variable_params_messages[study][index] = [] 

1165 self.variable_paths_messages[study][index] = [] 

1166 self.variable_params_config[study][index] = True 

1167 self.variable_paths_config[study][index] = True 

1168 

1169 # Variable parameters 

1170 for param in self.variable_params[study]: 

1171 value = self.dict_variable_params[study].at[index, param] 

1172 if pd.isna(value) or value == "": 

1173 self.variable_params_messages[study][index].append(f"(X) {param}") 

1174 self.variable_params_config[study][index] = False 

1175 else: 

1176 if isinstance(value, (np.integer, np.floating, np.bool_)): 

1177 value = value.item() 

1178 if not isinstance(value, self.params_type[param][0]): 

1179 self.variable_params_messages[study][index].append(f"(!) {param} ({self.params_type[param][1]} expected)") 

1180 else: 

1181 self.variable_params_messages[study][index].append(f"(V) {param}") 

1182 

1183 # Variable paths 

1184 for file in self.variable_paths[study]: 

1185 file_path:Path = Path(self.dict_user_paths[study][file][index]) 

1186 if not file_path.exists(): 

1187 self.variable_paths_messages[study][index].append(f"(X) {file}") 

1188 self.variable_paths_config[study][index] = False 

1189 else: 

1190 self.variable_paths_messages[study][index].append(f"(V) {file}") 

1191 

1192 # Go back to working directory 

1193 os.chdir(self.working_dir) 

1194 

1195 def print_inputs_settings(self): 

1196 """Print inputs settings""" 

1197 

1198 print() 

1199 print( 

1200 colored("> SETTINGS <", "blue", attrs=["reverse"]), 

1201 ) 

1202 for study in self.studies: 

1203 

1204 # Define study directory 

1205 study_dir:Path = self.working_dir / study 

1206 

1207 # Go to study directory 

1208 os.chdir(study_dir) 

1209 

1210 # Printing 

1211 print() 

1212 print(colored(f"| {study} |", "magenta")) 

1213 

1214 # ------------ # 

1215 # Fixed inputs # 

1216 # ------------ # 

1217 list_text = [colored(f"> Common :", "blue")] 

1218 list_errors = [] 

1219 config = True 

1220 type_error = False 

1221 

1222 # Fixed parameters 

1223 for message in self.fixed_params_messages[study]: 

1224 if "(V)" in message: 

1225 list_text.append(colored(message, "green")) 

1226 elif "(X)" in message: 

1227 list_text.append(colored(message, "red")) 

1228 if config: 

1229 list_errors.append(colored(f"> {str(Path.cwd() / "inputs.json")}", "red")) 

1230 config = False 

1231 elif "(!)" in message: 

1232 list_text.append(colored(message, "yellow")) 

1233 type_error = True 

1234 

1235 # Fixed paths 

1236 for i, message in enumerate(self.fixed_paths_messages[study]): 

1237 if "(V)" in message: 

1238 list_text.append(colored(message, "green")) 

1239 elif "(X)" in message: 

1240 file = self.fixed_paths[study][i] 

1241 path = self.dict_user_paths[study][file] 

1242 list_text.append(colored(message, "red")) 

1243 list_errors.append(colored(f"> {path}", "red")) 

1244 

1245 # Printing 

1246 if len(list_text) == 1: 

1247 print(colored(f"None.", "blue")) 

1248 else: 

1249 print(*list_text) 

1250 

1251 if not self.fixed_params_config[study] or not self.fixed_paths_config[study]: 

1252 print() 

1253 print(colored(f"(X) Please set inputs :", "red")) 

1254 for error in list_errors: 

1255 print(error) 

1256 sys.exit(1) 

1257 

1258 if type_error: 

1259 print() 

1260 print(colored(f"(X) Please set parameter(s) with expected type(s) in file :", "red")) 

1261 print(colored(f"> {str(Path.cwd() / "inputs.json")}", "red")) 

1262 sys.exit(1) 

1263 

1264 # --------------- # 

1265 # Variable inputs # 

1266 # --------------- # 

1267 list_errors = [] 

1268 config = True 

1269 type_error = False 

1270 

1271 if (len(self.variable_params[study]) > 0) or \ 

1272 (len(self.variable_paths[study]) > 0): 

1273 

1274 # Check if datasets have been defined 

1275 if len(self.dict_variable_params[study].index) == 0: 

1276 print() 

1277 print(colored(f"(X) Please define at least one dataset in file :", "red")) 

1278 print(colored(f"> {str(Path.cwd() / "inputs.csv")}", "red")) 

1279 sys.exit(1) 

1280 

1281 for index in self.dict_variable_params[study].index: 

1282 

1283 list_text = [colored(f"> {index} :", "blue")] 

1284 

1285 # Variable parameters 

1286 for message in self.variable_params_messages[study][index]: 

1287 if "(V)" in message: 

1288 list_text.append(colored(message, "green")) 

1289 elif "(X)" in message: 

1290 list_text.append(colored(message, "red")) 

1291 if config: 

1292 list_errors.append(colored(f"> {str(Path.cwd() / "inputs.csv")}", "red")) 

1293 config = False 

1294 elif "(!)" in message: 

1295 list_text.append(colored(message, "yellow")) 

1296 type_error = True 

1297 

1298 # Variable paths 

1299 for i, message in enumerate(self.variable_paths_messages[study][index]): 

1300 if "(V)" in message: 

1301 list_text.append(colored(message, "green")) 

1302 elif "(X)" in message: 

1303 file = self.variable_paths[study][i] 

1304 path = self.dict_user_paths[study][file][index] 

1305 list_text.append(colored(message, "red")) 

1306 list_errors.append(colored(f"> {path}", "red")) 

1307 

1308 # Printing 

1309 print(*list_text) 

1310 

1311 list_errors.sort(key=lambda x: 0 if "inputs.csv" in x else 1) 

1312 if len(list_errors) > 0: 

1313 print() 

1314 print(colored(f"(X) Please set inputs :", "red")) 

1315 for error in list_errors: 

1316 print(error) 

1317 sys.exit(1) 

1318 

1319 if type_error: 

1320 print() 

1321 print(colored(f"(X) Please set parameter(s) with expected type(s) in file :", "red")) 

1322 print(colored(f"> {str(Path.cwd() / "inputs.csv")}", "red")) 

1323 sys.exit(1) 

1324 

1325 # Go back to working directory 

1326 os.chdir(self.working_dir) 

1327 

1328 def init_paths(self): 

1329 """Initialize dictionary containing all paths""" 

1330 

1331 # Loop over studies 

1332 for study in self.studies: 

1333 

1334 # Define study directory 

1335 study_dir:Path = self.working_dir / study 

1336 

1337 try: 

1338 with open(study_dir / ".paths.json") as f: 

1339 dict_paths = json.load(f) 

1340 except: 

1341 dict_paths = {} 

1342 for path in self.output_paths: 

1343 dict_paths[path] = None 

1344 

1345 # Purge old datasets 

1346 for key, value in dict_paths.items(): 

1347 if isinstance(value, dict): 

1348 # List of datasets to delete 

1349 to_delete = [dataset for dataset in value if dataset not in self.dict_datasets[study]] 

1350 for dataset in to_delete: 

1351 del dict_paths[key][dataset] 

1352 

1353 self.dict_paths[study] = dict_paths 

1354 

1355 def update_analysis(self): 

1356 

1357 # Loop over studies 

1358 for study in self.studies: 

1359 

1360 # Define study directory 

1361 study_dir:Path = self.working_dir / study 

1362 

1363 # Define analysis file 

1364 analysis_file = study_dir / "analysis.json" 

1365 

1366 # Initialize analysis file 

1367 if os.path.exists(analysis_file): 

1368 with open(analysis_file) as f: 

1369 self.dict_analysis[study] = json.load(f) 

1370 else: 

1371 self.dict_analysis[study] = {} 

1372 

1373 # Browse all outputs 

1374 for out, value in self.dict_paths[study].items(): 

1375 

1376 if out in self.analysis_settings: 

1377 dict_out = self.analysis_settings[out] 

1378 else: 

1379 dict_out = {} 

1380 

1381 if out not in self.dict_analysis[study]: 

1382 self.dict_analysis[study][out] = {} 

1383 if isinstance(value, dict): 

1384 for case in value: 

1385 self.dict_analysis[study][out][case] = dict_out 

1386 

1387 else: 

1388 if isinstance(value, dict): 

1389 for case in value: 

1390 if case not in self.dict_analysis[study][out]: 

1391 self.dict_analysis[study][out][case] = dict_out 

1392 

1393 cases_to_delete = [] 

1394 for case in self.dict_analysis[study][out]: 

1395 if case not in value: 

1396 cases_to_delete.append(case) 

1397 

1398 for case in cases_to_delete: 

1399 if case in self.dict_analysis[study][out]: 

1400 del self.dict_analysis[study][out][case] 

1401 

1402 with open(analysis_file, "w") as f: 

1403 json.dump(self.dict_analysis[study], f, indent=4) 

1404 

1405 def clean_outputs(self): 

1406 """Clean outputs.""" 

1407 

1408 # Function to remove output path, either file or directory 

1409 def _remove_output(output: str): 

1410 output_path = Path(output) 

1411 if output_path.exists(): 

1412 if output_path.is_dir(): 

1413 shutil.rmtree(output) 

1414 else: 

1415 output_path.unlink() 

1416 

1417 # Loop over studies 

1418 for study, study_dict in self.dict_studies.items(): 

1419 

1420 # Delete specified outputs 

1421 for key, value in study_dict["clean_outputs"].items(): 

1422 if value: 

1423 if isinstance(self.dict_paths[study][key], str): 

1424 _remove_output(self.dict_paths[study][key]) 

1425 if isinstance(self.dict_paths[study][key], dict): 

1426 for _, value in self.dict_paths[study][key].items(): 

1427 _remove_output(value) 

1428 

1429 def purge_output_datasets(self, 

1430 study: str, 

1431 ): 

1432 """Purge output datasets for a specific study""" 

1433 

1434 datasets_paths = [f for f in Path.cwd().iterdir()] 

1435 for path in datasets_paths: 

1436 resolved_path = path.resolve().name 

1437 if resolved_path not in self.dict_datasets[study]: 

1438 shutil.rmtree(path) 

1439 

1440 def update_workflow_diagram(self, 

1441 process: Process, 

1442 ): 

1443 """Update workflow diagram for specific process""" 

1444 

1445 self.diagram[process.name] = { 

1446 "params": list(process.params.values()), 

1447 "allparams": process.allparams, 

1448 "paths": list(process.paths.values()), 

1449 "allpaths": process.allpaths, 

1450 "required_paths": list(process.required_paths.values()), 

1451 "output_paths": list(process.output_paths.values()), 

1452 } 

1453 

1454 def __call__(self): 

1455 """Launch workflow of processes.""" 

1456 

1457 # --------------- # 

1458 # Launch workflow # 

1459 # --------------- # 

1460 print() 

1461 print( 

1462 colored("> RUNNING <", "blue", attrs=["reverse"]), 

1463 ) 

1464 

1465 for study, dict_study in self.dict_studies.items(): 

1466 

1467 # Check if study must be executed 

1468 if not dict_study["execute"]: 

1469 

1470 # Printing 

1471 print() 

1472 print( 

1473 colored(f"| {study} |", "magenta"), 

1474 ) 

1475 print() 

1476 print(colored("(!) Study is skipped.", "yellow")) 

1477 

1478 continue 

1479 

1480 study_dir:Path = self.working_dir / study 

1481 os.chdir(study_dir) 

1482 

1483 for step, proc in enumerate(self.processes): 

1484 

1485 # Update analysis 

1486 self.update_analysis() 

1487 

1488 if "hard_params" in proc: dict_hard_params = proc["hard_params"] 

1489 else: dict_hard_params = {} 

1490 if "user_params" in proc: user_params = proc["user_params"] 

1491 else: user_params = {} 

1492 if "user_paths" in proc: user_paths = proc["user_paths"] 

1493 else: user_paths = {} 

1494 if "required_paths" in proc: required_paths = proc["required_paths"] 

1495 else: required_paths = {} 

1496 if "output_paths" in proc: output_paths = proc["output_paths"] 

1497 else: output_paths = {} 

1498 if "overall_analysis" in proc: overall_analysis = proc["overall_analysis"] 

1499 else: overall_analysis = {} 

1500 

1501 # Define class object for the current process 

1502 process = proc["process"] 

1503 this_process:Process = process( 

1504 study=study, 

1505 df_user_params=self.dict_variable_params[study], 

1506 dict_user_params=self.dict_fixed_params[study], 

1507 dict_user_paths=self.dict_user_paths[study], 

1508 dict_paths=self.dict_paths[study], 

1509 params=user_params, 

1510 paths=user_paths, 

1511 dict_hard_params=dict_hard_params, 

1512 fixed_params=self.fixed_params[study], 

1513 variable_params=self.variable_params[study], 

1514 fixed_paths=self.fixed_paths[study], 

1515 variable_paths=self.variable_paths[study], 

1516 required_paths=required_paths, 

1517 output_paths=output_paths, 

1518 overall_analysis=overall_analysis, 

1519 dict_analysis=self.dict_analysis[study], 

1520 verbose=self.dict_process[study][self.list_processes[step]]["verbose"], 

1521 diagram=self.diagram, 

1522 ) 

1523 

1524 # Define process name 

1525 this_process.name = this_process.__class__.__name__ 

1526 

1527 # Define working folder associated to the current process 

1528 folder_name = f"{step+1}_{this_process.name}" 

1529 folder_path:Path = study_dir / folder_name 

1530 folder_path.mkdir(exist_ok=True, parents=True) 

1531 os.chdir(folder_path) 

1532 

1533 # Initialize process 

1534 this_process.initialize() 

1535 

1536 # Check if process must be executed 

1537 if not self.dict_process[study][self.list_processes[step]]["execute"]: 

1538 

1539 # Printing 

1540 print() 

1541 print( 

1542 colored(f"| {study} | {this_process.name} |", "magenta"), 

1543 ) 

1544 print() 

1545 print(colored("(!) Process is skipped.", "yellow")) 

1546 

1547 # Update workflow diagram 

1548 self.update_workflow_diagram(this_process) 

1549 

1550 continue 

1551 

1552 if this_process.is_case: 

1553 

1554 # Define sub-folders associated to each ID of the inputs dataframe 

1555 for idx in this_process.df_params.index: 

1556 

1557 # Printing 

1558 print() 

1559 print( 

1560 colored(f"| {study} | {this_process.name} | {idx} |", "magenta"), 

1561 ) 

1562 

1563 # Check if dataset must be executed 

1564 if self.dict_variable_params[study].loc[idx, "EXECUTE"] == 0: 

1565 

1566 # Printing 

1567 print() 

1568 print(colored("(!) Dataset is skipped.", "yellow")) 

1569 

1570 # Go back to working folder 

1571 os.chdir(folder_path) 

1572 

1573 # Purge old output datasets 

1574 self.purge_output_datasets(study) 

1575 

1576 # Update workflow diagram 

1577 self.update_workflow_diagram(this_process) 

1578 

1579 continue 

1580 

1581 # Update process index 

1582 this_process.index = idx 

1583 

1584 subfolder_path = study_dir / folder_name / str(idx) 

1585 subfolder_path.mkdir(exist_ok=True, parents=True) 

1586 os.chdir(subfolder_path) 

1587 

1588 # Launch process 

1589 this_process() 

1590 this_process.finalize() 

1591 

1592 # Go back to working folder 

1593 os.chdir(folder_path) 

1594 

1595 # Purge old output datasets 

1596 self.purge_output_datasets(study) 

1597 

1598 else: 

1599 

1600 # Printing 

1601 print() 

1602 print( 

1603 colored(f"| {study} | {this_process.name} |", "magenta"), 

1604 ) 

1605 

1606 # Launch process 

1607 this_process() 

1608 this_process.finalize() 

1609 

1610 # Update workflow diagram 

1611 self.update_workflow_diagram(this_process) 

1612 

1613 # Update paths dictonary 

1614 self.dict_paths[study] = this_process.dict_paths 

1615 

1616 # Write paths json file 

1617 with open(study_dir / ".paths.json", "w") as f: 

1618 json.dump(self.dict_paths[study], f, indent=4) 

1619 

1620 # Go back to study directory 

1621 os.chdir(study_dir) 

1622 

1623 # Write diagram json file 

1624 with open(".diagram.json", "w") as f: 

1625 json.dump(self.diagram, f, indent=4) 

1626 

1627 # Go back to working directory 

1628 os.chdir(self.working_dir) 

1629 

1630 # Delete unecessary outputs 

1631 self.clean_outputs()