KaliVeda
Toolkit for HIC analysis
KV_CCIN2P3_GE.cpp
1 //Created by KVClassFactory on Wed Apr 27 15:43:08 CEST 2011
2 //Author: John Frankland
3 
4 #include "KV_CCIN2P3_GE.h"
5 #include "TObjString.h"
6 #include "TSystem.h"
7 #include "TEnv.h"
8 #include "KVDataAnalyser.h"
9 #include "KVDataAnalysisTask.h"
10 #include "KVGEBatchJob.h"
11 #include "KVDataRepository.h"
12 #include "KVDataSetAnalyser.h"
13 #include "KVSimDirAnalyser.h"
14 
15 using namespace std;
16 
18 
19 
20 
25  : KVBatchSystem(name), fMultiJobs(kTRUE)
26 {
27  // Constructor: 'name' is forwarded to KVBatchSystem and multi-jobs mode is switched on.
28  // Default job time, memory and disk space are read from gEnv
    // (originally documented as being defined in $KVROOT/KVFiles/.kvrootrc);
    // the literal given to each GetValue() call is the fallback used when the key is not set.
    // NOTE(review): the constructor's signature line is absent from this listing
    // (embedded numbering jumps from 20 to 25) - confirm against the original source.
29 
30  fDefJobTime = gEnv->GetValue("GE.BatchSystem.DefaultJobTime", "5:00");
31  fDefJobMem = gEnv->GetValue("GE.BatchSystem.DefaultJobMemory", "2G");
32  fDefJobDisk = gEnv->GetValue("GE.BatchSystem.DefaultJobDisk", "50M");
    // no resource has been chosen explicitly yet: CheckJobParameters() will ask for each of them
33  fTimeSet = fDiskSet = fMemSet = kFALSE;
34  // number of runs per job in multi-jobs mode, read from gEnv (fallback = 1)
35  SetRunsPerJob(gEnv->GetValue("GE.BatchSystem.RunsPerJob", 1));
36 }
37 
38 
39 
40 
43 
45 {
46  // Clear previously set parameters in order to create a new job submission command.
    // NOTE(review): embedded listing number 47 is absent here, so a statement may be
    // missing from this listing - confirm against the original source.
    // Mark time, disk and memory as "not set" so that CheckJobParameters() asks for them again.
48  fTimeSet = fDiskSet = fMemSet = kFALSE;
    // go back to multi-jobs mode, as set by the constructor
49  fMultiJobs = kTRUE;
50 }
51 
52 
53 
57 
59 {
60  //Set CPU time for batch job.
61  // SetJobTime() => use default time
62  KVString tmp(time);
63  if (tmp == "") tmp = fDefJobTime;
64  //time given as either "hh:mm:ss" or "ss" but NOT "mm:ss"!
65  if (tmp.GetNValues(":") == 2) tmp.Prepend("0:");
66  fParList.SetValue("-l ct=", tmp);
67  fTimeSet = kTRUE;
68 }
69 
70 
71 
76 
78 {
79  //Set maximum memory used by job.
80  //Include units in string, i.e. "100M", "1G" etc.
81  //If mem="", use default value
82  KVString tmp(mem);
83  if (tmp == "") tmp = fDefJobMem;
84  fParList.SetValue("-l vmem=", tmp);
85  fMemSet = kTRUE;
86 }
87 
88 
89 
94 
96 {
97  //Set maximum disk space used by job.
98  //Include units in string, i.e. "100M", "1G" etc.
99  //If diks="", use default value
100  KVString tmp(diks);
101  if (tmp == "") tmp = fDefJobDisk;
102  fParList.SetValue("-l fsize=", tmp);
103  fDiskSet = kTRUE;
104 }
105 
106 
107 
110 
112 {
113  //Print list of owner's jobs.
114  KVList* j = GetListOfJobs();
115  j->ls();
116  delete j;
117 }
118 
119 
120 
123 
125 {
126  // Checks the job and asks (interactively) for any resource which has not been set yet:
     // CPU time, scratch disk and memory, in that order.
     // The visible code always returns kTRUE.
127
     // NOTE(review): embedded listing number 128 is absent here, so a statement may be
     // missing from this listing - confirm against the original source.
129
130  if (!fTimeSet) ChooseJobTime();
131
132  if (!fDiskSet) ChooseJobDisk();
133
134  if (!fMemSet) ChooseJobMemory();
135
136  return kTRUE;
137 }
138 
139 
140 
142 
144 {
145  KVString tmp = "";
146  cout << "Enter max CPU time per job (ss/mn:ss/hh:mn:ss) ["
147  << fDefJobTime << "] : ";
148  cout.flush();
149  tmp.ReadToDelim(cin);
150  if (!tmp.Length()) {
151  SetJobTime();
152  return;
153  }
154  else
155  SetJobTime(tmp);
156 }
157 
158 
159 
161 
163 {
164  KVString tmp = "";
165  cout << "Enter max memory per job (xKB/xMB/xGB) ["
166  << fDefJobMem.Data() << "] : ";
167  cout.flush();
168  tmp.ReadToDelim(cin);
169  SetJobMemory(tmp.Data());
170 }
171 
172 
173 
175 
177 {
178  KVString tmp = "";
179  cout << "Enter max scratch disk per job (xKB/xMB/xGB) ["
180  << fDefJobDisk.Data() << "] : ";
181  cout.flush();
182  tmp.ReadToDelim(cin);
183  SetJobDisk(tmp.Data());
184 }
185 
186 
187 
190 
192 {
193 // returns the parameter string corresponding to the job CPU time
194  return fParList.GetStringValue("-l ct=");
195 }
196 
197 
198 
201 
203 {
204 // returns the parameter string corresponding to the job Memory
205  return fParList.GetStringValue("-l vmem=");
206 }
207 
208 
209 
212 
214 {
215 // returns the parameter string corresponding to the job Disk
216  return fParList.GetStringValue("-l fsize=");
217 }
218 
219 
220 
221 
225 
227 {
228  // Store any useful information on the batch system in the TEnv
229  // (this method is used by KVDataAnalyser::WriteBatchEnvFile).
     // NOTE(review): embedded listing number 230 is absent here, so a statement may be
     // missing from this listing - confirm against the original source.
231  env->SetValue("BatchSystem.MultiJobs", MultiJobsMode());
     // the run list of the current job is only written in multi-jobs mode
232  if (MultiJobsMode()) env->SetValue("BatchSystem.CurrentRunList", fCurrJobRunList.AsString());
     // requested resources, as stored in the parameter list
233  env->SetValue("BatchSystem.Time", GetJobTime());
234  env->SetValue("BatchSystem.Memory", GetJobMemory());
235  env->SetValue("BatchSystem.Disk", GetJobDisk());
236  // if analysis of simulated data is being used, we copy the files to analyse to the
237  // scratch disk of the batch job (make sure enough disk space is requested)
238  env->SetValue("SimDirAnalyser.CopyFilesToWorkingDirectory", true);
239 }
240 
241 
242 
243 
247 
249 {
250  // Read any useful information on the batch system from the TEnv
251  // (this method is used by KVDataAnalyser::ReadBatchEnvFile).
     // NOTE(review): embedded listing number 252 is absent here, so a statement may be
     // missing from this listing - confirm against the original source.
     // multi-jobs mode is taken to be off when the key is not present
253  SetMultiJobsMode(env->GetValue("BatchSystem.MultiJobs", kFALSE));
     // the run list of the current job is only read back in multi-jobs mode
254  if (MultiJobsMode()) fCurrJobRunList.SetList(env->GetValue("BatchSystem.CurrentRunList", ""));
     // a missing key gives "", for which the setters below fall back on the default values
255  SetJobTime(env->GetValue("BatchSystem.Time", ""));
256  SetJobMemory(env->GetValue("BatchSystem.Memory", ""));
257  SetJobDisk(env->GetValue("BatchSystem.Disk", ""));
258 }
259 
260 
261 
262 
266 
// Print information on the batch system, depending on 'option' (see below).
267 void KV_CCIN2P3_GE::Print(Option_t* option) const
268 {
269  //if option="log", print infos for batch log file
270  //if option="all", print detailed info on batch system
271  if (!strcmp(option, "log")) {
     // NOTE(review): embedded listing number 272 is absent here, so a statement may be
     // missing from this listing - confirm against the original source.
     // requested disk space and memory, as stored in the parameter list
273  cout << "* DISK_REQ: " << GetJobDisk() << " *" << endl;
274  cout << "* MEM_REQ: " << GetJobMemory() << " *" << endl;
275  }
276  else
     // NOTE(review): embedded listing number 277 is absent: the statement belonging to this
     // 'else' is missing from the listing - restore it from the original source.
278 }
279 
280 
281 
282 
296 
298 {
299  // PRIVATE method called by SubmitTask() at moment of job submission.
300  // Depending on the current environment, the default job submission options
301  // may be changed by this method.
302  //
303  // This method overrides and augments KVBatchSystem::ChangeDefJobOpt (which
304  // changes the options as a function of the type of analysis task).
305  // Here we add the CCIN2P3-specific case where the job is launched from a directory
306  // on the /sps/ semi-permanent storage facility, or if the data being analysed is
307  // stored in a repository on /sps/. In this case the code below appends
308  // the option '-l sps=1' to the default job options (unless they already
309  // contain "sps").
310  //
     // NOTE(review): embedded listing numbers 311 and 313 are absent, so statements are
     // missing from this listing; in particular no declaration of 'rootdir' is visible
     // here - confirm against the original source.
     // NOTE(review): 'taskname' is not used anywhere in the visible code.
312  KVString taskname = da->GetAnalysisTask()->GetName();
314  Bool_t repIsSPS = rootdir.BeginsWith("/sps/");
315
316  KVString wrkdir(gSystem->WorkingDirectory());
317  KVString oldoptions(GetDefaultJobOptions());
318
     // nothing to do if the default options already mention "sps"
319  if (!oldoptions.Contains("sps")) {
     // job launched from a working directory somewhere under /sps/ ?
320  Bool_t NeedToAddSPS = wrkdir.Contains("/sps/");
321  if ((NeedToAddSPS || repIsSPS)) {
322  oldoptions += " -l sps=1";
323  SetDefaultJobOptions(oldoptions.Data());
324  Info("ChangeDefJobOpt",
325  "Your job is being launched from /sps/... zone.\nTherefore the ressource 'sps' has been declared and the number of jobs which can be treated concurrently will be limited.");
326  }
327  }
328 }
329 
330 
331 
332 
339 
341 {
342  // Batch-system dependent sanitization of jobnames
343  // Grid Engine does not allow:
344  // :
345  // Any such character appearing in the current jobname will be replaced
346  // with '_'
347 
348  fCurrJobName.ReplaceAll(":", "_");
349 }
350 
351 
352 
356 
358 {
359  // Create and fill list with KVBatchJob objects describing current jobs
360  // Delete list after use
361 
362  KVList* list_of_jobs = new KVList;
363 
364  // use qstat -r to get list of job ids and jobnames
365  TString reply = gSystem->GetFromPipe("qstat -r");
366 
367  TObjArray* lines = reply.Tokenize("\n");
368  Int_t nlines = lines->GetEntries();
369  for (Int_t line_number = 0; line_number < nlines; line_number++) {
370  TString thisLine = ((TObjString*)(*lines)[line_number])->String();
371  if (thisLine.Contains("Full jobname:")) {
372  // previous line contains job-id and status
373  TString lastLine = ((TObjString*)(*lines)[line_number - 1])->String();
374  TObjArray* bits = lastLine.Tokenize(" ");
375  Int_t jobid = ((TObjString*)(*bits)[0])->String().Atoi();
376  TString status = ((TObjString*)(*bits)[4])->String();
377  // date & time jobs started (running job) or submitted (queued job)
378  TString sdate = ((TObjString*)(*bits)[5])->String();// mm/dd/yyyy
379  TString stime = ((TObjString*)(*bits)[6])->String();// hh:mm:ss
380  Int_t dd, MM, yyyy, hh, mm, ss;
381  sscanf(sdate.Data(), "%d/%d/%d", &MM, &dd, &yyyy);
382  sscanf(stime.Data(), "%d:%d:%d", &hh, &mm, &ss);
383  KVDatime submitted(yyyy, MM, dd, hh, mm, ss);
384  delete bits;
385  bits = thisLine.Tokenize(": ");
386  TString jobname = ((TObjString*)(*bits)[2])->String();
387  delete bits;
388 
389  KVGEBatchJob* job = new KVGEBatchJob();
390  job->SetName(jobname);
391  job->SetJobID(jobid);
392  job->SetStatus(status);
393  job->SetSubmitted(submitted);
394  list_of_jobs->Add(job);
395  }
396  }
397  delete lines;
398 
399  if (!list_of_jobs->GetEntries()) return list_of_jobs;
400 
401  // use qstat -j [jobid] to get cpu and memory used and also the resource requests
402  TIter next_job(list_of_jobs);
403  KVGEBatchJob* job;
404  while ((job = (KVGEBatchJob*)next_job())) {
405 
406  // for running jobs, read in from [jobname].status file
407  // the number of events read/to read, disk used
408  if (!strcmp(job->GetStatus(), "r")) job->UpdateDiskUsedEventsRead();
409 
410  reply = gSystem->GetFromPipe(Form("qstat -j %d", job->GetJobID()));
411  lines = reply.Tokenize("\n");
412  nlines = lines->GetEntries();
413  for (Int_t line_number = 0; line_number < nlines; line_number++) {
414  TString thisLine = ((TObjString*)(*lines)[line_number])->String();
415  if (thisLine.BeginsWith("usage")) {
416  TObjArray* bits = thisLine.Tokenize("=,");
417  TString stime = ((TObjString*)(*bits)[1])->String();// hh:mm:ss or d:hh:mm:ss
418  Int_t dd, hh, mm, ss;
419  TObjArray* tmp = stime.Tokenize(":");
420  dd = 0;
421  if (tmp->GetEntries() == 4) sscanf(stime.Data(), "%d:%2d:%2d:%2d", &dd, &hh, &mm, &ss);
422  else sscanf(stime.Data(), "%2d:%2d:%2d", &hh, &mm, &ss);
423  delete tmp;
424  job->SetCPUusage((dd * 24 + hh) * 3600 + mm * 60 + ss);
425  TString smem = ((TObjString*)(*bits)[7])->String();// xxx.xxxxM
426  job->SetMemUsed(smem);
427  delete bits;
428  }
429  else if (thisLine.BeginsWith("hard resource_list:")) {
430  TObjArray* bits = thisLine.Tokenize(": ");
431  TString res = ((TObjString*)(*bits)[2])->String();//os=sl5,xrootd=1,irods=1,s_vmem=1024M,s_fsize=50M,s_cpu=36000
432  res.ReplaceAll("s_vmem", "vmem");
433  res.ReplaceAll("s_fsize", "fsize");
434  res.ReplaceAll("s_cpu", "ct");
435  job->SetResources(res);
436  TObjArray* bbits = res.Tokenize(",");
437  TIter next_res(bbits);
438  TObjString* ss;
439  while ((ss = (TObjString*)next_res())) {
440  TString g = ss->String();
441  if (g.BeginsWith("ct=")) {
442  g.Remove(0, 3);
443  job->SetCPUmax(g.Atoi());
444  }
445  else if (g.BeginsWith("vmem=")) {
446  g.Remove(0, 5);
447  job->SetMemMax(g);
448  }
449  else if (g.BeginsWith("fsize=")) {
450  g.Remove(0, 6);
451  job->SetDiskMax(g);
452  }
453  }
454  delete bits;
455  delete bbits;
456  }
457  }
458  delete lines;
459  //}
460  }
461 
462  return list_of_jobs;
463 }
464 
465 
466 
469 
471 {
472  // add option to send mail when job starts
473  fParList.SetValue("-m b", "");
474 }
475 
476 
477 
480 
482 {
483  // add option to send mail when job ends
484  fParList.SetValue("-m e", "");
485 }
486 
487 
488 
491 
492 void KV_CCIN2P3_GE::SetSendMailAddress(const char* email)
493 {
494  // set email address for notifications
495  fParList.SetValue("-M ", email);
496 }
497 
498 
499 
504 
506 {
507  // Processes the job requests for the batch system.
508  // In normal mode, this submits one job for the data analyser fAnalyser.
509  // In multijobs mode, this submits one job for every GetRunsPerJob() runs (or files, for
     // a KVSimDirAnalyser) in the list associated to fAnalyser.
     // Nothing is submitted if CheckJobParameters() fails.
     // NOTE(review): in multijobs mode, an analyser which is neither a KVDataSetAnalyser nor a
     // KVSimDirAnalyser leads to no job being submitted at all - confirm this is intended.
510
511  if (!CheckJobParameters()) return;
512
513  if (MultiJobsMode()) {
514  if (fAnalyser->InheritsFrom("KVDataSetAnalyser")) {
515  //submit jobs for every GetRunsPerJob() runs in runlist
516  KVDataSetAnalyser* ana = dynamic_cast<KVDataSetAnalyser*>(fAnalyser);
     // local copy of the full run list: the analyser's own list is overwritten for each job
517  KVNumberList runs = ana->GetRunList();
518  runs.Begin();
519  Int_t remaining_runs = runs.GetNValues();
520  fCurrJobRunList.Clear();
521  while (remaining_runs && !runs.End()) {
522  Int_t run = runs.Next();
523  remaining_runs--;
524  fCurrJobRunList.Add(run);
525  if ((fCurrJobRunList.GetNValues() == GetRunsPerJob()) || runs.End()) {
526  // submit job for GetRunsPerJob() runs (or less if we have reached end of runlist 'runs')
527  ana->SetRuns(fCurrJobRunList, kFALSE);
528  ana->SetFullRunList(runs);
529  SubmitJob();
530  fCurrJobRunList.Clear();
531  }
532  }
     // give the complete run list back to the analyser once all jobs are submitted
533  ana->SetRuns(runs, kFALSE);
534  }
535  else if (fAnalyser->InheritsFrom("KVSimDirAnalyser")) {
536  // here we understand "run" to mean "file"
537  KVSimDirAnalyser* ana = dynamic_cast<KVSimDirAnalyser*>(fAnalyser);
538  TList* file_list = ana->GetFileList();
539  Int_t remaining_runs = ana->GetNumberOfFilesToAnalyse();
540  fCurrJobRunList.Clear();
     // files of the job being prepared; fCurrJobRunList holds their 1-based positions in file_list
541  TList cur_file_list;
542  TObject* of;
543  TIter it(file_list);
544  Int_t file_no = 1;
545  while ((of = it())) {
546  cur_file_list.Add(of);
547  fCurrJobRunList.Add(file_no);
548  remaining_runs--;
549  file_no++;
550  if ((fCurrJobRunList.GetNValues() == GetRunsPerJob()) || (remaining_runs == 0)) {
551  // submit job for GetRunsPerJob() files (or less if we have reached end of list)
552  ana->SetFileList(&cur_file_list);
553  SubmitJob();
554  fCurrJobRunList.Clear();
     // presumably only empties the local list without deleting the files' objects - TODO confirm
555  cur_file_list.Clear();
556  }
557  }
     // give the complete file list back to the analyser once all jobs are submitted
558  ana->SetFileList(file_list);
559  }
560  }
561  else {
     // normal mode: a single job for the analyser as it is
562  SubmitJob();
563  }
564
565 }
566 
567 
568 
582 
584 {
585  // Fill the list with all relevant parameters for batch system,
586  // set to their default values.
587  //
588  // Parameters defined here are:
589  // JobTime [string]
590  // JobMemory [string]
591  // JobDisk [string]
592  // MultiJobsMode [bool]
593  // RunsPerJob [int]
594  // EMailOnStart [bool]
595  // EMailOnEnd [bool]
596  // EMailAddress [string]
597
     // NOTE(review): embedded listing number 598 is absent here, so a statement may be
     // missing from this listing - confirm against the original source.
     // resource defaults
599  nl.SetValue("JobTime", fDefJobTime);
600  nl.SetValue("JobMemory", fDefJobMem);
601  nl.SetValue("JobDisk", fDefJobDisk);
     // job-splitting parameters are given their current values
602  nl.SetValue("MultiJobsMode", MultiJobsMode());
603  nl.SetValue("RunsPerJob", fRunsPerJob);
     // mail notifications: both switched off and no address
604  nl.SetValue("EMailOnStart", kFALSE);
605  nl.SetValue("EMailOnEnd", kFALSE);
606  nl.SetValue("EMailAddress", "");
607 }
608 
609 
610 
613 
615 {
616  // Use the parameters in the list to set all relevant parameters for batch system.
     // The parameter names read here are the ones written by GetBatchSystemParameterList().
617
     // NOTE(review): embedded listing number 618 is absent here, so a statement may be
     // missing from this listing - confirm against the original source.
619  SetJobTime(nl.GetStringValue("JobTime"));
620  SetJobMemory(nl.GetStringValue("JobMemory"));
621  SetJobDisk(nl.GetStringValue("JobDisk"));
622  SetMultiJobsMode(nl.GetBoolValue("MultiJobsMode"));
623  SetRunsPerJob(nl.GetIntValue("RunsPerJob"));
     // mail options are only taken into account when an address is given
624  if (nl.GetTStringValue("EMailAddress") != "") {
625  if (nl.GetBoolValue("EMailOnStart")) SetSendMailOnJobStart();
626  if (nl.GetBoolValue("EMailOnEnd")) SetSendMailOnJobEnd();
627  SetSendMailAddress(nl.GetStringValue("EMailAddress"));
628  }
629 }
630 
631 
int Int_t
bool Bool_t
char Char_t
constexpr Bool_t kFALSE
constexpr Bool_t kTRUE
const char Option_t
R__EXTERN TEnv * gEnv
const char rootdir[]
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t g
char * Form(const char *fmt,...)
R__EXTERN TSystem * gSystem
void SetCPUmax(Int_t c)
Definition: KVBatchJob.h:72
void SetSubmitted(KVDatime &m)
Definition: KVBatchJob.h:44
void SetJobID(Int_t n)
Definition: KVBatchJob.h:52
virtual void UpdateDiskUsedEventsRead()
Definition: KVBatchJob.cpp:53
const Char_t * GetStatus() const
Definition: KVBatchJob.h:32
void SetMemUsed(const Char_t *m)
Definition: KVBatchJob.h:64
Int_t GetJobID() const
Definition: KVBatchJob.h:48
void SetMemMax(const Char_t *m)
Definition: KVBatchJob.h:80
void SetCPUusage(Int_t m)
Definition: KVBatchJob.h:56
void SetDiskMax(const Char_t *m)
Definition: KVBatchJob.h:88
void SetStatus(const Char_t *s)
Definition: KVBatchJob.h:36
Base class for interface to a batch job management system.
Definition: KVBatchSystem.h:78
virtual void WriteBatchEnvFile(TEnv *)
void Clear(Option_t *opt="") override
virtual void ChangeDefJobOpt(KVDataAnalyser *da)
virtual void ReadBatchEnvFile(TEnv *)
virtual void SetBatchSystemParameters(const KVNameValueList &)
Use the parameters in the list to set all relevant parameters for batch system.
void Print(Option_t *="") const override
virtual void GetBatchSystemParameterList(KVNameValueList &)
virtual Bool_t CheckJobParameters()
Checks the job and ask for the job name if needed.
Manager class which sets up and runs data analysis tasks.
virtual KVString GetRootDirectoryOfDataToAnalyse() const
KVDataAnalysisTask * GetAnalysisTask() const
Pilots user analysis of experimental data.
void SetRuns(const run_index_list &nl, Bool_t check=kTRUE)
const run_index_list & GetRunList() const
void SetFullRunList(const run_index_list &nl)
Extension of TDatime to handle various useful date formats.
Definition: KVDatime.h:33
Job handled by Grid Engine batch system at CC-IN2P3.
Definition: KVGEBatchJob.h:16
void SetResources(TString r)
Definition: KVGEBatchJob.h:25
Extended TList class which owns its objects by default.
Definition: KVList.h:28
Handles lists of named parameters with different types, a list of KVNamedParameter objects.
Int_t GetIntValue(const Char_t *name) const
void SetValue(const Char_t *name, value_type value)
Bool_t GetBoolValue(const Char_t *name) const
const Char_t * GetStringValue(const Char_t *name) const
TString GetTStringValue(const Char_t *name) const
Strings used to represent a set of ranges of values.
Definition: KVNumberList.h:85
Bool_t End(void) const
Definition: KVNumberList.h:199
Int_t GetNValues() const
void Begin(void) const
Int_t Next(void) const
void Add(TObject *obj) override
Class piloting analyses of simulated data.
void SetFileList(TList *l) override
Int_t GetNumberOfFilesToAnalyse() const override
TList * GetFileList() const
Extension of ROOT TString class which allows backwards compatibility with ROOT v3....
Definition: KVString.h:73
Int_t GetNValues(TString delim) const
Definition: KVString.cpp:886
Interface to CCIN2P3 Grid Engine batch job management system.
Definition: KV_CCIN2P3_GE.h:15
void Run() override
void SetSendMailOnJobEnd()
add option to send mail when job ends
const Char_t * GetJobTime(void) const
returns the parameter string corresponding to the job CPU time
void SetJobMemory(const Char_t *h="")
Bool_t CheckJobParameters() override
Checks the job and asks for any missing parameters.
void ChooseJobDisk(void)
void GetBatchSystemParameterList(KVNameValueList &) override
void SetJobDisk(const Char_t *h="")
void PrintJobs(Option_t *opt="") override
Print list of owner's jobs.
KVList * GetListOfJobs() override
void SanitizeJobName() const override
void SetSendMailAddress(const char *)
set email address for notifications
void Clear(Option_t *opt="") override
Clear previously set parameters in order to create a new job submission command.
const Char_t * GetJobDisk(void) const
returns the parameter string corresponding to the job Disk
void ChooseJobMemory(void)
void ChooseJobTime(void)
void ReadBatchEnvFile(TEnv *) override
void SetJobTime(const Char_t *h="")
const Char_t * GetJobMemory(void) const
returns the parameter string corresponding to the job Memory
void Print(Option_t *="") const override
void SetSendMailOnJobStart()
add option to send mail when job starts
void ChangeDefJobOpt(KVDataAnalyser *) override
void SetBatchSystemParameters(const KVNameValueList &) override
Use the parameters in the list to set all relevant parameters for batch system.
void WriteBatchEnvFile(TEnv *) override
void ls(Option_t *option="") const override
virtual Int_t GetEntries() const
virtual const char * GetValue(const char *name, const char *dflt) const
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=nullptr)
void Clear(Option_t *option="") override
void Add(TObject *obj) override
const char * GetName() const override
virtual void SetName(const char *name)
Int_t GetEntries() const override
TString & String()
Ssiz_t Length() const
std::istream & ReadToDelim(std::istream &str, char delim='\n')
const char * Data() const
TObjArray * Tokenize(const TString &delim) const
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
TString & Prepend(char c, Ssiz_t rep=1)
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
TString & ReplaceAll(const char *s1, const char *s2)
virtual TString GetFromPipe(const char *command)
virtual const char * WorkingDirectory()
void Info(const char *location, const char *fmt,...)
UInt_t GetListOfJobs(TFile *file, TList &jobdirs)
const char * String
ClassImp(TPyArg)