Coverage for nuremics\core\process.py: 90%

135 statements  

« prev     ^ index     » next       coverage.py v7.9.0, created at 2025-06-12 16:54 +0200

1from __future__ import annotations 

2 

3import os 

4import sys 

5import attrs 

6import json 

7from typing import Callable 

8from pathlib import Path 

9from termcolor import colored 

10 

11import pandas as pd 

12 

13from .utils import ( 

14 convert_value, 

15 concat_lists_unique, 

16) 

17 

18 

19@attrs.define 

20class Process(): 

21 """Mother class of all classes of process.""" 

22 

23 name: str = attrs.field(default=None) 

24 study: str = attrs.field(default=None) 

25 df_user_params: pd.DataFrame = attrs.field(default=None) 

26 dict_user_params: dict = attrs.field(default=None) 

27 dict_user_paths: dict = attrs.field(default=None) 

28 dict_paths: dict = attrs.field(factory=dict) 

29 is_case: bool = attrs.field(default=True) 

30 params: dict = attrs.field(factory=dict) 

31 allparams: list = attrs.field(factory=list) 

32 paths: dict = attrs.field(factory=dict) 

33 allpaths: list = attrs.field(factory=list) 

34 fixed_params: list = attrs.field(default=None) 

35 variable_params: list = attrs.field(default=None) 

36 fixed_paths: list = attrs.field(default=None) 

37 variable_paths: list = attrs.field(default=None) 

38 fixed_params_proc: list = attrs.field(factory=list) 

39 variable_params_proc: list = attrs.field(factory=list) 

40 fixed_paths_proc: list = attrs.field(factory=list) 

41 variable_paths_proc: list = attrs.field(factory=list) 

42 df_params: pd.DataFrame = attrs.field(default=None) 

43 dict_inputs: dict = attrs.field(default=None) 

44 dict_hard_params: dict = attrs.field(factory=dict) 

45 output_paths: dict = attrs.field(factory=dict) 

46 overall_analysis: dict = attrs.field(factory=dict) 

47 dict_analysis: dict = attrs.field(factory=dict) 

48 required_paths: dict = attrs.field(factory=dict) 

49 verbose: bool = attrs.field(default=True) 

50 index: str = attrs.field(default=None) 

51 diagram: dict = attrs.field(default={}) 

52 set_inputs: bool = attrs.field(default=False) 

53 

54 def initialize(self): 

55 

56 self.variable_params_proc = [x for x in self.variable_params if x in list(self.params.values())] 

57 self.fixed_params_proc = [x for x in self.fixed_params if x in list(self.params.values())] 

58 self.fixed_paths_proc = [x for x in self.fixed_paths if x in list(self.paths.values())] 

59 self.variable_paths_proc = [x for x in self.variable_paths if x in list(self.paths.values())] 

60 

61 # Define list with all parameters considering dependencies with previous processes 

62 self.allparams = list(self.params.values()).copy() 

63 for required_paths in list(self.required_paths.values()): 

64 for _, value in self.diagram.items(): 

65 if required_paths in value.get("output_paths"): 

66 self.allparams = concat_lists_unique( 

67 list1=list(self.params.values()), 

68 list2=value["allparams"], 

69 ) 

70 

71 # Define list with all paths considering dependencies with previous processes 

72 self.allpaths = list(self.paths.values()).copy() 

73 for required_path in list(self.required_paths.values()): 

74 for _, value in self.diagram.items(): 

75 if required_path in value.get("output_paths"): 

76 self.allpaths = concat_lists_unique( 

77 list1=list(self.paths.values()), 

78 list2=value["allpaths"], 

79 ) 

80 

81 if self.is_case: 

82 self.on_params_update() 

83 

84 def __call__(self): 

85 

86 # Update dictionary of parameters 

87 if not self.set_inputs: 

88 self.update_dict_inputs() 

89 

90 for param, value in self.dict_inputs.items(): 

91 setattr(self, param, value) 

92 

93 # Printing 

94 print(colored(f"> {param} = {value}", "blue")) 

95 

96 # Printing 

97 print(colored(">>> START", "green")) 

98 

99 def on_params_update(self): 

100 

101 # Create parameters dataframe and fill with variable parameters 

102 if (len(self.variable_params_proc) > 0) or (len(self.variable_paths_proc) > 0): 

103 self.df_params = self.df_user_params[self.variable_params_proc].copy() 

104 

105 # There is no variable parameters / paths 

106 else: 

107 

108 # Check parameters / paths dependencies 

109 variable_params = [x for x in self.variable_params if x in self.allparams] 

110 variable_paths = [x for x in self.variable_paths if x in self.allpaths] 

111 

112 # There are variable parameters / paths from previous process 

113 if (len(variable_params) > 0) or (len(variable_paths) > 0): 

114 self.df_params = pd.DataFrame(self.df_user_params.index, columns=["ID"]).set_index("ID") 

115 # There is no variable parameter from previous process 

116 else: 

117 self.is_case = False 

118 

119 # Add fixed parameters to the dataframe 

120 if self.is_case: 

121 for param in self.fixed_params_proc: 

122 self.df_params[param] = self.dict_user_params[param] 

123 

124 def update_dict_inputs(self): 

125 

126 # Add user parameters 

127 if self.is_case: 

128 

129 self.dict_inputs = {} 

130 params_inv = {v: k for k, v in self.params.items()} 

131 for param in self.df_params.columns: 

132 value = convert_value(self.df_params.at[self.index, param]) 

133 self.dict_inputs[params_inv[param]] = value 

134 else: 

135 self.dict_inputs = {k: self.dict_user_params[v] for k, v in self.params.items()} 

136 

137 # Add hard parameters 

138 for param, value in self.dict_hard_params.items(): 

139 self.dict_inputs[param] = value 

140 

141 # Add user paths 

142 paths_inv = {v: k for k, v in self.paths.items()} 

143 for file in self.fixed_paths_proc: 

144 self.dict_inputs[paths_inv[file]] = self.dict_user_paths[file] 

145 for file in self.variable_paths_proc: 

146 self.dict_inputs[paths_inv[file]] = self.dict_user_paths[file][self.index] 

147 

148 # Add previous output paths 

149 for key, value in self.required_paths.items(): 

150 output_path = self.get_output_path(value) 

151 self.dict_inputs[key] = output_path 

152 

153 # Add output analysis 

154 for out, value in self.overall_analysis.items(): 

155 self.dict_inputs[out] = value 

156 

157 # Write json file containing all parameters 

158 with open("inputs.json", "w") as f: 

159 json.dump(self.dict_inputs, f, indent=4) 

160 

161 def get_output_path(self, 

162 output_path: str, 

163 ): 

164 """Function to get the path to an output within the paths dictionary""" 

165 

166 # Initialize path to return 

167 path = None 

168 

169 if isinstance(self.dict_paths[output_path], dict): 

170 for key, value in self.dict_paths[output_path].items(): 

171 if key == self.index: 

172 path = value 

173 else: 

174 path = self.dict_paths[output_path] 

175 

176 

177 if not Path(path).exists(): 

178 

179 # Printing 

180 print() 

181 print(colored(f"(X) Required {output_path} is missing :", "red")) 

182 print(colored(f"> Please execute the necessary previous process that will build it.", "red")) 

183 

184 sys.exit(1) 

185 

186 return path 

187 

188 def update_output(self, 

189 output_path: str, 

190 dump: str, 

191 ): 

192 if output_path not in self.dict_paths: 

193 self.dict_paths[output_path] = None 

194 

195 if self.is_case: 

196 if self.dict_paths[output_path] is None: self.dict_paths[output_path] = {} 

197 self.dict_paths[output_path][self.index] = os.path.join(os.getcwd(), dump) 

198 else: 

199 self.dict_paths[output_path] = os.path.join(os.getcwd(), dump) 

200 

201 @staticmethod 

202 def analysis_function( 

203 func: Callable, 

204 ) -> Callable: 

205 

206 func._is_analysis = True 

207 return func 

208 

209 def process_output(self, 

210 out: str, 

211 func: Callable[..., None], 

212 **kwargs, 

213 ): 

214 if not getattr(func, "_is_analysis", False): 

215 print(colored(f'(X) Function "{func.__name__}" is not a valid analysis function.', "red")) 

216 sys.exit(1) 

217 

218 output = self.dict_paths[out] 

219 analysis = self.dict_analysis[out] 

220 if isinstance(output, dict): 

221 func(output, analysis, **kwargs) 

222 

223 def finalize(self): 

224 

225 for _, value in self.output_paths.items(): 

226 self.update_output( 

227 output_path=value, 

228 dump=value, 

229 ) 

230 

231 print(colored("COMPLETED <<<", "green"))