// Program listing for file openephysZMQ.cpp
// (processors/openephysZMQ/openephysZMQ.cpp)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2021 - present Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include "openephysZMQ.hpp"
#include <algorithm>
#include <string>
#include <vector>
// Constructs the processor with high priority and registers its
// user-configurable options: the zmq endpoint (address and port), the number
// of packets to read, the batching factor and the expected channel count.
OpenEphysZMQ::OpenEphysZMQ()
    : IProcessor(PRIORITY_HIGH), builder_(flatbuilder_) {
  // Endpoint of the Open-Ephys zmq publisher.
  add_option("address", address_,
             "IP address of Open-Ephys zmq communication");
  add_option("port", port_, "Port of Open-Ephys zmq communication");

  // Acquisition length.
  add_option(
      "npackets", npackets_,
      "The total number of data packets to read (0 means continuous recording).");

  // Layout of the outgoing data buckets.
  add_option(
      "batch size", batch_size_,
      "The number of data packets to concatenate into single multi-channel data bucket.");
  add_option("nchannels", nchannels_,
             "The number of channels in the data packet sent by Open-Ephys.");
}
void OpenEphysZMQ::CreatePorts() {
data_port_= create_output_port<MultiChannelType<double>>(
"data",
MultiChannelType<double>::Capabilities(ChannelRange(nchannels_())),
MultiChannelType<double>::Parameters(),
PortOutPolicy(SlotRange(1), 500, WaitStrategy::kBlockingStrategy));
}
void OpenEphysZMQ::CompleteStreamInfo() {
data_port_->streaminfo(0).set_parameters(
MultiChannelType<double>::Parameters(
nchannels_(), batch_size_()));
data_port_->streaminfo(0).set_stream_rate(IRREGULARSTREAM);
}
// Opens a ZMQ_SUB socket on the configured "tcp://<address>:<port>" endpoint,
// subscribes with an empty filter and resets the per-run packet counters.
//
// Throws ProcessingPreprocessingError when the socket cannot be created,
// subscribed or connected.
void OpenEphysZMQ::Preprocess(ProcessingContext &context) {
  const auto tcp_address =
      "tcp://" + address_() + ":" + std::to_string(port_());

  try {
    socket_ = zmq::socket_t(context.run().global().zmq(), ZMQ_SUB);

    // zmq_setsockopt is the C API: it reports failure through its return
    // value instead of throwing, so the result has to be checked explicitly.
    // A failure is turned into an exception so that it takes the same error
    // path as a failed connect.
    if (zmq_setsockopt(socket_, ZMQ_SUBSCRIBE, nullptr, 0) != 0) {
      throw zmq::error_t();
    }

    socket_.connect(tcp_address);
  } catch (...) {
    throw ProcessingPreprocessingError("Error when connecting the socket to the address: "+ tcp_address, name());
  }

  LOG(INFO) << name() << ". Falcon is connected to the address: " << tcp_address
            << " and is ready to receive data from OpenEphys.";

  // Start every run with clean statistics.
  missing_packets_counter_ = 0;
  valid_packets_counter_ = 0;
  invalid_packets_counter_ = 0;
}
void OpenEphysZMQ::Process(ProcessingContext &context) {
unsigned int sample_counter_ = batch_size_();
MultiChannelType<double>::Data::sample_iterator data_out_iter;
flatbuffers::VectorIterator<float, float> data_in_iter;
MultiChannelType<double>::Data* data_out;
const openephysflatbuffer::ContinuousData* data;
while (!context.terminated() && valid_packets_counter_ < npackets_()) {
zmq_msg_t message;
zmq_msg_init (&message);
if (zmq_msg_recv(&message, socket_, ZMQ_DONTWAIT) != -1) {
try {
data = openephysflatbuffer::GetContinuousData(zmq_msg_data(&message));
} catch (...) {
LOG(DEBUG) << "Impossible to parse the packet received - skipping to the next.";
invalid_packets_counter_++;
continue;
}
if(data->n_channels() != nchannels_()){
throw ProcessingError("The number of channels (" + std::to_string(data->n_channels())
+ ") received in the Open-Ephys packet is different "
+ "from the number of channels expected by Falcon ("
+ std::to_string(nchannels_()) + ").", name());
}
valid_packets_counter_++;
uint64_t init_ts = data->timestamp();
if (valid_packets_counter_ == 1) {
first_valid_packet_arrival_time_ = Clock::now();
LOG(INFO) << name() << ". Received first valid data packet"
<< " (OE TS = " << data->timestamp() << ")";
} else if (last_message_number_ + 1 !=
data->message_id()) {
LOG(DEBUG) << name() << ". Message lost: "
<< data->message_id() - last_message_number_;
missing_packets_counter_++;
}
last_message_number_ = data->message_id();
uint64_t n_samples = data->n_samples();
LOG(DEBUG) << name() << ". Number of samples in the packet: " << n_samples;
data_in_iter = data->samples()->begin();
for(uint64_t sample=0; sample<n_samples; sample++){
if (sample_counter_ == batch_size_()) {
data_out= data_port_->slot(0)->ClaimData(false);
// set data bucket metadata
data_out->set_hardware_timestamp(init_ts);
data_out->set_source_timestamp();
sample_counter_ = 0;
}
data_out->set_sample_timestamp(sample_counter_, init_ts+sample);
data_out_iter = data_out->begin_sample(sample_counter_);
for(uint64_t channel=0; channel<nchannels_(); channel++){
(*data_out_iter) = *(data_in_iter + n_samples*channel);
++data_out_iter;
}
++data_in_iter;
++sample_counter_;
if (sample_counter_ == batch_size_()) {
data_port_->slot(0)->PublishData();
}
}
}
zmq_msg_close(&message);
}
}
// Logs the acquisition statistics of the run (packets received, elapsed time
// since the first valid packet, packet rate, invalid and missing packet
// counts) and closes the zmq socket.
void OpenEphysZMQ::Postprocess(ProcessingContext &context) {
  LOG_IF(UPDATE, (valid_packets_counter_ == npackets_()))
      << "Requested number of packets was read. You can now STOP processing.";

  // first_valid_packet_arrival_time_ is only assigned once a valid packet has
  // arrived, so the elapsed time is meaningless when no packet was received;
  // report 0 in that case. Also avoid dividing by a zero runtime.
  double runtime_seconds = 0.0;
  double packets_per_second = 0.0;
  if (valid_packets_counter_ > 0) {
    const auto runtime =
        std::chrono::duration_cast<std::chrono::milliseconds>(
            Clock::now() - first_valid_packet_arrival_time_);
    runtime_seconds = static_cast<double>(runtime.count()) / 1000;
    if (runtime_seconds > 0) {
      packets_per_second = valid_packets_counter_ / runtime_seconds;
    }
  }

  LOG(UPDATE) << name() << ". Finished reading : " << valid_packets_counter_
              << " packets received over "
              << runtime_seconds
              << " seconds at a rate of "
              << packets_per_second
              << " packets/second.";
  LOG(DEBUG) << name() << ". " << invalid_packets_counter_ << " packets were detected as invalid and could not be parse.";
  LOG(DEBUG) << name() << ". " << missing_packets_counter_ << " packets were detected as missing.";

  socket_.close();
}
// Invokes the REGISTERPROCESSOR macro for the OpenEphysZMQ class. The macro
// is defined outside this file; presumably it makes the processor
// discoverable by name at runtime - confirm against its definition.
REGISTERPROCESSOR(OpenEphysZMQ)