FairRoot/PandaRoot
PndMQFileSamplerHits.cxx
Go to the documentation of this file.
1 /*
2  * File: PndMQFileSamplerHits.tpl
3  * Author: winckler, A. Rybalchenko
4  *
5  * Created on March 11, 2014, 12:12 PM
6  */
7 
8 // Implementation of PndMQFileSamplerHits::Run() with Boost transport data format
9 #include "PndMQFileSamplerHits.h"
10 
11 #include "PndMQStatus.h"
12 
// Body of the routine that the header comment above names PndMQFileSamplerHits::Run().
// It walks every entry of fTree, copies the non-null PndSdsHit objects found in fInput
// into fHitVector (one inner vector per event), and sends the accumulated events on the
// "data-out" channel as two-part messages: an int-sized header filled from `status`,
// followed by fHitVector serialized with a Boost binary archive.
// NOTE(review): this listing skips its own lines 13, 15, 44, 68 and 95, so the function
// signature, the condition paired with the `else` at the bottom, and the declaration of
// `status` are not visible here -- confirm against the real source file.
14 {
16  {
17  InitInputFile();
    // NOTE(review): sendMsgs is never modified in the visible code; see the final log line.
18  int sendMsgs = 0;
19 
20  // store the channel references to avoid traversing the map on every loop iteration
21  FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
22 
23  int nEvents = fTree->GetEntries();
24  bool firstRun = true;
    // The tree is read and sent only on the first pass. On later passes the `if` body is
    // skipped, so the loop does nothing except re-evaluate CheckCurrentState(RUNNING).
25  do
26  {
27  if (firstRun == true){
28  LOG(INFO) << "Number of events: " << nEvents;
29  for (int eventNr = 0; eventNr < nEvents; eventNr++){
    // Collect this event's hits by value; null slots in fInput are skipped.
    // presumably GetEntry() refreshes fInput for the requested event -- TODO confirm
30  std::vector<PndSdsHit> tempVector;
31  fTree->GetEntry(eventNr);
32  for (int i = 0; i < fInput->GetEntriesFast(); i++){
33  PndSdsHit* hit = static_cast<PndSdsHit*>(fInput->At(i));
34  if (!hit)
35  continue;
36  tempVector.push_back(*hit);
37  }
38 
39  fHitVector.push_back(tempVector);
40 
    // Flush the accumulated events whenever eventNr is a multiple of 1000. The condition
    // also holds for eventNr == 0, so the first batch contains only the first event.
41  if (eventNr % 1000 == 0){
42 
    // Part 1: int-sized header filled from `status` (declaration not visible in this listing).
43  unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage(sizeof(int)));
45  memcpy(header->GetData(), &status, sizeof(int));
46  dataOutChannel.SendPart(header);
47 
    // Part 2: serialize fHitVector into an in-memory stream, then copy the bytes into a
    // message created with exactly that size.
48  std::ostringstream obuffer;
49  boost::archive::binary_oarchive OutputArchive(obuffer);
50  OutputArchive << fHitVector;
51  int outputSize = obuffer.str().length();
52  unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
53  memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
54  dataOutChannel.Send(msg2);
55 
56 
    // Start the next batch from an empty vector.
57  if (fHitVector.size() > 0)
58  fHitVector.clear();
59 
    // This `break` leaves only the enclosing `for` loop; execution continues with the
    // "Finished reading data!" section below.
60  if (!CheckCurrentState(RUNNING))
61  {
62  break;
63  }
64  }
65 
    // Final flush after the last event, using the same two-part layout as above.
    // NOTE(review): when the last eventNr is also a multiple of 1000 the block above has
    // already cleared fHitVector, so this sends a serialized empty vector.
66  if (eventNr + 1 == nEvents){
67  unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage(sizeof(int)));
69  memcpy(header->GetData(), &status, sizeof(int));
70  dataOutChannel.SendPart(header);
71 
72  std::ostringstream obuffer;
73  boost::archive::binary_oarchive OutputArchive(obuffer);
74  OutputArchive << fHitVector;
75  int outputSize = obuffer.str().length();
76  unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
77  memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
78  dataOutChannel.Send(msg2);
79 
80 
81  if (fHitVector.size() > 0)
82  fHitVector.clear();
83 
84  if (!CheckCurrentState(RUNNING))
85  {
86  break;
87  }
88  }
89 
90  }
91  firstRun = false;
92  LOG(INFO) << "Finished reading data!";
93 
    // Single-part message carrying only the `status` value, sent once after the event loop.
    // presumably this tells the receiver that no more data follows -- TODO confirm
94  unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage(sizeof(int)));
96  memcpy(header->GetData(), &status, sizeof(int));
97  dataOutChannel.Send(header);
98  }
99  } while (CheckCurrentState(RUNNING));
100 
    // NOTE(review): sendMsgs is initialised to 0 and never changed in the visible code,
    // so this line reports 0 regardless of how many messages were sent.
101  LOG(INFO) << "I've send " << sendMsgs << " messages!";
102  }
    // NOTE(review): the `if` matching this `else` is on a listing line that is missing here.
103  else
104  {
105  LOG(ERROR) << " Boost Serialization not ok";
106  }
107 }
Int_t i
Definition: run_full.C:25
vector< vector< PndSdsHit > > fHitVector
Int_t nEvents
Definition: hit_dirc.C:11
PndSdsMCPoint * hit
Definition: anasim.C:70
int status[10]
Definition: f_Init.h:28