FairRoot/PandaRoot
Public Member Functions | Protected Member Functions | Protected Attributes | List of all members
PndMQMergerTest Class Reference

#include <PndMQMergerTest.h>

Inheritance diagram for PndMQMergerTest:
PndMQMerger

Public Member Functions

 PndMQMergerTest ()
 
template<class Archive >
void serialize (Archive &ar, const unsigned int version)
 

Protected Member Functions

virtual void ProcessData (std::map< std::string, BurstData > &dataToProcess)
 
virtual void Run ()
 

Protected Attributes

BurstData * fOutputData
 
BurstData fInputData
 
std::map< int, std::map< std::string, BurstData > > fInputMap
 
bool fHasBoostSerialization
 
std::vector< bool > fRunningStatus
 

Detailed Description

PndMQMergerTest.h

Since
2012-12-06
Author
D. Klein, A. Rybalchenko

Definition at line 27 of file PndMQMergerTest.h.

Constructor & Destructor Documentation

PndMQMergerTest::PndMQMergerTest ( )

Definition at line 26 of file PndMQMergerTest.cxx.

26  : PndMQMerger()
27 {
28 }

Member Function Documentation

void PndMQMergerTest::ProcessData ( std::map< std::string, BurstData > &  dataToProcess)
protected virtual

Implements PndMQMerger.

Definition at line 31 of file PndMQMergerTest.cxx.

32 {
33  for (auto itr : dataToProcess){
34  LOG(INFO) << "Data in Burst: " << itr.first;
35 // for (auto eventItr : itr.second.fData)
36 // for (auto dataItr : eventItr)
37 // LOG(INFO) << dataItr->GetTimeStamp();
38  }
39  LOG(INFO) << "Finished!";
40 }
void PndMQMerger::Run ( )
protected virtual inherited

Definition at line 45 of file PndMQMerger.cxx.

References BurstHeader::fBranchName, BurstHeader::fBurstID, BurstData::fHeader, PndMQMerger::fInputData, PndMQMerger::fInputMap, and PndMQMerger::ProcessData().

46 {
47  int direction = 0;
48  int numInputs = fChannels.at("data-in").size();
49 // fRunningStatus.resize(numInputs);
50 
51  LOG(INFO) << "Number of Input Channels: " << numInputs;
52 
53 // boost::this_thread::sleep(boost::posix_time::milliseconds(10000));
54 
55  std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels, {"data-in"}));
56 
57  int nMessages;
58  while (CheckCurrentState(RUNNING))
59  {
60  poller->Poll(-1);
61  for (int channelNr = 0; channelNr < numInputs; channelNr++){
62  LOG(INFO) << "---- Reading channel " << channelNr << " ----";
63  if (poller->CheckInput("data-in", channelNr)){
64  std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
65 
66  nMessages++;
67 
68  LOG(INFO) << "Get Data for Channel: " << channelNr;
69 
70  if (Receive(msg, "data-in", channelNr) >= 0){
71  LOG(INFO) << "---Data Received--- " << msg->GetSize();
72 
73  std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
74 // LOG(INFO) << msgStr;
75  std::istringstream ibuffer(msgStr);
76 
77  try {
78  boost::archive::binary_iarchive InputArchive(ibuffer);
79  InputArchive >> fInputData;
80  }
81  catch (boost::archive::archive_exception& e)
82  {
83  LOG(ERROR) << e.what();
84 // continue;
85  }
86  LOG(INFO) << "Data: " << fInputData.fHeader.fBurstID << " " << fInputData.fHeader.fBranchName;
87 // LOG(INFO) << "DataSize: " << fInputData.fData.size() << " " << fInputData.fData[0].size();
88 // for (auto gapItr : fInputData.fData){
89 // for (auto dataItr : gapItr)
90 // LOG(INFO) << "TimeStamp: " << dataItr->GetTimeStamp();
91 // }
92 
94  LOG(INFO) << "InputMap for BurstID: " << fInputData.fHeader.fBurstID << " " << fInputMap[fInputData.fHeader.fBurstID].size();
95 
96  if (fInputMap[fInputData.fHeader.fBurstID].size() == numInputs){ // all input channels have delivered data for this burstID
97  //std::map<std::string, BurstData> dataToProcess = fInputMap[fInputData.fHeader.fBurstID];
98  // fInputMap.erase(fInputMap.find(fInputData.fHeader.fBurstID));
100 
101  // if (fOutputData != 0){
102  // std::ostringstream obuffer;
103  // boost::archive::binary_oarchive OutputArchive(obuffer);
104  // OutputArchive << *fOutputData;
105  // int outputSize = obuffer.str().length();
106  // unique_ptr<FairMQMessage> msgOut(NewMessage(const_cast<char*>(obuffer.str().c_str()), outputSize, CustomClean, fOutputData));
107  // Send(msgOut, "data-out");
108  // }
109  }
110  }
111  }
112  }
113  }
114 }
BurstData fInputData
Definition: PndMQMerger.h:43
std::map< int, std::map< std::string, BurstData > > fInputMap
Definition: PndMQMerger.h:44
virtual void ProcessData(std::map< std::string, BurstData > &dataToProcess)=0
template<class Archive >
void PndMQMerger::serialize ( Archive &  ar,
const unsigned int  version 
)
inline inherited

Definition at line 34 of file PndMQMerger.h.

References PndMQMerger::fInputData, and PndMQMerger::fOutputData.

35  {
36  ar& fInputData;
37  ar& fOutputData;
38  }
BurstData * fOutputData
Definition: PndMQMerger.h:42
BurstData fInputData
Definition: PndMQMerger.h:43

Member Data Documentation

bool PndMQMerger::fHasBoostSerialization
protected inherited

Definition at line 50 of file PndMQMerger.h.

Referenced by PndMQMerger::PndMQMerger().

BurstData PndMQMerger::fInputData
protected inherited

Definition at line 43 of file PndMQMerger.h.

Referenced by PndMQMerger::Run(), and PndMQMerger::serialize().

std::map<int, std::map<std::string, BurstData> > PndMQMerger::fInputMap
protected inherited

Definition at line 44 of file PndMQMerger.h.

Referenced by PndMQMerger::Run().

BurstData* PndMQMerger::fOutputData
protected inherited

Definition at line 42 of file PndMQMerger.h.

Referenced by PndMQMerger::serialize().

std::vector<bool> PndMQMerger::fRunningStatus
protected inherited

Definition at line 53 of file PndMQMerger.h.


The documentation for this class was generated from the following files: