FairRoot/PandaRoot
PndMQFileSinkHits.cxx
Go to the documentation of this file.
1 /*
2  * File: PndMQFileSinkHits.tpl
3  * Author: winckler, A. Rybalchenko
4  *
5  * Created on March 11, 2014, 12:12 PM
6  */
7 
8 // Implementation of PndMQFileSinkHits::Run() with Boost transport data format
9 #include "PndMQFileSinkHits.h"
10 #include "PndMQStatus.h"
11 
13 {
15  {
16  int receivedMsgs = 0;
17 
18  // store the channel references to avoid traversing the map on every loop iteration
19  FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);
20 
21  while (CheckCurrentState(RUNNING))
22  {
23  std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
24  std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
25  if (dataInChannel.Receive(header) > 0)
26  {
27  int status = *(static_cast<int*>(header->GetData()));
28 
29  if (dataInChannel.ExpectsAnotherPart())
30  {
31  receivedMsgs++;
32  if (dataInChannel.Receive(msg)) {
33  string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
34  istringstream ibuffer(msgStr);
35  boost::archive::binary_iarchive InputArchive(ibuffer);
36  //LOG(INFO) << "Received Message: " << receivedMsgs;
37  try
38  {
39  InputArchive >> fHitVector;
40  }
41  catch (boost::archive::archive_exception& e)
42  {
43  LOG(ERROR) << e.what();
44  }
45 
46 
47  PndSdsHit* myHit = 0;
48  bool dataAboveTimeThreshold = false;
49  double timeStampThreshold = 0;
50  for (auto& eventIter : fHitVector){
51  fOutput->Delete();
52  int numData = eventIter.size();
53  for (Int_t i = 0; i < numData; ++i)
54  {
55  if (eventIter[i].GetTimeStamp() > timeStampThreshold){
56  myHit = new ((*fOutput)[fOutput->GetEntriesFast()]) PndSdsHit(eventIter[i]);
57  dataAboveTimeThreshold = true;
58  }
59  // LOG(INFO) << "Data: " << i << " " << fHitVector.at(i).GetTimeStamp();
60  }
61  if (dataAboveTimeThreshold){
62  if (fOutput->IsEmpty())
63  {
64  LOG(ERROR) << "PndMQFileSinkHits::Run(): No Output array!";
65  } else {
66 
67  fTree->Fill();
68  }
69  }
70  }
71  if (receivedMsgs % 1000 == 0 && myHit != 0){
72  LOG(INFO) << receivedMsgs << " : " << myHit->GetTimeStamp();
73  }
74  }
75  }
76  if (status == PndMQStatus::STOP){
77  LOG(INFO) << "STOP-Signal Received!";
78  fTree->AutoSave();
79  LOG(INFO) << "AutoSave called!";
80  fTree->Write();
81  fOutFile->Close();
82  }
83 
84  }
85 
86  if (fHitVector.size() > 0)
87  {
88  fHitVector.clear();
89  }
90  }
91 
92  LOG(INFO) << "I've received " << receivedMsgs << " messages!";
93  }
94  else
95  {
96  LOG(ERROR) << " Boost Serialization not ok";
97  }
98 }
Int_t i
Definition: run_full.C:25
vector< vector< PndSdsHit > > fHitVector
TClonesArray * fOutput
int status[10]
Definition: f_Init.h:28