Program Listing for File distributor.cpp¶
↰ Return to documentation for file (processors/distributor/distributor.cpp)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include "distributor.hpp"
#include "utilities/general.hpp"
Distributor::Distributor() : IProcessor(PRIORITY_MEDIUM) {
add_option("channelmap", channelmap_,
"Mapping of columns in different datastreams.");
add_option("channelmap file", channelmap_file_,
"File path for a channelmap mapping of columns in different datastreams.");
add_option("distribution mode", distribution_type_,
"Select distribution over the ports (keyword: ports) or over the slots (keyword: slots).");
}
void Distributor::Configure(const GlobalContext &context){
if(channelmap_file_() == "" and channelmap_().empty()){
throw ProcessingConfigureError(". No channelmap given in input. Make use of the option channelmap or channelmap file.");
}else if(channelmap_file_() != "" and not channelmap_().empty()){
throw ProcessingConfigureError(". Two channelmaps given in input. Use only one option channelmap or channelmap file.");
}
else if(channelmap_file_() != ""){
auto path = context.resolve_path(channelmap_file_());
YAML::Node config = YAML::LoadFile(path);
try{
channelmap_.from_yaml(config);
}catch (...){
throw ProcessingConfigureError(". The channelmap is not valid.");
}
}
if(distribution_type_() != "ports" and distribution_type_() != "slots"){
throw ProcessingStreamInfoError(". Only ports or slots are a valable distribution mode to fill in the distribution mode option.");
}
}
void Distributor::CreatePorts() {
input_port_ = create_input_port<TimeSeriesType<double>>(
TimeSeriesType<double>::Capabilities(ChannelRange(1, MAX_N_CHANNELS)),
PortInPolicy(SlotRange(1)));
if(distribution_type_()== "ports"){ // N port with 1 slot each (N = channelmap size)
for (auto &it : channelmap_()) {
data_ports_.push_back(create_output_port<TimeSeriesType<double>>(
it.first,
TimeSeriesType<double>::Parameters(),
PortOutPolicy(SlotRange(1), BUFFER_SIZE, WAIT_STRATEGY)));
}
} else if(distribution_type_() == "slots"){ // 1 port with N slots (N = channelmap size)
data_ports_.push_back(create_output_port<TimeSeriesType<double>>(
TimeSeriesType<double>::Parameters(),
PortOutPolicy(SlotRange(channelmap_().size()), BUFFER_SIZE, WAIT_STRATEGY)));
}
}
void Distributor::CompleteStreamInfo() {
auto incoming_batch_size = input_port_->prototype(0).nsamples();
LOG_IF(INFO, !input_port_->prototype(0).resizable()) << name() << ". Incoming batch size: " << incoming_batch_size << ".";
slot_ = 0;
int port = 0;
LOG(INFO)<< name() << ": Data is splitted in " << channelmap_().size() << " parallel packets.";
for (auto &it : channelmap_()) {
if(distribution_type_() == "ports"){ // N port with 1 slot each (N = channelmap size)
data_ports_[port]->streaminfo(0).set_parameters(
TimeSeriesType<double>::Parameters(
it.second.get_labels(),
incoming_batch_size,
input_port_->prototype(0).sample_rate(),
input_port_->prototype(0).resizable()));
data_ports_[port]->streaminfo(0).set_stream_parameters(input_port_->streaminfo(0));
port++;
} else if(distribution_type_() == "slots"){ // 1 port with N slots (N = channelmap size)
data_ports_[0]->streaminfo(slot_).set_parameters(
TimeSeriesType<double>::Parameters(
it.second.get_labels(),
incoming_batch_size,
input_port_->prototype(0).sample_rate(),
input_port_->prototype(0).resizable()));
data_ports_[0]->streaminfo(slot_).set_stream_parameters(
input_port_->streaminfo(0).stream_rate(), it.first);
slot_++;
}
}
}
void Distributor::Prepare(GlobalContext &context) {
// check channel map
for (auto const &it : channelmap_()) {
if (it.second.size() == 0) {
throw ProcessingPrepareError(
"Channel map entry " + it.first + " has zero channels.", name());
}
if(!it.second.is_subset(input_port_->prototype(0).labels())){
throw ProcessingPrepareError(
"Channel list " + it.first + ": " + it.second.to_string() + " is invalid",
name());
}
if (!it.second.is_unique()){
throw ProcessingPrepareError(
"Channel list " + it.first + ": " + it.second.to_string() + " cannot have duplicate channels",
name());
}
}
}
void Distributor::Process(ProcessingContext &context) {
TimeSeriesType<double>::Data *data_in = nullptr;
while (!context.terminated()) {
// retrieve new data packet
if (!input_port_->slot(0)->RetrieveData(data_in)) {
break;
}
for (auto &it : data_ports_) {
for (slot_ = 0; slot_ < it->number_of_slots(); slot_++) {
auto data_out=it->slot(slot_)->ClaimData(true);
for (auto ch: data_out->labels()) {
// publish data buckets
data_out->clone_column(ch, *data_in);
}
data_out->CloneTimestamps(*data_in);
data_out->set_sample_timestamps(
data_in->sample_timestamps());
it->slot(slot_)->PublishData();
}
}
// release input data bucket
input_port_->slot(0)->ReleaseData();
}
}
void Distributor::Postprocess(ProcessingContext &context) {
LOG(INFO) << name() << " Streamed "
<< data_ports_[0]->slot(0)->nitems_produced()<< " data packets.";
}
REGISTERPROCESSOR(Distributor)