From 9b7bccad1c8bbd9545634175f1373298804865c8 Mon Sep 17 00:00:00 2001 From: "Brian C. Van Essen" Date: Mon, 3 Apr 2023 09:44:22 -0700 Subject: [PATCH] Starting to split off the state of the data stream into its own class hierarchy. --- docs/data_ingestion.rst | 71 ++++ include/lbann/data_readers/data_reader.hpp | 334 +-------------- include/lbann/data_streams/data_stream.hpp | 412 +++++++++++++++++++ scripts/build_lbann.sh | 18 +- src/CMakeLists.txt | 1 + src/data_readers/data_reader.cpp | 348 +--------------- src/data_streams/CMakeLists.txt | 32 ++ src/data_streams/data_stream.cpp | 456 +++++++++++++++++++++ 8 files changed, 986 insertions(+), 686 deletions(-) create mode 100644 include/lbann/data_streams/data_stream.hpp create mode 100644 src/data_streams/CMakeLists.txt create mode 100644 src/data_streams/data_stream.cpp diff --git a/docs/data_ingestion.rst b/docs/data_ingestion.rst index 1f29fbee33c..d353da11277 100644 --- a/docs/data_ingestion.rst +++ b/docs/data_ingestion.rst @@ -37,3 +37,74 @@ Two of the new format data readers are the ``python``, ``SMILES``, and Several of these readers (SMILES and :ref:`HDF5`) support the use of :ref:`sample lists`. + +"Really New" Data Subsystem +--------------------------- + +During execution LBANN will ingest one or more streams of data. There +will be unique streams of data for each execution mode: + - training + - validation + - tournament + - testing + - inference + +Note that execution modes should become more flexible and should be +able to be arbitrarily named. + +The data stream object is responsible for keeping track of the "count" +/ state of that data stream for that execution context. For bounded / +batched data streams, this would be the current position within the +stream and the total number of passes over the stream. (index and +epoch) + +For infinite streams the object will just maintain the index / +position within the stream. 
+ +In both cases it is necessary for the object to track the "step" size +(i.e. mini-batch size). Additionally, because the data stream will be +accessed in parallel, it is necessary to track the position of each +rank within the stream in terms of offset. + +.. + Data source class file: The data source class tracks the stateful + aspects of one logical stream of data. + Data sources are either bounded or infinite + data sources. The class is responsible for keeping track of state + with respect to + + +Sample list: + +Track how to retrieve a data set from the outside world. This +typically is a set of file locations for each sample as well as a +count of how many samples are in the set. + +Data coordinator: + +Responsible for managing one or more data streams for each execution +context. It is + + +data reader / loader: + +Function to ingest bits from outside and place them into an in-memory +object that is managed by the data coordinator. + +Data store: +in-memory data repository for holding samples that have been read in + +io_data_buffer: +Holds sample being fetched or the future of it. + +data packer: +copies data fields from conduit nodes and maps them to Hydrogen +matrices. 
Specific to a data set + +Data Set: + +Composed of: + - data reader + - data stream + - sample list + - data packer diff --git a/include/lbann/data_readers/data_reader.hpp b/include/lbann/data_readers/data_reader.hpp index 387064516d1..9e4690035f8 100644 --- a/include/lbann/data_readers/data_reader.hpp +++ b/include/lbann/data_readers/data_reader.hpp @@ -80,35 +80,15 @@ class generic_data_reader /** * ctor */ - generic_data_reader(bool shuffle = true) + generic_data_reader() : m_verbose(global_argument_parser().get(LBANN_OPTION_VERBOSE)), m_data_store(nullptr), m_comm(nullptr), - m_mini_batch_size(0), - m_current_pos(0), - m_stride_to_next_mini_batch(0), - m_base_offset(0), - m_model_offset(0), - m_sample_stride(1), - m_iteration_stride(1), - m_last_mini_batch_size(0), - m_stride_to_last_mini_batch(0), - m_reset_mini_batch_index(0), - m_loaded_mini_batch_idx(0), - m_current_mini_batch_idx(0), - m_num_iterations_per_epoch(0), - m_global_mini_batch_size(0), - m_global_last_mini_batch_size(0), - m_world_master_mini_batch_adjustment(0), - m_num_parallel_readers(0), m_max_files_to_load(0), m_file_dir(""), m_data_sample_list(""), m_data_fn(""), m_label_fn(""), - m_shuffle(shuffle), - m_absolute_sample_count(0), - m_use_percent(1.0), m_gan_labelling(false), // default, not GAN m_gan_label_value( 0), // If GAN, default for fake label, discriminator model @@ -214,74 +194,6 @@ class generic_data_reader */ std::string get_label_filename() const; - /** - * If set to false, indices (data samples) are not shuffled - * default (in ctor) is true. - */ - void set_shuffle(bool b) { m_shuffle = b; } - - /** - * Returns true if data samples are shuffled. - */ - bool is_shuffled() const { return m_shuffle; } - - /** - * Set shuffled indices; primary use is for testing - * and reproducibility - */ - void set_shuffled_indices(const std::vector& indices) - { - m_shuffled_indices = indices; - } - - /** - * Returns the shuffled indices; primary use is for testing. 
- */ - const std::vector& get_shuffled_indices() const - { - return m_shuffled_indices; - } - - /** - * Read the first 'n' samples. If nonzero, this over-rides - * set_absolute_sample_count, set_use_percent. The intent - * is to use this for testing. A problem with set_absolute_sample_count - * and set_use_percent is that the entire data set is read in, then - * a subset is selected - */ - void set_first_n(int n); - - /** - * Sets the absolute number of data samples that will be used for training or - * testing. - */ - void set_absolute_sample_count(size_t s); - - /** - * Set the percentage of the data set to use for training and validation or - * testing. - * @param s The percentage used, in the range [0, 1]. - */ - void set_use_percent(double s); - - /** - * Sets the percentage of the dataset to be used for validation. - * @param m The execution mode. - * @param s The percentage used, in the range [0, 1]. - */ - virtual void set_execution_mode_split_percent(execution_mode m, double s); - - /** - * Set an idenifier for the dataset. - * The role should be one of "train", "test", or "validate". - */ - virtual void set_role(std::string role); - - /** - * Get the role for this dataset. - */ - std::string get_role() const { return m_role; } - /** * Load the dataset. * Each data reader implementation should implement this to initialize its @@ -291,13 +203,9 @@ class generic_data_reader virtual void load() = 0; /** - * Prepare to start processing an epoch of data. - * If shuffle is true, then shuffle the indices of the data set - * If the base offset is not specified set it to 0 - * If the stride is not specified set it to batch size + * Give the data reader an observing pointer to the I/O thread pool. 
*/ - virtual void setup(int num_io_threads, - observer_ptr io_thread_pool); + virtual void setup(observer_ptr io_thread_pool); /** Return this data_reader's type */ virtual std::string get_type() const = 0; @@ -354,13 +262,6 @@ class generic_data_reader void start_data_store_mini_batch_exchange(); void finish_data_store_mini_batch_exchange(); - /** - * During the network's update phase, the data reader will - * advanced the current position pointer. If the pointer wraps - * around, then reshuffle the data indicies. - */ - virtual bool update(bool is_active_reader); - /** * This is called at the end of update; it permits data readers to * perform actions that are specific to their data sets, for example, @@ -396,183 +297,6 @@ class generic_data_reader return {}; } - /// True if the data reader's current position is valid. - virtual bool position_valid() const - { - return (m_current_pos < get_num_data()); - } - /// True if the data reader's current position is not valid but within # ranks - /// per model of the end of the data set (e.g. it is a rank with no valid data - /// on the last iteration) - virtual bool position_is_overrun() const - { - int end_pos = (int)m_shuffled_indices.size(); - return (m_current_pos >= end_pos && - (m_current_pos - end_pos) < m_comm->get_procs_per_trainer()); - } - /// True if the data reader is at the start of an epoch. - bool at_new_epoch() const - { - /// Note that data readers can start at a non-zero index if there - /// are parallel data readers in a model - return ((m_loaded_mini_batch_idx == m_reset_mini_batch_index) && - (m_current_mini_batch_idx == 0)); - } - /// Set the mini batch size - void set_mini_batch_size(const int s); - /// Get the mini batch size - int get_mini_batch_size() const { return m_mini_batch_size; } - /// Get the loaded mini-batch size - int get_loaded_mini_batch_size() const; - /// Get the current mini-batch size. - int get_current_mini_batch_size() const; - /// Get the current global mini-batch size. 
- int get_current_global_mini_batch_size() const; - /// Get the current mini-batch size. - int get_current_world_master_mini_batch_adjustment(int model_rank) const; - /// Return the full mini_batch_size. - int get_mini_batch_max() const { return m_mini_batch_size; } - /// Set the mini batch size across all models (global) - void set_global_mini_batch_size(const int s) { m_global_mini_batch_size = s; } - /// Return the mini_batch_size across all models (global) - int get_global_mini_batch_size() const { return m_global_mini_batch_size; } - /// Set the mini batch stride - void set_stride_to_next_mini_batch(const int s) - { - m_stride_to_next_mini_batch = s; - } - /// Return the mini batch stride. - int get_stride_to_next_mini_batch() const - { - return m_stride_to_next_mini_batch; - } - /// Set the sample stride - void set_sample_stride(const int s) { m_sample_stride = s; } - /// Return the sample stride. - int get_sample_stride() const { return m_sample_stride; } - /// Set the iteration stride - void set_iteration_stride(const int s) { m_iteration_stride = s; } - /// Return the iteration stride. - int get_iteration_stride() const { return m_iteration_stride; } - /// Return the base offset. - virtual void set_base_offset(const int s) { m_base_offset = s; } - /// Return the base offset. - int get_base_offset() const { return m_base_offset; } - /// Set the model offset - void set_model_offset(const int s) { m_model_offset = s; } - /// Return the model offset. 
- int get_model_offset() const { return m_model_offset; } - /// Set the last mini batch size - void set_last_mini_batch_size(const int s) { m_last_mini_batch_size = s; } - /// Return the last mini batch size - int get_last_mini_batch_size() const { return m_last_mini_batch_size; } - /// Set the last mini batch size across all models (global) - void set_global_last_mini_batch_size(const int s) - { - m_global_last_mini_batch_size = s; - } - /// Return the last mini batch size across all models (global) - int get_global_last_mini_batch_size() const - { - return m_global_last_mini_batch_size; - } - /// Set the world master mini batch adjustment (global) - void set_world_master_mini_batch_adjustment(const int s) - { - m_world_master_mini_batch_adjustment = s; - } - /// Return the world master mini batch adjustment (global) - int get_world_master_mini_batch_adjustment() const - { - return m_world_master_mini_batch_adjustment; - } - /// Set the last mini batch stride - void set_stride_to_last_mini_batch(const int s) - { - m_stride_to_last_mini_batch = s; - } - /// Return the last mini batch stride - int get_stride_to_last_mini_batch() const - { - return m_stride_to_last_mini_batch; - } - /// Set the number of parallel readers per model - void set_num_parallel_readers(const int s) { m_num_parallel_readers = s; } - /// Return the number of parallel readers per model - int get_num_parallel_readers() const { return m_num_parallel_readers; } - /// Set the starting mini-batch index for the epoch - virtual void set_reset_mini_batch_index(const int s) - { - m_reset_mini_batch_index = s; - } - /// Return the starting mini-batch index for the epoch - int get_reset_mini_batch_index() const { return m_reset_mini_batch_index; } - /// Return the current mini-batch index for the epoch - int get_loaded_mini_batch_index() const { return m_loaded_mini_batch_idx; } - /// Return the current mini-batch index for the epoch - int get_current_mini_batch_index() const { return 
m_current_mini_batch_idx; } - /// Set the current position based on the base and model offsets - void set_initial_position() - { - m_current_pos = m_base_offset + m_model_offset; - m_loaded_mini_batch_idx = m_reset_mini_batch_index; - m_current_mini_batch_idx = 0; - } - /// Get the current position in the data reader. - int get_position() const { return m_current_pos; } - /// Get the next position in the data reader. - int get_next_position() const; - /// Get a pointer to the start of the shuffled indices. - int* get_indices() { return &m_shuffled_indices[0]; } - /// Get the number of samples in this dataset. - virtual int get_num_data() const { return (int)m_shuffled_indices.size(); } - /// Get the number of unused samples in this dataset. - int get_num_unused_data(execution_mode m) const; - - /// Get a pointer to the start of the unused sample indices. - int* get_unused_data(execution_mode m); - - const std::vector& get_unused_indices(execution_mode m); - - /// Set the number of iterations in each epoch. - void set_num_iterations_per_epoch(int num_iterations_per_epoch) - { - m_num_iterations_per_epoch = - num_iterations_per_epoch; /// @todo BVE FIXME merge this with alternate - /// approach - } - /// Get the number of iterations in each epoch. - int get_num_iterations_per_epoch() const - { - return m_num_iterations_per_epoch; /// @todo BVE FIXME merge this with - /// alternate approach - } - - /// Return the index of the current iteration step in the epoch (also the - /// mini-batch index) - int get_current_step_in_epoch() const { return m_current_mini_batch_idx; } - - /** - * Optionally resizes the shuffled indices based on the data reader - * prototext settings: absolute_sample_count, percent_of_data_to_use. 
- * (dah - this was formerly part of select_subset_of_data) - */ - void resize_shuffled_indices(); - - /** - * Select the appropriate subset of data for the additional - * execution modes such as validation or tournament set based on - * the data reader prototext setting: validation_percent or - * tournament_percent - */ - void select_subset_of_data(); - - /** - * Replaced the shuffled index set with the unused index set, empying the - * unused set. - */ - virtual void use_unused_index_set(execution_mode m); - /// Does the data reader have a unique sample list per model virtual bool has_list_per_model() const { return false; } /// Does the data reader have a unique sample list per trainer @@ -769,65 +493,13 @@ class generic_data_reader */ virtual void postprocess_data_source(int tid){}; - /// Shuffle indices (uses the data_seq_generator) - virtual void shuffle_indices(); - /// Shuffle indices and profide a random number generator - virtual void shuffle_indices(rng_gen& gen); - public: - int m_mini_batch_size; - int m_current_pos; - /// Batch Stride is typically batch_size, but may be a multiple of batch size - /// if there are multiple readers - int m_stride_to_next_mini_batch; - /// If there are multiple instances of the reader, - /// then it may not reset to zero - int m_base_offset; - /// If there are multiple models with multiple instances of the reader, - /// each model's set of readers may not reset to zero - /// Provide a set of size, strides, and thresholds to handle the last mini - /// batch of a dataset - int m_model_offset; - /// Sample stride is used when a mini-batch is finely interleaved across a - /// DATA_PARALELL distribution. 
- int m_sample_stride; - /// Stride used by parallel data readers within the model - int m_iteration_stride; - - std::vector m_shuffled_indices; - /// Record of the indicies that are not being used for training - unused_index_map_t m_unused_indices; - - int m_last_mini_batch_size; - int m_stride_to_last_mini_batch; - /// The index at which this data reader starts its epoch - int m_reset_mini_batch_index; - /// The index of the current mini-batch that has been loaded - int m_loaded_mini_batch_idx; - /// The index of the current mini-batch that is being processed - /// (train/test/validate) - int m_current_mini_batch_idx; - int - m_num_iterations_per_epoch; /// How many iterations all readers will execute - - int m_global_mini_batch_size; - int m_global_last_mini_batch_size; - int m_world_master_mini_batch_adjustment; - - int m_num_parallel_readers; /// How many parallel readers are being used - size_t m_max_files_to_load; std::string m_file_dir; std::string m_local_file_dir; std::string m_data_sample_list; std::string m_data_fn; std::string m_label_fn; - bool m_shuffle; - size_t m_absolute_sample_count; - std::map m_execution_mode_split_percentage; - double m_use_percent; - int m_first_n; - std::string m_role; /** @brief Print the return values from various get_X methods to file * diff --git a/include/lbann/data_streams/data_stream.hpp b/include/lbann/data_streams/data_stream.hpp new file mode 100644 index 00000000000..d42d5e0fc1f --- /dev/null +++ b/include/lbann/data_streams/data_stream.hpp @@ -0,0 +1,412 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright (c) 2014-2023, Lawrence Livermore National Security, LLC. +// Produced at the Lawrence Livermore National Laboratory. +// Written by the LBANN Research Team (B. Van Essen, et al.) listed in +// the CONTRIBUTORS file. +// +// LLNL-CODE-697807. +// All rights reserved. +// +// This file is part of LBANN: Livermore Big Artificial Neural Network +// Toolkit. 
For details, see http://software.llnl.gov/LBANN or +// https://github.com/LLNL/LBANN. +// +// Licensed under the Apache License, Version 2.0 (the "Licensee"); you +// may not use this file except in compliance with the License. You may +// obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the license. +//////////////////////////////////////////////////////////////////////////////// + +#ifndef LBANN_DATA_STREAM_HPP +#define LBANN_DATA_STREAM_HPP + +#include "lbann/base.hpp" +#include "lbann/utils/random_number_generators.hpp" +#include "lbann/utils/serialize.hpp" + +// #include +// #include +#include +#include +// #include +#include +#include + +namespace lbann { + +// Forward declarations +class persist; + +class data_stream +{ +public: + using unused_index_map_t = std::map>; + + data_stream(bool shuffle = true) + : m_mini_batch_size(0), + m_current_pos(0), + m_stride_to_next_mini_batch(0), + m_base_offset(0), + m_model_offset(0), + m_sample_stride(1), + m_iteration_stride(1), + m_last_mini_batch_size(0), + m_stride_to_last_mini_batch(0), + m_reset_mini_batch_index(0), + m_loaded_mini_batch_idx(0), + m_current_mini_batch_idx(0), + m_num_iterations_per_epoch(0), + m_global_mini_batch_size(0), + m_global_last_mini_batch_size(0), + m_world_master_mini_batch_adjustment(0), + m_num_parallel_readers(0), + m_shuffle(shuffle), + m_absolute_sample_count(0), + m_use_percent(1.0) + { + } + + /** Archive for checkpoint and restart */ + template + void serialize(Archive& ar); + + /** + * Prepare to start processing an epoch of data. 
+ * If shuffle is true, then shuffle the indices of the data set + * If the base offset is not specified set it to 0 + * If the stride is not specified set it to batch size + */ + virtual void setup(int num_io_threads); + + /** + * If set to false, indices (data samples) are not shuffled + * default (in ctor) is true. + */ + void set_shuffle(bool b) { m_shuffle = b; } + + /** + * Returns true if data samples are shuffled. + */ + bool is_shuffled() const { return m_shuffle; } + + /** + * Set shuffled indices; primary use is for testing + * and reproducibility + */ + void set_shuffled_indices(const std::vector& indices) + { + m_shuffled_indices = indices; + } + + /** + * Returns the shuffled indices; primary use is for testing. + */ + const std::vector& get_shuffled_indices() const + { + return m_shuffled_indices; + } + + /** + * During the network's update phase, the data reader will + * advanced the current position pointer. If the pointer wraps + * around, then reshuffle the data indicies. + */ + virtual bool update(bool is_active_reader); + + /** + * Read the first 'n' samples. If nonzero, this over-rides + * set_absolute_sample_count, set_use_percent. The intent + * is to use this for testing. A problem with set_absolute_sample_count + * and set_use_percent is that the entire data set is read in, then + * a subset is selected + */ + void set_first_n(int n); + + /** + * Sets the absolute number of data samples that will be used for training or + * testing. + */ + void set_absolute_sample_count(size_t s); + + /** + * Set the percentage of the data set to use for training and validation or + * testing. + * @param s The percentage used, in the range [0, 1]. + */ + void set_use_percent(double s); + + /** + * Sets the percentage of the dataset to be used for validation. + * @param m The execution mode. + * @param s The percentage used, in the range [0, 1]. 
+ */ + virtual void set_execution_mode_split_percent(execution_mode m, double s); + + /** + * Set an identifier for the dataset. + * The role should be one of "train", "test", or "validate". + */ + virtual void set_role(std::string role); + + /** + * Get the role for this dataset. + */ + std::string get_role() const { return m_role; } + + /// True if the data reader's current position is valid. + virtual bool position_valid() const + { + return (m_current_pos < get_num_data()); + } + /// True if the data reader's current position is not valid but within + /// num_procs_per_trainer of the end of the data set (e.g. it is a rank with no + /// valid data on the last iteration) + virtual bool position_is_overrun(int num_procs_per_trainer) const + { + int end_pos = (int)m_shuffled_indices.size(); + return (m_current_pos >= end_pos && + (m_current_pos - end_pos) < num_procs_per_trainer /*m_comm->get_procs_per_trainer()*/); + } + /// True if the data reader is at the start of an epoch. + bool at_new_epoch() const + { + /// Note that data readers can start at a non-zero index if there + /// are parallel data readers in a model + return ((m_loaded_mini_batch_idx == m_reset_mini_batch_index) && + (m_current_mini_batch_idx == 0)); + } + /// Set the mini batch size + void set_mini_batch_size(const int s); + /// Get the mini batch size + int get_mini_batch_size() const { return m_mini_batch_size; } + /// Get the loaded mini-batch size + int get_loaded_mini_batch_size() const; + /// Get the current mini-batch size. + int get_current_mini_batch_size() const; + /// Get the current global mini-batch size. + int get_current_global_mini_batch_size() const; + /// Get the current world master mini-batch adjustment. + int get_current_world_master_mini_batch_adjustment(int model_rank) const; + /// Return the full mini_batch_size. 
+ int get_mini_batch_max() const { return m_mini_batch_size; } + /// Set the mini batch size across all models (global) + void set_global_mini_batch_size(const int s) { m_global_mini_batch_size = s; } + /// Return the mini_batch_size across all models (global) + int get_global_mini_batch_size() const { return m_global_mini_batch_size; } + /// Set the mini batch stride + void set_stride_to_next_mini_batch(const int s) + { + m_stride_to_next_mini_batch = s; + } + /// Return the mini batch stride. + int get_stride_to_next_mini_batch() const + { + return m_stride_to_next_mini_batch; + } + /// Set the sample stride + void set_sample_stride(const int s) { m_sample_stride = s; } + /// Return the sample stride. + int get_sample_stride() const { return m_sample_stride; } + /// Set the iteration stride + void set_iteration_stride(const int s) { m_iteration_stride = s; } + /// Return the iteration stride. + int get_iteration_stride() const { return m_iteration_stride; } + /// Return the base offset. + virtual void set_base_offset(const int s) { m_base_offset = s; } + /// Return the base offset. + int get_base_offset() const { return m_base_offset; } + /// Set the model offset + void set_model_offset(const int s) { m_model_offset = s; } + /// Return the model offset. 
+ int get_model_offset() const { return m_model_offset; } + /// Set the last mini batch size + void set_last_mini_batch_size(const int s) { m_last_mini_batch_size = s; } + /// Return the last mini batch size + int get_last_mini_batch_size() const { return m_last_mini_batch_size; } + /// Set the last mini batch size across all models (global) + void set_global_last_mini_batch_size(const int s) + { + m_global_last_mini_batch_size = s; + } + /// Return the last mini batch size across all models (global) + int get_global_last_mini_batch_size() const + { + return m_global_last_mini_batch_size; + } + /// Set the world master mini batch adjustment (global) + void set_world_master_mini_batch_adjustment(const int s) + { + m_world_master_mini_batch_adjustment = s; + } + /// Return the world master mini batch adjustment (global) + int get_world_master_mini_batch_adjustment() const + { + return m_world_master_mini_batch_adjustment; + } + /// Set the last mini batch stride + void set_stride_to_last_mini_batch(const int s) + { + m_stride_to_last_mini_batch = s; + } + /// Return the last mini batch stride + int get_stride_to_last_mini_batch() const + { + return m_stride_to_last_mini_batch; + } + /// Set the number of parallel readers per model + void set_num_parallel_readers(const int s) { m_num_parallel_readers = s; } + /// Return the number of parallel readers per model + int get_num_parallel_readers() const { return m_num_parallel_readers; } + /// Set the starting mini-batch index for the epoch + virtual void set_reset_mini_batch_index(const int s) + { + m_reset_mini_batch_index = s; + } + /// Return the starting mini-batch index for the epoch + int get_reset_mini_batch_index() const { return m_reset_mini_batch_index; } + /// Return the current mini-batch index for the epoch + int get_loaded_mini_batch_index() const { return m_loaded_mini_batch_idx; } + /// Return the current mini-batch index for the epoch + int get_current_mini_batch_index() const { return 
m_current_mini_batch_idx; } + /// Set the current position based on the base and model offsets + void set_initial_position() + { + m_current_pos = m_base_offset + m_model_offset; + m_loaded_mini_batch_idx = m_reset_mini_batch_index; + m_current_mini_batch_idx = 0; + } + /// Get the current position in the data reader. + int get_position() const { return m_current_pos; } + /// Get the next position in the data reader. + int get_next_position() const; + /// Get a pointer to the start of the shuffled indices. + int* get_indices() { return &m_shuffled_indices[0]; } + /// Get the number of samples in this dataset. + virtual int get_num_data() const { return (int)m_shuffled_indices.size(); } + /// Get the number of unused samples in this dataset. + int get_num_unused_data(execution_mode m) const; + + /// Get a pointer to the start of the unused sample indices. + int* get_unused_data(execution_mode m); + + const std::vector& get_unused_indices(execution_mode m); + + /// Set the number of iterations in each epoch. + void set_num_iterations_per_epoch(int num_iterations_per_epoch) + { + m_num_iterations_per_epoch = + num_iterations_per_epoch; /// @todo BVE FIXME merge this with alternate + /// approach + } + /// Get the number of iterations in each epoch. + int get_num_iterations_per_epoch() const + { + return m_num_iterations_per_epoch; /// @todo BVE FIXME merge this with + /// alternate approach + } + + /// Return the index of the current iteration step in the epoch (also the + /// mini-batch index) + int get_current_step_in_epoch() const { return m_current_mini_batch_idx; } + + /** + * Optionally resizes the shuffled indices based on the data reader + * prototext settings: absolute_sample_count, percent_of_data_to_use. 
+ * (dah - this was formerly part of select_subset_of_data) + */ + void resize_shuffled_indices(); + + /** + * Select the appropriate subset of data for the additional + * execution modes such as validation or tournament set based on + * the data reader prototext setting: validation_percent or + * tournament_percent + */ + void select_subset_of_data(); + + /** + * Replaced the shuffled index set with the unused index set, empying the + * unused set. + */ + virtual void use_unused_index_set(execution_mode m); + + /** \brief Given directory to store checkpoint files, write state to file and + * add to number of bytes written */ + bool save_to_checkpoint_shared(persist& p, execution_mode mode); + + /** \brief Given directory to store checkpoint files, read state from file and + * add to number of bytes read */ + bool load_from_checkpoint_shared(persist& p, execution_mode mode); + + bool save_to_checkpoint_distributed(persist& p, execution_mode mode); + + /** \brief Given directory to store checkpoint files, read state from file and + * add to number of bytes read */ + bool load_from_checkpoint_distributed(persist& p, execution_mode mode); + + /// Shuffle indices (uses the data_seq_generator) + virtual void shuffle_indices(); + /// Shuffle indices and profide a random number generator + virtual void shuffle_indices(rng_gen& gen); + +public: + int m_mini_batch_size; + int m_current_pos; + /// Batch Stride is typically batch_size, but may be a multiple of batch size + /// if there are multiple readers + int m_stride_to_next_mini_batch; + /// If there are multiple instances of the reader, + /// then it may not reset to zero + int m_base_offset; + /// If there are multiple models with multiple instances of the reader, + /// each model's set of readers may not reset to zero + /// Provide a set of size, strides, and thresholds to handle the last mini + /// batch of a dataset + int m_model_offset; + /// Sample stride is used when a mini-batch is finely interleaved across a + 
/// DATA_PARALLEL distribution.
+  int m_sample_stride;
+  /// Stride used by parallel data readers within the model
+  int m_iteration_stride;
+
+  std::string m_role;
+
+  /// Sample indices in visiting order (permuted by shuffle_indices() when
+  /// m_shuffle is set)
+  std::vector<int> m_shuffled_indices;
+  /// Record of the indices that are not being used for training
+  unused_index_map_t m_unused_indices;
+
+  int m_last_mini_batch_size;
+  int m_stride_to_last_mini_batch;
+  /// The index at which this data reader starts its epoch
+  int m_reset_mini_batch_index;
+  /// The index of the current mini-batch that has been loaded
+  int m_loaded_mini_batch_idx;
+  /// The index of the current mini-batch that is being processed
+  /// (train/test/validate)
+  int m_current_mini_batch_idx;
+  /// How many iterations all readers will execute
+  int m_num_iterations_per_epoch;
+
+  int m_global_mini_batch_size;
+  int m_global_last_mini_batch_size;
+  int m_world_master_mini_batch_adjustment;
+
+  /// How many parallel readers are being used
+  int m_num_parallel_readers;
+
+  bool m_shuffle;
+  size_t m_absolute_sample_count;
+  /// Fraction of the data set split off for each execution mode; the setter
+  /// only accepts values in [0, 1]
+  std::map<execution_mode, double> m_execution_mode_split_percentage;
+  double m_use_percent;
+  int m_first_n;
+};
+
+} // namespace lbann
+
+#endif // LBANN_DATA_STREAM_HPP
diff --git a/scripts/build_lbann.sh b/scripts/build_lbann.sh
index a1eceeda8aa..b50a854275a 100755
--- a/scripts/build_lbann.sh
+++ b/scripts/build_lbann.sh
@@ -637,15 +637,15 @@ if [[ -n "${USER_MIRROR:-}" ]]; then
     MIRRORS="${MIRRORS:-} ${USER_MIRROR}"
 fi
 
-if [[ -n "${INSTALL_DEPS:-}" && -z "${SKIP_MIRRORS:-}" ]]; then
-    CMD="spack mirror add binary_mirror https://binaries.spack.io/releases/v0.18"
-    echo ${CMD} | tee -a ${LOG}
-    [[ -z "${DRY_RUN:-}" ]] && { ${CMD} || exit_on_failure "${CMD}"; }
-    # Tell Spack to trust the keys in the build cache
-    CMD="spack buildcache keys --install --trust"
-    echo ${CMD} | tee -a ${LOG}
-    [[ -z "${DRY_RUN:-}" ]] && { ${CMD} || exit_on_failure "${CMD}"; }
-fi
+# if [[ -n "${INSTALL_DEPS:-}" && -z "${SKIP_MIRRORS:-}" ]]; then
+#     CMD="spack mirror add binary_mirror
https://binaries.spack.io/releases/v0.18" +# echo ${CMD} | tee -a ${LOG} +# [[ -z "${DRY_RUN:-}" ]] && { ${CMD} || exit_on_failure "${CMD}"; } +# # Tell Spack to trust the keys in the build cache +# CMD="spack buildcache keys --install --trust" +# echo ${CMD} | tee -a ${LOG} +# [[ -z "${DRY_RUN:-}" ]] && { ${CMD} || exit_on_failure "${CMD}"; } +# fi if [[ -n "${INSTALL_DEPS:-}" && -n "${MIRRORS:-}" ]]; then i=0 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 29e171a76e7..8465c55f725 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -34,6 +34,7 @@ set_full_path(THIS_DIR_SOURCES add_subdirectory(callbacks) add_subdirectory(data_coordinator) add_subdirectory(data_readers) +add_subdirectory(data_streams) add_subdirectory(data_store) add_subdirectory(io) add_subdirectory(layers) diff --git a/src/data_readers/data_reader.cpp b/src/data_readers/data_reader.cpp index 089494e68b2..0bc9acd6e07 100644 --- a/src/data_readers/data_reader.cpp +++ b/src/data_readers/data_reader.cpp @@ -59,42 +59,11 @@ generic_data_reader::~generic_data_reader() template void generic_data_reader::serialize(Archive& ar) { - ar(CEREAL_NVP(m_current_mini_batch_idx), - CEREAL_NVP(m_current_pos), - CEREAL_NVP(m_shuffled_indices), - CEREAL_NVP(m_supported_input_types)); + ar(CEREAL_NVP(m_supported_input_types)); } -void generic_data_reader::shuffle_indices() +void generic_data_reader::setup(observer_ptr io_thread_pool) { - shuffle_indices(get_data_seq_generator()); -} - -void generic_data_reader::shuffle_indices(rng_gen& gen) -{ - // Shuffle the data - if (m_shuffle) { - std::shuffle(m_shuffled_indices.begin(), m_shuffled_indices.end(), gen); - } -} - -void generic_data_reader::setup(int num_io_threads, - observer_ptr io_thread_pool) -{ - m_base_offset = 0; - m_sample_stride = 1; - m_stride_to_next_mini_batch = 0; - m_stride_to_last_mini_batch = 0; - m_current_mini_batch_idx = 0; - m_num_iterations_per_epoch = 0; - m_global_mini_batch_size = 0; - m_global_last_mini_batch_size = 0; - 
m_world_master_mini_batch_adjustment = 0; - - set_initial_position(); - - shuffle_indices(); - m_io_thread_pool = io_thread_pool; } @@ -424,50 +393,6 @@ bool lbann::generic_data_reader::fetch_data_block_conduit( return true; } -bool generic_data_reader::update(bool is_active_reader) -{ - bool reader_not_done = true; // BVE The sense of this should be fixed - m_current_mini_batch_idx++; - - if (is_active_reader) { - m_current_pos = get_next_position(); - m_loaded_mini_batch_idx += m_iteration_stride; - } - if (m_loaded_mini_batch_idx >= m_num_iterations_per_epoch) { - reader_not_done = false; - } - if ((size_t)m_current_pos >= m_shuffled_indices.size()) { - reader_not_done = false; - } - if (m_current_mini_batch_idx == m_num_iterations_per_epoch) { - // for working with 1B jag samples, we may not process all the data - if ((m_comm->get_rank_in_trainer() < m_num_parallel_readers) && - (m_current_pos < (int)m_shuffled_indices.size())) { - throw lbann_exception( - std::string{} + __FILE__ + " " + std::to_string(__LINE__) + - " :: generic data reader update error: the epoch is complete," + - " but not all of the data has been used -- current pos = " + - std::to_string(m_current_pos) + " and there are " + - std::to_string(m_shuffled_indices.size()) + " indices" + - " : iteration=" + std::to_string(m_current_mini_batch_idx) + "C [" + - std::to_string(m_loaded_mini_batch_idx) + "L] of " + - std::to_string(m_num_iterations_per_epoch) + "+" + - std::to_string(m_iteration_stride) + " : " + - " index stride=" + std::to_string(m_stride_to_next_mini_batch) + "/" + - std::to_string(m_stride_to_last_mini_batch)); - } - - shuffle_indices(); - if (priming_data_store()) { - m_data_store->set_shuffled_indices(&m_shuffled_indices); - } - - set_initial_position(); - } - - return reader_not_done; -} - int generic_data_reader::get_linearized_size( data_field_type const& data_field) const { @@ -489,221 +414,6 @@ int generic_data_reader::get_linearized_size( return 0; } -int 
generic_data_reader::get_loaded_mini_batch_size() const -{ - if (m_loaded_mini_batch_idx >= (m_num_iterations_per_epoch - 1)) { - return m_last_mini_batch_size; - } - else { - return m_mini_batch_size; - } -} - -int generic_data_reader::get_current_mini_batch_size() const -{ - if (m_current_mini_batch_idx == (m_num_iterations_per_epoch - 1)) { - return m_last_mini_batch_size + m_world_master_mini_batch_adjustment; - } - else { - return m_mini_batch_size; - } -} - -int generic_data_reader::get_current_global_mini_batch_size() const -{ - if (m_current_mini_batch_idx == (m_num_iterations_per_epoch - 1)) { - return m_global_last_mini_batch_size; - } - else { - return m_global_mini_batch_size; - } -} - -/// Returns the current adjustment to the mini-batch size based on if -/// the world master (model 0) has any extra samples -/// Note that any rank in model 0 does not need to add in this offset -/// since the model will already be aware of the extra samples -int generic_data_reader::get_current_world_master_mini_batch_adjustment( - int model_rank) const -{ - if (model_rank != 0 && - m_current_mini_batch_idx == (m_num_iterations_per_epoch - 1)) { - return m_world_master_mini_batch_adjustment; - } - else { - return 0; - } -} - -int generic_data_reader::get_next_position() const -{ - /// If the next mini-batch for this rank is going to be the last - /// mini-batch, take the proper (possibly reduced) step to - /// setup for the last mini-batch - if ((m_current_mini_batch_idx + m_iteration_stride - 1) == - (m_num_iterations_per_epoch - 1)) { - return m_current_pos + m_stride_to_last_mini_batch; - } - else { - return m_current_pos + m_stride_to_next_mini_batch; - } -} - -int generic_data_reader::get_num_unused_data(execution_mode m) const -{ - if (m_unused_indices.count(m)) { - return (int)m_unused_indices.at(m).size(); - } - else { - LBANN_ERROR("Invalid execution mode ", to_string(m), " for unused indices"); - } -} -/// Get a pointer to the start of the unused sample 
indices. -int* generic_data_reader::get_unused_data(execution_mode m) -{ - if (m_unused_indices.count(m)) { - return &(m_unused_indices[m][0]); - } - else { - LBANN_ERROR("Invalid execution mode ", to_string(m), " for unused indices"); - } -} -const std::vector& -generic_data_reader::get_unused_indices(execution_mode m) -{ - if (m_unused_indices.count(m)) { - return m_unused_indices.at(m); - } - else { - LBANN_ERROR("Invalid execution mode ", to_string(m), " for unused indices"); - } -} - -void generic_data_reader::error_check_counts() const -{ - size_t count = get_absolute_sample_count(); - double use_percent = get_use_percent(); - if (count == 1 and use_percent == 0.0) { - LBANN_ERROR("get_use_percent() and get_absolute_sample_count() are both " - "zero; exactly one must be zero"); - } - if (!(count == 0 or use_percent == 0.0)) { - LBANN_ERROR("get_use_percent() and get_absolute_sample_count() are both " - "non-zero; exactly one must be zero"); - } - if (count != 0) { - if (count > static_cast(get_num_data())) { - LBANN_ERROR("absolute_sample_count=" + std::to_string(count) + - " is > get_num_data=" + std::to_string(get_num_data())); - } - } -} - -size_t generic_data_reader::get_num_indices_to_use() const -{ - error_check_counts(); - // note: exactly one of the following is guaranteed to be non-zero - size_t count = get_absolute_sample_count(); - double use_percent = get_use_percent(); - - size_t r = 0.; - if (count) { - r = count; - } - else if (use_percent) { - r = use_percent * get_num_data(); - if (r == 0) { - LBANN_ERROR("get_num_indices_to_use() computed zero indices; probably: " - "percent_of_data_to_use is too small WRT num_data; " - "get_absolute_sample_count: ", - get_absolute_sample_count(), - " use_percent: ", - get_use_percent(), - " num data: ", - get_num_data(), - " for role: ", - get_role()); - } - } - else { - LBANN_ERROR("it's impossible to be here"); - } - - return r; -} - -void generic_data_reader::resize_shuffled_indices() -{ - size_t 
num_indices = get_num_indices_to_use(); - shuffle_indices(); - m_shuffled_indices.resize(num_indices); -} - -void generic_data_reader::select_subset_of_data() -{ - // Calculate the total number of samples for subsets - double total_split_percent = 0.; - for (auto m : execution_mode_iterator()) { - total_split_percent += get_execution_mode_split_percent(m); - } - long total_num_data = get_num_data(); - long total_unused = total_split_percent * total_num_data; - long total_used = total_num_data - total_unused; - auto starting_unused_offset = m_shuffled_indices.begin() + total_used; - for (auto m : execution_mode_iterator()) { - double split_percent = get_execution_mode_split_percent(m); - - if (split_percent == 0.) { - continue; - } - - long split = split_percent * total_num_data; - if (split == 0) { - LBANN_ERROR(to_string(m), - " % of ", - split_percent, - " was requested, but the number of validation indices was " - "computed as zero. Probably: % ", - to_string(m), - " requested is too small wrt num_indices (aka, num samples)"); - } - if (split > 0) { - if (starting_unused_offset + split > m_shuffled_indices.end()) { - LBANN_ERROR( - "Split range exceeds the maximun numbrer of shuffled indices"); - } - m_unused_indices[m] = std::vector(starting_unused_offset, - starting_unused_offset + split); - starting_unused_offset += split; - } - } - m_shuffled_indices.resize(total_used); - - if (!m_shuffle) { - std::sort(m_shuffled_indices.begin(), m_shuffled_indices.end()); - for (auto m : execution_mode_iterator()) { - if (m_unused_indices.count(m)) { - std::sort(m_unused_indices[m].begin(), m_unused_indices[m].end()); - } - } - } -} - -void generic_data_reader::use_unused_index_set(execution_mode m) -{ - if (m_unused_indices.count(m) == 0) { - LBANN_ERROR("Invalid execution mode ", to_string(m), " for unused indices"); - } - - m_shuffled_indices.swap(m_unused_indices[m]); - if (m_data_store != nullptr) { - /// Update the data store's pointer to the shuffled indices - 
m_data_store->set_shuffled_indices(&m_shuffled_indices); - } - m_unused_indices[m].clear(); - std::vector().swap( - m_unused_indices[m]); // Trick to force memory reallocation -} bool generic_data_reader::save_to_checkpoint_shared(persist& p, execution_mode mode) @@ -863,53 +573,6 @@ std::string generic_data_reader::get_label_filename() const return m_label_fn; } -void generic_data_reader::set_first_n(int n) { m_first_n = n; } - -void generic_data_reader::set_absolute_sample_count(size_t s) -{ - m_absolute_sample_count = s; -} - -size_t generic_data_reader::get_absolute_sample_count() const -{ - return m_absolute_sample_count; -} - -void generic_data_reader::set_execution_mode_split_percent(execution_mode m, - double s) -{ - if (s < 0 or s > 1.0) { - throw lbann_exception( - std::string{} + __FILE__ + " " + std::to_string(__LINE__) + - " :: set_validation_percent() - must be: s >= 0, s <= 1.0; you passed: " + - std::to_string(s)); - } - m_execution_mode_split_percentage[m] = s; -} - -double -generic_data_reader::get_execution_mode_split_percent(execution_mode m) const -{ - if (m_execution_mode_split_percentage.count(m)) { - return m_execution_mode_split_percentage.at(m); - } - else { - return 0; - } -} - -void generic_data_reader::set_use_percent(double s) -{ - if (s < 0 or s > 1.0) { - throw lbann_exception( - std::string{} + __FILE__ + " " + std::to_string(__LINE__) + - " :: set_use_percent() - must be: s >= 0, s <= 1.0; you passed: " + - std::to_string(s)); - } - m_use_percent = s; -} - -double generic_data_reader::get_use_percent() const { return m_use_percent; } void generic_data_reader::instantiate_data_store() { @@ -1008,13 +671,6 @@ void generic_data_reader::set_data_store(data_store_conduit* g) m_data_store = g; } -void generic_data_reader::set_mini_batch_size(const int s) -{ - m_mini_batch_size = s; -} - -void generic_data_reader::set_role(std::string role) { m_role = role; } - void generic_data_reader::preload_data_store() { if 
(m_data_store->is_local_cache()) { diff --git a/src/data_streams/CMakeLists.txt b/src/data_streams/CMakeLists.txt new file mode 100644 index 00000000000..c6d4fa5b508 --- /dev/null +++ b/src/data_streams/CMakeLists.txt @@ -0,0 +1,32 @@ +################################################################################ +## Copyright (c) 2014-2023, Lawrence Livermore National Security, LLC. +## Produced at the Lawrence Livermore National Laboratory. +## Written by the LBANN Research Team (B. Van Essen, et al.) listed in +## the CONTRIBUTORS file. +## +## LLNL-CODE-697807. +## All rights reserved. +## +## This file is part of LBANN: Livermore Big Artificial Neural Network +## Toolkit. For details, see http://software.llnl.gov/LBANN or +## https://github.com/LLNL/LBANN. +## +## Licensed under the Apache License, Version 2.0 (the "Licensee"); you +## may not use this file except in compliance with the License. You may +## obtain a copy of the License at: +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +## implied. See the License for the specific language governing +## permissions and limitations under the license. +################################################################################ +# Add the source files for this directory +set_full_path(THIS_DIR_SOURCES + data_stream.cpp + ) + +# Propagate the files up the tree +set(SOURCES "${SOURCES}" "${THIS_DIR_SOURCES}" PARENT_SCOPE) diff --git a/src/data_streams/data_stream.cpp b/src/data_streams/data_stream.cpp new file mode 100644 index 00000000000..8cad9877602 --- /dev/null +++ b/src/data_streams/data_stream.cpp @@ -0,0 +1,456 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright (c) 2014-2023, Lawrence Livermore National Security, LLC. 
+// Produced at the Lawrence Livermore National Laboratory.
+// Written by the LBANN Research Team (B. Van Essen, et al.) listed in
+// the CONTRIBUTORS file.
+//
+// LLNL-CODE-697807.
+// All rights reserved.
+//
+// This file is part of LBANN: Livermore Big Artificial Neural Network
+// Toolkit. For details, see http://software.llnl.gov/LBANN or
+// https://github.com/LLNL/LBANN.
+//
+// Licensed under the Apache License, Version 2.0 (the "Licensee"); you
+// may not use this file except in compliance with the License. You may
+// obtain a copy of the License at:
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the license.
+////////////////////////////////////////////////////////////////////////////////
+
+#include "lbann/data_streams/data_stream.hpp"
+#include "lbann/io/persist_impl.hpp"
+
+namespace lbann {
+
+/// Archive the resumable state of the stream: the current mini-batch index,
+/// the current position and the shuffled index list.
+template <class Archive>
+void data_stream::serialize(Archive& ar)
+{
+  ar(CEREAL_NVP(m_current_mini_batch_idx),
+     CEREAL_NVP(m_current_pos),
+     CEREAL_NVP(m_shuffled_indices));
+}
+
+/// Shuffle using the generator returned by get_data_seq_generator().
+void data_stream::shuffle_indices()
+{
+  shuffle_indices(get_data_seq_generator());
+}
+
+/// Permute m_shuffled_indices with @c gen; a no-op unless m_shuffle is set.
+void data_stream::shuffle_indices(rng_gen& gen)
+{
+  // Shuffle the data
+  if (m_shuffle) {
+    std::shuffle(m_shuffled_indices.begin(), m_shuffled_indices.end(), gen);
+  }
+}
+
+// BVE Deprecate the num_io_threads here
+/// Reset the stride / mini-batch bookkeeping to its initial values, then set
+/// the initial position and shuffle. @c num_io_threads is not read.
+void data_stream::setup(int num_io_threads)
+{
+  m_base_offset = 0;
+  m_sample_stride = 1;
+  m_stride_to_next_mini_batch = 0;
+  m_stride_to_last_mini_batch = 0;
+  m_current_mini_batch_idx = 0;
+  m_num_iterations_per_epoch = 0;
+  m_global_mini_batch_size = 0;
+  m_global_last_mini_batch_size = 0;
+  m_world_master_mini_batch_adjustment = 0;
+
+  set_initial_position();
+
+
shuffle_indices();
+
+}
+
+/// Advance the stream by one mini-batch.
+///
+/// The current mini-batch index always advances; the position and the loaded
+/// mini-batch index only advance when @c is_active_reader is true. When the
+/// current index reaches m_num_iterations_per_epoch the indices are
+/// reshuffled and the position is reset for the next epoch.
+///
+/// @return false once the loaded index or the position has run past the end
+///         of the epoch, true otherwise.
+/// @throws lbann_exception if the epoch completed on a parallel-reader rank
+///         while unread indices remain.
+bool data_stream::update(bool is_active_reader)
+{
+  bool reader_not_done = true; // BVE The sense of this should be fixed
+  m_current_mini_batch_idx++;
+
+  if (is_active_reader) {
+    m_current_pos = get_next_position();
+    m_loaded_mini_batch_idx += m_iteration_stride;
+  }
+  if (m_loaded_mini_batch_idx >= m_num_iterations_per_epoch) {
+    reader_not_done = false;
+  }
+  if (static_cast<size_t>(m_current_pos) >= m_shuffled_indices.size()) {
+    reader_not_done = false;
+  }
+  if (m_current_mini_batch_idx == m_num_iterations_per_epoch) {
+    // for working with 1B jag samples, we may not process all the data
+    if ((m_comm->get_rank_in_trainer() < m_num_parallel_readers) &&
+        (m_current_pos < static_cast<int>(m_shuffled_indices.size()))) {
+      throw lbann_exception(
+        std::string{} + __FILE__ + " " + std::to_string(__LINE__) +
+        " :: generic data reader update error: the epoch is complete," +
+        " but not all of the data has been used -- current pos = " +
+        std::to_string(m_current_pos) + " and there are " +
+        std::to_string(m_shuffled_indices.size()) + " indices" +
+        " : iteration=" + std::to_string(m_current_mini_batch_idx) + "C [" +
+        std::to_string(m_loaded_mini_batch_idx) + "L] of " +
+        std::to_string(m_num_iterations_per_epoch) + "+" +
+        std::to_string(m_iteration_stride) + " : " +
+        " index stride=" + std::to_string(m_stride_to_next_mini_batch) + "/" +
+        std::to_string(m_stride_to_last_mini_batch));
+    }
+
+    shuffle_indices();
+    if (priming_data_store()) {
+      m_data_store->set_shuffled_indices(&m_shuffled_indices);
+    }
+
+    set_initial_position();
+  }
+
+  return reader_not_done;
+}
+
+/// Size of the mini-batch most recently loaded: the last-mini-batch size once
+/// the loaded index has reached the final iteration of the epoch.
+int data_stream::get_loaded_mini_batch_size() const
+{
+  if (m_loaded_mini_batch_idx >= (m_num_iterations_per_epoch - 1)) {
+    return m_last_mini_batch_size;
+  }
+  else {
+    return m_mini_batch_size;
+  }
+}
+
+/// Size of the mini-batch currently being processed; on the final iteration
+/// the world-master adjustment is added to the last-mini-batch size.
+int data_stream::get_current_mini_batch_size() const
+{
+  if (m_current_mini_batch_idx == (m_num_iterations_per_epoch - 1)) {
+    return m_last_mini_batch_size +
m_world_master_mini_batch_adjustment;
+  }
+  else {
+    return m_mini_batch_size;
+  }
+}
+
+/// Global size of the mini-batch currently being processed (the global
+/// last-mini-batch size on the final iteration of the epoch).
+int data_stream::get_current_global_mini_batch_size() const
+{
+  if (m_current_mini_batch_idx == (m_num_iterations_per_epoch - 1)) {
+    return m_global_last_mini_batch_size;
+  }
+  else {
+    return m_global_mini_batch_size;
+  }
+}
+
+/// Returns the current adjustment to the mini-batch size based on if
+/// the world master (model 0) has any extra samples
+/// Note that any rank in model 0 does not need to add in this offset
+/// since the model will already be aware of the extra samples
+int data_stream::get_current_world_master_mini_batch_adjustment(
+  int model_rank) const
+{
+  if (model_rank != 0 &&
+      m_current_mini_batch_idx == (m_num_iterations_per_epoch - 1)) {
+    return m_world_master_mini_batch_adjustment;
+  }
+  else {
+    return 0;
+  }
+}
+
+/// Position this rank moves to for its next mini-batch.
+int data_stream::get_next_position() const
+{
+  /// If the next mini-batch for this rank is going to be the last
+  /// mini-batch, take the proper (possibly reduced) step to
+  /// setup for the last mini-batch
+  if ((m_current_mini_batch_idx + m_iteration_stride - 1) ==
+      (m_num_iterations_per_epoch - 1)) {
+    return m_current_pos + m_stride_to_last_mini_batch;
+  }
+  else {
+    return m_current_pos + m_stride_to_next_mini_batch;
+  }
+}
+
+/// Number of unused indices recorded for execution mode @c m; it is an error
+/// if no entry exists for that mode.
+int data_stream::get_num_unused_data(execution_mode m) const
+{
+  // Single lookup instead of count() followed by at()
+  const auto entry = m_unused_indices.find(m);
+  if (entry == m_unused_indices.end()) {
+    LBANN_ERROR("Invalid execution mode ", to_string(m), " for unused indices");
+  }
+  return static_cast<int>(entry->second.size());
+}
+/// Get a pointer to the start of the unused sample indices.
+int* data_stream::get_unused_data(execution_mode m)
+{
+  if (m_unused_indices.count(m)) {
+    // data() is well-defined for an empty vector, whereas taking the address
+    // of element 0 is not
+    return m_unused_indices[m].data();
+  }
+  else {
+    LBANN_ERROR("Invalid execution mode ", to_string(m), " for unused indices");
+  }
+}
+/// Get the unused sample indices recorded for execution mode @c m; it is an
+/// error if no entry exists for that mode.
+const std::vector<int>&
+data_stream::get_unused_indices(execution_mode m)
+{
+  if (m_unused_indices.count(m)) {
+    return m_unused_indices.at(m);
+  }
+  else {
+    LBANN_ERROR("Invalid execution mode ", to_string(m), " for unused indices");
+  }
+}
+
+/// Validate the sample-selection settings: exactly one of
+/// absolute_sample_count and use_percent must be non-zero, and an absolute
+/// count may not exceed get_num_data().
+void data_stream::error_check_counts() const
+{
+  size_t count = get_absolute_sample_count();
+  double use_percent = get_use_percent();
+  // Neither selector was provided
+  if (count == 0 and use_percent == 0.0) {
+    LBANN_ERROR("get_use_percent() and get_absolute_sample_count() are both "
+                "zero; exactly one must be zero");
+  }
+  // Both selectors were provided
+  if (!(count == 0 or use_percent == 0.0)) {
+    LBANN_ERROR("get_use_percent() and get_absolute_sample_count() are both "
+                "non-zero; exactly one must be zero");
+  }
+  if (count != 0) {
+    if (count > static_cast<size_t>(get_num_data())) {
+      LBANN_ERROR("absolute_sample_count=" + std::to_string(count) +
+                  " is > get_num_data=" + std::to_string(get_num_data()));
+    }
+  }
+}
+
+/// Number of indices this stream should keep: the absolute sample count if
+/// one is set, otherwise use_percent * get_num_data() truncated to an
+/// integer. A result of zero is reported as an error.
+size_t data_stream::get_num_indices_to_use() const
+{
+  error_check_counts();
+  // note: exactly one of the following is guaranteed to be non-zero
+  size_t count = get_absolute_sample_count();
+  double use_percent = get_use_percent();
+
+  size_t r = 0;
+  if (count) {
+    r = count;
+  }
+  else if (use_percent) {
+    r = static_cast<size_t>(use_percent * get_num_data());
+    if (r == 0) {
+      LBANN_ERROR("get_num_indices_to_use() computed zero indices; probably: "
+                  "percent_of_data_to_use is too small WRT num_data; "
+                  "get_absolute_sample_count: ",
+                  get_absolute_sample_count(),
+                  " use_percent: ",
+                  get_use_percent(),
+                  " num data: ",
+                  get_num_data(),
+                  " for role: ",
+                  get_role());
+    }
+  }
+  else {
+    LBANN_ERROR("it's impossible to be here");
+  }
+
+  return r;
+}
+
+void data_stream::resize_shuffled_indices()
+{
+  size_t num_indices = get_num_indices_to_use();
+
shuffle_indices();
+  m_shuffled_indices.resize(num_indices);
+}
+
+/// Carve the per-execution-mode splits off the tail of m_shuffled_indices.
+///
+/// Each mode with a non-zero split percentage receives
+/// split_percent * get_num_data() consecutive indices, stored in
+/// m_unused_indices[mode]; the shuffled list is then resized to the indices
+/// that remain. When m_shuffle is false every resulting list is sorted.
+void data_stream::select_subset_of_data()
+{
+  // Calculate the total number of samples for subsets
+  double total_split_percent = 0.;
+  for (auto m : execution_mode_iterator()) {
+    total_split_percent += get_execution_mode_split_percent(m);
+  }
+  long total_num_data = get_num_data();
+  long total_unused = total_split_percent * total_num_data;
+  long total_used = total_num_data - total_unused;
+  // Track the start of the next split as an index rather than an iterator so
+  // the bounds check never forms an iterator outside [begin(), end()]
+  const long num_indices = static_cast<long>(m_shuffled_indices.size());
+  long unused_offset = total_used;
+  for (auto m : execution_mode_iterator()) {
+    double split_percent = get_execution_mode_split_percent(m);
+
+    if (split_percent == 0.) {
+      continue;
+    }
+
+    long split = split_percent * total_num_data;
+    if (split == 0) {
+      LBANN_ERROR(to_string(m),
+                  " % of ",
+                  split_percent,
+                  " was requested, but the number of validation indices was "
+                  "computed as zero. Probably: % ",
+                  to_string(m),
+                  " requested is too small wrt num_indices (aka, num samples)");
+    }
+    if (split > 0) {
+      if (unused_offset < 0 || unused_offset + split > num_indices) {
+        LBANN_ERROR(
+          "Split range exceeds the maximum number of shuffled indices");
+      }
+      const auto split_begin = m_shuffled_indices.begin() + unused_offset;
+      m_unused_indices[m] = std::vector<int>(split_begin, split_begin + split);
+      unused_offset += split;
+    }
+  }
+  m_shuffled_indices.resize(total_used);
+
+  if (!m_shuffle) {
+    std::sort(m_shuffled_indices.begin(), m_shuffled_indices.end());
+    for (auto m : execution_mode_iterator()) {
+      if (m_unused_indices.count(m)) {
+        std::sort(m_unused_indices[m].begin(), m_unused_indices[m].end());
+      }
+    }
+  }
+}
+
+/// Make the unused index set of mode @c m the active shuffled index set and
+/// leave the unused set empty; it is an error if the mode has no entry.
+void data_stream::use_unused_index_set(execution_mode m)
+{
+  if (m_unused_indices.count(m) == 0) {
+    LBANN_ERROR("Invalid execution mode ", to_string(m), " for unused indices");
+  }
+
+  m_shuffled_indices.swap(m_unused_indices[m]);
+  if (m_data_store != nullptr) {
+    /// Update the data store's pointer to the shuffled indices
+    m_data_store->set_shuffled_indices(&m_shuffled_indices);
+  }
+
m_unused_indices[m].clear();
+  // Swap with an empty temporary so the old storage is released as well
+  std::vector<int>().swap(m_unused_indices[m]);
+}
+
+/// Write this stream's cereal archive for @c mode; only the trainer master
+/// performs the write. Always returns true.
+bool data_stream::save_to_checkpoint_shared(persist& p,
+                                            execution_mode mode)
+{
+  if (get_comm()->am_trainer_master()) {
+    write_cereal_archive(*this,
+                         p,
+                         mode,
+#ifdef LBANN_HAS_CEREAL_XML_ARCHIVES
+                         "_dr.xml"
+#else  // defined LBANN_HAS_CEREAL_BINARY_ARCHIVES
+                         "_dr.bin"
+#endif // LBANN_HAS_CEREAL_XML_ARCHIVES
+    );
+  }
+  return true;
+}
+
+/// Load this stream's cereal archive for @c mode through the shared loader,
+/// then offset the restored position by this rank's index in the trainer.
+/// Always returns true.
+bool data_stream::load_from_checkpoint_shared(persist& p,
+                                              execution_mode mode)
+{
+  load_from_shared_cereal_archive(*this,
+                                  p,
+                                  mode,
+                                  *get_comm(),
+#ifdef LBANN_HAS_CEREAL_XML_ARCHIVES
+                                  "_dr.xml"
+#else  // defined LBANN_HAS_CEREAL_BINARY_ARCHIVES
+                                  "_dr.bin"
+#endif // LBANN_HAS_CEREAL_XML_ARCHIVES
+  );
+  // Adjust current position to deal with fact that it was just loaded to all
+  // ranks from rank 0 (differs by rank #)
+  m_current_pos += m_comm->get_rank_in_trainer();
+  return true;
+}
+
+/// Write this stream's cereal archive for @c mode from the calling rank (no
+/// trainer-master guard). Always returns true.
+bool data_stream::save_to_checkpoint_distributed(persist& p,
+                                                 execution_mode mode)
+{
+  write_cereal_archive(*this,
+                       p,
+                       mode,
+#ifdef LBANN_HAS_CEREAL_XML_ARCHIVES
+                       "_dr.xml"
+#else  // defined LBANN_HAS_CEREAL_BINARY_ARCHIVES
+                       "_dr.bin"
+#endif // LBANN_HAS_CEREAL_XML_ARCHIVES
+
+  );
+  return true;
+}
+
+/// Read this stream's cereal archive for @c mode on the calling rank. Always
+/// returns true.
+bool data_stream::load_from_checkpoint_distributed(persist& p,
+                                                   execution_mode mode)
+{
+  read_cereal_archive(*this,
+                      p,
+                      mode,
+#ifdef LBANN_HAS_CEREAL_XML_ARCHIVES
+                      "_dr.xml"
+#else  // defined LBANN_HAS_CEREAL_BINARY_ARCHIVES
+                      "_dr.bin"
+#endif // LBANN_HAS_CEREAL_XML_ARCHIVES
+  );
+  return true;
+}
+
+void data_stream::set_first_n(int n) { m_first_n = n; }
+
+void data_stream::set_absolute_sample_count(size_t s)
+{
+  m_absolute_sample_count = s;
+}
+
+size_t data_stream::get_absolute_sample_count() const
+{
+  return m_absolute_sample_count;
+}
+
+void data_stream::set_execution_mode_split_percent(execution_mode m,
+                                                   double s)
+{
+  if (s < 0 or s > 1.0) {
+    throw
lbann_exception(
+      std::string{} + __FILE__ + " " + std::to_string(__LINE__) +
+      " :: set_execution_mode_split_percent() - must be: s >= 0, s <= 1.0; you passed: " +
+      std::to_string(s));
+  }
+  m_execution_mode_split_percentage[m] = s;
+}
+
+/// Split fraction recorded for execution mode @c m, or 0 when none was set.
+double
+data_stream::get_execution_mode_split_percent(execution_mode m) const
+{
+  // Single lookup instead of count() followed by at()
+  const auto entry = m_execution_mode_split_percentage.find(m);
+  if (entry == m_execution_mode_split_percentage.end()) {
+    return 0;
+  }
+  return entry->second;
+}
+
+/// Set the fraction of the data set to use.
+/// @throws lbann_exception unless 0 <= s <= 1 (NaN is rejected as well).
+void data_stream::set_use_percent(double s)
+{
+  // Written as a negated in-range test so that NaN, for which every ordered
+  // comparison is false, fails the check
+  if (!(s >= 0 and s <= 1.0)) {
+    throw lbann_exception(
+      std::string{} + __FILE__ + " " + std::to_string(__LINE__) +
+      " :: set_use_percent() - must be: s >= 0, s <= 1.0; you passed: " +
+      std::to_string(s));
+  }
+  m_use_percent = s;
+}
+
+double data_stream::get_use_percent() const { return m_use_percent; }
+
+void data_stream::set_mini_batch_size(const int s)
+{
+  m_mini_batch_size = s;
+}
+
+void data_stream::set_role(std::string role) { m_role = role; }
+
+} // namespace lbann