FairRoot/PandaRoot
Public Member Functions | Protected Member Functions | Protected Attributes | Friends | List of all members
PndMQMerger Class Referenceabstract

#include <PndMQMerger.h>

Inheritance diagram for PndMQMerger:
PndMQMergerTest

Public Member Functions

 PndMQMerger ()
 
virtual ~PndMQMerger ()
 
template<class Archive >
void serialize (Archive &ar, const unsigned int version)
 

Protected Member Functions

virtual void Run ()
 
virtual void ProcessData (std::map< std::string, BurstData > &dataToProcess)=0
 

Protected Attributes

BurstDatafOutputData
 
BurstData fInputData
 
std::map< int, std::map
< std::string, BurstData > > 
fInputMap
 
bool fHasBoostSerialization
 
std::vector< bool > fRunningStatus
 

Friends

class boost::serialization::access
 

Detailed Description

PndMQMerger.h

Since
2012-12-06
Author
D. Klein, A. Rybalchenko

Definition at line 27 of file PndMQMerger.h.

Constructor & Destructor Documentation

PndMQMerger::PndMQMerger ( )

Definition at line 27 of file PndMQMerger.cxx.

References fHasBoostSerialization.

28 {
29  using namespace baseMQ::tools::resolve;
30  // coverity[pointless_expression]: suppress coverity warnings on apparant if(const).
31  if (has_BoostSerialization<FairTimeStamp*, void(boost::archive::binary_iarchive&, const unsigned int)>::value == 1)
33 }
BurstData * fOutputData
Definition: PndMQMerger.h:42
bool fHasBoostSerialization
Definition: PndMQMerger.h:50
PndMQMerger::~PndMQMerger ( )
virtual

Definition at line 35 of file PndMQMerger.cxx.

36 {
37 }

Member Function Documentation

virtual void PndMQMerger::ProcessData ( std::map< std::string, BurstData > &  dataToProcess)
protectedpure virtual

Implemented in PndMQMergerTest.

void PndMQMerger::Run ( )
protectedvirtual

Definition at line 45 of file PndMQMerger.cxx.

46 {
47  int direction = 0;
48  int numInputs = fChannels.at("data-in").size();
49 // fRunningStatus.resize(numInputs);
50 
51  LOG(INFO) << "Number of Input Channels: " << numInputs;
52 
53 // boost::this_thread::sleep(boost::posix_time::milliseconds(10000));
54 
55  std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels, {"data-in"}));
56 
57  int nMessages;
58  while (CheckCurrentState(RUNNING))
59  {
60  poller->Poll(-1);
61  for (int channelNr = 0; channelNr < numInputs; channelNr++){
62  LOG(INFO) << "---- Reading channel " << channelNr << " ----";
63  if (poller->CheckInput("data-in", channelNr)){
64  std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
65 
66  nMessages++;
67 
68  LOG(INFO) << "Get Data for Channel: " << channelNr;
69 
70  if (Receive(msg, "data-in", channelNr) >= 0){
71  LOG(INFO) << "---Data Received--- " << msg->GetSize();
72 
73  std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
74 // LOG(INFO) << msgStr;
75  std::istringstream ibuffer(msgStr);
76 
77  try {
78  boost::archive::binary_iarchive InputArchive(ibuffer);
79  InputArchive >> fInputData;
80  }
81  catch (boost::archive::archive_exception& e)
82  {
83  LOG(ERROR) << e.what();
84 // continue;
85  }
86  LOG(INFO) << "Data: " << fInputData.fHeader.fBurstID << " " << fInputData.fHeader.fBranchName;
87 // LOG(INFO) << "DataSize: " << fInputData.fData.size() << " " << fInputData.fData[0].size();
88 // for (auto gapItr : fInputData.fData){
89 // for (auto dataItr : gapItr)
90 // LOG(INFO) << "TimeStamp: " << dataItr->GetTimeStamp();
91 // }
92 
94  LOG(INFO) << "InputMap for BurstID: " << fInputData.fHeader.fBurstID << " " << fInputMap[fInputData.fHeader.fBurstID].size();
95 
96  if (fInputMap[fInputData.fHeader.fBurstID].size() == numInputs){ // all input channels have delivered data for this burstID
97  //std::map<std::string, BurstData> dataToProcess = fInputMap[fInputData.fHeader.fBurstID];
98  // fInputMap.erase(fInputMap.find(fInputData.fHeader.fBurstID));
100 
101  // if (fOutputData != 0){
102  // std::ostringstream obuffer;
103  // boost::archive::binary_oarchive OutputArchive(obuffer);
104  // OutputArchive << *fOutputData;
105  // int outputSize = obuffer.str().length();
106  // unique_ptr<FairMQMessage> msgOut(NewMessage(const_cast<char*>(obuffer.str().c_str()), outputSize, CustomClean, fOutputData));
107  // Send(msgOut, "data-out");
108  // }
109  }
110  }
111  }
112  }
113  }
114 }
BurstData fInputData
Definition: PndMQMerger.h:43
std::map< int, std::map< std::string, BurstData > > fInputMap
Definition: PndMQMerger.h:44
virtual void ProcessData(std::map< std::string, BurstData > &dataToProcess)=0
template<class Archive >
void PndMQMerger::serialize ( Archive &  ar,
const unsigned int  version 
)
inline

Definition at line 34 of file PndMQMerger.h.

References fInputData, and fOutputData.

35  {
36  ar& fInputData;
37  ar& fOutputData;
38  }
BurstData * fOutputData
Definition: PndMQMerger.h:42
BurstData fInputData
Definition: PndMQMerger.h:43

Friends And Related Function Documentation

friend class boost::serialization::access
friend

Definition at line 49 of file PndMQMerger.h.

Member Data Documentation

bool PndMQMerger::fHasBoostSerialization
protected

Definition at line 50 of file PndMQMerger.h.

Referenced by PndMQMerger().

BurstData PndMQMerger::fInputData
protected

Definition at line 43 of file PndMQMerger.h.

Referenced by serialize().

std::map<int, std::map<std::string, BurstData> > PndMQMerger::fInputMap
protected

Definition at line 44 of file PndMQMerger.h.

BurstData* PndMQMerger::fOutputData
protected

Definition at line 42 of file PndMQMerger.h.

Referenced by serialize().

std::vector<bool> PndMQMerger::fRunningStatus
protected

Definition at line 53 of file PndMQMerger.h.


The documentation for this class was generated from the following files: