17 #include <boost/thread.hpp> 
   18 #include <boost/bind.hpp> 
   19 #include <boost/archive/binary_oarchive.hpp> 
   20 #include <boost/archive/binary_iarchive.hpp> 
   23 #include "baseMQtools.h" 
   25 #include "FairMQLogger.h" 
   26 #include "mrfdata_8b.h" 
   35         using namespace baseMQ::tools::resolve;
 
   36         bool checkOutputClass = 
false;
 
   38         if (is_same<boost::archive::binary_oarchive, boost::archive::binary_oarchive>::value)
 
   40                 if (has_BoostSerialization<
PndSdsDigiTopix4, 
void(boost::archive::binary_oarchive&, 
const unsigned int)>::value == 1)
 
   42                         checkOutputClass = 
true;
 
   57          int numInputs = fChannels.at(
"data-in").size();
 
   60         const FairMQChannel& dataOutChannel = fChannels.at(
"data-out").at(0);
 
   61         const FairMQChannel& statusChannel  = fChannels.at(
"status-out").at(0);
 
   62         FairMQChannel* dataInChannels[fChannels.at(
"data-in").size()];
 
   63         LOG(INFO) << 
"Number of Input Channels: " << numInputs;
 
   65         for (
int i = 0; 
i < numInputs; ++
i)
 
   67                 dataInChannels[
i] = &(fChannels.at(
"data-in").at(
i));
 
   76         std::vector<int> fillLevel(numInputs,0);
 
   79         bool stopMessageOnce = 
true;
 
   81         while (CheckCurrentState(RUNNING))
 
   84                         for (
int channelNr = 0; channelNr < numInputs; channelNr++){
 
   86                                 if (fillLevel[channelNr] == 0 && 
fRunningStatus[channelNr] == 
true){
 
   88                                         std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage());
 
   89                                         std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
 
   91                                         if (dataInChannels[channelNr]->Receive(header) > 0)
 
   93                                                 int status = *(
static_cast<int*
>(header->GetData()));
 
   98                                                         LOG(INFO) << 
"STOP-Status received for channel: " << channelNr;
 
  100                                                 if (dataInChannels[channelNr]->ExpectsAnotherPart())
 
  103                                                         if (dataInChannels[channelNr]->Receive(msg) > 0){
 
  104                                                                 std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
 
  105                                                                 std::istringstream ibuffer(msgStr);
 
  107                                                                 boost::archive::binary_iarchive InputArchive(ibuffer);
 
  112                                                                 catch (boost::archive::archive_exception& e)
 
  114                                                                         LOG(ERROR) << e.what();
 
  123                                         if (fillLevel[channelNr] == 0 && 
fRunningStatus[channelNr] == 
false)
 
  126                                                 LOG(INFO) << 
"GlobarRunningStatus set to false for channel " << channelNr;
 
  133                         if (eventCounter++ % 100 == 0){
 
  134                                 LOG(INFO) << eventCounter << 
" nEvents: " << 
fEventData.size() << 
" hits in Event " << 
fEventData.front().size()
 
  135                                                 << 
" timeStamp: " << TString::Format(
"%12.0f",
fEventData.front().front().GetTimeStamp()).Data()
 
  136                                                 << 
" sensorID " << 
fEventData.front().front().GetSensorID();
 
  138                                 LOG(INFO) << 
"ChannelsInEvent: ";
 
  142                                 std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(
sizeof(
int)));
 
  144                                 memcpy(headerCopy->GetData(), &flag, 
sizeof(int));
 
  145                                 statusChannel.SendPart(headerCopy);
 
  147                                 std::ostringstream obuffer;
 
  148                                 boost::archive::binary_oarchive OutputArchive(obuffer);
 
  151                                 int outputSize = obuffer.str().length();
 
  152                                 unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(outputSize));
 
  153                                 memcpy(msg->GetData(), obuffer.str().c_str(), outputSize);
 
  155                                 statusChannel.Send(msg);
 
  159                         std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(
sizeof(
int)));
 
  161                         memcpy(headerCopy->GetData(), &flag, 
sizeof(int));
 
  162                         dataOutChannel.SendPart(headerCopy);
 
  164                         std::ostringstream obuffer;
 
  165                         boost::archive::binary_oarchive OutputArchive(obuffer);
 
  168                         int outputSize = obuffer.str().length();
 
  169                         unique_ptr<FairMQMessage> msg2(fTransportFactory->CreateMessage(outputSize));
 
  170                         memcpy(msg2->GetData(), obuffer.str().c_str(), outputSize);
 
  172                         dataOutChannel.Send(msg2);
 
  193                         LOG(INFO) << 
"STOP-Signal received for one input";
 
  194                         std::unique_ptr<FairMQMessage> headerCopy(fTransportFactory->CreateMessage(
sizeof(
int)));
 
  196                         memcpy(headerCopy->GetData(), &flag, 
sizeof(int));
 
  197                         dataOutChannel.Send(headerCopy);
 
  198                         stopMessageOnce = 
false;
 
  208             FairMQDevice::SetProperty(key, value);
 
  218             return FairMQDevice::GetProperty(key, default_);
 
  227                 FairMQDevice::SetProperty(key, value);
 
  237             return FairMQDevice::GetProperty(key, default_);
 
void AddData(vector< vector< vector< PndSdsHit > > > &data)
bool fHasBoostSerialization
std::vector< std::vector< PndSdsHit > > fHitData
PndMQHitsEventBuilder * fBuilder
Data class to store the digi output of a pixel module. 
std::vector< int > fSensorsInEvent
virtual ~PndMQHitEventDevice()
std::vector< int > GetInputDataLevel()
virtual std::string GetProperty(const int key, const std::string &default_="")
std::vector< bool > fRunningStatus
std::vector< std::vector< PndSdsHit > > fEventData
virtual void SetProperty(const int key, const std::string &value)
std::vector< std::vector< std::vector< PndSdsHit > > > fDataFromChannels
vector< vector< PndSdsHit > > GetEvents()
bool fGlobalRunningStatus
vector< int > GetSensorsInEvent()