17 #include <boost/thread.hpp>
18 #include <boost/bind.hpp>
20 #include "FairMQLogger.h"
22 #include "FairMQMessage.h"
24 #include "FairEventHeader.h"
// NOTE(review): the statements below are non-contiguous excerpts; the leading
// numbers are the line numbers of the original listing, and the enclosing
// function header and the loop bodies are not visible here.

// Walk fFileNames starting at index 1, so the entry at index 0 is not visited
// by this loop -- presumably it is handled separately; verify against the
// full source.
45 for (
unsigned int ifile = 1 ; ifile <
fFileNames.size() ; ifile++ )
// Announce how many branches (fBranchNames.size()) are about to be requested.
49 LOG(INFO) <<
"Going to request " <<
fBranchNames.size() <<
" branches:";
// Visit every entry of fBranchNames; the name that is logged is the second
// element of each pair.
50 for (
unsigned int ibrn = 0 ; ibrn <
fBranchNames.size() ; ibrn++ ) {
51 LOG(INFO) <<
" requesting branch \"" <<
fBranchNames[ibrn].second <<
"\"";
// Log branchStat; where it is assigned is outside this excerpt.
53 LOG(INFO) <<
"BranchStat: " << branchStat;
// Report fMaxIndex as the number of events in the input source.
62 LOG(INFO) <<
"Input source has " <<
fMaxIndex <<
" events.";
// Interpret hint as a TMessage* and delete it.
// NOTE(review): presumably this is the body of free_tmessage2(void*, void* hint),
// which the send loop passes to NewMessage together with the TMessage pointer
// as the hint -- confirm against the full source. The C-style cast performs no
// type check, so hint must really point to a TMessage.
68 delete (TMessage*)hint;
// NOTE(review): non-contiguous excerpts of the send loop; the leading numbers
// are the original listing's line numbers, and several statements (declaration
// of eventCounter, messageIter and parts, the body of the `if`, the closing
// braces) are not visible here.

// Keep going for as long as CheckCurrentState(RUNNING) holds.
75 while (CheckCurrentState(RUNNING)) {
// Ask fSource to read event number eventCounter and keep the status code.
79 Int_t readEventReturn =
fSource->ReadEvent(eventCounter);
// A non-zero status is handled by a branch whose body is outside this excerpt.
81 if (readEventReturn != 0)
// Process each port name stored in fPorts.
85 for (std::set<std::string>::iterator portIt =
fPorts.begin(); portIt !=
fPorts.end(); portIt++){
// Fixed-capacity array of TMessage pointers, indexed by messageIter below.
// NOTE(review): neither the update of messageIter nor a bound check is visible
// in this excerpt -- confirm the index can never reach 1000.
87 TMessage* message[1000];
// Visit every fInputObjects entry whose key equals the current port name
// (the [lower_bound, upper_bound) range of the multimap).
88 for (std::multimap<std::string, TObject*>::iterator dataIt =
fInputObjects.lower_bound(*portIt); dataIt !=
fInputObjects.upper_bound(*portIt); ++dataIt){
// C-style cast of the stored TObject* to TNamed*; `data` is not used in the
// lines visible here.
89 TNamed* data = (TNamed*)(dataIt->second);
// Log the port name, the object pointer, its class name and its name.
90 LOG(INFO) << *portIt <<
" : " << dataIt->second <<
" " << dataIt->second->ClassName() <<
" " << dataIt->second->GetName();
// Only when the class name is exactly "FairEventHeader": additionally log the
// run id obtained through a C-style cast to FairEventHeader*.
91 if ( strcmp(dataIt->second->ClassName(),
"FairEventHeader") == 0 )
92 LOG(INFO) <<
"RunNumber: " << ((FairEventHeader*)dataIt->second)->GetRunId();
// Allocate a TMessage of kind kMESS_OBJECT and write the object into it.
93 message[messageIter] =
new TMessage(kMESS_OBJECT);
94 message[messageIter]->WriteObject(dataIt->second);
// Wrap the TMessage buffer in a new message and append it to parts.
// free_tmessage2 is passed along with the TMessage pointer as its hint;
// presumably it is invoked to delete the TMessage once the buffer is no
// longer needed -- confirm against the NewMessage documentation.
95 parts.AddPart(NewMessage(message[messageIter]->Buffer(), message[messageIter]->BufferSize(),
free_tmessage2, message[messageIter]));
// Log that the RUNNING state is being left.
105 LOG(INFO) <<
"Going out of RUNNING state.";
// NOTE(review): the lines below are bare declaration signatures (no bodies or
// terminators visible); the comments describe only how each name is used in
// the excerpts of this listing.

// Callback handed to NewMessage in the send loop; the second argument (hint)
// is the TMessage pointer supplied alongside it. The first argument is unnamed.
void free_tmessage2(void *, void *hint)
// List of file names; the visible loop iterates it from index 1 onwards.
std::vector< std::string > fFileNames
// Multimap from a string key to TObject pointers; the send loop looks up all
// entries whose key equals a port name from fPorts.
std::multimap< std::string, TObject * > fInputObjects
// Virtual destructor of PndMvdMQFileSampler (body not visible here).
virtual ~PndMvdMQFileSampler()
// Pairs of strings; the second element of each pair is logged as the name of
// the requested branch.
std::vector< std::pair< std::string, std::string > > fBranchNames
// Set of port names iterated by the send loop.
std::set< std::string > fPorts
// Map from a string key to a TObject pointer; not referenced in the visible
// excerpts.
std::map< std::string, TObject * > fInputBranches