Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPI Scaling fix #894

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,6 @@
[submodule "extern/netcdf-cxx4"]
path = extern/netcdf-cxx4/netcdf-cxx4
url = https://github.com/Unidata/netcdf-cxx4/
[submodule "extern/lstm/lstm"]
path = extern/lstm/lstm
url = https://github.com/CIROH-UA/lstm
27 changes: 27 additions & 0 deletions extern/lstm/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# LSTM Submodule

## About
This submodule is linked in from: https://github.com/CIROH-UA/lstm, which is a fork of https://github.com/NOAA-OWP/lstm. This directory follows the template for linking submodules from https://github.com/NOAA-OWP/ngen/edit/master/extern/cfe/.

#### Extra Outer Directory

Currently there are two directory layers beneath the top-level *extern/* directory. This was done so that certain things used by NGen (i.e., a *CMakeLists.txt* file for building shared library files) can be placed alongside, but not within, the submodule.

## Working with the Submodule

Some simple explanations of several command actions are included below. To better understand what these things are doing, consult the [Git Submodule documentation](https://git-scm.com/book/en/v2/Git-Tools-Submodules).

### Getting the Latest Changes

There are two steps to getting upstream submodule changes fully
1. fetching and locally checking out the changes from the remote
2. committing the new checkout revision for the submodule

To fetch and check out the latest revision (for the [currently used branch](#viewing-the-current-branch)):

git submodule update --init -- extern/lstm/lstm

To commit the current submodule checkout revision to the CIROH UA NGen repo:

git add extern/lstm/lstm
git commit
1 change: 1 addition & 0 deletions extern/lstm/lstm
Submodule lstm added at 44a16c
5 changes: 3 additions & 2 deletions include/forcing/AorcForcing.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ struct forcing_params
std::string provider;
time_t simulation_start_t;
time_t simulation_end_t;
bool enable_cache = true;
/*
Constructor for forcing_params
*/
forcing_params(std::string path, std::string provider, std::string start_time, std::string end_time):
path(path), provider(provider), start_time(start_time), end_time(end_time)
forcing_params(std::string path, std::string provider, std::string start_time, std::string end_time, bool enable_cache) :
path(path), provider(provider), start_time(start_time), end_time(end_time), enable_cache(enable_cache)
{
/// \todo converting to UTC can be tricky, especially if thread safety is a concern
/* https://stackoverflow.com/questions/530519/stdmktime-and-timezone-info */
Expand Down
5 changes: 3 additions & 2 deletions include/forcing/NetCDFPerFeatureDataProvider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ namespace data_access
* @param log_s An output log stream for messages from the underlying library. If a provider object for
* the given path already exists, this argument will be ignored.
*/
static std::shared_ptr<NetCDFPerFeatureDataProvider> get_shared_provider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s);
static std::shared_ptr<NetCDFPerFeatureDataProvider> get_shared_provider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s, bool enable_cache);

/**
* @brief Cleanup the shared providers cache, ensuring that the files get closed.
*/
static void cleanup_shared_providers();

NetCDFPerFeatureDataProvider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s);
NetCDFPerFeatureDataProvider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s, bool enable_cache);

// Default implementation defined in the .cpp file so that
// client code doesn't need to have the full definition of
Expand Down Expand Up @@ -135,6 +135,7 @@ namespace data_access
std::map<std::string,netCDF::NcVar> ncvar_cache;
std::map<std::string,std::string> units_cache;
boost::compute::detail::lru_cache<std::string, std::shared_ptr<std::vector<double>>> value_cache;
bool enable_cache;
size_t cache_slice_t_size = 1;
size_t cache_slice_c_size = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace realization {
}
#if NGEN_WITH_NETCDF
else if (forcing_config.provider == "NetCDF"){
fp = data_access::NetCDFPerFeatureDataProvider::get_shared_provider(forcing_config.path, forcing_config.simulation_start_t, forcing_config.simulation_end_t, output_stream);
fp = data_access::NetCDFPerFeatureDataProvider::get_shared_provider(forcing_config.path, forcing_config.simulation_start_t, forcing_config.simulation_end_t, output_stream, forcing_config.enable_cache);
}
#endif
else if (forcing_config.provider == "NullForcingProvider"){
Expand Down
80 changes: 77 additions & 3 deletions include/realizations/catchment/Formulation_Manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,59 @@ namespace realization {

//for case where there is no output_root in the realization file
return "./";

}

/**
* @brief Check if the formulation has catchment output writing enabled
*
* @code{.cpp}
* // Example config:
* // ...
* // "write_catchment_output": false
* // ...
* const auto manager = Formulation_Manger(CONFIG);
* manager.get_output_root();
* //> false
* @endcode
*
* @return bool
*/
bool write_catchment_output() const {
const auto write_catchment_output = this->tree.get_optional<std::string>("write_catchment_output");
if (write_catchment_output != boost::none && *write_catchment_output != "") {
// if any variation of "false" or "no" or 0 is found, return false
if (write_catchment_output->compare("false") == 0 || write_catchment_output->compare("no") == 0 || write_catchment_output->compare("0") == 0) {
return false;
}
}
return true;
}

/**
* @brief Check if the formulation uses remote partitioning for mpi partitions
*
* @code{.cpp}
* // Example config:
* // ...
* // "remotes_enabled": false
* // ...
* const auto manager = Formulation_Manger(CONFIG);
* manager.get_output_root();
* //> false
* @endcode
*
* @return bool
*/
bool remotes_enabled() const {
const auto remotes_enabled = this->tree.get_optional<std::string>("remotes_enabled");
if (remotes_enabled != boost::none && *remotes_enabled != "") {
// if any variation of "false" or "no" or 0 is found, return false
if (remotes_enabled->compare("false") == 0 || remotes_enabled->compare("no") == 0 || remotes_enabled->compare("0") == 0) {
return false;
}
}
return true;
}

/**
Expand Down Expand Up @@ -395,9 +448,27 @@ namespace realization {
}

forcing_params get_forcing_params(const geojson::PropertyMap &forcing_prop_map, std::string identifier, simulation_time_params &simulation_time_config) {
int rank = 0;
bool enable_cache = true;
#if NGEN_WITH_MPI
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
#endif

if (forcing_prop_map.count("enable_cache") != 0) {
enable_cache = forcing_prop_map.at("enable_cache").as_boolean();
}

std::string path = "";
if(forcing_prop_map.count("path") != 0){
path = forcing_prop_map.at("path").as_string();
int id_index = path.find("{{id}}");
int partition_id_index = path.find("{{partition_id}}");
if (id_index != std::string::npos) {
path = path.replace(id_index, sizeof("{{id}}") - 1, identifier);
}
if (partition_id_index != std::string::npos) {
path = path.replace(partition_id_index, sizeof("{{partition_id}}") - 1, std::to_string(rank));
}
}
std::string provider;
if(forcing_prop_map.count("provider") != 0){
Expand All @@ -408,7 +479,8 @@ namespace realization {
path,
provider,
simulation_time_config.start_time,
simulation_time_config.end_time
simulation_time_config.end_time,
enable_cache
);
}

Expand Down Expand Up @@ -497,7 +569,8 @@ namespace realization {
path + entry->d_name,
provider,
simulation_time_config.start_time,
simulation_time_config.end_time
simulation_time_config.end_time,
enable_cache
);
}
else if ( entry->d_type == DT_UNKNOWN )
Expand All @@ -516,7 +589,8 @@ namespace realization {
path + entry->d_name,
provider,
simulation_time_config.start_time,
simulation_time_config.end_time
simulation_time_config.end_time,
enable_cache
);
}
throw std::runtime_error("Forcing data is path "+path+entry->d_name+" is not a file");
Expand Down
107 changes: 66 additions & 41 deletions src/NGen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,10 +439,16 @@ int main(int argc, char *argv[]) {
for(const auto& id : features.nexuses()) {
#if NGEN_WITH_MPI
if (mpi_num_procs > 1) {
if (manager->remotes_enabled() == true) {
if (!features.is_remote_sender_nexus(id)) {
nexus_outfiles[id].open(manager->get_output_root() + id + "_output.csv", std::ios::trunc);
}
} else {
}
else {
nexus_outfiles[id].open(manager->get_output_root() + id + "_rank_" + std::to_string(mpi_rank) + "_output.csv", std::ios::trunc);
}
}
else {
nexus_outfiles[id].open(manager->get_output_root() + id + "_output.csv", std::ios::trunc);
}
#else
Expand Down Expand Up @@ -558,57 +564,76 @@ int main(int argc, char *argv[]) {
} //done time

#if NGEN_WITH_MPI
MPI_Barrier(MPI_COMM_WORLD);
#endif

if (mpi_rank == 0)
{
std::cout << "Finished " << manager->Simulation_Time_Object->get_total_output_times() << " timesteps." << std::endl;
}
MPI_Request barrier_request;
MPI_Ibarrier(MPI_COMM_WORLD, &barrier_request);

auto time_done_simulation = std::chrono::steady_clock::now();
std::chrono::duration<double> time_elapsed_simulation = time_done_simulation - time_done_init;
int flag = 0;
const int sleep_microseconds = 100000; // 100 millisecond sleep

#if NGEN_WITH_MPI
MPI_Barrier(MPI_COMM_WORLD);
#endif

#if NGEN_WITH_ROUTING
if (mpi_rank == 0)
{ // Run t-route from single process
if(manager->get_using_routing()) {
//Note: Currently, delta_time is set in the t-route yaml configuration file, and the
//number_of_timesteps is determined from the total number of nexus outputs in t-route.
//It is recommended to still pass these values to the routing_py_adapter object in
//case a future implmentation needs these two values from the ngen framework.
int number_of_timesteps = manager->Simulation_Time_Object->get_total_output_times();

int delta_time = manager->Simulation_Time_Object->get_output_interval_seconds();

router->route(number_of_timesteps, delta_time);
// Wait for all ranks to reach the barrier
while (!flag) {
MPI_Test(&barrier_request, &flag, MPI_STATUS_IGNORE);
if (!flag) {
usleep(sleep_microseconds);
}
}
#endif
if (mpi_rank == 0) {
std::cout << "Finished " << manager->Simulation_Time_Object->get_total_output_times() << " timesteps." << std::endl;

auto time_done_simulation = std::chrono::steady_clock::now();
std::chrono::duration<double> time_elapsed_simulation = time_done_simulation - time_done_init;


auto time_done_routing = std::chrono::steady_clock::now();
std::chrono::duration<double> time_elapsed_routing = time_done_routing - time_done_simulation;

if (mpi_rank == 0)
{
std::cout << "NGen top-level timings:"
<< "\n\tNGen::init: " << time_elapsed_init.count()
<< "\n\tNGen::simulation: " << time_elapsed_simulation.count()
#if NGEN_WITH_ROUTING
<< "\n\tNGen::routing: " << time_elapsed_routing.count()
if(manager->get_using_routing()) {
//Note: Currently, delta_time is set in the t-route yaml configuration file, and the
//number_of_timesteps is determined from the total number of nexus outputs in t-route.
//It is recommended to still pass these values to the routing_py_adapter object in
//case a future implmentation needs these two values from the ngen framework.
int number_of_timesteps = manager->Simulation_Time_Object->get_total_output_times();

int delta_time = manager->Simulation_Time_Object->get_output_interval_seconds();

router->route(number_of_timesteps, delta_time);
}
#endif
<< std::endl;
}

manager->finalize();

#if NGEN_WITH_MPI
auto time_done_routing = std::chrono::steady_clock::now();
std::chrono::duration<double> time_elapsed_routing = time_done_routing - time_done_simulation;

std::cout << "NGen top-level timings:"
<< "\n\tNGen::init: " << time_elapsed_init.count()
<< "\n\tNGen::simulation: " << time_elapsed_simulation.count()
#if NGEN_WITH_ROUTING
<< "\n\tNGen::routing: " << time_elapsed_routing.count()
#endif
<< std::endl;
#if NGEN_WITH_MPI
for (int i = 1; i < mpi_num_procs; ++i) {
MPI_Send(NULL, 0, MPI_INT, i, 0, MPI_COMM_WORLD);
}
}
else {
// Non-root processes
MPI_Request recv_request;
MPI_Irecv(NULL, 0, MPI_INT, 0, 0, MPI_COMM_WORLD, &recv_request);

int recv_flag = 0;
while (!recv_flag) {
MPI_Test(&recv_request, &recv_flag, MPI_STATUS_IGNORE);
if (!recv_flag) {
usleep(sleep_microseconds);
}
}
#endif
}

manager->finalize();
#if NGEN_WITH_MPI
MPI_Finalize();
#endif
#endif

return 0;
}
9 changes: 8 additions & 1 deletion src/core/HY_Features.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ HY_Features::HY_Features(network::Network network, std::shared_ptr<Formulation_M
{
//Find and prepare formulation
auto formulation = formulations->get_formulation(feat_id);
formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv");
if (formulations->write_catchment_output() == true)
{
formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv");
}
else
{
formulation->set_output_stream("/dev/null");
}
// TODO: add command line or config option to have this be omitted
//FIXME why isn't default param working here??? get_output_header_line() fails.
formulation->write_output("Time Step,""Time,"+formulation->get_output_header_line(",")+"\n");
Expand Down
9 changes: 8 additions & 1 deletion src/core/HY_Features_MPI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ HY_Features_MPI::HY_Features_MPI( PartitionData partition_data, geojson::GeoJSON
{
//Find and prepare formulation
auto formulation = formulations->get_formulation(feat_id);
formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv");
if (formulations->write_catchment_output() == true)
{
formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv");
}
else
{
formulation->set_output_stream("/dev/null");
};
// TODO: add command line or config option to have this be omitted
//FIXME why isn't default param working here??? get_output_header_line() fails.
formulation->write_output("Time Step,""Time,"+formulation->get_output_header_line(",")+"\n");
Expand Down
Loading