// NOTE(review): this is a fragment of a longer function; braces, the try
// statement, and the declarations of `flag` and `eventCounter` are not
// visible in this view, so the comments below describe only what the
// visible statements do.

// Number of sub-channels configured under the "data-in" channel name.
57 int numInputs = fChannels.at(
"data-in").size();
// First sub-channel of "data-out" and of "status-out"; .at() throws if the
// name or index 0 is missing.
60 const FairMQChannel& dataOutChannel = fChannels.at(
"data-out").at(0);
61 const FairMQChannel& statusChannel = fChannels.at(
"status-out").at(0);
// Array of pointers, one slot per "data-in" sub-channel.
// NOTE(review): the bound is a runtime value; a runtime-sized C array is a
// compiler extension in C++ — a std::vector would be portable.
62 FairMQChannel* dataInChannels[fChannels.at(
"data-in").size()];
63 LOG(INFO) <<
"Number of Input Channels: " << numInputs;
// Store the address of every "data-in" sub-channel for indexed access below.
65 for (
int i = 0;
i < numInputs; ++
i)
67 dataInChannels[
i] = &(fChannels.at(
"data-in").at(
i));
// One fill-level counter per input channel, all starting at 0.
76 std::vector<int> fillLevel(numInputs,0);
// Starts true and is set to false after the STOP header has been sent on
// data-out (see the end of this fragment).
79 bool stopMessageOnce =
true;
// Main loop: runs for as long as CheckCurrentState(RUNNING) holds.
81 while (CheckCurrentState(RUNNING))
// Visit each input channel in turn.
84 for (
int channelNr = 0; channelNr < numInputs; channelNr++){
// Only read from a channel whose fill level is 0 and whose running flag is
// still true.
86 if (fillLevel[channelNr] == 0 &&
fRunningStatus[channelNr] ==
true){
// Fresh messages for the header part and for the payload part.
88 std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
89 std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
// Continue only if Receive reported a positive value for the header.
91 if (dataInChannels[channelNr]->Receive(header) > 0)
// Read the header buffer as a single int.
// NOTE(review): no visible check that the header holds at least sizeof(int)
// bytes — confirm against the sender.
93 int status = *(
static_cast<int*
>(header->GetData()));
// The condition guarding this log line is not visible in this view.
98 LOG(INFO) <<
"STOP-Status received for channel: " << channelNr;
// A payload is read only when the channel reports a further part.
100 if (dataInChannels[channelNr]->ExpectsAnotherPart())
103 if (dataInChannels[channelNr]->Receive(msg) > 0){
// Copy the payload bytes into a string and wrap it in a stream that feeds a
// Boost binary input archive; the statement that extracts from the archive
// is not visible here.
104 std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
105 std::istringstream ibuffer(msgStr);
107 boost::archive::binary_iarchive InputArchive(ibuffer);
// Boost archive exceptions are caught and their message is logged.
112 catch (boost::archive::archive_exception& e)
114 LOG(ERROR) << e.what();
// Fill level is 0 and the channel's running flag is false.
123 if (fillLevel[channelNr] == 0 &&
fRunningStatus[channelNr] ==
false)
// NOTE(review): "GlobarRunningStatus" in the log text looks like a typo for
// "GlobalRunningStatus"; the string is runtime output and is left unchanged.
126 LOG(INFO) <<
"GlobarRunningStatus set to false for channel " << channelNr;
// Progress log. The modulo test sees eventCounter before the increment,
// while the value printed below is the already-incremented one.
133 if (eventCounter++ % 100 == 0){
// Prints the number of events, the hit count of the first event, and the
// time stamp and sensor ID of that event's first hit.
// NOTE(review): front() is called on fEventData and on its first element
// with no visible emptiness check — confirm both are non-empty here.
134 LOG(INFO) << eventCounter <<
" nEvents: " <<
fEventData.size() <<
" hits in Event " <<
fEventData.front().size()
135 <<
" timeStamp: " << TString::Format(
"%12.0f",
fEventData.front().front().GetTimeStamp()).Data()
136 <<
" sensorID " <<
fEventData.front().front().GetSensorID();
138 LOG(INFO) <<
"ChannelsInEvent: ";
// Status output: an int-sized header carrying `flag` goes out with SendPart,
// followed by the serialized payload with Send.
142 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(
sizeof(
int)));
144 memcpy(headerCopy->GetData(), &flag,
sizeof(int));
145 statusChannel.SendPart(headerCopy);
// Binary output archive writing into obuffer; the statement that serializes
// into it is not visible in this view.
147 std::ostringstream obuffer;
148 boost::archive::binary_oarchive OutputArchive(obuffer);
// NOTE(review): obuffer.str() returns a copy and is called twice (length,
// then bytes); one local std::string would avoid the second copy.
151 int outputSize = obuffer.str().length();
152 unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(outputSize));
153 memcpy(msg->GetData(), obuffer.str().c_str(), outputSize);
155 statusChannel.Send(msg);
// Data output: same header-then-payload sequence, sent on dataOutChannel.
159 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(
sizeof(
int)));
161 memcpy(headerCopy->GetData(), &flag,
sizeof(int));
162 dataOutChannel.SendPart(headerCopy);
164 std::ostringstream obuffer;
165 boost::archive::binary_oarchive OutputArchive(obuffer);
168 int outputSize = obuffer.str().length();
169 unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
170 memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
172 dataOutChannel.Send(msg2);
// STOP handling: a header carrying `flag` is sent on data-out with Send
// (no payload part follows here), then stopMessageOnce is cleared.
193 LOG(INFO) <<
"STOP-Signal received for one input";
194 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(
sizeof(
int)));
196 memcpy(headerCopy->GetData(), &flag,
sizeof(int));
197 dataOutChannel.Send(headerCopy);
198 stopMessageOnce =
false;
// Member signatures only: no bodies or terminators are visible in this view,
// and the owning class(es) are not shown.
void AddData(vector< vector< vector< PndSdsHit > > > &data)
std::vector< std::vector< PndSdsHit > > fHitData
PndMQHitsEventBuilder * fBuilder
std::vector< int > fSensorsInEvent
std::vector< int > GetInputDataLevel()
std::vector< bool > fRunningStatus
std::vector< std::vector< PndSdsHit > > fEventData
std::vector< std::vector< std::vector< PndSdsHit > > > fDataFromChannels
vector< vector< PndSdsHit > > GetEvents()
bool fGlobalRunningStatus
vector< int > GetSensorsInEvent()