Program Listing for File nlxpurereader.cpp

Return to documentation for file (processors/nlxpurereader/nlxpurereader.cpp)

// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------

#include "nlxpurereader.hpp"

#include <limits>
#include <memory>

NlxPureReader::NlxPureReader() : IProcessor(PRIORITY_HIGH) {
  add_option("address", address_, "IP address of Digilynx acquisition system.");
  add_option("port", port_,
             "Port number for communication with Digilynx acquisition system.");
  add_option("npackets", npackets_,
             "The total number of data packets to read "
             "(0 means continuous recording).");
  add_option("nchannels", nchannels_,
             "The number of channels of the Digilynx acquisition system.");
}

void NlxPureReader::CreatePorts() {
  output_port_ = create_output_port<VectorType<uint32_t>>(
      "udp", VectorType<uint32_t>::Capabilities(),
      VectorType<uint32_t>::Parameters(nlx::NLX_NFIELDS(nchannels_())),
      PortOutPolicy(SlotRange(1), 500, WaitStrategy::kBlockingStrategy));

  n_invalid_ = create_broadcaster_state<uint64_t>(
      "n_invalid", 0, Permission::READ,
      "The number of invalid packets that were received.");
}

void NlxPureReader::CompleteStreamInfo() {
  output_port_->streaminfo(0).set_stream_rate(
      nlx::NLX_SIGNAL_SAMPLING_FREQUENCY);
}

void NlxPureReader::Prepare(GlobalContext &context) {
  LOG(INFO) << name() << ". Data will be read from " << address_() << ":"
            << port_() << ".";

  memset(reinterpret_cast<char *>(&server_addr_), 0, sizeof(server_addr_));
  server_addr_.sin_family = AF_INET;
  server_addr_.sin_addr.s_addr = inet_addr(address_().c_str());
  server_addr_.sin_port = htons(port_());
}

void NlxPureReader::Preprocess(ProcessingContext &context) {
  valid_packet_counter_ = 0;
  const int reuse_address = 1;
  n_invalid_->set(0);
  sleep(1);  // reduces probability of missed packets when connecting to ongoing
            // stream

  if ((udp_socket_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
    throw ProcessingPrepareError("Unable to create socket.", name());
  }

  LOG(UPDATE) << name() << ". Socket created.";

  setsockopt(udp_socket_, SOL_SOCKET, SO_REUSEADDR, &reuse_address,
             sizeof(int));
  if (bind(udp_socket_, (struct sockaddr *)&server_addr_,
           sizeof(server_addr_)) < 0) {
    throw ProcessingPreprocessingError("Socket binding failed.", name());
  }

  LOG(UPDATE) << name() << ". Socket binding successful.";
  udp_socket_select_ = udp_socket_ + 1;
}

void NlxPureReader::Process(ProcessingContext &context) {
  while (!context.terminated() && valid_packet_counter_ < npackets_()) {
    // check if packets have arrived (with time-out)
    // clear the file descriptor set
    FD_ZERO(&file_descriptor_set_);
    // add the socket (it's basically acting like a filedescriptor) to the set
    FD_SET(udp_socket_, &file_descriptor_set_);

    // set time-out
    timeout_.tv_sec = TIMEOUT_SEC;
    timeout_.tv_usec = 0;

    // packets available?
    size_ = select(udp_socket_select_, &file_descriptor_set_, 0, 0, &timeout_);

    if (size_ == 0) {
      LOG(DEBUG) << name() << ": Timed out waiting for data. Connection lost?";
      continue;
    } else if (size_ == -1) {
      LOG(DEBUG) << name() << ": Select error on UDP socket.";
      continue;
    } else if (size_ > 0) {  // received packet
      data_out_ = output_port_->slot(0)->ClaimData(false);

      recvlen_ = recvfrom(
          udp_socket_, reinterpret_cast<char *>(data_out_->data().data()),
          data_out_->data().size() * sizeof(uint32_t), 0, NULL, NULL);

      if (recvlen_ != static_cast<int>(data_out_->data().size()) *
                          sizeof(uint32_t)) {
        n_invalid_->set(n_invalid_->get() + 1);
        LOG(UPDATE) << name() << ". Received invalid record.";
        continue;
      } else {
        valid_packet_counter_++;
      }

      if (valid_packet_counter_ == 1) {
        first_valid_packet_arrival_time_ = Clock::now();
        LOG(UPDATE) << name() << ". Received first UDP data packet.";
      }

      data_out_->set_source_timestamp();
      output_port_->slot(0)->PublishData();
    } else {
      throw ProcessingError("Unexpected size value returned.", name());
    }
  }
  LOG(UPDATE) << name()
              << ". Requested number of packets was read. You can now STOP the "
                 "processing.";
}

void NlxPureReader::Postprocess(ProcessingContext &context) {
  std::chrono::milliseconds runtime(
      std::chrono::duration_cast<std::chrono::milliseconds>(
          Clock::now() - first_valid_packet_arrival_time_));

  LOG(UPDATE) << name() << ". Finished reading : " << valid_packet_counter_
              << " packets received over "
              << static_cast<double>(runtime.count()) / 1000
              << " seconds at a rate of "
              << valid_packet_counter_ /
                     (static_cast<double>(runtime.count()) / 1000)
              << " packets/second.";

  close(udp_socket_);

  LOG(UPDATE) << name() << ". Streamed "
              << output_port_->slot(0)->nitems_produced()
              << " multi-channel data items.";
}

REGISTERPROCESSOR(NlxPureReader)