KaliVeda
Toolkit for HIC analysis
KVOnlineReconDataAnalyser.cpp
1 //Created by KVClassFactory on Mon Apr 1 10:16:46 2019
2 //Author: eindra
3 
4 #include "KVOnlineReconDataAnalyser.h"
5 #include "KVMultiDetArray.h"
6 #include "KVDataSet.h"
7 #include "KVReconEventSelector.h"
8 
9 #ifdef WITH_DATAFLOW
10 #include "dataflowports.h"
11 
12 #include <KVZMQMessage.h>
13 #endif
14 
16 
17 
18 
19 
24 {
25  // Default constructor
    // Sets the status update interval to 2.5. The CheckStatusUpdateInterval
    // comments in this file describe that interval as a number of seconds.
26  SetStatusUpdateInterval(2.5);
27 }
28 
29 
30 
31 
36 
38 {
39  //Run the online analysis
    //
    // Visible flow: activate the dataset, validate the user selector, run the
    // selector's InitRun between the preInitRun/postInitRun hooks, subscribe to
    // a ZeroMQ event stream, then loop handing each received event to the
    // selector until its Analysis() returns false.
    //
    // NOTE(review): the embedded listing numbers skip values (45, 47, 52, 56-57,
    // 59-60, 64, 98, 102, 120), so some statements of the original function are
    // not visible here. Comments below describe only what is shown.
40 
41  //make the chosen dataset the active dataset ( = gDataSet; note this also opens database
42  //and positions gDataBase & gExpDB).
43  GetDataSet()->cd();
44 
46 
48 
    // Abort when there is no selector object or it does not inherit from TSelector.
49  if (!fSelector || !fSelector->InheritsFrom("TSelector")) {
50  std::cout << "The selector \"" << GetUserClass() << "\" is not valid." << std::endl;
51  std::cout << "Process aborted." << std::endl;
53  return;
54  }
55 
58 
    // Only a warning: processing continues even when 'dataflowhost' was not given,
    // although that option supplies the host part of the endpoint built below.
61  if (!fSelector->IsOptGiven("dataflowhost")) {
62  Warning("SubmitTask", "Did you forget to give option 'dataflowhost' in InitAnalysis()?");
63  }
65 
66  preInitRun();
67  fSelector->InitRun();
68  postInitRun();
69 
70  zmq::context_t context(1); // for ZeroMQ communications
    // Endpoint is tcp://<dataflowhost option>:<PORT_EVENT_PUB>.
71  std::string zmq_spy_port = Form("tcp://%s:%d", fSelector->GetOpt("dataflowhost").Data(), PORT_EVENT_PUB);
    // Despite its name, 'pub' is created as a subscriber (ZMQ_SUB) socket.
72  zmq::socket_t pub(context, ZMQ_SUB);
73  int timeout = 500; //milliseconds
    // NOTE(review): with a receive timeout set, recv() presumably returns false
    // when nothing arrives in time, so the rate/status check at the bottom of the
    // loop still runs without incoming events - confirm for the cppzmq version used.
74  pub.setsockopt(ZMQ_RCVTIMEO, &timeout, sizeof(int));
75  try {
76  pub.connect(zmq_spy_port.c_str());
77  }
78  catch (zmq::error_t& e) {
    // NOTE(review): the failure is only logged; execution continues and the
    // "Connected to EventPublisher" message below is printed regardless.
79  Error("SubmitTask", "failed to connect socket: %s", e.what());
80  }
81 
82  std::cout << "Connected to EventPublisher " << zmq_spy_port << std::endl;
    // Empty filter of length 0: in ZeroMQ this subscribes to every message.
83  pub.setsockopt(ZMQ_SUBSCRIBE, "", 0);
84 
85  zmq::message_t event;
86 
    // nev: events counted since the last rate printout; nevt: running total.
87  int nev = 0;
88  int nevt = 0;
89  TDatime fTimeStamp;
90  Double_t fStartTime = fTimeStamp.Convert();
91  fUpdate = false;
92 
    // Endless loop; the only visible exit is Analysis() returning false.
93  while (1) {
94 
95  if (pub.recv(&event)) {
96 
97  KVZMQMessage mess_event(event);
    // NOTE(review): 'event' is assigned the result of
    // GetObject<KVReconstructedEvent>() although the visible declaration above
    // is a zmq::message_t. Listing lines 98 and 102 are missing; presumably they
    // declare a shadowing event pointer and pass it to the selector - verify
    // against the real source.
99  if ((event = mess_event.GetObject<KVReconstructedEvent>())) {
100  ++nev;
101  ++nevt;
103  preAnalysis();
104  if (!fSelector->Analysis()) break;
105  postAnalysis();
106  }
107  }
    // Refresh the timestamp on every pass, whether or not an event was received.
108  fTimeStamp.Set();
109  Double_t time = fTimeStamp.Convert();
    // Once the status update interval has elapsed: print the approximate event
    // rate and the total, restart the measurement window, and raise fUpdate
    // (consumed by the CheckStatusUpdateInterval code in this file).
110  if ((time - fStartTime) >= GetStatusUpdateInterval()) {
111  std::cout << "~" << (int)(nev / (time - fStartTime)) << " events/s. tot = " << nevt << std::endl;
112  fStartTime = time;
113  nev = 0;
114  fUpdate = true;
115  }
116 
117  }
118  fSelector->EndRun();
119 
    // NOTE(review): fSelector is not reset to a null pointer after this delete in
    // the visible code (listing line 120 is missing) - confirm no later use.
121  delete fSelector;
122 }
123 
124 
125 
126 
128 
130 {
    // Build the multidetector for the current dataset (looked up by the dataset's
    // name) only when no global gMultiDetArray exists yet.
    // NOTE(review): the signature line is missing from this listing; presumably
    // this is the preInitRun() hook called from SubmitTask - confirm.
131  if (!gMultiDetArray) KVMultiDetArray::MakeMultiDetector(GetDataSet()->GetName());
132 }
133 
134 
135 
138 
140 {
141  // Set minimum (trigger) multiplicity for array
    // The selector's current event is handed to the array for this purpose.
    // NOTE(review): gMultiDetArray and fSelector are dereferenced without a null
    // check; presumably both are valid before the event loop starts - confirm.
142 
143  gMultiDetArray->SetMinimumOKMultiplicity(fSelector->GetEvent());
144 }
145 
146 
147 
155 
157 {
158  // This method returns true every fStatusUpdateInterval seconds, according to the check performed in SubmitTask
159  // You can change this value in your analysis class by calling
160  //
161  // gDataAnalyser->SetStatusUpdateInterval(...);
162  //
163  // with the required value in seconds before the processing loop starts
    //
    // The fUpdate flag is one-shot: it is cleared here as soon as it is reported,
    // so only the first call after the flag was raised returns true.
164 
165  if (fUpdate) {
166  fUpdate = false;
167  return true;
168  }
169  return false;
170 }
171 
172 
173 
int Int_t
#define SafeDelete(p)
#define e(i)
bool Bool_t
double Double_t
Option_t Option_t option
char * Form(const char *fmt,...)
Double_t GetStatusUpdateInterval() const
virtual void postAnalysis()
const KVString & GetUserClassOptions() const
const Char_t * GetUserClass()
virtual void postInitRun()
virtual void postInitAnalysis()
TObject * GetInstanceOfUserClass(const KVString &alternative_base_class="")
const KVDataSet * GetDataSet() const
void cd() const
Definition: KVDataSet.cpp:745
virtual void InitAnalysis()
virtual void EndRun()
virtual Bool_t Analysis()
Bool_t IsOptGiven(const Char_t *option)
TString GetOpt(const Char_t *option) const
virtual void EndAnalysis()
virtual void ParseOptions()
void SetEvent(KVEvent *e)
virtual void InitRun()
static KVMultiDetArray * MakeMultiDetector(const Char_t *dataset_name, Int_t run=-1, TString classname="KVMultiDetArray")
Online analysis of reconstructed data.
Bool_t CheckStatusUpdateInterval(Int_t) const
void preAnalysis() override
Set minimum (trigger) multiplicity for array.
Manages user analysis of reconstructed experimental data.
KVReconEventSelector * fSelector
the data analysis class
Base class for user analysis of reconstructed data.
KVReconstructedEvent * GetEvent() const
Event containing KVReconstructedNucleus nuclei reconstructed from hits in detectors.
Allow to send/receive ROOT/KV objects between ZeroMQ sockets.
Definition: KVZMQMessage.h:46
TObject * GetObject()
Definition: KVZMQMessage.h:82
void Set()
UInt_t Convert(Bool_t toGMT=kFALSE) const
virtual const char * GetName() const
virtual void Warning(const char *method, const char *msgfmt,...) const
virtual Bool_t InheritsFrom(const char *classname) const
virtual void Error(const char *method, const char *msgfmt,...) const
virtual void SetOption(const char *option)
const char * Data() const
ClassImp(TPyArg)