KaliVeda
Toolkit for HIC analysis
KV_CCIN2P3_Slurm.cpp
1 //Created by KVClassFactory on Mon Jan 24 16:54:04 2022
2 //Author: John Frankland,,,
3 
4 #include "KV_CCIN2P3_Slurm.h"
5 #include "TSystem.h"
6 #include "TEnv.h"
7 #include "KVDataAnalyser.h"
8 #include "KVDataAnalysisTask.h"
9 #include "KVDataSetAnalyser.h"
10 #include "KVSimDirAnalyser.h"
11 
12 using namespace std;
13 
15 
16 
17 
22  : KVBatchSystem(name), fMultiJobs(kTRUE)
23 {
24  // Constructor: forwards 'name' to the KVBatchSystem base and starts with multi-jobs mode enabled.
25  // Default job time and memory are read from gEnv (original note: values are defined in
    // $KVROOT/KVFiles/.kvrootrc). No disk-space default is set in this body.
26 
    // Fallback values used when the keys are not defined: "00:05:00" (hh:mm:ss) and "3G".
27  fDefJobTime = gEnv->GetValue("GE.BatchSystem.DefaultJobTime", "00:05:00");
28  fDefJobMem = gEnv->GetValue("GE.BatchSystem.DefaultJobMemory", "3G");
    // Nothing has been explicitly requested yet: both "already set" flags start false.
29  fTimeSet = fMemSet = kFALSE;
30  // Default number of runs per job in multi-jobs mode (1 unless GE.BatchSystem.RunsPerJob is defined)
31  SetRunsPerJob(gEnv->GetValue("GE.BatchSystem.RunsPerJob", 1));
32 }
33 
34 
35 
36 
39 
41 {
42  // Clear previously set parameters in order to create a new job submission command:
    // the time/memory "already set" flags are reset and multi-jobs mode is switched back on.
    // NOTE(review): listing line 43 is absent from this extract (presumably a call to the
    // base-class Clear) - confirm against the original file.
44  fTimeSet = fMemSet = kFALSE;
45  fMultiJobs = kTRUE;
46 }
47 
48 
49 
50 
54 
56 {
57  // Set the time limit requested for the batch job and record that it has been set.
58  // An empty 'time' string (e.g. SetJobTime() with no argument) selects the default, fDefJobTime.
59  KVString tmp(time);
60  if (tmp == "") tmp = fDefJobTime;
61  // The stored value is always of the form "hh:mm:ss": shorter inputs are left-padded.
    // Presumably GetNValues(":") counts the ':'-separated fields - TODO confirm.
    //   2 fields ("mm:ss") -> "00:mm:ss"
62  if (tmp.GetNValues(":") == 2) tmp.Prepend("00:");
    //   1 field  ("ss")    -> "00:00:ss"
63  else if (tmp.GetNValues(":") == 1) tmp.Prepend("00:00:");
    // Stored under the key "--time " (note the trailing space), the same key read by GetJobTime().
64  fParList.SetValue("--time ", tmp);
65  fTimeSet = kTRUE;
66 }
67 
68 
69 
74 
76 {
77  // Set the maximum memory requested for the job and record that it has been set.
78  // Include units in the string, i.e. "100M", "1G" etc.; the value is stored unchanged.
79  // If mem="", the default value fDefJobMem is used.
80  KVString tmp(mem);
81  if (tmp == "") tmp = fDefJobMem;
    // Stored under the key "--mem " (note the trailing space), the same key read by GetJobMemory().
82  fParList.SetValue("--mem ", tmp);
83  fMemSet = kTRUE;
84 }
85 
86 
87 
90 
92 {
93  // Print list of owner's jobs.
    // The list returned by GetListOfJobs() is deleted here once it has been printed.
    // NOTE(review): the pointer is used without a null check - confirm GetListOfJobs()
    // never returns nullptr.
94  KVList* j = GetListOfJobs();
95  j->ls();
96  delete j;
97 }
98 
99 
100 
103 
105 {
106  // Checks the job and interactively asks for any parameter not yet set:
     // job time (if fTimeSet is false) and job memory (if fMemSet is false).
     // The visible code always returns kTRUE.
107 
     // NOTE(review): listing line 108 is absent from this extract (presumably a call to the
     // base-class CheckJobParameters) - confirm against the original file.
109 
110  if (!fTimeSet) ChooseJobTime();
111 
112  if (!fMemSet) ChooseJobMemory();
113 
114  return kTRUE;
115 }
116 
117 
118 
120 
122 {
     // Prompt on stdout for the maximum job time, showing the default in brackets,
     // then read one line from stdin and pass it to SetJobTime().
123  KVString tmp = "";
124  cout << "Enter max CPU time per job (ss/mn:ss/hh:mn:ss) ["
125  << fDefJobTime << "] : ";
126  cout.flush();
127  tmp.ReadToDelim(cin);
     // Empty answer: SetJobTime() with no argument applies the default time.
128  if (!tmp.Length()) {
129  SetJobTime();
130  return;
131  }
132  else
133  SetJobTime(tmp);
134 }
135 
136 
137 
139 
141 {
     // Prompt on stdout for the maximum job memory, showing the default in brackets,
     // then read one line from stdin and pass it to SetJobMemory().
     // An empty answer is passed through as ""; SetJobMemory() then applies the default.
142  KVString tmp = "";
143  cout << "Enter max memory per job (xKB/xMB/xGB) ["
144  << fDefJobMem.Data() << "] : ";
145  cout.flush();
146  tmp.ReadToDelim(cin);
147  SetJobMemory(tmp.Data());
148 }
149 
150 
151 
154 
156 {
157 // Returns the string stored in fParList under the key "--time " (the value written by
    // SetJobTime(), in "hh:mm:ss" form).
158  return fParList.GetStringValue("--time ");
159 }
160 
161 
162 
165 
167 {
168 // Returns the string stored in fParList under the key "--mem " (the value written by
    // SetJobMemory(), including its unit suffix).
169  return fParList.GetStringValue("--mem ");
170 }
171 
172 
173 
174 
178 
180 {
181  // Store any useful information on the batch system in the TEnv 'env'
182  // (this method is used by KVDataAnalyser::WriteBatchEnvFile).
     // Keys written here: BatchSystem.MultiJobs, BatchSystem.CurrentRunList (multi-jobs mode
     // only), BatchSystem.Time, BatchSystem.Memory, SimDirAnalyser.CopyFilesToWorkingDirectory.
     // NOTE(review): listing line 183 is absent from this extract (presumably a call to the
     // base-class WriteBatchEnvFile) - confirm against the original file.
184  env->SetValue("BatchSystem.MultiJobs", MultiJobsMode());
185  if (MultiJobsMode()) env->SetValue("BatchSystem.CurrentRunList", fCurrJobRunList.AsString());
186  env->SetValue("BatchSystem.Time", GetJobTime());
187  env->SetValue("BatchSystem.Memory", GetJobMemory());
188  // if analysis of simulated data is being used, we copy the files to analyse to the
189  // scratch disk of the batch job (make sure enough disk space is requested)
190  env->SetValue("SimDirAnalyser.CopyFilesToWorkingDirectory", true);
191 }
192 
193 
194 
195 
199 
201 {
202  // Read any useful information on the batch system from the TEnv 'env'
203  // (this method is used by KVDataAnalyser::ReadBatchEnvFile).
     // Multi-jobs mode defaults to kFALSE when the key is not defined. Empty time/memory
     // values make SetJobTime()/SetJobMemory() fall back to their defaults.
     // NOTE(review): listing line 204 is absent from this extract (presumably a call to the
     // base-class ReadBatchEnvFile) - confirm against the original file.
205  SetMultiJobsMode(env->GetValue("BatchSystem.MultiJobs", kFALSE));
206  if (MultiJobsMode()) fCurrJobRunList.SetList(env->GetValue("BatchSystem.CurrentRunList", ""));
207  SetJobTime(env->GetValue("BatchSystem.Time", ""));
208  SetJobMemory(env->GetValue("BatchSystem.Memory", ""));
209 }
210 
211 
212 
213 
217 
219 {
220  // If option="log", print infos for the batch log file (the visible code prints the
     // requested memory as "* MEM_REQ: <mem> *").
221  // If option="all", print detailed info on the batch system; in the visible code any
     // option other than "log" takes the 'else' branch.
     // NOTE(review): listing lines 223 and 227 are absent from this extract, so the first
     // statement of the "log" branch and the statement governed by 'else' are not visible
     // here - confirm against the original file.
222  if (!strcmp(option, "log")) {
224  cout << "* MEM_REQ: " << GetJobMemory() << " *" << endl;
225  }
226  else
228 }
229 
230 
231 
232 
248 
250 {
251  // PRIVATE method called by SubmitTask() at moment of job submission.
252  // Depending on the current environment, the default job submission options
253  // may be changed by this method.
254  //
255  // This method overrides and augments KVBatchSystem::ChangeDefJobOpt (which
256  // changes the options as a function of the type of analysis task).
257  // Here we add the CCIN2P3-specific case where the job is launched from a directory
258  // on the /sps/ semi-permanent storage facility, or if the data being analysed is
259  // stored in a repository on /sps/.
260  //
261  // Then we must declare the resource 'sps'.
262  // As the 'sbatch' command does not allow multiple '-L' resource declarations,
263  // the whole thing has to be done here. 'xrootd' resource is declared by
264  // default, 'sps' is added if needed.
265 
     // NOTE(review): listing lines 266 and 268 are absent from this extract. 'rootdir' used
     // below is not declared in the visible lines (presumably the root directory of the data
     // to analyse, obtained from 'da') - confirm against the original file.
     // 'taskname' is not used by any visible statement.
267  KVString taskname = da->GetAnalysisTask()->GetName();
     // The data counts as being on /sps/ only if its root directory *starts with* "/sps/" ...
269  Bool_t repIsSPS = rootdir.BeginsWith("/sps/");
270 
271  KVString wrkdir(gSystem->WorkingDirectory());
272  KVString oldoptions(GetDefaultJobOptions());
273 
274  KVString ressource_list(" -L xrootd");
     // ... whereas the working directory only needs to *contain* "/sps/" anywhere in its path.
275  Bool_t NeedToAddSPS = wrkdir.Contains("/sps/");
276  if ((NeedToAddSPS || repIsSPS)) {
277  ressource_list += ",sps";
278  }
     // A single " -L xrootd[,sps]" declaration is appended to the current default options.
279  oldoptions += ressource_list;
280  SetDefaultJobOptions(oldoptions.Data());
281 }
282 
283 
284 
285 
292 
294 {
295  // Batch-system dependent sanitization of jobnames.
296  // Grid Engine does not allow:
297  // :
298  // Any such character appearing in the current jobname (fCurrJobName) will be replaced
299  // with '_'.
     // NOTE(review): the text above refers to Grid Engine although this file is
     // KV_CCIN2P3_Slurm.cpp - confirm whether the ':' restriction is also needed here.
300 
301  fCurrJobName.ReplaceAll(":", "_");
302 }
303 
304 
305 
310 
312 {
313  // Processes the job requests for the batch system.
314  // In normal mode, this submits one job for the data analyser fAnalyser.
315  // In multi-jobs mode, this submits one job for every GetRunsPerJob() runs (or files, for
     // simulated data) in the list associated to fAnalyser; the last job may hold fewer.
     // In multi-jobs mode nothing is submitted if fAnalyser is neither a KVDataSetAnalyser
     // nor a KVSimDirAnalyser.
316 
     // As visible in this file, CheckJobParameters() may prompt on stdin for time/memory.
317  if (!CheckJobParameters()) return;
318 
319  if (MultiJobsMode()) {
320  if (fAnalyser->InheritsFrom("KVDataSetAnalyser")) {
321  // submit jobs for every GetRunsPerJob() runs in runlist
322  KVDataSetAnalyser* ana = dynamic_cast<KVDataSetAnalyser*>(fAnalyser);
     // 'runs' keeps the complete run list so that it can be restored on the analyser afterwards.
323  auto runs = ana->GetRunList();
324  Int_t remaining_runs = runs.GetNValues();
325  fCurrJobRunList.Clear();
     // 'submitted' is true only if the most recently added run completed a full batch.
326  bool submitted = false;
327  for (auto& run : runs) {
328  if (!remaining_runs) break;
329  remaining_runs--;
330  fCurrJobRunList.Add(run);
331  submitted = false;
332  if ((fCurrJobRunList.GetNValues() == GetRunsPerJob())) {
333  // a full batch of GetRunsPerJob() runs has been collected: submit one job for it
     // (kFALSE is passed as the second argument of SetRuns)
334  ana->SetRuns(fCurrJobRunList, kFALSE);
335  ana->SetFullRunList(runs);
336  SubmitJob();
337  fCurrJobRunList.Clear();
338  submitted = true;
339  }
340  }
341  if (!submitted && !fCurrJobRunList.IsEmpty()) {
342  // end of runlist 'runs' reached with an incomplete batch: submit a job for the
     // remaining (fewer than GetRunsPerJob()) runs
343  ana->SetRuns(fCurrJobRunList, kFALSE);
344  ana->SetFullRunList(runs);
345  SubmitJob();
346  fCurrJobRunList.Clear();
347  }
     // restore the complete run list on the analyser
348  ana->SetRuns(runs, kFALSE);
349  }
350  else if (fAnalyser->InheritsFrom("KVSimDirAnalyser")) {
351  // here we understand "run" to mean "file"
352  KVSimDirAnalyser* ana = dynamic_cast<KVSimDirAnalyser*>(fAnalyser);
353  TList* file_list = ana->GetFileList();
354  Int_t remaining_runs = ana->GetNumberOfFilesToAnalyse();
355  fCurrJobRunList.Clear();
     // files belonging to the job currently being assembled
356  TList cur_file_list;
357  TObject* of;
358  TIter it(file_list);
     // files are identified in fCurrJobRunList by their 1-based position in the list
359  Int_t file_no = 1;
360  while ((of = it())) {
361  cur_file_list.Add(of);
362  fCurrJobRunList.Add(run_index_t(file_no, std::nullopt));
363  remaining_runs--;
364  file_no++;
365  if ((fCurrJobRunList.GetNValues() == GetRunsPerJob()) || (remaining_runs == 0)) {
366  // submit job for GetRunsPerJob() files (or less if we have reached end of list)
367  ana->SetFileList(&cur_file_list);
368  SubmitJob();
369  fCurrJobRunList.Clear();
370  cur_file_list.Clear();
371  }
372  }
     // restore the complete file list on the analyser
373  ana->SetFileList(file_list);
374  }
375  }
376  else {
     // normal mode: a single job for the whole analysis
377  SubmitJob();
378  }
379 
380 }
381 
382 
383 
396 
398 {
399  // Fill the list 'nl' with all relevant parameters for batch system,
400  // set to their default values.
401  //
402  // Parameters set by the visible code are:
403  // JobTime [string]        (fDefJobTime)
404  // JobMemory [string]      (fDefJobMem)
405  // MultiJobsMode [bool]    (current MultiJobsMode())
406  // RunsPerJob [int]        (fRunsPerJob)
     // The original comment also lists the following, which no visible statement sets
     // (presumably added by the base-class method - TODO confirm):
407  // EMailOnStart [bool]
408  // EMailOnEnd [bool]
409  // EMailAddress [string]
410 
     // NOTE(review): listing line 411 is absent from this extract (presumably a call to
     // KVBatchSystem::GetBatchSystemParameterList) - confirm against the original file.
412  nl.SetValue("JobTime", fDefJobTime);
413  nl.SetValue("JobMemory", fDefJobMem);
414  nl.SetValue("MultiJobsMode", MultiJobsMode());
415  nl.SetValue("RunsPerJob", fRunsPerJob);
416 }
417 
418 
419 
422 
424 {
425  // Use the parameters in the list 'nl' to set all relevant parameters for batch system:
     // reads "JobTime", "JobMemory", "MultiJobsMode" and "RunsPerJob" and forwards each
     // to the corresponding setter.
426 
     // NOTE(review): listing line 427 is absent from this extract (presumably a call to
     // KVBatchSystem::SetBatchSystemParameters) - confirm against the original file.
428  SetJobTime(nl.GetStringValue("JobTime"));
429  SetJobMemory(nl.GetStringValue("JobMemory"));
430  SetMultiJobsMode(nl.GetBoolValue("MultiJobsMode"));
431  SetRunsPerJob(nl.GetIntValue("RunsPerJob"));
432 }
433 
434 
int Int_t
bool Bool_t
char Char_t
constexpr Bool_t kFALSE
constexpr Bool_t kTRUE
const char Option_t
R__EXTERN TEnv * gEnv
const char rootdir[]
Option_t Option_t option
R__EXTERN TSystem * gSystem
Base class for interface to a batch job management system.
Definition: KVBatchSystem.h:78
virtual void WriteBatchEnvFile(TEnv *)
void Clear(Option_t *opt="") override
virtual void ChangeDefJobOpt(KVDataAnalyser *da)
virtual void ReadBatchEnvFile(TEnv *)
virtual void SetBatchSystemParameters(const KVNameValueList &)
Use the parameters in the list to set all relevant parameters for batch system.
void Print(Option_t *="") const override
virtual void GetBatchSystemParameterList(KVNameValueList &)
virtual Bool_t CheckJobParameters()
Checks the job and asks for the job name if needed.
Manager class which sets up and runs data analysis tasks.
virtual KVString GetRootDirectoryOfDataToAnalyse() const
KVDataAnalysisTask * GetAnalysisTask() const
Pilots user analysis of experimental data.
void SetRuns(const run_index_list &nl, Bool_t check=kTRUE)
const run_index_list & GetRunList() const
void SetFullRunList(const run_index_list &nl)
Extended TList class which owns its objects by default.
Definition: KVList.h:28
Handles lists of named parameters with different types, a list of KVNamedParameter objects.
Int_t GetIntValue(const Char_t *name) const
void SetValue(const Char_t *name, value_type value)
Bool_t GetBoolValue(const Char_t *name) const
const Char_t * GetStringValue(const Char_t *name) const
Class piloting analyses of simulated data.
void SetFileList(TList *l) override
Int_t GetNumberOfFilesToAnalyse() const override
TList * GetFileList() const
Extension of ROOT TString class which allows backwards compatibility with ROOT v3....
Definition: KVString.h:73
Int_t GetNValues(TString delim) const
Definition: KVString.cpp:886
Interface to CCIN2P3 Grid Engine batch job management system.
void ChangeDefJobOpt(KVDataAnalyser *) override
void SetBatchSystemParameters(const KVNameValueList &) override
Use the parameters in the list to set all relevant parameters for batch system.
void GetBatchSystemParameterList(KVNameValueList &) override
const Char_t * GetJobTime(void) const
returns the parameter string corresponding to the job CPU time
void SetJobMemory(const Char_t *h="")
void Print(Option_t *="") const override
Bool_t CheckJobParameters() override
Checks the job and asks for any missing parameters.
void ReadBatchEnvFile(TEnv *) override
const Char_t * GetJobMemory(void) const
returns the parameter string corresponding to the job Memory
void ChooseJobMemory(void)
void Clear(Option_t *opt="") override
Clear previously set parameters in order to create a new job submission command.
void SetJobTime(const Char_t *h="")
void SanitizeJobName() const override
void WriteBatchEnvFile(TEnv *) override
void Run() override
void PrintJobs(Option_t *opt="") override
Print list of owner's jobs.
void ls(Option_t *option="") const override
virtual const char * GetValue(const char *name, const char *dflt) const
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=nullptr)
void Clear(Option_t *option="") override
void Add(TObject *obj) override
const char * GetName() const override
Ssiz_t Length() const
std::istream & ReadToDelim(std::istream &str, char delim='\n')
const char * Data() const
TString & Prepend(char c, Ssiz_t rep=1)
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
virtual const char * WorkingDirectory()
Specifies a runfile according to run number and file index.
Definition: run_index.h:33
UInt_t GetListOfJobs(TFile *file, TList &jobdirs)
ClassImp(TPyArg)