42     int numInputs = fChannels.at(
"data-in").size();
 
   46     const FairMQChannel& dataOutChannel = fChannels.at(
"data-out").at(0);
 
   47     FairMQChannel* dataInChannels[fChannels.at(
"data-in").size()];
 
   48     LOG(INFO) << 
"Number of Input Channels: " << numInputs;
 
   49     for (
int i = 0; 
i < numInputs; ++
i)
 
   51         dataInChannels[
i] = &(fChannels.at(
"data-in").at(
i));
 
   54     fData.resize(numInputs);
 
   55     int activeChannel = 0;
 
   56     bool switchChannel = 
false;
 
   57     bool channelSwitched = 
false;
 
   62     while (CheckCurrentState(RUNNING))
 
   64         for (
int channelNr = 0; channelNr < numInputs; channelNr++){
 
   66                 std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
 
   67                 std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
 
   71                 if (
fRunningStatus[channelNr] == 
true && dataInChannels[channelNr]->Receive(header) > 0){
 
   73                         int status = *(
static_cast<int*
>(header->GetData()));
 
   77                                 LOG(INFO) << 
"STOP-Signal received for channel " << channelNr;
 
   80                                 if (dataInChannels[channelNr]->ExpectsAnotherPart())
 
   82                                         if (dataInChannels[channelNr]->Receive(msg))
 
   84                                                 std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
 
   85                                                 std::istringstream ibuffer(msgStr);
 
   87                                                 boost::archive::binary_iarchive InputArchive(ibuffer);
 
   92                                                 catch (boost::archive::archive_exception& e)
 
   94                                                         LOG(ERROR) << e.what();
 
  104                                                 if (activeChannel == channelNr){
 
  106                                                         if (
fData[channelNr].size() > 0){
 
  107                                                                 for (std::vector<PndSdsDigiTopix4>::iterator data = 
fData[channelNr].begin(); data != 
fData[channelNr].end(); data++){
 
  108                                                                         if (data->GetTimeStamp() < 0){
 
  110                                                                                 switchChannel = 
true;
 
  111                                                                                 channelSwitched = 
true;
 
  112                                                                                 fData[channelNr].erase(
fData[channelNr].begin(), ++data);
 
  117                                                                 if (switchChannel == 
false){
 
  119                                                                         fData[channelNr].clear();
 
  124                                                                 unique_ptr<FairMQMessage> headerOut(fTransportFactory->CreateMessage(
sizeof(
int)));
 
  125                                                                 memcpy(headerOut->GetData(), &flag, 
sizeof(int));
 
  126                                                                 dataOutChannel.SendPart(headerOut);
 
  128                                                                 std::ostringstream obuffer;
 
  129                                                                 boost::archive::binary_oarchive OutputArchive(obuffer);
 
  131                                                                 int outputSize = obuffer.str().length();
 
  132                                                                 unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
 
  133                                                                 memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
 
  134                                                                 dataOutChannel.Send(msg2);
 
  138                                                                 for (
auto info : fOutputData){
 
  140                                                                         if (info.GetTimeStamp() > 0 && oldTS > info.GetTimeStamp()){
 
  141                                                                                 LOG(INFO) << 
"++++ SortingError ++++ " << oldTS << 
" > " << info.GetTimeStamp();
 
  143                                                                         oldTS = info.GetTimeStamp();
 
  146                                                                 if (switchChannel == 
true){
 
  148                                                                         if (activeChannel >= numInputs)
 
  153                                                                         switchChannel = 
false;
 
  165                                 if (allStop == 
true){
 
  166                                         LOG(INFO) << 
"STOP-Signal received for all. Emptying buffers.";
 
  167                                         for (
int i = 0; 
i < numInputs; 
i++){
 
  169                                                 if (
i != numInputs - 1){
 
  174                                                 unique_ptr<FairMQMessage> headerOut(fTransportFactory->CreateMessage(
sizeof(
int)));
 
  175                                                 memcpy(headerOut->GetData(), &flag, 
sizeof(int));
 
  176                                                 dataOutChannel.SendPart(headerOut);
 
  178                                                 fOutputData = 
fData[activeChannel];
 
  179                                                 fData[channelNr].clear();
 
  181                                                 std::ostringstream obuffer;
 
  182                                                 boost::archive::binary_oarchive OutputArchive(obuffer);
 
  184                                                 int outputSize = obuffer.str().length();
 
  185                                                 unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
 
  186                                                 memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
 
  187                                                 dataOutChannel.Send(msg2);
 
  189                                                 for (
auto info : fOutputData){
 
  191                                                         if (info.GetTimeStamp() > 0 && oldTS > info.GetTimeStamp()){
 
  192                                                                 LOG(INFO) << 
"++++ SortingError ++++ " << oldTS << 
" > " << info.GetTimeStamp();
 
  194                                                         oldTS = info.GetTimeStamp();
 
  199                                                 if (activeChannel >= numInputs)
 
std::vector< PndSdsDigiTopix4 > fInputData
std::vector< PndSdsDigiTopix4 > fOutputData
std::vector< std::vector< PndSdsDigiTopix4 > > fData
std::vector< bool > fRunningStatus