Program Listing for File nlxparser.cpp

Return to documentation for file (processors/nlxparser/nlxparser.cpp)

// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------

#include "nlxparser.hpp"

#include <algorithm>
#include <limits>
#include <memory>

std::string gapfill_to_string(GapFill x) {
  std::string s;
#define MATCH(p)                                                               \
  case (GapFill::p):                                                           \
    s = #p;                                                                    \
    break;
  switch (x) {
    MATCH(NONE)
    MATCH(ASAP);
    MATCH(DISTRIBUTED);
  }
#undef MATCH
  return s;
}

GapFill string_to_gapfill(std::string s) {
  std::transform(s.begin(), s.end(), s.begin(), (int (*)(int))std::toupper);
#define MATCH(p)                                                               \
  if (s == #p) {                                                               \
    return GapFill::p;                                                         \
  }
  MATCH(NONE)
  MATCH(ASAP);
  MATCH(DISTRIBUTED);
  throw std::runtime_error("Invalid GapFill value.");
#undef MATCH
}

NlxParser::NlxParser() : IProcessor(PRIORITY_HIGH) {
  add_option("batch size", batch_size_,
             "The number of data packets to concatenate into "
             "single multi-channel data bucket.");
  add_option("update interval", update_interval_,
             "The time interval for updates on the received data from "
             "the Digilynx acquisition system.");
  add_option("trigger/enabled", triggered_,
             "Whether or not to wait for hardware trigger to start "
             "streaming data packets.");
  add_option("trigger/channel", hardware_trigger_channel_,
             "Digital input channel to use as hardware trigger");
  add_option(
      "gap fill", gap_fill_,
      "Method of filling in missing data packets. If 'none', no filling of "
      "missed packets is performed. If 'asap', all missed packets will be "
      "filled with last available batch of samples. If 'distributed', "
      "missed packets will be filled with the last available batch of samples "
      "at each iteration.");
}

void NlxParser::CreatePorts() {
  data_in_port_ = create_input_port<VectorType<uint32_t>>(
      "udp", VectorType<uint32_t>::Capabilities(), PortInPolicy(SlotRange(1)));

  output_port_signal_ = create_output_port<MultiChannelType<double>>(
      "data",
      MultiChannelType<double>::Capabilities(
          ChannelRange(1, nlx::NLX_MAX_NCHANNELS)),
      MultiChannelType<double>::Parameters(), PortOutPolicy(SlotRange(1), 500));

  output_port_ttl_ = create_output_port<MultiChannelType<uint32_t>>(
      "ttl", MultiChannelType<uint32_t>::Capabilities(ChannelRange(1)),
      MultiChannelType<uint32_t>::Parameters(),
      PortOutPolicy(SlotRange(1), 500));

  n_invalid_ = create_broadcaster_state<uint64_t>(
      "n_invalid", 0, Permission::READ,
      "The number of invalid packets that were received.");
}

void NlxParser::CompleteStreamInfo() {
  nchannels_ = nlx::NLX_NCHANNELS_FROM_NFIELDS(
      data_in_port_->slot(0)->streaminfo().parameters().size);

  LOG(INFO) << name() << ": parsing " << nchannels_
            << " channels raw digilynx data.";

  nlxrecord_.set_nchannels(nchannels_);

  output_port_signal_->streaminfo(0).set_parameters(
      MultiChannelType<double>::Parameters(
          nchannels_, batch_size_(),
          data_in_port_->slot(0)->streaminfo().stream_rate()));

  output_port_signal_->streaminfo(0).set_stream_rate(
      data_in_port_->slot(0)->streaminfo().stream_rate() / batch_size_());

  output_port_ttl_->streaminfo(0).set_parameters(
      MultiChannelType<double>::Parameters(
          1, batch_size_(),
          data_in_port_->slot(0)->streaminfo().stream_rate()));

  output_port_ttl_->streaminfo(0).set_stream_rate(
      data_in_port_->slot(0)->streaminfo().stream_rate() / batch_size_());
}

void NlxParser::Prepare(GlobalContext &context) {
  // create channel list
  channel_list_.resize(nchannels_);
  for (unsigned int i = 0; i < nchannels_; i++) {
    channel_list_[i] = i;
  }
}

void NlxParser::Preprocess(ProcessingContext &context) {
  sample_counter_ = batch_size_();
  valid_packet_counter_ = 0;
  timestamp_ = nlx::INVALID_TIMESTAMP;
  last_timestamp_ = nlx::INVALID_TIMESTAMP;
  stats_.clear();
  n_filling_packets_ = 0;
}

void NlxParser::Process(ProcessingContext &context) {
  bool update_time = false;
  unsigned int i = 0;
  int b = 0;
  decltype(n_filling_packets_) packets_lag = 0;

  VectorType<uint32_t>::Data *data_in = nullptr;
  MultiChannelType<double>::Data::sample_iterator data_iter;
  MultiChannelType<double>::Data *data_out = nullptr;
  MultiChannelType<uint32_t>::Data *ttl_data_out = nullptr;

  while (!context.terminated()) {
    if (!data_in_port_->slot(0)->RetrieveData(data_in)) {
      break;
    }

    int rc = nlxrecord_.FromNetworkBuffer(data_in->data());
    if (rc != 0) {
      ++stats_.n_invalid;

      LOG(INFO) << name() << ": Received invalid record.";

      LOG(DEBUG) << name()<< ". STX field: " << nlxrecord_.buffer_[nlx::NLX_FIELD_STX]
                 << " instead of " << nlx::NLX_STX;
      LOG(DEBUG) << name() << ". Raw packet id:"<< nlxrecord_.buffer_[nlx::NLX_FIELD_RAWPACKETID]
                 << " instead of " << nlx::NLX_RAWPACKETID;
      LOG(DEBUG) << name() << ". Packet size field: "
                 << " \nReported size in the packet: " << nlxrecord_.buffer_[nlx::NLX_FIELD_PACKETSIZE]
                 << " \nExpected size: " << nlxrecord_.nlx_packetsize_;
      LOG_IF(DEBUG, rc == nlx::ERROR_BAD_CRC) << name() <<". Error Bad CRC";

      continue;
    }

    timestamp_ = nlx::CheckTimestamp(nlxrecord_, last_timestamp_, stats_);
    valid_packet_counter_++;
    data_in_port_->slot(0)->ReleaseData();

    if (valid_packet_counter_ == 1) {
      first_valid_packet_arrival_time_ = Clock::now();
      LOG(UPDATE) << name() << ". Received first valid data packet"
                  << " (TS = " << timestamp_ << ").";
    }

    if (triggered_()) {
      LOG_IF(UPDATE, (valid_packet_counter_ == 1))
          << name() << ". Waiting for hardware trigger on channel "
          << hardware_trigger_channel_() << ".";
      if (nlxrecord_.parallel_port() & (1 << hardware_trigger_channel_())) {
        triggered_ = false;
        LOG(UPDATE) << name() << ". Dispatching starts now.";
      } else {
        continue;
      }
    }

    update_time = valid_packet_counter_ % update_interval_() == 0;
    LOG_IF(UPDATE, update_time)
        << name() << ": " << valid_packet_counter_ << " packets ("
        << valid_packet_counter_ / data_in_port_->streaminfo(0).stream_rate()
        << " s) received.";
    print_stats(update_time);

    if (sample_counter_ == batch_size_()) {
      data_out = output_port_signal_->slot(0)->ClaimData(false);
      data_out->set_hardware_timestamp(timestamp_);
      ttl_data_out = output_port_ttl_->slot(0)->ClaimData(false);
      ttl_data_out->set_hardware_timestamp(timestamp_);
      sample_counter_ = 0;
    }

    // copy data from current packet onto buffer for each channel
    data_out->set_sample_timestamp(sample_counter_, timestamp_);
    ttl_data_out->set_sample_timestamp(sample_counter_, timestamp_);
    data_iter = data_out->begin_sample(sample_counter_);
    for (auto &channel : channel_list_) {
      (*data_iter) = nlxrecord_.sample_microvolt(channel);
      ++data_iter;
    }
    ttl_data_out->set_data_sample(sample_counter_, 0,
                                  nlxrecord_.parallel_port());
    ++sample_counter_;

    if (sample_counter_ == batch_size_()) {
      output_port_signal_->slot(0)->PublishData();
      output_port_ttl_->slot(0)->PublishData();
    }

    // stream additional packets if there were missed packets
    if (gap_fill_() != GapFill::NONE && sample_counter_ == batch_size_()) {
      packets_lag = stats_.n_missed - n_filling_packets_;
      if (packets_lag >= batch_size_()) {
        for (b = 0; b < packets_lag / batch_size_(); ++b) {
          data_out = output_port_signal_->slot(0)->ClaimData(false);
          LOG(DEBUG) << name() << ". mcd packet timestamp_: " << timestamp_;
          data_out->set_hardware_timestamp(timestamp_);
          ttl_data_out = output_port_ttl_->slot(0)->ClaimData(false);
          ttl_data_out->set_hardware_timestamp(timestamp_);
          LOG(DEBUG) << name() << ". mcd packet timestamp_: " << timestamp_;

          for (i = 0; i < batch_size_(); i++) {
            data_out->set_sample_timestamp(i, timestamp_);
            ttl_data_out->set_sample_timestamp(i, timestamp_);
            data_iter = data_out->begin_sample(i);

            for (auto &channel : channel_list_) {
              (*data_iter) = nlxrecord_.sample_microvolt(channel);
              ++data_iter;
            }

            ttl_data_out->set_data_sample(i, 0, nlxrecord_.parallel_port());
            LOG(DEBUG) << name() << ". timestamp_: " << timestamp_
                       << "; i=" << i;
          }
          output_port_signal_->slot(0)->PublishData();
          output_port_ttl_->slot(0)->PublishData();
          LOG(UPDATE) << name() << ". Streamed " << batch_size_()
                      << " duplicated samples to fill missed packets.";
          n_filling_packets_ += batch_size_();
          if (gap_fill_() == GapFill::DISTRIBUTED) {
            break;
          }
        }
      }
    }
  }
}

void NlxParser::Postprocess(ProcessingContext &context) {
  std::chrono::milliseconds runtime(
      std::chrono::duration_cast<std::chrono::milliseconds>(
          Clock::now() - first_valid_packet_arrival_time_));

  LOG(UPDATE) << name() << ". Finished reading : " << valid_packet_counter_
              << " packets received over "
              << static_cast<double>(runtime.count()) / 1000
              << " seconds at a rate of "
              << valid_packet_counter_ /
                     (static_cast<double>(runtime.count()) / 1000)
              << " packets/second.";
  print_stats();

  LOG(UPDATE) << name() << ". Streamed "
              << output_port_signal_->slot(0)->nitems_produced()
              << " multi-channel data items.";
}

void NlxParser::print_stats(bool condition) {
  LOG_IF(UPDATE, condition)
      << name() << ". Stats report: " << stats_.n_invalid << " invalid, "
      << stats_.n_duplicated << " duplicated, " << stats_.n_outoforder
      << " out of order, " << stats_.n_missed << " missed, " << stats_.n_gaps
      << " gaps. " << n_filling_packets_
      << " packets were filled. Synchronous lag: "
      << (stats_.n_missed - n_filling_packets_) /
             data_in_port_->slot(0)->streaminfo().stream_rate() * 1e3
      << " ms.";
}

REGISTERPROCESSOR(NlxParser)