Program Listing for File nlxreader.cpp

Return to documentation for file (processors/nlxreader/nlxreader.cpp)

// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include "nlxreader.hpp"

#include <chrono>
#include <limits>

constexpr uint16_t NlxReader::MAX_NCHANNELS;
constexpr decltype(NlxReader::MAX_NCHANNELS) NlxReader::UDP_BUFFER_SIZE;

NlxReader::NlxReader() : IProcessor(PRIORITY_HIGH) {
  add_option("address", address_, "IP address of Digilynx acquisition system.");
  add_option("port", port_,
             "Port number for communication with Digilynx acquisition system.");
  add_option("channelmap", channelmap_,
             "Mapping of channels to processor output ports.", true);
  add_option("npackets", npackets_,
             "The total number of data packets to read "
             "(0 means continuous recording).");
  add_option("batch size", batch_size_,
             "The number of data packets to concatenate into "
             "single multi-channel data bucket.");
  add_option("nchannels", nchannels_,
             "The number of channels of the Digilynx acquisition system.");
  add_option("update interval", update_interval_,
             "The time interval for updates on the received data from "
             "the Digilynx acquisition system.");
  add_option("trigger/enable", triggered_,
             "Whether or not to wait for hardware trigger to start "
             "streaming data packets.");
  add_option("trigger/channel", hardware_trigger_channel_,
             "Digital input channel to use as hardware trigger");
}

void NlxReader::Configure(const GlobalContext &context) {
  nlxrecord_.set_nchannels(nchannels_());
}

void NlxReader::CreatePorts() {
  for (auto &it : channelmap_()) {
    data_ports_[it.first] = create_output_port<MultiChannelType<double>>(
        it.first,
        MultiChannelType<double>::Capabilities(ChannelRange(it.second.size())),
        MultiChannelType<double>::Parameters(),
        PortOutPolicy(SlotRange(1), 500, WaitStrategy::kBlockingStrategy));
  }
}

void NlxReader::CompleteStreamInfo() {
  for (auto &it : data_ports_) {
    // finalize data type with nsamples == batch_size and nchannels taken from
    // channel map
    it.second->streaminfo(0).set_parameters(
        MultiChannelType<double>::Parameters(
            channelmap_().at(it.first).size(), batch_size_(),
            nlx::NLX_SIGNAL_SAMPLING_FREQUENCY));
    it.second->streaminfo(0).set_stream_rate(
        nlx::NLX_SIGNAL_SAMPLING_FREQUENCY / batch_size_());
  }
}

void NlxReader::Prepare(GlobalContext &context) {
  memset(reinterpret_cast<char *>(&server_addr_), 0, sizeof(server_addr_));
  server_addr_.sin_family = AF_INET;
  server_addr_.sin_addr.s_addr = inet_addr(address_().c_str());
  server_addr_.sin_port = htons(port_());
}

void NlxReader::Preprocess(ProcessingContext &context) {
  sample_counter_ = batch_size_();
  valid_packet_counter_ = 0;
  const int y = 1;

  timestamp_ = nlx::INVALID_TIMESTAMP;
  last_timestamp_ = nlx::INVALID_TIMESTAMP;

  stats_.clear();

  if (context.test()) {
    prepare_latency_test(context);
  }

  sleep(1);  // reduces probability of missed packets when connecting to ongoing
            // stream

  if ((udp_socket_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
    throw ProcessingPrepareError("Unable to create socket.", name());
  }
  LOG(UPDATE) << name() << ". Socket created.";
  setsockopt(udp_socket_, SOL_SOCKET, SO_REUSEADDR, &y, sizeof(int));
  if (bind(udp_socket_, (struct sockaddr *)&server_addr_,
           sizeof(server_addr_)) < 0) {
    throw ProcessingPreprocessingError("Socket binding failed.", name());
  }
  LOG(UPDATE) << name() << ". Socket binding successful.";
}

void NlxReader::Process(ProcessingContext &context) {
  bool update_time = false;
  int data_index = 0;
  MultiChannelType<double>::Data::sample_iterator data_iter;
  std::vector<MultiChannelType<double>::Data *> data_vector(data_ports_.size());

  while (!context.terminated() && valid_packet_counter_ < npackets_()) {
    // check if packets have arrived (with time-out)
    FD_ZERO(&file_descriptor_set_);  // clear the file descriptor set
    FD_SET(udp_socket_, &file_descriptor_set_);
    // add the socket (it's basically acting like a filedescriptor) to the set

    // set time-out
    timeout_.tv_sec = TIMEOUT_SEC;
    timeout_.tv_usec = 0;

    // packets available?
    ssize_t size =
        select(udp_socket_ + 1, &file_descriptor_set_, 0, 0, &timeout_);

    if (size == 0) {
      LOG(DEBUG) << name() << ": Timed out waiting for data. Connection lost?";
      continue;
    } else if (size == -1) {
      LOG(DEBUG) << name() << ": Select error on UDP socket.";
      continue;
    } else if (size > 0) {  // receive packet
      int recvlen =
          recvfrom(udp_socket_, buffer_, UDP_BUFFER_SIZE, 0, NULL, NULL);

      int rc = nlxrecord_.FromNetworkBuffer(buffer_, recvlen);

      if (rc != 0) {
        ++stats_.n_invalid;

        LOG(INFO) << name() << ": Received invalid record.";

        LOG(DEBUG) << name()<< ". STX field: " << nlxrecord_.buffer_[nlx::NLX_FIELD_STX]
                   << " instead of " << nlx::NLX_STX;
        LOG(DEBUG) << name() << ". Raw packet id:"<< nlxrecord_.buffer_[nlx::NLX_FIELD_RAWPACKETID]
                   << " instead of " << nlx::NLX_RAWPACKETID;
        LOG(DEBUG) << name() << ". Packet size field: "<< "Actual size: " << recvlen
                   << " \nReported size in the packet: " << nlxrecord_.buffer_[nlx::NLX_FIELD_PACKETSIZE]
                   << " \nExpected size: " << nlxrecord_.nlx_packetsize_;
        LOG_IF(DEBUG, rc == nlx::ERROR_BAD_CRC) << name() <<". Error Bad CRC";

        continue;
      }

      timestamp_ = nlx::CheckTimestamp(nlxrecord_, last_timestamp_, stats_);
      valid_packet_counter_++;

      if (valid_packet_counter_ == 1) {
        first_valid_packet_arrival_time_ = Clock::now();
        LOG(UPDATE) << name() << ": Received first valid data packet"
                    << " (TS = " << timestamp_ << ").";
      }

      update_time = valid_packet_counter_ % update_interval_() == 0;
      LOG_IF(UPDATE, update_time)
          << name() << ": " << valid_packet_counter_ << " packets ("
          << valid_packet_counter_ / nlx::NLX_SIGNAL_SAMPLING_FREQUENCY
          << " s) received.";
      print_stats(update_time);

      if (triggered_()) {
        LOG_IF(UPDATE, (valid_packet_counter_ == 1))
            << name() << ". Waiting for hardware trigger on channel "
            << hardware_trigger_channel_() << ".";
        if (nlxrecord_.parallel_port() & (1 << hardware_trigger_channel_())) {
          triggered_ = true;
          LOG(UPDATE) << name() << ". Dispatching starts now.";
        } else {
          continue;
        }
      }

      // claim new data buckets
      if (sample_counter_ == batch_size_()) {
        data_index = 0;
        for (auto &it : data_ports_) {
          data_vector[data_index] = it.second->slot(0)->ClaimData(false);
          // set data bucket metadata
          data_vector[data_index]->set_hardware_timestamp(timestamp_);
          data_vector[data_index]->set_source_timestamp();
          data_index++;
        }
        sample_counter_ = 0;
      }

      // copy data onto buffers for each configured channel group
      data_index = 0;
      for (auto &it : channelmap_()) {
        data_vector[data_index]->set_sample_timestamp(sample_counter_,
                                                      nlxrecord_.timestamp());
        data_iter = data_vector[data_index]->begin_sample(sample_counter_);
        for (auto &channel : it.second) {
          (*data_iter) = nlxrecord_.sample_microvolt(channel);
          ++data_iter;
        }
        data_index++;
      }

      ++sample_counter_;

      // publish data buckets
      if (sample_counter_ == batch_size_()) {
        for (auto &it : data_ports_) {
          it.second->slot(0)->PublishData();
        }
      }
    }
  }

  SlotType s;
  for (auto &it : data_ports_) {
    for (s = 0; s < it.second->number_of_slots(); ++s) {
      LOG(INFO) << name() << ". Port " << it.first << ". Slot " << s
                << ". Streamed " << it.second->slot(s)->nitems_produced()
                << " data packets. ";
    }
  }
}

void NlxReader::Postprocess(ProcessingContext &context) {
  LOG_IF(UPDATE, (valid_packet_counter_ == npackets_()))
      << "Requested number of packets was read. You can now STOP processing.";

  std::chrono::milliseconds runtime(
      std::chrono::duration_cast<std::chrono::milliseconds>(
          Clock::now() - first_valid_packet_arrival_time_));

  LOG(UPDATE) << name() << ". Finished reading : " << valid_packet_counter_
              << " packets received over "
              << static_cast<double>(runtime.count()) / 1000
              << " seconds at a rate of "
              << valid_packet_counter_ /
                     (static_cast<double>(runtime.count()) / 1000)
              << " packets/second.";
  print_stats();

  close(udp_socket_);

  if (context.test()) {
    save_source_timestamps_to_disk(valid_packet_counter_);
  }
}

void NlxReader::print_stats(bool condition) {
  LOG_IF(UPDATE, condition)
      << name() << ". Stats report: " << stats_.n_invalid << " invalid, "
      << stats_.n_duplicated << " duplicated, " << stats_.n_outoforder
      << " out of order, " << stats_.n_missed << " missed, " << stats_.n_gaps
      << " gaps.";
}

REGISTERPROCESSOR(NlxReader)