FairRoot/PandaRoot
PndMQSorterDistributor.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence version 3 (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #include <boost/thread.hpp>
16 #include <boost/bind.hpp>
17 #include <boost/archive/binary_oarchive.hpp>
18 
19 #include "baseMQtools.h"
20 
21 #include "FairMQLogger.h"
22 #include "PndMQSorterDistributor.h"
23 #include "PndMQStatus.h"
24 
25 using namespace std;
26 
27 PndMQSorterDistributor::PndMQSorterDistributor() : fThreshold(1E9), fOffset(1E6), fHasBoostSerialization(false)
28 {
29  using namespace baseMQ::tools::resolve;
30  // coverity[pointless_expression]: suppress coverity warnings on apparant if(const).
31  if (has_BoostSerialization<PndSdsDigiTopix4, void(boost::archive::binary_iarchive&, const unsigned int)>::value == 1)
33 }
34 
36 {
37 }
38 
40 {
41  int direction = 0;
42  int numOutputs = fChannels.at("data-out").size();
43 
44  // store the channel references to avoid traversing the map on every loop iteration
45  const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);
46  FairMQChannel* dataOutChannels[fChannels.at("data-out").size()];
47  LOG(INFO) << "Number of Output Channels: " << numOutputs;
48  for (int i = 0; i < numOutputs; ++i)
49  {
50  dataOutChannels[i] = &(fChannels.at("data-out").at(i));
51  }
52 
53  double currentThreshold = fThreshold;
54  double currentOffset = currentThreshold + fOffset;
55 
56  while (CheckCurrentState(RUNNING))
57  {
58  std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
59  std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
60 
61  if (dataInChannel.Receive(header) > 0)
62  {
63  int status = *(static_cast<int*>(header->GetData()));
64 
65  if (status != PndMQStatus::RUNNING){
66  LOG(INFO) << "WrongStatus: " << status;
67  }
68 
69  if (dataInChannel.ExpectsAnotherPart())
70  {
71  if (dataInChannel.Receive(msg) > 0)
72  {
73  std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
74  std::istringstream ibuffer(msgStr);
75  boost::archive::binary_iarchive InputArchive(ibuffer);
76  try {
77  InputArchive >> fTopixData;
78  }
79  catch (boost::archive::archive_exception& e)
80  {
81  LOG(ERROR) << e.what();
82  }
83 
84  // LOG(INFO) << "TopixData: " << fTopixData.size();
85 
86  bool switchChannels = false;
87  for (auto itr : fTopixData){
88  if (itr.GetTimeStamp() < currentThreshold)
89  fCurrentOutput.push_back(itr);
90  else {
91  fNextOutput.push_back(itr);
92  if(itr.GetTimeStamp() > currentOffset){
93  // LOG(INFO) << "Switch Channels: " << itr.GetTimeStamp() << " > " << currentOffset;
94  switchChannels = true;
95  }
96  }
97  }
98  fTopixData.clear();
99  if (switchChannels == true){
100  fCurrentOutput.push_back(PndSdsDigiTopix4()); //empty data to signal switch of channels
101  }
102 
103  if (fCurrentOutput.size() > 0){
104 
105  std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(sizeof(int)));
106  int flag = PndMQStatus::RUNNING;
107  memcpy(headerCopy->GetData(), &flag, sizeof(int));
108  dataOutChannels[direction]->SendPart(headerCopy);
109 
110  std::ostringstream obuffer;
111  boost::archive::binary_oarchive OutputArchive(obuffer);
112  //fPndSdsDigiTopix4Vector = frames.front();
113  OutputArchive << fCurrentOutput;
114  int outputSize = obuffer.str().length();
115  unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
116  memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
117  //unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(const_cast<char*>(obuffer.str().c_str()), outputSize, CustomCleanup, &obuffer));
118  dataOutChannels[direction]->Send(msg2);
119  // LOG(INFO) << "CurrentOutput send to " << direction << " size: " << fCurrentOutput.size();
120  // for (auto itr : fCurrentOutput){
121  // LOG(INFO) << itr.GetTimeStamp();
122  // }
123  fCurrentOutput.clear();
124  }
125  if (fNextOutput.size() > 0){
126 
127  int nextOutput = direction + 1;
128  if (nextOutput >= numOutputs)
129  nextOutput = 0;
130 
131  std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(sizeof(int)));
132  int flag = PndMQStatus::RUNNING;
133  memcpy(headerCopy->GetData(), &flag, sizeof(int));
134  dataOutChannels[nextOutput]->SendPart(headerCopy);
135 
136  std::ostringstream obuffer;
137  boost::archive::binary_oarchive OutputArchive(obuffer);
138  //fPndSdsDigiTopix4Vector = frames.front();
139  OutputArchive << fNextOutput;
140  int outputSize = obuffer.str().length();
141  unique_ptr<FairMQMessage> msg3(fTransportFactory->CreateMessage(outputSize));
142  memcpy(msg3->GetData(), obuffer.str().c_str(), outputSize);
143  //unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(const_cast<char*>(obuffer.str().c_str()), outputSize, CustomCleanup, &obuffer));
144  dataOutChannels[nextOutput]->Send(msg3);
145  // LOG(INFO) << "NextOutput send to " << nextOutput << " size " << fNextOutput.size();
146  fNextOutput.clear();
147  }
148 
149  if (switchChannels == true){
150  // LOG(INFO) << "Switch channels old threshold " << currentThreshold << " old offset "<< currentOffset;
151  direction++;
152  if (direction >= numOutputs)
153  {
154  direction = 0;
155  }
156  currentThreshold += fThreshold;
157  currentOffset += fThreshold;
158  switchChannels = false;
159  }
160  }
161  }
162  if (status == PndMQStatus::STOP){
163  LOG(INFO) << "STOP-Status Received: " << status;
164  for (int i = 0; i < numOutputs; ++i)
165  {
166  std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage());
167  headerCopy->Copy(header);
168  dataOutChannels[i]->Send(headerCopy);
169  }
170  }
171  }
172  }
173 }
std::vector< PndSdsDigiTopix4 > fCurrentOutput
Int_t i
Definition: run_full.C:25
Data class to store the digi output of a pixel module.
std::vector< PndSdsDigiTopix4 > fTopixData
std::vector< PndSdsDigiTopix4 > fNextOutput
int status[10]
Definition: f_Init.h:28