Program Listing for File eventdelayed.cpp¶
↰ Return to documentation for file (processors/eventdelayed/eventdelayed.cpp
)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include "eventdelayed.hpp"
EventDelayed::EventDelayed() : delayed_range_(150, 200) {
add_option(DISABLED_S, default_disabled_,
"Enable the processing of incoming events.");
add_option(DELAYED_S, initial_delayed_event_,
"Enable the delay of the event for a time randomly chosen between the delay range");
add_option("delayed range", initial_delayed_range_,
"if delayed event is true, the delayed time will be pseudo-randomly "
"chosen in this range.");
// message feature
add_option("message/detection", msg_detection_,
"Message to send on the detection channel");
add_option("message/delayed", msg_delayed_,
"Message to send on the stimulation channel");
add_option("message/ontime", msg_ontime_,
"Message to send on ontime mode.");
// Lock-out time
add_option(STOP_ANALYSIS_TIME_S+"/starting time", when_stop_analysis_period_,
"when to start stopping detection after a stimulation.");
add_option(STOP_ANALYSIS_TIME_S+"/period", initial_stop_analysis_period_,
"Lock out time for detecting pattern after a stimulation");
add_option(STOP_DETECTION_TIME_S+"/period", initial_stop_detection_period_,
"Lock out time for sending new detection/stimulation after a stimulation.");
add_option(STOP_DETECTION_TIME_S+"/detection", start_after_detection_,
"Start stopping for detecting pattern after a detection");
add_option(STOP_DETECTION_TIME_S+"/stimulation", start_after_stimulation_,
"Start stopping for detecting pattern after a stimulation");
// saving feature
add_option("enable saving", save_events_,
"Enable saving of target events to disk.");
add_option("filename prefix", prefix_,
"if enable saving is true, the saving file is name 'prefix + event'");
}
void EventDelayed::Configure(const GlobalContext &context) {
if (initial_stop_detection_period_() == 0) {
LOG(INFO) << name() << ". No lockout period set.";
} else {
LOG(INFO) << name() << ". Max output frequency set to "
<< 1e3 / static_cast<double>(initial_stop_detection_period_()) << " Hz.";
}
if (initial_stop_analysis_period_() == 0) {
LOG(INFO) << name() << ". No analysis time off set after stimulation.";
} else {
LOG(INFO) << name() << ". Analysis time off after stimulation set to "
<< static_cast<double>(initial_stop_analysis_period_()) << " s.";
}
delayed_range_ = Range<long int>(initial_delayed_range_());
if (!initial_delayed_event_()) {
LOG(INFO) << name() << ". Event are sent on-time.";
} else {
LOG(INFO) << name() << ". Events are delayed of a range between"
<< delayed_range_.lower() << " and " << delayed_range_.upper() << " ms.";
}
if (delayed_range_.upper() - delayed_range_.lower() < initial_stop_detection_period_()) {
LOG(INFO) << name()
<< ". When the difference between maximal and minimal possible delay is below the lockout period,"
<< "some weird behavior of detection locked out or not locked out when it should, could occur.";
}
LOG_IF(WARNING, start_after_detection_() != start_after_stimulation_()) << name()
<< "Event trigger lockout time:\n Be careful if you switch on ontime mode "
<< "where stimulation and detection are in the same time, "
<< "your options set will be interpreted as do a stimulation lock-out time. \n"
<< "Turn off both detection and stimulation if you want to deactivate. ";
}
void EventDelayed::CreatePorts() {
data_in_port_ =
create_input_port<EventType>(EventType::Capabilities(),
PortInPolicy(SlotRange(1), false, 1));
output_port_ = create_output_port<EventType>(EventType::Capabilities(),
EventType::Parameters(DEFAULT_EVENT),
PortOutPolicy(SlotRange(1)));
// ----- Mode state --- //
disabled_ = create_follower_state(DISABLED_S, default_disabled_(),
Permission::WRITE);
delayed_event_ = create_follower_state(
DELAYED_S, initial_delayed_event_(), Permission::WRITE);
// ----- Lock-out state --- //
stop_detection_period_ = create_static_state(
STOP_DETECTION_TIME_S, initial_stop_detection_period_(), true, Permission::WRITE);
stop_analysis_period_ = create_static_state(
STOP_ANALYSIS_TIME_S, initial_stop_analysis_period_(), true, Permission::WRITE);
analysis_unlocked_ = create_broadcaster_state(
"analysis enabled", true, Permission::READ); // connected to the ripple detection processor to disable/enabled ripple processing
}
void EventDelayed::Preprocess(ProcessingContext &context) {
ontime_received_event_ = 0;
delayed_received_event_ = 0;
event_lockout_ = 0;
//initialize enough if the past to be sure the first stimulation won't be lockout
previous_TS_nostim_ =
Clock::now() -
std::chrono::milliseconds((long int) stop_detection_period_->get() + 10);
std::string path = context.resolve_path("run://", "run");
std::string filepath = path + name();
create_file(filepath, prefix_() + msg_delayed_());
create_file(filepath, prefix_() + msg_detection_());
create_file(filepath, prefix_() + msg_ontime_());
}
void EventDelayed::Process(ProcessingContext &context) {
EventType::Data *data_in = nullptr;
std::random_device rd;
std::mt19937 generator_(rd()); // Standard mersenne_twister_engine (Higher
// complexity / randomness)
std::uniform_int_distribution<> distrib(delayed_range_.lower(),
delayed_range_.upper());
while (!context.terminated()) {
// This part is about sending out delayed events
while (!delayed_event_queue_.empty() and delayed_event_queue_.top().ts < Clock::now()) {
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(
Clock::now() - delayed_event_queue_.top().ts)
.count();
LOG(DEBUG) << name() << ". Time to sent a delayed event ("
<< delayed_event_queue_.top().data_in->event() << ") with " << millis
<< "ms late.";
send_event(delayed_event_queue_.top().data_in, msg_delayed_());
delayed_event_queue_.pop();
}
// This part is about triggering delayed lock-out
while(!lockout_queue_.empty() and lockout_queue_.top().ts < Clock::now()){
LOG(DEBUG) << name() << ". Start a lockout after stimulation for " + std::to_string((long int)stop_analysis_period_->get() ) + " ms.";
// stop detecting in the ripple detector
analysis_unlocked_->set(false);
// buzy sleep has no detections should be received and not stimulation should be sent during this time.
std::this_thread::sleep_for(std::chrono::milliseconds((long int)stop_analysis_period_->get() ));
analysis_unlocked_->set(true);
lockout_queue_.pop();
while (!delayed_event_queue_.empty() and delayed_event_queue_.top().ts < Clock::now()) { //Remove any stimulations which would have happened during the detection/stimulation lockout
LOG(DEBUG) << name() << "The stimulation of this " << delayed_event_queue_.top().data_in->event() << " has been locked-out due to the detection lockout after stimulation.";
delayed_event_queue_.pop();
}
}
if (!data_in_port_->slot(0)->RetrieveData(data_in)) {
break;
}
auto nread = data_in_port_->slot(0)->status_read();
if (nread == 0) {
data_in_port_->slot(0)->ReleaseData();
continue;
}
// If not stimulation disabled
if (!disabled_->get()) {
int wait_time = distrib(generator_);
// If stimulation is delayed
if (delayed_event_->get()) {
++delayed_received_event_;
LOG(INFO) << name() << ". Save an event (" << data_in->event() << ") to send later with " << wait_time
<< "ms delayed.";
auto delay =
data_in->source_timestamp() + std::chrono::milliseconds(wait_time);
if ((not start_after_stimulation_() or not to_lock_out_in_future(delay)) and ( not start_after_detection_() or not to_lock_out())) {
Delayed event(delay, data_in);
delayed_event_queue_.push(event);
send_event(data_in, msg_detection_());
for(auto time_to_start: when_stop_analysis_period_()){
Delayed event_lockout(delay+ std::chrono::milliseconds(time_to_start), data_in);
lockout_queue_.push(event_lockout);
}
} else {
LOG(DEBUG) << name() << data_in->event() << " has been locked-out in delayed mode";
++event_lockout_;
}
// If stimulation is sent at the same time as detection = ontime mode
} else {
++ontime_received_event_;
if (not(start_after_detection_() or start_after_stimulation_()) or not to_lock_out()) {
send_event(data_in, msg_ontime_());
for(auto time_to_start: when_stop_analysis_period_()){
Delayed event_lockout(data_in->source_timestamp() + std::chrono::milliseconds(time_to_start), data_in);
lockout_queue_.push(event_lockout);
}
} else {
LOG(DEBUG) << name() << data_in->event() << " has been locked-out in ontime mode";
++event_lockout_;
}
}
// Stimulation event is disable - detection is still sent
} else {
++ontime_received_event_;
if (not start_after_detection_() or not to_lock_out()) {
send_event(data_in, msg_detection_());
} else {
LOG(DEBUG) << name() << data_in->event() << " has been locked-out in disable mode";
++event_lockout_;
}
}
data_in_port_->slot(0)->ReleaseData();
}
}
void EventDelayed::send_event(EventType::Data *data_in, std::string type) {
LOG(INFO) << name() << ". Sent one event: " << type;
EventType::Data *data_out = output_port_->slot(0)->ClaimData(true);
data_out->set_hardware_timestamp(data_in->hardware_timestamp());
data_out->set_event(type);
data_out->set_source_timestamp();
output_port_->slot(0)->PublishData();
if (save_events_()) { // save stim events to disk
uint64_t serial_number = data_in->serial_number();
LOG(DEBUG) << prefix_()<< type;
streams_[prefix_() + type]->write(reinterpret_cast<const char *>(&serial_number),
sizeof(decltype(serial_number)));
}
}
void EventDelayed::Postprocess(ProcessingContext &context) {
auto msg = "Successfully executed conversion protocol: " +
std::to_string(ontime_received_event_) + "ontime and " +
std::to_string(delayed_received_event_) + " delayed with " +
std::to_string(event_lockout_) + " events locked out.";
ontime_received_event_ = 0;
delayed_received_event_ = 0;
event_lockout_ = 0;
}
bool EventDelayed::to_lock_out_in_future(TimePoint start_event) {
TimePoint last_future_event;
if (delayed_event_queue_.empty())
last_future_event = previous_TS_nostim_;
else
last_future_event = delayed_event_queue_.top().ts;
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(start_event - last_future_event).count();
if (millis <= stop_detection_period_->get()){
LOG(DEBUG) << name() << ". Start a stimulation lockout after stimulation for " + std::to_string(stop_detection_period_->get()) + " secs.";
return true;
}
previous_TS_nostim_ = last_future_event;
return false;
}
bool EventDelayed::to_lock_out() {
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - previous_TS_nostim_).count();
if (millis <= stop_detection_period_->get()){
LOG(DEBUG) << name() << ". Start a stimulation lockout after detection for " + std::to_string(stop_detection_period_->get()) + " secs.";
return true;
}
previous_TS_nostim_ = Clock::now();
return false;
}
REGISTERPROCESSOR(EventDelayed)