FairRoot/PandaRoot
PndMQHitEventDevice.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence version 3 (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #include "PndMQHitEventDevice.h"
16 
17 #include <boost/thread.hpp>
18 #include <boost/bind.hpp>
19 #include <boost/archive/binary_oarchive.hpp>
20 #include <boost/archive/binary_iarchive.hpp>
21 
22 
23 #include "baseMQtools.h"
24 
25 #include "FairMQLogger.h"
26 #include "mrfdata_8b.h"
27 #include "PndSdsDigiTopix4.h"
28 #include "PndMQStatus.h"
29 
30 
31 using namespace std;
32 
33 PndMQHitEventDevice::PndMQHitEventDevice() : fHasBoostSerialization(false), fGlobalRunningStatus(true), fBuilder(0)
34 {
35  using namespace baseMQ::tools::resolve;
36  bool checkOutputClass = false;
37 
38  if (is_same<boost::archive::binary_oarchive, boost::archive::binary_oarchive>::value)
39  {
40  if (has_BoostSerialization<PndSdsDigiTopix4, void(boost::archive::binary_oarchive&, const unsigned int)>::value == 1)
41  {
42  checkOutputClass = true;
44  }
45  }
46  LOG(INFO) << "HasBoostSerialization: " << fHasBoostSerialization;
47 }
48 
50 {
51  delete(fBuilder);
52 }
53 
54 
56 {
57  int numInputs = fChannels.at("data-in").size();
58 
59  // store the channel references to avoid traversing the map on every loop iteration
60  const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
61  const FairMQChannel& statusChannel = fChannels.at("status-out").at(0);
62  FairMQChannel* dataInChannels[fChannels.at("data-in").size()];
63  LOG(INFO) << "Number of Input Channels: " << numInputs;
64  fBuilder = new PndMQHitsEventBuilder(numInputs);
65  for (int i = 0; i < numInputs; ++i)
66  {
67  dataInChannels[i] = &(fChannels.at("data-in").at(i));
68  }
69  fDataFromChannels.resize(numInputs);
70  fRunningStatus.resize(numInputs);
71  for(int channel = 0; channel < fRunningStatus.size(); channel++)
72  fRunningStatus[channel] = true;
73 
74 // boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
75 
76  std::vector<int> fillLevel(numInputs,0);
77 
78  int eventCounter = 0;
79  bool stopMessageOnce = true;
80 
81  while (CheckCurrentState(RUNNING))
82  {
83  if ( fGlobalRunningStatus == true){
84  for (int channelNr = 0; channelNr < numInputs; channelNr++){
85  // LOG(INFO) << "---- Reading channel " << channelNr << " ----";
86  if (fillLevel[channelNr] == 0 && fRunningStatus[channelNr] == true){
87 
88  std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
89  std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
90 
91  if (dataInChannels[channelNr]->Receive(header) > 0)
92  {
93  int status = *(static_cast<int*>(header->GetData()));
94  if (status == PndMQStatus::RUNNING)
95  fRunningStatus[channelNr] = true;
96  else if (status == PndMQStatus::STOP){
97  fRunningStatus[channelNr] = false;
98  LOG(INFO) << "STOP-Status received for channel: " << channelNr;
99  }
100  if (dataInChannels[channelNr]->ExpectsAnotherPart())
101  {
102 
103  if (dataInChannels[channelNr]->Receive(msg) > 0){
104  std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
105  std::istringstream ibuffer(msgStr);
106 
107  boost::archive::binary_iarchive InputArchive(ibuffer);
108 
109  try {
110  InputArchive >> fHitData;
111  }
112  catch (boost::archive::archive_exception& e)
113  {
114  LOG(ERROR) << e.what();
115  }
116  //fDataFromChannels[channelNr].push_back(fHitData);
117  fDataFromChannels[channelNr].insert(fDataFromChannels[channelNr].end(), fHitData.begin(), fHitData.end());
118  //LOG(INFO) << "Data in channel " << fDataFromChannels[channelNr].size();
119  fHitData.clear();
120  }
121  }
122  }
123  if (fillLevel[channelNr] == 0 && fRunningStatus[channelNr] == false)
124  {
125  fGlobalRunningStatus = false;
126  LOG(INFO) << "GlobarRunningStatus set to false for channel " << channelNr;
127  }
128  }
129  }
132 
133  if (eventCounter++ % 100 == 0){
134  LOG(INFO) << eventCounter << " nEvents: " << fEventData.size() << " hits in Event " << fEventData.front().size()
135  << " timeStamp: " << TString::Format("%12.0f",fEventData.front().front().GetTimeStamp()).Data()
136  << " sensorID " << fEventData.front().front().GetSensorID();
138  LOG(INFO) << "ChannelsInEvent: ";
139  for (auto data : fSensorsInEvent)
140  LOG(INFO) << data;
141 
142  std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(sizeof(int)));
143  int flag = PndMQStatus::RUNNING;
144  memcpy(headerCopy->GetData(), &flag, sizeof(int));
145  statusChannel.SendPart(headerCopy);
146 
147  std::ostringstream obuffer;
148  boost::archive::binary_oarchive OutputArchive(obuffer);
149  //fPndSdsDigiTopix4Vector = frames.front();
150  OutputArchive << fSensorsInEvent;
151  int outputSize = obuffer.str().length();
152  unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(outputSize));
153  memcpy(msg->GetData(), obuffer.str().c_str(), outputSize);
154  //unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(const_cast<char*>(obuffer.str().c_str()), outputSize, CustomCleanup, &obuffer));
155  statusChannel.Send(msg);
156 
157  }
158 
159  std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(sizeof(int)));
160  int flag = PndMQStatus::RUNNING;
161  memcpy(headerCopy->GetData(), &flag, sizeof(int));
162  dataOutChannel.SendPart(headerCopy);
163 
164  std::ostringstream obuffer;
165  boost::archive::binary_oarchive OutputArchive(obuffer);
166  //fPndSdsDigiTopix4Vector = frames.front();
167  OutputArchive << fEventData;
168  int outputSize = obuffer.str().length();
169  unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
170  memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
171  //unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(const_cast<char*>(obuffer.str().c_str()), outputSize, CustomCleanup, &obuffer));
172  dataOutChannel.Send(msg2);
173 
174  fillLevel = fBuilder->GetInputDataLevel();
175  // LOG(INFO) << "EventData.size() " << eventData.size();
176  // for (auto eventIter : eventData){
177  // LOG(INFO) << "Event";
178  // for (auto dataIter : eventIter){
179  // LOG(INFO) << dataIter.GetSensorID() << " " << dataIter.GetTimeStamp();
180  // }
181  // }
182 
183  for (int channelNr = 0; channelNr < fDataFromChannels.size(); channelNr++){
184  fDataFromChannels[channelNr].clear();
185  }
186  }
187  bool allStop = true;
188  for (auto state : fRunningStatus){
189  if (state == true)
190  allStop = false;
191  }
192  if (fGlobalRunningStatus == false && stopMessageOnce == true){
193  LOG(INFO) << "STOP-Signal received for one input";
194  std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(sizeof(int)));
195  int flag = PndMQStatus::STOP;
196  memcpy(headerCopy->GetData(), &flag, sizeof(int));
197  dataOutChannel.Send(headerCopy);
198  stopMessageOnce = false;
199  }
200  }
201 }
202 
203 void PndMQHitEventDevice::SetProperty(const int key, const string& value)
204 {
205  switch (key)
206  {
207  default:
208  FairMQDevice::SetProperty(key, value);
209  break;
210  }
211 }
212 
213 string PndMQHitEventDevice::GetProperty(const int key, const string& default_ /*= ""*/)
214 {
215  switch (key)
216  {
217  default:
218  return FairMQDevice::GetProperty(key, default_);
219  }
220 }
221 
222 void PndMQHitEventDevice::SetProperty(const int key, const int value)
223 {
224  switch (key)
225  {
226  default:
227  FairMQDevice::SetProperty(key, value);
228  break;
229  }
230 }
231 
232 int PndMQHitEventDevice::GetProperty(const int key, const int default_ /*= 0*/)
233 {
234  switch (key)
235  {
236  default:
237  return FairMQDevice::GetProperty(key, default_);
238  }
239 }
240 
241 
void AddData(vector< vector< vector< PndSdsHit > > > &data)
std::vector< std::vector< PndSdsHit > > fHitData
Int_t i
Definition: run_full.C:25
PndMQHitsEventBuilder * fBuilder
Data class to store the digi output of a pixel module.
std::vector< int > fSensorsInEvent
std::vector< int > GetInputDataLevel()
virtual std::string GetProperty(const int key, const std::string &default_="")
std::vector< bool > fRunningStatus
std::vector< std::vector< PndSdsHit > > fEventData
virtual void SetProperty(const int key, const std::string &value)
std::vector< std::vector< std::vector< PndSdsHit > > > fDataFromChannels
vector< vector< PndSdsHit > > GetEvents()
vector< int > GetSensorsInEvent()
int status[10]
Definition: f_Init.h:28