Program Listing for File burstdetector.cpp¶
↰ Return to documentation for file (processors/burstdetector/burstdetector.cpp
)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017, Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include "burstdetector.hpp"
BurstDetector::BurstDetector() : IProcessor() {
add_option(THRESHOLD_DEV, initial_threshold_dev_,
"Multiplier (in number of signal standard deviations) to "
"compute the initial threshold.");
add_option(SMOOTH_TIME, initial_smooth_time_,
"Integration time for estimating signal statistics.");
add_option(DETECTION_LOCKOUT_TIME, initial_detection_lockout_time_,
"Lockout time (in seconds) to avoid over-stimulation.");
add_option(STREAM_EVENTS, default_stream_events_,
"Enable streaming of burst events.");
add_option(STREAM_STATISTICS, initial_stats_out_,
"Enable streaming of statistics.");
add_option("statistics buffer size", stats_buffer_size_,
"Size (in seconds) for statistics output buffers.");
}
void BurstDetector::CreatePorts() {
data_in_port_ = create_input_port<MUAType>("mua", MUAType::Capabilities(),
PortInPolicy(SlotRange(1)));
data_out_port_ = create_output_port<EventType>(
EVENTDATA, EventType::Capabilities(), EventType::Parameters("burst"),
PortOutPolicy(SlotRange(1)));
stats_out_port_ = create_output_port<MultiChannelType<double>>(
"statistics",
MultiChannelType<double>::Capabilities(ChannelRange(N_STATS_OUT)),
MultiChannelType<double>::Parameters(), PortOutPolicy(SlotRange(1)));
threshold_ = create_broadcaster_state("threshold", 0.0, Permission::READ);
signal_mean_ = create_broadcaster_state("mean", 0.0, Permission::READ);
signal_dev_ = create_broadcaster_state("deviation", 0.0, Permission::READ);
threshold_dev_ = create_static_state(THRESHOLD_DEV, initial_threshold_dev_(),
true, Permission::WRITE);
detection_lockout_time_ = create_static_state(
DETECTION_LOCKOUT_TIME, initial_detection_lockout_time_(), true,
Permission::WRITE);
stream_events_ = create_static_state(STREAM_EVENTS, default_stream_events_(),
true, Permission::WRITE);
smooth_time_ = create_static_state(SMOOTH_TIME, initial_smooth_time_(), true,
Permission::WRITE);
stats_out_ = create_static_state(STREAM_STATISTICS, initial_stats_out_(),
true, Permission::WRITE);
burst_ = create_broadcaster_state("burst", false, Permission::READ);
bin_size_mua_ = create_follower_state("bin size", 1.0, Permission::READ);
}
void BurstDetector::CompleteStreamInfo() {
stats_nsamples_ = stats_buffer_size_() * 1e3 /
data_in_port_->streaminfo(0).parameters().bin_size;
if (stats_nsamples_ == 0) {
throw ProcessingCreatePortsError(
"Stats buffersize is smaller than MUA bin size.", name());
}
stats_out_port_->streaminfo(0).set_stream_rate(
data_in_port_->streaminfo(0).stream_rate());
stats_out_port_->streaminfo(0).set_parameters(
MultiChannelType<double>::Parameters(
N_STATS_OUT, stats_nsamples_,
1. / data_in_port_->streaminfo(0).parameters().bin_size * 1e3));
data_out_port_->streaminfo(0).set_stream_rate(IRREGULARSTREAM);
}
void BurstDetector::Preprocess(ProcessingContext &context) {
signal_mean_->set(0);
signal_dev_->set(0);
threshold_->set(0);
block_ = 0;
sample_rate_ =
1. / data_in_port_->slot(0)->streaminfo().parameters().bin_size * 1e3;
LOG(UPDATE) << name() << ". Incoming Sample rate: " << sample_rate_;
burn_in_ = initial_smooth_time_() * sample_rate_;
if (burn_in_ == 0) {
burn_in_ = 1;
LOG(UPDATE) << name() << ". " << SMOOTH_TIME
<< " too low. Burn-in set to 1 sample.";
}
double alpha = 1.0 / burn_in_;
running_statistics_.reset(
new dsp::algorithms::RunningMeanMAD(alpha, burn_in_, false));
threshold_detector_.reset(new dsp::algorithms::ThresholdCrosser(0));
}
void BurstDetector::Process(ProcessingContext &context) {
typename MUAType::Data *data_in = nullptr;
typename EventType::Data *event_out = nullptr;
typename MultiChannelType<double>::Data *stats_out = nullptr;
double value, test_value;
auto stats_nsamples_counter = stats_nsamples_;
auto burnin_update_sent = false;
// burn-in period
while (running_statistics_->is_burning_in() && !context.terminated()) {
if (!data_in_port_->slot(0)->RetrieveData(data_in)) {
break;
}
if (!burnin_update_sent) {
LOG(UPDATE) << name() << ": burn-in period starting ("
<< initial_smooth_time_() << " seconds)";
burnin_update_sent = true;
}
running_statistics_->add_sample(data_in->mua());
data_in_port_->slot(0)->ReleaseData();
}
if (!running_statistics_->is_burning_in()) {
LOG(UPDATE) << name() << ": end of burn-in period";
LOG(UPDATE) << name()
<< ": statistics: center = " << running_statistics_->center()
<< ", dispersion = " << running_statistics_->dispersion();
LOG(UPDATE) << name()
<< ": burst detection starts now with initial threshold of "
<< (threshold_dev_->get() * running_statistics_->dispersion());
}
while (!context.terminated()) {
// retrieve new data
if (!data_in_port_->slot(0)->RetrieveData(data_in)) {
break;
}
// update threshold and alpha only once for an incoming data bucket
threshold_->set(threshold_dev_->get() * running_statistics_->dispersion());
threshold_detector_->set_threshold(threshold_->get());
running_statistics_->set_alpha(1.0 / (smooth_time_->get() * sample_rate_));
value = data_in->mua();
test_value = std::abs(value - running_statistics_->center());
if (stats_out_->get()) {
if (stats_nsamples_counter == stats_nsamples_) {
stats_out_port_->slot(0)->PublishData();
stats_out = stats_out_port_->slot(0)->ClaimData(false);
stats_out->set_source_timestamp();
stats_out->set_hardware_timestamp(data_in->hardware_timestamp());
stats_nsamples_counter = 0;
}
stats_out->set_data_sample(stats_nsamples_counter, 0, test_value);
stats_out->set_data_sample(stats_nsamples_counter, 1,
threshold_detector_->threshold());
stats_out->set_sample_timestamp(stats_nsamples_counter,
data_in->hardware_timestamp());
++stats_nsamples_counter;
}
if (block_ > 0) {
--block_;
continue;
} else if (burst_->get()) {
burst_->set(false);
}
if (threshold_detector_->has_crossed_up(test_value)) {
block_ = static_cast<decltype(block_)>(detection_lockout_time_->get() *
sample_rate_ / 1e3);
if (stream_events_->get()) {
event_out = data_out_port_->slot(0)->ClaimData(false);
event_out->set_source_timestamp(data_in->source_timestamp());
event_out->set_hardware_timestamp(data_in->hardware_timestamp());
data_out_port_->slot(0)->PublishData();
}
}
running_statistics_->add_sample(value);
data_in_port_->slot(0)->ReleaseData();
signal_mean_->set(running_statistics_->center());
signal_dev_->set(running_statistics_->dispersion());
}
}
void BurstDetector::Postprocess(ProcessingContext &context) {
LOG(INFO) << name() << ". Streamed "
<< data_out_port_->slot(0)->nitems_produced() << " burst events.";
}
REGISTERPROCESSOR(BurstDetector)