17 #include <boost/thread.hpp>
18 #include <boost/bind.hpp>
19 #include <boost/archive/binary_oarchive.hpp>
20 #include <boost/archive/binary_iarchive.hpp>
23 #include "baseMQtools.h"
25 #include "FairMQLogger.h"
26 #include "mrfdata_8b.h"
35 using namespace baseMQ::tools::resolve;
36 bool checkOutputClass =
false;
38 if (is_same<boost::archive::binary_oarchive, boost::archive::binary_oarchive>::value)
40 if (has_BoostSerialization<
PndSdsDigiTopix4,
void(boost::archive::binary_oarchive&,
const unsigned int)>::value == 1)
42 checkOutputClass =
true;
57 int numInputs = fChannels.at(
"data-in").size();
60 const FairMQChannel& dataOutChannel = fChannels.at(
"data-out").at(0);
61 const FairMQChannel& statusChannel = fChannels.at(
"status-out").at(0);
62 FairMQChannel* dataInChannels[fChannels.at(
"data-in").size()];
63 LOG(INFO) <<
"Number of Input Channels: " << numInputs;
65 for (
int i = 0;
i < numInputs; ++
i)
67 dataInChannels[
i] = &(fChannels.at(
"data-in").at(
i));
76 std::vector<int> fillLevel(numInputs,0);
79 bool stopMessageOnce =
true;
81 while (CheckCurrentState(RUNNING))
84 for (
int channelNr = 0; channelNr < numInputs; channelNr++){
86 if (fillLevel[channelNr] == 0 &&
fRunningStatus[channelNr] ==
true){
88 std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
89 std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
91 if (dataInChannels[channelNr]->Receive(header) > 0)
93 int status = *(
static_cast<int*
>(header->GetData()));
98 LOG(INFO) <<
"STOP-Status received for channel: " << channelNr;
100 if (dataInChannels[channelNr]->ExpectsAnotherPart())
103 if (dataInChannels[channelNr]->Receive(msg) > 0){
104 std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
105 std::istringstream ibuffer(msgStr);
107 boost::archive::binary_iarchive InputArchive(ibuffer);
112 catch (boost::archive::archive_exception& e)
114 LOG(ERROR) << e.what();
123 if (fillLevel[channelNr] == 0 &&
fRunningStatus[channelNr] ==
false)
126 LOG(INFO) <<
"GlobarRunningStatus set to false for channel " << channelNr;
133 if (eventCounter++ % 100 == 0){
134 LOG(INFO) << eventCounter <<
" nEvents: " <<
fEventData.size() <<
" hits in Event " <<
fEventData.front().size()
135 <<
" timeStamp: " << TString::Format(
"%12.0f",
fEventData.front().front().GetTimeStamp()).Data()
136 <<
" sensorID " <<
fEventData.front().front().GetSensorID();
138 LOG(INFO) <<
"ChannelsInEvent: ";
142 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(
sizeof(
int)));
144 memcpy(headerCopy->GetData(), &flag,
sizeof(int));
145 statusChannel.SendPart(headerCopy);
147 std::ostringstream obuffer;
148 boost::archive::binary_oarchive OutputArchive(obuffer);
151 int outputSize = obuffer.str().length();
152 unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(outputSize));
153 memcpy(msg->GetData(), obuffer.str().c_str(), outputSize);
155 statusChannel.Send(msg);
159 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(
sizeof(
int)));
161 memcpy(headerCopy->GetData(), &flag,
sizeof(int));
162 dataOutChannel.SendPart(headerCopy);
164 std::ostringstream obuffer;
165 boost::archive::binary_oarchive OutputArchive(obuffer);
168 int outputSize = obuffer.str().length();
169 unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
170 memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
172 dataOutChannel.Send(msg2);
193 LOG(INFO) <<
"STOP-Signal received for one input";
194 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(
sizeof(
int)));
196 memcpy(headerCopy->GetData(), &flag,
sizeof(int));
197 dataOutChannel.Send(headerCopy);
198 stopMessageOnce =
false;
208 FairMQDevice::SetProperty(key, value);
218 return FairMQDevice::GetProperty(key, default_);
227 FairMQDevice::SetProperty(key, value);
237 return FairMQDevice::GetProperty(key, default_);
void AddData(vector< vector< vector< PndSdsHit > > > &data)
bool fHasBoostSerialization
std::vector< std::vector< PndSdsHit > > fHitData
PndMQHitsEventBuilder * fBuilder
Data class to store the digi output of a pixel module.
std::vector< int > fSensorsInEvent
virtual ~PndMQHitEventDevice()
std::vector< int > GetInputDataLevel()
virtual std::string GetProperty(const int key, const std::string &default_="")
std::vector< bool > fRunningStatus
std::vector< std::vector< PndSdsHit > > fEventData
virtual void SetProperty(const int key, const std::string &value)
std::vector< std::vector< std::vector< PndSdsHit > > > fDataFromChannels
vector< vector< PndSdsHit > > GetEvents()
bool fGlobalRunningStatus
vector< int > GetSensorsInEvent()