FairRoot/PandaRoot
PndMvdMQFileSamplerBursts.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence version 3 (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
16 
17 #include <boost/thread.hpp>
18 #include <boost/bind.hpp>
19 #include <boost/archive/binary_oarchive.hpp>
20 #include <boost/archive/text_oarchive.hpp>
21 #include <boost/archive/binary_iarchive.hpp>
22 #include <boost/archive/text_iarchive.hpp>
23 #include <boost/serialization/export.hpp>
24 #include <boost/serialization/unique_ptr.hpp>
25 
26 #include "FairMQLogger.h"
27 
28 #include "FairMQMessage.h"
29 #include "TMessage.h"
30 #include "FairEventHeader.h"
31 
32 #include "PndSdsDigiPixel.h"
33 #include "PndSdsDigiStrip.h"
34 #include "PndSttHit.h"
35 #include "PndSdsHit.h"
36 
37 using namespace std;
38 
// Register the digi classes with Boost.Serialization under fixed string
// identifiers. The bursts serialised further down hold their elements as
// FairTimeStamp* pointers, so the archive presumably needs these registrations
// to identify the concrete class behind each pointer -- TODO confirm.
39 BOOST_CLASS_EXPORT_GUID(PndSdsDigiPixel, "PndSdsDigiPixel");
40 BOOST_CLASS_EXPORT_GUID(PndSdsDigiStrip, "PndSdsDigiStrip");
// NOTE(review): listing line 41 is not part of this extract.
42 //BOOST_CLASS_EXPORT_GUID(PndSdsHit, "PndSdsHit");
43 
// Member-initialiser list and empty body of a constructor. The header line
// (listing line 44) is not part of this extract -- presumably it is
// PndMvdMQFileSamplerBursts::PndMvdMQFileSamplerBursts(); TODO confirm.
// All pointer members start out null and the containers start out empty.
45  : FairMQDevice()
46  , fRunAna(NULL)
47  , fSource(NULL)
48  , fOutputData()
49  , fNObjects(0)
// A negative fMaxIndex means "not set": the initialisation code visible in this
// file then replaces it with fSource->CheckMaxEventNo().
50  , fMaxIndex(-1)
51  , fBranchNames()
52  , fFileNames()
53  , fEventHeader(0)
54 {
55 }
56 
58 {
59  for (auto itr : fBurstBuilder)
60  delete itr.second;
61  delete fSource;
62  delete fRunAna;
63 }
64 
66 {
67  fRunAna = new FairRunAna();
68  if ( fFileNames.size() > 0 ) {
69  fSource = new FairFileSource(fFileNames.at(0).c_str());
70  for ( unsigned int ifile = 1 ; ifile < fFileNames.size() ; ifile++ )
71  fSource->AddFile(fFileNames.at(ifile));
72  }
73  fSource->Init();
74  LOG(INFO) << "Going to request " << fBranchNames.size() << " branches:";
75 
76  fSource->ActivateObject((TObject**)&fEventHeader, "EventHeader.");
77 
78  for ( unsigned int ibrn = 0 ; ibrn < fBranchNames.size() ; ibrn++ ) {
79 
80  LOG(INFO) << " requesting branch \"" << fBranchNames[ibrn].second << "\"";
81  std::string branchName = fBranchNames[ibrn].second;
82  int branchStat = fSource->ActivateObject((TObject**)&fInputBranches[branchName], branchName.c_str()); // should check the status...
83  LOG(INFO) << "BranchStat: " << branchStat;
84  InitBurstBuilder(branchName);
85  LOG(INFO) << "Activated object \"" << fInputBranches[branchName] << "\" with name \"" << fBranchNames[ibrn].second << " for channel " << fBranchNames[ibrn].first <<"/ (" << branchStat << ")";
86  fNObjects++;
87  }
88 
89  if ( fMaxIndex < 0 )
90  fMaxIndex = fSource->CheckMaxEventNo();
91  LOG(INFO) << "Input source has " << fMaxIndex << " events.";
92 }
93 
// Creates the burst builder that matches the given input branch and stores it
// in fBurstBuilder under the branch name. A branch name that matches none of
// the comparisons below gets no builder and leaves fBurstBuilder untouched.
//
// branchName: name of the input branch the builder is meant for.
//
// NOTE(review): listing lines 98 and 100 -- the statements belonging to the
// "MVDPixelDigis" and "MVDStripDigis" comparisons -- are missing from this
// extract. Presumably they create PndBurstVectorBuilderT<PndSdsDigiPixel> and
// PndBurstVectorBuilderT<PndSdsDigiStrip>; confirm against the real file.
// NOTE(review): the assignment at the end overwrites an existing map entry
// without deleting the old builder, so a second call with the same name would
// leak the first one.
94 void PndMvdMQFileSamplerBursts::InitBurstBuilder(std::string branchName)
95 {
96  PndBurstVectorBuilderBase* tmpBuilder = 0;
97  if (branchName == "MVDPixelDigis")
99  else if (branchName == "MVDStripDigis")
101  else if (branchName == "STTHit")
102  tmpBuilder = new PndBurstVectorBuilderT<PndSttHit>;
103  else if (branchName == "MVDHitsStrip")
104  tmpBuilder = new PndBurstVectorBuilderT<PndSdsHit>;
105 
// Only register a builder when one of the known branch names matched.
106  if (tmpBuilder != 0)
107  fBurstBuilder[branchName] = tmpBuilder;
108 }
109 
110 class TMessage2 : public TMessage
111 {
112  public:
113  TMessage2(void* buf, Int_t len)
114  : TMessage(buf, len)
115  {
116  ResetBit(kIsOwner);
117  }
118 };
119 
// Clean-up callback for a transported message: destroys the heap-allocated
// std::string that holds the message payload.
//
// data: pointer to the payload bytes; not used, the owning object comes in
//       through hint.
// hint: the std::string that owns the payload; a null pointer is accepted
//       (deleting null is a no-op).
void free_string(void* data, void *hint)
{
    (void)data; // only the owner passed via hint is needed

    std::string* const payloadOwner = static_cast<std::string*>(hint);
    delete payloadOwner;
}
125 
// Event-loop body. The header line (listing line 126, which apparently also
// carries the opening brace) is not part of this extract -- presumably Run();
// TODO confirm.
//
// While the device is in the RUNNING state, each pass of the loop
//   1. reads event number eventCounter from fSource and lets every burst
//      builder turn its branch content into bursts (stored in fOutputData),
//      or -- once eventCounter has reached fMaxIndex -- fetches the builders'
//      remaining data via GetLastData();
//   2. serialises every non-empty burst with a Boost binary archive and sends
//      it on each port that fPortBranchNameMap associates with the branch;
//   3. increments eventCounter, or leaves the loop after the final flush.
127  int eventCounter = 0;
128 
129  // Check if we are still in the RUNNING state.
130 // boost::this_thread::sleep(boost::posix_time::milliseconds(100000));
131  while (CheckCurrentState(RUNNING)) {
 // Events left to read (with fMaxIndex == 0 the source is never accessed).
132  if (eventCounter != fMaxIndex){
133 
134  Int_t readEventReturn = fSource->ReadEvent(eventCounter);
135 
 // A non-zero return value ends the loop at once; neither the send step
 // below nor the final GetLastData() flush is executed in that case.
136  if (readEventReturn != 0)
137  break;
138 
139 // if (fEventHeader != 0)
140 // LOG(INFO) << "EventHeader: " << fEventHeader->GetRunId() << " " << fEventHeader->GetEventTime() << std::endl;
141 
 // NOTE(review): `auto branchItr` copies every map entry; a const
 // reference would avoid the per-event string copies.
142  for (auto branchItr : fInputBranches){
 // Branches whose name contains a '.' are skipped.
143  if (branchItr.first.find(".") == std::string::npos){
 // NOTE(review): `data` is never used.
144  std::vector<std::vector< FairTimeStamp* > > data;
145  TClonesArray* tmpArray = (TClonesArray*)branchItr.second;
146 
 // Only branches that got a burst builder produce output.
147  if (fBurstBuilder.count(branchItr.first) > 0){
148  fOutputData[branchItr.first] = fBurstBuilder[branchItr.first]->ProcessData(tmpArray);
149  // LOG(INFO) << branchItr.first << " has " << fOutputData[branchItr.first].size() << " bursts! ";
150  }
151  }
152  }
153  } else {
 // eventCounter has reached fMaxIndex: collect what the builders still
 // hold so that it is sent before the loop ends.
154  LOG(INFO) << "FinishRun";
155  for (auto branchItr : fInputBranches){
156  if (branchItr.first.find(".") == std::string::npos){
157  if (fBurstBuilder.count(branchItr.first) > 0){
158  fOutputData[branchItr.first] = fBurstBuilder[branchItr.first]->GetLastData();
159  // LOG(INFO) << branchItr.first << " has " << fOutputData[branchItr.first].size() << " last bursts! ";
160  // for (auto dataItr : fOutputData[branchItr.first]){
161  // LOG(INFO) << branchItr.first << " has data: " << dataItr.size();
162  // }
163  }
164  }
165  }
166  }
167 
 // Send step: for every port, walk the branch names mapped to that port
 // (all multimap entries with the port as key) and send each burst that
 // fOutputData holds for the branch as its own message.
168  for (auto portIt = fPorts.begin(); portIt != fPorts.end(); portIt++){
169  for (auto branchIt = fPortBranchNameMap.lower_bound(*portIt); branchIt != fPortBranchNameMap.upper_bound(*portIt); ++branchIt){
170 // LOG(INFO) << branchIt->second << " burstSize " << fOutputData[branchIt->second].size();
171  for (auto dataIt = fOutputData[branchIt->second].begin(); dataIt != fOutputData[branchIt->second].end(); ++dataIt){
172 // LOG(INFO) << branchIt->second << " dataSize " << dataIt->size();
 // Empty bursts are not sent.
173  if (dataIt->size() > 0){
174  BurstData bData;// = new BurstData;
 // The message carries exactly one burst, wrapped in an outer vector.
175  std::vector<std::vector<FairTimeStamp*> > dataVector;
176  dataVector.push_back(*dataIt);
177  bData.fData = dataVector;
178  bData.fHeader.fBranchName = branchIt->second;
 // NOTE(review): fEventHeader is dereferenced without a null check; it
 // starts as 0 and is only filled through ActivateObject.
179  bData.fHeader.fRunID = fEventHeader->GetRunId();
 // The burst id is derived from the first element of the burst; the
 // size check above guarantees that this element exists.
180  bData.fHeader.fBurstID = fBurstBuilder[branchIt->second]->GetBurstId(dataVector[0][0]);
181  std::ostringstream obuffer;
182  boost::archive::binary_oarchive OutputArchive(obuffer);
183  OutputArchive << bData;
 // The serialised bytes live in a heap-allocated string that is handed
 // to the message as hint; free_string deletes it.
184  std::string* strMsg = new std::string(obuffer.str());
185  unique_ptr<FairMQMessage> msg(NewMessage(const_cast<char*>(strMsg->c_str()), strMsg->length(), free_string, strMsg));
186  LOG(INFO) << "Send message: " << bData.fHeader.fBranchName << " " << bData.fHeader.fBurstID << " size: " << msg->GetSize();
187  //LOG(INFO) << obuffer.str();
188  Send(msg, *portIt);
189  }
190  }
191  }
192  }
 // Move on to the next event, or stop once the final flush has been sent.
193  if (eventCounter != fMaxIndex){
194  eventCounter++;
195  }
196  else
197  break;
198  }
199  LOG(INFO) << "Going out of RUNNING state.";
200 }
201 
202 
std::vector< std::pair< std::string, std::string > > fBranchNames
Class for digitised strip hits.
std::map< std::string, PndBurstVectorBuilderBase * > fBurstBuilder
TMessage2(void *buf, Int_t len)
BOOST_CLASS_EXPORT_GUID(PndSdsHit,"PndSdsHit")
virtual void InitBurstBuilder(std::string branchName)
void free_string(void *data, void *hint)
std::vector< std::string > fFileNames
std::vector< std::vector< FairTimeStamp * > > fData
std::multimap< std::string, std::string > fPortBranchNameMap
Data class to store the digi output of a pixel module.
std::map< std::string, std::vector< std::vector< FairTimeStamp * > > > fOutputData
if(fWindowIsBox)
std::map< std::string, TObject * > fInputBranches