FairRoot/PandaRoot
Public Member Functions | Protected Member Functions | Protected Attributes | Friends | List of all members
PndMQSorterMerger Class Reference

#include <PndMQSorterMerger.h>

Inheritance diagram for PndMQSorterMerger:

Public Member Functions

 PndMQSorterMerger ()
 
virtual ~PndMQSorterMerger ()
 
template<class Archive >
void serialize (Archive &ar, const unsigned int version)
 

Protected Member Functions

virtual void Run ()
 

Protected Attributes

std::vector< std::vector
< PndSdsDigiTopix4 > > 
fData
 
std::vector< PndSdsDigiTopix4fOutputData
 
std::vector< PndSdsDigiTopix4fInputData
 
bool fHasBoostSerialization
 
std::vector< PndSdsDigiTopix4fTopixData
 
std::vector< PndSdsDigiTopix4fCurrentOutput
 
std::vector< PndSdsDigiTopix4fNextOutput
 
std::vector< bool > fRunningStatus
 

Friends

class boost::serialization::access
 

Detailed Description

PndMQSorterMerger.h

Since
2012-12-06
Author
D. Klein, A. Rybalchenko

Definition at line 27 of file PndMQSorterMerger.h.

Constructor & Destructor Documentation

PndMQSorterMerger::PndMQSorterMerger ( )

Definition at line 27 of file PndMQSorterMerger.cxx.

References fHasBoostSerialization.

27  : fHasBoostSerialization(false)
28 {
29  using namespace baseMQ::tools::resolve;
30  // coverity[pointless_expression]: suppress coverity warnings on apparant if(const).
31  if (has_BoostSerialization<PndSdsDigiTopix4, void(boost::archive::binary_iarchive&, const unsigned int)>::value == 1)
33 }
Data class to store the digi output of a pixel module.
PndMQSorterMerger::~PndMQSorterMerger ( )
virtual

Definition at line 35 of file PndMQSorterMerger.cxx.

36 {
37 }

Member Function Documentation

void PndMQSorterMerger::Run ( )
protectedvirtual

Definition at line 39 of file PndMQSorterMerger.cxx.

References fData, fInputData, fOutputData, fRunningStatus, i, PndMQStatus::RUNNING, status, PndMQStatus::STOP, and PndMQStatus::UNDEFINED.

40 {
41  int direction = 0;
42  int numInputs = fChannels.at("data-in").size();
43  fRunningStatus.resize(numInputs);
44 
45  // store the channel references to avoid traversing the map on every loop iteration
46  const FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
47  FairMQChannel* dataInChannels[fChannels.at("data-in").size()];
48  LOG(INFO) << "Number of Input Channels: " << numInputs;
49  for (int i = 0; i < numInputs; ++i)
50  {
51  dataInChannels[i] = &(fChannels.at("data-in").at(i));
52  fRunningStatus[i] = true;
53  }
54  fData.resize(numInputs);
55  int activeChannel = 0;
56  bool switchChannel = false;
57  bool channelSwitched = false;
58 
59  double oldTS = -1;
60 
61  int nMessages;
62  while (CheckCurrentState(RUNNING))
63  {
64  for (int channelNr = 0; channelNr < numInputs; channelNr++){
65  // LOG(INFO) << "---- Reading channel " << channelNr << " ----";
66  std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
67  std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
68 
69  nMessages++;
70 
71  if (fRunningStatus[channelNr] == true && dataInChannels[channelNr]->Receive(header) > 0){
72 
73  int status = *(static_cast<int*>(header->GetData()));
74 
75  if (status == PndMQStatus::STOP){
76  fRunningStatus[channelNr] = false;
77  LOG(INFO) << "STOP-Signal received for channel " << channelNr;
78  }
79 
80  if (dataInChannels[channelNr]->ExpectsAnotherPart())
81  {
82  if (dataInChannels[channelNr]->Receive(msg))
83  {
84  std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
85  std::istringstream ibuffer(msgStr);
86 
87  boost::archive::binary_iarchive InputArchive(ibuffer);
88 
89  try {
90  InputArchive >> fInputData;
91  }
92  catch (boost::archive::archive_exception& e)
93  {
94  LOG(ERROR) << e.what();
95  }
96 
97  fData[channelNr].insert(fData[channelNr].end(), fInputData.begin(), fInputData.end() );
98 
99  fInputData.clear();
100 
101  // LOG(INFO) << "fData size for channel " << channelNr << " is " << fData[channelNr].size();
102  // for (auto data : fData[channelNr])
103  // LOG(INFO) << data.GetTimeStamp();
104  if (activeChannel == channelNr){
105 // LOG(INFO) << "--- Writing channel " << activeChannel << " ---";
106  if (fData[channelNr].size() > 0){
107  for (std::vector<PndSdsDigiTopix4>::iterator data = fData[channelNr].begin(); data != fData[channelNr].end(); data++){
108  if (data->GetTimeStamp() < 0){
109  fOutputData.insert(fOutputData.end(), fData[channelNr].begin(), data);
110  switchChannel = true;
111  channelSwitched = true;
112  fData[channelNr].erase(fData[channelNr].begin(), ++data);
113  // LOG(INFO) << "Negative TS in " << channelNr << " new Data size " << fData[channelNr].size();
114  break;
115  }
116  }
117  if (switchChannel == false){
118  fOutputData = fData[channelNr];
119  fData[channelNr].clear();
120  }
121 
122  int flag = PndMQStatus::RUNNING;
123 
124  unique_ptr<FairMQMessage> headerOut(fTransportFactory->CreateMessage(sizeof(int)));
125  memcpy(headerOut->GetData(), &flag, sizeof(int));
126  dataOutChannel.SendPart(headerOut);
127 
128  std::ostringstream obuffer;
129  boost::archive::binary_oarchive OutputArchive(obuffer);
130  OutputArchive << fOutputData;
131  int outputSize = obuffer.str().length();
132  unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
133  memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
134  dataOutChannel.Send(msg2);
135 // if (nMessages % 1000 == 0){
136 // LOG(INFO) << "fOutputData.size: " << fOutputData.size() << " " << TString::Format("%12.0f", fOutputData.front().GetTimeStamp()).Data();
137 // }
138  for (auto info : fOutputData){
139 // LOG(INFO) << TString::Format("%12.0f", info.GetTimeStamp()).Data();
140  if (info.GetTimeStamp() > 0 && oldTS > info.GetTimeStamp()){
141  LOG(INFO) << "++++ SortingError ++++ " << oldTS << " > " << info.GetTimeStamp();
142  }
143  oldTS = info.GetTimeStamp();
144  }
145  fOutputData.clear();
146  if (switchChannel == true){
147  activeChannel++;
148  if (activeChannel >= numInputs)
149  {
150  activeChannel = 0;
151  }
152  // LOG(INFO) << "Switch active channel to " << activeChannel;
153  switchChannel = false;
154  }
155  }
156  }
157  }
158  }
159  bool allStop = true;
160  for (auto state : fRunningStatus){
161  if (state == true)
162  allStop = false;
163  }
164 
165  if (allStop == true){
166  LOG(INFO) << "STOP-Signal received for all. Emptying buffers.";
167  for (int i = 0; i < numInputs; i++){
168  int flag = PndMQStatus::UNDEFINED;
169  if (i != numInputs - 1){
170  flag = PndMQStatus::RUNNING;
171  } else {
172  flag = PndMQStatus::STOP;
173  }
174  unique_ptr<FairMQMessage> headerOut(fTransportFactory->CreateMessage(sizeof(int)));
175  memcpy(headerOut->GetData(), &flag, sizeof(int));
176  dataOutChannel.SendPart(headerOut);
177 
178  fOutputData = fData[activeChannel];
179  fData[channelNr].clear();
180 
181  std::ostringstream obuffer;
182  boost::archive::binary_oarchive OutputArchive(obuffer);
183  OutputArchive << fOutputData;
184  int outputSize = obuffer.str().length();
185  unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
186  memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
187  dataOutChannel.Send(msg2);
188 // LOG(INFO) << "fOutputData.size: " << fOutputData.size();
189  for (auto info : fOutputData){
190 // LOG(INFO) << info.GetTimeStamp();
191  if (info.GetTimeStamp() > 0 && oldTS > info.GetTimeStamp()){
192  LOG(INFO) << "++++ SortingError ++++ " << oldTS << " > " << info.GetTimeStamp();
193  }
194  oldTS = info.GetTimeStamp();
195  }
196  fOutputData.clear();
197 
198  activeChannel++;
199  if (activeChannel >= numInputs)
200  {
201  activeChannel = 0;
202  }
203  }
204  }
205  }
206  }
207  }
208 }
std::vector< PndSdsDigiTopix4 > fInputData
Int_t i
Definition: run_full.C:25
std::vector< PndSdsDigiTopix4 > fOutputData
std::vector< std::vector< PndSdsDigiTopix4 > > fData
std::vector< bool > fRunningStatus
int status[10]
Definition: f_Init.h:28
template<class Archive >
void PndMQSorterMerger::serialize ( Archive &  ar,
const unsigned int  version 
)
inline

Definition at line 34 of file PndMQSorterMerger.h.

References fInputData, and fOutputData.

35  {
36  ar& fInputData;
37  ar& fOutputData;
38  }
std::vector< PndSdsDigiTopix4 > fInputData
std::vector< PndSdsDigiTopix4 > fOutputData

Friends And Related Function Documentation

friend class boost::serialization::access
friend

Definition at line 47 of file PndMQSorterMerger.h.

Member Data Documentation

std::vector<PndSdsDigiTopix4> PndMQSorterMerger::fCurrentOutput
protected

Definition at line 51 of file PndMQSorterMerger.h.

std::vector<std::vector<PndSdsDigiTopix4> > PndMQSorterMerger::fData
protected

Definition at line 42 of file PndMQSorterMerger.h.

Referenced by Run().

bool PndMQSorterMerger::fHasBoostSerialization
protected

Definition at line 48 of file PndMQSorterMerger.h.

Referenced by PndMQSorterMerger().

std::vector<PndSdsDigiTopix4> PndMQSorterMerger::fInputData
protected

Definition at line 44 of file PndMQSorterMerger.h.

Referenced by Run(), and serialize().

std::vector<PndSdsDigiTopix4> PndMQSorterMerger::fNextOutput
protected

Definition at line 52 of file PndMQSorterMerger.h.

std::vector<PndSdsDigiTopix4> PndMQSorterMerger::fOutputData
protected

Definition at line 43 of file PndMQSorterMerger.h.

Referenced by Run(), and serialize().

std::vector<bool> PndMQSorterMerger::fRunningStatus
protected

Definition at line 53 of file PndMQSorterMerger.h.

Referenced by Run().

std::vector<PndSdsDigiTopix4> PndMQSorterMerger::fTopixData
protected

Definition at line 50 of file PndMQSorterMerger.h.


The documentation for this class was generated from the following files: