42 int numInputs = fChannels.at(
"data-in").size();
46 const FairMQChannel& dataOutChannel = fChannels.at(
"data-out").at(0);
47 FairMQChannel* dataInChannels[fChannels.at(
"data-in").size()];
48 LOG(INFO) <<
"Number of Input Channels: " << numInputs;
49 for (
int i = 0;
i < numInputs; ++
i)
51 dataInChannels[
i] = &(fChannels.at(
"data-in").at(
i));
54 fData.resize(numInputs);
55 int activeChannel = 0;
56 bool switchChannel =
false;
57 bool channelSwitched =
false;
62 while (CheckCurrentState(RUNNING))
64 for (
int channelNr = 0; channelNr < numInputs; channelNr++){
66 std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
67 std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
71 if (
fRunningStatus[channelNr] ==
true && dataInChannels[channelNr]->Receive(header) > 0){
73 int status = *(
static_cast<int*
>(header->GetData()));
77 LOG(INFO) <<
"STOP-Signal received for channel " << channelNr;
80 if (dataInChannels[channelNr]->ExpectsAnotherPart())
82 if (dataInChannels[channelNr]->Receive(msg))
84 std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
85 std::istringstream ibuffer(msgStr);
87 boost::archive::binary_iarchive InputArchive(ibuffer);
92 catch (boost::archive::archive_exception& e)
94 LOG(ERROR) << e.what();
104 if (activeChannel == channelNr){
106 if (
fData[channelNr].size() > 0){
107 for (std::vector<PndSdsDigiTopix4>::iterator data =
fData[channelNr].begin(); data !=
fData[channelNr].end(); data++){
108 if (data->GetTimeStamp() < 0){
110 switchChannel =
true;
111 channelSwitched =
true;
112 fData[channelNr].erase(
fData[channelNr].begin(), ++data);
117 if (switchChannel ==
false){
119 fData[channelNr].clear();
124 unique_ptr<FairMQMessage> headerOut(fTransportFactory->CreateMessage(
sizeof(
int)));
125 memcpy(headerOut->GetData(), &flag,
sizeof(int));
126 dataOutChannel.SendPart(headerOut);
128 std::ostringstream obuffer;
129 boost::archive::binary_oarchive OutputArchive(obuffer);
131 int outputSize = obuffer.str().length();
132 unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
133 memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
134 dataOutChannel.Send(msg2);
138 for (
auto info : fOutputData){
140 if (info.GetTimeStamp() > 0 && oldTS > info.GetTimeStamp()){
141 LOG(INFO) <<
"++++ SortingError ++++ " << oldTS <<
" > " << info.GetTimeStamp();
143 oldTS = info.GetTimeStamp();
146 if (switchChannel ==
true){
148 if (activeChannel >= numInputs)
153 switchChannel =
false;
165 if (allStop ==
true){
166 LOG(INFO) <<
"STOP-Signal received for all. Emptying buffers.";
167 for (
int i = 0;
i < numInputs;
i++){
169 if (
i != numInputs - 1){
174 unique_ptr<FairMQMessage> headerOut(fTransportFactory->CreateMessage(
sizeof(
int)));
175 memcpy(headerOut->GetData(), &flag,
sizeof(int));
176 dataOutChannel.SendPart(headerOut);
178 fOutputData =
fData[activeChannel];
179 fData[channelNr].clear();
181 std::ostringstream obuffer;
182 boost::archive::binary_oarchive OutputArchive(obuffer);
184 int outputSize = obuffer.str().length();
185 unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
186 memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
187 dataOutChannel.Send(msg2);
189 for (
auto info : fOutputData){
191 if (info.GetTimeStamp() > 0 && oldTS > info.GetTimeStamp()){
192 LOG(INFO) <<
"++++ SortingError ++++ " << oldTS <<
" > " << info.GetTimeStamp();
194 oldTS = info.GetTimeStamp();
199 if (activeChannel >= numInputs)
std::vector< PndSdsDigiTopix4 > fInputData
std::vector< PndSdsDigiTopix4 > fOutputData
std::vector< std::vector< PndSdsDigiTopix4 > > fData
std::vector< bool > fRunningStatus