15 #include <boost/thread.hpp> 
   16 #include <boost/bind.hpp> 
   17 #include <boost/archive/binary_oarchive.hpp> 
   19 #include "baseMQtools.h" 
   21 #include "FairMQLogger.h" 
   29         using namespace baseMQ::tools::resolve;
 
   31         if (has_BoostSerialization<
PndSdsDigiTopix4, 
void(boost::archive::binary_iarchive&, 
const unsigned int)>::value == 1)
 
   42     int numOutputs = fChannels.at(
"data-out").size();
 
   45     const FairMQChannel& dataInChannel = fChannels.at(
"data-in").at(0);
 
   46     FairMQChannel* dataOutChannels[fChannels.at(
"data-out").size()];
 
   47     LOG(INFO) << 
"Number of Output Channels: " << numOutputs;
 
   48     for (
int i = 0; 
i < numOutputs; ++
i)
 
   50         dataOutChannels[
i] = &(fChannels.at(
"data-out").at(
i));
 
   54     double currentOffset = currentThreshold + 
fOffset;
 
   56     while (CheckCurrentState(RUNNING))
 
   58         std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
 
   59         std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
 
   61         if (dataInChannel.Receive(header) > 0)
 
   63                 int status = *(
static_cast<int*
>(header->GetData()));
 
   66                         LOG(INFO) << 
"WrongStatus: " << 
status;
 
   69                 if (dataInChannel.ExpectsAnotherPart())
 
   71                         if (dataInChannel.Receive(msg) > 0)
 
   73                                         std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
 
   74                                         std::istringstream ibuffer(msgStr);
 
   75                                         boost::archive::binary_iarchive InputArchive(ibuffer);
 
   79                                         catch (boost::archive::archive_exception& e)
 
   81                                                 LOG(ERROR) << e.what();
 
   86                                         bool switchChannels = 
false;
 
   88                                                 if (itr.GetTimeStamp() < currentThreshold)
 
   92                                                         if(itr.GetTimeStamp() > currentOffset){
 
   94                                                                 switchChannels = 
true;
 
   99                                         if (switchChannels == 
true){
 
  105                                                 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(
sizeof(
int)));
 
  107                                                 memcpy(headerCopy->GetData(), &flag, 
sizeof(int));
 
  108                                                 dataOutChannels[direction]->SendPart(headerCopy);
 
  110                                                 std::ostringstream obuffer;
 
  111                                                 boost::archive::binary_oarchive OutputArchive(obuffer);
 
  114                                                 int outputSize = obuffer.str().length();
 
  115                                                 unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
 
  116                                                 memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
 
  118                                                 dataOutChannels[direction]->Send(msg2);
 
  123                                                 fCurrentOutput.clear();
 
  127                                                 int nextOutput = direction + 1;
 
  128                                                 if (nextOutput >= numOutputs)
 
  131                                                 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(
sizeof(
int)));
 
  133                                                 memcpy(headerCopy->GetData(), &flag, 
sizeof(int));
 
  134                                                 dataOutChannels[nextOutput]->SendPart(headerCopy);
 
  136                                                 std::ostringstream obuffer;
 
  137                                                 boost::archive::binary_oarchive OutputArchive(obuffer);
 
  140                                                 int outputSize = obuffer.str().length();
 
  141                                                 unique_ptr<FairMQMessage> msg3(fTransportFactory->CreateMessage(outputSize));
 
  142                                                 memcpy(msg3->GetData(), obuffer.str().c_str(), outputSize);
 
  144                                                 dataOutChannels[nextOutput]->Send(msg3);
 
  149                                         if (switchChannels == 
true){
 
  152                                                 if (direction >= numOutputs)
 
  158                                                 switchChannels = 
false;
 
  163                         LOG(INFO) << 
"STOP-Status Received: " << 
status;
 
  164                                 for (
int i = 0; 
i < numOutputs; ++
i)
 
  166                                         std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage());
 
  167                                         headerCopy->Copy(header);
 
  168                                         dataOutChannels[
i]->Send(headerCopy);
 
std::vector< PndSdsDigiTopix4 > fCurrentOutput
Data class to store the digi output of a pixel module. 
std::vector< PndSdsDigiTopix4 > fTopixData
std::vector< PndSdsDigiTopix4 > fNextOutput
bool fHasBoostSerialization
virtual ~PndMQSorterDistributor()