58 FairMQChannel& dataInChannel = fChannels.at(
"data-in").at(0);
59 FairMQChannel& dataOutChannel = fChannels.at(
"data-out").at(0);
60 bool statusChannelPresent =
false;
64 (fChannels.at(
"status-out"));
68 LOG(INFO) <<
"No Status-Out channel!";
72 while (CheckCurrentState(RUNNING))
74 std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
75 std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
77 if (dataInChannel.Receive(header) > 0)
79 status = *(
static_cast<int*
>(header->GetData()));
81 if (dataInChannel.ExpectsAnotherPart())
83 if (dataInChannel.Receive(msg) > 0) {
84 string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
85 istringstream ibuffer(msgStr);
87 boost::archive::binary_iarchive InputArchive(ibuffer);
92 catch (boost::archive::archive_exception& e)
94 LOG(ERROR) << e.what();
107 for (
auto & eventIter : separatedData){
108 std::vector<PndSdsHit>
hits;
109 if (eventIter.size() > 1){
112 for (
auto clusterIter : cluster){
113 std::vector<PndSdsDigiTopix4> clusterDigis;
114 for (
auto digiIter : clusterIter){
115 clusterDigis.push_back((eventIter)[digiIter]);
137 LOG(INFO) <<
"Received STOP-Signal!" << std::endl;
140 unique_ptr<FairMQMessage> headerOut(fTransportFactory->CreateMessage(
sizeof(
int)));
141 memcpy(headerOut->GetData(), &
status,
sizeof(int));
142 dataOutChannel.SendPart(headerOut);
144 std::ostringstream obuffer;
145 boost::archive::binary_oarchive OutputArchive(obuffer);
147 int outputSize = obuffer.str().length();
148 unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
149 memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
150 dataOutChannel.Send(msg2);
153 unique_ptr<FairMQMessage> headerOut2(fTransportFactory->CreateMessage(
sizeof(
int)));
154 memcpy(headerOut->GetData(), &
status,
sizeof(int));
155 fChannels.at(
"status-out").at(0).SendPart(headerOut2);
157 std::ostringstream obuffer2;
158 boost::archive::binary_oarchive OutputArchive2(obuffer2);
160 int outputSize2 = obuffer2.str().length();
161 unique_ptr<FairMQMessage> msg3(fTransportFactory->CreateMessage(outputSize2));
162 memcpy(msg3->GetData(), obuffer2.str().c_str(), outputSize2);
163 fChannels.at(
"status-out").at(0).Send(msg3);
174 fTopixHitsEvent.clear();
PndSdsHit GetHit(std::vector< PndSdsDigiTopix4 > pixelArray)
PndMvdTopixClusterFinder fClusterFinder
std::vector< PndSdsDigiTopix4 > fTopixDigis
std::deque< std::vector< PndSdsHit > > fTopixHitsEvent
std::vector< int > fClusterSize
PndMQGapEventBuilderToPix fEventBuilder
std::vector< std::vector< Int_t > > GetClusters(std::vector< PndSdsDigiTopix4 > &hits)
std::vector< std::vector< PndSdsDigiTopix4 > > GetSeparatedData()
PndMQTopixHitProducer fHitProducer
void FillData(std::vector< PndSdsDigiTopix4 > data)
bool fHasBoostSerialization