FairRoot/PandaRoot
PndMQTopix4Sorter.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence version 3 (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #include <boost/thread.hpp>
16 #include <boost/bind.hpp>
17 #include <boost/archive/binary_iarchive.hpp>
18 #include <PndMapSorterTpl.h>
19 #include "PndMQTopix4Sorter.h"
20 #include "PndMQStatus.h"
21 
22 #include "baseMQtools.h"
23 
24 #include "FairMQLogger.h"
25 #include "PndSdsDigiTopix4.h"
26 
27 #include <TH2.h>
28 #include <TCanvas.h>
29 
30 
31 using namespace std;
32 
// Default constructor: starts with fHasBoostSerialization set to false, then queries the
// baseMQ::tools::resolve::has_BoostSerialization trait for PndSdsDigiTopix4 with a
// boost::archive::binary_iarchive.
// NOTE(review): the statement controlled by the `if` below (listing line 41) is missing from
// this extract; presumably it sets fHasBoostSerialization to true - confirm against the original file.
33 PndMQTopix4Sorter::PndMQTopix4Sorter() : fHasBoostSerialization(false)
34 {
35  //gSystem->ResetSignal(kSigInterrupt);
36  //gSystem->ResetSignal(kSigTermination);
37 
38  using namespace baseMQ::tools::resolve;
39  // coverity[pointless_expression]: suppress coverity warnings on apparent if(const).
40  if (has_BoostSerialization<PndSdsDigiTopix4, void(boost::archive::binary_iarchive&, const unsigned int)>::value == 1)
42 }
43 
44 //void PndMQTopix4Sorter::CustomCleanup(void *data, void *object)
45 //{
46 // delete (string*)object;
47 //}
48 
50 {
   // Main processing loop (the signature line, listing line 49, is missing from this extract;
   // presumably this is PndMQTopix4Sorter::Run - confirm against the original file).
   // While the device is RUNNING it receives a two-part message on "data-in" (an int status
   // header followed by a Boost-serialized payload that is read into fTopixData), feeds the
   // digis into `sorter` keyed by their time stamp, and forwards the header plus the
   // serialized sorter output on "data-out".
   // NOTE(review): listing lines 52 and 57 are also missing. The visible text has one more '}'
   // than '{', so one of the missing lines presumably opens a scope (closed near the end).
51  LOG(INFO) << "Boost Serialization "<< fHasBoostSerialization;
   // Only the first sub-channel (index 0) of each named channel is used.
53  FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);
54  FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
55 
   // Only referenced by the commented-out log statement further down.
56  int receivedMsgs = 0;
58 
59 
60  while (CheckCurrentState(RUNNING))
61  {
   // NOTE(review): both messages are raw pointers created on every iteration, but they are
   // deleted only inside the `Receive(msg)` branch below. Iterations that do not reach that
   // branch (header receive fails, no second part) appear to leak them.
62  FairMQMessage* header = fTransportFactory->CreateMessage();
63  FairMQMessage* msg = fTransportFactory->CreateMessage();
64 
65  if (dataInChannel.Receive(header) > 0)
66  {
   // The header payload is read as a single int status code; its size is not checked.
67  int status = *(static_cast<int*>(header->GetData()));
68 
69  if (dataInChannel.ExpectsAnotherPart())
70  {
   // NOTE(review): unlike the header receive above, this result is tested for truthiness
   // rather than `> 0`, so a negative return value would also enter the branch.
71  if (dataInChannel.Receive(msg)){
   // Copy the payload bytes into a string so it can be read through an input stream.
72  string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
73  istringstream ibuffer(msgStr);
74 
   // NOTE(review): the archive is constructed outside the try block, so an exception
   // thrown during construction would not be caught by the handler below.
75  boost::archive::binary_iarchive InputArchive(ibuffer);
76 
   // A deserialization failure is only logged; processing continues with whatever
   // fTopixData contains at that point.
77  try {
78  InputArchive >> fTopixData;
79  }
80  catch (boost::archive::archive_exception& e)
81  {
82  LOG(ERROR) << e.what();
83  }
84 
85  // LOG(INFO) << "TopixData: " << fTopixData.size();
86  // for (auto iter : fTopixData){
87  // LOG(INFO) << iter.GetTimeStamp();
88  // }
89 
90  bool endSorting = false;
91  double timeOfLast = 0;
   // Nothing is sorted or sent when the received vector is empty.
92  if (fTopixData.size() > 0){
   // Digis with a positive time stamp are added to the sorter and timeOfLast tracks the most
   // recently added one; any digi with a time stamp <= 0 is treated as an end-of-sorting marker.
   // The loop variable is a by-value copy of each element.
93  for (auto iter : fTopixData){
94  if (iter.GetTimeStamp() > 0){
95  sorter.AddElement(iter, iter.GetTimeStamp());
96  timeOfLast = iter.GetTimeStamp();
97  }
98  else {
99  endSorting = true;
100  // LOG(INFO) << "---END SORTING---";
101  }
102  }
   // Normal case: ask the sorter to write out data for timeOfLast and take over its output.
103  if (endSorting == false){
104  sorter.WriteOutData(timeOfLast);
105  fOutputData = sorter.GetOutputData();
106  sorter.DeleteOutputData();
107  }
   // End marker seen: flush the sorter completely and append a default-constructed digi to
   // the output (presumably the end marker for the next stage - TODO confirm).
   // NOTE(review): this branch is reached only when endSorting is true, so the
   // `status == PndMQStatus::STOP` part of the condition never decides the outcome.
108  else if (endSorting == true || status == PndMQStatus::STOP){
109  LOG(INFO) << "EndSorting or STOP-Status " << status;
110  sorter.WriteOutAll();
111  fOutputData = sorter.GetOutputData();
112  fOutputData.push_back(PndSdsDigiTopix4());
113  sorter.DeleteOutputData();
114  endSorting = false;
115  }
116 
   // Forward a copy of the received header as the first part of the outgoing message.
117  unique_ptr<FairMQMessage> headerCpy(fTransportFactory->CreateMessage(sizeof(int)));
118  headerCpy->Copy(header);
119  dataOutChannel.SendPart(headerCpy);
120 
121 
   // Serialize fOutputData into a binary archive and send the bytes as the final part.
   // obuffer.str() is called twice, each call returning a fresh copy of the buffer; the
   // length is stored in an int.
122  std::ostringstream obuffer;
123  boost::archive::binary_oarchive OutputArchive(obuffer);
124  OutputArchive << fOutputData;
125  int outputSize = obuffer.str().length();
126  unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
127  memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
128  dataOutChannel.Send(msg2);
129 
130  //LOG(INFO) << "Data: " << fTopixData.size() << " " << timeOfLast;
131  // LOG(INFO) << "Output: " << fOutputData.size() << " timeOfLast: " << timeOfLast;
132  // for(auto itr : fOutputData)
133  // LOG(INFO) << itr.GetTimeStamp();
134 
   // Reset the member buffers for the next message.
135  fTopixData.clear();
136  fOutputData.clear();
137  }
   // The only place where the two input messages are released.
138  delete(msg);
139  delete (header);
140  }
141  }
142  // LOG(INFO) << "Received Message: " << receivedMsgs++ << " Size: " << msg->GetSize();
   // On a STOP status the sorter is flushed completely and the result, terminated by a
   // default-constructed digi, is sent in the same header + payload format as above.
   // NOTE(review): if the `Receive(msg)` branch above ran, `header` has already been deleted
   // there, so the `headerCpy->Copy(header)` call below looks like a use-after-free.
   // NOTE(review): when that branch already flushed and sent data for this message, this
   // block appears to send a second message for the same input - confirm this is intended.
143  if (status == PndMQStatus::STOP){
144  LOG(INFO) << "STOP-Signal Received!";
145  sorter.WriteOutAll();
146  fOutputData = sorter.GetOutputData();
147  fOutputData.push_back(PndSdsDigiTopix4());
148  sorter.DeleteOutputData();
149 
150  unique_ptr<FairMQMessage> headerCpy(fTransportFactory->CreateMessage(sizeof(int)));
151  headerCpy->Copy(header);
152  dataOutChannel.SendPart(headerCpy);
153 
154  std::ostringstream obuffer;
155  boost::archive::binary_oarchive OutputArchive(obuffer);
156  OutputArchive << fOutputData;
157  int outputSize = obuffer.str().length();
158  unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
159  memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
160  dataOutChannel.Send(msg2);
161  }
162 
163 
164  }
165  }
   // NOTE(review): the next brace presumably closes the scope opened on one of the missing
   // listing lines (52 or 57); its opening is not visible here.
166  }
167 }
168 
170 {
   // NOTE(review): empty body. The signature line (listing line 169) is missing from this
   // extract; presumably this is the PndMQTopix4Sorter destructor - confirm against the original file.
171 }
virtual void WriteOutData(double time)
Data class to store the digi output of a pixel module.
std::vector< PndSdsDigiTopix4 > fTopixData
virtual std::vector< TData > GetOutputData()
std::vector< PndSdsDigiTopix4 > fOutputData
virtual void AddElement(TData digi, double timestamp)
virtual void DeleteOutputData()
int status[10]
Definition: f_Init.h:28
virtual void WriteOutAll()