.. _program_listing_file_processors_TTLSink_ttlsink.cpp: Program Listing for File ttlsink.cpp ==================================== |exhale_lsh| :ref:`Return to documentation for file ` (``processors/TTLSink/ttlsink.cpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // --------------------------------------------------------------------- // This file is part of falcon-core. // // Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders // // Falcon-server is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // Falcon-server is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with falcon-core. If not, see . // --------------------------------------------------------------------- #include "ttlsink.hpp" #include #include #include "idata.hpp" #include "utilities/zmqutil.hpp" #include "utilities/string.hpp" TTLSink::TTLSink() : IProcessor() { add_option("address", address_, "Cheetah or Open ephys ip address"); add_option("port", port_,"Cheetah network port."); add_option("ttl", ttl_,"TTL"); add_option("event id", eventid_,"Event id."); add_option("system", system_, "could be oe or nlx"); add_option("interleave", interleave_, "always activate the same ttl or activate a ttl by input slots."); } void TTLSink::Configure(const GlobalContext &context){ if(system_() != "oe" and system_() != "nlx"){ throw ProcessingConfigureError("System option can be only oe or nlx.", name()); } } void TTLSink::CreatePorts() { data_port_ = create_input_port("data", AnyType::Capabilities(), PortInPolicy(SlotRange(1, 256), false)); } void TTLSink::Preprocess(ProcessingContext &context) { socket_= std::make_unique(context.run().global().zmq(), ZMQ_REQ); std::string address = "tcp://"+ address_() +":" + std::to_string(port_()); socket_->connect(address.c_str()); int t = 3000; zmq_setsockopt(*(socket_), ZMQ_RCVTIMEO, &t, sizeof(t)); serializer_.reset(Serialization::serializer(Serialization::Encoding::YAML, Serialization::Format::FULL)); } void TTLSink::Process(ProcessingContext &context) { std::vector data; zmq_frames buffer; zmq_frames reply; std::stringstream buffer_serialization; int ttl; while (!context.terminated()) { for (int k = 0; k < data_port_->number_of_slots(); ++k) { if (!data_port_->slot(k)->RetrieveDataAll(data, 0)) { break; } for (auto &it : data) { reply.clear(); buffer.clear(); if(interleave_()){ ttl = ttl_() + k; }else{ ttl = ttl_(); } if(system_() == "nlx"){ buffer.push_back("event"); buffer_serialization.str(""); buffer_serialization.clear(); if (serializer_->Serialize(buffer_serialization, it, k, 0, data_port_->slot(k)->upstream_address().processor(), data_port_->slot(k)->upstream_address().port(), data_port_->slot(k)->upstream_address().slot())) { buffer.push_back(buffer_serialization.str()); buffer.push_back(std::to_string(ttl)); buffer.push_back(std::to_string(eventid_())); if (!s_send_multi(*(socket_), buffer)) { LOG(DEBUG) << "failed to send zmq message."; }else{ reply = s_blocking_recv_multi(*(socket_)); LOG(DEBUG) << "nlx reply: " << reply[0]; } } else { LOG(WARNING) << name() << ": Unable to serialize data stream " << k; } }else if(system_()== "oe"){ if (!s_send(*(socket_), "TTL "+ std::to_string(ttl)+" on=1")) { LOG(DEBUG) << "failed to send zmq message."; }else{ reply = s_blocking_recv_multi(*(socket_)); LOG(DEBUG) << "oe reply: " << reply[0]; } if (!s_send(*(socket_), "TTL "+ std::to_string(ttl)+" on=0")) { LOG(DEBUG) << "failed to send zmq message."; }else{ reply = s_blocking_recv_multi(*(socket_)); LOG(DEBUG) << "oe reply: " << reply[0]; } } } data_port_->slot(k)->ReleaseData(); } } } void TTLSink::Postprocess(ProcessingContext &context) { socket_->close(); } REGISTERPROCESSOR(TTLSink)