Program Listing for File fileserializer.cpp
↰ Return to documentation for file (processors/fileserializer/fileserializer.cpp)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include "fileserializer.hpp"
#include <fstream>
#include <string>
#include <utility>
#include "idata.hpp"
#include "utilities/time.hpp"
// Constructs the processor and registers its configurable options. Every
// add_option call passes the option name, the member that holds the option
// value, and a human-readable description.
FileSerializer::FileSerializer() : IProcessor() {
  // where and how the data is written
  add_option("path", path_, "Path (server-side) where to save data.");
  add_option("encoding", encoding_, "Binary or YAML encoding.");
  add_option(
      "format", format_,
      "Data format (none, full, headeronly, streamheader, compact).");
  add_option("overwrite", overwrite_, "Overwrite existing files.");

  // throttling of incoming packets
  add_option(
      "throttle/enabled", throttle_,
      "Progressively drop incoming data packets if saving cannot keep up.");
  add_option(
      "throttle/threshold", throttle_threshold_,
      "Buffer fill level (fraction between 0 and 1) at which throttling "
      "kicks in.");
  add_option(
      "throttle/smooth", throttle_smooth_,
      "Smoothly changes throttle level as threshold is reached "
      "(value between 0 and 1).");

  // optional YAML header written at the start of each file
  add_option("preamble", preamble_, "Add YAML preamble to file.");
}
void FileSerializer::CreatePorts() {
data_port_ =
create_input_port<AnyType>("data", AnyType::Capabilities(),
PortInPolicy(SlotRange(1, 256), false, 0));
}
// Logs the effective serialization and throttling options at INFO level.
// The global context is not used by this processor.
void FileSerializer::Configure(const GlobalContext &context) {
  // serialization settings
  LOG(INFO) << "format: " << format_.to_yaml();
  LOG(INFO) << "encoding: " << encoding_.to_yaml();

  // throttling settings
  LOG(INFO) << "throttle enabled: " << throttle_();
  LOG(INFO) << "throttle threshold: " << throttle_threshold_();
  LOG(INFO) << "throttle smooth: " << throttle_smooth_();
}
// Prepares a processing run: builds the serializer for the configured
// encoding/format, resets the per-slot counters and the throttle level, and
// opens one binary output file per input slot.
//
// Throws ProcessingError when a target file already exists and overwriting
// is disabled, or when a file cannot be opened.
void FileSerializer::Preprocess(ProcessingContext &context) {
  // serializer used for every slot during this run
  serializer_.reset(Serialization::serializer(encoding_(), format_()));

  // per-slot bookkeeping starts from zero
  const auto slot_count = data_port_->number_of_slots();
  packetid_.assign(slot_count, 0);
  upstream_buffer_size_.assign(slot_count, 0);
  nskipped_.assign(slot_count, 0);
  throttle_level_ = 0;

  // start from an empty list of output streams
  streams_.clear();
  const std::string directory = context.resolve_path(path_(), "run");

  for (int k = 0; k < slot_count; ++k) {
    // file name: <dir>/<processor name>.<slot>_<upstream address>.<extension>
    const std::string address =
        data_port_->slot(k)->upstream_address().string();
    const std::string filename = directory + "/" + name() + "." +
                                 std::to_string(k) + "_" + address + "." +
                                 serializer_->extension();

    // refuse to clobber an existing file unless explicitly allowed
    if (!overwrite_() && path_exists(filename)) {
      throw ProcessingError("Output file " + filename + " exists.", name());
    }

    std::unique_ptr<std::ostream> stream(new std::ofstream(
        filename, std::ofstream::out | std::ofstream::binary));
    if (!stream->good()) {
      throw ProcessingError("Error opening output file " + filename + ".",
                            name());
    }

    // the optional preamble goes in before any data packet
    if (preamble_()) {
      create_preamble(*stream, k);
    }

    streams_.push_back(std::move(stream));
    LOG(DEBUG) << "Successfully opened output file for stream "
               << std::to_string(k) << " (" << address << ").";

    // remembered so Process can relate the number of packets read to the
    // size of the upstream buffer
    upstream_buffer_size_[k] = static_cast<unsigned int>(
        data_port_->slot(k)->upstream_policy().buffer_size());
  }
}
void FileSerializer::create_preamble(std::ostream &out, int slot) {
YAML::Node node;
node["creator"] = name();
node["date"] = time_to_string(std::time(nullptr));
node["version"] = static_cast<int>(Serialization::VERSION);
node["interleaved"] = false;
node["format"] = format_.to_yaml();
node["encoding"] = encoding_.to_yaml();
node["stream"] = slot;
node["data"] =
serializer_->DataDescription(data_port_->slot(slot)->GetDataPrototype());
YAML::Emitter emit(out);
emit << YAML::BeginDoc;
emit << node;
emit << YAML::EndDoc;
}
// Main loop: until the context is terminated, visits every input slot in
// turn, retrieves all available packets and serializes them to the slot's
// output stream.
//
// Without throttling every packet is saved, and a warning is logged when a
// batch is larger than half the upstream buffer size.
//
// With throttling a throttle level is maintained (assumed to stay within
// [0, 1], which holds when the "throttle/smooth" option is in [0, 1]). It
// decays by the factor (1 - smooth) on every batch and is raised by `smooth`
// when the batch exceeds threshold * upstream buffer size. From the level a
// period P = floor(1 / (0.5 - |level - 0.5|)) is derived:
//   level == 0, or level <  0.5 and P > batch size : keep all packets
//   level <  0.5 otherwise                         : skip every P-th packet
//   level == 1, or level >= 0.5 and P > batch size : skip all packets
//   level >= 0.5 otherwise                         : keep every P-th packet
// Skipped packets still advance the packet id and are counted in nskipped_.
void FileSerializer::Process(ProcessingContext &context) {
  std::vector<AnyType::Data *> data;
  const int nslots = data_port_->number_of_slots();

  // Serializes one packet of slot `k` and advances that slot's packet id.
  auto save_packet = [this](AnyType::Data *packet, int k) {
    serializer_->Serialize(*(streams_[k]), packet, k, packetid_[k]++,
                           data_port_->slot(k)->upstream_address().processor(),
                           data_port_->slot(k)->upstream_address().port(),
                           data_port_->slot(k)->upstream_address().slot());
  };

  // Records one packet of slot `k` as received but not saved.
  auto skip_packet = [this](int k) {
    packetid_[k]++;
    nskipped_[k]++;
  };

  while (!context.terminated()) {
    for (int k = 0; k < nslots; ++k) {
      // a failed retrieval abandons the current pass over the slots; the
      // outer loop then re-checks for termination
      if (!data_port_->slot(k)->RetrieveDataAll(data)) {
        break;
      }

      const uint64_t nread = data_port_->slot(k)->status_read();
      if (nread == 0) {
        data_port_->slot(k)->ReleaseData();
        continue;
      }

      if (!throttle_()) {
        LOG_IF(WARNING, (nread > 0.5 * upstream_buffer_size_[k]))
            << name() << ": buffer is more than half full (stream " << k << ")";
        for (auto &packet : data) {
          save_packet(packet, k);
        }
      } else {
        // update throttle level
        throttle_level_ *= (1 - throttle_smooth_());
        if (nread > throttle_threshold_() * upstream_buffer_size_[k]) {
          throttle_level_ += throttle_smooth_();
        }

        // The period is kept as a double on purpose: at level 0 or 1 the
        // divisor is zero and the quotient is +inf, and converting +inf to
        // an integer type is undefined behaviour. It is converted to an
        // integer only below, where it is finite and not larger than nread.
        const double period =
            std::floor(1.0 / (0.5 - std::abs(throttle_level_ - 0.5)));
        const bool period_exceeds_batch = period > static_cast<double>(nread);
        const bool below_half = throttle_level_ < 0.5;

        if (throttle_level_ == 0 || (below_half && period_exceeds_batch)) {
          // keep all
          for (auto &packet : data) {
            save_packet(packet, k);
          }
        } else if (below_half) {
          // skip small fraction: every stride-th packet is dropped
          const uint64_t stride = static_cast<uint64_t>(period);
          for (uint64_t n = 0; n < nread; ++n) {
            if (n % stride == 0) {
              skip_packet(k);
            } else {
              save_packet(data[n], k);
            }
          }
        } else if (throttle_level_ == 1 || period_exceeds_batch) {
          // skip all
          packetid_[k] += nread;
          nskipped_[k] += nread;
        } else {
          // keep small fraction: only every stride-th packet is saved
          const uint64_t stride = static_cast<uint64_t>(period);
          for (uint64_t n = 0; n < nread; ++n) {
            if (n % stride != 0) {
              skip_packet(k);
            } else {
              save_packet(data[n], k);
            }
          }
        }
      }

      data_port_->slot(k)->ReleaseData();
    }
  }
}
// Ends a processing run: releases the output streams and the serializer,
// then reports per slot how many packets were received and how many of them
// were not saved.
void FileSerializer::Postprocess(ProcessingContext &context) {
  // destroying the stream objects closes the underlying files
  streams_.clear();
  serializer_.reset();

  for (SlotType k = 0; k < data_port_->number_of_slots(); k++) {
    if (nskipped_[k] == 0) {
      LOG(UPDATE) << name() << ": stream " << k << ": received and saved "
                  << packetid_[k] << " data packets.";
      continue;
    }

    LOG(UPDATE) << name() << ": stream " << k << ": received " << packetid_[k]
                << " data packets, of which " << nskipped_[k]
                << " were not saved.";
  }
}
// NOTE(review): REGISTERPROCESSOR is defined outside this file; presumably it
// registers FileSerializer so it can be created by name — confirm against the
// macro definition.
REGISTERPROCESSOR(FileSerializer)