15 #include <boost/thread.hpp>
16 #include <boost/bind.hpp>
17 #include <boost/archive/binary_oarchive.hpp>
19 #include "baseMQtools.h"
21 #include "FairMQLogger.h"
23 #include "FairMQPoller.h"
29 using namespace baseMQ::tools::resolve;
// Tests the `value` member of has_BoostSerialization instantiated for
// FairTimeStamp* and the function signature
// void(boost::archive::binary_iarchive&, const unsigned int).
// NOTE(review): the statement guarded by this condition is not visible in this
// listing; presumably it records the result in fHasBoostSerialization — confirm.
31 if (has_BoostSerialization<FairTimeStamp*,
void(boost::archive::binary_iarchive&,
const unsigned int)>::value == 1)
// Logs the branch name stored in the header of the BurstData that `hint`
// points to, then destroys that BurstData.
// NOTE(review): presumably the body of CustomClean(void *data, void *hint); the
// signature line is not visible in this listing — confirm.
// NOTE(review): no null/type check on `hint` is visible before it is cast and
// dereferenced — verify that callers always pass a heap-allocated BurstData.
41 LOG(INFO) <<
"FREEMESSAGE called for data: " <<
static_cast<BurstData*
>(hint)->fHeader.fBranchName;
// The log statement above reads through the same pointer, so it has to stay
// ahead of this delete.
42 delete static_cast<BurstData*>(hint);
// Receive loop over the "data-in" sub-channels.
// NOTE(review): the enclosing function header, several braces, the `try` line and
// the statements that read from the archive are not visible in this listing (the
// embedded listing numbers jump); the comments below cover only what is shown.

// Number of sub-channels stored under the "data-in" key of fChannels.
// The size() result is stored in an int.
48 int numInputs = fChannels.at(
"data-in").size();
51 LOG(INFO) <<
"Number of Input Channels: " << numInputs;
// Poller created by the transport factory for the "data-in" channels only;
// held in a unique_ptr, so it is destroyed when this scope ends.
55 std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels, {
"data-in"}));
// Loop for as long as CheckCurrentState(RUNNING) holds.
// NOTE(review): listing lines 59-60 are not visible; if the poller needs an
// explicit Poll() call before CheckInput(), it would have to be there — confirm.
58 while (CheckCurrentState(RUNNING))
// Visit every "data-in" sub-channel by index.
61 for (
int channelNr = 0; channelNr < numInputs; channelNr++){
// Emitted for every sub-channel on every pass, before checking whether input
// is pending.
62 LOG(INFO) <<
"---- Reading channel " << channelNr <<
" ----";
// Only attempt a receive when the poller reports input on this sub-channel.
63 if (poller->CheckInput(
"data-in", channelNr)){
// Fresh message object from the transport factory to receive into.
64 std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
68 LOG(INFO) <<
"Get Data for Channel: " << channelNr;
// A non-negative return value from Receive is treated as success.
70 if (Receive(msg,
"data-in", channelNr) >= 0){
71 LOG(INFO) <<
"---Data Received--- " << msg->GetSize();
// Copy exactly GetSize() payload bytes into a std::string (pointer + length
// constructor, so embedded zero bytes are kept).
73 std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
// Expose the copied bytes as an input stream for the archive.
75 std::istringstream ibuffer(msgStr);
// Binary input archive reading from that stream.
78 boost::archive::binary_iarchive InputArchive(ibuffer);
// Boost archive errors are caught here and reported through the logger.
// NOTE(review): whether anything follows the log line inside this handler is not
// visible in this listing.
81 catch (boost::archive::archive_exception& e)
83 LOG(ERROR) << e.what();
// NOTE(review): the four lines below carry no terminating ';' and no listing
// number; they look like declaration summaries rather than statements from the
// function bodies above — confirm against the class header.

// Boolean flag; presumably holds the outcome of the has_BoostSerialization
// check — confirm.
bool fHasBoostSerialization
// Two-level map: int key -> (std::string key -> BurstData). What the keys
// represent is not visible here.
std::map< int, std::map< std::string, BurstData > > fInputMap
// Pure virtual (=0): derived classes must provide the implementation. The map is
// passed by non-const reference.
virtual void ProcessData(std::map< std::string, BurstData > &dataToProcess)=0
// Takes two untyped pointers, `data` and `hint`.
void CustomClean(void *data, void *hint)