.. _program_listing_file_processors_rebuffer_rebuffer.cpp: Program Listing for File rebuffer.cpp ===================================== |exhale_lsh| :ref:`Return to documentation for file ` (``processors/rebuffer/rebuffer.cpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // --------------------------------------------------------------------- // This file is part of falcon-core. // // Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders // // Falcon-server is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // Falcon-server is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with falcon-core. If not, see . // --------------------------------------------------------------------- #include "rebuffer.hpp" #include #include "utilities/time.hpp" Rebuffer::Rebuffer() : IProcessor() { add_option("downsample factor", downsample_factor_, "The factor for downsampling the signal."); add_option("buffer size", buffer_size_, "Output buffer size in samples or seconds."); } void Rebuffer::CreatePorts() { data_in_port_ = create_input_port>( "data", MultiChannelType::Capabilities(ChannelRange(1, 256)), PortInPolicy(SlotRange(0, 256))); data_out_port_ = create_output_port>( "data", MultiChannelType::Capabilities(ChannelRange(1, 256)), MultiChannelType::Parameters(), PortOutPolicy(SlotRange(0, 256))); } void Rebuffer::Configure(const GlobalContext &context) { LOG(INFO) << name() << ". Downsample factor set to " << downsample_factor_() << "."; LOG(INFO) << name() << ". Buffer size set to " << buffer_size_.to_string() << "."; } void Rebuffer::CompleteStreamInfo() { // check if we have the same number of input and output slots if (data_in_port_->number_of_slots() != data_out_port_->number_of_slots()) { throw ProcessingStreamInfoError( "Number of outputs does not match the number of inputs.", name()); } // compute number of output samples for each input stream if (buffer_size_.unit() == units::precise::sample_units && buffer_size_() > 0.) { sample_buffer_.assign(data_in_port_->number_of_slots(), buffer_size_()); } else if (buffer_size_.unit() == units::precise::second && buffer_size_() > 0.) { sample_buffer_.assign(data_in_port_->number_of_slots(), 0); for (int k = 0; k < data_in_port_->number_of_slots(); ++k) { sample_buffer_[k] = time2samples( buffer_size_(), data_in_port_->streaminfo(k).parameters().sample_rate / downsample_factor_()); if (sample_buffer_[k] == 0) { throw ProcessingStreamInfoError("Buffer duration is zero.", name()); } } } else { for (int k = 0; k < data_in_port_->number_of_slots(); ++k) { sample_buffer_[k] = std::max(1u, static_cast(std::floor( data_in_port_->streaminfo(k).parameters().nsamples / downsample_factor_()))); } } // finalize for (int k = 0; k < data_in_port_->number_of_slots(); ++k) { data_out_port_->streaminfo(k).set_parameters( MultiChannelType::Parameters( data_in_port_->streaminfo(k).parameters().nchannels, sample_buffer_[k], data_in_port_->streaminfo(k).parameters().sample_rate / downsample_factor_())); data_out_port_->streaminfo(k).set_stream_rate( data_in_port_->streaminfo(k).stream_rate() * data_in_port_->streaminfo(k).parameters().nsamples / sample_buffer_[k]); } } void Rebuffer::Process(ProcessingContext &context) { auto nslots = data_in_port_->number_of_slots(); MultiChannelType::Data *data_in = nullptr; std::vector::Data *> data_out; data_out.assign(nslots, nullptr); decltype(sample_buffer_) sample_out_counter = sample_buffer_; decltype(sample_buffer_) offset; offset.assign(nslots, 0); unsigned int s = 0; while (!context.terminated()) { // go through all slots for (int k = 0; k < nslots; ++k) { // retrieve new data if (!data_in_port_->slot(k)->RetrieveData(data_in)) { break; } s = 0; while (s < data_in->nsamples()) { for (s = offset[k]; s < data_in->nsamples() && sample_out_counter[k] < sample_buffer_[k]; s += downsample_factor_()) { for (unsigned int c = 0; c < data_in->nchannels(); ++c) { data_out[k]->set_data_sample(sample_out_counter[k], c, data_in->data_sample(s, c)); } data_out[k]->set_sample_timestamp(sample_out_counter[k], data_in->sample_timestamp(s)); sample_out_counter[k]++; } if (sample_out_counter[k] == sample_buffer_[k]) { data_out_port_->slot(k)->PublishData(); data_out[k] = data_out_port_->slot(k)->ClaimData(false); data_out[k]->CloneTimestamps(*data_in); sample_out_counter[k] = 0; } if (s >= data_in->nsamples()) { offset[k] = s - data_in->nsamples(); } else { offset[k] = s; } } data_in_port_->slot(k)->ReleaseData(); } } } REGISTERPROCESSOR(Rebuffer)