Program Listing for File zmqserializer.cpp¶
↰ Return to documentation for file (processors/zmqserializer/zmqserializer.cpp
)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include "zmqserializer.hpp"
#include <string>
#include <utility>
#include "idata.hpp"
#include "utilities/zmqutil.hpp"
ZMQSerializer::ZMQSerializer() : IProcessor() {
add_option(
"port", port_,
"Local network port for data streaming. If interleave is false, "
"then this is the first port in a sequence of ports used for streaming.");
add_option("encoding", encoding_, "Binary or YAML encoding.");
add_option("format", format_,
"Data format (none, full, headeronly, streamheader, compact).");
add_option("interleave", interleave_,
"Interleave data streams from all input slots and stream to "
"single network port.");
}
void ZMQSerializer::CreatePorts() {
data_port_ =
create_input_port<AnyType>("data", AnyType::Capabilities(),
PortInPolicy(SlotRange(1, 256), false, 0));
}
void ZMQSerializer::Preprocess(ProcessingContext &context) {
std::string address;
sockets_.clear();
if (interleave_()) {
sockets_.push_back(std::make_unique<zmq::socket_t>(context.run().global().zmq(), ZMQ_PUB));
address = "tcp://*:" + std::to_string(port_());
sockets_.back()->bind(address.c_str());
} else {
for (int k = 0; k < data_port_->number_of_slots(); ++k) {
sockets_.push_back(std::make_unique<zmq::socket_t>(context.run().global().zmq(), ZMQ_PUB));
address = "tcp://*:" + std::to_string(port_() + k);
sockets_.back()->bind(address.c_str());
}
}
serializer_.reset(Serialization::serializer(encoding_(), format_()));
packetid_.assign(data_port_->number_of_slots(), 0);
}
void ZMQSerializer::Process(ProcessingContext &context) {
std::vector<typename AnyType::Data *> data;
unsigned int idx = 0;
std::stringstream buffer;
while (!context.terminated()) {
for (int k = 0; k < data_port_->number_of_slots(); ++k) {
if (!data_port_->slot(k)->RetrieveDataAll(data)) {
break;
}
if (!interleave_()) {
idx = k;
}
for (auto &it : data) {
buffer.str("");
buffer.clear();
if (serializer_->Serialize(buffer, it, k, packetid_[k]++,
data_port_->slot(k)->upstream_address().processor(),
data_port_->slot(k)->upstream_address().port(),
data_port_->slot(k)->upstream_address().slot())) {
if (!s_send(*(sockets_[idx]), buffer.str())) {
LOG(DEBUG) << "failed to send zmq message.";
}
} else {
LOG(WARNING) << name() << ": Unable to serialize data stream " << k;
}
}
data_port_->slot(k)->ReleaseData();
}
}
}
void ZMQSerializer::Postprocess(ProcessingContext &context) {
sockets_.clear();
serializer_.reset();
for (SlotType k = 0; k < data_port_->number_of_slots(); k++) {
LOG(UPDATE) << name() << ": stream " << k
<< ": received and serialized over network " << packetid_[k]
<< " data packets.";
}
}
REGISTERPROCESSOR(ZMQSerializer)