stardis-solver

Solve coupled heat transfers
git clone git://git.meso-star.fr/stardis-solver.git
Log | Files | Refs | README | LICENSE

commit 5f254c8bd19793be864533ecdcbecb80a5fd7eaa
parent d3732ea550945d83d54c68a98128387e7cb57b94
Author: Vincent Forest <vincent.forest@meso-star.com>
Date:   Fri, 26 Nov 2021 16:47:27 +0100

First (untested) use of MPI in sdis_solve_probe

Refactoring of the whole solve_probe_Xd function to prepare its
parallelisation through MPI. With MPI, the evaluation of the green
function is not handled. Furthermore, the RNG state registered against
the returned estimator is not necessarily the right one. Finally, the
MPI parallelisation was not tested yet.

Diffstat:
Mcmake/CMakeLists.txt | 20+++++++++++++-------
Msrc/sdis.c | 448++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Msrc/sdis.h | 2+-
Asrc/sdis_c.h | 120+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/sdis_device.c | 144+++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------
Msrc/sdis_device_c.h | 5+++--
Msrc/sdis_log.c | 4++--
Msrc/sdis_misc.h | 2++
Asrc/sdis_mpi.c | 109+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/sdis_mpi.h | 62++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/sdis_solve_probe_Xd.h | 141++++++++++++++++++++++++++++++++++++++-----------------------------------------
Msrc/test_sdis.c | 6+++---
Msrc/test_sdis_device.c | 6+++---
13 files changed, 931 insertions(+), 138 deletions(-)

diff --git a/cmake/CMakeLists.txt b/cmake/CMakeLists.txt @@ -21,7 +21,7 @@ include(CMakeDependentOption) set(SDIS_SOURCE_DIR ${PROJECT_SOURCE_DIR}/../src) option(NO_TEST "Do not build tests" OFF) -option(USE_MPI +option(ENABLE_MPI "Enable the support of distributed parallelism \ using the Message Passing Interface specification." ON) @@ -52,7 +52,7 @@ include_directories( ${StarEnc3D_INCLUDE_DIR} ${RSys_INCLUDE_DIR}) -if(USE_MPI) +if(ENABLE_MPI) find_package(MPI 2 REQUIRED) set(CMAKE_C_COMPILER ${MPI_C_COMPILER}) include_directories(${MPI_INCLUDE_PATH}) @@ -91,6 +91,7 @@ set(SDIS_FILES_INC_API sdis.h) set(SDIS_FILES_INC + sdis_c.h sdis_camera.h sdis_device_c.h sdis_estimator_c.h @@ -120,6 +121,11 @@ set(SDIS_FILES_INC sdis_Xd_begin.h sdis_Xd_end.h) +if(ENABLE_MPI) + set(SDIS_FILES_SRC ${SDIS_FILES_SRC} sdis_mpi.c) + set(SDIS_FILES_INC ${SDIS_FILES_INC} sdis_mpi.h) +endif() + set(SDIS_FILES_DOC COPYING README.md) # Prepend each file by `SDIS_SOURCE_DIR' @@ -162,8 +168,8 @@ if(CMAKE_COMPILER_IS_GNUCC) set_target_properties(sdis PROPERTIES LINK_FLAGS "${OpenMP_C_FLAGS}") endif() -if(USE_MPI) - set_target_properties(sdis PROPERTIES COMPILE_DEFINITIONS "SDIS_USE_MPI") +if(ENABLE_MPI) + set_target_properties(sdis PROPERTIES COMPILE_DEFINITIONS "SDIS_ENABLE_MPI") endif() rcmake_setup_devel(sdis Stardis ${VERSION} sdis_version.h) @@ -244,11 +250,11 @@ if(NOT NO_TEST) target_link_libraries(test_sdis_solve_probe3_2d ${MATH_LIB}) target_link_libraries(test_sdis_solve_camera Star3DUT) - if(USE_MPI) + if(ENABLE_MPI) set_target_properties(test_sdis PROPERTIES - COMPILE_DEFINITIONS "SDIS_USE_MPI") + COMPILE_DEFINITIONS "SDIS_ENABLE_MPI") set_target_properties(test_sdis_device PROPERTIES - COMPILE_DEFINITIONS "SDIS_USE_MPI") + COMPILE_DEFINITIONS "SDIS_ENABLE_MPI") endif() rcmake_copy_runtime_libraries(test_sdis_solid_random_walk_robustness) diff --git a/src/sdis.c b/src/sdis.c @@ -14,6 +14,40 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. 
*/ #include "sdis.h" +#include "sdis_c.h" +#include "sdis_device_c.h" +#include "sdis_estimator_c.h" +#include "sdis_green.h" +#include "sdis_log.h" +#include "sdis_misc.h" +#include "sdis_scene_c.h" +#ifdef SDIS_ENABLE_MPI + #include "sdis_mpi.h" +#endif + +#include <star/ssp.h> + +#include <rsys/clock_time.h> +#include <rsys/mem_allocator.h> + +/* Number random numbers in a sequence, i.e. number of consecutive random + * numbers that can be used by a thread */ +#define RNG_SEQUENCE_SIZE 100000 + +/******************************************************************************* + * Helper functions + ******************************************************************************/ +#ifdef SDIS_ENABLE_MPI +static void +rewind_progress_printing(struct sdis_device* dev) +{ + size_t i; + if(dev->use_mpi || dev->mpi_nprocs == 1) return; + FOR_EACH(i, 0, dev->mpi_nprocs-1) { + log_info(dev, "\033[1A\r"); /* Move up */ + } +} +#endif /******************************************************************************* * Exported function @@ -23,10 +57,418 @@ sdis_get_info(struct sdis_info* info) { if(!info) return RES_BAD_ARG; *info = SDIS_INFO_NULL; -#ifdef SDIS_USE_MPI - info->mpi_enable = 1; +#ifdef SDIS_ENABLE_MPI + info->mpi_enabled = 1; #else - info->mpi_enable = 0; + info->mpi_enabled = 0; #endif return RES_OK; } + +/******************************************************************************* + * Local functions + ******************************************************************************/ +res_T +create_per_thread_rng + (struct sdis_device* dev, + struct ssp_rng* rng_state, + struct ssp_rng_proxy** out_proxy, + struct ssp_rng** out_rngs[]) +{ + struct ssp_rng_proxy_create2_args proxy_args = SSP_RNG_PROXY_CREATE2_ARGS_NULL; + struct ssp_rng_proxy* proxy = NULL; + struct ssp_rng** rngs = NULL; + size_t i; + res_T res = RES_OK; + ASSERT(dev && out_proxy && out_rngs); + + rngs = MEM_CALLOC(dev->allocator, dev->nthreads, sizeof(*rngs)); + if(!rngs) { + log_err(dev, 
"Could not allocate the list of per thread RNG.\n"); + res = RES_MEM_ERR; + goto error; + } + + /* Create the RNG proxy */ + proxy_args.rng= rng_state; + proxy_args.type = SSP_RNG_MT19937_64; + proxy_args.nbuckets = dev->nthreads; +#ifdef SDIS_ENABLE_MPI + if(dev->use_mpi) { + proxy_args.sequence_size = RNG_SEQUENCE_SIZE; + proxy_args.sequence_offset = RNG_SEQUENCE_SIZE * (size_t)dev->mpi_rank; + proxy_args.sequence_pitch = RNG_SEQUENCE_SIZE * (size_t)dev->mpi_nprocs; + } else +#endif + { + proxy_args.sequence_size = RNG_SEQUENCE_SIZE; + proxy_args.sequence_offset = 0; + proxy_args.sequence_pitch = RNG_SEQUENCE_SIZE; + } + res = ssp_rng_proxy_create2(dev->allocator, &proxy_args, &proxy); + if(res != RES_OK) goto error; + + /* Query the RNG proxy to create the per thread RNGs */ + FOR_EACH(i, 0, dev->nthreads) { + res = ssp_rng_proxy_create_rng(proxy, i, &rngs[i]); + if(res != RES_OK) goto error; + } + +exit: + *out_rngs = rngs; + *out_proxy = proxy; + return res; +error: + if(rngs) { destroy_per_thread_rng(dev, rngs); rngs = NULL; } + if(proxy) { SSP(rng_proxy_ref_put(proxy)); proxy = NULL; } + goto exit; +} + +void +destroy_per_thread_rng(struct sdis_device* dev, struct ssp_rng* rngs[]) +{ + size_t i; + ASSERT(dev); + if(!rngs) return; + FOR_EACH(i, 0, dev->nthreads) { if(rngs[i]) SSP(rng_ref_put(rngs[i])); } + MEM_RM(dev->allocator, rngs); +} + +res_T +create_per_thread_green_function + (struct sdis_scene* scn, + struct sdis_green_function** out_greens[]) +{ + struct sdis_green_function** greens = NULL; + size_t i; + res_T res = RES_OK; + ASSERT(scn && out_greens); + + greens = MEM_CALLOC(scn->dev->allocator, scn->dev->nthreads, sizeof(*greens)); + if(!greens) { + log_err(scn->dev, + "Could not allocate the list of per thread green function.\n"); + res = RES_MEM_ERR; + goto error; + } + + FOR_EACH(i, 0, scn->dev->nthreads) { + res = green_function_create(scn, &greens[i]); + if(res != RES_OK) goto error; + } + +exit: + *out_greens = greens; + return res; +error: + 
if(greens) { + destroy_per_thread_green_function(scn, greens); + greens = NULL; + } + goto exit; +} + +void +destroy_per_thread_green_function + (struct sdis_scene* scn, + struct sdis_green_function* greens[]) +{ + size_t i; + ASSERT(greens); + FOR_EACH(i, 0, scn->dev->nthreads) { + if(greens[i]) SDIS(green_function_ref_put(greens[i])); + } + MEM_RM(scn->dev->allocator, greens); +} + +res_T +alloc_process_progress(struct sdis_device* dev, int32_t** out_progress) +{ + int32_t* progress = NULL; + size_t nprocs; + res_T res = RES_OK; + ASSERT(dev && out_progress); + +#ifdef SDIS_ENABLE_MPI + if(dev->use_mpi) { + nprocs = (size_t)dev->mpi_nprocs; + } else +#endif + { + nprocs = 1; + } + progress = MEM_CALLOC(dev->allocator, nprocs, sizeof(*progress)); + if(!progress) { + log_err(dev,"Could not allocate the list of per process progress status.\n"); + res = RES_MEM_ERR; + goto error; + } + +exit: + *out_progress = progress; + return res; +error: + if(progress) { MEM_RM(dev->allocator, progress); progress = NULL; } + goto exit; +} + +void +free_process_progress(struct sdis_device* dev, int32_t progress[]) +{ + ASSERT(dev && progress); + MEM_RM(dev->allocator, progress); +} + +size_t +compute_process_realisations_count + (const struct sdis_device* dev, + const size_t nrealisations) +{ +#ifndef SDIS_ENABLE_MPI + (void)dev, (void)nrealisations; + return nrealisations; +#else + size_t per_process_nrealisations = 0; + size_t remaining_nrealisations = 0; + ASSERT(dev); + + if(!dev->use_mpi) return nrealisations; + + /* Compute minimum the number of realisations on each process */ + per_process_nrealisations = nrealisations / (size_t)dev->mpi_nprocs; + + /* Define the remaining number of realisations that are not handle by one + * process */ + remaining_nrealisations = + nrealisations + - per_process_nrealisations * (size_t)dev->mpi_nprocs; + + /* Distribute the remaining realisations onto the processes */ + if((size_t)dev->mpi_rank >= remaining_nrealisations) { + return 
per_process_nrealisations; + } else { + return per_process_nrealisations + 1; + } +#endif +} + +#ifndef SDIS_ENABLE_MPI +res_T +gather_accumulators + (struct sdis_device* dev, + const struct accum* per_thread_acc_temp, + const struct accum* per_thread_acc_time, + struct accum* acc_temp, + struct accum* acc_time) +{ + ASSERT(dev); + /* Gather thread accumulators */ + sum_accums(per_thread_acc_temp, dev->nthreads, acc_temp); + sum_accums(per_thread_acc_time, dev->nthreads, acc_time); + return RES_OK; +} +#endif + +#ifdef SDIS_ENABLE_MPI +res_T +gather_accumulators + (struct sdis_device* dev, + const struct accum* per_thread_acc_temp, + const struct accum* per_thread_acc_time, + struct accum* acc_temp, + struct accum* acc_time) +{ + struct accum* per_proc_acc_temp = NULL; + struct accum* per_proc_acc_time = NULL; + size_t nprocs = 0; + res_T res = RES_OK; + ASSERT(dev && per_thread_acc_temp && per_thread_acc_time); + ASSERT(acc_temp && acc_time); + + if(!dev->use_mpi) { + /* Gather thread accumulators */ + sum_accums(per_thread_acc_temp, dev->nthreads, acc_temp); + sum_accums(per_thread_acc_time, dev->nthreads, acc_time); + goto exit; + } + + per_proc_acc_temp = MEM_CALLOC(dev->allocator, nprocs, sizeof(struct accum)); + per_proc_acc_time = MEM_CALLOC(dev->allocator, nprocs, sizeof(struct accum)); + if(per_proc_acc_temp) { res = RES_MEM_ERR; goto error; } + if(per_proc_acc_time) { res = RES_MEM_ERR; goto error; } + + /* Gather thread accumulators */ + sum_accums(per_thread_acc_temp, dev->nthreads, &per_proc_acc_temp[0]); + sum_accums(per_thread_acc_time, dev->nthreads, &per_proc_acc_time[0]); + + /* Non master process */ + if(dev->mpi_rank != 0) { + + /* Send the temperature/time accumulator to the master process */ + mutex_lock(dev->mpi_mutex); + MPI(Send(&per_proc_acc_temp[0], sizeof(per_proc_acc_temp[0]), MPI_CHAR, + 0/*Dst*/, MPI_SDIS_MSG_ACCUM_TEMP, MPI_COMM_WORLD)); + MPI(Send(&per_proc_acc_time[0], sizeof(per_proc_acc_time[0]), MPI_CHAR, + 0/*Dst*/, 
MPI_SDIS_MSG_ACCUM_TIME, MPI_COMM_WORLD)); + mutex_unlock(dev->mpi_mutex); + + *acc_temp = per_proc_acc_temp[0]; + *acc_time = per_proc_acc_time[0]; + + /* Master process */ + } else { + int iproc; + + /* Gather process accumulators */ + FOR_EACH(iproc, 1, dev->mpi_nprocs) { + MPI_Request req; + + /* Asynchronously receive the temperature accumulator of `iproc' */ + mutex_lock(dev->mpi_mutex); + MPI(Irecv(&per_proc_acc_temp[iproc], sizeof(per_proc_acc_temp[iproc]), + MPI_CHAR, iproc, MPI_SDIS_MSG_ACCUM_TEMP, MPI_COMM_WORLD, &req)); + mutex_unlock(dev->mpi_mutex); + mpi_waiting_for_request(dev, &req); + + /* Asynchronously receive the time accumulator of `iproc' */ + mutex_lock(dev->mpi_mutex); + MPI(Irecv(&per_proc_acc_time[iproc], sizeof(per_proc_acc_time[iproc]), + MPI_CHAR, iproc, MPI_SDIS_MSG_ACCUM_TIME, MPI_COMM_WORLD, &req)); + mutex_unlock(dev->mpi_mutex); + mpi_waiting_for_request(dev, &req); + } + + /* Sum the process accumulators */ + sum_accums(per_proc_acc_temp, (size_t)dev->mpi_nprocs, acc_temp); + sum_accums(per_proc_acc_time, (size_t)dev->mpi_nprocs, acc_time); + } + +exit: + if(per_proc_acc_temp) MEM_RM(dev->allocator, per_proc_acc_temp); + if(per_proc_acc_time) MEM_RM(dev->allocator, per_proc_acc_time); + return res; +error: + goto exit; +} +#endif /* SDIS_ENABLE_MPI */ + +res_T +setup_estimator + (struct sdis_estimator* estimator, + const struct ssp_rng_proxy* proxy, + const struct accum* per_thread_acc_temp, + const struct accum* per_thread_acc_time, + const size_t nrealisations) +{ + char buf[128]; + struct time t0, t1; + struct accum acc_temp = ACCUM_NULL; + struct accum acc_time = ACCUM_NULL; + res_T res = RES_OK; + + ASSERT(estimator && proxy && per_thread_acc_temp && per_thread_acc_time); + + time_current(&t0); + + /* Gather the accumulators from concurrent threads & processes */ + gather_accumulators + (estimator->dev, + per_thread_acc_temp, + per_thread_acc_time, + &acc_temp, + &acc_time); + ASSERT(acc_temp.count == acc_time.count); + + 
time_sub(&t0, time_current(&t1), &t0); + time_dump(&t0, TIME_ALL, NULL, buf, sizeof(buf)); + log_info(estimator->dev, "Accumulators gathered in %s.\n", buf); + + estimator_setup_realisations_count(estimator, nrealisations, acc_temp.count); + estimator_setup_temperature(estimator, acc_temp.sum, acc_temp.sum2); + estimator_setup_realisation_time(estimator, acc_time.sum, acc_time.sum2); + + /* TODO correctly handle RNG state with MPI. Currently, we only store the RNG + * proxy state of the master process, but non-master processes can rely on + * much more advanced seeds. Therefore, rerun the simulation with the saved + * RNG state can lead to non-master processes generating random numbers that + * were already generated at the previous run. */ + res = estimator_save_rng_state(estimator, proxy); + if(res != RES_OK) goto error; + +#ifdef SDIS_ENABLE_MPI + if(estimator->dev->use_mpi) { + log_warn(estimator->dev, + "The estimator RNG state is not well defined when MPI is used."); + } +#endif + +exit: + return res; +error: + goto exit; +} + +void +print_progress + (struct sdis_device* dev, + int32_t progress[], + const char* label) +{ + ASSERT(dev && label); +#ifndef SDIS_ENABLE_MPI + log_info(dev, "%s%3d%%\r", label, progress[0]); +#else + if(!dev->use_mpi) { + log_info(dev, "%s%3d%%\r", label, progress[0]); + } else { + if(dev->mpi_rank != 0) return; + if(dev->mpi_nprocs == 1) { + log_info(dev, "%s%3d%%\r", label, progress[0]); + } else { + int i; + mpi_fetch_progress(dev, progress); + FOR_EACH(i, 0, dev->mpi_nprocs) { + log_info(dev, "Process %d -- %s%3d%%%c", + i, label, progress[i], i == dev->mpi_nprocs - 1 ? 
'\r' : '\n'); + } + } + } +#endif +} + +void +print_progress_update + (struct sdis_device* dev, + int32_t progress[], + const char* label) +{ + ASSERT(dev); +#ifndef SDIS_ENABLE_MPI + print_progress(dev, progress, label); +#else + if(!dev->use_mpi) { + print_progress(dev, progress, label); + } else { + if(dev->mpi_rank != 0) { + mpi_send_progress(dev, progress[0]); + } else { + mpi_fetch_progress(dev, progress); + rewind_progress_printing(dev); + print_progress(dev, progress, label); + } + } +#endif +} + +void +waiting_for_process_completion(struct sdis_device* dev) +{ +#ifndef SDIS_ENABLE_MPI + (void)dev; + return; +#else + if(dev->use_mpi) { + mpi_synchronise_processes(dev); + } +#endif +} diff --git a/src/sdis.h b/src/sdis.h @@ -150,7 +150,7 @@ static const struct sdis_device_create_args SDIS_DEVICE_CREATE_ARGS_DEFAULT = /* Informations on the Stardis-Solver library */ struct sdis_info { - int mpi_enable; /* Define if Stardis-Solver was built with MPI support */ + int mpi_enabled; /* Define if Stardis-Solver was built with MPI support */ }; #define SDIS_INFO_NULL__ {0} static const struct sdis_info SDIS_INFO_NULL = SDIS_INFO_NULL__; diff --git a/src/sdis_c.h b/src/sdis_c.h @@ -0,0 +1,120 @@ +/* Copyright (C) 2016-2021 |Meso|Star> (contact@meso-star.com) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
*/ + +#ifndef SDIS_C_H +#define SDIS_C_H + +#include <rsys/rsys.h> + +/* Forward declarations */ +struct accum; +struct sdis_device; +struct ssp_rng; +struct ssp_rng_proxy; + +extern LOCAL_SYM res_T +create_per_thread_rng + (struct sdis_device* dev, + struct ssp_rng* rng_state, + struct ssp_rng_proxy** rng_proxy, + struct ssp_rng** rngs[]); + +extern LOCAL_SYM void +destroy_per_thread_rng + (struct sdis_device* dev, + struct ssp_rng* rngs[]); + +extern LOCAL_SYM res_T +create_per_thread_green_function + (struct sdis_scene* scene, + struct sdis_green_function** greens[]); + +extern LOCAL_SYM void +destroy_per_thread_green_function + (struct sdis_scene* scn, + struct sdis_green_function* greens[]); + +/* Allocate the progress status list for the current process. Without MPI, the + * length of the progress list is 1. With MPI, the length is also 1 except for + * the master process for which the length of the list is equal to the number + * of MPI processes. For this process the list will be used to gather the + * progress status of the other processes. */ +extern LOCAL_SYM res_T +alloc_process_progress + (struct sdis_device* dev, + int32_t* progress[]); + +extern LOCAL_SYM void +free_process_progress + (struct sdis_device* dev, + int32_t progress[]); + +/* Compute the number of realisations for the current process */ +extern LOCAL_SYM size_t +compute_process_realisations_count + (const struct sdis_device* dev, + const size_t overall_realisations_count); + +/* Gather the accumulators and sum them in acc_<temp|time>. With MPI, non + * master processes store in acc_<temp|time> the gathering of their per thread + * accumulators that are sent to the master process. 
The master process gathers + * their per thread accumulators and the per process ones and save the result + * in acc_<temp|time> */ +extern LOCAL_SYM res_T +gather_accumulators + (struct sdis_device* dev, + const struct accum* per_thread_acc_temp, + const struct accum* per_thread_acc_time, + struct accum* acc_temp, + struct accum* acc_time); + +extern LOCAL_SYM res_T +setup_estimator + (struct sdis_estimator* estimator, + const struct ssp_rng_proxy* proxy, + const struct accum* per_thread_acc_temp, + const struct accum* per_thread_acc_time, + const size_t overall_realisations_count); + +extern LOCAL_SYM res_T +setup_green_function + (struct sdis_green_function* per_thread_green[], + const struct ssp_rng_proxy* proxy, + const struct accum* per_thread_acc_time); + +/* Print the progress status. With MPI, the master process print the progress + * of all processes stored in the progress list. Non master processes do not + * print anything */ +extern LOCAL_SYM void +print_progress + (struct sdis_device* dev, + int32_t progress[], + const char* label); /* Text preceding the progress status */ + +/* Update the printed progress status, i.e. rewind the printing and print the + * new status */ +extern LOCAL_SYM void +print_progress_update + (struct sdis_device* dev, + int32_t progress[], + const char* label); /* Text preceding the progress status */ + +/* Waiting for the completion of concurrent processes. Without MPI this + * function does nothing. 
With MPI it waits for MPI process synchronisation */ +extern LOCAL_SYM void +waiting_for_process_completion + (struct sdis_device* dev); + +#endif /* SDIS_C_H */ diff --git a/src/sdis_device.c b/src/sdis_device.c @@ -20,6 +20,7 @@ #include <rsys/cstr.h> #include <rsys/logger.h> #include <rsys/mem_allocator.h> +#include <rsys/mutex.h> #include <star/s2d.h> #include <star/s3d.h> @@ -27,14 +28,14 @@ #include <omp.h> -#ifdef SDIS_USE_MPI +#ifdef SDIS_ENABLE_MPI #include <mpi.h> #endif /******************************************************************************* * Helper functions ******************************************************************************/ -#ifdef SDIS_USE_MPI +#ifdef SDIS_ENABLE_MPI static const char* mpi_error_string(struct sdis_device* dev, const int mpi_err) @@ -171,15 +172,27 @@ mpi_init(struct sdis_device* dev) #undef CALL_MPI + dev->mpi_mutex = mutex_create(); + if(!dev->mpi_mutex) { + log_err(dev, + "Error creating the mutex used to protect the MPI calls.\n"); + res = RES_MEM_ERR; + goto error; + } + mpi_print_proc_info(dev); exit: return res; error: + if(dev->mpi_mutex) { + mutex_destroy(dev->mpi_mutex); + dev->mpi_mutex = NULL; + } goto exit; } -#endif /* SDIS_USE_MPI */ +#endif /* SDIS_ENABLE_MPI */ static INLINE int check_sdis_device_create_args(const struct sdis_device_create_args* args) @@ -187,6 +200,77 @@ check_sdis_device_create_args(const struct sdis_device_create_args* args) return args && args->nthreads_hint != 0; } +static INLINE res_T +setup_logger + (struct sdis_device* dev, + const struct sdis_device_create_args* args) +{ + ASSERT(dev && args); + if(args->logger) { + dev->logger = args->logger; + } else { + setup_log_default(dev); + } + return RES_OK; +} + +static INLINE res_T +setup_star2d(struct sdis_device* dev) +{ + res_T res = RES_OK; + ASSERT(dev); + res = s2d_device_create(dev->logger, dev->allocator, 0, &dev->s2d_dev); + if(res != RES_OK) { + log_err(dev, + "Could not create the Star-2D device for Stardis-Solver -- 
%s.\n", + res_to_cstr(res)); + goto error; + } +exit: + return res; +error: + goto exit; +} + +static INLINE res_T +setup_star3d(struct sdis_device* dev) +{ + res_T res = RES_OK; + ASSERT(dev); + res = s3d_device_create(dev->logger, dev->allocator, 0, &dev->s3d_dev); + if(res != RES_OK) { + log_err(dev, + "Could not create the Star-3D device for Stardis-Solver -- %s.\n", + res_to_cstr(res)); + goto error; + } +exit: + return res; +error: + goto exit; +} + +static INLINE res_T +setup_mpi(struct sdis_device* dev, const struct sdis_device_create_args* args) +{ + ASSERT(dev && args); +#ifdef SDIS_ENABLE_MPI + dev->use_mpi = args->use_mpi; + if(args->use_mpi) { + const res_T res = mpi_init(dev); + if(res != RES_OK) return res; + } +#else + if(args->use_mpi) { + log_warn(dev, + "Stardis-Solver is built without the support of the Message Passing " + "Interface. MPI cannot be used for parallel computations.\n"); + } +#endif + return RES_OK; + +} + static void device_release(ref_T* ref) { @@ -200,7 +284,8 @@ device_release(ref_T* ref) ASSERT(flist_name_is_empty(&dev->media_names)); flist_name_release(&dev->interfaces_names); flist_name_release(&dev->media_names); -#ifdef SDIS_USE_MPI +#ifdef SDIS_ENABLE_MPI + if(dev->mpi_mutex) mutex_destroy(dev->mpi_mutex); str_release(&dev->mpi_err_str); #endif MEM_RM(dev->allocator, dev); @@ -214,7 +299,6 @@ sdis_device_create (const struct sdis_device_create_args* args, struct sdis_device** out_dev) { - struct logger* log = NULL; struct sdis_device* dev = NULL; struct mem_allocator* allocator = NULL; res_T res = RES_OK; @@ -245,55 +329,27 @@ sdis_device_create ref_init(&dev->ref); flist_name_init(allocator, &dev->interfaces_names); flist_name_init(allocator, &dev->media_names); -#ifdef SDIS_USE_MPI +#ifdef SDIS_ENABLE_MPI str_init(allocator, &dev->mpi_err_str); #endif - if(args->logger) { - dev->logger = args->logger; - } else { - setup_log_default(dev); - } + res = setup_logger(dev, args); + if(res != RES_OK) goto error; + res = 
setup_star2d(dev); + if(res != RES_OK) goto error; + res = setup_star3d(dev); + if(res != RES_OK) goto error; + res = setup_mpi(dev, args); + if(res != RES_OK) goto error; + log_info(dev, "Use %lu %s.\n", (unsigned long)dev->nthreads, dev->nthreads == 1 ? "thread" : "threads"); - res = s2d_device_create(log, allocator, 0, &dev->s2d_dev); - if(res != RES_OK) { - log_err(dev, - "%s: could not create the Star-2D device on Stardis -- %s.\n", - FUNC_NAME, res_to_cstr(res)); - goto error; - } - - res = s3d_device_create(log, allocator, 0, &dev->s3d_dev); - if(res != RES_OK) { - log_err(dev, - "%s: could not create the Star-3D device on Stardis -- %s.\n", - FUNC_NAME, res_to_cstr(res)); - goto error; - } - -#ifdef SDIS_USE_MPI - if(args->use_mpi) { - res = mpi_init(dev); - if(res != RES_OK) goto error; - } -#else - if(args->use_mpi) { - log_warn(dev, - "%s: Stardis-Solver is built without the support of the Message Passing " - "Interface. MPI cannot be used for parallel computations.\n", FUNC_NAME); - } -#endif - exit: if(out_dev) *out_dev = dev; return res; error: - if(dev) { - SDIS(device_ref_put(dev)); - dev = NULL; - } + if(dev) { SDIS(device_ref_put(dev)); dev = NULL; } goto exit; } diff --git a/src/sdis_device_c.h b/src/sdis_device_c.h @@ -24,7 +24,7 @@ #include <rsys/ref_count.h> #include <rsys/str.h> -#ifdef SDIS_USE_MPI +#ifdef SDIS_ENABLE_MPI #ifndef NDEBUG #define MPI(Func) ASSERT(MPI_##Func == MPI_SUCCESS) #else @@ -48,12 +48,13 @@ struct sdis_device { unsigned nthreads; int verbose; -#ifdef SDIS_USE_MPI +#ifdef SDIS_ENABLE_MPI int mpi_rank; /* Rank of the process in the MPI group */ int mpi_nprocs; /* Overall #processes in the MPI group */ struct str mpi_err_str; /* String used to store the MPI error string */ struct mutex* mpi_mutex; /* Protect MPI calls from concurrent threads */ + int use_mpi; #endif struct flist_name interfaces_names; diff --git a/src/sdis_log.c b/src/sdis_log.c @@ -90,7 +90,7 @@ log_info(const struct sdis_device* dev, const char* msg, 
...) va_list vargs_list; ASSERT(dev && msg); -#ifdef SDIS_USE_MPI +#ifdef SDIS_ENABLE_MPI /* Log standard messages only on master process */ if(dev->mpi_rank == 0) #endif @@ -119,7 +119,7 @@ log_warn(const struct sdis_device* dev, const char* msg, ...) va_list vargs_list; ASSERT(dev && msg); -#ifdef SDIS_USE_MPI +#ifdef SDIS_ENABLE_MPI /* Log warnings only on master process */ if(dev->mpi_rank == 0) #endif diff --git a/src/sdis_misc.h b/src/sdis_misc.h @@ -16,6 +16,8 @@ #ifndef SDIS_MISC_H #define SDIS_MISC_H +#include "sdis_heat_path.h" + #include <rsys/float2.h> #include <rsys/float3.h> #include <star/ssp.h> diff --git a/src/sdis_mpi.c b/src/sdis_mpi.c @@ -0,0 +1,109 @@ +/* Copyright (C) 2016-2021 |Meso|Star> (contact@meso-star.com) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
*/ + +#include "sdis_device_c.h" +#include "sdis_mpi.h" + +#include <rsys/mutex.h> + +#include <time.h> /* nanosleep */ +#include <sys/time.h> /* struct timespec */ + +/******************************************************************************* + * Local functions + ******************************************************************************/ +void +mpi_send_progress(struct sdis_device* dev, const int32_t progress) +{ + ASSERT(dev && dev->use_mpi); + (void)dev; + + mutex_lock(dev->mpi_mutex); + MPI(Send(&progress, 1/*#data*/, MPI_INT32_T, 0/*dst rank*/, + MPI_SDIS_MSG_PROGRESS, MPI_COMM_WORLD)); + mutex_unlock(dev->mpi_mutex); +} + +void +mpi_fetch_progress(struct sdis_device* dev, int32_t progress[]) +{ + int iproc; + ASSERT(dev && dev->use_mpi && dev->mpi_rank == 0); + + FOR_EACH(iproc, 1, dev->mpi_nprocs) { + /* Flush all progress messages sent by the process `iproc' */ + for(;;) { + MPI_Request req; + int flag; + + /* Query for a progress message */ + mutex_lock(dev->mpi_mutex); + MPI(Iprobe(iproc, MPI_SDIS_MSG_PROGRESS, MPI_COMM_WORLD, &flag, + MPI_STATUS_IGNORE)); + mutex_unlock(dev->mpi_mutex); + + if(flag == 0) break; /* No more progress status to receive */ + + /* Asynchronously receive the progress status */ + mutex_lock(dev->mpi_mutex); + MPI(Irecv(&progress[iproc], 1/*count*/, MPI_INT32_T, iproc, + MPI_SDIS_MSG_PROGRESS, MPI_COMM_WORLD, &req)); + mutex_unlock(dev->mpi_mutex); + + /* Actively wait for the completion of the receive procedure */ + mpi_waiting_for_request(dev, &req); + } + } +} + +void +mpi_waiting_for_request(struct sdis_device* dev, MPI_Request* req) +{ + struct timespec t; + ASSERT(dev && dev->use_mpi && dev->mpi_rank == 0 && req); + + /* Setup the suspend time of the process while waiting for a request */ + t.tv_sec = 0; + t.tv_nsec = 10000000; /* 10ms */ + + /* Wait for process synchronisation */ + for(;;) { + int complete; + + mutex_lock(dev->mpi_mutex); + MPI(Test(req, &complete, MPI_STATUS_IGNORE)); + 
mutex_unlock(dev->mpi_mutex); + + if(complete) break; + nanosleep(&t, NULL); + } +} + +void +mpi_synchronise_processes(struct sdis_device* dev) +{ + MPI_Request req; + ASSERT(dev && dev->use_mpi); + + /* Asynchronously wait for process completion. Use an asynchronous barrier to + * avoid a dead lock if another thread on the same process queries the + * mpi_mutex */ + + mutex_lock(dev->mpi_mutex); + MPI(Ibarrier(MPI_COMM_WORLD, &req)); + mutex_unlock(dev->mpi_mutex); + + mpi_waiting_for_request(dev, &req); +} diff --git a/src/sdis_mpi.h b/src/sdis_mpi.h @@ -0,0 +1,62 @@ +/* Copyright (C) 2016-2021 |Meso|Star> (contact@meso-star.com) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. */ + +#ifndef SDIS_MPI_H +#define SDIS_MPI_H + +#ifndef SDIS_ENABLE_MPI + #error "Invalid inclusion. 
Stardis-Solver is compiled without MPI support" +#endif + +#include <rsys/rsys.h> +#include <mpi.h> + +/* Id of the messages sent between processes */ +enum mpi_sdis_message { + MPI_SDIS_MSG_PROGRESS, /* Progress status */ + MPI_SDIS_MSG_ACCUM_TEMP, /* Temperature accumulator */ + MPI_SDIS_MSG_ACCUM_TIME, /* Time accumulator */ + MPI_SDIS_MSG_COUNT__ +}; + +/* Forward declarations */ +struct sdis_device; + +/* Send the progress status `progress' to the master process */ +extern LOCAL_SYM void +mpi_send_progress + (struct sdis_device* dev, + const int32_t progress); + +/* Fetch the progress status of non master processes into `progress'. The + * progress of the i-th process is stored in progress[i], meaning that the + * length of progress must be at least equal to the number of MPI processes */ +extern LOCAL_SYM void +mpi_fetch_progress + (struct sdis_device* dev, + int32_t progress[]); + +/* Actively wait for the completion of the MPI request `req' */ +extern LOCAL_SYM void +mpi_waiting_for_request + (struct sdis_device* dev, + MPI_Request* req); + +/* Waiting for process completion */ +extern LOCAL_SYM void +mpi_synchronise_processes + (struct sdis_device* dev); + +#endif /* SDIS_MPI_H */ diff --git a/src/sdis_solve_probe_Xd.h b/src/sdis_solve_probe_Xd.h @@ -13,6 +13,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. 
*/ +#include "sdis_c.h" #include "sdis_device_c.h" #include "sdis_estimator_c.h" #include "sdis_log.h" @@ -72,18 +73,30 @@ XD(solve_probe) struct sdis_green_function** out_green, /* May be NULL <=> No green func */ struct sdis_estimator** out_estimator) /* May be NULL <=> No estimator */ { + /* Time registration */ + struct time solve_t0, solve_t1; + char buf[128]; /* Temporary buffer used to store formatted time */ + + /* Device variables */ + struct mem_allocator* allocator = NULL; + size_t nthreads = 0; + + /* Stardis variables */ struct sdis_medium* medium = NULL; struct sdis_estimator* estimator = NULL; struct sdis_green_function* green = NULL; struct sdis_green_function** greens = NULL; + + /* Random Number generator */ struct ssp_rng_proxy* rng_proxy = NULL; struct ssp_rng** rngs = NULL; - struct accum* acc_temps = NULL; - struct accum* acc_times = NULL; + + /* Miscellaneous */ + struct accum* per_thread_acc_temp = NULL; + struct accum* per_thread_acc_time = NULL; size_t nrealisations = 0; int64_t irealisation = 0; - size_t i; - int progress = 0; + int32_t* progress = NULL; /* Per process progress bar */ int register_paths = SDIS_HEAT_PATH_NONE; ATOMIC nsolved_realisations = 0; ATOMIC res = RES_OK; @@ -116,32 +129,22 @@ XD(solve_probe) if(scene_is_2d(scn) != 0) { res = RES_BAD_ARG; goto error; } #endif - /* Create the proxy RNG */ - if(args->rng_state) { - res = ssp_rng_proxy_create_from_rng(scn->dev->allocator, args->rng_state, - scn->dev->nthreads, &rng_proxy); - if(res != RES_OK) goto error; - } else { - res = ssp_rng_proxy_create(scn->dev->allocator, SSP_RNG_MT19937_64, - scn->dev->nthreads, &rng_proxy); - if(res != RES_OK) goto error; - } + nthreads = scn->dev->nthreads; + allocator = scn->dev->allocator; - /* Create the per thread RNG */ - rngs = MEM_CALLOC(scn->dev->allocator, scn->dev->nthreads, sizeof(*rngs)); - if(!rngs) { res = RES_MEM_ERR; goto error; } - FOR_EACH(i, 0, scn->dev->nthreads) { - res = ssp_rng_proxy_create_rng(rng_proxy, i, rngs+i); 
- if(res != RES_OK) goto error; - } + /* Create the per thread RNGs */ + res = create_per_thread_rng(scn->dev, args->rng_state, &rng_proxy, &rngs); + if(res != RES_OK) goto error; + + /* Allocate the per process progress status */ + res = alloc_process_progress(scn->dev, &progress); + if(res != RES_OK) goto error; /* Create the per thread accumulators */ - acc_temps = MEM_CALLOC - (scn->dev->allocator, scn->dev->nthreads, sizeof(*acc_temps)); - if(!acc_temps) { res = RES_MEM_ERR; goto error; } - acc_times = MEM_CALLOC - (scn->dev->allocator, scn->dev->nthreads, sizeof(*acc_times)); - if(!acc_times) { res = RES_MEM_ERR; goto error; } + per_thread_acc_temp = MEM_CALLOC(allocator, nthreads, sizeof(struct accum)); + per_thread_acc_time = MEM_CALLOC(allocator, nthreads, sizeof(struct accum)); + if(!per_thread_acc_temp) { res = RES_MEM_ERR; goto error; } + if(!per_thread_acc_time) { res = RES_MEM_ERR; goto error; } /* Retrieve the medium in which the submitted position lies */ res = scene_get_medium(scn, args->position, NULL, &medium); @@ -149,12 +152,8 @@ XD(solve_probe) /* Create the per thread green function */ if(out_green) { - greens = MEM_CALLOC(scn->dev->allocator, scn->dev->nthreads, sizeof(*greens)); - if(!greens) { res = RES_MEM_ERR; goto error; } - FOR_EACH(i, 0, scn->dev->nthreads) { - res = green_function_create(scn, &greens[i]); - if(res != RES_OK) goto error; - } + res = create_per_thread_green_function(scn, &greens); + if(res != RES_OK) goto error; } /* Create the estimator */ @@ -163,8 +162,13 @@ XD(solve_probe) if(res != RES_OK) goto error; } + print_progress(scn->dev, progress, "Solving probe temperature: "); + + /* Begin time registration of the computation */ + time_current(&solve_t0); + /* Here we go! Launch the Monte Carlo estimation */ - nrealisations = args->nrealisations; + nrealisations = compute_process_realisations_count(scn->dev, args->nrealisations); register_paths = out_estimator ? 
args->register_paths : SDIS_HEAT_PATH_NONE; omp_set_num_threads((int)scn->dev->nthreads); #pragma omp parallel for schedule(static) @@ -173,8 +177,8 @@ XD(solve_probe) struct time t0, t1; const int ithread = omp_get_thread_num(); struct ssp_rng* rng = rngs[ithread]; - struct accum* acc_temp = &acc_temps[ithread]; - struct accum* acc_time = &acc_times[ithread]; + struct accum* acc_temp = &per_thread_acc_temp[ithread]; + struct accum* acc_time = &per_thread_acc_time[ithread]; struct green_path_handle* pgreen_path = NULL; struct green_path_handle green_path = GREEN_PATH_HANDLE_NULL; struct sdis_heat_path* pheat_path = NULL; @@ -188,19 +192,17 @@ XD(solve_probe) if(ATOMIC_GET(&res) != RES_OK) continue; /* An error occurred */ - /* Begin time registration */ + /* Begin time registration of the realisation */ time_current(&t0); time = sample_time(rng, args->time_range); if(out_green) { res_local = green_function_create_path(greens[ithread], &green_path); - if(res_local != RES_OK) { - ATOMIC_SET(&res, res_local); - goto error_it; - } + if(res_local != RES_OK) { ATOMIC_SET(&res, res_local); goto error_it; } pgreen_path = &green_path; } + if(register_paths) { heat_path_init(scn->dev->allocator, &heat_path); pheat_path = &heat_path; @@ -252,14 +254,16 @@ XD(solve_probe) acc_time->sum += usec; acc_time->sum2 += usec*usec; ++acc_time->count; } - /* Update progress */ + /* Update the progress status */ n = (size_t)ATOMIC_INCR(&nsolved_realisations); pcent = (int)((double)n * 100.0 / (double)nrealisations + 0.5/*round*/); + #pragma omp critical - if(pcent > progress) { - progress = pcent; - log_info(scn->dev, "Solving probe temperature: %3d%%\r", progress); + if(pcent > progress[0]) { + progress[0] = pcent; + print_progress_update(scn->dev, progress, "Solving probe temperature: "); } + exit_it: if(pheat_path) heat_path_release(pheat_path); continue; @@ -268,25 +272,25 @@ XD(solve_probe) } if(res != RES_OK) goto error; - /* Add a new line after the progress status */ - 
log_info(scn->dev, "Solving probe temperature: %3d%%\n", progress); + /* Synchronise processes */ + waiting_for_process_completion(scn->dev); - /* Setup the estimated temperature and per realisation time */ - if(out_estimator) { - struct accum acc_temp; - struct accum acc_time; + print_progress_update(scn->dev, progress, "Solving probe temperature: "); + log_info(scn->dev, "\n"); - sum_accums(acc_temps, scn->dev->nthreads, &acc_temp); - sum_accums(acc_times, scn->dev->nthreads, &acc_time); - ASSERT(acc_temp.count == acc_time.count); + /* Report computation time */ + time_sub(&solve_t0, time_current(&solve_t1), &solve_t0); + time_dump(&solve_t0, TIME_ALL, NULL, buf, sizeof(buf)); + log_info(scn->dev, "Probe temperature solved in %s.\n", buf); - estimator_setup_realisations_count(estimator, nrealisations, acc_temp.count); - estimator_setup_temperature(estimator, acc_temp.sum, acc_temp.sum2); - estimator_setup_realisation_time(estimator, acc_time.sum, acc_time.sum2); - res = estimator_save_rng_state(estimator, rng_proxy); + /* Setup the estimated values */ + if(out_estimator) { + res = setup_estimator(estimator, rng_proxy, per_thread_acc_temp, + per_thread_acc_time, args->nrealisations); if(res != RES_OK) goto error; } + /* TODO handle for MPI */ if(out_green) { struct accum acc_time; @@ -297,26 +301,17 @@ XD(solve_probe) if(res != RES_OK) goto error; /* Finalize the estimated green */ - sum_accums(acc_times, scn->dev->nthreads, &acc_time); + sum_accums(per_thread_acc_time, scn->dev->nthreads, &acc_time); res = green_function_finalize(green, rng_proxy, &acc_time); if(res != RES_OK) goto error; } exit: - if(rngs) { - FOR_EACH(i, 0, scn->dev->nthreads) { - if(rngs[i]) SSP(rng_ref_put(rngs[i])); - } - MEM_RM(scn->dev->allocator, rngs); - } - if(greens) { - FOR_EACH(i, 0, scn->dev->nthreads) { - if(greens[i]) SDIS(green_function_ref_put(greens[i])); - } - MEM_RM(scn->dev->allocator, greens); - } - if(acc_temps) MEM_RM(scn->dev->allocator, acc_temps); - if(acc_times) 
MEM_RM(scn->dev->allocator, acc_times); + if(rngs) destroy_per_thread_rng(scn->dev, rngs); + if(greens) destroy_per_thread_green_function(scn, greens); + if(progress) free_process_progress(scn->dev, progress); + if(per_thread_acc_temp) MEM_RM(scn->dev->allocator, per_thread_acc_temp); + if(per_thread_acc_time) MEM_RM(scn->dev->allocator, per_thread_acc_time); if(rng_proxy) SSP(rng_proxy_ref_put(rng_proxy)); if(out_green) *out_green = green; if(out_estimator) *out_estimator = estimator; diff --git a/src/test_sdis.c b/src/test_sdis.c @@ -24,10 +24,10 @@ main(int argc, char** argv) BA(sdis_get_info(NULL)); OK(sdis_get_info(&info)); -#ifdef SDIS_USE_MPI - CHK(info.mpi_enable); +#ifdef SDIS_ENABLE_MPI + CHK(info.mpi_enabled); #else - CHK(!info.mpi_enable); + CHK(!info.mpi_enabled); #endif return 0; } diff --git a/src/test_sdis_device.c b/src/test_sdis_device.c @@ -18,7 +18,7 @@ #include <rsys/logger.h> -#ifdef SDIS_USE_MPI +#ifdef SDIS_ENABLE_MPI #include <mpi.h> #endif @@ -37,7 +37,7 @@ main(int argc, char** argv) struct logger logger; struct mem_allocator allocator; struct sdis_device* dev; -#ifdef SDIS_USE_MPI +#ifdef SDIS_ENABLE_MPI int provided; #endif (void)argc, (void)argv; @@ -89,7 +89,7 @@ main(int argc, char** argv) args.use_mpi = 1; args.verbosity = 1; -#ifndef SDIS_USE_MPI +#ifndef SDIS_ENABLE_MPI OK(sdis_device_create(&args, &dev)); OK(sdis_device_ref_put(dev)); #else