FairRoot/PandaRoot
MvdOfflineTBAnalysis_Topix4/MQ/PndMvdMQFileSampler.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence version 3 (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #include <PndMvdMQFileSampler.h>
16 
17 #include <boost/thread.hpp>
18 #include <boost/bind.hpp>
19 
20 #include "FairMQLogger.h"
21 
22 #include "FairMQMessage.h"
23 #include "TMessage.h"
24 
25 using namespace std;
26 
28  : FairMQDevice()
29  , fRunAna(NULL)
30  , fSource(NULL)
31  , fInputObjects()
32  , fNObjects(0)
33  , fMaxIndex(-1)
34  , fBranchNames()
35  , fFileNames()
36 {
37 }
38 
40 {
41  fRunAna = new FairRunAna();
42  if ( fFileNames.size() > 0 ) {
43  fSource = new FairFileSource(fFileNames.at(0).c_str());
44  for ( unsigned int ifile = 1 ; ifile < fFileNames.size() ; ifile++ )
45  fSource->AddFile(fFileNames.at(ifile));
46  }
47  fSource->Init();
48  LOG(INFO) << "Going to request " << fBranchNames.size() << " branches:";
49  for ( unsigned int ibrn = 0 ; ibrn < fBranchNames.size() ; ibrn++ ) {
50  LOG(INFO) << " requesting branch \"" << fBranchNames[ibrn] << "\"";
51  int branchStat = fSource->ActivateObject((TObject**)&fInputObjects[fNObjects],fBranchNames[ibrn].c_str()); // should check the status...
52  if ( fInputObjects[fNObjects] ) {
53  LOG(INFO) << "Activated object \"" << fInputObjects[fNObjects] << "\" with name \"" << fBranchNames[ibrn] << "\" (" << branchStat << ")";
54  fNObjects++;
55  }
56  }
57  if ( fMaxIndex < 0 )
58  fMaxIndex = fSource->CheckMaxEventNo();
59  LOG(INFO) << "Input source has " << fMaxIndex << " events.";
60 }
61 
62 // helper function to clean up the object holding the data after it is transported.
63 void free_tmessage2(void* /*data*/, void *hint)
64 {
65  delete (TMessage*)hint;
66 }
67 
69 {
70  int eventCounter = 0;
71 
72  // Check if we are still in the RUNNING state.
73  while (CheckCurrentState(RUNNING))
74  {
75  if ( eventCounter == fMaxIndex ) break;
76 
77  Int_t readEventReturn = fSource->ReadEvent(eventCounter);
78 
79  if ( readEventReturn != 0 ) break;
80 
81  TMessage* message[1000];
82  FairMQParts parts;
83 
84  for ( int iobj = 0 ; iobj < fNObjects ; iobj++ ) {
85  message[iobj] = new TMessage(kMESS_OBJECT);
86  message[iobj]->WriteObject(fInputObjects[iobj]);
87  parts.AddPart(NewMessage(message[iobj]->Buffer(), message[iobj]->BufferSize(), free_tmessage2, message[iobj]));
88  }
89 
90  Send(parts, "data-out");
91 
92  eventCounter++;
93  }
94 
95  LOG(INFO) << "Going out of RUNNING state.";
96 }
97 
99 {
100 }
void free_tmessage2(void *, void *hint)
std::vector< std::string > fFileNames
std::multimap< std::string, TObject * > fInputObjects
std::vector< std::pair< std::string, std::string > > fBranchNames