.. _program_listing_file_processors_nlxparser_nlxparser.cpp:

Program Listing for File nlxparser.cpp
======================================

|exhale_lsh| :ref:`Return to documentation for file <file_processors_nlxparser_nlxparser.cpp>` (``processors/nlxparser/nlxparser.cpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   // ---------------------------------------------------------------------
   // This file is part of falcon-core.
   //
   // Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
   //
   // Falcon-server is free software: you can redistribute it and/or modify
   // it under the terms of the GNU General Public License as published by
   // the Free Software Foundation, either version 3 of the License, or
   // (at your option) any later version.
   //
   // Falcon-server is distributed in the hope that it will be useful,
   // but WITHOUT ANY WARRANTY; without even the implied warranty of
   // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   // GNU General Public License for more details.
   //
   // You should have received a copy of the GNU General Public License
   // along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
// Implementation of the NlxParser processor: it reads raw records from the
// "udp" input port, validates them, and re-packs them into batched
// multi-channel buckets on the "data" and "ttl" output ports, optionally
// emitting filler batches to compensate for missed packets.

#include "nlxparser.hpp"
// NOTE(review): the header names of the next three includes are missing in
// this listing (angle-bracket text appears to have been stripped). The code
// below uses std::transform, std::toupper, std::runtime_error, std::string
// and std::chrono -- confirm the originals against the repository.
#include
#include
#include

// Returns the enumerator name of `x` as text ("NONE", "ASAP" or
// "DISTRIBUTED"). If `x` matches none of the listed cases the
// default-constructed (empty) string is returned.
std::string gapfill_to_string(GapFill x) {
  std::string s;
// MATCH(p) expands to a `case` label that stores the stringified enumerator
// name in `s`. The `;` written after some MATCH(...) uses is an empty
// statement.
#define MATCH(p) \
  case (GapFill::p): \
    s = #p; \
    break;
  switch (x) {
    MATCH(NONE)
    MATCH(ASAP);
    MATCH(DISTRIBUTED);
  }
#undef MATCH
  return s;
}

// Converts a textual gap-fill mode to its GapFill value. The comparison is
// case-insensitive: `s` (a by-value copy) is upper-cased in place first.
// Throws std::runtime_error("Invalid GapFill value.") when nothing matches.
GapFill string_to_gapfill(std::string s) {
  // The cast selects the int(int) overload of std::toupper.
  // NOTE(review): std::toupper is only defined for values representable as
  // unsigned char (or EOF); plain `char` values are passed here.
  std::transform(s.begin(), s.end(), s.begin(), (int (*)(int))std::toupper);
// MATCH(p) returns GapFill::p when `s` equals the enumerator's name.
#define MATCH(p) \
  if (s == #p) { \
    return GapFill::p; \
  }
  MATCH(NONE)
  MATCH(ASAP);
  MATCH(DISTRIBUTED);
  throw std::runtime_error("Invalid GapFill value.");
#undef MATCH
}

// Constructs the processor with PRIORITY_HIGH and registers its five
// user-facing options ("batch size", "update interval", "trigger/enabled",
// "trigger/channel", "gap fill") together with their help texts.
NlxParser::NlxParser() : IProcessor(PRIORITY_HIGH) {
  add_option("batch size", batch_size_,
             "The number of data packets to concatenate into "
             "single multi-channel data bucket.");
  add_option("update interval", update_interval_,
             "The time interval for updates on the received data from "
             "the Digilynx acquisition system.");
  add_option("trigger/enabled", triggered_,
             "Whether or not to wait for hardware trigger to start "
             "streaming data packets.");
  add_option("trigger/channel", hardware_trigger_channel_,
             "Digital input channel to use as hardware trigger");
  add_option(
      "gap fill", gap_fill_,
      "Method of filling in missing data packets. If 'none', no filling of "
      "missed packets is performed. If 'asap', all missed packets will be "
      "filled with last available batch of samples. If 'distributed', "
      "missed packets will be filled with the last available batch of samples "
      "at each iteration.");
}

// Creates the processor's ports and shared state: input port "udp", output
// ports "data" (1..nlx::NLX_MAX_NCHANNELS channels) and "ttl" (1 channel),
// and a broadcaster state "n_invalid" (initial value 0, Permission::READ).
// NOTE(review): the template argument lists on the create_* calls and on
// VectorType / MultiChannelType are missing in this listing (note the stray
// `>`); they appear to have been stripped and must be restored from the
// repository before this compiles.
// NOTE(review): n_invalid_ is created here but is not updated anywhere in the
// code visible in this listing; Process() increments stats_.n_invalid instead.
void NlxParser::CreatePorts() {
  data_in_port_ = create_input_port>(
      "udp", VectorType::Capabilities(), PortInPolicy(SlotRange(1)));

  output_port_signal_ = create_output_port>(
      "data",
      MultiChannelType::Capabilities(
          ChannelRange(1, nlx::NLX_MAX_NCHANNELS)),
      MultiChannelType::Parameters(), PortOutPolicy(SlotRange(1), 500));

  output_port_ttl_ = create_output_port>(
      "ttl", MultiChannelType::Capabilities(ChannelRange(1)),
      MultiChannelType::Parameters(), PortOutPolicy(SlotRange(1), 500));

  n_invalid_ = create_broadcaster_state(
      "n_invalid", 0, Permission::READ,
      "The number of invalid packets that were received.");
}

// Derives the channel count from the size parameter of the incoming stream
// and fills in the parameters and stream rate of both output streams. Each
// output bucket holds batch_size_() samples, so the output stream rate is the
// input stream rate divided by batch_size_().
void NlxParser::CompleteStreamInfo() {
  nchannels_ = nlx::NLX_NCHANNELS_FROM_NFIELDS(
      data_in_port_->slot(0)->streaminfo().parameters().size);
  LOG(INFO) << name() << ": parsing " << nchannels_
            << " channels raw digilynx data.";
  nlxrecord_.set_nchannels(nchannels_);

  // Signal output: nchannels_ channels per sample.
  output_port_signal_->streaminfo(0).set_parameters(
      MultiChannelType::Parameters(
          nchannels_, batch_size_(),
          data_in_port_->slot(0)->streaminfo().stream_rate()));
  output_port_signal_->streaminfo(0).set_stream_rate(
      data_in_port_->slot(0)->streaminfo().stream_rate() / batch_size_());

  // TTL output: a single channel per sample.
  output_port_ttl_->streaminfo(0).set_parameters(
      MultiChannelType::Parameters(
          1, batch_size_(),
          data_in_port_->slot(0)->streaminfo().stream_rate()));
  output_port_ttl_->streaminfo(0).set_stream_rate(
      data_in_port_->slot(0)->streaminfo().stream_rate() / batch_size_());
}

// Builds channel_list_ as the identity list 0 .. nchannels_ - 1.
// `context` is not used.
void NlxParser::Prepare(GlobalContext &context) {
  // create channel list
  channel_list_.resize(nchannels_);
  for (unsigned int i = 0; i < nchannels_; i++) {
    channel_list_[i] = i;
  }
}

// Resets the per-run counters, timestamps and statistics.
// `context` is not used.
void NlxParser::Preprocess(ProcessingContext &context) {
  // Starting at batch_size_() makes Process() claim a fresh output bucket
  // for the first dispatched packet (it claims when the counter equals the
  // batch size).
  sample_counter_ = batch_size_();
  valid_packet_counter_ = 0;
  timestamp_ = nlx::INVALID_TIMESTAMP;
  last_timestamp_ = nlx::INVALID_TIMESTAMP;
  stats_.clear();
  n_filling_packets_ = 0;
}

// Main loop. Until the context is terminated or RetrieveData() fails:
//   1. retrieve one input item and decode it into nlxrecord_;
//   2. count and skip invalid records;
//   3. if the trigger option is enabled, drop packets until the configured
//      bit of the parallel port is set;
//   4. append the record as one sample to the current "data" and "ttl"
//      buckets, publishing both once batch_size_() samples were written;
//   5. right after publishing, optionally emit filler buckets to make up for
//      missed packets (gap fill modes ASAP / DISTRIBUTED).
void NlxParser::Process(ProcessingContext &context) {
  bool update_time = false;
  unsigned int i = 0;
  int b = 0;
  // Number of missed packets not yet compensated by filler samples.
  decltype(n_filling_packets_) packets_lag = 0;

  // NOTE(review): template arguments of VectorType / MultiChannelType are
  // missing in this listing (see CreatePorts).
  VectorType::Data *data_in = nullptr;
  MultiChannelType::Data::sample_iterator data_iter;
  MultiChannelType::Data *data_out = nullptr;
  MultiChannelType::Data *ttl_data_out = nullptr;

  while (!context.terminated()) {
    if (!data_in_port_->slot(0)->RetrieveData(data_in)) {
      break;
    }

    int rc = nlxrecord_.FromNetworkBuffer(data_in->data());
    if (rc != 0) {
      // Non-zero return code: count the record as invalid, log the header
      // fields that are checked, and skip it.
      ++stats_.n_invalid;
      LOG(INFO) << name() << ": Received invalid record.";
      LOG(DEBUG) << name()<< ". STX field: " << nlxrecord_.buffer_[nlx::NLX_FIELD_STX] << " instead of " << nlx::NLX_STX;
      LOG(DEBUG) << name() << ". Raw packet id:"<< nlxrecord_.buffer_[nlx::NLX_FIELD_RAWPACKETID] << " instead of " << nlx::NLX_RAWPACKETID;
      LOG(DEBUG) << name() << ". Packet size field: " << " \nReported size in the packet: " << nlxrecord_.buffer_[nlx::NLX_FIELD_PACKETSIZE] << " \nExpected size: " << nlxrecord_.nlx_packetsize_;
      LOG_IF(DEBUG, rc == nlx::ERROR_BAD_CRC) << name() <<". Error Bad CRC";
      // NOTE(review): this path continues without the ReleaseData() call that
      // the valid path makes below -- confirm whether the retrieved item must
      // be released before the next RetrieveData().
      continue;
    }

    timestamp_ = nlx::CheckTimestamp(nlxrecord_, last_timestamp_, stats_);
    valid_packet_counter_++;
    // The record has been decoded into nlxrecord_; the input item is released
    // here and only nlxrecord_ is read from this point on.
    data_in_port_->slot(0)->ReleaseData();

    if (valid_packet_counter_ == 1) {
      // Remember when the first valid packet arrived; Postprocess() uses it
      // to compute the run time.
      first_valid_packet_arrival_time_ = Clock::now();
      LOG(UPDATE) << name() << ". Received first valid data packet"
                  << " (TS = " << timestamp_ << ").";
    }

    if (triggered_()) {
      LOG_IF(UPDATE, (valid_packet_counter_ == 1))
          << name() << ". Waiting for hardware trigger on channel "
          << hardware_trigger_channel_() << ".";
      // Test the bit of the parallel-port word selected by the trigger
      // channel option.
      if (nlxrecord_.parallel_port() & (1 << hardware_trigger_channel_())) {
        // Trigger seen: clear the flag so later packets skip this check.
        triggered_ = false;
        LOG(UPDATE) << name() << ". Dispatching starts now.";
      } else {
        // Still waiting: the packet is counted as valid but not dispatched.
        continue;
      }
    }

    // Periodic progress report every update_interval_() valid packets.
    // NOTE(review): update_interval_() is the divisor of a modulo; no check
    // against zero is visible in this listing.
    update_time = valid_packet_counter_ % update_interval_() == 0;
    LOG_IF(UPDATE, update_time)
        << name() << ": " << valid_packet_counter_ << " packets ("
        << valid_packet_counter_ / data_in_port_->streaminfo(0).stream_rate()
        << " s) received.";
    print_stats(update_time);

    // Previous bucket is full (or this is the first dispatched packet):
    // claim new output buckets and stamp them with the current timestamp.
    if (sample_counter_ == batch_size_()) {
      data_out = output_port_signal_->slot(0)->ClaimData(false);
      data_out->set_hardware_timestamp(timestamp_);
      ttl_data_out = output_port_ttl_->slot(0)->ClaimData(false);
      ttl_data_out->set_hardware_timestamp(timestamp_);
      sample_counter_ = 0;
    }

    // copy data from current packet onto buffer for each channel
    data_out->set_sample_timestamp(sample_counter_, timestamp_);
    ttl_data_out->set_sample_timestamp(sample_counter_, timestamp_);
    data_iter = data_out->begin_sample(sample_counter_);
    for (auto &channel : channel_list_) {
      (*data_iter) = nlxrecord_.sample_microvolt(channel);
      ++data_iter;
    }
    // The TTL bucket carries the parallel-port word in its single channel.
    ttl_data_out->set_data_sample(sample_counter_, 0, nlxrecord_.parallel_port());
    ++sample_counter_;

    // Bucket complete: publish both outputs.
    if (sample_counter_ == batch_size_()) {
      output_port_signal_->slot(0)->PublishData();
      output_port_ttl_->slot(0)->PublishData();
    }

    // stream additional packets if there were missed packets
    // Only done right after a bucket was published, and only in whole
    // batches: filling starts once the uncompensated lag reaches
    // batch_size_().
    if (gap_fill_() != GapFill::NONE && sample_counter_ == batch_size_()) {
      packets_lag = stats_.n_missed - n_filling_packets_;
      if (packets_lag >= batch_size_()) {
        // packets_lag is not modified inside the loop, so the bound is the
        // number of whole batches outstanding on entry.
        for (b = 0; b < packets_lag / batch_size_(); ++b) {
          data_out = output_port_signal_->slot(0)->ClaimData(false);
          LOG(DEBUG) << name() << ". mcd packet timestamp_: " << timestamp_;
          data_out->set_hardware_timestamp(timestamp_);
          ttl_data_out = output_port_ttl_->slot(0)->ClaimData(false);
          ttl_data_out->set_hardware_timestamp(timestamp_);
          LOG(DEBUG) << name() << ". mcd packet timestamp_: " << timestamp_;
          // Every sample of the filler bucket repeats the values and the
          // timestamp of the record currently held in nlxrecord_.
          for (i = 0; i < batch_size_(); i++) {
            data_out->set_sample_timestamp(i, timestamp_);
            ttl_data_out->set_sample_timestamp(i, timestamp_);
            data_iter = data_out->begin_sample(i);
            for (auto &channel : channel_list_) {
              (*data_iter) = nlxrecord_.sample_microvolt(channel);
              ++data_iter;
            }
            ttl_data_out->set_data_sample(i, 0, nlxrecord_.parallel_port());
            LOG(DEBUG) << name() << ". timestamp_: " << timestamp_ << "; i=" << i;
          }
          output_port_signal_->slot(0)->PublishData();
          output_port_ttl_->slot(0)->PublishData();
          LOG(UPDATE) << name() << ". Streamed " << batch_size_()
                      << " duplicated samples to fill missed packets.";
          n_filling_packets_ += batch_size_();
          // DISTRIBUTED emits at most one filler bucket per published
          // bucket; ASAP keeps going until the loop bound is reached.
          if (gap_fill_() == GapFill::DISTRIBUTED) {
            break;
          }
        }
      }
    }
  }
}

// Logs a run summary: number of valid packets, elapsed time since the first
// valid packet, the resulting packet rate, the statistics report and the
// number of items produced on the "data" output. `context` is not used.
// NOTE(review): first_valid_packet_arrival_time_ is only assigned in
// Process() when a first valid packet arrives; if none arrived, the run time
// below is computed from whatever value the member holds.
// NOTE(review): the template arguments of duration_cast and static_cast are
// missing in this listing (stripped angle-bracket text).
void NlxParser::Postprocess(ProcessingContext &context) {
  std::chrono::milliseconds runtime(
      std::chrono::duration_cast(
          Clock::now() - first_valid_packet_arrival_time_));

  LOG(UPDATE) << name() << ". Finished reading : " << valid_packet_counter_
              << " packets received over "
              << static_cast(runtime.count()) / 1000
              << " seconds at a rate of "
              << valid_packet_counter_ /
                     (static_cast(runtime.count()) / 1000)
              << " packets/second.";

  // Called without an argument; presumably `condition` has a default value
  // in the class declaration -- verify in nlxparser.hpp.
  print_stats();

  LOG(UPDATE) << name() << ". Streamed "
              << output_port_signal_->slot(0)->nitems_produced()
              << " multi-channel data items.";
}

// Logs the packet statistics (invalid, duplicated, out of order, missed,
// gaps), the number of filler packets emitted so far, and the remaining lag
// in milliseconds, computed as
// (missed - filled) / input stream rate * 1e3. Nothing is logged when
// `condition` is false.
void NlxParser::print_stats(bool condition) {
  LOG_IF(UPDATE, condition)
      << name() << ". Stats report: " << stats_.n_invalid << " invalid, "
      << stats_.n_duplicated << " duplicated, " << stats_.n_outoforder
      << " out of order, " << stats_.n_missed << " missed, " << stats_.n_gaps
      << " gaps. " << n_filling_packets_
      << " packets were filled. Synchronous lag: "
      << (stats_.n_missed - n_filling_packets_) /
             data_in_port_->slot(0)->streaminfo().stream_rate() * 1e3
      << " ms.";
}

// Registers NlxParser via the project's REGISTERPROCESSOR macro.
REGISTERPROCESSOR(NlxParser)