FairRoot/PandaRoot
PndMQMerger.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence version 3 (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #include <boost/thread.hpp>
16 #include <boost/bind.hpp>
17 #include <boost/archive/binary_oarchive.hpp>
18 
19 #include "baseMQtools.h"
20 
21 #include "FairMQLogger.h"
22 #include "PndMQMerger.h"
23 #include "FairMQPoller.h"
24 
25 using namespace std;
26 
27 PndMQMerger::PndMQMerger() : fHasBoostSerialization(false), fOutputData(0)
28 {
29  using namespace baseMQ::tools::resolve;
30  // coverity[pointless_expression]: suppress coverity warnings on apparent if(const).
31  if (has_BoostSerialization<FairTimeStamp*, void(boost::archive::binary_iarchive&, const unsigned int)>::value == 1)
33 }
34 
36 {
37 }
38 
39 void CustomClean(void* data, void *hint)
40 {
41  LOG(INFO) << "FREEMESSAGE called for data: " << static_cast<BurstData*>(hint)->fHeader.fBranchName;
42  delete static_cast<BurstData*>(hint);
43 }
44 
46 {
47  int direction = 0;
48  int numInputs = fChannels.at("data-in").size();
49 // fRunningStatus.resize(numInputs);
50 
51  LOG(INFO) << "Number of Input Channels: " << numInputs;
52 
53 // boost::this_thread::sleep(boost::posix_time::milliseconds(10000));
54 
55  std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels, {"data-in"}));
56 
57  int nMessages;
58  while (CheckCurrentState(RUNNING))
59  {
60  poller->Poll(-1);
61  for (int channelNr = 0; channelNr < numInputs; channelNr++){
62  LOG(INFO) << "---- Reading channel " << channelNr << " ----";
63  if (poller->CheckInput("data-in", channelNr)){
64  std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
65 
66  nMessages++;
67 
68  LOG(INFO) << "Get Data for Channel: " << channelNr;
69 
70  if (Receive(msg, "data-in", channelNr) >= 0){
71  LOG(INFO) << "---Data Received--- " << msg->GetSize();
72 
73  std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
74 // LOG(INFO) << msgStr;
75  std::istringstream ibuffer(msgStr);
76 
77  try {
78  boost::archive::binary_iarchive InputArchive(ibuffer);
79  InputArchive >> fInputData;
80  }
81  catch (boost::archive::archive_exception& e)
82  {
83  LOG(ERROR) << e.what();
84 // continue;
85  }
86  LOG(INFO) << "Data: " << fInputData.fHeader.fBurstID << " " << fInputData.fHeader.fBranchName;
87 // LOG(INFO) << "DataSize: " << fInputData.fData.size() << " " << fInputData.fData[0].size();
88 // for (auto gapItr : fInputData.fData){
89 // for (auto dataItr : gapItr)
90 // LOG(INFO) << "TimeStamp: " << dataItr->GetTimeStamp();
91 // }
92 
94  LOG(INFO) << "InputMap for BurstID: " << fInputData.fHeader.fBurstID << " " << fInputMap[fInputData.fHeader.fBurstID].size();
95 
96  if (fInputMap[fInputData.fHeader.fBurstID].size() == numInputs){ // all input channels have delivered data for this burstID
97  //std::map<std::string, BurstData> dataToProcess = fInputMap[fInputData.fHeader.fBurstID];
98  // fInputMap.erase(fInputMap.find(fInputData.fHeader.fBurstID));
100 
101  // if (fOutputData != 0){
102  // std::ostringstream obuffer;
103  // boost::archive::binary_oarchive OutputArchive(obuffer);
104  // OutputArchive << *fOutputData;
105  // int outputSize = obuffer.str().length();
106  // unique_ptr<FairMQMessage> msgOut(NewMessage(const_cast<char*>(obuffer.str().c_str()), outputSize, CustomClean, fOutputData));
107  // Send(msgOut, "data-out");
108  // }
109  }
110  }
111  }
112  }
113  }
114 }
BurstData fInputData
Definition: PndMQMerger.h:43
virtual ~PndMQMerger()
Definition: PndMQMerger.cxx:35
bool fHasBoostSerialization
Definition: PndMQMerger.h:50
std::map< int, std::map< std::string, BurstData > > fInputMap
Definition: PndMQMerger.h:44
virtual void ProcessData(std::map< std::string, BurstData > &dataToProcess)=0
virtual void Run()
Definition: PndMQMerger.cxx:45
void CustomClean(void *data, void *hint)
Definition: PndMQMerger.cxx:39