FairRoot/PandaRoot
PndMQDataDuplicator.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence version 3 (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #include <memory> // unique_ptr
16 
17 #include <boost/thread.hpp>
18 #include <boost/bind.hpp>
19 
20 #include "PndMQDataDuplicator.h"
21 #include "FairMQLogger.h"
22 #include "PndMQStatus.h"
23 
// Body of a function whose signature (listing line 24) is missing from this
// extract -- presumably the PndMQDataDuplicator constructor; confirm against
// the original file.
// Appends two entries to fRates. In the forwarding loop further down this file,
// fRates[i] is used as the modulus that decides whether "data-out" sub-channel i
// receives a copy of the current message, with i starting at 1.
25 {
26  fRates.push_back(0); // index 0: not read by the forwarding loop, which starts at i = 1
27  fRates.push_back(200); // index 1: sub-channel 1 gets a copy when counter % 200 == 0
28 }
29 
// Body of a function whose signature (listing line 30) is missing from this
// extract -- presumably PndMQDataDuplicator::Run(); confirm against the original.
// Listing line 40 is missing as well, so the declaration of `status` is not
// visible here.
//
// While CheckCurrentState(RUNNING) holds, each iteration receives a header from
// "data-in"[0] and, if another part is announced, a second message. Copies are
// then sent to "data-out"[0] on every successful receive, and to "data-out"[i]
// (i >= 1) only when counter % fRates[i] == 0.
31 {
32  uint64_t counter = 0; // incremented once per successfully received header + second part
33 
34  const FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); // only sub-channel 0 of "data-in" is read
35 
36  while (CheckCurrentState(RUNNING))
37  {
38  std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage()); // fresh message objects every iteration
39  std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
41  if (dataInChannel.Receive(header) > 0) // anything <= 0 skips the whole iteration
42  {
43  status = *(static_cast<int*>(header->GetData())); // NOTE(review): reads the payload as an int without checking its size -- confirm the header always carries at least sizeof(int) bytes
44 
45  if (dataInChannel.ExpectsAnotherPart())
46  {
47  if (dataInChannel.Receive(msg) > 0) // NOTE(review): no else branch -- if this receive fails, the already received header is not forwarded
48  {
49  counter++;
50  if (fChannels.at("data-out").size() > 1)
51  {
52  for (int i = 1; i < fChannels.at("data-out").size(); ++i) // NOTE(review): int compared against an unsigned size(); starts at 1 because sub-channel 0 is served unconditionally below
53  {
54  if (i < fRates.size()){ // sub-channels without an fRates entry get nothing
55  //LOG(INFO) << "Channel: " << i;
56  if ( counter%fRates[i] == 0 ){ // NOTE(review): a zero in fRates[i] for i >= 1 would be a modulo by zero -- confirm fRates is never filled that way
57  //LOG(INFO) << "SendMessage";
58  std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage());
59  headerCopy->Copy(header);
60  fChannels.at("data-out").at(i).SendPart(headerCopy); // SendPart then Send: presumably one two-part message -- confirm against FairMQChannel
61  std::unique_ptr<FairMQMessage> msgCopy(fTransportFactory->CreateMessage());
62  msgCopy->Copy(msg);
63  fChannels.at("data-out").at(i).Send(msgCopy);
64  }
65  }
66  }
67  std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage()); // sub-channel 0 receives a copy of every header + second part
68  headerCopy->Copy(header);
69  fChannels.at("data-out").at(0).SendPart(headerCopy);
70  std::unique_ptr<FairMQMessage> msgCopy(fTransportFactory->CreateMessage());
71  msgCopy->Copy(msg);
72  fChannels.at("data-out").at(0).Send(msgCopy);
73  }
74  else
75  {
76  std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage()); // NOTE(review): same statements as listing lines 67-72; the send to sub-channel 0 could be hoisted out of the if/else
77  headerCopy->Copy(header);
78  fChannels.at("data-out").at(0).SendPart(headerCopy);
79  std::unique_ptr<FairMQMessage> msgCopy(fTransportFactory->CreateMessage());
80  msgCopy->Copy(msg);
81  fChannels.at("data-out").at(0).Send(msgCopy);
82  }
83  }
84  } else {
85  std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage()); // header-only input: a copy goes to sub-channel 0 alone, via Send rather than SendPart
86  headerCopy->Copy(header);
87  fChannels.at("data-out").at(0).Send(headerCopy);
88  }
89  if (status == PndMQStatus::STOP) // only logs; nothing here leaves the loop on STOP
90  LOG(INFO) << "STOP-Signal Received!";
91  }
92  }
93 }
94 
// Empty body of a function whose signature (listing line 95) is missing from
// this extract -- presumably the PndMQDataDuplicator destructor; confirm against
// the original file. It contains no statements.
96 {
97 }
Int_t i
Definition: run_full.C:25
int counter
Definition: ZeeAnalysis.C:59
std::vector< int > fRates
int status[10]
Definition: f_Init.h:28