Coverage for nuremics\core\process.py: 90%
135 statements
« prev ^ index » next coverage.py v7.9.0, created at 2025-06-12 16:54 +0200
« prev ^ index » next coverage.py v7.9.0, created at 2025-06-12 16:54 +0200
1from __future__ import annotations
3import os
4import sys
5import attrs
6import json
7from typing import Callable
8from pathlib import Path
9from termcolor import colored
11import pandas as pd
13from .utils import (
14 convert_value,
15 concat_lists_unique,
16)
@attrs.define
class Process:
    """Mother class of all classes of process.

    A process holds the user parameters / paths it consumes, resolves the
    inputs for the current case (``index``), exposes them as attributes when
    called, and records where its outputs were written.

    NOTE(review): the exact schema of ``diagram`` is not defined in this
    class; the code only relies on each entry exposing ``"output_paths"``,
    ``"allparams"`` and ``"allpaths"``.
    """

    name: str = attrs.field(default=None)
    study: str = attrs.field(default=None)
    df_user_params: pd.DataFrame = attrs.field(default=None)
    dict_user_params: dict = attrs.field(default=None)
    dict_user_paths: dict = attrs.field(default=None)
    dict_paths: dict = attrs.field(factory=dict)
    is_case: bool = attrs.field(default=True)
    params: dict = attrs.field(factory=dict)
    allparams: list = attrs.field(factory=list)
    paths: dict = attrs.field(factory=dict)
    allpaths: list = attrs.field(factory=list)
    fixed_params: list = attrs.field(default=None)
    variable_params: list = attrs.field(default=None)
    fixed_paths: list = attrs.field(default=None)
    variable_paths: list = attrs.field(default=None)
    fixed_params_proc: list = attrs.field(factory=list)
    variable_params_proc: list = attrs.field(factory=list)
    fixed_paths_proc: list = attrs.field(factory=list)
    variable_paths_proc: list = attrs.field(factory=list)
    df_params: pd.DataFrame = attrs.field(default=None)
    dict_inputs: dict = attrs.field(default=None)
    dict_hard_params: dict = attrs.field(factory=dict)
    output_paths: dict = attrs.field(factory=dict)
    overall_analysis: dict = attrs.field(factory=dict)
    dict_analysis: dict = attrs.field(factory=dict)
    required_paths: dict = attrs.field(factory=dict)
    verbose: bool = attrs.field(default=True)
    index: str = attrs.field(default=None)
    # A factory is used so that every instance gets its own dictionary;
    # a literal ``{}`` default would be shared between all instances.
    diagram: dict = attrs.field(factory=dict)
    set_inputs: bool = attrs.field(default=False)

    def initialize(self):
        """Split the study-level parameters / paths into those used by this process.

        Fills the ``*_proc`` lists with the fixed / variable parameters and
        paths that appear among the values of ``params`` / ``paths``, builds
        ``allparams`` / ``allpaths`` by merging in the ones of every diagram
        entry that produces one of ``required_paths``, and finally calls
        ``on_params_update()`` when the process is case-dependent.
        """
        # Materialize the values once instead of once per tested element
        own_params = list(self.params.values())
        own_paths = list(self.paths.values())

        self.variable_params_proc = [x for x in self.variable_params if x in own_params]
        self.fixed_params_proc = [x for x in self.fixed_params if x in own_params]
        self.fixed_paths_proc = [x for x in self.fixed_paths if x in own_paths]
        self.variable_paths_proc = [x for x in self.variable_paths if x in own_paths]

        # Define lists with all parameters / paths considering dependencies with previous processes
        self.allparams = self._merge_upstream(own_params, "allparams")
        self.allpaths = self._merge_upstream(own_paths, "allpaths")

        if self.is_case:
            self.on_params_update()

    def _merge_upstream(self, own: list, key: str) -> list:
        """Return ``own`` extended with ``entry[key]`` of every diagram entry producing a required path."""
        merged = list(own)
        for required in self.required_paths.values():
            for entry in self.diagram.values():
                # An entry without "output_paths" simply cannot match
                if required in (entry.get("output_paths") or ()):
                    # Accumulate on the running result so that several
                    # upstream dependencies are all kept, not only the last one
                    merged = concat_lists_unique(
                        list1=merged,
                        list2=entry[key],
                    )
        return merged

    def __call__(self):
        """Expose the resolved inputs as attributes and announce the start of the process.

        Unless ``set_inputs`` is true, ``dict_inputs`` is first rebuilt with
        ``update_dict_inputs()``.
        """
        # Update dictionary of parameters
        if not self.set_inputs:
            self.update_dict_inputs()

        for param, value in self.dict_inputs.items():
            setattr(self, param, value)

            # Printing
            print(colored(f"> {param} = {value}", "blue"))

        # Printing
        print(colored(">>> START", "green"))

    def on_params_update(self):
        """Build ``df_params`` for this process, or flag it as not case-dependent.

        ``is_case`` is set to ``False`` when neither this process nor the
        ones it depends on (through ``allparams`` / ``allpaths``) use any
        variable parameter or path.
        """
        # Create parameters dataframe and fill with variable parameters
        if self.variable_params_proc or self.variable_paths_proc:
            self.df_params = self.df_user_params[self.variable_params_proc].copy()

        # There is no variable parameters / paths
        else:

            # Check parameters / paths dependencies
            inherited_params = [x for x in self.variable_params if x in self.allparams]
            inherited_paths = [x for x in self.variable_paths if x in self.allpaths]

            # There are variable parameters / paths from previous process:
            # keep one (column-less) row per case so the cases are still iterated
            if inherited_params or inherited_paths:
                self.df_params = pd.DataFrame(self.df_user_params.index, columns=["ID"]).set_index("ID")
            # There is no variable parameter from previous process
            else:
                self.is_case = False

        # Add fixed parameters to the dataframe
        if self.is_case:
            for param in self.fixed_params_proc:
                self.df_params[param] = self.dict_user_params[param]

    def update_dict_inputs(self):
        """Rebuild ``dict_inputs`` for the current case and dump it to ``inputs.json``.

        Later sources overwrite earlier ones on key collision, in this order:
        user parameters, hard parameters, user paths, required outputs of
        previous processes, overall analysis entries. The JSON file is written
        in the current working directory.
        """
        # Add user parameters
        if self.is_case:
            params_inv = {v: k for k, v in self.params.items()}
            self.dict_inputs = {}
            for param in self.df_params.columns:
                value = convert_value(self.df_params.at[self.index, param])
                self.dict_inputs[params_inv[param]] = value
        else:
            self.dict_inputs = {k: self.dict_user_params[v] for k, v in self.params.items()}

        # Add hard parameters
        self.dict_inputs.update(self.dict_hard_params)

        # Add user paths
        paths_inv = {v: k for k, v in self.paths.items()}
        for file in self.fixed_paths_proc:
            self.dict_inputs[paths_inv[file]] = self.dict_user_paths[file]
        for file in self.variable_paths_proc:
            self.dict_inputs[paths_inv[file]] = self.dict_user_paths[file][self.index]

        # Add previous output paths
        for key, value in self.required_paths.items():
            self.dict_inputs[key] = self.get_output_path(value)

        # Add output analysis
        self.dict_inputs.update(self.overall_analysis)

        # Write json file containing all parameters
        with open("inputs.json", "w", encoding="utf-8") as f:
            json.dump(self.dict_inputs, f, indent=4)

    def get_output_path(self,
        output_path: str,
    ):
        """Function to get the path to an output within the paths dictionary.

        When the entry of ``dict_paths`` is a dictionary, the path stored
        under the current ``index`` is used; otherwise the entry itself is
        the path.

        Prints an error and exits with status 1 when the output is not
        registered, has no path for the current case, or does not exist on
        disk.
        """
        entry = self.dict_paths.get(output_path)
        path = entry.get(self.index) if isinstance(entry, dict) else entry

        # ``path`` is None when nothing was recorded for this output / case;
        # test it first because ``Path(None)`` raises TypeError
        if path is None or not Path(path).exists():

            # Printing
            print()
            print(colored(f"(X) Required {output_path} is missing :", "red"))
            print(colored("> Please execute the necessary previous process that will build it.", "red"))

            sys.exit(1)

        return path

    def update_output(self,
        output_path: str,
        dump: str,
    ):
        """Record where the output ``output_path`` was written.

        The stored location is ``dump`` joined to the current working
        directory. For a case-dependent process it is stored per case under
        ``dict_paths[output_path][index]``; otherwise it replaces
        ``dict_paths[output_path]`` directly.
        """
        location = os.path.join(os.getcwd(), dump)

        if not self.is_case:
            self.dict_paths[output_path] = location
            return

        # First case recorded for this output: create the per-case dictionary
        if self.dict_paths.get(output_path) is None:
            self.dict_paths[output_path] = {}
        self.dict_paths[output_path][self.index] = location

    @staticmethod
    def analysis_function(
        func: Callable,
    ) -> Callable:
        """Mark ``func`` as an analysis function and return it unchanged.

        The mark is the ``_is_analysis`` attribute checked by
        ``process_output``; usable as a decorator.
        """
        func._is_analysis = True
        return func

    def process_output(self,
        out: str,
        func: Callable[..., None],
        **kwargs,
    ):
        """Run the analysis function ``func`` on the recorded output ``out``.

        ``func`` is called as ``func(output, analysis, **kwargs)`` with the
        entries of ``dict_paths`` and ``dict_analysis`` for ``out``, and only
        when the recorded output is a dictionary; otherwise nothing is done.

        Prints an error and exits with status 1 when ``func`` was not marked
        with ``analysis_function``.
        """
        if not getattr(func, "_is_analysis", False):
            print(colored(f'(X) Function "{func.__name__}" is not a valid analysis function.', "red"))
            sys.exit(1)

        output = self.dict_paths[out]
        analysis = self.dict_analysis[out]
        if isinstance(output, dict):
            func(output, analysis, **kwargs)

    def finalize(self):
        """Register every declared output and announce the end of the process.

        Each value of ``output_paths`` is used both as the output name and as
        the dump name passed to ``update_output``.
        """
        for value in self.output_paths.values():
            self.update_output(
                output_path=value,
                dump=value,
            )

        print(colored("COMPLETED <<<", "green"))