KaliVeda
Toolkit for HIC analysis
KVProtobufDataReader.cpp
1 //Created by KVClassFactory on Mon Jan 14 12:10:22 2019
2 //Author: John Frankland,,,
3 
4 #include "KVProtobufDataReader.h"
5 
6 #include <google/protobuf/io/coded_stream.h>
7 
9 
10 #define KV_PROTOBUF_MSG_SIZE 4
11 
12 
25 
27 {
28  // Prepare to read next event in buffer.
29  // - if a new buffer is needed, we read one from the file
30  // - at current position in buffer we read the size of the next protobuf message
31  // - we check if the entire message is contained within the buffer
32  // if not, the part of the message already read is moved to the start of the buffer
33  // then the rest of the buffer is filled from the file
34  // - if this method returns true, then fEvOffset and fEvSize are correctly positioned
35  // in order for the message to be read (handled by daughter class with specific
36  // protobuf message format)
37  // - if this method returns false, then either a major problem occurred when reading
38  // the file, or we have reached the end and there are no more events/messages to read
39 
40  if (fReachedEndOfFile) {
41  // derived classes may override read_buffer() so that it opens a new file
42  if (!read_buffer()) return false;
43  }
44 
45  // do we have enough bytes left (4) in buffer to read size of next event?
46  if (get_remaining_readable_buffer() < KV_PROTOBUF_MSG_SIZE) fNeedToReadBuffer = true;
47 
49  if (!read_buffer()) return false;
50 
51  // get size of next event in buffer
52  fEvSize = 0;
53  {
54  // Read the message size (4-bytes/32-bit integer)
55  google::protobuf::io::CodedInputStream codedIStream((const uint8_t*)(fBuffer + fEvOffset), KV_PROTOBUF_MSG_SIZE);
56  codedIStream.ReadLittleEndian32(&fEvSize);
57  if (fEvSize == 0) {
58  Error("GetNextEvent", "read zero size event");
59  return false;
60  }
61  }
62  //Info("GetNextEvent", "event size=%d bytes", fEvSize);
63 
64  fEvOffset += KV_PROTOBUF_MSG_SIZE;
65  // do we have enough bytes left to read the event?
67  fNeedToReadBuffer = true;
68  if (!read_buffer()) return false;
69  }
70 
71  if (!parse_event_from_message()) {
72  // problem parsing event - try to read next event
73  fEvOffset += fEvSize;
74  return GetNextEvent();
75  }
76 
77  fEvOffset += fEvSize;
78 
79  return true;
80 }
81 
82 
83 
86 
88 {
89  // read a buffer from the file
90 
91  if (fReachedEndOfFile) {
92  // last read reached end of file
93  return false;
94  }
95  ptrdiff_t fFillOffset = 0;
96  // if fEvOffset>0 we need to copy the remaining bytes of the last buffer to the
97  // beginning of the buffer and adjust fFillOffset accordingly
99  //Info("read_buffer", "Copying last %d bytes of old buffer to beginning of new buffer", get_remaining_readable_buffer());
101  fFillOffset = get_remaining_readable_buffer();
102  }
103  Int_t bytes_to_read = fBufSize - fFillOffset;
104  if (bytes_to_read > fFileSize) bytes_to_read = fFileSize;
105  //Info("read_buffer", "fFillOffset:%ld bytes to read:%d bytes", fFillOffset, bytes_to_read);
106  Long64_t old_bytes = fFile->GetBytesRead();
107  fFile->ReadBuffer((char*)(fBuffer + fFillOffset), bytes_to_read);
108  Long64_t bytes_read = fFile->GetBytesRead() - old_bytes;
109  //Info("read_buffer", "Read %d bytes from file", (Int_t)bytes_read);
110  fFileSize -= bytes_read;
111  if (bytes_read == 0 || fFileSize == 0 || bytes_read < (Int_t)(fBufSize - fFillOffset)) fReachedEndOfFile = true;
112  fNeedToReadBuffer = false;
113  fEvOffset = 0;
114  return (bytes_read > 0);
115 }
116 
117 
118 
120 
122 {
123  TString fp(filepath);
124  fp.Append("?filetype=raw");
125 #if ROOT_VERSION_CODE >= ROOT_VERSION(6,0,0)
126  fFile.reset(TFile::Open(fp));
127 #else
128  SafeDelete(fFile);
129  fFile = TFile::Open(fp);
130 #endif
131  fFileSize = fFile->GetSize();
132  Info("open_file", "%s : size of file = %lld bytes", filepath, fFileSize);
133  fEvOffset = 0;
134  fNeedToReadBuffer = true;
135  fReachedEndOfFile = false;
136 }
137 
138 
139 
145 
147  : KVRawDataReader(),
148  fBufSize(bufSiz), fBuffer(new char[bufSiz]), fFile(nullptr), fEvSize(0), fEvOffset(0), fNeedToReadBuffer(true),
149  fReachedEndOfFile(false)
150 {
151  // Open Google protobuf file for reading. Filepath URL will be passed to TFile::Open
152  // therefore can use same plugins eg. "root://" etc.
153  // Default buffer size: 16MB
154  // Note: buffer size given as Int_t, as this is argument type required by TFile::ReadBuffer
155 
156  open_file(filepath);
157 }
158 
159 
160 
163 
165  : KVRawDataReader(),
166  fBufSize(bufSiz), fBuffer(new char[bufSiz]), fFile(nullptr), fFileSize(0), fEvSize(0), fEvOffset(0), fNeedToReadBuffer(true),
167  fReachedEndOfFile(false)
168 {
169  // Create file reader of given buffer size, do not open any files yet
170 }
171 
172 
173 
176 
178 {
179  // Destructor
180  delete [] fBuffer;
181 #if ROOT_VERSION_CODE < ROOT_VERSION(6,0,0)
182  SafeDelete(fFile);
183 #endif
184 }
185 
186 
187 
189 
191 {
192  return 0;
193 }
194 
195 
196 
int Int_t
std::string fBuffer
#define SafeDelete(p)
bool Bool_t
char Char_t
Read Google Protobuf DAQ files.
void open_file(const Char_t *filepath)
UInt_t fEvSize
size of next event in buffer
virtual bool read_buffer()
read a buffer from the file
UInt_t get_remaining_readable_buffer() const
Int_t fBufSize
buffer size
bool fReachedEndOfFile
true when we have read all bytes from file
KVProtobufDataReader(Int_t bufSiz=16 *1024 *1024)
Create file reader of given buffer size, do not open any files yet.
Int_t GetRunNumberReadFromFile() const
Long64_t fFileSize
size of file in bytes
bool fNeedToReadBuffer
true when the buffer is empty/incomplete
char * fBuffer
current buffer
ptrdiff_t fEvOffset
next position to read in buffer
virtual ~KVProtobufDataReader()
Destructor.
std::unique_ptr< TFile > fFile
TFile plugin handle.
virtual bool parse_event_from_message()=0
Abstract base class for reading raw (DAQ) data.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
virtual void Error(const char *method, const char *msgfmt,...) const
virtual void Info(const char *method, const char *msgfmt,...) const
TString & Append(char c, Ssiz_t rep=1)
long long Long64_t
ClassImp(TPyArg)