.. _program_listing_file_processors_openephysZMQ_openephysZMQ.cpp: Program Listing for File openephysZMQ.cpp ========================================= |exhale_lsh| :ref:`Return to documentation for file <file_processors_openephysZMQ_openephysZMQ.cpp>` (``processors/openephysZMQ/openephysZMQ.cpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // --------------------------------------------------------------------- // This file is part of falcon-core. // // Copyright (C) 2021 - present Neuro-Electronics Research Flanders // // Falcon-server is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // Falcon-server is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// --------------------------------------------------------------------- #include "openephysZMQ.hpp" #include #include #include OpenEphysZMQ::OpenEphysZMQ() : IProcessor(PRIORITY_HIGH), builder_(flatbuilder_) { add_option("address", address_, "IP address of Open-Ephys zmq communication"); add_option("port", port_,"Port of Open-Ephys zmq communication"); add_option("npackets", npackets_, "The total number of data packets to read " "(0 means continuous recording)."); add_option("batch size", batch_size_, "The number of data packets to concatenate into " "single multi-channel data bucket."); add_option("nchannels", nchannels_, "The number of channels in the data packet sent by Open-Ephys."); } void OpenEphysZMQ::CreatePorts() { data_port_= create_output_port>( "data", MultiChannelType::Capabilities(ChannelRange(nchannels_())), MultiChannelType::Parameters(), PortOutPolicy(SlotRange(1), 500, WaitStrategy::kBlockingStrategy)); } void OpenEphysZMQ::CompleteStreamInfo() { data_port_->streaminfo(0).set_parameters( MultiChannelType::Parameters( nchannels_(), batch_size_())); data_port_->streaminfo(0).set_stream_rate(IRREGULARSTREAM); } void OpenEphysZMQ::Preprocess(ProcessingContext &context) { auto tcp_address = "tcp://" + address_() + ":" + std::to_string(port_()); try { socket_ = zmq::socket_t(context.run().global().zmq(), ZMQ_SUB); zmq_setsockopt(socket_, ZMQ_SUBSCRIBE, nullptr, 0); socket_.connect(tcp_address); } catch (...) { throw ProcessingPreprocessingError("Error when connecting the socket to the address: "+ tcp_address, name()); } LOG(INFO) << name() << ". 
Falcon is connected to the address: " << tcp_address << " and is ready to receive data from OpenEphys."; missing_packets_counter_ = 0; valid_packets_counter_ = 0; invalid_packets_counter_=0; } void OpenEphysZMQ::Process(ProcessingContext &context) { unsigned int sample_counter_ = batch_size_(); MultiChannelType::Data::sample_iterator data_out_iter; flatbuffers::VectorIterator data_in_iter; MultiChannelType::Data* data_out; const openephysflatbuffer::ContinuousData* data; while (!context.terminated() && valid_packets_counter_ < npackets_()) { zmq_msg_t message; zmq_msg_init (&message); if (zmq_msg_recv(&message, socket_, ZMQ_DONTWAIT) != -1) { try { data = openephysflatbuffer::GetContinuousData(zmq_msg_data(&message)); } catch (...) { LOG(DEBUG) << "Impossible to parse the packet received - skipping to the next."; invalid_packets_counter_++; continue; } if(data->n_channels() != nchannels_()){ throw ProcessingError("The number of channels (" + std::to_string(data->n_channels()) + ") received in the Open-Ephys packet is different " + "from the number of channels expected by Falcon (" + std::to_string(nchannels_()) + ").", name()); } valid_packets_counter_++; uint64_t init_ts = data->timestamp(); if (valid_packets_counter_ == 1) { first_valid_packet_arrival_time_ = Clock::now(); LOG(INFO) << name() << ". Received first valid data packet" << " (OE TS = " << data->timestamp() << ")"; } else if (last_message_number_ + 1 != data->message_id()) { LOG(DEBUG) << name() << ". Message lost: " << data->message_id() - last_message_number_; missing_packets_counter_++; } last_message_number_ = data->message_id(); uint64_t n_samples = data->n_samples(); LOG(DEBUG) << name() << ". 
Number of samples in the packet: " << n_samples; data_in_iter = data->samples()->begin(); for(uint64_t sample=0; sampleslot(0)->ClaimData(false); // set data bucket metadata data_out->set_hardware_timestamp(init_ts); data_out->set_source_timestamp(); sample_counter_ = 0; } data_out->set_sample_timestamp(sample_counter_, init_ts+sample); data_out_iter = data_out->begin_sample(sample_counter_); for(uint64_t channel=0; channelslot(0)->PublishData(); } } } zmq_msg_close(&message); } } void OpenEphysZMQ::Postprocess(ProcessingContext &context) { LOG_IF(UPDATE, (valid_packets_counter_ == npackets_())) << "Requested number of packets was read. You can now STOP processing."; std::chrono::milliseconds runtime( std::chrono::duration_cast( Clock::now() - first_valid_packet_arrival_time_)); LOG(UPDATE) << name() << ". Finished reading : " << valid_packets_counter_ << " packets received over " << static_cast(runtime.count()) / 1000 << " seconds at a rate of " << valid_packets_counter_ / (static_cast(runtime.count()) / 1000) << " packets/second."; LOG(DEBUG) << name() << ". " << invalid_packets_counter_ << " packets were detected as invalid and could not be parse."; LOG(DEBUG) << name() << ". " << missing_packets_counter_ << " packets were detected as missing."; socket_.close(); } REGISTERPROCESSOR(OpenEphysZMQ)