FairRoot/PandaRoot
PndMQBurstProcessor.cxx
Go to the documentation of this file.
1 /*
2  * File: PndMQBurstProcessor.cxx
3  * Author: winckler, A. Rybalchenko
4  *
5  * Created on March 11, 2014, 12:12 PM
6  */
7 
8 // Implementation of PndMQBurstProcessor::Run() with Boost transport data format
9 #include <PndMQBurstProcessor.h>
10 #include "PndMQGapEventBuilder.h"
13 #include "PndMapSorter.h"
14 #include "PndGeoHandling.h"
15 
16 class TMessage2 : public TMessage
17 {
18  public:
19  TMessage2(void* buf, Int_t len)
20  : TMessage(buf, len)
21  {
22  ResetBit(kIsOwner);
23  }
24 };
25 
27 {
28  delete (std::string*)hint;
29 }
30 
31 void PndMQBurstProcessor::free_string(void *data, void *hint)
32 {
33  delete static_cast<std::string*>(hint);
34 }
35 
37 {
39  {
40  while (CheckCurrentState(RUNNING))
41  {
42  FairMQParts parts;
43  std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
44  if (Receive(msg, "data-in") >= 0)
45  {
46  LOG(INFO) << "Received data! " << msg->GetSize() << std::endl;
47  string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
48  istringstream ibuffer(msgStr);
49  if(!ibuffer.good())
50  LOG(INFO) << "IBUFFER IS BAD!";
51  try {
52  boost::archive::binary_iarchive InputArchive(ibuffer);
53 
54  InputArchive >> fBurstDataIn;
55  }
56  catch (boost::archive::archive_exception& e)
57  {
58  LOG(ERROR) << e.what();
59 // continue;
60  }
62  if (fNewRunId != fCurrentRunId) {
65  SetParameters();
66  }
67 
68  if (fBurstDataIn.fData.size() < 1) continue;
69 // LOG(INFO) << "BurstData size: " << fBurstDataIn.fData[0].size() << " BurstID: " << fBurstDataIn.fHeader.fBurstID;
70 
71  ProcessData();
72  }
73  if (fBurstDataOut.fData.size() > 0){
75  std::ostringstream obuffer;
76  boost::archive::binary_oarchive OutputArchive(obuffer);
77  OutputArchive << fBurstDataOut;
78  std::string* strMsg = new std::string(obuffer.str());
79  unique_ptr<FairMQMessage> msgOut(NewMessage(const_cast<char*>(strMsg->c_str()), strMsg->length(), free_string, strMsg));
80  LOG(INFO) << "Data sent: " << fBurstDataOut.fHeader.fBranchName << " BurstID: " << fBurstDataOut.fHeader.fBurstID << " size: " << msgOut->GetSize();
81  int event = 0;
82 // for (auto eventItr : fBurstDataOut->fData){
83 // for (auto dataItr : eventItr){
84 // LOG(INFO) << event << " : " << dataItr->GetTimeStamp();
85 // }
86 // event++;
87 // }
88  Send(msgOut, "data-out");
89  }
90  }
91  }
92  else
93  {
94  LOG(ERROR) << " Boost Serialization not ok";
95  }
96 }
97 
99  for ( int iparC = 0 ; iparC < fParCList->GetEntries() ; iparC++ ) {
100  FairParGenericSet* tempObj = (FairParGenericSet*)(fParCList->At(iparC));
101  fParCList->Remove(tempObj);
102  fParCList->AddAt(UpdateParameter(tempObj),iparC);
103  }
104 }
105 
106 FairParGenericSet* PndMQBurstProcessor::UpdateParameter(FairParGenericSet* thisPar) {
107  std::string paramName = thisPar->GetName();
108  // boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
109  std::string* reqStr = new std::string(paramName + "," + std::to_string(fCurrentRunId));
110  LOG(WARN) << "Requesting parameter \"" << paramName << "\" for Run ID " << fCurrentRunId << " (" << thisPar << ")";
111  std::unique_ptr<FairMQMessage> req(NewMessage(const_cast<char*>(reqStr->c_str()), reqStr->length(), CustomCleanupParameters, reqStr));
112  std::unique_ptr<FairMQMessage> rep(NewMessage());
113 
114  if (Send(req,"param") > 0)
115  {
116  if (Receive(rep,"param") > 0)
117  {
118  TMessage2 tm(rep->GetData(), rep->GetSize());
119  thisPar = (FairParGenericSet*)tm.ReadObject(tm.GetClass());
120  LOG(WARN) << "Received parameter"<< paramName <<" from the server (" << thisPar << ")" << tm.GetClass()->GetName() << " DataSize: " << rep->GetSize();
121  thisPar->print();
122  return thisPar;
123  }
124  }
125  return NULL;
126 }
virtual FairParGenericSet * UpdateParameter(FairParGenericSet *thisPar)
static void free_string(void *data, void *hint)
virtual void UpdateParameters()
virtual void SetParameters()
static void CustomCleanupParameters(void *data, void *hint)
TMessage2(void *buf, Int_t len)
virtual void ProcessData()=0
std::vector< std::vector< FairTimeStamp * > > fData
return buf