15 #include <boost/thread.hpp> 
   16 #include <boost/bind.hpp> 
   17 #include <boost/archive/binary_oarchive.hpp> 
   19 #include "baseMQtools.h" 
   21 #include "FairMQLogger.h" 
   23 #include "FairMQPoller.h" 
   29         using namespace baseMQ::tools::resolve;
 
   31         if (has_BoostSerialization<FairTimeStamp*, 
void(boost::archive::binary_iarchive&, 
const unsigned int)>::value == 1)
 
   41         LOG(INFO) << 
"FREEMESSAGE called for data: " << 
static_cast<BurstData*
>(hint)->fHeader.fBranchName;
 
   42         delete static_cast<BurstData*>(hint);
 
   48     int numInputs = fChannels.at(
"data-in").size();
 
   51     LOG(INFO) << 
"Number of Input Channels: " << numInputs;
 
   55     std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels, {
"data-in"}));
 
   58     while (CheckCurrentState(RUNNING))
 
   61         for (
int channelNr = 0; channelNr < numInputs; channelNr++){
 
   62                 LOG(INFO) << 
"---- Reading channel " << channelNr << 
" ----";
 
   63                 if (poller->CheckInput(
"data-in", channelNr)){
 
   64                                 std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
 
   68                                 LOG(INFO) << 
"Get Data for Channel: " << channelNr;
 
   70                                 if (Receive(msg, 
"data-in", channelNr) >= 0){
 
   71                                         LOG(INFO) << 
"---Data Received--- " << msg->GetSize();
 
   73                                         std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
 
   75                                         std::istringstream ibuffer(msgStr);
 
   78                                                 boost::archive::binary_iarchive InputArchive(ibuffer);
 
   81                                         catch (boost::archive::archive_exception& e)
 
   83                                                 LOG(ERROR) << e.what();
 
bool fHasBoostSerialization
std::map< int, std::map< std::string, BurstData > > fInputMap
virtual void ProcessData(std::map< std::string, BurstData > &dataToProcess)=0
void CustomClean(void *data, void *hint)