Program Listing for File distributor.cpp

Return to documentation for file (processors/distributor/distributor.cpp)

// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------

#include "distributor.hpp"
#include "utilities/general.hpp"

Distributor::Distributor() : IProcessor(PRIORITY_MEDIUM) {
    add_option("channelmap", channelmap_,
               "Mapping of columns in different datastreams.");

    add_option("channelmap file", channelmap_file_,
               "File path for a channelmap mapping of columns in different datastreams.");

    add_option("distribution mode", distribution_type_,
               "Select distribution over the ports (keyword: ports) or over the slots (keyword: slots).");

}

void Distributor::Configure(const GlobalContext &context){
    if(channelmap_file_() == "" and channelmap_().empty()){
        throw ProcessingConfigureError(". No channelmap given in input. Make use of the option channelmap or channelmap file.");
    }else if(channelmap_file_() != "" and not channelmap_().empty()){
        throw ProcessingConfigureError(". Two channelmaps given in input. Use only one option channelmap or channelmap file.");
    }
    else if(channelmap_file_() != ""){
        auto path = context.resolve_path(channelmap_file_());
        YAML::Node config = YAML::LoadFile(path);
        try{
            channelmap_.from_yaml(config);
        }catch (...){
            throw ProcessingConfigureError(". The channelmap is not valid.");
        }
    }

    if(distribution_type_() != "ports" and distribution_type_() != "slots"){
        throw ProcessingStreamInfoError(". Only ports or slots are a valable distribution mode to fill in the distribution mode option.");
    }
}

void Distributor::CreatePorts() {
    input_port_ = create_input_port<TimeSeriesType<double>>(
                  TimeSeriesType<double>::Capabilities(ChannelRange(1, MAX_N_CHANNELS)),
                  PortInPolicy(SlotRange(1)));

    if(distribution_type_()== "ports"){   // N port with 1 slot each (N = channelmap size)
        for (auto &it : channelmap_()) {
            data_ports_.push_back(create_output_port<TimeSeriesType<double>>(
                                    it.first,
                                    TimeSeriesType<double>::Parameters(),
                                    PortOutPolicy(SlotRange(1), BUFFER_SIZE, WAIT_STRATEGY)));
        }
     } else if(distribution_type_() == "slots"){   // 1 port with N slots (N = channelmap size)
        data_ports_.push_back(create_output_port<TimeSeriesType<double>>(
                                     TimeSeriesType<double>::Parameters(),
                                     PortOutPolicy(SlotRange(channelmap_().size()), BUFFER_SIZE, WAIT_STRATEGY)));
    }
}

void Distributor::CompleteStreamInfo() {
    auto incoming_batch_size = input_port_->prototype(0).nsamples();

    LOG_IF(INFO, !input_port_->prototype(0).resizable()) << name() << ". Incoming batch size: " << incoming_batch_size << ".";

    slot_ = 0;
    int port = 0;

    LOG(INFO)<< name() << ": Data is splitted in " << channelmap_().size() << " parallel packets.";
    for (auto &it : channelmap_()) {
        if(distribution_type_() == "ports"){   // N port with 1 slot each (N = channelmap size)

            data_ports_[port]->streaminfo(0).set_parameters(
                        TimeSeriesType<double>::Parameters(
                            it.second.get_labels(),
                            incoming_batch_size,
                            input_port_->prototype(0).sample_rate(),
                            input_port_->prototype(0).resizable()));

             data_ports_[port]->streaminfo(0).set_stream_parameters(input_port_->streaminfo(0));
             port++;


        } else if(distribution_type_() == "slots"){  // 1 port with N slots (N = channelmap size)
            data_ports_[0]->streaminfo(slot_).set_parameters(
                        TimeSeriesType<double>::Parameters(
                            it.second.get_labels(),
                            incoming_batch_size,
                            input_port_->prototype(0).sample_rate(),
                            input_port_->prototype(0).resizable()));

             data_ports_[0]->streaminfo(slot_).set_stream_parameters(
                        input_port_->streaminfo(0).stream_rate(), it.first);

             slot_++;
        }

    }
}

void Distributor::Prepare(GlobalContext &context) {
    // check channel map
    for (auto const &it : channelmap_()) {
        if (it.second.size() == 0) {
            throw ProcessingPrepareError(
                        "Channel map entry " + it.first + " has zero channels.", name());
        }

        if(!it.second.is_subset(input_port_->prototype(0).labels())){
            throw ProcessingPrepareError(
                        "Channel list " + it.first + ": " + it.second.to_string() + " is invalid",
                        name());
        }

        if (!it.second.is_unique()){
            throw ProcessingPrepareError(
                        "Channel list " + it.first + ": " + it.second.to_string() + " cannot have duplicate channels",
                        name());
        }
    }
}

void Distributor::Process(ProcessingContext &context) {

    TimeSeriesType<double>::Data *data_in = nullptr;
    while (!context.terminated()) {

        // retrieve new data packet

        if (!input_port_->slot(0)->RetrieveData(data_in)) {
            break;
        }

        for (auto &it : data_ports_) {
            for (slot_ = 0; slot_ < it->number_of_slots(); slot_++) {
                auto data_out=it->slot(slot_)->ClaimData(true);

                for (auto ch: data_out->labels()) {
                    // publish data buckets
                    data_out->clone_column(ch, *data_in);
                }
                data_out->CloneTimestamps(*data_in);
                data_out->set_sample_timestamps(
                            data_in->sample_timestamps());
                it->slot(slot_)->PublishData();
            }
        }

        // release input data bucket
        input_port_->slot(0)->ReleaseData();
        }
}

void Distributor::Postprocess(ProcessingContext &context) {
    LOG(INFO) << name() << " Streamed "
              << data_ports_[0]->slot(0)->nitems_produced()<< " data packets.";
}

REGISTERPROCESSOR(Distributor)