Program Listing for File distributor.cpp¶
↰ Return to documentation for file (processors/distributor/distributor.cpp
)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include "distributor.hpp"
#include "utilities/general.hpp"
Distributor::Distributor() : IProcessor(PRIORITY_MEDIUM) {
add_option("channelmap", channelmap_,
"Mapping of channels to processor output ports.", true);
}
void Distributor::CreatePorts() {
input_port_ = create_input_port<MultiChannelType<double>>(
MultiChannelType<double>::Capabilities(ChannelRange(1, MAX_N_CHANNELS)),
PortInPolicy(SlotRange(1)));
for (auto &it : channelmap_()) {
data_ports_[it.first] = create_output_port<MultiChannelType<double>>(
it.first,
MultiChannelType<double>::Capabilities(ChannelRange(it.second.size())),
MultiChannelType<double>::Parameters(),
PortOutPolicy(SlotRange(1), BUFFER_SIZE, WAIT_STRATEGY));
}
}
void Distributor::CompleteStreamInfo() {
incoming_batch_size_ = input_port_->streaminfo(0).parameters().nsamples;
max_n_channels_ = input_port_->streaminfo(0).parameters().nchannels;
LOG(INFO) << name() << ". Incoming batch size: " << incoming_batch_size_
<< ".";
for (auto &it : data_ports_) {
it.second->streaminfo(0).set_parameters(
MultiChannelType<double>::Parameters(
channelmap_().at(it.first).size(), incoming_batch_size_,
input_port_->streaminfo(0)
.parameters()
.sample_rate));
it.second->streaminfo(0).set_stream_rate(
input_port_->streaminfo(0).stream_rate());
}
}
void Distributor::Prepare(GlobalContext &context) {
// check channel map
for (auto const &it : channelmap_()) {
if (it.second.size() == 0) {
throw ProcessingPrepareError(
"Channel map entry " + it.first + " has zero channels.", name());
}
for (auto const &ch : it.second) {
if (ch >= max_n_channels_) {
throw ProcessingPrepareError(
"Channel " + std::to_string(static_cast<int>(ch)) + " is invalid",
name());
}
}
}
}
void Distributor::Process(ProcessingContext &context) {
MultiChannelType<double>::Data *data_in = nullptr;
int port_index;
unsigned int ch, s;
std::vector<MultiChannelType<double>::Data *> data_out_vector(
data_ports_.size());
while (!context.terminated()) {
// retrieve new data packet
if (!input_port_->slot(0)->RetrieveData(data_in)) {
break;
}
// claim output data buckets
// and copy timestamps from upstream
port_index = 0;
for (auto const &it : data_ports_) {
data_out_vector[port_index] = it.second->slot(0)->ClaimData(false);
data_out_vector[port_index]->set_hardware_timestamp(
data_in->hardware_timestamp());
data_out_vector[port_index]->set_source_timestamp(
data_in->source_timestamp());
port_index++;
}
// for each entry in the channel map
port_index = 0;
for (auto const &it_chmap : channelmap_()) {
data_out_vector[port_index]->set_sample_timestamps(
data_in->sample_timestamps());
for (ch = 0; ch < it_chmap.second.size(); ch++) {
for (s = 0; s < incoming_batch_size_; s++) {
data_out_vector[port_index]->set_data_sample(
s, ch, data_in->data_sample(s, it_chmap.second[ch]));
}
}
port_index++;
}
// publish data buckets
for (auto &it : data_ports_) {
it.second->slot(0)->PublishData();
}
// release input data bucket
input_port_->slot(0)->ReleaseData();
}
}
void Distributor::Postprocess(ProcessingContext &context) {
SlotType s;
for (auto &it : data_ports_) {
for (s = 0; s < it.second->number_of_slots(); ++s) {
LOG(INFO) << name() << ". Port " << it.first << ". Slot " << s
<< ". Streamed " << it.second->slot(s)->nitems_produced()
<< " data packets. ";
}
}
}
REGISTERPROCESSOR(Distributor)