KaliVeda
Toolkit for HIC analysis
Loading...
Searching...
No Matches
KV_CCIN2P3_GE.cpp
1//Created by KVClassFactory on Wed Apr 27 15:43:08 CEST 2011
2//Author: John Frankland
3
4#include "KV_CCIN2P3_GE.h"
5#include "TObjString.h"
6#include "TSystem.h"
7#include "TEnv.h"
8#include "KVDataAnalyser.h"
9#include "KVDataAnalysisTask.h"
10#include "KVGEBatchJob.h"
11#include "KVDataRepository.h"
12#include "KVDataSetAnalyser.h"
13#include "KVSimDirAnalyser.h"
14
15using namespace std;
16
18
19
20
23
25 : KVBatchSystem(name), fMultiJobs(kTRUE)
26{
27 //Default constructor
28 //Sets default job time, memory and disk space as defined in $KVROOT/KVFiles/.kvrootrc
29
30 fDefJobTime = gEnv->GetValue("GE.BatchSystem.DefaultJobTime", "5:00");
31 fDefJobMem = gEnv->GetValue("GE.BatchSystem.DefaultJobMemory", "2G");
32 fDefJobDisk = gEnv->GetValue("GE.BatchSystem.DefaultJobDisk", "50M");
33 fTimeSet = fDiskSet = fMemSet = kFALSE;
34 //default number of runs per job in multi jobs mode (default=1)
35 SetRunsPerJob(gEnv->GetValue("GE.BatchSystem.RunsPerJob", 1));
36}
37
38
39
40
43
45{
46 //Clear previously set parameters in order to create a new job submission command
50}
51
52
53
54
57
59{
60 //Destructor
61}
62
63
64
68
70{
71 //Set CPU time for batch job.
72 // SetJobTime() => use default time
73 KVString tmp(time);
74 if (tmp == "") tmp = fDefJobTime;
75 //time given as either "hh:mm:ss" or "ss" but NOT "mm:ss"!
76 if (tmp.GetNValues(":") == 2) tmp.Prepend("0:");
77 fParList.SetValue("-l ct=", tmp);
79}
80
81
82
87
89{
90 //Set maximum memory used by job.
91 //Include units in string, i.e. "100M", "1G" etc.
92 //If mem="", use default value
93 KVString tmp(mem);
94 if (tmp == "") tmp = fDefJobMem;
95 fParList.SetValue("-l vmem=", tmp);
96 fMemSet = kTRUE;
97}
98
99
100
105
107{
108 //Set maximum disk space used by job.
109 //Include units in string, i.e. "100M", "1G" etc.
110 //If diks="", use default value
111 KVString tmp(diks);
112 if (tmp == "") tmp = fDefJobDisk;
113 fParList.SetValue("-l fsize=", tmp);
114 fDiskSet = kTRUE;
115}
116
117
118
121
123{
124 //Print list of owner's jobs.
125 KVList* j = GetListOfJobs();
126 j->ls();
127 delete j;
128}
129
130
131
134
136{
137 // Checks the job and asks for any missing parameters
138
140
141 if (!fTimeSet) ChooseJobTime();
142
143 if (!fDiskSet) ChooseJobDisk();
144
145 if (!fMemSet) ChooseJobMemory();
146
147 return kTRUE;
148}
149
150
151
153
155{
156 KVString tmp = "";
157 cout << "Enter max CPU time per job (ss/mn:ss/hh:mn:ss) ["
158 << fDefJobTime << "] : ";
159 cout.flush();
160 tmp.ReadToDelim(cin);
161 if (!tmp.Length()) {
162 SetJobTime();
163 return;
164 }
165 else
166 SetJobTime(tmp);
167}
168
169
170
172
174{
175 KVString tmp = "";
176 cout << "Enter max memory per job (xKB/xMB/xGB) ["
177 << fDefJobMem.Data() << "] : ";
178 cout.flush();
179 tmp.ReadToDelim(cin);
180 SetJobMemory(tmp.Data());
181}
182
183
184
186
188{
189 KVString tmp = "";
190 cout << "Enter max scratch disk per job (xKB/xMB/xGB) ["
191 << fDefJobDisk.Data() << "] : ";
192 cout.flush();
193 tmp.ReadToDelim(cin);
194 SetJobDisk(tmp.Data());
195}
196
197
198
201
203{
204// returns the parameter string corresponding to the job CPU time
205 return fParList.GetStringValue("-l ct=");
206}
207
208
209
212
214{
215// returns the parameter string corresponding to the job Memory
216 return fParList.GetStringValue("-l vmem=");
217}
218
219
220
223
225{
226// returns the parameter string corresponding to the job Disk
227 return fParList.GetStringValue("-l fsize=");
228}
229
230
231
232
236
238{
239 //Store any useful information on batch system in the TEnv
240 //(this method is used by KVDataAnalyser::WriteBatchEnvFile)
242 env->SetValue("BatchSystem.MultiJobs", MultiJobsMode());
243 if (MultiJobsMode()) env->SetValue("BatchSystem.CurrentRunList", fCurrJobRunList.AsString());
244 env->SetValue("BatchSystem.Time", GetJobTime());
245 env->SetValue("BatchSystem.Memory", GetJobMemory());
246 env->SetValue("BatchSystem.Disk", GetJobDisk());
247 // if analysis of simulated data is being used, we copy the files to analyse to the
248 // scratch disk of the batch job (make sure enough disk space is requested)
249 env->SetValue("SimDirAnalyser.CopyFilesToWorkingDirectory", true);
250}
251
252
253
254
258
260{
261 //Read any useful information on batch system from the TEnv
262 //(this method is used by KVDataAnalyser::ReadBatchEnvFile)
264 SetMultiJobsMode(env->GetValue("BatchSystem.MultiJobs", kFALSE));
265 if (MultiJobsMode()) fCurrJobRunList.SetList(env->GetValue("BatchSystem.CurrentRunList", ""));
266 SetJobTime(env->GetValue("BatchSystem.Time", ""));
267 SetJobMemory(env->GetValue("BatchSystem.Memory", ""));
268 SetJobDisk(env->GetValue("BatchSystem.Disk", ""));
269}
270
271
272
273
277
279{
280 //if option="log", print infos for batch log file
281 //if option="all", print detailed info on batch system
282 if (!strcmp(option, "log")) {
284 cout << "* DISK_REQ: " << GetJobDisk() << " *" << endl;
285 cout << "* MEM_REQ: " << GetJobMemory() << " *" << endl;
286 }
287 else
289}
290
291
292
293
307
309{
310 // PRIVATE method called by SubmitTask() at moment of job submission.
311 // Depending on the current environment, the default job submission options
312 // may be changed by this method.
313 //
314 // This method overrides and augments KVBatchSystem::ChangeDefJobOpt (which
315 // changes the options as a function of the type of analysis task).
316 // Here we add the CCIN2P3-specific case where the job is launched from a directory
317 // on the /sps/ semi-permanent storage facility, or if the data being analysed is
318 // stored in a repository on /sps/. In this case we need to add
319 // the option '-l sps=1' to the 'qsub' command (if not already in the
320 // default job options)
321 //
323 KVString taskname = da->GetAnalysisTask()->GetName();
325 Bool_t repIsSPS = rootdir.BeginsWith("/sps/");
326
328 KVString oldoptions(GetDefaultJobOptions());
329
330 if (!oldoptions.Contains("sps")) {
331 Bool_t NeedToAddSPS = wrkdir.Contains("/sps/");
332 if ((NeedToAddSPS || repIsSPS)) {
333 oldoptions += " -l sps=1";
334 SetDefaultJobOptions(oldoptions.Data());
335 Info("ChangeDefJobOpt",
336 "Your job is being launched from /sps/... zone.\nTherefore the ressource 'sps' has been declared and the number of jobs which can be treated concurrently will be limited.");
337 }
338 }
339}
340
341
342
343
350
352{
353 // Batch-system dependent sanitization of jobnames
354 // Grid Engine does not allow:
355 // :
356 // Any such character appearing in the current jobname will be replaced
357 // with '_'
358
359 fCurrJobName.ReplaceAll(":", "_");
360}
361
362
363
367
369{
370 // Create and fill list with KVBatchJob objects describing current jobs
371 // Delete list after use
372
373 KVList* list_of_jobs = new KVList;
374
375 // use qstat -r to get list of job ids and jobnames
376 TString reply = gSystem->GetFromPipe("qstat -r");
377
378 TObjArray* lines = reply.Tokenize("\n");
379 Int_t nlines = lines->GetEntries();
380 for (Int_t line_number = 0; line_number < nlines; line_number++) {
381 TString thisLine = ((TObjString*)(*lines)[line_number])->String();
382 if (thisLine.Contains("Full jobname:")) {
383 // previous line contains job-id and status
384 TString lastLine = ((TObjString*)(*lines)[line_number - 1])->String();
385 TObjArray* bits = lastLine.Tokenize(" ");
386 Int_t jobid = ((TObjString*)(*bits)[0])->String().Atoi();
387 TString status = ((TObjString*)(*bits)[4])->String();
388 // date & time jobs started (running job) or submitted (queued job)
389 TString sdate = ((TObjString*)(*bits)[5])->String();// mm/dd/yyyy
390 TString stime = ((TObjString*)(*bits)[6])->String();// hh:mm:ss
391 Int_t dd, MM, yyyy, hh, mm, ss;
392 sscanf(sdate.Data(), "%d/%d/%d", &MM, &dd, &yyyy);
393 sscanf(stime.Data(), "%d:%d:%d", &hh, &mm, &ss);
394 KVDatime submitted(yyyy, MM, dd, hh, mm, ss);
395 delete bits;
396 bits = thisLine.Tokenize(": ");
397 TString jobname = ((TObjString*)(*bits)[2])->String();
398 delete bits;
399
400 KVGEBatchJob* job = new KVGEBatchJob();
401 job->SetName(jobname);
402 job->SetJobID(jobid);
403 job->SetStatus(status);
404 job->SetSubmitted(submitted);
405 list_of_jobs->Add(job);
406 }
407 }
408 delete lines;
409
410 if (!list_of_jobs->GetEntries()) return list_of_jobs;
411
412 // use qstat -j [jobid] to get cpu and memory used and also the resource requests
413 TIter next_job(list_of_jobs);
414 KVGEBatchJob* job;
415 while ((job = (KVGEBatchJob*)next_job())) {
416
417 // for running jobs, read in from [jobname].status file
418 // the number of events read/to read, disk used
419 if (!strcmp(job->GetStatus(), "r")) job->UpdateDiskUsedEventsRead();
420
421 reply = gSystem->GetFromPipe(Form("qstat -j %d", job->GetJobID()));
422 lines = reply.Tokenize("\n");
423 nlines = lines->GetEntries();
424 for (Int_t line_number = 0; line_number < nlines; line_number++) {
425 TString thisLine = ((TObjString*)(*lines)[line_number])->String();
426 if (thisLine.BeginsWith("usage")) {
427 TObjArray* bits = thisLine.Tokenize("=,");
428 TString stime = ((TObjString*)(*bits)[1])->String();// hh:mm:ss or d:hh:mm:ss
429 Int_t dd, hh, mm, ss;
430 TObjArray* tmp = stime.Tokenize(":");
431 dd = 0;
432 if (tmp->GetEntries() == 4) sscanf(stime.Data(), "%d:%2d:%2d:%2d", &dd, &hh, &mm, &ss);
433 else sscanf(stime.Data(), "%2d:%2d:%2d", &hh, &mm, &ss);
434 delete tmp;
435 job->SetCPUusage((dd * 24 + hh) * 3600 + mm * 60 + ss);
436 TString smem = ((TObjString*)(*bits)[7])->String();// xxx.xxxxM
437 job->SetMemUsed(smem);
438 delete bits;
439 }
440 else if (thisLine.BeginsWith("hard resource_list:")) {
441 TObjArray* bits = thisLine.Tokenize(": ");
442 TString res = ((TObjString*)(*bits)[2])->String();//os=sl5,xrootd=1,irods=1,s_vmem=1024M,s_fsize=50M,s_cpu=36000
443 res.ReplaceAll("s_vmem", "vmem");
444 res.ReplaceAll("s_fsize", "fsize");
445 res.ReplaceAll("s_cpu", "ct");
446 job->SetResources(res);
447 TObjArray* bbits = res.Tokenize(",");
448 TIter next_res(bbits);
449 TObjString* ss;
450 while ((ss = (TObjString*)next_res())) {
451 TString g = ss->String();
452 if (g.BeginsWith("ct=")) {
453 g.Remove(0, 3);
454 job->SetCPUmax(g.Atoi());
455 }
456 else if (g.BeginsWith("vmem=")) {
457 g.Remove(0, 5);
458 job->SetMemMax(g);
459 }
460 else if (g.BeginsWith("fsize=")) {
461 g.Remove(0, 6);
462 job->SetDiskMax(g);
463 }
464 }
465 delete bits;
466 delete bbits;
467 }
468 }
469 delete lines;
470 //}
471 }
472
473 return list_of_jobs;
474}
475
476
477
480
482{
483 // add option to send mail when job starts
484 fParList.SetValue("-m b", "");
485}
486
487
488
491
493{
494 // add option to send mail when job ends
495 fParList.SetValue("-m e", "");
496}
497
498
499
502
504{
505 // set email address for notifications
506 fParList.SetValue("-M ", email);
507}
508
509
510
515
517{
518 //Processes the job requests for the batch system.
519 //In normal mode, this submits one job for the data analyser fAnalyser
520 //In multijobs mode, this submits one job for every GetRunsPerJob() runs (or files) in the list associated to fAnalyser
521
522 if (!CheckJobParameters()) return;
523
524 if (MultiJobsMode()) {
525 if (fAnalyser->InheritsFrom("KVDataSetAnalyser")) {
526 //submit jobs for every GetRunsPerJob() runs in runlist
527 KVDataSetAnalyser* ana = dynamic_cast<KVDataSetAnalyser*>(fAnalyser);
528 KVNumberList runs = ana->GetRunList();
529 runs.Begin();
530 Int_t remaining_runs = runs.GetNValues();
532 while (remaining_runs && !runs.End()) {
533 Int_t run = runs.Next();
534 remaining_runs--;
535 fCurrJobRunList.Add(run);
536 if ((fCurrJobRunList.GetNValues() == GetRunsPerJob()) || runs.End()) {
537 // submit job for GetRunsPerJob() runs (or fewer if we have reached end of runlist 'runs')
539 ana->SetFullRunList(runs);
540 SubmitJob();
542 }
543 }
544 ana->SetRuns(runs, kFALSE);
545 }
546 else if (fAnalyser->InheritsFrom("KVSimDirAnalyser")) {
547 // here we understand "run" to mean "file"
548 KVSimDirAnalyser* ana = dynamic_cast<KVSimDirAnalyser*>(fAnalyser);
549 TList* file_list = ana->GetFileList();
550 Int_t remaining_runs = ana->GetNumberOfFilesToAnalyse();
552 TList cur_file_list;
553 TObject* of;
554 TIter it(file_list);
555 Int_t file_no = 1;
556 while ((of = it())) {
557 cur_file_list.Add(of);
558 fCurrJobRunList.Add(file_no);
559 remaining_runs--;
560 file_no++;
561 if ((fCurrJobRunList.GetNValues() == GetRunsPerJob()) || (remaining_runs == 0)) {
562 // submit job for GetRunsPerJob() files (or fewer if we have reached end of list)
563 ana->SetFileList(&cur_file_list);
564 SubmitJob();
566 cur_file_list.Clear();
567 }
568 }
569 ana->SetFileList(file_list);
570 }
571 }
572 else {
573 SubmitJob();
574 }
575
576}
577
578
579
593
595{
596 // Fill the list with all relevant parameters for batch system,
597 // set to their default values.
598 //
599 // Parameters defined here are:
600 // JobTime [string]
601 // JobMemory [string]
602 // JobDisk [string]
603 // MultiJobsMode [bool]
604 // RunsPerJob [int]
605 // EMailOnStart [bool]
606 // EMailOnEnd [bool]
607 // EMailAddress [string]
608
610 nl.SetValue("JobTime", fDefJobTime);
611 nl.SetValue("JobMemory", fDefJobMem);
612 nl.SetValue("JobDisk", fDefJobDisk);
613 nl.SetValue("MultiJobsMode", MultiJobsMode());
614 nl.SetValue("RunsPerJob", fRunsPerJob);
615 nl.SetValue("EMailOnStart", kFALSE);
616 nl.SetValue("EMailOnEnd", kFALSE);
617 nl.SetValue("EMailAddress", "");
618}
619
620
621
624
626{
627 // Use the parameters in the list to set all relevant parameters for batch system.
628
630 SetJobTime(nl.GetStringValue("JobTime"));
631 SetJobMemory(nl.GetStringValue("JobMemory"));
632 SetJobDisk(nl.GetStringValue("JobDisk"));
633 SetMultiJobsMode(nl.GetBoolValue("MultiJobsMode"));
634 SetRunsPerJob(nl.GetIntValue("RunsPerJob"));
635 if (nl.GetTStringValue("EMailAddress") != "") {
636 if (nl.GetBoolValue("EMailOnStart")) SetSendMailOnJobStart();
637 if (nl.GetBoolValue("EMailOnEnd")) SetSendMailOnJobEnd();
638 SetSendMailAddress(nl.GetStringValue("EMailAddress"));
639 }
640}
641
642
int Int_t
bool Bool_t
char Char_t
constexpr Bool_t kFALSE
constexpr Bool_t kTRUE
const char Option_t
R__EXTERN TEnv * gEnv
const char rootdir[]
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t g
char * Form(const char *fmt,...)
R__EXTERN TSystem * gSystem
void SetCPUmax(Int_t c)
Definition KVBatchJob.h:72
void SetSubmitted(KVDatime &m)
Definition KVBatchJob.h:44
void SetJobID(Int_t n)
Definition KVBatchJob.h:52
virtual void UpdateDiskUsedEventsRead()
void SetMemUsed(const Char_t *m)
Definition KVBatchJob.h:64
Int_t GetJobID() const
Definition KVBatchJob.h:48
void SetMemMax(const Char_t *m)
Definition KVBatchJob.h:80
const Char_t * GetStatus() const
Definition KVBatchJob.h:32
void SetCPUusage(Int_t m)
Definition KVBatchJob.h:56
void SetDiskMax(const Char_t *m)
Definition KVBatchJob.h:88
void SetStatus(const Char_t *s)
Definition KVBatchJob.h:36
Base class for interface to a batch job management system.
virtual void WriteBatchEnvFile(TEnv *)
virtual const Char_t * GetDefaultJobOptions() const
virtual void SetDefaultJobOptions(const Char_t *opt)
virtual void SubmitJob()
KVNameValueList fParList
list of parameters/switches to be passed on job submission command line
virtual void Print(Option_t *="") const
virtual void ChangeDefJobOpt(KVDataAnalyser *da)
virtual void ReadBatchEnvFile(TEnv *)
virtual void SetBatchSystemParameters(const KVNameValueList &)
Use the parameters in the list to set all relevant parameters for batch system.
KVDataAnalyser * fAnalyser
the analyser object which requests job submission, it has all details on the job
KVString fCurrJobName
name of current job being submitted
virtual void GetBatchSystemParameterList(KVNameValueList &)
virtual void Clear(Option_t *opt="")
KVNumberList fCurrJobRunList
runlist for (multi job mode) job being submitted
virtual Bool_t CheckJobParameters()
Checks the job and asks for the job name if needed.
Manager class which sets up and runs data analysis tasks.
KVDataAnalysisTask * GetAnalysisTask() const
virtual KVString GetRootDirectoryOfDataToAnalyse() const
Pilots user analysis of experimental data.
void SetFullRunList(const KVNumberList &nl)
const KVNumberList & GetRunList() const
void SetRuns(const KVNumberList &nl, Bool_t check=kTRUE)
Extension of TDatime to handle various useful date formats.
Definition KVDatime.h:33
Job handled by Grid Engine batch system at CC-IN2P3.
void SetResources(TString r)
Extended TList class which owns its objects by default.
Definition KVList.h:28
Handles lists of named parameters with different types, a list of KVNamedParameter objects.
Int_t GetIntValue(const Char_t *name) const
void SetValue(const Char_t *name, value_type value)
Bool_t GetBoolValue(const Char_t *name) const
const Char_t * GetStringValue(const Char_t *name) const
TString GetTStringValue(const Char_t *name) const
Strings used to represent a set of ranges of values.
const Char_t * AsString(Int_t maxchars=0) const
Bool_t End(void) const
Int_t GetNValues() const
void Begin(void) const
void SetList(const TString &)
void Add(Int_t)
Add value 'n' to the list.
void Clear(Option_t *="")
Empty number list, reset it to initial state.
Int_t Next(void) const
virtual void Add(TObject *obj)
Class piloting analyses of simulated data.
void SetFileList(TList *l) override
TList * GetFileList() const
Int_t GetNumberOfFilesToAnalyse() const override
Extension of ROOT TString class which allows backwards compatibility with ROOT v3....
Definition KVString.h:73
Int_t GetNValues(TString delim) const
Definition KVString.cpp:886
Interface to CCIN2P3 Grid Engine batch job management system.
virtual KVList * GetListOfJobs()
Bool_t fMultiJobs
set to kTRUE if several jobs are to be submitted for the runlist set in fAnalyser
virtual void GetBatchSystemParameterList(KVNameValueList &)
void SetSendMailOnJobEnd()
add option to send mail when job ends
Bool_t MultiJobsMode() const
const Char_t * GetJobTime(void) const
returns the parameter string corresponding to the job CPU time
Int_t fRunsPerJob
number of runs per job submitted in multi job mode (default=1)
void PrintJobs(Option_t *opt="")
Print list of owner's jobs.
void SetJobMemory(const Char_t *h="")
virtual ~KV_CCIN2P3_GE()
Destructor.
void ChooseJobDisk(void)
KVString fDefJobTime
default job length
void SetJobDisk(const Char_t *h="")
virtual void WriteBatchEnvFile(TEnv *)
virtual void SanitizeJobName() const
virtual void Print(Option_t *="") const
KVString fDefJobDisk
default job disk space (with units, e.g. "1G")
KVString fDefJobMem
default job memory allocation (with units, e.g. "512M")
virtual Bool_t CheckJobParameters()
Checks the job and asks for any missing parameters.
void SetSendMailAddress(const char *)
set email address for notifications
const Char_t * GetJobDisk(void) const
returns the parameter string corresponding to the job Disk
virtual void Clear(Option_t *opt="")
Clear previously set parameters in order to create a new job submission command.
void ChooseJobMemory(void)
void ChooseJobTime(void)
void SetJobTime(const Char_t *h="")
Int_t GetRunsPerJob() const
const Char_t * GetJobMemory(void) const
returns the parameter string corresponding to the job Memory
virtual void ChangeDefJobOpt(KVDataAnalyser *)
void SetRunsPerJob(Int_t n)
void SetSendMailOnJobStart()
add option to send mail when job starts
virtual void SetBatchSystemParameters(const KVNameValueList &)
Use the parameters in the list to set all relevant parameters for batch system.
virtual void ReadBatchEnvFile(TEnv *)
void SetMultiJobsMode(Bool_t on=kTRUE)
void ls(Option_t *option="") const override
virtual Int_t GetEntries() const
virtual const char * GetValue(const char *name, const char *dflt) const
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=nullptr)
void Clear(Option_t *option="") override
void Add(TObject *obj) override
const char * GetName() const override
virtual void SetName(const char *name)
Int_t GetEntries() const override
TString & String()
virtual Bool_t InheritsFrom(const char *classname) const
virtual void Info(const char *method, const char *msgfmt,...) const
Ssiz_t Length() const
std::istream & ReadToDelim(std::istream &str, char delim='\n')
const char * Data() const
TObjArray * Tokenize(const TString &delim) const
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
TString & Prepend(char c, Ssiz_t rep=1)
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
TString & ReplaceAll(const char *s1, const char *s2)
virtual TString GetFromPipe(const char *command)
virtual const char * WorkingDirectory()
const char * String
ClassImp(TPyArg)