Coverage for nuremics\core\workflow.py: 81%
896 statements
« prev ^ index » next coverage.py v7.9.0, created at 2025-06-12 16:54 +0200
« prev ^ index » next coverage.py v7.9.0, created at 2025-06-12 16:54 +0200
1from __future__ import annotations
3import os
4import sys
5from tkinter import filedialog
6from tkinter import *
8import json
9import shutil
10import numpy as np
11import pandas as pd
12from pathlib import Path
13from termcolor import colored
15from .process import Process
16from .utils import (
17 get_self_method_calls,
18 only_function_calls,
19 extract_inputs_and_types,
20 extract_analysis,
21 extract_self_output_keys,
22)
23from importlib.resources import files
26class WorkFlow:
27 """Manage workflow of processes."""
29 def __init__(
30 self,
31 app_name: str,
32 working_dir: Path,
33 processes: list,
34 studies: list = ["Default"],
35 verbose: bool = True,
36 ):
37 """Initialization."""
39 # -------------------- #
40 # Initialize variables #
41 # -------------------- #
42 self.app_name = app_name
43 self.studies = studies
44 self.processes = processes
45 self.list_processes = []
46 self.dict_inputs = {}
47 self.dict_datasets = {}
48 self.dict_studies = {}
49 self.dict_process = {}
50 self.dict_analysis = {}
51 self.user_params = []
52 self.user_paths = []
53 self.output_paths = []
54 self.overall_analysis = []
55 self.analysis_settings = {}
56 self.params_type = {}
57 self.operations_by_process = {}
58 self.inputs_by_process = {}
59 self.params_by_process = {}
60 self.paths_by_process = {}
61 self.outputs_by_process = {}
62 self.analysis_by_process = {}
63 self.settings_by_process = {}
64 self.params_plug = {}
65 self.paths_plug = {}
66 self.outputs_plug = {}
67 self.analysis_plug = {}
68 self.studies_modif = {}
69 self.studies_messages = {}
70 self.studies_config = {}
71 self.fixed_params_messages = {}
72 self.fixed_params_config = {}
73 self.fixed_paths_messages = {}
74 self.fixed_paths_config = {}
75 self.variable_params_messages = {}
76 self.variable_params_config = {}
77 self.variable_paths_messages = {}
78 self.variable_paths_config = {}
79 self.fixed_params = {}
80 self.fixed_paths = {}
81 self.variable_params = {}
82 self.variable_paths = {}
83 self.dict_fixed_params = {}
84 self.dict_variable_params = {}
85 self.dict_user_paths = {}
86 self.dict_paths = {}
87 self.diagram = {}
88 self.verbose = verbose
90 # ------------------------ #
91 # Define working directory #
92 # ------------------------ #
93 if working_dir is None:
94 root = Tk()
95 root.withdraw()
96 self.working_dir = Path(filedialog.askdirectory()) / self.app_name
97 else:
98 self.working_dir = working_dir / self.app_name
100 # ------------------------ #
101 # Create working directory #
102 # ------------------------ #
103 self.working_dir.mkdir(
104 exist_ok=True,
105 parents=True,
106 )
108 # ----------------------- #
109 # Go to working directory #
110 # ----------------------- #
111 os.chdir(self.working_dir)
113 # ------------------------ #
114 # Define list of processes #
115 # ------------------------ #
116 for proc in self.processes:
117 self.list_processes.append(proc["process"].__name__)
def print_logo(self):
    """Print the ASCII NUREMICS logo in yellow.

    The logo is read from ``logo.txt`` in the ``nuremics.resources`` package.
    The first line of the file is not printed (this matches the previous
    behavior of this method; NOTE(review): confirm that skipping it is
    intended and not an accident of the former loop).
    """

    ascii_logo_path = files("nuremics.resources").joinpath("logo.txt")

    # The context manager guarantees that the file handle is closed.
    with open(ascii_logo_path, "r") as f:
        # Drop the first line, then read everything that is left. With an
        # empty file this yields an empty list instead of an undefined name.
        next(f, None)
        lines = f.readlines()

    print()
    for line in lines:
        print(colored(line.rstrip(), "yellow"))
def print_application(self):
    """Print the application diagram (workflow, processes, operations).

    For every process the list of operations is stored in
    ``self.operations_by_process``. If the ``__call__`` of a process does
    something other than calling its own operations, the process is flagged
    with ``(X)`` and the program exits with status 1 after the diagram.
    """

    # Header
    print()
    print(colored("> APPLICATION <", "blue", attrs=["reverse"]))
    print()
    print(colored("| Workflow |", "magenta"))
    print(colored(f"{self.app_name}_____", "blue"))

    # Width taken by the workflow name in the diagram
    app_pad = " "*(len(self.app_name)+5)

    last_index = len(self.processes)-1
    invalid_found = False

    for position, entry in enumerate(self.processes):

        process_cls = entry["process"]
        proc_name = process_cls.__name__
        instance: Process = process_cls()
        is_last = not (position < last_index)

        # Width taken by the process name in the diagram
        proc_pad = " "*(len(proc_name)+10)

        # Operations of the current process
        operations = get_self_method_calls(instance.__class__)
        self.operations_by_process[proc_name] = operations

        # The process call must contain only calls to its operations
        call_is_valid = only_function_calls(
            method=instance.__call__,
            allowed_methods=operations,
        )

        branch = app_pad+f"|_____{proc_name}_____"
        if not call_is_valid:
            print(colored(branch, "blue")+colored("(X)", "red"))
            invalid_found = True
        else:
            print(colored(branch, "blue"))
            # The vertical connector is only drawn while processes remain
            connector = " " if is_last else "|"
            for op_name in operations:
                print(colored(app_pad+connector+proc_pad+f"|_____{op_name}", "blue"))

        if not is_last:
            print(colored(app_pad+"|", "blue"))

    if not invalid_found:
        return

    print()
    print(colored("(X) Each process must only call its internal function(s):", "red"))
    print()
    for hint in (
        " def __call__(self):",
        " super().__call__()",
    ):
        print(colored(hint, "red"))
    print()
    for hint in (
        " self.operation1()",
        " self.operation2()",
        " self.operation3()",
        " ...",
    ):
        print(colored(hint, "red"))
    sys.exit(1)
def get_inputs(self):
    """Collect the inputs of every process and how they are plugged.

    For each process this fills:

    - ``inputs_by_process`` / ``analysis_by_process`` / ``settings_by_process``
      from the extraction helpers;
    - ``params_by_process`` (``{key: [type object, type label]}``) and
      ``paths_by_process`` (list of keys typed as ``pathlib.Path``);
    - ``params_plug`` / ``paths_plug`` (``[user name, origin]`` or ``None``
      when the input is not declared in the process dictionary);
    - ``analysis_plug`` (declared name or ``None``).
    """

    for proc in self.processes:

        process = proc["process"]
        name = process.__name__
        this_process: Process = process()

        self.inputs_by_process[name] = extract_inputs_and_types(this_process)
        self.analysis_by_process[name], self.settings_by_process[name] = extract_analysis(this_process)

        self.params_by_process[name] = {}
        self.paths_by_process[name] = []
        self.params_plug[name] = {}
        self.paths_plug[name] = {}
        self.analysis_plug[name] = {}

        for key, value_type in self.inputs_by_process[name].items():

            # Build a readable label from the module and type names
            module_name = value_type.__module__
            type_name = value_type.__name__
            if module_name == "builtins":
                type_label = type_name
            else:
                type_label = f"{module_name}.{type_name}"

            # Inputs used for analysis are plugged on "overall_analysis"
            if key in self.analysis_by_process[name]:
                if ("overall_analysis" in proc) and (key in proc["overall_analysis"]):
                    self.analysis_plug[name][key] = proc["overall_analysis"][key]
                else:
                    self.analysis_plug[name][key] = None
                continue

            # The identity test is needed because the module reported by
            # Path is not "pathlib" on every Python version (3.13 reports
            # "pathlib._local"), which defeats the label comparison alone.
            is_path = (value_type is Path) or (type_label == "pathlib.Path")

            if is_path:
                self.paths_by_process[name].append(key)
                if ("user_paths" in proc) and (key in proc["user_paths"]):
                    self.paths_plug[name][key] = [proc["user_paths"][key], "user_paths"]
                elif ("required_paths" in proc) and (key in proc["required_paths"]):
                    self.paths_plug[name][key] = [proc["required_paths"][key], "required_paths"]
                else:
                    self.paths_plug[name][key] = None
            else:
                self.params_by_process[name][key] = [value_type, type_label]
                if ("user_params" in proc) and (key in proc["user_params"]):
                    self.params_plug[name][key] = [proc["user_params"][key], "user_params"]
                elif ("hard_params" in proc) and (key in proc["hard_params"]):
                    self.params_plug[name][key] = [proc["hard_params"][key], "hard_params"]
                else:
                    self.params_plug[name][key] = None
def get_outputs(self):
    """Collect the output keys of every process and how they are plugged.

    ``outputs_by_process[name]`` receives the distinct output keys found in
    the operations of the process (in order of first appearance), and
    ``outputs_plug[name]`` maps each of them to the name declared under
    ``"output_paths"`` in the process dictionary, or ``None``.
    """

    for entry in self.processes:

        process_cls = entry["process"]
        name = process_cls.__name__
        instance: Process = process_cls()

        found = []
        self.outputs_by_process[name] = found
        plugs = {}
        self.outputs_plug[name] = plugs

        # Gather output keys operation by operation, without duplicates
        for operation in self.operations_by_process[name]:
            for key in extract_self_output_keys(getattr(instance, operation)):
                if key in found:
                    continue
                found.append(key)

        # Plug every output key on its declared name (if any)
        for key in found:
            declared = ("output_paths" in entry) and (key in entry["output_paths"])
            plugs[key] = entry["output_paths"][key] if declared else None
def init_config(self):
    """Build the global configuration lists from the process dictionaries.

    Fills ``user_params``, ``user_paths``, ``output_paths`` and
    ``overall_analysis`` (duplicates removed, first occurrence kept) and
    initializes ``analysis_settings``. Any inconsistent declaration prints
    an error message and exits with status 1.
    """

    def abort(message):
        # Print the error on its own paragraph and stop the program
        print()
        print(colored(message, "red"))
        sys.exit(1)

    for entry in self.processes:

        name = entry["process"].__name__

        # Define list of user parameters
        if "user_params" in entry:
            for key, value in entry["user_params"].items():
                if key not in self.params_by_process[name]:
                    abort(f'(X) {key} defined in "user_params" is not an input parameter of {name}.')
                self.user_params.append(value)

        # Check on hard parameters
        if "hard_params" in entry:
            for key in entry["hard_params"]:
                if key not in self.params_by_process[name]:
                    abort(f'(X) {key} defined in "hard_params" is not an input parameter of {name}.')

        # Define list of user paths
        if "user_paths" in entry:
            for key, value in entry["user_paths"].items():
                if key not in self.paths_by_process[name]:
                    abort(f"(X) {key} is not an input path of {name}.")
                self.user_paths.append(value)

        # Check on required paths
        if "required_paths" in entry:
            for value in entry["required_paths"].values():
                if value not in self.output_paths:
                    abort(f'(X) {value} defined in {name} "required_paths" must be defined in previous process "output_paths".')

        # Define list of output paths
        if "output_paths" in entry:
            for key, value in entry["output_paths"].items():
                if key not in self.outputs_by_process[name]:
                    abort(f"(X) {key} is not an output path of {name}.")
                if value in self.output_paths:
                    abort(f'(X) {value} is defined twice in "output_paths".')
                self.output_paths.append(value)

        # Define list of outputs for analysis
        if "overall_analysis" in entry:
            for key, value in entry["overall_analysis"].items():
                if key not in self.analysis_by_process[name]:
                    abort(f"(X) {key} is not an output analysis of {name}.")
                self.overall_analysis.append(value)
                if value not in self.output_paths:
                    abort(f'(X) {value} defined in {name} "overall_analysis" must be defined in previous process "output_paths".')

    # Delete duplicates (order of first appearance is kept)
    self.user_params = list(dict.fromkeys(self.user_params))
    self.user_paths = list(dict.fromkeys(self.user_paths))
    self.overall_analysis = list(dict.fromkeys(self.overall_analysis))

    # Define analysis settings
    for output in self.overall_analysis:
        self.analysis_settings[output] = {}

    for proc_name, settings in self.settings_by_process.items():
        if not settings:
            continue
        for key, setting in settings.items():
            target = self.analysis_plug[proc_name][key]
            self.analysis_settings[target].update(setting)
def print_processes(self):
    """Print, for every process, how its inputs and outputs are plugged.

    Four tables are printed per process (input parameters, input paths,
    input analysis, output paths). The left side shows what the process
    expects, the right side what the process dictionary declares. As soon
    as one table contains an undeclared entry, an explanatory message is
    printed and the program exits with status 1.
    """

    def print_table(left_cells, plugged, incomplete, advice):
        # Right-hand columns are padded to the widest entry plus one space
        variable_width = max(len(var) for var, _ in plugged)+1
        definition_width = max(len(tag) for _, tag in plugged)+1

        for left, (var, tag) in zip(left_cells, plugged):
            right = "|----- "+var.ljust(variable_width)+tag.ljust(definition_width)
            shade = "red" if "(X)" in right else "green"
            print(colored(left+"-----|", "blue")+colored(right, shade))

        if incomplete:
            print()
            print(colored(advice, "red"))
            sys.exit(1)

    def pad_single_column(names):
        # Left-hand column for the tables that only show a name
        width = max(len(item) for item in names)+1
        return [item.ljust(width) for item in names]

    for entry in self.processes:

        name = entry["process"].__name__

        print()
        print(colored(f"| {name} |", "magenta"))

        # ---------------- #
        # Input parameters #
        # ---------------- #
        print(colored("> Input Parameter(s) :", "blue"))
        params = self.params_by_process[name]
        if len(params) == 0:
            print(colored("None.", "blue"))
        else:
            expected = []
            plugged = []
            incomplete = False
            for key, value in params.items():
                expected.append((f"({value[1]})", key))
                plug = self.params_plug[name][key]
                if plug is None:
                    plugged.append(("Not defined", "(X)"))
                    incomplete = True
                else:
                    plugged.append((str(plug[0]), f"({plug[1]})"))

            type_width = max(len(kind) for kind, _ in expected)+1
            key_width = max(len(key) for _, key in expected)+1
            left_cells = [
                kind.ljust(type_width)+key.ljust(key_width)
                for kind, key in expected
            ]
            print_table(
                left_cells,
                plugged,
                incomplete,
                '(X) Please define all input parameters either in "user_params" or "hard_params".',
            )

        # ----------- #
        # Input paths #
        # ----------- #
        print(colored("> Input Path(s) :", "blue"))
        paths = self.paths_by_process[name]
        if len(paths) == 0:
            print(colored("None.", "blue"))
        else:
            expected = []
            plugged = []
            incomplete = False
            for key in paths:
                expected.append(key)
                plug = self.paths_plug[name][key]
                if plug is None:
                    plugged.append(("Not defined", "(X)"))
                    incomplete = True
                else:
                    plugged.append((plug[0], f"({plug[1]})"))

            print_table(
                pad_single_column(expected),
                plugged,
                incomplete,
                '(X) Please define all input paths either in "user_paths" or "required_paths".',
            )

        # -------------- #
        # Input analysis #
        # -------------- #
        print(colored("> Input Analysis :", "blue"))
        analysis = self.analysis_by_process[name]
        if len(analysis) == 0:
            print(colored("None.", "blue"))
        else:
            expected = []
            plugged = []
            incomplete = False
            for key in analysis:
                expected.append(key)
                plug = self.analysis_plug[name][key]
                if plug is None:
                    plugged.append(("Not defined", "(X)"))
                    incomplete = True
                else:
                    plugged.append((plug, "(overall_analysis)"))

            print_table(
                pad_single_column(expected),
                plugged,
                incomplete,
                '(X) Please define all output analysis in "overall_analysis".',
            )

        # ------------ #
        # Output paths #
        # ------------ #
        print(colored("> Output Path(s) :", "blue"))
        outputs = self.outputs_by_process[name]
        if len(outputs) == 0:
            print(colored("None.", "blue"))
        else:
            expected = []
            plugged = []
            incomplete = False
            for key in outputs:
                expected.append(key)
                plug = self.outputs_plug[name][key]
                if plug is None:
                    plugged.append(("Not defined", "(X)"))
                    incomplete = True
                else:
                    plugged.append((plug, "(output_paths)"))

            print_table(
                pad_single_column(expected),
                plugged,
                incomplete,
                '(X) Please define all output paths in "output_paths".',
            )
def set_user_params_types(self):
    """Record the type of every user parameter in ``self.params_type``.

    Each value is the ``[type object, type label]`` pair of the process
    input the user parameter is plugged on. If the same user parameter is
    plugged on inputs of different types, an error is printed and the
    program exits with status 1.
    """

    for proc_name, params in self.params_by_process.items():
        for param, type_info in params.items():

            user_param = self.params_plug[proc_name][param][0]

            # Hard-coded parameters are not user parameters
            if user_param not in self.user_params:
                continue

            known = user_param in self.params_type
            if known and (self.params_type[user_param][0] != type_info[0]):
                print()
                print(colored(f"(X) {user_param} is defined both as ({self.params_type[user_param][1]}) and ({type_info[1]}) :", "red"))
                print(colored('> Please consider defining a new user parameter in "user_params".', "red"))
                sys.exit(1)

            self.params_type[user_param] = type_info
def print_io(self):
    """Print the user inputs (parameters, paths) and the outputs.

    Each of the three lists is followed by ``None.`` when it is empty.
    """

    def print_items(lines, is_empty):
        # One blue line per item, then the "None." marker for an empty list
        for text in lines:
            print(colored(text, "blue"))
        if is_empty:
            print(colored("None.", "blue"))

    print()
    print(colored("> INPUTS <", "blue", attrs=["reverse"]))

    # Print input parameters
    print()
    print(colored("| User Parameters |", "magenta"))
    print_items(
        [f"> {param} ({info[1]})" for param, info in self.params_type.items()],
        len(self.params_type) == 0,
    )

    # Print input paths
    print()
    print(colored("| User Paths |", "magenta"))
    print_items(
        [f"> {path}" for path in self.user_paths],
        len(self.user_paths) == 0,
    )

    # Print outputs
    print()
    print(colored("> OUTPUTS <", "blue", attrs=["reverse"]))
    print()
    print_items(
        [f"> {path}" for path in self.output_paths],
        len(self.output_paths) == 0,
    )
def init_studies(self):
    """Load, clean, complete and rewrite ``studies.json``.

    The file is read from the current directory when it exists. Studies,
    parameters, paths and outputs that are no longer declared are removed;
    missing ones are added (``False`` for the ``"Default"`` study, ``None``
    otherwise, and always ``False`` for ``"clean_outputs"``). The result is
    stored in ``self.dict_studies`` and written back to ``studies.json``.
    """

    studies_file = "studies.json"

    # Open studies json file if existing
    if os.path.exists(studies_file):
        with open(studies_file) as stream:
            self.dict_studies = json.load(stream)

    # Clean studies
    for study in [s for s in self.dict_studies if s not in self.studies]:
        del self.dict_studies[study]

    # Clean input parameters / input paths / output paths
    sections = (
        ("user_params", self.user_params),
        ("user_paths", self.user_paths),
        ("clean_outputs", self.output_paths),
    )
    for section, declared in sections:
        for study in list(self.dict_studies.keys()):
            stored = self.dict_studies[study][section]
            for item in list(stored):
                if item not in declared:
                    del stored[item]

    # Initialize input parameters/paths
    for study in self.studies:

        if study not in self.dict_studies:
            self.dict_studies[study] = {
                "execute": True,
                "user_params": {},
                "user_paths": {},
                "clean_outputs": {},
            }
        config = self.dict_studies[study]

        # The default study starts fully "fixed"; others must be configured
        initial = False if study == "Default" else None

        for param in self.user_params:
            if param not in config["user_params"]:
                config["user_params"][param] = initial

        for path in self.user_paths:
            if path not in config["user_paths"]:
                config["user_paths"][path] = initial

        for path in self.output_paths:
            if path not in config["clean_outputs"]:
                config["clean_outputs"][path] = False

        # Reordering
        params = config["user_params"]
        config["user_params"] = {key: params[key] for key in self.user_params}
        paths = config["user_paths"]
        config["user_paths"] = {key: paths[key] for key in self.user_paths}

    # Write studies json file
    with open(studies_file, "w") as stream:
        json.dump(self.dict_studies, stream, indent=4)
704 def test_studies_modification(self):
705 """Test if studies configurations have been modified"""
707 # Loop over studies
708 for study in self.studies:
710 self.studies_modif[study] = False
712 study_file = Path(study) / ".study.json"
713 if study_file.exists():
714 with open(study_file) as f:
715 dict_study = json.load(f)
716 if (self.dict_studies[study]["user_params"] != dict_study["user_params"]) or \
717 (self.dict_studies[study]["user_paths"] != dict_study["user_paths"]):
718 self.studies_modif[study] = True
720 def test_studies_settings(self):
721 """Check if studies has been properly configured"""
723 # Loop over studies
724 for study in self.studies:
726 self.studies_messages[study] = []
727 self.studies_config[study] = True
729 for param in self.user_params:
730 if self.dict_studies[study]["user_params"][param] is None:
731 self.studies_messages[study].append(f"(X) {param} not configured.")
732 self.studies_config[study] = False
733 else:
734 if self.dict_studies[study]["user_params"][param]: text = "variable"
735 else: text = "fixed"
736 self.studies_messages[study].append(f"(V) {param} is {text}.")
738 for file in self.user_paths:
739 if self.dict_studies[study]["user_paths"][file] is None:
740 self.studies_messages[study].append(f"(X) {file} not configured.")
741 self.studies_config[study] = False
742 else:
743 if self.dict_studies[study]["user_paths"][file]: text = "variable"
744 else: text = "fixed"
745 self.studies_messages[study].append(f"(V) {file} is {text}.")
def print_studies(self):
    """Print the configuration status of every study.

    When the configuration of a study has been modified, its output tree
    is cleaned and its ``analysis.json`` file is deleted. If a study is not
    fully configured, the path of ``studies.json`` is printed and the
    program exits with status 1.
    """

    print()
    print(
        colored("> STUDIES <", "blue", attrs=["reverse"]),
    )
    for study in self.studies:

        # Printing
        print()
        print(
            colored(f"| {study} |", "magenta"),
        )
        if self.studies_modif[study]:
            print(
                colored("(!) Configuration has been modified.", "yellow"),
            )
            self.clean_output_tree(study)

            # Delete analysis file
            path = Path(study) / "analysis.json"
            if path.exists():
                path.unlink()

        for message in self.studies_messages[study]:
            if "(V)" in message:
                print(colored(message, "green"))
            elif "(X)" in message:
                print(colored(message, "red"))

        if not self.studies_config[study]:
            # Built outside the f-string: reusing double quotes inside a
            # double-quoted f-string is a syntax error before Python 3.12.
            studies_file = str(Path.cwd() / "studies.json")
            print()
            print(colored("(X) Please configure file :", "red"))
            print(colored(f"> {studies_file}", "red"))
            sys.exit(1)
def init_process_settings(self):
    """Load, clean, complete and rewrite ``<study>/process.json``.

    For every study, the settings of processes that are no longer part of
    the workflow are dropped, missing processes get the default settings
    (``"execute": True`` and ``"verbose": self.verbose``), and the entries
    are ordered like ``self.list_processes``. The result is stored in
    ``self.dict_process[study]`` and written back to the file.
    """

    for study in self.studies:

        settings_file = Path(study) / "process.json"

        # Open process json file if existing
        if os.path.exists(settings_file):
            with open(settings_file) as stream:
                stored = json.load(stream)
        else:
            stored = {}
        self.dict_process[study] = stored

        # Clean processes
        for obsolete in [p for p in stored.keys() if p not in self.list_processes]:
            del stored[obsolete]

        # Add missing processes with default settings
        for process in self.list_processes:
            if process in stored:
                continue
            stored[process] = {
                "execute": True,
                "verbose": self.verbose,
            }

        # Reordering
        ordered = {}
        for process in self.list_processes:
            ordered[process] = stored[process]
        self.dict_process[study] = ordered

        # Write process json file
        with open(settings_file, "w") as stream:
            json.dump(ordered, stream, indent=4)
def configure_inputs(self):
    """Split the user parameters/paths of each study into fixed/variable.

    An entry is variable only when its value in ``self.dict_studies`` is
    exactly ``True``; every other value makes it fixed. The four resulting
    lists are stored per study in ``fixed_params``, ``variable_params``,
    ``fixed_paths`` and ``variable_paths``.
    """

    def split(flags):
        # Returns (fixed, variable), keeping the order of the dictionary
        variable = [key for key, flag in flags.items() if flag is True]
        fixed = [key for key, flag in flags.items() if flag is not True]
        return fixed, variable

    for study in self.studies:

        config = self.dict_studies[study]
        fixed_params, variable_params = split(config["user_params"])
        fixed_paths, variable_paths = split(config["user_paths"])

        self.fixed_params[study] = fixed_params
        self.variable_params[study] = variable_params
        self.fixed_paths[study] = fixed_paths
        self.variable_paths[study] = variable_paths
def init_data_tree(self):
    """Create or update the input files and folders of every study.

    For each study, in this order:

    1. the study directory and its ``.study.json`` file are written;
    2. ``inputs.csv`` is created/updated when the study has variable
       parameters or paths (deleted otherwise) and the dataset identifiers
       are stored in ``self.dict_datasets``;
    3. ``inputs.json`` is created/updated when the study has fixed
       parameters, fixed paths or variable paths (deleted otherwise) and
       its content is stored in ``self.dict_inputs``;
    4. the ``0_inputs`` folder (and its ``0_datasets`` sub-folders) is
       created and purged from entries that are no longer declared, or
       deleted when the workflow has no user path at all.
    """

    def remove_entry(entry):
        # Files are unlinked, anything else is removed as a directory tree
        if entry.is_file():
            entry.unlink()
        else:
            shutil.rmtree(entry)

    for study in self.studies:

        variable_params = self.variable_params[study]
        variable_paths = self.variable_paths[study]
        fixed_params = self.fixed_params[study]
        fixed_paths = self.fixed_paths[study]

        # Initialize study directory
        study_dir: Path = self.working_dir / study
        study_dir.mkdir(parents=True, exist_ok=True)

        # Write study json file
        with open(study_dir / ".study.json", "w") as stream:
            json.dump(self.dict_studies[study], stream, indent=4)

        # --------------------- #
        # Initialize inputs csv #
        # --------------------- #
        csv_file: Path = study_dir / "inputs.csv"
        if (len(variable_params) > 0) or (len(variable_paths) > 0):

            if csv_file.exists():

                # Read input dataframe
                df_inputs = pd.read_csv(
                    filepath_or_buffer=csv_file,
                    index_col=0,
                )

                # Update variable parameters
                missing = {
                    param: np.nan
                    for param in variable_params
                    if param not in df_inputs.columns
                }
                df_inputs = df_inputs.assign(**missing)
                kept = [col for col in variable_params if col in df_inputs.columns]
                df_inputs = df_inputs[kept + ["EXECUTE"]]

                # Set default execution
                df_inputs["EXECUTE"] = df_inputs["EXECUTE"].fillna(1).astype(int)

                # Write input dataframe
                df_inputs.to_csv(
                    path_or_buf=csv_file,
                )

            else:

                # Create and write an empty input dataframe
                df_inputs = pd.DataFrame(columns=["ID"]+variable_params+["EXECUTE"])
                df_inputs.to_csv(
                    path_or_buf=csv_file,
                    index=False,
                )

            # Define list of datasets
            self.dict_datasets[study] = df_inputs.index.tolist()

        elif csv_file.exists():
            # Delete file
            csv_file.unlink()

        # --------------------------- #
        # Initialize inputs json file #
        # --------------------------- #
        json_file: Path = study_dir / "inputs.json"
        if (len(fixed_params) > 0) or \
           (len(fixed_paths) > 0) or \
           (len(variable_paths) > 0):

            if json_file.exists():

                # Read inputs json
                with open(json_file) as stream:
                    previous = json.load(stream)

                # Update fixed parameters
                kept_params = {key: previous.get(key) for key in fixed_params}

                # Update fixed paths (a former per-dataset mapping is reset)
                kept_paths = {}
                for path in fixed_paths:
                    value = previous.get(path)
                    kept_paths[path] = None if isinstance(value, dict) else value

                # Update variable paths (a former single value is reset)
                kept_datasets = {}
                for path in variable_paths:
                    known = previous.get(path, {})
                    if not isinstance(known, dict):
                        known = {}
                    kept_datasets[path] = {
                        idx: known.get(idx)
                        for idx in df_inputs.index
                    }

                # Update inputs dictionary
                dict_inputs = {**kept_params, **kept_paths, **kept_datasets}

            else:

                # Initialize dictionary
                dict_inputs = {}
                for param in fixed_params:
                    dict_inputs[param] = None
                for path in fixed_paths:
                    dict_inputs[path] = None
                for path in variable_paths:
                    dict_inputs[path] = {}
                    for index in df_inputs.index:
                        dict_inputs[path][index] = None

            # Write inputs json
            with open(json_file, "w") as stream:
                json.dump(dict_inputs, stream, indent=4)

            self.dict_inputs[study] = dict_inputs

        else:

            # Delete file
            if json_file.exists():
                json_file.unlink()

            self.dict_inputs[study] = {}

        # --------------------------- #
        # Initialize inputs directory #
        # --------------------------- #
        inputs_dir: Path = study_dir / "0_inputs"
        if not (len(self.user_paths) > 0):
            # Delete inputs directory (if necessary)
            if inputs_dir.exists():
                shutil.rmtree(inputs_dir)
            continue

        # Create inputs directory (if necessary)
        inputs_dir.mkdir(parents=True, exist_ok=True)

        # Delete fixed paths (if necessary)
        for entry in list(inputs_dir.iterdir()):
            entry_name = entry.resolve().name
            if (entry_name != "0_datasets") and (entry_name not in fixed_paths):
                remove_entry(entry)

        # Update inputs subfolders for variable paths
        datasets_dir: Path = inputs_dir / "0_datasets"
        if len(variable_paths) > 0:

            # Create datasets directory (if necessary)
            datasets_dir.mkdir(parents=True, exist_ok=True)

            # Create subfolders (if necessary)
            for index in df_inputs.index:

                dataset_dir: Path = datasets_dir / index
                dataset_dir.mkdir(parents=True, exist_ok=True)

                # Delete variable paths (if necessary)
                for entry in list(dataset_dir.iterdir()):
                    if entry.resolve().name not in variable_paths:
                        remove_entry(entry)

            # Delete subfolders (if necessary)
            subfolders = [f for f in datasets_dir.iterdir() if f.is_dir()]
            for folder in subfolders:
                dataset_id = os.path.split(folder)[-1]
                if dataset_id not in self.dict_datasets[study]:
                    shutil.rmtree(folder)

        elif datasets_dir.exists():
            # Delete datasets folder (if necessary)
            shutil.rmtree(datasets_dir)
def clean_output_tree(self,
    study: str,
):
    """Delete the output data of a study.

    Every sub-directory of ``<working_dir>/<study>`` except ``0_inputs`` is
    removed, as well as the ``.paths.json`` file when it exists.
    """

    study_dir: Path = self.working_dir / study

    # Outputs data (the listing is materialized before anything is deleted)
    folders = [entry for entry in study_dir.iterdir() if entry.is_dir()]
    for folder in folders:
        if os.path.split(folder)[-1] == "0_inputs":
            continue
        shutil.rmtree(folder)

    # Paths file
    paths_file = study_dir / ".paths.json"
    if paths_file.exists():
        paths_file.unlink()
def set_inputs(self):
    """Gather the values of all inputs of every study.

    For each study (with the study directory as current directory):

    - ``dict_fixed_params[study]`` holds the fixed parameters found in
      ``self.dict_inputs[study]``;
    - ``dict_variable_params[study]`` holds the content of ``inputs.csv``
      (an empty dataframe when the study has nothing variable);
    - ``dict_user_paths[study]`` maps every user path to its configured
      location, or to its default location under ``0_inputs`` when no
      location is configured.

    The current directory is set back to ``self.working_dir`` afterwards.
    """

    for study in self.studies:

        fixed_params = self.fixed_params[study]
        variable_paths = self.variable_paths[study]

        # Go to study directory
        study_dir: Path = self.working_dir / study
        os.chdir(study_dir)

        # Initialize dictionary of input paths
        self.dict_user_paths[study] = {}

        # Fixed parameters
        if len(fixed_params) > 0:
            stored = self.dict_inputs[study]
            self.dict_fixed_params[study] = {
                key: stored[key] for key in fixed_params if key in stored
            }
        else:
            self.dict_fixed_params[study] = {}

        # Variable parameters
        has_variables = (len(self.variable_params[study]) > 0) or (len(variable_paths) > 0)
        if has_variables:
            self.dict_variable_params[study] = pd.read_csv(
                filepath_or_buffer="inputs.csv",
                index_col=0,
            )
        else:
            self.dict_variable_params[study] = pd.DataFrame()

        # Fixed paths
        fixed_locations = {}
        for name in self.fixed_paths[study]:
            configured = self.dict_inputs[study][name]
            if configured is None:
                configured = str(Path(os.getcwd()) / "0_inputs" / name)
            fixed_locations[name] = configured

        self.dict_user_paths[study] = {**self.dict_user_paths[study], **fixed_locations}

        # Variable paths
        if len(variable_paths) > 0:

            df_inputs = pd.read_csv(
                filepath_or_buffer="inputs.csv",
                index_col=0,
            )

            variable_locations = {}
            for name in variable_paths:
                per_dataset = {}
                variable_locations[name] = per_dataset
                for idx in df_inputs.index:
                    configured = self.dict_inputs[study][name][idx]
                    if configured is None:
                        configured = str(Path(os.getcwd()) / "0_inputs" / "0_datasets" / idx / name)
                    per_dataset[idx] = configured

            self.dict_user_paths[study] = {**self.dict_user_paths[study], **variable_locations}

        # Go back to working directory
        os.chdir(self.working_dir)
def test_inputs_settings(self):
    """Check, for every study, that each expected input has been set.

    One message is recorded per input, prefixed with ``(V)`` when it is valid,
    ``(X)`` when it is missing (the matching ``*_config`` flag is then set to
    ``False``) or ``(!)`` when a parameter does not have the expected type.
    Fixed inputs are stored per study; variable inputs per study and per index
    of ``self.dict_variable_params[study]``.

    Paths are tested from inside the study directory; the current directory is
    set back to ``self.working_dir`` after each study.
    """

    # Loop over studies
    for study in self.studies:

        # Go to study directory
        os.chdir(self.working_dir / study)

        # Start from an empty report for this study
        fixed_param_report = []
        fixed_path_report = []
        self.fixed_params_messages[study] = fixed_param_report
        self.fixed_paths_messages[study] = fixed_path_report
        self.fixed_params_config[study] = True
        self.fixed_paths_config[study] = True
        self.variable_params_messages[study] = {}
        self.variable_paths_messages[study] = {}
        self.variable_params_config[study] = {}
        self.variable_paths_config[study] = {}

        # Fixed parameters
        for param, value in self.dict_fixed_params[study].items():
            if value is None:
                fixed_param_report.append(f"(X) {param}")
                self.fixed_params_config[study] = False
                continue
            expected = self.params_type[param]
            if isinstance(value, expected[0]):
                fixed_param_report.append(f"(V) {param}")
            else:
                fixed_param_report.append(f"(!) {param} ({expected[1]} expected)")

        # Fixed paths
        for file in self.fixed_paths[study]:
            if Path(self.dict_user_paths[study][file]).exists():
                fixed_path_report.append(f"(V) {file}")
            else:
                fixed_path_report.append(f"(X) {file}")
                self.fixed_paths_config[study] = False

        # Variable inputs
        if (len(self.variable_params[study]) > 0) or \
           (len(self.variable_paths[study]) > 0):

            table = self.dict_variable_params[study]
            for index in table.index:

                param_report = []
                path_report = []
                self.variable_params_messages[study][index] = param_report
                self.variable_paths_messages[study][index] = path_report
                self.variable_params_config[study][index] = True
                self.variable_paths_config[study][index] = True

                # Variable parameters
                for param in self.variable_params[study]:
                    value = table.at[index, param]
                    if pd.isna(value) or value == "":
                        param_report.append(f"(X) {param}")
                        self.variable_params_config[study][index] = False
                        continue
                    # Turn numpy scalars into plain Python values before the type test
                    if isinstance(value, (np.integer, np.floating, np.bool_)):
                        value = value.item()
                    expected = self.params_type[param]
                    if isinstance(value, expected[0]):
                        param_report.append(f"(V) {param}")
                    else:
                        param_report.append(f"(!) {param} ({expected[1]} expected)")

                # Variable paths
                for file in self.variable_paths[study]:
                    if Path(self.dict_user_paths[study][file][index]).exists():
                        path_report.append(f"(V) {file}")
                    else:
                        path_report.append(f"(X) {file}")
                        self.variable_paths_config[study][index] = False

        # Go back to working directory
        os.chdir(self.working_dir)
def print_inputs_settings(self):
    """Print the inputs report of every study and stop at the first invalid one.

    The messages recorded in ``self.fixed_*_messages`` and
    ``self.variable_*_messages`` are printed in green for ``(V)``, red for
    ``(X)`` and yellow for ``(!)``. For each study the fixed inputs are printed
    first (line "> Common :"), then one line per dataset.

    The program exits with status 1 (``sys.exit(1)``) as soon as a study has:

    - a missing fixed input, or a fixed parameter of unexpected type;
    - variable inputs declared but no dataset in ``inputs.csv``;
    - a missing variable input, or a variable parameter of unexpected type.

    The current directory is the study directory while a study is printed and
    is set back to ``self.working_dir`` once the study passed all checks.
    """

    print()
    print(
        colored("> SETTINGS <", "blue", attrs=["reverse"]),
    )
    for study in self.studies:

        # Define study directory
        study_dir: Path = self.working_dir / study

        # Go to study directory
        os.chdir(study_dir)

        # Files the user is pointed to. They are built outside the f-strings:
        # reusing double quotes inside a double-quoted f-string is a syntax
        # error before Python 3.12.
        inputs_json = str(Path.cwd() / "inputs.json")
        inputs_csv = str(Path.cwd() / "inputs.csv")

        # Printing
        print()
        print(colored(f"| {study} |", "magenta"))

        # ------------ #
        # Fixed inputs #
        # ------------ #
        list_text = [colored("> Common :", "blue")]
        list_errors = []
        config = True
        type_error = False

        # Fixed parameters
        for message in self.fixed_params_messages[study]:
            if "(V)" in message:
                list_text.append(colored(message, "green"))
            elif "(X)" in message:
                list_text.append(colored(message, "red"))
                # The json file is reported once, whatever the number of missing parameters
                if config:
                    list_errors.append(colored(f"> {inputs_json}", "red"))
                    config = False
            elif "(!)" in message:
                list_text.append(colored(message, "yellow"))
                type_error = True

        # Fixed paths
        for i, message in enumerate(self.fixed_paths_messages[study]):
            if "(V)" in message:
                list_text.append(colored(message, "green"))
            elif "(X)" in message:
                # Messages follow the order of self.fixed_paths[study]
                file = self.fixed_paths[study][i]
                path = self.dict_user_paths[study][file]
                list_text.append(colored(message, "red"))
                list_errors.append(colored(f"> {path}", "red"))

        # Printing
        if len(list_text) == 1:
            print(colored("None.", "blue"))
        else:
            print(*list_text)

        if not self.fixed_params_config[study] or not self.fixed_paths_config[study]:
            print()
            print(colored("(X) Please set inputs :", "red"))
            for error in list_errors:
                print(error)
            sys.exit(1)

        if type_error:
            print()
            print(colored("(X) Please set parameter(s) with expected type(s) in file :", "red"))
            print(colored(f"> {inputs_json}", "red"))
            sys.exit(1)

        # --------------- #
        # Variable inputs #
        # --------------- #
        list_errors = []
        config = True
        type_error = False

        if (len(self.variable_params[study]) > 0) or \
           (len(self.variable_paths[study]) > 0):

            # Check if datasets have been defined
            if len(self.dict_variable_params[study].index) == 0:
                print()
                print(colored("(X) Please define at least one dataset in file :", "red"))
                print(colored(f"> {inputs_csv}", "red"))
                sys.exit(1)

            for index in self.dict_variable_params[study].index:

                list_text = [colored(f"> {index} :", "blue")]

                # Variable parameters
                for message in self.variable_params_messages[study][index]:
                    if "(V)" in message:
                        list_text.append(colored(message, "green"))
                    elif "(X)" in message:
                        list_text.append(colored(message, "red"))
                        # The csv file is reported once for all datasets
                        if config:
                            list_errors.append(colored(f"> {inputs_csv}", "red"))
                            config = False
                    elif "(!)" in message:
                        list_text.append(colored(message, "yellow"))
                        type_error = True

                # Variable paths
                for i, message in enumerate(self.variable_paths_messages[study][index]):
                    if "(V)" in message:
                        list_text.append(colored(message, "green"))
                    elif "(X)" in message:
                        # Messages follow the order of self.variable_paths[study]
                        file = self.variable_paths[study][i]
                        path = self.dict_user_paths[study][file][index]
                        list_text.append(colored(message, "red"))
                        list_errors.append(colored(f"> {path}", "red"))

                # Printing
                print(*list_text)

            # Entries mentioning the csv file are listed first (stable sort)
            list_errors.sort(key=lambda x: 0 if "inputs.csv" in x else 1)
            if len(list_errors) > 0:
                print()
                print(colored("(X) Please set inputs :", "red"))
                for error in list_errors:
                    print(error)
                sys.exit(1)

            if type_error:
                print()
                print(colored("(X) Please set parameter(s) with expected type(s) in file :", "red"))
                print(colored(f"> {inputs_csv}", "red"))
                sys.exit(1)

        # Go back to working directory
        os.chdir(self.working_dir)
def init_paths(self):
    """Initialize dictionary containing all paths.

    For every study, ``<study>/.paths.json`` is loaded into
    ``self.dict_paths[study]``. When the file is missing, unreadable or not
    valid JSON, the dictionary is created with one ``None`` entry per name of
    ``self.output_paths``.

    In each entry whose value is itself a dictionary (one path per dataset),
    the datasets that are not listed in ``self.dict_datasets[study]`` are
    removed.
    """

    # Loop over studies
    for study in self.studies:

        # Define study directory
        study_dir: Path = self.working_dir / study

        try:
            with open(study_dir / ".paths.json") as f:
                dict_paths = json.load(f)
        except (OSError, ValueError):
            # OSError: file missing or unreadable; ValueError: invalid JSON
            # (json.JSONDecodeError and UnicodeDecodeError are subclasses).
            # Anything else, e.g. KeyboardInterrupt, is left to propagate.
            dict_paths = {path: None for path in self.output_paths}

        # Purge old datasets
        for value in dict_paths.values():
            if isinstance(value, dict):
                # Collect first, delete after: a dict cannot shrink while iterated
                to_delete = [dataset for dataset in value if dataset not in self.dict_datasets[study]]
                for dataset in to_delete:
                    del value[dataset]

        self.dict_paths[study] = dict_paths
def update_analysis(self):
    """Synchronize ``analysis.json`` of every study with the known output paths.

    For each study the file ``<study>/analysis.json`` is loaded (an empty
    dictionary is used when it does not exist), updated and written back.
    For each output ``out`` of ``self.dict_paths[study]``:

    - an entry ``out`` is created when missing;
    - when the output value is a dictionary (one path per case), each case
      without settings receives its own copy of ``self.analysis_settings[out]``
      (``{}`` when the output has no settings); existing cases are kept;
    - cases stored in the file but absent from the output value are removed;
    - when the output value is ``None`` the stored cases are left untouched:
      nothing is known yet about this output, and testing membership in
      ``None`` would raise ``TypeError``.
    """

    # Function-scope import: each case gets an independent copy of the default settings
    import copy

    # Loop over studies
    for study in self.studies:

        # Define study directory
        study_dir: Path = self.working_dir / study

        # Define analysis file
        analysis_file = study_dir / "analysis.json"

        # Initialize analysis file
        if os.path.exists(analysis_file):
            with open(analysis_file) as f:
                self.dict_analysis[study] = json.load(f)
        else:
            self.dict_analysis[study] = {}
        analysis = self.dict_analysis[study]

        # Browse all outputs
        for out, value in self.dict_paths[study].items():

            dict_out = self.analysis_settings.get(out, {})

            if out not in analysis:
                analysis[out] = {}
            cases = analysis[out]

            # Output path(s) not known yet: keep whatever the file contains
            if value is None:
                continue

            # Add the cases which have no settings yet
            if isinstance(value, dict):
                for case in value:
                    if case not in cases:
                        cases[case] = copy.deepcopy(dict_out)

            # Remove the cases which are not in the output value anymore.
            # NOTE: for a plain string value this is a substring test, as before.
            cases_to_delete = [case for case in cases if case not in value]
            for case in cases_to_delete:
                del cases[case]

        with open(analysis_file, "w") as f:
            json.dump(self.dict_analysis[study], f, indent=4)
def clean_outputs(self):
    """Delete the outputs flagged for cleaning in each study.

    For every study of ``self.dict_studies``, each key of its
    ``"clean_outputs"`` dictionary with a true value is looked up in
    ``self.dict_paths[study]``. A string is removed as a single file or
    directory; a dictionary has each of its values removed. Other values
    (e.g. ``None``) are ignored, and so are paths that do not exist.
    """

    def _discard(target: str):
        # Remove one path, whether it is a file or a directory tree
        location = Path(target)
        if not location.exists():
            return
        if location.is_dir():
            shutil.rmtree(target)
        else:
            location.unlink()

    # Loop over studies
    for study, study_dict in self.dict_studies.items():

        # Delete specified outputs
        for key, flagged in study_dict["clean_outputs"].items():
            if not flagged:
                continue
            registered = self.dict_paths[study][key]
            if isinstance(registered, str):
                _discard(registered)
            if isinstance(registered, dict):
                for target in registered.values():
                    _discard(target)
def purge_output_datasets(self,
    study: str,
):
    """Purge output datasets for a specific study.

    Every directory of the current working directory whose (resolved) name is
    not listed in ``self.dict_datasets[study]`` is deleted recursively.
    Plain files are left in place: ``shutil.rmtree`` only handles directories
    and would raise on them.

    Parameters
    ----------
    study : str
        Name of the study whose datasets are the ones to keep.
    """

    # Snapshot the listing so that deletions do not interfere with the iteration
    datasets_paths = [f for f in Path.cwd().iterdir() if f.is_dir()]
    for path in datasets_paths:
        dataset_name = path.resolve().name
        if dataset_name not in self.dict_datasets[study]:
            shutil.rmtree(path)
def update_workflow_diagram(self,
    process: "Process",
):
    """Store the description of a process in the workflow diagram.

    The entry ``self.diagram[process.name]`` is replaced by a dictionary
    holding the values of the ``params``, ``paths``, ``required_paths`` and
    ``output_paths`` mappings of the process (as lists), together with its
    ``allparams`` and ``allpaths`` attributes.

    Parameters
    ----------
    process : Process
        Process whose description is recorded.
    """

    entry = {}
    entry["params"] = list(process.params.values())
    entry["allparams"] = process.allparams
    entry["paths"] = list(process.paths.values())
    entry["allpaths"] = process.allpaths
    entry["required_paths"] = list(process.required_paths.values())
    entry["output_paths"] = list(process.output_paths.values())

    self.diagram[process.name] = entry
def __call__(self):
    """Launch workflow of processes.

    For every study of ``self.dict_studies`` whose ``"execute"`` flag is true,
    the entries of ``self.processes`` are run in order. Each process works in
    its own folder ``<study>/<step>_<ProcessName>`` (``step`` starting at 1);
    when ``is_case`` is true on the process, it is run once per index of its
    ``df_params`` inside a sub-folder named after the index, unless the
    ``EXECUTE`` column of the study inputs is 0 for that index.

    After each executed process, ``.paths.json`` and ``.diagram.json`` are
    written in the study directory. Once all studies are done, the outputs
    flagged for cleaning are deleted with ``self.clean_outputs()``.

    The current working directory is changed repeatedly: processes are called
    from inside their (sub-)folder, and the method goes back to
    ``self.working_dir`` at the end of each executed study.
    """

    # --------------- #
    # Launch workflow #
    # --------------- #
    print()
    print(
        colored("> RUNNING <", "blue", attrs=["reverse"]),
    )

    for study, dict_study in self.dict_studies.items():

        # Check if study must be executed
        if not dict_study["execute"]:

            # Printing
            print()
            print(
                colored(f"| {study} |", "magenta"),
            )
            print()
            print(colored("(!) Study is skipped.", "yellow"))

            continue

        study_dir:Path = self.working_dir / study
        os.chdir(study_dir)

        for step, proc in enumerate(self.processes):

            # Update analysis (reloads and rewrites analysis.json of every study)
            self.update_analysis()

            # Optional settings of the process definition, empty when not provided
            if "hard_params" in proc: dict_hard_params = proc["hard_params"]
            else: dict_hard_params = {}
            if "user_params" in proc: user_params = proc["user_params"]
            else: user_params = {}
            if "user_paths" in proc: user_paths = proc["user_paths"]
            else: user_paths = {}
            if "required_paths" in proc: required_paths = proc["required_paths"]
            else: required_paths = {}
            if "output_paths" in proc: output_paths = proc["output_paths"]
            else: output_paths = {}
            if "overall_analysis" in proc: overall_analysis = proc["overall_analysis"]
            else: overall_analysis = {}

            # Define class object for the current process
            process = proc["process"]
            this_process:Process = process(
                study=study,
                df_user_params=self.dict_variable_params[study],
                dict_user_params=self.dict_fixed_params[study],
                dict_user_paths=self.dict_user_paths[study],
                dict_paths=self.dict_paths[study],
                params=user_params,
                paths=user_paths,
                dict_hard_params=dict_hard_params,
                fixed_params=self.fixed_params[study],
                variable_params=self.variable_params[study],
                fixed_paths=self.fixed_paths[study],
                variable_paths=self.variable_paths[study],
                required_paths=required_paths,
                output_paths=output_paths,
                overall_analysis=overall_analysis,
                dict_analysis=self.dict_analysis[study],
                verbose=self.dict_process[study][self.list_processes[step]]["verbose"],
                diagram=self.diagram,
            )

            # Define process name (name of the process class)
            this_process.name = this_process.__class__.__name__

            # Define working folder associated to the current process
            folder_name = f"{step+1}_{this_process.name}"
            folder_path:Path = study_dir / folder_name
            folder_path.mkdir(exist_ok=True, parents=True)
            os.chdir(folder_path)

            # Initialize process
            this_process.initialize()

            # Check if process must be executed
            if not self.dict_process[study][self.list_processes[step]]["execute"]:

                # Printing
                print()
                print(
                    colored(f"| {study} | {this_process.name} |", "magenta"),
                )
                print()
                print(colored("(!) Process is skipped.", "yellow"))

                # Update workflow diagram
                self.update_workflow_diagram(this_process)

                # NOTE: the paths and diagram json files are not rewritten for a skipped process
                continue

            if this_process.is_case:

                # Define sub-folders associated to each ID of the inputs dataframe
                for idx in this_process.df_params.index:

                    # Printing
                    print()
                    print(
                        colored(f"| {study} | {this_process.name} | {idx} |", "magenta"),
                    )

                    # Check if dataset must be executed
                    if self.dict_variable_params[study].loc[idx, "EXECUTE"] == 0:

                        # Printing
                        print()
                        print(colored("(!) Dataset is skipped.", "yellow"))

                        # Go back to working folder
                        os.chdir(folder_path)

                        # Purge old output datasets (works on the current directory)
                        self.purge_output_datasets(study)

                        # Update workflow diagram
                        self.update_workflow_diagram(this_process)

                        continue

                    # Update process index
                    this_process.index = idx

                    subfolder_path = study_dir / folder_name / str(idx)
                    subfolder_path.mkdir(exist_ok=True, parents=True)
                    os.chdir(subfolder_path)

                    # Launch process
                    this_process()
                    this_process.finalize()

                    # Go back to working folder
                    os.chdir(folder_path)

                    # Purge old output datasets (works on the current directory)
                    self.purge_output_datasets(study)

            else:

                # Printing
                print()
                print(
                    colored(f"| {study} | {this_process.name} |", "magenta"),
                )

                # Launch process
                this_process()
                this_process.finalize()

            # Update workflow diagram
            self.update_workflow_diagram(this_process)

            # Update paths dictionary
            self.dict_paths[study] = this_process.dict_paths

            # Write paths json file
            with open(study_dir / ".paths.json", "w") as f:
                json.dump(self.dict_paths[study], f, indent=4)

            # Go back to study directory
            os.chdir(study_dir)

            # Write diagram json file (relative path: written in the study directory)
            with open(".diagram.json", "w") as f:
                json.dump(self.diagram, f, indent=4)

        # Go back to working directory
        os.chdir(self.working_dir)

    # Delete unnecessary outputs
    self.clean_outputs()