FairRoot/PandaRoot
Public Member Functions | Protected Member Functions | Protected Attributes | Friends | List of all members
PndMQSorterDistributor Class Reference

#include <PndMQSorterDistributor.h>

Inheritance diagram for PndMQSorterDistributor:

Public Member Functions

 PndMQSorterDistributor ()
 
virtual ~PndMQSorterDistributor ()
 
template<class Archive >
void serialize (Archive &ar, const unsigned int version)
 

Protected Member Functions

virtual void Run ()
 

Protected Attributes

double fThreshold
 
double fOffset
 
bool fHasBoostSerialization
 
std::vector< PndSdsDigiTopix4fTopixData
 
std::vector< PndSdsDigiTopix4fCurrentOutput
 
std::vector< PndSdsDigiTopix4fNextOutput
 

Friends

class boost::serialization::access
 

Detailed Description

PndMQSorterDistributor.h

Since
2012-12-06
Author
D. Klein, A. Rybalchenko

Definition at line 27 of file PndMQSorterDistributor.h.

Constructor & Destructor Documentation

PndMQSorterDistributor::PndMQSorterDistributor ( )

Definition at line 27 of file PndMQSorterDistributor.cxx.

References fHasBoostSerialization.

27  : fThreshold(1E9), fOffset(1E6), fHasBoostSerialization(false)
28 {
29  using namespace baseMQ::tools::resolve;
30  // coverity[pointless_expression]: suppress coverity warnings on apparant if(const).
31  if (has_BoostSerialization<PndSdsDigiTopix4, void(boost::archive::binary_iarchive&, const unsigned int)>::value == 1)
33 }
Data class to store the digi output of a pixel module.
PndMQSorterDistributor::~PndMQSorterDistributor ( )
virtual

Definition at line 35 of file PndMQSorterDistributor.cxx.

36 {
37 }

Member Function Documentation

void PndMQSorterDistributor::Run ( )
protectedvirtual

Definition at line 39 of file PndMQSorterDistributor.cxx.

References fCurrentOutput, fNextOutput, fOffset, fThreshold, fTopixData, i, PndMQStatus::RUNNING, status, and PndMQStatus::STOP.

40 {
41  int direction = 0;
42  int numOutputs = fChannels.at("data-out").size();
43 
44  // store the channel references to avoid traversing the map on every loop iteration
45  const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);
46  FairMQChannel* dataOutChannels[fChannels.at("data-out").size()];
47  LOG(INFO) << "Number of Output Channels: " << numOutputs;
48  for (int i = 0; i < numOutputs; ++i)
49  {
50  dataOutChannels[i] = &(fChannels.at("data-out").at(i));
51  }
52 
53  double currentThreshold = fThreshold;
54  double currentOffset = currentThreshold + fOffset;
55 
56  while (CheckCurrentState(RUNNING))
57  {
58  std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
59  std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
60 
61  if (dataInChannel.Receive(header) > 0)
62  {
63  int status = *(static_cast<int*>(header->GetData()));
64 
65  if (status != PndMQStatus::RUNNING){
66  LOG(INFO) << "WrongStatus: " << status;
67  }
68 
69  if (dataInChannel.ExpectsAnotherPart())
70  {
71  if (dataInChannel.Receive(msg) > 0)
72  {
73  std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
74  std::istringstream ibuffer(msgStr);
75  boost::archive::binary_iarchive InputArchive(ibuffer);
76  try {
77  InputArchive >> fTopixData;
78  }
79  catch (boost::archive::archive_exception& e)
80  {
81  LOG(ERROR) << e.what();
82  }
83 
84  // LOG(INFO) << "TopixData: " << fTopixData.size();
85 
86  bool switchChannels = false;
87  for (auto itr : fTopixData){
88  if (itr.GetTimeStamp() < currentThreshold)
89  fCurrentOutput.push_back(itr);
90  else {
91  fNextOutput.push_back(itr);
92  if(itr.GetTimeStamp() > currentOffset){
93  // LOG(INFO) << "Switch Channels: " << itr.GetTimeStamp() << " > " << currentOffset;
94  switchChannels = true;
95  }
96  }
97  }
98  fTopixData.clear();
99  if (switchChannels == true){
100  fCurrentOutput.push_back(PndSdsDigiTopix4()); //empty data to signal switch of channels
101  }
102 
103  if (fCurrentOutput.size() > 0){
104 
105  std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(sizeof(int)));
106  int flag = PndMQStatus::RUNNING;
107  memcpy(headerCopy->GetData(), &flag, sizeof(int));
108  dataOutChannels[direction]->SendPart(headerCopy);
109 
110  std::ostringstream obuffer;
111  boost::archive::binary_oarchive OutputArchive(obuffer);
112  //fPndSdsDigiTopix4Vector = frames.front();
113  OutputArchive << fCurrentOutput;
114  int outputSize = obuffer.str().length();
115  unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
116  memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
117  //unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(const_cast<char*>(obuffer.str().c_str()), outputSize, CustomCleanup, &obuffer));
118  dataOutChannels[direction]->Send(msg2);
119  // LOG(INFO) << "CurrentOutput send to " << direction << " size: " << fCurrentOutput.size();
120  // for (auto itr : fCurrentOutput){
121  // LOG(INFO) << itr.GetTimeStamp();
122  // }
123  fCurrentOutput.clear();
124  }
125  if (fNextOutput.size() > 0){
126 
127  int nextOutput = direction + 1;
128  if (nextOutput >= numOutputs)
129  nextOutput = 0;
130 
131  std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(sizeof(int)));
132  int flag = PndMQStatus::RUNNING;
133  memcpy(headerCopy->GetData(), &flag, sizeof(int));
134  dataOutChannels[nextOutput]->SendPart(headerCopy);
135 
136  std::ostringstream obuffer;
137  boost::archive::binary_oarchive OutputArchive(obuffer);
138  //fPndSdsDigiTopix4Vector = frames.front();
139  OutputArchive << fNextOutput;
140  int outputSize = obuffer.str().length();
141  unique_ptr<FairMQMessage> msg3(fTransportFactory->CreateMessage(outputSize));
142  memcpy(msg3->GetData(), obuffer.str().c_str(), outputSize);
143  //unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(const_cast<char*>(obuffer.str().c_str()), outputSize, CustomCleanup, &obuffer));
144  dataOutChannels[nextOutput]->Send(msg3);
145  // LOG(INFO) << "NextOutput send to " << nextOutput << " size " << fNextOutput.size();
146  fNextOutput.clear();
147  }
148 
149  if (switchChannels == true){
150  // LOG(INFO) << "Switch channels old threshold " << currentThreshold << " old offset "<< currentOffset;
151  direction++;
152  if (direction >= numOutputs)
153  {
154  direction = 0;
155  }
156  currentThreshold += fThreshold;
157  currentOffset += fThreshold;
158  switchChannels = false;
159  }
160  }
161  }
162  if (status == PndMQStatus::STOP){
163  LOG(INFO) << "STOP-Status Received: " << status;
164  for (int i = 0; i < numOutputs; ++i)
165  {
166  std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage());
167  headerCopy->Copy(header);
168  dataOutChannels[i]->Send(headerCopy);
169  }
170  }
171  }
172  }
173 }
std::vector< PndSdsDigiTopix4 > fCurrentOutput
Int_t i
Definition: run_full.C:25
Data class to store the digi output of a pixel module.
std::vector< PndSdsDigiTopix4 > fTopixData
std::vector< PndSdsDigiTopix4 > fNextOutput
int status[10]
Definition: f_Init.h:28
template<class Archive >
void PndMQSorterDistributor::serialize ( Archive &  ar,
const unsigned int  version 
)
inline

Definition at line 34 of file PndMQSorterDistributor.h.

References fCurrentOutput, fNextOutput, and fTopixData.

35  {
36  ar& fTopixData;
37  ar& fCurrentOutput;
38  ar& fNextOutput;
39  }
std::vector< PndSdsDigiTopix4 > fCurrentOutput
std::vector< PndSdsDigiTopix4 > fTopixData
std::vector< PndSdsDigiTopix4 > fNextOutput

Friends And Related Function Documentation

friend class boost::serialization::access
friend

Definition at line 47 of file PndMQSorterDistributor.h.

Member Data Documentation

std::vector<PndSdsDigiTopix4> PndMQSorterDistributor::fCurrentOutput
protected

Definition at line 51 of file PndMQSorterDistributor.h.

Referenced by Run(), and serialize().

bool PndMQSorterDistributor::fHasBoostSerialization
protected

Definition at line 48 of file PndMQSorterDistributor.h.

Referenced by PndMQSorterDistributor().

std::vector<PndSdsDigiTopix4> PndMQSorterDistributor::fNextOutput
protected

Definition at line 52 of file PndMQSorterDistributor.h.

Referenced by Run(), and serialize().

double PndMQSorterDistributor::fOffset
protected

Definition at line 44 of file PndMQSorterDistributor.h.

Referenced by Run().

double PndMQSorterDistributor::fThreshold
protected

Definition at line 43 of file PndMQSorterDistributor.h.

Referenced by Run().

std::vector<PndSdsDigiTopix4> PndMQSorterDistributor::fTopixData
protected

Definition at line 50 of file PndMQSorterDistributor.h.

Referenced by Run(), and serialize().


The documentation for this class was generated from the following files: