FairRoot/PandaRoot
MvdMQ/src/devices/PndMvdMQFileSampler.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence version 3 (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #include <PndMvdMQFileSampler.h>
16 
17 #include <boost/thread.hpp>
18 #include <boost/bind.hpp>
19 
20 #include "FairMQLogger.h"
21 
22 #include "FairMQMessage.h"
23 #include "TMessage.h"
24 #include "FairEventHeader.h"
25 
26 using namespace std;
27 
// Member-initializer list and empty body of a constructor. The signature line is not part of
// this listing (presumably PndMvdMQFileSampler::PndMvdMQFileSampler() — TODO confirm).
// All pointers start out as NULL and all containers empty.
29  : FairMQDevice()
30  , fRunAna(NULL)
31  , fSource(NULL) // stays NULL until a file source is created from fFileNames
32  , fInputObjects()
33  , fNObjects(0) // incremented once per successfully activated branch object
34  , fMaxIndex(-1) // negative: replaced later by fSource->CheckMaxEventNo()
35  , fBranchNames()
36  , fFileNames()
37 {
38 }
39 
41 {
42  fRunAna = new FairRunAna();
43  if ( fFileNames.size() > 0 ) {
44  fSource = new FairFileSource(fFileNames.at(0).c_str());
45  for ( unsigned int ifile = 1 ; ifile < fFileNames.size() ; ifile++ )
46  fSource->AddFile(fFileNames.at(ifile));
47  }
48  fSource->Init();
49  LOG(INFO) << "Going to request " << fBranchNames.size() << " branches:";
50  for ( unsigned int ibrn = 0 ; ibrn < fBranchNames.size() ; ibrn++ ) {
51  LOG(INFO) << " requesting branch \"" << fBranchNames[ibrn].second << "\"";
52  int branchStat = fSource->ActivateObject((TObject**)&fInputBranches[fBranchNames[ibrn].second],fBranchNames[ibrn].second.c_str()); // should check the status...
53  LOG(INFO) << "BranchStat: " << branchStat;
54  if ( fInputBranches[fBranchNames[ibrn].second] ) {
55  fInputObjects.insert(std::pair<std::string, TObject*>(fBranchNames[ibrn].first, fInputBranches[fBranchNames[ibrn].second]));
56  LOG(INFO) << "Activated object \"" << fInputBranches[fBranchNames[ibrn].second] << "\" with name \"" << fBranchNames[ibrn].second << " for channel " << fBranchNames[ibrn].first <<"/ (" << branchStat << ")";
57  fNObjects++;
58  }
59  }
60  if ( fMaxIndex < 0 )
61  fMaxIndex = fSource->CheckMaxEventNo();
62  LOG(INFO) << "Input source has " << fMaxIndex << " events.";
63 }
64 
65 // helper function to clean up the object holding the data after it is transported.
66 void free_tmessage2(void* /*data*/, void *hint)
67 {
68  delete (TMessage*)hint;
69 }
70 
72  int eventCounter = 0;
73 
74  // Check if we are still in the RUNNING state.
75  while (CheckCurrentState(RUNNING)) {
76  if (eventCounter == fMaxIndex)
77  break;
78 
79  Int_t readEventReturn = fSource->ReadEvent(eventCounter);
80 
81  if (readEventReturn != 0)
82  break;
83 
84  int messageIter = 0;
85  for (std::set<std::string>::iterator portIt = fPorts.begin(); portIt != fPorts.end(); portIt++){
86  FairMQParts parts;
87  TMessage* message[1000];
88  for (std::multimap<std::string, TObject*>::iterator dataIt = fInputObjects.lower_bound(*portIt); dataIt != fInputObjects.upper_bound(*portIt); ++dataIt){
89  TNamed* data = (TNamed*)(dataIt->second);
90  LOG(INFO) << *portIt << " : " << dataIt->second << " " << dataIt->second->ClassName() << " " << dataIt->second->GetName();
91  if ( strcmp(dataIt->second->ClassName(),"FairEventHeader") == 0 )
92  LOG(INFO) << "RunNumber: " << ((FairEventHeader*)dataIt->second)->GetRunId();
93  message[messageIter] = new TMessage(kMESS_OBJECT);
94  message[messageIter]->WriteObject(dataIt->second);
95  parts.AddPart(NewMessage(message[messageIter]->Buffer(), message[messageIter]->BufferSize(), free_tmessage2, message[messageIter]));
96  messageIter++;
97  }
98  LOG(INFO) << "Send!";
99  Send(parts, *portIt);
100  }
101 
102  eventCounter++;
103  }
104 
105  LOG(INFO) << "Going out of RUNNING state.";
106 }
107 
109 {
// Empty body; its signature line is not part of this listing (presumably the destructor —
// TODO confirm).
// NOTE(review): nothing is released here, although fRunAna and fSource are assigned objects
// created with `new` elsewhere in this file — confirm who owns them.
110 }
void free_tmessage2(void *, void *hint)
std::vector< std::string > fFileNames
std::multimap< std::string, TObject * > fInputObjects
std::vector< std::pair< std::string, std::string > > fBranchNames
std::map< std::string, TObject * > fInputBranches