Program Listing for File eventfilter.cpp¶
↰ Return to documentation for file (processors/eventfilter/eventfilter.cpp
)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include "eventfilter.hpp"
#include <algorithm>
#include <chrono>
#include <limits>
#include <string>
#include <thread>
#include <vector>
void DetectionCriterionValue::from_yaml(const YAML::Node &node) {
auto v = node.as<std::string>();
if (v == "any") {
this->set_value(1);
} else if (v == "all") {
this->set_value(0);
} else {
Value<SlotType, false>::from_yaml(node);
}
}
EventFilter::EventFilter() : EventSync() {
add_option("block duration", blockout_time_,
"The duration over which events will be filtered out "
"following the arrival of a blocking event.");
add_option("block wait time", block_wait_time_,
"The waiting time after a target event has been received "
"to check if any blocking event also occurred.");
add_option("sync time", sync_time_,
"Time interval over which incoming events are considered to be "
"synchronuous.");
add_option("discard warnings", discard_warnings_,
"Do not emit warnings for discarded events.");
add_option(
"detection criterion", detections_to_criterion_,
"The criterion for triggering a detection, which is the number of "
"input slots with a target event. Acceptable values range from 1 to "
"the total number of input slots. A value of 0 or 'all' is equivalent "
"to the total number of input slots. A value of 'any' is equivalent to "
"1.");
}
void EventFilter::CreatePorts() {
data_in_port_ =
create_input_port<EventType>(EVENTDATA, EventType::Capabilities(),
PortInPolicy(SlotRange(1, 256), false, 0));
block_in_port_ =
create_input_port<EventType>("blocking events", EventType::Capabilities(),
PortInPolicy(SlotRange(1, 256), false, 0));
data_out_port_ = create_output_port<EventType>(
EVENTDATA, EventType::Capabilities(),
EventType::Parameters(target_event_().event()),
PortOutPolicy(SlotRange(1)));
}
void EventFilter::Prepare(GlobalContext &context) {
auto nslots = data_in_port_->number_of_slots();
// complete Configure
if (detections_to_criterion_() == 0) {
detections_to_criterion_ = nslots;
}
// check detections_to_criterion value
if (detections_to_criterion_() < 1 || detections_to_criterion_() > nslots) {
auto err_msg = std::string("Invalid number of detections to criterion.") +
"It must be a number between 1 and " +
std::to_string(nslots) + ".";
throw ProcessingPrepareError(err_msg, name());
}
LOG(INFO) << name() << ". Criterion for triggering an event is set to "
<< detections_to_criterion_() << " events.";
LOG(INFO) << name()
<< ". Criterion for triggering a blocking event is set to 1 event.";
}
void EventFilter::Preprocess(ProcessingContext &context) {
// init gate_close_time, but make sure the first event won't be excluded
// if no blocking event will be received
gate_close_time_ = Clock::now();
if (blockout_time_() > 0) {
std::this_thread::sleep_for(
std::chrono::milliseconds(static_cast<int>(blockout_time_())));
}
n_blocked_events_ = 0;
}
void EventFilter::Process(ProcessingContext &context) {
EventType::Data *data_out = nullptr;
bool alive = false;
bool detection_criterion = false;
bool event_received = false;
SlotType counter_to_detection = 0;
bool detection_block = false;
std::size_t slot_last = 0;
bool gate_just_closed = false;
TimePoint t_detection;
std::vector<TimePoint> arrival_times_per_slot_events(
data_in_port_->number_of_slots(), std::numeric_limits<TimePoint>::min());
std::vector<uint64_t> arrival_hwTS_per_slot_events(
data_in_port_->number_of_slots(), 0);
// not used but needs to be passed
std::vector<TimePoint> arrival_times_per_slot_blocking_events(
block_in_port_->number_of_slots(), std::numeric_limits<TimePoint>::min());
std::vector<uint64_t> arrival_hwTS_per_slot_blocking_events(
block_in_port_->number_of_slots(), 0);
// t >= (t_last - time_in_ms) for t in log_n_slots-> how many slots meet
// criterion? t_last - t <= time_in_ms
while (!context.terminated()) {
if (!detection_criterion) {
// read input port for triggering events
std::tie(alive, event_received, slot_last) = is_there_target(
data_in_port_, event_counter_, arrival_times_per_slot_events,
arrival_hwTS_per_slot_events);
if (!alive) {
break;
}
if (event_received) {
counter_to_detection = 0;
for (auto t : arrival_times_per_slot_events) {
if (time_between(arrival_times_per_slot_events[slot_last], t) <
sync_time_()) {
++counter_to_detection;
}
}
detection_criterion =
(counter_to_detection >= detections_to_criterion_());
LOG(DEBUG) << name() << ". Detection criterion met.";
}
// read input port for blocking events
std::tie(alive, detection_block, std::ignore) =
is_there_target(block_in_port_, blocking_events_counter_,
arrival_times_per_slot_blocking_events,
arrival_hwTS_per_slot_blocking_events);
if (!alive) {
break;
}
if (detection_block) {
gate_close_time_ = Clock::now();
detection_block = false;
}
}
if (detection_criterion) { // check again as flag might have just changed
t_detection = Clock::now();
// check if gate is closed
if (time_since(gate_close_time_) <= blockout_time_()) {
++n_blocked_events_;
detection_criterion = false;
LOG(UPDATE) << name() << ". Target event " << target_event_().event()
<< " was filtered out.";
} else {
// if open, before streaming the event on the output,
// check if blocking event is coming soon after the target event
// is received on the "events" port with this dedicated read loop
// read incoming blocking events for block_wait_time_ms_
while (time_since(t_detection) < block_wait_time_() &&
detection_criterion) {
std::tie(alive, gate_just_closed, std::ignore) =
is_there_target(block_in_port_, blocking_events_counter_,
arrival_times_per_slot_blocking_events,
arrival_hwTS_per_slot_blocking_events);
if (!alive) {
break;
} // exit inner while loop
if (gate_just_closed) {
++n_blocked_events_;
LOG(UPDATE)
<< name() << ". Target event " << target_event_().event()
<< " was filtered out (blocking event arrived after target).";
detection_criterion = false;
gate_close_time_ = Clock::now();
}
}
if (!alive) {
break;
} // break outer (context) while loop
if (!gate_just_closed) { // no post detection block
// finally send event
data_out = data_out_port_->slot(0)->ClaimData(false);
data_out->set_hardware_timestamp(
arrival_hwTS_per_slot_events[slot_last]);
data_out->set_source_timestamp();
data_out_port_->slot(0)->PublishData();
detection_criterion = false;
}
}
}
}
}
void EventFilter::Postprocess(ProcessingContext &context) {
log_and_reset_counters(data_in_port_->name(), event_counter_);
log_and_reset_counters(block_in_port_->name(), blocking_events_counter_);
LOG(INFO) << name() << ". Streamed "
<< data_out_port_->slot(0)->nitems_produced() << " target events.";
LOG(INFO) << name() << ". " << n_blocked_events_
<< " target events were blocked.";
n_blocked_events_ = 0;
}
std::tuple<bool, bool, std::size_t>
EventFilter::is_there_target(PortIn<EventType> *input_port,
EventCounter &event_counter,
std::vector<TimePoint> &arrival_times,
std::vector<uint64_t> &arrival_timestamps) {
std::vector<EventType::Data *> data_in;
std::size_t slot_index = std::numeric_limits<std::size_t>::max();
bool target_received = false;
for (decltype(input_port->number_of_slots()) s = 0;
s < input_port->number_of_slots(); ++s) {
// check if processor is still alive
if (!input_port->slot(s)->RetrieveDataAll(data_in)) {
return std::make_tuple(false, false, NULL_TIMESTAMP);
}
// check if there is data on any of the slot
auto nread = input_port->slot(s)->status_read();
if (nread == 0) {
input_port->slot(s)->ReleaseData();
continue;
}
if (nread > 1 && !discard_warnings_()) {
std::string events_list = data_in[1]->event();
for (auto el = data_in.begin() + 1; el != data_in.end() - 1; ++el) {
events_list += (", " + (*el)->event());
}
LOG(WARNING) << name() << ". " << nread - 1 << " events on port "
<< input_port->name() << "(" << events_list
<< ") were discarded.";
}
++event_counter.all_received;
// if there's data, check if it is a target event
if (*data_in.back() == target_event_()) {
LOG(DEBUG) << name() << ". Received target event "
<< target_event_().event() << " on port " << input_port->name()
<< " slot " << s;
++event_counter.target;
arrival_times[s] = Clock::now();
arrival_timestamps[s] = data_in.back()->hardware_timestamp();
target_received = true;
slot_index = s;
} else { // non-target event received
LOG(DEBUG) << name()
<< ". Received non-target event: " << data_in.back()->event()
<< " on port " << input_port->name() << " slot " << s;
++event_counter.non_target;
}
input_port->slot(s)->ReleaseData();
}
// all slots read, no data on any of them
return std::make_tuple(true, target_received, slot_index);
}
REGISTERPROCESSOR(EventFilter)