19 FairMQChannel& dataInChannel = fChannels.at(
"data-in").at(0);
21 while (CheckCurrentState(RUNNING))
23 std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
24 std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
25 if (dataInChannel.Receive(header) > 0)
27 int status = *(
static_cast<int*
>(header->GetData()));
29 if (dataInChannel.ExpectsAnotherPart())
32 if (dataInChannel.Receive(msg)) {
33 string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
34 istringstream ibuffer(msgStr);
35 boost::archive::binary_iarchive InputArchive(ibuffer);
41 catch (boost::archive::archive_exception& e)
43 LOG(ERROR) << e.what();
48 bool dataAboveTimeThreshold =
false;
49 double timeStampThreshold = 0;
52 int numData = eventIter.size();
53 for (Int_t
i = 0;
i < numData; ++
i)
55 if (eventIter[
i].GetTimeStamp() > timeStampThreshold){
57 dataAboveTimeThreshold =
true;
61 if (dataAboveTimeThreshold){
64 LOG(ERROR) <<
"PndMQFileSinkHits::Run(): No Output array!";
71 if (receivedMsgs % 1000 == 0 && myHit != 0){
72 LOG(INFO) << receivedMsgs <<
" : " << myHit->GetTimeStamp();
77 LOG(INFO) <<
"STOP-Signal Received!";
79 LOG(INFO) <<
"AutoSave called!";
92 LOG(INFO) <<
"I've received " << receivedMsgs <<
" messages!";
96 LOG(ERROR) <<
" Boost Serialization not ok";
vector< vector< PndSdsHit > > fHitVector
bool fHasBoostSerialization