KaliVeda
Toolkit for HIC analysis
KV_CCIN2P3_GE.cpp
1 //Created by KVClassFactory on Wed Apr 27 15:43:08 CEST 2011
2 //Author: John Frankland
3 
4 #include "KV_CCIN2P3_GE.h"
5 #include "TObjString.h"
6 #include "TSystem.h"
7 #include "TEnv.h"
8 #include "KVDataAnalyser.h"
9 #include "KVDataAnalysisTask.h"
10 #include "KVGEBatchJob.h"
11 #include "KVDataRepository.h"
12 #include "KVDataSetAnalyser.h"
13 #include "KVSimDirAnalyser.h"
14 
15 using namespace std;
16 
18 
19 
20 
25  : KVBatchSystem(name), fMultiJobs(kTRUE)
26 {
27  //Default constructor
28  //Sets default job time, memory and disk space as defined in $KVROOT/KVFiles/.kvrootrc
29 
30  fDefJobTime = gEnv->GetValue("GE.BatchSystem.DefaultJobTime", "5:00");
31  fDefJobMem = gEnv->GetValue("GE.BatchSystem.DefaultJobMemory", "2G");
32  fDefJobDisk = gEnv->GetValue("GE.BatchSystem.DefaultJobDisk", "50M");
33  fTimeSet = fDiskSet = fMemSet = kFALSE;
34  //default number of runs per job in multi jobs mode (default=1)
35  SetRunsPerJob(gEnv->GetValue("GE.BatchSystem.RunsPerJob", 1));
36 }
37 
38 
39 
40 
43 
45 {
46  //Clear previously set parameters in order to create a new job submission command
48  fTimeSet = fDiskSet = fMemSet = kFALSE;
49  fMultiJobs = kTRUE;
50 }
51 
52 
53 
54 
57 
58 KV_CCIN2P3_GE::~KV_CCIN2P3_GE()
59 {
60  //Destructor
61 }
62 
63 
64 
68 
70 {
71  //Set CPU time for batch job.
72  // SetJobTime() => use default time
73  KVString tmp(time);
74  if (tmp == "") tmp = fDefJobTime;
75  //time given as either "hh:mm:ss" or "ss" but NOT "mm:ss"!
76  if (tmp.GetNValues(":") == 2) tmp.Prepend("0:");
77  fParList.SetValue("-l ct=", tmp);
78  fTimeSet = kTRUE;
79 }
80 
81 
82 
87 
89 {
90  //Set maximum memory used by job.
91  //Include units in string, i.e. "100M", "1G" etc.
92  //If mem="", use default value
93  KVString tmp(mem);
94  if (tmp == "") tmp = fDefJobMem;
95  fParList.SetValue("-l vmem=", tmp);
96  fMemSet = kTRUE;
97 }
98 
99 
100 
105 
107 {
108  //Set maximum disk space used by job.
109  //Include units in string, i.e. "100M", "1G" etc.
110  //If diks="", use default value
111  KVString tmp(diks);
112  if (tmp == "") tmp = fDefJobDisk;
113  fParList.SetValue("-l fsize=", tmp);
114  fDiskSet = kTRUE;
115 }
116 
117 
118 
121 
123 {
124  //Print list of owner's jobs.
125  KVList* j = GetListOfJobs();
126  j->ls();
127  delete j;
128 }
129 
130 
131 
134 
136 {
137  // Checks the job and asks for any missing parameters
138 
140 
141  if (!fTimeSet) ChooseJobTime();
142 
143  if (!fDiskSet) ChooseJobDisk();
144 
145  if (!fMemSet) ChooseJobMemory();
146 
147  return kTRUE;
148 }
149 
150 
151 
153 
155 {
156  KVString tmp = "";
157  cout << "Enter max CPU time per job (ss/mn:ss/hh:mn:ss) ["
158  << fDefJobTime << "] : ";
159  cout.flush();
160  tmp.ReadToDelim(cin);
161  if (!tmp.Length()) {
162  SetJobTime();
163  return;
164  }
165  else
166  SetJobTime(tmp);
167 }
168 
169 
170 
172 
174 {
175  KVString tmp = "";
176  cout << "Enter max memory per job (xKB/xMB/xGB) ["
177  << fDefJobMem.Data() << "] : ";
178  cout.flush();
179  tmp.ReadToDelim(cin);
180  SetJobMemory(tmp.Data());
181 }
182 
183 
184 
186 
188 {
189  KVString tmp = "";
190  cout << "Enter max scratch disk per job (xKB/xMB/xGB) ["
191  << fDefJobDisk.Data() << "] : ";
192  cout.flush();
193  tmp.ReadToDelim(cin);
194  SetJobDisk(tmp.Data());
195 }
196 
197 
198 
201 
203 {
204 // returns the parameter string corresponding to the job CPU time
205  return fParList.GetStringValue("-l ct=");
206 }
207 
208 
209 
212 
214 {
215 // returns the parameter string corresponding to the job Memory
216  return fParList.GetStringValue("-l vmem=");
217 }
218 
219 
220 
223 
225 {
226 // returns the parameter string corresponding to the job Disk
227  return fParList.GetStringValue("-l fsize=");
228 }
229 
230 
231 
232 
236 
238 {
239  //Store any useful information on batch system in the TEnv
240  //(this method is used by KVDataAnalyser::WriteBatchEnvFile)
242  env->SetValue("BatchSystem.MultiJobs", MultiJobsMode());
243  if (MultiJobsMode()) env->SetValue("BatchSystem.CurrentRunList", fCurrJobRunList.AsString());
244  env->SetValue("BatchSystem.Time", GetJobTime());
245  env->SetValue("BatchSystem.Memory", GetJobMemory());
246  env->SetValue("BatchSystem.Disk", GetJobDisk());
247  // if analysis of simulated data is being used, we copy the files to analyse to the
248  // scratch disk of the batch job (make sure enough disk space is requested)
249  env->SetValue("SimDirAnalyser.CopyFilesToWorkingDirectory", true);
250 }
251 
252 
253 
254 
258 
260 {
261  //Read any useful information on batch system from the TEnv
262  //(this method is used by KVDataAnalyser::ReadBatchEnvFile)
264  SetMultiJobsMode(env->GetValue("BatchSystem.MultiJobs", kFALSE));
265  if (MultiJobsMode()) fCurrJobRunList.SetList(env->GetValue("BatchSystem.CurrentRunList", ""));
266  SetJobTime(env->GetValue("BatchSystem.Time", ""));
267  SetJobMemory(env->GetValue("BatchSystem.Memory", ""));
268  SetJobDisk(env->GetValue("BatchSystem.Disk", ""));
269 }
270 
271 
272 
273 
277 
278 void KV_CCIN2P3_GE::Print(Option_t* option) const
279 {
280  //if option="log", print infos for batch log file
281  //if option="all", print detailed info on batch system
282  if (!strcmp(option, "log")) {
284  cout << "* DISK_REQ: " << GetJobDisk() << " *" << endl;
285  cout << "* MEM_REQ: " << GetJobMemory() << " *" << endl;
286  }
287  else
289 }
290 
291 
292 
293 
307 
309 {
310  // PRIVATE method called by SubmitTask() at moment of job submission.
311  // Depending on the current environment, the default job submission options
312  // may be changed by this method.
313  //
314  // This method overrides and augments KVBatchSystem::ChangeDefJobOpt (which
315  // changes the options as a function of the type of analysis task).
316  // Here we add the CCIN2P3-specific case where the job is launched from a directory
317  // on the /sps/ semi-permanent storage facility, or if the data being analysed is
318  // stored in a repository on /sps/. In this case we need to add
319  // the option '-l u_sps_indra' to the 'qsub' command (if not already in the
320  // default job options)
321  //
323  KVString taskname = da->GetAnalysisTask()->GetName();
325  Bool_t repIsSPS = rootdir.BeginsWith("/sps/");
326 
327  KVString wrkdir(gSystem->WorkingDirectory());
328  KVString oldoptions(GetDefaultJobOptions());
329 
330  if (!oldoptions.Contains("sps")) {
331  Bool_t NeedToAddSPS = wrkdir.Contains("/sps/");
332  if ((NeedToAddSPS || repIsSPS)) {
333  oldoptions += " -l sps=1";
334  SetDefaultJobOptions(oldoptions.Data());
335  Info("ChangeDefJobOpt",
336  "Your job is being launched from /sps/... zone.\nTherefore the ressource 'sps' has been declared and the number of jobs which can be treated concurrently will be limited.");
337  }
338  }
339 }
340 
341 
342 
343 
350 
352 {
353  // Batch-system dependent sanitization of jobnames
354  // Grid Engine does not allow:
355  // :
356  // Any such character appearing in the current jobname will be replaced
357  // with '_'
358 
359  fCurrJobName.ReplaceAll(":", "_");
360 }
361 
362 
363 
367 
369 {
370  // Create and fill list with KVBatchJob objects describing current jobs
371  // Delete list after use
372 
373  KVList* list_of_jobs = new KVList;
374 
375  // use qstat -r to get list of job ids and jobnames
376  TString reply = gSystem->GetFromPipe("qstat -r");
377 
378  TObjArray* lines = reply.Tokenize("\n");
379  Int_t nlines = lines->GetEntries();
380  for (Int_t line_number = 0; line_number < nlines; line_number++) {
381  TString thisLine = ((TObjString*)(*lines)[line_number])->String();
382  if (thisLine.Contains("Full jobname:")) {
383  // previous line contains job-id and status
384  TString lastLine = ((TObjString*)(*lines)[line_number - 1])->String();
385  TObjArray* bits = lastLine.Tokenize(" ");
386  Int_t jobid = ((TObjString*)(*bits)[0])->String().Atoi();
387  TString status = ((TObjString*)(*bits)[4])->String();
388  // date & time jobs started (running job) or submitted (queued job)
389  TString sdate = ((TObjString*)(*bits)[5])->String();// mm/dd/yyyy
390  TString stime = ((TObjString*)(*bits)[6])->String();// hh:mm:ss
391  Int_t dd, MM, yyyy, hh, mm, ss;
392  sscanf(sdate.Data(), "%d/%d/%d", &MM, &dd, &yyyy);
393  sscanf(stime.Data(), "%d:%d:%d", &hh, &mm, &ss);
394  KVDatime submitted(yyyy, MM, dd, hh, mm, ss);
395  delete bits;
396  bits = thisLine.Tokenize(": ");
397  TString jobname = ((TObjString*)(*bits)[2])->String();
398  delete bits;
399 
400  KVGEBatchJob* job = new KVGEBatchJob();
401  job->SetName(jobname);
402  job->SetJobID(jobid);
403  job->SetStatus(status);
404  job->SetSubmitted(submitted);
405  list_of_jobs->Add(job);
406  }
407  }
408  delete lines;
409 
410  if (!list_of_jobs->GetEntries()) return list_of_jobs;
411 
412  // use qstat -j [jobid] to get cpu and memory used and also the resource requests
413  TIter next_job(list_of_jobs);
414  KVGEBatchJob* job;
415  while ((job = (KVGEBatchJob*)next_job())) {
416 
417  // for running jobs, read in from [jobname].status file
418  // the number of events read/to read, disk used
419  if (!strcmp(job->GetStatus(), "r")) job->UpdateDiskUsedEventsRead();
420 
421  reply = gSystem->GetFromPipe(Form("qstat -j %d", job->GetJobID()));
422  lines = reply.Tokenize("\n");
423  nlines = lines->GetEntries();
424  for (Int_t line_number = 0; line_number < nlines; line_number++) {
425  TString thisLine = ((TObjString*)(*lines)[line_number])->String();
426  if (thisLine.BeginsWith("usage")) {
427  TObjArray* bits = thisLine.Tokenize("=,");
428  TString stime = ((TObjString*)(*bits)[1])->String();// hh:mm:ss or d:hh:mm:ss
429  Int_t dd, hh, mm, ss;
430  TObjArray* tmp = stime.Tokenize(":");
431  dd = 0;
432  if (tmp->GetEntries() == 4) sscanf(stime.Data(), "%d:%2d:%2d:%2d", &dd, &hh, &mm, &ss);
433  else sscanf(stime.Data(), "%2d:%2d:%2d", &hh, &mm, &ss);
434  delete tmp;
435  job->SetCPUusage((dd * 24 + hh) * 3600 + mm * 60 + ss);
436  TString smem = ((TObjString*)(*bits)[7])->String();// xxx.xxxxM
437  job->SetMemUsed(smem);
438  delete bits;
439  }
440  else if (thisLine.BeginsWith("hard resource_list:")) {
441  TObjArray* bits = thisLine.Tokenize(": ");
442  TString res = ((TObjString*)(*bits)[2])->String();//os=sl5,xrootd=1,irods=1,s_vmem=1024M,s_fsize=50M,s_cpu=36000
443  res.ReplaceAll("s_vmem", "vmem");
444  res.ReplaceAll("s_fsize", "fsize");
445  res.ReplaceAll("s_cpu", "ct");
446  job->SetResources(res);
447  TObjArray* bbits = res.Tokenize(",");
448  TIter next_res(bbits);
449  TObjString* ss;
450  while ((ss = (TObjString*)next_res())) {
451  TString g = ss->String();
452  if (g.BeginsWith("ct=")) {
453  g.Remove(0, 3);
454  job->SetCPUmax(g.Atoi());
455  }
456  else if (g.BeginsWith("vmem=")) {
457  g.Remove(0, 5);
458  job->SetMemMax(g);
459  }
460  else if (g.BeginsWith("fsize=")) {
461  g.Remove(0, 6);
462  job->SetDiskMax(g);
463  }
464  }
465  delete bits;
466  delete bbits;
467  }
468  }
469  delete lines;
470  //}
471  }
472 
473  return list_of_jobs;
474 }
475 
476 
477 
480 
482 {
483  // add option to send mail when job starts
484  fParList.SetValue("-m b", "");
485 }
486 
487 
488 
491 
493 {
494  // add option to send mail when job ends
495  fParList.SetValue("-m e", "");
496 }
497 
498 
499 
502 
503 void KV_CCIN2P3_GE::SetSendMailAddress(const char* email)
504 {
505  // set email address for notifications
506  fParList.SetValue("-M ", email);
507 }
508 
509 
510 
515 
517 {
518  //Processes the job requests for the batch system.
519  //In normal mode, this submits one job for the data analyser fAnalyser
520  //In multijobs mode, this submits one job for each run in the runlist associated to fAnalyser
521 
522  if (!CheckJobParameters()) return;
523 
524  if (MultiJobsMode()) {
525  if (fAnalyser->InheritsFrom("KVDataSetAnalyser")) {
526  //submit jobs for every GetRunsPerJob() runs in runlist
527  KVDataSetAnalyser* ana = dynamic_cast<KVDataSetAnalyser*>(fAnalyser);
528  KVNumberList runs = ana->GetRunList();
529  runs.Begin();
530  Int_t remaining_runs = runs.GetNValues();
531  fCurrJobRunList.Clear();
532  while (remaining_runs && !runs.End()) {
533  Int_t run = runs.Next();
534  remaining_runs--;
535  fCurrJobRunList.Add(run);
536  if ((fCurrJobRunList.GetNValues() == GetRunsPerJob()) || runs.End()) {
537  // submit job for GetRunsPerJob() runs (or less if we have reached end of runlist 'runs')
538  ana->SetRuns(fCurrJobRunList, kFALSE);
539  ana->SetFullRunList(runs);
540  SubmitJob();
541  fCurrJobRunList.Clear();
542  }
543  }
544  ana->SetRuns(runs, kFALSE);
545  }
546  else if (fAnalyser->InheritsFrom("KVSimDirAnalyser")) {
547  // here we understand "run" to mean "file"
548  KVSimDirAnalyser* ana = dynamic_cast<KVSimDirAnalyser*>(fAnalyser);
549  TList* file_list = ana->GetFileList();
550  Int_t remaining_runs = ana->GetNumberOfFilesToAnalyse();
551  fCurrJobRunList.Clear();
552  TList cur_file_list;
553  TObject* of;
554  TIter it(file_list);
555  Int_t file_no = 1;
556  while ((of = it())) {
557  cur_file_list.Add(of);
558  fCurrJobRunList.Add(file_no);
559  remaining_runs--;
560  file_no++;
561  if ((fCurrJobRunList.GetNValues() == GetRunsPerJob()) || (remaining_runs == 0)) {
562  // submit job for GetRunsPerJob() files (or less if we have reached end of list)
563  ana->SetFileList(&cur_file_list);
564  SubmitJob();
565  fCurrJobRunList.Clear();
566  cur_file_list.Clear();
567  }
568  }
569  ana->SetFileList(file_list);
570  }
571  }
572  else {
573  SubmitJob();
574  }
575 
576 }
577 
578 
579 
593 
595 {
596  // Fill the list with all relevant parameters for batch system,
597  // set to their default values.
598  //
599  // Parameters defined here are:
600  // JobTime [string]
601  // JobMemory [string]
602  // JobDisk [string]
603  // MultiJobsMode [bool]
604  // RunsPerJob [int]
605  // EMailOnStart [bool]
606  // EMailOnEnd [bool]
607  // EMailAddress [string]
608 
610  nl.SetValue("JobTime", fDefJobTime);
611  nl.SetValue("JobMemory", fDefJobMem);
612  nl.SetValue("JobDisk", fDefJobDisk);
613  nl.SetValue("MultiJobsMode", MultiJobsMode());
614  nl.SetValue("RunsPerJob", fRunsPerJob);
615  nl.SetValue("EMailOnStart", kFALSE);
616  nl.SetValue("EMailOnEnd", kFALSE);
617  nl.SetValue("EMailAddress", "");
618 }
619 
620 
621 
624 
626 {
627  // Use the parameters in the list to set all relevant parameters for batch system.
628 
630  SetJobTime(nl.GetStringValue("JobTime"));
631  SetJobMemory(nl.GetStringValue("JobMemory"));
632  SetJobDisk(nl.GetStringValue("JobDisk"));
633  SetMultiJobsMode(nl.GetBoolValue("MultiJobsMode"));
634  SetRunsPerJob(nl.GetIntValue("RunsPerJob"));
635  if (nl.GetTStringValue("EMailAddress") != "") {
636  if (nl.GetBoolValue("EMailOnStart")) SetSendMailOnJobStart();
637  if (nl.GetBoolValue("EMailOnEnd")) SetSendMailOnJobEnd();
638  SetSendMailAddress(nl.GetStringValue("EMailAddress"));
639  }
640 }
641 
642 
int Int_t
bool Bool_t
char Char_t
constexpr Bool_t kFALSE
constexpr Bool_t kTRUE
const char Option_t
R__EXTERN TEnv * gEnv
const char rootdir[]
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t g
char * Form(const char *fmt,...)
R__EXTERN TSystem * gSystem
void SetCPUmax(Int_t c)
Definition: KVBatchJob.h:72
void SetSubmitted(KVDatime &m)
Definition: KVBatchJob.h:44
void SetJobID(Int_t n)
Definition: KVBatchJob.h:52
virtual void UpdateDiskUsedEventsRead()
Definition: KVBatchJob.cpp:53
const Char_t * GetStatus() const
Definition: KVBatchJob.h:32
void SetMemUsed(const Char_t *m)
Definition: KVBatchJob.h:64
Int_t GetJobID() const
Definition: KVBatchJob.h:48
void SetMemMax(const Char_t *m)
Definition: KVBatchJob.h:80
void SetCPUusage(Int_t m)
Definition: KVBatchJob.h:56
void SetDiskMax(const Char_t *m)
Definition: KVBatchJob.h:88
void SetStatus(const Char_t *s)
Definition: KVBatchJob.h:36
Base class for interface to a batch job management system.
Definition: KVBatchSystem.h:78
virtual void WriteBatchEnvFile(TEnv *)
virtual void Print(Option_t *="") const
virtual void ChangeDefJobOpt(KVDataAnalyser *da)
virtual void ReadBatchEnvFile(TEnv *)
virtual void SetBatchSystemParameters(const KVNameValueList &)
Use the parameters in the list to set all relevant parameters for batch system.
virtual void GetBatchSystemParameterList(KVNameValueList &)
virtual void Clear(Option_t *opt="")
virtual Bool_t CheckJobParameters()
Checks the job and ask for the job name if needed.
Manager class which sets up and runs data analysis tasks.
virtual KVString GetRootDirectoryOfDataToAnalyse() const
KVDataAnalysisTask * GetAnalysisTask() const
Pilots user analysis of experimental data.
void SetFullRunList(const KVNumberList &nl)
void SetRuns(const KVNumberList &nl, Bool_t check=kTRUE)
const KVNumberList & GetRunList() const
Extension of TDatime to handle various useful date formats.
Definition: KVDatime.h:33
Job handled by Grid Engine batch system at CC-IN2P3.
Definition: KVGEBatchJob.h:16
void SetResources(TString r)
Definition: KVGEBatchJob.h:25
Extended TList class which owns its objects by default.
Definition: KVList.h:28
Handles lists of named parameters with different types, a list of KVNamedParameter objects.
Int_t GetIntValue(const Char_t *name) const
void SetValue(const Char_t *name, value_type value)
Bool_t GetBoolValue(const Char_t *name) const
const Char_t * GetStringValue(const Char_t *name) const
TString GetTStringValue(const Char_t *name) const
Strings used to represent a set of ranges of values.
Definition: KVNumberList.h:85
Bool_t End(void) const
Definition: KVNumberList.h:199
Int_t GetNValues() const
void Begin(void) const
Int_t Next(void) const
virtual void Add(TObject *obj)
Class piloting analyses of simulated data.
void SetFileList(TList *l) override
Int_t GetNumberOfFilesToAnalyse() const override
TList * GetFileList() const
Extension of ROOT TString class which allows backwards compatibility with ROOT v3....
Definition: KVString.h:73
Int_t GetNValues(TString delim) const
Definition: KVString.cpp:886
Interface to CCIN2P3 Grid Engine batch job management system.
Definition: KV_CCIN2P3_GE.h:15
virtual KVList * GetListOfJobs()
virtual void GetBatchSystemParameterList(KVNameValueList &)
void SetSendMailOnJobEnd()
add option to send mail when job ends
const Char_t * GetJobTime(void) const
returns the parameter string corresponding to the job CPU time
void PrintJobs(Option_t *opt="")
Print list of owner's jobs.
void SetJobMemory(const Char_t *h="")
void ChooseJobDisk(void)
void SetJobDisk(const Char_t *h="")
virtual void WriteBatchEnvFile(TEnv *)
virtual void SanitizeJobName() const
virtual void Print(Option_t *="") const
virtual Bool_t CheckJobParameters()
Checks the job and asks for any missing parameters.
void SetSendMailAddress(const char *)
set email address for notifications
const Char_t * GetJobDisk(void) const
returns the parameter string corresponding to the job Disk
virtual void Clear(Option_t *opt="")
Clear previously set parameters in order to create a new job submission command.
void ChooseJobMemory(void)
void ChooseJobTime(void)
void SetJobTime(const Char_t *h="")
const Char_t * GetJobMemory(void) const
returns the parameter string corresponding to the job Memory
virtual void ChangeDefJobOpt(KVDataAnalyser *)
void SetSendMailOnJobStart()
add option to send mail when job starts
virtual void SetBatchSystemParameters(const KVNameValueList &)
Use the parameters in the list to set all relevant parameters for batch system.
virtual void ReadBatchEnvFile(TEnv *)
void ls(Option_t *option="") const override
virtual Int_t GetEntries() const
virtual const char * GetValue(const char *name, const char *dflt) const
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=nullptr)
void Clear(Option_t *option="") override
void Add(TObject *obj) override
const char * GetName() const override
virtual void SetName(const char *name)
Int_t GetEntries() const override
TString & String()
Ssiz_t Length() const
std::istream & ReadToDelim(std::istream &str, char delim='\n')
const char * Data() const
TObjArray * Tokenize(const TString &delim) const
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
TString & Prepend(char c, Ssiz_t rep=1)
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
TString & ReplaceAll(const char *s1, const char *s2)
virtual TString GetFromPipe(const char *command)
virtual const char * WorkingDirectory()
void Info(const char *location, const char *fmt,...)
UInt_t GetListOfJobs(TFile *file, TList &jobdirs)
const char * String
ClassImp(TPyArg)