42 int numOutputs = fChannels.at(
"data-out").size();
45 const FairMQChannel& dataInChannel = fChannels.at(
"data-in").at(0);
46 FairMQChannel* dataOutChannels[fChannels.at(
"data-out").size()];
47 LOG(INFO) <<
"Number of Output Channels: " << numOutputs;
48 for (
int i = 0;
i < numOutputs; ++
i)
50 dataOutChannels[
i] = &(fChannels.at(
"data-out").at(
i));
54 double currentOffset = currentThreshold +
fOffset;
56 while (CheckCurrentState(RUNNING))
58 std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
59 std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
61 if (dataInChannel.Receive(header) > 0)
63 int status = *(
static_cast<int*
>(header->GetData()));
66 LOG(INFO) <<
"WrongStatus: " <<
status;
69 if (dataInChannel.ExpectsAnotherPart())
71 if (dataInChannel.Receive(msg) > 0)
73 std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
74 std::istringstream ibuffer(msgStr);
75 boost::archive::binary_iarchive InputArchive(ibuffer);
79 catch (boost::archive::archive_exception& e)
81 LOG(ERROR) << e.what();
86 bool switchChannels =
false;
88 if (itr.GetTimeStamp() < currentThreshold)
92 if(itr.GetTimeStamp() > currentOffset){
94 switchChannels =
true;
99 if (switchChannels ==
true){
105 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(
sizeof(
int)));
107 memcpy(headerCopy->GetData(), &flag,
sizeof(int));
108 dataOutChannels[direction]->SendPart(headerCopy);
110 std::ostringstream obuffer;
111 boost::archive::binary_oarchive OutputArchive(obuffer);
114 int outputSize = obuffer.str().length();
115 unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
116 memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
118 dataOutChannels[direction]->Send(msg2);
123 fCurrentOutput.clear();
127 int nextOutput = direction + 1;
128 if (nextOutput >= numOutputs)
131 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(
sizeof(
int)));
133 memcpy(headerCopy->GetData(), &flag,
sizeof(int));
134 dataOutChannels[nextOutput]->SendPart(headerCopy);
136 std::ostringstream obuffer;
137 boost::archive::binary_oarchive OutputArchive(obuffer);
140 int outputSize = obuffer.str().length();
141 unique_ptr<FairMQMessage> msg3(fTransportFactory->CreateMessage(outputSize));
142 memcpy(msg3->GetData(), obuffer.str().c_str(), outputSize);
144 dataOutChannels[nextOutput]->Send(msg3);
149 if (switchChannels ==
true){
152 if (direction >= numOutputs)
158 switchChannels =
false;
163 LOG(INFO) <<
"STOP-Status Received: " <<
status;
164 for (
int i = 0;
i < numOutputs; ++
i)
166 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage());
167 headerCopy->Copy(header);
168 dataOutChannels[
i]->Send(headerCopy);
std::vector< PndSdsDigiTopix4 > fCurrentOutput
Data class to store the digi output of a pixel module.
std::vector< PndSdsDigiTopix4 > fTopixData
std::vector< PndSdsDigiTopix4 > fNextOutput