FairRoot/PandaRoot
PndMvdMQTaskProcessor.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence version 3 (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
10 #include "FairMQParts.h"
11 #include "PixelFindHits.h"
12 #include "TMessage.h"
13 #include "PixelDigiPar.h"
14 
15 // special class to expose protected TMessage constructor
16 class Ex9TMessage2 : public TMessage
17 {
18  public:
19  Ex9TMessage2(void* buf, Int_t len)
20  : TMessage(buf, len)
21  {
22  ResetBit(kIsOwner);
23  }
24 };
25 
26 // helper function to clean up the object holding the data after it is transported.
27 void free_tmessage4(void *data, void *hint)
28 {
29  delete (TMessage*)hint;
30 }
31 
33  : FairMQDevice()
34  , fNewRunId(1)
35  , fCurrentRunId(-1)
36  , fInput(NULL)
37  , fOutput(NULL)
38  , fEventHeader(NULL)
39  , fFairTask(NULL)
40  , fGeoPar(nullptr)
41  , fParCList(NULL)
42 {
43 
44 }
45 
47 {
48  if(fGeoPar)
49  {
50  delete fGeoPar;
51  fGeoPar=nullptr;
52  }
53 
54  if(fInput)
55  {
56  delete fInput;
57  fInput=nullptr;
58  }
59 
60  if(fOutput)
61  {
62  delete fOutput;
63  fOutput=nullptr;
64  }
65 }
66 
67 
69 {
70  //fHitFinder->InitMQ(fRootParFileName,fAsciiParFileName);
71  fFairTask = new PixelFindHits();
72  fGeoPar = new FairGeoParSet("FairGeoParSet");
73  fParCList = new TList();
74  fParCList->Add(fGeoPar);
75  fFairTask->GetParList(fParCList);
76 
77  fOutput = new TList();
78  fInput = new TList();
79 }
80 
82 {
83  int receivedMsgs = 0;
84  int sentMsgs = 0;
85 
86  while (CheckCurrentState(RUNNING))
87  {
88  FairMQParts parts;
89 
90  if ( Receive(parts,"data-in") >= 0 )
91  {
92  LOG(INFO)<<"message received";
93  receivedMsgs++;
94 
95  TObject* tempObjects[10];
96  for ( int ipart = 0 ; ipart < parts.Size() ; ipart++ )
97  {
98  Ex9TMessage2 tm(parts.At(ipart)->GetData(), parts.At(ipart)->GetSize());
99  tempObjects[ipart] = (TObject*)tm.ReadObject(tm.GetClass());
100  if ( strcmp(tempObjects[ipart]->GetName(),"EventHeader.") == 0 )
101  fEventHeader = (FairEventHeader*)tempObjects[ipart];
102  else {
103  fInput->Add(tempObjects[ipart]);
104  }
105  }
106 
107  fNewRunId = fEventHeader->GetRunId();
109  {
112  PixelDigiPar* tempDP = (PixelDigiPar*)fParCList->FindObject("PixelDigiParameters");
113  fFairTask->InitMQ(fParCList);
114  }
115 
116 
117  // Execute hit finder task
118  fOutput->Clear();
119  LOG(INFO) << " analyzing event " << fEventHeader->GetMCEntryNumber();
120  fFairTask->ExecMQ(fInput,fOutput);
121 
122  TMessage* messageFEH;
123  TMessage* messageTCA[10];
124  FairMQParts partsOut;
125 
126  messageFEH = new TMessage(kMESS_OBJECT);
127  messageFEH->WriteObject(fEventHeader);
128  partsOut.AddPart(NewMessage(messageFEH->Buffer(), messageFEH->BufferSize(), free_tmessage4, messageFEH));
129  for ( int iobj = 0 ; iobj < fOutput->GetEntries() ; iobj++ ) {
130  messageTCA[iobj] = new TMessage(kMESS_OBJECT);
131  messageTCA[iobj]->WriteObject(fOutput->At(iobj));
132  partsOut.AddPart(NewMessage(messageTCA[iobj]->Buffer(), messageTCA[iobj]->BufferSize(), free_tmessage4, messageTCA[iobj]));
133  Send(partsOut, "data-out");
134  }
135 
136  sentMsgs++;
137  }
138  fInput->Clear();
139  }
140 
141  MQLOG(INFO) << "Received " << receivedMsgs << " and sent " << sentMsgs << " messages!";
142 }
143 
144 
145 
146 
147 void PndMvdMQTaskProcessor::CustomCleanup(void *data, void *hint)
148 {
149  delete (std::string*)hint;
150 }
151 
152 
153 
154 void PndMvdMQTaskProcessor::SetProperty(const int key, const std::string& value)
155 {
156  switch (key)
157  {
158  default:
159  FairMQDevice::SetProperty(key, value);
160  break;
161  }
162 }
163 
164 
165 
166 std::string PndMvdMQTaskProcessor::GetProperty(const int key, const std::string& default_)
167 {
168  switch (key)
169  {
170  default:
171  return FairMQDevice::GetProperty(key, default_);
172  }
173 }
174 
175 
176 void PndMvdMQTaskProcessor::SetProperty(const int key, const int value)
177 {
178  FairMQDevice::SetProperty(key, value);
179 }
180 
181 int PndMvdMQTaskProcessor::GetProperty(const int key, const int value)
182 {
183  return FairMQDevice::GetProperty(key, value);
184 }
185 
186 
187 
188 
189 
190 
191 
193  for ( int iparC = 0 ; iparC < fParCList->GetEntries() ; iparC++ ) {
194  FairParGenericSet* tempObj = (FairParGenericSet*)(fParCList->At(iparC));
195  fParCList->Remove(tempObj);
196  fParCList->AddAt(UpdateParameter(tempObj),iparC);
197  }
198 }
199 
200 FairParGenericSet* PndMvdMQTaskProcessor::UpdateParameter(FairParGenericSet* thisPar) {
201  std::string paramName = thisPar->GetName();
202  // boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
203  std::string* reqStr = new std::string(paramName + "," + std::to_string(fCurrentRunId));
204  LOG(WARN) << "Requesting parameter \"" << paramName << "\" for Run ID " << fCurrentRunId << " (" << thisPar << ")";
205  std::unique_ptr<FairMQMessage> req(NewMessage(const_cast<char*>(reqStr->c_str()), reqStr->length(), CustomCleanup, reqStr));
206  std::unique_ptr<FairMQMessage> rep(NewMessage());
207 
208  if (Send(req,"param") > 0)
209  {
210  if (Receive(rep,"param") > 0)
211  {
212  Ex9TMessage2 tm(rep->GetData(), rep->GetSize());
213  thisPar = (FairParGenericSet*)tm.ReadObject(tm.GetClass());
214  LOG(WARN) << "Received parameter"<< paramName <<" from the server (" << thisPar << ")";
215  return thisPar;
216  }
217  }
218  return NULL;
219 }
Ex9TMessage2(void *buf, Int_t len)
FairEventHeader * fEventHeader
void SetProperty(const int key, const std::string &value)
FairParGenericSet * UpdateParameter(FairParGenericSet *thisPar)
void free_tmessage4(void *data, void *hint)
static void CustomCleanup(void *data, void *hint)
std::string GetProperty(const int key, const std::string &default_="")