34 const FairMQChannel& dataInChannel = fChannels.at(
"data-in").at(0);
36 while (CheckCurrentState(RUNNING))
38 std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
39 std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
41 if (dataInChannel.Receive(header) > 0)
43 status = *(
static_cast<int*
>(header->GetData()));
45 if (dataInChannel.ExpectsAnotherPart())
47 if (dataInChannel.Receive(msg) > 0)
50 if (fChannels.at(
"data-out").size() > 1)
52 for (
int i = 1;
i < fChannels.at(
"data-out").size(); ++
i)
58 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage());
59 headerCopy->Copy(header);
60 fChannels.at(
"data-out").at(
i).SendPart(headerCopy);
61 std::unique_ptr<FairMQMessage> msgCopy(fTransportFactory->CreateMessage());
63 fChannels.at(
"data-out").at(
i).Send(msgCopy);
67 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage());
68 headerCopy->Copy(header);
69 fChannels.at(
"data-out").at(0).SendPart(headerCopy);
70 std::unique_ptr<FairMQMessage> msgCopy(fTransportFactory->CreateMessage());
72 fChannels.at(
"data-out").at(0).Send(msgCopy);
76 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage());
77 headerCopy->Copy(header);
78 fChannels.at(
"data-out").at(0).SendPart(headerCopy);
79 std::unique_ptr<FairMQMessage> msgCopy(fTransportFactory->CreateMessage());
81 fChannels.at(
"data-out").at(0).Send(msgCopy);
85 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage());
86 headerCopy->Copy(header);
87 fChannels.at(
"data-out").at(0).Send(headerCopy);
90 LOG(INFO) <<
"STOP-Signal Received!";
std::vector< int > fRates