.. _program_listing_file_processors_zmqserializer_zmqserializer.cpp: Program Listing for File zmqserializer.cpp ========================================== |exhale_lsh| :ref:`Return to documentation for file ` (``processors/zmqserializer/zmqserializer.cpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // --------------------------------------------------------------------- // This file is part of falcon-core. // // Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders // // Falcon-server is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // Falcon-server is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with falcon-core. If not, see . // --------------------------------------------------------------------- #include "zmqserializer.hpp" #include #include #include "idata.hpp" #include "utilities/zmqutil.hpp" ZMQSerializer::ZMQSerializer() : IProcessor() { add_option( "port", port_, "Local network port for data streaming. If interleave is false, " "then this is the first port in a sequence of ports used for streaming."); add_option("encoding", encoding_, "Binary or YAML encoding."); add_option("format", format_, "Data format (none, full, headeronly, streamheader, compact)."); add_option("interleave", interleave_, "Interleave data streams from all input slots and stream to " "single network port."); } void ZMQSerializer::CreatePorts() { data_port_ = create_input_port("data", AnyType::Capabilities(), PortInPolicy(SlotRange(1, 256), false, 0)); } void ZMQSerializer::Preprocess(ProcessingContext &context) { std::string address; sockets_.clear(); if (interleave_()) { sockets_.push_back(std::make_unique(context.run().global().zmq(), ZMQ_PUB)); address = "tcp://*:" + std::to_string(port_()); sockets_.back()->bind(address.c_str()); } else { for (int k = 0; k < data_port_->number_of_slots(); ++k) { sockets_.push_back(std::make_unique(context.run().global().zmq(), ZMQ_PUB)); address = "tcp://*:" + std::to_string(port_() + k); sockets_.back()->bind(address.c_str()); } } serializer_.reset(Serialization::serializer(encoding_(), format_())); packetid_.assign(data_port_->number_of_slots(), 0); } void ZMQSerializer::Process(ProcessingContext &context) { std::vector data; unsigned int idx = 0; std::stringstream buffer; while (!context.terminated()) { for (int k = 0; k < data_port_->number_of_slots(); ++k) { if (!data_port_->slot(k)->RetrieveDataAll(data)) { break; } if (!interleave_()) { idx = k; } for (auto &it : data) { buffer.str(""); buffer.clear(); if (serializer_->Serialize(buffer, it, k, packetid_[k]++, data_port_->slot(k)->upstream_address().processor(), data_port_->slot(k)->upstream_address().port(), data_port_->slot(k)->upstream_address().slot())) { if (!s_send(*(sockets_[idx]), buffer.str())) { LOG(DEBUG) << "failed to send zmq message."; } } else { LOG(WARNING) << name() << ": Unable to serialize data stream " << k; } } data_port_->slot(k)->ReleaseData(); } } } void ZMQSerializer::Postprocess(ProcessingContext &context) { sockets_.clear(); serializer_.reset(); for (SlotType k = 0; k < data_port_->number_of_slots(); k++) { LOG(UPDATE) << name() << ": stream " << k << ": received and serialized over network " << packetid_[k] << " data packets."; } } REGISTERPROCESSOR(ZMQSerializer)