Program Listing for File rebuffer.cpp¶
↰ Return to documentation for file (processors/rebuffer/rebuffer.cpp
)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include "rebuffer.hpp"
#include <algorithm>
#include "utilities/time.hpp"
Rebuffer::Rebuffer() : IProcessor() {
add_option("downsample factor", downsample_factor_,
"The factor for downsampling the signal.");
add_option("buffer size", buffer_size_,
"Output buffer size in samples or seconds.");
}
void Rebuffer::CreatePorts() {
data_in_port_ = create_input_port<MultiChannelType<double>>(
"data", MultiChannelType<double>::Capabilities(ChannelRange(1, 256)),
PortInPolicy(SlotRange(0, 256)));
data_out_port_ = create_output_port<MultiChannelType<double>>(
"data", MultiChannelType<double>::Capabilities(ChannelRange(1, 256)),
MultiChannelType<double>::Parameters(), PortOutPolicy(SlotRange(0, 256)));
}
void Rebuffer::Configure(const GlobalContext &context) {
LOG(INFO) << name() << ". Downsample factor set to " << downsample_factor_()
<< ".";
LOG(INFO) << name() << ". Buffer size set to " << buffer_size_.to_string()
<< ".";
}
void Rebuffer::CompleteStreamInfo() {
// check if we have the same number of input and output slots
if (data_in_port_->number_of_slots() != data_out_port_->number_of_slots()) {
throw ProcessingStreamInfoError(
"Number of outputs does not match the number of inputs.", name());
}
// compute number of output samples for each input stream
if (buffer_size_.unit() == units::precise::sample_units &&
buffer_size_() > 0.) {
sample_buffer_.assign(data_in_port_->number_of_slots(), buffer_size_());
} else if (buffer_size_.unit() == units::precise::second &&
buffer_size_() > 0.) {
sample_buffer_.assign(data_in_port_->number_of_slots(), 0);
for (int k = 0; k < data_in_port_->number_of_slots(); ++k) {
sample_buffer_[k] = time2samples<unsigned int>(
buffer_size_(),
data_in_port_->streaminfo(k).parameters().sample_rate /
downsample_factor_());
if (sample_buffer_[k] == 0) {
throw ProcessingStreamInfoError("Buffer duration is zero.", name());
}
}
} else {
for (int k = 0; k < data_in_port_->number_of_slots(); ++k) {
sample_buffer_[k] =
std::max(1u, static_cast<unsigned int>(std::floor(
data_in_port_->streaminfo(k).parameters().nsamples /
downsample_factor_())));
}
}
// finalize
for (int k = 0; k < data_in_port_->number_of_slots(); ++k) {
data_out_port_->streaminfo(k).set_parameters(
MultiChannelType<double>::Parameters(
data_in_port_->streaminfo(k).parameters().nchannels,
sample_buffer_[k],
data_in_port_->streaminfo(k).parameters().sample_rate /
downsample_factor_()));
data_out_port_->streaminfo(k).set_stream_rate(
data_in_port_->streaminfo(k).stream_rate() *
data_in_port_->streaminfo(k).parameters().nsamples / sample_buffer_[k]);
}
}
void Rebuffer::Process(ProcessingContext &context) {
auto nslots = data_in_port_->number_of_slots();
MultiChannelType<double>::Data *data_in = nullptr;
std::vector<MultiChannelType<double>::Data *> data_out;
data_out.assign(nslots, nullptr);
decltype(sample_buffer_) sample_out_counter = sample_buffer_;
decltype(sample_buffer_) offset;
offset.assign(nslots, 0);
unsigned int s = 0;
while (!context.terminated()) {
// go through all slots
for (int k = 0; k < nslots; ++k) {
// retrieve new data
if (!data_in_port_->slot(k)->RetrieveData(data_in)) {
break;
}
s = 0;
while (s < data_in->nsamples()) {
for (s = offset[k]; s < data_in->nsamples() &&
sample_out_counter[k] < sample_buffer_[k];
s += downsample_factor_()) {
for (unsigned int c = 0; c < data_in->nchannels(); ++c) {
data_out[k]->set_data_sample(sample_out_counter[k], c,
data_in->data_sample(s, c));
}
data_out[k]->set_sample_timestamp(sample_out_counter[k],
data_in->sample_timestamp(s));
sample_out_counter[k]++;
}
if (sample_out_counter[k] == sample_buffer_[k]) {
data_out_port_->slot(k)->PublishData();
data_out[k] = data_out_port_->slot(k)->ClaimData(false);
data_out[k]->CloneTimestamps(*data_in);
sample_out_counter[k] = 0;
}
if (s >= data_in->nsamples()) {
offset[k] = s - data_in->nsamples();
} else {
offset[k] = s;
}
}
data_in_port_->slot(k)->ReleaseData();
}
}
}
REGISTERPROCESSOR(Rebuffer)