Program Listing for File ttlsink.cpp
Return to documentation for file (processors/TTLSink/ttlsink.cpp)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include "ttlsink.hpp"
#include <string>
#include <utility>
#include "idata.hpp"
#include "utilities/zmqutil.hpp"
#include "utilities/string.hpp"
// Constructs the processor and declares its options. Each add_option call
// receives the option name, the member that backs it, and a help string.
TTLSink::TTLSink()
    : IProcessor() {
  // Network endpoint of the acquisition system (used to build the tcp://
  // address in Preprocess).
  add_option("address", address_, "Cheetah or Open ephys ip address");
  add_option("port", port_,
             "Cheetah network port.");

  // Values sent with each trigger (see Process).
  add_option("ttl", ttl_,
             "TTL");
  add_option("event id", eventid_,
             "Event id.");

  // Target system; Configure only accepts "oe" or "nlx".
  add_option("system", system_, "could be oe or nlx");

  // When true, Process adds the input slot index to the configured ttl.
  add_option(
      "interleave", interleave_,
      "always activate the same ttl or activate a ttl by input slots.");
}
// Validates the "system" option.
//
// @param context  global context; not used by this check.
// @throws ProcessingConfigureError when the option is neither "oe" nor "nlx".
void TTLSink::Configure(const GlobalContext &context) {
  // Accept exactly the two back-ends that Process knows how to talk to.
  const bool supported_system = (system_() == "oe") || (system_() == "nlx");

  if (!supported_system) {
    throw ProcessingConfigureError("System option can be only oe or nlx.",
                                   name());
  }
}
// Creates the single input port, named "data", which accepts AnyType data
// on between 1 and 256 slots.
void TTLSink::CreatePorts() {
  constexpr int kMinSlots = 1;
  constexpr int kMaxSlots = 256;

  // NOTE(review): the meaning of the trailing `false` is defined by
  // PortInPolicy, which is not visible in this file.
  data_port_ = create_input_port<AnyType>(
      "data",
      AnyType::Capabilities(),
      PortInPolicy(SlotRange(kMinSlots, kMaxSlots), false));
}
// Prepares the processor for a run: opens a ZMQ request socket to the
// configured tcp://address:port endpoint, installs a receive timeout and
// creates the YAML serializer used for "nlx" messages.
//
// @param context  processing context; provides the shared ZMQ context.
void TTLSink::Preprocess(ProcessingContext &context) {
  socket_ = std::make_unique<zmq::socket_t>(context.run().global().zmq(),
                                            ZMQ_REQ);

  const std::string address =
      "tcp://" + address_() + ":" + std::to_string(port_());
  socket_->connect(address.c_str());

  // Bound the time spent waiting for a reply (ZMQ_RCVTIMEO is expressed in
  // milliseconds). The result is checked because a silent failure would
  // leave the socket with its default receive behaviour.
  int receive_timeout_ms = 3000;
  if (zmq_setsockopt(*(socket_), ZMQ_RCVTIMEO, &receive_timeout_ms,
                     sizeof(receive_timeout_ms)) != 0) {
    LOG(WARNING) << name() << ": unable to set zmq receive timeout ("
                 << zmq_strerror(zmq_errno()) << ").";
  }

  serializer_.reset(Serialization::serializer(Serialization::Encoding::YAML,
                                              Serialization::Format::FULL));
}
// Main processing loop. Until the context reports termination, every input
// slot is polled in turn and one trigger is emitted per retrieved data item:
//   - "nlx": a multi-frame message ("event", serialized item, ttl, event id);
//   - "oe" : two text commands, "TTL <n> on=1" followed by "TTL <n> on=0".
// After each successful send the reply is awaited and its first frame logged.
//
// @param context  processing context; polled for termination.
void TTLSink::Process(ProcessingContext &context) {
  std::vector<typename AnyType::Data *> data;
  zmq_frames buffer;
  std::stringstream buffer_serialization;

  // Receives the reply to the request just sent and logs its first frame.
  // An empty reply is reported instead of being indexed.
  // NOTE(review): a ZMQ_REQ socket normally needs a reply before it accepts
  // the next send; if the receive timed out, later sends may fail - confirm.
  auto log_reply = [this](const char *system_label) {
    const zmq_frames reply = s_blocking_recv_multi(*(socket_));
    if (reply.empty()) {
      LOG(WARNING) << name() << ": no " << system_label << " reply received.";
      return;
    }
    LOG(DEBUG) << system_label << " reply: " << reply[0];
  };

  while (!context.terminated()) {
    for (int k = 0; k < data_port_->number_of_slots(); ++k) {
      auto &&slot = data_port_->slot(k);

      if (!slot->RetrieveDataAll(data, 0)) {
        // Stop scanning slots; the outer loop re-checks termination.
        break;
      }

      for (auto &item : data) {
        buffer.clear();

        // With interleave enabled each slot drives its own TTL line.
        const int ttl = interleave_() ? ttl_() + k : ttl_();

        if (system_() == "nlx") {
          buffer.push_back("event");

          // Reuse the stream: drop previous content and reset state flags.
          buffer_serialization.str("");
          buffer_serialization.clear();

          auto &&upstream = slot->upstream_address();
          if (!serializer_->Serialize(buffer_serialization, item, k, 0,
                                      upstream.processor(), upstream.port(),
                                      upstream.slot())) {
            LOG(WARNING) << name() << ": Unable to serialize data stream "
                         << k;
            continue;
          }

          buffer.push_back(buffer_serialization.str());
          buffer.push_back(std::to_string(ttl));
          buffer.push_back(std::to_string(eventid_()));

          if (!s_send_multi(*(socket_), buffer)) {
            LOG(DEBUG) << "failed to send zmq message.";
          } else {
            log_reply("nlx");
          }
        } else if (system_() == "oe") {
          // Pulse the line: switch it on, then off again.
          const std::string command = "TTL " + std::to_string(ttl);
          for (const char *state : {" on=1", " on=0"}) {
            if (!s_send(*(socket_), command + state)) {
              LOG(DEBUG) << "failed to send zmq message.";
            } else {
              log_reply("oe");
            }
          }
        }
      }

      slot->ReleaseData();
    }
  }
}
// Tears down the connection opened in Preprocess.
//
// @param context  processing context; not used here.
void TTLSink::Postprocess(ProcessingContext &context) {
  // The socket only exists once Preprocess has created it; guard so that
  // cleanup is safe even when that step did not complete.
  if (socket_) {
    socket_->close();
    socket_.reset();
  }
}
// Invokes the REGISTERPROCESSOR macro for TTLSink. The macro is defined
// elsewhere; presumably it makes this processor known to the framework -
// confirm against its definition.
REGISTERPROCESSOR(TTLSink)