FairRoot/PandaRoot
Public Member Functions | Static Public Member Functions | Protected Member Functions | Protected Attributes | Private Attributes | Friends | List of all members
PndMQBurstProcessor Class Referenceabstract

#include <PndMQBurstProcessor.h>

Inheritance diagram for PndMQBurstProcessor:
PndMQMvdPixelDigiProcessorBursts

Public Member Functions

 PndMQBurstProcessor ()
 
virtual ~PndMQBurstProcessor ()
 
virtual void UpdateParameters ()
 
virtual FairParGenericSet * UpdateParameter (FairParGenericSet *thisPar)
 
virtual void SetParameters ()
 
virtual void ProcessData ()=0
 
template<class Archive >
void serialize (Archive &ar, const unsigned int version)
 

Static Public Member Functions

static void CustomCleanupParameters (void *data, void *hint)
 
static void free_string (void *data, void *hint)
 

Protected Member Functions

virtual void Run ()
 

Protected Attributes

BurstData fBurstDataIn
 
BurstData fBurstDataOut
 
int fCurrentRunId
 
int fNewRunId
 
TList * fParCList
 

Private Attributes

bool fHasBoostSerialization
 

Friends

class boost::serialization::access
 

Detailed Description

Definition at line 50 of file PndMQBurstProcessor.h.

Constructor & Destructor Documentation

PndMQBurstProcessor::PndMQBurstProcessor ( )
inline

Definition at line 53 of file PndMQBurstProcessor.h.

53  :
55  {
56  gSystem->ResetSignal(kSigInterrupt);
57  gSystem->ResetSignal(kSigTermination);
58 
59  // Check if boost serialization is available if it is chosen
60  using namespace baseMQ::tools::resolve;
61  // coverity[pointless_expression]: suppress coverity warnings on apparant if(const).
62  if (is_same<boost::archive::binary_iarchive, boost::archive::binary_iarchive>::value || is_same<boost::archive::binary_iarchive, boost::archive::text_iarchive>::value)
63  {
64  if (has_BoostSerialization<FairTimeStamp, void(boost::archive::binary_iarchive&, const unsigned int)>::value == 1)
65  {
67  }
68  }
69 
70 
71  }
virtual PndMQBurstProcessor::~PndMQBurstProcessor ( )
inlinevirtual

Definition at line 73 of file PndMQBurstProcessor.h.

74  {
75  fParCList->Clear();
76  }

Member Function Documentation

void PndMQBurstProcessor::CustomCleanupParameters ( void *  data,
void *  hint 
)
static

Definition at line 26 of file PndMQBurstProcessor.cxx.

Referenced by UpdateParameter().

27 {
28  delete (std::string*)hint;
29 }
void PndMQBurstProcessor::free_string ( void *  data,
void *  hint 
)
static

Definition at line 31 of file PndMQBurstProcessor.cxx.

Referenced by Run().

32 {
33  delete static_cast<std::string*>(hint);
34 }
virtual void PndMQBurstProcessor::ProcessData ( )
pure virtual

Implemented in PndMQMvdPixelDigiProcessorBursts.

Referenced by Run().

void PndMQBurstProcessor::Run ( )
protectedvirtual

Definition at line 36 of file PndMQBurstProcessor.cxx.

References fBurstDataIn, fBurstDataOut, BurstHeader::fBurstID, fCurrentRunId, BurstData::fData, fHasBoostSerialization, BurstData::fHeader, fNewRunId, free_string(), BurstHeader::fRunID, ProcessData(), SetParameters(), and UpdateParameters().

37 {
39  {
40  while (CheckCurrentState(RUNNING))
41  {
42  FairMQParts parts;
43  std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
44  if (Receive(msg, "data-in") >= 0)
45  {
46  LOG(INFO) << "Received data! " << msg->GetSize() << std::endl;
47  string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
48  istringstream ibuffer(msgStr);
49  if(!ibuffer.good())
50  LOG(INFO) << "IBUFFER IS BAD!";
51  try {
52  boost::archive::binary_iarchive InputArchive(ibuffer);
53 
54  InputArchive >> fBurstDataIn;
55  }
56  catch (boost::archive::archive_exception& e)
57  {
58  LOG(ERROR) << e.what();
59 // continue;
60  }
62  if (fNewRunId != fCurrentRunId) {
65  SetParameters();
66  }
67 
68  if (fBurstDataIn.fData.size() < 1) continue;
69 // LOG(INFO) << "BurstData size: " << fBurstDataIn.fData[0].size() << " BurstID: " << fBurstDataIn.fHeader.fBurstID;
70 
71  ProcessData();
72  }
73  if (fBurstDataOut.fData.size() > 0){
75  std::ostringstream obuffer;
76  boost::archive::binary_oarchive OutputArchive(obuffer);
77  OutputArchive << fBurstDataOut;
78  std::string* strMsg = new std::string(obuffer.str());
79  unique_ptr<FairMQMessage> msgOut(NewMessage(const_cast<char*>(strMsg->c_str()), strMsg->length(), free_string, strMsg));
80  LOG(INFO) << "Data sent: " << fBurstDataOut.fHeader.fBranchName << " BurstID: " << fBurstDataOut.fHeader.fBurstID << " size: " << msgOut->GetSize();
81  int event = 0;
82 // for (auto eventItr : fBurstDataOut->fData){
83 // for (auto dataItr : eventItr){
84 // LOG(INFO) << event << " : " << dataItr->GetTimeStamp();
85 // }
86 // event++;
87 // }
88  Send(msgOut, "data-out");
89  }
90  }
91  }
92  else
93  {
94  LOG(ERROR) << " Boost Serialization not ok";
95  }
96 }
static void free_string(void *data, void *hint)
virtual void UpdateParameters()
virtual void SetParameters()
virtual void ProcessData()=0
std::vector< std::vector< FairTimeStamp * > > fData
template<class Archive >
void PndMQBurstProcessor::serialize ( Archive &  ar,
const unsigned int  version 
)
inline

Definition at line 89 of file PndMQBurstProcessor.h.

90  {
91  ar& fBurstDataIn;
92  ar& fBurstDataOut;
93  }
virtual void PndMQBurstProcessor::SetParameters ( )
inlinevirtual

Reimplemented in PndMQMvdPixelDigiProcessorBursts.

Definition at line 81 of file PndMQBurstProcessor.h.

Referenced by Run().

81 {};
FairParGenericSet * PndMQBurstProcessor::UpdateParameter ( FairParGenericSet *  thisPar)
virtual

Definition at line 106 of file PndMQBurstProcessor.cxx.

References CustomCleanupParameters(), and fCurrentRunId.

Referenced by UpdateParameters().

106  {
107  std::string paramName = thisPar->GetName();
108  // boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
109  std::string* reqStr = new std::string(paramName + "," + std::to_string(fCurrentRunId));
110  LOG(WARN) << "Requesting parameter \"" << paramName << "\" for Run ID " << fCurrentRunId << " (" << thisPar << ")";
111  std::unique_ptr<FairMQMessage> req(NewMessage(const_cast<char*>(reqStr->c_str()), reqStr->length(), CustomCleanupParameters, reqStr));
112  std::unique_ptr<FairMQMessage> rep(NewMessage());
113 
114  if (Send(req,"param") > 0)
115  {
116  if (Receive(rep,"param") > 0)
117  {
118  TMessage2 tm(rep->GetData(), rep->GetSize());
119  thisPar = (FairParGenericSet*)tm.ReadObject(tm.GetClass());
120  LOG(WARN) << "Received parameter"<< paramName <<" from the server (" << thisPar << ")" << tm.GetClass()->GetName() << " DataSize: " << rep->GetSize();
121  thisPar->print();
122  return thisPar;
123  }
124  }
125  return NULL;
126 }
static void CustomCleanupParameters(void *data, void *hint)
void PndMQBurstProcessor::UpdateParameters ( )
virtual

Definition at line 98 of file PndMQBurstProcessor.cxx.

References fParCList, and UpdateParameter().

Referenced by Run().

98  {
99  for ( int iparC = 0 ; iparC < fParCList->GetEntries() ; iparC++ ) {
100  FairParGenericSet* tempObj = (FairParGenericSet*)(fParCList->At(iparC));
101  fParCList->Remove(tempObj);
102  fParCList->AddAt(UpdateParameter(tempObj),iparC);
103  }
104 }
virtual FairParGenericSet * UpdateParameter(FairParGenericSet *thisPar)

Friends And Related Function Documentation

friend class boost::serialization::access
friend

Definition at line 109 of file PndMQBurstProcessor.h.

Member Data Documentation

BurstData PndMQBurstProcessor::fBurstDataIn
protected

Definition at line 97 of file PndMQBurstProcessor.h.

Referenced by PndMQMvdPixelDigiProcessorBursts::ProcessData(), and Run().

BurstData PndMQBurstProcessor::fBurstDataOut
protected

Definition at line 98 of file PndMQBurstProcessor.h.

Referenced by PndMQMvdPixelDigiProcessorBursts::ProcessData(), and Run().

int PndMQBurstProcessor::fCurrentRunId
protected

Definition at line 99 of file PndMQBurstProcessor.h.

Referenced by Run(), and UpdateParameter().

bool PndMQBurstProcessor::fHasBoostSerialization
private

Definition at line 110 of file PndMQBurstProcessor.h.

Referenced by Run().

int PndMQBurstProcessor::fNewRunId
protected

Definition at line 100 of file PndMQBurstProcessor.h.

Referenced by Run().

TList* PndMQBurstProcessor::fParCList
protected

The documentation for this class was generated from the following files: