21 FairMQChannel& dataOutChannel = fChannels.at(
"data-out").at(0);
27 if (firstRun ==
true){
28 LOG(INFO) <<
"Number of events: " <<
nEvents;
29 for (
int eventNr = 0; eventNr <
nEvents; eventNr++){
30 std::vector<PndSdsHit> tempVector;
31 fTree->GetEntry(eventNr);
32 for (
int i = 0;
i <
fInput->GetEntriesFast();
i++){
36 tempVector.push_back(*hit);
41 if (eventNr % 1000 == 0){
43 unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage(
sizeof(
int)));
45 memcpy(header->GetData(), &
status,
sizeof(int));
46 dataOutChannel.SendPart(header);
48 std::ostringstream obuffer;
49 boost::archive::binary_oarchive OutputArchive(obuffer);
51 int outputSize = obuffer.str().length();
52 unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
53 memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
54 dataOutChannel.Send(msg2);
57 if (fHitVector.size() > 0)
60 if (!CheckCurrentState(RUNNING))
66 if (eventNr + 1 == nEvents){
67 unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage(
sizeof(
int)));
69 memcpy(header->GetData(), &
status,
sizeof(int));
70 dataOutChannel.SendPart(header);
72 std::ostringstream obuffer;
73 boost::archive::binary_oarchive OutputArchive(obuffer);
75 int outputSize = obuffer.str().length();
76 unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
77 memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
78 dataOutChannel.Send(msg2);
81 if (fHitVector.size() > 0)
84 if (!CheckCurrentState(RUNNING))
92 LOG(INFO) <<
"Finished reading data!";
94 unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage(
sizeof(
int)));
96 memcpy(header->GetData(), &
status,
sizeof(int));
97 dataOutChannel.Send(header);
99 }
while (CheckCurrentState(RUNNING));
101 LOG(INFO) <<
"I've send " << sendMsgs <<
" messages!";
105 LOG(ERROR) <<
" Boost Serialization not ok";
vector< vector< PndSdsHit > > fHitVector
virtual void InitInputFile()
bool fHasBoostSerialization